Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / ZeroMQ

ZeroMq # 1: Hello World

4.86/5 (5 votes)
26 Aug 2014CPOL4 min read 42.6K  
What Is ZeroMQ ZeroMq is a C library that contains a bunch of insane sockets, that provide a very very cool abstraction over the typical socket code you would find yourself writing. It provides building blocks by way of a standard set of sockets that have been built with certain scenarios in mind.

What Is ZeroMQ

ZeroMq is a C library that contains a bunch of insane sockets, that provide a very very cool abstraction over the typical socket code you would find yourself writing. It provides building blocks by way of a standard set of sockets that have been built with certain scenarios in mind.

The people that make it were instrumental in the advanced message queue protocol (AMQP) being written, and are very big in the messaging world.

There is a completely awesome book that everyone should read, it is by Pieter Hintjens

http://www.amazon.co.uk/ZeroMQ-Messaging-Applications-Pieter-Hintjens/dp/1449334067

There is also a online version of the PDF book that has full code samples, which is known as the guide:

http://zguide.zeromq.org/page:all

Why Use ZeroMq/Messaging At All

If you have ever written any asynchronous code, and have had to deal with shared state, and have had to deal with that, you will know that that brings locks/semaphores etc etc.

Now imagine a world where you don’t care about locks, semaphores etc etc, you just pass a message, and there is no shared state to worry about. Welcome to messaging. That is how you can write systems with a high throughput of messages per second, without the overhead of shared state management.

Zero is at its core a messaging framework. It can be setup in a brokerless manner or also used broker, or even peer to peer. It is sockets that make it powerful. They are the fundamental building blocks, which you may use to create large distributed architectures, or very small ones.

I would urge you all to read the book, or check out the guide, they have changed the way I think about certain tasks for sure.

Where Do I Get The ZeroMq library?

First things first, I mentioned that ZeroMq is written in C, but has many many language bindings. There is in fact a C# binding that you could use, which is the zmqcli binding. The thing with that, is that the errors you get are quite confusing at times, as it has a tendency to show the actual C error code.

I wanted (if possible) to use an entirely native port of ZeroMq, luckily for me there is just such a thing, by way of the NetMq project. That is the library I will be using throughout all my posts.

You can install NetMq using the following Nuget package manager command line:

Install-Package NetMQ

So without further ado lets get down to see a very simple example.

What Does The Example Do

The example is dead straight forward, we send a message from the client to the server, and the server sends a message back. This is a (extremely simple) example of the Request/Response pattern, of which we will see a lot more examples. Zero also supports publish/subscribe which we will look at too (though not in as much detail as request/response).

So let’s see some code shall we:

using System;
using NetMQ;

namespace HelloWorldDemo
{
    class Program
    {
        private static void Main(string[] args)
        {
            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateResponseSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");

                    using (var client = ctx.CreateRequestSocket())
                    {
                        client.Connect("tcp://127.0.0.1:5556");

                        client.Send("Hello");

                        string fromClientMessage = server.ReceiveString();

                        Console.WriteLine("From Client: {0}", fromClientMessage);

                        server.Send("Hi Back");

                        string fromServerMessage = client.ReceiveString();

                        Console.WriteLine("From Server: {0}", fromServerMessage);

                        Console.ReadLine();
                    }
                }
            }
        }
    }
}

Believe it or not that code is enough to have a fully functioning request (client) / response (server) pattern, up and working. Don’t believe me here is some output to prove it

image

Ok, so it does work, that is quite mad. So how does it work?

Well there are a couple of take away points there

  1. We are able to create a request/response pattern, by using specialized sockets that are geared towards working in the request/response scenario.
  2. We can use tcp as the protocol (ZeroMq also supports others such as inproc)
  3. We did not have to spin up any extra thread on the server to deal with the freshly connected client socket and then continue to accept other client sockets. In fact this code could pretty much talk to 1000nds of clients without much alteration at all (in fact I will show you an example of the using separate processes)
  4. There is this magical NetMqContext. This is mandatory and must be used whenever you use ZeroMq sockets.

