ZeroMQ #6 : Divide And Conquer

2 Sep 2014 | CPOL | 4 min read
Divide and conquer

Last time, we looked at how to send from multiple sockets. Believe it or not, we have now introduced most of the core concepts you will need.

So from here on in, it is just a matter of going through some of the well-known patterns from the ZeroMQ guide.

Now it would be immoral (even fraudulent) of me not to mention this up front: in the main, the information I present in the remaining posts in this series will be based quite heavily on the ZeroMQ guide by Pieter Hintjens. Pieter has actually been in touch with me regarding this series, and has been kind enough to let me run each new post by him. I think that is generous of him, and I am extremely pleased to have him on hand. What that means to you is that if there are any misunderstandings or mistakes on my part, I am sure Pieter will point them out, at which point I will obviously correct them (hopefully I will not make any). So big thanks go out to Pieter. Cheers, as we would say in England.

It is all good publicity for ZeroMQ though, and as NetMQ is a native port, it is not one of the language bindings covered on the ZeroMQ guide site. So even though I am basing my content on the fantastic work done by Pieter, the examples will be written using NetMQ, and from that point of view the code is still very much relevant.

Where is the Code?

As always before we start, it’s only polite to tell you where the code is, and it is as before on GitHub:

What Will We Be Doing This Time?

This time, we will continue to look at ZeroMQ patterns, which is what pretty much all of the remaining posts will focus on.

The pattern that we will look at this time involves dividing a problem domain into smaller chunks and distributing them across workers, and then collating the results back together again.

This pattern is really a “divide and conquer” one, but it has also been called “Parallel Pipeline”. In each of the remaining posts, I will link back to the original portion of the guide so that you can read more about the problem and Pieter’s solution.

ZeroMQ Guide Divide And Conquer: http://zguide.zeromq.org/page:all#Divide-and-Conquer

The idea is that you have something that generates work, and then distributes the work out to n-many workers. The workers each do some work, and push their results to some other process (could be a thread too) where the workers’ results are accumulated.

The ZeroMQ guide shows an example in which the work generator just tells each worker to sleep for a period of time. I toyed with creating a more elaborate example than this, but in the end felt that the example's simplicity was quite important, so I have stuck with the workload for each worker being just a value that tells the worker to sleep for a number of milliseconds (thus simulating some actual work). This, as I say, has been borrowed from the ZeroMQ guide.

In real life, the work could obviously be anything, though you would more than likely want the work to be something that could be cut up and distributed without the work generator caring/knowing how many workers there are.
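To make that last point a little more concrete, here is a minimal sketch of cutting up a job without knowing the number of workers. It is not part of the demo code: the `WorkSplitter` class, the `Split` method and the chunk size of 64 are all names and values I have made up for illustration. The idea is simply that the chunk size is fixed up front, so each chunk could become one task message, however many workers happen to be connected.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of NetMQ or the demo code
public static class WorkSplitter
{
    // Cuts the half-open range [0, itemCount) into chunks of at most chunkSize items.
    // The worker count never appears here, only the fixed chunk size.
    public static List<(int Start, int Length)> Split(int itemCount, int chunkSize)
    {
        if (itemCount < 0) throw new ArgumentOutOfRangeException(nameof(itemCount));
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException(nameof(chunkSize));

        var chunks = new List<(int Start, int Length)>();
        for (int start = 0; start < itemCount; start += chunkSize)
        {
            //the last chunk may be shorter than chunkSize
            chunks.Add((start, Math.Min(chunkSize, itemCount - start)));
        }
        return chunks;
    }
}

public static class SplitDemo
{
    public static void Main()
    {
        //1000 items in chunks of 64, each chunk would become one task message
        foreach (var chunk in WorkSplitter.Split(1000, 64))
        {
            Console.WriteLine("Task: start={0}, length={1}", chunk.Start, chunk.Length);
        }
    }
}
```

With 1000 items and a chunk size of 64, this gives 16 tasks, the last of which holds the remaining 40 items.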

Here is what we are trying to achieve:

[Figure: a Ventilator pushes tasks to a number of Workers, which each push their results to a Sink]

Ventilator

C#
using System;
using NetMQ;

namespace Ventilator
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Ventilator
            // Binds PUSH socket to tcp://*:5557
            // Sends batch of tasks to workers via that socket
            Console.WriteLine("====== VENTILATOR ======");

            using (NetMQContext ctx = NetMQContext.Create())
            {
                //socket to send messages on
                using (var sender = ctx.CreatePushSocket())
                {
                    sender.Bind("tcp://*:5557");

                    using (var sink = ctx.CreatePushSocket())
                    {
                        sink.Connect("tcp://localhost:5558");

                        Console.WriteLine("Press Enter when the workers are ready");
                        Console.ReadLine();

                        //the first message is "0" and signals start of batch
                        //see the Sink.csproj Program.cs file for where this is used
                        Console.WriteLine("Sending start of batch to Sink");
                        sink.Send("0");

                        Console.WriteLine("Sending tasks to workers");

                        //initialise random number generator
                        Random rand = new Random(0);

                        //expected costs in Ms
                        int totalMs = 0;

                        //send 100 tasks (the workload for each task is just a random sleep time
                        //for the worker, in real life each worker would do more than sleep)
                        for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                        {
                            //Random workload from 0 to 99 msec (the upper bound of Next is exclusive)
                            int workload = rand.Next(0, 100);
                            totalMs += workload;
                            Console.WriteLine("Workload : {0}", workload);
                            sender.Send(workload.ToString());
                        }
                        Console.WriteLine("Total expected cost : {0} msec", totalMs);
                        Console.WriteLine("Press Enter to quit");
                        Console.ReadLine();
                    }
                }
            }
        }
    }
}

Worker

C#
using System;
using System.Threading;
using NetMQ;

namespace Worker
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Worker
            // Connects PULL socket to tcp://localhost:5557
            // Collects workloads from the Ventilator via that socket
            // Connects PUSH socket to tcp://localhost:5558
            // Sends results to Sink via that socket
            Console.WriteLine("====== WORKER ======");
            using (NetMQContext ctx = NetMQContext.Create())
            {
                //socket to receive messages on
                using (var receiver = ctx.CreatePullSocket())
                {
                    receiver.Connect("tcp://localhost:5557");
                    //socket to send messages on
                    using (var sender = ctx.CreatePushSocket())
                    {
                        sender.Connect("tcp://localhost:5558");
                        //process tasks forever
                        while (true)
                        {
                            //workload from the ventilator is a simple delay
                            //to simulate some work being done, see
                            //Ventilator.csproj Program.cs for the workload sent
                            //In real life some more meaningful work would be done
                            string workload = receiver.ReceiveString();
                            //simulate some work being done
                            Thread.Sleep(int.Parse(workload));
                            //send results to sink, sink just needs to know worker
                            //is done, message content is not important, just the presence of
                            //a message means worker is done.
                            //See Sink.csproj Program.cs
                            Console.WriteLine("Sending to Sink");
                            sender.Send(string.Empty);
                        }
                    }
                }
            }
        }
    }
}

Sink

C#
using System;
using System.Diagnostics;
using NetMQ;

namespace Sink
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Sink
            // Binds PULL socket to tcp://localhost:5558
            // Collects results from workers via that socket
            Console.WriteLine("====== SINK ======");

            using (NetMQContext ctx = NetMQContext.Create())
            {
                //socket to receive messages on
                using (var receiver = ctx.CreatePullSocket())
                {
                    receiver.Bind("tcp://localhost:5558");
                    //wait for start of batch (see Ventilator.csproj Program.cs)
                    var startOfBatchTrigger = receiver.ReceiveString();
                    Console.WriteLine("Seen start of batch");
                    //Start our clock now
                    Stopwatch watch = new Stopwatch();
                    watch.Start();
                    for (int taskNumber = 0; taskNumber < 100; taskNumber++)
                    {
                        var workerDoneTrigger = receiver.ReceiveString();
                        if (taskNumber % 10 == 0)
                        {
                            Console.Write(":");
                        }
                        else
                        {
                            Console.Write(".");
                        }
                    }
                    watch.Stop();
                    //Calculate and report duration of batch
                    Console.WriteLine();
                    Console.WriteLine("Total elapsed time {0} msec", watch.ElapsedMilliseconds);
                    Console.ReadLine();
                }
            }
        }
    }
}

There are a few batch files you can use to spin up different numbers of workers:

Run1Worker.bat : One worker

When run, this should give you output like the following in the Sink process console:

====== SINK ======
Seen start of batch
:.........:.........:.........:.........:.........:.........:.........:.........:.........:.........
Total elapsed time 5695 msec

Run2Worker.bat : Two workers

When run, this should give you output like the following in the Sink process console:

====== SINK ======
Seen start of batch
:.........:.........:.........:.........:.........:.........:.........:.........:.........:.........
Total elapsed time 2959 msec

Run4Worker.bat : Four workers

When run, this should give you output like the following in the Sink process console:

====== SINK ======
Seen start of batch
:.........:.........:.........:.........:.........:.........:.........:.........:.........:.........
Total elapsed time 1492 msec
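To put those three timings side by side, here is a small sketch of the arithmetic. The `BatchTimings` class and its method names are my own invention for illustration; only the three elapsed times come from the runs above, and your own numbers will differ from machine to machine.

```csharp
using System;
using System.Globalization;

// Hypothetical helper for comparing the Sink timings, not part of the demo code
public static class BatchTimings
{
    //speedup = time with one worker / time with n workers
    public static double Speedup(long singleWorkerMs, long parallelMs)
    {
        return (double)singleWorkerMs / parallelMs;
    }

    //efficiency = speedup / worker count (1.0 would be perfect scaling)
    public static double Efficiency(long singleWorkerMs, long parallelMs, int workers)
    {
        return Speedup(singleWorkerMs, parallelMs) / workers;
    }
}

public static class TimingsDemo
{
    public static void Main()
    {
        //elapsed times taken from the Sink output shown above
        const long oneWorkerMs = 5695;
        var runs = new[] { (Workers: 2, ElapsedMs: 2959L), (Workers: 4, ElapsedMs: 1492L) };

        foreach (var run in runs)
        {
            Console.WriteLine(string.Format(
                CultureInfo.InvariantCulture,
                "{0} workers: speedup {1:F2}x, efficiency {2:F2}",
                run.Workers,
                BatchTimings.Speedup(oneWorkerMs, run.ElapsedMs),
                BatchTimings.Efficiency(oneWorkerMs, run.ElapsedMs, run.Workers)));
        }
    }
}
```

For the figures above, that works out to a speedup of roughly 1.92x with two workers and 3.82x with four, so each doubling of workers comes close to halving the elapsed time, with a small overhead left over.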

There are a few points to be aware of with this pattern:

  • The Ventilator uses a NetMQ PushSocket to distribute work evenly across the connected workers. This is referred to as load balancing.
  • The Ventilator and the Sink are the static parts of the system, whereas the workers are dynamic. It is trivial to add more workers: we can just spin up a new instance of a worker, and in theory the work gets done quicker.
  • We need to synchronize the start of the batch (waiting until the workers are ready). If we did not, the first worker to connect would get more messages than the rest, which is not really load balanced.
  • The Sink uses a NetMQ PullSocket to accumulate the results from the workers.
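The load balancing and synchronization points can be illustrated without any sockets at all. The sketch below is a plain simulation and rests on some assumptions: that the PUSH socket deals tasks to the connected workers in turn (round-robin, as the ZeroMQ guide describes), that every task costs a fixed 50 msec, and that an unsynchronized first worker grabs the first 40 tasks on its own. The class and method names are made up for illustration.

```csharp
using System;
using System.Linq;

// Hypothetical simulation, no NetMQ involved
public static class RoundRobinSimulation
{
    //deals tasks to workers in turn (task i goes to worker i % workerCount)
    //and returns the total sleep time each worker ends up with
    public static int[] Deal(int[] taskCostsMs, int workerCount)
    {
        if (workerCount <= 0) throw new ArgumentOutOfRangeException(nameof(workerCount));

        var totals = new int[workerCount];
        for (int i = 0; i < taskCostsMs.Length; i++)
        {
            totals[i % workerCount] += taskCostsMs[i];
        }
        return totals;
    }

    //models an unsynchronized start: the first headStart tasks all go to worker 0
    //because it is the only one connected, the rest are dealt in turn
    public static int[] DealWithHeadStart(int[] taskCostsMs, int workerCount, int headStart)
    {
        int early = Math.Max(0, Math.Min(headStart, taskCostsMs.Length));
        var totals = Deal(taskCostsMs.Skip(early).ToArray(), workerCount);
        totals[0] += taskCostsMs.Take(early).Sum();
        return totals;
    }
}

public static class SimulationDemo
{
    public static void Main()
    {
        //100 tasks of 50 msec each, fixed cost to keep the numbers easy to follow
        int[] costs = Enumerable.Repeat(50, 100).ToArray();

        //the batch is only finished when the busiest worker is finished
        int fair = RoundRobinSimulation.Deal(costs, 4).Max();
        int skewed = RoundRobinSimulation.DealWithHeadStart(costs, 4, 40).Max();

        Console.WriteLine("Synchronized start   : busiest worker has {0} msec of work", fair);
        Console.WriteLine("Unsynchronized start : busiest worker has {0} msec of work", skewed);
    }
}
```

Under those assumptions, the busiest worker has 1250 msec of work with a synchronized start, but 2750 msec when the first worker gets a 40 task head start, so the batch as a whole takes more than twice as long even though the same four workers are running.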

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)