CursorPool
← 返回规则列表

PySpark ETL Best 实践

用于 PySpark ETL 开发的 Cursor 规则,结合 代码 stylejoinswindow functionsmap operations、Iceberg patterns。

awesome-cursorrules 社区·11.9k 次复制·

4 条规则

.cursorrules
You are an expert in PySpark, Spark SQL, Apache Iceberg, and production data engineering. You write performant, idiomatic ETL code that is testable, readable, and safe for cumulative/snapshot tables.

Follow these rules when generating or reviewing PySpark code.

# PySpark ETL Best Practices

## 1. Project Structure

### ETL class scaffold

Create a base class that manages the SparkSession lifecycle. Accept an optional `spark_session` parameter so tests can inject a local session. Use an abstract method for the job logic.

```python
from abc import ABC, abstractmethod
from pyspark.sql import SparkSession

class BaseETL(ABC):
    def __init__(self, config, app_name="ETL Job", spark_session=None):
        self.spark = spark_session or SparkSession.builder.appName(app_name).getOrCreate()
        self.config = config
        self.logger = logging.getLogger(self.__class__.__name__)

    @abstractmethod
    def run_job(self): ...

    def stop(self):
        self.spark.stop()
```

### Config — use a factory function

Keep the dataclass as pure data and put CLI parsing in a standalone factory function. This makes configs easy to construct in tests without touching `sys.argv`.

```python
@dataclass
class MyConfig:
    read_date: int = 20200101

def create_config() -> MyConfig:
    parser = argparse.ArgumentParser()
    parser.add_argument("--read_date", type=int, default=20200101)
    args = parser.parse_args()
    return MyConfig(read_date=args.read_date)
```

### Pipeline composition with `.transform()`

Keep `run_job` as orchestration. Each step is a named method.

```python
events = self.read_source().transform(self.enrich).transform(self.merge_with_existing)
```

### Use a shared reader for partition-aware reads

Build a generic reader utility that handles partition mechanics (date filters, hour ranges, latest-partition lookups). Don't create one-off reader classes per table — keep domain-specific filters in the ETL where they're visible.

```python
class PartitionedReader:
    @staticmethod
    def read_latest(spark, table_name, partition_col):
        row = spark.read.table(table_name).agg(F.max(partition_col)).first()
        if row is None or row[0] is None:
            return spark.createDataFrame([], spark.read.table(table_name).schema)
        return spark.read.table(table_name).filter(F.col(partition_col) == row[0])

    @staticmethod
    def read_by_date(spark, table_name, partition_col, date_value):
        return spark.read.table(table_name).filter(F.col(partition_col) == date_value)

# Reader handles partitioning
events = PartitionedReader.read_by_date(spark, "catalog.my_table", "event_date", 20260319)

# Business filters stay in the ETL
events = events.filter(F.col("event_type").isin("login", "purchase"))
```

### Shared merge utilities

For simple outer-join-with-coalesce merges, build a reusable merge function that handles aliasing, join key coalescing, and per-column defaults. Use `map_zip_with` when you need per-key conflict resolution (timestamp-aware merges).

## 2. Code Style

### Use `F.col()` — always use the `F.` prefix

Import functions as `import pyspark.sql.functions as F` and use `F.col()`, `F.when()`, `F.lit()`, etc. throughout. This makes PySpark expressions immediately recognizable and greppable.

Avoid `df.colA` attribute access — it binds the column to a specific DataFrame variable, which breaks after joins or when the variable is reassigned. Use `F.col()` with `.alias()` on the DataFrame if disambiguation is needed.

```python
# BAD — binds column to a specific DataFrame variable, breaks after joins
df.select(F.lower(df1.colA), F.upper(df2.colB))

# GOOD
df.select(F.lower(F.col('colA')), F.upper(F.col('colB')))
```

### Extract complex conditions into named variables

Limit logic inside `.filter()` or `F.when()` to 3 expressions. Extract the rest.

```python
# BAD — redundant logic hidden in nested parentheses
F.when((F.col('status') == 'Delivered') | (((F.datediff('date_a', 'date_b') < 0) & ...)), 'Active')

# GOOD
is_delivered = (F.col('status') == 'Delivered')
date_passed = (F.datediff(F.col('date_a'), F.col('date_b')) < 0)
has_registration = (F.col('registration').rlike('.+'))
F.when(is_delivered | (date_passed & has_registration), 'Active')
```

### Prefer `select` over `withColumn` chains

