Operators & Sensors

All 16 built-in operators and sensors, plus the generic plugin system.

Standard Operators (Airflow Core)

EmptyOperator

operator airflow.operators.empty

No-op placeholder / pipeline marker. Useful for start/end nodes and join points. No additional fields beyond the common base fields.

yaml
- task_id: "start"
  operator: EmptyOperator

BashOperator

operator airflow.operators.bash

Execute a bash command or script. The command is Jinja-templatable.

bash_command required Jinja-templatable bash command
env optional Dict of environment variables for the process
append_env optional Append system env to custom env. Default: false
output_encoding optional Default: "utf-8"
skip_on_exit_code optional Exit code(s) that skip the task. Accepts int or list.
cwd optional Working directory
yaml
- task_id: "run_shell_script"
  operator: BashOperator
  bash_command: "echo 'Hello from {{ ds }}'"
  env:
    MY_VAR: "value"

PythonOperator

operator airflow.operators.python

Run a Python callable. The callable is specified as a dotted import path and automatically imported in the generated DAG.

python_callable required Dotted import path (e.g. "mypackage.validators.validate_params")
op_kwargs optional Dict of keyword arguments passed to the callable. Default: {}
op_args optional List of positional arguments. Default: ()
templates_dict optional Jinja-rendered key-value pairs. Default: None
templates_exts optional Template file extensions. Default: [".sql"]
show_return_value_in_logs optional Default: true
yaml
- task_id: "validate_params"
  operator: PythonOperator
  python_callable: "mypackage.validators.validate_params"
  op_kwargs:
    env: "{{ params.env }}"
    start_dt: "{{ params.start_dt }}"

BranchPythonOperator

operator airflow.operators.python

Branch based on callable return value. Inherits all PythonOperator fields. The callable must return a task_id or list[task_id].

yaml
- task_id: "choose_branch"
  operator: BranchPythonOperator
  python_callable: "mypackage.branching.pick_branch"
  op_kwargs:
    env: "prod"

TriggerDagRunOperator

operator airflow.operators.trigger_dagrun

Trigger another DAG run.

trigger_dag_id required DAG ID to trigger
conf optional Config dict passed to the triggered DAG. Default: {}
wait_for_completion optional Block until triggered DAG completes. Default: false
reset_dag_run optional Clear & re-trigger if run exists. Default: false
execution_date optional Jinja-templatable logical date. alias: logical_date
poke_interval optional Polling interval when waiting. Default: 60.0
allowed_states optional Terminal success states. Default: ["success"]
failed_states optional Terminal failure states. Default: ["failed"]
yaml
- task_id: "trigger_downstream"
  operator: TriggerDagRunOperator
  trigger_dag_id: "downstream_pipeline"
  conf:
    triggered_by: "my_dag"
    run_date: "{{ ds }}"
  wait_for_completion: false
  reset_dag_run: true

ExternalTaskSensor

sensor airflow.sensors.external_task

Wait for a task in another DAG. Supports three scheduling modes (mutually exclusive):

  1. execution_delta — fixed timedelta offset alias: execution_delta_seconds
  2. execution_date_fn — dotted path to custom date-mapping callable
  3. (neither) — pure logical-date equality
external_dag_id required External DAG to monitor
external_task_id Single task target (mutually exclusive with external_task_ids and external_task_group_id)
external_task_ids List of task targets
external_task_group_id Task group target
allowed_states Default: ["success"]
execution_delta Seconds offset from upstream run. alias: execution_delta_seconds
execution_date_fn Dotted path to callable returning a date
check_existence Validate external DAG/task exists. Default: true
yaml
# Mode 1: Fixed delta (upstream ran 1 hour before this DAG)
- task_id: "wait_for_upstream"
  operator: ExternalTaskSensor
  external_dag_id: "upstream_pipeline"
  external_task_id: "final_step"
  mode: "reschedule"
  poke_interval: 300
  timeout: 21600
  allowed_states: ["success"]
  execution_delta: 3600

# Mode 2: Custom callable
- task_id: "wait_custom_date"
  operator: ExternalTaskSensor
  external_dag_id: "reference_pipeline"
  external_task_id: "metric"
  mode: "reschedule"
  poke_interval: 600
  timeout: 43200
  allowed_states: ["success"]
  execution_date_fn: "mypackage.scheduling.get_reference_date"

