Cloud Data Fusion: Building Job Metadata Pipelines

Justin Taras
12 min readNov 2, 2022

TL;DR Data Fusion creates a wealth of metadata related to pipeline performance and configuration. This article will explore building a pipeline for the purposes of sourcing job related metadata and metrics and storing them in BigQuery for analysis.

The Data Challenge

When a pipeline is executed in Data Fusion, metrics are collected across all the plugins utilized. Metrics like records processed, errors, process time, etc are captured and stored for each pipeline execution. Data Fusion will store these metrics at both an individual run level (run_id) as well as a summary view so you can see how the pipeline performance changes overtime.

Summary View of a Pipelines Execution

By clicking on the individual plugin for a deployed pipeline, you can see performance metrics for that plugin. These metrics are just for the individual pipeline run level, not a summary.

Individual Plugin Metrics for a Pipeline Run

But what if you wanted to do some trending analysis on pipeline run times or whether a certain stage of a pipeline is running longer now than it was in the past. These things can help determine whether you need to deploy a larger Dataproc cluster or add more splits when pulling data from a source database. Other questions may be audit related. What if I have some questions on what parameters were used in last months run of a particular pipeline? For recently launched pipelines these details can be easily acquired in the UI but when you’re dealing with hundreds to thousands of pipelines you need an easier way to retrieve this information.

What most customers end up doing is exporting these metrics to BigQuery for analysis. Data Fusion has a Rest API for basically everything so its pretty easy to retrieve this information. Customer’s usually build a Python script that takes the run id of a job and polls the metrics and jobs API and writes the response to BigQuery. If an analyst or ETL developer has any questions related to a pipeline or a pipelines performance they can go to BigQuery and write SQL to get the details they require.

I was recently thinking: What if we could build a Data Fusion pipeline for collecting these metrics and job details? A job like this could be triggered after pipeline completion (either successful or failure) and load the metadata into BigQuery in an automated way without the need for custom scripts. This could help customers that don’t want to manage any code related to their ETL processes and simplify the collection of this data in an easy programatic way.

This article will explore what this process could look like and some tips and tricks on setting it up.

The Pipeline Overview

Below you’ll find the pipeline I built to accomplish this goal of sourcing job related metadata.

Simple sourcing pipeline

The Pipeline utilizes the following plugins:

  • HTTP Source: Pull data from Metrics and Job Details API’s
  • Wrangler: Flatten the JSON response
  • Multi-Field Adder: Add additional metadata like job name, namespace, run id
  • Joiner: Join job details and job metrics together on run id
  • BigQuery Sink: write results to table

Getting an OAuth2 Refresh Token

The HTTP Source Plugin is used to interact with the REST interfaces. I’m using the built-in OAuth2 functionality of the plugin for authentication. OAuth2 supports using a refresh token that’s used for refreshing access tokens. This means that users don’t have to worry about expiring tokens or sourcing one for each pipeline run.

To get your refresh token you’ll need a Google Cloud user principal. I recommend creating a user specifically for this and not using an individual user account. To retrieve the details for the user, log into the user account via gcloud on your cloud shell.

gcloud auth application-default login

This will provide you with an access code. Click on the link and follow the workflow. Make sure you log in as your Data Fusion user.

Allow Access for you Data Fusion user

You will be prompted with a verification code.

Your Verification Code

Take the code and paste it into the prompt. You’ll then receive a file system path that has all the credentials you will need.

The path to your credentials will look something like this:

/tmp/tmp.gCtm863Cwq/application_default_credentials.json

When you open this file, you should see the following format. For the plugin you’ll need all these elements. Copy these down and save them for later.

{
"client_id": "[your client id]",
"client_secret": "[you client secret]",
"refresh_token": "[refresh token]",
"type": "authorized_user"
}

The reason why I recommend using an ETL user account for this is because the client_id is tied to the principal. Once the principal has been deleted, the refresh token will not work. So if a developer uses their own personal user and they leave the company, the jobs will fail, so be deliberate in choosing a user that won’t expire or be deleted.