There are a couple of specific things to talk about in request/response, in that those type of sockets are assumed to be 1:1 request/response. So If you call ReceiveString() twice on the clients request socket without the servers response socket sending something you will get an Exception, as can be seen in the screen shot below.

image

Running In Separate Threads

This demo is to show you how you could use an internal processes messaging system. You obviously need to use new threads in this example as we need to not block on the receive() method of the sockets

image

Program

This just kicks of a few clients for the server to deal with

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace HelloWorldDemoSeparateThreads
{
    public class Program
    {
        public static void Main(string[] args)
        {
            Server server = new Server();
            server.Run();

            foreach (Client client in Enumerable.Range(0, 5).Select(
                x => new Client(string.Format("client {0}", x))))
            {
                client.Run();
            }

            Console.ReadLine();
        }
    }
}

Client

using System;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;


namespace HelloWorldDemoSeparateThreads
{
    sealed class Client
    {
        private readonly string clientName;

        public Client(string clientName)
        {
            this.clientName = clientName;
        }

        public void Run()
        {
            Task.Run(() =>
            {
                using (NetMQContext ctx = NetMQContext.Create())
                {
                    using (var client = ctx.CreateRequestSocket())
                    {
                        client.Connect("tcp://127.0.0.1:5556");
                        while (true)
                        {
                            client.Send(string.Format("Hello from client {0}", clientName));
                            string fromServerMessage = client.ReceiveString();
                            Console.WriteLine("From Server: {0} running on ThreadId : {1}", 
                                fromServerMessage, Thread.CurrentThread.ManagedThreadId);
                            Thread.Sleep(5000);
                        }
                    }
                }
            });

        }
    }
}

Server

using System;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;

namespace HelloWorldDemoSeparateThreads
{
    sealed class Server
    {
        public void Run()
        {
            Task.Run(() =>
            {
                using (NetMQContext ctx = NetMQContext.Create())
                {
                    using (var server = ctx.CreateResponseSocket())
                    {
                        server.Bind("tcp://127.0.0.1:5556");

                        while (true)
                        {
                            string fromClientMessage = server.ReceiveString();
                            Console.WriteLine("From Client: {0} running on ThreadId : {1}", 
                                fromClientMessage, Thread.CurrentThread.ManagedThreadId);
                            server.Send("Hi Back");
                        }

                    }
                }
            });

        }
    }
}

Running In Separate Processes

Client

using System;
using System.Threading;
using NetMQ;

namespace HelloWorldSeparateClient
{
    sealed class Client
    {
        private string clientName;

        public static void Main(string[] args)
        {
            Client c = new Client();
            c.clientName = args[0];
            c.Run();
        }

        public void Run()
        {
            
            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var client = ctx.CreateRequestSocket())
                {
                    client.Connect("tcp://127.0.0.1:5556");
                    while (true)
                    {
                        client.Send(string.Format("Hello from client {0}", clientName));
                        string fromServerMessage = client.ReceiveString();
                        Console.WriteLine("From Server: {0} running on ThreadId : {1}",
                            fromServerMessage, Thread.CurrentThread.ManagedThreadId);
                        Thread.Sleep(5000);
                    }
                }
            }
            

        }
    }

}

Server

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;


namespace HelloWorldSeparateServer
{
    sealed class Server
    {

        public static void Main(string[] args)
        {
            Server s = new Server();
            s.Run();
        }

        public void Run()
        {
            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateResponseSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");

                    while (true)
                    {
                        string fromClientMessage = server.ReceiveString();
                        Console.WriteLine("From Client: {0} running on ThreadId : {1}",
                            fromClientMessage, Thread.CurrentThread.ManagedThreadId);
                        server.Send("Hi Back");
                    }

                }
             }
        }
    }
}

Here is the code running in separate processes, where it can be seen that we did not have to do a damn thing to change the server code at all we just moved it around to run in a new process rather than a new thread.

image

Where Is The Code For This Article?

The code for all these posts is hosted in one large solution in github:

https://github.com/sachabarber/ZeroMqDemos

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)