Introduction
In my previous article, I discussed the DataTableTransactionLog. This article further develops those concepts by implementing a DataTable synchronization manager. The synchronization manager is responsible for taking the transaction records and creating a list of record "packets". It also takes a list of record "packets" and reconstructs the transaction records. These transaction records can then be applied to a target DataTable, synchronizing the individual DataTables.
Refactoring
Several refactorings have been made that readers of the previous article should be aware of. Besides the change of namespace to "Clifton.Data", they are the following:

DataTableTransactionLog.cs:

- Moved the Apply and Revert record manipulation to the DataTableTransactionRecord class.
- Moved RestoreRowFields and SaveRowFields to the DataTableTransactionRecord class.
- Added an indexer to return the transaction record.
- Modified the Deleting event to search all the previous transactions and save the current row data in any transaction record that references the row about to be deleted.
- The logger now utilizes an internal DataView to facilitate finding records by primary key(s) during synchronization.
DataTableTransactionRecord.cs:

- The TransType property has been renamed to TransactionType.
- The TransactionType enumeration has been renamed to RecordType.
- Added a virtual Apply method that implements the logic that used to be in the DataTableTransactionLog class.
- Added a non-virtual Revert method that implements the logic that used to be in the DataTableTransactionLog class.
- columnValues is now exposed (as the ColumnValues property) and is settable.
Architecture

There is a specific architecture that drives the application's interface to the synchronization manager. The application requirements, illustrated in the sketch following this list, are:

- ensuring that each DataTable implements the same column names.
- utilizing one or more primary keys to uniquely identify each row.
- ensuring that primary key values are initialized and valid before getting the transaction packets from the synchronization manager.
- ensuring that primary key values are initialized and valid before applying transaction packets to the target DataTable.
- determining the primary keys from the DataTable's PrimaryKey array.
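The following is a minimal sketch of a setup that satisfies these requirements; the table and column names here are arbitrary, not taken from the article's code:

using System;
using System.Data;

// Two DataTables satisfying the requirements: identical column names
// and a primary key column whose values are valid before synchronization.
DataTable source = new DataTable();
source.Columns.Add(new DataColumn("PK", typeof(Guid)));
source.Columns.Add(new DataColumn("LastName", typeof(string)));
source.PrimaryKey = new DataColumn[] { source.Columns["PK"] };

// Clone() copies the schema, including the PrimaryKey definition,
// so both tables implement the same column names.
DataTable target = source.Clone();

DataRow row = source.NewRow();
row["PK"] = Guid.NewGuid();      // the PK must be valid before packets are generated
row["LastName"] = "Clifton";
source.Rows.Add(row);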
The following is a UML diagram illustrating the relationships between the logger and the synchronization manager:

The classes relevant to the DataTable synchronization will be discussed next.
TransactionRecordPacket

In the previous article, the DataTableTransactionRecord tracked changes to DataRow instances. Since each DataTable will have its own DataRow instances, it is necessary to use a different mechanism to synchronize separate DataTable instances. The mechanism I chose was to utilize the primary key column(s). If your DataTable does not have a primary key column, you will have to add one, or modify your query to acquire the primary key column from the persistent store.

The TransactionRecordPacket stores the primary key values in a dictionary, mapping each primary key column name to its value. If the transaction is a column change, the column name of the field being changed and the new value are also stored. The old value is not preserved, nor are the entire row's field values if the row is deleted.
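Put concretely, the packet's state amounts to the following sketch, inferred from the constructor shown below (the exact declarations in the source may differ):

using System.Collections.Generic;
using Clifton.Data;

// Sketch of TransactionRecordPacket's state, inferred from its constructor;
// the actual source declarations may differ.
public class TransactionRecordPacket
{
    // Maps each primary key column name to that row's key value.
    protected Dictionary<string, object> pkValues;

    // NewRow, DeleteRow, or ChangeField.
    protected DataTableTransactionRecord.RecordType tranType;

    // Populated only for ChangeField transactions; the old value is not kept.
    protected string columnName;
    protected object newValue;
}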
Implementation

The critical part of the implementation is the call to GetGuaranteedRowValue, which gets the specified field value regardless of whether the row has been deleted:
public TransactionRecordPacket(DataTableTransactionRecord record)
{
    pkValues = new Dictionary<string, object>();
    tranType = record.TransactionType;

    foreach (DataColumn dc in record.Row.Table.PrimaryKey)
    {
        pkValues.Add(dc.ColumnName, record.GetGuaranteedRowValue(dc.ColumnName));
    }

    if (tranType == DataTableTransactionRecord.RecordType.ChangeField)
    {
        columnName = record.ColumnName;
        newValue = record.NewValue;
    }
}
This means that even for deleted rows, we can acquire the primary key field values, so we can find the corresponding row in the DataTable being synchronized and delete it. The internal implementation of the GetGuaranteedRowValue method is:
public object GetGuaranteedRowValue(string fieldName)
{
    object ret = null;

    if (WasDeleted)
    {
        ret = columnValues[fieldName];
    }
    else
    {
        if (row.RowState == DataRowState.Deleted)
        {
            throw new DataTableTransactionException(
                "Row has been deleted and there are no saved column values.");
        }

        ret = row[fieldName];
    }

    return ret;
}
The columnValues collection is populated when a record is deleted, as mentioned above, for all transaction records that reference the row being deleted. This is the refactored OnRowDeleting handler:
protected void OnRowDeleting(object sender, DataRowChangeEventArgs e)
{
    if (doLogging)
    {
        DataTableTransactionRecord record;
        record = new DataTableTransactionRecord(transactions.Count, e.Row,
            DataTableTransactionRecord.RecordType.DeleteRow);
        record.SaveRowFields(e.Row);
        OnTransactionAdding(new TransactionEventArgs(record));
        transactions.Add(record);

        // Back-patch the saved column values into every earlier transaction
        // that references the row being deleted.
        Dictionary<string, object> colVals = record.ColumnValues;

        for (int i = 0; i < transactions.Count - 1; i++)
        {
            if (transactions[i].Row == e.Row)
            {
                transactions[i].ColumnValues = colVals;
            }
        }

        OnTransactionAdded(new TransactionEventArgs(record));
    }
}
I'm not particularly thrilled with this implementation simply because it requires iterating through the transaction list. Well, optimizations can be done later, right?
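For what it's worth, one possible optimization (not part of the article's code, and the member names here are my own) would be for the logger to maintain a side index from each DataRow to the transaction records that reference it, reducing the back-patching to a single lookup:

// Hypothetical optimization (not in the article's code): index transaction
// records by the DataRow they reference so OnRowDeleting can back-patch the
// saved column values with one lookup instead of a linear scan.
protected Dictionary<DataRow, List<DataTableTransactionRecord>> rowIndex =
    new Dictionary<DataRow, List<DataTableTransactionRecord>>();

protected void IndexRecord(DataTableTransactionRecord record)
{
    List<DataTableTransactionRecord> records;

    if (!rowIndex.TryGetValue(record.Row, out records))
    {
        records = new List<DataTableTransactionRecord>();
        rowIndex[record.Row] = records;
    }

    records.Add(record);
}

// The scan in OnRowDeleting would then reduce to:
//
//   foreach (DataTableTransactionRecord r in rowIndex[e.Row])
//   {
//       r.ColumnValues = colVals;
//   }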
DataTablePKTransactionRecord

This class is derived from DataTableTransactionRecord and overrides the Apply method. As mentioned above, the primary key values of a row must be used rather than the DataRow instance.
Implementation

The Apply method is implemented as follows:
public override DataRow Apply(DataView dataView)
{
    DataTable sourceTable = dataView.Table;

    if (row != null)
    {
        base.Apply(dataView);
    }
    else
    {
        switch (transType)
        {
            case RecordType.NewRow:
                row = sourceTable.NewRow();
                SetPKFieldValues();
                sourceTable.Rows.Add(row);
                break;

            case RecordType.DeleteRow:
                row = FindRow(dataView);
                row.Delete();
                break;

            case RecordType.ChangeField:
                row = FindRow(dataView);
                row[columnName] = newValue;
                break;
        }
    }

    return row;
}
The call to SetPKFieldValues:

protected void SetPKFieldValues()
{
    foreach (KeyValuePair<string, object> kvp in pkFieldNameValues)
    {
        row[kvp.Key] = kvp.Value;
    }
}

is necessary so that a new row can be located after it has been added. This results in some redundancy, in that transactions that set the primary key values are replayed even though those values have already been set here. A possible refactoring, sketched below, would be to accumulate all the field change transactions and apply them all at once when adding a new row. That would require an implementation that has access to all the transaction records, whereas what I'm focusing on here is applying transaction records one at a time. This is an important step, as it validates the implementation against the basic requirements.
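To make that refactoring concrete, here is one hedged sketch; the packet accessors (PKValues, ColumnName, NewValue) are my assumptions, as they are not shown in the article:

// Hypothetical batching sketch: apply a NewRow packet together with all of
// the ChangeField packets that share its primary key, so the row's fields
// are set once, in a single pass, instead of one transaction at a time.
protected void ApplyNewRowBatch(DataTable target,
    TransactionRecordPacket newRowPacket,
    IEnumerable<TransactionRecordPacket> fieldChanges)
{
    DataRow row = target.NewRow();

    // Set the primary key values from the packet's pk dictionary.
    foreach (KeyValuePair<string, object> kvp in newRowPacket.PKValues)
    {
        row[kvp.Key] = kvp.Value;
    }

    // Fold in every pending field change before the row is added,
    // eliminating the redundant per-field transactions.
    foreach (TransactionRecordPacket change in fieldChanges)
    {
        row[change.ColumnName] = change.NewValue;
    }

    target.Rows.Add(row);
}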
The FindRow method locates a row using the primary key values stored in the packet's primary key field/value dictionary:

protected DataRow FindRow(DataView dataView)
{
    if ((dataView.Sort == null) || (dataView.Sort == String.Empty))
    {
        throw new DataTableSynchronizationManagerException(
            "The transaction logger's SetupSort method must be called before synchronization.");
    }

    object[] pks = new object[pkValues.Count];
    pkValues.Values.CopyTo(pks, 0);
    int idx = dataView.Find(pks);

    if (idx < 0)
    {
        throw new DataTableSynchronizationManagerException(
            "Could not find row to update.");
    }

    return dataView[idx].Row;
}
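For reference, DataView.Find matches the supplied key array against the columns named in DataView.Sort, in order, and returns -1 when no row matches; this is why SetupSort (shown later) must run first. A standalone illustration:

// Standalone illustration of the DataView.Find mechanics relied on above.
DataTable table = new DataTable();
table.Columns.Add("PK", typeof(Guid));
table.Columns.Add("LastName", typeof(string));

Guid key = Guid.NewGuid();
table.Rows.Add(key, "Clifton");

DataView view = new DataView(table);
view.Sort = "PK";                           // Find's keys are matched against the sort columns

int idx = view.Find(new object[] { key });  // index into the view, or -1
DataRow found = view[idx].Row;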
DataTableSynchronizationManager

The synchronization manager interfaces with the DataTableTransactionLog instance to acquire the transactions and convert them into TransactionRecordPacket instances. It also does the reverse: taking a collection of TransactionRecordPacket instances and adding them to the logger's transaction collection.
Implementation

GetTransactions returns a list of TransactionRecordPacket instances:
public List<TransactionRecordPacket> GetTransactions()
{
    // PrimaryKey returns an empty array (never null) when no key is defined.
    if (logger.SourceTable.PrimaryKey.Length == 0)
    {
        throw new DataTableTransactionException(
            "GetTransactions requires at least one PK.");
    }

    List<TransactionRecordPacket> packets = new List<TransactionRecordPacket>();

    foreach (DataTableTransactionRecord record in logger.Log)
    {
        TransactionRecordPacket trp = new TransactionRecordPacket(record);
        packets.Add(trp);
    }

    return packets;
}
These packets can then be added to the transaction log of the logger managing the mirrored DataTable:
public void AddTransactions(List<TransactionRecordPacket> transactionPackets)
{
    foreach (TransactionRecordPacket trp in transactionPackets)
    {
        logger.Log.Add(new DataTablePKTransactionRecord(trp));
    }
}
Note how the synchronization manager adds instances of the specialized DataTablePKTransactionRecord class. This is done so that the overridden Apply method uses primary key values, rather than the DataRow instance itself, to locate rows.
Lastly, the synchronization manager implements two methods, SetupSort and Sync, the latter of which is used to apply all the transactions added to the transaction log. These are expected to be exclusively DataTablePKTransactionRecord instances:
public void Sync()
{
    SetupSort();
    logger.SuspendLogging();

    foreach (DataTableTransactionRecord record in logger.Log)
    {
        if (!(record is DataTablePKTransactionRecord))
        {
            throw new DataTableSynchronizationManagerException(
                "Expected a record of type DataTablePKTransactionRecord.");
        }

        record.Apply(logger.DataView);
    }

    logger.ResumeLogging();
}
The SetupSort method sets up the DataView.Sort property so that the DataView.Find method can be used to find rows based on their primary keys. For this to work, the DataColumn instances in each DataTable's Columns collection must be initialized in the same order:
protected void SetupSort()
{
    if (logger.SourceTable.PrimaryKey.Length == 0)
    {
        throw new DataTableTransactionException(
            "SetupSort requires at least one PK.");
    }

    string sortBy = String.Empty;
    string comma = String.Empty;

    // Build a comma-separated sort expression from the primary key columns.
    foreach (DataColumn dc in logger.SourceTable.PrimaryKey)
    {
        sortBy += comma + dc.ColumnName;
        comma = ", ";
    }

    logger.DataView.Sort = sortBy;
}
Unit Tests

I've created unit tests for both the core logger functionality and the synchronization manager.

Transaction Logger

The transaction logger unit tests are a sequential set of tests, runnable using my Advanced Unit Test engine.
using System;
using System.Collections.Generic;
using System.Data;

using Vts.UnitTest;

using Clifton.Data;

namespace TransactionLoggerUnitTests
{
    [TestFixture]
    [ProcessTest]
    public class LoggerTests
    {
        protected DataTable dt;
        protected DataTableTransactionLog dttl;
        protected DataRow row;

        [TestFixtureSetUp]
        public void FixtureSetup()
        {
            dt = new DataTable();
            dt.Columns.Add(new DataColumn("LastName", typeof(string)));
            dt.Columns.Add(new DataColumn("FirstName", typeof(string)));
            dttl = new DataTableTransactionLog();
            dttl.SourceTable = dt;
        }

        [Test, Sequence(1)]
        public void NewRow()
        {
            row = dt.NewRow();
            Assertion.Assert(dttl.Log.Count == 1, "Expected one entry.");
            Assertion.Assert(dttl.Log[0].TransactionType ==
                DataTableTransactionRecord.RecordType.NewRow,
                "Expected new row transaction.");
        }

        [Test, Sequence(2)]
        public void SetFields()
        {
            row["LastName"] = "Clifton";
            row["FirstName"] = "Marc";
            dt.Rows.Add(row);
            Assertion.Assert(dttl.Log.Count == 3, "Expected three entries.");
            Assertion.Assert(dttl.Log[1].TransactionType ==
                DataTableTransactionRecord.RecordType.ChangeField,
                "Expected change field transaction.");
            Assertion.Assert(dttl.Log[2].TransactionType ==
                DataTableTransactionRecord.RecordType.ChangeField,
                "Expected change field transaction.");
            Assertion.Assert(dttl.Log[1].NewValue.ToString() == "Clifton",
                "Incorrect new value.");
            Assertion.Assert(dttl.Log[2].NewValue.ToString() == "Marc",
                "Incorrect new value.");
        }

        [Test, Sequence(3)]
        public void CollectNothing()
        {
            dttl.CollectUncommittedRows();
            Assertion.Assert(dt.Rows.Count == 1, "Committed row was collected!");
        }

        [Test, Sequence(4)]
        public void CollectUncommitted()
        {
            dt.NewRow();
            dttl.CollectUncommittedRows();
            Assertion.Assert(dttl.Log.Count == 3, "Expected three entries.");
        }

        [Test, Sequence(5)]
        public void RevertFirstNameChange()
        {
            dttl.Revert(2);
            Assertion.Assert(dt.Rows[0]["LastName"].ToString() == "Clifton",
                "Incorrect value.");
            Assertion.Assert(dt.Rows[0]["FirstName"] == DBNull.Value,
                "Incorrect new value.");
        }

        [Test, Sequence(6)]
        public void RevertLastNameChange()
        {
            dttl.Revert(1);
            Assertion.Assert(dt.Rows[0]["LastName"] == DBNull.Value,
                "Incorrect value.");
            Assertion.Assert(dt.Rows[0]["FirstName"] == DBNull.Value,
                "Incorrect new value.");
        }

        [Test, Sequence(7)]
        public void RevertNewRowChange()
        {
            dttl.Revert(0);
            Assertion.Assert(dt.Rows.Count == 0, "Row should have been deleted.");
        }

        [Test, Sequence(8)]
        public void ApplyNewRow()
        {
            dttl.Apply(0);
            Assertion.Assert(dt.Rows.Count == 1, "Row was not added.");
            Assertion.Assert(dt.Rows[0]["LastName"] == DBNull.Value,
                "Incorrect value.");
            Assertion.Assert(dt.Rows[0]["FirstName"] == DBNull.Value,
                "Incorrect new value.");
        }

        [Test, Sequence(9)]
        public void ApplyLastName()
        {
            dttl.Apply(1);
            Assertion.Assert(dt.Rows.Count == 1, "Row was not added.");
            Assertion.Assert(dt.Rows[0]["LastName"].ToString() == "Clifton",
                "Incorrect value.");
            Assertion.Assert(dt.Rows[0]["FirstName"] == DBNull.Value,
                "Incorrect new value.");
        }

        [Test, Sequence(10)]
        public void ApplyFirstName()
        {
            dttl.Apply(2);
            Assertion.Assert(dt.Rows.Count == 1, "Row was not added.");
            Assertion.Assert(dt.Rows[0]["LastName"].ToString() == "Clifton",
                "Incorrect value.");
            Assertion.Assert(dt.Rows[0]["FirstName"].ToString() == "Marc",
                "Incorrect new value.");
        }
    }
}
Synchronization Manager

The unit test for the synchronization manager illustrates the basic process for synchronizing two DataTable instances:
using System;
using System.Collections.Generic;
using System.Data;

using Vts.UnitTest;

using Clifton.Data;

namespace TransactionSynchronizationManagerUnitTests
{
    [TestFixture]
    [ProcessTest]
    public class CurrencyTests
    {
        protected DataTable dt1;
        protected DataTableTransactionLog dttl1;
        protected DataTable dt2;
        protected DataTableTransactionLog dttl2;

        [TestFixtureSetUp]
        public void FixtureSetup()
        {
            dt1 = new DataTable();
            dt1.Columns.Add(new DataColumn("PK", typeof(Guid)));
            dt1.Columns.Add(new DataColumn("LastName", typeof(string)));
            dt1.Columns.Add(new DataColumn("FirstName", typeof(string)));
            dt1.PrimaryKey = new DataColumn[] { dt1.Columns["PK"] };
            dttl1 = new DataTableTransactionLog(dt1);

            dt2 = dt1.Clone();
            dttl2 = new DataTableTransactionLog(dt2);

            DataRow row = dt1.NewRow();
            row["PK"] = Guid.NewGuid();
            row["LastName"] = "Clifton";
            row["FirstName"] = "Marc";
            dt1.Rows.Add(row);

            row = dt1.NewRow();
            row["PK"] = Guid.NewGuid();
            row["LastName"] = "Linder";
            row["FirstName"] = "Karen";
            dt1.Rows.Add(row);

            row = dt1.NewRow();
            row["PK"] = Guid.NewGuid();
            row["LastName"] = "Doe";
            row["FirstName"] = "John";
            dt1.Rows.Add(row);

            dt1.Rows[2].Delete();
        }

        [Test, Sequence(1)]
        public void UpdateMirror()
        {
            DataTableSynchronizationManager dtcm1 =
                new DataTableSynchronizationManager(dttl1);
            List<TransactionRecordPacket> trpList = dtcm1.GetTransactions();

            DataTableSynchronizationManager dtcm2 =
                new DataTableSynchronizationManager(dttl2);
            dtcm2.AddTransactions(trpList);
            dtcm2.Sync();

            Assertion.Assert(dt2.Rows.Count == 2, "Expected 2 rows.");
            Assertion.Assert(dt2.Rows[0]["LastName"].ToString() == "Clifton",
                "Unexpected value");
            Assertion.Assert(dt2.Rows[0]["FirstName"].ToString() == "Marc",
                "Unexpected value");
            Assertion.Assert(dt2.Rows[1]["LastName"].ToString() == "Linder",
                "Unexpected value");
            Assertion.Assert(dt2.Rows[1]["FirstName"].ToString() == "Karen",
                "Unexpected value");
        }
    }
}
You will note how both DataTable instances are set up with the same column structure. The transaction packet collection, acquired from the DataTableSynchronizationManager that interfaces with the first logger, is passed to a second DataTableSynchronizationManager instance that interfaces with the second logger. The packets are added and the Sync method is called. Also note that, because the two tables are intended to be synchronized, both now implement a primary key column.
Conclusion

The DataTableSynchronizationManager is a useful component for synchronizing the data between two remote DataTable instances. The transaction packet list is suitable for serialization using my RawSerialization library, which would provide a compact format to send across a network.
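To illustrate the round trip without depending on RawSerialization (whose API is not shown here), the following sketch uses the standard BinaryFormatter as a stand-in; it assumes TransactionRecordPacket is marked [Serializable]:

using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using Clifton.Data;

// Stand-in sketch: BinaryFormatter in place of RawSerialization.
// Assumes TransactionRecordPacket is marked [Serializable].
byte[] ToBytes(List<TransactionRecordPacket> packets)
{
    BinaryFormatter formatter = new BinaryFormatter();

    using (MemoryStream stream = new MemoryStream())
    {
        formatter.Serialize(stream, packets);
        return stream.ToArray();    // the payload sent across the network
    }
}

List<TransactionRecordPacket> FromBytes(byte[] data)
{
    BinaryFormatter formatter = new BinaryFormatter();

    using (MemoryStream stream = new MemoryStream(data))
    {
        return (List<TransactionRecordPacket>)formatter.Deserialize(stream);
    }
}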