Creating an AMQP Sample App using RabbitMQ

9 Dec 2011
How to create an AMQP sample app using RabbitMQ

This sample app is written using .NET 4.0 in C#.

RabbitMQ provides a .NET client library, appropriately called RabbitMQ.Client. Documentation for the client is available on the RabbitMQ website. As with many new technologies, the documentation is bare-bones and the API is somewhat raw and disorganized: almost everything is accessible from a single object, IModel. So the first things to do are:

  1. Make it a little easier to use by organizing the functionality around our most common use cases.
    1. Publish something
    2. Receive something
  2. Separate the declarative from the functional.
    • The process of creating exchanges, queues and bindings is kept separate from
    • the process of publishing and receiving messages.
  3. Make it configurable using the app.config file.

Before we design the simplified APIs described above, let's take a brief tour of the functionality that RabbitMQ.Client provides. The example code below highlights how the RabbitMQ.Client API lets us interact with the RabbitMQ server.
C#
// Creating Connections
RabbitMQ.Client.ConnectionFactory factory = new RabbitMQ.Client.ConnectionFactory();
factory.Endpoint = new AmqpTcpEndpoint(server);
factory.UserName = userName;
factory.Password = password;

IConnection Connection = factory.CreateConnection();

To connect to the RabbitMQ server, a connection has to be established between the client and the server, as shown above. The last line creates the connection that we're going to use in the following example.

We don't use the connection itself to communicate with the server. Instead, we use a dedicated communication channel, an IModel object, that the connection creates for us.

IModel is the communication channel between the client and the broker, and a single connection can open multiple channels. One of the RabbitMQ oddities is that an IModel object can't be shared between threads, so we need to take that into account when designing our applications. The next section shows some of the most commonly used features of the channel object.

C#
using (IModel channel = Connection.CreateModel())
{
    // Object declaration
    channel.ExchangeDeclare( /* parameters */);
    channel.QueueDeclare( /* parameters */);
    channel.QueueBind( /* parameters */);

    // Publishing
    channel.BasicPublish( /* object parameters */);

    // Synchronous receiving
    channel.BasicGet( /* parameters */);

    // Acknowledging a message
    channel.BasicAck( /* parameters */);

    // Rejecting a message
    channel.BasicReject( /* parameters */);

    // Requeueing a message
    channel.BasicReject( /* parameters - make sure to set requeue = true */);
}

Oddly enough, requeueing a message also uses the BasicReject(...) method: BasicReject has a requeue parameter, and setting it to true puts the message back on the queue.
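
Since a channel must not be shared between threads, one way to respect that rule is to hand every thread its own channel. The following is only a minimal sketch of that idea using ThreadLocal<T> from .NET 4.0. PerThreadChannels and FakeChannel are hypothetical names that are not part of RabbitMQ.Client or of the sample library; FakeChannel is a stand-in so the sketch runs without a broker. In a real application, the factory delegate would presumably call Connection.CreateModel().

```csharp
using System;
using System.Threading;

// Stand-in for a RabbitMQ channel (assumption: used only so the sketch
// runs without a broker; it records which thread created it).
public sealed class FakeChannel
{
    private static int _created;

    public int Id { get; private set; }
    public int OwnerThreadId { get; private set; }

    public FakeChannel()
    {
        Id = Interlocked.Increment(ref _created);
        OwnerThreadId = Thread.CurrentThread.ManagedThreadId;
    }
}

// Hands out one channel per thread. The factory delegate runs lazily,
// once for every thread that reads Current.
public sealed class PerThreadChannels<T> : IDisposable where T : class
{
    private readonly ThreadLocal<T> _channels;

    public PerThreadChannels(Func<T> createChannel)
    {
        _channels = new ThreadLocal<T>(createChannel);
    }

    // Always returns the channel that belongs to the calling thread.
    public T Current
    {
        get { return _channels.Value; }
    }

    // Releases the thread-local storage only. The channels themselves
    // are not disposed here; each thread should close its own channel.
    public void Dispose()
    {
        _channels.Dispose();
    }
}
```

With this in place, code running on any thread asks for Current and never sees a channel that another thread created.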

Creating a Dedicated AMQP App Config Section

When we use RabbitMQ, we have to configure the server to receive and distribute messages to clients appropriately, and configure our application to send messages to, or receive them from, the appropriate exchanges or queues.

The easiest way to do that is through the configuration file, which also lets us easily change these values as our application gets deployed to various environments.

I chose to create a dedicated configuration section for RabbitMQ because there's a lot to configure and I wanted to keep all of the RabbitMQ configuration elements together. The result is shown below (the code can be found in the CodePlex project: codeplex_link).

XML
<configuration>
  <configSections>
    <sectionGroup name="AMQPConnection">
      <section name="ConnectionSettings" type="Sample.Configuration.AMQP.Config.ConnectionSection, Sample.Configuration.AMQP" />
    </sectionGroup>
    <sectionGroup name="AMQPAdmin">
      <section name="AMQPObjectsDeclaration" type="Sample.Configuration.AMQP.Config.AMQPObjectsDeclarationSection, Sample.Configuration.AMQP" allowLocation="true" allowDefinition="Everywhere" />
    </sectionGroup>
  </configSections>
  <AMQPAdmin>
    <AMQPObjectsDeclaration>
      <exchangelist>
        <add name="orders" type="topic" durable="true" autodelete="false" />
      </exchangelist>
      <queuelist>
        <add name="uk_orders" durable="true" autodelete="false" />
      </queuelist>
      <bindinglist>
        <add subscriptionkey="order.uk.#" queue="uk_orders" exchange="orders" />
      </bindinglist>
    </AMQPObjectsDeclaration>
  </AMQPAdmin>
  <AMQPConnection>
    <ConnectionSettings>
      <connection name="connection" username="guest" server="devserver-rr1" password="guest" />
      <publisher exchange="orders" />
      <asyncreceiver queue="uk_orders" maxthreads="4" />
    </ConnectionSettings>
  </AMQPConnection>
</configuration>

Description of the Configuration File

  • The two section elements: Map each configuration section to the class in your project that interprets it. Each XML element represents an object and has to be interpreted.
  • The exchange list entry: Creates a topic exchange called "orders" that is durable (meaning it will survive a server restart) and is not autodelete (an autodelete exchange is removed once the last queue bound to it has been unbound).
  • The queue list entry: Creates a queue called "uk_orders" to hold all orders for UK customers. The queue is durable and not autodelete.
  • The binding list entry: Binds the "uk_orders" queue to the "orders" exchange using the "order.uk.#" subscription key. That way, every message whose routing key is "order.uk" or starts with "order.uk." ends up in that queue.
  • The connection element: Configures the server name and the credentials used to connect to it.
  • The publisher element: Configures the publisher to publish messages to the "orders" exchange.
  • The asynchronous receiver element: Configures the asynchronous receiver to use 4 threads to pick up orders from the "uk_orders" queue as they arrive.

The configuration sections can accept lists of each type of AMQP object (exchange, queue and binding).
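
To make the "order.uk.#" subscription key more concrete, here is a small sketch of how AMQP topic matching works: routing keys are split into words at each dot, "*" stands for exactly one word and "#" stands for zero or more words. TopicMatcher is a hypothetical helper written for illustration only. The real matching happens inside the RabbitMQ server, not in the client or in the sample library.

```csharp
using System;

// Illustrative re-implementation of AMQP topic matching (assumption:
// written for explanation only; the broker does the real matching).
public static class TopicMatcher
{
    // Returns true when routingKey satisfies the topic pattern bindingKey.
    public static bool Matches(string bindingKey, string routingKey)
    {
        string[] pattern = bindingKey.Split('.');
        string[] words = routingKey.Split('.');
        return Match(pattern, 0, words, 0);
    }

    private static bool Match(string[] pattern, int p, string[] words, int w)
    {
        // Pattern exhausted: it is a match only if every word was consumed too.
        if (p == pattern.Length)
        {
            return w == words.Length;
        }

        if (pattern[p] == "#")
        {
            // '#' may absorb zero or more words, so try every split point.
            for (int next = w; next <= words.Length; next++)
            {
                if (Match(pattern, p + 1, words, next))
                {
                    return true;
                }
            }
            return false;
        }

        // Any other pattern word needs a routing key word to compare against.
        if (w == words.Length)
        {
            return false;
        }

        // '*' matches exactly one word; anything else must match literally.
        if (pattern[p] == "*" || pattern[p] == words[w])
        {
            return Match(pattern, p + 1, words, w + 1);
        }
        return false;
    }
}
```

Under these rules, TopicMatcher.Matches("order.uk.#", "order.uk.london") is true, while a routing key such as "order.us.ny" would not reach the "uk_orders" queue.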

RabbitAdmin

So what does this look like when used in creating exchanges, queues and their bindings?

C#
using System.Configuration;
using System.Threading.Tasks;
using RabbitMQ.Client;
using Sample.Configuration.AMQP.Config;

namespace Sample.Configuration.AMQP.Admin
{
    public class RabbitAdmin
    {
        // Declares the exchanges, queues and bindings listed in the app.config file.
        internal static void InitializeObjects(IConnection Connection)
        {
            var config = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
            var objects = config.GetSection("AMQPAdmin/AMQPObjectsDeclaration")
                as AMQPObjectsDeclarationSection;
            if (objects != null)
            {
                // Each Parallel.For returns only after all of its iterations have
                // finished, so every exchange and queue exists before any binding
                // is declared. Each iteration opens its own channel because an
                // IModel can't be shared between threads.
                Parallel.For(0, objects.ExchangeList.Count, i =>
                {
                    using (IModel channel = Connection.CreateModel())
                    {
                        var exchange = objects.ExchangeList[i];
                        channel.ExchangeDeclare(exchange.Name, exchange.Type.ToString(),
                            exchange.Durable, exchange.AutoDelete, null);
                    }
                });
                Parallel.For(0, objects.QueueList.Count, i =>
                {
                    using (IModel channel = Connection.CreateModel())
                    {
                        var queue = objects.QueueList[i];
                        channel.QueueDeclare(queue.Name, queue.Durable,
                            queue.Exclusive, queue.AutoDelete, null);
                    }
                });
                Parallel.For(0, objects.BindingList.Count, i =>
                {
                    using (IModel channel = Connection.CreateModel())
                    {
                        var binding = objects.BindingList[i];
                        channel.QueueBind(binding.Queue,
                            binding.Exchange, binding.SuscriptionKey);
                    }
                });
            }
        }
    }
}

The RabbitAdmin class creates the exchanges, queues and their bindings. It takes a connection object and uses it to communicate with the RabbitMQ server. InitializeObjects uses a parallel for loop to create each type of object. Exchanges are created first, then queues; bindings are declared last, since a binding needs both its queue and its exchange to exist.

GatewayFactory

The GatewayFactory creates the connection to the server, calls RabbitAdmin to declare all the objects and provides methods to create the publisher and asynchronous listener helper objects. Messages are passed to the publisher and received from the queues. The message object is very simple: it contains the message properties (the header), a body and a routing key.

C#
/// <summary>
/// The object encapsulating all the common message properties
/// transmitted to and received from the message bus.
/// </summary>
public class Message {
    public IBasicProperties Properties { get; set;}
    public byte[] Body { get; set; }
    public string RoutingKey { get; set; }
}

The body is a simple byte array that your internal data structure is serialized to. To be able to generically handle conversions between data structures and messages, the following delegates and interfaces are included in the package.

C#
namespace Sample.Configuration.AMQP.Gateway
{
    public delegate Message ConvertToMessage(IModel channel, object packetToSend);
        
    public interface IConvertToMessage {
        Message ConvertObjectToMessage( IModel channel, object packetToSend);
    }
}

The client of the system takes care of converting from their data structures to the message structure.

Publishing

C#
using (var gf = new GatewayFactory())
{
    var mc = new Sample.Configuration.AMQP.Gateway.Converter.StringToMessageConverter();
    var publisher = gf.GetPublisher(mc.ConvertObjectToMessage);
    publisher.Publish("Hello world");
}

With configuration in place, publishing is a four-line affair. Simply create a GatewayFactory, which internally reads the configuration file and sets everything up. Then request a publisher and pass it an object-to-message conversion handler. The library comes with a default string-to-message converter, StringToMessageConverter (used in the example above), which is useful for XML documents. For anything else, a custom converter has to be created. To give you an idea of how to create a converter, let's take a look at the string converter.

C#
public class StringToMessageConverter : IConvertToMessage
{
    public static readonly string PLAIN_TEXT = "text/plain";
    public const string _defaultCharSet = "utf-8";
    public string CharSet { get; set; }
    public StringToMessageConverter()
    {
        CharSet = _defaultCharSet;
    }
    public virtual Message ConvertObjectToMessage
    (RabbitMQ.Client.IModel channel, object packetToSend)
    {
        var properties = channel.CreateBasicProperties();
        var bytes = Encoding.GetEncoding(CharSet).GetBytes((string)packetToSend);
        properties.ContentType = PLAIN_TEXT;
        properties.ContentEncoding = CharSet;
        return new Message()
        { Body = bytes, Properties = properties, RoutingKey = string.Empty };
    }
}

Conversion to a message is primarily about converting whatever needs to be sent to a byte array, letting the receiver know what the content is and setting the RoutingKey.
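
The receiving side has to reverse this conversion with the same character set that the sender announced in ContentEncoding, otherwise non-ASCII text comes out garbled. The sketch below isolates just that encode and decode step so it can be tried without a broker. TextPayload is a hypothetical helper that is not part of the sample library; it assumes that a missing character set means "utf-8", as the converter above does.

```csharp
using System;
using System.IO;
using System.Text;

// Hypothetical helper that isolates the text <-> byte[] step of a converter.
public static class TextPayload
{
    public const string DefaultCharSet = "utf-8";

    // Turns text into the byte array that would become the message body.
    public static byte[] Encode(string text, string charSet)
    {
        return Encoding.GetEncoding(charSet ?? DefaultCharSet).GetBytes(text);
    }

    // Turns a message body back into text using the announced character set.
    public static string Decode(byte[] body, string charSet)
    {
        Encoding encoding = Encoding.GetEncoding(charSet ?? DefaultCharSet);
        using (var reader = new StreamReader(new MemoryStream(body), encoding, false))
        {
            return reader.ReadToEnd();
        }
    }
}
```

A quick round trip such as TextPayload.Decode(TextPayload.Encode(text, "utf-8"), "utf-8") should give back the original text; decoding with a different character set than the one used for encoding generally will not.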

The Asynchronous Receiver

Receiving messages asynchronously is as simple as publishing. The receiver has to report the status of each message back to the server: it can be acknowledged, rejected or requeued.

C#
class Program
{
    static void Main(string[] args)
    {
        var mp = new MessageProcessor();
        using (var cf = new GatewayFactory())
        {
            cf.GetAsyncReceiver(mp.ConsumeMessage);
        }
    }
}

class MessageProcessor : IMessageConsumer
{
    public void ConsumeMessage
    (Message message, RabbitMQ.Client.IModel channel, DeliveryHeader header)
    {
        try
        {
            var str = ConvertFromMessageToString(message);
            channel.BasicAck(header.DeliveryTag, false);
        }
        catch (Exception ex)
        {
            channel.BasicReject(header.DeliveryTag, true);
        }
    }
    public string ConvertFromMessageToString(Message message)
    {
        var content = string.Empty;
        if (message.Properties.ContentType == StringToMessageConverter.PLAIN_TEXT)
        {
            var encoding = Encoding.GetEncoding
            (message.Properties.ContentEncoding ?? "utf-8");
            var ms = new MemoryStream(message.Body);
            var reader = new StreamReader(ms, encoding, false);
            content = reader.ReadToEnd();
        }
        return content;
    }
}

In Main, the call to GetAsyncReceiver passes the message handler to the asynchronous receiver. The async receiver is multi-threaded, and the handler is expected to be stateless so that it can be used by multiple threads at the same time. Responding to the server is done through the IModel object. The delivery header contains a delivery tag and a flag that indicates whether this is the first time the message was delivered.
Something useful can be done with the message between converting it to a string and acknowledging it.
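
One thing to watch for with the handler above is that a message that always fails would be requeued and redelivered over and over. The flag in the delivery header can be used to break that loop. The following is a hedged sketch of one possible policy, not the sample library's behavior: Outcome and DeliveryPolicy are hypothetical names, and the redelivered parameter is assumed to be derived from the delivery header flag.

```csharp
using System;

// What the consumer reports back to the broker (hypothetical type).
public enum Outcome
{
    Ack,      // would map to channel.BasicAck(deliveryTag, false)
    Requeue,  // would map to channel.BasicReject(deliveryTag, true)
    Reject    // would map to channel.BasicReject(deliveryTag, false)
}

// Hypothetical policy: retry a failed message once, then give up on it.
public static class DeliveryPolicy
{
    // Runs the handler and decides what to report back to the broker.
    // 'redelivered' is true when the broker has delivered the message before.
    public static Outcome Process(Action handler, bool redelivered)
    {
        try
        {
            handler();
            return Outcome.Ack;
        }
        catch (Exception)
        {
            // First failure: put the message back so it gets another chance.
            // Failure on a redelivery: reject without requeueing so that a
            // message that can never be processed does not loop forever.
            return redelivered ? Outcome.Reject : Outcome.Requeue;
        }
    }
}
```

Inside ConsumeMessage, the returned value would then be translated into the matching BasicAck or BasicReject call on the channel.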

Results in Action

To test this, I simulated the e-commerce flow I've used as an example previously.

Image 1

This is the result.

Image 2

It shows how orders flow into the order exchange and from there to the queues that are interested in orders.

Image 3

Here, we see how shipments are sent to an exchange and then transmitted to queues that are bound to it.

You can find the source code for a working example of a distributed expert system using RabbitMQ at my source code repository.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)