How to Implement and Use Awaitable Sockets in C# using TAP

23 Jul 2020, MIT license, 12 min read
Explore adapting Socket's async model to a task based one and adding some awaitable socket operations to your projects
Create an awaitable socket by adapting the EAP-like programming model used by Socket to the newer awaitable Task-based Asynchronous Pattern (TAP) programming model. The demo then uses this model to serve a simple HTTP request entirely asynchronously.

Introduction

This article covers providing a TAP-based awaitable Socket so that we can do asynchronous socket operations using the built-in async/await facilities in C#. Furthermore, it demonstrates using the awaitable features to serve HTTP content. The HTTP server is very minimalistic, and only serves a "Hello World!" page, but it can easily be built up to do more, if for some reason you want to tinker.

For most situations, I recommend using ASP.NET or even http.sys to serve HTTP content. ASP.NET is simply more secure, more scalable, and thoroughly tested and documented. This is hardly any of those things, as it's a demo project. Even if you really wanted to roll your own, you might consider using the HttpListener class instead. This does HTTP just to give the socket something to send and receive. It's not really an article on how to build a webserver.

The goal here is to teach some techniques using the TAP model. This article expects that you've used async/await, Task and Socket before.

Conceptualizing this Mess

Implementing the TAP pattern allows us to expose awaitable methods in a consistent way. Letting it guide you ensures that your code will be consumable by a wide audience of .NET developers because of the familiarity of your API in relation to the ones that Microsoft provides, for example. This way of exposing awaitable methods allows for easier learning curves and more robust code. We'll be doing that here.

Furthermore, being able to adapt other asynchronous models like EAP or a similar event based model (such as the one used by Socket) to TAP will make your code more flexible and more modern. We'll be covering that here, with this article as reference and for background. Some of the source is from Stephen Toub's code here, but I'll explain enough of it for us mortals to understand.

From APM to TAP

What we're going to have to do is wrap the Socket's asynchronous methods with our own that expose those members using the TAP model and awaitable methods. It should be noted that the Socket exposes multiple asynchronous patterns - both the APM model with BeginXXXX()/EndXXXX() calls as well as its own event based model similar to EAP. We're concerned about the latter event based model because it's more efficient under the hood, but we can't always use it in every situation so we'll wrap the former as well.

The main thing we do when we wrap an APM method with a TAP method is create something called a task completion source, which we then configure based on the results of the asynchronous operation. For an old style APM model with BeginXXXX()/EndXXXX() methods, the wrapping is straightforward - from Stephen Toub's example code:

C#
public static Task<int> ReceiveAsync(
    this Socket socket, byte[] buffer, int offset, int size,
    SocketFlags socketFlags)
{
    var tcs = new TaskCompletionSource<int>();
    socket.BeginReceive(buffer, offset, size, socketFlags, iar =>
    { 
        try { tcs.TrySetResult(socket.EndReceive(iar)); }
        catch (Exception exc) { tcs.TrySetException(exc); }
    }, null);
    return tcs.Task;
}

What we're doing here is creating a new task completion source, beginning the asynchronous operation, and hooking its callback so that it calls EndReceive() and configures the task completion source with either the result or the exception. Finally, we return the task completion source's associated task. Get used to this, as we'll be doing something very similar repeatedly. One thing we could have done to make this more efficient is pass the tcs variable as the async state for the operation, rather than relying on the compiler to hoist tcs into a closure so that the anonymous method can reach it:

C#
public static Task<int> ReceiveAsync(
    this Socket socket, byte[] buffer, int offset, int size,
    SocketFlags socketFlags)
{
    var tcs = new TaskCompletionSource<int>(socket);
    socket.BeginReceive(buffer, offset, size, socketFlags, iar =>
    {
        var t = (TaskCompletionSource<int>)iar.AsyncState;
        var s = (Socket)t.Task.AsyncState;
        try { t.TrySetResult(s.EndReceive(iar)); }
        catch (Exception exc) { t.TrySetException(exc); }
    }, tcs);
    return tcs.Task;
}

The only difference here is we're not relying on the compiler to hoist our tcs value and make it available to the anonymous method, so we don't have to deal with that extra inefficiency. Instead, we pass it as part of the APM's AsyncState. According to Stephen Toub, this avoids an extra allocation. Otherwise, the behavior is identical.
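As a point of comparison, the framework also ships a helper for this kind of APM wrapping. The following is a minimal sketch, not the code the demo uses: it leans on the Task<int>.Factory.FromAsync() overload that takes an already started IAsyncResult, because the overloads that take the begin/end delegates only accept up to three arguments and BeginReceive() needs four. The class name and the ReceiveViaFromAsync() method name are made up for illustration. This overload is generally considered less efficient than a hand-written TaskCompletionSource wrapper, so treat it as a convenience rather than a replacement.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;

public static class FromAsyncSocketSketch
{
    // Hypothetical name, chosen so it doesn't clash with the ReceiveAsync() wrappers above
    public static Task<int> ReceiveViaFromAsync(
        this Socket socket, byte[] buffer, int offset, int size,
        SocketFlags socketFlags)
    {
        // start the APM operation ourselves, then let FromAsync() call
        // EndReceive() and complete the task once the operation finishes
        return Task<int>.Factory.FromAsync(
            socket.BeginReceive(buffer, offset, size, socketFlags, null, null),
            socket.EndReceive);
    }
}
```

Either way, the caller just sees a Task<int> and can await it like any other TAP method.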

From the Socket's Event Model to TAP

We've covered the basics of wrapping APM above, but we also want to wrap the Socket's event based model, since it is more efficient. This is quite a bit more involved, because first we have to create a special awaitable class that can be used with the await keyword. Basically, the class needs a GetAwaiter() method that returns an awaiter, and the awaiter needs to implement System.Runtime.CompilerServices.INotifyCompletion and provide an IsCompleted property and a GetResult() method. Without this fluff, you couldn't use await on it. Normally, you'd have a separate awaiter class or struct, but the GetAwaiter() method in Stephen Toub's implementation returns its own instance, so one class plays both roles. The bottom line is, if you ever implement an awaitable class yourself, it will need this method, and it should return an awaiter. The other one is GetResult(), which checks the result and then returns the value for the await. In our case, there is no value to return, so it's declared void. Apart from INotifyCompletion, the members here aren't part of any interface. The compiler simply expects them.** We're going to cover the awaitable class more in the next section. There's some documentation about this here and here.

The whole reason for all of this is because Socket's event based asynchronous methods allow you to reuse its event arguments instead of having to do more allocations of the arguments each time you do a send or receive. That may not seem like a big deal but socket operations can happen with incredible frequency and we want to keep our garbage collector from stopping up our sending or receiving periodically to clean up all of that extra stuff. This is why Socket exposes its EAP-like event model beside the APM model, and why it's more efficient. We just needed a way to wrap this with the TAP awaitable task model, and that's what the SocketAwaitable class is for. We can reuse the SocketAwaitable class in successive reads and writes to reuse our socket's event arguments underneath the covers, and thus use the event based asynchronous model Socket uses the way it was designed to be used.

** The compiler generates code when it finds an await keyword and that code uses the specified methods. There is no binary interface for them that it uses, so this is compiler specific, not framework/runtime specific.
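To make the pattern concrete, here is a minimal sketch of a class that is its own awaiter, in the same style as the SocketAwaitable we're about to build. ValueAwaitable and AwaitableDemo are hypothetical names invented for this illustration and aren't part of the project. It always reports itself as completed, so the compiler-generated code goes straight to GetResult().

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

// Hypothetical type for illustration: an awaitable that is also its own awaiter
public sealed class ValueAwaitable : INotifyCompletion
{
    readonly int _value;
    public ValueAwaitable(int value) { _value = value; }

    // the compiler looks for this method on whatever expression you await
    public ValueAwaitable GetAwaiter() { return this; }

    // true means "already finished", so the generated code skips OnCompleted()
    public bool IsCompleted { get { return true; } }

    // only used when IsCompleted is false; registers what to run afterward
    public void OnCompleted(Action continuation) { continuation(); }

    // supplies the value of the await expression (or throws on failure)
    public int GetResult() { return _value; }
}

public static class AwaitableDemo
{
    public static async Task<int> AddOneAsync(int value)
    {
        // await works here purely because of the members above
        var result = await new ValueAwaitable(value);
        return result + 1;
    }
}
```

Awaiting AddOneAsync(41) yields 42. Remove GetAwaiter(), IsCompleted or GetResult() and the await inside AddOneAsync() no longer compiles, which is the "compiler simply expects them" part in action.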

Using the Asynchronous Socket Operations

We'll be using both methods of communicating with sockets in our demo app, which exposes a rudimentary HTTP server that emits some static content. In the demo, we expose more extension methods for doing asynchronous HTTP based operations and we use them to create our webserver. The HTTP operations themselves use our wrappers above to efficiently and asynchronously process requests. Finally, the main program simply whips up a listening socket and begins the request/response cycle, serving content until a key is pressed, at which point it exits. The HTTP based Socket extension methods are more extensive than what the demo needs to operate, but I decided to provide them in case you want to hammer a tiny homebrew webserver out of it. The meat of this is in HttpSocketUtility, which relies on HttpRequest to return the request information. There is no corresponding HttpResponse object simply because I didn't need to implement it yet. It wasn't necessary for the demo, and I've already added much more than the demo needs to function.

Our HTTP request processing is entirely asynchronous, like the rest of the server. It uses our awaitable socket extensions and the SocketAwaitable to perform its work, thereby doing it in an efficient way, in terms of asynchronicity. Doing everything asynchronously means we aren't tying up system threads just to wait on operations. On Windows, the socket I/O goes through I/O completion ports (other platforms vary), so the kernel signals us when an operation finishes and our continuation runs on a thread pool thread at that point, instead of on a thread that has been sitting blocked. That's serious code for serious servers, and we can efficiently fulfill many requests with only a handful of threads this way!

It's hard to really go into the concepts further without exploring the code, so let's do that now.

Coding this Mess

The SocketAwaitable Class

You can await instances of this class, so any function that returns one is awaitable. We use this to wrap our asynchronous programming into the TAP model. This class has three primary responsibilities: holding a SocketAsyncEventArgs instance for reuse, tracking our continuation, and providing the requisite members necessary to make await work. That's all it does. Let's take a look:

C#
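using System;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

/// <summary>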
/// An awaiter for asynchronous socket operations
/// </summary>
// adapted from Stephen Toub's code at
// https://blogs.msdn.microsoft.com/pfxteam/2011/12/15/awaiting-socket-operations/
public sealed class SocketAwaitable : INotifyCompletion
{
    // placeholder for when we don't have an actual continuation. does nothing
    readonly static Action _sentinel = () => { };
    // the continuation to use
    Action _continuation;

    /// <summary>
    /// Creates a new instance of the class for the specified <paramref name="eventArgs"/>
    /// </summary>
    /// <param name="eventArgs">The socket event args to use</param>
    public SocketAwaitable(SocketAsyncEventArgs eventArgs)
    {
        if (null == eventArgs) throw new ArgumentNullException("eventArgs");
        EventArgs = eventArgs;
        eventArgs.Completed += delegate
        {
            var prev = _continuation ?? Interlocked.CompareExchange(
                ref _continuation, _sentinel, null);
            if (prev != null) prev();
        };
    }
    /// <summary>
    /// Indicates the event args used by the awaiter
    /// </summary>
    public SocketAsyncEventArgs EventArgs { get; internal set; }
    /// <summary>
    /// Indicates whether or not the operation is completed
    /// </summary>
    public bool IsCompleted { get; internal set; }

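    internal void Reset()
    {
        // clear the completed flag too, or a reused instance would
        // report a stale synchronous completion on the next operation
        IsCompleted = false;
        _continuation = null;
    }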
    /// <summary>
    /// This method supports the async/await framework
    /// </summary>
    /// <returns>Itself</returns>
    public SocketAwaitable GetAwaiter() { return this; }
        
    // for INotifyCompletion
    void INotifyCompletion.OnCompleted(Action continuation)
    {
        if (_continuation == _sentinel ||
            Interlocked.CompareExchange(
                ref _continuation, continuation, null) == _sentinel)
        {
            Task.Run(continuation);
        }
    }
    /// <summary>
    /// Checks the result of the socket operation, throwing if unsuccessful
    /// </summary>
    /// <remarks>This is used by the async/await framework</remarks>
    public void GetResult()
    {
        if (EventArgs.SocketError != SocketError.Success)
            throw new SocketException((int)EventArgs.SocketError);
    }
}

There's some shifty code in the constructor that I should explain. Since the Socket exposes its asynchronous operations using events to signal completion, we must hook the Completed event. Inside that handler is some scary looking code. Interlocked.CompareExchange() isn't pretty to call in the first place. Basically, if _continuation is already set, we take it as is. If it's null, we use a thread safe operation to swap in _sentinel, which is just an empty method, and that operation hands back whatever value was there before the swap. Either way, prev ends up holding the continuation that was registered, if any, and we invoke it if it's not null. At the end of the day, this both marks the operation as finished (by leaving _sentinel behind when nobody is waiting yet) and invokes the continuation when one has been registered. This is to make sure a continuation fires whenever an asynchronous socket operation is Completed, no matter which side gets there first.

The other non-trivial method is INotifyCompletion.OnCompleted(), which gets called when you await this class and the operation hasn't reported itself as completed yet, so that the continuation can be registered. If _continuation already holds _sentinel, the operation finished before we got here, so we Run() the continuation provided. Otherwise, we atomically compare _continuation with null and, if it is null, store the new continuation and exit without running anything, leaving it for the Completed handler to invoke. If the handler slips in between those two checks, CompareExchange() returns _sentinel and we Run() the continuation in that case as well. This is a very compact way to handle some convoluted logic without taking a lock, but I'm not sure I would have written it this way because it's difficult to read. If nothing else, maybe I should write the less efficient but clearer counterpart and put it in the comments. I simply haven't done so since I'm already explaining it here. As it is, the above strangeness is Stephen Toub's code, and I didn't want to potentially introduce bugs by changing it, even if a change might make it clearer.
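For readers who find the lock-free version hard to follow, here is a rough sketch of the same handshake written with a plain lock. This is not Stephen Toub's code and it isn't used in the project. ContinuationSlot and its member names are hypothetical, and a lock is likely a little slower than the Interlocked version, but the two cases are easier to see.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical illustration only: the completion/continuation handshake using a lock
public sealed class ContinuationSlot
{
    readonly object _gate = new object();
    Action _continuation;
    bool _completed;

    // plays the role of the Completed event handler
    public void SignalCompleted()
    {
        Action toRun;
        lock (_gate)
        {
            // remember that we finished, in case nobody is waiting yet
            _completed = true;
            toRun = _continuation;
            _continuation = null;
        }
        // a continuation was already registered, so run it now
        if (toRun != null) toRun();
    }

    // plays the role of INotifyCompletion.OnCompleted()
    public void OnCompleted(Action continuation)
    {
        bool alreadyDone;
        lock (_gate)
        {
            alreadyDone = _completed;
            // not finished yet: park the continuation for SignalCompleted()
            if (!alreadyDone) _continuation = continuation;
        }
        // finished before we got here: schedule the continuation ourselves
        if (alreadyDone) Task.Run(continuation);
    }

    // call before starting the next operation on a reused instance
    public void Reset()
    {
        lock (_gate)
        {
            _completed = false;
            _continuation = null;
        }
    }
}
```

The _completed flag here does the job that _sentinel does in the original: it records that the operation finished before anyone registered a continuation.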

Now we need to use this class.

The ReceiveAsync() and SendAsync() Wrapper Methods

We'll only be covering two of them here, since only two of them use SocketAwaitable. The others use the APM paradigm which I outlined prior. These ones use the event model to do asynchronous things:

C#
/// <summary>
/// Receive data using the specified awaitable class
/// </summary>
/// <param name="socket">The socket</param>
/// <param name="awaitable">An instance of <see cref="SocketAwaitable"/></param>
/// <returns><paramref name="awaitable"/></returns>
public static SocketAwaitable ReceiveAsync(this Socket socket,
    SocketAwaitable awaitable)
{
    awaitable.Reset();
    if (!socket.ReceiveAsync(awaitable.EventArgs))
        awaitable.IsCompleted = true;
    return awaitable;
}
/// <summary>
/// Sends data using the specified awaitable class
/// </summary>
/// <param name="socket">The socket</param>
/// <param name="awaitable">An instance of <see cref="SocketAwaitable"/></param>
/// <returns><paramref name="awaitable"/></returns>
public static SocketAwaitable SendAsync(this Socket socket,
    SocketAwaitable awaitable)
{
    awaitable.Reset();
    if (!socket.SendAsync(awaitable.EventArgs))
        awaitable.IsCompleted = true;
    return awaitable;
}

That's nice, the documentation is longer than the code. Here, we clear the continuation from above with Reset() which sets it to null. We have to do this before we begin an asynchronous socket operation. Then we are basically translating the call, by setting the IsCompleted flag if necessary and then returning our SocketAwaitable to be awaited on. The reason we accept one as well is so we can recycle them for multiple calls, which allows us to make these calls without allocating objects on the managed heap.

Using them goes something like this:

C#
var recv = new byte[1024];
var args = new SocketAsyncEventArgs();
args.SetBuffer(recv, 0, recv.Length);
var saw = new SocketAwaitable(args);
await socket.ReceiveAsync(saw);
var bytesRead = args.BytesTransferred;
if (0 != bytesRead)
{
    var reqheaders = new StringBuilder();
    var s = Encoding.ASCII.GetString(recv, 0, bytesRead);
    reqheaders.Append(s);
    var i = reqheaders.ToString().IndexOf("\r\n\r\n");
    while (0 > i && 0 != bytesRead)
    {
        await socket.ReceiveAsync(saw);
        bytesRead = args.BytesTransferred;
        if (0 != bytesRead)
        {
            s = Encoding.ASCII.GetString(recv, 0, bytesRead);
            reqheaders.Append(s);
            i = reqheaders.ToString().IndexOf("\r\n\r\n");
        }
    }
    if (0 > i)
        throw new Exception("Bad Request");
}

This is a slight variant from some actual code we're using in HttpSocketUtility.ReceiveHttpRequestAsync(). I've just snipped some. What we're doing is creating a byte[] buffer, some SocketAsyncEventArgs, and finally, a SocketAwaitable. This is the code to read our HTTP request headers, and what we do is we asynchronously read up to 1024 bytes at a time until we encounter two carriage return/line feed pairs ("\r\n\r\n") in a row, signifying the end of the headers. Each time we get more data, we append it to reqheaders. As you can see, we're passing saw each time to the ReceiveAsync() call, and then checking args.BytesTransferred to see how much we read. It works because we set up saw to use args, and args to use recv, so now we can use them to see the results. SendAsync() works the same way, except you should populate your args first with the data you want sent. In practice, I don't find as much use for this overload of SendAsync() except under one scenario where I'm reading from something like a Stream and then sending that. SendFileAsync() or one of the other SendAsync() overloads is typically what I'll use.

The Demo Webserver

Now let's look at the web server code so we can see the rest of it in action:

C#
// make Main() awaitable so we can use async/await in it
static async Task Main()
{
    // create a socket
    using (var socket = new Socket(SocketType.Stream, ProtocolType.Tcp))
    {
        // bind to localhost:8080
        socket.Bind(new IPEndPoint(IPAddress.Loopback, 8080));
        socket.Listen(16); // listen
        // begin our accept task
        var s = await socket.AcceptTaskAsync();
        // when we're done accepting, process the request
        await _ProcessRequest(socket, s);
    }
}
static async Task _ProcessRequest(Socket socket,Socket s)
{
    // we need to execute this part concurrently
    var t = Task.Run(async () =>
    {
        // spawn another waiter
        var sss = await socket.AcceptTaskAsync();
        await _ProcessRequest(socket, sss);
    });
    // read the incoming HTTP data
    var req = await s.ReceiveHttpRequestAsync();
    // report it
    Console.Write("Got Request on thread: " + 
                   Thread.CurrentThread.ManagedThreadId.ToString("X")+", ");
    Console.WriteLine(req.Method + " " + req.Url);

    // our html to send
    var html = "<html><head><title>Hello</title></head><body><h1>Hello World!</h1></body></html>";
    // our headers (HTTP header lines end in CRLF)
    var headers = "HTTP/1.1 200 OK\r\nDate: "
        + DateTime.UtcNow.ToString("r")
        + "\r\nContent-Type: text/html\r\nContent-Length: "
        + html.Length.ToString()
        + "\r\nConnection: close\r\n";
    // send them asynchronously, with a blank line between headers and body
    await s.SendAsync(headers + "\r\n" + html, Encoding.ASCII);
    // disconnect (no keep-alive in demo)
    await s.DisconnectAsync(false);
    s.Close();

    // finally wait for our accepting task if it's still running
    if (!t.IsCompleted) // a faulted or canceled task also counts as completed
        await t;
}

There's really not a lot to it. Basically, we spin a loop of sorts, but it's not really a loop. We call _ProcessRequest() which in turn calls _ProcessRequest() after accepting again to keep it going. After that, inside _ProcessRequest() is where the meat of our serving happens. It creates that concurrent task to accept on the socket again, and then it serves the request it just got and disconnects. Finally, it waits on that concurrent awaiting task we spun at the top of the method. Right now, the headers are nearly static and the content itself is static, but in the real world it would probably be dynamically generated or served from a file, and served based on contents of req.Url, req.QueryString, and req.Body. Currently, it doesn't support cookies. I didn't want to go more overboard than I had here, since again this project isn't even about serving HTTP. That's just the medium I'm using to demonstrate asynchronous socket operation.
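If you do start generating content dynamically, the response assembly is worth pulling out of _ProcessRequest(). Below is a minimal sketch of one way to do that. HttpResponseSketch and BuildHtmlResponse() are hypothetical names and not part of HttpSocketUtility. It assumes you want UTF-8 content, so it computes Content-Length from the encoded bytes rather than from the string length, and it terminates each header line with CRLF.

```csharp
using System;
using System.Globalization;
using System.Text;

public static class HttpResponseSketch
{
    // Hypothetical helper, not part of the article's HttpSocketUtility
    public static byte[] BuildHtmlResponse(string html)
    {
        // encode the body first so Content-Length counts bytes, not chars
        var body = Encoding.UTF8.GetBytes(html);
        var headers = new StringBuilder();
        headers.Append("HTTP/1.1 200 OK\r\n");
        headers.Append("Date: ").Append(DateTime.UtcNow.ToString("r")).Append("\r\n");
        headers.Append("Content-Type: text/html; charset=utf-8\r\n");
        headers.Append("Content-Length: ")
            .Append(body.Length.ToString(CultureInfo.InvariantCulture))
            .Append("\r\n");
        headers.Append("Connection: close\r\n");
        // a blank line separates the headers from the body
        headers.Append("\r\n");
        var head = Encoding.ASCII.GetBytes(headers.ToString());
        // stitch the header bytes and body bytes into one buffer
        var response = new byte[head.Length + body.Length];
        Buffer.BlockCopy(head, 0, response, 0, head.Length);
        Buffer.BlockCopy(body, 0, response, head.Length, body.Length);
        return response;
    }
}
```

The resulting byte[] can be handed to SocketAsyncEventArgs.SetBuffer() and sent with the SendAsync() overload that takes a SocketAwaitable, which is the one scenario where that overload earns its keep.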

Points of Interest

Diving into the Task framework is not for the faint of heart. There's a lot there and it's easy to get lost looking at everything, much less put it all together into something cohesive. I originally tried this approach above about a year ago I think, and I didn't know enough about it to get it working, much less using async/await as much in the code as I have now. This thing doesn't really block anymore except when waiting for an incoming socket connection. I recently posted an article shedding some light on one of the darker corners of the task framework and I plan to produce more. There is a lot of interesting territory to explore here.

History

  • 23rd June, 2020 - Initial submission

License

This article, along with any associated source code and files, is licensed under The MIT License