Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / database / MySQL

FloatingBridge

5.00/5 (4 votes)
22 Aug 2017CPOL14 min read 13.3K   120  
A Simple Messaging Workflow ETL system on MySQL and .NET

Introduction

FloatingBridge is a simple Messaging\ Workflow\ETL system based on MySql and .NET. MySql is used as the backend engine for the system. The front-end is designed in .NET WPF (Windows Presentation Foundation). The business logic is handled by .NET windows services.

This article gives a brief introduction to this system. The installation files and detailed documentation are provided with this article.

Background

I have worked quite a lot with Microsoft BizTalk Server. I have also worked with some of the other alternative systems. My intention was to make a simple Messaging\ Workflow\ ETL system based on a modern RDS which uses JSON data type instead of XML.

Features

The system comes with a management console which helps you to configure your Messaging\ Workflow system. The management console has the following features:

  • Backend Configuration - All the required scripts to initialize the MySql database are run when the console is launched the first time.
  • Service Configuration - The system comes with 3 kinds of services. They are described in the section below. However, we can configure as many instances of the services as we want. Typically, we would configure one per Application Domain.
  • Messaging System Configuration - This can be configured by configuring the corresponding Message Type, Publishers, Subscribers, Publications & Subscriptions.
  • Workflow Configuration - Configure a workflow which is a sequence of branched tasks with business logic defined in JSON.
  • Manage Messages & Workflows - Old messages can be made redundant (orphan), waiting Workflows can be re-started.
  • History - History of messages & Workflows can be viewed.
  • Transformations - Transformation tasks which transform JSON can be created using JUST - https://www.codeproject.com/Articles/1187172/JUST-JSON-Under-Simple-Transformation.
  • Custom Tasks - The system comes with some well known custom tasks.
  • API - The system comes with an API which can be used by external systems to perform operations on the system like Publish messages, Pull Subscribed messages, Publish Workflows, Run synchronous workflows & Re-start waiting workflows. The Core API can be used to implement your own custom tasks.

System Architecture

The diagram below illustrates the different components of the system.

Image 1

Database Engine

The database engine is the central component of the system. MySql database is used for the engine. The engine stores the configuration & tracking data of the system.

The following configuration data is stored:

  1. Message configuration - message type, its application domain
  2. Workflow configuration - workflow type, application domain and business logic
  3. Publishers which can publish messages and workflows
  4. Subscribers which can subscribe to messages
  5. Tasks which can either be part of a workflow, listener or subscription

In addition, all the historical data associated with a message and workflow is stored for tracking purposes.

Subscriber Service

This is a .NET Windows service which polls the database for subscription tasks. A subscription task simply reads a published message from the database and processes it. There are inbuilt subscription tasks provided by the system which are described in the last section.

Listener Service

This is also a .NET windows service which runs tasks associated with listeners. A listener is a task which reads data from an external source and publishes a message to the database based on the data gathered from the external source.

Workflow Service

This is the .NET Windows service which runs the various tasks associated with workflows in an order defined by the business logic of the workflow. It performs the following tasks:

  • Reads workflow messages from the database and performs the next action based on the business logic and the current state.
  • Runs the next synchronous task and saves the state to the engine.
  • Runs the next outgoing task and waits for the incoming task.
  • Retries the task based upon the retry count.
  • Runs the error handler or error notification task in case of failures.

Management Console

This is an application written in WPF (Windows Presentation Foundation) that enables an end user to manage the configuration & tracking of messages and workflows.

FloatingBridge API

The FloatingBridge API is a simple DLL which can be referenced in an external project. It is located in the installation folder as well as available as a nuget package. By referencing the API, an external entity or system can perform the following tasks:

  • Publish a message
  • Pull subscribed messages
  • Publish a workflow
  • Run a synschronous workflow
  • Restart a waiting workflow

FloatingBridge Terminology

