Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / desktop / WinForms

Transferring Data from a Single XML Stream into Multiple Tables with One Forward-Only Read

4.75/5 (12 votes)
30 Sep 2011CPOL13 min read 44.1K   764  
Using parallel implementations of SqlBulkCopy to achieve fast data transfer from a single XML source into multiple tables.

Introduction

Data transfer is a very common business process. Most developers are familiar with finding ways of taking data from a remote source and writing it to a local database. It is often possible, and even appropriate, to load the remote data into memory and then write it to your database using some kind of data adapter. But what if you need to transfer a very large amount of data from your remote source? The larger the amount of data, the less likely you will be to want to load it into memory, and the more likely you will be to want to use some kind of forward-only streaming technique to load the data dynamically as the stream is being read. Fortunately, ADO.NET provides a useful tool for doing just that: the SqlBulkCopy class.

SqlBulkCopy is associated with a single database table, identified by the DestinationTable property:

C#
SqlBulkCopy bulkCopy = new SqlBulkCopy(myConnection);
bulkCopy.DestinationTableName = "MyTable";

To work its magic, SqlBulkCopy uses the WriteToServer method, which takes four overloads. Three out of the four overloads involve DataRows and DataTables, which suggests that data has already been extracted from somewhere and packaged into a DataTable. Since we are concerned in this case with transferring large amounts of data, these overloads are less interesting to us than the one that takes an IDataReader:

C#
void SqlBulkCopy.WriteToServer(IDataReader reader)

You will need to create your own class to implement System.Data.IDataReader. To get this right, you need to understand the fairly straightforward way in which SqlBulkCopy.WriteToServer reads your IDataReader-implementing object:

  1. It checks the IsClosed flag, terminating if the flag is set to true.
  2. It calls the Read() method and checks the return value.
  3. If the Read() method returned true, it uses the property accessors and the GetValues() method to write the current values into the table specified by DestinationTableName.
  4. It repeats until termination.

It is your responsibility, then, to ensure that your IDataReader object walks through its data source appropriately in response to the Read() calls, and sets the IsClosed flag when the data source has been exhausted.

If you are lucky enough for your data source to correspond to a single table in your database (which may happen if the data has been published to a single large CSV file, for example), you can see how this can enable you to perform very fast uploads of large amounts of data using a forward-only reader. Just create a class that implements IDataReader, and pass your data source to that class as a stream reader. At each call to Read(), you advance your stream reader to the next record, whilst writing its contents into the containers that are accessed by this[int i], this[string name], and GetValues(object[] values). So far so good. There are some very good demonstrations of how to do this online: type SqlBulkCopy into a search engine and you will find several of them straight away.

But what if you are not lucky enough for your data source to correspond to a single table? What then? As I mentioned parenthetically, your data provider could publish a single large CSV file, in which case you can attach a reader to your custom IDataReader object, which you then give to a single SqlBulkCopy object and everything works smoothly. However, it is equally common, especially with large and complex data repositories, for publishers to share their data in XML or JSON format. In this case, you may have to write your data to multiple tables, and ideally you would like to do this with a single forward-only read of your data source.

You can't do that with a single SqlBulkCopy object. Once you have set the DestinationTableName property, SqlBulkCopy looks only at that table and no other. Strangely, you can change the DestinationTableName property after its original assignment, and SqlBulkCopy does not complain. It just says, "Thank you very much", and carries on as if nothing had changed. Regardless, since one bulk copy object can write to at most one table, we know we will need multiple SqlBulkCopy objects.

Life gets simpler if you relax the requirement for only a single read. If you are populating three tables, you can read the same data source three consecutive times, pointing at each table in turn. But can we use .NET's parallel libraries to run multiple SqlBulkCopy objects simultaneously, and achieve our objective of populating multiple tables in a single forward-only read?

Yes we can, and that is the subject of this article.

The Example

As an example, I am going to use a data source I am all too familiar with: the timetable of my night bus home. This is published by Transport for London in TransXChange format. I am not going to provide a full database schema corresponding to the TransXChange format, which would enable you to read the entire timetable from your local database. That is left as an exercise for the dedicated reader. I am, however, going to show, for illustration purposes, how the data from my sample timetable document can be streamed into a sample database containing two skeletal tables:

Image 1

In the example code, I create and destroy a sample database, nominally entitled SampleXmlData. When running the example, all you need to do is to customise the connection string:

Data Source=<YOUR DB SERVER>;Initial Catalog=master;Integrated Security=SSPI

from the WinForms TextBox.

The setup and teardown operations, create and destroy the sample database. When you have performed the transfer, which should run very quickly because the example transfers a small amount of data, feel free to check that the TxcRoute and TxcRouteLink tables are populated as expected.

Image 2

The Strategy

As with so many interesting problems, the design of the solution becomes apparent if you state the requirements clearly enough. We need to write data to multiple tables, whilst reading that data from a single XML source. Therefore we create two classes: DatabaseTableWriter and XmlDataReader. In the code, one single XmlDataReader object reads the XML data source, while multiple DatabaseTableWriter objects use its output to write to their own database tables.

The DatabaseTableWriters get their data from the XmlDataReader as it walks through the XML file that has been passed to it from an external source. The crux of the solution is the XmlDataReader's walkThroughTheXml() method, in which it reads the current data using a forward-only System.Xml.XmlReader object and shares the results with its DatabaseTableWriters:

C#
private void walkThroughTheXml()
{
    while (!xmlReader.EOF && !(xmlReader.ReadState == ReadState.Closed) && !cancelled)
    {
        WaitHandle.WaitAll(ResetEventWrapper.getWaitHandles(tableWriterResetEvents));

        try
        {
            // Anything that refers to xmlReader is vulnerable.
            // It can be tripped up by a poor connection.
            // In order to allow the parallel loops
            // to finish, we need the finally block
            // to ensure that signalling is completed,
            // even when the read() method falls over.
            if (xmlReader.ReadState == ReadState.Closed)
                break;

            if (xmlReader.ReadState == ReadState.Error)
                throw new Exception("The reader has encountered an internal error");

            read();
        }
        catch (Exception ex)
        {
            // This will dispose of all the table writers,
            // rendering their wait handles
            // obsolete and allowing all threads to terminate.
            Dispose();
            throw ex;
        }
        finally
        {
            signalAllTableWriters();
        }
        if (cancelled)
            xmlReader.Close();
    }
}

The DatabaseTableWriter objects, which implement the IDataReader interface, are being controlled by their own SqlBulkCopy objects, each on its own thread. If we are writing data to N tables, then there will be N + 1 threads running in parallel: one for each SqlBulkCopy.WriteToServer() implementation, and one for the XmlDataReader's walkThroughTheXml() method. In the read() method, the walkthrough thread works out where it is in the XML tree, and if it corresponds to one of the DatabaseTableWriter objects, it passes the XML node to that object, which knows the structure of its own table and can use the node to populate its internal values:

C#
private void read()
{
    bool tableElementIsPopped = false;
    bool columnNameIsPopped = false;
    populatedTableWriter = null;
    do
    {
        pushElementStacks();
        DatabaseTableWriter columnElementWriter;
        if (readerIsAtTheStartOfAColumnElement(out columnElementWriter))
            columnElementWriter.readColumnValue(xmlReader, columnNameStack);
        else
            xmlReader.Read();
        popElementStacks(out tableElementIsPopped, out columnNameIsPopped);
        if (tableElementIsPopped)
            return;
    }
    while (!xmlReader.EOF && !(xmlReader.ReadState == ReadState.Closed));
}

The important thing to note is that the hard work, in which the XmlDataReader maintains its own internal element stack and tells the appropriate DatabaseTableWriter to populate itself, is all being done on the walkthrough thread. In the SqlBulkCopy.WriteToServer() threads, life is much easier. Here is what happens in the DatabaseTableWriter.Read() thread:

C#
public bool Read()
{
    clearRowValues();
    while (!IsClosed && !cancelled)
    {
        if (!WriterResetEvent.set())
            return false;

        if (!ReaderResetEvent.wait())
            return false;

        if (!cancelled && dataReader.canRead(this))
        {
            ++rowCount;
            return true;
        }
    }
    return false;
}

In the WriterResetEvent.set() method, which is my wrapper for the System.Threading.AutoResetEvent.Set() method, the thread signals the walkthrough thread, telling it that it is ready to read its data. It then waits for the walkthrough thread to perform the hard work of reading the current XML element. If the walkthrough has just read a node which corresponds exactly to this table, then the dataReader object will return true, and, as per the implementation of SqlBulkCopy.WriteToServer(), the writer thread will read the values which have been generated by the walkthrough thread and write them to its database table. Otherwise the Read() method returns false, and the writer thread continues to search for data corresponding to its table.

