
A reusable, high performance, socket server class - Part 2

17 Aug 2002
To maintain performance a socket server shouldn't make blocking calls from its IO threads. This article builds on the previous one to add a business logic thread pool to our example server.

The following source was built using Visual Studio 6.0 SP5 and Visual Studio .NET. You need to have a version of the Microsoft Platform SDK installed.

Note that the debug builds of the code waste a lot of CPU cycles due to the debug trace output. It's only worth profiling the release builds.

Overview

In the previous article we designed a reusable socket server class to make writing high performance socket based servers easy. We presented a series of simple examples, from the humble echo server through to a slightly more real-world packet echo server and a fake POP3 server. This article continues to make the example server more usable in the real world by adding a business logic thread pool, so that messages are processed by a thread that isn't part of the IO thread pool. This helps to maintain the scalability and performance of the server by moving potentially blocking work off into its own thread pool.

Why do we need another thread pool?

To be able to handle variable load it's often useful to have a thread pool that can be expanded and contracted depending on the current load on the server. As we pointed out in the last article, all of our asynchronous socket IO is handled by the socket server's thread pool. The threads in this pool cannot be terminated whilst they have outstanding IO operations or those operations will be terminated. This means that the socket server's thread pool cannot shrink unless we keep track of the IO operations associated with each worker thread and only allow a thread to terminate once all of its IO operations have completed. To maintain performance we also need to make sure that the threads in the socket server's thread pool do not block. There is a finite number of them, and if they all block then no socket IO will occur until one of them unblocks. The easiest way to ensure that the IO threads don't block is to move the business logic processing out of the IO thread pool and into a new thread pool. The IO threads then simply handle the IO, chunk the data stream into messages and pass the messages off to the business logic thread pool.
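To make the "chunk the data stream into messages" step concrete, here is a minimal, portable sketch. It is not the example server's framing code, which isn't shown in this article. It assumes that a message is a line terminated by CRLF, as POP3 commands are, and ExtractMessages() is a hypothetical helper name.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical helper: pulls every complete CRLF-terminated message out of
// 'pending' and leaves any trailing partial message in place for the next read.
std::vector<std::string> ExtractMessages(std::string &pending)
{
   std::vector<std::string> messages;

   std::string::size_type start = 0;

   for (;;)
   {
      const std::string::size_type end = pending.find("\r\n", start);

      if (end == std::string::npos)
      {
         break;   // no complete message left; wait for more data
      }

      messages.push_back(pending.substr(start, end - start));

      start = end + 2;   // skip the CRLF terminator
   }

   pending.erase(0, start);   // keep only the incomplete tail

   return messages;
}
```

An IO thread would append each completed read to the pending data, call the helper, and dispatch each returned message to the business logic pool without doing any further work on it.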

A business logic thread pool

Our requirements for the business logic thread pool are that it should be flexible and capable of increasing and decreasing the number of worker threads as the load on the server dictates. Passing work items into the thread pool should be a non blocking operation so that the IO threads can operate at maximum efficiency, but we need to be able to know when a work item hasn't been picked up by a thread within a certain time period so that we can add more threads to the pool. We also need to keep track of the number of idle threads that we have and, every so often, reduce the number of threads in the pool to conserve resources in times of low server loading.

As you would probably expect, the thread pool uses IO Completion Ports to dispatch work items to worker threads. To monitor how long a work item waits before a thread picks it up, and therefore to work out when we need to add more threads to the pool, we use an event. When we dispatch a work item to the IO Completion Port we wait on the event for a configurable timeout period. When a thread picks up a work item from the completion port the first thing that it does is signal the event. If all threads are busy when we dispatch our work item, our timeout may expire before a thread signals the event. In this case we may wish to add another thread to the pool to deal with the work load. The dispatch code could look something like this:

   void CThreadPool::HandleDispatch(
      ULONG_PTR completionKey, 
      DWORD dwNumBytes, 
      OVERLAPPED *pOverlapped) 
   {
      m_dispatchCompleteEvent.Reset();

      m_workPort.PostStatus(completionKey, dwNumBytes, pOverlapped); 

      // Wait for a worker thread to signal that it has picked up the work item.

      bool processed = false;

      bool threadStarted = false;

      while (!processed)
      {
         DWORD result = m_dispatchCompleteEvent.Wait(m_timeoutMillis);

         if (result == WAIT_OBJECT_0)
         {
            processed = true;
         }
         else if (result == WAIT_TIMEOUT)
         {
            if (!threadStarted && m_processingThreads == m_activeThreads && 
                (size_t)m_activeThreads < m_maxThreads)
            {            
               StartWorkerThread();

               threadStarted = true;
            }
         }
         else
         {
            throw CWin32Exception(_T("CThreadPool::Dispatch()"),
                                  GetLastError());
         }
      }
   }

Whilst there are threads available to process the work items we don't need to start new threads. As soon as all of the threads in the pool are active we may time out during the dispatch and then, if we're not already running with the maximum number of threads that we've been configured for, we start a new thread. The actual code is slightly more complex as it handles shutdown requests and adjusts the timeout when we're already running at our maximum number of threads. The dispatcher needs to know how many threads we have in the pool and how many of those threads are processing, so each worker thread calls back to the thread pool to let the pool know what state it's in.
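The two decisions described above, whether to grow the pool and which timeout to wait with, can be pulled out into small pure functions. This is only a sketch: PoolState, ShouldStartThread() and ChooseTimeout() are hypothetical names, and the timeout policy is an assumption based on the two dispatch timeouts that the pool is configured with, not the actual code.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical snapshot of the counters that the dispatcher consults.
struct PoolState
{
   std::size_t activeThreads;       // threads currently in the pool
   std::size_t processingThreads;   // threads currently handling a work item
   std::size_t maxThreads;          // configured upper limit
};

// True when a dispatch timeout should cause a new worker to be started:
// every thread is busy, the pool still has room to grow and we haven't
// already started a thread for this work item.
bool ShouldStartThread(const PoolState &state, bool threadStarted)
{
   return !threadStarted &&
          state.processingThreads == state.activeThreads &&
          state.activeThreads < state.maxThreads;
}

// Assumed policy: use the short timeout while the pool can still grow and
// the longer one once it is already at its maximum size.
unsigned long ChooseTimeout(
   const PoolState &state,
   unsigned long timeoutMillis,
   unsigned long maxThreadsTimeoutMillis)
{
   return state.activeThreads < state.maxThreads
      ? timeoutMillis
      : maxThreadsTimeoutMillis;
}
```

Keeping the policy separate from the waiting code makes it easy to test and to tune without touching the completion port handling.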

The problem with this piece of work item dispatch code is that it doesn't fulfill our requirement of being able to dispatch a work item to the pool in a non blocking fashion. To achieve that, we add another level of indirection, and another IO Completion Port.

Non blocking dispatch

To ensure that users wishing to dispatch a work item to the thread pool can do so without blocking we implement the user level dispatch function as follows:

   void CThreadPool::Dispatch(
      ULONG_PTR completionKey, 
      DWORD dwNumBytes /*= 0*/, 
      OVERLAPPED *pOverlapped /*= 0*/) 
   {
      if (completionKey == 0)
      {
         throw CException(_T("CThreadPool::Dispatch()"),
                       _T("0 is an invalid value for completionKey"));
      }

      m_dispatchPort.PostStatus(completionKey, dwNumBytes, pOverlapped); 
   }

The restriction on 0 valued completion keys is unfortunate but allows us to shut down the thread pool's dispatch thread by posting a 0 to its completion port. The thread pool now has two IO Completion Ports. The dispatch port is serviced by a single maintenance thread which executes the HandleDispatch() method to dispatch work items to the worker threads. Users dispatch without blocking and the maintenance thread dispatches in a blocking manner so that it can expand the thread pool when it needs to. The work item port is serviced by a variable number of threads. We've seen how we know when we need to expand the number of threads. Now we'll look at how we reduce the number of threads when the work load is low.
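The same shape can be sketched without IO Completion Ports. The following portable analogue uses a mutex protected queue in place of the dispatch port; DispatchQueue and MaintenanceLoop() are hypothetical names, and the real pool posts to a completion port rather than a std::queue. It shows the two points made above: posting never waits for a consumer, and a key of 0 is reserved as the shutdown signal.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <vector>

// Portable stand-in for the dispatch port. Post() takes a short lock but
// never waits for anyone to pick the item up.
class DispatchQueue
{
   public :

      void Post(std::uintptr_t key)
      {
         {
            std::lock_guard<std::mutex> lock(m_lock);
            m_keys.push(key);
         }
         m_ready.notify_one();
      }

      // Blocks until a key is available, then removes and returns it.
      std::uintptr_t Wait()
      {
         std::unique_lock<std::mutex> lock(m_lock);
         m_ready.wait(lock, [this] { return !m_keys.empty(); });
         const std::uintptr_t key = m_keys.front();
         m_keys.pop();
         return key;
      }

   private :

      std::mutex m_lock;
      std::condition_variable m_ready;
      std::queue<std::uintptr_t> m_keys;
};

// User level dispatch: rejects 0 because 0 is reserved for shutdown.
void Dispatch(DispatchQueue &queue, std::uintptr_t key)
{
   if (key == 0)
   {
      throw std::invalid_argument("0 is an invalid value for completionKey");
   }
   queue.Post(key);
}

// Maintenance thread body: forwards keys until it sees the 0 sentinel.
void MaintenanceLoop(DispatchQueue &queue, std::vector<std::uintptr_t> &handled)
{
   for (;;)
   {
      const std::uintptr_t key = queue.Wait();
      if (key == 0)
      {
         break;   // shutdown requested
      }
      handled.push_back(key);   // the real pool would call HandleDispatch() here
   }
}
```

In a server, MaintenanceLoop() would run on its own thread and shutdown would be requested by calling Post(0) directly, bypassing the check in Dispatch().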

Shutting down dormant threads

Work items often come in batches: the thread pool gets busy, expands, services all of the work items and then becomes less busy. At this point the pool contains threads which aren't being used but which are still consuming resources. These dormant threads can be safely shut down as the pool can expand again as load increases. The question is, how do we decide when to shut down some threads?

The maintenance thread that handles our blocking dispatch also handles checking for dormant threads. Every so often (the period is configurable) the maintenance thread uses an algorithm to determine if it should shut some threads down. The current algorithm is as follows:

   void CThreadPool::HandleDormantThreads()
   {
      if ((size_t)m_activeThreads > m_minThreads)
      {
         const size_t dormantThreads = m_activeThreads - m_processingThreads;

         if (dormantThreads > m_maxDormantThreads)
         {
            const size_t threadsToShutdown = 
                            (dormantThreads - m_maxDormantThreads) / 2 + 1;

            StopWorkerThreads(threadsToShutdown);
         }
      }
   }

If we have more threads than the minimum number we're allowed to have, we find out how many threads aren't currently processing work items. If that number is more than the number of dormant threads that we're allowed to have, we shut down half of the excess (using integer division) plus one. Stopping worker threads is a simple case of posting an IO completion key of 0 to the work port for each worker thread that we want to shut down.
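To see what the arithmetic in HandleDormantThreads() produces for particular values, it can be written as a pure function. ThreadsToShutdown() is a hypothetical name used only for this sketch; the expression itself is copied from the code above.

```cpp
#include <cassert>
#include <cstddef>

// Mirrors the arithmetic in HandleDormantThreads() as a pure function.
std::size_t ThreadsToShutdown(
   std::size_t activeThreads,
   std::size_t processingThreads,
   std::size_t minThreads,
   std::size_t maxDormantThreads)
{
   if (activeThreads <= minThreads)
   {
      return 0;   // already at or below the minimum pool size
   }

   const std::size_t dormantThreads = activeThreads - processingThreads;

   if (dormantThreads <= maxDormantThreads)
   {
      return 0;   // the number of idle threads is within the allowance
   }

   return (dormantThreads - maxDormantThreads) / 2 + 1;
}
```

With 10 active threads, 1 processing, a minimum of 5 and a maximum of 5 dormant threads, there are 9 dormant threads, 4 more than allowed, so 4 / 2 + 1 = 3 threads are asked to stop. Note that the result is not clamped against the minimum pool size here; whether StopWorkerThreads() does that is not shown in this article.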

Doing the work

We now have a thread pool that fulfills our requirements of automatic expansion and contraction depending upon load and non blocking dispatch for users. The remaining thing to do is allow the derived class to provide its own WorkerThread class to do the work. The worker thread class must implement the following interface:


   virtual bool Initialise();

   virtual void Process(
      ULONG_PTR completionKey,
      DWORD dwNumBytes,
      OVERLAPPED *pOverlapped) = 0;

   virtual void Shutdown();

Initialise() is called when the worker thread is first created, Shutdown() is called when the thread is terminating and Process() is called for each work item.
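The calling sequence can be illustrated with a portable stand-in for the interface. IWorker, TracingWorker and RunWorker() are hypothetical names, the Windows types are replaced with standard ones, and the handling of a failed Initialise() is an assumption rather than something the article specifies.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Stand-in for the worker thread interface, using portable types.
class IWorker
{
   public :

      virtual ~IWorker() {}

      virtual bool Initialise() { return true; }

      virtual void Process(
         std::uintptr_t completionKey,
         std::uint32_t numBytes,
         void *pOverlapped) = 0;

      virtual void Shutdown() {}
};

// Records the order in which the pool calls it.
class TracingWorker : public IWorker
{
   public :

      virtual bool Initialise() { m_trace += 'I'; return true; }

      virtual void Process(std::uintptr_t, std::uint32_t, void *)
      {
         m_trace += 'P';
      }

      virtual void Shutdown() { m_trace += 'S'; }

      const std::string &Trace() const { return m_trace; }

   private :

      std::string m_trace;
};

// Drives a worker through its lifecycle. A key of 0 ends the loop, matching
// the way worker threads are asked to stop.
void RunWorker(IWorker &worker, const std::uintptr_t *pKeys, std::size_t numKeys)
{
   if (!worker.Initialise())   // assumption: a failed Initialise() skips all work
   {
      return;
   }

   for (std::size_t i = 0; i < numKeys && pKeys[i] != 0; ++i)
   {
      worker.Process(pKeys[i], 0, nullptr);
   }

   worker.Shutdown();
}
```

Running the tracing worker over the keys 11, 22, 0, 33 produces one Initialise() call, two Process() calls and one Shutdown() call; the key after the 0 is never processed.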

A socket server with a business logic thread pool

Now that we have a suitable thread pool we can integrate it with our fake POP3 socket server so that the actual processing of commands can occur in the business logic pool whilst the IO pool is left to get on with the IO operations. We can also move socket closure off to the business logic pool so that we don't block the IO threads with a lingering socket close.

The first thing we need to do is create and configure our thread pool. Then we can pass a reference to the pool to our socket server class so that the server can, in turn, pass it on to our IO threads.

   CThreadPool pool(
      5,                    // initial number of threads to create
      5,                    // minimum number of threads to keep in the pool
      10,                   // maximum number of threads in the pool
      5,                    // maximum number of "dormant" threads
      5000,                 // pool maintenance period (millis)
      100,                  // dispatch timeout (millis)
      10000);               // dispatch timeout for when pool is at max threads

   pool.Start();

   CSocketServer server(
      INADDR_ANY,          // address to listen on
      5001,                // port to listen on
      10,                  // max number of sockets to keep in the pool
      10,                  // max number of buffers to keep in the pool
      1024,                // buffer size 
      pool);

   server.Start();

When our socket server has a complete, distinct, message to process it can dispatch it to the thread pool for processing, rather than processing it on one of its IO threads.

   void CSocketServer::ProcessCommand(
      CSocketServer::Socket *pSocket,
      CIOBuffer *pBuffer)
   {
      pSocket->AddRef();
      pBuffer->AddRef();

      m_pool.Dispatch(reinterpret_cast<ULONG_PTR>(pSocket), 
                      0, pBuffer->GetAsOverlapped());
   }

Since we're passing the socket and IO buffer to another thread we have to increment their reference counts so that they don't get cleaned up from underneath us. Over in our business logic thread we can finally process the message, and then release the references we took on the socket and IO buffer.

   void CThreadPoolWorkerThread::Process(
      ULONG_PTR completionKey,
      DWORD operation,
      OVERLAPPED *pOverlapped)
   {
      Socket *pSocket = reinterpret_cast<Socket *>(completionKey);
      CIOBuffer *pBuffer = CIOBuffer::FromOverlapped(pOverlapped);

      ProcessMessage(pSocket, pBuffer);

      pSocket->Release();
      pBuffer->Release();
   }
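The pairing of AddRef() before the dispatch with Release() after processing can be shown in isolation. This is a sketch under stated assumptions: RefCounted, HandOff() and FinishProcessing() are hypothetical names, and a real Socket or CIOBuffer would free or recycle itself when its count reaches zero rather than just reporting it.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical reference-counted resource. Release() only reports that the
// last reference has gone so that the sketch stays easy to test.
class RefCounted
{
   public :

      RefCounted() : m_ref(1) {}   // the creator holds the first reference

      void AddRef() { ++m_ref; }

      // Returns true when this call dropped the last reference.
      bool Release() { return --m_ref == 0; }

      long Count() const { return m_ref.load(); }

   private :

      std::atomic<long> m_ref;
};

// IO thread side: take a reference on behalf of the worker before handing over.
void HandOff(RefCounted &socket, RefCounted &buffer)
{
   socket.AddRef();
   buffer.AddRef();
}

// Worker side: drop the references taken in HandOff() once processing is done.
// Returns true if the buffer's last reference was released here.
bool FinishProcessing(RefCounted &socket, RefCounted &buffer)
{
   socket.Release();
   return buffer.Release();
}
```

If the IO thread releases its own reference to the buffer while the work item is still queued, the count only drops to one, so the buffer is still valid when the worker eventually picks it up.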

Since the socket class marshals all IO requests back to the IO thread pool we can safely make read and write requests from our business logic thread, even though that thread may be terminated before the IO requests complete.

Maintaining per-connection state

The final thing that our server may need to do is associate some internal server state with a particular socket connection. The Socket class makes this particularly easy as it provides the following member functions:

   void *GetUserPtr() const;

   void SetUserPtr(void *pData);

   unsigned long GetUserData() const;

   void SetUserData(unsigned long data);

These provide access to a single void * user data pointer which is stored in the Socket. The common usage pattern for this user data is as follows. When the connection is established the socket server is notified by OnConnectionEstablished(). The server can allocate a new per-connection data structure and associate it with the socket passed to OnConnectionEstablished() by calling SetUserPtr(). In subsequent read and write completions the pointer to the per-connection user data structure can be extracted with GetUserPtr(). When the connection is terminated the server is notified by OnConnectionClosed() and the per-connection user data can be retrieved and deleted.

Although there are two versions of the user data access functions, one for a void * and one for an unsigned long, there is only a single storage location. The two versions are merely for convenience and to reduce casting if the user data required is simply an index into an internal server structure rather than a pointer.
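A single storage slot with two typed views, and the allocate, use, delete pattern described above, might look something like the sketch below. It is not the library's COpaqueUserData class: OpaqueUserData, ConnectionState and the three free functions are hypothetical and only borrow the notification names for readability.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// One storage location, exposed either as a pointer or as an integer.
class OpaqueUserData
{
   public :

      OpaqueUserData() : m_value(0) {}

      void *GetUserPtr() const { return reinterpret_cast<void *>(m_value); }

      void SetUserPtr(void *pData)
      {
         m_value = reinterpret_cast<std::uintptr_t>(pData);
      }

      // Only meaningful if the slot was last set with SetUserData().
      unsigned long GetUserData() const
      {
         return static_cast<unsigned long>(m_value);
      }

      void SetUserData(unsigned long data) { m_value = data; }

   private :

      std::uintptr_t m_value;
};

// Hypothetical per-connection state, similar to what the example keeps.
struct ConnectionState
{
   std::string address;
   unsigned long messagesProcessed;
};

void OnConnectionEstablished(OpaqueUserData &socket, const std::string &address)
{
   ConnectionState *pState = new ConnectionState();
   pState->address = address;
   pState->messagesProcessed = 0;
   socket.SetUserPtr(pState);
}

void OnMessageProcessed(OpaqueUserData &socket)
{
   ConnectionState *pState = static_cast<ConnectionState *>(socket.GetUserPtr());
   ++pState->messagesProcessed;
}

void OnConnectionClosed(OpaqueUserData &socket)
{
   delete static_cast<ConnectionState *>(socket.GetUserPtr());
   socket.SetUserPtr(nullptr);
}
```

Because both views share the same storage, a server should pick one of them per socket and stick to it; reading a stored pointer back through GetUserData() is not meaningful.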

The example server marshals the OnConnectionEstablished() and OnConnectionClosed() calls across to the business logic thread pool and maintains some fairly trivial per-connection user data there. The data we maintain is the address of the client connection (obtained from the buffer passed into OnConnectionEstablished()) and the number of messages that have been processed on this particular connection.

The complete example

The shell of a POP3 server which performs its business logic processing in a separate thread pool to its IO can be downloaded from here. The server has a call to ::Sleep() within its message processing code so that the processing takes some time and blocks. Notice how the IO on other connections is unaffected by this, and, if you want, add a similar call to the server we developed at the end of the last article and compare the behavior.

As with the other examples, simply telnet to localhost 5001 to test the server. The server runs until a named event is set and then shuts down. The very simple Server Shutdown program, available here, provides the off switch.

Revision history

  • 21st May 2002 - Initial revision.
  • 27th May 2002 - Added pause/resume functionality to all servers and the server shutdown program. Use CSocket to protect from resource leaks when creating the listening socket. Refactored the Socket and CIOBuffer classes so that common list management code is now in CNodeList and common user data code is now in COpaqueUserData.
  • 29th May 2002 - Linting and general code cleaning.
  • 18th June 2002 - Removed call to ReuseAddress() during the creation of the listening socket as it is not required - Thanks to Alun Jones for pointing this out to me.
  • 28th June 2002 - Adjusted how we handle socket closure.
  • 30th June 2002 - Removed the requirement for users to subclass the socket server's worker thread class. All of the work can now be done by simply subclassing the socket server class.
  • 15th July 2002 - Socket closure notifications now occur when the server shuts down whilst there are active connections. SocketServer can now be set to ensure read and write packet sequences.
  • 23rd July 2002 - Bug fix to CSocketServer::ProcessDataStream(). We were reusing the buffer when we shouldn't have been. Code was fine up until the changes on 30th June and is fine again now. Thanks to an anonymous CodeProject reader for pointing this out to me.
  • 12th August 2002 - Removed the race condition in socket closure - Thanks to David McConnell for pointing this out. Derived class can receive connection reset and connection error notifications. Socket provides a means to determine if send/receive are connected. Dispatch to the thread pool now uses shared enums rather than hard coded constant values. General code cleaning and lint issues.

Other articles in the series

  1. A reusable socket server class
  2. Business logic processing in a socket server
  3. Speeding up socket server connections with AcceptEx
  4. Handling multiple pending socket read and write operations
  5. Testing socket servers with C# and .Net
  6. A high performance TCP/IP socket server COM component for VB

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.
