SoapMSMQ Transport update for the WSE 3.0 Technology.
Web Services Enhancements (WSE) version 3.0 is a Microsoft product for building secure Web Services and a message-driven connectivity model with the latest technologies such as Visual Studio 2005 and .NET Framework 2.0. It was developed from version 2.0 SP3 for Windows 2k/XP and higher platforms, with the requirement to interoperate with the new Windows Communication Foundation (WCF) model. Note that because WCF (Indigo) will run only on Windows 2k3/XP and higher platforms, WSE 3.0 lets you build connected systems between Win2k and higher platforms using the Messaging namespace based on the WS-* specs.
WSE 3.0 has built-in transports only for exchanging messages via the http and tcp protocols, and it can be hosted with or without an HTTP server. A custom transport can be built and plugged into the WSE 3.0 layer stack programmatically or administratively, based on the open messaging model.
A modern Enterprise Application can be designed using an event driven architecture that encapsulates the business workflow into pre-processing, processing and post-processing activities. The connectivity between these activities should be fully separated from the business logic and moved to the underlying communication model. Microsoft offers a very stable technology for asynchronous processing, MSMQ 3.0 (note that its features have been enhanced for the WCF model in the beta version of MSMQ 3.5).
I built a SoapMSMQ Transport for WSE 2.0/SP3 based on the MSMQ 3.0 feature. The concept, design and details of its implementation can be downloaded here.
This article focuses on migrating the SoapMSMQ Transport to WSE 3.0, and on its configuration and usage. I assume that the reader is familiar with the WSE 2/3 and MSMQ technologies.
- Request message pattern (OneWay)
- Request-ReplyTo message pattern (2x OneWay)
- Context driven pattern
- Multiple transactional Requests
- DTC transactional Request
- Event driven listener
- Concurrent receivers
- Non-delivery handler
- MSMQ 3.0 support
- Configurable properties
Migrating SoapMSMQ (version WSE 2/SP3) has been done with the following minor changes:
- replacing namespaces
- replacing the SoapDimeFormatter with the SoapPlainFormatter as the default formatter
- taking out the workaround for the WSE 2 SP3 bug related to obtaining a receiver from the SoapReceivers collection based on the EndpointReference key
- removing the Capabilities override in the derived channel classes
- adding the new method SoapBindingTransportUri in the transport, as requested by the wsdl document
- fixing the msmq correlation message Id for Request/Response messaging, where the GUID scheme (uuid) has been added to the RelatesTo identifier
- using the System.Transactions namespace
- adding a new feature - an option to change the formatter in the config file (built-in formatters or a custom formatter)
- standardizing the soap.msmq transport addressing based on WCF (Indigo)
The soap.msmq transport properties are cached in the SoapMsmqTransportOptions object. The default value of each property in the Options object can be overridden programmatically or through the host process config file, as shown in the following snippet:
<configuration>
  <configSections>
    <section name="microsoft.web.services3"
             type="Microsoft.Web.Services3.Configuration.WebServicesConfiguration, Microsoft.Web.Services3, Version=3.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35" />
  </configSections>
  <microsoft.web.services3>
    <diagnostics />
    <messaging>
      <transports>
        <add scheme="soap.msmq"
             type="RKiss.WseTransports.SoapMsmqTransport, SoapMSMQ, Version=3.0.0.0, Culture=neutral, PublicKeyToken=1019d5a553d1e207">
          <transactional enabled="true" />
          <formatter type="SoapPlainFormatter" />
          <sender>
            <adminQueue path=".\private$\adminchannel" />
            <toBeReceived timeInSec="30" />
            <toBeReachQueue timeInSec="5" />
            <toBeReplied timeInSec="120" />
            <useDeadQueue enabled="true" />
          </sender>
          <receiver>
            <workerThreads number="3" />
            <errorQueue path=".\private$\reqchannel_error" />
          </receiver>
        </add>
        <add scheme="soap.null"
             type="RKiss.WseTransports.SoapNullTransport, SoapNULL, Version=3.0.0.0, Culture=neutral, PublicKeyToken=1019d5a553d1e207" />
      </transports>
    </messaging>
  </microsoft.web.services3>
</configuration>
The above config file shows the configuration of two custom transports: soap.msmq and soap.null.
The configuration section for the soap.msmq transport in the config file is divided into common, sender and receiver sections. Note that the attribute names are case sensitive and the type of the transport has to be located on one line.
The soap.msmq transport can be configured by the following properties:
Name and Default value | Transport | Note |
<transactional enabled="true"/> | sender and receiver | sender and receiver will handle the message in a transactional manner. |
<formatter type="SoapPlainFormatter" /> | sender and receiver | name of the WSE 3.0 formatter or fully qualified type of the custom formatter. |
<adminQueue path=""/> | sender | administration queue for timeout messages. |
<toBeReceived timeInSec="0"/> | sender | timeout limit for the receiver to retrieve a message. |
<toBeReachQueue timeInSec="0"/> | sender | timeout limit to deliver a message to the destination queue. |
<useDeadQueue enabled="true"/> | sender | option to use a dead queue. |
<workerThreads number="1"/> | receiver | number of worker threads to retrieve messages, max. number is 25. |
<errorQueue path=""/> | receiver | receiver exception queue (non SoapEnvelope, etc.). |
Endpoint Transport Address
The soap.msmq transport URI address must have the correct syntax so that the MSMQ format name can be constructed from it. The following formats can be used by the soap.msmq transport:
URI address (examples) | Comment |
soap.msmq://localhost/private/MyQueue | private queue on the local machine. |
soap.msmq://MyServer/private/MyQueue | private queue on the machine MyServer. |
soap.msmq://MyServer/MyQueue | public queue on the machine MyServer. |
soap.msmq://127.0.0.1/private/MyQueue | private queue on the machine defined by IP address. |
soap.msmq://234.1.1.1:4455 | broadcasting (multicast) message to the queues specified by this IP address and port. |
soap.msmq://MyServer/msmq/private/MyQueue | private queue over Internet. |
soap.msmq://127.0.0.1:1234/private/MyQueue | private queue on the machine specified by IP address and port. |
soap.msmq://localhost./MyQueue | public queue on the local machine. |
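To illustrate how addresses like these could translate into MSMQ format names, here is a minimal sketch. The class name SoapMsmqAddressSketch and its mapping rules are assumptions made for illustration only; the transport's real parsing code ships with the download and may differ. In particular, the sketch ignores a port on non-multicast addresses.

```csharp
using System;
using System.Net;
using System.Net.Sockets;

// Hypothetical helper, not part of the SoapMSMQ transport.
public static class SoapMsmqAddressSketch
{
    public static string ToFormatName(Uri address)
    {
        if (address == null) throw new ArgumentNullException("address");
        if (!string.Equals(address.Scheme, "soap.msmq", StringComparison.OrdinalIgnoreCase))
            throw new ArgumentException("Expected a soap.msmq URI.", "address");

        string host = address.Host;
        string[] segments = address.AbsolutePath.Split(
            new char[] { '/' }, StringSplitOptions.RemoveEmptyEntries);

        IPAddress ip;
        bool isIp = IPAddress.TryParse(host, out ip);

        // No queue path: only a multicast address with a port makes sense.
        if (segments.Length == 0)
        {
            if (isIp && IsIPv4Multicast(ip) && !address.IsDefaultPort)
                return "MULTICAST=" + host + ":" + address.Port;
            throw new FormatException("A queue name is required unless the address is multicast.");
        }

        // A leading "msmq" segment is taken to mean delivery over HTTP.
        if (string.Equals(segments[0], "msmq", StringComparison.OrdinalIgnoreCase))
        {
            if (segments.Length < 2) throw new FormatException("Missing queue name.");
            return "DIRECT=HTTP://" + host + "/msmq/" + QueuePath(segments, 1, "/");
        }

        // IP addresses use the TCP direct format, machine names the OS format.
        string protocol = isIp ? "TCP" : "OS";
        return "DIRECT=" + protocol + ":" + host + "\\" + QueuePath(segments, 0, "\\");
    }

    private static bool IsIPv4Multicast(IPAddress ip)
    {
        if (ip.AddressFamily != AddressFamily.InterNetwork) return false;
        byte first = ip.GetAddressBytes()[0];
        return first >= 224 && first <= 239;
    }

    // Joins the path segments, turning the "private" segment into MSMQ's "private$".
    private static string QueuePath(string[] segments, int start, string separator)
    {
        string[] parts = new string[segments.Length - start];
        for (int i = start; i < segments.Length; i++)
        {
            bool isPrivate = string.Equals(segments[i], "private", StringComparison.OrdinalIgnoreCase);
            parts[i - start] = isPrivate ? "private$" : segments[i];
        }
        return string.Join(separator, parts);
    }
}
```

Calling SoapMsmqAddressSketch.ToFormatName(new Uri("soap.msmq://234.1.1.1:4455")) yields a MULTICAST format name, while an address with a machine name and a private queue yields a DIRECT=OS format name. Note that System.Uri may normalize the case of the host name.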
The following examples show possible usages of the soap.msmq addressing:
- Send a message to the endpoint with address urn:myReceiver via the local private queue 'MyQueue':
EndpointReference epr = new EndpointReference(new Uri("urn:myReceiver"));
epr.Via = new Uri(@"soap.msmq://localhost/private/MyQueue");
SoapSender sender = new SoapSender(epr);
sender.Send(message);
- Send a message to all endpoints with address urn:myReceiver via the MSMQ broadcasting address 234.1.1.1 and port 4455:
EndpointReference epr = new EndpointReference(new Uri("urn:myReceiver"));
epr.Via = new Uri(@"soap.msmq://234.1.1.1:4455");
SoapSender sender = new SoapSender(epr);
sender.Send(message);
- Send a message to the endpoint urn:myReceiver via the private queue 'MyQueue' located on MyServer over the Internet (note that the server must have the MSMQ component for this feature enabled):
EndpointReference epr = new EndpointReference(new Uri("urn:myReceiver"));
epr.Via = new Uri(@"soap.msmq://MyServer/msmq/private/MyQueue");
SoapSender sender = new SoapSender(epr);
sender.Send(message);
SoapEnvelope Context for soap.msmq transport
The soap.msmq transport can be controlled through the SoapEnvelope context based on the contract properties. The following table shows the properties available on the sender and receiver sides:
Name | Type | Used by | Note |
TransactionId | string | receiver | MSMQ transaction ID. |
IsFirstInTransaction | Boolean | receiver | true if the message is first in the single/multiple transaction. |
IsLastInTransaction | Boolean | receiver | true if the message is last in the multiple transaction or in the single transaction. |
Acknowledgment | object | receiver | classification of acknowledgment that the received message represents. |
MessageQueueTransaction | object | sender receiver | Object to provide a Message Queuing internal transaction (MSMQ). |
MessageLabel | string | sender receiver | message label |
The following examples show a usage of the above context properties:
- Example to identify a message position in the multiple transactions
bool first = Convert.ToBoolean(message.Context[SoapMSMQ.IsFirstInTransaction]);
- Example to set the label of the MSMQ message (useful for troubleshooting)
envelope.Context.Add(SoapMSMQ.MessageLabel, "This is a test label");
The SoapMSMQ Transport enables the exchange of messages between the Channel layers using the following Message Exchange Patterns (MEP):
- Request (Datagram) - this is a one way MEP, where the sender sends a message in fire-and-forget fashion. The following picture shows this messaging pattern:
The SoapClient uses the SendOneWay method to send a Request message in fire-and-forget fashion, without waiting for a reply. The call itself is made synchronously, so the client only has information (state) about the sending of the message. The above picture shows a logical pattern for this activity known as an AsyncWorkflow. There are two kinds of activities:
- SyncActivity is responsible for preparing the business data, the state and a unique Workflow Context (if it doesn't exist yet) for business processing. This activity is also known as a Pre-Processor. The Workflow Context can be persisted in sharable storage or passed in the message. The workflow identifier is returned to the client as a Ticket for later correlation purposes. Note that the message represents a stateful object in the Workflow.
- AsyncActivity is a business process that executes and updates the workflow state.
The workflow activities are logically connected, which allows the workflow context to flow and messages to be exchanged between them. The SoapMSMQ Transport is a physical implementation of that logical connectivity for an asynchronous activity. Let's start with a few examples showing this logical connectivity driven by the soap.msmq transport.
All of the following examples assume the same SoapService for performing the business processing (OneWay and RequestResponse patterns). Note that the EchoResponse method is used by the ReplyTo message pattern example.
class MyService : SoapService
{
    [SoapMethod("Bar")]
    public void Bar(string text)
    {
        Console.WriteLine(text);
    }

    [SoapMethod("Echo")]
    public string Echo(string text)
    {
        string response = "ServiceEcho: " + text;
        Console.WriteLine(response);
        return response;
    }

    [SoapMethod("EchoResponse")]
    public void EchoResponse(string text)
    {
        string response = "ServiceEchoResponse: " + text;
        Console.WriteLine(response);
    }
}
Service registration for the specific endpoint address:
EndpointReference epr = new EndpointReference(new Uri("urn:myService"));
epr.Via = new Uri(@"soap.msmq://localhost/private/reqchannel");
SoapReceivers.Add(epr, new MyService());
Note: The following examples use two private transactional queues - ReqChannel and RspChannel.
Example 1.1 - Simple Async Workflow
Based on the above description, the following code snippet shows an example of the Pre-processor for SyncActivity processing. The Bar method represents a business method, where business logic can be incorporated. In our example, there are four steps. In the prolog of the method we create a workflow ticket and do some business pre-processing. In step 2 we attach a workflow header to the destination addressing, then we send a Datagram, and finally (epilog) we return the ticket to the caller.
class SimpleClient : SoapClient
{
    public SimpleClient(EndpointReference destination) :
        base(destination) {}

    public SimpleClient(Uri address, Uri via) :
        base(new EndpointReference(address, via)) {}

    [SoapMethod("Bar")]
    public string Bar(string text)
    {
        string ticketId = Guid.NewGuid().ToString();
        string reqtext = string.Concat(ticketId, ": ", text);
        Destination.ReferenceProperties = WorkflowContext(ticketId);
        base.SendOneWay("Bar", reqtext);
        return ticketId;
    }

    private ReferenceProperties WorkflowContext(string ticketId)
    {
        XmlDocument doc = new XmlDocument();
        doc.LoadXml(string.Format(
            @"<wsa:ReferenceProperties xmlns:wsa='{0}'>
                <ticketId>{1}</ticketId>
              </wsa:ReferenceProperties>", WSAddressing.NamespaceURI, ticketId));
        return new ReferenceProperties(doc.DocumentElement);
    }
}
Now that we have a Pre-processor, the following code snippet shows its usage:
EndpointReference epr = new EndpointReference(new Uri("urn:myReceiver"));
epr.Via = new Uri(@"soap.msmq://localhost/private/reqchannel");
SimpleClient simpleClient = new SimpleClient(epr);
string ticketId = simpleClient.Bar("This is a Datagram message");
The datagram sent by the Bar method is transactional under the MSMQ resource manager. The message is delivered immediately in step 3. In practice, the SyncActivity usually handles more than one resource, such as a database, MSMQ, the filesystem, etc. In those cases, it is necessary to use a transactional scope to perform the workflow in an ACID manner. Let's modify our SimpleClient for this feature in the following example:
Example 1.2 - Simple DTC Async Workflow
Adding a resource to the sync workflow activity requires the DTC transaction coordinator with a 2 phase commit. Our Async Workflow will look like the following picture. In this example, both business actions are incorporated into the SyncActivity while using the Enterprise Database to share the Workflow and Business state. This is the most common pattern in an event driven architecture.
Depending on the business requirements, the SyncActivity can either create a new transaction or take part in the caller's transactional context. In the following code snippet, the SyncActivity is required to be the root of a new transaction scope, so its commit does not depend on the caller's transactional context.
[SoapMethod("Bar")]
public string Bar(string text)
{
    string ticketId = string.Empty;
    using (TransactionScope txscope =
        new TransactionScope(TransactionScopeOption.RequiresNew, TimeSpan.FromSeconds(20)))
    {
        ticketId = Guid.NewGuid().ToString();
        string reqtext = string.Concat(ticketId, ": ", text);
        Destination.ReferenceProperties = WorkflowContext(ticketId);
        base.SendOneWay("Bar", reqtext);
        Console.WriteLine("Press key 'a' to abort this workflow or any key to continue ...");
        if (Console.ReadKey(true).Key == ConsoleKey.A)
            throw new Exception("The workflow has been aborted");
        txscope.Complete();
    }
    return ticketId;
}
As we can see, the Bar method has two actions (steps 3 and 3a) operating on resources such as the soap.msmq transport and a database (simulated by the keyboard). Sending the Datagram (inserting a message into the queue) depends on a successful transaction, which is signalled by invoking the TransactionScope.Complete method. All resources commit when txscope is closed. At that moment, the Datagram is sent to the destination queue. For a remote queue, the Datagram is temporarily stored in the local outgoing queue of the MSMQ manager and then delivered over the network to the destination queue. Note that an unsuccessful delivery will not roll back the transaction; a business compensator should be used to recover data or state.
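The commit-on-Complete behavior can be tried without MSMQ. The following is a minimal sketch, assuming a hypothetical in-memory class InMemoryTxQueue that stands in for the transactional queue by enlisting as a volatile resource in System.Transactions. Neither class is part of the SoapMSMQ transport, and the sketch is not thread-safe.

```csharp
using System;
using System.Collections.Generic;
using System.Transactions;

// Hypothetical stand-in for a transactional queue (illustration only).
public sealed class InMemoryTxQueue : IEnlistmentNotification
{
    private readonly List<string> committed = new List<string>();
    private readonly List<string> pending = new List<string>();

    // Messages that became visible to a receiver.
    public IList<string> Committed { get { return committed.AsReadOnly(); } }

    public void Send(string message)
    {
        if (Transaction.Current == null)
        {
            committed.Add(message);   // no ambient transaction: deliver at once
            return;
        }
        if (pending.Count == 0)       // first message: join the ambient transaction
            Transaction.Current.EnlistVolatile(this, EnlistmentOptions.None);
        pending.Add(message);         // held back until the transaction commits
    }

    public void Prepare(PreparingEnlistment preparingEnlistment)
    {
        preparingEnlistment.Prepared();   // vote to commit
    }

    public void Commit(Enlistment enlistment)
    {
        committed.AddRange(pending);
        pending.Clear();
        enlistment.Done();
    }

    public void Rollback(Enlistment enlistment)
    {
        pending.Clear();                  // discard everything sent in the scope
        enlistment.Done();
    }

    public void InDoubt(Enlistment enlistment)
    {
        pending.Clear();
        enlistment.Done();
    }
}

public static class TxScopeSketch
{
    // Returns the number of messages visible after the scope has been closed.
    public static int Run(bool complete)
    {
        InMemoryTxQueue queue = new InMemoryTxQueue();
        using (TransactionScope txscope =
            new TransactionScope(TransactionScopeOption.RequiresNew))
        {
            queue.Send("Bar");
            if (complete)
                txscope.Complete();
        }
        return queue.Committed.Count;
    }
}
```

Calling TxScopeSketch.Run with true and then with false shows the difference between a completed scope and an abandoned one: the message only reaches the Committed list when Complete was called before the scope was closed.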
Example 1.3 - Multiple (background) DTC Async Workflows
The .NET 2.0 System.Transactions namespace simplifies the usage of transactions. This example shows how simple and straightforward it is to implement a transactional background activity. The transactional SyncActivity pre-processor can delegate an activity to transaction-dependent worker threads and commit only after all workers have completed.
In this case, the Bar method represents a transactional parent activity where the worker threads are created and passed to the thread pool queue for processing. Each worker thread has its own state populated by the business logic and workflow context.
After that, the parent transaction scope is completed and waits for the votes from the individual worker threads (activities). All worker threads must complete, otherwise the parent transaction will be aborted.
The following code snippet shows implementation of the Bar method:
[SoapMethod("Bar")]
public string[] Bar(string text)
{
    int numberOfTx = 5;
    string[] ticketIds = new string[numberOfTx];
    using (TransactionScope txscope =
        new TransactionScope(TransactionScopeOption.Required, TimeSpan.FromSeconds(20)))
    {
        for (int ii = 0; ii < numberOfTx; ii++)
        {
            MyState mystate = new MyState();
            mystate.dtx =
                Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
            mystate.ticketId = Guid.NewGuid().ToString();
            mystate.text = text;
            mystate.ii = ii;
            ThreadPool.QueueUserWorkItem(new WaitCallback(MyWorker), mystate);
            ticketIds[ii] = mystate.ticketId;
        }
        txscope.Complete();
    }
    return ticketIds;
}
The following code snippet shows the implementation of the worker thread. It looks like the original Bar method, but the transaction scope is created from the parent's dependent transaction, so two commits are necessary: first the transaction in the scope and then the dependent transaction. Note that the parent transaction behavior can be configured based on business needs with the BlockCommitUntilComplete or RollbackIfNotComplete option.
public void MyWorker(object state)
{
    MyState mystate = (MyState)state;
    try
    {
        using (TransactionScope txscope = new TransactionScope(mystate.dtx))
        {
            string reqtext = string.Concat(mystate.ii, ", ", mystate.ticketId, ": ", mystate.text);
            Destination.ReferenceProperties = WorkflowContext(mystate.ticketId);
            base.SendOneWay("Bar", reqtext);
            txscope.Complete();
        }
        // second commit: the vote of this worker for the parent transaction
        mystate.dtx.Complete();
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex);
        // abort right away; otherwise the parent blocks until its timeout expires
        mystate.dtx.Rollback(ex);
    }
}

internal class MyState
{
    public DependentTransaction dtx;
    public string ticketId;
    public string text;
    public int ii;
}
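The blocking behavior of BlockCommitUntilComplete can be observed in isolation. The sketch below uses only System.Transactions and the thread pool; the class DependentCloneSketch and the Thread.Sleep placeholder (standing in for SendOneWay) are assumptions for illustration. It counts how many workers had finished by the time the parent scope was closed.

```csharp
using System;
using System.Threading;
using System.Transactions;

// Illustration only: no queue or WSE types are involved.
public static class DependentCloneSketch
{
    public static int Run(int workers)
    {
        int finished = 0;
        using (TransactionScope parent = new TransactionScope(
            TransactionScopeOption.RequiresNew, TimeSpan.FromSeconds(20)))
        {
            for (int i = 0; i < workers; i++)
            {
                // Each clone keeps the parent from committing until it is completed.
                DependentTransaction dtx = Transaction.Current.DependentClone(
                    DependentCloneOption.BlockCommitUntilComplete);

                ThreadPool.QueueUserWorkItem(delegate(object state)
                {
                    DependentTransaction clone = (DependentTransaction)state;
                    try
                    {
                        using (TransactionScope scope = new TransactionScope(clone))
                        {
                            Thread.Sleep(50);   // placeholder for the real work
                            scope.Complete();
                        }
                        Interlocked.Increment(ref finished);
                        clone.Complete();       // the worker's vote
                    }
                    catch (Exception ex)
                    {
                        clone.Rollback(ex);     // abort the whole transaction
                    }
                    finally
                    {
                        clone.Dispose();
                    }
                }, dtx);
            }
            parent.Complete();
        }   // closing the scope waits here for all dependent clones

        return Interlocked.CompareExchange(ref finished, 0, 0);
    }
}
```

Because every worker increments the counter before completing its clone, the value returned by Run should equal the number of workers whenever the parent commit succeeds.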
That is all for the Datagram message pattern. Datagram messaging represents a fundamental communication pattern between the source and the target. Other message exchange patterns such as Request-ReplyTo and Request-Response can be built on top of it. The SoapMSMQ Transport supports these patterns. Let's describe their usage:
- Request-ReplyTo - is based on two Datagrams. The sender sends a Datagram (Request) message with a ReplyTo endpoint address without waiting for the response. This is the most common message pattern in an event driven architecture. Based on the ReplyTo address, the AsyncActivity is responsible for sending a notification datagram or the result of the business processor to the specified endpoint to complete the AsyncWorkflow. This last state of the workflow activity (PostActivity) is known as a Post-Processor. The following picture shows the Request-ReplyTo message pattern:
This pattern forwards the message between the workflow activities in Push model fashion. The current activity needs to know only the next activity (the ReplyTo endpoint address). In a special case, the PostActivity can also be a SyncActivity; in other words, the post-processor and pre-processor can be combined in one SyncActivity, which allows the Workflow Contexts to flow and be chained.
The following picture shows chained AsyncWorkflows:
A special case of the last PostActivity is Datagram broadcasting - a Notification message. The soap.msmq transport supports this feature. A broadcast message is a very useful message pattern for an unknown number of subscribers or for an implicitly expanding environment such as clustering. The following picture shows the ReplyTo pattern with a broadcasting PostActivity:
Example 2.1 - Simple Request-ReplyTo Async Workflow
We can process Request-ReplyTo message pattern by adding the following Echo method in our test SimpleClient class:
[SoapMethod("Echo")]
public void Echo(string text)
{
    EndpointReference eprReply = new EndpointReference(new Uri("urn:myReceiver"));
    eprReply.Via = new Uri(@"soap.msmq://localhost/private/reqchannel");
    SoapEnvelope envelope = new SoapEnvelope();
    envelope.SetBodyObject(text);
    envelope.Context.Addressing.ReplyTo = new ReplyTo(eprReply);
    base.SendOneWay("Echo", envelope);
}
As the above code snippet shows, the Echo method sends a Request Datagram with a specified endpoint address where its Response will be accepted. This very powerful message pattern is supported by the WS-* specs. It is a perfect pattern for an event driven architecture. Based on the DataContract, the Response payload can carry a business result or a notification (Workflow state) that the result is available. Note that this pattern also supports a DTC transaction (please see Examples 1.2 and 1.3 above for more details).
- Request-Response - this is a special case of the Request-ReplyTo message pattern, where the client (the sender of the Request) itself waits (for a specific amount of time) for the Response Datagram. This pattern is known as half-duplex communication. The RequestResponse message pattern can be applied only to connected systems (the service and its consumer must be physically connected within the response time). The soap.msmq transport supports this MEP for talking to applications such as BizTalk. The following picture shows the RequestResponse message pattern:
The message exchange is based on the inner SoapClientAsyncResult class, which sends a Request and waits for its Response within the timeout limit. Of course, the client needs to populate a ReplyTo address for this message pattern.
Example 3.1 - Simple Request/Response Async Workflow
Adding the following Echo method in our test SimpleClient class we can process Request-Response message pattern:
[SoapMethod("Echo")]
public string Echo(string text)
{
    EndpointReference eprReply = new EndpointReference(new Uri("urn:myClient"));
    eprReply.Via = new Uri(@"soap.msmq://localhost/private/rspchannel");
    SoapEnvelope envelope = new SoapEnvelope();
    envelope.SetBodyObject(text);
    envelope.Context.Addressing.ReplyTo = new ReplyTo(eprReply);
    return (string)base.SendRequestResponse("Echo", envelope).GetBodyObject(typeof(string));
}
Note that the ReplyTo endpoint address has to be created and assigned to the Request message. From the client's point of view, the whole message exchange is fully transparent, in RPC fashion, and it looks like a transport via tcp or http.
How about DTC Request-Response messaging?
Well, there is a simple answer - impossible. The client would wait for a response message that never comes, because the Request datagram cannot be committed (sent to the destination queue) within the transaction scope. This is absolutely correct behavior for a DTC transaction. Basically, there are three MSMQ transactions: Pre-Processor, Processor and Post-Processor. We cannot combine the Pre-Processor and Post-Processor activities under the same transaction context.
The SoapMSMQ Transport can be tested with simple console programs, a client and a server. They are included in the download. Note that the client has many test options commented out and its code has been written "dirty", for test purposes only!
The following screen snaps show a start-up test of the server and client host processes. Before running the test, be sure that your machine has the following resources:
- .Net 2.0
- WSE 3.0 installed
- SoapMSMQ Transport (msi package)
- Private transactional queues: ReqChannel, RspChannel, ReqChannel2, ReqChannel_Ack, ReqChannel_Error
Using an Async Workflow in a Web Enterprise Application enables creating a distributed middle tier, where web services play a significant role. WSE 3.0 is a technology that can implement a Service Oriented Architecture (SOA) driven model. Separating the business logic from the connectivity enables creating a Logical Model Connectivity where physical tiers can be deployed based on the needs. The SoapMSMQ Transport can be a part of this hierarchy.
I will finish this article unusually - by showing another usage of the soap.msmq transport, in a typical clustered Web Application where the user submits a Request and waits for its Response. In the past 5 years, access to businesses via Web Services has increased rapidly. The typical 3 tier architecture, where the middle tier accesses only one resource (a local or Enterprise database), has been changed by Web Services and SOA. We can see the middle tier gaining a new dimension - the Distributed Middle Tier.
What about the Front-End?
Well, the typical synchronous Request/Response will not work properly in the real world, where access to distributed resources in a loosely coupled manner is not guaranteed and the Front-end will time out. Using the AsyncWorkflow in the web application is a suitable solution, where the business workflow can be divided into Sync and Async parts. The Sync part always handles local resources, so it is very fast at accepting the user request. Once the application has accepted the request, the Async activity can be processed across the distributed middle tier. This is a typical Push model pattern.
But how does the Front-end get the Response? What is the Front-end doing during this time? etc.
Those questions are valid. Keep in mind that the Front-end is only capable of using a Pull data model pattern. In a non-event driven architecture, the polling pattern is based on periodically checking for the Response in the database. This solution has many side effects, depending on the business requirements.
Is there another, better solution to get the Response, for example in a synchronous way?
Yes, there is: an event driven architecture, where the AsyncActivity generates a notification for the post-processing activity to complete the business workflow. The following picture shows a design pattern that works transparently in a clustered environment, without knowledge of the WebGardens and their processes or of the WebFarms.
The concept of the above design is based on the soap.msmq transport with a multicast notification message. The Front-end waits (or waits and polls) on a local resource on the WebFarm, such as a local (private) notification queue configured as a multicast listener. Once the notification message has arrived, the Front-end can pick up the Response (business result) from the database. The waiting time can be animated in the browser using a progress bar (for instance, as http://www.expedia.com/ does). From the user's point of view it looks like a synchronous request. This pseudo-sync design pattern allows better usage of local resources without losing the submitted requests. For instance, when the user closes the browser during business processing and reopens it again, the process can be recovered with only one access to the database.
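The "waiting + polling" part of this design can be sketched as follows. The class ResponseWaiter is hypothetical: OnNotification stands for whatever listener receives the multicast notification, and the readResult delegate stands for the database lookup, which is assumed to return null while no result has been stored.

```csharp
using System;
using System.Threading;

// Hypothetical helper: waits for a notification and falls back to polling.
public sealed class ResponseWaiter
{
    private readonly ManualResetEvent notified = new ManualResetEvent(false);

    // To be called by the (assumed) notification queue listener.
    public void OnNotification()
    {
        notified.Set();
    }

    // Returns the business result, or null when the timeout expires first.
    public string WaitForResponse(Func<string> readResult, TimeSpan pollInterval, TimeSpan timeout)
    {
        DateTime deadline = DateTime.UtcNow + timeout;
        while (true)
        {
            // Wake up on the notification or, at the latest, after one poll interval.
            bool signaled = notified.WaitOne(pollInterval);

            string result = readResult();
            if (result != null)
                return result;

            if (signaled)
                notified.Reset();   // notification without a result: keep waiting

            if (DateTime.UtcNow >= deadline)
                return null;
        }
    }
}
```

With a long poll interval the database is touched almost only when a notification arrives; with a short one the helper degrades gracefully into plain polling if a notification is lost.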
That's all for now about the soap.msmq transport. I am working on the soap.null transport, which enables building the AsyncWorkflow boilerplate administratively (based on the configuration metadata), divided into separate services (activities) such as Pre-Processor, Processor and Post-Processor, and as always I will post it on CodeProject.