Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/6652#discussion_r32055013
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1384,17 +1400,32 @@ class DAGScheduler(
     if (rddPrefs.nonEmpty) {
       return rddPrefs.map(TaskLocation(_))
     }
-    // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
-    // that has any placement preferences. Ideally we would choose based on transfer sizes,
-    // but this will do for now.
+
     rdd.dependencies.foreach {
       case n: NarrowDependency[_] =>
+        // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
+        // that has any placement preferences. Ideally we would choose based on transfer sizes,
+        // but this will do for now.
         for (inPart <- n.getParents(partition)) {
           val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
           if (locs != Nil) {
             return locs
           }
         }
+      case s: ShuffleDependency[_, _, _] =>
+        // For shuffle dependencies, pick locations which have at least REDUCER_PREF_LOCS_FRACTION
+        // of data as preferred locations
+        if (shuffleLocalityEnabled &&
+            rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD &&
+            s.rdd.partitions.size < SHUFFLE_PREF_MAP_THRESHOLD) {
+          // Get the preferred map output locations for this reducer
+          val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
+            partition, rdd.partitions.size, REDUCER_PREF_LOCS_FRACTION)
+          if (topLocsForReducer.nonEmpty) {
+            return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId))
+          }
+        }
+
       case _ =>
--- End diff --
I think you can delete this pattern now; it shouldn't ever occur.
Incidentally, this also suggests that `Dependency` should be `sealed`. I just tried making that change, and it turned up these other warnings, which actually seem legit:
```
[warn] /Users/irashid/github/spark/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala:136: non-variable type argument Product2[K,Any] in type pattern org.apache.spark.OneToOneDependency[Product2[K,Any]] is unchecked since it is eliminated by erasure
[warn]     case oneToOneDependency: OneToOneDependency[Product2[K, Any]] =>
[warn]                              ^
[warn] /Users/irashid/github/spark/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala:135: match may not be exhaustive.
[warn] It would fail on the following input: NarrowDependency()
[warn]     for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
[warn]     ^
[warn] /Users/irashid/github/spark/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala:109: match may not be exhaustive.
[warn] It would fail on the following input: NarrowDependency()
[warn]     dependencies(depNum) match {
[warn]     ^
[warn] three warnings found
```
It seems that code would break if it were ever given a `RangeDependency`.
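To make the exhaustiveness point concrete, here's a minimal standalone sketch (made-up class names, not the real Spark hierarchy) of why sealing the base type surfaces these warnings: once the base class is `sealed`, the compiler can enumerate its direct subclasses and flag a match that doesn't cover all of them.
```scala
// Hypothetical stand-ins mirroring the shape of the hierarchy, not Spark code.
sealed abstract class Dep[T]
abstract class NarrowDep[T] extends Dep[T]
class OneToOneDep[T] extends NarrowDep[T]
class RangeDep[T] extends NarrowDep[T]
class ShuffleDep[K, V] extends Dep[(K, V)]

object ExhaustivenessDemo {
  def describe(dep: Dep[_]): String = dep match {
    // With Dep sealed, the compiler warns here:
    //   match may not be exhaustive.
    //   It would fail on the following input: NarrowDep()
    // because only one NarrowDep subclass is covered.
    case _: OneToOneDep[_]   => "one-to-one"
    case _: ShuffleDep[_, _] => "shuffle"
  }

  def main(args: Array[String]): Unit = {
    println(describe(new OneToOneDep[Int]))  // prints "one-to-one"
    println(describe(new RangeDep[Int]))     // throws scala.MatchError at runtime
  }
}
```
Same failure mode as the matches in `CoGroupedRDD` and `SubtractedRDD` above: the missing case compiles silently today and only shows up as a runtime `MatchError`.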