More File Processing with Erlang

September 29th, 2007  |  Published in erlang  |  21 Comments  |  Bookmark on Pinboard.in

Tim has weighed in with another attempt at using Erlang for his Wide Finder Project, but he’s unhappy with his new solution too. I feel his pain; below, I’ll show another solution which improves on my earlier one, but I’m not entirely happy with this one either.

Tim’s assessment of my earlier solution is accurate. What was worst about my original approach was that it worked only by reading the whole file into memory, which obviously doesn’t work too well for the quarter-gigabyte log files Tim’s dealing with, unless you have a big machine with big memory. Tim also complains about the “number of processes” parameter, but as I’ll show at the very end of this blog entry, it’s not really needed, and it’s there only to allow for experimentation. And finally, Tim said he doesn’t like my ad hoc binary splitting algorithm, but he also points out that for this approach, it’s pretty much necessary, since Erlang doesn’t provide any modules supporting that.

So what improvements does this new version contain?

  • First, it addresses the file size issue by using klacke’s bfile module, which allows me to work with a full-size logfile rather than Tim’s original 10,000 line sample. If klacke hadn’t posted this to the erlang-questions mailing list, I wouldn’t have even tried to create a new solution. It’s a great module.
  • Second, it uses a two-level process hierarchy. I observed that with full-sized logfiles, the number of processes launched to perform matching could be quite large, and those processes would finish and seem to sit around waiting for the parent process to collect their results. Not surprisingly, the more processes the main program launched, the slower and slower it became. The two-level process hierarchy launches collectors whose sole job it is to launch processes to perform matching and then collect their results. This results in far fewer messages sitting around waiting for the main thread to collect them, and also allows for a higher degree of concurrency to be applied to both reading the file and collecting the results.
  • It still performs binary splitting into data chunks ending on newlines, but I think the algorithm is a little improved. Specifically, it accumulates the “leftovers” and passes them along to the next recursion, where they’re efficiently stuck onto the front of the next chunk. Coincidentally, Tim’s latest approach does something similar, but I don’t think it’s as efficient (but I didn’t try it to verify that, so I could be wrong).
  • Finally, at 84 lines including blank ones, the solution has remained relatively brief. This isn’t an improvement, but keeping the code short has been an unstated goal for me. After all, the brevity of the Ruby solution is pretty striking, plus if I have to write hundreds of lines of Erlang without achieving a significant speedup, I might as well do it in hundreds of line of Java, C++, or C instead.

Regardless of these improvements, the best time I achieved with this new solution is 9.8 seconds on an 8-core 2.33 GHz dual Intel Xeon system with 8 GB of RAM. On my dual-core 2.33 GHz 2 GB MacBook Pro, it clocks in at just over 24 seconds. Still too slow.

I’ve been naming my Erlang modules after Tim, given that he’s the originator of this problem, and he’s responsible for my getting even less sleep than usual over the past week. :-) The module for the solution below is called tbray5. The module for my original solution was of course named just tbray. Don’t ask what happened to tbray1 through tbray4; let’s just say that failed experiments are useful too.

There was a tbray6 briefly as well, when I experimented with adding mmap capabilities to klacke’s bfile module. As I mentioned to Tim in email a few days ago, I was wondering whether one could just drop into C, mmap the large logfile into memory, and return a binary representing the whole file back to Erlang. Seems simple enough, and I got it working, but because of mmap‘s alignment restrictions combined with the way the Erlang runtime allocate binaries, I was forced to copy the data into the binary, thus killing any performance benefits mmap might have provided.

Anyway, here’s tbray5.erl, and below, I’ll explain each section. Stop here if you’re not interested in the details.

Compiling and Running

Run the following command to compile the code:

erl -smp -make

To execute it, use one of the following command lines:

erl -smp -noshell -run tbray5 main <numProcs> <logfile>
erl -smp -noshell -run tbray5 main <logfile>

In these command lines, <numProcs> specifies both the number of processes to use for logfile analysis as well the number of 1k blocks to read from the logfile at a time, and <logfile> specifies the name of the logfile to analyze. Use the first command line to experiment with the number of processes to launch and 1k blocks to read. I found that 512 procs/blocks seems to yield the fastest execution times, so the second command line above defaults to 512, but YMMV.

find_match

The find_match function below is the same as always:

find_match("/ongoing/When/" ++ Last) ->
    case lists:member($., Last) of
        false -> 1;
        true -> 0
    end;
find_match(_) -> 0.

process_binary