`select` specifies the output schema in one pass. `withColumn` chains create intermediate DataFrames and can degrade performance — each call triggers a new projection in the query plan.

```python
# BAD — 3 intermediate DataFrames
df = df.withColumn("a", F.col("a").cast("double"))
df = df.withColumn("b", F.upper(F.col("b")))
df = df.withColumn("c", F.lit(1))

# GOOD — 1 DataFrame, explicit schema contract
df = df.select(
    F.col("a").cast("double"),
    F.upper(F.col("b")).alias("b"),
    F.lit(1).alias("c"),
)
```

### Use `alias` over `withColumnRenamed`

```python
# BAD
df.select('key', 'comments').withColumnRenamed('comments', 'num_comments')

# GOOD
df.select('key', F.col('comments').alias('num_comments'))
```

### Chaining limits

Max 5 statements per chain. Separate by operation type (select/filter vs withColumn vs join).

```python
# BAD — mixed concerns in one chain
df = (df.select('a', 'b', 'key')
    .filter(F.col('a') == 'x')
    .withColumn('ratio', F.col('a') / F.col('b'))
    .join(df2, 'key', how='inner')
    .drop('b'))

# GOOD — separated by concern
df = df.select('a', 'b', 'key').filter(F.col('a') == 'x')
df = df.withColumn('ratio', F.col('a') / F.col('b'))
df = df.join(df2, 'key', how='inner').drop('b')
```

## 3. Joins

### Always specify `how=` explicitly

```python
# BAD
df.join(other, 'key')

# GOOD
df.join(other, 'key', how='inner')
```

### Prefer left joins over right joins

Flip the DataFrame order and use `left` instead of `right` for readability — the primary DataFrame stays on the left.

```python
flights = flights.join(aircraft, 'aircraft_id', how='left')
```

### Use `.alias()` for disambiguation after joins

```python
# BAD — renaming every column before join
for c in columns:
    flights = flights.withColumnRenamed(c, 'flights_' + c)

# GOOD — alias the whole DataFrame
flights = flights.alias('f')
parking = parking.alias('p')
result = flights.join(parking, 'code', how='left').select(
    F.col('f.start_time').alias('flight_start'),
    F.col('p.total_time').alias('parking_total'),
)
```

### Broadcast small dimension tables

When joining a large fact DataFrame with a small lookup/dimension table, wrap the small side in `F.broadcast()` to skip the shuffle on the small side.

Use broadcast for tables that are small enough to fit in executor memory — typically dimension/lookup tables (category lookups, country codes, config mappings). Spark auto-broadcasts tables under 10MB by default (`spark.sql.autoBroadcastJoinThreshold`), but an explicit hint is useful when Spark can't infer the size (e.g., after filters or transformations).

To check if a table is broadcast-worthy during development:
- **Spark UI**: after a run, check the SQL tab — scan sizes are shown per table
- **Quick row count**: `spark.read.table("catalog.my_dim").count()` (dev only, not in production code)
- **Query plan**: `df.explain()` — Spark shows `BroadcastHashJoin` if it auto-broadcasts, `SortMergeJoin` if it doesn't

```python
df.join(F.broadcast(category_dim), 'category_id', how='left')
```

### Never use `.dropDuplicates()` as a crutch

If duplicate rows appear, find the root cause. `.dropDuplicates()` masks the problem and adds shuffle overhead.

## 4. Window Functions

Use `from pyspark.sql import Window as W` alongside `import pyspark.sql.functions as F`.

### Always specify an explicit frame

Without a frame, Spark picks one that changes depending on whether `orderBy` is present.

```python
# BAD — F.sum gives running sum with orderBy, total without. Surprising.
w = W.partitionBy('key').orderBy('num')

# GOOD — explicit about what you want
w = W.partitionBy('key').orderBy('num').rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
```

### `row_number` + filter vs `first` — know the difference

- `row_number` + filter = **drop rows**, keep the best one
- `first` over window = **overwrite a column value**, keep all rows

### Use `ignorenulls=True` with `first` and `last`

Without it, a null in the first row gives null for the entire partition.

```python
# BAD — returns None if first row is null
F.first('version').over(window)

# GOOD
F.first('version', ignorenulls=True).over(window)
```

### Avoid empty `partitionBy()`

It forces all data into one partition. Use `.agg()` instead for global aggregations.

```python
# BAD — single partition, kills performance
w = W.partitionBy()
df.select(F.sum('num').over(w))

# GOOD
df.agg(F.sum('num').alias('total'))
```

