A Generic Thread Manager
This is a simple base for building a user thread manager. A thread manager lets you monitor the activity of your jobs. Using this generic pattern, you can:
- know the status of each task/thread, its startup/shutdown time, the time of its last successful execution, and, in case of error, the last error description
- start/stop the thread/task
- load configuration from a database table or a config file
- save the status in a database table or config file
- call a user synchronous/asynchronous method
- have a user recovery phase
- have a single Windows service to check all your tasks/threads, or multiple Windows services for a distributed system
You can use these classes to build your Windows service, save the status of each thread in a database/config file, and log the main activities.
What is my Task?
My task is called StaticTask: a single thread that is always on. When you start the task, the thread manager launches a thread of this kind. When you stop the task, the task manager ends the task's job. While the task is on, it runs a loop that calls a method of a user class once per sleep interval.
The Main Classes are StaticTask and TaskManager
TaskManager
TaskManager holds the StaticTask objects in a tasks vector, which is an array of StaticTask instances. To fill and run it, use:
- SetNewTask(): loads a new user StaticTask class.
- SetTasks(): finalizes the tasks vector. It must be called once all tasks are loaded. It calls the virtual method OnChangeTasksStatus() to signal that the status has changed. At this point, the status of every task is INITED.
- Start(): starts all tasks by calling, in a loop, the AsyncRun method of each StaticTask object loaded in the vector. After Start(), the tasks move to STARTED. At the end of the loop, Start() launches VerififyTaskStatus.
VerififyTaskStatus is an internal thread that verifies the status of the tasks. Every DELTA_CHANGETIME, it calls IsStatusChange to check whether the status of one or more tasks has changed. If it has, the method calls the virtual OnChangeTasksStatus; otherwise, it calls OnChangeConfiguration.
You can use OnChangeTasksStatus to save the status of the tasks to a DB table or a config file. The ModifiedStatusTasks method tells you which tasks have changed.
You can use OnChangeConfiguration to check whether the configuration has changed. For example, the user can change the requested status to TO_START/TO_STOP, and OnChangeConfiguration can then call the start()/stop() method of the task.
The Stop() method stops all tasks: it calls, in a loop, the Stop() method of each task. Afterwards, the tasks move to STOPPED.
Other methods:
- SetTask: changes a task; it is used, for example, to re-initialize a task
- GetTask: gets a task object
- GetTasks: gets all task objects
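The verification pass described above can be sketched as follows. This is only an illustration under stated assumptions, not the article's TaskManager: WatchedTask, StatusWatcher and Poll() are hypothetical names, the status is simplified to a string, and the tasks vector is assumed to stay fixed after it has been finalized. The real manager would run such a pass every DELTA_CHANGETIME on its internal thread.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal view of a task: only what the watcher needs.
public class WatchedTask
{
    public string Name;
    public string Status = "INITED";
    public WatchedTask(string name) { Name = name; }
}

// Hypothetical stand-in for the status-verification part of a task manager.
public class StatusWatcher
{
    private readonly List<WatchedTask> _tasks;
    private readonly string[] _lastSeen;

    public StatusWatcher(List<WatchedTask> tasks)
    {
        _tasks = tasks;
        // Remember the status of every task as the starting snapshot.
        _lastSeen = new string[tasks.Count];
        for (int i = 0; i < tasks.Count; i++) _lastSeen[i] = tasks[i].Status;
    }

    // Hooks for the user: persist the statuses, or look for new configuration.
    public virtual void OnChangeTasksStatus(List<WatchedTask> changed) { }
    public virtual void OnChangeConfiguration() { }

    // One verification pass: returns the tasks whose status differs
    // from the previous pass and updates the snapshot.
    public List<WatchedTask> Poll()
    {
        var changed = new List<WatchedTask>();
        for (int i = 0; i < _tasks.Count; i++)
        {
            if (_tasks[i].Status != _lastSeen[i])
            {
                _lastSeen[i] = _tasks[i].Status;
                changed.Add(_tasks[i]);
            }
        }
        if (changed.Count > 0) OnChangeTasksStatus(changed);
        else OnChangeConfiguration();
        return changed;
    }
}
```

Keeping a snapshot of the last seen statuses means the user hook receives only the tasks that actually changed, which plays the role the article assigns to ModifiedStatusTasks.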
StaticTask
StaticTask is a class that controls the execution of a loop. Inside the loop, the virtual method Execute() is called, followed by a Sleep, so Execute() runs once per Sleep (wait) interval. StaticTask has a property called status (of the TaskStatus enum type) that holds the status of the task. The constructor takes these arguments:
- Wait: the sleep time of the loop
- TaskConfig class: it has a dictionary (key, value) to configure the user task
- Log function: delegate(string desc, string category)
The constructor sets the status to INITED. The main method is Run(). It calls these virtual methods:
- BeginTask: called before the status becomes STARTING (and later STARTED). It is used, for example, to connect to a database.
- Execute: called in the loop, once every Sleep(Wait). The wait time sets the speed of the loop.
- EndTask: called before the status becomes STOPPING. It is used, for example, to disconnect from a database.
- OnError: called when Execute() throws an exception. It is used, for example, to disconnect from a database and/or to write a log entry.
Run() cannot throw an exception because its body is wrapped in an outer try-catch. When an error occurs, Run() changes the status to ERROR and fills the public string errordesc, so there are no unhandled exceptions. When you call Stop(), the loop ends. A callback function is then called to set the status to STOPPED or ERROR.
- When the StaticTask passes to STARTED, it sets StartTime
- When the StaticTask passes to STOPPED/ERROR, it sets EndTime
- Each time the StaticTask calls Execute() successfully, it sets LastTime
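The life cycle described above can be sketched as follows. This is a simplified illustration, not the article's StaticTask: SketchTask, SketchStatus and DemoTask are hypothetical names, the configuration and log arguments are left out, and the exact order of the status changes around BeginTask and EndTask is an assumption based on the description.

```csharp
using System;
using System.Threading;

public enum SketchStatus { INITED, STARTING, STARTED, STOPPING, STOPPED, ERROR }

// Hypothetical, simplified stand-in for StaticTask (not the article's class).
public class SketchTask
{
    private readonly int _waitMs;
    private volatile bool _stopRequested;

    public SketchStatus Status { get; private set; }
    public string ErrorDesc { get; private set; }
    public DateTime? StartTime { get; private set; }
    public DateTime? EndTime { get; private set; }
    public DateTime? LastTime { get; private set; }

    public SketchTask(int waitMs)
    {
        _waitMs = waitMs;
        Status = SketchStatus.INITED;
        ErrorDesc = "";
    }

    // User hooks, overridden in concrete tasks.
    protected virtual void BeginTask() { }
    protected virtual void Execute() { }
    protected virtual void EndTask() { }
    protected virtual void OnError(Exception ex) { }

    // Asks the loop to finish after the current iteration.
    public void Stop() { _stopRequested = true; }

    // Never throws: any failure ends in the ERROR status with ErrorDesc filled.
    public void Run()
    {
        try
        {
            BeginTask();                       // e.g. connect to a database
            Status = SketchStatus.STARTING;
            StartTime = DateTime.Now;
            Status = SketchStatus.STARTED;
            while (!_stopRequested)
            {
                Execute();
                LastTime = DateTime.Now;       // set only after a successful call
                Thread.Sleep(_waitMs);
            }
            EndTask();                         // e.g. disconnect from a database
            Status = SketchStatus.STOPPING;
            EndTime = DateTime.Now;
            Status = SketchStatus.STOPPED;
        }
        catch (Exception ex)
        {
            ErrorDesc = ex.Message;
            EndTime = DateTime.Now;
            Status = SketchStatus.ERROR;
            // Assumption: a failing OnError must not escape Run() either.
            try { OnError(ex); } catch (Exception) { }
        }
    }
}

// Demo subclass: stops itself after three calls, or fails on the first one.
public class DemoTask : SketchTask
{
    private readonly bool _fail;
    public int Calls;
    public DemoTask(bool fail) : base(1) { _fail = fail; }
    protected override void Execute()
    {
        if (_fail) throw new InvalidOperationException("boom");
        if (++Calls == 3) Stop();
    }
}
```

In this sketch the catch block also covers exceptions from BeginTask and EndTask, which is broader than the description (it mentions only Execute()). Status is written by the task thread and read by others, so a production version would need to publish it safely.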
Using the Code: My Example
My example is a Windows Forms application. The project contains two TaskManagers:
- NotifierManager: it contains an event generator called NotifierTask, which generates an event and saves it in a queue. Its Execute() method performs a synchronous call each time the task invokes it.
- ProcessEventManager: it contains two event processors called ProcessEventTask, which process the events taken from the queue. Their Execute() method performs an asynchronous call each time the task invokes it.
The tasks communicate through a queue, which is an instance of the EventQueue class.
NotifierTask Inherits StaticTask
The constructor sets wait, config, and log. Execute() implements the event notifier: each time it is called, it creates a new event using the GenericEvent class.
public override void Execute(bool sync, bool recovery)
{
    // Alternate the event ID between 0 and 1 on each call.
    if (event_id == 0)
        event_id = 1;
    else
        event_id = 0;
    // Create the event and push it to the shared queue held in the config.
    GenericEvent ge = new GenericEvent(event_id);
    ((NotifierConfig)config).event_queue.AddEvent(ge);
    log(base.config.name + " Execute event ID = " + event_id, "LOG");
    base.Execute(sync, recovery);
}
The new event is then added to an EventQueue, whose instance is stored in the config object.
This class has two main methods, AddEvent and GetEvent. Both are protected by a lock on the lock_event variable, so they are thread safe.
ArrayList event_list;
static object lock_event = new object();

public int AddEvent(GenericEvent ev)
{
    lock (lock_event)
    {
        return event_list.Add(ev);
    }
}

public int CountEvent()
{
    // Take the same lock so the count is not read while the list is modified.
    lock (lock_event)
    {
        return event_list.Count;
    }
}

// Removes and returns the oldest event, or null when the queue is empty.
public GenericEvent GetEvent()
{
    lock (lock_event)
    {
        if (event_list.Count > 0)
        {
            GenericEvent tmp = (GenericEvent)event_list[0];
            event_list.RemoveAt(0);
            return tmp;
        }
    }
    return null;
}
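For comparison, here is a minimal sketch of the same queue built on ConcurrentQueue<T> from System.Collections.Concurrent, which handles the locking internally. It is an alternative technique, not the article's implementation: ConcurrentEventQueue and SketchEvent are hypothetical names, and SketchEvent stands in for GenericEvent.

```csharp
using System.Collections.Concurrent;

// Hypothetical stand-in for GenericEvent.
public class SketchEvent
{
    public int Id;
    public SketchEvent(int id) { Id = id; }
}

// Same AddEvent/CountEvent/GetEvent surface, without an explicit lock.
public class ConcurrentEventQueue
{
    private readonly ConcurrentQueue<SketchEvent> _events = new ConcurrentQueue<SketchEvent>();

    public void AddEvent(SketchEvent ev)
    {
        _events.Enqueue(ev);
    }

    public int CountEvent()
    {
        return _events.Count;
    }

    // Removes and returns the oldest event, or null when the queue is empty.
    public SketchEvent GetEvent()
    {
        SketchEvent ev;
        return _events.TryDequeue(out ev) ? ev : null;
    }
}
```

Unlike the ArrayList version, AddEvent here does not return an index, because a concurrent queue has no stable position to report.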
If your job runs in a distributed system, it is better to use a database table and define Add/GetEvent procedures to access it. When a record is locked by one process, it is skipped, and another process takes a different message. You can also use a system queue such as MSMQ.
ProcessEventTask Inherits StaticTask
This is more complex than NotifierTask because the Execute() method uses asynchronous calls to perform the job.
In the Execute() method:
- a first loop removes the completed asynchronous calls
- if the number of asynchronous calls is below the maximum, it launches a new asynchronous call
- it gets the next event (the GetEvent method is thread safe)
- with the new event, Execute() creates a new event processor by calling CreateProcessEvent()
public override void Execute(bool sync, bool recovery)
{
    // Walk backwards so that removing an item does not shift the ones still to visit.
    for (int i = CountAsyncCall() - 1; i >= 0; i--)
    {
        BaseExecutor ex = (BaseExecutor)_async_calls[i];
        if (ex.ar.IsCompleted)
        {
            _async_calls.Remove(ex);
            // A failed call is reported as an exception of the task.
            if (!ex.stato)
            {
                throw new Exception(ex.error);
            }
        }
    }
    // Launch a new asynchronous call only while below the maximum.
    if (CountAsyncCall() < max_executor_num)
    {
        GenericEvent current_event = event_queue.GetEvent();
        if (current_event == null)
            return;
        BaseProcessEvent ex = CreateProcessEvent(current_event);
        _async_calls.Add(ex.AsyncExecute());
    }
    base.Execute(sync, recovery);
}
BaseProcessEvent: Generic Events Processor
BaseProcessEvent is an event processor. Its main method is Execute(), which calls in a loop all the actions (see the Process() method of the BaseAction class) previously loaded with the Add() method. There is a synchronous call, Execute(), and an asynchronous call, AsyncExecute. In the asynchronous case, the variable ar (AsyncResult) is filled.
public abstract class BaseAction
{
    // Named Process to match the text and the override shown in the sample actions.
    public abstract void Process(string id, BaseAction ac_old);
}
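The action loop can be sketched as follows. This is a minimal illustration, not the article's BaseProcessEvent: SketchAction, SketchProcessor and RecordingAction are hypothetical names, and passing the previous action as the second argument is an assumption suggested by the ac_old parameter. The Ok and Error members mirror the stato and error fields that ProcessEventTask checks.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical counterpart of BaseAction.
public abstract class SketchAction
{
    public abstract void Process(string exeId, SketchAction previous);
}

// Hypothetical counterpart of BaseProcessEvent: runs its actions in order.
public class SketchProcessor
{
    private readonly List<SketchAction> _actions = new List<SketchAction>();
    private readonly string _id;

    public bool Ok { get; private set; }
    public string Error { get; private set; }

    public SketchProcessor(string id)
    {
        _id = id;
        Ok = true;
        Error = "";
    }

    public void Add(SketchAction action) { _actions.Add(action); }

    // Runs every action; the first failure stops the chain and is recorded.
    public void Execute()
    {
        SketchAction previous = null;
        try
        {
            foreach (SketchAction action in _actions)
            {
                action.Process(_id, previous);
                previous = action;
            }
        }
        catch (Exception ex)
        {
            Ok = false;
            Error = ex.Message;
        }
    }
}

// Demo action: appends "exeId:name" to a shared trace, or fails on request.
public class RecordingAction : SketchAction
{
    private readonly string _name;
    private readonly List<string> _trace;
    private readonly bool _fail;

    public RecordingAction(string name, List<string> trace, bool fail)
    {
        _name = name;
        _trace = trace;
        _fail = fail;
    }

    public override void Process(string exeId, SketchAction previous)
    {
        if (_fail) throw new InvalidOperationException(_name + " failed");
        _trace.Add(exeId + ":" + _name);
    }
}
```

Recording the failure instead of rethrowing lets the caller inspect the result later, which matters when the processor runs asynchronously and no caller is waiting on the stack.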
My event processor has three ProcessEvents (ProcessEvent1, ProcessEvent2, ProcessEvent3). The CreateProcessEvent method decides which ProcessEvent to create, depending on the event type.
public override BaseProcessEvent CreateProcessEvent(GenericEvent _generic_event)
{
    switch (_generic_event.Id)
    {
        case "0": return new ProcessEvent1(_generic_event.Id, _orchestratorLog);
        case "1": return new ProcessEvent2(_generic_event.Id, _orchestratorLog);
        case "3": return new ProcessEvent3(_generic_event.Id, _orchestratorLog);
    }
    return null;
}
ProcessEvent1 inherits BaseProcessEvent. Its actions are:
- CreateAction
- ProcessAction
- LastAction
ProcessEvent2 inherits BaseProcessEvent. Its actions are:
- CreateAction
- housekeeping
- LastAction
ProcessEvent3 inherits BaseProcessEvent. Its actions are:
- CreateAction
- UpdateAction
- LastAction
In my sample, all the actions just write log entries and sleep.
override public void Process(string Exeid, BaseAction ac_old)
{
    log(Exeid + " " + id + " ProcessAction begin", "LOG");
    Thread.Sleep(1000);
    log(Exeid + " " + id + " ProcessAction end", "LOG");
    return;
}
Points of Interest
The asynchronous calls are created with the BeginInvoke() function, which uses the thread pool mechanism (for details, see the MSDN documentation).
...
AsyncExecute asyncRunDelegate = new AsyncExecute(Run);
async_call = asyncRunDelegate.BeginInvoke(false, CallbackMethod, null);
...
...
// AsyncResult is defined in System.Runtime.Remoting.Messaging.
void CallbackMethod(IAsyncResult ar)
{
    AsyncResult result = (AsyncResult)ar;
    AsyncExecute caller = (AsyncExecute)result.AsyncDelegate;
    string formatString = (string)ar.AsyncState;
    // EndInvoke rethrows any exception raised by the asynchronous call.
    caller.EndInvoke(ar);
}
...
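Delegate BeginInvoke()/EndInvoke() is available on the .NET Framework, but on .NET Core and .NET 5 or later it throws PlatformNotSupportedException. On those runtimes, the same "run on the thread pool, then call a callback" pattern could be sketched with Task.Run and ContinueWith. This is an alternative technique, not the article's code: AsyncRunSketch and Start are hypothetical names, and the run delegate is assumed to take the same single bool argument as above.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncRunSketch
{
    // Runs run(recovery) on the thread pool, then invokes callback with the
    // finished task, whether it completed normally or faulted.
    public static Task Start(Action<bool> run, bool recovery, Action<Task> callback)
    {
        Task work = Task.Run(() => run(recovery));
        return work.ContinueWith(callback);
    }
}
```

Inside the callback, checking the IsFaulted and Exception properties of the task takes the place of the EndInvoke() call, which is where the original pattern surfaces an exception from the asynchronous method.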
History
- 23rd August, 2011: Initial version