I have a confession: with the never ending things going on (you know: life ;-) I missed the (fairly) recent changes in the C++ language. C++ was my first OO language and it probably remains my favorite. I can't help but love the mix of high level abstractions with metal grinding pointer arithmetic - it's like a cool sports car with manual transmission. Beauty and power. You have to be more alert, more involved. That's part of the fun - you are taking control. But for some time now, C++ felt old, tired, disconnected from the endless stream of new languages. Until C++11 came along.
Wikipedia describes C++11 as follows:
C++11 (formerly known as C++0x) is the most recent version of the standard of the C++ programming language. It was approved by ISO on 12 August 2011. C++11 includes several additions to the core language and extends the C++ standard library, incorporating most of the C++ Technical Report 1 (TR1) libraries.
Bjarne Stroustrup wrote that:
Surprisingly, C++11 feels like a new language: The pieces just fit together better than they used to and I find a higher-level style of programming more natural than before and as efficient as ever.
And it does feel like a new language. And this is exciting for geeks like me. In this blog post, I discuss how I implemented Schmidt's Active Object pattern in a novel way using C++11 Closures.
First, another confession: for a long time I've suffered from Node envy. Node.js envy, to be precise. Look at this "Hello World" JavaScript code:
var http = require("http");
http.createServer(function(request, response) {
response.writeHead(200, {"Content-Type": "text/plain"});
response.write("Hello World");
response.end();
}).listen(8888);
What I "envy" is not the use of asynchronous I/O operation with callbacks ("the callback pattern"), but the compelling beauty of Lambda functions. Lambda functions simplify asynchronous programming because they allow us to write code that is seemingly synchronous. The code that is executed by the lambda function is temporally disjointed from the code that precedes it, and yet both parts are spatially co-located. And the outcome is smaller, tighter code that feels more natural and is easier to read and maintain. And this can be done in C++11.
I won't discuss C++11 lambda functions because others have done this better than I can. This article is an example of some of the great coverage you can find on the net (Alex Allain has lots of interesting material to read). But I do want to touch on the difference between Lambda functions and Closures, since my implementation below uses Closures. Lambda functions are anonymous functions that don't need to be bound to a name and can be specified as lambda expressions. A Closure is an example of a lambda function which "closes" over the environment in which it was specified (meaning that it can access the variables available in the referencing environment). Alex Allain's article (which I referenced above) doesn't make a big distinction between lambdas and closures and simply treats closures as lambdas with "variable capture". Syntactically in C++ lambdas and closures are almost identical, so the distinction is there and it is slight, yet I think it is important to note the semantic difference.
On to Active Object.
Douglas Schmidt describes the Active Object design pattern in Pattern Oriented Software Architecture (Volume 2: Patterns for Concurrent and Networked Objects):
The Active Object design pattern decouples method execution from method invocation to enhance concurrency and simplify synchronized access to objects that reside in their own threads of control.
Once again, I don't want to paraphrase the work of others, so I assume that you are knowledgeable about the details of the Active Object pattern. If not, you should probably familiarize yourself with the pattern before reading on.
To illustrate my ideas, I will only concentrate on one variation of the Active Object pattern. In this variation, the Client and Proxy are "folded" into the same object and the Scheduler and ActivationList implement a simple message queue policy (this is reminiscent of Schmidt's original AO paper, which he later expanded on). I think this is probably the most prevalent variation of the pattern - in which we want to serialize access to an object, and use an in-order queue (FIFO) to "bounce" the method invocation from one thread to another.
Let's look at the example code from the Wikipedia entry on Active Object. The Wikipedia code is implemented in Java and I went and implemented it using C++11. I placed the comments in the code to explain the logic.
#include "stdafx.h"
#include <thread>
#include <queue>
#include <mutex>
#include <future>
typedef std::function<void()> Operation;
class OriginalClass {
private:
double val;
public:
OriginalClass() : val(0) {}
void doSomething() {
val = 1.0;
}
void doSomethingElse() {
val = 2.0;
}
};
class DispatchQueue {
std::mutex qlock;
std::queue<Operation> ops_queue;
std::condition_variable empty;
public:
void put(Operation op) {
std::lock_guard<std::mutex> guard(qlock);
ops_queue.push(op);
empty.notify_one();
}
Operation take() {
std::unique_lock<std::mutex> lock(qlock);
empty.wait(lock, [&]{ return !ops_queue.empty(); });
Operation op = ops_queue.front();
ops_queue.pop();
return op;
}
};
class BecomeActiveObject
{
private:
double val;
DispatchQueue dispatchQueue;
std::atomic<bool> done;
std::unique_ptr<std::thread> runnable;
public:
BecomeActiveObject() : val(0), done(false) {
runnable = std::make_unique<std::thread>(&BecomeActiveObject::run, this);
}
~BecomeActiveObject() { runnable->join(); }
void run() {
while (!done) {
dispatchQueue.take()();
}
}
void doSomething()
{
auto runnable = [&]() {
val = 1.0;
};
dispatchQueue.put(runnable);
}
void doSomethingElse()
{
dispatchQueue.put(( [&]() {
val = 2.0;
}
));
}
};
int main(int argc, char **argv) {
BecomeActiveObject active;
active.doSomething();
active.doSomethingElse();
return 0;
}
The more "traditional" method of implementing ActiveObject in C++ involves defining two sets of interfaces: a public
interface and a private
interface. Every method in the public
interface also appears in the private
interface. The public
interface is used by clients to invoke methods on the object, and they create a message indicating the request and its parameters and enqueue the message. The private
interface is used by the dispatcher which dequeues messages and invokes the private
method. This works well enough but creates big classes that have a lot of extraneous code that is there just to get all this mechanics to work. Every change to the interface requires a series of changes (public
and private
interface; message definition).
#include "stdafx.h"
#include <thread>
#include <queue>
#include <mutex>
#include <future>
#include <string>
#include <iostream>
template<typename T>
class DispatchQueue {
std::mutex qlock;
std::queue<T> ops_queue;
std::condition_variable empty;
public:
void put(T op) {
std::lock_guard<std::mutex> guard(qlock);
ops_queue.push(op);
empty.notify_one();
}
T take() {
std::unique_lock<std::mutex> lock(qlock);
empty.wait(lock, [&]{ return !ops_queue.empty(); });
T op = ops_queue.front();
ops_queue.pop();
return op;
}
};
class BecomeActiveObject
{
private:
enum {
id_doSomething,
id_doSomethingElse
};
struct MessageInt {
int val;
};
struct MessageStr {
char* val;
};
struct MessageData {
size_t id;
union {
MessageInt intVal;
MessageStr strVal;
};
};
DispatchQueue<MessageData> dispatchQueue;
std::atomic<bool> done;
std::unique_ptr<std::thread> runnable;
void __doSomething(int a)
{
std::cout << "In __doSomething(" << a << ")" << std::endl;
}
void __doSomethingElse(const std::string &a)
{
std::cout << "In __doSomethingElse(" << a << ")" << std::endl;
}
public:
BecomeActiveObject() : done(false) {
runnable = std::make_unique<std::thread>(&BecomeActiveObject::run, this);
}
~BecomeActiveObject() { runnable->join(); }
void run() {
while (!done) {
MessageData msg = dispatchQueue.take();
switch (msg.id) {
case id_doSomething:
__doSomething(msg.intVal.val);
break;
case id_doSomethingElse:
__doSomethingElse(msg.strVal.val);
delete msg.strVal.val;
break;
default:
break;
}
}
}
void doSomething(int a)
{
MessageData data;
data.intVal.val = a;
data.id = id_doSomething;
dispatchQueue.put(data);
}
void doSomethingElse(const std::string &a)
{
MessageData data;
data.strVal.val = _strdup(a.c_str());
data.id = id_doSomethingElse;
dispatchQueue.put(data);
}
};
int main(int argc, char **argv) {
BecomeActiveObject active;
active.doSomething(5);
active.doSomethingElse("HelloWorld");
return 0;
}
A somewhat more sophisticated implementation uses functors. We no longer need the code which does the switching on the message type when we grab a message from the FIFO and dispatch it. But the sophistication of the code probably only adds a layer of obfuscation if you are not familiar with the underlying idiom. We gain too little from this to be worthwhile.
#include "stdafx.h"
#include <thread>
#include <queue>
#include <mutex>
#include <future>
#include <string>
#include <iostream>
template<typename T>
class DispatchQueue {
std::mutex qlock;
std::queue<T> ops_queue;
std::condition_variable empty;
public:
void put(T op) {
std::lock_guard<std::mutex> guard(qlock);
ops_queue.push(op);
empty.notify_one();
}
T take() {
std::unique_lock<std::mutex> lock(qlock);
empty.wait(lock, [&]{ return !ops_queue.empty(); });
T op = ops_queue.front();
ops_queue.pop();
return op;
}
};
struct IMessage {
virtual int execute() = 0;
};
template <class TARGET, class METHOD, class PARAMS>
struct Message : public IMessage {
Message(TARGET *target, METHOD handlerMethod, PARAMS params) : target(target),
handlerMethod(handlerMethod),
params(params) {}
int execute();
private:
TARGET *target;
METHOD handlerMethod;
PARAMS params;
};
template <class TARGET, class METHOD, class PARAMS>
int Message<TARGET, METHOD, PARAMS>::execute() {
(target->*handlerMethod)(params);
return 1;
};
class BecomeActiveObject
{
private:
DispatchQueue<IMessage*> dispatchQueue;
std::atomic<bool> done;
std::thread runnable;
struct MessageInt {
int val;
};
struct MessageStr {
char* val;
};
union MessageData {
MessageInt intVal;
MessageStr strVal;
};
typedef void (BecomeActiveObject::*MsgHandler)(MessageData msgData);
typedef Message<BecomeActiveObject, MsgHandler, MessageData> AoMsg;
void __doSomething(MessageData data)
{
std::cout << "In __doSomething(" << data.intVal.val << ")" << std::endl;
}
void __doSomethingElse(MessageData data)
{
std::cout << "In __doSomethingElse(" << data.strVal.val << ")" << std::endl;
delete data.strVal.val;
}
public:
BecomeActiveObject() : done(false) {
runnable = std::thread(&BecomeActiveObject::run, this);
}
~BecomeActiveObject() { runnable.join(); }
void run() {
while (!done) {
IMessage* msg = dispatchQueue.take();
msg->execute();
delete msg;
}
}
void doSomething(int a)
{
MessageData data;
data.intVal.val = a;
AoMsg* m = new AoMsg(this, &BecomeActiveObject::__doSomething, data);
dispatchQueue.put(m);
}
void doSomethingElse(const std::string &a)
{
MessageData data;
data.strVal.val = _strdup(a.c_str());
AoMsg* m = new AoMsg(this, &BecomeActiveObject::__doSomethingElse, data);
dispatchQueue.put(m);
}
};
int main(int argc, char **argv) {
BecomeActiveObject active;
active.doSomething(5);
active.doSomethingElse("HelloWorld");
return 0;
}
Now let's come full circle and return to the Closure
implementation of ActiveObject
and add a few of features to it.
#include "stdafx.h"
#include <thread>
#include <queue>
#include <mutex>
#include <future>
#include <iostream>
typedef std::function<void()> Operation;
class DispatchQueue {
std::mutex qlock;
std::queue<Operation> ops_queue;
std::condition_variable empty;
public:
void put(Operation op) {
std::lock_guard<std::mutex> guard(qlock);
ops_queue.push(op);
empty.notify_one();
}
Operation take() {
std::unique_lock<std::mutex> lock(qlock);
empty.wait(lock, [&]{ return !ops_queue.empty(); });
Operation op = ops_queue.front();
ops_queue.pop();
return op;
}
};
class BecomeActiveObject
{
private:
double val;
DispatchQueue dispatchQueue;
std::atomic<bool> done;
std::thread runnable;
public:
BecomeActiveObject() : val(0), done(false) {
runnable = std::thread([=]{ run(); });
}
~BecomeActiveObject() {
dispatchQueue.put([&]() { done = true; });
runnable.join();
}
double getVal() { return val; }
void run() {
while (!done) {
dispatchQueue.take()();
}
}
int doSomething()
{
std::promise<int> return_val;
auto runnable = [&]() {
int ret = 999;
return_val.set_value(ret);
};
dispatchQueue.put(runnable);
return return_val.get_future().get();
}
void doSomethingElse()
{
dispatchQueue.put(([this]() {
this->val = 2.0;
}
));
}
void doSomethingWithParams(int a, int b) {
dispatchQueue.put(([a,b]() {
std::cout << "this is the internal implementation of doSomethingWithParams(";
std::cout << a << "," << b << ")\n";
}
));
}
void doSomethingWithReferenceParams(int &a, int &b) {
std::promise<void> return_val;
dispatchQueue.put(([&a, &b, &return_val]() {
std::cout << "this is the internal implementation of doSomethingWithReferenceParams(";
std::cout << a << "," << b << ")\n";
a = 1234;
b = 5678;
return_val.set_value();
}
));
return_val.get_future().get();
}
};
int main(int argc, char **argv) {
BecomeActiveObject active;
int i = active.doSomething();
assert(i = 999);
std::thread t1(&BecomeActiveObject::doSomethingElse, &active);
active.doSomethingWithParams(5, 7);
int a=1, b=2;
active.doSomethingWithReferenceParams(a, b);
assert(a == 1234 && b == 5678);
t1.join();
assert(active.getVal() == 2.0);
return 0;
}