Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / Languages / C#

TCP Socket Off-the-shelf - Revisited with Async-Await and .NET Core

5.00/5 (21 votes)
8 May 2018CPOL12 min read 41K   3.4K  
Handy component for asynchronous TCP connection. With the same code it is available in .NET Core and Desktop versions.

Introduction

This article and code provide .NET Core implementation of asynchronous TCP communication. My previous article on the topic entitled TCP Socket Off-the-shelf presents a component for synchronous TCP communication. Communication component in this work, being simpler, possesses additional important useful features. Particularly, it

  • uses .NET Core compatible to all major operating systems,
  • applies async-await pattern for asynchronous behavior,
  • provides handy infrastructure for Remote Procedure Call (below referred to as RPC) and continuous data streaming,
  • automatically handles serialized object scattered over several received chunks,
  • produces timestamp of transferred data to determine transport delay.

The component does not require dedicated thread per connection and takes asynchronous approach. Its implementation is considerably simplified by using async-await pattern.

To use .NET Core version of code sample .NET Core 2.0 should be installed. This can be done from this Microsoft site. Microsoft Visual Studio 2017 was used for development.

Note. Since many readers use sockets in desktop application, I decided to add a desktop version. It has essentially the same code. To use the desktop version installation of .NET Core is not required. Parts of the article expllaining installation etc. are addressed to .NET Core version.

Code Description

TcpHelperLib project (DLL) implements TCP communication. It is common for server and client. The main type of the component is TcpHelper. Its constructors allow the user to configure an instance either explicitly providing arguments (with possibility to omit some of them and use their default definitions) or implicitly by reading a JSON configuration file. The constructors take as arguments instance's id and processMethod user supplied handler to process received data. It is called internally by TcpHelper type and takes the following arguments:

Type Description
DateTime Time when data were sent (timestamp).
List<byte> Data received.
TcpClientWrapper Proxy object representing communication counterpart. It is used to send data back.
StateProperties Object that preserves state of the connection.

Type TcpHelper provides the most important public methods, that is async Listen() for server and async Connect() for client.

The server object of TcpHelper starts to listen on given IP address for incoming clients' calls by activating its method Listen().

public async void Listen(int port, string host = null)
{
	var listener = new TcpListener(GetHost(ref host), port);
	listener.Start();

	// Continue listening.  
	Log($"Server \"{Id}\" is listening on {host}:{port} ...");
	TcpClient client = null;
	while ((client = await listener.AcceptTcpClientAsync()) != null && client.Connected)
	{
		var str = $"Client {client.Client.RemoteEndPoint} {ackConnection} \"{Id}\" {host}:{port}";
		Log(str);
		isActive = true;
		var clientWrapper = new TcpClientWrapper(Delim, DelimRepeated) { Peer = client };
		await clientWrapper.SendAsync();
		await clientWrapper.SendAsync(str);
		Receive(client);
	}
}

Client also creates an object of TcpHelper type and calls its method Connect() sending a request for connection to server.

public async Task<TcpClientWrapper> Connect(int port, string host = null)
{
	var server = await GetConnectedServer(host, port);

	if (server.Connected)
	{
		Log($"Client \"{Id}\" connected to Server {host}:{port}.");
		Receive(server);
	}
	else
		LogError("Connection failed after reties.");

	return new TcpClientWrapper(Delim, DelimRepeated) { Peer = server };
}

private async Task<TcpClient> GetConnectedServer(string host, int port)
{
	var server = new TcpClient();
	for (int i = 0; i < maxConnectionAttempts && !server.Connected && !cts.IsCancellationRequested; i++)
	{
		try
		{
			await server.Client.ConnectAsync(GetHost(ref host), port);
		}
		catch
		{
			await Task.Delay(intervalBetweenConnectionAttemptsInMs, cts.Token);
		}
	}

	return server;
}

As soon as an incoming call from client arrives, TcpHelper object in the server side creates a connection, assigning it to a new socket with some port differed from one on which the server is listening. After connection has been established client and server exchange with messages. Method Receive() that is common for server and client, provides message processing.

