Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / Languages / C++

Data Processing Thread with the Pause/Resume Functionality

4.96/5 (12 votes)
1 Apr 2013CPOL7 min read 40.5K   769  
The pause/resume thread functionality helps to avoid problems caused by invalid memory access and race conditions.

Motivation

Many communication applications, such as proxy servers, have one main control thread and several data processing threads. The control thread is responsible for configuration and management of the data threads. The application can hold a set (pool) of the data threads; the set size usually fits the CPU amount. The data thread performs time-critical job. For example, a data thread reads messages from configurable list of sockets, parses them using a dictionary, processes them and passes them on to another thread to send them out. In the described scenario the data thread accesses its configurable data (for example the socket list or parsing dictionary) almost exclusively: most of the time it is the only one that touches the configurable data. Only in rare re-configuration/shutdown events the main control thread will modify the thread’s configurable data. It would be extremely inefficient to guard the thread data by mutex locks all the time: indeed, the guard is not required most of time but the costly mutex acquiring operation would be performed each time the data thread reads a message from the socket list or processes it using the dictionary.

The “conflict-of-interest” can be solved in many ways. The pause/resume functionality is one of the reusable solutions.

Interface Description

The class working_thread is cross-platform (Windows/Linux) implementation of the pause/resume functionality. The class (unlike BOOST/C++11 threads) exploits classical approach for thread class implementation: a derived class must implement pure virtual function that will be called in the thread context.

C++
class working_thread
{
public:
 
    enum state_type
    {
        init, //! thread did not start yet
        paused, //! thread is paused 
        running, //! thread is running 
        completed //! thread function is completed 
    };
 
public:
    working_thread
    (
        priority_type p=priority_type(), 
        size_t stack_size=DEFAULT_STACK_VALUE
    )
  void stop(bool force_interrupt=false);
    bool start();
    bool pause();
    bool resume();
    void join();
    handle_type handle();
    id_type id() const;
    state_type state() const;
protected:
    virtual bool action()=0;
    virtual void on_start(){}
    virtual void on_exit(){}
 
    virtual void on_interrupt(){}
    bool is_interrupted() const; 
 
};

The constructor creates a not active thread object with given priority and stack size. The start() method launches a new system thread; the stop() method closes the thread, the pause() method puts the thread asleep and the resume() method wakes it up. The join() method waits till the thread’s main function is completed. A derived class must implement the pure virtual method action() that will be called repeatedly in context of the running system thread. The method must return true to be called again. If it returns false the thread’s main function will exit. Also the derived class can implement two other virtual methods: on_start() and on_exit() called when the thread starts and stops correspondingly.

All the start(), stop(), pause() and resume() methods are blocking. They will wait till the corresponding operation is completed: void stop() will return after the system thread's main function is over. The bool start() method will return after the thread object is in the running state or after it found that the thread cannot be started because the thread’s main function already is over. If the thread cannot be started the bool start() method will return false. The same applies for pause() and resume() methods: the pause() method puts the thread main function asleep and waits till the operation is completed; if the thread cannot be put asleep the method returns false. The resume() method wakes the thread main function up and waits till the operation is completed; if the operation cannot be completed the method returns false. The only reason to return false is that the thread’s main function already is over and the thread object is in the completed state.

The interface has a few advantages:

  • The control thread can “pause” the data thread before changing the configurable data; it is safe to change the thread's configurable data while the data thread is asleep and no mutex lock is needed.
  • All exposed methods are blocking. That allows us to avoid many surprises caused by invalid memory access (for example when the thread object was destroyed but the thread’s main function is still running) and race conditions.

The pause/resume facility implementation requires certain cooperation from the derived class: it imposes the following requirements on the action() implementation:

  • The action() method controls the thread execution by returning true or false. If the function returns true it will be called again. If the function returns false the thread’s main function will exit.
  • The action() method must break its execution if the is_interrupted() flag is set to true. It does not mean that the thread’s main function will exit.

The action() function can meet the second requirement in different ways. The action() function can periodically check the is_interrupted() flag. The checking period defines the waiting time of the stop() and pause() methods. For example (in pseudocode):

