Using wpool and pg2 to build broadcastable worker pools
wpool is my preferred erlang worker pool library. But there is no easy way to broadcast a message to all spawned workers. Or is there?
In this article we will have a look at wpool, how to easily replace a single
gen_server
with a process pool and finally how to utilize process groups to
broadcast messages to all the workers in a wpool
.
wpool
The thing that makes wpool great is how it interfaces with your existing
code. You notice that messages to a gen_server
are building up and are not
processed quickly enough. Changing a few calls from gen_server
to
wpool
is trivial and you have a fully operational worker pool to handle the task.
Example
The code for the example can be found at this repository. Each commit coincides with one of the steps below. The initial commit sets up the project with erlang.mk.
If we look at the second commit
we can see the simple gen_server
that we will use in this example. It wraps
around a Counter that increases each time we call our bump
function.
2> {ok, Pid} = broadcast:start_link().
{ok,<0.37.0>}
3> broadcast:bump(Pid).
ok
4>
=INFO REPORT==== 6-Feb-2016::15:09:24 ===
Counter of <0.37.0> is at 1
4> broadcast:bump(Pid).
=INFO REPORT==== 6-Feb-2016::15:09:27 ===
Counter of <0.37.0> is at 2
ok
So far so good. Now let’s make good on our promise to show how easy it is to wrap that in a worker pool with wpool.
The next commit
changes the calls from gen_server
to wpool
and adds it as a project
dependency so it gets downloaded and compiled by erlang.mk.
1> application:ensure_all_started(worker_pool).
{ok,[worker_pool]}
2> broadcast:start_pool().
{ok,<0.41.0>}
3> broadcast:bump().
=INFO REPORT==== 6-Feb-2016::15:26:32 ===
Counter of <0.44.0> is at 1
ok
4> broadcast:bump().
=INFO REPORT==== 6-Feb-2016::15:26:33 ===
Counter of <0.44.0> is at 2
ok
5> broadcast:bump().
=INFO REPORT==== 6-Feb-2016::15:26:34 ===
Counter of <0.44.0> is at 3
ok
6> [broadcast:bump() || X <- lists:seq(1,5)].
=INFO REPORT==== 6-Feb-2016::15:27:11 ===
Counter of <0.44.0> is at 4
=INFO REPORT==== 6-Feb-2016::15:27:11 ===
Counter of <0.53.0> is at 1
=INFO REPORT==== 6-Feb-2016::15:27:11 ===
Counter of <0.45.0> is at 1
=INFO REPORT==== 6-Feb-2016::15:27:11 ===
Counter of <0.46.0> is at 1
=INFO REPORT==== 6-Feb-2016::15:27:11 ===
Counter of <0.47.0> is at 1
Well that was easy; but how can we now send a message reliably to all workers in that pool as stated in the opening of the article?
The answer is simple. We put all of our workers into a process group. For this example we choose pg2, which is shipped with erlang. But notice that there are a lot of different process group libraries out there that might fit your use case better.
Now we have a way of accessing all our worker pids as well as an API function to send all of them a message.
1> application:ensure_all_started(worker_pool).
{ok,[worker_pool]}
=INFO REPORT==== 6-Feb-2016::15:37:01 ===
Creating wpool ETS table2>
2> l(broadcast).
{module,broadcast}
3> broadcast:start_pool().
{ok,<0.43.0>}
4> broadcast:bump().
=INFO REPORT==== 6-Feb-2016::15:37:21 ===
Counter of <0.46.0> is at 1
ok
5> broadcast:bump_all().
=INFO REPORT==== 6-Feb-2016::15:37:27 ===
Counter of <0.46.0> is at 2
=INFO REPORT==== 6-Feb-2016::15:37:27 ===
Counter of <0.47.0> is at 1
<0.58.0>
=INFO REPORT==== 6-Feb-2016::15:37:27 ===
Counter of <0.48.0> is at 1
6>
=INFO REPORT==== 6-Feb-2016::15:37:27 ===
Counter of <0.49.0> is at 1
=INFO REPORT==== 6-Feb-2016::15:37:27 ===
Counter of <0.50.0> is at 1
=INFO REPORT==== 6-Feb-2016::15:37:27 ===
Counter of <0.51.0> is at 1
=INFO REPORT==== 6-Feb-2016::15:37:27 ===
Counter of <0.52.0> is at 1
=INFO REPORT==== 6-Feb-2016::15:37:27 ===
Counter of <0.53.0> is at 1
=INFO REPORT==== 6-Feb-2016::15:37:27 ===
Counter of <0.54.0> is at 1
=INFO REPORT==== 6-Feb-2016::15:37:27 ===
Counter of <0.55.0> is at 1