private async void Receive(TcpClient client)
{
	if (client == null || !client.Connected)
		return;

	var lstData = new List<byte>();
	var stateProprties = new StateProperties();

	using (var netStream = client.GetStream())
	{
		var clientWrapper = new TcpClientWrapper(Delim, DelimRepeated) { Peer = client };
		var buffer = new byte[receiveBufferSize];
		int readBytes = 0;
		while ((readBytes = await ReadNetStreamAsync(netStream, buffer, cts.Token)) > 0)
		{
			lstData.AddRange(GetReceivedBuffer(buffer, readBytes));

			try
			{
				ProcessReceived(lstData, clientWrapper, stateProprties);
				SetLastInteractionTime();
			}
			catch (Exception e)
			{
				LogError("ProcessReceived() failed.", e);
			}
		}
	}
}

It is called per connection, and in its while loop reads received data. Due to async-await pattern the loop does not block a thread, thus implementing asynchronous socket.

The roles of client and server differ only in the beginning of communication. Client initiates the communication, and server accepts it. As soon as communication has been established, the peers exchanges with messages. So we can talk about message provider and message recipient, which can be client and server alike. Usually these messages contain serialized objects. In order to communicate via sequential channel, provider serializes objects to byte arrays and sends them to recipient as continuous stream of bytes. Recipient should reconstruct (deserialize) meaningful objects from these bytes. For that purpose, recipient has to know the boundaries of the serialized objects in the incoming stream of bytes. This can be achieved either using objects that can be serialized to byte arrays of fixed size, or by delimiting byte stream. In this work the latter approach is implemented. Delimiter constitutes some byte repeated several times in row in the byte stream. The byte and number of its repetitions are configurable (by default byte is a char object "pipe" '|' 0x7C and repetition is 3 times). Repetition is required to distinguish between delimitation and occasional appearance of delimiter byte in the transferred meaningful data. A related important issue with transfer of serialized data is joining the parts of the same object spread to in different received chunks.

Both delimitation and object joining problems are solved internally in TcpHelperLib implementing the following simple protocol. Provider automatically added repeated delimiter byte to sent data, and recipient parses received data accordingly. Stream of bytes is depicted in the following figure:

Image 1

Structure of received data

Method Receive() inside its while loop calls method ProcessReceived().

private async void ProcessReceived(List<byte> lstData, TcpClientWrapper clientWrapper, StateProperties stateProperties)
{
	Log($"{clientWrapper.RemoteEndPoint}  ThreadId = {Thread.CurrentThread.ManagedThreadId}");

	// Split to parts by delimiter and update lstData
	var lstParts = SplitToDelimitedParts(lstData);

	// Process parts 
	foreach (var lstByte in lstParts)
	{
		ProcessingResult result = null;
		if (lstByte != null)
		{
			var timestamp = GetTimestamp(lstByte);
			if (timestamp > DateTime.MinValue)
			{
				CheckIfRpc(lstByte, stateProprties);
				try
				{
					result = processMethod?.Invoke(timestamp, lstByte, clientWrapper, stateProprties);
				}
				catch (Exception e)
				{
					LogError("ProcessReceived(): user supplied processMethod() callback failed.", e);
				}
			}
			else
				LogError("ProcessReceived(): timestamp == DateTime.MinValue");
		}

		if (result != null)
		{
			byte[] bts = result.BytesToSendBack;
			if (bts != null && bts.Length > 0)
			{
				var task = clientWrapper.SendAsync(bts);

				if (result.IsSyncSend)
					await task;
			}
		}
	}
}

Method ProcessReceived() gets as its arguments a list of bytes List<byte> lstData containing received bytes started right after delimiter, object TcpClientWrapper clientWrapper representing communication peer and used to send a response back, and StateProperties stateProperties preserving state of the communication. Object stateProperties holds properties that may be inserted either in Receive() method (like handler for RPC in appropriate scenario described below) or in user supplied processMethod callback. Method ProcessReceived() calls method SplitToDelimitedParts() to obtained list of delimited lists of bytes - object of type List<List<byte>>. The latter method also modifies its argument List<byte> lstData to left there only "tail" of received bytes (if not empty) that will be added later as a "head" to an upcoming received chunk. Then for each of delimited lists of bytes deserialization to objects is performed. Method Task SendAsync(byte[] dataToSend) of type TcpClientWrapper used to send bytes, automatically puts timestamp (as long representation of DateTime object in ticks) at the beginning and delimiter at the and of sending data. Recipient reads this timestamp calling GetTimestamp() method inside method ProcessReceived().

