Google Data Fusion: Using Cloud Storage file uploads to trigger Data Fusion ETL Pipelines

Justin Taras
6 min read · Jul 14, 2021

TL;DR: Google Cloud Functions and Google Data Fusion’s REST API can be used together to deliver event-based pipeline execution.

Google Data Fusion Pipeline Execution

In most typical deployments of Data Fusion, pipelines are launched using one of the following three methods:

  • In-built Data Fusion scheduler
  • Google Cloud Composer (Fully managed Apache Airflow)
  • 3rd-party schedulers calling the Data Fusion REST APIs

But what about event-based scenarios? What happens if a pipeline needs to start in response to an event, such as a file arriving in a Google Cloud Storage (GCS) bucket? Instead of waiting until the next scheduled run, how can the pipeline be triggered automatically?

Take this simple Data Fusion pipeline:

Simple Data Fusion Batch Pipeline

When this pipeline runs, the GCS source plugin looks for any new files in the designated bucket, passes each file into Wrangler, where it is parsed and formatted, and then writes the results to Google BigQuery. This is a very common ETL pattern used extensively on Google Cloud. How can this simple pipeline be invoked in an event-driven manner?

This article will explore how to take the aforementioned pipeline and make it event-driven using Google Cloud Functions.

Google Cloud Functions Setup

Google Cloud Functions can be deployed with a variety of event trigger types that invoke a function. In this example, the Finalize/Create trigger event is used because it fires when a new file is written to a bucket or a new version of an existing file is created, which makes it a good signal that a file has landed and is ready for processing by Data Fusion.

The Cloud Storage Event Trigger

Google Cloud Functions also lets developers define environment variables to configure a function. Instead of hardcoding values into the function code itself, environment variables can pass those values in at function runtime. In this example, the core settings needed for interacting with the Data Fusion API and PubSub API are supplied as variables. Note: don’t use this method if you need to pass in sensitive information like passwords or access keys. For managing sensitive data, check out Google Secret Manager.
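
As a rough sketch, the function might read this configuration along the following lines. The variable names below are illustrative assumptions, not necessarily the ones used in the original function:

import os

# Illustrative configuration read from Cloud Functions environment variables.
# These names are assumptions for this sketch.
CDAP_ENDPOINT = os.environ.get("CDAP_ENDPOINT")   # Data Fusion instance API endpoint
PIPELINE_NAME = os.environ.get("PIPELINE_NAME")   # name of the deployed pipeline
PUBSUB_TOPIC = os.environ.get("PUBSUB_TOPIC")     # topic that receives run details
PROJECT_ID = os.environ.get("PROJECT_ID")         # project hosting the PubSub topic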

In the Cloud Function Editor, select the runtime of the function (Python 3.7).

When you select the Python runtime, two files will be generated:

  • main.py
  • requirements.txt

The main.py file contains the Python code that powers the function. The requirements.txt file lists the Python library dependencies required for the function to execute. In this example the only library that will be needed is the PubSub client; most of the other core Python libraries that are required are already installed on the function image.

google-cloud-pubsub

The run_job definition is the entry point for the function and is the definition that will be called when the function is executed. Make sure the entry point is set, otherwise the function won’t work.

Remember to set the function Entry Point.

The Cloud Function Code

The Python example code has two definitions. The get_access_token definition is used to generate an access token for the Data Fusion APIs. This is accomplished by querying the Cloud Functions instance metadata server, which returns an access token with an enforced scope.

The run_job definition is the main entry point for the function and is where the API calls to Data Fusion are made. The data from the Cloud Storage event is passed into run_job via the data parameter; this is a JSON object that needs to be parsed to retrieve the bucket and file names.
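
The full gist isn’t reproduced here, but a minimal sketch of the two definitions might look like the following. The token is fetched from the metadata server, and the bucket and object names are read from the event payload; the variable names are assumptions:

import requests

METADATA_TOKEN_URL = ("http://metadata.google.internal/computeMetadata/v1/"
                      "instance/service-accounts/default/token")

def get_access_token():
    # Ask the metadata server for an access token scoped to cloud-platform,
    # issued for the function's runtime service account.
    resp = requests.get(
        METADATA_TOKEN_URL,
        params={"scopes": "https://www.googleapis.com/auth/cloud-platform"},
        headers={"Metadata-Flavor": "Google"})
    resp.raise_for_status()
    return resp.json()["access_token"]

def run_job(data, context):
    # Entry point: 'data' is the Cloud Storage finalize event.
    bucket = data["bucket"]        # bucket that received the new file
    object_name = data["name"]     # path of the object within the bucket
    default_file_name = "gs://{}/{}".format(bucket, object_name)
    # ... build and send the Data Fusion API call using default_file_name ...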

Passing File Details to the Data Fusion Pipeline

Because the function is triggered by files landing on GCS, the file details need to be passed to Data Fusion so the pipeline knows which file to pick up and process. This is done using macros. In the pipeline example, data is read from GCS, wrangled in Wrangler, and written to BigQuery. A macro is placed in the GCS source plugin so the file path can be changed programmatically with each pipeline run.

${my-file} macro in my GCS plugin

When the pipeline is invoked via REST, the macro value needs to be included in the API call. The my-file key in the payload below matches the macro name in the pipeline.

data = '{"my-file":' + default_file_name +'}'

Below, the macro and its corresponding value are passed as a payload in the API call. When Data Fusion receives the call, the payload key/value pairs are matched with the pipeline’s macros and the values are substituted.

r1 = requests.post(post_endpoint, data=data, headers=post_headers)
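
For context, post_endpoint and post_headers in the call above could be assembled roughly as follows. The endpoint shape follows the CDAP REST API for starting a batch pipeline’s workflow; the default namespace, the DataPipelineWorkflow name, and the CDAP_ENDPOINT/PIPELINE_NAME variables are assumptions carried over from the sketches above:

# Assumed CDAP-style start endpoint for the deployed pipeline.
post_endpoint = (CDAP_ENDPOINT + "/v3/namespaces/default/apps/" + PIPELINE_NAME
                 + "/workflows/DataPipelineWorkflow/start")

# Bearer token from the metadata server plus a JSON content type.
post_headers = {"Authorization": "Bearer " + get_access_token(),
                "Content-Type": "application/json"}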

Capturing Pipeline Run Details

Once the Data Fusion pipeline has been triggered, a unique id is generated for that individual pipeline run. The example function code calls the Data Fusion API to retrieve the pipeline run details. These details are written to PubSub so that a downstream service can consume them for monitoring purposes. Cloud Functions are short lived, so monitoring of the Data Fusion pipeline execution shouldn’t be handled by the function itself. See lines 63–80 of the function code for the pipeline run data extraction and publication to PubSub.
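
A hedged sketch of that extraction-and-publication step is shown below. The runs endpoint shape, the helper name, and the CDAP_ENDPOINT, PIPELINE_NAME, PROJECT_ID, and PUBSUB_TOPIC variables (from the earlier sketches) are assumptions; the real function may structure this differently:

import json
import requests
from google.cloud import pubsub_v1

def publish_run_details(post_headers):
    # Retrieve run records for the pipeline's workflow.
    runs_endpoint = (CDAP_ENDPOINT + "/v3/namespaces/default/apps/" + PIPELINE_NAME
                     + "/workflows/DataPipelineWorkflow/runs")
    runs = requests.get(runs_endpoint, headers=post_headers).json()
    latest_run = runs[0]  # assumes the most recent run is listed first; includes 'runid', status, timestamps

    # Publish the run record to PubSub for downstream monitoring.
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(PROJECT_ID, PUBSUB_TOPIC)
    publisher.publish(topic_path, json.dumps(latest_run).encode("utf-8")).result()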

Invoking the Function

Once the function is deployed, just drop a file into the bucket that the Cloud Function event trigger is listening on. The logs for the deployed function should indicate a successful execution.
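
Any upload method works: the Cloud Console, gsutil, or a client library. For example, a quick test with the google-cloud-storage Python client might look like this (the bucket and file names are hypothetical):

from google.cloud import storage

# Upload a sample file to the watched bucket to fire the finalize trigger.
# Bucket and object names here are hypothetical.
client = storage.Client()
bucket = client.bucket("my-landing-bucket")
bucket.blob("incoming/orders.csv").upload_from_filename("orders.csv")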

Logs from Function Execution, Status ‘ok’

A quick look at the deployed pipeline in the Data Fusion UI shows that it is in the “Running” state.

Deployed Data Fusion pipeline “Running”

The pipeline run details can be found in the assigned PubSub subscription. Below is the event from the recently executed pipeline. The “runid” uniquely identifies this run among other pipeline executions.

Data Fusion pipeline details written to PubSub
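
For reference, a downstream consumer could pull these run events from the subscription with a sketch like the following; the project and subscription names are hypothetical:

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
sub_path = subscriber.subscription_path("my-project", "datafusion-runs-sub")

# Pull a handful of messages containing the published run details.
response = subscriber.pull(request={"subscription": sub_path, "max_messages": 5})
for received in response.received_messages:
    print(received.message.data.decode("utf-8"))  # JSON with 'runid', status, timestamps

# Acknowledge so the messages aren't redelivered.
if response.received_messages:
    subscriber.acknowledge(request={
        "subscription": sub_path,
        "ack_ids": [m.ack_id for m in response.received_messages],
    })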

Final Thoughts

Google Cloud Functions is an interesting way to make Data Fusion pipelines event driven. For pipelines that consume data from GCS on a regular basis, Cloud Functions can accelerate downstream delivery of data without manual processes or reliance on a schedule, so long as the pipeline isn’t awaiting any upstream dependencies.

While Cloud Functions does a great job of starting pipelines, additional work is needed to ensure that the pipelines complete successfully and don’t encounter any failures. This would need to be managed by an external system, since ongoing monitoring is outside the realm of short-lived functions.


Justin Taras

I’m a Google Customer Engineer interested in all things data. I love helping customers leverage their data to build new and powerful data-driven applications!