Configuring the HTTP Plugin

With your newly acquired OAuth2 credentials, you can now configure your HTTP source plugin. In the first section you fill out the URL and the method of access. For the Metrics API we’ll use the POST method to retrieve our data. The API uses the following url format: DataFusionEndpoint/v3/metrics/query. You can see that I used a macro for the Data Fusion endpoint so I don’t have to hard code it across all my plugins that use it.

Build HTTP API Call

Use the following code to generate your Data Fusion endpoint.

export AUTH_TOKEN=$(gcloud auth print-access-token)
export INSTANCE_ID=[instance name]
export CDAP_ENDPOINT=$(gcloud beta data-fusion instances describe \
--location=[region] \
--format="value(apiEndpoint)" \
${INSTANCE_ID})

Next we define the request body. For this API, we’re submitting a query to retrieve our job metrics. To target a unique job run we need to provide the namespace, job name, and the job run_id. We also need to define what plugins we want to collect metrics for. The API has metrics for every plugin for each job run. At a minimum I recommend collecting metrics for all Sources, Sinks, and Wrangler Plugins.

Here is a list of the available metics to collect data from. I recommend at a minimum, use the records.in/out/error and process.time.total per plugin.

  • records.in
  • records.out
  • records.error
  • process.time.total
  • process.time.avg
  • process.time.max
  • process.time.min
  • process.time.stddev

As the last step we need to format the plugins to pull the metrics for. The format looks like the following. Note that the Plugin Label used above can be found in the Label field in each plugin configuration.

user.[Plugin Label].[metric to collect]

For example, here’s what a fully constructed metric would look like where GCS is the label of the plugin and records.out is the metric I want to collect.

user.GCS.records.out

Below you’ll find my complete Request Payload with fully defined metrics. Note in my example I’m collecting data for every metric option across all my plugins(2x) for my simple job that reads from GCS and then writes to GCS. I left the metrics part hard coded for the purposes of this exercise but in reality you would want this stored as a macro so you can pass in the pipeline plugins/metrics you want to collect data on for each pipeline execution.

{
"query": {
"tags": {
"namespace": "${namespace}",
"app": "${pipeline_name}",
"workflow": "DataPipelineWorkflow",
"run": "${runid}"
},
"metrics": [
"user.WriteGCS.records.in",
"user.WriteGCS.records.out",
"user.WriteGCS.records.error",
"user.WriteGCS.process.time.avg",
"user.WriteGCS.process.time.min",
"user.WriteGCS.process.time.total",
"user.WriteGCS.process.time.max",
"user.WriteGCS.process.time.stddev",
"user.GCS.records.in",
"user.GCS.records.out",
"user.GCS.records.error",
"user.GCS.process.time.total",
"user.GCS.process.time.avg",
"user.GCS.process.time.max",
"user.GCS.process.time.min",
"user.GCS.process.time.stddev"], "timeRange" : {
"aggregate": true
}
}
}

You can find more information about the metrics API in the Google Cloud Documentation and the CDAP documentation.

Lastly we can configure the OAuth2 authentication. Paste in your ClientID, ClientSecret and Refresh Token. You’ll also need the following elements below.

Configure OAuth2 Authentication

For the data format of the actual API response I used the text format. This allows me to parse the data downstream with Wrangler.

Use Text Format

When you preview the the pipeline, the HTTP Plugin output should look like this:

Super long JSON object

Wrangler Configuration

With the data collected we can now use Wrangle to parse the JSON to flatten it out. The parser handles nested objects pretty well so you can unnest the data to get to the elements you need. Here’s a list of my used directives.

parse-as-json :body 2
parse-as-json :body_query_series 1
parse-as-json :body_query_series 1
drop :body_query_series_grouping
parse-as-json :body_query_series_data 1
parse-as-json :body_query_series_data 1
drop :body_query_startTime
drop :body_query_endTime
drop :body_query_resolution
drop :body_query_series_data_time
rename body_query_series_metricName metricName
rename body_query_series_data_value metricValue

