Introduction
I am currently working on a network communication project developed with the C++ STL and Boost. In any multithreaded program, synchronization is a central concern, and if your program needs to process streaming packets, maintaining a queue between the threads is a good approach.
Background
This is my first time using Boost, and it was not easy to get started because good examples are hard to find. You can find the Boost libraries and documentation at http://www.boost.org/. Here is the advantage of using Boost, taken from its website:
In a word, Productivity. Use of high-quality libraries like Boost speeds initial development, results in fewer bugs, reduces reinvention-of-the-wheel, and cuts long-term maintenance costs. And since Boost libraries tend to become de facto or de jure standards, many programmers are already familiar with them.
In this example I use only the Boost synchronization classes, but the remaining platform-specific calls could also be rewritten with Boost to make the code cross-platform. The Boost synchronization classes look straightforward, but as a beginner I still made some mistakes, so I developed a test project to verify how they behave. Once you understand how to use them, they help simplify your code and reduce bugs.
Using the Code
In this example, I implemented the producer-consumer thread synchronization model: the producer thread creates data and inserts it into the queue, and the consumer thread takes the data out of the queue and processes it. I use a mutex object to keep the two threads synchronized. I try several approaches to the same problem and then compare their advantages and disadvantages.
First, I designed an interface to abstract the synchronized queue model. The ISynchronizedQueue abstract class centers on two methods, add() and get(): add() is used by the producer thread to insert data into the queue, and get() is used by the consumer thread to acquire and remove data from the queue. There are three different implementations of this interface:

- SynchronizedDequeue: a double-ended queue, implemented with the STL deque.
- SynchronizedVector: a ring (circular) queue, implemented with the STL vector.
- SynchronizedVectorNB: the non-blocking version of SynchronizedVector.
Here is the header and interface definition:
#include <windows.h>    // CreateThread, Sleep, GetTickCount (used by the test code)
#include <tchar.h>      // _tmain
#include <iostream>
#include <cstring>      // memset, memcpy
#include <deque>
#include <vector>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>

using namespace std;

#define default_packetsize 1280
class TPacket
{
    int my_size;
    unsigned char my_databuf[default_packetsize];
    unsigned int ID;
public:
    TPacket() { std::memset(my_databuf, 0, sizeof(my_databuf)); my_size = 0; ID = 0; }
    virtual ~TPacket() {}               // virtual: the class has virtual methods
    int GetSize() { return my_size; }
    void SetSize(int size) { my_size = size; }
    unsigned int GetID() { return ID; }
    void SetID(int id) { ID = id; }
    // Copy the payload out; fails if the caller's buffer is too small.
    bool GetData(char* pbuf, int& size)
    {
        if (my_size > size)
            return false;
        size = my_size;
        memcpy(pbuf, my_databuf, my_size);
        return true;
    }
    // Copy the payload in; fails if it does not fit the fixed buffer.
    bool SetData(char* pbuf, int size)
    {
        if (size > default_packetsize)
            return false;
        memcpy(my_databuf, pbuf, size);
        my_size = size;
        return true;
    }
public:
    virtual bool IsValid() { return false; }
    virtual bool Encode() { return false; }
    virtual bool Decode() { return false; }
};
template <class T>
class ISynchronizedQueue
{
public:
    virtual ~ISynchronizedQueue() {}    // virtual dtor for deletion via base pointer
    virtual bool add(T pkt) = 0;        // producer side: insert
    virtual bool get(T& pkt) = 0;       // consumer side: acquire and remove
    virtual bool read(T& pkt) = 0;      // peek without removing
    virtual bool del(T& pkt) = 0;       // remove the front element
    virtual bool clear() = 0;           // empty the queue
};
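Before moving on, here is a minimal single-threaded sketch (mine, not part of the test project) of how the pieces fit together; the payload bytes and sizes are made up for illustration, and SynchronizedDequeue is the first of the implementations shown next:

// Illustrative only: exercise TPacket and the queue interface in one thread.
char payload[6] = "hello";
TPacket pkt;
pkt.SetID(1);
pkt.SetData(payload, sizeof(payload));       // copies into the fixed buffer

SynchronizedDequeue q;                       // any ISynchronizedQueue<TPacket> works
ISynchronizedQueue<TPacket>* pQ = &q;
pQ->add(pkt);                                // producer side

TPacket out;
if (pQ->get(out))                            // consumer side
{
    char buf[default_packetsize];
    int size = sizeof(buf);
    if (out.GetData(buf, size))
        cout << "packet " << out.GetID() << ": " << size << " bytes" << endl;
}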
Let's see the implementations:
class SynchronizedDequeue : public ISynchronizedQueue<TPacket>
{
    boost::mutex m_mutex;
    deque<TPacket> m_queue;
    boost::condition_variable m_cond;   // unused in this polling design (see the sketch below)
public:
    bool add(TPacket pkt)
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        if (m_queue.size() > 100)       // cap growth: discard the backlog if the
            m_queue.clear();            // consumer falls too far behind
        m_queue.push_back(pkt);
        return true;
    }
    bool get(TPacket& pkt)
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        if (m_queue.empty())
            return false;
        pkt = m_queue.front();
        m_queue.pop_front();
        return true;
    }
    bool read(TPacket& pkt)             // peek without removing
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        if (m_queue.empty())
            return false;
        pkt = m_queue.front();
        return true;
    }
    bool del(TPacket& pkt)
    {
        return get(pkt);
    }
    bool clear()
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        m_queue.clear();
        return true;
    }
};
SynchronizedDequeue has a dynamic queue size. The advantage is that, as long as the backlog stays under the cap, no data is lost: everything the producer inserts is eventually processed by the consumer. The disadvantage is the memory management overhead: the deque allocates memory when a packet is inserted and releases it when the data is handed back to the consumer thread. That many allocations and deallocations can fragment the heap and slow down memory reclamation for larger objects in the same process.
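Note that the consumer in this design has to poll get() in a loop (with a Sleep() between attempts, as shown later). If you would rather have the consumer sleep until data arrives, the unused m_cond member above can be put to work. The following is only a sketch of that variant, under my own class name, and is not part of the test project:

// Sketch of a blocking variant: the consumer waits on the condition
// variable instead of polling. BlockingDequeue is a hypothetical name.
class BlockingDequeue
{
    boost::mutex m_mutex;
    std::deque<TPacket> m_queue;
    boost::condition_variable m_cond;
public:
    void add(TPacket pkt)
    {
        {
            boost::lock_guard<boost::mutex> lock(m_mutex);
            m_queue.push_back(pkt);
        }
        m_cond.notify_one();            // wake one waiting consumer
    }
    void get_wait(TPacket& pkt)
    {
        boost::unique_lock<boost::mutex> lock(m_mutex);
        while (m_queue.empty())         // loop guards against spurious wakeups
            m_cond.wait(lock);          // releases the mutex while sleeping
        pkt = m_queue.front();
        m_queue.pop_front();
    }
};

Next is the ring-buffer implementation: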
class SynchronizedVector : public ISynchronizedQueue<TPacket>
{
    int queue_size;
    boost::mutex m_mutex;
    std::vector<TPacket> my_vector;
    int start, end;                     // ring indices: start == end means empty
public:
    SynchronizedVector(int q_size = 100)
    {
        queue_size = q_size;
        start = end = 0;
        my_vector.assign(queue_size, TPacket());    // all memory allocated up front
    }
    bool add(TPacket pkt)
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        my_vector[end++] = pkt;
        if (end >= queue_size)          // wrap the write index
            end = 0;
        if (end == start)               // ring full: drop the oldest packet
            start = end + 1;
        if (start >= queue_size)
            start = 0;
        return true;
    }
    bool get(TPacket& pkt)
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        if (start == end)               // empty
            return false;
        pkt = my_vector[start++];
        if (start >= queue_size)        // wrap the read index
            start = 0;
        return true;
    }
    bool read(TPacket& pkt)             // not supported in this implementation
    {
        return false;
    }
    bool del(TPacket& pkt)              // not supported in this implementation
    {
        return false;
    }
    bool clear()
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        start = end = 0;
        return true;
    }
};
SynchronizedVector uses a fixed-size ring buffer to avoid memory management overhead, but when the ring is full the producer pushes out the oldest data that has not been processed in time. Note that because start == end marks the empty state, a ring created with size N holds at most N - 1 packets.
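To make that overwrite behavior concrete, here is a small illustrative snippet (assuming the classes above are in scope):

// Illustrative only: a ring created with size 4 stores at most 3 packets.
SynchronizedVector q(4);
TPacket p;
for (int id = 1; id <= 5; id++)     // insert 5 packets into a 3-slot window
{
    p.SetID(id);
    q.add(p);
}
while (q.get(p))                    // prints 3, 4, 5: the two oldest were dropped
    cout << "got packet " << p.GetID() << endl;

The non-blocking variant follows: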
class SynchronizedVectorNB : public ISynchronizedQueue<TPacket>
{
    int queue_size;
    boost::mutex m_mutex;
    std::vector<TPacket> my_vector;
    int start, end;
public:
    SynchronizedVectorNB(int q_size = 100)
    {
        queue_size = q_size;
        start = end = 0;
        my_vector.assign(queue_size, TPacket());
    }
    bool add(TPacket pkt)
    {
        // try_to_lock: fail immediately instead of blocking on a busy mutex
        boost::unique_lock<boost::mutex> lock(m_mutex, boost::try_to_lock);
        if (!lock.owns_lock())
            return false;
        my_vector[end++] = pkt;
        if (end >= queue_size)
            end = 0;
        if (end == start)               // ring full: drop the oldest packet
            start = end + 1;
        if (start >= queue_size)
            start = 0;
        return true;
    }
    bool get(TPacket& pkt)
    {
        boost::unique_lock<boost::mutex> lock(m_mutex, boost::try_to_lock);
        if (!lock.owns_lock())
            return false;
        if (start == end)
            return false;
        pkt = my_vector[start++];
        if (start >= queue_size)
            start = 0;
        return true;
    }
    bool read(TPacket& pkt)             // not supported in this implementation
    {
        return false;
    }
    bool del(TPacket& pkt)              // not supported in this implementation
    {
        return false;
    }
    bool clear()
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        start = end = 0;
        return true;
    }
};
SynchronizedVectorNB never blocks the producer or the consumer thread: if the mutex is busy, the call fails immediately instead of waiting. The advantage is that when other work must be done in the same loop as the queue access, the non-blocking version keeps the loop's response time bounded.
The two queues above may block a thread while it waits to acquire the mutex; if the thread owning the mutex stalls or dies while holding it, the waiting thread is blocked along with it. The disadvantage of the non-blocking version is that adding data to the queue can fail simply because the lock was unavailable, so the caller has to submit the same data again.
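A simple retry pattern for the producer side could look like the sketch below; it is illustrative only (ProducerLoop is my own name, and the "other work" is a placeholder), but it shows how an unsent packet is kept and retried rather than dropped:

// Illustrative retry pattern for the non-blocking queue.
void ProducerLoop(SynchronizedVectorNB& q)
{
    TPacket pkt;
    bool pending = false;           // a packet produced but not yet queued
    int id = 0;
    while (id < 1000 || pending)
    {
        if (!pending)               // only produce once the last packet went through
        {
            pkt = TPacket();
            pkt.SetID(++id);
            pending = true;
        }
        if (q.add(pkt))             // fails if the consumer holds the lock right now
            pending = false;        // queued; produce a new packet next pass
        // ... other time-critical work can run here on every pass,
        // because add() never blocks
    }
}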
Here is the sample code for the producer thread:
DWORD WINAPI ProducerServerThread(LPVOID lpParam)
{
    int count = 0;
    ISynchronizedQueue<TPacket>* pQ = (ISynchronizedQueue<TPacket>*)lpParam;
    TPacket pkt;
    LOG("\n-------------------------Producer thread begin-----------------------");
    while (1)
    {
        DWORD t1 = GetTickCount();
        Sleep(50);
        if (count++ >= 1000)
            break;
        pkt = TPacket();                // reset; memset would clobber the vtable pointer
        pkt.SetID(count);
        if (pQ->add(pkt))
            LOG("Add PACKET ID = %d ", pkt.GetID());
        else
            LOG("Add Packet Failed");
        DWORD t2 = GetTickCount();
        LOG("ONE-LOOP DURATION = %d", t2 - t1);
    }
    LOG("\n-------------------------Producer thread end-----------------------");
    return 0;
}
Here is the sample code for the consumer thread:
DWORD WINAPI ConsumerServerThread(LPVOID lpParam)
{
    int count = 0;
    ISynchronizedQueue<TPacket>* pQ = (ISynchronizedQueue<TPacket>*)lpParam;
    TPacket pkt;
    LOG("\n-------------------------Consumer thread begin-----------------------");
    while (1)
    {
        Sleep(10);
        if (count++ >= 1200)
            break;
        if (pQ->get(pkt))
            LOG("Get Packet ID = %d", pkt.GetID());
        else
            LOG("Get Packet Failed");
    }
    LOG("\n-------------------------Consumer thread end-----------------------");
    return 0;
}
Here is the sample code for the main thread:
SynchronizedDequeue m_q[5];

int _tmain(int argc, _TCHAR* argv[])
{
    int thread_count = 5;
    HANDLE server_threads[10];
    for (int i = 0; i < thread_count; i++)
    {
        server_threads[i] = CreateThread(
            NULL,                   // default security attributes
            0,                      // default stack size
            ProducerServerThread,
            &m_q[i],                // each producer gets its own queue
            0,                      // run immediately
            NULL);
        if (server_threads[i] == NULL)
        {
            LOG("Create Thread failed: %d\n", GetLastError());
            return 0;
        }
    }
    for (int i = 0; i < thread_count; i++)
    {
        server_threads[i + thread_count] = CreateThread(
            NULL,
            0,
            ConsumerServerThread,
            &m_q[i],                // paired with producer i through queue i
            0,
            NULL);
        if (server_threads[i + thread_count] == NULL)
        {
            LOG("Create Thread failed: %d\n", GetLastError());
            return 0;
        }
    }
    DWORD retval = WaitForMultipleObjects(
        2 * thread_count,
        server_threads,
        TRUE,                       // wait for all threads to finish
        INFINITE);
    if ((retval == WAIT_FAILED) || (retval == WAIT_TIMEOUT))
    {
        LOG("WaitForMultipleObjects failed: %d\n", GetLastError());
        return 0;
    }
    return 0;
}
In the test code, I create five producers, five consumers, and five queues. Each producer is paired with one consumer through a shared queue. Using the packet IDs, you can verify that every packet produced is processed in order by its consumer thread. You can define the LOG macro yourself; I use a thread-safe LOG macro that prints a timestamp, which makes the per-thread timing easy to see.
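My LOG macro is not included in the listing; the sketch below is one possible minimal implementation that produces a similar format (time, thread ID, function name), assuming the critical section is initialized once at startup. It is not the macro that produced the output shown here.

// One possible minimal LOG macro (illustrative, not the one used below).
// Call InitializeCriticalSection(&g_log_cs) once, e.g. at the top of _tmain().
CRITICAL_SECTION g_log_cs;
#define LOG(fmt, ...)                                                   \
    do {                                                                \
        SYSTEMTIME st;                                                  \
        GetLocalTime(&st);                                              \
        EnterCriticalSection(&g_log_cs);                                \
        printf("%02u:%02u:%02u:%03u %lu info: %s: " fmt "\n",           \
               st.wHour, st.wMinute, st.wSecond, st.wMilliseconds,      \
               (unsigned long)GetCurrentThreadId(), __FUNCTION__,       \
               ##__VA_ARGS__);  /* ## swallows the comma when no args */\
        LeaveCriticalSection(&g_log_cs);                                \
    } while (0)

Sample log output: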
19:33:50:106 5972 info: ConsumerServerThread: Get Packet Failed
19:33:50:106 4244 info: ConsumerServerThread: Get Packet Failed
19:33:50:122 5808 info: ConsumerServerThread: Get Packet Failed
19:33:50:122 8464 info: ConsumerServerThread: Get Packet Failed
19:33:50:122 7760 info: ProducerServerThread: Add PACKET ID = 1
19:33:50:122 7416 info: ConsumerServerThread: Get Packet ID = 1
19:33:50:122 7760 info: ProducerServerThread: ONE-LOOP DURATION = 63
19:33:50:138 5808 info: ConsumerServerThread: Get Packet Failed
19:33:50:138 5972 info: ConsumerServerThread: Get Packet Failed
19:33:50:138 8464 info: ConsumerServerThread: Get Packet Failed
19:33:50:138 4244 info: ConsumerServerThread: Get Packet Failed
19:33:50:138 8268 info: ProducerServerThread: Add PACKET ID = 1
19:33:50:138 7416 info: ConsumerServerThread: Get Packet Failed
19:33:50:138 8268 info: ProducerServerThread: ONE-LOOP DURATION = 62
19:33:50:153 4244 info: ConsumerServerThread: Get Packet ID = 1
19:33:50:153 5808 info: ConsumerServerThread: Get Packet Failed
19:33:50:153 8836 info: ProducerServerThread: Add PACKET ID = 1
19:33:50:153 7352 info: ProducerServerThread: Add PACKET ID = 1
19:33:50:153 8464 info: ConsumerServerThread: Get Packet Failed
19:33:50:153 5972 info: ConsumerServerThread: Get Packet ID = 1
19:33:50:153 8836 info: ProducerServerThread: ONE-LOOP DURATION = 63
19:33:50:153 7352 info: ProducerServerThread: ONE-LOOP DURATION = 63
After testing the three synchronized queues with five producer-consumer thread pairs, each adding and getting 1000 packets, their performance is basically the same; the logging itself costs around 10 ms per call. You can modify the test to see how the three queue types behave with a larger data set, a longer running time, or larger per-packet memory allocations.
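To test a different implementation, only the type of the global queue array needs to change; for example:

// Pick one implementation for the global queues used by the test threads:
SynchronizedDequeue      m_q[5];   // dynamic size, blocking
// SynchronizedVector    m_q[5];   // fixed-size ring, blocking
// SynchronizedVectorNB  m_q[5];   // fixed-size ring, non-blocking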
Points of Interest
It was kind of fun to use a little bit of the C++ Boost library for the first time.
History
- First version, 08/17/2012.
- Second version, 08/19/2012: Updated the article with more explanation.
- Third version, 10/28/2014: Updated the code to support templates.