Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles
(untagged)

Concurrent Observable Collection, Dictionary, and Sorted Dictionary

0.00/5 (No votes)
17 Nov 2012 5  
Provides implementations of concurrent observable collection classes for binding to WPF controls so that the collections can be updated from a thread that isn't the WPF GUI thread

Test Application

Introduction

This article provides a description of how I implemented a Dictionary type class that can be bound to a WPF list control, but can be updated from a thread that is not the WPF user interface thread. This is useful for when you have worker threads updating a Dictionary and you want to have the contents of the Dictionary bound to a control. Microsoft does provide some ObservableCollection classes in .NET 4.0, but unfortunately ObservableDictionary isn't one of them, also the ObservableCollection classes they do provide aren't multi-threading friendly. While the classes I provide in this article aren't as efficient as the collections found in the new .NET 4.0 System.Collections.Concurrent namespace, they do allow worker threads to update an ordered collection bound to a view. There are some things in my implementation that I feel could be done better, so I welcome your comments, both positive and negative, at the bottom of this article.

Background

I wanted a Threadsafe Observable Sorted Dictionary, but before I got there I had to go through these steps:

  • Threadsafe Observable Collection
  • Observable Dictionary
  • Threadsafe Observable Dictionary
  • Observable Sorted Dictionary
  • Threadsafe Observable Sorted Dictionary

The Threadsafe Observable Collection

I started with the .NET framework ObservableCollection and set about making it threadsafe, keeping in mind the following things:

  • Prevent the collection being modified while a value is being read out
  • Prevent the collection being modified for the duration of external event handling
  • The collection is ordered, not a queue, so items need to be inserted
  • Need to be able to hand off an enumerable that, due to the collection's ordered nature, won't be affected by subsequent changes to the collection

What I came up with was a base class that uses a .NET ReaderWriterLockSlim to gate reads and serialize writes to an ObservableCollection object, and pipes the serialized events via a producer/consumer queue to a View Model that can be bound to the user interface. For the enumerable side of things, I just hand off an immutable collection which is a snapshot of the base collection.

Here's the procedure for when the collection is written to:

  1. Acquire read lock
  2. Upgrade to write lock
  3. Set flag indicating a new enumerable snapshot is required
  4. Update encapsulated ObservableCollection
  5. Handle event from encapsulated collection:
    1. Add to producer consumer queue
    2. If current thread is the user interface thread, then process the entire producer consumer queue to ensure the View Model reflects what has been added.
  6. If lock is still a write lock, downgrade to a read lock
  7. Release read lock

Here's the procedure for when the collection is read from:

  1. Acquire read lock
  2. Return value
  3. Release read lock

Here's the procedure for when the enumerator is requested:

  1. Acquire read lock
  2. Check collection snapshot flag to see if a new snapshot is required
  3. If new collection snapshot is required:
    1. Acquire write lock on the collection snapshot
    2. Create a new collection snapshot
    3. Clear the collection snapshot update required flag
    4. Release write lock on the collection snapshot
  4. Return the enumerator from the snapshot

Write Procedure Code

/// <summary>
/// Calls the read function passed in, and if it returns true,
/// then calls the next read function, else calls the write
/// function.
/// </summary>
protected TResult DoBaseReadWrite<TResult>(Func<bool> readFuncTest, 
	Func<TResult> readFunc, Func<TResult> writeFunc) {
    _readWriteLock.TryEnterUpgradeableReadLock(Timeout.Infinite);
    try {
      if(readFuncTest()) {
        return readFunc();
      } else {
        _readWriteLock.TryEnterWriteLock(Timeout.Infinite);
        try {
          _newSnapshotRequired = true;
          TResult returnValue = writeFunc();
          return returnValue;
        } finally {
          if(_readWriteLock.IsWriteLockHeld) {
            _readWriteLock.ExitWriteLock();
          }
        }
      }
    } finally {
      _readWriteLock.ExitUpgradeableReadLock();
    }
  }
}
/// <summary>
/// Handles write access to the base collection when a return value is required
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="writeFunc"></param>
/// <returns></returns>
protected TResult DoBaseWrite<TResult>(Func<TResult> writeFunc) {
  _readWriteLock.TryEnterUpgradeableReadLock(Timeout.Infinite);
  try {
    _readWriteLock.TryEnterWriteLock(Timeout.Infinite);
    _newSnapshotRequired = true;
    return writeFunc();
  } finally {
    if(_readWriteLock.IsWriteLockHeld) {
      _readWriteLock.ExitWriteLock();
    }
    _readWriteLock.ExitUpgradeableReadLock();
  }
}

Read Procedure Code

/// <summary>
/// Handles read access from the base collection
/// </summary>
protected TResult DoBaseRead<TResult>(Func<TResult> readFunc) {
  if(IsDispatcherThread) {
    return readFunc();
  }

  _readWriteLock.TryEnterReadLock(Timeout.Infinite);
  try {
    return readFunc();
  } finally {
    _readWriteLock.ExitReadLock();
  }
}

Get Enumerator Code

/// <summary>
/// Gets the enumerator for a snapshot of the collection
/// </summary>
public IEnumerator<T> GetEnumerator() {
  if(IsDispatcherThread) {
    return _viewModel.GetEnumerator();
  } else {
    return Snapshot.GetEnumerator();
  }
}

