Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / Languages / C#

How to Use TPL Dataflow for Reading Files and Inserting to Database

5.00/5 (4 votes)
8 Dec 2017CPOL1 min read 13.8K  
How to use TPL dataflow for reading files and inserting to database

Introduction

A very common scenario in applications is to read a number of files (e.g., .doc and .pdf files) and insert them to database. This article will explain and show how to use TPL Dataflow to create a pipeline for this work.

TPL Dataflow in my opinion is a very useful library which makes producer consumer pattern very easy and helps get rid of most synchronization primitives.

As there are few samples on this topic, I have decided to put up this sample.

Using the Code

Few methods I have omitted out like BulkInsertResumes which take IEnumerable<Resume> as input and return BulkImportResumeResult. BulkImportResumeResult is a class having 2 Lists as properties for holding inserted resumes to SQL, and another holds all the resumes which failed while inserting.

Dataflow_BulkImportResumes method takes IEnumerable<string> as input, user can call this method by passing a folder or multiple folders where either .doc or .pdf files are located.

This method creates a pipeline using TPD dataflow. First block is a TransformMany block which takes a string (folder) as input and returns multiple file URLs as string. First block is connected with 2 blocks, i.e., Pdf block and word block for reading pdf and word files. Since I wanted to use bulk insert to SQL, therefore pdf and word blocks are connected to batch block with batch size of 50. Batch block connects with insertosqlblock which does inserting to SQL. insertosqlblock is a Transform block which returns ImportResult as output. This block is connected to last block which performs indexing using lucene.Net.

C#
public async Task Dataflow_BulkImportResumes(IEnumerable<string> folders)
        {            
            var firstBlock = new TransformManyBlock<string, string>(input =>
            {
                var files = new List<string>();
                files.AddRange(Directory.EnumerateFiles(input, "*.doc", SearchOption.AllDirectories));
                files.AddRange(Directory.EnumerateFiles(input, "*.pdf", SearchOption.AllDirectories));
                files.RemoveAll(x =>
                    (Path.GetFileName(x).StartsWith("~") &&
                        (x.EndsWith("doc", StringComparison.OrdinalIgnoreCase) ||
                        x.EndsWith("docx", StringComparison.OrdinalIgnoreCase))
                    ));

                outputOfTransformMany.AddRange(files.Distinct().ToList());
                return files.Distinct().ToList();
            });
            
            var pdfBlock = new TransformBlock<string, Resume>(file =>
            {
                var resume = new Resume();
                resume.Name = System.IO.Path.GetFileNameWithoutExtension(file);
                resume.Url = System.IO.Path.GetFileName(file);
                resume.DataInPlainText = PDFDocumentHelper.Read(file);
                resume.DateUploaded = System.IO.File.GetLastWriteTime(file).Date;
                return resume;
            });

            var wordBlock = new TransformBlock<string, Resume>(file =>
            {
                var resume = new Resume();
                resume.Name = System.IO.Path.GetFileNameWithoutExtension(file);
                resume.Url = System.IO.Path.GetFileName(file);
                resume.DataInPlainText = WordDocumentHelper.Read(file);
                resume.DateUploaded = System.IO.File.GetLastWriteTime(file).Date;
                return resume;
            }, new ExecutionDataflowBlockOptions 
                      { MaxDegreeOfParallelism = Environment.ProcessorCount });
            
            var batchBlock = new BatchBlock<Resume>(50);
            var insertToSqlBlock = new TransformBlock<IEnumerable<Resume>, BulkImportResumeResult>(x =>
            {
                return BulkInsertResumes(x);
            });

            var importResult = new List<BulkImportResumeResult>();
            var lastBlock = new ActionBlock<BulkImportResumeResult>(x =>
            {
                //build index using lucene
                luceneService.BuildIndex(x.ImportedResumes);
                importResult.Add(x);
            });

            var options = new DataflowLinkOptions { PropagateCompletion = true };
            firstBlock.LinkTo(pdfBlock, options, x => x.ToLower().EndsWith(".pdf"));
            firstBlock.LinkTo(wordBlock, options, 
                    x => x.ToLower().EndsWith(".doc") | x.ToLower().EndsWith(".docx"));

            pdfBlock.LinkTo(batchBlock);
            wordBlock.LinkTo(batchBlock);

            batchBlock.LinkTo(insertToSqlBlock, options);
            insertToSqlBlock.LinkTo(lastBlock, options);

            foreach (var item in folders)
            {
                firstBlock.Post(item);
            }

            firstBlock.Complete();

            await Task.WhenAll(pdfBlock.Completion, wordBlock.Completion)
                .ContinueWith(x => batchBlock.Complete());

            await lastBlock.Completion;

            wordBlock.Completion.ContinueWith(t =>
            {
                logService.Info("word block faulted");
            }, TaskContinuationOptions.OnlyOnFaulted);

            pdfBlock.Completion.ContinueWith(t =>
            {
                logService.Info("pdf block faulted");
            }, TaskContinuationOptions.OnlyOnFaulted);            
        }

Suggestions and improvements are most welcome.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)