Export (0) Print
Expand All

Developing a Custom Source Component

Updated: 14 April 2006

SQL Server 2005 Integration Services (SSIS) gives developers the ability to write source components that can connect to custom data sources and supply data from those sources to other components in a data flow task. The ability to create custom sources is valuable when you must connect to data sources that cannot be accessed by using one of the existing Integration Services sources.

Source components have one or more outputs and zero inputs. At design time, source components are used to create and configure connections, read column metadata from the external data source, and configure the source's output columns based on the external data source. During execution they connect to the external data source and add rows to an output buffer. The data flow task then provides this buffer of data rows to downstream components.

For a sample source component, see the ADO Source Component Sample. For a general overview of data flow component development, see Developing a Custom Data Flow Component.

Implementing the design-time functionality of a source component involves specifying a connection to an external data source, adding and configuring output columns that reflect the data source, and validating that the component is ready to execute. By definition, a source component has zero inputs and one or more asynchronous outputs.

Creating the Component

Source components connect to external data sources by using ConnectionManager objects defined in a package. They indicate their requirement for a connection manager by adding an element to the RuntimeConnectionCollection collection of the ComponentMetaData property. This collection serves two purposes—to hold references to connection managers in the package used by the component, and to advertise the need for a connection manager to the designer. When an IDTSRuntimeConnection90 has been added to the collection, the Advanced Editor displays the Connection Properties tab, which lets users select or create a connection in the package.

The following code example shows an implementation of ProvideComponentProperties that adds an output, and adds a IDTSRuntimeConnection90 object to the RuntimeConnectionCollection.

Imports System.Data
Imports System.Data.SqlClient
Imports Microsoft.SqlServer.Dts.Runtime
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper

<DtsPipelineComponent(DisplayName:="MySourceComponent", ComponentType:=ComponentType.SourceAdapter)> _
Public Class MySourceComponent
    Inherits PipelineComponent

    Public Overrides Sub ProvideComponentProperties()

        ' Allow for resetting the component.
        RemoveAllInputsOutputsAndCustomProperties()
        ComponentMetaData.RuntimeConnectionCollection.RemoveAll()

        Dim output As IDTSOutput90 = ComponentMetaData.OutputCollection.New()
        output.Name = "Output"

        Dim connection As IDTSRuntimeConnection90 = ComponentMetaData.RuntimeConnectionCollection.New()
        connection.Name = "ADO.NET"

    End Sub
End Class

Connecting to an External Data Source

After a connection has been added to the RuntimeConnectionCollection, you override the AcquireConnections method to establish a connection to the external data source. This method is called during both design and execution time. The component should establish a connection to the connection manager specified by the run-time connection, and subsequently, to the external data source.

After the connection is established, it should be cached internally by the component and released when the ReleaseConnections method is called. The ReleaseConnections method is called at design and execution time, like the AcquireConnections method. Developers override this method, and release the connection established by the component during AcquireConnections.

The following code example shows a component that connects to an ADO.NET connection in the AcquireConnections method and closes the connection in the ReleaseConnections method.

Private sqlConnection As SqlConnection

Public Overrides Sub AcquireConnections(ByVal transaction As Object)

    If Not IsNothing(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager) Then

        Dim cm As ConnectionManager = Microsoft.SqlServer.Dts.Runtime.DtsConvert.ToConnectionManager(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager)
        Dim cmado As ConnectionManagerAdoNet = CType(cm.InnerObject, ConnectionManagerAdoNet)

        If IsNothing(cmado) Then
            Throw New Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.")
        End If

        sqlConnection = CType(cmado.AcquireConnection(transaction), SqlConnection)
        sqlConnection.Open()

    End If
End Sub

Public Overrides Sub ReleaseConnections()

    If Not IsNothing(sqlConnection) And sqlConnection.State <> ConnectionState.Closed Then
        sqlConnection.Close()
    End If

End Sub

Creating and Configuring Output Columns

The output columns of a source component reflect the columns from the external data source that the component adds to the data flow during execution. At design time, you add output columns after the component has been configured to connect to an external data source. The design-time method that a component uses to add the columns to its output collection can vary based on the needs of the component, although they should not be added during Validate or AcquireConnections. For example, a component that contains a SQL statement in a custom property that controls the data set for the component may add its output columns during the SetComponentProperty method. The component checks to see whether it has a cached connection, and, if it does, connects to the data source and generates its output columns.

