Best Practices

This page covers FinOps labels, field aliases, the DagSmith architecture, and recommendations for production pipelines.

FinOps Labels

By default, DagSmith injects FinOps labels into every BigQueryInsertJobOperator task. The label definitions come from src/dagsmith/configs/airflow_registry.yaml. These labels enable cost tracking and attribution in BigQuery.

Label           Value (Jinja template)                                   Purpose
dag_id          {{ dag.dag_id }}                                         Identify which DAG ran the job
task_id         {{ task.task_id }}                                       Identify which task ran the job
execution_date  {{ ds_nodash }}                                          Execution date for cost grouping
instance_name   {{ var.value.composer_env_name | default('composer') }}  Composer environment name
run_id          Cleaned, lowercase dag_run.run_id                        Unique run identifier
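
The run_id row says the value is "cleaned" and lowercased. A minimal sketch of what such cleaning could look like is shown below. The helper name `clean_label_value` and the exact replacement rule are assumptions for illustration, not DagSmith's actual code. The sketch relies on BigQuery's documented label constraints: lowercase letters, digits, underscores, and dashes, with a maximum of 63 characters (it restricts itself to the ASCII subset).

```python
import re

# BigQuery label values may contain lowercase letters, digits, underscores,
# and dashes, and may be at most 63 characters long.
_INVALID_LABEL_CHARS = re.compile(r"[^a-z0-9_-]")
MAX_LABEL_LENGTH = 63


def clean_label_value(raw: str) -> str:
    """Hypothetical helper: make a string usable as a BigQuery label value."""
    lowered = raw.lower()
    # Assumption: every disallowed character is replaced by an underscore.
    cleaned = _INVALID_LABEL_CHARS.sub("_", lowered)
    return cleaned[:MAX_LABEL_LENGTH]


print(clean_label_value("scheduled__2024-01-01T00:00:00+00:00"))
# prints: scheduled__2024-01-01t00_00_00_00_00
```

Airflow run IDs typically contain colons and a plus sign from the timestamp, which is why some form of cleaning is needed before the value can be attached to a BigQuery job.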

Controlling FinOps labels

yaml
# Default: FinOps labels are auto-injected
- task_id: "load_data"
  operator: BigQueryInsertJobOperator
  sql: "sql/load.sql"
  include_finops_labels: true          # default, can be omitted
  labels:                              # custom labels merged with FinOps labels
    bundle: "daily_load"
    team: "data-engineering"

# Opt out of FinOps labels for a specific task
- task_id: "ad_hoc_query"
  operator: BigQueryInsertJobOperator
  sql: "sql/ad_hoc.sql"
  include_finops_labels: false         # no FinOps labels injected
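
The effect of include_finops_labels and the labels mapping can be pictured as a dictionary merge. The sketch below is illustrative only: the function name `build_labels` is hypothetical, and the rule that a task's own label wins on a key collision is an assumption, since the merge precedence is not stated here.

```python
def build_labels(finops_labels, custom_labels=None, include_finops_labels=True):
    """Hypothetical merge of FinOps labels with task-level custom labels."""
    merged = dict(finops_labels) if include_finops_labels else {}
    # Assumption: on a key collision the task's own label takes precedence.
    merged.update(custom_labels or {})
    return merged


# Two of the FinOps labels from the table, kept as Jinja templates.
finops = {"dag_id": "{{ dag.dag_id }}", "task_id": "{{ task.task_id }}"}

# Mirrors the "load_data" task: FinOps labels plus two custom labels.
load_data = build_labels(finops, {"bundle": "daily_load", "team": "data-engineering"})

# Mirrors the "ad_hoc_query" task: opted out, no custom labels.
ad_hoc = build_labels(finops, include_finops_labels=False)
```

With these inputs, `load_data` holds four labels and `ad_hoc` is empty.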

Field Aliases — Quick Reference

Several YAML fields accept alternative names for convenience. Both forms are equivalent; use whichever reads better in context.

Canonical Field  Alias                    Section                              Notes
retry_delay      retry_delay_seconds      default_args, task-level             Seconds between retries
sla              sla_seconds              default_args                         SLA timeout in seconds
schedule         schedule_interval        dag                                  Cron expression or preset
gcp_conn_id      google_cloud_conn_id     gcp                                  Airflow GCP connection ID
execution_delta  execution_delta_seconds  ExternalTaskSensor                   Fixed timedelta offset
execution_date   logical_date             TriggerDagRunOperator                Airflow 2.x → 3.x naming
poke_interval    poll_interval            GCSObjectsWithPrefixExistenceSensor  Seconds between pokes
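
One way to picture alias handling is a normalization pass that rewrites each alias to its canonical name before validation. The sketch below is an assumption-laden illustration: `FIELD_ALIASES` and `normalize_fields` are hypothetical names, the mapping ignores the Section column for brevity (the real aliases apply only in the listed sections), and rejecting a spec that supplies both forms is an assumed policy.

```python
# Alias -> canonical name, copied from the table (Section column ignored here).
FIELD_ALIASES = {
    "retry_delay_seconds": "retry_delay",
    "sla_seconds": "sla",
    "schedule_interval": "schedule",
    "google_cloud_conn_id": "gcp_conn_id",
    "execution_delta_seconds": "execution_delta",
    "logical_date": "execution_date",
    "poll_interval": "poke_interval",
}


def normalize_fields(section: dict) -> dict:
    """Hypothetical pass that rewrites aliases to their canonical field names."""
    normalized = {}
    for key, value in section.items():
        canonical = FIELD_ALIASES.get(key, key)
        if canonical in normalized:
            # Assumption: giving both the alias and the canonical form is an error.
            raise ValueError(f"'{canonical}' was given more than once")
        normalized[canonical] = value
    return normalized


print(normalize_fields({"retries": 2, "retry_delay_seconds": 300}))
# prints: {'retries': 2, 'retry_delay': 300}
```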

Architecture Overview

DagSmith follows a clear pipeline from YAML input to formatted Python output:

Pipeline
YAML Spec File
    |
    v
[1] Loader (loader.py)
    - Read YAML file
    - Expand ${VAR__...__VAR} variables
    - Parse with PyYAML
    - Validate via YamlDagSpec (Pydantic)
    |
    v
[2] Registry (registry/core.py)
    - Load airflow_registry.yaml
    - Map operator names to (module, class) tuples
    - Resolve aliases and conflicts
    |
    v
[3] Code Generator (code_generator.py)
    - Pre-scan: collect imports + SQL variable names
    - Render: header, imports, config, default_args, DAG block
    - Dispatch tasks to type-specific renderers
    |
    v
[4] Post-Processing
    - ruff check --fix (remove unused imports)
    - ruff format (consistent style)
    |
    v
Generated .py DAG File
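
Step [4] can be approximated with two subprocess calls. This is a rough sketch, assuming ruff is installed and on PATH; the function names `ruff_commands` and `post_process` are hypothetical and do not come from DagSmith.

```python
import subprocess
from pathlib import Path


def ruff_commands(dag_file: Path) -> list[list[str]]:
    """Return the two ruff invocations from step [4], in order."""
    target = str(dag_file)
    return [
        ["ruff", "check", "--fix", target],  # apply lint fixes such as unused imports
        ["ruff", "format", target],          # apply a consistent code style
    ]


def post_process(dag_file: Path) -> None:
    """Hypothetical post-processing step for one generated DAG file."""
    for command in ruff_commands(dag_file):
        # check=False is an assumption: `ruff check` exits non-zero when findings
        # remain after fixing, and this sketch does not treat that as fatal.
        subprocess.run(command, check=False)
```

Running the lint fix before the formatter means the formatter sees the final set of imports.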

Key Design Patterns

Discriminated Unions
TaskOrGroupSpec uses Pydantic's Discriminator to dispatch task types based on the operator field. Known operators get dedicated specs; unknown operators route to generic specs via registry lookup.
Spec + Renderer Pairs
Each operator has a Pydantic spec class and a corresponding render_* function. This separation keeps validation and code generation cleanly decoupled.
Import Strategy
Top-level: always-needed imports (future, timedelta, DAG) + conditional callbacks/params. Deferred: operator/sensor imports inside the with DAG block, sorted and deduplicated.
Registry System
YAML-driven at src/dagsmith/configs/airflow_registry.yaml. Supports standard, third-party, and custom origins with alias conflict resolution. Extensible via DAGSMITH_EXTRA_REGISTRY env var.
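
The "sorted and deduplicated" part of the import strategy can be illustrated with a few lines of Python. The function name `render_deferred_imports` is hypothetical, and the module paths in the example are the usual Airflow 2.x locations, used here only as sample input.

```python
def render_deferred_imports(classes, indent="    "):
    """Render sorted, deduplicated import lines for use inside the DAG block.

    `classes` is an iterable of (module, class_name) tuples, the same shape
    the registry maps operator names to.
    """
    unique = sorted(set(classes))
    return [f"{indent}from {module} import {name}" for module, name in unique]


used = [
    ("airflow.providers.google.cloud.operators.bigquery", "BigQueryInsertJobOperator"),
    ("airflow.sensors.external_task", "ExternalTaskSensor"),
    # A second task using the same operator must not produce a second import.
    ("airflow.providers.google.cloud.operators.bigquery", "BigQueryInsertJobOperator"),
]

for line in render_deferred_imports(used):
    print(line)
```

The three input entries yield two import lines, ordered by module path.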

YAML Authoring Best Practices

Naming Conventions

Structure

Validation

Sensors

BigQuery

Security

Callables

DagSmith uses dotted import paths to reference Python callables (callbacks, python_callable, execution_date_fn, sla_miss_callback). For the complete guide on path format, import placement rules, team-based package structure, code examples, deployment by platform, and troubleshooting, see the dedicated page:

Callables Guide: where to place callable modules, how to structure them by team/domain (e.g. acme.data_engineering.services.validation.schema.validate_schema), and how DagSmith generates aliased imports.
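
For readers unfamiliar with dotted import paths, the sketch below shows how such a path splits into a module part and an attribute part, and how it could be resolved at runtime. It is not DagSmith's implementation (DagSmith generates import statements in the output file); `split_dotted_path` and `resolve_callable` are hypothetical helpers built on the standard library.

```python
import importlib
from typing import Any, Callable


def split_dotted_path(path: str) -> tuple[str, str]:
    """Split 'package.module.func' into ('package.module', 'func')."""
    module_path, sep, attr = path.rpartition(".")
    if not sep or not module_path or not attr:
        raise ValueError(f"not a dotted import path: {path!r}")
    return module_path, attr


def resolve_callable(path: str) -> Callable[..., Any]:
    """Import the module part of the path and return the named callable."""
    module_path, attr = split_dotted_path(path)
    module = importlib.import_module(module_path)
    obj = getattr(module, attr)
    if not callable(obj):
        raise TypeError(f"{path!r} does not refer to a callable")
    return obj


print(split_dotted_path("acme.data_engineering.services.validation.schema.validate_schema"))
# prints: ('acme.data_engineering.services.validation.schema', 'validate_schema')
```

A resolver like this is also a quick way to check, before deployment, that a path in a spec actually points at something importable.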

Custom Operator/Sensor Management

Recommendation: Use the DAGSMITH_EXTRA_REGISTRY environment variable to manage custom operators and sensors independently from the bundled registry.

Instead of editing src/dagsmith/configs/airflow_registry.yaml directly (which creates merge conflicts on upgrades), maintain your team's custom operators in a separate file:

yaml
# my_team_registry.yaml
# Keep this file in your project repo or shared config location.
airflow_class_registry:
  custom:
    SlackWebhookOperator:
      module: airflow.providers.slack.operators.slack_webhook
      class: SlackWebhookOperator
      type: operator
    HttpSensor:
      module: airflow.providers.http.sensors.http
      class: HttpSensor
      type: sensor
    S3KeySensor:
      module: airflow.providers.amazon.aws.sensors.s3
      class: S3KeySensor
      type: sensor
    TeradataToGCSOperator:
      module: myproject.operators.teradata
      class: TeradataToGCSOperator
      type: operator

Then point DagSmith to it before generating:

bash
# Set once per shell session, or add to .bashrc / .zshrc / CI config
export DAGSMITH_EXTRA_REGISTRY=/path/to/my_team_registry.yaml

# Generate as usual - custom operators are automatically available
dagsmith generate specs/
dagsmith list --origin custom
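
Conceptually, the extra registry is layered on top of the bundled one. The sketch below works on already-parsed dictionaries (for example the result of PyYAML's `yaml.safe_load`) and is only an illustration: `merge_registries` and `lookup` are hypothetical names, the "standard" origin key is assumed, and letting extra entries override bundled ones is an assumed conflict rule.

```python
def merge_registries(bundled: dict, extra: dict) -> dict:
    """Hypothetical merge: layer the extra registry over the bundled one."""
    merged = {origin: dict(entries) for origin, entries in bundled.items()}
    for origin, entries in extra.items():
        # Assumption: an extra entry replaces a bundled entry of the same name.
        merged.setdefault(origin, {}).update(entries)
    return merged


def lookup(registry: dict, name: str) -> tuple[str, str]:
    """Map an operator name to its (module, class) tuple."""
    for entries in registry.values():
        if name in entries:
            entry = entries[name]
            return entry["module"], entry["class"]
    raise KeyError(name)


# Contents under `airflow_class_registry` in each file, already parsed.
bundled = {
    "standard": {  # assumed origin key
        "BashOperator": {
            "module": "airflow.operators.bash",
            "class": "BashOperator",
            "type": "operator",
        }
    }
}
extra = {
    "custom": {
        "TeradataToGCSOperator": {
            "module": "myproject.operators.teradata",
            "class": "TeradataToGCSOperator",
            "type": "operator",
        }
    }
}

registry = merge_registries(bundled, extra)
print(lookup(registry, "TeradataToGCSOperator"))
# prints: ('myproject.operators.teradata', 'TeradataToGCSOperator')
```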

Why this approach?

Adding a New Operator

There are two ways to add a new operator:

Option A: Generic Plugin (zero code) — Recommended

Register the operator in a separate registry file (via DAGSMITH_EXTRA_REGISTRY) or in src/dagsmith/configs/airflow_registry.yaml, then use it immediately. This path provides no field-level validation, but it requires no Python changes.

Option B: Built-in Spec (full validation)

For operators that need field-level validation, follow these steps:

  1. Create a spec class (inherits BaseTaskSpec or BaseSensorOperatorSpec) + render_* function in src/dagsmith/schemas/<category>/
  2. Re-export from src/dagsmith/schemas/<category>/__init__.py
  3. Add to TaskSpec and TaskOrGroupSpec unions in src/dagsmith/schemas/__init__.py
  4. Add a match/case arm in DagCodeGenerator._render_task()
  5. Register the class in src/dagsmith/configs/airflow_registry.yaml under the appropriate section
  6. Add tests in tests/dagsmith/schemas/<category>/