Introduction
I was recently developing a new .NET (C#) solution that generates PDF reports from data in a relational database and emails them. I wanted a multi-threaded design in which one thread keeps writing the files to disk while another keeps processing a queue of items to be emailed out.
Using the Code
The solution below uses classes from the System.Threading.Tasks namespace and the cancellation framework (CancellationTokenSource and CancellationToken in System.Threading), along with the BlockingCollection<T> class from the System.Collections.Concurrent namespace.
The BlockingCollection<T> class is thread safe and implements the producer/consumer pattern: many threads can write to the collection concurrently, and it provides an iterator that blocks once all current items have been consumed and waits for the next item to be added. The iteration ends only when the collection has been explicitly told that no more items will be added. The class also provides a mechanism to cancel the iteration.
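The blocking behaviour described above can be seen in isolation with a minimal sketch. This is not part of the original solution: the class name BlockingCollectionDemo and the method Run are made up for illustration, and plain integers stand in for the email records.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the article's solution.
public static class BlockingCollectionDemo
{
    // Produces the numbers 1..count on the calling thread and consumes them
    // on a separate task. Returns the items in the order they were consumed.
    public static List<int> Run(int count)
    {
        var received = new List<int>();
        using (var queue = new BlockingCollection<int>())
        {
            // Consumer: the enumerator blocks while the collection is empty and
            // ends once CompleteAdding() has been called and the queue is drained.
            Task consumer = Task.Factory.StartNew(() =>
            {
                foreach (int item in queue.GetConsumingEnumerable())
                {
                    received.Add(item);
                }
            });

            // Producer: add the items, then signal that no more will arrive.
            for (int i = 1; i <= count; i++)
            {
                queue.Add(i);
            }
            queue.CompleteAdding();

            // Wait for the consumer before reading the list it filled.
            consumer.Wait();
        }
        return received;
    }
}
```

Calling BlockingCollectionDemo.Run(5) from a Main method returns the five numbers in the order they were added, because a BlockingCollection<T> created with the default constructor is backed by a first-in, first-out queue and this sketch has a single producer and a single consumer.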
The following code implements a generic class called BlockingCollectionProcessor, which applies an action supplied to the class to each item in the collection.
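using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public interface ICollectionActionProcessor
{
    void PerformAction(object t);
}

public class BlockingCollectionProcessor<U> : ICollectionActionProcessor
{
    private readonly BlockingCollection<U> bc;
    private readonly Action<U> ac;

    public BlockingCollectionProcessor(BlockingCollection<U> collection, Action<U> customAction)
    {
        this.bc = collection;
        this.ac = customAction;
    }

    // The state object must be the TaskFactory that started the task;
    // its CancellationToken is used to cancel the consuming loop.
    public void PerformAction(object t)
    {
        TaskFactory factory = (TaskFactory)t;
        try
        {
            // Blocks while the collection is empty; ends after CompleteAdding().
            foreach (U o in bc.GetConsumingEnumerable(factory.CancellationToken))
            {
                ac.Invoke(o);
            }
        }
        catch (OperationCanceledException)
        {
            // Rethrow against the factory's token so the caller sees the cancellation.
            factory.CancellationToken.ThrowIfCancellationRequested();
        }
        catch (Exception ex)
        {
            // BatchLogger is the solution's own logging helper (not shown here).
            BatchLogger.WriteException(ex);
        }
    }
}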
The interface can be implemented by any other processor that uses a different approach to process the collection and perform a specific action on its items. My particular use case required a class that sends emails using SMTP. The abstract class EmailSender is inherited by SMTPEmailSender, which consumes an object implementing the IEmailInfo interface to retrieve the recipients, the body and the attachment, which in my case is a PDF file.
using System;
using System.Net.Mail;
using System.Text;

public abstract class EmailSender
{
    public abstract void SendEmail(IEmailInfo objectParam);
}

public class SMTPEmailSender : EmailSender
{
    private readonly string emailhost, emailaccount;
    private readonly int emailport;

    public SMTPEmailSender(string emailHost, int emailPort, string emailAccount)
    {
        this.emailaccount = emailAccount;
        this.emailhost = emailHost;
        this.emailport = emailPort;
    }

    public override void SendEmail(IEmailInfo objectParam)
    {
        try
        {
            IEmailInfo info = objectParam;
            // The using blocks release the connection, the message and the attachment file.
            using (SmtpClient client = new SmtpClient(emailhost, emailport))
            using (MailMessage msg = new MailMessage())
            using (Attachment at = new Attachment(info.GetAttachmentFileName(),
                info.GetAttachmentMimeType()))
            {
                // The sending account is the address; GetFromEmailId() supplies the display name.
                msg.From = new MailAddress(emailaccount, info.GetFromEmailId(), Encoding.UTF8);
                msg.To.Add(info.GetToEmailIds()); // accepts a comma-separated list
                msg.Subject = info.GetSubject();  // the original listing never set the subject
                msg.BodyEncoding = Encoding.UTF8;
                msg.Body = info.GetEmailBody();
                msg.Attachments.Add(at);
                client.Send(msg);
            }
        }
        catch (Exception ex)
        {
            // Log instead of silently swallowing the failure.
            // BatchLogger is the solution's own logging helper (not shown here).
            BatchLogger.WriteException(ex);
        }
    }
}
public interface IEmailInfo
{
    string GetToEmailIds();
    string GetSubject();
    string GetAttachmentFileName();
    string GetAttachmentFileTypeExtension();
    string GetAttachmentMimeType();
    string GetFromEmailId();
    string GetEmailBody();
}

public class CaseEmailRecord : IEmailInfo
{
    private readonly string emailid, subject,
        attachmentfilename, attachmentfiletypeextension, attachmentmimetype;
    private readonly string fromemailid, emailbody;

    public CaseEmailRecord(string emailID, string emailSubject,
        string attachmentFileName, string attachmentfileTypeExtension,
        string attachmentMimeType, string fromEmailID, string emailBody)
    {
        this.emailid = emailID;
        this.subject = emailSubject;
        this.attachmentfilename = attachmentFileName;
        this.attachmentfiletypeextension = attachmentfileTypeExtension;
        this.attachmentmimetype = attachmentMimeType;
        this.fromemailid = fromEmailID;
        this.emailbody = emailBody;
    }

    public string GetToEmailIds()
    {
        return this.emailid;
    }

    public string GetSubject()
    {
        return this.subject;
    }

    public string GetAttachmentFileName()
    {
        return this.attachmentfilename;
    }

    public string GetAttachmentFileTypeExtension()
    {
        return this.attachmentfiletypeextension;
    }

    public string GetAttachmentMimeType()
    {
        return this.attachmentmimetype;
    }

    public string GetFromEmailId()
    {
        return this.fromemailid;
    }

    public string GetEmailBody()
    {
        return this.emailbody;
    }
}
The following code snippet sends the emails by adding items to the BlockingCollection on the main thread. The PerformAction method of the BlockingCollectionProcessor is started through a Task, so it executes on a separate thread. We also create a cancellation token with the CancellationTokenSource class, which lets the main thread signal cancellation to the task and, in our case, to the BlockingCollection. The overload BlockingCollection<T>.GetConsumingEnumerable(CancellationToken) monitors the token and, when a cancellation is requested, throws an OperationCanceledException, which we catch.
Finally, we call BlockingCollection<T>.CompleteAdding() to signal that no more items will be added. Once the remaining items have been consumed, the foreach loop ends and the Task completes. In the main thread, we use the static method Task.WaitAll, which accepts an array of Task objects and blocks until all of them have finished, so the main thread does not exit early. You will have to catch the OperationCanceledException there if you choose not to catch it within the BlockingCollectionProcessor. For brevity, this article does not explain the Task class in detail; it provides multiple mechanisms to execute code on a separate thread.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        BlockingCollection<CaseEmailRecord> bc = new BlockingCollection<CaseEmailRecord>();
        CaseEmailRecord rc1 = new CaseEmailRecord(
            "aa@aa.com,bb@bb.com", "Summary Report",
            @"C:\Temp\LCTest\000QT_146024.pdf", ".pdf",
            "application/pdf", "aa@aa.com", "Case Email");
        bc.Add(rc1);

        ICollectionActionProcessor bcproc =
            new BlockingCollectionProcessor<CaseEmailRecord>(
                bc, new SMTPEmailSender("<mail server>", 25, "<email account>").SendEmail);

        CancellationTokenSource cts = new CancellationTokenSource();
        cts.Token.Register(() =>
            { Console.WriteLine("Cancellation of Email Processing Requested"); });

        // The factory is passed as the task state so PerformAction can read its token.
        TaskFactory factory = new TaskFactory(cts.Token);
        Task tsk1 = factory.StartNew(bcproc.PerformAction, factory);

        Thread.Sleep(5000);
        CaseEmailRecord rc2 = new CaseEmailRecord(
            "aa@aa.com,bb@bb.com", "Summary Report",
            @"C:\Temp\LCTest\000QT_146024.pdf", ".pdf",
            "application/pdf", "aa@aa.com", "Case Email");
        bc.Add(rc2); // the original listing created rc2 and rc3 but never queued them

        Thread.Sleep(5000);
        Console.WriteLine("Running main test process on Thread " +
            Thread.CurrentThread.ManagedThreadId);
        CaseEmailRecord rc3 = new CaseEmailRecord(
            "aa@aa.com,bb@bb.com", "Summary Report",
            @"C:\Temp\LCTest\000QT_146024.pdf", ".pdf",
            "application/pdf", "aa@aa.com", "Case Email");
        bc.Add(rc3);

        Console.WriteLine("Finished adding to blocking collection");
        bc.CompleteAdding();
        try
        {
            Task.WaitAll(new Task[] { tsk1 }, cts.Token);
        }
        catch (OperationCanceledException)
        {
            // Reached only if cts.Cancel() was called while waiting.
            Console.WriteLine("Email processing was cancelled");
        }
    }
}
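The program above never actually calls Cancel on the CancellationTokenSource, so the cancellation path is not exercised. A minimal sketch of that path follows. The names CancellationDemo and ConsumerSeesCancellation are made up for illustration, and unlike BlockingCollectionProcessor the consumer here swallows the exception instead of rethrowing it, so that the outcome can be returned as a value.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the article's solution.
public static class CancellationDemo
{
    // Starts a consumer on a collection that never receives items, requests
    // cancellation, and reports whether the consumer observed an
    // OperationCanceledException.
    public static bool ConsumerSeesCancellation()
    {
        using (var cts = new CancellationTokenSource())
        using (var queue = new BlockingCollection<string>())
        {
            Task<bool> consumer = Task.Factory.StartNew(() =>
            {
                try
                {
                    // Nothing is ever added, so the enumerator blocks until
                    // the token is cancelled.
                    foreach (string item in queue.GetConsumingEnumerable(cts.Token))
                    {
                        Console.WriteLine(item);
                    }
                    return false; // reached only if CompleteAdding() ends the loop
                }
                catch (OperationCanceledException)
                {
                    return true; // cancellation interrupted the enumeration
                }
            });

            // Signal cancellation from the main thread, then wait for the result.
            cts.Cancel();
            return consumer.Result;
        }
    }
}
```

In this sketch the token is passed only to GetConsumingEnumerable and not to the task itself, so the task always starts and the exception is always raised inside the loop, whether Cancel runs before or after the consumer begins waiting.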
Points of Interest
Multi-threaded programming can be a confusing and difficult subject, but the .NET Framework provides classes such as BlockingCollection<T>, Task and CancellationTokenSource that take care of the queueing, threading and cancellation plumbing. The framework also provides several other mechanisms for executing code asynchronously.