Complete Reference Template
Every supported YAML section and field in one copyable template. Use this as a starting point for new DAGs.
How to use: Copy this template, remove the sections and fields you don't need, and fill in your values. The inline comments document every field's purpose, default, and constraints.
Full Template
# =============================================================================
# DagSmith - Complete Reference Template
#
# Section order (convention):
# variables -> configurations -> metadata -> gcp -> default_args ->
# user_defined_macros -> dag -> tasks -> dependencies
#
# Legend:
# REQUIRED - field must be present
# OPTIONAL - field can be omitted (default shown after "=")
# ALIAS - alternative name accepted for the same field
# =============================================================================
# =============================================================================
# 1. VARIABLES - OPTIONAL section
# =============================================================================
# Key-value pairs for ${VAR_NAME} substitution throughout the entire YAML.
# Naming rules: VAR__<NAME>__VAR (e.g. VAR__PROJECT_ID__VAR), referenced as ${VAR__<NAME>__VAR}
# -----------------------------------------------------------------------------
variables:
VAR__PROJECT_ID__VAR: "my-gcp-project-id"
VAR__DATASET__VAR: "my_dataset"
VAR__ENV__VAR: "prod"
VAR__BUNDLE__VAR: "my_bundle"
# =============================================================================
# 2. CONFIGURATIONS - OPTIONAL (default: base_path="/home/airflow/gcs/dags")
# =============================================================================
configurations:
base_path: "/home/airflow/gcs/dags/${VAR__PROJECT_ID__VAR}/"
# =============================================================================
# 3. METADATA - REQUIRED (all fields required)
# =============================================================================
metadata:
title: "My Pipeline Title"
owner: "team-name@example.com"
email: "team-name@example.com"
version: "1.0.0"
jira: "PROJ-1234"
developer_name: "reference_template"
# =============================================================================
# 4. GCP - REQUIRED
# =============================================================================
gcp:
project_id: "${VAR__PROJECT_ID__VAR}"
location: "US"
gcp_conn_id: "google_cloud_default"
# impersonation_chain: "sa@project.iam.gserviceaccount.com"
# deferrable: false
# =============================================================================
# 5. DEFAULT_ARGS - OPTIONAL
# =============================================================================
default_args:
owner: "airflow"
depends_on_past: false
retries: 3 # >= 0
retry_delay: 60 # seconds, >= 30. ALIAS: retry_delay_seconds
# sla: 3600 # seconds, > 0. ALIAS: sla_seconds
deferrable: true
email:
- "team-name@example.com"
email_on_failure: true
email_on_retry: false
# on_failure_callback: "mypackage.callbacks.on_failure"
# on_success_callback: "mypackage.callbacks.on_success"
# on_retry_callback: "mypackage.callbacks.on_retry"
# =============================================================================
# 6. USER_DEFINED_MACROS - OPTIONAL
# =============================================================================
user_defined_macros:
project_name: "${VAR__PROJECT_ID__VAR}"
fact_dataset: "${VAR__DATASET__VAR}"
bundle: "${VAR__BUNDLE__VAR}"
env: "${VAR__ENV__VAR}"
datastore: "BQ"
latency: 1
# =============================================================================
# 7. DAG - REQUIRED
# =============================================================================
dag:
dag_id: "reference_template"
description: "A comprehensive reference template demonstrating all features."
schedule: "0 6 * * *" # cron or preset. ALIAS: schedule_interval
start_date: "2026-01-01" # YYYY-MM-DD or YYYY-MM-DD HH:MM:SS
timezone: "America/New_York" # IANA timezone. Default: "UTC"
catchup: false
max_active_runs: 1 # >= 1
dagrun_timeout: 7200 # REQUIRED, seconds, > 0
is_paused_upon_creation: true
tags:
- "warehouse:bigquery"
- "module:reference"
- "env:${VAR__ENV__VAR}"
# DAG-level Airflow Params (for Trigger w/ Config UI):
# params:
# env:
# type: "string"
# default: "PROD"
# enum: ["PROD", "PLE", "DEV"]
# title: "Environment"
# description: "Target environment"
# =============================================================================
# 8. TASKS
# =============================================================================
tasks:
# --- EmptyOperator (no-op marker) ---
- task_id: "start"
operator: EmptyOperator
# --- BashOperator ---
- task_id: "run_shell_script"
operator: BashOperator
bash_command: "echo 'Hello from {{ ds }}'"
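# --- PythonOperator ---
# NOTE: op_kwargs below read {{ params.env }} and {{ params.start_dt }}. The
# commented dag.params example in section 7 declares only "env"; declare both
# params (or substitute literal values) before using this task.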
- task_id: "validate_params"
operator: PythonOperator
python_callable: "mypackage.validators.validate_params"
op_kwargs:
env: "{{ params.env }}"
start_dt: "{{ params.start_dt }}"
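# --- BranchPythonOperator ---
# NOTE(review): in Airflow, tasks on a branch that is not selected are skipped.
# In section 9 both sensors downstream of this task feed stage_data, so
# pick_branch presumably has to return both task_ids (or stage_data needs a
# trigger rule that tolerates a skipped upstream) - confirm against DagSmith's
# trigger_rule support.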
- task_id: "choose_branch"
operator: BranchPythonOperator
python_callable: "mypackage.branching.pick_branch"
op_kwargs:
env: "${VAR__ENV__VAR}"
# --- BigQueryInsertJobOperator (SQL file) ---
- task_id: "stage_data"
operator: BigQueryInsertJobOperator
sql: "sql/stage_data.sql"
params:
project_id: "${VAR__PROJECT_ID__VAR}"
src_dataset: "${VAR__DATASET__VAR}"
labels:
bundle: "${VAR__BUNDLE__VAR}"
module: "reference_pipeline"
include_finops_labels: true
# --- BigQueryInsertJobOperator (inline SQL) ---
- task_id: "transform_data"
operator: BigQueryInsertJobOperator
sql: |
SELECT
account_id,
SUM(amount) AS total_amount,
COUNT(*) AS txn_count
FROM `{{ project_name }}.{{ fact_dataset }}.transactions`
WHERE process_date = DATE('{{ ds }}')
GROUP BY account_id
retries: 2
# --- BigQueryCheckOperator ---
- task_id: "data_quality_check"
operator: BigQueryCheckOperator
sql: |
SELECT CASE
WHEN COUNT(1) > 0 THEN 1
ELSE 0
END AS has_data
FROM `{{ project_name }}.{{ fact_dataset }}.transactions`
WHERE process_date = DATE('{{ ds }}')
retries: 6
retry_delay: 300
# --- BigQueryValueCheckOperator ---
- task_id: "row_count_check"
operator: BigQueryValueCheckOperator
sql: "sql/count_rows.sql"
pass_value: 1000
# --- BigQueryTableExistenceSensor ---
- task_id: "wait_for_table"
operator: BigQueryTableExistenceSensor
project_id: "${VAR__PROJECT_ID__VAR}"
dataset_id: "${VAR__DATASET__VAR}"
table_id: "transactions"
timeout: 3600
poke_interval: 120
mode: "reschedule"
# --- ExternalTaskSensor (fixed delta) ---
- task_id: "wait_for_upstream_fixed_delta"
operator: ExternalTaskSensor
external_dag_id: "upstream_pipeline"
external_task_id: "final_step"
mode: "reschedule"
poke_interval: 300
timeout: 21600
allowed_states: ["success"]
execution_delta: 3600
# --- ExternalTaskSensor (callable) ---
- task_id: "wait_for_upstream_callable"
operator: ExternalTaskSensor
external_dag_id: "reference_pipeline"
external_task_id: "metric"
mode: "reschedule"
poke_interval: 600
timeout: 43200
allowed_states: ["success"]
execution_date_fn: "mypackage.scheduling.get_reference_date"
# --- ExternalTaskSensor (same date) ---
- task_id: "wait_for_upstream_same_date"
operator: ExternalTaskSensor
external_dag_id: "sibling_pipeline"
external_task_id: "done"
mode: "reschedule"
poke_interval: 300
timeout: 21600
allowed_states: ["success"]
# --- TriggerDagRunOperator ---
- task_id: "trigger_downstream"
operator: TriggerDagRunOperator
trigger_dag_id: "downstream_pipeline"
conf:
triggered_by: "reference_template"
run_date: "{{ ds }}"
env: "{{ env }}"
wait_for_completion: false
reset_dag_run: true
# --- GCSToBigQueryOperator ---
- task_id: "load_from_gcs"
operator: GCSToBigQueryOperator
bucket: "my-data-bucket"
source_objects:
- "data/{{ ds_nodash }}/*.json"
destination_project_dataset_table: "${VAR__PROJECT_ID__VAR}.${VAR__DATASET__VAR}.raw_events"
source_format: "NEWLINE_DELIMITED_JSON"
autodetect: true
write_disposition: "WRITE_TRUNCATE"
# --- GCSToGCSOperator ---
- task_id: "archive_files"
operator: GCSToGCSOperator
source_bucket: "my-data-bucket"
source_object: "data/{{ ds_nodash }}/"
destination_bucket: "my-archive-bucket"
destination_object: "archive/{{ ds_nodash }}/"
move_object: false
# --- GCSDeleteObjectsOperator ---
- task_id: "cleanup_staging"
operator: GCSDeleteObjectsOperator
bucket_name: "my-data-bucket"
prefix: "staging/{{ ds_nodash }}/"
# --- GCSObjectsWithPrefixExistenceSensor ---
- task_id: "wait_for_files"
operator: GCSObjectsWithPrefixExistenceSensor
bucket: "my-data-bucket"
prefix: "incoming/{{ ds_nodash }}/"
mode: "reschedule"
poke_interval: 300
timeout: 3600
# --- TaskGroup ---
- operator: TaskGroup
group_id: "staging_group"
tooltip: "Stage source tables into BigQuery"
tasks:
- task_id: "stage_orders"
operator: BigQueryInsertJobOperator
sql: "sql/stage_orders.sql"
labels:
bundle: "${VAR__BUNDLE__VAR}"
- task_id: "stage_customers"
operator: BigQueryInsertJobOperator
sql: "sql/stage_customers.sql"
labels:
bundle: "${VAR__BUNDLE__VAR}"
- task_id: "stage_products"
operator: BigQueryInsertJobOperator
sql: "sql/stage_products.sql"
dependencies:
- "stage_orders >> stage_customers >> stage_products"
# --- Terminal marker ---
- task_id: "end"
operator: EmptyOperator
# =============================================================================
# 9. DEPENDENCIES
# =============================================================================
dependencies:
- "start >> run_shell_script"
- "run_shell_script >> validate_params"
- "validate_params >> choose_branch"
- "choose_branch >> [wait_for_table, wait_for_files]"
- "[wait_for_table, wait_for_files] >> stage_data"
- "stage_data >> transform_data >> data_quality_check >> row_count_check"
- "row_count_check >> staging_group"
- "staging_group >> load_from_gcs"
- "load_from_gcs >> archive_files >> cleanup_staging"
- "[wait_for_upstream_fixed_delta, wait_for_upstream_callable, wait_for_upstream_same_date] >> trigger_downstream"
- "[cleanup_staging, trigger_downstream] >> end"
Minimal Template
The smallest valid DagSmith YAML spec — only the required sections:
metadata:
title: "Simple Pipeline"
owner: "team@example.com"
email: "team@example.com"
version: "1.0.0"
jira: "PROJ-001"
developer_name: "simple_pipeline"
dag:
dag_id: "simple_pipeline"
dagrun_timeout: 3600
gcp:
project_id: "my-project"
tasks:
- task_id: "start"
operator: EmptyOperator
- task_id: "load_data"
operator: BigQueryInsertJobOperator
sql: "sql/load.sql"
- task_id: "end"
operator: EmptyOperator
dependencies:
- "start >> load_data >> end"
Template with Variables
Same pipeline using variables for multi-environment deployment:
variables:
VAR__PROJECT_ID__VAR: "my-gcp-project-001"
VAR__DATASET__VAR: "warehouse_tables"
VAR__ENV__VAR: "prod"
configurations:
base_path: "/home/airflow/gcs/dags/${VAR__PROJECT_ID__VAR}/"
metadata:
title: "Daily Account Activity Load"
owner: "data-team@example.com"
email: "data-team@example.com"
version: "1.0.0"
jira: "PROJ-456"
developer_name: "daily_load"
dag:
dag_id: "daily_account_load"
description: "Load daily account activity into BigQuery."
schedule: "0 6 * * *"
start_date: "2026-01-01"
timezone: "America/New_York"
catchup: false
dagrun_timeout: 7200
tags:
- "warehouse:bigquery"
- "env:${VAR__ENV__VAR}"
gcp:
project_id: "${VAR__PROJECT_ID__VAR}"
location: "us-east4"
default_args:
retries: 2
retry_delay: 60
email:
- "data-team@example.com"
email_on_failure: true
tasks:
- task_id: "start"
operator: EmptyOperator
- task_id: "stage_data"
operator: BigQueryInsertJobOperator
sql: "sql/stage_acct_activity.sql"
params:
project_id: "${VAR__PROJECT_ID__VAR}"
src_dataset: "${VAR__DATASET__VAR}"
- task_id: "transform_data"
operator: BigQueryInsertJobOperator
sql: "sql/transform_acct_activity.sql"
- task_id: "load_final"
operator: BigQueryInsertJobOperator
sql: "sql/load_acct_activity.sql"
retries: 3
- task_id: "end"
operator: EmptyOperator
dependencies:
- "start >> stage_data >> transform_data >> load_final >> end"
Generated Output Examples
Below are real examples of Python DAG files generated by DagSmith from YAML specs. They show exactly what the "dagsmith generate" command produces.
Example 1: Simple Sequential BigQuery Pipeline
A straightforward three-task pipeline that stages, transforms, and loads data.
Input YAML
variables:
VAR__PROJECT_ID__VAR: "my-gcp-project"
VAR__DATASET__VAR: "analytics"
configurations:
base_path: "/home/airflow/gcs/dags/${VAR__PROJECT_ID__VAR}/"
metadata:
title: "Daily Revenue Pipeline"
owner: "data-engineering"
email: "data-eng@company.com"
version: "1.0.0"
jira: "DE-101"
developer_name: "Alice Martin"
gcp:
project_id: "${VAR__PROJECT_ID__VAR}"
location: "US"
default_args:
owner: "airflow"
depends_on_past: false
retries: 2
retry_delay: 120
email: ["data-eng@company.com"]
email_on_failure: true
email_on_retry: false
dag:
dag_id: "daily_revenue_pipeline"
description: "Extract, transform, and load daily revenue data into BigQuery."
schedule: "0 6 * * *"
start_date: "2026-01-01 12:10:10"
timezone: "America/New_York"
catchup: false
max_active_runs: 1
dagrun_timeout: 7200
is_paused_upon_creation: true
tags:
- "warehouse:bigquery"
- "domain:finance"
- "cadence:daily"
tasks:
- task_id: "stage_transactions"
operator: BigQueryInsertJobOperator
sql: "sql/stage_transactions.sql"
params:
project_id: "${VAR__PROJECT_ID__VAR}"
src_dataset: "raw_data"
stg_dataset: "${VAR__DATASET__VAR}"
write_disposition: "WRITE_APPEND"
create_disposition: "CREATE_IF_NEEDED"
- task_id: "transform_revenue"
operator: BigQueryInsertJobOperator
sql: "sql/transform_revenue.sql"
params:
project_id: "${VAR__PROJECT_ID__VAR}"
dataset: "${VAR__DATASET__VAR}"
write_disposition: "WRITE_APPEND"
create_disposition: "CREATE_NEVER"
- task_id: "load_final"
operator: BigQueryInsertJobOperator
sql: "sql/load_revenue_final.sql"
retries: 3
labels:
team: "data-engineering"
pipeline: "daily_revenue"
include_finops_labels: true
dependencies:
- "stage_transactions >> transform_revenue >> load_final"
Generated Python DAG
"""
Generated DAG code for daily_revenue_pipeline through automation.
DAG : daily_revenue_pipeline
Title : Daily Revenue Pipeline
Version : 1.0.0
Owner : data-engineering
Email : data-eng@company.com
Jira : DE-101
DAG Generated Date : 2026-05-04
Source : daily_revenue_pipeline.yaml
============================================================
DO NOT EDIT MANUALLY
============================================================
"""
from __future__ import annotations
from datetime import timedelta
from typing import Any, Final
import pendulum
from airflow import DAG
# ========================
# CONFIGURATION
# ========================
TIMEZONE: Final[str] = "America/New_York"
BASE_PATH: Final[str] = "/home/airflow/gcs/dags/my-gcp-project/"
# ========================
# DEFAULT ARGS
# ========================
default_args: dict[str, Any] = {
"owner": "airflow",
"depends_on_past": False,
"retries": 2,
"retry_delay": timedelta(seconds=120), # Retry Delay Time: 2 min(s)
"sla": None,
"deferrable": True,
"email": ["data-eng@company.com"],
"email_on_failure": True,
"email_on_retry": False,
# ========================
# GCP DEFAULT ARGS
# ========================
"gcp_conn_id": "google_cloud_default",
"google_cloud_conn_id": "google_cloud_default",
"use_legacy_sql": False,
"allow_large_results": True,
"project_id": "my-gcp-project",
"location": "US",
}
# ========================
# TAGS
# ========================
tags: set[str] = {
"project_id:my-gcp-project",
"warehouse:bigquery",
"domain:finance",
"cadence:daily",
}
# ========================
# DAG DEFINITION
# ========================
with DAG(
dag_id="daily_revenue_pipeline",
start_date=pendulum.datetime(2026, 1, 1, 12, 10, 10, tz=TIMEZONE),
schedule="0 6 * * *", # Daily at 06:00 AM
default_args=default_args,
tags=tags,
dagrun_timeout=timedelta(seconds=7200), # DAG Timeout: 2 hour(s)
catchup=False,
max_active_runs=1,
is_paused_upon_creation=True,
description="""Extract, transform, and load daily revenue data into BigQuery.""",
template_searchpath=[BASE_PATH],
) as dag:
# ----- Deferred imports -----
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryInsertJobOperator,
)
# ----- Tasks -----
stage_transactions = BigQueryInsertJobOperator(
task_id="stage_transactions",
configuration={
"query": {
"query": "{% include 'sql/stage_transactions.sql' %}",
"useLegacySql": False,
"allowLargeResults": True,
"writeDisposition": "WRITE_APPEND",
"createDisposition": "CREATE_IF_NEEDED",
},
"labels": {
"dag_id": "{{ dag.dag_id }}",
"task_id": "{{ task.task_id }}",
"execution_date": "{{ ds_nodash }}",
"instance_name": "{{ var.value.composer_env_name | default('composer') }}",
"run_id": "{{ dag_run.run_id | lower | replace(':', '') | replace('+', '') | replace('.', '') | replace('-', '') | replace('T', '') }}",
},
},
params={
"project_id": "my-gcp-project",
"src_dataset": "raw_data",
"stg_dataset": "analytics",
},
)
transform_revenue = BigQueryInsertJobOperator(
task_id="transform_revenue",
configuration={
"query": {
"query": "{% include 'sql/transform_revenue.sql' %}",
"useLegacySql": False,
"allowLargeResults": True,
"writeDisposition": "WRITE_APPEND",
"createDisposition": "CREATE_NEVER",
},
"labels": {
"dag_id": "{{ dag.dag_id }}",
"task_id": "{{ task.task_id }}",
"execution_date": "{{ ds_nodash }}",
"instance_name": "{{ var.value.composer_env_name | default('composer') }}",
"run_id": "{{ dag_run.run_id | lower | replace(':', '') | replace('+', '') | replace('.', '') | replace('-', '') | replace('T', '') }}",
},
},
params={
"project_id": "my-gcp-project",
"dataset": "analytics",
},
)
load_final = BigQueryInsertJobOperator(
task_id="load_final",
configuration={
"query": {
"query": "{% include 'sql/load_revenue_final.sql' %}",
"useLegacySql": False,
"allowLargeResults": True,
},
"labels": {
"dag_id": "{{ dag.dag_id }}",
"task_id": "{{ task.task_id }}",
"execution_date": "{{ ds_nodash }}",
"instance_name": "{{ var.value.composer_env_name | default('composer') }}",
"run_id": "{{ dag_run.run_id | lower | replace(':', '') | replace('+', '') | replace('.', '') | replace('-', '') | replace('T', '') }}",
"team": "data-engineering",
"pipeline": "daily_revenue",
},
},
retries=3,
)
# ----- Dependencies -----
stage_transactions >> transform_revenue >> load_final
Key things to notice:
• ${VAR__...__VAR} variables are fully expanded to their values
• SQL file paths render as Jinja includes: {% include 'sql/...' %}
• FinOps labels are auto-injected into every BigQuery operator
• Custom labels (team, pipeline) are merged alongside FinOps labels
• retries: 3 on load_final overrides the DAG-level default of 2
• Operator imports are deferred inside the with DAG block
Example 2: Task Groups with Cross-Group Dependencies
An ETL pipeline with staging and aggregation phases organized into named task groups.
Input YAML
metadata:
title: "ETL Pipeline with Task Groups"
owner: "platform-team"
email: "platform@company.com"
version: "2.0.0"
jira: "PT-300"
developer_name: "Clara Rivera"
gcp:
project_id: "my-gcp-project"
location: "US"
default_args:
owner: "airflow"
retries: 1
retry_delay: 60
dag:
dag_id: "etl_with_task_groups"
description: "Staging and aggregation phases organized into task groups."
schedule: null
start_date: "2026-01-01"
catchup: false
max_active_runs: 1
dagrun_timeout: 7200
tags:
- "pattern:taskgroup"
- "domain:warehouse"
tasks:
- task_id: "start"
operator: EmptyOperator
- operator: TaskGroup
group_id: "staging"
tooltip: "Stage source tables sequentially"
tasks:
- task_id: "stage_customers"
operator: BigQueryInsertJobOperator
sql: "sql/stage_customers.sql"
- task_id: "stage_orders"
operator: BigQueryInsertJobOperator
sql: "sql/stage_orders.sql"
- task_id: "stage_products"
operator: BigQueryInsertJobOperator
sql: "sql/stage_products.sql"
dependencies:
- "stage_customers >> stage_orders >> stage_products"
- operator: TaskGroup
group_id: "aggregation"
tooltip: "Run aggregations in parallel"
tasks:
- task_id: "agg_revenue"
operator: BigQueryInsertJobOperator
sql: "sql/agg_revenue.sql"
- task_id: "agg_inventory"
operator: BigQueryInsertJobOperator
sql: "sql/agg_inventory.sql"
- task_id: "done"
operator: EmptyOperator
dependencies:
- "start >> staging"
- "staging >> aggregation"
- "aggregation >> done"
Generated Python DAG
"""
Generated DAG code for etl_with_task_groups through automation.
DAG : etl_with_task_groups
Title : ETL Pipeline with Task Groups
Version : 2.0.0
Owner : platform-team
Email : platform@company.com
Jira : PT-300
DAG Generated Date : 2026-05-04
Source : etl_with_task_groups.yaml
============================================================
DO NOT EDIT MANUALLY
============================================================
"""
from __future__ import annotations
from datetime import timedelta
from typing import Any, Final
import pendulum
from airflow import DAG
# ========================
# CONFIGURATION
# ========================
TIMEZONE: Final[str] = "UTC"
BASE_PATH: Final[str] = "/home/airflow/gcs/dags"
# ========================
# DEFAULT ARGS
# ========================
default_args: dict[str, Any] = {
"owner": "airflow",
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(seconds=60), # Retry Delay Time: 1 min(s)
"sla": None,
"deferrable": True,
"email": [],
"email_on_failure": False,
"email_on_retry": False,
# ========================
# GCP DEFAULT ARGS
# ========================
"gcp_conn_id": "google_cloud_default",
"google_cloud_conn_id": "google_cloud_default",
"use_legacy_sql": False,
"allow_large_results": True,
"project_id": "my-gcp-project",
"location": "US",
}
# ========================
# TAGS
# ========================
tags: set[str] = {
"project_id:my-gcp-project",
"pattern:taskgroup",
"domain:warehouse",
}
# ========================
# DAG DEFINITION
# ========================
with DAG(
dag_id="etl_with_task_groups",
start_date=pendulum.datetime(2026, 1, 1, tz=TIMEZONE),
schedule=None,
default_args=default_args,
tags=tags,
dagrun_timeout=timedelta(seconds=7200), # DAG Timeout: 2 hour(s)
catchup=False,
max_active_runs=1,
is_paused_upon_creation=True,
description="""Staging and aggregation phases organized into task groups.""",
template_searchpath=[BASE_PATH],
) as dag:
# ----- Deferred imports -----
from airflow.operators.empty import EmptyOperator
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryInsertJobOperator,
)
from airflow.utils.task_group import TaskGroup
# ----- Tasks -----
start = EmptyOperator(task_id="start")
# ----- Task group: staging -----
with TaskGroup(
group_id="staging",
tooltip="""Stage source tables sequentially""",
) as staging:
stage_customers = BigQueryInsertJobOperator(
task_id="stage_customers",
configuration={
"query": {
"query": "{% include 'sql/stage_customers.sql' %}",
"useLegacySql": False,
"allowLargeResults": True,
},
"labels": {
"dag_id": "{{ dag.dag_id }}",
"task_id": "{{ task.task_id }}",
"execution_date": "{{ ds_nodash }}",
"instance_name": "{{ var.value.composer_env_name | default('composer') }}",
"run_id": "{{ dag_run.run_id | lower | replace(':', '') | replace('+', '') | replace('.', '') | replace('-', '') | replace('T', '') }}",
},
},
)
stage_orders = BigQueryInsertJobOperator(
task_id="stage_orders",
configuration={
"query": {
"query": "{% include 'sql/stage_orders.sql' %}",
"useLegacySql": False,
"allowLargeResults": True,
},
"labels": {
"dag_id": "{{ dag.dag_id }}",
"task_id": "{{ task.task_id }}",
"execution_date": "{{ ds_nodash }}",
"instance_name": "{{ var.value.composer_env_name | default('composer') }}",
"run_id": "{{ dag_run.run_id | lower | replace(':', '') | replace('+', '') | replace('.', '') | replace('-', '') | replace('T', '') }}",
},
},
)
stage_products = BigQueryInsertJobOperator(
task_id="stage_products",
configuration={
"query": {
"query": "{% include 'sql/stage_products.sql' %}",
"useLegacySql": False,
"allowLargeResults": True,
},
"labels": {
"dag_id": "{{ dag.dag_id }}",
"task_id": "{{ task.task_id }}",
"execution_date": "{{ ds_nodash }}",
"instance_name": "{{ var.value.composer_env_name | default('composer') }}",
"run_id": "{{ dag_run.run_id | lower | replace(':', '') | replace('+', '') | replace('.', '') | replace('-', '') | replace('T', '') }}",
},
},
)
# intra-group dependencies
stage_customers >> stage_orders >> stage_products
# ----- Task group: aggregation -----
with TaskGroup(
group_id="aggregation",
tooltip="""Run aggregations in parallel""",
) as aggregation:
agg_revenue = BigQueryInsertJobOperator(
task_id="agg_revenue",
configuration={
"query": {
"query": "{% include 'sql/agg_revenue.sql' %}",
"useLegacySql": False,
"allowLargeResults": True,
},
"labels": {
"dag_id": "{{ dag.dag_id }}",
"task_id": "{{ task.task_id }}",
"execution_date": "{{ ds_nodash }}",
"instance_name": "{{ var.value.composer_env_name | default('composer') }}",
"run_id": "{{ dag_run.run_id | lower | replace(':', '') | replace('+', '') | replace('.', '') | replace('-', '') | replace('T', '') }}",
},
},
)
agg_inventory = BigQueryInsertJobOperator(
task_id="agg_inventory",
configuration={
"query": {
"query": "{% include 'sql/agg_inventory.sql' %}",
"useLegacySql": False,
"allowLargeResults": True,
},
"labels": {
"dag_id": "{{ dag.dag_id }}",
"task_id": "{{ task.task_id }}",
"execution_date": "{{ ds_nodash }}",
"instance_name": "{{ var.value.composer_env_name | default('composer') }}",
"run_id": "{{ dag_run.run_id | lower | replace(':', '') | replace('+', '') | replace('.', '') | replace('-', '') | replace('T', '') }}",
},
},
)
done = EmptyOperator(task_id="done")
# ----- Dependencies -----
start >> staging
staging >> aggregation
aggregation >> done
Key things to notice:
• TaskGroup renders as Python with TaskGroup(...) as name: blocks
• Intra-group dependencies (stage_customers >> stage_orders >> stage_products) stay inside the group block
• Cross-group dependencies (start >> staging >> aggregation >> done) reference group names directly
• Tasks within the aggregation group have no intra-group dependencies — they run in parallel
• FinOps labels are auto-injected into every BigQuery operator within groups too
• schedule: null renders as schedule=None (manual-trigger only)