Base Class to Make a User Thread Manager Check User Jobs

23 Aug 2011 | MIT License | 6 min read
A pattern for building a custom thread manager that supervises synchronous or asynchronous calls. The base class can be specialized and hosted in an ordinary Windows service to check the status of your jobs.
[Image: WFA_StaticTaskSampleRid.gif]

A Generic Thread Manager

This is a simple base for building your own thread manager. With a thread manager, you can monitor the activity of your jobs. Using this generic pattern, you can:

  1. know the status of a task/thread, its startup/shutdown time, the time of its last run and, in case of error, the last error description
  2. start/stop the thread/task
  3. load the configuration from a database table or a config file
  4. save the status in a database table or a config file
  5. call a user synchronous/asynchronous method
  6. have a user recovery phase
  7. have a single Windows service that checks all your tasks/threads, or several Windows services for a distributed system

You can use these classes to build your Windows service, save the status of each thread in a database or config file, and log the main activities.

What is my Task?

My task is called StaticTask: a single thread that is always on. When you start the task, the thread manager starts a thread for it. When you stop the task, the task manager ends the task's job. While the task is on, it loops and calls a method of a user class once per sleep interval.

The Main Classes are StaticTask and TaskManager

TaskManager

TaskManager connects the StaticTask objects through a tasks vector, which is an array of StaticTask instances. To fill it, use:

  • SetNewTask(): loads a new user StaticTask class.
  • SetTasks(): finalizes the tasks vector. Call it once all tasks are loaded. It calls the virtual method OnChangeTasksStatus() to signal that the status has changed. At this point, the status of every task is INITED.
  • Start(): starts all tasks by calling, in a loop, the AsyncRun method of each StaticTask object loaded in the vector.
    After Start(), the status moves to STARTED.
    At the end of the loop, Start() launches VerififyTaskStatus.
    This is an internal thread that verifies the status of the tasks:
    every DELTA_CHANGETIME, VerififyTaskStatus calls IsStatusChange to check whether the status of one or more tasks has changed.
    If it has, the method calls the virtual OnChangeTasksStatus; otherwise it calls OnChangeConfiguration.
    You can use OnChangeTasksStatus to save the status of the tasks in a DB table or in a config file.
    The ModifiedStatusTasks method tells you which tasks have changed.
    You can use OnChangeConfiguration to check whether the configuration has changed.
    For example, you can set a task's configured status to TO_START/TO_STOP, and OnChangeConfiguration can then call the task's start()/stop() method.
  • Stop(): stops all tasks.
    It loops over the tasks and calls the Stop() method of each one. The tasks then move to STOPPED.
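
The status check described above can be pictured as a snapshot comparison. The block below is only a minimal sketch of that idea, not the article's TaskManager code: StatusWatcher and ModifiedTasks are hypothetical names, and the enum lists only status values mentioned in this article.

```csharp
using System;
using System.Collections.Generic;

// Assumed status values; the article names INITED, STARTING, STARTED,
// STOPPING, STOPPED and ERROR.
public enum TaskStatus { INITED, STARTING, STARTED, STOPPING, STOPPED, ERROR }

// Hypothetical helper: remembers the last status seen for each task name
// and reports which tasks changed since the previous check.
public class StatusWatcher
{
    private readonly Dictionary<string, TaskStatus> _lastSeen =
        new Dictionary<string, TaskStatus>();

    // Compares the current statuses against the stored snapshot, updates the
    // snapshot, and returns the names of the tasks whose status differs.
    public List<string> ModifiedTasks(IDictionary<string, TaskStatus> current)
    {
        var changed = new List<string>();
        foreach (KeyValuePair<string, TaskStatus> pair in current)
        {
            TaskStatus previous;
            bool known = _lastSeen.TryGetValue(pair.Key, out previous);
            if (!known || previous != pair.Value)
            {
                changed.Add(pair.Key);
                _lastSeen[pair.Key] = pair.Value;
            }
        }
        return changed;
    }
}
```

A polling thread could call ModifiedTasks once per interval and invoke the status-changed hook only when the returned list is not empty.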

Other methods:

  • SetTask to change a task: it's used, for example, to re-init the task
  • GetTask to get the task object
  • GetTasks to get all task objects

StaticTask

StaticTask is a class that controls the execution of a loop. Inside the loop, it calls the virtual method Execute() and then sleeps, so Execute() is called once every Sleep(wait) interval. StaticTask has a property called status (of the TaskStatus enum type) that holds the task's current status. The constructor takes these arguments:

  • Wait: the sleep time between two Execute() calls
  • TaskConfig class: it has a dictionary (key, value) to configure the user task
  • Log function delegate(string desc, string category)

The constructor sets the status to INITED. The main method is Run(). It calls these virtual methods:

  • BeginTask: called once before the loop, while the status is STARTING and before it becomes STARTED. For example, use it to connect to a database.
  • Execute: called on every iteration of the loop, once per Sleep(Wait). The Wait value therefore sets the speed of the loop.
  • EndTask: called before the status becomes STOPPING. For example, use it to disconnect from a database.
  • OnError: called when Execute() throws an exception. For example, use it to disconnect from a database and/or to log the error.

Run() cannot throw an exception because its body is wrapped in an outer try-catch. When an error occurs, Run() changes the status to ERROR and fills the public string errordesc, so there are no unhandled exceptions. When you call Stop(), the loop ends. A callback function is then called to set the status to STOPPED or ERROR.

  • When the StaticTask passes to STARTED, it sets StartTime
  • When the StaticTask passes to STOPPED/ERROR, it sets EndTime
  • Each time the StaticTask calls Execute() successfully, it sets LastTime
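
The life cycle described above can be sketched as follows. This is a simplified illustration under stated assumptions, not the real StaticTask: LoopTaskSketch and CountdownTask are hypothetical names, the final status is set inside Run() rather than by a callback, and an exception from any hook (not only Execute()) leads to ERROR.

```csharp
using System;
using System.Threading;

// Assumed status values, taken from the names used in the article.
public enum TaskStatus { INITED, STARTING, STARTED, STOPPING, STOPPED, ERROR }

// Hypothetical stand-in for StaticTask: only the life cycle is shown.
public class LoopTaskSketch
{
    private volatile bool _stopRequested;
    private readonly int _wait;

    public TaskStatus Status { get; private set; }
    public string ErrorDesc { get; private set; }
    public DateTime StartTime { get; private set; }
    public DateTime EndTime { get; private set; }
    public DateTime LastTime { get; private set; }

    public LoopTaskSketch(int waitMilliseconds)
    {
        _wait = waitMilliseconds;
        Status = TaskStatus.INITED;
    }

    protected virtual void BeginTask() { }
    protected virtual void Execute() { }
    protected virtual void EndTask() { }
    protected virtual void OnError(Exception e) { }

    public void Stop() { _stopRequested = true; }

    // Never lets an exception escape: failures end up in Status and ErrorDesc.
    public void Run()
    {
        try
        {
            Status = TaskStatus.STARTING;
            BeginTask();
            Status = TaskStatus.STARTED;
            StartTime = DateTime.Now;
            while (!_stopRequested)
            {
                Execute();
                LastTime = DateTime.Now; // only reached when Execute() succeeded
                Thread.Sleep(_wait);
            }
            Status = TaskStatus.STOPPING;
            EndTask();
            Status = TaskStatus.STOPPED;
        }
        catch (Exception e)
        {
            ErrorDesc = e.Message;
            Status = TaskStatus.ERROR;
            try { OnError(e); } catch { /* keep Run() exception-free */ }
        }
        EndTime = DateTime.Now;
    }
}

// Demo subclass: runs a fixed number of iterations, then stops or fails.
public class CountdownTask : LoopTaskSketch
{
    private readonly int _iterations;
    private readonly bool _failAtEnd;
    public int Executed { get; private set; }

    public CountdownTask(int iterations, bool failAtEnd) : base(1)
    {
        _iterations = iterations;
        _failAtEnd = failAtEnd;
    }

    protected override void Execute()
    {
        Executed++;
        if (Executed < _iterations) return;
        if (_failAtEnd) throw new InvalidOperationException("simulated failure");
        Stop();
    }
}
```

Calling Run() on new CountdownTask(3, false) ends with Status equal to STOPPED, while new CountdownTask(2, true) ends with Status equal to ERROR and the exception message in ErrorDesc.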

Using the Code: My Example

My example is a Windows Form Application. In this project, there are two TaskManagers:

  • NotifierManager: It contains an event generator called NotifierTask, which generates an event and saves it in a queue. Its Execute() method performs a synchronous call each time the task invokes it.
  • ProcessEventManager: It contains two event processors called ProcessEventTask, which process the events taken from the queue. Their Execute() method performs an asynchronous call each time the task invokes it.

The tasks use a queue to communicate. The queue is an instance of the EventQueue class.

NotifierTask Inherits StaticTask

The constructor sets wait, config and log. The Execute() method implements the event notifier: each time it is called, it creates a new event as an instance of the GenericEvent class.

C#
public override void  Execute(bool sync, bool recovery)
{
	/* very simple */
	if(event_id==0)
		event_id=1;
	else
		event_id = 0;

	GenericEvent ge = new GenericEvent(event_id);

	((NotifierConfig)config).event_queue.AddEvent(ge);

	log(base.config.name + " Execute event ID = " + event_id, "LOG");

	base.Execute(sync, recovery);
}  

The new event is then added to an EventQueue, whose instance is stored in the config object.
The EventQueue methods AddEvent and GetEvent are protected by a lock object called lock_event, so they can be called safely from several threads.

C#
ArrayList event_list;

static object lock_event = new object();

public int AddEvent(GenericEvent ev)
{
	lock (lock_event)
	{
		return event_list.Add(ev);
	}
}

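public int CountEvent()
{
	// Take the same lock as AddEvent/GetEvent so the count is read consistently.
	lock (lock_event)
	{
		return event_list.Count;
	}
}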

public GenericEvent GetEvent()
{
	lock (lock_event)
	{
		if (event_list.Count > 0)
		{
			GenericEvent tmp = (GenericEvent)event_list[0];
			event_list.RemoveAt(0);
			return tmp;
		}
	}
	return null;
}
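
As an alternative to the hand-written lock, the same queue surface can be sketched on top of ConcurrentQueue<T> from System.Collections.Concurrent (available since .NET Framework 4). This is an illustrative variant, not the article's EventQueue: SketchEvent and ConcurrentEventQueue are hypothetical names, and AddEvent returns nothing here instead of the list index.

```csharp
using System.Collections.Concurrent;

// Hypothetical event type standing in for the article's GenericEvent.
public class SketchEvent
{
    public int Id { get; private set; }
    public SketchEvent(int id) { Id = id; }
}

// Same three operations as the EventQueue above, built on
// ConcurrentQueue<T> so no explicit lock object is needed.
public class ConcurrentEventQueue
{
    private readonly ConcurrentQueue<SketchEvent> _events =
        new ConcurrentQueue<SketchEvent>();

    public void AddEvent(SketchEvent ev)
    {
        _events.Enqueue(ev);
    }

    public int CountEvent()
    {
        return _events.Count;
    }

    // Returns the oldest event, or null when the queue is empty.
    public SketchEvent GetEvent()
    {
        SketchEvent ev;
        return _events.TryDequeue(out ev) ? ev : null;
    }
}
```

Events come out in the order they were added, and GetEvent keeps the convention of returning null for an empty queue, so calling code that tests for null would not need to change.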

If your job runs in a distributed system, it is better to use a database table and define Add/GetEvent procedures to access it. When a record is locked by one process, the other processes skip it and take the next message. You can also use a system queue such as MSMQ.

ProcessEventTask Inherits StaticTask

This is more complex than NotifierTask because the Execute() method uses asynchronous calls to perform the job.

In the Execute() method:

  1. a first loop removes the asynchronous calls that have completed
  2. if the number of pending asynchronous calls is below the maximum, it starts a new asynchronous call
  3. to do so, it gets the next event (the GetEvent method is thread safe)
  4. with the new event, Execute() creates a new event processor
    by calling CreateProcessEvent()
C#
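public override void Execute(bool sync, bool recovery)
{
	// Walk backwards so that removing an entry does not shift the ones still to visit.
	for (int i = CountAsyncCall() - 1; i >= 0; i--)
	{
		BaseExecutor ex = (BaseExecutor)_async_calls[i];
		if (ex.ar.IsCompleted)
		{
			_async_calls.Remove(ex);
			// A false stato marks a failed call: rethrow its error in the task's thread.
			if (!ex.stato)
			{
				throw new Exception(ex.error);
			}
		}
	}

	// Start a new asynchronous call only while below the configured maximum.
	if (CountAsyncCall() < max_executor_num)
	{
		GenericEvent current_event = event_queue.GetEvent();

		// Nothing queued: try again on the next iteration.
		if (current_event == null)
			return;

		BaseProcessEvent ex = CreateProcessEvent(current_event);
		_async_calls.Add(ex.AsyncExecute());
	}

	base.Execute(sync, recovery);
}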

BaseProcessEvent: Generic Events Processor

BaseProcessEvent is an event processor. Its main method is Execute(), which calls in a loop all the actions (see the Process() method of class BaseAction) previously loaded with the Add() method. Execute() is the synchronous call and AsyncExecute() is the asynchronous one; in the asynchronous case, the variable ar (AsyncResult) is filled.

C#
public abstract class BaseAction
{
    // Named Process to match the text above and the override shown later in this article.
    public abstract void Process(string id, BaseAction ac_old);
}

In my event processor, there are three ProcessEvents (ProcessEvent1, ProcessEvent2, ProcessEvent3). The CreateProcessEvent method decides which ProcessEvent to create, depending on the event type.

C#
public override BaseProcessEvent  CreateProcessEvent(GenericEvent _generic_event)
{
	switch(_generic_event.Id)
	{
		case "0": return new ProcessEvent1(_generic_event.Id, _orchestratorLog);
		case "1": return new ProcessEvent2(_generic_event.Id, _orchestratorLog);
		case "3": return new ProcessEvent3(_generic_event.Id, _orchestratorLog);
	}
	return null;
}

ProcessEvent1 inherits BaseProcessEvent: its actions are:

  • CreateAction
  • ProcessAction
  • LastAction

ProcessEvent2 inherits BaseProcessEvent: its actions are:

  • CreateAction
  • housekeeping
  • LastAction

ProcessEvent3 inherits BaseProcessEvent: its actions are:

  • CreateAction
  • UpdateAction
  • LastAction

In my sample, every action only writes log entries and sleeps.

C#
override public void Process(string Exeid,BaseAction ac_old)
{
	log(Exeid + "  " + id + "   ProcessAction begin", "LOG");
	Thread.Sleep(1000);
	log(Exeid + "  " + id + "   ProcessAction end", "LOG");
	return;
}
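
To show how a processor and its actions fit together, here is a minimal sketch of an ordered action list. The names SketchAction, NamedAction and SketchProcessEvent are hypothetical, and passing the previously executed action as the second argument is an assumption about what the ac_old parameter means.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for BaseAction.
public abstract class SketchAction
{
    public abstract void Process(string id, SketchAction previous);
}

// Demo action: records its name instead of doing real work.
public class NamedAction : SketchAction
{
    private readonly string _name;
    private readonly List<string> _log;

    public NamedAction(string name, List<string> log)
    {
        _name = name;
        _log = log;
    }

    public override void Process(string id, SketchAction previous)
    {
        _log.Add(id + " " + _name);
    }
}

// Hypothetical stand-in for BaseProcessEvent.
public class SketchProcessEvent
{
    private readonly string _id;
    private readonly List<SketchAction> _actions = new List<SketchAction>();

    public SketchProcessEvent(string id) { _id = id; }

    public void Add(SketchAction action) { _actions.Add(action); }

    // Runs the actions in the order they were added, handing each one
    // the action that ran before it (null for the first).
    public void Execute()
    {
        SketchAction previous = null;
        foreach (SketchAction action in _actions)
        {
            action.Process(_id, previous);
            previous = action;
        }
    }
}
```

A processor like ProcessEvent1 would then amount to three Add() calls in its constructor, one for each of CreateAction, ProcessAction and LastAction.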

Points of Interest

The asynchronous calls are created with the delegate's BeginInvoke() method, which runs the call on the thread pool (see the MSDN documentation for details).

C#
...
   AsyncExecute asyncRunDelegate = new AsyncExecute(Run);
   async_call= asyncRunDelegate.BeginInvoke(false,CallbackMethod, null);
...
...

void CallbackMethod(IAsyncResult ar)
{
	AsyncResult result = (AsyncResult) ar;
	AsyncExecute caller = (AsyncExecute) result.AsyncDelegate ;

	string formatString = (string)ar.AsyncState ;

	caller.EndInvoke(ar);
}
...
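
Delegate BeginInvoke()/EndInvoke() work on the .NET Framework but throw PlatformNotSupportedException on .NET Core and .NET 5 or later. The block below sketches how the same thread pool call and completion callback could be expressed with Task.Run and ContinueWith. It is an illustration only: AsyncRunSketch is a hypothetical class, and its Run(bool) body merely simulates a failure.

```csharp
using System;
using System.Threading.Tasks;

public class AsyncRunSketch
{
    public bool Completed { get; private set; }
    public string Error { get; private set; }

    // Stand-in for the Run(bool) method the delegate above points to;
    // passing true simulates a failing run.
    private void Run(bool fail)
    {
        if (fail) throw new InvalidOperationException("simulated failure");
    }

    // Queues Run on the thread pool. The continuation plays the role of
    // CallbackMethod and records the outcome instead of calling EndInvoke.
    public Task AsyncRun(bool fail)
    {
        return Task.Run(() => Run(fail)).ContinueWith(t =>
        {
            if (t.IsFaulted)
                Error = t.Exception.GetBaseException().Message;
            Completed = true;
        });
    }
}
```

Waiting on the returned task does not rethrow the exception from Run, because the continuation observes it and stores the message in Error.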

History

  • 23rd August, 2011: Initial version

License

This article, along with any associated source code and files, is licensed under The MIT License