The process_binary function below, which launches “matcher” processes, is the same as before, too, except I switched from lists:foldl to lists:foldr because it seemed to provide a slight speedup. This function receives the ID of a process to send results to, and a string (as an Erlang binary) that’s assumed to end with a newline. It launches a process that breaks the binary into a list of strings, tokenizes each string, then counts the matches using find_match.

process_binary(Pid, Bin) ->
    spawn(
      fun() ->
              L = string:tokens(binary_to_list(Bin), "\n"),
              V = lists:foldr(
                    fun(Line, Total) ->
                            Tok = string:tokens(Line, " "),
                            Total + find_match(lists:nth(7, Tok))
                    end, 0, L),
              Pid ! V
      end).

break_chunk_on_newline

The break_chunk_on_newline set of functions below breaks Erlang binaries read from the logfile into chunks that end with a newline. The first variant handles the case where the binary is already smaller than the desired chunk size. It just returns a 2-tuple consisting of the list of all chunks obtained so far, along with the remainder as a binary. The second variant does most of the work, splitting the binary into chunks of the desired size and walking them along to ensure they end with newlines, and accumulating all the processed chunks into a list. The third variant just encapsulates the chunk size calculation and passes the initial empty chunk accumulator list.

break_chunk_on_newline(Bin, Pos, All) when (Pos >= size(Bin)) -> {All, Bin};
break_chunk_on_newline(Bin, Pos, All) ->
    {_, <<C:8, _/binary>>} = split_binary(Bin, Pos),
    case C of
        $\n ->
            {Ba, Bb} = split_binary(Bin, Pos+1),
            break_chunk_on_newline(Bb, Pos, All ++ [Ba]);
        _ -> break_chunk_on_newline(Bin, Pos+1, All)
    end.
break_chunk_on_newline(Bin, N) -> break_chunk_on_newline(Bin, size(Bin) div N, []).

spawn_collector

The spawn_collector function below just spawns a function that collects match counts from process_binary processes, and then sends the total matches to another process. It takes a list of binaries as an argument and calls process_binary for each one, passing the collector process ID to each, and then it returns the collector process ID. The two-level process hierarchy I talked about above has collectors at the first level and “matcher” processes, spawned by the collectors, at the second level.

spawn_collector(Bins, Me) ->
    Collector = spawn(
                  fun() ->
                          V = lists:foldr(fun(_, T) -> receive V -> T + V end end, 0, Bins),
                          Me ! V
                  end),
    [process_binary(Collector, B) || B <- Bins],
    Collector.

scan_finish

The scan_finish set of functions below handles the remainder binary, the last one after all file reading and binary splitting is done. It ensures that a collector is spawned to handle the remainder, if there is one. The first variant is called if the remainder is empty, the second one otherwise.

scan_finish(<<>>, _, Pids) -> Pids;
scan_finish(More, Me, Pids) -> [spawn_collector([More], Me) | Pids].

scan_file

The scan_file set of functions below reads chunks of the logfile via bfile:fread, breaks each chunk via break_chunk_on_newline, and spawns collectors to process them. It handles any remainder binaries by prepending them to the front of the next chunk, or when the file is completely read, by passing any remainders to scan_finish. Note that the first variant of scan_file does all the work; the second one just initializes the recursion. The return value of scan_file is the list of collector process IDs.

scan_file(F, N, Readsize, Me, Leftover, Pids) ->
    Rd = bfile:fread(F, Readsize),
    case Rd of
        {ok, Bin} ->
            {Bins, More} = break_chunk_on_newline(list_to_binary([Leftover, Bin]), N),
            scan_file(F, N, Readsize, Me, More, [spawn_collector(Bins, Me) | Pids]);
        eof -> scan_finish(Leftover, Me, Pids)
    end.
scan_file(F, N, Readsize) ->
    scan_file(F, N, Readsize, self(), <<>>, []).

start

The start functions below initializes bfile, calls scan_file, and then collects results from the collector processes. The second variant sets the number of bytes to read at a time from the logfile to a default of 512 1k blocks.

start(Num, File, Readsize) ->
    bfile:load_driver(),
    {ok, F} = bfile:fopen(File, "r"),
    Pids = scan_file(F, Num, Readsize),
    bfile:fclose(F),
    lists:foldr(fun(_, T) -> receive V -> T + V end end, 0, Pids).
start(Num, File) ->
    start(Num, File, 512*1024).

main

And finally, the main functions below handle invocations from the shell command line, as explained earlier. The second variant runs a list of values used for the number of processes and the number of bytes to read from the logfile at a time, and prints out a list consisting of each value and the number of seconds it took to execute for that value.

main([N, F]) ->
    io:format("~p matches found~n", [start(list_to_integer(N), F)]),
    halt();
