Cursor rules for PySpark ETL development with code style, joins, window functions, map operations, and Iceberg patterns.
File: `.cursorrules` or `.cursor/rules/pyspark-etl-best-practices.mdc`
You are an expert in PySpark, Spark SQL, Apache Iceberg, and production data engineering. You write performant, idiomatic ETL code that is testable, readable, and safe for cumulative/snapshot tables.
Follow these rules when generating or reviewing PySpark code.
# PySpark ETL Best Practices
## 1. Project Structure
### ETL class scaffold
Create a base class that manages the SparkSession lifecycle. Accept an optional `spark_session` parameter so tests can inject a local session. Use an abstract method for the job logic.
```python
import logging
from abc import ABC, abstractmethod

from pyspark.sql import SparkSession


class BaseETL(ABC):
    def __init__(self, config, app_name="ETL Job", spark_session=None):
        # Tests inject a local session; otherwise build (or reuse) one named app_name
        self.spark = spark_session or SparkSession.builder.appName(app_name).getOrCreate()
        self.config = config
        self.logger = logging.getLogger(self.__class__.__name__)

    @abstractmethod
    def run_job(self): ...

    def stop(self):
        self.spark.stop()
```
### Config — use a factory function
Keep the dataclass as pure data and put CLI parsing in a standalone factory function. This makes configs easy to construct in tests without touching `sys.argv`.
```python
import argparse
from dataclasses import dataclass

@dataclass
class MyConfig:
    read_date: int = 20200101

def create_config() -> MyConfig:
    parser = argparse.ArgumentParser()
    parser.add_argument("--read_date", type=int, default=20200101)
    args = parser.parse_args()
    return MyConfig(read_date=args.read_date)
```
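A small variation on the factory above (an assumption, not part of the original rule): let `create_config` take an optional argument list. `argparse` falls back to `sys.argv[1:]` when `argv` is `None`, so production calls stay unchanged while tests can pass explicit flags.

```python
import argparse
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class MyConfig:
    read_date: int = 20200101


def create_config(argv: Optional[Sequence[str]] = None) -> MyConfig:
    # argv=None makes argparse read sys.argv[1:], i.e. the real command line
    parser = argparse.ArgumentParser()
    parser.add_argument("--read_date", type=int, default=20200101)
    args = parser.parse_args(argv)
    return MyConfig(read_date=args.read_date)


# In tests: construct the dataclass directly, or parse an explicit argument list
direct = MyConfig(read_date=20260319)
parsed = create_config(["--read_date", "20260319"])
```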
### Pipeline composition with `.transform()`
Keep `run_job` as orchestration. Each step is a named method.
```python
events = self.read_source().transform(self.enrich).transform(self.merge_with_existing)
```
### Use a shared reader for partition-aware reads
Build a generic reader utility that handles partition mechanics (date filters, hour ranges, latest-partition lookups). Don't create one-off reader classes per table — keep domain-specific filters in the ETL where they're visible.
```python
import pyspark.sql.functions as F


class PartitionedReader:
    @staticmethod
    def read_latest(spark, table_name, partition_col):
        df = spark.read.table(table_name)
        # A global aggregate always returns one row; its value is None for an empty table
        latest = df.agg(F.max(partition_col)).first()[0]
        if latest is None:
            return spark.createDataFrame([], df.schema)
        return df.filter(F.col(partition_col) == latest)

    @staticmethod
    def read_by_date(spark, table_name, partition_col, date_value):
        return spark.read.table(table_name).filter(F.col(partition_col) == date_value)


# Reader handles partitioning
events = PartitionedReader.read_by_date(spark, "catalog.my_table", "event_date", 20260319)
# Business filters stay in the ETL
events = events.filter(F.col("event_type").isin("login", "purchase"))
```
### Shared merge utilities
For simple outer-join-with-coalesce merges, build a reusable merge function that handles aliasing, join key coalescing, and per-column defaults. Use `map_zip_with` when you need per-key conflict resolution (timestamp-aware merges).
## 2. Code Style
### Use `F.col()` — always use the `F.` prefix
Import functions as `import pyspark.sql.functions as F` and use `F.col()`, `F.when()`, `F.lit()`, etc. throughout. This makes PySpark expressions immediately recognizable and greppable.
Avoid `df.colA` attribute access — it binds the column to a specific DataFrame variable, which breaks after joins or when the variable is reassigned. Use `F.col()` with `.alias()` on the DataFrame if disambiguation is needed.
```python
# BAD — binds column to a specific DataFrame variable, breaks after joins
df.select(F.lower(df1.colA), F.upper(df2.colB))
# GOOD
df.select(F.lower(F.col('colA')), F.upper(F.col('colB')))
```
### Extract complex conditions into named variables
Limit logic inside `.filter()` or `F.when()` to 3 expressions. Extract the rest.
```python
# BAD — redundant logic hidden in nested parentheses
F.when((F.col('status') == 'Delivered') | (((F.datediff('date_a', 'date_b') < 0) & ...)), 'Active')
# GOOD
is_delivered = (F.col('status') == 'Delivered')
date_passed = (F.datediff(F.col('date_a'), F.col('date_b')) < 0)
has_registration = (F.col('registration').rlike('.+'))
F.when(is_delivered | (date_passed & has_registration), 'Active')
```
### Prefer `select` over `withColumn` chains
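`select` specifies the output schema in one projection. Each `withColumn` call returns a new DataFrame and adds another projection to the query plan, so long chains (especially `withColumn` inside a loop) build large plans that slow down query analysis and can even overflow the driver's stack.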
```python
# BAD — 3 intermediate DataFrames
df = df.withColumn("a", F.col("a").cast("double"))
df = df.withColumn("b", F.upper(F.col("b")))
df = df.withColumn("c", F.lit(1))
# GOOD — 1 DataFrame, explicit schema contract
df = df.select(
F.col("a").cast("double"),
F.upper(F.col("b")).alias("b"),
F.lit(1).alias("c"),
)
```
### Use `alias` over `withColumnRenamed`
```python
# BAD
df.select('key', 'comments').withColumnRenamed('comments', 'num_comments')
# GOOD
df.select('key', F.col('comments').alias('num_comments'))
```
### Chaining limits
Max 5 statements per chain. Separate by operation type (select/filter vs withColumn vs join).
```python
# BAD — mixed concerns in one chain
df = (df.select('a', 'b', 'key')
.filter(F.col('a') == 'x')
.withColumn('ratio', F.col('a') / F.col('b'))
.join(df2, 'key', how='inner')
.drop('b'))
# GOOD — separated by concern
df = df.select('a', 'b', 'key').filter(F.col('a') == 'x')
df = df.withColumn('ratio', F.col('a') / F.col('b'))
df = df.join(df2, 'key', how='inner').drop('b')
```
## 3. Joins
### Always specify `how=` explicitly
```python
# BAD
df.join(other, 'key')
# GOOD
df.join(other, 'key', how='inner')
```
### Prefer left joins over right joins
Flip the DataFrame order and use `left` instead of `right` for readability — the primary DataFrame stays on the left.
```python
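# BAD: the primary DataFrame ends up on the right
flights = aircraft.join(flights, 'aircraft_id', how='right')
# GOOD
flights = flights.join(aircraft, 'aircraft_id', how='left')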
```
### Use `.alias()` for disambiguation after joins
```python
# BAD — renaming every column before join
for c in columns:
    flights = flights.withColumnRenamed(c, 'flights_' + c)
# GOOD — alias the whole DataFrame
flights = flights.alias('f')
parking = parking.alias('p')
result = flights.join(parking, 'code', how='left').select(
F.col('f.start_time').alias('flight_start'),
F.col('p.total_time').alias('parking_total'),
)
```
### Broadcast small dimension tables
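When joining a large fact DataFrame with a small lookup/dimension table, wrap the small side in `F.broadcast()`. Spark then sends a full copy of the small table to every executor and uses a broadcast hash join, so neither side has to be shuffled; the main saving is not shuffling the large side.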
Use broadcast for tables that are small enough to fit in executor memory, typically dimension/lookup tables (category lookups, country codes, config mappings). Spark broadcasts a join side automatically when its estimated size is below `spark.sql.autoBroadcastJoinThreshold` (10 MB by default), but an explicit hint helps when Spark's size estimate is too high, for example for a large table that a filter reduces to a few rows.
To check if a table is broadcast-worthy during development:
- **Spark UI**: after a run, check the SQL tab — scan sizes are shown per table
- **Quick row count**: `spark.read.table("catalog.my_dim").count()` (dev only, not in production code)
- **Query plan**: `df.explain()` — Spark shows `BroadcastHashJoin` if it auto-broadcasts, `SortMergeJoin` if it doesn't
```python
df.join(F.broadcast(category_dim), 'category_id', how='left')
```
### Never use `.dropDuplicates()` as a crutch
If duplicate rows appear, find the root cause. `.dropDuplicates()` masks the problem and adds shuffle overhead.
## 4. Window Functions
Use `from pyspark.sql import Window as W` alongside `import pyspark.sql.functions as F`.
### Always specify an explicit frame
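Without an explicit frame, the default depends on whether `orderBy` is present: with ordering it is a running frame from the start of the partition to the current row (`rangeBetween(W.unboundedPreceding, W.currentRow)`); without ordering it is the whole partition.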
```python
# BAD — F.sum gives running sum with orderBy, total without. Surprising.
w = W.partitionBy('key').orderBy('num')
# GOOD — explicit about what you want
w = W.partitionBy('key').orderBy('num').rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
```
### `row_number` + filter vs `first` — know the difference
- `row_number` + filter = **drop rows**, keep the best one
- `first` over window = **overwrite a column value**, keep all rows
### Use `ignorenulls=True` with `first` and `last`
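Without it, `first` returns null whenever the first row of the frame has a null in that column, even if later rows have values; over a full-partition frame that means null for the entire partition.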
```python
# BAD — returns None if first row is null
F.first('version').over(window)
# GOOD
F.first('version', ignorenulls=True).over(window)
```
### Avoid empty `partitionBy()`
It forces all data into one partition. Use `.agg()` instead for global aggregations.
```python
# BAD — single partition, kills performance
w = W.partitionBy()
df.select(F.sum('num').over(w))
# GOOD
df.agg(F.sum('num').alias('total'))
```
## 5. Map & Array Higher-Order Functions
### Use `map_zip_with` when merging maps with complex logic
`map_concat` is fine for simple merges with no key overlap. When you need custom logic per key (e.g., keep the newer timestamp, pick the higher value), use `map_zip_with` — it gives you a per-key merge function instead of blindly letting one side win.
```python
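# BAD — no control over conflict resolution
# (overlapping keys raise an error under the default spark.sql.mapKeyDedupPolicy=EXCEPTION;
# under LAST_WIN the second map silently wins)
F.map_concat(F.col('existing_map'), F.col('new_map'))
# GOOD — keep the entry with the later timestamp (v1 is from new_map, v2 from existing_map)
F.map_zip_with('new_map', 'existing_map',
    lambda key, v1, v2: (
        F.when(v1.isNull(), v2)
        .when(v2.isNull(), v1)
        .otherwise(F.when(v1['event_ts'] >= v2['event_ts'], v1).otherwise(v2))
    )
)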
```
### Use `transform` + `array_max` to extract from nested structs
```python
# Extract the max event_ts from a map of structs
F.array_max(F.transform(F.map_values('my_map'), lambda x: x['event_ts']))
```
### Avoid UDFs — use native Spark functions first
UDFs break Catalyst optimization and add serialization overhead. Before writing one, check if a built-in Spark function or higher-order function can do the job.
## 6. Cumulative / Snapshot Table Patterns
### Merges must be idempotent
Re-running with the same data should produce the same result, not create duplicates.
### Merges must be order-independent
Backfilling old data should not overwrite newer data. Use an explicit ordering criterion (e.g., event timestamp, version number, partition date) to resolve conflicts — don't rely on positional precedence like `coalesce` argument order.
### Validate primary key uniqueness after writes
Add audit steps that validate primary key uniqueness and check for nulls in key columns.
## 7. Data Quality & Performance
### Use `F.lit(None)` for empty columns, never empty strings
```python
# BAD
df = df.withColumn('foo', F.lit(''))
df = df.withColumn('foo', F.lit('NA'))
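# GOOD: cast so the column has a real type; a bare F.lit(None) is NullType,
# which many sinks (Parquet, for example) refuse to write
df = df.withColumn('foo', F.lit(None).cast('string'))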
```
### Avoid `.otherwise()` as a general catch-all
Unknown values silently collapse into the otherwise bucket, hiding data quality issues.
```python
# BAD — a new platform_type you didn't anticipate becomes "Other" silently
platform_group = (
    F.when(F.col('platform_type') == 'android', 'Mobile')
    .when(F.col('platform_type') == 'ios', 'Mobile')
    .otherwise('Other')
)
# GOOD — unmapped values stay null, surfacing gaps in your logic
platform_group = (
    F.when(F.col('platform_type') == 'android', 'Mobile')
    .when(F.col('platform_type') == 'ios', 'Mobile')
)
```
### No `.show()`, `.collect()`, `.printSchema()` in production
These trigger full materialization or add unnecessary driver overhead. Use them only for local debugging, never in deployed ETL code. `.count()` is acceptable when used intentionally (e.g., logging row counts for monitoring, forcing materialization before a DAG fork).
### Use `persist()` intentionally
Only persist a DataFrame when several later actions reuse it; if the write is the only action, it computes the DataFrame once anyway and caching just adds memory pressure. `.persist()` followed by `.count()` is a common way to materialize the cache and log a row count for debugging; use it when needed, but the count is an extra full pass over the data. Call `.unpersist()` once the cached DataFrame is no longer needed.
Choose the storage level based on your use case:
- `MEMORY_AND_DISK` — safe default, spills to disk if memory is tight
- `MEMORY_ONLY` — faster but risks recomputation if evicted
- `DISK_ONLY` — for very large DataFrames that don't fit in memory
## 8. Iceberg Write Patterns
### Use `.byName()` for schema evolution safety
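Column ordering doesn't matter: with by-name resolution, Spark matches columns by name, not position. Check that your Spark distribution supports `byName()`; open-source Spark's `DataFrameWriter.insertInto` resolves columns by position, while `DataFrameWriterV2` (`df.writeTo(...)`) resolves them by name.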
```python
df.write.byName().mode("overwrite").insertInto("catalog.my_table")
```
### Use `__partitions` metadata table for latest partition reads
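Iceberg exposes a `partitions` metadata table with one row per partition. Query it to find the latest partition instead of scanning the full table. In open-source Iceberg on Spark, metadata tables are addressed with a dot suffix (`catalog.db.my_table.partitions`); the `__partitions` suffix used below may be specific to your platform or catalog.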
```python
partition_df = spark.read.table("catalog.my_table__partitions").select(
"partition.partition_date", "partition.partition_hour"
)
max_partition = partition_df.orderBy(
F.col("partition_date").desc(), F.col("partition_hour").desc()
).first()
if max_partition is None:
raise ValueError("No partitions found in catalog.my_table")
latest_date = max_partition["partition_date"]
```
### Understand `write.distribution-mode`
- `"none"`: no shuffle before writing. Fastest, but file count and size follow the upstream partitioning, which can mean many small files.
- `"hash"`: hash-distributes rows by partition key so each table partition is written by fewer tasks, giving fewer, larger files. Adds a shuffle.
- `"range"`: range-distributes rows by the table's sort order (or by partition key when no sort order is defined), which requires sampling and sorting. Best clustering for ordered scans, but the most expensive.
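The mode is an Iceberg table property. A sketch that builds the `ALTER TABLE ... SET TBLPROPERTIES` statement and validates the mode name; the helper names are hypothetical, and running `set_distribution_mode` requires a session with an Iceberg catalog configured.

```python
VALID_DISTRIBUTION_MODES = ("none", "hash", "range")


def distribution_mode_sql(table_name: str, mode: str) -> str:
    """Build the statement that sets Iceberg's write.distribution-mode table property."""
    if mode not in VALID_DISTRIBUTION_MODES:
        raise ValueError(f"Unknown distribution mode {mode!r}; expected one of {VALID_DISTRIBUTION_MODES}")
    return f"ALTER TABLE {table_name} SET TBLPROPERTIES ('write.distribution-mode' = '{mode}')"


def set_distribution_mode(spark, table_name: str, mode: str) -> None:
    # Needs an Iceberg catalog on the session; not configured in this sketch
    spark.sql(distribution_mode_sql(table_name, mode))
```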