It is important to understand the terminology used in FloatingBridge, as the management console is solely based upon it and various actions around it.

  • Application Domain ​- A logical grouping for an application which has message types & workflows associated it. Each service runs under an application domain.
  • Service ​- A .NET Windows service (Listener, Subscriber or Workflow) which runs under an application domain. A service requires two input parameters:
    1. Application Domain
    2. connection string to the MySql database engine
  • Message Type​ - A type that defines a message. Publishers can publish this type of message & subscribers call pull this type of message.
  • Publisher ​- A client has the license to publish messages to the system.
  • Subscriber ​- A client that has the license to subscribe to messages in the system.
  • Task ​- A function which is run by the system. A task can be publishing, subscribing or a workflow task.
  • Listener ​- This is run by the listener service periodically to listen to messages which need to be published. Exampel, a task which reads files from a directory and publishes a message as soon as files are found based upon the content of the files.
  • Publication ​- This configuration decides whether a publisher or a listener can publish a specific message type.
  • Subscription​ - This configuration decides whether a Subscriber or a task can pull messages out of the system.
  • Active Message​ - A message which has been published to the system and is waiting to be pulled.
  • Orphaned Message​ - An active message can be set to orphan in the management console. Once set to orphan, this message cannot be pulled.
  • Workflow (Workflow Type)​ - A sequence of steps(tasks) to execute. The steps are defined in a JSON format which is called as business logic. A workflow is associated with an application domain. The workflow service executes the steps in the workflow.
  • Orphaned workflow​ - A workflow which is waiting either for a manual external input or an incoming task.

As mentioned above, ​Task ​is a function run by the system. There are 8 different kinds of tasks. They are simple C# classes which need to implement one of the 8 different interfaces in the FloatinBridge.Core assembly.
Each of the 8 different tasks accomplish a different purpose:

  • Publisher Task ​- A task which is run by the listener service. The task returns a message which is published into the system.
  • Subscriber Task​ - A task which is run by the subscriber service. The service pulls all the subscribed messages to the task and passes them to the task.
  • Workflow Task​ - A synchronous workflow step run by the workflow service. The service sends a message to the task and the task returns a message back to the service.
  • Workflow Splitter Task ​- Similar to the above, the only difference being that in this case, the task returns multiple messages back to the service. This is used when we want to split the workflow into multiple workflows. When the workflow is split into multiple workflows, something called ​SplitID ​is assigned to each workflow. This ID determines the individual workflow once it is split off from the original workflow.
  • Outgoing Task ​- An asynchronous workflow step. The workflow goes into a waiting state after this task is executed. The workflow needs to be resumed either manually by the API or an incoming task.
  • Incoming Task​ - An asynchronous workflow step which resumes a waiting workflow.
  • Error Handler Task ​- This is a compensating task which is associated with a workflow step and is executed when the step fails. Once this task is executed, the workflow resumes with the output of this task.
  • Error Notification Task​ - Similar to the above task except that the workflow does not resume after its completion. The workflow has to be resumed manually by the API.

Installation & Configuration

The installation procedure is a normal installation procedure where you click on the MSI and choose an application folder on your system. The installer just copies all the artifacts to the application folder.

1202556/Screen3.png

The Messaging\ Workflow system can be configured using the various options on the menu and choosing the correct artifact. I am not attaching screen shots of each configuration option here in this article.

The screen-shot below shows the various options available to configure the artifacts. The right-hand panel shows the message history.

1202556/Screen1.png

Configured Workflow

1202556/Screen2.png

Workflow Business Logic

The flow and functionality of the workflow is defined by the business logic. The business logic is JSON object. Below is the JSON representation of the configured workflow in the above section.

