mrjoe7 opened a new issue, #56515:
URL: https://github.com/apache/spark/issues/56515
### Spark version
- 4.1.2
### Iceberg version
- 1.11.0
- Spark runtime package: `org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.11.0`
### Repro
Self-contained repro:
```python
from __future__ import annotations

import os
import shutil
import tempfile
from datetime import date, datetime
from pathlib import Path

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T


def main() -> None:
    warehouse_dir = Path(tempfile.mkdtemp(prefix="spark-iceberg-warehouse-"))
    ivy_dir = Path(tempfile.mkdtemp(prefix="spark-ivy-cache-"))

    os.environ.setdefault("SPARK_LOCAL_IP", "127.0.0.1")
    os.environ.setdefault("SPARK_LOCAL_HOSTNAME", "localhost")

    # Local session with a Hadoop-type Iceberg catalog named "iceberg".
    spark = (
        SparkSession.builder.master("local[2]")
        .appName("repro-no-plan-for-tablereference")
        .config("spark.jars.packages",
                "org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.11.0")
        .config("spark.jars.ivy", str(ivy_dir))
        .config("spark.sql.catalog.iceberg",
                "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.defaultCatalog", "iceberg")
        .config("spark.sql.catalogImplementation", "in-memory")
        .config("spark.sql.catalog.iceberg.type", "hadoop")
        .config("spark.sql.catalog.iceberg.warehouse", str(warehouse_dir))
        .config("spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.shuffle.partitions", "4")
        .getOrCreate()
    )

    table_name = "iceberg.temporal.no_plan_for_tablereference"
    row_schema = T.StructType(
        [
            T.StructField("asset_id", T.StringType(), False),
            T.StructField("state_valid_from", T.DateType(), False),
            T.StructField("state_valid_to", T.DateType(), False),
            T.StructField("customer_name", T.StringType(), True),
            T.StructField("state_hash", T.StringType(), False),
            T.StructField("generated_at", T.TimestampType(), False),
        ]
    )

    try:
        spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberg.temporal")
        spark.sql(
            f"""
            CREATE TABLE {table_name} (
                asset_id STRING NOT NULL,
                state_valid_from DATE NOT NULL,
                state_valid_to DATE NOT NULL,
                customer_name STRING,
                state_hash STRING NOT NULL,
                generated_at TIMESTAMP NOT NULL
            )
            USING iceberg
            TBLPROPERTIES ('format-version' = '3')
            """
        )

        # Seed the target table with two states for the same asset.
        spark.createDataFrame(
            [
                ("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "keep",
                 "hash-keep", datetime(2024, 2, 1)),
                ("EAN-1", date(2024, 2, 11), date(2024, 2, 20), "delete-me",
                 "hash-delete", datetime(2024, 2, 11)),
            ],
            schema=row_schema,
        ).writeTo(table_name).append()

        # Desired state: only the first interval remains, with new attributes.
        replacement_state_df = spark.createDataFrame(
            [
                ("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "updated",
                 "hash-updated", datetime(2030, 1, 1)),
            ],
            schema=row_schema,
        )

        join_columns = ["asset_id", "state_valid_from", "state_valid_to"]
        target_columns = row_schema.fieldNames()

        # Full outer join of the target table against the desired state; each
        # row is tagged with the MERGE action it needs. Note that this
        # DataFrame reads from the same table the MERGE below writes to.
        merge_source_df = (
            spark.table(table_name)
            .alias("target")
            .join(replacement_state_df.alias("source"), on=join_columns,
                  how="full_outer")
            .select(
                F.coalesce(F.col("source.asset_id"),
                           F.col("target.asset_id")).alias("asset_id"),
                F.coalesce(F.col("source.state_valid_from"),
                           F.col("target.state_valid_from")).alias(
                    "state_valid_from"
                ),
                F.coalesce(F.col("source.state_valid_to"),
                           F.col("target.state_valid_to")).alias("state_valid_to"),
                *[F.col(f"source.{column}").alias(column) for column in
                  target_columns if column not in join_columns],
                F.when(F.col("source.asset_id").isNull(), F.lit("delete"))
                .when(F.col("target.asset_id").isNull(), F.lit("insert"))
                .when(F.col("source.state_hash") ==
                      F.col("target.state_hash"), F.lit("noop"))
                .otherwise(F.lit("update"))
                .alias("_merge_operation"),
            )
        )
        merge_source_df.createOrReplaceTempView("merge_source")

        # This MERGE is the statement that fails to plan on Spark 4.1.2.
        spark.sql(
            f"""
            MERGE INTO {table_name} target
            USING merge_source source
            ON target.asset_id = source.asset_id
               AND target.state_valid_from = source.state_valid_from
               AND target.state_valid_to = source.state_valid_to
            WHEN MATCHED AND source._merge_operation = 'delete' THEN DELETE
            WHEN MATCHED AND source._merge_operation = 'update' THEN
                UPDATE SET
                    target.asset_id = source.asset_id,
                    target.state_valid_from = source.state_valid_from,
                    target.state_valid_to = source.state_valid_to,
                    target.customer_name = source.customer_name,
                    target.state_hash = source.state_hash,
                    target.generated_at = source.generated_at
            WHEN NOT MATCHED AND source._merge_operation = 'insert' THEN
                INSERT (asset_id, state_valid_from, state_valid_to,
                        customer_name, state_hash, generated_at)
                VALUES (
                    source.asset_id,
                    source.state_valid_from,
                    source.state_valid_to,
                    source.customer_name,
                    source.state_hash,
                    source.generated_at
                )
            """
        )
    finally:
        spark.sql(f"DROP TABLE IF EXISTS {table_name}")
        spark.stop()
        shutil.rmtree(warehouse_dir, ignore_errors=True)
        shutil.rmtree(ivy_dir, ignore_errors=True)


if __name__ == "__main__":
    main()
```
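For reference, here is a plain-Python sketch (no Spark needed) of what the `_merge_operation` column in the repro computes. A missing row stands in for the NULL side of the full outer join; because `asset_id` and `state_hash` are `NOT NULL` in the table, this should match the `F.when(...)` chain for the repro's data. The helpers `classify` and `diff_merge_operations` are hypothetical illustrations, and only the columns the classification reads are modeled.

```python
from datetime import date
from typing import Optional

# Join key used by the repro's MERGE ... ON clause.
JOIN_COLUMNS = ("asset_id", "state_valid_from", "state_valid_to")


def classify(source_row: Optional[dict], target_row: Optional[dict]) -> str:
    """Hypothetical mirror of the repro's F.when(...) chain for one joined row.

    None plays the role of the NULL side of the full outer join. state_hash
    is NOT NULL in the table schema, so SQL NULL-comparison semantics are
    not modeled here.
    """
    if source_row is None:
        return "delete"
    if target_row is None:
        return "insert"
    if source_row["state_hash"] == target_row["state_hash"]:
        return "noop"
    return "update"


def diff_merge_operations(target_rows: list[dict],
                          source_rows: list[dict]) -> dict:
    """Full outer join on JOIN_COLUMNS; keys are assumed unique per side."""
    def key(row: dict) -> tuple:
        return tuple(row[c] for c in JOIN_COLUMNS)

    target = {key(r): r for r in target_rows}
    source = {key(r): r for r in source_rows}
    return {k: classify(source.get(k), target.get(k))
            for k in target.keys() | source.keys()}


# Only the columns the classification reads, taken from the repro's data.
TARGET_ROWS = [
    {"asset_id": "EAN-1", "state_valid_from": date(2024, 2, 1),
     "state_valid_to": date(2024, 2, 10), "state_hash": "hash-keep"},
    {"asset_id": "EAN-1", "state_valid_from": date(2024, 2, 11),
     "state_valid_to": date(2024, 2, 20), "state_hash": "hash-delete"},
]
SOURCE_ROWS = [
    {"asset_id": "EAN-1", "state_valid_from": date(2024, 2, 1),
     "state_valid_to": date(2024, 2, 10), "state_hash": "hash-updated"},
]

for k, op in sorted(diff_merge_operations(TARGET_ROWS, SOURCE_ROWS).items()):
    print(k, op)
```

With the repro's data, the 2024-02-01..2024-02-10 interval is tagged `update` and the 2024-02-11..2024-02-20 interval is tagged `delete`, so the MERGE has to exercise both the `UPDATE` and the `DELETE` clauses.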
### Expected
`MERGE INTO` should complete successfully.
It should not fail with an internal planner assertion.
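To make "complete successfully" concrete, this is a rough plain-Python model (no Spark) of what the table should contain after the MERGE, assuming standard `MERGE INTO` semantics: target-only keys are deleted, matched keys with a different `state_hash` are overwritten column by column, matched keys with the same hash hit no clause, and source-only keys are inserted. `expected_after_merge` is a hypothetical helper, not part of Spark or Iceberg, and it assumes join keys are unique on each side.

```python
from datetime import date, datetime

JOIN_COLUMNS = ("asset_id", "state_valid_from", "state_valid_to")


def _key(row: dict) -> tuple:
    return tuple(row[c] for c in JOIN_COLUMNS)


def expected_after_merge(target_rows: list[dict],
                         source_rows: list[dict]) -> list[dict]:
    """Hypothetical model of the repro MERGE's net effect on the table."""
    pending = {_key(r): r for r in source_rows}
    result = []
    for row in target_rows:
        src = pending.pop(_key(row), None)
        if src is None:
            # Target-only key: tagged 'delete', so the row is removed.
            continue
        if src["state_hash"] == row["state_hash"]:
            # 'noop': no WHEN MATCHED clause applies, row stays unchanged.
            result.append(row)
        else:
            # 'update': UPDATE SET copies every column from the source.
            result.append(dict(src))
    # Remaining source-only keys are tagged 'insert'.
    result.extend(dict(r) for r in pending.values())
    return result


TARGET_ROWS = [
    {"asset_id": "EAN-1", "state_valid_from": date(2024, 2, 1),
     "state_valid_to": date(2024, 2, 10), "customer_name": "keep",
     "state_hash": "hash-keep", "generated_at": datetime(2024, 2, 1)},
    {"asset_id": "EAN-1", "state_valid_from": date(2024, 2, 11),
     "state_valid_to": date(2024, 2, 20), "customer_name": "delete-me",
     "state_hash": "hash-delete", "generated_at": datetime(2024, 2, 11)},
]
SOURCE_ROWS = [
    {"asset_id": "EAN-1", "state_valid_from": date(2024, 2, 1),
     "state_valid_to": date(2024, 2, 10), "customer_name": "updated",
     "state_hash": "hash-updated", "generated_at": datetime(2030, 1, 1)},
]

for row in expected_after_merge(TARGET_ROWS, SOURCE_ROWS):
    print(row)
```

For the repro's data this leaves a single row: the 2024-02-01..2024-02-10 interval with `customer_name = 'updated'` and `state_hash = 'hash-updated'`.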
### Actual
Spark fails during planning with an internal error:
```text
org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase planning failed with an internal error.
...
Caused by: java.lang.AssertionError: assertion failed: No plan for TableReference[asset_id#..., state_valid_from#..., state_valid_to#..., customer_name#..., state_hash#..., generated_at#...]
iceberg.temporal.no_plan_for_tablereference
...
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
...
    at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.compileSubquery(InsertAdaptiveSparkPlan.scala:167)
```
### Workaround
Downgrading to Spark 4.0.3 solved the issue for me.
### Notes
- The failure reproduces consistently with local Spark 4.1.2.
- The reduced shape seems to be:
  - `MERGE INTO` an Iceberg table
  - `USING` a temp view
  - the temp view's lineage reads from the same target Iceberg table
  - the planner later trips on `TableReference[...]`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]