Case Study: Fast File IO in Erlang
俺喜欢从实例中学习——遵循:“问题-分析-方案-结论”的认知过程。这篇是个小小的尝试,如果效果还不错,后面再去整理几个。杀猫长文,代码满塞,慎入。:D
【问题】
9月23日,Erlang 的 maillist 中,有人发现了这么一篇文章《Erlang Blues》
正如它的标题,这篇文章从批评的眼光,指出了许多作者本人并不喜欢 Erlang 的地方。文章本身的观点姑且不论,但确实是指出了作为一个“普通程序员”看待和评估 Erlang 时必然会留意到的问题。即,按照通常习惯写出来的文件 io 代码,性能很糟糕,比如,下面的代码:
- scan_line(eof, _, Count) -> Count;
- scan_line(_, File, Count) ->
- scan_line(io:get_line(File, ''), File, Count + 1).
这是一个简单的统计文件行数的程序,在他的 MacBook 上处理一个 1,167,948 行的日志文件,竟然用了 34.165 秒,而相同功能的 ruby 程序只用了 3.036 秒。性能差距有十倍之多,不可谓不大。
这是为什么?莫非 Erlang 的文件 io 系统设计得如此糟糕?且看 Erlang 的社区如何反应此事?
【分析】
Thomas Lindgren (在 XML 处理上,对 Erlang 和 Ruby 有过 10s / xxm 的比对经验)指出:
io:get_line 是在字符串级别处理文件的函数,它的实现非常慢,而 file:read_file 将整个文件读为 binary 则要快得多。
Caoyuan (是的,就是这里的 dcaoyuan ,知名的 ErlyBird 的作者)指出:
这个问题其实并不是关于文件 io 的,测试表明,file:read_file 以 binary 方式读到内存非常快,重要的是在于这个过程需要遍历 binary 数据,遍历整个 200m 的 binary 数据是很耗费时间的一个任务。
他给出了如下的测试程序:
- -module(widefinder).
- -export([test/1,
- test1/1,
- test2/1,
- test3/1]).
- test(FileName) ->
- statistics(wall_clock),
- {ok, IO} = file:open(FileName, read),
- {Matched, Total} = scan_line(IO),
- {_, Duration} = statistics(wall_clock),
- io:format("Duration ~pms~n Matched:~B, Total:~B", [Duration,
- Matched, Total]).
- scan_line(IO) -> scan_line("", IO, 0, -1).
- scan_line(eof, _, Matched, Total) -> {Matched, Total};
- scan_line(Line, IO, Matched, Total) ->
- NewCount = Matched + process_match(Line),
- scan_line(io:get_line(IO, ''), IO, NewCount, Total + 1).
- process_match([]) -> 0;
- process_match("/ongoing/When/"++Rest) ->
- case parse_until_space(Rest, false) of
- true -> 0;
- false -> 1
- end;
- process_match([_H|Rest]) -> process_match(Rest).
- test1(FileName) ->
- statistics(wall_clock),
- {ok, Bin} = file:read_file(FileName),
- {Matched, Total} = scan_line1(Bin),
- {_, Duration} = statistics(wall_clock),
- io:format("Duration ~pms~n Matched:~B, Total:~B", [Duration,
- Matched, Total]).
- scan_line1(Bin) -> scan_line1(Bin, [], 0, 0).
- scan_line1(<<>>, _Line, Matched, Total) -> {Matched, Total};
- scan_line1(<<$\n, Rest/binary>>, Line, Matched, Total) ->
- %Line1 = lists:reverse(Line),
- scan_line1(Rest, [], Matched, Total + 1);
- scan_line1(<<C:1/binary, Rest/binary>>, Line, Matched, Total) ->
- %NewCount = Matched + process_match(Line),
- scan_line1(Rest, [C|Line], Matched, Total).
- test2(FileName) ->
- statistics(wall_clock),
- {ok, Bin} = file:read_file(FileName),
- Total = travel_bin(Bin),
- {_, Duration} = statistics(wall_clock),
- io:format("Duration ~pms~n Total:~B", [Duration, Total]).
- travel_bin(Bin) -> travel_bin(Bin, 0).
- travel_bin(<<>>, ByteCount) -> ByteCount;
- travel_bin(<<_C:1/binary, Rest/binary>>, ByteCount) ->
- travel_bin(Rest, ByteCount + 1).
- test3(FileName) ->
- statistics(wall_clock),
- {ok, Bin} = file:read_file(FileName),
- Total = travel_list(binary_to_list(Bin)),
- {_, Duration} = statistics(wall_clock),
- io:format("Duration ~pms~n Total:~B", [Duration, Total]).
- travel_list(List) -> travel_list(List, 0).
- travel_list([], CharCount) -> CharCount;
- travel_list([_C|Rest], CharCount) ->
- travel_list(Rest, CharCount + 1).
- parse_until_space([$\040|_Rest], Bool) -> Bool;
- parse_until_space([$.|_Rest], _Bool) -> true;
- parse_until_space([_H|Rest], Bool) -> parse_until_space(Rest, Bool).
需要说明的是,这个测试程序很总要,它更清晰的“定位”了问题,file:read 并不慢,慢的是对它的处理。另外,后面还有关于这段代码的讨论。
Bob Ippolito (MochWeb库的作者)则指出:
做这件事最显而易见的方法是 spawn 一个 cat 的进程,让 Erlang 的 port 机制来完成这些事情。
他给出了一个例子。(需要 MochWeb_Util)
- -module(o10k).
- -export([test/0, results_for/1]).
- -define(FOLDL_SPOOL_TIMEOUT, 10000).
- foldl_spool(Fun, Acc0, Port) ->
- receive
- {Port, {data,{_eol_or_noeol, Line}}} ->
- Acc1 = Fun(Line, Acc0),
- foldl_spool(Fun, Acc1, Port);
- {Port, eof} ->
- port_close(Port),
- Acc0
- after ?FOLDL_SPOOL_TIMEOUT ->
- port_close(Port),
- {error, timeout}
- end.
- fold_line(Line, Acc) ->
- Uri = lists:nth(7, string:tokens(Line, " ")),
- case Uri of
- "/ongoing/When/" ++ Trailer ->
- case lists:member($., Trailer) of
- true ->
- Acc;
- false ->
- Acc + 1
- end;
- _ ->
- Acc
- end.
- test() ->
- results_for("o10k.ap").
- results_for(File) ->
- Port = mochiweb_util:cmd_port(["cat", File],
- [eof, stream, {line, 4096}]),
- foldl_spool(fun fold_line/2, 0, Port).
这个例子提供了一个很好的思路,当我们遇到类似问题的时候,如果暂时不知道怎么解决,我们可以用 port 的方式先绕过问题。不过,这仍然没有解决我们的疑惑——仅用 Erlang 本身,能否很好的处理这类问题?
Ulf Wiger(ErlHive的作者)给出了一个 Erlang 下快速的文件 io 处理方案:
- %% The Computer Language Benchmark Games
- %% http://shootout.alioth.debian.org/
- %% Contributed by Ulf Wiger
- -module(sumcol).
- -export([main/1]).
- -export([start/0]).
- %% get the program argument, which is how many test iterations to run
- %% for this test, we ignore this parameter
- main(_Args) ->
- user ! {self(), wait_ready},
- receive
- ready ->
- halt()
- end.
- %% callback function for user-defined line-oriented io
- start() ->
- spawn(fun() ->
- process_flag(priority,high),
- register(user, self()),
- read_in(open_port({fd,0,1}, [{line,128},eof]), 0)
- end).
- read_in(Port, Sum) ->
- receive
- {Port, {data, {Eol, Str}}} when Eol==eol; Eol==noeol->
- read_in(Port, Sum + list_to_integer(Str));
- {Port, eof} ->
- ready(Port, Sum)
- end.
- ready(Port, Sum) ->
- port_command(Port, [integer_to_list(Sum), "\n"]),
- erlang:port_close(Port),
- receive
- {From, wait_ready} ->
- From ! ready
- end.
细看之下,这个例子也是用 port 的方式来完成文件 io 的,那么,纯 Erlang 方式下,这个问题是否有办法得到解决呢?
Thomas Lindgren 指出了 dacaoyuan 上面代码中尚有改进余地的地方:
首先,没有必要将 binary 转换成 list ,而是可以直接从 binary 获得一个字节。也就是:
- travel_bin(Bin) -> travel_bin(Bin, 0).
- travel_bin(<<>>, ByteCount) -> ByteCount;
- travel_bin(<<_C:1/binary, Rest/binary>>, ByteCount) ->
- travel_bin(Rest, ByteCount + 1).
可以写成:
- travel_bin(Bin) -> travel_bin(Bin, 0).
- travel_bin(<<>>, ByteCount) -> ByteCount;
- travel_bin(<<C, Rest/binary>>, ByteCount) ->
- travel_bin(Rest, ByteCount + 1).
这么做的好处是可以避免生成一个单字节的 binary ,每次操作都能在堆上省下几个字节。
其次,象这样来遍历一个 binary 通常不是最好的办法,因为<这一句通常意味着要构造一个新的 binary (Rest,3byte的指针),这通常不如将其转换为 list (2byte的结构)再处理更高效。>
在这个程序结构之上优化,可以一次处理多个字节,如:
- loop(<<C0,C1,C2,C3,C4,C5,C6,C7, Rest/binary>>) ->
- ..., loop(Rest);
- loop(Small_bin) -> ...
Bjorn Gustavsson (OTP Team)指出:
Thomas Lindgren 的建议在 R11 中是正确的,但在即将到来的 R12 中,可能会导致性能反而下降。因为 R12 引入的新特性中有“Sub Binary”,大多数情况下,编译器会自动优化上述
<<_C:1/binary, Rest/binary>>的语句,避免生成另一个 binary 。
R12还有什么新特性?令人期待啊。
Steve Vinoski 提出了一个 map reduce 的方案(代码见后面),获得了比较好的效果。
Klacke (即Claes Wikstorm)指出:
Spawn cat 方法的缺陷在于缺乏控制。即,这可能导致大量消息发往 port 的所有者,造成大量来不及处理的消息堆积在 message queue 中(也就是 Ulf Wiger 提到的 input too fast 问题)。在 port 方法中,可以用 {line, L} 模式的 port 来改善这一问题,但仍然无法彻底解决缺乏控制的问题。
他认为,当前最佳“按行处理文件”的办法是:
1. 打开文件 file:open(Filename, [read, raw])
2. 在循环中用 buffer 获取文件内容 {ok, Bin} = file:read(Fd, BufSize),
3. 使用 binary 的正则表达式匹配库来处理,比如这个他提到目前 otp 自带的 regex 包,当前的版本还有性能问题。
Steve Vinoski 指出:
虽然用并发改进了速度,但 Erlang 的文件 io 缓慢仍然是无法回避的问题。他进一步深化了讨论,提出了 Erlang 是否有办法/计划改进文件 io 性能这样的本质问题。
Ulf Wiger 指出:
在 Erlang 最初的设计中,文件 io 的设计原则是要尽可能的减少侵入。比如,mnesia的日志写入等,都被设计为低优先级的后台进程,少有需要读取大文件的应用场景。随着 Erlang 向其他的应用领域发展,文件 io 的要求越来越高,此时文件 io 的问题就被越来越多的人诟病。这个问题需要设计者们来加以改进。
Per Gustafsson 给出了目前为止最快的纯 Erlang 方案(代码见后面)。
Klacke 贡献了早在 Bluetail 时期对这一问题的终极解决方案——更好的文件 io 模块 bfile (代码见后面),使用这一模块,可以获得与 C 相当的文件 io 性能,以及直接支持的分行处理。
【方案】
Steve Vinoski 想到了利用 Erlang 强大的并发能力来加速这个过程。他的代码是这样的:
- -module(tbray).
- -export([start/2]).
- find_match("/ongoing/When/" ++ Last) ->
- case lists:member($., Last) of
- false -> 1;
- true -> 0
- end;
- find_match(_) -> 0.
- process_binary(Pid, Bin) ->
- spawn(fun() ->
- L = string:tokens(binary_to_list(Bin), "\n"),
- V = lists:foldl(
- fun(Line, Total) ->
- Total + find_match(lists:nth(7, string:tokens(Line, " "))) end,
- 0, L),
- Pid ! V
- end).
- split_on_newline(Bin, N, All) when size(Bin) < N ->
- All ++ [Bin];
- split_on_newline(Bin, N, All) ->
- {_, <<C:8, _/binary>>} = split_binary(Bin, N),
- case C of
- $\n ->
- {B21, B22} = split_binary(Bin, N+1),
- split_on_newline(B22, N, All ++ [B21]);
- _ -> split_on_newline(Bin, N+1, All)
- end.
- split_on_newline(Bin, N) when N == size(Bin) -> [Bin];
- split_on_newline(Bin, N) -> split_on_newline(Bin, N, []).
- start(Num, Input) ->
- {ok, Data} = file:read_file(Input),
- Bins = split_on_newline(Data, size(Data) div Num),
- Me = self(),
- Pids = [process_binary(Me, B) || B <- Bins],
- lists:foldl(
- fun(_, Total) -> receive X -> Total + X end end,
- 0, Pids).
他的主要思路是,读取文件,拆分成 Num 个部分进行“分行”处理,然后用“map reduce”来处理匹配。
运行时:
- $ erl -smp enable +P 60000
- 1> c(tbray).
- {ok,tbray}
- 2> timer:tc(tbray, start, [1, "o10k.ap"]).
- {661587,1101}
当他把 process 数量调到 4000 左右的时候,获得了比较稳定的速度。
Caoyuan 的另一个方案是这样的:
- -module(tbray).
- -compile([native]).
- -export([start/2,
- collect_loop/2,
- buffered_read/3]).
- -include_lib("kernel/include/file.hrl").
- %% The best BUFFER_SIZE is 4096
- -define(BUFFER_SIZE, 4096).
- -record(context, {lineBuf = [],
- matched = 0,
- total = 0,
- lastProcessedSeq = 0,
- dataBuf = [],
- processNum}).
- %% erl -smp enable +P 60000
- %% timer:tc(wide, start, [1000, "o1000k.ap"]).
- start(ProcessNum, FileName) ->
- statistics(wall_clock),
- {ok, FileInfo} = file:read_file_info(FileName),
- Size = FileInfo#file_info.size,
- {ok, File} = file:open(FileName, [read, binary]),
- Collect = spawn(?MODULE, collect_loop, [self(), #context{processNum = ProcessNum}]),
- psplit_read_file(Collect, File, Size div ProcessNum, ProcessNum, 1),
- {Matched, Total} =
- receive
- #context{matched=MatchedX, total=TotalX} -> {MatchedX, TotalX}
- end,
- {_, Duration2} = statistics(wall_clock),
- io:format("scan lines:\t ~pms~nMatched: ~B, Total: ~B~n", [Duration2, Matched, Total]).
- psplit_read_file(_Collector, _File, _ChunkSize, ProcessNum, I) when I > ProcessNum -> done;
- psplit_read_file(Collector, File, ChunkSize, ProcessNum, I) ->
- spawn(
- fun () ->
- Offset = ChunkSize * (I - 1),
- %% if it's last chuck, read all bytes left, which will not exceed ChunkSize * 2
- Length = if I == ProcessNum -> ChunkSize * 2;
- true -> ChunkSize
- end,
- {ok, Data} = file:pread(File, Offset, Length),
- Collector ! {I, Data}
- end),
- psplit_read_file(Collector, File, ChunkSize, ProcessNum, I + 1).
- collect_loop(Pid, #context{lastProcessedSeq= ProcessNum,
- processNum=ProcessNum}=Context) -> Pid ! Context;
- collect_loop(Pid, #context{dataBuf=DataBuf}=Context) ->
- receive
- {Seq, Data} ->
- SortedDatas = lists:keysort(1, [{Seq, Data} | DataBuf]),
- Context1 = process_arrived_datas(SortedDatas, Context#context{dataBuf = []}),
- %io:format("Last processed Seq: ~B~n", [Context1#context.lastProcessedSeq]),
- collect_loop(Pid, Context1)
- end.
- process_arrived_datas([], Context) -> Context;
- process_arrived_datas([{Seq, Data}|T], #context{lineBuf=LineBuf,
- matched=Matched,
- total=Total,
- lastProcessedSeq=LastProcessedSeq,
- dataBuf=DataBuf}=Context) ->
- if Seq == LastProcessedSeq + 1 ->
- {LineBuf1, Matched1, Total1} = buffered_read(
- fun (Buffer, {LineBufX, MatchedX, TotalX}) ->
- scan_line(binary_to_list(Buffer), LineBufX, MatchedX, TotalX)
- end, {LineBuf, Matched, Total}, Data),
- process_arrived_datas(T, Context#context{lineBuf = LineBuf1,
- matched = Matched1,
- total = Total1,
- lastProcessedSeq = Seq});
- true ->
- process_arrived_datas(T, Context#context{dataBuf = [{Seq, Data} | DataBuf]})
- end.
- buffered_read(Fun, Acc, Bin) ->
- case Bin of
- <<Buf:?BUFFER_SIZE/binary, Rest/binary>> ->
- Acc1 = Fun(Buf, Acc),
- buffered_read(Fun, Acc1, Rest);
- _ ->
- Fun(Bin, Acc)
- end.
- scan_line([], LineBuf, Matched, Total) -> {LineBuf, Matched, Total};
- scan_line([$\n|Rest], LineBuf, Matched, Total) ->
- Line1 = lists:reverse(LineBuf),
- %io:format("~n~s~n", [Line1]),
- Matched1 = Matched + process_match(Line1),
- scan_line(Rest, [], Matched1, Total + 1);
- scan_line([C|Rest], LineBuf, Matched, Total) ->
- scan_line(Rest, [C | LineBuf], Matched, Total).
- process_match([]) -> 0;
- process_match("/ongoing/When/"++Rest) ->
- case match_until_space(Rest, false) of
- true -> 0;
- false -> 1
- end;
- process_match([_H|Rest]) ->
- process_match(Rest).
- match_until_space([$\040|_Rest], Bool) -> Bool;
- match_until_space([$.|_Rest], _Bool) -> true;
- match_until_space([_H|Rest], Bool) ->
- match_until_space(Rest, Bool).
这个方案起了多个进程来并发读取文件,然后用 collect_loop 将各个片段转换为小的 lists ,scan_line 会进行分行处理,然后用 process_match 来匹配。这里的要点在于,lists的大小为 4096 比较合适。
Per Gustafsson 的方案是这样的:
- %%%-------------------------------------------------------------------
- %%% File : line_server.erl
- %%% Author : Per Gustafsson <pergu@it.uu.se>
- %%% Description :
- %%%
- %%% Created : 25 Sep 2007 by Per Gustafsson <pergu@it.uu.se>
- %%%-------------------------------------------------------------------
- -module(line_server).
- -export([get_handle/1,get_lines/1,close/1]).
- -compile([native]).
- -define(BUFFER_SIZE, 70000).
- get_handle(FileName) ->
- erlang:spawn_monitor(fun() -> start(FileName) end).
- get_lines({Pid,MRef}) ->
- Pid ! {get_lines, MRef, self()},
- receive
- {{Data,Prep}, MRef} ->
- case divide_lines(Data) of
- [H|T] ->
- [<<Prep/binary,H/binary>>|T];
- [] ->
- [Prep]
- end;
- {eof,MRef} ->
- eof;
- {'DOWN', MRef, _Type, Pid, _Info} ->
- erlang:fault(file_closed)
- end.
- close({Pid,_}) ->
- Pid ! stop.
- start(FileName) ->
- {ok,F} = file:open(FileName,[raw,binary]),
- serve_lines(F,<<>>).
- serve_lines(File,Rest) ->
- {Data, NewRest} =
- get_more_data(File,Rest),
- receive
- {get_lines, Ref, Pid} ->
- Pid ! {Data, Ref},
- serve_lines(File,NewRest);
- stop ->
- file:close(File)
- end.
- get_more_data(File,Rest) ->
- case file: