Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles
(untagged)

Asynchronous Context Processor

0.00/5 (No votes)
25 Aug 2004 1  
Asynchronous message processing infrastructure for .NET applications.

Sample Image - ACPArch.png

1. Introduction

.NET asynchronous programming pattern combined with .NET Thread Pool, provides a good start for building asynchronous applications. But there is lot more involved in building maintainable, scalable, and extensible enterprise applications that use asynchronous message processing at their core.

ACP is a framework that provides basic building blocks in the form of interfaces and infrastructure classes for designing and developing enterprise class .NET applications using asynchronous message processing. This article explains the components of ACP and how to use them from a .NET application. The source code of ACP and the demo application is available for download from this article.

ACP is modeled using interface based design. Interface based design combined with reflection API in .NET allows us to build extensible applications with pluggable components. Check the 'Customizing/Extending ACP' section (Section-3) on how ACP uses this capability of .NET to allow pluggable components in the ACP framework itself.

Components of ACP

Let us start by listing key components required to build a maintainable, scalable, and extensible asynchronous message processing infrastructure for an Enterprise application (.NET application, in our case) and how they are defined using different interfaces in ACP.

  1. Message. We need a good .NET type that can model any type of message. ACP provides this in the form of IMessage interface whose methods and properties model a SOAP message (fairly confirming to industry standards for interoperability).
  2. Message Processor. We need a type that can understand and process our message. In ACP, this is called a Message Processor and is defined by an interface named 'IMessageProcessor'. This interface allows a custom message processor object to be plugged into the application built using ACP for any new type of messages.
  3. Context. We need a Context to establish a relation between logically related messages of a particular message type. In the current implementation of ACP, a Context is defined using an interface called IContext. All the messages related to a context are processed in serial order. This provides data integrity among logically related messaging tasks.
  4. Context Manager. We need a Context Manager to create and maintain Contexts. In ACP, this is defined using an interface named 'IContextManager'.
  5. Context Processor. We need an object that can schedule and execute our message processing related to a Context. This object could be a Thread within the same AppDomain as ACP host application, or another AppDomain in a different process, or another machine altogether. In ACP, this is called Context Processor and is defined using an interface named 'IContextProcessor'. Treating a context processor as an object allows for design and development of more scalable and extensible applications. Today, a Thread and an in-memory queue for asynchronous message processing might suite the application requirement. But tomorrow, as the load on the system grows, it would be scalable to use a different system and a more robust and reliable messaging infrastructure (like MSMQ) for asynchronous message processing.
  6. Context Processor Pool. Having one Context Processor for each context would be a very costly affair and may not scale well. Think of an Enterprise sales system, which is accessed by hundreds of vendor applications and thousands of end-users. In such an application, maintaining a Context Processor per Context would bring down the system in no time. Better way would be to maintain a pool of Context Processors that can handle message processing for more than one context.

    In ACP, this is defined using an interface called IContextProcessorPool. This interface allows to specify a name for the Context Processor pool, maximum and minimum context processors for the pool, and finally a very important attribute for the pool, which defines the behavior of the Context Processors in the pool. This is called the 'Message Processing Mode'. This value could be a 'Pull' or 'Push'.

    In Pull model, each Context Processor will continuously monitor its assigned Contexts for any messages. This is suitable for time-critical applications with minimum number of Context Processors.

    In Push model, each Context will notify its assigned Context Processor whenever it has a message. This model is suitable for situations that demand more contexts and not so time-critical processing.

    There is a trade-off in both models. Pull model is CPU intensive and Push model is memory intensive. Currently, .NET framework offers a process wide Thread pool. But, is this good enough for an Enterprise application that relies heavily on asynchronous messaging? My answer is no. .NET thread pool is hard to manage for several reasons. Thread Pool threads do not have identity and behavior, and once a request is submitted to the Thread pool, there is no way to cancel it. Moreover, they are in-process. ACP based Context Processors and the Context Processor pool has identities, configurable behavior, and are location independent, which makes them ideal candidates for designing maintainable, scalable and extensible asynchronous messaging based applications.

  7. Context Processor Pool Manager. In an Enterprise application, execution of different modules need to be handled differently. So, having one Context Processor Pool with a fixed setting may not cater to all the different context in which the application runs. To handle this effectively, ACP defines an object called Context Processor Pool Manager, using an interface (guess) 'IContextProcessorPoolManager'. This allows us to create and maintain more than one Context Processor Pool with different settings (different message processing mode, Push/Pull, different number of Max and Min Context Processors, etc.).

