Introduction
This post walks through a working example of an asynchronous (scalable) duplex WCF service.
Background
IIS has always had scalability issues with open connections compared to JBoss, Jetty, Tomcat, and similar servers (which use Java's non-blocking I/O classes).
Windows XP, Vista, and Windows 7 allow only 10 open connections, which are quickly used up. To get around this limitation, it is suggested to use Windows Server 2008 with asynchronous calls.
To create a push-oriented service, a duplex mechanism is needed.
But there is a catch: you need to call the duplex WCF service asynchronously. Remember, asynchronous calls will not hog the threads, and
duplex keeps the channel open for real-time updates.
How to Make Your Duplex Calls Scalable
You will need to deploy your duplex WCF service on a Windows 2008 server. The Windows 2008 server incorporates a mechanism to handle asynchronous calls
so that they do not take up a new connection (this is the key).
If you were to make a synchronous call, one of the one hundred (or so) available connections will be allocated to your request; in time, these connections
will get tied up and your requests will have to wait until a connection is freed up.
So, to get around the allocation of connections, but still have a connection to your application from the service, you must make your duplex calls asynchronous
ones (duplex keeps the channel open, asynchronous stops the server from allocating a new connection).
Screenshot of the application
The screenshot shows three running instances of the application, all displaying the same information. The data is pushed from a WCF service (using a timer) to every session
that initially made the duplex call to the service.
Project Structure
The solution is split into multiple projects: one hosts the service, one is the client, and one holds the model class. WpfClient is the startup project,
with MainWindow as the start page.
Code Explanation
Client Code - MainWindow Class
private void SubscribeToStockServer()
{
    try
    {
        log.Info("Creating services");
        // The callback instance receives the notifications pushed by the service.
        InstanceContext context = new InstanceContext(new CallbackController());
        client = new SessionsControllerClient(context);
        // Asynchronous subscribe; the GUID identifies this session on the server.
        client.SubscribeToNotificationsAsync(SessionGuid.ToString());
        log.Info("Services created");
    }
    catch (Exception ex)
    {
        log.Error("Services error: " + ex.Message);
    }
}
Initially, the client calls the service to let it know that it wants to receive updates. The important thing here is the CallbackController class. It implements the callback method that the service invokes on the client (you will see an interface in the service called IStockSessionCallbackContract). The CallbackController code implements the method declared in the IStockSessionCallbackContract interface.
When the client subscribes to the duplex service (in an asynchronous manner), it is passing in a new GUID object - this GUID will be used to uniquely
identify the session in a static collection on the server.
public MainWindow()
{
    try
    {
        InitializeComponent();
        this.InitialiseApplication();
        this.SubscribeToStockServer();
        // Listen for updates relayed by the CallbackController.
        Messenger.Default.Register<StockQuotes>(this, "ServerUpdateResponse",
            data => { this.UpdateFromServer(data); });
    }
    catch (Exception ex)
    {
        log.Error(ex.Message);
    }
}
The line of code to notice here is the MVVM Light Messenger statement, which listens for the ServerUpdateResponse token (sent from the CallbackController class) and forwards the StockQuotes object to the client processing method UpdateFromServer (below).
private void UpdateFromServer(StockQuotes data)
{
    try
    {
        // The charts are rebound inside Dispatcher.Invoke so that the work
        // runs on the UI thread.
        Dispatcher.Invoke(
            System.Windows.Threading.DispatcherPriority.Normal,
            new Action(
                delegate()
                {
                    switch (data.Exchange)
                    {
                        case StockQuotes.StockExchanges.Cac:
                            (this.MyLine.Series[0] as DataPointSeries).ItemsSource =
                                data.UpdatedStockQuotes;
                            break;
                        case StockQuotes.StockExchanges.Dac:
                            (this.MyScatter.Series[0] as DataPointSeries).ItemsSource =
                                data.UpdatedStockQuotes;
                            break;
                        case StockQuotes.StockExchanges.Dow:
                            (this.MyColumn.Series[0] as DataPointSeries).ItemsSource =
                                data.UpdatedStockQuotes;
                            break;
                        case StockQuotes.StockExchanges.Ftse:
                            (this.MyPie.Series[0] as DataPointSeries).ItemsSource =
                                data.UpdatedStockQuotes;
                            break;
                    }
                }));
        this.UpdateNoticeBox(data);
    }
    catch (Exception ex)
    {
        log.Error(ex.Message);
    }
}
The UpdateFromServer method simply rebinds the new list of quotes carried by the StockQuotes object to the respective chart (based on the value of the Exchange property).
Client Code - CallbackController Class
namespace WpfClient.Model
{
    class CallbackController : SessionService.ISessionsControllerCallback
    {
        // Invoked by the service; relays the update to the UI through the Messenger.
        public void SendNotificationToClients(StockQuotes message)
        {
            Messenger.Default.Send<StockQuotes>(message, "ServerUpdateResponse");
        }

        // Declared to satisfy the callback interface; not used in this example.
        public IAsyncResult BeginSendNotificationToClients(StockQuotes message,
            AsyncCallback callback, object asyncState) { return null; }

        public void EndSendNotificationToClients(IAsyncResult result) { }
    }
}
Here is where the project declares the "Begin" and "End" methods associated with an asynchronous call. We are not going to use them, because the service uses the duplex callback method to send information to the client, not the "End" method above. They have to be declared to satisfy the compiler, since the class implements the ISessionsControllerCallback interface. Notice the method SendNotificationToClients: it is declared in the interface IStockSessionCallbackContract on the server, but is coded on the client. This is also where the MVVM Light Messenger send happens.
Adding the Service
When adding the service reference, you will need to check "Generate asynchronous operations" so that the generated proxy exposes asynchronous calls that do not take up a connection.
StockQuotes Class
using System.Collections.Generic;

namespace Stocks
{
    public class StockQuotes
    {
        private StockExchanges exchange;
        public StockExchanges Exchange
        {
            get { return exchange; }
            set { exchange = value; }
        }

        List<StockValue> updatedStockQuotes;
        public List<StockValue> UpdatedStockQuotes
        {
            get { return updatedStockQuotes; }
            set { updatedStockQuotes = value; }
        }

        public enum StockExchanges
        {
            Ftse,
            Dow,
            Dac,
            Cac
        }
    }

    public class StockValue
    {
        public StockValue() { }

        public StockValue(string stockName, int stockValue)
        {
            Symbol = stockName;
            Value = stockValue;
        }

        public string Symbol { get; set; }
        public int Value { get; set; }
    }
}
The above classes are plain old C# classes used to hold stock quotes. A StockQuotes object is passed from the server to the client and used in the client
code when binding to the charts.
WCF Service Code
Session Helper Class
static public class SessionHelper
{
    // One lock object shared by all callers. Locking on an object created
    // inside the method would never block another thread.
    static private readonly object syncRoot = new object();
    static private string browserValue;
    static private readonly Dictionary<string, IStockSessionCallbackContract> clients =
        new Dictionary<string, IStockSessionCallbackContract>();

    public static string BrowserValue
    {
        get { return browserValue; }
        set { browserValue = value; }
    }

    // Registers the callback channel unless the client id is already known.
    public static void AddCallbackChannel(string clientId,
        IStockSessionCallbackContract callbackChannel)
    {
        lock (syncRoot)
        {
            if (!clients.ContainsKey(clientId))
                clients.Add(clientId, callbackChannel);
        }
    }

    public static bool IsClientConnected(string clientId)
    {
        lock (syncRoot)
        {
            return clients.ContainsKey(clientId);
        }
    }

    // Returns the channels of every client except the given session.
    public static IEnumerable<IStockSessionCallbackContract>
        GetCallbackChannels(string sessionId)
    {
        lock (syncRoot)
        {
            // ToList() takes a snapshot so callers enumerate outside the lock.
            return clients.Where(c => c.Key != sessionId)
                          .Select(c => c.Value)
                          .ToList();
        }
    }

    public static IEnumerable<IStockSessionCallbackContract> GetCallbackChannels()
    {
        lock (syncRoot)
        {
            return clients.Values.ToList();
        }
    }

    // Throws if the client id is not registered.
    public static IStockSessionCallbackContract GetCallbackChannel(string clientId)
    {
        lock (syncRoot)
        {
            return clients[clientId];
        }
    }

    public static bool DeleteClient(string clientId)
    {
        lock (syncRoot)
        {
            // Remove returns false when the key is not present.
            return clients.Remove(clientId);
        }
    }
}
The above static class maintains all the sessions, performing the CRUD operations on them. Notice that the dictionary collection stores the IStockSessionCallbackContract interface, because this interface declares the callback method (which is coded on the client).
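As a possible alternative to hand-written locking, the session collection could be built on ConcurrentDictionary (available from .NET 4). The sketch below is an assumption, not the article's code: ClientRegistry and its member names are hypothetical, and the type parameter TCallback stands in for IStockSessionCallbackContract so that the example compiles without WCF.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical thread-safe session collection keyed by client id.
// TCallback stands in for the duplex callback contract type.
public class ClientRegistry<TCallback>
{
    private readonly ConcurrentDictionary<string, TCallback> clients =
        new ConcurrentDictionary<string, TCallback>();

    // Returns false when the client id is already registered.
    public bool Add(string clientId, TCallback callback)
    {
        return clients.TryAdd(clientId, callback);
    }

    public bool IsConnected(string clientId)
    {
        return clients.ContainsKey(clientId);
    }

    // Returns false when the client id was not registered.
    public bool Remove(string clientId)
    {
        TCallback removed;
        return clients.TryRemove(clientId, out removed);
    }

    // Returns a copy, so the caller can iterate while other threads
    // add or remove clients.
    public IList<TCallback> Snapshot()
    {
        return clients.Values.ToList();
    }
}
```

With this approach, the add and remove operations are atomic on their own, so the "check, then lock, then check again" pattern is not needed.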
ISessionsController Service Interface
namespace StockService.Interfaces
{
    [ServiceContract(Namespace = "StockService.Services.Interfaces",
        CallbackContract = typeof(IStockSessionCallbackContract))]
    public interface ISessionsController
    {
        [OperationContract(IsOneWay = true)]
        void SubscribeToNotifications(string browserName);

        [OperationContract(IsOneWay = true)]
        void UnsubscribeToNotifications();
    }
}
The above operations allow the client to connect and disconnect from the session collection, and thus from any communication from the server.
IStockSessionCallbackContract Service Interface
namespace StockService.Interfaces
{
    [ServiceContract]
    public interface IStockSessionCallbackContract
    {
        [OperationContract(IsOneWay = true)]
        void SendNotificationToClients(StockQuotes data);
    }
}
The above interface is declared on the server but is coded on the client (due to the callback nature of duplex coding). In the interface ISessionsController, you will see the annotation [ServiceContract(Namespace = "StockService.Services.Interfaces", CallbackContract = typeof(IStockSessionCallbackContract))], whose CallbackContract property specifies the interface IStockSessionCallbackContract. This is how we close the circle of communications: the service method knows to call a particular interface method which is coded and executed on the client.
SessionsController Service Methods
public void SubscribeToNotifications(string clientGuid)
{
    // The callback channel back to the client that made this call.
    IStockSessionCallbackContract ch =
        OperationContext.Current.GetCallbackChannel<IStockSessionCallbackContract>();
    lock (syncRoot)
    {
        SessionHelper.BrowserValue = clientGuid;
        if (!SessionHelper.IsClientConnected(clientGuid))
        {
            SessionHelper.AddCallbackChannel(clientGuid, ch);
            OperationContext.Current.Channel.Closing += new EventHandler(Channel_Closing);
            OperationContext.Current.Channel.Faulted += new EventHandler(Channel_Faulted);
        }
    }
}
When a client makes that initial asynchronous call to the service, it comes in on the SubscribeToNotifications method (passing a GUID). Its callback channel, typed as the callback contract interface IStockSessionCallbackContract, is then obtained and stored in a collection.
Closing and faulted event handlers are also attached to the channel, to do any housekeeping when it finishes.
private void ProcessUpdate(object sender, EventArgs e)
{
    try
    {
        StockQuotes data = GenerateStockQuotes();
        if (SessionHelper.GetCallbackChannels().Any())
        {
            lock (syncRoot)
            {
                IEnumerable<IStockSessionCallbackContract>
                    allChannels = SessionHelper.GetCallbackChannels();
                // ToList() copies the channels before the callbacks are sent.
                allChannels.ToList().ForEach(c => c.SendNotificationToClients(data));
            }
        }
    }
    catch (Exception)
    {
        // Errors are swallowed here. Note that a failing channel also stops
        // this round of notifications for the channels after it.
    }
}
The above method pushes the new data to all the subscribed clients.
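One way to make the push more robust is to handle failures per client, so that one broken channel does not stop the update for the others. The following is a minimal sketch under stated assumptions: Broadcaster and Broadcast are hypothetical names, and each callback is modelled as an Action<string> delegate in place of a real WCF callback channel so that the example runs on its own.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper; each Action<string> stands in for a client's
// SendNotificationToClients callback.
public static class Broadcaster
{
    // Sends data to every client and returns the ids of the clients whose
    // callback threw, so the caller can remove them from its collection.
    public static List<string> Broadcast(
        IDictionary<string, Action<string>> clients, string data)
    {
        var failed = new List<string>();

        // Copy the entries first so the loop works on a stable snapshot.
        var snapshot = new List<KeyValuePair<string, Action<string>>>(clients);
        foreach (var entry in snapshot)
        {
            try
            {
                entry.Value(data);
            }
            catch (Exception ex)
            {
                // Record the failure and carry on with the remaining clients.
                Console.Error.WriteLine("Client " + entry.Key + " failed: " + ex.Message);
                failed.Add(entry.Key);
            }
        }
        return failed;
    }
}
```

In the service, the returned ids could be passed to SessionHelper.DeleteClient so that dead sessions stop receiving updates.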
public SessionsController()
{
    timer.Elapsed += new ElapsedEventHandler(this.ProcessUpdate);
    timer.Interval = 2000; // milliseconds
    timer.Start();         // Start() also sets Enabled to true
}
A simple timer runs in the service to mimic the generation of data (data updates, for example). Every two seconds, the ProcessUpdate method is called and new stock quotes are pushed to all the connected sessions.
Improvements
- Serialize the data on the client and compress it before sending it to the service, which in turn will process it and send it (compressed) to the open channels.
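The compression step could look roughly like the sketch below. It assumes the serialized data is available as a string (for example XML or JSON) and that GZip is an acceptable format; PayloadCompressor is a hypothetical helper and not part of the project.

```csharp
using System.IO;
using System.IO.Compression;
using System.Text;

// Hypothetical helper for compressing a serialized payload with GZip.
public static class PayloadCompressor
{
    public static byte[] Compress(string text)
    {
        byte[] raw = Encoding.UTF8.GetBytes(text);
        using (var output = new MemoryStream())
        {
            // The GZipStream must be closed before the buffer is read,
            // otherwise the final compressed block is not written.
            using (var gzip = new GZipStream(output, CompressionMode.Compress))
            {
                gzip.Write(raw, 0, raw.Length);
            }
            return output.ToArray();
        }
    }

    public static string Decompress(byte[] compressed)
    {
        using (var input = new MemoryStream(compressed))
        using (var gzip = new GZipStream(input, CompressionMode.Decompress))
        using (var reader = new StreamReader(gzip, Encoding.UTF8))
        {
            return reader.ReadToEnd();
        }
    }
}
```

The sender would call Compress on the serialized quotes and transmit the byte array; the receiver would call Decompress before deserializing. Whether this pays off depends on the payload size, since small messages gain little from compression.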