paryoja opened a new issue, #54378:
URL: https://github.com/apache/spark/issues/54378
### What type of issue is this?
Bug
### Spark version
4.0.1 (with Iceberg 1.10.1)
### Describe the bug
When using Storage-Partitioned Join (SPJ) with
`spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled=true`,
both `dropDuplicates()` and Window-based dedup (`row_number()`) produce
**incorrect results** — duplicate rows that should have been removed survive in
the output.
### Root cause
Partial clustering splits a partition with many files across multiple tasks
to improve parallelism. However, downstream dedup operations (`dropDuplicates`,
`row_number() OVER (PARTITION BY ...)`) rely on the assumption that all rows
with the same partition key are co-located in a single task. Since SPJ
eliminates the Exchange (shuffle), each split independently deduplicates, and
duplicate partition keys survive across splits.
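To make the failure mode concrete, here is a minimal plain-Python sketch of the effect (no Spark involved). The split boundaries, row values, and the `dedup_within_task` helper are illustrative assumptions, not the actual task layout or operator Spark uses:

```
# Illustrative only: with partial clustering, partition p1's files may be
# split across two tasks; the file-to-task assignment below is hypothetical.
split_a = [("big_0_0", "p1"), ("big_5_3", "p1")]    # e.g. some of p1's files
split_b = [("big_12_7", "p1"), ("big_19_1", "p1")]  # e.g. the rest of p1's files

def dedup_within_task(rows):
    """Keep the first row per part_key, as a purely per-task dedup would."""
    seen, kept = set(), []
    for row in rows:
        if row[1] not in seen:
            seen.add(row[1])
            kept.append(row)
    return kept

# With no Exchange between the join and the dedup, the per-task results are
# simply concatenated, so p1 survives twice in the final output.
result = dedup_within_task(split_a) + dedup_within_task(split_b)
assert [r[1] for r in result] == ["p1", "p1"]
```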
### Steps to reproduce
1. Create two Iceberg tables partitioned by `part_key`:
   - **Big table**: 20 separate appends to partition `p1` (= 20 data files), plus 1 append to `p2`
   - **Small table**: 1 append containing both `p1` and `p2`
2. Enable SPJ with partial clustering:
   ```
   spark.sql.sources.v2.bucketing.enabled = true
   spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled = true
   spark.sql.iceberg.planning.preserve-data-grouping = true
   spark.sql.autoBroadcastJoinThreshold = -1
   ```
3. Perform a `leftsemi` join on `part_key`, then `dropDuplicates(["part_key"])`.
4. The physical plan contains **no Exchange node**, confirming SPJ is active, yet the deduplicated result contains more than the expected 2 rows (one per `part_key`).

Full pytest reproduction:
```
import shutil
import tempfile

from pyspark.sql import SparkSession, Window, functions as F
from pyspark.sql.types import IntegerType, StringType, StructField, StructType
import pytest

ICEBERG_PKG = "org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.1"

SCHEMA = StructType([
    StructField("id", StringType()),
    StructField("part_key", StringType()),
    StructField("value", IntegerType()),
    StructField("padding", StringType()),
])


@pytest.fixture(scope="module")
def warehouse_dir():
    d = tempfile.mkdtemp(prefix="iceberg_spj_test_")
    yield d
    shutil.rmtree(d, ignore_errors=True)


@pytest.fixture(scope="module")
def spark_iceberg(warehouse_dir):
    spark = (
        SparkSession.builder.master("local[4]")
        .appName("spj_window_dedup_bug")
        .config("spark.jars.packages", ICEBERG_PKG)
        .config(
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        )
        .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.local.type", "hadoop")
        .config("spark.sql.catalog.local.warehouse", warehouse_dir)
        .config("spark.ui.enabled", "false")
        .config("spark.driver.memory", "2g")
        .config("spark.sql.shuffle.partitions", "4")
        .config("spark.sql.adaptive.enabled", "false")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("WARN")
    yield spark
    spark.stop()


@pytest.fixture()
def asymmetric_tables(spark_iceberg):
    """Two Iceberg tables with asymmetric file counts to trigger partial clustering."""
    big, small = "local.db.big", "local.db.small"
    for t in (big, small):
        spark_iceberg.sql(f"DROP TABLE IF EXISTS {t}")
        spark_iceberg.sql(f"""
            CREATE TABLE {t} (id STRING, part_key STRING, value INT, padding STRING)
            USING iceberg PARTITIONED BY (part_key)
        """)
    # Big table: 20 appends = 20 files for partition p1
    for i in range(20):
        data = [(f"big_{i}_{j}", "p1", i, "X" * 1000) for j in range(200)]
        spark_iceberg.createDataFrame(data, SCHEMA).writeTo(big).append()
    spark_iceberg.createDataFrame([("big_other", "p2", 99, "Y")], SCHEMA).writeTo(big).append()
    # Small table: 1 append = 1 file
    spark_iceberg.createDataFrame(
        [("small_0", "p1", 0, "Z"), ("small_1", "p2", 1, "Z")], SCHEMA
    ).writeTo(small).append()
    spark_iceberg.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
    spark_iceberg.conf.set(
        "spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", "true"
    )
    spark_iceberg.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "true")
    spark_iceberg.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
    return spark_iceberg.table(big), spark_iceberg.table(small)


def _get_plan(df) -> str:
    return df._jdf.queryExecution().executedPlan().toString()


class TestSPJDedupBug:
    def test_drop_duplicates_after_join_produces_duplicates(self, asymmetric_tables):
        """dropDuplicates after SPJ join → duplicates survive (expected 2, gets > 2)."""
        big, small = asymmetric_tables
        deduped = big.join(small, on="part_key", how="leftsemi").dropDuplicates(["part_key"])
        assert "Exchange" not in _get_plan(deduped)
        assert deduped.count() > 2

    def test_window_dedup_after_join_produces_duplicates(self, asymmetric_tables):
        """row_number() Window dedup after SPJ join → duplicates survive."""
        big, small = asymmetric_tables
        joined = big.join(small, on="part_key", how="leftsemi")
        w = Window.partitionBy("part_key").orderBy(F.col("value").desc())
        deduped = joined.withColumn("_r", F.row_number().over(w)).filter("_r = 1").drop("_r")
        assert "Exchange" not in _get_plan(deduped)
        assert deduped.count() > 2

    def test_disabling_partial_clustering_fixes_it(self, asymmetric_tables, spark_iceberg):
        """Setting partiallyClusteredDistribution.enabled=false → dedup works."""
        big, small = asymmetric_tables
        spark_iceberg.conf.set(
            "spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", "false"
        )
        deduped = big.join(small, on="part_key", how="leftsemi").dropDuplicates(["part_key"])
        assert deduped.count() == 2
```
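### Possible workaround
Besides disabling `spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled` (as the last test above shows), forcing an explicit repartition on the dedup key should re-insert an Exchange and co-locate all rows for each `part_key` in a single task again. This is a hedged sketch reusing the `big`/`small` DataFrames from the repro, not something verified in this report:
```
# Hedged workaround sketch (not verified here): an explicit repartition on the
# dedup key forces a shuffle, so every part_key lands in exactly one task
# before dropDuplicates runs.
joined = big.join(small, on="part_key", how="leftsemi")
deduped = joined.repartition("part_key").dropDuplicates(["part_key"])
```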
### Related
SPARK-38166: Duplicates after task failure in dropDuplicates and repartition