Last time, we looked at how to send from multiple sockets. Believe it or not, we have pretty much introduced most of the core concepts you will need. As a recap, here is what we have already covered.
So from here on, it is just a matter of going through some of the well-known patterns from the ZeroMQ guide.
Now it would be immoral (even fraudulent) of me not to mention this up front: in the main, the information I present in the remaining posts in this series will be based quite heavily on the ZeroMQ guide by Pieter Hintjens. Pieter has actually been in touch with me regarding this series, and has been kind enough to let me run each new post by him. I think that is generous, and I am extremely pleased to have Pieter on hand to run them past. What that means to you is that if there are any misunderstandings or mistakes on my part, I am sure Pieter will point them out (at which point I will obviously correct them; hopefully I will not make any). So big thanks go out to Pieter. Cheers, as we would say in England.
It is all good publicity for ZeroMQ though, and as NetMQ is a native port, it is not one of the ones covered by the language bindings on the ZeroMQ guide site. So even though I am basing my content on the fantastic work done by Pieter, the code here uses NetMQ, so from that point of view it is still very much relevant.
Where is the Code?
As always before we start, it’s only polite to tell you where the code is, and it is as before on GitHub:
What Will We Be Doing This Time?
This time, we will continue to look at ZeroMQ patterns, which is what the remaining posts will pretty much all be focused on.
The pattern that we will look at this time involves dividing a problem domain into smaller chunks and distributing them across workers, and then collating the results back together again.
This pattern is really a “divide and conquer” one, but it has also been called “Parallel Pipeline”. With all the remaining posts, I will be linking back to the original portion of the guide such that you can read more about the problem and Pieter’s solution.
ZeroMQ Guide Divide And Conquer: http://zguide.zeromq.org/page:all#Divide-and-Conquer
The idea is that you have something that generates work, and then distributes the work out to n-many workers. The workers each do some work, and push their results to some other process (could be a thread too) where the workers’ results are accumulated.
The ZeroMQ guide shows an example where the work generator just tells each worker to sleep for a period of time. I toyed with creating a more elaborate example than this, but in the end felt that the example's simplicity was quite important, so I have stuck with the workload for each worker just being a value that tells the worker to sleep for a number of milliseconds (thus simulating some actual work). This, as I say, has been borrowed from the ZeroMQ guide.
In real life, the work could obviously be anything, though you would more than likely want the work to be something that could be cut up and distributed without the work generator caring/knowing how many workers there are.
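To make the shape of the pattern concrete before bringing sockets into it, here is a minimal in-process sketch. It is not NetMQ code and not taken from the guide: the class name `PipelineSketch` and its `Run` method are made up for illustration, and two `BlockingCollection<int>` queues stand in for the ventilator-to-worker and worker-to-sink connections. One difference worth noting is that a shared queue lets each worker pull the next task when it is free, whereas a PUSH socket hands tasks out to connected workers in turn.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical in-process stand-in for Ventilator -> Workers -> Sink.
public static class PipelineSketch
{
    // Pushes every workload through workerCount workers and returns
    // the results in the order the sink received them.
    public static int[] Run(int workerCount, int[] workloads)
    {
        using (var tasks = new BlockingCollection<int>())    // ventilator -> workers
        using (var results = new BlockingCollection<int>())  // workers -> sink
        {
            // Workers: take the next task, "work", then report to the sink.
            Task[] workers = Enumerable.Range(0, workerCount)
                .Select(_ => Task.Run(() =>
                {
                    foreach (int workload in tasks.GetConsumingEnumerable())
                    {
                        Thread.Sleep(workload); // simulated work
                        results.Add(workload);
                    }
                }))
                .ToArray();

            // Ventilator: hands out work without knowing how many workers exist.
            foreach (int workload in workloads)
            {
                tasks.Add(workload);
            }
            tasks.CompleteAdding();

            // Sink: collect exactly one result per task.
            var collected = new int[workloads.Length];
            for (int i = 0; i < collected.Length; i++)
            {
                collected[i] = results.Take();
            }

            Task.WaitAll(workers);
            return collected;
        }
    }

    public static void Main()
    {
        int[] collected = Run(2, new[] { 5, 10, 15, 20 });
        Console.WriteLine("Collected {0} results, {1} msec of work",
            collected.Length, collected.Sum());
    }
}
```

Changing the first argument to `Run` changes how long the batch takes, but the ventilator loop never has to change, which is the property we want from the real thing.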
Here is what we are trying to achieve:
Ventilator
using System;
using NetMQ;

namespace Ventilator
{
    public class Program
    {
        public static void Main(string[] args)
        {
            Console.WriteLine("====== VENTILATOR ======");
            using (NetMQContext ctx = NetMQContext.Create())
            {
                // Socket used to push tasks out to the workers
                using (var sender = ctx.CreatePushSocket())
                {
                    sender.Bind("tcp://*:5557");
                    // Socket used to tell the Sink that a batch is starting
                    using (var sink = ctx.CreatePushSocket())
                    {
                        sink.Connect("tcp://localhost:5558");
                        Console.WriteLine("Press enter when workers are ready");
                        Console.ReadLine();
                        Console.WriteLine("Sending start of batch to Sink");
                        sink.Send("0");
                        Console.WriteLine("Sending tasks to workers");
                        // Fixed seed, so every run generates the same workloads
                        Random rand = new Random(0);
                        int totalMs = 0;
                        // Send 100 tasks, each one a number of milliseconds to sleep
                        for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                        {
                            int workload = rand.Next(0, 100);
                            totalMs += workload;
                            Console.WriteLine("Workload : {0}", workload);
                            sender.Send(workload.ToString());
                        }
                        Console.WriteLine("Total expected cost : {0} msec", totalMs);
                        Console.WriteLine("Press Enter to quit");
                        Console.ReadLine();
                    }
                }
            }
        }
    }
}
Worker
using System;
using System.Threading;
using NetMQ;

namespace Worker
{
    public class Program
    {
        public static void Main(string[] args)
        {
            Console.WriteLine("====== WORKER ======");
            using (NetMQContext ctx = NetMQContext.Create())
            {
                // Socket used to receive tasks from the Ventilator
                using (var receiver = ctx.CreatePullSocket())
                {
                    receiver.Connect("tcp://localhost:5557");
                    // Socket used to report finished tasks to the Sink
                    using (var sender = ctx.CreatePushSocket())
                    {
                        sender.Connect("tcp://localhost:5558");
                        // Process tasks forever
                        while (true)
                        {
                            string workload = receiver.ReceiveString();
                            // Simulate some work by sleeping for the requested time
                            Thread.Sleep(int.Parse(workload));
                            Console.WriteLine("Sending to Sink");
                            sender.Send(string.Empty);
                        }
                    }
                }
            }
        }
    }
}
Sink
using System;
using System.Diagnostics;
using NetMQ;

namespace Sink
{
    public class Program
    {
        public static void Main(string[] args)
        {
            Console.WriteLine("====== SINK ======");
            using (NetMQContext ctx = NetMQContext.Create())
            {
                // Socket used to receive messages from the Ventilator and the workers
                using (var receiver = ctx.CreatePullSocket())
                {
                    receiver.Bind("tcp://localhost:5558");
                    // Wait for the Ventilator to signal the start of the batch
                    var startOfBatchTrigger = receiver.ReceiveString();
                    Console.WriteLine("Seen start of batch");
                    // Time how long it takes for all 100 results to arrive
                    Stopwatch watch = new Stopwatch();
                    watch.Start();
                    for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                    {
                        var workerDoneTrigger = receiver.ReceiveString();
                        // Print ":" for every tenth result and "." for the rest
                        if (taskNumber % 10 == 0)
                        {
                            Console.Write(":");
                        }
                        else
                        {
                            Console.Write(".");
                        }
                    }
                    watch.Stop();
                    Console.WriteLine();
                    Console.WriteLine("Total elapsed time {0} msec", watch.ElapsedMilliseconds);
                    Console.ReadLine();
                }
            }
        }
    }
}
There are a couple of batch files you can use to spin up different numbers of workers:
Run1Worker.bat : one worker
When run, this should give you output like the following in the Sink process console:
====== SINK ======
Seen start of batch
:.........:.........:.........:.........:.........:.........:.........:.........:.........:.........
Total elapsed time 5695 msec
Run2Worker.bat : two workers
When run, this should give you output like the following in the Sink process console:
====== SINK ======
Seen start of batch
:.........:.........:.........:.........:.........:.........:.........:.........:.........:.........
Total elapsed time 2959 msec
Run4Worker.bat : four workers
When run, this should give you output like the following in the Sink process console:
====== SINK ======
Seen start of batch
:.........:.........:.........:.........:.........:.........:.........:.........:.........:.........
Total elapsed time 1492 msec
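To put those three timings in perspective, the small sketch below works out the speedup relative to the one-worker run, and the efficiency (speedup divided by worker count). The class and method names are made up for illustration; the only inputs are the elapsed times quoted above. By my arithmetic, two workers come out at roughly 1.92x and four workers at roughly 3.82x, so close to linear but not quite.

```csharp
using System;
using System.Globalization;

// Hypothetical helper for comparing the Sink timings shown above.
public static class SpeedupSketch
{
    // How many times faster a run is than the single-worker baseline.
    public static double Speedup(long baselineMs, long elapsedMs)
    {
        return (double)baselineMs / elapsedMs;
    }

    // Fraction of the ideal (linear) speedup actually achieved.
    public static double Efficiency(long baselineMs, long elapsedMs, int workers)
    {
        return Speedup(baselineMs, elapsedMs) / workers;
    }

    public static void Main()
    {
        int[] workers = { 1, 2, 4 };
        long[] elapsedMs = { 5695, 2959, 1492 }; // timings from the runs above
        long baseline = elapsedMs[0];

        for (int i = 0; i < workers.Length; i++)
        {
            Console.WriteLine(string.Format(
                CultureInfo.InvariantCulture,
                "{0} worker(s): speedup {1:F2}x, efficiency {2:F2}",
                workers[i],
                Speedup(baseline, elapsedMs[i]),
                Efficiency(baseline, elapsedMs[i], workers[i])));
        }
    }
}
```

The shortfall from a perfect 2x and 4x is what you would expect from messaging overhead, plus the fact that the last few tasks cannot always be spread evenly.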
There are a couple of points to be aware of with this pattern:
- The Ventilator uses a NetMQ PushSocket to distribute work to the workers; this is referred to as load balancing.
- The Ventilator and the Sink are the static parts of the system, whereas workers are dynamic. It is trivial to add more workers: we can just spin up a new instance of a worker and, in theory, the work gets done quicker.
- We need to synchronize the start of the batch (when the workers are ready). If we did not, the first worker that connected would get more messages than the rest, which is not really load balanced.
- The Sink uses a NetMQ PullSocket to accumulate the results from the workers.
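The synchronization point is easier to see with numbers. The sketch below is a toy model, not NetMQ code: it assumes tasks are handed out in turn to whichever workers are connected at that moment, and `Distribute` and `joinAfter` are names I have made up for illustration. `joinAfter[k]` is the number of tasks already sent by the time worker `k` connects.

```csharp
using System;

// Hypothetical model of handing tasks out in turn to connected workers.
public static class LateJoinerSketch
{
    // Returns how many of taskCount tasks each worker ends up with.
    // A task sent while no worker is connected is simply skipped here.
    public static int[] Distribute(int taskCount, int[] joinAfter)
    {
        var counts = new int[joinAfter.Length];
        int next = 0; // round-robin cursor

        for (int sent = 0; sent < taskCount; sent++)
        {
            // Starting at the cursor, find the next worker that has connected.
            for (int tries = 0; tries < joinAfter.Length; tries++)
            {
                int candidate = (next + tries) % joinAfter.Length;
                if (joinAfter[candidate] <= sent)
                {
                    counts[candidate]++;
                    next = (candidate + 1) % joinAfter.Length;
                    break;
                }
            }
        }
        return counts;
    }

    public static void Main()
    {
        // All four workers connected before the batch starts.
        Console.WriteLine(string.Join(", ", Distribute(100, new[] { 0, 0, 0, 0 })));
        // Three workers only connect after 40 tasks have already gone out.
        Console.WriteLine(string.Join(", ", Distribute(100, new[] { 0, 40, 40, 40 })));
    }
}
```

In this model the synchronized case gives every worker 25 tasks, while the late-joining case leaves the first worker with 55 and the others with 15 each. In a real run the Ventilator sends its tasks very quickly, so the imbalance could well be worse than this, which is why the Ventilator waits for you to press enter before sending.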