Chapter 4: Batch Ingestion and ETL/ELT Pipelines

Learning Objectives

Pre-Section Quiz: ETL vs ELT and AWS Glue

1. Why does ELT typically outperform ETL when the destination is a modern cloud warehouse?

A. Cloud warehouses can scan more bytes per second than any intermediate ETL server.
B. Cloud warehouses provide elastic, pay-per-use compute that scales up for transforms and back down when idle, while a dedicated ETL server costs the same whether busy or not.
C. ELT pipelines never need a transformation framework because raw data is self-describing.
D. ETL is incompatible with columnar or semi-structured formats like Parquet or JSON.

2. Which scenario most strongly justifies keeping an ETL stage in front of a cloud warehouse?

A. The team wants new fields to ride along automatically without redeployment.
B. The warehouse already supports semi-structured types like VARIANT.
C. PII columns must be hashed or tokenized before raw values are ever queryable in the warehouse.
D. The pipeline produces fewer than one million rows per day.

3. What is the primary role of the AWS Glue Data Catalog?

A. It executes Spark jobs and writes their results back to S3.
B. It is a central metadata repository (table definitions, schemas, partitions) that lets Athena, Redshift Spectrum, EMR, and Glue jobs all agree on what a table is.
C. It stores the actual row-level data on managed SSDs operated by AWS.
D. It is a visual ETL builder that generates PySpark code from drag-and-drop diagrams.

4. A bucket of JSON files has the field address.zip stored as a string in some records and as an integer in others. What does a Glue DynamicFrame do that a Spark DataFrame cannot?

A. It rejects every record that does not match a single inferred schema.
B. It silently casts every zip to a string by default with no way to reconsider.
C. It stores both possibilities per record using a "choice type" and lets you resolve the ambiguity later via ResolveChoice.
D. It refuses to load the data and emits a Spark schema-mismatch exception.

5. Which Glue tuning lever directly addresses the driver memory pressure caused by an S3 prefix containing thousands of small files?

A. Switching to G.8X workers and accepting the cost.
B. Disabling Adaptive Query Execution.
C. Setting the connection option groupFiles: 'inPartition' so many small files are grouped into far fewer Spark tasks.
D. Calling job.commit() twice at the end of the script.

ETL vs ELT and AWS Glue

For three decades "ETL" was synonymous with data integration: a dedicated server pulled rows out of source systems, reshaped them in flight, and wrote the polished result into the warehouse. Cloud warehouses inverted that flow. Today most pipelines load raw data first and transform it inside the warehouse, an arrangement called ELT. Knowing when to use each pattern, and where the boundary lies, is one of the most consequential architectural decisions in a batch pipeline.

Key Points

Why ELT dominates in cloud warehouses

Cloud warehouses favor ELT for three reinforcing reasons. First, compute elasticity: Snowflake, BigQuery, and Redshift can scale compute up to handle a transform and back down when it finishes, whereas a dedicated ETL server costs the same whether it is processing 100 GB or sitting idle. Second, pay-per-use billing: Snowflake bills warehouse compute per second (after a 60-second minimum each time a warehouse starts), BigQuery's on-demand model bills per byte scanned, and Redshift offers provisioned clusters (on-demand or reserved) or usage-billed Redshift Serverless. The usage-based options reward "transform when needed" rather than "transform always." Third, schema agility: loading raw data first means new fields land automatically and historical reprocessing is a query, not a redeployment.

The performance gap can be dramatic. Loading 100 GB of clickstream data with traditional ETL means an intermediate server must extract, join, and aggregate the data before pushing the result into the warehouse, a process that can take hours on a single box. The ELT version bulk-loads the raw 100 GB into Snowflake in a fraction of that time and then applies the same transformations using the warehouse's massively parallel compute.

Animation: ETL vs ELT (transform-then-load vs load-then-transform)
Both flows start at the source. ETL transforms rows in an intermediate server before the warehouse sees the data; ELT lands raw data and transforms it inside the warehouse using elastic compute. ETL pays for a transform server full-time, while ELT spins up warehouse compute on demand.

ETL vs ELT comparison

Factor | ETL | ELT
Processing power | Bound by intermediate server | Warehouse auto-scales
Data movement | Multiple hops, repeated I/O | Single load, transform in place
Latency on large datasets | Bottlenecked at transform layer | Limited mainly by warehouse compute size
Schema changes | Pipeline redeploy + backfill | New fields ride along automatically
Cost when idle | 24/7 server cost | Storage only (when compute auto-suspends)

Analogy. ETL is a meal-kit company that chops your vegetables in a central kitchen and ships pre-prepped boxes. ELT is a grocery delivery service: raw ingredients arrive in your kitchen and you decide what to make tonight. The grocery model wastes nothing if your menu changes; the meal kit forces an upstream change every time you want a new dish.

When ETL still applies

ELT is the default but not the universal answer. ETL keeps a role wherever raw data must not land in the warehouse in its original form:

  1. PII redaction and minimization. GDPR's purpose-limitation and data-minimization principles, and HIPAA's minimum-necessary standard for health data, restrict who may see personal data and why; if your warehouse cannot guarantee that no one will query a raw email or ssn, you must hash, tokenize, or mask those values before the row arrives.
  2. Strict schema enforcement. Regulatory feeds, financial reports, and ML feature stores cannot tolerate optional fields, type coercion, or schema drift. ETL's schema-on-write rejects malformed records at the boundary instead of corrupting downstream tables.
  3. Cross-warehouse or air-gapped destinations. When data must traverse trust boundaries or move between clouds, transformation has to happen in a neutral compute layer (Glue, Spark on EMR, Fivetran).
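Item 1 is the most common reason to keep an ETL step. As a minimal sketch, assuming keyed hashing (HMAC-SHA-256) is an acceptable tokenization scheme for your compliance team, a secret key stored outside the warehouse, and hypothetical PII column names email and ssn, the transform stage can replace raw values with stable tokens before any row is loaded:

```python
import hashlib
import hmac

# Hypothetical PII columns to pseudonymize before load.
PII_COLUMNS = ("email", "ssn")


def pseudonymize(row: dict, key: bytes, pii_columns=PII_COLUMNS) -> dict:
    """Return a copy of row with PII values replaced by HMAC-SHA-256 tokens.

    The same value and key always yield the same token, so tokenized columns can
    still be joined and counted in the warehouse without exposing raw values.
    """
    out = dict(row)
    for col in pii_columns:
        value = out.get(col)
        if value is None:
            continue  # keep NULLs (and absent columns) as they are
        # Normalize first so ' Ada@Example.com' and 'ada@example.com' tokenize alike.
        normalized = str(value).strip().lower().encode("utf-8")
        out[col] = hmac.new(key, normalized, hashlib.sha256).hexdigest()
    return out


if __name__ == "__main__":
    # Illustrative key only; in practice load it from a secrets manager.
    key = b"example-key-do-not-use"
    raw = {"id": 42, "email": "Ada@Example.com", "ssn": "123-45-6789", "tier": "gold"}
    print(pseudonymize(raw, key))
```

The key matters: values such as SSNs have a small input space, so an unkeyed SHA-256 could be reversed by hashing every possible value.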

The transformation layer: dbt and SQLMesh

If raw data lands in the warehouse, transformations have to live somewhere. dbt treats SQL as software: each model is a SELECT statement in a .sql file, optionally documented and tested through a YAML properties file; dbt resolves the ref() calls between models into a DAG, runs them in dependency order, and reports test failures. SQLMesh adds virtual data environments for previewing changes in isolation, model versioning, and automatic classification of changes as breaking or non-breaking. Both treat business logic as code in version control, tested before deployment, with the warehouse as the runtime.
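dbt builds that DAG from the ref() calls each model makes to other models. The sketch below imitates the idea with Python's standard-library graphlib; it is a toy model, not dbt's parser, and the model names are hypothetical. It scans each model's SQL for {{ ref('...') }} calls and returns a run order in which every model follows its dependencies.

```python
import re
from graphlib import TopologicalSorter

# Matches {{ ref('model') }} or {{ ref("model") }} inside a model's SQL text.
REF_PATTERN = re.compile(r"""\{\{\s*ref\(\s*['"]([\w.]+)['"]\s*\)\s*\}\}""")


def run_order(models: dict[str, str]) -> list[str]:
    """Return model names ordered so every model runs after the models it refs."""
    graph = {name: set(REF_PATTERN.findall(sql)) for name, sql in models.items()}
    unknown = {dep for deps in graph.values() for dep in deps} - graph.keys()
    if unknown:
        raise ValueError(f"ref() to undefined models: {sorted(unknown)}")
    # static_order() raises graphlib.CycleError (a ValueError) on circular refs.
    return list(TopologicalSorter(graph).static_order())


# Hypothetical project: staging -> intermediate -> mart.
models = {
    "fct_orders": "select * from {{ ref('int_orders_enriched') }}",
    "int_orders_enriched": (
        "select o.*, c.segment from {{ ref('stg_orders') }} o "
        "join {{ ref('stg_customers') }} c using (customer_id)"
    ),
    "stg_orders": "select * from raw.orders",
    "stg_customers": "select * from raw.customers",
}
print(run_order(models))
```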

AWS Glue overview

AWS Glue is Amazon's serverless data integration service. It bundles four capabilities you will use repeatedly: a managed metadata catalog, automatic schema crawlers, a visual ETL builder (Glue Studio), and a serverless Spark runtime. Together they cover the lifecycle from "we just got a folder of CSVs in S3" to "production-grade Parquet tables refreshing every hour."

Glue crawlers and the Data Catalog

The Glue Data Catalog is the central metadata repository: table definitions, schemas, partition info, and pointers to underlying data, queryable by Athena, Redshift Spectrum, EMR, and Glue jobs. A crawler populates it. Point a crawler at an S3 prefix or a JDBC source and it will (1) classify objects (JSON, Parquet, CSV, Avro, ORC), (2) infer schema and partition structure from prefixes like year=2026/month=05/day=07/, and (3) register or update a table in a target Glue database.
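Step (2) depends on Hive-style key=value path segments. A minimal sketch of how such prefixes map to partition columns, assuming the year=/month=/day= layout above and a hypothetical table location; a real crawler also samples file contents to infer column types, which this sketch does not attempt:

```python
def partition_values(object_uri: str, table_root: str) -> dict[str, str]:
    """Parse Hive-style key=value directories between table_root and the file name."""
    root = table_root.rstrip("/") + "/"
    if not object_uri.startswith(root):
        raise ValueError(f"{object_uri} is not under {table_root}")
    *dirs, _file_name = object_uri[len(root):].split("/")
    parts = {}
    for segment in dirs:
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError(f"segment {segment!r} is not key=value")
        parts[key] = value  # values stay strings here; no type inference
    return parts


def infer_partition_keys(object_uris: list[str], table_root: str) -> list[str]:
    """Return partition column names in path order, requiring one consistent layout."""
    layouts = {tuple(partition_values(uri, table_root)) for uri in object_uris}
    if len(layouts) != 1:
        raise ValueError(f"inconsistent partition layouts: {sorted(layouts)}")
    return list(layouts.pop())


root = "s3://lake/raw/orders"  # hypothetical table location
objects = [
    f"{root}/year=2026/month=05/day=07/part-0000.json",
    f"{root}/year=2026/month=05/day=08/part-0000.json",
]
print(infer_partition_keys(objects, root))  # → ['year', 'month', 'day']
```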

Figure 4.2: Glue crawler-to-catalog-to-consumer pipeline. A Glue crawler scans and classifies the S3 raw zone (partitioned year=/month=/day=), infers schema and partitions, and registers the table my_database.orders in the Glue Data Catalog; Athena queries, Glue Spark jobs, Redshift Spectrum, and EMR or other external tools all read that catalog entry.

Glue Studio visual ETL

Not every transformation deserves hand-written Spark. Glue Studio is a drag-and-drop interface: you wire source nodes (Catalog, S3, JDBC, Kinesis), transformation nodes (apply mapping, filter, join, drop fields, aggregate, custom SQL), and sink nodes, and Glue Studio generates PySpark or Scala code from the diagram. It suits column projections, type casts, joins of a fact table to a small dimension, and jobs where a diagram reviewers can read at a glance is an asset. Hand-written Spark is the right tool when the logic exceeds simple flow-graph transformations.

Glue Spark jobs and DynamicFrames

Under the hood, Glue jobs run on managed Apache Spark. AWS provisions executors, scales them within configured limits, and tears them down when the job finishes; you pay per second of DPU consumption (with a one-minute minimum on current Glue versions). The Glue API adds an abstraction on top of Spark called the DynamicFrame: similar to a DataFrame, but tolerant of schema variance from record to record. If half your JSON has address.zip as a string and half as an integer, Spark's schema inference cannot keep both, so it typically falls back to reading the field as a string (or forces you to declare a schema up front); a DynamicFrame stores both possibilities using a "choice type" and lets you resolve the ambiguity later with ResolveChoice.
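To see what a choice type records, here is a toy model in plain Python (not the Glue API). It collects the set of observed types per field across records, then resolves an ambiguous field either by casting every value to one type or by splitting it into one column per type. Those two strategies loosely mirror ResolveChoice's cast and make_cols actions; the sample records and the exact column naming are assumptions for illustration.

```python
from collections import defaultdict

# Python scalar types mapped to type names (scalars only in this toy).
TYPE_NAMES = {bool: "boolean", int: "int", float: "double", str: "string"}


def observed_types(records: list[dict]) -> dict[str, set[str]]:
    """Per field, the set of non-null types seen; more than one means a 'choice'."""
    seen = defaultdict(set)
    for rec in records:
        for field, value in rec.items():
            if value is not None:
                seen[field].add(TYPE_NAMES[type(value)])
    return dict(seen)


def resolve_cast(records: list[dict], field: str, to=str) -> list[dict]:
    """Resolve a choice by casting every non-null value of field to one type."""
    return [{**r, field: None if r.get(field) is None else to(r[field])} for r in records]


def resolve_make_cols(records: list[dict], field: str) -> list[dict]:
    """Resolve a choice by splitting field into one <field>_<type> column per type."""
    out = []
    for rec in records:
        rec = dict(rec)
        value = rec.pop(field, None)
        if value is not None:
            rec[f"{field}_{TYPE_NAMES[type(value)]}"] = value
        out.append(rec)  # a missing <field>_<type> key stands for NULL
    return out


records = [{"id": 1, "zip": "02139"}, {"id": 2, "zip": 94105}, {"id": 3, "zip": None}]
print(sorted(observed_types(records)["zip"]))  # → ['int', 'string']
print(resolve_cast(records, "zip"))  # string wins, so '02139' keeps its leading zero
```

Casting to string is usually the right call for codes like ZIPs, since an integer cast would silently drop leading zeros.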

Aspect | DynamicFrame | Spark DataFrame
Schema strictness | Tolerates variance per record | Requires uniform schema
Native sources | Glue Catalog, S3, JDBC | Standard Spark sources
Built-in transforms | ApplyMapping, ResolveChoice, Relationalize, DropNullFields | Standard Spark API
Best for | Heterogeneous, semi-structured data | Cleaned, conforming data

The Relationalize transform is the standout feature: given deeply nested JSON, it flattens nested structs into columns and pivots arrays out into separate tables linked by generated keys, which is exactly what you need to load nested data into a relational warehouse. Once the data is clean, convert a DynamicFrame to a DataFrame with .toDF() and use Spark SQL.
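The sketch below is a simplified, pure-Python model of that idea, not Glue's implementation: nested objects become dotted column names, and each array moves into a child table whose rows carry the parent's generated key ("id"), the element position ("index"), and the element itself under "val". The table and column naming only loosely follows Glue's and should be treated as an assumption.

```python
from itertools import count


def relationalize(records: list[dict], root: str = "root") -> dict[str, list[dict]]:
    """Flatten nested dicts into dotted columns and move each list into a child table."""
    tables: dict[str, list[dict]] = {}
    keys = count(1)  # generated surrogate keys, shared across all tables

    def emit(table: str, record: dict) -> None:
        row: dict = {}
        flatten(table, record, "", row)
        tables.setdefault(table, []).append(row)

    def flatten(table: str, obj: dict, prefix: str, row: dict) -> None:
        for name, value in obj.items():
            col = prefix + name
            if isinstance(value, dict):
                flatten(table, value, col + ".", row)  # struct -> dotted columns
            elif isinstance(value, list):
                key = next(keys)
                row[col] = key  # the parent keeps only the join key
                child = f"{table}_{col.replace('.', '_')}"
                for i, elem in enumerate(value):
                    emit(child, {"id": key, "index": i, "val": elem})
            else:
                row[col] = value

    for rec in records:
        emit(root, rec)
    return tables


orders = [{
    "order_id": 7,
    "customer": {"name": "Ada", "zip": "02139"},
    "items": [{"sku": "A1", "qty": 2}, {"sku": "B2", "qty": 1}],
}]
for name, rows in relationalize(orders).items():
    print(name, rows)
```

Joining root_items.id back to root.items reconstructs each order's line items, which is the relational shape a warehouse expects.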

Three tuning levers matter before the first production deploy:

The job.commit() call is the signal that tells Glue to advance the job bookmark and mark the run successful. Without it, the next run reprocesses the same data.

Pre-Section Quiz: Incremental Ingestion — CDC and Watermarking

1. Which property is unique to log-based CDC (Debezium, AWS DMS) compared with trigger-based or query-based CDC?

A. It works without any connector or external transport.
B. It adds minimal load to the source while reliably capturing INSERT, UPDATE, and DELETE in transaction order, including the full before/after values.
C. It only requires that the source expose a last_modified_at column.
D. It cannot capture deletes, only inserts and updates.

2. A team uses query-based CDC against a SaaS API: SELECT * WHERE last_modified_at > :prev_hwm. Which weakness of this pattern is structural rather than tunable?

A. It can only run once per day.
B. The watermark column cannot be a timestamp.
C. Deletes are invisible because the query can only return rows that still exist; without a soft-delete flag or periodic full reconcile, removed rows linger downstream.
D. It cannot be combined with a downstream upsert.

3. To absorb clock skew when a transaction commits with last_modified_at = T but is not visible until T + 30s, what is the standard mitigation?

A. Advance prev_hwm exactly to the new current_hwm with no overlap.
B. Capture current_hwm as now() - lag_window and/or read with a small overlap behind prev_hwm; the downstream upsert absorbs duplicates.
C. Switch to an auto-incrementing id column, which detects updates and deletes as a side effect.
D. Disable the watermark and run a full reload nightly forever.

4. Which two ingredients are required for a Glue job bookmark to actually advance between runs?

A. A G.4X worker and AQE enabled.
B. A bookmark-aware source created with a stable transformation_ctx AND a final job.commit() call.
C. A Glue Studio diagram and a Lake Formation permission grant.
D. A scheduled crawler and an Athena query on the result.

5. A team wants to capture inserts only on a strictly append-only event table where rows are never updated. Which watermark column has the cleanest semantics?

A. A last_modified_at timestamp updated by every UPDATE.
B. A boolean is_processed flag.
C. An auto-incrementing id: strictly monotonic, no clock skew, perfect for insert-only tables — at the price of not detecting updates.
D. A random UUID assigned by the producer.

Incremental Ingestion: CDC and Watermarking

Full reloads do not scale. A 10-billion-row source table cannot be re-extracted nightly without saturating the source database, the network, and the warehouse. Incremental ingestion is the practice of moving only what has changed since the last run. There are three families of approaches — log-based CDC, query-based watermarks, and S3-side Glue bookmarks — and you will likely use all three across a single platform.

Key Points

Change Data Capture (CDC) sources

CDC identifies and captures INSERT, UPDATE, and DELETE operations in source databases, enabling incremental synchronization rather than full reloads. Instead of asking "what is the current state of the table?" CDC asks "what events changed the table?" That difference, state vs. event, is what makes incremental ingestion correct.

Three CDC strategies, in descending order of fidelity:

  1. Log-based CDC reads the database's transaction log directly: PostgreSQL's WAL, MySQL's binlog, Oracle's redo logs, SQL Server's transaction log (via its Change Data Capture feature). Connectors like Debezium or AWS DMS decode the log and emit a stream of change events with full INSERT/UPDATE/DELETE semantics, primary keys, before-and-after values, and transaction order.
  2. Trigger-based CDC installs DB triggers that fire on writes and copy change rows to a side table, which consumers poll. It works on almost any engine, but every write pays the trigger's overhead, and the side table must be polled and pruned.
  3. Query-based CDC asks the source for rows changed since a known point in time. Simplest pattern; covered as watermarking below.

Animation: Log-based CDC capture sequence
A row change (for example INSERT users(id=42, name='Ada'), UPDATE users SET tier='gold' WHERE id=42, or DELETE FROM orders WHERE id=99) is written to the WAL/binlog. The connector (Debezium or DMS) decodes each log entry into a typed change event (INSERT, UPDATE, or DELETE) and publishes it to Kafka, which buffers it durably. A stream processor (Flink, Spark, or Glue) consumes the events in order and applies them to the warehouse via a MERGE keyed on the primary key, producing an idempotent, replayable apply.

Figure 4.3: Log-based CDC reference architecture. A Debezium or DMS connector reads the source database's transaction log (PostgreSQL WAL or MySQL binlog) and emits INSERT/UPDATE/DELETE change events to a durable, replayable Kafka topic; a stream processor (Flink, Spark, or Glue) consumes them and applies a MERGE on the primary key to the warehouse upsert sink.

AWS DMS in particular supports three modes: full load only (one-time snapshot), CDC only (changes after a known LSN), and full-load-plus-CDC (snapshot then continuous stream) — the third is what you want for migrations and ongoing replication into a warehouse.
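The final apply step in that architecture can be sketched in plain Python. The sketch assumes Debezium-style events carrying an op code (c for create, u for update, d for delete, r for snapshot read) and before/after row images, a single-column primary key id, and a monotonically increasing log position flattened into a hypothetical top-level lsn field (Debezium itself reports positions inside connector-specific source metadata). A dict stands in for the warehouse table.

```python
def apply_events(table: dict, events: list[dict]) -> dict:
    """Apply change events to table (primary key -> row) in place and return it.

    Each row remembers the log position ("_lsn") that last touched it, so replayed
    or out-of-order events are skipped. Deletes leave a tombstone instead of
    removing the row, so a replayed INSERT cannot resurrect a deleted key.
    """
    for ev in events:
        image = ev["before"] if ev["op"] == "d" else ev["after"]
        pk = image["id"]
        current = table.get(pk)
        if current is not None and ev["lsn"] <= current["_lsn"]:
            continue  # stale or duplicate: something newer was already applied
        if ev["op"] == "d":
            table[pk] = {"id": pk, "_lsn": ev["lsn"], "_deleted": True}
        else:  # "c" (create), "u" (update), "r" (snapshot read)
            table[pk] = {**ev["after"], "_lsn": ev["lsn"], "_deleted": False}
    return table


events = [
    {"op": "c", "before": None, "after": {"id": 42, "name": "Ada", "tier": "basic"}, "lsn": 1024},
    {"op": "u", "before": {"id": 42, "name": "Ada", "tier": "basic"},
     "after": {"id": 42, "name": "Ada", "tier": "gold"}, "lsn": 1025},
    {"op": "c", "before": None, "after": {"id": 99, "name": "Bo", "tier": "basic"}, "lsn": 1000},
    {"op": "d", "before": {"id": 99, "name": "Bo", "tier": "basic"}, "after": None, "lsn": 1026},
]
users = apply_events({}, events)
apply_events(users, events)  # replaying the whole batch changes nothing
print(users[42]["tier"], users[99]["_deleted"])  # → gold True
```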

Watermarking and high-water-mark queries

When log-based CDC is impractical (analytics-only stores, third-party APIs without a binlog, small and infrequently changing tables), query-based CDC with a watermark is the pragmatic choice. A watermark is a monotonically increasing column on the source: a last_modified_at timestamp, an auto-incrementing id, or a version field. The pipeline records the highest value it saw last run (the high-water mark, HWM) and asks the source for everything beyond that mark.

The query template:

SELECT *
FROM   users
WHERE  last_modified_at >  :prev_hwm
  AND  last_modified_at <= :current_hwm
ORDER  BY last_modified_at;

prev_hwm is loaded from the pipeline's metadata store. current_hwm is captured at the start of the run, often now() - 5 minutes to give in-flight transactions time to commit. After the run succeeds, the metadata store advances prev_hwm to current_hwm. If the run fails, prev_hwm is left unchanged so the retry covers the same window.
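The run loop just described can be sketched with Python's built-in sqlite3 so it runs anywhere. Assumptions, all hypothetical: a source table users whose last_modified_at holds UTC ISO-8601 strings in one fixed format (so string comparison orders them correctly), a metadata table pipeline_state holding prev_hwm, and a caller-supplied write_batch function standing in for the idempotent sink.

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def run_incremental(conn, write_batch, now=None, lag=timedelta(minutes=5)):
    """One run: read prev_hwm, extract (prev_hwm, current_hwm], write, then advance."""
    now = now or datetime.now(timezone.utc)
    (prev_hwm,) = conn.execute(
        "SELECT prev_hwm FROM pipeline_state WHERE pipeline = 'users'"
    ).fetchone()
    # Stop short of 'now' so transactions still in flight are left for the next run.
    current_hwm = (now - lag).isoformat(timespec="seconds")
    rows = conn.execute(
        "SELECT id, name, last_modified_at FROM users "
        "WHERE last_modified_at > ? AND last_modified_at <= ? "
        "ORDER BY last_modified_at",
        (prev_hwm, current_hwm),
    ).fetchall()
    write_batch(rows)  # if the sink raises, the update below never runs
    with conn:  # persist the new mark only after the write succeeded
        conn.execute(
            "UPDATE pipeline_state SET prev_hwm = ? WHERE pipeline = 'users'",
            (current_hwm,),
        )
    return rows


def demo_db() -> sqlite3.Connection:
    """Hypothetical source table plus a one-row metadata store."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, last_modified_at TEXT);
        CREATE TABLE pipeline_state (pipeline TEXT PRIMARY KEY, prev_hwm TEXT);
        INSERT INTO pipeline_state VALUES ('users', '2026-05-07T10:00:00+00:00');
        INSERT INTO users VALUES
            (42, 'Ada', '2026-05-07T10:05:00+00:00'),
            (43, 'Bo',  '2026-05-07T10:10:00+00:00'),
            (44, 'Cy',  '2026-05-07T10:20:00+00:00');
        """
    )
    return conn


conn = demo_db()
sink = []
run_incremental(conn, sink.extend, now=datetime(2026, 5, 7, 10, 20, tzinfo=timezone.utc))
print([row[0] for row in sink])  # → [42, 43]; id 44 is past the 10:15 mark
```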

Animation: Watermark advancement across three runs
The high-water mark slides forward in three steps (Run 1: 10:00 to 10:15, Run 2: 10:15 to 10:30, Run 3: 10:30 to 10:45) over rows id=42 at 10:05, id=43 at 10:10, id=44 at 10:20, id=45 at 10:25, and id=46 at 10:35. Each run processes the rows whose last_modified_at falls inside its window. After a successful run the metadata store advances prev_hwm. After a failed run, prev_hwm stays put so the same window is retried, and the downstream upsert absorbs duplicates.

Figure 4.4: Watermark advancement state machine. Load prev_hwm from the metadata store; capture current_hwm as now() minus a lag window; query WHERE col > prev_hwm AND col <= current_hwm; write idempotently to the warehouse. On success, advance prev_hwm := current_hwm and finish; on failure, leave prev_hwm unchanged and return to the first state for a safe retry.

Watermark column choices

Watermark column | Pros | Cons
last_modified_at (timestamp) | Universal, easy to reason about | Clock skew, late writes
Auto-increment id | Strict monotonicity, no skew | Cannot detect updates, only inserts
Database lsn / sequence | Captures inserts and updates | Engine-specific, may not be queryable

The right answer is often a combination: insert-only fact tables watermarked on id, mutable dimension tables watermarked on last_modified_at plus a soft-delete column, and a periodic full compare to catch drift.

Glue job bookmarks

When the source is S3 rather than a database, Glue offers a built-in incremental mechanism called job bookmarks. A bookmark is state that Glue persists between runs, recording which input files (by path and modification time) have already been processed, so the next run automatically skips them. A canonical use case:

Run 1 (Mon):  Process files 1 to 100.   Bookmark records files 1 to 100 as processed.
Run 2 (Tue):  Process files 101 to 150. Files 1 to 100 are skipped automatically.
Run 3 (Wed):  No new files. Job runs but processes zero rows.

Bookmarks require two cooperating elements in your script: a bookmark-aware source (create_dynamic_frame.from_catalog or from_options with transformation_ctx set) and a final job.commit(). Without job.commit() the bookmark does not advance and the next run reprocesses the same files.

Operational notes: bookmark scope is per transformation_ctx, so multi-source jobs must give each source its own context name; bookmarks survive job edits as long as the context name is stable; resetting requires the deliberate "Reset bookmark" or "Run with bookmark disabled" Glue console actions, useful for backfills.
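To make the commit rule concrete, here is a toy model in plain Python, not Glue's implementation: state is kept per transformation_ctx, a run sees only files not yet recorded, and this run's files join the bookmark only when commit() is called, mirroring job.commit(). Real bookmarks for S3 sources are persisted by the Glue service and track object modification times; the paths below are hypothetical.

```python
class ToyBookmarks:
    """In-memory stand-in for bookmark state, keyed by transformation_ctx."""

    def __init__(self) -> None:
        self.committed: dict[str, set[str]] = {}  # survives across runs
        self.pending: dict[str, set[str]] = {}  # this run only

    def read_new(self, ctx: str, listing: list[str]) -> list[str]:
        """Return files for ctx that no committed run has processed yet."""
        done = self.committed.get(ctx, set())
        new = sorted(p for p in listing if p not in done)
        self.pending.setdefault(ctx, set()).update(new)
        return new

    def commit(self) -> None:
        """Like job.commit(): fold this run's files into the persisted bookmark."""
        for ctx, paths in self.pending.items():
            self.committed.setdefault(ctx, set()).update(paths)
        self.pending.clear()

    def end_run_without_commit(self) -> None:
        """A run that never calls commit(): its progress is simply forgotten."""
        self.pending.clear()


bookmarks = ToyBookmarks()
monday = [f"s3://lake/raw/orders/file_{i:03d}.json" for i in range(1, 101)]
print(len(bookmarks.read_new("orders_src", monday)))  # → 100
bookmarks.commit()
tuesday = monday + [f"s3://lake/raw/orders/file_{i:03d}.json" for i in range(101, 151)]
print(len(bookmarks.read_new("orders_src", tuesday)))  # → 50
```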

Pre-Section Quiz: Reliability — Idempotency and Schema Evolution

1. Why is the clause WHEN MATCHED AND s.version > t.version the load-bearing piece of a CDC MERGE?

A. It accelerates the join by triggering a hash-join plan.
B. It guarantees uniqueness on the primary key.
C. It silently discards stale or duplicate events arriving after a fresher one, making replays and out-of-order CDC streams safe.
D. It enables Adaptive Query Execution at runtime.

2. For a daily batch load that needs partition-grained idempotency without a MERGE-capable engine, which pattern fits best?

A. Stream the entire dataset row-by-row through Kafka.
B. Atomic partition swap: write new data to a staging path, validate, then swap the path into the table catalog atomically — orphaned staging is the only side effect on failure.
C. Drop the partition first, then write. If the job dies mid-write, the partition is gone.
D. Disable retries and require manual reruns.

3. A teammate adds now() inside a transformation that turns raw JSON into a curated table. Why does this break replayability?

A. now() is too slow for batch workloads.
B. now() is non-deterministic — re-running the transform on the same raw input produces different outputs each time, so a replay no longer reconstructs past state.
C. now() only works in streaming engines.
D. now() is incompatible with Parquet.

4. A producer adds a new optional column customer_segment to its event schema. The lake uses Glue crawlers and Parquet. What is the expected behavior?

A. The pipeline fails until every consumer is redeployed.
B. The crawler detects the new field on the next run and updates the catalog table; downstream queries can ignore unknown columns or pick them up immediately. The change is backward-compatible.
C. All historical Parquet files are rewritten to include a NULL column.
D. Downstream consumers must run ResolveChoice with match_catalog for the read to succeed.

5. What role does a schema registry (Glue Schema Registry, Confluent Schema Registry) play that crawlers and ResolveChoice cannot?

A. It executes Spark transformations on registered schemas.
B. It rewrites historical Parquet files to match new schemas.
C. It versions the contract between producers and consumers and gates breaking changes at registration time, before they reach production.
D. It replaces the Glue Data Catalog as the metadata store.

Reliability: Idempotency and Schema Evolution

Incremental ingestion is necessary but not sufficient. The same job will be retried after a network blip, replayed after a bad upstream change, and rerun after a DDL evolution that broke its assumptions. The reliability of a batch pipeline is determined less by its happy-path code than by what happens on the second, third, and fourth runs. Three patterns make those reruns safe: idempotent writes, replayability via raw zone retention, and disciplined schema evolution.

Key Points

Idempotent writes and exactly-once semantics

A write is idempotent when applying it twice produces the same result as applying it once. Idempotency is the single most important property of a reliable batch job, because partial failures, retries, and replays are not edge cases — they are the norm. Three implementation patterns deliver idempotency in practice.

1. Upsert with version guards

The strongest pattern for mutable rows is a MERGE keyed on the primary key, with a guard clause that ignores out-of-order updates:

MERGE INTO target_table t
USING staged_changes  s
ON    t.id = s.id
WHEN MATCHED AND (s.version > t.version OR t.version IS NULL) THEN
    UPDATE SET t.value = s.value,
               t.version = s.version,
               t.updated_at = s.updated_at
WHEN NOT MATCHED THEN
    INSERT (id, value, version, updated_at)
    VALUES (s.id, s.value, s.version, s.updated_at);

The s.version > t.version clause is load-bearing. It means a stale event arriving after a fresher one is silently discarded. Replays are safe; out-of-order CDC streams are safe.
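The guard can be exercised locally. SQLite has no MERGE statement, so this sketch swaps in SQLite's INSERT ... ON CONFLICT DO UPDATE ... WHERE upsert (SQLite 3.24 or later), which expresses the same version-guarded logic; table and column names follow the MERGE above, and the data is invented for illustration.

```python
import sqlite3

# Same guard as the MERGE: only a strictly newer version may overwrite a row.
UPSERT = """
INSERT INTO target_table (id, value, version, updated_at)
VALUES (?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
    value      = excluded.value,
    version    = excluded.version,
    updated_at = excluded.updated_at
WHERE excluded.version > target_table.version OR target_table.version IS NULL
"""


def apply_changes(conn: sqlite3.Connection, changes: list[tuple]) -> None:
    """Apply (id, value, version, updated_at) tuples; stale versions become no-ops."""
    with conn:  # one transaction per batch
        conn.executemany(UPSERT, changes)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE target_table (id INTEGER PRIMARY KEY, value TEXT, "
    "version INTEGER, updated_at TEXT)"
)
batch = [
    (1, "silver", 2, "2026-05-07T10:00:00Z"),
    (1, "bronze", 1, "2026-05-07T09:00:00Z"),  # arrives late with an older version
    (2, "gold", 5, "2026-05-07T10:05:00Z"),
]
apply_changes(conn, batch)
apply_changes(conn, batch)  # full replay of the same batch
print(conn.execute("SELECT id, value, version FROM target_table ORDER BY id").fetchall())
# → [(1, 'silver', 2), (2, 'gold', 5)]
```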

Figure 4.5: Idempotent upsert decision flow with version guard. For each incoming change (s.id, s.version, s.value): if no row with that id exists in the target, INSERT a new row; if one exists and s.version > t.version (a fresher event), UPDATE value, version, and updated_at; otherwise (a stale or duplicate event), do nothing and discard it silently. Every path ends in a commit.

2. Deduplication tables

When MERGE is unavailable (large append-only Parquet partitions, for example), maintain a small ledger of processed event IDs and filter incoming events against it:

INSERT INTO target_table
SELECT s.*
FROM   staged_changes s
WHERE  NOT EXISTS (
    SELECT 1
    FROM   cdc_processed p
    WHERE  p.source_id = s.id
      AND  p.operation_sequence = s.seq
);

Pair the insert with an insert into cdc_processed inside the same transaction (or atomic write). Replays that find the row already in the ledger become no-ops.
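A minimal sqlite3 sketch of the ledger pattern, with both inserts inside one transaction so an event can never be written without being recorded, or recorded without being written. It assumes staged_changes carries id and seq columns and gives the ledger a composite primary key; the DISTINCT additionally collapses exact duplicate rows within a single staged batch, a case the NOT EXISTS check alone would miss.

```python
import sqlite3

SCHEMA = """
CREATE TABLE staged_changes (id INTEGER, seq INTEGER, payload TEXT);
CREATE TABLE target_table   (id INTEGER, seq INTEGER, payload TEXT);
CREATE TABLE cdc_processed  (source_id INTEGER, operation_sequence INTEGER,
                             PRIMARY KEY (source_id, operation_sequence));
"""


def load_batch(conn: sqlite3.Connection) -> None:
    """Append unseen staged events and record them in the ledger, atomically."""
    with conn:  # both inserts commit together or roll back together
        conn.execute(
            """
            INSERT INTO target_table (id, seq, payload)
            SELECT DISTINCT s.id, s.seq, s.payload
            FROM   staged_changes s
            WHERE  NOT EXISTS (
                SELECT 1
                FROM   cdc_processed p
                WHERE  p.source_id = s.id
                  AND  p.operation_sequence = s.seq
            )
            """
        )
        # OR IGNORE: the ledger's primary key absorbs already-recorded events.
        conn.execute(
            "INSERT OR IGNORE INTO cdc_processed (source_id, operation_sequence) "
            "SELECT id, seq FROM staged_changes"
        )


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.execute("INSERT INTO staged_changes VALUES (1, 1, 'a'), (1, 1, 'a'), (1, 2, 'b'), (2, 1, 'c')")
conn.commit()
load_batch(conn)
load_batch(conn)  # replay: every event is already in the ledger
print(conn.execute("SELECT COUNT(*) FROM target_table").fetchone()[0])  # → 3
```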

3. Atomic partition swaps

For partition-grained idempotency, write the new data to a staging path, validate it, and then swap the staging path into the table catalog atomically. If the run fails midway, the staging path is orphaned (and a janitor cleans it up); the live table is untouched.
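The swap can be sketched with a local directory standing in for object storage and a small JSON file standing in for the table catalog. Assumptions: a single writer per table, readers that always resolve a partition through the catalog, and os.replace being atomic when source and destination sit on the same POSIX filesystem. In production a table format such as Apache Iceberg or Delta Lake, or a catalog update of the partition location, plays the role of that rename; every path and helper name here is hypothetical.

```python
import json
import os
import tempfile
import uuid
from pathlib import Path


def publish_partition(table_dir: Path, partition: str, rows: list[dict], validate) -> Path:
    """Stage rows in a fresh path, validate them, then atomically repoint the catalog."""
    staging = table_dir / "data" / f"{partition}-{uuid.uuid4().hex}"
    staging.mkdir(parents=True)
    (staging / "part-0000.jsonl").write_text("".join(json.dumps(r) + "\n" for r in rows))
    if not validate(rows):
        # The live partition is untouched; a janitor can delete the orphaned path.
        raise ValueError(f"validation failed, orphaned staging path {staging}")

    catalog_file = table_dir / "catalog.json"
    catalog = json.loads(catalog_file.read_text()) if catalog_file.exists() else {}
    catalog[partition] = str(staging)
    # Write the new catalog to a temp file in the same directory, then rename it
    # over the old one: readers see either the old mapping or the new one.
    fd, tmp_path = tempfile.mkstemp(dir=table_dir, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump(catalog, f)
    os.replace(tmp_path, catalog_file)
    return staging


def read_partition(table_dir: Path, partition: str) -> list[dict]:
    """Follow the catalog pointer to whichever staging path is currently live."""
    catalog = json.loads((table_dir / "catalog.json").read_text())
    data_file = Path(catalog[partition]) / "part-0000.jsonl"
    return [json.loads(line) for line in data_file.read_text().splitlines()]
```

A rerun simply stages a new path and swaps again, which is why retries are safe; the superseded path becomes one more orphan for the janitor.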

Pattern | When to use | Cost
Upsert with version guard | Mutable warehouse tables, CDC sinks | Index lookup per row
Dedup table | Append-only with event IDs | Extra storage and join
Atomic partition swap | Daily batch loads | Two-phase write, requires catalog support

"Exactly-once" is a marketing term; "exactly-once-effective" is the engineering reality. Your job may execute twice, but the observable result is the same as if it had executed once. That is what idempotency buys you, and it is the strongest guarantee you should claim.

Replayability via raw zone retention

A pipeline is replayable if you can reconstruct any past state of any downstream table by re-running transformations against retained raw inputs. Replayability is what saves you when (not if) a transformation has a bug, a business rule changes retroactively, or a regulator asks for a six-month-old report computed under the rules in force then.

The architectural foundation is the raw zone: an immutable, retention-tagged S3 prefix where every input file lands in its original form, with metadata recording when it arrived. The raw zone is append-only by policy; nothing in it is ever rewritten. Transformations read from raw and write to a curated zone; the curated zone can be wiped and rebuilt from raw at any time.

s3://lake/raw/                      ← immutable, partitioned by ingest date
  orders/ingest_date=2026-05-07/...
  events/ingest_date=2026-05-07/...

s3://lake/curated/                  ← rebuildable, partitioned by business date
  orders_clean/order_date=2026-05-07/...

s3://lake/marts/                    ← presentation, used by BI tools
  fact_orders/...
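Rebuilding curated from raw only works if each transformation is a deterministic function of its raw input. A minimal sketch, assuming a hypothetical orders feed and a logical run date passed in by the scheduler instead of being read from now(): the same raw files produce identical curated rows whenever, and in whatever file order, the rebuild runs.

```python
from datetime import date


def curate_orders(raw_rows: list[dict], run_date: date) -> list[dict]:
    """Deterministic raw -> curated transform for one ingest partition.

    run_date is the partition's logical date from the scheduler; using it instead
    of now() means a replay next year computes exactly what today's run did.
    """
    curated = []
    for r in raw_rows:
        if r.get("status") == "test":
            continue  # hypothetical business rule: drop synthetic orders
        order_date = date.fromisoformat(r["order_ts"][:10])
        curated.append({
            "order_id": r["order_id"],
            "order_date": order_date.isoformat(),
            "amount_cents": round(float(r["amount"]) * 100),
            "days_to_ingest": (run_date - order_date).days,
        })
    # Sort so the output never depends on file listing order.
    return sorted(curated, key=lambda row: row["order_id"])


raw = [
    {"order_id": 2, "order_ts": "2026-05-06T23:50:00Z", "amount": "5.00", "status": "ok"},
    {"order_id": 1, "order_ts": "2026-05-05T08:00:00Z", "amount": "19.99", "status": "ok"},
    {"order_id": 3, "order_ts": "2026-05-06T09:00:00Z", "amount": "1.00", "status": "test"},
]
today = curate_orders(raw, date(2026, 5, 7))
replay = curate_orders(list(reversed(raw)), date(2026, 5, 7))  # e.g. a rebuild next year
print(today == replay)  # → True
```

Had days_to_ingest been computed from now(), every replay would produce different numbers, which is exactly the failure mode replayability rules out.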

Three operational practices keep replayability honest:

Schema evolution handling

Schema evolution is the inevitable change in the structure of incoming data: a new column appears, a type widens from int to bigint, an old column is renamed, an enum gains a value. A pipeline that breaks every time a producer changes is a pipeline that owns the producer's roadmap. The goal is to absorb common changes automatically and surface only the changes that genuinely need engineering attention.

The schema-change taxonomy

Change type | Compatible? | Handling
Add nullable column | Yes (backward) | Auto-add, default to NULL
Add required column | No | Producer must coordinate; default value or version bump
Drop column | Sometimes (forward) | Keep reading, let column be NULL downstream
Rename column | No | Treat as drop + add; usually requires alias mapping
Widen type (int → bigint) | Yes | Promote downstream type
Narrow type (bigint → int) | No | Reject or quarantine
Add enum value | Depends | Allow if downstream uses string; gate if mapped to bounded type

Three mechanisms handle most of these automatically:

For producer-facing pipelines (events from microservices into a lake), a registry is the right answer. For analyst-facing pipelines (curated tables consumed by dashboards), versioned dbt or SQLMesh models give you the same control with deployment gates. The unglamorous but critical practice is monitoring schema drift: a weekly diff of today's catalog schema against last week's catches the slow accretion of fields that no one announced.
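A weekly drift check can start as a plain diff of two column-to-type mappings, classified with the taxonomy table above. This sketch assumes schemas are exported as {column: type} dicts (for example, from a catalog table's column list) and recognizes only a small, hypothetical set of safe integer widenings; a rename shows up as a drop plus an add, exactly as the taxonomy warns.

```python
# Safe integer widenings; any other type change is treated as breaking.
WIDENINGS = {
    ("tinyint", "smallint"), ("tinyint", "int"), ("tinyint", "bigint"),
    ("smallint", "int"), ("smallint", "bigint"), ("int", "bigint"),
}


def diff_schemas(old: dict[str, str], new: dict[str, str]) -> list[tuple[str, str, str]]:
    """Return (column, change, verdict) triples comparing last week's schema to today's."""
    changes = []
    for col in sorted(new.keys() - old.keys()):
        changes.append((col, f"added {new[col]}", "compatible if nullable"))
    for col in sorted(old.keys() - new.keys()):
        changes.append((col, "dropped", "review: consumers will see NULL"))
    for col in sorted(old.keys() & new.keys()):
        before, after = old[col], new[col]
        if before == after:
            continue
        if (before, after) in WIDENINGS:
            changes.append((col, f"widened {before} -> {after}", "compatible"))
        else:
            changes.append((col, f"changed {before} -> {after}", "breaking: quarantine"))
    return changes


last_week = {"order_id": "int", "amount": "double", "zip": "string", "qty": "bigint"}
today = {"order_id": "bigint", "amount": "double", "postal_code": "string",
         "qty": "int", "customer_segment": "string"}
for change in diff_schemas(last_week, today):
    print(change)
```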