After an output column has been created, set its data type properties by calling the SetDataTypeProperties method. This method is necessary because the DataType, Length, Precision, and CodePage properties are read-only and each property is dependent on the settings of the other. This method enforces the need for these values to be set consistently, and the data flow task validates that they are set correctly.

The DataType of the column determines the values that are set for the other properties. The following table shows the requirements on the dependent properties for each DataType. The data types not listed have their dependent properties set to zero.

DataType Length Scale Precision CodePage

DT_DECIMAL

0

Greater than 0 and less than or equal to 28.

0

0

DT_CY

0

0

0

0

DT_NUMERIC

0

Greater than 0 and less than or equal to 28, and less than Precision.

Greater than or equal to 1 and less than or equal to 38.

0

DT_BYTES

Greater than 0.

0

0

0

DT_STR

Greater than 0 and less than 8000.

0

0

Not 0, and a valid code page.

DT_WSTR

Greater than 0 and less than 4000.

0

0

0

Because the restrictions on the data type properties are based on the data type of the output column, you must choose the correct SSIS data type when you work with managed types. The base class provides three helper methods, ConvertBufferDataTypeToFitManaged, BufferTypeToDataRecordType, and DataRecordTypeToBufferType, to assist managed component developers in selecting an SSIS data type given a managed type. These methods convert managed data types to SSIS data types, and vice versa.

The following code example shows how the output column collection of a component is populated based on the schema of a table. The helper methods of the base class are used to set the data type of the column, and the dependent properties are set based on the data type.

Private sqlCommand As SqlCommand

Private Sub CreateColumnsFromDataTable()

    ' Get the output.
    Dim output As IDTSOutput90 = ComponentMetaData.OutputCollection(0)

    ' Start clean, and remove the columns from both collections.
    output.OutputColumnCollection.RemoveAll()
    output.ExternalMetadataColumnCollection.RemoveAll()

    Me.sqlCommand = sqlConnection.CreateCommand()
    Me.sqlCommand.CommandType = CommandType.Text
    Me.sqlCommand.CommandText = CStr(ComponentMetaData.CustomPropertyCollection("SqlStatement").Value)

    Dim schemaReader As SqlDataReader = Me.sqlCommand.ExecuteReader(CommandBehavior.SchemaOnly)
    Dim dataTable As DataTable = schemaReader.GetSchemaTable()

    ' Walk the columns in the schema, 
    ' and for each data column create an output column and an external metadata column.
    For Each row As DataRow In dataTable.Rows

        Dim outColumn As IDTSOutputColumn90 = output.OutputColumnCollection.New()
        Dim exColumn As IDTSExternalMetadataColumn90 = output.ExternalMetadataColumnCollection.New()

        ' Set column data type properties.
        Dim isLong As Boolean = False
        Dim dt As DataType = DataRecordTypeToBufferType(CType(row("DataType"), Type))
        dt = ConvertBufferDataTypeToFitManaged(dt, isLong)
        Dim length As Integer = 0
        Dim precision As Integer = CType(row("NumericPrecision"), Short)
        Dim scale As Integer = CType(row("NumericScale"), Short)
        Dim codepage As Integer = dataTable.Locale.TextInfo.ANSICodePage

        Select Case dt

            ' The length cannot be zero, and the code page property must contain a valid code page.
            Case DataType.DT_STR
            Case DataType.DT_TEXT
                length = precision
                precision = 0
                scale = 0

            Case DataType.DT_WSTR
                length = precision
                codepage = 0
                scale = 0
                precision = 0

            Case DataType.DT_BYTES
                precision = 0
                scale = 0
                codepage = 0

            Case DataType.DT_NUMERIC
                length = 0
                codepage = 0

                If precision > 38 Then
                    precision = 38
                End If

                If scale > 6 Then
                    scale = 6
                End If

            Case DataType.DT_DECIMAL
                length = 0
                precision = 0
                codepage = 0

            Case Else
                length = 0
                precision = 0
                codepage = 0
                scale = 0
        End Select

        ' Set the properties of the output column.
        outColumn.Name = CStr(row("ColumnName"))
        outColumn.SetDataTypeProperties(dt, length, precision, scale, codepage)
    Next
End Sub

Validating the Component

