Last time we looked at the different socket types within ZeroMq, and I also told you their equivalent in NetMq (which is what I use for these posts).
This time, we will look at 3 small areas of ZeroMq, which are nonetheless very important areas, and should not be overlooked.
These areas are socket Options/Identity and SendMore.
Where is the Code?
The code for all these posts is hosted in one large solution in github:
Socket Options
Depending on the type of sockets you are using, or the topology you are attempting to create, you may find that you need to set some ZeroMq options. In NetMq, this is done using the xxxxSocket.Options
property.
Here is a listing of the available properties that you may set on an xxxxSocket
. It is hard to say exactly which of these values you may need to set, as that obviously depends entirely on what you are trying to achieve. All I can do is list the options, and make you aware of them. So here they are:
Affinity
BackLog
CopyMessages
DelayAttachOnConnect
Endian
GetLastEndpoint
IPv4Only
Identity
Linger
MaxMsgSize
MulticastHops
MulticastRate
MulticastRecoveryInterval
ReceiveHighWaterMark
ReceiveMore
ReceiveTimeout
ReceiveBuffer
ReconnectInterval
ReconnectIntervalMax
SendHighWaterMark
SendTimeout
SendBuffer
TcpAcceptFilter
TcpKeepAlive
TcpKeepaliveCnt
TcpKeepaliveIdle
TcpKeepaliveInterval
XPubVerbose
To see exactly what all these options mean, you will more than likely need to refer to the actual ZeroMq documentation, i.e the guide.
Identity
One of the great things (at least in my opinion) when working with ZeroMq is that we can still stick with a standard request/response arrangement (just like we had in the 1st posts hello world example http://sachabarbs.wordpress.com/2014/08/19/zeromq-1-introduction/) but we may then choose to switch to having an asynchronous server. This is easily achieved using a RouterSocket
for the server. The clients stay as RequestSocket
(s).
So this is now an interesting arrangement, we have:
- Synchronous clients, thanks to standard
RequestSocket
type - Asynchronous server, thanks to new socket called
RouterSocket
The RouterSocket
is a personal favourite of mine, as it is very easy to use (as are many of the ZeroMq sockets, once you know what they do), but it is capable of creating a server that can seamlessly talk to 1000nds of clients, all asynchronously, with very little changes to the code we saw in part 1.
Slight Diversion
When you work with RequestSocket
(s), they do something clever for you, they always provide a message that has the following frames:
- Frame[0] address
- Frame[1] empty frame
- Frame[2] the message payload
Even though all we did was send a payload (look at the “Hello World
” example in Part 1).
Likewise, when you work with ResponseSocket
(s), they also do some of the heavy lifting for us, where they always provide a message that has the following frames:
- Frame[0] return address
- Frame[1] empty frame
- Frame[2] the message payload
Even though all we did was send a payload (look at the “Hello World
” example in Part 1).
By understanding how the standard synchronous request/response socket works, it is now fairly easy to create a fully asynchronous server using the RouterSocket
, that knows how to dispatch messages back to the correct client. All we need to do is emulate how the standard ResponseSocket
works, where we construct the message frames ourselves. Where we would be looking to create the following frames from the RouterSocket
(thus emulating the behaviour of the standard ResponseSocket
).
- Frame[0] return address
- Frame[1] empty frame
- Frame[2] the message payload
I think the best way to understand this is via an example. The example works like this:
- There are 4 clients, these are standard synchronous
RequestSocket
(s) - There is a single asynchronous server, which uses a
RouterSocket
- If the client sends a message with the prefix “
_B
” it gets a special message from the server, all other clients get a standard response message
Without further ado, here is the full code for this example:
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.SqlServer.Server;
using NetMQ;
using NetMQ.Sockets;
namespace ZeroMqIdentity
{
public class Program : IDisposable
{
private List<RequestSocket> clients = new List<RequestSocket>();
public void Run()
{
using (NetMQContext ctx = NetMQContext.Create())
{
using (var server = ctx.CreateRouterSocket())
{
server.Bind("tcp://127.0.0.1:5556");
CreateClient(ctx, "A_");
CreateClient(ctx, "B_");
CreateClient(ctx, "C_");
CreateClient(ctx, "D_");
while (true)
{
var clientMessage = server.ReceiveMessage();
Console.WriteLine("========================");
Console.WriteLine(" INCOMING CLIENT MESSAGE ");
Console.WriteLine("========================");
for (int i = 0; i < clientMessage.FrameCount; i++)
{
Console.WriteLine("Frame[{0}] = {1}", i,
clientMessage[i].ConvertToString());
}
var clientAddress = clientMessage[0];
var clientOriginalMessage = clientMessage[2].ConvertToString();
string response = string.Format("{0} back from server",
clientOriginalMessage);
if (clientOriginalMessage.StartsWith("B_"))
{
response = string.Format(
"special Message for 'B' back from server");
}
var messageToClient = new NetMQMessage();
messageToClient.Append(clientAddress);
messageToClient.AppendEmptyFrame();
messageToClient.Append(response);
server.SendMessage(messageToClient);
}
}
}
Console.ReadLine();
}
private static void Main(string[] args)
{
Program p = new Program();
p.Run();
}
private void CreateClient(NetMQContext ctx, string prefix)
{
Task.Run(() =>
{
var client = ctx.CreateRequestSocket();
clients.Add(client);
client.Connect("tcp://127.0.0.1:5556");
client.Send(string.Format("{0}Hello", prefix));
var echoedServerMessage = client.ReceiveString();
Console.WriteLine(
"\r\nClient Prefix is : '{0}', Server Message : '{1}'",
prefix, echoedServerMessage);
});
}
public void Dispose()
{
foreach (var client in clients)
{
client.Dispose();
}
}
}
}
I think to full appreciate this example, one needs to examine the output, which should be something like this (it may not be exactly this, as the RouterSocket
is FULLY async
, so it may deal with RequestSocket
(s) in a different order for you:
========================
INCOMING CLIENT MESSAGE
========================
Frame[0] = ???"
Frame[1] =
Frame[2] = A_Hello
========================
INCOMING CLIENT MESSAGE
========================
Frame[0] = @??"
Frame[1] =
Frame[2] = D_Hello
Client Prefix is : ‘A_’, Server Message : ‘A_Hello back from server’
========================
INCOMING CLIENT MESSAGE
========================
Frame[0] = A??"
Frame[1] =
Frame[2] = B_Hello
Client Prefix is : ‘D_’, Server Message : ‘D_Hello back from server’
========================
INCOMING CLIENT MESSAGE
========================
Frame[0] = B??"
Frame[1] =
Frame[2] = C_Hello
Client Prefix is : ‘B_’, Server Message : ‘special Message for ‘B’ back from ser
ver’
Client Prefix is : ‘C_’, Server Message : ‘C_Hello back from server’
SendMore
ZeroMq works using message frames. Using ZeroMq, you are able to create multipart messages which you may use for a variety of reasons, such as:
- Including address information (which we just saw an example of above actually)
- Designing a protocol for your end purpose
- Sending serialized data (for example the 1st message frame could be the type of the item, and the next message frame could be the actual serialized data)
When you work with multipart messages, you must send/receive all the parts of the message you want to work with.
I think the best way to try and get to understand multipart message is perhaps via a small test. I have stuck to use an all in one demo, which builds on the original “Hello World
” request/response demo. We use NUnit to do Asserts on the data between the client/server.
Here is a small test case, where the following points should be observed:
- We construct the 1st message part and use the
xxxxSocket.SendMore()
method, to send the 1st message - We construct the 2nd (and final) message part using the
xxxxSocket.Send()
method - The Server is able to receive the 1st message part, and also assign a value to determine if there are more parts. Which is done by using an overload of the
xxxxSocket.Receive(..)
that allows us to get an out
value for “more” - We may also use an actual
NetMqMessage
and append to it, which we can then send using xxxxSocket.SendMessage
, where the receiving socket would use xxxxSocket.ReceieveMessage(..)
and can examine the actual NetMqMessage
frames
Anyway, here is the code:
using System;
using System.Threading;
using NetMQ;
using NUnit.Framework;
namespace SendMore
{
[TestFixture]
public class SendMoreTests
{
[Test]
public void SendMoreTest()
{
using (NetMQContext ctx = NetMQContext.Create())
{
using (var server = ctx.CreateResponseSocket())
{
server.Bind("tcp://127.0.0.1:5556");
using (var client = ctx.CreateRequestSocket())
{
client.Connect("tcp://127.0.0.1:5556");
client.SendMore("A");
client.Send("Hello");
bool more;
string m = server.ReceiveString(out more);
Assert.AreEqual("A", m);
Assert.IsTrue(more);
string m2 = server.ReceiveString(out more);
Assert.AreEqual("Hello", m2);
Assert.False(more);
var m3 = new NetMQMessage();
m3.Append("From");
m3.Append("Server");
server.SendMessage(m3);
var m4 = client.ReceiveMessage();
Assert.AreEqual(2, m4.FrameCount);
Assert.AreEqual("From", m4[0].ConvertToString());
Assert.AreEqual("Server", m4[1].ConvertToString());
}
}
}
}
}
}
Here are a couple of REALLY important points from the Zero Guide when working with SendMore and multi part messages, this talks about the ZeroMq C++ core implementation, not the NetMq version, but the points are just as valid when using NetMq.
Some things to know about multipart messages:
- When you send a multipart message, the first part (and all following parts) are only actually sent on the wire when you send the final part.
- If you are using zmq_poll(), when you receive the first part of a message, all the rest has also arrived.
- You will receive all parts of a message, or none at all.
- Each part of a message is a separate zmq_msg item.
- You will receive all parts of a message whether or not you check the more property.
- On sending, ØMQ queues message frames in memory until the last is received, then sends them all.
- There is no way to cancel a partially sent message, except by closing the socket.
That’s all I wanted to talk about in the post, so until the next time then.