Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles
(untagged)

DataTable Synchronization Manager

0.00/5 (No votes)
4 Mar 2006 1  
Adds synchronization to the DataTable Transaction Logger.

Introduction

In my previous article, I discussed the DataTableTransactionLog. This article further develops those concepts by implementing a DataTable synchronization manager. The synchronization manager is responsible for taking the transaction records and creating a list of record "packets". It also takes a list of record "packets" and reconstructs the transaction records. These transaction records can then be applied to a target DataTable, synchronizing the individual DataTables.

Refactoring

Several refactorings have been made that readers of the previous article should be aware of. Besides the change of namespace, to "Clifton.Data", the others are the following:

DataTableTransactionLog.cs:

  • Moved Apply and Revert record manipulation to the DataTableTransactionLog class.
  • Moved RestoreRowFields and SaveRowFields to the DataTableTransactionLog class.
  • Added indexer to return the transaction record.
  • Modified the Deleting event to search all the previous transactions and save the current row data in any transaction record that references the row about to be deleted.
  • The logger now utilizes an internal DataView to facilitate finding records by primary key(s) during synchronization.

DataTableTransactionRecord.cs:

  • TransType property renamed to TransactionType.
  • TransactionType enumeration renamed to RecordType.
  • Added a virtual Apply method that implements the logic that used to be in the DataTableTransactionLog class.
  • Added a non-virtual Revert method that implements the logic that used to be in the DataTableTransactionLog class.
  • columnValues is exposed (ColumnValues) and is settable.

Architecture

There is a specific architecture that drives the application's interface to the synchronization manager. The application requirements are:

  • ensuring that each DataTable implements the same column names.
  • utilizing one or more primary keys to uniquely identify each row.
  • ensuring that primary key values are initialized and valid before getting the transaction packets from the synchronization manager.
  • ensuring that primary key values are initialized and valid before applying transaction packets to the target DataTable.
  • the primary keys are determined from the DataTable's PrimaryKey array.

The following is a UML diagram illustrating the relationships between the logger and the synchronization manager:

The classes relevant to the DataTable synchronization will be discussed next.

TransactionRecordPacket

In the previous article, the DataTableTransactionRecord tracked changes to DataRow instances. Since each DataTable will have a different DataRow instance, it is necessary to use a different mechanism to synchronize unique DataTable instances. The mechanism I chose was to utilize the primary key column(s). If your DataTable does not have a primary key column, you will have to add one, or modify your query to acquire the primary key column from the persistent store.

The TransactionRecordPacket stores the primary key values in a dictionary, mapping the primary key column name and the primary key value. If the transaction is a column change, the column name of the field being changed and the new value is also stored. The old value is not preserved, nor is the entire row's field values if the row is deleted.

Implementation

The critical part of the implementation is the call to GetGuaranteedRowValue, which gets the specified field value regardless of whether the row has been deleted:

public TransactionRecordPacket(DataTableTransactionRecord record)
{
  pkValues = new Dictionary<string, object>();
  tranType = record.TransactionType;

  foreach (DataColumn dc in record.Row.Table.PrimaryKey)
  {
    pkValues.Add(dc.ColumnName, record.GetGuaranteedRowValue(dc.ColumnName));
  }

  // Fill in some additional information if the transaction is a ChangeField.

  if (tranType == DataTableTransactionRecord.RecordType.ChangeField)
  {
    columnName = record.ColumnName;
    newValue = record.NewValue;
  }
}

This means that even for deleted rows, we can acquire the primary key field values so we can find the row in the DataTable being synchronized and delete it. The internal implementation of the GetGuaranteedRowValue method is:

public object GetGuaranteedRowValue(string fieldName)
{
  object ret = null;

  if (WasDeleted)
  {
    ret = columnValues[fieldName];
  }
  else
  {
    if (row.RowState == DataRowState.Deleted)
    {
      throw new DataTableTransactionException(
          "Row has been deleted and there is no saved column values.");
    }

    ret = row[fieldName];
  }

  return ret;
}

The columnValues property is populated when the record is deleted, as mentioned above, for all transaction records that reference the row being deleted. This is the refactoring of the OnRowDeleting handler:

protected void OnRowDeleting(object sender, DataRowChangeEventArgs e)
{
  if (doLogging)
  {
    DataTableTransactionRecord record;
    record = new DataTableTransactionRecord(transactions.Count, e.Row, 
         DataTableTransactionRecord.RecordType.DeleteRow);
    record.SaveRowFields(e.Row);
    OnTransactionAdding(new TransactionEventArgs(record));
    transactions.Add(record);
    Dictionary<string, object> colVals = record.ColumnValues;

    // Tell all transaction records involving this row to save the row fields.

    // This allows us to access deleted row data in earlier transactions.

    // Alternatively, since the row is deleted, all transactions involving the 

    // deleted row could be removed. I'm not sure about this approach though--

    // is it possible for transactions to affect other non-deleted data before

    // the row deleted?

    for (int i = 0; i < transactions.Count - 1; i++)
    {
      if (transactions[i].Row == e.Row)
      {
        transactions[i].ColumnValues = colVals;
      }
    }

    OnTransactionAdded(new TransactionEventArgs(record));
  }
}

I'm not particularly thrilled with this implementation simply because it requires iterating through the transaction list. Well, optimizations can be done later, right?

DataTablePKTransactionRecord

This class is derived from DataTableTransactionRecord and overrides the Apply method. As mentioned above, the primary key values of a row must be used rather than the DataRow instance.

Implementation

The Apply method is implemented as follows:

public override DataRow Apply(DataView dataView)
{
  DataTable sourceTable=dataView.Table;

  // If the transaction record contains a known DataRow, then use

  // the default Apply method.

  if (row != null)
  {
    base.Apply(dataView);
  }
  else
  {
    // We have to use the PK value information to determine the row.

    switch (transType)
    { 
      case RecordType.NewRow:
        row = sourceTable.NewRow();
        SetPKFieldValues();
        sourceTable.Rows.Add(row);
        break;

      case RecordType.DeleteRow:
        row = FindRow(dataView);
        row.Delete();
        break;

     case RecordType.ChangeField:
       row = FindRow(dataView);
       row[columnName] = newValue;
       break;
    }
  }

  return null;
}

The call to SetPKFieldValues:

protected void SetPKFieldValues()
{
  foreach (KeyValuePair<string, object> kvp in pkFieldNameValues)
  {
    row[kvp.Key] = kvp.Value;
  }
}

is necessary so that a new row can be located after it has been added. This results in some redundancy in transactions that set the primary key values even though they've already been set here. A possible refactoring would be to accumulate all the field change transactions and apply them all at once when adding a new row. That would involve an implementation that has access to all the transaction records, whereas what I'm focusing on here is applying transaction records one at a time. This is an important step as it validates the implementation for the basic requirements.

The FindRow method sets up the primary key field values as determined by the packet's primary key field/value dictionary:

protected DataRow FindRow(DataView dataView)
{
  if ((dataView.Sort == null) || (dataView.Sort == String.Empty))
  {
    throw new DataTableSynchronizationManagerException("The transaction 
       logger's SetupSort method must be called before synchronization.");
  }

  object[] pks = new object[pkValues.Count];
  pkValues.Values.CopyTo(pks, 0);
  int idx = dataView.Find(pks);

  if (idx < 0)
  {
    throw new DataTableSynchronizationManagerException("Could not find row 
          to update.");
  }

  return dataView[idx].Row;
}

DataTableSynchronizationManager

The synchronization manager interfaces with the DataTableTransactionLog instance to acquire the transactions and convert them into TransactionRecordPacket instances. It also does the reverse--taking a collection of TransactionRecordPacket instances and adding them to the logger's transaction collection.

Implementation

GetTransactions returns a list of TransactionRecordPacket instances:

public List<TransactionRecordPacket> GetTransactions()
{
  if (logger.SourceTable.PrimaryKey == null)
  {
    throw new DataTableTransactionException(
       "GetTransactions requires at least one PK.");
  }

  List<TransactionRecordPacket> packets = new List<TransactionRecordPacket>();

  foreach (DataTableTransactionRecord record in logger.Log)
  {
    TransactionRecordPacket trp = new TransactionRecordPacket(record);
    packets.Add(trp);
  }

  return packets;
}

These can then be applied to a transaction of the logger managing the mirrored DataTable:

public void AddTransactions(List<TransactionRecordPacket> transactionPackets)
{
  foreach (TransactionRecordPacket trp in transactionPackets)
  {
    logger.Log.Add(new DataTablePKTransactionRecord(trp));
  }
}

Note how the synchronization manager adds the instances of the specialized DataTablePKTransactionRecord class. This is so that the Appy method can be overridden so that primary key values are used to locate rows rather than the DataRow instance itself.

Lastly, the synchronization manager implements two methods, SetupSort and Sync, the latter of which is used to apply all transactions added to the transaction log. These are expected to be exclusively DataTablePKTransactionRecord instances:

public void Sync()
{
  SetupSort();
  logger.SuspendLogging();

  foreach (DataTableTransactionRecord record in logger.Log)
  {
    if (!(record is DataTablePKTransactionRecord))
    {
      throw new DataTableSynchronizationManagerException("Expected a record 
          of type DataTablePKTransactionRecord.");
    }

    record.Apply(logger.DataView);
  }

  logger.ResumeLogging();
}

The SetupSort method will set up the DataView.Sort property so that the DataView.Find method can be used to find rows based on their primary keys. For this to work, the DataColumn instances in the DataTable's Column collection must be initialized in the same order:

protected void SetupSort()
{
  if (logger.SourceTable.PrimaryKey == null)
  { 
    throw new DataTableTransactionException(
        "GetTransactions requires at least one PK.");
  }

  string sortBy = String.Empty;
  string comma = String.Empty;

  foreach (DataColumn dc in logger.SourceTable.PrimaryKey)
  {
    sortBy += comma + dc.ColumnName;
    comma = ", ";
  }

  logger.DataView.Sort = sortBy;
}

Unit Tests

I've created a unit test for both the core logger functionality and the synchronization manager.

Transaction Logger

The transaction logger unit tests are a sequential set of unit tests, runnable using my Advanced Unit Test engine.

using System;
using System.Collections.Generic;
using System.Data;

using Vts.UnitTest;

using Clifton.Data;

namespace TransactionLoggerUnitTests
{
  [TestFixture]
  [ProcessTest]
  public class LoggerTests
  {
    protected DataTable dt;
    protected DataTableTransactionLog dttl;
    protected DataRow row;

    [TestFixtureSetUp]
    public void FixtureSetup()
    {
      dt = new DataTable();
      dt.Columns.Add(new DataColumn("LastName", typeof(string)));
      dt.Columns.Add(new DataColumn("FirstName", typeof(string)));

      dttl = new DataTableTransactionLog();
      dttl.SourceTable = dt;
    }

    [Test, Sequence(1)]
    public void NewRow()
    {
      row = dt.NewRow();
      Assertion.Assert(dttl.Log.Count == 1, "Expected one entry.");
      Assertion.Assert(dttl.Log[0].TransactionType == 
          DataTableTransactionRecord.RecordType.NewRow, 
          "Expected new row transaction.");
    }

    [Test, Sequence(2)]
    public void SetFields()
    {
      row["LastName"] = "Clifton";
      row["FirstName"] = "Marc";
      dt.Rows.Add(row);
      Assertion.Assert(dttl.Log.Count == 3, "Expected three entries.");
      Assertion.Assert(dttl.Log[1].TransactionType == 
           DataTableTransactionRecord.RecordType.ChangeField, 
           "Expected change field transaction.");
      Assertion.Assert(dttl.Log[2].TransactionType == 
           DataTableTransactionRecord.RecordType.ChangeField, 
           "Expected change field transaction.");
      Assertion.Assert(dttl.Log[1].NewValue.ToString()=="Clifton", 
           "Incorrect new value.");
      Assertion.Assert(dttl.Log[2].NewValue.ToString() == "Marc", 
           "Incorrect new value.");
    }

    [Test, Sequence(3)]
    public void CollectNothing()
    {
      dttl.CollectUncommittedRows();    
      Assertion.Assert(dt.Rows.Count == 1, "Committed row was collected!");
    }

    [Test, Sequence(4)]
    public void CollectUncommitted()
    {
      dt.NewRow();
      dttl.CollectUncommittedRows();
      Assertion.Assert(dttl.Log.Count == 3, "Expected three entries.");
    }

    [Test, Sequence(5)]
    public void RevertFirstNameChange()
    {
      dttl.Revert(2);
      Assertion.Assert(dt.Rows[0]["LastName"].ToString() ==
         "Clifton", "Incorrect value.");
      Assertion.Assert(dt.Rows[0]["FirstName"]==DBNull.Value, 
         "Incorrect new value.");
    }

    [Test, Sequence(6)]
    public void RevertLastNameChange()
    {
      dttl.Revert(1);
      Assertion.Assert(dt.Rows[0]["LastName"] == DBNull.Value, 
          "Incorrect value.");
      Assertion.Assert(dt.Rows[0]["FirstName"] == DBNull.Value, 
          "Incorrect new value.");
    }

    [Test, Sequence(7)]
    public void RevertNewRowChange()
    {
      dttl.Revert(0);
      Assertion.Assert(dt.Rows.Count == 0, "Row should have been deleted.");
    }

    [Test, Sequence(8)]
    public void ApplyNewRow()
    {
      dttl.Apply(0);
      Assertion.Assert(dt.Rows.Count == 1, "Row was not added.");
      Assertion.Assert(dt.Rows[0]["LastName"] == DBNull.Value, 
           "Incorrect value.");
      Assertion.Assert(dt.Rows[0]["FirstName"] == DBNull.Value, 
           "Incorrect new value.");
    }

    [Test, Sequence(9)]
    public void ApplyLastName()
    {
      dttl.Apply(1);
      Assertion.Assert(dt.Rows.Count == 1, "Row was not added.");
      Assertion.Assert(dt.Rows[0]["LastName"].ToString() == "Clifton", 
           "Incorrect value.");
      Assertion.Assert(dt.Rows[0]["FirstName"] == DBNull.Value, 
           "Incorrect new value.");
    }

    [Test, Sequence(10)]
    public void ApplyFirstName()
    {
      dttl.Apply(2);
      Assertion.Assert(dt.Rows.Count == 1, "Row was not added.");
      Assertion.Assert(dt.Rows[0]["LastName"].ToString() == "Clifton", 
           "Incorrect value.");
      Assertion.Assert(dt.Rows[0]["FirstName"].ToString() == "Marc", 
           "Incorrect new value.");
    }
  }
}

Synchronization Manager

The unit test for the synchronization manager illustrates the basic process for synchronizing two DataTable instances:

using System;
using System.Collections.Generic;
using System.Data;

using Vts.UnitTest;

using Clifton.Data;

namespace TransactionSyncrhonizationManagerUnitTests
{
  [TestFixture]
  [ProcessTest]
  public class CurrencyTests
  {
    protected DataTable dt1;
    protected DataTableTransactionLog dttl1;
    protected DataTable dt2;
    protected DataTableTransactionLog dttl2;

    [TestFixtureSetUp]
    public void FixtureSetup()
    {
      dt1 = new DataTable();
      dt1.Columns.Add(new DataColumn("PK", typeof(Guid)));
      dt1.Columns.Add(new DataColumn("LastName", typeof(string)));
      dt1.Columns.Add(new DataColumn("FirstName", typeof(string)));
      dt1.PrimaryKey = new DataColumn[] { dt1.Columns["PK"] };

      dttl1 = new DataTableTransactionLog(dt1);

      dt2 = dt1.Clone();
      dttl2 = new DataTableTransactionLog(dt2);

      DataRow row=dt1.NewRow();
      row["PK"]=Guid.NewGuid();
      row["LastName"]="Clifton";
      row["FirstName"]="Marc";
      dt1.Rows.Add(row);

      row=dt1.NewRow();
      row["PK"]=Guid.NewGuid();
      row["LastName"]="Linder";
      row["FirstName"]="Karen";
      dt1.Rows.Add(row);

      row=dt1.NewRow();
      row["PK"]=Guid.NewGuid();
      row["LastName"]="Doe";
      row["FirstName"]="John";
      dt1.Rows.Add(row);

      dt1.Rows[2].Delete();
    }

    [Test, Sequence(1)]
    public void UpdateMirror()
    {
      DataTableSynchronizationManager dtcm1 = 
          new DataTableSynchronizationManager(dttl1);
      List<TransactionRecordPacket> trpList = dtcm1.GetTransactions();

      DataTableSynchronizationManager dtcm2 = 
          new DataTableSynchronizationManager(dttl2);
      dtcm2.AddTransactions(trpList);
      dtcm2.Sync();

      Assertion.Assert(dt2.Rows.Count == 2, "Expected 2 rows.");
      Assertion.Assert(dt2.Rows[0]["LastName"].ToString() == "Clifton", 
          "Unexpected value");
      Assertion.Assert(dt2.Rows[0]["FirstName"].ToString() == "Marc", 
          "Unexpected value");      
      Assertion.Assert(dt2.Rows[1]["LastName"].ToString() == "Linder", 
          "Unexpected value");
      Assertion.Assert(dt2.Rows[1]["FirstName"].ToString() == "Karen", 
          "Unexpected value");
    }
  }
}

You will note how both the DataTable instances are set up with the same column structure. The transaction packet collection, acquired from the DataTableSynchronizationManager that interfaces to the first logger, is passed to a second DataTableSynchronizationManager instance that interfaces with the second logger. The packets are added and the Sync method is called. Also note that because the two tables are intended to be synchronized, both implement a primary key column now.

Conclusion

The DataTableSynchronizationManager is a useful component in synchronizing the data between two remote DataTable instances. The transaction packet list is suitable for serialization, using my RawSerialization library, which would provide for a compact format to send across a network.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.

A list of licenses authors might use can be found here