attilapiros commented on a change in pull request #24499: [SPARK-25888][Core] 
Serve local disk persisted blocks by the external service after releasing 
executor by dynamic allocation
URL: https://github.com/apache/spark/pull/24499#discussion_r280565768
 
 

 ##########
 File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
 ##########
 @@ -179,6 +179,18 @@ public ManagedBuffer getBlockData(
     return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
   }
 
+  public ManagedBuffer getBlockData(
 
 Review comment:
   In the https://github.com/apache/spark/pull/24499/commits/5933ef0723539807356153c767696a1d3cd2b144 commit there are three changes for handling deleted files in the external shuffle service (and the standalone mode problem mentioned above):
   
   ### Short overview of the changes
   
   1) In `ExternalShuffleBlockResolver#getDiskPersistedRddBlockData` the file existence is checked explicitly before the `FileSegmentManagedBuffer` is created, and if the file is missing `null` is returned. `OneForOneStreamManager` and `ChunkFetchRequestHandler` were extended to handle this `null` value correctly (in `ChunkFetchRequestHandler` an `IllegalStateException` with the message `Chunk was not found` is thrown, which shows up in the logs); see the first sketch right after this list.
   2) As the previous point alone is still not 100% watertight, there is a change in `BlockManager` as well: at block fetching, the received buffer size is required to be non-zero whenever the block size reported by the block manager master is non-zero. I am afraid equality of the two sizes cannot be checked here, as a MEMORY_ONLY block might be fetched to disk (see the second sketch after this list).
   3) In the `Worker` I have added a warning about overriding / switching off the `spark.storage.cleanupFilesAfterExecutorExit` setting. I have checked manually (and also in the source code) that the cleanup really happens when the application is stopped.
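   
   A minimal sketch of the point 1) logic. The real method is the Java `ExternalShuffleBlockResolver#getDiskPersistedRddBlockData`; the class, constructor and method signature below are simplified assumptions for illustration, not the code of this commit:
   
   ```java
   import java.io.File;
   
   import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
   import org.apache.spark.network.buffer.ManagedBuffer;
   import org.apache.spark.network.util.TransportConf;
   
   // Sketch only: return null when the persisted RDD block file has already been
   // deleted, instead of handing out a buffer that points to a missing file.
   class DiskPersistedRddBlockResolverSketch {
     private final TransportConf conf;
   
     DiskPersistedRddBlockResolverSketch(TransportConf conf) {
       this.conf = conf;
     }
   
     ManagedBuffer getDiskPersistedRddBlockData(File rddBlockFile) {
       if (!rddBlockFile.exists()) {
         // OneForOneStreamManager / ChunkFetchRequestHandler must tolerate this
         // null; the handler reports it as "Chunk was not found" to the client
         return null;
       }
       return new FileSegmentManagedBuffer(conf, rddBlockFile, 0, rddBlockFile.length());
     }
   }
   ```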
   
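   Point 2) boils down to a simple size check. The real check lives in the Scala `BlockManager` (`getRemoteManagedBuffer`, visible in the stack traces below); the Java-flavoured snippet here is only an illustration and its names are assumptions:
   
   ```java
   import org.apache.spark.network.buffer.ManagedBuffer;
   
   // Sketch only: reject an empty buffer when the block manager master reported
   // a non-zero block size. Equality is deliberately not required, because a
   // MEMORY_ONLY block may be fetched to disk with a different size.
   final class FetchedBufferCheckSketch {
     static void validate(ManagedBuffer buf, long blockSizeReportedByMaster) {
       if (blockSizeReportedByMaster > 0 && buf.size() == 0) {
         throw new IllegalStateException("Empty buffer received for non empty block");
       }
     }
   }
   ```
   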
   ### Manual tests
   
   Several manual tests were done.
   
   #### testing point 1)
   
   After applying point 1) (the `ExternalShuffleBlockResolver#getDiskPersistedRddBlockData` change and the related handler changes), my expectation was to see the calculations re-triggered instead of empty blocks being fetched silently:
   
   ~~~
   $ grep "Attila: calculate"  work/app-20190502115425-0000/*/stdout |wc -l
        200
   
   $ grep "Failed to fetch" -A3  work/app-20190502115425-0000/*/stderr
   work/app-20190502115425-0000/10/stderr:19/05/02 11:56:07 ERROR 
RetryingBlockFetcher: Failed to fetch block rdd_1_2, and will not retry (0 
retries)
   
work/app-20190502115425-0000/10/stderr-org.apache.spark.network.client.ChunkFetchFailureException:
 Failure while fetching StreamChunkId{streamId=1537044795002, chunkIndex=0}: 
java.lang.IllegalStateException: Chunk was not found
   work/app-20190502115425-0000/10/stderr-      at 
org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:95)
   work/app-20190502115425-0000/10/stderr-      at 
org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:51)
   --
   work/app-20190502115425-0000/10/stderr:19/05/02 11:56:07 WARN BlockManager: 
Failed to fetch block after 1 fetch failures. Most recent failure cause:
   work/app-20190502115425-0000/10/stderr-org.apache.spark.SparkException: 
Exception thrown in awaitResult:
   work/app-20190502115425-0000/10/stderr-      at 
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:225)
   work/app-20190502115425-0000/10/stderr-      at 
org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:121)
   ~~~ 
   
   #### testing point 2)
   
   The `BlockManager` change was also tested separately from the previous point (it re-triggers the calculations on its own as well):
   
   ~~~
   $ grep "Attila: calculate" work/app-20190502100445-0000/*/stdout  | wc -l
        200
   
   $ grep "Failed to fetch" -A2  work/app-20190502110418-0000/*/stderr
   work/app-20190502110418-0000/10/stderr:19/05/02 11:06:04 WARN BlockManager: 
Failed to fetch block after 1 fetch failures. Most recent failure cause:
   work/app-20190502110418-0000/10/stderr-java.lang.IllegalStateException: 
Empty buffer received for non empty block
   work/app-20190502110418-0000/10/stderr-      at 
org.apache.spark.storage.BlockManager.getRemoteManagedBuffer(BlockManager.scala:904)
   --
   work/app-20190502110418-0000/11/stderr:19/05/02 11:06:04 WARN BlockManager: 
Failed to fetch block after 1 fetch failures. Most recent failure cause:
   work/app-20190502110418-0000/11/stderr-java.lang.IllegalStateException: 
Empty buffer received for non empty block
   work/app-20190502110418-0000/11/stderr-      at 
org.apache.spark.storage.BlockManager.getRemoteManagedBuffer(BlockManager.scala:904)
   --
   work/app-20190502110418-0000/12/stderr:19/05/02 11:06:07 WARN BlockManager: 
Failed to fetch block after 1 fetch failures. Most recent failure cause:
   work/app-20190502110418-0000/12/stderr-java.lang.IllegalStateException: 
Empty buffer received for non empty block
   ~~~
   
   #### testing together
   
   Finally I have tested the whole solution:
   
   ~~~
   $ grep "Attila" work/app-20190502202206-0000/*/stdout | wc -l
        100
   ~~~
   
   Along with this, the test app was changed to do a local reduce on the collected RDD:
   
   ```scala    
   println("Calling collect on the first RDD: " + rdd.collect().reduce(_ + _))
   ``` 
   
   This can be found in the logs:
   
   ```
   Calling collect on the first RDD: 4950
   ```
   