main([F]) ->
    Sz = [16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384],
    Results = lists:map(
      fun(S) ->
              Start = now(),
              start(S, F, S*1024),
              {S, time_diff(Start, now())}
      end,
      Sz),
    io:format("~p~n", [Results]),
    halt().

The time_diff function (not shown but included in tbray5.erl), which I borrowed from somewhere a few months back, just helps calculate the execution times for the second variant of main shown above.

Responses

  1. Marc Girod says:

    September 29th, 2007 at 4:04 am (#)

    Hi Steve,
    How to implement this kind of concurrency in C++?
    One wants to move dynamic locking (threads) to compile time.
    This is thus a matter of static typing (creating some classes).
    These classes should map to the concurrent “processes” (process is a language primitive, explicit or implicit in Occam 2).
    This part (the process-like aspect: start, stop with return to the original context–a higher-level process) is generic, thus to be coded as template. These templates should be ‘typed’ by being instantiated with the ‘semantic’ classes mentioned above, which would in addition inherit from them (a pattern popularized by Jim Coplien years ago)…
    I leave the other details: friendship, virtual mixins, a replaceable scheduler.
    Do you remember my paper in Linz in 96? Surely not.
    Best Regards!
    Marc

  2. steve says:

    September 29th, 2007 at 3:12 pm (#)

    Hi Marc,

    Even if you worked really hard and wrote some nice classes to handle threading and inter-thread messaging, you’d still never get close to Erlang concurrency with C++, because

    1. You’d have to rely on pthreads or a similar threading system underneath, and such threads are far more heavyweight than Erlang’s processes. If you wrote your own user-space threading package, you’d be looking at a lot of work.
    2. With respect to avoiding shared state, the C++ language does nothing to prevent programmers from creating it and relying on it. Shared state is of course what makes developing multithreaded systems so difficult.

    You might want to read my September/October 2007 “Toward Integration” column from IEEE Internet Computing entitled Concurrency with Erlang for more details.

    Rather than trying to reinvent Erlang in C++, why not just use Erlang?

  3. thomas lackner says:

    September 30th, 2007 at 4:23 pm (#)

    Did you profile this? I wonder how much time is spent finding the line boundaries (break_chunk_on_newline); this seems to be the “ugly” part of the problem, to me; buffered I/O is secondary. Perhaps reading N large binary buffers, and splitting after converting into lists to find the last (or first) newline would be faster, since working with lists is theoretically more optimized than working binaries – at least until R12B.

    What a shame that we have to do all this work just to match Ruby’s speed on such a naive program.

  4. steve says:

    September 30th, 2007 at 11:16 pm (#)

    Hi Thomas, I haven’t yet done any serious profiling, but cprof shows that break_chunk_on_newline/3 is called 3.77 million times for the million-line dataset (which is composed of Tim’s 10000 line sample dataset duplicated 100 times).

    I’ve considered a few ways to avoid or improve the break_chunk_on_newline step, but I’ve come up empty so far. One issue with the approach you mention is that it requires more sophisticated handling of the incomplete line fragments that can appear at the end of a chunk. The easiest way to handle such fragments would be to check for a newline at the end of the binary, and if not found, perform the binary_to_list conversion, reverse the result, and spawn a process to check the tail for matches while treating the head as a fragment to attach to the front of the next block. Unfortunately, this easy approach brings the binary_to_list conversion, which we already know to be expensive, back into the main process, which slows everything down. If you instead keep that processing off in another process, you need a way to collect those fragments, reassemble them, match them, and report the results back to the main process, which seems even more complicated than what I already have. But I’ll think about it some more.

    And yes, in a way it is a shame that we have to do all this work to try to match Ruby, but then again, it’s also been kinda fun. I like Ruby, so I definitely don’t view this as a language war. Rather, I just remain very intrigued by Tim’s original goal of being able to make the best possible use of multicore systems to process large log files, and using Erlang to do it.

  5. Anders N says:

    October 1st, 2007 at 3:41 pm (#)

    Its too bad nobody seems to have bothered profiling this.
    Because then You would see that string:tokens takes most of the CPU time.

    In a comment on Tims blog, see
    http://www.tbray.org/ongoing/When/200x/2007/09/23/Erlang#c1190653717.907469
    I wrote
    “Somehow everybody has gotten very fixated on the file IO, instead of the string handling.

    The string:tokens function that is used in Your example is NOT VERY efficient. So I rewrote the parsing to not use any library functions.

    On my laptop with a file with 100K lines, Your published version takes ~7 s.

    My version takes ~1 s.

    I then added the parallel stuff by Steve Vinoski.

    Using 2 processes on my core 2 duo laptop, takes ~ 0.8s.

    Finally native compilation, two process version gives ~0.7s for 100K lines.”

  6. Asd says:

    October 2nd, 2007 at 4:13 am (#)

    “if I have to write hundreds of lines of Erlang without achieving a significant speedup, I might as well do it in hundreds of line of Java, C++, or C instead.”

    80 lines of Java with 2 threads will give you a 30% speedup. No need for FUD

  7. steve says:

    October 2nd, 2007 at 6:50 am (#)

    Hmm, I wonder why so many Java folks seem to be so touchy? Anyway, 2 threads doesn’t cut it — Tim’s original problem statement was how to best make use of all the cores in a multicore system. I wouldn’t be surprised if 80 lines of Java with only 1 thread gave me a 30% speedup; that’s not the point. I have an 8 core Linux box, how does 2 threads help me? How will 2 threads help Tim when he tries this stuff on his 64-core machine? This isn’t about language wars, it’s about multicore programming, so if you’re here to defend your favorite language, then sorry, you’re wasting your time.

  8. Asd says:

    October 2nd, 2007 at 11:28 am (#)

    Hmm, why do the Erlang folks like to talk about scalability but not performance? ;)

    More than two threads won’t help with this problem because there it is dominated by IO. A configurable number of worker threads would take about 20 lines more. You had an implicit criticism of Java and C++ when you talked about them taking hundreds of lines, which is clearly not true.

    And this is a language war. What Tim was looking for is which language would allow a simple solution that performs well on multiple cores. Just using lots of cores is no good if it is slower than the simple solution in Ruby. Of course he chose the wrong problem, which was unfortunate.

  9. steve says:

    October 2nd, 2007 at 12:43 pm (#)

    I/O bound? Umm, no, that’s definitely not the case. I can read the whole million line sample in Erlang in half a second. It takes 9.5 more seconds on 8 cores to process the data. On 2 cores, it takes 24-25 more seconds to process the data. Clearly both the number of threads and number of cores are important.

  10. Asd says:

    October 2nd, 2007 at 2:19 pm (#)

    9.5 seconds to process the data with 8 cores? Wow. In Java it is pretty much IO bound. It only takes a couple of seconds with 1 core to process the data. The IO could be faster though; the machine I’m testing on has crappy disks and I could use Java nio instead of doing things the easy way.

  11. steve says:

    October 2nd, 2007 at 2:31 pm (#)

    Right, everyone keeps assuming it’s an I/O bound problem, but that’s not true across the board (it started out that way for Tim because he was using a very slow Erlang function for reading lines, but we got past that pretty quickly). Given the T2 that Tim’s planning to run this stuff on, I think I/O speed will definitely not be a problem.

    Erlang’s string handling isn’t all that great, regexp is currently not so hot either, and even the Erlang binary type could be better (and will be in the next release of Erlang, I hear), so that’s why I’ve found it so interesting to work on this problem in Erlang. I could have easily written solutions in C++, Java, perl, python, ruby, or whatever, but I wouldn’t have learned anything at all by doing so.

    Take a gander at comment #5 above, where Anders N talks about how he found some dramatic performance improvements by avoiding certain Erlang library calls. I’ll be digging into that next.

  12. Pete Kirkham says:

    October 2nd, 2007 at 3:12 pm (#)

    What’s your disk set-up? The million line sample is 200,995,500 Bytes, and half a second means you’re getting 400MB/s, so I guess it’s not the MacBook Pro you’re talking about at that rate.

  13. steve says:

    October 2nd, 2007 at 5:49 pm (#)

    Hi Pete, yes, it’s my MacBook Pro:

    16> timer:tc(read, start, ["o1M.ap"]).
    read 200995500 bytes
    {562187,true}
    

    If you’re not familiar with the output shown, 562187 is the number of microseconds the operation took. I even saw one as low as 348596. The read.erl module I used is below:

    -module(read).
    -export([start/1, start/2]).
    -compile([native]).
    
    scan_file(F, Readsize, Total) ->
        Rd = bfile:fread(F, Readsize),
        case Rd of
            {ok, Bin} -> scan_file(F, Readsize, size(Bin)+Total);
            eof -> Total
        end.
    scan_file(F, Readsize) -> scan_file(F, Readsize, 0).
    
    start(File, Readsize) ->
        bfile:load_driver(),
        {ok, F} = bfile:fopen(File, "r"),
        T = scan_file(F, Readsize),
        io:format("read ~p bytes~n", [T]),
        bfile:fclose(F).
    start(File) ->
        start(File, 512*1024).
    

    I read in large chunks by default — 512*1024 — but I find that any chunk size of 1024 or better yields a 0.4-0.6 total time.

    I got similar numbers from my 8-core Dell box running Linux.

  14. Stephan Schmidt says:

    October 3rd, 2007 at 10:39 am (#)

    Steve: “….you’d still never get close to Erlang concurrency with C++, …”

    Steve: “This isn’t about language wars, it’s about multicore programming …”

    Funny.

    Well, if someone wants to parse a large line set on several cores or machines, one would probably solve the problem with map reduce or a fork/join model. Both solutions are language independent and can be solved with one thread or several ones.

    IO is best solved with memory mapped files. They are easy to work on and can be easily splitted for parallel processing.

    Peace
    -stephan


    Stephan Schmidt :: stephan@reposita.org
    Reposita Open Source – Monitor your software development
    http://www.reposita.org
    Blog at http://stephan.reposita.org – No signal. No noise.

  15. Jim says:

    October 3rd, 2007 at 10:44 am (#)

    No WAY are you getting that kind of disk I/O on a notebook. Must be cached in memory.

    Jim

  16. steve says:

    October 3rd, 2007 at 11:03 am (#)

    To me a “language war” is when people emotionally defend their favorite programming language in a pointless protracted argument. Usually, those involved have never actually used the other languages they’re arguing against.

    I use C++/C every day, Erlang every day, Python every day, Perl every day, even emacs-lisp almost every day, and I’ve certainly written my share of Java. If you read my latest IEEE IC column Concurrency with Erlang you’ll see that your quote of me saying “….you’d still never get close to Erlang concurrency with C++…” is firmly backed up with proof. I’ve used C++ since 1988, and I like it a lot and am not bashing it; I’m merely comparing it against another language so that I can keep learning when best to apply which tool. So all in all, that quote has nothing at all in common with what I define as a language war.

  17. Pete Kirkham says:

    October 3rd, 2007 at 2:59 pm (#)

    I do think on that particular measure you’re reading from cache rather than physical disk; on my laptop it takes 8.5s if you clear the cache, 0.4s if you don’t. Most laptop drives are around 30MBps; a bit more if 10,000 rpm. Flash drives are twice that, but excel at seek time.

    However, that sort of number may actually be representative of the target system – if you assume a transfer rate in the 2GBps ballpark (Sun’s Thumper gives 2GBps to memory), and the T2’s 1.4GHz core, that gives only 0.7 CPU cycles (per hardware thread) to process each byte of data, so even the C++ code (which takes around 3 cycles per byte on my laptop) would be CPU limited rather than IO limited if you don’t spread the load over the available hardware threads. There’s more estimates at the link I put as website.

    If it’s a language war then it’s between erlang and ruby; I’m trying to find out what the VM of either language should be doing to solve this problem, so am benchmarking to find where the costs are if you write close-to-the-metal code to solve it. I wouldn’t write a log file extraction script in bit-twiddly C++; I’d use Perl or XSLT. You really shouldn’t have to, but performing experiments to help think about where the bottlenecks may be is useful.

    Pete

  18. steve says:

    October 3rd, 2007 at 4:09 pm (#)

    Hi Pete, yes, MacBook Pro disk I/O throughput is best case about 45 MB/sec from some figures I’ve seen, which I think would put us in the 4-5 sec range for reading this data, best case. If I run the Ruby solution on the full dataset, the first time it takes 7.5-8 secs, but the second time it takes 2.2-2.5 secs, which I believe shows the caching effects. But if we’re getting cached data, then I think that reinforces my point about the Erlang code not being I/O bound.

    Your second paragraph above has some very good insights, and your final paragraph is right on the money, IMO. I’d be really interested in seeing your benchmark results (I assume they’ll be on your website?), and needless to say Tim’s T2 results should be quite interesting as well.

  19. Dilip says:

    October 4th, 2007 at 4:03 pm (#)

    FWIW Joe Cheng has a C# 3.0 version[1] of this problem that seems to perform better than Ruby (even in terms of code brevity!). Of course it still needs PLINQ to make use of multi core/CPU hardware.

    [1] http://jcheng.wordpress.com/2007/10/02/wide-finder-with-linq/#more-240

  20. Hynek (Pichi) Vychodil says:

    October 8th, 2007 at 1:38 am (#)

    Steve: why bfile should be faster than file?
    http://pichis-blog.blogspot.com/2007/10/is-bfile-faster-than-old-erlang-file.html

  21. Hypothetical Labs » Worse Is Better Scaling says:

    October 11th, 2007 at 9:54 am (#)

    […] It’s spawned a couple of interesting threads on erlang-questions and several insightful blog […]