[jira] [Commented] (SPARK-7703) Task failure caused by block fetch failure in BlockManager.doGetRemote() when using TorrentBroadcast

2016-05-26 Thread Chanh Le (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15303655#comment-15303655
 ] 

Chanh Le commented on SPARK-7703:
-

Any update on that? 
I have the same error.
java.io.IOException: org.apache.spark.storage.BlockFetchException: Failed to 
fetch block from 1 locations. Most recent failure cause:
https://gist.github.com/giaosudau/3f7087707dcabc53c3b3bf54b0503720

> Task failure caused by block fetch failure in BlockManager.doGetRemote() when 
> using TorrentBroadcast
> 
>
> Key: SPARK-7703
> URL: https://issues.apache.org/jira/browse/SPARK-7703
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.2.1, 1.3.1
> Environment: Red Hat Enterprise Linux Server release 7.0 (Maipo)
> Spark 1.3.1 Release
>Reporter: Hailong Wen
>
> I am from IBM Platform Symphony team and we are working to integration Spark 
> with our EGO to provide a fine-grained dynamic allocation Resource Manager. 
> We found a defect in current implementation of BlockManager.doGetRemote():
> {noformat}
>   private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): 
> Option[Any] = {
> require(blockId != null, "BlockId is null")
> val locations = Random.shuffle(master.getLocations(blockId)) 
> <--- Issue2: locations may be out of date
> for (loc <- locations) {
>   logDebug(s"Getting remote block $blockId from $loc")
>   val data = blockTransferService.fetchBlockSync(
> loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer() 
>  <--- Issue1: This statement is not in try/catch
>   if (data != null) {
> if (asBlockResult) {
>   return Some(new BlockResult(
> dataDeserialize(blockId, data),
> DataReadMethod.Network,
> data.limit()))
> } else {
>   return Some(data)
> }
>   }
>   logDebug(s"The value of block $blockId is null")
> }
> logDebug(s"Block $blockId not found")
> None
>   }
> {noformat}
> * Issue 1: Although the block fetch uses "for" to try all available 
> locations, the fetch method is not guarded by a "Try" block. When exception 
> occurs, this method will directly throw the error instead of trying other 
> block locations. The uncaught exception will cause task failure.
> * Issue 2: Constant "location" is acquired before fetching, however in a 
> dynamic allocation environment the block locations may change.
> We hit the above 2 issues in our use case, where Executors exit after all its 
> assigned tasks are done. We *occasionally* get the following error (issue 1.):
> {noformat}
> 15/05/13 10:28:35 INFO Executor: Running task 27.0 in stage 0.0 (TID 27)
> 15/05/13 10:28:35 DEBUG Executor: Task 26's epoch is 0
> 15/05/13 10:28:35 DEBUG Executor: Task 28's epoch is 0
> 15/05/13 10:28:35 DEBUG Executor: Task 27's epoch is 0
> 15/05/13 10:28:35 DEBUG BlockManager: Getting local block broadcast_0
> 15/05/13 10:28:35 DEBUG BlockManager: Block broadcast_0 not registered locally
> 15/05/13 10:28:35 INFO TorrentBroadcast: Started reading broadcast variable 0
> 15/05/13 10:28:35 DEBUG TorrentBroadcast: Reading piece broadcast_0_piece0 of 
> broadcast_0
> 15/05/13 10:28:35 DEBUG BlockManager: Getting local block broadcast_0_piece0 
> as bytes
> 15/05/13 10:28:35 DEBUG BlockManager: Block broadcast_0_piece0 not registered 
> locally
> 15/05/13 10:28:35 DEBUG BlockManager: Getting remote block broadcast_0_piece0 
> as bytes
> 15/05/13 10:28:35 DEBUG BlockManager: Getting remote block broadcast_0_piece0 
> from BlockManagerId(c390c311-bd97-4a99-bcb9-b32fd3dede17, sparkbj01, 37599)
> 15/05/13 10:28:35 TRACE NettyBlockTransferService: Fetch blocks from 
> sparkbj01:37599 (executor id c390c311-bd97-4a99-bcb9-b32fd3dede17)
> 15/05/13 10:28:35 DEBUG TransportClientFactory: Creating new connection to 
> sparkbj01/9.111.254.195:37599
> 15/05/13 10:28:35 ERROR RetryingBlockFetcher: Exception while beginning fetch 
> of 1 outstanding blocks 
> java.io.IOException: Failed to connect to sparkbj01/9.111.254.195:37599
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>   at 
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>   at 
> org.apache.spark.network.netty.NettyBlockTransf

[jira] [Commented] (SPARK-7703) Task failure caused by block fetch failure in BlockManager.doGetRemote() when using TorrentBroadcast

2016-05-12 Thread Hailong Wen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15281557#comment-15281557
 ] 

