Introduction
SQL Server 2005 Integration Services (SSIS) ships with a rich set of APIs, and in this article I am going to show you how you can create and execute an SSIS package programmatically from C# code.
Background
A couple of weeks ago I needed to import a huge CSV (comma separated values) file into a SQL Server database. That alone is not such a difficult job that I would write an article about it; anyone can do it with the SQL Server 2005 Import Wizard, which is a flexible tool for exactly this kind of work. But I needed to do it programmatically, and that is what made me think and search the Internet for a solution. After spending some hours I found that SQL Server 2005 provides a rich set of APIs for SSIS (SQL Server 2005 Integration Services), and that anyone can easily create an SSIS package (previously called a DTS package) and execute it. Unfortunately, I did not find code that creates a package importing data from a CSV file into a SQL Server 2005 database. Most of the demo code I found imported data from SQL Server to SQL Server, or from SQL Server to a flat file. It looked straightforward to implement one for my own purpose, hopefully just by following the existing demos, but I soon discovered that my case is subtly different from those examples. After two days of brainstorming I had a code base that satisfied me. In this article I am sharing my code and thoughts with you.
Using the code
In my code base I have created two projects for this import business. The first one (ImportLib) is a class library that implements the SSIS package creation tasks, and the second one (Import) is a Windows Forms application, a demo that references the first project and initiates an import job. So exploring the first one is enough to get a good idea of what I actually did.
The object model that I have created basically stands on a couple of interfaces, namely ImportLib.ISourceStorage, ImportLib.IDestinationStorage, ImportLib.IntegrationService.ISource and ImportLib.IntegrationService.IDestination. Before diving into the details, I would like to explain my thinking.
After exploring SSIS, you will find that it is very easy to import data from almost any kind of data source into almost any data destination. The SSIS object model implements some well-defined interfaces to support this. Since I am writing code to create packages programmatically, why shouldn't I implement it in a way that I can extend later? Today it imports data from a CSV file into a SQL Server database, but who knows whether tomorrow I will have to import data from an Excel file, or write to some other kind of destination? This idea made me write the ImportLib.ISourceStorage and ImportLib.IDestinationStorage interfaces. I guess you already have an abstract idea of why these interfaces exist. For every kind of source storage I defined the ImportLib.ISourceStorage interface; currently I implement it only for CSV file sources, but later, when I write the Excel source, I will simply implement this interface again (a hypothetical Excel skeleton is sketched after the interface definitions below). The same idea applies to the destination storage, where the interface is ImportLib.IDestinationStorage.
Let's take a look at these interfaces:
public interface ISourceStorage
{
    void InitializeSource(IDictionary<string, string> properties);
    DataTable GetSchemaTable();
    StorageMedium StorageMedium { get; }
    ColumnMappingController MapManager { get; set; }
}
And
public interface IDestinationStorage
{
    void InitializeDestination(IDictionary<string, string> properties);
    StorageMedium StorageMedium { get; }
    ColumnMappingController MapManager { get; set; }
}
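To make the extensibility point concrete, here is a minimal skeleton of what a future Excel source might look like. This is a hypothetical sketch, not part of the downloadable code: the class name, the namespace, the property keys and the StorageMedium.Excel member are all my assumptions.
using System;
using System.Collections.Generic;
using System.Data;

namespace ImportLib.ExcelSupport    // hypothetical namespace for this sketch
{
    // Hypothetical: a future Excel source is just another ISourceStorage
    // implementation. The property keys and the StorageMedium.Excel member
    // are assumptions, not part of the downloadable code.
    public class ExcelDataSource : ISourceStorage
    {
        private string fileName;
        private string sheetName;
        private ColumnMappingController mapManager;

        public void InitializeSource(IDictionary<string, string> properties)
        {
            fileName = properties["FileName"];      // assumed property keys
            sheetName = properties["SheetName"];
        }

        public DataTable GetSchemaTable()
        {
            // Would open an OLE DB connection with the Excel provider and read
            // the worksheet schema with CommandBehavior.SchemaOnly, analogous
            // to the flat-file schema read sketched just below.
            throw new NotImplementedException();
        }

        public StorageMedium StorageMedium
        {
            get { return StorageMedium.Excel; }     // assumed enum member
        }

        public ColumnMappingController MapManager
        {
            get { return mapManager; }
            set { mapManager = value; }
        }
    }
}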
Now let's take a look at the concrete implementations. ImportLib.Delimited.DelimitedDataSource is the concrete class that implements ImportLib.ISourceStorage. It simply encapsulates the functionality defined by the interface in a way that is specific to a CSV file, reading the CSV file's schema through the System.Data.OleDb.OleDbConnection class (a minimal sketch of that kind of schema read follows). The other concrete class, ImportLib.Sql.SqlStorageDestination, implements the ImportLib.IDestinationStorage interface. It provides the functionality defined by the interface and also contains the routines that create the destination data store (database and table).
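The actual DelimitedDataSource ships with the download; the snippet below is only a minimal sketch of the idea, assuming the CSV file has a header row and using the Jet text driver. The class name, the folderPath/fileName parameters and the HDR=YES setting are my assumptions for illustration.
using System.Data;
using System.Data.OleDb;

// Minimal sketch (not the article's DelimitedDataSource): ask the Jet text
// driver for the schema of a CSV file without reading its data.
public static class CsvSchemaReader
{
    public static DataTable ReadSchema(string folderPath, string fileName)
    {
        string connectionString = string.Format(
            @"Provider=Microsoft.Jet.OLEDB.4.0;Data Source={0};" +
            @"Extended Properties=""text;HDR=YES;FMT=Delimited""",
            folderPath);

        using (OleDbConnection connection = new OleDbConnection(connectionString))
        using (OleDbCommand command = new OleDbCommand(
            string.Format("SELECT * FROM [{0}]", fileName), connection))
        {
            connection.Open();
            // SchemaOnly returns column metadata (including "ColumnName")
            // without pulling the potentially huge data set itself.
            using (OleDbDataReader reader =
                command.ExecuteReader(CommandBehavior.SchemaOnly))
            {
                return reader.GetSchemaTable();
            }
        }
    }
}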
So far so good. Now let's concentrate on the SSIS-specific classes and interfaces. Here again I have defined the two interfaces I mentioned earlier, ImportLib.IntegrationService.ISource and ImportLib.IntegrationService.IDestination. Let's take a look at them.
public interface ISource
{
    ISourceStorage DataSource { get; set; }
    void CreateConnection(DTSPackage package);
    DataflowComponent CreateSourceDataFlowComponent(
        DTSPackage package, DTSExecutable dataflowTask);
    void InitializeDataflowComponent(DataflowComponent sourceDataFlowComponent);
}
And another one is:
public interface IDestination
{
    IDestinationStorage DataDestination { get; set; }
    ISource SsisSource { get; set; }
    void CreateConnection(DTSPackage package);
    DTSExecutable CreateStorageCreationTask(DTSPackage package);
    DataflowComponent CreateDestinationDataFlowComponent(
        DTSPackage package, DTSExecutable dataflowTask);
    void InitializeDataflowComponent(DataflowComponent destinationDataFlowComponent);
}
These interfaces strictly define the SSIS-specific routines, such as creating connections and creating data flow components, along with some initialization routines. Defining an SSIS data flow is out of the scope of this article; for detailed information on these SSIS components, please read MSDN. Now we will take a look at the concrete implementations. ImportLib.IntegrationService.Specialized.FlatFileSource is the concrete implementation of the ImportLib.IntegrationService.ISource interface. Have a look at this class. The important thing here is that we have to explicitly create the source columns (the CreateSourceColumns method) for the connection, which SSIS will never create for you.
namespace ImportLib.IntegrationService.Specialized
{
    public class FlatFileSource : ISource
    {
        private DelimitedDataSource delimitedDataSource;
        public const string FlatFileMoniker = @"FLATFILE";
        public const string SourceDataFlowComponentID =
            "{90C7770B-DE7C-435E-880E-E718C92C0573}";
        private DataTable schemaTable = null;
        private ConnectionManager connectionManager;

        public FlatFileSource()
        {
        }

        #region ISsisSource Members

        public ISourceStorage DataSource
        {
            get { return delimitedDataSource; }
            set
            {
                delimitedDataSource = value as DelimitedDataSource;
                schemaTable = value.GetSchemaTable();
                Debug.Assert(delimitedDataSource != null);
            }
        }

        public void CreateConnection(DTSPackage package)
        {
            #region Logging
            Logger.WriteInformation("Creating connection to the source file.");
            #endregion

            connectionManager = package.InnerObject.Connections.Add(FlatFileMoniker);
            connectionManager.ConnectionString = delimitedDataSource.FileName;
            connectionManager.Name = "SSIS Connection Manager for Files";
            connectionManager.Description = string.Concat("SSIS Connection Manager");
            connectionManager.Properties["ColumnNamesInFirstDataRow"].SetValue(
                connectionManager, delimitedDataSource.FirstRowIsHeader);
            connectionManager.Properties["Format"].SetValue(
                connectionManager, "Delimited");
            connectionManager.Properties["HeaderRowDelimiter"].SetValue(
                connectionManager, delimitedDataSource.HeaderRowDelimiter);
            if (delimitedDataSource.TextQualifier != null)
            {
                connectionManager.Properties["TextQualifier"].SetValue(
                    connectionManager, delimitedDataSource.TextQualifier);
            }
            CreateSourceColumns();

            #region Logging
            Logger.WriteInformation(
                "Creating connection to the source file.....Completed");
            #endregion
        }

        public DataflowComponent CreateSourceDataFlowComponent(
            DTSPackage package, DTSExecutable dataflowTask)
        {
            DataflowComponent sourceDataFlowComponent = new DataflowComponent(
                dataflowTask, SourceDataFlowComponentID,
                "Source Data Flow component");
            return sourceDataFlowComponent;
        }

        public void InitializeDataflowComponent(
            DataflowComponent sourceDataFlowComponent)
        {
            #region Logging
            Logger.WriteInformation(
                "Initializing the managed instance for the source file.");
            #endregion

            CManagedComponentWrapper managedFlatFileInstance =
                sourceDataFlowComponent.ComponentInstance;
            managedFlatFileInstance.ProvideComponentProperties();

            if (sourceDataFlowComponent.InnerObject.
                RuntimeConnectionCollection.Count > 0)
            {
                sourceDataFlowComponent.InnerObject.RuntimeConnectionCollection[0].
                    ConnectionManagerID = connectionManager.ID;
                sourceDataFlowComponent.InnerObject.RuntimeConnectionCollection[0].
                    ConnectionManager =
                        DtsConvert.ToConnectionManager90(connectionManager);
            }

            managedFlatFileInstance.AcquireConnections(null);
            managedFlatFileInstance.ReinitializeMetaData();

            IDTSExternalMetadataColumn90 exOutColumn;
            foreach (IDTSOutputColumn90 outColumn in sourceDataFlowComponent.
                InnerObject.OutputCollection[0].OutputColumnCollection)
            {
                exOutColumn = sourceDataFlowComponent.InnerObject.OutputCollection[0].
                    ExternalMetadataColumnCollection[outColumn.Name];
                managedFlatFileInstance.MapOutputColumn(
                    sourceDataFlowComponent.InnerObject.OutputCollection[0].ID,
                    outColumn.ID, exOutColumn.ID, true);
            }

            managedFlatFileInstance.ReleaseConnections();

            #region Logging
            Logger.WriteInformation(
                "Initializing the managed instance for the source file......completed");
            #endregion
        }

        #endregion

        private void CreateSourceColumns()
        {
            RuntimeWrapper.IDTSConnectionManagerFlatFile90 flatFileConnection =
                connectionManager.InnerObject
                    as RuntimeWrapper.IDTSConnectionManagerFlatFile90;
            RuntimeWrapper.IDTSConnectionManagerFlatFileColumn90 column;
            RuntimeWrapper.IDTSName90 name;
            Debug.WriteLine(flatFileConnection.Columns.Count);

            DataTable schemaTable = DataSource.GetSchemaTable();
            foreach (DataRow row in schemaTable.Rows)
            {
                string colName = row["ColumnName"] as string;
                column = flatFileConnection.Columns.Add();

                // The last column ends with the row delimiter; every other
                // column ends with the column delimiter.
                if (schemaTable.Rows.IndexOf(row) == (schemaTable.Rows.Count - 1))
                    column.ColumnDelimiter = delimitedDataSource.HeaderRowDelimiter;
                else
                    column.ColumnDelimiter = delimitedDataSource.Delimiter;

                column.TextQualified = delimitedDataSource.TextQualifier != null;
                column.ColumnType = "Delimited";
                column.DataType = RuntimeWrapper.DataType.DT_WSTR;
                column.DataPrecision = 0;
                column.DataScale = 0;
                name = (RuntimeWrapper.IDTSName90)column;
                name.Name = colName;
            }
        }
    }
}
And now let's have a look at the counterpart, the destination implementation.
namespace ImportLib.IntegrationService.Specialized
{
    public class SqlServerDataDestination : IDestination
    {
        private SqlStorageDestination sqlDataDestination;
        public const string OleDBMoniker = "OLEDB";
        public const string OleDBDestinationDataFlowComponentID =
            "{E2568105-9550-4F71-A638-B7FE42E66922}";
        private ISource ssisSource;
        private ConnectionManager connectionManager;

        #region ISsisDestination Members

        public IDestinationStorage DataDestination
        {
            get { return sqlDataDestination; }
            set
            {
                sqlDataDestination = value as SqlStorageDestination;
                Debug.Assert(sqlDataDestination != null);
            }
        }

        public ISource SsisSource
        {
            get { return ssisSource; }
            set { ssisSource = value; }
        }

        public void CreateConnection(DTSPackage package)
        {
            connectionManager = package.InnerObject.Connections.Add(OleDBMoniker);
            connectionManager.ConnectionString = GetSsisConnectionString();
            connectionManager.Name = "SSIS Connection Manager for Oledb";
            connectionManager.Description = string.Concat(
                "SSIS Connection Manager for ", sqlDataDestination.DatabaseName);
        }

        public DTSExecutable CreateStorageCreationTask(DTSPackage package)
        {
            Type taskType = typeof(ExecuteSQLTask);
            DTSExecutable executable = new DTSExecutable(package, taskType);
            TaskHost taskHost = executable.InnerObject as TaskHost;
            ExecuteSQLTask sqlTask = taskHost.InnerObject as ExecuteSQLTask;
            sqlTask.Connection = connectionManager.Name;
            sqlTask.SqlStatementSource =
                sqlDataDestination.GetDestinationTableCreationSql(
                    ssisSource.DataSource.GetSchemaTable());
            return executable;
        }

        public DataflowComponent CreateDestinationDataFlowComponent(
            DTSPackage package, DTSExecutable dataflowTask)
        {
            #region Logging
            Logger.WriteInformation(
                "Creating managed instances for the destination database");
            #endregion

            DataflowComponent destinationDataFlowComponent = new DataflowComponent(
                dataflowTask, OleDBDestinationDataFlowComponentID,
                "Destination Oledb Component");
            sqlDataDestination.CreateDataStore(
                ssisSource.DataSource.GetSchemaTable());

            CManagedComponentWrapper managedOleInstance =
                destinationDataFlowComponent.ComponentInstance;
            managedOleInstance.ProvideComponentProperties();

            if (destinationDataFlowComponent.InnerObject.
                RuntimeConnectionCollection.Count > 0)
            {
                destinationDataFlowComponent.InnerObject.
                    RuntimeConnectionCollection[0].ConnectionManagerID =
                        connectionManager.ID;
                destinationDataFlowComponent.InnerObject.
                    RuntimeConnectionCollection[0].ConnectionManager =
                        DtsConvert.ToConnectionManager90(connectionManager);
            }

            managedOleInstance.SetComponentProperty("AccessMode", 0);
            managedOleInstance.SetComponentProperty("AlwaysUseDefaultCodePage", false);
            managedOleInstance.SetComponentProperty("DefaultCodePage", 1252);
            managedOleInstance.SetComponentProperty("FastLoadKeepIdentity", false);
            managedOleInstance.SetComponentProperty("FastLoadKeepNulls", false);
            managedOleInstance.SetComponentProperty("FastLoadMaxInsertCommitSize", 0);
            managedOleInstance.SetComponentProperty("FastLoadOptions",
                "TABLOCK,CHECK_CONSTRAINTS");
            managedOleInstance.SetComponentProperty("OpenRowset",
                string.Format("[{0}].[dbo].[{1}]",
                    sqlDataDestination.DatabaseName,
                    sqlDataDestination.TableName));

            #region Logging
            Logger.WriteInformation(
                "Creating managed instances for the destination database....completed");
            #endregion

            return destinationDataFlowComponent;
        }

        public void InitializeDataflowComponent(
            DataflowComponent destinationDataFlowComponent)
        {
            #region Logging
            Logger.WriteInformation(
                "Creating the destination columns and their mappings");
            #endregion

            CManagedComponentWrapper managedOleInstance =
                destinationDataFlowComponent.ComponentInstance;
            managedOleInstance.AcquireConnections(null);
            managedOleInstance.ReinitializeMetaData();

            IDTSInput90 input =
                destinationDataFlowComponent.InnerObject.InputCollection[0];
            IDTSVirtualInput90 vInput = input.GetVirtualInput();
            foreach (IDTSVirtualInputColumn90 vColumn
                in vInput.VirtualInputColumnCollection)
            {
                bool res = sqlDataDestination.MapManager.IsSuppressedSourceColumn(
                    vColumn.Name, ssisSource.DataSource.GetSchemaTable());
                if (!res)
                {
                    managedOleInstance.SetUsageType(
                        input.ID, vInput, vColumn.LineageID, DTSUsageType.UT_READONLY);
                }
            }

            IDTSExternalMetadataColumn90 exColumn;
            foreach (IDTSInputColumn90 inColumn in destinationDataFlowComponent.
                InnerObject.InputCollection[0].InputColumnCollection)
            {
                exColumn = destinationDataFlowComponent.InnerObject.InputCollection[0].
                    ExternalMetadataColumnCollection[inColumn.Name];
                string destName = sqlDataDestination.MapManager.
                    GetDestinationColumn(exColumn.Name).ColumnName;
                exColumn.Name = destName;
                managedOleInstance.MapInputColumn(
                    destinationDataFlowComponent.InnerObject.InputCollection[0].ID,
                    inColumn.ID, exColumn.ID);
            }

            managedOleInstance.ReleaseConnections();
            sqlDataDestination.DeleteDataStore();

            #region Logging
            Logger.WriteInformation(
                "Creating the destination columns and their mappings.....completed");
            #endregion
        }

        #endregion

        private string GetSsisConnectionString()
        {
            // The connection string must be rewritten into an SSIS-compatible
            // (OLE DB) form, for example:
            // "Data Source=VSTS;Initial Catalog=TEST;Provider=SQLNCLI;
            //  Integrated Security=SSPI;Auto Translate=false;"
            string connectionString = sqlDataDestination.ConnectionString;
            Dictionary<string, string> connectionProperties =
                new Dictionary<string, string>();
            foreach (string part in connectionString.Split(";".ToCharArray()))
            {
                string[] keyValue = part.Split("=".ToCharArray(),
                    StringSplitOptions.RemoveEmptyEntries);
                if (keyValue != null && keyValue.Length == 2)
                {
                    string propertyName = keyValue[0].Trim();
                    string valueName = keyValue[1].Trim();
                    connectionProperties.Add(propertyName, valueName);
                }
            }
            connectionProperties["Provider"] = "SQLNCLI";
            connectionProperties["Integrated Security"] = "SSPI";
            connectionProperties["Auto Translate"] = "false";

            StringBuilder ssisCompatibleConnectionString = new StringBuilder();
            for (Dictionary<string, string>.Enumerator iterator =
                connectionProperties.GetEnumerator(); iterator.MoveNext(); )
            {
                if (ssisCompatibleConnectionString.Length > 0)
                {
                    ssisCompatibleConnectionString.Append(";");
                }
                ssisCompatibleConnectionString.Append(string.Format("{0}={1}",
                    iterator.Current.Key, iterator.Current.Value));
            }
            return ssisCompatibleConnectionString.ToString();
        }
    }
}
Along with these core classes for the import business, I have also written a log provider for the SSIS package, with which I keep a log in the GUI while the package is executing. Creating a log provider means you have to extend the LogProviderBase class provided by Microsoft. Here is my log provider class (a small usage sketch follows it).
namespace ImportLib.IntegrationService.Logging
{
    [DtsLogProvider(DisplayName = "LogProvider",
        Description = "Log provider for DTS packages.",
        LogProviderType = "Custom")]
    public class EventLogProvider : LogProviderBase
    {
        public override string ConfigString
        {
            get { return string.Empty; }
            set { }
        }

        public override void OpenLog()
        {
            base.OpenLog();
        }

        public override void CloseLog()
        {
            base.CloseLog();
        }

        public override void InitializeLogProvider(Connections connections,
            IDTSInfoEvents events, ObjectReferenceTracker refTracker)
        {
            base.InitializeLogProvider(connections, events, refTracker);
        }

        public override void Log(string logEntryName, string computerName,
            string operatorName, string sourceName, string sourceID,
            string executionID, string messageText, DateTime startTime,
            DateTime endTime, int dataCode, byte[] dataBytes)
        {
            LogEventArgs e =
                new LogEventArgs(logEntryName, computerName, messageText);
            OnLogCreated(e);
        }

        protected virtual void OnLogCreated(LogEventArgs e)
        {
            LogCreatedDelegate mLogCreated = this.LogCreated;
            if (mLogCreated != null)
            {
                mLogCreated(this, e);
            }
        }

        public event LogCreatedDelegate LogCreated;
    }

    public delegate void LogCreatedDelegate(object sender, LogEventArgs e);
}
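To get these log entries into the GUI, the demo only needs to subscribe to the LogCreated event. The snippet below is a hedged sketch: how the provider instance is wired into the package (the GetPackageLogProvider helper), the listBoxLog control, and the MessageText property name on LogEventArgs are all my assumptions, not the demo's exact code.
// Hedged sketch: push package log entries to the UI as they arrive.
// GetPackageLogProvider, listBoxLog and e.MessageText are hypothetical.
EventLogProvider logProvider = GetPackageLogProvider();
logProvider.LogCreated += delegate(object sender, LogEventArgs e)
{
    // If the package executes on a worker thread, marshal to the UI thread
    // before touching the control.
    listBoxLog.Items.Add(e.MessageText);
};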
Well, last but not least is the mapping. During the import it is possible to map one column's data onto a differently named destination column, so you can provide a mapping between the source and destination columns, and you can even ignore some source columns entirely. Although I have not created a GUI for defining the mappings (I create the mapping from code), it is simply a matter of time to build an interactive GUI where the user can specify them. Here is the class that encapsulates the mapping information:
namespace ImportLib.Mappings
{
    [Serializable()]
    public class ColumnMappingController
    {
        public ColumnMappingController()
        {
            mappings = new List<Map>();
        }

        private List<Map> mappings;
        public List<Map> Mappings
        {
            get { return mappings; }
            set { mappings = value; }
        }

        private Column[] destinationColumns;
        public Column[] DestinationColumns
        {
            get { return destinationColumns; }
            set { destinationColumns = value; }
        }

        public Column[] UnmappedDestinationColumns
        {
            get
            {
                List<Column> unmappedColumns = new List<Column>();
                foreach (Column destinationColumn in destinationColumns)
                {
                    if (!ContainsInDestinationMap(destinationColumn))
                    {
                        unmappedColumns.Add(destinationColumn);
                    }
                }
                return unmappedColumns.ToArray();
            }
        }

        public Column[] SuppressedSourceColumns(DataTable srcSchemaTable)
        {
            List<Column> suppressedColumns = new List<Column>();
            foreach (DataRow row in srcSchemaTable.Rows)
            {
                string columnName = row["ColumnName"] as string;
                if (!ContainsInSourceMap(columnName))
                {
                    suppressedColumns.Add(new Column(columnName));
                }
            }
            return suppressedColumns.ToArray();
        }

        public bool IsSuppressedSourceColumn(
            string sourceColumnName, DataTable srcSchemaTable)
        {
            return Array.IndexOf<Column>(
                SuppressedSourceColumns(srcSchemaTable),
                new Column(sourceColumnName)) > -1;
        }

        public Column GetDestinationColumn(string sourceColumnName)
        {
            foreach (Map map in mappings)
            {
                if (map.SourceColumn.ColumnName.Equals(sourceColumnName))
                    return map.DestinationColumn;
            }
            throw new ApplicationException(
                "No mapping defined for the source column " + sourceColumnName);
        }

        private bool ContainsInSourceMap(string sourceColumnName)
        {
            foreach (Map map in mappings)
            {
                if (map.SourceColumn.ColumnName.Equals(sourceColumnName))
                    return true;
            }
            return false;
        }

        private bool ContainsInDestinationMap(Column destinationColumn)
        {
            foreach (Map map in mappings)
            {
                if (map.DestinationColumn.Equals(destinationColumn))
                    return true;
            }
            return false;
        }

        private void Validate()
        {
            // NOTE: the guard for the first check was lost in the original
            // listing; a null check on the mappings list is assumed here.
            if (mappings == null)
                throw new NullReferenceException(
                    "SourceColumns is not set to an instance of an object");
            if (destinationColumns == null)
                throw new NullReferenceException(
                    "DestinationColumns is not set to an instance of an object");
        }
    }
}
You will see that when I create an import job, I give an instance of this class to the import manager. So you can either create this instance from code (like I did) or let your users build it from a GUI. A small illustrative sketch of defining a mapping from code follows.
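The sketch below shows roughly what that looks like. The Column(string) constructor and the SourceColumn/DestinationColumn members are visible in the code above, but the parameterless Map constructor, its settable properties, and the column names used here are my assumptions for illustration.
// Illustrative sketch only: the column names, the Map() constructor and the
// settable SourceColumn/DestinationColumn properties are assumptions.
ColumnMappingController mapManager = new ColumnMappingController();

Map nameMap = new Map();
nameMap.SourceColumn = new Column("FirstName");          // CSV header column
nameMap.DestinationColumn = new Column("first_name");    // destination table column
mapManager.Mappings.Add(nameMap);

mapManager.DestinationColumns = new Column[]
{
    new Column("first_name"),
    new Column("last_name")
};

// Any CSV column without a Map entry is reported by SuppressedSourceColumns
// and ignored during the import.
Presumably the same instance is then assigned to the MapManager property of both the source and destination storage objects before the import job starts.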
I hope you will enjoy this!
Points of Interest
I have not written any code that performs transformations during the import, for example if you need to do some work on the data before it lands in the destination data store. I hope to write that part soon, and then I will try to publish an article on that topic.