[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/14710 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14710#discussion_r76984890

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -532,39 +547,53 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       final def killExecutors(
           executorIds: Seq[String],
           replace: Boolean,
    -      force: Boolean): Boolean = synchronized {
    +      force: Boolean): Boolean = {
         logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
    -    val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
    -    unknownExecutors.foreach { id =>
    -      logWarning(s"Executor to kill $id does not exist!")
    -    }
    -    // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
    -    // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
    -    val executorsToKill = knownExecutors
    -      .filter { id => !executorsPendingToRemove.contains(id) }
    -      .filter { id => force || !scheduler.isExecutorBusy(id) }
    -    executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
    -
    -    // If we do not wish to replace the executors we kill, sync the target number of executors
    -    // with the cluster manager to avoid allocating new ones. When computing the new target,
    -    // take into account executors that are pending to be added or removed.
    -    if (!replace) {
    -      doRequestTotalExecutors(
    -        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
    -    } else {
    -      numPendingExecutors += knownExecutors.size
    +    val response = synchronized {
    +      val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
    +      unknownExecutors.foreach { id =>
    +        logWarning(s"Executor to kill $id does not exist!")
    +      }
    +
    +      // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
    +      // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
    +      val executorsToKill = knownExecutors
    +        .filter { id => !executorsPendingToRemove.contains(id) }
    +        .filter { id => force || !scheduler.isExecutorBusy(id) }
    +      executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
    +
    +      // If we do not wish to replace the executors we kill, sync the target number of executors
    +      // with the cluster manager to avoid allocating new ones. When computing the new target,
    +      // take into account executors that are pending to be added or removed.
    +      val adjustTotalExecutors =
    +        if (!replace) {
    +          doRequestTotalExecutors(
    +            numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
    +        } else {
    +          numPendingExecutors += knownExecutors.size
    +          Future.successful(true)
    +        }
    +
    +      val killExecutors: Boolean => Future[Boolean] =
    +        if (!executorsToKill.isEmpty) {
    +          _ => doKillExecutors(executorsToKill)
    +        } else {
    +          _ => Future.successful(false)
    +        }
    +
    +      adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
    --- End diff --

    Thanks, I was mostly just trying to make sure I understood correctly. I'm not worried about the RPC call being outside of the synchronized block because, as you say, it's best if it is done outside since it's safe to call it multi-threaded. It was more to make sure other data structures weren't modified outside the synchronized block. In this case all it's accessing is the local `executorsToKill`, so it doesn't matter.
[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14710#discussion_r76899692

    --- Diff: core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala ---
    @@ -21,6 +21,8 @@ import java.util.concurrent._
     import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture}
     import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
    +import scala.concurrent.{ExecutionContext, Future}
    --- End diff --

    nit: unused import
[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...
Github user angolon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14710#discussion_r76899230

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -532,39 +547,53 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       final def killExecutors(
           executorIds: Seq[String],
           replace: Boolean,
    -      force: Boolean): Boolean = synchronized {
    +      force: Boolean): Boolean = {
         logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
    -    val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
    -    unknownExecutors.foreach { id =>
    -      logWarning(s"Executor to kill $id does not exist!")
    -    }
    -    // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
    -    // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
    -    val executorsToKill = knownExecutors
    -      .filter { id => !executorsPendingToRemove.contains(id) }
    -      .filter { id => force || !scheduler.isExecutorBusy(id) }
    -    executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
    -
    -    // If we do not wish to replace the executors we kill, sync the target number of executors
    -    // with the cluster manager to avoid allocating new ones. When computing the new target,
    -    // take into account executors that are pending to be added or removed.
    -    if (!replace) {
    -      doRequestTotalExecutors(
    -        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
    -    } else {
    -      numPendingExecutors += knownExecutors.size
    +    val response = synchronized {
    +      val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
    +      unknownExecutors.foreach { id =>
    +        logWarning(s"Executor to kill $id does not exist!")
    +      }
    +
    +      // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
    +      // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
    +      val executorsToKill = knownExecutors
    +        .filter { id => !executorsPendingToRemove.contains(id) }
    +        .filter { id => force || !scheduler.isExecutorBusy(id) }
    +      executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
    +
    +      // If we do not wish to replace the executors we kill, sync the target number of executors
    +      // with the cluster manager to avoid allocating new ones. When computing the new target,
    +      // take into account executors that are pending to be added or removed.
    +      val adjustTotalExecutors =
    +        if (!replace) {
    +          doRequestTotalExecutors(
    +            numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
    +        } else {
    +          numPendingExecutors += knownExecutors.size
    +          Future.successful(true)
    +        }
    +
    +      val killExecutors: Boolean => Future[Boolean] =
    +        if (!executorsToKill.isEmpty) {
    +          _ => doKillExecutors(executorsToKill)
    +        } else {
    +          _ => Future.successful(false)
    +        }
    +
    +      adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
    --- End diff --

    When I originally started working on this I thought I wouldn't be able to avoid blocking on that call within the synchronized block. However, my (admittedly novice) understanding of the code aligns with what @vanzin said: because all it does is send the kill message, there's no need to synchronize over it.
[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14710#discussion_r76893461

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -532,39 +547,53 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       final def killExecutors(
           executorIds: Seq[String],
           replace: Boolean,
    -      force: Boolean): Boolean = synchronized {
    +      force: Boolean): Boolean = {
         logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
    -    val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
    -    unknownExecutors.foreach { id =>
    -      logWarning(s"Executor to kill $id does not exist!")
    -    }
    -    // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
    -    // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
    -    val executorsToKill = knownExecutors
    -      .filter { id => !executorsPendingToRemove.contains(id) }
    -      .filter { id => force || !scheduler.isExecutorBusy(id) }
    -    executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
    -
    -    // If we do not wish to replace the executors we kill, sync the target number of executors
    -    // with the cluster manager to avoid allocating new ones. When computing the new target,
    -    // take into account executors that are pending to be added or removed.
    -    if (!replace) {
    -      doRequestTotalExecutors(
    -        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
    -    } else {
    -      numPendingExecutors += knownExecutors.size
    +    val response = synchronized {
    +      val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
    +      unknownExecutors.foreach { id =>
    +        logWarning(s"Executor to kill $id does not exist!")
    +      }
    +
    +      // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
    +      // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
    +      val executorsToKill = knownExecutors
    +        .filter { id => !executorsPendingToRemove.contains(id) }
    +        .filter { id => force || !scheduler.isExecutorBusy(id) }
    +      executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
    +
    +      // If we do not wish to replace the executors we kill, sync the target number of executors
    +      // with the cluster manager to avoid allocating new ones. When computing the new target,
    +      // take into account executors that are pending to be added or removed.
    +      val adjustTotalExecutors =
    +        if (!replace) {
    +          doRequestTotalExecutors(
    +            numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
    +        } else {
    +          numPendingExecutors += knownExecutors.size
    +          Future.successful(true)
    +        }
    +
    +      val killExecutors: Boolean => Future[Boolean] =
    +        if (!executorsToKill.isEmpty) {
    +          _ => doKillExecutors(executorsToKill)
    +        } else {
    +          _ => Future.successful(false)
    +        }
    +
    +      adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
    --- End diff --

    I'm pretty sure you're correct, but at the same time I don't think there's a requirement that `doKillExecutors` needs to be called from a synchronized block. Current implementations just send RPC messages, which is probably better done outside the synchronized block anyway.
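The shape discussed in this comment (update shared state under the lock, then wait for the RPC reply after the lock is released) can be sketched in isolation. This is only an illustration under stated assumptions: `Backend`, `sendKill`, `known`, and `pendingToRemove` are hypothetical names, not Spark's classes or fields, and the 5 second timeout is arbitrary.

```scala
import scala.collection.mutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for a scheduler backend; not Spark's actual class.
// `sendKill` plays the role of an RPC send that returns a Future.
class Backend(sendKill: Seq[String] => Future[Boolean]) {
  private val known = mutable.Set("1", "2", "3")
  private val pendingToRemove = mutable.Set.empty[String]

  def killExecutors(ids: Seq[String]): Boolean = {
    // Shared state is read and updated only while holding the monitor.
    val response: Future[Boolean] = synchronized {
      val toKill = ids.filter(id => known.contains(id) && !pendingToRemove.contains(id))
      pendingToRemove ++= toKill
      // The send only touches the local `toKill`, so nothing shared is
      // modified once the monitor has been released.
      if (toKill.nonEmpty) sendKill(toKill) else Future.successful(false)
    }
    // Blocking on the reply happens here, outside the synchronized block.
    Await.result(response, 5.seconds)
  }

  def pending: Set[String] = synchronized(pendingToRemove.toSet)
}
```

Because the wait sits outside `synchronized`, another thread that needs the same monitor is not held up while the reply is in flight.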
[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14710#discussion_r76890382

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -532,39 +547,53 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       final def killExecutors(
           executorIds: Seq[String],
           replace: Boolean,
    -      force: Boolean): Boolean = synchronized {
    +      force: Boolean): Boolean = {
         logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
    -    val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
    -    unknownExecutors.foreach { id =>
    -      logWarning(s"Executor to kill $id does not exist!")
    -    }
    -    // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
    -    // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
    -    val executorsToKill = knownExecutors
    -      .filter { id => !executorsPendingToRemove.contains(id) }
    -      .filter { id => force || !scheduler.isExecutorBusy(id) }
    -    executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
    -
    -    // If we do not wish to replace the executors we kill, sync the target number of executors
    -    // with the cluster manager to avoid allocating new ones. When computing the new target,
    -    // take into account executors that are pending to be added or removed.
    -    if (!replace) {
    -      doRequestTotalExecutors(
    -        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
    -    } else {
    -      numPendingExecutors += knownExecutors.size
    +    val response = synchronized {
    +      val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
    +      unknownExecutors.foreach { id =>
    +        logWarning(s"Executor to kill $id does not exist!")
    +      }
    +
    +      // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
    +      // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
    +      val executorsToKill = knownExecutors
    +        .filter { id => !executorsPendingToRemove.contains(id) }
    +        .filter { id => force || !scheduler.isExecutorBusy(id) }
    +      executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
    +
    +      // If we do not wish to replace the executors we kill, sync the target number of executors
    +      // with the cluster manager to avoid allocating new ones. When computing the new target,
    +      // take into account executors that are pending to be added or removed.
    +      val adjustTotalExecutors =
    +        if (!replace) {
    +          doRequestTotalExecutors(
    +            numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
    +        } else {
    +          numPendingExecutors += knownExecutors.size
    +          Future.successful(true)
    +        }
    +
    +      val killExecutors: Boolean => Future[Boolean] =
    +        if (!executorsToKill.isEmpty) {
    +          _ => doKillExecutors(executorsToKill)
    +        } else {
    +          _ => Future.successful(false)
    +        }
    +
    +      adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
    --- End diff --

    Please correct me if I'm wrong, as I'm not that familiar with `Future.flatMap`, but isn't this going to run `doRequestTotalExecutors`, then once that comes back, apply the result to `killExecutors`? Which I think means `killExecutors` is called outside of the synchronized block, after we do the awaitResults for `doRequestTotalExecutors`?
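The ordering asked about here can be checked with plain Scala futures. The sketch below is not Spark code: `sameThread` is a locally defined execution context assumed to behave like the name `ThreadUtils.sameThread` suggests (callbacks run directly on the completing thread), and `first` and `second` are hypothetical stand-ins for the two requests.

```scala
import java.util.concurrent.Executor
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object FlatMapOrder {
  // Assumed stand-in for a "same thread" context: each callback runs
  // directly on whichever thread hands it to the executor.
  val sameThread: ExecutionContext =
    ExecutionContext.fromExecutor(new Executor {
      override def execute(r: Runnable): Unit = r.run()
    })

  def run(): List[String] = {
    val events = ArrayBuffer.empty[String]
    // Plays the role of the first request; it is still pending for now.
    val first = Promise[Boolean]()
    // Plays the role of the follow-up request.
    val second: Boolean => Future[Boolean] = { _ =>
      events += "second step started"
      Future.successful(true)
    }

    val chained = first.future.flatMap(second)(sameThread)
    events += "flatMap returned"   // recorded before the second step has run
    first.success(true)            // completing the first future triggers the second step
    Await.result(chained, 1.second)
    events.toList
  }
}
```

With a pending first future, `flatMap` returns before the second function is invoked, which matches the reading in the comment. If the first future is already complete when `flatMap` is called, a same-thread context may instead run the second step immediately on the calling thread.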
[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14710#discussion_r76830698

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -478,19 +487,24 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           numExecutors: Int,
           localityAwareTasks: Int,
           hostToLocalTaskCount: Map[String, Int]
    -    ): Boolean = synchronized {
    +    ): Boolean = {
         if (numExecutors < 0) {
           throw new IllegalArgumentException(
             "Attempted to request a negative number of executor(s) " +
               s"$numExecutors from the cluster manager. Please specify a positive number!")
         }
    -    this.localityAwareTasks = localityAwareTasks
    -    this.hostToLocalTaskCount = hostToLocalTaskCount
    +    val response = synchronized {
    +      this.localityAwareTasks = localityAwareTasks
    +      this.hostToLocalTaskCount = hostToLocalTaskCount
    +
    +      numPendingExecutors =
    --- End diff --

    In this particular case, it's not that `ask` would be better, it's just that it would be no worse. With the new RPC code, the only time `askWithRetry` will actually retry, barring bugs in the RPC handlers, is when a timeout occurs, since the RPC layer does not drop messages. So an `ask` with a longer timeout actually has a better chance of succeeding, since with `askWithRetry` the remote end will receive and process the first message before the retries, even if the sender has given up on it.

    As for the bug you mention, yes it exists, but it also existed before.
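The argument about retries can be made concrete with a toy model. Nothing below is Spark's RPC layer: `SlowEndpoint` and `RetryDemo.askWithRetry` are hypothetical, and the endpoint is deliberately built so that every message is delivered but no reply ever arrives, which is the timeout-only failure mode the comment describes.

```scala
import java.util.concurrent.TimeoutException
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Toy endpoint (hypothetical): records every message it receives and
// never replies, so every ask times out on the sender's side.
class SlowEndpoint {
  val received: ArrayBuffer[String] = ArrayBuffer.empty[String]
  def ask(msg: String): Future[Boolean] = {
    received += msg             // the message is delivered regardless
    Promise[Boolean]().future   // the reply never arrives
  }
}

object RetryDemo {
  // Hypothetical retry loop: resend after each timeout, give up when
  // no attempts are left.
  def askWithRetry(
      ep: SlowEndpoint,
      msg: String,
      attemptsLeft: Int,
      timeout: FiniteDuration): Option[Boolean] =
    if (attemptsLeft <= 0) None
    else Try(Await.result(ep.ask(msg), timeout)) match {
      case Success(reply) => Some(reply)
      case Failure(_: TimeoutException) => askWithRetry(ep, msg, attemptsLeft - 1, timeout)
      case Failure(other) => throw other
    }
}
```

In this model the sender ends up with no answer after three attempts, while the receiver has been handed the same request three times. A single ask with a timeout three times as long would have delivered one copy and waited just as long overall.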
[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14710#discussion_r76788546

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -478,19 +487,24 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           numExecutors: Int,
           localityAwareTasks: Int,
           hostToLocalTaskCount: Map[String, Int]
    -    ): Boolean = synchronized {
    +    ): Boolean = {
         if (numExecutors < 0) {
           throw new IllegalArgumentException(
             "Attempted to request a negative number of executor(s) " +
               s"$numExecutors from the cluster manager. Please specify a positive number!")
         }
    -    this.localityAwareTasks = localityAwareTasks
    -    this.hostToLocalTaskCount = hostToLocalTaskCount
    +    val response = synchronized {
    +      this.localityAwareTasks = localityAwareTasks
    +      this.hostToLocalTaskCount = hostToLocalTaskCount
    +
    +      numPendingExecutors =
    --- End diff --

    I guess I'll have to go look at the new implementation. Can you clarify why `ask` would be better?
[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14710#discussion_r76700923

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -478,19 +487,24 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           numExecutors: Int,
           localityAwareTasks: Int,
           hostToLocalTaskCount: Map[String, Int]
    -    ): Boolean = synchronized {
    +    ): Boolean = {
         if (numExecutors < 0) {
           throw new IllegalArgumentException(
             "Attempted to request a negative number of executor(s) " +
               s"$numExecutors from the cluster manager. Please specify a positive number!")
         }
    -    this.localityAwareTasks = localityAwareTasks
    -    this.hostToLocalTaskCount = hostToLocalTaskCount
    +    val response = synchronized {
    +      this.localityAwareTasks = localityAwareTasks
    +      this.hostToLocalTaskCount = hostToLocalTaskCount
    +
    +      numPendingExecutors =
    --- End diff --

    This is a longer discussion (and something I'd like to address thoroughly at some point when I find time), but `askWithRetry` is actually pretty useless with the new RPC implementation, and I'd say even harmful. An `ask` with a larger timeout has a much better chance of succeeding, and is cheaper than `askWithRetry`.

    So I don't think that the change makes the particular situation you point out more common at all.
[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14710#discussion_r76699077

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -478,19 +487,24 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           numExecutors: Int,
           localityAwareTasks: Int,
           hostToLocalTaskCount: Map[String, Int]
    -    ): Boolean = synchronized {
    +    ): Boolean = {
         if (numExecutors < 0) {
           throw new IllegalArgumentException(
             "Attempted to request a negative number of executor(s) " +
               s"$numExecutors from the cluster manager. Please specify a positive number!")
         }
    -    this.localityAwareTasks = localityAwareTasks
    -    this.hostToLocalTaskCount = hostToLocalTaskCount
    +    val response = synchronized {
    +      this.localityAwareTasks = localityAwareTasks
    +      this.hostToLocalTaskCount = hostToLocalTaskCount
    +
    +      numPendingExecutors =
    --- End diff --

    I'll look at this more tomorrow, but what happens if the `ask` does fail and we have now incremented `numPendingExecutors`? That issue was there before, but now that we are doing `ask` instead of `askWithRetry` it might show up more often.
[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14710#discussion_r76465162

    --- Diff: core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala ---
    @@ -122,7 +134,9 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd
         val ci = new AppClientInst(masterRpcEnv.address.toSparkURL)
         // requests to master should fail immediately
    -    assert(ci.client.requestTotalExecutors(3) === false)
    +    whenReady(ci.client.requestTotalExecutors(3), timeout(0.seconds)) { success =>
    --- End diff --

    nit: don't use a `0` timeout. The behavior depends on the implementation, which could check the timeout first, before running the command in the future.
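A related effect of a zero timeout can be seen with plain `Await` instead of ScalaTest's `whenReady` (a deliberate substitution, so this says nothing about how `whenReady` itself is implemented). In the sketch, `ZeroTimeout`, `delayed`, and `attempt` are hypothetical names, and the 200 ms delay is an arbitrary stand-in for "the result is not quite there yet".

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object ZeroTimeout {
  // A future that another thread completes roughly `delayMs` later.
  def delayed(delayMs: Long): Future[Boolean] = {
    val p = Promise[Boolean]()
    val t = new Thread(new Runnable {
      override def run(): Unit = {
        Thread.sleep(delayMs)
        p.success(false)
      }
    })
    t.setDaemon(true)
    t.start()
    p.future
  }

  // Waits at most `atMost` for a result that needs about 200 ms to arrive.
  def attempt(atMost: Duration): Try[Boolean] =
    Try(Await.result(delayed(200), atMost))
}
```

`ZeroTimeout.attempt(Duration.Zero)` fails with a `TimeoutException` because the future is still pending when it is checked, whereas a small positive timeout such as `5.seconds` yields the value without making the happy path noticeably slower.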
[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14710#discussion_r76348848

    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
    @@ -269,20 +258,22 @@ private[spark] abstract class YarnSchedulerBackend(
           case AddWebUIFilter(filterName, filterParams, proxyBase) =>
             addWebUIFilter(filterName, filterParams, proxyBase)
    -      case RemoveExecutor(executorId, reason) =>
    +      case r @ RemoveExecutor(executorId, reason) =>
             logWarning(reason.toString)
    -        removeExecutor(executorId, reason)
    +        driverEndpoint.ask[Boolean](r).onFailure {
    +          case e =>
    +            logError(s"Error requesting driver to remove executor $executorId for reason $reason")
    +        }
         }
         override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
           case r: RequestExecutors =>
             amEndpoint match {
               case Some(am) =>
    -            Future {
    -              context.reply(am.askWithRetry[Boolean](r))
    -            } onFailure {
    -              case NonFatal(e) =>
    +            am.ask[Boolean](r).andThen {
    --- End diff --

    Similarly here, could you replace `askAmExecutor` with `ThreadUtils.sameThreadExecutionContext` and get rid of another thread pool?
[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14710#discussion_r76347979

    --- Diff: core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala ---
    @@ -220,19 +225,13 @@ private[spark] class StandaloneAppClient(
             endpointRef: RpcEndpointRef,
             context: RpcCallContext,
             msg: T): Unit = {
    -      // Create a thread to ask a message and reply with the result. Allow thread to be
    +      // Ask a message and create a thread to reply with the result. Allow thread to be
           // interrupted during shutdown, otherwise context must be notified of NonFatal errors.
    -      askAndReplyThreadPool.execute(new Runnable {
    -        override def run(): Unit = {
    -          try {
    -            context.reply(endpointRef.askWithRetry[Boolean](msg))
    -          } catch {
    -            case ie: InterruptedException => // Cancelled
    -            case NonFatal(t) =>
    -              context.sendFailure(t)
    -          }
    -        }
    -      })
    +      endpointRef.ask[Boolean](msg).andThen {
    +        case Success(b) => context.reply(b)
    +        case Failure(ie: InterruptedException) => // Cancelled
    +        case Failure(NonFatal(t)) => context.sendFailure(t)
    +      }(askAndReplyExecutionContext)
    --- End diff --

    Do you need `askAndReplyExecutionContext` anymore? It seems now all the heavy lifting is being done in the RPC thread pool, and the `andThen` code could just use `ThreadUtils.sameThreadExecutionContext` since it doesn't do much.
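The suggestion can be sketched without Spark: when the callback only forwards a result, it can run on the thread that completes the future, so no dedicated pool is needed. Everything named here is an assumption for illustration: `sameThread` is a locally built context rather than Spark's `ThreadUtils` one, and `Context` is a hypothetical reply sink, not `RpcCallContext`.

```scala
import java.util.concurrent.Executor
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.util.control.NonFatal

object ReplyDemo {
  // Assumed same-thread context: runs the callback on the thread that
  // completes the future instead of handing it to a separate pool.
  val sameThread: ExecutionContext =
    ExecutionContext.fromExecutor(new Executor {
      override def execute(r: Runnable): Unit = r.run()
    })

  // Hypothetical reply sink standing in for an RPC call context.
  class Context {
    @volatile var outcome: String = "none"
    def reply(b: Boolean): Unit = outcome = s"reply($b)"
    def sendFailure(t: Throwable): Unit = outcome = s"failure(${t.getMessage})"
  }

  // Forward the result of `ask` to `context`; the callback is cheap, so it
  // does not need threads of its own.
  def askAndReply(ask: Future[Boolean], context: Context): Future[Boolean] =
    ask.andThen {
      case Success(b) => context.reply(b)
      case Failure(_: InterruptedException) => // cancelled, nothing to report
      case Failure(NonFatal(t)) => context.sendFailure(t)
    }(sameThread)
}
```

The trade-off is that any work placed in such a callback occupies the completing thread, so this only makes sense while the callback stays trivial.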
[GitHub] spark pull request #14710: [SPARK-16533][CORE]
GitHub user angolon opened a pull request:

    https://github.com/apache/spark/pull/14710

    [SPARK-16533][CORE]

    ## What changes were proposed in this pull request?

    This pull request reverts the changes made as a part of #14605, which simply side-steps the deadlock issue. Instead, I propose the following approach:

    * Use `scheduleWithFixedDelay` when calling `ExecutorAllocationManager.schedule` for scheduling executor requests. The intent of this is that if invocations are delayed beyond the default schedule interval on account of lock contention, then we avoid a situation where calls to `schedule` are made back-to-back, potentially releasing and then immediately reacquiring these locks, further exacerbating contention.
    * Replace a number of calls to `askWithRetry` with `ask` inside of message handling code in `CoarseGrainedSchedulerBackend` and its ilk. This allows us to queue messages with the relevant endpoints, release whatever locks we might be holding, and then block whilst awaiting the response. This change is made at the cost of being able to retry should sending the message fail, as retrying outside of the lock could easily cause race conditions if other conflicting messages have been sent whilst awaiting a response. I believe this to be the lesser of two evils, as in many cases these RPC calls are to process-local components, and so failures are more likely to be deterministic, and timeouts are more likely to be caused by lock contention.

    ## How was this patch tested?

    Existing tests, and manual tests under yarn-client mode.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/angolon/spark SPARK-16533

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14710.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #14710

commit cef69bf470199c63b6638933756b1d057dc890d1
Author: Angus Gerry
Date: 2016-08-19T01:52:58Z

    Revert "[SPARK-17022][YARN] Handle potential deadlock in driver handling messages"

    This reverts commit ea0bf91b4a2ca3ef472906e50e31fd6268b6f53e.

commit 4970b3b0bcd834bbe5d5473a3065f04a48b12643
Author: Angus Gerry
Date: 2016-08-09T04:45:29Z

    [SPARK-16533][CORE] Use scheduleWithFixedDelay when calling ExecutorAllocatorManager.schedule to ease contention on locks.

commit 920274a3ed0b8278d38d721587a24c9441fa5ff3
Author: Angus Gerry
Date: 2016-08-04T06:27:56Z

    [SPARK-16533][CORE] Replace many calls to askWithRetry to plain old ask.
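The first bullet of the description rests on the difference between the two scheduling modes of `java.util.concurrent.ScheduledExecutorService`. The sketch below is a standalone timing experiment, not Spark code: `FixedDelayDemo` and its parameters are hypothetical, and the sleep inside the task merely simulates an invocation that is held up (for example by lock contention) for longer than the scheduling period.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object FixedDelayDemo {
  // Counts how often a task that takes `taskMs` gets started within
  // `windowMs` when scheduled every `periodMs`.
  def runs(fixedDelay: Boolean, periodMs: Long, taskMs: Long, windowMs: Long): Int = {
    val pool = Executors.newSingleThreadScheduledExecutor()
    val count = new AtomicInteger(0)
    val task = new Runnable {
      override def run(): Unit = {
        count.incrementAndGet()
        Thread.sleep(taskMs) // simulated slow invocation
      }
    }
    if (fixedDelay) {
      // The next run starts `periodMs` after the previous run has finished.
      pool.scheduleWithFixedDelay(task, 0, periodMs, TimeUnit.MILLISECONDS)
    } else {
      // Runs are due every `periodMs`; an overrunning task is followed
      // immediately by the next one.
      pool.scheduleAtFixedRate(task, 0, periodMs, TimeUnit.MILLISECONDS)
    }
    Thread.sleep(windowMs)
    pool.shutdownNow()
    count.get()
  }
}
```

Comparing `FixedDelayDemo.runs(fixedDelay = false, 100, 150, 1000)` with `FixedDelayDemo.runs(fixedDelay = true, 100, 150, 1000)` should show the fixed-rate variant starting more runs in the same window, since a late run is followed back-to-back by the next one, while the fixed-delay variant always leaves a gap.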