Chapter 6: Distributed Processing with Spark and EMR
Learning Objectives
Explain the Spark execution model — how the driver, executors, stages, and tasks coordinate to process data in parallel.
Use the DataFrame API and Spark SQL to express transformations that run efficiently at scale.
Tune Spark jobs using partitioning, caching, broadcast joins, and Adaptive Query Execution (AQE).
Choose between EMR on EC2, EMR Serverless, and EMR on EKS based on workload, cost, and operational requirements.
When pipelines outgrow a single machine, Apache Spark answers the question of how to coordinate hundreds of machines for batch and streaming analytics, and Amazon EMR is the AWS-managed substrate that runs Spark in production. This chapter tours Spark's scheduling model, the day-to-day APIs, EMR's three deployment models, and modern performance tuning anchored by Adaptive Query Execution.
Chapter Overview Diagrams
Two diagrams set the foundation for the rest of the chapter. The first shows where each piece of code physically runs in a Spark cluster; the second shows how a single action becomes a hierarchy of stages and tasks separated by shuffle boundaries.
Part 1: Spark Internals — Driver, Executors, Catalyst
Pre-Reading Check — Part 1
1. A PySpark job runs df.collect() on a 100 GB DataFrame. What is the most likely failure mode and why?
A. Executors run out of memory because each executor must materialize the full 100 GB.
B. The driver runs out of memory because collect() pulls every partition back to a single JVM.
C. The cluster manager rejects the job because it exceeds the YARN container size limit.
D. The job runs successfully but takes hours because Spark cannot parallelize collect().
2. Why does writing the same query as a DataFrame typically outperform an RDD pipeline of map and filter calls on opaque objects?
A. DataFrames run on more cores per executor than RDDs.
B. RDDs use disk by default while DataFrames stay in memory.
C. DataFrames expose schema and structure to Catalyst, which can rearrange operations and generate fused code via Tungsten.
D. DataFrames bypass the JVM and run native machine code for every operator.
3. A job has three stages of identical narrow transformations. Why are they three stages instead of one?
A. Each stage corresponds to a separate action in the user code.
B. Each stage is bounded by a shuffle, so wide transformations between them force the boundary.
C. Spark always uses three stages by default for parallelism.
D. The DAG scheduler splits long stages to keep individual task durations short.
Driver, Cluster Manager, and Executors
A Spark cluster is like a kitchen running a banquet. The driver is the head chef who reads the orders and decides what dishes to prepare. The cluster manager is the restaurant manager who allocates kitchen stations. The executors are the line cooks who chop, sauté, and plate the food.
The Spark driver runs the application's main() function and maintains the SparkContext. It converts user actions into tasks, schedules those tasks on executors, collects results, and communicates with the cluster manager. If the driver dies, the application dies. Executors are JVM worker processes that run tasks, cache RDD and DataFrame partitions in memory, and send heartbeats to the driver. The cluster manager (YARN on EMR on EC2, Kubernetes on EMR on EKS, or Spark Standalone) allocates executor processes and handles node failures. On EMR Serverless, AWS manages this layer and does not expose it to you.
Component | Role | Lifetime
Driver | Plans and coordinates execution | Application
Cluster Manager | Allocates resources | Cluster
Executor | Runs tasks, caches data | Application (typically)
Task | Processes one partition | Stage
The driver-executor split is the source of one of the most common Spark gotchas. df.collect() pulls all partitions back to the driver and can OOM it if the result is large. Conversely, you cannot reference a SparkContext inside a UDF, because the UDF runs on executors that have no access to it.
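One way to stay clear of the collect() trap is to bound what comes back to the driver. The sketch below assumes a PySpark DataFrame; the helper name safe_preview and its max_rows default are illustrative and not part of any Spark API.

```python
def safe_preview(df, max_rows=20):
    """Return at most max_rows rows to the driver.

    df is assumed to be a pyspark.sql.DataFrame. limit() is a lazy
    transformation, so only the bounded result is collected into
    driver memory rather than every partition.
    """
    if max_rows <= 0:
        raise ValueError("max_rows must be positive")
    return df.limit(max_rows).collect()
```

When the full result is needed, writing it to storage through df.write keeps the data on the executors instead of funnelling it through one JVM.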
Animated: Driver dispatches tasks to executors across worker nodes
The driver plans, the cluster manager allocates, the executors do the work.
Knowing where each piece of your code runs is the foundation for both correctness and performance.
RDDs, DataFrames, and Datasets
Spark exposes three layered abstractions. The RDD is a fault-tolerant, partitioned collection of records — flexible but opaque to the optimizer. The DataFrame is a distributed table with named, typed columns; because the schema is known, Spark can rearrange operations and generate efficient code. The Dataset is a typed Scala/Java extension of DataFrame; PySpark has no separate Dataset.
Abstraction | Schema | Optimizer Visibility | Typical Use
RDD | None | Opaque | Custom partitioning, unstructured data
DataFrame | Yes | Full | 95% of analytics workloads
Dataset | Yes | Full (Scala/Java) | Type-safe pipelines on the JVM
If RDD is hand-written assembly, DataFrame is C — high enough that a compiler can optimize aggressively, low enough to express almost any computation. Default to DataFrames. Reach for RDDs only when you need control the optimizer cannot give you.
Catalyst Optimizer and Tungsten
Two engines make DataFrame operations fast. Catalyst is Spark's query optimizer: it translates a query into a logical plan, applies rule-based optimizations (predicate pushdown, constant folding, column pruning), then explores cost-based physical alternatives. Catalyst will push a filter down into the Parquet reader so the file format itself skips disqualified row groups. Tungsten is the physical execution engine: off-heap memory, cache-friendly binary row formats, and whole-stage code generation that fuses operators into a single tight loop of bytecode.
For example, take a query that joins sales to customers, keeps only rows where region = 'EU', and aggregates the result. Catalyst pushes region = 'EU' into the sales scan, prunes unused customer columns, decides whether customers fits a broadcast join, and reorders operations to minimize shuffle. Tungsten then generates a single fused operator: scan → filter → project → join probe → partial aggregate, with no intermediate row collections.
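A query of the shape discussed above might be written as follows. This is a sketch under assumptions: the column names customer_id, segment, and amount are hypothetical, and the function only builds the plan without running it.

```python
def eu_revenue_by_segment(sales, customers):
    """Lazily build a filter-join-aggregate query over two DataFrames.

    sales and customers are assumed to be pyspark.sql.DataFrame objects;
    the column names used here are hypothetical.
    """
    return (
        sales
        .filter("region = 'EU'")          # candidate for pushdown into the scan
        .join(customers, "customer_id")   # unused customer columns can be pruned
        .groupBy("segment")
        .agg({"amount": "sum"})           # result column is named sum(amount)
    )

# Usage, given real DataFrames:
#   eu_revenue_by_segment(sales, customers).explain(mode="formatted")
```

Because every step is declarative, the optimizer sees the whole query at once; calling explain on the result shows which of the rewrites it actually applied.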
Figure 6.2: Catalyst optimizer phases
flowchart LR
A[DataFrame / SQL] --> B[Unresolved Logical Plan]
B -->|Catalog lookup| C[Resolved Logical Plan]
C -->|Rule-based: predicate pushdown, column pruning, constant folding| D[Optimized Logical Plan]
D -->|Cost-based strategy selection| E[Physical Plans]
E -->|Cost model| F[Selected Physical Plan]
F -->|Tungsten whole-stage codegen| G[Executable RDDs]
Animated: Catalyst activates each plan phase in sequence (parsed, analyzed, optimized, physical plan, codegen).
Jobs, Stages, Tasks, and the Shuffle
Spark uses lazy evaluation. Transformations like filter, select, and join build a plan but do not execute. Only an action, such as collect(), count(), show(), or a write like df.write.save(...), triggers a job.
The DAG scheduler splits each job into stages separated by shuffle boundaries. Within a stage, operators pipeline. A task is the smallest unit of work — one partition on one executor core. The task scheduler prefers data locality: PROCESS_LOCAL > NODE_LOCAL > RACK_LOCAL > ANY.
Wide transformations (groupByKey, join, distinct, repartition) force shuffles because rows sharing a key must land on the same partition. The shuffle has four phases: map-side partitioning to local disk, sorted shuffle write, network fetch by reducers, and reduce-side aggregation. Shuffle is by far the most expensive operation in Spark. If a partition is lost, Spark recomputes it from lineage — the "resilient" in RDD.
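The stage and task arithmetic can be sketched with a toy model. It assumes a linear chain of transformations and treats every wide transformation as one shuffle boundary; real plans differ (a join has two input sides, and a broadcast join needs no shuffle at all), so the numbers are illustrative only.

```python
# Simplified model: each wide (shuffle-inducing) transformation closes the
# current stage and opens a new one.
WIDE = {"groupBy", "groupByKey", "join", "distinct", "repartition"}

def count_stages(transformations):
    """Return the number of stages for a linear chain of transformations."""
    return 1 + sum(1 for op in transformations if op in WIDE)

def count_tasks(partitions_per_stage):
    """One task per partition, summed over all stages of the job."""
    return sum(partitions_per_stage)

pipeline = ["filter", "select", "groupBy", "filter", "join", "select"]
print(count_stages(pipeline))        # 3
print(count_tasks([64, 200, 200]))   # 464
```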
Key Takeaways
A job becomes stages at every shuffle boundary; a stage becomes one task per partition.
The Spark UI's stage and task views are the most direct way to diagnose performance problems.
Catalyst plus Tungsten is why DataFrames win — keep operations declarative so the optimizer can see them.
Reinforcement — Part 1
1. A PySpark job runs df.collect() on a 100 GB DataFrame. What is the most likely failure mode and why?
A. Executors run out of memory because each executor must materialize the full 100 GB.
B. The driver runs out of memory because collect() pulls every partition back to a single JVM.
C. The cluster manager rejects the job because it exceeds the YARN container size limit.
D. The job runs successfully but takes hours because Spark cannot parallelize collect().
2. Why does writing the same query as a DataFrame typically outperform an RDD pipeline of map and filter calls on opaque objects?
A. DataFrames run on more cores per executor than RDDs.
B. RDDs use disk by default while DataFrames stay in memory.
C. DataFrames expose schema and structure to Catalyst, which can rearrange operations and generate fused code via Tungsten.
D. DataFrames bypass the JVM and run native machine code for every operator.
3. A job has three stages of identical narrow transformations. Why are they three stages instead of one?
A. Each stage corresponds to a separate action in the user code.
B. Each stage is bounded by a shuffle, so wide transformations between them force the boundary.
C. Spark always uses three stages by default for parallelism.
D. The DAG scheduler splits long stages to keep individual task durations short.
Part 2: Writing Spark Jobs and EMR Deployment
Pre-Reading Check — Part 2
1. A team writes a PySpark pipeline of ten chained transformations followed by silver.write.save(...). When does Spark actually start computing?
A. As each transformation is added, since PySpark is eager.
B. Only when save() runs, because all the prior calls were lazy transformations.
C. When SparkSession.builder.getOrCreate() initializes the context.
D. When a downstream BI tool issues a query against the output table.
2. A daily 30-minute Spark batch job today runs on a long-lived EMR-on-EC2 cluster sized for the peak window. What change would most likely cut cost without losing functionality?
A. Switch to EMR Serverless so you only pay for the 30 minutes of vCPU and memory consumed.
B. Add more EC2 instances to finish faster.
C. Move to EMR on EKS to add a Kubernetes cluster surcharge.
D. Disable AQE so the job uses fewer optimizer cycles.
3. Why does PySpark match Scala speed for declarative DataFrame work but lag on row-level Python UDFs?
A. Python is single-threaded so it cannot use multiple cores.
B. DataFrame operations execute in the JVM regardless of front-end language, while Python UDFs require per-row serialization between the JVM and a Python process.
C. PySpark UDFs always run on the driver, blocking parallelism.
D. Scala has a faster Catalyst optimizer than PySpark.
DataFrame Transformations and Actions
DataFrame operations come in two flavors: transformations that build a logical plan (lazy) and actions that trigger execution (eager). Transformations include select, filter, withColumn, groupBy, agg, join, union, distinct, and repartition. Actions include show, collect, count, take, foreach, and writes issued through df.write (for example df.write.save(...)).
The whole pipeline is a chain of transformations that build a single logical plan; nothing runs until .save(). Output is partitioned by date and region so downstream queries can skip files via partition pruning.
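A pipeline of that shape might look like the sketch below. The input name bronze, the column names, and the cleaning rules are all hypothetical; only the overall pattern (lazy chain, one save(), output partitioned by date and region) comes from the text.

```python
def write_silver(bronze, output_path):
    """Chain lazy transformations, then trigger one job with save().

    bronze is assumed to be a pyspark.sql.DataFrame with hypothetical
    columns order_id, order_ts, region, status, and amount.
    """
    silver = (
        bronze
        .filter("status = 'complete'")
        .selectExpr(
            "order_id",
            "region",
            "CAST(amount AS DECIMAL(12, 2)) AS amount",
            "to_date(order_ts) AS date",
        )
        .dropDuplicates(["order_id"])
    )
    # Nothing has executed yet; save() is the action that starts the job.
    (
        silver.write
        .mode("overwrite")
        .partitionBy("date", "region")   # lets downstream queries prune files
        .format("parquet")
        .save(output_path)
    )
```

Keeping the transformations in one chain gives Catalyst a single plan to optimize, rather than several smaller plans separated by intermediate actions.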
Spark SQL and the Hive Metastore
Spark SQL is the same engine as DataFrame with SQL as another front end. Any DataFrame can be registered as a temporary view (df.createOrReplaceTempView("orders")) and queried with SQL; any SQL query returns a DataFrame.
For long-lived tables Spark uses a metastore — typically the Hive Metastore or AWS Glue Data Catalog on EMR — to persist schema, location, partition layout, and storage format. EMR integrates with the AWS Glue Data Catalog, so the same table is visible to Spark, Athena, Redshift Spectrum, and Trino. That single shared catalog is what lets a pile of S3 files behave like a queryable warehouse for multiple engines.
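The temporary-view round trip can be sketched in a few lines. The function name and the columns region and amount are assumptions for illustration; createOrReplaceTempView and spark.sql are the calls named in the text.

```python
def revenue_by_region(spark, orders_df):
    """Register a DataFrame as a temp view and query it with SQL.

    spark is assumed to be an active SparkSession and orders_df a
    DataFrame with hypothetical columns region and amount.
    """
    orders_df.createOrReplaceTempView("orders")
    # spark.sql returns a DataFrame, so the result can be chained further.
    return spark.sql(
        """
        SELECT region, SUM(amount) AS revenue
        FROM orders
        GROUP BY region
        ORDER BY revenue DESC
        """
    )
```

A temporary view lives only for the session; a table registered in the metastore is what other engines can see.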
PySpark vs Scala vs SQL
DataFrame and SQL operations run inside the JVM regardless of front-end language, so PySpark matches Scala speed for declarative work. PySpark falls behind only on custom Python UDFs, which serialize every row to Python and back. Pandas UDFs (vectorized, Arrow-based) close most of that gap.
Aspect | Scala | PySpark | SQL
Native runtime | JVM | Python ↔ JVM bridge | JVM
DataFrame perf | Fastest baseline | Same as Scala | Same as DataFrame
UDF perf | Fast (JVM UDFs) | Slower; pandas UDFs help | N/A
Best for | Custom UDFs, frameworks | ETL, ML pipelines | Reports, ad hoc
Pragmatic recommendation: PySpark for orchestration and team familiarity, Spark SQL views for reusable transformations, and Scala libraries for performance-critical components.
Key Takeaways
Build pipelines as long chains of declarative transformations and trigger them with a single action.
The metastore turns S3 files into a shared warehouse — Spark, Athena, Redshift Spectrum, Trino all see the same table.
Avoid Python UDFs in hot paths; use pandas UDFs or Spark SQL functions instead.
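Before reaching for any UDF, it is worth checking whether a built-in SQL expression already does the job. The sketch below assumes a DataFrame with a hypothetical numeric column temp_f; the function name is illustrative.

```python
def add_celsius(readings):
    """Add a temp_c column using a built-in SQL expression.

    readings is assumed to be a pyspark.sql.DataFrame with a hypothetical
    numeric column temp_f. The arithmetic is evaluated inside the JVM, so
    no rows are serialized to a Python worker as they would be with a
    row-at-a-time Python UDF.
    """
    return readings.selectExpr("*", "(temp_f - 32) * 5 / 9 AS temp_c")
```

When the logic cannot be expressed with built-ins, a pandas UDF is the next option to try before a plain Python UDF.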
Amazon EMR — Three Deployment Models
Amazon EMR is AWS's managed big-data platform. It supports Spark, Hadoop, Hive, HBase, Trino, and Flink, but for modern data engineering it is shorthand for "managed Spark." AWS reports EMR Spark runs up to 5.4x faster than open-source Apache Spark.
EMR on EC2 (Classic Clusters)
A cluster of EC2 instances configured as Hadoop master and worker nodes, with YARN as the cluster manager and HDFS or EMRFS (S3) for storage. Maximum configurability, manual cluster sizing, 3–10 minute cold start. Spot Instance integration enables 50–90% cost reductions for non-critical workloads.
Choose when: long-running 24/7 ETL, configuration-heavy workloads (custom JARs, kernel tuning, mixed frameworks), or sustained batch where Spot Instances on a persistent cluster pay for themselves.
EMR Serverless
You define an "application" with a framework (Spark or Hive) and submit jobs; AWS provisions and tears down capacity transparently. Pay-per-job billing based on actual vCPU-hours and memory-hours — no idle cost. Cold start of 30–60 seconds, fully automatic autoscaling, limited customization (custom Python venvs and Docker images supported).
Choose when: bursty or scheduled workloads, dev/test environments, anywhere a long-lived cluster would waste 23+ hours of idle capacity per day.
EMR on EKS
Spark jobs run as Kubernetes pods on an existing Amazon EKS cluster. Native K8s scheduling, RBAC, network policies. You pay for the EKS cluster regardless of EMR usage; EMR adds a per-vCPU surcharge for Spark jobs. Resource sharing across workload types improves utilization.
Choose when: the organization already runs EKS in production and wants a unified platform for data and application workloads, or needs advanced K8s features like GPU node pools.
Comparison
Dimension | EMR on EC2 | EMR Serverless | EMR on EKS
Infrastructure | Manual cluster | Fully managed | K8s-managed
Cold start | 3–10 min | 30–60 s | 20–60 s
Cost model | Instance-based | Pay-per-job (vCPU-h) | EKS + pod resources
Idle cost | High | None | EKS baseline
Best for | Sustained 24/7 ETL | Bursty / scheduled / dev | Existing EKS shops
Mature platforms often mix the models: EMR on EC2 (or EKS) for always-on baseline pipelines, EMR Serverless for the long tail of analyst queries, backfills, and data-science exploration. This minimizes idle cost while preserving operational control where it matters.
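The idle-cost argument reduces to simple arithmetic, sketched below. The hourly rates are hypothetical placeholders in an arbitrary currency, not AWS prices, and the model ignores cold starts, Spot discounts, and the EKS baseline.

```python
HOURS_PER_DAY = 24

def idle_fraction(busy_hours_per_day):
    """Share of an always-on cluster's day spent idle."""
    if not 0 <= busy_hours_per_day <= HOURS_PER_DAY:
        raise ValueError("busy hours must be between 0 and 24")
    return 1 - busy_hours_per_day / HOURS_PER_DAY

def breakeven_busy_hours(always_on_hourly, pay_per_use_hourly):
    """Busy hours per day above which the always-on cluster is cheaper.

    Both rates are hypothetical inputs in the same currency.
    """
    return HOURS_PER_DAY * always_on_hourly / pay_per_use_hourly

print(f"{idle_fraction(0.5):.1%}")          # 97.9%
print(breakeven_busy_hours(10.0, 15.0))     # 16.0
```

In this toy model, a daily 30-minute job leaves an always-on cluster idle for about 98% of the day, which is the case where pay-per-use billing tends to win.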
Key Takeaways
EMR on EC2 for sustained, configuration-heavy work.
EMR Serverless for sporadic, unpredictable jobs — no idle cost.
EMR on EKS when you already run Kubernetes and want unified infrastructure.
Reinforcement — Part 2
1. A team writes a PySpark pipeline of ten chained transformations followed by silver.write.save(...). When does Spark actually start computing?
A. As each transformation is added, since PySpark is eager.
B. Only when save() runs, because all the prior calls were lazy transformations.
C. When SparkSession.builder.getOrCreate() initializes the context.
D. When a downstream BI tool issues a query against the output table.
2. A daily 30-minute Spark batch job today runs on a long-lived EMR-on-EC2 cluster sized for the peak window. What change would most likely cut cost without losing functionality?
A. Switch to EMR Serverless so you only pay for the 30 minutes of vCPU and memory consumed.
B. Add more EC2 instances to finish faster.
C. Move to EMR on EKS to add a Kubernetes cluster surcharge.
D. Disable AQE so the job uses fewer optimizer cycles.
3. Why does PySpark match Scala speed for declarative DataFrame work but lag on row-level Python UDFs?
A. Python is single-threaded so it cannot use multiple cores.
B. DataFrame operations execute in the JVM regardless of front-end language, while Python UDFs require per-row serialization between the JVM and a Python process.
C. PySpark UDFs always run on the driver, blocking parallelism.
D. Scala has a faster Catalyst optimizer than PySpark.
Part 3: Performance Tuning — Partitions, Shuffle, AQE
Pre-Reading Check — Part 3
1. The Spark UI shows that 199 of 200 tasks finish in seconds while one drags on for 12 minutes. What is happening and what tool addresses it automatically?
A. A garbage collection pause on one executor; increasing executor memory fixes it.
B. Data skew: one partition is dramatically larger than the others. AQE's OptimizeSkewedJoin rule splits oversized partitions and replicates the matching side.
C. A flaky network link; Spark's speculative execution will retry the slow task.
D. Scheduler congestion; increasing spark.task.cpus resolves it.
2. A query joins a 2 TB facts table with a 5 MB dimension table. Why does a broadcast join win, and what risk does it carry?
A. Broadcast sends the small side to every executor as a hash table, eliminating the shuffle on the large side; the risk is OOM on the driver if the small side is actually larger than expected.
B. Broadcast joins are always faster regardless of size; there is no risk.
C. Broadcast moves both sides to a single executor; the risk is losing parallelism.
D. Broadcast disables shuffle for the entire job; the risk is incorrect results.
3. What is the conceptual shift Adaptive Query Execution introduces compared to traditional Catalyst optimization?
A. AQE replaces Catalyst with a different optimizer.
B. AQE moves from static optimization (plan fixed before execution from table stats) to dynamic optimization (plan adapts at runtime using actual shuffle output sizes).
C. AQE caches the result of every stage so reruns are instant.
D. AQE only applies to streaming workloads, not batch.
Partition Sizing and Skew
Partitions are the unit of parallelism. Sizing rule of thumb: target 100–200 MB per partition. Tasks then run for tens of seconds — long enough to amortize scheduling overhead, short enough that retries are cheap. The default spark.sql.shuffle.partitions = 200 is rarely the right answer for real workloads.
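The sizing rule translates directly into a partition count. The helper below is a sketch: the function name and the 128 MB default (a value inside the 100–200 MB range) are assumptions, and the shuffle size would come from the Spark UI or an estimate.

```python
import math

def shuffle_partition_count(shuffle_bytes, target_partition_mb=128):
    """Partitions needed so each holds roughly target_partition_mb of data."""
    target_bytes = target_partition_mb * 1024 * 1024
    return max(1, math.ceil(shuffle_bytes / target_bytes))

GIB = 1024 ** 3
print(shuffle_partition_count(500 * GIB))        # 4000
print(shuffle_partition_count(500 * GIB, 200))   # 2560
```

The result can be applied with spark.conf.set("spark.sql.shuffle.partitions", n); a 500 GiB shuffle is a long way from the default of 200.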
Skew is the silent killer. If most join keys have a few thousand rows but one (say customer_id = 'enterprise_account') has 50 million, that one partition becomes a "straggler" while the rest of the cluster idles. The task duration histogram in the Spark UI surfaces this immediately.
Manual mitigations include repartitioning before joins, salting hot keys (append a random suffix and replicate the matching side), filtering early, and pre-aggregating before the shuffle. AQE's OptimizeSkewedJoin rule handles this automatically — published benchmarks show reductions from ~7.7 minutes to ~1 minute on heavily skewed joins.
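The effect of salting can be seen in a toy model that runs without Spark. Two simplifications are assumed: CRC32 stands in for Spark's partitioner, and the suffix rotates deterministically instead of being random so the output is reproducible.

```python
import zlib
from collections import Counter

def partition_of(key, num_partitions):
    """Deterministic stand-in for a hash partitioner (not Spark's own hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def partition_sizes(keys, num_partitions):
    """Count how many rows land in each partition."""
    return Counter(partition_of(k, num_partitions) for k in keys)

def salt(keys, hot_key, buckets):
    """Append a rotating suffix to the hot key; other keys pass through."""
    return [f"{k}_{i % buckets}" if k == hot_key else k
            for i, k in enumerate(keys)]

hot = "enterprise_account"
keys = [hot] * 8000 + [f"customer_{i}" for i in range(2000)]

before = partition_sizes(keys, 16)
after = partition_sizes(salt(keys, hot, 8), 16)
# Largest partition before and after salting the hot key.
print(max(before.values()), max(after.values()))
```

In a real join the other side must be replicated once per suffix so that every salted key still finds its match, which is the cost of the technique.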
Broadcast Joins and Shuffle Reduction
A regular join shuffles both sides so matching keys land on the same partition — expensive. A broadcast join sends the smaller table to every executor as a hash table; each executor probes its local table with no shuffle on the large side.
Figure 6.4: Shuffle hash join vs broadcast join
flowchart TB
subgraph Shuffle[Shuffle Hash Join - both sides shuffled]
direction LR
L1[Large Table partitions] -->|shuffle by key| LS[Repartitioned Large]
S1[Small Table partitions] -->|shuffle by key| SS[Repartitioned Small]
LS --> J1[Join: matching keys co-located]
SS --> J1
end
subgraph Broadcast[Broadcast Join - small side replicated]
direction LR
L2[Large Table partitions stay put] --> J2[Local Hash Probe on every executor]
S2[Small Table] -->|broadcast to all executors| HT[Hash Table in executor memory]
HT --> J2
end
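Broadcast joins win when one side is small or when the large side is heavily skewed. Spark plans a broadcast automatically when the estimated size is below spark.sql.autoBroadcastJoinThreshold (10 MB by default); with AQE enabled, spark.sql.adaptive.autoBroadcastJoinThreshold applies the same test to sizes measured at runtime. Both are configurable, and the baseline configuration later in this chapter uses 30 MB. Broadcast joins are immune to skew because they do not partition the large side at all.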
from pyspark.sql.functions import broadcast

# Explicit hint: recommended when you know the table is small.
# sales and dim_region are assumed to be existing DataFrames.
joined = sales.join(broadcast(dim_region), "region_id")

# Automatic via AQE: Spark converts to a broadcast join at runtime
# when the measured size is below the threshold.
# Requires: spark.sql.adaptive.enabled = true
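The risk: Spark first collects the broadcast side on the driver and then ships a copy to every executor, so a "small" table that turns out to be 5 GB can OOM the driver (and strain the executors). Validate the size before using an explicit hint, and rely on AQE's runtime size detection when unsure.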
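What each executor does during a broadcast join can be sketched in plain Python. This is a conceptual model of an inner hash join, not Spark's implementation; the table contents are made up for illustration.

```python
def broadcast_hash_join(large_partitions, small_rows, key):
    """Join each large-side partition against one shared hash table.

    large_partitions: list of partitions, each a list of dict rows.
    small_rows: the small table as a list of dict rows.
    """
    # Build once; in Spark this table is what gets shipped to every executor.
    hash_table = {}
    for row in small_rows:
        hash_table.setdefault(row[key], []).append(row)

    joined = []
    for partition in large_partitions:        # partitions stay where they are
        out = []
        for row in partition:
            for match in hash_table.get(row[key], []):   # local probe, no shuffle
                out.append({**match, **row})
        joined.append(out)
    return joined

sales = [
    [{"region_id": 1, "amount": 10}],
    [{"region_id": 2, "amount": 7}, {"region_id": 9, "amount": 3}],
]
dim_region = [{"region_id": 1, "name": "EU"}, {"region_id": 2, "name": "US"}]
print(broadcast_hash_join(sales, dim_region, "region_id"))
```

The large side keeps its original partitioning throughout, which is why a hot key on that side does not create a straggler.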
Caching, Persisting, and Adaptive Query Execution
Caching keeps a DataFrame's partitions in memory (or memory + disk) so subsequent actions reuse them. Cache when a DataFrame is referenced by multiple actions, when an iterative algorithm revisits the same data, or when an expensive transformation feeds multiple queries. Don't cache one-time DataFrames or data that doesn't fit. Always pair .cache() with .unpersist().
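The pairing of .cache() with .unpersist() is easiest to enforce with try/finally. The sketch assumes a DataFrame with a hypothetical customer_id column; the function name is illustrative.

```python
def profile_orders(orders):
    """Run two actions against one cached DataFrame, then release it.

    orders is assumed to be a pyspark.sql.DataFrame with a hypothetical
    customer_id column.
    """
    cached = orders.cache()          # lazy: materialized by the first action
    try:
        total_rows = cached.count()                                  # fills the cache
        customers = cached.select("customer_id").distinct().count()  # reuses it
        return total_rows, customers
    finally:
        cached.unpersist()           # free executor memory even on failure
```

With only one action the cache would add cost without any reuse, which is the "one-time DataFrame" case the text warns about.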
Adaptive Query Execution (AQE), enabled by default since Spark 3.2, is arguably the most important tuning feature Spark has added in recent years. AQE uses runtime statistics (the actual intermediate result sizes after each stage) to re-plan the rest of the query. The shift is from static optimization (plans set before execution) to dynamic optimization (plans adapt to runtime data sizes).
AQE Rule | What it does | Typical benefit
CoalesceShufflePartitions | Merges small post-shuffle partitions | 200 → 4 partitions; less scheduling overhead
OptimizeSkewedJoin | Splits oversized partitions, replicates matching side |
stateDiagram-v2
[*] --> InitialPlan: Catalyst optimizes using static stats
InitialPlan --> RunStage: Submit next stage
RunStage --> CollectStats: Stage completes
CollectStats --> Decide: Inspect actual shuffle output sizes
Decide --> Coalesce: Many tiny partitions?
Decide --> SkewSplit: Oversized skewed partition?
Decide --> ConvertJoin: Small side now fits broadcast?
Decide --> RunStage: No change needed
Coalesce --> RunStage: Merge partitions and continue
SkewSplit --> RunStage: Split + replicate matching side
ConvertJoin --> RunStage: Sort-merge to broadcast hash join
RunStage --> Done: Final stage complete
Done --> [*]
Animated: AQE detects a skewed partition and splits it at runtime
Stage runs → stats collected → AQE detects oversized partition → splits into balanced sub-partitions → next stage continues with even work.
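The coalescing branch of that loop can be approximated with a greedy merge over measured partition sizes. This is a simplified model, not Spark's actual rule; the 64 MB default mirrors the default of spark.sql.adaptive.advisoryPartitionSizeInBytes, and the sample sizes are invented.

```python
def coalesce_partitions(sizes_mb, target_mb=64):
    """Greedily merge adjacent partitions until each group nears target_mb.

    Returns a list of groups, each a list of original partition indices.
    """
    groups, current, current_size = [], [], 0
    for index, size in enumerate(sizes_mb):
        # Close the current group if adding this partition would overshoot.
        if current and current_size + size > target_mb:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups

sizes = [5, 10, 40, 30, 2, 3, 70, 1]
print(coalesce_partitions(sizes))
# [[0, 1, 2], [3, 4, 5], [6], [7]]
```

Only adjacent partitions are merged, so the downstream stage can read each group as one contiguous range of shuffle output.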
A solid baseline configuration for production Spark on EMR:
# Enable AQE and its sub-rules
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.localShuffleReader.enabled=true
# Broadcast join threshold applied to AQE runtime sizes
spark.sql.adaptive.autoBroadcastJoinThreshold=30M
# Dynamic partition pruning (for partitioned fact tables)
spark.sql.dynamicPartitionPruning.enabled=true
# Initial shuffle partition count (AQE will coalesce as needed)
spark.sql.shuffle.partitions=400
Validation Workflow
Enable AQE and run the job; capture runtime in the Spark UI.
Check the SQL tab for AQE annotations showing coalesced partitions or converted joins.
Look for straggler tasks in the Stages tab; add broadcast() hints or salt if AQE missed a skew.
Check executor memory and GC time; spilling to disk suggests the job needs more executor memory or smaller partitions.
Use EXPLAIN FORMATTED to confirm predicate pushdown to file readers.
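The last check can be scripted. The sketch below assumes an active SparkSession and that the plan text labels pushed-down predicates as PushedFilters, as file-source scan nodes do in Spark 3.x; both helper names are illustrative.

```python
def formatted_plan(spark, query):
    """Return the EXPLAIN FORMATTED text for a SQL query.

    spark is assumed to be an active SparkSession. EXPLAIN returns a
    single-row, single-column DataFrame holding the plan as a string.
    """
    return spark.sql(f"EXPLAIN FORMATTED {query}").collect()[0][0]

def pushed_filter_lines(plan_text):
    """Pick out the scan-node lines that list filters pushed to the reader."""
    return [line.strip() for line in plan_text.splitlines()
            if "PushedFilters" in line]

# Usage, given a session and a registered table named sales (hypothetical):
#   plan = formatted_plan(spark, "SELECT * FROM sales WHERE region = 'EU'")
#   print(pushed_filter_lines(plan))
```

An empty PushedFilters list on a filtered scan is a hint that the predicate is being evaluated after the read, for example because it wraps the column in a function the reader cannot evaluate.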
Key Takeaways
Aim for 100–200 MB partitions and watch the Spark UI for stragglers.
The fastest shuffle is the one you avoid — broadcast small dims, pre-aggregate, filter early.
Cache only what you reuse; orphaned caches eat executor memory.
Enable AQE everywhere — it handles most coalescing, skew, and join conversion automatically.
Reinforcement — Part 3
1. The Spark UI shows that 199 of 200 tasks finish in seconds while one drags on for 12 minutes. What is happening and what tool addresses it automatically?
A. A garbage collection pause on one executor; increasing executor memory fixes it.
B. Data skew: one partition is dramatically larger than the others. AQE's OptimizeSkewedJoin rule splits oversized partitions and replicates the matching side.
C. A flaky network link; Spark's speculative execution will retry the slow task.
D. Scheduler congestion; increasing spark.task.cpus resolves it.
2. A query joins a 2 TB facts table with a 5 MB dimension table. Why does a broadcast join win, and what risk does it carry?
A. Broadcast sends the small side to every executor as a hash table, eliminating the shuffle on the large side; the risk is OOM on the driver if the small side is actually larger than expected.
B. Broadcast joins are always faster regardless of size; there is no risk.
C. Broadcast moves both sides to a single executor; the risk is losing parallelism.
D. Broadcast disables shuffle for the entire job; the risk is incorrect results.
3. What is the conceptual shift Adaptive Query Execution introduces compared to traditional Catalyst optimization?
A. AQE replaces Catalyst with a different optimizer.
B. AQE moves from static optimization (plan fixed before execution from table stats) to dynamic optimization (plan adapts at runtime using actual shuffle output sizes).
C. AQE caches the result of every stage so reruns are instant.
D. AQE only applies to streaming workloads, not batch.
Chapter Summary
Apache Spark distributes work through a driver–cluster manager–executor topology. The driver translates code into a DAG of stages separated by shuffle boundaries; executors run one task per partition. Shuffles are the most expensive operation, so reducing or avoiding them is the primary tuning lever. The DataFrame API and Spark SQL are canonical because they expose the entire computation to Catalyst and Tungsten. PySpark, Scala, and SQL share the same engine — the choice is mostly about team productivity, with the lone exception of Python UDFs.
Amazon EMR offers three deployment models: EC2 for sustained 24/7 workloads, Serverless for bursty pay-per-job billing, and EKS for organizations already running Kubernetes. Mature platforms mix them. Performance tuning has shifted from manual to dynamic — Adaptive Query Execution coalesces shuffle partitions, splits skewed joins, and converts sort-merge joins to broadcast at runtime. Combined with sensible partition sizing and strategic caching, AQE handles most tuning that used to require deep expertise.