/// <summary>
/// Gets an immutable snapshot of the collection
/// </summary>
public ImmutableCollectionBase<T> Snapshot {
  get {
    return DoBaseRead(() => {
      UpdateSnapshot();
      return _baseSnapshot;
    });  
  }
}

/// <summary>
/// Updates the snapshot that is used to generate an Enumerator
/// </summary>
private void UpdateSnapshot() {
  if(_newSnapshotRequired) {
    _snapshotLock.TryEnterWriteLock(Timeout.Infinite);
    if(_newSnapshotRequired) {
      _baseSnapshot = new ImmutableCollection<t>(_baseCollection);
      _newSnapshotRequired = false;
    }
    _snapshotLock.ExitWriteLock();
  }
}

The Observable Dictionary

So now I have a method of wrapping access to an observable collection to make it suitable for updating from threads that are not the GUI thread. So all I need to do is wrap the ObservableDictionary class from .NET, however there isn't one at the current time, so I had to write my own.

The ObservableDictionary class I wrote encapsulates 2 collections:

  • An ObservableCollection of Key-Value pairs, that is used as the basis of the observable part of the ObservableDictionary
  • A Dictionary that maps the Key to the Index in the base ObservableCollection

This however does not allow me to neatly insert items into the ObservableCollection without updating a whole bunch of Dictionary entries, so what I did was store link list nodes in the Dictionary instead of indices where the order of the linking is the same as the order ObservableCollection. The link list nodes have a recursive decrement forward, and increment forward methods for shifting the index they point to when removing or inserting nodes. The code below shows how they work:

/// <summary>
/// This function effectively removes this node from the linked list,
/// and decrements the position index of all the nodes that follow it.
/// It removes the node by changing the nodes that come before and
/// after it to point to each other, thus bypassing this node.
/// </summary>
public void Remove() {
  if (this.Previous != null) {
    this.Previous.Next = this.Next;
  }
  if (this.Next != null) {
    this.Next.Previous = this.Previous;
  }
  DecrementForward();
}

/// <summary>
/// This recursive function decrements the position index of all the nodes
/// in front of this node. Used for when a node is removed from a list.
/// </summary>
private void DecrementForward() {
  if (this.Next != null) {
    this.Next.Index--;
    this.Next.DecrementForward();
  }
}

/// <summary>
/// This recursive function decrements the position index of all the nodes
/// in front of this node. Used for when a node is inserted into a list.
/// </summary>
private void IncrementForward() {
  if (this.Next != null) {
    this.Next.Index++;
    this.Next.IncrementForward();
  }
}

Once I had the ObservableDictionary class, the threadsafe version was just a matter of wrapping the ObservableDictionary with a whole lot of accessors, except for the Keys and Values properties. The Keys and Values are read out of a snapshot of the Dictionary, which is updated if necessary when the Keys or Values properties are read.

The Observable Sorted Dictionary

At this point, the sorted version of the ObservableDictionary was just simply a matter of specifying the index to insert a new item at when an item was added. For this, I used a binary sort algorithm:

/// <summary>
/// Gets the position for a key to be inserted such that the sort order is maintained.
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public int GetInsertIndex(int count, TKey key, Func<int, TKey> indexToKey) {
  return BinarySearchForIndex(0, count - 1, key, indexToKey);
}

/// <summary>
/// Searches for the index of the insertion point for the key passed in such that
/// the sort order is maintained. Implemented as a non-recursive method.
/// </summary>
/// <param name="low"></param>
/// <param name="high"></param>
/// <param name="key"></param>
/// <returns></returns>
private int BinarySearchForIndex
(int low, int high, TKey key, Func<int, TKey> indexToKey) {
  while (high >= low) {

    // Calculate the mid point and determine if the key passed in
    // should be inserted at this point, below it, or above it.
    int mid = low + ((high - low) >> 1);
    int result = this.Compare(indexToKey(mid), key);

    // Return the current position, or change the search bounds
    // to be above or below the mid point depending on the result.
    if (result == 0)
      return mid;
    else if (result < 0)
      low = mid + 1;
    else
      high = mid - 1;
  }
  return low;
}

As with the ObservableDictionary, it was just a matter of wrapping the ObservableSortedDictionary with accessors from the threadsafe ConcurrentObservableBase class I wrote.

Using the Code

In the Swordfish.NET.General project in the attached source code, I have provided the following collection classes under the namespace Swordfish.NET.Collections:

  • ObservableDictionary
  • ObservableSortedDictionary
  • ConcurrentObservableCollection
  • ConcurrentObservableDictionary
  • ConcurrentObservableSortedDictionary

The attached source code also contains a WPF application that you can use for some light interactive testing of the above collection classes.

Points of Interest

With .NET 4.0, Microsoft has implemented some unordered concurrent collections, including a ConcurrentDictionary.

John Simmons raised the need for an ObservableDictionary on The Code Project back in May 2010 and linked to an implementation.

History

  • First posted on 8th June 2011
  • Fixed some issues found by Olly Hayes on 12th September 2011
  • Significant update, changed ReadWriteLock to ReadWriteLockSlim, and changed to use a separate View Model collection for binding to the user interface. Labelled the source code v2 to reflect the significant changes.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.

A list of licenses authors might use can be found here