Airflow XCom Exclusive
If you want tighter Airflow integration, implement a custom XCom backend (a subclass of BaseXCom) that exposes a method such as claim_xcom(key, consumer_id), which performs an atomic claim in the chosen storage (the metadata DB or an external store). Register the backend by setting the xcom_backend option in the [core] section of airflow.cfg to the class's import path.
Pros:
Simple design:
To understand when XCom is appropriate, compare it to other Airflow features:
| Feature | Use Case | Persistence |
| :--- | :--- | :--- |
| XCom | Passing dynamic data between specific tasks within a DAG run. | Stored in the metadata database; remains after the DAG run until the task is cleared or old rows are purged. |
| Variables | Storing static configuration or global settings (e.g., API keys, environment names). | Persists globally until manually deleted. |
| External Storage | Moving large datasets (files, large DataFrames). | Persists until externally deleted. |
By default, Airflow tasks push and pull XComs via the metadata database (usually PostgreSQL or MySQL). A simple pattern is:
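# Task A (Push)
def push_path(**context):
    # Build a per-run path from the logical date string (YYYY-MM-DD).
    file_path = f"/data/report_{context['ds']}.csv"
    context['ti'].xcom_push(key='report_path', value=file_path)
    # By default the return value is also pushed, under the key 'return_value'.
    return file_path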
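The consuming side of this pattern might look like the sketch below. It assumes Task A's task_id is push_path (the source does not state it) and fails loudly when no value was pushed, since xcom_pull returns None in that case.

```python
# Task B (Pull)
def pull_path(**context):
    # task_ids="push_path" is an assumption: use whatever task_id Task A has.
    file_path = context["ti"].xcom_pull(task_ids="push_path", key="report_path")
    if file_path is None:
        raise ValueError("no report_path XCom found for this DAG run")
    return file_path
```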
Pass exclusive keys to triggered DAGs:
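from airflow.operators.trigger_dagrun import TriggerDagRunOperator  # Airflow 2.x path

trigger = TriggerDagRunOperator(
    task_id="trigger_child",
    trigger_dag_id="child_dag",
    # conf is a templated field, so the Jinja expression is rendered at run time.
    conf={"xcom_passthrough": "{{ ti.xcom_pull(task_ids='parent_task', key='authorized_key') }}"},
)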
In the child DAG, exclusive mode means that tasks read only the keys passed via conf, which is available to them as dag_run.conf.
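One way to apply that rule in the child DAG is to filter the run's conf against an explicit allow-list. The helper below is a hypothetical sketch: the name read_passthrough and the allowed_keys parameter are illustrative, not part of Airflow.

```python
def read_passthrough(dag_run_conf, allowed_keys=("xcom_passthrough",)):
    """Return only the allow-listed keys from a DAG run's conf."""
    conf = dag_run_conf or {}  # conf may be None when the run had no conf
    return {k: conf[k] for k in allowed_keys if k in conf}
```

Inside a child task this could be called as read_passthrough(context["dag_run"].conf), so that any extra keys in conf are ignored.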
Problem: You enable exclusive mode but still store heavy objects in the default DB.
Solution: Use a custom XCom backend that serializes large objects to external storage (GCS, S3, Redis) and stores only a URI in the xcom table.
Example:
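from airflow.models.xcom import BaseXCom

class S3XCom(BaseXCom):
    @staticmethod
    def serialize_value(value, **kwargs):
        # size_of and upload_to_s3 are placeholder helpers, not Airflow APIs.
        if size_of(value) > 1_000_000:
            s3_key = upload_to_s3(value)
            # Store only a small reference in the xcom table.
            value = {"__s3_uri": s3_key}
        return BaseXCom.serialize_value(value)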
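A backend like this also needs the reverse step: when a stored value is a reference, fetch the real object before handing it to the task. The sketch below shows that round trip in plain Python under loud assumptions: FAKE_STORE is an in-memory dict standing in for S3, and serialize, deserialize and upload are illustrative names rather than Airflow methods.

```python
import json

FAKE_STORE = {}         # stands in for an S3 bucket (assumption)
SIZE_LIMIT = 1_000_000  # bytes; same threshold as the example above


def upload(blob):
    # Hypothetical upload: store the blob and return its key.
    key = f"xcom/{len(FAKE_STORE)}"
    FAKE_STORE[key] = blob
    return key


def serialize(value, limit=SIZE_LIMIT):
    blob = json.dumps(value)
    if len(blob.encode("utf-8")) > limit:
        # Too large for the database: keep only a reference.
        return json.dumps({"__s3_uri": upload(blob)})
    return blob


def deserialize(stored):
    value = json.loads(stored)
    if isinstance(value, dict) and set(value) == {"__s3_uri"}:
        # A reference: fetch the real payload from the external store.
        return json.loads(FAKE_STORE[value["__s3_uri"]])
    return value
```

Note that a genuine user value shaped exactly like {"__s3_uri": ...} would be mistaken for a reference, so a real backend may want a less collision-prone marker.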
Overview: Have producers LPUSH (or RPUSH) payloads to a Redis list and consumers use RPOP (or LPOP) to consume items, ensuring each item is removed once and processed by at most one consumer.
Benefits:
Integration:
Example (Python using redis-py):
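import json, redis
r = redis.Redis()  # connection settings are deployment-specific
r.rpush(key, json.dumps(payload))  # producer: append to the tail
item = r.lpop(key)  # consumer: pop from the head (FIFO); None if empty; item is removed atomically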
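A consumer usually pops in a loop until the list is empty. The helper below is a minimal sketch: drain and handle are hypothetical names, and client is assumed to be a redis-py client (or anything with a compatible lpop method). It pops from the head with lpop so that, paired with rpush on the producer side, items are processed in FIFO order.

```python
import json


def drain(client, key, handle):
    """Pop items from the head of the list until it is empty; return the count."""
    processed = 0
    while True:
        raw = client.lpop(key)  # atomic: no other consumer can receive this item
        if raw is None:
            return processed
        handle(json.loads(raw))
        processed += 1
```

Because the item is removed before handle runs, a crash inside handle loses that item. This is what gives the pattern its at-most-once delivery.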