peter-toth commented on code in PR #54182:
URL: https://github.com/apache/spark/pull/54182#discussion_r2777646294
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/KeyGroupedPartitionedScan.scala:
##########
@@ -52,16 +52,16 @@ trait KeyGroupedPartitionedScan[T] {
case Some(projectionPositions) =>
val internalRowComparableWrapperFactory =
InternalRowComparableWrapper.getInternalRowComparableWrapperFactory(
- expressions.map(_.dataType))
+ projectedExpressions.map(_.dataType))
basePartitioning.partitionValues.map { r =>
- val projectedRow = KeyGroupedPartitioning.project(expressions,
+ val projectedRow =
KeyGroupedPartitioning.project(basePartitioning.expressions,
Review Comment:
Actually, the wrong projected `expressions` is the root cause of the
`ArrayIndexOutOfBoundsException` you hit, and passing in
`basePartitioning.expressions` looks good.
But the test you added is unlikely to pass, as there is an issue with the test
framework.
I left a note here:
https://github.com/apache/spark/blob/34052557733b3e4ad768d12bb14e27166a005022/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala#L2801-L2802,
but forgot to open a fix for the problem with using `bucket()` in these
one-side shuffle tests.
The problem is that the `bucket()` implementation here:
https://github.com/apache/spark/blob/34052557733b3e4ad768d12bb14e27166a005022/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala#L93-L95
and the one in `InMemoryBaseTable`:
https://github.com/apache/spark/blob/34052557733b3e4ad768d12bb14e27166a005022/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala#L240-L247
don't agree.
So technically, the partition keys that the datasource reports and the
calculated keys of the partitions where the partitioner puts the shuffled
records don't match.
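To illustrate the kind of mismatch described above, here is a minimal,
self-contained sketch (not the actual Spark test code; both bucket functions
and all names below are hypothetical) showing how two `bucket()`
implementations that hash differently can assign the same value to different
bucket ids, so the shuffle partitioner's key no longer matches the key the
datasource reported:

```scala
// Hypothetical demo of two disagreeing bucket() implementations.
object BucketMismatchDemo {
  // Hypothetical catalog-function side: buckets by Long.hashCode
  // (which XORs the upper and lower 32 bits) modulo numBuckets.
  def catalogBucket(numBuckets: Int, value: Long): Int =
    ((value.hashCode % numBuckets) + numBuckets) % numBuckets

  // Hypothetical table side: buckets by the raw value modulo numBuckets.
  def tableBucket(numBuckets: Int, value: Long): Int =
    (((value % numBuckets) + numBuckets) % numBuckets).toInt

  def main(args: Array[String]): Unit = {
    val numBuckets = 4
    // For small values the two agree, e.g. 42 lands in bucket 2 both ways.
    // For 1L << 32 the upper bits matter: hashCode mixes them in, the raw
    // modulo ignores them, so the two sides report different bucket ids.
    Seq(0L, 42L, 1L << 32).foreach { v =>
      println(s"value=$v catalog=${catalogBucket(numBuckets, v)} " +
        s"table=${tableBucket(numBuckets, v)}")
    }
  }
}
```

When the reader-reported partition key and the partitioner's key diverge like
this, SPJ places shuffled records into partitions whose keys the scan never
declared, which is exactly the kind of inconsistency the test framework note
points at.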
@pan3793, could you please keep your fix in
`KeyGroupedPartitionedScan.scala` and fix the `BucketTransform` key
calculation in `InMemoryBaseTable`?
You don't need the other changes; `originalPartitionValues` seems unrelated,
as it is used only when partially clustered distribution is enabled.
BTW, I'm working on refactoring SPJ based on this idea:
https://github.com/apache/spark/pull/53859#issuecomment-3822791189 and it looks
promising so far, but I need some more days to wrap it up.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]