Airflow XCom Exclusive Official

If you want tighter Airflow integration, implement a custom XCom backend (a subclass of BaseXCom) that exposes a method such as claim_xcom(key, consumer_id), which performs an atomic claim in the chosen storage (the metadata DB or an external store). Register the backend by setting xcom_backend in the [core] section of airflow.cfg to the class's import path.
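The atomic claim itself can be sketched independently of Airflow. The example below is a minimal illustration, not the backend described above: it uses an in-memory SQLite database as a stand-in for the real store, and the table name xcom_claims, its columns, and the helper functions are all hypothetical. The key idea is a conditional UPDATE whose row count tells the caller whether it won the claim.

```python
import sqlite3
from typing import Optional


def create_store() -> sqlite3.Connection:
    """Create an in-memory stand-in for the claim store (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE xcom_claims (key TEXT PRIMARY KEY, value TEXT, claimed_by TEXT)"
    )
    return conn


def publish(conn: sqlite3.Connection, key: str, value: str) -> None:
    """Store a value that no consumer has claimed yet."""
    with conn:
        conn.execute(
            "INSERT INTO xcom_claims (key, value, claimed_by) VALUES (?, ?, NULL)",
            (key, value),
        )


def claim_xcom(conn: sqlite3.Connection, key: str, consumer_id: str) -> Optional[str]:
    """Return the value if this consumer wins the claim, otherwise None."""
    with conn:
        # The WHERE clause makes the claim conditional: only an unclaimed row
        # is updated, so at most one consumer sees rowcount == 1.
        cur = conn.execute(
            "UPDATE xcom_claims SET claimed_by = ? "
            "WHERE key = ? AND claimed_by IS NULL",
            (consumer_id, key),
        )
        if cur.rowcount != 1:
            return None
        row = conn.execute(
            "SELECT value FROM xcom_claims WHERE key = ?", (key,)
        ).fetchone()
    return row[0]
```

A second consumer calling claim_xcom for the same key gets None, as does any consumer asking for a key that was never published.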

Pros:

Simple design:


To understand when XCom is appropriate, compare it to other Airflow features:

| Feature | Use Case | Persistence |
| :--- | :--- | :--- |
| XCom | Passing dynamic data between specific tasks within a DAG run. | Scoped to a DAG run; rows stay in the metadata database until they are cleared or purged. |
| Variables | Storing static configuration or global settings (e.g., API keys, environment names). | Persists globally until manually deleted. |
| External Storage | Moving large datasets (files, large DataFrames). | Persists until externally deleted. |

By default, Airflow tasks push and pull XComs via the metadata database (usually PostgreSQL or MySQL). A simple pattern is:

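# Task A (Push)
def push_path(**context):
    # context['ds'] is the run's logical date as YYYY-MM-DD
    file_path = f"/data/report_{context['ds']}.csv"
    context['ti'].xcom_push(key='report_path', value=file_path)
    # The return value is also pushed, under the default key 'return_value'
    return file_path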
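The consuming side of this pattern might look like the sketch below. It assumes the pushing task was registered with the task_id "push_path" (the source does not state the task_id), and the function name pull_path is illustrative.

```python
def pull_path(**context):
    """Task B (Pull): fetch the path that the upstream task pushed."""
    ti = context["ti"]
    # "push_path" is an assumed task_id for Task A; adjust to the real one.
    report_path = ti.xcom_pull(task_ids="push_path", key="report_path")
    if report_path is None:
        # xcom_pull returns None when no matching XCom exists for this run.
        raise ValueError("No report_path XCom found for this DAG run")
    print(f"Processing {report_path}")
    return report_path
```

Failing loudly on a missing value keeps a downstream task from silently working with None when the upstream push did not happen.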

Pass exclusive keys to triggered DAGs:

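from airflow.operators.trigger_dagrun import TriggerDagRunOperator  # Airflow 2.x import path

trigger = TriggerDagRunOperator(
    task_id="trigger_child",
    trigger_dag_id="child_dag",
    # conf is a templated field, so the Jinja expression is rendered at run time
    conf={"xcom_passthrough": "{{ ti.xcom_pull(task_ids='parent_task', key='authorized_key') }}"},
)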

In the child DAG, exclusive mode ensures that only keys passed via conf are accessible; the child reads them from dag_run.conf.
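On the child side, a task can read the passed values from the run's conf and ignore everything else. The sketch below is one possible way to do that; the allow-list ALLOWED_KEYS and the function name read_passthrough are assumptions made for illustration, not part of Airflow.

```python
# Assumed allow-list of conf keys the child DAG is permitted to use.
ALLOWED_KEYS = {"xcom_passthrough"}


def read_passthrough(**context):
    """Return only the allow-listed entries from the triggering run's conf."""
    # dag_run.conf may be None when the DAG is triggered without a payload.
    conf = context["dag_run"].conf or {}
    return {key: value for key, value in conf.items() if key in ALLOWED_KEYS}
```

Filtering against an explicit allow-list means a key added to conf by mistake never reaches downstream logic.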


Problem: You enable exclusive mode but still store heavy objects in the default DB.
Solution: Use a custom XCom backend that serializes large objects to external storage (GCS, S3, Redis) and stores only a URI in the xcom table.

Example:

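from airflow.models.xcom import BaseXCom  # Airflow 2.x import path

class S3XCom(BaseXCom):
    @staticmethod
    def serialize_value(value, **kwargs):
        # size_of and upload_to_s3 are placeholder helpers, not Airflow APIs
        if size_of(value) > 1_000_000:
            s3_key = upload_to_s3(value)
            # Store only a small reference in the xcom table
            value = {"__s3_uri": s3_key}
        return BaseXCom.serialize_value(value)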
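The offload-and-resolve round trip can be tried without Airflow or S3. The following is a minimal sketch under stated assumptions: a plain dict stands in for the external store, the memory:// URI scheme is made up, and offload_if_large and resolve are hypothetical helper names. A real backend would do the resolve step in its deserialization hook.

```python
import json
import uuid

SIZE_LIMIT = 1_000_000  # bytes; mirrors the threshold used in the example
MARKER = "__s3_uri"     # key that identifies an offloaded value


def offload_if_large(value, store, limit=SIZE_LIMIT):
    """Return value unchanged if small, else store it and return a reference."""
    payload = json.dumps(value)
    if len(payload.encode("utf-8")) <= limit:
        return value
    # The dict `store` stands in for S3/GCS; the URI scheme is illustrative.
    uri = f"memory://{uuid.uuid4()}"
    store[uri] = payload
    return {MARKER: uri}


def resolve(stored, store):
    """Inverse of offload_if_large: follow the reference if there is one."""
    if isinstance(stored, dict) and set(stored) == {MARKER}:
        return json.loads(store[stored[MARKER]])
    return stored
```

Small values pass through untouched, so only payloads above the limit pay the cost of an extra round trip to the external store.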

Overview: Have producers LPUSH (or RPUSH) payloads to a Redis list and have consumers RPOP (or LPOP) from the opposite end, which gives first-in, first-out order. Each pop removes the item, so every item is processed by at most one consumer.

Benefits:

Integration:

Example (Python using redis-py):

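# r is a redis.Redis client; key and payload are supplied by the caller
r.rpush(key, json.dumps(payload))  # producer: append to the tail of the list
item = r.lpop(key)  # consumer: pop from the head (FIFO); None if empty; item is removed atomically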
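A slightly fuller sketch of the producer and consumer halves is shown below. The helper names produce and consume_one are illustrative, and InMemoryListClient is a hypothetical stand-in that mimics only the two list calls used here, so the example runs without a Redis server. With redis-py, a redis.Redis instance would be passed in its place.

```python
import json
from collections import defaultdict, deque
from typing import Any, Optional


class InMemoryListClient:
    """Stand-in exposing rpush/lpop like a Redis client (illustration only)."""

    def __init__(self) -> None:
        self._lists = defaultdict(deque)

    def rpush(self, key: str, value: str) -> int:
        self._lists[key].append(value)
        return len(self._lists[key])

    def lpop(self, key: str) -> Optional[str]:
        items = self._lists[key]
        return items.popleft() if items else None


def produce(client, key: str, payload: Any) -> None:
    """Append a JSON-encoded payload to the tail of the list."""
    client.rpush(key, json.dumps(payload))


def consume_one(client, key: str) -> Optional[Any]:
    """Pop the oldest payload and decode it; return None if the list is empty."""
    raw = client.lpop(key)
    if raw is None:
        return None
    # json.loads accepts both str and bytes, so this also works with the
    # bytes that redis-py returns by default.
    return json.loads(raw)
```

Because the pop removes the item before it is processed, a consumer that crashes mid-task loses that item; this is the at-most-once trade-off of the pattern.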