Best Practices
FinOps labels, field aliases, architecture, and recommendations for production pipelines.
FinOps Labels
By default, every `BigQueryInsertJobOperator` task gets FinOps labels injected from `src/dagsmith/configs/airflow_registry.yaml`. These labels enable cost tracking and attribution in BigQuery.
| Label | Value (Jinja template) | Purpose |
|---|---|---|
| `dag_id` | `{{ dag.dag_id }}` | Identify which DAG ran the job |
| `task_id` | `{{ task.task_id }}` | Identify which task ran the job |
| `execution_date` | `{{ ds_nodash }}` | Execution date for cost grouping |
| `instance_name` | `{{ var.value.composer_env_name \| default('composer') }}` | Composer environment name |
| `run_id` | Cleaned, lowercase `dag_run.run_id` | Unique run identifier |
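BigQuery label values may only contain lowercase letters, digits, underscores, and dashes (plus international characters) and are limited to 63 characters, while Airflow run IDs such as `scheduled__2024-01-01T00:00:00+00:00` contain `:` and `+`. That is why `run_id` needs cleaning. A minimal sketch of one possible cleaning rule, assuming disallowed characters become underscores; `clean_label_value` is a hypothetical helper, and DagSmith's actual rule may differ:

```python
import re

# Characters outside [a-z0-9_-] are replaced. This is stricter than BigQuery,
# which also accepts international characters in label values.
_DISALLOWED = re.compile(r"[^a-z0-9_-]")
_MAX_LABEL_LEN = 63


def clean_label_value(raw: str) -> str:
    """Hypothetical helper: make an Airflow run_id usable as a BigQuery label value."""
    lowered = raw.lower()
    # Replace every disallowed character (":", "+", ".", spaces, ...) with "_".
    cleaned = _DISALLOWED.sub("_", lowered)
    # Truncate to the 63-character label value limit.
    return cleaned[:_MAX_LABEL_LEN]


print(clean_label_value("scheduled__2024-01-01T00:00:00+00:00"))
# scheduled__2024-01-01t00_00_00_00_00
```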
Controlling FinOps labels
```yaml
# Default: FinOps labels are auto-injected
- task_id: "load_data"
  operator: BigQueryInsertJobOperator
  sql: "sql/load.sql"
  include_finops_labels: true  # default, can be omitted
  labels:                      # custom labels merged with FinOps labels
    bundle: "daily_load"
    team: "data-engineering"

# Opt out of FinOps labels for a specific task
- task_id: "ad_hoc_query"
  operator: BigQueryInsertJobOperator
  sql: "sql/ad_hoc.sql"
  include_finops_labels: false  # no FinOps labels injected
```
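The merge can be pictured as a plain dictionary update. This is a hypothetical sketch (`build_job_labels` is not a DagSmith function), and it assumes FinOps keys win when a custom label reuses the same key; this page does not say which side takes precedence, so check the generated DAG if you rely on a collision.

```python
def build_job_labels(
    custom: dict[str, str],
    finops: dict[str, str],
    include_finops_labels: bool = True,
) -> dict[str, str]:
    """Hypothetical sketch: combine a task's custom labels with FinOps labels."""
    labels = dict(custom)  # copy so the caller's dict is not mutated
    if include_finops_labels:
        # Assumption: FinOps keys override custom keys on collision.
        labels.update(finops)
    return labels


finops = {"dag_id": "{{ dag.dag_id }}", "task_id": "{{ task.task_id }}"}
print(build_job_labels({"bundle": "daily_load", "team": "data-engineering"}, finops))
print(build_job_labels({}, finops, include_finops_labels=False))  # {}
```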
Field Aliases — Quick Reference
Several YAML fields accept alternative names for convenience. Both forms are equivalent — use whichever reads better in context.
| Canonical Field | Alias | Section | Notes |
|---|---|---|---|
| `retry_delay` | `retry_delay_seconds` | `default_args`, task-level | Seconds between retries |
| `sla` | `sla_seconds` | `default_args` | SLA timeout in seconds |
| `schedule` | `schedule_interval` | `dag` | Cron expression or preset |
| `gcp_conn_id` | `google_cloud_conn_id` | `gcp` | Airflow GCP connection ID |
| `execution_delta` | `execution_delta_seconds` | `ExternalTaskSensor` | Fixed timedelta offset |
| `execution_date` | `logical_date` | `TriggerDagRunOperator` | Airflow 2.x → 3.x naming |
| `poke_interval` | `poll_interval` | `GCSObjectsWithPrefixExistenceSensor` | Seconds between pokes |
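Alias handling amounts to renaming keys before validation. A hypothetical sketch using the pairs from the table; it ignores which section each alias is valid in and assumes that giving both spellings of one field is an error, neither of which this page specifies:

```python
# Alias -> canonical field name, taken from the table above.
FIELD_ALIASES = {
    "retry_delay_seconds": "retry_delay",
    "sla_seconds": "sla",
    "schedule_interval": "schedule",
    "google_cloud_conn_id": "gcp_conn_id",
    "execution_delta_seconds": "execution_delta",
    "logical_date": "execution_date",
    "poll_interval": "poke_interval",
}


def normalize_aliases(section: dict) -> dict:
    """Hypothetical sketch: rewrite alias keys in one YAML section to canonical names."""
    result = {}
    for key, value in section.items():
        canonical = FIELD_ALIASES.get(key, key)  # non-alias keys pass through unchanged
        if canonical in result:
            # Assumption: supplying both the alias and the canonical name is an error.
            raise ValueError(f"field {canonical!r} given more than once (last as {key!r})")
        result[canonical] = value
    return result


print(normalize_aliases({"retry_delay_seconds": 300, "retries": 2}))
```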
Architecture Overview
DagSmith processes a spec in four stages, from YAML input to formatted Python output:
```text
YAML Spec File
      |
      v
[1] Loader (loader.py)
    - Read YAML file
    - Expand ${VAR__...__VAR} variables
    - Parse with PyYAML
    - Validate via YamlDagSpec (Pydantic)
      |
      v
[2] Registry (registry/core.py)
    - Load airflow_registry.yaml
    - Map operator names to (module, class) tuples
    - Resolve aliases and conflicts
      |
      v
[3] Code Generator (code_generator.py)
    - Pre-scan: collect imports + SQL variable names
    - Render: header, imports, config, default_args, DAG block
    - Dispatch tasks to type-specific renderers
      |
      v
[4] Post-Processing
    - ruff check --fix (remove unused imports)
    - ruff format (consistent style)
      |
      v
Generated .py DAG File
```
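Stage [1] performs text substitution before YAML parsing. A minimal sketch of that step, assuming references are written `${VAR__NAME__VAR}` and the values arrive as a dict keyed by the full variable name; `expand_variables` is hypothetical, and where DagSmith reads the values from and how it reports undefined names is not covered here:

```python
import re

# Matches references such as ${VAR__PROJECT_ID__VAR}; group 1 is the full name.
_VAR_REF = re.compile(r"\$\{(VAR__[A-Z][A-Z0-9_]*__VAR)\}")


def expand_variables(text: str, values: dict[str, str]) -> str:
    """Hypothetical sketch: substitute variable references in raw YAML text."""

    def _lookup(match: re.Match) -> str:
        name = match.group(1)
        if name not in values:
            raise KeyError(f"undefined variable: {name}")
        return values[name]

    return _VAR_REF.sub(_lookup, text)


raw = 'sql: "SELECT * FROM `${VAR__PROJECT_ID__VAR}.sales.orders`"'
print(expand_variables(raw, {"VAR__PROJECT_ID__VAR": "my-project"}))
# sql: "SELECT * FROM `my-project.sales.orders`"
```

Because substitution happens on the raw text, a value can land anywhere in the file, including inside quoted SQL strings as in the example.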
Key Design Patterns
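- Discriminated task specs: `TaskOrGroupSpec` uses Pydantic's `Discriminator` to dispatch task types based on the `operator` field. Known operators get dedicated specs; unknown operators route to generic specs via registry lookup.
- Spec/renderer separation: each built-in task type pairs a spec class with a `render_*` function. This separation keeps validation and code generation cleanly decoupled.
- Import pre-scan: imports are collected in a pre-scan pass and emitted before the `with DAG` block, sorted and deduplicated.
- Central registry: operator names map to `(module, class)` tuples defined in `src/dagsmith/configs/airflow_registry.yaml`. Supports standard, third-party, and custom origins with alias conflict resolution. Extensible via the `DAGSMITH_EXTRA_REGISTRY` env var.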
YAML Authoring Best Practices
Naming Conventions
- Use snake_case for `dag_id` and `task_id` values
- Use descriptive, action-oriented task IDs: `stage_orders`, `transform_data`, `wait_for_upstream`
- Prefix sensor tasks with `wait_for_` or `check_` for clarity
- Variables must follow the `VAR__UPPER_NAME__VAR` pattern strictly
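These conventions are easy to enforce in a pre-commit hook or CI step. A hypothetical sketch; the regular expressions encode one reading of "snake_case" and of the variable pattern, and DagSmith's own validator may be stricter or looser:

```python
import re

# Assumed readings of the conventions above.
SNAKE_CASE = re.compile(r"[a-z][a-z0-9]*(?:_[a-z0-9]+)*")
VARIABLE_NAME = re.compile(r"VAR__[A-Z][A-Z0-9_]*__VAR")
SENSOR_PREFIXES = ("wait_for_", "check_")


def naming_problems(task_id: str, is_sensor: bool = False) -> list[str]:
    """Return human-readable naming problems for one task ID (empty if none)."""
    problems = []
    if not SNAKE_CASE.fullmatch(task_id):
        problems.append(f"{task_id!r} is not snake_case")
    if is_sensor and not task_id.startswith(SENSOR_PREFIXES):
        problems.append(f"sensor {task_id!r} should start with wait_for_ or check_")
    return problems


def is_valid_variable_name(name: str) -> bool:
    """True when the name follows the VAR__UPPER_NAME__VAR pattern."""
    return VARIABLE_NAME.fullmatch(name) is not None


print(naming_problems("StageOrders"))
print(is_valid_variable_name("VAR__PROJECT_ID__VAR"))
```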
Structure
- Follow the conventional section order: variables → configurations → metadata → dag → gcp → default_args → user_defined_macros → tasks → dependencies
- Group related tasks into `TaskGroup` blocks for organization
- Keep SQL in separate `.sql` files for complex queries; use inline SQL only for simple one-liners
- Use variables for values repeated across multiple tasks (`project_id`, dataset, etc.)
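A spec's top-level keys can be checked against the conventional order with a few lines of Python, for example on the keys of a parsed spec in file order. This is a hypothetical lint, not a DagSmith feature; sections that are absent are simply skipped, and unknown keys are ignored:

```python
# Conventional section order, as listed above.
CONVENTIONAL_ORDER = [
    "variables", "configurations", "metadata", "dag", "gcp",
    "default_args", "user_defined_macros", "tasks", "dependencies",
]


def out_of_order_sections(keys: list[str]) -> list[str]:
    """Return known sections that appear after a section they should precede."""
    rank = {name: i for i, name in enumerate(CONVENTIONAL_ORDER)}
    highest_seen = -1
    misplaced = []
    for key in keys:
        if key not in rank:
            continue  # unknown keys are left to the schema validator
        if rank[key] < highest_seen:
            misplaced.append(key)
        else:
            highest_seen = rank[key]
    return misplaced


print(out_of_order_sections(["dag", "metadata", "tasks"]))  # ['metadata']
```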
Validation
- Run `dagsmith validate --strict` in CI to catch issues early
- Always fill in `metadata.title` and `metadata.jira` (strict mode flags values left as "N/A")
- Set `retries` to at least 1 for production DAGs (strict mode warns on `retries: 0`)
- Define dependencies: isolated tasks (no dependency chains) trigger a strict-mode warning
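"Isolated" here means a task that appears in no dependency edge. A minimal sketch, assuming dependencies have been reduced to `(upstream, downstream)` pairs; `isolated_tasks` is hypothetical, and whether DagSmith exempts single-task DAGs is not stated on this page:

```python
def isolated_tasks(task_ids: list[str], edges: list[tuple[str, str]]) -> list[str]:
    """Return task IDs that appear in no (upstream, downstream) edge, in declared order."""
    connected = {task for edge in edges for task in edge}
    return [task_id for task_id in task_ids if task_id not in connected]


tasks = ["stage_orders", "transform_data", "export_report"]
deps = [("stage_orders", "transform_data")]
print(isolated_tasks(tasks, deps))  # ['export_report']
```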
Sensors
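- Prefer `mode: "reschedule"` over `"poke"` for long-running waits to free worker slots
- Always set an explicit `timeout`; it is required and has no default
- Consider `soft_fail: true` when a sensor timeout should mark the task as skipped rather than failed; downstream tasks with the default `all_success` trigger rule are then skipped as well, so give tasks that must still run a trigger rule such as `none_failed`
- Use `exponential_backoff: true` to reduce load on external systems during extended waits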
BigQuery
- Keep `include_finops_labels: true` (the default) for cost tracking
- Add custom `labels` for additional attribution (bundle, team, module)
- Use `write_disposition: "WRITE_TRUNCATE"` for idempotent loads
- Set `maximum_bytes_billed` so runaway queries fail instead of billing beyond the limit
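These settings correspond to fields of BigQuery's REST `JobConfiguration`, which `BigQueryInsertJobOperator` accepts as its `configuration` argument. The sketch shows that raw shape only; how DagSmith maps its YAML fields onto it is not documented here, and the project, dataset, and table names are placeholders:

```python
GIB = 1024 ** 3

# BigQuery REST JobConfiguration for a query job (placeholder names throughout).
job_configuration = {
    "query": {
        "query": "SELECT * FROM `my-project.staging.orders`",
        "useLegacySql": False,
        "destinationTable": {
            "projectId": "my-project",
            "datasetId": "analytics",
            "tableId": "orders",
        },
        # Replace the destination table's contents on every run, so reruns are idempotent.
        "writeDisposition": "WRITE_TRUNCATE",
        # Queries that would bill more than 100 GiB fail instead of running.
        # The API expects this int64 value encoded as a string of bytes.
        "maximumBytesBilled": str(100 * GIB),
    },
    # Job-level labels used for cost attribution.
    "labels": {"bundle": "daily_load", "team": "data-engineering"},
}

print(job_configuration["query"]["maximumBytesBilled"])  # 107374182400
```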
Security
- Never put credentials, tokens, or API keys in YAML specs
- Use Airflow Variables, environment variables, or GCP Secret Manager for secrets
- Use GCP connection IDs (`gcp_conn_id`) instead of service account key files
- Use `impersonation_chain` for least-privilege access patterns
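A cheap guard is a CI step that rejects specs whose keys look like secrets. A hypothetical sketch operating on already-parsed YAML; the keyword list is an assumption and will miss secrets stored under innocuous key names, so it complements rather than replaces a dedicated secret scanner:

```python
import re

# Key names that suggest a secret was pasted into a spec (assumed, incomplete list).
SECRET_KEY = re.compile(r"password|passwd|secret|token|api_?key|private_key", re.IGNORECASE)


def find_secret_like_keys(node, path: str = "") -> list[str]:
    """Walk parsed YAML (dicts and lists) and return dotted paths of suspicious keys."""
    hits = []
    if isinstance(node, dict):
        for key, value in node.items():
            child = f"{path}.{key}" if path else str(key)
            if SECRET_KEY.search(str(key)):
                hits.append(child)
            hits.extend(find_secret_like_keys(value, child))
    elif isinstance(node, list):
        for index, item in enumerate(node):
            hits.extend(find_secret_like_keys(item, f"{path}[{index}]"))
    return hits


spec = {
    "gcp": {"gcp_conn_id": "google_cloud_default"},
    "tasks": [{"task_id": "call_api", "api_key": "abc123"}],
}
print(find_secret_like_keys(spec))  # ['tasks[0].api_key']
```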
Callables
DagSmith uses dotted import paths to reference Python callables (callbacks, python_callable, execution_date_fn, sla_miss_callback). For the complete guide on path format, import placement rules, team-based package structure, code examples, deployment by platform, and troubleshooting, see the dedicated page:
Callables Guide — Where to place callable modules, how to structure them by team/domain (e.g. acme.data_engineering.services.validation.schema.validate_schema), and how DagSmith generates aliased imports.
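A dotted path splits at its last dot into a module path and an attribute name. A minimal sketch of checking that such a path resolves, for instance in a test run before deployment; `resolve_callable` is hypothetical, and DagSmith's own handling (it generates aliased imports) is described in the Callables Guide:

```python
from importlib import import_module
from typing import Callable


def resolve_callable(dotted_path: str) -> Callable:
    """Import ``package.module.attr`` and return ``attr``, which must be callable."""
    module_path, _, attr = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"expected a dotted path, got {dotted_path!r}")
    obj = getattr(import_module(module_path), attr)
    if not callable(obj):
        raise TypeError(f"{dotted_path} is not callable")
    return obj


join = resolve_callable("os.path.join")
print(join("a", "b"))  # a/b on POSIX systems
```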
Custom Operator/Sensor Management
Recommendation: Use the DAGSMITH_EXTRA_REGISTRY environment variable to manage custom operators and sensors independently from the bundled registry.
Instead of editing src/dagsmith/configs/airflow_registry.yaml directly (which creates merge conflicts on upgrades), maintain your team's custom operators in a separate file:
```yaml
# my_team_registry.yaml
# Keep this file in your project repo or shared config location.
airflow_class_registry:
  custom:
    SlackWebhookOperator:
      module: airflow.providers.slack.operators.slack_webhook
      class: SlackWebhookOperator
      type: operator
    HttpSensor:
      module: airflow.providers.http.sensors.http
      class: HttpSensor
      type: sensor
    S3KeySensor:
      module: airflow.providers.amazon.aws.sensors.s3
      class: S3KeySensor
      type: sensor
    TeradataToGCSOperator:
      module: myproject.operators.teradata
      class: TeradataToGCSOperator
      type: operator
```
Then point DagSmith to it before generating:
```bash
# Set once per shell session, or add to .bashrc / .zshrc / CI config
export DAGSMITH_EXTRA_REGISTRY=/path/to/my_team_registry.yaml

# Generate as usual; custom operators are automatically available
dagsmith generate specs/

# List the registry entries under the custom origin
dagsmith list --origin custom
```
Why this approach?
- No merge conflicts: the bundled `airflow_registry.yaml` stays untouched during upgrades
- Team autonomy: each team can maintain its own registry file with operators specific to its stack
- CI-friendly: set the env var in your CI pipeline config and all builds pick it up
- Composable: entries from the extra registry merge into the `custom` section alongside any existing custom entries
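The merge described in the "Composable" point can be pictured as follows. A hypothetical sketch working on already-parsed dictionaries (reading the YAML file named by `DAGSMITH_EXTRA_REGISTRY` is left out), and assuming that an entry in the extra file replaces a same-named custom entry; DagSmith's actual conflict handling may differ:

```python
def merge_extra_registry(bundled: dict, extra: dict) -> dict:
    """Hypothetical sketch: fold every entry of an extra registry into ``custom``."""
    # Shallow-copy each origin section so the bundled registry is not mutated.
    merged = {
        origin: dict(entries)
        for origin, entries in bundled.get("airflow_class_registry", {}).items()
    }
    custom = merged.setdefault("custom", {})
    for entries in extra.get("airflow_class_registry", {}).values():
        # Assumption: a same-named entry from the extra file replaces the existing one.
        custom.update(entries)
    return {"airflow_class_registry": merged}


bundled = {"airflow_class_registry": {"custom": {}}}
extra = {
    "airflow_class_registry": {
        "custom": {
            "HttpSensor": {
                "module": "airflow.providers.http.sensors.http",
                "class": "HttpSensor",
                "type": "sensor",
            }
        }
    }
}
print(sorted(merge_extra_registry(bundled, extra)["airflow_class_registry"]["custom"]))
```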
Adding a New Operator
There are two paths to adding a new operator:
Option A: Generic Plugin (zero code) — Recommended
Register the operator in a separate registry file (via `DAGSMITH_EXTRA_REGISTRY`) or in `src/dagsmith/configs/airflow_registry.yaml` and use it immediately. You get no field-level validation, but no Python changes are needed.
Option B: Built-in Spec (full validation)
For operators that need field-level validation, follow these steps:
- Create a spec class (inheriting from `BaseTaskSpec` or `BaseSensorOperatorSpec`) and a matching `render_*` function in `src/dagsmith/schemas/<category>/`
- Re-export them from `src/dagsmith/schemas/<category>/__init__.py`
- Add the spec to the `TaskSpec` and `TaskOrGroupSpec` unions in `src/dagsmith/schemas/__init__.py`
- Add a `match`/`case` arm in `DagCodeGenerator._render_task()`
- Register the class in `src/dagsmith/configs/airflow_registry.yaml` under the appropriate section
- Add tests in `tests/dagsmith/schemas/<category>/`