Introduction
When building Azure web applications, or any web applications for that matter, it's often useful to use a background service to perform expensive tasks, or those which rely on external resources. This helps increase the performance and robustness of your web app. One way Azure allows you to achieve this is by using one or more worker roles and Azure Service bus, where your web app sends messages to the Azure Service bus, which are then retrieved by your worker role. The worker can then perform the appropriate task based on the type of message received, without impacting the performance of your web site.
This article will look at what I think are some best practices for implementing messaging using a worker role, including:
- Using a pattern for sending and receiving strongly-typed messages
- Logging exceptions
- Deadlettering of poison messages
- Automatic polling backoff when no messages are received
- Use of IOC with Ninject
Background
There are plenty of tutorials out there on the web that show examples of using the BrokeredMessage
class, which set properties on the message before sending, then read these properties when the message is retrieved. For example, you might have code for sending messages (as shown in this article) that sends the message as follows:
BrokeredMessage message = new BrokeredMessage("Test message " + i);
message.Properties["TestProperty"] = "TestValue";
message.Properties["Message number"] = i;
Client.Send(message);
and on the receiving side:
BrokeredMessage message = Client.Receive();
Console.WriteLine("Body: " + message.GetBody<string>());
Console.WriteLine("MessageID: " + message.MessageId);
Console.WriteLine("Test Property: " + message.Properties["TestProperty"]);
Note the second line of the receiver, in which the GetBody<string>
method is called. This will only work if the body of the message is in fact a string. The same applies when you might serialize a different type of object in the message body, for example, this article shows a Pizza order being serialized as the message body as follows:
[DataContract]
public class PizzaOrder
{
[DataMember]
public string Name { get; set; }
[DataMember]
public string Pizza { get; set; }
[DataMember]
public int Quantity { get; set; }
}
....
PizzaOrder orderIn = new PizzaOrder()
{
Name = "Alan",
Pizza = "Hawaiian",
Quantity = 1
};
BrokeredMessage orderInMsg = new BrokeredMessage(orderIn);
queueClient.Send(orderInMsg);
and on the receiving side:
BrokeredMessage orderOutMsg = queueClient.Receive();
if (orderOutMsg != null)
{
PizzaOrder orderOut = orderOutMsg.GetBody<PizzaOrder>();
Console.WriteLine("Received order, {0} {1} for {2}",
orderOut.Quantity, orderOut.Pizza, orderOut.Name);
...
Again, using the .GetBody<T>
method allows us to retrieve the message body in a strongly typed manner, but this method will only work if the message body is of type PizzaOrder
.
You may have noticed by now that if we use the pattern above, the worker role will only be able to process a single message type. What if we wanted to serialize multiple different types of objects for sending to the background worker role, and have it automatically deserialize them and perform an action based on their type, without knowing what the message type was in advance?
This article will show how to achieve this, so that the worker role isn't limited to only processing a single message type.
Motivation for writing this article
As I mentioned above, all of the examples of the web seem to deal with only needing to send/receive a single type of message. When you need to send/receive multiple types, you have to look at other options, such as either having multiple worker roles, or using a messaging framework such as NServiceBus, or MassTransit.
The inspiration for this article comes from NServiceBus, which (when I last used it some years ago) seemed to be using something similar this pattern for serializing/deserializing messages. I also found the Azure Tutorial at http://www.windowsazure.com/en-us/develop/net/tutorials/multi-tier-web-site/1-overview/ very helpful for this. I thought I'd implement similar code, and package it in a basic solution for anyone to use.
The basic pattern involves sending a BrokeredMessage
to the Azure service containing a serialized message body, and also an additional property specifying the full type of message body. On the receiving end, the worker role uses the type information in the Brokered Message to dynamically determine the serialized type contained within the Brokered Message, and retrieve it in a strongly typed manner. It then calls a predefined handler which is specified for that message type.
Using the code
Sending messages
The code for sending messages is fairly simply. You simply need to ensure that all messages are sent through a centralized point, which contains the following code:
const string QueueName = "ProcessingQueue";
protected void SendQueueMessage<T>(T message)
{
string connectionString =
CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
if (!namespaceManager.QueueExists(QueueName))
{
namespaceManager.CreateQueue(QueueName);
}
var client = QueueClient.CreateFromConnectionString(connectionString, QueueName);
BrokeredMessage brokeredMessage = new BrokeredMessage(message);
brokeredMessage.Properties["messageType"] = message.GetType().AssemblyQualifiedName;
client.Send(brokeredMessage);
}
Note the last three lines:
- The first line creates a brokered message, and uses the built-in functionality to pass in an object that will be serialized as the message body
- The second line sets a property (
messageType
) on the brokered message specifying the full type of the object contained in the message body - The third line sends the message to the queue
Note: I've used Visual Studio 2012 to develop the code for this article, and added references to Azure Service bus in my web project in order to use the Azure message queueing functionality:
Install-Package WindowsAzure.ServiceBus
Receiving Messages
On the receiving end, we need to read the messageType
property, then use reflection to extract the message body based on that type. We then need to call the appropriate message handler for that message type.
To achieve this, we first define an interface named IMessageHandler
as follows:
public interface IMessageHandler<T> where T : class
{
void Handle(T message);
}
We then define a DTO class representing a message that might be sent. For example:
public class PersonMessage
{
public string Name { get; set; }
public int Age { get; set; }
}
Now we need to define a handler for this type of message, for example:
public class PersonMessageHandler : IMessageHandler<PersonMessage>
{
public void Handle(PersonMessage message)
{
}
}
Finally, in our WorkerRole.cs class class, we do the following:
- In our
OnStart
method, we configure our IOC container - in this case Ninject, but you could do the same with others - to automatically wire up all implementations of the IMessageHandler
interface to the correct implementations. - We also configure diagnostics for our worker role, so any exceptions will automatically be logged to our storage account for analysis
- When we receive a message off the queue we read the
messageType
, use reflection to extract the message body and determine the correct handler type. We then use Ninject to find the appropriate handler, and call that handler. (Note that I didn't have to use Ninject here - I could have used another IOC container, or just used reflection to create an instance of the message handler. I'm already using Ninject elsewhere though, so though I would take advantage of it) - Once the handler has completed, we mark the message as complete so it will be removed from the queue
We also take care of logging exceptions, deadlettering poison messages, and automatic backoff if no message are received within the current poll interval. The full source code is shown below:
public class WorkerRole : RoleEntryPoint
{
private IKernel NinjectKernel;
const string QueueName = "ProcessingQueue";
int _currentPollInterval = 5000;
int _minPollInterval = 5000;
int _maxPollInterval = 300000;
QueueClient Client;
bool IsStopped;
private volatile bool _returnedFromRunMethod = false;
public override void Run()
{
while (!IsStopped)
{
BrokeredMessage msg = null;
try
{
var receivedMessages = Client.ReceiveBatch(32);
if (receivedMessages.Count() == 0)
{
Thread.Sleep(_currentPollInterval);
if (_currentPollInterval < _maxPollInterval)
{
_currentPollInterval = _currentPollInterval * 2;
}
continue;
}
_currentPollInterval = _minPollInterval;
foreach (var receivedMessage in receivedMessages)
{
msg = receivedMessage;
Trace.WriteLine("Processing", receivedMessage.SequenceNumber.ToString());
if (receivedMessage.DeliveryCount > 3)
{
Trace.TraceError("Deadlettering poison message: message {0}",
receivedMessage.ToString());
receivedMessage.DeadLetter();
continue;
}
var messageBodyType =
Type.GetType(receivedMessage.Properties["messageType"].ToString());
if (messageBodyType == null)
{
Trace.TraceError("Message does not have a messagebodytype" +
" specified, message {0}", receivedMessage.MessageId);
receivedMessage.DeadLetter();
}
MethodInfo method = typeof(BrokeredMessage).GetMethod("GetBody", new Type[] { });
MethodInfo generic = method.MakeGenericMethod(messageBodyType);
var messageBody = generic.Invoke(receivedMessage, null);
ProcessMessage(messageBody);
receivedMessage.Complete();
}
}
catch (MessagingException e)
{
if (!e.IsTransient)
{
Trace.WriteLine(e.ToString());
}
Thread.Sleep(10000);
}
catch (Exception ex)
{
string err = ex.ToString();
if (ex.InnerException != null)
{
err += "\r\n Inner Exception: " + ex.InnerException.ToString();
}
if (msg != null)
{
err += "\r\n Last queue message retrieved: " + msg.ToString();
}
Trace.TraceError(err);
System.Threading.Thread.Sleep(1000 * 60);
}
}
_returnedFromRunMethod = true;
Trace.WriteLine("Exiting run method");
}
public override bool OnStart()
{
ServicePointManager.DefaultConnectionLimit = 12;
ConfigureDiagnostics();
string connectionString =
CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
if (!namespaceManager.QueueExists(QueueName))
{
namespaceManager.CreateQueue(QueueName);
}
Client = QueueClient.CreateFromConnectionString(connectionString, QueueName);
IsStopped = false;
ConfigureNinject();
return base.OnStart();
}
private void ConfigureDiagnostics()
{
DiagnosticMonitorConfiguration config = DiagnosticMonitor.GetDefaultInitialConfiguration();
config.ConfigurationChangePollInterval = TimeSpan.FromMinutes(1d);
config.Logs.BufferQuotaInMB = 500;
config.Logs.ScheduledTransferLogLevelFilter = LogLevel.Verbose;
config.Logs.ScheduledTransferPeriod = TimeSpan.FromMinutes(1d);
DiagnosticMonitor.Start(
"Microsoft.WindowsAzure.Plugins.Diagnostics.ConnectionString",
config);
}
private void ConfigureNinject()
{
var kernel = new StandardKernel();
kernel.Bind(x => x.FromThisAssembly()
.SelectAllClasses().InheritedFrom(typeof(IMessageHandler<>))
.BindAllInterfaces());
NinjectKernel = kernel;
}
public override void OnStop()
{
IsStopped = true;
while (_returnedFromRunMethod == false)
{
System.Threading.Thread.Sleep(1000);
}
Client.Close();
base.OnStop();
}
public void ProcessMessage<T>(T message) where T : class
{
Type handlerType = typeof(IMessageHandler<>);
Type[] typeArgs = { message.GetType() };
Type constructed = handlerType.MakeGenericType(typeArgs);
var handler = NinjectKernel.Get(constructed);
var methodInfo = constructed.GetMethod("Handle");
methodInfo.Invoke(handler, new[] { message });
}
}
As you can see, this code does quite a lot, but all of it addresses the various best practices mentioned earlier. I'd recommend downloading the full source code from GitHub if you're interested in applying this pattern in your chosen solution.
Note: I've used Visual Studio 2012 to develop the code for this article, and added references to Azure Service bus in my worker project in order to use the Azure message queuing functionality. Since I used Ninject, I also had to add the relevant NuGet packages for it, as shown below:
Install-Package WindowsAzure.ServiceBus
Install-Package ninject.extensions.conventions -Version 3.0.0.11
Where is the source code?
All source code for this article is available within the repository for my YouConf website on GitHub at https://github.com/phillee007/youconf. The relevant parts to look at are:
- The YouConfWorker project, which contains the code for the Azure worker role that receives and processes messages
- The YouConf.Common project, which contains the types of messages that will be serialized within the body of the brokered messages
- The
BaseController
class within the YouConf web project, which is responsible for sending messages using the manner described earlier
Additional points of interest
Reflection and other IOC containers
As mentioned in earlier comments - whilst this sample uses Ninject to automatically locate the correct message handler for a given message type, you could just as easily use plain old reflection to accomplish the tame task, or your own IOC container of choice. I would recommend using an IOC container personally, as this is exactly the sort of situation that they can help with, and take away a lot of the plumbing code you would otherwise have to write.
Multi-threading
If the tasks being performed by your worker role are heavily network IO-bound (for example, relying on making long-running external web service calls or similar), or you're running on a multi-core instance (medium/large role) you might want to look at making your worker role multi-threaded. This could be achieved fairly easily using the the Task Parallel library and wrapping the call to ProcessMessage in a call to Task.Run (.Net 4.5) or TaskFactory.StartNew (.Net 4.0). Just make sure to catch and process any AggregateExceptions in addition to the other exceptions that could be generated.
Note that whether you want or need to use multi-threading will depend largely on your particular circumstances and threading knowledge, hence it's safer not to attempt it till you're confident it will help.
Caveat - Service Level Agreements (SLAs)
If your worker role has to conform to a specific SLA for a given type of message type (e.g. x number of messages processed per second) it might be better to not use this approach, but rather, use a separate worker role for each difference message type. This would allow you to more accurately measure your throughput for specific message types, and could be relevant where, for example, you have a different message type per user/client of your system, and you've guaranteed them that you can achieve a given level of throughput for them.
Your Comments
If you have any comments or feedback on how I could improve this article, please let me know, as this is still a work in progress as my knowledge of Azure improves.
Finally, if you found this article useful, be sure to check out my main CodeProject article - http://www.codeproject.com/Articles/584534/YouConf-Your-Live-Online-Conferencing-Tool, which contains a multitude of tips like this to help you build a rock-solid Azure application. This was just one of many discoveries I made during the Azure Developer Challenge.
Thanks for reading and I hope you've learned something useful!
References
History
- 6/6/2013 - Initial article.
- 14/06/2013 - minor grammatical corrections