Introduction
FloatingBridge is a simple Messaging/Workflow/ETL system based on MySql and .NET. MySql is used as the backend engine for the system. The front-end is designed in .NET WPF (Windows Presentation Foundation). The business logic is handled by .NET Windows services.
This article gives a brief introduction to this system. The installation files and detailed documentation are provided with this article.
Background
I have worked quite a lot with Microsoft BizTalk Server. I have also worked with some of the alternative systems. My intention was to make a simple Messaging/Workflow/ETL system based on a modern relational database which uses the JSON data type instead of XML.
Features
The system comes with a management console which helps you to configure your Messaging/Workflow system. The management console has the following features:
- Backend Configuration - All the required scripts to initialize the MySql database are run when the console is launched the first time.
- Service Configuration - The system comes with 3 kinds of services. They are described in the section below. However, we can configure as many instances of the services as we want. Typically, we would configure one per Application Domain.
- Messaging System Configuration - This can be configured by configuring the corresponding Message Type, Publishers, Subscribers, Publications & Subscriptions.
- Workflow Configuration - Configure a workflow which is a sequence of branched tasks with business logic defined in JSON.
- Manage Messages & Workflows - Old messages can be made redundant (orphaned), and waiting workflows can be re-started.
- History - History of messages & Workflows can be viewed.
- Transformations - Transformation tasks which transform JSON can be created using JUST - https://www.codeproject.com/Articles/1187172/JUST-JSON-Under-Simple-Transformation.
- Custom Tasks - The system comes with some well known custom tasks.
- API - The system comes with an API which can be used by external systems to perform operations on the system like Publish messages, Pull Subscribed messages, Publish Workflows, Run synchronous workflows & Re-start waiting workflows. The Core API can be used to implement your own custom tasks.
System Architecture
The diagram below illustrates the different components of the system.
Database Engine
The database engine is the central component of the system. MySql database is used for the engine. The engine stores the configuration & tracking data of the system.
The following configuration data is stored:
- Message configuration - message type, its application domain
- Workflow configuration - workflow type, application domain and business logic
- Publishers which can publish messages and workflows
- Subscribers which can subscribe to messages
- Tasks which can either be part of a workflow, listener or subscription
In addition, all the historical data associated with a message and workflow is stored for tracking purposes.
Subscriber Service
This is a .NET Windows service which polls the database for subscription tasks. A subscription task simply reads a published message from the database and processes it. There are inbuilt subscription tasks provided by the system which are described in the last section.
Listener Service
This is also a .NET windows service which runs tasks associated with listeners. A listener is a task which reads data from an external source and publishes a message to the database based on the data gathered from the external source.
Workflow Service
This is the .NET Windows service which runs the various tasks associated with workflows in an order defined by the business logic of the workflow. It performs the following tasks:
- Reads workflow messages from the database and performs the next action based on the business logic and the current state.
- Runs the next synchronous task and saves the state to the engine.
- Runs the next outgoing task and waits for the incoming task.
- Retries the task based upon the retry count.
- Runs the error handler or error notification task in case of failures.
Management Console
This is an application written in WPF (Windows Presentation Foundation) that enables an end user to manage the configuration & tracking of messages and workflows.
FloatingBridge API
The FloatingBridge API is a simple DLL which can be referenced in an external project. It is located in the installation folder and is also available as a NuGet package. By referencing the API, an external entity or system can perform the following tasks:
- Publish a message
- Pull subscribed messages
- Publish a workflow
- Run a synchronous workflow
- Restart a waiting workflow
FloatingBridge Terminology
It is important to understand the terminology used in FloatingBridge, as the management console and its various actions are based entirely upon it.
- Application Domain - A logical grouping for an application which has message types & workflows associated with it. Each service runs under an application domain.
- Service - A .NET Windows service (Listener, Subscriber or Workflow) which runs under an application domain. A service requires two input parameters:
  - Application Domain
  - Connection string to the MySql database engine
- Message Type - A type that defines a message. Publishers can publish this type of message & subscribers can pull this type of message.
- Publisher - A client that has the license to publish messages to the system.
- Subscriber - A client that has the license to subscribe to messages in the system.
- Task - A function which is run by the system. A task can be publishing, subscribing or a workflow task.
- Listener - This is run by the listener service periodically to listen for messages which need to be published. For example, a task which reads files from a directory and, as soon as files are found, publishes a message based upon the content of the files.
- Publication - This configuration decides whether a publisher or a listener can publish a specific message type.
- Subscription - This configuration decides whether a Subscriber or a task can pull messages out of the system.
- Active Message - A message which has been published to the system and is waiting to be pulled.
- Orphaned Message - An active message can be set to orphan in the management console. Once set to orphan, this message cannot be pulled.
- Workflow (Workflow Type) - A sequence of steps (tasks) to execute. The steps are defined in a JSON format which is called the business logic. A workflow is associated with an application domain. The workflow service executes the steps in the workflow.
- Orphaned workflow - A workflow which is waiting either for a manual external input or an incoming task.
As mentioned above, a Task is a function run by the system. There are 8 different kinds of tasks. They are simple C# classes which need to implement one of the 8 different interfaces in the FloatingBridge.Core assembly.
Each of the 8 different tasks accomplishes a different purpose:
- Publisher Task - A task which is run by the listener service. The task returns a message which is published into the system.
- Subscriber Task - A task which is run by the subscriber service. The service pulls all the messages subscribed to by the task and passes them to the task.
- Workflow Task - A synchronous workflow step run by the workflow service. The service sends a message to the task and the task returns a message back to the service.
- Workflow Splitter Task - Similar to the above, except that the task returns multiple messages back to the service. This is used when we want to split the workflow into multiple workflows. When the workflow is split, a SplitID is assigned to each resulting workflow. This ID identifies the individual workflow once it is split off from the original workflow.
- Outgoing Task - An asynchronous workflow step. The workflow goes into a waiting state after this task is executed. The workflow needs to be resumed either manually through the API or by an incoming task.
- Incoming Task - An asynchronous workflow step which resumes a waiting workflow.
- Error Handler Task - This is a compensating task which is associated with a workflow step and is executed when the step fails. Once this task is executed, the workflow resumes with the output of this task.
- Error Notification Task - Similar to the above task except that the workflow does not resume after its completion. The workflow has to be resumed manually by the API.
Installation & Configuration
The installation procedure is a normal installation procedure where you click on the MSI and choose an application folder on your system. The installer just copies all the artifacts to the application folder.
The Messaging/Workflow system can be configured using the various options on the menu and choosing the correct artifact. I am not attaching screenshots of each configuration option in this article.
The screen-shot below shows the various options available to configure the artifacts. The right-hand panel shows the message history.
Configured Workflow
Workflow Business Logic
The flow and functionality of the workflow are defined by the business logic. The business logic is a JSON object. Below is the JSON representation of the configured workflow from the section above.
{
"Start":{
"Next":[
"Branch1",
"Branch2",
"Branch3"
],
"TaskID":41,
"OnFailure":null,
"Identifier":"FirstTask",
"TaskRetries":0,
"RunCondition":null,
"TaskProperties":null,
"IncommingTaskID":0,
"IncommingTaskProperties":null
},
"Tasks":[
{
"Next":[
"Final"
],
"TaskID":23,
"OnFailure":"NotifierTask",
"Identifier":"Branch1",
"TaskRetries":5,
"RunCondition":{
"Operator":"stringequals",
"Evaluated":"Norway",
"Evaluator":"#valueof($.MessageBody.Country)"
},
"TaskProperties":{
"FileName":"D:/Test/Branch1.json"
},
"IncommingTaskID":31,
"IncommingTaskProperties":{
"FileName":"D:/Test/Branch1In.json"
}
},
{
"Next":[
"Final"
],
"TaskID":23,
"OnFailure":null,
"Identifier":"Branch3",
"TaskRetries":-1,
"RunCondition":{
"Operator":"stringequals",
"Evaluated":"Denmark",
"Evaluator":"#valueof($.MessageBody.Country)"
},
"TaskProperties":{
"FileName":"D:/Test/branch3.json"
},
"IncommingTaskID":0,
"IncommingTaskProperties":null
},
{
"Next":[
"Final"
],
"TaskID":35,
"OnFailure":null,
"Identifier":"Branch2",
"TaskRetries":-1,
"RunCondition":{
"Operator":"stringequals",
"Evaluated":"Sweden",
"Evaluator":"#valueof($.MessageBody.Country)"
},
"TaskProperties":{
"FileName":"D:/Test/Branch2.json"
},
"IncommingTaskID":0,
"IncommingTaskProperties":null
},
{
"Next":null,
"TaskID":41,
"OnFailure":null,
"Identifier":"Final",
"TaskRetries":-1,
"RunCondition":null,
"TaskProperties":null,
"IncommingTaskID":0,
"IncommingTaskProperties":null
}
],
"OnFailure":null,
"TaskRetries":0
}
The above JSON might look scary at first glance, but it is generated by configuring the workflow in the management console.
The business logic JSON contains the following properties:
- Start - The first step or starting point of the workflow. This is a Task JSON object.
- Tasks - An array of all other steps in the workflow. A JSON array containing Task JSON objects.
- TaskRetries - A global setting for the workflow stating the number of times a step should be retried before failing. This is overridden by the individual step setting.
- OnFailure - A global setting for the workflow stating the ErrorHandler or ErrorNotification task step to be executed in case of failure. This is overridden by the individual step setting.
Task
A task represents an individual step in the workflow. It has the following properties:
- Identifier - A simple string identifying the task in the workflow. Needs to be unique for the workflow.
- TaskID - This is the actual ID of the task in the database. This is set automatically when you create the task. It can be modified.
- OnFailure - If defined, overrides the corresponding property of the workflow.
- TaskRetries - If defined, overrides the corresponding property of the workflow.
- Next - A string array containing the identifiers of the tasks which need to be executed after this task.
- RunCondition - A JSON object which defines the condition under which this task shall be executed. This is used when we want to branch a workflow based on different conditions.
- TaskProperties - A JSON object containing properties for the execution of the step. For example, for a JSON writer, it will contain the name of the file to write to.
- IncommingTaskID - Identifier of the incoming task step if this step is a one-way outgoing step.
- IncommingTaskProperties - Corresponding JSON task properties for this incoming task.
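Two of the rules above are structural: every Identifier must be unique within the workflow, and every entry in Next must name another step. The following is a minimal sketch of how such a check could look. It is not part of FloatingBridge: the class BusinessLogicChecker and its Check method are hypothetical names, and System.Text.Json is used only to keep the example free of package references.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helper, not shipped with FloatingBridge.
public static class BusinessLogicChecker
{
    // Returns the problems found; an empty list means both checks passed.
    public static List<string> Check(string businessLogicJson)
    {
        var problems = new List<string>();
        using JsonDocument doc = JsonDocument.Parse(businessLogicJson);
        JsonElement root = doc.RootElement;

        // Collect the Start step followed by every step in Tasks.
        var steps = new List<JsonElement> { root.GetProperty("Start") };
        if (root.TryGetProperty("Tasks", out JsonElement tasks)
            && tasks.ValueKind == JsonValueKind.Array)
        {
            foreach (JsonElement step in tasks.EnumerateArray())
                steps.Add(step);
        }

        // Rule 1: identifiers must be unique within the workflow.
        var identifiers = new HashSet<string>();
        foreach (JsonElement step in steps)
        {
            string id = step.GetProperty("Identifier").GetString() ?? "";
            if (!identifiers.Add(id))
                problems.Add("Duplicate identifier: " + id);
        }

        // Rule 2: every entry in Next must name an existing step.
        foreach (JsonElement step in steps)
        {
            if (!step.TryGetProperty("Next", out JsonElement next)
                || next.ValueKind != JsonValueKind.Array)
                continue; // Next is null or absent, as on a final step

            string owner = step.GetProperty("Identifier").GetString() ?? "";
            foreach (JsonElement target in next.EnumerateArray())
            {
                string name = target.GetString() ?? "";
                if (!identifiers.Contains(name))
                    problems.Add(owner + " points to unknown step: " + name);
            }
        }
        return problems;
    }
}
```

Calling BusinessLogicChecker.Check with the business logic JSON returns an empty list when both rules hold. The check does not look at RunCondition or TaskID, which would need access to the database.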
Task Properties
Task properties is a JSON object which defines the properties for the execution of a task. Each task has its own JSON schema. For example, for a JSON writer task, the task properties look like this:
{
"FileName":"D:/Test/Out.json"
}
The task properties can be made dynamic by using JUST - JSON Under Simple Transformation. https://www.codeproject.com/Articles/1187172/JUST-JSON-Under-Simple-Transformation
For example, we may want to set the "FileName" based upon a value inside the message body.
{
"FileName":"#valueof($.MessageBody.FileName)"
}
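To make the idea of a dynamic property concrete, here is a rough sketch of what resolving such an expression amounts to. It is not JUST and not FloatingBridge code: it only handles the single form #valueof($.A.B) with plain dotted property names, and the class ValueOfSketch and its Resolve method are hypothetical. The real JUST library supports far more than this.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Simplified stand-in for a "#valueof" lookup; NOT the JUST implementation.
public static class ValueOfSketch
{
    private const string Prefix = "#valueof($.";

    // Resolves "#valueof($.A.B)" against a JSON document by walking property names.
    // Any other string is treated as a literal and returned unchanged.
    public static string Resolve(string expression, string messageJson)
    {
        if (!expression.StartsWith(Prefix, StringComparison.Ordinal)
            || !expression.EndsWith(")", StringComparison.Ordinal))
            return expression;

        // Strip the prefix and the closing parenthesis to get "A.B".
        string path = expression.Substring(Prefix.Length, expression.Length - Prefix.Length - 1);

        using JsonDocument doc = JsonDocument.Parse(messageJson);
        JsonElement current = doc.RootElement;
        foreach (string name in path.Split('.'))
        {
            if (current.ValueKind != JsonValueKind.Object
                || !current.TryGetProperty(name, out JsonElement next))
                throw new KeyNotFoundException("Path not found: " + path);
            current = next;
        }

        // Strings are returned without quotes; other values as raw JSON text.
        return current.ValueKind == JsonValueKind.String
            ? current.GetString() ?? string.Empty
            : current.GetRawText();
    }
}
```

With a message such as {"MessageBody":{"FileName":"D:/Test/Out.json"}}, the expression above resolves to the file name, while a literal value such as "D:/Test/Out.json" passes through untouched.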
Condition
This is what determines if a step shall be run or not. This is a JSON object having these 3 properties:
- Evaluator - The expression whose value is checked against the evaluated.
- Evaluated - The expression against which the evaluator is checked.
- Operator - The operator for evaluation. The following operators are supported:
  - stringequals
  - stringcontains
  - mathequals
  - mathgreaterthan
  - mathlessthan
  - mathgreaterthanorequalto
  - mathlessthanorequalto
All of the above 3 properties can be made dynamic using JUST.
{
"Operator":"stringequals",
"Evaluated":"Norway",
"Evaluator":"#valueof($.MessageBody.Country)"
}
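The sketch below illustrates how a condition of this shape could be evaluated once the Evaluator and Evaluated expressions have been resolved to plain strings. It is an illustration only, not the FloatingBridge implementation. Several choices in it are assumptions: the class name ConditionSketch, the operand order (evaluator on the left, evaluated on the right), ordinal string comparison, case-insensitive operator names, and parsing the math operands as invariant-culture decimals.

```csharp
using System;
using System.Globalization;

// Hypothetical evaluator for the seven operators listed in the article.
public static class ConditionSketch
{
    public static bool Evaluate(string op, string evaluator, string evaluated)
    {
        string name = op.ToLowerInvariant(); // assumption: names are case-insensitive
        switch (name)
        {
            case "stringequals":
                return string.Equals(evaluator, evaluated, StringComparison.Ordinal);
            case "stringcontains":
                return evaluator.Contains(evaluated);
            case "mathequals":
            case "mathgreaterthan":
            case "mathlessthan":
            case "mathgreaterthanorequalto":
            case "mathlessthanorequalto":
                return Compare(name, evaluator, evaluated);
            default:
                throw new ArgumentException("Unsupported operator: " + op);
        }
    }

    // Parses both operands as numbers and applies the math operator.
    private static bool Compare(string name, string evaluator, string evaluated)
    {
        decimal left = decimal.Parse(evaluator, CultureInfo.InvariantCulture);
        decimal right = decimal.Parse(evaluated, CultureInfo.InvariantCulture);
        switch (name)
        {
            case "mathequals": return left == right;
            case "mathgreaterthan": return left > right;
            case "mathlessthan": return left < right;
            case "mathgreaterthanorequalto": return left >= right;
            default: return left <= right; // mathlessthanorequalto
        }
    }
}
```

For the condition shown above, a message whose Country resolves to "Norway" would give ConditionSketch.Evaluate("stringequals", "Norway", "Norway"), which is true, so the step would run.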
Transformer
A JSON transformer task is an inbuilt task in the system. The task transforms one JSON document into another using JUST. Some of the transformations in the system could be quite complex. Hence, I decided to add transformers as a separate feature instead of adding them to the task properties.
Using the Code
The FloatingBridge assembly is shipped with the installation package. It is also available as a NuGet package.
Install-Package FloatingBridge -Version 1.0.0
A new instance of the FloatingBridgeClient can be initialized in the following way:
FloatingBridgeClient client = new FloatingBridgeClient(connectionString);
Publish a Message
A message can be published by using the API and calling the PublishMessage method.
User user = new User() { Country = "Norway", Language = "Norsk", Name = "Ola" };
client.PublishMessage("UserAppDomain", "UserMessage", "UserPublisher",
"123",JsonConvert.SerializeObject(user), null);
Pull a Subscribed Message
A message can be pulled by using the API and calling the GetSubscribedMessages method.
var messages = client.GetSubscribedMessages("UserAppDomain",
"UserMessage", "UserSubscriber", "123");
foreach (Message message in messages)
{
Console.WriteLine(JsonConvert.SerializeObject(message));
}
The result is:
{
"ID":"71ca9177-19d0-4aba-bdb0-6d8e414114d4",
"Body":{"Name":"Ola","Country":"Norway","Language":"Norsk"},
"CustomProperties":{"PublisherIdentity":"UserPublisher"},
"Timestamp":"2017-08-09T14:44:41",
"UniqueID":"46cd3ec4-7d11-11e7-b4fe-a08cfd1a3662",
"MessageTypeID":1,
"ConditionExpression":{"Operator":"stringequals",
"Evaluated":"Norway","Evaluator":"#valueof($.Body.Country)"}
}
Publish a Workflow
client.PublishWorkflow("UserAppDomain", "UserWorkflow", "UserPublisher",
"123", "{\"Country\":\"Sweden\"}", null);
Run a Workflow Synchronously
The identifier of the step to wait for must be specified. Once this step completes, its result is returned to the caller.
client.RunWorkflow("pinecone", "SimpleWorkflow",
"CreditCard", "abcdefg", "{}", null,"AddApplication");
Re-Start a Workflow Based on Workflow ID
A workflow which is in the waiting (orphan) state can be restarted via the API using the workflow ID.
client.RestartWorkflow("UserPublisher", "123",
"{}", "56eaa328-b8fe-43fb-87db-80b118eb8ee4");
Re-Start Workflows Based on Custom Properties
In the above example, the workflow ID must be known. An external system that uses the API to re-start a workflow might instead want to do so based on some correlated property.
Hence, we also need the ability to re-start workflows based on custom properties.
Here is sample code that uses a custom property name and value to re-start all matching waiting workflows:
client.RestartWorkflowsByProperty("UserPublisher", "123", "LastExecutedTask", "Branch3", "{}");
Use FloatingBridge Core to Create Custom Tasks
The FloatingBridge.Core.dll from the application folder must be referenced. It is also available as a NuGet package.
Install-Package FloatingBridge.Core -Version 1.0.0
A total of 8 interfaces are provided, all of which have the same method, Run, with different arguments and return types. The table below gives an overview of the interfaces.
Interface name | Input parameters | Return type |
IPublisherTask | string taskPropertiesJson | List<TaskMessage> |
ISubscriberTask | TaskMessage input, string taskPropertiesJson | void |
IWorkflowTask | TaskMessage input, string taskPropertiesJson | TaskMessage |
IWorkflowSplitterTask | TaskMessage input, string taskPropertiesJson | SplitTaskMessage |
IIncommingTask | string taskPropertiesJson | TaskMessage |
IOutgoingTask | TaskMessage input, string taskPropertiesJson | void |
IErrorHandlerTask | TaskMessage input, string taskPropertiesJson | TaskMessage |
IErrorNotificationTask | TaskMessage input, string taskPropertiesJson | void |
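As an illustration of the shape of a custom task, here is a hedged sketch of a synchronous workflow step. To keep it self-contained, it declares its own stand-ins for TaskMessage and IWorkflowTask. The Run signature follows the interface overview above, but the MessageBody string property on TaskMessage is an assumption, as are the StampStatusTask class and its "Status" task property. In a real project, the stand-ins would be removed and FloatingBridge.Core referenced instead. System.Text.Json.Nodes (.NET 6 or later) is used here only to avoid package references.

```csharp
using System;
using System.Text.Json.Nodes;

// Stand-in for the type shipped in FloatingBridge.Core; the real members may differ.
public class TaskMessage
{
    public string MessageBody { get; set; } // assumed: the message body as a JSON string
}

// Stand-in for the interface shipped in FloatingBridge.Core.
public interface IWorkflowTask
{
    TaskMessage Run(TaskMessage input, string taskPropertiesJson);
}

// Hypothetical custom task: copies "Status" from the task properties into the body.
public class StampStatusTask : IWorkflowTask
{
    public TaskMessage Run(TaskMessage input, string taskPropertiesJson)
    {
        // Task properties arrive as JSON text, e.g. {"Status":"Reviewed"}.
        JsonObject properties = JsonNode.Parse(taskPropertiesJson).AsObject();
        JsonObject body = JsonNode.Parse(input.MessageBody).AsObject();

        // Fall back to a default when the property is not configured.
        string status = properties["Status"]?.GetValue<string>() ?? "Unknown";
        body["Status"] = status;

        // Return a new message; the workflow service would pass it to the next step.
        return new TaskMessage { MessageBody = body.ToJsonString() };
    }
}
```

The task keeps all of its configuration in the task properties JSON, so the same class can be reused in several workflow steps with different settings.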
In-Built Tasks
The previous section described the various interfaces provided in the Core which help us to develop our own tasks.
However, a few inbuilt tasks are already provided by the system.
IErrorHandlerTask
Namespace - FloatingBridge.Core.Tasks.ErrorHandler
JsonWriter
XmlWriter
FlatFileWriter
JsonTransformer
MessagePublisher
MySqlDataConnector
MySqlDataWriter
Passthrough
RESTConnector
WorkflowPublisher
IErrorNotificationTask
Namespace - FloatingBridge.Core.Tasks.ErrorNotification
JsonWriter - Writes the MessageBody to a JSON file
XmlWriter - Converts the MessageBody into XML and writes to an XML file
FlatFileWriter - Converts the MessageBody into a flat file format and writes to the file
MessagePublisher - Publishes the MessageBody to a FloatingBridge publication
MySqlDataWriter - Executes configured stored procedure or text on configured MySql
RESTConnector - Executes configured REST API
WorkflowPublisher - Publishes the MessageBody as a new FloatingBridge workflow
IIncommingTask
Namespace - FloatingBridge.Core.Tasks.Incomming
JsonReader - Reads from a specific JSON file
XmlReader - Reads from a specific XML file
FlatFileReader - Reads a flat file format from a specific file
MySqlDataConnector - Connects to a configured MySql and reads from SP or text
RESTConnector
MessageSubscriber - Subscribes to a configured FloatingBridge subscription
IOutgoingTask
Namespace - FloatingBridge.Core.Tasks.Outgoing
JsonWriter
XmlWriter
FlatFileWriter
MessagePublisher
MySqlDataWriter
RESTConnector
WorkflowPublisher
IPublisherTask
Namespace - FloatingBridge.Core.Tasks.Publisher
JsonReader
XmlReader
FlatFileReader
MySqlDataConnector
RESTConnector
JsonMultiFileReader - Reads multiple JSON files from a folder
XmlMultiFileReader - Reads multiple XML files from a folder
MultiFlatFileReader - Reads multiple flat files from a folder
ISubscriberTask
Namespace - FloatingBridge.Core.Tasks.Subscriber
JsonWriter
XmlWriter
FlatFileWriter
MySqlDataWriter
RESTConnector
IWorkflowTask
Namespace - FloatingBridge.Core.Tasks.Workflow
JsonWriter
XmlWriter
FlatFileWriter
MessagePublisher
MySqlDataWriter
RESTConnector
WorkflowPublisher
Passthrough - Passes the message to the next task. This is useful when you want to configure multiple conditions for a branch. Each pass-through task shall have one of those required run conditions.
SyncWorkflowRunner - Runs another FloatingBridge workflow synchronously.
DependentWorkflowStarter - Re-starts another FloatingBridge workflow.
JsonTransformer - Transforms the MessageBody using JUST.
MySqlDataConnector
IWorkflowSplitterTask
Namespace - FloatingBridge.Core.Tasks.Workflow
JsonSplitter - Splits the MessageBody using JUST.
MySqlDataRowReader - Connects to the configured MySql and splits the message by each row read.
Task Properties
Each task has its own predefined task properties configuration. Below are examples of how to initialize the in-built task properties. When making your own custom tasks, the corresponding task properties need to be defined.
Task | Task Properties example |
JsonWriter | {"FileName" : "D:/test/out.json"} |
XmlWriter | {"FileName" : "D:/test/out.xml", "RootElementName" : "Root"} |
FlatFileWriter | {"FileName" : "D:/test/out.txt", "FieldDelimiter" : ", ", "RecordDelimiter" : "\r\n"} |
Passthrough | |
JsonMultiFileReader | {"DirectoryName" : "D:/test"} |
XmlMultiFileReader | {"DirectoryName" : "D:/test"} |
MultiFlatFileReader | {"DirectoryName" : "D:/test", "Pattern" : "*.csv"} |
RESTConnector |
{
"Url":"Http://www.yahoo.com",
"Method":"POST",
"Headers":{"header1":"value1","header2":"value2"},
"Body":"#valueof($.MessageBody.RestRequest)"
}
|
MySqlDataWriter |
{
"ConnectionString": "xxxxxxxx",
"CommandText": null,
"StoredProcedureName": "StoreValues",
"StoredProcedureParamaters": [
{"Name":"param1","Value":"Value1"},
{"Name":"param2","Value":"Value2"}
]
}
{
"ConnectionString": "xxxxxxxx",
"CommandText": "#xconcat(UPDATE user SET username = ,#valueof($.value.Window), WHERE id = 4)",
"StoredProcedureName": null,
"StoredProcedureParamaters": null
}
|
MySqlDataConnector | Configuration same as MySqlDataWriter. |
MySqlDataRowReader | Configuration same as MySqlDataWriter. |
JsonReader | {"FileName" : "D:/test/in.json"} |
XmlReader | {"FileName" : "D:/test/in.xml"} |
FlatFileReader |
{
"FileName" : "D:/test/in.txt",
"FieldDelimiter" : [",",";"] ,
"RecordDelimiter" : ["\r\n","\n"],
"HeaderIdentifier":["header","start"],
"FooterIdentifier":["footer","end"]
}
|
JsonTransformer | {"TransformerID" : 2} |
JsonSplitter | {"ArrayPath" : "$.MessageBody.Organisation.Users"} |
MessagePublisher |
{
"ConnectionString":"xxxxxxx",
"ApplicationDomainName":"UserAppDomain",
"MessageTypeName":"UserMessage",
"PublisherIdentity":"UserPublisher",
"PublisherSecret":"123",
"CustomProperties":{"source":"scanner","OS":"windows"}
}
|
MessageSubscriber |
{
"ConnectionString":"xxxxxxx",
"ApplicationDomainName":"UserAppDomain",
"MessageTypeName":"UserMessage",
"SubscriberIdentity":"UserSubscriber",
"SubscriberSecret":"123",
"CustomProperties":{"source":"scanner","OS":"windows"}
}
|
WorkflowPublisher |
{
"ConnectionString":"xxxxxxx",
"ApplicationDomainName":"UserAppDomain",
"WorkflowTypeName":"UserMessage",
"PublisherIdentity":"UserPublisher",
"PublisherSecret":"123",
"CustomProperties":{"source":"scanner","OS":"windows"}
}
|
SyncWorkflowRunner |
{
"ConnectionString":"xxxxxxx",
"ApplicationDomainName":"UserAppDomain",
"WorkflowTypeName":"UserMessage",
"PublisherIdentity":"UserPublisher",
"PublisherSecret":"123",
"CustomProperties":{"source":"scanner","OS":"windows"},
"StepIdentifier" :"AddRolesStep"
}
|
DependentWorkflowStarter |
{
"ConnectionString":"xxxxxxx",
"PublisherIdentity":"UserPublisher",
"PublisherSecret":"123",
"SplitID" :null,
"WorkflowID" :null,
"CustomPropertyName" :"LastExecutedTask",
"CustomPropertyValue" :"Branch3"
}
{
"ConnectionString":"xxxxxxx",
"PublisherIdentity":"UserPublisher",
"PublisherSecret":"123",
"SplitID" :1,
"WorkflowID" :"56eaa328-b8fe-43fb-87db-80b118eb8ee4",
"CustomPropertyName" :null,
"CustomPropertyValue" :null
}
|
History
- First version of FloatingBridge
- Added source code link