Before proceeding with method ProcessReceived() let's briefly describe the main scenarios of message exchange between TCP communication peers. The most common scenarios are the following:

  • synchronous dialog when a peer sends message only in response to received one, that is sequential messages exchange,
  • asynchronous dialog when the peers send and receive messages non-sequentially. Particular case of asynchronous dialog is streaming when one side requests the other one for continuous stream of messages. This situation may arise e. g. when one side subscribes for events issued by the other.
  • RPC when partners call some methods of each other remotely.

ProcessReceived() provides support for the above scenarios. It calls method CheckIfRpc(lstByte, stateProperties) to determine whether RPC scenario takes place. In this case received bytes are deserialized to JSON string providing remote procedure name and arguments. This JSON string is converted to an instance of RemoteProcInfo type and placed to stateProperties.

User supplied processMethod handler is called by method ProcessReceived() in all the above scenarios. processMethod is free from delimiting and joining problems. It gets list of bytes argument lst corresponding to one array of useful data ready for deserialization, and in case of RPC, in addition, already prepared RemoteProcInfo object that can be retrieved from stateProperties argument.

For the RPC case processMethod extracts RemoteProcInfo object from stateProperties and performs actual procedure call. In other scenarios processMethod uses List<byte> lst argument. As its result, processMethod generates object of type ProcessingResult. The content of this output object (properties BytesToSendBack or StringToSendBack) is automatically sent back to connection peer either synchronously or asynchronously depending on value of IsSyncSend boolean property of ProcessingResult type. If return value of processMethod is null then nothing is sent back to communication peer after call of processMethod. This is used e.g. in streaming on the data recipient side when acknowledgment of receiving data is not required.

Argument TcpClientWrapper clientWrapper of processMethod acts as a proxy of communication peer and may be explicitly used for asynchronous sending data to it. To illustrate this in our sample we use it in timer's handler to implement the streaming scenario, as it will be discussed below.

Code Sample

The sample shows usage of TcpHelperLib component. It is developed with .NET Core and therefore may run in all major operating systems (it was tested under Windows 10 and Linux as it will be shown below). The sample solution consists of the four projects, namely, TcpHelperLib itself placed in folder Lib, Server and Client console applications, and auxiliary TestObjectLib.

Server

Complete code of Server is presented below:

using System;
using System.Threading;
using Newtonsoft.Json.Linq;
using TcpHelperLib;
using TestObjectLib;

namespace AsyncSocketServer
{
    class Server
    {
        const int port = 11511;

        const string JSON_CONFIG_FILE = "tcpHelperSettings.json";

        const string TIMER_NAME = "Timer";
        const string START_STREAMING = "Start Streaming";

        static MethodCaller caller = new MethodCaller();

        static void Main(string[] args)
        {
            Console.WriteLine("Async. Socket SERVER");

            caller["Foo"] =
                rpi =>
                {
                    var p = rpi.Params;
                    var jo = p[2] as JObject;
                    var test = new Test { Str = jo.V<string>("Str"), Num = jo.V<int>("Num") };
                    return new ProcessingResult
                        {
                            StringToSendBack = $"{Implementation.Foo((string)p[0], (double)p[1], test)}",
                            //IsSyncSend = true
                        };
                };
                
            var server = new TcpHelper(id: "SVR", processMethod: (dt, lst, clientWrapper, stateProprties) => 
                {
                    var rpi = stateProprties.GetRpi("Foo");
                    if (rpi != null)
                        // RPC
                        return caller.ExecuteMethod(rpi);

                    // STREAMING
                    if (lst.ToStr() == START_STREAMING)
                        // Streaming timer
                        stateProprties[TIMER_NAME] = new Timer(async _ =>
                            {
                                try
                                {
                                    await clientWrapper.SendAsync($"{DateTime.Now}");

                                    Console.WriteLine($"Streaming remote endpoint: {clientWrapper.RemoteEndPoint}");
                                }
                                catch (Exception e)
                                {
                                    (stateProprties[TIMER_NAME] as Timer)?.Dispose();
                                    stateProprties[TIMER_NAME] = null;

                                    Console.WriteLine(e);
                                }
                            },
                            null, 1000, 500);

                    return null;
                }, 
                configFilePath: JSON_CONFIG_FILE);

            server.Listen(port);

            Console.WriteLine("Press any key to quit...");
            Console.ReadKey();

            server.Stop();
        }
    }

