Github user josephlijia commented on the pull request:
https://github.com/apache/spark/pull/1297#issuecomment-110615752
Look at the code below:
def multiget(ks: Array[Id]): Map[Id, V] = {
  // Group the requested keys by the partition that owns them.
  val ksByPartition = ks.groupBy(k => self.partitioner.get.getPartition(k))
  // Only the partitions that actually own a requested key.
  val partitions = ksByPartition.keys.toSeq
  val results: Array[Array[(Id, V)]] =
    self.context.runJob(self.partitionsRDD,
      (context: TaskContext, partIter: Iterator[P[V]]) => {
        if (partIter.hasNext && ksByPartition.contains(context.partitionId)) {
          val part = partIter.next()
          // Look up only the keys that belong to this partition.
          val ksForPartition = ksByPartition.get(context.partitionId).get
          part.multiget(ksForPartition).toArray
        } else {
          Array.empty
        }
      }, partitions, allowLocal = true)
  // Combine the per-partition results into the returned map.
  results.flatten.toMap
}
It partitions the keys according to self's partitioner, so each partition
only receives the keys it owns. It doesn't send all keys to all partitions,
does it?
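
For illustration, here is a minimal standalone sketch (not from the PR; it
assumes a HashPartitioner with 4 partitions and plain Long keys, purely for
illustration) of what that groupBy produces, i.e. each key ends up under
exactly one partition id:

import org.apache.spark.HashPartitioner

object GroupByPartitionSketch {
  def main(args: Array[String]): Unit = {
    val partitioner = new HashPartitioner(4)
    val ks: Array[Long] = Array(1L, 2L, 5L, 9L, 42L)

    // Mirrors ks.groupBy(k => self.partitioner.get.getPartition(k)):
    // every key maps to a single partition id.
    val ksByPartition: Map[Int, Array[Long]] =
      ks.groupBy(k => partitioner.getPartition(k))

    ksByPartition.foreach { case (pid, keys) =>
      println(s"partition $pid -> ${keys.mkString(", ")}")
    }
  }
}

Running it prints one line per partition id that owns at least one requested
key, which is what lets runJob restrict the lookup to just those partitions.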