Introduction
Sprinkler is a synchronization object, similar to a semaphore, that can also carry data from the releasing thread to the waiting thread.
Sprinkler enables you to externalize data from internal threads. In a highly concurrent environment, there are many cases where one main thread needs to wait for one or more other threads, and data needs to be shared among them.
For instance, you may want to externalize exceptions from the inner threads to the main thread.
Background
Back in the day, we designed an asynchronous execution queue (an architecture for executing concurrent tasks at high scale).
Our work methodology is TDD, so we started thinking about how to test our framework from day one.
We learned very quickly that we needed some kind of barrier that would let our main thread wait (with a timeout) for our framework's inner threads.
Even this was not enough: in some cases the test expected an exception, but because the exception was thrown by an inner thread, it could not be externalized to the main thread with a plain Java throw.
Moreover, sometimes the test needed additional data from the inner threads, and we wanted to find a good way to transfer it.
To fulfill these requirements, we needed some kind of semaphore plus shared memory. And so, Sprinkler was created.
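The idea of "a semaphore plus shared memory" can be sketched with JDK classes alone. The following is a minimal illustration, not the Sprinkler implementation: the class name LatchHandoffSketch and the method runAndCapture are hypothetical names chosen for this example. A CountDownLatch plays the role of the barrier and an AtomicReference plays the role of the shared slot.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; not part of Sprinkler.
class LatchHandoffSketch {

    // Runs the task on a new thread and waits up to timeoutMillis for it to finish.
    // Returns the Throwable the task threw, or null if it completed normally.
    static Throwable runAndCapture(Runnable task, long timeoutMillis) {
        final CountDownLatch done = new CountDownLatch(1);                   // the "semaphore"
        final AtomicReference<Throwable> failure = new AtomicReference<>();  // the "shared memory"

        Thread inner = new Thread(() -> {
            try {
                task.run();
            } catch (Throwable t) {
                failure.set(t);   // a plain throw here would never reach the waiting thread
            } finally {
                done.countDown(); // wake the waiting thread either way
            }
        });
        inner.start();

        try {
            if (!done.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("inner thread did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException(e);
        }
        return failure.get();
    }

    public static void main(String[] args) {
        Throwable t = runAndCapture(() -> {
            throw new IllegalArgumentException("boom");
        }, 5000);
        System.out.println(t.getMessage());
    }
}
```

The waiting thread sees the captured exception only because the inner thread stored it before counting the latch down; the latch provides both the wake-up and the memory visibility.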
Using the code
Sprinkler is based on the Singleton design pattern. It has two primary methods: await and release.
Sprinkler supports multiple semaphores (contexts). With each call to await and release, you must provide a unique identifier (the context). await blocks until another thread calls release with the same context or until the timeout (in milliseconds) expires.
Here is an example of Sprinkler's capabilities - data and exception externalization:
// Imports assume JUnit 4; adjust them if you use a different test framework.
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

import org.junit.Assert;
import org.junit.Test;

public class TestSprinkler {

    // The inner thread hands a value to the waiting test thread.
    @Test
    public void testAwait_ReleaserDeliversData() {
        final int CONTEXT = 1;
        final String DATA = "bla bla";
        ExecutorServiceFactory.getCachedThreadPoolExecutor().submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                Sprinkler.getInstance().release(CONTEXT, DATA);
                return null;
            }
        });
        String data = (String) Sprinkler.getInstance().await(CONTEXT, 10000);
        Assert.assertEquals(DATA, data);
    }

    // The inner thread hands an exception over; await rethrows it wrapped in SprinklerException.
    @Test
    public void testAwait_InnerThreadExternalizeException() {
        final int CONTEXT = 1;
        final String EXCEPTION_MESSAGE = "test inner thread exception message";
        ExecutorServiceFactory.getCachedThreadPoolExecutor().submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                Sprinkler.getInstance().release(CONTEXT, new RuntimeException(EXCEPTION_MESSAGE));
                return null;
            }
        });
        Throwable thrown = null;
        try {
            Sprinkler.getInstance().await(CONTEXT, 10000);
        } catch (Throwable t) {
            thrown = t;
        }
        Assert.assertTrue(thrown instanceof SprinklerException);
        Assert.assertEquals(EXCEPTION_MESSAGE, thrown.getCause().getMessage());
    }

    // Nobody releases the context, so await gives up after 1 ms.
    @Test
    public void testAwait_Timeout() {
        final int CONTEXT = 1;
        ExecutorServiceFactory.getCachedThreadPoolExecutor().submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                Thread.sleep(10000);
                return null;
            }
        });
        Throwable thrown = null;
        try {
            Sprinkler.getInstance().await(CONTEXT, 1);
        } catch (Throwable t) {
            thrown = t;
        }
        Assert.assertTrue(thrown.getCause() instanceof TimeoutException);
    }
}
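The tests rely on an ExecutorServiceFactory helper whose source is not shown in this article. To run them, something like the following stand-in would be enough. This is an assumption about what the helper does, based only on its method name: it hands out a shared cached thread pool. The daemon thread factory and the thread name are choices made for this sketch.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the ExecutorServiceFactory used by the tests;
// the original class is not shown, so its real behavior may differ.
final class ExecutorServiceFactory {

    // One shared cached pool; daemon threads so idle workers do not keep the JVM alive.
    private static final ExecutorService CACHED = Executors.newCachedThreadPool(runnable -> {
        Thread thread = new Thread(runnable, "sprinkler-test-worker");
        thread.setDaemon(true);
        return thread;
    });

    private ExecutorServiceFactory() {
        // static factory only, no instances
    }

    static ExecutorService getCachedThreadPoolExecutor() {
        return CACHED;
    }

    public static void main(String[] args) throws Exception {
        // Submit a task and wait for its result on the calling thread.
        Integer answer = getCachedThreadPoolExecutor().submit(() -> 6 * 7).get();
        System.out.println(answer);
    }
}
```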
Sprinkler source code:
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Sprinkler {
    private static final Sprinkler _instance = new Sprinkler();

    // One entry per context: the latch to wait on plus the data or exception handed over.
    private final ConcurrentMap<Integer, SprinklerData> _data =
            new ConcurrentHashMap<Integer, SprinklerData>();

    private Sprinkler() {}

    public static Sprinkler getInstance() {
        return _instance;
    }

    public void reset() {
        _data.clear();
    }

    // Blocks until release(key, ...) is called or the timeout (milliseconds) expires.
    // Throws SprinklerException on timeout or when the releaser supplied an exception.
    public Object await(int key, long timeout) {
        SprinklerData data = null;
        try {
            data = getData(key);
            doAwait(data.getLatch(), timeout);
            externalizeException(data);
        } finally {
            // The context is single use: drop its entry whether or not the wait succeeded.
            _data.remove(key);
        }
        return data != null ? data.getInternal() : null;
    }

    public void release(int key) {
        release(key, null, null);
    }

    public synchronized void release(int key, Object internalData) {
        release(key, internalData, null);
    }

    public synchronized void release(int key, Throwable ex) {
        release(key, null, ex);
    }

    // Stores the payload and/or exception for the context, then wakes the waiting thread.
    public synchronized void release(int key, Object internalData, Throwable ex) {
        SprinklerData data = getData(key);
        data.setInternal(internalData);
        data.setAlreadyReleased(true);
        if (ex != null) {
            data.setException(ex);
        }
        notify(data.getLatch());
    }

    // Returns the entry for the context, creating it on first use by either side.
    private synchronized SprinklerData getData(int key) {
        SprinklerData data = _data.get(key);
        if (data == null) {
            data = new SprinklerData();
            _data.put(key, data);
        }
        return data;
    }

    private void externalizeException(SprinklerData data) {
        // The latch wait returned without a release: report it as a timeout.
        if (!isAlreadyReleased(data)) {
            throw new SprinklerException(new TimeoutException());
        }
        Throwable thrown = data.getException();
        if (thrown != null) {
            throw new SprinklerException(thrown);
        }
    }

    private void doAwait(CountDownLatch latch, long timeout) {
        try {
            latch.await(timeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            throw new SprinklerException(ex);
        }
    }

    private synchronized boolean isAlreadyReleased(SprinklerData data) {
        return data.isAlreadyReleased();
    }

    private void notify(CountDownLatch lock) {
        lock.countDown();
    }

    private static class SprinklerData {
        private final CountDownLatch _latch;
        private boolean _isAlreadyReleased = false;
        private Throwable _thrown;
        private Object _internal;

        public SprinklerData() {
            _latch = new CountDownLatch(1);
        }

        public Object getInternal() {
            return _internal;
        }

        public void setInternal(Object data) {
            _internal = data;
        }

        public CountDownLatch getLatch() {
            return _latch;
        }

        public boolean isAlreadyReleased() {
            return _isAlreadyReleased;
        }

        public void setAlreadyReleased(boolean isAlreadyReleased) {
            _isAlreadyReleased = isAlreadyReleased;
        }

        public Throwable getException() {
            return _thrown;
        }

        public void setException(Throwable thrown) {
            _thrown = thrown;
        }

        @Override
        public String toString() {
            return String.format(
                    "SprinklerData [latch.count=%s, isAlreadyReleased=%s, internal=%s, thrown.message=%s]",
                    _latch == null ? "null latch" : _latch.getCount(),
                    _isAlreadyReleased,
                    _internal,
                    _thrown == null ? "null" : _thrown.getMessage());
        }
    }
}
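The listing above throws SprinklerException, but that class is not included in the article. Since await declares no checked exceptions and the constructor is called with a single Throwable, a plausible minimal version is an unchecked exception that wraps a cause. The sketch below is an assumption along those lines, not the original class; SprinklerExceptionDemo is a hypothetical name used only to show the wrapping.

```java
// Hypothetical demo showing how the cause is recovered on the waiting side.
class SprinklerExceptionDemo {
    public static void main(String[] args) {
        try {
            throw new SprinklerException(new IllegalStateException("inner failure"));
        } catch (SprinklerException e) {
            // The original exception from the inner thread travels as the cause.
            System.out.println(e.getCause().getMessage());
        }
    }
}

// Assumed shape of SprinklerException: unchecked, wrapping the externalized Throwable.
class SprinklerException extends RuntimeException {
    private static final long serialVersionUID = 1L;

    SprinklerException(Throwable cause) {
        super(cause);
    }
}
```

Wrapping rather than rethrowing the original keeps the waiting thread's stack trace in the outer exception and the inner thread's stack trace in the cause, which is why the tests assert on thrown.getCause().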
Points of Interest
Another way of achieving this purpose is to use a thread pool. A thread pool gives you one central place that knows about every task submitted to it. Because submit returns a Future for each task, the pool can keep those futures and use them to wait until all of the tasks have completed.
Example:
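// TestThreadPoolJoiner.java (imports assume JUnit 4)
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Assert;
import org.junit.Test;

public class TestThreadPoolJoiner {
    @Test
    public void testJoin() throws InterruptedException, ExecutionException {
        final int TASKS = 10;
        final AtomicInteger executedTasks = new AtomicInteger(0);
        ThreadPoolJoiner joiner = new ThreadPoolJoiner();
        for (int i = 0; i < TASKS; i++) {
            joiner.submit(new Callable<Integer>() {
                @Override
                public Integer call() {
                    return executedTasks.incrementAndGet();
                }
            });
        }
        // Blocks until every submitted task has finished.
        joiner.join();
        Assert.assertEquals(TASKS, executedTasks.get());
    }
}

// ThreadPoolJoiner.java
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolJoiner extends ThreadPoolExecutor {
    // Futures of all tasks submitted through submit(Callable).
    private final Collection<Future<?>> _tasks = new CopyOnWriteArrayList<Future<?>>();

    public ThreadPoolJoiner() {
        // Same configuration as a cached thread pool.
        super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    }

    // Waits for each tracked task in turn; a failed task surfaces as ExecutionException.
    public void join() throws InterruptedException, ExecutionException {
        for (Future<?> currTask : _tasks) {
            currTask.get();
        }
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        Future<T> ret = super.submit(task);
        _tasks.add(ret);
        return ret;
    }
}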
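The thread pool approach also covers exception externalization and timeouts, because Future.get rethrows a task's exception wrapped in ExecutionException and its timed overload throws java.util.concurrent.TimeoutException. The sketch below illustrates this with standard JDK calls only; FutureExternalizeSketch and failureOf are hypothetical names for this example, and the single-thread executor is an arbitrary choice.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; not part of Sprinkler or ThreadPoolJoiner.
class FutureExternalizeSketch {

    // Runs the task on a pool thread and waits up to timeoutMillis.
    // Returns null on success, the task's own exception on failure,
    // or a TimeoutException if the task did not finish in time.
    static Throwable failureOf(Callable<?> task, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = pool.submit(task);
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return null;
        } catch (ExecutionException e) {
            return e.getCause();                // the exception thrown inside the task
        } catch (TimeoutException e) {
            return e;                           // the task is still running
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return e;
        } finally {
            pool.shutdownNow();                 // interrupt a still-running task and free the thread
        }
    }

    public static void main(String[] args) {
        Throwable failure = failureOf(() -> {
            throw new IllegalStateException("inner failure");
        }, 5000);
        System.out.println(failure);
    }
}
```

Compared with Sprinkler, this only works when the waiting code holds the Future, so it does not help when the inner threads are created deep inside a framework that the test cannot reach.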