Phase 17 — Apache Airflow — Data Pipeline Orchestration

Apache Airflow is the standard tool for scheduling and orchestrating data pipelines. Workflows are defined as Python DAGs (Directed Acyclic Graphs) — each node is a task, edges define execution order and dependencies.


When to Use Airflow vs Temporal vs n8n

Tool       Best For
n8n        Event-driven API automations, quick integrations
Temporal   Long-running business workflows, retries, sagas
Airflow    Scheduled data pipelines, ETL, batch jobs, ML workflows

On this cluster, Airflow primarily handles data and ML workloads — scheduled batch jobs, training pipelines, data ingestion — complementing Temporal and n8n.


Core Concepts

DAG → A workflow definition (Python file)
Task → A single unit of work (run SQL, call API, train model)
Operator → Pre-built task types (PythonOperator, BashOperator, KubernetesPodOperator)
Schedule → Cron expression or interval
XCom → Pass data between tasks
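
A minimal sketch tying these together (hypothetical DAG and task names; a fuller example follows below):

# Minimal sketch: one DAG, two tasks built from an Operator, a daily schedule,
# and an XCom handing a small value from the first task to the second.
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def produce(**context):
    context["ti"].xcom_push(key="row_count", value=42)   # XCom: push a small value

def consume(**context):
    count = context["ti"].xcom_pull(task_ids="produce", key="row_count")
    print(f"upstream produced {count} rows")

with DAG(
    "concepts_demo",
    schedule_interval="@daily",          # Schedule
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:
    t1 = PythonOperator(task_id="produce", python_callable=produce)   # Task via an Operator
    t2 = PythonOperator(task_id="consume", python_callable=consume)
    t1 >> t2                                                          # dependency edge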

Deploy Airflow via Helm

helm repo add apache-airflow https://airflow.apache.org

helm install airflow apache-airflow/airflow \
--namespace airflow \
--create-namespace \
--set executor=KubernetesExecutor \
--set dags.gitSync.enabled=true \
--set dags.gitSync.repo="https://gitlab.yourdomain.com/platform/airflow-dags.git" \
--set dags.gitSync.branch=main \
--set data.metadataConnection.host=postgres-svc \
--set data.metadataConnection.db=airflow \
--set redis.enabled=false \
--set postgresql.enabled=false

KubernetesExecutor runs each task as a separate pod — scales to cluster capacity, no idle workers.
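
Because every task gets its own pod, an individual task can also request its own resources. A minimal sketch, assuming a hypothetical demo DAG; in Airflow 2.x the pod_override key in executor_config is how per-task pod tweaks are expressed under KubernetesExecutor:

# Sketch (hypothetical DAG/task names): override one task's pod spec to request more resources.
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from kubernetes.client import models as k8s

with DAG("executor_config_demo", schedule_interval=None, start_date=datetime(2026, 1, 1)) as dag:
    heavy_task = PythonOperator(
        task_id="heavy_task",
        python_callable=lambda: None,  # placeholder callable
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",  # "base" targets the main task container
                            resources=k8s.V1ResourceRequirements(
                                requests={"cpu": "2", "memory": "4Gi"},
                            ),
                        )
                    ]
                )
            )
        },
    )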

gitSync pulls DAGs directly from your GitLab repo — push a DAG, it appears in Airflow automatically.


Access Airflow UI

kubectl port-forward svc/airflow-webserver -n airflow 8080:8080

Open: http://localhost:8080

Default credentials: admin / admin


Example DAG — Nightly Data Ingestion

# dags/nightly_ingest.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


def extract_data(**context):
    import requests  # imported inside the task to keep DAG parsing light

    data = requests.get("https://api.example.com/data").json()
    # XCom goes through the metadata DB, so keep payloads small
    context["ti"].xcom_push(key="raw_data", value=data)
    return len(data)


def transform_data(**context):
    raw = context["ti"].xcom_pull(task_ids="extract", key="raw_data")
    transformed = [
        {"id": r["id"], "value": r["value"] * 1.2}
        for r in raw
        if r["value"] > 0
    ]
    context["ti"].xcom_push(key="transformed", value=transformed)


def load_data(**context):
    rows = context["ti"].xcom_pull(task_ids="transform", key="transformed")
    hook = PostgresHook(postgres_conn_id="postgres_cluster")
    hook.insert_rows(
        table="data_table",
        rows=[(r["id"], r["value"]) for r in rows],
        target_fields=["id", "value"],
    )


with DAG(
    "nightly_ingest",
    schedule_interval="0 2 * * *",  # 02:00 UTC every night
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
) as dag:

    extract = PythonOperator(
        task_id="extract",
        python_callable=extract_data,
    )

    transform = PythonOperator(
        task_id="transform",
        python_callable=transform_data,
    )

    load = PythonOperator(
        task_id="load",
        python_callable=load_data,
    )

    extract >> transform >> load
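
To exercise a DAG like this before pushing it, Airflow 2.5+ lets you run it in-process without a scheduler via the DAG's test() method; a minimal sketch, appended to the bottom of the same file:

# Optional local debugging (Airflow 2.5+): run the whole DAG in-process,
# e.g. `python dags/nightly_ingest.py`
if __name__ == "__main__":
    dag.test()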

Example DAG — ML Training Pipeline

# dags/ml_training.py
from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

with DAG(
    "ml_training",
    schedule_interval="@weekly",
    start_date=datetime(2026, 1, 1),
) as dag:

    prepare_data = KubernetesPodOperator(
        task_id="prepare_data",
        name="data-prep",
        namespace="airflow",
        image="harbor.local/ml/data-prep:latest",
        cmds=["python", "prepare.py"],
    )

    train_model = KubernetesPodOperator(
        task_id="train_model",
        name="training",
        namespace="airflow",
        image="harbor.local/ml/trainer:latest",
        cmds=["python", "train.py", "--epochs", "50"],
        container_resources=k8s.V1ResourceRequirements(
            requests={"cpu": "6", "memory": "12Gi"},
        ),
    )

    register_model = KubernetesPodOperator(
        task_id="register_model",
        name="mlflow-register",
        namespace="airflow",
        image="harbor.local/ml/trainer:latest",
        cmds=["python", "register.py"],
        env_vars={"MLFLOW_TRACKING_URI": "http://mlflow.mlflow.svc:5000"},
    )

    prepare_data >> train_model >> register_model

Each step runs as its own Kubernetes pod, so the training step can request as much CPU and memory as the cluster can provide.
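
If a pod-based step needs to hand a small result (for example an MLflow run ID) to the next task, KubernetesPodOperator can publish it as an XCom. A minimal sketch, reusing the trainer image from the example above and assuming train.py writes its result to the XCom file path:

# Sketch (hypothetical names): with do_xcom_push=True the operator reads
# /airflow/xcom/return.json from the finished pod and stores it as the task's XCom.
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

with DAG("pod_xcom_demo", schedule_interval=None, start_date=datetime(2026, 1, 1)) as dag:

    train = KubernetesPodOperator(
        task_id="train",
        name="training",
        namespace="airflow",
        image="harbor.local/ml/trainer:latest",
        cmds=["python", "train.py", "--epochs", "50"],
        do_xcom_push=True,  # train.py must write JSON to /airflow/xcom/return.json
    )

    def report(**context):
        # Pulls whatever train.py wrote to the XCom file, e.g. {"run_id": "..."}
        result = context["ti"].xcom_pull(task_ids="train")
        print(result)

    report_result = PythonOperator(task_id="report_result", python_callable=report)

    train >> report_result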


GitSync — Automatic DAG Deployment

You write a new DAG → git push to GitLab

Airflow gitSync pulls every 60s

New DAG appears in Airflow UI automatically

No restart, no manual upload
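
Because DAGs go live straight from git, it is worth catching import errors in CI before they reach the scheduler. A minimal pytest sketch, assuming the repo keeps DAG files under a dags/ folder:

# tests/test_dag_imports.py
from airflow.models import DagBag

def test_dags_import_cleanly():
    # Parses every DAG file the same way the scheduler would and collects errors.
    dag_bag = DagBag(dag_folder="dags/", include_examples=False)
    assert dag_bag.import_errors == {}, f"DAG import errors: {dag_bag.import_errors}"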

Done When

✔ Airflow pods Running with KubernetesExecutor
✔ GitSync pulling from GitLab DAG repo
✔ First DAG visible in UI and triggerable
✔ KubernetesPodOperator launching pods successfully
✔ ML training DAG running on cluster resources