Metadata Driven Fabric Pipelines (1 of 2) - Setup

Intro & Context

Over the last couple of months I've been working on a few projects where the question of choosing the right combination of Fabric components has come up. My personal preference is typically pro-code first, but as well as wanting to realise some of the benefits of Fabric's pre-built connectors for extract and load processes, many users would rather not spend all day in Python or PySpark notebooks! The addition of CI/CD support for Gen 2 Dataflows was a step towards making them an easy recommendation, but I've hit issues with the preview versions: either they can't be invoked from pipelines, or they fail to connect to staging lakehouses after being deployed through CI/CD. Unfortunately, only the preview versions support CI/CD at all, so I explored some other options and wanted to share how I've used metadata to drive pipelines and copy activities, in the same way I've seen done for notebooks in the past.

It's worth noting that there are plenty of end to end scenarios and examples online already, and this isn't to say copy tasks are always the way to go, but rather to provide some guidance on parameterising the pipeline itself.

Lastly, to make it easier to re-create the content here, I'm splitting this into two parts. This blog works through the initial setup of assets and structure, with the following blog, which I'll link back to once it's published, covering the dynamic content and parameters.

Pre-Reqs & Notes

The only real pre-requisite here is a Fabric capacity (trial or otherwise), and that's it! The asset structure includes: a Sample Warehouse, a Sample Lakehouse, a few stored procedures, and sample data from the diabetes, retail (fact_sale), and NYC Taxi data sets available via the copy assistant. For context, the downstream process combines a Lakehouse and a Warehouse purely because I was testing a scenario for a customer where writing directly to a warehouse from on-premises mandated external staging (ADLS Gen2), so we wrote to a staging lakehouse first; we also required SQL functionality not currently supported by lakehouse SQL analytics endpoints. In production I would consider doing this differently, as you are running two jobs rather than one and storing two copies of the data, but given the jobs here only took a few minutes and we didn't have capacity challenges, those constraints weren't a concern.
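
As a hypothetical illustration of that gap (not necessarily the exact feature we needed), the lakehouse SQL analytics endpoint is read-only for T-SQL, so DDL and DML along the lines of the sketch below have to run against the warehouse instead:

	-- Runs against a Fabric Warehouse, but fails on a lakehouse SQL analytics endpoint (read-only)
	CREATE TABLE [dbo].[example_write_test] ([id] [int] NULL)
	INSERT INTO [dbo].[example_write_test] ([id]) VALUES (1)
	DROP TABLE [dbo].[example_write_test]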

Creating the data assets

I'm briefly going to talk through the assets I created and how they can be replicated, though it's worth noting this is a simplified version and you don't need to replicate my end state job exactly. First, create the blank workspaces - a Dev, Test, and Prod workspace with any relevant prefixes. Within Dev, add a new lakehouse and warehouse (I've titled these "Sample Warehouse" and "SampleLakehouse"). The ETL processes here are simple copy tasks with no transformation, as the aim is to prove the concept of metadata driven pipelines.

Once provisioned, I added a source data pipeline (mine is on GitHub here). I created this by adding copy tasks for three sample data sets, because I wanted something that looked more like a real scenario, moving multiple tables at once.

Create a new data pipeline (new item, data pipeline), then add sample data. I first did this through a copy activity using sample data from the NYC Taxi data set, loading it to the sample lakehouse:

For the purposes of managing this across a number of tables at once, I created copy activities to ingest the diabetes, NYC Taxi, and fact_sale (retail) sample data sets. Ultimately, this looks something like the below:

The next step was to create a pipeline to move this data from the lakehouse to the warehouse. This started by creating another pipeline (named Transformed Pipeline), then working through a copy assistant task and adding the tables as new, but I noticed odd behaviour when running this multiple times and the record counts didn't add up. My initial workaround was to combine a copy task creating the tables as new with a truncate stored procedure, then load to them as existing. Instead, I've created a stored procedure to create the empty warehouse tables. Be sure to run the drop table and create table code once via the warehouse (i.e. the script below excluding the CREATE PROC ... AS wrapper; alternatively, create the procedure and run it via SQL with EXEC dbo.sp_create_table_schemas), then create the stored procedure before creating a new pipeline:

	CREATE PROC [dbo].[sp_create_table_schemas]
	AS
	BEGIN
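	-- Drop and recreate the destination tables so each run of the downstream copy starts from empty tables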
	DROP TABLE IF EXISTS [dbo].[fact_sale]
	DROP TABLE IF EXISTS [dbo].[nyctlc]
	DROP TABLE IF EXISTS [dbo].[diabetes]
	CREATE TABLE [dbo].[fact_sale]
	(
	    [SaleKey] [bigint] NULL,
	    [CityKey] [int] NULL,
	    [CustomerKey] [int] NULL,
	    [BillToCustomerKey] [int] NULL,
	    [StockItemKey] [int] NULL,
	    [InvoiceDateKey] [datetime2](6) NULL,
	    [DeliveryDateKey] [datetime2](6) NULL,
	    [SalespersonKey] [int] NULL,
	    [WWIInvoiceID] [int] NULL,
	    [Description] [varchar](8000) NULL,
	    [Package] [varchar](8000) NULL,
	    [Quantity] [int] NULL,
	    [UnitPrice] [decimal](18,2) NULL,
	    [TaxRate] [decimal](18,3) NULL,
	    [TotalExcludingTax] [decimal](18,2) NULL,
	    [TaxAmount] [decimal](18,2) NULL,
	    [Profit] [decimal](18,2) NULL,
	    [TotalIncludingTax] [decimal](18,2) NULL,
	    [TotalDryItems] [int] NULL,
	    [TotalChillerItems] [int] NULL,
	    [LineageKey] [int] NULL
	)
	CREATE TABLE [dbo].[nyctlc]
	(
	    [vendorID] [int] NULL,
	    [lpepPickupDatetime] [datetime2](6) NULL,
	    [lpepDropoffDatetime] [datetime2](6) NULL,
	    [passengerCount] [int] NULL,
	    [tripDistance] [float] NULL,
	    [puLocationId] [varchar](8000) NULL,
	    [doLocationId] [varchar](8000) NULL,
	    [pickupLongitude] [float] NULL,
	    [pickupLatitude] [float] NULL,
	    [dropoffLongitude] [float] NULL,
	    [dropoffLatitude] [float] NULL,
	    [rateCodeID] [int] NULL,
	    [storeAndFwdFlag] [varchar](8000) NULL,
	    [paymentType] [int] NULL,
	    [fareAmount] [float] NULL,
	    [extra] [float] NULL,
	    [mtaTax] [float] NULL,
	    [improvementSurcharge] [varchar](8000) NULL,
	    [tipAmount] [float] NULL,
	    [tollsAmount] [float] NULL,
	    [ehailFee] [float] NULL,
	    [totalAmount] [float] NULL,
	    [tripType] [int] NULL
	)
	CREATE TABLE [dbo].[diabetes]
	(
	    [AGE] [bigint] NULL,
	    [SEX] [bigint] NULL,
	    [BMI] [float] NULL,
	    [BP] [float] NULL,
	    [S1] [bigint] NULL,
	    [S2] [float] NULL,
	    [S3] [float] NULL,
	    [S4] [float] NULL,
	    [S5] [float] NULL,
	    [S6] [bigint] NULL,
	    [Y] [bigint] NULL
	)
END
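
With the procedure created, re-creating the empty destination tables ahead of a run is a one-liner from the warehouse query editor:

	-- Drops and recreates the empty warehouse tables defined above
	EXEC [dbo].[sp_create_table_schemas]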

Then, I added a copy task through the copy assistant to populate the tables in the warehouse. Select the sample lakehouse and the three relevant tables as the data source, and the sample warehouse as the destination. For each table, select "Load to existing table" (if this doesn't show, make sure you've run the stored procedure above) and click next. Then enable staging, with workspace as the data store type. I unticked the "start data transfer immediately" box, then clicked OK - that's just personal preference, as I like to give everything a once-over before saving and running it myself. Finally, make sure to add a stored procedure activity calling sp_create_table_schemas and connect its on-success output to your copy task, so the tables are recreated empty before each load.
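
Once the Transformed Pipeline has run end to end, a quick way to confirm the drop-recreate-and-load behaviour is to compare row counts between the lakehouse and the warehouse from the warehouse query editor. The sketch below assumes the lakehouse is named SampleLakehouse and that its table names match those created by the stored procedure, so adjust the three-part names to whatever your copy activities actually produced:

	-- Sketch: compare source (lakehouse) and destination (warehouse) row counts
	SELECT 'fact_sale' AS [table_name],
	       (SELECT COUNT(*) FROM [SampleLakehouse].[dbo].[fact_sale]) AS [lakehouse_rows],
	       (SELECT COUNT(*) FROM [dbo].[fact_sale]) AS [warehouse_rows]
	UNION ALL
	SELECT 'nyctlc',
	       (SELECT COUNT(*) FROM [SampleLakehouse].[dbo].[nyctlc]),
	       (SELECT COUNT(*) FROM [dbo].[nyctlc])
	UNION ALL
	SELECT 'diabetes',
	       (SELECT COUNT(*) FROM [SampleLakehouse].[dbo].[diabetes]),
	       (SELECT COUNT(*) FROM [dbo].[diabetes])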

Deployment Pipeline Setup

The last pre-requisite before getting into the metadata driving all of this is setting up the deployment pipeline. Click Workspaces in the ribbon (left hand side), then New deployment pipeline. I've left the deployment stages as-is with Development, Test, and Production, but you can tailor these if needed. Once created, attach the relevant workspace to each stage and make sure to press the green tick to assign it.

You'll see some additional items in mine; these haven't been created just yet, but they're part of the next bit of development. For now, just make sure to select the sample lakehouse and warehouse, plus both the source and transformed pipelines, then deploy to the test workspace. This might take a few minutes on the first deployment, but it usually gets quicker for subsequent deployments once the resources you're changing (e.g. the warehouse) already exist in the target workspace.

I'm not going to run anything in the test workspace; I just wanted to work through this to show what it actually looks like without any metadata driving the deployment. If you open the transformed pipeline in the test workspace, you'll see that the destination is still set to the warehouse in the development workspace.

That brings part 1 to a close. In part 2 of this blog, I'll walk through the changes needed to parameterise the inputs so that tasks automatically associate with the current workspace's assets.