Home > study > Case Study: Fast File IO in Erlang

Case Study: Fast File IO in Erlang

October 3rd, 2007 :: jackyz

俺喜欢从实例中学习——遵循:“问题-分析-方案-结论”的认知过程。这篇是个小小的尝试,如果效果还不错,后面再去整理几个。杀猫长文,代码满塞,慎入。:D

【问题】

9月23日,Erlang 的 maillist 中,有人发现了这么一篇文章《Erlang Blues

正如它的标题,这篇文章从批评的眼光,指出了许多作者本人并不喜欢 Erlang 的地方。文章本身的观点姑且不论,但确实是指出了作为一个“普通程序员”看待和评估 Erlang 时必然会留意到的问题。即,按照通常习惯写出来的文件 io 代码,性能很糟糕,比如,下面的代码:

  1. scan_line(eof, _, Count) -> Count;
  2. scan_line(_, File, Count) ->
  3.     scan_line(io:get_line(File, ''), File, Count + 1).

这是一个简单的统计文件行数的程序,在他的 MacBook 上处理一个 1,167,948 行的日志文件,竟然用了 34.165 秒,而相同功能的 ruby 程序只用了 3.036 秒。性能差距有十倍之多,不可谓不大。

这是为什么?莫非 Erlang 的文件 io 系统设计得如此糟糕?且看 Erlang 的社区如何反应此事?

【分析】

Thomas Lindgren (在 XML 处理上,对 Erlang 和 Ruby 有过 10s / xxm 的比对经验)指出:

io:get_line 是在字符串级别处理文件的函数,它的实现非常慢,而 file:read_file 将整个文件读为 binary 则要快得多。

Caoyuan (是的,就是这里的 dcaoyuan ,知名的 ErlyBird 的作者)指出:

这个问题其实并不是关于文件 io 的,测试表明,file:read_file 以 binary 方式读到内存非常快,重要的是在于这个过程需要遍历 binary 数据,遍历整个 200m 的 binary 数据是很耗费时间的一个任务。

他给出了如下的测试程序:

  1. -module(widefinder).
  2.  
  3. -export([test/1,
  4.          test1/1,
  5.          test2/1,
  6.          test3/1]).
  7.  
  8. test(FileName) ->
  9.     statistics(wall_clock),
  10.     {ok, IO} = file:open(FileName, read),
  11.     {Matched, Total} = scan_line(IO),
  12.     {_, Duration} = statistics(wall_clock),
  13.     io:format("Duration ~pms~n Matched:~B, Total:~B", [Duration,
  14. Matched, Total]).
  15.  
  16. scan_line(IO) -> scan_line("", IO, 0, -1).
  17. scan_line(eof, _, Matched, Total) -> {Matched, Total};
  18. scan_line(Line, IO, Matched, Total) ->
  19.     NewCount = Matched + process_match(Line),
  20.     scan_line(io:get_line(IO, ''), IO, NewCount, Total + 1).
  21.  
  22. process_match([]) -> 0;
  23. process_match("/ongoing/When/"++Rest) ->
  24.     case parse_until_space(Rest, false) of
  25. true  -> 0;
  26. false -> 1
  27.     end;
  28. process_match([_H|Rest]) -> process_match(Rest).
  29.  
  30. test1(FileName) ->
  31.     statistics(wall_clock),
  32.     {ok, Bin} = file:read_file(FileName),
  33.     {Matched, Total} = scan_line1(Bin),
  34.     {_, Duration} = statistics(wall_clock),
  35.     io:format("Duration ~pms~n Matched:~B, Total:~B", [Duration,
  36. Matched, Total]).
  37.  
  38. scan_line1(Bin) -> scan_line1(Bin, [], 0, 0).
  39. scan_line1(<<>>, _Line, Matched, Total) -> {Matched, Total};
  40. scan_line1(<<$\n, Rest/binary>>, Line, Matched, Total) ->
  41.     %Line1 = lists:reverse(Line),
  42.     scan_line1(Rest, [], Matched, Total + 1);
  43. scan_line1(<<C:1/binary, Rest/binary>>, Line, Matched, Total) ->
  44.     %NewCount = Matched + process_match(Line),
  45.     scan_line1(Rest, [C|Line], Matched, Total).
  46.  
  47. test2(FileName) ->
  48.     statistics(wall_clock),
  49.     {ok, Bin} = file:read_file(FileName),
  50.     Total = travel_bin(Bin),
  51.     {_, Duration} = statistics(wall_clock),
  52.     io:format("Duration ~pms~n Total:~B", [Duration, Total]).
  53.  
  54. travel_bin(Bin) -> travel_bin(Bin, 0).
  55. travel_bin(<<>>, ByteCount) -> ByteCount;
  56. travel_bin(<<_C:1/binary, Rest/binary>>, ByteCount) ->
  57.     travel_bin(Rest, ByteCount + 1).
  58.  
  59. test3(FileName) ->
  60.     statistics(wall_clock),
  61.     {ok, Bin} = file:read_file(FileName),
  62.     Total = travel_list(binary_to_list(Bin)),
  63.     {_, Duration} = statistics(wall_clock),
  64.     io:format("Duration ~pms~n Total:~B", [Duration, Total]).
  65.  
  66. travel_list(List) -> travel_list(List, 0).
  67. travel_list([], CharCount) -> CharCount;
  68. travel_list([_C|Rest], CharCount) ->
  69.     travel_list(Rest, CharCount + 1).
  70.  
  71. parse_until_space([$\040|_Rest], Bool) -> Bool;
  72. parse_until_space([$.|_Rest], _Bool) -> true;
  73. parse_until_space([_H|Rest], Bool) -> parse_until_space(Rest, Bool).

