Introduction
A very common scenario in applications is to read a number of files (e.g., .doc and .pdf files) and insert them into a database. This article explains how to use TPL Dataflow to build a pipeline for this work.
In my opinion, TPL Dataflow is a very useful library: it makes the producer-consumer pattern easy to implement and removes the need for most synchronization primitives.
As there are few samples on this topic, I have decided to put up this sample.
Using the Code
I have omitted a few methods, such as BulkInsertResumes, which takes an IEnumerable<Resume> as input and returns a BulkImportResumeResult. BulkImportResumeResult is a class with two List properties: one holds the resumes that were inserted into SQL, and the other holds the resumes that failed to insert.
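Since these types are omitted, here is a minimal sketch of what they might look like, so the listing further down can be read with concrete shapes in mind. Only Name, Url, DataInPlainText, DateUploaded and ImportedResumes appear in the pipeline code; the FailedResumes name, the ResumeImporter class and the body of BulkInsertResumes are assumptions made for illustration, with the SQL call replaced by a trivial stand-in.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical shape: only the members used by the pipeline are shown.
public class Resume
{
    public string Name { get; set; } = "";
    public string Url { get; set; } = "";
    public string DataInPlainText { get; set; } = "";
    public DateTime DateUploaded { get; set; }
}

public class BulkImportResumeResult
{
    // Resumes that were written to SQL successfully.
    public List<Resume> ImportedResumes { get; } = new List<Resume>();

    // Resumes that failed to insert (the property name is an assumption).
    public List<Resume> FailedResumes { get; } = new List<Resume>();
}

public static class ResumeImporter
{
    // Stand-in for the omitted BulkInsertResumes. A real version would run a
    // SQL bulk insert; this one only sorts resumes into the two lists.
    public static BulkImportResumeResult BulkInsertResumes(IEnumerable<Resume> resumes)
    {
        var result = new BulkImportResumeResult();
        foreach (var resume in resumes)
        {
            // Assumption for the sketch: a resume with no extracted text "fails".
            if (string.IsNullOrWhiteSpace(resume.DataInPlainText))
                result.FailedResumes.Add(resume);
            else
                result.ImportedResumes.Add(resume);
        }
        return result;
    }
}
```

Returning both lists from one call lets the next stage index only the resumes that were actually stored.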
The Dataflow_BulkImportResumes method takes an IEnumerable<string> as input; the caller passes one or more folders that contain .doc or .pdf files.
The method builds a pipeline using TPL Dataflow. The first block is a TransformManyBlock that takes a string (a folder) as input and returns the paths of the files found in it as strings. The first block is linked to two blocks, a PDF block and a Word block, which read PDF and Word files respectively. Because I wanted to use bulk inserts into SQL, both reader blocks are linked to a BatchBlock with a batch size of 50. The batch block is linked to insertToSqlBlock, which performs the insert. insertToSqlBlock is a TransformBlock that returns a BulkImportResumeResult as output. It is linked to the last block, which performs indexing using Lucene.Net.
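Before the full listing, the shape of this pipeline can be tried in isolation. The sketch below is not the article's code: it replaces file access with plain strings (each "folder" is a comma-separated list of file names) so that it runs anywhere. PipelineSketch, RunAsync and the string prefixes are hypothetical names. It assumes the System.Threading.Tasks.Dataflow library is available (a NuGet package on .NET Framework).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PipelineSketch
{
    // Miniature topology: fan out by predicate, fan in to a batch block.
    public static async Task<List<string[]>> RunAsync(IEnumerable<string> folders, int batchSize)
    {
        // Stand-in for directory enumeration: split the "folder" into file names.
        var listFiles = new TransformManyBlock<string, string>(folder => folder.Split(','));

        // Stand-ins for the PDF and Word readers.
        var pdfBlock = new TransformBlock<string, string>(f => "pdf:" + f);
        var wordBlock = new TransformBlock<string, string>(f => "word:" + f);
        var batchBlock = new BatchBlock<string>(batchSize);

        var batches = new List<string[]>();
        // ActionBlock processes one message at a time by default, so the list is safe here.
        var sink = new ActionBlock<string[]>(batch => batches.Add(batch));

        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        listFiles.LinkTo(pdfBlock, propagate,
            f => f.EndsWith(".pdf", StringComparison.OrdinalIgnoreCase));
        listFiles.LinkTo(wordBlock, propagate,
            f => f.EndsWith(".doc", StringComparison.OrdinalIgnoreCase));
        // Anything that matches neither predicate is discarded.
        listFiles.LinkTo(DataflowBlock.NullTarget<string>());

        // Two sources feed the batch block, so completion is signalled by hand below.
        pdfBlock.LinkTo(batchBlock);
        wordBlock.LinkTo(batchBlock);
        batchBlock.LinkTo(sink, propagate);

        foreach (var folder in folders) listFiles.Post(folder);
        listFiles.Complete();

        // Complete the batch block only when both readers are done;
        // a partial final batch is emitted at that point.
        await Task.WhenAll(pdfBlock.Completion, wordBlock.Completion);
        batchBlock.Complete();
        await sink.Completion;
        return batches;
    }
}
```

Completion is not propagated on the two links into the batch block because, with PropagateCompletion set on both, whichever reader finished first would complete the batch block while the other was still producing.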
public async Task Dataflow_BulkImportResumes(IEnumerable<string> folders)
{
    // Stage 1: expand each folder into the Word and PDF files it contains.
    var firstBlock = new TransformManyBlock<string, string>(input =>
    {
        var files = new List<string>();
        files.AddRange(Directory.EnumerateFiles(input, "*.doc", SearchOption.AllDirectories));
        // On .NET Framework "*.doc" also matches .docx; the explicit pattern covers
        // .NET Core and later, and Distinct() below removes any duplicates.
        files.AddRange(Directory.EnumerateFiles(input, "*.docx", SearchOption.AllDirectories));
        files.AddRange(Directory.EnumerateFiles(input, "*.pdf", SearchOption.AllDirectories));
        // Skip the temporary "~" files that Word creates next to open documents.
        files.RemoveAll(x =>
            Path.GetFileName(x).StartsWith("~") &&
            (x.EndsWith("doc", StringComparison.OrdinalIgnoreCase) ||
             x.EndsWith("docx", StringComparison.OrdinalIgnoreCase)));
        return files.Distinct().ToList();
    });

    // Stage 2a: read PDF files.
    var pdfBlock = new TransformBlock<string, Resume>(file =>
    {
        var resume = new Resume();
        resume.Name = System.IO.Path.GetFileNameWithoutExtension(file);
        resume.Url = System.IO.Path.GetFileName(file);
        resume.DataInPlainText = PDFDocumentHelper.Read(file);
        resume.DateUploaded = System.IO.File.GetLastWriteTime(file).Date;
        return resume;
    });

    // Stage 2b: read Word files, several at a time.
    var wordBlock = new TransformBlock<string, Resume>(file =>
    {
        var resume = new Resume();
        resume.Name = System.IO.Path.GetFileNameWithoutExtension(file);
        resume.Url = System.IO.Path.GetFileName(file);
        resume.DataInPlainText = WordDocumentHelper.Read(file);
        resume.DateUploaded = System.IO.File.GetLastWriteTime(file).Date;
        return resume;
    }, new ExecutionDataflowBlockOptions
    { MaxDegreeOfParallelism = Environment.ProcessorCount });

    // Stage 3: group resumes into batches of 50 for the bulk insert.
    var batchBlock = new BatchBlock<Resume>(50);

    // Stage 4: bulk insert each batch into SQL.
    var insertToSqlBlock = new TransformBlock<IEnumerable<Resume>, BulkImportResumeResult>(x =>
    {
        return BulkInsertResumes(x);
    });

    // Stage 5: index the inserted resumes with Lucene.Net.
    // ActionBlock runs one message at a time by default, so the list needs no lock.
    var importResult = new List<BulkImportResumeResult>();
    var lastBlock = new ActionBlock<BulkImportResumeResult>(x =>
    {
        luceneService.BuildIndex(x.ImportedResumes);
        importResult.Add(x);
    });

    var options = new DataflowLinkOptions { PropagateCompletion = true };
    firstBlock.LinkTo(pdfBlock, options,
        x => x.EndsWith(".pdf", StringComparison.OrdinalIgnoreCase));
    firstBlock.LinkTo(wordBlock, options,
        x => x.EndsWith(".doc", StringComparison.OrdinalIgnoreCase) ||
             x.EndsWith(".docx", StringComparison.OrdinalIgnoreCase));
    // Discard anything that matches neither filter; an unmatched message would
    // otherwise stay at the head of firstBlock's output and stall the pipeline.
    firstBlock.LinkTo(DataflowBlock.NullTarget<string>());

    // batchBlock has two sources, so its completion is signalled manually below.
    pdfBlock.LinkTo(batchBlock);
    wordBlock.LinkTo(batchBlock);
    batchBlock.LinkTo(insertToSqlBlock, options);
    insertToSqlBlock.LinkTo(lastBlock, options);

    // Register fault logging before awaiting, so it is in place even if a later await throws.
    wordBlock.Completion.ContinueWith(t =>
    {
        logService.Info("word block faulted");
    }, TaskContinuationOptions.OnlyOnFaulted);
    pdfBlock.Completion.ContinueWith(t =>
    {
        logService.Info("pdf block faulted");
    }, TaskContinuationOptions.OnlyOnFaulted);

    foreach (var item in folders)
    {
        firstBlock.Post(item);
    }
    firstBlock.Complete();

    // Complete the batch block once both readers have finished (or faulted).
    await Task.WhenAll(pdfBlock.Completion, wordBlock.Completion)
        .ContinueWith(x => batchBlock.Complete());
    await lastBlock.Completion;
}
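One thing to keep in mind with the reader blocks: an unhandled exception inside a block's delegate faults the whole block, so a single unreadable file can stop an entire branch. A possible alternative, sketched here under assumptions, is to catch per-file errors inside the delegate and emit nothing for the file that failed. FaultSketch, RunAsync and Read are hypothetical names; Read stands in for the document helpers and throws for any name containing "corrupt".

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FaultSketch
{
    // Hypothetical reader standing in for the PDF and Word helpers.
    private static string Read(string file)
    {
        if (file.Contains("corrupt"))
            throw new InvalidOperationException("Cannot read " + file);
        return file.ToUpperInvariant();
    }

    public static async Task<(List<string> Parsed, List<string> Failed)> RunAsync(
        IEnumerable<string> files)
    {
        // The delegate may run on several threads, so failures go into a concurrent queue.
        var failed = new ConcurrentQueue<string>();
        var parsed = new List<string>();

        // TransformManyBlock lets the delegate return zero items for a file that fails,
        // instead of throwing and faulting the block.
        var readBlock = new TransformManyBlock<string, string>(file =>
        {
            try
            {
                return new[] { Read(file) };
            }
            catch (Exception)
            {
                failed.Enqueue(file);
                return Array.Empty<string>();
            }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount });

        // Default ActionBlock parallelism is 1, so a plain List is sufficient here.
        var sink = new ActionBlock<string>(text => parsed.Add(text));
        readBlock.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var file in files) readBlock.Post(file);
        readBlock.Complete();
        await sink.Completion;

        return (parsed, new List<string>(failed));
    }
}
```

With this approach the failed files can be reported alongside the insert failures, rather than only logging that a block faulted.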
Suggestions and improvements are most welcome.