
Batch Processing Patterns with Taskling

In this article we'll look at common patterns in batch processing and how the Taskling libraries provide a simple and reliable way of using those patterns in your C# batch jobs.

Introduction

There are some common patterns when building batch processes:

  • Instance concurrency control - for example, singleton processes
  • Partitioning data into smaller batches
  • Recovering from failures by continuing processing from where the failed instance left off
  • Maintaining/logging the state of instances and the state of the data that gets processed
  • Alerts when things go wrong

Taskling is a set of two libraries that provide these patterns with SQL Server as a backing store. It is host agnostic and can be used in web applications, Azure jobs, console applications and so on.

We'll look at each pattern with some example Taskling code.

Background

To describe how you can use the patterns in this article with Taskling, we'll need to start off by talking about configuration and how to instantiate the ITasklingClient.

Much of the behaviour is controlled by configuration. With Taskling you must create a class that implements the IConfigurationReader interface, which simply returns a string containing a series of key value pairs in the format KEY[value]. We'll discuss each setting in the patterns below.

C#
public class MyConfigReader : IConfigurationReader
{
    public string GetTaskConfigurationString(string applicationName, string taskName)
    {
        var key = applicationName + "::" + taskName;
        return ConfigurationManager.AppSettings.Get(key);
    }
}

All tasks are uniquely identified by an application name and a task name. The example above reads the configuration string from the application config file.

XML
<appSettings>
  <add key="MyApplication::MyTask" value="DB[Server=(local);Database=MyAppDb;Trusted_Connection=True;] TO[120] E[true] CON[-1] KPLT[2] KPDT[40] MCI[1] KA[true] KAINT[1] KADT[10] TPDT[0] RPC_FAIL[true] RPC_FAIL_MTS[600] RPC_FAIL_RTYL[3] RPC_DEAD[true] RPC_DEAD_MTS[600] RPC_DEAD_RTYL[3] MXBL[20]"/>
</appSettings>

We'll look at the settings not related to the patterns:

  • DB is the connection string
  • TO is the command timeout in seconds
  • E enables or disables the task (true/false)

We'll go over the meaning of the rest of the settings below.

We instantiate the ITasklingClient by passing our configuration reader implementation to its constructor.

ITasklingClient tasklingClient = new TasklingClient(new MyConfigReader());

Or via dependency injection (Autofac example):

builder.Register<TasklingClient>(x => new TasklingClient(new MyConfigReader())).As<ITasklingClient>();
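
The reader doesn't have to use app.config; anything that returns a string in the KEY[value] format will do. As a minimal sketch, a hard-coded reader can be handy for local testing (the settings below just repeat the example configuration above):

C#
public class HardCodedConfigReader : IConfigurationReader
{
    public string GetTaskConfigurationString(string applicationName, string taskName)
    {
        // Same KEY[value] format as the app.config example, just built in code
        return "DB[Server=(local);Database=MyAppDb;Trusted_Connection=True;] TO[120] E[true] "
             + "CON[-1] KPLT[2] KPDT[40] MCI[1] KA[true] KAINT[1] KADT[10] TPDT[0] "
             + "RPC_FAIL[true] RPC_FAIL_MTS[600] RPC_FAIL_RTYL[3] "
             + "RPC_DEAD[true] RPC_DEAD_MTS[600] RPC_DEAD_RTYL[3] MXBL[20]";
    }
}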

Using the code

In the main class where your batch processing lives, we instantiate a new ITaskExecutionContext, which is responsible for all the state management, logging and creation of the child contexts that partition data into blocks.

C#
using (var executionContext = _tasklingClient.CreateTaskExecutionContext("MyApplication", "MyTask"))
{
    if(executionContext.TryStart())
    {
        // batch processing logic
    }
}

We're now ready to look at the patterns.

Pattern #1 - Instance Concurrency Limits such as Singletons

Some batch tasks need to be singletons. If you run your task every hour but it sometimes takes more than an hour to complete, you could end up with two executions running at the same time.

As well as singletons, it can be useful to limit the number of concurrent executions to avoid overloading other components. Perhaps you have large amounts of data to process, so you run the task every minute and each execution takes ten minutes; that gives you ten concurrent executions. If a component in your architecture (web service, database, etc.) cannot handle the load of ten concurrent executions, you can set a limit of five, for example. When components show signs of being overloaded you can simply reduce the concurrency limit in real time and then increase it again later.

Taskling concurrency limits work across servers. The CON setting controls the limit: -1 means no limit and any positive value is the maximum number of concurrent executions. So for a singleton, set CON[1].

We instantiate the ITaskExecutionContext and call its TryStart() method. TryStart() returns false if the limit has already been reached, and any further calls to that context will fail, so we wrap our processing logic in an if statement.

C#
if(executionContext.TryStart()) 
{ 
    // batch processing logic 
}

How Taskling guarantees concurrency control is an interesting subject in itself. Taskling leverages row locking and blocking in SQL Server to create a single-threaded section of code in a multi-threaded, even multi-server, environment.
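
The basic idea can be sketched with plain ADO.NET. This is an illustration of the technique only, not Taskling's actual code, and the dbo.ConcurrencyTokens table is hypothetical: each competing process selects the same token row under an exclusive lock, and only the lock holder proceeds.

C#
using System;
using System.Data.SqlClient;

public static class RowLockIllustration
{
    // Whoever selects the token row with UPDLOCK/HOLDLOCK first gets to run;
    // every other connection blocks on the same SELECT until the transaction ends.
    public static void RunExclusively(string connectionString, Action work)
    {
        using (var connection = new SqlConnection(connectionString))
        {
            connection.Open();
            using (var transaction = connection.BeginTransaction())
            {
                using (var command = new SqlCommand(
                    "SELECT LockName FROM dbo.ConcurrencyTokens WITH (UPDLOCK, HOLDLOCK) WHERE LockName = @name",
                    connection, transaction))
                {
                    command.Parameters.AddWithValue("@name", "MyApplication::MyTask");
                    command.ExecuteScalar();
                }

                work(); // effectively single-threaded across threads and servers while the lock is held

                transaction.Commit(); // committing (or rolling back) releases the lock
            }
        }
    }
}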

Pattern #2 - Partitioning of Data into Smaller Batches

Taskling can partition data into four types of block:

  • Range Blocks
    • Date Range Blocks
    • Numeric Range Blocks
  • List Blocks
  • Object Blocks

What is common to all is that we need isolation between blocks, that is to say, no data overlap. Taskling guarantees block isolation if:

  • You wrap your block creation logic in a Critical Section
  • You don't pass duplicate data to Taskling

You can create sections of code that are guaranteed to be single-threaded, even across servers, by wrapping the code as follows:

C#
using (var cs = taskExecutionContext.CreateCriticalSection())
{
    if(cs.TryStart())
    {
        // code that needs to be single-threaded (even across servers)
    }
}

The Taskling critical section uses the same concurrency control mechanism as the main task concurrency limit. It waits for 20 seconds with 2 retries, and if it still cannot enter the section then TryStart() returns false. This wait time can be changed in one of the overloads.

Range Blocks with a Date Range Block Example

Range blocks store no data, just a date or numeric range. This range can then be used to retrieve data and process it.

Some processes continually process data between dates. Each time the batch process runs it might check up to which date has been processed and then use that date as a From Date and the current time as a To Date.

  • With Taskling you can get the end date of the last block
  • Uses a critical section to guarantee block isolation for tasks where the concurrency limit is higher than 1
C#
private void RunTask(ITaskExecutionContext taskExecutionContext)
{
    try
    {
        var dateRangeBlocks = GetDateRangeBlocks(taskExecutionContext);
        foreach (var block in dateRangeBlocks)
            ProcessBlock(block);

        taskExecutionContext.Complete();
    }
    catch(Exception ex)
    {
        taskExecutionContext.Error(ex.ToString(), true);
    }
}
C#
// -- Critical Section --
// This method uses a critical section to protect the data identification phase of the task. If two tasks execute
// at the same time then the code inside the critical section can only be executed by one task at a time and
// there is no chance for identifying the same data in both tasks
private IList<IDateRangeBlockContext> GetDateRangeBlocks(ITaskExecutionContext taskExecutionContext)
{
    using (var cs = taskExecutionContext.CreateCriticalSection())
    {
        if(cs.TryStart())
        {
            var startDate = GetDateRangeStartDate(taskExecutionContext);
            var endDate = DateTime.UtcNow;

            return taskExecutionContext.GetDateRangeBlocks(x => x.WithRange(startDate, endDate, TimeSpan.FromMinutes(30)));
        }
        throw new Exception("Could not acquire a critical section, aborted task");
    }
}

// -- Last Block --
// The previous block is used to identify the datetime of the start of the next range of data
// The first time this task is ever run there is no previous block, so we use a configured timespan from the
// current datetime instead
private DateTime GetDateRangeStartDate(ITaskExecutionContext taskExecutionContext)
{
    var lastBlock = taskExecutionContext.GetLastDateRangeBlock(LastBlockOrder.LastCreated);
    if (lastBlock == null)
        return DateTime.UtcNow.Add(_configuration.FirstRunTimeSpan);
    else
        return lastBlock.EndDate;
}

In the above example Taskling takes a date range and a TimeSpan for the maximum block size and returns a list of date range blocks (IDateRangeBlockContext). So if the date range covered 24 hours and we specified a maximum block size of TimeSpan.FromHours(1) we'd get 24 blocks.
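
As a minimal sketch of that case (the 24 hour window here is illustrative):

C#
// A 24 hour window with a one hour maximum block size yields 24 IDateRangeBlockContext instances
var blocks = taskExecutionContext.GetDateRangeBlocks(
    x => x.WithRange(DateTime.UtcNow.AddHours(-24), DateTime.UtcNow, TimeSpan.FromHours(1)));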

Once we have the blocks we then process each one. In this example we are retrieving journeys between the dates of each block, calculating travel insights and persisting them.

  • Start() is called before processing begins
  • Complete() is called once processing is finished. You can optionally pass the number of records processed as an argument
  • The start and end dates of the block are used to retrieve the journeys from the database
  • All code is wrapped in a try/catch block; in the catch, Failed(string errorMessage) is called and the exception is not rethrown, so that subsequent blocks can still execute
C#
private void ProcessBlock(IDateRangeBlockContext blockContext)
{
    try
    {
        blockContext.Start();

        var travelDataItems = _travelDataService.GetJourneys(blockContext.DateRangeBlock.StartDate, blockContext.DateRangeBlock.EndDate);
        var travelInsights = new List<TravelInsight>();
        // my insights processing logic
        //...
        //...
        _travelInsightsService.Add(travelInsights);

        int itemCountProcessed = travelInsights.Count;
        blockContext.Complete(itemCountProcessed);
    }
    catch(Exception ex)
    {
        blockContext.Failed(ex.ToString());
    }
}

Numeric range blocks work in essentially the same way. You pass Taskling a start number, an end number and a maximum block size.

int maxBlockSize = 500;
return taskExecutionContext.GetNumericRangeBlocks(x => x.WithRange(startNumber, endNumber, maxBlockSize));

So if we pass the numbers 1 and 1000 with a maximum block size of 100, ten numeric range block contexts will be returned, which you can then use to process the data within each range.
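
As a sketch of processing those blocks (the Start/Complete/Failed calls mirror the date range example; the ProcessRows helper and the StartNumber/EndNumber property names are assumptions here, so check the linked example for the exact API):

C#
// 1 to 1000 with a maximum block size of 100 yields ten numeric range block contexts
var numericBlocks = taskExecutionContext.GetNumericRangeBlocks(x => x.WithRange(1, 1000, 100));
foreach (var blockContext in numericBlocks)
{
    try
    {
        blockContext.Start();
        // Load and process the rows whose ids fall within the block's range (hypothetical helper)
        ProcessRows(blockContext.NumericRangeBlock.StartNumber, blockContext.NumericRangeBlock.EndNumber);
        blockContext.Complete();
    }
    catch (Exception ex)
    {
        blockContext.Failed(ex.ToString());
    }
}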

You can see example code for numeric range blocks at the following link:

https://github.com/Vanlightly/Taskling.NET/wiki/Task-with-Numeric-Range-Blocks-Example-Code

List Blocks

List blocks store the actual data in SQL Server as JSON. There are two list block contexts:

  • IListBlockContext<TItem>
  • IListBlockContext<TItem,THeader>

TItem is the type of the list items. THeader is a type that stores data related to the block as a whole.

Let's look at an example of generating IListBlockContext<TItem,THeader> blocks. In this example we retrieve data by date range but store the data to be processed in list blocks.

C#
public class BatchDatesHeader
{
    public DateTime FromDate { get; set; }
    public DateTime ToDate { get; set; }
}

public class Journey
{
    public long JourneyId { get; set; }
    public string DepartureStation { get; set; }
    public string ArrivalStation { get; set; }
    public DateTime TravelDate { get; set; }
    public string PassengerName { get; set; }
}

BatchDatesHeader will be our header class and Journey will be our list item class.

Note that ItemStatus.Pending and ItemStatus.Failed relate to failure recovery and the reprocessing of previously failed blocks. We'll look at that in more detail in the next pattern.

In this example we retrieve all the journeys since the last time the job ran, partition them into list blocks and then process each list block. For each list block, we process each journey individually by extracting a travel insight and notifying the user of that insight.

C#
private void RunTask(ITaskExecutionContext taskExecutionContext)
{
    try
    {
        var listBlocks = GetListBlocks(taskExecutionContext);
        foreach (var block in listBlocks)
            ProcessBlock(block);

        taskExecutionContext.Complete();
    }
    catch (Exception ex)
    {
        taskExecutionContext.Error(ex.ToString(), true);
    }
}

private IList<IListBlockContext<Journey, BatchDatesHeader>> GetListBlocks(ITaskExecutionContext taskExecutionContext)
{
    using (var cs = taskExecutionContext.CreateCriticalSection())
    {
        if (cs.TryStart())
        {
            var startDate = GetDateRangeStartDate(taskExecutionContext);
            var endDate = DateTime.UtcNow;

            var journeys = _travelDataService.GetJourneys(startDate, endDate).ToList();
            var batchHeader = new BatchDatesHeader()
            {
                FromDate = startDate,
                ToDate = endDate
            };

            short blockSize = 500;
            return taskExecutionContext.GetListBlocks<Journey, BatchDatesHeader>(x => x.WithPeriodicCommit(journeys, batchHeader, blockSize, BatchSize.Fifty));
        }
        throw new Exception("Could not acquire a critical section, aborted task");
    }
}

// -- Last Block --
// The header of the last block identifies the date range that was last processed, so its ToDate is used
// as the start of the next range. This is just an example; there are many ways to identify the data to be processed
private DateTime GetDateRangeStartDate(ITaskExecutionContext taskExecutionContext)
{
    var lastBlock = taskExecutionContext.GetLastListBlock<Journey, BatchDatesHeader>();
    if (lastBlock == null)
        return DateTime.UtcNow.Add(_configuration.FirstRunTimeSpan);
    else
        return lastBlock.Header.ToDate;
}


private void ProcessBlock(IListBlockContext<Journey, BatchDatesHeader> blockContext)
{
    try
    {
        blockContext.Start();

        foreach (var journeyItem in blockContext.GetItems(ItemStatus.Pending, ItemStatus.Failed))
            ProcessJourney(journeyItem);

        blockContext.Complete();
    }
    catch (Exception ex)
    {
        blockContext.Failed(ex.ToString());
    }
}

// -- Error handling --
// All code is wrapped in a try catch block, in the catch Failed(string errorMessage) is called and the exception
// is not thrown again in order to allow subsequent list block items to execute
// -- Discard --
// Often a data item does not meet a business rule and should be ignored. Taskling allows you to mark items as discarded.
// Items either get processed successfully, fail or get discarded.
private void ProcessJourney(IListBlockItem<Journey> journeyItem)
{
    try
    {
        if (journeyItem.Value.DepartureStation.Equals(journeyItem.Value.ArrivalStation))
        {
            journeyItem.Discarded("Discarded due to distance rule");
        }
        else
        {
            var insight = ExtractInsight(journeyItem.Value);
            _notificationService.NotifyUser(insight);
            journeyItem.Completed();
        }
    }
    catch(Exception ex)
    {
        journeyItem.Failed(ex.ToString());
    }
}

private TravelInsight ExtractInsight(Journey journey)
{
    TravelInsight insight = new TravelInsight();
    // insight extraction logic goes here

    return insight;
}

Some interesting things to note:

  • Each list item has its own status.
  • For any data items that need to be ignored due to some business rule, you can mark them as discarded with a reason attached.
  • If your data is too big to store in blocks you can store data identifiers in the list blocks instead and then retrieve the data while processing each item, as in the sketch below.
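
As a minimal sketch of that identifier-only approach, assuming a _travelDataService.GetJourney(id) method exists to load a single journey by its id:

C#
// Store only the journey ids in the list blocks (call this inside a critical section, as in the earlier example)
private IList<IListBlockContext<long, BatchDatesHeader>> GetJourneyIdBlocks(
    ITaskExecutionContext taskExecutionContext, IList<long> journeyIds, BatchDatesHeader batchHeader)
{
    short blockSize = 500;
    return taskExecutionContext.GetListBlocks<long, BatchDatesHeader>(
        x => x.WithPeriodicCommit(journeyIds, batchHeader, blockSize, BatchSize.Fifty));
}

// Load the full journey while processing each item
private void ProcessJourneyId(IListBlockItem<long> journeyIdItem)
{
    try
    {
        var journey = _travelDataService.GetJourney(journeyIdItem.Value); // assumed method, loads one journey by id
        var insight = ExtractInsight(journey);
        _notificationService.NotifyUser(insight);
        journeyIdItem.Completed();
    }
    catch (Exception ex)
    {
        journeyIdItem.Failed(ex.ToString());
    }
}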

Pattern #3 - Failure Recovery

Sometimes an application fails, an Azure job dies or an ASP.NET/WCF process gets recycled. Or a bug causes the batch process to fail and stop. Taskling provides a way of continuing the processing from where it left off.

When you ask the ITaskExecutionContext for blocks, be it date range, numeric range or list, it returns the data you pass into it as blocks. You can also configure Taskling to return previously failed blocks.

If you configure your ITasklingClient with:

  • RPC_FAIL[true] RPC_FAIL_MTS[600] RPC_FAIL_RTYL[3] you are telling Taskling to look for failed blocks that were created in the last 600 minutes, and have not been retried more than 3 times.
  • RPC_DEAD[true] RPC_DEAD_MTS[600] RPC_DEAD_RTYL[3] you are telling Taskling to look for dead blocks that were created in the last 600 minutes, and have not been retried more than 3 times.

Then when you call GetListBlocks, it returns the new and the old blocks in one list. But what is a dead block? A dead task/block is one that suffered a catastrophic failure and was unable to register its own demise. For example, someone pulls the power cord out of the server, or IIS kills your thread with a ThreadAbortException. In that case the status remains "In Progress", or, for blocks that were created but never started, "Pending", even though the task has died.

Taskling uses a keep alive (heartbeat) to register the fact that a task is still alive. Once a configured time has passed since the last keep alive and the status is still In Progress or Pending, Taskling knows that the task has really died.

To configure the keep alive we set KA[true] KAINT[1] KADT[10], which means: use the keep alive, send one every minute and treat the task as dead after 10 minutes without any keep alive being received.

Let's use the previous example, where we created a set of list blocks containing journeys, to illustrate how this works.

When reprocessing of failed and dead blocks is enabled, the following call may also return previously failed and dead blocks.

C#
return taskExecutionContext.GetListBlocks<Journey, BatchDatesHeader>(x => x.WithPeriodicCommit(journeys, batchHeader, blockSize, BatchSize.Fifty));

If we don't want to send the user notification again for journeys that were already processed successfully, then when iterating over the items we only ask for the Pending and Failed ones. A block being processed could contain new data or be an old block that failed; if it is new then all its items will be in the Pending state anyway.

C#
foreach (var journeyItem in blockContext.GetItems(ItemStatus.Pending, ItemStatus.Failed)) 
    ProcessJourney(journeyItem);

Once a block has reached the retry limit, or its creation date falls outside the 600 minutes configured in this example, it will not be retried again.

Pattern #4 - Alerts

All Taskling data is stored in seven tables that you can deploy to a central database server or to each application database that uses Taskling. Because Taskling maintains all the state information, block data and the configuration used for each task execution, we can write SQL queries to drive real-time alerts.

Examples are:

  • Failed/Dead blocks that reach their retry limit
  • Tasks that fail
  • Tasks that fail X times in Y minutes
  • Tasks that fail X times consecutively
  • Tasks that have not run in the past X minutes

There is example SQL on the GitHub wiki pages:

https://github.com/Vanlightly/Taskling.NET/wiki/Alerts-Database-Scripts

Conclusion

Taskling is a useful, open source library for implementing common batch processing patterns. You just need to include the Taskling and Taskling.SqlServer NuGet packages and run the table creation script and you're ready to go.

For more reading material, check out the wiki on the GitHub page:

https://github.com/Vanlightly/Taskling.NET

Sample Code

The TasklingTester solution accompanies this article and contains the source code we've covered. It also includes:

  • the SQL scripts necessary to generate the Taskling tables and the Journey and TravelInsight tables
  • a data generator script that simulates new data coming in
  • some useful queries to view the Taskling data that was generated

Read the readme file in the solution first for instructions. Basically, all you need to do is run one script, then run the application, and you can see it work.

License

This article, along with any associated source code and files, is licensed under The MIT License