Configuration-driven data pipeline


This article is a solution idea. If you'd like us to expand the content with more information, such as potential use cases, alternative services, implementation considerations, or pricing guidance, let us know by providing GitHub feedback.

This article is a solution idea for creating and maintaining a data pipeline by using a configuration file, which can be written, for example, in JSON format. The file defines the ingestion, transformations, and curation of the data. It's the only file that needs to be maintained for data processing, so business users or operations teams can manage the data pipeline without the aid of a developer.

Apache®, Apache Spark®, and the flame logo are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries. No endorsement by The Apache Software Foundation is implied by the use of these marks.

Architecture

Download a PowerPoint file of this architecture.

Dataflow

  1. Configuration: The metadata of the pipeline defines the pipeline stages, data sources, transformations, and aggregation logic. There are three pipeline stages: staging, standardization, and serving. The aggregation logic can be implemented in Spark SQL.
  2. REST APIs: The REST APIs are used to manage the pipeline configuration. Business users or operations teams can maintain the configuration through a web UI that's built on this API layer.
  3. Framework: The framework loads the configuration files and converts them into Azure Databricks jobs. It encapsulates the complexity of the Spark cluster and job runtime, and it presents an easy-to-use interface so that business users can focus on business logic. The framework is based on PySpark and Delta Lake, and it's created and managed by data engineers. A minimal sketch of such an entry point follows this list.
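
As an illustration of that flow, here's a minimal sketch of how a framework entry point could load the configuration file and walk through its stages. The file path, function names, and structure are assumptions for illustration only, not the actual framework code:

import json
from pyspark.sql import SparkSession

def load_config(path):
    """Load the JSON pipeline definition that business users maintain."""
    with open(path) as f:
        return json.load(f)

def run_pipeline(spark, config):
    """Walk the three medallion stages in order and run each configured step."""
    for stage in ("staging", "standardization", "serving"):
        for step in config.get(stage, []):
            # In the real framework, each step becomes a Spark read, transform,
            # or aggregation, as shown in the code snippets later in this article.
            print(f"Running {stage} step '{step['name']}' -> {step['target']}")

if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()
    # Hypothetical location of the configuration file shown later in this article.
    run_pipeline(spark, load_config("/dbfs/configs/fruit_data_app.json"))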

Components

  • Azure Data Factory loads external data and stores it in Azure Data Lake Storage.
  • Azure Event Hubs accepts streaming data from various sources. Azure Databricks loads streaming data directly from Event Hubs.
  • Data Lake Storage is the data storage layer of the staging, standardization, and serving zones.
  • Azure Databricks is the calculation engine for data transformation. The transformation logic is implemented with Spark SQL.
  • Azure Databricks Client Library provides a convenient interface to the Azure Databricks REST API, which is used to deploy the Azure Databricks job that's converted from the configuration file. (A sketch of such an API call follows this list.)
  • Azure Functions provides a way to implement the API layer that's used to create and deploy Azure Databricks jobs.
  • Azure Pipelines is the Azure DevOps service that provides the pipelines that automate the builds and deployments of the framework. The framework can be built as a wheel (.whl) file and published to Azure Databricks clusters as a library.
  • Power BI is a collection of software services, apps, and connectors that can work together to turn unrelated sources of data into coherent, visually immersive, and interactive insights.
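
To illustrate the job deployment path, the following sketch submits a converted job definition to the Azure Databricks Jobs REST API (version 2.1). The workspace URL, token, cluster ID, and file paths are placeholders, and the raw REST call stands in for the Azure Databricks Client Library:

import requests

DATABRICKS_HOST = "https://<workspace-instance>.azuredatabricks.net"
TOKEN = "<personal-access-token-or-AAD-token>"

# Job settings converted from the configuration file (paths are placeholders).
job_settings = {
    "name": "fruit_data_app",
    "tasks": [
        {
            "task_key": "run_pipeline",
            "spark_python_task": {
                "python_file": "dbfs:/framework/main.py",
                "parameters": ["dbfs:/configs/fruit_data_app.json"],
            },
            "existing_cluster_id": "<cluster-id>",
            "libraries": [{"whl": "dbfs:/libraries/framework-1.0-py3-none-any.whl"}],
        }
    ],
}

response = requests.post(
    f"{DATABRICKS_HOST}/api/2.1/jobs/create",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json=job_settings,
)
response.raise_for_status()
print("Created job:", response.json()["job_id"])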

Scenario details

This section has additional information about applying and implementing configuration-driven pipelines.

The medallion architecture

In the medallion architecture that Azure Databricks introduced, a data pipeline has three stages: staging, standardization, and serving.

  • Bronze/Staging: The data from external systems is ingested and stored. The data structures in this stage correspond to the table structures as they are on the source systems, along with additional metadata columns, such as the date and time of the load and the process ID.
  • Silver/Standardization: The staged data is cleansed and transformed and then stored. It provides enriched datasets that are suited for further business analysis. The master data can be versioned with slowly changing dimension (SCD) patterns, and the transaction data is deduplicated and contextualized by using master data. (A sketch of this step follows this list.)
  • Gold/Serving: The data from the standardization stage is aggregated and then stored. The data is organized in consumption-ready, project-specific databases that are provided by services such as those in the Azure SQL family.
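
As a sketch of the Silver/Standardization step, the following example deduplicates staged transaction data and contextualizes it with master data. The table and column names are hypothetical:

# A hypothetical sketch of a Silver/Standardization step: deduplicate staged
# transactions and enrich them with master (customer) data.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

transactions = spark.table("raw_transactions")  # Bronze/Staging table (hypothetical)
customers = spark.table("customer_master")      # master data table (hypothetical)

standardized = (
    transactions
    .dropDuplicates(["transaction_id"])              # remove duplicate loads
    .join(customers, on="customer_id", how="left")   # contextualize with master data
    .withColumn("load_time", F.current_timestamp())  # add a metadata column
)

standardized.write.format("delta").mode("overwrite").saveAsTable("std_transactions")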

Enterprise data warehouses can have large numbers of existing data pipelines. Data pipelines are usually managed by data engineers who write and maintain the code that implements data ingestion, transformation, and curation. The code is usually written in Spark SQL, Scala, or Python and stored in a Git repository. The data engineers need to maintain the code and deploy the pipelines through complicated DevOps deployment pipelines. As business requirements increase and change, the need for engineering effort can become a bottleneck in data pipeline development. As a result, business users or operations teams might wait a long time to get the data they need.

This solution proposes a data pipeline that's driven by a configuration file. The configuration file can be in JSON format. It specifies the data ingestion, transformation, and curation processes. The configuration file is the only file that needs to be maintained for data processing. In this way, business users or operations teams can maintain the data pipeline without help from developers.

Potential use cases

  • In a manufacturing company, a factory operator wants to ingest recipe data from the on-premises servers in its factories, which number more than 30, and needs a curated view of that data to confirm that it's complete before production can start. The factories can have different data schemas. Configuration-driven data pipelines can simplify the data ingestion and standardization process.
  • A solution provider wants to build a common data platform for its customers. The platform should significantly reduce the engineering effort that's needed to handle various data sources, data schemas, and transformation logic, which helps the solution provider onboard customers rapidly.

Deploy this solution

Configuration file example

The metadata of the pipeline defines the pipeline stages, data sources, transformations, and aggregation logic. Here's an example of a configuration file:

{
  "name": "fruit_data_app",
  "staging": [
    {
      "name": "sales",
      "format": "csv",
      "target": "raw_sales",
      "location": "sales/",
      "type": "batch",
      "output": ["file", "view"],
      "schema": {...}
    },
    {
      "name": "price",
      "format": "csv",
      "target": "raw_price",
      "location": "price/",
      "type": "batch",
      "output": ["file", "view"],
      "schema": {...}
    }
  ],
  "standardization": [
    {
      "name": "fruit_sales",
      "sql": "select price.fruit, price.id, sales.amount, price.price from raw_sales sales left outer join raw_price price on sales.id = price.id",
      "target": "fruit_sales",
      "type": "batch",
      "output": ["file", "view"]
    }
  ],
  "serving": [
    {
      "name": "fruit_sales_total",
      "sql": "select id, fruit, sum(amount*price) as total from fruit_sales group by id, fruit order by total desc",
      "target": "fruit_sales_total",
      "type": "batch",
      "output": ["table", "file"]
    }
  ]
}

The pipeline has three stages: staging, standardization, and serving. The aggregation logic can be implemented in Spark SQL. In this example, there are two Spark SQL statements: the one in the standardization stage joins the price and sales data, and the one in the serving stage totals sales per fruit.
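
To see what these two statements produce, you can run them on a few made-up rows. The sample data here is hypothetical and only mirrors the columns that the configuration expects:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample data for the two staging sources.
spark.createDataFrame(
    [(1, 10), (2, 5), (1, 3)], ["id", "amount"]
).createOrReplaceTempView("raw_sales")

spark.createDataFrame(
    [(1, "apple", 2.0), (2, "banana", 1.5)], ["id", "fruit", "price"]
).createOrReplaceTempView("raw_price")

# Standardization: join sales with price (the "sql" of the fruit_sales step).
spark.sql("""
    select price.fruit, price.id, sales.amount, price.price
    from raw_sales sales
    left outer join raw_price price on sales.id = price.id
""").createOrReplaceTempView("fruit_sales")

# Serving: aggregate total sales per fruit (the "sql" of fruit_sales_total).
spark.sql("""
    select id, fruit, sum(amount*price) as total
    from fruit_sales
    group by id, fruit
    order by total desc
""").show()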

Framework code snippets

Here are some code snippets from the framework, which runs Spark jobs based on the configuration file. The snippets show the streaming path; a batch-mode sketch follows the list.

  • In the staging zone, the framework reads raw data from the source system and stores it in the staging zone of Data Lake Storage. The input location, format, schema, and output options come from the configuration file:
    # Read the raw source files by using the format, schema, and location
    # that are defined in the configuration file.
    df = spark \
        .readStream \
        .format(format) \
        .option("multiline", "true") \
        .option("header", "true") \
        .schema(schema) \
        .load(landing_path+"/"+location)

    # If the configuration requests a table output, stream the raw data
    # into a managed table in the staging zone.
    if "table" in output:
        query = df.writeStream \
            .format(storage_format) \
            .outputMode("append") \
            .option("checkpointLocation", staging_path+"/"+target+"_chkpt") \
            .toTable(target)
    
  • In the standardization zone, the framework transforms data by using the Spark SQL that's defined in the configuration file, and it writes the result to the standardization zone of Data Lake Storage.
    # Run the transformation SQL from the configuration file.
    df = spark.sql(sql)

    # For streaming steps, append the standardized result to a table
    # in the standardization zone.
    if type == "streaming":
        query = df.writeStream \
            .format(storage_format) \
            .outputMode("append") \
            .option("checkpointLocation", standardization_path+"/"+target+"_chkpt") \
            .toTable(target)
    
  • In the serving zone, the framework aggregates data by using the Spark SQL that's defined in the configuration file, and it writes the result to the serving zone of Data Lake Storage.
    # Run the aggregation SQL from the configuration file.
    df = spark.sql(sql)

    # For streaming steps, write the complete aggregation result to a table
    # in the serving zone.
    if type == "streaming":
        query = df.writeStream \
            .format(storage_format) \
            .outputMode("complete") \
            .option("checkpointLocation", serving_path+"/"+target+"_chkpt") \
            .toTable(target)
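
The snippets above show the streaming path. For steps that are marked "type": "batch" in the configuration, the framework could use the batch reader and writer instead. Here's a minimal sketch of a staging step in batch mode; the function and parameter names are illustrative, and the step dictionary is one entry from the staging section of the configuration:

def run_batch_staging_step(spark, step, landing_path, staging_path, storage_format="delta"):
    """Run one staging step in batch mode, mirroring the streaming snippet above.

    Assumes that step["schema"] has already been converted to a StructType
    or a DDL string that Spark can accept.
    """
    df = (
        spark.read
        .format(step["format"])
        .option("header", "true")
        .schema(step["schema"])
        .load(landing_path + "/" + step["location"])
    )

    if "table" in step["output"]:
        # Append the raw batch data to a managed table in the staging zone.
        df.write \
            .format(storage_format) \
            .mode("append") \
            .saveAsTable(step["target"])

    return df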
