Hi,
Before you type tl;dr, read the cliff-notes ^_^
Some of you may already be a bit familiar with my project.
Basically: I read messages from the DB. When I do, I set a marker on the record called 'Started' to true so that I won't read them again.
I put the messages into a SortedSet and read from it like a queue.
So - Here are some snippets. This will probably be quite a code dump - apologies:
Figure 1: [SELECT_WORKFLOW_MESSAGES_DUE] Stored Proc:
ALTER PROCEDURE [wfe].[SELECT_WORKFLOW_MESSAGES_DUE]
(
    @Date DATETIME,
    @SetStarted BIT = 1
)
AS
BEGIN
    DECLARE @Results TABLE (workflow_message_id INT)
    DECLARE @Return TABLE
    (
        workflow_message_id INT PRIMARY KEY,
        workflow_message_fid INT,
        workflow_message_serialized_data XML,
        workflow_message_due DATETIME,
        workflow_message_started BIT,
        workflow_message_complete BIT,
        workflow_message_workflowid INT,
        workflow_message_messageinfoid INT
    )

    DECLARE @read INT, @set INT
    SET @read = 0
    SET @set = 0

    -- Collect the ids of every unstarted message that is due
    INSERT INTO @Results (workflow_message_id)
    SELECT m.workflow_message_id
    FROM wfe.workflow_messages m
    WHERE m.workflow_message_started = 0
      AND m.workflow_message_due < @Date

    -- Mark those same rows as started so they are not picked up again
    IF (@SetStarted = 1)
    BEGIN
        UPDATE m
        SET workflow_message_started = 1
        FROM wfe.workflow_messages m
        INNER JOIN @Results r ON m.workflow_message_id = r.workflow_message_id
        SET @set = @@ROWCOUNT
    END

    INSERT INTO @Return
    (
        workflow_message_id,
        workflow_message_fid,
        workflow_message_serialized_data,
        workflow_message_due,
        workflow_message_started,
        workflow_message_complete,
        workflow_message_workflowid,
        workflow_message_messageinfoid
    )
    SELECT m.workflow_message_id,
           m.workflow_message_fid,
           m.workflow_message_serialized_data,
           m.workflow_message_due,
           m.workflow_message_started,
           m.workflow_message_complete,
           m.workflow_message_workflowid,
           m.workflow_message_messageinfoid
    FROM wfe.workflow_messages m
    INNER JOIN @Results r ON m.workflow_message_id = r.workflow_message_id
    SET @read = @@ROWCOUNT

    -- Test logging: record how many rows were read vs marked as started
    IF (@read > 0 OR @set > 0)
    BEGIN
        INSERT INTO wfe.test (test_set, test_read)
        VALUES (@set, @read)
    END

    SELECT * FROM @Return
END
I have also included my test logging in this. I wanted to make sure the duplicates were not creeping in here; @read and @set are always identical.
I set the items to "started" and fetch them in a single SP so the call is guaranteed to mark and return the same rows every time. I'm using EF for DB calls, so this is a lot more effective than doing it in C#.
The SortedSet-based priority queue:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using Extensions.Linq;
using WorkflowDataAccess.Projections;
namespace Tools
{
    public class OrderedQueue<T>
    {
        private readonly SortedSet<T> _queue;
        private bool _cancel;

        public OrderedQueue(params Func<T, IComparable>[] orderByFuncs)
        {
            _queue = new SortedSet<T>(orderByFuncs.ToComparer());
        }

        private readonly object _lockObj = new object();

        private SortedSet<T> Queue
        {
            get
            {
                // NB: this lock only guards returning the reference itself
                lock (_lockObj)
                {
                    return _queue;
                }
            }
        }

        public T Next
        {
            get
            {
                Console.WriteLine("Ins and Outs count now at {0} in and {1} out", _inInt, _outInt);
                if (WaitForItems())
                    return Dequeue();
                return default(T);
            }
        }

        private int _inInt, _outInt;

        private void AddItem(T item)
        {
            _inInt++;
            Queue.Add(item);
            var message = item as Message;
            if (message != null)
                Console.WriteLine(@"1. Message Id: {0}", message.Id);
            _waitForItems.Set();
        }

        public void Enqueue(T item)
        {
            lock (_lockObj)
            {
                AddItem(item);
            }
        }

        public void Enqueue(IEnumerable<T> items)
        {
            // Null check must come before ToArray(), which would throw on null
            if (items == null)
                return;
            var enumerable = items as T[] ?? items.ToArray();
            lock (_lockObj)
            {
                foreach (T item in enumerable)
                {
                    AddItem(item);
                }
            }
        }

        public T Dequeue()
        {
            T result;
            lock (_lockObj)
            {
                result = Peek();
                var message = result as Message;
                if (message != null)
                    Console.WriteLine(@"2. Message Id: {0}", message.Id);
                Queue.Remove(result);
                _outInt++;
            }
            return result;
        }

        public T Peek()
        {
            lock (_lockObj)
            {
                return Queue.FirstOrDefault();
            }
        }

        private ManualResetEvent _waitForItems = new ManualResetEvent(false);

        private bool WaitForItems()
        {
            bool hasItems;
            lock (_lockObj)
            {
                hasItems = Queue.Any();
                if (!hasItems)
                {
                    _waitForItems = new ManualResetEvent(false);
                }
            }
            if (!hasItems)
            {
                Console.WriteLine("Waiting for items");
                _waitForItems.WaitOne();
            }
            lock (_lockObj)
            {
                hasItems = Queue.Any();
            }
            return hasItems;
        }

        public IEnumerable<T> StreamQueue()
        {
            while (!_cancel)
                while (WaitForItems())
                {
                    if (!_cancel)
                        yield return Dequeue();
                }
        }

        public void StopStream()
        {
            _cancel = true;
            _waitForItems.Set();
        }
    }
}
This is not quite the original version, but it should still be thread-safe. I want it to be generic, but right now it includes some debugging code that casts the T item to a Message.
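For what it's worth, the pattern of creating a fresh ManualResetEvent inside the lock and then waiting on it outside the lock can be replaced by Monitor.Wait/Pulse, which keeps the "check empty" and "go to sleep" steps under one lock so a wakeup can't slip between them. This is my own sketch of the same queue idea, not the original code:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Sketch: a blocking ordered queue using Monitor.Wait/Pulse as the
// condition variable instead of swapping ManualResetEvent instances.
public class BlockingOrderedQueue<T>
{
    private readonly SortedSet<T> _queue;
    private readonly object _lockObj = new object();
    private bool _cancel;

    public BlockingOrderedQueue(IComparer<T> comparer)
    {
        _queue = new SortedSet<T>(comparer);
    }

    public void Enqueue(T item)
    {
        lock (_lockObj)
        {
            _queue.Add(item);
            Monitor.Pulse(_lockObj);   // wake one waiting consumer
        }
    }

    // Blocks until an item is available or StopStream() is called.
    // Returns false only when stopped with nothing left in the queue.
    public bool TryDequeue(out T item)
    {
        lock (_lockObj)
        {
            while (_queue.Count == 0 && !_cancel)
                Monitor.Wait(_lockObj); // releases the lock while sleeping

            if (_queue.Count == 0)      // woken by StopStream, queue empty
            {
                item = default(T);
                return false;
            }

            item = _queue.Min;          // smallest item per the comparer
            _queue.Remove(item);
            return true;
        }
    }

    public void StopStream()
    {
        lock (_lockObj)
        {
            _cancel = true;
            Monitor.PulseAll(_lockObj); // release every blocked consumer
        }
    }
}
```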
I instantiate this class as follows:
private readonly OrderedQueue<WorkflowDataAccess.Projections.Message> _messageQueue =
    new OrderedQueue<WorkflowDataAccess.Projections.Message>(m => -m.Urgency, m => m.Due, m => m.Id);
The first two functions are the sort keys (the negated Urgency gives descending order); the last is used as a unique identifier, because otherwise items with the same urgency and due date get merged by the SortedSet.
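The source of Extensions.Linq's ToComparer() isn't shown above, so this is my guess at what it does: compare by each key selector in turn, falling back to the next selector only on a tie. It also shows why the final m => m.Id key matters, since SortedSet silently drops items that compare as equal.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical reconstruction of the ToComparer() extension: builds a
// composite comparer from a list of key selectors, first difference wins.
public static class ComparerExtensions
{
    public static IComparer<T> ToComparer<T>(this Func<T, IComparable>[] orderByFuncs)
    {
        return Comparer<T>.Create((x, y) =>
        {
            foreach (var keyOf in orderByFuncs)
            {
                int result = keyOf(x).CompareTo(keyOf(y));
                if (result != 0)
                    return result;   // first differing key decides the order
            }
            return 0;                // equal on every key -> SortedSet merges them
        });
    }
}
```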
I run a separate BackgroundWorker to read into the queue. This is the method run in that worker:
public void GatherMessages(BackgroundWorker bgw)
{
    while (!bgw.CancellationPending)
    {
        _messageQueue.Enqueue(WorkflowDataAccess.Projections.Message.SelectDueMessage());
    }
    _messageQueue.StopStream();
}
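The shape of that polling loop, reduced to plain delegates so it stands alone (the real code enqueues SelectDueMessage() results, and the delay parameter is where a pause between database hits would go), looks like this; it's a sketch, not the production code:

```csharp
using System;
using System.Threading;

// Generic poll loop: keep calling pollOnce() until cancelled() says stop,
// sleeping between iterations so the data source isn't hammered.
public static class PollLoop
{
    public static void Run(Func<bool> cancelled, Action pollOnce, TimeSpan delay)
    {
        while (!cancelled())
        {
            pollOnce();          // e.g. enqueue the next batch of due messages
            Thread.Sleep(delay); // breathing room between database hits
        }
    }
}
```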
Finally, my message Parse method. This also runs within a BackgroundWorker:
public void ParseMessageQueue()
{
    // We need to make sure that we do not load the same workflow twice at once
    List<int> liveWorkflowIds = new List<int>();
    object lockObject = new object();
    WorkflowDataAccess.Projections.Message message;
    while ((message = _messageQueue.Next) != null)
    {
        Console.WriteLine(@"3. Message Id: {0}", message.Id);
        var m = message;
        ThreadPool.QueueUserWorkItem(state =>
        {
            IMessage messageInstance = MessageLoader.LoadMessage(m);
            IWorkflow workflow = GetWorkflow(messageInstance);
            if (workflow == null)
            {
                if (!messageInstance.SkipIfNotInWorkflowState)
                    throw new Exception(string.Format("Workflow not found for message id {0}", m.Id));
            }
            else
            {
                bool canContinue;
                lock (lockObject)
                {
                    canContinue = !liveWorkflowIds.Contains(workflow.WorkflowId);
                    if (canContinue)
                        liveWorkflowIds.Add(workflow.WorkflowId);
                }
                if (canContinue)
                {
                    workflow.ParseMessage(messageInstance);
                    lock (lockObject)
                        liveWorkflowIds.Remove(workflow.WorkflowId);
                }
                else
                {
                    Console.WriteLine(@"3.5. Message Id{0}", m.Id);
                    messageInstance.Requeue();
                }
            }
            messageInstance.MarkComplete();
            Console.WriteLine(@"4. Message Id{0}", m.Id);
        });
    }
}
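The lock-plus-List<int> bookkeeping in that method can also be expressed with a HashSet, which makes the claim check O(1) and collapses "check then add" into a single call. A standalone sketch of just that claim/release piece (my restructuring, not the original code):

```csharp
using System.Collections.Generic;

// Tracks which workflow ids are currently being processed so the same
// workflow is never loaded twice at once.
public class WorkflowClaims
{
    private readonly HashSet<int> _live = new HashSet<int>();
    private readonly object _lockObj = new object();

    // Returns true if the workflow was free and is now claimed by the caller.
    public bool TryClaim(int workflowId)
    {
        lock (_lockObj)
        {
            return _live.Add(workflowId); // Add returns false if already present
        }
    }

    public void Release(int workflowId)
    {
        lock (_lockObj)
        {
            _live.Remove(workflowId);
        }
    }
}
```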
The GatherMessages method had a delay of 1 second, and I still saw a few duplicates. I removed the delay to see what would happen at full throttle, and I got LOADS of duplicate messages in the database. Not just one for each message: any given message could have none, one, or up to 3000 duplicates. There is no ceiling to the number of duplicates. I just stopped the process as soon as I got in this morning (don't panic, just local testing ATM).
Is anyone able to pick apart my code here? I know it's complicated, but this is the simplest I can break it down to :S
Please help! I cannot see where the duplicates are spawning from.
Can you think of a way that I can catch the first duplicate with a breakpoint?
I'm banging my head on the desk. I hope this is not tl;dr (too long; didn't read) ^_^
Thanks
EDIT: By request, here is messageInstance.Requeue():
public void Requeue()
{
    // Remember the live values
    int id = Id;
    int? workflowId = WorkflowId;
    bool started = Started;
    bool complete = Complete;
    DateTime due = Due;

    // Reset so Save() performs an insert of a fresh, unstarted copy
    Id = 0;
    WorkflowId = null;
    Started = false;
    Complete = false;
    Due = DateTime.Now + TimeSpan.FromSeconds(10);
    Save();

    // Restore this instance
    Id = id;
    WorkflowId = workflowId;
    Started = started;
    Complete = complete;
    Due = due;
}
I am still in two minds about this. This method lives in the abstract BaseMessage class; I didn't know how else I could duplicate the message, as the derived class will have some properties that BaseMessage doesn't have.
The Save() method will insert a new item if the Id = 0.
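On the "how do I duplicate a derived message from the base class" question: MemberwiseClone is a protected member of Object that copies the fields of the *runtime* type, so a base-class method can still produce a full copy of a derived instance. A sketch under that assumption, with hypothetical property names standing in for the real BaseMessage:

```csharp
using System;

// Sketch: clone-and-reset instead of mutate-save-restore. The property
// names mirror the Requeue() snippet; ExampleDerivedMessage is invented
// purely to show that derived-only state survives the clone.
public abstract class BaseMessage
{
    public int Id { get; set; }
    public int? WorkflowId { get; set; }
    public bool Started { get; set; }
    public bool Complete { get; set; }
    public DateTime Due { get; set; }

    public BaseMessage CloneForRequeue()
    {
        var copy = (BaseMessage)MemberwiseClone(); // shallow copy, derived fields included
        copy.Id = 0;                // Save() would insert a new row when Id == 0
        copy.WorkflowId = null;
        copy.Started = false;
        copy.Complete = false;
        copy.Due = DateTime.Now + TimeSpan.FromSeconds(10);
        return copy;
    }
}

public class ExampleDerivedMessage : BaseMessage
{
    public string ExtraData { get; set; } // stands in for derived-only state
}
```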
EDIT2: Update - I am getting closer:
Line 1223: 1. Message Id: 3332
Line 1366: 2. Message Id: 3332
Line 1367: 3. Message Id: 3332
Line 3104: 4. Message Id: 3332
Line 3107: 1. Message Id: 3332
Line 3111: 2. Message Id: 3332
Line 3112: 3. Message Id: 3332
Line 4199: ~Message Id: 3332 Disposal (Type: WorkFlowEngine.Messages.FlightTrackerWorkflowsCreatedMessage, WorkflowId: 556, Urgency: 0)
Line 5061: 1. Message Id: 3332
Line 5065: 2. Message Id: 3332
Line 5066: 3. Message Id: 3332
Line 5069: 4. Message Id: 3332
Line 5272: ~Message Id: 3332 Disposal (Type: WorkFlowEngine.Messages.FlightTrackerWorkflowsCreatedMessage, WorkflowId: 556, Urgency: 0)
Line 6707: 1. Message Id: 3332
Line 6710: 4. Message Id: 3332
Line 6746: 2. Message Id: 3332
Line 6747: 3. Message Id: 3332
Line 8116: ~Message Id: 3332 Disposal (Type: WorkFlowEngine.Messages.FlightTrackerWorkflowsCreatedMessage, WorkflowId: 556, Urgency: 0)
Line 8219: 3.5. Message Id: 3332
Line 8221: 1. Message Id: 3332
Line 8238: 4. Message Id: 3332
Line 8342: 2. Message Id: 3332
Line 8344: 3. Message Id: 3332
Line 9148: 4. Message Id: 3332
Line 9416: ~Message Id: 3332 Disposal (Type: WorkFlowEngine.Messages.FlightTrackerWorkflowsCreatedMessage, WorkflowId: 556, Urgency: 0)
So the message was read more than once, and there was some significant time in between.
I won't mark this as solved just yet, but I might just get myself there.
EDIT3: Solution.
It turns out it was nothing to do with the threading. My BaseMessage has the Started property, but IMessage did not. The Deserialize method did not set Started, so as soon as the message was saved back to the database it would lose this value >_<