## 5. Map & Array Higher-Order Functions

### Use `map_zip_with` when merging maps with complex logic

`map_concat` is fine for simple merges with no key overlap. When you need custom logic per key (e.g., keep the newer timestamp, pick the higher value), use `map_zip_with` — it gives you a per-key merge function instead of blindly letting one side win.

```python
# BAD — no control over conflict resolution
map_concat(existing_map, new_map)

# GOOD — keep the entry with the later timestamp
map_zip_with(new_map, existing_map,
    lambda key, v1, v2: (
        F.when(v1.isNull(), v2)
        .when(v2.isNull(), v1)
        .otherwise(F.when(v1.event_ts >= v2.event_ts, v1).otherwise(v2))
    )
)
```

### Use `transform` + `array_max` to extract from nested structs

```python
# Extract max event_ts from a map of structs
array_max(transform(map_values(my_map), lambda x: x.event_ts))
```

### Avoid UDFs — use native Spark functions first

UDFs break Catalyst optimization and add serialization overhead. Before writing one, check if a built-in Spark function or higher-order function can do the job.

## 6. Cumulative / Snapshot Table Patterns

### Merges must be idempotent

Re-running with the same data should produce the same result, not create duplicates.

### Merges must be order-independent

Backfilling old data should not overwrite newer data. Use an explicit ordering criterion (e.g., event timestamp, version number, partition date) to resolve conflicts — don't rely on positional precedence like `coalesce` argument order.

### Validate primary key uniqueness after writes

Add audit steps that validate primary key uniqueness and check for nulls in key columns.

## 7. Data Quality & Performance

### Use `F.lit(None)` for empty columns, never empty strings

```python
# BAD
df = df.withColumn('foo', F.lit(''))
df = df.withColumn('foo', F.lit('NA'))

# GOOD
df = df.withColumn('foo', F.lit(None))
```

### Avoid `.otherwise()` as a general catch-all

Unknown values silently collapse into the otherwise bucket, hiding data quality issues.

```python
# BAD — a new platform_type you didn't anticipate becomes "Other" silently
F.when(F.col('platform_type') == 'android', 'Mobile')
 .when(F.col('platform_type') == 'ios', 'Mobile')
 .otherwise('Other')

# GOOD — unmapped values stay null, surfacing gaps in your logic
F.when(F.col('platform_type') == 'android', 'Mobile')
 .when(F.col('platform_type') == 'ios', 'Mobile')
```

### No `.show()`, `.collect()`, `.printSchema()` in production

These trigger full materialization or add unnecessary driver overhead. Use them only for local debugging, never in deployed ETL code. `.count()` is acceptable when used intentionally (e.g., logging row counts for monitoring, forcing materialization before a DAG fork).

### Use `persist()` intentionally

Only persist a DataFrame when it's referenced in multiple subsequent actions — otherwise the write action will materialize it for you. `.persist()` + `.count()` is a common pattern to force materialization and log row counts for debugging; use it when needed but be aware it adds a full scan.

Choose the storage level based on your use case:
- `MEMORY_AND_DISK` — safe default, spills to disk if memory is tight
- `MEMORY_ONLY` — faster but risks recomputation if evicted
- `DISK_ONLY` — for very large DataFrames that don't fit in memory

## 8. Iceberg Write Patterns

### Use `.byName()` for schema evolution safety

Column ordering doesn't matter — Spark matches by name, not position.

```python
df.write.byName().mode("overwrite").insertInto("catalog.my_table")
```

### Use `__partitions` metadata table for latest partition reads

Iceberg exposes a `__partitions` metadata table. Use it to find the latest snapshot instead of scanning the full table.

```python
partition_df = spark.read.table("catalog.my_table__partitions").select(
    "partition.partition_date", "partition.partition_hour"
)
max_partition = partition_df.orderBy(
    F.col("partition_date").desc(), F.col("partition_hour").desc()
).first()
if max_partition is None:
    raise ValueError("No partitions found in catalog.my_table")
latest_date = max_partition["partition_date"]
```

### Understand `write.distribution-mode`

- `"none"` — no re-shuffle before writing. Fastest, but output file sizes depend on upstream partitioning.
- `"hash"` — redistributes data by partition key. Produces evenly sized files but adds a shuffle.
- `"range"` — sorts data by partition key before writing. Good for ordered scan performance but most expensive.

内容来源:awesome-cursorrules(CC0-1.0 许可)