需要说明的是,这个测试程序很总要,它更清晰的“定位”了问题,file:read 并不慢,慢的是对它的处理。另外,后面还有关于这段代码的讨论。

Bob Ippolito (MochWeb库的作者)则指出:

做这件事最显而易见的方法是 spawn 一个 cat 的进程,让 Erlang 的 port 机制来完成这些事情。

他给出了一个例子。(需要 MochWeb_Util)

下载: o10k.erl
  1. -module(o10k).
  2. -export([test/0, results_for/1]).
  3.  
  4. -define(FOLDL_SPOOL_TIMEOUT, 10000).
  5.  
  6. foldl_spool(Fun, Acc0, Port) ->
  7.     receive
  8.         {Port, {data,{_eol_or_noeol, Line}}} ->
  9.             Acc1 = Fun(Line, Acc0),
  10.             foldl_spool(Fun, Acc1, Port);
  11.         {Port, eof} ->
  12.             port_close(Port),
  13.             Acc0
  14.     after ?FOLDL_SPOOL_TIMEOUT ->
  15.             port_close(Port),
  16.             {error, timeout}
  17.     end.
  18.  
  19. fold_line(Line, Acc) ->
  20.     Uri = lists:nth(7, string:tokens(Line, " ")),
  21.     case Uri of
  22.         "/ongoing/When/" ++ Trailer ->
  23.             case lists:member($., Trailer) of
  24.                 true ->
  25.                     Acc;
  26.                 false ->
  27.                     Acc + 1
  28.             end;
  29.         _ ->
  30.             Acc
  31.     end.
  32.  
  33. test() ->
  34.     results_for("o10k.ap").
  35.  
  36. results_for(File) ->
  37.     Port = mochiweb_util:cmd_port(["cat", File],
  38.                                   [eof, stream, {line, 4096}]),
  39.     foldl_spool(fun fold_line/2, 0, Port).

这个例子提供了一个很好的思路,当我们遇到类似问题的时候,如果暂时不知道怎么解决,我们可以用 port 的方式先绕过问题。不过,这仍然没有解决我们的疑惑——仅用 Erlang 本身,能否很好的处理这类问题?

Ulf Wiger(ErlHive的作者)给出了一个 Erlang 下快速的文件 io 处理方案:

下载: sumcol.erl
  1. %% The Computer Language Benchmark Games
  2. %% http://shootout.alioth.debian.org/
  3. %% Contributed by Ulf Wiger
  4.  
  5. -module(sumcol).
  6. -export([main/1]).
  7. -export([start/0]).
  8.  
  9. %% get the program argument, which is how many test iterations to run
  10. %% for this test, we ignore this parameter
  11. main(_Args) ->
  12.     user ! {self(), wait_ready},
  13.     receive
  14.     ready ->
  15.         halt()
  16.     end.
  17.  
  18. %% callback function for user-defined line-oriented io
  19. start() ->
  20.     spawn(fun() ->
  21.           process_flag(priority,high),
  22.           register(user, self()),
  23.           read_in(open_port({fd,0,1}, [{line,128},eof]), 0)
  24.       end).
  25.  
  26. read_in(Port, Sum) ->
  27.     receive
  28.     {Port, {data, {Eol, Str}}} when Eol==eol; Eol==noeol->
  29.         read_in(Port, Sum + list_to_integer(Str));
  30.     {Port, eof}  ->
  31.         ready(Port, Sum)
  32.     end.
  33.  
  34. ready(Port, Sum) ->
  35.     port_command(Port, [integer_to_list(Sum), "\n"]),
  36.     erlang:port_close(Port),
  37.     receive
  38.     {From, wait_ready} ->
  39.          From ! ready
  40.     end.

细看之下,这个例子也是用 port 的方式来完成文件 io 的,那么,纯 Erlang 方式下,这个问题是否有办法得到解决呢?

Thomas Lindgren 指出了 dacaoyuan 上面代码中尚有改进余地的地方:

首先,没有必要将 binary 转换成 list ,而是可以直接从 binary 获得一个字节。也就是:

  1. travel_bin(Bin) -> travel_bin(Bin, 0).
  2. travel_bin(<<>>, ByteCount) -> ByteCount;
  3. travel_bin(<<_C:1/binary, Rest/binary>>, ByteCount) ->
  4.     travel_bin(Rest, ByteCount + 1).

可以写成:

  1. travel_bin(Bin) -> travel_bin(Bin, 0).
  2. travel_bin(<<>>, ByteCount) -> ByteCount;
  3. travel_bin(<<C, Rest/binary>>, ByteCount) ->
  4.     travel_bin(Rest, ByteCount + 1).