C#
{ 
   "Start":{ 
      "Next":[ 
         "Branch1",
         "Branch2",
         "Branch3"
      ],
      "TaskID":41,
      "OnFailure":null,
      "Identifier":"FirstTask",
      "TaskRetries":0,
      "RunCondition":null,
      "TaskProperties":null,
      "IncommingTaskID":0,
      "IncommingTaskProperties":null
   },
   "Tasks":[ 
      { 
         "Next":[ 
            "Final"
         ],
         "TaskID":23,
         "OnFailure":"NotifierTask",
         "Identifier":"Branch1",
         "TaskRetries":5,
         "RunCondition":{ 
            "Operator":"stringequals",
            "Evaluated":"Norway",
            "Evaluator":"#valueof($.MessageBody.Country)"
         },
         "TaskProperties":{ 
            "FileName":"D:/Test/Branch1.json"
         },
         "IncommingTaskID":31,
         "IncommingTaskProperties":{ 
            "FileName":"D:/Test/Branch1In.json"
         }
      },
      { 
         "Next":[ 
            "Final"
         ],
         "TaskID":23,
         "OnFailure":null,
         "Identifier":"Branch3",
         "TaskRetries":-1,
         "RunCondition":{ 
            "Operator":"stringequals",
            "Evaluated":"Denmark",
            "Evaluator":"#valueof($.MessageBody.Country)"
         },
         "TaskProperties":{ 
            "FileName":"D:/Test/branch3.json"
         },
         "IncommingTaskID":0,
         "IncommingTaskProperties":null
      },
      { 
         "Next":[ 
            "Final"
         ],
         "TaskID":35,
         "OnFailure":null,
         "Identifier":"Branch2",
         "TaskRetries":-1,
         "RunCondition":{ 
            "Operator":"stringequals",
            "Evaluated":"Sweden",
            "Evaluator":"#valueof($.MessageBody.Country)"
         },
         "TaskProperties":{ 
            "FileName":"D:/Test/Branch2.json"
         },
         "IncommingTaskID":0,
         "IncommingTaskProperties":null
      },
      { 
         "Next":null,
         "TaskID":41,
         "OnFailure":null,
         "Identifier":"Final",
         "TaskRetries":-1,
         "RunCondition":null,
         "TaskProperties":null,
         "IncommingTaskID":0,
         "IncommingTaskProperties":null
      }
   ],
   "OnFailure":null,
   "TaskRetries":0
}

The above JSON might look scary at first glance but it is configured using the management Console.

The business logic JSON contains the following properties:

  • Start ​- The first step or starting point of the workflow. This is a Task JSON object.
  • Tasks ​- An array of all other steps in the workflow. A JSON array containing Task JSON objects.
  • TaskRetries ​- A global setting for the workflow stating number of times a step should be retried before failing. This is overridden by the individual step setting.
  • OnFailure ​- A global setting for the workflow stating the ErrorHandler or ErrorNotification task step to be executed in case of failure. This is overridden by the individual step setting.

Task

A task represents an individual step in the workflow. It has the following properties:

  • Identifier ​- A simple string identifying the task in the workflow. Needs to be unique for the workflow.
  • TaskID ​- This is the actual ID of the task in the database. This is set automatically when you create the task. It can be modified.
  • OnFailure ​- If defined, overrides the corresponding property of the workflow.
  • TaskRetries ​- If defined, overrides the corresponding property of the workflow.
  • Next ​- A string array containing a list of identifiers of the task which needs to be executed after this task.
  • RunCondition ​- A JSON object which defines the condition under which this task shall be executed. This is used when we want to branch a workflow based on different conditions.
  • TaskProperties ​- A JSON object containing properties for the execution of the step. Example, for a JSON writer, it will contain the name of the file to write to.
  • IncommingTaskID ​- Identifier of the incoming task step if this step is a one-way outgoing step.
  • IncommingTaskProperties ​- Corresponding JSON task properties for this incoming task.

Task Properties

Task properties is a JSON object which defined the properties for the execution of a task. Each task has its own JSON schema. Example, for a JSON writer task, the task properties look like this:

C#
{ 
   "FileName":"D:/Test/Out.json"
}

The task properties can be made dynamic by using ​JUST - JSON Under Simple Transformation. https://www.codeproject.com/Articles/1187172/JUST-JSON-Under-Simple-Transformation

Example, we want to set the “FileName”, based upon a value inside the message body.

C#
{ 
   "FileName":"#valueof($.MessageBody.FileName)"
}

Condition

This is what determines if a step shall be run or not. This is a JSON object having these 3 properties:

  • Evaluator ​- An expression which is checked with the evaluated.
  • Evaluated ​- An expression against which the evaluator is checked.
  • Operator ​- The operator for evaluation. The following operators are supported:
    • stringequals
    • stringcontains
    • mathequals
    • mathgreaterthan
    • mathlessthan
    • mathgreaterthanorequalto
    • Mathlessthanorequalto

All of the above 3 properties can be made dynamic using ​JUST​.

