Github user kmadhugit commented on the pull request:
https://github.com/apache/spark/pull/7461#issuecomment-129291779
@shivaram, I've created a test case using a different approach. The test case
checks whether treeAggregate combines all local partitions together in the
first reduction operation. The test case is here:
https://github.com/kmadhugit/spark-testcase/blob/master/src/main/scala/org/madhu/App.scala
However, the test case fails, because the DAG scheduler does not seem to
schedule the reduce tasks based on the locality of the map-side partitions.
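For context, the kind of check involved looks roughly like this (a minimal
sketch, not the linked App.scala; the object name, app name and dummy data are
made up, and it assumes SparkEnv.get.executorId reports the executor running
each task):

    import org.apache.spark.{SparkConf, SparkContext, SparkEnv}

    object PartitionLayout {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("partition-layout"))

        // 6 partitions spread over the 2 executors of the standalone cluster.
        val rdd = sc.parallelize(1 to 600, 6).cache()
        rdd.count() // materialise the cache so partitions stay on their executors

        // Report which executor hosts which partitions; this is the map-side
        // locality that the first level of treeAggregate should respect.
        rdd.mapPartitionsWithIndex { (pid, iter) =>
          Iterator((SparkEnv.get.executorId, Set(pid)))
        }.reduceByKey(_ ++ _)
          .collect()
          .foreach { case (exec, pids) =>
            println(s"executor $exec hosts partitions ${pids.toSeq.sorted.mkString(", ")}")
          }

        sc.stop()
      }
    }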
Let's take an example of an RDD with 6 partitions, 2 executors, and depth 3.
Executor A has partitions 1, 3, 5 and executor B has 2, 4 and 6. The default
tree aggregation reduces the number of partitions as (6 partitions)->(3)->(1),
but we would like it to aggregate as (6)->(2)->(1). By using the executor ID
as the key in reduceByKey, we were able to trigger the merging correctly, i.e.
1, 3, 5 into one partition and 2, 4, 6 into another. However, the actual merge
of these groups ((1,3,5)=>X and (2,4,6)=>Y) happens on executor A itself, so
the data from partitions 2, 4 and 6 is transferred from executor B to A. In
effect, we couldn't avoid the shuffle even after selecting the correct
partitions to merge.
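A minimal sketch of the executor-keyed reduceByKey trick described above
(names and the numPartitions value are illustrative, not taken from the linked
test case; SparkEnv.get.executorId is used to observe both the map-side and
reduce-side executor):

    import org.apache.spark.SparkEnv
    import org.apache.spark.rdd.RDD

    // Key each partition's partial result by its hosting executor, merge with
    // reduceByKey, and record on which executor each merge actually ran.
    def mergeByExecutor(rdd: RDD[Int]): Array[(String, (Int, String))] = {
      rdd.mapPartitions { iter =>
        // Map side: one partial sum per partition, keyed by the hosting executor.
        Iterator((SparkEnv.get.executorId, iter.sum))
      }.reduceByKey(_ + _, numPartitions = 2)
        .map { case (mapSideExec, merged) =>
          // Reduce side: note where the merged value was computed. In the run
          // described above both keys land on executor A, so the partial sums
          // for partitions 2, 4 and 6 are still shuffled from B to A.
          (mapSideExec, (merged, SparkEnv.get.executorId))
        }
        .collect()
    }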
I used two executors on the same node in standalone mode. If that is the
reason for this behavior, then we may need to relax the requirement to the
same worker node rather than the same executor when merging local partitions.