An end-to-end sample of data processing, with the results viewed in Power BI.
Contoso is an organization with multiple factories and multiple data models. Each factory uploads data periodically to a storage account. Contoso is looking for a cost-effective solution that gives its analytical team a better view of the data.
Contoso has already developed a component named ControlBox. Its capabilities (out of scope for this sample) are:
- Authenticate and authorize factories.
- Provide factories with a SAS token, used by the factory to upload periodic data.
- Register each new file uploaded to the storage account in a control table.
- Update the control table each time a file is processed.
The following diagram illustrates the solution implemented by Contoso. It leverages serverless computing for data movement, cleansing, restructuring, and reporting.
As part of the sample, we included Bicep code that creates the minimum resources required for it to run.
The following are the prerequisites for deploying this sample:
Note: Using Power BI is an optional way to visualize the data.
- Create a resource group in which the resources will be created.
- Clone or fork this repository.
- Edit the `deploy/bicep/param.json` file and provide your values; they should be self-explanatory. Note: The `suffix` will be used to create the Synapse and storage instances under a unique namespace in Azure. If the suffix is already in use, please choose another one. Another note: The default setup is for a publicly open solution; this is why the start and stop IPs allow any IP address to access Synapse.
- Open a command line, go to `sample-mdw-serverless/deploy/bicep`, and run `az deployment group create --resource-group <your rg name> --template-file main.bicep --parameters @param.json`. This operation may take a few minutes to complete.
- Open the newly created Synapse workspace.
- Point the Synapse workspace to the cloned/forked repository as shown in this document.
- In the workspace, go to Manage > Linked services > medallion_storage > Parameters > suffix and set the same value you provided in the Bicep `param.json`. Once updated, the value is reflected in all affected integration datasets.
- Run the 'Copy Data Samples' pipeline. This copies the control file and the data samples to your local repository. See details. Note: You can use Debug to get started quickly, or set up a trigger as described here.
- Run the 'Process Factories Data' pipeline. This runs the Bronze to Silver transformations per factory and per data model. See details.
- Go to Develop > SQL scripts > Factories and open the `InitDB` script.
- Run the first commands against the `master` database.
- Run the remaining commands in order against the newly created DB. This will run the Silver to Gold data transformations. See details.
- Open the `Create-External-Tables` script, replace the `suffix` with the one used throughout the sample and the SAS token with one that can access your storage account, then run the commands in order.
- Open Power BI Desktop and follow the steps in this document to connect your Gold Data Lake storage to Power BI Desktop.
- Optionally, you can set up an automated DevOps pipeline using these instructions.
The sample files consist of daily dropped data in zip format. Each zip file contains a data file with one JSON object per line:
{"dataModelName":"data_model_1","operation":"U","factory":1354010702,"lineId":14871,"date":"2022-06-22T00:00:00","feature1":1,"dim":73,"yield":37307}
{"dataModelName":"data_model_1","operation":"U","factory":1354010702,"lineId":14872,"date":"2022-06-22T00:00:00","feature1":1,"dim":73,"yield":37306}
{"dataModelName":"data_model_1","operation":"U","factory":1354010702,"lineId":14873,"date":"2022-06-23T00:00:00","feature1":1,"dim":73,"yield":37305}
{"dataModelName":"data_model_1","operation":"U","factory":1354010702,"lineId":14874,"date":"2022-06-23T00:00:00","feature1":1,"dim":73,"yield":37304}
{"dataModelName":"data_model_1","operation":"U","factory":1354010702,"lineId":14875,"date":"2022-06-23T00:00:00","feature1":1,"dim":73,"yield":37303}
{"dataModelName":"data_model_1","operation":"U","factory":1354010702,"lineId":14876,"date":"2022-06-24T00:00:00","feature1":1,"dim":73,"yield":37302}
{"dataModelName":"data_model_1","operation":"U","factory":1354010702,"lineId":14877,"date":"2022-06-24T00:00:00","feature1":1,"dim":73,"yield":37307}
{"dataModelName":"data_model_1","operation":"U","factory":1354010702,"lineId":14878,"date":"2022-06-24T00:00:00","feature1":1,"dim":73,"yield":37300}
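Reading such a drop amounts to unzipping the daily file and parsing one JSON object per line. A minimal Python sketch (the inner file name `daily.json` is an assumption for illustration; field values are taken from the sample above):

```python
import io
import json
import zipfile

def read_daily_zip(zip_bytes, data_file="daily.json"):
    """Extract a data file from a daily zip and parse one JSON object per line."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        with zf.open(data_file) as f:
            return [json.loads(line) for line in f if line.strip()]

# Build an in-memory zip mimicking a factory drop (for illustration only).
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr(
        "daily.json",
        '{"dataModelName":"data_model_1","operation":"U","factory":1354010702,'
        '"lineId":14871,"date":"2022-06-22T00:00:00","feature1":1,"dim":73,"yield":37307}\n')

records = read_daily_zip(buf.getvalue())
print(records[0]["dataModelName"])  # data_model_1
```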
A control table is used to store information about the data uploaded into the bronze layer. This table stores the location of each uploaded file per factory, the data model, the upload date, and whether the file has already been processed.
| FactoryID | DataModelName | FileLocation | UpdateDate | Processed |
|---|---|---|---|---|
| 1354010702 | data_model_1 | factory=1354010702/dataModelName=data_model_1/y=2022/m=06/d=25 | 2022-06-25 | false |
| 1354010702 | data_model_2 | factory=1354010702/dataModelName=data_model_2/y=2022/m=06/d=25 | 2022-06-25 | true |
| 1353534654 | data_model_1 | factory=1353534654/dataModelName=data_model_1/y=2022/m=06/d=26 | 2022-06-26 | true |
| ... | ... | ... | ... | ... |
Every time a new file lands in the bronze layer, or an existing file is processed, this table must be updated automatically by another process (out of scope for this sample).
Note: To keep this sample simple, the control information is hardcoded in a JSON file named dropped_files.json (manual edits to the control JSON file can be made directly from the portal). However, for production this is an anti-pattern, and we strongly advise using a metadata table together with a process that updates it automatically.
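The entries in dropped_files.json presumably mirror the control table columns. A hedged Python sketch of selecting the unprocessed files for one data model (the JSON field names below are assumptions based on the table, not the sample's exact schema):

```python
import json

# Hypothetical contents of dropped_files.json, mirroring the control table columns.
dropped_files = json.loads("""
[
  {"FactoryID": 1354010702, "DataModelName": "data_model_1",
   "FileLocation": "factory=1354010702/dataModelName=data_model_1/y=2022/m=06/d=25",
   "UpdateDate": "2022-06-25", "Processed": false},
  {"FactoryID": 1354010702, "DataModelName": "data_model_2",
   "FileLocation": "factory=1354010702/dataModelName=data_model_2/y=2022/m=06/d=25",
   "UpdateDate": "2022-06-25", "Processed": true}
]
""")

def unprocessed(entries, data_model):
    """Return control entries for one data model whose files are not yet processed."""
    return [e for e in entries
            if e["DataModelName"] == data_model and not e["Processed"]]

pending = unprocessed(dropped_files, "data_model_1")
print(len(pending))  # 1
```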
The data from the different factories lands in the same storage account. The storage account has a container per layer of the Medallion Architecture: bronze, silver, and gold. Inside each container there is a folder per factory, per data model, and per day. See the following example:
<your_storage>/bronze/factory=1782/dataModelName=data_model_1/y=2022/m=07/d=24
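The folder layout above is deterministic, so a small helper can construct it. A sketch in Python (the function name is ours, for illustration only):

```python
from datetime import date

def bronze_path(factory_id, data_model, day):
    """Build the bronze-layer folder for one factory/model/day,
    following the partition layout shown in the example above."""
    return (f"factory={factory_id}/dataModelName={data_model}/"
            f"y={day.year}/m={day.month:02d}/d={day.day:02d}")

p = bronze_path(1782, "data_model_1", date(2022, 7, 24))
print(p)  # factory=1782/dataModelName=data_model_1/y=2022/m=07/d=24
```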
In the Synapse workspace, a Lookup activity reads the control table information. There is one ForEach() activity per data model that iterates over all factories with unprocessed files. For each factory and data model, the relevant business logic is applied. To keep this sample generic, the files are simply copied from bronze to silver and converted to Parquet format.
Inside each ForEach() activity there is an IfCondition() activity, which filters the unprocessed data for the specific data model.
Each type of file has to be mapped at least once. While this process might be tedious, you will need to spend time on it to ensure that all the necessary fields are assigned the right type and saved during the sink. Additional fields (e.g., calculated/derived) can also be added in this tab. In particular, to extract the nested JSON values (such as the date) you have to map them to a type in the Mapping tab of the Copy() activity.
In some cases, daily files may contain data from previous dates. In such scenarios it is recommended to alter the directory structure so that it reflects the right location/partition.
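One way to repartition late-arriving rows can be sketched in Python: group each record by the date embedded in it, so a row lands in the partition matching its actual date. Field names come from the sample data; the grouping logic itself is our assumption, not the sample's exact code:

```python
from collections import defaultdict

def partition_by_date(records):
    """Group JSON-line records by the date embedded in each record,
    so rows from previous days land in the partition for their actual date."""
    buckets = defaultdict(list)
    for rec in records:
        day = rec["date"][:10]           # "2022-06-22T00:00:00" -> "2022-06-22"
        y, m, d = day.split("-")
        buckets[f"y={y}/m={m}/d={d}"].append(rec)
    return dict(buckets)

records = [
    {"date": "2022-06-22T00:00:00", "lineId": 14871},
    {"date": "2022-06-23T00:00:00", "lineId": 14873},
    {"date": "2022-06-23T00:00:00", "lineId": 14874},
]
parts = partition_by_date(records)
print(sorted(parts))  # ['y=2022/m=06/d=22', 'y=2022/m=06/d=23']
```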
Read more on this function here.
When calling the Azure Function ('bronze2silver - Azure Function' pipeline), you need the following POST payload defined in the activity, using dynamic content:
@concat('{',
'"file_name"',':','"',item().FileLocation,'/daily.zip"', ',',
'"source_container"',':','"',pipeline().parameters.source_container,'"', ',',
'"target_container"',':','"',pipeline().parameters.target_container,'"', ',',
'"source_cs"',':','"',activity('Get CS from AKV').output.value,'"', ',',
'"target_cs"',':','"',activity('Get CS from AKV').output.value,'"',
'}'
)
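For reference, the @concat expression above produces a JSON body of roughly the following shape. The values here are placeholders; the same Key Vault secret feeds both connection-string fields, as in the expression:

```python
import json

# Placeholder values standing in for the pipeline parameters and activity outputs.
item_file_location = "factory=1354010702/dataModelName=data_model_1/y=2022/m=06/d=25"
source_container, target_container = "bronze", "silver"
storage_cs = "<connection string from Key Vault>"

payload = {
    "file_name": f"{item_file_location}/daily.zip",
    "source_container": source_container,
    "target_container": target_container,
    "source_cs": storage_cs,  # same Key Vault secret for source and target
    "target_cs": storage_cs,
}
body = json.dumps(payload)
print(body)
```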
As an alternative to the Azure Function, there is also the option to leverage a Notebook ('bronze2silver - Notebook' pipeline). The code also addresses the scenario where the directory structure must be altered to reflect the right location/partition. This option is recommended when the amount of data to be processed is large (e.g., an initial load).
The linked service to the storage account is used to write the files to the silver container and save the data in a parquet format. The original directory structure is kept.
The parquet files can be queried using Synapse Serverless SQL Pool. See the following example:
SELECT *
FROM
OPENROWSET(
BULK 'https://<storage-account-name>.dfs.core.windows.net/<container>/<folder>/**',
FORMAT = 'PARQUET'
) AS [result]
As described in this document, there are a few initialization activities. In the following sections, a Serverless SQL pool is used.
-- Create a DB
CREATE DATABASE <db_name>
-- Create Master Key (if not already created)
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<password>';
-- Create credentials
CREATE DATABASE SCOPED CREDENTIAL [factories_cred]
WITH IDENTITY='SHARED ACCESS SIGNATURE',
SECRET = ''
In order to create a SAS token, you can follow this document. Alternatively, if you want a single scoped credential that can be used for the entire storage account, it can be created using the portal:
- Click on 'Shared access signature' in the Security + networking blade:
- Select the required operations, IP restrictions, dates, etc.:
The following statement needs to be executed once per workspace:
IF NOT EXISTS (SELECT * FROM sys.external_file_formats WHERE name = 'SynapseParquetFormat')
CREATE EXTERNAL FILE FORMAT [SynapseParquetFormat]
WITH ( FORMAT_TYPE = PARQUET)
GO
The following creates an external data source, which will host the gold tables.
IF NOT EXISTS (SELECT * FROM sys.external_data_sources WHERE name = 'gold')
CREATE EXTERNAL DATA SOURCE [gold]
WITH (
LOCATION = 'abfss://<gold container>@<storage account>.dfs.core.windows.net'
)
GO
Finally, let's make use of the resources and data created by creating the external table. This sample essentially copies the entire content of all Parquet files into a single table; this is the place where additional aggregations and filtering can be applied.
CREATE EXTERNAL TABLE table_name
WITH (
LOCATION = '<specific location within the gold container>/',
DATA_SOURCE = [gold],
FILE_FORMAT = [SynapseParquetFormat]
)
AS
SELECT *
FROM
OPENROWSET(
BULK 'https://<storage account>.dfs.core.windows.net/<silver container>/<folder>/**',
FORMAT = 'PARQUET'
) AS [result]
After this activity is completed, you can access the table using the serverless SQL pool or from Power BI.