#007 5 Anti-Patterns with Airflow
FORMAT used in this article: Incorrect code → Fix code → Example context → Worst-case → Why bad → Ops fix checklist (guardrails).
1) Monolithic DAGs
❌ Incorrect (what not to do)
# monolithic_dag.py
with DAG("daily_etl", schedule_interval="@daily", default_args=default_args) as dag:
    load_customers = PythonOperator(task_id="load_customers", python_callable=load_customers_func)
    load_orders = PythonOperator(task_id="load_orders", python_callable=load_orders_func)
    load_payments = PythonOperator(task_id="load_payments", python_callable=load_payments_func)
    # ... imagine 200+ tasks ...
    build_marts = PythonOperator(task_id="build_marts", python_callable=build_marts_func)

    [load_customers, load_orders, load_payments] >> build_marts
✅ Fix (do this instead)
# mart_customers.py
with DAG("mart_customers", schedule_interval="@daily", default_args=default_args) as dag:
with TaskGroup("staging") as staging:
load_customers = PythonOperator(task_id="load_customers", python_callable=load_customers_func)
load_orders = PythonOperator(task_id="load_orders", python_callable=load_orders_func)
with TaskGroup("warehouse") as warehouse:
build_dim_customer = PythonOperator(task_id="build_dim_customer", python_callable=build_dim_customer_func)
staging >> warehouse
Example context: A single daily_etl DAG builds all staging, dims, facts, and marts.
Worst-case: Scheduler parse time exceeds the schedule interval → missed runs; webserver timeouts; a tiny code change triggers parse storms.
Why bad: Airflow’s parser and scheduler scale with DAG size; large graphs slow parsing, scheduling, and UI rendering.
Ops fix checklist: Break into domain DAGs; use TaskGroups and DAG factories (sketch below); enable DAG serialization; keep tasks per DAG roughly under 50–100; add SLAs + alerts.
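For the DAG-factory item, here is a minimal sketch of the pattern, assuming each domain is described by a small config dict. The mart names, placeholder callable, and default_args below are illustrative, not production code:

# dag_factory.py — illustrative DAG-factory sketch
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

default_args = {"owner": "data-eng", "retries": 2, "retry_delay": timedelta(minutes=5)}

def _noop(**_):
    # stands in for the real load_*/build_* callables
    pass

# hypothetical per-domain config: mart name -> staging task ids
MARTS = {
    "customers": ["load_customers", "load_orders"],
    "payments": ["load_payments"],
}

def build_mart_dag(mart, staging_task_ids):
    with DAG(
        dag_id=f"mart_{mart}",
        schedule_interval="@daily",
        start_date=datetime(2024, 1, 1),
        catchup=False,
        default_args=default_args,
    ) as dag:
        with TaskGroup("staging") as staging:
            for task_id in staging_task_ids:
                PythonOperator(task_id=task_id, python_callable=_noop)
        build = PythonOperator(task_id=f"build_dim_{mart}", python_callable=_noop)
        staging >> build
    return dag

# register one small DAG per domain instead of a single 200-task monolith
for mart, staging_task_ids in MARTS.items():
    globals()[f"mart_{mart}"] = build_mart_dag(mart, staging_task_ids)

Each generated DAG stays small enough to parse quickly, and adding a domain becomes a config change rather than another block in the monolith.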
2) Heavy Compute Inside Operators
❌ Incorrect
def train_model():
    import xgboost as xgb
    dtrain = xgb.DMatrix("train.csv")
    params = {"max_depth": 10, "eta": 0.1}
    bst = xgb.train(params, dtrain, num_boost_round=1000)  # hours on worker
    bst.save_model("model.json")

train_task = PythonOperator(task_id="train_model", python_callable=train_model, dag=dag)
✅ Fix
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
train_task = DatabricksSubmitRunOperator(
    task_id="train_model",
    json={
        "new_cluster": {"spark_version": "11.3.x-scala2.12", "num_workers": 4},
        "spark_python_task": {"python_file": "dbfs:/ml/train.py", "parameters": ["--date", "{{ ds }}"]},
    },
    dag=dag,
)
Example context: A 4-hour XGBoost/ETL job runs inside a PythonOperator, blocking worker slots.
Worst-case: Worker starvation → other DAGs miss SLAs; OOM → retry storms; cluster instability cascades.
Why bad: Airflow workers aren’t a compute grid; long CPU/RAM-heavy jobs defeat concurrency and retries.
Ops fix checklist: Offload to Spark/Databricks/EMR/BigQuery; set execution_timeout and retries with backoff; use queues/pools to isolate workloads; pass only parameters (sketch below).
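The timeout/retry/pool guardrails from the checklist can sit directly on the submit task. A hedged sketch reusing the Databricks job spec above; the "heavy_compute" pool and "remote_submit" queue are hypothetical names you would create yourself:

from datetime import timedelta
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

# assumes the surrounding `dag` from the fix above
train_task = DatabricksSubmitRunOperator(
    task_id="train_model",
    json={
        "new_cluster": {"spark_version": "11.3.x-scala2.12", "num_workers": 4},
        "spark_python_task": {"python_file": "dbfs:/ml/train.py", "parameters": ["--date", "{{ ds }}"]},
    },
    execution_timeout=timedelta(hours=2),   # fail the task if the remote run hangs
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,         # space out retries instead of hammering
    pool="heavy_compute",                   # cap how many heavy submissions run at once
    queue="remote_submit",                  # route to a dedicated worker queue (Celery executor)
    dag=dag,
)

Because the operator only submits and polls the remote run, a small pool is enough to limit how many heavy jobs are in flight at once.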
3) Hard-coding Config & Secrets
❌ Incorrect
def upload_to_s3():
    import boto3
    client = boto3.client(
        "s3",
        aws_access_key_id="AKIA123...",
        aws_secret_access_key="abcdEFGH...",
    )
    client.upload_file("/tmp/file.csv", "prod-bucket", "file.csv")

upload_task = PythonOperator(task_id="upload", python_callable=upload_to_s3, dag=dag)
✅ Fix
def upload_to_s3():
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
    hook = S3Hook(aws_conn_id="aws_default")  # via Secrets Backend
    hook.load_file("/tmp/file.csv", key="file.csv", bucket_name="my-data-bucket")

upload_task = PythonOperator(task_id="upload", python_callable=upload_to_s3, dag=dag)
Example context: Credentials and prod bucket names embedded in DAG code.
Worst-case: Secret leak via Git; audit failure; cross-env deploys break; forced rotations and outages.
Why bad: Violates least privilege, prevents environment parity, and complicates rotations.
Ops fix checklist: Use Connections/Variables + Secrets Backend (Vault/AWS SM/GCP SM); per-env conn IDs; restrict IAM; lint CI to block secrets; rotate regularly.
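As a sketch of the Secrets Backend item, the AWS Secrets Manager backend is switched on through Airflow configuration. The prefixes below are illustrative, and in practice this lives in airflow.cfg ([secrets] section) or the deployment’s environment rather than in Python:

# Normally set under [secrets] in airflow.cfg or as deployment env vars;
# written as Python here only for illustration.
import json
import os

os.environ["AIRFLOW__SECRETS__BACKEND"] = (
    "airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend"
)
os.environ["AIRFLOW__SECRETS__BACKEND_KWARGS"] = json.dumps(
    {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}
)

# With this in place, S3Hook(aws_conn_id="aws_default") resolves its credentials from
# AWS Secrets Manager under airflow/connections/aws_default — no keys in DAG code, and
# per-env conn IDs (e.g. aws_dev / aws_prod) simply point at different secrets.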
4) Overusing XCom for Large Data
❌ Incorrect
def produce_data(**kwargs):
    import pandas as pd
    df = pd.read_csv("customers.csv")
    kwargs["ti"].xcom_push(key="data", value=df.to_json())  # 10s–100s MB!

produce_task = PythonOperator(task_id="produce", python_callable=produce_data, provide_context=True, dag=dag)
✅ Fix
def produce_data(**kwargs):
    import pandas as pd
    df = pd.read_csv("customers.csv")
    # write the payload to object storage (needs s3fs on the worker); only the URI goes through XCom
    path = f"s3://my-bucket/customers/{kwargs['ds']}/customers.parquet"
    df.to_parquet(path)
    kwargs["ti"].xcom_push(key="data_path", value=path)

produce_task = PythonOperator(task_id="produce", python_callable=produce_data, dag=dag)  # context is passed automatically in Airflow 2
Example context: Passing entire DataFrames/files through XCom across tasks.
Worst-case: Metadata DB bloat; scheduler and UI slow/crash; backfills time out; DB vacuum/maintenance emergencies.
Why bad: XCom is metadata, not a data lake; large blobs hammer the metastore.
Ops fix checklist: Put payloads in S3/GCS/HDFS and pass only URIs/IDs through XCom; use a custom XCom backend that offloads large values to object storage (sketch below); tune metadata-DB autovacuum; monitor XCom size.
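One way to enforce the URIs-only rule is a custom XCom backend that transparently offloads anything large to object storage. A minimal sketch, assuming JSON-serializable values; the bucket name, size threshold, and module path are hypothetical:

import json
import uuid

from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

BUCKET = "my-xcom-bucket"        # hypothetical offload bucket
THRESHOLD_BYTES = 64 * 1024      # anything bigger goes to S3, not the metadata DB

class S3XComBackend(BaseXCom):
    @staticmethod
    def serialize_value(value, **kwargs):
        raw = json.dumps(value).encode()
        if len(raw) > THRESHOLD_BYTES:
            key = f"xcom/{uuid.uuid4()}.json"
            S3Hook(aws_conn_id="aws_default").load_bytes(raw, key=key, bucket_name=BUCKET)
            value = f"s3://{BUCKET}/{key}"   # only the URI is stored in the DB
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result):
        value = BaseXCom.deserialize_value(result)
        if isinstance(value, str) and value.startswith(f"s3://{BUCKET}/"):
            key = value.replace(f"s3://{BUCKET}/", "", 1)
            raw = S3Hook(aws_conn_id="aws_default").read_key(key=key, bucket_name=BUCKET)
            value = json.loads(raw)
        return value

Point xcom_backend in the [core] config at this class (e.g. AIRFLOW__CORE__XCOM_BACKEND=plugins.s3_xcom_backend.S3XComBackend, path illustrative) and oversized values never touch the metadata DB.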
5) Ignoring Proper Dependencies (sleep/poll hacks)
❌ Incorrect
def wait_for_file():
    import time
    time.sleep(900)  # hope it arrives in 15 min
    return True

wait_task = PythonOperator(task_id="wait_for_file", python_callable=wait_for_file, dag=dag)
✅ Fix
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
wait_task = S3KeySensor(
task_id="wait_for_file",
bucket_key="incoming/{{ ds }}/file.csv",
bucket_name="my-data-bucket",
aws_conn_id="aws_default",
poke_interval=60,
timeout=3600,
mode="reschedule", # or deferrable variant
dag=dag,
)
Example context: Upstream data arrives “around” 1am; DAG sleeps then proceeds blindly.
Worst-case: Reads partial files; race conditions corrupt facts; silent data drift; downstream dashboards wrong.
Why bad: Time-based waits don’t encode real dependencies; wastes worker slots.
Ops fix checklist: Use Sensors/Deferrable Operators, ExternalTaskSensor, event triggers; explicit cross-DAG deps; add data quality checks (GE/dbt tests) to fail fast.
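For the explicit cross-DAG dependency item, a hedged sketch using ExternalTaskSensor; the DAG and task ids are hypothetical, and both DAGs are assumed to run on the same schedule (otherwise set execution_delta or execution_date_fn):

from airflow.sensors.external_task import ExternalTaskSensor

# assumes the surrounding `dag`; waits for the upstream daily_etl DAG to finish its final task
wait_for_staging = ExternalTaskSensor(
    task_id="wait_for_staging",
    external_dag_id="daily_etl",        # upstream DAG id
    external_task_id="build_marts",     # specific upstream task to wait on
    mode="reschedule",                  # free the worker slot between pokes
    poke_interval=120,
    timeout=2 * 60 * 60,
    dag=dag,
)

Pair this with a data quality task (e.g. a dbt test or Great Expectations suite) immediately downstream so bad data fails fast instead of flowing into marts.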
Quick global guardrails (apply to all 5)
Airflow = orchestrator, not compute or storage.
Add SLAs, alerts, retries with backoff, and idempotent tasks (combined sketch after this list).
Use pools/queues to prevent noisy-neighbor issues.
Keep DAGs modular, parameterized, and environment-agnostic.
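A minimal sketch pulling those guardrails together at the DAG level; the owner, SLA, callback, and pool names are placeholder values, not a prescribed setup:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

def notify_on_failure(context):
    # wire this to Slack/PagerDuty/email in a real deployment
    print(f"Task failed: {context['task_instance'].task_id}")

default_args = {
    "owner": "data-eng",                        # placeholder
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "sla": timedelta(hours=2),                  # alert if a task runs past its SLA
    "on_failure_callback": notify_on_failure,   # page someone instead of failing silently
    "pool": "etl_pool",                         # create in Admin → Pools to cap concurrency
}

with DAG(
    "mart_customers",
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args=default_args,
) as dag:
    # idempotent by design: re-running for the same {{ ds }} overwrites the same partition
    rebuild = PythonOperator(task_id="rebuild_partition", python_callable=lambda **_: None)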

