Introduction
Today, I'll talk about a solution to enable duplex communication with netMsmqBinding in WCF.
The good news is that my solution can enable duplex communication with any binding which supports IOutputChannel
/IInputChannel
.
In plain English, it means that if a binding can send and receive a message in OneWay, it can also be used for duplex communication, i.e., send messages back to the client from the server with callbacks. Yes, you want and you will be able to use duplex communication over Twitter. I will talk about my project WCF over Twitter in another article to learn more about WCF!
If you just want to use duplex MSMQ, go here to download the project: http://duplexmsmq.codeplex.com/; an example is included.
SOAP Plumbing
First, some people will say: "you could use CompositeDuplexBindingElement
", I won't re-explain why it won't work, this blog article explains the problem (Thanks to Mike Taulty).
He found a solution to make MSMQ with duplex communication to work, but it requires a reliable session. The problem with reliable sessions is that "WCF supports reliable sessions between endpoints that are active and alive at the same time". And when we use MSMQ, we want to be able to send messages to the service even if it is offline, and we want the service to be able to send messages to a client even if it isn't connected. So it's not a good solution.
But what you should find interesting is how I manage to make it work without a reliable session! And maybe, why it works only with a reliable session.
First of all, I will explain at the SOAP level how duplex communication works.
I'll use Visio with this Visio stencil "that contains 51 integration pattern icons as Visio shapes".
Quickly, in WCF, a Channel is something which can receive or send messages. There are several types of channels, but I will only talk about IInputChannel
/IOutputChannel
. These channels are the ones used by netMsmqBinding, and it means that a IOutputChannel
only supports sending a message in OneWay to another IInputChannel
. The server listens to a IInputChannel
, and the client sends a message with the IOutputChannel
. Obviously, the concrete implementations of these channels with netMsmqBinding uses MSMQ.
If we want to be able to send a message back to the client in a duplex scenario, the client needs to expose a IInputChannel
and the service needs to use a IOutputChannel
.
In WCF, IInputChannel
+ IOutputChannel
= IDuplexChannel
, and it is the responsibility of CompositeDuplexBindingElement
shipped with WCF, to transform a pair of IInputChannel
/IOutputChannel
to a IDuplexChannel
. We will come back here later.
First, the client needs to say to the server where to send a reply, like you can see in the figure below. This is done through some extra elements in the SOAP headers of every message.
And, when the server responds, it need to specify the return address.
So my goal was:
- The client must create a
IInputChannel
from netMsmqBinding to receive messages from the server. - Attach the address of this
IInputChannel
to the return address of every message sent to the server. - The server must create a
IOutputChannel
from netMsmqBinding to send messages to the client. - Attach the address of the
IInputChannel
to the return address of every message sent to the client.
You can see that my solution is not coupled with MSMQ, and can be used with every binding which supports IInputChannel
/IOutputChannel
.
Well, first, I will explain why the solution of Mike Taulty worked with a reliable session, and why it's a bad solution. The trick is that the return address is sent to the server with the first message sent from the client to the server. The server creates a session for the client and stores the return address in memory. Then, the client sends, in every SOAP header, its session ID without the return address. Obviously, if the server crashes, the session is lost, and the messages of the client are dropped, or sent to a dead queue.
So, how can I extend WCF to solve this problem? I will create my own BindingElement
, but we need to understand what a binding is.
WCF Plumbing
When I think about a binding, I see hamburgers. And in reality, there are two types of burgers: those made by your favorite fast-food-chain, and those from your mama/wife/yourself made with love and care. The ingredients of a binding are BindingElement
s.
Let's talk about fastfood first. The benefit of the built-in bindings of WCF is that they are really easy to understand and create. And well... it's fast-food, so you don't know what really is inside, but they have a name and somehow you guess there are some BindingElement
s inside.
For example, with a WSHttpBinding
, you guess that there is a HttpTransportBindingElement
inside, and you somehow guess that it sends a SOAP message over HTTP. You guess that this code snippet activates a reliable session, but you don't know exactly which ingredient or "BindingElement
" it impacts.
wsHttpBinding.ReliableSession.Enabled = true;
The good thing is that in most cases, you don't have to know what is going on inside. Microsoft did a great job to satisfy most common use cases. When you pass the binding to your service host or channel factory, it will call:
binding.CreateBindingElements()
to create all the BindingElement
s based on the properties you have set on the built-in Binding
.
But sometimes, you need to specify yourself which ingredients or "BindingElement
s" you will add to your Binding
. This is called home made binding, and there are to ways to create one:
And I'll use the latter one, because this way, it's most easy to explain how a Binding
works.
A CustomBinding
is just a binding where you directly specify all the BindingElement
s inside. For example:
binding = new CustomBinding(
new ListenUriBindingElement()
{
ListenUriBaseAddress = baseClient
},
new CompositeDuplexBindingElement(),
new ReplyToBindingElement(),
new TextMessageEncodingBindingElement(),
CreateMsmqBinding());
This creates a Binding
with a ListenUriBindingElement
, CompositeDuplexBindingElement
, ReplyToBindingElement
, TextMessageEncodingBindingElement
, and a MsmqTransportBindingElement
(its the return type of CreateMsmqBinding()
). This enumeration is called "BindingElement stack"; the last BindingElement
is the lowest in the stack, and it's always a TransportBindingElement
.
Most of the time, the BindingElement
stack is the same in the server and client side. Each BindingElement
can decide to modify, inspect, or validate a message, or they can pass parameters to BindingElement
down in the BindingElement
stack. For example, when you choose message security for your binding, internally, WCF creates binding elements which encode/decode a message with a certificate or something else. The server and the client having the same stack; the BindingElement
will encode a message sent on the client side and decode it on the server side.
Another example, when you choose to encode your message in plain text TextMessageEncodingBindingElement
, it doesn't intercept any message, but it passes itself as a parameter down in the stack.
context.BindingParameters.Add(this);
This way, the TransportBindingElement
(the lowest element in the stack) can retrieve the MessageEncodingBindingElement
to know how to serialize and deserialize a Message
to a stream of bytes. Each BindingElement
says clearly if it can build a channel type with CanBuildChannelListener<TChannel>
and CanBuildChannelFactory<TChannel>
. And, each BindingElement
has a BindingContext
during the creation of their "channel managers" to pass parameters down to the stack, or to construct "channel managers" of the BindingElement
directly below in the stack.
A channel manager is a IChannelFactory
or IChannelListenner
; BuildChannelListener
is called on the server side and BuildChannelFactory
is called on the client side.
IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
Most of the time, intermediate bindings are not interested to create a channel manager, so they just delegate the call down to the stack, as TextMessageEncodingBindingElement
does.
return context.BuildInnerChannelFactory<TChannel>();
In fact, this code just calls the BuildChannelFactory
of the BindingElement
directly below the current BindingElement
.
But, if you want to intercept and modify messages, you need to wrap the inner channel manager inside a custom channel manager. What I call the "inner channel manager" is the channel manager created by the BindingElement
below the current one.
This figure explains how channel managers are created. It assumes that BindingElement1
and BindingElement2
want to intercept messages, so they wrap the inner channel managers. TextMessageEncodingBindingElement
doesn't need to intercept messages, so it doesn't wrap the channel manager returned by TransportBindingElement1
.
Here is an example of a binding element which wants to intercept messages; it's the Decorator pattern.
public override IChannelFactory<TChannel>
BuildChannelFactory<TChannel>(BindingContext context)
{
if(!CanBuildChannelFactory<TChannel>(context))
{
throw new ArgumentException("Impossible to build ReplyToBindingElement");
}
var innerChannelFactory = (IChannelFactory<IOutputChannel>)
context.BuildInnerChannelFactory<TChannel>();
return (IChannelFactory<TChannel>)new ReplyToChannelFactory(
context.ListenUriBaseAddress, innerChannelFactory);
}
My implementation of ReplyToChannelFactory
will intercept calls destined to the innerChannel
. Most calls are just delegated to the innerChannelFactory
. We will see the implementation later. The class ServiceHost
or ChannelFactory<ServiceType>
will use the channel manager returned by the top most binding element to send or receive messages. Please do not confuse ChannelFactory<ServiceType>
with IChannelFactory<TChannel>
; ChannelFactory<ServiceType>
is an implementation of this interface, but it is primarily used to create a client proxy, and is not returned by a BindingElement
.
ChannelFactory<ServiceType>
is just a class which serializes and deserializes the body of the SOAP message based on the method and parameter you call on a client proxy, then passes the message to a channel created by the top-most IChannelFactory
(created by your top most BindingElement
). ServiceHost
does the same thing, just replace IChannelFactory
by IChannelListener
.
Here is the creation of the channel; note that IChannelFactories
also uses the Decorator pattern to create channels.
Then, when you call a method on the proxy, a message is created and the Send
method of the top most channel is called:
Duplex MSMQ Implementation
OK, now, if you have understood what I said above, you'll think that the implementation will be a piece of cake, and it really is! There is a deep learning curve to understand how to make it work, but once understood, you'll think that WCF is easily extensible and well designed. You'll see that it is not so difficult after all. The more I learn about WCF, the more I have project ideas (cool ones and very bad ones...)
First, I created helper classes called ProxyXXXX : XXXX
, where XXXX is an interface on which I want to intercept calls. ProxyXXXX
will take an inner XXXX and forward all calls to it.
Every method, property, and event of ProxyXXXX
is virtual
, so if I want to intercept a call, I inherit ProxyXXXX
and override a class member.
For example, here, what I do inside the ProxyOutputChannel
, my channel will override the Send
method to intercept the message:
public virtual void Send(System.ServiceModel.Channels.Message message,
System.TimeSpan timeout)
{
_InnerOutputChannel.Send(message, timeout);
}
ReplyToBindingElement
is responsible to add headers in the output messages to enable duplex communication. So, it wraps every inner ChannelFactory
.
public override IChannelFactory<TChannel>
BuildChannelFactory<TChannel>(BindingContext context)
{
if(!CanBuildChannelFactory<TChannel>(context))
{
throw new ArgumentException("Impossible to build ReplyToBindingElement");
}
var innerChannel = (IChannelFactory<IOutputChannel>)
context.BuildInnerChannelFactory<TChannel>();
return (IChannelFactory<TChannel>)
new ReplyToChannelFactory(context.ListenUriBaseAddress, innerChannel);
}
public override IChannelListener<TChannel>
BuildChannelListener<TChannel>(BindingContext context)
{
return base.BuildChannelListener<TChannel>(context);
}
public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
{
return typeof(TChannel) == typeof(IOutputChannel);
}
Then ReplyToChannelFactory
wraps every inner channel:
public class ReplyToChannelFactory : ProxyChannelFactory<IOutputChannel>
{
readonly Uri _ReplyAddress;
public ReplyToChannelFactory(Uri replyAddress, IChannelFactory<IOutputChannel> inner)
: base(inner)
{
_ReplyAddress = replyAddress;
}
public override IOutputChannel CreateChannel(System.ServiceModel.EndpointAddress to)
{
return new ReplyToChannel(_ReplyAddress, base.CreateChannel(to));
}
public override IOutputChannel
CreateChannel(System.ServiceModel.EndpointAddress to, Uri via)
{
return new ReplyToChannel(_ReplyAddress, base.CreateChannel(to, via));
}
}
And, the channel adds all the necessary headers to the outgoing message:
public class ReplyToChannel : ProxyOutputChannel
{
readonly Uri _ReplyAddress;
public ReplyToChannel(Uri replyAddress, IOutputChannel inner)
: base(inner)
{
_ReplyAddress = replyAddress;
}
public override void Send(Message message)
{
ApplyReplyTo(message);
base.Send(message);
}
public override void Send(Message message, TimeSpan timeout)
{
ApplyReplyTo(message);
base.Send(message, timeout);
}
void ApplyReplyTo(Message message)
{
if(message.Headers.MessageId == null)
{
message.Headers.MessageId = new System.Xml.UniqueId();
}
if(message.Headers.From == null)
{
message.Headers.From =
new System.ServiceModel.EndpointAddress(_ReplyAddress);
}
if(message.Headers.ReplyTo == null)
{
message.Headers.ReplyTo =
new System.ServiceModel.EndpointAddress(_ReplyAddress);
}
}
}
Here is my service implementation and contract:
[ServiceContract(CallbackContract = typeof(IConversation))]
public interface IConversation
{
[OperationContract(IsOneWay = true)]
void Say(String something);
}
public class Conversation : IConversation
{
#region IConversation Members
public void Say(string something)
{
Console.WriteLine("Someone says \"{0}\" to you, what is your response ?",
something);
String response = Console.ReadLine();
OperationContext.Current.GetCallbackChannel<IConversation>().Say(response);
}
#endregion
}
But in the server side, an exception is thrown when attempting to send a message in the callback channel. I don't know why, but it seems that WCF doesn't automatically set the To header of the outgoing message to the address of the ReplyTo header of the ingoing message... I have no idea why I need to do that only on the server side, and not on the client side, and I'll be happy if someone can explain that to me... So, I needed to create a behavior with a message inspector which fixes the problem.
public class ReplyToBehavior : IEndpointBehavior
{
public class ReplyToInspector : IDispatchMessageInspector
{
#region IDispatchMessageInspector Members
public object AfterReceiveRequest(ref Message request,
IClientChannel channel, InstanceContext instanceContext)
{
var reply = request.Headers.ReplyTo;
OperationContext.Current.OutgoingMessageHeaders.To = reply.Uri;
OperationContext.Current.OutgoingMessageHeaders.RelatesTo =
request.Headers.MessageId;
return null;
}
public void BeforeSendReply(ref Message reply, object correlationState)
{
}
#endregion
}
#region IEndpointBehavior Members
public void AddBindingParameters(ServiceEndpoint endpoint,
BindingParameterCollection bindingParameters)
{
}
public void ApplyClientBehavior(ServiceEndpoint endpoint,
System.ServiceModel.Dispatcher.ClientRuntime clientRuntime)
{
}
public void ApplyDispatchBehavior(ServiceEndpoint endpoint,
System.ServiceModel.Dispatcher.EndpointDispatcher endpointDispatcher)
{
endpointDispatcher.DispatchRuntime.MessageInspectors.Add(new ReplyToInspector());
}
public void Validate(ServiceEndpoint endpoint)
{
}
#endregion
}
My implementation is done!
Two last things:
During the creation of the BindingElement
stacks, I needed two things: first, a BindingElement
for my client to specify the reply address of the messages, and the MSMQ queue to listen. That's is the responsibility of ListenUriBindingElement
.
I've taken the one in this blog article. The implementation is not complicated, it just sets the context.ListenUriBaseAddress
. BindingElement
s lower in the stack will need this value to know where to listen for input messages.
public override IChannelListener<TChannel>
BuildChannelListener<TChannel>(BindingContext context)
{
if(listenUriBaseAddress != null)
context.ListenUriBaseAddress = listenUriBaseAddress;
return base.BuildChannelListener<TChannel>(context);
}
I needed also to specify that my server can send messages, and that my client must listen to a MSMQ channel (this channel is specified with the context.ListenUriBaseAddress
set by the previous BindingElement
ListenUriBindingElement
). ServiceHost
will enable a service to send messages through a callback if the Binding
can build an IDuplexChannel
. Alas, that's not the case because one of the BindingElement
s in the stack doesn't support it: MsmqTransportBindingElement.CanBuildChannelListener<IDuplexChannel>
returns false
. So, I need a BindingElement
which creates a IDuplexChannel
on the top a channel which only accepts to create IInputChannel
and IOuputChannel
. And it's easy, you just need to use the CompositeDuplexBindingElement
shipped with WCF.
This is how the ChannelFactory<ServiceType>
creates the top-most IChannelFactory
in my BindingElement
stack; if you want to know how it works on the service side, just replace IChannelFactory
by IChannelListener
and ChannelFactory<ServiceType>
by ServiceHost
.
We say that CompositeDuplexBindingElement
change the shape of a channel. For more information, ReliableSessionBindingElement
also changes the shape of a channel; it transforms a XXXChannel
to a IXXXChannel
to a IXXXSessionChannel
to enable a sessionfull communication.
On the client side, I will use DuplexChannelFactory
instead of ChannelFactory
to specify the callback implementation. The same problem as the one with ServiceHost
is solved with the same solution.
Client and Server creation:
Here is the creation of my BindingElement
stack for the two sides:
public class BindingFactory
{
public static Binding Create(Uri baseClient)
{
CustomBinding binding = null;
binding = new CustomBinding(
new ListenUriBindingElement()
{
ListenUriBaseAddress = baseClient
},
new CompositeDuplexBindingElement(),
new ReplyToBindingElement(),
new TextMessageEncodingBindingElement(),
CreateMsmqBinding());
return binding;
}
private static MsmqTransportBindingElement CreateMsmqBinding()
{
var binding = new MsmqTransportBindingElement();
binding.MsmqTransportSecurity.MsmqAuthenticationMode =
MsmqAuthenticationMode.None;
binding.MsmqTransportSecurity.MsmqProtectionLevel =
System.Net.Security.ProtectionLevel.None;
binding.UseActiveDirectory = false;
binding.ExactlyOnce = false;
return binding;
}
}
Obviously, the server side doesn't need ListenUriBindingElement
, because ServiceHost
will set context.ListenUriBaseAddress
.
public override IChannelListener<TChannel>
BuildChannelListener<TChannel>(BindingContext context)
{
if(listenUriBaseAddress != null)
context.ListenUriBaseAddress = listenUriBaseAddress;
return base.BuildChannelListener<TChannel>(context);
}
To not override context.ListenUriBaseAddress
, I will call:
BindingFactory.Create(null)
on the server side.
Client creation:
static void Main(string[] args)
{
Console.WriteLine("Hit a key");
Console.ReadLine();
Binding bind = BindingFactory.Create(new Uri("net.msmq://SHIELDP/private/client"));
DuplexChannelFactory<IConversation> channel = new DuplexChannelFactory<IConversation>
(new Conversation(),
bind,
new EndpointAddress("net.msmq://SHIELDP/private/server"));
channel.CreateChannel().Say("Hello I'm Nico");
while(true)
{
}
}
Server creation:
class Program
{
static void Main(string[] args)
{
Binding bind = BindingFactory.Create(null);
ServiceHost host = new ServiceHost(typeof(Conversation),
new Uri("net.msmq://SHIELDP/private/server"));
var endpoint = host.AddServiceEndpoint(typeof(IConversation), bind, "");
endpoint.Behaviors.Add(new ReplyToBehavior());
host.Open();
Console.WriteLine("Open");
while(true)
{
}
}
}
Now, you can use callbacks from the client side and the server side through MSMQ.
Conclusion
I'm sure this project will be useful for people with high availability requirements. It's too bad that Microsoft didn't implement this; MSMQ is so well suited for duplex communication... I hope you've enjoyed this article. The next one will talk about WCF over Twitter, and will show how to create your own TransportBindingElement
. If you liked this article, let me know! :)