这么做的好处是可以避免生成一个单字节的 binary ,每次操作都能在堆上省下几个字节。
其次,象这样来遍历一个 binary 通常不是最好的办法,因为 <> 这一句通常意味着要构造一个新的 binary (Rest,3byte的指针),这通常不如将其转换为 list (2byte的结构)再处理更高效。
在这个程序结构之上优化,可以一次处理多个字节,如:

  1. loop(<<C0,C1,C2,C3,C4,C5,C6,C7, Rest/binary>>) ->
  2.    ..., loop(Rest);
  3. loop(Small_bin) -> ...

Bjorn Gustavsson (OTP Team)指出:

Thomas Lindgren 的建议在 R11 中是正确的,但在即将到来的 R12 中,可能会导致性能反而下降。因为 R12 引入的新特性中有“Sub Binary”,大多数情况下,编译器会自动优化上述 <<_C:1/binary, Rest/binary>> 的语句,避免生成另一个 binary 。

R12还有什么新特性?令人期待啊。

Steve Vinoski 提出了一个 map reduce 的方案(代码见后面),获得了比较好的效果。

Klacke (即Claes Wikstorm)指出:

Spawn cat 方法的缺陷在于缺乏控制。即,这可能导致大量消息发往 port 的所有者,造成大量来不及处理的消息堆积在 message queue 中(也就是 Ulf Wiger 提到的 input too fast 问题)。在 port 方法中,可以用 {line, L} 模式的 port 来改善这一问题,但仍然无法彻底解决缺乏控制的问题。

他认为,当前最佳“按行处理文件”的办法是:

1. 打开文件 file:open(Filename, [read, raw])
2. 在循环中用 buffer 获取文件内容 {ok, Bin} = file:read(Fd, BufSize),
3. 使用 binary 的正则表达式匹配库来处理,比如这个

他提到目前 otp 自带的 regex 包,当前的版本还有性能问题。

Steve Vinoski 指出:

虽然用并发改进了速度,但 Erlang 的文件 io 缓慢仍然是无法回避的问题。他进一步深化了讨论,提出了 Erlang 是否有办法/计划改进文件 io 性能这样的本质问题。

Ulf Wiger 指出:

在 Erlang 最初的设计中,文件 io 的设计原则是要尽可能的减少侵入。比如,mnesia的日志写入等,都被设计为低优先级的后台进程,少有需要读取大文件的应用场景。随着 Erlang 向其他的应用领域发展,文件 io 的要求越来越高,此时文件 io 的问题就被越来越多的人诟病。这个问题需要设计者们来加以改进。

Per Gustafsson 给出了目前为止最快的纯 Erlang 方案(代码见后面)。

Klacke 贡献了早在 Bluetail 时期对这一问题的终极解决方案——更好的文件 io 模块 bfile (代码见后面),使用这一模块,可以获得与 C 相当的文件 io 性能,以及直接支持的分行处理

【方案】

Steve Vinoski 想到了利用 Erlang 强大的并发能力来加速这个过程。他的代码是这样的:

下载: tbray.erl
  1. -module(tbray).
  2. -export([start/2]).
  3.  
  4. find_match("/ongoing/When/" ++ Last) ->
  5.     case lists:member($., Last) of
  6.         false -> 1;
  7.         true -> 0
  8.     end;
  9. find_match(_) -> 0.
  10.  
  11. process_binary(Pid, Bin) ->
  12.     spawn(fun() ->
  13.         L = string:tokens(binary_to_list(Bin), "\n"),
  14.         V = lists:foldl(
  15.             fun(Line, Total) ->
  16.                 Total + find_match(lists:nth(7, string:tokens(Line, " "))) end,
  17.             0, L),
  18.         Pid ! V
  19.         end).
  20.  
  21. split_on_newline(Bin, N, All) when size(Bin) < N ->
  22.     All ++ [Bin];
  23. split_on_newline(Bin, N, All) ->
  24.     {_, <<C:8, _/binary>>} = split_binary(Bin, N),
  25.     case C of
  26.         $\n ->
  27.           {B21, B22} = split_binary(Bin, N+1),
  28.           split_on_newline(B22, N, All ++ [B21]);
  29.         _ -> split_on_newline(Bin, N+1, All)
  30.     end.
  31. split_on_newline(Bin, N) when N == size(Bin) -> [Bin];
  32. split_on_newline(Bin, N) -> split_on_newline(Bin, N, []).
  33.  
  34. start(Num, Input) ->
  35.     {ok, Data} = file:read_file(Input),
  36.     Bins = split_on_newline(Data, size(Data) div Num),
  37.     Me = self(),
  38.     Pids = [process_binary(Me, B) || B <- Bins],
  39.     lists:foldl(
  40.         fun(_, Total) -> receive X -> Total + X end end,
  41.         0, Pids).

他的主要思路是,读取文件,拆分成 Num 个部分进行“分行”处理,然后用“map reduce”来处理匹配。
运行时:

  1. $ erl -smp enable +P 60000
  2. 1> c(tbray).
  3. {ok,tbray}
  4. 2> timer:tc(tbray, start, [1, "o10k.ap"]).
  5. {661587,1101}

当他把 process 数量调到 4000 左右的时候,获得了比较稳定的速度。

Caoyuan 的另一个方案是这样的:

