Tool for migrating data from MSSQL to AWS Redshift part 3 / 3

This tool is now mostly finished. I just need to finish the integration tests for the pytest test suite. Otherwise, please feel free to clone, fork or contribute.

You can find it here:

https://github.com/datahappy1/mssql_to_redshift_data_transfer_tool

 


Tool for migrating data from MSSQL to AWS Redshift part 2 / 3

As promised, here’s an update on this project. On the MSSQL side, the T-SQL code is pretty much ready; you can check out the installation script here:

https://github.com/datahappy1/mssql_to_redshift_data_transfer_tool/tree/master/install/mssql

So how is this going to work? The Python wrapper will call the Stored Procedure [MSSQL_to_Redshift].[mngmt].[Extract_Filter_BCP] using the pymssql module, like this:


EXEC [mngmt].[Extract_Filter_BCP]
@DatabaseName = N'AdventureWorksDW2016',
@SchemaName = N'dbo',
@TargetDirectory = N'C:\mssql_to_redshift\files',
@DryRun = 'False'

You provide the database name and the schema name: the database you’re connecting to and the schema that contains the source tables with the data you are about to transfer into AWS Redshift.

You also provide the target directory on your hard drive. That’s the location where the .csv files are generated by bcp, which runs through the xp_cmdshell wrapper inside the Stored Procedure. In the final version, the Python code will create this target directory for you. The last parameter is called DryRun. When it is set to True, the BCP extraction query is modified with the “WHERE 1 = 0” pattern so that it returns 0 rows for each file.
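To make the DryRun switch concrete, here is a minimal Python sketch of how an extraction query could be switched to return zero rows. In the tool itself this happens in T-SQL inside [mngmt].[Extract_Filter_BCP]; the helpers `quote_ident` and `build_extract_query` are hypothetical and only illustrate the “WHERE 1 = 0” pattern.

```python
from typing import List


def quote_ident(name: str) -> str:
    """Bracket-quote a SQL Server identifier, doubling any closing bracket."""
    return "[" + name.replace("]", "]]") + "]"


def build_extract_query(database: str, schema: str, table: str,
                        columns: List[str], dry_run: bool) -> str:
    """Build the SELECT an export would run (hypothetical helper)."""
    column_list = ", ".join(quote_ident(c) for c in columns)
    query = "SELECT {} FROM {}.{}.{}".format(
        column_list, quote_ident(database), quote_ident(schema), quote_ident(table))
    if dry_run:
        # WHERE 1 = 0 keeps the query and its column list valid but returns no rows
        query += " WHERE 1 = 0"
    return query


print(build_extract_query("AdventureWorksDW2016", "dbo", "DimDate",
                          ["DateKey", "FullDateAlternateKey"], dry_run=True))
# SELECT [DateKey], [FullDateAlternateKey] FROM [AdventureWorksDW2016].[dbo].[DimDate] WHERE 1 = 0
```

Because the filter is a constant false predicate, a dry run still validates the database, schema and column names without moving any data.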

Once the Python coding part is ready, these Stored Procedure parameters will become internal, and you’ll pass these values as arguments when running the Python app.

If the run succeeds, this SP returns a Python-ready “string tuple” with the file names generated in the current run. The Python code uses this tuple to guarantee that only the expected set of files is picked up and moved over to AWS Redshift.
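The exact format of the returned string isn't shown in this post, so the following sketch assumes it is a Python tuple literal such as `('DimDate.csv', 'FactInternetSales.csv')`. `ast.literal_eval` parses such a literal without executing arbitrary code, and the hypothetical `files_to_transfer` helper keeps only the files named by the current run.

```python
import ast
from pathlib import Path
from typing import List, Tuple


def parse_file_tuple(sp_result: str) -> Tuple[str, ...]:
    """Turn the SP's "string tuple" into a real tuple without calling eval()."""
    value = ast.literal_eval(sp_result)
    if isinstance(value, str):
        # "('a.csv')" has no trailing comma, so it parses as a plain string
        value = (value,)
    if not isinstance(value, tuple) or not all(isinstance(v, str) for v in value):
        raise ValueError("unexpected SP result: {!r}".format(sp_result))
    return value


def files_to_transfer(target_dir: str, sp_result: str) -> List[Path]:
    """Files in target_dir that were generated by the current run, sorted by name."""
    expected = set(parse_file_tuple(sp_result))
    return sorted(p for p in Path(target_dir).iterdir()
                  if p.is_file() and p.name in expected)
```

Leftover files from an earlier run that happen to sit in the same directory are simply ignored.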

The main thing here is that you need to fill out a table called mngmt.ControlTable. In the GitHub installation script, I loaded this table with the AdventureWorks DataWarehouse 2016 database columns for demo purposes. So this table holds values like this:

[Image: ControlTable]

The IsActive flag determines whether the column makes it into the .csv file generated for the corresponding table. Column_id defines the order of the columns persisted into the .csv file.
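The two ControlTable rules (IsActive filters columns, Column_id orders them) can be sketched in Python as follows. The row layout (table name, column name, column_id, is_active) is an assumption for illustration; only IsActive and Column_id come from the post, and the real table's columns may be named differently.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

# Assumed row layout: (table_name, column_name, column_id, is_active)
ControlRow = Tuple[str, str, int, bool]


def active_columns_by_table(rows: Iterable[ControlRow]) -> Dict[str, List[str]]:
    """Active columns per table, in Column_id order."""
    grouped = defaultdict(list)  # table -> [(column_id, column_name), ...]
    for table, column, column_id, is_active in rows:
        if is_active:  # IsActive = 0 keeps the column out of the .csv file
            grouped[table].append((column_id, column))
    return {table: [name for _, name in sorted(cols)]
            for table, cols in grouped.items()}
```

A table whose columns are all inactive does not appear in the result at all, so no file would be generated for it.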

The Stored Procedure [MSSQL_to_Redshift].[mngmt].[Extract_Filter_BCP] writes its logs to a table called [mngmt].[ExecutionLogs], like this:

[Image: ExecutionLogs]

And that’s all for now. Have a look at the installation build script, which should be pretty self-explanatory.

 

Tool for migrating data from MSSQL to AWS Redshift part 1 / 3

Today, I’d like to introduce my new project, a SQL Server to AWS Redshift data migration tool. There’s not much tooling for this out there on the Internet, so I hope this tool is going to be valuable for some of you. It’s going to be written in Python 3.7 and, once finished, it will be published on my GitHub account under the MIT License. I’m going to describe what I’m currently working on here on this blog in 2 phases.

Phase #1 will be all about SQL Server coding. There I’ll need to:

  • extract and filter the data from the SQL Server tables I need to transfer to AWS Redshift
  • persist this data into .csv files using dynamically generated BCP commands (these .csv files will be split based on the target Redshift tables)
  • store these .csv files on a local hard drive.
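In the tool, the bcp commands are generated dynamically in T-SQL and run through xp_cmdshell. Purely for illustration, here is a sketch of the same idea in Python: one bcp queryout command per target Redshift table. The function names and the server name are hypothetical; `-S`, `-T`, `-c` and `-t` are standard bcp switches.

```python
import ntpath
from typing import Dict, List


def build_bcp_command(query: str, out_file: str, server: str) -> List[str]:
    """Argument list for one bcp queryout export (hypothetical wrapper)."""
    return [
        "bcp", query, "queryout", out_file,
        "-S", server,  # SQL Server instance to connect to
        "-T",          # trusted (Windows) authentication
        "-c",          # character mode: plain text output
        "-t,",         # comma as the field terminator
    ]


def commands_for_tables(queries: Dict[str, str], target_dir: str,
                        server: str) -> List[List[str]]:
    """One bcp command, and therefore one .csv file, per target Redshift table."""
    return [build_bcp_command(query, ntpath.join(target_dir, table + ".csv"), server)
            for table, query in sorted(queries.items())]
```

On a machine with bcp installed, each list could be handed to `subprocess.run(cmd, check=True)`; building an argument list rather than one shell string avoids quoting problems with the embedded query.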


Phase #2 will be about Python and the AWS Boto3 library, wrapping this tool all together to push the data all the way through to AWS Redshift. That means:

  • Upload the .csv files from Phase #1 into an AWS S3 bucket
  • Run COPY commands to load these .csv files into the AWS Redshift target tables
  • Clean up the files and write log data
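As a rough sketch of the second step: once a file has been uploaded (for example with a boto3 S3 client's `upload_file(Filename, Bucket, Key)`), loading it is a single Redshift COPY per file. The bucket, key prefix, table and IAM role ARN below are placeholders, and the helper names are hypothetical.

```python
def s3_key(prefix: str, file_name: str) -> str:
    """Object key under a folder-like prefix, e.g. 'load/DimDate.csv'."""
    return prefix.rstrip("/") + "/" + file_name


def build_copy_command(table: str, bucket: str, key: str, iam_role_arn: str) -> str:
    """Redshift COPY statement for one CSV file already uploaded to S3."""
    return ("COPY {} FROM 's3://{}/{}' IAM_ROLE '{}' FORMAT AS CSV;"
            .format(table, bucket, key, iam_role_arn))


print(build_copy_command("public.dimdate", "my-bucket", s3_key("load/", "DimDate.csv"),
                         "arn:aws:iam::123456789012:role/RedshiftCopy"))
```

bcp does not write a header row, so the sketch does not use IGNOREHEADER.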


As soon as I have this initial version out, I would like to extend this tool to be capable of running incremental data loads based on watermarks as well.

 

Dynamic T-SQL for serialization of columns in a table

Sometimes in an ETL project you need to move data between stages with the column names defined explicitly in the queries. For straight moves between stages, I typically end up using the following code snippet. You could do this without dynamic SQL as well, but I prefer this approach because I usually run the snippet in an iteration that reloads all tables based on configuration logic stored in a config table. Dynamic SQL also makes it easy to append stage-specific values to the column list, replace values, add dynamically evaluated lookups, etc. As you can guess from my previous posts, the next step is creating the INSERT INTO (@COLUMN_LIST) SELECT @COLUMN_LIST statements and kicking them off with a batch size declared as a variable. Simple stuff, but having explicitly defined column names is often a must. Next time I’d like to blog about AWS Batch and event-driven data processing, so stay tuned.


USE [AdventureWorks2014];
GO

SET ANSI_NULLS ON;
GO

SET QUOTED_IDENTIFIER ON;
GO

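DECLARE @STMT_MOVE_PRE_STG_COLS nvarchar(max);
DECLARE @COLUMN_LIST nvarchar(max);
DECLARE @schema_name sysname = N'Production';
DECLARE @table_name sysname = N'Product';

-- The schema and table names are passed to sp_executesql as parameters instead of
-- being concatenated into the statement. QUOTENAME() brackets each column name,
-- ORDER BY column_id keeps the table's column order, and TYPE + .value() stops
-- FOR XML PATH from entitizing characters such as & or <.
SET @STMT_MOVE_PRE_STG_COLS = N'SELECT @COLUMN_LIST_IN =
STUFF(
(SELECT '','' + QUOTENAME(c.name)
FROM sys.columns c
INNER JOIN sys.tables t ON c.object_id = t.object_id
INNER JOIN sys.schemas s ON s.schema_id = t.schema_id
WHERE s.name = @schema_name_in AND t.name = @table_name_in
ORDER BY c.column_id
FOR XML PATH (''''), TYPE).value(''.'', ''nvarchar(max)''), 1, 1, '''')';

EXECUTE sp_executesql @STMT_MOVE_PRE_STG_COLS,
N'@schema_name_in sysname, @table_name_in sysname, @COLUMN_LIST_IN nvarchar(max) OUTPUT',
@schema_name_in = @schema_name,
@table_name_in = @table_name,
@COLUMN_LIST_IN = @COLUMN_LIST OUTPUT;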

PRINT @COLUMN_LIST;
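The follow-up step mentioned above (INSERT INTO (@COLUMN_LIST) SELECT @COLUMN_LIST, kicked off with a declared batch size) could look roughly like this Python sketch. It assumes a numeric key column to batch on and takes the comma-separated column list printed by the snippet as input; the function name and all table names are hypothetical.

```python
from typing import List


def build_move_statements(column_list: str, source: str, target: str,
                          key_column: str, min_id: int, max_id: int,
                          batch_size: int) -> List[str]:
    """One INSERT INTO ... SELECT per key range of batch_size ids."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    statements = []
    for low in range(min_id, max_id + 1, batch_size):
        high = min(low + batch_size - 1, max_id)  # last batch may be shorter
        statements.append(
            "INSERT INTO {t} ({c}) SELECT {c} FROM {s} WHERE {k} BETWEEN {lo} AND {hi};"
            .format(t=target, c=column_list, s=source, k=key_column, lo=low, hi=high))
    return statements
```

Using the same column list on both sides of the statement is what makes the explicit column names pay off: column order mismatches between stages cannot silently shift data.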

TSQL code to list all columns in a DB containing only NULLs

A colleague of mine recently asked me whether I know a quick way to get a list of all the columns in a database that contain only NULLs. Well, here’s a script I came up with. I am pretty sure there are other (even better) options out there, but I liked this one, so I decided to share the code here.

DECLARE @ID int;
DECLARE @query1 nvarchar(MAX);
DECLARE @TableName nvarchar(255);
DECLARE @ColumnName nvarchar(255);
DECLARE @SchemaName nvarchar(255);

USE AdventureWorksDW2014; --USE THE DB YOU ARE INTERESTED IN
SET NOCOUNT ON;

IF OBJECT_ID('tempdb..#tmp_counts') IS NOT NULL
DROP TABLE #tmp_counts;

IF OBJECT_ID('tempdb..#RESULTSET') IS NOT NULL
DROP TABLE #RESULTSET;

CREATE TABLE #tmp_counts (
ID int IDENTITY (1,1) PRIMARY KEY,
SchemaName nvarchar(255),
TableName nvarchar(255),
ColumnName nvarchar(255));

CREATE TABLE #RESULTSET (
SchemaName nvarchar(255),
TableName NVARCHAR(255),
ColumnName NVARCHAR(255),
[CountOfRows] int,
[CountOfRows_NULL] int);

INSERT INTO #tmp_counts
SELECT
s.name [Schema],
t.name [TableName],
c.name [ColumnName]
FROM sys.tables t
INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
INNER JOIN sys.columns c ON c.object_id = t.object_id
WHERE c.Is_Nullable = 1
ORDER BY 1,2;

--
--Let's start looping through the temp table
WHILE EXISTS (SELECT * FROM #tmp_counts)
BEGIN

SELECT
TOP(1) @ID = ID,
@SchemaName = Schemaname,
@TableName = TableName,
@ColumnName = ColumnName
FROM #tmp_counts
ORDER BY ID;

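-- QUOTENAME() protects against names containing ], and the name literals are passed
-- as parameters, so apostrophes in object names cannot break the statement
SELECT @query1 =
N'INSERT INTO #RESULTSET SELECT @SchemaName, @TableName, @ColumnName,
(SELECT COUNT(*) FROM ' + QUOTENAME(@SchemaName) + N'.' + QUOTENAME(@TableName) + N' WITH (NOLOCK)),
(SELECT COUNT(*) FROM ' + QUOTENAME(@SchemaName) + N'.' + QUOTENAME(@TableName) + N' WITH (NOLOCK) WHERE ' + QUOTENAME(@ColumnName) + N' IS NULL);';

EXEC sp_executesql @query1,
N'@SchemaName nvarchar(255), @TableName nvarchar(255), @ColumnName nvarchar(255)',
@SchemaName, @TableName, @ColumnName;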
DELETE FROM #tmp_counts WHERE ID = @ID;
END

--YOUR FINAL QUERY IS HERE:
SELECT 'Column with all rows NULL' [Check Description], *
FROM #RESULTSET
WHERE CountOfRows > 0 AND
CountOfRows = CountOfRows_NULL;
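To play with the same logic outside SQL Server, here is a sketch in Python against an SQLite database (sqlite3 ships with Python). It swaps SQL Server's catalog views for SQLite's sqlite_master and PRAGMA table_info, so treat it as an illustration of the technique rather than a port of the script. One design difference: it counts total rows and NULL rows in a single scan per column with a conditional SUM.

```python
import sqlite3
from typing import List, Tuple


def all_null_columns(conn: sqlite3.Connection) -> List[Tuple[str, str]]:
    """(table, column) pairs whose rows are all NULL, skipping empty tables."""
    result = []
    tables = [r[0] for r in conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name")]
    for table in tables:
        qt = '"' + table.replace('"', '""') + '"'
        # PRAGMA table_info rows: (cid, name, type, notnull, dflt_value, pk)
        for _, column, _, notnull, _, _ in conn.execute("PRAGMA table_info({})".format(qt)):
            if notnull:
                continue  # mirrors the is_nullable = 1 filter
            qc = '"' + column.replace('"', '""') + '"'
            total, nulls = conn.execute(
                "SELECT COUNT(*), SUM(CASE WHEN {} IS NULL THEN 1 ELSE 0 END) FROM {}"
                .format(qc, qt)).fetchone()
            if total > 0 and total == nulls:  # CountOfRows > 0 AND CountOfRows = CountOfRows_NULL
                result.append((table, column))
    return result
```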

TSQL histogram

Sometimes I like to fiddle around with T-SQL. I’m not sure how useful this trick might be, but here is some code I came up with that builds histograms from your data and a few variables you define. The dataset for this script is the well-known AdventureWorks DW table dbo.FactInternetSales. You can set the bucket count and the bar chart width variables to fine-tune your output. This histogram splits the dataset into the declared buckets based on the ProductKey FK column values.

SET NOCOUNT ON;

USE AdventureWorksDW2014;

DECLARE @BucketCount DECIMAL(8,3) = 10;
DECLARE @BarChartWidth INT = 100;
DECLARE @iKeyCount DECIMAL(8,3) = (SELECT COUNT(DISTINCT ProductKey) FROM FactInternetSales);
DECLARE @iBucketSize DECIMAL(8,3) = @iKeyCount / @BucketCount;

SELECT
i.*,
REPLICATE('=', @BarChartWidth * i.SumSalesAmount / (SELECT SUM(SalesAmount) FROM FactInternetSales)) AS [BarChart]
FROM (
     SELECT
     ii.iBucket_ID Bucket_ID, -- the computed bucket, not an arbitrary ROW_NUMBER()
     MIN(ii.ProductKey) Bucket_Range_From,
     MAX(ii.ProductKey) Bucket_Range_To,
     COUNT(ii.Bucket_ProductKeys_Count) Bucket_ProductKeys_Count,
     SUM(ii.SumSalesAmount) SumSalesAmount
     FROM (
          SELECT
          ProductKey,
          CEILING(CAST((ROW_NUMBER() OVER (ORDER BY ProductKey) )/@iBucketSize AS DECIMAL(8,3))) iBucket_ID,
          COUNT(*) Bucket_ProductKeys_Count,
          SUM(SalesAmount) SumSalesAmount
          FROM FactInternetSales
          GROUP BY ProductKey
          ) ii
      GROUP BY ii.iBucket_ID
      ) i
ORDER BY i.Bucket_ID;

And the result may look like this:

[Image: histogram1]
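The bucketing idea behind the first query, CEILING(ROW_NUMBER() / bucket_size) over the sorted distinct keys with a bar proportional to each bucket's share of sales, can be sketched in Python like this. The function and its inputs are hypothetical; `Fraction` keeps the bucket size exact, which sidesteps the DECIMAL rounding that the second query addresses by widening precision.

```python
import math
from fractions import Fraction
from typing import Dict, List, Tuple


def key_histogram(sales: List[Tuple[int, float]], bucket_count: int,
                  bar_width: int) -> List[Tuple[int, int, int, int, float, str]]:
    """Rows of (bucket_id, key_from, key_to, key_count, amount, bar)."""
    per_key: Dict[int, float] = {}
    for key, amount in sales:  # like GROUP BY ProductKey with SUM(SalesAmount)
        per_key[key] = per_key.get(key, 0.0) + amount
    if not per_key:
        return []
    keys = sorted(per_key)
    bucket_size = Fraction(len(keys), bucket_count)  # exact, no DECIMAL rounding
    total = sum(per_key.values())
    buckets: Dict[int, List[int]] = {}
    for rank, key in enumerate(keys, start=1):  # rank plays the role of ROW_NUMBER()
        buckets.setdefault(math.ceil(rank / bucket_size), []).append(key)
    rows = []
    for bucket_id in sorted(buckets):
        members = buckets[bucket_id]
        amount = sum(per_key[k] for k in members)
        bar = "=" * int(bar_width * amount / total) if total else ""
        rows.append((bucket_id, members[0], members[-1], len(members), amount, bar))
    return rows
```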

But I would also like to see a more accurate solution. Digging deeper, I came up with code that splits the dataset into the declared buckets based on the composite PK SalesOrderNumber, SalesOrderLineNumber. (I also increased the precision of the DECIMAL data type.) This code still feels quite straightforward to me, but it gets a little more complex.

SET NOCOUNT ON;

USE AdventureWorksDW2014;

DECLARE @BarChartWidth INT = 100;
DECLARE @BucketCount DECIMAL(38,18) = 10;
DECLARE @iCount DECIMAL(38,18) = (SELECT COUNT(*) FROM FactInternetSales);
DECLARE @iBucketSize DECIMAL(38,18) = @iCount / @BucketCount;

;WITH CTE AS
(
     SELECT
     i.iKey,
     i.iID,
     CEILING(CAST(i.iID / @iBucketSize AS DECIMAL(38,18))) iBucket_ID,
     i.SumSalesAmount
     FROM (
          SELECT
          CAST([SalesOrderNumber] AS VARCHAR(30)) + '-' + CAST([SalesOrderLineNumber] AS VARCHAR(10)) iKey,
          -- The ORDER BY inside ROW_NUMBER() is what makes the numbering deterministic;
          -- an ORDER BY in a TOP 100 PERCENT derived table is ignored by the optimizer,
          -- and the CTE is evaluated once per reference below.
          ROW_NUMBER() OVER (ORDER BY CAST(REPLACE([SalesOrderNumber],'SO','') AS INT), [SalesOrderLineNumber]) iID,
          SUM(SalesAmount) SumSalesAmount
          FROM FactInternetSales
          GROUP BY [SalesOrderNumber],[SalesOrderLineNumber]
          ) i
)

SELECT
i.iBucket_ID,
i.Bucket_Range_From,
i.Bucket_Range_To,
(SELECT Cfrom.iKey FROM CTE Cfrom WHERE Cfrom.iID = i.Bucket_Range_From) Bucket_Range_SalesOrderKey_From,
(SELECT Cto.iKey FROM CTE Cto WHERE Cto.iID = i.Bucket_Range_To) Bucket_Range_SalesOrderKey_To,
i.Bucket_Row_Count,
i.Bucket_Sales_Amount,
REPLICATE('=', @BarChartWidth * i.Bucket_Sales_Amount / (SELECT SUM(SalesAmount) FROM FactInternetSales)) AS [BarChart]
FROM (
     SELECT
     ii.iBucket_ID,
     MIN(ii.iID) Bucket_Range_From,
     MAX(ii.iID) Bucket_Range_To,
     COUNT(ii.iID) Bucket_Row_Count,
     SUM(ii.SumSalesAmount) Bucket_Sales_Amount
     FROM
     CTE ii
     GROUP BY ii.iBucket_ID
     ) i
ORDER BY i.iBucket_ID;

The final result is as expected (check the Bucket_Row_Count column) and can look like this:

[Image: histogram2]

 

Compare multiple DB schemas

Sometimes you may need a quick look at how databases that are supposed to share the same schema actually differ. This query lets you find the outlying and the intersecting columns across multiple databases. If you are looking only for the outlying columns and the databases they appear in, use the code below as is. If you are looking for the intersection of multiple databases, switch HAVING COUNT(0) <> to HAVING COUNT(0) = and remove the list of databases the columns appear in.

In my example, I created three databases, Test1 to Test3, with some differences between their schemas. The key that identifies a unique column is built in the Compound_Key column.

[Image: schema_compare]

IF OBJECT_ID('tempdb..#DB_Schema_Compare') IS NOT NULL
DROP TABLE #DB_Schema_Compare;
CREATE TABLE #DB_Schema_Compare
( [DB_Name] NVARCHAR(100),
[Compound_Key] NVARCHAR(Max)
)

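-- Run the USE / INSERT INTO block below once per database you compare
-- (Test1, Test2 and Test3 in this example), changing both the USE statement and the DB name literal.
USE Test3;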

INSERT INTO #DB_Schema_Compare
SELECT
'Test3',
--s.name SchemaName,
--t.name TableName,
--c.name ColumnName,
--c.is_computed ColumnIsComputed,
--c.is_identity ColumnIsIdentity,
--c.is_nullable ColumnIsNullable,
--c.collation_name ColumnCollationName,
--c.max_length ColumnMaxLength,
CAST(
CAST(s.name AS VARCHAR(200)) + '^' +
CAST(t.name AS VARCHAR(200)) + '^' +
CAST(c.name AS VARCHAR(200)) + '^' +
CAST(c.is_computed AS VARCHAR(1)) + '^' +
CAST(c.is_identity AS VARCHAR(1)) + '^' +
CAST(c.is_nullable AS VARCHAR(1)) + '^' +
ISNULL(CAST(c.collation_name AS VARCHAR(200)) + '^','^') +
CAST(c.max_length AS VARCHAR(10)) + '^' +
CAST(ty.name AS VARCHAR(200))
AS VARCHAR(MAX)) CompoundKey
FROM sys.columns c
inner join sys.tables t on c.object_id = t.object_id
inner join sys.schemas s on s.schema_id = t.schema_id
inner join sys.types ty on ty.user_type_id = c.user_type_id; -- system_type_id is shared by e.g. nvarchar and sysname and would duplicate rows

SELECT
i.*,
STUFF(( SELECT ',' + SUB.[DB_Name] AS [text()]
FROM
#DB_Schema_Compare SUB
WHERE
SUB.Compound_Key = i.Compound_Key
ORDER BY SUB.[DB_Name]
FOR XML PATH('')
), 1, 1, '' )
AS [DBs_we_HAVE_this_compound_key_in]
FROM (
     SELECT
     COUNT(0) CNT,
     Compound_Key
     FROM
     #DB_Schema_Compare
     GROUP BY [Compound_Key]
     HAVING COUNT(0) <> (SELECT COUNT(DISTINCT [DB_Name]) FROM #DB_Schema_Compare)
) i;
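The HAVING COUNT(0) <> / = switch boils down to set logic, sketched below in Python. The input maps each database name to its set of compound keys (the strings built above); the function name is hypothetical. Using sets also means a key that somehow appears twice within one database is not counted twice.

```python
from typing import Dict, List, Set, Tuple


def compare_schemas(keys_by_db: Dict[str, Set[str]]) -> Tuple[Dict[str, List[str]], Set[str]]:
    """Return (outlying key -> DBs that have it, keys present in every DB)."""
    seen_in: Dict[str, List[str]] = {}
    for db in sorted(keys_by_db):
        for key in keys_by_db[db]:
            seen_in.setdefault(key, []).append(db)
    # Outlying: the key is missing from at least one DB (HAVING COUNT(0) <> n)
    outliers = {k: dbs for k, dbs in seen_in.items() if len(dbs) != len(keys_by_db)}
    # Intersecting: the key exists in every DB (HAVING COUNT(0) = n)
    common = set.intersection(*keys_by_db.values()) if keys_by_db else set()
    return outliers, common
```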

TSQL Large data loads split by a declared batch size

A couple of days back, I was asked how I would use SQL grouping functions to split a huge data load into separate batches. Below is the code I came up with. The next logical step would be to load the statements into a temp table, iterate through it and execute the statements with sp_executesql. It should be said that if the PK you are scanning has big gaps of missing IDs, the batches will be uneven, so this might not be the best or most accurate solution.


USE [AdventureWorks2012];
GO

DECLARE @BATCHSIZE INT = 10000;

/*CHECK THE MIN AND MAX IDs IN THE SOURCE TABLE*/
SELECT MIN(SalesOrderID) MinID,MAX(SalesOrderID) MaxID FROM [Sales].[SalesOrderHeader];
/**/

SELECT
--i.PartitionID,
--MIN(i.SalesOrderID) MinID,
--MAX(i.SalesOrderID) MaxID,
'INSERT INTO TARGET(..) SELECT COL1,COL2,.. FROM SOURCE WITH (NOLOCK) WHERE SalesOrderID BETWEEN '+
CAST(MIN(i.SalesOrderID) AS VARCHAR(10)) + ' AND ' +
CAST(MAX(i.SalesOrderID) AS VARCHAR(10)) + '; '
AS STMT
FROM
(
	SELECT
	SalesOrderID,
	SalesOrderID / @BATCHSIZE PartitionID
	FROM [Sales].[SalesOrderHeader] WITH (NOLOCK)
) i
GROUP BY i.PartitionID
ORDER BY i.PartitionID;
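The same partitioning, grouping IDs by ID / batch size and taking MIN and MAX per group, is sketched in Python below. It also shows the caveat about gaps: a partition with few surviving IDs yields a small batch. The helper names are hypothetical, and Python's `//` matches SQL Server's integer division only for non-negative IDs.

```python
from itertools import groupby
from typing import Iterable, List, Tuple


def batch_ranges(ids: Iterable[int], batch_size: int) -> List[Tuple[int, int]]:
    """(min_id, max_id) per partition, where partition = id // batch_size."""
    ranges = []
    for _, group in groupby(sorted(ids), key=lambda i: i // batch_size):
        members = list(group)
        ranges.append((members[0], members[-1]))
    return ranges


def batch_statements(ids: Iterable[int], batch_size: int, template: str) -> List[str]:
    """Fill a statement template containing {low} and {high} for each range."""
    return [template.format(low=low, high=high) for low, high in batch_ranges(ids, batch_size)]
```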

sp_testlinkedserver in a try…catch block

Sometimes you definitely need to go with quick workarounds. I am pretty sure I am not the only BI developer who works from time to time with legacy, somewhat wacky old code used for production purposes. This time I came across a legacy scheduled stored procedure that filled a dataset for SSRS reporting. It called OPENROWSET to run an MDX query against an OLAP cube, but the linked server call failed from time to time because of weak connections. Whenever the linked server call failed, there was simply no reporting, as the MDX results were later used in an INNER JOIN 🙂. I kind of wonder what the people who wrote this code were thinking back then. Anyway, I needed this SP to stop failing and to return results even when the linked server connection failed, and I needed to be informed about the failure so I could react and persist the results from the last successful run.

The solution is quite easy and so far seems bulletproof. Let's use this sample MDX query wrapped in OPENROWSET as an example:

SELECT a.*
FROM OpenRowset( 'MSOLAP','DATASOURCE=myOlapServer; Initial Catalog=FoodMart;',
'SELECT Measures.members ON ROWS,
[Product Category].members ON COLUMNS
FROM [Sales]')
as a
GO

The trick is to run sp_testlinkedserver, which tests whether we are able to connect to the specified linked server, followed by this chunk of code, together in the TRY block. We also want to set the variable @err so we know that an error happened. The code could look something like this:


DECLARE @err BIT = 0;

BEGIN TRY
EXEC sp_testlinkedserver N'MSOLAP'; -- the argument is the name of the linked server to test; use your own linked server's name

SELECT a.*
INTO #results
FROM OpenRowset( 'MSOLAP','DATASOURCE=myOlapServer; Initial Catalog=FoodMart;',
'SELECT Measures.members ON ROWS,
[Product Category].members ON COLUMNS
FROM [Sales]' )
as a;

END TRY

BEGIN CATCH
SET @err = 1;
END CATCH

IF @err = 1
BEGIN
--FOR EXAMPLE USE AND PERSIST RESULTS FROM THE LAST TIME THE SP WAS SUCCESSFULLY EXECUTED..
END
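If the Python side of a reporting job needs the same behaviour, the pattern translates directly: test the connection and run the query inside one try block, and on any failure fall back to the last persisted results while raising a flag. This is a hypothetical sketch; the two callables stand in for sp_testlinkedserver and the OPENROWSET query.

```python
from typing import Callable, List, Tuple


def fetch_with_fallback(test_connection: Callable[[], None],
                        run_query: Callable[[], List[tuple]],
                        last_good: List[tuple]) -> Tuple[List[tuple], bool]:
    """Return (rows, err) where err mirrors the @err flag in the T-SQL version."""
    try:
        test_connection()  # plays the role of sp_testlinkedserver
        rows = run_query()  # plays the role of the OPENROWSET query
    except Exception:
        # like the CATCH block: fall back to results persisted by the last good run
        return last_good, True
    return rows, False
```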

Disclaimer: This is just a quick-fix tutorial; I definitely agree this is not the best example of using the TRY…CATCH block. Details on sp_testlinkedserver can be found in the SQL Server documentation.