Processing Collection in C# .NET using Multiple Threads

14 Nov 2014

Introduction

I was recently developing a new .NET (C#) solution that generates PDF reports from data in a relational database and emails them. I wanted a multi-threaded design in which one thread keeps writing the files to disk while another keeps processing a queue of items to be emailed.

Using the Code

The solution below uses classes from the System.Threading.Tasks namespace and the cancellation framework (CancellationTokenSource and CancellationToken), along with the BlockingCollection<T> class from the System.Collections.Concurrent namespace.

BlockingCollection<T> is a thread-safe collection that implements the producer/consumer pattern. Many threads can add to it concurrently, and its consuming enumerator blocks once every current item has been consumed, waiting for a new item until the collection is explicitly told that no more items will be added. It also provides a mechanism to cancel the iteration.
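
The blocking behaviour described above can be seen in isolation with a small sketch. The names ProducerConsumerSketch, Drain and Run are made up for this illustration and are not part of the article's solution; the sketch only assumes the default BlockingCollection<T>, which is backed by a ConcurrentQueue<T> and therefore hands items out in the order they were added.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the article's solution.
public static class ProducerConsumerSketch
{
    // Consumes items until the collection is marked complete and empty.
    // The foreach blocks while the collection is empty and exits only
    // after CompleteAdding() has been called and the items are drained.
    public static List<int> Drain(BlockingCollection<int> queue)
    {
        var seen = new List<int>();
        foreach (int item in queue.GetConsumingEnumerable())
        {
            seen.Add(item);
        }
        return seen;
    }

    public static List<int> Run()
    {
        using (var queue = new BlockingCollection<int>())
        {
            // Consumer: runs on a thread-pool thread.
            Task<List<int>> consumer = Task.Run(() => Drain(queue));

            // Producer: the calling thread adds items, then signals completion.
            for (int i = 1; i <= 3; i++)
            {
                queue.Add(i);
            }
            queue.CompleteAdding();

            // Result blocks until the consumer has finished.
            return consumer.Result;
        }
    }
}
```

Calling ProducerConsumerSketch.Run() should return the three items in insertion order. Without the CompleteAdding() call the consumer would wait forever, which is why the main program later in this article calls it once it has nothing more to add.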

The following code implements a generic class called BlockingCollectionProcessor<U>, which applies an action supplied to the class to each item in the collection.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public interface ICollectionActionProcessor { void PerformAction(object t); }

public class BlockingCollectionProcessor<U> : ICollectionActionProcessor
{
    private readonly BlockingCollection<U> bc;
    private readonly Action<U> ac;

    public BlockingCollectionProcessor(BlockingCollection<U> collection, Action<U> customAction)
    {
        this.bc = collection;
        this.ac = customAction;
    }

    /// <summary>
    /// Expects a TaskFactory, from which the method retrieves the
    /// CancellationToken. The parameter is typed as object so that the
    /// method can be passed to TaskFactory.StartNew with a state argument.
    /// </summary>
    /// <param name="t">The TaskFactory that started this task.</param>
    public void PerformAction(object t)
    {
        CancellationToken token = ((TaskFactory)t).CancellationToken;
        try
        {
            // Blocks while the collection is empty; the loop ends after
            // CompleteAdding() is called and the remaining items are consumed.
            foreach (U o in bc.GetConsumingEnumerable(token))
            {
                ac.Invoke(o);
            }
        }
        catch (OperationCanceledException)
        {
            // Handle or log the cancellation here, then rethrow so that
            // the task is reported as cancelled.
            token.ThrowIfCancellationRequested();
        }
        catch (Exception ex)
        {
            // BatchLogger is the project's own logging class (not shown).
            BatchLogger.WriteException(ex);
        }
    }
}

The interface can be implemented by any other processor that uses a different approach to process the collection and perform an action on its items. My particular use case required a class that sends emails using SMTP. The abstract class EmailSender is inherited by SMTPEmailSender, which consumes an object implementing the IEmailInfo interface to retrieve the recipients, the subject, the body and the attachment, which in my case is a PDF file.

using System;
using System.Net.Mail;
using System.Text;

public abstract class EmailSender
{
    public abstract void SendEmail(IEmailInfo info);
}

public class SMTPEmailSender : EmailSender
{
    private readonly string emailhost, emailaccount;
    private readonly int emailport;

    public SMTPEmailSender(string emailHost, int emailPort, string emailAccount)
    {
        this.emailaccount = emailAccount;
        this.emailhost = emailHost;
        this.emailport = emailPort;
    }

    public override void SendEmail(IEmailInfo info)
    {
        try
        {
            // SmtpClient, MailMessage and Attachment are all disposable;
            // the using blocks also close the attachment's stream.
            using (SmtpClient client = new SmtpClient(emailhost, emailport))
            using (MailMessage msg = new MailMessage())
            using (Attachment at = new Attachment(info.GetAttachmentFileName(),
                                                  info.GetAttachmentMimeType()))
            {
                // The second MailAddress argument is the display name.
                msg.From = new MailAddress(emailaccount, info.GetFromEmailId(), Encoding.UTF8);
                msg.To.Add(info.GetToEmailIds()); // accepts a comma-separated list
                msg.Subject = info.GetSubject();
                msg.BodyEncoding = Encoding.UTF8;
                msg.Body = info.GetEmailBody();
                msg.Attachments.Add(at);

                client.Send(msg);
            }
        }
        catch (Exception ex)
        {
            // Log the failure instead of swallowing it silently; the
            // consumer loop then carries on with the next item.
            BatchLogger.WriteException(ex);
        }
    }
}

public interface IEmailInfo
{
    string GetToEmailIds();
    string GetSubject();
    string GetAttachmentFileName();
    string GetAttachmentFileTypeExtension();
    string GetAttachmentMimeType();
    string GetFromEmailId();
    string GetEmailBody();
}

public class CaseEmailRecord : IEmailInfo
{
    private readonly string emailid, subject;
    private readonly string attachmentfilename, attachmentfiletypeextension, attachmentmimetype;
    private readonly string fromemailid, emailbody;

    public CaseEmailRecord(string emailID, string emailSubject,
        string attachmentFileName, string attachmentfileTypeExtension,
        string attachmentMimeType, string fromEmailID, string emailBody)
    {
        this.emailid = emailID;
        this.subject = emailSubject;
        this.attachmentfilename = attachmentFileName;
        this.attachmentfiletypeextension = attachmentfileTypeExtension;
        this.attachmentmimetype = attachmentMimeType;
        this.fromemailid = fromEmailID;
        this.emailbody = emailBody;
    }

    public string GetToEmailIds()
    {
        return this.emailid;
    }

    public string GetSubject()
    {
        return this.subject;
    }

    public string GetAttachmentFileName()
    {
        return this.attachmentfilename;
    }

    public string GetAttachmentFileTypeExtension()
    {
        return this.attachmentfiletypeextension;
    }

    public string GetAttachmentMimeType()
    {
        return this.attachmentmimetype;
    }

    public string GetFromEmailId()
    {
        return this.fromemailid;
    }

    public string GetEmailBody()
    {
        return this.emailbody;
    }
}
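
As noted earlier, ICollectionActionProcessor can be implemented by a processor that consumes the collection in a different way. The following is a minimal sketch of one such alternative, not code from my solution: PollingCollectionProcessor and PollingDemo are hypothetical names, and the sketch assumes the state object passed to PerformAction is a CancellationToken rather than a TaskFactory. Instead of blocking inside GetConsumingEnumerable, it polls with TryTake and a timeout, which leaves room for periodic housekeeping between items.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public interface ICollectionActionProcessor { void PerformAction(object t); }

// Hypothetical alternative processor: polls with TryTake and a timeout.
public class PollingCollectionProcessor<T> : ICollectionActionProcessor
{
    private readonly BlockingCollection<T> items;
    private readonly Action<T> action;
    private readonly int timeoutMs;

    public PollingCollectionProcessor(BlockingCollection<T> items, Action<T> action, int timeoutMs)
    {
        this.items = items;
        this.action = action;
        this.timeoutMs = timeoutMs;
    }

    // Assumption of this sketch: t is a boxed CancellationToken.
    public void PerformAction(object t)
    {
        CancellationToken token = (CancellationToken)t;

        // IsCompleted becomes true once CompleteAdding() has been called
        // and the collection is empty.
        while (!items.IsCompleted)
        {
            T item;
            // Returns false on timeout; throws OperationCanceledException
            // if the token is cancelled while waiting.
            if (items.TryTake(out item, timeoutMs, token))
            {
                action(item);
            }
        }
    }
}

public static class PollingDemo
{
    public static int Run()
    {
        int sum = 0;
        using (var items = new BlockingCollection<int>())
        {
            ICollectionActionProcessor proc =
                new PollingCollectionProcessor<int>(items, i => sum += i, 100);
            Task worker = Task.Run(() => proc.PerformAction(CancellationToken.None));

            items.Add(1);
            items.Add(2);
            items.Add(3);
            items.CompleteAdding();
            worker.Wait(); // all items are processed once the worker ends
        }
        return sum;
    }
}
```

Because both processors share the same interface, the calling code can switch between them without other changes, apart from the state object it passes in.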

The following code snippet sends the emails by adding items to the BlockingCollection on the main thread. The PerformAction method of the BlockingCollectionProcessor is started through a Task, so it executes on a separate thread.

We also create a cancellation token with the CancellationTokenSource class, which lets the main thread signal cancellation to the task. In our case the signal reaches the BlockingCollection itself, because we use the BlockingCollection<T>.GetConsumingEnumerable(CancellationToken) overload, which monitors the token. When cancellation is requested, the enumeration throws an OperationCanceledException, which we catch.

Finally, we call BlockingCollection<T>.CompleteAdding() to signal that no more items will be added. The foreach loop then ends once the remaining items have been consumed, which completes the Task. The main thread uses the static Task.WaitAll method, which accepts an array of Task objects and blocks until they have all finished. You will have to handle the OperationCanceledException there if you choose not to catch it within the BlockingCollectionProcessor (Task.WaitAll may surface it wrapped in an AggregateException). For brevity, this article does not explain the Task class in detail; it provides multiple mechanisms to execute code on a separate thread.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        BlockingCollection<CaseEmailRecord> bc = new BlockingCollection<CaseEmailRecord>();

        CaseEmailRecord rc1 = new CaseEmailRecord(
            "aa@aa.com,bb@bb.com", "Summary Report",
            @"C:\Temp\LCTest\000QT_146024.pdf", ".pdf",
            "application/pdf", "aa@aa.com", "Case Email");
        bc.Add(rc1);

        ICollectionActionProcessor bcproc =
            new BlockingCollectionProcessor<CaseEmailRecord>(
                bc, new SMTPEmailSender("<mail server>", 25, "<email account>").SendEmail);

        CancellationTokenSource cts = new CancellationTokenSource();
        cts.Token.Register(() =>
            Console.WriteLine("Cancellation of Email Processing Requested"));
        TaskFactory factory = new TaskFactory(cts.Token);
        // The factory is passed as the state object so that PerformAction
        // can read its CancellationToken.
        Task tsk1 = factory.StartNew(bcproc.PerformAction, factory);

        Thread.Sleep(5000);

        CaseEmailRecord rc2 = new CaseEmailRecord(
            "aa@aa.com,bb@bb.com", "Summary Report",
            @"C:\Temp\LCTest\000QT_146024.pdf", ".pdf",
            "application/pdf", "aa@aa.com", "Case Email");
        bc.Add(rc2);

        Thread.Sleep(5000);
        Console.WriteLine("Running main test process on Thread " +
            Thread.CurrentThread.ManagedThreadId);
        // cts.Cancel(); // Uncomment this line to test the cancellation
        //               // of the iteration by the BlockingCollection

        CaseEmailRecord rc3 = new CaseEmailRecord(
            "aa@aa.com,bb@bb.com", "Summary Report",
            @"C:\Temp\LCTest\000QT_146024.pdf", ".pdf",
            "application/pdf", "aa@aa.com", "Case Email");
        bc.Add(rc3);

        Console.WriteLine("Finished adding to blocking collection");
        // Tells the blocking collection that it should no longer wait for
        // more items to be added and should complete.
        bc.CompleteAdding();

        try
        {
            Task.WaitAll(new Task[] { tsk1 }, cts.Token);
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Email processing was cancelled");
        }
    }
}
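
The cancellation path can also be exercised without an SMTP server. The sketch below is a hypothetical stand-alone example (CancellationSketch is not part of my solution) that consumes strings instead of emails. It never calls CompleteAdding(), so the only way the consuming loop can end is through the OperationCanceledException raised by GetConsumingEnumerable(CancellationToken).

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the article's solution.
public static class CancellationSketch
{
    // Returns true if the consuming loop ended because of cancellation.
    public static bool Run()
    {
        using (var cts = new CancellationTokenSource())
        using (var queue = new BlockingCollection<string>())
        {
            Task<bool> consumer = Task.Run(() =>
            {
                try
                {
                    foreach (string s in queue.GetConsumingEnumerable(cts.Token))
                    {
                        Console.WriteLine("Processing " + s);
                    }
                    return false; // ended normally via CompleteAdding()
                }
                catch (OperationCanceledException)
                {
                    return true;  // ended because the token was cancelled
                }
            });

            queue.Add("first");
            cts.Cancel();           // CompleteAdding() is never called
            return consumer.Result; // waits for the consumer to observe it
        }
    }
}
```

Whether the "first" item is processed before the cancellation is observed depends on thread timing, so the sketch makes no promise about that; it only shows that the loop ends with an OperationCanceledException. Note also that Task.WaitAll(tasks, token), as used in the main program, itself throws an OperationCanceledException if the token is cancelled while it is waiting.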

Points of Interest

Multi-threaded programming can be a confusing and difficult subject, but the .NET Framework provides classes such as BlockingCollection<T>, Task and CancellationTokenSource that simplify it considerably. The framework also provides several other mechanisms for executing code asynchronously.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.
