Last time we looked at how to use the Poller to work with multiple sockets, and detect their readiness. This time we will continue to work with the familiar request/response model that we have been using thus far. We will however be beefing things up a bit, and shall examine several ways in which you can have more than one thread pushing messages to the server and getting responses, which is a fairly typical requirement (at least in my book it is).
Where Is The Code?
As always before we start, it’s only polite to tell you where the code is, and it is as before on GitHub:
https://github.com/sachabarber/ZeroMqDemos
One Thing Before We Start
As you may have realised by now, ZeroMQ is a messaging framework, and as such, promotes the idea of lock free messaging. I also happen to think this is a very good idea. You can achieve an excellent throughput of messages and save yourself a lot of synchronization pain, if you try and avoid shared data structures. By doing this you will also be saving yourself the pain of having to synchronize access to them. So in general try and work with ZeroMQ in the way it wants to be worked with, which is via message passing, and avoiding locks, shared data structures.
Setting The Scene For This Post
Ok so we are nearly at the point where we can start to look at some code, but before we do that, let’s just talk a little bit more about what this post is trying to discuss.
In the code I typically write, it is quite common for a bunch of client threads all to be running at once, each capable of talking to the server. If this sounds like a requirement that you have had to deal with, then you may find this post of use, as this is exactly the scenario this post is aimed at solving.
As the aim of this post is to have asynchronous client, we need a asynchronous server too, so we use DealerSocket(s) for the client(s) and a RouterSocket for the server.
As with most things there is more than one way to skin a cat, so we will look at a couple of options, each with the their own pros/cons.
Option 1 : Each Thread Has It’ Own DealerSocket
The first options does need a bit of .NET threading knowledge, but if you have that, then the idea is a simple one. For each client thread we also create a dedicated DealerSocket that *should be* used exclusively by that thread.
This is achieved using the ThreadLocal<T> .NET class, which allows us to have a DealerSocket per thread. We add each of the client created DealerSocket(s) to a Poller instance, and listen to the ReceieveReady event on each socket, which allows us to get the message back from the server.
The obvious downside to this approach is that there will be more socket(s) created on the client side. The upside is that it is very easy to implement, and just works.
Here is an image showing what we are trying to achieve here
Here is the code for this scenario:
using System;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;
namespace ManualThreadingDemo
{
public class Program
{
public void Run()
{
ThreadLocal<DealerSocket> clientSocketPerThread =
new ThreadLocal<DealerSocket>();
int delay = 3000;
Poller poller = new Poller();
using (NetMQContext ctx = NetMQContext.Create())
{
using (var server = ctx.CreateRouterSocket())
{
server.Bind("tcp://127.0.0.1:5556");
for (int i = 0; i < 3; i++)
{
Task.Factory.StartNew((state) =>
{
DealerSocket client = null;
if (!clientSocketPerThread.IsValueCreated)
{
client = ctx.CreateDealerSocket();
client.Connect("tcp://127.0.0.1:5556");
client.ReceiveReady += Client_ReceiveReady;
clientSocketPerThread.Value = client;
poller.AddSocket(client);
}
else
{
client = clientSocketPerThread.Value;
}
while (true)
{
var messageToServer = new NetMQMessage();
messageToServer.AppendEmptyFrame();
messageToServer.Append(state.ToString());
client.SendMessage(messageToServer);
Thread.Sleep(delay);
}
},string.Format("client {0}", i), TaskCreationOptions.LongRunning);
}
Task task = Task.Factory.StartNew(poller.Start);
while (true)
{
var clientMessage = server.ReceiveMessage();
Console.WriteLine("========================");
Console.WriteLine(" INCOMING CLIENT MESSAGE ");
Console.WriteLine("========================");
for (int i = 0; i < clientMessage.FrameCount; i++)
{
Console.WriteLine("Frame[{0}] = {1}", i,
clientMessage[i].ConvertToString());
}
if (clientMessage.FrameCount == 3)
{
var clientAddress = clientMessage[0];
var clientOriginalMessage = clientMessage[2].ConvertToString();
string response = string.Format("{0} back from server {1}",
clientOriginalMessage, DateTime.Now.ToLongTimeString());
var messageToClient = new NetMQMessage();
messageToClient.Append(clientAddress);
messageToClient.AppendEmptyFrame();
messageToClient.Append(response);
server.SendMessage(messageToClient);
}
}
}
}
}
void Client_ReceiveReady(object sender, NetMQSocketEventArgs e)
{
bool hasmore = false;
e.Socket.Receive(out hasmore);
if (hasmore)
{
string result = e.Socket.ReceiveString(out hasmore);
Console.WriteLine("REPLY " + result);
}
}
[STAThread]
public static void Main(string[] args)
{
Program p = new Program();
p.Run();
}
}
}
If you were to run this, you would see something like this:
Option 2 : Each Thread Delegates Of To A Local Broker
The next example keeps the idea of a separate threads that want to send message(s) to the server. This time however we will use a broker on the client side. The idea being that the client threads will push to a shared data queue, I know I have told you to avoid shared data structures. Thing is, this is not a shared data structure it is just a thread safe queue, that many threads can write to. Where as a a shared data structure may mean several threads all trying to update the current Bid rate of an Fx option quote price. There is a difference. OK the shared queue will have some synchronization somewhere to make it thread safe, thankfully we can rely on the good work of the PFX team at Microsoft for that. Those guys are smart and I am sure the Concurrent collections namespace is pretty well designed and can be trusted to be pretty optimal.
Again we need to call on a bit of .NET know how, so for the centralized queue we use a ConcurrentQueue<T>. All client threads will enqueue their messages for the server here.
There will also be another thread started. This extra thread is the one that will be processing the messages that have been queued onto the centralized queue. When there is a message taken of the centralized queue it will be sent to the server. The thing is only the thread that reads from the centralized queue will send messages to the server.
As we still want messages to be sent out asynchronously we stick with using a DealerSocket, but since their is now only one place where we send messages to the server we only need a single DealerSocket.
We add the SINGLE DealerSocket(s) to a Poller instance, and listen to the ReceieveReady event on each socket, which allows us to get the message back from the server.
This is more complex than the first example as there are more moving parts, but we no longer have loads of sockets being create. There is just one.
As before here is a diagram of what we are trying to achieve here
Here is the code for this scenario:
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
namespace ConcurrentQueueDemo
{
public class Program
{
public void Run()
{
ConcurrentQueue<string> messages = new ConcurrentQueue<string>();
int delay = 3000;
Poller poller = new Poller();
using (NetMQContext ctx = NetMQContext.Create())
{
using (var server = ctx.CreateRouterSocket())
{
server.Bind("tcp://127.0.0.1:5556");
for (int i = 0; i < 3; i++)
{
Task.Factory.StartNew((state) =>
{
while (true)
{
messages.Enqueue(state.ToString());
Thread.Sleep(delay);
}
}, string.Format("client {0}", i), TaskCreationOptions.LongRunning);
}
Task.Factory.StartNew((state) =>
{
var client = ctx.CreateDealerSocket();
client.Connect("tcp://127.0.0.1:5556");
client.ReceiveReady += Client_ReceiveReady;
poller.AddSocket(client);
while (true)
{
string clientMessage = null;
if (messages.TryDequeue(out clientMessage))
{
var messageToServer = new NetMQMessage();
messageToServer.AppendEmptyFrame();
messageToServer.Append(clientMessage);
client.SendMessage(messageToServer);
}
}
}, TaskCreationOptions.LongRunning);
Task task = Task.Factory.StartNew(poller.Start);
while (true)
{
var clientMessage = server.ReceiveMessage();
Console.WriteLine("========================");
Console.WriteLine(" INCOMING CLIENT MESSAGE ");
Console.WriteLine("========================");
for (int i = 0; i < clientMessage.FrameCount; i++)
{
Console.WriteLine("Frame[{0}] = {1}", i,
clientMessage[i].ConvertToString());
}
if (clientMessage.FrameCount == 3)
{
var clientAddress = clientMessage[0];
var clientOriginalMessage = clientMessage[2].ConvertToString();
string response = string.Format("{0} back from server {1}",
clientOriginalMessage, DateTime.Now.ToLongTimeString());
var messageToClient = new NetMQMessage();
messageToClient.Append(clientAddress);
messageToClient.AppendEmptyFrame();
messageToClient.Append(response);
server.SendMessage(messageToClient);
}
}
}
}
}
void Client_ReceiveReady(object sender, NetMQSocketEventArgs e)
{
bool hasmore = false;
e.Socket.Receive(out hasmore);
if (hasmore)
{
string result = e.Socket.ReceiveString(out hasmore);
Console.WriteLine("REPLY " + result);
}
}
[STAThread]
public static void Main(string[] args)
{
Program p = new Program();
p.Run();
}
}
}
If you were to run this, you would see something like this:
Option 3 : Use NetMQScheduler
The final option is to use the NetMQ library class : NetMQScheduler. I think the best place to start with that is by reading the link I just included. Then come back here.
…….
…….
Time passes
…….
…….
Oh hello you’re back. Ok so now you know that the NetMQScheduler offers us a way to use TPL to schedule work and that there is a Poller that we pass into the NetMQScheduler. Cool.
The NetMQScheduler is a custom TPL scheduler, which allows us to create tasks that we want done, and it will take care of the threading aspects of them. Since we told the NetMQScheduler about the Poller we want to use we are able to hook up the ReceiveReady event and use that to get messages back from the server.
The difference here is that since we are using TPL and NetMQ we need to use TPL Task(s) and the NetMQScheduler instance whenever we want to Send/Receive.
To be honest, I think I like this design the least, as it mixes up too many concepts, and the TPL stuff tends to be mixing a bit too much with the ZeroMQ goodness for my taste. I did however just want to show this example for completeness.
So the code for this example has two parts. A simple client, and then the code that spins up a client instance and then multiple threads that use the client instance to send messages to the server. There is also a basic server loop (which I will show below under the title “The Rest”)
Client Code
Here is the client code, where it can be seen that we create a NetMQScheduler which gets handed a new Poller instance to use internally. The idea is that anyone can send a message simply by calling the clients SendMessage(..)
method
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NetMQ;
namespace NetMQSchedulerDemo
{
public class Client : IDisposable
{
private readonly NetMQContext context;
private readonly string address;
private Poller poller;
private NetMQScheduler scheduler;
private NetMQSocket clientSocket;
public Client(NetMQContext context, string address)
{
this.context = context;
this.address = address;
}
public void Start()
{
poller = new Poller();
clientSocket = context.CreateDealerSocket();
clientSocket.ReceiveReady += clientSocket_ReceiveReady;
clientSocket.Connect(address);
scheduler = new NetMQScheduler(context, poller);
Task.Factory.StartNew(poller.Start, TaskCreationOptions.LongRunning);
}
void clientSocket_ReceiveReady(object sender, NetMQSocketEventArgs e)
{
string result = e.Socket.ReceiveString();
Console.WriteLine("REPLY " + result);
}
public async Task SendMessage(NetMQMessage message)
{
Task task = new Task(() => clientSocket.SendMessage(message));
task.Start(scheduler);
await task;
await ReceiveMessage();
}
public async Task ReceiveMessage()
{
Task task = new Task(() =>
{
var result = clientSocket.ReceiveString();
Console.WriteLine("REPLY " + result);
});
task.Start(scheduler);
await task;
}
public void Dispose()
{
scheduler.Dispose();
clientSocket.Dispose();
poller.Stop();
}
}
}
The Rest
And here is the rest of the code that is responsible for spinning up the client and extra threads to push messages through the client (using the SendMessage(..)
method above)
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.SqlServer.Server;
using NetMQ;
using NetMQ.Sockets;
using NetMQSchedulerDemo;
using NUnit.Framework;
namespace NetMQSchedulerDemo
{
public class Program
{
public void Run()
{
int delay = 3000;
using (NetMQContext ctx = NetMQContext.Create())
{
using (var server = ctx.CreateRouterSocket())
{
server.Bind("tcp://127.0.0.1:5556");
using (var client = new Client(ctx, "tcp://127.0.0.1:5556"))
{
client.Start();
for (int i = 0; i < 2; i++)
{
Task.Factory.StartNew(async (state) =>
{
while (true)
{
var messageToServer = new NetMQMessage();
messageToServer.AppendEmptyFrame();
messageToServer.Append(state.ToString());
await client.SendMessage(messageToServer);
Thread.Sleep(delay);
}
}, string.Format("client {0}", i), TaskCreationOptions.LongRunning);
}
while (true)
{
var clientMessage = server.ReceiveMessage();
Console.WriteLine("========================");
Console.WriteLine(" INCOMING CLIENT MESSAGE ");
Console.WriteLine("========================");
for (int i = 0; i < clientMessage.FrameCount; i++)
{
Console.WriteLine("Frame[{0}] = {1}", i,
clientMessage[i].ConvertToString());
}
if (clientMessage.FrameCount == 3)
{
var clientAddress = clientMessage[0];
var clientOriginalMessage = clientMessage[2].ConvertToString();
string response = string.Format("{0} back from server {1}",
clientOriginalMessage, DateTime.Now.ToLongTimeString());
var messageToClient = new NetMQMessage();
messageToClient.Append(clientAddress);
messageToClient.AppendEmptyFrame();
messageToClient.Append(response);
server.SendMessage(messageToClient);
}
}
}
}
}
}
[STAThread]
public static void Main(string[] args)
{
Program p = new Program();
p.Run();
}
}
}
If you run this code you may see something like this: