Introduction
I tasked my self with developing a generic Entity Framework based, asynchronous poller with the following requirements:
- Must poll for a specific entity type which has been configured within a DbContext
- Must except a lambda expression in the form of a poll query
- Must implement a exponential back off retry pattern for connection failures
- Must run asynchronously and be event based
Background
Many of us have had to use some kind of polling mechanism in our code as it's generally required to listen for changes/new entries and do something when they arrive.
Pollers generally range from the basic while() loop with a Thread.Sleep to elaborate complex async Observer patterns and SqlDependency code...
I've been working a lot with the Entity Framework lately and I had a requirement to poll the database for records within a table (Entity) that has a specific StatusId.
I decided to write a generic poller which is very easy to use and allows a developer to specify a lambda expression to asynchronously query the database:
Using the code
To use the attached EntityPoller<T>
simply write the following code:
var poller = new EntityPoller<Notification>(context, q => q.StatusId == (int) Status.New);
poller.ActiveCycle = 200;
poller.IdleCycle = 10000;
poller.RetryAttempts = 5;
poller.RetryInitialInterval = 10000;
poller.EntityReceived += OnNotification;
poller.Error += OnError;
poller.Start();
Code Breakdown
The poll
First of all, you can see below that the constructor is excepting two parameters, the first is your DbContext
which must contain an entity set (DbSet<>
)
of type T
, the second is your lambda expression which returns bool.
public sealed class EntityPoller<T>
where T : class
{
private readonly DbContext _context;
private readonly Func<T, bool> _query;
private readonly object _sync = new object();
private bool _polling;
public EntityPoller(DbContext context, Func<T, bool> query)
{
_context = context;
_query = query;
IdleCycle = 10000;
ActiveCycle = 1000;
RetryAttempts = 5;
RetryInitialInterval = 10000;
}
public int IdleCycle { get; set; }
public int ActiveCycle { get; set; }
public int RetryAttempts { get; set; }
public int RetryInitialInterval { get; set; }
The poll consists of a main loop which will run continuously until an entity is returned. For each cycle of the loop there is a pause (Thread.Sleep
)
the duration of the pause depends upon the result of the defined query. If the query returns an entity then the pause uses ActiveCycle
, otherwise it will use IdleCycle
.
This is by design to allow entity notifications to be evented out of the object in a controlled manner. If you want new entities to be evented out as fast as possible then
set ActiveCycle
to 0.
private T Poll()
{
var set = _context.Set<T>();
T entity = null;
try
{
while (_polling)
{
entity = Retry(() => set.FirstOrDefault(_query),RetryAttempts,RetryInitialInterval);
if (entity != null) break;
Thread.Sleep(IdleCycle);
}
Thread.Sleep(ActiveCycle);
}
catch (Exception ex)
{
Stop();
if (Error != null)
Error.Invoke(ex);
}
return entity;
}
The poll method is invoked by using an AsyncWorker
delegate and an AsyncOperation
. I won't cover the async code in this article but the code
is all included in the source!
Retry Pattern
As you may have noticed in the above code snippet, the query Func<t,bool>
is wrapped
within a Retry method. The Retry pattern will run the query RetryAttempts
times starting with RetryInitialInterval
in milliseconds
and increasing this exponentially by the power of 5. This is just what I use as a default but you may wish to alter this logic to suit your needs.
The Retry function is as follows:
private static T Retry(Func<T> action, int attempts = 5, int initialInterval = 10000)
{
if (action == null)
throw new ArgumentNullException("action");
for (int i = 1; i <= attempts; i++)
{
try
{
T result = action.Invoke();
return result;
}
catch (Exception)
{
if (i >= attempts) throw;
Thread.Sleep(initialInterval);
}
initialInterval *= 5;
}
return null;
}
And that's it! It works like a dream for me. If anyone uses this and would like to comment or ask questions/make suggestions then please feel free.
Thanks for reading.