Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / Mobile / Android
Print

SyncEvent: The missing Java Event Class

5.00/5 (1 vote)
30 Jun 2015CPOL7 min read 9.5K   51  
This article presents a general purpose event class called SyncEvent supporting standard thread synchronization operations like signalling and waiting, also on multiple events.

Introduction

If you are programming with asynchronous operations (e.g. threads), sooner or later, you will need to have a mechanism which enables a thread to suspend awaiting some kind of "acknowledge" (signaling) from another thread (the basic idea is "I want to wait when you have finished your job, tell me when"). Well, that mechanism is well known in computer literacy as an "Event" and most operating systems provide such functionality in their API. Java however does not provide an out-of-the-box class implementing this full mechanism, but only provides the basic functions needed to construct one. Moreover, it does not provide a straightforward way to suspend a thread waiting on multiple events for the first event to happen, feature which is sometimes needed. The SyncEvent packs all these functionalities in a Java class, which can be used as an hassle-free way for thread synchronization operations.

Background

I come from a Microsoft background and I had to switch to Java to write some simple Android apps. Android uses heavily asynchronous operations (for example all network related calls must not be called from the main process thread) and I was astonished that neither Java nor Android provides an out of the box class for such functionality, so common in Microsoft operating system and .NET framework. I looked on the internet for a general purpose class, but I was unable to find one, so I decided to build the SyncEvent class, with the most needed functionalities.

Description of SyncEvent Class

