Introduction
In the Observer design pattern, a subject holds a list of interested parties – the observers – which it will notify about changes in status. Simply put, it’s a form of subscription, and this design comes up in all sorts of places (which is one of the definitions of the term ‘design pattern’). It’s well suited for handling asynchronous events, like user interaction in a GUI, sensor information, and so on.
There is, however, often a need to re-synchronise asynchronous events. For instance, you might keep the latest status update until it’s actually needed for display, storage or some calculation. By doing this, you disregard the asynchronous nature of its source, and treat it as just another variable, as if it had been read from the subject right then. In other words, you synchronise a status from the past with the present. Sometimes, though, you don’t want the last value, but the next, which is a bit more complex, as it requires you to wait for the future to happen before you can say it’s the present.
In this article, we will write a simple multi-threaded example implementation of the Observer pattern, and show how to re-synchronise a past event to look current. Then, we’ll demonstrate a technique to treat future events like they’re current, too.
Thread Safety
If, as is often the case, the observed subject and the part of the system that manages the observers run in different threads, we need to make sure that they can co-exist in a friendly manner. Specifically, we must make sure that adding or removing an observer – activities that take place outside the observed thread – does not interfere with the reporting of events.
In other words, accessing the list of observers is a critical section of the code, which must not be executed by two threads at the same time. Since this is quite a common situation, operating systems that support multi-threading also provide tools to handle it. On Windows, this is done with a CRITICAL_SECTION object.
(If you use MFC, there is an eponymous wrapper for it. However, the implementations of the MFC synchronisation objects have been badly flawed in the past, and I believe they still are.)
Checking the documentation, we see that there are functions available to create and destroy a CRITICAL_SECTION, and to enter and leave a locked state. We also see that it can be entered and left recursively, as long as it’s the same thread. Knowing all this, we can write a C++ class to manage it.
Why not use the Windows API directly? For the same reason as almost all C++ wrappers of OS objects: lifetime management. Instead of having to remember to call DeleteCriticalSection everywhere it might be needed, we can do that in the destructor, as per the Resource Acquisition Is Initialization (RAII) idiom.
#include <Windows.h>

// Owns a Win32 CRITICAL_SECTION for the lifetime of the object.
class CriticalSection
{
public:
    CriticalSection()
    {
        ::InitializeCriticalSection(&cs_);
    }
    ~CriticalSection()
    {
        ::DeleteCriticalSection(&cs_);
    }
    void Enter()
    {
        ::EnterCriticalSection(&cs_);
    }
    void Leave()
    {
        ::LeaveCriticalSection(&cs_);
    }
private:
    // Not copyable: declared, but never defined.
    CriticalSection(const CriticalSection&);
    CriticalSection& operator=(const CriticalSection&);

    CRITICAL_SECTION cs_;
};
Quite simple, really, with little overhead. And because every Enter must be matched by a Leave, the sensible thing to do is to write a RAII wrapper for that, too. If we don’t, odds are that at some point we’ll alter the code using the CriticalSection and introduce a new exit point, via a function return or exception, that skips the call to Leave. A RAII wrapper makes the code more robust.
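// Enters the critical section on construction, leaves it on destruction.
class CSLock
{
public:
    CSLock(CriticalSection& section)
        : section_(section)
    {
        section_.Enter();
    }
    ~CSLock()
    {
        section_.Leave();
    }
private:
    // Not copyable: declared, but never defined.
    CSLock(const CSLock&);
    CSLock& operator=(const CSLock&);

    CriticalSection& section_;
};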
This automates the locking: we only need to declare a CSLock at the beginning of the scope we wish to protect, and it will automatically unlock when we leave the scope and the destructor is called. We’ll see examples of how these are used below.
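To see the effect without touching the Windows API, here is a minimal sketch that swaps the real CriticalSection for a stand-in which only counts calls. CountingSection, ScopedLock, Guarded and LocksAreBalanced are hypothetical names made up for this illustration; ScopedLock is simply CSLock turned into a template so that it can wrap the stand-in.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical stand-in for CriticalSection: counts calls instead of locking.
struct CountingSection
{
    int entered;
    int left;
    CountingSection() : entered(0), left(0) {}
    void Enter() { ++entered; }
    void Leave() { ++left; }
};

// Same shape as CSLock, but templated so it can wrap the stand-in.
template <typename Section>
class ScopedLock
{
public:
    explicit ScopedLock(Section& section) : section_(section) { section_.Enter(); }
    ~ScopedLock() { section_.Leave(); }
private:
    ScopedLock(const ScopedLock&);
    ScopedLock& operator=(const ScopedLock&);
    Section& section_;
};

// Three ways out of the locked scope: early return, exception, normal end.
int Guarded(CountingSection& section, int input)
{
    ScopedLock<CountingSection> lock(section);
    if (input < 0)
    {
        return -1;                                  // early return
    }
    if (input == 0)
    {
        throw std::invalid_argument("zero input");  // exception
    }
    return input * 2;                               // normal exit
}

// Returns true if every Enter was matched by a Leave on all three paths.
bool LocksAreBalanced()
{
    CountingSection section;
    Guarded(section, -5);
    try
    {
        Guarded(section, 0);
    }
    catch (const std::invalid_argument&)
    {
        // Swallowed on purpose; only the lock count matters here.
    }
    const int doubled = Guarded(section, 21);
    assert(doubled == 42);
    return section.entered == 3 && section.left == 3;
}
```

Calling LocksAreBalanced() from a test or from main exercises all three exit paths. None of them mentions Leave, yet the destructor calls it every time.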
Something to See
Now that we have the tools to support a multi-threaded application, let’s write a simple Observer system. This requires something to observe, and something to do the observing. For this example, we’ll make a Subject class, which declares an internal, abstract Subject::Observer class, from which we’ll derive our observers. The Subject notification in this example will send an integer to the observers.
#include <cstddef>
#include <set>
#include "CSLock.h"

class Subject
{
public:
    class Observer
    {
        Subject* subject_;
        friend class Subject;

        // Called by Subject::Register; leaves any previous subject first.
        void SetSubject(Subject* subject)
        {
            if (subject != subject_)
            {
                if (0 != subject_)
                {
                    subject_->Unregister(*this);
                }
                subject_ = subject;
            }
        }
        void ReleaseFromSubject(Subject* subject)
        {
            if (subject == subject_)
            {
                subject_ = 0;
            }
        }
    protected:
        Observer()
            : subject_(0)
        {}
        virtual void Notify(int) = 0;
    public:
        // An observer unregisters itself when it is destroyed.
        virtual ~Observer()
        {
            SetSubject(0);
        }
    };

    ~Subject()
    {
        CSLock lock(criticalsection_);
        for (std::set<Observer*>::iterator i = observers_.begin();
            i != observers_.end(); ++i)
        {
            (*i)->ReleaseFromSubject(this);
        }
    }
    void Register(Observer& observer)
    {
        CSLock lock(criticalsection_);
        observers_.insert(&observer);
        observer.SetSubject(this);
    }
    void Unregister(Observer& observer)
    {
        CSLock lock(criticalsection_);
        observers_.erase(&observer);
        observer.ReleaseFromSubject(this);
    }
    void Notify(int val) const
    {
        CSLock lock(criticalsection_);
        for (std::set<Observer*>::const_iterator i = observers_.begin();
            i != observers_.end(); ++i)
        {
            (*i)->Notify(val);
        }
    }
    size_t ObserverCount() const
    {
        // Locked like every other access to observers_, since this
        // may be called while another thread registers or unregisters.
        CSLock lock(criticalsection_);
        return observers_.size();
    }
private:
    std::set<Observer*> observers_;
    mutable CriticalSection criticalsection_;
};
The first thing to note here is that the Subject and Observer are tightly coupled, which somewhat paradoxically is to help de-couple the derived observers from the Subject. The logic and responsibility of maintaining the relationship is kept private, thanks to the friendship between the Subject and Observer, so that derived classes can’t affect it. This tight coupling is also the reason for making the Observer an internal class, to emphasise this is not any old observer, but one for this particular Subject.
Another thing worth noting is that just as the Subject::Observer class leaves the actual handling of a Notify call to a derived class, the Subject class here isn’t concerned with the generation of values to notify observers with. That’s for someone else; this Subject only handles its observers and getting notifications out to them. (Indeed, it would be a relatively trivial task to make the notification type (int in this example) a template type, and make this a generic and re-usable Observer pattern implementation. To do so is left as an exercise to the reader. Just mind whether you notify by value, reference, or pointer.)
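For readers who would like a starting point for that exercise, one possible shape is sketched below. It is not the implementation used in this article: GenericSubject, LastString and LastNotified are hypothetical names, the back-pointer bookkeeping between observer and subject is left out for brevity, and std::recursive_mutex from C++11 stands in for the CriticalSection so that the sketch is not tied to Windows.

```cpp
#include <cassert>
#include <mutex>
#include <set>
#include <string>

// Sketch of a generic subject; T is the notification payload type.
template <typename T>
class GenericSubject
{
public:
    class Observer
    {
    public:
        virtual ~Observer() {}
        // Passed by const reference, so large payloads are not copied per observer.
        virtual void Notify(const T& value) = 0;
    };

    void Register(Observer& observer)
    {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        observers_.insert(&observer);
    }
    void Unregister(Observer& observer)
    {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        observers_.erase(&observer);
    }
    void Notify(const T& value) const
    {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        for (typename std::set<Observer*>::const_iterator i = observers_.begin();
             i != observers_.end(); ++i)
        {
            (*i)->Notify(value);
        }
    }
private:
    std::set<Observer*> observers_;
    mutable std::recursive_mutex mutex_;   // recursive, like a CRITICAL_SECTION
};

// Hypothetical observer that remembers the last string it was sent.
class LastString : public GenericSubject<std::string>::Observer
{
public:
    virtual void Notify(const std::string& value) { last_ = value; }
    const std::string& Last() const { return last_; }
private:
    std::string last_;
};

// Registers an observer, sends a few values, and returns the one it kept.
std::string LastNotified()
{
    GenericSubject<std::string> subject;
    LastString observer;
    subject.Register(observer);
    subject.Notify("first");
    assert(observer.Last() == "first");
    subject.Notify("second");
    subject.Unregister(observer);
    subject.Notify("ignored");   // no longer registered, so not delivered
    return observer.Last();
}
```

Notifying by const reference is only one of the options mentioned above. With a payload as small as an int, passing by value would do just as well.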
A final point worth making is that the CriticalSection is declared to be mutable. The reason for this is that it’s only altered during a function call, by the CSLock, but at the end of the function call, it will have been restored to its previous state. By indicating it’s mutable, we can make the Notify function const.
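The same idea can be shown in isolation. The sketch below assumes a C++11 compiler and uses std::mutex in place of the CriticalSection; LatestValue and RoundTrip are hypothetical names used only for this illustration.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical holder for the most recent value, guarded by a mutable mutex.
class LatestValue
{
public:
    LatestValue() : value_(0) {}

    void Set(int value)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        value_ = value;
    }

    // const: the caller sees no change, although the mutex is
    // locked and unlocked again inside the function.
    int Get() const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return value_;
    }

private:
    int value_;
    mutable std::mutex mutex_;   // without 'mutable', Get() could not be const
};

// Usage check: writes through the object, reads through a const reference.
int RoundTrip(int value)
{
    LatestValue latest;
    latest.Set(value);
    const LatestValue& readOnly = latest;
    const int read = readOnly.Get();
    assert(read == value);
    return read;
}
```

Removing the mutable keyword makes Get() fail to compile, because locking modifies the mutex.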
So, let’s put it all together, with a custom observer that saves the latest value, a function to produce values, a thread, and a complete program.
#include <cstdlib>
#include <ctime>
#include <iostream>

class PastObserver : public Subject::Observer
{
    int value_;
    mutable CriticalSection criticalsection_;
public:
    PastObserver()
        : value_(0)
    {}
    int GetLastValue() const
    {
        CSLock lock(criticalsection_);
        return value_;
    }
    virtual void Notify(int value)
    {
        {
            CSLock lock(criticalsection_);
            value_ = value;
        }
        // Print the parameter rather than value_, which must only be read under the lock.
        std::cout << "PastObserver notified: " << value << std::endl;
    }
};

DWORD WINAPI ValueFunction(void* pParam)
{
    Subject* subject = static_cast<Subject*>(pParam);
    std::cout << "Thread started with " <<
        subject->ObserverCount() << " observers" << std::endl;
    srand(static_cast<unsigned>(time(0)));  // seed once, in the thread that calls rand()
    // Keep producing values until the last observer has been unregistered.
    while (0 < subject->ObserverCount())
    {
        int val = rand();
        subject->Notify(val);
        Sleep(100 * (val & 0x7));
    }
    std::cout << "Thread ended" << std::endl;
    return 0;
}

int main()
{
    Subject subject;
    PastObserver observer;
    subject.Register(observer);
    // Keep the handle, so that we can wait for the thread to finish.
    HANDLE thread = ::CreateThread(NULL, 0, &ValueFunction, &subject, 0, NULL);
    Sleep(1000);
    subject.Unregister(observer);
    std::cout << "Last value: " << observer.GetLastValue() << std::endl;
    ::WaitForSingleObject(thread, INFINITE);
    ::CloseHandle(thread);
    return 0;
}
And that’s it. A reasonably small and clear illustration of how the Observer pattern works. In this example, we synchronise with the past, by reading the last value. Now, let’s synchronise with the future!
Reading the Future
So how do you read the future? Well, obviously, our observer has to wait for it to happen, so we’ll need another synchronisation object: the Event. This is a boolean object which can be set or reset, and waited for with WaitForSingleObject. As it turns out, we’ll need two of those: one to indicate we’re waiting for data, and one to indicate we’ve received it.
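#include <stdexcept>

class FutureObserver : public Subject::Observer
{
    int value_;
    HANDLE waiting_;    // set while a caller is waiting for the next value
    HANDLE newValue_;   // set when Notify() has stored that value
public:
    FutureObserver()
        : value_(0),
        waiting_(0),
        newValue_(0)
    {
        // Unnamed auto-reset events, initially not set.
        waiting_ = ::CreateEvent(NULL, FALSE, FALSE, NULL);
        newValue_ = ::CreateEvent(NULL, FALSE, FALSE, NULL);
    }
    ~FutureObserver()
    {
        ::CloseHandle(waiting_);
        ::CloseHandle(newValue_);
    }
    int GetNextValue() const
    {
        ::SetEvent(waiting_);
        if (WAIT_OBJECT_0 == ::WaitForSingleObject(newValue_, INFINITE))
        {
            return value_;
        }
        else
        {
            // std::runtime_error, because constructing std::exception
            // from a string is a Microsoft extension.
            throw std::runtime_error("Failed waiting for next value");
        }
    }
    virtual void Notify(int value)
    {
        std::cout << "FutureObserver notified: " << value << std::endl;
        // A zero timeout only polls: is anybody waiting for a value?
        if (WAIT_OBJECT_0 == ::WaitForSingleObject(waiting_, 0))
        {
            value_ = value;
            ::SetEvent(newValue_);
        }
    }
};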
This observer is a bit more complex, as it has to manage the two Event objects, but the principle is simple enough. In the GetNextValue() function, we set one event, and wait for the other. The next time Notify() is called, it will see the waiting_ flag is set, so it will store the value and signal that newValue_ is ready. The events are created to reset automatically, as soon as a WaitForSingleObject call is successful (i.e., when the event it is waiting for has been set).
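If the auto-reset behaviour is unfamiliar, it may help to see it spelled out. The sketch below is not the Win32 Event object, only a rough portable imitation built from a C++11 mutex and condition variable; AutoResetEvent and AutoResetBehavesAsDescribed are hypothetical names for this illustration.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Portable imitation of an auto-reset event (not the Win32 object itself).
class AutoResetEvent
{
public:
    AutoResetEvent() : signalled_(false) {}

    // Like SetEvent: mark the event as set and wake one waiter.
    void Set()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            signalled_ = true;
        }
        condition_.notify_one();
    }

    // Like WaitForSingleObject with a timeout in milliseconds.
    // Returns true if the event was set, and resets it in the same step.
    bool Wait(unsigned milliseconds)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        const bool signalled = condition_.wait_for(
            lock, std::chrono::milliseconds(milliseconds),
            [this] { return signalled_; });
        if (signalled)
        {
            signalled_ = false;   // the "auto-reset" part
        }
        return signalled;
    }

private:
    std::mutex mutex_;
    std::condition_variable condition_;
    bool signalled_;
};

// Walks through the states the two-event handshake relies on.
bool AutoResetBehavesAsDescribed()
{
    AutoResetEvent event;
    const bool idle = event.Wait(0);     // not set yet: a zero timeout returns at once
    assert(!idle);
    event.Set();
    const bool first = event.Wait(0);    // set: the wait succeeds and resets the event
    const bool second = event.Wait(0);   // already reset: the wait fails again
    return !idle && first && !second;
}
```

The second successful-looking wait fails because the first one consumed the signal. This is what lets Notify() store exactly one value per GetNextValue() request.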
The GetNextValue() function waits infinitely here, so it will not return until it has received a value, and the exception should never happen unless the FutureObserver has been deleted in another thread. If you’d prefer a timeout, just overload the GetNextValue() function:
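// Requires #include <stdexcept> for std::runtime_error.
int GetNextValue(DWORD millisecondTimeout, bool& timedOut) const
{
    ::SetEvent(waiting_);
    switch (::WaitForSingleObject(newValue_, millisecondTimeout))
    {
    case WAIT_OBJECT_0:
        timedOut = false;
        return value_;
    case WAIT_TIMEOUT:
        // Withdraw the request, so that a later Notify() doesn't
        // store a value that nobody is waiting for any more.
        ::ResetEvent(waiting_);
        timedOut = true;
        return 0;
    default:
        throw std::runtime_error("Failed waiting for next value");
    }
}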
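For comparison, here is a rough portable sketch of the same "wait for the next value, but not forever" idea, assuming a C++11 compiler. It is not a drop-in replacement for FutureObserver: NextValueSlot and TimeoutDemo are hypothetical names, the class is not derived from Subject::Observer, and two plain flags guarded by a mutex take the place of the two Event objects.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical portable counterpart of the waiting_/newValue_ handshake.
class NextValueSlot
{
public:
    NextValueSlot() : value_(0), waiting_(false), hasNew_(false) {}

    // Called by the producer. Stores the value only if somebody is waiting.
    void Notify(int value)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (waiting_)
        {
            waiting_ = false;
            value_ = value;
            hasNew_ = true;
            condition_.notify_one();
        }
    }

    // Called by the consumer. Returns true and fills 'value' if a
    // notification arrived within the timeout, false otherwise.
    bool GetNextValue(unsigned milliseconds, int& value)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        waiting_ = true;
        hasNew_ = false;
        const bool arrived = condition_.wait_for(
            lock, std::chrono::milliseconds(milliseconds),
            [this] { return hasNew_; });
        waiting_ = false;   // withdraw the request, so a late value is not kept
        if (arrived)
        {
            hasNew_ = false;
            value = value_;
        }
        return arrived;
    }

private:
    std::mutex mutex_;
    std::condition_variable condition_;
    int value_;
    bool waiting_;
    bool hasNew_;
};

// Exercises both outcomes: a timeout with no producer, then a delivered value.
bool TimeoutDemo()
{
    NextValueSlot slot;
    int value = 0;

    slot.Notify(1);                                   // nobody is waiting: dropped
    const bool first = slot.GetNextValue(10, value);  // no producer running: times out
    assert(!first);

    std::atomic<bool> done(false);
    std::thread producer([&slot, &done] {
        while (!done)
        {
            slot.Notify(42);
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    });
    const bool second = slot.GetNextValue(5000, value);
    done = true;
    producer.join();

    return !first && second && value == 42;
}
```

Because the request and the value are changed under one mutex, a timeout can withdraw the request without racing against a notification that arrives at the same moment.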
The Notify() function, in contrast, doesn’t wait at all. When WaitForSingleObject is called with a timeout of zero milliseconds, it returns immediately, so we have to check the return value to see whether the event was set. This means we’re not holding up Subject::Notify() more than necessary.
Finally, let’s put it all together:
int main()
{
    Subject subject;
    PastObserver past;
    subject.Register(past);
    FutureObserver future;
    subject.Register(future);
    HANDLE thread = ::CreateThread(NULL, 0, &ValueFunction, &subject, 0, NULL);
    Sleep(1000);
    std::cout << "Last value: " << past.GetLastValue() << std::endl;
    std::cout << "Next value: " << future.GetNextValue() << std::endl;
    std::cout << "Last value: " << past.GetLastValue() << std::endl;
    std::cout << "Next value: " << future.GetNextValue() << std::endl;
    // The thread ends once the last observer has been unregistered.
    subject.Unregister(past);
    subject.Unregister(future);
    ::WaitForSingleObject(thread, INFINITE);
    ::CloseHandle(thread);
    return 0;
}
As always, if you found this interesting or useful, or have suggestions for improvements, please let me know.