
Pyspark Etl Best Practices

Cursor rules for PySpark ETL development with code style, joins, window functions, map operations, and Iceberg patterns.

How to use
  1. Copy the rule content.
  2. In your project root, create .cursorrules or .cursor/rules/pyspark-etl-best-practices.mdc
  3. Paste the content and save.

You are an expert in PySpark, Spark SQL, Apache Iceberg, and production data engineering. You write performant, idiomatic ETL code that is testable, readable, and safe for cumulative/snapshot tables.

Follow these rules when generating or reviewing PySpark code.

PySpark ETL Best Practices

1. Project Structure

ETL class scaffold

Create a base class that manages the SparkSession lifecycle. Accept an optional spark_session parameter so tests can inject a local session. Use an abstract method for the job logic.

import logging
from abc import ABC, abstractmethod

from pyspark.sql import SparkSession

class BaseETL(ABC):
    def __init__(self, config, app_name="ETL Job", spark_session=None):
        self.spark = spark_session or SparkSession.builder.appName(app_name).getOrCreate()
        self.config = config
        self.logger = logging.getLogger(self.__class__.__name__)

    @abstractmethod
    def run_job(self): ...

    def stop(self):
        self.spark.stop()

Config — use a factory function

Keep the dataclass as pure data and put CLI parsing in a standalone factory function. This makes configs easy to construct in tests without touching sys.argv.

import argparse
from dataclasses import dataclass

@dataclass
class MyConfig:
    read_date: int = 20200101

def create_config() -> MyConfig:
    parser = argparse.ArgumentParser()
    parser.add_argument("--read_date", type=int, default=20200101)
    args = parser.parse_args()
    return MyConfig(read_date=args.read_date)
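A sketch of the same factory with an optional `argv` parameter. That parameter is an addition to the original. Tests can build MyConfig directly or exercise the parser with an explicit argument list, and argparse falls back to `sys.argv[1:]` when `argv` is None.

```python
import argparse
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class MyConfig:
    read_date: int = 20200101


def create_config(argv: Optional[Sequence[str]] = None) -> MyConfig:
    """Parse CLI args into MyConfig; argv=None means argparse reads sys.argv[1:]."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--read_date", type=int, default=20200101)
    args = parser.parse_args(argv)
    return MyConfig(read_date=args.read_date)


# In tests: no sys.argv patching needed
explicit = MyConfig(read_date=20260319)
parsed = create_config(["--read_date", "20260319"])
```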

Pipeline composition with .transform()

Keep run_job as orchestration. Each step is a named method.

events = self.read_source().transform(self.enrich).transform(self.merge_with_existing)

Use a shared reader for partition-aware reads

Build a generic reader utility that handles partition mechanics (date filters, hour ranges, latest-partition lookups). Don’t create one-off reader classes per table — keep domain-specific filters in the ETL where they’re visible.

import pyspark.sql.functions as F

class PartitionedReader:
    @staticmethod
    def read_latest(spark, table_name, partition_col):
        row = spark.read.table(table_name).agg(F.max(partition_col)).first()
        if row is None or row[0] is None:
            return spark.createDataFrame([], spark.read.table(table_name).schema)
        return spark.read.table(table_name).filter(F.col(partition_col) == row[0])

    @staticmethod
    def read_by_date(spark, table_name, partition_col, date_value):
        return spark.read.table(table_name).filter(F.col(partition_col) == date_value)

# Reader handles partitioning
events = PartitionedReader.read_by_date(spark, "catalog.my_table", "event_date", 20260319)

# Business filters stay in the ETL
events = events.filter(F.col("event_type").isin("login", "purchase"))

Shared merge utilities

For simple outer-join-with-coalesce merges, build a reusable merge function that handles aliasing, join key coalescing, and per-column defaults. Use map_zip_with when you need per-key conflict resolution (timestamp-aware merges).

2. Code Style

Use F.col() — always use the F. prefix

Import functions as import pyspark.sql.functions as F and use F.col(), F.when(), F.lit(), etc. throughout. This makes PySpark expressions immediately recognizable and greppable.

Avoid df.colA attribute access — it binds the column to a specific DataFrame variable, which breaks after joins or when the variable is reassigned. Use F.col() with .alias() on the DataFrame if disambiguation is needed.

# BAD — binds column to a specific DataFrame variable, breaks after joins
df.select(F.lower(df1.colA), F.upper(df2.colB))

# GOOD
df.select(F.lower(F.col('colA')), F.upper(F.col('colB')))

Extract complex conditions into named variables

Limit logic inside .filter() or F.when() to 3 expressions. Extract the rest.

# BAD — redundant logic hidden in nested parentheses
F.when((F.col('status') == 'Delivered') | (((F.datediff('date_a', 'date_b') < 0) & ...)), 'Active')

# GOOD
is_delivered = (F.col('status') == 'Delivered')
date_passed = (F.datediff(F.col('date_a'), F.col('date_b')) < 0)
has_registration = (F.col('registration').rlike('.+'))
F.when(is_delivered | (date_passed & has_registration), 'Active')

Prefer select over withColumn chains

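select specifies the output schema in one pass. Each withColumn call adds its own projection to the query plan. The optimizer can collapse a short chain, but long chains (especially withColumn inside a loop) build large plans that are slow to analyze and can even fail with a StackOverflowError.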

# BAD — 3 intermediate DataFrames
df = df.withColumn("a", F.col("a").cast("double"))
df = df.withColumn("b", F.upper(F.col("b")))
df = df.withColumn("c", F.lit(1))

# GOOD — 1 DataFrame, explicit schema contract
df = df.select(
    F.col("a").cast("double"),
    F.upper(F.col("b")).alias("b"),
    F.lit(1).alias("c"),
)

Use alias over withColumnRenamed

# BAD
df.select('key', 'comments').withColumnRenamed('comments', 'num_comments')

# GOOD
df.select('key', F.col('comments').alias('num_comments'))

Chaining limits

Max 5 statements per chain. Separate by operation type (select/filter vs withColumn vs join).

# BAD — mixed concerns in one chain
df = (df.select('a', 'b', 'key')
    .filter(F.col('a') == 'x')
    .withColumn('ratio', F.col('a') / F.col('b'))
    .join(df2, 'key', how='inner')
    .drop('b'))

# GOOD — separated by concern
df = df.select('a', 'b', 'key').filter(F.col('a') == 'x')
df = df.withColumn('ratio', F.col('a') / F.col('b'))
df = df.join(df2, 'key', how='inner').drop('b')

3. Joins

Always specify how= explicitly

# BAD
df.join(other, 'key')

# GOOD
df.join(other, 'key', how='inner')

Prefer left joins over right joins

Flip the DataFrame order and use left instead of right for readability — the primary DataFrame stays on the left.

flights = flights.join(aircraft, 'aircraft_id', how='left')

Use .alias() for disambiguation after joins

# BAD — renaming every column before join
for c in columns:
    flights = flights.withColumnRenamed(c, 'flights_' + c)

# GOOD — alias the whole DataFrame
flights = flights.alias('f')
parking = parking.alias('p')
result = flights.join(parking, 'code', how='left').select(
    F.col('f.start_time').alias('flight_start'),
    F.col('p.total_time').alias('parking_total'),
)

Broadcast small dimension tables

When joining a large fact DataFrame with a small lookup/dimension table, wrap the small side in F.broadcast(). Spark then ships a full copy of the small table to every executor and joins locally, so the large side never has to be shuffled.

Use broadcast for tables small enough to be collected on the driver and held in each executor's memory. These are typically dimension/lookup tables (category lookups, country codes, config mappings). Spark auto-broadcasts tables under 10MB by default (spark.sql.autoBroadcastJoinThreshold), but an explicit hint is useful when Spark can’t infer the size (e.g., after filters or transformations).

To check if a table is broadcast-worthy during development:

  • Spark UI: after a run, check the SQL tab — scan sizes are shown per table
  • Quick row count: spark.read.table("catalog.my_dim").count() (dev only, not in production code)
  • Query plan: df.explain() shows BroadcastHashJoin if Spark broadcasts the table, SortMergeJoin if it doesn’t

df.join(F.broadcast(category_dim), 'category_id', how='left')

Never use .dropDuplicates() as a crutch

If duplicate rows appear, find the root cause. .dropDuplicates() masks the problem and adds shuffle overhead.

4. Window Functions

Use from pyspark.sql import Window as W alongside import pyspark.sql.functions as F.

Always specify an explicit frame

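Without an explicit frame, the default depends on whether orderBy is present. With orderBy, Spark uses RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: a running aggregate in which rows tied on the order value share a result. Without orderBy, the frame is the whole partition.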

# BAD — F.sum gives running sum with orderBy, total without. Surprising.
w = W.partitionBy('key').orderBy('num')

# GOOD — explicit about what you want
w = W.partitionBy('key').orderBy('num').rowsBetween(W.unboundedPreceding, W.unboundedFollowing)

row_number + filter vs first — know the difference

  • row_number + filter = drop rows, keep the best one
  • first over window = overwrite a column value, keep all rows

Use ignorenulls=True with first and last

Without it, a null in the first row gives null for the entire partition.

# BAD — returns None if first row is null
F.first('version').over(window)

# GOOD
F.first('version', ignorenulls=True).over(window)

Avoid empty partitionBy()

It forces all data into one partition. Use .agg() instead for global aggregations.

# BAD — single partition, kills performance
w = W.partitionBy()
df.select(F.sum('num').over(w))

# GOOD
df.agg(F.sum('num').alias('total'))

5. Map & Array Higher-Order Functions

Use map_zip_with when merging maps with complex logic

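map_concat is fine for simple merges where the two maps never share a key. On overlapping keys it either fails (under the default spark.sql.mapKeyDedupPolicy=EXCEPTION it raises a duplicate-map-key error) or, with LAST_WIN, silently keeps the value from the last map. When you need custom logic per key (e.g., keep the newer timestamp, pick the higher value), use map_zip_with. It calls your merge function for every key in either map and passes null for the side where the key is missing.

# BAD — no control over conflict resolution
F.map_concat(F.col('existing_map'), F.col('new_map'))

# GOOD — keep the entry with the later timestamp
F.map_zip_with(F.col('new_map'), F.col('existing_map'),
    lambda key, v1, v2: (
        F.when(v1.isNull(), v2)
        .when(v2.isNull(), v1)
        .when(v1['event_ts'] >= v2['event_ts'], v1)
        .otherwise(v2)
    )
)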

Use transform + array_max to extract from nested structs

# Extract max event_ts from a map of structs
F.array_max(F.transform(F.map_values(F.col('my_map')), lambda x: x['event_ts']))

Avoid UDFs — use native Spark functions first

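Python UDFs are opaque to Catalyst, so the optimizer cannot see into or optimize the logic inside them. They also add serialization overhead, because every row is shipped to a Python worker and back. Before writing one, check whether a built-in Spark function or higher-order function can do the job.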

6. Cumulative / Snapshot Table Patterns

Merges must be idempotent

Re-running with the same data should produce the same result, not create duplicates.

Merges must be order-independent

Backfilling old data should not overwrite newer data. Use an explicit ordering criterion (e.g., event timestamp, version number, partition date) to resolve conflicts — don’t rely on positional precedence like coalesce argument order.

Validate primary key uniqueness after writes

Add audit steps that validate primary key uniqueness and check for nulls in key columns.

7. Data Quality & Performance

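Use typed F.lit(None) for empty columns, never empty strings

Empty strings and placeholders like 'NA' look like real values to filters, joins and null checks. Cast the null to the column's intended type. A bare F.lit(None) has the void (NullType) type, which many sinks, including Parquet files, cannot write.

# BAD
df = df.withColumn('foo', F.lit(''))
df = df.withColumn('foo', F.lit('NA'))

# GOOD
df = df.withColumn('foo', F.lit(None).cast('string'))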

Avoid .otherwise() as a general catch-all

Unknown values silently collapse into the otherwise bucket, hiding data quality issues.

# BAD — a new platform_type you didn't anticipate becomes "Other" silently
(F.when(F.col('platform_type') == 'android', 'Mobile')
  .when(F.col('platform_type') == 'ios', 'Mobile')
  .otherwise('Other'))

# GOOD — unmapped values stay null, surfacing gaps in your logic
(F.when(F.col('platform_type') == 'android', 'Mobile')
  .when(F.col('platform_type') == 'ios', 'Mobile'))

No .show(), .collect(), .printSchema() in production

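These add driver overhead or unnecessary work. .collect() pulls the whole result onto the driver and can run it out of memory, .show() launches a job just to print rows, and .printSchema() only adds log noise. Use them only for local debugging, never in deployed ETL code. .count() is acceptable when used intentionally (e.g., logging row counts for monitoring, forcing materialization before a DAG fork).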

Use persist() intentionally

Only persist a DataFrame when it feeds more than one action. If the write is its only action, Spark computes it once anyway and caching just adds memory pressure. .persist() + .count() is a common pattern to force materialization and log row counts for debugging. Use it when needed, but be aware that the count computes the full DataFrame. Call .unpersist() once the last consumer has run.

Choose the storage level based on your use case:

  • MEMORY_AND_DISK — safe default, spills to disk if memory is tight
  • MEMORY_ONLY — faster but risks recomputation if evicted
  • DISK_ONLY — for very large DataFrames that don’t fit in memory

8. Iceberg Write Patterns

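Write by column name for schema evolution safety

In open-source Apache Spark, DataFrameWriter.insertInto() matches columns by position. A DataFrame whose column order differs from the table's therefore writes values into the wrong columns, and does so silently when the types are compatible. Match by name instead. The .byName() call below is not part of open-source Spark's DataFrameWriter, so use it only if your Spark distribution provides it. In stock Spark, DataFrameWriterV2 (df.writeTo) resolves columns by name.

df.write.byName().mode("overwrite").insertInto("catalog.my_table")

# Stock Spark alternative: name-based write that replaces only the partitions present in df
df.writeTo("catalog.my_table").overwritePartitions()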

Use __partitions metadata table for latest partition reads

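Iceberg exposes a partitions metadata table with one row per partition. Query it to find the latest partition instead of scanning the full table. In open-source Iceberg's Spark integration, metadata tables are addressed by appending the metadata table name to the table identifier (catalog.db.my_table.partitions). The my_table__partitions form below may be a platform-specific convention, so check what your catalog supports.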

partition_df = spark.read.table("catalog.my_table__partitions").select(
    "partition.partition_date", "partition.partition_hour"
)
max_partition = partition_df.orderBy(
    F.col("partition_date").desc(), F.col("partition_hour").desc()
).first()
if max_partition is None:
    raise ValueError("No partitions found in catalog.my_table")
latest_date = max_partition["partition_date"]

Understand write.distribution-mode

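write.distribution-mode is an Iceberg table property that controls how Spark redistributes rows before writing:

  • "none": no re-shuffle before writing. Fastest, but each task writes a file for every partition it holds rows for, so output can fragment into many small files, and file sizes depend on upstream partitioning.
  • "hash": redistributes rows by partition key so each partition is written by fewer tasks. Produces fewer, larger files at the cost of a shuffle.
  • "range": range-partitions rows by partition key and the table's sort order (if one is defined) before writing. Good for ordered scan performance, but the most expensive, because Spark samples the data to compute ranges and then shuffles.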