Hailong Wen commented on SPARK-7703:


The issue will happen when executor is dynamic and when driver tries to fetch 
the data, the executor is being shutdown (executor's BlockManager stops serving 
the block, while the block location information are not updated in 
BlockManagerMaster, so driver still think it can fetch block data from this 
dead executor).

>From your description, it seems that your problem is that yarn-client OOM. 
>They should be different issues from my perspective.

> Task failure caused by block fetch failure in BlockManager.doGetRemote() when 
> using TorrentBroadcast
> 
>
> Key: SPARK-7703
> URL: https://issues.apache.org/jira/browse/SPARK-7703
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.2.1, 1.3.1
> Environment: Red Hat Enterprise Linux Server release 7.0 (Maipo)
> Spark 1.3.1 Release
>Reporter: Hailong Wen
>
> I am from IBM Platform Symphony team and we are working to integration Spark 
> with our EGO to provide a fine-grained dynamic allocation Resource Manager. 
> We found a defect in current implementation of BlockManager.doGetRemote():
> {noformat}
>   private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): 
> Option[Any] = {
> require(blockId != null, "BlockId is null")
> val locations = Random.shuffle(master.getLocations(blockId)) 
> <--- Issue2: locations may be out of date
> for (loc <- locations) {
>   logDebug(s"Getting remote block $blockId from $loc")
>   val data = blockTransferService.fetchBlockSync(
> loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer() 
>  <--- Issue1: This statement is not in try/catch
>   if (data != null) {
> if (asBlockResult) {
>   return Some(new BlockResult(
> dataDeserialize(blockId, data),
> DataReadMethod.Network,
> data.limit()))
> } else {
>   return Some(data)
> }
>   }
>   logDebug(s"The value of block $blockId is null")
> }
> logDebug(s"Block $blockId not found")
> None
>   }
> {noformat}
> * Issue 1: Although the block fetch uses "for" to try all available 
> locations, the fetch method is not guarded by a "Try" block. When exception 
> occurs, this method will directly throw the error instead of trying other 
> block locations. The uncaught exception will cause task failure.
> * Issue 2: Constant "location" is acquired before fetching, however in a 
> dynamic allocation environment the block locations may change.
> We hit the above 2 issues in our use case, where Executors exit after all its 
> assigned tasks are done. We *occasionally* get the following error (issue 1.):
> {noformat}
> 15/05/13 10:28:35 INFO Executor: Running task 27.0 in stage 0.0 (TID 27)
> 15/05/13 10:28:35 DEBUG Executor: Task 26's epoch is 0
> 15/05/13 10:28:35 DEBUG Executor: Task 28's epoch is 0
> 15/05/13 10:28:35 DEBUG Executor: Task 27's epoch is 0
> 15/05/13 10:28:35 DEBUG BlockManager: Getting local block broadcast_0
> 15/05/13 10:28:35 DEBUG BlockManager: Block broadcast_0 not registered locally
> 15/05/13 10:28:35 INFO TorrentBroadcast: Started reading broadcast variable 0
> 15/05/13 10:28:35 DEBUG TorrentBroadcast: Reading piece broadcast_0_piece0 of 
> broadcast_0
> 15/05/13 10:28:35 DEBUG BlockManager: Getting local block broadcast_0_piece0 
> as bytes
> 15/05/13 10:28:35 DEBUG BlockManager: Block broadcast_0_piece0 not registered 
> locally
> 15/05/13 10:28:35 DEBUG BlockManager: Getting remote block broadcast_0_piece0 
> as bytes
> 15/05/13 10:28:35 DEBUG BlockManager: Getting remote block broadcast_0_piece0 
> from BlockManagerId(c390c311-bd97-4a99-bcb9-b32fd3dede17, sparkbj01, 37599)
> 15/05/13 10:28:35 TRACE NettyBlockTransferService: Fetch blocks from 
> sparkbj01:37599 (executor id c390c311-bd97-4a99-bcb9-b32fd3dede17)
> 15/05/13 10:28:35 DEBUG TransportClientFactory: Creating new connection to 
> sparkbj01/9.111.254.195:37599
> 15/05/13 10:28:35 ERROR RetryingBlockFetcher: Exception while beginning fetch 
> of 1 outstanding blocks 
> java.io.IOException: Failed to connect to sparkbj01/9.111.254.195:37599
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>   at 
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetc

[jira] [Commented] (SPARK-7703) Task failure caused by block fetch failure in BlockManager.doGetRemote() when using TorrentBroadcast

2016-05-11 Thread jianbo li (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15281257#comment-15281257
 ] 

jianbo li commented on SPARK-7703:
--

Hi Hailong Wen,

Do you know when will this issue happen? In our cluster with spark 1.5.2, it 
seems the issue will occur when the spark driver on yarn-client is fetching 
large amount result data? 
And the driver's maximum heap size is full.

> Task failure caused by block fetch failure in BlockManager.doGetRemote() when 
> using TorrentBroadcast
> 
>
> Key: SPARK-7703
> URL: https://issues.apache.org/jira/browse/SPARK-7703
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.2.1, 1.3.1
> Environment: Red Hat Enterprise Linux Server release 7.0 (Maipo)
> Spark 1.3.1 Release
>Reporter: Hailong Wen
>
> I am from IBM Platform Symphony team and we are working to integration Spark 
> with our EGO to provide a fine-grained dynamic allocation Resource Manager. 
> We found a defect in current implementation of BlockManager.doGetRemote():
> {noformat}
>   private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): 
> Option[Any] = {
> require(blockId != null, "BlockId is null")
> val locations = Random.shuffle(master.getLocations(blockId)) 
> <--- Issue2: locations may be out of date
> for (loc <- locations) {
>   logDebug(s"Getting remote block $blockId from $loc")
>   val data = blockTransferService.fetchBlockSync(
> loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer() 
>  <--- Issue1: This statement is not in try/catch
>   if (data != null) {
> if (asBlockResult) {
>   return Some(new BlockResult(
> dataDeserialize(blockId, data),
> DataReadMethod.Network,
> data.limit()))
> } else {
>   return Some(data)
> }
>   }
>   logDebug(s"The value of block $blockId is null")
> }
> logDebug(s"Block $blockId not found")
> None
>   }
> {noformat}
> * Issue 1: Although the block fetch uses "for" to try all available 
> locations, the fetch method is not guarded by a "Try" block. When exception 
> occurs, this method will directly throw the error instead of trying other 
> block locations. The uncaught exception will cause task failure.
> * Issue 2: Constant "location" is acquired before fetching, however in a 
> dynamic allocation environment the block locations may change.
> We hit the above 2 issues in our use case, where Executors exit after all its 
> assigned tasks are done. We *occasionally* get the following error (issue 1.):
> {noformat}
> 15/05/13 10:28:35 INFO Executor: Running task 27.0 in stage 0.0 (TID 27)
> 15/05/13 10:28:35 DEBUG Executor: Task 26's epoch is 0
> 15/05/13 10:28:35 DEBUG Executor: Task 28's epoch is 0
> 15/05/13 10:28:35 DEBUG Executor: Task 27's epoch is 0
> 15/05/13 10:28:35 DEBUG BlockManager: Getting local block broadcast_0
> 15/05/13 10:28:35 DEBUG BlockManager: Block broadcast_0 not registered locally
> 15/05/13 10:28:35 INFO TorrentBroadcast: Started reading broadcast variable 0
> 15/05/13 10:28:35 DEBUG TorrentBroadcast: Reading piece broadcast_0_piece0 of 
> broadcast_0
> 15/05/13 10:28:35 DEBUG BlockManager: Getting local block broadcast_0_piece0 
> as bytes
> 15/05/13 10:28:35 DEBUG BlockManager: Block broadcast_0_piece0 not registered 
> locally
> 15/05/13 10:28:35 DEBUG BlockManager: Getting remote block broadcast_0_piece0 
> as bytes
> 15/05/13 10:28:35 DEBUG BlockManager: Getting remote block broadcast_0_piece0 
> from BlockManagerId(c390c311-bd97-4a99-bcb9-b32fd3dede17, sparkbj01, 37599)
> 15/05/13 10:28:35 TRACE NettyBlockTransferService: Fetch blocks from 
> sparkbj01:37599 (executor id c390c311-bd97-4a99-bcb9-b32fd3dede17)
> 15/05/13 10:28:35 DEBUG TransportClientFactory: Creating new connection to 
> sparkbj01/9.111.254.195:37599
> 15/05/13 10:28:35 ERROR RetryingBlockFetcher: Exception while beginning fetch 
> of 1 outstanding blocks 
> java.io.IOException: Failed to connect to sparkbj01/9.111.254.195:37599
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>   at 
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>   at 
> org.apache.spark.network.netty.NettyBlockTransfe

[jira] [Commented] (SPARK-7703) Task failure caused by block fetch failure in BlockManager.doGetRemote() when using TorrentBroadcast

2016-01-14 Thread Hailong Wen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15101119#comment-15101119
 ] 

