
Best practices for using strongly typed messages with Azure Service Bus

17 Jun 2013 (CPOL)
This article will show you a pattern for sending and receiving messages using Azure Service Bus + worker roles in a strongly typed manner, taking advantage of the built-in functionality that comes with Brokered Messages.

Introduction

When building Azure web applications, or any web applications for that matter, it's often useful to hand expensive tasks, or those that rely on external resources, to a background service. This improves the performance and robustness of your web app. One way to achieve this in Azure is to combine one or more worker roles with Azure Service Bus: your web app sends messages to a Service Bus queue, and your worker role retrieves them. The worker can then perform the appropriate task based on the type of message received, without impacting the performance of your web site.

This article will look at what I think are some best practices for implementing messaging using a worker role, including:

  • Using a pattern for sending and receiving strongly-typed messages 
  • Logging exceptions
  • Deadlettering of poison messages
  • Automatic polling backoff when no messages are received
  • Use of IOC with Ninject   

Background 

There are plenty of tutorials out there on the web that show examples of using the BrokeredMessage class, which set properties on the message before sending, then read these properties when the message is retrieved. For example, you might have code for sending messages (as shown in this article) that sends the message as follows: 

C#
// Create message, passing a string message for the body
BrokeredMessage message = new BrokeredMessage("Test message " + i);
// Set some additional custom app-specific properties
message.Properties["TestProperty"] = "TestValue";
message.Properties["Message number"] = i;   
// Send message to the queue
Client.Send(message); 

and on the receiving side:

C#
BrokeredMessage message = Client.Receive();
Console.WriteLine("Body: " + message.GetBody<string>());
Console.WriteLine("MessageID: " + message.MessageId);
Console.WriteLine("Test Property: " + message.Properties["TestProperty"]); 

Note the second line of the receiver, in which the GetBody<string> method is called. This will only work if the body of the message is in fact a string. The same applies when you serialize a different type of object in the message body. For example, this article shows a pizza order being serialized as the message body as follows:

C#
[DataContract]
public class PizzaOrder
{
    [DataMember]
    public string Name { get; set; }
    [DataMember]
    public string Pizza { get; set; }
    [DataMember]
    public int Quantity { get; set; }
}
....
// Create a new pizza order.
PizzaOrder orderIn = new PizzaOrder()
{
    Name = "Alan",
    Pizza = "Hawaiian",
    Quantity = 1
};
 
// Create a brokered message based on the order.
BrokeredMessage orderInMsg = new BrokeredMessage(orderIn);
 
// Send the message to the queue.
queueClient.Send(orderInMsg); 

 and on the receiving side:

C#
BrokeredMessage orderOutMsg = queueClient.Receive();
 
if (orderOutMsg != null)
{
    // Deserialize the message body to a pizza order.
    PizzaOrder orderOut = orderOutMsg.GetBody<PizzaOrder>();
 
    Console.WriteLine("Received order, {0} {1} for {2}",
        orderOut.Quantity, orderOut.Pizza, orderOut.Name);
        
... 

Again, using the .GetBody<T> method allows us to retrieve the message body in a strongly typed manner, but this method will only work if the message body is of type PizzaOrder.

You may have noticed by now that if we use the pattern above, the worker role will only be able to process a single message type. What if we wanted to serialize multiple different types of objects for sending to the background worker role, and have it automatically deserialize them and perform an action based on their type, without knowing what the message type was in advance?  

This article will show how to achieve this, so that the worker role isn't limited to only processing a single message type.    

Motivation for writing this article 

As I mentioned above, all of the examples on the web seem to deal with sending/receiving only a single type of message. When you need to send/receive multiple types, you have to look at other options, such as having multiple worker roles or using a messaging framework such as NServiceBus or MassTransit.

The inspiration for this article comes from NServiceBus, which (when I last used it some years ago) seemed to use something similar to this pattern for serializing/deserializing messages. I also found the Azure tutorial at http://www.windowsazure.com/en-us/develop/net/tutorials/multi-tier-web-site/1-overview/ very helpful for this. I thought I'd implement similar code, and package it in a basic solution for anyone to use.

The basic pattern involves sending a BrokeredMessage to Azure Service Bus containing a serialized message body, plus an additional property specifying the full type name of the message body. On the receiving end, the worker role uses that type information to determine at run time which type was serialized into the Brokered Message, and retrieves the body in a strongly typed manner. It then calls a predefined handler which is specified for that message type.
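To make the idea concrete without any Azure dependency, here is a minimal sketch of the same round trip. Note that Envelope and EnvelopeCodec are hypothetical stand-ins I've made up for illustration (Envelope plays the part of BrokeredMessage), and the body is serialized with DataContractSerializer purely as an assumption for the demo:

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization;

// Hypothetical stand-in for BrokeredMessage: a serialized body plus named properties.
public class Envelope
{
    public Envelope() { Properties = new Dictionary<string, object>(); }
    public byte[] Body { get; set; }
    public Dictionary<string, object> Properties { get; private set; }
}

[DataContract]
public class PersonMessage
{
    [DataMember] public string Name { get; set; }
    [DataMember] public int Age { get; set; }
}

public static class EnvelopeCodec
{
    // Sender side: serialize the body and record its full type name next to it.
    public static Envelope Wrap(object message)
    {
        var envelope = new Envelope();
        using (var stream = new MemoryStream())
        {
            new DataContractSerializer(message.GetType()).WriteObject(stream, message);
            envelope.Body = stream.ToArray();
        }
        envelope.Properties["messageType"] = message.GetType().AssemblyQualifiedName;
        return envelope;
    }

    // Receiver side: resolve the type from the property, then deserialize into it.
    public static object Unwrap(Envelope envelope)
    {
        object typeName;
        if (!envelope.Properties.TryGetValue("messageType", out typeName))
            throw new InvalidOperationException("Envelope has no messageType property.");

        Type bodyType = Type.GetType(typeName.ToString());
        if (bodyType == null)
            throw new InvalidOperationException("Unknown message type: " + typeName);

        using (var stream = new MemoryStream(envelope.Body))
        {
            return new DataContractSerializer(bodyType).ReadObject(stream);
        }
    }
}
```

Calling EnvelopeCodec.Unwrap(EnvelopeCodec.Wrap(new PersonMessage { Name = "Ann", Age = 30 })) should give back a PersonMessage without the receiver ever naming that type in its own code, which is the whole point of carrying the type name alongside the body.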

Using the code  

Sending messages 

The code for sending messages is fairly simple. You just need to ensure that all messages are sent through a centralized point, which contains the following code:

C#
const string QueueName = "ProcessingQueue";
protected void SendQueueMessage<T>(T message)
{
    // Create the queue if it does not exist already
    string connectionString = 
      CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
    var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
    if (!namespaceManager.QueueExists(QueueName))
    {
        namespaceManager.CreateQueue(QueueName);
    }
    // Initialize the connection to Service Bus Queue
    var client = QueueClient.CreateFromConnectionString(connectionString, QueueName);
    // Create message, with the message body being automatically serialized
    BrokeredMessage brokeredMessage = new BrokeredMessage(message);
    brokeredMessage.Properties["messageType"] = message.GetType().AssemblyQualifiedName;
    client.Send(brokeredMessage);
} 

Note the last three lines:

  • The first line creates a brokered message, and uses the built-in functionality to pass in an object that will be serialized as the message body
  • The second line sets a property (messageType) on the brokered message specifying the full type of the object contained in the message body
  • The third line sends the message to the queue 

Note: I've used Visual Studio 2012 to develop the code for this article, and added references to Azure Service Bus in my web project in order to use the Azure message queuing functionality:

Install-Package WindowsAzure.ServiceBus  

Receiving Messages

On the receiving end, we need to read the messageType property, then use reflection to extract the message body based on that type. We then need to call the appropriate message handler for that message type.

To achieve this, we first define an interface named IMessageHandler as follows:

C#
public interface IMessageHandler<T> where T : class
{
    void Handle(T message);
} 

We then define a DTO class representing a message that might be sent. For example:

C#
public class PersonMessage
{
    public string Name { get; set; }
    public int Age { get; set; }
} 

Now we need to define a handler for this type of message, for example:

C#
public class PersonMessageHandler : IMessageHandler<PersonMessage>
{
    public void Handle(PersonMessage message)
    {
        //Do something useful with the message here
    }
}
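The reflection step mentioned above, calling a generic GetBody<T> method when T is only known at run time, can be tried out in isolation. The sketch below uses a hypothetical FakeMessage class in place of BrokeredMessage (the real class is not needed to see the technique), and gives it a second GetBody overload so that the lookup has to pick out the parameterless one:

```csharp
using System;
using System.Reflection;

// Hypothetical stand-in for BrokeredMessage, with two GetBody<T> overloads.
public class FakeMessage
{
    private readonly object _body;
    public FakeMessage(object body) { _body = body; }

    public T GetBody<T>() { return (T)_body; }

    // Extra overload, present only so that a lookup by name alone would be ambiguous.
    public T GetBody<T>(string hint) { return (T)_body; }
}

public static class BodyReader
{
    public static object ReadBody(FakeMessage message, Type bodyType)
    {
        // Type.EmptyTypes selects the overload that takes no parameters.
        MethodInfo open = typeof(FakeMessage).GetMethod("GetBody", Type.EmptyTypes);
        // Close the generic method over the type discovered at run time.
        MethodInfo closed = open.MakeGenericMethod(bodyType);
        return closed.Invoke(message, null);
    }
}
```

One thing to keep in mind: if the body is not actually of the requested type, the failure surfaces as a TargetInvocationException wrapping the original exception, because the call goes through MethodInfo.Invoke.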

Finally, in our WorkerRole.cs class, we do the following:

  • In our OnStart method, we configure our IOC container - in this case Ninject, but you could do the same with others - to automatically bind the IMessageHandler<T> interface to the correct implementations.
  • We also configure diagnostics for our worker role, so any exceptions will automatically be logged to our storage account for analysis  
  • When we receive a message off the queue we read the messageType, use reflection to extract the message body and determine the correct handler type. We then use Ninject to find the appropriate handler, and call that handler. (Note that I didn't have to use Ninject here - I could have used another IOC container, or just used reflection to create an instance of the message handler. I'm already using Ninject elsewhere though, so thought I would take advantage of it)
  • Once the handler has completed, we mark the message as complete so it will be removed from the queue 

We also take care of logging exceptions, deadlettering poison messages, and automatic backoff if no messages are received within the current poll interval. The full source code is shown below:

C#
public class WorkerRole : RoleEntryPoint
{
    private IKernel NinjectKernel;
    // The name of your queue
    const string QueueName = "ProcessingQueue";
    int _currentPollInterval = 5000;
    int _minPollInterval = 5000;
    int _maxPollInterval = 300000;
    // QueueClient is thread-safe. Recommended that you cache 
    // rather than recreating it on every request
    QueueClient Client;
    // Written by OnStop and read by Run on a different thread, hence volatile
    volatile bool IsStopped;
    private volatile bool _returnedFromRunMethod = false;
    public override void Run()
    {
        while (!IsStopped)
        {
            BrokeredMessage msg = null;
            try
            {
                // Receive as many messages as possible (to reduce the number of calls to Service Bus)
                var receivedMessages = Client.ReceiveBatch(32);
                if (receivedMessages.Count() == 0)
                {
                    Thread.Sleep(_currentPollInterval);
                    //No messages, so double the poll interval, capped at the maximum
                    _currentPollInterval = Math.Min(_currentPollInterval * 2, _maxPollInterval);
                    continue;
                }
                //At least one message, so reset our poll interval
                _currentPollInterval = _minPollInterval;
                foreach (var receivedMessage in receivedMessages)
                {
                    msg = receivedMessage;
                    // Process the message
                    Trace.WriteLine("Processing message " + receivedMessage.SequenceNumber);
                    //If it's a poison message, move it off to the deadletter queue
                    if (receivedMessage.DeliveryCount > 3)
                    {
                        Trace.TraceError("Deadlettering poison message: message {0}", 
                                     receivedMessage.ToString());
                        receivedMessage.DeadLetter();
                        continue;
                    }
                    //Get actual message type
                    var messageBodyType = 
                      Type.GetType(receivedMessage.Properties["messageType"].ToString());
                    if (messageBodyType == null)
                    {
                        //Should never get here as a messagebodytype should
                        //always be set BEFORE putting the message on the queue
                        Trace.TraceError("Message does not have a messagebodytype" + 
                          " specified, message {0}", receivedMessage.MessageId);
                        receivedMessage.DeadLetter();
                        //Nothing more can be done with this message, so move on to the next one
                        continue;
                    }
                    //Use reflection to figure out the type
                    //of object contained in the message body, and extract it
                    MethodInfo method = typeof(BrokeredMessage).GetMethod("GetBody", new Type[] { });
                    MethodInfo generic = method.MakeGenericMethod(messageBodyType);
                    var messageBody = generic.Invoke(receivedMessage, null);
                    //Process the message contents
                    ProcessMessage(messageBody);
                    //Everything ok, so take it off the queue
                    receivedMessage.Complete();
                }
            }
            catch (MessagingException e)
            {
                if (!e.IsTransient)
                {
                    Trace.WriteLine(e.ToString());
                }
                Thread.Sleep(10000);
            }
            catch (Exception ex)
            {
                string err = ex.ToString();
                if (ex.InnerException != null)
                {
                    err += "\r\n Inner Exception: " + ex.InnerException.ToString();
                }
                if (msg != null)
                {
                    err += "\r\n Last queue message retrieved: " + msg.ToString();
                }
                Trace.TraceError(err);
                // Don't fill up Trace storage if we have a bug in either process loop.
                System.Threading.Thread.Sleep(1000 * 60);
            }
        }
        // If OnStop has been called, return to do a graceful shutdown.
        _returnedFromRunMethod = true;
        Trace.WriteLine("Exiting run method");
    }
    public override bool OnStart()
    {
        // Set the maximum number of concurrent connections 
        ServicePointManager.DefaultConnectionLimit = 12;
        //Diagnostics
        ConfigureDiagnostics();
        // Create the queue if it does not exist already
        string connectionString = 
          CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
        var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
        if (!namespaceManager.QueueExists(QueueName))
        {
            namespaceManager.CreateQueue(QueueName);
        }
        // Initialize the connection to Service Bus Queue
        Client = QueueClient.CreateFromConnectionString(connectionString, QueueName);
        IsStopped = false;
        ConfigureNinject();
        return base.OnStart();
    }
    private void ConfigureDiagnostics()
    {
        DiagnosticMonitorConfiguration config = DiagnosticMonitor.GetDefaultInitialConfiguration();
        config.ConfigurationChangePollInterval = TimeSpan.FromMinutes(1d);
        config.Logs.BufferQuotaInMB = 500;
        config.Logs.ScheduledTransferLogLevelFilter = LogLevel.Verbose;
        config.Logs.ScheduledTransferPeriod = TimeSpan.FromMinutes(1d);
        DiagnosticMonitor.Start(
               "Microsoft.WindowsAzure.Plugins.Diagnostics.ConnectionString",
               config);
    }
    private void ConfigureNinject()
    {
        var kernel = new StandardKernel();
        kernel.Bind(x => x.FromThisAssembly()
              .SelectAllClasses().InheritedFrom(typeof(IMessageHandler<>))
              .BindAllInterfaces());
        NinjectKernel = kernel;
    }
    public override void OnStop()
    {
        // Close the connection to Service Bus Queue
        IsStopped = true;
        while (_returnedFromRunMethod == false)
        {
            System.Threading.Thread.Sleep(1000);
        }
        Client.Close();
        base.OnStop();
    }
    /// <summary>
    /// Locates the correct handler type, and executes it using the current message
    /// </summary>
    /// <typeparam name="T">The type of message</typeparam>
    /// <param name="message">The actual message body</param>
    public void ProcessMessage<T>(T message) where T : class
    {
        //Voodoo to construct the right message handler type
        Type handlerType = typeof(IMessageHandler<>);
        Type[] typeArgs = { message.GetType() };
        Type constructed = handlerType.MakeGenericType(typeArgs);
        //NOTE: Could just use reflection here to locate and create an instance
        // of the desired message handler type here if you didn't want to use an IOC container...
        //Get an instance of the message handler type
        var handler = NinjectKernel.Get(constructed);
        //Handle the message
        var methodInfo = constructed.GetMethod("Handle");
        methodInfo.Invoke(handler, new[] { message });
    }
} 

As you can see, this code does quite a lot, but all of it addresses the various best practices mentioned earlier. I'd recommend downloading the full source code from GitHub if you're interested in applying this pattern in your chosen solution.  
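The per-message checks at the top of the foreach loop (delivery count, then message type) can also be read as a small decision function. The sketch below is one way of expressing that, kept free of Service Bus types so it can be unit tested. MessageTriage and MessageAction are names I've made up for illustration, and the default of 3 simply mirrors the hard-coded delivery count in the worker role:

```csharp
using System;
using System.Collections.Generic;

public enum MessageAction { Process, DeadLetter }

public static class MessageTriage
{
    // Decides what to do with a message before any handler is involved.
    public static MessageAction Decide(
        int deliveryCount,
        IDictionary<string, object> properties,
        int maxDeliveries = 3)
    {
        // Delivered too many times already: treat it as a poison message.
        if (deliveryCount > maxDeliveries)
            return MessageAction.DeadLetter;

        // No type information at all: the body cannot be deserialized.
        object typeName;
        if (!properties.TryGetValue("messageType", out typeName) || typeName == null)
            return MessageAction.DeadLetter;

        // Type name present, but it does not resolve to a type we can load.
        if (Type.GetType(typeName.ToString()) == null)
            return MessageAction.DeadLetter;

        return MessageAction.Process;
    }
}
```

Using TryGetValue rather than the indexer means a message with no messageType property is treated the same way as one with an unrecognised type, instead of throwing.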

Note: I've used Visual Studio 2012 to develop the code for this article, and added references to Azure Service Bus in my worker project in order to use the Azure message queuing functionality. Since I used Ninject, I also had to add the relevant NuGet packages for it, as shown below:

Install-Package WindowsAzure.ServiceBus
Install-Package ninject.extensions.conventions -Version 3.0.0.11 
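If you'd like to test the polling backoff separately from the worker role, the policy can be pulled out into a small class. This is only a sketch: PollBackoff is a hypothetical helper (it isn't in the YouConf source), and it clamps the interval so that doubling never goes past the configured maximum:

```csharp
using System;

// Hypothetical helper: doubles the delay after each empty poll, up to a cap.
public class PollBackoff
{
    private readonly int _minMs;
    private readonly int _maxMs;

    public PollBackoff(int minMs, int maxMs)
    {
        if (minMs <= 0 || maxMs < minMs) throw new ArgumentOutOfRangeException();
        _minMs = minMs;
        _maxMs = maxMs;
        CurrentMs = minMs;
    }

    // The delay that will be used for the next empty poll.
    public int CurrentMs { get; private set; }

    // Call after an empty poll: returns the delay to sleep for now,
    // then doubles the stored delay (never exceeding the cap).
    public int NextDelayAfterEmptyPoll()
    {
        int delay = CurrentMs;
        CurrentMs = (int)Math.Min((long)CurrentMs * 2, _maxMs);
        return delay;
    }

    // Call as soon as at least one message has been received.
    public void Reset()
    {
        CurrentMs = _minMs;
    }
}
```

With the values used in the worker role (5000 and 300000), successive empty polls would sleep for 5000, 10000, 20000, 40000, 80000, 160000 and then 300000 milliseconds, staying at 300000 until Reset is called.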

Where is the source code?  

All source code for this article is available within the repository for my YouConf website on GitHub at https://github.com/phillee007/youconf. The relevant parts to look at are:

  • The YouConfWorker project, which contains the code for the Azure worker role that receives and processes messages
  • The YouConf.Common project, which contains the types of messages that will be serialized within the body of the brokered messages 
  • The BaseController class within the YouConf web project, which is responsible for sending messages using the manner described earlier  

 Additional points of interest 

Reflection and other IOC containers 

As mentioned in the earlier comments, whilst this sample uses Ninject to automatically locate the correct message handler for a given message type, you could just as easily use plain old reflection to accomplish the same task, or use your own IOC container of choice. I would recommend using an IOC container personally, as this is exactly the sort of situation that they can help with, and they take away a lot of the plumbing code you would otherwise have to write.
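For anyone curious what the plain-reflection alternative might look like, here is a rough sketch. ReflectionDispatcher is a hypothetical class I've written for this example, and the static Handled list exists only so the demo has something observable. It assumes each handler has a public parameterless constructor, which is one of the restrictions an IOC container removes:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

public interface IMessageHandler<T> where T : class
{
    void Handle(T message);
}

public class PersonMessage
{
    public string Name { get; set; }
    public int Age { get; set; }
}

public class PersonMessageHandler : IMessageHandler<PersonMessage>
{
    // Demo-only record of what has been handled.
    public static readonly List<string> Handled = new List<string>();

    public void Handle(PersonMessage message)
    {
        Handled.Add(message.Name);
    }
}

public static class ReflectionDispatcher
{
    public static void Dispatch(object message, Assembly handlerAssembly)
    {
        // Build IMessageHandler<TMessage> for the run-time type of the message.
        Type handlerInterface = typeof(IMessageHandler<>).MakeGenericType(message.GetType());

        // Find a concrete class in the assembly that implements that interface.
        Type handlerType = handlerAssembly.GetTypes()
            .FirstOrDefault(t => t.IsClass && !t.IsAbstract && handlerInterface.IsAssignableFrom(t));
        if (handlerType == null)
            throw new InvalidOperationException("No handler found for " + message.GetType().FullName);

        // Requires a public parameterless constructor on the handler.
        object handler = Activator.CreateInstance(handlerType);
        handlerInterface.GetMethod("Handle").Invoke(handler, new[] { message });
    }
}
```

Scanning the assembly on every message is wasteful, so in real use you would probably cache the message-type-to-handler-type lookup in a dictionary. Handlers with constructor dependencies are where this approach starts to hurt, and where a container earns its keep.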

Multi-threading

If the tasks being performed by your worker role are heavily network IO-bound (for example, relying on making long-running external web service calls or similar), or you're running on a multi-core instance (medium/large role), you might want to look at making your worker role multi-threaded. This could be achieved fairly easily using the Task Parallel Library and wrapping the call to ProcessMessage in a call to Task.Run (.NET 4.5) or TaskFactory.StartNew (.NET 4.0). Just make sure to catch and process any AggregateExceptions in addition to the other exceptions that could be generated.

Note that whether you want or need to use multi-threading will depend largely on your particular circumstances and threading knowledge, hence it's safer not to attempt it till you're confident it will help. 
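As a starting point, the sketch below shows the general shape of running a batch on the thread pool with Task.Run (.NET 4.5) and collecting any failures from the resulting AggregateException. ParallelProcessor is a hypothetical helper, and the process delegate stands in for the call to ProcessMessage. It deliberately leaves out the Service Bus side of things, such as completing only the messages whose handler succeeded:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ParallelProcessor
{
    // Runs 'process' for each item on the thread pool and waits for the whole batch.
    // Returns the exceptions thrown by individual items (empty if all succeeded).
    public static IList<Exception> ProcessAll<T>(IEnumerable<T> items, Action<T> process)
    {
        Task[] tasks = items
            .Select(item => Task.Run(() => process(item)))
            .ToArray();

        try
        {
            Task.WaitAll(tasks);
            return new List<Exception>();
        }
        catch (AggregateException ex)
        {
            // Flatten unwraps any nested AggregateExceptions into one list.
            return ex.Flatten().InnerExceptions.ToList();
        }
    }
}
```

Because Task.WaitAll waits for every task before throwing, one failing message doesn't stop the others in the batch from being processed. Your handlers do need to be thread-safe for this to be worthwhile.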

Caveat - Service Level Agreements (SLAs)

If your worker role has to conform to a specific SLA for a given message type (e.g. x number of messages processed per second), it might be better not to use this approach, but rather to use a separate worker role for each different message type. This would allow you to more accurately measure your throughput for specific message types, and could be relevant where, for example, you have a different message type per user/client of your system, and you've guaranteed them that you can achieve a given level of throughput for them.

Your Comments

If you have any comments or feedback on how I could improve this article, please let me know. It is still a work in progress, and will evolve as my knowledge of Azure improves.

Finally, if you found this article useful, be sure to check out my main CodeProject article - http://www.codeproject.com/Articles/584534/YouConf-Your-Live-Online-Conferencing-Tool, which contains a multitude of tips like this to help you build a rock-solid Azure application. This was just one of many discoveries I made during the Azure Developer Challenge. 

Thanks for reading and I hope you've learned something useful!  

History

  • 6/6/2013 - Initial article.
  • 14/06/2013 - minor grammatical corrections 

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)