Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / Languages / C#

Event traffic calming

4.75/5 (3 votes)
26 Jan 2013CPOL6 min read 17.5K   209  
How to build an object to calm down event traffic.

Introduction

Have you ever had an object that generates events so fast your program doesn't have the time to process them? Imagine we have an online store. Every time a sell order (just order from now on) gets in, an events is triggered. Some event handler is executed to update the list of latest orders (or all of them). What if another order comes in after the update process has started? The update can take a long time, let's say 2 seconds, several orders could arrive during that time. We cannot start a new update for each of them, at some point we will need a synchronization system that will allow only a single update at a time. Constantly updating controls could cause our application to freeze. We would use a persistence system, like a DB, so it would be better to make bigger queries than constant very small queries. What if we could use an object to calm these events traffic for us?

What is it good for?

Before talking about problem or solution details, let's take a look to sample applications on Figure 1. On the left there is the Calmed Client, which uses the event traffic calmer. On the right there is the Real Time Client, which processes orders as they arrive. On the calm application we can see and navigate the orders, at least before it refreshes every 5 secs. The real time application, on the other hand, is a flickering application where is impossible to even read unless we capture the screen. If events arrive a little bit faster it might even freeze.

Image 1
Figure 1: Sample applications.

Our Store

Let's quickly define the domain we will use for the rest of this article. We need an entity and a repository.

C#
public class Order
{
    public string Title { get; set; }
    public DateTime CreatedAt { get; set; }
}
 
public interface IOrderRepository
{
    IEnumerable<Order> ListFrom(DateTime startDate);
    event EventHandler OrderArrived ;
} 
Code Excerpt 1: Sample domain.

From our sample code we'll make reference to an instance of orders repository in variable named _orders.

The code that use to handle orders (from our form) was:

C#
_orders.OrderArrived += HandleOrderArrived;
Code Excerpt 2: Handling order arrived events.

Every time an order arrives our handler is executed. The handler then queries the repository for orders that arrived since the last time it was executed and uses this list to update the UI. Our sample code shows this set of orders. See project: RealTimeClient.

Building an object to solve the problem.

We need an object (the calmer) with two members: a handler to hook to events we want to calm down and an event that it will trigger calmly.

C#
public class EventCalmer
{
    public void HandleEvent(object sender, EventArgs e) { ... }
    public event EventHandler EventOccurred ;
} 
Code Excerpt 3: A calmer.

What we need is to put this object between the actual event generator and our code.

C#
_calmer = new EventCalmer(TimeSpan.FromSeconds(5));
...
_orders.OrderArrived += _calmer.HandleEvent;
_calmer.EventOccurred += HandleOrderArrived; 
Code Excerpt 4: Handling calmed order arrived events.

With this setup, every time an order arrives, it will trigger only events when some time elapses, 5s in our example. If more than 3 orders arrive in 5s, the second will trigger no event, but the third will. The calmer will ensure it triggers the event once after the last order arrives so there will never be an order that hasn't been processed. The calmer will trigger its event a number of times close (+/-1) to all 5s executions that would fit in the total execution time. That is total time divided by 5s. If some handler execution takes longer than 5s, the calmer would start the next execution as soon as the previous finishes. If the execution finishes earlier, it will sleep for the remaining time.

It's very difficult to explain. I don't even understand what I'm writing. Let's try with a diagram:

Image 2
Figure 2: How a calmer works.

This really changes everything. Transitions labeled Event Received speak for themselves. Finished will be used for transitions out from executing. Done will be used for transitions out from waiting.

Since it looks so very much like an automaton, an automaton-like implementation could be a solution.

States

We'll create an enum with one value for each of Figure 2 states.

C#
private enum CalmerState
{
    Idle,
    Executing,
    ExecutingOneInQueue,
    Waiting,
    WaitingOneInQueue,
}; 
Code Excerpt 5: Calmer states.

Transition Function

An easy way to define this transition function would be by using two static dictionaries, one for Event Received, another one for Done and Finished together.

C#
private static readonly Dictionary<CalmerState, CalmerState> EventReceivedTransitions=
    new Dictionary<CalmerState, CalmerState>
    {
        { CalmerState.Idle, CalmerState.Executing },
        { CalmerState.Executing, CalmerState.ExecutingOneInQueue },
        { CalmerState.Waiting, CalmerState.WaitingOneInQueue },
        { CalmerState.WaitingOneInQueue, CalmerState.WaitingOneInQueue },
        { CalmerState.ExecutingOneInQueue, CalmerState.ExecutingOneInQueue },
    };
 
