Introduction
If you are programming with asynchronous operations (e.g. threads), sooner or later, you will need to have a mechanism which enables a thread to suspend awaiting some kind of "acknowledge" (signaling) from another thread (the basic idea is "I want to wait when you have finished your job, tell me when"). Well, that mechanism is well known in computer literacy as an "Event" and most operating systems provide such functionality in their API. Java however does not provide an out-of-the-box class implementing this full mechanism, but only provides the basic functions needed to construct one. Moreover, it does not provide a straightforward way to suspend a thread waiting on multiple events for the first event to happen, feature which is sometimes needed. The SyncEvent
packs all these functionalities in a Java class, which can be used as an hassle-free way for thread synchronization operations.
Background
I come from a Microsoft background and I had to switch to Java to write some simple Android apps. Android uses heavily asynchronous operations (for example all network related calls must not be called from the main process thread) and I was astonished that neither Java nor Android provides an out of the box class for such functionality, so common in Microsoft operating system and .NET framework. I looked on the internet for a general purpose class, but I was unable to find one, so I decided to build the SyncEvent
class, with the most needed functionalities.
Description of SyncEvent Class
Let's take a look at the interface of SyncEvent
class:
- We have a constructor which takes a
String
as input parameter, which is used only in logging to distinguish an event from another (it's highly likely that you will have more than one event in your programs). - The main methods are (they are all documented according to javadoc specification):
waitForEvent
: which suspends the thread, awaiting a signal on the event. You can specify whether the wait is for an indefinite/definite period of time and whether the signalled state of the event is autocleared on return from the method (true
as default). signalEvent
: which wakes up one of the threads awaiting on this event. resetEvent
: which manually clears the signalled state of the event isSignalled
: which returns whether this event has been signalled or not. You can specify also here if you wish to auto-clear the signal status on return or not. getName
: which returns the event name, as previously specified in the constructor.
- There are also two
static
methods:
package com.fdm.syncevent;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import android.util.Pair;
public class SyncEvent {
final static ILogger log = new LogcatLogger("SyncEvent");
Object sync;
boolean syncSignalled;
String syncName;
public SyncEvent(String name) {
sync=new Object();
syncSignalled=false;
syncName=name;
}
public String getName() {
return syncName;
}
public boolean waitForEvent() {
return waitForEvent(true);
}
public boolean waitForEvent(boolean autoclear) {
synchronized (sync) {
while (!syncSignalled) {
try {
sync.wait();
}
catch (InterruptedException e) {
return false;
}
}
if (autoclear)
syncSignalled=false;
}
return true;
}
public int waitForEvent(long millis) {
return waitForEvent(millis, true);
}
public int waitForEvent(long millis, boolean autoclear) {
long startTime,elapsed;
synchronized (sync) {
while (!syncSignalled) {
try {
startTime=System.currentTimeMillis();
sync.wait(millis);
elapsed=System.currentTimeMillis()-startTime;
if (elapsed>=millis)
break;
else
millis-=elapsed;
}
catch (InterruptedException e) {
return -1;
}
}
boolean signal=syncSignalled;
if (autoclear)
syncSignalled=false;
return (signal?0:-2);
}
}
public static int waitForMultipleEventsExecutor(final long millis, final SyncEvent ...events) {
int numEvents=events.length,ris=-1;
ExecutorService poolMultipleWait=Executors.newFixedThreadPool(numEvents);
ExecutorCompletionService<Pair<Integer,Integer>>
cs=new ExecutorCompletionService<Pair<Integer,Integer>>(poolMultipleWait);
for (int i=0;i<numEvents;i++) {
final int index=i;
cs.submit(new Callable<Pair<Integer,Integer>>() {
@Override
public Pair<Integer,Integer> call() throws Exception {
return new Pair<Integer,Integer>(index,events[index].waitForEvent(millis,false));
}
});
}
Pair<Integer, Integer> signalled;
try {
log.d("waitForMultipleEvents: awaiting on %d events %d timeout",numEvents,millis);
signalled = cs.take().get();
if (signalled.second==0)
ris=signalled.first;
else
ris=signalled.second;
events[signalled.first].resetEvent();
log.d("waitForMultipleEvents: received event %d (%s) result %d",
signalled.first,events[signalled.first].syncName,signalled.second);
}
catch (InterruptedException e) {
ris=-1;
}
catch (ExecutionException e) {
log.e(e,"waitForMultipleEvents: exception");
}
finally {
poolMultipleWait.shutdownNow();
}
return ris;
}
public static int waitForMultipleEvents (final long millis, final SyncEvent ...events) {
int numEvents=events.length,ris=-1;
final SyncEvent eventQueueNotEmpty=new SyncEvent("eventQueueNotEmpty");
final Queue<Pair<Integer,Integer>>
eventQueue=new ConcurrentLinkedQueue<Pair<Integer,Integer>>();
Thread [] poolMultipleWait=new Thread[numEvents];
for (int i=0;i<numEvents;i++) {
final int index=i;
poolMultipleWait[i]=new Thread() {
@Override
public void run() {
eventQueue.add(new Pair<Integer,Integer>
(index,events[index].waitForEvent(false)?0:-1));
eventQueueNotEmpty.signalEvent();
}
};
poolMultipleWait[i].start();
}
log.d("waitForMultipleEvents: awaiting on %d events %d timeout
(ActiveThreads %d)",numEvents,millis,Thread.activeCount());
ris=eventQueueNotEmpty.waitForEvent(millis);
for (int i=0;i<numEvents;i++)
if (poolMultipleWait[i].isAlive())
poolMultipleWait[i].interrupt();
if (ris==0) {
Pair<Integer, Integer> signalled = eventQueue.peek();
if (signalled.second==0) {
ris=signalled.first;
events[signalled.first].resetEvent();
}
else
ris=signalled.second;
log.d("waitForMultipleEvents: received event %d (%s) result %d",
signalled.first,events[signalled.first].syncName,signalled.second);
}
return ris;
}
public static int waitForMultipleEvents (final SyncEvent ...events) {
int numEvents=events.length,ris=-1;
final SyncEvent eventQueueNotEmpty=new SyncEvent("eventQueueNotEmpty");
final Queue<Pair<Integer,Integer>>
eventQueue=new ConcurrentLinkedQueue<Pair<Integer,Integer>>();
Thread [] poolMultipleWait=new Thread[numEvents];
for (int i=0;i<numEvents;i++) {
final int index=i;
poolMultipleWait[i]=new Thread() {
@Override
public void run() {
eventQueue.add(new Pair<Integer,Integer>
(index,events[index].waitForEvent(false)?0:-1));
eventQueueNotEmpty.signalEvent();
}
};
poolMultipleWait[i].start();
}
log.d("waitForMultipleEvents: awaiting on %d events (ActiveThreads %d)",
numEvents,Thread.activeCount());
ris=eventQueueNotEmpty.waitForEvent()?0:-1;
for (int i=0;i<numEvents;i++)
if (poolMultipleWait[i].isAlive())
poolMultipleWait[i].interrupt();
if (ris==0) {
Pair<Integer, Integer> signalled = eventQueue.peek();
if (signalled.second==0) {
ris=signalled.first;
events[signalled.first].resetEvent();
}
else
ris=signalled.second;
log.d("waitForMultipleEvents: received event %d (%s) result %d",
signalled.first,events[signalled.first].syncName,signalled.second);
}
return ris;
}
public static int waitForMultipleEventsExecutor (final SyncEvent ...events) {
int numEvents=events.length,ris=-1;
ExecutorService poolMultipleWait=Executors.newFixedThreadPool(numEvents);
ExecutorCompletionService<Pair<Integer,Boolean>>
cs=new ExecutorCompletionService<Pair<Integer,Boolean>>(poolMultipleWait);
for (int i=0;i<numEvents;i++) {
final int index=i;
cs.submit(new Callable<Pair<Integer,Boolean>>() {
@Override
public Pair<Integer,Boolean> call() throws Exception {
return new Pair<Integer,Boolean>(index,events[index].waitForEvent(false));
}
});
}
Pair<Integer, Boolean> signalled;
try {
log.d("waitForMultipleEvents: awaiting on %d events",numEvents);
signalled = cs.take().get();
if (signalled.second)
ris=signalled.first;
events[signalled.first].resetEvent();
log.d("waitForMultipleEvents: received event %d (%s) result %b",
signalled.first,events[signalled.first].getName(),signalled.second);
}
catch (InterruptedException e) {
ris=-1;
}
catch (ExecutionException e) {
log.e(e,"waitForMultipleEvents: exception");
}
finally {
poolMultipleWait.shutdownNow();
}
return ris;
}
public boolean isSignalled () {
return isSignalled(true);
}
public boolean isSignalled (boolean autoclear) {
synchronized (sync) {
boolean ris=syncSignalled;
if (autoclear)
syncSignalled=false;
return ris;
}
}
public void signalEvent() {
synchronized (sync) {
syncSignalled=true;
sync.notify();
}
}
public void resetEvent() {
synchronized (sync) {
syncSignalled=false;
}
}
}
Implementations Notes
The SyncEvent
class deals with most of the common issues found in Java standard functions for synchronization (wait and notify functions of Object
class):
- Missed Events: If you issue a notify function on an object BEFORE a thread has started waiting on the same object, the signal is lost (i.e. the waiting thread continues to wait without waking up). The solution is well known (see [1]) and it is based on saving the signalled status on an internal variable called
syncSignalled
. - Spurious Wakeups: For "inexplicable" reasons, it is possible for threads to wake up from a wait call even if
notify()
method has not been called. This is known as spurious wakeups (e.g. wakeups without any reason). To deal with such unexpected behaviour, the syncSignalled
variable comes through again, since all waitForEvent
methods go in a check variable-wait loop until the syncSignalled
variable is set. - Wait For Multiple Events: Java does not provide a synchronization function which waits on multiple events (well known in Windows API as
WaitForMultipleObjects
). The idea to solve this problem is to create several threads (one for every event we want to wait on), each of which suspends in a waitForEvent
operation on one of the input events (in the WaitForMultipleEventsExecutor
method threads are created using the ExecutorService
class as thread pool). In turn, the thread calling the static waitForMultipleEvents
method suspends to wait for which of the created threads wakes up "first", using a queue and the same SyncEvent class to notify the termination of a thread (in the WaitForMultipleEventsExecutor the queue and termination event are all handled by the ExecutorCompletionService
class). As soon as one of events is signalled, the corresponding created thread ends and WaitForMultipleEvents
function awakes, collects the result and sends an interrupt to all the remaining threads, still waiting for the other unfired events. It is important here to use non auto-clearing waitForEvent
operations, otherwise some events could get lost: for example when two events are signalled one after another, before the WaitForMultipleEvents
function has had enough time to interrupt all the remaining event-waiting threads. - InterruptedException: Many Java methods which deal with thread suspension throw the unwanted
InterruptedException
which must be caught, even though most of the times is not even handled by software (e.g., try
with an empty catch
). The InterruptedException
is basically notifying to a "waiting" thread (e.g. thread which has called wait or sleep) that someone is trying to "kill" us (e.g. the user is trying to close the process or another thread has explicitly called interrupt method on the suspended thread). Since the suspended thread is not being executed, the only way to notify is by resuming it throwing an exception, which, as you can fancy, is the renowed InterruptedException
exception. If you have understood the mechanism, you can probably imagine that the best way to handle this exception is by terminating whatever operation you were doing in the suspended thread, releasing any allocated resources and then exiting the thread. It would be useful if this exception could be propagated up through the call stack, but due to Java's checked exception, this is not always possible (e.g. use of wait
functions in a method with a precise signature which does not allow any further throw
clauses, such as when implementing methods of a Java interface), so for convenience's sake, all methods of SyncEvent
class catch InterruptedException
internally and return a particular value, when interrupted (see method documentation).
Using the Code
First, instantiate the SyncEvent
objects:
SyncEvent connected = new SyncEvent("connected");
SyncEvent newPeer = new SyncEvent("newPeer");
On the thread you want to wait for the event, call:
connected.waitForEvent();
connected.waitForEvent(10000);
On the thread you want to signal the event, call:
connected.signalEvent();
If you want to wait for multiple events on a thread:
SyncEvent.waitForMultipleEvents(connected,newPeer);
SyncEvent.waitForMultipleEvents(10000,connected,newPeer);
Source Code
You can find attached an Android Eclipse project named SyncEvent
containing the SyncEvent
class and a sample test program. It consists of a simple activity containing just two buttons:
- Start: which has a dual behaviour:
- If there is no waiting thread, it creates a thread called "Thread 1" which simply waits endlessly on two
SyncEvent
s called "start" and "stop" (actually instead of waiting indefinitely it simulates it by using a while loop and definite wait of 5 seconds to test the "wait for timeout" methods, which were bugged in earlier versions). Whenever either start or stop event is received, "Thread 1" logs the received event and then re-begins waiting endlessly on the both SyncEvents. - If a waiting thread exists, it sends a random event (either start or stop) to wake "Thread 1".
- Abort: which aborts the waiting thread.
If you launch the activity, you can check the output in Eclipse LogCat window. The application logs also the number of active threads whenever it starts waiting on a SyncEvent
in order to check that all thread creation/interruption operations works flawlessly (i.e. thread count does not rise over time).
Here follows the main code for the click events, which is straightforward:
@Override
public void onClick(View v) {
switch (v.getId()) {
case R.id.startBtn:
if (t==null) {
t=new Thread() {
public void run() {
log.i("Thread 1 started");
while (true) {
log.i("Awaiting start or stop signal from another thread");
int ris=SyncEvent.waitForMultipleEvents(5000,events);
if (ris==-1) {
log.i("Thread 1 abort signal received");
break;
}
else if (ris>=0)
log.i("Thread 1 %s event received, processing...",events[ris].getName());
else
log.i("Thread 1 timeout elapsed, restarting...");
}
log.i("Thread 1 exiting");
t=null;
};
};
t.start();
}
else {
int offset=(int) (System.currentTimeMillis()%2);
log.i("Signalling %s to thread 1",events[offset].getName());
events[offset].signalEvent();
}
break;
case R.id.abortBtn:
log.i("Signalling abort to thread 1");
if (t!=null)
t.interrupt();
break;
}
}
Remarks
One could note that all these operations could be simply realized by inserting a constant value in a blocking queue. Obviously this is true, since a blocking queue is "under the hood" a combination of a queue and events. To avoid misuse, events should be used when you just want to wake a thread (e.g., send a signal without any data), while blocking queues should be used when you are sending some data to a "consumer" thread.
References
History
- V1.0 Initial release (30 June 2015)
- V 1.1 Fixed
SyncEvent
class source code (30 June 2015) - V 1.2 Fixed
waitForEvent
with timeout method (to discriminate between spurious wakeup and timeout), reimplemented waitForMultipleEvents
with ConcurrentLinkedQueue, Threads and a SyncEvent for the most "basic" code [older waitForMultipleEvents implementation is available in new waitForMultipleEventsExecutor
methods], added getName
method and updated Android test application (8 July 2015)