attilapiros opened a new pull request #24499: [SPARK-25888][Core] Serve local 
disk persisted blocks by the external service after releasing executor by 
dynamic allocation
URL: https://github.com/apache/spark/pull/24499
 
 
   # What changes were proposed in this pull request?
   
   ## Problem statement
   
   An executor which has persisted blocks is not considered idle, so it is not released by dynamic allocation after the regular timeout `spark.dynamicAllocation.executorIdleTimeout`; instead a separate configuration, `spark.dynamicAllocation.cachedExecutorIdleTimeout`, applies, which defaults to `Integer.MAX_VALUE`. This is because releasing the executor also means losing the persisted blocks (the metadata for the individual blocks, called `BlockInfo`, is kept in the executor's memory), and when the RDD is referenced later those lost blocks have to be recomputed.
   On the other hand, keeping executors alive for a long time without any task to work on is also a waste of resources, as the executors stay reserved for the application by the resource manager.
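   For reference, a minimal sketch of how these timeouts are typically configured (not code from this PR; the timeout values are just examples):
   
   ~~~scala
    import org.apache.spark.SparkConf

    // Example values only: with these settings an executor holding persisted
    // blocks is kept for cachedExecutorIdleTimeout (90s) instead of the
    // regular executorIdleTimeout (30s).
    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.dynamicAllocation.executorIdleTimeout", "30s")
      .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "90s")
   ~~~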
   
   ## Solution
   
   This PR focuses on the first part of SPARK-25888: it extends the external shuffle service with the capability to serve RDD blocks which are persisted to the local disk store by the executors. Moreover, when a block is reported to be persisted on disk, the external shuffle service instance running on the same host as the executor is also registered (along with the reporting block manager) as a possible location for fetching it.
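   A rough sketch of that registration idea (hypothetical names, not the PR's actual code; it assumes the shuffle service listens on the default `spark.shuffle.service.port`, 7337):
   
   ~~~scala
    import org.apache.spark.storage.{BlockManagerId, StorageLevel}

    // Hypothetical helper: the external shuffle service location derived from
    // the block manager that reported the block (same host, service port).
    def externalShuffleServiceId(
        reporter: BlockManagerId,
        servicePort: Int = 7337): BlockManagerId =
      BlockManagerId(reporter.executorId, reporter.host, servicePort)

    // Hypothetical driver-side step: for a block persisted on disk both the
    // reporting block manager and the shuffle service on its host are possible
    // fetch locations; otherwise only the block manager is.
    def fetchLocations(reporter: BlockManagerId, level: StorageLevel): Seq[BlockManagerId] =
      if (level.useDisk) Seq(reporter, externalShuffleServiceId(reporter))
      else Seq(reporter)
   ~~~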
   
   ## Some implementation detail
   
   Some explanation about the decisions made during the development:
   - Introducing `BlockTransferClient` and `BlockTransferClientSync` instead of moving the `BlockTransferService#fetchBlockSync()` method to `ShuffleClient`: `ShuffleClient` lives in the `spark-network-shuffle` artifact, where neither `EncryptedManagedBuffer` nor `SparkException` (used by `awaitResult()`, which would have had to be reimplemented there with a Java promise) is available.
   - The location list used to fetch a block was already randomized, but the order of the groups (same host, same rack, others) was kept. In this PR the order of the groups is still kept, and the external shuffle service is added to the end of each group.
   - No `BlockManagerInfo` is introduced for the external shuffle service; only a lightweight solution is taken: a hash map from `BlockId` to `BlockStatus` (see the sketch after this list). A type alias would make the source more readable, but I know it is discouraged; on the other hand a new class wrapping this hash map would introduce unnecessary indirection.
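   A minimal sketch of what this lightweight bookkeeping could look like (illustrative only; the names are not the PR's):
   
   ~~~scala
    import scala.collection.mutable

    import org.apache.spark.storage.{BlockId, BlockStatus, RDDBlockId, StorageLevel}

    // A plain mutable map tracks the blocks reachable through the external
    // shuffle service on one host; no type alias and no wrapper class.
    val blocksOnShuffleService = mutable.HashMap.empty[BlockId, BlockStatus]

    // Recording a disk-persisted RDD block (sizes are in bytes).
    val blockId: BlockId = RDDBlockId(rddId = 0, splitIndex = 3)
    blocksOnShuffleService(blockId) =
      BlockStatus(StorageLevel.DISK_ONLY, memSize = 0L, diskSize = 1024L)

    // Removing the entry once the block is dropped.
    blocksOnShuffleService.remove(blockId)
   ~~~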
   
   # How was this patch tested?
   
   ## Unit tests
   
   ### ExternalShuffleServiceSuite
   
   Here the complete use case is covered by the test "SPARK-25888: using external shuffle service fetching disk persisted blocks", with one small difference: the executor is killed manually, which makes the test a bit faster than waiting for the idle timeout.
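   Roughly, the scenario has this shape (a simplified sketch, not the suite's actual code; it assumes a running `SparkContext` with dynamic allocation and the external shuffle service enabled, and that the executor id to kill is known):
   
   ~~~scala
    import org.apache.spark.SparkContext
    import org.apache.spark.storage.StorageLevel

    def checkDiskBlocksSurviveExecutorLoss(sc: SparkContext): Unit = {
      // Persist a small RDD to disk and materialize it.
      val rdd = sc.parallelize(1 to 100, 4).persist(StorageLevel.DISK_ONLY)
      assert(rdd.count() == 100)

      // Kill the executor manually instead of waiting for the idle timeout
      // (killExecutors is a DeveloperApi; "1" is an assumed executor id).
      sc.killExecutors(Seq("1"))

      // The persisted blocks should still be readable, now served by the
      // external shuffle service instead of being recomputed.
      assert(rdd.collect().toSeq == (1 to 100))
    }
   ~~~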
   
   ### ExternalShuffleBlockHandlerSuite
   
   Tests the fetching of the RDD blocks via the external shuffle service.
   
   ### BlockManagerInfoSuite 
   
   This is a new suite. As the `BlockManagerInfo` behaviour depends heavily on whether the external shuffle service is enabled, all the tests are executed both with and without it.
   
   ### BlockManagerSuite
   
   Tests the sorting of the block locations. 
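   For illustration, the intended ordering could be sketched like this (hypothetical helper, not the PR's actual code): locations are grouped into same-host, same-rack and other hosts, the groups keep their relative order, and within each group the external shuffle service entries go last.
   
   ~~~scala
    import org.apache.spark.storage.BlockManagerId

    // Hypothetical sketch: `isShuffleService` stands in for however shuffle
    // service locations are distinguished from executor block managers.
    def sortLocations(
        locations: Seq[BlockManagerId],
        localHost: String,
        sameRackHosts: Set[String],
        isShuffleService: BlockManagerId => Boolean): Seq[BlockManagerId] = {
      val (sameHost, rest) = locations.partition(_.host == localHost)
      val (sameRack, others) = rest.partition(loc => sameRackHosts.contains(loc.host))
      // Executors first, shuffle service entries appended to the end of the group.
      def serviceLast(group: Seq[BlockManagerId]): Seq[BlockManagerId] = {
        val (service, executors) = group.partition(isShuffleService)
        executors ++ service
      }
      serviceLast(sameHost) ++ serviceLast(sameRack) ++ serviceLast(others)
    }
   ~~~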
   
   ## Manually on YARN
   
   Spark App was:
   
   ~~~scala
   package com.mycompany
   
   import org.apache.spark.rdd.RDD
   import org.apache.spark.{SparkContext, SparkConf}
   import org.apache.spark.storage.StorageLevel
   
   object TestAppDiskOnlyLevel {
     def main(args: Array[String]): Unit = {
       val conf = new SparkConf().setAppName("test-app")
   
       println("Attila: START")
       val sc = new SparkContext(conf)
       val rdd = sc.parallelize(0 until 100, 10)
         .map { i =>
           println(s"Attila: calculate first rdd i=$i")
           Thread.sleep(1000)
           i
         }
   
       rdd.persist(StorageLevel.DISK_ONLY)
       rdd.count()
   
       println("Attila: First RDD is processed, waiting for 60 sec")
   
       Thread.sleep(60 * 1000)
   
       println("Attila: Num executors must be 0 as executorIdleTimeout is way 
over")
   
       val rdd2 = sc.parallelize(0 until 10, 1)
         .map(i => (i, 1))
         .persist(StorageLevel.DISK_ONLY)
   
       rdd2.count()
   
       println("Attila: Second RDD with one partition (only one executors must 
be alive)")
       println("Attila: Calling collect on the first RDD:")
   
       rdd.collect()
   
       println("Attila: STOP")
     }
   }
   ~~~
   
   I have submitted with the following configuration:
   
   ~~~bash
   spark-submit --master yarn \
     --conf spark.dynamicAllocation.enabled=true \
     --conf spark.dynamicAllocation.executorIdleTimeout=30 \
     --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=90 \
  --class com.mycompany.TestAppDiskOnlyLevel dyn_alloc_demo-core_2.11-0.1.0-SNAPSHOT-jar-with-dependencies.jar
   ~~~
   
   Checked the result by filtering for the side effect of the task calculations:
   
   ~~~bash
    [user@server ~]$ yarn logs -applicationId application_1556299359453_0001 | grep "Attila: calculate" | wc -l
    WARNING: YARN_OPTS has been replaced by HADOOP_OPTS. Using value of YARN_OPTS.
    19/04/26 10:31:59 INFO client.RMProxy: Connecting to ResourceManager at apiros-1.gce.company.com/172.31.115.165:8032
   100
   ~~~
   
   So there were only 100 task executions (the `map` over the first RDD runs once per element, i.e. 100 times) and not 200, which would be the case if the first RDD had been re-computed when `collect()` was called after the executors were removed.
   
   Moreover, from the submit/launcher log we can see the executors really were stopped in between (note the "new total is 0" before the last line):
   ~~~
   [user@server ~]$ grep "Attila: Num executors must be 0" -B 2 spark-submit.log
    19/04/26 10:24:27 INFO cluster.YarnScheduler: Executor 9 on apiros-3.gce.company.com killed by driver.
    19/04/26 10:24:27 INFO spark.ExecutorAllocationManager: Existing executor 9 has been removed (new total is 0)
   Attila: Num executors must be 0 as executorIdleTimeout is way over
   ~~~
   
   [Full spark submit 
log](https://github.com/attilapiros/spark/files/3122465/spark-submit.log)
   
   I have also run the test after changing the `DISK_ONLY` storage level to `MEMORY_ONLY` for the first RDD. After this change no executor was removed during the 60 sec wait.
   
