Introduction
Data transfer is a very common business process. Most developers are familiar with taking data from a remote source and writing it to a local database.
It is often possible, and even appropriate, to load the remote data into memory and then write it to your database using some kind of data adapter. But what if you need
to transfer a very large amount of data from your remote source? The larger the volume of data, the less you will want to load it all into memory,
and the more you will want some kind of forward-only streaming technique that loads the data dynamically as the stream is being read.
Fortunately, ADO.NET provides a useful tool for doing just that: the SqlBulkCopy class.
A SqlBulkCopy object is associated with a single database table, identified by the DestinationTableName property:
SqlBulkCopy bulkCopy = new SqlBulkCopy(myConnection);
bulkCopy.DestinationTableName = "MyTable";
To work its magic, SqlBulkCopy uses the WriteToServer method, which has four overloads. Three of the four involve DataRows and DataTables, which suggests that the data has already been extracted from somewhere and packaged into a DataTable.
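For reference, these are the three DataRow/DataTable overloads as they stood when this article was written (later framework versions add DbDataReader and asynchronous variants):

public void WriteToServer(DataRow[] rows);
public void WriteToServer(DataTable table);
public void WriteToServer(DataTable table, DataRowState rowState);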
Since we are concerned in this case with transferring large amounts of data, these overloads are less interesting to us than the fourth one, which takes an IDataReader:

public void WriteToServer(IDataReader reader)
You will need to create your own class to implement System.Data.IDataReader. To get this right, you need to understand the fairly straightforward way
in which SqlBulkCopy.WriteToServer reads your IDataReader implementation:
- It checks the IsClosed flag, terminating if the flag is set to true.
- It calls the Read() method and checks the return value.
- If the Read() method returned true, it uses the property accessors and the GetValues() method to write the current values into the table specified by DestinationTableName.
- It repeats until termination.
It is your responsibility, then, to ensure that your IDataReader
object walks through its data source appropriately in response to the Read()
calls,
and sets the IsClosed
flag when the data source has been exhausted.
If you are lucky enough for your data source to correspond to a single table in your database (which may happen if the data has been published to a single large
CSV file, for example), you can see how this can enable you to perform very fast uploads of large amounts of data using a forward-only reader. Just create a class that
implements IDataReader
, and pass your data source to that class as a stream reader. At each call to Read()
, you advance your stream reader
to the next record, whilst writing its contents into the containers that are accessed by this[int i]
, this[string name]
, and GetValues(object[] values)
.
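To make this concrete, here is a minimal sketch of such a class for a comma-separated stream, assuming a header row and no quoted fields; the class name and all its details are illustrative rather than taken from any particular demonstration. Only the members that SqlBulkCopy needs for a plain string-to-string copy do real work; the typed getters simply throw:

using System;
using System.Data;
using System.IO;

// A minimal, illustrative IDataReader over a CSV stream: header row,
// comma separators, no quoted fields.
public class CsvDataReader : IDataReader
{
    private readonly TextReader reader;
    private readonly string[] columnNames;
    private string[] currentValues;

    public CsvDataReader(TextReader reader)
    {
        this.reader = reader;
        columnNames = reader.ReadLine().Split(',');   // consume the header row
    }

    public bool IsClosed { get; private set; }
    public int FieldCount { get { return columnNames.Length; } }

    // Advance to the next record; close when the stream is exhausted.
    public bool Read()
    {
        string line = reader.ReadLine();
        if (line == null) { IsClosed = true; return false; }
        currentValues = line.Split(',');
        return true;
    }

    public object this[int i] { get { return currentValues[i]; } }
    public object this[string name] { get { return currentValues[GetOrdinal(name)]; } }
    public object GetValue(int i) { return currentValues[i]; }
    public int GetValues(object[] values)
    {
        currentValues.CopyTo(values, 0);
        return currentValues.Length;
    }
    public string GetName(int i) { return columnNames[i]; }
    public int GetOrdinal(string name) { return Array.IndexOf(columnNames, name); }
    public string GetString(int i) { return currentValues[i]; }
    public Type GetFieldType(int i) { return typeof(string); }
    public bool IsDBNull(int i) { return string.IsNullOrEmpty(currentValues[i]); }

    public int Depth { get { return 0; } }
    public int RecordsAffected { get { return -1; } }
    public bool NextResult() { return false; }
    public void Close() { IsClosed = true; reader.Close(); }
    public void Dispose() { Close(); }

    // Not needed when every source and destination column is a string.
    public DataTable GetSchemaTable() { throw new NotSupportedException(); }
    public string GetDataTypeName(int i) { throw new NotSupportedException(); }
    public bool GetBoolean(int i) { throw new NotSupportedException(); }
    public byte GetByte(int i) { throw new NotSupportedException(); }
    public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferOffset, int length) { throw new NotSupportedException(); }
    public char GetChar(int i) { throw new NotSupportedException(); }
    public long GetChars(int i, long fieldOffset, char[] buffer, int bufferOffset, int length) { throw new NotSupportedException(); }
    public IDataReader GetData(int i) { throw new NotSupportedException(); }
    public DateTime GetDateTime(int i) { throw new NotSupportedException(); }
    public decimal GetDecimal(int i) { throw new NotSupportedException(); }
    public double GetDouble(int i) { throw new NotSupportedException(); }
    public float GetFloat(int i) { throw new NotSupportedException(); }
    public Guid GetGuid(int i) { throw new NotSupportedException(); }
    public short GetInt16(int i) { throw new NotSupportedException(); }
    public int GetInt32(int i) { throw new NotSupportedException(); }
    public long GetInt64(int i) { throw new NotSupportedException(); }
}

Feeding it to the bulk copier is then a one-liner: bulkCopy.WriteToServer(new CsvDataReader(File.OpenText(path))).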
So far so good. There are some very good demonstrations of how to do this online: type SqlBulkCopy
into a search engine and you will find several of them straight away.
But what if you are not lucky enough for your data source to correspond to a single table? What then? As I mentioned parenthetically, your data provider could publish a single
large CSV file, in which case you can attach a reader to your custom IDataReader
object, which you then give to a single SqlBulkCopy
object
and everything works smoothly. However, it is equally common, especially with large and complex data repositories, for publishers to share their data in XML or JSON format.
In this case, you may have to write your data to multiple tables, and ideally you would like to do this with a single forward-only read of your data source.
You can't do that with a single SqlBulkCopy
object. Once you have set the DestinationTableName
property, SqlBulkCopy
looks only at that table and no other. Strangely, you can change the DestinationTableName
property after its original assignment, and SqlBulkCopy
does not complain. It just says, "Thank you very much", and carries on as if nothing had changed. Regardless, since one bulk copy object can write
to at most one table, we know we will need multiple SqlBulkCopy
objects.
Life gets simpler if you relax the requirement for only a single read. If you are populating three tables, you can read the same data source three consecutive times,
pointing at each table in turn. But can we use .NET's parallel libraries to run multiple SqlBulkCopy
objects simultaneously, and achieve our objective
of populating multiple tables in a single forward-only read?
Yes we can, and that is the subject of this article.
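Before diving in, here is a sketch of the simple multi-pass fallback just described, assuming a hypothetical makeReaderFor() helper that opens a fresh forward-only reader over the data source for each table. The rest of the article replaces these consecutive passes with parallel ones:

using System;
using System.Data;
using System.Data.SqlClient;

class MultiPassSketch
{
    // Hypothetical: opens a fresh forward-only reader over the data source,
    // shaped for the given table. One full pass over the source per table.
    static IDataReader makeReaderFor(string tableName)
    {
        throw new NotImplementedException();
    }

    static void multiPassTransfer(string connectionString)
    {
        // One pass, and one SqlBulkCopy object, per destination table.
        foreach (string tableName in new[] { "TxcRoute", "TxcRouteLink" })
        {
            using (var connection = new SqlConnection(connectionString))
            using (IDataReader reader = makeReaderFor(tableName))
            {
                connection.Open();
                var bulkCopy = new SqlBulkCopy(connection);
                bulkCopy.DestinationTableName = tableName;
                bulkCopy.WriteToServer(reader);   // forward-only read of the source
            }
        }
    }
}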
The Example
As an example, I am going to use a data source I am all too familiar with: the timetable of my night bus home. This is published by Transport
for London in TransXChange format. I am not going to provide a full database schema corresponding to the TransXChange format,
which would enable you to read the entire timetable from your local database. That is left as an exercise for the dedicated reader. I am, however, going to show,
for illustration purposes, how the data from my sample timetable document can be streamed into a sample database containing two skeletal tables, TxcRoute and TxcRouteLink.
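For a feel of the shape of those tables, here is a sketch of the DDL the setup code might issue. The column names come from the DatabaseColumnName attributes in the instruction file discussed later, every column is a string to match its System.String column types, and the varchar lengths are guesses; the authoritative schema is created by the example project itself:

using System.Data.SqlClient;

static class SampleSchema
{
    // A sketch only: the real setup code ships with the example project.
    public static void createSkeletalTables(string connectionString)
    {
        const string ddl = @"
            CREATE TABLE TxcRoute (
                RouteID         varchar(50),
                PrivateCode     varchar(50),
                Description     varchar(200),
                RouteSectionRef varchar(50));
            CREATE TABLE TxcRouteLink (
                RouteLinkID     varchar(50),
                RouteSectionRef varchar(50),
                FromID          varchar(50),
                ToID            varchar(50),
                Direction       varchar(50));";

        using (var connection = new SqlConnection(connectionString))
        using (var command = new SqlCommand(ddl, connection))
        {
            connection.Open();
            command.ExecuteNonQuery();
        }
    }
}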
In the example code, I create and destroy a sample database, nominally entitled SampleXmlData. When running the example, all you need to do
is customise the connection string in the WinForms TextBox:

Data Source=<YOUR DB SERVER>;Initial Catalog=master;Integrated Security=SSPI
The setup and teardown operations create and destroy the sample database. When you have performed the transfer, which should run very quickly because the example
transfers only a small amount of data, feel free to check that the TxcRoute and TxcRouteLink tables are populated as expected.
The Strategy
As with so many interesting problems, the design of the solution becomes apparent if you state the requirements clearly enough. We need to write data to multiple tables,
whilst reading that data from a single XML source. Therefore we create two classes: DatabaseTableWriter
and XmlDataReader
. In the code, a single XmlDataReader object reads the XML data source, while multiple DatabaseTableWriter objects use its output to write to their own database tables.
The DatabaseTableWriters get their data from the XmlDataReader as it walks through the XML file that has been passed to it from an external source.
The crux of the solution is the XmlDataReader
's walkThroughTheXml()
method, in which it reads the current data using a forward-only
System.Xml.XmlReader
object and shares the results with its DatabaseTableWriters:
private void walkThroughTheXml()
{
    while (!xmlReader.EOF && xmlReader.ReadState != ReadState.Closed && !cancelled)
    {
        // Wait until every table writer has signalled that it is ready for data.
        WaitHandle.WaitAll(ResetEventWrapper.getWaitHandles(tableWriterResetEvents));
        try
        {
            if (xmlReader.ReadState == ReadState.Closed)
                break;
            if (xmlReader.ReadState == ReadState.Error)
                throw new Exception("The reader has encountered an internal error");
            read();
        }
        catch
        {
            Dispose();
            throw;   // rethrow without resetting the stack trace
        }
        finally
        {
            signalAllTableWriters();
        }
        if (cancelled)
            xmlReader.Close();
    }
}
The DatabaseTableWriter objects, which implement the IDataReader interface, are controlled by their own SqlBulkCopy objects,
each on its own thread. If we are writing data to N tables, then there will be N + 1 threads running in parallel: one for each SqlBulkCopy.WriteToServer() call,
and one for the XmlDataReader's walkThroughTheXml() method. In the read() method, the walkthrough thread works out where it is in the XML tree,
and if its position corresponds to one of the DatabaseTableWriter objects, it passes the XML node to that object, which knows the structure of its own table and can use the node
to populate its internal values:
private void read()
{
    bool tableElementIsPopped = false;
    bool columnNameIsPopped = false;
    populatedTableWriter = null;
    do
    {
        pushElementStacks();
        DatabaseTableWriter columnElementWriter;
        if (readerIsAtTheStartOfAColumnElement(out columnElementWriter))
            columnElementWriter.readColumnValue(xmlReader, columnNameStack);
        else
            xmlReader.Read();
        popElementStacks(out tableElementIsPopped, out columnNameIsPopped);
        if (tableElementIsPopped)
            return;
    }
    while (!xmlReader.EOF && xmlReader.ReadState != ReadState.Closed);
}
The important thing to note is that the hard work, in which the XmlDataReader maintains its own internal element stack and tells the appropriate
DatabaseTableWriter to populate itself, is all done on the walkthrough thread. In the SqlBulkCopy.WriteToServer() threads, life is much easier.
Here is what happens in DatabaseTableWriter.Read(), which runs on a writer thread:
public bool Read()
{
    clearRowValues();
    while (!IsClosed && !cancelled)
    {
        // Tell the walkthrough thread we are ready, then wait for it to
        // finish reading the current XML element.
        if (!WriterResetEvent.set())
            return false;
        if (!ReaderResetEvent.wait())
            return false;
        // canRead() is true only if the node just read belongs to this table.
        if (!cancelled && dataReader.canRead(this))
        {
            ++rowCount;
            return true;
        }
    }
    return false;
}
In the WriterResetEvent.set() method, which is my wrapper for the System.Threading.AutoResetEvent.Set() method, the thread signals the walkthrough
thread, telling it that it is ready to read its data. It then waits for the walkthrough thread to perform the hard work of reading the current XML element. If the walkthrough
has just read a node which corresponds exactly to this table, then the dataReader object will return true, and, as per the implementation
of SqlBulkCopy.WriteToServer(), the writer thread will read the values which have been generated by the walkthrough thread and write them to its database table.
Otherwise canRead() returns false, the loop goes round again, and the writer thread carries on waiting for data corresponding to its table.

Because the writer threads are not responsible for interpreting the XML, and their only job is to write previously generated data into their respective tables, they can
be expected to run very quickly. If alarm bells ring because of the blocking
WaitHandle.WaitAll(ResetEventWrapper.getWaitHandles(tableWriterResetEvents)) call
in the walkthrough thread, do not be too alarmed. Although we are waiting for a signal from each of our N writer threads, only one of those threads is going to have any
data to publish to the database, because the xmlReader object can be at the end of at most one node at any one time. And, given that we
are using SqlBulkCopy because of its speed, we can expect that single write operation to execute very quickly indeed. Therefore, even if we are reading
a stream directly from a web server, we will not be blocking access to that server.
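For readers wondering what those wrappers might look like, here is a minimal sketch, assuming only the behaviour described above: set() and wait() return false once the transfer has been cancelled, so that Read() can bail out cleanly. The real ResetEventWrapper in the example project may well differ in detail:

using System.Collections.Generic;
using System.Linq;
using System.Threading;

// A minimal sketch of the reset-event wrapper described in the text.
public class ResetEventWrapper
{
    private readonly AutoResetEvent resetEvent = new AutoResetEvent(false);
    private volatile bool cancelled;

    // Signal the partner thread; return false if the transfer is over.
    public bool set()
    {
        if (cancelled)
            return false;
        resetEvent.Set();
        return true;
    }

    // Block until the partner thread signals; return false if cancelled.
    public bool wait()
    {
        resetEvent.WaitOne();
        return !cancelled;
    }

    // Wake any waiting thread so that it can observe the cancellation.
    public void cancel()
    {
        cancelled = true;
        resetEvent.Set();
    }

    public WaitHandle getWaitHandle() { return resetEvent; }

    // Used by the walkthrough thread's WaitHandle.WaitAll() call.
    public static WaitHandle[] getWaitHandles(IEnumerable<ResetEventWrapper> wrappers)
    {
        return wrappers.Select(w => w.getWaitHandle()).ToArray();
    }
}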
All this throws up one remaining question: how do the DatabaseTableWriter objects know the structure of the tables they're writing to?
When the walkthrough thread interrogates them, how do they know what to do? I encapsulate this logic in another XML file (called txcXmlReader
in the example), which specifies instructions for each table: what the database table name is, which XML node or nodes it corresponds to in the data source,
how often to notify the caller while the transfer is being performed, and how many rows to expect. The nifty feature of this approach is that you can read
the instruction file from anywhere at any time, so you may not have to recompile the code in order to introduce a new set of tables. Also, the business logic
lives entirely in the instruction file: the DatabaseTableWriter objects just blindly follow these instructions and populate their tables accordingly.
If you want to tweak this code, then this is where you can start to use your imagination. You can extend this instruction meta-language, so that the instruction file
can specify the location of the source data, and the method you might use to extract it. If you are working on a server, your instruction file can specify a schedule,
and with a little more coding, you can get to a place where you can introduce new schedules, doing new jobs, loading different data, without having to change a single
line of code, or even restart your server.
To illustrate how the instruction files are read, let's have a close look at the txcXmlReader
file. It reads as follows:
<XmlParser>
  <Table XmlTableName="RouteSection">
    <Column XmlColumnName="RouteSection" ValueAttribute="id" ColumnType="System.String" />
    <Table XmlTableName="RouteLink" DatabaseTableName="TxcRouteLink"
           expectedRowCount="265" notifyAfter="10">
      <Column XmlColumnName="RouteLink" DatabaseColumnName="RouteLinkID"
              ValueAttribute="id" ColumnType="System.String" />
      <Column XmlFormula="../RouteSection"
              DatabaseColumnName="RouteSectionRef" ColumnType="System.String" />
      <Column XmlColumnName="From.StopPointRef"
              DatabaseColumnName="FromID" ColumnType="System.String" />
      <Column XmlColumnName="To.StopPointRef"
              DatabaseColumnName="ToID" ColumnType="System.String" />
      <Column XmlColumnName="Direction" DatabaseColumnName="Direction"
              ColumnType="System.String" Converters="FirstLetter,UpperCase" />
    </Table>
  </Table>
  <Table XmlTableName="Route" DatabaseTableName="TxcRoute"
         expectedRowCount="4" notifyAfter="1">
    <Column XmlColumnName="Route" DatabaseColumnName="RouteID"
            ValueAttribute="id" ColumnType="System.String" />
    <Column XmlColumnName="PrivateCode"
            DatabaseColumnName="PrivateCode" ColumnType="System.String" />
    <Column XmlColumnName="Description"
            DatabaseColumnName="Description" ColumnType="System.String" />
    <Column XmlColumnName="RouteSectionRef"
            DatabaseColumnName="RouteSectionRef" ColumnType="System.String" />
  </Table>
</XmlParser>
The first thing to notice about this file, although it is a fairly trite observation, is that it is short. It is always going to be short: this is a set of instructions,
not a data repository. This means that we don't have to use a blunt instrument like an XmlReader to read it: we can load it into memory using the LINQ-enabled
XDocument.Load() method, which gives us access to all the funky tricks and syntactic gymnastics of LINQ to XML.
The next thing to notice is the nested structure of the <Table>
elements. There are three such elements in the txcXmlReader
file,
but only two database tables are to be populated: TxcRouteLink and TxcRoute. These correspond to the DatabaseTableName
attributes in the <Table>
elements. It stands to reason, then, that the interpreter generates one SqlBulkCopy
object not for each
<Table>
element, but for each DatabaseTableName
attribute on a <Table>
element.
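As a sketch of how that interpretation might look in LINQ to XML (the file name and the TableInstruction type are illustrative; the element and attribute names come from the file above):

using System;
using System.Linq;
using System.Xml.Linq;

// Illustrative only: a flat view of the <Table> instructions.
class TableInstruction
{
    public string XmlTableName;
    public string DatabaseTableName;   // null for purely data-holding tables
    public int? ExpectedRowCount;
    public int? NotifyAfter;
}

class InstructionFileDemo
{
    static void Main()
    {
        XDocument instructions = XDocument.Load("txcXmlReader.xml");

        // Descendants() flattens the nested <Table> hierarchy, picking up
        // RouteSection, RouteLink and Route in a single pass.
        var tables =
            from table in instructions.Descendants("Table")
            select new TableInstruction
            {
                XmlTableName = (string)table.Attribute("XmlTableName"),
                DatabaseTableName = (string)table.Attribute("DatabaseTableName"),
                ExpectedRowCount = (int?)table.Attribute("expectedRowCount"),
                NotifyAfter = (int?)table.Attribute("notifyAfter")
            };

        // Only tables with a DatabaseTableName attribute get a SqlBulkCopy object.
        foreach (var t in tables.Where(t => t.DatabaseTableName != null))
            Console.WriteLine("{0} -> {1}", t.XmlTableName, t.DatabaseTableName);
    }
}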
While they do not all have DatabaseTableName
attributes, the <Table>
elements do all have XmlTableName
attributes.
This suggests that each <Table>
element corresponds to the family of X
elements in the target stream, where X
is the value
of the <Table>
element's XmlTableName
attribute.
The nesting of the <Table>
elements suggests something about the structure of the target stream. If a <Table XmlTableName="Y" ... />
node
is nested inside a <Table XmlTableName="X" ... />
node in the instruction file, then it is reasonable to infer that the <Y>
nodes
are nested inside the <X>
nodes in the target stream.
This would make sense of the ../RouteSection
notation in the nested RouteLink
table in the example:
<Column XmlFormula="../RouteSection"
        DatabaseColumnName="RouteSectionRef" ColumnType="System.String" />
It also suggests something about the behaviour of the DatabaseTableWriter
objects. It seems there is indeed one for each <Table>
element in the instruction file, but not all of them correspond to SqlBulkCopy
objects. Some are there just to hold data and to respond to queries from
table writers corresponding to the children of their equivalent <Table>
elements.
If the DatabaseTableWriter
objects do have a parent-child structure corresponding to that of the <Table>
elements,
then a DatabaseTableWriter
object can interrogate its parent if some of its column data has already been read by the XmlReader
in the walkthrough thread. This can be confirmed by looking at the popTableElementStack()
method in the walkthrough thread, which is called when
the XmlReader
has finished reading the contents of a table row element:
private void popTableElementStack(out bool tableElementIsPopped)
{
    if (tableElementIsPopped = xmlReaderIsAtTheEndOfTheCurrentTableElement())
    {
        // The row is complete: resolve any ../ formula columns before handing
        // the writer back to its SqlBulkCopy thread.
        CurrentTableWriter.evaluateFormulaColumns();
        populatedTableWriter = tableWriterStack.Pop();
    }
}
The walkthrough thread instructs the current DatabaseTableWriter
object to evaluate its own formula columns, which it duly does:
public void evaluateFormulaColumns()
{
    foreach (string formula in FormulaColumns.Keys)
        rowValues[FormulaColumns[formula].ColumnIndex] =
            evaluateFormula(FormulaColumns[formula]);
}

private object evaluateFormula(DatabaseColumn databaseColumn)
{
    string path = databaseColumn.FormulaName;
    List<string> pathComponents = StringUtils.splitAndTrim(path, '/').ToList();
    DatabaseTableWriter currentTableWriter = this;
    for (int i = 0; i < pathComponents.Count; ++i)
    {
        if (pathComponents[i].Equals(".."))
            currentTableWriter = currentTableWriter.parentTableWriter;
        else
            return currentTableWriter.rowValues[
                currentTableWriter.XmlColumns[pathComponents[i]].ColumnIndex];
    }
    return null;
}
As predicted, the evaluateFormula
method walks up the DatabaseTableWriter
tree in response to the ../
instruction.
If you need to introduce any other kinds of formulae into your interpreter's lexicon, this is a place to do it, but I would caution against doing any over-elaborate
processing here: remember that we are working on the rate-limiting walkthrough thread, and we have a vested interest in keeping things lean and mean in this part of the code.
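For example (and this is entirely hypothetical, not part of the sample project), the method could treat a formula beginning with = as a literal constant, at the cost of only one cheap string test on the walkthrough thread:

private object evaluateFormula(DatabaseColumn databaseColumn)
{
    string path = databaseColumn.FormulaName;

    // Hypothetical extension: a leading '=' marks a literal constant, so
    // XmlFormula="=Outbound" would write the string "Outbound" to every row.
    if (path.StartsWith("="))
        return path.Substring(1);

    // The original ../ tree walk, unchanged.
    List<string> pathComponents = StringUtils.splitAndTrim(path, '/').ToList();
    DatabaseTableWriter currentTableWriter = this;
    for (int i = 0; i < pathComponents.Count; ++i)
    {
        if (pathComponents[i].Equals(".."))
            currentTableWriter = currentTableWriter.parentTableWriter;
        else
            return currentTableWriter.rowValues[
                currentTableWriter.XmlColumns[pathComponents[i]].ColumnIndex];
    }
    return null;
}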
Looking back at the txcXmlReader
file, all this leads us to infer, again very reasonably, that in the TransXChange format, each <RouteLink>
node is nested inside a <RouteSection>
node, and that the TxcRouteLink table reads its row data not just from the <RouteLink>
node
and its children, but also from its parent <RouteSection>
node. I can confirm that this is true, but I also invite you to verify it by downloading the project
and looking through the SampleData/busTimeTable.xml file in the Rob.DataTransfer project.
Further Investigation
One of the limitations of working with the SqlBulkCopy
class is that you can't run two SqlBulkCopy.WriteToServer() calls simultaneously
methods simultaneously
on the same connection: you have to run them all on separate connections. If you want to run an upload in a single transaction, this limitation will unfortunately
prevent you from doing so. However, I'm not convinced that this is a problem: usually, when you are loading large amounts of data, you will want to load it into
a staging area and switch over only when the process has gone through successfully and you have thoroughly tested the integrity of your new data. If something goes
wrong mid-process (for example, you lose your connection to the remote source), you can simply clean out your staging area and start again.
Once again, writing the code to implement the staging, verification, and cleanup processes will be left as an exercise for the dedicated reader.
Happy coding!
History
2011/09/30
Added a detailed description of the way the instruction files are read and used by the DatabaseTableWriter
objects, and a close examination of the structure
of the txcXmlReader
file.
2011/10/01
Cleaned up the code files, and added exception handling and connection string persistence to the WinForms app.