Contents
Tasks (operations) flow control plays an important role in many software projects. Its reliable and effective implementation may be vital for project success. Several years ago, I already presented an article with a similar scope entitled Tiny Framework for Parallel Computing. Although the main approach of that work remained unchanged, some concepts are evolving, and design is remarkably different. Implementation of major types was simplified, and additional important and useful features were added to the infrastructure. My intention was to make this article self-sustained and independent on its predecessor, so some concepts are reiterated. The subject of this framework is control of processes consisting of separate operations performing either sequentially or in parallel. Examples of such processes can be machine control, complex software testing, financial flows, etc. Code is written in C# and targeted to .NET Core, but can be easily ported to other languages. It was tested in Windows and Linux (Ubuntu 18.04).
The two main entities of the presented framework are command and processor (that's why I called it Processor-Command Framework, or in short PCF). Command constitutes any class that implements ICommand
interface. The main method of this interface is method Do()
. This method performs useful activity of the command. Processor constitutes types responsible for commands proper queuing and execution of their Do()
method. The methods may be called either sequentially or in parallel according to the way they were queued. Processor performs all required actions for Do()
execution, like error handling, logging, etc.
The main idea of this framework is clear and strict separation between command and processor. Such an approach has several advantages, both technical and organizational. Those advantages are:
- High flexibility in commands implementation. Assemblies (DLLs) containing commands can be loaded at runtime.
- Processor related code is relatively stable. Being written once, it will seldom be changed, if at all. This will considerably reduce amount of testing.
- While processor development requires multi-threading and synchronization, writing of command code is generally (however not always!) simpler, but requires more knowledge in activity domain. So the commands may be developed in many cases by technical support and QA staff.
- The framework is applicable to tasks flow in various fields. Its activity domain is defined by its set of commands, generally loaded dynamically at runtime.
Relationships between main elements of the framework are depicted below:
The following table shows infrastructure components and their brief description:
Component | Description |
AsyncLockLib | Synchronization types for async -await methods |
CommandLib | Command and State related interfaces and types |
CommandsAssemblyLoaderLib | Dynamic commands DLL loader |
ConfigurationLib | Application configuration support |
LogInterfaceLib | ILog interface |
LogLib | Logger implementation based on NLog |
ProcessorLib | Processor and queues related types |
SerializationLib | Objects serialization support |
SignalRBaseHubClientLib | Base class for SignalR hub clients |
SignalRBaseHubServerLib | Base class for SignalR hub providing object streaming |
StreamingDataProviderLib | Interfaces and base class for streaming data provider |
TcpHelperLib | Component for TCP communication |
In the scope of this framework, every action is a command. It can be domain related procedure like e.g. some measurements or interaction with data storage. Or command may call remote service. This can be performed either in blocking mode when remote method call gets service's response, or asynchronically, when the response will be received later on by another object. The asynchronous call will be discussed below in chapter State. Command can also switch some operation mode. It can also create a new command or even create a new processor. Logging in this framework also operates as sequential commands (please see details below).
All commands derived from abstract
base class Command
implementing interface ICommand
with its main method Do()
described above. To be executed, command first should be enqueued by appropriate processor. For the sake of flexibility, commands may be placed in a separate assemblies loaded dynamically at runtime. Such a loading is carried out with CommandsAssemblyLoader
class of CommandsAssemblyLoaderLib
infrastructure component. This class also provides public
method CreateCommand()
implementing interface ICommandFactory
to create command from the uploaded assembly.
When commands containing assembly are loaded dynamically, constructor of a contained command is not available in caller code. So command may be created in three different ways. The first way is usage of reflection with Activator.CreateInstance()
method. This approach does not require additional code but is very slow and therefore does not suit for often created commands. The second way is to use Expression.Lambda()
method approach implemented in static
class CommandCreator
of CommandLib
infrastructure component. This is approach is much faster as compared to direct reflection usage, but still less efficient with respect to constructor call. So in case when the command type needs to be often instantiated, it would be useful to implement class CommandFactory : ICommandFactory
which in its method CreateCommand()
calls constructor of command required. Type CommandsAssemblyLoader
for command creation works as follows. First, in its method Load()
after loading commands assembly, it tries to find there command factory implementing interface ICommandFactory
. If such type is found, then it is instantiated with reflection once and is kept in the instance of CommandsAssemblyLoader
. On command creation, method CreateCommand()
tries to use this command factory (if found). If the command factory was not found or failed to create required command, then the Expression Lambda technique is used.
ProcessorLib
component is responsible for running commands. It provides environment for commands queueing and execution. Commands are queued with Processor
and ProcessorQueue
types according to their priorities. Processor.ProcessAsync()
executes Command.Do()
methods:
private Task<ParallelLoopResult> ProcessAsync(params ICommand[] commands)
{
if (commands == null)
return null;
return Task.Run(() =>
{
return Parallel.ForEach(commands, command =>
{
if (command == null || command.IsProcessed || command.Err != null)
return;
var commandDescription =
$"COMMAND Type: \"{command.GetType()}\",
Id:\"{command.Id}\", Priority: {command.Priority} ";
_log?.Debug($"{_logPrefix}{commandDescription} - BEGIN.");
try
{
command.Do();
command.IsProcessed = true;
_log?.Debug($"{_logPrefix}{commandDescription} - END.");
}
catch (Exception e)
{
command.Err = new Error { Ex = e };
_log?.Error($"{_logPrefix}{commandDescription} produced the following exception: ",e);
try
{
_actionOnException?.Invoke(command, e);
}
catch (Exception ex)
{
var msg = $"{_logPrefix}Exception in exception handler for
command \"{command.Id}\".";
_log?.Fatal(msg, ex);
throw new Exception(msg, ex);
}
}
finally
{
if (command.IsProcessed)
{
var now = DateTime.Now;
command.ProcessingLag = now - command.TimeStamp;
command.TimeStamp = now;
SetMaxProcessingLag(command.GetType(), command.ProcessingLag);
}
_log?.Debug($"{_logPrefix}{command}");
}
});
});
}
In its constructor, Processor
class gets logger implementation (if required) to avoid circular references with LogLib
component, and optionally action on exception which may happen while executing Command.Do()
method. Processor
's methods:
public void Enqueue(params ICommand[] commands)
and:
public void EnqueueParallel(params ICommand[] commands)
allow caller to enqueue array of commands either sequentially or in parallel.
Note: Commands enqueued for sequential execution by the same Enqueue()
method will be executed according to their individual priorities. However, commands enqueued for parallel execution by the same EnqueueParallel()
method will be executed according to their highest priority.
Class State
belongs to CommandLib
component. As it is clear from its name, it maintains the state of the system. This is singleton with protected multi-threaded access properties dictionary.
private static readonly ConcurrentDictionary<string, object> _cdctProperties =
new ConcurrentDictionary<string, object>();
Command takes data from State
for processing and puts some of its output there to be used in upcoming commands. State
also contains handlers for receiving and processing asynchronous messages (events) and streaming data from services. These handler objects are created by commands establishing connection to the services. Upon receiving asynchronous messages from services, they normally enqueue additional commands for processing those messages. This will be shown in our software sample discussion below.
ConfigurationLib
infrastructure component provides support for configuration reading from a dedicated JSON file. Class Configuration
takes path to this file as an argument of its constructor and provides handy methods for reading data from the file. By default, JSON configuration file is named according to the following pattern: <Application name>.config.json.
SerializationLib
infrastructure component provides methods for binary serialization using Stream
(static
class SerializationBin
) and collection of useful extension methods for byte and JSON serialization.
Logging is presented with two components, namely LogInterfaceLib
and LogLib
. The former defines interface ILog
whereas the latter provides its implementation with type Log : ILog
. This implementation is based on well known NLog product and uses appropriate configuration file <Application name>nlog.config. The logger internally uses Processor-Command paradigm with its dedicated processor and command class LogCommand : Command
.
It is important to provide convenient-to-use components for communication between commands and services. Presented infrastructure supports out-of-the-box communication via TCP sockets and SignalR technique.
Component TcpHelperLib
provides infrastructure for TCP sockets communication including possibility of remote method call and data streaming. It is also configurable with JSON using Configuration
type discussed above. Main public type for server and client alike is TcpHelper
. This TCP communication mechanism implemented in the component described in details in my Code Project article, TCP Socket Off-the-shelf - Revisited with Async-Await and .NET Core.
SignalR
is a library allowing user to organize duplex communication with either WebSockets (preferred when available) or HTTP long polling. Component SignalRBaseHubServerLib
provides SignalR server infrastructure whereas component SignalRBaseHubClientLib
stands for the client side. SignalR communication technique used here is described in details (with very minor changes) in my Code Project article, Simple SignalR Data Streaming Infrastructure Out-of-the-Box.
Both communication techniques used in this work and discussed above provide user with infrastructure for data streaming. Streaming here means procedure when client registers with server and subscribes for data sent by the server asynchronously. Server has data provider(s) responsible for generation of data. As soon as new piece of data has been generated, data provider sends it to all client subscribed for these data. Infrastructure component StreamingDataProviderLib
provides generic base class StreamingDataProvider<T>
for user streaming data providers. This class is parameterized with data type that will be streamed. Invocation of setter StreamingDataProvider<T>.Current
triggers sending new data to subscribers. During this procedure, subscribers are checked for their validity, and non-valid ones are purged from the subscribers list.
Since in most of the places, multi-threading is based on async
-await
paradigm, the adequate synchronization mechanism is required. Infrastructure component AsyncLockLib
provides appropriate synchronization classes AsyncLock
from here and AsyncAutoResetEvent
from this source.
MachineLib
component is not part of the framework infrastructure. But it's the only type Machine
provides some useful (although not mandatory) superstructure above Processor
. Constructor of Machine
in its current implementation loads basic configuration, creates main processor and enqueues the first command.
Folder Tests contains two console test applications. Application CommanderTest
includes three command classes. Its Main()
method enqueues with a processor TestCommand
s with different priorities in variety of combinations. Commands SuspendCommand
and ResumeCommand
are used to suspend and then resume processor functioning. Please note that suspending command is executed by the main processor. Method Do()
of SuspendCommand
creates a new processor to execute ResumeCommand
later and puts this processor to State
. In order to resume main processor, ResumeCommand
is enqueued with the resume processor. The command takes main processor from State
and executes its method Resume()
. Of course, in this simple case, we could execute resume
method of the main processor directly without additional processor and command, but in more complex cases, usage of additional processor and command may be more appropriate. It is interesting to analyze sequence of commands execution and then play with different combinations of commands queueing. All commands are enqueued almost immediately since methods Enqueue()
and EnqueueParallel()
of class Processor
call in their turn method Enqueue()
of internal class ProcessorQueue
. The latter enqueues commands, calls method Run()
, executing in a thread of a thread pool, and immediately returns. However, the command of the highest priority enqueued with the first call of Processor.Enqueue()
method, will be executed first (in the test this is command S-12
). The rest of the commands will be executed according their priorities and enqueueing sequence. Expected output of the test is provided at the end of file ProgramCommanderTest.cs as a comment.
The second test is MainApp
application. It works along with TcpSvc
and SignalRSvc
services. Its method Main()
creates instance of Machine
type. Constructor of class Machine
reads JSON configuration file MainApp.config.json. From Machines
array, it chooses machine with given "id": "Machine1"
. According to configuration, the constructor loads from directory $(OutputDir)/CommandLibs two external commands assemblies (DLLs) GeneralCommandsLib.dll and StreamHandlersCommandsLib.dll, and creates MainProcessor
. Finally Machine
constructor enqueues InitCommand
with MainProcessor
.
Note: Dynamically loaded assemblies may have references to some other assemblies, particularly, infrastructure ones. So it is recommended that all infrastructure assemblies should be referenced by main application.
Method Do()
of InitCommand
enqueues commands CreateTcpClientCommand
and CreateSignalRClientCommand
in parallel with MainProcessor
. Each of them creates appropriate communication client object to be connected with TcpSvc
and SignalRSvc
services. Methods Do()
of both commands call their respective asynchronous methods async void ConnectAsync()
and immediately return. This way, lengthy connection establishing procedure with possible numerous retries does not block MainProcessor
execution flow (actually because of such commands design it is not important whether enqueue them sequentially or in parallel). After connections to services were established, appropriate client objects are placed in State
.
TCP and SignalR connection objects being inherently different, from user perspective, operate in much the same way. They provide possibility for duplex remote method calls and enable data streaming from service to client. To emphasize this similarity, both TcpSvc
and SignalRSvc
services use the same data provider component DtoProviderLib
. It streams Dto
object defined in ModelLib
component. Both TCP and SignalR communications are configurable in files <Application name>.config.json. In case of TCP, constructor of TcpHelper
class reads appropriate section of configuration file in server and client. For SignalR constructor of infrastructure class HubClient
reads configuration section for client, and service which creates hub (in our case, this is SignalRSvc
) reads configuration section for server. SignalR configuration provides boolean parameter "isSecureProtocol"
which defines whether HTTP ("isSecureProtocol": false
) or HTTPS ("isSecureProtocol": true
) connection will be created.
Source code may be loaded to Visual Studio 2017 with PCF.sln solution and built. Output directory for MainApp
will be $(OutputDir) = $(SolutionDir)/_bin/netcoreapp<version>. Assemblies that should be loaded at runtime will be placed according to configuration provided in file MainApp.config.json to directory $(OutputDir)/CommandLibs. After successful build, you may run services TcpSvc.dll, SignalRSvc.dll and application MainApp.dll either from Visual Studio or using console from their respective output directories with standard command dotnet. According to their NLog configuration files <DLL name>.nlog.config, the applications will log to their respective consoles and also to files $(OutputDir)/Log/<DLL name>-<Date YYYY-MM-DD>.txt.
Demo for this article is at the same time collection of all required files that may be deployed and run in Linux. Such a collection is formed by running _publish.cmd files for the services and MainApp
. Those files contain the following command:
dotnet publish -o publish -c Release
This command results in publish directory containing all files required to run the applications (for unknown to me reason configuration file of MainApp.config.json is not automatically included and should be copied manually). Contents of these files for TcpSvc
, SignalRSvc
and MainApp
should be copied to a single demo folder. In addition, to the same folder the directory CommandLibs containing main application runtime loaded assemblies (release version) should be placed. Then the services and main application may be started from demo folder with ordinary dotnet command.
To run demo in Linux, the first appropriate version of .NET Core (currently this is version 2.2) should be installed in Linux environment. The installation procedure for Ubuntu 18.04 x64 is described here. I tested TcpSvc
, SignalRSvc
and MainApp
with Ubuntu 18.04 installed in Oracle VirtualBox using MobaXterm
application for deployment and running the software.
This article presents extensible Processor-Command Framework (PCF) for operations flow control. The framework provides flexible mechanism for commands queueing and execution while enforces clear separation between processor and commands. Processor part is stable while commands ensure flexibility implementing variety of operations. PCF also offers out-of-the-box infrastructure for configuration, logging, TCP and SignalR communication, etc. This compact and simple-to-use framework is applicable in the activity domains with operations flow and can be used in conjunction with the Actor model products and message brokers.