What Is ZeroMQ
ZeroMq is a C library that contains a bunch of insane sockets, that provide a very very cool abstraction over the typical socket code you would find yourself writing. It provides building blocks by way of a standard set of sockets that have been built with certain scenarios in mind.
The people that make it were instrumental in the advanced message queue protocol (AMQP) being written, and are very big in the messaging world.
There is a completely awesome book that everyone should read, it is by Pieter Hintjens
http://www.amazon.co.uk/ZeroMQ-Messaging-Applications-Pieter-Hintjens/dp/1449334067
There is also a online version of the PDF book that has full code samples, which is known as the guide:
http://zguide.zeromq.org/page:all
Why Use ZeroMq/Messaging At All
If you have ever written any asynchronous code, and have had to deal with shared state, and have had to deal with that, you will know that that brings locks/semaphores etc etc.
Now imagine a world where you don’t care about locks, semaphores etc etc, you just pass a message, and there is no shared state to worry about. Welcome to messaging. That is how you can write systems with a high throughput of messages per second, without the overhead of shared state management.
Zero is at its core a messaging framework. It can be setup in a brokerless manner or also used broker, or even peer to peer. It is sockets that make it powerful. They are the fundamental building blocks, which you may use to create large distributed architectures, or very small ones.
I would urge you all to read the book, or check out the guide, they have changed the way I think about certain tasks for sure.
Where Do I Get The ZeroMq library?
First things first, I mentioned that ZeroMq is written in C, but has many many language bindings. There is in fact a C# binding that you could use, which is the zmqcli binding. The thing with that, is that the errors you get are quite confusing at times, as it has a tendency to show the actual C error code.
I wanted (if possible) to use an entirely native port of ZeroMq, luckily for me there is just such a thing, by way of the NetMq project. That is the library I will be using throughout all my posts.
You can install NetMq using the following Nuget package manager command line:
Install-Package NetMQ
So without further ado lets get down to see a very simple example.
What Does The Example Do
The example is dead straight forward, we send a message from the client to the server, and the server sends a message back. This is a (extremely simple) example of the Request/Response pattern, of which we will see a lot more examples. Zero also supports publish/subscribe which we will look at too (though not in as much detail as request/response).
So let’s see some code shall we:
using System;
using NetMQ;
namespace HelloWorldDemo
{
class Program
{
private static void Main(string[] args)
{
using (NetMQContext ctx = NetMQContext.Create())
{
using (var server = ctx.CreateResponseSocket())
{
server.Bind("tcp://127.0.0.1:5556");
using (var client = ctx.CreateRequestSocket())
{
client.Connect("tcp://127.0.0.1:5556");
client.Send("Hello");
string fromClientMessage = server.ReceiveString();
Console.WriteLine("From Client: {0}", fromClientMessage);
server.Send("Hi Back");
string fromServerMessage = client.ReceiveString();
Console.WriteLine("From Server: {0}", fromServerMessage);
Console.ReadLine();
}
}
}
}
}
}
Believe it or not that code is enough to have a fully functioning request (client) / response (server) pattern, up and working. Don’t believe me here is some output to prove it
Ok, so it does work, that is quite mad. So how does it work?
Well there are a couple of take away points there
- We are able to create a request/response pattern, by using specialized sockets that are geared towards working in the request/response scenario.
- We can use tcp as the protocol (ZeroMq also supports others such as inproc)
- We did not have to spin up any extra thread on the server to deal with the freshly connected client socket and then continue to accept other client sockets. In fact this code could pretty much talk to 1000nds of clients without much alteration at all (in fact I will show you an example of the using separate processes)
- There is this magical NetMqContext. This is mandatory and must be used whenever you use ZeroMq sockets.
There are a couple of specific things to talk about in request/response, in that those type of sockets are assumed to be 1:1 request/response. So If you call ReceiveString() twice on the clients request socket without the servers response socket sending something you will get an Exception, as can be seen in the screen shot below.
Running In Separate Threads
This demo is to show you how you could use an internal processes messaging system. You obviously need to use new threads in this example as we need to not block on the receive() method of the sockets
Program
This just kicks of a few clients for the server to deal with
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace HelloWorldDemoSeparateThreads
{
public class Program
{
public static void Main(string[] args)
{
Server server = new Server();
server.Run();
foreach (Client client in Enumerable.Range(0, 5).Select(
x => new Client(string.Format("client {0}", x))))
{
client.Run();
}
Console.ReadLine();
}
}
}
Client
using System;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
namespace HelloWorldDemoSeparateThreads
{
sealed class Client
{
private readonly string clientName;
public Client(string clientName)
{
this.clientName = clientName;
}
public void Run()
{
Task.Run(() =>
{
using (NetMQContext ctx = NetMQContext.Create())
{
using (var client = ctx.CreateRequestSocket())
{
client.Connect("tcp://127.0.0.1:5556");
while (true)
{
client.Send(string.Format("Hello from client {0}", clientName));
string fromServerMessage = client.ReceiveString();
Console.WriteLine("From Server: {0} running on ThreadId : {1}",
fromServerMessage, Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(5000);
}
}
}
});
}
}
}
Server
using System;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
namespace HelloWorldDemoSeparateThreads
{
sealed class Server
{
public void Run()
{
Task.Run(() =>
{
using (NetMQContext ctx = NetMQContext.Create())
{
using (var server = ctx.CreateResponseSocket())
{
server.Bind("tcp://127.0.0.1:5556");
while (true)
{
string fromClientMessage = server.ReceiveString();
Console.WriteLine("From Client: {0} running on ThreadId : {1}",
fromClientMessage, Thread.CurrentThread.ManagedThreadId);
server.Send("Hi Back");
}
}
}
});
}
}
}
Running In Separate Processes
Client
using System;
using System.Threading;
using NetMQ;
namespace HelloWorldSeparateClient
{
sealed class Client
{
private string clientName;
public static void Main(string[] args)
{
Client c = new Client();
c.clientName = args[0];
c.Run();
}
public void Run()
{
using (NetMQContext ctx = NetMQContext.Create())
{
using (var client = ctx.CreateRequestSocket())
{
client.Connect("tcp://127.0.0.1:5556");
while (true)
{
client.Send(string.Format("Hello from client {0}", clientName));
string fromServerMessage = client.ReceiveString();
Console.WriteLine("From Server: {0} running on ThreadId : {1}",
fromServerMessage, Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(5000);
}
}
}
}
}
}
Server
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
namespace HelloWorldSeparateServer
{
sealed class Server
{
public static void Main(string[] args)
{
Server s = new Server();
s.Run();
}
public void Run()
{
using (NetMQContext ctx = NetMQContext.Create())
{
using (var server = ctx.CreateResponseSocket())
{
server.Bind("tcp://127.0.0.1:5556");
while (true)
{
string fromClientMessage = server.ReceiveString();
Console.WriteLine("From Client: {0} running on ThreadId : {1}",
fromClientMessage, Thread.CurrentThread.ManagedThreadId);
server.Send("Hi Back");
}
}
}
}
}
}
Here is the code running in separate processes, where it can be seen that we did not have to do a damn thing to change the server code at all we just moved it around to run in a new process rather than a new thread.
Where Is The Code For This Article?
The code for all these posts is hosted in one large solution in github:
https://github.com/sachabarber/ZeroMqDemos