Hailong Wen commented on SPARK-7703:


Note that this issue is exactly the same with SPARK-9591 and fixed in 1.6.0. So 
I'd prefer to close it.

> Task failure caused by block fetch failure in BlockManager.doGetRemote() when 
> using TorrentBroadcast
> 
>
> Key: SPARK-7703
> URL: https://issues.apache.org/jira/browse/SPARK-7703
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.2.1, 1.3.1
> Environment: Red Hat Enterprise Linux Server release 7.0 (Maipo)
> Spark 1.3.1 Release
>Reporter: Hailong Wen
> Fix For: 1.6.0
>
>
> I am from IBM Platform Symphony team and we are working to integration Spark 
> with our EGO to provide a fine-grained dynamic allocation Resource Manager. 
> We found a defect in current implementation of BlockManager.doGetRemote():
> {noformat}
>   private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): 
> Option[Any] = {
> require(blockId != null, "BlockId is null")
> val locations = Random.shuffle(master.getLocations(blockId)) 
> <--- Issue2: locations may be out of date
> for (loc <- locations) {
>   logDebug(s"Getting remote block $blockId from $loc")
>   val data = blockTransferService.fetchBlockSync(
> loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer() 
>  <--- Issue1: This statement is not in try/catch
>   if (data != null) {
> if (asBlockResult) {
>   return Some(new BlockResult(
> dataDeserialize(blockId, data),
> DataReadMethod.Network,
> data.limit()))
> } else {
>   return Some(data)
> }
>   }
>   logDebug(s"The value of block $blockId is null")
> }
> logDebug(s"Block $blockId not found")
> None
>   }
> {noformat}
> * Issue 1: Although the block fetch uses "for" to try all available 
> locations, the fetch method is not guarded by a "Try" block. When exception 
> occurs, this method will directly throw the error instead of trying other 
> block locations. The uncaught exception will cause task failure.
> * Issue 2: Constant "location" is acquired before fetching, however in a 
> dynamic allocation environment the block locations may change.
> We hit the above 2 issues in our use case, where Executors exit after all its 
> assigned tasks are done. We *occasionally* get the following error (issue 1.):
> {noformat}
> 15/05/13 10:28:35 INFO Executor: Running task 27.0 in stage 0.0 (TID 27)
> 15/05/13 10:28:35 DEBUG Executor: Task 26's epoch is 0
> 15/05/13 10:28:35 DEBUG Executor: Task 28's epoch is 0
> 15/05/13 10:28:35 DEBUG Executor: Task 27's epoch is 0
> 15/05/13 10:28:35 DEBUG BlockManager: Getting local block broadcast_0
> 15/05/13 10:28:35 DEBUG BlockManager: Block broadcast_0 not registered locally
> 15/05/13 10:28:35 INFO TorrentBroadcast: Started reading broadcast variable 0
> 15/05/13 10:28:35 DEBUG TorrentBroadcast: Reading piece broadcast_0_piece0 of 
> broadcast_0
> 15/05/13 10:28:35 DEBUG BlockManager: Getting local block broadcast_0_piece0 
> as bytes
> 15/05/13 10:28:35 DEBUG BlockManager: Block broadcast_0_piece0 not registered 
> locally
> 15/05/13 10:28:35 DEBUG BlockManager: Getting remote block broadcast_0_piece0 
> as bytes
> 15/05/13 10:28:35 DEBUG BlockManager: Getting remote block broadcast_0_piece0 
> from BlockManagerId(c390c311-bd97-4a99-bcb9-b32fd3dede17, sparkbj01, 37599)
> 15/05/13 10:28:35 TRACE NettyBlockTransferService: Fetch blocks from 
> sparkbj01:37599 (executor id c390c311-bd97-4a99-bcb9-b32fd3dede17)
> 15/05/13 10:28:35 DEBUG TransportClientFactory: Creating new connection to 
> sparkbj01/9.111.254.195:37599
> 15/05/13 10:28:35 ERROR RetryingBlockFetcher: Exception while beginning fetch 
> of 1 outstanding blocks 
> java.io.IOException: Failed to connect to sparkbj01/9.111.254.195:37599
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
>   at 
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>   at 
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>   at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>   at 
> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
>   at 
> org.apache.spark.network.BlockTr

[jira] [Commented] (SPARK-7703) Task failure caused by block fetch failure in BlockManager.doGetRemote() when using TorrentBroadcast

2015-06-04 Thread Imran Rashid (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14573396#comment-14573396
 ] 

Imran Rashid commented on SPARK-7703:
-

Hi [~wenhailong1988],

Thanks for reporting this, and the thorough analysis.  I would need to study 
this code a little more carefully to be sure, what you are saying very 
reasonable.  Since you have already done a lot of the work to find a fix, would 
you like submit a pull request with the fix yourself?  You can see more details 
on contributing here: 
https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark

We generally do the more detailed code review on the github pull requests, but 
some high level comments:

* do you think this might be related to some other issues reported w/ 
TorrentBroadcast, in particular SPARK-5812 and SPARK-5594 ?  It appears to be 
different, but you might have more insight.
* can you create a small reproduction which demonstrates the issue?  It might 
be a fake job which manually kills some executors or something?  This will be 
useful for reviewers and also to help prevent future regressions.  It might be 
tough creating a test like this, there might not be the hooks you need.  I've 
recently been writing some tests that are somewhat like this, I might be able 
to help out some if you get stuck (or hopefully at least figure out what hooks 
we need to add).

> Task failure caused by block fetch failure in BlockManager.doGetRemote() when 
> using TorrentBroadcast
> 
>
> Key: SPARK-7703
> URL: https://issues.apache.org/jira/browse/SPARK-7703
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.2.1, 1.3.1
> Environment: Red Hat Enterprise Linux Server release 7.0 (Maipo)
> Spark 1.3.1 Release
>Reporter: Hailong Wen
>
> I am from IBM Platform Symphony team and we are working to integration Spark 
> with our EGO to provide a fine-grained dynamic allocation Resource Manager. 
> We found a defect in current implementation of BlockManager.doGetRemote():
> {noformat}
>   private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): 
> Option[Any] = {
> require(blockId != null, "BlockId is null")
> val locations = Random.shuffle(master.getLocations(blockId)) 
> <--- Issue2: locations may be out of date
> for (loc <- locations) {
>   logDebug(s"Getting remote block $blockId from $loc")
>   val data = blockTransferService.fetchBlockSync(
> loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer() 
>  <--- Issue1: This statement is not in try/catch
>   if (data != null) {
> if (asBlockResult) {
>   return Some(new BlockResult(
> dataDeserialize(blockId, data),
> DataReadMethod.Network,
> data.limit()))
> } else {
>   return Some(data)
> }
>   }
>   logDebug(s"The value of block $blockId is null")
> }
> logDebug(s"Block $blockId not found")
> None
>   }
> {noformat}
> * Issue 1: Although the block fetch uses "for" to try all available 
> locations, the fetch method is not guarded by a "Try" block. When exception 
> occurs, this method will directly throw the error instead of trying other 
> block locations. The uncaught exception will cause task failure.
> * Issue 2: Constant "location" is acquired before fetching, however in a 
> dynamic allocation environment the block locations may change.
> We hit the above 2 issues in our use case, where Executors exit after all its 
> assigned tasks are done. We *occasionally* get the following error (issue 1.):
> {noformat}
> 15/05/13 10:28:35 INFO Executor: Running task 27.0 in stage 0.0 (TID 27)
> 15/05/13 10:28:35 DEBUG Executor: Task 26's epoch is 0
> 15/05/13 10:28:35 DEBUG Executor: Task 28's epoch is 0
> 15/05/13 10:28:35 DEBUG Executor: Task 27's epoch is 0
> 15/05/13 10:28:35 DEBUG BlockManager: Getting local block broadcast_0
> 15/05/13 10:28:35 DEBUG BlockManager: Block broadcast_0 not registered locally
> 15/05/13 10:28:35 INFO TorrentBroadcast: Started reading broadcast variable 0
> 15/05/13 10:28:35 DEBUG TorrentBroadcast: Reading piece broadcast_0_piece0 of 
> broadcast_0
> 15/05/13 10:28:35 DEBUG BlockManager: Getting local block broadcast_0_piece0 
> as bytes
> 15/05/13 10:28:35 DEBUG BlockManager: Block broadcast_0_piece0 not registered 
> locally
> 15/05/13 10:28:35 DEBUG BlockManager: Getting remote block broadcast_0_piece0 
> as bytes
> 15/05/13 10:28:35 DEBUG BlockManager: Getting remote block broadcast_0_piece0 
> from BlockManagerId(c390c311-bd97-4a99-bcb9-b32fd3dede17, sparkbj01, 37599)
> 15/05/13 10:28:35 TRACE NettyBlockTransferService: Fetch blocks from 
> sparkbj01:37599 (