The following source was built using Visual Studio 6.0 SP5 and Visual Studio .NET. You need to have
a version of the Microsoft Platform SDK installed.
Note that the debug builds of the code waste a lot of CPU cycles due to the debug trace output.
It's only worth profiling the release builds.
Overview
In the previous article we designed a reusable socket server class to make writing high performance
socket based servers easy. We presented a series of simple examples, from the humble echo server through
to a slightly more realistic packet echo server and a fake POP3 server. This article continues
to make the example server more usable in the real world by adding a business logic thread pool to
the server so that messages are processed by a thread that isn't part of the IO thread pool. This helps
to maintain the scalability and performance of the server by moving potentially blocking work off into
its own thread pool.
Why do we need another thread pool?
To be able to handle variable load it's often useful to have a thread pool that can be expanded and
contracted depending on the current load on the server. As we pointed out in the last article, all of our
asynchronous socket IO is handled by the socket server's thread pool. The threads in this pool cannot
be terminated whilst they have outstanding IO operations or the operations will be terminated. This means
that the socket server's thread pool cannot shrink without us keeping track of the IO operations
associated with a particular worker thread and only allowing the thread to terminate when all IO operations
have completed. To maintain performance we need to make sure that the threads in the socket server's
thread pool do not block. There are a finite number of them and if they all block then no socket IO
will occur until they unblock. The easiest way to ensure that the IO threads don't block is to move
the business logic processing out of the IO thread pool and into a new thread pool. The IO threads
then simply handle the IO, chunk the data stream into messages and pass the messages off to the business
logic thread pool.
A business logic thread pool
Our requirements for the business logic thread pool are that it should be flexible and capable of
increasing and decreasing the number of worker threads as the load on the server dictates. Passing work
items into the thread pool should be a non-blocking operation so that the IO threads can operate at
maximum efficiency, but we need to be able to know when a work item hasn't been picked up by a thread
within a certain time period so that we can add more threads to the pool. We also need to keep track
of the number of idle threads that we have and, every so often, reduce the number of threads in the pool
to conserve resources in times of low server loading.
As you would probably expect, the thread pool uses IO Completion Ports to dispatch work items to
worker threads. To be able to monitor how long a work item waits before a thread picks it up, and therefore
to work out when we need to add more threads to the pool, we use an event. When we dispatch a work item
to the IO Completion Port we wait on the event for a configurable timeout period. When a thread picks
up a work item from the completion port the first thing that it does is signal the event. If all threads
are busy when we dispatch our work item our timeout may expire before a thread signals the event. In this
case we may wish to add another thread to the pool to deal with the work load. The dispatch code could
look something like this:
void CThreadPool::HandleDispatch(
    ULONG_PTR completionKey,
    DWORD dwNumBytes,
    OVERLAPPED *pOverlapped)
{
    // Clear the event before posting so that only the thread that picks
    // up this work item can signal it.
    m_dispatchCompleteEvent.Reset();

    bool processed = false;

    m_workPort.PostStatus(completionKey, dwNumBytes, pOverlapped);

    // Start at most one new thread per work item.
    bool threadStarted = false;

    while (!processed)
    {
        DWORD result = m_dispatchCompleteEvent.Wait(m_timeoutMillis);

        if (result == WAIT_OBJECT_0)
        {
            // A worker thread has picked up the work item.
            processed = true;
        }
        else if (result == WAIT_TIMEOUT)
        {
            // Nobody picked the item up in time. If every thread is busy
            // and we're below the maximum, add a thread to the pool.
            if (!threadStarted && m_processingThreads == m_activeThreads &&
                (size_t)m_activeThreads < m_maxThreads)
            {
                StartWorkerThread();
                threadStarted = true;
            }
        }
        else
        {
            throw CWin32Exception(_T("CThreadPool::Dispatch()"),
                GetLastError());
        }
    }
}
Whilst there are threads available to process the work items we don't need to start new threads. As
soon as all of the threads in the pool are active we may time out during the dispatch and then, if we're
not already running with the maximum number of threads that we've been configured for, we start a new
thread. The actual code is slightly more complex as it handles shutdown requests and adjusts the
timeout when we're already running at our maximum number of threads. The dispatcher needs to know
how many threads we have in the pool and how many of those threads are processing so each worker thread
calls back to the thread pool to let the pool know what state it's in.
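The handshake between the dispatcher and the worker threads can be sketched in portable C++ as follows. This is only an illustration under stated assumptions: `Event` and `ShouldStartWorker()` are hypothetical names, a `std::condition_variable` stands in for the Win32 event object, and the shutdown handling and timeout adjustment mentioned above are left out.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical portable stand-in for the event used by the dispatcher.
// Once set it stays signalled until Reset() is called.
class Event
{
public:
    void Set()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_signalled = true;
        }
        m_condition.notify_all();
    }

    void Reset()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_signalled = false;
    }

    // Returns true if the event was signalled before the timeout expired.
    bool Wait(std::chrono::milliseconds timeout)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        return m_condition.wait_for(lock, timeout, [this] { return m_signalled; });
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_condition;
    bool m_signalled = false;
};

// The test applied after a timed-out wait: grow the pool only if no thread
// has been started for this work item yet, every thread is busy and the
// configured maximum has not been reached.
bool ShouldStartWorker(
    bool threadStarted,
    std::size_t processingThreads,
    std::size_t activeThreads,
    std::size_t maxThreads)
{
    assert(processingThreads <= activeThreads);

    return !threadStarted &&
        processingThreads == activeThreads &&
        activeThreads < maxThreads;
}
```

In this sketch the dispatcher would call `Reset()`, post the work item and then call `Wait()`; a worker would call `Set()` as soon as it picks the item up. A `false` return from `Wait()` corresponds to the `WAIT_TIMEOUT` branch of the dispatch code.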
The problem with HandleDispatch() as written is that it doesn't fulfill our requirement
of being able to dispatch a work item to the pool in a non-blocking fashion. To achieve that, we add
another level of indirection, and another IO Completion Port.
Non-blocking dispatch
To ensure that users wishing to dispatch a work item to the thread pool can do so without blocking
we implement the user level dispatch function as follows:
void CThreadPool::Dispatch(
    ULONG_PTR completionKey,
    DWORD dwNumBytes,
    OVERLAPPED *pOverlapped)
{
    if (completionKey == 0)
    {
        throw CException(_T("CThreadPool::Dispatch()"),
            _T("0 is an invalid value for completionKey"));
    }

    m_dispatchPort.PostStatus(completionKey, dwNumBytes, pOverlapped);
}
The restriction on 0 valued completion keys is unfortunate but allows us to shut down the thread pool's
dispatch thread by posting a 0 to its completion port. The thread pool now has two IO Completion Ports. The
dispatch port is serviced by a single maintenance thread which executes the HandleDispatch()
method to dispatch work items to the worker threads. Users dispatch without blocking and the maintenance thread
dispatches in a blocking manner so that it can expand the thread pool when it needs to. The work item
port is serviced by a variable number of threads. We've seen how we know when we need to expand the
number of threads. Now we'll look at how we reduce the number of threads when the work load is low.
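The two stage arrangement can be sketched without any Windows APIs. What follows is a minimal illustration under stated assumptions rather than the article's implementation: `DispatchQueue` and `MaintenanceLoop()` are hypothetical names, a mutex protected queue stands in for the dispatch port, and the loop simply records each key where the real maintenance thread would call HandleDispatch().

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for the dispatch port: an unbounded queue of keys.
class DispatchQueue
{
public:
    // Enqueues the key and returns at once; the caller never waits for a worker.
    void Dispatch(std::uintptr_t completionKey)
    {
        if (completionKey == 0)
        {
            throw std::invalid_argument("0 is an invalid value for completionKey");
        }
        Post(completionKey);
    }

    // Posts the reserved key 0, which asks the maintenance loop to exit.
    void PostShutdown()
    {
        Post(0);
    }

    // Blocks until a key is available; intended for the maintenance thread only.
    std::uintptr_t Take()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_condition.wait(lock, [this] { return !m_keys.empty(); });
        assert(!m_keys.empty());
        const std::uintptr_t key = m_keys.front();
        m_keys.pop();
        return key;
    }

private:
    void Post(std::uintptr_t key)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_keys.push(key);
        }
        m_condition.notify_one();
    }

    std::mutex m_mutex;
    std::condition_variable m_condition;
    std::queue<std::uintptr_t> m_keys;
};

// Body of the single maintenance thread. It handles keys in arrival order
// until it sees the shutdown key and returns the keys it handled.
std::vector<std::uintptr_t> MaintenanceLoop(DispatchQueue &queue)
{
    std::vector<std::uintptr_t> forwarded;

    for (;;)
    {
        const std::uintptr_t key = queue.Take();

        if (key == 0)
        {
            break;      // shutdown requested
        }

        forwarded.push_back(key);   // a real pool would dispatch to the work port here
    }

    return forwarded;
}
```

The reserved key is what makes the restriction on 0 necessary: if users could dispatch 0 themselves, the maintenance loop could not tell a work item from a shutdown request.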
Shutting down dormant threads
Often work items come in batches: the thread pool gets busy, expands, services all of the work items and
then becomes less busy. At this point the pool contains threads which aren't being used but which are
still consuming resources. These dormant threads can be safely shut down as the pool can expand again as
load increases. The question is, how do we decide when to shut down some threads?
The maintenance thread that handles our blocking dispatch also handles checking for dormant threads.
At a configurable interval the maintenance thread uses an algorithm to determine if it
should shut some threads down. The current algorithm is as follows:
void CThreadPool::HandleDormantThreads()
{
    if ((size_t)m_activeThreads > m_minThreads)
    {
        const size_t dormantThreads = m_activeThreads - m_processingThreads;

        if (dormantThreads > m_maxDormantThreads)
        {
            const size_t threadsToShutdown =
                (dormantThreads - m_maxDormantThreads) / 2 + 1;

            StopWorkerThreads(threadsToShutdown);
        }
    }
}
If we have more threads than the minimum number we're allowed to have, we find out how many threads
aren't currently processing work items. If that number is more than the number of dormant threads
that we're allowed to have, we shut down half of the excess (rounded down) plus one, so at least one
thread is always stopped. Stopping worker threads is a simple case of posting an IO completion key of 0
to the work port for each worker thread that we want to shut down.
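To make the arithmetic concrete, the calculation can be pulled out into a free function and tried with a few values. `ThreadsToShutdown()` is a hypothetical name used only for this sketch; the formula itself is the one from HandleDormantThreads().

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical free-function version of the dormant thread calculation.
// Returns how many worker threads should be asked to stop.
std::size_t ThreadsToShutdown(
    std::size_t activeThreads,
    std::size_t processingThreads,
    std::size_t minThreads,
    std::size_t maxDormantThreads)
{
    assert(processingThreads <= activeThreads);

    if (activeThreads <= minThreads)
    {
        return 0;   // already at or below the configured minimum
    }

    const std::size_t dormantThreads = activeThreads - processingThreads;

    if (dormantThreads <= maxDormantThreads)
    {
        return 0;   // the number of idle threads is within the allowed limit
    }

    // Half of the excess (integer division) plus one.
    return (dormantThreads - maxDormantThreads) / 2 + 1;
}
```

With 10 active threads, 3 of them processing, a minimum of 5 and at most 2 dormant threads allowed, there are 7 dormant threads, an excess of 5, and the function returns 5 / 2 + 1 = 3. Note that neither this sketch nor the snippet above clamps the result against the minimum thread count; whether StopWorkerThreads() does so is not shown in the article.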
Doing the work
We now have a thread pool that fulfills our requirements of automatic expansion and contraction
depending upon load and non-blocking dispatch for users. The remaining thing to do is allow the derived
class to provide its own WorkerThread class to do the work. The worker thread class must implement the
following interface:
virtual bool Initialise();

virtual void Process(
    ULONG_PTR completionKey,
    DWORD dwNumBytes,
    OVERLAPPED *pOverlapped) = 0;

virtual void Shutdown();
Initialise() is called when the worker thread is first created, Shutdown() is called when
the thread is terminating and Process() is called for each work item.
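The order of these calls can be illustrated with a small portable sketch. Everything here is an assumption made for illustration: the base class is a restatement of the interface with `std::uintptr_t`, `std::uint32_t` and `void *` standing in for `ULONG_PTR`, `DWORD` and `OVERLAPPED *`, the default bodies of Initialise() and Shutdown() are guesses, and `CountingWorker`, `RunWorker()` and `ExampleUsage()` are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical, portable restatement of the worker thread interface.
class WorkerThread
{
public:
    virtual ~WorkerThread() {}

    virtual bool Initialise() { return true; }  // assumed default: succeed

    virtual void Process(
        std::uintptr_t completionKey,
        std::uint32_t dwNumBytes,
        void *pOverlapped) = 0;

    virtual void Shutdown() {}                  // assumed default: do nothing
};

// Example derived worker that only counts how often each method is called.
class CountingWorker : public WorkerThread
{
public:
    bool Initialise() override { ++initialised; return true; }
    void Process(std::uintptr_t, std::uint32_t, void *) override { ++processed; }
    void Shutdown() override { ++shutDown; }

    std::size_t initialised = 0;
    std::size_t processed = 0;
    std::size_t shutDown = 0;
};

// Hypothetical driver that mimics the life of one pool thread: initialise
// once, process each key until the stop key 0 arrives, then shut down.
void RunWorker(WorkerThread &worker, const std::vector<std::uintptr_t> &keys)
{
    if (!worker.Initialise())
    {
        return;     // assumption: a worker that fails to initialise does no work
    }

    for (const std::uintptr_t key : keys)
    {
        if (key == 0)
        {
            break;  // key 0 is the request to stop
        }
        worker.Process(key, 0, nullptr);
    }

    worker.Shutdown();
}

void ExampleUsage()
{
    CountingWorker worker;
    RunWorker(worker, {1, 2, 0});
    assert(worker.initialised == 1);
    assert(worker.processed == 2);
    assert(worker.shutDown == 1);
}
```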
A socket server with a business logic thread pool
Now that we have a suitable thread pool we can integrate it with our fake POP3 socket server so that
the actual processing of commands can occur in the business logic pool whilst the IO pool is left to get
on with the IO operations. We can also move socket closure off to the business logic pool so that we
don't block the IO threads with a lingering socket close.
The first thing we need to do is create and configure our thread pool. We then pass a reference
to it to our socket server class, which in turn passes a reference to it to our IO threads.
CThreadPool pool(
    5,
    5,
    10,
    5,
    5000,
    100,
    10000);

pool.Start();

CSocketServer server(
    INADDR_ANY,
    5001,
    10,
    10,
    1024,
    pool);

server.Start();
When our socket server has a complete, distinct, message to process it can dispatch it
to the thread pool for processing, rather than processing it on one of its IO threads.
void CSocketServer::ProcessCommand(
    CSocketServer::Socket *pSocket,
    CIOBuffer *pBuffer)
{
    pSocket->AddRef();
    pBuffer->AddRef();

    m_pool.Dispatch(reinterpret_cast<ULONG_PTR>(pSocket),
        0, pBuffer->GetAsOverlapped());
}
Since we're passing the socket and IO buffer to another thread we have to increment their reference
counts so that they don't get cleared up from underneath us. Over in our business logic thread we can
finally process the message, and then release the references we took on the socket and IO buffer.
void CThreadPoolWorkerThread::Process(
    ULONG_PTR completionKey,
    DWORD operation,
    OVERLAPPED *pOverlapped)
{
    Socket *pSocket = reinterpret_cast<Socket *>(completionKey);
    CIOBuffer *pBuffer = CIOBuffer::FromOverlapped(pOverlapped);

    ProcessMessage(pSocket, pBuffer);

    pSocket->Release();
    pBuffer->Release();
}
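The reference counting that ProcessCommand() and Process() rely on can be sketched as follows. This is a guess at the general technique, not the code behind the Socket or CIOBuffer classes: `RefCounted` is a hypothetical class, it uses `std::atomic` where the original may well use something else, and the `destroyed` flag exists only so that the moment of destruction can be observed.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical intrusive reference count. The object starts with one
// reference, owned by whoever created it, and deletes itself when the
// last reference is released.
class RefCounted
{
public:
    explicit RefCounted(bool &destroyed)
        : m_refs(1), m_destroyed(destroyed)
    {
    }

    void AddRef()
    {
        m_refs.fetch_add(1, std::memory_order_relaxed);
    }

    void Release()
    {
        const long previous = m_refs.fetch_sub(1, std::memory_order_acq_rel);
        assert(previous > 0);

        if (previous == 1)
        {
            delete this;    // that was the last reference
        }
    }

private:
    // Private so that the object can only be destroyed through Release().
    ~RefCounted() { m_destroyed = true; }

    std::atomic<long> m_refs;
    bool &m_destroyed;
};

void ExampleUsage()
{
    bool destroyed = false;
    RefCounted *pBuffer = new RefCounted(destroyed);

    pBuffer->AddRef();      // taken before the dispatch to the other thread
    pBuffer->Release();     // the IO thread has finished with the buffer
    assert(!destroyed);     // still alive for the business logic thread

    pBuffer->Release();     // the business logic thread has finished too
    assert(destroyed);
}
```

The extra reference taken before the dispatch is what keeps the object alive between the IO thread letting go of it and the business logic thread picking it up.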
Since the socket class marshals all IO requests back to the IO thread pool we can safely make read
and write requests from our business logic thread even though the thread may be terminated before the
IO requests complete.
Maintaining per-connection state
The final thing that our server may need to do is associate some internal server state with a
particular socket connection. The Socket class makes this particularly easy as it provides
the following member functions:
void *GetUserPtr() const;
void SetUserPtr(void *pData);
unsigned long GetUserData() const;
void SetUserData(unsigned long data);
These provide access to a single void * user data pointer which is stored in the Socket. The common
usage pattern for this user data is as follows. When the connection is established the socket server is
notified by OnConnectionEstablished(). The server can allocate a new per-connection data structure and
associate it with the socket passed to OnConnectionEstablished() by calling SetUserPtr(). In subsequent
read and write completions the pointer to the per-connection user data structure can be extracted with
GetUserPtr(). When the connection is terminated the server is notified by OnConnectionClosed() and the
per-connection user data can be retrieved and deleted.
Although there are two versions of the user data access functions, one for a void * and
one for an unsigned long, there is only a single storage location. The two versions are
merely for convenience and to reduce casting if the user data required is simply an index into an
internal server structure rather than a pointer.
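One way to provide two sets of accessors over a single storage location is sketched below. The revision history mentions a COpaqueUserData class, but its implementation is not shown in the article, so `OpaqueUserData` here is a hypothetical reconstruction. It assumes that a `void *` is at least as wide as an `unsigned long` and that an integer converted to a pointer and back keeps its value, which holds on mainstream platforms but is implementation-defined.

```cpp
#include <cassert>
#include <cstdint>

// The single pointer must be wide enough to hold any unsigned long value.
static_assert(sizeof(void *) >= sizeof(unsigned long),
    "void * must be able to hold an unsigned long");

// Hypothetical holder with one storage location and two views of it.
class OpaqueUserData
{
public:
    void *GetUserPtr() const
    {
        return m_pUserData;
    }

    void SetUserPtr(void *pData)
    {
        m_pUserData = pData;
    }

    unsigned long GetUserData() const
    {
        return static_cast<unsigned long>(
            reinterpret_cast<std::uintptr_t>(m_pUserData));
    }

    void SetUserData(unsigned long data)
    {
        m_pUserData = reinterpret_cast<void *>(
            static_cast<std::uintptr_t>(data));

        assert(GetUserData() == data);  // the value must survive the round trip
    }

private:
    void *m_pUserData = nullptr;        // the only storage
};
```

Because both setters write the same member, storing an index with SetUserData() overwrites any pointer stored earlier with SetUserPtr(), and vice versa.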
The example server marshals the OnConnectionEstablished() and OnConnectionClosed() calls across to
the business logic thread pool and maintains some fairly trivial per-connection user data there. The
data we maintain is the address of the client connection (obtained from the buffer passed into
OnConnectionEstablished()) and the number of messages that have been processed on this particular
connection.
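The life cycle of such per-connection data might look like the sketch below. All of the names and signatures are assumptions made for illustration: `SketchSocket` exposes only the user pointer accessors, `PerConnectionData` is a hypothetical structure, and the three handlers are free functions that take the client address as a string instead of the buffer that the real notification passes.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical minimal socket that exposes only the user pointer accessors.
class SketchSocket
{
public:
    void *GetUserPtr() const { return m_pUserData; }
    void SetUserPtr(void *pData) { m_pUserData = pData; }

private:
    void *m_pUserData = nullptr;
};

// Hypothetical per-connection state: client address and message count.
struct PerConnectionData
{
    explicit PerConnectionData(const std::string &clientAddress)
        : address(clientAddress), messagesProcessed(0)
    {
    }

    std::string address;
    std::size_t messagesProcessed;
};

// Allocate the state when the connection is established.
void HandleConnectionEstablished(SketchSocket &socket, const std::string &address)
{
    socket.SetUserPtr(new PerConnectionData(address));
}

// Update the state for each message and return the running total.
std::size_t HandleMessage(SketchSocket &socket)
{
    PerConnectionData *pData =
        static_cast<PerConnectionData *>(socket.GetUserPtr());

    assert(pData != nullptr);

    return ++pData->messagesProcessed;
}

// Retrieve and delete the state when the connection is closed.
void HandleConnectionClosed(SketchSocket &socket)
{
    delete static_cast<PerConnectionData *>(socket.GetUserPtr());
    socket.SetUserPtr(nullptr);
}
```

Clearing the pointer after deleting the data is a defensive choice made in this sketch so that a late call cannot touch freed memory; the article does not say whether the example server does the same.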
The complete example
The shell of a POP3 server which performs its business logic processing in a separate thread pool to
its IO can be downloaded from here. The server has a call to ::Sleep() within its message processing
code so that the processing takes some time and blocks. Notice how the IO on other connections is
unaffected by this and, if you want, add a similar call to the server we developed at the end of the
last article and compare the behavior.
As with the other examples, simply telnet to localhost 5001 to test the server. The server runs
until a named event is set and then shuts down. The very simple Server Shutdown program, available
here, provides the off switch.
Revision history
- 21st May 2002 - Initial revision.
- 27th May 2002 - Added pause/resume functionality to all servers and the server shutdown program. Use CSocket to protect from resource leaks when creating the listening socket. Refactored the Socket and CIOBuffer classes so that common list management code is now in CNodeList and common user data code is now in COpaqueUserData.
- 29th May 2002 - Linting and general code cleaning
- 18th June 2002 - Removed call to ReuseAddress() during the creation of the listening socket as it is not required - Thanks to Alun Jones for pointing this out to me.
- 28th June 2002 - Adjusted how we handle socket closure.
- 30th June 2002 - Removed the requirement for users to subclass the socket server's worker thread class. All of the work can now be done by simply subclassing the socket server class.
- 15th July 2002 - Socket closure notifications now occur when the server shuts down whilst there are active connections. SocketServer can now be set to ensure read and write packet sequences.
- 23rd July 2002 - Bug fix to CSocketServer::ProcessDataStream(). We were reusing the buffer when we shouldn't have been. Code was fine up until the changes on 30th June and is fine again now. Thanks to an anonymous CodeProject reader for pointing this out to me.
- 12th August 2002 - Removed the race condition in socket closure - Thanks to David McConnell for pointing this out. Derived class can receive connection reset and connection error notifications. Socket provides a means to determine if send/receive are connected. Dispatch to the thread pool now uses shared enums rather than hard coded constant values. General code cleaning and lint issues.
Other articles in the series
- A reusable socket server class
- Business logic processing in a socket server
- Speeding up socket server connections with AcceptEx
- Handling multiple pending socket read and write operations
- Testing socket servers with C# and .Net
- A high performance TCP/IP socket server COM component for VB