Careful with Captures and Sneaky Threads

22 Sep 2014, CPOL, 3 min read
A problem detected while testing a parallel consumer had a very dark explanation and a very simple solution.

Introduction

Parallel code is very hard to test. That's the reason why in most cases, it's not even tested: it's just released to production and everybody hopes that it will simply work. Well, you know what? Nothing simply works.

Background

A teammate and I were developing a parallel batch consumer. It was a simple task: the consumer receives messages, and when a certain number of messages is reached, it starts a thread (task) that writes all those messages to a DAL. This is so simple and classic that most people would just write the code. We decided instead to attack it with pair programming and TDD. Fortunately!

The Code

Not including fields, constructor and some other secondary members, our consumer looked something like this:

C#
public void Consume(TMessage inputMessage) {
    lock (criticalSectionForConsume) {
        pendingMessages.Add(inputMessage);

        if (pendingMessages.Count < batchSize)
            return;

        SaveAsyncAndClearPendingMessages();
    }
}

private void SaveAsyncAndClearPendingMessages() {
    CreateSaveTask()
        .Start();

    pendingMessages = new List<TMessage>();
}

private Task CreateSaveTask() {
    var task = new Task(() => messageDal.Save(pendingMessages));
    task.ContinueWith(SafelyRemoveTaskFromPendingSaveTasks);

    SafelyAddToPendingSaveTasks(task);

    return task;
}

private void SafelyRemoveTaskFromPendingSaveTasks(Task t) {
    lock (criticalSectionForPendingSaveTasks) {
        pendingSaveTasks.Remove(t);
    }
}

As mentioned before, it's so simple that nothing can go wrong. But luckily for us, we wrote the test first... and it failed. It made no sense, but it failed. Let's see the test:

C#
[Test]
public void Consumes_messages_when_batch_size_is_reached() {
    var resetEvent = new ManualResetEvent(false);
    var searchLogMessages = Enumerable.Range(0, BatchSize)
        .Select(x => Fixture.Create<Message>())
        .ToArray();

    MessageDal.Setup(x => x.Save(It.IsAny<IEnumerable<Message>>()))
        .Callback<IEnumerable<Message>>(b => resetEvent.Set());

    ConsumeAll(searchLogMessages);

    Assert.True(resetEvent.WaitOne(20));

    MessageDal.Verify(x => x.Save(searchLogMessages), Times.Once());
}

private void ConsumeAll(IEnumerable<Message> searchLogMessages) {
    foreach (var message in searchLogMessages)
        Sut.Consume(message);
}

Let's analyse what's in here for a moment. We are using AutoFixture to create some messages and Moq to create a test double for the DAL. The DAL will just signal when messages are written to it. Finally, we verify that the messages written are the same ones that are consumed.

That was exactly the failure. The DAL was signaling properly, but the messages written were different from those consumed, so the verification failed.

We stared at the screen like dumb people for a very long time. We debugged it. We thought that the tests were wrong, they were lying to us! We even thought the very stable and mature libraries we were using were broken. We lost faith in ourselves and then, suddenly... it lit up.

And There Was Light

We extracted the pendingMessages reference inside CreateSaveTask to a parameter, so that the caller passes the current list as an argument:

C#
private Task CreateSaveTask(List<TMessage> messages) {
    var task = new Task(() => messageDal.Save(messages));
    task.ContinueWith(SafelyRemoveTaskFromPendingSaveTasks);

    SafelyAddToPendingSaveTasks(task);

    return task;
}

...and that solved the problem. You can test it on the sample project. By inlining/extracting the parameter, you will see the tests failing and passing. The reason, though, is frightening.
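The article does not show the call site after the change, so here is a minimal, self-contained sketch of how the fixed pieces might fit together. It is not the sample project's code: BatchConsumerSketch is a hypothetical name, the DAL is replaced by a plain delegate, the tracking of pending save tasks is left out, and Consume returns the task only so that the demo can wait for it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical, trimmed-down stand-in for the article's consumer.
public class BatchConsumerSketch<TMessage>
{
    private readonly object criticalSectionForConsume = new object();
    private readonly Action<List<TMessage>> save; // assumption: stands in for messageDal.Save
    private readonly int batchSize;
    private List<TMessage> pendingMessages = new List<TMessage>();

    public BatchConsumerSketch(Action<List<TMessage>> save, int batchSize)
    {
        this.save = save;
        this.batchSize = batchSize;
    }

    // Returns the save task when a batch is flushed, otherwise null.
    public Task Consume(TMessage inputMessage)
    {
        lock (criticalSectionForConsume)
        {
            pendingMessages.Add(inputMessage);
            if (pendingMessages.Count < batchSize)
                return null;

            // The full list is handed to the task as an argument...
            var task = CreateSaveTask(pendingMessages);
            task.Start();

            // ...so replacing the field no longer changes what the task saves.
            pendingMessages = new List<TMessage>();
            return task;
        }
    }

    private Task CreateSaveTask(List<TMessage> messages)
    {
        return new Task(() => save(messages));
    }
}

public static class Program
{
    public static void Main()
    {
        var saved = new List<int>();
        var consumer = new BatchConsumerSketch<int>(batch => saved.AddRange(batch), 3);

        Task last = null;
        foreach (var message in new[] { 1, 2, 3 })
            last = consumer.Consume(message) ?? last;

        last.Wait(); // the third message fills the batch and starts the save task
        Console.WriteLine(string.Join(",", saved)); // prints 1,2,3
    }
}
```

Passing the list as an argument keeps the hand-off explicit: the task owns the old list, and the consumer carries on with a fresh one.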

Nemesis

Our task is created from a parameterless lambda expression:

C#
() => messageDal.Save(messages)

In this expression, the lambda implicitly captures the messages parameter. That is harmless: nothing reassigns the parameter after the lambda is created, so when the task finally runs it still sees the list that was passed in. An instance field behaves differently. The original lambda was:

C#
() => messageDal.Save(pendingMessages)

which does not capture the list itself. It captures this, and the pendingMessages field is read only when the task actually runs. Just after the task is created, a new value is assigned to that field:

C#
CreateSaveTask()
    .Start();

pendingMessages = new List<TMessage>();

If the task has not started by the time the field is assigned, which will most likely be the case, the DAL will receive the new empty list instead of the intended one. Pending messages will be lost. Our boss will be angry. Our necks... well, better not to think about it.
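The effect can be reproduced without any threads, which makes it easier to see. The following is a small sketch, not code from the sample project: CaptureDemo and its members are hypothetical names, and the delegates are invoked by hand to stand in for a task that starts late.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical class that mimics the consumer's pendingMessages field.
public class CaptureDemo
{
    private List<string> pending = new List<string> { "a", "b" };

    public List<string> Pending { get { return pending; } }

    // The lambda captures `this`; the field is read when the delegate runs.
    public Func<int> CountViaField()
    {
        return () => pending.Count;
    }

    // The lambda captures the parameter, which keeps pointing at the list passed in.
    public Func<int> CountViaParameter(List<string> messages)
    {
        return () => messages.Count;
    }

    // Same move as `pendingMessages = new List<TMessage>()` in the consumer.
    public void ReplacePending()
    {
        pending = new List<string>();
    }
}

public static class Program
{
    public static void Main()
    {
        var demo = new CaptureDemo();
        Func<int> viaField = demo.CountViaField();
        Func<int> viaParameter = demo.CountViaParameter(demo.Pending);

        demo.ReplacePending(); // happens before either delegate runs

        Console.WriteLine(viaField());     // prints 0: sees the new, empty list
        Console.WriteLine(viaParameter()); // prints 2: still sees the original list
    }
}
```

In the real consumer the outcome depends on timing, since the task may occasionally run before the field is replaced, which is part of what makes the bug so hard to spot.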

Happy Ending

After discovering and fixing this very subtle bomb, everything went smoothly and we lived happily ever after. We then created a bunch of tests to make sure that everything worked fine. Tests were repeated many times and random sleeps were introduced all around to try to smoke out hidden errors. Our consumer is now working under heavy pressure and its error log file has stayed empty since the release.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)