
Digging SSIS object model

19 May 2007 · CPOL · 14 min read
Describes the SQL Server 2005 Integration Services object model and different data flow components

Introduction

Microsoft SQL Server 2005 Integration Services (SSIS) is a platform for building high performance data integration solutions, including extraction, transformation, and load (ETL) packages for data warehousing. (read more from MSDN)

Business Intelligence (BI) Development Studio provides graphical tools for designing and debugging packages; tasks for performing workflow functions such as FTP operations, executing SQL statements, or sending e-mail messages; data sources and destinations for extracting and loading data; and transformations for cleaning, aggregating, merging, and copying data. SSIS also ships with a rich set of application programming interfaces (APIs) for programming the Integration Services object model.

Integration Services replaces Data Transformation Services (DTS), which was first introduced as a component of SQL Server 7.0.

In this article we will explore the SSIS APIs and how we can use them for our own purposes.

Background

A couple of months ago, I had to work with the SSIS APIs to implement a custom import wizard (similar to the SQL Server import wizard), and I explored these APIs during that time. I found them a great improvement over the previous incarnation, DTS.

Unfortunately, I did not find adequate code samples for these APIs that could get me up to speed quickly. That is why I wrote an article on CodeProject explaining how to create an SSIS package programmatically. I received several emails asking me to explain more, so I am writing this follow-up article.

A quick architectural overview

SSIS architecture (image source: MSDN)

The Integration Services (SSIS) architecture consists of several containers and task components. These elements can be grouped into four sections:

  • Integration Services Service
  • Integration Services Object Model
  • Integration Services Runtime
  • Integration Services Data Flow

The first, the Integration Services service, is a Windows service that monitors running SSIS packages and manages package storage. It is available through SQL Server Management Studio.

The SSIS object model is a managed set of APIs that allows us to write custom applications, command-line utilities, and so on.

The SSIS runtime is the environment that runs SSIS packages and manages transactions. Runtime executables are the units that execute within this environment; packages, containers, tasks, and event handlers are all runtime executables.

The data flow is the most important (and most critical) part of the SSIS architecture. It contains a data flow engine that manages the data flow components. A significant number of data flow components ship with the SSIS installation, and they fall into three categories: source components (which extract data from a system), transformation components (which transform or modify the extracted data), and destination components (which load the data into destination systems). Besides the stock data flow components, we can write custom data flow components to meet any custom requirements.

Although the underlying engine is written in native code (C++/COM), we can use any CLR language to program against these APIs.

For detailed knowledge about the SSIS architecture, please see MSDN.

Start writing a pilot project

I believe that in order to understand the details, it is always better to start by implementing a "pilot" application. So let's implement a simple package. For this pilot project we will use C#.

The pilot package in the SSIS designer

I am going to create a vanilla SSIS package that consists of a single data flow task. The data flow task contains two data flow components: a source component that reads a CSV file, and a destination component that loads the data into a destination (a SQL Server database in this sample).

The data flow components inside the data flow task

Before writing any code, let's create the comma-separated file that we will use as our data source. Create a file named Sample.csv, paste the following data into it, and save it.

plain
"Name","Age","JoinDate","Salary","Retired" 
"Jon Tuite","45","2004-01-03 00:00:00","55.265","False" 
"Linda Hamilton","25","2002-01-03 00:00:00","155.265","False" 
"Sen Seldiver","42","2002-01-03 00:00:00,","458.2","True" 
"Rose Dalson","25","2004-01-03 00:00:00","55.265","False" 

It is a CSV file delimited by commas, with the double quote (") as a text qualifier.

Open Visual Studio 2005 and create a new application (a Windows application or a console application, whichever you like). Now add references to the assemblies of the SSIS object model:

  • Microsoft.SQLServer.DTSPipelineWrap
  • Microsoft.SQLServer.DTSRuntimeWrap
  • Microsoft.SQLServer.ManagedDTS
  • Microsoft.SQLServer.PipelineHost

Our project references will look something like this:

References

Create a new class named ImportPackage and add the following directives at the top of the ImportPackage.cs file.

C#
using Microsoft.SqlServer.Dts.Runtime;
using PipeLineWrapper = Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using RuntimeWrapper = Microsoft.SqlServer.Dts.Runtime.Wrapper;

We now declare some member variables that will be required later.

C#
private Package package;
private ConnectionManager flatFileConnectionManager;
private ConnectionManager destinationDatabaseConnectionManager;
private Executable dataflowTask;

Create a method named CreatePackage in the class.

C#
private void CreatePackage()
{
    package = new Package();              // create a new package
    package.CreationDate = DateTime.Now;  // set the creation time stamp
    package.ProtectionLevel = DTSProtectionLevel.DontSaveSensitive;
    package.Name = "Some name";
    package.Description = "A simple package";
    package.DelayValidation = true;
    package.PackageType = DTSPackageType.DTSDesigner90;
}

This method simply instantiates the package field and sets some initial properties.

Following the diagram created in the SSIS designer (seen earlier), we will now create a data flow task. The data flow task will contain two data flow components (source and destination); both components require a connection manager to extract data from the source and load it into the destination. So let's create the connection manager for the source first.

One thing is worth mentioning here: for a flat file connection manager, we need to create the source columns (the columns available in the CSV file) explicitly. We are going to write a method CreateSourceColumns() for this purpose. You might have guessed that we need to read the available column names from the source file to do this. For this pilot application we simply hardcode them, but they should really be read from the file using the .NET IO APIs (you can use the Microsoft.Jet.OLEDB.4.0 provider as an alternative to the .NET IO APIs); a sketch of such a reader follows the CreateSourceColumns() listing below. To hardcode the column names, I use a string collection as a member variable of the class.

C#
private List<string> srcColumns = new List<string>();

and populate it in the constructor:

C#
public ImportPackage()
{
     srcColumns.Clear();
     // Apparently we are hardcoding this. But it is supposed to be
     // read from the source file using IO APIs
     srcColumns.Add("\"Name\"");
     srcColumns.Add("\"Age\"");
     srcColumns.Add("\"JoinDate\"");
     srcColumns.Add("\"Salary\"");
     srcColumns.Add("\"Retired\"");
}

private void CreateFlatFileConnection()
{
    string flatFileName = @"C:\Sample.csv";
    string flatFileMoniker = @"FLATFILE";

    flatFileConnectionManager = package.Connections.Add(flatFileMoniker);
    flatFileConnectionManager.ConnectionString = flatFileName;
    flatFileConnectionManager.Name = "SSIS Connection Manager for Files";
    flatFileConnectionManager.Description = "SSIS Connection Manager";

    // Set some common properties of the connection manager object
    flatFileConnectionManager.Properties["ColumnNamesInFirstDataRow"]
        .SetValue(flatFileConnectionManager, true);
    flatFileConnectionManager.Properties["Format"]
        .SetValue(flatFileConnectionManager, "Delimited");
    flatFileConnectionManager.Properties["HeaderRowDelimiter"]
        .SetValue(flatFileConnectionManager, "\r\n");
    // the text qualifier used in the file
    flatFileConnectionManager.Properties["TextQualifier"]
        .SetValue(flatFileConnectionManager, "\"");

    // create the source columns in the connection manager
    CreateSourceColumns();
}

private void CreateSourceColumns()
{
    // get the actual (inner) flat file connection manager instance
    RuntimeWrapper.IDTSConnectionManagerFlatFile90 flatFileConnection =
        flatFileConnectionManager.InnerObject as
        RuntimeWrapper.IDTSConnectionManagerFlatFile90;

    RuntimeWrapper.IDTSConnectionManagerFlatFileColumn90 column;
    RuntimeWrapper.IDTSName90 name;

    // trace the current column count
    Debug.WriteLine(flatFileConnection.Columns.Count);

    foreach (String colName in srcColumns)
    {
        // create a new column in the connection manager
        column = flatFileConnection.Columns.Add();

        if (srcColumns.IndexOf(colName) == (srcColumns.Count - 1))
            column.ColumnDelimiter = "\r\n"; // last column: use the row delimiter
        else
            column.ColumnDelimiter = ",";

        name = (RuntimeWrapper.IDTSName90)column;
        name.Name = colName.Replace("\"", "");
        column.TextQualified = true;
        column.ColumnType = "Delimited";
        column.DataType = RuntimeWrapper.DataType.DT_STR;
        column.ColumnWidth = 0;
        column.MaximumWidth = 255;
        column.DataPrecision = 0;
        column.DataScale = 0;
    }
}
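
As noted above, in a real application the column names should be read from the file itself rather than hardcoded. Below is a minimal sketch of such a reader (my own addition, assuming the first line of Sample.csv is the header row and that no column name contains a comma; requires using System.IO;):

C#
private void ReadSourceColumns(string fileName)
{
    srcColumns.Clear();
    using (StreamReader reader = new StreamReader(fileName))
    {
        string headerLine = reader.ReadLine(); // the header row
        if (headerLine != null)
        {
            // the names keep their double quotes, just like the
            // hardcoded values in the constructor
            foreach (string colName in headerLine.Split(','))
                srcColumns.Add(colName);
        }
    }
}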

You may have noticed that we are creating COM objects using monikers. Now we will create the connection manager object for the destination data flow component.

C#
private void CreateDestinationDatabaseConnection()
{
    string oleDbMoniker = @"OLEDB";
    string connectionString =
        @"Data Source=MOIM023;Initial Catalog=ImportDatabase;" +
        @"Integrated Security=SSPI;Provider=SQLNCLI;Auto Translate=false;";

    // Create a connection manager using the OLEDB moniker
    destinationDatabaseConnectionManager = package.Connections.Add(oleDbMoniker);
    destinationDatabaseConnectionManager.ConnectionString = connectionString;
    destinationDatabaseConnectionManager.Name =
        "SSIS Connection Manager for Oledb";
    destinationDatabaseConnectionManager.Description =
        "SSIS Connection Manager for OLEDB";
}

So far so good; it is time to create the data flow components. I am writing a method CreateDataFlowTask() that creates and initializes the data flow components.

Before writing this data flow task creation method, let's create a class Column as follows:

C#
public class Column
{
    private string name;

    public string Name
    {
        get { return name; }
        set { name = value; }
    }
    private Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType dataType;

    public Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType DataType
    {
        get { return dataType; }
        set { dataType = value; }
    }
    private int length;

    public int Length
    {
        get { return length; }
        set { length = value; }
    }

    private int precision;

    public int Precision
    {
        get { return precision; }
        set { precision = value; }
    }

    private int scale;

    public int Scale
    {
        get { return scale; }
        set { scale = value; }
    }

    private int codePage = 0;

    public int CodePage
    {
        get { return codePage; }
        set { codePage = value; }
    }
}

Let's create a utility method in the ImportPackage class that determines the target column's data type, length, and so on.

C#
private Column GetTargetColumnInfo(string sourceColumnName)
{
    Column cl = new Column();
    if (sourceColumnName.Contains("Name"))
    {
        cl.Name = "Name";
        cl.DataType = RuntimeWrapper.DataType.DT_STR;
        cl.Precision = 0;
        cl.Scale = 0;
        cl.Length = 255;
        cl.CodePage = 1252;
    }
    else if (sourceColumnName.Contains("Age"))
    {
        cl.Name = "Age";
        cl.DataType = RuntimeWrapper.DataType.DT_I4;
        cl.Precision = 0;
        cl.Scale = 0;
        cl.Length = 0;
    }
    else if (sourceColumnName.Contains("JoinDate"))
    {
        cl.Name = "JoinDate";
        cl.DataType = RuntimeWrapper.DataType.DT_DATE;
        cl.Precision = 0;
        cl.Scale = 0;
        cl.Length = 0;
    }
    else if (sourceColumnName.Contains("Salary"))
    {
        cl.Name = "Salary";
        cl.DataType = RuntimeWrapper.DataType.DT_NUMERIC;
        cl.Precision = 6;
        cl.Scale = 3;
        cl.Length = 0;
    }
    else if (sourceColumnName.Contains("Retired"))
    {
        cl.Name = "Retired";
        cl.DataType = RuntimeWrapper.DataType.DT_BOOL;
        cl.Precision = 0;
        cl.Scale = 0;
        cl.Length = 0;
    }
    return cl;
}

Now we can write the data flow task creation method. (The listings in the rest of this section all live inside this method.)

C#
private void CreateDataFlowTask()
{
    string dataFlowTaskMoniker = "DTS.Pipeline.1";
    dataflowTask = package.Executables.Add(dataFlowTaskMoniker);

Here we create a data flow task executable by using a moniker. A data flow task is an executable that can contain one or more data flow components (source, transformation, or destination components).

Generally, all data flow components have two collection properties: an input collection and an output collection. As the names indicate, the input collection contains one or more inputs (which can come from different sources), while the output collection contains one or more outputs (which can feed different destinations) plus an error output that carries the rows that failed during data manipulation inside the component.

Now we can create those data flow components under this dataflowTask object.

C#
PipeLineWrapper.IDTSComponentMetaData90 sourceComponent =
    ((dataflowTask as TaskHost).InnerObject as PipeLineWrapper.MainPipe)
    .ComponentMetaDataCollection.New();
sourceComponent.Name = "Source File Component";
string SourceDataFlowComponentID =
    "{90C7770B-DE7C-435E-880E-E718C92C0573}";
// use the CLSID to identify the flat file source component
sourceComponent.ComponentClassID = SourceDataFlowComponentID;

The data flow components are COM objects. When we invoke ComponentMetaDataCollection.New(), it creates a generic component metadata object that is not yet populated with the properties specific to the source component. The following lines create an instance of the actual data flow component (in this case, the flat file source, according to the given CLSID) and populate those properties.

C#
// load the COM component for the given CLSID
PipeLineWrapper.CManagedComponentWrapper managedFlatFileInstance =
    sourceComponent.Instantiate();
// populate the component's properties
managedFlatFileInstance.ProvideComponentProperties();

The source component needs to refer to the connection manager object that we created earlier.

C#
// assign the connection manager
if (sourceComponent.RuntimeConnectionCollection.Count > 0)
{   // if a connection is necessary
    sourceComponent.RuntimeConnectionCollection[0].ConnectionManagerID =
        flatFileConnectionManager.ID;
    sourceComponent.RuntimeConnectionCollection[0].ConnectionManager =
        DtsConvert.ToConnectionManager90(flatFileConnectionManager);
}
// establish a connection
managedFlatFileInstance.AcquireConnections(null);
// initialize the metadata
managedFlatFileInstance.ReinitializeMetaData();

managedFlatFileInstance.ReinitializeMetaData() reads the column definitions from the connection manager and populates the source component's output accordingly; after the call, sourceComponent.OutputCollection[0].OutputColumnCollection contains one column per source column. Now we need to map the output columns to the external metadata columns so that the component's output carries the values it reads from the source file.
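
As a quick sanity check (my own addition, not required by the package), we can confirm that the source output now exposes one column per CSV column:

C#
// after ReinitializeMetaData(): one output column per CSV column
Debug.WriteLine(sourceComponent.OutputCollection[0]
    .OutputColumnCollection.Count);   // expected: 5 for Sample.csv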

It is important to understand that every object in a package has an ID. In fact, each column (output or input) has a lineage ID that is unique within the scope of the package. I am going to use a dictionary to keep track of each output column's lineage ID and its name; we will need this information later.

C#
Dictionary<int, string> outputColumnLineageIDs
    = new Dictionary<int, string>();

// create the mappings now
PipeLineWrapper.IDTSExternalMetadataColumn90 exOutColumn;
foreach (PipeLineWrapper.IDTSOutputColumn90 outColumn in
    sourceComponent.OutputCollection[0].OutputColumnCollection)
{   // create the map
    exOutColumn = sourceComponent.OutputCollection[0]
        .ExternalMetadataColumnCollection[outColumn.Name];
    // map it
    managedFlatFileInstance.MapOutputColumn(
        sourceComponent.OutputCollection[0].ID,
        outColumn.ID, exOutColumn.ID, true);

    // remember the output column's name under its lineage ID
    outputColumnLineageIDs.Add(outColumn.LineageID, outColumn.Name);
}
// Release the connection now
managedFlatFileInstance.ReleaseConnections();

So the source data flow component is now ready to use. Let's create the destination data flow component. Before creating it, we need the destination database and data table to exist, so let's create the data table using the following SQL script. (For simplicity we create it manually, though it could be created programmatically as well; see the sketch after the script. It must exist before the destination data flow component's ReinitializeMetaData() is invoked, otherwise that call will fail.)

SQL
/****** Object:  Table [dbo].[DataTable]    ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
SET ANSI_PADDING ON
GO
CREATE TABLE [dbo].[DataTable](
    [Name] [varchar](50) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    [Age] [varchar](50),
    [JoinDate] [varchar](50),
    [Salary] [varchar](50) NULL,
    [Retired] [varchar](50) NULL
) ON [PRIMARY]

GO
SET ANSI_PADDING OFF
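
For completeness, here is a minimal sketch of creating the table programmatically with plain ADO.NET (my own addition; it assumes using System.Data.SqlClient; and that createTableScript holds the script above, and it must run before the destination component's ReinitializeMetaData()):

C#
private void CreateDestinationTable(string createTableScript)
{
    using (SqlConnection connection = new SqlConnection(
        @"Data Source=MOIM023;Initial Catalog=ImportDatabase;" +
        @"Integrated Security=SSPI;"))
    {
        connection.Open();
        using (SqlCommand command =
            new SqlCommand(createTableScript, connection))
        {
            command.ExecuteNonQuery(); // creates [dbo].[DataTable]
        }
    }
}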

Now we can create the data destination component.

C#
string OleDBDestinationDataFlowComponentID
    = "{E2568105-9550-4F71-A638-B7FE42E66922}";
PipeLineWrapper.IDTSComponentMetaData90 destinationComponent =
    ((dataflowTask as TaskHost).InnerObject as PipeLineWrapper.MainPipe)
    .ComponentMetaDataCollection.New();
destinationComponent.Name = "Data Destination Component";
destinationComponent.ComponentClassID = OleDBDestinationDataFlowComponentID;
// get the COM instance
managedOleInstanceDestinationComponent = destinationComponent.Instantiate();
// populate the component's properties
managedOleInstanceDestinationComponent.ProvideComponentProperties();

The destination data flow component is created. We now need to create a path between the source component's output and the destination component's input, so that the data coming out of the source component feeds directly into the destination component.

C#
PipeLineWrapper.IDTSPath90 pathDD
    = ((dataflowTask as TaskHost).InnerObject as
    PipeLineWrapper.MainPipe).PathCollection.New();
pathDD.AttachPathAndPropagateNotifications(
    sourceComponent.OutputCollection[0],
    destinationComponent.InputCollection[0]);

We also need to configure the destination component so that the data can be written into the destination database.

C#
string DatabaseName = "ImportDatabase";
string TableName = "DataTable";

// assign the connection manager
if (destinationComponent.RuntimeConnectionCollection.Count > 0)
{   // if a connection is necessary
    destinationComponent.RuntimeConnectionCollection[0].ConnectionManagerID =
        destinationDatabaseConnectionManager.ID;
    destinationComponent.RuntimeConnectionCollection[0].ConnectionManager =
        DtsConvert.ToConnectionManager90(destinationDatabaseConnectionManager);
}
// Set the custom properties
managedOleInstanceDestinationComponent.SetComponentProperty(
    "AccessMode", 0);                       // table or view mode
managedOleInstanceDestinationComponent.SetComponentProperty(
    "AlwaysUseDefaultCodePage", false);
managedOleInstanceDestinationComponent.SetComponentProperty(
    "DefaultCodePage", 1252);
managedOleInstanceDestinationComponent.SetComponentProperty(
    "FastLoadKeepIdentity", false);         // fast load options
managedOleInstanceDestinationComponent.SetComponentProperty(
    "FastLoadKeepNulls", false);
managedOleInstanceDestinationComponent.SetComponentProperty(
    "FastLoadMaxInsertCommitSize", 0);
managedOleInstanceDestinationComponent.SetComponentProperty(
    "FastLoadOptions", "TABLOCK,CHECK_CONSTRAINTS");
managedOleInstanceDestinationComponent.SetComponentProperty(
    "OpenRowset",
    string.Format("[{0}].[dbo].[{1}]", DatabaseName, TableName));

// Establish a connection
managedOleInstanceDestinationComponent.AcquireConnections(null);
// initialize the metadata
managedOleInstanceDestinationComponent.ReinitializeMetaData();

// Get the destination's default input and virtual input
PipeLineWrapper.IDTSInput90 input =
    destinationComponent.InputCollection[0];
PipeLineWrapper.IDTSVirtualInput90 vInput = input.GetVirtualInput();
// Iterate through the virtual input column collection
foreach (PipeLineWrapper.IDTSVirtualInputColumn90 vColumn
    in vInput.VirtualInputColumnCollection)
{
    if (outputColumnLineageIDs.ContainsKey(vColumn.LineageID))
    {   // the column came from the flat file source component
        managedOleInstanceDestinationComponent.SetUsageType(
            input.ID, vInput, vColumn.LineageID,
            PipeLineWrapper.DTSUsageType.UT_READONLY);
    }
}

PipeLineWrapper.IDTSExternalMetadataColumn90 exColumn;
foreach (PipeLineWrapper.IDTSInputColumn90 inColumn in
    destinationComponent.InputCollection[0].InputColumnCollection)
{   // create the map
    exColumn = destinationComponent.InputCollection[0]
        .ExternalMetadataColumnCollection[inColumn.Name.Replace("\"", "")];

    // our mapped column
    Column mappedColumn = GetTargetColumnInfo(exColumn.Name);
    string destName = mappedColumn.Name;
    // set the new name
    exColumn.Name = destName;

    // create the mapping
    managedOleInstanceDestinationComponent.MapInputColumn(
        destinationComponent.InputCollection[0].ID,
        inColumn.ID, exColumn.ID);
}
// Now release the connection
managedOleInstanceDestinationComponent.ReleaseConnections();

That's enough for the pilot package project; a driver sketch follows below. If we run our application, it will move the data from the CSV file into the destination database.
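
For reference, a driver for the class could look like the following sketch. It assumes the methods above are made accessible (for example, called from a public Run method) and that the package field is reachable; persisting the package with Application.SaveToXml so it can be opened in the designer is optional.

C#
ImportPackage import = new ImportPackage();
import.CreatePackage();
import.CreateFlatFileConnection();
import.CreateDestinationDatabaseConnection();
import.CreateDataFlowTask();

// execute the package in-process and inspect the result
DTSExecResult result = import.package.Execute();
Console.WriteLine("Execution result: " + result);

// optionally save the package to a .dtsx file
new Application().SaveToXml(@"C:\PilotPackage.dtsx", import.package, null);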

Using Data Conversion component

So far, our package assumed that the destination columns have the same data types as the source columns (that's why the columns of our data table are defined as varchar). Now we will modify the package so that it can change the data types before moving the data to the destination component.

To change the data types we need a different data flow component, the Data Conversion component. Our target package will look something like the following:

Incorporating data conversion component

We need to create the new data conversion component and place it between the source and the destination. Therefore, we will now modify the CreateDataFlowTask() method. Insert the following lines just after the managedFlatFileInstance.ReleaseConnections(); line.

C#
string DataConversionDataflowComponentID
    = "{C3BF62C8-7C5C-4F85-83C3-E0B6F6BE267C}";
PipeLineWrapper.IDTSComponentMetaData90 dataconversionComponent =
    ((dataflowTask as TaskHost).InnerObject as PipeLineWrapper.MainPipe)
    .ComponentMetaDataCollection.New();
dataconversionComponent.Name = "Data Conversion Component";
dataconversionComponent.ComponentClassID = DataConversionDataflowComponentID;
managedOleInstanceDataConversionComponent = dataconversionComponent.Instantiate();
managedOleInstanceDataConversionComponent.ProvideComponentProperties();
dataconversionComponent.InputCollection[0]
    .ExternalMetadataColumnCollection.IsUsed = false;
dataconversionComponent.InputCollection[0].HasSideEffects = false;
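
A note on the members used above: managedOleInstanceDataConversionComponent and (shortly) derivedLineageIdentifiers are assumed to be fields of the ImportPackage class, declared roughly like this:

C#
private PipeLineWrapper.CManagedComponentWrapper
    managedOleInstanceDataConversionComponent;
// maps each conversion output column's lineage ID to its name
private Dictionary<int, string> derivedLineageIdentifiers
    = new Dictionary<int, string>();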

We need to create a path between the source component's output and the data conversion component's input.

C#
PipeLineWrapper.IDTSPath90 path =
    ((dataflowTask as TaskHost).InnerObject as
        PipeLineWrapper.MainPipe).PathCollection.New();
path.AttachPathAndPropagateNotifications(
    sourceComponent.OutputCollection[0],
        dataconversionComponent.InputCollection[0]);

It's time to configure the data conversion component.

C#
// Get the conversion component's default input and virtual input.
PipeLineWrapper.IDTSInput90 input =
    dataconversionComponent.InputCollection[0];
PipeLineWrapper.IDTSVirtualInput90 vInput = input.GetVirtualInput();

// Iterate through the virtual input column collection.
foreach (PipeLineWrapper.IDTSVirtualInputColumn90 vColumn
    in vInput.VirtualInputColumnCollection)
{
    managedOleInstanceDataConversionComponent.SetUsageType(
        input.ID, vInput, vColumn.LineageID,
        PipeLineWrapper.DTSUsageType.UT_READONLY);
}

// set the truncation row disposition
dataconversionComponent.OutputCollection[0].TruncationRowDisposition =
    PipeLineWrapper.DTSRowDisposition.RD_NotUsed;
// set the error row disposition
dataconversionComponent.OutputCollection[0].ErrorRowDisposition =
    PipeLineWrapper.DTSRowDisposition.RD_NotUsed;
// get a reference to the output column collection
PipeLineWrapper.IDTSOutput90 output =
    dataconversionComponent.OutputCollection[0];

foreach (PipeLineWrapper.IDTSInputColumn90 inColumn in
    dataconversionComponent.InputCollection[0].InputColumnCollection)
{   // create a new output column for each input column
    PipeLineWrapper.IDTSOutputColumn90 outputColumn =
        output.OutputColumnCollection.New();
    outputColumn.Name = inColumn.Name;
    // get the target column from the mapping information
    Column targetColumn = GetTargetColumnInfo(inColumn.Name);
    int length = targetColumn.Length;        // length of the target column
    int precision = targetColumn.Precision;  // precision of the target column
    int scale = targetColumn.Scale;          // scale of the target column
    // the SSIS-compliant data type from the given mappings
    RuntimeWrapper.DataType dataType = targetColumn.DataType;

    // set the data type properties
    outputColumn.SetDataTypeProperties(
        dataType, length, precision, scale, targetColumn.CodePage);
    // set the external metadata column id to zero
    outputColumn.ExternalMetadataColumnID = 0;
    outputColumn.ErrorRowDisposition =
        PipeLineWrapper.DTSRowDisposition.RD_RedirectRow;
    outputColumn.TruncationRowDisposition =
        PipeLineWrapper.DTSRowDisposition.RD_RedirectRow;

    PipeLineWrapper.IDTSCustomProperty90 property =
        outputColumn.CustomPropertyCollection.New();
    property.Name = "SourceInputColumnLineageID";
    property.Value = GetSourceColumnLineageID(targetColumn.Name);
    property = outputColumn.CustomPropertyCollection.New();
    property.Name = "FastParse";
    property.Value = false;

    // Preserve the lineage ID in a dictionary. Later, when we configure
    // the destination component, its virtual input will contain both the
    // columns coming from the flat file source and the columns coming
    // from this conversion component's output. We only want to consider
    // the converted columns in the destination, and this dictionary lets
    // us distinguish them.
    derivedLineageIdentifiers[outputColumn.LineageID] = outputColumn.Name;
}
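
The GetSourceColumnLineageID() helper referenced above is not shown in the original listing. A minimal sketch, assuming the outputColumnLineageIDs dictionary we populated while mapping the source component's output columns is kept as a class member, could be:

C#
private int GetSourceColumnLineageID(string columnName)
{
    // look up the lineage ID of the source output column by its name
    foreach (KeyValuePair<int, string> pair in outputColumnLineageIDs)
    {
        if (pair.Value == columnName)
            return pair.Key;
    }
    return 0; // not found
}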

The data conversion component is configured. Let's create a path between the data conversion component and the destination component to complete the data flow. (This path replaces the direct source-to-destination path we created in the pilot package.)

C#
PipeLineWrapper.IDTSPath90 pathBetweenDCandDD
    = ((dataflowTask as TaskHost).InnerObject as
    PipeLineWrapper.MainPipe).PathCollection.New();
pathBetweenDCandDD.AttachPathAndPropagateNotifications(
    dataconversionComponent.OutputCollection[0],
    destinationComponent.InputCollection[0]);

We need to update the destination configuration code as well. Basically, we use the derivedLineageIdentifiers dictionary to pick the columns coming from the conversion component and update the data types of the destination component's external metadata columns.

Before doing this, let's quickly change the data table structure in the SQL database as follows:

SQL
CREATE TABLE [dbo].[DataTable](
    [Name] [varchar](50) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    [Age] [int] NULL,
    [JoinDate] [datetime] NULL,
    [Salary] [numeric](18, 9) NULL,
    [Retired] [bit] NULL
) ON [PRIMARY]

Notice that we now use proper data types rather than varchar for all the columns.

C#
// assign the connection manager
if (destinationComponent.RuntimeConnectionCollection.Count > 0)
{   // if a connection is necessary
    destinationComponent.RuntimeConnectionCollection[0].ConnectionManagerID =
        destinationDatabaseConnectionManager.ID;
    destinationComponent.RuntimeConnectionCollection[0].ConnectionManager =
        DtsConvert.ToConnectionManager90(destinationDatabaseConnectionManager);
}
// Set the custom properties.
managedOleInstanceDestinationComponent.SetComponentProperty(
    "AccessMode", 0);                       // table or view mode
managedOleInstanceDestinationComponent.SetComponentProperty(
    "AlwaysUseDefaultCodePage", false);     // default code page
managedOleInstanceDestinationComponent.SetComponentProperty(
    "DefaultCodePage", 1252);
managedOleInstanceDestinationComponent.SetComponentProperty(
    "FastLoadKeepIdentity", false);         // fast load options
managedOleInstanceDestinationComponent.SetComponentProperty(
    "FastLoadKeepNulls", false);
managedOleInstanceDestinationComponent.SetComponentProperty(
    "FastLoadMaxInsertCommitSize", 0);
managedOleInstanceDestinationComponent.SetComponentProperty(
    "FastLoadOptions", "TABLOCK,CHECK_CONSTRAINTS");
managedOleInstanceDestinationComponent.SetComponentProperty(
    "OpenRowset",
    string.Format("[{0}].[dbo].[{1}]", DatabaseName, TableName));
// Establish a connection
managedOleInstanceDestinationComponent.AcquireConnections(null);
// initialize the metadata
managedOleInstanceDestinationComponent.ReinitializeMetaData();
// Get the destination's default input and virtual input
PipeLineWrapper.IDTSInput90 destInput =
    destinationComponent.InputCollection[0];
PipeLineWrapper.IDTSVirtualInput90 destVInput = destInput.GetVirtualInput();
// Iterate through the virtual input column collection
foreach (PipeLineWrapper.IDTSVirtualInputColumn90 vColumn in
    destVInput.VirtualInputColumnCollection)
{
    // The virtual input column collection contains every upstream
    // column: with 5 columns in the flat file we will see 10 (5 * 2)
    // columns here, because the conversion component passes its input
    // columns through along with the output columns it creates. We only
    // want the columns that come from the conversion component's output
    // (not from the flat file source), and we can identify them by the
    // lineage IDs we recorded while creating the conversion outputs.
    if (derivedLineageIdentifiers.ContainsKey(vColumn.LineageID))
    {   // the column came from the conversion component
        managedOleInstanceDestinationComponent.SetUsageType(
            destInput.ID, destVInput, vColumn.LineageID,
            PipeLineWrapper.DTSUsageType.UT_READONLY);
    }
}

PipeLineWrapper.IDTSExternalMetadataColumn90 exColumn;
foreach (PipeLineWrapper.IDTSInputColumn90 inColumn in
    destinationComponent.InputCollection[0].InputColumnCollection)
{   // create the map
    exColumn = destinationComponent.InputCollection[0]
        .ExternalMetadataColumnCollection[inColumn.Name.Replace("\"", "")];
    // our mapped column
    Column mappedColumn = GetTargetColumnInfo(exColumn.Name);
    string destName = mappedColumn.Name;
    // set the new name
    exColumn.Name = destName;
    // set the data type -- this is where the types actually change!
    exColumn.DataType = mappedColumn.DataType;
    // create the mapping
    managedOleInstanceDestinationComponent.MapInputColumn(
        destinationComponent.InputCollection[0].ID,
        inColumn.ID, exColumn.ID);
}
// Now release the connection
managedOleInstanceDestinationComponent.ReleaseConnections();

Done! Now our package can change the data type during the data importing process.

Handling Error data rows

Now let's think about error data rows. Assume the CSV contains the following data:

plain
"Name","Age","JoinDate","Salary","Retired"
"Jon Tuite","45","","25.3","False"
"Linda Hamilton","25",2002-01-03 00:00:00,"BAD DATA","True"
"Sen Seldiver","42","2002-01-03 00:00:00,","458.2","True"
"Jon Clerk","DAMAGE","2001-01-03 00:00:00","455.265","False"
"Rose Dalson","25","2004-01-03 00:00:00","55.265","False" 

Notice that the first data row has missing data in the JoinDate field, the second row has bad data in the Salary field, and the fourth row has bad data in the Age field, so the data conversion component will fail to convert these rows. If we want to report the error rows during data flow processing, we need to add an extra destination data flow component to the package. This extra destination component will capture the error rows and move them into a SQL database. To accomplish this, let's modify the code base again. Our desired package will look something like this:

Error Component

You might have guessed that we are going to use another OLEDB destination component to capture the error rows. So let's create a connection manager for the new error destination component (we could reuse the previous one, but a separate one allows the errors to be moved to a different database than the destination database).

C#
private void CreateErrorDatabaseConnection()
{
    string oleDbMoniker = @"OLEDB";
    errorDatabaseConnectionManager = package.Connections.Add(oleDbMoniker);
    // ConnectionString is a member holding the OLE DB connection
    // string for the error database (see below)
    errorDatabaseConnectionManager.ConnectionString = ConnectionString;
    errorDatabaseConnectionManager.Name =
        "SSIS error Connection Manager for Oledb";
    errorDatabaseConnectionManager.Description =
        "SSIS Connection Manager for the error database";
}
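
The ConnectionString member referenced here is assumed to hold the OLE DB connection string of the error database, for example:

C#
private string ConnectionString =
    @"Data Source=MOIM023;Initial Catalog=ImportDatabase;" +
    @"Integrated Security=SSPI;Provider=SQLNCLI;Auto Translate=false;";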

We need another table to capture the error rows, so let's create it with the following SQL script:

SQL
CREATE TABLE [ErrorTable] (
    [Name] VARCHAR(50),
    [Age] VARCHAR(50),
    [JoinDate] VARCHAR(50),
    [Salary] VARCHAR(50),
    [Retired] VARCHAR(50),
    [ErrorCode] INTEGER,
    [ErrorColumn] INTEGER
)

We will modify CreateDataFlowTask() to implement the error handling functionality. Append the following code to that method.

C#
PipeLineWrapper.IDTSComponentMetaData90 errorTrackerComponent =
    ((dataflowTask as TaskHost).InnerObject as PipeLineWrapper.MainPipe)
    .ComponentMetaDataCollection.New();
errorTrackerComponent.Name = "Error Tracker Component";
errorTrackerComponent.ComponentClassID = OleDBDestinationDataFlowComponentID;

// get the COM instance
managedOleInstanceErrorComponent = errorTrackerComponent.Instantiate();
// populate the component's properties
managedOleInstanceErrorComponent.ProvideComponentProperties();

Naturally, we need to establish a path between the data conversion component's error output and the error destination component's input:

C#
PipeLineWrapper.IDTSPath90 pathDE
    = ((dataflowTask as TaskHost).InnerObject as
    PipeLineWrapper.MainPipe).PathCollection.New();
pathDE.AttachPathAndPropagateNotifications(
    dataconversionComponent.OutputCollection[1],
    errorTrackerComponent.InputCollection[0]);

As discussed earlier in the article, the conversion component's output collection contains two outputs: one is the regular data output and the other is the error output. That is why the new path uses dataconversionComponent.OutputCollection[1].
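
If you want to verify this (a debugging aid of my own, not required by the package), you can enumerate the conversion component's outputs and check the IsErrorOut flag:

C#
foreach (PipeLineWrapper.IDTSOutput90 pipeOutput in
    dataconversionComponent.OutputCollection)
{
    // the second output reports IsErrorOut = true
    Debug.WriteLine(pipeOutput.Name
        + " - IsErrorOut: " + pipeOutput.IsErrorOut);
}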

We will now configure the error destination component.

C#
// assign the connection manager
if (errorTrackerComponent.RuntimeConnectionCollection.Count > 0)
{   // if a connection is necessary
    errorTrackerComponent.RuntimeConnectionCollection[0].ConnectionManagerID =
        errorDatabaseConnectionManager.ID;
    errorTrackerComponent.RuntimeConnectionCollection[0].ConnectionManager =
        DtsConvert.ToConnectionManager90(errorDatabaseConnectionManager);
}
// Set the custom properties
managedOleInstanceErrorComponent.SetComponentProperty(
    "AccessMode", 0);                       // table or view mode
managedOleInstanceErrorComponent.SetComponentProperty(
    "AlwaysUseDefaultCodePage", false);     // default code page
managedOleInstanceErrorComponent.SetComponentProperty(
    "DefaultCodePage", 1252);
managedOleInstanceErrorComponent.SetComponentProperty(
    "FastLoadKeepIdentity", false);         // fast load options
managedOleInstanceErrorComponent.SetComponentProperty(
    "FastLoadKeepNulls", false);
managedOleInstanceErrorComponent.SetComponentProperty(
    "FastLoadMaxInsertCommitSize", 0);
managedOleInstanceErrorComponent.SetComponentProperty(
    "FastLoadOptions", "TABLOCK,CHECK_CONSTRAINTS");
managedOleInstanceErrorComponent.SetComponentProperty(
    "OpenRowset",
    string.Format("[{0}].[dbo].[{1}]", DatabaseName, ErrorTableName));
// Establish a connection
managedOleInstanceErrorComponent.AcquireConnections(null);
// initialize the metadata
managedOleInstanceErrorComponent.ReinitializeMetaData();
// Get the error destination's default input and virtual input
PipeLineWrapper.IDTSInput90 errorInput =
    errorTrackerComponent.InputCollection[0];
PipeLineWrapper.IDTSVirtualInput90 errorVInput = errorInput.GetVirtualInput();
// Iterate through the virtual input column collection
foreach (PipeLineWrapper.IDTSVirtualInputColumn90 vColumn in
    errorVInput.VirtualInputColumnCollection)
{
    managedOleInstanceErrorComponent.SetUsageType(
        errorInput.ID, errorVInput, vColumn.LineageID,
        PipeLineWrapper.DTSUsageType.UT_READONLY);
}

PipeLineWrapper.IDTSExternalMetadataColumn90 errorExColumn;
foreach (PipeLineWrapper.IDTSInputColumn90 inColumn in
    errorTrackerComponent.InputCollection[0].InputColumnCollection)
{   // create the map
    errorExColumn = errorTrackerComponent.InputCollection[0]
        .ExternalMetadataColumnCollection[inColumn.Name.Replace("\"", "")];

    errorExColumn.DataType = RuntimeWrapper.DataType.DT_STR;

    // create the mapping
    managedOleInstanceErrorComponent.MapInputColumn(
        errorTrackerComponent.InputCollection[0].ID,
        inColumn.ID, errorExColumn.ID);
}
// Now release the connection
managedOleInstanceErrorComponent.ReleaseConnections();

An error output typically carries the error code and the lineage ID of the failing column along with the entire row's data. That is why the error table contains the columns available in the CSV file plus two extra columns (ErrorCode and ErrorColumn).

This way we only get the error code and the lineage ID of the failing column (which is simply an identifier). With some extra code, we could resolve these to an error description and the column's name rather than its identifier; I found an interesting article on this.

Now if we run the application, it moves the error rows into the error table and all the other rows into the data table.

Using Derived Column Component

The CSV file we last discussed contains error rows, and our current implementation moves all of them into the error destination table. But what if we need to bring the first row into the destination database rather than moving it to the error destination? For example, assume our destination schema allows NULL in the JoinDate and Salary fields. Our current implementation will never move rows with missing data into the destination database: the conversion always fails on the missing values and treats the whole row as an error row. For some businesses this may well be a problem. To resolve it, we can replace the data conversion component with a Derived Column component. The Derived Column transformation creates new column values by applying expressions to transformation input columns. An expression can contain any combination of variables, functions, operators, and columns from the transformation input, and the result can be added as a new column or inserted into an existing column as a replacement value. The transformation can define multiple derived columns, and any variable or input column can appear in multiple expressions. For more details, please read MSDN.

To replace the data conversion component with a derived column component, we need to make some minor changes to the code base we have developed so far.

Derived Column Component

Creating a derived column component is exactly the same as creating a data conversion component except for the component class ID. The component class ID for a derived column component is "{9CF90BF0-5BCC-4C63-B91D-1F322DC12C26}".
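
For quick reference, here are the SSIS 2005 pipeline component CLSIDs used throughout this article, collected as constants:

C#
private const string FlatFileSourceID =
    "{90C7770B-DE7C-435E-880E-E718C92C0573}";
private const string OleDbDestinationID =
    "{E2568105-9550-4F71-A638-B7FE42E66922}";
private const string DataConversionID =
    "{C3BF62C8-7C5C-4F85-83C3-E0B6F6BE267C}";
private const string DerivedColumnID =
    "{9CF90BF0-5BCC-4C63-B91D-1F322DC12C26}";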

C#
string DerivedColumnDataflowComponentID
    = "{9CF90BF0-5BCC-4C63-B91D-1F322DC12C26}";
PipeLineWrapper.IDTSComponentMetaData90 derivedColumnComponent =
    ((dataflowTask as TaskHost).InnerObject as PipeLineWrapper.MainPipe)
    .ComponentMetaDataCollection.New();
derivedColumnComponent.Name = "Derived Column Component";
derivedColumnComponent.ComponentClassID = DerivedColumnDataflowComponentID;
managedOleInstanceDerivedColumnComponent = derivedColumnComponent.Instantiate();
managedOleInstanceDerivedColumnComponent.ProvideComponentProperties();
derivedColumnComponent.InputCollection[0]
    .ExternalMetadataColumnCollection.IsUsed = false;
derivedColumnComponent.InputCollection[0].HasSideEffects = false;

As you can see, the component creation is much the same as for the data conversion component. Now we write the configuration code for the derived column component. This is also similar to the data conversion configuration, except that here we set custom properties holding the expression. The configuration code is as follows:

C#
// Get the derived column component's default input and virtual input.
PipeLineWrapper.IDTSInput90 input =
    derivedColumnComponent.InputCollection[0];
PipeLineWrapper.IDTSVirtualInput90 vInput = input.GetVirtualInput();

Dictionary<string, int> lineAgeIDs = new Dictionary<string, int>();

// Iterate through the virtual input column collection
foreach (PipeLineWrapper.IDTSVirtualInputColumn90 vColumn in
    vInput.VirtualInputColumnCollection)
{
    managedOleInstanceDerivedColumnComponent.SetUsageType(
        input.ID, vInput, vColumn.LineageID,
        PipeLineWrapper.DTSUsageType.UT_READONLY);
    lineAgeIDs[vColumn.Name] = vColumn.LineageID;
}
// set the truncation row disposition
derivedColumnComponent.OutputCollection[0].TruncationRowDisposition =
    PipeLineWrapper.DTSRowDisposition.RD_NotUsed;
// set the error row disposition
derivedColumnComponent.OutputCollection[0].ErrorRowDisposition =
    PipeLineWrapper.DTSRowDisposition.RD_NotUsed;
// get a reference to the output column collection
PipeLineWrapper.IDTSOutput90 output =
    derivedColumnComponent.OutputCollection[0];
foreach (PipeLineWrapper.IDTSInputColumn90 inColumn in
    derivedColumnComponent.InputCollection[0].InputColumnCollection)
{   // create a new output column for each input column
    PipeLineWrapper.IDTSOutputColumn90 outputColumn =
        output.OutputColumnCollection.New();
    outputColumn.Name = inColumn.Name;
    // get the target column from the mapping information
    Column targetColumn = GetTargetColumnInfo(inColumn.Name);
    int length = targetColumn.Length;        // length of the target column
    int precision = targetColumn.Precision;  // precision of the target column
    int scale = targetColumn.Scale;          // scale of the target column
    // the SSIS-compliant data type from the given mappings
    RuntimeWrapper.DataType dataType = targetColumn.DataType;
    int codePage = 0;
    if (dataType == RuntimeWrapper.DataType.DT_STR)
    {
        precision = 0;
        scale = 0;
        codePage = 1252;
    }
    else if (dataType == RuntimeWrapper.DataType.DT_NUMERIC)
    {
        length = 0;
    }
    else if (dataType == RuntimeWrapper.DataType.DT_DATE)
    {
        length = 0;
        precision = 0;
        scale = 0;
    }
    else if (dataType == RuntimeWrapper.DataType.DT_BOOL)
    {
        length = 0;
        precision = 0;
        scale = 0;
    }

    // set the data type properties
    outputColumn.SetDataTypeProperties(
        dataType, length, precision, scale, codePage);
    // set the external metadata column id to zero
    outputColumn.ExternalMetadataColumnID = 0;
    outputColumn.ErrorRowDisposition =
        PipeLineWrapper.DTSRowDisposition.RD_RedirectRow;
    outputColumn.TruncationRowDisposition =
        PipeLineWrapper.DTSRowDisposition.RD_RedirectRow;

    string expression = string.Empty;
    string friendlyExpression = string.Empty;

    GetExpressionString(ref expression, ref friendlyExpression,
        dataType, length, precision, scale, lineAgeIDs, outputColumn.Name);

    PipeLineWrapper.IDTSCustomProperty90 property =
        outputColumn.CustomPropertyCollection.New();
    property.Name = "Expression";
    property.Value = expression;
    property = outputColumn.CustomPropertyCollection.New();
    property.Name = "FriendlyExpression";
    property.Value = friendlyExpression;

    // Preserve the lineage ID, exactly as we did for the data
    // conversion component: when we configure the destination component
    // we will use this dictionary to pick only the columns coming from
    // the derived column component's output (not the ones passed
    // through from the flat file source).
    derivedLineageIdentifiers[outputColumn.LineageID] = outputColumn.Name;
}

We referenced a method named GetExpressionString(), which builds the expression strings. Let's implement it now.

C#
private void GetExpressionString(ref string expression,
    ref string friendlyExpression,
    RuntimeWrapper.DataType dataType,
    int length, int precision, int scale,
    Dictionary<string, int> lineAgeIDs,
    string columnName)
{
    expression = string.Empty;
    friendlyExpression = string.Empty;
    // get the lineage id for the column name
    int lineageID = lineAgeIDs[columnName];

    switch (dataType)
    {
        case RuntimeWrapper.DataType.DT_STR:
            expression = string.Format("#{0}", lineageID);
            friendlyExpression = string.Format("{0}", columnName);
            break;
        case RuntimeWrapper.DataType.DT_I4:
            // 4 byte signed integer
            expression = string.Format(
                "[ISNULL](#{0}) || #{1} == \"\" ? NULL(DT_I4) : (DT_I4)#{2}",
                lineageID, lineageID, lineageID);
            friendlyExpression = string.Format(
                "ISNULL([{0}]) || [{1}] == \"\" ? NULL(DT_I4) : (DT_I4)[{2}]",
                columnName, columnName, columnName);
            break;
        case RuntimeWrapper.DataType.DT_DECIMAL:
        case RuntimeWrapper.DataType.DT_NUMERIC:
            // Should we handle precision here?
            expression = string.Format(
                "[ISNULL](#{0}) || #{1} == \"\" ? NULL(DT_NUMERIC,{2},{3}) : (DT_NUMERIC,{4},{5})#{6}",
                lineageID, lineageID, precision, scale,
                precision, scale, lineageID);
            friendlyExpression = string.Format(
                "ISNULL([{0}]) || [{1}] == \"\" ? NULL(DT_NUMERIC,{2},{3}) : (DT_NUMERIC,{4},{5})[{6}]",
                columnName, columnName, precision, scale,
                precision, scale, columnName);
            break;
        case RuntimeWrapper.DataType.DT_DATE:
            expression = string.Format(
                "[ISNULL](#{0}) || #{1} == \"\" ? NULL(DT_DATE) : (DT_DATE)#{2}",
                lineageID, lineageID, lineageID);
            friendlyExpression = string.Format(
                "ISNULL([{0}]) || [{1}] == \"\" ? NULL(DT_DATE) : (DT_DATE)[{2}]",
                columnName, columnName, columnName);
            break;
        case RuntimeWrapper.DataType.DT_I8:
            expression = string.Format(
                "[ISNULL](#{0}) || #{1} == \"\" ? NULL(DT_I8) : (DT_I8)#{2}",
                lineageID, lineageID, lineageID);
            friendlyExpression = string.Format(
                "ISNULL([{0}]) || [{1}] == \"\" ? NULL(DT_I8) : (DT_I8)[{2}]",
                columnName, columnName, columnName);
            break;
        case RuntimeWrapper.DataType.DT_BOOL:
            expression = string.Format(
                "[ISNULL](#{0}) || #{1} == \"\" ? NULL(DT_BOOL) : (DT_BOOL)#{2}",
                lineageID, lineageID, lineageID);
            friendlyExpression = string.Format(
                "ISNULL([{0}]) || [{1}] == \"\" ? NULL(DT_BOOL) : (DT_BOOL)[{2}]",
                columnName, columnName, columnName);
            break;
        default:
            expression = string.Format("#{0}", lineageID);
            friendlyExpression = string.Format("{0}", columnName);
            break;
    }
}
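
To make the result concrete, this is the friendly expression the method generates for the Age column (DT_I4):

plain
ISNULL([Age]) || [Age] == "" ? NULL(DT_I4) : (DT_I4)[Age]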

That's all for our derived column implementation. Our package now brings the rows with missing values into the destination table, as intended. Although the Derived Column component is normally used for genuine data derivation tasks, it can be used to resolve this issue as well.

Conclusion

In this article I described how we can programmatically use different data flow components to meet custom requirements. I hope you enjoy it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)