mrjoe7 opened a new issue, #56515:
URL: https://github.com/apache/spark/issues/56515

   ### Spark version
   
   - 4.1.2
   
   ### Iceberg version
   
   - 1.11.0
   - Spark runtime package: `org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.11.0`
   
   ### Repro
   
   Self-contained repro:
   
   ```python
   from __future__ import annotations
   
   import os
   import shutil
   import tempfile
   from datetime import date, datetime
   from pathlib import Path
   
   from pyspark.sql import SparkSession
   from pyspark.sql import functions as F
   from pyspark.sql import types as T
   
   
   def main() -> None:
       warehouse_dir = Path(tempfile.mkdtemp(prefix="spark-iceberg-warehouse-"))
       ivy_dir = Path(tempfile.mkdtemp(prefix="spark-ivy-cache-"))
       os.environ.setdefault("SPARK_LOCAL_IP", "127.0.0.1")
       os.environ.setdefault("SPARK_LOCAL_HOSTNAME", "localhost")
   
       spark = (
           SparkSession.builder.master("local[2]")
           .appName("repro-no-plan-for-tablereference")
           .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.11.0")
           .config("spark.jars.ivy", str(ivy_dir))
           .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
           .config("spark.sql.defaultCatalog", "iceberg")
           .config("spark.sql.catalogImplementation", "in-memory")
           .config("spark.sql.catalog.iceberg.type", "hadoop")
           .config("spark.sql.catalog.iceberg.warehouse", str(warehouse_dir))
           .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
           .config("spark.sql.shuffle.partitions", "4")
           .getOrCreate()
       )
   
       table_name = "iceberg.temporal.no_plan_for_tablereference"
       row_schema = T.StructType(
           [
               T.StructField("asset_id", T.StringType(), False),
               T.StructField("state_valid_from", T.DateType(), False),
               T.StructField("state_valid_to", T.DateType(), False),
               T.StructField("customer_name", T.StringType(), True),
               T.StructField("state_hash", T.StringType(), False),
               T.StructField("generated_at", T.TimestampType(), False),
           ]
       )
   
       try:
           spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberg.temporal")
           spark.sql(
               f"""
               CREATE TABLE {table_name} (
                   asset_id STRING NOT NULL,
                   state_valid_from DATE NOT NULL,
                   state_valid_to DATE NOT NULL,
                   customer_name STRING,
                   state_hash STRING NOT NULL,
                   generated_at TIMESTAMP NOT NULL
               )
               USING iceberg
               TBLPROPERTIES ('format-version' = '3')
               """
           )
   
           spark.createDataFrame(
               [
                   ("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "keep", "hash-keep", datetime(2024, 2, 1)),
                   ("EAN-1", date(2024, 2, 11), date(2024, 2, 20), "delete-me", "hash-delete", datetime(2024, 2, 11)),
               ],
               schema=row_schema,
           ).writeTo(table_name).append()
   
           replacement_state_df = spark.createDataFrame(
               [
                   ("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "updated", "hash-updated", datetime(2030, 1, 1)),
               ],
               schema=row_schema,
           )
   
           join_columns = ["asset_id", "state_valid_from", "state_valid_to"]
           target_columns = row_schema.fieldNames()
           merge_source_df = (
               spark.table(table_name)
               .alias("target")
               .join(replacement_state_df.alias("source"), on=join_columns, how="full_outer")
               .select(
                   F.coalesce(F.col("source.asset_id"), F.col("target.asset_id")).alias("asset_id"),
                   F.coalesce(F.col("source.state_valid_from"), F.col("target.state_valid_from")).alias(
                       "state_valid_from"
                   ),
                   F.coalesce(F.col("source.state_valid_to"), F.col("target.state_valid_to")).alias("state_valid_to"),
                   *[F.col(f"source.{column}").alias(column) for column in target_columns if column not in join_columns],
                   F.when(F.col("source.asset_id").isNull(), F.lit("delete"))
                   .when(F.col("target.asset_id").isNull(), F.lit("insert"))
                   .when(F.col("source.state_hash") == F.col("target.state_hash"), F.lit("noop"))
                   .otherwise(F.lit("update"))
                   .alias("_merge_operation"),
               )
           )
           merge_source_df.createOrReplaceTempView("merge_source")
   
           spark.sql(
               f"""
               MERGE INTO {table_name} target
               USING merge_source source
               ON target.asset_id = source.asset_id
                  AND target.state_valid_from = source.state_valid_from
                  AND target.state_valid_to = source.state_valid_to
               WHEN MATCHED AND source._merge_operation = 'delete' THEN DELETE
               WHEN MATCHED AND source._merge_operation = 'update' THEN
                   UPDATE SET
                       target.asset_id = source.asset_id,
                       target.state_valid_from = source.state_valid_from,
                       target.state_valid_to = source.state_valid_to,
                       target.customer_name = source.customer_name,
                       target.state_hash = source.state_hash,
                       target.generated_at = source.generated_at
               WHEN NOT MATCHED AND source._merge_operation = 'insert' THEN
                   INSERT (asset_id, state_valid_from, state_valid_to, customer_name, state_hash, generated_at)
                   VALUES (
                       source.asset_id,
                       source.state_valid_from,
                       source.state_valid_to,
                       source.customer_name,
                       source.state_hash,
                       source.generated_at
                   )
               """
           )
       finally:
           spark.sql(f"DROP TABLE IF EXISTS {table_name}")
           spark.stop()
           shutil.rmtree(warehouse_dir, ignore_errors=True)
           shutil.rmtree(ivy_dir, ignore_errors=True)
   
   
   if __name__ == "__main__":
       main()
   ```
   
   ### Expected
   
   `MERGE INTO` should complete successfully.
   
   It should not fail with an internal planner assertion.
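
To make the expected end state concrete, here is a plain-Python model (no Spark involved) of what the `MERGE` in the repro is meant to do, assuming the full outer join and the `_merge_operation` rules shown above. The helper names `key_of`, `classify`, and `expected_merge` are hypothetical and only illustrate the intended semantics; they are not part of Spark or Iceberg.

```python
from datetime import date, datetime
from typing import Optional, Tuple

# (asset_id, state_valid_from, state_valid_to, customer_name, state_hash, generated_at)
Row = Tuple[str, date, date, Optional[str], str, datetime]
Key = Tuple[str, date, date]

# Rows appended to the table before the MERGE, copied from the repro.
TARGET_ROWS = [
    ("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "keep", "hash-keep", datetime(2024, 2, 1)),
    ("EAN-1", date(2024, 2, 11), date(2024, 2, 20), "delete-me", "hash-delete", datetime(2024, 2, 11)),
]
# Replacement state, copied from the repro.
SOURCE_ROWS = [
    ("EAN-1", date(2024, 2, 1), date(2024, 2, 10), "updated", "hash-updated", datetime(2030, 1, 1)),
]


def key_of(row: Row) -> Key:
    """Join key used by the repro: (asset_id, state_valid_from, state_valid_to)."""
    return row[0], row[1], row[2]


def classify(source: Optional[Row], target: Optional[Row]) -> str:
    """Mirror of the _merge_operation expression (state_hash is non-null in this data)."""
    if source is None:
        return "delete"
    if target is None:
        return "insert"
    if source[4] == target[4]:
        return "noop"
    return "update"


def expected_merge(target_rows: list, source_rows: list) -> list:
    """Hypothetical reference model of the table contents after the MERGE."""
    targets = {key_of(r): r for r in target_rows}
    sources = {key_of(r): r for r in source_rows}
    result = {}
    # Full outer join: visit every key present on either side.
    for key in targets.keys() | sources.keys():
        source, target = sources.get(key), targets.get(key)
        op = classify(source, target)
        if op == "delete":
            continue  # WHEN MATCHED AND ... = 'delete' THEN DELETE
        if op in ("insert", "update"):
            result[key] = source  # INSERT / UPDATE SET all columns from source
        else:
            result[key] = target  # noop: no clause applies, target row unchanged
    return [result[k] for k in sorted(result)]


if __name__ == "__main__":
    for row in expected_merge(TARGET_ROWS, SOURCE_ROWS):
        print(row)
```

With the repro's data, the `delete-me` row (2024-02-11 to 2024-02-20) is deleted and the 2024-02-01 row is updated, so under this model the table should end up with a single `updated` row.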
   
   ### Actual
   
   Spark fails during planning with an internal error:
   
   ```text
   org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase planning failed with an internal error.
   ...
   Caused by: java.lang.AssertionError: assertion failed: No plan for TableReference[asset_id#..., state_valid_from#..., state_valid_to#..., customer_name#..., state_hash#..., generated_at#...] iceberg.temporal.no_plan_for_tablereference
   ...
   at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
   ...
   at org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan.compileSubquery(InsertAdaptiveSparkPlan.scala:167)
   ```
   
   ### Workaround
   
   Downgrading to Spark 4.0.3 resolved the issue for me.
   
   ### Notes
   
   - The failure reproduces consistently with Spark 4.1.2 in local mode.
   - The minimal triggering pattern appears to be:
       - `MERGE INTO` an Iceberg table
       - `USING` a temp view
       - the temp view's lineage reads from the same target Iceberg table
       - the planner then fails on `TableReference[...]`
   
   