下载: tbray2.erl
  1. -module(tbray).
  2.  
  3. -compile([native]).
  4.  
  5. -export([start/2,
  6.          collect_loop/2,
  7.          buffered_read/3]).
  8.  
  9. -include_lib("kernel/include/file.hrl").
  10.  
  11. %% The best BUFFER_SIZE is 4096
  12. -define(BUFFER_SIZE, 4096).
  13.  
  14. -record(context, {lineBuf = [],
  15.                   matched = 0,
  16.                   total = 0,
  17.                   lastProcessedSeq = 0,
  18.                   dataBuf = [],
  19.                   processNum}).
  20.  
  21. %% erl -smp enable +P 60000
  22. %% timer:tc(wide, start, [1000, "o1000k.ap"]).  
  23. start(ProcessNum, FileName) ->
  24.     statistics(wall_clock)
  25.     {ok, FileInfo} = file:read_file_info(FileName),
  26.     Size = FileInfo#file_info.size,
  27.     {ok, File} = file:open(FileName, [read, binary]),
  28.  
  29.     Collect = spawn(?MODULE, collect_loop, [self(), #context{processNum = ProcessNum}]),
  30.  
  31.     psplit_read_file(Collect, File, Size div ProcessNum, ProcessNum, 1),
  32.    
  33.     {Matched, Total} =
  34.       receive
  35.           #context{matched=MatchedX, total=TotalX} -> {MatchedX, TotalX}
  36.       end,
  37.  
  38.     {_, Duration2} = statistics(wall_clock),
  39.     io:format("scan lines:\t ~pms~nMatched: ~B, Total: ~B~n", [Duration2, Matched, Total]).
  40.  
  41. psplit_read_file(_Collector, _File, _ChunkSize, ProcessNum, I) when I > ProcessNum -> done;
  42. psplit_read_file(Collector, File, ChunkSize, ProcessNum, I) ->
  43.     spawn(
  44.       fun () ->
  45.               Offset = ChunkSize * (I - 1),
  46.               %% if it's last chuck, read all bytes left, which will not exceed ChunkSize * 2
  47.               Length = if  I == ProcessNum -> ChunkSize * 2;
  48.                            true -> ChunkSize
  49.                        end,
  50.               {ok, Data} = file:pread(File, Offset, Length),
  51.               Collector ! {I, Data}
  52.       end),
  53.     psplit_read_file(Collector, File, ChunkSize, ProcessNum, I + 1).
  54.  
  55.  
  56. collect_loop(Pid, #context{lastProcessedSeq= ProcessNum,
  57.                            processNum=ProcessNum}=Context) -> Pid ! Context;
  58. collect_loop(Pid, #context{dataBuf=DataBuf}=Context) ->
  59.     receive
  60.         {Seq, Data} ->
  61.             SortedDatas = lists:keysort(1, [{Seq, Data} | DataBuf]),
  62.             Context1 = process_arrived_datas(SortedDatas, Context#context{dataBuf = []}),
  63.             %io:format("Last processed Seq: ~B~n", [Context1#context.lastProcessedSeq]),
  64.             collect_loop(Pid, Context1)
  65.     end.
  66.  
  67. process_arrived_datas([], Context) -> Context;
  68. process_arrived_datas([{Seq, Data}|T], #context{lineBuf=LineBuf,
  69.                                                 matched=Matched,
  70.                                                 total=Total,
  71.                                                 lastProcessedSeq=LastProcessedSeq,
  72.                                                 dataBuf=DataBuf}=Context) ->
  73.     if  Seq == LastProcessedSeq + 1 ->
  74.             {LineBuf1, Matched1, Total1} = buffered_read(
  75.               fun (Buffer, {LineBufX, MatchedX, TotalX}) ->
  76.                       scan_line(binary_to_list(Buffer), LineBufX, MatchedX, TotalX)
  77.               end, {LineBuf, Matched, Total}, Data),
  78.             process_arrived_datas(T, Context#context{lineBuf = LineBuf1,
  79.                                                      matched = Matched1,
  80.                                                      total = Total1,
  81.                                                      lastProcessedSeq = Seq});
  82.         true ->
  83.             process_arrived_datas(T, Context#context{dataBuf = [{Seq, Data} | DataBuf]})
  84.     end.
  85.  
  86. buffered_read(Fun, Acc, Bin) ->
  87.     case Bin of
  88.         <<Buf:?BUFFER_SIZE/binary, Rest/binary>> ->
  89.             Acc1 = Fun(Buf, Acc),
  90.             buffered_read(Fun, Acc1, Rest);
  91.         _ ->
  92.             Fun(Bin, Acc)
  93.     end.
  94.    
  95. scan_line([], LineBuf, Matched, Total) -> {LineBuf, Matched, Total};
  96. scan_line([$\n|Rest], LineBuf, Matched, Total) ->
  97.     Line1 = lists:reverse(LineBuf),
  98.     %io:format("~n~s~n", [Line1]),
  99.     Matched1 = Matched + process_match(Line1),
  100.     scan_line(Rest, [], Matched1, Total + 1);
  101. scan_line([C|Rest], LineBuf, Matched, Total) ->
  102.     scan_line(Rest, [C | LineBuf], Matched, Total).
  103.  
  104. process_match([]) -> 0;
  105. process_match("/ongoing/When/"++Rest) ->
  106.     case match_until_space(Rest, false) of
  107.     true  -> 0;
  108.     false -> 1
  109.     end;
  110. process_match([_H|Rest]) ->
  111.     process_match(Rest).
  112.    
  113. match_until_space([$\040|_Rest], Bool) -> Bool;
  114. match_until_space([$.|_Rest], _Bool) -> true;
  115. match_until_space([_H|Rest], Bool) ->
  116.     match_until_space(Rest, Bool).

这个方案起了多个进程来并发读取文件,然后用 collect_loop 将各个片段转换为小的 lists ,scan_line 会进行分行处理,然后用 process_match 来匹配。这里的要点在于,lists的大小为 4096 比较合适。

Per Gustafsson 的方案是这样的:

  1. %%%-------------------------------------------------------------------
  2. %%% File    : line_server.erl
  3. %%% Author  : Per Gustafsson <pergu@it.uu.se>
  4. %%% Description :
  5. %%%
  6. %%% Created : 25 Sep 2007 by Per Gustafsson <pergu@it.uu.se>
  7. %%%-------------------------------------------------------------------
  8. -module(line_server).
  9.  
  10. -export([get_handle/1,get_lines/1,close/1]).
  11.  
  12. -compile([native]).
  13.  
  14. -define(BUFFER_SIZE, 70000).
  15.  
  16. get_handle(FileName) ->
  17.   erlang:spawn_monitor(fun() -> start(FileName) end).
  18.  
  19. get_lines({Pid,MRef}) ->
  20.   Pid ! {get_lines, MRef, self()},
  21.   receive
  22.     {{Data,Prep}, MRef} ->
  23.       case divide_lines(Data) of
  24.     [H|T] ->
  25.       [<<Prep/binary,H/binary>>|T];
  26.     [] ->
  27.       [Prep]
  28.       end;
  29.     {eof,MRef} ->
  30.       eof;
  31.     {'DOWN', MRef, _Type, Pid, _Info} ->
  32.       erlang:fault(file_closed)
  33.   end.
  34.  
  35. close({Pid,_}) ->
  36.   Pid ! stop.
  37.  
  38. start(FileName) ->
  39.     {ok,F} = file:open(FileName,[raw,binary]),
  40.     serve_lines(F,<<>>).
  41.  
  42. serve_lines(File,Rest) ->
  43.   {Data, NewRest} =
  44.     get_more_data(File,Rest),
  45.   receive 
  46.     {get_lines, Ref, Pid} ->
  47.       Pid ! {Data, Ref},
  48.       serve_lines(File,NewRest);
  49.     stop ->
  50.       file:close(File)
  51.   end.
  52.  
  53. get_more_data(File,Rest) ->
  54.   case file:read(File, ?BUFFER_SIZE) of
  55.     eof when size(Rest) =:= 0 ->
  56.       {eof,<<>>};
  57.     eof ->
  58.       {{<<>>,Rest},<<>>};
  59.     {ok,Bin} ->
  60.       case get_last_line(Bin) of
  61.     {Lines,NR} ->
  62.       {{Lines,Rest},NR};
  63.     NR ->
  64.       get_more_data(File,<<Rest/binary,NR/binary>>)
  65.       end
  66.   end.
  67.  
  68. get_last_line(Bin) ->
  69.   get_last_newline(Bin,size(Bin)-1).
  70.  
  71. get_last_newline(Bin,S) ->
  72.   case Bin of
  73.     <<Lines:S/binary,10,Rest/binary>> ->
  74.       {Lines,Rest};
  75.     _ ->
  76.       if S =< 0 -> Bin;
  77.      true -> get_last_newline(Bin,S-1)
  78.       end
  79.   end.
  80.  
  81. divide_lines(Bin) ->
  82.     divide_lines(Bin, 0, []).
  83.  
  84. divide_lines(Bin, S, Acc) ->
  85.   case Bin of
  86.     <<_:S/binary,10,_/binary>> ->
  87.       L = S,
  88.       <<New:L/binary,_,Rest/binary>> = Bin,
  89.       divide_lines(Rest,0,[New|Acc]);
  90.     <<_:S/binary,_,10,_/binary>> ->
  91.       L = S+1,
  92.       <<New:L/binary,_,Rest/binary>> = Bin,
  93.       divide_lines(Rest,0,[New|Acc]);
  94.     <<_:S/binary,_,_,10,_/binary>> ->
  95.       L = S+2,
  96.       <<New:L/binary,_,Rest/binary>> = Bin,
  97.       divide_lines(Rest,0,[New|Acc]);
  98.     <<_:S/binary,_,_,_,10,_/binary>> ->
  99.       L = S+3,
  100.       <<New:L/binary,_,Rest/binary>> = Bin,
  101.       divide_lines(Rest,0,[New|Acc]);
  102.     <<_:S/binary,_,_,_,_,10,_/binary>> ->
  103.       L = S+4,
  104.       <<New:L/binary,_,Rest/binary>> = Bin,
  105.       divide_lines(Rest,0,[New|Acc]);
  106.     <<_:S/binary,_,_,_,_,_,10,_/binary>> ->
  107.       L = S+5,
  108.       <<New:L/binary,_,Rest/binary>> = Bin,
  109.       divide_lines(Rest,0,[New|Acc]);
  110.     <<_:S/binary,_,_,_,_,_,_,10,_/binary>> ->
  111.       L = S+6,
  112.       <<New:L/binary,_,Rest/binary>> = Bin,
  113.       divide_lines(Rest,0,[New|Acc]);
  114.     <<_:S/binary,_,_,_,_,_,_,_,10,_/binary>> ->
  115.       L = S+7,
  116.       <<New:L/binary,_,Rest/binary>> = Bin,
  117.       divide_lines(Rest,0,[New|Acc]);
  118.     <<_:S/binary,_,_,_,_,_,_,_,_,10,_/binary>> ->
  119.       L = S+8,
  120.       <<New:L/binary,_,Rest/binary>> = Bin,
  121.       divide_lines(Rest,0,[New|Acc]);
  122.     <<_:S/binary,_,_,_,_,_,_,_,_,_,10,_/binary>> ->
  123.       L = S+9,
  124.       <<New:L/binary,_,Rest/binary>> = Bin,
  125.       divide_lines(Rest,0,[New|Acc]);
  126.     <<_:S/binary,_:80,_/binary>> ->
  127.       divide_lines(Bin,S+10,Acc);
  128.     <<_:S/binary,_,_/binary>> ->
  129.       divide_lines(Bin,S+1,Acc);
  130.     _ ->
  131.       lists:reverse([Bin|Acc])
  132.   end.

以及

下载: pcount.erl
  1. %%%-------------------------------------------------------------------
  2. %%% File    : pcount.erl
  3. %%% Author  : Per Gustafsson <pergu@it.uu.se>
  4. %%% Description : Counts get requests in Tim Bray's logfiles
  5. %%%
  6. %%% Created : 23 Sep 2007 by Per Gustafsson <pergu@c83-253-58-61.bredband.comhem.se>
  7. %%%-------------------------------------------------------------------
  8. -module(pcount).
  9.  
  10. -export([run/1,main/2]).
  11.  
  12. -compile([native]).
  13.  
  14. %%-type(handle()::any()).
  15. %%-type(dict()::any()).
  16.  
  17. %%-spec(run/1::([string()]) -> unit()).
  18.  
  19. %% run should be evoked from the commandline with:
  20. %% erl -smp -noshell -run pcount run filename n
  21. %%
  22. %% filename is the name of the log file and n is the number
  23. %% of processes to use
  24.  
  25. run([F]) ->
  26.   run([F,"4"]);
  27. run([FileName,NString]) ->
  28.   N = list_to_integer(NString),
  29.   main(FileName,N)
  30.   halt().
  31.  
  32. %%-spec(main/2::([string(),integer()]) -> ok).
  33.  
  34. %% main can be used from the shell to do the same thing as
  35. %% run
  36.  
  37. main(FileName,N) ->    
  38.   H = line_server:get_handle(FileName),
  39.   Dict = merge_dicts(pmap(fun (F) ->
  40.                   loop(F,dict:new())
  41.               end, lists:duplicate(N,H))),
  42.   line_server:close(H),
  43.   List = dict:to_list(Dict),
  44.   SortedList = lists:reverse(lists:keysort(2,List)),
  45.   lists:foreach(fun pp/1, SortedList).
  46.  
  47. %%-spec(merge_dicts/1::([dict()]) -> dict()).
  48.  
  49. merge_dicts([D1,D2|Rest]) ->
  50.     merge_dicts([dict:merge(fun(_,X,Y) -> X+Y end,D1,D2)|Rest]);
  51. merge_dicts([D]) -> D.
  52.  
  53. %%-spec(loop/2::(handle(),dict()) -> dict()).
  54.                 
  55. loop(Handle,Dict) ->
  56.     case line_server:get_lines(Handle) of
  57.     eof -> Dict;
  58.     Lines ->
  59.         loop(Handle,handle_lines(Lines,Dict))
  60.     end.
  61.  
  62. %%-spec(pp/1::({binary(),integer()}) -> ok).
  63.  
  64. pp({Name,Gets}) ->
  65.   io:format("~p get requests for ~s~n",[Gets,binary_to_list(Name)]).   
  66.  
  67. %%-spec(handle_lines/2::([binary()], dict()) -> dict()).
  68.  
  69. handle_lines([Line|Rest],Dict) ->
  70.   handle_lines(Rest,match_and_update_counter(Line,Dict));
  71. handle_lines([],Dict) -> Dict.
  72.  
  73. %%-spec(match_and_update_counter/2::(binary(), dict()) -> dict()).
  74.  
  75. match_and_update_counter(Line, Dict) ->
  76.     Word = get_nth_word(7,Line,$ ),
  77.     case is_proper_get(Word) of
  78.     true ->
  79.         dict:update_counter(Word,1,Dict);
  80.     false ->
  81.         Dict
  82.     end.
  83.  
  84. %%-spec(is_proper_get::(binary()) -> bool()).
  85.  
  86. is_proper_get(<<"/ongoing/When/",Rest/binary>>) ->
  87.     not(member(Rest, $.));
  88. is_proper_get(_) ->
  89.     false.
  90.    
  91. %%-spec(member/2::(binary(),byte()) -> bool()).
  92.  
  93. member(Bin, X) -> 
  94.     case Bin of
  95.     <<X,_/binary>> -> true;
  96.     <<_,Rest/binary>> -> member(Rest, X);
  97.     <<>> -> false
  98.     end.
  99.  
  100. %%-spec(get_nth_word/3::(integer(), binary(), byte()) -> binary()).
  101.  
  102. get_nth_word(N,Bin,Val) ->
  103.     get_word(0, skip_to_nth_word(N,Bin,Val),Val).
  104.  
  105. %%-spec(skip_to_nth_word/3::(integer(), binary(), byte()) -> binary()).
  106.  
  107. skip_to_nth_word(1,Bin,_Val) ->
  108.   Bin;
  109. skip_to_nth_word(N,Bin,Val) ->
  110.   case Bin of
  111.     <<Val,Rest/binary>> -> skip_to_nth_word(N-1,Rest,Val);
  112.     <<_,Rest/binary>> -> skip_to_nth_word(N,Rest,Val)
  113.   end.
  114.  
  115. %%-spec(get_word/3::(integer(), binary(), byte()) -> binary()).
  116.  
  117. get_word(S,Bin,Val) ->
  118.     case Bin of
  119.     <<_:S/binary,Val,_/binary>> ->
  120.         <<Word:S/binary,_/binary>> = Bin,
  121.         Word;
  122.     _ ->
  123.         get_word(S+1,Bin,Val)
  124.     end.
  125.  
  126. %% pmap implementation taken from luke gorrie's blog
  127. %% http://lukego.livejournal.com/6753.html
  128.  
  129. %%-spec(pmap/2::(fun((X) -> Y),[X]) -> [Y]).
  130.  
  131. pmap(F,List) ->
  132.   [wait_result(Worker) || Worker <- [spawn_worker(self(),F,E) || E <- List]].
  133.  
  134. %%-spec(spawn_worker/3::(pid(),fun((X) -> any()),X) -> {pid(),ref()}).
  135.  
  136. spawn_worker(Parent, F, E) ->
  137.   erlang:spawn_monitor(fun() -> Parent ! {self(), F(E)} end).
  138.  
  139. %%-spec(wait_result/1::({pid(),ref()}) -> any()).
  140.  
  141. wait_result({Pid,Ref}) ->
  142.   receive
  143.     {'DOWN', Ref, _, _, normal} -> receive {Pid,Result} -> Result end;
  144.     {'DOWN', Ref, _, _, Reason} -> exit(Reason)
  145.   end.

他的方法综合使用了上面提到的几种优化方法,用 buffer 读文件,每次迭代处理多个字节,并发处理耗时的 binary 转换。因而,获得了现有纯 Erlang 方法中最佳的性能。

Klacke 的 bfile 模块提供了和其他语言一样丰富的分行文件 io 特性。这个包是以 c – Erlang driver 的代码的方式提供的,可以从这里下载到这个包

用起来象这样:

  1. 2> bfile:load_driver().
  2. ok
  3. 4> {ok, Fd} = bfile:fopen("Makefile", "r").
  4. {ok,{bfile,#Port<0.98>}}
  5. 5> bfile:fgets(Fd).
  6. {line,<<10>>}
  7. 6> bfile:fgets(Fd).
  8. {line,<<10>>}
  9. 7> bfile:fgets(Fd).
  10. {line,<<97,108,108,58,32,10>>}
  11. 14> bfile:fread(Fd, 10000).
  12. {ok,<<10,10,105,110,115,116,97,108,108,58,32,97,108,108,10,9,40,99,100,32,99,95,115,114,99,59,32,...>>}
  13. 15> bfile:fread(Fd, 10000).
  14. eof

【结论】
0. Erlang 目前的文件 io 模块确实糟糕,这个 io:read_line 用来处理 console 输入就好了,不要用来做大文件的读取。
1. 可以用 file:read 以二进制的方式来读取文件,但它不提供分行,所以,你最好自己来写个 buffer 读取。
2. 把一个 200M 的东西放到内存中,再搞 binary_to_list 的转换,这不是一个好主意。
3. 有了 list 就好办了,怎么弄都快了。
4. 上面的步骤中,如果发现某个步骤很慢,那么可以考虑用 map reduce 来加速。
5. 不要死脑筋,该用 port 就用 port (虽说有控制不力的问题,但可以用分行来缓解),该写 driver 就写 driver (毕竟 Erlang 是开源的,没什么妨碍你去扩展它),别费半天的功夫去和一个“设计缺陷”较劲。

[BLAH]
PS1. Bluetail 私藏的好东西可真多啊,Erlang 真是没人疼的好孩子。
PS2. Erlang 的社区是个好社区,不回避问题,有合作精神,乐于贡献。
PS3. 现在搞程序的都用 Mac Power Book ?好像没有 Mac Book 都不好意思和人打招呼。汗自己一个。

study

  1. pi1ot
    October 3rd, 2007 at 14:09 | #1

    以前看到过说google拿map-reduce搞log分析,所以看开头猜想应该就是这条路子,但是对erlang语法的不熟悉严重影响了我读这篇文章时的阅读体验…programming erlang的翻译进度到多少了?

  2. October 4th, 2007 at 14:31 | #2

    erlang的io确实有点问题,我几天前为了要用erlang实现一个实例也在头疼这个问题。这让我有点怀念我在winxgui界面库中实现的Archive类。:) 不过我想这是实现缺陷。
    另外我有些困惑的是,erlang似乎不太适合做文本分析,我把数据读入(binary),有些担心binary_to_list浪费我的时间。但直接操作binary似乎也是一个馊主意。

  3. October 4th, 2007 at 14:46 | #3

    再仔细看了Caoyuan的程序,看起来直接操作binary是可行的(我原先有点担心binary的split是一个费时的行为,另外有点不太熟悉binary的语法)。

  4. pi1ot
    October 4th, 2007 at 14:46 | #4

    erlang适合做把文本分析的任务分解到几十个节点上然后再合并结果的工作,每个节点一个c port。

  5. dcaoyuan
    October 5th, 2007 at 21:37 | #5

    刚刚重写了代码:http://blogtrader.net/page/dcaoyuan?entry=tim_bray_s_erlang_exercise1
    新版本用了Per的部分代码,差不多是个完整本了,而且,比较地象一个真正的并行处理。在2-core的机器上可以有差不多177%的性能提升。
    Erlang的binary match看来目前不适合做太细节的binary处理,还是转换成一小段一小段的list好些。

  6. pi1ot
    October 7th, 2007 at 08:23 | #6

    说到log分析,有什么适于处理大文本的语言么,主要是性能方面。

  7. dcaoyuan
    October 7th, 2007 at 12:14 | #7

    perl

  8. jackyz
    October 8th, 2007 at 08:46 | #8

    实际工作中的log分析还是用perl/awk这些传统工具比较靠谱。

    erlang的文件io,bluetail的bfile方案是很不错的选择,如果能尽快纳入OTP就太好了。

  9. pi1ot
    October 8th, 2007 at 09:45 | #9

    只是提供一套高效的regex就可以算是高效处理大文本吗?

  10. mryufeng
    October 11th, 2007 at 12:58 | #10

    个人感觉IO不会慢 而是处理数据的时候变形太多 导致效率不理想。这种应用还是让erlang作master的工作 cport用来作worker最简单。不过这篇讨论把erlang 能想出来提高效率的招都摆到桌面上来了 对于应用的开发有很大的参考意义。

  11. Arbow
    November 2nd, 2007 at 23:42 | #11

    事情现在有了新的进展了,jackyz让俺写个类似的后续报道,因为一直没有跟进,写不出来,只能提提之后的情况,还是蛮精彩,不容错过。

    在文中,继使用bfile模块和并行算法来提高效率后,大伙们仍然在继续研究。dcaoyuan在 http://blogtrader.org/page/dcaoyuan/entry/learning_coding_binary_was_tim 里面公布了最新的进展,这个纯Erlang的方案能够在3秒内完成计算:

    $ erlc -smp tbray5.erl
    $ time erl +h 8192 -smp -noshell -run tbray5 start o1000k.ap -s erlang halt

    real 0m2.972s
    user 0m9.685s
    sys 0m0.748s

    诀窍是如下几个要点:
    I. Don’t split a binary unless the split binaries are what you exactly want
    II. Calculate the final offsets first, then split it when you’ve got the exactly offsets
    III. Use “+h Size” option or [{min_heap_size, Size}] with spawn_opt
    IV. Others
    * Don’t forget to compile to native by adding “-compile([native]).” in your code.
    * Maybe “+A Size” to set the number of threads in async thread pool also helps a bit when do IO.

    代码可以在 http://blogtrader.net/page/dcaoyuan/resource/code/tbray5.erl 下载。

    过了几天,http://www.erlang.org/pipermail/erlang-questions/2007-October/030273.html 中,pichi同学上传了他的更快更强的方案,含 chunk_reader.erl,nlt_reader.erl,file_map_reduce.erl和wf_pichi3.erl四个文件,代码量达到500+之巨。

    紧接着(http://www.erlang.org/pipermail/erlang-questions/2007-October/030278.html),
    Anders改进了自己的代码(wfinder1_1 + wfbm4_ets1_1),在 8-core 2.33GHz Intel Xeon Linux,2G RAM的机器上,取得了
    real 0m0.567s
    user 0m2.249s
    sys 0m0.956s
    的高性能。

    10.30,http://www.tbray.org/ongoing/When/200x/2007/10/30/WF-Results 公布了结果,第一版代码中相对Ruby代码极差的性能,现在性能幅度已经提升得相当高了:

    Name Language Elapsed User System LoC
    wfinder1_1 Erlang 6.46 34.07 8.02 287
    wf_pichi3 Erlang 8.28 51.98 9.38 545
    tbray5 Erlang 20.74 3:51.33 8.00 76
    wf_p Ruby 50.16 37.58 12.50 39

    11.1 dcaoyuan在 http://blogtrader.org/page/dcaoyuan/entry/learning_coding_binary_was_tim 更新补充了 Boyer-Moore搜索算法,性能可以提高100%。

  12. jackyz
    November 5th, 2007 at 08:13 | #12

    @Arbow well done.

  1. No trackbacks yet.