Rabbit Mq Shovel Example

5 Jan 2012
A look at using RabbitMq to do message routing.

Introduction

It has been a while since I have written an article. That is not because I have not been busy; far from it. I have been extremely busy working on a productivity tool that I think people may like. I will certainly use it myself, which is partly why I am writing it. I am working on this tool with CodeProject's own Pete O'Hanlon, and it is coming on well. We hope to have something ready for its first showing some time in the new year. Anyway, enough of the reasons why I haven't written anything for a while, and back to the here and now.

At work just before Christmas, my team leader Richard King (co-author of Baboon Converters) and I were looking into providing an MSMQ queue-based system where we would have multiple queues in place and each machine in the chain could receive and send. To keep it simple, let's forget about the duplex comms and just consider a single direction of message travel across machines. The following diagram illustrates roughly what we wanted.

Essentially what we want is some sort of routing from Machine A to Machine B.

This could all be done using pretty standard MSMQ code, where we would just forward the queue messages programmatically, or we could use MSMQ over WCF, or even the new WCF 4.0 RoutingService. We had reservations about all of these approaches.

  • MSMQ code: Simply too much boilerplate code. Sure, we could abstract that and end up with something pretty slim, but we wanted to see what we could do without going down this route.
  • MSMQ over WCF: Yeah OK, but lots of config is required, and we need to host the service somewhere. We would also need to create the MSMQ queues and restrict access to them to authorised users only.
  • RoutingService: This is quite nice, but it still relies on WCF, so it suffers from the same problems as MSMQ over WCF.

We just felt that each of these approaches either involved too much config/setup, or was not quite what we wanted without us writing a load of code. We also felt that this bridge must have been crossed before, so we set about looking at the available messaging solutions out there (and there are loads); we will now discuss a few of these.

Note: This article is quite specific, in that it is all about how to tackle the routing arrangement that we needed to solve. If you think this is not for you, no worries, we get that. However, if you have a similar requirement, this article may cover something that you can use. The choice is yours.

Available Frameworks

There are literally hundreds of messaging solutions out there when you really start to look. We looked at three in some detail, which we will talk about briefly below. We will not go into loads of detail, but shall rather list the attributes that each framework vendor claims set its framework apart.

NServiceBus

Website: http://www.nservicebus.com/

Vendor claims:

  • Bus architecture
  • Publish/Subscribe
  • Durable messages
  • Sagas
  • Scalable across servers
  • Transactions
  • Serialization
  • IOC support

MassTransit

Website: http://masstransit-project.com/

Vendor claims:

  • Bus architecture
  • Sagas
  • Exception management
  • Transactions
  • Serialization
  • Headers
  • Consumer lifecycle
  • Built on top of Rabbit Mq
  • IOC support

RabbitMq

Website: http://www.rabbitmq.com/

Vendor claims:

  • Messaging that just works

Picking a Framework

Now when it came to actually picking a framework, we had the following criteria:

  1. Is it easy to use?
  2. How long will it take to get up and running?
  3. Will it fit our requirement exactly?
  4. Is the API intuitive? Would we understand it in six months' time?
  5. Having never used it, would we know how to fix it if something went wrong?

Based on extensive proofs of concept, we actually ended up going with Rabbit Mq (which you may find strange, as its only claim is "Messaging that just works"). The reason is that it just seemed to have less smoke and mirrors than NServiceBus and MassTransit. We are not saying these two frameworks are not good; they are both really good. It was just that for our purposes we wanted something dead simple to use, and after we conducted our tests, we felt that Rabbit Mq was it. It had much simpler configuration (once you got the hang of it) and did not involve so much esoteric code that only the designers of these frameworks truly understand.

Rabbit Mq also had much better documentation than the others, or at least we felt it did.

For the rest of this article, we will talk about how to configure Rabbit Mq to send messages from one machine to another, which is what our requirements were. If this does not sound that interesting to you, or you can't see the benefit of this, then this is probably the best place to call it a day. However if you think this sort of arrangement that we were trying to solve may be of use to you, please read on.

Rabbit Mq

Rabbit Mq calls itself a message broker, where the typical setup is to have a single Rabbit Mq broker (which they refer to as Agents) that sits on a certain box, and simply deals with incoming messages and makes sure these are dispatched accordingly. One of the stranger aspects (at least for a .NET developer) is that Rabbit Mq is actually written in Erlang. So you will need to install that (which we will talk about in a minute), but don't let that put you off, there are loads of Rabbit Mq clients out there, .NET being one of them.

Now you may be thinking: if Rabbit Mq is a broker type arrangement, which typically looks something like this:

how can that possibly do the routing of messages specified by our initial requirements?

Our requirement doesn't look much like a broker type architecture, where we have a central broker. Luckily, Rabbit Mq comes with a handy plug-in called "Shovel" which the Rabbit Mq documentation describes as follows:

rabbitmq_shovel: "A plug-in for RabbitMQ that shovels messages from a queue on one broker to an exchange on another broker."

All of a sudden, we have two machines involved, each running a Rabbit Mq broker. Mmm, sounds more like our initial requirement all of a sudden. Groovy.

So with that knowledge in place, let's carry on with the rest of the article where we will discuss what we need to install/configure to successfully see that our requirements are met.

Installation

The first thing you will need to do to get Rabbit Mq up and running is install the required bits and pieces. Now for our requirements, since we wanted to have a Rabbit Mq broker on each of the machines in the message chain, all of these and subsequent installation instructions apply to all machines in the message chain (so for our requirements, that would be two machines).

You should now have several folders created.

Erlang

Which should look like this:

Rabbit Mq

Which should look like this:

Plug-ins Installation

Now that we have installed Erlang and the Rabbit Mq server, we need to install two plug-ins which are discussed below:

Installing the WebServer Plug-in

From the Rabbit installation folder (C:\Program Files\RabbitMQ Server\rabbitmq_server-2.7.0\sbin typically), run the following command line:

rabbitmq-plugins.bat enable rabbitmq_management

After running this command, we need to get the Rabbit Mq server to pick up the newly enabled plug-in, so we need to stop and restart the Rabbit service. Run the following commands from a command window opened with Admin rights:

  • rabbitmq-service stop
  • rabbitmq-service remove
  • rabbitmq-service install
  • rabbitmq-service start

You might want to wrap these up into a Batch (.BAT) file as we will need to use this combination again.
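
As a minimal sketch of such a batch file (the file name restart-rabbit.bat is just a hypothetical choice, and the path is the typical 2.7.0 install folder mentioned above, so adjust both to suit your own machine):

```bat
@echo off
rem restart-rabbit.bat (hypothetical name): re-register and restart the Rabbit Mq service.
rem Run this from a command window opened with Admin rights.
rem The path below is the typical install location quoted in this article; change it if yours differs.
cd /d "C:\Program Files\RabbitMQ Server\rabbitmq_server-2.7.0\sbin"

rem rabbitmq-service is itself a batch file, so "call" is needed for control to return here.
call rabbitmq-service.bat stop
call rabbitmq-service.bat remove
call rabbitmq-service.bat install
call rabbitmq-service.bat start
```

Running this one file should then be equivalent to typing the four commands by hand.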

Installing the Shovel Plug-in

From the Rabbit installation folder (C:\Program Files\RabbitMQ Server\rabbitmq_server-2.7.0\sbin typically), run the following command lines:

  • rabbitmq-plugins.bat enable rabbitmq_shovel
  • rabbitmq-plugins.bat enable rabbitmq_shovel_management

As before, after running these commands, we need to get the Rabbit Mq server to pick up the newly enabled plug-ins, so we need to stop and restart the Rabbit service. Run the following commands from a command window opened with Admin rights:

  • rabbitmq-service stop
  • rabbitmq-service remove
  • rabbitmq-service install
  • rabbitmq-service start
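
If you want to double check that the plug-ins really were enabled, one option is to ask the same rabbitmq-plugins tool to list what it knows about. This is only a sketch, assuming the typical 2.7.0 install folder used above; the exact layout of the listing may differ between Rabbit Mq versions.

```bat
rem Assumes the typical install location quoted in this article; change it if yours differs.
cd /d "C:\Program Files\RabbitMQ Server\rabbitmq_server-2.7.0\sbin"

rem Lists the available plug-ins; rabbitmq_management, rabbitmq_shovel and
rem rabbitmq_shovel_management should be marked as enabled in the output.
call rabbitmq-plugins.bat list
```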

Checking the WebServer

Next, check that the web server is available by browsing to the following URL: http://localhost:55672/#/.

Where the username and password are the defaults:

  1. username = "guest"
  2. password = "guest"

It can be seen that we have a running web server by which we can monitor all of the Rabbit Mq server components, such as:

  • Connections
  • Queues
  • Exchanges
  • Shovels (which will not work yet, as we have not configured it)

So all good so far, let's now turn our attention to configuring the Shovel plug-in, shall we?

Configuring Shovel

The Shovel Rabbit Mq plug-in does this:

rabbitmq_shovel: "A plug-in for RabbitMQ that shovels messages from a queue on one broker to an exchange on another broker."

Before we can use Shovel, we need to configure it.

Creating the Environment Variable

To enable Rabbit Mq to pick up a config file, we need to create an environment variable that tells Rabbit Mq where its config should be obtained from. The variable value should be the path and name of the Rabbit Mq config file, excluding the file extension.
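
If you prefer the command line to the Windows dialog, the following sketch sets the variable with setx. The variable Rabbit Mq reads is RABBITMQ_CONFIG_FILE; the path used here is an assumption (a config file stored as C:\RabbitConfig\Rabbit.config), so substitute your own location.

```bat
rem Assumption: the config file is C:\RabbitConfig\Rabbit.config.
rem The value deliberately leaves off the ".config" extension.
rem /M writes a system-wide (machine) variable, so this needs an Admin command window.
setx RABBITMQ_CONFIG_FILE "C:\RabbitConfig\Rabbit" /M
```

Note that setx only affects processes started afterwards, so open a new command window before running the rabbitmq-service commands, so that the service is re-registered with the new value.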

Creating the Shovel Config File

The next step is to create a new Rabbit Mq config file which will configure the Shovel plug-in. It is worth knowing that this is an Erlang style config file, which is what Rabbit Mq uses.

So going back to what we wanted to achieve:

Based on this image, we could end up with a Rabbit Mq config file called Rabbit.config, stored in C:\RabbitConfig, which looks like the listing below (the "." at the end is important).

For the demo code, Machine A and Machine B were two machines where I work, called "C1801" and "C1799", and the queue which we communicated on was called "Killer".

You will need to change these to suit your own requirements.

[{rabbitmq_shovel,
  [{shovels,
    [{killer_push,
      [{sources,      [{broker,"amqp://C1801"}]},
       {destinations, [{broker, "amqp://C1799"}]},
       {queue, <<"Killer">>},
       {ack_mode, on_confirm},
       {publish_properties, [{delivery_mode, 2}]},
       {publish_fields, [{exchange, <<"">>},
                         {routing_key, <<"Killer">>}]},
       {reconnect_delay, 5}
      ]}
     ]
   }]
}].

As before, we need to get the Rabbit Mq server to pick up this configuration, so we need to stop and restart the Rabbit service. Run the following commands from a command window opened with Admin rights:

  • rabbitmq-service stop
  • rabbitmq-service remove
  • rabbitmq-service install
  • rabbitmq-service start

And that is all there is to the configuration, at least for our intended scenario anyway. It does take a little bit of getting used to the Erlang style config files, but that is just how it is. You get used to it.

Here is a running version of this taken from our actual work PCs where we have fully tested this scenario with Rabbit Mq.

As you can see, this shows a Rabbit Mq shovel called "killer_push", which is the shovel name configured in the Rabbit Mq config file shown above. The queue it reads from is called "Killer".
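
If you would rather check from the command line than from the web page, a rough sketch is to list the queues on each machine with rabbitmqctl. This assumes the typical 2.7.0 install folder used earlier and the "Killer" queue from the demo; run it on both machines (C1801 and C1799 in our case).

```bat
rem Assumes the typical install location quoted in this article; change it if yours differs.
cd /d "C:\Program Files\RabbitMQ Server\rabbitmq_server-2.7.0\sbin"

rem Prints each queue name with its current message count.
call rabbitmqctl.bat list_queues name messages
```

With the shovel running and no receiver attached, you would expect the "Killer" count on the source machine to drop back towards zero while the count on the destination machine grows. That relies on the "Killer" queue already existing on the destination, which the Receiver code declares when it starts.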

Demo Code

We have included a simple VS2010 demo solution that contains two simple projects, a Sender and a Receiver, which are shown below. These are intentionally simple, so you can see the messages received. You will need to change these for your own purposes.

Sender Code

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class Send {

    private ConnectionFactory factory = new ConnectionFactory();
    private IConnection connection = null;
    private IModel channel = null;
    private int counter =0;


    public Send()
    {
        factory.HostName = "C1801";
    }

    private void Setup()
    {
        counter = 0;
        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        channel.ModelShutdown += Channel_ModelShutdown;
        connection.CallbackException += Connection_CallbackException;
        connection.ConnectionShutdown += Connection_ConnectionShutdown;

        bool durable = true;
        channel.QueueDeclare("Killer", durable, false, false, null);
    }

    private void Publish()
    {
        IBasicProperties properties = channel.CreateBasicProperties();
        properties.DeliveryMode = 2;
        properties.CorrelationId = "sachas message";

        try
        {
            while (true)
            {
                string message = string.Format("This is the message {0}, {1}", 
                                        ++counter, DateTime.Now.ToShortTimeString());
                byte[] body = System.Text.Encoding.UTF8.GetBytes(message);
                channel.BasicPublish("", "Killer", properties, body);
                Console.WriteLine(" [x] Sent {0}", message);
                // Wait for Enter before sending the next message.
                Console.ReadLine();
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine("SOMETHING IS WRONG!!!!   " + ex.Message);
        }
        finally
        {
            ReStart();
        }
    }

    private void ReStart()
    {
        CleanUp();
        Setup();
        Publish();
    }

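    private void CleanUp()
    {
        // Detach the handlers first so that closing does not trigger ReStart again,
        // then close the channel before its parent connection.
        if (channel != null)
        {
            channel.ModelShutdown -= Channel_ModelShutdown;
            if (channel.IsOpen) channel.Close();
            channel.Dispose();
        }
        if (connection != null)
        {
            connection.CallbackException -= Connection_CallbackException;
            connection.ConnectionShutdown -= Connection_ConnectionShutdown;
            if (connection.IsOpen) connection.Close();
            connection.Dispose();
        }
    }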

    public static void Main() 
    {
        Send r = new Send();
        r.Setup();
        r.Publish();
        
    }

    private void Connection_ConnectionShutdown(IConnection connection, ShutdownEventArgs reason)
    {
        Console.WriteLine("connection_ConnectionShutdown " + reason.ToString());
        ReStart();
    }

    private void Connection_CallbackException(object sender, CallbackExceptionEventArgs e)
    {
        Console.WriteLine("connection_CallbackException " + e.Exception.StackTrace);
        ReStart();
    }

    private void Channel_ModelShutdown(IModel model, ShutdownEventArgs reason)
    {
        Console.WriteLine("CHANNEL__MODEL_SHUTDOWN " + reason.ToString());
        ReStart();
    }
}

Receiver Code

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Threading;
using RabbitMQ.Client.Exceptions;
using System;

class Receive {

    private ConnectionFactory factory = new ConnectionFactory();
    
    private IConnection connection = null;
    private IModel channel = null;
    private QueueingBasicConsumer consumer = null;

    public Receive ()
    {
        factory.HostName = "C1799";
    }

    private void Setup()
    {
        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        channel.ModelShutdown += Channel_ModelShutdown;
        connection.CallbackException += Connection_CallbackException;
        connection.ConnectionShutdown += Connection_ConnectionShutdown;

        bool isDurable = true;
        bool exclusive = false;
        bool autoDelete = false;
        bool noAck = false;

        channel.QueueDeclare("Killer", isDurable, exclusive, autoDelete, null);

        consumer = new QueueingBasicConsumer(channel);
        channel.BasicConsume("Killer", noAck, consumer);

        System.Console.WriteLine(" [*] Waiting for messages. " +
                                 "To exit press CTRL+C");
    }

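    private void CleanUp()
    {
        // Detach the handlers first so that closing does not trigger ReStart again,
        // then close the channel before its parent connection.
        if (channel != null)
        {
            channel.ModelShutdown -= Channel_ModelShutdown;
            if (channel.IsOpen) channel.Close();
            channel.Dispose();
        }
        if (connection != null)
        {
            connection.CallbackException -= Connection_CallbackException;
            connection.ConnectionShutdown -= Connection_ConnectionShutdown;
            if (connection.IsOpen) connection.Close();
            connection.Dispose();
        }
    }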

    private void Listen()
    {
        try
        {
            while (true)
            {

                if (!channel.IsOpen)
                    throw new Exception("Channel is closed");

                BasicDeliverEventArgs ea =
                    (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                byte[] body = ea.Body;
                string s = ea.BasicProperties.CorrelationId;
                string message = System.Text.Encoding.UTF8.GetString(body);
                channel.BasicAck(ea.DeliveryTag, false);
                Console.WriteLine(" [x] Received {0}", message);
            }

        }
        catch (Exception ex)
        {
            Console.WriteLine("SOMETHING IS WRONG!!!!   " + ex.Message);
        }
        finally
        {
            CleanUp();
        }
    }

    public static void Main() 
    {
        Receive r = new Receive();
        r.Setup();
        r.Listen();
    }

    private void ReStart()
    {
        connection.CallbackException -= Connection_CallbackException;
        connection.ConnectionShutdown -= Connection_ConnectionShutdown;

        CleanUp();
        Setup();
        Listen();
    }

    private void Connection_ConnectionShutdown(IConnection connection, ShutdownEventArgs reason)
    {
        Console.WriteLine("connection_ConnectionShutdown " + reason.ToString());
        ReStart();
    }

    private void Connection_CallbackException(object sender, CallbackExceptionEventArgs e)
    {
        Console.WriteLine("connection_CallbackException " + e.Exception.StackTrace);
        ReStart();
    }

    private void Channel_ModelShutdown(IModel model, ShutdownEventArgs reason)
    {
        Console.WriteLine("CHANNEL__MODEL_SHUTDOWN " + reason.ToString());
        ReStart();
    }
}

I think the code is pretty self-explanatory, so I will not go into it too much. A lot of this is pretty much what you get from the Rabbit Mq samples, just refactored slightly to the structure shown above.

That's It

Anyway, that is all I wanted to say for now. I realise this is not my normal type of article but rather a step by step instruction type article (which I rarely do), but it took both Richard and me a while to get this setup of Rabbit correct, so we felt it was worth sharing with others. If you like it or feel it's useful, please take some time to write a comment or leave a vote; both are welcome. Thanks.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.