Introduction
This is Part 2 in a series of articles on writing a publish/subscribe (pub/sub) server with WCF and socket programming. This part shows how to make the pub/sub server and its clients resilient to faults.
If you have not yet read Part 1, please take a few minutes to read it. Each part builds on the previous one and does not repeat its content.
Background
Suppose you have built a server that notifies its subscribed clients whenever one of its publishers sends out an event, and thousands of publishers and subscribers are connected to it. Consider what happens if the Server restarts, or if a Network problem occurs between the Server and a subscriber or publisher.
A fault can be caused by:
- Server restart / shutdown / power fault
- Any Network problem: broken wire, intermediary router problem, etc.
Figure: Various Network Faults
How We Can Tackle Faults
There are three entities in such systems:
- Subscriber
- Publisher
- Server
For the publisher, we can use hard state fault tolerance: when the publisher fails to send an event, we can infer that a fault has occurred, caused either by the Server or by the Network. Because the publisher is an active entity, a failed send is enough to detect the fault.
The subscriber, on the other hand, is a passive entity: it only receives events. If no event arrives, it can only assume that the Server has not multicast any, so it has no way to detect a fault by itself. The subscriber therefore has to use soft state fault tolerance.
Hard State Fault Tolerance: When a publisher notices that a fault has occurred, it periodically probes whether both the Network and the Server are OK. Once it determines that both are OK, it reestablishes the connection. This scheme is used by the publisher.
Figure: Fault Resilience in Publisher
Soft State Fault Tolerance: After a successful connection between the Server and a subscriber has been made, a component periodically probes whether the Server and the Network are OK. If at any time it detects a fault, it reestablishes the connection. This scheme is used by the subscriber.
Figure: Fault Resilience in Subscriber
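The two schemes differ only in what triggers the probing. The sketch below uses hypothetical names that are not part of the demo app: the delegates `trySend`, `isServerUp`, `reconnect` and `probe` stand in for the real publisher and subscriber calls described later in this article, and the class only illustrates when each scheme starts probing.

```csharp
using System;
using System.Threading;

// Hypothetical sketch: both schemes written against plain delegates.
public static class FaultToleranceSketch
{
    // Hard state: probing starts only after an operation has failed.
    // Returns true if the event was sent, false if a fault was detected and the
    // connection had to be reestablished (the event itself is lost).
    public static bool SendWithHardState(Func<bool> trySend, Func<bool> isServerUp,
                                         Func<bool> reconnect, int probeIntervalMs)
    {
        if (trySend()) return true;
        // The failed send is the fault signal: probe until Server and Network are back.
        while (!(isServerUp() && reconnect()))
            Thread.Sleep(probeIntervalMs);
        return false;
    }

    // Soft state: probing runs all the time, because a passive subscriber cannot
    // tell "no events were published" from "the link is broken".
    public static void WatchWithSoftState(Func<bool> probe, Action onFault,
                                          int probeIntervalMs, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            if (!probe())
            {
                onFault();   // hand over to the reconnection logic
                return;
            }
            Thread.Sleep(probeIntervalMs);
        }
    }
}
```

In the demo app, the hard-state loop corresponds to the publisher's reconnection helper and the soft-state loop to the subscriber's periodic re-subscription.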
The Demo App
This article is accompanied by a demo application, available for download at the top of this page. Download the attached file, extract it into your local directory, and open the TestWcfApp.sln solution file with Visual Studio 2008. Build and run it.
Vista users must also reserve the HTTP URLs used by the demo, because http.sys allows only administrators to register URLs in the root namespace:
- Start a command prompt using "Run as administrator"
- Then run the following four commands:
- netsh http add urlacl url=http://+:80/myService user=DOMAIN\user
- netsh http add urlacl url=http://+:8003/MyService/ user=DOMAIN\user
- netsh http add urlacl url=http://+:4000/ user=DOMAIN\user
- netsh http add urlacl url=http://+:3000/ user=DOMAIN\user
- Replace DOMAIN\user with your own domain and user name
How to Test Fault Resilience
First, run the application as described in Part 1 of this series. Then restart the pub/sub server, try to publish an event, and observe what happens.
Alternatively, unplug the network cable or stop the intermediate router for a while, restore it, then try to publish an event and observe what happens.
Description of Implementation
After a fault, we need to know when the Server and the Network are OK again. We can consider everything (Network and Server) to be OK if the Server is reachable, so reachability is tested with an asynchronous socket connect and a timeout. The timeout keeps an unreachable host from blocking the caller for the full TCP connect timeout. The following code serves this purpose:
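public static bool isServerUp(string endpoint)
{
    // Let System.Uri split the address into scheme, host and port
    // instead of splitting the string on ':' by hand.
    Uri uri;
    if (!Uri.TryCreate(endpoint, UriKind.Absolute, out uri))
        return false;
    // Named pipes are machine-local, so there is no network path to probe.
    if (uri.Scheme == "net.pipe") return true;
    if (uri.Port < 0) return false;   // unknown scheme without an explicit port
    try
    {
        // Resolve host names such as "localhost" as well as IP literals; prefer IPv4.
        IPAddress[] addresses = Dns.GetHostAddresses(uri.DnsSafeHost);
        IPAddress ip = Array.Find(addresses,
            a => a.AddressFamily == AddressFamily.InterNetwork);
        if (ip == null && addresses.Length > 0) ip = addresses[0];
        if (ip == null) return false;
        // uri.Port falls back to the scheme's default port when none is given.
        return Connect(new IPEndPoint(ip, uri.Port), 4000);   // 4-second timeout
    }
    catch (SocketException)
    {
        return false;   // the host name could not be resolved
    }
}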
// Attempts a TCP connection to remoteEndPoint and reports whether it succeeded
// within timeoutMSec. All state is local to the call, so the publisher and
// subscriber reconnection loops can probe at the same time without interfering.
public static bool Connect(IPEndPoint remoteEndPoint, int timeoutMSec)
{
    TcpClient tc = new TcpClient(remoteEndPoint.AddressFamily);
    try
    {
        IAsyncResult iar = tc.BeginConnect(remoteEndPoint.Address, remoteEndPoint.Port, null, null);
        // Wait on this operation's own handle instead of a shared static event.
        if (!iar.AsyncWaitHandle.WaitOne(timeoutMSec, false))
            return false;       // timed out; closing the client below aborts the attempt
        tc.EndConnect(iar);     // throws SocketException if the connection was refused
        return tc.Connected;
    }
    catch (SocketException)
    {
        return false;
    }
    finally
    {
        tc.Close();             // the probe socket is never reused, so always release it
    }
}
When the method “isServerUp” reports that the Server is reachable, we can reestablish communication with the WCF pub/sub Server.
For the subscriber:
As we know, the subscriber is a passive entity, so after it has connected to the WCF Server, a thread periodically (every 5 seconds) checks whether the Server and the Network are OK. The check simply calls Subscribe() again; if the call throws, a fault is assumed. As soon as the thread detects that either the Network or the Server is not OK, it delegates responsibility to the class “SubscribeReconnectionHelper”:
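void OnSubscribe(object sender, EventArgs e)
{
    try
    {
        CommunicationObject.Subscribe();
        ((Button)sender).Visible = false;
        button2.Visible = true;
        // Start the soft-state liveness check for this subscriber.
        ClientActiveHelper.Keepactiveclients(CommunicationObject, this, ReceiverProtocolType.TCP);
    }
    catch (Exception ex)
    {
        // Don't swallow the error silently: the subscription did not succeed.
        MessageBox.Show("Subscribe failed: " + ex.Message);
    }
}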
public class ClientActiveHelper
{
    // Starts a background liveness check (soft state fault tolerance) for one subscriber.
    public static void Keepactiveclients(ICommunication CommunicationObject,
        IEvent subscriber, ReceiverProtocolType receiverProtocolType)
    {
        ReceiverThreadParam tp = new ReceiverThreadParam();
        tp.CommunicationObject = CommunicationObject;
        tp.ReceiverProtocolType = receiverProtocolType;
        tp.Subscriber = subscriber;
        // Run livetest on a thread-pool thread so the UI thread is not blocked.
        ThreadPool.QueueUserWorkItem(delegate(object data)
        {
            livetest((ReceiverThreadParam)data);
        }, tp);
    }

    private static void livetest(ReceiverThreadParam threadparam)
    {
        ICommunication br = threadparam.CommunicationObject;
        IEvent ievent = threadparam.Subscriber;
        ReceiverProtocolType protocoltype = threadparam.ReceiverProtocolType;
        if (br == null) return;
        while (true)
        {
            // Re-subscribing doubles as the probe: if the call throws,
            // the Server or the Network is assumed to be down.
            try
            {
                br.Subscribe();
            }
            catch
            {
                // Hand over to the reconnection helper and end this check;
                // a new check is started once the connection is restored.
                SubscribeReconnectionHelper.reconnect(br, ievent, protocoltype);
                return;
            }
            Thread.Sleep(5000);   // probe every 5 seconds
        }
    }
}
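The loop above keeps a thread-pool thread asleep for the whole lifetime of each subscription. A possible alternative, sketched here under the assumption that the probe can be supplied as a `Func<bool>` and the fault handler as an `Action` (the class name `SoftStateWatchdog` is hypothetical), is a `System.Threading.Timer` that runs the same probe every few seconds and reports the first fault exactly once:

```csharp
using System;
using System.Threading;

// Hypothetical timer-based liveness check (soft state fault tolerance).
public sealed class SoftStateWatchdog : IDisposable
{
    private readonly Func<bool> probe;   // returns false (or throws) when the link is down
    private readonly Action onFault;     // e.g. start the reconnection helper
    private readonly Timer timer;
    private int faulted;                 // 0 = healthy, 1 = fault already reported

    public SoftStateWatchdog(Func<bool> probe, Action onFault, int intervalMs)
    {
        this.probe = probe;
        this.onFault = onFault;
        // Create the timer stopped, then start it, so Tick never sees a null 'timer'.
        timer = new Timer(Tick, null, Timeout.Infinite, Timeout.Infinite);
        timer.Change(intervalMs, intervalMs);
    }

    private void Tick(object state)
    {
        if (Volatile.Read(ref faulted) == 1) return;
        bool ok;
        try { ok = probe(); }
        catch (Exception) { ok = false; }   // a throwing probe counts as a fault
        // Report the first fault exactly once, even if timer callbacks overlap.
        if (!ok && Interlocked.Exchange(ref faulted, 1) == 0)
        {
            timer.Change(Timeout.Infinite, Timeout.Infinite);   // stop probing
            onFault();
        }
    }

    public void Dispose()
    {
        timer.Dispose();
    }
}
```

For example, `new SoftStateWatchdog(() => { br.Subscribe(); return true; }, () => SubscribeReconnectionHelper.reconnect(br, ievent, protocoltype), 5000)` would mirror `livetest`: a `Subscribe()` call that throws counts as a fault because the watchdog catches exceptions from the probe.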
The “SubscribeReconnectionHelper” class starts connectivity testing asynchronously. As soon as the test succeeds, it reestablishes the connection between the subscriber and the WCF pub/sub Server.
public class SubscribeReconnectionHelper
{
    // Starts the reconnection loop for a subscriber on a thread-pool thread.
    public static void reconnect(Contacts.ICommunication br,
        IEvent ievent, ReceiverProtocolType protocoltype)
    {
        ReceiverThreadParam threadparam = new ReceiverThreadParam();
        threadparam.CommunicationObject = br;
        threadparam.Subscriber = ievent;
        threadparam.ReceiverProtocolType = protocoltype;
        ThreadPool.QueueUserWorkItem(delegate(object data)
        {
            trytoconnect((ReceiverThreadParam)data);
        }, threadparam);
    }

    private static void trytoconnect(ReceiverThreadParam threadparam)
    {
        ICommunication communicationObject = threadparam.CommunicationObject;
        IEvent subscriber = threadparam.Subscriber;
        ReceiverProtocolType receiverProtocolType = threadparam.ReceiverProtocolType;
        string Endpoint = ConfigurationManager.AppSettings["EndpointAddress"];
        while (true)
        {
            // Only attempt the WCF reconnection once a plain TCP connect succeeds.
            if (isServerUp(Endpoint) &&
                reestablishconnectionwithServer(communicationObject, subscriber, receiverProtocolType))
                return;
            Thread.Sleep(10000);   // wait 10 seconds before the next attempt
        }
    }

    private static bool reestablishconnectionwithServer(ICommunication communicationObject,
        IEvent subscriber, ReceiverProtocolType receiverProtocolType)
    {
        string Endpoint = ConfigurationManager.AppSettings["EndpointAddress"];
        try
        {
            communicationObject.MakeClient(Endpoint, subscriber);   // re-create the WCF client
            communicationObject.Subscribe();
            // Restart the liveness check with the subscriber's own protocol type.
            ClientActiveHelper.Keepactiveclients(communicationObject,
                subscriber, receiverProtocolType);
        }
        catch
        {
            return false;
        }
        return true;
    }
}
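Both reconnection helpers retry every 10 seconds, so when thousands of clients lose the same server, they all retry at roughly the same moments after it restarts. A common refinement, shown here as a hypothetical helper rather than part of the demo app, is a capped exponential backoff: start with a short delay and double it after each failed attempt, up to a maximum.

```csharp
using System;

// Hypothetical helper: capped exponential backoff for reconnection attempts.
public static class ReconnectBackoff
{
    // attempt 0 -> initialMs, 1 -> 2 * initialMs, 2 -> 4 * initialMs, ...
    // never more than maxMs. Assumes initialMs > 0.
    public static int NextDelay(int attempt, int initialMs, int maxMs)
    {
        long delay = initialMs;
        // Stop doubling once the cap is reached, so large attempt counts cannot overflow.
        for (int i = 0; i < attempt && delay < maxMs; i++)
            delay *= 2;
        return (int)Math.Min(delay, maxMs);
    }
}
```

Inside `trytoconnect`, the fixed `Thread.Sleep(10000)` could become `Thread.Sleep(ReconnectBackoff.NextDelay(attempt++, 1000, 10000))`, with `attempt` declared before the loop. Adding a small random jitter to each delay spreads the reconnections out further.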
For the publisher:
As we know, the publisher is an active entity, so it can detect a fault itself. The method “OnFireEvent” is executed once for each published event, so a failure to publish an event indicates a fault. We then call the method “reconnect” of the class “PublisherReconnectionHelper”. It starts connectivity testing asynchronously and, as soon as the test succeeds, reestablishes the connection between the publisher and the pub/sub Server. The following code serves this purpose:
Detecting a fault in the publisher:
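private readonly object reconnectLock = new object();

void OnFireEvent(object sender, EventArgs e)
{
    AlertData alertData = PrepareAlertForAstaReceiver();
    bool failed = true;
    // A Faulted or Closed proxy cannot send events, so no attempt is made.
    if (proxy.State != System.ServiceModel.CommunicationState.Faulted &&
        proxy.State != System.ServiceModel.CommunicationState.Closed)
    {
        try
        {
            proxy.OnEvent(alertData);
            failed = false;
            eventCounter += 1;
        }
        catch
        {
            // The send failed; handled below.
        }
        if (failed)
        {
            // Lock on a private object rather than on 'this'.
            lock (reconnectLock)
            {
                // A communication failure leaves the WCF channel Faulted:
                // that is the fault signal that starts the reconnection.
                if (proxy.State == System.ServiceModel.CommunicationState.Faulted)
                {
                    PublisherReconnectionHelper.reconnect(proxy.Endpoint, this);
                }
            }
        }
    }
    txtEventCount.Text = eventCounter.ToString();
}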
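`OnFireEvent` relies on the proxy's `Faulted` state to avoid starting a second reconnection loop. If events could be published from several threads, a more explicit guard is an `Interlocked` flag. The class below is a hypothetical sketch, not part of the demo app: the caller that wins `TryBegin()` starts the reconnection, and the reconnection loop calls `End()` once the proxy has been re-created.

```csharp
using System.Threading;

// Hypothetical guard: at most one reconnection loop runs at a time.
public sealed class ReconnectGate
{
    private int running;   // 0 = idle, 1 = a reconnection loop is active

    // Returns true only for the caller that should start the reconnection.
    public bool TryBegin()
    {
        return Interlocked.CompareExchange(ref running, 1, 0) == 0;
    }

    // Called by the reconnection loop once the proxy has been re-created.
    public void End()
    {
        Interlocked.Exchange(ref running, 0);
    }
}
```

Compared with a `lock`, the gate never blocks the publishing thread: a caller that loses the race simply returns and leaves the running loop to finish.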
public class PublisherReconnectionHelper
{
    // Starts the reconnection loop for the publisher on a thread-pool thread.
    public static void reconnect(ServiceEndpoint serviceEndpoint, IProxy ui)
    {
        ThreadParam threadparam = new ThreadParam();
        threadparam.Ui = ui;
        threadparam.ServiceEndpoint = serviceEndpoint;
        ThreadPool.QueueUserWorkItem(delegate(object data)
        {
            trytoconnect((ThreadParam)data);
        }, threadparam);
    }

    private static void trytoconnect(ThreadParam threadparam)
    {
        string endpoint = threadparam.ServiceEndpoint.Address.Uri.ToString();
        IProxy ui = threadparam.Ui;
        while (true)
        {
            // Probe with a plain TCP connect first; only then rebuild the WCF proxy.
            if (isServerUp(endpoint) && reestablishconnectionwithServer(ui, endpoint))
                return;
            Thread.Sleep(10000);   // wait 10 seconds before the next attempt
        }
    }

    // Re-creates the publisher proxy with a binding that matches the endpoint's scheme.
    private static bool reestablishconnectionwithServer(IProxy ui, string endpoint)
    {
        try
        {
            EndpointAddress endpointAddress = new EndpointAddress(endpoint);
            string scheme = new Uri(endpoint).Scheme;
            if (scheme == "net.pipe")
            {
                ui.Proxy = new PublisherProxy(new NetNamedPipeBinding(), endpointAddress);
            }
            else if (scheme == "net.tcp")
            {
                // Pass the binding and address that were just created.
                ui.Proxy = new PublisherProxy(new NetTcpBinding(SecurityMode.None), endpointAddress);
            }
            else if (scheme == "http")
            {
                WSDualHttpBinding wsDualBindingpublish = new WSDualHttpBinding();
                // The duplex callback needs a local address the Server can reach.
                // GetHostAddresses may also return IPv6 addresses; pick an IPv4 one
                // so it can be placed directly in the URL.
                IPAddress[] addr = Dns.GetHostAddresses(Dns.GetHostName());
                IPAddress local = Array.Find(addr,
                    a => a.AddressFamily == AddressFamily.InterNetwork);
                if (local == null) return false;
                wsDualBindingpublish.ClientBaseAddress =
                    new Uri("http://" + local + ":3000/");
                ui.Proxy = new PublisherProxy(wsDualBindingpublish, endpointAddress);
            }
            else
            {
                return false;   // unsupported scheme: nothing was reconnected
            }
        }
        catch
        {
            return false;
        }
        return true;
    }
}
Issues
- Events that are sent while there is a fault are lost. If you need them, use a persistence strategy.
- This article does not consider client shutdown / restart or other client-side faults.
- Does not work behind NAT
- No authentication or encryption
- No protection against denial-of-service (DoS) attacks
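For the first issue, one possible persistence strategy is an outbox: while the publisher cannot reach the Server, it stores each event locally and replays the stored events after reconnecting. The sketch below is hypothetical and not part of the demo app; it assumes each event can be serialized to a single line of text (for example, XML or JSON without line breaks).

```csharp
using System;
using System.IO;

// Hypothetical file-backed outbox for events that could not be published.
public sealed class FileOutbox
{
    private readonly string path;
    private readonly object sync = new object();

    public FileOutbox(string path) { this.path = path; }

    // Appends one serialized event; the text must not contain line breaks.
    public void Enqueue(string serializedEvent)
    {
        lock (sync)
        {
            File.AppendAllText(path, serializedEvent + Environment.NewLine);
        }
    }

    // Sends stored events in order until one fails. Sent events are removed,
    // the rest stay in the file for the next attempt. Returns the number sent.
    public int Flush(Func<string, bool> trySend)
    {
        lock (sync)
        {
            if (!File.Exists(path)) return 0;
            string[] pending = File.ReadAllLines(path);
            int sent = 0;
            while (sent < pending.Length && trySend(pending[sent]))
                sent++;
            string[] remaining = new string[pending.Length - sent];
            Array.Copy(pending, sent, remaining, 0, remaining.Length);
            if (remaining.Length == 0) File.Delete(path);
            else File.WriteAllLines(path, remaining);
            return sent;
        }
    }
}
```

The publisher could call `Enqueue` in the failure branch of `OnFireEvent` and `Flush` once the proxy has been re-created. Because the file is rewritten only after a whole batch has been processed, a crash in the middle of `Flush` can cause some events to be sent twice, so subscribers should tolerate duplicates.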
Conclusion
If you think of any cool new features or find any bugs, please let me know. In a future post, I will present the following implementations:
A partially centralized publish/subscribe overlay network built with WCF using a P2P approach, and then how Windows and certificate authentication can be implemented in a configurable way.
History
- 14/09/2008 - Initial article uploaded