In this article we will be looking at one way (and there are many) that you can communicate between role instances within a single Azure Cloud Service. This article will make use of various things, such as WCF / Azure ServiceBus / Azure Cloud Service / Azure roles (I am using worker roles for ease of use, but this could be a mixture of different role types, for example WebRole/WorkerRole etc.) and last but by no means least Reactive Extensions (RX).
You can grab all the code from my GitHub account:
https://github.com/sachabarber/AzureInterRole
Before we start, let's first ponder the question of what exactly an Azure Cloud Service is. An "Azure Cloud Service" is an example of Platform-as-a-Service (PaaS) and can be thought of as a collection of VMs that are hosted in the Microsoft cloud. These VMs can have software installed on them, and may also be remoted into.
I think in this case it is best to get the information straight from the horse's mouth, so let's do that:
More control also means less ease of use; unless you need the additional control options, it's typically quicker and easier to get a web application up and running in Websites compared to Cloud Services.
The technology provides two slightly different VM options: instances of web roles run a variant of Windows Server with IIS, while instances of worker roles run the same Windows Server variant without IIS. A Cloud Services application relies on some combination of these two options.
For example, a simple application might use just a web role, while a more complex application might use a web role to handle incoming requests from users, then pass the work those requests create to a worker role for processing. (This communication could use Service Bus or Azure Queues.)
As the figure suggests, all of the VMs in a single application run in the same cloud service. Because of this, users access the application through a single public IP address, with requests automatically load balanced across the application's VMs. The platform will deploy the VMs in a Cloud Services application in a way that avoids a single point of hardware failure.
Even though applications run in virtual machines, it's important to understand that Cloud Services provides PaaS, not IaaS. Here's one way to think about it: With IaaS, such as Azure Virtual Machines, you first create and configure the environment your application will run in, then deploy your application into this environment. You're responsible for managing much of this world, doing things such as deploying new patched versions of the operating system in each VM. In PaaS, by contrast, it's as if the environment already exists. All you have to do is deploy your application. Management of the platform it runs on, including deploying new versions of the operating system, is handled for you.
With Cloud Services, you don't create virtual machines. Instead, you provide a configuration file that tells Azure how many of each you'd like, such as three web role instances and two worker role instances, and the platform creates them for you. You still choose what size those VMs should be -- the options are the same as with Azure VMs -- but you don't explicitly create them yourself. If your application needs to handle a greater load, you can ask for more VMs, and Azure will create those instances. If the load decreases, you can shut those instances down and stop paying for them.
A Cloud Services application is typically made available to users via a two-step process. A developer first uploads the application to the platform's staging area. When the developer is ready to make the application live, she uses the Azure Management Portal to request that it be put into production. This switch between staging and production can be done with no downtime, which lets a running application be upgraded to a new version without disturbing its users.
Cloud Services also provides monitoring. Like Azure Virtual Machines, it will detect a failed physical server and restart the VMs that were running on that server on a new machine. But Cloud Services also detects failed VMs and applications, not just hardware failures. Unlike Virtual Machines, it has an agent inside each web and worker role, and so it's able to start new VMs and application instances when failures occur.
The PaaS nature of Cloud Services has other implications, too. One of the most important is that applications built on this technology should be written to run correctly when any web or worker role instance fails. To achieve this, a Cloud Services application shouldn't maintain state in the file system of its own VMs. Unlike VMs created with Azure Virtual Machines, writes made to Cloud Services VMs aren't persistent; there's nothing like a Virtual Machines data disk. Instead, a Cloud Services application should explicitly write all state to SQL Database, blobs, tables, or some other external storage. Building applications this way makes them easier to scale and more resistant to failure, both important goals of Cloud Services.
Source: http://azure.microsoft.com/en-gb/documentation/articles/fundamentals-application-models/ (up on 19/03/2015)
When we talk about a Cloud Service role, we really mean a VM. There are currently only a few different types of roles, the most common of which are:
- WebRole: Windows Server with IIS
- WorkerRole: Windows Server variant without IIS
An Azure Cloud Service can be made up of a mixture of these, up to a limit of 25 (currently). You can check out the limitations for Azure Cloud Services using the following link:
http://azure.microsoft.com/en-gb/documentation/articles/azure-subscription-service-limits/#cloud-service-limits
This section will discuss various techniques you could use, and why I don't think any of them are as useful as what the demo code in this article shows.
One solution would be to use a WCF endpoint. Let's talk about endpoints a bit.
Each Azure role is capable of exposing internal/external endpoints, which may be either Http/Tcp. You may construct complex rules about what is allowed on these endpoints.
You may configure the endpoints using the properties of the role within the Azure Cloud Service. The following screen shots show you where you can do this.
This launches the properties for the role. Once the properties are shown it is just a question of telling the role what endpoints you wish to expose. The following screen shot demonstrates that.
There is also an extremely good article on MSDN about role endpoints, which I urge you all to read:
https://azure.microsoft.com/en-gb/documentation/articles/cloud-services-enable-communication-role-instances/
Once you have some InternalEndpoint(s) / ExternalEndpoint(s) set up, it is pretty easy to use one of those ports to communicate with a WCF service that you could host (using a good old fashioned ServiceHost) in a particular role. The problem with this approach is that (in my opinion) it does not scale that well. Whenever you need to communicate between a pair of roles, you will need one end to have a hosted WCF service, and the other end (the client) to use a WCF proxy to talk to the hosted service (the other end of the communication channel if you like). If you need to talk between different roles a lot, this soon becomes a nightmare to maintain.
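To give a feel for what the client end of that looks like, here is a minimal sketch of how one role might look up another role's internal endpoint at runtime. The role name "WorkerRole2", the endpoint name "InternalEndpoint1", the net.tcp scheme and the "/BroadcastService" path are all assumptions made up for illustration, so swap in whatever your own cloud service declares.

```csharp
using System;
using System.Linq;
using System.Net;
using Microsoft.WindowsAzure.ServiceRuntime;

public static class InternalEndpointLookup
{
    // "WorkerRole2" and "InternalEndpoint1" are hypothetical names; they must
    // match the role and endpoint names declared in your own cloud service.
    public static Uri GetFirstInstanceAddress()
    {
        RoleInstance target = RoleEnvironment.Roles["WorkerRole2"].Instances.First();
        IPEndPoint ipEndPoint = target.InstanceEndpoints["InternalEndpoint1"].IPEndpoint;

        // The "net.tcp" scheme and the "/BroadcastService" path are assumptions
        // about how the WCF service at the other end would be hosted.
        return new Uri(String.Format("net.tcp://{0}/BroadcastService", ipEndPoint));
    }
}
```

You would need something like this (plus a proxy and a hosted service) for every pair of roles that want to talk, which is exactly the maintenance burden I am complaining about.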
Another way might be to bypass endpoints altogether, and go straight for some cloud based messaging system like Azure Queues or Service Bus Messaging. This is certainly a nice idea, but the problem with this approach is that you have very generic messages, which are either:
- Azure Queues: CloudQueueMessage, which holds the serialized message data internally
- Azure ServiceBus: BrokeredMessage, which holds the serialized message data internally
So how do you know which messages are for which role? You can of course Peek, which is what you would have to do. OK, so you use Peek (which both Azure Queues and Azure ServiceBus support) to determine if a new message is available on a queue your role is monitoring, but how do you know if this message is one of interest to the role?
Well, Azure ServiceBus BrokeredMessage(s) certainly support the adding of custom metadata via the Properties property, which allows you to do things like:
```csharp
BrokeredMessage message = new BrokeredMessage(....);
message.Properties.Add("Source", "Columbia");
message.Properties.Add("Weight", "200g");
```
But as far as I know (and I could well be wrong) Azure Queues do not allow the addition of metadata to a CloudQueueMessage.
The other big win that the ServiceBus has over Azure queues (at least in my opinion) is that it supports topic based subscriptions, which makes it a clear winner in this comparison. You can read more about topic based Azure ServiceBus messaging here:
http://azure.microsoft.com/en-gb/documentation/articles/service-bus-dotnet-how-to-use-topics-subscriptions/
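As a rough sketch of how topic subscriptions and message properties fit together, a subscription can be created with a SqlFilter that matches on the custom properties of each BrokeredMessage. The topic name "TestTopic", the subscription name "ColumbiaOnly" and the connection string parameter are hypothetical, and this is not code from the demo project.

```csharp
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

public static class TopicFilterSketch
{
    // "TestTopic" and "ColumbiaOnly" are made-up names for illustration, and
    // the topic itself is assumed to exist already.
    public static void EnsureFilteredSubscription(string connectionString)
    {
        NamespaceManager namespaceManager =
            NamespaceManager.CreateFromConnectionString(connectionString);

        if (!namespaceManager.SubscriptionExists("TestTopic", "ColumbiaOnly"))
        {
            // Only messages whose "Source" property equals "Columbia"
            // are delivered to this subscription.
            namespaceManager.CreateSubscription("TestTopic", "ColumbiaOnly",
                new SqlFilter("Source = 'Columbia'"));
        }
    }
}
```

With something like this in place, a role would only ever receive the messages it has declared an interest in, rather than having to Peek and inspect every message.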
There is no doubt that the use of the ServiceBus would allow good inter-role communications, and would also work when communicating between roles within different Azure Cloud Service(s). So that is certainly a good/workable approach. The thing I did not like about this approach is that it just seemed like a lot of plumbing to put in place, so I sought an alternative approach, which is what this article is all about really. Something more lightweight, shall we say.
What I foresaw was that I might make use of the Azure ServiceBus and also use RX to provide some inter-role messaging.
This only works within one Azure Cloud Service instance. If you want inter Cloud Service communication you WILL HAVE to use ServiceBus topics.
This approach will make use of the Azure ServiceBus, but specifically the Azure ServiceBus Relay functionality that it provides, such that it may be used as a Binding for a regular WCF service. It turns out I am not the first person to think of this, and at least 2 other people had written articles doing this before I thought of it.
What both of these articles had in common was that they both use RX. The thing that I did not like about either of them was that they made a generic IObserver<T> implementing class which was passed in to the Observable for each role instance. The result of that was that the logic for each role and the way it handles messages would be generic (the same). I did not like this, as the way I thought it should work was that the centralised messaging layer (WCF using Azure ServiceBus Relay functionality) would simply expose an IObservable<T> that the roles themselves could listen to.
That way the role itself could filter/react and possibly ignore certain messages based on its own logic/criteria.
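To illustrate the idea in isolation, here is a small sketch where two pretend "roles" subscribe to the same stream but apply their own logic to it. It uses a plain Subject<string> as a stand-in for the stream the messaging layer exposes, and the role names and message strings are invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PerRoleFilteringSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Stand-in for the shared stream the messaging layer would expose.
        var stream = new Subject<string>();

        // "Role A" only cares about messages that start with "order:".
        IDisposable roleA = stream
            .Where(m => m.StartsWith("order:"))
            .Subscribe(m => log.Add("Role A handled " + m));

        // "Role B" reacts to everything.
        IDisposable roleB = stream
            .Subscribe(m => log.Add("Role B saw " + m));

        stream.OnNext("order:42");
        stream.OnNext("heartbeat");

        roleA.Dispose();
        roleB.Dispose();
        return log;
    }
}
```

The point is that the filtering lives with the subscriber, so each role can decide for itself what it wants to react to.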
So I set about refactoring the code I found in these 2 articles to make it work how I wanted it to work, the result of which will follow in the subsequent sections.
It all starts with a message that you wish to send. This is a simple DataContract serializable class, which we will be using with a WCF service:

```csharp
using System.Runtime.Serialization;

namespace InterRoleBroadcast
{
    [DataContract(Namespace = BroadcastNamespaces.DataContract)]
    public class BroadcastEvent
    {
        public BroadcastEvent(string senderInstanceId, string message)
        {
            this.SenderInstanceId = senderInstanceId;
            this.Message = message;
        }

        // Id of the role instance that sent the message
        [DataMember]
        public string SenderInstanceId { get; private set; }

        [DataMember]
        public string Message { get; private set; }
    }
}
```
We then have this incredibly simple WCF service, which allows publishers to publish messages (as just shown), and also exposes an IObservable<BroadcastEvent> such that subscribers may use RX to listen to notifications.
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.ServiceModel;

namespace InterRoleBroadcast
{
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single,
        ConcurrencyMode = ConcurrencyMode.Multiple)]
    public class BroadcastService : IBroadcastServiceContract
    {
        private object syncLock = new object();
        private Subject<BroadcastEvent> eventStream =
            new Subject<BroadcastEvent>();

        public IObservable<BroadcastEvent> ObtainStream()
        {
            return eventStream.AsObservable();
        }

        public void Publish(BroadcastEvent e)
        {
            lock (syncLock)
            {
                try
                {
                    eventStream.OnNext(e);
                }
                catch (Exception exception)
                {
                    eventStream.OnError(exception);
                }
            }
        }
    }
}
```
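The service contract itself is not shown here, so as a guess at its shape, here is a sketch of what IBroadcastServiceContract, IBroadcastServiceChannel and BroadcastNamespaces might look like. The namespace URIs, the ServiceContract constant and the one-way operation are my assumptions for illustration (it relies on the BroadcastEvent class shown earlier); check the GitHub repository for the real definitions.

```csharp
using System.ServiceModel;

namespace InterRoleBroadcast
{
    // Hypothetical namespace constants; the real values live in the repository.
    public static class BroadcastNamespaces
    {
        public const string DataContract = "urn:example:interrole:data";
        public const string ServiceContract = "urn:example:interrole:service";
    }

    [ServiceContract(Namespace = BroadcastNamespaces.ServiceContract)]
    public interface IBroadcastServiceContract
    {
        // Assumed to be one-way, so publishers do not wait on subscribers.
        [OperationContract(IsOneWay = true)]
        void Publish(BroadcastEvent e);
    }

    // Channel interface of the kind ServiceBusClient<T> constrains T to.
    public interface IBroadcastServiceChannel : IBroadcastServiceContract, IClientChannel
    {
    }
}
```

Note that ObtainStream() is deliberately not part of the contract in this sketch, since the stream is only ever read locally from the hosted service instance rather than over the wire.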
As there is a WCF service, we also need a client proxy to call it, which is as follows:
```csharp
using System;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.ServiceModel.Description;
using Microsoft.ServiceBus;

namespace InterRoleBroadcast
{
    public class ServiceBusClient<T> : IDisposable
        where T : class, IClientChannel, IDisposable
    {
        private ChannelFactory<T> _channelFactory;
        private T _channel;
        private bool _disposed = false;

        public ServiceBusClient()
        {
            CreateChannel();
        }

        public void CreateChannel()
        {
            Uri address = ServiceBusEnvironment.CreateServiceUri("sb",
                EndpointInformation.ServiceNamespace, EndpointInformation.ServicePath);

            NetTcpRelayBinding binding = new NetTcpRelayBinding(EndToEndSecurityMode.None,
                RelayClientAuthenticationType.None);

            TransportClientEndpointBehavior credentialsBehaviour =
                new TransportClientEndpointBehavior();
            credentialsBehaviour.TokenProvider =
                TokenProvider.CreateSharedAccessSignatureTokenProvider(
                    EndpointInformation.KeyName, EndpointInformation.Key);

            ServiceEndpoint endpoint = new ServiceEndpoint(
                ContractDescription.GetContract(typeof(T)),
                binding, new EndpointAddress(address));
            endpoint.Behaviors.Add(credentialsBehaviour);

            _channelFactory = new ChannelFactory<T>(endpoint);
            _channel = _channelFactory.CreateChannel();
            _channel.Faulted += Channel_Faulted;
        }

        // A faulted channel cannot be reused, so tear it down and recreate it
        void Channel_Faulted(object sender, EventArgs e)
        {
            ICommunicationObject theChannel = (ICommunicationObject) sender;
            theChannel.Faulted -= Channel_Faulted;
            KillChannel(theChannel);
            KillChannelFactory(_channelFactory);
            CreateChannel();
        }

        public T Client
        {
            get
            {
                // Callers must cope with null while the channel is still opening
                if (_channel.State == CommunicationState.Opening)
                {
                    return null;
                }
                if (_channel.State != CommunicationState.Opened)
                {
                    _channel.Open();
                }
                return _channel;
            }
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        private void KillChannel(ICommunicationObject theChannel)
        {
            if (theChannel.State == CommunicationState.Opened)
            {
                theChannel.Close();
            }
            else
            {
                theChannel.Abort();
            }
        }

        // Not generic: a method-level <T> would shadow the class type parameter
        private void KillChannelFactory(ChannelFactory<T> theChannelFactory)
        {
            if (theChannelFactory.State == CommunicationState.Opened)
            {
                theChannelFactory.Close();
            }
            else
            {
                theChannelFactory.Abort();
            }
        }

        public void Dispose(bool disposing)
        {
            if (!_disposed)
            {
                if (disposing)
                {
                    try
                    {
                        KillChannel(_channel);
                    }
                    catch
                    {
                    }
                    try
                    {
                        KillChannelFactory(_channelFactory);
                    }
                    catch
                    {
                    }
                    _disposed = true;
                }
            }
        }

        ~ServiceBusClient()
        {
            Dispose(false);
        }
    }
}
```
There is also the hosting of the WCF service to consider, which is as follows:
```csharp
using System;
using System.ServiceModel;
using System.ServiceModel.Description;
using Microsoft.ServiceBus;

namespace InterRoleBroadcast
{
    public class ServiceBusHost<T> : IDisposable
        where T : class
    {
        private ServiceHost _serviceHost;
        private bool _disposed = false;

        public ServiceBusHost()
        {
            CreateHost();
        }

        private void CreateHost()
        {
            Uri address = ServiceBusEnvironment.CreateServiceUri("sb",
                EndpointInformation.ServiceNamespace, EndpointInformation.ServicePath);

            NetTcpRelayBinding binding = new NetTcpRelayBinding(
                EndToEndSecurityMode.None, RelayClientAuthenticationType.None);

            TransportClientEndpointBehavior credentialsBehaviour =
                new TransportClientEndpointBehavior();
            credentialsBehaviour.TokenProvider =
                TokenProvider.CreateSharedAccessSignatureTokenProvider(
                    EndpointInformation.KeyName, EndpointInformation.Key);

            ServiceEndpoint endpoint = new ServiceEndpoint(
                ContractDescription.GetContract(typeof(T)), binding,
                new EndpointAddress(address));
            endpoint.Behaviors.Add(credentialsBehaviour);

            // Host a single service instance so its stream can be read locally
            _serviceHost = new ServiceHost(Activator.CreateInstance(typeof(T)));
            _serviceHost.Faulted += ServiceHost_Faulted;
            _serviceHost.Description.Endpoints.Add(endpoint);
            _serviceHost.Open();
        }

        // A faulted host cannot be reused, so tear it down and recreate it
        void ServiceHost_Faulted(object sender, EventArgs e)
        {
            ServiceHost host = (ServiceHost)sender;
            host.Faulted -= ServiceHost_Faulted;
            KillHost(host);
            CreateHost();
        }

        public T ServiceInstance
        {
            get
            {
                return _serviceHost.SingletonInstance as T;
            }
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        private void KillHost(ServiceHost theHost)
        {
            if (theHost.State == CommunicationState.Opened)
            {
                theHost.Close();
            }
            else
            {
                theHost.Abort();
            }
        }

        public void Dispose(bool disposing)
        {
            if (!_disposed)
            {
                if (disposing)
                {
                    try
                    {
                        KillHost(_serviceHost);
                    }
                    catch
                    {
                    }
                    finally
                    {
                        _disposed = true;
                    }
                }
            }
        }

        ~ServiceBusHost()
        {
            Dispose(false);
        }
    }
}
```
What both the client proxy and the service host have in common is that they both make use of the Azure ServiceBus Relay functionality, which essentially allows the Azure Service bus to be used with WCF.
To make the communications easier to deal with there is also this simple helper class, which really just exposes the publisher side (client channel) and the service itself (source of subscription data):
```csharp
using System;

namespace InterRoleBroadcast
{
    public class BroadcastCommunicator : IDisposable
    {
        private ServiceBusClient<IBroadcastServiceChannel> _publisher;
        private ServiceBusHost<BroadcastService> _subscriber;
        private bool _disposed = false;

        public void Publish(BroadcastEvent e)
        {
            // Client is null while the channel is still opening
            if (this.Publisher.Client != null)
            {
                this.Publisher.Client.Publish(e);
            }
        }

        public IObservable<BroadcastEvent> BroadcastEventsStream
        {
            get { return this.Subscriber.ServiceInstance.ObtainStream(); }
        }

        // Created lazily on first use
        private ServiceBusClient<IBroadcastServiceChannel> Publisher
        {
            get
            {
                if (_publisher == null)
                {
                    _publisher = new ServiceBusClient<IBroadcastServiceChannel>();
                }
                return _publisher;
            }
        }

        // Created lazily on first use
        private ServiceBusHost<BroadcastService> Subscriber
        {
            get
            {
                if (_subscriber == null)
                {
                    _subscriber = new ServiceBusHost<BroadcastService>();
                }
                return _subscriber;
            }
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        public void Dispose(bool disposing)
        {
            if (!_disposed && disposing)
            {
                try
                {
                    // May never have been created, as creation is lazy
                    if (_subscriber != null)
                    {
                        _subscriber.Dispose();
                        _subscriber = null;
                    }
                }
                catch
                {
                }
                try
                {
                    if (_publisher != null)
                    {
                        _publisher.Dispose();
                        _publisher = null;
                    }
                }
                catch
                {
                }
                _disposed = true;
            }
        }

        ~BroadcastCommunicator()
        {
            Dispose(false);
        }
    }
}
```
With those pieces in place all that needs to be done is to make use of them. This is as simple as this WorkerRole code:
```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using InterRoleBroadcast;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.Diagnostics;
using Microsoft.WindowsAzure.ServiceRuntime;
using Microsoft.WindowsAzure.Storage;
using System.Reactive.Linq;

namespace WorkerRole1
{
    public class WorkerRole : RoleEntryPoint
    {
        private volatile BroadcastCommunicator _broadcastCommunicator;
        private volatile IDisposable _broadcastSubscription;
        private volatile bool _keepLooping = true;

        public override bool OnStart()
        {
            _broadcastCommunicator = new BroadcastCommunicator();
            _broadcastSubscription = _broadcastCommunicator.BroadcastEventsStream
                .Where(x => x.SenderInstanceId != RoleEnvironment.CurrentRoleInstance.Id)
                .Subscribe(
                    theEvent =>
                    {
                        Logger.AddLogEntry(
                            String.Format("{0} got message from {1} {2}",
                                RoleEnvironment.CurrentRoleInstance.Id,
                                theEvent.SenderInstanceId,
                                theEvent.Message));
                    },
                    ex =>
                    {
                        Logger.AddLogEntry(ex);
                    });
            return base.OnStart();
        }

        public override void Run()
        {
            while (_keepLooping)
            {
                int secs = 2;
                Thread.Sleep(secs * 1000);
                try
                {
                    BroadcastEvent broadcastEvent =
                        new BroadcastEvent(RoleEnvironment.CurrentRoleInstance.Id,
                            "Hello world from WorkerRole1");
                    _broadcastCommunicator.Publish(broadcastEvent);
                }
                catch (Exception ex)
                {
                    Logger.AddLogEntry(ex);
                }
            }
        }

        public override void OnStop()
        {
            _keepLooping = false;
            if (_broadcastCommunicator != null)
            {
                _broadcastCommunicator.Dispose();
            }
            if (_broadcastSubscription != null)
            {
                _broadcastSubscription.Dispose();
            }
            base.OnStop();
        }
    }
}
```
Just ensure you update the EndpointInformation class with your own ServiceBus key.
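For reference, the four members that the client and host code read from EndpointInformation suggest a class shaped roughly like the sketch below. The member names come from the code above, but every value here is a placeholder of my own choosing, so replace them with the details of your own ServiceBus namespace.

```csharp
namespace InterRoleBroadcast
{
    // A sketch only: all of the values below are placeholders.
    public static class EndpointInformation
    {
        // The name of your ServiceBus namespace (placeholder)
        public const string ServiceNamespace = "your-servicebus-namespace";

        // The path the relayed WCF service listens on (placeholder)
        public const string ServicePath = "BroadcastService";

        // The name of the shared access policy (placeholder; this happens to
        // be the default policy name a new namespace is created with)
        public const string KeyName = "RootManageSharedAccessKey";

        // The shared access key for that policy (placeholder)
        public const string Key = "your-shared-access-key";
    }
}
```

In a real deployment you would probably want to read the key from the role configuration rather than compiling it into the assembly.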
As I have stated already, the approach I describe in this article WILL NOT be suitable for communicating between roles in separate cloud services. For that you would need to use one of the cloud based messaging approaches discussed earlier, such as ServiceBus topics.
Anyway that is all I wanted to say this time. I hope you have learnt something from this discussion. As always, comments / votes are welcome.