I recently needed to write an application that would loop through a queue of files and FTP them to our Content Delivery Network for streaming. Users upload files, and our administrators can mark some of them as urgent. Urgent files need to jump to the front of the queue; otherwise, everything is ordered by broadcast date. My initial code was basically a loop that looked something like this:
while (GetFtpQueue().Count > 0)
{
    // take the next file off the queue and upload it, one at a time
}
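The ordering itself isn't the interesting part; conceptually GetFtpQueue just sorts with urgent items first, something like the sketch below (IsUrgent and BroadcastDate are illustrative property names I'm assuming here, not necessarily what's in the real schema):

using System.Collections.Generic;
using System.Linq;

// Illustrative only: urgent items jump the queue, everything else
// sorts by broadcast date.
static IEnumerable<FtpItem> OrderQueue(IEnumerable<FtpItem> items)
{
    return items
        .OrderByDescending(f => f.IsUrgent)  // true sorts before false
        .ThenBy(f => f.BroadcastDate);       // earliest broadcast date first
}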
It worked beautifully while we were just uploading small audio files, but as soon as we started adding a lot of video files to the queue, it became so slow that a single video file could take 2 hours or more to upload. So we experimented with FileZilla to see how many concurrent uploads we could add before the overall speed of each upload started to drop. We found that at our location, 4 simultaneous FTP uploads hit the sweet spot: instead of uploading 1 file at 500 kb/s, we could upload all four files at that speed simultaneously, quadrupling our throughput.
I read up on the new threading classes in .NET 4.0 and began refactoring my FTP application. I decided to use the Task Factory to manage the threads, in conjunction with a BlockingCollection, to create a classic Producer/Consumer pattern. My first attempt looked a lot like this:
int maxThreads = 4;
var filesToFtp = new BlockingCollection<FtpItem>(maxThreads); // bounded: at most 4 items in flight
var processedFiles = new BlockingCollection<FtpItem>();

// Stage 1: producer - read the database queue and feed the pipeline
var initFtp = Task.Factory.StartNew(() =>
{
    try
    {
        while (GetFtpQueue().Count > 0)
        {
            filesToFtp.Add(new FtpItem { ... });
        }
    }
    finally { filesToFtp.CompleteAdding(); }
});

// Stage 2: consumer/producer - FTP each file to the CDN
var process = Task.Factory.StartNew(() =>
{
    try
    {
        foreach (var file in filesToFtp.GetConsumingEnumerable())
        {
            // upload the file, then hand it on to the cleanup stage
            processedFiles.Add(file);
        }
    }
    finally { processedFiles.CompleteAdding(); }
});

// Stage 3: consumer - tidy up after each uploaded file
var cleanup = Task.Factory.StartNew(() =>
{
    foreach (var file in processedFiles.GetConsumingEnumerable())
    {
        // mark the queue record complete, remove the local copy, etc.
    }
});

Task.WaitAll(initFtp, process, cleanup);
Initially, this looked quite promising. I wrote a bare-bones version like the one above that just called Thread.Sleep to simulate work and iterated through a list of ints. I was able to verify that each "stage" ran on its own thread, that it never allowed more than 4 items through at a time, that I could add items to the front of the queue and have them processed next, and that it never tried to 'cleanup' a file until that file had passed through both stage 1 and stage 2. However, I did notice that the elapsed time was the same as when I ran a similar unit test with a simple while loop. It might be obvious to you why this is, but at the time I put it down to a limitation of the unit test and pushed my new code to production.
The first thing I noticed was that it wasn't any faster. Not even slightly. It took me hours of staring at the code to figure out why my multithreaded code was not running any faster, but the answer is simple: I only created one consumer of filesToFtp. I had incorrectly assumed that because I was creating up to 4 FtpItems at a time, and the FTP process was running on its own thread, it would consume as many as it could. In reality, while each of the three stages runs on its own thread, the whole process still happens in series: stage 1 doesn't create 4 items at once, it creates them one after the other; and although stage 2 does begin working before stage 1 is complete (as soon as there is an item to consume), it is then busy FTPing that first item until the upload finishes, and only then will it grab the next file.
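To make the problem concrete, here is a sketch of the kind of bare-bones harness I described above (not my actual test): the FTP work is swapped for Thread.Sleep, and the total elapsed time comes out as the sum of the sleeps, exactly as if it were a plain while loop.

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

class PipelineDemo
{
    static void Main()
    {
        var input = new BlockingCollection<int>(4);
        var output = new BlockingCollection<int>();
        var sw = Stopwatch.StartNew();

        var produce = Task.Factory.StartNew(() =>
        {
            try
            {
                for (int i = 0; i < 8; i++) input.Add(i);
            }
            finally { input.CompleteAdding(); }
        });

        // The single consumer is the bottleneck: each simulated
        // "upload" must finish before the next item is taken.
        var process = Task.Factory.StartNew(() =>
        {
            try
            {
                foreach (var i in input.GetConsumingEnumerable())
                {
                    Thread.Sleep(500); // stand-in for one FTP transfer
                    output.Add(i);
                }
            }
            finally { output.CompleteAdding(); }
        });

        var cleanup = Task.Factory.StartNew(() =>
        {
            foreach (var i in output.GetConsumingEnumerable()) { /* no-op */ }
        });

        Task.WaitAll(produce, process, cleanup);

        // Prints roughly 4 seconds: 8 items x 500 ms in series,
        // no better than a plain loop.
        Console.WriteLine(sw.Elapsed);
    }
}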
To resolve this problem, I simply wrapped stage 2 in a for loop and created an IList of Tasks to wait on:
int maxThreads = 4;
var filesToFtp = new BlockingCollection<FtpItem>(maxThreads);
var processedFiles = new BlockingCollection<FtpItem>();
IList<Task> tasks = new List<Task>();

// Stage 1: producer
tasks.Add(Task.Factory.StartNew(() =>
{
    try
    {
        while (GetFtpQueue().Count > 0)
        {
            filesToFtp.Add(new FtpItem { ... });
        }
    }
    finally { filesToFtp.CompleteAdding(); }
}));

// Stage 2: four concurrent FTP consumers
for (int i = 0; i < maxThreads; i++)
{
    tasks.Add(Task.Factory.StartNew(() =>
    {
        try
        {
            foreach (var file in filesToFtp.GetConsumingEnumerable())
            {
                processedFiles.Add(file);
            }
        }
        finally { processedFiles.CompleteAdding(); }
    }));
}

// Stage 3: cleanup consumer
tasks.Add(Task.Factory.StartNew(() =>
{
    foreach (var file in processedFiles.GetConsumingEnumerable())
    {
        // mark the queue record complete, remove the local copy, etc.
    }
}));

Task.WaitAll(tasks.ToArray());
I reran the unit test and it was faster! Very nearly 4 times faster, in fact. Wahoo! I updated the code, published my changes, and sat back. Sure enough, the FTP process finally started to make up some ground. In the meantime, I went back to my unit test and began tweaking. The first thing I noticed was that sometimes I would get a "System.InvalidOperationException: The BlockingCollection<T> has been marked as complete with regards to additions." Luckily, this didn't take a lot of head scratching to figure out: the first thread to reach the finally clause of stage 2 closed the processedFiles collection, leaving the other three threads hanging. A final refactoring resolved the issue:
int maxThreads = 4;
var filesToFtp = new BlockingCollection<FtpItem>(maxThreads);
var processedFiles = new BlockingCollection<FtpItem>();
IList<Task> tasks = new List<Task>();
IList<Task> ftpProcessTasks = new List<Task>();

// Stage 1: producer
tasks.Add(Task.Factory.StartNew(() =>
{
    try
    {
        while (GetFtpQueue().Count > 0)
        {
            filesToFtp.Add(new FtpItem { ... });
        }
    }
    finally { filesToFtp.CompleteAdding(); }
}));

// Stage 2: four concurrent FTP consumers; note they no longer
// call CompleteAdding themselves
for (int i = 0; i < maxThreads; i++)
{
    ftpProcessTasks.Add(Task.Factory.StartNew(() =>
    {
        foreach (var file in filesToFtp.GetConsumingEnumerable())
        {
            processedFiles.Add(file);
        }
    }));
}

// Stage 3: cleanup consumer
tasks.Add(Task.Factory.StartNew(() =>
{
    foreach (var file in processedFiles.GetConsumingEnumerable())
    {
        // mark the queue record complete, remove the local copy, etc.
    }
}));

// Wait for ALL four FTP tasks to finish before marking
// processedFiles complete, then wait on the rest of the pipeline.
Task.WaitAll(ftpProcessTasks.ToArray());
processedFiles.CompleteAdding();
Task.WaitAll(tasks.ToArray());
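A side note on the design: blocking on ftpProcessTasks before the final WaitAll is safe here because the cleanup task is already running and draining processedFiles. If you would rather express "complete the collection when the last consumer finishes" directly, .NET 4.0's Task.Factory.ContinueWhenAll can do it; a sketch, not necessarily better:

// Alternative: a continuation marks processedFiles complete once all
// four FTP tasks have finished, so a single WaitAll covers everything.
tasks.Add(Task.Factory.ContinueWhenAll(ftpProcessTasks.ToArray(),
    antecedents => processedFiles.CompleteAdding()));
Task.WaitAll(tasks.ToArray());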
Download a working example (just enter your FTP Server details prior to running).