Airflow Advanced Features

Suryakant Pandey
Mar 6, 2020

Airflow is an open-source solution for running batch jobs. It comes with a nice UI and lots of useful operators that can be used out of the box. I have used Airflow for scheduling backend pipelines, running alert scripts, and updating databases. I would like to share some lesser-known but powerful tricks that I have learned in Airflow.

  1. Conditionally Run a Task: how do you skip tasks based on runtime conditions? E.g., trigger an email task only when the previous task produced valid output.
  2. Using XCOM for communication across DAGs: XCOM is traditionally used to pass data only between tasks of a single DAG. However, a task can also pull XCOMs pushed by other DAGs, as well as XCOMs pushed by earlier runs of its own DAG. This makes it possible to pass data between DAGs and between runs of the same DAG. E.g., use the output path of a task in Dag1 as the input to Dag2.

Both points are discussed in detail below, each with an example.

Conditionally Run a Task:

When we want to decide whether to run downstream tasks based on the output of upstream tasks, the following approach is useful.

Let’s say we have a DAG that compares the version of a package on two machines and sends an alert email if the versions differ. This can be done with two tasks: a PythonOperator (which compares the package versions on the two boxes) and an EmailOperator (which sends the alert email). However, we want the alert email only when there is a version mismatch, so whether the EmailOperator task runs must depend on a condition evaluated at runtime.

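This is where ShortCircuitOperator is useful. It works like a PythonOperator, but if its python_callable (here, check_consistency) returns a falsy value, all downstream tasks, including send_email_task, are skipped. If the callable returns a truthy value, the DAG continues as usual.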

An example to demonstrate this:

Skipping email tasks when previous task short circuits

Using XCOM for Communication Across DAGs

XCOMs from other DAGs, or from previous runs of the same DAG, can be pulled by passing additional parameters to xcom_pull. With the default values of these parameters, xcom_pull only reads XCOMs from the current DAG with the current execution_date.

xcom_pull accepts the following parameters (source: Airflow GitHub):

def xcom_pull(
        self,
        task_ids: Optional[Union[str, Iterable[str]]] = None,
        dag_id: Optional[str] = None,
        key: str = XCOM_RETURN_KEY,
        include_prior_dates: bool = False) -> Any:
    ...  # body omitted

If we set task_ids and dag_id to a task in another DAG, we can read the XCOMs pushed by that DAG. In that case we usually also need to pass include_prior_dates=True, because by default xcom_pull only matches XCOMs with exactly the same execution_date as the current run. Passing include_prior_dates=True on its own (leaving dag_id unset) likewise lets us read XCOMs that the same DAG pushed in previous runs.

With include_prior_dates=True, xcom_pull returns the most recent XCOM (for the given key) pushed by the source DAG on or before the execution_date of the pulling DAG's run.
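Below is a minimal sketch of reading the same DAG's previous run. The callable compares a row count with the value it pushed last time. `fetch_row_count` is a hypothetical stand-in for a real query. The sketch assumes Airflow 2's PythonOperator, which passes the running TaskInstance to a callable that declares a `ti` argument. It also relies on Airflow clearing a task instance's own XComs when the task starts, so the pull can only see earlier runs.

```python
from typing import Any, Optional


def fetch_row_count() -> int:
    """Hypothetical stand-in for a real query such as SELECT COUNT(*) FROM events."""
    return 1200


def track_row_count(ti: Any) -> int:
    """Log growth since the previous run, then push today's count for the next run."""
    current = fetch_row_count()
    # Same DAG, same task, earlier execution date: dag_id defaults to the current
    # DAG, so only include_prior_dates=True is needed. Returns None on the first run.
    previous: Optional[int] = ti.xcom_pull(
        task_ids="track_row_count", include_prior_dates=True
    )
    if previous is None:
        print(f"First run: {current} rows")
    else:
        print(f"{current} rows, {current - previous:+d} since the previous run")
    # The return value is pushed as this run's XCom under the key "return_value".
    return current
```

It would be wired up as `PythonOperator(task_id="track_row_count", python_callable=track_row_count)`. The task_id must match the `task_ids` value passed to `xcom_pull`.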

Sample code to push an XCOM from one DAG and read it in another:

1st DAG pulls XCOMs from another DAG; 2nd DAG pushes XCOMs
Left DAG reads the XCOM pushed by the right DAG