    internal static class Implementation
    {
        public static string Foo(string s, double d, Test test)
        {
            return $"\"Foo()\": Echo from Server: {s} {d}, {test.Str} {test.Num}";
        }
    }
}

Object TcpHelper server is created with its processMethod callback supporting RPC and streaming scenarios. In case of RPC the callback gets RemoteProcInfo object from the state containing stateProperties and supplies it as an argument to ExecuteMethod() of created before MethodCaller caller object. caller contains activator for remotely called method Implementation.Foo(). If stateProperties does not contain entry for "Foo" then the streaming scenario is assumed. In this case, as soon as the server has got "Start Streaming" message it starts a timer to periodically send asyncronous message (the server's time in this case) to client. In this sample each connection sets up its own timer which is kept in stateProperties as a part of the connection state. For server, all connections have the same implementation of processMethod but its instances are different. Server starts to listen on specified URL when its method Listen() is called.

Client

This is complete code of Client:

#define INCLUDE_RPC

using System;
using System.Collections.Generic;
using System.Threading;
using TcpHelperLib;
using TestObjectLib;

namespace AsyncSocketClient
{
    class Client
    {
        static Timer timer;
        static SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);
        static bool isStreaming = false;

        const int port = 11511;
        const string START_STREAMING = "Start Streaming";

        static TcpHelper clientStream = null;
        static TcpClientWrapper tcwStream = null;

#if INCLUDE_RPC
        static TcpHelper clientRpc = null;
        static TcpClientWrapper tcwRpc = null;
#endif

        static void Main(string[] args)
        {
            Console.WriteLine("Async. Socket CLIENT");

            // Connection timer
            timer = new Timer(async _ =>
                {
                    int maxIdleTimeInSec = 10;

                    await semaphore.WaitAsync();

                    if (clientStream != null && clientStream.TimeSpanSinceLastInteraction > TimeSpan.FromSeconds(maxIdleTimeInSec) &&
                        (tcwStream == null || !tcwStream.IsConnected))
                    {
                        Console.WriteLine("Connection Timer: STREAMING");

                        isStreaming = false;
                        tcwStream = await clientStream.Connect(port);
                    }
#if INCLUDE_RPC
                    if (clientRpc != null && clientRpc.TimeSpanSinceLastInteraction > TimeSpan.FromSeconds(maxIdleTimeInSec) &&
                        (tcwRpc == null || !tcwRpc.IsConnected))
                    {
                        Console.WriteLine("Connection Timer: RPC");
                        tcwRpc = await clientRpc.Connect(port);
                    }
#endif
                    semaphore.Release();
                },
                null, 0, 1000);

            // STREAMING
            clientStream = new TcpHelper(id: "STREAMING-CLIENT", processMethod: (timestamp, lstByte, clientWrapper, dctState) =>
                {
                    CommonHandler("STREAMING", timestamp, clientStream, clientWrapper, lstByte);
                    if (!isStreaming)
                    {
                        isStreaming = true;
                        return new ProcessingResult { StringToSendBack = START_STREAMING /*, IsSyncSend = true*/ };
                    }

                    return null;
                });

#if INCLUDE_RPC
            // RPC
            clientRpc = new TcpHelper(id: "RPC-CLIENT", processMethod: (timestamp, lstByte, clientWrapper, dctState) =>
                {
                    CommonHandler("RPC", timestamp, clientRpc, clientWrapper, lstByte);
                    return new ProcessingResult
                        {
                            StringToSendBack = Proxy.ToJson("Foo", "Apricot", 15.11, new Test { Num = 5, Str = "Mango" }),
                            //IsSyncSend = true
                        };
                });
#endif
            Console.WriteLine("Press any key to quit...");
            Console.ReadKey();

            clientStream.Stop();
#if INCLUDE_RPC
            clientRpc.Stop();
#endif
        }

        static void CommonHandler(string name, DateTime timestamp, TcpHelper clientHelper, 
                                  TcpClientWrapper clientWrapper, List<byte> lstByte)
        {
            const int delayLimitinMs = 1500;
            var delay = DateTime.Now - timestamp;
            if (delay > TimeSpan.FromMilliseconds(delayLimitinMs))
                Console.WriteLine($"******** {name}:  Large Delay: {delay}, exceeds limit of {delayLimitinMs} ms");

            Console.WriteLine($"{name}: {timestamp} ** {lstByte.ToStr()}");
        }
    }
}

