Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/3345#discussion_r20502430
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
---
@@ -193,15 +193,16 @@ class DAGScheduler(
eventProcessActor ! TaskSetFailed(taskSet, reason)
}
- private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
- if (!cacheLocs.contains(rdd.id)) {
- val blockIds = rdd.partitions.indices.map(index =>
RDDBlockId(rdd.id, index)).toArray[BlockId]
- val locs = BlockManager.blockIdsToBlockManagers(blockIds, env,
blockManagerMaster)
- cacheLocs(rdd.id) = blockIds.map { id =>
- locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host,
bm.executorId))
- }
+ private def getLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
+ val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id,
index)).toArray[BlockId]
+ val locs = BlockManager.blockIdsToBlockManagers(blockIds, env,
blockManagerMaster)
+ blockIds.map { id =>
+ locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host,
bm.executorId))
}
- cacheLocs(rdd.id)
+ }
+
+ private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
+ cacheLocs.getOrElseUpdate(rdd.id,getLocs(rdd))
--- End diff --
Hm, is this actually thread-safe either? It's not a concurrent `Map`. Even
if it is, I'm not clear that it stops many values for being cached and computed
for an ID, but if that's cheap and they're immutable, that could be fine.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]