
Comet and Erlang, A perfect match

October 21st, 2008 :: jackyz

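Richard Jones, who lives in London (and is one of the co-founders of Last.fm), has started a series of articles on Comet and Erlang: "A Million-user Comet Application with Mochiweb". The original text of the first part can be read [here].

update: The original text of Part 2 is [here].

update: The original text of Part 3 is [here].

The whole series is now finished. Translators, get to it while it's hot. :D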

update:

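oscar has so far provided Chinese translations of Part 1 and Part 2. Thanks for the hard work, oscar!

idisc has provided Chinese translations of all three parts: Part 1, Part 2 and Part 3. Thanks for the hard work, idisc!

In a fairly short time, oscar and idisc have given us high-quality translations of this very valuable series. That is rare, and we are deeply grateful! With enthusiastic members like you, our small and friendly Chinese Erlang community will keep getting better.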

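Here's the meat of it:

To open the series, Richard Jones first implemented a prototype comet program on mochiweb. Doing this with Erlang on mochiweb has become very simple; compare it with the code in [this post].

After verifying that the program worked, Richard immediately set about tuning it. First came the environment: tuning the Linux kernel's TCP settings. Then came the tooling: he wrote a small Erlang program that loads URLs from a file and opens connections to them. From his measurements he reached this conclusion about performance: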

The resident size of the mochiweb beam process with 10,000 active connections was 450MB – that’s 45KB per connection. CPU utilization on the machine was practically nothing, as expected.

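Admittedly, 45KB per connection is not a particularly good result. The author thinks a C program using libevent could probably get down to about 4.5KB per connection. In the comments, Bob Ippolito (the author of mochiweb) pointed out that resource usage can be lowered further by tuning the heap_size setting used when spawning processes.

update: In Part 2, making the processes hibernate brought this down to about 8KB per connection.

update: In Part 3, the author used a patched libevent to build a C node and reached an incredible C1024K concurrent load.

mochiweb and comet have long been a focus of this site. Besides Googling and Wikipedia, you can also conveniently refer to [this post] and [this post] here.

For easier reading, the full text is pasted below: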

A Million-user Comet Application with Mochiweb, Part 1

In this series I will detail what I found out empirically about how mochiweb performs with lots of open connections, and show how to build a comet application using mochiweb, where each mochiweb connection is registered with a router which dispatches messages to various users. We end up with a working application that can cope with a million concurrent connections, and crucially, knowing how much RAM we need to make it work.

In part one:

  • Build a basic comet mochiweb app that sends clients a message every 10 seconds.
  • Tune the Linux kernel to handle lots of TCP connections
  • Build a flood-testing tool to open lots of connections (ye olde C10k test)
  • Examine how much memory this requires per connection.

Future posts in this series will cover how to build a real message routing system, additional tricks to reduce memory usage, and more testing with 100k and 1m concurrent connections.

I assume you know your way around the Linux command line, and know a bit of Erlang.

Building a Mochiweb test application

In brief:

  • Install and build Mochiweb
  • Run: /your-mochiweb-path/scripts/new_mochiweb.erl mochiconntest
  • cd mochiconntest and edit src/mochiconntest_web.erl

This code (mochiconntest_web.erl) just accepts connections and uses chunked transfer to send an initial welcome message, and one message every 10 seconds to every client.

  1. -module(mochiconntest_web).
  2.       -export([start/1, stop/0, loop/2]).
  3.       %% External API
  4.       start(Options) ->
  5.           {DocRoot, Options1} = get_option(docroot, Options),
  6.           Loop = fun (Req) ->
  7.                          ?MODULE:loop(Req, DocRoot)
  8.                  end,
  9.           % we’ll set our maximum to 1 million connections. (default: 2048)
  10.           mochiweb_http:start([{max, 1000000}, {name, ?MODULE}, {loop, Loop} | Options1]).
  11.       
  12.       stop() ->
  13.           mochiweb_http:stop(?MODULE).
  14.       
  15.       loop(Req, DocRoot) ->
  16.           "/" ++ Path = Req:get(path),
  17.           case Req:get(method) of
  18.               Method when Method =:= 'GET'; Method =:= 'HEAD' ->
  19.                   case Path of
  20.                       "test/" ++ Id ->
  21.                           Response = Req:ok({"text/html; charset=utf-8",
  22.                                             [{"Server","Mochiweb-Test"}],
  23.                                             chunked}),
  24.                           Response:write_chunk("Mochiconntest welcomes you! Your Id: " ++ Id ++ "\n"),
  25.                           %% router:login(list_to_atom(Id), self()),
  26.                           feed(Response, Id, 1);
  27.                       _ ->
  28.                           Req:not_found()
  29.                   end;
  30.               'POST' ->
  31.                   case Path of
  32.                       _ ->
  33.                           Req:not_found()
  34.                   end;
  35.               _ ->
  36.                   Req:respond({501, [], []})
  37.           end.
  38.       
  39.       feed(Response, Path, N) ->
  40.           receive
  41.               %{router_msg, Msg} ->
  42.               %    Html = io_lib:format("Recvd msg #~w: '~s'<br/>", [N, Msg]),
  43.               %    Response:write_chunk(Html)
  44.           after 10000 ->
  45.               Msg = io_lib:format("Chunk ~w for id ~s\n", [N, Path]),
  46.               Response:write_chunk(Msg)
  47.           end,
  48.           feed(Response, Path, N+1).
  49.       
  50.       %% Internal API
  51.       get_option(Option, Options) ->
  52.           {proplists:get_value(Option, Options), proplists:delete(Option, Options)}.
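The timer-driven part of feed/3 can be tried on its own, without mochiweb. Below is a minimal sketch. It makes three assumptions:

  • a hypothetical Write fun stands in for Response:write_chunk/1;
  • the 10-second interval becomes a parameter;
  • a hypothetical Max bound makes the loop end (the original loops forever).

```erlang
-module(tick_feed).
-export([feed/5]).

%% Timer-driven feed loop, as in Part 1: wait Interval ms, then write one
%% chunk. Write is a hypothetical stand-in for Response:write_chunk/1 and
%% Max is a hypothetical bound so the loop terminates (the original does not).
feed(_Write, _Id, N, _Interval, Max) when N > Max ->
    done;
feed(Write, Id, N, Interval, Max) ->
    receive
    after Interval ->
        Write(io_lib:format("Chunk ~w for id ~s\n", [N, Id]))
    end,
    feed(Write, Id, N + 1, Interval, Max).
```

For example, tick_feed:feed(fun(C) -> io:put_chars(C) end, "foo", 1, 10000, 3) prints three chunks, ten seconds apart.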

Start your mochiweb app

make && ./start-dev.sh

By default mochiweb listens on port 8000, on all interfaces. If you are doing this on the desktop, you can test with any web browser. Just navigate to http://localhost:8000/test/foo.

Here’s the command-line test:

$ lynx --source "http://localhost:8000/test/foo"
Mochiconntest welcomes you! Your Id: foo<br/>
Chunk 1 for id foo<br/>
Chunk 2 for id foo<br/>
Chunk 3 for id foo<br/>
^C

Yep, it works. Now let’s make it suffer.

Tuning the Linux kernel for many TCP connections

Save yourself some time and tune the kernel tcp settings before testing with lots of connections, or your test will fail and you’ll see lots of Out of socket memory messages (and if you are masquerading, nf_conntrack: table full, dropping packet.)

Here are the sysctl settings I ended up with – YMMV, but these will probably do:

# General gigabit tuning:
net.core.rmem_max = 16777216
net.core.wmem_max = 16777216
net.ipv4.tcp_rmem = 4096 87380 16777216
net.ipv4.tcp_wmem = 4096 65536 16777216
net.ipv4.tcp_syncookies = 1
# this gives the kernel more memory for tcp
# which you need with many (100k+) open socket connections
net.ipv4.tcp_mem = 50576   64768   98152
net.core.netdev_max_backlog = 2500
# I was also masquerading the port comet was on, you might not need this
net.ipv4.netfilter.ip_conntrack_max = 1048576

Put these in /etc/sysctl.conf, then run sysctl -p to apply them. There’s no need to reboot; your kernel should now be able to handle a lot more open connections, yay.

Creating a lot of connections

There are many ways to do this. Tsung is quite sexy, and there are plenty of other less-sexy ways to spam an httpd with lots of requests (ab, httperf, httpload, etc.). None of them is ideally suited to testing a comet application, and I’d been looking for an excuse to try the Erlang http client, so I wrote a basic test to make lots of connections.

Just because you can doesn’t mean you should: one process per connection would definitely be a waste here. I’m using one process to load URLs from a file, another process to establish and receive messages from all HTTP connections, and one process as a timer to print a report every 10 seconds. All data received from the server is discarded, but it does increment a counter so we can keep track of how many HTTP chunks were delivered.

-module(floodtest).
-export([start/2, timer/2, recv/1]).

start(Filename, Wait) ->
    inets:start(),
    spawn(?MODULE, timer, [10000, self()]),
    This = self(),
    spawn(fun()-> loadurls(Filename, fun(U)-> This ! {loadurl, U} end, Wait) end),
    recv({0,0,0}).

recv(Stats) ->
    {Active, Closed, Chunks} = Stats,
    receive
        {stats} -> io:format("Stats: ~w\n",[Stats])
        after 0 -> noop
    end,
    receive
        {http,{_Ref,stream_start,_X}} -> recv({Active+1,Closed,Chunks});
        {http,{_Ref,stream,_X}} ->       recv({Active, Closed, Chunks+1});
        {http,{_Ref,stream_end,_X}} ->   recv({Active-1, Closed+1, Chunks});
        {http,{_Ref,{error,Why}}} ->
            io:format("Closed: ~w\n",[Why]),
            recv({Active-1, Closed+1, Chunks});
        {loadurl, Url} ->
            %% 2008-era inets API; in current OTP releases this client is httpc:request/4
            http:request(get, {Url, []}, [], [{sync, false}, {stream, self}, {version, 1.1}, {body_format, binary}]),
            recv(Stats)
    end.

timer(T, Who) ->
    receive
    after T ->
        Who ! {stats}
    end,
    timer(T, Who).

% Read lines from a file with a specified delay between lines:
for_each_line_in_file(Name, Proc, Mode, Accum0) ->
    {ok, Device} = file:open(Name, Mode),
    for_each_line(Device, Proc, Accum0).

for_each_line(Device, Proc, Accum) ->
    case io:get_line(Device, "") of
        eof  -> file:close(Device), Accum;
        Line -> NewAccum = Proc(Line, Accum),
                for_each_line(Device, Proc, NewAccum)
    end.

loadurls(Filename, Callback, Wait) ->
    for_each_line_in_file(Filename,
        fun(Line, List) ->
            Callback(string:strip(Line, right, $\n)),
            receive
            after Wait ->
                noop
            end,
            List
        end,
        [read], []).

Each connection we make requires an ephemeral port, and thus a file descriptor, and by default the number of open file descriptors per process is limited to 1024. To avoid the "Too many open files" error, you’ll need to raise the ulimit for your shell. This can be changed in /etc/security/limits.conf, but that requires a logout/login. For now you can just sudo and modify the current shell (su back to your non-priv’ed user after calling ulimit if you don’t want to run as root):

$ sudo bash
# ulimit -n 999999
# erl

You might as well increase the ephemeral port range to the maximum too:

# echo "1024 65535" > /proc/sys/net/ipv4/ip_local_port_range

Generate a file of URLs to feed to the floodtest program:

( for i in `seq 1 10000`; do echo "http://localhost:8000/test/$i" ; done ) > /tmp/mochi-urls.txt
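If you would rather stay in the Erlang shell, the same URL file can be written from Erlang. This is a sketch: the module name mkurls and the Base parameter are assumptions, not part of the original tool.

```erlang
-module(mkurls).
-export([write_urls/3]).

%% Write one URL per line, Base followed by the integer id, for ids 1..N:
%% the same content as the shell loop above. Base is a hypothetical parameter.
write_urls(File, Base, N) ->
    Lines = [[Base, integer_to_list(I), $\n] || I <- lists:seq(1, N)],
    file:write_file(File, Lines).
```

Usage: mkurls:write_urls("/tmp/mochi-urls.txt", "http://localhost:8000/test/", 10000).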

From the erlang prompt you can now compile and launch floodtest.erl:

erl> c(floodtest).
erl> floodtest:start("/tmp/mochi-urls.txt", 100).

This will establish 10 new connections per second (i.e., 1 connection every 100ms).

It will output stats in the form {Active, Closed, Chunks}. Active is the number of connections currently established, Closed is the number that were terminated for some reason, and Chunks is the number of chunks served by chunked transfer from mochiweb. Closed should stay at 0, and Chunks should be more than Active, because each active connection will receive multiple chunks (1 every 10 seconds).

The resident size of the mochiweb beam process with 10,000 active connections was 450MB – that’s 45KB per connection. CPU utilization on the machine was practically nothing, as expected.

Assessment so far

That was a reasonable first attempt. 45KB per-connection seems a bit high – I could probably cook something up in C using libevent that could do this with closer to 4.5KB per connection (just a guess, if anyone has experience please leave a comment). If you factor in the amount of code and time it took to do this in Erlang compared with C, I think the increased memory usage is more excusable.

In future posts I’ll cover building a message router (so we can uncomment lines 25 and 41-43 in mochiconntest_web.erl) and talk about some ways to reduce the overall memory usage. I’ll also share the results of testing with 100k and 1M connections.

update: Part 2:

A Million-user Comet Application with Mochiweb, Part 2

In Part 1, we built a (somewhat useless) mochiweb comet application that sent clients a message every 10 seconds. We tuned the Linux kernel, and built a tool to establish a lot of connections in order to test performance and memory usage. We found that it took around 45KB per connection.

Part 2 is about turning the application into something useful, and saving memory:

  • Implement a message router with a login/logout/send API
  • Update the mochiweb app to receive messages from the router
  • Set up a distributed erlang system so we can run the router on a different node/host from mochiweb
  • Write a tool to spam the router with lots of messages
  • Graph memory usage over 24hrs, and optimise the mochiweb app to save memory.

This means we are decoupling the message sending logic from the mochiweb app. In tandem with the floodtest tool from part 1, we can benchmark a setup closer to a production scenario.

Implementing the message router

The router API is just 3 functions:

  • login(Id, Pid) register a process (of pid Pid) to receive messages for Id
  • logout(Pid) to stop receiving messages
  • send(Id, Msg) sends the message Msg to any client logged in as Id

Note that, by design, it is possible for one process to login with multiple different Ids.

This example router module uses 2 ets tables to store bidirectional mappings between Pids and Ids. (pid2id and id2pid in the #state record below.)
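The two-table idea can be tried directly in the shell. Below is a minimal sketch, assuming a hypothetical idmap module that mirrors the ets calls in router.erl without the gen_server wrapper. Because both tables are bags, one pid can hold several Ids and one Id can map to several pids.

```erlang
-module(idmap).
-export([new/0, login/3, logout/2, pids_for/2]).

%% Two bag tables, as in router.erl: pid -> id and id -> pid.
new() ->
    {ets:new(pid2id, [bag]), ets:new(id2pid, [bag])}.

login({P2I, I2P}, Id, Pid) ->
    ets:insert(P2I, {Pid, Id}),
    ets:insert(I2P, {Id, Pid}),
    ok.

logout({P2I, I2P}, Pid) ->
    %% Delete the exact {Id, Pid} objects so other pids under the same Id stay.
    [ets:delete_object(I2P, {Id, Pid}) || {_, Id} <- ets:lookup(P2I, Pid)],
    ets:delete(P2I, Pid),
    ok.

pids_for({_P2I, I2P}, Id) ->
    [P || {_, P} <- ets:lookup(I2P, Id)].
```

Logging a pid out removes only its own rows, so a second browser window logged in under the same Id keeps receiving messages.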

Download: router.erl
  1. -module(router).
  2.       -behaviour(gen_server).
  3.       
  4.       -export([start_link/0]).
  5.       -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
  6.            terminate/2, code_change/3]).
  7.       
  8.       -export([send/2, login/2, logout/1]).
  9.       
  10.       -define(SERVER, global:whereis_name(?MODULE)).
  11.       
  12.       % will hold bidirectional mapping between id <–> pid
  13.       -record(state, {pid2id, id2pid}).
  14.       
  15.       start_link() ->
  16.           gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).
  17.       
  18.       % sends Msg to anyone logged in as Id
  19.       send(Id, Msg) ->
  20.           gen_server:call(?SERVER, {send, Id, Msg}).
  21.       
  22.       login(Id, Pid) when is_pid(Pid) ->
  23.           gen_server:call(?SERVER, {login, Id, Pid}).
  24.       
  25.       logout(Pid) when is_pid(Pid) ->
  26.           gen_server:call(?SERVER, {logout, Pid}).
  27.       
  28.       %%
  29.       
  30.       init([]) ->
  31.           % set this so we can catch death of logged in pids:
  32.           process_flag(trap_exit, true),
  33.           % use ets for routing tables
  34.           {ok, #state{
  35.                       pid2id = ets:new(?MODULE, [bag]),
  36.                       id2pid = ets:new(?MODULE, [bag])
  37.                      }
  38.           }.
  39.       
  40.       handle_call({login, Id, Pid}, _From, State) when is_pid(Pid) ->
  41.           ets:insert(State#state.pid2id, {Pid, Id}),
  42.           ets:insert(State#state.id2pid, {Id, Pid}),
  43.           link(Pid), % tell us if they exit, so we can log them out
  44.           io:format("~w logged in as ~w\n",[Pid, Id]),
  45.           {reply, ok, State};
  46.       
  47.       handle_call({logout, Pid}, _From, State) when is_pid(Pid) ->
  48.           unlink(Pid),
  49.           PidRows = ets:lookup(State#state.pid2id, Pid),
  50.           case PidRows of
  51.               [] ->
  52.                   ok;
  53.               _ ->
  54.                   IdRows = [ {I,P} || {P,I} <- PidRows ], % invert tuples
  55.                   % delete all pid->id entries
  56.                   ets:delete(State#state.pid2id, Pid),
  57.                   % and all id->pid
  58.                   [ ets:delete_object(State#state.id2pid, Obj) || Obj <- IdRows ]
  59.           end,
  60.           io:format("pid ~w logged out\n",[Pid]),
  61.           {reply, ok, State};
  62.       
  63.       handle_call({send, Id, Msg}, _From, State) ->
  64.           % get pids who are logged in as this Id
  65.           Pids = [ P || { _Id, P } <- ets:lookup(State#state.id2pid, Id) ],
  66.           % send Msg to them all
  67.           M = {router_msg, Msg},
  68.           [ Pid ! M || Pid <- Pids ],
  69.           {reply, ok, State}.
  70.       
  71.       % handle death and cleanup of logged in processes
  72.       handle_info(Info, State) ->
  73.           case Info of
  74.               {'EXIT', Pid, _Why} ->
  75.                   % force logout:
  76.                   handle_call({logout, Pid}, blah, State);
  77.               Wtf ->
  78.                   io:format("Caught unhandled message: ~w\n", [Wtf])
  79.           end,
  80.           {noreply, State}.
  81.       
  82.       handle_cast(_Msg, State) ->
  83.           {noreply, State}.
  84.       terminate(_Reason, _State) ->
  85.           ok.
  86.       code_change(_OldVsn, State, _Extra) ->
  87.           {ok, State}.

Updating the mochiweb application

Let’s assume a user is represented by an integer Id based on the URL they connect to mochiweb with, and use that id to register with the message router. Instead of blocking for 10 seconds then sending something, the mochiweb loop will block on receiving messages from the router, and send an HTTP chunk to the client for every message the router sends it:

  • Client connects to mochiweb at http://localhost:8000/test/123
  • Mochiweb app registers the pid for that connection against the id '123' with the message router
  • If you send a message to the router addressed to id '123', it will be relayed to the correct mochiweb process, and appear in the browser for that user
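These three steps can be modelled without mochiweb or the router: a process blocks on {router_msg, Msg} and writes one chunk per message. This is a sketch with three assumptions:

  • a hypothetical relay_feed module;
  • a Write fun in place of Response:write_chunk/1;
  • a test-only stop message.

```erlang
-module(relay_feed).
-export([start/1, feed/2]).

start(Write) ->
    spawn(fun() -> feed(Write, 1) end).

%% Block until a {router_msg, Msg} arrives, write one chunk, repeat.
%% Write is a hypothetical stand-in for Response:write_chunk/1;
%% 'stop' exists only so a test can end the loop.
feed(Write, N) ->
    receive
        {router_msg, Msg} ->
            Write(io_lib:format("Recvd msg #~w: '~s'", [N, Msg])),
            feed(Write, N + 1);
        stop ->
            ok
    end.
```

Anything that sends {router_msg, Msg} to the pid drives this loop. That includes router:send/2, once the pid has logged in.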

Here’s the updated version of mochiconntest_web.erl:

-module(mochiconntest_web).

-export([start/1, stop/0, loop/2]).

%% External API

start(Options) ->
    {DocRoot, Options1} = get_option(docroot, Options),
    Loop = fun (Req) ->
                   ?MODULE:loop(Req, DocRoot)
           end,
    % we’ll set our maximum to 1 million connections. (default: 2048)
    mochiweb_http:start([{max, 1000000}, {name, ?MODULE}, {loop, Loop} | Options1]).

stop() ->
    mochiweb_http:stop(?MODULE).

loop(Req, DocRoot) ->
    "/" ++ Path = Req:get(path),
    case Req:get(method) of
        Method when Method =:= 'GET'; Method =:= 'HEAD' ->
            case Path of
                "test/" ++ Id ->
                    Response = Req:ok({"text/html; charset=utf-8",
                                      [{"Server","Mochiweb-Test"}],
                                      chunked}),
                    % login using an integer rather than a string
                    {IdInt, _} = string:to_integer(Id),
                    router:login(IdInt, self()),
                    feed(Response, IdInt, 1);
                _ ->
                    Req:not_found()
            end;
        'POST' ->
            case Path of
                _ ->
                    Req:not_found()
            end;
        _ ->
            Req:respond({501, [], []})
    end.

feed(Response, Id, N) ->
    receive
    {router_msg, Msg} ->
        Html = io_lib:format("Recvd msg #~w: '~s'", [N, Msg]),
        Response:write_chunk(Html)
    end,
    feed(Response, Id, N+1).

%% Internal API

get_option(Option, Options) ->
    {proplists:get_value(Option, Options), proplists:delete(Option, Options)}.

It’s Alive!

Now let’s bring it to life – we’ll use 2 erlang shells, one for mochiweb and one for the router. Edit start-dev.sh, used to start mochiweb, and add the following additional parameters to erl:

  • -sname n1 to name the erlang node 'n1'
  • +K true to enable kernel-poll. Seems daft not to when dealing with lots of connections
  • +P 134217727: the default maximum number of processes you can spawn is 32768. Considering we need one process per connection (and I don’t know of any good reason not to), I suggest just setting this to the maximum possible value. 134,217,727 is the max according to "man erl".

Now run make && ./start-dev.sh and you should see a prompt like this: (n1@localhost)1> – your mochiweb app is now running and the erlang node has a name.

Now run another erlang shell like so:

erl -sname n2

Currently those two erlang instances don’t know about each other, fix that:

(n2@localhost)1> nodes().
[]
(n2@localhost)2> net_adm:ping(n1@localhost).
pong
(n2@localhost)3> nodes().
[n1@localhost]

Now compile and start the router from this shell:

(n2@localhost)4> c(router).
{ok,router}
(n2@localhost)5> router:start_link().
{ok,<0.38.0>}

Now for the fun bit: go to http://localhost:8000/test/123 in your browser (or use lynx --source "http://localhost:8000/test/123" from the console). Check the shell you launched the router in; you should see that it logged in one user.

You can now send messages to the router and watch them appear in your browser. Only send strings for now. The feed function formats them with ~s in io_lib:format, which accepts strings, binaries and atoms but crashes on other terms such as integers.

Just borrow the shell you used to launch the router:

(n2@localhost)6> router:send(123, "Hello World").
(n2@localhost)7> router:send(123, "Why not open another browser window too?").
(n2@localhost)8> router:send(456, "This message will go into the void unless you are connected as /test/456 too").

Check your browser, you’ve got comet :)
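The feed function formats each message with ~s, which accepts strings, binaries and atoms but raises badarg for other terms such as integers or tuples. Below is a sketch of a more forgiving formatter, assuming a hypothetical safe_fmt module that falls back to ~p for anything that is not a Latin-1 string, binary or atom.

```erlang
-module(safe_fmt).
-export([chunk/2]).

%% Format one router message as the feed loop does, but use ~s only for
%% string-like terms (flat Latin-1 char lists, binaries, atoms) and fall
%% back to ~p for everything else, so an integer or tuple cannot crash it.
chunk(N, Msg) ->
    StringLike = is_atom(Msg) orelse is_binary(Msg)
                 orelse io_lib:latin1_char_list(Msg),
    Fmt = case StringLike of
              true  -> "Recvd msg #~w: '~s'";
              false -> "Recvd msg #~w: '~p'"
          end,
    io_lib:format(Fmt, [N, Msg]).
```

Swapping this into feed/3 is optional. The trade-off is that non-string messages show up in Erlang term syntax instead of crashing the connection process.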

Running in a distributed erlang system

It makes sense to run the router and mochiweb front-end(s) on different machines. Assuming you have a couple of spare machines to test this on, you should start the erlang shells as distributed nodes, i.e. use -name n1@host1.example.com instead of -sname n1 (and the same for n2). Make sure they can see each other by using net_adm:ping(…) as above.

Note that on line 16 of router.erl the router process is registered globally under the name 'router'. Because we use the following macro to identify/locate the router in calls to gen_server, it will already work fine in a distributed system:

-define(SERVER, global:whereis_name(?MODULE)).

A global name registry for processes in a distributed system is just one of the things you get for free with Erlang.
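The same global registry can be exercised on a single node. This sketch assumes a hypothetical global_demo module with a toy ping/pong protocol. It registers with global:register_name/2 and looks the process up with global:whereis_name/1, just as the ?SERVER macro does.

```erlang
-module(global_demo).
-export([start/0, ping/0]).

%% Register a process under a cluster-wide name (hypothetical: global_demo).
start() ->
    Pid = spawn(fun loop/0),
    yes = global:register_name(?MODULE, Pid),
    Pid.

%% Look the process up by name, the way ?SERVER does in router.erl.
ping() ->
    case global:whereis_name(?MODULE) of
        undefined -> {error, not_registered};
        Pid ->
            Pid ! {ping, self()},
            receive {pong, Pid} -> pong after 1000 -> timeout end
    end.

loop() ->
    receive
        {ping, From} -> From ! {pong, self()}, loop()
    end.
```

On connected nodes, ping/0 works unchanged from any node, because the name is resolved through the global registry rather than a local one.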

Generating lots of messages

In a real environment we might see a long-tail usage pattern, with some very active users and many infrequent users. However, for this test we’ll just indiscriminately spam random users with fake messages.

Download: msggen.erl
-module(msggen).
-export([start/3]).

start(0, _, _) -> ok;
start(Num, Interval, Max) ->
    Id = random:uniform(Max),     % rand:uniform/1 in current OTP releases
    % Num is an integer, so convert it before appending it to the string
    router:send(Id, "Fake message Num = " ++ integer_to_list(Num)),
    receive after Interval -> start(Num -1, Interval, Max) end.

This will send Num messages to random user Ids between 1 and Max, waiting Interval ms between each send.

You can see this in action if you run the router and the mochiweb app, connect with your browser to http://localhost:8000/test/3 then run:

erl -sname test
(test@localhost)1> net_adm:ping(n1@localhost).
pong
(test@localhost)2> c(msggen).
{ok,msggen}
(test@localhost)3> msggen:start(20, 10, 5).
ok

This will send 20 messages to random Ids between 1-5, with a 10ms wait between messages. Chances are Id 3 will receive a message or four.

We can even run a few of these in parallel to simulate multiple sources for messages. Here’s an example of spawning 10 processes that each send 20 messages to ids 1-5 with a 100ms delay between each message:

[ spawn(fun() -> msggen:start(20, 100, 5), io:format("~w finished.\n", [self()]) end) || _ <- lists:seq(1,10) ].
[<0.97.0>,<0.98.0>,<0.99.0>,<0.100.0>,<0.101.0>,<0.102.0>,
<0.103.0>,<0.104.0>,<0.105.0>,<0.106.0>]
<0.101.0> finished.
<0.105.0> finished.
<0.106.0> finished.
<0.104.0> finished.
<0.102.0> finished.
<0.98.0> finished.
<0.99.0> finished.
<0.100.0> finished.
<0.103.0> finished.
<0.97.0> finished.
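Rather than having every worker print "finished", the shell can wait for all of them. This sketch uses spawn_monitor/1 and 'DOWN' messages, which is a swapped-in technique rather than the article's. It assumes a hypothetical par_run module.

```erlang
-module(par_run).
-export([run/2]).

%% Spawn N monitored workers running Fun, then block until every one has
%% exited. Returns the exit reasons in spawn order.
run(N, Fun) ->
    Refs = [begin
                {_Pid, Ref} = spawn_monitor(Fun),
                Ref
            end || _ <- lists:seq(1, N)],
    [receive {'DOWN', Ref, process, _, Reason} -> Reason end || Ref <- Refs].
```

Usage: par_run:run(10, fun() -> msggen:start(20, 100, 5) end) returns once all ten generators are done. A crashed worker shows up as a reason other than normal.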

C10K again, with feeling

We now have the pieces we need to run a larger-scale test. Clients connect to our mochiweb app, which registers them with the message router. We can generate a high volume of fake messages to fire at the router, which will send them to any registered clients. Let’s run the 10,000 concurrent-user test from Part 1 again, but this time we’ll leave all the clients connected for a while as we blast lots of messages through the system.

Assuming you followed the instructions in Part 1 to tune your kernel and increase your max files ulimit etc, this should be easy. You already have the mochiweb app and router running, so let’s dump more traffic on it.

Without any clients connected, the mochiweb beam process uses around 40MB (resident):

$ ps -o rss= -p `pgrep -f 'sname n1'`
40156

This greps for the process ID of the command with 'sname n1' in it, which is our mochiweb erlang process. It then uses some formatting options to ps to print the RSS value, i.e. the resident memory size in KB.

I concocted this hideous one-liner to print the timestamp (human readable and a unixtime in case we need it later), current memory usage of mochiweb (resident KB), and the number of currently established connections every 60 seconds – leave this running on the mochiweb machine in a spare terminal:

$ MOCHIPID=`pgrep -f 'name n1'`; while [ 1 ] ; do NUMCON=`netstat -n | awk '/ESTABLISHED/ && $4=="127.0.0.1:8000"' | wc -l`; MEM=`ps -o rss= -p $MOCHIPID`; echo -e "`date`\t`date +%s`\t$MEM\t$NUMCON"; sleep 60; done | tee -a mochimem.log
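An in-VM alternative to the shell loop is to sample the Erlang node's own counters. This is a sketch assuming a hypothetical memlog module, and it measures different things from the shell loop:

  • erlang:memory(total) reports bytes allocated by the emulator, which is not the same as the RSS figure ps prints;
  • it counts processes rather than established TCP connections.

```erlang
-module(memlog).
-export([sample/2]).

%% Take Count samples, IntervalMs apart, of {UnixTime, VmBytes, ProcessCount}.
sample(Count, IntervalMs) when Count > 0 ->
    sample(Count, IntervalMs, []).

sample(1, _IntervalMs, Acc) ->
    lists:reverse([row() | Acc]);
sample(Count, IntervalMs, Acc) ->
    Row = row(),
    timer:sleep(IntervalMs),
    sample(Count - 1, IntervalMs, [Row | Acc]).

row() ->
    {os:system_time(second), erlang:memory(total),
     erlang:system_info(process_count)}.
```

Run it on the mochiweb node. For example, memlog:sample(60, 60000) collects an hour of one-minute samples.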

If anyone knows a better way to plot memory usage for a single process over time, please leave a comment.

Now launch the floodtest tool from Part 1 in a new erl shell:

erl> floodtest:start("/tmp/mochi-urls.txt", 10).

This will establish 100 new connections per second until all 10,000 clients are connected. You’ll see that it quickly reaches 10k connections:

erl> floodtest:start("/tmp/mochi-urls.txt", 10).
Stats: {825,0,0}
Stats: {1629,0,0}
Stats: {2397,0,0}
Stats: {3218,0,0}
Stats: {4057,0,0}
Stats: {4837,0,0}
Stats: {5565,0,0}
Stats: {6295,0,0}
Stats: {7022,0,0}
Stats: {7727,0,0}
Stats: {8415,0,0}
Stats: {9116,0,0}
Stats: {9792,0,0}
Stats: {10000,0,0}
...

Check the hideous memory usage one-liner output:

Mon Oct 20 16:57:24 BST 2008 1224518244 40388 1
Mon Oct 20 16:58:25 BST 2008 1224518305 41120 263
Mon Oct 20 16:59:27 BST 2008 1224518367 65252 5267
Mon Oct 20 17:00:32 BST 2008 1224518432 89008 9836
Mon Oct 20 17:01:37 BST 2008 1224518497 90748 10001
Mon Oct 20 17:02:41 BST 2008 1224518561 90964 10001
Mon Oct 20 17:03:46 BST 2008 1224518626 90964 10001
Mon Oct 20 17:04:51 BST 2008 1224518691 90964 10001

It reached 10k concurrent connections (plus one I had open in firefox) and the resident memory size of mochiweb is around 90MB (90964KB).

Now unleash some messages:

erl> [ spawn(fun() -> msggen:start(1000000, 100, 10000) end) || _ <- lists:seq(1,100) ].
[<0.65.0>,<0.66.0>,<0.67.0>,<0.68.0>,<0.69.0>,<0.70.0>,
<0.71.0>,<0.72.0>,<0.73.0>,<0.74.0>,<0.75.0>,<0.76.0>,
<0.77.0>,<0.78.0>,<0.79.0>,<0.80.0>,<0.81.0>,<0.82.0>,
<0.83.0>,<0.84.0>,<0.85.0>,<0.86.0>,<0.87.0>,<0.88.0>,
<0.89.0>,<0.90.0>,<0.91.0>,<0.92.0>,<0.93.0>|...]

That’s 100 processes each sending a million messages at a rate of 10 messages a second to random Ids from 1 to 10,000. That means the router is seeing 1000 messages per second, and on average each of our 10k clients will get one message every 10 seconds.
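The rates in this paragraph follow from simple arithmetic, sketched here as a hypothetical loadcalc:plan/4 helper. It ignores the time spent inside router:send/2, so real generators run a little slower than the nominal rate.

```erlang
-module(loadcalc).
-export([plan/4]).

%% Procs generators, each sending Msgs messages IntervalMs apart,
%% spread uniformly over Users ids.
plan(Procs, Msgs, IntervalMs, Users) ->
    PerProcRate = 1000 / IntervalMs,          % messages/sec per generator
    RouterRate = Procs * PerProcRate,         % messages/sec at the router
    #{router_msgs_per_sec => RouterRate,
      secs_between_msgs_per_user => Users / RouterRate,
      run_hours => Msgs / PerProcRate / 3600}.
```

loadcalc:plan(100, 1000000, 100, 10000) gives three figures:

  • 1000.0 messages/sec at the router;
  • one message per user every 10.0 seconds;
  • a nominal run time of about 27.8 hours.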

Check the output in the floodtest shell, and you’ll see clients are receiving http chunks (remember it was {NumConnected, NumClosed, NumChunksRecvd}):

...
Stats: {10000,0,5912}
Stats: {10000,0,15496}
Stats: {10000,0,25145}
Stats: {10000,0,34755}
Stats: {10000,0,44342}
...

A million messages at a rate of 10 per second per process will take 100,000 seconds (nearly 28 hours) to complete. Here’s how the memory usage looks after just 10 mins:

Mon Oct 20 16:57:24 BST 2008 1224518244 40388 1
Mon Oct 20 16:58:25 BST 2008 1224518305 41120 263
Mon Oct 20 16:59:27 BST 2008 1224518367 65252 5267
Mon Oct 20 17:00:32 BST 2008 1224518432 89008 9836
Mon Oct 20 17:01:37 BST 2008 1224518497 90748 10001
Mon Oct 20 17:02:41 BST 2008 1224518561 90964 10001
Mon Oct 20 17:03:46 BST 2008 1224518626 90964 10001
Mon Oct 20 17:04:51 BST 2008 1224518691 90964 10001
Mon Oct 20 17:05:55 BST 2008 1224518755 90980 10001
Mon Oct 20 17:07:00 BST 2008 1224518820 91120 10001
Mon Oct 20 17:08:05 BST 2008 1224518885 98664 10001
Mon Oct 20 17:09:10 BST 2008 1224518950 106752 10001
Mon Oct 20 17:10:15 BST 2008 1224519015 114044 10001
Mon Oct 20 17:11:20 BST 2008 1224519080 119468 10001
Mon Oct 20 17:12:25 BST 2008 1224519145 125360 10001

You can see the size already crept up from 40MB to 90MB when all 10k clients were connected, and to 125MB after running a bit longer.

It’s worth pointing out that the floodtest shell is almost CPU-bound, while the msggen shell is using 2% CPU and the router and mochiweb less than 1%. In other words, only simulating lots of clients uses much CPU; the server app itself is very light on the CPU. It helps to have multiple machines, or a multicore CPU, for testing.

Results after running for 24 hours

I ran this for 24 hours, whilst logging memory usage of the mochiweb process to mochimem.log. This is with 10,000 connected clients, and 1000 messages per second being sent to random clients.

The following bit of bash/awk was used to trick gnuplot into turning the mochimem.log file into a graph:

(echo -e "set terminal png size 500,300\nset xlabel \"Minutes Elapsed\"\nset ylabel \"Mem (KB)\"\nset title \"Mem usage with 10k active connections, 1000 msg/sec\"\nplot \"-\" using 1:2 with lines notitle" ; awk 'BEGIN{FS="\t";} NR%10==0 {if(!t){t=$2} mins=($2-t)/60; printf("%d %d\n",mins,$3)}' mochimem.log ; echo -e "end" ) | gnuplot > mochimem.png

[Figure: Memory usage with c10k, 1000 msg/sec, 24 hrs]

This graph shows the memory usage (with 10k active connections and 1000 msgs/sec) levels off at around 250MB over a 24 hour period. The two big drops, once near the start and once at the end of the test, are when I ran this in the mochiweb erlang process, just out of curiosity:

erl> [erlang:garbage_collect(P) || P <- erlang:processes()].

This forces all processes to garbage collect, and it reclaimed around 100MB of memory. Next up, we investigate ways to save memory without resorting to manually forcing garbage collection.
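To put a number on what the forced collection reclaims, the one-liner can be wrapped to sum per-process memory before and after. This sketch assumes a hypothetical gc_probe module. It only counts what process_info(P, memory) reports for each process (stack, heap and internal structures), not binaries or other VM allocations.

```erlang
-module(gc_probe).
-export([reclaim/0]).

%% Returns {BytesBefore, BytesAfter} summed over all live processes.
reclaim() ->
    Before = total_proc_memory(),
    [erlang:garbage_collect(P) || P <- erlang:processes()],
    After = total_proc_memory(),
    {Before, After}.

%% process_info/2 returns undefined for processes that died meanwhile;
%% the {memory, Bytes} pattern filters those out.
total_proc_memory() ->
    lists:sum([Bytes || P <- erlang:processes(),
                        {memory, Bytes} <- [erlang:process_info(P, memory)]]).
```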

Reducing memory usage in mochiweb

Seeing as the mochiweb app is just sending messages and then immediately forgetting them, the memory usage shouldn’t need to increase with the number of messages sent.

I’m a novice when it comes to Erlang memory management, but I’m going to assume that if I can force it to garbage collect more often, it will allow us to reclaim much of that memory, and ultimately let us serve more users with less overall system memory. We might burn a bit more CPU in the process, but that’s an acceptable trade-off.

Digging around in the erlang docs yields this option:

erlang:system_flag(fullsweep_after, Number)

Number is a non-negative integer which indicates how many times generational garbage collections can be done without forcing a fullsweep collection. The value applies to new processes; processes already running are not affected.
In low-memory systems (especially without virtual memory), setting the value to 0 can help to conserve memory.
An alternative way to set this value is through the (operating system) environment variable ERL_FULLSWEEP_AFTER.

Sounds intriguing, but it only applies to new processes and would affect all processes in the VM, not just our mochiweb processes.
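If the VM-wide scope is the concern, fullsweep_after can also be given to a single process at spawn time through spawn_opt/2. The minimal sketch below uses hypothetical module and function names. It spawns one idle process with its own fullsweep_after and reads the value back from erlang:process_info/2. Applying this to mochiweb's connection processes would still mean changing how mochiweb spawns them.

```erlang
-module(gc_opts).
-export([spawn_with_fullsweep/1, fullsweep_after_of/1]).

%% Spawn an idle process with a per-process fullsweep_after, leaving the
%% VM-wide default set by erlang:system_flag/2 untouched.
spawn_with_fullsweep(N) ->
    spawn_opt(fun idle/0, [{fullsweep_after, N}]).

idle() ->
    receive stop -> ok end.

%% Read the setting back from the process's garbage collection info.
fullsweep_after_of(Pid) ->
    {garbage_collection, Info} = erlang:process_info(Pid, garbage_collection),
    proplists:get_value(fullsweep_after, Info).
```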

Next up is this:

erlang:system_flag(min_heap_size, MinHeapSize)

Sets the default minimum heap size for processes. The size is given in words. The new min_heap_size only affects processes spawned after the change of min_heap_size has been made. The min_heap_size can be set for individual processes by use of spawn_opt/N or process_flag/2.

Could be useful, but I’m pretty sure our mochiweb processes need a bigger heap than the default value anyway. I’d like to avoid needing to patch the mochiweb source to add spawn options if possible.
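The quoted documentation also mentions process_flag/2. It lets a process change its own min_heap_size after it has started, so a request handler could in principle call it without changing how mochiweb spawns processes. The sketch below works under that assumption and uses hypothetical names. The VM may round the requested size up to its next valid heap size, so the value read back is only expected to be at least the request.

```erlang
-module(heap_tune).
-export([run_with_min_heap/2]).

%% Runs Fun in a fresh process that first raises its own min_heap_size
%% (in words) with process_flag/2. Returns {FunResult, MinHeapSizeAfter}.
run_with_min_heap(Words, Fun) when is_integer(Words), Words > 0 ->
    Parent = self(),
    Ref = make_ref(),
    spawn(fun() ->
                  _Previous = process_flag(min_heap_size, Words),
                  {garbage_collection, Info} =
                      process_info(self(), garbage_collection),
                  MinHeap = proplists:get_value(min_heap_size, Info),
                  Parent ! {Ref, Fun(), MinHeap}
          end),
    receive
        {Ref, Result, Size} -> {Result, Size}
    after 5000 ->
        erlang:error(timeout)
    end.
```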

Next to catch my eye was this:

erlang:hibernate(Module, Function, Args)

Puts the calling process into a wait state where its memory allocation has been reduced as much as possible, which is useful if the process does not expect to receive any messages in the near future.

The process will be awakened when a message is sent to it, and control will resume in Module:Function with the arguments given by Args with the call stack emptied, meaning that the process will terminate when that function returns. Thus erlang:hibernate/3 will never return to its caller.

If the process has any message in its message queue, the process will be awakened immediately in the same way as described above.

In more technical terms, what erlang:hibernate/3 does is the following. It discards the call stack for the process. Then it garbage collects the process. After the garbage collection, all live data is in one continuous heap. The heap is then shrunk to the exact same size as the live data which it holds (even if that size is less than the minimum heap size for the process).

If the size of the live data in the process is less than the minimum heap size, the first garbage collection occurring after the process has been awakened will ensure that the heap size is changed to a size not smaller than the minimum heap size.

Note that emptying the call stack means that any surrounding catch is removed and has to be re-inserted after hibernation. One effect of this is that processes started using proc_lib (also indirectly, such as gen_server processes), should use proc_lib:hibernate/3 instead to ensure that the exception handler continues to work when the process wakes up.
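The shrinking described above can be observed directly. The sketch below uses a hypothetical module, hib_demo. It allocates a large, short-lived list, then compares total_heap_size while the process blocks in an ordinary receive with the value after it calls erlang:hibernate/3. It polls process_info/2 until the process is waiting, so both measurements are taken at rest. Exact numbers will vary by OTP release.

```erlang
-module(hib_demo).
-export([compare/0, worker/0, asleep/0]).

%% Returns {Before, After}: total heap size in words of a process while it
%% blocks in a plain receive, and again once it has hibernated.
compare() ->
    Pid = spawn(?MODULE, worker, []),
    ok = wait_until(Pid, fun(F) -> F =:= {?MODULE, worker, 0} end, 5000),
    Before = heap_words(Pid),
    Pid ! sleep,
    %% Waiting somewhere other than worker/0 means hibernate/3 has taken over.
    ok = wait_until(Pid, fun(F) -> F =/= {?MODULE, worker, 0} end, 5000),
    After = heap_words(Pid),
    Pid ! stop,
    {Before, After}.

worker() ->
    Garbage = lists:seq(1, 100000),   % about 200k words of cons cells
    100000 = length(Garbage),         % the list is dead after this line
    receive
        sleep -> erlang:hibernate(?MODULE, asleep, [])
    end.

%% Resumed here on wake-up, with an empty call stack.
asleep() ->
    receive stop -> ok end.

heap_words(Pid) ->
    {total_heap_size, Words} = erlang:process_info(Pid, total_heap_size),
    Words.

%% Poll, 1 ms apart, until Pid is blocked and Pred holds for its current function.
wait_until(_Pid, _Pred, 0) ->
    timeout;
wait_until(Pid, Pred, Tries) ->
    [{status, Status}, {current_function, Fun}] =
        erlang:process_info(Pid, [status, current_function]),
    case Status =:= waiting andalso Pred(Fun) of
        true -> ok;
        false -> timer:sleep(1), wait_until(Pid, Pred, Tries - 1)
    end.
```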

This sounds reasonable – let’s try hibernating after every message and see what happens.

Edit mochiconntest_web.erl and change the following:

  • Make the last line of the feed(Response, Id, N) function call hibernate instead of calling itself
  • Call hibernate immediately after logging into the router, rather than calling feed and blocking on receive
  • Remember to export feed/3 so hibernate can call back into the function on wake-up
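The three changes can be seen in miniature in a process that holds nothing but a counter. This is a hypothetical example, not part of mochiconntest. It uses erlang:hibernate/3 rather than proc_lib:hibernate/3 because the process is started with plain spawn/1. loop/1 is exported because hibernation resumes through an external Module:Function(Args) call.

```erlang
-module(hib_counter).
-export([start/0, loop/1]).

%% Start the counter already hibernated, as mochiconntest does after login.
start() ->
    spawn(fun() -> erlang:hibernate(?MODULE, loop, [0]) end).

%% Handle one message, then hibernate again instead of calling loop/1
%% directly. Must be exported so hibernate/3 can call back into it.
loop(N) ->
    N1 = receive
             bump -> N + 1;
             {From, get} -> From ! {self(), N}, N
         end,
    erlang:hibernate(?MODULE, loop, [N1]).
```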

Updated mochiconntest_web.erl with hibernation between messages:

    -module(mochiconntest_web).

    -export([start/1, stop/0, loop/2, feed/3]).

    %% External API

    start(Options) ->
        {DocRoot, Options1} = get_option(docroot, Options),
        Loop = fun (Req) ->
                       ?MODULE:loop(Req, DocRoot)
               end,
        % we'll set our maximum to 1 million connections. (default: 2048)
        mochiweb_http:start([{max, 1000000}, {name, ?MODULE}, {loop, Loop} | Options1]).

    stop() ->
        mochiweb_http:stop(?MODULE).

    loop(Req, DocRoot) ->
        "/" ++ Path = Req:get(path),
        case Req:get(method) of
            Method when Method =:= 'GET'; Method =:= 'HEAD' ->
                case Path of
                    "test/" ++ IdStr ->
                        Response = Req:ok({"text/html; charset=utf-8",
                                          [{"Server","Mochiweb-Test"}],
                                          chunked}),
                        {Id, _} = string:to_integer(IdStr),
                        router:login(Id, self()),
                        % Hibernate this process until it receives a message:
                        proc_lib:hibernate(?MODULE, feed, [Response, Id, 1]);
                    _ ->
                        Req:not_found()
                end;
            'POST' ->
                case Path of
                    _ ->
                        Req:not_found()
                end;
            _ ->
                Req:respond({501, [], []})
        end.

    feed(Response, Id, N) ->
        receive
        {router_msg, Msg} ->
            Html = io_lib:format("Recvd msg #~w: '~w'<br/>", [N, Msg]),
            Response:write_chunk(Html)
        end,
        % Hibernate this process until it receives a message:
        proc_lib:hibernate(?MODULE, feed, [Response, Id, N+1]).

    %% Internal API

    get_option(Option, Options) ->
        {proplists:get_value(Option, Options), proplists:delete(Option, Options)}.

I made these changes, ran make to rebuild mochiweb, then redid the same c10k test (1000msgs/sec for 24hrs).

Results after running for 24 hours w/ proc_lib:hibernate()

[Figure: Memory usage with c10k, 1000msg/sec, 24hrs, using hibernate()]

Judicious use of hibernate means the mochiweb application memory levels out at 78MB Resident with 10k connections, much better than the 450MB we saw in Part 1. There was no significant increase in CPU usage.

Summary

We made a comet application on Mochiweb that lets us push arbitrary messages to users identified by an integer ID. After pumping 1000 msgs/sec through it for 24 hours, with 10,000 connected users, we observed it using 80MB, or 8KB per user. We even made pretty graphs.

This is quite an improvement from the 45KB per user we saw in Part 1. The savings are attributed to making the application behave in a more realistic way, and to the use of hibernate for mochiweb processes between messages.

Next Steps

In Part 3 (coming soon), I’ll turn it up to 1 million connected clients. I will be deploying the test app on a multi-cpu 64-bit server with plenty of RAM. This will show what difference, if any, running on a 64-bit VM makes. I’ll also detail some additional tricks and tuning needed in order to simulate 1 million client connections.

The application will evolve into a sort of pub-sub system, where subscriptions are associated with user Ids and stored by the app, rather than provided by clients when they connect. We’ll load in a typical social-network dataset: friends. This will allow a user to log in with their user Id and automatically receive any event generated by one of their friends.

A Million-user Comet Application with Mochiweb, Part 3

Part 1 and Part 2 in this series showed how to build a comet application using mochiweb, and how to route messages to connected users. We managed to squeeze application memory down to 8KB per connection. We did ye olde c10k test, and observed what happened with 10,000 connected users. We made graphs. It was fun, but now it’s time to make good on the claims made in the title, and turn it up to 1 million connections.

This post covers the following:

  • Add a pubsub-like subscription database using Mnesia
  • Generate a realistic friends dataset for a million users
  • Tune mnesia and bulk load in our friends data
  • Opening a million connections from one machine
  • Benchmark with 1 Million connected users
  • Libevent + C for connection handling
  • Final thoughts

One of the challenging parts of this test was actually being able to open 1M connections from a single test machine. Writing a server to accept 1M connections is easier than actually creating 1M connections to test it with, so a fair amount of this article is about the techniques used to open 1M connections from a single machine.

Getting our pubsub on

In Part 2 we used the router to send messages to specific users. This is fine for a chat/IM system, but there are sexier things we could do instead. Before we launch into a large-scale test, let’s add one more module: a subscription database. We want the application to store who your friends are, so it can push you all events generated by people on your friends list.

My intention is to use this for Last.fm so I can get a realtime feed of songs my friends are currently listening to. It could equally apply to other events generated on social networks. Flickr photo uploads, Facebook newsfeed items, Twitter messages etc. FriendFeed even have a realtime API in beta, so this kind of thing is definitely topical. (Although I’ve not heard of anyone except Facebook using Erlang for this kind of thing).

Implementing the subscription-manager

We’re implementing a general subscription manager, but we’ll be subscribing people to everyone on their friends list automatically – so you could also think of this as a friends database for now.

The subsmanager API:

  • add_subscriptions([{Subscriber, Subscribee},...])
  • remove_subscriptions([{Subscriber, Subscribee},...])
  • get_subscribers(User)

subsmanager.erl

    -module(subsmanager).
    -behaviour(gen_server).
    -include("/usr/local/lib/erlang/lib/stdlib-1.15.4/include/qlc.hrl").
    -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
    -export([add_subscriptions/1,
             remove_subscriptions/1,
             get_subscribers/1,
             first_run/0,
             stop/0,
             start_link/0]).
    -record(subscription, {subscriber, subscribee}).
    -record(state, {}). % state is all in mnesia
    -define(SERVER, global:whereis_name(?MODULE)).

    start_link() ->
        gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).

    stop() ->
        gen_server:call(?SERVER, {stop}).

    add_subscriptions(SubsList) ->
        gen_server:call(?SERVER, {add_subscriptions, SubsList}, infinity).

    remove_subscriptions(SubsList) ->
        gen_server:call(?SERVER, {remove_subscriptions, SubsList}, infinity).

    get_subscribers(User) ->
        gen_server:call(?SERVER, {get_subscribers, User}).

    %%

    init([]) ->
        ok = mnesia:start(),
        io:format("Waiting on mnesia tables..\n",[]),
        mnesia:wait_for_tables([subscription], 30000),
        Info = mnesia:table_info(subscription, all),
        io:format("OK. Subscription table info: \n~w\n\n",[Info]),
        {ok, #state{}}.

    handle_call({stop}, _From, State) ->
        % reply ok before stopping, so the caller's gen_server:call returns
        {stop, normal, ok, State};

    handle_call({add_subscriptions, SubsList}, _From, State) ->
        % Transactionally is slower:
        % F = fun() ->
        %         [ ok = mnesia:write(S) || S <- SubsList ]
        %     end,
        % mnesia:transaction(F),
        [ mnesia:dirty_write(S) || S <- SubsList ],
        {reply, ok, State};

    handle_call({remove_subscriptions, SubsList}, _From, State) ->
        F = fun() ->
            [ ok = mnesia:delete_object(S) || S <- SubsList ]
        end,
        mnesia:transaction(F),
        {reply, ok, State};

    handle_call({get_subscribers, User}, From, State) ->
        F = fun() ->
            Subs = mnesia:dirty_match_object(#subscription{subscriber='_', subscribee=User}),
            Users = [Dude || #subscription{subscriber=Dude, subscribee=_} <- Subs],
            gen_server:reply(From, Users)
        end,
        spawn(F),
        {noreply, State}.

    handle_cast(_Msg, State) -> {noreply, State}.
    handle_info(_Msg, State) -> {noreply, State}.

    terminate(_Reason, _State) ->
        mnesia:stop(),
        ok.

    code_change(_OldVersion, State, _Extra) ->
        % ?MODULE is not expanded inside a string, so pass it as an argument
        io:format("Reloading code for ~w~n", [?MODULE]),
        {ok, State}.

    %%

    first_run() ->
        mnesia:create_schema([node()]),
        ok = mnesia:start(),
        Ret = mnesia:create_table(subscription,
        [
         {disc_copies, [node()]},
         {attributes, record_info(fields, subscription)},
         {index, [subscribee]}, %index subscribee too
         {type, bag}
        ]),
        Ret.

Noteworthy points:

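  • I’ve included qlc.hrl, needed for mnesia queries using list comprehensions, using an absolute path. That can’t be best practice, but it wasn’t being found otherwise. The usual idiom is -include_lib("stdlib/include/qlc.hrl"), which locates the header through the installed stdlib application rather than a version-specific path.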
  • get_subscribers spawns another process and delegates the job of replying to that process, using gen_server:reply. This means the gen_server loop won’t block on that call if we throw lots of lookups at it and mnesia slows down.
  • rr("subsmanager.erl"). in the example below allows you to use record definitions in the erl shell. Putting your record definitions into a records.hrl file and including that in your modules is considered better style; I inlined them for brevity.
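The reply-delegation trick in get_subscribers is a general gen_server pattern: return {noreply, State} from handle_call/3 and let another process answer with gen_server:reply/2. The stripped-down sketch below uses a hypothetical module, slow_lookup, and timer:sleep/1 stands in for a slow mnesia read.

```erlang
-module(slow_lookup).
-behaviour(gen_server).
-export([start_link/0, lookup/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

lookup(Key) ->
    gen_server:call(?MODULE, {lookup, Key}).

init([]) -> {ok, no_state}.

%% Hand the slow work, and the reply, to a throwaway process so the
%% server loop is free to accept the next call immediately.
handle_call({lookup, Key}, From, State) ->
    spawn(fun() ->
                  timer:sleep(100),                 % stand-in for a slow read
                  gen_server:reply(From, {ok, Key})
          end),
    {noreply, State}.

handle_cast(_Msg, State) -> {noreply, State}.
handle_info(_Msg, State) -> {noreply, State}.
terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
```

Because handle_call/3 returns at once, a second caller is not queued behind the first caller's 100 ms of work. The trade-off is that the spawned process only sees a snapshot of State and must not try to change it.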

Now to test it. first_run() creates the mnesia schema, so it’s important to run that first. Another potential gotcha with mnesia is that (by default) the database can only be accessed by the node that created it, so give the erl shell a name, and stick with it.

$ mkdir /var/mnesia_data
$ erl -boot start_sasl -mnesia dir '"/var/mnesia_data"' -sname subsman
(subsman@localhost)1> c(subsmanager).
{ok,subsmanager}
(subsman@localhost)2> subsmanager:first_run().
...
{atomic,ok}
(subsman@localhost)3> subsmanager:start_link().
Waiting on mnesia tables..
OK. Subscription table info:
[{access_mode,read_write},{active_replicas,[subsman@localhost]},{arity,3},{attributes,[subscriber,subscribee]},{checkpoints,[]},{commit_work,[{index,bag,[{3,{ram,57378}}]}]},{cookie,{{1224,800064,900003},subsman@localhost}},{cstruct,{cstruct,subscription,bag,[],[subsman@localhost],[],0,read_write,[3],[],false,subscription,[subscriber,subscribee],[],[],{{1224,863164,904753},subsman@localhost},{{2,0},[]}}},{disc_copies,[subsman@localhost]},{disc_only_copies,[]},{frag_properties,[]},{index,[3]},{load_by_force,false},{load_node,subsman@localhost},{load_order,0},{load_reason,{dumper,create_table}},{local_content,false},{master_nodes,[]},{memory,288},{ram_copies,[]},{record_name,subscription},{record_validation,{subscription,3,bag}},{type,bag},{size,0},{snmp,[]},{storage_type,disc_copies},{subscribers,[]},{user_properties,[]},{version,{{2,0},[]}},{where_to_commit,[{subsman@localhost,disc_copies}]},{where_to_read,subsman@localhost},{where_to_write,[subsman@localhost]},{wild_pattern,{subscription,'_','_'}},{{index,3},57378}]

{ok,<0.105.0>}
(subsman@localhost)4> rr("subsmanager.erl").
[state,subscription]
(subsman@localhost)5> subsmanager:add_subscriptions([ #subscription{subscriber=alice, subscribee=rj} ]).
ok
(subsman@localhost)6> subsmanager:add_subscriptions([ #subscription{subscriber=bob, subscribee=rj} ]).
ok
(subsman@localhost)7> subsmanager:get_subscribers(rj).
[bob,alice]
(subsman@localhost)8> subsmanager:remove_subscriptions([ #subscription{subscriber=bob, subscribee=rj} ]).
ok
(subsman@localhost)9> subsmanager:get_subscribers(rj).
[alice]
(subsman@localhost)10> subsmanager:get_subscribers(charlie).
[]

We’ll use integer Ids to represent users for the benchmark – but for this test I used atoms (rj, alice, bob) and assumed that alice and bob are both on rj’s friends list. It’s nice that mnesia (and ets/dets) doesn’t care what values you use – any Erlang term is valid. This means it’s a simple upgrade to support multiple types of resource. You could start using {user, 123} or {photo, 789} to represent different things people might subscribe to, without changing anything in the subsmanager module.
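The point about arbitrary terms can be checked without mnesia, since ets tables behave the same way. The sketch below uses a hypothetical module and an ets bag keyed by the subject being followed, rather than the subsmanager's mnesia table. Tuple keys such as {user, 123} and {photo, 789} sit side by side in the same table.

```erlang
-module(any_key).
-export([demo/0]).

%% Returns the sorted followers of {user, 123} and of {photo, 789}.
demo() ->
    T = ets:new(subs, [bag]),
    true = ets:insert(T, [{{user, 123}, alice},
                          {{user, 123}, bob},
                          {{photo, 789}, alice}]),
    Followers = fun(Subject) ->
                        lists:sort([W || {_, W} <- ets:lookup(T, Subject)])
                end,
    Result = {Followers({user, 123}), Followers({photo, 789})},
    true = ets:delete(T),
    Result.
```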

Modifying the router to use subscriptions

Instead of addressing messages to specific users, i.e. router:send(123, "Hello user 123"), we’ll mark messages with a subject, that is, the person who generated the message (who played the song, who uploaded the photo, etc.), and have the router deliver the message to every user who has subscribed to the subject user. In other words, the API will work like this: router:send(123, "Hello everyone subscribed to user 123")

Updated router.erl:

    -module(router).
    -behaviour(gen_server).

    -export([start_link/0]).
    -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

    -export([send/2, login/2, logout/1]).

    -define(SERVER, global:whereis_name(?MODULE)).

    % will hold bidirectional mapping between id <-> pid
    -record(state, {pid2id, id2pid}).

    start_link() ->
        gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).

    % sends Msg to anyone subscribed to Id
    send(Id, Msg) ->
        gen_server:call(?SERVER, {send, Id, Msg}).

    login(Id, Pid) when is_pid(Pid) ->
        gen_server:call(?SERVER, {login, Id, Pid}).

    logout(Pid) when is_pid(Pid) ->
        gen_server:call(?SERVER, {logout, Pid}).

    %%

    init([]) ->
        % set this so we can catch death of logged in pids:
        process_flag(trap_exit, true),
        % use ets for routing tables
        {ok, #state{
                    pid2id = ets:new(?MODULE, [bag]),
                    id2pid = ets:new(?MODULE, [bag])
                   }
        }.

    handle_call({login, Id, Pid}, _From, State) when is_pid(Pid) ->
        ets:insert(State#state.pid2id, {Pid, Id}),
        ets:insert(State#state.id2pid, {Id, Pid}),
        link(Pid), % tell us if they exit, so we can log them out
        %io:format("~w logged in as ~w\n",[Pid, Id]),
        {reply, ok, State};

    handle_call({logout, Pid}, _From, State) when is_pid(Pid) ->
        unlink(Pid),
        PidRows = ets:lookup(State#state.pid2id, Pid),
        case PidRows of
            [] ->
                ok;
            _ ->
                IdRows = [ {I,P} || {P,I} <- PidRows ], % invert tuples
                ets:delete(State#state.pid2id, Pid),   % delete all pid->id entries
                [ ets:delete_object(State#state.id2pid, Obj) || Obj <- IdRows ] % and all id->pid
        end,
        %io:format("pid ~w logged out\n",[Pid]),
        {reply, ok, State};

    handle_call({send, Id, Msg}, From, State) ->
        F = fun() ->
            % get users who are subscribed to Id:
            Users = subsmanager:get_subscribers(Id),
            io:format("Subscribers of ~w = ~w\n",[Id, Users]),
            % get pids of anyone logged in from Users list:
            Pids0 = lists:map(
                fun(U)->
                    [ P || { _I, P } <- ets:lookup(State#state.id2pid, U) ]
                end,
                [ Id | Users ] % we are always subscribed to ourselves
            ),
            Pids = lists:flatten(Pids0),
            io:format("Pids: ~w\n", [Pids]),
            % send Msg to them all
            M = {router_msg, Msg},
            [ Pid ! M || Pid <- Pids ],
            % respond with how many users saw the message
            gen_server:reply(From, {ok, length(Pids)})
        end,
        spawn(F),
        {noreply, State}.

    % handle death and cleanup of logged in processes
    handle_info(Info, State) ->
        case Info of
            {'EXIT', Pid, _Why} ->
                handle_call({logout, Pid}, blah, State);
            Wtf ->
                io:format("Caught unhandled message: ~w\n", [Wtf])
        end,
        {noreply, State}.

    handle_cast(_Msg, State) ->
        {noreply, State}.
    terminate(_Reason, _State) ->
        ok.
    code_change(_OldVsn, State, _Extra) ->
        {ok, State}.

And here’s a quick test that doesn’t require mochiweb – I’ve used atoms instead of user ids, and omitted some output for clarity:

(subsman@localhost)1> c(subsmanager), c(router), rr("subsmanager.erl").
(subsman@localhost)2> subsmanager:start_link().
(subsman@localhost)3> router:start_link().
(subsman@localhost)4> Subs = [#subscription{subscriber=alice, subscribee=rj}, #subscription{subscriber=bob, subscribee=rj}].
[#subscription{subscriber = alice,subscribee = rj},
#subscription{subscriber = bob,subscribee = rj}]
(subsman@localhost)5> subsmanager:add_subscriptions(Subs).
ok
(subsman@localhost)6> router:send(rj, "RJ did something").
Subscribers of rj = [bob,alice]
Pids: []
{ok,0}
(subsman@localhost)7> router:login(alice, self()).
ok
(subsman@localhost)8> router:send(rj, "RJ did something").
Subscribers of rj = [bob,alice]
Pids: [<0.46.0>]
{ok,1}
(subsman@localhost)9> receive {router_msg, M} -> io:format("~s\n",[M]) end.
RJ did something
ok

This shows how alice can receive a message when the subject is someone she is subscribed to (rj), even though the message wasn’t sent directly to alice. The output shows that the router identified the possible targets as [bob,alice] but delivered the message to only one person, alice, because bob was not logged in.
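The fan-out step inside handle_call({send, ...}) can be isolated as a function over the id2pid table. The sketch below uses a hypothetical module, fanout. It replaces the router's lists:map/lists:flatten pair with a single list comprehension, which yields the same flat list of pids.

```erlang
-module(fanout).
-export([pids_for/2, deliver/3]).

%% Every pid logged in under any of Ids, from an id -> pid bag table.
pids_for(Id2Pid, Ids) ->
    [Pid || Id <- Ids, {_Id, Pid} <- ets:lookup(Id2Pid, Id)].

%% Send {router_msg, Msg} to each of them; return how many were reached.
deliver(Id2Pid, Ids, Msg) ->
    Pids = pids_for(Id2Pid, Ids),
    _ = [Pid ! {router_msg, Msg} || Pid <- Pids],
    length(Pids).
```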

Generating a typical social-network friends dataset

We could generate lots of friend relationships at random, but that’s not particularly realistic. Social networks tend to exhibit a power-law degree distribution: a few super-popular users (some Twitter users have over 100,000 followers) and plenty of people with just a handful of friends. The Last.fm friends data is typical; it fits a Barabási–Albert graph model, so that’s what I’ll use.
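Here is a small sketch of how a Barabási–Albert graph is grown by preferential attachment. It is not what the author ran (the dataset below comes from igraph), and its seeding is an assumption: node M starts linked to nodes 0..M-1. Each later node then links to M distinct existing nodes, chosen with probability proportional to degree. It does this with the common trick of sampling uniformly from an array of edge endpoints. The sketch is only practical for small N.

```erlang
-module(ba_graph).
-export([edges/2]).

%% Undirected edges {New, Old} on nodes 0..N-1 (hypothetical helper).
edges(N, M) when is_integer(N), is_integer(M), M >= 1, N > M ->
    %% Seed: node M linked to nodes 0..M-1 (one simple seeding choice).
    Seed = [{M, T} || T <- lists:seq(0, M - 1)],
    Ends0 = add_ends(Seed, array:new()),
    {Edges, _Ends} =
        lists:foldl(
          fun(New, {Acc, Ends}) ->
                  Targets = maps:keys(pick(M, Ends, #{})),
                  NewEdges = [{New, T} || T <- Targets],
                  {NewEdges ++ Acc, add_ends(NewEdges, Ends)}
          end,
          {Seed, Ends0}, lists:seq(M + 1, N - 1)),
    Edges.

%% Each edge contributes both endpoints, so a node appears once per degree.
add_ends(Edges, Arr) ->
    lists:foldl(fun({A, B}, Acc) -> push(B, push(A, Acc)) end, Arr, Edges).

push(X, Arr) -> array:set(array:size(Arr), X, Arr).

%% Uniform draws from the endpoint array are degree-proportional node draws;
%% keep drawing until M distinct targets have been collected.
pick(M, _Ends, Chosen) when map_size(Chosen) =:= M -> Chosen;
pick(M, Ends, Chosen) ->
    Node = array:get(rand:uniform(array:size(Ends)) - 1, Ends),
    pick(M, Ends, Chosen#{Node => true}).
```

With N nodes the result has M*(N-M) edges, so edges(100, 3) returns 291 of them.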

To generate the dataset I’m using the python module from the excellent igraph library:

fakefriends.py:

    import igraph
    g = igraph.Graph.Barabasi(1000000, 15, directed=False)
    print("Edges: " + str(g.ecount()) + " Vertices: " + str(g.vcount()))
    g.write_edgelist("fakefriends.txt")

This generates fakefriends.txt with two user ids per line, space-separated. These are the friend relationships we’ll load into our subsmanager. igraph numbers vertices from zero, so user ids range from 0 to 999,999.

Bulk loading friends data into mnesia

This small module will read the fakefriends.txt file and create a list of subscription records.

readfriends.erl – to read the fakefriends.txt and create subscription records:

    -module(readfriends).
    -export([load/1]).
    -record(subscription, {subscriber, subscribee}).

    load(Filename) ->
        for_each_line_in_file(Filename,
            fun(Line, Acc) ->
                [As, Bs] = string:tokens(string:strip(Line, right, $\n), " "),
                {A, _} = string:to_integer(As),
                {B, _} = string:to_integer(Bs),
                [ #subscription{subscriber=A, subscribee=B} | Acc ]
            end, [read], []).

    % via: http://www.trapexit.org/Reading_Lines_from_a_File
    for_each_line_in_file(Name, Proc, Mode, Accum0) ->
        {ok, Device} = file:open(Name, Mode),
        for_each_line(Device, Proc, Accum0).

    for_each_line(Device, Proc, Accum) ->
        case io:get_line(Device, "") of
            eof  -> file:close(Device), Accum;
            Line -> NewAccum = Proc(Line, Accum),
                    for_each_line(Device, Proc, NewAccum)
        end.

Now in the subsmanager shell, you can read from the text file and add the subscriptions:

$ erl -name router@minifeeds4.gs2 +K true +A 128 -setcookie secretcookie -mnesia dump_log_write_threshold 50000 -mnesia dc_dump_limit 40
erl> c(readfriends), c(subsmanager).
erl> subsmanager:first_run().
erl> subsmanager:start_link().
erl> subsmanager:add_subscriptions( readfriends:load("fakefriends.txt") ).

Note the additional mnesia parameters – these are to avoid the ** WARNING ** Mnesia is overloaded messages you would (probably) otherwise see. Refer to my previous post: On bulk loading data into Mnesia for alternative ways to load in lots of data. The best solution seems to be (as pointed out in the comments, thanks Jacob!) to set those options. The Mnesia reference manual contains many other settings under Configuration Parameters, and is worth a look.
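The same two mnesia parameters can also be set from code instead of on the erl command line, as application environment values applied before mnesia:start/0 runs. The sketch below uses a hypothetical module. It loads the mnesia application first and then overrides the two values, which mnesia reads when it starts.

```erlang
-module(mnesia_tuning).
-export([apply_env/0]).

%% Call before mnesia:start/0; mirrors
%% -mnesia dump_log_write_threshold 50000 -mnesia dc_dump_limit 40
apply_env() ->
    case application:load(mnesia) of
        ok -> ok;
        {error, {already_loaded, mnesia}} -> ok
    end,
    ok = application:set_env(mnesia, dump_log_write_threshold, 50000),
    ok = application:set_env(mnesia, dc_dump_limit, 40),
    ok.
```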

Turning it up to 1 Million

Creating a million TCP connections from one host is non-trivial. I’ve a feeling that people who do this regularly have small clusters dedicated to simulating lots of client connections, probably running a real tool like Tsung. Even with the tuning from Part 1 to increase kernel TCP memory, increase the file descriptor ulimits and set the local port range to the maximum, we will still hit a hard limit on ephemeral ports. When making a TCP connection, the client end is allocated (or you can specify) a port from the range in /proc/sys/net/ipv4/ip_local_port_range. It doesn’t matter whether you specify it manually or use an ephemeral port; you’re still going to run out. In Part 1, we set the range to "1024 65535". The range is inclusive, so there are 65535 - 1024 + 1 = 64512 unprivileged ports available. Some of them will be used by other processes, but we’ll never get over 64512 client connections from one local address, because we’ll run out of ports.

The local port range is assigned per IP, so if we make our outgoing connections specifically from a range of different local IP addresses, we’ll be able to open more than 64512 outgoing connections in total.
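The arithmetic behind the port limit and the number of source addresses is small enough to spell out. The sketch below uses hypothetical helper names and counts both ends of ip_local_port_range as usable. Counted that way, 1024 through 65535 gives 64,512 ports.

```erlang
-module(port_math).
-export([ports_available/2, ips_needed/2]).

%% ip_local_port_range is inclusive at both ends.
ports_available(Low, High) when High >= Low ->
    High - Low + 1.

%% Source addresses needed for Target connections at PerIp connections
%% per address (integer ceiling division).
ips_needed(Target, PerIp) when PerIp > 0 ->
    (Target + PerIp - 1) div PerIp.
```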

So let’s bring up 17 new IP addresses, with the intention of making 62,000 connections from each, giving us a total of 17 × 62,000 = 1,054,000 connections. That is safely over the 2^20 (1,048,576) mark:

$ for i in `seq 1 17`; do sudo ifconfig eth0:$i 10.0.0.$i up ; done

If you run ifconfig now you should see your virtual interfaces: eth0:1, eth0:2 … eth0:17, each with a different IP address. Obviously you should choose a sensible part of whatever address space you are using.
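Each virtual address then needs its own block of user ids to connect as, with 10.0.0.i serving ids (i-1)*62000+1 through i*62000. The helper below is hypothetical and assumes the same 10.0.0.x addresses. It generates that mapping as a list of {LocalIP, FirstId, LastId} tuples rather than typing it out by hand.

```erlang
-module(flood_config).
-export([config/2]).

%% One {LocalIP, FirstId, LastId} tuple per virtual interface 10.0.0.I,
%% giving each address a contiguous, inclusive block of Each user ids.
config(NumIps, Each) when NumIps >= 1, Each >= 1 ->
    [{{10, 0, 0, I}, (I - 1) * Each + 1, I * Each}
     || I <- lists:seq(1, NumIps)].
```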

All that remains now is to modify the floodtest tool from Part 1 to specify the local IP it should connect from… Unfortunately the erlang http client doesn’t let you specify the source IP. Neither does ibrowse, the alternative http client library. Damn.

[crazy idea]
At this point I considered another option: bringing up 17 pairs of IPs, one on the server and one on the client, each pair in their own isolated /30 subnet. I think that if I then made the client connect to any given server IP, it would force the local address to be the other half of the pair on that subnet, because only one of the local IPs would actually be able to reach the server IP. In theory, this would mean declaring the local source IP on the client machine would not be necessary (although the range of server IPs would need to be specified). I don’t know if this would really work; it sounded plausible at the time. In the end I decided it was too perverted and didn’t try it.
[/crazy idea]

I also poked around in OTP’s http_transport code and considered adding support for specifying the local IP. It’s not really a feature you usually need in an HTTP client though, and it would certainly have been more work.

gen_tcp lets you specify the source address, so I ended up writing a rather crude client using gen_tcp specifically for this test:

floodtest2.erl

    -module(floodtest2).
    -compile(export_all).
    -define(SERVERADDR, "10.1.2.3"). % where mochiweb is running
    -define(SERVERPORT, 8000).

    % Generate the config in bash like so (choose some available address space):
    % EACH=62000; for i in `seq 1 17`; do echo "{{10,0,0,$i}, $((($i-1)*$EACH+1)), $(($i*$EACH))}, "; done

    run(Interval) ->
        Config = [
            {{10,0,0,1}, 1, 62000},
            {{10,0,0,2}, 62001, 124000},
            {{10,0,0,3}, 124001, 186000},
            {{10,0,0,4}, 186001, 248000},
            {{10,0,0,5}, 248001, 310000},
            {{10,0,0,6}, 310001, 372000},
            {{10,0,0,7}, 372001, 434000},
            {{10,0,0,8}, 434001, 496000},
            {{10,0,0,9}, 496001, 558000},
            {{10,0,0,10}, 558001, 620000},
            {{10,0,0,11}, 620001, 682000},
            {{10,0,0,12}, 682001, 744000},
            {{10,0,0,13}, 744001, 806000},
            {{10,0,0,14}, 806001, 868000},
            {{10,0,0,15}, 868001, 930000},
            {{10,0,0,16}, 930001, 992000},
            {{10,0,0,17}, 992001, 1054000}],
        start(Config, Interval).

    start(Config, Interval) ->
        Monitor = monitor(),
        % 'receive ... after' needs an integer timeout in milliseconds
        AdjustedInterval = round(Interval / length(Config)),
        [ spawn(?MODULE, start, [Lower, Upper, Ip, AdjustedInterval, Monitor])
          || {Ip, Lower, Upper} <- Config ],
        ok.

    % opens one connection per id in the inclusive range LowerID..UpperID
    start(LowerID, UpperID, _, _, _) when LowerID > UpperID -> done;
    start(LowerID, UpperID, LocalIP, Interval, Monitor) ->
        Path = "/test/" ++ integer_to_list(LowerID),
        spawn(?MODULE, connect, [?SERVERADDR, ?SERVERPORT, LocalIP, Path, Monitor]),
        receive after Interval -> start(LowerID + 1, UpperID, LocalIP, Interval, Monitor) end.

    connect(ServerAddr, ServerPort, ClientIP, Path, Monitor) ->
        % {ip, ClientIP} binds the local (source) address of the socket
        Opts = [binary, {packet, 0}, {ip, ClientIP}, {reuseaddr, true}, {active, false}],
        {ok, Sock} = gen_tcp:connect(ServerAddr, ServerPort, Opts),
        Monitor ! open,
        % chunked responses need an HTTP/1.1 request line
        ReqL = io_lib:format("GET ~s HTTP/1.1\r\nHost: ~s\r\n\r\n", [Path, ServerAddr]),
        Req = list_to_binary(ReqL),
        ok = gen_tcp:send(Sock, [Req]),
        do_recv(Sock, Monitor),
        (catch gen_tcp:close(Sock)),
        ok.

    do_recv(Sock, Monitor)->
        case gen_tcp:recv(Sock, 0) of
            {ok, B} ->
                Monitor ! {bytes, size(B)},
                io:format("Recvd ~s\n", [ binary_to_list(B)]),
                io:format("Recvd ~w bytes\n", [size(B)]),
                do_recv(Sock, Monitor);
            {error, closed} ->
                Monitor ! closed,
                closed;
            Other ->
                Monitor ! closed,
                io:format("Other:~w\n",[Other])
        end.

    % Monitor process receives stats and reports how much data we received etc:
    monitor() ->
        Pid = spawn(?MODULE, monitor0, [{0,0,0,0}]),
        timer:send_interval(10000, Pid, report),
        Pid.

    monitor0({Open, Closed, Chunks, Bytes}=S) ->
        receive
            % keep looping after a report, or the monitor exits after the first one
            report  -> io:format("{Open, Closed, Chunks, Bytes} = ~w\n",[S]), monitor0(S);
            open    -> monitor0({Open + 1, Closed, Chunks, Bytes});
            closed  -> monitor0({Open, Closed + 1, Chunks, Bytes});
            chunk   -> monitor0({Open, Closed, Chunks + 1, Bytes});
            {bytes, B} -> monitor0({Open, Closed, Chunks, Bytes + B})
        end.

As an initial test I was connecting to the mochiweb app from Part 1 – it simply sends one message to every client every 10 seconds.

erl> c(floodtest2), floodtest2:run(20).

This quickly ate all my memory.

Turns out opening lots of connections with gen_tcp like that eats a lot of ram. I think it’d need ~36GB to make it work without any additional tuning. I’m not interested in trying to optimise my quick-hack erlang http client (in the real world, this would be 1M actual web browsers), and the only machine I could get my hands on that has more than 32GB of RAM is one of our production databases, and I can’t find a good excuse to take Last.fm offline whilst I test this :) Additionally, it seems like it still only managed to open around 64,500 ports. Hmm.

At this point I decided to break out the trusty libevent, which I was pleased to discover has an HTTP API. Newer versions also have an evhttp_connection_set_local_address function in the HTTP API. This sounds promising.

Here’s the http client in C using libevent:

httpclient.c
    #include <sys/types.h>
    #include <sys/time.h>
    #include <sys/queue.h>
    #include <stdlib.h>
    #include <err.h>
    #include <event.h>
    #include <evhttp.h>
    #include <unistd.h>
    #include <stdio.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <time.h>
    #include <pthread.h>

    #define BUFSIZE 4096
    #define NUMCONNS 62000
    #define SERVERADDR "10.103.1.43"
    #define SERVERPORT 8000
    #define SLEEP_MS 10

    char buf[BUFSIZE];

    int bytes_recvd = 0;
    int chunks_recvd = 0;
    int closed = 0;
    int connected = 0;

    // called per chunk received
    void chunkcb(struct evhttp_request * req, void * arg)
    {
        int s = evbuffer_remove( req->input_buffer, buf, BUFSIZE );
        //printf("Read %d bytes: %s\n", s, buf);
        bytes_recvd += s;
        chunks_recvd++;
        if(connected >= NUMCONNS && chunks_recvd%10000==0)
            printf(">Chunks: %d\tBytes: %d\tClosed: %d\n", chunks_recvd, bytes_recvd, closed);
    }

    // gets called when request completes
    void reqcb(struct evhttp_request * req, void * arg)
    {
        closed++;
    }

    int main(int argc, char **argv)
    {
        event_init();
        struct evhttp_connection *evhttp_connection;
        struct evhttp_request *evhttp_request;
        char addr[16];
        char path[32]; // eg: "/test/123"
        int i,octet;
        for(octet=1; octet<=17; octet++){
            sprintf(addr, "10.224.0.%d", octet);
            for(i=1;i<=NUMCONNS;i++) {
                evhttp_connection = evhttp_connection_new(SERVERADDR, SERVERPORT);
                evhttp_connection_set_local_address(evhttp_connection, addr);
                evhttp_connection_set_timeout(evhttp_connection, 864000); // 10 day timeout
                evhttp_request = evhttp_request_new(reqcb, NULL);
                evhttp_request->chunk_cb = chunkcb;
                sprintf(path, "/test/%d", ++connected);
                if(i%100==0)  printf("Req: %s\t->\t%s\n", addr, path);
                evhttp_make_request( evhttp_connection, evhttp_request, EVHTTP_REQ_GET, path );
                event_loop( EVLOOP_NONBLOCK );
                if( connected % 200 == 0 )
                    printf("\nChunks: %d\tBytes: %d\tClosed: %d\n", chunks_recvd, bytes_recvd, closed);
                usleep(SLEEP_MS*1000);
            }
        }
        event_dispatch();
        return 0;
    }

Most parameters are hardcoded as #define’s so you configure it by editing the source and recompiling.

Compile and run:

$ gcc -o httpclient httpclient.c -levent
$ ./httpclient

This still failed to open more than 64,500 ports, although it used less RAM doing it.

It turns out that although I was specifying the local addresses, the ephemeral port allocation somewhere in the kernel or tcp stack didn’t care, and still ran out after 2^16. So in order to open more than 64,500 connections, you need to specify the local address and local port yourself, and manage them accordingly.
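At the socket level, "specify the local address and local port yourself" means calling bind() with an explicit address and port before connect(), instead of letting the kernel pick an ephemeral port. A minimal sketch with plain POSIX sockets rather than libevent; the helper name connect_from is hypothetical, and SO_REUSEADDR is set to mirror the {reuseaddr, true} option in the Erlang client above.

```c
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Open a TCP connection from an explicit (local_ip, local_port) pair.
 * Returns the connected socket fd, or -1 on failure. */
int connect_from(const char *local_ip, unsigned short local_port,
                 const char *server_ip, unsigned short server_port)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0)
        return -1;

    /* Let bind() succeed even if an earlier connection on this
     * address/port pair is still in TIME_WAIT. */
    int one = 1;
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof one);

    /* Choose the local endpoint ourselves instead of an ephemeral port. */
    struct sockaddr_in local;
    memset(&local, 0, sizeof local);
    local.sin_family = AF_INET;
    local.sin_port = htons(local_port);
    if (inet_pton(AF_INET, local_ip, &local.sin_addr) != 1 ||
        bind(fd, (struct sockaddr *)&local, sizeof local) < 0) {
        close(fd);
        return -1;
    }

    struct sockaddr_in remote;
    memset(&remote, 0, sizeof remote);
    remote.sin_family = AF_INET;
    remote.sin_port = htons(server_port);
    if (inet_pton(AF_INET, server_ip, &remote.sin_addr) != 1 ||
        connect(fd, (struct sockaddr *)&remote, sizeof remote) < 0) {
        close(fd);
        return -1;
    }
    return fd;
}
```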

Unfortunately the libevent HTTP API doesn’t have an option to specify the local port. I patched libevent to add a suitable function:

void evhttp_connection_set_local_port(struct evhttp_connection *evcon, u_short port);

This was a surprisingly pleasant experience; libevent seems well written, and the documentation is pretty decent too.

With my modified libevent installed, I was able to add the following under the set_local_address line in the above code:

evhttp_connection_set_local_port(evhttp_connection, 1024+i);

With that in place, multiple connections from different addresses were able to use the same local port number, specific to the local address. I recompiled the client and let it run for a bit to confirm it would break the 2^16 barrier.
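This works because the kernel identifies a TCP connection by the full (local address, local port, remote address, remote port) tuple, so each of the 17 virtual IPs gets its own port space. The hypothetical helper below reproduces the client's numbering, assuming the patched evhttp_connection_set_local_port(evhttp_connection, 1024+i) line has been added to the loop: user id n maps to one local address and one local port, for 17 × 62,000 = 1,054,000 usable endpoints.

```c
/* Values copied from httpclient.c; BASEPORT matches the 1024+i call. */
#define NUMCONNS 62000L   /* connections per local address */
#define NUMADDRS 17L      /* virtual IPs 10.224.0.1 .. 10.224.0.17 */
#define BASEPORT 1024L

/* Map a 1-based user id (the client's `connected` counter) to the last
 * octet of the local address and the local port used for it.
 * Returns 0 on success, -1 if uid is out of range. */
int local_endpoint(long uid, int *octet, unsigned short *port)
{
    if (uid < 1 || uid > NUMADDRS * NUMCONNS)
        return -1;
    long idx = uid - 1;                                  /* 0-based index */
    *octet = (int)(idx / NUMCONNS) + 1;                  /* outer loop */
    *port = (unsigned short)(BASEPORT + idx % NUMCONNS + 1); /* 1024 + i */
    return 0;
}
```

For example, user 62,001 is the first connection from 10.224.0.2 and reuses local port 1025.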

Netstat confirms it:

# netstat -n | awk '/^tcp/ {t[$NF]++}END{for(state in t){print state, t[state]}}'
TIME_WAIT 8
ESTABLISHED 118222

This shows how many ports are open in various states. We’re finally able to open more than 2^16 connections, phew.
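The same per-state count can also be taken straight from /proc/net/tcp, without netstat. A minimal sketch, assuming the Linux /proc/net/tcp layout in which the fourth column is the socket state in hex (01 = ESTABLISHED, 06 = TIME_WAIT, 0A = LISTEN). It covers IPv4 only, so /proc/net/tcp6 would need the same treatment; the function names are hypothetical.

```c
#include <stdio.h>
#include <string.h>

#define TCP_NSTATES 16

/* Names for the kernel's TCP state numbers (include/net/tcp_states.h). */
static const char *tcp_state_name(unsigned st)
{
    static const char *names[] = {
        "UNKNOWN", "ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1",
        "FIN_WAIT2", "TIME_WAIT", "CLOSE", "CLOSE_WAIT", "LAST_ACK",
        "LISTEN", "CLOSING", "NEW_SYN_RECV"
    };
    return st < sizeof names / sizeof names[0] ? names[st] : "UNKNOWN";
}

/* Count sockets per state from a stream in /proc/net/tcp format.
 * counts[] is zeroed first. Returns the number of socket lines parsed. */
long tally_tcp_states(FILE *f, long counts[TCP_NSTATES])
{
    char line[512];
    long n = 0;
    memset(counts, 0, TCP_NSTATES * sizeof counts[0]);
    if (fgets(line, sizeof line, f) == NULL)   /* skip the header line */
        return 0;
    while (fgets(line, sizeof line, f) != NULL) {
        unsigned st;
        /* fields: "sl:", local_address, rem_address, st (hex), ... */
        if (sscanf(line, "%*s %*s %*s %x", &st) == 1 && st < TCP_NSTATES) {
            counts[st]++;
            n++;
        }
    }
    return n;
}
```

Passing it fopen("/proc/net/tcp", "r") and printing tcp_state_name(s) next to each non-zero counts[s] gives output in the same shape as the awk version.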

Now we have a tool capable of opening a million http connections from a single box. It seems to consume around 2KB per connection, plus whatever the kernel needs. It’s time to use it for the “million connected user” test against our mochiweb comet server.

C1024K Test – 1 million comet connections

For this test I used 4 different servers of varying specs. These specs may be overpowered for the experiment, but they were available and waiting to go into production, and this made a good burn-in test. All four servers are on the same gigabit LAN, with up to 3 switches and a router in the middle somewhere.

The 1 million test I ran is similar to the 10k test from parts 1 and 2, the main difference being the modified client, now written in C using libevent, and that I’m running in a proper distributed-erlang setup with more than one machine.

On server 1 – Quad-core 2GHz CPU, 16GB of RAM

  • Start subsmanager
  • Load in the friends data
  • Start the router

On server 2 – Dual Quad-core 2.8GHz CPU, 32GB of RAM

  • Start mochiweb app

On server 3 – Quad-core 2GHz CPU, 16GB of RAM

  • Create 17 virtual IPs as above
  • Install patched libevent
  • Run client: ./httpclient to create 100 connections per second, up to 1M

On server 4 – Dual-core 2GHz, 2GB RAM

  • Run msggen program, to send lots of messages to the router

I measured the memory usage of mochiweb during the ramp-up to a million connections, and for the rest of the day:

[Graph: mochiweb memory, 1M connections]

The httpclient has a built-in delay of 10ms between connections, so it took nearly 3 hours to open a million connections. The resident memory used by the mochiweb process with 1M open connections was around 25GB. Here's the server this was running on as seen by Ganglia, which measures CPU, network and memory usage and produces nice graphs:

[Graph: Ganglia view of the server running mochiweb, 1M connections]

You can see it needs around 38GB and has started to swap. I suspect the difference is mostly consumed by the kernel to keep those connections open. The uplift at the end is when I started sending messages.

Messages were generated using 1,000 processes, with an average time between messages of 60ms per process, giving around 16,666 messages per second overall:

erl> [ spawn(msggen, start, [1000000, 10+random:uniform(100), 1000000]) || _ <- lists:seq(1,1000) ].

The machine (server-4) generating messages looked like this on Ganglia:

[Graph: Ganglia view of server 4, 16,666 messages/s]

That’s 10 MB per second of messages it’s pumping out – 16,666 messages a second. Typically these messages would come from a message bus, app servers, or part of an existing infrastructure.

When I started sending messages, the load on server 1 (hosting subsmanager and router) stayed below 1, and CPU utilization increased from 0 to 5%.

CPU on server 2 (hosting mochiweb app, with 1M connections) increased more dramatically:

[Graph: CPU on the mochiweb server]

Naturally as processes have to leave their hibernate state to handle messages, memory usage will increase slightly. Having all connections open with no messages is a best-case for memory usage – unsurprisingly, actually doing stuff requires more memory.

So where does this leave us? To be on the safe side, the mochiweb machine would need 40GB of RAM to hold open 1M active comet connections. Under load, up to 30GB of the memory would be used by the mochiweb app, and the remaining 10GB by the kernel. In other words, you need to allow 40KB per connection.
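The sizing rule is a straight multiplication. A sketch using the figures above (roughly 30KB in the mochiweb VM plus 10KB in the kernel per connection), assuming 1KB = 1024 bytes; the helper names are hypothetical.

```c
#include <stdint.h>

/* Per-connection budget under load, from the measurements above. */
#define APP_BYTES_PER_CONN    (30ULL * 1024)   /* mochiweb VM */
#define KERNEL_BYTES_PER_CONN (10ULL * 1024)   /* kernel / TCP stack */
#define BYTES_PER_CONN (APP_BYTES_PER_CONN + KERNEL_BYTES_PER_CONN)

/* RAM needed to hold `conns` open comet connections, in bytes. */
uint64_t ram_needed(uint64_t conns)
{
    return conns * BYTES_PER_CONN;
}

/* How many connections fit in `ram_bytes` of memory. */
uint64_t max_conns(uint64_t ram_bytes)
{
    return ram_bytes / BYTES_PER_CONN;
}
```

On this budget a 32GB machine would hold roughly 838,000 connections.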

During various tests with lots of connections, I ended up making some additional changes to my sysctl.conf. This was partly trial-and-error; I don't really know enough about the internals to make especially informed decisions about which values to change. My policy was to wait for things to break, check /var/log/kern.log and see what mysterious error was reported, then increase stuff that sounded sensible after a spot of googling. Here are the settings in place during the above test:

net.core.rmem_max = 33554432
net.core.wmem_max = 33554432
net.ipv4.tcp_rmem = 4096 16384 33554432
net.ipv4.tcp_wmem = 4096 16384 33554432
net.ipv4.tcp_mem = 786432 1048576 26777216
net.ipv4.tcp_max_tw_buckets = 360000
net.core.netdev_max_backlog = 2500
vm.min_free_kbytes = 65536
vm.swappiness = 0
net.ipv4.ip_local_port_range = 1024 65535
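One detail worth knowing when reading these values: net.ipv4.tcp_mem is measured in memory pages, whereas tcp_rmem and tcp_wmem are in bytes. A small sketch that converts a tcp_mem triple to bytes; the page size is passed in (on Linux it can be obtained with sysconf(_SC_PAGESIZE), commonly 4096), and the function name is hypothetical. With 4KB pages the values above come to 3GiB (min), 4GiB (pressure) and about 102GiB (max).

```c
#include <stdio.h>

/* Parse "min pressure max" (in pages, as in /proc/sys/net/ipv4/tcp_mem)
 * and convert each value to bytes. Returns 0 on success, -1 on parse error. */
int tcp_mem_bytes(const char *text, unsigned long long page_size,
                  unsigned long long out[3])
{
    unsigned long long pages[3];
    if (sscanf(text, "%llu %llu %llu", &pages[0], &pages[1], &pages[2]) != 3)
        return -1;
    for (int i = 0; i < 3; i++)
        out[i] = pages[i] * page_size;
    return 0;
}
```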

I would like to learn more about Linux tcp tuning so I can make a more informed decision about these settings. These are almost certainly not optimal, but at least they were enough to get to 1M connections. These changes, along with the fact this is running on a 64-bit Erlang VM, and thus has a word size of 8 bytes instead of 4, might explain why the memory usage is much higher than I observed during the C10k test of part 2.

An Erlang C-Node using Libevent

After dabbling with the HTTP api for libevent, it seemed entirely sensible to try the 1M connection test against a libevent HTTPd written in C so we have a basis for comparison.

I’m guessing that enabling kernel poll means the erlang VM is able to use epoll (or similar), but even so there’s clearly some overhead involved which we might be able to mitigate by delegating the connection handling to a C program using libevent. I want to reuse most of the Erlang code so far, so let’s do the bare minimum in C – just the connection handling and HTTP stuff.
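For readers unfamiliar with epoll: kernel poll (the Erlang VM's +K true option) lets one thread wait on a very large set of sockets, with each descriptor registered once and each wait returning only the ready ones. A minimal Linux-only sketch of that pattern; the helper names are hypothetical, and a real server would loop on wait_ready() and read from each returned fd.

```c
#include <sys/epoll.h>
#include <unistd.h>

#define MAX_EVENTS 64

int watcher_open(void)
{
    return epoll_create1(0);
}

void watcher_close(int ep)
{
    close(ep);
}

/* Register fd once; the kernel then tracks its readiness, so the cost of a
 * wait scales with the number of ready fds, not the number registered. */
int watch_fd(int ep, int fd)
{
    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.u64 = 0;          /* clear the whole union before setting fd */
    ev.data.fd = fd;
    return epoll_ctl(ep, EPOLL_CTL_ADD, fd, &ev);
}

/* Block up to timeout_ms; copy up to max_ready readable fds into ready[].
 * Returns how many were copied, or -1 on error. */
int wait_ready(int ep, int *ready, int max_ready, int timeout_ms)
{
    struct epoll_event events[MAX_EVENTS];
    if (max_ready > MAX_EVENTS)
        max_ready = MAX_EVENTS;
    if (max_ready <= 0)
        return 0;
    int n = epoll_wait(ep, events, max_ready, timeout_ms);
    for (int i = 0; i < n; i++)
        ready[i] = events[i].data.fd;
    return n;
}
```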

Libevent has an asynchronous HTTP API, which makes implementing http servers trivial – well, trivial for C, but still less trivial than mochiweb IMO ;) I’d also been looking for an excuse to try the Erlang C interface, so the following program combines the two. It’s a comet http server in C using libevent which identifies users using an integer Id (like our mochiweb app), and also acts as an Erlang C-Node.

It connects to a designated erlang node, listens for messages like {123, <<"Hello user 123">>}, then dispatches “Hello user 123” to user 123, if connected. Messages for users that are not connected are discarded, just like previous examples.

httpdcnode.c

    #include <sys/types.h>
    #include <sys/time.h>
    #include <sys/queue.h>
    #include <stdlib.h>
    #include <string.h>
    #include <err.h>
    #include <event.h>
    #include <evhttp.h>
    #include <stdio.h>
    #include <sys/socket.h>
    #include <netinet/in.h>

    #include "erl_interface.h"
    #include "ei.h"

    #include <pthread.h>

    #define BUFSIZE 1024
    #define MAXUSERS (17*65536) // C1024K

    // List of current http requests by uid:
    struct evhttp_request * clients[MAXUSERS+1];
    // Memory to store uids passed to the cleanup callback:
    int slots[MAXUSERS+1];

    // called when user disconnects
    void cleanup(struct evhttp_connection *evcon, void *arg)
    {
        int *uidp = (int *) arg;
        fprintf(stderr, "disconnected uid %d\n", *uidp);
        clients[*uidp] = NULL;
    }

    // handles http connections, sets them up for chunked transfer,
    // extracts the user id and registers in the global connection table,
    // also sends a welcome chunk.
    void request_handler(struct evhttp_request *req, void *arg)
    {
        struct evbuffer *buf;
        buf = evbuffer_new();
        if (buf == NULL){
            err(1, "failed to create response buffer");
        }

        evhttp_add_header(req->output_headers, "Content-Type", "text/html; charset=utf-8");

        int uid = -1;
        if(strncmp(evhttp_request_uri(req), "/test/", 6) == 0){
            uid = atoi( 6+evhttp_request_uri(req) );
        }

        if(uid <= 0){
            evbuffer_add_printf(buf, "User id not found, try /test/123 instead");
            evhttp_send_reply(req, HTTP_NOTFOUND, "Not Found", buf);
            evbuffer_free(buf);
            return;
        }

        if(uid > MAXUSERS){
            evbuffer_add_printf(buf, "Max uid allowed is %d", MAXUSERS);
            evhttp_send_reply(req, HTTP_SERVUNAVAIL, "We ran out of numbers", buf);
            evbuffer_free(buf);
            return;
        }

        evhttp_send_reply_start(req, HTTP_OK, "OK");
        // Send welcome chunk:
        evbuffer_add_printf(buf, "Welcome, Url: '%s' Id: %d\n", evhttp_request_uri(req), uid);
        evhttp_send_reply_chunk(req, buf);
        evbuffer_free(buf);

        // put reference into global uid->connection table:
        clients[uid] = req;
        // set close callback
        evhttp_connection_set_closecb( req->evcon, cleanup, &slots[uid] );
    }


    // runs in a thread - the erlang c-node stuff
    // expects msgs like {uid, msg} and sends a 'msg' chunk to uid if connected
    void *cnode_run(void *arg)
    {
        int fd;                                  /* fd to Erlang node */
        int got;                                 /* Result of receive */
        unsigned char buf[BUFSIZE];              /* Buffer for incoming message */
        ErlMessage emsg;                         /* Incoming message */

        ETERM *uid, *msg;

        erl_init(NULL, 0);

        if (erl_connect_init(1, "secretcookie", 0) == -1)
            erl_err_quit("erl_connect_init");

        if ((fd = erl_connect("httpdmaster@localhost")) < 0)
            erl_err_quit("erl_connect");

        fprintf(stderr, "Connected to httpdmaster@localhost\n\r");

        struct evbuffer *evbuf;

        while (1) {
            got = erl_receive_msg(fd, buf, BUFSIZE, &emsg);
            if (got == ERL_TICK) {
                continue;
            } else if (got == ERL_ERROR) {
                fprintf(stderr, "ERL_ERROR from erl_receive_msg.\n");
                break;
            } else {
                if (emsg.type == ERL_REG_SEND) {
                    // get uid and body data from eg: {123, <<"Hello">>}
                    uid = erl_element(1, emsg.msg);
                    msg = erl_element(2, emsg.msg);
                    int userid = ERL_INT_VALUE(uid);
                    char *body = (char *) ERL_BIN_PTR(msg);
                    int body_len = ERL_BIN_SIZE(msg);
                    // Is this userid in range and connected?
                    // NB: no locking against the event-loop thread (known race)
                    if(userid > 0 && userid <= MAXUSERS && clients[userid]){
                        fprintf(stderr, "Sending %d bytes to uid %d\n", body_len, userid);
                        evbuf = evbuffer_new();
                        evbuffer_add(evbuf, (const void*)body, (size_t) body_len);
                        evhttp_send_reply_chunk(clients[userid], evbuf);
                        evbuffer_free(evbuf);
                    }else{
                        fprintf(stderr, "Discarding %d bytes to uid %d - user not connected\n",
                                body_len, userid);
                        // noop
                    }
                    // free the message terms, including the sender pid
                    erl_free_term(emsg.from);
                    erl_free_term(emsg.msg);
                    erl_free_term(uid);
                    erl_free_term(msg);
                }
            }
        }
        // if we got here, erlang connection died.
        // this thread is supposed to run forever
        // TODO - gracefully handle failure / reconnect / etc
        return NULL;
    }

    int main(int argc, char **argv)
    {
        int i;
        for(i=0;i<=MAXUSERS;i++) slots[i]=i;

        // Launch the thread that runs the cnode:
        pthread_t helper;
        pthread_create(&helper, NULL, cnode_run, NULL);

        // Launch libevent httpd:
        struct evhttp *httpd;
        event_init();
        httpd = evhttp_start("0.0.0.0", 8000);
        if (httpd == NULL)
            err(1, "evhttp_start failed on port 8000");
        evhttp_set_gencb(httpd, request_handler, NULL);
        event_dispatch();
        // Not reached, event_dispatch() shouldn't return
        evhttp_free(httpd);
        return 0;
    }

The maximum number of users is #defined, and, similarly to the mochiweb server, it listens on port 8000 and expects users to connect with a path of the form /test/ followed by a numeric user id (e.g. /test/123). Also hardcoded is the name of the erlang node it will connect to in order to receive messages, httpdmaster@localhost, and the erlang cookie, “secretcookie”. Change these accordingly.

Run the erlang node it will connect to first:

$ erl -setcookie secretcookie -sname httpdmaster@localhost

Compile and run like so:

$ gcc -o httpdcnode httpdcnode.c -lerl_interface -lei -levent -lpthread
$ ./httpdcnode

In the erlang shell, check you can see the hidden c-node:

erl> nodes(hidden).
[c1@localhost]

Now connect in your browser to http://localhost:8000/test/123. You should see the welcome message.

Now back to the erlang shell – send a message to the C node:

erl> {any, c1@localhost} ! {123, <<"Hello Libevent World">>}.

Note that we don’t have a Pid to use, so we use the alternate representation of {procname, node}. We use ‘any’ as the process name, which is ignored by the C-node.

Now you’re able to deliver comet messages via Erlang, but all the http connections are managed by a libevent C program which acts as an Erlang node.

After removing the debug print statements, I connected 1M clients to the httpdcnode server using the same client as above. The machine showed a total of just under 10GB of memory used, and the resident memory of the server process was stable at under 2GB:

[Graph: memory of the libevent-based server process, 1M connections]

So big savings compared to mochiweb when handling lots of connections – the resident memory per connection for the server process with libevent is just under 2KB. With everything connected, the server machine claims:
Mem: 32968672k total, 9636488k used, 23332184k free, 180k buffers
So the kernel/tcp stack is consuming an additional 8KB per connection, which seems a little high, but I have no basis for comparison.
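The kernel figure comes from subtracting the server's resident memory from the machine's "used" total and dividing by the connection count. A sketch of that arithmetic, parsing a top-style Mem: line; the helper names are hypothetical and the 2GB process size is an assumption standing in for "just under 2GB". In top versions of that era "used" also counts page cache, so the result is best read as an upper bound.

```c
#include <stdio.h>

/* Parse top's "Mem: <n>k total, <n>k used, <n>k free, ..." line (KB).
 * Returns 0 on success, -1 if the line does not match. */
int parse_top_mem(const char *line, unsigned long long *total_kb,
                  unsigned long long *used_kb, unsigned long long *free_kb)
{
    if (sscanf(line, "Mem: %lluk total, %lluk used, %lluk free",
               total_kb, used_kb, free_kb) != 3)
        return -1;
    return 0;
}

/* Memory used outside the server process, averaged per connection (KB). */
double other_kb_per_conn(unsigned long long used_kb,
                         unsigned long long process_kb,
                         unsigned long long conns)
{
    if (conns == 0 || process_kb > used_kb)
        return 0.0;
    return (double)(used_kb - process_kb) / (double)conns;
}
```

With the numbers above this comes to about 7.5KB per connection.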

This libevent-cnode server needs a bit more work. It doesn't sensibly handle multiple connections from the same user yet, and there's no locking, so a race condition exists if you disconnect just as a message is about to be dispatched.
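A minimal sketch of one way to close the table race: guard clients[] with a mutex and hold the lock for as long as the connection pointer is in use. The helper names are hypothetical, and void * stands in for struct evhttp_request *. This only protects the table. libevent 1.4 is not thread-safe, so calls such as evhttp_send_reply_chunk() made from the C-node thread would still need handing over to the event-loop thread (for example through a pipe the event loop watches), and multiple connections per user remain unhandled.

```c
#include <pthread.h>
#include <stddef.h>

#define MAXUSERS (17 * 65536)

/* uid -> connection table shared by the event-loop and C-node threads. */
static void *clients[MAXUSERS + 1];
static pthread_mutex_t clients_lock = PTHREAD_MUTEX_INITIALIZER;

/* Register uid's connection (request handler). -1 if uid out of range. */
int client_set(int uid, void *conn)
{
    if (uid <= 0 || uid > MAXUSERS)
        return -1;
    pthread_mutex_lock(&clients_lock);
    clients[uid] = conn;
    pthread_mutex_unlock(&clients_lock);
    return 0;
}

/* Forget uid's connection (close callback). */
void client_clear(int uid)
{
    if (uid <= 0 || uid > MAXUSERS)
        return;
    pthread_mutex_lock(&clients_lock);
    clients[uid] = NULL;
    pthread_mutex_unlock(&clients_lock);
}

/* Lock the table and return uid's connection (NULL if none or out of
 * range). The caller must always call client_release() afterwards. */
void *client_acquire(int uid)
{
    pthread_mutex_lock(&clients_lock);
    if (uid <= 0 || uid > MAXUSERS)
        return NULL;
    return clients[uid];
}

void client_release(void)
{
    pthread_mutex_unlock(&clients_lock);
}
```

In cnode_run the dispatch would then become: req = client_acquire(userid); if req is non-NULL, queue the chunk for it; then client_release(). The close callback would call client_clear(*uidp).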

Even so, I think this could be generalized in such a way that would allow you to use Erlang for all the interesting stuff, and have a C+libevent process act as a dumb connection-pool. With a bit more wrapper code and callbacks into Erlang, you’d hardly need to know this was going on – the C program could be run as a driver or a C-node, and an Erlang wrapper could give you a decent api built on top of libevent. (see this post for an example Erlang C driver). I would like to experiment further with this.

Final Thoughts

I have enough data now to judge how much hardware would be needed if we deploy a large scale comet system for Last.fm. Even a worst case of 40KB per connection isn’t unreasonable – memory is pretty cheap at the moment, and 40GB to support a million users is not unreasonable. 10GB is even better. I will finish up the app I’m building and deploy it somewhere people can try it out. Along the way I’ll tidy up the erlang memcached client I’m using and release that (from jungerl, with modifications for consistent hashing and some bug fixes), and some other things. Stay tuned :)

update: The whole article has now been reposted. For Erlang / Linux / TCP tuning this is an excellent series and well worth referencing.

Thank you, Richard Jones!


  1. refactor
    October 22nd, 2008 at 10:26 | #1

    When testing on MacOS I got an econnrefused error: the URLs in mochi-urls.txt can't use localhost, they have to use 127.0.0.1.

    I found an explanation on the Erlang mailing list:
    We have seen the same issue with CouchDB. What we found out
    is that in our case localhost was not only resolving to 127.0.0.1 (IPv4)
    but also ::1 (IPv6) and that http:request() would try to connect to
    ::1 where no service was listening.

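    Erlang's http module has IPv6 mode enabled by default. When the test program connects to localhost through the http module, it connects to the IPv6 address ::1, but mochiweb isn't listening there, so the connection fails with {error,econnrefused}.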

    There are two fixes:
    1. Change all the addresses in mochi-urls.txt to 127.0.0.1.
    2. Call http:set_options([{ipv6,disabled}]) before running http:request(…), which turns off IPv6 mode in the test program so it uses IPv4.

    In theory there's a third way: have the mochiweb server listen in IPv6 mode as well, but I don't know how to enable that.
    For reference:
    You should teach Yaws to listen also on IPv6 – “localhost” resolves not
    only to IPv4 127.0.0.1, but also to IPv6 ::1.

    A bit off topic, heh.

  2. Arbow
    October 24th, 2008 at 12:05 | #2

    In part 2:
    http://www.metabrew.com/article/a-million-user-comet-application-with-mochiweb-part-2/
    calling proc_lib:hibernate() brought it down to 8KB per user.

  3. October 28th, 2008 at 12:27 | #3

    Chinese translation: http://ro4tub.blogspot.com/2008/10/mochiwebcomet.html

  4. October 29th, 2008 at 14:50 | #4

    Nice, very good. Hibernate plus a minimum process heap size gave a further optimization.
    I wonder how the author will tackle the next step.

  5. Arbow
    November 11th, 2008 at 11:23 | #5

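    Thanks for all the work... a 3-in-1 grand roundup. Combined with libevent it's truly astonishing: a million connections. At that point bandwidth becomes the bottleneck.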

  6. idisc
    November 19th, 2008 at 14:26 | #6
  7. jackyz
    November 30th, 2008 at 21:03 | #7

    Thanks again to oscar and idisc. This post, together with the translations, will be collected into edoc later. I'm way behind on my edoc homework.
