Logging new Airflow DAG entries in Cloud Composer

6 min read · Aug 2, 2024

DAG Upload Audit

I was recently approached by a customer with a question on how to track when a new DAG is added to Composer. It seems like an easy ask, but the logs within Airflow don’t provide the needed information.

DAG Processor Logs

While the Airflow scheduler does print out the DAGs it discovers, it’s difficult to tell which DAGs are new and when they were originally uploaded.

To find it, we're going to have to look at Cloud Logging or Cloud Storage event capture. This article will present several approaches for collecting this information and storing it in BigQuery for analysis.

Cloud Event Logging Sink

This approach requires audit logging to be enabled for your Cloud Storage buckets. When enabled, audit logging records any interaction a user, service, or service account has with Cloud Storage. This helps answer the crucial question of “who did what, where, and when?” It’s a powerful tool, but it can increase logging costs due to the storage of log data. Google lets you select which services you want audited and at what level (reads, writes, admin activity). In this specific use case, I enabled it solely for Cloud Storage write events to reduce my log volume.

Select what service and what level of audit logging you need

Going back to the use case, this audit logging feature can be used to identify those particular DAG events we want to collect.

logName="projects/[project_id]/logs/cloudaudit.googleapis.com%2Fdata_access"
protoPayload.methodName="storage.objects.create"
protoPayload.resourceName=~"projects/_/buckets/[DAG Bucket]/objects/dags/*"

When I search for this in the Cloud Logging UI, I get back all the events that correspond to my search filter. The protoPayload.resourceName filter only looks at the bucket path for my DAGs. This is critical because there are many other non-DAG objects in the bucket. This filter ensures we’re only looking at objects that match the path prefix.

The storage.objects.create filter for my Composer DAG bucket
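If you want to sanity-check the same filter from code rather than the console, below is a minimal sketch using the google-cloud-logging client. The project and bucket values are placeholders, and the payload handling is deliberately kept generic.

from google.cloud import logging

PROJECT_ID = "[project_id]"
DAG_BUCKET = "[DAG Bucket]"

# Same filter as above, just assembled in code.
FILTER = (
    f'logName="projects/{PROJECT_ID}/logs/cloudaudit.googleapis.com%2Fdata_access" '
    'AND protoPayload.methodName="storage.objects.create" '
    f'AND protoPayload.resourceName=~"projects/_/buckets/{DAG_BUCKET}/objects/dags/"'
)

client = logging.Client(project=PROJECT_ID)
for entry in client.list_entries(filter_=FILTER, order_by=logging.DESCENDING, max_results=10):
    # Printing the raw audit payload is enough to confirm the filter matches what you expect.
    print(entry.timestamp, entry.payload)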

Once you’ve identified the filter required, you can create a log sink that will route the filtered messages directly into a BigQuery table. You’ll need to create a BigQuery dataset to hold the output from this sink, but aside from that the setup is minimal!

Log sink for my filtered DAG additions

The log sink will only capture events from the moment of creation and beyond. It won’t retroactively load historical events. You should see the table added to your dataset once your sink detects some new events.
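If you’d rather script the sink than click through the console, here’s a minimal sketch using the google-cloud-logging client. The sink name is hypothetical, and you’ll still need to grant the sink’s writer identity access to the destination dataset.

from google.cloud import logging

PROJECT_ID = "[project_id]"
DATASET_ID = "[dataset_id]"
SINK_NAME = "dag-upload-audit"  # hypothetical sink name

DAG_FILTER = (
    f'logName="projects/{PROJECT_ID}/logs/cloudaudit.googleapis.com%2Fdata_access" '
    'AND protoPayload.methodName="storage.objects.create" '
    'AND protoPayload.resourceName=~"projects/_/buckets/[DAG Bucket]/objects/dags/"'
)

client = logging.Client(project=PROJECT_ID)
sink = client.sink(
    SINK_NAME,
    filter_=DAG_FILTER,
    destination=f"bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET_ID}",
)

if not sink.exists():
    sink.create()  # grant the sink's writer identity BigQuery access on the dataset afterwards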

Table Schema

With the table created, you can now write SQL against the table to extract the details behind the DAG addition.

Simple SQL identifying the time of DAG upload, the principal and DAG name
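Since the exact query depends on the table your sink created, here is a rough sketch of that lookup run through the BigQuery Python client. The table name and the protopayload_auditlog field names assume the default Cloud Logging BigQuery export schema, so adjust them to match what you actually see in your dataset.

from google.cloud import bigquery

PROJECT_ID = "[project_id]"
DATASET_ID = "[dataset_id]"  # the dataset your log sink writes to
# Default table name for data_access audit logs; date-sharded sinks add a _YYYYMMDD suffix.
TABLE = "cloudaudit_googleapis_com_data_access"

QUERY = f"""
SELECT
  timestamp AS event_ts,
  protopayload_auditlog.authenticationInfo.principalEmail AS principal,
  REGEXP_EXTRACT(protopayload_auditlog.resourceName, r"objects/dags/(.+)") AS dag_name
FROM `{PROJECT_ID}.{DATASET_ID}.{TABLE}`
WHERE protopayload_auditlog.methodName = "storage.objects.create"
ORDER BY event_ts DESC
"""

client = bigquery.Client(project=PROJECT_ID)
for row in client.query(QUERY).result():
    print(row.event_ts, row.principal, row.dag_name)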

Log Analytics

While the previous approach explicitly finds logging events and builds an output table for you, this approach leverages logs already stored in BigQuery. Many customers route their Cloud Logging events into BigQuery, and there are enormous benefits:

  • SQL-based querying capabilities allow for complex analysis of log data, enabling users to identify trends, anomalies, and insights that can be visualized with BI tools.
  • Long-term storage and archiving of logs.
  • Correlation with other datasets.
  • Use BQML to build predictive models on log data.

If your company already has this configured, you can write queries directly against the logs to find the event data we loaded in the previous section. However, all of this depends on Audit Logging being enabled so you can find those interactions with your storage buckets. Confirm with your admin team that this has been enabled in the project where your bucket resides.

Below is SQL against the logging export that identifies the time, the principal, and the object that was added. You can use this in a view or as an ELT procedure to populate a table; a sketch of the view option follows the query.

SELECT
  timestamp AS event_ts,
  proto_payload.audit_log.method_name,
  proto_payload.audit_log.resource_name,
  proto_payload.audit_log.authentication_info.principal_email
FROM `[projectID].[dataset]._AllLogs`
WHERE log_name = "projects/[projectID]/logs/cloudaudit.googleapis.com%2Fdata_access"
  AND proto_payload.audit_log.method_name = "storage.objects.create"
  AND proto_payload.audit_log.authorization_info[SAFE_OFFSET(0)].permission = "storage.objects.create"
  AND STARTS_WITH(proto_payload.audit_log.resource_name, "projects/_/buckets/[bucket]/objects/dags/")
  AND TIMESTAMP_TRUNC(timestamp, DAY) = "2024-07-30"
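As a sketch of the view option, the same query can be wrapped in a view with the BigQuery Python client. The view name and the reporting dataset below are hypothetical; the linked Log Analytics dataset that holds _AllLogs is read-only, so the view has to live in a dataset you own.

from google.cloud import bigquery

PROJECT_ID = "[projectID]"
LOGS_DATASET = "[dataset]"                 # linked Log Analytics dataset containing _AllLogs
REPORTING_DATASET = "[reporting_dataset]"  # hypothetical dataset where you can create views
VIEW_ID = f"{PROJECT_ID}.{REPORTING_DATASET}.dag_upload_audit"  # hypothetical view name

VIEW_QUERY = f"""
SELECT
  timestamp AS event_ts,
  proto_payload.audit_log.resource_name,
  proto_payload.audit_log.authentication_info.principal_email
FROM `{PROJECT_ID}.{LOGS_DATASET}._AllLogs`
WHERE log_name = "projects/{PROJECT_ID}/logs/cloudaudit.googleapis.com%2Fdata_access"
  AND proto_payload.audit_log.method_name = "storage.objects.create"
  AND STARTS_WITH(proto_payload.audit_log.resource_name, "projects/_/buckets/[bucket]/objects/dags/")
"""

client = bigquery.Client(project=PROJECT_ID)
view = bigquery.Table(VIEW_ID)
view.view_query = VIEW_QUERY
client.create_table(view, exists_ok=True)  # no-op if the view already exists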

Cloud Storage Events

While the first two approaches use Cloud Logging, they depend on audit logging being enabled. Some cloud environments may not have it enabled at all, or may have it enabled only in certain projects. If you don’t have audit events available, you can use Cloud Storage triggers to generate events for file creations and use them to load data into BigQuery.

gcs_event cloud function

The sample function code below will collect events ONLY when the object path contains dags/. This allows us to ignore all other storage events the bucket may emit and load only the DAG events into BigQuery. The function extracts the event ID, event timestamp, object name, and bucket from the event payload and writes them to a BigQuery table.

from cloudevents.http import CloudEvent

import functions_framework
from google.api_core.exceptions import NotFound
from google.cloud import bigquery

PROJECT_ID = "[project_id]"
DATASET_ID = "[dataset_id]"
TABLE_NAME = "[table name]"
TABLE_ID = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_NAME}"


# Triggered by a change in a storage bucket
@functions_framework.cloud_event
def gcs_event_load(cloud_event: CloudEvent) -> None:

    data = cloud_event.data

    event_id = cloud_event["id"]
    event_type = cloud_event["type"]

    bucket = data["bucket"]
    object_name = data["name"]
    event_time = data["timeCreated"]

    # Only process objects under the dags/ path; ignore all other events
    if "dags/" in object_name:
        # Write event data to BigQuery
        client = bigquery.Client(project=PROJECT_ID)

        schema = [
            bigquery.SchemaField("event_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("bucket_name", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("object_name", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("event_time", "TIMESTAMP", mode="REQUIRED"),
        ]

        table = bigquery.Table(TABLE_ID, schema=schema)
        try:
            client.get_table(table)  # Check if table exists
        except NotFound:
            client.create_table(table)  # Create table if it doesn't exist

        rows_to_insert = [
            {
                "event_id": event_id,
                "bucket_name": bucket,
                "object_name": object_name,
                "event_time": event_time,
            }
        ]
        errors = client.insert_rows_json(table, rows_to_insert)
        if errors == []:
            print("New rows have been added.")
        else:
            print("Encountered errors while inserting rows: {}".format(errors))
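One design note on the function: creating the destination table once, ahead of time, rather than checking for it on every invocation keeps the function leaner and avoids the brief window where a streaming insert can land before a just-created table is fully available.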

While this approach works well, it comes with one major drawback: the trigger cannot pre-filter events by path prefix. That means ANY file added to the bucket fires an event that the function has to process. For some buckets that may not be a problem, but the Composer bucket holds the DAGs alongside the log directory, which can generate MANY events and some inefficiency from processing events we simply discard. However, event processing with functions is relatively inexpensive; one million Gen2 function invocations cost around $10.24 a month.

Concluding Thoughts

While all the solutions presented in this article have merit, I’d recommend the Log Analytics solution. Having the logging data exported to BigQuery will help with this scenario and many more. While the other solutions explored are more targeted, they’ll still get the job done in the event you can’t enable audit logging or have logging routed to BigQuery. Work with your admin team to find out what’s possible so you can choose the right solution for your use case!

To learn more about Log Analytics, check out the Google Cloud documentation on Log Analytics in BigQuery for more detail.