2. Using ACP in a .NET (C#) application

OK, all the above stuff we discussed involves theory, based on some abstract interfaces. But, who is going to provide implementation? Fortunately, ACP comes with a default generic implementation of all the above interfaces, and a demo interface implementation for IMessageProcessor, which is largely application specific. The demo IMessageProcessor implementation is a database message processor, that can handle select queries asynchronously.

This article does not explain step-by-step of the ACP's internal logic for the default implementation of its interfaces. ACP code is organized well, and I tried to document code in a decent manner (code comments using C# XML code comment tags), so that an average .NET and C# person can easily figure out what's happening inside. But, I'll walk through the important points in using ACP in a .NET (C#)application, just to give a head start and to explain certain important concepts, which might take up your valuable time to figure out yourself from code.

Step - 1. Identify and Create the Context(s) [message queuing]

To start with, in ACP, as discussed in Introduction, everything is centered around a Context. So, the first step to use ACP in your application is to identify what a Context means to your application. Basically, a Context means a group of messages that have to be processed in sequence. But this is a very abstract definition. To help you, here is a typical example of a Context...

For instance, in a Purchase Order application, during order placing process, you would like to update the database and then send an e-mail to the customer regarding follow-up information. These two tasks have to be executed in sequence. So ideally, these tasks fall under a Context. You should be careful enough to distinguish between a Context and database/application level Transactions. A Transaction could be treated as a Context by executing all the tasks involved in a Transaction inside a Context. But vice-versa is not true. I.e., a Context is not equivalent to a Database Transaction, where all the operations succeed or fail for a given instance of the Transaction. Whereas a Context is not bothered about the success or failure of the messages processed by it, Context is more concerned about the order in which messages are processed, so that messages are processed in the order they are posted to the Context.

Do not get lost in the terminology. Core of ACP is queuing messages and processing them. ACP wraps this simple concept into a powerful infrastructure used for design and development of .NET applications based on asynchronous message processing. Asynchronous message based design will allow the applications to scale to heavy loads, be loosely coupled, highly interoperable and extensible, and finally, easily maintainable. Also, ACP is not meant to replace any technique available in .NET or Microsoft Windows platform for asynchronous messaging. Rather, ACP provides a framework that abstracts the underlying message queuing, message scheduling, and message processing sub-systems of the applications, thus providing more control on how an application evolves.

Code snippet, finally... The following code snippet shows you how to create a new Context or get hold of an existing Context from a .NET (C#) application. This code snippet is extracted from the demo application (ACPDemo), so please go through the demo application for the meaning of the variables used.

// Get a ContextManager

IContextManager ctxMgr = ProviderManager.GetContextManager("Generic");
if (ctxMgr != null)
{
    // Create a Context if it does not exists

    IContext ctx = null;
    if (pubsCtx != null)
    {
        ctx = ctxMgr.GetContext(pubsCtx);
    }
    else
    {
        ctx = ctxMgr.CreateContext(ContextMsgQType.ContextLevel,"DBMsgProc");
        pubsCtx = ctx.ID;
        ctx.MessageProcessor.MessageProcessingCompleted = 
          new MessageProcessCompleteHandler(
          MessageProcessor_MessageProcessingCompleted_Pubs);
    }
}

Notable points (bold letter words) from code snippet...

  1. 'ProviderManager' is an ACP class that allows the application to choose the Context Manager implementation classes dynamically. These classes along with their assemblies are configurable in the application config file (xxx.exe.config). See the Section-3 for more details about customizing ACP. In our code snippet, I'm using the key name 'Generic' for Context Manager, which is mapped to the default implementation of 'IContextManager' interface provided by ACP. This mapping is defined in the demo application config file (ACPDemo.exe.config).
  2. 'GetContext' method of 'IContextManager' takes the unique ID of the Context (in the default implementation of 'IContextManager' provided by ACP, this is the string form of the 'Ticks' taken from current DateTime at the creation of the Context) and retrieves the IContext interface on the Context object. ACP comes with a default 'IContext' implementation, which uses in-memory synchronized queue for storing messages.
  3. 'CreateContext' method of IContextManager creates a new Context. Observe the 'ContextMsgQType'. This is an enumeration that has two values. 'ContextLevel' and 'Global'. 'ContextLevel' means the context thus created will have its own private message queue for storing messages posted to it. 'Global' means the context thus created will share a AppDomain wide common queue for storing messages posted to it.
  4. 'DBMsgProc' is a key, which is mapped to a demo implementation of 'IMessageProcessor' interface. More about this in Step-3.

Step - 2. Define a Pool of Context Processors to Schedule and execute Message Processing [message scheduling]

Next thing is to define a pool of Context Processors for scheduling and executing the messages corresponding to the Context. Following code snippet from demo application (ACPDemo) shows how to create a Context Processor Pool and assign Context to one of its Context Processors.

// Get a ContextProcessorPoolManager

IContextProcessorPoolManager cppMgr = 
   ProviderManager.GetContextProcessorPoolManager("Generic");
if (cppMgr != null)
{
    // Create/Get a Context Processor Pool with 'Push' Mode.

    IContextProcessorPool cpp = 
       cppMgr.CreateContextProcessorPool("Test", 
                 ContextProcessorPoolMode.Push);
    if (cpp != null)
    {
        // Assign a ContextWorker from the pool to the Context

        cpp.AssignContext(ctx);

Notable points in code snippet...

  1. 'GetContextProcessorPoolManager' static method of ProviderManager is used to get the 'IContextProcessorPoolManager' interface reference on the Context Processor Pool Manager implementation mapped to the key 'Generic'. 'Generic' key in this case is mapped to the default implementation of 'IContextProcessorPoolManager' provided by ACP. This mapping is defined in the demo application config file (ACPDemo.exe.config). As described in 'Introduction' (Section-1), Context Processor Pool Manager is used to manage a name based map of Context Processor Pools.
  2. 'CreateContextProcessorPool' method of 'IContextProcessorPoolManager' will create a new Context Processor Pool (implementing 'IContextProcessorPool') with the given name and the 'Context Processor Pool Mode' (Mode is discussed in Introduction section). The default implementation of 'IContextProcessorPoolManager' provided by ACP, will retrieve an existing Pool object if the properties match (i.e., if the name and the mode are same as that of an existing pool).
  3. 'AssignContext' method of 'IContextProcessorPool' will create a new Context Processor or will grab an existing Context Processor (selected using Round Robin algorithm) and will assign the context to it. This will provide a Context Processor for scheduling and executing the messages of the assigned Context. Default implementation of Context Processor provided by ACP is implemented using Threads.

Step - 3. Define the type of Message Processor(s) for the Context(s) [message processing]

ACP design allows for specifying the Message Processor type to be used while creating a Context. This design allows for specifying different Message Processor types for the different instances of the same context type. Following code snippet from demo application (ACPDemo) shows how to assign a Message Processor for an instance of a Context.

// Get a ContextManager

IContextManager    ctxMgr = ProviderManager.GetContextManager("Generic");
if (ctxMgr != null)
{
    // Create a Context if it does not exists

    IContext ctx = null;
    if (pubsCtx != null)
    {
        ctx = ctxMgr.GetContext(pubsCtx);
    }
    else
    {
        ctx = ctxMgr.CreateContext(ContextMsgQType.ContextLevel,"DBMsgProc");
        pubsCtx = ctx.ID;
        ctx.MessageProcessor.MessageProcessingCompleted = 
           new MessageProcessCompleteHandler(
           MessageProcessor_MessageProcessingCompleted_Pubs);
    }
}

Notable points in code snippet...

The 'DBMsgProc' string is a key in the ACPDemo application config file (ACPDemo.exe.config) that is mapped to a demo implementation class of 'IMessageProcessor' in ACP. This demo implementation has a good code base for handling database messages asynchronously but is in-complete at this stage. However, the current implementation of 'IMessageProcessor' (class DatabaseQueryMessageProcessor) in ACP gives a good start for users of ACP on the tasks involved in creating a Message Processor.

I'll update this section in the coming days with a fairly complete implementation of the 'DatabaseQueryMessageProcessor', and will try to highlight some guidelines and tips.

Step - 4. Post messages to the Context(s) and handle processed messages [message processing]

Now we have a Context to post messages, Context Processor to schedule the messages, and a Message Processor to process the messages posted to the Context. Here comes the actual story.

  1. How to post a message to the Context?

    You can use the 'QueueMessage' method on 'IContext' interface of Context object, to post a message to the Context for processing. Following is the code snippet taken from the demo application (ACPDemo). You can see the 'QueueMessage' in bold in the code snippet.

    IContextProcessorPool cpp = 
       cppMgr.CreateContextProcessorPool("Test", 
       ContextProcessorPoolMode.Push);
    if (cpp != null)
    {
        // Assign a ContextWorker to the Context
    
        cpp.AssignContext(ctx);
    
        // Create a DB message and queue it using context
    
        DBMessage msg = new DBMessage();
    
        string conStr = ConfigurationSettings.AppSettings["conStr"];;
        string query = ConfigurationSettings.AppSettings["query2"];;
        DBRequest request = new 
                DBRequest(conStr,query,DBRequest.DBQueryType.Select);
    
        msg.Request = request;
    
        // Queue the Message to be processed asynchronously by 
    
        // the Context Processor assigned to the context
    
        ctx.QueueMessage(msg,true);
        timer2.Enabled = true;
  2. How to retrieve the processed message?

    'IMessageProcessor' interface on Message Processor provides a delegate 'MessageProcessingCompleted', which is called after the Message Processor completes processing of each message queued to the corresponding Context. Following is the code snippet taken from the demo application (ACPDemo). You can see that the 'MessageProcessingCompleted' delegate is assigned to a private method in the 'form' class of the demo application. This method is called after each message for the corresponding Context is processed.

    ctx = ctxMgr.CreateContext(ContextMsgQType.ContextLevel,"DBMsgProc");
    authCtx = ctx.ID;
    ctx.MessageProcessor.MessageProcessingCompleted = new 
      MessageProcessCompleteHandler(
      MessageProcessor_MessageProcessingCompleted);
    
    private void MessageProcessor_MessageProcessingCompleted(IMessage msg)
    {
        object[] args = new object[] { msg };
        this.Invoke(new DisplayData(this.ShowDataset),args);
    }
  3. How to wait till all the pending messages are processed for a given Context?

    'IContext' interface provides a method 'WaitOnAsyncOperations', using which the caller can wait until all the pending messages for the calling Context are processed. This method supports a time-out to be specified for waiting, so that the caller need not be blocked indefinitely. Following is the code snippet taken from demo application (ACPDemo). You can see the usage of 'WaitOnAsyncOperations' method call, in bold in the code snippet.

    IContextProcessorPool cpp = 
      cppMgr.CreateContextProcessorPool("Test", 
                ContextProcessorPoolMode.Push);
    if (cpp != null)
    {
        // Assign a ContextWorker to the Context
    
        cpp.AssignContext(ctx);
    
        // Create a DB message and queue it using context
    
        DBMessage msg = new DBMessage();
    
        string conStr = ConfigurationSettings.AppSettings["conStr"];
        string query = ConfigurationSettings.AppSettings["query1"];
        DBRequest request = new DBRequest(conStr,query,
                                      DBRequest.DBQueryType.Select);
    
        msg.Request = request;
    
        // Queue the Message to be processed asynchronously by 
    
        // the Context Processor assigned to the context
    
        ctx.QueueMessage(msg,true);
        timer1.Enabled = true;
    
        // Wait on our select operation to complete
    
        while(ctx.WaitOnAsyncOperations(1) == false)
        {
            Application.DoEvents();
        }
  4. How to ask Context if it has finished processing a particular message?

    I'm working on this and will update this in the coming days.

3. Customizing/Extending ACP

Extensible points in ACP

The following 3 components of ACP are customizable (all these three components put together forms the ACP).

  1. Context Manager (creates and maintains Contexts).
  2. Context Processor Pool Manager (Creates and maintains Context Processor Pools).
  3. Message Processor (Processes messages).

The customization is possible through specifying the assembly and class name of your implementation of the interfaces related to the above three components in the ACP's hosting application config file.

Configuring the Extensible points in ACP

You need to specify a key value pair under the 'acpSettings' node of the ACP host application's config file. Following XML snippets show the configuration for various pluggable components in ACP.

  1. Message Processor
    <MessageProcessors>
       <add key="DBMsgProc" 
           value="DBMsgProc.dll, DBMessageProcessorClass" />
    </MessageProcessors>
  2. Context Manager
    <ContextManagers>
       <add key="MyContextManager" 
           value="MyContextManager.dll, MyContextManagerClass" />
    </ContextManagers>
  3. Context Processor Pool Manager
    <ContextProcessorPoolManagers>
       <add key="MyContextProcessorPoolManager" 
           value="MyContextProcessorPoolManager.dll, 
                  MyContextProcessorPoolManagerClass" />
    </ContextProcessorPoolManagers>

Note: For a sample config file, please see the application config file (ACPDemo.exe.config) for the demo application (ACPDemo).

4. Demo application

I tried to put the basic features together in the demo application, which processes database messages asynchronously. For the demo, I used the 'Pubs' database that comes with SQL Server and MSDE. To use the demo, you need to change the SQL Server/MSDE connection string and queries accordingly, in the demo exe config file (ACPDemo.exe.config), which is shipped with the demo download.

Demo application is simple in nature and is currently developed to show a quick usage of the ACP in a .NET application. The demo application shows two SQL queries (configured in the demo application config file, ACPDemo.exe.config) running parallel. Pardon me for in-completeness in the demo application. I'm planning to enhance the demo application in the coming days and update the article code accordingly.

5. Points of Interest

This is only Part-1 of my work. I'm planning to create a next part of this article that will cover more advanced techniques using ACP, like...

  1. Canceling a message processing request queued for a Context.
  2. Getting runtime statistics out of ACP infrastructure, like, number of ContextPools, Contexts, Messages, etc.
  3. More features in the DatabaseMessageProcessor implementation, like handling transactions, automatic commit, and automatic rollback on exceptions.
  4. In the current version of ACP, the Context class is implemented using in-memory queue and the Context Processor is implemented using .NET threads. In up coming parts of my ACP article series, I would discuss the implementation of Context and Context Processor using MSMQ and a dedicated Windows Service.

This is a short glimpse of what to expect in forthcoming parts of the ACP series of articles.

Finally, I hope the ACP framework will provide a good infrastructure for those working on asynchronous messaging based applications in the .NET world. Also, I'll keep updating this article in the coming days with any corrections, and will fill in with any left over concepts/info.

6. History

ACP v1.0.

7. Usage

This software is provided "as is" with no expressed or implied warranty. I accept no liability for any damage or loss of business that this software may cause.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.

A list of licenses authors might use can be found here