Because the writer threads are not responsible for interpreting the XML, and their only job is to write previously generated data into their respective tables, they should be expected to run very quickly. If alarm bells ring because of the blocking WaitHandle.WaitAll(ResetEventWrapper.getWaitHandles(tableWriterResetEvents)) call in the walkthrough thread, do not be too alarmed. Although we are waiting from a signal from each of our N threads, only one of those N threads is going to have any data to publish to the database, because the xmlReader object can be at the end of at most one node at any one time. And, given that we are using SqlBulkCopy because of its speed, we can expect that one write operation to execute very quickly indeed. Therefore even if we are reading a stream directly from a web server, we will not be blocking access to that server.

All this throws up one remaining question: how do the DatabaseTableWriter objects know the structure of the table they're writing to? When the walkthrough thread interrogates them, how do they know what to do? I encapsulate this logic into another XML file (called txcXmlReader in the example) which specifies instructions for each table: what the database table name is, which XML node or nodes it corresponds to in the data source, how often to notify the caller while the transfer is being performed, and how many rows to expect. The nifty feature of this approach is that you can read the instruction file from anywhere at any time, and so you may not have to recompile the code in order to introduce a new set of tables. Also, the business logic lives entirely in the instruction file: the DatabaseTableWriter objects just blindly follow these instructions and populate their tables accordingly.

If you want to tweak this code, then this is where you can start to use your imagination. You can extend this instruction meta-language, so that the instruction file can specify the location of the source data, and the method you might use to extract it. If you are working on a server, your instruction file can specify a schedule, and with a little more coding, you can get to a place where you can introduce new schedules, doing new jobs, loading different data, without having to change a single line of code, or even restart your server.

To illustrate how the instruction files are read, let's have a close look at the txcXmlReader file. It reads as follows:

XML
<XmlParser>
  <Table XmlTableName="RouteSection">
    <Column XmlColumnName="RouteSection" ValueAttribute="id" ColumnType="System.String" />
    <Table XmlTableName="RouteLink" DatabaseTableName="TxcRouteLink" 
           expectedRowCount="265" notifyAfter="10">
      <Column XmlColumnName="RouteLink" DatabaseColumnName="RouteLinkID" 
           ValueAttribute="id" ColumnType="System.String" />
      <Column XmlFormula="../RouteSection" 
           DatabaseColumnName="RouteSectionRef" ColumnType="System.String" />
      <Column XmlColumnName="From.StopPointRef" 
           DatabaseColumnName="FromID" ColumnType="System.String" />
      <Column XmlColumnName="To.StopPointRef" 
           DatabaseColumnName="ToID" ColumnType="System.String" />
      <Column XmlColumnName="Direction" DatabaseColumnName="Direction" 
           ColumnType="System.String" Converters="FirstLetter,UpperCase" />
    </Table>
  </Table>
  <Table XmlTableName="Route" DatabaseTableName="TxcRoute" 
         expectedRowCount="4" notifyAfter="1">
    <Column XmlColumnName="Route" DatabaseColumnName="RouteID" 
         ValueAttribute="id" ColumnType="System.String" />
    <Column XmlColumnName="PrivateCode" 
         DatabaseColumnName="PrivateCode" ColumnType="System.String" />
    <Column XmlColumnName="Description" 
         DatabaseColumnName="Description" ColumnType="System.String" />
    <Column XmlColumnName="RouteSectionRef" 
         DatabaseColumnName="RouteSectionRef" ColumnType="System.String" />
  </Table>
</XmlParser>

The first thing to notice about this file, although it is a fairly trite observation, is that it is short. It is always going to be short: this is a set of instructions, not a data repository. This means that we don't have to use a blunt instrument like an XmlReader to read it: we can load it into memory using the LINQ-enabled XReader.Load() method, which gives us access to all the funky tricks and syntactic gymnastics of LINQ to XML.

The next thing to notice is the nested structure of the <Table> elements. There are three such elements in the txcXmlReader file, but only two database tables are to be populated: TxcRouteLink and TxcRoute. These correspond to the DatabaseTableName attributes in the <Table> elements. It stands to reason, then, that the interpreter generates one SqlBulkCopy object not for each <Table> element, but for each DatabaseTableName attribute on a <Table> element.