C#
{ 
   "Operator":"stringequals",
   "Evaluated":"Norway",
   "Evaluator":"#valueof($.MessageBody.Country)"
}

Transformer

A JSON transformer task is an inbuilt task in the system. The task transforms a JSON into another JSON using JUST. Some of the transformations in the system could be quite complex. Hence, I decided to add transformers as a separate feature instead of adding it to the task properties.

Using the Code

The FloatingBridge assembly is shipped with the installation package. It is also available as Nuget.

C#
Install-Package FloatingBridge -Version 1.0.0  

A new instance of the FloatingBridgeClient can be initialized in the following way:

C#
FloatingBridgeClient client = new FloatingBridgeClient(connectionString)

Publish a Message

A message can be published by using the API and calling the ​PublishMessage ​method.

C#
User user = new User() {  Country = "Norway",  Language = "Norsk",  Name = "Ola"  }; 
client.PublishMessage("UserAppDomain", "UserMessage", "UserPublisher", 
"123",JsonConvert.SerializeObject(user), null);  

Pull a Subscribed Message

A Message can be pulled by using the API and calling the ​Getsubscribedmessages ​method.

C#
var messages = client.GetSubscribedMessages("UserAppDomain", 
"UserMessage", "UserSubscriber", "123");   
foreach (Message message in messages) 
{            
	 Console.WriteLine(JsonConvert.SerializeObject(message));
}  

The result is:

C#
{   
	"ID":"71ca9177-19d0-4aba-bdb0-6d8e414114d4",  
	"Body":{"Name":"Ola","Country":"Norway","Language":"Norsk"}, 
	"CustomProperties":{"PublisherIdentity":"UserPublisher"},  
	"Timestamp":"2017-08-09T14:44:41",
	"UniqueID":"46cd3ec4-7d11-11e7-b4fe-a08cfd1a3662",  
	"MessageTypeID":1, 
	"ConditionExpression":{"Operator":"stringequals",
	"Evaluated":"Norway","Evaluator":"#valueof($. Body.Country)"}
} 

Publish a Workflow

C#
client.PublishWorkflow("UserAppDomain", "UserWorkflow", "UserPublisher", 
"123", "{\"Country\":\"Sweden\"}", null);

Run a Workflow Synchronously

The step identifier of the step that it waits for to be completed needs to be specified. Once this step is completed, the result from this step is returned back.

C#
client.RunWorkflow("pinecone", "SimpleWorkflow", 
"CreditCard", "abcdefg", "{}", null,"​AddApplication​");

Re-Start a Workflow Based on Workflow ID

A workflow which is in the waiting (orphan) state can be restarted via. the API using the workflow ID.

C#
client.RestartWorkflow("UserPublisher", "123", 
"{}", "56eaa328-b8fe-43fb-87db-80b118eb8ee4");

Re-Start Workflows Based on Custom Properties

In the above example, it is important to know the workflow ID of the workflow. If an external system wants to use the API to re-start the workflow, then the system might want to do it based on some correlated property.

Hence, we also need the ability to be able to re-start workflows based on custom properties.

Here is a sample code that uses a custom property name and value to start all waiting workflows:

C#
client.RestartWorkflowsByProperty("UserPublisher", "123", "LastExecutedTask", "Branch3", "{}");

Use FloatingBridge Core to Create Custom Tasks

The FloatingBridge.Core.dll from the application folder must be referenced. Nuget is also available.

C#
Install-Package FloatingBridge.Core -Version 1.0.0

A total of 8 interfaces are provided, all of which have the same method ​Run ​with different arguments and return types. The table below gives an overview of the interfaces.

Interface name Input parameters Return type
IPublisherTask
  • string taskPropertiesJson
List<TaskMessage>
ISubscriberTask
  • TaskMessage input
  • string taskPropertiesJson
void
IWorkflowTask
  • TaskMessage input
  • string taskPropertiesJson
TaskMessage
IWorkflowSplitterTask
  • TaskMessage input
  • string taskPropertiesJson
SplitTaskMessage
IIncommingTask
  • string taskPropertiesJson
TaskMessage
IOutgoingTask
  • TaskMessage input
  • string taskPropertiesJson
