Github user harishreedharan commented on a diff in the pull request:
https://github.com/apache/spark/pull/6508#discussion_r31497408
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -443,9 +466,27 @@ private[spark] class ExecutorAllocationManager(
    private def onExecutorIdle(executorId: String): Unit = synchronized {
      if (executorIds.contains(executorId)) {
        if (!removeTimes.contains(executorId) &&
            !executorsPendingToRemove.contains(executorId)) {
    +
    +      val hasCachedBlocks =
    +        executorsWithCachedBlocks.contains(executorId) ||
    +          executorEndpoints.get(executorId).exists(_.askWithRetry[Boolean](HasCachedBlocks))
    +
    +      if (hasCachedBlocks) executorsWithCachedBlocks += executorId
--- End diff --
@vanzin @andrewor14 - The idea is not just to do the RPC outside the
`synchronized` method, but to actually make sure the listener thread does not
get blocked. For that we'd have to fork off a new thread that does the RPC and
then enters a synchronized block which updates all of the internal state.
@lianhuiwang - It would be great if we could avoid the RPC round-trip, but
I am not sure exactly when the master would not know about certain blocks.
Given that, I do not want to risk correctness by depending on an assumption I
was not sure was correct. If someone who is more familiar with the whole
BlockManager code can confirm the RPC is not required, I am happy to remove it.
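To make the "fork off a new thread" idea concrete, here is a minimal sketch in plain Scala. It is not the actual patch: the names (`CachedBlockTracker`, `checkCachedBlocksAsync`, `askHasCachedBlocks`) are illustrative stand-ins, and the real code would use Spark's RPC endpoint and `askWithRetry[Boolean](HasCachedBlocks)` instead of the stub. The point it shows is the ordering: the blocking RPC runs on a separate thread outside any lock, and only the cheap in-memory update happens inside `synchronized`, so the listener thread is never blocked on the round-trip.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical sketch, not the actual ExecutorAllocationManager code.
object CachedBlockTracker {
  // A single-threaded pool keeps RPCs off the listener thread.
  private val rpcExecutor: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

  private val executorsWithCachedBlocks = scala.collection.mutable.Set[String]()

  // Stand-in for the real askWithRetry[Boolean](HasCachedBlocks) RPC.
  private def askHasCachedBlocks(executorId: String): Boolean = false

  def checkCachedBlocksAsync(executorId: String): Future[Unit] = Future {
    // Potentially slow, blocking RPC happens here, outside any lock.
    val hasCachedBlocks =
      executorsWithCachedBlocks.synchronized {
        executorsWithCachedBlocks.contains(executorId)
      } || askHasCachedBlocks(executorId)
    // Only the in-memory state update is done under the lock.
    executorsWithCachedBlocks.synchronized {
      if (hasCachedBlocks) executorsWithCachedBlocks += executorId
    }
  }(rpcExecutor)
}
```

One caveat this sketch makes visible: because the RPC result is applied asynchronously, the caller must tolerate the internal state lagging slightly behind the executor's true cache status, which is the trade-off for not blocking the listener.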