Tutorial

Airflow 3 on AKS: KubernetesExecutor, GitSync DAGs, Zero Bundled Dependencies

Warble Cloud · 9 min read
Tags: Airflow 3.1 · KubernetesExecutor · GitSync · KubeRay · MLflow · Azure Blob logs · Shared PostgreSQL

Airflow 2.x on FRAKMA worked, but it carried three dependencies we didn't want: a bundled PostgreSQL pod, Redis for the Celery executor, and Helm hooks that blocked GitOps-style upgrades. Airflow 3 fixed most of the scheduling internals, so we took the opportunity to clean up the deployment model at the same time.

What changed in our setup

Concern            | Airflow 2.9                        | Airflow 3.1 (current)
-------------------|------------------------------------|-------------------------------------
Metadata DB        | Bundled Bitnami PostgreSQL pod     | Shared postgresql.warble-system
Executor           | CeleryExecutor + Redis             | KubernetesExecutor (no Redis)
Helm hooks         | DB migrate + create-user as hooks  | Hooks disabled, manual post-install
Log storage        | Local pod ephemeral storage        | Azure Blob WASB remote logs
Scheduler replicas | 2 (active-active, experimental)    | 1 (stable, cost-efficient)

Disabling bundled dependencies

# platform/airflow/values.yaml
postgresql:
  enabled: false    # use shared warble-system PostgreSQL

redis:
  enabled: false    # KubernetesExecutor needs no queue broker

data:
  metadataSecretName: airflow-postgres-secret
  resultBackendSecretName: airflow-broker-url

executor: KubernetesExecutor

Each secret holds the full connection string. Create both before running the Helm install:

kubectl create secret generic airflow-postgres-secret -n airflow \
  --from-literal=connection="postgresql+psycopg2://airflow:pass@postgresql.warble-system:5432/airflow"

kubectl create secret generic airflow-broker-url -n airflow \
  --from-literal=connection="db+postgresql+psycopg2://airflow:pass@postgresql.warble-system:5432/airflow"

Disabling Helm hooks for GitOps compatibility

Helm hooks (helm.sh/hook: post-install) run outside of normal reconciliation and don't play well with Argo CD's sync model. With hooks enabled, every Argo CD sync attempts to re-run the DB migration job. We disable them and run migrations manually once:

createUserJob:
  useHelmHooks: false
  applyCustomEnv: false

migrateDBJob:
  useHelmHooks: false
  applyCustomEnv: false

webserver:
  defaultUser:
    enabled: false    # create admin user manually after install

# One-time post-install: run DB migration + create admin
kubectl exec -n airflow deploy/airflow-scheduler -- airflow db migrate
kubectl exec -n airflow deploy/airflow-webserver -- airflow users create \
  --username admin --password admin --firstname Admin --lastname User \
  --role Admin --email admin@frakma.io

Why this matters for Argo CD

With useHelmHooks: false, Argo CD treats the Airflow chart like any other Helm release — no hook Jobs to manage, no sync errors on subsequent deploys. The DB migration job only needs to run on first install and version upgrades, not on every reconcile.
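
For reference, the Application that syncs the chart is unremarkable once hooks are gone. A minimal sketch, assuming the chart lives at platform/airflow in this repo (the repoURL and file path are illustrative):

# platform/argocd/airflow-app.yaml (illustrative path)
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: airflow
  namespace: argocd
spec:
  project: default
  source:
    repoURL: https://github.com/warble/platform    # illustrative
    targetRevision: main
    path: platform/airflow
    helm:
      valueFiles:
        - values.yaml
  destination:
    server: https://kubernetes.default.svc
    namespace: airflow
  syncPolicy:
    automated:
      prune: true
      selfHeal: true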

Remote logs on Azure Blob

Task logs stored on the pod's ephemeral filesystem vanish when the KubernetesExecutor task pod terminates. We configure Airflow to upload logs to Azure Blob Storage as each task completes:

config:
  logging:
    remote_logging: "True"
    remote_base_log_folder: "wasb://airflow-logs@warbleosstate.blob.core.windows.net/"
    remote_log_conn_id: azure_blob_storage

# Create the Airflow connection for blob storage
kubectl exec -n airflow deploy/airflow-webserver -- airflow connections add \
  azure_blob_storage \
  --conn-type wasb \
  --conn-login warbleosstate \
  --conn-password <storage-access-key>
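
To confirm logs are landing, trigger any task and list the container. A quick check with the Azure CLI, using the same access key as the connection:

az storage blob list \
  --account-name warbleosstate \
  --container-name airflow-logs \
  --account-key <storage-access-key> \
  --num-results 5 --output table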

Writing a KubeRay drift-detection DAG

GitSync pulls DAG files from the dags/ directory of this repo every 60 seconds; no Airflow restarts needed.
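The chart-side wiring is a small dags.gitSync block. A minimal sketch, assuming the official chart's git-sync fields (the repo URL is illustrative):

dags:
  gitSync:
    enabled: true
    repo: https://github.com/warble/platform    # illustrative
    branch: main
    subPath: dags       # sync only the DAG directory
    period: 60s         # matches the 60-second pull interval above

Here's a minimal drift-detection DAG that submits a KubeRay job when drift is detected: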

# dags/detect_and_retrain.py
from airflow.decorators import dag, task
from datetime import datetime  # airflow.utils.dates.days_ago was removed in Airflow 3
from kubernetes import client, config

@dag(schedule="0 6 * * *", start_date=datetime(2025, 1, 1), catchup=False,
     tags=["llmops", "drift"])
def detect_and_retrain():

    @task
    def compute_drift() -> float:
        # Query Qdrant for last-24h embeddings, compare to baseline
        from qdrant_client import QdrantClient
        qc = QdrantClient("qdrant.warble-system.svc.cluster.local")
        vectors = qc.scroll("query_embeddings", limit=500)[0]
        return compute_cosine_drift(vectors)  # project helper (not shown)

    @task.branch
    def gate(score: float) -> str:
        return "submit_rayjob" if score > 0.15 else "skip"

    @task
    def submit_rayjob():
        config.load_incluster_config()
        batch = client.CustomObjectsApi()
        batch.create_namespaced_custom_object(
            group="ray.io", version="v1", plural="rayjobs",
            namespace="kuberay", body=rayjob_manifest()
        )

    @task
    def skip():
        pass

    score = compute_drift()
    gate(score) >> [submit_rayjob(), skip()]

detect_and_retrain()
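
rayjob_manifest() builds the RayJob custom resource as a plain dict. A minimal sketch, assuming the KubeRay v1 API (the image and entrypoint are illustrative):

def rayjob_manifest() -> dict:
    # Illustrative RayJob body; the real entrypoint and image live in the project repo
    return {
        "apiVersion": "ray.io/v1",
        "kind": "RayJob",
        "metadata": {"generateName": "retrain-"},
        "spec": {
            "entrypoint": "python retrain.py",   # illustrative
            "shutdownAfterJobFinishes": True,    # tear the Ray cluster down afterwards
            "rayClusterSpec": {
                "headGroupSpec": {
                    "rayStartParams": {},
                    "template": {"spec": {"containers": [{
                        "name": "ray-head",
                        "image": "rayproject/ray:2.9.0",   # illustrative
                    }]}},
                },
            },
        },
    }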

KubernetesExecutor: what it means in practice

With KubernetesExecutor, every task runs in its own Kubernetes pod: the pod is created when the task is queued, runs the task, and is deleted when it finishes. There is no standing worker pool to size or maintain.
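
One practical consequence: per-task resources become a pod spec rather than a worker-queue decision. A sketch using the stock executor_config / pod_override mechanism (the resource numbers are illustrative):

from airflow.decorators import task
from kubernetes.client import models as k8s

@task(executor_config={
    "pod_override": k8s.V1Pod(spec=k8s.V1PodSpec(containers=[
        k8s.V1Container(
            name="base",    # "base" is the container Airflow runs the task in
            resources=k8s.V1ResourceRequirements(
                requests={"cpu": "500m", "memory": "1Gi"},   # illustrative
                limits={"memory": "2Gi"},
            ),
        ),
    ]))
})
def heavy_task():
    ...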