While they do not all have DatabaseTableName attributes, the <Table> elements do all have XmlTableName attributes. This suggests that each <Table> element corresponds to the family of X elements in the target stream, where X is the value of the <Table> element's XmlTableName attribute.

The nesting of the <Table> elements suggests something about the structure of the target stream. If a <Table XmlTableName="Y" ... /> node is nested inside a <Table XmlTableName="X" ... /> node in the instruction file, then it is reasonable to infer that the <Y> nodes are nested inside the <X> nodes in the target stream.

This would make sense of the ../RouteSection notation in the nested RouteLink table in the example:

XML
<column xmlformula="../RouteSection" 
  databasecolumnname="RouteSectionRef" columntype="System.String" />

It also suggests something about the behaviour of the DatabaseTableWriter objects. It seems there is indeed one for each <Table> element in the instruction file, but not all of them correspond to SqlBulkCopy objects. Some are there just to hold data and to respond to queries from table writers corresponding to the children of their equivalent <Table> elements.

If the DatabaseTableWriter objects do have a parent-child structure corresponding to that of the <Table> elements, then a DatabaseTableWriter object can interrogate its parent if some of its column data has already been read by the XmlReader in the walkthrough thread. This can be confirmed by looking at the popTableElementStack() method in the walkthrough thread, which is called when the XmlReader has finished reading the contents of a table row element:

C#
private void popTableElementStack(out bool tableElementIsPopped)
{
    if (tableElementIsPopped = xmlReaderIsAtTheEndOfTheCurrentTableElement())
    {
        CurrentTableWriter.evaluateFormulaColumns();
        populatedTableWriter = tableWriterStack.Pop();
    }
}

The walkthrough thread instructs the current DatabaseTableWriter object to evaluate its own formula columns, which it duly does:

C#
public void evaluateFormulaColumns()
{
    foreach (string formula in FormulaColumns.Keys)
        rowValues[FormulaColumns[formula].ColumnIndex] = 
         evaluateFormula(FormulaColumns[formula]);
}

private object evaluateFormula(DatabaseColumn databaseColumn)
{
    string path = databaseColumn.FormulaName;
    List<string> pathComponents = StringUtils.splitAndTrim(path, '/').ToList();
    DatabaseTableWriter currentTableWriter = this;
    for (int i = 0; i < pathComponents.Count; ++i)
        if (pathComponents[i].Equals(".."))
            currentTableWriter = currentTableWriter.parentTableWriter;
        else
            return currentTableWriter.rowValues[currentTableWriter.XmlColumns[pathComponents[i]].ColumnIndex];
    return null;
}

As predicted, the evaluateFormula method walks up the DatabaseTableWriter tree in response to the ../ instruction. If you need to introduce any other kinds of formulae into your interpreter's lexicon, this is a place to do it, but I would caution against doing any over-elaborate processing here: remember that we are working on the rate-limiting walkthrough thread, and we have a vested interest in keeping things lean and mean in this part of the code.

Looking back at the txcXmlReader file, all this leads us to infer is, again very reasonably, that in the TransXChange format, each <RouteLink> node is nested inside a <RouteSection> node, and that the TxcRouteLink table reads its row data not just from the <RouteLink> node and its children, but also from its parent <RouteSection> node. I can confirm that this is true, but I also invite you to verify it by downloading the project and looking through the SampleData/busTimeTable.xml file in the Rob.DataTransfer project.

Further Investigation

One of the limitations of working with the SqlBulkCopy class is that you can't run two SqlBulkCopy.WriteToServer() methods simultaneously on the same connection: you have to run them all on separate connections. If you want to run an upload in a single transaction, this limitation will unfortunately prevent you from doing so. However, I'm not convinced that this is a problem: usually, when you are loading large amounts of data, you will want to load it up into a staging area and switch over only when the process has gone through successfully and you have thoroughly tested the integrity of your new data. If something goes wrong mid-process (for example, you lose your connection to the remote source), you can simply clean out your staging area and start again. Once again, writing the code to implement the staging, verification, and cleanup processes will be left as an exercise for the dedicated reader.

Happy coding!

History

2011/09/30

Added a detailed description of the way the instruction files are read and used by the DatabaseTableWriter objects, and a close examination of the structure of the txcXmlReader file.

2011/10/01

Cleaned up the code files, and added exception handling and connection string persistence to the WinForms app.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)