Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles
(untagged)

Using Pipeline from Parallel Extension Extras

0.00/5 (No votes)
2 Oct 2013 2  
The combined use of Pipeline and NullObject patterns greatly simplifies data processing

Introduction

This is a second article in the series on Parallel Extensions Extras.

Part 1. ObjectPool<T>

Background 

Quite often a need arises, to process large streams of data without putting to much pressure on the system where the process is running. The Pipeline extension available in Parallel Extensions Extras, greatly simplifies the process of data processing in multiple steps on multiple streams.

Using the code 

For testing purposes, I have created a table in my database with one million random records.

CREATE TABLE [dbo].[Invoices](
	[Id] [bigint] IDENTITY(1,1) NOT NULL,
	[Number] [nvarchar](100) NOT NULL,
	[DueDate] [date] NOT NULL,
	[IssuedDate] [date] NOT NULL,
	[Amount] [money] NOT NULL
) ON [PRIMARY] 

It ended up being just shy of a 100MB in storage requirements, no indexing have been applied. 

As always, lets start with the sample code: 

//
namespace ConsoleApplication1
{
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Data;
    using System.Data.SqlClient;
    using System.Diagnostics;
    using System.Linq;
    using System.Threading;

    internal class Program
    {
        #region Static Fields

        private static readonly ObjectPool<SqlConnection> ObjectPool = 
          new ObjectPool<SqlConnection>(() => 
          new SqlConnection(SqlConnectionStringBuilder.ConnectionString));

        private static readonly SqlConnectionStringBuilder 
          SqlConnectionStringBuilder = new SqlConnectionStringBuilder
                        {
                            DataSource = "server",
                            UserID = "user",
                            Password = "user",
                            IntegratedSecurity = false,
                            InitialCatalog = "database"
                        };

        #endregion

        #region Methods

        private static IEnumerable<IDataRecord> GetRows(string sql)
        {
            SqlConnection connection = ObjectPool.GetObject();
            using (var command = new SqlCommand(sql, connection))
            {
                if (connection.State == ConnectionState.Closed)
                {
                    connection.Open();
                }
                using (SqlDataReader reader = command.ExecuteReader())
                {
                    foreach (IDataRecord record in reader)
                    {
                        yield return record;
                    }
                }
            }
            ObjectPool.PutObject(connection);
        }

        private static void Main(string[] args)
        {
            var sw = new Stopwatch();
            Action timeStamp = () => Console.WriteLine(
              "{0}ms elapsed", sw.ElapsedMilliseconds);
            sw.Start();
            IEnumerable<Invoice> generator = 
              GetRows("SELECT * From Invoices").Select(
                r => new Invoice
                     {
                         Id = Convert.ToInt64(r["Id"]),
                         Number = Convert.ToString(r["Number"]),
                         DueDate = Convert.ToDateTime(r["DueDate"]),
                         IssuedOn = Convert.ToDateTime(r["IssuedDate"]),
                         Amount = Convert.ToDecimal(r["Amount"])
                     });
            DateTime startDate = DateTime.Today.AddDays(-60);
            const int DegreeOfParallelism = 1;
            Pipeline<Invoice, Invoice> pipeline =
                Pipeline.Create<Invoice, Invoice>(invoice => 
                    invoice.IssuedOn >= startDate ? invoice : Invoice.Null, DegreeOfParallelism)
                    .Next(invoice => invoice.IsPastDue ? invoice : Invoice.Null, DegreeOfParallelism);
            decimal total = pipeline.Process(generator).AsParallel().Where(
              w => !w.Equals(Invoice.Null)).Sum(s => s.Amount);
            Console.WriteLine("Total for all outstanding invoices is {0:N2}", total);
            timeStamp.Invoke();
        }

        #endregion

        internal class Invoice
        {
            #region Static Fields

            public static readonly Invoice Null = new NullInvoice();

            #endregion

            #region Public Properties

            public decimal Amount { get; set; }
            public DateTime DueDate { get; set; }
            public long Id { get; set; }

            public virtual bool IsPastDue
            {
                get { return this.DueDate < DateTime.Today; }
            }

            public DateTime IssuedOn { get; set; }
            public string Number { get; set; }

            #endregion

            private class NullInvoice : Invoice
            {
                #region Public Properties

                public override bool IsPastDue
                {
                    get { return false; }
                }

                #endregion
            }
        }
    }
}
//

So in this example, we are looking for all invoices, which have been issued less than 60 days ago, and are past due. The past due decision is simply available by comparing the Due Date, with today's date. I am also using a NullObject pattern, by exposing an instance of a private class, thus ensuring that only one such instance will be created for the entire life of the application. This allows me to avoid checking for null objects, which the creator of the null type calls "a Billion Dollar mistake". 

The Pipeline class allows me to specify various degrees of parallelism for each step, in this example I have decided to use the same number of parallel threads for each of them. 

The big advantage of the Pipeline class is that it too follows the Producer / Consumer pattern, which means that at no point in time, all 1M records were retrieved from the database into memory. The generator was simply iterating sequentially through all the records in the database, which were passed to the first step of the pipeline as soon as a thread became available, then to the next step, and finally to the parallel summation call. So memory usage stayed flat, while CPU were cranking the data.

On an 8 core workstation, with degree of parallelism set at 1, which actually created two concurrent pipeline threads, one for each of the steps, the entire program took 4819 milliseconds to complete. With four threads each, it took 4905 ms. The shortest execution time was 4701 ms with two threads per step. Also, replacing the parallel summation with sequential one, reduced the run-time by another 10%. This demonstrates the importance of considering all factors for parallel processing. Sometime, more is actually less.  

History

  • 10/2/13 Initial version 

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.

A list of licenses authors might use can be found here