
Work queue with std or Boost threads

15 Nov 2011, CPOL
Worker thread example with std or Boost threads and C++.

Introduction

An asynchronous work queue processes requests independently of the calling thread. Requests are pushed onto a queue whose access is protected by a mutex, and one or more worker threads pop items from the queue and handle them. While a worker thread processes an item, the mutex is unlocked so that calling threads can still access the queue. A condition variable signals that items are waiting to be processed. As an optimization, the condition variable is only signaled when all worker threads are idle.
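The mechanism described above can be sketched with std threads roughly as follows. This is a minimal illustration, not the KBaseWorkQueue source: the names SketchWorkQueue and SumDemo are hypothetical, the handler is passed as a std::function instead of a virtual function, there is no size limit, and for simplicity it notifies on every Add rather than applying the idle-worker optimization.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal work queue; not the article's KBaseWorkQueue.
template <typename T>
class SketchWorkQueue
{
public:
   SketchWorkQueue(std::size_t nThreads, std::function<void(const T&)> handler)
      : m_handler(std::move(handler))
   {
      for (std::size_t i = 0; i != nThreads; ++i)
         m_workers.emplace_back([this] { Run(); });
   }

   ~SketchWorkQueue() { Stop(); }

   // Called by producers: push under the lock, then wake one worker.
   void Add(const T& item)
   {
      {
         std::lock_guard<std::mutex> lock(m_mutex);
         m_items.push_back(item);
      }
      m_cond.notify_one();
   }

   // Lets the workers drain the remaining items, then joins them.
   void Stop()
   {
      {
         std::lock_guard<std::mutex> lock(m_mutex);
         m_stop = true;
      }
      m_cond.notify_all();
      for (auto& worker : m_workers)
         if (worker.joinable())
            worker.join();
   }

private:
   void Run()
   {
      std::unique_lock<std::mutex> lock(m_mutex);
      for (;;)
      {
         m_cond.wait(lock, [this] { return m_stop || !m_items.empty(); });
         if (m_items.empty())
            return;              // stop requested and nothing left to do
         T item = m_items.front();
         m_items.pop_front();
         lock.unlock();          // queue stays accessible while processing
         m_handler(item);
         lock.lock();
      }
   }

   std::function<void(const T&)> m_handler;
   std::mutex                    m_mutex;
   std::condition_variable       m_cond;
   std::deque<T>                 m_items;
   bool                          m_stop = false;
   std::vector<std::thread>      m_workers;
};

// Usage: two workers add up 50 items of value 2.
int SumDemo()
{
   std::atomic<int> sum{0};
   SketchWorkQueue<int> queue(2, [&sum](const int& n) { sum += n; });
   for (int i = 0; i != 50; ++i)
      queue.Add(2);
   queue.Stop();
   assert(sum == 100);           // nothing is dropped: this sketch is unbounded
   return sum;
}
```

Copying the item out of the queue before unlocking is what allows producers to keep calling Add while a slow handler runs.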


In typical producer-consumer scenarios, the queue fills up when the consumer cannot handle requests as fast as the producer creates them, and eventually it overflows. This class offers a virtual function 'Overflow' that is called when the number of items reaches the maximum queue size. The default implementation drops the item, but an override could use another strategy (e.g., prioritize items in the queue and drop other items).
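To make the overflow hook concrete, here is a small single-threaded sketch. BoundedBuffer, DropOldestBuffer and FrontAfterOverflow are hypothetical names, not part of KBaseWorkQueue, and the mutex is left out so that only the overflow logic remains. The default hook drops the incoming item; the derived class shows one alternative strategy, evicting the oldest item instead.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical bounded buffer with an overridable overflow hook.
template <typename T>
class BoundedBuffer
{
public:
   explicit BoundedBuffer(std::size_t nMaxSize) : m_nMaxSize(nMaxSize) {}
   virtual ~BoundedBuffer() = default;

   void Add(const T& item)
   {
      if (m_items.size() >= m_nMaxSize)
         Overflow(item);         // runs in the context of the caller
      else
         m_items.push_back(item);
   }

   std::size_t Size() const  { return m_items.size(); }
   const T&    Front() const { return m_items.front(); }

protected:
   // Default strategy: drop the incoming item.
   virtual void Overflow(const T&) {}

   std::deque<T> m_items;
   std::size_t   m_nMaxSize;
};

// Alternative strategy: evict the oldest item to make room for the new one.
template <typename T>
class DropOldestBuffer : public BoundedBuffer<T>
{
public:
   explicit DropOldestBuffer(std::size_t nMaxSize) : BoundedBuffer<T>(nMaxSize) {}

protected:
   void Overflow(const T& item) override
   {
      if (this->m_items.empty())
         return;                 // nMaxSize == 0: nothing to evict
      this->m_items.pop_front();
      this->m_items.push_back(item);
   }
};

// Adds 1..5 and returns the oldest item that survived.
int FrontAfterOverflow(BoundedBuffer<int>& buffer)
{
   for (int i = 1; i <= 5; ++i)
      buffer.Add(i);
   assert(buffer.Size() > 0);
   return buffer.Front();
}
```

With a maximum size of 3, the default buffer keeps 1, 2, 3 and the drop-oldest variant keeps 3, 4, 5.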

The class presented here is named KBaseWorkQueue. The following sections list its most important parts.

Template parameter

  • T: the item type. Items are copied into and out of the queue. Use a shared_ptr if copying the item is too expensive (e.g., video images).
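As an illustration of the shared_ptr advice, the sketch below queues a pointer to a large payload instead of the payload itself. Frame, FramePtr and QueueSharesFrame are hypothetical names, and a plain std::deque stands in for the work queue's internal container; with the class from this article the item type would presumably be KBaseWorkQueue<FramePtr>.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical heavy payload, e.g. one video frame.
struct Frame
{
   std::vector<std::uint8_t> pixels;
};

// Pointer-to-const: workers can read the frame but not modify it.
using FramePtr = std::shared_ptr<const Frame>;

// Returns true if the item popped from the queue refers to the same Frame
// object that was pushed, i.e. the pixels were never copied.
bool QueueSharesFrame()
{
   auto writable = std::make_shared<Frame>();
   writable->pixels.assign(1920 * 1080 * 3, 0);
   FramePtr frame = std::move(writable);

   std::deque<FramePtr> queue;   // stand-in for the work queue's container
   queue.push_back(frame);       // copies the pointer, not the pixels
   FramePtr popped = queue.front();
   queue.pop_front();

   assert(frame.use_count() == 2);   // 'frame' and 'popped' share one Frame
   return popped.get() == frame.get();
}
```

Each copy into or out of the queue then costs one reference-count update rather than a copy of several megabytes.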

Data members

  • nThreads: specifies the number of worker threads.
  • nMaxSize: max queue size before an overflow is signaled.

Member functions

  • Add: function to add an item to the work queue
  • Process: pure virtual function which must be overridden to process items. Note that the mutex is unlocked when this function is called (in the context of the worker thread).
  • Overflow: called in the context of the calling thread in overflow cases.

Background

This article leans heavily on the work queue example in section 7.2 of Butenhof's excellent book 'Programming with POSIX Threads'. Butenhof uses pthreads, whereas this implementation uses std or Boost.Threads and should therefore be platform independent. Boost.Threads is inspired by many pthreads concepts; std threads are in turn inspired by Boost.

Several articles on www.codeproject.com already describe worker queues.

Using the Code

Example:

//
// Test queue (uses STL version)
//
#include "kbaseworkqueue.hpp"
#include <atomic>
#include <assert.h>


class TestQueue : public KBaseWorkQueue<int>
{
   typedef KBaseWorkQueue<int> base;

public:
   TestQueue(size_t nThreads, size_t nMaxSize)
      : base(nThreads, nMaxSize)
      , m_n(0)
   {}

   virtual void Process(const int& rn) override
   {
      m_n += rn;
   }
   
   std::atomic<int>  m_n;  // atomic only necessary if more than 1 thread is used
};


int main()
{
   const size_t nThreads = 2;
   const size_t nQueue   = 10;

   TestQueue queue{nThreads, nQueue};

   for (size_t n = 0; n != 50; ++n)
   {
      queue.Add(2);
   }

   queue.Stop();

   // Items that overflow the queue may be dropped, so the sum is at most 50 * 2 = 100.
   assert(queue.m_n <= 100);
   
   return 0;
}

Room for Improvement

Multi-threaded code is always more complex than single-threaded code. I have stress-tested this class on a quad-core machine without noticing any problems, but tests cannot prove the absence of bugs. It would also be better to wrap the unlock/lock call sequence around the 'Process' call in a 'scope' object. In my private implementation I have extended the work queue with a 'drop' or 'wait' policy for the case where the work queue overflows.
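One possible shape for such a 'scope' object is sketched below. ScopedUnlock and RelockedAfterThrow are hypothetical names and this is not code from the attached source; it assumes the worker holds its mutex through a std::unique_lock. The object unlocks in its constructor and re-locks in its destructor, so the lock is restored even when processing throws.

```cpp
#include <cassert>
#include <mutex>
#include <stdexcept>

// Hypothetical helper: releases the lock for the lifetime of the object.
class ScopedUnlock
{
public:
   explicit ScopedUnlock(std::unique_lock<std::mutex>& lock) : m_lock(lock)
   {
      m_lock.unlock();
   }

   // Re-acquires the mutex, also during stack unwinding.
   ~ScopedUnlock() { m_lock.lock(); }

   ScopedUnlock(const ScopedUnlock&) = delete;
   ScopedUnlock& operator=(const ScopedUnlock&) = delete;

private:
   std::unique_lock<std::mutex>& m_lock;
};

// Simulates a Process call that throws while the mutex is released.
bool RelockedAfterThrow()
{
   std::mutex mutex;
   std::unique_lock<std::mutex> lock(mutex);
   bool wasUnlocked = false;
   try
   {
      ScopedUnlock unlocked(lock);
      wasUnlocked = !lock.owns_lock();   // mutex is free inside this scope
      throw std::runtime_error("Process failed");
   }
   catch (const std::runtime_error&)
   {
      assert(lock.owns_lock());          // destructor re-locked the mutex
   }
   return wasUnlocked && lock.owns_lock();
}
```

In a worker loop, the scope would enclose only the 'Process' call, replacing the explicit unlock and lock calls around it.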

History

  • 13 November 2011 - Original version posted.
  • 28 April 2018 - std version

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)