attilapiros commented on a change in pull request #24499: [SPARK-25888][Core]
Serve local disk persisted blocks by the external service after releasing
executor by dynamic allocation
URL: https://github.com/apache/spark/pull/24499#discussion_r280565768
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
##########
@@ -179,6 +179,18 @@ public ManagedBuffer getBlockData(
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
}
+ public ManagedBuffer getBlockData(
Review comment:
My next commit contains three changes regarding the handling of deleted files:
### Short overview of the changes
1) In `ExternalShuffleBlockResolver#getDiskPersistedRddBlockData` the existence of the file is explicitly checked before creating the `FileSegmentManagedBuffer`, and if the file is missing, `null` is passed on instead of a buffer. `OneForOneStreamManager` and `ChunkFetchRequestHandler` are extended to handle this `null` value correctly (in `ChunkFetchRequestHandler` an `IllegalStateException` with the message `Chunk was not found` is thrown, which shows up in the logs). A minimal sketch of this logic follows the list.
2) As the previous point is still not 100% bulletproof, there is also a change in `BlockManager`: when fetching a block, the received buffer size is required to be non-zero whenever the block size is non-zero (the block size comes from the block manager master). I am afraid equality of the two sizes cannot be checked here, as a MEMORY_ONLY block might be fetched to disk (see the sketch after the testing point 2) output below).
3) In the `Worker` I have added a warning about overriding / switching off the `spark.storage.cleanupFilesAfterExecutorExit` setting (sketched below, after the list). I have checked manually (and in the source code) that the cleanup is really done when the application stops.
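A minimal sketch of the point 1) logic (the real change is in the Java class `ExternalShuffleBlockResolver`; `resolveRddBlockFile` below is a hypothetical stand-in for the actual path lookup, written in Scala to match the test snippet at the end of this comment):
```scala
import java.io.File

// Sketch only, not the real implementation: resolve the on-disk file of a
// disk-persisted RDD block and signal its absence instead of building a
// FileSegmentManagedBuffer over a missing file. The absence (null on the
// Java side) is then turned into an
// IllegalStateException("Chunk was not found") by the chunk fetch handling.
object MissingBlockFileSketch {
  // Hypothetical path layout; the real resolver hashes into the local dirs.
  def resolveRddBlockFile(localDir: String, rddId: Int, splitIndex: Int): File =
    new File(localDir, s"rdd_${rddId}_${splitIndex}")

  def getDiskPersistedRddBlockFile(localDir: String, rddId: Int, splitIndex: Int): Option[File] = {
    val file = resolveRddBlockFile(localDir, rddId, splitIndex)
    if (file.exists()) Some(file) else None // None maps to the Java-side null
  }

  def main(args: Array[String]): Unit =
    println(getDiskPersistedRddBlockFile("/tmp/no-such-dir", 1, 2)) // prints None
}
```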
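And a rough sketch of the point 3) warning in the `Worker` (the exact wording and condition are assumptions, not the actual patch):
```scala
import org.apache.spark.SparkConf

// Assumed shape of the Worker-side warning: when disk-persisted blocks must
// remain servable by the external shuffle service, the per-executor cleanup
// is switched off and the user is warned that the setting has no effect.
object CleanupWarningSketch {
  def warnIfCleanupIsOverridden(conf: SparkConf): Unit = {
    if (conf.getBoolean("spark.storage.cleanupFilesAfterExecutorExit", defaultValue = true)) {
      // In the real Worker this would go through logWarning.
      System.err.println(
        "spark.storage.cleanupFilesAfterExecutorExit is overridden: executor " +
        "files are kept and only cleaned up when the application stops.")
    }
  }
}
```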
### Manual tests
Several manual tests were performed.
#### testing point 1)
Testing the point 1) change (`ExternalShuffleBlockResolver#getDiskPersistedRddBlockData` and the related handler changes); my expectation here was to see the computations re-triggered rather than empty blocks being silently fetched:
~~~
$ grep "Attila: calculate" work/app-20190502115425-0000/*/stdout |wc -l
200
$ grep "Failed to fetch" -A3 work/app-20190502115425-0000/*/stderr
work/app-20190502115425-0000/10/stderr:19/05/02 11:56:07 ERROR RetryingBlockFetcher: Failed to fetch block rdd_1_2, and will not retry (0 retries)
work/app-20190502115425-0000/10/stderr-org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId{streamId=1537044795002, chunkIndex=0}: java.lang.IllegalStateException: Chunk was not found
work/app-20190502115425-0000/10/stderr- at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:95)
work/app-20190502115425-0000/10/stderr- at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:51)
--
work/app-20190502115425-0000/10/stderr:19/05/02 11:56:07 WARN BlockManager: Failed to fetch block after 1 fetch failures. Most recent failure cause:
work/app-20190502115425-0000/10/stderr-org.apache.spark.SparkException: Exception thrown in awaitResult:
work/app-20190502115425-0000/10/stderr- at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:225)
work/app-20190502115425-0000/10/stderr- at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:121)
~~~
#### testing point 2)
The `BlockManager` changes were also tested separately from the previous point (these re-trigger the computations on their own as well):
~~~
$ grep "Attila: calculate" work/app-20190502100445-0000/*/stdout | wc -l
200
$ grep "Failed to fetch" -A2 work/app-20190502110418-0000/*/stderr
work/app-20190502110418-0000/10/stderr:19/05/02 11:06:04 WARN BlockManager: Failed to fetch block after 1 fetch failures. Most recent failure cause:
work/app-20190502110418-0000/10/stderr-java.lang.IllegalStateException: Empty buffer received for non empty block
work/app-20190502110418-0000/10/stderr- at org.apache.spark.storage.BlockManager.getRemoteManagedBuffer(BlockManager.scala:904)
--
work/app-20190502110418-0000/11/stderr:19/05/02 11:06:04 WARN BlockManager: Failed to fetch block after 1 fetch failures. Most recent failure cause:
work/app-20190502110418-0000/11/stderr-java.lang.IllegalStateException: Empty buffer received for non empty block
work/app-20190502110418-0000/11/stderr- at org.apache.spark.storage.BlockManager.getRemoteManagedBuffer(BlockManager.scala:904)
--
work/app-20190502110418-0000/12/stderr:19/05/02 11:06:07 WARN BlockManager: Failed to fetch block after 1 fetch failures. Most recent failure cause:
work/app-20190502110418-0000/12/stderr-java.lang.IllegalStateException: Empty buffer received for non empty block
~~~
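For reference, the check behind the `Empty buffer received for non empty block` message above can be sketched like this (an illustration of the point 2) rule, not the actual `BlockManager.getRemoteManagedBuffer` body):
```scala
// Sketch of the point 2) validation: a non-empty block (size reported by the
// block manager master) must not arrive as an empty buffer. Equality of the
// two sizes is deliberately not required, since a MEMORY_ONLY block can be
// fetched to disk and its size can legitimately differ.
object FetchedBufferCheckSketch {
  def validateFetchedBuffer(blockSizeFromMaster: Long, receivedBufferSize: Long): Unit = {
    if (blockSizeFromMaster > 0 && receivedBufferSize == 0) {
      throw new IllegalStateException("Empty buffer received for non empty block")
    }
  }

  def main(args: Array[String]): Unit =
    validateFetchedBuffer(blockSizeFromMaster = 128, receivedBufferSize = 0) // throws
}
```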
#### testing together
Finally, I have tested the whole solution; here the count stays at 100, i.e. the persisted blocks are served and the computation is not re-triggered:
~~~
$ grep "Attila" work/app-20190502202206-0000/*/stdout | wc -l
100
~~~
In addition, the test app was changed to do a local reduce on the collected RDD:
```scala
println("Calling collect on the first RDD: " + rdd.collect().reduce(_ + _))
```
This can be found in the logs:
```
Attila: sum: 4950
```
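For context, the test app is roughly of the following shape (a reconstruction from the logged output, not the exact app; the names and partition count are assumptions, but the sum of `0 until 100` is indeed 4950):
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Assumed reconstruction of the test app: compute and disk-persist an RDD,
// then collect it again after the executors holding the blocks are released
// by dynamic allocation. "Attila: calculate" in executor stdout marks a
// (re)computation; "Attila: sum: 4950" in driver stdout marks the reduce.
object PersistedBlockFetchTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PersistedBlockFetchTest"))
    val rdd = sc.parallelize(0 until 100, numSlices = 100)
      .map { i => println(s"Attila: calculate $i"); i }
      .persist(StorageLevel.DISK_ONLY)
    println("Calling collect on the first RDD: " + rdd.collect().reduce(_ + _))
    // ... executors are released by dynamic allocation here ...
    println("Attila: sum: " + rdd.collect().reduce(_ + _))
    sc.stop()
  }
}
```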