Synchronize access to stream data section by section

23 Feb 2004
An article about locking sections of a stream for reading and writing.

Introduction

Two or more threads that reference the same stream need no serialization as long as they read or write entirely different sections. We only need to synchronize read and write operations where the sections intersect.

This article is about locking discrete sections of a data stream for reads and writes. When two sections overlap, one thread must wait for the other to finish its operation before proceeding with its own.

A section lock spans a contiguous section of a stream, starting at offset 'a' and ending with offset 'b' where the value of 'a' is necessarily less than the value of 'b'. Here is an example of a section lock applied.

void ThreadProc1() {
  // create a section lock from offset 100 to offset 200
  SectionLock section = new SectionLock(bytes, 100, 200);

  // apply the section lock and modify the global byte stream
  lock(section.SyncRoot)
  {
    for(int i=100; i<=200; i++)
      bytes[i] = 0;
  }
}

void ThreadProc2() {
  // create a section lock from offset 150 to offset 250
  SectionLock section = new SectionLock(bytes, 150, 250);

  // apply the section lock and modify the global byte stream
  lock(section.SyncRoot)
  {
    for(int i=150; i<=250; i++)
      bytes[i] = 0;
  }
}
    

Looking at the example, there are two things to be pointed out. Firstly, the section locks in ThreadProc1 and ThreadProc2 span two different but intersecting sections. Secondly, the section locks are co-operatively applied by the two thread procedures.

Here is what we have to achieve. Because the stream sections intersect at offsets '150' to '200' we must serialize the threads reading and writing to the stream. To do this, we apply a section lock to the range of affected bytes.

The SectionLock class

Here is a class designed to synchronize access to a section of a data stream. I will present it part by part to better describe its design.

using System;
using System.Collections;

public class SectionLock {

  // maintains a list of protected sections, one list per stream
  static Hashtable table = Hashtable.Synchronized( new Hashtable() );

  public SectionLock(
    Object stream, // this lock is relative to this stream
    long start,    // starting offset of protected section
    long end       // ending offset of protected section
    ) {

    // get the section list or create a new one
    ArrayList list = null;
    lock(table.SyncRoot) {
      list = table[stream] as ArrayList;
      if(list == null) {
        list = ArrayList.Synchronized( new ArrayList() );
        table.Add(stream, list);
      }
    }

    // more code below here ...
  }

  // more code below here
}
    

Our immediate consideration is to keep a record of all streams and their protected sections. We envision multiple streams, where each stream is associated with one list of protected sections. A class-wide Hashtable serves well to associate lists of protected sections with streams.

Constructing a SectionLock requires a reference to the affected stream object and the starting and ending offsets of the section to be protected.

The first step is to look up the table for a previously created section list. If none is found, a new list is created and an association of stream to list is added to the table. The next step is to create a Section object representing the part of the data stream to be protected and to add it to the list.

Here is the Section class.

internal class Section {
  internal Object SyncRoot;
  internal long start, end;
  internal Section(long start, long end, Object syncRoot) {
    this.start = start;
    this.end = end;
    this.SyncRoot = syncRoot;
  }
  // true when [start, end] shares at least one offset with this section
  // (both ranges are closed; assumes start <= end)
  internal bool IsIntersection(long start, long end) {
    return start <= this.end && end >= this.start;
  }
}
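
As a quick check of the overlap rule, the following minimal sketch applies the same closed-range test to the offsets used in the ThreadProc example. The class name OverlapCheck and the method Intersects are hypothetical stand-ins introduced for this illustration only; they are not part of the article's source.

```csharp
using System;

// Hypothetical stand-alone copy of the overlap test, for illustration only.
public static class OverlapCheck {
  // True when the closed ranges [aStart, aEnd] and [bStart, bEnd]
  // share at least one offset. Assumes aStart <= aEnd and bStart <= bEnd.
  public static bool Intersects(long aStart, long aEnd, long bStart, long bEnd) {
    return aStart <= bEnd && aEnd >= bStart;
  }

  public static void Main() {
    Console.WriteLine(Intersects(100, 200, 150, 250)); // True: offsets 150..200 are shared
    Console.WriteLine(Intersects(100, 200, 201, 300)); // False: the ranges are disjoint
    Console.WriteLine(Intersects(100, 200, 200, 300)); // True: the ranges touch at offset 200
    Console.WriteLine(Intersects(120, 130, 100, 200)); // True: one range contains the other
  }
}
```

Because both ends are inclusive, two sections that merely touch at a single offset count as intersecting and would share a SyncRoot.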
    

Two things deserve a comment. The SyncRoot member is the object that is passed to the monitor, as in Monitor.Enter(obj.SyncRoot) or its equivalent form lock(obj.SyncRoot) { }. The IsIntersection method identifies overlapping sections, which need to share the same SyncRoot. Here is how the constructor code continues, identifying a matching SyncRoot.

using System;
using System.Collections;

public class SectionLock {

  // maintains a list of protected sections, one list per stream
  static Hashtable table = Hashtable.Synchronized( new Hashtable() );

  public Object SyncRoot;

  public SectionLock(
    Object stream, // this lock is relative to this stream
    long start,    // starting offset of protected section
    long end       // ending offset of protected section
    ) {

    // get the section list or create a new one
    ArrayList list = null;
    lock(table.SyncRoot) {
      list = table[stream] as ArrayList;
      if(list == null) {
        list = ArrayList.Synchronized( new ArrayList() );
        table.Add(stream, list);
      }
    }

    // search and add under a single lock, so that two threads cannot
    // both miss each other's overlapping section and create separate SyncRoots
    lock(list.SyncRoot) {
      // iterate through all sections and see if one intersects;
      // if one intersects, use its SyncRoot
      IEnumerator enm = list.GetEnumerator();
      while(enm.MoveNext()) {
        Section curr = (Section)enm.Current;
        if(curr.IsIntersection(start, end)) {
          // pick up the SyncRoot
          this.SyncRoot = curr.SyncRoot;
          break;
        }
      }

      // if this section does not intersect with
      // another section, create a new SyncRoot
      if(this.SyncRoot == null)
        this.SyncRoot = new Object();

      // create a new section object and add it to the section list
      Section newSection = new Section(start, end, this.SyncRoot);
      list.Add(newSection);
    }
  }
}
    

Access to two overlapping sections must be serialized. That is why the Section objects keep a reference to a SyncRoot. A new section must therefore be tested to ascertain whether it intersects with another section. If so, it must share the other section's SyncRoot. If not, a new SyncRoot is created and assigned to the new Section object.

As the constructor code indicates, it is all about the right SyncRoot. The rule is very simple. If two sections overlap, they must share the same SyncRoot object. Otherwise, each section has a SyncRoot object of its own.
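
The SyncRoot rule can be observed directly. The following is a condensed sketch, assuming generic collections in place of Hashtable and ArrayList; the names SectionLockSketch and SharedRootDemo are hypothetical and the class is a restatement for illustration, not the article's own source.

```csharp
using System;
using System.Collections.Generic;

// Condensed, hypothetical restatement of the SectionLock idea, for illustration only.
public class SectionLockSketch {
  private class Section {
    public long Start, End;
    public object SyncRoot;
  }

  // One section list per stream object. Arrays used as keys compare by reference.
  private static readonly Dictionary<object, List<Section>> table =
    new Dictionary<object, List<Section>>();

  public readonly object SyncRoot;

  public SectionLockSketch(object stream, long start, long end) {
    // A single lock covers lookup, search and insertion.
    lock (table) {
      List<Section> list;
      if (!table.TryGetValue(stream, out list)) {
        list = new List<Section>();
        table.Add(stream, list);
      }

      // Reuse the SyncRoot of the first intersecting section, if any.
      object root = null;
      foreach (Section s in list) {
        if (start <= s.End && end >= s.Start) { root = s.SyncRoot; break; }
      }
      if (root == null) root = new object();

      SyncRoot = root;
      list.Add(new Section { Start = start, End = end, SyncRoot = root });
    }
  }
}

public static class SharedRootDemo {
  public static void Main() {
    byte[] bytes = new byte[1024];
    var a = new SectionLockSketch(bytes, 100, 200);
    var b = new SectionLockSketch(bytes, 150, 250); // overlaps a
    var c = new SectionLockSketch(bytes, 500, 600); // overlaps nothing

    Console.WriteLine(ReferenceEquals(a.SyncRoot, b.SyncRoot)); // True
    Console.WriteLine(ReferenceEquals(a.SyncRoot, c.SyncRoot)); // False
  }
}
```

Like the constructor above, this sketch takes the SyncRoot of the first intersecting section it finds. A new section that bridges two existing sections with different SyncRoots is therefore serialized only against the first of them.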

Using the SectionLock object

SectionLock objects can be applied to any data stream that is randomly accessible. This is the case with any type of array in the .NET Framework. Stream objects derived from the abstract class Stream are a more complicated matter. To begin with, not all streams are randomly accessible: a stream whose CanSeek property returns false, such as NetworkStream, cannot be repositioned. Also, random access to a Stream requires that the position pointer be moved to a specific offset before reading or writing, and that position is shared by every thread using the same Stream object. One thread cannot move the position pointer while another thread reads or writes the stream.

What we really need is an interface for a stream object that facilitates random access. It might look like this.

public interface IRandomAccessStream {
  void Read(long position, Byte[] buffer, int offset, int count);
  void Write(long position, Byte[] buffer, int offset, int count);
}    

The Read and Write methods differ from those of the Stream class in their first parameter, which gives the position, measured from the beginning of the stream, to read from or write to. We can implement the IRandomAccessStream interface only on array objects, and on disk files provided we map the files to memory.
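
To make the interface concrete, here is a minimal sketch of an array-backed implementation. The class name ArrayBackedStream is hypothetical, and the code is not the implementation shipped with the article's download; it only assumes that a plain byte array is an acceptable backing store.

```csharp
using System;

public interface IRandomAccessStream {
  void Read(long position, byte[] buffer, int offset, int count);
  void Write(long position, byte[] buffer, int offset, int count);
}

// Hypothetical array-backed implementation, for illustration only.
public class ArrayBackedStream : IRandomAccessStream {
  private readonly byte[] data;

  public ArrayBackedStream(int length) {
    data = new byte[length];
  }

  // Copies count bytes, starting at stream offset 'position', into buffer at 'offset'.
  public void Read(long position, byte[] buffer, int offset, int count) {
    Array.Copy(data, position, buffer, offset, count);
  }

  // Copies count bytes from buffer at 'offset' into the stream at 'position'.
  public void Write(long position, byte[] buffer, int offset, int count) {
    Array.Copy(buffer, offset, data, position, count);
  }
}

public static class RandomAccessDemo {
  public static void Main() {
    var stream = new ArrayBackedStream(256);
    stream.Write(100, new byte[] { 1, 2, 3 }, 0, 3);

    byte[] result = new byte[3];
    stream.Read(101, result, 0, 2); // reads the bytes at offsets 101 and 102
    Console.WriteLine(result[0] + "," + result[1] + "," + result[2]); // 2,3,0
  }
}
```

Since no position pointer is shared between calls, a caller only has to hold the SyncRoot of a SectionLock covering position through position + count - 1 while Read or Write runs.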

The source files attached to this article contain implementations of two classes, a ByteStream and a MemoryMappedFile. To verify that the section locks work, the Read and Write methods deliberately force a thread switch after reading or writing each byte of data. This is for demonstration purposes only.
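
The effect of a forced thread switch can be sketched without the download. The example below assumes that Thread.Sleep(0) is an acceptable way to give up the time slice and uses a single shared object in place of the SyncRoot that two overlapping sections would share; the names ThreadSwitchDemo, Fill and IsUniform are hypothetical.

```csharp
using System;
using System.Threading;

// Hypothetical demo: two writers on overlapping ranges, serialized by one shared lock.
public static class ThreadSwitchDemo {
  public static readonly byte[] Bytes = new byte[300];

  // Stands in for the SyncRoot shared by the sections 100..200 and 150..250.
  private static readonly object sharedRoot = new object();

  public static void Fill(int start, int end, byte value) {
    lock (sharedRoot) {
      for (int i = start; i <= end; i++) {
        Bytes[i] = value;
        Thread.Sleep(0); // give up the rest of the time slice to invite a thread switch
      }
    }
  }

  // True when every byte in [start, end] holds the same value.
  public static bool IsUniform(int start, int end) {
    for (int i = start + 1; i <= end; i++) {
      if (Bytes[i] != Bytes[start]) return false;
    }
    return true;
  }

  public static void Main() {
    Thread t1 = new Thread(() => Fill(100, 200, 1));
    Thread t2 = new Thread(() => Fill(150, 250, 2));
    t1.Start();
    t2.Start();
    t1.Join();
    t2.Join();

    // Whichever thread ran second overwrote the whole overlap, so it is uniform.
    Console.WriteLine(IsUniform(150, 200)); // True
  }
}
```

Removing the lock statement lets the two loops interleave, in which case the overlap at offsets 150 to 200 may end up holding a mix of both values.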

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.