Note that I dropped a coupe of fields because they weren’t pertinent to the analysis I wanted to do downstream. startTime, endTime, resolution and data_time are needed when you are aggregating at a certain time grain. In our example we’re aggregating at the job level, not a a particular interval(1s,1m,1hr, etc).

The output looks really clean. We can see the source read in records was 74 and the total process time for the GCS source plugin was 2408 milliseconds. Pretty quick!

You’re probably wondering how I developed the transformations in Wrangler. Since HTTP isn’t a connection profile in Wrangler we have to use a small sample extract for building our transformations. I took the response from one of my metrics API calls, put it in a file and uploaded to wrangler. You can see it in my Wrangler image above (look for red box). I took the JSON response and I flattened it in the file so the JSON object would be on one single line. That way, when wrangler looks for newline delimiters I get the full message. With this sample record I can then define my transformations with ease.

To figure out how to build your API calls with curl to generate your sample data, check out the API reference page for Data Fusion.

Field Adder Configuration

I use the multi-field adder plugin extensively because you can add new fields to your pipeline and populate them with runtime parameters. Here I’m adding critical metadata elements that will help in future analysis. While I stopped at runid, namespace and pipeline_name you can add other pipeline elements like the run date, caller application, runtime user, where the data is sources, target tables, etc. You can really make this what you need it to be. You can even use key:value pairs for situations where you may have dynamic metadata.

Use the multi-field adder plugin to add critical metadata

The Joiner Configuration

In my pipeline I added an additional HTTP Call to the Job Details API. I thought it would be helpful to enrich my metrics data with the configuration details of the job, Dataproc cluster, and other runtime metrics like status, job start and end time. The one unifying element of both these feeds is runid so I use that to join the two streams together. Plus this prevents having to run a second pipeline to get the jobs details.

Joiner Configuration

Data In Big Query

Once the pipeline has successfully run, you should see data now present in the target table.

Some of the data I left in the original record format because of its dynamic nature. We can still work with that data by using JSON string functions in BigQuery. In the example below, I’m looking for the number of Dataproc worker nodes used in my cluster for runid fca3922c-53a9–11ed-b4f2–16a72ab8f11e.

select distinct JSON_EXTRACT(dataproc_cluster_status, '$.numNodes')  FROM `generalproject-340815.demo_data.df_metrics2` where runid='fca3922c-53a9-11ed-b4f2-16a72ab8f11e'

You can also begin building reports that look at trending for individual pipelines or pipeline components. Below is a simple SQL that looks at the max process time across the history of a single pipelines GCS sink plugin. With the date aspect, you can then pull this data into Google Data Studio for further analysis.

select
AVG(metricValue) AS avgValue
MIN(metricValue) AS minValue ,
MAX(metricValue) AS maxValue,
metricName,
CAST(job_start AS DATE) AS job_start_dt
FROM demo_data.df_metrics2
WHERE metricName='user.WriteGCS.process.time.max'
AND pipeline_name = 'GCS2GCS'
GROUP BY job_start_dt, metricName

As you can see from the example, the more plugins you have are multiplied by the number of pipeline runs. For instance, if you’re collecting metrics for a pipeline w/ 5 plugins (collecting 5 metrics) and the pipeline runs 5 times a day, a years worth of data will be over 45,000+ records for this single pipeline. With a couple of hundred pipelines you could be generating tens of millions of records a year. One way to make accessing these runs more efficiently is to use a table with partitioning or clustering.

  • Partitioning by job date will help break up the table so you can quickly run reports based on time range. This is really nice strategy when analysis over large periods of time. Note you can only partition by a date/timestamp/datetime field.
  • Clustering by runid will enable BigQuery to quickly identify the records you’re looking for. It’s another IO elimination technique by telling Big Query where to exactly look for data. This is optimal for finding the records quickly w/o unnecessary IO. Most fields are supported for clustering.
  • Use Both! You can both partition and cluster your data to make accessing as efficient and fast as possible.
  • Materialized Views can be used on pre-aggregate common query patterns. For instance, you have a report that looks at the average runtimes of a set of jobs by week. You could encapsulate that logic into a materialized view and queries would use the materialized view instead of computing the result. You can also partition and cluster on these to make them even more performant.

