WSE 2.0 Messaging introduces a message-driven mechanism for exchanging data between a service and its consumer based on the WS-* contract. The three levels of the lightweight WSE2 Messaging infrastructure allow you to programmatically create a single communication channel, control its behavior, and pass SOAP envelope messages through it.
The ServiceBus represents the logical connectivity of the Connected Systems and encapsulates the business layer from the communication services. The Logical Model Connectivity uses the ServiceBus to connect any number of systems based on the Connectivity Knowledge Base (KB). Data in the KB represents metadata of the physical connectivity and the message processing behavior. It can be persisted in any storage such as a database, config file, message, etc. The Connectivity Knowledge Base makes it possible to tune business workflow processing through a service-oriented, message-driven architecture.
Plugging the ServiceBus into the application process (creating the Connected Systems) enables connectivity via the WS-* standards with any member of the party in a fully transparent manner. The following picture shows the WSE2 Messaging stack connected to the ServiceBus.
As the above picture shows, the business layer sits on top of Level 2 and is fully encapsulated from the message processing. This article describes how to create connectivity metadata (KB) for the WSE2 stack and attach it to the ServiceBus using the configuration paradigm. The core of the ServiceBus is capable of self-building managers that handle standard workflow processing such as opening/closing services, attaching services, WS-Eventing, etc.
The following picture shows an example of the ServiceBus components:
The Bootstrapper and Bus Manager are two built-in managers at the core of the ServiceBus. Based on the pre-built templates and filters, the Bootstrapper performs the "Bus booting" and then passes control to the Bus Manager. The "boot" configuration is always located in the host process config file, but the application config can be loaded from any source such as a database, incoming messages, the file system, etc. The built-in Logger publishes the behavior of message processing on the Bus to the configured subscriber.
The following code snippet shows an example of the Bootstrapper config metadata on the ServiceBus. Note that the ref attribute references the knowledge base of templates (in the sample solution, it can be located in the same config file).
<rksb:Bootstrapper mode="on" enablelog="true">
<wsa:EndpointReference>
<wsa:Address>soap.tcp://localhost:911/Bootstrapper</wsa:Address>
</wsa:EndpointReference>
<rksb:InputFilters mode="on">
<rksb:Filter name="OpenServices" mode="on"
type="RKiss.ServiceBus.Filters.OpenServicesInputFilter"/>
</rksb:InputFilters>
<rksb:OutputFilters mode="on">
<rksb:Filter name="NextTo" mode="on"
type="RKiss.ServiceBus.Filters.SendMessageOutputFilter" />
<rksb:Filter name="DisableLogger" mode="off"
type="RKiss.ServiceBus.Filters.SendMessageOutputFilter"/>
</rksb:OutputFilters>
<rksb:SendMessages name="Bootstrapper" enable="true">
<rksb:SendMessage name="BootMessage" ref="BootstrapMessage" />
<rksb:SendMessage name="LoggerTo" ref="Logger"/>
<rksb:SendMessage name="NextTo" ref="StartUp"/>
<rksb:SendMessage name="DisableLogger" ref="DisableLogger" />
</rksb:SendMessages>
<rksb:AddInfo/>
</rksb:Bootstrapper>
The communication contract on the ServiceBus is defined by WS-* and is implemented in the WSE2 Messaging namespace. Basically, a SoapService is published (registered) at the specified EndpointReference and is then ready to receive incoming messages formatted as a SoapEnvelope. The following picture shows a basic scenario of this loosely coupled connectivity.
This article focuses on creating a Logical Connectivity driven by the Config Knowledge Base: creating services, clients, messages, workflow processing, managers, etc. It assumes that you are familiar with the WS-* specs and the WSE2 Messaging namespace. Note that this solution is a pre-release version.
The ServiceBus has the following features:
- Business layer encapsulation
- Loosely coupled configuration
- Boot service
- Built-in Message Logger
- Config driven by Knowledge Base (References)
- Built-in Services and Filters
- Open design pattern
- Plug-in Connected Systems
- Built-in support for IIS hosting
- Migration to the Indigo
- Workflow message processor (not included in this version)
- WS-Eventing (not included in this version)
- Store Manager (not included in this version)
The ServiceBus concept is based on creating and activating its resources (services, clients, messages, managers, workflow processors, etc.) from configuration metadata located in the Knowledge Base. In the current version, the KB is represented by References cached in a hash table. Using a small core (the configuration engine and filters) together with a "boot knowledge base" allows application-specific components to be attached to the Bus statically or dynamically in a fully transparent fashion.
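To make the idea of References cached in a hash table more concrete, here is a minimal sketch of such a cache. It is not the ServiceBus implementation: the class name `ReferenceCache`, its methods, and the `localName:name` key format are assumptions made for illustration only; the sketch relies solely on `System.Xml`.

```csharp
using System;
using System.Collections.Generic;
using System.Xml;

// Hypothetical sketch of a template cache; not the RKiss.ServiceBus class.
public sealed class ReferenceCache
{
    private readonly Dictionary<string, XmlElement> _templates =
        new Dictionary<string, XmlElement>();

    // Caches every named child element of <References> under "localName:name".
    public void Load(XmlElement references)
    {
        if (references == null) throw new ArgumentNullException("references");
        foreach (XmlNode node in references.ChildNodes)
        {
            XmlElement element = node as XmlElement;
            if (element == null) continue;               // skip comments and whitespace
            string name = element.GetAttribute("name");  // empty string when absent
            if (name.Length > 0)
                _templates[element.LocalName + ":" + name] = element;
        }
    }

    // Returns the template that a ref="..." attribute points to, or null if unknown.
    public XmlElement Resolve(string elementName, string reference)
    {
        XmlElement found;
        return _templates.TryGetValue(elementName + ":" + reference, out found)
            ? found : null;
    }
}
```

With a cache like this, a `ref="Logger"` attribute on a SendMessage element would be resolved by a call such as `Resolve("SendMessage", "Logger")`.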
Moving the configuration and the message pipeline out of the code and into workflow metadata and small, reusable, function-oriented components makes it possible to orchestrate, tune and deploy business processing administratively or on the fly. The following sequence of pictures shows this scenario:
Processing a Request message via the SoapService infrastructure is divided into three major parts.
In the first part, the message flows through the input pipeline stack of SoapInputFilters. This part is known as the Pre-Processor, and its responsibility is to prepare the message for the business action (validating data, etc.). Subsequently, the message enters the Processor, which handles the business action. In the WSE2 infrastructure, this part is implemented and driven by the Receive method. The last part of the service message processing is the Post-Processor, where the message is prepared by SoapOutputFilters to be returned to the caller or forwarded to the NextTo EndpointReference. Note that this feature is built into the ServiceBus.
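The three parts described above can be sketched without any WSE2 dependency. The types below (`Envelope`, `IFilter`, `TraceFilter`, `SketchProcessor`) are hypothetical stand-ins, not the WSE2 or ServiceBus classes, and the sketch simply runs output filters in list order; it only illustrates the order of the pre-processor, processor and post-processor stages.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a SOAP envelope: it only records the steps it went through.
public sealed class Envelope
{
    public readonly List<string> Trace = new List<string>();
}

// Hypothetical filter contract, modeled on the ProcessMessage(envelope) shape.
public interface IFilter
{
    void ProcessMessage(Envelope envelope);
}

// A filter that records its own name in the envelope trace.
public sealed class TraceFilter : IFilter
{
    private readonly string _name;
    public TraceFilter(string name) { _name = name; }
    public void ProcessMessage(Envelope envelope) { envelope.Trace.Add(_name); }
}

public class SketchProcessor
{
    public readonly List<IFilter> InputFilters = new List<IFilter>();
    public readonly List<IFilter> OutputFilters = new List<IFilter>();

    // Business action; derived classes override it, the default only leaves a trace.
    protected virtual void Receive(Envelope envelope) { envelope.Trace.Add("Receive"); }

    public void Dispatch(Envelope envelope)
    {
        if (envelope == null) throw new ArgumentNullException("envelope");
        foreach (IFilter filter in InputFilters)      // 1. pre-processor
            filter.ProcessMessage(envelope);
        Receive(envelope);                            // 2. processor (business action)
        foreach (IFilter filter in OutputFilters)     // 3. post-processor
            filter.ProcessMessage(envelope);
    }
}
```

Leaving `Receive` as a no-op while configuring only filters corresponds to doing the whole workflow in the pre- and post-processing stages.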
The following picture shows the message flow through the ServiceBus Processor:
Note: When a message simply bypasses the Action Processor (a NOP business operation), the business workflow can be done entirely in the Pre- and Post-Processors, driven by pluggable SoapFilters. This feature is built into the Processor object, which can be used as the base class of a business service by overriding its Receive method.
As the above picture shows, message processing can involve many variable, resource-driven objects such as EndpointReferences, ReferenceProperties, database connections, business-oriented resources, etc. By moving this data out of the code and organizing it into logical metadata of the connectivity and message processing, the above picture can be redrawn as follows:
Now, the Processor represents a generic business processor and the Metadata is a Knowledge Base of the application-specific process unit. Needless to say, behind the Processor is a library of reusable, function-oriented components (Actions, filters, etc.).
The next step is to chain Processors as shown in the following picture. The NextTo message is forwarded to the next Processor and so on, until the last unit of the business process. In this way, the business process is encapsulated into small, reusable, business-oriented units (Processors) located anywhere on the Enterprise Network. Each Processor knows only where to forward the message. This scenario is known as a Business workflow.
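The chaining can be illustrated with a small, self-contained sketch in which the knowledge base is reduced to a map from a processor name to the name of its NextTo processor. The `WorkflowSketch` class and its `Run` method are hypothetical and only model the forwarding order; no messaging is involved.

```csharp
using System;
using System.Collections.Generic;

public static class WorkflowSketch
{
    // nextTo: hypothetical knowledge base, processor name -> name of the next processor.
    // A processor without an entry is the last unit of the business process.
    public static List<string> Run(IDictionary<string, string> nextTo, string start)
    {
        if (nextTo == null) throw new ArgumentNullException("nextTo");
        var visited = new List<string>();
        string current = start;
        while (current != null)
        {
            if (visited.Contains(current))
                throw new InvalidOperationException("Cycle in workflow at " + current);
            visited.Add(current);                     // the unit processes the message here
            string next;
            current = nextTo.TryGetValue(current, out next) ? next : null;
        }
        return visited;
    }
}
```

Each unit only looks up its own entry, which mirrors the point that a Processor knows nothing beyond where to forward the message.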
Finally, we can redraw the above picture into the following state, where all Processors sit under the Business Workflow code and its Knowledge Base is located on the ServiceBus as an abstract definition of the Logical Connectivity.
Of course, we can continue to the next level and concentrate all Business Workflows into one block known as Connected Systems, and so on.
As with other config-driven .NET objects, the host process (or machine) config file is the primary place to hold the configuration. The ServiceBus config boilerplate layout has a "hard coded" structure, shown in the following code snippet:
<configuration>
<configSections>
<section name="rk.ConnectedSystem"
type="RKiss.ServiceBus.Configuration.ServiceBusConfigurationHandler,
RKiss.ServiceBus, Version=1.0.0.0, Culture=neutral,
PublicKeyToken=8f31f8b1a3da056f" />
</configSections>
<rk.ConnectedSystem >
<ConfigurationHandlers>
<add name="References"
type="RKiss.ServiceBus.Configuration.ReferencesConfiguration,
RKiss.ServiceBus" />
</ConfigurationHandlers>
<References>
</References>
<ServiceBus name="main">
<!-- components such as services (managers), clients, transports, etc. -->
</ServiceBus>
</rk.ConnectedSystem>
</configuration>
The entry key is the rk.ConnectedSystem element, where the config processor starts collecting config metadata. There are three child elements:
- ConfigurationHandlers - collection of handlers to load a specific element (e.g., References, Services, Clients, etc.)
- References - collection of templates (e.g., Service, Client, SendMessage, etc.)
- ServiceBus - collection of ServiceBus components (Services, Clients, Workflows, Eventing, etc.)
A component can be attached to the ServiceBus programmatically or administratively by plugging the component section into the ServiceBus element and its handler into the ConfigurationHandlers array. The rest of the work is done by the config engine. This process is known as ServiceBus.Load.
ServiceConfig
The ServiceConfig object holds the config knowledge base of the Process (SoapService) object. It describes how the service must be configured, how it behaves in the message pipeline, and how a message is forwarded to the NextTo Process. Additionally, the ServiceConfig can hold application-specific properties. The ServiceConfig object is loaded from the config file (rksb:Services section) using the IConfigurationSectionHandler.Create method.
The following code snippet shows an example of the EventSink service in the config file.
<rksb:Service name="EventSink" mode="SingleCall" enablelog="true"
type="EventSinkConsole.EventSink, EventSinkConsole" >
<wsa:EndpointReference>
<wsa:Address>soap.tcp://localhost:1234/EventSink</wsa:Address>
</wsa:EndpointReference>
<rksb:InputFilters mode="on">
<rksb:Filter name="BeginTransaction"
type="RKiss.ServiceBus.Filters.BeginTransactionInputFilter"
TxOption="RequiresNew"/>
</rksb:InputFilters>
<rksb:OutputFilters mode="on">
<rksb:Filter name="NextTo" mode="on"
type="RKiss.ServiceBus.Filters.SendMessageOutputFilter" />
<rksb:Filter name="EndTransaction" mode="on"
type="RKiss.ServiceBus.Filters.EndTransactionOutputFilter" />
</rksb:OutputFilters>
<rksb:SendMessages name="EventSink" enable="true">
<rksb:SendMessage name="LoggerTo" ref="Logger"/>
<rksb:SendMessage name="NextTo" ref="StartUp"/>
</rksb:SendMessages>
<rksb:AddInfo>
<connectionString>server=myServer; ....</connectionString>
<args arg1="12345" arg2="abcd"></args>
</rksb:AddInfo>
</rksb:Service>
The ServiceConfig object populated with the above elements is stored in the hash table under the key EventSink and the EndpointReference address, so that the service Pre-Processor can quickly retrieve the config object on the fly. Note that WSE2 Messaging does not allow passing any additional parameter when adding a SoapService type to the SoapReceivers, so the only way to configure a SingleCall service is in the pre-processing phase. That is why any business service must be derived from the Processor class.
The ServiceConfig child elements are:
- wsa:EndpointReference - receiver address.
- rksb:InputFilters - array of SoapInputFilter derived objects that perform message pre-processing in their order.
- rksb:OutputFilters - array of SoapOutputFilter derived objects that perform message post-processing in their order.
- rksb:SendMessages - array (storage) of the SendMessage objects used in this service. The above example embeds two SendMessage objects with their references (templates).
- rksb:AddInfo - additional application-specific info related to this service.
Note that when a SOAPMSMQ transport is used to forward the configured NextTo message, the send is done in a transactional manner together with the other resources in the message pipeline.
SendMessage
The SendMessage object holds the config knowledge base of a message to send: its destination address, body, etc. The Config KB can hold many pre-loaded SendMessage templates useful for message processing, such as BootMessage, NextTo, EventTo, Notification, etc.
The following code snippet shows an example of the BootMessage:
<rksb:SendMessage name="BootMessage" reqrsp="false" >
<wsa:EndpointReference>
<wsa:Address>soap.tcp://localhost:1234/Bootstrapper</wsa:Address>
</wsa:EndpointReference>
<wsa:Action>BootstrapMessage</wsa:Action>
<rksb:MessageBody name="BootMessageBody">
<rksb:OpenServices>
<rksb:Service name="EventSink" mode="SingleCall" />
</rksb:OpenServices>
</rksb:MessageBody>
</rksb:SendMessage>
The SendMessage child elements are:
- wsa:EndpointReference - receiver address
- wsa:Action - name of the service Action
- rksb:MessageBody - body of the message, for instance a request to open services on the ServiceBus.
The message is sent in RequestResponse fashion when its attribute is set to reqrsp="true".
Boot process
Activation of the ServiceBus is performed by the built-in Bootstrapper service configured in the host process config file. The application starts this process by calling the ServiceBus.Boot() method. The call should be placed in the startup code of the host process. Note that the ServiceBus also has a built-in IIS hosting mechanism that works in a loosely coupled manner (by modifying the web.config file).
The following steps are performed when the ServiceBus is booted:
- Loading the References to the cache (creating the local Knowledge Base)
- Loading ServiceBus components based on their configuration handlers
- Attaching built-in services to the ServiceBus
- Opening (registering) the ServiceBusManager service
- Sending the BootMessage to customize the ServiceBus based on the application needs.
When the ServiceBus has been booted, it is ready to process its configuration.
The ServiceBus requires installation of the Microsoft WSE 2.0 SP2 and RKiss.ServiceBus assemblies into the GAC.
The Bus can be booted with a simple configuration, as shown in the following code snippet. This example doesn't have any application-specific component in the configuration layout.
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<configSections>
<section name="rk.ConnectedSystem"
type="RKiss.ServiceBus.Configuration.ServiceBusConfigurationHandler,
RKiss.ServiceBus, Version=1.0.0.0, Culture=neutral,
PublicKeyToken=8f31f8b1a3da056f"/>
</configSections>
<rk.ConnectedSystem >
<ConfigurationHandlers>
<add name="References"
type="RKiss.ServiceBus.Configuration.ReferencesConfiguration,
RKiss.ServiceBus"/>
<add name="Bootstrapper"
type="RKiss.ServiceBus.Configuration.BootstrapperConfig,
RKiss.ServiceBus"/>
<add name="ServiceBusManager"
type="RKiss.ServiceBus.Configuration.ServiceBusManagerConfig,
RKiss.ServiceBus" />
<add name="Services"
type="RKiss.ServiceBus.Configuration.ServicesConfiguration,
RKiss.ServiceBus" />
</ConfigurationHandlers>
<References mode="on"
xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/03/addressing"
xmlns:rksb="http://www.rkiss.net/schemas/sb/2004/08/servicebus">
<rksb:SendMessage name="BootstrapMessage" >
<wsa:EndpointReference>
<wsa:Address>soap.tcp://localhost:911/Bootstrapper</wsa:Address>
</wsa:EndpointReference>
<wsa:Action>BootstrapMessage</wsa:Action>
<rksb:MessageBody name="BootMessageBody">
<rksb:OpenServices>
<rksb:Service name="EventSink" mode="SingleCall" />
</rksb:OpenServices>
</rksb:MessageBody>
</rksb:SendMessage>
<rksb:SendMessage name="Logger">
<wsa:EndpointReference>
<wsa:Address>soap.tcp://localhost:8888/WinFormSink</wsa:Address>
</wsa:EndpointReference>
<wsa:Action>WriteLogMessage</wsa:Action>
<rksb:MessageBody />
</rksb:SendMessage>
<rksb:SendMessage name="StartUp">
<wsa:EndpointReference>
<wsa:Address>soap.tcp://localhost:911/ServiceBusManager</wsa:Address>
</wsa:EndpointReference>
<wsa:Action>Notify</wsa:Action>
<rksb:MessageBody>
<text>This is a test message.</text>
</rksb:MessageBody>
</rksb:SendMessage>
</References>
<ServiceBus name="main"
xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/03/addressing"
xmlns:rksb="http://www.rkiss.net/schemas/sb/2004/08/servicebus">
<rksb:Bootstrapper mode="on" enablelog="false">
<wsa:EndpointReference>
<wsa:Address>soap.tcp://localhost:911/Bootstrapper</wsa:Address>
</wsa:EndpointReference>
<rksb:InputFilters mode="on">
<rksb:Filter name="OpenServices"
type="RKiss.ServiceBus.Filters.OpenServicesInputFilter"/>
</rksb:InputFilters>
<rksb:OutputFilters mode="on">
<rksb:Filter name="NextTo" mode="off"
type="RKiss.ServiceBus.Filters.SendMessageOutputFilter"/>
</rksb:OutputFilters>
<rksb:SendMessages name="Bootstrapper" enable="true">
<rksb:SendMessage name="BootMessage" ref="BootstrapMessage" />
<rksb:SendMessage name="LoggerTo" ref="Logger"/>
<rksb:SendMessage name="NextTo" ref="StartUp"/>
</rksb:SendMessages>
<rksb:AddInfo/>
</rksb:Bootstrapper>
<rksb:ServiceBusManager mode="on" enablelog="false">
<wsa:EndpointReference>
<wsa:Address>soap.tcp://localhost:911/ServiceBusManager</wsa:Address>
</wsa:EndpointReference>
<rksb:InputFilters mode="on">
<rksb:Filter name="Echo" mode="on"
type="RKiss.ServiceBus.Filters.CustomerInputFilter"/>
</rksb:InputFilters>
<rksb:OutputFilters/>
<rksb:SendMessages name="ServiceBusManager" enable="true">
<rksb:SendMessage name="LoggerTo" ref="Logger"/>
</rksb:SendMessages>
<rksb:AddInfo/>
</rksb:ServiceBusManager>
<rksb:Services
xmlns:rksb="http://www.rkiss.net/schemas/sb/2004/08/servicebus"
xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/03/addressing">
</rksb:Services>
</ServiceBus>
</rk.ConnectedSystem>
</configuration>
The above "boot config template" will create a simple ServiceBus knowledge base:
- References with three SendMessage elements: BootstrapMessage, Logger and StartUp.
- Bootstrapper built-in service.
- ServiceBusManager built-in service.
Attaching an application-specific component, for instance the EventSink service, to the Bus can be accomplished in one of the following ways:
- Administratively - inserting the following service configuration element into the rksb:Services element of the above code snippet.
<rksb:Service name="EventSink" mode="SingleCall" enablelog="false" >
<wsa:EndpointReference>
<wsa:Address>soap.tcp://localhost:911/EventSink</wsa:Address>
</wsa:EndpointReference>
<rksb:InputFilters mode="on">
<rksb:Filter name="BeginTransaction"
type="RKiss.ServiceBus.Filters.BeginTransactionInputFilter"/>
</rksb:InputFilters>
<rksb:OutputFilters mode="on">
<rksb:Filter name="EndTransaction"
type="RKiss.ServiceBus.Filters.EndTransactionOutputFilter"/>
</rksb:OutputFilters>
<rksb:SendMessages name="EventSink" enable="true">
<rksb:SendMessage name="LoggerTo" ref="Logger"/>
</rksb:SendMessages>
<rksb:AddInfo>
<connectionString>server=myServer; ....</connectionString>
<args arg1="12345" arg2="abcd"></args>
</rksb:AddInfo>
</rksb:Service>
- Programmatically - sending the proper messages (LoadServices and OpenServices) to the ServiceBusManager.
<rksb:SendMessage name="LoadEventSink" reqrsp="true">
<wsa:EndpointReference>
<wsa:Address>soap.tcp://localhost:911/ServiceBusManager</wsa:Address>
</wsa:EndpointReference>
<wsa:Action>LoadServices</wsa:Action>
<rksb:MessageBody>
<rksb:LoadServices>
<rksb:Service name="EventSink" mode="SingleCall" enablelog="true" >
</rksb:Service>
</rksb:LoadServices>
</rksb:MessageBody>
</rksb:SendMessage>
<rksb:SendMessage name="OpenEventSink" reqrsp="true">
<wsa:EndpointReference>
<wsa:Address>soap.tcp://localhost:911/ServiceBusManager</wsa:Address>
</wsa:EndpointReference>
<wsa:Action>OpenServices</wsa:Action>
<rksb:MessageBody>
<rksb:OpenServices>
<rksb:Service name="EventSink"/>
</rksb:OpenServices>
</rksb:MessageBody>
</rksb:SendMessage>
The following code snippet is an example of sending a message request to the ServiceBus:
XmlElement loadmsgXmlElement =
ServiceBus.References.GetXmlElement(ServiceBusDefault.SendMessage,
"LoadEventSink");
ServiceBus.Request(loadmsgXmlElement);
XmlElement openmsgXmlElement =
ServiceBus.References.GetXmlElement(ServiceBusDefault.SendMessage,
"OpenEventSink");
ServiceBus.Request(openmsgXmlElement);
Of course, this assumes that the caller's knowledge base holds these messages; otherwise the messages have to be created programmatically.
As mentioned earlier, the ServiceBus has a simple built-in manager (ServiceBusManager) to handle some common requests/responses on the Bus. For more complex application-specific requirements, a special service can be built and attached to the Bus during the Boot process, for instance a Loader service for loading and opening services in one request.
In most cases, the first approach above is the preferred way to configure components on the ServiceBus. The second way can be useful in dynamically distributed applications, where a "travel message" (a stateful object) can carry this connectivity knowledge base in advance (for instance, in the NextTo message) and the target pre-processor filters perform the loading, opening, closing, etc. of services.
Other components are attached to the Bus in the same way as described for Services. The following code snippet shows an example of the Clients configuration for the EventSink service. As with Services, the handler for the Clients element must be added to the ConfigurationHandlers array (see Config_file).
<rksb:Clients xmlns:rksb="http://www.rkiss.net/schemas/sb/2004/08/servicebus"
xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/03/addressing">
<rksb:Client name="EventSinkClient" mode="on" enablelog="false"
type="EventSinkConsole.EventSinkClient, EventSinkConsole">
<wsa:EndpointReference
xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/03/addressing">
<wsa:Address>soap.tcp://localhost:911/EventSink</wsa:Address>
</wsa:EndpointReference>
<rksb:InputFilters mode="on">
<rksb:Filter name="Echo" mode="on"
type="RKiss.ServiceBus.Filters.CustomerInputFilter" />
</rksb:InputFilters>
<rksb:OutputFilters mode="on">
<rksb:Filter name="Test" mode="on"
type="RKiss.ServiceBus.Filters.CustomerOutputFilter"/>
</rksb:OutputFilters>
<rksb:SendMessages name="EventSinkClient" enable="true">
<rksb:SendMessage name="LoggerTo" ref="Logger"/>
</rksb:SendMessages>
</rksb:Client>
</rksb:Clients>
Connected Systems
Systems can be easily connected via the ServiceBus and the Connectivity Knowledge Base. Consuming services on the Bus is fully transparent and encapsulated from the business layer. Based on the earlier description, the EventSink, for example, can be consumed by the business layer as shown in the following code snippet:
string strSinkName = "EventSinkClient";
EventSinkClient client = ServiceBus.Clients[strSinkName] as EventSinkClient;
XmlElement notifyXmlElement =
ServiceBus.References.GetXmlElement(ServiceBusDefault.SendMessage,
"NotifyMessage");
SendMessageBase smb = new SendMessageBase(notifyXmlElement);
smb.MessageBody.Add(myBusinessObject);
client.SendOneWay("Notify", ServiceBus.CreateSoapEnvelope(smb));
As you can see, the business layer consumes a service without any connectivity knowledge. Based on the business rules (the name of the sink and the request message), the conversation is processed in a loosely coupled manner. Additionally, the metadata of the request message can be assembled and controlled by the workflow knowledge base, statically or dynamically, on the fly.
Note that this scenario (code + knowledge base) enables a straightforward migration of the underlying level (ServiceBus) to the next communication model, such as Indigo, without "touching the code".
Logging Messages
The ServiceBus has a built-in publisher that sends (in fire-and-forget fashion) a log message captured in the client or service processor. The log message is added to the referenced SendMessage template together with information about its source, such as location, timestamp, etc. The following code snippet shows an example of capturing messages in a filter:
// declared outside the try block so that the catch block can log it as well
object state = null;
try
{
state = envelope.Context[ServiceBusDefault.StateKey];
...
if(Parent != null && Parent.EnableLog)
{
Parent.Log(Severity.Checkpoint, Category.OutputFilter, FullName, 1301,
"...Message...", envelope, state, obj1, objX);
}
}
catch(Exception ex)
{
if(Parent != null && Parent.EnableLog)
{
Parent.Log(Severity.Error, Category.OutputFilter, FullName, 1302,
ex.Message, envelope, state, "more objects here");
}
}
A subscriber for LogMessages can be created like any other configurable service. For test purposes, I built one as a Windows Forms application. Note that the Log Publisher can be controlled dynamically on the fly (by sending a proper message) or administratively in the config file.
Using the DTC Transaction
When a business workflow requires the message pipeline to work with more than one resource in a transactional manner, DTC support can be injected at the appropriate position. A typical example is updating a workflow state and business data and sending a message via the soap.msmq transport. The ServiceBus has built-in pre/post-processor filters for creating a transaction context in the specified pipeline segment. Any access to resources such as a database, MSMQ, etc. enlists in the transaction in a two-phase commit manner. An early abort of the message processing properly ends the transaction in the message pipeline.
The following picture shows an example of a manager and its worker pool performing a "batch workflow". This is a typical way to increase workflow concurrency: instead of issuing requests one by one, the manager prepares a batch of requests for its workers.
The business workflow is divided into two workflows. The Manager handles the first workflow and is responsible for creating job messages for the second workflow, which is driven by the Worker processor. The Manager creates N stateless requests in a transactional manner based on the workflow status. When a batch request is completed, the NextTo request is sent to the next workflow processor; otherwise a JobRequest is generated. Note that all JobRequest messages are sent to the destination queue via the soap.msmq transport only after the manager's post-processor commits the transaction. Therefore, the position of the SendMessage in the post-processor's transactional segment is not critical; it can be anywhere the data to send is ready.
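The "sent only after commit" behavior can be pictured with a small in-memory analogy. This is only an illustration of the ordering: in the real scenario the enlistment and delivery are handled by DTC and the message queue, not by application code, and the `Outbox` class below is entirely hypothetical.

```csharp
using System;
using System.Collections.Generic;

// In-memory analogy of a transactional send; not how DTC or MSMQ are implemented.
public sealed class Outbox
{
    private readonly List<string> _pending = new List<string>();

    // Messages that became visible to the destination after a commit.
    public readonly List<string> Delivered = new List<string>();

    // Buffers the message; nothing reaches the destination yet.
    public void Send(string message)
    {
        if (message == null) throw new ArgumentNullException("message");
        _pending.Add(message);
    }

    // Commit makes all buffered messages visible at once, in send order.
    public void Commit()
    {
        Delivered.AddRange(_pending);
        _pending.Clear();
    }

    // Abort discards everything sent inside the transaction.
    public void Abort()
    {
        _pending.Clear();
    }
}
```

Because delivery happens at commit time, it does not matter at which point inside the transactional segment `Send` is called, as long as the data is ready.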
The JobRequest message is processed on the worker side in a similar way. The worker performs the business logic and then updates the worker status. Based on the worker status, the EventTo message can finally be sent to the manager to indicate that all workers are done and waiting for the next jobs. A workflow process can be aborted by changing the workflow status, for instance by a supervisor or a watchdog service (timeout).
The configuration of the Manager service may look like the following code snippet:
<rksb:Service name="BatchManager" mode="SingleCall"
type="WorkfowTest.BatchManager, Test">
<wsa:EndpointReference>
<wsa:Address>soap.tcp://localhost:1234/BatchManager</wsa:Address>
</wsa:EndpointReference>
<rksb:InputFilters mode="on">
<rksb:Filter name="BeginTransaction"
type="RKiss.ServiceBus.Filters.BeginTransactionInputFilter"/>
<rksb:Filter name="GetStatus"
sp="getstatus" type="WorkfowTest.SqlInputFilter, Test"/>
</rksb:InputFilters>
<rksb:OutputFilters mode="on">
<rksb:Filter name="JobRequest" addenvelope="true"
type="RKiss.ServiceBus.Filters.SendMessageOutputFilter"/>
<rksb:Filter name="UpdateStatus" sp="updatestatus"
type="WorkfowTest.SqlOutputFilter, Test"/>
<rksb:Filter name="NextTo" mode="on"
type="RKiss.ServiceBus.Filters.SendMessageOutputFilter" />
<rksb:Filter name="EndTransaction"
type="RKiss.ServiceBus.Filters.EndTransactionOutputFilter" />
</rksb:OutputFilters>
<rksb:SendMessages name="BatchManager" enable="true">
<rksb:SendMessage name="LoggerTo" ref="Logger" enable="true"/>
<rksb:SendMessage name="NextTo" ref="NextTo" enable="true"/>
<rksb:SendMessage name="JobRequest" enable="true">
<wsa:EndpointReference>
<wsa:Address>soap.msmq://./private$/WorkerQueue</wsa:Address>
</wsa:EndpointReference>
<wsa:Action>DoIt</wsa:Action>
<rksb:MessageBody />
</rksb:SendMessage>
</rksb:SendMessages>
<rksb:AddInfo>
<connectionString>server=myServer; ....</connectionString>
</rksb:AddInfo>
</rksb:Service>
The implementation of the ServiceBus is divided into six namespaces based on their functionality. I documented their classes and methods using the NDoc help program (see the following picture):
IConfigurationSectionHandler and IXmlElement interfaces
From the business layer's point of view, the ServiceBus represents an abstract connectivity layer over the WSE2 Messaging infrastructure based on the SoapEnvelope object. The business logic (the code behind the service and client) works in an object-oriented fashion, fully transparent to the underlying XmlElement-driven messaging. For this pattern, the following common contract interface has been implemented in the base classes so that they can be loaded and saved as XmlElement objects, for instance:
public class BaseClass : IXmlElement, IConfigurationSectionHandler
{
public BaseClass(){}
public BaseClass(BaseClass baseclass) { Copy(baseclass); }
public BaseClass(XmlElement element) { LoadXml(element); }
public object Create(object parent, object configContext, XmlNode section)
{
...
}
public virtual XmlElement GetXml(XmlDocument document)
{
...
}
public virtual void LoadXml(XmlElement element)
{
...
}
}
An object derived from the above base class can be used at the boundary level to export or import objects between two different layers, such as the object class layer and XmlElements. The Microsoft.Web.Services2.Xml namespace simplifies the ServiceBus implementation. Note that this factory pattern is very common for exporting/importing objects to/from XML in loosely coupled connected systems (it would be nice to have this feature in the CLR).
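The export/import pattern can be shown with a minimal round trip. The interface `IXmlRoundTrip` below is a local, hypothetical stand-in that copies only the GetXml/LoadXml shape of the base class above, and `EndpointInfo` with its `Endpoint` element layout is an invented example, not a ServiceBus type.

```csharp
using System;
using System.Xml;

// Local stand-in for the GetXml/LoadXml contract; an assumption for this sketch.
public interface IXmlRoundTrip
{
    XmlElement GetXml(XmlDocument document);
    void LoadXml(XmlElement element);
}

// Hypothetical object that can cross the boundary between objects and XmlElements.
public sealed class EndpointInfo : IXmlRoundTrip
{
    public string Name = "";
    public string Address = "";

    // Export: object -> <Endpoint name="...">address</Endpoint>
    public XmlElement GetXml(XmlDocument document)
    {
        if (document == null) throw new ArgumentNullException("document");
        XmlElement element = document.CreateElement("Endpoint");
        element.SetAttribute("name", Name);
        element.InnerText = Address;
        return element;
    }

    // Import: XmlElement -> object state
    public void LoadXml(XmlElement element)
    {
        if (element == null) throw new ArgumentNullException("element");
        Name = element.GetAttribute("name");
        Address = element.InnerText;
    }
}
```

An object exported with `GetXml` and re-imported with `LoadXml` should end up with the same `Name` and `Address`, which is the property the boundary layer depends on.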
Processor
The Processor class is the major class of the service. It is derived from SoapService and has the built-in functionality required by the ServiceBus features, such as configuration on the fly, closing the transaction context, pre-processing and post-processing, a handler for OnDispatchFailed, opening/closing a service, etc. The following code snippet shows its post-processor:
private void PostProcessOutputMessage(SoapEnvelope envelope)
{
if(envelope == null)
throw new ArgumentNullException("envelope");
try
{
if(!Pipeline.IsIntermediary)
{
envelope.Envelope.SetAttribute("xmlns:wsa", WSAddressing.NamespaceURI);
envelope.Context.Addressing.GetXml(envelope);
}
for(int ii = this.Pipeline.OutputFilters.Count - 1; ii >= 0; ii--)
this.Pipeline.OutputFilters[ii].ProcessMessage(envelope);
}
catch(Exception ex)
{
#region EndTransaction
if(Config != null && ContextUtil.IsInTransaction)
{
lock(Config)
{
if(Config.Mode == ConfigMode.SingleCall)
{
ContextUtil.SetAbort();
ServiceDomain.Leave();
}
}
}
#endregion
if(Config != null && Config.EnableLog)
{
object state = envelope.Context[ServiceBusDefault.StateKey];
Config.Log(Severity.Error, Category.PostProcessor,
Config.Name,1002,ex.Message,envelope,state);
}
}
}
Service boilerplate
Making a configurable service and attaching it to the ServiceBus requires deriving the business service from the Processor base class and implementing its Config constructor. Note that the Singleton service mode is handled on the fly in the pre-processor section.
[SoapService("MyNamespace")]
public class MyService : Processor
{
public MyService() {}
public MyService(ServiceConfig config) : base(config) {}
}
Filter boilerplate
A filter encapsulates a piece of message processing in a reusable, function-oriented object. The filter runs under the incoming or outgoing SoapEnvelope.Context, which it can modify as needed. The SoapEnvelope message is processed through the filters in the order of the Pipeline stack. The Pipeline can be modified on the fly by adding (or removing) filters. The ServiceBus uses filters as part of the business workflow functionality, such as opening a service, storing data, sending a message, etc.
The following code snippet shows a boilerplate for a custom input filter:
public class MyInputFilter : SoapInputFilter
{
#region Properties
private Configuration.FilterConfig _config;
private ConfigBase _parent;
private string _fullname = string.Empty;
public Configuration.FilterConfig Config
{ get { return _config; } set { _config = value; }}
public ConfigBase Parent
{ get { return _parent; } set { _parent = value; }}
public string FullName { get { return _fullname;}}
#endregion
#region constructors
public MyInputFilter() :base() {}
public MyInputFilter(ConfigBase parent, FilterConfig config) :base()
{
_config = config;
_parent = parent;
_fullname = string.Format("{0}.{1}", Parent.Name, Config.Name);
}
#endregion
#region Override the ProcessMessage method.
public override void ProcessMessage(SoapEnvelope envelope)
{
try
{
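if(Parent != null && Parent.EnableLog)
{
// the state object travels with the envelope context (same key as in the Processor)
object state = envelope.Context[ServiceBusDefault.StateKey];
Parent.Log(Severity.Checkpoint, Category.InputFilter,
FullName, 1101,"Done",envelope,state);
}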
}
catch(Exception ex)
{
if(ContextUtil.IsInTransaction)
ContextUtil.SetAbort();
if(Parent != null && Parent.EnableLog)
Parent.Log(Severity.Error, Category.InputFilter,
FullName, 1102, ex.Message, envelope);
throw;
}
}
#endregion
}
Hosting ServiceBus by IIS
ServiceBus
has built-in support for being hosted by the IIS process without any coding. Basically, there are two plug-in points in the IIS server that have to be handled: HttpModule
and HttpHandler
. Registering the ServiceBus
(plug-in) with the IIS server is done administratively in the web.config file, as shown in the following snippet:
<system.web>
<httpModules>
<add name="ServiceBus"
type="RKiss.ServiceBus.ServiceBusHostedByIIS, RKiss.ServiceBus"/>
</httpModules>
<httpHandlers>
<add verb="*" path="*.ashx"
type="RKiss.ServiceBus.SoapRequestDispatcher, RKiss.ServiceBus"/>
</httpHandlers>
</system.web>
The first plug-in, the HttpModule
, is responsible for booting the ServiceBus
at start-up and closing it on exit; see the following code snippet:
public class ServiceBusHostedByIIS : IHttpModule
{
public virtual void Init(HttpApplication application)
{
try
{
ServiceBus.Boot();
Trace.WriteLine("***** ServiceBus is ready *****");
}
catch(Exception ex)
{
Trace.WriteLine("Failed to initialize a ServiceBus, error = "
+ ex.Message);
throw;
}
}
public virtual void Dispose()
{
try
{
ServiceBus.Close();
Trace.WriteLine("***** ServiceBus has been destroyed *****");
}
catch(Exception ex)
{
Trace.WriteLine("Failed to dispose a ServiceBus, error = " +
ex.Message);
throw;
}
}
}
The next one is a custom HTTP handler that forwards an HTTP request to the SoapReceiver
synchronously. The SoapRequestDispatcher
implements the ProcessRequest
method of the handler contract, where the httpContext.Request.Url
string is parsed for the .ashx extension and the service name. Based on the service name (a unique key in ServiceBus.Services
), the handler creates an instance of the configured service; see the following code snippet:
public virtual void ProcessRequest(HttpContext httpContext)
{
...
if(config != null && config.Mode ==
ConfigMode.SingleCall && config.Type != null)
{
SoapReceiver receiver = null;
if(config.Type.IsSubclassOf(typeof(Processor)))
receiver = Activator.CreateInstance(config.Type,
new object[]{ config}) as SoapReceiver;
else
throw new ConfigurationException("*** The service" +
" is not derived from 'Processor'");
receiver.ProcessRequest(httpContext);
}
else if(config != null && config.Mode ==
ConfigMode.Singleton && config.Epr != null)
{
SoapReceiver receiver =
SoapReceivers.Receiver(config.Epr) as SoapReceiver;
receiver.ProcessRequest(httpContext);
}
else
{
string strErrMsg =
string.Format("Can't dispatch this SoapRequest to the {0}",
httpContext.Request.Url.AbsoluteUri);
throw new ConfigurationException(strErrMsg);
}
...
}
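The parsing step is elided in the snippet above. As a rough sketch of how a service name could be taken from the request URL, assuming the name is simply the file name in front of the .ashx extension (the helper ServiceNameSketch.FromUrl is hypothetical and not part of the ServiceBus):

```csharp
using System;
using System.IO;

// Hypothetical helper; the actual SoapRequestDispatcher parsing is not shown in the article.
public static class ServiceNameSketch
{
    // Returns the service name for an .ashx request URL,
    // or null when the URL path does not end with the .ashx extension.
    public static string FromUrl(Uri url)
    {
        string path = url.AbsolutePath;   // path only, without the query string
        if (!path.EndsWith(".ashx", StringComparison.OrdinalIgnoreCase))
            return null;
        return Path.GetFileNameWithoutExtension(path);
    }

    public static void Main()
    {
        var url = new Uri("http://localhost/MyApplication/EventSink.ashx");
        Console.WriteLine(FromUrl(url));   // prints "EventSink"
    }
}
```

The returned name would then serve as the lookup key for the service configuration.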
Consuming a service hosted by IIS requires a correctly specified address in its EndpointReference
. For instance:
http://localhost/MyApplication/EventSink.ashx
where MyApplication is the name of the virtual directory and EventSink is the name of the service on the ServiceBus
(located in the KB). Note that the assemblies have to be located in the \bin folder and the web.config file in the MyApplication folder (the standard ASP.NET layout). The ServiceBus.dll assembly should be installed in the GAC or in the MyApplication\bin directory.
The ServiceBus
has a built-in message logging feature, where its Publisher can send a LogMessage
to the Subscriber service sink. For test purposes, my solution includes a simple ServiceBus Monitor that shows message processing in the Pipeline. See the following picture:
The ServiceBus Monitor features:
- Open/Close a Local ServiceBus
- Dump the host process config file
- Get All Receivers
- Get/Remove specified reference from the local Knowledge Base
- Get All references from the local ServiceBus
- SendMessage using the Request call
- SendMessage using the RequestResponse call
- Modifying the Epr address in the SendMessage object (this feature allows updating the Epr in the SendMessage template)
- Displaying incoming LogMessages in the tree object (Logger)
The LogMessage
in the Logger tree is colored based on its source, such as Pre-Processor, InputFilter, OutputFilter, Error, etc. The first LogMessage
in the Pipeline has a yellow background to distinguish messages between services. The messages are stored in the tree in the order in which they are received, but the message timestamp represents the publisher's time.
To receive a LogMessage
in a WinForm, it's necessary to wire the Subscriber sink to the Form. The following code snippet shows how this can be done:
public class Form1 : System.Windows.Forms.Form
{
public Form1()
{
...
RKiss.ServiceBus.ServiceBus.Load("main");
WinFormSink sink = new
WinFormSink(ServiceBus.References.GetXmlElement("Service",
"WinFormSink"));
sink.Config.Wire += new DelegateProcessMessage(OnLogger);
sink.Open();
RKiss.ServiceBus.ServiceBus.Boot("main");
...
}
...
public void OnLogger(SoapEnvelope envelope)
{
XmlElement element = Helper.SelectXmlElement(envelope.Body,
ServiceBusDefault.LogMessage);
...
}
public class WinFormSink : Processor
{
public WinFormSink(XmlElement configXmlElement) : base(configXmlElement) {}
public override void ProcessMessage(SoapEnvelope message)
{
if(Config.Wire != null && message != null)
{
DelegateProcessMessage dpm = new DelegateProcessMessage(OnWire);
dpm.BeginInvoke(message, null, null);
}
else
Trace.WriteLine("WinFormSink.Processor The wiring or message is null");
}
public void OnWire(SoapEnvelope message)
{
Config.Wire(message);
}
}}
and the host process config file holds the reference of the WinFormSink
:
<References>
<rksb:Service name="WinFormSink" >
<wsa:EndpointReference>
<wsa:Address>soap.tcp://localhost:8888/WinFormSink</wsa:Address>
</wsa:EndpointReference>
</rksb:Service>
</References>
OK, let's test it.
The following picture shows our test components connected via ServiceBus
:
Simple test sequence:
- Launch the included simple service hosted by EventSinkConsole
- Launch the ServiceBus Monitor
- Click Logger and clear the screen (right mouse button)
- Click the SendRequestResponse button. This action sends a rksb:SendMessage.Test template message.
- Expand the LogMessages in the Logger and analyze them.
- Select another template, for instance rksb:SendMessage.GetManagers, and click Get
- Send this message by SendRequestResponse
- Check the Logger
- Clear Logger
- Uncheck Epr
- Change the Epr: soap.tcp://localhost:911/ServiceBusManager
- Send this message again (destination is the ServiceBusManager hosted by EventSinkConsole)
- Select another template and evaluate its message processing in the Logger.
- You can modify or add any template in the config file and check its behavior without any programming.
In this pre-release version, I didn't include some services such as WS-Eventing and WorkflowProcessor (they are not part of the core). I am going to describe how WS-Eventing can be incorporated into the ServiceBus
. Thanks to Plumbwork Orange, whose WS-Eventing classes helped me build my services (Managers) for WS-Eventing on the Bus. My plan is to publish this Bus extension as a separate article in the upcoming months.
WS-Eventing
WS-Eventing plays a very significant role in asynchronous (event-driven) applications. The following is a typical example of an Order process. The Order
submits a request for order with a specific business data and notification info such as SubscriptionId
, ReplyTo
, etc. The caller will immediately receive a response with the OrderId
. When AsyncProcessor
completes the job, it will send an event to the Publisher to perform a notification based on the SubscriptionId
and topic of the event.
The following picture shows this scenario on the ServiceBus
:
The concept of creating and attaching the WS-Eventing to the ServiceBus
is the same as for any other component on the Bus. It requires creating a separate component section and its handler. The following snippet shows an example of the rksb:Eventing
:
<rksb:Eventing
xmlns:wse="http://schemas.xmlsoap.org/ws/2004/08/eventing/"
xmlns:rksb="http://www.rkiss.net/schemas/sb/2004/08/servicebus"
xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/03/addressing">
<Publisher ref="Publisher" />
<Subscriber ref="Subscriber" />
<SubscriptionStore ref="SubscriptionStore" />
<Subscriptions mode="on">
<wse:Subscribe name="ServiceBusSubscription" mode="on"
xmlns:wse="http://schemas.xmlsoap.org/ws/2004/08/eventing"
xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/03/addressing"
xmlns:rksb="http://www.rkiss.net/schemas/sb/2004/08/servicebus" >
<wse:EndTo>
<wsa:Address>soap.tcp://localhost/StoreChangesSink</wsa:Address>
</wse:EndTo>
<wse:Delivery>
<wse:NotifyTo>
<wsa:Address>soap.tcp://localhost/StoreChangesSink</wsa:Address>
<wsa:ReferenceProperties>
<rksb:EventAction>Notify</rksb:EventAction>
<rksb:EventTopics>Store\Changes\* Store\Errors\*</rksb:EventTopics>
<rksb:EventSource>SubscriptionStore</rksb:EventSource>
<wse:Identifier>uuid:00000000-000000-0000-111111111111</wse:Identifier>
</wsa:ReferenceProperties>
</wse:NotifyTo>
</wse:Delivery>
<wse:Expires>2004-09-09T22:25:40Z</wse:Expires>
</wse:Subscribe>
<wse:Subscribe name="MySubscription" mode="on" ref="SubscriptioTemplate" />
</Subscriptions>
</rksb:Eventing>
At boot time, the config engine walks through the above section and subscribes all configured subscriptions using the referenced Subscriber
and SubscriptionStore
. As you can see, the Subscriber
element represents a WS-Eventing Subscribe Request. Note that the wse:Subscribe
attributes (name
, mode
, ref
) are only used for internal purposes of the ServiceBus
and they are not sent to the SubscriptionManager
.
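Since the name, mode and ref attributes are internal to the bus, they have to be dropped before the request leaves the process. A minimal sketch of that step using System.Xml, assuming the request is built from a copy of the configured element (the SubscribeRequestSketch class is hypothetical; the article does not show how the ServiceBus does this):

```csharp
using System;
using System.Xml;

// Hypothetical helper, not the ServiceBus implementation.
public static class SubscribeRequestSketch
{
    // Returns a deep copy of the configured wse:Subscribe element
    // without the bus-internal attributes (name, mode, ref).
    public static XmlElement StripInternalAttributes(XmlElement configured)
    {
        var copy = (XmlElement)configured.CloneNode(true);
        foreach (string attr in new[] { "name", "mode", "ref" })
            copy.RemoveAttribute(attr);   // no-op when the attribute is absent
        return copy;
    }

    public static void Main()
    {
        var doc = new XmlDocument();
        doc.LoadXml("<wse:Subscribe name=\"MySubscription\" mode=\"on\" " +
            "xmlns:wse=\"http://schemas.xmlsoap.org/ws/2004/08/eventing\">" +
            "<wse:Expires>2004-09-09T22:25:40Z</wse:Expires></wse:Subscribe>");
        XmlElement request = StripInternalAttributes(doc.DocumentElement);
        Console.WriteLine(request.OuterXml);
    }
}
```

Working on a copy keeps the configured element in the Knowledge Base unchanged, so the same template can be reused for a later renewal.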
The purpose of the rksb:Eventing
config section is to automatically register well-known subscriptions required for internal use in the ServiceBus
or by consumers with a long-term leasing contract. For this feature, the wse:Subscribe
request asks to use its own unique wse:Identifier
instead of obtaining one from the SubscriptionManager
. Note that this is a custom extension of WS-Eventing, and it would be nice to have it in the spec (soap.Header
).
Based on the known Subscription Identifier, the SubscriptionStore
will accept the subscription and automatically renew it if it already exists. One use of this feature: the SubscriptionStore
can send notifications when changes happen, such as a new subscription in the store, renewed subscriptions, etc., based on the configured subscriptions.
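The accept-or-renew behavior for a known identifier can be sketched with a simple in-memory dictionary. This is only an illustration under assumptions: the SubscriptionStoreSketch class, its method names and the "Created"/"Renewed" results are hypothetical and do not reflect the actual MemorySubscriptionStore code:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory store; not the RKiss.SB.Eventing implementation.
public class SubscriptionStoreSketch
{
    private readonly Dictionary<string, DateTime> _expiresById =
        new Dictionary<string, DateTime>();
    private readonly object _sync = new object();

    // Accepts a subscription under the caller-supplied identifier.
    // Returns "Renewed" when the identifier was already stored, otherwise "Created".
    public string Subscribe(string identifier, DateTime expires)
    {
        if (identifier == null)
            throw new ArgumentNullException("identifier");
        lock (_sync)
        {
            bool exists = _expiresById.ContainsKey(identifier);
            _expiresById[identifier] = expires;   // insert or overwrite the lease
            return exists ? "Renewed" : "Created";
        }
    }

    // Returns the stored expiration for a known identifier.
    public DateTime GetExpires(string identifier)
    {
        lock (_sync) { return _expiresById[identifier]; }
    }

    public static void Main()
    {
        var store = new SubscriptionStoreSketch();
        string id = "uuid:well-known-subscription";   // placeholder identifier
        Console.WriteLine(store.Subscribe(id, new DateTime(2004, 9, 9)));    // prints "Created"
        Console.WriteLine(store.Subscribe(id, new DateTime(2004, 12, 9)));   // prints "Renewed"
    }
}
```

The returned change kind is the sort of information a store could publish as a notification, for example under a topic for store changes.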
Using WS-Eventing on the ServiceBus
requires building and attaching the following managers (derived from the Processor
class):
<Service name="SubscriptionManager" mode="SingleCall"
type="RKiss.SB.Eventing.SubscriptionManager, RKiss.SBEventing" >
<wsa:EndpointReference xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/03/addressing">
<wsa:Address>soap.tcp://localhost:8888/SubscriptionManager</wsa:Address>
</wsa:EndpointReference>
<rksb:SendMessages name="SubscriptionManager" enable="true">
<rksb:SendMessage name="LoggerTo" ref="Logger" enable="true"/>
</rksb:SendMessages>
<rksb:AddInfo>
<SubscriptionStore ref="MemorySubscriptionStore"/>
</rksb:AddInfo>
</Service>
<Service name="PublisherManager" mode="SingleCall"
type="RKiss.SB.Eventing.PublisherManager, RKiss.SBEventing" >
<wsa:EndpointReference xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/03/addressing">
<wsa:Address>soap.tcp://localhost:8888/PublisherManager</wsa:Address>
</wsa:EndpointReference>
<rksb:SendMessages name="PublisherManager" enable="true">
<rksb:SendMessage name="LoggerTo" ref="Logger" enable="true"/>
</rksb:SendMessages>
<rksb:AddInfo>
<SubscriptionStore ref="MemorySubscriptionStore"/>
</rksb:AddInfo>
</Service>
<Service name="MemorySubscriptionStore" mode="Singleton"
type="RKiss.SB.Eventing.MemorySubscriptionStore, RKiss.SBEventing">
<wsa:EndpointReference xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/03/addressing">
<wsa:Address>soap.tcp://localhost:8888/SubscriptionStore</wsa:Address>
</wsa:EndpointReference>
<rksb:SendMessages name="SubscriptionStore" enable="true">
<rksb:SendMessage name="LoggerTo" ref="Logger" enable="true"/>
</rksb:SendMessages>
<rksb:AddInfo>
<Overwrite>true</Overwrite>
</rksb:AddInfo>
</Service>
and the clients (derived from the Output
class):
<Client name="Publisher" type="RKiss.SB.Eventing.Publisher, RKiss.SBEventing" >
<wsa:EndpointReference xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/03/addressing">
<wsa:Address>soap.tcp://localhost:8888/PublisherManager</wsa:Address>
</wsa:EndpointReference>
</Client>
<Client name="Subscriber" type="RKiss.SB.Eventing.Subscriber, RKiss.SBEventing" >
<wsa:EndpointReference xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/03/addressing">
<wsa:Address>soap.tcp://localhost:8888/SubscriptionManager</wsa:Address>
</wsa:EndpointReference>
</Client>
<Client name="SubscriptionStore"
type="RKiss.SB.Eventing.SubscriptionStore, RKiss.SBEventing" >
<wsa:EndpointReference xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/03/addressing">
<wsa:Address>soap.tcp://localhost:8888/SubscriptionStore</wsa:Address>
</wsa:EndpointReference>
</Client>
The WS-Eventing Request can be sent internally from the ServiceBus
component (using the Input or Output Filter) or externally like any other WSE message. SubscriptionStore
can use any resource such as memory, file system or database directly or remotely via ServiceBus
.
In this article, I've discussed the design and implementation of the encapsulation of business layer into the "code units" and its Connectivity Knowledge Base. I hope you will enjoy this pre-release version.