Erlang-China

erlang 中文社区

Case Study: Fast File IO in Erlang


俺喜欢从实例中学习——遵循:“问题-分析-方案-结论”的认知过程。这篇是个小小的尝试,如果效果还不错,后面再去整理几个。杀猫长文,代码满塞,慎入。:D

【问题】

9月23日,Erlang 的 maillist 中,有人发现了这么一篇文章《Erlang Blues

正如它的标题,这篇文章从批评的眼光,指出了许多作者本人并不喜欢 Erlang 的地方。文章本身的观点姑且不论,但确实是指出了作为一个“普通程序员”看待和评估 Erlang 时必然会留意到的问题。即,按照通常习惯写出来的文件 io 代码,性能很糟糕,比如,下面的代码:

  1. scan_line(eof, _, Count) -> Count;
  2. scan_line(_, File, Count) ->
  3.     scan_line(io:get_line(File, ''), File, Count + 1).

这是一个简单的统计文件行数的程序,在他的 MacBook 上处理一个 1,167,948 行的日志文件,竟然用了 34.165 秒,而相同功能的 ruby 程序只用了 3.036 秒。性能差距有十倍之多,不可谓不大。

这是为什么?莫非 Erlang 的文件 io 系统设计得如此糟糕?且看 Erlang 的社区如何反应此事?

【分析】

Thomas Lindgren (在 XML 处理上,对 Erlang 和 Ruby 有过 10s / xxm 的比对经验)指出:

io:get_line 是在字符串级别处理文件的函数,它的实现非常慢,而 file:read_file 将整个文件读为 binary 则要快得多。

Caoyuan (是的,就是这里的 dcaoyuan ,知名的 ErlyBird 的作者)指出:

这个问题其实并不是关于文件 io 的,测试表明,file:read_file 以 binary 方式读到内存非常快,重要的是在于这个过程需要遍历 binary 数据,遍历整个 200m 的 binary 数据是很耗费时间的一个任务。

他给出了如下的测试程序:

  1. -module(widefinder).
  2.  
  3. -export([test/1,
  4.          test1/1,
  5.          test2/1,
  6.          test3/1]).
  7.  
  8. test(FileName) ->
  9.     statistics(wall_clock),
  10.     {ok, IO} = file:open(FileName, read),
  11.     {Matched, Total} = scan_line(IO),
  12.     {_, Duration} = statistics(wall_clock),
  13.     io:format("Duration ~pms~n Matched:~B, Total:~B", [Duration,
  14. Matched, Total]).
  15.  
  16. scan_line(IO) -> scan_line("", IO, 0, -1).
  17. scan_line(eof, _, Matched, Total) -> {Matched, Total};
  18. scan_line(Line, IO, Matched, Total) ->
  19.     NewCount = Matched + process_match(Line),
  20.     scan_line(io:get_line(IO, ''), IO, NewCount, Total + 1).
  21.  
  22. process_match([]) -> 0;
  23. process_match("/ongoing/When/"++Rest) ->
  24.     case parse_until_space(Rest, false) of
  25. true  -> 0;
  26. false -> 1
  27.     end;
  28. process_match([_H|Rest]) -> process_match(Rest).
  29.  
  30. test1(FileName) ->
  31.     statistics(wall_clock),
  32.     {ok, Bin} = file:read_file(FileName),
  33.     {Matched, Total} = scan_line1(Bin),
  34.     {_, Duration} = statistics(wall_clock),
  35.     io:format("Duration ~pms~n Matched:~B, Total:~B", [Duration,
  36. Matched, Total]).
  37.  
  38. scan_line1(Bin) -> scan_line1(Bin, [], 0, 0).
  39. scan_line1(<<>>, _Line, Matched, Total) -> {Matched, Total};
  40. scan_line1(<<$\n, Rest/binary>>, Line, Matched, Total) ->
  41.     %Line1 = lists:reverse(Line),
  42.     scan_line1(Rest, [], Matched, Total + 1);
  43. scan_line1(<<C:1/binary, Rest/binary>>, Line, Matched, Total) ->
  44.     %NewCount = Matched + process_match(Line),
  45.     scan_line1(Rest, [C|Line], Matched, Total).
  46.  
  47. test2(FileName) ->
  48.     statistics(wall_clock),
  49.     {ok, Bin} = file:read_file(FileName),
  50.     Total = travel_bin(Bin),
  51.     {_, Duration} = statistics(wall_clock),
  52.     io:format("Duration ~pms~n Total:~B", [Duration, Total]).
  53.  
  54. travel_bin(Bin) -> travel_bin(Bin, 0).
  55. travel_bin(<<>>, ByteCount) -> ByteCount;
  56. travel_bin(<<_C:1/binary, Rest/binary>>, ByteCount) ->
  57.     travel_bin(Rest, ByteCount + 1).
  58.  
  59. test3(FileName) ->
  60.     statistics(wall_clock),
  61.     {ok, Bin} = file:read_file(FileName),
  62.     Total = travel_list(binary_to_list(Bin)),
  63.     {_, Duration} = statistics(wall_clock),
  64.     io:format("Duration ~pms~n Total:~B", [Duration, Total]).
  65.  
  66. travel_list(List) -> travel_list(List, 0).
  67. travel_list([], CharCount) -> CharCount;
  68. travel_list([_C|Rest], CharCount) ->
  69.     travel_list(Rest, CharCount + 1).
  70.  
  71. parse_until_space([$\040|_Rest], Bool) -> Bool;
  72. parse_until_space([$.|_Rest], _Bool) -> true;
  73. parse_until_space([_H|Rest], Bool) -> parse_until_space(Rest, Bool).

