Python Data Science Program
📓 Abrir notebook en GitHub

Clase 210 — PySpark para datasets grandes

Parte: 5 — Ingeniería de Datos · Fuente: Chambers & Zaharia Spark: The Definitive Guide (O'Reilly, 2018) + PySpark docs 3.5+. ⏱️ Duración estimada: 90 min.

🎯 Objetivo

Procesar datasets que no entran en RAM con PySpark 3.5+: DataFrames con lazy evaluation, Spark SQL, particionado, joins eficientes (broadcast vs shuffle), y entender cuándo Spark gana vs pandas/Polars (>10 GB) y cuándo pierde (<1 GB, dev local).

📚 Resultados de aprendizaje

Al finalizar, el estudiante podrá:

🗺️ Temas

# Tema Por qué importa
1 RDD vs DataFrame vs SQL — 3 APIs DataFrame es el default; SQL si tu equipo lo prefiere.
2 Lazy evaluation + DAG de ejecución Optimización que pandas no tiene.
3 Particionado: al leer, al escribir, en memoria Donde se gana o pierde 10× perf.
4 Joins: broadcast vs shuffle vs sort-merge El bottleneck más común.
5 Caching / persist Cuándo materializar; cuándo NO.
6 Spark UI: stages, shuffle, skew Diagnosis sin esto = ciego.

📖 Definiciones y características

📂 Dataset / recursos

🧪 Ejercicios

  1. Spark session local: spark = SparkSession.builder.master("local[4]").appName("demo").getOrCreate(). Cargá un parquet, mostrá schema con df.printSchema().
  2. Lazy vs eager: df2 = df.filter(...).select(...) (rápido, no ejecuta). df2.count() (lento, ejecuta). Mirá en Spark UI (localhost:4040) los stages.
  3. Broadcast join: cargá taxi (10 GB) y zones (1 KB). Hacé taxi.join(broadcast(zones), "zone_id"). Compará con taxi.join(zones, "zone_id") sin hint — debería ser igual gracias a AQE auto-broadcast.
  4. Particionado al escribir: df.write.partitionBy("date").parquet("out/"). Verificá estructura out/date=2024-01-01/part-*.parquet. Lecturas con filtro WHERE date='2024-01-01' solo leen ese subdirectorio.
  5. Skew: simulá una key skewed (90% rows con user_id=1). Hacé groupBy → observá UI: 1 task tarda 90% del tiempo. Mitigá con salting: agregar columna random salt = (rand() * 10).cast("int"), group por (user_id, salt), después sumar.

📝 Homework verificable

Notebook + reporte:

  1. Pipeline PySpark que: lee NYC Taxi (parquet), filtra outliers, agrega por pickup_zone y hour, escribe parquet particionado por pickup_date.
  2. Comparación de performance: misma agregación en (a) pandas (si entra), (b) Polars, (c) PySpark. Reportar tiempo y RAM peak.
  3. Identificar 1 stage skewed en Spark UI y aplicar salting para mitigar — mostrar antes/después.
  4. Output final con .write.bucketBy(20, "zone_id").parquet(...) para acelerar joins futuros.
  5. README con cuándo elegir cada uno: criterios objetivos (tamaño, latencia, equipo).

Criterio de aceptación: el alumno justifica con números por qué Spark gana en su dataset y muestra una optimización (broadcast/salting/partitioning) con impacto medido.

⚠️ Errores comunes

Síntoma / mensaje Causa y cómo arreglar
OutOfMemoryError: Java heap space Executor sin RAM. Fix: --driver-memory 4g --executor-memory 8g, o .config("spark.driver.memory", "4g").
df.show() tarda 5 minutos Estás computando todo el DataFrame para mostrar 20 filas. Fix: df.limit(20).show() o materializar intermedio con df.cache().
Job tarda 30 min en una agregación de 1 GB Sospechosos: shuffle excesivo, default partitions=200 para 1 GB es overkill. Fix: spark.sql.shuffle.partitions=20 para datasets chicos.
Caused by: BindException: Address already in use Spark UI en :4040 conflicto. Fix: .config("spark.ui.port", "4041") o matar proceso anterior.
to_pandas() falla con OOM Estás trayendo TODO el DataFrame al driver. Fix: o filtrá antes (df.limit(1000).toPandas()) o usá df.toLocalIterator() para streaming.
Resultados distintos entre runs UDFs no-determinísticos, o monotonically_increasing_id() con coalesce. Fix: usar row_number() con orden explícito.
partitionBy("user_id") con 1M users crea 1M directorios High-cardinality partitioning es anti-pattern. Fix: particionar por columna baja-cardinalidad (date, country); para usuarios usar bucketing.

❓ Preguntas frecuentes

❓ ¿PySpark, pandas, Polars o DuckDB?

Decisión por tamaño + uso: - <1 GB local: pandas o Polars (Polars 5-10× más rápido). - 1-50 GB single machine: Polars o DuckDB. - >50 GB single machine: Polars streaming, DuckDB out-of-core, o PySpark local. - >500 GB y/o cluster: PySpark.

❓ ¿PySpark en local o necesito cluster?

master("local[*]") corre Spark en tu laptop usando todos los cores — perfecto para dev/test con datasets hasta unos GB. Para producción TB: Databricks, EMR, GCP Dataproc, Azure Synapse, o K8s con Spark Operator.

❓ ¿UDF Python vs Spark SQL functions?

SQL functions (F.col, F.when, F.regexp_extract) son mucho más rápidas — corren en JVM. UDFs Python serializan a Python por row (lento). Si no podés evitarlo: Pandas UDFs (vectorizadas, ~10× UDF normal).

❓ ¿Por qué df.cache() no aceleró?

Cache es lazy también. Tenés que disparar una action (df.count()) para materializarlo. Después las siguientes actions reutilizan el cache. Y: si tu dataset no entra en memoria, cache no ayuda — usá df.persist(StorageLevel.DISK_ONLY).

❓ ¿Spark 3 vs 4 vs Databricks Runtime?

Spark 3.5 es el estándar open-source en 2026. Spark 4 trae Spark Connect (cliente liviano vs server JVM), Variant type, mejoras en streaming. Databricks Runtime es Spark + extensiones propietarias (Photon engine 2-5× más rápido). Para empezar: Spark 3.5 open-source.

❓ ¿Cuándo usar Spark SQL vs DataFrame API?

Equivalentes en performance (mismo Catalyst). DataFrame API: refactor-friendly, type hints. SQL: mejor si tu equipo es SQL-first o querés migrar de un warehouse. Mezclables: spark.sql("SELECT * FROM tbl").filter(...).

🔗 Referencias

📥 Material descargable

➡️ Siguiente clase

Clase 211 — Polars como alternativa moderna