void
IErrorHandlerTask
  • TaskMessage input
  • string taskPropertiesJson
TaskMessage
IErrorNotificationTask
  • TaskMessage input
  • string taskPropertiesJson
void

In-Built Tasks

The previous section described the various interfaces provided in the Core which helps us to develop our own tasks.

However, there are a few inbuilt tasks already provided by the system.

IErrorHandlerTask

Namespace - FloatingBridge.Core.Tasks.ErrorHandler

  • JsonWriter
  • XmlWriter
  • FlatFileWriter
  • JsonTransformer
  • MessagePublisher
  • MySqlDataConnector
  • MySqlDataWriter
  • Passthrough
  • RESTConnector
  • WorkflowPublisher

IErrorNotificationTask

Namespace - FloatingBridge.Core.Tasks.ErrorNotification

  • JsonWriter - Writes the MessageBody to a JSON file
  • XmlWriter - Converts the MessageBody into XML and writes to an XML file
  • FlatFileWriter - Converts the MessageBody into a flat file format and writes to the file
  • MessagePublisher - Publishes the MessageBody to a FloatingBridge publication
  • MySqlDataWriter - Executes configured stored procedure or text on configured MySql
  • RESTConnector - Executes configured REST API
  • WorkflowPublisher - Publishes the MessageBody as a new FloatingBridge workflow

IIncommingTask

Namespace - FloatingBridge.Core.Tasks.Incomming

  • JsonReader - Reads from a specific JSON file
  • XmlReader - Reads from a specific XML file
  • FlatFileReader - Reads a flat file format from a specific file
  • MySqlDataConnector - Connects to a configured MySql and reads from SP or text
  • RESTConnector
  • MessageSubscriber - Subscribes to a configured FloatingBridge subscription

IOutgoingTask

Namespace - FloatingBridge.Core.Tasks.Outgoing

  • JsonWriter
  • XmlWriter
  • FlatFileWriter
  • MessagePublisher
  • MySqlDataWriter
  • RESTConnector
  • WorkflowPublisher

IPublisherTask

Namespace - FloatingBridge.Core.Tasks.Publisher

  • JsonReader
  • XmlReader
  • FlatFileReader
  • MySqlDataConnector
  • RESTConnector
  • JsonMultiFileReader - Reads multiple JSON files from a folder
  • XmlMultiFileReader - Reads multiple XML files from a folder
  • MultiFlatFileReader - Reads multiple flat files from a folder

ISubscriberTask

Namespace - FloatingBridge.Core.Tasks.Subscriber

  • JsonWriter
  • XmlWriter
  • FlatFileWriter
  • MySqlDataWriter
  • RESTConnector

IWorkflowTask

Namespace - FloatingBridge.Core.Tasks.Workflow

  • JsonWriter
  • XmlWriter
  • FlatFileWriter
  • MessagePublisher
  • MySqlDataWriter
  • RESTConnector
  • WorkflowPublisher
  • Passthrough - Passes the message to the next task. This is useful when you want to configure multiple conditions for a branch. Each pass through task shall have one of those required run conditions.
  • SyncWorkflowRunner - Runs another FloatingBridge workflow synchronously.
  • DependentWorkflowStarter - Re-starts another FloatingBridge workflow.
  • JsonTransformer - Transforms the MessageBody using JUST.
  • MySqlDataConnector

IWorkflowSplitterTask

Namespace - FloatingBridge.Core.Tasks.Workflow

  • JsonSplitter - Splits the MessageBody using JUST.
  • MySqlDataRowReader - Connects to the configured MySql and splits the message by each row read.

Task Properties

Each task has its own predefined task properties configuration. Below are examples how to initialize the in-built task properties. While making you own custom tasks, the corresponding task properties need to be defined.

