Chapter 9: Workflow Orchestration with Airflow and MWAA

Learning Objectives

Imagine a railroad dispatcher in a busy switching yard. Trains arrive on different tracks at different times, some carrying passengers, others freight, others empty cars headed back to the depot. The dispatcher does not drive any train; instead, they decide which track is clear, which signals turn green, and what order trains depart so nothing collides and everything reaches its destination on time. Workflow orchestrators play exactly this role for data pipelines: they do not move bytes themselves, but they decide when and in what order every extractor, transformer, and loader runs.

Chapter Architecture Overview

flowchart LR
    DAGs[(DAG Folder<br/>Python files)] -->|parse every 30s| Sched[Scheduler]
    Sched -->|reads/writes state| MetaDB[(Metadata DB<br/>Postgres)]
    Sched -->|enqueues task| Exec{Executor}
    Exec -->|Celery: via broker| Broker[(Redis / RabbitMQ / SQS)]
    Broker --> W1[Worker 1]
    Broker --> W2[Worker 2]
    Exec -->|K8s: spawn pod| Pod[Per-task Pod]
    W1 --> MetaDB
    W2 --> MetaDB
    Pod --> MetaDB
    Web[Webserver / UI / REST API] --> MetaDB
    User([User / Operator]) --> Web

Part 1 — Airflow Foundations: DAGs and Operators

Pre-Reading Check — Part 1

1. What does the "acyclic" requirement of a DAG actually buy you?

   a) It guarantees the scheduler can determine when the graph is finished.
   b) It lets every task run in parallel.
   c) It removes the need for retries.
   d) It allows tasks to share the same task_id.

2. A team wants a sensor that does NOT tie up a worker slot while waiting hours for an upstream file. Which mode should they pick?

   a) poke mode
   b) reschedule mode
   c) always-on mode
   d) eager mode

3. Which executor offers per-task resource isolation at the cost of ~30–60s startup?

   a) Sequential Executor
   b) Celery Executor
   c) Kubernetes Executor
   d) Local Executor

4. Why is calling a slow API at the top of a DAG file an anti-pattern?

   a) It is logged with PII.
   b) It runs on every parse cycle of the scheduler (≈ every 30s).
   c) It cannot be retried.
   d) DAG files cannot make network calls.

5. What is the practical size limit for an XCom payload, and why?

   a) ~64 KB; XComs serialize to the metadata DB.
   b) No limit; XComs are stored in S3.
   c) ~1 GB; XComs use Redis.
   d) ~5 MB; XComs are gzipped to disk.

1.1 DAGs, Tasks, and Operators

A DAG (Directed Acyclic Graph) is defined in a Python file that declares tasks and the dependencies between them. "Directed" means edges have a direction (A runs before B); "acyclic" means there are no loops, which is what lets the scheduler decide when the graph is finished. An operator is a template for the unit of work a task performs: PythonOperator runs a Python callable, BashOperator runs a shell command, PostgresOperator runs SQL. Each instantiation of an operator inside a DAG yields a task; each scheduled run of a task for a particular logical date produces a task instance.

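from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# Defaults applied to every task in the DAG unless a task overrides them.
default_args = {
    'owner': 'data_team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='data_pipeline',
    default_args=default_args,
    schedule_interval='@daily',  # Airflow 2.4+ prefers the name `schedule`
    start_date=datetime(2024, 1, 1),
    catchup=False,  # do not create runs for intervals missed since start_date
) as dag:

    def extract():
        # The return value is pushed to XCom, so keep it small.
        return "data"

    # Instantiating the operator inside the `with` block registers the task.
    task_extract = PythonOperator(
        task_id='extract_data',
        python_callable=extract,
    )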
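To see why the acyclic property matters, the following is a minimal sketch using Python's standard-library graphlib module, not Airflow's own scheduler code. The task names are hypothetical. A graph without loops has a valid run order; adding an edge from the last task back to the first leaves no order at all.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks it depends on (hypothetical names).
deps = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

# An acyclic graph has at least one valid execution order.
order = list(TopologicalSorter(deps).static_order())
print(order)

# Making extract depend on load closes a loop: extract -> transform -> load -> extract.
cyclic = {**deps, "extract": {"load"}}
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)
```

With a cycle there is no first task to start and no last task to finish, which is why a scheduler rejects such a graph at parse time.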

The scheduler parses every Python file in the dag_folder roughly every 30 seconds, instantiates the DAG and operator objects, validates the graph for cycles, and stores metadata in the database. A critical implication: every line of top-level code in a DAG file runs on every parse. Slow API calls or heavy SQL at module scope are the classic anti-pattern; that work belongs inside an operator's execute() method, which only runs when the task is dispatched.
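The cost of top-level code can be illustrated without Airflow at all. The sketch below is a simulation under stated assumptions: slow_lookup stands in for a slow API call, and the two parse_* functions (hypothetical names) stand in for the scheduler importing a DAG file. It counts how often the slow call runs in each style.

```python
call_counts = {"parse_time": 0, "run_time": 0}

def slow_lookup(kind):
    """Stand-in for a slow API call or heavy query; it only counts invocations."""
    call_counts[kind] += 1
    return ["orders", "customers"]

def parse_bad_dag_file():
    # Anti-pattern: the lookup sits at "module scope", so it runs on every parse.
    tables = slow_lookup("parse_time")
    return {"load": lambda: tables}

def parse_good_dag_file():
    # The lookup is deferred into the task callable and runs only on execution.
    def load():
        return slow_lookup("run_time")
    return {"load": load}

# Simulate 10 scheduler parse cycles, then a single task execution.
for _ in range(10):
    bad_tasks = parse_bad_dag_file()
    good_tasks = parse_good_dag_file()
good_tasks["load"]()

print(call_counts)
```

Ten parses trigger ten slow calls in the first style and none in the second; the deferred version pays the cost once, when the task actually runs.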

1.2 Sensors and Task Groups

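Sensors are operators that wait for an external condition: a file landing, a SQL row appearing, an S3 prefix going non-empty. They run in one of two modes: poke, where the sensor holds its worker slot for the whole wait and sleeps between checks, and reschedule, where it releases the slot between checks and is rescheduled for the next one. For waits measured in hours, reschedule mode keeps workers free for other tasks.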
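The difference between poke and reschedule mode can be made concrete with a rough worker-slot calculation. The sketch below is a simplified model, not Airflow code: slot_seconds is a hypothetical helper, and the one-second cost per check is an assumption.

```python
def slot_seconds(mode, wait_seconds, poke_interval, check_seconds=1):
    """Worker-slot time consumed by one sensor under a simplified model."""
    if mode == "poke":
        # The sensor sleeps between checks but keeps its slot the whole time.
        return wait_seconds
    if mode == "reschedule":
        # The slot is released between checks; only the checks occupy it.
        checks = wait_seconds // poke_interval + 1
        return checks * check_seconds
    raise ValueError(f"unknown mode: {mode}")

three_hours = 3 * 60 * 60
poke = slot_seconds("poke", three_hours, poke_interval=300)
resched = slot_seconds("reschedule", three_hours, poke_interval=300)
print(poke, resched)
```

Under these assumptions a three-hour wait costs 10,800 slot-seconds in poke mode and 37 in reschedule mode, at the price of some scheduling latency on each check.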

Task Groups (Airflow 2.x, replacing legacy SubDAGs) give visual and logical grouping in the UI without spawning a child DAG. Wrap related work in with TaskGroup('extract'): and the UI collapses it into one expandable node — invaluable when DAGs balloon past 50 tasks.

1.3 Schedulers, Executors, and Workers — the Four Pillars

Airflow's runtime is a four-pillar architecture:

  1. Scheduler — parses DAGs and queues task instances
  2. Webserver — renders the UI and serves the REST API
  3. Workers — actually run task code
  4. Metadata DB — Postgres in production; stores DAG definitions, task states, connections, variables, XComs

The executor bridges scheduler and workers. Celery Executor uses a broker (Redis / RabbitMQ / SQS) and pre-warmed workers — fast startup (5–10s), fixed worker shape. Kubernetes Executor spawns a fresh pod per task with custom resources — total isolation, slower startup (30–60s), needs a K8s cluster.

Analogy: Celery is a fleet of buses on fixed routes — cheap, fast to board, every bus the same size. Kubernetes is ride-share dispatching custom vehicles — any size you want, but the car has to drive to you first.

Animated Figure 9.1 — Scheduler → Executor → Worker DAG flow
[Figure: DAG file (.py) → Scheduler (parse + queue) → Executor (Celery / K8s) → Worker (execute() + state) → Metadata DB (Postgres)]
A scheduled task instance flows DAG → Scheduler → Executor → Worker; the worker writes the final state back to the Metadata DB.

1.4 Connections, Variables, and XComs

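Airflow ships three first-class mechanisms for moving config and small payloads: Connections hold the credentials and endpoints for external systems, Variables hold key-value configuration, and XComs pass small values between tasks. XComs are serialized into the metadata DB, so they suit references and counts, not datasets.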

Correct pattern for big data: land it in S3, push only the reference (bucket key, partition path, row count) through XCom.
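A minimal sketch of the reference-passing pattern follows. A temporary directory stands in for S3, and the function names and path layout are hypothetical. The point is the size gap between the dataset and the small dictionary that would travel through XCom.

```python
import json
import tempfile
from pathlib import Path

def extract_to_storage(rows, base_dir, ds):
    """Write the payload to storage and return only a small reference."""
    path = Path(base_dir) / f"orders/ds={ds}/part-0000.json"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(rows))
    # This small dict is what a task would return, and so push to XCom.
    return {"path": str(path), "row_count": len(rows)}

def transform_from_reference(ref):
    """Downstream task: resolve the reference and read the real data."""
    rows = json.loads(Path(ref["path"]).read_text())
    return sum(row["amount"] for row in rows)

with tempfile.TemporaryDirectory() as tmp:
    rows = [{"order_id": i, "amount": 10} for i in range(50_000)]
    ref = extract_to_storage(rows, tmp, "2024-01-01")
    payload_bytes = len(json.dumps(rows))
    ref_bytes = len(json.dumps(ref))
    total = transform_from_reference(ref)

print(payload_bytes, ref_bytes, total)
```

The dataset is over a megabyte of JSON while the reference is a few hundred bytes at most, so only the reference belongs in the metadata DB.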

Key Points — Airflow Foundations

Part 2 — Amazon MWAA and Production Patterns

Pre-Reading Check — Part 2

1. How are DAGs deployed to an MWAA environment?

   a) SSH into a worker and rsync the files.
   b) Push to a Git repository connected to MWAA.
   c) Sync the dags/ prefix of the configured S3 bucket; MWAA pulls every ~30s.
   d) Upload through the Airflow UI.

2. Modifying requirements.txt in the MWAA bucket triggers what?

   a) Nothing; it is read at task time.
   b) Immediate hot reload of dependencies.
   c) An environment update that can take 20–30 minutes.
   d) A required manual environment recreation.

3. Which template variable is the safest key for an idempotent partitioned write?

   a) datetime.now().date()
   b) {{ ds }} (execution date)
   c) uuid4()
   d) {{ random_seed }}

4. You ship a new DAG on March 15 with start_date Jan 1 and the default catchup setting in older Airflow. What happens?

   a) Nothing until March 15's run.
   b) Airflow attempts to schedule 73 historical runs in rapid succession.
   c) Airflow refuses to deploy.
   d) Airflow runs only the most recent date.

5. A task succeeds at 11 PM but its SLA was 9 AM. What does Airflow do if SLA is set?

   a) Marks the task failed retroactively.
   b) Records an SLA miss and surfaces it (callback / email if configured); run still succeeds.
   c) Kills the run.
   d) Nothing; SLA is informational only and never recorded.

2.1 MWAA Environment Sizing

Self-hosting Airflow means running schedulers, webservers, workers, brokers, Postgres, secrets, log aggregation, and patching all of it. MWAA is AWS's managed Airflow on Fargate behind a customer-controlled VPC, syncing DAGs from S3 and shipping logs to CloudWatch.

Three environment classes fix per-worker compute shape:

Class        vCPU  Memory  Tasks/Worker        Hourly   Use Case
mw1.small    1     4 GB    5                   ~$0.49   Light / dev workloads
mw1.medium   2     8 GB    10                  ~$0.98   Standard production ETL
mw1.large    4     16 GB   10+ (configurable)  ~$1.96   Heavy transforms, large pandas

A mw1.medium environment running 24/7 costs roughly $720/month at the base hourly rate (~$0.98 × ~730 hours); workers added beyond the base environment are billed on top of that. Worker autoscaling watches the RunningTasks and QueuedTasks CloudWatch metrics; you set a min and max worker count. AWS guidance: if MWAA spends >6 hr/day above its minimum, raise the minimum.
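The monthly figure is simple arithmetic on the hourly rates in the table above. The sketch below assumes those approximate rates, a 730-hour month, and the base environment rate only; it deliberately ignores additional workers, storage, and data transfer.

```python
# Approximate hourly rates taken from the table above.
HOURLY_RATE = {"mw1.small": 0.49, "mw1.medium": 0.98, "mw1.large": 1.96}

def monthly_base_cost(env_class, hours_per_month=730):
    """Base environment cost for one month of continuous running."""
    return round(HOURLY_RATE[env_class] * hours_per_month, 2)

costs = {name: monthly_base_cost(name) for name in HOURLY_RATE}
print(costs)
```

For mw1.medium this gives about $715, consistent with the rough $720/month figure.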

Three sizing strategies:

2.2 DAG Deployment via S3

MWAA's deployment contract: everything lives in one S3 bucket.

s3://my-mwaa-bucket/
├── dags/                  # Python DAG files (synced ~every 30s)
│   ├── etl_pipeline.py
│   └── reporting.py
├── plugins.zip            # Custom operators, hooks, macros
└── requirements.txt       # Python dependencies (triggers env restart)

The scheduler polls dags/ every ~30s, and that sync is your deploy mechanism. Version the bucket for rollback. Applying a changed requirements.txt triggers an environment update that can take 20–30 minutes, so treat it as a planned outage. CI/CD pattern: PR → DAG-import test → merge → aws s3 sync ./dags s3://my-mwaa-bucket/dags/.
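As a cheap first gate before the DAG-import test, a CI job can at least confirm that every file in dags/ is syntactically valid Python. The sketch below uses only the standard library; find_broken_dag_files is a hypothetical helper, and the two sample files are fabricated for the demonstration. It does not replace loading the DAGs with Airflow itself, which also catches bad imports and cycles.

```python
import ast
import tempfile
from pathlib import Path

def find_broken_dag_files(dags_dir):
    """Return {file name: error message} for files that fail to parse."""
    errors = {}
    for path in sorted(Path(dags_dir).glob("*.py")):
        try:
            ast.parse(path.read_text(), filename=str(path))
        except SyntaxError as exc:
            errors[path.name] = f"line {exc.lineno}: {exc.msg}"
    return errors

# Demonstration with one valid and one broken file in a throwaway folder.
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "etl_pipeline.py").write_text("x = 1\n")
    Path(tmp, "reporting.py").write_text("def broken(:\n    pass\n")
    errors = find_broken_dag_files(tmp)

print(errors)
```

A CI step would fail the build whenever the returned dictionary is non-empty, so a broken file never reaches the bucket.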

2.3 Networking and IAM

MWAA always runs inside a customer-owned VPC across two private subnets in two AZs. The webserver can be public or private. For air-gapped operation, configure VPC endpoints for S3, CloudWatch Logs, ECR, KMS, and SQS.

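The IAM execution role (AWSMWAA-*) is what every task assumes. It needs permissions for the services the environment relies on; the topology in Figure 9.2 lists S3 (the DAG bucket), CloudWatch, KMS, SQS, and ECS.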

Animated Figure 9.2 — MWAA topology with VPC, IAM, S3 DAG bucket
[Figure: Developer (aws s3 sync) → S3 DAG bucket (dags/, plugins.zip, requirements.txt; versioned for rollback) → Scheduler (Fargate, polls S3 every 30s) → SQS broker (Celery queue) → Fargate workers (autoscale on RunningTasks + QueuedTasks); RDS Postgres metadata DB, CloudWatch Logs, and the IAM execution role (AWSMWAA-*) serve the components inside the customer VPC]
DAGs land in S3 outside the VPC; the scheduler (inside the customer VPC) polls and dispatches via SQS to Fargate workers. Every component assumes the IAM execution role; logs flow to CloudWatch.

2.4 Idempotent Task Design

Idempotency means running a task twice produces the same result as running it once. This is non-negotiable because retries, manual reruns, and backfills will execute the same task multiple times for the same logical date.

The single most important Airflow idiom: key writes on the execution date. Airflow templates {{ ds }} (date string YYYY-MM-DD) into operator parameters at runtime. Non-idempotent appends create duplicates on rerun; idempotent versions delete-then-insert by partition:

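from airflow.providers.postgres.operators.postgres import PostgresOperator

# Delete-then-insert keyed on {{ ds }}: rerunning the task for the same
# logical date replaces that day's partition without duplicating rows.
upsert_partition = PostgresOperator(
    task_id='upsert_daily_partition',
    postgres_conn_id='warehouse',
    sql="""
        DELETE FROM fact_orders WHERE order_date = '{{ ds }}';
        INSERT INTO fact_orders (order_id, order_date, amount)
        SELECT order_id, order_date, amount
        FROM staging.orders
        WHERE order_date = '{{ ds }}';
    """,
)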

Common non-idempotent footguns: INSERT without DELETE; using datetime.now() instead of {{ ds }} (because "now" changes on retry); side-effecting API calls without idempotency keys.
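The duplicate-on-rerun problem is easy to reproduce locally. The sketch below uses an in-memory SQLite database as a stand-in for the warehouse; the table and function names are hypothetical. It runs each write style twice for the same logical date and compares row counts.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE staging_orders (order_id INTEGER, order_date TEXT, amount REAL);
    CREATE TABLE fact_orders (order_id INTEGER, order_date TEXT, amount REAL);
    INSERT INTO staging_orders VALUES
        (1, '2024-01-01', 10.0),
        (2, '2024-01-01', 20.0);
""")

INSERT_SQL = "INSERT INTO fact_orders SELECT * FROM staging_orders WHERE order_date = ?"

def append_only(ds):
    # Non-idempotent: every rerun adds the same rows again.
    with conn:
        conn.execute(INSERT_SQL, (ds,))

def delete_then_insert(ds):
    # Idempotent: the partition is replaced inside one transaction.
    with conn:
        conn.execute("DELETE FROM fact_orders WHERE order_date = ?", (ds,))
        conn.execute(INSERT_SQL, (ds,))

def row_count():
    return conn.execute("SELECT COUNT(*) FROM fact_orders").fetchone()[0]

append_only("2024-01-01")
append_only("2024-01-01")  # simulated retry
appended = row_count()

with conn:
    conn.execute("DELETE FROM fact_orders")  # reset between experiments

delete_then_insert("2024-01-01")
delete_then_insert("2024-01-01")  # simulated retry
idempotent = row_count()

print(appended, idempotent)
```

The append-only version ends with four rows after one retry, while the delete-then-insert version still holds two.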

2.5 Backfills, Catchup, and Retries

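A backfill runs historical DAG runs for execution dates earlier than today. Airflow can do this two ways: automatically, when catchup=True makes the scheduler create a run for every missed interval since start_date, or explicitly, when you run the backfill CLI command (airflow dags backfill in Airflow 2.x) over a chosen date range.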

Retries are the first line of defense against transient failures: every operator should set retries (2–3) and retry_delay (for example 5 minutes, optionally with retry_exponential_backoff=True). Pair them with execution_timeout so stuck tasks do not run forever.
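To get a feel for what exponential backoff does to retry timing, here is a simplified doubling schedule with a cap. It is an illustration only: backoff_delays is a hypothetical helper, and Airflow's own calculation differs in detail.

```python
from datetime import timedelta

def backoff_delays(retries, base_delay, max_delay):
    """Delay before each retry: base_delay doubled per attempt, capped at max_delay."""
    return [min(base_delay * (2 ** attempt), max_delay) for attempt in range(retries)]

delays = backoff_delays(
    retries=3,
    base_delay=timedelta(minutes=5),
    max_delay=timedelta(hours=1),
)
print([str(d) for d in delays])
```

Three retries starting from five minutes wait 5, 10, and 20 minutes, so a flaky upstream gets 35 minutes to recover before the task is marked failed.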

Animated Figure 9.3 — Task instance lifecycle
[Figure: task instance states: none → scheduled → queued → running → success (terminal); on an exception with retries remaining, running → up_for_retry → scheduled after retry_delay]
A task moves none → scheduled → queued → running, then either succeeds or enters up_for_retry, where after retry_delay it re-enters scheduled until retries are exhausted.

2.6 SLA Monitoring and Alerting

A pipeline that completes at 11 PM but was due at 9 AM is a failed pipeline, even with all-green tasks. Setting sla=timedelta(hours=2) on a task tells Airflow: "this should finish within 2 hours of its scheduled time." A miss is recorded in the metadata DB, surfaced on the SLA Misses page, and (if configured) emits a callback or email.

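from datetime import timedelta
from airflow.operators.python import PythonOperator

# upload_fn, alert_pagerduty, and log_retry are callables defined elsewhere.
upload_to_warehouse = PythonOperator(
    task_id='upload_to_warehouse',
    python_callable=upload_fn,
    sla=timedelta(hours=2),               # miss recorded if not done 2h after the scheduled time
    on_failure_callback=alert_pagerduty,  # fires when the task fails
    on_retry_callback=log_retry,          # fires on each retry
)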
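The SLA rule itself is a single comparison. The sketch below restates it in plain Python with hypothetical names; it is not Airflow's implementation, and the 7 AM schedule is an assumption chosen so the deadline falls at 9 AM.

```python
from datetime import datetime, timedelta

def sla_missed(scheduled_for, finished_at, sla):
    """True when the task finished later than scheduled time + SLA."""
    return finished_at > scheduled_for + sla

scheduled = datetime(2024, 3, 15, 7, 0)  # run scheduled for 7 AM
sla = timedelta(hours=2)                 # so the deadline is 9 AM

late = sla_missed(scheduled, datetime(2024, 3, 15, 23, 0), sla)
on_time = sla_missed(scheduled, datetime(2024, 3, 15, 8, 30), sla)
print(late, on_time)
```

The 11 PM finish counts as a miss even though the task succeeded, which is exactly the case that task-level success checks cannot catch.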

Production alerting tiers:

Rule of thumb: every DAG has an owner, every owner has an alert channel, every SLA-bearing task has an explicit sla. Silent failures are far more dangerous than loud ones.

Key Points — MWAA & Production Patterns

Part 3 — Alternative Orchestrators

Pre-Reading Check — Part 3

1. Step Functions models a workflow primarily as:

   a) A directed acyclic graph of Python tasks.
   b) A state machine in Amazon States Language (JSON).
   c) A YAML-defined Kubernetes job graph.
   d) A SQL stored procedure.

2. Dagster's primary abstraction is:

   a) Tasks.
   b) Data assets, with lineage and quality first-class.
   c) Stored procedures.
   d) EventBridge rules.

3. Why might a Python-first team pick Prefect over Airflow?

   a) Prefect supports Postgres better.
   b) Prefect natively supports runtime-shaped flows (e.g., list comprehensions generating tasks dynamically).
   c) Prefect runs only on AWS.
   d) Prefect has a larger provider ecosystem than Airflow.

4. A team needs to orchestrate Lambdas, ECS tasks, and SageMaker jobs as a branching state machine on AWS only. Best fit?

   a) Airflow
   b) Step Functions
   c) Dagster
   d) Prefect

5. Which statement about combining orchestrators in mature organizations is most accurate?

   a) They are mutually exclusive; pick one.
   b) It's common to mix them; e.g., Airflow for batch ETL, Step Functions for event-driven Lambda flows, Dagster for analytics platforms.
   c) Airflow can act as the only orchestrator for any workload.
   d) Multiple orchestrators always violate compliance.

3.1 AWS Step Functions

Step Functions is AWS's serverless orchestrator built around the Amazon States Language — a JSON DSL for state machines (Task, Choice, Parallel, Map, Wait, Pass, Succeed, Fail) with explicit transitions.

{
  "StartAt": "ExtractData",
  "States": {
    "ExtractData": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123:function:extract",
      "Next": "CheckCount",
      "Retry": [{"ErrorEquals": ["States.TaskFailed"], "MaxAttempts": 3}]
    },
    "CheckCount": {
      "Type": "Choice",
      "Choices": [
        {"Variable": "$.count", "NumericGreaterThan": 0, "Next": "Transform"}
      ],
      "Default": "Skip"
    },
    "Transform": {"Type": "Task", "Resource": "...", "End": true},
    "Skip": {"Type": "Succeed"}
  }
}
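To make the transition semantics concrete, the following is a toy interpreter for the three state types used above (Task, Choice, Succeed). It is a sketch for intuition, not how Step Functions is implemented: the handlers are hypothetical local functions standing in for Lambda invocations, Retry is omitted, and only the NumericGreaterThan comparison is supported.

```python
definition = {
    "StartAt": "ExtractData",
    "States": {
        "ExtractData": {"Type": "Task", "Next": "CheckCount"},
        "CheckCount": {
            "Type": "Choice",
            "Choices": [
                {"Variable": "$.count", "NumericGreaterThan": 0, "Next": "Transform"}
            ],
            "Default": "Skip",
        },
        "Transform": {"Type": "Task", "End": True},
        "Skip": {"Type": "Succeed"},
    },
}

def run_state_machine(definition, state_input, task_handlers):
    """Walk the states from StartAt; return (visited state names, final data)."""
    name, data, visited = definition["StartAt"], state_input, []
    while True:
        state = definition["States"][name]
        visited.append(name)
        if state["Type"] == "Task":
            data = task_handlers[name](data)
            if state.get("End"):
                return visited, data
            name = state["Next"]
        elif state["Type"] == "Choice":
            name = state["Default"]
            for rule in state["Choices"]:
                # Only the "$.field" path form is handled in this sketch.
                field = rule["Variable"].removeprefix("$.")
                if data[field] > rule["NumericGreaterThan"]:
                    name = rule["Next"]
                    break
        elif state["Type"] == "Succeed":
            return visited, data
        else:
            raise NotImplementedError(state["Type"])

def handlers(count):
    # Hypothetical stand-ins for the Lambda functions behind each Task state.
    return {
        "ExtractData": lambda _: {"count": count},
        "Transform": lambda d: {**d, "transformed": True},
    }

path_with_rows, _ = run_state_machine(definition, {}, handlers(3))
path_empty, _ = run_state_machine(definition, {}, handlers(0))
print(path_with_rows, path_empty)
```

A non-zero count routes through Transform, while a zero count falls through to the Default branch and ends at Skip.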

Strengths: service choreography (Lambda + ECS + SageMaker + EventBridge + SQS), native retries/error catchers, parallel/map semantics, pay-per-state-transition (~$0.000025/transition Standard), no scheduler/DAG/worker pool to manage.

Weaknesses (vs Airflow): bare scheduling (cron via EventBridge), manual rerun for backfills, JSON-heavy DSL, smaller ecosystem of "providers" — you write Lambda glue instead of importing SnowflakeOperator.

3.2 Dagster — Asset-First Orchestration

Dagster reframes orchestration around data assets rather than tasks. Declare a SQL table or ML model as an @asset; Dagster infers the dependency graph from the assets each one consumes, and tracks lineage, freshness, and materialization automatically. Built-in data-quality checks (@asset_check) run inline. Trade-off: conceptual overhead if your team thinks in tasks, and a smaller (though fast-growing) ecosystem.
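The idea of inferring the graph from what each asset consumes can be sketched in a few lines of standard-library Python. This is not Dagster's API: the asset decorator, the ASSETS registry, and materialize_all below are hypothetical, and dependencies are read from function parameter names.

```python
import inspect
from graphlib import TopologicalSorter

ASSETS = {}

def asset(fn):
    """Hypothetical decorator: register the function under its own name."""
    ASSETS[fn.__name__] = fn
    return fn

@asset
def raw_orders():
    return [{"order_id": 1, "amount": 10}, {"order_id": 2, "amount": 20}]

@asset
def daily_revenue(raw_orders):
    return sum(order["amount"] for order in raw_orders)

@asset
def revenue_report(daily_revenue):
    return f"revenue={daily_revenue}"

def materialize_all():
    # Each parameter name is treated as the upstream asset it refers to.
    deps = {name: set(inspect.signature(fn).parameters) for name, fn in ASSETS.items()}
    values = {}
    for name in TopologicalSorter(deps).static_order():
        values[name] = ASSETS[name](**{dep: values[dep] for dep in deps[name]})
    return deps, values

deps, values = materialize_all()
print(deps["revenue_report"], values["revenue_report"])
```

Nobody wrote an explicit edge between the three functions; the graph falls out of the declarations, which is the shift in mindset the asset-first model asks for.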

3.3 Prefect — Pythonic Dynamic Flows

Prefect is the most Pythonic of the four. A flow is just a decorated function:

from prefect import flow, task

@task
def extract(): return [1, 2, 3]

@task
def transform(x): return x * 2

@flow
def pipeline():
    data = extract()
    return [transform(x) for x in data]  # truly dynamic

That list comprehension generates tasks dynamically based on extracted data — something Airflow only approximates with Dynamic Task Mapping (2.3+). Prefect Cloud has free + paid tiers (~$50–300/mo) with strong DX focus.

3.4 Comparison Matrix

Dimension               Airflow              Step Functions        Dagster             Prefect
Model                   DAG                  State machine         Asset graph         Python flow
Dynamic workflows       Limited (DTM)        Map state             Dynamic Graphs API  Native
Lineage / data quality  Plugin-based         None native           Built-in            Limited
AWS integration         Provider packages    Native                Provider packages   Provider packages
Pricing                 OSS / MWAA $720+/mo  $0.000025/transition  OSS / Cloud         OSS / Cloud $50–300/mo
Best for                Batch ETL at scale   Service choreography  Data platforms      Python-first dynamic flows

3.5 Choosing an Orchestrator

The decision is rarely about features in isolation; it is about fit with infrastructure, team, and workload. Four useful questions:

  1. Where does compute live? AWS-only? Step Functions cuts setup. Multi-cloud / on-prem? Airflow, Dagster, or Prefect.
  2. What is the workload pattern? Daily batch ETL with hundreds of DAGs? Airflow. Asset lineage and quality central? Dagster. Dynamic data-shaped fan-out? Prefect. Cross-service call graphs? Step Functions.
  3. Team size and shape? Small Python-first? Prefect's DX. Large data org with platform team? Airflow's ecosystem. AWS-shop with serverless culture? Step Functions.
  4. How important is data observability? Lineage + quality first-class? Dagster. General run monitoring is enough? Any option, including Airflow with plugins.

Mature organizations often combine orchestrators: Airflow/MWAA for daily batch warehouse pipelines, Step Functions for event-driven Lambda workflows, Dagster for the analytics platform. They are not mutually exclusive.

Decision Tree (Mermaid)

flowchart TD
    Start([New workflow to orchestrate]) --> Q1{Compute lives<br/>only on AWS?}
    Q1 -->|Yes| Q2{Service choreography<br/>Lambda + ECS + SageMaker?}
    Q1 -->|No / Multi-cloud| Q3{Daily batch ETL<br/>with rich scheduling?}
    Q2 -->|Yes| SF[Step Functions]
    Q2 -->|No, batch ETL| Q3
    Q3 -->|Yes| Q4{Data assets<br/>and lineage central?}
    Q3 -->|No, dynamic flows| Q5{Python-first team<br/>runtime fan-out?}
    Q4 -->|Yes| Dagster[Dagster]
    Q4 -->|No| Airflow[Airflow / MWAA]
    Q5 -->|Yes| Prefect[Prefect]
    Q5 -->|No| Airflow
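The same decision tree can be written as a small function, which makes the branch order explicit. The function and parameter names are hypothetical, and the result is a starting point for discussion, not a verdict.

```python
def choose_orchestrator(aws_only, service_choreography, batch_etl,
                        assets_central, python_first_fanout):
    """Follow the decision tree: Q1/Q2, then Q3, then Q4 or Q5."""
    # Q1 + Q2: AWS-only compute and cross-service choreography.
    if aws_only and service_choreography:
        return "Step Functions"
    # Q3: daily batch ETL with rich scheduling, then Q4 on lineage.
    if batch_etl:
        return "Dagster" if assets_central else "Airflow / MWAA"
    # Q5: Python-first team needing runtime fan-out.
    return "Prefect" if python_first_fanout else "Airflow / MWAA"

picks = [
    choose_orchestrator(True, True, False, False, False),
    choose_orchestrator(False, False, True, True, False),
    choose_orchestrator(True, False, True, False, False),
    choose_orchestrator(False, False, False, False, True),
]
print(picks)
```

Note that Airflow / MWAA is the fallback on two branches, matching its role in the tree as the default for general batch work.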

Key Points — Alternative Orchestrators
