Queue Processor

25 May 2015
Implementation of an event queue for delayed processing

Introduction

This tip describes the implementation of a generic queue processor for systems that need multithreaded event processing. The approach is useful when processing is time-consuming and must not block the main processing thread.

Background

The processor is a C# generic class. It is implemented as an abstract class whose processing methods must be overridden in a derived class. Serialization and deserialization are used to store the queued objects.

Using the Code

The core class is QueueProcessor. To create a queue processor, we derive a class from it, supply our event class as the type argument, and add a constructor with two parameters:

  • QueueFilename: name of the file in which the queue is stored when the process stops
  • Processing interval: interval at which the queue is processed

public class MyQueue : QueueProcessor<Data>
{
    public MyQueue(String queueFileName, double processingIntervalms)
        : base(queueFileName, processingIntervalms)
    {
    }

    // The ProcessEvent and QueueLengthChanged overrides described below go here.
}

If no queue filename is given, the queue processor does not save the queue to the file system, and all unprocessed events are lost when the service stops.

QueueProcessor is an abstract class and we have to implement two methods:

  • ProcessEvent: the method that does the actual processing (e.g., saving to a database or calling a web service)
     protected override bool ProcessEvent(Data eventToProcess)
     {
         // do some fake long processing
         log.Debug("Start processing: " + eventToProcess.EventData);
         Thread.Sleep(1000);
         log.Debug("End processing: " + eventToProcess.EventData);
         // event successfully processed
         return true;
     }

    In this example, the method just sleeps to simulate long-running processing. The return value tells the QueueProcessor what to do with the event: true removes it from the queue, false leaves it there.

  • QueueLengthChanged: called whenever the queue length changes; it can be used to monitor the queue length.
     protected override void QueueLengthChanged(int newQueueLen)
     {
        log.Debug("Queue length: " + newQueueLen.ToString());     
     }

In the example, we just log the current queue length.

The main program in the example first initializes the MyQueue processor:

 MyQueue myQueue = new MyQueue(QUEUE_FILENAME, QUEUE_PROCSSING_INTERVAL);

After that, we read lines from user input and create a new event to be processed:

while (true)
{
    userInput = Console.ReadLine();
    // ReadLine returns null at end of input; treat that like the end command
    if (userInput == null || userInput.ToLower().Equals(LOOP_END_CMD))
        break;

    myQueue.AddEvent(new Data() { TimeStamp = DateTime.Now, EventData = userInput });
}

When the program stops, it is important to dispose of the queue processor. On disposal, all unprocessed events in the queue are serialized by a separate class, DataSerializer.

myQueue.Dispose();

The DataSerializer class has methods for serializing a single object, or a list of objects, to a file or to a string. It includes type information in the serialized output, so lists containing objects of different types can be serialized.

Event (data) classes must have a public setter for every property and a constructor that takes no arguments.

See our example class:

public class Data
{
       public DateTime TimeStamp { get; set; }

       public String EventData { get; set; }
}    
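The article does not show how DataSerializer works internally, so the following is only a minimal sketch of why such requirements typically exist. It assumes .NET's XmlSerializer as a stand-in, which needs a parameterless constructor and only round-trips public read/write members. The class SerializationSketch and its methods are hypothetical names, and the Data class is repeated so the block is self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Xml.Serialization;

// Same shape as the article's event class: public setters, implicit parameterless constructor.
public class Data
{
    public DateTime TimeStamp { get; set; }
    public String EventData { get; set; }
}

// Hypothetical helper; XmlSerializer is an assumed stand-in for the article's DataSerializer.
public static class SerializationSketch
{
    // Serializes a list of events to an XML string.
    public static string ToXml(List<Data> events)
    {
        var serializer = new XmlSerializer(typeof(List<Data>));
        using (var writer = new StringWriter())
        {
            serializer.Serialize(writer, events);
            return writer.ToString();
        }
    }

    // Rebuilds the list: each Data object is created through its
    // parameterless constructor and filled through its public setters.
    public static List<Data> FromXml(string xml)
    {
        var serializer = new XmlSerializer(typeof(List<Data>));
        using (var reader = new StringReader(xml))
        {
            return (List<Data>)serializer.Deserialize(reader);
        }
    }
}
```

A property without a public setter would be skipped by this serializer, so its value would be missing after the queue is reloaded.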

How Does It Work

The main data storage in the QueueProcessor is a single list of event objects.

private List<T> eventList = new List<T>();

When we call AddEvent, the event is appended to the end of the list and processing of the list starts in a new thread, so the method returns quickly.

public void AddEvent(T newEvent)
{
    lock (this.locker)
    {
        this.eventList.Add(newEvent);
    }
    // start event list processing in a new thread
    Thread processThread = new Thread(processingPendingList);
    processThread.Start();
}

The list is processed by a method called processingPendingList. We use Monitor.TryEnter with a dedicated lock object to check whether processing is already running; if it is, we do nothing.

if (Monitor.TryEnter(processingLocker))
{
     ........
     ........
     ........
     Monitor.Exit(processingLocker); 
}
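The elided body above can throw, and in that case Monitor.Exit would never be reached. A common way to guard against this is a try/finally block. The following is a minimal sketch of that pattern only; the class SingleRunnerSketch and the method RunIfIdle are hypothetical names, not part of the article's code.

```csharp
using System;
using System.Threading;

// Hypothetical helper: runs the work only if no other thread is currently running it.
public class SingleRunnerSketch
{
    private readonly object processingLocker = new object();

    // Returns true if this call ran the work,
    // false if another thread already held the lock.
    public bool RunIfIdle(Action work)
    {
        // TryEnter does not wait: it returns false immediately when the lock is taken.
        if (!Monitor.TryEnter(processingLocker))
            return false;

        try
        {
            work();
            return true;
        }
        finally
        {
            // Always release the lock, even if work() throws.
            Monitor.Exit(processingLocker);
        }
    }
}
```

Note that Monitor locks are reentrant, so this check only excludes other threads; a nested call on the same thread would enter again.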

If we entered the processing section, we first take the first element from the event list while holding the list lock.

// first element on list
T firstEvent = null;
lock (this.locker)
{
    if (this.eventList.Count > 0)
          firstEvent = this.eventList[0];
}  

If the list is empty, we end here. Otherwise, we call the abstract method ProcessEvent. If it returns true, we remove the first element from the list. If it returns false, we stop processing the list: events are processed in list order, so the failed event stays at the head of the list and must succeed before any later event is handled.

// process the first event
Boolean success = ProcessEvent(firstEvent);
if (success)
{
    // processing succeeded
    lock (this.locker)
    {
        // remove first element from list
        this.eventList.RemoveAt(0);
        // notify queue length change
        QueueLengthChanged(this.eventList.Count);
    }
}
else
{
    // processing failed - leave the processing loop
    break;
}
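Putting the fragments above together, the whole mechanism might look roughly like the following. This is a minimal sketch, not the article's actual source. The names QueueProcessorSketch and RecordingQueue are hypothetical, and the `where T : class` constraint is an assumption needed for the null check. File persistence and the processing-interval timer are omitted, and the processing method is made public here so that a timer or a test could trigger a retry.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical reconstruction of the processing mechanism described in the text.
public abstract class QueueProcessorSketch<T> where T : class
{
    private readonly object locker = new object();           // guards eventList
    private readonly object processingLocker = new object(); // one processor at a time
    private readonly List<T> eventList = new List<T>();

    protected abstract bool ProcessEvent(T eventToProcess);
    protected abstract void QueueLengthChanged(int newQueueLen);

    public void AddEvent(T newEvent)
    {
        lock (locker)
        {
            eventList.Add(newEvent);
        }
        // process on a separate thread so the caller returns quickly
        new Thread(ProcessPendingList).Start();
    }

    public void ProcessPendingList()
    {
        // if another thread is already processing, do nothing
        if (!Monitor.TryEnter(processingLocker))
            return;
        try
        {
            while (true)
            {
                T firstEvent = null;
                lock (locker)
                {
                    if (eventList.Count > 0)
                        firstEvent = eventList[0];
                }
                if (firstEvent == null)
                    break; // queue is empty

                if (!ProcessEvent(firstEvent))
                    break; // failed: keep the event at the head and stop

                lock (locker)
                {
                    eventList.RemoveAt(0);
                    QueueLengthChanged(eventList.Count);
                }
            }
        }
        finally
        {
            Monitor.Exit(processingLocker);
        }
    }
}

// Hypothetical concrete queue that records what it processed.
public class RecordingQueue : QueueProcessorSketch<string>
{
    public readonly List<string> Processed = new List<string>();

    protected override bool ProcessEvent(string eventToProcess)
    {
        lock (Processed)
        {
            Processed.Add(eventToProcess);
        }
        return true; // event handled, remove it from the queue
    }

    protected override void QueueLengthChanged(int newQueueLen)
    {
        // no monitoring in this sketch
    }
}
```

Because Monitor.TryEnter never waits, an event added just as the loop is leaving can remain queued until the next trigger. A periodic call to ProcessPendingList, which is presumably the role of the processing interval, would pick it up.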

Points of Interest

I have found this approach to delayed processing very useful in various projects, mainly server-based ones. I have used different queue processor implementations for database storage, web service calls, and other time-consuming tasks that must not block server processing.

History

  • 5/25/2015 - Initial release

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.
