Callables
How DagSmith handles Python callbacks, PythonOperator callables, and scheduling functions — and where to place your code so imports work at runtime.
What Are Callables?
DagSmith uses dotted import paths in YAML to reference Python functions. At generation time, these paths are resolved into proper from ... import ... as ... statements in the generated DAG file. There are four categories:
| Category | YAML Field | Where Used |
|---|---|---|
| Callbacks | on_failure_callback, on_success_callback, on_retry_callback | default_args (DAG-level) and individual tasks (task-level override) |
| Python callables | python_callable | PythonOperator, BranchPythonOperator |
| Scheduling functions | execution_date_fn | ExternalTaskSensor |
| SLA callbacks | sla_miss_callback | dag section |
Dotted Path Format
Every callable must follow the pattern <package_path>.<module>.<function_name>. DagSmith splits on the last dot to separate the module path from the function name:
# Pattern: "package.subpackage.module.function_name"
#           |----- module path -----| |--- fn ----|
# Callbacks
on_failure_callback: "acme.data_engineering.callbacks.notifications.notify_failure"
on_retry_callback: "acme.data_engineering.callbacks.notifications.log_retry"
# PythonOperator
python_callable: "acme.data_engineering.services.validation.schema.validate_schema"
# ExternalTaskSensor
execution_date_fn: "acme.data_engineering.scheduling.lookback.one_day_ago"
# SLA
sla_miss_callback: "acme.data_engineering.alerts.sla.sla_breach_handler"
DagSmith generates an aliased import with a callable_ prefix for every callable to prevent name collisions with task variable names:
# YAML: "acme.data_engineering.callbacks.notifications.notify_failure"
# Generates:
from acme.data_engineering.callbacks.notifications import notify_failure as callable_notify_failure
# YAML: "acme.data_engineering.services.validation.schema.validate_schema"
# Generates:
from acme.data_engineering.services.validation.schema import validate_schema as callable_validate_schema
Why the alias? Without it, a function named validate would collide with a task variable also named validate. The callable_ prefix keeps imported callables in a separate namespace from the task variables in the generated DAG file.
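The split-and-alias rule described above can be sketched in a few lines of Python. This is an illustration only, not DagSmith's actual implementation: the function name build_callable_import is hypothetical, and the error text is modeled on the "Invalid callable path" message mentioned in the troubleshooting section.

```python
def build_callable_import(dotted_path: str) -> str:
    """Turn a dotted callable path into an aliased import statement.

    Hypothetical helper: it mirrors the documented behavior (split on the
    last dot, alias with a callable_ prefix) but is not DagSmith code.
    """
    # rpartition splits on the LAST dot: everything before it is the
    # module path, everything after it is the function name.
    module_path, sep, function_name = dotted_path.rpartition(".")
    if not sep or not module_path or not function_name:
        raise ValueError(f"Invalid callable path: {dotted_path!r}")
    return (
        f"from {module_path} import {function_name} "
        f"as callable_{function_name}"
    )


print(build_callable_import("acme.data_engineering.alerts.sla.sla_breach_handler"))
```

A bare name such as "validate_params" contains no dot, so the sketch raises ValueError instead of producing an import.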
Import Placement Rules
DagSmith places callable imports at different levels depending on where they are referenced:
| Usage | Import Placement | Reason |
|---|---|---|
| DAG-level on_failure_callback, on_success_callback, on_retry_callback | Top-level (before with DAG) | Referenced in default_args dict, defined outside the DAG block |
| DAG-level sla_miss_callback | Top-level (before with DAG) | Referenced in the DAG() constructor |
| Task-level on_failure_callback, on_success_callback, on_retry_callback | Deferred (inside with DAG) | Referenced only within individual task constructors |
| PythonOperator.python_callable | Deferred (inside with DAG) | Referenced only within the operator constructor |
| ExternalTaskSensor.execution_date_fn | Deferred (inside with DAG) | Referenced only within the sensor constructor |
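The placement rules in the table can be read as a small decision function. The sketch below only restates the table in code; the function name import_placement and the dag_level flag are assumptions made for illustration and are not part of DagSmith's API.

```python
# Callback fields that may appear at DAG level (default_args) or task level.
CALLBACK_FIELDS = {
    "on_failure_callback",
    "on_success_callback",
    "on_retry_callback",
}


def import_placement(yaml_field: str, dag_level: bool) -> str:
    """Return 'top-level' or 'deferred' for a callable reference.

    Hypothetical helper that encodes the documented table.
    """
    # sla_miss_callback is passed to the DAG() constructor itself,
    # so its import must exist before the `with DAG` block opens.
    if yaml_field == "sla_miss_callback":
        return "top-level"
    # DAG-level callbacks live in default_args, defined outside the block.
    if yaml_field in CALLBACK_FIELDS and dag_level:
        return "top-level"
    # Everything else is referenced only inside task constructors.
    return "deferred"


print(import_placement("on_failure_callback", dag_level=True))
print(import_placement("python_callable", dag_level=False))
```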
Placing Callable Modules
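For DagSmith-generated DAGs to run successfully, every dotted path must resolve to an importable Python module at Airflow runtime. This means the package root must be on the PYTHONPATH of every Airflow component that imports the DAG file: the scheduler (or DAG processor) that parses it as well as the workers that execute its tasks.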
Recommendation: Place your callable packages inside Airflow's plugins directory or DAGs directory, organized by team and domain using proper Python package structure. This keeps callable code versioned, discoverable, and cleanly separated from generated DAG files.
Airflow's import-friendly directories
| Directory | On PYTHONPATH? | Best For |
|---|---|---|
| $AIRFLOW_HOME/dags/ | Yes (by default) | DAG files + lightweight callable packages that deploy alongside DAGs |
| $AIRFLOW_HOME/plugins/ | Yes (by default) | Shared libraries, team packages, utilities; preferred for non-trivial callable code |
| Installed Python package | Yes (via pip/uv) | Mature, versioned libraries installed into the Airflow worker image |
Recommended: team-based package structure in plugins
Organize callables by organization → team → domain → module, mirroring your company's internal structure. This maps directly to dotted import paths in your YAML specs.
# Airflow home directory
$AIRFLOW_HOME/
    dags/
        daily_revenue_pipeline.py        # generated DAG
        etl_with_task_groups.py          # generated DAG
        sql/
            stage_transactions.sql
            transform_revenue.sql
    plugins/                             # Airflow auto-adds this to PYTHONPATH
        acme/                            # company-level package
            __init__.py
            data_engineering/            # team package
                __init__.py
                callbacks/               # shared callback functions
                    __init__.py
                    notifications.py     # Slack, PagerDuty, email alerts
                    logging.py           # structured logging callbacks
                services/                # business logic callables
                    __init__.py
                    validation/          # data validation sub-domain
                        __init__.py
                        schema.py        # schema validation functions
                        quality.py       # data quality check functions
                    enrichment/          # data enrichment sub-domain
                        __init__.py
                        geo.py           # geolocation lookups
                        currency.py      # currency conversion
                scheduling/              # custom scheduling logic
                    __init__.py
                    lookback.py          # execution_date_fn implementations
                    business_calendar.py # skip holidays, weekends
                alerts/                  # SLA and critical alert handlers
                    __init__.py
                    sla.py               # sla_miss_callback implementations
                    pagerduty.py         # PagerDuty integration
            platform_team/               # another team's package
                __init__.py
                connectors/
                    __init__.py
                    teradata.py
                    salesforce.py
How directory structure maps to YAML paths
The dotted path in YAML directly mirrors the filesystem structure under the plugins directory:
| File Path (under plugins/) | Function | YAML Dotted Path |
|---|---|---|
| acme/data_engineering/callbacks/notifications.py | notify_failure() | "acme.data_engineering.callbacks.notifications.notify_failure" |
| acme/data_engineering/callbacks/logging.py | log_retry() | "acme.data_engineering.callbacks.logging.log_retry" |
| acme/data_engineering/services/validation/schema.py | validate_schema() | "acme.data_engineering.services.validation.schema.validate_schema" |
| acme/data_engineering/services/validation/quality.py | check_row_count() | "acme.data_engineering.services.validation.quality.check_row_count" |
| acme/data_engineering/services/enrichment/currency.py | convert_to_usd() | "acme.data_engineering.services.enrichment.currency.convert_to_usd" |
| acme/data_engineering/scheduling/lookback.py | one_day_ago() | "acme.data_engineering.scheduling.lookback.one_day_ago" |
| acme/data_engineering/alerts/sla.py | sla_breach_handler() | "acme.data_engineering.alerts.sla.sla_breach_handler" |
| acme/platform_team/connectors/teradata.py | extract_daily() | "acme.platform_team.connectors.teradata.extract_daily" |
Critical: Every directory in the path should contain an __init__.py file so that Python treats it as a regular package. A single missing __init__.py can lead to ModuleNotFoundError at runtime.
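A quick way to catch a missing __init__.py before deployment is to walk the package directories for a given module path. The helper below is a minimal sketch using only the standard library; the name missing_init_files is hypothetical, and it assumes the dotted path passed in is the module path (without the function name) relative to the plugins root.

```python
from __future__ import annotations

from pathlib import Path


def missing_init_files(root: Path, dotted_module: str) -> list[Path]:
    """Return package directories under `root` that lack an __init__.py."""
    missing: list[Path] = []
    current = root
    # Every component except the last is a package directory;
    # the last component is the module file itself (e.g. notifications.py).
    for part in dotted_module.split(".")[:-1]:
        current = current / part
        if not (current / "__init__.py").is_file():
            missing.append(current)
    return missing


if __name__ == "__main__":
    # Assumed layout: run from the directory that contains plugins/.
    for directory in missing_init_files(
        Path("plugins"), "acme.data_engineering.callbacks.notifications"
    ):
        print(f"missing __init__.py in {directory}")
```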
Example Callable Implementations
Callback functions
# plugins/acme/data_engineering/callbacks/notifications.py
from __future__ import annotations

import logging

log = logging.getLogger(__name__)


def notify_failure(context: dict) -> None:
    """Send a Slack alert when a task fails."""
    ti = context["task_instance"]
    dag_id = context["dag"].dag_id
    task_id = ti.task_id
    execution_date = context["execution_date"]
    log_url = ti.log_url
    message = (
        f"*TASK FAILED*\n"
        f"DAG: `{dag_id}`\n"
        f"Task: `{task_id}`\n"
        f"Date: {execution_date}\n"
        f"Logs: {log_url}"
    )
    # Replace with your actual Slack/PagerDuty integration
    log.error(message)


def notify_success(context: dict) -> None:
    """Log task success for audit trail."""
    ti = context["task_instance"]
    log.info("SUCCESS: %s.%s", ti.dag_id, ti.task_id)


def log_retry(context: dict) -> None:
    """Log retry attempts with try number."""
    ti = context["task_instance"]
    log.warning(
        "RETRY #%d: %s.%s", ti.try_number, ti.dag_id, ti.task_id
    )
PythonOperator callable (data validation)
# plugins/acme/data_engineering/services/validation/schema.py
from __future__ import annotations

import logging

log = logging.getLogger(__name__)


def validate_schema(**kwargs) -> None:
    """Validate that required columns exist in the source table."""
    # Values listed under op_kwargs in the task spec arrive as keyword arguments.
    project_id = kwargs["project_id"]
    dataset = kwargs["dataset"]
    table = kwargs["table"]
    required_columns = kwargs.get("required_columns", [])
    log.info(
        "Validating schema for %s.%s.%s", project_id, dataset, table
    )
    log.info("Required columns: %s", required_columns)
    # Your validation logic here: query BQ INFORMATION_SCHEMA,
    # compare columns, raise on mismatch
    # ...
    log.info("Schema validation passed")
Scheduling function (ExternalTaskSensor)
# plugins/acme/data_engineering/scheduling/lookback.py
from __future__ import annotations

from datetime import timedelta

import pendulum


def one_day_ago(execution_date: pendulum.DateTime, **kwargs) -> pendulum.DateTime:
    """Return the execution date minus one day."""
    return execution_date - timedelta(days=1)


def previous_business_day(execution_date: pendulum.DateTime, **kwargs) -> pendulum.DateTime:
    """Return the most recent business day (skip weekends)."""
    candidate = execution_date - timedelta(days=1)
    while candidate.day_of_week in (pendulum.SATURDAY, pendulum.SUNDAY):
        candidate = candidate - timedelta(days=1)
    return candidate
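The weekend-skipping logic is easy to sanity-check outside Airflow. The snippet below is a rough stand-in that uses only the standard library datetime instead of pendulum (an assumption made so it runs without extra dependencies); previous_weekday is a hypothetical name, not the function from lookback.py.

```python
from datetime import datetime, timedelta


def previous_weekday(day: datetime) -> datetime:
    """Step back one day at a time until the result is Monday-Friday."""
    candidate = day - timedelta(days=1)
    # datetime.weekday(): Monday is 0, Saturday is 5, Sunday is 6.
    while candidate.weekday() >= 5:
        candidate -= timedelta(days=1)
    return candidate


monday = datetime(2026, 1, 5)            # a Monday
print(previous_weekday(monday).date())   # 2026-01-02, the preceding Friday
```

For a Monday run, the sensor would therefore look for the upstream run from the preceding Friday rather than Sunday.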
SLA miss callback
# plugins/acme/data_engineering/alerts/sla.py
from __future__ import annotations

import logging

log = logging.getLogger(__name__)


def sla_breach_handler(dag, task_list, blocking_task_list, slas, blocking_tis) -> None:
    """Called when any task in the DAG misses its SLA."""
    dag_id = dag.dag_id
    missed_tasks = [str(t) for t in task_list]
    blockers = [str(t) for t in blocking_task_list]
    log.critical(
        "SLA BREACH in DAG '%s'. Missed: %s. Blocked by: %s",
        dag_id,
        missed_tasks,
        blockers,
    )
    # Integrate with PagerDuty, OpsGenie, etc.
Full YAML Example with Callables
A complete spec demonstrating every callable type, using the team-based package structure:
metadata:
  title: "Revenue Pipeline with Full Callable Integration"
  owner: "data-engineering@acme.com"
  email: "data-engineering@acme.com"
  version: "2.1.0"
  jira: "DE-2045"
  developer_name: "revenue_pipeline"
dag:
  dag_id: "revenue_with_callables"
  description: "Daily revenue ETL with validation, alerting, and custom scheduling."
  schedule: "0 6 * * 1-5"
  start_date: "2026-01-01"
  timezone: "America/New_York"
  catchup: false
  dagrun_timeout: 7200
  tags:
    - "domain:finance"
    - "team:data-engineering"
  # DAG-level SLA callback (top-level import)
  sla_miss_callback: "acme.data_engineering.alerts.sla.sla_breach_handler"
gcp:
  project_id: "acme-analytics-prod"
  location: "US"
# DAG-level callbacks: applied to ALL tasks via default_args
# These generate TOP-LEVEL imports (before the `with DAG` block)
default_args:
  retries: 2
  retry_delay: 120
  email:
    - "data-engineering@acme.com"
  email_on_failure: true
  on_failure_callback: "acme.data_engineering.callbacks.notifications.notify_failure"
  on_success_callback: "acme.data_engineering.callbacks.notifications.notify_success"
  on_retry_callback: "acme.data_engineering.callbacks.notifications.log_retry"
tasks:
  - task_id: "start"
    operator: EmptyOperator
  # ExternalTaskSensor with custom scheduling function
  # execution_date_fn generates a DEFERRED import
  - task_id: "wait_for_upstream"
    operator: ExternalTaskSensor
    external_dag_id: "source_ingestion"
    external_task_id: "ingest_complete"
    timeout: 21600
    mode: "reschedule"
    poke_interval: 300
    execution_date_fn: "acme.data_engineering.scheduling.lookback.previous_business_day"
  # PythonOperator with team callable
  # python_callable generates a DEFERRED import
  - task_id: "validate_source_schema"
    operator: PythonOperator
    python_callable: "acme.data_engineering.services.validation.schema.validate_schema"
    op_kwargs:
      project_id: "acme-analytics-prod"
      dataset: "raw_finance"
      table: "daily_transactions"
  - task_id: "stage_transactions"
    operator: BigQueryInsertJobOperator
    sql: "sql/stage_transactions.sql"
  - task_id: "transform_revenue"
    operator: BigQueryInsertJobOperator
    sql: "sql/transform_revenue.sql"
  # PythonOperator for data quality
  - task_id: "check_row_counts"
    operator: PythonOperator
    python_callable: "acme.data_engineering.services.validation.quality.check_row_count"
    op_kwargs:
      project_id: "acme-analytics-prod"
      dataset: "analytics"
      table: "daily_revenue"
      min_rows: 1000
  - task_id: "load_final"
    operator: BigQueryInsertJobOperator
    sql: "sql/load_revenue_final.sql"
    retries: 3
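    # Task-level callback OVERRIDE: only this task uses sla_breach_handler
    # instead of the DAG-level notify_failure. Generates a DEFERRED import.
    # Airflow calls a failure callback with a single context argument, so the
    # function referenced here must accept that signature.
    on_failure_callback: "acme.data_engineering.alerts.sla.sla_breach_handler"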
  # Currency conversion using enrichment service callable
  - task_id: "enrich_currency"
    operator: PythonOperator
    python_callable: "acme.data_engineering.services.enrichment.currency.convert_to_usd"
    op_kwargs:
      source_table: "analytics.daily_revenue"
      target_table: "analytics.daily_revenue_usd"
  - task_id: "end"
    operator: EmptyOperator
dependencies:
  - "start >> wait_for_upstream >> validate_source_schema"
  - "validate_source_schema >> stage_transactions >> transform_revenue"
  - "transform_revenue >> check_row_counts >> load_final"
  - "load_final >> enrich_currency >> end"
Generated import structure
The YAML above produces this import layout in the generated DAG:
from __future__ import annotations

from datetime import timedelta
from typing import Any, Final

import pendulum
from airflow import DAG

# --- Top-level imports: DAG-level callbacks (referenced in default_args and DAG()) ---
from acme.data_engineering.alerts.sla import sla_breach_handler as callable_sla_breach_handler
from acme.data_engineering.callbacks.notifications import notify_failure as callable_notify_failure
from acme.data_engineering.callbacks.notifications import notify_success as callable_notify_success
from acme.data_engineering.callbacks.notifications import log_retry as callable_log_retry

# ...
# default_args = {
#     "on_failure_callback": callable_notify_failure,
#     "on_success_callback": callable_notify_success,
#     "on_retry_callback": callable_log_retry,
# }
# ...

with DAG(
    dag_id="revenue_with_callables",
    sla_miss_callback=callable_sla_breach_handler,
    # ...
) as dag:
    # --- Deferred imports: operators + task-level callables ---
    from airflow.operators.empty import EmptyOperator
    from airflow.operators.python import PythonOperator
    from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
    from airflow.sensors.external_task import ExternalTaskSensor
    from acme.data_engineering.alerts.sla import sla_breach_handler as callable_sla_breach_handler
    from acme.data_engineering.scheduling.lookback import previous_business_day as callable_previous_business_day
    from acme.data_engineering.services.enrichment.currency import convert_to_usd as callable_convert_to_usd
    from acme.data_engineering.services.validation.quality import check_row_count as callable_check_row_count
    from acme.data_engineering.services.validation.schema import validate_schema as callable_validate_schema

    # Tasks reference callables by their alias:
    #   python_callable=callable_validate_schema,
    #   python_callable=callable_check_row_count,
    #   execution_date_fn=callable_previous_business_day,
    #   on_failure_callback=callable_sla_breach_handler,  (task-level override)
    # ...
Deployment Guide by Platform
Google Cloud Composer
Composer syncs the dags/ folder of the environment's GCS bucket to /home/airflow/gcs/dags/ and the plugins/ folder to /home/airflow/gcs/plugins/. Both directories are on PYTHONPATH.
# GCS bucket structure for Composer
gs://your-composer-bucket/
    dags/
        daily_revenue_pipeline.py    # generated DAGs
        sql/
            stage_transactions.sql   # SQL files
    plugins/
        acme/                        # team callable packages
            __init__.py
            data_engineering/
                __init__.py
                callbacks/
                    __init__.py
                    notifications.py
                services/
                    __init__.py
                    validation/
                        __init__.py
                        schema.py
                        quality.py
                scheduling/
                    __init__.py
                    lookback.py
Docker / Kubernetes (Helm chart)
Mount or install callable packages in the worker image. The Airflow Helm chart supports mounting extra volumes to /opt/airflow/plugins/.
# Option A: mount via Docker volumes
/opt/airflow/
    dags/
        daily_revenue_pipeline.py
    plugins/
        acme/
            data_engineering/
                ...
# Option B: install as a Python package in the Dockerfile
# Dockerfile
FROM apache/airflow:2.10.0
COPY acme_data_engineering-1.0.0-py3-none-any.whl /tmp/
RUN pip install /tmp/acme_data_engineering-1.0.0-py3-none-any.whl
Local development
# Ensure the callable package is importable
export PYTHONPATH="${PYTHONPATH}:/path/to/your/plugins"
# Or symlink into your local Airflow's dags/plugins directory
ln -s /path/to/acme ~/airflow/plugins/acme
Troubleshooting
ModuleNotFoundError at Airflow runtime
This means the callable module is not importable from the worker process. Check:
• Every directory in the path has an __init__.py file
• The module is in a directory on the worker's PYTHONPATH (dags/ or plugins/)
• The dotted path in YAML exactly matches the directory/file/function structure
• The function name is spelled correctly and exists in the target module
• If using Composer, the file has been synced to the plugins/ folder of the environment's GCS bucket
ValueError: Invalid callable path at generation time
DagSmith requires at least one dot in the path (i.e. module.function). A bare function name like "validate_params" is rejected. Use the full dotted path: "acme.data_engineering.services.validation.schema.validate_params".
Quick import test: Verify your callable is importable before generating DAGs:
# Test from the Airflow plugins directory
cd $AIRFLOW_HOME/plugins
python -c "from acme.data_engineering.callbacks.notifications import notify_failure; print('OK')"
# Or from your project root with PYTHONPATH set
PYTHONPATH=./plugins python -c "from acme.data_engineering.services.validation.schema import validate_schema; print('OK')"
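The same check can be scripted for any dotted path from a YAML spec. The sketch below resolves a path the way the generated import would, using the standard library importlib; resolve_callable is a hypothetical helper and not part of DagSmith. It is demonstrated on json.dumps so that it runs anywhere; substitute one of your own paths once the package root is on PYTHONPATH.

```python
import importlib
from typing import Any, Callable


def resolve_callable(dotted_path: str) -> Callable[..., Any]:
    """Import the module part of a dotted path and return the function."""
    module_path, _, function_name = dotted_path.rpartition(".")
    if not module_path or not function_name:
        raise ValueError(f"Invalid callable path: {dotted_path!r}")
    # Raises ModuleNotFoundError if the module is not importable.
    module = importlib.import_module(module_path)
    # Raises AttributeError if the function name is misspelled or missing.
    target = getattr(module, function_name)
    if not callable(target):
        raise TypeError(f"{dotted_path!r} does not refer to a callable")
    return target


# Standard-library example; replace with a path from your own spec.
dumps = resolve_callable("json.dumps")
print(dumps({"status": "OK"}))
```

Each failure mode maps to one of the checklist items above: ModuleNotFoundError points at the path or PYTHONPATH, AttributeError at the function name.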