Introduction
This article and the accompanying code provide a .NET Core implementation of asynchronous TCP communication. My previous article on the topic, TCP Socket Off-the-shelf, presented a component for synchronous TCP communication. The communication component in this work is simpler, yet it offers several additional useful features. In particular, it
- uses .NET Core, which runs on all major operating systems,
- applies the async-await pattern for asynchronous behavior,
- provides a handy infrastructure for Remote Procedure Call (referred to below as RPC) and continuous data streaming,
- automatically reassembles a serialized object scattered over several received chunks,
- timestamps transferred data so that the transport delay can be determined.
The component takes an asynchronous approach and does not require a dedicated thread per connection. Its implementation is considerably simplified by the async-await pattern.
To use the .NET Core version of the code sample, .NET Core 2.0 should be installed. This can be done from this Microsoft site. Microsoft Visual Studio 2017 was used for development.
Note. Since many readers use sockets in desktop applications, I decided to add a desktop version. It has essentially the same code and does not require installation of .NET Core. The parts of the article explaining installation etc. apply to the .NET Core version.
Code Description
The TcpHelperLib project (DLL) implements TCP communication and is common to server and client. The main type of the component is TcpHelper. Its constructors allow the user to configure an instance either explicitly, by providing arguments (some of which may be omitted so that their defaults are used), or implicitly, by reading a JSON configuration file. The constructors take as arguments the instance's id and processMethod, a user-supplied handler that processes received data. The handler is called internally by the TcpHelper type and takes the following arguments:
| Type | Description |
|---|---|
| DateTime | Time when the data were sent (timestamp). |
| List<byte> | Data received. |
| TcpClientWrapper | Proxy object representing the communication counterpart. It is used to send data back. |
| StateProperties | Object that preserves the state of the connection. |
Type TcpHelper provides two key public methods: async Listen() for the server and async Connect() for the client.
The server's TcpHelper object starts to listen on the given IP address for incoming client connections when its method Listen() is called.
public async void Listen(int port, string host = null)
{
    var listener = new TcpListener(GetHost(ref host), port);
    listener.Start();

    Log($"Server \"{Id}\" is listening on {host}:{port} ...");

    TcpClient client = null;
    // Accept clients one at a time; awaiting does not block a thread.
    while ((client = await listener.AcceptTcpClientAsync()) != null && client.Connected)
    {
        var str = $"Client {client.Client.RemoteEndPoint} {ackConnection} \"{Id}\" {host}:{port}";
        Log(str);
        isActive = true;

        // Send the connection acknowledgment to the newly accepted client.
        var clientWrapper = new TcpClientWrapper(Delim, DelimRepeated) { Peer = client };
        await clientWrapper.SendAsync();
        await clientWrapper.SendAsync(str);

        // Start the per-connection receive loop without awaiting it,
        // so that the server can go on accepting other clients.
        Receive(client);
    }
}
The client also creates an object of the TcpHelper type and calls its method Connect(), which sends a connection request to the server.
public async Task<TcpClientWrapper> Connect(int port, string host = null)
{
    var server = await GetConnectedServer(host, port);
    if (server.Connected)
    {
        Log($"Client \"{Id}\" connected to Server {host}:{port}.");
        // Start the receive loop for this connection without awaiting it.
        Receive(server);
    }
    else
        LogError("Connection failed after retries.");

    return new TcpClientWrapper(Delim, DelimRepeated) { Peer = server };
}

private async Task<TcpClient> GetConnectedServer(string host, int port)
{
    var server = new TcpClient();
    // Retry until connected, cancelled, or the attempt limit is reached.
    for (int i = 0; i < maxConnectionAttempts && !server.Connected && !cts.IsCancellationRequested; i++)
    {
        try
        {
            await server.Client.ConnectAsync(GetHost(ref host), port);
        }
        catch
        {
            // The attempt failed: wait before trying again.
            await Task.Delay(intervalBetweenConnectionAttemptsInMs, cts.Token);
        }
    }

    return server;
}
As soon as an incoming call from a client arrives, the TcpHelper object on the server side accepts the connection and obtains a new socket dedicated to it. The accepted socket keeps the local port on which the server is listening; individual connections are told apart by the client's address and port. After the connection has been established, client and server exchange messages. Method Receive(), which is common to server and client, provides the message processing.
private async void Receive(TcpClient client)
{
    if (client == null || !client.Connected)
        return;

    // Bytes received so far that do not yet form a complete message.
    var lstData = new List<byte>();
    // State of this connection, passed to every processing call.
    var stateProperties = new StateProperties();

    using (var netStream = client.GetStream())
    {
        var clientWrapper = new TcpClientWrapper(Delim, DelimRepeated) { Peer = client };
        var buffer = new byte[receiveBufferSize];
        int readBytes = 0;
        // Read chunk after chunk until the stream is closed.
        while ((readBytes = await ReadNetStreamAsync(netStream, buffer, cts.Token)) > 0)
        {
            lstData.AddRange(GetReceivedBuffer(buffer, readBytes));

            try
            {
                ProcessReceived(lstData, clientWrapper, stateProperties);
                SetLastInteractionTime();
            }
            catch (Exception e)
            {
                LogError("ProcessReceived() failed.", e);
            }
        }
    }
}
Receive() is called once per connection and reads the received data in its while loop. Thanks to the async-await pattern, the loop does not block a thread while it waits for data, which is what makes the socket handling asynchronous.
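The non-blocking read loop described above can be tried in isolation. The following is a minimal sketch, not the library's code: the class AsyncReadLoop, the method ReadAllAsync and the onChunk callback are hypothetical names introduced for illustration, and any Stream stands in for the NetworkStream so that the sketch can run without a network connection.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of TcpHelperLib.
public static class AsyncReadLoop
{
    // Reads the stream until it ends and passes every received chunk to onChunk.
    // While no data is available, the awaiting code does not occupy a thread.
    // Returns the total number of bytes read.
    public static async Task<int> ReadAllAsync(
        Stream stream,
        int bufferSize,
        Action<byte[]> onChunk,
        CancellationToken token = default(CancellationToken))
    {
        var buffer = new byte[bufferSize];
        int total = 0;
        int read;
        while ((read = await stream.ReadAsync(buffer, 0, buffer.Length, token)) > 0)
        {
            // Copy only the bytes actually received in this read.
            var chunk = new byte[read];
            Buffer.BlockCopy(buffer, 0, chunk, 0, read);
            onChunk(chunk);
            total += read;
        }
        return total;
    }
}
```

Unlike the Receive() method of the article, this sketch returns a Task rather than void, so a caller may await it and observe exceptions; that is a choice made for the sketch, not a description of the component.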
The roles of client and server differ only at the beginning of communication: the client initiates the connection and the server accepts it. Once the connection has been established, the peers exchange messages, so we can speak of a message provider and a message recipient, either of which may be the client or the server. Usually these messages contain serialized objects. To communicate over a sequential channel, the provider serializes objects to byte arrays and sends them to the recipient as a continuous stream of bytes. The recipient has to reconstruct (deserialize) meaningful objects from these bytes, and for that purpose it must know the boundaries of the serialized objects in the incoming stream. This can be achieved either by using objects that serialize to byte arrays of fixed size or by delimiting the byte stream. This work implements the latter approach.
The delimiter consists of a single byte repeated several times in a row in the byte stream. The byte and the number of its repetitions are configurable (by default the byte is the "pipe" character '|', 0x7C, and it is repeated 3 times). Repetition is required to distinguish the delimiter from an occasional appearance of the delimiter byte in the transferred meaningful data. A related important issue with the transfer of serialized data is joining the parts of the same object that are spread over different received chunks.
Both the delimitation and the object joining problems are solved internally in TcpHelperLib by the following simple protocol. The provider automatically appends the repeated delimiter byte to the sent data, and the recipient parses the received data accordingly. The stream of bytes is depicted in the following figure:
[Figure: Structure of received data]
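The delimiting and joining protocol can be sketched as follows. This is an illustrative sketch under stated assumptions, not the TcpHelperLib source: the class DelimitedFraming and its methods Frame and Split are hypothetical names, and the defaults (byte 0x7C repeated 3 times) are taken from the description above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; not part of TcpHelperLib.
public static class DelimitedFraming
{
    // Provider side: appends `repeat` copies of `delim` to the payload.
    public static byte[] Frame(byte[] payload, byte delim = 0x7C, int repeat = 3)
    {
        var framed = new byte[payload.Length + repeat];
        Buffer.BlockCopy(payload, 0, framed, 0, payload.Length);
        for (int i = 0; i < repeat; i++)
            framed[payload.Length + i] = delim;
        return framed;
    }

    // Recipient side: extracts every complete message from `pending`.
    // Bytes after the last delimiter run stay in `pending` as the tail,
    // so the next received chunk can simply be appended to it.
    public static List<byte[]> Split(List<byte> pending, byte delim = 0x7C, int repeat = 3)
    {
        var messages = new List<byte[]>();
        int start = 0; // index of the first byte of the message being scanned
        int run = 0;   // length of the current run of delimiter bytes
        for (int i = 0; i < pending.Count; i++)
        {
            run = pending[i] == delim ? run + 1 : 0;
            if (run == repeat)
            {
                int length = i + 1 - repeat - start;
                messages.Add(pending.GetRange(start, length).ToArray());
                start = i + 1;
                run = 0;
            }
        }
        pending.RemoveRange(0, start); // keep only the incomplete tail
        return messages;
    }
}
```

A recipient would call Split after appending each received chunk to the pending list. Note that this simple scanner assumes the payload neither ends with the delimiter byte nor contains a full run of it; a single '|' inside the payload is handled correctly.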
Inside its while loop, method Receive() calls method ProcessReceived().
private async void ProcessReceived(List<byte> lstData, TcpClientWrapper clientWrapper, StateProperties stateProperties)
{
    Log($"{clientWrapper.RemoteEndPoint} ThreadId = {Thread.CurrentThread.ManagedThreadId}");

    // Complete messages; the incomplete tail remains in lstData.
    var lstParts = SplitToDelimitedParts(lstData);
    foreach (var lstByte in lstParts)
    {
        ProcessingResult result = null;
        if (lstByte != null)
        {
            var timestamp = GetTimestamp(lstByte);
            if (timestamp > DateTime.MinValue)
            {
                CheckIfRpc(lstByte, stateProperties);

                try
                {
                    result = processMethod?.Invoke(timestamp, lstByte, clientWrapper, stateProperties);
                }
                catch (Exception e)
                {
                    LogError("ProcessReceived(): user supplied processMethod() callback failed.", e);
                }
            }
            else
                LogError("ProcessReceived(): timestamp == DateTime.MinValue");
        }

        if (result != null)
        {
            // Send the handler's response back to the peer, if there is one.
            byte[] bts = result.BytesToSendBack;
            if (bts != null && bts.Length > 0)
            {
                var task = clientWrapper.SendAsync(bts);
                if (result.IsSyncSend)
                    await task;
            }
        }
    }
}
Method ProcessReceived() takes three arguments: List<byte> lstData, a list containing the received bytes starting right after a delimiter; TcpClientWrapper clientWrapper, an object representing the communication peer and used to send a response back; and StateProperties stateProperties, an object preserving the state of the communication. Object stateProperties holds properties that may be inserted either in the Receive() method (like the handler for RPC in the appropriate scenario described below) or in the user-supplied processMethod callback. Method ProcessReceived() calls method SplitToDelimitedParts() to obtain a list of delimited lists of bytes, an object of type List<List<byte>>. The latter method also modifies its argument List<byte> lstData, leaving there only the "tail" of the received bytes (if not empty), which will later be prepended as a "head" to the next received chunk. Then each of the delimited lists of bytes is deserialized to an object. Method Task SendAsync(byte[] dataToSend) of type TcpClientWrapper, which is used to send bytes, automatically puts a timestamp (the long representation of a DateTime object in ticks) at the beginning and the delimiter at the end of the sent data. The recipient reads this timestamp by calling the GetTimestamp() method inside method ProcessReceived().
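The timestamp prefix described above can be sketched as follows. This is only an illustration: the class TimestampPrefix and its methods are hypothetical names, and the byte layout (8 bytes in the machine's byte order, as produced by BitConverter) and the use of UTC are assumptions, since the article does not show how SendAsync() and GetTimestamp() encode the value.

```csharp
using System;

// Hypothetical helper for illustration; not part of TcpHelperLib.
public static class TimestampPrefix
{
    private const int TimestampSize = sizeof(long);

    // Sender side: puts the send time (ticks as a 64-bit integer) before the payload.
    // Assumption: both peers use the same byte order.
    public static byte[] Prepend(DateTime sentAtUtc, byte[] payload)
    {
        var message = new byte[TimestampSize + payload.Length];
        BitConverter.GetBytes(sentAtUtc.Ticks).CopyTo(message, 0);
        payload.CopyTo(message, TimestampSize);
        return message;
    }

    // Recipient side: reads the timestamp back. Returns DateTime.MinValue
    // when the message is too short or the tick count is out of range.
    public static DateTime Read(byte[] message)
    {
        if (message.Length < TimestampSize)
            return DateTime.MinValue;
        long ticks = BitConverter.ToInt64(message, 0);
        if (ticks < DateTime.MinValue.Ticks || ticks > DateTime.MaxValue.Ticks)
            return DateTime.MinValue;
        return new DateTime(ticks, DateTimeKind.Utc);
    }

    // Recipient side: returns the useful data that follows the timestamp.
    public static byte[] Payload(byte[] message)
    {
        if (message.Length <= TimestampSize)
            return new byte[0];
        var payload = new byte[message.Length - TimestampSize];
        Array.Copy(message, TimestampSize, payload, 0, payload.Length);
        return payload;
    }
}
```

With such a prefix the recipient can estimate the transport delay as the difference between its current time and the value returned by Read, provided that the clocks of both peers are reasonably synchronized.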
Before proceeding with method ProcessReceived(), let's briefly describe the main scenarios of message exchange between TCP communication peers. The most common scenarios are the following:
- synchronous dialog, when a peer sends a message only in response to a received one, that is, sequential message exchange,
- asynchronous dialog, when the peers send and receive messages non-sequentially. A particular case of asynchronous dialog is streaming, when one side requests a continuous stream of messages from the other. This situation may arise, e.g., when one side subscribes to events issued by the other,
- RPC, when the peers remotely call each other's methods.
ProcessReceived() provides support for the above scenarios. It calls method CheckIfRpc(lstByte, stateProperties) to determine whether the RPC scenario takes place. If so, the received bytes are decoded to a JSON string providing the remote procedure name and arguments. This JSON string is converted to an instance of the RemoteProcInfo type and placed in stateProperties.
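The RPC check can be illustrated with a small sketch. It is not the library's implementation: the article's sample relies on Newtonsoft.Json and does not show the layout of RemoteProcInfo, so the type RpcCall, its property names and the JSON shape below are assumptions, and System.Text.Json (available in .NET Core 3.0 and later) is used here only to keep the sketch free of external packages.

```csharp
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical stand-in for RemoteProcInfo; the property names are assumptions.
public sealed class RpcCall
{
    public string MethodName { get; set; }
    public List<JsonElement> Params { get; set; }
}

// Hypothetical helper for illustration; not part of TcpHelperLib.
public static class RpcJson
{
    // Returns the parsed call, or null when the text is not an RPC request
    // (for example, a plain message such as "Start Streaming").
    public static RpcCall TryParse(string text)
    {
        try
        {
            var call = JsonSerializer.Deserialize<RpcCall>(text);
            return call != null && !string.IsNullOrEmpty(call.MethodName) ? call : null;
        }
        catch (JsonException)
        {
            // Not valid JSON, or not an object of the expected shape.
            return null;
        }
    }
}
```

Under these assumptions, a request for the sample's Foo method might look like {"MethodName":"Foo","Params":["Apricot",15.11,{"Num":5,"Str":"Mango"}]}, while any text that does not parse this way is left to the other scenarios.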
The user-supplied processMethod handler is called by method ProcessReceived() in all the above scenarios. processMethod is free from the delimiting and joining problems. It gets a list of bytes, argument lst, corresponding to one array of useful data ready for deserialization and, in the case of RPC, an already prepared RemoteProcInfo object that can be retrieved from the stateProperties argument.
In the RPC case, processMethod extracts the RemoteProcInfo object from stateProperties and performs the actual procedure call. In other scenarios, processMethod uses the List<byte> lst argument. As its result, processMethod generates an object of type ProcessingResult. The content of this output object (property BytesToSendBack or StringToSendBack) is automatically sent back to the connection peer, either synchronously or asynchronously, depending on the value of the IsSyncSend boolean property of the ProcessingResult type. If the return value of processMethod is null, nothing is sent back to the communication peer after the call of processMethod. This is used, e.g., in streaming on the data recipient side, when acknowledgment of received data is not required.
Argument TcpClientWrapper clientWrapper of processMethod acts as a proxy of the communication peer and may be used explicitly to send data to it asynchronously. To illustrate this, our sample uses it in a timer's handler to implement the streaming scenario, as discussed below.
Code Sample
The sample shows the usage of the TcpHelperLib component. It is developed with .NET Core and therefore may run on all major operating systems (it was tested under Windows 10 and Linux, as shown below). The sample solution consists of four projects: TcpHelperLib itself, placed in folder Lib; the Server and Client console applications; and the auxiliary TestObjectLib.
Server
Complete code of Server is presented below:
using System;
using System.Threading;
using Newtonsoft.Json.Linq;
using TcpHelperLib;
using TestObjectLib;

namespace AsyncSocketServer
{
    class Server
    {
        const int port = 11511;
        const string JSON_CONFIG_FILE = "tcpHelperSettings.json";
        const string TIMER_NAME = "Timer";
        const string START_STREAMING = "Start Streaming";

        static MethodCaller caller = new MethodCaller();

        static void Main(string[] args)
        {
            Console.WriteLine("Async. Socket SERVER");

            // Register the activator for the remotely callable method "Foo".
            caller["Foo"] =
                rpi =>
                {
                    var p = rpi.Params;
                    var jo = p[2] as JObject;
                    var test = new Test { Str = jo.V<string>("Str"), Num = jo.V<int>("Num") };
                    return new ProcessingResult
                    {
                        StringToSendBack = $"{Implementation.Foo((string)p[0], (double)p[1], test)}",
                    };
                };

            var server = new TcpHelper(id: "SVR", processMethod: (dt, lst, clientWrapper, stateProperties) =>
                {
                    // RPC scenario: execute the requested method.
                    var rpi = stateProperties.GetRpi("Foo");
                    if (rpi != null)
                        return caller.ExecuteMethod(rpi);

                    // Streaming scenario: start a per-connection timer that sends
                    // the server's time every 500 ms after an initial delay of 1 s.
                    if (lst.ToStr() == START_STREAMING)
                        stateProperties[TIMER_NAME] = new Timer(async _ =>
                            {
                                try
                                {
                                    await clientWrapper.SendAsync($"{DateTime.Now}");
                                    Console.WriteLine($"Streaming remote endpoint: {clientWrapper.RemoteEndPoint}");
                                }
                                catch (Exception e)
                                {
                                    // Sending failed: stop streaming on this connection.
                                    (stateProperties[TIMER_NAME] as Timer)?.Dispose();
                                    stateProperties[TIMER_NAME] = null;
                                    Console.WriteLine(e);
                                }
                            },
                            null, 1000, 500);

                    return null;
                },
                configFilePath: JSON_CONFIG_FILE);

            server.Listen(port);

            Console.WriteLine("Press any key to quit...");
            Console.ReadKey();

            server.Stop();
        }
    }

    internal static class Implementation
    {
        public static string Foo(string s, double d, Test test)
        {
            return $"\"Foo()\": Echo from Server: {s} {d}, {test.Str} {test.Num}";
        }
    }
}
Object TcpHelper server is created with its processMethod callback supporting the RPC and streaming scenarios. In the case of RPC, the callback gets the RemoteProcInfo object from stateProperties, which holds the connection state, and supplies it as an argument to ExecuteMethod() of the previously created MethodCaller caller object. caller contains an activator for the remotely called method Implementation.Foo(). If stateProperties does not contain an entry for "Foo", the streaming scenario is assumed. In this case, as soon as the server gets the "Start Streaming" message, it starts a timer to periodically send an asynchronous message (the server's time in this case) to the client. In this sample each connection sets up its own timer, which is kept in stateProperties as a part of the connection state. For the server, all connections share the same implementation of processMethod, but each of them is called with its own stateProperties object. The server starts to listen on the specified port when its method Listen() is called.
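The idea of a table of activators keyed by procedure name, as used by caller above, can be sketched as follows. The source of MethodCaller is not shown in the article, so this is a hypothetical minimal equivalent: the generic class HandlerTable and its Execute method are names introduced for illustration only.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal dispatch table; not the MethodCaller of TcpHelperLib.
public sealed class HandlerTable<TArg, TResult> where TResult : class
{
    private readonly Dictionary<string, Func<TArg, TResult>> handlers =
        new Dictionary<string, Func<TArg, TResult>>();

    // Registers (or replaces) the handler for the given procedure name.
    public Func<TArg, TResult> this[string name]
    {
        set { handlers[name] = value; }
    }

    // Runs the handler registered under `name`;
    // returns null when no handler is registered.
    public TResult Execute(string name, TArg arg)
    {
        Func<TArg, TResult> handler;
        return handlers.TryGetValue(name, out handler) ? handler(arg) : null;
    }
}
```

Registration then reads much like the sample, e.g. table["Foo"] = p => ..., and a null result for an unknown name plays the same role as a null return of processMethod: nothing is sent back.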
Client
The complete code of Client is presented below:
#define INCLUDE_RPC

using System;
using System.Collections.Generic;
using System.Threading;
using TcpHelperLib;
using TestObjectLib;

namespace AsyncSocketClient
{
    class Client
    {
        static Timer timer;
        static SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);
        static bool isStreaming = false;

        const int port = 11511;
        const string START_STREAMING = "Start Streaming";

        static TcpHelper clientStream = null;
        static TcpClientWrapper tcwStream = null;

#if INCLUDE_RPC
        static TcpHelper clientRpc = null;
        static TcpClientWrapper tcwRpc = null;
#endif

        static void Main(string[] args)
        {
            Console.WriteLine("Async. Socket CLIENT");

            // Once a second, connect every client that has been idle
            // for too long and is not connected.
            timer = new Timer(async _ =>
                {
                    int maxIdleTimeInSec = 10;

                    await semaphore.WaitAsync();

                    if (clientStream != null && clientStream.TimeSpanSinceLastInteraction > TimeSpan.FromSeconds(maxIdleTimeInSec) &&
                        (tcwStream == null || !tcwStream.IsConnected))
                    {
                        Console.WriteLine("Connection Timer: STREAMING");
                        isStreaming = false;
                        tcwStream = await clientStream.Connect(port);
                    }

#if INCLUDE_RPC
                    if (clientRpc != null && clientRpc.TimeSpanSinceLastInteraction > TimeSpan.FromSeconds(maxIdleTimeInSec) &&
                        (tcwRpc == null || !tcwRpc.IsConnected))
                    {
                        Console.WriteLine("Connection Timer: RPC");
                        tcwRpc = await clientRpc.Connect(port);
                    }
#endif

                    semaphore.Release();
                },
                null, 0, 1000);

            clientStream = new TcpHelper(id: "STREAMING-CLIENT", processMethod: (timestamp, lstByte, clientWrapper, dctState) =>
                {
                    CommonHandler("STREAMING", timestamp, clientStream, clientWrapper, lstByte);

                    // Request streaming once, in response to the first received message.
                    if (!isStreaming)
                    {
                        isStreaming = true;
                        return new ProcessingResult { StringToSendBack = START_STREAMING };
                    }

                    return null;
                });

#if INCLUDE_RPC
            clientRpc = new TcpHelper(id: "RPC-CLIENT", processMethod: (timestamp, lstByte, clientWrapper, dctState) =>
                {
                    CommonHandler("RPC", timestamp, clientRpc, clientWrapper, lstByte);

                    // Respond to every received message with a new remote call of "Foo".
                    return new ProcessingResult
                    {
                        StringToSendBack = Proxy.ToJson("Foo", "Apricot", 15.11, new Test { Num = 5, Str = "Mango" }),
                    };
                });
#endif

            Console.WriteLine("Press any key to quit...");
            Console.ReadKey();

            clientStream.Stop();

#if INCLUDE_RPC
            clientRpc.Stop();
#endif
        }

        static void CommonHandler(string name, DateTime timestamp, TcpHelper clientHelper,
                                  TcpClientWrapper clientWrapper, List<byte> lstByte)
        {
            const int delayLimitinMs = 1500;

            // Transport delay: local time minus the sender's timestamp.
            var delay = DateTime.Now - timestamp;
            if (delay > TimeSpan.FromMilliseconds(delayLimitinMs))
                Console.WriteLine($"******** {name}: Large Delay: {delay}, exceeds limit of {delayLimitinMs} ms");

            Console.WriteLine($"{name}: {timestamp} ** {lstByte.ToStr()}");
        }
    }
}
Each client is created as an object of type TcpHelper. Object clientStream provides the streaming scenario, and object clientRpc stands for the RPC scenario (the latter is used when INCLUDE_RPC is defined). Each client supports one connection to the server and has its own implementation of the processMethod callback. A client starts a connection by calling its method Connect(). In response to the connection initiated with the Connect() call, the server answers with a well-known acknowledgment, which is processed by processMethod of the appropriate client. clientStream sends the server a request to start streaming once, and then its processMethod continuously gets the streamed messages. In clientRpc, processMethod generates an output ProcessingResult object that will be sent asynchronously (since IsSyncSend is not set to true). It carries a JSON call for remote procedure execution; the server executes the procedure and sends the result back as the content of its own ProcessingResult object.
The TcpHelperLib component is equipped with "built-in" logging. Its output level is configurable either with a constructor argument or with the configuration file. In our sample, the log is written to the console. Log messages of the TcpHelperLib component itself, in both server and client, are displayed in the console window indented and prefixed with "TcpHelper: ". Messages of the Server and Client test applications are written to the console without indent.
Running Sample
Windows
The sample's binaries, ready to run, may be downloaded with the Download demo link at the beginning of the article. To prepare these binaries, the following command was executed from the project folders of Server and Client:
dotnet publish -o publish -c Release
(for convenience this command is placed in publish.cmd command files in these folders), and the contents of both newly created publish folders were then copied together into a common folder (let's call it AsyncTcp_bin). The JSON configuration file tcpHelperSettings.json may also be placed in this common folder. To run the Client and the Server, the following commands should be executed from the AsyncTcp_bin folder in two different console windows:
dotnet Server.dll
and
dotnet Client.dll
Linux
To run the sample in Linux, I use a Linux virtual machine installed with Oracle VirtualBox. Only the server environment is required here. VirtualBox may be downloaded from its Web site, and guidance for its installation may be found e.g. here. After installing the virtual machine, we need a tool for file exchange between Windows and Linux. WinSCP (the acronym stands for Windows Secure Copy) does the job. The application may be installed from its site. Although VirtualBox's display window can be used for entering commands, the PuTTY terminal emulator is very useful for that purpose. It may be downloaded from here. .NET Core should be installed from the dedicated Microsoft site.
The content of the AsyncTcp_bin folder prepared in Windows (or downloaded with Download demo) should be transferred to the Linux environment. This can be carried out with the WinSCP application. The PuTTY application can then be used to run the same dotnet Client.dll and dotnet Server.dll commands as in Windows.
Test Cases
As the first test case, we run one Server and one Client application. They may be started in any order, since the Client keeps trying to connect to the Server if it is not immediately available (the appropriate parameters may be configured). If INCLUDE_RPC is defined, then in the Client we can see messages for both the streaming and RPC scenarios (otherwise for streaming only). For the RPC case, the Server writes console messages of its TcpHelperLib component indicating the thread in which the processMethod callback is executed. As you will see, the thread varies from time to time. This indicates the asynchronous message receiving mechanism implemented with the async-await pattern. Both the Client and Server applications may be stopped by pressing any key in their console windows. We can stop either of them, see the appropriate exception message in the other one, and then restart the stopped application. The connection will be restored, and both Client and Server will run as initially (please note that in the Linux virtual machine a successful restart is possible only after some time, probably because the previous socket is not immediately destroyed). We can also start a Server with several Clients. Each client establishes its own connections with the server for RPC and streaming. As in the previous case, a restart of any application will cause automatic reconnection.
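The varying thread ids mentioned above can be reproduced in isolation. The following is a minimal sketch, assuming a console application without a synchronization context; ThreadHopDemo and RunAsync are hypothetical names, and the ids actually recorded depend on the thread pool, so no particular output should be expected.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo for illustration; not part of the sample solution.
public static class ThreadHopDemo
{
    // Records the managed thread id after each await. Without a
    // synchronization context the continuation runs on a thread-pool
    // thread, so the recorded ids may differ between iterations.
    public static async Task<List<int>> RunAsync(int iterations)
    {
        var ids = new List<int>();
        for (int i = 0; i < iterations; i++)
        {
            await Task.Delay(10);
            ids.Add(Thread.CurrentThread.ManagedThreadId);
        }
        return ids;
    }
}
```

Printing the returned list a few times, e.g. with string.Join(", ", ids), usually shows more than one id, which is the same effect as the ThreadId values logged by the Server.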
It is interesting to test the case when the message to be parsed arrives in more than one received chunk. For this test we can reduce the length of the receive buffer to, say, 29 bytes. This can be achieved with the configuration file or with a TcpHelper constructor argument. But to simplify testing, just uncomment the two lines marked with comment //1. Now if we insert a breakpoint in the line marked with comment //1*, it will be hit. Even with this tiny receive buffer we should get the proper result, although we will see more calls of the processMethod callback on the server side.
Conclusions
The article presents a component for TCP communication developed in .NET Core for use on different operating systems. It is based on the async-await pattern to avoid thread blocking. The component provides an infrastructure for synchronous and asynchronous message exchange as well as for remote procedure calls. Test console applications for server and client are attached.