Fault Resilience Support in Publish/Subscribe Paradigm using WCF and Socket Programming

13 Sep 2008, CPOL, 5 min read
Implementation of Fault Resilience support in publish/subscribe paradigm

Introduction

This is Part 2 in a series of articles on writing a publish/subscribe server using WCF and socket programming. This part shows how to implement fault resilience in the pub/sub server.

If you have not yet read Part 1, please take a few minutes and read it. This series builds on previous articles and does not repeat information.

Background

Suppose you have built a server that notifies subscribed clients whenever one of its publishers sends an event. Thousands of publishers and subscribers are connected to the server. Consider what happens if the server restarts, or if a network problem occurs between the server and a subscriber or publisher.

Faults can come from:

  • Server restart / shutdown / power failure
  • Any network problem: broken cable, intermediary router failure, etc.

Figure: Various Network Faults

How We Can Tackle Faults

There are three entities in such systems:

  • Subscriber
  • Publisher
  • Server

For the publisher, we can use hard state fault resilience: when the publisher tries to send an event and the send fails, we can infer that a fault has occurred. The fault may be a server problem or a network problem. Because the publisher is an active entity, it can detect a fault easily whenever sending an event fails.

The subscriber, on the other hand, is a passive entity: it only receives events. If no event arrives, it assumes that the server has not multicast any. A subscriber therefore has no way to detect a fault on its own, so it must use soft state fault tolerance.

Hard State Fault Tolerance: Once a publisher notices that a fault has occurred, it periodically probes whether both the network and the server are OK. When it determines that both are OK, it reestablishes the connection. This scheme is used by the publisher.

Figure: Fault Resilience in Publisher

Soft State Fault Tolerance: After a connection is successfully established between the server and the subscriber, a component periodically probes whether the server and the network are OK. If at any time it detects a fault, it reestablishes the connection. This scheme is used by the subscriber.

Figure: Fault Resilience in Subscriber

The Demo App

This article is accompanied by a demo application, available for download at the top of this page. Download the attached file, extract it into your local directory, and open the TestWcfApp.sln solution file with Visual Studio 2008. Build and Run.

Vista users must also reserve the HTTP namespaces used by the demo, because http.sys allows only administrators to register the root namespace:

  • Start a command prompt using "Run as administrator"
  • Then give the following four commands:
    • netsh http add urlacl url=http://+:80/myService user=DOMAIN\user
    • netsh http add urlacl url=http://+:8003/MyService/ user=DOMAIN\user
    • netsh http add urlacl url=http://+:4000/ user=DOMAIN\user
    • netsh http add urlacl url=http://+:3000/ user=DOMAIN\user
  • Replace DOMAIN\user with your own domain and user name

How to Test Fault Resilience

First, run the application as described in Part 1 of this series. Then restart the pub/sub server, try to publish an event, and observe how it behaves.

Alternatively, unplug the network cable or stop the intermediate router for a while, then restore the connection, try to publish an event, and observe how it behaves.

Description of Implementation

After a fault, it is important to know when the server and the network are OK again. We can say that everything (network and server) is OK if the server is reachable. Reachability is tested here with asynchronous socket programming and a timeout, for better performance. The following code serves this purpose:

C#
// Requires: using System; using System.Net; using System.Net.Sockets; using System.Threading;

// Checks whether the server is reachable. If it is, the network and the
// server are both OK, there is no fault, and communication can be reestablished.
public static bool isServerUp(string endpoint)
{
    string[] splitstrings = endpoint.Split(new char[] { ':' });
    // Named pipes are local to the machine, so no network probe is made.
    if (splitstrings[0] == "net.pipe") return true;
    // Expect scheme://ip:port/path, which splits into at least three parts.
    if (splitstrings.Length >= 3)
    {
        // Remove the leading "//". IPAddress.Parse needs a literal IP
        // address, not a host name.
        IPAddress ip = IPAddress.Parse(splitstrings[1].Remove(0, 2));
        int port = Convert.ToInt32(splitstrings[2].Split("/".ToCharArray())[0]);
        IPEndPoint remoteEndPoint = new IPEndPoint(ip, port);
        return Connect(remoteEndPoint, 4000);
    }
    return false;
}

// Shared by Connect and ConnectCallBack. Because these fields are static,
// concurrent calls to Connect can interfere with each other.
private static ManualResetEvent waitob = new ManualResetEvent(false);
private static Exception asynexception;

// Tests connectivity to a socket (IP, port) asynchronously with the
// specified timeout. Returns true if the connection is made within the
// time limit, otherwise false.
public static bool Connect(IPEndPoint remoteEndPoint, int timeoutMSec)
{
    string host = Convert.ToString(remoteEndPoint.Address);
    int portNo = remoteEndPoint.Port;
    waitob.Reset();
    asynexception = null;
    TcpClient tc = new TcpClient();
    try
    {
        tc.BeginConnect(host, portNo, new AsyncCallback(ConnectCallBack), tc);
        // Reachable only if the callback ran in time and recorded no error.
        return waitob.WaitOne(timeoutMSec, false) && asynexception == null;
    }
    finally
    {
        // The connection is only a probe, so release it in every case.
        tc.Close();
    }
}

// Callback invoked when the BeginConnect operation completes.
private static void ConnectCallBack(IAsyncResult ar)
{
    try
    {
        TcpClient client = ar.AsyncState as TcpClient;
        if (client.Client != null)
        {
            client.EndConnect(ar);
        }
        if (client.Connected == false)
        {
            asynexception = new Exception();
        }
    }
    catch (Exception ex)
    {
        asynexception = ex;
    }
    finally
    {
        waitob.Set();
    }
}

When the isServerUp method reports that the server is reachable, we can reestablish communication with the WCF pub/sub server.
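The probe shown earlier splits the address string by hand and keeps its wait handle in static fields. As a minimal sketch of an alternative, assuming the endpoint is a well-formed URI such as net.tcp://127.0.0.1:8002/MyService, the host and port can be read with System.Uri and each probe can wait on its own IAsyncResult. The names ReachabilityProbe, TryGetHostAndPort and CanConnect are hypothetical and are not part of the demo application.

```csharp
using System;
using System.Net.Sockets;

// Hypothetical helper, not part of the demo application.
public static class ReachabilityProbe
{
    // Reads host and port from an endpoint URI instead of splitting on ':'.
    // Returns false if the text is not an absolute URI or carries no port.
    public static bool TryGetHostAndPort(string endpoint, out string host, out int port)
    {
        host = null;
        port = -1;
        Uri uri;
        if (!Uri.TryCreate(endpoint, UriKind.Absolute, out uri)) return false;
        host = uri.Host;
        port = uri.Port;
        return host.Length > 0 && port > 0;
    }

    // Tries to open a TCP connection within timeoutMs milliseconds.
    // Every call owns its TcpClient and wait handle, so concurrent
    // probes do not share any state.
    public static bool CanConnect(string host, int port, int timeoutMs)
    {
        using (TcpClient client = new TcpClient())
        {
            try
            {
                IAsyncResult ar = client.BeginConnect(host, port, null, null);
                if (!ar.AsyncWaitHandle.WaitOne(timeoutMs))
                {
                    return false; // timed out; disposing the client abandons the attempt
                }
                client.EndConnect(ar); // throws if the connection was refused
                return client.Connected;
            }
            catch (SocketException)
            {
                return false;
            }
            catch (ObjectDisposedException)
            {
                return false;
            }
        }
    }
}
```

A caller would first run TryGetHostAndPort on the configured address and then pass the result to CanConnect with the same 4000 ms timeout used in the article. Unlike IPAddress.Parse, this form also accepts host names.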

For Subscriber:

Because the subscriber is a passive entity, once it has established a connection with the WCF server, a thread periodically (every 5 seconds) checks whether the server and the network are OK. Whenever it determines that either the network or the server is not OK, it delegates responsibility to the SubscribeReconnectionHelper class:

C#
/// <summary>
/// Called when the connect button is clicked in the UI.
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
void OnSubscribe(object sender, EventArgs e)
{
    try
    {
        CommunicationObject.Subscribe();
        ((Button)sender).Visible = false;
        button2.Visible = true;
        // A connection between the subscriber and the server now exists.
        // Start soft state fault resilience: Keepactiveclients periodically
        // probes whether both the network and the server are OK. When it
        // detects a fault, it delegates to the SubscribeReconnectionHelper
        // class to reestablish the connection.
        ClientActiveHelper.Keepactiveclients(
            CommunicationObject, this, ReceiverProtocolType.TCP);
    }
    catch
    {
        // Failures are silently ignored in the demo.
    }
}

// Keeps the communication between the server and the subscriber alive.
// This class is protocol agnostic.
public class ClientActiveHelper
{
    // Starts the liveness check on a thread pool thread so that it runs
    // asynchronously. Socket programming is not used here: while the
    // connection is active, a call through WCF takes negligible time,
    // and it also exercises the WCF channel itself.
    public static void Keepactiveclients(ICommunication CommunicationObject,
        IEvent subscriber, ReceiverProtocolType receiverProtocolType)
    {
        ReceiverThreadParam tp = new ReceiverThreadParam();
        tp.CommunicationObject = CommunicationObject;
        tp.ReceiverProtocolType = receiverProtocolType;
        tp.Subscriber = subscriber;
        WaitCallback eventthread = delegate(object data)
        {
            livetest(data as ReceiverThreadParam);
        };
        ThreadPool.QueueUserWorkItem(eventthread, tp);
    }

    // Periodically tests whether the connection is still live.
    // The interval is 5 seconds.
    private static void livetest(object threadparam)
    {
        ICommunication br = ((ReceiverThreadParam)threadparam).CommunicationObject;
        IEvent ievent = ((ReceiverThreadParam)threadparam).Subscriber;
        ReceiverProtocolType protocoltype =
            ((ReceiverThreadParam)threadparam).ReceiverProtocolType;
        // Loop until a fault is detected.
        while (true)
        {
            if (br == null) break;
            bool alive = true;
            try
            {
                br.Subscribe();
            }
            catch
            {
                alive = false;
            }
            if (!alive)
            {
                // A fault was detected: hand over to
                // SubscribeReconnectionHelper.reconnect
                // to reestablish the communication.
                SubscribeReconnectionHelper.reconnect(br, ievent, protocoltype);
                break;
            }
            // Wait for the next probe.
            Thread.Sleep(5000);
        }
    }
}
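The liveness loop above runs until it detects a fault and cannot be stopped from outside. As a hedged sketch of the same soft state idea with a way to stop it, the following class probes at a fixed interval on a background thread and invokes a callback on the first failed probe. LivenessMonitor and its members are hypothetical names; in the demo, the probe delegate would wrap a call such as Subscribe() and the fault callback would start the reconnection helper.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of the demo application.
public sealed class LivenessMonitor : IDisposable
{
    private readonly Func<bool> probe;      // returns true while the link is healthy
    private readonly Action onFault;        // invoked once, on the first failed probe
    private readonly TimeSpan interval;
    private readonly ManualResetEvent stop = new ManualResetEvent(false);
    private readonly Thread worker;

    public LivenessMonitor(Func<bool> probe, Action onFault, TimeSpan interval)
    {
        this.probe = probe;
        this.onFault = onFault;
        this.interval = interval;
        worker = new Thread(Run);
        worker.IsBackground = true;
    }

    public void Start()
    {
        worker.Start();
    }

    private void Run()
    {
        do
        {
            bool alive;
            try
            {
                alive = probe();
            }
            catch (Exception)
            {
                alive = false; // a throwing probe counts as a fault
            }
            if (!alive)
            {
                onFault();
                return;
            }
        }
        while (!stop.WaitOne(interval)); // WaitOne returns true once Dispose signals stop
    }

    public void Dispose()
    {
        stop.Set();
        // Do not join from the worker itself (for example inside onFault).
        if (worker.IsAlive && Thread.CurrentThread != worker)
        {
            worker.Join();
        }
    }
}
```

Waiting on the stop event instead of calling Thread.Sleep lets Dispose end the monitor immediately rather than after the current interval.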

The SubscribeReconnectionHelper class starts connectivity testing asynchronously. Once the connectivity test succeeds, it reestablishes the connection between the subscriber and the WCF pub/sub server.

C#
// Requires: using System.Configuration; using System.Threading;

// Helps to reestablish the communication between the server and the
// subscriber after a fault. This class is protocol agnostic as well.
public class SubscribeReconnectionHelper
{
    // Starts reestablishing the communication between the server and the
    // subscriber. The work is queued to the thread pool so that it runs
    // asynchronously.
    public static void reconnect(Contacts.ICommunication br,
        IEvent ievent, ReceiverProtocolType protocoltype)
    {
        ReceiverThreadParam threadparam = new ReceiverThreadParam();
        threadparam.CommunicationObject = br;
        threadparam.Subscriber = ievent;
        threadparam.ReceiverProtocolType = protocoltype;
        WaitCallback eventthread = delegate(object data)
        {
            trytoconnect(data as ReceiverThreadParam);
        };
        ThreadPool.QueueUserWorkItem(eventthread, threadparam);
    }

    // Calls isServerUp every 10 seconds to test whether both the network
    // and the server are OK again. Once they are, it calls
    // reestablishconnectionwithServer to reestablish the communication.
    private static void trytoconnect(object threadparam)
    {
        ICommunication communicationObject =
            ((ReceiverThreadParam)threadparam).CommunicationObject;
        IEvent subscriber = ((ReceiverThreadParam)threadparam).Subscriber;
        ReceiverProtocolType receiverProtocolType =
            ((ReceiverThreadParam)threadparam).ReceiverProtocolType;
        string Endpoint = ConfigurationManager.AppSettings["EndpointAddress"];
        while (true)
        {
            if (isServerUp(Endpoint))
            {
                bool returnresult = reestablishconnectionwithServer(
                    communicationObject, subscriber, receiverProtocolType);
                if (returnresult)
                    return;
            }
            Thread.Sleep(10000);
        }
    }

    // Reestablishes the communication between the server and the
    // subscriber, then restarts the periodic liveness check.
    private static bool reestablishconnectionwithServer(
        ICommunication communicationObject, IEvent subscriber,
        ReceiverProtocolType receiverProtocolType)
    {
        string Endpoint = ConfigurationManager.AppSettings["EndpointAddress"];
        try
        {
            communicationObject.MakeClient(Endpoint, subscriber);
            communicationObject.Subscribe();
            ClientActiveHelper.Keepactiveclients(communicationObject,
                subscriber, ReceiverProtocolType.TCP);
        }
        catch
        {
            return false;
        }
        return true;
    }
}
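The core of the reconnection helper is a probe-then-reconnect loop with a fixed delay. A minimal sketch of that loop in isolation follows, assuming the probe and the reconnect step are passed in as delegates. ReconnectLoop and its parameters are hypothetical names, and the attempt limit is an addition for illustration; the article's code retries forever.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of the demo application.
public static class ReconnectLoop
{
    // Probes with isServerUp and, when the server is reachable, calls
    // reconnect. Returns true as soon as reconnect succeeds, or false
    // after maxAttempts unsuccessful rounds.
    public static bool Run(Func<bool> isServerUp, Func<bool> reconnect,
        TimeSpan delay, int maxAttempts)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            if (isServerUp() && reconnect())
            {
                return true;
            }
            if (attempt < maxAttempts)
            {
                Thread.Sleep(delay); // wait before the next probe
            }
        }
        return false;
    }
}
```

With the article's settings, the delay would be TimeSpan.FromSeconds(10), and int.MaxValue as the attempt limit approximates the original unbounded retry.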

For publisher:

Because the publisher is an active entity, it can detect the fault itself. The OnFireEvent method is executed once for each published event, so when publishing an event fails, it indicates that there is a fault. We then call the reconnect method of the PublisherReconnectionHelper class. It starts connectivity testing asynchronously and, once the connectivity test succeeds, it reestablishes the connection between the publisher and the pub/sub server. The following code serves this purpose:

C#
// Requires: using System; using System.Net; using System.ServiceModel;
// using System.ServiceModel.Description; using System.Threading;

// Called when an event is fired.
void OnFireEvent(object sender, EventArgs e)
{
    AlertData alertData = PrepareAlertForAstaReceiver();
    bool sendFailed = true;
    if (proxy.State != System.ServiceModel.CommunicationState.Faulted &&
        proxy.State != System.ServiceModel.CommunicationState.Closed)
    {
        try
        {
            proxy.OnEvent(alertData);
            sendFailed = false;
            eventCounter += 1;
        }
        catch
        {
            // The failure is handled below through sendFailed.
        }
        if (sendFailed)
        {
            lock (this)
            {
                if (proxy.State == System.ServiceModel.CommunicationState.Faulted)
                {
                    // Sending the event threw an exception. Start hard state
                    // fault resilience: the helper periodically probes whether
                    // both the network and the server are OK and, once they
                    // are, reestablishes the connection.
                    PublisherReconnectionHelper.reconnect(proxy.Endpoint, this);
                }
            }
        }
    }

    txtEventCount.Text = eventCounter.ToString();
}

// Helps to reestablish the communication between the server and the
// publisher after a fault. This class is protocol agnostic as well.
public class PublisherReconnectionHelper
{
    // Starts reestablishing the communication between the server and the
    // publisher. The work is queued to the thread pool so that it runs
    // asynchronously.
    public static void reconnect(ServiceEndpoint serviceEndpoint, IProxy ui)
    {
        ThreadParam threadparam = new ThreadParam();
        threadparam.Ui = ui;
        threadparam.ServiceEndpoint = serviceEndpoint;
        WaitCallback eventthread = delegate(object data)
        {
            trytoconnect(data as ThreadParam);
        };
        ThreadPool.QueueUserWorkItem(eventthread, threadparam);
    }

    // Calls isServerUp every 10 seconds to test whether both the network
    // and the server are OK again. Once they are, it calls
    // reestablishconnectionwithServer to reestablish the communication.
    private static void trytoconnect(object threadparam)
    {
        ServiceEndpoint endpointAdress = ((ThreadParam)threadparam).ServiceEndpoint;
        IProxy ui = ((ThreadParam)threadparam).Ui;
        while (true)
        {
            if (isServerUp(endpointAdress.Address.Uri.ToString()))
            {
                bool returnresult = reestablishconnectionwithServer(ui,
                    endpointAdress.Address.Uri.ToString());
                if (returnresult)
                    return;
            }
            Thread.Sleep(10000);
        }
    }

    // Reestablishes the communication between the server and the publisher
    // by creating a new proxy that matches the scheme of the endpoint.
    private static bool reestablishconnectionwithServer(IProxy ui, string endpoint)
    {
        string[] splitstrings = endpoint.Split(new char[] { ':' });
        if (splitstrings[0] == "net.pipe")
        {
            try
            {
                EndpointAddress endpointAddress = new EndpointAddress(endpoint);
                NetNamedPipeBinding namedPipeBindingpublish = new NetNamedPipeBinding();
                ui.Proxy = new PublisherProxy(namedPipeBindingpublish, endpointAddress);
            }
            catch
            {
                return false;
            }
        }
        else if (splitstrings[0] == "net.tcp")
        {
            try
            {
                EndpointAddress endpointAddress = new EndpointAddress(endpoint);
                NetTcpBinding NetTcpbinding = new NetTcpBinding(SecurityMode.None);
                // ui.Proxy = new PublisherProxy(NetTcpbinding, endpointAddress);
                ui.Proxy = new PublisherProxy();
            }
            catch
            {
                return false;
            }
        }
        else if (splitstrings[0] == "http")
        {
            try
            {
                WSDualHttpBinding wsDualBindingpublish = new WSDualHttpBinding();
                EndpointAddress endpointAddress = new EndpointAddress(endpoint);
                string strHostName = Dns.GetHostName();
                // Dns.GetHostByName is marked obsolete; Dns.GetHostEntry is
                // the supported replacement.
                IPHostEntry ipEntry = Dns.GetHostByName(strHostName);
                IPAddress[] addr = ipEntry.AddressList;
                // Base address on which the client receives callbacks.
                wsDualBindingpublish.ClientBaseAddress = new
                    Uri("http://" + addr[0].ToString() + ":" + "3000" + "/");
                ui.Proxy = new PublisherProxy(wsDualBindingpublish, endpointAddress);
            }
            catch
            {
                return false;
            }
        }
        return true;
    }
}
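Each call to reconnect queues a new probing loop on the thread pool. If several failed publishes could reach that call at about the same time, more than one loop might run at once. Whether this can happen in the demo depends on how OnFireEvent is invoked, so the following is only a sketch of one way to guard against it: an Interlocked flag that admits a single reconnection loop at a time. ReconnectGate, TryStart and IsRunning are hypothetical names.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of the demo application.
public sealed class ReconnectGate
{
    private int running; // 0 = idle, 1 = a reconnection loop is in progress

    public bool IsRunning
    {
        get { return Interlocked.CompareExchange(ref running, 0, 0) == 1; }
    }

    // Queues reconnectLoop on the thread pool unless one is already running.
    // Returns true if this call started the loop, false if it was skipped.
    public bool TryStart(Action reconnectLoop)
    {
        if (Interlocked.CompareExchange(ref running, 1, 0) != 0)
        {
            return false;
        }
        ThreadPool.QueueUserWorkItem(delegate
        {
            try
            {
                reconnectLoop();
            }
            finally
            {
                Interlocked.Exchange(ref running, 0); // reopen the gate
            }
        });
        return true;
    }
}
```

A publisher would hold one gate per proxy and pass the body of its reconnection loop to TryStart wherever it currently calls reconnect directly.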

Issues

  • Events that are sent while a fault is in progress are lost. If you need these events, use a persistence strategy.
  • This article does not consider client shutdown or restart, or any other client-side fault.
  • Does not work under NAT
  • No authentication or encryption
  • No provision to prevent DoS (denial-of-service) attacks
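The first issue above can be reduced with a buffer on the publisher side. The following is a minimal sketch, not a persistence strategy: it holds unsent events in memory only, so they are still lost if the publisher process exits. EventOutbox, its capacity limit and the trySend delegate are hypothetical; in the demo, trySend would wrap the proxy call and return false when it throws.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the demo application.
public sealed class EventOutbox<T>
{
    private readonly Queue<T> pending = new Queue<T>();
    private readonly object gate = new object();
    private readonly Func<T, bool> trySend; // returns false when the send fails
    private readonly int capacity;

    public EventOutbox(Func<T, bool> trySend, int capacity)
    {
        this.trySend = trySend;
        this.capacity = capacity;
    }

    public int PendingCount
    {
        get { lock (gate) { return pending.Count; } }
    }

    // Queues the event, dropping the oldest one when the buffer is full,
    // then tries to send everything that is waiting.
    public void Publish(T item)
    {
        lock (gate)
        {
            if (pending.Count == capacity)
            {
                pending.Dequeue();
            }
            pending.Enqueue(item);
        }
        Flush();
    }

    // Sends queued events in order and stops at the first failure.
    // Returns the number of events sent. Call it again after reconnecting.
    public int Flush()
    {
        int sent = 0;
        lock (gate)
        {
            while (pending.Count > 0 && trySend(pending.Peek()))
            {
                pending.Dequeue();
                sent++;
            }
        }
        return sent;
    }
}
```

Sending while holding the lock keeps events in order at the cost of blocking other publishers during a slow send; a design that needs higher throughput would copy the batch out before sending.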

Conclusion

If you think of any cool new features or find any bugs, please let me know. In a future post, I will cover the following:

A partially centralized publish/subscribe overlay network built with a P2P approach using WCF, followed by how Windows and certificate authentication can be implemented in a configurable way.

History

  • 14/09/2008 - Initial article uploaded 

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)