Operators & Sensors
All 16 built-in operators and sensors, plus the generic plugin system.
Standard Operators (Airflow Core)
EmptyOperator
operator airflow.operators.empty
No-op placeholder / pipeline marker. Useful for start/end nodes and join points. No additional fields beyond the common base fields.
- task_id: "start"
  operator: EmptyOperator
BashOperator
operator airflow.operators.bash
Execute a bash command or script. The command is Jinja-templatable.
false
"utf-8"
- task_id: "run_shell_script"
  operator: BashOperator
  bash_command: "echo 'Hello from {{ ds }}'"
  env:
    MY_VAR: "value"
PythonOperator
operator airflow.operators.python
Run a Python callable. The callable is specified as a dotted import path and is automatically imported in the generated DAG.
python_callable: dotted import path (e.g. "mypackage.validators.validate_params")
{}
()
None
[".sql"]
true
- task_id: "validate_params"
  operator: PythonOperator
  python_callable: "mypackage.validators.validate_params"
  op_kwargs:
    env: "{{ params.env }}"
    start_dt: "{{ params.start_dt }}"
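The dotted-path lookup described above can be approximated with the standard library. This is a minimal sketch, not the generator's actual implementation: `resolve_callable` is a hypothetical helper, and `json.dumps` stands in for a project callable such as `mypackage.validators.validate_params`.

```python
from importlib import import_module
from typing import Any, Callable


def resolve_callable(dotted_path: str) -> Callable[..., Any]:
    """Import the module part of a dotted path and return the named attribute."""
    module_path, _, attr_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"expected 'package.module.callable', got {dotted_path!r}")
    target = getattr(import_module(module_path), attr_name)
    if not callable(target):
        raise TypeError(f"{dotted_path!r} does not refer to a callable")
    return target


# A standard-library function stands in for a project-specific callable.
dumps = resolve_callable("json.dumps")
rendered = dumps({"env": "prod"})
```

A path without a module part, such as `"validate_params"`, is rejected early so that the error surfaces at generation time rather than inside a running task.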
BranchPythonOperator
operator airflow.operators.python
Branch based on callable return value. Inherits all PythonOperator fields. The callable must return a task_id or list[task_id].
- task_id: "choose_branch"
  operator: BranchPythonOperator
  python_callable: "mypackage.branching.pick_branch"
  op_kwargs:
    env: "prod"
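A branching callable that satisfies the return-value contract might look like the following. The function body and the task ids (`load_prod`, `notify_oncall`, `load_dev`) are hypothetical and only illustrate the two allowed return shapes.

```python
from typing import List, Union


def pick_branch(env: str) -> Union[str, List[str]]:
    """Return the task_id (or list of task_ids) that should run next."""
    if env == "prod":
        # Returning a list follows several downstream tasks at once.
        return ["load_prod", "notify_oncall"]
    # Any other environment follows a single branch.
    return "load_dev"
```

With `op_kwargs: {env: "prod"}` as in the spec above, this sketch would select both `load_prod` and `notify_oncall`; tasks that are not returned are skipped by Airflow.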
TriggerDagRunOperator
operator airflow.operators.trigger_dagrun
Trigger another DAG run.
{}
false
false
60.0
["success"]
["failed"]
- task_id: "trigger_downstream"
  operator: TriggerDagRunOperator
  trigger_dag_id: "downstream_pipeline"
  conf:
    triggered_by: "my_dag"
    run_date: "{{ ds }}"
  wait_for_completion: false
  reset_dag_run: true
ExternalTaskSensor
sensor airflow.sensors.external_task
Wait for a task in another DAG. Supports three scheduling modes (mutually exclusive):
- execution_delta: fixed timedelta offset (alias: execution_delta_seconds)
- execution_date_fn: dotted path to a custom date-mapping callable
- (neither): pure logical-date equality
external_task_id: the task to wait for (mutually exclusive with external_task_ids and external_task_group_id)
["success"]
true
# Mode 1: Fixed delta (upstream ran 1 hour before this DAG)
- task_id: "wait_for_upstream"
  operator: ExternalTaskSensor
  external_dag_id: "upstream_pipeline"
  external_task_id: "final_step"
  mode: "reschedule"
  poke_interval: 300
  timeout: 21600
  allowed_states: ["success"]
  execution_delta: 3600
# Mode 2: Custom callable
- task_id: "wait_custom_date"
  operator: ExternalTaskSensor
  external_dag_id: "reference_pipeline"
  external_task_id: "metric"
  mode: "reschedule"
  poke_interval: 600
  timeout: 43200
  allowed_states: ["success"]
  execution_date_fn: "mypackage.scheduling.get_reference_date"
# Mode 3: Same logical date (no delta or fn)
- task_id: "wait_sibling"
  operator: ExternalTaskSensor
  external_dag_id: "sibling_pipeline"
  external_task_id: "done"
  mode: "reschedule"
  poke_interval: 300
  timeout: 21600
  allowed_states: ["success"]
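The three modes differ only in which upstream logical date the sensor looks for. The sketch below works through that date arithmetic with the standard library. The body of `get_reference_date` is an assumption (the spec above only names the callable), and `target_date_with_delta` is a hypothetical helper that mirrors how Airflow subtracts `execution_delta` from the current logical date.

```python
from datetime import datetime, timedelta, timezone


def target_date_with_delta(logical_date: datetime, delta_seconds: int) -> datetime:
    """Mode 1: look for the upstream run that is delta_seconds earlier."""
    return logical_date - timedelta(seconds=delta_seconds)


def get_reference_date(logical_date: datetime) -> datetime:
    """Mode 2 (assumed mapping): midnight of the previous day."""
    previous_day = logical_date - timedelta(days=1)
    return previous_day.replace(hour=0, minute=0, second=0, microsecond=0)


run = datetime(2024, 5, 2, 6, 0, tzinfo=timezone.utc)
print(target_date_with_delta(run, 3600))  # 2024-05-02 05:00:00+00:00
print(get_reference_date(run))            # 2024-05-01 00:00:00+00:00
print(run)                                # Mode 3 uses the date unchanged
```

If neither the delta nor the callable lands exactly on a logical date that the upstream DAG actually ran for, the sensor keeps waiting until `timeout` expires, so the mapping should follow the upstream schedule.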
BigQuery Operators & Sensors
BigQueryInsertJobOperator
operator airflow.providers.google.cloud.operators.bigquery
Run SQL via the BigQuery Jobs API. Supports SQL file paths and inline SQL. FinOps labels are auto-injected by default.
WRITE_TRUNCATE | WRITE_APPEND | WRITE_EMPTY
CREATE_IF_NEEDED | CREATE_NEVER
{}
true
SQL rendering
- sql: "path/file.sql" → rendered as a Jinja include: {% include 'path/file.sql' %}
- sql: "SELECT 1" → inline one-liner
- sql: | (multi-line) → assigned to a _sql_<task_id> variable
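The three rendering rules can be sketched as a small dispatch function. This is an illustration under stated assumptions, not the generator's code: `render_sql_argument` is a hypothetical name, detecting file references by a `.sql` suffix is a guess, and the `_sql_<task_id>` variable name is taken from the description above.

```python
def render_sql_argument(task_id: str, sql: str) -> str:
    """Return the Python expression a generator might emit for a task's sql field."""
    stripped = sql.strip()
    if "\n" in stripped:
        # Multi-line SQL: refer to a module-level variable defined elsewhere.
        return f"_sql_{task_id}"
    if stripped.endswith(".sql"):
        # File path: defer to Jinja so the file is loaded at render time.
        return repr("{% include '" + stripped + "' %}")
    # Short inline SQL: emit the literal string.
    return repr(stripped)


print(render_sql_argument("stage_data", "sql/stage_data.sql"))
print(render_sql_argument("probe", "SELECT 1"))
print(render_sql_argument("transform_data", "SELECT\n  account_id\nFROM t"))
```

Keeping multi-line SQL in a named variable keeps the operator call in the generated DAG short and easy to read.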
# SQL file reference
- task_id: "stage_data"
  operator: BigQueryInsertJobOperator
  sql: "sql/stage_data.sql"
  params:
    project_id: "my-gcp-project-001"
    src_dataset: "warehouse_tables"
  labels:
    bundle: "daily_load"
    module: "pipeline"
# Inline multi-line SQL
- task_id: "transform_data"
  operator: BigQueryInsertJobOperator
  sql: |
    SELECT
      account_id,
      SUM(amount) AS total_amount
    FROM `project.dataset.transactions`
    WHERE process_date = DATE('{{ ds }}')
    GROUP BY account_id
  retries: 2
BigQueryCheckOperator
operator airflow.providers.google.cloud.operators.bigquery
Assert that a SQL query returns a first row whose values are all truthy (typically a single cell). No additional fields beyond sql.
- task_id: "data_quality_check"
  operator: BigQueryCheckOperator
  sql: |
    SELECT CASE
      WHEN COUNT(1) > 0 THEN 1
      ELSE 0
    END AS has_data
    FROM `project.dataset.transactions`
    WHERE process_date = DATE('{{ ds }}')
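The pass/fail rule can be stated in a few lines. The sketch below reflects how Airflow's SQL check operators are understood to treat the first result row: an empty result fails, and so does any falsy value in that row. `check_passes` is a hypothetical helper, not part of Airflow or of the generator.

```python
from typing import Any, Optional, Sequence


def check_passes(first_row: Optional[Sequence[Any]]) -> bool:
    """A check fails on an empty result or on any falsy value in the first row."""
    if not first_row:
        return False
    return all(bool(value) for value in first_row)


print(check_passes([1]))  # has_data = 1
print(check_passes([0]))  # has_data = 0
```

This is why the query above wraps the count in a CASE expression: it reduces the result to a single 1 or 0 cell.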
BigQueryValueCheckOperator
operator airflow.providers.google.cloud.operators.bigquery
Assert a SQL scalar matches an expected value.
- task_id: "row_count_check"
  operator: BigQueryValueCheckOperator
  sql: "sql/count_rows.sql"
  pass_value: 1000
  tolerance: 0.1
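For numeric values, `tolerance` is a relative band around `pass_value`. The following sketch shows the arithmetic as it is understood to work in Airflow's value check; `within_tolerance` is a hypothetical helper written for illustration.

```python
def within_tolerance(value: float, pass_value: float, tolerance: float) -> bool:
    """True when value lies in [pass_value * (1 - tol), pass_value * (1 + tol)]."""
    lower = pass_value * (1 - tolerance)
    upper = pass_value * (1 + tolerance)
    return lower <= value <= upper


# With pass_value: 1000 and tolerance: 0.1 the accepted range is roughly 900 to 1100.
print(within_tolerance(950, 1000, 0.1))   # True
print(within_tolerance(1200, 1000, 0.1))  # False
```

A row count of 950 would therefore pass the example check, while 1200 would fail it.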
BigQueryTableExistenceSensor
sensor airflow.providers.google.cloud.sensors.bigquery
Wait for a BigQuery table to exist. Inherits all sensor base fields.
- task_id: "wait_for_table"
  operator: BigQueryTableExistenceSensor
  project_id: "my-gcp-project-001"
  dataset_id: "warehouse_tables"
  table_id: "transactions"
  timeout: 3600
  poke_interval: 120
  mode: "reschedule"
GCS Operators & Sensors
GCSToBigQueryOperator
operator airflow.providers.google.cloud.transfers.gcs_to_bigquery
Load GCS files into BigQuery.
[]
NEWLINE_DELIMITED_JSON, CSV, AVRO, PARQUET, etc.
false
WRITE_TRUNCATE
CREATE_IF_NEEDED
- task_id: "load_from_gcs"
  operator: GCSToBigQueryOperator
  bucket: "my-data-bucket"
  source_objects:
    - "data/{{ ds_nodash }}/*.json"
  destination_project_dataset_table: "my-project.my_dataset.raw_events"
  source_format: "NEWLINE_DELIMITED_JSON"
  autodetect: true
  write_disposition: "WRITE_TRUNCATE"
GCSToGCSOperator
operator airflow.providers.google.cloud.transfers.gcs_to_gcs
Copy or move objects between GCS buckets.
false
- task_id: "archive_files"
  operator: GCSToGCSOperator
  source_bucket: "my-data-bucket"
  source_object: "data/{{ ds_nodash }}/"
  destination_bucket: "my-archive-bucket"
  destination_object: "archive/{{ ds_nodash }}/"
  move_object: false
GCSDeleteObjectsOperator
operator airflow.providers.google.cloud.operators.gcs
Delete objects from a GCS bucket.
- task_id: "cleanup_staging"
  operator: GCSDeleteObjectsOperator
  bucket_name: "my-data-bucket"
  prefix: "staging/{{ ds_nodash }}/"
GCSObjectsWithPrefixExistenceSensor
sensor airflow.providers.google.cloud.sensors.gcs
Wait for objects with a given prefix to exist in GCS.
poll_interval: default 300 (alias: poke_interval)
- task_id: "wait_for_files"
  operator: GCSObjectsWithPrefixExistenceSensor
  bucket: "my-data-bucket"
  prefix: "incoming/{{ ds_nodash }}/"
  mode: "reschedule"
  poll_interval: 300
  timeout: 3600
TaskGroup
TaskGroup
util airflow.utils.task_group
A named group of tasks rendered as with TaskGroup(...): in the DAG. Supports nested groups (groups within groups). Intra-group dependencies use the same >> / << syntax.
"TaskGroup"
""
tasks:
  - operator: TaskGroup
    group_id: "staging_group"
    tooltip: "Stage source tables into BigQuery"
    tasks:
      - task_id: "stage_orders"
        operator: BigQueryInsertJobOperator
        sql: "sql/stage_orders.sql"
      - task_id: "stage_customers"
        operator: BigQueryInsertJobOperator
        sql: "sql/stage_customers.sql"
    dependencies:
      - "stage_orders >> stage_customers"  # intra-group only
dependencies:
  - "start >> staging_group >> transform"  # cross-group
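Each dependency string is a chain that expands into upstream/downstream pairs. The sketch below shows one way such a chain could be split into edges. `parse_dependency` is a hypothetical helper, and chains that mix `>>` and `<<` are deliberately rejected to keep the example short.

```python
from typing import List, Tuple


def parse_dependency(expr: str) -> List[Tuple[str, str]]:
    """Split an 'a >> b >> c' chain into (upstream, downstream) edges."""
    if ">>" in expr and "<<" in expr:
        raise ValueError("mixing >> and << in one chain is not handled in this sketch")
    if "<<" in expr:
        # 'a << b' means b runs first, so reverse the order of the nodes.
        nodes = [node.strip() for node in expr.split("<<")][::-1]
    else:
        nodes = [node.strip() for node in expr.split(">>")]
    return list(zip(nodes, nodes[1:]))


print(parse_dependency("start >> staging_group >> transform"))
# [('start', 'staging_group'), ('staging_group', 'transform')]
```

A group id such as `staging_group` can appear in a chain exactly like a task id, which is what makes cross-group wiring read the same as task-level wiring.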
Generic Plugin System
Register any Airflow operator or sensor in src/dagsmith/configs/airflow_registry.yaml and use it immediately — no Python code changes required.
Registering a Custom Operator
# src/dagsmith/configs/airflow_registry.yaml
airflow_class_registry:
  custom:
    SlackWebhookOperator:
      module: airflow.providers.slack.operators.slack_webhook
      class: SlackWebhookOperator
      type: operator
Then use it in any YAML spec — all extra keys are passed through as operator kwargs:
- task_id: "notify_slack"
  operator: SlackWebhookOperator
  slack_webhook_conn_id: "slack_default"
  message: "Pipeline {{ dag.dag_id }} completed for {{ ds }}"
  channel: "#data-alerts"
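A registry entry carries enough information to emit an import statement, and the pass-through rule amounts to forwarding every key the generator does not consume itself. The sketch below illustrates both ideas. `import_line`, `split_task`, and the `RESERVED_KEYS` set are hypothetical, and the real generator may reserve more keys than these two.

```python
from typing import Any, Dict, Tuple

# Keys assumed to be consumed by the generator rather than forwarded (hypothetical).
RESERVED_KEYS = {"task_id", "operator"}


def import_line(entry: Dict[str, str]) -> str:
    """Build the import statement implied by a registry entry."""
    return f"from {entry['module']} import {entry['class']}"


def split_task(task: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Return the operator name and the extra keys forwarded as kwargs."""
    kwargs = {key: value for key, value in task.items() if key not in RESERVED_KEYS}
    return task["operator"], kwargs


entry = {
    "module": "airflow.providers.slack.operators.slack_webhook",
    "class": "SlackWebhookOperator",
    "type": "operator",
}
task = {
    "task_id": "notify_slack",
    "operator": "SlackWebhookOperator",
    "slack_webhook_conn_id": "slack_default",
    "channel": "#data-alerts",
}
print(import_line(entry))
operator_name, kwargs = split_task(task)
print(operator_name, sorted(kwargs))
```

Because nothing validates the forwarded keys, a misspelled kwarg only fails when Airflow instantiates the operator.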
Registering a Custom Sensor
Set type: sensor to get validated sensor fields (poke_interval, timeout, mode, soft_fail, etc.):
# Registry entry
airflow_class_registry:
  custom:
    HttpSensor:
      module: airflow.providers.http.sensors.http
      class: HttpSensor
      type: sensor
# Usage in spec
- task_id: "wait_for_api"
  operator: HttpSensor
  timeout: 3600          # validated: must be > 0
  poke_interval: 120     # validated: must be > 0
  mode: "reschedule"     # validated: "poke" or "reschedule"
  endpoint: "/health"    # passed through as kwarg
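The sensor checks noted in the comments above can be expressed as a small validator. The tool itself is described as using Pydantic for validation, so the plain-Python version below is only a stand-in that shows the rules. `validate_sensor_fields` is a hypothetical name.

```python
from typing import Any, Dict, List

VALID_MODES = {"poke", "reschedule"}


def validate_sensor_fields(task: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable problems; an empty list means valid."""
    errors: List[str] = []
    for field in ("timeout", "poke_interval"):
        value = task.get(field)
        if value is not None and value <= 0:
            errors.append(f"{field} must be > 0, got {value}")
    mode = task.get("mode")
    if mode is not None and mode not in VALID_MODES:
        errors.append(f"mode must be one of {sorted(VALID_MODES)}, got {mode!r}")
    return errors


good = {"timeout": 3600, "poke_interval": 120, "mode": "reschedule", "endpoint": "/health"}
bad = {"timeout": 0, "poke_interval": 120, "mode": "polling"}
print(validate_sensor_fields(good))  # []
print(validate_sensor_fields(bad))
```

Keys outside the sensor base fields, such as `endpoint`, are ignored by the validator and forwarded unchanged.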
External Registry File
Use the DAGSMITH_EXTRA_REGISTRY environment variable to maintain a separate registry file:
export DAGSMITH_EXTRA_REGISTRY=/path/to/my_registry.yaml
dagsmith generate my_dag.yaml
Trade-offs
| | Built-in Operators | Generic Operators | Generic Sensors |
|---|---|---|---|
| Field validation | Full Pydantic schema | None (runtime errors) | Sensor fields only |
| Registry entry | Not required | Required | Required |
| Python changes | None | None | None |