
Griffin.Networking - A Networking Library for .NET

7 May 2012 · LGPLv3 · 11 min read
A somewhat performant networking library for .NET
Disclaimer: The current framework release is a beta. It should be reasonably stable, but don't blame me if it blows up your computer.

Introduction

Griffin.Networking is a networking library written in C# whose purpose is to:

  1. Abstract away the repetitive tasks which you have to do with vanilla .NET
  2. Create a more structured way of processing the inbound and outbound data

Those two goals should lower the time it takes to develop networking applications and also improve the performance thanks to a (hopefully) well-designed networking layer.

The framework also has Inversion of Control support built in from the start (be careful, slow containers hurt performance a lot). The IoC support is provided by a service location interface (which should not be exposed outside the framework).

The goal of this article is to describe the framework and show you how to develop an application with it. You will have a working JSON RPC implementation (server side) when done. The client side can be created quite easily afterwards: simply create a RequestEncoder and a ResponseDecoder for the framework; everything else can be reused from the server implementation.

Background

I've built several networking applications in the past. Everything from small client applications in C# to performant socket servers in C++ utilizing IO Completion Ports.

They have worked well, but I always seemed to repeat myself when implementing my applications. The biggest problem is that it's hard to get an extendable architecture where you can inject handlers into the protocol handling. My C# WebServer (Google "C# webserver" and click on the first search result) illustrates this well. It's not easy to follow the communication flow.

I therefore decided to try to create a networking library which is easy to use and extend. During my research, I stumbled upon Netty for Java, which heavily inspired my library (the architecture only; the code is all mine).

Architecture

The purpose of this section is to give you a brief overview of the architecture and the terms which will be used throughout the article. Some things in this section may not make sense until you have read the entire article.

Channel

The channel is the IO layer. In most cases, it's a socket implementation, but could be anything used for communication. The default socket implementation uses the classical Begin/End type of methods. They will probably be replaced by the new Async methods later on.

There are two types of channels: server channels, whose responsibility is to accept new connections and build the correct client channel (and its pipeline), and client channels, which are responsible for sending and receiving information from the remote peer.

C#
public interface IChannel
{
    void HandleDownstream(IPipelineMessage message);
}

As you can see, the channel interface is quite small. The reason for this is that the entire framework is asynchronous. All communication is made by messages.

The contract of a channel says that it should only be able to receive and process messages. A message (read more below) can for instance be Connect, SendBuffer or Close. All channel implementations take a pipeline (see below) in the constructor and use it to send messages to your application.
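To make the contract concrete, here is a minimal, purely hypothetical channel sketch (not one of the framework's real implementations); it only illustrates how a channel takes the pipeline in its constructor and reacts to downstream messages:

C#
// Hypothetical illustration only - not an actual Griffin.Networking channel.
public class FakeChannel : IChannel
{
    private readonly IPipeline _pipeline;

    public FakeChannel(IPipeline pipeline)
    {
        _pipeline = pipeline;

        // let the pipeline know where downstream messages should end up
        _pipeline.SetChannel(this);
    }

    public void HandleDownstream(IPipelineMessage message)
    {
        if (message is SendBuffer)
        {
            // a real channel would write the buffer to its socket here
        }
        else if (message is Close)
        {
            // a real channel would close its socket and then report the
            // event back to the application via _pipeline.SendUpstream(...)
        }
    }
}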

Pipeline

The pipeline is the most central part of the library. All the action happens in the pipeline. It's in the pipeline that you authorize users, transform the incoming byte[] into something more usable such as an HttpRequest, etc.


The pipeline has two directions (compare with a road with two lanes). The lane from the channel to the application is called upstream, since the message travels up from the channel to your application. The other direction is called downstream since the message travels down to the channel.

A pipeline can contain an arbitrary number of handlers, and each direction has its own set of handlers. An HTTP streaming server might contain only the HttpHeaderDecoder in the upstream and an HttpFileStreamer in the downstream to gain performance, while a complete HTTP server would include session management, authentication, logging, error handling, etc. as upstream handlers.
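As a rough sketch of what those two setups could look like (using the DelegatePipelineFactory that appears later in this article, and treating the handler class names from the paragraph above as placeholders rather than real types):

C#
// Hypothetical streaming pipeline: one decoder upstream, one file streamer downstream.
var streamingFactory = new DelegatePipelineFactory();
streamingFactory.AddUpstreamHandler(() => new HttpHeaderDecoder());
streamingFactory.AddDownstreamHandler(() => new HttpFileStreamer());

// A complete HTTP server would register additional upstream handlers here:
// session management, authentication, logging, error handling, and so on.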


C#
public interface IPipeline
{
  /// <summary>
  /// Send something from the channel to all handlers.
  /// </summary>
  /// <param name="message">Message to send to the client</param>
  void SendUpstream(IPipelineMessage message);
 
  /// <summary>
  /// Set downstream end point
  /// </summary>
  /// <param name="channel">channel which will handle all downstream messages</param>
  void SetChannel(IChannel channel);
 
  /// <summary>
  /// Send a message from the client and downwards.
  /// </summary>
  /// <param name="message">Message to send to the channel</param>
  void SendDownstream(IPipelineMessage message);
} 

The architecture allows you to have full control over how the incoming and outgoing data is processed before it arrives in your application or in the channel.

Messages

As mentioned in the previous section, the pipeline is used to send messages to and from your application. These messages are small classes which contain the information to process. A message can be compared to the EventArgs classes in the .NET event mechanism: POCO classes which implement the IPipelineMessage interface. Messages that require an action should be named as verbs (Send), while event messages should be named in the past tense (Received).

The general guideline is that each message may only contain one type of information. Don't create a message called Received with an object property that is a byte[] at the beginning of processing and a SuperDeluxeObject at the end. Instead, create a new message named ReceivedSuperDeluxe which contains the SuperDeluxeObject. It makes the processing cleaner and easier to follow.
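Following that guideline, the hypothetical wrapper message mentioned above would be nothing more than:

C#
// Hypothetical message from the guideline above: one message, one payload type.
public class ReceivedSuperDeluxe : IPipelineMessage
{
    private readonly SuperDeluxeObject _payload;

    public ReceivedSuperDeluxe(SuperDeluxeObject payload)
    {
        if (payload == null)
            throw new ArgumentNullException("payload");
        _payload = payload;
    }

    public SuperDeluxeObject Payload
    {
        get { return _payload; }
    }
}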

Example message:

C#
public class Connect : IPipelineMessage
{
    private readonly EndPoint _remoteEndPoint;
    public Connect(EndPoint remoteEndPoint)
    {
        if (remoteEndPoint == null)
            throw new ArgumentNullException("remoteEndPoint");
        _remoteEndPoint = remoteEndPoint;
    }
    public EndPoint RemoteEndPoint
    {
        get { return _remoteEndPoint; }
    }
} 

Pipeline Handlers

Pipeline handlers are used to process the messages which are sent through the pipeline. They can either be singletons (shared among channels) or be created per channel. Handlers that are constructed together with the pipeline can store state information since they are used by one channel only.

Example upstream handler which traces the received information:

C#
public class BufferTracer : IUpstreamHandler
{
    private readonly ILogger _logger = LogManager.GetLogger<BufferTracer>();
    public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
    {
        var msg = message as Received;
        if (msg != null)
        {
            var str = Encoding.UTF8.GetString(msg.BufferSlice.Buffer, msg.BufferSlice.Position, msg.BufferSlice.RemainingLength);
            _logger.Trace(str);
        }
        context.SendUpstream(message);
    }
}

Notice how it sends all messages to the next handler using context.SendUpstream(message). This is quite important. Each handler gets to decide whether the message should be propagated up the call stack or not. It's also how messages are transformed into something more usable.
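As a small illustration, here is a hypothetical handler that swallows certain messages instead of propagating them; anything it does not handle explicitly is passed on with context.SendUpstream:

C#
// Hypothetical example: stop empty Received messages from travelling further up.
public class EmptyBufferFilter : IUpstreamHandler
{
    public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
    {
        var msg = message as Received;
        if (msg != null && msg.BufferSlice.RemainingLength == 0)
            return; // swallowed; no handler above this one will see it

        // everything else continues up the pipeline
        context.SendUpstream(message);
    }
}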

Let's look at the HTTP HeaderDecoder handler:

C#
public class HeaderDecoder : IUpstreamHandler
{
    private readonly IHttpParser _parser;
    private int _bodyBytesLeft = 0;
    public HeaderDecoder(IHttpParser parser)
    {
        if (parser == null) throw new ArgumentNullException("parser");
        _parser = parser;
    }
    public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
    {
        if (message is Closed)
        {
            _bodyBytesLeft = 0;
            _parser.Reset();
        }
        else if (message is Received)
        {
            var msg = (Received) message;
            // complete the body
            if (_bodyBytesLeft > 0)
            {
                _bodyBytesLeft -= msg.BufferSlice.Count;
                context.SendUpstream(message);
                return;
            }
            var httpMsg = _parser.Parse(msg.BufferSlice);
            if (httpMsg != null)
            {
                var receivedHttpMsg = new ReceivedHttpRequest((IRequest) httpMsg);
                _bodyBytesLeft = receivedHttpMsg.HttpRequest.ContentLength;
                _parser.Reset();
                // send up the message to let someone else handle the body
                context.SendUpstream(receivedHttpMsg);
                msg.BytesHandled = msg.BufferSlice.Count;
                context.SendUpstream(msg);
            }
            return;
        }
        context.SendUpstream(message);
    }
} 

Two things are important here:

  1. It follows the Single Responsibility Principle. It doesn't actually parse the HTTP message but uses an external parser for that. This makes it easy to follow what the handler does, and we can switch parsers at any time if we find a more performant one.

  2. It transforms the Received message into a ReceivedHttpRequest.

All messages should be considered immutable. Don't change their contents unless you have a really good reason to. Don't propagate the original message upstream; create a new message instead (if you have processed the message).

Switching Sides

A pipeline handler can at any time switch from the downstream to the upstream (or vice versa). Switching sides will always invoke the first handler on the other side. This allows us to streamline the process and avoid confusion.


C#
public class AuthenticationHandler : IUpstreamHandler
{
  private readonly IAuthenticator _authenticator;
  private readonly IPrincipalFactory _principalFactory;
 
  public AuthenticationHandler(IAuthenticator authenticator, IPrincipalFactory principalFactory)
  {
    _authenticator = authenticator;
    _principalFactory = principalFactory;
  }
 
  public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
  {
    var msg = message as ReceivedHttpRequest;
    if (msg == null)
    {
      context.SendUpstream(message);
      return;
    }
 
    var authHeader = msg.HttpRequest.Headers["Authorization"];
    if (authHeader == null)
    {
      context.SendUpstream(message);
      return;
    }
 
    var user = _authenticator.Authenticate(msg.HttpRequest);
    if (user == null)
    {
      //Not authenticated, send error downstream and abort handling
      var response = msg.HttpRequest.CreateResponse(HttpStatusCode.Unauthorized, 
                                                    "Invalid username or password.");
      context.SendDownstream(new SendHttpResponse(msg.HttpRequest, response));
    }
    else
    {
      var principal =
        _principalFactory.Create(new PrincipalFactoryContext {Request = msg.HttpRequest, User = user});
      Thread.CurrentPrincipal = principal;

      // let the authenticated request continue up the pipeline
      context.SendUpstream(message);
    }
  }
} 

Pipeline Factories

A pipeline (and all of its handlers) needs to be constructed each time a new channel is created. There are two built-in factories in the framework.

One uses an interface called IServiceLocator, which allows you to add support for your favorite IoC container; the other uses delegates to create stateful handlers.

C#
var factory = new DelegatePipelineFactory();
factory.AddDownstreamHandler(() => new ResponseEncoder());
factory.AddUpstreamHandler(() => new HeaderDecoder(new HttpParser()));
factory.AddUpstreamHandler(new HttpErrorHandler(new SimpleErrorFormatter())); //singleton
factory.AddUpstreamHandler(() => new BodyDecoder(new CompositeBodyDecoder(), 65535, 6000000));
factory.AddUpstreamHandler(() => new FileHandler());
factory.AddUpstreamHandler(() => new MessageHandler());
factory.AddUpstreamHandler(new PipelineFailureHandler());                     //singleton

Buffers

A fundamental part of a performant networking library is how the data is handled. All larger allocations hurt performance. We don't want to create a new byte[65535] each time we read or send a new packet. It takes time to do the allocation, the garbage collector has to work more and the memory will end up fragmented.

The framework solves this by using buffer pools and a class called BufferSlice. We can allocate a buffer which is 5MB large and slice it into smaller pieces which we use in the processing. We can either make the buffer pool a singleton or let each handler allocate its own buffer pool (it's still just five allocations instead of 5000 if you have five handlers).

The BufferSlice class returns its buffer to the pool when it's disposed. It's therefore important that all messages that use the BufferSlice class implement IDisposable, since the channel will dispose all messages when it's done with them.
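As a rough sketch (the BufferPool and BufferSlice members used here are the same ones that appear in the code later in this article; the exact pool semantics are otherwise an assumption on my part):

C#
// Hypothetical handler fragment showing how a slice is borrowed from a shared pool.
public class GreetingSender
{
    // one shared pool of 65535-byte slices for all instances of this class
    private static readonly BufferPool _bufferPool = new BufferPool(65535, 50, 100);

    public void SendGreeting(IPipelineHandlerContext context)
    {
        // borrow a slice from the pool instead of allocating a new byte[]
        var slice = _bufferPool.PopSlice();

        var bytes = Encoding.UTF8.GetBytes("hello");
        Buffer.BlockCopy(bytes, 0, slice.Buffer, slice.StartOffset, bytes.Length);
        slice.Position = slice.StartOffset;
        slice.Count = bytes.Length;

        // the channel disposes the message once it has been sent,
        // which returns the slice to the pool
        context.SendDownstream(new SendSlice(slice));
    }
}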

Performance

The framework is still quite new (about one month old =)). The performance is not at its peak yet.

However, I've used Apache's ab tool to throw 5000 requests at the HTTP listener. The framework handled about 280 HTTP requests per second (localhost) which I consider to be OK this early in the project. The memory consumption was about 80MB (working set). (Note that the numbers don't really say anything unless they are compared with other frameworks.) Feel free to help improve the performance or do your own benchmarks. I would love to get a sample application which I can use for performance tuning (and compare the performance with other frameworks).

Please do not compare the HttpListener with the one in .NET, since the .NET version uses http.sys which runs in kernel mode. No pure .NET solution will ever get performance close to it.

Building a JSON RPC Server

It's time to start building a JSON RPC server. Create a new console application named something like JsonRpcServer. Open the NuGet Package Manager Console and run install-package griffin.networking to install the framework.

The specification for JSON RPC can be found at the official website. This article will not help you understand it, but only show how you can implement it. The specification does not say anything about how the messages are transferred, so we'll create a simple header to wrap the messages. The header is a simple binary header with a version (byte) field and a length (int) field.

Decoding / Encoding

The first thing we need to do is to process the incoming bytes. We have to decode them into something that we can work with. As mentioned, we'll use a simple envelope. Something like:

C#
public class SimpleHeader
{
  public int Length { get; set; }
  public byte Version { get; set; }
} 

But to be able to use that class, we need to decode the incoming bytes in some way. So let's create our first pipeline handler which we'll use for just that:

C#
public class HeaderDecoder : IUpstreamHandler
{
  public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
  {
    var msg = message as Received;
    if (msg == null)
    {
      context.SendUpstream(message);
      return;
    }
 
    // byte + int
    if (msg.BufferSlice.RemainingLength < 5)
    {
      return;
    }
 
    var header = new SimpleHeader
             {
               Version = msg.BufferSlice.Buffer[msg.BufferSlice.Position++],
               Length = BitConverter.ToInt32(msg.BufferSlice.Buffer, msg.BufferSlice.Position)
             };
    msg.BufferSlice.Position += 4;
    context.SendUpstream(new ReceivedHeader(header));
 
    if (msg.BufferSlice.RemainingLength > 0)
      context.SendUpstream(msg);
  }
}

Pretty straightforward. We don't process anything until we have at least five bytes (the channel will keep filling the buffer until we handle something). Then we decode the header, send a ReceivedHeader message and pass on the remaining bytes. Notice that the version byte comes first. By doing so, we can change the header as much as we like in future versions without breaking everything.

The header doesn't tell us anything more than the size of the actual JSON message, so we need something to parse the JSON itself. Let's create another upstream handler for that (thereby complying with the Single Responsibility Principle). It will be called... BodyDecoder. (I've cheated and already created the Request/Response/Error objects which the JSON RPC specification describes.)

C#
public class BodyDecoder : IUpstreamHandler
{
  private static readonly BufferPool _bufferPool = new BufferPool(65535, 50, 50);
  private readonly BufferPoolStream _stream;
  private SimpleHeader _header;
 
  public BodyDecoder()
  {
    var slice = _bufferPool.PopSlice();
    _stream = new BufferPoolStream(_bufferPool, slice);
  }
 
  public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
  {
    var headerMsg = message as ReceivedHeader;
    if (headerMsg != null)
    {
      _header = headerMsg.Header;
      if (_header.Length > 65535)
      {
        var error = new ErrorResponse("-9999", new RpcError
                               {
                                 Code = RpcErrorCode.InvalidRequest,
                                 Message =
                                   "Only requests of at most 65535 bytes are supported.",
                               });
        context.SendDownstream(new SendResponse(error));
      }
 
      return;
    }
 
    var received = message as Received;
    if (received != null)
    {
      var count = Math.Min(received.BufferSlice.RemainingLength, _header.Length);
      _stream.Write(received.BufferSlice.Buffer, received.BufferSlice.Position, count);
      received.BufferSlice.Position += count;
 
      if (_stream.Length == _header.Length)
      {
        _stream.Position = 0;
        var request = DeserializeRequest(_stream);
        context.SendUpstream(new ReceivedRequest(request));
      }
 
      return;
    }
 
    context.SendUpstream(message);
  }
 
  protected virtual Request DeserializeRequest(BufferPoolStream body)
  {
    var reader = new StreamReader(body);
    var json = reader.ReadToEnd();
    return JsonConvert.DeserializeObject<Request>(json);
  }
} 

Here, we are using the BufferPool instead of creating a new buffer each time. Hence, quite a large performance gain and a lot less fragmented memory if the server runs for a while. Also notice that the framework has a BufferPoolStream which uses the BufferPool to get byte[] buffers. Future versions of the stream will most likely be able to use several buffers behind the scenes (and therefore handle larger amounts of data without creating overly large buffers).

Before we continue with the actual application, let's add the only downstream handler. The response encoder.

C#
public class ResponseEncoder : IDownstreamHandler
{
  private static readonly BufferPool _bufferPool = new BufferPool(65535, 50, 100);
 
  public void HandleDownstream(IPipelineHandlerContext context, IPipelineMessage message)
  {
    var msg =  message as SendResponse;
    if (msg == null)
    {
      context.SendDownstream(message);
      return;
    }
 
    var result = JsonConvert.SerializeObject(msg.Response, Formatting.None);

    // the length header must carry the UTF-8 byte count, not the character count
    var byteCount = Encoding.UTF8.GetByteCount(result);

    // send header
    var header = new byte[5];
    header[0] = 1;
    var lengthBuffer = BitConverter.GetBytes(byteCount);
    Buffer.BlockCopy(lengthBuffer, 0, header, 1, lengthBuffer.Length);
    context.SendDownstream(new SendBuffer(header, 0, 5));

    // send JSON
    var slice = _bufferPool.PopSlice();
    Encoding.UTF8.GetBytes(result, 0, result.Length, slice.Buffer, slice.StartOffset);
    slice.Position = slice.StartOffset;
    slice.Count = byteCount;
    context.SendDownstream(new SendSlice(slice));
  }
}

Now, we only have one thing left to do in the pipeline, and that's to handle the requests. Let's start by creating a very simple handler:

C#
class MyApplication : IUpstreamHandler
{
  public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
  {
    var msg = message as ReceivedRequest;
    if (msg == null)
      return;
 
    var parray = msg.Request.Parameters as object[];
    if (parray == null)
      return; // muhahaha, violating the API specification
 
    object result;
    switch (msg.Request.Method)
    {
      case "add":
        result = int.Parse(parray[0].ToString()) + int.Parse(parray[1].ToString());
        break;
      case "subtract":
        result = int.Parse(parray[0].ToString()) - int.Parse(parray[1].ToString());
        break;
      default:
        result = "Nothing useful.";
        break;
    }
 
    var response = new Response(msg.Request.Id, result);
    context.SendDownstream(new SendResponse(response));
  }
} 

How do we run the application then? We need to create a server channel and define the client pipeline. I usually do it in a class called XxxxListener to follow the .NET standard. So let's create a JsonRpcListener.

C#
public class JsonRpcListener : IUpstreamHandler, IDownstreamHandler
{
  private TcpServerChannel _serverChannel;
  private Pipeline _pipeline;
 
  public JsonRpcListener(IPipelineFactory clientFactory)
  {
    _pipeline = new Pipeline();
    _pipeline.AddDownstreamHandler(this);
    _pipeline.AddUpstreamHandler(this);
    _serverChannel = new TcpServerChannel(_pipeline, clientFactory, 2000);
 
  }
 
  public void Start(IPEndPoint endPoint)
  {
    _pipeline.SendDownstream(new BindSocket(endPoint));
  }
 
  public void Stop()
  {
    _pipeline.SendDownstream(new Close());
  }
 
  public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
  {
    var msg = message as PipelineFailure;
    if (msg != null)
      throw new TargetInvocationException("Pipeline failed", msg.Exception);
  }
 
  public void HandleDownstream(IPipelineHandlerContext context, IPipelineMessage message)
  {
    context.SendDownstream(message);
  }
}

So now, we can define the client pipeline in Program.cs and inject it into the JsonRpcListener:

C#
class Program
{
  static void Main(string[] args)
  {
    LogManager.Assign(new SimpleLogManager<ConsoleLogger>());
 
    var factory = new DelegatePipelineFactory();
    factory.AddUpstreamHandler(() => new HeaderDecoder());
    factory.AddUpstreamHandler(() => new BodyDecoder());
    factory.AddUpstreamHandler(new MyApplication());
    factory.AddDownstreamHandler(new ResponseEncoder());
 
    JsonRpcListener listener = new JsonRpcListener(factory);
    listener.Start(new IPEndPoint(IPAddress.Any, 3322));
 
    Console.ReadLine();
  }
} 

The first two upstream handlers are stateful, so we need to create those for every channel which is generated. That's why we use a delegate. The last two are not stateful and can therefore be singletons.

That's it! You now have a working JSON RPC server. Sure, it's pretty basic, but the actual remoting layer doesn't have much to do with the networking layer.
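If you want to poke at the server by hand, a throwaway client along these lines should do (this is my own sketch, assuming the framing we defined earlier: one version byte, a little-endian Int32 length, then the UTF-8 JSON payload; the exact JSON property names depend on how your Request class is serialized):

C#
// Quick-and-dirty manual test client for the server above.
using System;
using System.Net.Sockets;
using System.Text;

class TestClient
{
    static void Main()
    {
        var json = "{\"jsonrpc\":\"2.0\",\"method\":\"add\",\"params\":[1,2],\"id\":1}";
        var body = Encoding.UTF8.GetBytes(json);

        using (var client = new TcpClient("localhost", 3322))
        using (var stream = client.GetStream())
        {
            // frame: version byte + Int32 length, followed by the JSON payload
            stream.WriteByte(1);
            stream.Write(BitConverter.GetBytes(body.Length), 0, 4);
            stream.Write(body, 0, body.Length);

            // read the response frame the same way (a robust client would loop
            // on Read until the full frame has arrived)
            var header = new byte[5];
            stream.Read(header, 0, 5);
            var length = BitConverter.ToInt32(header, 1);
            var responseBody = new byte[length];
            stream.Read(responseBody, 0, length);
            Console.WriteLine(Encoding.UTF8.GetString(responseBody));
        }
    }
}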

I did however take some time to create a proof of concept RPC system. Let's define our RPC service first:

C#
public class MathModule
{
  [OperationContract]
  public int Sum(int x, int y)
  {
    return x + y;
  }
}

Then, we need to redefine the client pipeline:

C#
var invoker = new RpcServiceInvoker(new DotNetValueConverter(), new SimpleServiceLocator());
invoker.Map<MathModule>();
factory.AddUpstreamHandler(() => new HeaderDecoder());
factory.AddUpstreamHandler(() => new BodyDecoder());
factory.AddUpstreamHandler(new RequestHandler(invoker));
factory.AddDownstreamHandler(new ResponseEncoder());

That's it! From here, we could pull in the HTTP protocol implementation and swap our simple header decoder for the HeaderDecoder in the HTTP implementation, giving us an implementation which works over HTTP instead of our basic binary header. We would have to make a few minor changes to achieve that, but most of the JSON RPC implementation stays intact.

Summary

I hope that I've managed to demonstrate how to develop networking applications with Griffin.Networking and show the power that it gives you compared to vanilla .NET socket servers.

The code is available as a NuGet package (griffin.networking) and the HTTP implementation is (soon) available as griffin.networking.http. The JSON RPC implementation is still just a concept and therefore not released yet. Feel free to participate and help complete it.

All code is also available on GitHub. Please report any bugs and feature requests there.

License

This article, along with any associated source code and files, is licensed under The GNU Lesser General Public License (LGPLv3)