Complete Reference Template

Every supported YAML section and field in one copyable template. Use this as a starting point for new DAGs.

How to use: Copy this template, remove the sections and fields you don't need, and fill in your values. The inline comments document every field's purpose, default, and constraints.

Full Template

yaml
# =============================================================================
# DagSmith - Complete Reference Template
#
# Section order (convention):
#   variables -> configurations -> metadata -> gcp -> default_args ->
#   user_defined_macros -> dag -> tasks -> dependencies
#
# Legend:
#   REQUIRED   - field must be present
#   OPTIONAL   - field can be omitted (default shown after "=")
#   ALIAS      - alternative name accepted for the same field
# =============================================================================


# =============================================================================
# 1. VARIABLES - OPTIONAL section
# =============================================================================
# Key-value pairs for ${VAR__<NAME>__VAR} substitution throughout the entire YAML.
# Naming rule: every key follows the pattern VAR__<NAME>__VAR (e.g. VAR__ENV__VAR).
# -----------------------------------------------------------------------------
variables:
  VAR__PROJECT_ID__VAR: "my-gcp-project-id"
  VAR__DATASET__VAR: "my_dataset"
  VAR__ENV__VAR: "prod"
  VAR__BUNDLE__VAR: "my_bundle"


# =============================================================================
# 2. CONFIGURATIONS - OPTIONAL (default: base_path="/home/airflow/gcs/dags")
# =============================================================================
configurations:
  base_path: "/home/airflow/gcs/dags/${VAR__PROJECT_ID__VAR}/"


# =============================================================================
# 3. METADATA - REQUIRED (all fields required)
# =============================================================================
metadata:
  title: "My Pipeline Title"
  owner: "team-name@example.com"
  email: "team-name@example.com"
  version: "1.0.0"
  jira: "PROJ-1234"
  developer_name: "reference_template"


# =============================================================================
# 4. GCP - REQUIRED
# =============================================================================
gcp:
  project_id: "${VAR__PROJECT_ID__VAR}"
  location: "US"
  gcp_conn_id: "google_cloud_default"
  # impersonation_chain: "sa@project.iam.gserviceaccount.com"
  # deferrable: false


# =============================================================================
# 5. DEFAULT_ARGS - OPTIONAL
# =============================================================================
default_args:
  owner: "airflow"
  depends_on_past: false
  retries: 3                          # >= 0
  retry_delay: 60                     # seconds, >= 30. ALIAS: retry_delay_seconds
  # sla: 3600                         # seconds, > 0. ALIAS: sla_seconds
  deferrable: true
  email:
    - "team-name@example.com"
  email_on_failure: true
  email_on_retry: false
  # on_failure_callback: "mypackage.callbacks.on_failure"
  # on_success_callback: "mypackage.callbacks.on_success"
  # on_retry_callback: "mypackage.callbacks.on_retry"


# =============================================================================
# 6. USER_DEFINED_MACROS - OPTIONAL
# =============================================================================
user_defined_macros:
  project_name: "${VAR__PROJECT_ID__VAR}"
  fact_dataset: "${VAR__DATASET__VAR}"
  bundle: "${VAR__BUNDLE__VAR}"
  env: "${VAR__ENV__VAR}"
  datastore: "BQ"
  latency: 1


# =============================================================================
# 7. DAG - REQUIRED
# =============================================================================
dag:
  dag_id: "reference_template"
  description: "A comprehensive reference template demonstrating all features."
  schedule: "0 6 * * *"              # cron, preset, or null (manual-trigger only). ALIAS: schedule_interval
  start_date: "2026-01-01"           # YYYY-MM-DD or YYYY-MM-DD HH:MM:SS
  timezone: "America/New_York"       # IANA timezone. Default: "UTC"
  catchup: false
  max_active_runs: 1                 # >= 1
  dagrun_timeout: 7200               # REQUIRED, seconds, > 0
  is_paused_upon_creation: true
  tags:
    - "warehouse:bigquery"
    - "module:reference"
    - "env:${VAR__ENV__VAR}"

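  # DAG-level Airflow Params (shown in the "Trigger DAG w/ config" UI).
  # Tasks can read these via {{ params.<name> }}; note that validate_params
  # below reads params.env and params.start_dt.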
  # params:
  #   env:
  #     type: "string"
  #     default: "PROD"
  #     enum: ["PROD", "PLE", "DEV"]
  #     title: "Environment"
  #     description: "Target environment"


# =============================================================================
# 8. TASKS - REQUIRED
# =============================================================================
tasks:

  # --- EmptyOperator (no-op marker) ---
  - task_id: "start"
    operator: EmptyOperator

  # --- BashOperator ---
  - task_id: "run_shell_script"
    operator: BashOperator
    bash_command: "echo 'Hello from {{ ds }}'"

  # --- PythonOperator ---
  - task_id: "validate_params"
    operator: PythonOperator
    python_callable: "mypackage.validators.validate_params"
    op_kwargs:
      env: "{{ params.env }}"
      start_dt: "{{ params.start_dt }}"

  # --- BranchPythonOperator ---
  - task_id: "choose_branch"
    operator: BranchPythonOperator
    python_callable: "mypackage.branching.pick_branch"
    op_kwargs:
      env: "${VAR__ENV__VAR}"

  # --- BigQueryInsertJobOperator (SQL file) ---
  - task_id: "stage_data"
    operator: BigQueryInsertJobOperator
    sql: "sql/stage_data.sql"
    params:
      project_id: "${VAR__PROJECT_ID__VAR}"
      src_dataset: "${VAR__DATASET__VAR}"
    labels:
      bundle: "${VAR__BUNDLE__VAR}"
      module: "reference_pipeline"
    include_finops_labels: true

  # --- BigQueryInsertJobOperator (inline SQL) ---
  - task_id: "transform_data"
    operator: BigQueryInsertJobOperator
    sql: |
      SELECT
        account_id,
        SUM(amount) AS total_amount,
        COUNT(*) AS txn_count
      FROM `{{ project_name }}.{{ fact_dataset }}.transactions`
      WHERE process_date = DATE('{{ ds }}')
      GROUP BY account_id
    retries: 2

  # --- BigQueryCheckOperator ---
  - task_id: "data_quality_check"
    operator: BigQueryCheckOperator
    sql: |
      SELECT CASE
        WHEN COUNT(1) > 0 THEN 1
        ELSE 0
      END AS has_data
      FROM `{{ project_name }}.{{ fact_dataset }}.transactions`
      WHERE process_date = DATE('{{ ds }}')
    retries: 6
    retry_delay: 300

  # --- BigQueryValueCheckOperator ---
  - task_id: "row_count_check"
    operator: BigQueryValueCheckOperator
    sql: "sql/count_rows.sql"
    pass_value: 1000

  # --- BigQueryTableExistenceSensor ---
  - task_id: "wait_for_table"
    operator: BigQueryTableExistenceSensor
    project_id: "${VAR__PROJECT_ID__VAR}"
    dataset_id: "${VAR__DATASET__VAR}"
    table_id: "transactions"
    timeout: 3600
    poke_interval: 120
    mode: "reschedule"

  # --- ExternalTaskSensor (fixed delta) ---
  - task_id: "wait_for_upstream_fixed_delta"
    operator: ExternalTaskSensor
    external_dag_id: "upstream_pipeline"
    external_task_id: "final_step"
    mode: "reschedule"
    poke_interval: 300
    timeout: 21600
    allowed_states: ["success"]
    execution_delta: 3600

  # --- ExternalTaskSensor (callable) ---
  - task_id: "wait_for_upstream_callable"
    operator: ExternalTaskSensor
    external_dag_id: "reference_pipeline"
    external_task_id: "metric"
    mode: "reschedule"
    poke_interval: 600
    timeout: 43200
    allowed_states: ["success"]
    execution_date_fn: "mypackage.scheduling.get_reference_date"

  # --- ExternalTaskSensor (same date) ---
  - task_id: "wait_for_upstream_same_date"
    operator: ExternalTaskSensor
    external_dag_id: "sibling_pipeline"
    external_task_id: "done"
    mode: "reschedule"
    poke_interval: 300
    timeout: 21600
    allowed_states: ["success"]

  # --- TriggerDagRunOperator ---
  - task_id: "trigger_downstream"
    operator: TriggerDagRunOperator
    trigger_dag_id: "downstream_pipeline"
    conf:
      triggered_by: "reference_template"
      run_date: "{{ ds }}"
      env: "{{ env }}"
    wait_for_completion: false
    reset_dag_run: true

  # --- GCSToBigQueryOperator ---
  - task_id: "load_from_gcs"
    operator: GCSToBigQueryOperator
    bucket: "my-data-bucket"
    source_objects:
      - "data/{{ ds_nodash }}/*.json"
    destination_project_dataset_table: "${VAR__PROJECT_ID__VAR}.${VAR__DATASET__VAR}.raw_events"
    source_format: "NEWLINE_DELIMITED_JSON"
    autodetect: true
    write_disposition: "WRITE_TRUNCATE"

  # --- GCSToGCSOperator ---
  - task_id: "archive_files"
    operator: GCSToGCSOperator
    source_bucket: "my-data-bucket"
    source_object: "data/{{ ds_nodash }}/"
    destination_bucket: "my-archive-bucket"
    destination_object: "archive/{{ ds_nodash }}/"
    move_object: false

  # --- GCSDeleteObjectsOperator ---
  - task_id: "cleanup_staging"
    operator: GCSDeleteObjectsOperator
    bucket_name: "my-data-bucket"
    prefix: "staging/{{ ds_nodash }}/"

  # --- GCSObjectsWithPrefixExistenceSensor ---
  - task_id: "wait_for_files"
    operator: GCSObjectsWithPrefixExistenceSensor
    bucket: "my-data-bucket"
    prefix: "incoming/{{ ds_nodash }}/"
    mode: "reschedule"
    poke_interval: 300
    timeout: 3600

  # --- TaskGroup ---
  - operator: TaskGroup
    group_id: "staging_group"
    tooltip: "Stage source tables into BigQuery"
    tasks:
      - task_id: "stage_orders"
        operator: BigQueryInsertJobOperator
        sql: "sql/stage_orders.sql"
        labels:
          bundle: "${VAR__BUNDLE__VAR}"

      - task_id: "stage_customers"
        operator: BigQueryInsertJobOperator
        sql: "sql/stage_customers.sql"
        labels:
          bundle: "${VAR__BUNDLE__VAR}"

      - task_id: "stage_products"
        operator: BigQueryInsertJobOperator
        sql: "sql/stage_products.sql"

    dependencies:
      - "stage_orders >> stage_customers >> stage_products"

  # --- Terminal marker ---
  - task_id: "end"
    operator: EmptyOperator


# =============================================================================
# 9. DEPENDENCIES
# =============================================================================
dependencies:
  - "start >> run_shell_script"
  - "run_shell_script >> validate_params"
  - "validate_params >> choose_branch"
  - "choose_branch >> [wait_for_table, wait_for_files]"
  - "[wait_for_table, wait_for_files] >> stage_data"
  - "stage_data >> transform_data >> data_quality_check >> row_count_check"
  - "row_count_check >> staging_group"
  - "staging_group >> load_from_gcs"
  - "load_from_gcs >> archive_files >> cleanup_staging"
  - "[wait_for_upstream_fixed_delta, wait_for_upstream_callable, wait_for_upstream_same_date] >> trigger_downstream"
  - "[cleanup_staging, trigger_downstream] >> end"

Minimal Template

The smallest valid DagSmith YAML spec — only the required sections:

yaml
metadata:
  title: "Simple Pipeline"
  owner: "team@example.com"
  email: "team@example.com"
  version: "1.0.0"
  jira: "PROJ-001"
  developer_name: "simple_pipeline"

dag:
  dag_id: "simple_pipeline"
  dagrun_timeout: 3600

gcp:
  project_id: "my-project"

tasks:
  - task_id: "start"
    operator: EmptyOperator

  - task_id: "load_data"
    operator: BigQueryInsertJobOperator
    sql: "sql/load.sql"

  - task_id: "end"
    operator: EmptyOperator

dependencies:
  - "start >> load_data >> end"

Template with Variables

A fuller pipeline that uses variables so the same spec can be deployed to multiple environments:

yaml
variables:
  VAR__PROJECT_ID__VAR: "my-gcp-project-001"
  VAR__DATASET__VAR: "warehouse_tables"
  VAR__ENV__VAR: "prod"

configurations:
  base_path: "/home/airflow/gcs/dags/${VAR__PROJECT_ID__VAR}/"

metadata:
  title: "Daily Account Activity Load"
  owner: "data-team@example.com"
  email: "data-team@example.com"
  version: "1.0.0"
  jira: "PROJ-456"
  developer_name: "daily_load"

dag:
  dag_id: "daily_account_load"
  description: "Load daily account activity into BigQuery."
  schedule: "0 6 * * *"
  start_date: "2026-01-01"
  timezone: "America/New_York"
  catchup: false
  dagrun_timeout: 7200
  tags:
    - "warehouse:bigquery"
    - "env:${VAR__ENV__VAR}"

gcp:
  project_id: "${VAR__PROJECT_ID__VAR}"
  location: "us-east4"

default_args:
  retries: 2
  retry_delay: 60
  email:
    - "data-team@example.com"
  email_on_failure: true

tasks:
  - task_id: "start"
    operator: EmptyOperator

  - task_id: "stage_data"
    operator: BigQueryInsertJobOperator
    sql: "sql/stage_acct_activity.sql"
    params:
      project_id: "${VAR__PROJECT_ID__VAR}"
      src_dataset: "${VAR__DATASET__VAR}"

  - task_id: "transform_data"
    operator: BigQueryInsertJobOperator
    sql: "sql/transform_acct_activity.sql"

  - task_id: "load_final"
    operator: BigQueryInsertJobOperator
    sql: "sql/load_acct_activity.sql"
    retries: 3

  - task_id: "end"
    operator: EmptyOperator

dependencies:
  - "start >> stage_data >> transform_data >> load_final >> end"

Generated Output Examples

Below are real examples of Python DAG files generated by DagSmith from YAML specs. They show exactly what the dagsmith generate command produces.

Example 1: Simple Sequential BigQuery Pipeline

A straightforward three-task pipeline that stages, transforms, and loads data.

Input YAML

yaml
variables:
  VAR__PROJECT_ID__VAR: "my-gcp-project"
  VAR__DATASET__VAR: "analytics"

configurations:
  base_path: "/home/airflow/gcs/dags/${VAR__PROJECT_ID__VAR}/"

metadata:
  title: "Daily Revenue Pipeline"
  owner: "data-engineering"
  email: "data-eng@company.com"
  version: "1.0.0"
  jira: "DE-101"
  developer_name: "Alice Martin"

gcp:
  project_id: "${VAR__PROJECT_ID__VAR}"
  location: "US"

default_args:
  owner: "airflow"
  depends_on_past: false
  retries: 2
  retry_delay: 120
  email: ["data-eng@company.com"]
  email_on_failure: true
  email_on_retry: false

dag:
  dag_id: "daily_revenue_pipeline"
  description: "Extract, transform, and load daily revenue data into BigQuery."
  schedule: "0 6 * * *"
  start_date: "2026-01-01 12:10:10"
  timezone: "America/New_York"
  catchup: false
  max_active_runs: 1
  dagrun_timeout: 7200
  is_paused_upon_creation: true
  tags:
    - "warehouse:bigquery"
    - "domain:finance"
    - "cadence:daily"

tasks:
  - task_id: "stage_transactions"
    operator: BigQueryInsertJobOperator
    sql: "sql/stage_transactions.sql"
    params:
      project_id: "${VAR__PROJECT_ID__VAR}"
      src_dataset: "raw_data"
      stg_dataset: "${VAR__DATASET__VAR}"
    write_disposition: "WRITE_APPEND"
    create_disposition: "CREATE_IF_NEEDED"

  - task_id: "transform_revenue"
    operator: BigQueryInsertJobOperator
    sql: "sql/transform_revenue.sql"
    params:
      project_id: "${VAR__PROJECT_ID__VAR}"
      dataset: "${VAR__DATASET__VAR}"
    write_disposition: "WRITE_APPEND"
    create_disposition: "CREATE_NEVER"

  - task_id: "load_final"
    operator: BigQueryInsertJobOperator
    sql: "sql/load_revenue_final.sql"
    retries: 3
    labels:
      team: "data-engineering"
      pipeline: "daily_revenue"
    include_finops_labels: true

dependencies:
  - "stage_transactions >> transform_revenue >> load_final"

Generated Python DAG

python
"""
Generated DAG code for daily_revenue_pipeline through automation.

DAG                : daily_revenue_pipeline
Title              : Daily Revenue Pipeline
Version            : 1.0.0
Owner              : data-engineering
Email              : data-eng@company.com
Jira               : DE-101
DAG Generated Date : 2026-05-04
Source             : daily_revenue_pipeline.yaml
============================================================
                    DO NOT EDIT MANUALLY
============================================================
"""

from __future__ import annotations

from datetime import timedelta
from typing import Any, Final

import pendulum

from airflow import DAG

# ========================
# CONFIGURATION
# ========================
TIMEZONE: Final[str] = "America/New_York"
BASE_PATH: Final[str] = "/home/airflow/gcs/dags/my-gcp-project/"

# ========================
# DEFAULT ARGS
# ========================
default_args: dict[str, Any] = {
    "owner": "airflow",
    "depends_on_past": False,
    "retries": 2,
    "retry_delay": timedelta(seconds=120),  # Retry Delay Time: 2 min(s)
    "sla": None,
    "deferrable": True,
    "email": ["data-eng@company.com"],
    "email_on_failure": True,
    "email_on_retry": False,
    # ========================
    # GCP DEFAULT ARGS
    # ========================
    "gcp_conn_id": "google_cloud_default",
    "google_cloud_conn_id": "google_cloud_default",
    "use_legacy_sql": False,
    "allow_large_results": True,
    "project_id": "my-gcp-project",
    "location": "US",
}

# ========================
# TAGS
# ========================
tags: set[str] = {
    "project_id:my-gcp-project",
    "warehouse:bigquery",
    "domain:finance",
    "cadence:daily",
}

# ========================
# DAG DEFINITION
# ========================
with DAG(
    dag_id="daily_revenue_pipeline",
    start_date=pendulum.datetime(2026, 1, 1, 12, 10, 10, tz=TIMEZONE),
    schedule="0 6 * * *",  # Daily at 06:00 AM
    default_args=default_args,
    tags=tags,
    dagrun_timeout=timedelta(seconds=7200),  # DAG Timeout: 2 hour(s)
    catchup=False,
    max_active_runs=1,
    is_paused_upon_creation=True,
    description="""Extract, transform, and load daily revenue data into BigQuery.""",
    template_searchpath=[BASE_PATH],
) as dag:
    # ----- Deferred imports -----
    from airflow.providers.google.cloud.operators.bigquery import (
        BigQueryInsertJobOperator,
    )

    # ----- Tasks -----
    stage_transactions = BigQueryInsertJobOperator(
        task_id="stage_transactions",
        configuration={
            "query": {
                "query": "{% include 'sql/stage_transactions.sql' %}",
                "useLegacySql": False,
                "allowLargeResults": True,
                "writeDisposition": "WRITE_APPEND",
                "createDisposition": "CREATE_IF_NEEDED",
            },
            "labels": {
                "dag_id": "{{ dag.dag_id }}",
                "task_id": "{{ task.task_id }}",
                "execution_date": "{{ ds_nodash }}",
                "instance_name": "{{ var.value.composer_env_name | default('composer') }}",
                "run_id": "{{ dag_run.run_id | lower | replace(':', '') | replace('+', '') | replace('.', '') | replace('-', '') | replace('T', '') }}",
            },
        },
        params={
            "project_id": "my-gcp-project",
            "src_dataset": "raw_data",
            "stg_dataset": "analytics",
        },
    )

    transform_revenue = BigQueryInsertJobOperator(
        task_id="transform_revenue",
        configuration={
            "query": {
                "query": "{% include 'sql/transform_revenue.sql' %}",
                "useLegacySql": False,
                "allowLargeResults": True,
                "writeDisposition": "WRITE_APPEND",
                "createDisposition": "CREATE_NEVER",
            },
            "labels": {
                "dag_id": "{{ dag.dag_id }}",
                "task_id": "{{ task.task_id }}",
                "execution_date": "{{ ds_nodash }}",
                "instance_name": "{{ var.value.composer_env_name | default('composer') }}",
                "run_id": "{{ dag_run.run_id | lower | replace(':', '') | replace('+', '') | replace('.', '') | replace('-', '') | replace('T', '') }}",
            },
        },
        params={
            "project_id": "my-gcp-project",
            "dataset": "analytics",
        },
    )

    load_final = BigQueryInsertJobOperator(
        task_id="load_final",
        configuration={
            "query": {
                "query": "{% include 'sql/load_revenue_final.sql' %}",
                "useLegacySql": False,
                "allowLargeResults": True,
            },
            "labels": {
                "dag_id": "{{ dag.dag_id }}",
                "task_id": "{{ task.task_id }}",
                "execution_date": "{{ ds_nodash }}",
                "instance_name": "{{ var.value.composer_env_name | default('composer') }}",
                "run_id": "{{ dag_run.run_id | lower | replace(':', '') | replace('+', '') | replace('.', '') | replace('-', '') | replace('T', '') }}",
                "team": "data-engineering",
                "pipeline": "daily_revenue",
            },
        },
        retries=3,
    )

    # ----- Dependencies -----
    stage_transactions >> transform_revenue >> load_final

Key things to notice:

• ${VAR__...__VAR} variables are fully expanded to their values
• SQL file paths render as Jinja includes: {% include 'sql/...' %}
• FinOps labels are auto-injected into every BigQuery operator
• Custom labels (team, pipeline) are merged alongside FinOps labels
• retries: 3 on load_final overrides the default_args value of retries: 2
• Operator imports are deferred inside the with DAG block


Example 2: Task Groups with Cross-Group Dependencies

An ETL pipeline with staging and aggregation phases organized into named task groups.

Input YAML

yaml
metadata:
  title: "ETL Pipeline with Task Groups"
  owner: "platform-team"
  email: "platform@company.com"
  version: "2.0.0"
  jira: "PT-300"
  developer_name: "Clara Rivera"

gcp:
  project_id: "my-gcp-project"
  location: "US"

default_args:
  owner: "airflow"
  retries: 1
  retry_delay: 60

dag:
  dag_id: "etl_with_task_groups"
  description: "Staging and aggregation phases organized into task groups."
  schedule: null
  start_date: "2026-01-01"
  catchup: false
  max_active_runs: 1
  dagrun_timeout: 7200
  tags:
    - "pattern:taskgroup"
    - "domain:warehouse"

tasks:
  - task_id: "start"
    operator: EmptyOperator

  - operator: TaskGroup
    group_id: "staging"
    tooltip: "Stage source tables sequentially"
    tasks:
      - task_id: "stage_customers"
        operator: BigQueryInsertJobOperator
        sql: "sql/stage_customers.sql"
      - task_id: "stage_orders"
        operator: BigQueryInsertJobOperator
        sql: "sql/stage_orders.sql"
      - task_id: "stage_products"
        operator: BigQueryInsertJobOperator
        sql: "sql/stage_products.sql"
    dependencies:
      - "stage_customers >> stage_orders >> stage_products"

  - operator: TaskGroup
    group_id: "aggregation"
    tooltip: "Run aggregations in parallel"
    tasks:
      - task_id: "agg_revenue"
        operator: BigQueryInsertJobOperator
        sql: "sql/agg_revenue.sql"
      - task_id: "agg_inventory"
        operator: BigQueryInsertJobOperator
        sql: "sql/agg_inventory.sql"

  - task_id: "done"
    operator: EmptyOperator

dependencies:
  - "start >> staging"
  - "staging >> aggregation"
  - "aggregation >> done"

Generated Python DAG

python
"""
Generated DAG code for etl_with_task_groups through automation.

DAG                : etl_with_task_groups
Title              : ETL Pipeline with Task Groups
Version            : 2.0.0
Owner              : platform-team
Email              : platform@company.com
Jira               : PT-300
DAG Generated Date : 2026-05-04
Source             : etl_with_task_groups.yaml
============================================================
                    DO NOT EDIT MANUALLY
============================================================
"""

from __future__ import annotations

from datetime import timedelta
from typing import Any, Final

import pendulum

from airflow import DAG

# ========================
# CONFIGURATION
# ========================
TIMEZONE: Final[str] = "UTC"
BASE_PATH: Final[str] = "/home/airflow/gcs/dags"

# ========================
# DEFAULT ARGS
# ========================
default_args: dict[str, Any] = {
    "owner": "airflow",
    "depends_on_past": False,
    "retries": 1,
    "retry_delay": timedelta(seconds=60),  # Retry Delay Time: 1 min(s)
    "sla": None,
    "deferrable": True,
    "email": [],
    "email_on_failure": False,
    "email_on_retry": False,
    # ========================
    # GCP DEFAULT ARGS
    # ========================
    "gcp_conn_id": "google_cloud_default",
    "google_cloud_conn_id": "google_cloud_default",
    "use_legacy_sql": False,
    "allow_large_results": True,
    "project_id": "my-gcp-project",
    "location": "US",
}

# ========================
# TAGS
# ========================
tags: set[str] = {
    "project_id:my-gcp-project",
    "pattern:taskgroup",
    "domain:warehouse",
}

# ========================
# DAG DEFINITION
# ========================
with DAG(
    dag_id="etl_with_task_groups",
    start_date=pendulum.datetime(2026, 1, 1, tz=TIMEZONE),
    schedule=None,
    default_args=default_args,
    tags=tags,
    dagrun_timeout=timedelta(seconds=7200),  # DAG Timeout: 2 hour(s)
    catchup=False,
    max_active_runs=1,
    is_paused_upon_creation=True,
    description="""Staging and aggregation phases organized into task groups.""",
    template_searchpath=[BASE_PATH],
) as dag:
    # ----- Deferred imports -----
    from airflow.operators.empty import EmptyOperator
    from airflow.providers.google.cloud.operators.bigquery import (
        BigQueryInsertJobOperator,
    )
    from airflow.utils.task_group import TaskGroup

    # ----- Tasks -----
    start = EmptyOperator(task_id="start")

    # ----- Task group: staging -----
    with TaskGroup(
        group_id="staging",
        tooltip="""Stage source tables sequentially""",
    ) as staging:
        stage_customers = BigQueryInsertJobOperator(
            task_id="stage_customers",
            configuration={
                "query": {
                    "query": "{% include 'sql/stage_customers.sql' %}",
                    "useLegacySql": False,
                    "allowLargeResults": True,
                },
                "labels": {
                    "dag_id": "{{ dag.dag_id }}",
                    "task_id": "{{ task.task_id }}",
                    "execution_date": "{{ ds_nodash }}",
                    "instance_name": "{{ var.value.composer_env_name | default('composer') }}",
                    "run_id": "{{ dag_run.run_id | lower | replace(':', '') | replace('+', '') | replace('.', '') | replace('-', '') | replace('T', '') }}",
                },
            },
        )

        stage_orders = BigQueryInsertJobOperator(
            task_id="stage_orders",
            configuration={
                "query": {
                    "query": "{% include 'sql/stage_orders.sql' %}",
                    "useLegacySql": False,
                    "allowLargeResults": True,
                },
                "labels": {
                    "dag_id": "{{ dag.dag_id }}",
                    "task_id": "{{ task.task_id }}",
                    "execution_date": "{{ ds_nodash }}",
                    "instance_name": "{{ var.value.composer_env_name | default('composer') }}",
                    "run_id": "{{ dag_run.run_id | lower | replace(':', '') | replace('+', '') | replace('.', '') | replace('-', '') | replace('T', '') }}",
                },
            },
        )

        stage_products = BigQueryInsertJobOperator(
            task_id="stage_products",
            configuration={
                "query": {
                    "query": "{% include 'sql/stage_products.sql' %}",
                    "useLegacySql": False,
                    "allowLargeResults": True,
                },
                "labels": {
                    "dag_id": "{{ dag.dag_id }}",
                    "task_id": "{{ task.task_id }}",
                    "execution_date": "{{ ds_nodash }}",
                    "instance_name": "{{ var.value.composer_env_name | default('composer') }}",
                    "run_id": "{{ dag_run.run_id | lower | replace(':', '') | replace('+', '') | replace('.', '') | replace('-', '') | replace('T', '') }}",
                },
            },
        )

        # intra-group dependencies
        stage_customers >> stage_orders >> stage_products

    # ----- Task group: aggregation -----
    with TaskGroup(
        group_id="aggregation",
        tooltip="""Run aggregations in parallel""",
    ) as aggregation:
        agg_revenue = BigQueryInsertJobOperator(
            task_id="agg_revenue",
            configuration={
                "query": {
                    "query": "{% include 'sql/agg_revenue.sql' %}",
                    "useLegacySql": False,
                    "allowLargeResults": True,
                },
                "labels": {
                    "dag_id": "{{ dag.dag_id }}",
                    "task_id": "{{ task.task_id }}",
                    "execution_date": "{{ ds_nodash }}",
                    "instance_name": "{{ var.value.composer_env_name | default('composer') }}",
                    "run_id": "{{ dag_run.run_id | lower | replace(':', '') | replace('+', '') | replace('.', '') | replace('-', '') | replace('T', '') }}",
                },
            },
        )

        agg_inventory = BigQueryInsertJobOperator(
            task_id="agg_inventory",
            configuration={
                "query": {
                    "query": "{% include 'sql/agg_inventory.sql' %}",
                    "useLegacySql": False,
                    "allowLargeResults": True,
                },
                "labels": {
                    "dag_id": "{{ dag.dag_id }}",
                    "task_id": "{{ task.task_id }}",
                    "execution_date": "{{ ds_nodash }}",
                    "instance_name": "{{ var.value.composer_env_name | default('composer') }}",
                    "run_id": "{{ dag_run.run_id | lower | replace(':', '') | replace('+', '') | replace('.', '') | replace('-', '') | replace('T', '') }}",
                },
            },
        )

    done = EmptyOperator(task_id="done")

    # ----- Dependencies -----
    start >> staging
    staging >> aggregation
    aggregation >> done

Key things to notice:

• Each TaskGroup renders as a Python "with TaskGroup(...) as <group_id>:" block
• Intra-group dependencies (stage_customers >> stage_orders >> stage_products) stay inside the group block
• Cross-group dependencies (start >> staging >> aggregation >> done) reference group names directly
• Tasks within the aggregation group have no intra-group dependencies — they run in parallel
• FinOps labels are auto-injected into every BigQuery operator within groups too
• schedule: null renders as schedule=None (manual-trigger only)