需要说明的是,这个测试程序很总要,它更清晰的“定位”了问题,file:read 并不慢,慢的是对它的处理。另外,后面还有关于这段代码的讨论。

Bob Ippolito (MochWeb库的作者)则指出:

做这件事最显而易见的方法是 spawn 一个 cat 的进程,让 Erlang 的 port 机制来完成这些事情。

他给出了一个例子。(需要 MochWeb_Util)

下载: o10k.erl
  1. -module(o10k).
  2. -export([test/0, results_for/1]).
  3.  
  4. -define(FOLDL_SPOOL_TIMEOUT, 10000).
  5.  
  6. foldl_spool(Fun, Acc0, Port) ->
  7.     receive
  8.         {Port, {data,{_eol_or_noeol, Line}}} ->
  9.             Acc1 = Fun(Line, Acc0),
  10.             foldl_spool(Fun, Acc1, Port);
  11.         {Port, eof} ->
  12.             port_close(Port),
  13.             Acc0
  14.     after ?FOLDL_SPOOL_TIMEOUT ->
  15.             port_close(Port),
  16.             {error, timeout}
  17.     end.
  18.  
  19. fold_line(Line, Acc) ->
  20.     Uri = lists:nth(7, string:tokens(Line, " ")),
  21.     case Uri of
  22.         "/ongoing/When/" ++ Trailer ->
  23.             case lists:member($., Trailer) of
  24.                 true ->
  25.                     Acc;
  26.                 false ->
  27.                     Acc + 1
  28.             end;
  29.         _ ->
  30.             Acc
  31.     end.
  32.  
  33. test() ->
  34.     results_for("o10k.ap").
  35.  
  36. results_for(File) ->
  37.     Port = mochiweb_util:cmd_port(["cat", File],
  38.                                   [eof, stream, {line, 4096}]),
  39.     foldl_spool(fun fold_line/2, 0, Port).

这个例子提供了一个很好的思路,当我们遇到类似问题的时候,如果暂时不知道怎么解决,我们可以用 port 的方式先绕过问题。不过,这仍然没有解决我们的疑惑——仅用 Erlang 本身,能否很好的处理这类问题?

Ulf Wiger(ErlHive的作者)给出了一个 Erlang 下快速的文件 io 处理方案:

下载: sumcol.erl
  1. %% The Computer Language Benchmark Games
  2. %% http://shootout.alioth.debian.org/
  3. %% Contributed by Ulf Wiger
  4.  
  5. -module(sumcol).
  6. -export([main/1]).
  7. -export([start/0]).
  8.  
  9. %% get the program argument, which is how many test iterations to run
  10. %% for this test, we ignore this parameter
  11. main(_Args) ->
  12.     user ! {self(), wait_ready},
  13.     receive
  14.     ready ->
  15.         halt()
  16.     end.
  17.  
  18. %% callback function for user-defined line-oriented io
  19. start() ->
  20.     spawn(fun() ->
  21.           process_flag(priority,high),
  22.           register(user, self()),
  23.           read_in(open_port({fd,0,1}, [{line,128},eof]), 0)
  24.       end).
  25.  
  26. read_in(Port, Sum) ->
  27.     receive
  28.     {Port, {data, {Eol, Str}}} when Eol==eol; Eol==noeol->
  29.         read_in(Port, Sum + list_to_integer(Str));
  30.     {Port, eof}  ->
  31.         ready(Port, Sum)
  32.     end.
  33.  
  34. ready(Port, Sum) ->
  35.     port_command(Port, [integer_to_list(Sum), "\n"]),
  36.     erlang:port_close(Port),
  37.     receive
  38.     {From, wait_ready} ->
  39.          From ! ready
  40.     end.

细看之下,这个例子也是用 port 的方式来完成文件 io 的,那么,纯 Erlang 方式下,这个问题是否有办法得到解决呢?

Thomas Lindgren 指出了 dacaoyuan 上面代码中尚有改进余地的地方:

首先,没有必要将 binary 转换成 list ,而是可以直接从 binary 获得一个字节。也就是:

  1. travel_bin(Bin) -> travel_bin(Bin, 0).
  2. travel_bin(<<>>, ByteCount) -> ByteCount;
  3. travel_bin(<<_C:1/binary, Rest/binary>>, ByteCount) ->
  4.     travel_bin(Rest, ByteCount + 1).

可以写成:

  1. travel_bin(Bin) -> travel_bin(Bin, 0).
  2. travel_bin(<<>>, ByteCount) -> ByteCount;
  3. travel_bin(<<C, Rest/binary>>, ByteCount) ->
  4.     travel_bin(Rest, ByteCount + 1).

这么做的好处是可以避免生成一个单字节的 binary ,每次操作都能在堆上省下几个字节。
其次,象这样来遍历一个 binary 通常不是最好的办法,因为 <> 这一句通常意味着要构造一个新的 binary (Rest,3byte的指针),这通常不如将其转换为 list (2byte的结构)再处理更高效。
在这个程序结构之上优化,可以一次处理多个字节,如:

  1. loop(<<C0,C1,C2,C3,C4,C5,C6,C7, Rest/binary>>) ->
  2.    ..., loop(Rest);
  3. loop(Small_bin) -> ...

Bjorn Gustavsson (OTP Team)指出:

Thomas Lindgren 的建议在 R11 中是正确的,但在即将到来的 R12 中,可能会导致性能反而下降。因为 R12 引入的新特性中有“Sub Binary”,大多数情况下,编译器会自动优化上述 <<_C:1/binary, Rest/binary>> 的语句,避免生成另一个 binary 。

R12还有什么新特性?令人期待啊。

Steve Vinoski 提出了一个 map reduce 的方案(代码见后面),获得了比较好的效果。

Klacke (即Claes Wikstorm)指出:

Spawn cat 方法的缺陷在于缺乏控制。即,这可能导致大量消息发往 port 的所有者,造成大量来不及处理的消息堆积在 message queue 中(也就是 Ulf Wiger 提到的 input too fast 问题)。在 port 方法中,可以用 {line, L} 模式的 port 来改善这一问题,但仍然无法彻底解决缺乏控制的问题。