You should validate a source component and verify that the columns defined in its output column collections match the columns at the external data source. Sometimes, verifying the output columns against the external data source can be impossible, such as in a disconnected state, or when it is preferable to avoid lengthy round trips to the server. In these situations, the columns in the output can still be validated by using the ExternalMetadataColumnCollection of the output object. For more information, see Validating a Data Flow Component.

This collection exists on both input and output objects and you can populate it with the columns from the external data source. You can use this collection to validate the output columns when SSIS Designer is offline, when the component is disconnected, or when the ValidateExternalMetadata property is false. The collection should be first populated at the same time that the output columns are created. Adding external metadata columns to the collection is relatively easy because the external metadata column should initially match the output column. The data type properties of the column should have already been set correctly, and the properties can be copied directly to the IDTSExternalMetadataColumn90 object.

The following sample code adds an external metadata column that is based on a newly created output column. It assumes that the output column has already been created.

Private Sub CreateExternalMetaDataColumn(ByVal output As IDTSOutput90, ByVal outputColumn As IDTSOutputColumn90)


        ' Set the properties of the external metadata column.
        Dim externalColumn As IDTSExternalMetadataColumn90 = output.ExternalMetadataColumnCollection.New()
        externalColumn.Name = outputColumn.Name
        externalColumn.Precision = outputColumn.Precision
        externalColumn.Length = outputColumn.Length
        externalColumn.DataType = outputColumn.DataType
        externalColumn.Scale = outputColumn.Scale

        ' Map the external column to the output column.
        outputColumn.ExternalMetadataColumnID = externalColumn.ID

    End Sub

During execution, components add rows to output buffers that are created by the data flow task and provided to the component in PrimeOutput. Called once for source components, the method receives an output buffer for each IDTSOutput90 of the component that is connected to a downstream component.

Locating Columns in the Buffer

The output buffer for a component contains the columns defined by the component and any columns added to the output of a downstream component. For example, if a source component provides three columns in its output, and the next component adds a fourth output column, the output buffer provided for use by the source component contains these four columns.

The order of the columns in a buffer row is not defined by the index of the output column in the output column collection. An output column can only be accurately located in a buffer row by using the FindColumnByLineageID method of the BufferManager. This method locates the column with the specified lineage ID in the specified buffer, and returns its location in the row. The indexes of the output columns are typically located in the PreExecute method, and stored for use during PrimeOutput.

The following code example finds the location of the output columns in the output buffer during a call to PreExecute, and stores them in an internal structure. The name of the column is also stored in the structure and is used in the code example for the PrimeOutput method in the next section of this topic.

Public Overrides Sub PreExecute()

    Me.columnInformation = New ArrayList()
    Dim output As IDTSOutput90 = ComponentMetaData.OutputCollection(0)

    For Each col As IDTSOutputColumn90 In output.OutputColumnCollection

        Dim ci As ColumnInfo = New ColumnInfo()
        ci.BufferColumnIndex = BufferManager.FindColumnByLineageID(output.Buffer, col.LineageID)
        ci.ColumnName = col.Name
        columnInformation.Add(ci)
    Next
End Sub

Processing Rows

Rows are added to the output buffer by calling the AddRow method, which creates a new buffer row with empty values in its columns. The component then assigns values to the individual columns. The output buffers provided to a component are created and monitored by the data flow task. As they become full, the rows in the buffer are moved to the next component. There is no way to determine when a batch of rows has been sent to the next component because the movement of rows by the data flow task is transparent to the component developer, and the RowCount property is always zero on output buffers. When a source component is finished adding rows to its output buffer, it notifies the data flow task by calling the SetEndOfRowset method of the PipelineBuffer, and the remaining rows in the buffer are passed to the next component.

While the source component reads rows from the external data source, you may want to update the "Rows read" or "BLOB bytes read" performance counters by calling the IncrementPipelinePerfCounter method. For more information, see Monitoring Performance of the Data Flow Engine.

The following code example shows a component that adds rows to an output buffer in PrimeOutput. The indexes of the output columns in the buffer were located using PreExecute in the previous code example.