Let's take a look at the interface of SyncEvent class:

  • We have a constructor which takes a String as input parameter, which is used only in logging to distinguish an event from another (it's highly likely that you will have more than one event in your programs).
  • The main methods are (they are all documented according to javadoc specification):
    • waitForEvent: which suspends the thread, awaiting a signal on the event. You can specify whether the wait is for an indefinite/definite period of time and whether the signalled state of the event is autocleared on return from the method (true as default).
    • signalEvent: which wakes up one of the threads awaiting on this event.
    • resetEvent: which manually clears the signalled state of the event
    • isSignalled: which returns whether this event has been signalled or not. You can specify also here if you wish to auto-clear the signal status on return or not.
    • getName: which returns the event name, as previously specified in the constructor.
  • There are also two static methods:
    • WaitForMultipleEvents: which allows waiting for multiple events, awaking as soon as one of the specified events happens (from version 1.2 it has been re-implemented with a queue and the same SyncEvent class for the most "basic" code).
    • WaitForMultipleEventsExecutor: earlier version of the WaitForMultipleEvents implemented with ExecutorCompletionService class (remains as an alternative implementation).

Java
package com.fdm.syncevent;

import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import android.util.Pair;

public class SyncEvent {
    final static ILogger log = new LogcatLogger("SyncEvent");

    Object sync;
    boolean syncSignalled;     	// the boolean is needed to save the status of signal notification, 
				// when the signal comes before the wait operation    
    String syncName;

    public SyncEvent(String name) {
        sync=new Object();
        syncSignalled=false;
        syncName=name;
    }
   

    /**
    * Gets the signal name specified during creation.
    * @return Returns the signal name used for logging.
    */
    public String getName() {
        return syncName;
    }

    /**
    * Waits indefinetely for the event to be signalled. 
    * The signalled status of the event is auto-cleared on return.
    * @return <li>false</li> if the wait operation has been interrupted 
    * (someone is trying to "kill" our thread), <li>true</li> is the event has been signalled.
    */
    public boolean waitForEvent() {
        return waitForEvent(true);
    }

    /**
    * Waits indefinetely for the event to be signalled.
    * @param  autoclear    if true the signalled status of the event is auto-cleared on return.
    * @return <li>false</li> if the wait operation has been interrupted 
    * (someone is trying to "kill" our thread), <li>true</li> is the event has been signalled.
    */
    public boolean waitForEvent(boolean autoclear) {
          synchronized (sync) {
                  // while is needed since Java's wait function could return even when no signal 
		  // has been raised (so called spurious wakeups)
                  while (!syncSignalled) {
                      //log.d("%s: Awaiting signal",syncName);
                      try {
                          sync.wait();
                      }
                      catch (InterruptedException e) {
                          return false;                     
                      }
                  }
                  if (autoclear)
                      syncSignalled=false;                       
          }

          return true;
    }
   

    /**
    * Waits for the event to be signalled up to the specified time. 
    * The signalled status is auto-cleared on return
    * @param  millis    maximum number of milliseconds to wait for
    * @return
    * <li>0</li> if the event has been signalled;
    * <li>-1</li> if the wait operation has been interrupted (someone is trying to "kill" our thread);
    * <li>-2</li> if the timeout has elapsed and the event has not been signalled
    */    
    public int waitForEvent(long millis) {
        return waitForEvent(millis, true);
    }
   
   

    /**
    * Waits for the event to be signalled up to the specified time.
    * @param  millis    maximum number of milliseconds to wait for
    * @param  autoclear    if true the signalled status of the event is auto-cleared on return.
    * @return
    * <li>0</li> if the event has been signalled;
    * <li>-1</li> if the wait operation has been interrupted (someone is trying to "kill" our thread);
    * <li>-2</li> if the timeout has elapsed and the event has not been signalled
    */    
    public int waitForEvent(long millis, boolean autoclear) {                 
        long startTime,elapsed;

        synchronized (sync) {
                // while is needed since Java's wait function could return even when no signal 
                // has been raised (so called spurious wakeups)
                  while (!syncSignalled) {
                      //log.d("%s: Awaiting signal",syncName);
                      try {
                          startTime=System.currentTimeMillis();
                          sync.wait(millis);
                          elapsed=System.currentTimeMillis()-startTime;
                          if (elapsed>=millis)  // if it is a real timeout exit
                              break;
                          else
                              millis-=elapsed;  // if it is a spurious wakeup go on waiting 
						// till the timeout has been reached
                      }
                      catch (InterruptedException e) {
                          return -1;                     
                      }
                  }
                  boolean signal=syncSignalled;
                  if (autoclear)
                      syncSignalled=false;                       
                  return (signal?0:-2);
          }
    }

   

    /**
        * Waits for the first of multiple events to be signalled up to the specified time.
        * The signalled status of the first event is auto-cleared on return
        * @param  millis    maximum number of milliseconds to wait for
        * @param  events    an array of one or more events to wait concurrently for
        * @return
        * <li>i</li>  i>=0, if the i-th event has been signalled;
        * <li>-1</li> if the wait operation has been interrupted
        * (someone is trying to "kill" our thread);
        * <li>-2</li> if the timeout has elapsed and none of the events have been signalled
        */         
        public static int waitForMultipleEventsExecutor(final long millis, final SyncEvent ...events) {
            int numEvents=events.length,ris=-1;                

            ExecutorService poolMultipleWait=Executors.newFixedThreadPool(numEvents);
            ExecutorCompletionService<Pair<Integer,Integer>>
            cs=new ExecutorCompletionService<Pair<Integer,Integer>>(poolMultipleWait);

            // we launch multiple waits in parallel
            for (int i=0;i<numEvents;i++) {
                final int index=i;
                cs.submit(new Callable<Pair<Integer,Integer>>() {
                    @Override
                    public Pair<Integer,Integer> call() throws Exception {
                        return new Pair<Integer,Integer>(index,events[index].waitForEvent(millis,false));
                    }
                });
            }

            Pair<Integer, Integer> signalled;
            try {
                log.d("waitForMultipleEvents: awaiting on %d events %d timeout",numEvents,millis);
                signalled = cs.take().get();
                if (signalled.second==0)
                    ris=signalled.first;
                else
                    ris=signalled.second;
                events[signalled.first].resetEvent(); // we clear the event status
                log.d("waitForMultipleEvents: received event %d (%s) result %d",
			signalled.first,events[signalled.first].syncName,signalled.second);
            }
            catch (InterruptedException e) {
                    ris=-1;  // we have been interrupted
            }
            catch (ExecutionException e) {
                log.e(e,"waitForMultipleEvents: exception");
            }
            finally {
                poolMultipleWait.shutdownNow();  // abort any other wait operation           
            }        

            return ris;       
        }    

    /**
    * Waits for the first of multiple events to be signalled up to the specified time. 
    * The signalled status of the first event is auto-cleared on return
    * @param  millis    maximum number of milliseconds to wait for
    * @param  events    an array of one or more events to wait concurrently for
    * @return
    * <li>i</li>  i>=0, if the i-th event has been signalled;
    * <li>-1</li> if the wait operation has been interrupted (someone is trying to "kill" our thread);
    * <li>-2</li> if the timeout has elapsed and none of the events have been signalled
    */         
    public static int waitForMultipleEvents (final long millis, final SyncEvent ...events) {
        int numEvents=events.length,ris=-1;                

        final SyncEvent eventQueueNotEmpty=new SyncEvent("eventQueueNotEmpty");
        final Queue<Pair<Integer,Integer>> 
		eventQueue=new ConcurrentLinkedQueue<Pair<Integer,Integer>>();

        Thread [] poolMultipleWait=new Thread[numEvents];

        // we launch multiple waits in parallel
        for (int i=0;i<numEvents;i++) {
            final int index=i;
            poolMultipleWait[i]=new Thread() {

                @Override
                public void run() {
                    eventQueue.add(new Pair<Integer,Integer>
			(index,events[index].waitForEvent(false)?0:-1));
                    eventQueueNotEmpty.signalEvent();
                    //log.d("waitForMultipleEvents: thread waiting on event %s exiting...",
                    //events[index].getName());
                }
            };
            poolMultipleWait[i].start();
        }
                   

        log.d("waitForMultipleEvents: awaiting on %d events %d timeout 
		(ActiveThreads %d)",numEvents,millis,Thread.activeCount());
        ris=eventQueueNotEmpty.waitForEvent(millis);

        // abort any other wait operation
        for (int i=0;i<numEvents;i++)
            if (poolMultipleWait[i].isAlive())
                poolMultipleWait[i].interrupt();            

        // if we have been signalled
        if (ris==0) {       
            // check the "first" signalled event
            Pair<Integer, Integer> signalled = eventQueue.peek();
            if (signalled.second==0) {
                ris=signalled.first;
                events[signalled.first].resetEvent(); // we clear the event status
            }
            else
                ris=signalled.second;

            log.d("waitForMultipleEvents: received event %d (%s) result %d",
		signalled.first,events[signalled.first].syncName,signalled.second);
        }

        return ris;       
    }
   
   

    /**
    * Waits indefinitely for the first of multiple events to be signalled. 
    * The signalled status of the first event is auto-cleared on return
    * @param    events    an array of one or more events to wait concurrently for
    * @return
    * <li>i</li> i>=0, if the i-th event has been signalled;
    * <li>-1</li> if the wait operation has been interrupted (someone is trying to "kill" our thread);
    */         
    public static int waitForMultipleEvents (final SyncEvent ...events) {
        int numEvents=events.length,ris=-1;                

        final SyncEvent eventQueueNotEmpty=new SyncEvent("eventQueueNotEmpty");
        final Queue<Pair<Integer,Integer>> 
		eventQueue=new ConcurrentLinkedQueue<Pair<Integer,Integer>>();

        Thread [] poolMultipleWait=new Thread[numEvents];

        // we launch multiple waits in parallel
        for (int i=0;i<numEvents;i++) {
            final int index=i;
            poolMultipleWait[i]=new Thread() {

                @Override
                public void run() {
                    eventQueue.add(new Pair<Integer,Integer>
			(index,events[index].waitForEvent(false)?0:-1));
                    eventQueueNotEmpty.signalEvent();
                    //log.d("waitForMultipleEvents: thread waiting on event %s exiting...",
                    //events[index].getName());
                }
            };
            poolMultipleWait[i].start();
        }
                   

        log.d("waitForMultipleEvents: awaiting on %d events (ActiveThreads %d)",
		numEvents,Thread.activeCount());
        ris=eventQueueNotEmpty.waitForEvent()?0:-1;

        // abort any other wait operation
        for (int i=0;i<numEvents;i++)
            if (poolMultipleWait[i].isAlive())
                poolMultipleWait[i].interrupt();            

        // if we have been signalled
        if (ris==0) {       
            // check the "first" signalled event
            Pair<Integer, Integer> signalled = eventQueue.peek();
            if (signalled.second==0) {
                ris=signalled.first;
                events[signalled.first].resetEvent(); // we clear the event status
            }
            else
                ris=signalled.second;

            log.d("waitForMultipleEvents: received event %d (%s) result %d",
		signalled.first,events[signalled.first].syncName,signalled.second);
        }

        return ris;       
    }
   

    /**
    * Waits indefinitely for the first of multiple events to be signalled. 
    * The signalled status of the first event is auto-cleared on return
    * @param    events    an array of one or more events to wait concurrently for
    * @return
    * <li>i</li> i>=0, if the i-th event has been signalled;
    * <li>-1</li> if the wait operation has been interrupted (someone is trying to "kill" our thread);
    */         
    public static int waitForMultipleEventsExecutor (final SyncEvent ...events) {
        int numEvents=events.length,ris=-1;

        ExecutorService poolMultipleWait=Executors.newFixedThreadPool(numEvents);
        ExecutorCompletionService<Pair<Integer,Boolean>> 
		cs=new ExecutorCompletionService<Pair<Integer,Boolean>>(poolMultipleWait);

        // we launch multiple waits in parallel
        for (int i=0;i<numEvents;i++) {
            final int index=i;
            cs.submit(new Callable<Pair<Integer,Boolean>>() {
                @Override
                public Pair<Integer,Boolean> call() throws Exception {
                    return new Pair<Integer,Boolean>(index,events[index].waitForEvent(false));
                }
            });
        }

        Pair<Integer, Boolean> signalled;
        try {
            log.d("waitForMultipleEvents: awaiting on %d events",numEvents);
            signalled = cs.take().get();
            if (signalled.second)
                ris=signalled.first;
            events[signalled.first].resetEvent(); // we clear the event status
            log.d("waitForMultipleEvents: received event %d (%s) result %b",
		signalled.first,events[signalled.first].getName(),signalled.second);
        }
        catch (InterruptedException e) {
            ris=-1;
        }
        catch (ExecutionException e) {
            log.e(e,"waitForMultipleEvents: exception");
        }
        finally {
            poolMultipleWait.shutdownNow();  // abort any other wait operation           
        }

        return ris;       
    }

   

    /**
    * Checks if the event has been signalled. The signal status is auto-cleared on return.
    * @return <li>true</li> if the event has been signalled;
    * <li>false</li>    otherwise
    */    
    public boolean isSignalled () {
        return isSignalled(true);
    }
   
   

    /**
    * Checks if the event has been signalled.
    * @param  autoclear    if true the signalled status of the event is auto-cleared on return.
    * @return <li>true</li> if the event has been signalled;
    * <li>false</li>    otherwise
    */    
    public boolean isSignalled (boolean autoclear) {
        synchronized (sync) {
            boolean ris=syncSignalled;
            if (autoclear)
                syncSignalled=false;
            return ris;
        }
    }

   
   

    /**
    * Signals the event, waking one of the threads waiting on the signal (can be more than one).
    */    
    public void signalEvent() {
          synchronized (sync) {
            syncSignalled=true;
            sync.notify();
          }
    }
   

    /**
    * Clears manually the signalled status of the event.
    */    
    public void resetEvent() {
          synchronized (sync) {
            syncSignalled=false;
          }       
    }
}

Implementations Notes

The SyncEvent class deals with most of the common issues found in Java standard functions for synchronization (wait and notify functions of Object class):

  • Missed Events: If you issue a notify function on an object BEFORE a thread has started waiting on the same object, the signal is lost (i.e. the waiting thread continues to wait without waking up). The solution is well known (see [1]) and it is based on saving the signalled status on an internal variable called syncSignalled.
  • Spurious Wakeups: For "inexplicable" reasons, it is possible for threads to wake up from a wait call even if notify() method has not been called. This is known as spurious wakeups (e.g. wakeups without any reason). To deal with such unexpected behaviour, the syncSignalled variable comes through again, since all waitForEvent methods go in a check variable-wait loop until the syncSignalled variable is set.
  • Wait For Multiple Events: Java does not provide a synchronization function which waits on multiple events (well known in Windows API as WaitForMultipleObjects). The idea to solve this problem is to create several threads (one for every event we want to wait on), each of which suspends in a waitForEvent operation on one of the input events (in the WaitForMultipleEventsExecutor method threads are created using  the ExecutorService class as thread pool). In turn, the thread calling the static waitForMultipleEvents method suspends to wait for which of the created threads wakes up "first", using a queue and the same SyncEvent class to notify the termination of a thread (in the WaitForMultipleEventsExecutor the queue and termination event are all handled by the ExecutorCompletionService class). As soon as one of events is signalled, the corresponding created thread ends and WaitForMultipleEvents function awakes, collects the result and sends an interrupt to all the remaining threads, still waiting for the other unfired events. It is important here to use non auto-clearing waitForEvent operations, otherwise some events could get lost: for example when two events are signalled one after another, before the WaitForMultipleEvents function has had enough time to interrupt all the remaining event-waiting threads.
  • InterruptedException: Many Java methods which deal with thread suspension throw the unwanted InterruptedException which must be caught, even though most of the times is not even handled by software (e.g., try with an empty catch). The InterruptedException is basically notifying to a "waiting" thread (e.g. thread which has called wait or sleep) that someone is trying to "kill" us (e.g. the user is trying to close the process or another thread has explicitly called interrupt method on the suspended thread). Since the suspended thread is not being executed, the only way to notify is by resuming it throwing an exception, which, as you can fancy, is the renowed InterruptedException exception. If you have understood the mechanism, you can probably imagine that the best way to handle this exception is by terminating whatever operation you were doing in the suspended thread, releasing any allocated resources and then exiting the thread. It would be useful if this exception could be propagated up through the call stack, but due to Java's checked exception, this is not always possible (e.g. use of wait functions in a method with a precise signature which does not allow any further throw clauses, such as when implementing methods of a Java interface), so for convenience's sake, all methods of SyncEvent class catch InterruptedException internally and return a particular value, when interrupted (see method documentation).

Using the Code

First, instantiate the SyncEvent objects:

Java
SyncEvent connected = new SyncEvent("connected");
SyncEvent newPeer = new SyncEvent("newPeer");

On the thread you want to wait for the event, call:

Java
// the thread will suspend for indefinite time, awaiting the event
// returns true when the event is signalled or false when we have been interrupted
connected.waitForEvent();
Java
// the thead will suspend awaiting the event for 10 seconds
// returns an 0 when the event is signalled, -1 when we have been interrupted 
// and -2 when the timeout elapsed without any event.
connected.waitForEvent(10000);

On the thread you want to signal the event, call:

Java
connected.signalEvent();

If you want to wait for multiple events on a thread:

Java
// the thread will suspend indefinitely, awaiting both events
// returns i-th event where i>=0, if i-th event has been signalled 
// (0 for connected, 1 for newPeer) or  -1 when we have been interrupted
SyncEvent.waitForMultipleEvents(connected,newPeer);
Java
// the thread will suspend awaiting both events for 10 seconds
// returns i-th event where i>=0, if i-th event has been signalled 
// (0 for connected, 1 for newPeer), -1 when we have been interrupted and -2 
// when the timeout elapsed without any event.
SyncEvent.waitForMultipleEvents(10000,connected,newPeer);

Source Code

You can find attached an Android Eclipse project named SyncEvent containing the SyncEvent class and a sample test program. It consists of a simple activity containing just two buttons:

  • Start: which has a dual behaviour:
    • If there is no waiting thread, it creates a thread called "Thread 1" which simply waits endlessly on two SyncEvents called "start" and "stop" (actually instead of waiting indefinitely it simulates it by using  a while loop and definite wait of 5 seconds to test the "wait for timeout" methods, which were bugged in earlier versions). Whenever either start or stop event is received, "Thread 1" logs the received event and then re-begins waiting endlessly on the both SyncEvents.
    • If a waiting thread exists, it sends a random event (either start or stop) to wake "Thread 1".
  • Abort: which aborts the waiting thread.

If you launch the activity, you can check the output in Eclipse LogCat window. The application logs also the number of active threads whenever it starts waiting on a SyncEvent in order to check that all thread creation/interruption operations works flawlessly (i.e. thread count does not rise over time).

Here follows the main code for the click events, which is straightforward:

Java
@Override
public void onClick(View v) {
    switch (v.getId()) {
        case R.id.startBtn:
            // Create Thread 1
            if (t==null) {
                t=new Thread() {
                    public void run() {

                        log.i("Thread 1 started");
                        while (true) {
                            log.i("Awaiting start or stop signal from another thread");
                            int ris=SyncEvent.waitForMultipleEvents(5000,events);
                            if (ris==-1) {
                                log.i("Thread 1 abort signal received");
                                break;
                            }
                            else if (ris>=0)
                                log.i("Thread 1 %s event received, processing...",events[ris].getName());
                            else
                                log.i("Thread 1 timeout elapsed, restarting...");
                        }
                        log.i("Thread 1 exiting");
                        t=null;
                    };
                };
                t.start();
            }
            else {
                int offset=(int) (System.currentTimeMillis()%2);
                log.i("Signalling %s to thread 1",events[offset].getName());
                events[offset].signalEvent();
        }

        break;

        case R.id.abortBtn:
            log.i("Signalling abort to thread 1");
            if (t!=null)
                t.interrupt();
        break;
    }
}

Remarks

One could note that all these operations could be simply realized by inserting a constant value in a blocking queue. Obviously this is true, since a blocking queue is "under the hood" a combination of a queue and events. To avoid misuse, events should be used when you just want to wake a thread (e.g., send a signal without any data), while blocking queues should be used when you are sending some data to a "consumer" thread.

References

History

  • V1.0 Initial release (30 June 2015)
  • V 1.1 Fixed SyncEvent class source code (30 June 2015)
  • V 1.2 Fixed waitForEvent with timeout method (to discriminate between spurious wakeup and timeout), reimplemented waitForMultipleEvents with ConcurrentLinkedQueue, Threads and a SyncEvent for the most "basic" code [older waitForMultipleEvents implementation is available in new waitForMultipleEventsExecutor methods], added getName method and updated Android test application (8 July 2015)

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)