他认为,当前最佳“按行处理文件”的办法是:

1. 打开文件 file:open(Filename, [read, raw])
2. 在循环中用 buffer 获取文件内容 {ok, Bin} = file:read(Fd, BufSize),
3. 使用 binary 的正则表达式匹配库来处理,比如这个

他提到目前 otp 自带的 regex 包,当前的版本还有性能问题。

Steve Vinoski 指出:

虽然用并发改进了速度,但 Erlang 的文件 io 缓慢仍然是无法回避的问题。他进一步深化了讨论,提出了 Erlang 是否有办法/计划改进文件 io 性能这样的本质问题。

Ulf Wiger 指出:

在 Erlang 最初的设计中,文件 io 的设计原则是要尽可能的减少侵入。比如,mnesia的日志写入等,都被设计为低优先级的后台进程,少有需要读取大文件的应用场景。随着 Erlang 向其他的应用领域发展,文件 io 的要求越来越高,此时文件 io 的问题就被越来越多的人诟病。这个问题需要设计者们来加以改进。

Per Gustafsson 给出了目前为止最快的纯 Erlang 方案(代码见后面)。

Klacke 贡献了早在 Bluetail 时期对这一问题的终极解决方案——更好的文件 io 模块 bfile (代码见后面),使用这一模块,可以获得与 C 相当的文件 io 性能,以及直接支持的分行处理

【方案】

Steve Vinoski 想到了利用 Erlang 强大的并发能力来加速这个过程。他的代码是这样的:

下载: tbray.erl
  1. -module(tbray).
  2. -export([start/2]).
  3.  
  4. find_match("/ongoing/When/" ++ Last) ->
  5.     case lists:member($., Last) of
  6.         false -> 1;
  7.         true -> 0
  8.     end;
  9. find_match(_) -> 0.
  10.  
  11. process_binary(Pid, Bin) ->
  12.     spawn(fun() ->
  13.         L = string:tokens(binary_to_list(Bin), "\n"),
  14.         V = lists:foldl(
  15.             fun(Line, Total) ->
  16.                 Total + find_match(lists:nth(7, string:tokens(Line, " "))) end,
  17.             0, L),
  18.         Pid ! V
  19.         end).
  20.  
  21. split_on_newline(Bin, N, All) when size(Bin) < N ->
  22.     All ++ [Bin];
  23. split_on_newline(Bin, N, All) ->
  24.     {_, <<C:8, _/binary>>} = split_binary(Bin, N),
  25.     case C of
  26.         $\n ->
  27.           {B21, B22} = split_binary(Bin, N+1),
  28.           split_on_newline(B22, N, All ++ [B21]);
  29.         _ -> split_on_newline(Bin, N+1, All)
  30.     end.
  31. split_on_newline(Bin, N) when N == size(Bin) -> [Bin];
  32. split_on_newline(Bin, N) -> split_on_newline(Bin, N, []).
  33.  
  34. start(Num, Input) ->
  35.     {ok, Data} = file:read_file(Input),
  36.     Bins = split_on_newline(Data, size(Data) div Num),
  37.     Me = self(),
  38.     Pids = [process_binary(Me, B) || B <- Bins],
  39.     lists:foldl(
  40.         fun(_, Total) -> receive X -> Total + X end end,
  41.         0, Pids).

他的主要思路是,读取文件,拆分成 Num 个部分进行“分行”处理,然后用“map reduce”来处理匹配。
运行时:

  1. $ erl -smp enable +P 60000
  2. 1> c(tbray).
  3. {ok,tbray}
  4. 2> timer:tc(tbray, start, [1, "o10k.ap"]).
  5. {661587,1101}

当他把 process 数量调到 4000 左右的时候,获得了比较稳定的速度。

Caoyuan 的另一个方案是这样的:

