sunchao commented on code in PR #42194:
URL: https://github.com/apache/spark/pull/42194#discussion_r1288949568


##########
core/src/main/scala/org/apache/spark/Partitioner.scala:
##########
@@ -137,6 +137,18 @@ private[spark] class PartitionIdPassthrough(override val 
numPartitions: Int) ext
   override def getPartition(key: Any): Int = key.asInstanceOf[Int]
 }
 
+/**
+ * A [[org.apache.spark.Partitioner]] that partitions all records use 
partition value map
+ */
+private[spark] class KeyGroupedPartitioner(
+    valueMap: mutable.Map[Seq[Any], Int],

Review Comment:
   nit: can we add some comments on what `valueMap` is? 



##########
core/src/main/scala/org/apache/spark/Partitioner.scala:
##########
@@ -137,6 +137,18 @@ private[spark] class PartitionIdPassthrough(override val 
numPartitions: Int) ext
   override def getPartition(key: Any): Int = key.asInstanceOf[Int]
 }
 
+/**
+ * A [[org.apache.spark.Partitioner]] that partitions all records use 
partition value map
+ */
+private[spark] class PartitionValueMapPartitioner(
+    valueMap: mutable.Map[Seq[Any], Int],
+    override val numPartitions: Int) extends Partitioner {
+  override def getPartition(key: Any): Int = {
+    val keys = key.asInstanceOf[Seq[Any]]
+    valueMap.getOrElseUpdate(keys, Utils.nonNegativeMod(keys.hashCode, 
numPartitions))

Review Comment:
   I wonder what will happen if we have the following in the `valueMap`:
   ```
   key1 -> 0
   key2 -> 1
   key3 -> 2
   ```
   
   Now we have a incoming `key4` that is not the same as any of `key[1-3]`. 
This means we have a row on one side that doesn't belong to any partition on 
the other side. In this case, the current logic is we just take the hash and 
randomly pick an existing partition between 1 and 3 for the row.
   
   This should work for inner joins, and maybe OK for outer joins too (although 
I think we should check closely on this). Could you add some unit tests to 
cover this scenario?
   



##########
core/src/main/scala/org/apache/spark/Partitioner.scala:
##########
@@ -137,6 +137,18 @@ private[spark] class PartitionIdPassthrough(override val 
numPartitions: Int) ext
   override def getPartition(key: Any): Int = key.asInstanceOf[Int]
 }
 
+/**
+ * A [[org.apache.spark.Partitioner]] that partitions all records use 
partition value map
+ */
+private[spark] class PartitionValueMapPartitioner(
+    valueMap: mutable.Map[Seq[Any], Int],

Review Comment:
   I see. I guess it also makes sense to keep `KeyGroupedPartitioner` in 
`spark-catalyst` or `spark-sql`, since the concept of `KeyGroupedPartitioning` 
only applies to Spark SQL.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to