peter-toth commented on PR #55519: URL: https://github.com/apache/spark/pull/55519#issuecomment-4592341717
@ammarchalifah, as far as I can see from [the logs](https://github.com/apache/iceberg/actions/runs/26567729434/job/78266654659?pr=16424) of [your PR](https://github.com/apache/iceberg/pull/16424), your new `testJoinWithBucketStringSubsetOfPartitionKeys` test passed:
```
TestStoragePartitionedJoins > testJoinsWithUnpartitionedTables() > catalogName = testhadoop, implementation = org.apache.iceberg.spark.SparkCatalog, config = {type=hadoop, cache-enabled=false}, planningMode = LOCAL PASSED
```
while your new `testMergeIntoWithBucketStringSubsetOfPartitionKeys` test failed with:
```
TestStoragePartitionedJoins > testMergeIntoWithBucketStringSubsetOfPartitionKeys() > catalogName = testhadoop, implementation = org.apache.iceberg.spark.SparkCatalog, config = {type=hadoop, cache-enabled=false}, planningMode = LOCAL FAILED
    java.lang.IllegalStateException: Unknown type for int field. Type name: java.lang.String
        at org.apache.iceberg.spark.source.StructInternalRow.getInt(StructInternalRow.java:138)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificOrdering.compare(Unknown Source)
        ...
        at scala.collection.AbstractIterable.toSet(Iterable.scala:936)
        at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.filteredPartitions$lzycompute(BatchScanExec.scala:87)
        at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.filteredPartitions(BatchScanExec.scala:64)
        at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputRDD$lzycompute(BatchScanExec.scala:130)
        at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputRDD(BatchScanExec.scala:129)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.doExecuteColumnar(DataSourceV2ScanExecBase.scala:207)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.doExecuteColumnar$(DataSourceV2ScanExecBase.scala:205)
        at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.doExecuteColumnar(BatchScanExec.scala:38)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnarRDD$1(SparkPlan.scala:222)
```
which is very different from the [original stacktrace in the issue you reported](https://github.com/apache/iceberg/issues/15349#issuecomment-3915636021).

The reason for this failure is that runtime filtering is enabled for row-level operations (`spark.sql.optimizer.runtime.rowLevelOperationGroupFilter.enabled`) by default. Because of the inserted dynamic filter, the filtered partition keys are calculated incorrectly, using a `KeyGroupedPartitioning` that contains only a subset of the partitioning expressions when `allowJoinKeysSubsetOfPartitionKeys` is enabled. This issue was fixed in Spark 4.2 as a side effect of the SPJ refactor, which extracted the SPJ logic from the scan into a new operator. As a workaround, you can try disabling that config.

Regarding the stacktrace/issue in https://github.com/apache/iceberg/issues/15349#issuecomment-3915636021, I still think [SPARK-54439](https://issues.apache.org/jira/browse/SPARK-54439) / https://github.com/apache/spark/pull/53142 is very likely the cure.
If you need the fix in Spark 3.5 then we can try to backport it.
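
For reference, a minimal sketch of the workaround mentioned above (disabling `spark.sql.optimizer.runtime.rowLevelOperationGroupFilter.enabled`), assuming a PySpark session. Only the config key comes from the comment; the helper names `workaround_overrides`, `as_set_statements` and `apply_overrides` are hypothetical and exist purely for illustration.

```python
# Config key quoted from the comment above; everything else is illustrative.
GROUP_FILTER_KEY = "spark.sql.optimizer.runtime.rowLevelOperationGroupFilter.enabled"


def workaround_overrides():
    """Return the session conf overrides for the workaround (hypothetical helper)."""
    return {GROUP_FILTER_KEY: "false"}


def as_set_statements(overrides):
    """Render the overrides as Spark SQL SET statements, sorted by key."""
    return [f"SET {key}={value}" for key, value in sorted(overrides.items())]


def apply_overrides(spark, overrides):
    """Apply the overrides to an existing SparkSession through its runtime conf."""
    for key, value in overrides.items():
        spark.conf.set(key, value)
```

With an active session named `spark`, calling `apply_overrides(spark, workaround_overrides())` before running the `MERGE INTO` should be enough to try the workaround; the same setting can also be issued as the SQL statement produced by `as_set_statements`.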