下载: tbray2.erl
  1. -module(tbray).
  2.  
  3. -compile([native]).
  4.  
  5. -export([start/2,
  6.          collect_loop/2,
  7.          buffered_read/3]).
  8.  
  9. -include_lib("kernel/include/file.hrl").
  10.  
  11. %% The best BUFFER_SIZE is 4096
  12. -define(BUFFER_SIZE, 4096).
  13.  
  14. -record(context, {lineBuf = [],
  15.                   matched = 0,
  16.                   total = 0,
  17.                   lastProcessedSeq = 0,
  18.                   dataBuf = [],
  19.                   processNum}).
  20.  
  21. %% erl -smp enable +P 60000
  22. %% timer:tc(wide, start, [1000, "o1000k.ap"]).  
  23. start(ProcessNum, FileName) ->
  24.     statistics(wall_clock)
  25.     {ok, FileInfo} = file:read_file_info(FileName),
  26.     Size = FileInfo#file_info.size,
  27.     {ok, File} = file:open(FileName, [read, binary]),
  28.  
  29.     Collect = spawn(?MODULE, collect_loop, [self(), #context{processNum = ProcessNum}]),
  30.  
  31.     psplit_read_file(Collect, File, Size div ProcessNum, ProcessNum, 1),
  32.    
  33.     {Matched, Total} =
  34.       receive
  35.           #context{matched=MatchedX, total=TotalX} -> {MatchedX, TotalX}
  36.       end,
  37.  
  38.     {_, Duration2} = statistics(wall_clock),
  39.     io:format("scan lines:\t ~pms~nMatched: ~B, Total: ~B~n", [Duration2, Matched, Total]).
  40.  
  41. psplit_read_file(_Collector, _File, _ChunkSize, ProcessNum, I) when I > ProcessNum -> done;
  42. psplit_read_file(Collector, File, ChunkSize, ProcessNum, I) ->
  43.     spawn(
  44.       fun () ->
  45.               Offset = ChunkSize * (I - 1),
  46.               %% if it's last chuck, read all bytes left, which will not exceed ChunkSize * 2
  47.               Length = if  I == ProcessNum -> ChunkSize * 2;
  48.                            true -> ChunkSize
  49.                        end,
  50.               {ok, Data} = file:pread(File, Offset, Length),
  51.               Collector ! {I, Data}
  52.       end),
  53.     psplit_read_file(Collector, File, ChunkSize, ProcessNum, I + 1).
  54.  
  55.  
  56. collect_loop(Pid, #context{lastProcessedSeq= ProcessNum,
  57.                            processNum=ProcessNum}=Context) -> Pid ! Context;
  58. collect_loop(Pid, #context{dataBuf=DataBuf}=Context) ->
  59.     receive
  60.         {Seq, Data} ->
  61.             SortedDatas = lists:keysort(1, [{Seq, Data} | DataBuf]),
  62.             Context1 = process_arrived_datas(SortedDatas, Context#context{dataBuf = []}),
  63.             %io:format("Last processed Seq: ~B~n", [Context1#context.lastProcessedSeq]),
  64.             collect_loop(Pid, Context1)
  65.     end.
  66.  
  67. process_arrived_datas([], Context) -> Context;
  68. process_arrived_datas([{Seq, Data}|T], #context{lineBuf=LineBuf,
  69.                                                 matched=Matched,
  70.                                                 total=Total,
  71.                                                 lastProcessedSeq=LastProcessedSeq,
  72.                                                 dataBuf=DataBuf}=Context) ->
  73.     if  Seq == LastProcessedSeq + 1 ->
  74.             {LineBuf1, Matched1, Total1} = buffered_read(
  75.               fun (Buffer, {LineBufX, MatchedX, TotalX}) ->
  76.                       scan_line(binary_to_list(Buffer), LineBufX, MatchedX, TotalX)
  77.               end, {LineBuf, Matched, Total}, Data),
  78.             process_arrived_datas(T, Context#context{lineBuf = LineBuf1,
  79.                                                      matched = Matched1,
  80.                                                      total = Total1,
  81.                                                      lastProcessedSeq = Seq});
  82.         true ->
  83.             process_arrived_datas(T, Context#context{dataBuf = [{Seq, Data} | DataBuf]})
  84.     end.
  85.  
  86. buffered_read(Fun, Acc, Bin) ->
  87.     case Bin of
  88.         <<Buf:?BUFFER_SIZE/binary, Rest/binary>> ->
  89.             Acc1 = Fun(Buf, Acc),
  90.             buffered_read(Fun, Acc1, Rest);
  91.         _ ->
  92.             Fun(Bin, Acc)
  93.     end.
  94.    
  95. scan_line([], LineBuf, Matched, Total) -> {LineBuf, Matched, Total};
  96. scan_line([$\n|Rest], LineBuf, Matched, Total) ->
  97.     Line1 = lists:reverse(LineBuf),
  98.     %io:format("~n~s~n", [Line1]),
  99.     Matched1 = Matched + process_match(Line1),
  100.     scan_line(Rest, [], Matched1, Total + 1);
  101. scan_line([C|Rest], LineBuf, Matched, Total) ->
  102.     scan_line(Rest, [C | LineBuf], Matched, Total).
  103.  
  104. process_match([]) -> 0;
  105. process_match("/ongoing/When/"++Rest) ->
  106.     case match_until_space(Rest, false) of
  107.     true  -> 0;
  108.     false -> 1
  109.     end;
  110. process_match([_H|Rest]) ->
  111.     process_match(Rest).
  112.    
  113. match_until_space([$\040|_Rest], Bool) -> Bool;
  114. match_until_space([$.|_Rest], _Bool) -> true;
  115. match_until_space([_H|Rest], Bool) ->
  116.     match_until_space(Rest, Bool).

这个方案起了多个进程来并发读取文件,然后用 collect_loop 将各个片段转换为小的 lists ,scan_line 会进行分行处理,然后用 process_match 来匹配。这里的要点在于,lists的大小为 4096 比较合适。

Per Gustafsson 的方案是这样的:

  1. %%%-------------------------------------------------------------------
  2. %%% File    : line_server.erl
  3. %%% Author  : Per Gustafsson <pergu@it.uu.se>
  4. %%% Description :
  5. %%%
  6. %%% Created : 25 Sep 2007 by Per Gustafsson <pergu@it.uu.se>
  7. %%%-------------------------------------------------------------------
  8. -module(line_server).
  9.  
  10. -export([get_handle/1,get_lines/1,close/1]).
  11.  
  12. -compile([native]).
  13.  
  14. -define(BUFFER_SIZE, 70000).
  15.  
  16. get_handle(FileName) ->
  17.   erlang:spawn_monitor(fun() -> start(FileName) end).
  18.  
  19. get_lines({Pid,MRef}) ->
  20.   Pid ! {get_lines, MRef, self()},
  21.   receive
  22.     {{Data,Prep}, MRef} ->
  23.       case divide_lines(Data) of
  24.     [H|T] ->
  25.       [<<Prep/binary,H/binary>>|T];
  26.     [] ->
  27.       [Prep]
  28.       end;
  29.     {eof,MRef} ->
  30.       eof;
  31.     {'DOWN', MRef, _Type, Pid, _Info} ->
  32.       erlang:fault(file_closed)
  33.   end.
  34.  
  35. close({Pid,_}) ->
  36.   Pid ! stop.
  37.  
  38. start(FileName) ->
  39.     {ok,F} = file:open(FileName,[raw,binary]),
  40.     serve_lines(F,<<>>).
  41.  
  42. serve_lines(File,Rest) ->
  43.   {Data, NewRest} =
  44.     get_more_data(File,Rest),
  45.   receive 
  46.     {get_lines, Ref, Pid} ->
  47.       Pid ! {Data, Ref},
  48.       serve_lines(File,NewRest);
  49.     stop ->
  50.       file:close(File)
  51.   end.
  52.  
  53. get_more_data(File,Rest) ->
  54.   case file: