Introduction
In the course of my career, I’ve developed different socket servers in C++, Java, C# with different purposes and using different techniques. In the last 3 years, I’ve developed a socket client/server system in order to dispatch messages from multiple clients to multiple servers. I would like to share what I’ve learned with this experience and write a library to create your own message system.
The library exposes a set of abstract
classes that you can quickly implement in order to create your own message system. In this way, you don’t need to know the library in depth, nor change it. The library uses I/O completion ports and the multithread pool in order to have a performance server. The library exposes a client and a server as a message within a header.
Background
In order to allow the server to handle different clients and do something else during the receiving, the receiving is asynchronous. A BeginReceive
method is called to start receiving pass a callback method and a SocketObjectState
as state object for the asynchronous receiving. Then, a EndReceive
method is called to get the bytes received. When the server receives a message, it fires an event. The event is fired on a different thread so as not to block the server. The ThreadPool
system class is used for this scope.
When the server receives messages, the system socket uses a buffer and receives long messages with multiple reads as it can receive more messages in the same buffer if they are small and send in the same time. For that reason, the AbstractMessage
exposes a static
method TryReadMessage
that handles all the situations. In the unit test project, you can see some unit tests on this method.
The term "socket client handler" is used to indicate the class that handles the system socket. The library includes an abstract handler that manages socket as SSL stream as well.
Using the Code
To create your own message system, you have to implement the following list of abstract
classes:
AbstractMessageHeader
defines the message header of each message AbstractMessage
defines the message AbstractTcpSocketClientHandler
defines the socket client handler AbstractSocketServer
defines the socket server AbstractSocketClient
defines the socket client
The implementation of the AbstractMessageHeader
should define how the header is composed (byte to byte) and its methods to read and write the header on a buffer. The AbstractMessage
’s implementation should define the kind of message header used as shown in the following code:
public class BasicMessage : AbstractMessage
{
....
....
protected override AbstractMessageHeader GetMessageHeaderInstance()
{
return new BasicHeader();
}
}
The other implementations are very simple if you don’t need to do something special. The class implemented from AbstractTcpSocketClient
defines the message class used.
class BasicSocketClientHandler : AbstractTcpSocketClientHandler
{
public BasicSocketClientHandler(Socket handler, SslStream stream)
: base(handler, stream)
{
}
public BasicSocketClientHandler(Socket handler)
: base(handler, null)
{
}
protected override AbstractMessage GetMessageInstance()
{
return new BasicMessage();
}
}
And finally, both of your custom client and server classes define the socket client handler class used.
protected override AbstractTcpSocketClientHandler GetHandler
(System.Net.Sockets.Socket handler, System.Net.Security.SslStream stream)
{
return new BasicClientServerLib.SocketHandler.BasicSocketClientHandler(handler, stream);
}
The solution includes a BasicClientServerLib
project that offers a simple implementation of all the abstract
classes and a simple test application.
Points of Interest
The explanation above the library is composed of the following major components:
- Message + Message Header
- Socket Client Handler
- Server
- Client
You can initialize the server using a certificate. In that way, the communication is on SSL and the client has to provide a client certification in order to be connected to the server. If you want to check the client certificate (or the server certificate), you have to write a piece of code as if you wanted to get the certificate from the windows repository and not from the file system.
When the receive callback is called, you can have in the buffer read a complete message as more complete messages as a piece of a message. The socket client handler calls the TryReadMessage
method to try to read a message from the buffer. If a complete message is read, an event is fired and then the handler checks if there’re more messages in the same buffer. If the buffer doesn’t contain a complete message, the incomplete message is saved in the state
object and at the next callback, the new buffer will be added to the previous buffer. You can see this part inside the TryReadMessage
method.
private static void readCallback(IAsyncResult ar)
{
SocketStateObject state = (SocketStateObject)ar.AsyncState;
AbstractTcpSocketClientHandler handler = state.workHandler;
try
{
int read = handler.EndReceive(ar);
Trace.WriteLine(string.Format("Receive {0} bytes", read));
if (read > 0)
{
handler.OnReceivingMessage(handler);
while (true)
{
AbstractMessage message = AbstractMessage.TryReadMessage
(handler.GetMessageInstance(), state, read);
if (message != null)
{
ReceiveMessageStateObject rcvObj = new ReceiveMessageStateObject()
{ Handler = handler, Message = message };
handler.OnReceiveMessage(rcvObj);
}
if (state.pendingBuffer == null)
{
break;
}
read = 0;
}
handler.socket.BeginReceive(state, new AsyncCallback(readCallback));
}
else
{
handler.Close();
}
}
catch (MessageException mex)
{
....
....
....
}
....
....
....
}
Therefore, the SocketStateObject
is not only used to read incoming messages in an asynchronous way, but also to store the necessary information for the different situations to handle. In detail, the attribute "message
" is used to save an incomplete message when the pendingBuffer
is used to contain the remaining data after the first message.
internal class SocketStateObject
{
public AbstractTcpSocketClientHandler workHandler = null;
public const int BufferSize = 8192;
public byte[] buffer = new byte[BufferSize];
public AbstractMessage message = null;
public byte[] pendingBuffer = null;
}
internal static AbstractMessage TryReadMessage
(AbstractMessage message, SocketStateObject state, int byteRead)
{
AbstractMessage messageRead = null;
int moreMessage = 0;
byte[] buffer = state.buffer;
if (state.pendingBuffer != null)
{
buffer = new byte[byteRead + state.pendingBuffer.Length];
Array.Copy(state.pendingBuffer, 0, buffer, 0, state.pendingBuffer.Length);
Array.Copy(state.buffer, 0, buffer, state.pendingBuffer.Length, byteRead);
byteRead = buffer.Length;
}
state.pendingBuffer = null;
if (state.message == null)
{
state.message = message;
moreMessage = state.message.ReadFirstMessage(buffer, byteRead);
Trace.WriteLine(string.Format("Receive 1st package MessageUID {0} ClientUID {1}",
state.message.MessageUID, state.message.ClientUID));
}
else
{
moreMessage = state.message.AppendBuffer(buffer, byteRead);
Trace.WriteLine(string.Format("Receive more package MessageUID {0} ClientUID {1}",
state.message.MessageUID, state.message.ClientUID));
}
if (state.message.IsComplete())
{
Trace.WriteLine(string.Format("Receive complete message {0} len {1}",
state.message.MessageUID, state.message.MessageLength));
messageRead = state.message;
Trace.WriteLine(string.Format
("Prepare to receive a new message. moreMessage = {0}", moreMessage));
state.message = null;
if (moreMessage > 0)
{
state.pendingBuffer = new byte[byteRead - moreMessage];
Array.Copy(buffer, moreMessage, state.pendingBuffer, 0, state.pendingBuffer.Length);
Trace.WriteLine(string.Format
("Copy {0} bytes to pending buffer", state.pendingBuffer.Length));
}
}
return messageRead;
}
Changes on New Versions (V3)
Follow some suggestions, I've added the connection event in the client class and two new classes in the abstract library model in order to offer a version using the ReceiveAsync
method instead of the Begin
/End Receive
.
The new classes are:
TcpSocketAsync
– inherits from TcpSocket
AbstractAsyncTcpSocketClientHandler
– inherits from AbstractTcpSocketClientHandler
Also, I’ve added a new test library named AsyncClientServerLib
that implements a test library using the new classes.
Add New Functionalities (V4)
I have completed the library with two more functionalities:
- Send messages with asynchronous method (
SendAsync
) - Receive messages using a queue
The first one is very useful on the server side in order to avoid possible blocks during transmission. However, a thread event is used to block a new transmission until the previous one is completed.
The second new functionality could be more useful on the client side in order to avoid too many threads and sort all the incoming messages in a queue. To realize this functionality, I have used a feature contained in .NET 4.0 (BlockingCollection
), so this version cannot be used with previous .NET versions.