Public Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer())

    Dim output As IDTSOutput90 = ComponentMetaData.OutputCollection(0)
    Dim buffer As PipelineBuffer = buffers(0)

    Dim dataReader As SqlDataReader = sqlCommand.ExecuteReader()

    ' Loop over the rows in the DataReader, 
    ' and add them to the output buffer.
    While (dataReader.Read())

        ' Add a row to the output buffer.
        buffer.AddRow()

        For x As Integer = 0 To columnInformation.Count

            Dim ci As ColumnInfo = CType(columnInformation(x), ColumnInfo)

            Dim ordinal As Integer = dataReader.GetOrdinal(ci.ColumnName)

            If (dataReader.IsDBNull(ordinal)) Then
                buffer.SetNull(ci.BufferColumnIndex)
            Else
                buffer(ci.BufferColumnIndex) = dataReader(ci.ColumnName)

            End If
        Next

    End While

    buffer.SetEndOfRowset()
End Sub

The following sample shows a simple source component that uses a File connection manager to load the binary contents of files into the data flow. This sample does not demonstrate all the methods and functionality discussed in this topic. It demonstrates the important methods that every custom source component must override, but does not contain code for design-time validation. For a more complete sample source component, see the ADO Source Component Sample.

Imports System 
Imports System.IO 
Imports Microsoft.SqlServer.Dts.Pipeline 
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper 
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper 
Namespace BlobSrc 

 <DtsPipelineComponent(DisplayName="BLOB Inserter Source", Description="Inserts files into the data flow as BLOBs")> _ 
 Public Class BlobSrc 
 Inherits PipelineComponent 
   Private m_ConnMgr As IDTSConnectionManager90 
   Private m_FileNameColumnIndex As Integer = -1 
   Private m_FileBlobColumnIndex As Integer = -1 

   Public  Overrides Sub ProvideComponentProperties() 
     Dim output As IDTSOutput90 = ComponentMetaData.OutputCollection.New 
     output.Name = "BLOB File Inserter Output" 
     Dim column As IDTSOutputColumn90 = output.OutputColumnCollection.New 
     column.Name = "FileName" 
     column.SetDataTypeProperties(DataType.DT_WSTR, 256, 0, 0, 0) 
     column = output.OutputColumnCollection.New 
     column.Name = "FileBLOB" 
     column.SetDataTypeProperties(DataType.DT_IMAGE, 0, 0, 0, 0) 
     Dim conn As IDTSRuntimeConnection90 = ComponentMetaData.RuntimeConnectionCollection.New 
     conn.Name = "FileConnection" 
   End Sub 

   Public  Overrides Sub AcquireConnections(ByVal transaction As Object) 
     Dim conn As IDTSRuntimeConnection90 = ComponentMetaData.RuntimeConnectionCollection(0) 
     m_ConnMgr = conn.ConnectionManager 
   End Sub 

   Public  Overrides Sub ReleaseConnections() 
     m_ConnMgr = Nothing 
   End Sub 

   Public  Overrides Sub PreExecute() 
     Dim output As IDTSOutput90 = ComponentMetaData.OutputCollection(0) 
     m_FileNameColumnIndex = CType(BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection(0).LineageID), Integer) 
     m_FileBlobColumnIndex = CType(BufferManager.FindColumnByLineageID(output.Buffer, output.OutputColumnCollection(1).LineageID), Integer) 
   End Sub 

   Public  Overrides Sub PrimeOutput(ByVal outputs As Integer, ByVal outputIDs As Integer(), ByVal buffers As PipelineBuffer()) 
     Dim strFileName As String = CType(m_ConnMgr.AcquireConnection(Nothing), String) 
     While Not (strFileName Is Nothing) 
       buffers(0).AddRow 
       buffers(0).SetString(m_FileNameColumnIndex, strFileName) 
       Dim fileInfo As FileInfo = New FileInfo(strFileName) 
       Dim fileData(fileInfo.Length) As Byte 
       Dim fs As FileStream = New FileStream(strFileName, FileMode.Open, FileAccess.Read, FileShare.Read) 
       fs.Read(fileData, 0, fileData.Length) 
       buffers(0).AddBlobData(m_FileBlobColumnIndex, fileData) 
       strFileName = CType(m_ConnMgr.AcquireConnection(Nothing), String) 
     End While 
     buffers(0).SetEndOfRowset 
   End Sub 
 End Class 
End Namespace

Release History

14 April 2006

New content:
  • Noted the availability of performance counters for rows read.

Community Additions

ADD
Show:
© 2014 Microsoft