Introduction
This tip describes the implementation of an generic queue processor that can be used in systems where we need multithreaded processing of events. This approach can be useful when we have some time-consuming processing and we do not want to stop the main processing thread.
Background
This approach uses C# generic type class. We implemented this manager as an abstract
class that has to be overloaded with methods that do the real processing. We also used the class serialization and deserialization for objects queue storage.
Using the Code
The core class of this processor is QueueProcessor
. We implement our new queue processor with our event class and constructor with two input parameters:
QueueFilename
: filename for storing queue when process stops
- Processing interval: interval of processing the queue
public class MyQueue : QueueProcessor<Data>
{
public MyQueue(String queueFileName,
double processingIntervalms) : base(queueFileName, processingIntervalms)
{
}
If we do not define the queue filename, our queue processor will not be saving queue to filesystem and all not processed events will got lost when service stops.
QueueProcessor
is an abstract
class and we have to implement two methods:
ProcessEvent
: Method that really does the processing (e.g., saving to database, calling webservice,...)
protected override bool ProcessEvent(Data eventToProcess)
{
log.Debug("Start processing: " + eventToProcess.EventData);
Thread.Sleep(1000);
log.Debug("End processing: " + eventToProcess.EventData);
return true;
}
In this example, we do some sleeping just to show delayed processing. With the return result of this method, we notify the QueueProcessor
to remove the event or not.
QueueLengthChanged
: This method is called whenever the queue length changed - this method can be used for monitoring the queue length.
protected override void QueueLengthChanged(int newQueueLen)
{
log.Debug("Queue length: " + newQueueLen.ToString());
}
In the example, we just log the current queue length.
The main program loop in the example - firstly initialize the MyQueue
processor:
MyQueue myQueue = new MyQueue(QUEUE_FILENAME, QUEUE_PROCSSING_INTERVAL);
After that, we read lines from user input and create a new event to be processed:
while (true)
{
userInput = Console.ReadLine();
if (userInput.ToLower().Equals(LOOP_END_CMD))
break;
else
myQueue.AddEvent(new Data() { TimeStamp = DateTime.Now, EventData = userInput });
}
The important thing to do when the program stops is to dispose the queue processor. When we dispose it, all not processed events in the queue are serialized with another class DataSerializer
.
myQueue.Dispose();
DataSerializer
class has methods for serializing object to file or string and list of object to file or string. The DataSerializer
includes the type information into serialization so it is possible to serialize lists with different types of objects.
Events or data class-es must have public
setter for all properties and a constructor that takes no argument.
See our example class:
public class Data
{
public DateTime TimeStamp { get; set; }
public String EventData { get; set; }
}
How Does It Work
The main data storage in the QueueProcesor
is one list with our event objects.
private List<T> eventList = new List<T>();
When we call the method AddEvent
, the event is added to the list at the end and processing of the list is started in a new thread - so that the method returns fast.
public void AddEvent(T newEvent)
{
lock (this.locker)
{
this.eventList.Add(newEvent);
}
Thread processThread = new Thread(processingPendingList);
processThread.Start();
}
The list processing is done with a method called processingPendingList
. We use Monitor.TryEnter
with a mutex object to check if the processing is already running - if it is running, we do nothing.
if (Monitor.TryEnter(processingLocker))
{
........
........
........
Monitor.Exit(processingLocker);
}
If we successfully entered the processing, we firstly take the first element from the events list - this is done locking the list locker.
T firstEvent = null;
lock (this.locker)
{
if (this.eventList.Count > 0)
firstEvent = this.eventList[0];
}
If there are no elements in the list, we end here. If we have some event to process, we call the abstract
method ProcessEvent
. If processing returns with true
, we remove the first element from the list. If processing fails, we stop processing the event list because processing is done in list order.
Boolean success = false;
success = ProcessEvent(firstEvent);
if (success)
{
lock (this.locker)
{
this.eventList.RemoveAt(0);
QueueLengthChanged(this.eventList.Count);
}
}
else
{
break;
}
Points of Interest
I found this approach for delayed processing very useful in different project, mainly server based. I used different implementations of queue processors for database storage, webservice call and other time consuming tasks that must not stop the server processing.
History
- 5/25/2015 - Initial release