Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / ZeroMQ

ZeroMq #3: Socket Options/Identity And SendMore

5.00/5 (1 vote)
26 Aug 2014CPOL6 min read 24.6K  
ZeroMq #3: Socket Options/Identity And SendMore

Last time we looked at the different socket types within ZeroMq, and I also told you their equivalent in NetMq (which is what I use for these posts).

This time, we will look at 3 small areas of ZeroMq, which are nonetheless very important areas, and should not be overlooked.

These areas are socket Options/Identity and SendMore.

Where is the Code?

The code for all these posts is hosted in one large solution in github:

Socket Options

Depending on the type of sockets you are using, or the topology you are attempting to create, you may find that you need to set some ZeroMq options. In NetMq, this is done using the xxxxSocket.Options property.

Here is a listing of the available properties that you may set on an xxxxSocket. It is hard to say exactly which of these values you may need to set, as that obviously depends entirely on what you are trying to achieve. All I can do is list the options, and make you aware of them. So here they are:

  • Affinity
  • BackLog
  • CopyMessages
  • DelayAttachOnConnect
  • Endian
  • GetLastEndpoint
  • IPv4Only
  • Identity
  • Linger
  • MaxMsgSize
  • MulticastHops
  • MulticastRate
  • MulticastRecoveryInterval
  • ReceiveHighWaterMark
  • ReceiveMore
  • ReceiveTimeout
  • ReceiveBuffer
  • ReconnectInterval
  • ReconnectIntervalMax
  • SendHighWaterMark
  • SendTimeout
  • SendBuffer
  • TcpAcceptFilter
  • TcpKeepAlive
  • TcpKeepaliveCnt
  • TcpKeepaliveIdle
  • TcpKeepaliveInterval
  • XPubVerbose

To see exactly what all these options mean, you will more than likely need to refer to the actual ZeroMq documentation, i.e the guide.

Identity

One of the great things (at least in my opinion) when working with ZeroMq is that we can still stick with a standard request/response arrangement (just like we had in the 1st posts hello world example http://sachabarbs.wordpress.com/2014/08/19/zeromq-1-introduction/) but we may then choose to switch to having an asynchronous server. This is easily achieved using a RouterSocket for the server. The clients stay as RequestSocket(s).

So this is now an interesting arrangement, we have:

  • Synchronous clients, thanks to standard RequestSocket type
  • Asynchronous server, thanks to new socket called RouterSocket

The RouterSocket is a personal favourite of mine, as it is very easy to use (as are many of the ZeroMq sockets, once you know what they do), but it is capable of creating a server that can seamlessly talk to 1000nds of clients, all asynchronously, with very little changes to the code we saw in part 1.

Slight Diversion

When you work with RequestSocket(s), they do something clever for you, they always provide a message that has the following frames:

  • Frame[0] address
  • Frame[1] empty frame
  • Frame[2] the message payload

Even though all we did was send a payload (look at the “Hello World” example in Part 1).

Likewise, when you work with ResponseSocket(s), they also do some of the heavy lifting for us, where they always provide a message that has the following frames:

  • Frame[0] return address
  • Frame[1] empty frame
  • Frame[2] the message payload

Even though all we did was send a payload (look at the “Hello World” example in Part 1).

By understanding how the standard synchronous request/response socket works, it is now fairly easy to create a fully asynchronous server using the RouterSocket, that knows how to dispatch messages back to the correct client. All we need to do is emulate how the standard ResponseSocket works, where we construct the message frames ourselves. Where we would be looking to create the following frames from the RouterSocket (thus emulating the behaviour of the standard ResponseSocket).

XML
<!--EndFragment-->
  • Frame[0] return address
  • Frame[1] empty frame
  • Frame[2] the message payload

I think the best way to understand this is via an example. The example works like this:

  1. There are 4 clients, these are standard synchronous RequestSocket(s)
  2. There is a single asynchronous server, which uses a RouterSocket
  3. If the client sends a message with the prefix “_B” it gets a special message from the server, all other clients get a standard response message

Without further ado, here is the full code for this example:

C#
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.SqlServer.Server;
using NetMQ;
using NetMQ.Sockets;

namespace ZeroMqIdentity
{
    public class Program : IDisposable
    {
        private List<RequestSocket> clients = new List<RequestSocket>();

        public void Run()
        {
            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateRouterSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");

                    CreateClient(ctx, "A_");
                    CreateClient(ctx, "B_");
                    CreateClient(ctx, "C_");
                    CreateClient(ctx, "D_");

                    while (true)
                    {

                        var clientMessage = server.ReceiveMessage();
                        Console.WriteLine("========================");
                        Console.WriteLine(" INCOMING CLIENT MESSAGE ");
                        Console.WriteLine("========================");
                        for (int i = 0; i < clientMessage.FrameCount; i++)
                        {
                            Console.WriteLine("Frame[{0}] = {1}", i, 
                                clientMessage[i].ConvertToString());
                        }

                        var clientAddress = clientMessage[0];
                        var clientOriginalMessage = clientMessage[2].ConvertToString();
                        string response = string.Format("{0} back from server", 
                            clientOriginalMessage);

                        // "B_" client is special
                        if (clientOriginalMessage.StartsWith("B_"))
                        {
                            response = string.Format(
                                "special Message for 'B' back from server");
                        }

                        var messageToClient = new NetMQMessage();
                        messageToClient.Append(clientAddress);
                        messageToClient.AppendEmptyFrame();
                        messageToClient.Append(response);
                        server.SendMessage(messageToClient);
                    }
                }
            }

            Console.ReadLine();
        }

        private static void Main(string[] args)
        {
            Program p = new Program();
            p.Run();
        }

        private void CreateClient(NetMQContext ctx, string prefix)
        {
            Task.Run(() =>
            {
                var client = ctx.CreateRequestSocket();
                clients.Add(client);
                client.Connect("tcp://127.0.0.1:5556");
                client.Send(string.Format("{0}Hello", prefix));

                //read client message
                var echoedServerMessage = client.ReceiveString();
                Console.WriteLine(
                    "\r\nClient Prefix is : '{0}', Server Message : '{1}'",
                    prefix, echoedServerMessage);

            });
        }

        public void Dispose()
        {
            foreach (var client in clients)
            {
                client.Dispose();
            }
        }
    }
}

I think to full appreciate this example, one needs to examine the output, which should be something like this (it may not be exactly this, as the RouterSocket is FULLY async, so it may deal with RequestSocket(s) in a different order for you:

========================
INCOMING CLIENT MESSAGE
========================
Frame[0] =  ???"
Frame[1] =
Frame[2] = A_Hello
========================
INCOMING CLIENT MESSAGE
========================
Frame[0] =  @??"
Frame[1] =
Frame[2] = D_Hello

Client Prefix is : ‘A_’, Server Message : ‘A_Hello back from server’
========================
INCOMING CLIENT MESSAGE
========================
Frame[0] =  A??"
Frame[1] =
Frame[2] = B_Hello

Client Prefix is : ‘D_’, Server Message : ‘D_Hello back from server’
========================
INCOMING CLIENT MESSAGE
========================
Frame[0] =  B??"
Frame[1] =
Frame[2] = C_Hello

Client Prefix is : ‘B_’, Server Message : ‘special Message for ‘B’ back from ser
ver’

Client Prefix is : ‘C_’, Server Message : ‘C_Hello back from server’

SendMore

ZeroMq works using message frames. Using ZeroMq, you are able to create multipart messages which you may use for a variety of reasons, such as:

  • Including address information (which we just saw an example of above actually)
  • Designing a protocol for your end purpose
  • Sending serialized data (for example the 1st message frame could be the type of the item, and the next message frame could be the actual serialized data)

When you work with multipart messages, you must send/receive all the parts of the message you want to work with.

I think the best way to try and get to understand multipart message is perhaps via a small test. I have stuck to use an all in one demo, which builds on the original “Hello World” request/response demo. We use NUnit to do Asserts on the data between the client/server.

Here is a small test case, where the following points should be observed:

  1. We construct the 1st message part and use the xxxxSocket.SendMore() method, to send the 1st message
  2. We construct the 2nd (and final) message part using the xxxxSocket.Send() method
  3. The Server is able to receive the 1st message part, and also assign a value to determine if there are more parts. Which is done by using an overload of the xxxxSocket.Receive(..) that allows us to get an out value for “more”
  4. We may also use an actual NetMqMessage and append to it, which we can then send using xxxxSocket.SendMessage, where the receiving socket would use xxxxSocket.ReceieveMessage(..) and can examine the actual NetMqMessage frames

Anyway, here is the code:

C#
using System;
using System.Threading;
using NetMQ;
using NUnit.Framework;

namespace SendMore
{
    [TestFixture]
    public class SendMoreTests
    {
        [Test]
        public void SendMoreTest()
        {
            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateResponseSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");

                    using (var client = ctx.CreateRequestSocket())
                    {
                        client.Connect("tcp://127.0.0.1:5556");

                        //client send message
                        client.SendMore("A");
                        client.Send("Hello");

                        //server receive 1st part
                        bool more;
                        string m = server.ReceiveString(out more);
                        Assert.AreEqual("A", m);
                        Assert.IsTrue(more);

                        //server receive 2nd part
                        string m2 = server.ReceiveString(out more);
                        Assert.AreEqual("Hello", m2);
                        Assert.False(more);

                        //server send message, this time use NetMqMessage
                        //which will be sent as frames if the client calls
                        //ReceieveMessage()
                        var m3 = new NetMQMessage();
                        m3.Append("From");
                        m3.Append("Server");
                        server.SendMessage(m3);

                        //client receive
                        var m4 = client.ReceiveMessage();
                        Assert.AreEqual(2, m4.FrameCount);
                        Assert.AreEqual("From", m4[0].ConvertToString());
                        Assert.AreEqual("Server", m4[1].ConvertToString());
                    }
                }
            }
        }
    }
}

Here are a couple of REALLY important points from the Zero Guide when working with SendMore and multi part messages, this talks about the ZeroMq C++ core implementation, not the NetMq version, but the points are just as valid when using NetMq.

Some things to know about multipart messages:

  • When you send a multipart message, the first part (and all following parts) are only actually sent on the wire when you send the final part.
  • If you are using zmq_poll(), when you receive the first part of a message, all the rest has also arrived.
  • You will receive all parts of a message, or none at all.
  • Each part of a message is a separate zmq_msg item.
  • You will receive all parts of a message whether or not you check the more property.
  • On sending, ØMQ queues message frames in memory until the last is received, then sends them all.
  • There is no way to cancel a partially sent message, except by closing the socket.

That’s all I wanted to talk about in the post, so until the next time then.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)