C++
virtual bool action()
{
   while(!is_interrupted())
   {
     select(socket_list, time_out_value);
     if(data_arrived)
     {
       ...
     } 
   }
 
   return true;
}

Another way is to implement the on_interrupt() virtual method. This callback is invoked each time either the stop() or the pause() method is called. The callback implementation should be used to force the action() method to check the is_interrupted() flag. For example (in pseudocode):

C++
virtual bool on_interrupt()
{
   signal(special_handle);
}
 
virtual bool action()
{
   while(!is_interrupted())
   {
     select(socket_list, special_handle)//!
     if(data_arrived)
     {
 
       ...
     } 
   }
 
   return true;
}

As an addition the stop() (but not pause()) method can use the force_interrupt flag: stop(true). This flag will interrupt the action() code waiting at any of BOOST’s thread interruption points:

  • boost::thread::join()
  • boost::thread::timed_join()
  • boost::thread::try_join_for()
  • boost::thread::try_join_until()
  • boost::condition_variable::wait()
  • boost::condition_variable::timed_wait()
  • boost::condition_variable::wait_for()
  • boost::condition_variable::wait_until()
  • boost::condition_variable_any::wait()
  • boost::condition_variable_any::timed_wait()
  • boost::condition_variable_any::wait_for()
  • boost::condition_variable_any::wait_until()
  • boost::thread::sleep()
  • boost::this_thread::sleep_for()
  • boost::this_thread::sleep_until()
  • boost::this_thread::interruption_point()

It is highly recommended that the derived class calls to the stop() method in its destructor – it will guarantee that the thread’s main function is completed before the class virtual table is destroyed. That is crucial because the thread’s main function calls the class’s virtual methods (action(), on_start(), on_exit()); and this is why the stop() function should be called in the derived class and not in the base class working_thread.

Bulk Operations. Working with the Thread Pool

The blocking interface is not always efficient when the application uses a thread pool. Suppose the action() method checks the is_interrrupted() flag each second. That will give waiting time of the stop() method above 1 second, which is ok. Suppose the application has 10 data threads. Consecutive call to the stop() method for all 10 threads will give 10 seconds. And that is too long.

The total waiting time can be reduces up to 1 second if the application first signals each thread to exit and then waits till all threads are completed. However that optimization requires non-blocking interface that we do not want to expose.

To allow bulk operations without exposing the non-blocking interface we can introduce auxiliary class bulk. The class, as a “friend”, has access the working_thread’s non-blocking interface but it itself exposes only blocking interface:

C++
struct bulk
{
     bulk(…);
 
     void start();
     void stop(bool force_interrupt);
     void pause();
     void resume();
     size_t count(state);
…
};

The constructor accepts collection of working_thread objects. The functions start(), stop(), pause(), resume() use hidden non-blocking working_thread's interface to ask each thread object to begin the start/stop/pause/resume operation and then wait till all threads completed the operation.

Implementation

The implementation is simple. The main idea is to arrange waiting state in the thread’s main function when the action() method is not called.

Also the blocking function start()/stop()/pause()/resume() are waiting on a condition variable and the variable is signaled each time the thread object changes its state.

The implementation uses the BOOST’s thread class but it is not essential.

The thread’s main function is implemented as:

C++
void working_thread::main()
{
    
    // signal that the thread is running
    signal_state(running);
    // perform on-start custom action
    on_start();
 
    // can throw const boost::thread_interrupted
    // if interrupt() was call in any interrupt
    // point
    try
    {
        while(rq_stop!=m_request)
        {
            while(rq_none==m_request)
            {
                if(!action()) break;
            }
 
            if(rq_pause!=m_request) break;
 
            idle();
        }
    }
    catch(const boost::thread_interrupted&)
    {
    }
        
    // update state
    signal_state(completed);
    // perform on-exit custom action
    // after the state was updated
    on_exit();
}

Where the idle() method is part of the main cycle when the action() function is not called:

C++
void working_thread::idle()
{
    // signal paused state
    signal_state(paused);
    
    // wait in the paused state
    {    
        boost::unique_lock<boost::mutex> lock(m_guard);
 
        while(rq_pause==m_request)
        {
            m_pause.wait(lock);
        }
    }
 
    // signal running state
    signal_state(running);
}

The signal_state() method changes the flag m_request and signals the condition variable:

C++
void working_thread::signal_state(state_type state)
{
    // update the state
    // and signal that the thread is 
    // in new state 
    boost::unique_lock<boost::mutex> lock(m_guard);
    m_state=state;
    
    m_signal.notify_one();
}

All the interface functions will change the flag m_request and then wait while the condition variable is signaled.

For example, the pause() initiates the "pause" operation and waits till it is completed:

C++
bool working_thread::pause()
{
    event_status rc=pause_event();
 
    if(rc.wait) 
        rc.success&=wait_till_paused();
    
    return rc.success;
}

We needs these two parts: "initiate an operation" and "wait till it is completed" to implement the "bulk" interface further.

The first "initiate the pause operation" part is:

C++
working_thread::event_status working_thread::pause_event()
{
    // already paused
    if(paused==m_state) 
        return event_status(true, false);
    
    // cannot pause detached state
    if(detached()) 
        return event_status(false);
    
    request(rq_pause);
 
    // callback is called
    // after the flag is changed
    on_interrupt();
 
    return event_status(true);
}

And the second "wait till it is paused" part is:

C++
bool working_thread::wait_till_paused()
{
    // wait till the thread is paused
    {    
        boost::unique_lock<boost::mutex> lock(m_guard);
        while(paused!=m_state && completed!=m_state)
        {
            m_signal.wait(lock);
        }
    }
 
    return paused==m_state;
}

The bulk pause() operation implementation is easy:

C++
template<typename It>
static void pause(It begin, It end)
{
 std::list<working_thread*> waiting_list;
 
 // pause an object and add to the waiting list
 // if we need to wait till the object is paused
 for(; begin != end; ++begin) 
  pause_item(get_ptr(*begin), waiting_list);
 // wait till all objects in the waiting list are done
 std::for_each(waiting_list.begin(), waiting_list.end(), 
   boost::mem_fn(&working_thread::wait_till_paused));
}

The destructor must assert if the thread’s main function is still running:

C++
virtual ~working_thread()
{
        // stop() must be called before
        // the derived class' dtor is completed.
        // stop() cannot be called here
        // because the thread's main function calls to
        // class's virtual functions including the pure
        // virtual action()
        BOOST_ASSERT_MSG(detached(), 
            "The thread function must be completed at this point");
}

BOOST Thread Tip

The class implementation uses the BOOST’s thread class that is cross-platform and hides OS specific interface. The BOOST’s thread class does not implement all features of the system threads. In particular it does not have interface for the thread priority.

Fortunately the class provides access to the thread “native handle” that allows us to implement any additional OS specific functionality.

Working with the thread priority:

C++
// windows version
// the priority must be set after the thread is started

boost::thread_attributes attr;
   
// set stack size - crossplatform
attr.set_stack_size(m_stack_size);

boost::thread th=boost::thread(attr, ...);


#if defined(BOOST_THREAD_PLATFORM_WIN32)
SetThreadPriority(th.native_handle(), priority_value);
#endif
 
// pthread version
// the priority must be set before the thread is started

boost::thread_attributes attr;

// set stack size - crossplatform
attr.set_stack_size(m_stack_size);
 
#if defined(BOOST_THREAD_PLATFORM_PTHREAD)
 
pthread_attr_setschedpolicy(attr.native_handle(), priority_class);
struct sched_param params;
memset(&params, 0, sizeof(params));
params.sched_priority=priority_value;
pthread_attr_setschedparam(attr.native_handle(), &params);

boost::thread th=boost::thread(attr, ...);

#endif

Platforms

  • Compilers/IDE: MS Visual Studio 2010, KDevelop 4.4.1 using gcc 4.7.2
  • Boost library: 1.53.0
  • Operating systems: Windows 7 SP1, Fedora 18

References

History

  • Initial version: 04/01/2013.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)