
SSIS Sharding and Azure Components

Challenges

Any nontrivial dataset that will go in an Azure™ SQL database will need some level of sharding. You can accomplish sharding in Microsoft® SQL Server® Integration Services (SSIS) by using a conditional split and multiple destination adapters. However, this technique suffers from several limitations:

  • Design process is tedious. The design surface for SSIS becomes unwieldy when you use many components. Some sharding schemes may contain hundreds of shards. Designing an SSIS package to hit all of these destinations becomes time consuming and error prone because you must open and set up each component. Each component must also have a connection manager that must be parameterized if you will move it from development to production environments without redesign.

    For a more complete discussion of sharding, including reasons and techniques, see the Sharding with SQL Azure blog entry.

  • Dynamically changing environment requires redesign. Parameters allow for server names and database names to change, but they do not allow for numbers of destinations to change. In sharded data stores, the number of shards can change frequently. For traditional, out-of-box components, changes would require a redesign and deployment of an SSIS package. The SSIS sharding destination component can overcome this problem by reading a sharding catalog at each run and establishing the correct connections based on the contents of the sharding catalog.

  • Table names are not dynamic in out-of-box destinations. To insert data in an Azure SQL database, you can use an ADO.NET, ODBC, or OLE DB destination adapter. However, in all of these components, tables must be set and cannot be parameterized. In some logical sharding scenarios, table names can actually change, or additional tables can be added. The sharding components allow for the table names to also be dynamic in the destination.

  • No SSIS components exist for Azure or compression. When your data moves over a wide area or limited bandwidth connection, you must compress files and chunk large data sets to allow for processing into destinations as data is extracted from sources. The Blob source and destination components enable the chunking of data, compression of files, movement into and out of Azure Blob storage (as part of a two-step movement, or as part of a movement into Blob storage for HDInsight usage), and optional use of queues to allow for packages on each end of the limited network to process the chunked data in a pipeline.

The SSIS database sharding components can help you overcome these challenges.

Download and installation

The components and the Sharding Catalog Setup scripts are available as separate downloads from the Microsoft Download Center. To use the components to full effect, download and install both files as follows.

To download and install the components:

  1. Download the SSIS Database Sharding Components.zip file from the download site.

  2. Open the file and move the contents into a directory on the computer on which you intend to install the SSIS components.

    Note
    SSIS from Microsoft SQL Server 2012 must be installed on the computer on which you plan to install the components.

  3. Run the SSISDatabaseSharding64.msi installer.

  4. Accept the license agreement, and then click Next. All components and dependencies will be installed.

  5. Open SQL Server Data Tools and start a new SSIS project.

  6. Drag a dataflow task onto the design surface and double-click it.

  7. Look at your SSIS toolbox and ensure that the Blob destination, Blob source, and database sharding components appear, as shown in Figure 1.

    Figure 1. Blob destination, Blob source, and database sharding components appear in your dataflow toolbox when the components are properly installed.

To install the sharding catalog:

  1. Download the Sharding Catalog Setup Scripts.zip file from the download site.

  2. In a SQL Server database, or in an Azure SQL database that will be available to the package as it is designed and run, execute the ShardingSchema.sql script and then the ShardingStoredProcedures.sql script. These two scripts contain the table definitions for the sharding catalog, along with the stored procedures that assist you in setting up a valid sharding catalog.

  3. The TestAndExamples.sql script contains an example of how you can use the stored procedures to set up a sharding catalog.

After you install the components and the sharding catalog, you are ready to use the components to connect to, and move data to, a sharded data destination in any OLE DB–compliant data store.

Setting up a sharding catalog

You can use the sharding destination to enter data destinations manually. However, to gain the full benefit of the ability to change the sharding scheme of your destination without redeveloping your packages, you must set up a sharding catalog. This section walks you through the steps of using the stored procedures to set up a sharding catalog.

A fully set-up sharding catalog contains the following elements:

  1. One or more database servers. In the sharding catalog, you must enter each server that contains a database to be used in the sharding scheme. This information is how the servers will be located, and it includes the server name that the sharding component will use to build the connection string.

  2. One or more databases that you must set up. Each database must exist in a server that has been entered in the sharding catalog. The sharding destination component will use the database name to establish required connections.

  3. At least one shard set that you must create. The shard set is how you identify the sharding scheme that you will use to distribute the data. Essentially, a shard set is a sharded data store.

  4. At least one sharding distribution that you must create for each shard set. This distribution defines a sharding scheme as POINT or RANGE, defines what column will be used as the sharding key, and defines what the data type of the column is. It also provides information about the column when data types such as FLOAT or DOUBLE are used.

  5. Members that you must set up for each sharding distribution. Each member defines the table and database to which rows that contain certain values will be sent. Each shard member must relate to a database.

The following is a full example setup.

-- First, add the server.
DECLARE @srvid INT

EXEC [ShardMeta].[AddServerToShardingScheme] 
@ServerName= 'changed.database.windows.net'
,@Descr= 'Destination Server for the Demo'
,@ServerID = @srvid OUTPUT

-- Next, add the shard databases.

DECLARE @shard0ID INT
DECLARE @shard1ID INT
DECLARE @shard2ID INT
DECLARE @shard3ID INT
DECLARE @shard4ID INT
DECLARE @shard5ID INT
DECLARE @shard6ID INT
DECLARE @shard7ID INT
DECLARE @shard8ID INT
DECLARE @shard9ID INT
DECLARE @shard10ID INT

EXEC [ShardMeta].[AddDatabaseToShardingScheme]
@ServerID= @srvid
,@DatabaseName= 'SHARD000'
,@Descr= 'Destination Database for the Demo'
,@DatabaseID= @shard0ID OUTPUT

EXEC [ShardMeta].[AddDatabaseToShardingScheme]
@ServerID= @srvid
,@DatabaseName= 'SHARD001'
,@Descr= 'Destination Database for the Demo'
,@DatabaseID= @shard1ID OUTPUT

EXEC [ShardMeta].[AddDatabaseToShardingScheme]
@ServerID= @srvid
,@DatabaseName= 'SHARD002'
,@Descr= 'Destination Database for the Demo'
,@DatabaseID= @shard2ID OUTPUT

EXEC [ShardMeta].[AddDatabaseToShardingScheme]
@ServerID= @srvid
,@DatabaseName= 'SHARD003'
,@Descr= 'Destination Database for the Demo'
,@DatabaseID= @shard3ID OUTPUT

EXEC [ShardMeta].[AddDatabaseToShardingScheme]
@ServerID= @srvid
,@DatabaseName= 'SHARD004'
,@Descr= 'Destination Database for the Demo'
,@DatabaseID= @shard4ID OUTPUT


EXEC [ShardMeta].[AddDatabaseToShardingScheme]
@ServerID= @srvid
,@DatabaseName= 'SHARD005'
,@Descr= 'Destination Database for the Demo'
,@DatabaseID= @shard5ID OUTPUT

EXEC [ShardMeta].[AddDatabaseToShardingScheme]
@ServerID= @srvid
,@DatabaseName= 'SHARD006'
,@Descr= 'Destination Database for the Demo'
,@DatabaseID= @shard6ID OUTPUT


EXEC [ShardMeta].[AddDatabaseToShardingScheme]
@ServerID= @srvid
,@DatabaseName= 'SHARD007'
,@Descr= 'Destination Database for the Demo'
,@DatabaseID= @shard7ID OUTPUT

EXEC [ShardMeta].[AddDatabaseToShardingScheme]
@ServerID= @srvid
,@DatabaseName= 'SHARD008'
,@Descr= 'Destination Database for the Demo'
,@DatabaseID= @shard8ID OUTPUT

EXEC [ShardMeta].[AddDatabaseToShardingScheme]
@ServerID= @srvid
,@DatabaseName= 'SHARD009'
,@Descr= 'Destination Database for the Demo'
,@DatabaseID= @shard9ID OUTPUT


EXEC [ShardMeta].[AddDatabaseToShardingScheme]
@ServerID= @srvid
,@DatabaseName= 'SHARD010'
,@Descr= 'Destination Database for the Demo'
,@DatabaseID= @shard10ID OUTPUT

-- Next, add the shard set.

DECLARE @ShardSID INT

EXEC [ShardMeta].[AddShardSetToShardingScheme]
@name= 'Point Distro For demo'
,@Descr= 'Point Distribution for Demo'
,@ShardSetID= @ShardSID OUTPUT

-- Add a distribution to the shard set.

-- There are only two types of @Distribution_type:
-- POINT and RANGE
-- POINT means only one value per shard. Example: col1 = 1 or col1 = 2
-- RANGE means a range goes into the shard. Example: -1 < col1 <= 2
-- When @Boundary_value_in_high is set to 1, the upper bound of a
-- range is included in the shard, but the lower bound is not.
-- So a range of -1 to 2 would be: -1 < col1 <= 2
-- Conversely, if it is set to 0, the range from -1 to 2 would be
-- defined as: -1 <= col1 < 2
-- If the sharding column is a system data type, both the
-- @System_Type_Name and @User_type_name parameters contain the
-- system data type name. If the column uses a user-defined type,
-- specify that type name in the @User_Type_Name parameter.


EXEC [ShardMeta].[AddShardDistributionToShardingScheme]
@ShardSetID= @ShardSID
,@Distribution_Name= 'Point Distro For demo'
,@Distribution_Type= 'POINT'
,@System_Type_Name= 'INT'
,@User_type_name= 'INT'
,@Boundary_value_in_high = 1

-- Add in the members for the “Point Distro For demo” distribution.

-- @MemberID must be unique within each @ShardSetID.
-- The order of MemberID is the order in which the conditions will be
-- evaluated for placement into a shard.
-- @Name is the case-sensitive name of the column as it
-- will appear in the SSIS dataflow. If this is the name
-- of the column in the source, nothing needs to change
-- in the SSIS dataflow. However, if the column does not
-- exist in the source, use a derived column to create it
-- in the dataflow before it reaches the sharding destination.
-- For POINT distributions, @Range_Low and @Range_High are the same.


EXEC [ShardMeta].[AddShardSetMemberToShardingScheme]
@ShardSetID= @ShardSID
,@MemberID= 0
,@Distribution_Name= 'Point Distro For demo'
,@Name= 'ShardingKey'
,@DatabaseID= @Shard0ID
,@TableName= 'dbo.TestData'
,@Range_Low= 0
,@Range_High= 0

EXEC [ShardMeta].[AddShardSetMemberToShardingScheme]
@ShardSetID= @ShardSID
,@MemberID= 1
,@Distribution_Name= 'Point Distro For demo'
,@Name= 'ShardingKey'
,@DatabaseID= @Shard1ID
,@TableName= 'dbo.TestData'
,@Range_Low= 1
,@Range_High= 1

EXEC [ShardMeta].[AddShardSetMemberToShardingScheme]
@ShardSetID= @ShardSID
,@MemberID= 2
,@Distribution_Name= 'Point Distro For demo'
,@Name= 'ShardingKey'
,@DatabaseID= @Shard2ID
,@TableName= 'dbo.TestData'
,@Range_Low= 2
,@Range_High= 2

EXEC [ShardMeta].[AddShardSetMemberToShardingScheme]
@ShardSetID= @ShardSID
,@MemberID= 3
,@Distribution_Name= 'Point Distro For demo'
,@Name= 'ShardingKey'
,@DatabaseID= @Shard3ID
,@TableName= 'dbo.TestData'
,@Range_Low= 3
,@Range_High= 3

EXEC [ShardMeta].[AddShardSetMemberToShardingScheme]
@ShardSetID= @ShardSID
,@MemberID= 4
,@Distribution_Name= 'Point Distro For demo'
,@Name= 'ShardingKey'
,@DatabaseID= @Shard4ID
,@TableName= 'dbo.TestData'
,@Range_Low= 4
,@Range_High= 4

EXEC [ShardMeta].[AddShardSetMemberToShardingScheme]
@ShardSetID= @ShardSID
,@MemberID= 5
,@Distribution_Name= 'Point Distro For demo'
,@Name= 'ShardingKey'
,@DatabaseID= @Shard5ID
,@TableName= 'dbo.TestData'
,@Range_Low= 5
,@Range_High= 5

EXEC [ShardMeta].[AddShardSetMemberToShardingScheme]
@ShardSetID= @ShardSID
,@MemberID= 6
,@Distribution_Name= 'Point Distro For demo'
,@Name= 'ShardingKey'
,@DatabaseID= @Shard6ID
,@TableName= 'dbo.TestData'
,@Range_Low= 6
,@Range_High= 6

EXEC [ShardMeta].[AddShardSetMemberToShardingScheme]
@ShardSetID= @ShardSID
,@MemberID= 7
,@Distribution_Name= 'Point Distro For demo'
,@Name= 'ShardingKey'
,@DatabaseID= @Shard7ID
,@TableName= 'dbo.TestData'
,@Range_Low= 7
,@Range_High= 7

EXEC [ShardMeta].[AddShardSetMemberToShardingScheme]
@ShardSetID= @ShardSID
,@MemberID= 8
,@Distribution_Name= 'Point Distro For demo'
,@Name= 'ShardingKey'
,@DatabaseID= @Shard8ID
,@TableName= 'dbo.TestData'
,@Range_Low= 8
,@Range_High= 8

EXEC [ShardMeta].[AddShardSetMemberToShardingScheme]
@ShardSetID= @ShardSID
,@MemberID= 9
,@Distribution_Name= 'Point Distro For demo'
,@Name= 'ShardingKey'
,@DatabaseID= @Shard9ID
,@TableName= 'dbo.TestData'
,@Range_Low= 9
,@Range_High= 9

EXEC [ShardMeta].[AddShardSetMemberToShardingScheme]
@ShardSetID= @ShardSID
,@MemberID= 10
,@Distribution_Name= 'Point Distro For demo'
,@Name= 'ShardingKey'
,@DatabaseID= @Shard10ID
,@TableName= 'dbo.TestData'
,@Range_Low= 10
,@Range_High= 10

In the demo dataset, 11 shards are set up, each in a separate database. A single shard set and sharding distribution are set up. The data is sharded on ShardingKey (remember that this name is case sensitive). One shard set member is set up for each shard where data will be directed. In this scheme, only rows with ShardingKey values from 0 through 10 will be sent to destinations.
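
The demo uses a POINT distribution. For comparison, the following is a minimal, hypothetical sketch of a RANGE distribution. It reuses the database IDs declared earlier but creates its own shard set, because member IDs must be unique within a shard set; the names and range boundaries are illustrative only.

-- Hypothetical RANGE distribution in its own shard set.
DECLARE @RangeSID INT

EXEC [ShardMeta].[AddShardSetToShardingScheme]
@name= 'Range Distro For demo'
,@Descr= 'Range Distribution for Demo'
,@ShardSetID= @RangeSID OUTPUT

EXEC [ShardMeta].[AddShardDistributionToShardingScheme]
@ShardSetID= @RangeSID
,@Distribution_Name= 'Range Distro For demo'
,@Distribution_Type= 'RANGE'
,@System_Type_Name= 'INT'
,@User_type_name= 'INT'
,@Boundary_value_in_high = 1

-- With @Boundary_value_in_high = 1, each member covers
-- @Range_Low < ShardingKey <= @Range_High.
EXEC [ShardMeta].[AddShardSetMemberToShardingScheme]
@ShardSetID= @RangeSID
,@MemberID= 0
,@Distribution_Name= 'Range Distro For demo'
,@Name= 'ShardingKey'
,@DatabaseID= @shard0ID
,@TableName= 'dbo.TestData'
,@Range_Low= 0
,@Range_High= 100

EXEC [ShardMeta].[AddShardSetMemberToShardingScheme]
@ShardSetID= @RangeSID
,@MemberID= 1
,@Distribution_Name= 'Range Distro For demo'
,@Name= 'ShardingKey'
,@DatabaseID= @shard1ID
,@TableName= 'dbo.TestData'
,@Range_Low= 100
,@Range_High= 200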

We highly recommend that you use the stored procedures, because each one validates the data as it enters the catalog. However, the data that the stored procedures enter lands in ordinary catalog tables, and you can modify it without the stored procedures. To find the data that you entered through the stored procedures, look at the tables in the ShardMeta schema.
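
Because the catalog table names are not listed here, one safe way to discover them is to query the system catalog views for everything in the ShardMeta schema:

-- List the sharding catalog tables in the ShardMeta schema.
SELECT s.name AS schema_name, t.name AS table_name
FROM sys.tables AS t
INNER JOIN sys.schemas AS s
    ON t.schema_id = s.schema_id
WHERE s.name = 'ShardMeta'
ORDER BY t.name;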

Using the sharding destination with the sharding catalog

After the sharding catalog is set up and pointed to actual databases, you can use it for a data transfer. This section contains an example of creating a package that will use the sharding component and the sharding catalog.

In this example, the source and destination tables have a schema that is defined as follows.

CREATE TABLE [dbo].[TestData](
    [KeyCol] [bigint] NOT NULL,
    [IntCol1] [int] NOT NULL,
    [IntCol2] [int] NOT NULL,
    [IntCol3] [int] NOT NULL,
    [CharCol1] [nvarchar](50) NOT NULL,
    [CharCol2] [nvarchar](50) NOT NULL,
    [CharCol3] [nvarchar](50) NOT NULL,
    [CharCol4] [nvarchar](50) NOT NULL,
    PRIMARY KEY CLUSTERED
    (
        [KeyCol] ASC
    )
)
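
If you need sample rows, the following is a minimal sketch that fills the table with random data; the row count of 10,000 and the value ranges are arbitrary:

-- Populate dbo.TestData with 10,000 rows of random values.
-- The cross join of sys.all_objects is just a convenient row source.
;WITH Numbers AS (
    SELECT TOP (10000)
        ROW_NUMBER() OVER (ORDER BY (SELECT NULL)) AS n
    FROM sys.all_objects AS a
    CROSS JOIN sys.all_objects AS b
)
INSERT INTO [dbo].[TestData]
    ([KeyCol], [IntCol1], [IntCol2], [IntCol3],
     [CharCol1], [CharCol2], [CharCol3], [CharCol4])
SELECT
    n,
    ABS(CHECKSUM(NEWID())) % 1000,
    ABS(CHECKSUM(NEWID())) % 1000,
    ABS(CHECKSUM(NEWID())) % 1000,
    CONVERT(nvarchar(50), NEWID()),
    CONVERT(nvarchar(50), NEWID()),
    CONVERT(nvarchar(50), NEWID()),
    CONVERT(nvarchar(50), NEWID())
FROM Numbers;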

You can create this table in your source database and populate it with random data (for example, by using the sketch above). Then, to follow the example setup, which uses the shard catalog scheme that you set up previously:

  1. Open SQL Server Data Tools and create a new SSIS project.

  2. Create a package in the project.

  3. Add a dataflow task to the package, and then click the Data Flow tab.

  4. Drag an ADO.NET source onto the design surface of your dataflow.

  5. Follow the steps to create the connection manager and point it to your source data table. Close it after you fully set it up.

    Note
    In the sharding scheme setup in the previous section, the sharding key is a column that is named ShardingKey. This key does not exist in the source data, but it must exist in the SSIS dataflow before the data reaches the database sharding destination. Use a derived column to create this column in the SSIS dataflow. This step is not necessary if the shard key exists in the data itself.

  6. Drag a derived column transform onto the design surface of the SSIS dataflow, and connect the output from the ADO.NET source to this component.

  7. Configure the derived column transform as follows:

    1. Use ShardingKey as the derived column name. This name is case sensitive, and it must match the sharding key column name in the sharding catalog.

    2. For the expression, use [KeyCol] % 11. This will give the sharding key the possible values of 0 through 10, which is how the sharding catalog is set up to direct rows.

    After you set up the derived column, it should look like Figure 2.

    Figure 2. The fully set-up derived column that adds the ShardingKey column to the dataflow and defines it as the modulo of 11 on KeyCol.

  8. Drag a database sharding destination onto the dataflow design surface and connect the output of the derived column transform to it.

  9. Double-click to open the design, and then perform the following steps:

    1. Click New under Please connect to the Sharding Catalog, and then walk through the steps to define a connection manager that points to the sharding catalog database that you created earlier.

    2. In the Please select the Shard Set list, select the shard set that you created for this demo (Point Distro for demo).

    3. Click New under Destination Connection Manager. A new sharding connection manager (SHARDINGCM) appears.

    4. Specify the user name and password that will be used to connect to the destination databases. For the Azure database, you cannot use Security Support Provider Interface (SSPI).

      Note
      You can define only one user name and password to connect to all of the servers and databases that participate in your sharding scheme. Ensure that the user name and password exist and have sufficient privileges in each database.

      Note
      Although you set the user name and password directly in this demo, they are parameterizable—as are user names and passwords in other connection managers in SSIS. Because the user name and password are parameterizable, you can move your package after development, or you can modify your login information at execution time.

      Figure 3 shows how the editor for the sharding destination adapter appears after you have followed the preceding steps.

      Figure 3. Example page 1 of a sharding destination adapter. The drxufmq90b.database.windows.net.ShardCatalog.AzureCat connection manager points to the sharding catalog. The shard set that is used is named Point Distro For Azure CAT Summit. The connection manager that is named SQL Sharding Connection Manager contains the user name and password that will be used to connect to all destination databases in the sharding scheme.

    5. Click OK to exit the Sharding Destination Connection Manager setup.

    6. Click Connections in the left pane of the editor for the sharding destination adapter. At this point, the editor connects to the sharding catalog and displays the destinations as they are currently defined in the sharding catalog, as shown in Figure 4. Note that this is only the data that currently exists in the sharding catalog. You can modify this data after design because on each execution, the component connects to the sharding catalog to determine the current state of the catalog. Select the Use Bulk Insert check box for most cases, although you may want to insert rows individually to handle possible primary key violations.

      Figure 4. Connections page on a sharding destination adapter. The information in Expressions and tables has all been pulled from the sharding catalog that was defined on page 1.

    7. Click Mappings in the left pane. The component automatically maps columns of the same name, as shown in Figure 5. However, if columns have nonequal names, you can map them manually. In this case, note that all columns except ShardingKey are mapped. ShardingKey does not exist in the destination table and is in the dataflow solely to allow the sharding destination component to direct the rows to the correct shard.

      Figure 5. Mappings page from the sharding destination adapter.

    8. Click OK to exit the editor.

  10. Select the database sharding destination adapter, and then click the Properties tab in SQL Server Data Tools. In the custom properties, note especially FaultToleranceRetryStrategy. Transient fault handling is built into the sharding destination. Properties are set here so that you can change the retry strategy, the number of retries to be attempted, and the retry intervals. Because all of these are set up as custom properties, they are parameterizable so that you can change them at execution time.

At this point, the package is ready for execution or deployment. If you add or remove shards, or change shards between executions, the new sharding configuration will be read the next time the package is executed. Data will be sent to the databases and tables as it exists in the catalog at the beginning of the execution.

Blob components

In addition to the database sharding destination, you can use components to send data to Azure Blob storage or retrieve data from Azure Blob storage. These are the Blob source and Blob destination components.

Completing the information for the Blob destination adapter is a matter of specifying the Azure storage account and key, a target chunk size for the uncompressed data, and a base file name for the saved files. Use the Containers list to select the target container. The Blobs Present list simply shows the files that are already stored in the selected container, to help you avoid name conflicts. When the editor is filled in, it may look something like Figure 6.

Figure 6. An example of the Azure Blob destination for SSIS.

When the SSIS package is executed, the Blob destination chunks the data and creates GNU zip (gzip) files in the specified Blob container. It also creates a queue in the storage account that has the same name as the specified file name. The Blob destination places the name of each created file into this queue in case you must process the compressed files further. Use a tool such as Neudesic Azure Storage Explorer to see the queue and its contents, or to clear the queue when you need to restart from the beginning.

The Blob source component serves as a source for data files that Blob destination component has created. To use the Blob source component, complete the information in the Storage Account Name and Storage Account Key boxes. Then, the list for the containers is automatically populated. Select the container, and then choose whether retrieval from the queue is required. (We recommend this option in all cases. The option is absolutely necessary if more than one instance of the package will be executed against the same set of data files.) Figure 7 shows an example.

Figure 7. An example of the Azure Blob source for SSIS.

Usage for HDInsight

Because the Blob destination simply stores compressed, delimited files in Blob storage, HDInsight can directly consume the files that are produced. Each row is delimited by the row delimiter tags, and each column by the column delimiter tags, that you specified in the component. The column headers are stored in a column header file that is loaded into the container where the data files are stored.
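
As a minimal sketch of HDInsight consumption, assuming the files were written with comma column delimiters and newline row delimiters, and assuming the data files sit in a folder of their own (apart from the column header file), a Hive external table over them might look like the following; the storage account, container, and folder names are placeholders:

-- Hypothetical Hive external table over the gzip files that the
-- Blob destination produced. Hive decompresses .gz text files
-- transparently as it reads them.
CREATE EXTERNAL TABLE TestData (
    KeyCol   BIGINT,
    IntCol1  INT,
    IntCol2  INT,
    IntCol3  INT,
    CharCol1 STRING,
    CharCol2 STRING,
    CharCol3 STRING,
    CharCol4 STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION 'wasb://mycontainer@mystorageaccount.blob.core.windows.net/exported/';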
