Logging new Airflow DAG entries in Cloud Composer
DAG Upload Audit
I was recently approached by a customer with a question on how to track when a new DAG is added to Composer. It seems like an easy ask, but the logs within Airflow don’t provide the needed information.
While the Airflow scheduler does print out the DAGs it discovers, it’s difficult to tell which DAGs are new and when they were originally uploaded.
To find it, we'll need to turn to Cloud Logging or Cloud Storage event capture. This article presents several approaches for collecting this information and storing it in BigQuery for analysis.
Cloud Event Logging Sink
This approach requires audit logging to be enabled for your Cloud Storage buckets. When enabled, audit logging records an event for any interaction a user, service, or service account has with Cloud Storage. This helps answer the crucial question of “who did what, where, and when?” It’s a powerful tool, but it can increase logging costs due to the storage of log data. Google lets you select the services for which you want auditing enabled and the desired level (reads, writes, admin activity). In this specific use case, I enabled it solely for Cloud Storage write events to reduce my log volume.
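If you need to turn this on yourself, Data Access audit logs are controlled through the auditConfigs section of your project's IAM policy (the same settings the console exposes under IAM & Admin > Audit Logs). As a rough sketch, a stanza that captures only Cloud Storage write events would look something like this:
auditConfigs:
- service: storage.googleapis.com
  auditLogConfigs:
  - logType: DATA_WRITE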
Going back to the use case, this audit logging feature can be used to identify those particular DAG events we want to collect.
logName="projects/[project_id]/logs/cloudaudit.googleapis.com%2Fdata_access"
protoPayload.methodName="storage.objects.create"
protoPayload.resourceName=~"projects/_/buckets/[DAG Bucket]/objects/dags/"
When I search for this in the Cloud Logging UI, I get back all the events that correspond to my search filter. The protoPayload.resourceName filter only looks at the bucket path for my DAGs. This is critical because there are many other non-DAG objects in the bucket. This filter ensures we’re only looking at objects that match the path prefix.
Once you’ve identified the required filter, you can create a log sink that will route the filtered messages directly into a BigQuery table. You’ll need to create a BigQuery dataset to hold the output from this sink, but aside from that, the setup is minimal!
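If you prefer the command line to the console, the sink can be created with gcloud along these lines (the sink name is my own placeholder, and the filter is the same one shown above):
gcloud logging sinks create composer-dag-uploads \
  bigquery.googleapis.com/projects/[project_id]/datasets/[dataset] \
  --use-partitioned-tables \
  --log-filter='logName="projects/[project_id]/logs/cloudaudit.googleapis.com%2Fdata_access" AND protoPayload.methodName="storage.objects.create" AND protoPayload.resourceName=~"projects/_/buckets/[DAG Bucket]/objects/dags/"'
Remember to grant the sink’s writer identity (printed when the sink is created) the BigQuery Data Editor role on the destination dataset so the routed entries can actually land in the table.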
The log sink will only capture events from the moment of creation and beyond. It won’t retroactively load historical events. You should see the table added to your dataset once your sink detects some new events.
With the table created, you can now write SQL against the table to extract the details behind the DAG addition.
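As a starting point, assuming the sink used the default table name for this log (cloudaudit_googleapis_com_data_access) and the standard audit-log export schema, a query along these lines pulls out the upload time, the principal, and the DAG object; double-check the field names against the table your sink actually created:
SELECT
  timestamp AS event_ts,
  protopayload_auditlog.authenticationInfo.principalEmail AS uploaded_by,
  protopayload_auditlog.resourceName AS dag_object
FROM `[projectID].[dataset].cloudaudit_googleapis_com_data_access`
WHERE protopayload_auditlog.methodName = "storage.objects.create"
ORDER BY event_ts DESC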
Log Analytics
While the previous approach explicitly captures the logging events and builds an output table for you, this approach leverages logs already stored in BigQuery. Many customers route their Cloud Logging events into BigQuery, and there are enormous benefits:
- SQL-based querying capabilities allow for complex analysis of log data, enabling users to identify trends, anomalies, and insights that can be visualized with BI tools.
- Long-term storage and archiving of logs.
- Correlation with other datasets.
- Use BQML to build predictive models on log data.
If your company already has this configured, you can write queries directly against the logs to find the event data we loaded in the previous section. However, all of this depends on Audit Logging being enabled so you can find those interactions with your storage buckets. Confirm with your admin team that this has been enabled in the project where your bucket resides.
Below is SQL against the logging export that identifies the time, principal, and the object that was added. You can use this in a view or as an ELT procedure to populate a table.
SELECT
  timestamp AS event_ts,
  proto_payload.audit_log.method_name,
  proto_payload.audit_log.resource_name,
  proto_payload.audit_log.authentication_info.principal_email
FROM `[projectID].[dataset]._AllLogs`
WHERE log_name = "projects/[projectID]/logs/cloudaudit.googleapis.com%2Fdata_access"
  AND proto_payload.audit_log.method_name = "storage.objects.create"
  AND proto_payload.audit_log.authorization_info[SAFE_OFFSET(0)].permission = "storage.objects.create"
  AND STARTS_WITH(proto_payload.audit_log.resource_name, "projects/_/buckets/[bucket]/objects/dags/")
  AND TIMESTAMP_TRUNC(timestamp, DAY) = '2024-07-30'
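If you’d rather query a cleaner shape, the same statement can be wrapped in a view that also pulls the DAG file name out of the resource path. This is just one way to slice it, and the view needs to live in a regular dataset of your own since the linked Log Analytics dataset is read-only:
CREATE OR REPLACE VIEW `[projectID].[reporting_dataset].dag_upload_audit` AS
SELECT
  timestamp AS event_ts,
  proto_payload.audit_log.authentication_info.principal_email AS uploaded_by,
  REGEXP_EXTRACT(proto_payload.audit_log.resource_name, r'/objects/dags/(.+)$') AS dag_file
FROM `[projectID].[dataset]._AllLogs`
WHERE log_name = "projects/[projectID]/logs/cloudaudit.googleapis.com%2Fdata_access"
  AND proto_payload.audit_log.method_name = "storage.objects.create"
  AND STARTS_WITH(proto_payload.audit_log.resource_name, "projects/_/buckets/[bucket]/objects/dags/")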
Cloud Storage Events
While the first two approaches use Cloud Logging, they depend on audit logging being enabled. Some cloud environments may not have it enabled at all, or may only have it enabled on certain projects. If you don’t have audit events available, you can use Cloud Storage triggers to generate events for file creations and use them to load data into BigQuery.
The sample function code below will collect events ONLY when the object path contains dags/ . This allows us to ignore all other storage events that may be emitted by the bucket and only load those specific DAG events into BigQuery. The function will extract the event ID, event timestamp, object name, and bucket from the event payload and write them to a BigQuery table.
from cloudevents.http import CloudEvent
import functions_framework
from google.api_core.exceptions import NotFound
from google.cloud import bigquery
PROJECT_ID = "[project_id]"
DATASET_ID = "[dataset_id]"
TABLE_NAME = "[table name]"
TABLE_ID = PROJECT_ID + "." + DATASET_ID + "." + TABLE_NAME
# Triggered by a change in a storage bucket
@functions_framework.cloud_event
def gcs_event_load(cloud_event: CloudEvent) -> None:
    data = cloud_event.data

    event_id = cloud_event["id"]
    event_type = cloud_event["type"]  # e.g. google.cloud.storage.object.v1.finalized

    bucket = data["bucket"]
    object_name = data["name"]
    event_time = data["timeCreated"]

    # Only process objects under the dags/ path; ignore every other object in the bucket
    if "dags/" in object_name:
        # Write event data to BigQuery
        client = bigquery.Client(project=PROJECT_ID)
        schema = [
            bigquery.SchemaField("event_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("bucket_name", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("object_name", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("event_time", "TIMESTAMP", mode="REQUIRED"),
        ]
        table = bigquery.Table(TABLE_ID, schema=schema)
        try:
            client.get_table(table)  # Check if the table exists
        except NotFound:
            client.create_table(table)  # Create the table if it doesn't exist

        # Stream the event details into the audit table
        rows_to_insert = [
            {
                "event_id": event_id,
                "bucket_name": bucket,
                "object_name": object_name,
                "event_time": event_time,
            }
        ]
        errors = client.insert_rows_json(table, rows_to_insert)
        if errors == []:
            print("New rows have been added.")
        else:
            print("Encountered errors while inserting rows: {}".format(errors))
While this approach is great, it does come with one major drawback: we cannot pre-filter events based on their path prefix. This means that ANY file added to the bucket will trigger an event that’s collected. For some buckets that may not be a problem, but for Composer, the DAGs live in the same bucket as the logs directory. This can result in MANY events, leading to some inefficiency from processing events we simply discard. However, event processing with functions is relatively inexpensive; one million Gen2 function calls will cost around $10.24 a month.
Concluding Thoughts
While all the solutions presented in this article have merit, I’d recommend the Log Analytics solution. Having the logging data exported to BigQuery will help with this scenario and many more. The other solutions explored are more targeted, but they’ll still get the job done if you can’t enable audit logging or can’t route logging to BigQuery. Work with your admin team to find out what’s possible so you can choose the right solution for your use case!
To learn more about Log Analytics, check out the links below: