Callables

How DagSmith handles Python callbacks, PythonOperator callables, and scheduling functions — and where to place your code so imports work at runtime.

What Are Callables?

DagSmith uses dotted import paths in YAML to reference Python functions. At generation time, these paths are resolved into proper from ... import ... as ... statements in the generated DAG file. There are four categories:

| Category | YAML Field | Where Used |
|---|---|---|
| Callbacks | on_failure_callback, on_success_callback, on_retry_callback | default_args (DAG-level) and individual tasks (task-level override) |
| Python callables | python_callable | PythonOperator, BranchPythonOperator |
| Scheduling functions | execution_date_fn | ExternalTaskSensor |
| SLA callbacks | sla_miss_callback | dag section |

Dotted Path Format

Every callable must follow the pattern <package_path>.<module>.<function_name>. DagSmith splits on the last dot to separate the module path from the function name:

yaml
# Pattern: "package.subpackage.module.function_name"
#           |----- module path -----| |--- fn ----|

# Callbacks
on_failure_callback: "acme.data_engineering.callbacks.notifications.notify_failure"
on_retry_callback:   "acme.data_engineering.callbacks.notifications.log_retry"

# PythonOperator
python_callable: "acme.data_engineering.services.validation.schema.validate_schema"

# ExternalTaskSensor
execution_date_fn: "acme.data_engineering.scheduling.lookback.one_day_ago"

# SLA
sla_miss_callback: "acme.data_engineering.alerts.sla.sla_breach_handler"

DagSmith generates an aliased import with a callable_ prefix for every callable to prevent name collisions with task variable names:

python
# YAML:  "acme.data_engineering.callbacks.notifications.notify_failure"
# Generates:
from acme.data_engineering.callbacks.notifications import notify_failure as callable_notify_failure

# YAML:  "acme.data_engineering.services.validation.schema.validate_schema"
# Generates:
from acme.data_engineering.services.validation.schema import validate_schema as callable_validate_schema

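Why the alias? Without it, a function named validate would collide with a task variable also named validate. The callable_ prefix keeps callable names apart from task variable names in the generated DAG file. Under this naming rule, two callables that share a function name but live in different modules map to the same alias, so it is safest to keep function names distinct within a single spec.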
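
The split-and-alias rule can be sketched in a few lines of Python. This is an illustration of the behavior described above, not DagSmith's actual source; the helper names split_callable_path and render_import are hypothetical.

```python
from __future__ import annotations


def split_callable_path(path: str) -> tuple[str, str]:
    """Split 'pkg.module.func' into ('pkg.module', 'func') on the last dot."""
    module_path, dot, function_name = path.rpartition(".")
    # A bare name such as "validate_params" has no dot and cannot be imported.
    if not dot or not module_path or not function_name:
        raise ValueError(f"Invalid callable path: {path!r}")
    return module_path, function_name


def render_import(path: str, prefix: str = "callable_") -> str:
    """Render the aliased import statement for one dotted callable path."""
    module_path, function_name = split_callable_path(path)
    return f"from {module_path} import {function_name} as {prefix}{function_name}"


print(render_import("acme.data_engineering.callbacks.notifications.notify_failure"))
```

Because the alias is built from the function name alone, this sketch would emit the same alias for two different modules that both define, say, validate; a real generator needs some policy for that case.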

Import Placement Rules

DagSmith places callable imports at different levels depending on where they are referenced:

| Usage | Import Placement | Reason |
|---|---|---|
| DAG-level on_failure_callback, on_success_callback, on_retry_callback | Top-level (before with DAG) | Referenced in default_args dict, defined outside the DAG block |
| DAG-level sla_miss_callback | Top-level (before with DAG) | Referenced in the DAG() constructor |
| Task-level on_failure_callback, on_success_callback, on_retry_callback | Deferred (inside with DAG) | Referenced only within individual task constructors |
| PythonOperator.python_callable | Deferred (inside with DAG) | Referenced only within the operator constructor |
| ExternalTaskSensor.execution_date_fn | Deferred (inside with DAG) | Referenced only within the sensor constructor |
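
The placement rules amount to a lookup keyed on where a field appears and which field it is. The sketch below encodes the table as data; the PLACEMENT mapping, the "dag"/"task" scope labels, and the import_placement helper are hypothetical names chosen for illustration, not part of DagSmith's API.

```python
from __future__ import annotations

# (scope, YAML field) -> where the generated import is placed.
# "dag" covers default_args and the dag section; "task" covers individual tasks.
PLACEMENT: dict[tuple[str, str], str] = {
    ("dag", "on_failure_callback"): "top-level",
    ("dag", "on_success_callback"): "top-level",
    ("dag", "on_retry_callback"): "top-level",
    ("dag", "sla_miss_callback"): "top-level",
    ("task", "on_failure_callback"): "deferred",
    ("task", "on_success_callback"): "deferred",
    ("task", "on_retry_callback"): "deferred",
    ("task", "python_callable"): "deferred",
    ("task", "execution_date_fn"): "deferred",
}


def import_placement(scope: str, field: str) -> str:
    """Return 'top-level' or 'deferred' for a callable field in a given scope."""
    try:
        return PLACEMENT[(scope, field)]
    except KeyError:
        raise ValueError(f"No placement rule for {field!r} in scope {scope!r}") from None


print(import_placement("dag", "on_failure_callback"))
print(import_placement("task", "on_failure_callback"))
```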

Placing Callable Modules

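For DagSmith-generated DAGs to run successfully, every dotted path must resolve to an importable Python module at Airflow runtime. This means the package root must be on the Python import path (sys.path, typically extended through PYTHONPATH) of every Airflow component that loads the DAG file: the scheduler or DAG processor that parses it and the workers that run its tasks.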

Recommendation: Place your callable packages inside Airflow's plugins directory or DAGs directory, organized by team and domain using proper Python package structure. This keeps callable code versioned, discoverable, and cleanly separated from generated DAG files.

Airflow's import-friendly directories

| Directory | On PYTHONPATH? | Best For |
|---|---|---|
| $AIRFLOW_HOME/dags/ | Yes (by default) | DAG files + lightweight callable packages that deploy alongside DAGs |
| $AIRFLOW_HOME/plugins/ | Yes (by default) | Shared libraries, team packages, utilities; preferred for non-trivial callable code |
| Installed Python package | Yes (via pip/uv) | Mature, versioned libraries installed into the Airflow worker image |

Recommended: team-based package structure in plugins

Organize callables by organization → team → domain → module, mirroring your company's internal structure. This maps directly to dotted import paths in your YAML specs.

bash
# Airflow home directory
$AIRFLOW_HOME/
  dags/
    daily_revenue_pipeline.py          # generated DAG
    etl_with_task_groups.py            # generated DAG
    sql/
      stage_transactions.sql
      transform_revenue.sql

  plugins/                             # Airflow auto-adds this to sys.path
    acme/                              # company-level top-level package
      __init__.py

      data_engineering/                # team package
        __init__.py

        callbacks/                     # shared callback functions
          __init__.py
          notifications.py             # Slack, PagerDuty, email alerts
          logging.py                   # structured logging callbacks

        services/                      # business logic callables
          __init__.py
          validation/                  # data validation sub-domain
            __init__.py
            schema.py                  # schema validation functions
            quality.py                 # data quality check functions
          enrichment/                  # data enrichment sub-domain
            __init__.py
            geo.py                     # geolocation lookups
            currency.py                # currency conversion

        scheduling/                    # custom scheduling logic
          __init__.py
          lookback.py                  # execution_date_fn implementations
          business_calendar.py         # skip holidays, weekends

        alerts/                        # SLA and critical alert handlers
          __init__.py
          sla.py                       # sla_miss_callback implementations
          pagerduty.py                 # PagerDuty integration

      platform_team/                   # another team's package
        __init__.py
        connectors/
          __init__.py
          teradata.py
          salesforce.py
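
A layout like this can be created by hand, but a small script helps keep __init__.py files from being forgotten. The following is a minimal sketch under that assumption; scaffold_package is a hypothetical helper, not a DagSmith command.

```python
from __future__ import annotations

from pathlib import Path


def scaffold_package(plugins_root: Path, package: str, modules: list[str]) -> list[Path]:
    """Create a dotted package under plugins_root and return the files it created.

    Every directory level gets an __init__.py; each name in `modules` becomes an
    empty <name>.py inside the innermost package. Existing files are left alone.
    """
    created: list[Path] = []
    current = plugins_root
    for part in package.split("."):
        current = current / part
        current.mkdir(parents=True, exist_ok=True)
        init_file = current / "__init__.py"
        if not init_file.exists():
            init_file.touch()
            created.append(init_file)
    for module in modules:
        module_file = current / f"{module}.py"
        if not module_file.exists():
            module_file.touch()
            created.append(module_file)
    return created


if __name__ == "__main__":
    import tempfile

    # Demonstrate against a throwaway directory instead of a real plugins folder.
    with tempfile.TemporaryDirectory() as tmp:
        for path in scaffold_package(Path(tmp), "acme.data_engineering.callbacks", ["notifications"]):
            print(path.relative_to(tmp))
```

Running the function a second time with the same arguments creates nothing and returns an empty list, so it is safe to re-run when adding modules.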

How directory structure maps to YAML paths

The dotted path in YAML directly mirrors the filesystem structure under the plugins directory:

| File Path (under plugins/) | Function | YAML Dotted Path |
|---|---|---|
| acme/data_engineering/callbacks/notifications.py | notify_failure() | "acme.data_engineering.callbacks.notifications.notify_failure" |
| acme/data_engineering/callbacks/logging.py | log_retry() | "acme.data_engineering.callbacks.logging.log_retry" |
| acme/data_engineering/services/validation/schema.py | validate_schema() | "acme.data_engineering.services.validation.schema.validate_schema" |
| acme/data_engineering/services/validation/quality.py | check_row_count() | "acme.data_engineering.services.validation.quality.check_row_count" |
| acme/data_engineering/services/enrichment/currency.py | convert_to_usd() | "acme.data_engineering.services.enrichment.currency.convert_to_usd" |
| acme/data_engineering/scheduling/lookback.py | one_day_ago() | "acme.data_engineering.scheduling.lookback.one_day_ago" |
| acme/data_engineering/alerts/sla.py | sla_breach_handler() | "acme.data_engineering.alerts.sla.sla_breach_handler" |
| acme/platform_team/connectors/teradata.py | extract_daily() | "acme.platform_team.connectors.teradata.extract_daily" |

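Critical: Give every directory in the path an __init__.py file so that it is a regular Python package. Python 3 can sometimes import a directory without one as an implicit namespace package, but do not rely on that: a missing __init__.py is a common cause of ModuleNotFoundError at runtime.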
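
The mapping and the __init__.py requirement are both easy to check mechanically. The sketch below derives the dotted path from a file path relative to plugins/ and lists package directories that lack an __init__.py; dotted_path and missing_init_files are hypothetical helper names used only for this example.

```python
from __future__ import annotations

from pathlib import Path, PurePosixPath


def dotted_path(relative_file: str, function_name: str) -> str:
    """Turn 'acme/team/module.py' plus 'func' into 'acme.team.module.func'."""
    parts = PurePosixPath(relative_file).with_suffix("").parts
    return ".".join((*parts, function_name))


def missing_init_files(plugins_root: Path, relative_file: str) -> list[Path]:
    """Return package directories above the module that have no __init__.py."""
    missing: list[Path] = []
    current = plugins_root
    # Walk each directory between the plugins root and the module file.
    for part in PurePosixPath(relative_file).parent.parts:
        current = current / part
        if not (current / "__init__.py").is_file():
            missing.append(current)
    return missing


print(dotted_path("acme/data_engineering/alerts/sla.py", "sla_breach_handler"))
```

Calling missing_init_files(Path("plugins"), "acme/data_engineering/alerts/sla.py") before deploying would list any directory that still needs an __init__.py.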

Example Callable Implementations

Callback functions

python
# plugins/acme/data_engineering/callbacks/notifications.py
from __future__ import annotations

import logging

log = logging.getLogger(__name__)


def notify_failure(context: dict) -> None:
    """Send a Slack alert when a task fails."""
    ti = context["task_instance"]
    dag_id = context["dag"].dag_id
    task_id = ti.task_id
    execution_date = context["execution_date"]
    log_url = ti.log_url

    message = (
        f"*TASK FAILED*\n"
        f"DAG: `{dag_id}`\n"
        f"Task: `{task_id}`\n"
        f"Date: {execution_date}\n"
        f"Logs: {log_url}"
    )
    # Replace with your actual Slack/PagerDuty integration
    log.error(message)


def notify_success(context: dict) -> None:
    """Log task success for audit trail."""
    ti = context["task_instance"]
    log.info("SUCCESS: %s.%s", ti.dag_id, ti.task_id)


def log_retry(context: dict) -> None:
    """Log retry attempts with try number."""
    ti = context["task_instance"]
    log.warning(
        "RETRY #%d: %s.%s", ti.try_number, ti.dag_id, ti.task_id
    )

PythonOperator callable (data validation)

python
# plugins/acme/data_engineering/services/validation/schema.py
from __future__ import annotations

import logging

log = logging.getLogger(__name__)


def validate_schema(
    project_id: str,
    dataset: str,
    table: str,
    required_columns: list[str] | None = None,
    **kwargs,
) -> None:
    """Validate that required columns exist in the source table.

    PythonOperator passes each op_kwargs entry as a keyword argument;
    the remaining task context arrives in **kwargs.
    """
    required_columns = required_columns or []

    log.info(
        "Validating schema for %s.%s.%s", project_id, dataset, table
    )
    log.info("Required columns: %s", required_columns)

    # Your validation logic here — query BQ INFORMATION_SCHEMA,
    # compare columns, raise on mismatch
    # ...

    log.info("Schema validation passed")

Scheduling function (ExternalTaskSensor)

python
# plugins/acme/data_engineering/scheduling/lookback.py
from __future__ import annotations

from datetime import timedelta

import pendulum


def one_day_ago(execution_date: pendulum.DateTime, **kwargs) -> pendulum.DateTime:
    """Return the execution date minus one day."""
    return execution_date - timedelta(days=1)


def previous_business_day(execution_date: pendulum.DateTime, **kwargs) -> pendulum.DateTime:
    """Return the most recent business day (skip weekends)."""
    candidate = execution_date - timedelta(days=1)
    while candidate.day_of_week in (pendulum.SATURDAY, pendulum.SUNDAY):
        candidate = candidate - timedelta(days=1)
    return candidate

SLA miss callback

python
# plugins/acme/data_engineering/alerts/sla.py
from __future__ import annotations

import logging

log = logging.getLogger(__name__)


def sla_breach_handler(dag, task_list, blocking_task_list, slas, blocking_tis) -> None:
    """Called when any task in the DAG misses its SLA."""
    dag_id = dag.dag_id
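    # task_list and blocking_task_list are preformatted, newline-separated
    # strings, so build the lists from the SlaMiss and TaskInstance objects.
    missed_tasks = [sla.task_id for sla in slas]
    blockers = [ti.task_id for ti in blocking_tis]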

    log.critical(
        "SLA BREACH in DAG '%s'. Missed: %s. Blocked by: %s",
        dag_id,
        missed_tasks,
        blockers,
    )
    # Integrate with PagerDuty, OpsGenie, etc.

Full YAML Example with Callables

A complete spec demonstrating every callable type, using the team-based package structure:

yaml
metadata:
  title: "Revenue Pipeline with Full Callable Integration"
  owner: "data-engineering@acme.com"
  email: "data-engineering@acme.com"
  version: "2.1.0"
  jira: "DE-2045"
  developer_name: "revenue_pipeline"

dag:
  dag_id: "revenue_with_callables"
  description: "Daily revenue ETL with validation, alerting, and custom scheduling."
  schedule: "0 6 * * 1-5"
  start_date: "2026-01-01"
  timezone: "America/New_York"
  catchup: false
  dagrun_timeout: 7200
  tags:
    - "domain:finance"
    - "team:data-engineering"
  # DAG-level SLA callback (top-level import)
  sla_miss_callback: "acme.data_engineering.alerts.sla.sla_breach_handler"

gcp:
  project_id: "acme-analytics-prod"
  location: "US"

# DAG-level callbacks — applied to ALL tasks via default_args
# These generate TOP-LEVEL imports (before the `with DAG` block)
default_args:
  retries: 2
  retry_delay: 120
  email:
    - "data-engineering@acme.com"
  email_on_failure: true
  on_failure_callback: "acme.data_engineering.callbacks.notifications.notify_failure"
  on_success_callback: "acme.data_engineering.callbacks.notifications.notify_success"
  on_retry_callback: "acme.data_engineering.callbacks.notifications.log_retry"

tasks:
  - task_id: "start"
    operator: EmptyOperator

  # ExternalTaskSensor with custom scheduling function
  # execution_date_fn generates a DEFERRED import
  - task_id: "wait_for_upstream"
    operator: ExternalTaskSensor
    external_dag_id: "source_ingestion"
    external_task_id: "ingest_complete"
    timeout: 21600
    mode: "reschedule"
    poke_interval: 300
    execution_date_fn: "acme.data_engineering.scheduling.lookback.previous_business_day"

  # PythonOperator with team callable
  # python_callable generates a DEFERRED import
  - task_id: "validate_source_schema"
    operator: PythonOperator
    python_callable: "acme.data_engineering.services.validation.schema.validate_schema"
    op_kwargs:
      project_id: "acme-analytics-prod"
      dataset: "raw_finance"
      table: "daily_transactions"

  - task_id: "stage_transactions"
    operator: BigQueryInsertJobOperator
    sql: "sql/stage_transactions.sql"

  - task_id: "transform_revenue"
    operator: BigQueryInsertJobOperator
    sql: "sql/transform_revenue.sql"

  # PythonOperator for data quality
  - task_id: "check_row_counts"
    operator: PythonOperator
    python_callable: "acme.data_engineering.services.validation.quality.check_row_count"
    op_kwargs:
      project_id: "acme-analytics-prod"
      dataset: "analytics"
      table: "daily_revenue"
      min_rows: 1000

  - task_id: "load_final"
    operator: BigQueryInsertJobOperator
    sql: "sql/load_revenue_final.sql"
    retries: 3
    # Task-level callback OVERRIDE: only this task uses sla_breach_handler
    # instead of the DAG-level notify_failure. Generates a DEFERRED import.
    on_failure_callback: "acme.data_engineering.alerts.sla.sla_breach_handler"

  # Currency conversion using enrichment service callable
  - task_id: "enrich_currency"
    operator: PythonOperator
    python_callable: "acme.data_engineering.services.enrichment.currency.convert_to_usd"
    op_kwargs:
      source_table: "analytics.daily_revenue"
      target_table: "analytics.daily_revenue_usd"

  - task_id: "end"
    operator: EmptyOperator

dependencies:
  - "start >> wait_for_upstream >> validate_source_schema"
  - "validate_source_schema >> stage_transactions >> transform_revenue"
  - "transform_revenue >> check_row_counts >> load_final"
  - "load_final >> enrich_currency >> end"

Generated import structure

The YAML above produces this import layout in the generated DAG:

python
from __future__ import annotations

from datetime import timedelta
from typing import Any, Final

import pendulum

from airflow import DAG

# --- Top-level imports: DAG-level callbacks (referenced in default_args and DAG()) ---
from acme.data_engineering.alerts.sla import sla_breach_handler as callable_sla_breach_handler
from acme.data_engineering.callbacks.notifications import notify_failure as callable_notify_failure
from acme.data_engineering.callbacks.notifications import notify_success as callable_notify_success
from acme.data_engineering.callbacks.notifications import log_retry as callable_log_retry

# ...
# default_args = {
#     "on_failure_callback": callable_notify_failure,
#     "on_success_callback": callable_notify_success,
#     "on_retry_callback": callable_log_retry,
# }
# ...

with DAG(
    dag_id="revenue_with_callables",
    sla_miss_callback=callable_sla_breach_handler,
    # ...
) as dag:
    # --- Deferred imports: operators + task-level callables ---
    from airflow.operators.empty import EmptyOperator
    from airflow.operators.python import PythonOperator
    from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
    from airflow.sensors.external_task import ExternalTaskSensor
    from acme.data_engineering.alerts.sla import sla_breach_handler as callable_sla_breach_handler
    from acme.data_engineering.scheduling.lookback import previous_business_day as callable_previous_business_day
    from acme.data_engineering.services.enrichment.currency import convert_to_usd as callable_convert_to_usd
    from acme.data_engineering.services.validation.quality import check_row_count as callable_check_row_count
    from acme.data_engineering.services.validation.schema import validate_schema as callable_validate_schema

    # Tasks reference callables by their alias:
    #   python_callable=callable_validate_schema,
    #   python_callable=callable_check_row_count,
    #   execution_date_fn=callable_previous_business_day,
    #   on_failure_callback=callable_sla_breach_handler,  (task-level override)
    # ...

Deployment Guide by Platform

Google Cloud Composer

Composer syncs the dags/ and plugins/ folders of the environment's GCS bucket to /home/airflow/gcs/dags/ and /home/airflow/gcs/plugins/. Both directories are on PYTHONPATH.

bash
# GCS bucket structure for Composer
gs://your-composer-bucket/
  dags/
    daily_revenue_pipeline.py      # generated DAGs
    sql/
      stage_transactions.sql       # SQL files

  plugins/
    acme/                          # team callable packages
      __init__.py
      data_engineering/
        __init__.py
        callbacks/
          __init__.py
          notifications.py
        services/
          __init__.py
          validation/
            __init__.py
            schema.py
            quality.py
        scheduling/
          __init__.py
          lookback.py

Docker / Kubernetes (Helm chart)

Mount or install callable packages in the worker image. The Airflow Helm chart supports mounting extra volumes to /opt/airflow/plugins/.

bash
# Option A: mount via Docker volumes
/opt/airflow/
  dags/
    daily_revenue_pipeline.py
  plugins/
    acme/
      data_engineering/
        ...

# Option B: install as a Python package in the Dockerfile
# Dockerfile
FROM apache/airflow:2.10.0
COPY acme_data_engineering-1.0.0-py3-none-any.whl /tmp/
RUN pip install /tmp/acme_data_engineering-1.0.0-py3-none-any.whl

Local development

bash
# Ensure the callable package is importable
# (the ${VAR:+...} form avoids a leading colon when PYTHONPATH is unset)
export PYTHONPATH="${PYTHONPATH:+${PYTHONPATH}:}/path/to/your/plugins"

# Or symlink the package into your local Airflow plugins directory
ln -s /path/to/acme ~/airflow/plugins/acme

Troubleshooting

ModuleNotFoundError at Airflow runtime

This means the callable module is not importable from the worker process. Check:

• Every directory in the path has an __init__.py file
• The module is in a directory on the worker's PYTHONPATH (dags/ or plugins/)
• The dotted path in YAML exactly matches the directory/file/function structure
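• The function name is spelled correctly and exists in the target module (a missing function raises ImportError rather than ModuleNotFoundError)
• If using Composer, the file has been uploaded to the plugins/ folder of the environment's GCS bucket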
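
Several of these checks can be run at once with a small diagnostic script executed in the same environment as Airflow. This is a sketch using only the standard library; check_callable is a hypothetical helper, and the stdlib paths in the demo merely stand in for your own dotted paths.

```python
from __future__ import annotations

import importlib


def check_callable(path: str) -> str:
    """Return a one-line diagnosis for a dotted callable path."""
    module_path, _, function_name = path.rpartition(".")
    if not module_path:
        return f"{path}: not a dotted path"
    try:
        module = importlib.import_module(module_path)
    except ImportError as exc:  # ModuleNotFoundError is a subclass of ImportError
        return f"{path}: cannot import module {module_path!r} ({exc})"
    target = getattr(module, function_name, None)
    if target is None:
        return f"{path}: module imported, but it has no attribute {function_name!r}"
    if not callable(target):
        return f"{path}: {function_name!r} exists but is not callable"
    return f"{path}: OK"


# Standard-library paths used as stand-ins; substitute the paths from your YAML spec.
for dotted in ("os.path.join", "os.path.no_such_function", "no_such_package.module.fn"):
    print(check_callable(dotted))
```

The three outcomes separate a module that cannot be found (path or __init__.py problem) from a module that imports but lacks the function (spelling problem).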

ValueError: Invalid callable path at generation time

DagSmith requires at least one dot in the path (i.e. module.function). A bare function name like "validate_params" is rejected. Use the full dotted path: "acme.data_engineering.services.validation.schema.validate_params".

Quick import test: Verify your callable is importable before generating DAGs:

bash
# Test from the Airflow plugins directory
cd $AIRFLOW_HOME/plugins
python -c "from acme.data_engineering.callbacks.notifications import notify_failure; print('OK')"

# Or from your project root with PYTHONPATH set
PYTHONPATH=./plugins python -c "from acme.data_engineering.services.validation.schema import validate_schema; print('OK')"