Building Your First DAG with Apache Airflow
Apache Airflow is a powerful platform for orchestrating complex workflows. After learning the Fundamentals and Installing Airflow with Docker, it’s time to dive into one of its most essential features – the Directed Acyclic Graph (DAG). In this article, we’ll explore what a DAG is, break down its structure, and build a complete example DAG that demonstrates an end-to-end ETL (Extract, Transform, Load) process.
The Anatomy of a DAG
Let’s break down the components that make up a DAG in Apache Airflow:
Default Arguments (default_args): These are key-value pairs that define common parameters for tasks. They include settings such as the owner, retry behavior, start date, and email notifications. Centralizing these settings ensures consistency across all tasks in the DAG.
DAG Object: The DAG object is defined by its ID, a description, the default arguments, and the schedule interval. It acts as a container for all the tasks.
Operators and Tasks: Tasks are the individual units of work. Operators (e.g., PythonOperator, BashOperator) encapsulate the logic to execute a particular task. Each task is associated with a unique task ID.
Task Dependencies: You establish the order in which tasks run by setting dependencies. This can be done using bitshift operators (e.g., task1 >> task2) or methods like set_downstream() and set_upstream().
Scheduling: The schedule_interval defines how often your DAG runs. It can be set to a cron expression, a timedelta, or even None if you prefer manual triggering.
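To see how these pieces fit together, here is a minimal, illustrative skeleton (the IDs, commands, and schedule are placeholders, not the DAG we build below):

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

# Default arguments shared by every task in the DAG
default_args = {
    "owner": "airflow",
    "start_date": datetime(2024, 1, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# The DAG object: ID, description, default arguments, and schedule
with DAG(
    dag_id="example_dag",
    description="A minimal skeleton DAG",
    default_args=default_args,
    schedule_interval="@daily",  # cron expression, timedelta, or None
    catchup=False,
) as dag:
    # Operators wrap the work of each task; every task gets a unique task ID
    extract = BashOperator(task_id="extract", bash_command="echo 'extract'")
    load = BashOperator(task_id="load", bash_command="echo 'load'")

    # Task dependencies via the bitshift operator
    extract >> load
```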
Step 1: Set Up Your Environment
Before you begin, ensure Apache Airflow is installed and running. If you’re using Docker, you likely launched Airflow via a docker-compose.yaml file, which starts the scheduler, webserver, and other core components. To add your workflows, place DAG files in the mounted dags/ directory – this allows Airflow to automatically detect and execute them.
If you’re new to Airflow or unsure how to structure your project, the Astro CLI simplifies setup. This open-source tool initializes a fully configured Airflow environment with one command:
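For example (assuming the Astro CLI is already installed):

```bash
astro dev init    # scaffold a new Airflow project in the current directory
```

Running `astro dev start` afterwards spins up the local Airflow containers.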
This generates all necessary files and even includes an example DAG to help you start experimenting immediately. Read Create an Astro project for more details on the project structure.
Step 2: Create Your DAG File
Create a new Python file (cocktail_api_dag.py) in your dags/ folder. This file will contain all the logic for your DAG.
Step 3: Import Required Modules
At the beginning of the file, we import the necessary modules. These include core Airflow classes, operators for interacting with PostgreSQL and HTTP endpoints, as well as helper libraries like json, pandas for data normalization, and datetime for scheduling.
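A plausible set of imports for this DAG looks like the following (the provider import paths assume Airflow 2.x with the HTTP and Postgres provider packages installed):

```python
import json
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from pandas import json_normalize
```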
DAG: The central object that orchestrates the workflow.
Operators and Sensors: Specialized classes to perform tasks such as running SQL commands, making HTTP calls, and executing Python functions.
PostgresHook: Provides an interface to interact with a PostgreSQL database.
json_normalize: Used to convert JSON data into a flat table, ideal for processing API responses.
Step 4: Define Helper Functions
Two helper functions are defined outside the DAG context. These functions will later be used by PythonOperators to process and store the data.
a. Data Processing Function
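A sketch of the processing callable, assuming the extract task (named extract_cocktail below) pushes the raw API response to XCom and that we keep only a few fields from TheCocktailDB payload; the field names and the temporary CSV path are illustrative choices:

```python
def _process_cocktail(ti):
    # Pull the JSON response pushed to XCom by the extract_cocktail task
    drinks = ti.xcom_pull(task_ids="extract_cocktail")["drinks"]
    cocktail = drinks[0]

    # Flatten the nested JSON into a one-row table (json_normalize from Step 3)
    processed_cocktail = json_normalize({
        "idDrink": cocktail["idDrink"],
        "strDrink": cocktail["strDrink"],
        "strCategory": cocktail["strCategory"],
        "strAlcoholic": cocktail["strAlcoholic"],
        "strGlass": cocktail["strGlass"],
    })

    # Write the flattened row to a temporary CSV for the storage task
    processed_cocktail.to_csv("/tmp/processed_cocktail.csv", index=False, header=False)
```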
b. Data Storage Function
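And a sketch of the storage callable, which uses the PostgresHook (imported in Step 3) with the postgres connection configured later to bulk-load the CSV; the cocktails table name matches the CREATE TABLE statement shown further down:

```python
def _store_cocktail():
    # Use the "postgres" Airflow connection to copy the CSV into the table
    hook = PostgresHook(postgres_conn_id="postgres")
    hook.copy_expert(
        sql="COPY cocktails FROM stdin WITH DELIMITER as ','",
        filename="/tmp/processed_cocktail.csv",
    )
```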
Step 5: Define Default Arguments and Initialize the DAG
Define the default_args dictionary to include common settings for all tasks. This helps manage parameters such as the owner, start date, retries, and retry delay.
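For example (the exact values are up to you):

```python
default_args = {
    "owner": "airflow",
    "start_date": datetime(2024, 1, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    # Optionally: "email": ["alerts@example.com"], "email_on_failure": True,
}
```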
Step 6: Create the DAG and Its Tasks
The DAG is defined using a context manager (with DAG(...) as dag:), which ensures that all tasks declared within the block are automatically associated with this DAG.
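A possible definition; the daily schedule and description are example choices, while the DAG ID matches the one used later in the testing section:

```python
with DAG(
    dag_id="cocktail_api_dag",
    description="Fetch a random cocktail from TheCocktailDB API and store it in Postgres",
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # All tasks below are declared inside this block
    ...
```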
a. Creating the PostgreSQL Table
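A PostgresOperator can create the target table if it does not already exist; the column list mirrors the fields kept by the processing function and is illustrative:

```python
    # Inside the `with DAG(...) as dag:` block
    create_table = PostgresOperator(
        task_id="create_table",
        postgres_conn_id="postgres",
        sql="""
            CREATE TABLE IF NOT EXISTS cocktails (
                idDrink TEXT NOT NULL,
                strDrink TEXT NOT NULL,
                strCategory TEXT NOT NULL,
                strAlcoholic TEXT NOT NULL,
                strGlass TEXT NOT NULL
            );
        """,
    )
```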
b. Checking API Availability
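An HttpSensor can poll the API through the cocktail_api connection before any data is pulled; the endpoint shown here is TheCocktailDB's random-cocktail endpoint and is an assumption for this example:

```python
    is_api_available = HttpSensor(
        task_id="is_api_available",
        http_conn_id="cocktail_api",
        endpoint="api/json/v1/1/random.php",
        poke_interval=30,   # seconds between checks
        timeout=300,        # give up after 5 minutes
    )
```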
c. Extracting Data from the API
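A SimpleHttpOperator can then call the same endpoint and push the parsed JSON response to XCom for the downstream processing task:

```python
    extract_cocktail = SimpleHttpOperator(
        task_id="extract_cocktail",
        http_conn_id="cocktail_api",
        endpoint="api/json/v1/1/random.php",
        method="GET",
        response_filter=lambda response: json.loads(response.text),
        log_response=True,
    )
```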
d. Processing the Extracted Data
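A PythonOperator wires in the _process_cocktail helper sketched in Step 4:

```python
    process_cocktail = PythonOperator(
        task_id="process_cocktail",
        python_callable=_process_cocktail,
    )
```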
e. Storing the Processed Data
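And another PythonOperator calls the _store_cocktail helper:

```python
    store_cocktail = PythonOperator(
        task_id="store_cocktail",
        python_callable=_store_cocktail,
    )
```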
f. Establishing Task Dependencies
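Finally, the bitshift operator chains the five example tasks in order:

```python
    create_table >> is_api_available >> extract_cocktail >> process_cocktail >> store_cocktail
```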
This chain guarantees that each step completes before the next begins.
Note: Before running the DAG, ensure that the following Airflow connections are properly configured:
Connection ID: cocktail_api; Type: HTTP; Host: https://www.thecocktaildb.com/
Connection ID: postgres; Type: Postgres; Host: postgres (the name of the Postgres service inside the Docker network); Login: airflow; Port: 5432
You can configure these connections in the Airflow UI under Admin > Connections.
Testing Your DAG Locally
Before deploying your DAG, it’s crucial to test it locally to ensure that all tasks execute correctly and dependencies are properly configured. Apache Airflow provides several ways to test and debug DAGs before running them in production.
Validate the DAG Syntax
Before running the DAG, check for syntax errors and misconfigurations by listing the available DAGs:
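For example (with the Docker setup you may need to run these inside one of the Airflow containers via docker compose exec; service names depend on your compose file):

```bash
airflow dags list                 # your DAG should appear in this list
airflow dags list-import-errors   # shows DAG files that failed to import
```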
Trigger Individual Tasks for Testing
You can test individual tasks separately to debug them before running the entire DAG. The following command allows you to run a specific task and view its output:
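For example, to test the table-creation task with an arbitrary logical date:

```bash
airflow tasks test cocktail_api_dag create_table 2024-01-01
```

Repeat the command for each of the other task IDs used in the DAG.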
After running these tasks and validating that they worked, run a query against the Postgres database to confirm that your table exists and has stored some cocktail data.
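For example, assuming the default service name and credentials from the official docker-compose.yaml (adjust to your setup):

```bash
docker compose exec postgres psql -U airflow -c "SELECT * FROM cocktails;"
```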
Run the Entire DAG Manually
To simulate an actual DAG execution, trigger it manually using:
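```bash
airflow dags trigger cocktail_api_dag
```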
Check the Task Logs in the Airflow UI
For a more interactive debugging experience, open the Airflow web UI (http://localhost:8080) and:
Navigate to DAGs and select cocktail_api_dag.
Click on Graph View or Tree View to visualize task dependencies.
Click on an individual task and go to the Logs tab to inspect execution details.
Disclaimer
It’s important to note that Apache Airflow is not a data processing framework but an orchestration tool designed to manage and schedule workflows. In this DAG, we performed some data processing within an Airflow task to create a complete, real-world example for learning purposes. However, in a production environment, data processing is typically handled by specialized frameworks such as Apache Spark outside of Airflow. Airflow’s primary role is to coordinate tasks and manage dependencies, not to perform heavy data transformations. This example was intentionally simplified to focus on DAG structure without introducing too many external components.
Conclusion
This detailed example demonstrates how to build a robust, real-world DAG in Apache Airflow. We started by creating helper functions for data processing and storage, then moved on to define our tasks using a mix of PostgreSQL operators, HTTP sensors/operators, and Python operators. By setting clear task dependencies, the DAG ensures a smooth, sequential execution of the workflow – from setting up the database table to ingesting data from an API and finally storing processed results.
With this example as a foundation, you can further customize your DAGs to suit various data workflows. Happy DAG-building and orchestrating your data pipelines!