Task Task Properties example
JsonWriter {“FileName” : “D:/test/out.json”}
XmlWriter {“FileName” : “D:/test/out.xml”, ”RootElementName” : “Root”}
FlatFileWriter {“FileName” : “D:/test/out.txt”,<br> ”FieldDelimiter” : “, ” , ”RecordDelimiter” : “\r\n”}
Passthrough  
JsonMultiFileReader {“DirectoryName” : “D:/test”}
XmlMultiFileReader {“DirectoryName” : “D:/test”}
MultiFlatFileReader {“DirectoryName” : “D:/test”, “Pattern” : “*.csv”}
RESTConnector
C#
{
	"Url":"Http://www.yahoo.com", 
	"Method":"POST",
	"Headers":{"header1":"value1","header2":"value2"}, 
    "Body":"#valueof($.MessageBody.RestRequest)"
}
MySqlDataWriter
C#
{ // Example for stored procedure
	"ConnectionString": "xxxxxxxx",
	 "CommandText": null,
	 "StoredProcedureName": "StoreValues",
	 "StoredProcedureParamaters":   [
	  {"Name":"param1","Value":"Value1"},
	  {"Name":"param2","Value":"Value2"}
	 ]
	}

	{ // Example for text
	 "ConnectionString": "xxxxxxxx",
	 "CommandText": "#xconcat(UPDATE user SET username = 
                    ,#valueof($.value.Window), WHERE id = 4)",
	 "StoredProcedureName": null,
	 "StoredProcedureParamaters":   null
	}
MySqlDataConnector Configuration same as MySqlDataWriter.
MySqlDataRowReader Configuration same as MySqlDataWriter.
JsonReader {“FileName” : “D:/test/in.json”}
XmlReader {“FileName” : “D:/test/in.xml”}
FlatFileReader
C#
{
	 "FileName" : "D:/test/in.txt", 
	 "FieldDelimiter" : [",",";"] , 
	 "RecordDelimiter" : ["\r\n","\n"],
	 "HeaderIdentifier":["header","start"], //All records 
	                    //containing these strings are considered header
	 "FooterIdentifier":["footer","end"] // All records 
	                    //containing these strings are considered footers
						}
JsonTransformer {“TransformerID” : 2}
JsonSplitter {“ArrayPath” : “$.MessageBody.Organisation.Users”}
MessagePublisher
C#
{
	"ConnectionString":"xxxxxxx", 
	"ApplicationDomainName":"UserAppDomain",
	"MessageTypeName":"UserMessage",
	"PublisherIdentity":"UserPubliser",
	"PublisherSecret":"123",
	"CustomProperties":{"source":"scanner","OS":"windows"}
}
MessageSubscriber
C#
{
	"ConnectionString":"xxxxxxx", 
	"ApplicationDomainName":"UserAppDomain",
	"MessageTypeName":"UserMessage",
	"SubscriberIdentity":"UserPubliser",
	"SubscriberSecret":"123",
	"CustomProperties":{"source":"scanner","OS":"windows"}
}
WorkflowPublisher
C#
{
	"ConnectionString":"xxxxxxx", 
	"ApplicationDomainName":"UserAppDomain",
	"WorkflowTypeName":"UserMessage",
	"PublisherIdentity":"UserPubliser",
	"PublisherSecret":"123",
	"CustomProperties":{"source":"scanner","OS":"windows"}
}
SyncWorkflowRunner
C#
{
	"ConnectionString":"xxxxxxx", 
	"ApplicationDomainName":"UserAppDomain",
	"WorkflowTypeName":"UserMessage",
	"PublisherIdentity":"UserPubliser",
	"PublisherSecret":"123",
	"CustomProperties":{"source":"scanner","OS":"windows"},
	"StepIdentifier" :"AddRolesStep"
}
DependentWorkflowStarter
C#
{ //RestartWorkflowsByProperty
	"ConnectionString":"xxxxxxx", 
	"PublisherIdentity":"UserPubliser",
	"PublisherSecret":"123",
	"SplitID" :null,
	"WorkflowID" :null,
	"CustomPropertyName" :"LastExecutedTask",
	"CustomPropertyValue" :"Branch3"
	}
						 
	{ //RestartWorkflow
	"ConnectionString":"xxxxxxx", 
	"PublisherIdentity":"UserPubliser",
	"PublisherSecret":"123",
	"SplitID" :1,
	"WorkflowID" :"56eaa328-b8fe-43fb-87db-80b118eb8ee4",
	"CustomPropertyName" :null,
	"CustomPropertyValue" :null
}

History

  • First version of FloatingBridge
  • Added source code link

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)