Cloud Data Fusion: Reverse ETL from BigQuery to CloudSQL

Justin Taras
9 min read · May 19, 2022

TL;DR Traditional ETL is all about moving data from operational systems into a system of truth like a data warehouse. The reverse ETL model moves data out of systems of truth back into operational systems to serve actionable insights to end users. Data Fusion can serve users in both capacities, and this article will look at a reverse ETL pipeline out of BigQuery into a CloudSQL Postgres database.

Reverse ETL

ETL has traditionally been used to move data from operational systems into a system of truth like a data warehouse. In the warehouse, data from different source systems is transformed and integrated into analytical data models that are presented to BI platforms like Looker or Tableau. But what happens when a user has created an insight they’d like to share beyond traditional reporting tools? What if they want to add it back to their operational systems? For example, a business analyst creates a lifetime value metric for their customers and would like to add it to their CRM application. Adding this metric to the CRM app would give their sales team better information. In this scenario, data would need to be pulled out of the data warehouse, transformed and modeled to fit the application’s data model, and then loaded into the application database.

Fundamentally, is a reverse ETL pipeline any different from a traditional ETL pipeline? No. What’s different is usually the type of connections used for the target systems. Whereas ETL is all about moving data from traditional OLTP databases into data warehouses and lakes, reverse ETL is all about moving data into end-user applications like ERPs and SaaS applications.

Data Fusion Example

In this example we’ll look at a scenario where an insight developed in BigQuery needs to be delivered to an operational system running on a CloudSQL Postgres database. Below is a simple diagram of the pipeline where we source data from a BigQuery table and write it to a Postgres table on CloudSQL.

Simple Data Fusion Pipeline

The BigQuery source table contains US state names and abbreviations. Below is the DDL for the target table in Postgres.

CREATE TABLE state_abbreviations 
(name VARCHAR(100),
abbreviation VARCHAR(100),
PRIMARY KEY(name)
);
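For reference, the BigQuery source table can be created with similar DDL. This is a sketch: the demo_data dataset name comes from the insert statement used later in this article, and the STRING column types are assumed.

CREATE TABLE demo_data.state_abbreviations
(name STRING,
abbreviation STRING
);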

Configure CloudSQL Proxy

In order for Data Fusion’s Application Service to access the CloudSQL instance, we’ll need to deploy a proxy VM to facilitate the connection. This is primarily because both Data Fusion and CloudSQL are deployed in tenant projects that are peered with your VPC, and those tenant projects can’t communicate with each other directly through your VPC (VPC peering isn’t transitive). Note that you only need the proxy when designing your pipelines in Data Fusion. When you deploy your pipeline, you can connect directly to the CloudSQL database’s internal IP address because the Dataproc cluster runs in your VPC and not in a tenant project.

gcloud compute instances create cloudsqlgateway \
--project=generalproject-340815 \
--zone=us-central1-a \
--machine-type=e2-medium \
--network-interface=subnet=default,no-address

The output of the create command should include the internal IP address of the instance. Make note of that IP address because it will be needed for the Data Fusion pipeline.
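If you need to look the IP up again later, a command along these lines should return it (project and zone are the same values used in the create command above):

gcloud compute instances describe cloudsqlgateway \
--project=generalproject-340815 \
--zone=us-central1-a \
--format='get(networkInterfaces[0].networkIP)'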

With the Proxy VM deployed, run the following commands to download the CloudSQL Auth Proxy and configure it to connect to your CloudSQL database. Before you can configure the proxy, you will need to retrieve the connection name of your CloudSQL instance. To find the connection name, navigate to your instance overview page and look for the “Connection Name” section. It should be in the format [project-id]:[region]:[instance name].
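Alternatively, you can pull the connection name with gcloud (the instance name is a placeholder):

gcloud sql instances describe [INSTANCE NAME] --format='value(connectionName)'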

Once you have the connection name, you can run the following commands on the VM to establish the proxy.

sudo apt install wget
wget https://dl.google.com/cloudsql/cloud_sql_proxy.linux.amd64 -O cloud_sql_proxy
chmod +x cloud_sql_proxy
nohup ./cloud_sql_proxy -instances=[INSTANCE CONNECTION NAME]=tcp:0.0.0.0:5432 -ip_address_types=PRIVATE &

If you get any 403 errors, it may be because the VM’s Service Account isn’t authorized to connect to CloudSQL. If that’s the case, you’ll need to either grant the required role to that Service Account (see the gcloud example after the list below) OR reference a key file for it in your connection command. Make sure the service account has one of the following roles:

  • Cloud SQL Client
  • Cloud SQL Editor
  • Cloud SQL Admin
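If you go the IAM route, granting the Cloud SQL Client role to the VM’s service account looks roughly like this (the service account email is a placeholder; the project ID comes from the VM create command above):

gcloud projects add-iam-policy-binding generalproject-340815 \
--member='serviceAccount:[SERVICE ACCOUNT EMAIL]' \
--role='roles/cloudsql.client'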

See below for the command to reference a local service account.

nohup ./cloud_sql_proxy -instances=[INSTANCE CONNECTION NAME]=tcp:0.0.0.0:5432 -ip_address_types=PRIVATE -credential_file=sa.json &

When you execute the command, the output should look something like this if successful, with the proxy running and awaiting connections.
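As a quick sanity check, assuming you install the Postgres client on the VM (the user and database names below are placeholders), you can try connecting through the proxy locally:

sudo apt install postgresql-client
psql -h 127.0.0.1 -p 5432 -U [DB USER] -d [DATABASE NAME]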

Building the Pipeline

The pipeline in this example is fairly simple: it reads from a very small BigQuery table and writes the data to a table in Postgres. Take note that the Postgres sink connector only supports inserts; we’ll revisit that in the following sections. Below is the configured BigQuery source plugin pulling from the table containing state abbreviations.

BigQuery Plugin Configuration

On the CloudSQL Sink Plugin you’ll need to specify the database, the user/password and the instance type. In this example the CloudSQL instance type is private so you’ll need to use the IP address of the proxy VM we configured in the prior section.

Configuration for the CloudSQL Postgres Sink Plugin

If everything is successful, you should get no errors on pipeline validation. If you receive validation errors and you’re confident your credentials and configuration are correct, check the Proxy VM’s logs. Because the proxy was started with nohup, its output is written to a file (nohup.out) in the directory it was launched from. Check there for any connectivity errors.

Once you have run a quick preview of the pipeline, you can deploy it and run a job. If everything is good, your Postgres table will have 59 happy rows in it!

Successful write to Postgres
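A quick count against the Postgres database (a sanity check you can run through psql or any SQL client) confirms the load:

SELECT COUNT(*) FROM state_abbreviations;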

This is a VERY simple pipeline, and one you’d only use if you need to load a table a single time. In most data movement situations, you want to update existing data rather than rewrite it. The next section will explore handling changing data.

Handling Changing Data

Since the CloudSQL Postgres sink plugin doesn’t support updates, we can leverage the CloudSQL Postgres Execute plugin to mutate the data via SQL. Execute plugins can run before or after the main body of your pipeline, but not in the middle of it.

BigQuery will load data into a new Postgres staging table, and from there it will be merged into the core table. Below is the DDL for the staging table:

CREATE TABLE state_abbreviations_stg 
(name VARCHAR(100),
abbreviation VARCHAR(100),
PRIMARY KEY(name)
);

The SQL to add new records to the core table can be seen below. We check the target table for key matches and, if we don’t find one, insert the new record. If we also needed to update existing rows, that could be accommodated with Postgres’s upsert functionality, which we’ll look at next.

INSERT INTO state_abbreviations (name, abbreviation)
SELECT stg.name, stg.abbreviation FROM state_abbreviations_stg stg
WHERE NOT EXISTS (SELECT 1 FROM state_abbreviations tgt WHERE tgt.name = stg.name);

The Execute plugin will look very similar to the sink plugin except for a text field where you can define the database command to execute.

CloudSQL Postgres Execute Plugin Configuration

I’ve added a second Execute plugin to act as a cleanup stage for the job. If successful, this plugin will clean up the staging table in preparation for the next run.

Clean up staging table after merge
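For reference, the statement behind that cleanup stage is simply a truncate of the staging table (the alert action described next runs the same statement):

TRUNCATE TABLE state_abbreviations_stg;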

But what happens if my pipeline fails before we can run this truncate command? This can cause all sorts of issues, especially if you have primary keys defined and you try to reload the same rows (duplicate key errors). The way around this is to create an alert action: an action that runs based on the success or failure of a pipeline. In this scenario, we’ll create an alert action that truncates the Postgres staging table (you configure this during the design phase of the pipeline).

Configure Alerts for Your Pipeline

The alert action configuration looks just like the Execute plugin configuration we set up earlier, with one exception: the alert has a run condition that determines when it is invoked. In this example, the alert action will ONLY run on a pipeline failure.

Failure Action to truncate staging table

To test this I went to the BigQuery table and added a record. Let’s assume that a new state is added to the United States. This new state is called Justania and will be abbreviated JT.

INSERT INTO demo_data.state_abbreviations (name,abbreviation) VALUES ('Justania','JT');

When my existing pipeline runs, I should see one record added to my table! Note that with Execute plugins we don’t get any row metrics to report on; if any of the Execute actions had failed, the whole pipeline would have failed.

Successful Pipeline Execution moving 60 records (59 original +1)

Quickly validate that the new row has been added to the database table.
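A query along these lines (the values come from the insert above) does the trick:

SELECT name, abbreviation FROM state_abbreviations WHERE name = 'Justania';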

The new state has been added!

The Power of the Upsert

Since we covered adding new records to the table let’s tackle existing record updates. In our example, I want to change the abbreviation of my state from JT to JA. Let’s also add another state to showcase the insert as well. The SQL below represents that change.

UPDATE demo_data.state_abbreviations SET abbreviation = 'JA' WHERE name = 'Justania';
INSERT INTO demo_data.state_abbreviations (name, abbreviation) VALUES ('Justintucky','JK');

To handle updates, we need to modify our SQL to account for records that change. The code below inserts records that don’t conflict with existing rows; if there is a conflict, the existing record is updated instead.

INSERT INTO state_abbreviations (name, abbreviation)
SELECT stg.name, stg.abbreviation FROM state_abbreviations_stg stg
ON CONFLICT (name) DO UPDATE SET abbreviation = EXCLUDED.abbreviation;

If everything runs correctly, BigQuery will source 61 records and write them to our Postgres staging table.

Successful pipeline moving 61 records (60 +1 new)

A quick look at the Postgres table shows the new records and the updated record. The upsert operation worked!
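For example, a query like this (names from the example inserts) should show both the new row and the updated abbreviation:

SELECT name, abbreviation FROM state_abbreviations
WHERE name IN ('Justania', 'Justintucky');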

Newly inserted record “Justintucky” and updated record Justania JT -> JA

Concluding Thoughts

While this example is relatively simple, more complex pipelines can be built for updating data between systems. This sample pipeline always performs a full table scan of the BigQuery table and sends everything to Postgres. However, we could add logic on the BigQuery side so that we only consume the records that have changed since the last pipeline run, reducing the amount of data sent to Postgres (see the sketch below). A colleague of mine has a really good article on doing batch CDC with Data Fusion. The concepts explored there can be used here as well, just in a reverse fashion. I highly recommend learning more about it.
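As a rough sketch, if the source table carried a change timestamp (the last_modified column and the @last_run_ts parameter below are hypothetical), the BigQuery side could be restricted to just the delta since the last run:

SELECT name, abbreviation
FROM demo_data.state_abbreviations
WHERE last_modified > @last_run_ts;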

Regarding the CloudSQL Proxy, I’d recommend reviewing the documentation on deploying it with more resiliency during development. The approach we looked at here works, but there are better options and additional configurations for running this in production. Take a look at this article for a different approach to routing traffic between multiple peered VPCs through HAProxy.


Justin Taras

I’m a Google Customer Engineer interested in all things data. I love helping customers leverage their data to build new and powerful data driven applications!