1. Introduction
The .NET asynchronous programming pattern, combined with the .NET thread pool,
provides a good start for building asynchronous applications. But there is a lot
more involved in building maintainable, scalable, and extensible enterprise
applications that use asynchronous message processing at their core.
ACP is a framework that provides basic building blocks, in the form of interfaces
and infrastructure classes, for designing and developing enterprise-class .NET
applications using asynchronous message processing. This article explains the
components of ACP and how to use them from a .NET application. The source code
of ACP and the demo application are available for download from this article.
ACP is modeled using interface-based design. Interface-based design, combined
with the reflection API in .NET, allows us to build extensible applications with
pluggable components. See the 'Customizing/Extending ACP' section (Section-3)
for how ACP uses this capability of .NET to allow pluggable components in the
ACP framework itself.
Components of ACP
Let us start by listing the key components required to build a maintainable,
scalable, and extensible asynchronous message processing infrastructure for an
enterprise application (a .NET application, in our case), and how ACP defines
each of them using an interface.
- Message. We need a .NET type that can model any type of message. ACP provides
  this in the form of the 'IMessage' interface, whose methods and properties
  model a SOAP message (conforming fairly closely to industry standards for
  interoperability).
- Message Processor. We need a type that can understand and process our
  message. In ACP, this is called a Message Processor and is defined by an
  interface named 'IMessageProcessor'. This interface allows a custom message
  processor object to be plugged into an application built using ACP for any
  new type of message.
- Context. We need a Context to establish a relation between logically related
  messages of a particular message type. In the current implementation of ACP,
  a Context is defined using an interface called 'IContext'. All the messages
  related to a context are processed in serial order. This provides data
  integrity among logically related messaging tasks.
- Context Manager. We need a Context Manager to create and maintain Contexts.
  In ACP, this is defined using an interface named 'IContextManager'.
- Context Processor. We need an object that can schedule and execute the
  message processing related to a Context. This object could be a Thread within
  the same AppDomain as the ACP host application, another AppDomain in a
  different process, or another machine altogether. In ACP, this is called a
  Context Processor and is defined using an interface named 'IContextProcessor'.
  Treating a context processor as an object allows for the design and
  development of more scalable and extensible applications. Today, a Thread and
  an in-memory queue might suit the application's requirements for asynchronous
  message processing. But tomorrow, as the load on the system grows, it may
  scale better to use a different system and a more robust and reliable
  messaging infrastructure (like MSMQ).
- Context Processor Pool. Having one Context Processor for each context would
  be very costly and may not scale well. Think of an enterprise sales system
  that is accessed by hundreds of vendor applications and thousands of
  end-users. In such an application, maintaining a Context Processor per Context
  would bring down the system in no time. A better way is to maintain a pool of
  Context Processors, each of which can handle message processing for more than
  one context.
  In ACP, this is defined using an interface called 'IContextProcessorPool'.
  This interface lets you specify a name for the Context Processor pool, the
  maximum and minimum number of context processors in the pool, and finally a
  very important attribute that defines the behavior of the Context Processors
  in the pool. This is called the 'Message Processing Mode', and its value is
  either 'Pull' or 'Push'.
  In the Pull model, each Context Processor continuously monitors its assigned
  Contexts for messages. This is suitable for time-critical applications with a
  minimal number of Context Processors.
  In the Push model, each Context notifies its assigned Context Processor
  whenever it has a message. This model is suitable for situations that demand
  more contexts and less time-critical processing.
  Both models involve a trade-off: the Pull model is CPU intensive and the Push
  model is memory intensive. Currently, the .NET framework offers a process-wide
  thread pool. But is this good enough for an enterprise application that relies
  heavily on asynchronous messaging? My answer is no. The .NET thread pool is
  hard to manage for several reasons. Thread pool threads do not have identity
  and behavior, and once a request is submitted to the thread pool, there is no
  way to cancel it. Moreover, they are in-process. ACP based Context Processors
  and Context Processor pools have identities and configurable behavior, and are
  location independent, which makes them ideal candidates for designing
  maintainable, scalable, and extensible asynchronous messaging based
  applications.
- Context Processor Pool Manager. In an enterprise application, the execution
  of different modules needs to be handled differently. So, one Context
  Processor Pool with a fixed setting may not cater to all the different
  contexts in which the application runs. To handle this effectively, ACP
  defines an object called the Context Processor Pool Manager, using an
  interface named (you guessed it) 'IContextProcessorPoolManager'. This allows
  us to create and maintain more than one Context Processor Pool with different
  settings (different message processing mode, Push or Pull, different maximum
  and minimum numbers of Context Processors, etc.).
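To make the Pull and Push modes described for the Context Processor Pool more concrete, here is a minimal, self-contained sketch. It is not ACP's implementation: 'SketchContext', 'ProcessingModes', and their members are hypothetical names invented for this illustration, and messages are plain strings. The Pull loop polls the queue at a fixed interval, while the Push loop blocks until the Context signals that a message has arrived.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for an ACP Context: a thread-safe FIFO queue of messages.
public sealed class SketchContext
{
    private readonly Queue<string> _queue = new Queue<string>();

    // Signalled on every Post; only the push-mode loop waits on it.
    public readonly AutoResetEvent MessageArrived = new AutoResetEvent(false);

    public void Post(string message)
    {
        lock (_queue) { _queue.Enqueue(message); }
        MessageArrived.Set();
    }

    public bool TryTake(out string message)
    {
        lock (_queue)
        {
            if (_queue.Count > 0) { message = _queue.Dequeue(); return true; }
        }
        message = null;
        return false;
    }
}

public static class ProcessingModes
{
    // Pull: the processor keeps polling its Context, even when it is empty.
    public static List<string> RunPull(SketchContext ctx, int expected)
    {
        var handled = new List<string>();
        while (handled.Count < expected)
        {
            string message;
            if (ctx.TryTake(out message)) handled.Add(message);
            else Thread.Sleep(1); // polling interval; a shorter one burns more CPU
        }
        return handled;
    }

    // Push: the processor blocks until the Context signals that a message arrived.
    public static List<string> RunPush(SketchContext ctx, int expected)
    {
        var handled = new List<string>();
        while (handled.Count < expected)
        {
            ctx.MessageArrived.WaitOne();
            string message;
            while (ctx.TryTake(out message)) handled.Add(message);
        }
        return handled;
    }
}

public static class PushPullDemo
{
    public static void Main()
    {
        var ctx = new SketchContext();
        var producer = new Thread(() =>
        {
            for (int i = 1; i <= 3; i++) ctx.Post("message " + i);
        });
        producer.Start();
        foreach (string m in ProcessingModes.RunPush(ctx, 3)) Console.WriteLine(m);
        producer.Join();
    }
}
```

In this sketch the cost of Pull shows up as the polling loop that runs even when nothing is queued, and the cost of Push as the extra signalling object each Context carries. How ACP itself implements the two modes should be checked in the ACP source.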
2. Using ACP in a .NET (C#) application
OK, everything discussed so far is theory, based on some abstract interfaces.
But who is going to provide the implementation? Fortunately, ACP comes with a
default generic implementation of all the above interfaces, and a demo
implementation of 'IMessageProcessor', which is largely application specific.
The demo 'IMessageProcessor' implementation is a database message processor
that can handle select queries asynchronously.
This article does not walk step by step through the internal logic of ACP's
default implementation of its interfaces. The ACP code is well organized, and I
have tried to document it decently (using C# XML code comment tags), so that an
average .NET and C# developer can easily figure out what is happening inside.
I will, however, walk through the important points of using ACP in a .NET (C#)
application, to give you a head start and to explain certain important concepts
that would otherwise take up your valuable time to figure out from the code.
Step - 1. Identify and Create the Context(s) [message queuing]
To start with, as discussed in the Introduction, everything in ACP is centered
around a Context. So, the first step in using ACP in your application is to
identify what a Context means to your application. Basically, a Context is a
group of messages that have to be processed in sequence. But this is a very
abstract definition, so here is a typical example of a Context.
For instance, in a Purchase Order application, when an order is placed, you
would like to update the database and then send an e-mail to the customer with
follow-up information. These two tasks have to be executed in sequence, so
they belong in one Context. Be careful to distinguish between a Context and a
database/application level Transaction. A Transaction can be treated as a
Context by executing all the tasks involved in the Transaction inside a
Context, but the reverse is not true: a Context is not equivalent to a database
Transaction, where all the operations of a given Transaction instance either
succeed or fail together. A Context is not concerned with the success or
failure of the messages it processes. It is concerned only with order, so that
messages are processed in the order they are posted to the Context.
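The difference between a Context and a Transaction can be sketched in a few lines. The following is only an illustration of the ordering guarantee described above, not ACP code: 'ContextOrderingSketch' and 'ProcessInOrder' are hypothetical names, and each "message" is reduced to a named action. Tasks run strictly in posting order, a failure is recorded, and the remaining tasks still run. Nothing is rolled back.

```csharp
using System;
using System.Collections.Generic;

public static class ContextOrderingSketch
{
    // Runs the tasks strictly in posting order and records the outcome of each.
    // A failure is logged and processing continues; there is no rollback.
    public static List<string> ProcessInOrder(
        IEnumerable<KeyValuePair<string, Action>> tasks)
    {
        var log = new List<string>();
        foreach (KeyValuePair<string, Action> task in tasks)
        {
            try
            {
                task.Value();
                log.Add(task.Key + ": ok");
            }
            catch (Exception ex)
            {
                log.Add(task.Key + ": failed (" + ex.Message + ")");
            }
        }
        return log;
    }

    public static void Main()
    {
        var tasks = new List<KeyValuePair<string, Action>>
        {
            // The first task fails on purpose to show that the second still runs.
            new KeyValuePair<string, Action>("update database",
                () => { throw new InvalidOperationException("db offline"); }),
            new KeyValuePair<string, Action>("send e-mail", () => { })
        };
        foreach (string line in ProcessInOrder(tasks)) Console.WriteLine(line);
    }
}
```

Running the sketch prints "update database: failed (db offline)" followed by "send e-mail: ok". If the e-mail must not be sent when the update fails, that decision belongs in the message processing logic (or in a real Transaction), not in the Context.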
Do not get lost in the terminology. The core of ACP is queuing messages and
processing them. ACP wraps this simple concept in an infrastructure for
designing and developing .NET applications based on asynchronous message
processing. An asynchronous message based design allows applications to scale
to heavy loads and to be loosely coupled, interoperable, extensible, and
easier to maintain. Also, ACP is not meant to replace any technique available
in .NET or on the Microsoft Windows platform for asynchronous messaging. Rather,
ACP provides a framework that abstracts the underlying message queuing, message
scheduling, and message processing sub-systems of the application, thus
providing more control over how an application evolves.
Code snippet, finally... The following snippet shows how to create a new
Context, or get hold of an existing one, from a .NET (C#) application. It is
extracted from the demo application (ACPDemo), so please go through the demo
application for the meaning of the variables used.
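    // Look up the Context Manager mapped to the key "Generic" in the config file.
    IContextManager ctxMgr = ProviderManager.GetContextManager("Generic");
    if (ctxMgr != null)
    {
        IContext ctx = null;
        if (pubsCtx != null)
        {
            // Reuse the Context created earlier (pubsCtx holds its ID).
            ctx = ctxMgr.GetContext(pubsCtx);
        }
        else
        {
            // Create a Context with a private queue and the "DBMsgProc" Message Processor.
            ctx = ctxMgr.CreateContext(ContextMsgQType.ContextLevel, "DBMsgProc");
            pubsCtx = ctx.ID;
            ctx.MessageProcessor.MessageProcessingCompleted =
                new MessageProcessCompleteHandler(
                    MessageProcessor_MessageProcessingCompleted_Pubs);
        }
    }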
Notable points from the code snippet:
- 'ProviderManager' is an ACP class that allows the application to choose the
  Context Manager implementation class dynamically. These classes, along with
  their assemblies, are configured in the application config file
  (xxx.exe.config). See Section-3 for more details about customizing ACP. In our
  code snippet, I am using the key name 'Generic' for the Context Manager, which
  is mapped to the default implementation of the 'IContextManager' interface
  provided by ACP. This mapping is defined in the demo application config file
  (ACPDemo.exe.config).
- The 'GetContext' method of 'IContextManager' takes the unique ID of the
  Context and retrieves the 'IContext' interface on the Context object. In the
  default implementation of 'IContextManager' provided by ACP, the ID is the
  string form of the 'Ticks' value of the current DateTime at the creation of
  the Context. ACP comes with a default 'IContext' implementation, which uses an
  in-memory synchronized queue for storing messages.
- The 'CreateContext' method of 'IContextManager' creates a new Context.
  Observe the 'ContextMsgQType' argument. This is an enumeration with two
  values, 'ContextLevel' and 'Global'. 'ContextLevel' means the new context has
  its own private message queue for storing messages posted to it. 'Global'
  means the new context shares an AppDomain-wide common queue for storing
  messages posted to it.
- 'DBMsgProc' is a key that is mapped to a demo implementation of the
  'IMessageProcessor' interface. More about this in Step-3.
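The create-or-get pattern and the two queue types described in the points above can be sketched as follows. This is a simplified illustration, not the default ACP Context Manager: 'QueuedContext', 'SketchContextManager', and 'SketchQueueType' are hypothetical names. As a deliberate departure from the article, the sketch generates IDs from a counter rather than from DateTime ticks, so two Contexts created in quick succession cannot collide. The sketch is also not thread-safe.

```csharp
using System;
using System.Collections.Generic;

public enum SketchQueueType { ContextLevel, Global }

// Hypothetical stand-in for a Context: an ID plus the queue it posts into.
public sealed class QueuedContext
{
    public readonly string Id;
    public readonly Queue<string> Messages;

    public QueuedContext(string id, Queue<string> messages)
    {
        Id = id;
        Messages = messages;
    }
}

public sealed class SketchContextManager
{
    // One queue shared by every Context created with SketchQueueType.Global.
    private readonly Queue<string> _globalQueue = new Queue<string>();
    private readonly Dictionary<string, QueuedContext> _contexts =
        new Dictionary<string, QueuedContext>();
    private long _lastId;

    public QueuedContext CreateContext(SketchQueueType queueType)
    {
        // Assumption: a counter instead of DateTime ticks (see the lead-in).
        string id = (++_lastId).ToString();
        Queue<string> queue = queueType == SketchQueueType.Global
            ? _globalQueue
            : new Queue<string>();
        var context = new QueuedContext(id, queue);
        _contexts.Add(id, context);
        return context;
    }

    // Returns null when no Context with that ID exists.
    public QueuedContext GetContext(string id)
    {
        QueuedContext context;
        return _contexts.TryGetValue(id, out context) ? context : null;
    }
}

public static class ContextManagerDemo
{
    public static void Main()
    {
        var manager = new SketchContextManager();
        QueuedContext a = manager.CreateContext(SketchQueueType.Global);
        QueuedContext b = manager.CreateContext(SketchQueueType.Global);
        QueuedContext c = manager.CreateContext(SketchQueueType.ContextLevel);

        a.Messages.Enqueue("posted via a");
        Console.WriteLine(b.Messages.Count); // 1: a and b share one queue
        Console.WriteLine(c.Messages.Count); // 0: c has a private queue
        Console.WriteLine(ReferenceEquals(manager.GetContext(a.Id), a)); // True
    }
}
```

Keeping only the ID in the caller (as the demo does with 'pubsCtx') and asking the manager for the Context each time keeps the caller independent of where the Context actually lives.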
Step - 2. Define a Pool of Context Processors to Schedule and execute Message Processing [message scheduling]
The next step is to define a pool of Context Processors for scheduling and
executing the messages posted to the Context. The following code snippet from
the demo application (ACPDemo) shows how to create a Context Processor Pool and
assign the Context to one of its Context Processors.
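    IContextProcessorPoolManager cppMgr =
        ProviderManager.GetContextProcessorPoolManager("Generic");
    if (cppMgr != null)
    {
        // Create the pool named "Test" in Push mode (or get the existing one).
        IContextProcessorPool cpp =
            cppMgr.CreateContextProcessorPool("Test",
                ContextProcessorPoolMode.Push);
        if (cpp != null)
        {
            // Hand the Context to one of the pool's Context Processors.
            cpp.AssignContext(ctx);
            // ... (the rest of this block is shown in Step-4)
        }
    }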
Notable points in the code snippet:
- The static 'GetContextProcessorPoolManager' method of 'ProviderManager' is
  used to get the 'IContextProcessorPoolManager' interface reference on the
  Context Processor Pool Manager implementation mapped to the key 'Generic'.
  The 'Generic' key in this case is mapped to the default implementation of
  'IContextProcessorPoolManager' provided by ACP. This mapping is defined in the
  demo application config file (ACPDemo.exe.config). As described in the
  Introduction (Section-1), the Context Processor Pool Manager is used to manage
  a name-based map of Context Processor Pools.
- The 'CreateContextProcessorPool' method of 'IContextProcessorPoolManager'
  creates a new Context Processor Pool (implementing 'IContextProcessorPool')
  with the given name and 'Context Processor Pool Mode' (the mode is discussed
  in the Introduction section). The default implementation of
  'IContextProcessorPoolManager' provided by ACP returns an existing Pool object
  instead if the properties match (i.e., if the name and the mode are the same
  as those of an existing pool).
- The 'AssignContext' method of 'IContextProcessorPool' creates a new Context
  Processor, or grabs an existing one (selected using a Round Robin algorithm),
  and assigns the context to it. This provides a Context Processor for
  scheduling and executing the messages of the assigned Context. The default
  implementation of the Context Processor provided by ACP is implemented using
  Threads.
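The "create a new Context Processor or pick an existing one by Round Robin" behavior of 'AssignContext' can be sketched as below. The article does not say exactly when ACP creates a processor and when it reuses one, so this sketch assumes the simplest policy: create processors until a maximum is reached, then cycle through them. 'RoundRobinPool' and its members are hypothetical names, and a "processor" is represented only by the list of Context IDs assigned to it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical pool: each "processor" is just the list of Context IDs it owns.
public sealed class RoundRobinPool
{
    private readonly int _maxProcessors;
    private readonly List<List<string>> _processors = new List<List<string>>();
    private int _next; // index of the processor that receives the next Context

    public RoundRobinPool(int maxProcessors)
    {
        if (maxProcessors < 1)
            throw new ArgumentOutOfRangeException("maxProcessors");
        _maxProcessors = maxProcessors;
    }

    // Returns the index of the processor that received the Context.
    public int AssignContext(string contextId)
    {
        // Assumed policy: grow the pool until the maximum is reached...
        if (_processors.Count < _maxProcessors)
        {
            _processors.Add(new List<string> { contextId });
            return _processors.Count - 1;
        }
        // ...then hand out existing processors in round-robin order.
        int index = _next;
        _processors[index].Add(contextId);
        _next = (_next + 1) % _processors.Count;
        return index;
    }

    public IList<string> ContextsOf(int processorIndex)
    {
        return _processors[processorIndex].AsReadOnly();
    }
}

public static class RoundRobinDemo
{
    public static void Main()
    {
        var pool = new RoundRobinPool(2);
        foreach (string id in new[] { "A", "B", "C", "D", "E" })
        {
            Console.WriteLine(id + " -> processor " + pool.AssignContext(id));
        }
    }
}
```

With a maximum of two processors, the five Contexts land on processors 0, 1, 0, 1, 0. Round robin spreads Contexts evenly by count, but it does not account for how busy each Context is, which is one reason to keep the pool settings configurable.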
Step - 3. Define the type of Message Processor(s) for the Context(s) [message processing]
The ACP design allows the Message Processor type to be specified when a Context
is created, so different instances of the same context type can use different
Message Processor types. The following code snippet from the demo application
(ACPDemo) shows how to assign a Message Processor to an instance of a Context.
    IContextManager ctxMgr = ProviderManager.GetContextManager("Generic");
    if (ctxMgr != null)
    {
        IContext ctx = null;
        if (pubsCtx != null)
        {
            ctx = ctxMgr.GetContext(pubsCtx);
        }
        else
        {
            // "DBMsgProc" selects the Message Processor type for this Context.
            ctx = ctxMgr.CreateContext(ContextMsgQType.ContextLevel, "DBMsgProc");
            pubsCtx = ctx.ID;
            ctx.MessageProcessor.MessageProcessingCompleted =
                new MessageProcessCompleteHandler(
                    MessageProcessor_MessageProcessingCompleted_Pubs);
        }
    }
Notable points in the code snippet:
The 'DBMsgProc' string is a key in the ACPDemo application config file
(ACPDemo.exe.config) that is mapped to a demo implementation class of
'IMessageProcessor' in ACP. This demo implementation has a good code base for
handling database messages asynchronously, but is incomplete at this stage.
However, the current implementation of 'IMessageProcessor' (class
'DatabaseQueryMessageProcessor') in ACP gives users of ACP a good start on the
tasks involved in creating a Message Processor. I'll update this section in the
coming days with a fairly complete implementation of
'DatabaseQueryMessageProcessor', and will try to highlight some guidelines and
tips.
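Until then, the general shape of a pluggable Message Processor can be sketched as follows. This is not the ACP 'IMessageProcessor' interface, whose real members should be taken from the ACP source: 'ISketchMessageProcessor', 'SketchCompletedHandler', and 'UpperCaseProcessor' are hypothetical, messages are plain strings, and the "processing" is a trivial upper-casing step. The point is only that the processor does its work and then reports each processed message through a completion delegate.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified counterparts of the ACP types (messages are strings).
public delegate void SketchCompletedHandler(string processedMessage);

public interface ISketchMessageProcessor
{
    SketchCompletedHandler Completed { get; set; }
    void Process(string message);
}

// A toy processor that "processes" a message by upper-casing it.
public sealed class UpperCaseProcessor : ISketchMessageProcessor
{
    public SketchCompletedHandler Completed { get; set; }

    public void Process(string message)
    {
        string result = message.ToUpperInvariant();
        // Copy the delegate first so a concurrent reassignment cannot null it
        // between the check and the call.
        SketchCompletedHandler handler = Completed;
        if (handler != null) handler(result);
    }
}

public static class ProcessorDemo
{
    // Feeds the messages to the processor and collects what it reports back.
    public static List<string> Run(ISketchMessageProcessor processor,
                                   IEnumerable<string> messages)
    {
        var results = new List<string>();
        processor.Completed = results.Add;
        foreach (string message in messages) processor.Process(message);
        return results;
    }

    public static void Main()
    {
        foreach (string line in Run(new UpperCaseProcessor(),
                                    new[] { "select", "update" }))
        {
            Console.WriteLine(line);
        }
    }
}
```

Because the caller depends only on the interface, a different processor (for example, one that runs database queries) can be swapped in without touching the code that queues messages.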
Step - 4. Post messages to the Context(s) and handle processed messages [message processing]
Now we have a Context to post messages to, a Context Processor to schedule the
messages, and a Message Processor to process the messages posted to the
Context. Here comes the actual story.
- How to post a message to the Context?
  You can use the 'QueueMessage' method on the 'IContext' interface of the
  Context object to post a message to the Context for processing. The following
  code snippet, taken from the demo application (ACPDemo), shows the
  'QueueMessage' call.
    IContextProcessorPool cpp =
        cppMgr.CreateContextProcessorPool("Test",
            ContextProcessorPoolMode.Push);
    if (cpp != null)
    {
        cpp.AssignContext(ctx);
        // Build the database request from the connection string and query in the config file.
        DBMessage msg = new DBMessage();
        string conStr = ConfigurationSettings.AppSettings["conStr"];
        string query = ConfigurationSettings.AppSettings["query2"];
        DBRequest request = new
            DBRequest(conStr, query, DBRequest.DBQueryType.Select);
        msg.Request = request;
        // Post the message to the Context for asynchronous processing.
        ctx.QueueMessage(msg, true);
        timer2.Enabled = true;
    }
- How to retrieve the processed message?
  The 'IMessageProcessor' interface on the Message Processor provides a delegate,
  'MessageProcessingCompleted', which is called after the Message Processor
  completes processing of each message queued to the corresponding Context. In
  the following code snippet, taken from the demo application (ACPDemo), the
  'MessageProcessingCompleted' delegate is assigned to a private method in the
  'form' class of the demo application. This method is called after each message
  for the corresponding Context is processed.
    ctx = ctxMgr.CreateContext(ContextMsgQType.ContextLevel, "DBMsgProc");
    authCtx = ctx.ID;
    // Register the callback invoked after each message is processed.
    ctx.MessageProcessor.MessageProcessingCompleted = new
        MessageProcessCompleteHandler(
            MessageProcessor_MessageProcessingCompleted);
    // Elsewhere in the form class: hand the processed message over to the UI thread.
    private void MessageProcessor_MessageProcessingCompleted(IMessage msg)
    {
        object[] args = new object[] { msg };
        this.Invoke(new DisplayData(this.ShowDataset), args);
    }
- How to wait till all the pending messages are processed for a given Context?
  The 'IContext' interface provides a method, 'WaitOnAsyncOperations', with
  which the caller can wait until all the pending messages for that Context are
  processed. The method accepts a time-out, so that the caller need not be
  blocked indefinitely. The following code snippet, taken from the demo
  application (ACPDemo), shows the 'WaitOnAsyncOperations' call.
    IContextProcessorPool cpp =
        cppMgr.CreateContextProcessorPool("Test",
            ContextProcessorPoolMode.Push);
    if (cpp != null)
    {
        cpp.AssignContext(ctx);
        DBMessage msg = new DBMessage();
        string conStr = ConfigurationSettings.AppSettings["conStr"];
        string query = ConfigurationSettings.AppSettings["query1"];
        DBRequest request = new DBRequest(conStr, query,
            DBRequest.DBQueryType.Select);
        msg.Request = request;
        ctx.QueueMessage(msg, true);
        timer1.Enabled = true;
        // Wait in short slices and keep the UI responsive until all messages are processed.
        while (ctx.WaitOnAsyncOperations(1) == false)
        {
            Application.DoEvents();
        }
    }
- How to ask a Context if it has finished processing a particular message?
  I'm working on this and will update this section in the coming days.
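For readers curious how a wait with a time-out such as 'WaitOnAsyncOperations' could work, here is one possible sketch. It is not ACP's implementation. 'PendingOperations', 'Begin', 'End', and 'Wait' are hypothetical names, and the time-out is assumed to be in milliseconds (the article does not state the unit that 'WaitOnAsyncOperations' uses). The idea is a counter of pending messages guarded by a lock, with waiters woken when the counter drops to zero.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical tracker for the number of messages still being processed.
public sealed class PendingOperations
{
    private readonly object _gate = new object();
    private int _pending;

    // Call when a message is queued.
    public void Begin()
    {
        lock (_gate) { _pending++; }
    }

    // Call when a message has been processed.
    public void End()
    {
        lock (_gate)
        {
            _pending--;
            if (_pending == 0) Monitor.PulseAll(_gate); // wake every waiter
        }
    }

    // Returns true if nothing is pending, false if the time-out expired first.
    public bool Wait(int timeoutMilliseconds)
    {
        Stopwatch clock = Stopwatch.StartNew();
        lock (_gate)
        {
            while (_pending > 0)
            {
                long remaining = timeoutMilliseconds - clock.ElapsedMilliseconds;
                if (remaining <= 0) return false;
                Monitor.Wait(_gate, (int)remaining); // releases the lock while waiting
            }
            return true;
        }
    }
}

public static class WaitDemo
{
    public static void Main()
    {
        var operations = new PendingOperations();
        operations.Begin();
        var worker = new Thread(() =>
        {
            Thread.Sleep(50); // simulate message processing
            operations.End();
        });
        worker.Start();

        // Same shape as the demo loop: wait in short slices, do other work in between.
        while (!operations.Wait(10))
        {
            Console.WriteLine("still waiting...");
        }
        Console.WriteLine("all messages processed");
        worker.Join();
    }
}
```

Waiting in short slices, as the demo does with 'Application.DoEvents()', keeps the calling thread responsive at the price of waking up repeatedly. A single longer time-out is cheaper when the caller has nothing else to do.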
3. Customizing/Extending ACP
Extensible points in ACP
The following three components of ACP are customizable (together, these three
components form ACP).
- Context Manager (creates and maintains Contexts).
- Context Processor Pool Manager (creates and maintains Context Processor Pools).
- Message Processor (processes messages).
Customization is done by specifying the assembly and class name of your
implementation of the interfaces related to the above three components in the
config file of the application hosting ACP.
Configuring the Extensible points in ACP
You need to specify a key/value pair under the 'acpSettings' node of the ACP
host application's config file. The following XML snippets show the
configuration for the various pluggable components in ACP.
- Message Processor
    <MessageProcessors>
        <add key="DBMsgProc"
             value="DBMsgProc.dll, DBMessageProcessorClass" />
    </MessageProcessors>
- Context Manager
    <ContextManagers>
        <add key="MyContextManager"
             value="MyContextManager.dll, MyContextManagerClass" />
    </ContextManagers>
- Context Processor Pool Manager
    <ContextProcessorPoolManagers>
        <add key="MyContextProcessorPoolManager"
             value="MyContextProcessorPoolManager.dll, MyContextProcessorPoolManagerClass" />
    </ContextProcessorPoolManagers>
Note: For a sample config file, please see the application config
file (ACPDemo.exe.config) for the demo application (ACPDemo).
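The way a host could turn such an "assembly, class" value into a live object can be sketched with the .NET reflection API. This is an illustration of the general technique, not ACP's 'ProviderManager' code: 'ProviderLoader', 'ISketchProcessor', and 'SketchDbProcessor' are hypothetical names. To stay self-contained, the demo instantiates a class from its own assembly, whereas a real host would first load the named assembly (for example with 'Assembly.LoadFrom').

```csharp
using System;
using System.Reflection;

// Hypothetical plug-in contract and implementation used only by this sketch.
public interface ISketchProcessor
{
    string Name { get; }
}

public sealed class SketchDbProcessor : ISketchProcessor
{
    public string Name { get { return "db"; } }
}

public static class ProviderLoader
{
    // Splits "Some.dll, SomeClass" into { "Some.dll", "SomeClass" }.
    public static string[] ParseValue(string value)
    {
        int comma = value.IndexOf(',');
        if (comma < 0)
            throw new FormatException("Expected 'assembly, class' but got: " + value);
        return new[]
        {
            value.Substring(0, comma).Trim(),
            value.Substring(comma + 1).Trim()
        };
    }

    // Creates the named type from the given assembly and checks its contract.
    public static ISketchProcessor Create(Assembly assembly, string typeName)
    {
        // Assembly.CreateInstance returns null when the type is not found.
        object instance = assembly.CreateInstance(typeName);
        ISketchProcessor processor = instance as ISketchProcessor;
        if (processor == null)
            throw new InvalidOperationException(
                typeName + " was not found or does not implement ISketchProcessor.");
        return processor;
    }
}

public static class ProviderLoaderDemo
{
    public static void Main()
    {
        string[] parts = ProviderLoader.ParseValue("DBMsgProc.dll, SketchDbProcessor");
        // A real host would call Assembly.LoadFrom(parts[0]) here; the sketch
        // uses its own assembly so that it runs without an external DLL.
        Assembly assembly = typeof(ProviderLoader).Assembly;
        ISketchProcessor processor = ProviderLoader.Create(assembly, parts[1]);
        Console.WriteLine(parts[0] + " -> " + processor.Name);
    }
}
```

Checking the created object against the expected interface right away gives a clear error at start-up when the config file names the wrong class, instead of a failure later when the first message is processed.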
4. Demo application
I have put the basic features together in the demo application, which
processes database messages asynchronously. For the demo, I used the 'Pubs'
database that comes with SQL Server and MSDE. To use the demo, you need to
change the SQL Server/MSDE connection string and queries accordingly in the
demo exe config file (ACPDemo.exe.config), which is shipped with the demo
download.
The demo application is simple and is currently meant to show quick usage of
ACP in a .NET application. It shows two SQL queries (configured in the demo
application config file, ACPDemo.exe.config) running in parallel. Pardon the
incompleteness of the demo application. I'm planning to enhance it in the
coming days and update the article code accordingly.
5. Points of Interest
This is only Part-1 of my work. I'm planning a next part of this article that
will cover more advanced techniques using ACP, such as:
- Canceling a message processing request queued for a Context.
- Getting runtime statistics out of the ACP infrastructure, such as the number
  of ContextPools, Contexts, Messages, etc.
- More features in the 'DatabaseQueryMessageProcessor' implementation, such as
  handling transactions, automatic commit, and automatic rollback on exceptions.
- In the current version of ACP, the Context class is implemented using an
  in-memory queue and the Context Processor is implemented using .NET threads.
  In upcoming parts of my ACP article series, I will discuss implementing the
  Context and the Context Processor using MSMQ and a dedicated Windows Service.
This is a short glimpse of what to expect in forthcoming parts of the ACP series
of articles.
Finally, I hope the ACP framework provides a good infrastructure for those
working on asynchronous messaging based applications in the .NET world. Also,
I'll keep updating this article in the coming days with any corrections, and
will fill in any leftover concepts/info.
6. History
ACP v1.0.
7. Usage
This software is provided "as is" with no express or implied warranty. I
accept no liability for any damage or loss of business that this software may
cause.