Client is created as object of type TcpHelper. Object clientStream provides streaming scenario, and object clientRpc stands for RPC scenario (the latter is used when INCLUDE_RPC is defined). Each client supports one connection to server and has its own implementation of processMethod callback. Client starts connection calling its method Connect(). In response to connection initiation with Connect() call server answered with a well-known acknowledgment processed with processMethod of appropriate client. clientStream once sends to server request to start streaming and then its processMethod continuously gets appropriate message. In clientRpc its processMethod generates output ProcessingResult object that will be send asynchronously (since IsSyncSend is not set to true). It sends JSON call for remote procedure execution and gets result as an object of ProcessingResult type.

TcpHelperLib component is equipped with "built-in" logging. Its output level is configurable either with constructor argument or with configuration file. In our sample, logging is output to console. Log messages of TcpHelperLib component itself in both server and client are displayed in console window with indent and starting with "TcpHelper: " prefix. Messages of Server and Client test application are output to console without indent.

Running Sample

Windows

The sample's binaries ready for running may be downloaded with Download demo link in the beginning of the article. To prepare these binaries the following command was executed from project folders of Server and Client:

dotnet publish -o publish -c Release

(for convenience this command is placed to publish.cmd command files in these folders) and copy contents of both newly appeared publish folders together in common folder (let's call it AsyncTcp_bin). JSON configuration file tcpHelperSettings.json may be also placed in this common folder. To run the Client and the Server the following command should be executed from AsyncTcp_bin folder in to different console windows:

dotnet Server.dll

and

dotnet Client.dll

Linux

To run the sample in Linux I use Linux virtual machine installed with Oracle VirtualBox. Only the server environment is required here. VirtualBox may be downloaded from its Web site and guidance for its installation may be found e.g. here. After the installation of the virtual machine we need to install a tool for files exchange between Windows and Linux. WinSCP does the job (the acronym stands for Windows Secure Copy). The application may be installed from its site. Although VirtualBox's display window can be used for inserting commands, PuTTY terminal emulator is very useful for that purpose. It may be downloaded from here. Installations of .NET Core should be carried from dedicated Microsoft site.

Content of prepared in Windows AsyncTcp_bin (or downloaded from Download demo) folder should be transferred to Linux environment. This can be carried out with WinSCP application. PuTTY application can be used to run same as in Windows dotnet Client.dll and dotnet Server.dll commands.

Test Cases

As the first test case we run one Server and one Client application. They may be run in any order since Client keeps trying to connect Server if it is not immediately available (appropriate parameters may be configured). If INCLUDE_RPC is defined then in Client we can see messages for both streaming and RPC scenarios (otherwise for streaming only). For the RPC case Server writes console messages of its TcpHelperLib component indicating thread in which processMethod callback is executed. As you will see, the threads are varying from time to time. This indicates asynchronous message receiving mechanism implemented with async-await pattern. Both Client and Server application may be stopped with any key clicked in their console windows. We can stop any of them, see appropriate exception message in the other one, and then restart the stopped application. Connection will be restored and both Client and Server will run as initially (please note that in Linux virtual machine successful restart is possible only after some time - probably previous socket is not immediately destroyed). We can start a Server with several Clients. Each client establishes its own connections with server for RPC and streaming. Similar to previous case, a restart of any application will cause automatic reconnection.

It is interesting to test the case when the message to be parsed appears in more than one received chunk. For this test we can reduce length of receiving buffer to some 29 bytes. This can be achieve with configuration and TcpHelper constructor's argument. But to simplify testing, just uncomment two lines marked with comment //1. Now if we insert a breakpoint in the line marked with comment //1* then it will be hit. Even with this tiny receive buffer we should get proper result, although we will see more calls of processMethod callback on server side.

Conclusions

Component for TCP connection developed in .NET Core for usage in different operating systems is presented in the article. It is based on async-await pattern to avoid thread blocking. The component provides infrastructure for synchronous and asynchronous message exchange as well as for remote procedure call. Test console applications for server and client are attached.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)