Motivation
Many communication applications, such as proxy servers, have one main control thread and several data processing threads. The control thread is responsible for the configuration and management of the data threads. The application can hold a set (pool) of data threads; the pool size usually matches the number of CPU cores. Each data thread performs a time-critical job. For example, a data thread reads messages from a configurable list of sockets, parses them using a dictionary, processes them and passes them on to another thread that sends them out. In this scenario the data thread accesses its configurable data (for example, the socket list or the parsing dictionary) almost exclusively: most of the time it is the only thread that touches the configurable data. The main control thread modifies the thread's configurable data only in rare reconfiguration/shutdown events. It would be extremely inefficient to guard the thread data with a mutex all the time. The guard is not required most of the time, yet the costly mutex acquisition would be performed each time the data thread reads a message from the socket list or processes it using the dictionary.
This “conflict of interest” can be solved in many ways.
The pause/resume functionality is one reusable solution.
Interface Description
The class working_thread is a cross-platform (Windows/Linux) implementation of the pause/resume functionality. Unlike BOOST/C++11 threads, the class follows the classical approach to thread class implementation: a derived class must implement a pure virtual function that is called in the thread context.
class working_thread
{
public:
    enum state_type
    {
        init, paused, running, completed
    };
public:
    working_thread
    (
        priority_type p=priority_type(),
        size_t stack_size=DEFAULT_STACK_VALUE
    );
    void stop(bool force_interrupt=false);
    bool start();
    bool pause();
    bool resume();
    void join();
    handle_type handle();
    id_type id() const;
    state_type state() const;
protected:
    virtual bool action()=0;
    virtual void on_start(){}
    virtual void on_exit(){}
    virtual void on_interrupt(){}
    bool is_interrupted() const;
};
The constructor creates an inactive thread object with the given priority and stack size. The start() method launches a new system thread. The stop() method stops the thread, the pause() method puts the thread to sleep and the resume() method wakes it up. The join() method waits until the thread's main function is completed. A derived class must implement the pure virtual method action(), which is called repeatedly in the context of the running system thread. The method must return true to be called again. If it returns false, the thread's main function exits. The derived class can also implement two other virtual methods, on_start() and on_exit(), which are called when the thread starts and stops, respectively.
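The following self-contained sketch makes the call sequence concrete. It does not use the article's working_thread. mini_thread and countdown_thread are hypothetical names. They are built on C++11 std::thread with no pause/resume or stop support, and they only mirror the on_start()/action()/on_exit() cycle described above.

```cpp
#include <thread>

// Hypothetical stand-in for working_thread: start()/join() and the action()
// loop only (no pause/resume, no stop requests).
class mini_thread
{
public:
    virtual ~mini_thread() {}
    void start() { m_thread = std::thread(&mini_thread::main, this); }
    void join() { if (m_thread.joinable()) m_thread.join(); }

protected:
    virtual bool action() = 0;   // return true to be called again
    virtual void on_start() {}
    virtual void on_exit() {}

private:
    void main()
    {
        on_start();
        while (action())         // false ends the thread's main function
        {
        }
        on_exit();
    }
    std::thread m_thread;
};

// Processes a fixed number of "messages", then asks to finish.
class countdown_thread : public mini_thread
{
public:
    explicit countdown_thread(int n) : m_remaining(n) {}
    ~countdown_thread() override { join(); }  // finish before members are destroyed

    int calls = 0;
    bool started = false;
    bool exited = false;

protected:
    bool action() override
    {
        ++calls;
        return --m_remaining > 0;            // false on the n-th call
    }
    void on_start() override { started = true; }
    void on_exit() override { exited = true; }

private:
    int m_remaining;
};
```

With countdown_thread(5), action() returns false on its fifth call. The thread's main function then runs on_exit() and ends by itself, so start() followed by join() observes exactly five calls.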
All of the start(), stop(), pause() and resume() methods are blocking: each one waits until the corresponding operation is completed. The void stop() method returns after the system thread's main function is over. The bool start() method returns once the thread object is in the running state. It also returns if it finds that the thread cannot be started because the thread's main function is already over, and in that case it returns false. The same applies to the pause() and resume() methods. The pause() method puts the thread's main function to sleep and waits until the operation is completed; if the thread cannot be put to sleep, the method returns false. The resume() method wakes the thread's main function up and waits until the operation is completed; if the operation cannot be completed, the method returns false. The only reason to return false is that the thread's main function is already over and the thread object is in the completed state.
The interface has a few advantages:
- The control thread can “pause” the data thread before changing the configurable data. It is safe to change the thread's configurable data while the data thread is asleep, and no mutex lock is needed.
- All exposed methods are blocking. This avoids many surprises caused by race conditions and invalid memory access (for example, when the thread object is destroyed while the thread's main function is still running).
The pause/resume facility requires certain cooperation from the derived class. It imposes the following requirements on the action() implementation:
- The action() method controls the thread execution by returning true or false. If the method returns true, it will be called again. If it returns false, the thread's main function exits.
- The action() method must return promptly once the is_interrupted() flag is set to true. Returning does not necessarily mean that the thread's main function will exit: a paused thread calls action() again after it is resumed.
The action() method can meet the second requirement in different ways. It can periodically check the is_interrupted() flag. The checking period determines how long the stop() and pause() methods have to wait. For example (in pseudocode):
virtual bool action()
{
    while(!is_interrupted())
    {
        select(socket_list, time_out_value);
        if(data_arrived)
        {
            ...
        }
    }
    return true;
}
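Below is one runnable way to express the polling approach. It is a sketch under several assumptions that are not in the article. It is POSIX-only, a pipe stands in for the socket list, and poll() with a timeout replaces select(). polling_worker and interrupt_latency_ms are hypothetical names. The sketch shows that the time stop() or pause() has to wait is bounded by roughly one poll timeout.

```cpp
#include <atomic>
#include <chrono>
#include <thread>
#include <poll.h>
#include <unistd.h>

// Hypothetical sketch (POSIX only): poll() with a timeout stands in for
// select(), and one pipe stands in for the socket list.
struct polling_worker
{
    std::atomic<bool> interrupted{false};
    std::atomic<int> bytes_read{0};
    int fd;          // read end of the "socket"
    int period_ms;   // how often is_interrupted() is checked while idle

    polling_worker(int read_fd, int period) : fd(read_fd), period_ms(period) {}

    bool is_interrupted() const { return interrupted.load(); }

    bool action()
    {
        while (!is_interrupted())
        {
            pollfd p{fd, POLLIN, 0};
            if (poll(&p, 1, period_ms) > 0 && (p.revents & POLLIN))   // data_arrived
            {
                char buf[64];
                ssize_t n = read(fd, buf, sizeof(buf));
                if (n > 0) bytes_read += static_cast<int>(n);
            }
        }
        return true;   // interrupted, not finished: the thread may be resumed
    }
};

// Sends 5 bytes, then requests interruption and returns how many
// milliseconds the worker took to notice it (about period_ms at most).
long long interrupt_latency_ms(int period_ms, int* received)
{
    int fds[2];
    if (pipe(fds) != 0) return -1;
    polling_worker w(fds[0], period_ms);
    std::thread t([&w] { w.action(); });

    bool sent = write(fds[1], "hello", 5) == 5;
    while (sent && w.bytes_read.load() < 5) std::this_thread::yield();

    auto t0 = std::chrono::steady_clock::now();
    w.interrupted = true;   // what pause()/stop() request
    t.join();               // returns once action() has seen the flag
    auto dt = std::chrono::steady_clock::now() - t0;

    *received = w.bytes_read.load();
    close(fds[0]);
    close(fds[1]);
    return std::chrono::duration_cast<std::chrono::milliseconds>(dt).count();
}
```

The trade-off is visible in period_ms. A shorter timeout makes pause()/stop() return sooner but wakes the data thread more often when there is no traffic.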
Another way is to implement the on_interrupt() virtual method. This callback is invoked each time either the stop() or the pause() method is called. Its implementation should wake up the action() method so that it checks the is_interrupted() flag. For example (in pseudocode):
virtual void on_interrupt()
{
    signal(special_handle);
}
virtual bool action()
{
    while(!is_interrupted())
    {
        select(socket_list, special_handle);
        if(data_arrived)
        {
            ...
        }
    }
    return true;
}
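On POSIX systems, one common way to build the special_handle is the "self-pipe" idiom. The worker blocks in poll() with no timeout on its data descriptors plus the read end of a private pipe. on_interrupt() sets the flag and writes one byte to that pipe. This is a hypothetical sketch: wakeup_worker and demo_wakeup are illustrative names, a pipe stands in for the socket list, and a Windows version would need a different mechanism such as an event object.

```cpp
#include <atomic>
#include <chrono>
#include <thread>
#include <poll.h>
#include <unistd.h>

// Hypothetical sketch (POSIX only) of on_interrupt() with the "self-pipe"
// idiom: the private pipe plays the role of special_handle.
class wakeup_worker
{
public:
    wakeup_worker()
    {
        if (pipe(m_wake) != 0) m_wake[0] = m_wake[1] = -1;
    }
    ~wakeup_worker()
    {
        if (m_wake[0] >= 0) { close(m_wake[0]); close(m_wake[1]); }
    }

    // Called by the control thread from stop()/pause().
    void on_interrupt()
    {
        m_interrupted = true;                 // set the flag first,
        char c = 1;
        ssize_t n = write(m_wake[1], &c, 1);  // then wake up poll()
        (void)n;
    }

    bool is_interrupted() const { return m_interrupted.load(); }

    // data_fd stands in for the socket list.
    bool action(int data_fd)
    {
        while (!is_interrupted())
        {
            pollfd p[2] = {{data_fd, POLLIN, 0}, {m_wake[0], POLLIN, 0}};
            if (poll(p, 2, -1) <= 0) continue;    // -1: wait without a timeout
            char buf[64];
            if (p[1].revents & POLLIN)            // woken by on_interrupt()
            {
                ssize_t n = read(m_wake[0], buf, sizeof(buf));
                (void)n;
            }
            if (p[0].revents & POLLIN)            // data_arrived: process it here
            {
                ssize_t n = read(data_fd, buf, sizeof(buf));
                (void)n;
            }
        }
        return true;
    }

private:
    int m_wake[2];
    std::atomic<bool> m_interrupted{false};
};

// Starts action() on an idle data pipe, then interrupts it. Without the
// wake-up byte, t.join() would block forever because poll() has no timeout.
bool demo_wakeup()
{
    int data[2];
    if (pipe(data) != 0) return false;
    wakeup_worker w;
    std::thread t([&] { w.action(data[0]); });
    std::this_thread::sleep_for(std::chrono::milliseconds(50)); // let it block
    w.on_interrupt();
    t.join();
    close(data[0]);
    close(data[1]);
    return w.is_interrupted();
}
```

No wake-up is lost here. If on_interrupt() runs between the flag check and the poll() call, the pipe is already readable, so poll() returns immediately.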
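In addition, the stop() method (but not pause()) accepts the force_interrupt flag: stop(true). This flag interrupts action() code that is waiting at any of BOOST's thread interruption points. The interrupted call throws boost::thread_interrupted, which the thread's main function catches. The interruption points are: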
boost::thread::join()
boost::thread::timed_join()
boost::thread::try_join_for()
boost::thread::try_join_until()
boost::condition_variable::wait()
boost::condition_variable::timed_wait()
boost::condition_variable::wait_for()
boost::condition_variable::wait_until()
boost::condition_variable_any::wait()
boost::condition_variable_any::timed_wait()
boost::condition_variable_any::wait_for()
boost::condition_variable_any::wait_until()
boost::thread::sleep()
boost::this_thread::sleep_for()
boost::this_thread::sleep_until()
boost::this_thread::interruption_point()
It is highly recommended that the derived class call the stop() method in its destructor. This guarantees that the thread's main function is completed before the derived part of the object is destroyed. That is crucial because the thread's main function calls the class's virtual methods (action(), on_start(), on_exit()). Once the derived destructor has finished, those calls would reach a partially destroyed object. This is why stop() should be called in the derived class and not in the base class working_thread.
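The destruction-order rule can be illustrated with a hypothetical minimal pair of classes built on std::thread. base_thread and summing_thread are illustrative names, not the article's. The derived destructor calls stop() while the object is still a complete summing_thread. As a result, the thread never calls action() on a partially destroyed object or reads m_config after the vector is gone. If neither destructor stopped the thread, destroying the still-joinable std::thread member would also call std::terminate().

```cpp
#include <atomic>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal base: start() launches the loop, stop() requests exit
// and joins. The base destructor deliberately does not call stop().
class base_thread
{
public:
    virtual ~base_thread() {}     // by now the derived part is already gone
    void start() { m_thread = std::thread([this] { while (!m_stop && action()) {} }); }
    void stop()
    {
        m_stop = true;
        if (m_thread.joinable()) m_thread.join();
    }

protected:
    virtual bool action() = 0;

private:
    std::atomic<bool> m_stop{false};
    std::thread m_thread;
};

class summing_thread : public base_thread
{
public:
    explicit summing_thread(std::vector<int> config) : m_config(std::move(config)) {}

    // Stop while m_config is alive and action() still dispatches to this class.
    ~summing_thread() override { stop(); }

    long long total() const { return m_total; }

protected:
    bool action() override
    {
        for (int v : m_config) m_total += v;   // reads derived-class data
        return true;
    }

private:
    std::vector<int> m_config;
    std::atomic<long long> m_total{0};
};
```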
Bulk Operations. Working with the Thread Pool
The blocking interface is not always efficient when the application uses a thread pool. Suppose the action() method checks the is_interrupted() flag once per second. That gives the stop() method a waiting time of up to 1 second, which is acceptable. Now suppose the application has 10 data threads. Calling stop() consecutively for all 10 threads can take up to 10 seconds, which is too long.
The total waiting time can be reduced to about 1 second if the application first signals each thread to exit and then waits until all threads are completed. However, that optimization requires a non-blocking interface that we do not want to expose.
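The arithmetic above can be sketched in code. With n threads that each check their flag once per period, one blocking stop after another can take up to n × period (10 × 1 s in the text). Signaling every thread first and then waiting takes about one period. This sketch uses std::thread and a 100 ms period. slow_worker, make_pool, stop_one_by_one and stop_in_bulk are hypothetical names, and request_stop()/wait() stand in for the hidden non-blocking and blocking halves of stop().

```cpp
#include <atomic>
#include <chrono>
#include <memory>
#include <thread>
#include <vector>

// Hypothetical worker that checks its stop flag only once per period.
class slow_worker
{
public:
    explicit slow_worker(std::chrono::milliseconds period)
        : m_thread([this, period] {
              while (!m_stop) std::this_thread::sleep_for(period);
          }) {}
    ~slow_worker() { request_stop(); wait(); }

    void request_stop() { m_stop = true; }                    // non-blocking half
    void wait() { if (m_thread.joinable()) m_thread.join(); } // blocking half
    bool finished() const { return !m_thread.joinable(); }

private:
    std::atomic<bool> m_stop{false};   // declared first: set up before the thread starts
    std::thread m_thread;
};

using pool_type = std::vector<std::unique_ptr<slow_worker>>;

pool_type make_pool(int n, std::chrono::milliseconds period)
{
    pool_type pool;
    for (int i = 0; i < n; ++i)
        pool.push_back(std::unique_ptr<slow_worker>(new slow_worker(period)));
    return pool;
}

long long elapsed_ms(std::chrono::steady_clock::time_point t0)
{
    return std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::steady_clock::now() - t0).count();
}

// One blocking stop after another: up to n * period in total.
long long stop_one_by_one(pool_type& pool)
{
    auto t0 = std::chrono::steady_clock::now();
    for (auto& w : pool) { w->request_stop(); w->wait(); }
    return elapsed_ms(t0);
}

// Bulk stop: signal every thread first, then wait; about one period in total.
long long stop_in_bulk(pool_type& pool)
{
    auto t0 = std::chrono::steady_clock::now();
    for (auto& w : pool) w->request_stop();
    for (auto& w : pool) w->wait();
    return elapsed_ms(t0);
}
```

The exact timings depend on where each thread is in its sleep when the request arrives. The sequential figure should grow with the pool size, while the bulk figure should stay near one period.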
To allow bulk operations without exposing the non-blocking interface, we can introduce an auxiliary class, bulk. As a “friend”, the class has access to working_thread's non-blocking interface, but it exposes only a blocking interface itself:
struct bulk
{
    bulk(…);
    void start();
    void stop(bool force_interrupt);
    void pause();
    void resume();
    size_t count(state);
    …
};
The constructor accepts a collection of working_thread objects. The functions start(), stop(), pause() and resume() use the hidden non-blocking interface of working_thread to ask each thread object to begin the start/stop/pause/resume operation. They then wait until all threads have completed it.
Implementation
The implementation is simple. The main idea is to add a waiting state to the thread's main function in which the action() method is not called. The blocking functions start()/stop()/pause()/resume() wait on a condition variable, which is signaled each time the thread object changes its state.
The implementation uses BOOST's thread class, but that is not essential.
The thread's main function is implemented as:
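void working_thread::main()
{
    signal_state(running);
    on_start();
    try
    {
        while(rq_stop!=m_request)
        {
            // call action() until a request arrives or action() returns false
            while(rq_none==m_request)
            {
                if(!action()) break;
            }
            // a stop request, or action() returning false, ends the loop
            if(rq_pause!=m_request) break;
            idle();
        }
    }
    catch(const boost::thread_interrupted&)
    {
        // stop(true) interrupted action() at a BOOST interruption point
    }
    signal_state(completed);
    on_exit();
}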
The idle() method is the part of the main loop in which the action() function is not called:
void working_thread::idle()
{
    signal_state(paused);
    {
        boost::unique_lock<boost::mutex> lock(m_guard);
        while(rq_pause==m_request)
        {
            m_pause.wait(lock);
        }
    }
    signal_state(running);
}
The signal_state() method changes the m_state member and signals the condition variable:
void working_thread::signal_state(state_type state)
{
    boost::unique_lock<boost::mutex> lock(m_guard);
    m_state=state;
    m_signal.notify_one();
}
Each interface function changes the m_request flag and then waits until the condition variable is signaled.
For example, pause() initiates the "pause" operation and waits until it is completed:
bool working_thread::pause()
{
    event_status rc=pause_event();
    if(rc.wait)
        rc.success&=wait_till_paused();
    return rc.success;
}
We need these two parts, "initiate an operation" and "wait until it is completed", to implement the bulk interface later.
The first part, "initiate the pause operation", is:
working_thread::event_status working_thread::pause_event()
{
    if(paused==m_state)
        return event_status(true, false);
    if(detached())
        return event_status(false);
    request(rq_pause);
    on_interrupt();
    return event_status(true);
}
The second part, "wait until it is paused", is:
bool working_thread::wait_till_paused()
{
    {
        boost::unique_lock<boost::mutex> lock(m_guard);
        while(paused!=m_state && completed!=m_state)
        {
            m_signal.wait(lock);
        }
    }
    return paused==m_state;
}
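The pieces above can be combined into a compact, runnable sketch. It is a hypothetical port to C++11 std::thread, std::mutex and std::condition_variable, not the article's BOOST-based class. It omits on_start()/on_exit(), priorities, stack size, force_interrupt and the bulk interface, and it rests on several assumptions:
- m_request is a std::atomic that is only written while m_guard is held. The article does not show its declaration.
- is_interrupted() means "a pause or stop request is pending".
- notify_all() is used so that several waiting threads would be safe.
- main() keeps a done flag, so an action() that returns false while a pause is pending is not called again.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical std:: port of the core shown above (no priority, stack size,
// force_interrupt, on_start/on_exit or bulk support).
class pausable_thread
{
    enum request_type { rq_none, rq_pause, rq_stop };
public:
    enum state_type { init, paused, running, completed };
    virtual ~pausable_thread() {}                // derived classes must call stop()

    bool start()
    {
        std::unique_lock<std::mutex> lock(m_guard);
        if (m_state == init) m_thread = std::thread(&pausable_thread::main, this);
        m_signal.wait(lock, [this] { return m_state != init; });
        return m_state != completed;
    }
    bool pause()
    {
        if (!alive()) return false;
        request(rq_pause);                       // "initiate" half, like pause_event()
        on_interrupt();
        return wait_for(paused);                 // "wait" half, like wait_till_paused()
    }
    bool resume() { if (!alive()) return false; request(rq_none); return wait_for(running); }
    void stop() { request(rq_stop); on_interrupt(); if (m_thread.joinable()) m_thread.join(); }
    state_type state() const { std::lock_guard<std::mutex> lock(m_guard); return m_state; }
protected:
    virtual bool action() = 0;
    virtual void on_interrupt() {}
    bool is_interrupted() const { return m_request != rq_none; }
private:
    void main()
    {
        signal_state(running);
        bool done = false;                       // action() returned false
        while (!done && m_request != rq_stop)
        {
            while (!done && m_request == rq_none)
                done = !action();                // hot path: atomic loads, no mutex
            if (!done && m_request == rq_pause)
                idle();
        }
        signal_state(completed);
    }
    void idle()
    {
        signal_state(paused);
        {
            std::unique_lock<std::mutex> lock(m_guard);
            m_pause.wait(lock, [this] { return m_request != rq_pause; });
        }
        signal_state(running);
    }
    bool alive() const { state_type s = state(); return s != init && s != completed; }
    bool wait_for(state_type target)
    {
        std::unique_lock<std::mutex> lock(m_guard);
        m_signal.wait(lock, [&] { return m_state == target || m_state == completed; });
        return m_state == target;
    }
    void signal_state(state_type s)
    {
        std::lock_guard<std::mutex> lock(m_guard);
        m_state = s;
        m_signal.notify_all();
    }
    void request(request_type rq)                // written under the lock so that
    {                                            // idle() cannot miss the wake-up
        std::lock_guard<std::mutex> lock(m_guard);
        m_request = rq;
        m_pause.notify_all();
    }
    std::atomic<request_type> m_request{rq_none};
    state_type m_state = init;
    mutable std::mutex m_guard;
    std::condition_variable m_signal;            // state changes, for the control thread
    std::condition_variable m_pause;             // request changes, for idle()
    std::thread m_thread;
};

// Usage: the configuration is changed without a lock while the worker is paused.
class config_worker : public pausable_thread
{
public:
    ~config_worker() override { stop(); }
    int divisor = 1;                             // the "configurable data"
    std::atomic<long> processed{0};
protected:
    bool action() override
    {
        if (1000 / divisor > 0) ++processed;     // stand-in for real work
        return true;
    }
};
```

config_worker follows the pattern from the advantages list. After pause() returns, the worker is blocked in idle(), so the control thread can change divisor without a lock. The mutex operations inside pause() and resume() order that write before the worker's next action() call.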
With these two parts, the bulk pause() operation is easy to implement:
template<typename It>
static void pause(It begin, It end)
{
    std::list<working_thread*> waiting_list;
    for(; begin != end; ++begin)
        pause_item(get_ptr(*begin), waiting_list);
    std::for_each(waiting_list.begin(), waiting_list.end(),
        boost::mem_fn(&working_thread::wait_till_paused));
}
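The article does not show pause_item() or get_ptr(). One plausible shape is sketched below under stated assumptions. get_ptr() normalizes raw pointers, references and shared_ptrs to a plain pointer. pause_item() runs the non-blocking pause_event() half and queues the thread only when event_status says a wait is needed. thread_stub and the event_status layout are hypothetical, inferred from the calls in pause() and pause_event(). bulk_pause is a free-function stand-in for bulk::pause, and it uses std::mem_fn in place of boost::mem_fn.

```cpp
#include <algorithm>
#include <functional>
#include <list>
#include <memory>
#include <vector>

// Hypothetical layout, inferred from event_status(true, false),
// event_status(false) and event_status(true) in pause_event().
struct event_status
{
    explicit event_status(bool s, bool w = true) : success(s), wait(s && w) {}
    bool success;   // the operation can be (or already is) done
    bool wait;      // the caller still has to wait for completion
};

// Stub standing in for working_thread: only the two halves of pause().
struct thread_stub
{
    bool already_paused = false;
    int waited = 0;
    event_status pause_event()
    {
        return already_paused ? event_status(true, false) : event_status(true);
    }
    bool wait_till_paused() { ++waited; return true; }
};

// Hypothetical get_ptr() overloads: accept pointers, references, shared_ptrs.
inline thread_stub* get_ptr(thread_stub* p) { return p; }
inline thread_stub* get_ptr(thread_stub& r) { return &r; }
inline thread_stub* get_ptr(const std::shared_ptr<thread_stub>& p) { return p.get(); }

// Hypothetical pause_item(): start the pause, remember the thread if we must wait.
inline void pause_item(thread_stub* t, std::list<thread_stub*>& waiting_list)
{
    event_status rc = t->pause_event();
    if (rc.wait)
        waiting_list.push_back(t);
}

template<typename It>
void bulk_pause(It begin, It end)
{
    std::list<thread_stub*> waiting_list;
    for (; begin != end; ++begin)
        pause_item(get_ptr(*begin), waiting_list);        // phase 1: request all
    std::for_each(waiting_list.begin(), waiting_list.end(),
                  std::mem_fn(&thread_stub::wait_till_paused)); // phase 2: wait
}
```

A thread that is already paused gets event_status(true, false), so it is never put on the waiting list and the second phase skips it.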
The destructor asserts that the thread's main function is no longer running:
virtual ~working_thread()
{
    BOOST_ASSERT_MSG(detached(),
        "The thread function must be completed at this point");
}
BOOST Thread Tip
The class implementation uses BOOST's thread class, which is cross-platform and hides the OS-specific interface. However, BOOST's thread class does not implement all features of system threads. In particular, it has no interface for the thread priority.
Fortunately, the class provides access to the thread's “native handle”, which allows us to implement any additional OS-specific functionality.
Working with the thread priority:
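// Windows: the priority is changed after the thread is created.
boost::thread_attributes attr;
attr.set_stack_size(m_stack_size);
boost::thread th=boost::thread(attr, ...);
#if defined(BOOST_THREAD_PLATFORM_WIN32)
SetThreadPriority(th.native_handle(), priority_value);
#endif

// POSIX: the policy and priority are set in the attributes before creation.
boost::thread_attributes attr;
attr.set_stack_size(m_stack_size);
#if defined(BOOST_THREAD_PLATFORM_PTHREAD)
// without PTHREAD_EXPLICIT_SCHED the new thread inherits the creator's
// scheduling and the policy/priority below are ignored
pthread_attr_setinheritsched(attr.native_handle(), PTHREAD_EXPLICIT_SCHED);
pthread_attr_setschedpolicy(attr.native_handle(), priority_class);
struct sched_param params;
memset(&params, 0, sizeof(params));
params.sched_priority=priority_value;
pthread_attr_setschedparam(attr.native_handle(), &params);
boost::thread th=boost::thread(attr, ...);
#endif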
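The POSIX branch can be tried without BOOST. In this sketch, which uses the hypothetical names run_with_policy and report_policy, the policy and priority are set through a plain pthread_attr_t. The new thread then reports the policy it actually runs with. On Linux, real-time policies such as SCHED_FIFO or SCHED_RR usually require extra privileges, and pthread_create() fails with EPERM without them, so a quick check should use SCHED_OTHER. Valid priority ranges can be queried with sched_get_priority_min() and sched_get_priority_max().

```cpp
#include <cstring>
#include <pthread.h>
#include <sched.h>

// Thread body: report the scheduling policy this thread actually runs with.
static void* report_policy(void* out)
{
    int policy = -1;
    sched_param param;
    std::memset(&param, 0, sizeof(param));
    if (pthread_getschedparam(pthread_self(), &policy, &param) != 0)
        policy = -1;
    *static_cast<int*>(out) = policy;
    return nullptr;
}

// Hypothetical helper: create a thread with an explicit policy and priority.
// Returns 0 or the first pthread error code.
int run_with_policy(int policy, int priority, int* observed_policy)
{
    pthread_attr_t attr;
    int rc = pthread_attr_init(&attr);
    if (rc != 0) return rc;
    // Without this, the new thread inherits the creator's scheduling and the
    // policy/priority stored in attr are ignored.
    rc = pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);
    if (rc == 0) rc = pthread_attr_setschedpolicy(&attr, policy);
    sched_param param;
    std::memset(&param, 0, sizeof(param));
    param.sched_priority = priority;
    if (rc == 0) rc = pthread_attr_setschedparam(&attr, &param);
    pthread_t th;
    if (rc == 0) rc = pthread_create(&th, &attr, report_policy, observed_policy);
    pthread_attr_destroy(&attr);
    if (rc == 0) rc = pthread_join(th, nullptr);
    return rc;
}
```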
Platforms
- Compilers/IDE: MS Visual Studio 2010, KDevelop 4.4.1 using gcc 4.7.2
- Boost library: 1.53.0
- Operating systems: Windows 7 SP1, Fedora 18
History
- Initial version: 04/01/2013.