private static readonly Dictionary<CalmerState, CalmerState> DoneOrFinishedTransitions=
    new Dictionary<CalmerState, CalmerState>
    {
        { CalmerState.Executing, CalmerState.Waiting },
        { CalmerState.Waiting, CalmerState.Idle },
        { CalmerState.ExecutingOneInQueue, CalmerState.WaitingOneInQueue },
        { CalmerState.WaitingOneInQueue, CalmerState.Executing },
    };
Code Excerpt 6: Transition function dictionaries.

Calmer's handler and trigger.

Standard event handling and triggering code will ensure events get to the calmer and from the calmer to the store system. Just before triggering the calmer's event, we need to store the time it's been triggered in an instance variable.

C#
public void HandleEvent(object sender, EventArgs e)
{
    EventReceived();
}
 
public event EventHandler EventOccurred;
 
private void OnEventOccurred(EventArgs e)
{
    _lastExecutionBeganAt = DateTime.Now;
    if (EventOccurred != null) EventOccurred(this, e);
} 
Code Excerpt 7: Calmer's handler and trigger.

Starting up the calmer

All the magic begins when first event is received and EventReceived() gets executed.

C#
private void EventReceived()
{
    lock (_criticalSectionDoor)
    {
        var newState = GetStateToMoveToWhenEventArrives();
        SetCurrentStateTo(newState);
 
        if (_currentState == CalmerState.Executing)
            BeginExecutionThread();
    }
}
 
private CalmerState GetStateToMoveToWhenEventArrives()
{
    return EventArrivedTransitions[_currentState] ;
}
 
private void SetCurrentStateTo(CalmerState newState)
{
    _currentState = newState;
}
Code Excerpt 8: Staring up the calmer.

Inside a locked block the transition to a new state is made by means of the first dictionary. Later, if new state is Executing, which means the calmer should be moved out of idle, execution begins. BeginExecutionThread() method is not very interesting, it just starts an asynchronous call to another method, ExecuteWhileNotIddle(). That one is.

The execution thread

C#
private void ExecuteWhileNotIddle()
{
    while (_currentState != CalmerState.Idle)
    {
        if (currentState == CalmerState.Executing || 
            currentState == CalmerState.ExecutingOneInQueue)
            DoExecuteStep();
        else
            DoWaitStep();
 
        UpdateStateAfterDoneOrFinished();
    }
}
Code Excerpt 9: Execution thread.

The execution thread executes steps as long as the state isn't idle.

The steps

There are two kinds of steps: Execute or Wait steps. Executes, trigger the event. Waits, sleep.

C#
private void DoExecuteStep()
{
    OnEventOccurred(EventArgs.Empty);
}
 
private void DoWaitStep()
{
    var stillNeedToWait = _sleepTime - TimeItTookLastExecution();
 
    if (stillNeedToWait > TimeSpan.Zero)
        Thread.Sleep(stillNeedToWait);
}
 
private TimeSpan TimeItTookLastExecution()
{
    return DateTime.Now - _lastExecutionBeganAt;
}
Code Excerpt 10: Execute and Wait steps.

Execute triggers the event. When used in production some error handling would be useful. Right now the calmer would break and become unusable if an error occurs in a handler. Note that event trigger is synchronous, so it would take as long as the actual handler.

Wait sleeps the remaining time. Let's say we want the calmer to release events every 5s. If the actual handler takes 2s, the calmer would sleep for 3s. If the actual handler takes 6s, the calmer would not sleep.

Update state after done waiting or finished executing

After any of previous steps finish we need to update the automaton state. This time we'll use the second dictionary.

C#
private void UpdateStateAfterDoneOrFinished()
{
    lock (_criticalSectionDoor)
    {
        var newState = GetStateToMoveToWhenDoneOrFinished();
        SetCurrentStateTo(newState);
    }
}
 
private CalmerState GetStateToMoveToWhenDoneOrFinished()
{
    return DoneOrFinishedTransitions[_currentState]
} 
Code Excerpt 11: Update state after executing a step.

The calmer is ready.

Testing it

I tried to develop all of this using TDD but it was so hard to think how to test asynchronous code without putting unneeded public properties or accessing private members so I didn't. I just created a couple of tests and develop from there the whole thing. Project TrafficCalming.Tests contains those tests.

There is a pretty useful couple of integration tests. Each of this tests take around 10s and they are repeated 10 times. Still, they've never failed: 

  1. The last time the calmer triggers an event is after the last event that arrived to the calmer.
  2. This test ensures handled events count is close (+/-1) to (total time) / (handling time).

Limitation

The only one I could think so far: The event itself cannot contain meaningful information. Not even the reference to the object generating the event is passed on to the calmed application. 

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)