# Mode 3: Same logical date (no delta or fn)
- task_id: "wait_sibling"
  operator: ExternalTaskSensor
  external_dag_id: "sibling_pipeline"
  external_task_id: "done"
  mode: "reschedule"
  poke_interval: 300
  timeout: 21600
  allowed_states: ["success"]

BigQuery Operators & Sensors

BigQueryInsertJobOperator

operator airflow.providers.google.cloud.operators.bigquery

Run SQL via the BigQuery Jobs API. Supports SQL file paths and inline SQL. FinOps labels are auto-injected by default.

sql required SQL file path or inline SQL. alias: query
params optional Jinja template params
write_disposition optional WRITE_TRUNCATE | WRITE_APPEND | WRITE_EMPTY
create_disposition optional CREATE_IF_NEEDED | CREATE_NEVER
destination_dataset_table optional Fully-qualified destination table
labels optional Custom labels merged with FinOps labels. Default: {}
include_finops_labels optional Auto-inject FinOps labels. Default: true
maximum_bytes_billed optional Bytes billing limit

SQL rendering

  • sql: "path/file.sql" → rendered as Jinja include: {% include 'path/file.sql' %}
  • sql: "SELECT 1" → inline one-liner
  • sql: | (multi-line) → assigned to a _sql_<task_id> variable
yaml
# SQL file reference
- task_id: "stage_data"
  operator: BigQueryInsertJobOperator
  sql: "sql/stage_data.sql"
  params:
    project_id: "my-gcp-project-001"
    src_dataset: "warehouse_tables"
  labels:
    bundle: "daily_load"
    module: "pipeline"

# Inline multi-line SQL
- task_id: "transform_data"
  operator: BigQueryInsertJobOperator
  sql: |
    SELECT
      account_id,
      SUM(amount) AS total_amount
    FROM `project.dataset.transactions`
    WHERE process_date = DATE('{{ ds }}')
    GROUP BY account_id
  retries: 2

BigQueryCheckOperator

operator airflow.providers.google.cloud.operators.bigquery

Assert a SQL query returns a truthy first cell. No additional fields beyond sql.

yaml
- task_id: "data_quality_check"
  operator: BigQueryCheckOperator
  sql: |
    SELECT CASE
      WHEN COUNT(1) > 0 THEN 1
      ELSE 0
    END AS has_data
    FROM `project.dataset.transactions`
    WHERE process_date = DATE('{{ ds }}')

BigQueryValueCheckOperator

operator airflow.providers.google.cloud.operators.bigquery

Assert a SQL scalar matches an expected value.

sql required SQL returning a single scalar value
pass_value required Expected value
tolerance optional Fractional deviation allowed
yaml
- task_id: "row_count_check"
  operator: BigQueryValueCheckOperator
  sql: "sql/count_rows.sql"
  pass_value: 1000
  tolerance: 0.1

BigQueryTableExistenceSensor

sensor airflow.providers.google.cloud.sensors.bigquery

Wait for a BigQuery table to exist. Inherits all sensor base fields.

project_id required GCP project containing the table
dataset_id required BigQuery dataset ID
table_id required BigQuery table ID
yaml
- task_id: "wait_for_table"
  operator: BigQueryTableExistenceSensor
  project_id: "my-gcp-project-001"
  dataset_id: "warehouse_tables"
  table_id: "transactions"
  timeout: 3600
  poke_interval: 120
  mode: "reschedule"

GCS Operators & Sensors

GCSToBigQueryOperator

operator airflow.providers.google.cloud.transfers.gcs_to_bigquery

Load GCS files into BigQuery.

bucket required GCS bucket name
source_objects optional List of GCS object paths/patterns. Default: []
destination_project_dataset_table required BQ destination table (fully-qualified)
source_format optional NEWLINE_DELIMITED_JSON, CSV, AVRO, PARQUET, etc.
autodetect optional Auto-detect schema. Default: false
write_disposition optional Default: WRITE_TRUNCATE
create_disposition optional Default: CREATE_IF_NEEDED
schema_fields optional Explicit BQ schema fields list
yaml
- task_id: "load_from_gcs"
  operator: GCSToBigQueryOperator
  bucket: "my-data-bucket"
  source_objects:
    - "data/{{ ds_nodash }}/*.json"
  destination_project_dataset_table: "my-project.my_dataset.raw_events"
  source_format: "NEWLINE_DELIMITED_JSON"
  autodetect: true
  write_disposition: "WRITE_TRUNCATE"

GCSToGCSOperator

operator airflow.providers.google.cloud.transfers.gcs_to_gcs

Copy or move objects between GCS buckets.

source_bucket required Source GCS bucket
source_object required Source GCS object or prefix
destination_bucket optional Defaults to source_bucket
destination_object optional Defaults to source_object
move_object optional Delete source after copy. Default: false
yaml
- task_id: "archive_files"
  operator: GCSToGCSOperator
  source_bucket: "my-data-bucket"
  source_object: "data/{{ ds_nodash }}/"
  destination_bucket: "my-archive-bucket"
  destination_object: "archive/{{ ds_nodash }}/"
  move_object: false

GCSDeleteObjectsOperator

operator airflow.providers.google.cloud.operators.gcs

Delete objects from a GCS bucket.

bucket_name required GCS bucket name
prefix optional Prefix filter for deletion
yaml
- task_id: "cleanup_staging"
  operator: GCSDeleteObjectsOperator
  bucket_name: "my-data-bucket"
  prefix: "staging/{{ ds_nodash }}/"

GCSObjectsWithPrefixExistenceSensor

sensor airflow.providers.google.cloud.sensors.gcs

Wait for objects with a given prefix to exist in GCS.

bucket required GCS bucket name
prefix required GCS object prefix to check
poll_interval optional Seconds between pokes. Default: 300. alias: poke_interval
yaml
- task_id: "wait_for_files"
  operator: GCSObjectsWithPrefixExistenceSensor
  bucket: "my-data-bucket"
  prefix: "incoming/{{ ds_nodash }}/"
  mode: "reschedule"
  poll_interval: 300
  timeout: 3600

TaskGroup

TaskGroup

util airflow.utils.task_group

A named group of tasks rendered as with TaskGroup(...): in the DAG. Supports nested groups (groups within groups). Intra-group dependencies use the same >> / << syntax.

operator Must be "TaskGroup"
group_id required Unique group identifier
tooltip optional Default: ""
tasks List of tasks (and nested groups) within this group
dependencies Intra-group dependencies (references task_id/group_id within this group only)
yaml
tasks:
  - operator: TaskGroup
    group_id: "staging_group"
    tooltip: "Stage source tables into BigQuery"
    tasks:
      - task_id: "stage_orders"
        operator: BigQueryInsertJobOperator
        sql: "sql/stage_orders.sql"

      - task_id: "stage_customers"
        operator: BigQueryInsertJobOperator
        sql: "sql/stage_customers.sql"

    dependencies:
      - "stage_orders >> stage_customers"  # intra-group only

dependencies:
  - "start >> staging_group >> transform"  # cross-group

Generic Plugin System

Register any Airflow operator or sensor in src/dagsmith/configs/airflow_registry.yaml and use it immediately — no Python code changes required.

Registering a Custom Operator

yaml
# src/dagsmith/configs/airflow_registry.yaml
airflow_class_registry:
  custom:
    SlackWebhookOperator:
      module: airflow.providers.slack.operators.slack_webhook
      class: SlackWebhookOperator
      type: operator

Then use it in any YAML spec — all extra keys are passed through as operator kwargs:

yaml
- task_id: "notify_slack"
  operator: SlackWebhookOperator
  slack_webhook_conn_id: "slack_default"
  message: "Pipeline {{ dag.dag_id }} completed for {{ ds }}"
  channel: "#data-alerts"

Registering a Custom Sensor

Set type: sensor to get validated sensor fields (poke_interval, timeout, mode, soft_fail, etc.):

yaml
# Registry entry
airflow_class_registry:
  custom:
    HttpSensor:
      module: airflow.providers.http.sensors.http
      class: HttpSensor
      type: sensor

# Usage in spec
- task_id: "wait_for_api"
  operator: HttpSensor
  timeout: 3600       # validated: must be > 0
  poke_interval: 120  # validated: must be > 0
  mode: "reschedule"  # validated: "poke" or "reschedule"
  endpoint: "/health"  # passed through as kwarg

External Registry File

Use the DAGSMITH_EXTRA_REGISTRY environment variable to maintain a separate registry file:

bash
export DAGSMITH_EXTRA_REGISTRY=/path/to/my_registry.yaml
dagsmith generate my_dag.yaml

Trade-offs

Built-in OperatorsGeneric OperatorsGeneric Sensors
Field validationFull Pydantic schemaNone (runtime errors)Sensor fields only
Registry entryNot requiredRequiredRequired
Python changesNoneNoneNone