Developing a Custom Source Component
SQL Server Integration Services gives developers the ability to write source components that can connect to custom data sources and supply data from those sources to other components in a data flow task. The ability to create custom sources is valuable when you must connect to data sources that cannot be accessed by using one of the existing Integration Services sources.
Source components have one or more outputs and zero inputs. At design time, source components are used to create and configure connections, read column metadata from the external data source, and configure the source's output columns based on the external data source. During execution they connect to the external data source and add rows to an output buffer. The data flow task then provides this buffer of data rows to downstream components.
Implementing the design-time functionality of a source component involves specifying a connection to an external data source, adding and configuring output columns that reflect the data source, and validating that the component is ready to execute. By definition, a source component has zero inputs and one or more asynchronous outputs.
Creating the Component
Source components connect to external data sources by using ConnectionManager objects defined in a package. They indicate their requirement for a connection manager by adding an element to the RuntimeConnectionCollection collection of the ComponentMetaData property. This collection serves two purposes—to hold references to connection managers in the package used by the component, and to advertise the need for a connection manager to the designer. When an IDTSRuntimeConnection100 has been added to the collection, the Advanced Editor displays the Connection Properties tab, which lets users select or create a connection in the package.
Imports System.Data Imports System.Data.SqlClient Imports Microsoft.SqlServer.Dts.Runtime Imports Microsoft.SqlServer.Dts.Runtime.Wrapper Imports Microsoft.SqlServer.Dts.Pipeline Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper <DtsPipelineComponent(DisplayName:="MySourceComponent", ComponentType:=ComponentType.SourceAdapter)> _ Public Class MySourceComponent Inherits PipelineComponent Public Overrides Sub ProvideComponentProperties() ' Allow for resetting the component. RemoveAllInputsOutputsAndCustomProperties() ComponentMetaData.RuntimeConnectionCollection.RemoveAll() Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New() output.Name = "Output" Dim connection As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New() connection.Name = "ADO.NET" End Sub End Class
Connecting to an External Data Source
After a connection has been added to the RuntimeConnectionCollection, you override the AcquireConnections method to establish a connection to the external data source. This method is called during both design and execution time. The component should establish a connection to the connection manager specified by the run-time connection, and subsequently, to the external data source.
After the connection is established, it should be cached internally by the component and released when the ReleaseConnections method is called. The ReleaseConnections method is called at design and execution time, like the AcquireConnections method. Developers override this method, and release the connection established by the component during AcquireConnections.
Private sqlConnection As SqlConnection Public Overrides Sub AcquireConnections(ByVal transaction As Object) If Not IsNothing(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager) Then Dim cm As ConnectionManager = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager) Dim cmado As ConnectionManagerAdoNet = CType(cm.InnerObject, ConnectionManagerAdoNet) If IsNothing(cmado) Then Throw New Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.") End If sqlConnection = CType(cmado.AcquireConnection(transaction), SqlConnection) sqlConnection.Open() End If End Sub Public Overrides Sub ReleaseConnections() If Not IsNothing(sqlConnection) And sqlConnection.State <> ConnectionState.Closed Then sqlConnection.Close() End If End Sub
Creating and Configuring Output Columns
The output columns of a source component reflect the columns from the external data source that the component adds to the data flow during execution. At design time, you add output columns after the component has been configured to connect to an external data source. The design-time method that a component uses to add the columns to its output collection can vary based on the needs of the component, although they should not be added during Validate or AcquireConnections. For example, a component that contains a SQL statement in a custom property that controls the data set for the component may add its output columns during the SetComponentProperty method. The component checks to see whether it has a cached connection, and, if it does, connects to the data source and generates its output columns.
After an output column has been created, set its data type properties by calling the SetDataTypeProperties method. This method is necessary because the DataType, Length, Precision, and CodePage properties are read-only and each property is dependent on the settings of the other. This method enforces the need for these values to be set consistently, and the data flow task validates that they are set correctly.
The DataType of the column determines the values that are set for the other properties. The following table shows the requirements on the dependent properties for each DataType. The data types not listed have their dependent properties set to zero.
Greater than 0 and less than or equal to 28.
Greater than 0 and less than or equal to 28, and less than Precision.
Greater than or equal to 1 and less than or equal to 38.
Greater than 0.
Greater than 0 and less than 8000.
Not 0, and a valid code page.
Greater than 0 and less than 4000.
Because the restrictions on the data type properties are based on the data type of the output column, you must choose the correct SSIS data type when you work with managed types. The base class provides three helper methods, ConvertBufferDataTypeToFitManaged, BufferTypeToDataRecordType, and DataRecordTypeToBufferType, to assist managed component developers in selecting an SSIS data type given a managed type. These methods convert managed data types to SSIS data types, and vice versa.
The following code example shows how the output column collection of a component is populated based on the schema of a table. The helper methods of the base class are used to set the data type of the column, and the dependent properties are set based on the data type.
Private sqlCommand As SqlCommand Private Sub CreateColumnsFromDataTable() ' Get the output. Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0) ' Start clean, and remove the columns from both collections. output.OutputColumnCollection.RemoveAll() output.ExternalMetadataColumnCollection.RemoveAll() Me.sqlCommand = sqlConnection.CreateCommand() Me.sqlCommand.CommandType = CommandType.Text Me.sqlCommand.CommandText = CStr(ComponentMetaData.CustomPropertyCollection("SqlStatement").Value) Dim schemaReader As SqlDataReader = Me.sqlCommand.ExecuteReader(CommandBehavior.SchemaOnly) Dim dataTable As DataTable = schemaReader.GetSchemaTable() ' Walk the columns in the schema, ' and for each data column create an output column and an external metadata column. For Each row As DataRow In dataTable.Rows Dim outColumn As IDTSOutputColumn100 = output.OutputColumnCollection.New() Dim exColumn As IDTSExternalMetadataColumn100 = output.ExternalMetadataColumnCollection.New() ' Set column data type properties. Dim isLong As Boolean = False Dim dt As DataType = DataRecordTypeToBufferType(CType(row("DataType"), Type)) dt = ConvertBufferDataTypeToFitManaged(dt, isLong) Dim length As Integer = 0 Dim precision As Integer = CType(row("NumericPrecision"), Short) Dim scale As Integer = CType(row("NumericScale"), Short) Dim codepage As Integer = dataTable.Locale.TextInfo.ANSICodePage Select Case dt ' The length cannot be zero, and the code page property must contain a valid code page. Case DataType.DT_STR Case DataType.DT_TEXT length = precision precision = 0 scale = 0 Case DataType.DT_WSTR length = precision codepage = 0 scale = 0 precision = 0 Case DataType.DT_BYTES precision = 0 scale = 0 codepage = 0 Case DataType.DT_NUMERIC length = 0 codepage = 0 If precision > 38 Then precision = 38 End If If scale > 6 Then scale = 6 End If Case DataType.DT_DECIMAL length = 0 precision = 0 codepage = 0 Case Else length = 0 precision = 0 codepage = 0 scale = 0 End Select ' Set the properties of the output column. outColumn.Name = CStr(row("ColumnName")) outColumn.SetDataTypeProperties(dt, length, precision, scale, codepage) Next End Sub
Validating the Component
You should validate a source component and verify that the columns defined in its output column collections match the columns at the external data source. Sometimes, verifying the output columns against the external data source can be impossible, such as in a disconnected state, or when it is preferable to avoid lengthy round trips to the server. In these situations, the columns in the output can still be validated by using the ExternalMetadataColumnCollection of the output object. For more information, see Validating a Data Flow Component.
This collection exists on both input and output objects and you can populate it with the columns from the external data source. You can use this collection to validate the output columns when SSIS Designer is offline, when the component is disconnected, or when the ValidateExternalMetadata property is false. The collection should be first populated at the same time that the output columns are created. Adding external metadata columns to the collection is relatively easy because the external metadata column should initially match the output column. The data type properties of the column should have already been set correctly, and the properties can be copied directly to the IDTSExternalMetadataColumn100 object.
The following sample code adds an external metadata column that is based on a newly created output column. It assumes that the output column has already been created.
Private Sub CreateExternalMetaDataColumn(ByVal output As IDTSOutput100, ByVal outputColumn As IDTSOutputColumn100) ' Set the properties of the external metadata column. Dim externalColumn As IDTSExternalMetadataColumn100 = output.ExternalMetadataColumnCollection.New() externalColumn.Name = outputColumn.Name externalColumn.Precision = outputColumn.Precision externalColumn.Length = outputColumn.Length externalColumn.DataType = outputColumn.DataType externalColumn.Scale = outputColumn.Scale ' Map the external column to the output column. outputColumn.ExternalMetadataColumnID = externalColumn.ID End Sub
During execution, components add rows to output buffers that are created by the data flow task and provided to the component in PrimeOutput. Called once for source components, the method receives an output buffer for each IDTSOutput100 of the component that is connected to a downstream component.
Locating Columns in the Buffer
The output buffer for a component contains the columns defined by the component and any columns added to the output of a downstream component. For example, if a source component provides three columns in its output, and the next component adds a fourth output column, the output buffer provided for use by the source component contains these four columns.
The order of the columns in a buffer row is not defined by the index of the output column in the output column collection. An output column can only be accurately located in a buffer row by using the FindColumnByLineageID method of the BufferManager. This method locates the column with the specified lineage ID in the specified buffer, and returns its location in the row. The indexes of the output columns are typically located in the PreExecute method, and stored for use during PrimeOutput.
The following code example finds the location of the output columns in the output buffer during a call to PreExecute, and stores them in an internal structure. The name of the column is also stored in the structure and is used in the code example for the PrimeOutput method in the next section of this topic.
Public Overrides Sub PreExecute() Me.columnInformation = New ArrayList() Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0) For Each col As IDTSOutputColumn100 In output.OutputColumnCollection Dim ci As ColumnInfo = New ColumnInfo() ci.BufferColumnIndex = BufferManager.FindColumnByLineageID(output.Buffer, col.LineageID) ci.ColumnName = col.Name columnInformation.Add(ci) Next End Sub
Rows are added to the output buffer by calling the AddRow method, which creates a new buffer row with empty values in its columns. The component then assigns values to the individual columns. The output buffers provided to a component are created and monitored by the data flow task. As they become full, the rows in the buffer are moved to the next component. There is no way to determine when a batch of rows has been sent to the next component because the movement of rows by the data flow task is transparent to the component developer, and the RowCount property is always zero on output buffers. When a source component is finished adding rows to its output buffer, it notifies the data flow task by calling the SetEndOfRowset method of the PipelineBuffer, and the remaining rows in the buffer are passed to the next component.
While the source component reads rows from the external data source, you may want to update the "Rows read" or "BLOB bytes read" performance counters by calling the IncrementPipelinePerfCounter method. For more information, see Monitoring the Performance of the Data Flow Engine.
The following code example shows a component that adds rows to an output buffer in PrimeOutput. The indexes of the output columns in the buffer were located using PreExecute in the previous code example.
Public Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer()) Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0) Dim buffer As PipelineBuffer = buffers(0) Dim dataReader As SqlDataReader = sqlCommand.ExecuteReader() ' Loop over the rows in the DataReader, ' and add them to the output buffer. While (dataReader.Read()) ' Add a row to the output buffer. buffer.AddRow() For x As Integer = 0 To columnInformation.Count Dim ci As ColumnInfo = CType(columnInformation(x), ColumnInfo) Dim ordinal As Integer = dataReader.GetOrdinal(ci.ColumnName) If (dataReader.IsDBNull(ordinal)) Then buffer.SetNull(ci.BufferColumnIndex) Else buffer(ci.BufferColumnIndex) = dataReader(ci.ColumnName) End If Next End While buffer.SetEndOfRowset() End Sub
The following sample shows a simple source component that uses a File connection manager to load the binary contents of files into the data flow. This sample does not demonstrate all the methods and functionality discussed in this topic. It demonstrates the important methods that every custom source component must override, but does not contain code for design-time validation. For a more complete sample source component, see the Readme_ADO Source Component Sample.
Imports System Imports System.IO Imports Microsoft.SqlServer.Dts.Pipeline Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper Imports Microsoft.SqlServer.Dts.Runtime.Wrapper Namespace BlobSrc <DtsPipelineComponent(DisplayName="BLOB Inserter Source", Description="Inserts files into the data flow as BLOBs")> _ Public Class BlobSrc Inherits PipelineComponent Private m_ConnMgr As IDTSConnectionManager100 Private m_FileNameColumnIndex As Integer = -1 Private m_FileBlobColumnIndex As Integer = -1 Public Overrides Sub ProvideComponentProperties() Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New output.Name = "BLOB File Inserter Output" Dim column As IDTSOutputColumn100 = output.OutputColumnCollection.New column.Name = "FileName" column.SetDataTypeProperties(DataType.DT_WSTR, 256, 0, 0, 0) column = output.OutputColumnCollection.New column.Name = "FileBLOB" column.SetDataTypeProperties(DataType.DT_IMAGE, 0, 0, 0, 0) Dim conn As IDTSRuntimeConnection90 = ComponentMetaData.RuntimeConnectionCollection.New conn.Name = "FileConnection" End Sub Public Overrides Sub AcquireConnections(ByVal transaction As Object) Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection(0) m_ConnMgr = conn.ConnectionManager End Sub Public Overrides Sub ReleaseConnections() m_ConnMgr = Nothing End Sub Public Overrides Sub PreExecute() Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0) m_FileNameColumnIndex = CType(BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection(0).LineageID), Integer) m_FileBlobColumnIndex = CType(BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection(1).LineageID), Integer) End Sub Public Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer()) Dim strFileName As String = CType(m_ConnMgr.AcquireConnection(Nothing), String) While Not (strFileName Is Nothing) buffers(0).AddRow buffers(0).SetString(m_FileNameColumnIndex, strFileName) Dim fileInfo As FileInfo = New FileInfo(strFileName) Dim fileData(fileInfo.Length) As Byte Dim fs As FileStream = New FileStream(strFileName, FileMode.Open, FileAccess.Read, FileShare.Read) fs.Read(fileData, 0, fileData.Length) buffers(0).AddBlobData(m_FileBlobColumnIndex, fileData) strFileName = CType(m_ConnMgr.AcquireConnection(Nothing), String) End While buffers(0).SetEndOfRowset End Sub End Class End Namespace