Cloud Data Fusion: Building Config Driven Pipelines

Justin Taras
6 min read · Jan 28, 2022

TL;DR: Cloud Data Fusion uses macros to enable developers to build configuration-driven data pipelines.

Macros

Macros are variables within Data Fusion that are set at runtime, when the pipeline runs. They allow developers to configure pipelines at runtime instead of hard coding values. If a configuration changes, such as when a pipeline moves from development to production, a macro is an easy way to handle the change by swapping out system information at runtime. While there are several ways of setting a macro value at runtime, a common one is the Argument Setter Action Plugin.
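
For illustration, a plugin property that would normally hold a hard-coded value can instead hold a macro reference written with the ${} placeholder syntax. The fragment below is a hypothetical excerpt of an exported pipeline spec, included only to show the placeholder form (the plugin and property names are examples, and the exact spec Data Fusion generates may differ):

{
  "plugin" : {
    "name" : "GCSFile",
    "type" : "batchsource",
    "properties" : {
      "path" : "${input.path}"
    }
  }
}

At runtime, Data Fusion substitutes whatever value was supplied for input.path before the plugin executes.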

Using the Argument Setter Plugin

The Argument Setter is a type of action plugin that reads input from a database or Cloud Storage and uses that output to supply values for macros. This allows developers to configure elements of a pipeline externally to the pipeline itself. Take the scenario of adding a new column to a pipeline. If the input and output schemas, as well as the wrangler directives, are stored as macros, the pipeline can be updated just by changing the macro argument file. Take this simple pipeline as an example:

Simple Config Driven Pipeline

The pipeline itself is super simple: the GCS plugin reads a file, wrangler parses it, and the result is loaded into BigQuery.

The GCS Argument Setter plugin sources data from a user-configured JSON file stored on GCS. The macros in the pipeline pull their values from the arguments in this file.

Here is what the argument file looks like:

{
  "arguments" : [
    {
      "name" : "my.recipe",
      "type" : "array",
      "value" : [
        "parse-as-csv :body ',' false",
        "drop :body",
        "set-type :body_3 integer",
        "parse-as-simple-date :body_4 yyyy-MM-dd HH:mm:ss"
      ]
    },
    {
      "name" : "bq.dataset",
      "value" : "demo_data"
    },
    {
      "name" : "input.path",
      "value" : "gs://rds-sample-data/Retail_Data_Transactions.csv"
    },
    {
      "name" : "bq.table",
      "value" : "my_table_macros"
    },
    {
      "name" : "my.schema",
      "type" : "schema",
      "value" : [
        {"name": "body_1", "type": "string", "nullable" : true},
        {"name": "body_2", "type": "string", "nullable" : true},
        {"name": "body_3", "type": "int", "nullable" : true},
        {"name": "body_4", "type": "timestamp", "nullable" : true}
      ]
    }
  ]
}

The GCS Argument Setter is sourcing 5 different arguments for the pipeline (in order of appearance):

  • The wrangler directives (recipe)
  • The BigQuery dataset name
  • The input path of the CSV file to be processed
  • The BigQuery table name
  • The input/output schema for the pipeline

Building the argument files can be a bit tricky. Please refer to this github documentation for examples of how to build them. *Note that the name provided for each argument is the name used for the corresponding macro. This is how the macros map back to the argument file.
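
As a minimal sketch of that mapping, an argument entry like the one below is what a plugin field referencing ${bq.dataset} resolves against:

{
  "arguments" : [
    {
      "name" : "bq.dataset",
      "value" : "demo_data"
    }
  ]
}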

Once the argument file has been created, load it into GCS so that it can be accessed by Data Fusion. In the Argument Setter plugin, the location of the argument file is referenced in the Path field.

Set the path of the argument JSON file

*Note that if the Path field were itself a macro, the path value could be passed in at runtime and could change dynamically.

In the GCS Source plugin, the Path field is supplied by a macro. The macro name (input.path) is what maps it to the corresponding argument supplied by the Argument Setter plugin.

GCS Source Plugin: add macro to Path

For the wrangler plugin, click the macro button in the red box below and add the macro in the recipe text field.

Wrangler Plugin: Add macro to directives

The schema for the wrangler plugin is handled in a similar manner: the Actions dropdown has a Macro option that creates a text box where the macro can be supplied. The input schema of the next downstream plugin will inherit the macro schema supplied here.

Wrangler Plugin: Add macro to output schema

In the BigQuery Sink, macros are supplied for the dataset and table.

BigQuery Sink Plugin: Set Macro for Dataset and Table
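
Conceptually, the sink’s dataset and table properties now hold macro references rather than literal names, along the lines of this hypothetical excerpt (the plugin and property names are illustrative and may differ from the exact spec Data Fusion generates):

{
  "plugin" : {
    "name" : "BigQueryTable",
    "type" : "batchsink",
    "properties" : {
      "dataset" : "${bq.dataset}",
      "table" : "${bq.table}"
    }
  }
}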

The output schema for the BigQuery table must also be set. Since the input schema is the same as the output table’s schema, the same my.schema macro is used here as well.

Use the my.schema macro for the BigQuery Output Schema

With all the macros in place, the pipeline can be previewed before deployment. Action plugins don’t run in preview mode, so the macro values have to be substituted manually. Setting them prior to previewing is a great way to test out changes before deploying a pipeline.

Add macro values when testing in preview mode
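
For this example, the scalar values to plug in during preview would look something like the key/value pairs below (shown here as JSON only for readability; the preview UI takes them as individual fields, and the recipe and schema macros would likewise need values in whatever serialized form those fields expect):

{
  "input.path" : "gs://rds-sample-data/Retail_Data_Transactions.csv",
  "bq.dataset" : "demo_data",
  "bq.table" : "my_table_macros"
}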

Once the preview has been validated, the pipeline can be deployed and scheduled. You now have a pipeline that can be dynamically configured without having to redeploy it! The next section looks at modifying the arguments to adapt to changing pipeline requirements.

Loaded data in BigQuery

Updating the Argument Configuration

Now what happens if the source file changes? Let’s take the scenario presented earlier of a new field being added to a source file.

Below is the new argument JSON file needed to accommodate the file change. The changes for the new field can be found in the my.recipe, bq.table, input.path, and my.schema arguments.

{
  "arguments" : [
    {
      "name" : "my.recipe",
      "type" : "array",
      "value" : [
        "parse-as-csv :body ',' false",
        "drop :body",
        "set-type :body_3 integer",
        "parse-as-simple-date :body_4 yyyy-MM-dd HH:mm:ss",
        "set-type :body_5 integer"
      ]
    },
    {
      "name" : "bq.dataset",
      "value" : "demo_data"
    },
    {
      "name" : "input.path",
      "value" : "gs://rds-sample-data/Retail_Data_Transactions_addfield.csv"
    },
    {
      "name" : "bq.table",
      "value" : "my_table_macros_add"
    },
    {
      "name" : "my.schema",
      "type" : "schema",
      "value" : [
        {"name": "body_1", "type": "string", "nullable" : true},
        {"name": "body_2", "type": "string", "nullable" : true},
        {"name": "body_3", "type": "int", "nullable" : true},
        {"name": "body_4", "type": "timestamp", "nullable" : true},
        {"name": "body_5", "type": "int", "nullable" : true}
      ]
    }
  ]
}

Job Argument Changes:

  • my.recipe: A new directive is added to cast the new field from a string to an integer.
  • input.path: The path to the new source file. In practice the file name would likely stay the same, just with a new field added; it’s changed here to make the scenario easier to follow.
  • my.schema: The schema is updated to include the new field.
  • bq.table: The table name for the new table. If you want to add the field to the existing table instead, make sure the Update Table Schema parameter is set to True.

The only change that needs to be made to this pipeline is updating the argument file on GCS. No pipeline redeployments! With those small changes, the pipeline can be run with the new arguments. Below is the new table populated with data from the new source file.

New table with data from updated source file

Concluding Thoughts

Using macros to programmatically configure pipelines in Data Fusion can be incredibly powerful. This model is ideal for situations where a common pattern or pipeline design is repeated across many sources. Decoupling the configuration from the physical pipeline allows a single pipeline to be reused across many different data sources: instead of managing many individual pipelines, all that needs to be managed are the individual configuration files.

For more information on macros and creating reusable pipelines, see the following link:

https://cdap.atlassian.net/wiki/spaces/DOCS/pages/1188036697/Macros+and+macro+functions

Justin Taras

I’m a Google Customer Engineer interested in all things data. I love helping customers leverage their data to build new and powerful data-driven applications!