Airflow 2.x on FRAKMA worked, but it carried three dependencies we didn't want: a bundled PostgreSQL pod, Redis for the Celery executor, and Helm hooks that blocked GitOps-style upgrades. Airflow 3 fixed most of the scheduling internals, so we took the opportunity to clean up the deployment model at the same time.
## What changed in our setup
| Concern | Airflow 2.9 | Airflow 3.1 (current) |
|---|---|---|
| Metadata DB | Bundled Bitnami PostgreSQL pod | Shared `postgresql.warble-system` |
| Executor | `CeleryExecutor` + Redis | `KubernetesExecutor` (no Redis) |
| Helm hooks | DB migrate + create-user as hooks | Hooks disabled, manual post-install |
| Log storage | Local pod ephemeral storage | Azure Blob (WASB) remote logs |
| Scheduler replicas | 2 (active-active, experimental) | 1 (stable, cost-efficient) |
## Disabling bundled dependencies
```yaml
# platform/airflow/values.yaml
postgresql:
  enabled: false   # use shared warble-system PostgreSQL
redis:
  enabled: false   # KubernetesExecutor needs no queue broker

data:
  metadataSecretName: airflow-postgres-secret
  resultBackendSecretName: airflow-broker-url

executor: KubernetesExecutor
```
Each secret holds the full connection string. Create both before installing the chart:
```bash
kubectl create secret generic airflow-postgres-secret -n airflow \
  --from-literal=connection="postgresql+psycopg2://airflow:pass@postgresql.warble-system:5432/airflow"

kubectl create secret generic airflow-broker-url -n airflow \
  --from-literal=connection="db+postgresql+psycopg2://airflow:pass@postgresql.warble-system:5432/airflow"
```
## Disabling Helm hooks for GitOps compatibility
Helm hooks (`helm.sh/hook: post-install`) run outside of normal reconciliation and don't play well with Argo CD's sync model. With hooks enabled, every Argo CD sync attempts to re-run the DB migration job. We disable them and run the migration manually once:
```yaml
createUserJob:
  useHelmHooks: false
  applyCustomEnv: false
migrateDatabaseJob:
  useHelmHooks: false
  applyCustomEnv: false
webserver:
  defaultUser:
    enabled: false   # create the admin user manually after install
```
```bash
# One-time post-install: run DB migration + create admin
kubectl exec -n airflow deploy/airflow-scheduler -- airflow db migrate
kubectl exec -n airflow deploy/airflow-webserver -- airflow users create \
  --username admin --password admin --firstname Admin --lastname User \
  --role Admin --email admin@frakma.io
```
With `useHelmHooks: false`, Argo CD treats the Airflow chart like any other Helm release: no hook Jobs to manage, no sync errors on subsequent deploys. The DB migration job only needs to run on first install and on version upgrades, not on every reconcile.
## Remote logs on Azure Blob
Task logs written to the pod's ephemeral filesystem vanish when the KubernetesExecutor task pod terminates. We configure Airflow to upload each task's logs to Azure Blob Storage as soon as the task finishes:
```yaml
config:
  logging:
    remote_logging: "True"
    remote_base_log_folder: "wasb://airflow-logs@warbleosstate.blob.core.windows.net/"
    remote_log_conn_id: azure_blob_storage
```
```bash
# Create the Airflow connection for blob storage
kubectl exec -n airflow deploy/airflow-webserver -- airflow connections add \
  azure_blob_storage \
  --conn-type wasb \
  --conn-login warbleosstate \
  --conn-password <storage-access-key>
```
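To confirm logs are actually landing, list the container after a task run. A minimal sketch using the `azure-storage-blob` SDK; the account and container come from the config above, the key placeholder matches the connection command, and the `dag_id=` prefix assumes Airflow's default remote log path layout:

```python
# check_remote_logs.py: verify task logs are arriving in the container (sketch)
from azure.storage.blob import ContainerClient

container = ContainerClient(
    account_url="https://warbleosstate.blob.core.windows.net",
    container_name="airflow-logs",
    credential="<storage-access-key>",  # same key as the Airflow connection
)

# Assumes the default log layout: dag_id=<dag>/run_id=<run>/task_id=<task>/...
for blob in container.list_blobs(name_starts_with="dag_id=detect_and_retrain"):
    print(blob.name, blob.size)
```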
## Writing a KubeRay drift-detection DAG
GitSync pulls DAG files from the `dags/` directory of this repo every 60 seconds, so no Airflow restarts are needed. Here's a minimal drift-detection DAG that submits a KubeRay job when drift crosses a threshold; the two helpers it calls are sketched after the listing:
```python
# dags/detect_and_retrain.py
import pendulum

from airflow.decorators import dag, task
from kubernetes import client, config


@dag(
    schedule="0 6 * * *",
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),  # days_ago() was removed in Airflow 3
    catchup=False,
    tags=["llmops", "drift"],
)
def detect_and_retrain():
    @task
    def compute_drift() -> float:
        # Query Qdrant for last-24h embeddings, compare to baseline
        from qdrant_client import QdrantClient

        qdrant = QdrantClient("qdrant.warble-system.svc.cluster.local")
        points, _ = qdrant.scroll("query_embeddings", limit=500, with_vectors=True)
        return compute_cosine_drift(points)

    @task.branch
    def gate(score: float) -> str:
        return "submit_rayjob" if score > 0.15 else "skip"

    @task
    def submit_rayjob():
        # Create a RayJob custom resource; the KubeRay operator picks it up
        config.load_incluster_config()
        api = client.CustomObjectsApi()
        api.create_namespaced_custom_object(
            group="ray.io", version="v1", plural="rayjobs",
            namespace="kuberay", body=rayjob_manifest(),
        )

    @task
    def skip():
        pass

    score = compute_drift()
    gate(score) >> [submit_rayjob(), skip()]


detect_and_retrain()
```
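The DAG calls two helpers that live alongside it. The sketches below are illustrative, not our production code: `compute_cosine_drift` assumes a baseline centroid saved to a placeholder path by an earlier run, and `rayjob_manifest` returns a minimal RayJob body whose image and entrypoint are placeholders.

```python
# Helper sketches for dags/detect_and_retrain.py; the baseline path, image,
# and entrypoint are illustrative placeholders, not production values.
import numpy as np


def compute_cosine_drift(points) -> float:
    # Mean cosine distance of recent embeddings from a stored baseline centroid.
    baseline = np.load("/opt/airflow/dags/baseline_centroid.npy")  # placeholder path
    vectors = np.array([p.vector for p in points])
    sims = (vectors @ baseline) / (
        np.linalg.norm(vectors, axis=1) * np.linalg.norm(baseline)
    )
    return float(1.0 - sims.mean())


def rayjob_manifest() -> dict:
    # Minimal RayJob body; KubeRay tears the cluster down after the job finishes.
    return {
        "apiVersion": "ray.io/v1",
        "kind": "RayJob",
        "metadata": {"generateName": "retrain-", "namespace": "kuberay"},
        "spec": {
            "entrypoint": "python retrain.py",  # placeholder entrypoint
            "shutdownAfterJobFinishes": True,
            "rayClusterSpec": {
                "headGroupSpec": {
                    "rayStartParams": {},
                    "template": {
                        "spec": {
                            "containers": [
                                {"name": "ray-head", "image": "rayproject/ray:2.9.0"}
                            ]
                        }
                    },
                }
            },
        },
    }
```

A quick smoke test without waiting for the 06:00 schedule: `airflow dags test detect_and_retrain`.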
## KubernetesExecutor: what it means in practice
With KubernetesExecutor, every task runs in its own Kubernetes pod: created when the task is scheduled, deleted when it finishes. There is no worker pool to size or maintain.
- Isolation: a crashing task kills its pod, not a shared worker process.
- Resource requests per task: GPU-intensive tasks get GPU node affinity; lightweight tasks run on the system pool (see the sketch after this list).
- No queue: the scheduler submits pod specs directly to the Kubernetes API. Redis is not involved.
- Cost: you pay only for the pods that are actively running tasks, not for idle workers waiting for work.
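Per-task pod sizing goes through `executor_config` with a `pod_override`. A minimal sketch, assuming a GPU node pool labeled `agentpool: gpu` (the label is an assumption, not a value from our cluster config; the container must be named `base`, since that's the container the executor runs the task in):

```python
# Sketch: routing a heavy task to the GPU pool with a pod_override.
from airflow.decorators import task
from kubernetes.client import models as k8s

gpu_pod = k8s.V1Pod(
    spec=k8s.V1PodSpec(
        node_selector={"agentpool": "gpu"},  # assumed GPU pool label
        containers=[
            k8s.V1Container(
                name="base",  # KubernetesExecutor runs the task in "base"
                resources=k8s.V1ResourceRequirements(
                    requests={"cpu": "2", "memory": "8Gi"},
                    limits={"nvidia.com/gpu": "1"},
                ),
            )
        ],
    )
)


@task(executor_config={"pod_override": gpu_pod})
def gpu_heavy_task():
    ...  # lands on the GPU pool; tasks without an override use the defaults
```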