szehon-ho commented on code in PR #46255:
URL: https://github.com/apache/spark/pull/46255#discussion_r1620089266
##########
core/src/main/scala/org/apache/spark/Partitioner.scala:
##########
@@ -149,7 +150,9 @@ private[spark] class KeyGroupedPartitioner(
     override val numPartitions: Int) extends Partitioner {
   override def getPartition(key: Any): Int = {
     val keys = key.asInstanceOf[Seq[Any]]
-    valueMap.getOrElseUpdate(keys, Utils.nonNegativeMod(keys.hashCode, numPartitions))
+    val normalizedKeys = ArraySeq.from(keys)
Review Comment:
IIRC, I hit a bug caused by comparing keys of different Seq types (more info in
the PR description):
> Normalize the valueMap key type in KeyGroupedPartitioner to use a specific
> Seq implementation class. Previously, the partitioner's map was initialized
> with keys as Vector, but then compared with keys as ArraySeq, and these seem
> to have different hashcodes, so it will always create new entries with new
> partition ids.
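A minimal sketch of the normalization idea, assuming Scala 2.13+ (where scala.collection.immutable.ArraySeq and its ClassTag-based `from` exist). NormalizingKeyPartitioner, knownKeys and the local nonNegativeMod are hypothetical names for illustration only; this is not Spark's KeyGroupedPartitioner or its Utils API. Every key is converted to an immutable.ArraySeq both when the map is built and on lookup, so the map only ever stores and compares one Seq implementation class, whichever class the caller passes in.

```scala
import scala.collection.immutable.ArraySeq
import scala.collection.mutable

// Hypothetical, simplified stand-in for a key-grouped partitioner (NOT Spark's class).
// Keys are normalized to immutable.ArraySeq when the map is seeded and on every lookup,
// so Vector-, List- and ArraySeq-backed keys never coexist in the map.
class NormalizingKeyPartitioner(initial: Map[Seq[Any], Int], val numPartitions: Int) {
  require(numPartitions > 0, "numPartitions must be positive")

  // Single place that decides the concrete key class used by the map.
  private def normalize(keys: Seq[Any]): ArraySeq[Any] = ArraySeq.from(keys)

  // Seed the map with normalized copies of the initial keys.
  private val valueMap: mutable.Map[ArraySeq[Any], Int] =
    mutable.Map.from(initial.map { case (k, v) => normalize(k) -> v })

  // Local stand-in for a non-negative modulo helper (hypothetical, mirrors the usual idiom).
  private def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  def getPartition(key: Any): Int = {
    val normalizedKeys = normalize(key.asInstanceOf[Seq[Any]])
    // Known keys keep their partition id; unknown keys get a hash-based id once.
    valueMap.getOrElseUpdate(normalizedKeys, nonNegativeMod(normalizedKeys.hashCode, numPartitions))
  }

  // Number of distinct keys currently tracked (exposed for illustration).
  def knownKeys: Int = valueMap.size
}
```

With this shape, the number of map entries stays equal to the number of distinct key values no matter which Seq class reaches getPartition; the trade-off is a small conversion step on every call.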