In this post, we will cover how to create a simple and fully dynamic synapse pipeline to incrementally load data from an SQL database to parquet files stored in a data lake.
Why should we move data from a database to a data lake
In the traditional data warehouse approach, it is very common to move all the data sources and historize them into an ODS database.
However, moving and storing data from all the data sources into a single database can quickly become costly and very complex to develop and maintain.
On the other hand, moving all data sources (even those from databases) to a data lake will likely be much cheaper, quicker, and easier to develop.
Once all the enterprise data sources are stored in the data lake, we can curate, transform and move the data where we wish.
Data lake and parquet files
To keep things short and simple we can say that a data lake is a storage place that stores and holds the enterprise data in its native, raw format. As we saw above this approach differs from the old traditional approach where we usually transform the data before the ingestion process.
In this post, we will store the data as parquet files which tend to offer better performance and compression than other common file formats. But depending on your scenario and what you want to do with your data once they are in the data lake you may need to work with Avro or even CSV formats.
So even if I use parquet files in this post the pipeline that we will create will work with any of those formats and would require only one small change in the sink dataset configuration.
Setting up the synapse pipeline
As always to set up a synapse pipeline we first need to define the dataset source and the sink and the linked service used by the datasets. If you are moving data from an Onprem database which is what I’m doing in this post we also need to configure a self-hosted integration runtime.
We first need to set up the two linked services one for the dataset source and one for the dataset sink.
Linked Service for the dataset source
Go to linked Services then click on new and then select SQL Server, this method would work with any SQL database but I used SQL server on-prem for this post.
The most important thing is to make our connection string dynamic so for that we need to create two parameters the ServerName and the DBNAme.
For the authentification, I suggest using a service account and Azure key vault to store the password.
There are two main reasons at least that I can think of to use a dynamic connections string.
The reusability so we can reuse the same linked service across different databases and servers.
And the deployment, by using a server parameter we can simply change the server according to where we deploy the pipeline such as Test, UAT, and Prod.
Linked Service for the dataset sink
For the target dataset, we need to create an Azure Data Lake Storage V2 linked Services. It is pretty straightforward to configure since you only need to pass the endpoint URL of your storage account. I assume that a storage account has already been configured with the right permission granted to the Synapse-managed identity account, if not you can follow the instruction here.
Similar to the linked services we must ensure that the datasets are parameterizable so we can reuse them for each dataset sharing the same type and also across different environments.
For the dataset source, we make the database name and server name parameterizable but we leave the table name since we’re going to use dynamic SQL to incrementally load the multiple table sources.
For the sink dataset, we’re again using parameters.
- The ContainerName if we want to use different containers for different projects
- The FolderPath is unique for each table and I recommend using something like “schema/tablename/inputfiles”
- The filename will be the concatenation of the table + timestamp of the pipeline execution
It is now time to create our pipeline, as a sneak peek this is what will look like the final pipeline and I will break down each component one by one.
In order to make the synapse pipeline dynamic and reusable across different environments we have to create a bunch of parameters.
- ServerName: server source
- DBName: database source
- ContainerName: where the output files are stored
- FolderPath; where the output files of each table are stored
- FileName: Name of each output file
It does not really matter what default values I’m using since these values are going to be overridden later.
Lookup: List of tables
The key to incrementally loading our data with synapse is to have a control table that will contain all the parameters to configure the delta load of our multiple tables.
Here is a basic script to create the control table, we can add more columns for more complex scenarios like specific filters to apply for specific tables, or even write SQL queries to be run dynamically.
create table [dbo].[COntrolTable]( [ID] int identity(1,1) , [ServerName] varchar(50) , [DBname] varchar(50) , [SchemaName] varchar(20) , [TableName] varchar(50) , [DateColumn]varchar(50) , [LoadType] varchar(10) , [FromDate] datetime2 , [ToProcess] varchar(3) , [CreatedDate] datetime2 , [LastUpdatedDate] datetime2 )
If we want to avoid configuring the control table for each table one by one, we can use a similar query to the one below, again there are many ways to create and configure the control table.
And I highly recommend anticipating any potential future changes on this table as all the pipeline configurations will rely on the control table.
insert into [dbo].[COntrolTable] select 'Your Server' as [ServerName] , DB_NAME() as [DBname] , s.name as [SchemaName] , t.name as [TableName] , 'LastEditedWhen' as [DateColumn] , 'delta' as [LoadType] , '1901-01-01 00:00:00' as [FromDate] , 'yes' as [ToProcess] , getdate() as [CreatedDate] , getdate() as [LastUpdatedDate] from sys.tables t inner join sys.schemas s on t.schema_id=s.schema_id where s.name='Sales'
This is an example of the output of the control table used for this post.
As we can see the DateColumn that will be used for the incremental load is always the same which is, of course, good practice but in reality, this is rarely the case.
The Lookup configuration
Here is the lookup activity which returns a JSON string containing all the values of each table that we need to incrementally load.
Note that we use the source dataset that we have previously created and we pass the values of the pipeline parameters to the dataset parameters.
The for each is used to loop through the list of tables returned by the lookup so can repeat the delta load for each table according to their parameters.
In order to fetch the values related to each table we need to pass the command “@activity(‘LkpListTables’).output.value”
The delta load activities
The first part of our synapse pipeline allowed us to retrieve all the different parameters related to each table that we want to incrementally load. Now the second part of the pipeline is to run the delta load for each table.
Get current date from
Since we only want to load the new data we need to retrieve the DateTime of the last change that occurred on the table so then we can use this value as a filter for the next run.
The date of the last change becomes the “date from” of the next run and so on.
Here is the dynamic SQL command used in the lookup to retrieve the date of the last change.
Note that I tend to wait 1 second before running the copy activity to avoid potential missing commits that would occur within the same second, this may need not be needed in your scenario. second. This is actually very unlikely to happen since I’m using milliseconds for my technical date from column but this is a habit that I have maybe a bad habit though…
Copy data – Source
The copy activity is completely dynamic since it reads the parameter values returned by the lookup “LKPListTables”.
The dynamic SQL command is as follows:
All the variables are given by the lookup “LKPListTables” except the filter “<=” which is given by the lookup “LkpCurrentWaterMark”.
Note that I only filter on a time interval in this synapse pipeline but we can perfectly add more filters and make everything dynamic as long as we are using the control table.
Copy data – Sink
The parquet files are going to be copied into the data lake that we have previously configured.
Each parquet file is created into a specific folder corresponding to the table name and we saw in this post above the file name is the combination of the table name and the timestamp of when the file was created. This should guarantee that each file has a unique name but also help to keep track of when the file was generated.
SP to update the watermark value
Finally, before running the synapse pipeline the last thing to do is to update the watermark value for the next run. In order to update the DateFrom to use as a filter for the next run I use a simple stored procedure and pass as a parameter the date of the latest changed previously retrieved by the lookup “LkpCurrentWaterMark”
CREATE PROC [dbo].[SynapseUpdateWaterMark] @DBName varchar(50), @SchemaName varchar(50), @TableNAme varchar(50), @DateFrom datetime2 AS BEGIN SET NOCOUNT ON; UPDATE t SET FromDate=@DateFrom , Tec_UpdatedDate=GETDATE() FROM dbo.SynapseLoad t WHERE DBNAme=@DBName AND SchemaName=@SchemaName AND TableName=@TableNAme RETURN 0 END; GO
Run the synapse pipeline
Before running the pipeline or adding a trigger to schedule it, you will need to pass the parameters. Not that we can add multiple triggers to the same pipeline and thus pass different parameters for each run.
After the first run the pipeline will create the list of folders corresponding to the list of tables that we incrementally load to the data lake.
In each folder, a folder called “InputFiles” is created and then the parquet files are generated each time the pipeline will run. Note that depending on the configuration and the size of the data to load, more than one file can be created.
In this post, we created a simple synapse pipeline to incrementally load multiple tables into parquet files.
Even if this pipeline is simple we can easily tweak it and make it more complex with some specific rules or specific filters to be applied on some tables.
It’s been a while since I wanted to write this post and this post had stayed in my draft post for already some months. So as done is better than perfect, I preferred to share it as it is now, and hopefully, it will still be helpful and clear enough and I should plan to revisit it soon.