Note that my schema design here is purely informational and by no means a standard. It’s meant to give you a starting point on how to model your job related metadata so you can being thinking about reports and data points you want to collect and analyze.

Running the pipelines with Composer

Composer is the recommended service within Google Cloud to schedule your pipelines and more importantly in this case, makes it easy to get the runid of any previously executed pipeline. To get the runid of a previous run use the output variable of the CloudDataFusionStartPipelineOperator. The operator keeps track of the runid so it can track the state of the pipeline.

Composer DAG

Below is a snippet of the Composer DAG. Note that I have a python definition for constructing the pipeline arguments dictionary. I’m assigning the pipeline0.output variable to my runid key. Thats how we’re mapping the runid to downstream pipelines.

def set_runtime_params():
args1 = {}
args1['pipeline_name'] = PIPELINE_NAME0
args1['namespace'] = NAMESPACE
args1['endpoint'] = CDF_ENDPOINT
args1['runid'] = pipeline0.output
print(pipeline0.output)
return args1
with models.DAG(
"call_jobs_1", # Dag id
schedule_interval=None,
start_date=days_ago(1)
) as dag:pipeline0 = CloudDataFusionStartPipelineOperator(
location=REGION,
pipeline_name=PIPELINE_NAME0,
instance_name=INSTANCE_NAME,
runtime_args=args0,
success_states=[PipelineStates.COMPLETED],
pipeline_timeout=3600,
namespace=NAMESPACE,
task_id="start_pipeline",
)
pipeline1 = CloudDataFusionStartPipelineOperator(
location=REGION,
pipeline_name=PIPELINE_NAME1,
instance_name=INSTANCE_NAME,
runtime_args=set_runtime_params(),
success_states=[PipelineStates.COMPLETED],
pipeline_timeout=3600,
namespace=NAMESPACE,
task_id="get_job_metadata",
)
pipeline0 >> pipeline1

If you don’t use Composer but rely on external schedulers fear not! The REST API’s that Data Fusion exposes can be used to start pipelines. To retrieve your runid, make sure you use the bulk API to submit start jobs. The response will be the runid you can parse out and use to start the metadata extraction pipeline.

Code Samples

If you want to find a sample plugin for this example, check out my gist’s.

Concluding Thoughts

The one limitation with this approach of metadata extraction is that you have to spin up a Dataproc instance to pull this information. To minimize cost, this can be a very small cluster with minimal hardware. For instance, here is my pricing calculator of a minimal configuration of 2 nodes (1m, 1w) of n1-standard4 (You could probably just do a 1 node cluster or n1standard4 but the pricing calculator doesn’t support that config). This comes to $.51/hr. A pipeline should run in 6 or so minutes so you’re looking at $.051 per pipeline run.

With hundreds of pipelines running throughout the day, you can see how this can have an additive financial effect. One simple thing you can do to reduce cost is enable cluster reuse. With cluster reuse, Dataproc clusters that are idle from other pipeline jobs can be used to run the metadata extraction jobs so long as they use the same cluster profile. This will reduce the number of clusters that need to be provisioned.

The alternative approach to one seen in this article is to build a python script that does this same extraction. Its not too terribly difficult but for customers looking for a no-code solution, the pipeline method provided in this article may prove more valuable. In a future article we’ll explore what one of this would look like using python to collect these metrics and store in BigQuery.

Happy Pipeline Building!

--

--

Justin Taras

I’m a Google Customer Engineer interested in all things data. I love helping customers leverage their data to build new and powerful data driven applications!