This sample app is written using .NET 4.0 in C#.
Before we design the simplified APIs described above, let's take a brief tour of the functionality the
RabbitMQ.Client
provides. Here's example code to highlight how the
RabbitMQ.Client
API allows us to interact with the RabbitMQ server.
RabbitMQ.Client.ConnectionFactory factory = new RabbitMQ.Client.ConnectionFactory();
factory.Endpoint = new AmqpTcpEndpoint(server);
factory.UserName = userName;
factory.Password = password;
IConnection Connection = factory.CreateConnection();
To connect to the RabbitMQ server, a connection has to be established between the client and the server as shown above. Line 7 creates the connection that we're going to use in the following example.
The connection is not what we're going to use to communicate with the server, instead we use dedicated communication channel called IModel
object that the connection object creates for us.
IModel
is the communication channel between the client and the broker and multiple channels can be made. One of the RabbitMQ oddities is that the IModel
object can't be shared between threads and we need to make sure to take that into account when designing our applications. The next section shows some of the most commonly used features that the channel object provides.
using (IModel channel = Connection.CreateModel())
{
...
channel.ExchangeDeclare( );
channel.QueueDeclare( );
channel.QueueBind( );
channel.BasicPublish( );
channel.BasicGet( );
channel.BasicAck( );
channel.BasicReject( );
channel.BasicReject( );
}
Oddly enough, Requeueing messages uses the
BasicReject(...)
method.
BasicReject
has a
requeue
parameter, and by setting that to
true
, the message is requeued.
Creating a Dedicated AMQP App Config Section
When we use RabbitMQ, we have to configure the server to appropriately receive and distribute messages to clients as well as configure our application to send or receive messages from the appropriate exchange or queues.
The easiest way to do that is to use the configuration file to do so, this also allows us to easily change these values as our application get's deployed to various environments.
I chose to create a dedicated configuration section for RabbitMQ because there's a lot to configure and I wanted to keep all of the RabbitMQ configuration elements together. The result is shown below (code can be found at CodePlex project: codeplex_link.
<configuration>
<configsections>
<sectiongroup name="AMQPConnection">
<section name="ConnectionSettings"
type="Sample.Configuration.AMQP.Config.ConnectionSection,
Sample.Configuration.AMQP" />
</sectiongroup>
<sectiongroup name="AMQPAdmin">
<section name="AMQPObjectsDeclaration"
type="Sample.Configuration.AMQP.Config.AMQPObjectsDeclarationSection,
Sample.Configuration.AMQP" allowlocation="true"
allowdefinition="Everywhere" />
</sectiongroup>
</configsections>
<amqpadmin>
<amqpobjectsdeclaration>
<exchangelist>
<add name="orders" type="topic"
durable="true" autodelete="false" />
</exchangelist>
<queuelist>
<add name="uk_orders" durable="true"
autodelete="false" />
</queuelist>
<bindinglist>
<add subscriptionkey="order.uk.#"
queue="uk_orders" exchange="orders" />
</bindinglist>
</amqpobjectsdeclaration>
</amqpadmin>
<amqpconnection>
<connectionsettings>
<connection name="connection"
username="guest" server="devserver-rr1"
password="guest" />
<publisher exchange="orders" />
<asyncreceiver queue="uk_orders" maxthreads="4" />
</connectionsettings>
</amqpconnection>
</configuration>
Description of the Configuration File by Lines
- 4, 7: Map the configuration section interpreter to the appropriate class in your project. Each XML element represents an object and has to be interpreted
- 13. Creates a topic exchange called orders that is durable (meaning it will persist a server reboot) and is not autodelete (if the autodelete flag is set to
true
, the exchange will be removed when all clients are done publishing to it). - 16. Creates a queue called "
uk_orders
" to represent all orders for UK customers. The queue is set to durable and not autodelete - 19. Binds the "
uk_orders
" queue to the "orders
" exchange using the "order.uk.#
" subscription key. That way, all orders starting with "order.uk
" will end up in that queue - 25: Configures the connection string to the server.
- 26,. Configures the publisher to publish messages to the order exchange
- 27. Configures the asynchronous receiver to use 4 threads to pick up orders from "
uk_orders
" queue as they arrive
The configuration sections can accept lists of each type of AMQP object (exchange, queue and binding).
RabbitAdmin
So what does this look like when used in creating exchanges, queues and their bindings?
namespace Sample.Configuration.AMQP.Admin
{
public class RabbitAdmin
{
internal static void InitializeObjects(IConnection Connection)
{
var config = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
var objects = config.GetSection
("AMQPAdmin/AMQPObjectsDeclaration") as AMQPObjectsDeclarationSection;
if (objects != null)
{
Parallel.For(0, objects.ExchangeList.Count, i =>
{
using (IModel channel = Connection.CreateModel())
{
var exchange = objects.ExchangeList[i];
channel.ExchangeDeclare(exchange.Name,
exchange.Type.ToString(), exchange.Durable, exchange.AutoDelete, null);
}
});
Parallel.For(0, objects.QueueList.Count, i =>
{
using (IModel channel = Connection.CreateModel())
{
var queue = objects.QueueList[i];
channel.QueueDeclare(queue.Name, queue.Durable,
queue.Exclusive, queue.AutoDelete, null);
}
});
Parallel.For(0, objects.BindingList.Count, i =>
{
using (IModel channel = Connection.CreateModel())
{
var binding = objects.BindingList[i];
channel.QueueBind(binding.Queue,
binding.Exchange, binding.SuscriptionKey);
}
});
}
}
}
The RabbitAdmin
class creates the exchanges, queues and their bindings. It takes a connection object and uses that to communicate with the RabbitMQ server. The InitializeObjects
uses the parallel for
loop to create each type of object. Exchanges are created first, then queues, bindings are declared the last since they need both the queue and exchange to be there to bind the two.
GatewayFactory
The GatewayFactory
creates the connection to the server, calls RabbitAdmin
to declare all the objects and provides methods to create the publisher and asynchronous listener helper objects. Messages are passed to the publisher and received from the queues, the message object is a very simple object, it contains a header and a body.
public class Message {
public IBasicProperties Properties { get; set;}
public byte[] Body { get; set; }
public string RoutingKey { get; set; }
}
The body is a simple byte array that your internal data structure is serialized to. To be able to generically handle conversions between data structures and messages, the following delegates and interfaces are included in the package.
namespace Sample.Configuration.AMQP.Gateway
{
public delegate Message ConvertToMessage(IModel channel, object packetToSend);
public interface IConvertToMessage {
Message ConvertObjectToMessage( IModel channel, object packetToSend);
}
}
The client of the system takes care of converting from their data structures to the message structure.
Publishing
using (var gf = new GatewayFactory())
{
var mc = new Sample.Configuration.AMQP.Gateway.Converter.StringToMessageConverter();
var publisher = gf.GetPublisher(mc.ConvertObjectToMessage);
publisher.Publish("Hello world");
}
With configuration in place, publishing is a four line affair. Simply create a GatewayFactory
, that internally reads the configuration file and sets everything up. Then request a publisher and pass an objectToMessage
conversion handler. The library comes with a default string
to message converter that is used in line 5 which is useful for XML documents. Otherwise, a custom converter will have to be created. To give you an idea of how to create a converter, let's take a look at the string
converter.
public class StringToMessageConverter : IConvertToMessage
{
public static readonly string PLAIN_TEXT = "text/plain";
public const string _defaultCharSet = "utf-8";
public string CharSet { get; set; }
public StringToMessageConverter()
{
CharSet = _defaultCharSet;
}
public virtual Message ConvertObjectToMessage
(RabbitMQ.Client.IModel channel, object packetToSend)
{
var properties = channel.CreateBasicProperties();
var bytes = Encoding.GetEncoding(CharSet).GetBytes((string)packetToSend);
properties.ContentType = PLAIN_TEXT;
properties.ContentEncoding = CharSet;
return new Message()
{ Body = bytes, Properties = properties, RoutingKey = string.Empty };
}
}
Conversion to a message is primarily about converting whatever needs to be sent to a bit array, letting the receiver know what the content is and setting the RoutingKey
.
The Asynchronous Receiver
Receiving messages asynchronously is as simple as publishing. The receiver has to communicate back to the server the status of the message - it can be acknowledged, rejected or requeued.
class Program
{
static void Main(string[] args)
{
var mp = new MessageProcessor();
using (var cf = new GatewayFactory())
{
cf.GetAsyncReceiver(mp.ConsumeMessage);
}
}
}
class MessageProcessor : IMessageConsumer
{
public void ConsumeMessage
(Message message, RabbitMQ.Client.IModel channel, DeliveryHeader header)
{
try
{
var str = ConvertFromMessageToString(message);
channel.BasicAck(header.DeliveryTag, false);
}
catch (Exception ex)
{
channel.BasicReject(header.DeliveryTag, true);
}
}
public string ConvertFromMessageToString(Message message)
{
var content = string.Empty;
if (message.Properties.ContentType == StringToMessageConverter.PLAIN_TEXT)
{
var encoding = Encoding.GetEncoding
(message.Properties.ContentEncoding ?? "utf-8");
var ms = new MemoryStream(message.Body);
var reader = new StreamReader(ms, encoding, false);
content = reader.ReadToEnd();
}
return content;
}
}
In Line 8, we pass the message handler to the asynchronous receiver. The async receiver is multi threaded and the handler is expected to be stateless so that it can be used by multiple threads at the same time. Responding to the server is done through the IModel
object. The Delivery header contains a deliverytag
and a flag
to indicate if this was the first time it was delivered.
Something useful with the message can be done between lines 19 and 20.
Results in Action
To test this, I simulated the e-commerce flow I've used as an example previously.
This is the result.
It shows how orders flow into the order exchange and from there to the queues that are interested in orders.
Here, we see how shipments are sent to an exchange and then transmitted to queues that are bound to it.
You can find the source code for a working example of a distributed expert system using rabbitMQ at my source code repository.