[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...

2016-09-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/14710





[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...

2016-08-31 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/14710#discussion_r76984890
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -532,39 +547,53 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   final def killExecutors(
       executorIds: Seq[String],
       replace: Boolean,
-      force: Boolean): Boolean = synchronized {
+      force: Boolean): Boolean = {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
-    val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
-    unknownExecutors.foreach { id =>
-      logWarning(s"Executor to kill $id does not exist!")
-    }
 
-    // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
-    // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
-    val executorsToKill = knownExecutors
-      .filter { id => !executorsPendingToRemove.contains(id) }
-      .filter { id => force || !scheduler.isExecutorBusy(id) }
-    executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
-
-    // If we do not wish to replace the executors we kill, sync the target number of executors
-    // with the cluster manager to avoid allocating new ones. When computing the new target,
-    // take into account executors that are pending to be added or removed.
-    if (!replace) {
-      doRequestTotalExecutors(
-        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
-    } else {
-      numPendingExecutors += knownExecutors.size
+    val response = synchronized {
+      val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
+      unknownExecutors.foreach { id =>
+        logWarning(s"Executor to kill $id does not exist!")
+      }
+
+      // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
+      // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
+      val executorsToKill = knownExecutors
+        .filter { id => !executorsPendingToRemove.contains(id) }
+        .filter { id => force || !scheduler.isExecutorBusy(id) }
+      executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
+
+      // If we do not wish to replace the executors we kill, sync the target number of executors
+      // with the cluster manager to avoid allocating new ones. When computing the new target,
+      // take into account executors that are pending to be added or removed.
+      val adjustTotalExecutors =
+        if (!replace) {
+          doRequestTotalExecutors(
+            numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
+        } else {
+          numPendingExecutors += knownExecutors.size
+          Future.successful(true)
+        }
+
+      val killExecutors: Boolean => Future[Boolean] =
+        if (!executorsToKill.isEmpty) {
+          _ => doKillExecutors(executorsToKill)
+        } else {
+          _ => Future.successful(false)
+        }
+
+      adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
--- End diff --

Thanks, I was mostly just trying to make sure I understood correctly. I'm
not worried about the RPC call being outside the synchronized block because,
as you say, it's best done outside since it's safe to call from multiple
threads. It was more to make sure other data structures weren't modified
outside the synchronized block. In this case all it accesses is the local
`executorsToKill`, so it doesn't matter.
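
For what it's worth, the shape being confirmed here boils down to a small sketch (names like `pending` and `sendKill` are hypothetical, not the PR's code): shared state is only touched while holding the lock, and the deferred RPC closes over nothing but a local value.

```scala
import scala.concurrent.Future

object LockScopeSketch {
  private val lock = new Object
  private var pending = Set.empty[String] // guarded by `lock`

  // Stand-in for an RPC send; thread-safe, so it needs no lock.
  def sendKill(ids: Seq[String]): Future[Boolean] = Future.successful(true)

  def kill(ids: Seq[String]): Future[Boolean] = {
    // All shared state is read and written while holding the lock...
    val toKill = lock.synchronized {
      val known = ids.filterNot(pending.contains)
      pending ++= known
      known // a local value, safe to use after the lock is released
    }
    // ...and the deferred call only closes over the local `toKill`.
    sendKill(toKill)
  }
}
```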





[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...

2016-08-30 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14710#discussion_r76899692
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala ---
@@ -21,6 +21,8 @@ import java.util.concurrent._
 import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 
+import scala.concurrent.{ExecutionContext, Future}
--- End diff --

nit: unused import





[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...

2016-08-30 Thread angolon
Github user angolon commented on a diff in the pull request:

https://github.com/apache/spark/pull/14710#discussion_r76899230
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
(same `killExecutors` hunk as quoted above)
+      adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
--- End diff --

When I originally started working on this I thought I wouldn't be able to
avoid blocking on that call within the synchronized block. However, my
(admittedly novice) understanding of the code aligns with what @vanzin said:
because all it does is send the kill message, there's no need to synchronize
over it.





[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...

2016-08-30 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14710#discussion_r76893461
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
(same `killExecutors` hunk as quoted above)
+      adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
--- End diff --

I'm pretty sure you're correct, but at the same time I don't think there's 
a requirement that `doKillExecutors` needs to be called from a synchronized 
block. Current implementations just send RPC messages, which is probably better 
done outside the synchronized block anyway.





[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...

2016-08-30 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/14710#discussion_r76890382
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
(same `killExecutors` hunk as quoted above)
+      adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
--- End diff --

Please correct me if I'm wrong, as I'm not that familiar with `Future.flatMap`,
but isn't this going to run `doRequestTotalExecutors`, and then once that
comes back, apply the result to `killExecutors`? Which I think means
`killExecutors` is called outside of the synchronized block, after we await
the result of `doRequestTotalExecutors`?
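
For reference, a self-contained sketch of the `flatMap` sequencing in question, using a hand-rolled same-thread executor in the spirit of `ThreadUtils.sameThread`: the second stage is only invoked once the first future completes, on the completing thread, so it runs after the caller has already left the synchronized block in which the chain was built.

```scala
import java.util.concurrent.Executor
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object FlatMapOrderSketch extends App {
  // Runs callbacks on whichever thread completes the future.
  implicit val sameThread: ExecutionContext =
    ExecutionContext.fromExecutor(new Executor {
      def execute(r: Runnable): Unit = r.run()
    })

  val first = Promise[Boolean]()
  // The function passed to flatMap is not run here; it is registered and
  // only invoked once `first` completes.
  val chained = first.future.flatMap { ok =>
    println(s"second stage sees: $ok")
    Future.successful(!ok)
  }

  first.success(true) // the second stage runs here, on this thread
  println(Await.result(chained, 1.second)) // prints false
}
```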





[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...

2016-08-30 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14710#discussion_r76830698
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -478,19 +487,24 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       numExecutors: Int,
       localityAwareTasks: Int,
       hostToLocalTaskCount: Map[String, Int]
-    ): Boolean = synchronized {
+    ): Boolean = {
     if (numExecutors < 0) {
       throw new IllegalArgumentException(
         "Attempted to request a negative number of executor(s) " +
           s"$numExecutors from the cluster manager. Please specify a positive number!")
     }
 
-    this.localityAwareTasks = localityAwareTasks
-    this.hostToLocalTaskCount = hostToLocalTaskCount
+    val response = synchronized {
+      this.localityAwareTasks = localityAwareTasks
+      this.hostToLocalTaskCount = hostToLocalTaskCount
+
+      numPendingExecutors =
--- End diff --

In this particular case, it's not that `ask` would be better, it's just
that it would be no worse. With the new RPC code, the only time `askWithRetry`
will actually retry, barring bugs in the RPC handlers, is when a timeout
occurs, since the RPC layer does not drop messages. So an `ask` with a longer
timeout actually has a better chance of succeeding, since with `askWithRetry`
the remote end will receive and process the first message before the retries,
even if the sender has given up on it.

As for the bug you mention, yes it exists, but it also existed before.
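
A sketch of the trade-off described above, with simplified, hypothetical signatures (the real `RpcEndpointRef` API differs): a single `ask` with a long timeout enqueues the message exactly once, whereas a retry loop resends a message the receiver is likely still processing.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for the RPC endpoint under discussion.
trait Endpoint {
  def ask[T](msg: Any, timeout: FiniteDuration): Future[T]
}

object AskSketch {
  // One ask with a long timeout: the message is enqueued exactly once.
  def requestOnce(ep: Endpoint, msg: Any): Boolean =
    Await.result(ep.ask[Boolean](msg, 120.seconds), 120.seconds)

  // What a retry loop effectively does: on timeout, resend the same message
  // even though the receiver never dropped the first copy and may still be
  // processing it, so the retry only duplicates work.
  def requestWithRetry(ep: Endpoint, msg: Any, attempts: Int = 3): Boolean =
    try Await.result(ep.ask[Boolean](msg, 30.seconds), 30.seconds)
    catch {
      case _: java.util.concurrent.TimeoutException if attempts > 1 =>
        requestWithRetry(ep, msg, attempts - 1)
    }
}
```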





[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...

2016-08-30 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/14710#discussion_r76788546
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
(same hunk as quoted above)
+      numPendingExecutors =
--- End diff --

I guess I'll have to go look at the new implementation. Can you clarify why
`ask` would be better?





[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...

2016-08-29 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14710#discussion_r76700923
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
(same hunk as quoted above)
+      numPendingExecutors =
--- End diff --

This is a longer discussion (and something I'd like to address thoroughly 
at some point when I find time), but `askWithRetry` is actually pretty useless 
with the new RPC implementation, and I'd say even harmful. An `ask` with a 
larger timeout has a much better chance of succeeding, and is cheaper than 
`askWithRetry`.

So I don't think that the change makes the particular situation you point 
out more common at all.





[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...

2016-08-29 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/14710#discussion_r76699077
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
(same hunk as quoted above)
+      numPendingExecutors =
--- End diff --

I'll look at this more tomorrow, but what happens if the ask does fail and
we have now incremented `numPendingExecutors`? That issue was there before,
but now that we are doing `ask` instead of `askWithRetry` it might show up
more often.
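
To make the concern concrete, a hypothetical sketch of one possible compensation (not what this PR does): roll back the optimistic increment if the ask fails.

```scala
import scala.concurrent.{ExecutionContext, Future}

// `send` stands in for the ask; `numPending` for the counter in question.
class PendingCounter(send: Int => Future[Boolean])(implicit ec: ExecutionContext) {
  private var numPending = 0 // guarded by `this`

  def request(n: Int): Future[Boolean] = {
    synchronized { numPending += n } // optimistic increment
    send(n).recoverWith { case t =>
      synchronized { numPending -= n } // undo on failure
      Future.failed(t)
    }
  }
}
```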





[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...

2016-08-26 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/14710#discussion_r76465162
  
--- Diff: core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala ---
@@ -122,7 +134,9 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd
     val ci = new AppClientInst(masterRpcEnv.address.toSparkURL)
 
     // requests to master should fail immediately
-    assert(ci.client.requestTotalExecutors(3) === false)
+    whenReady(ci.client.requestTotalExecutors(3), timeout(0.seconds)) { success =>
--- End diff --

nit: don't use a `0` timeout. The behavior depends on the implementation:
it could check the timeout first, before the future has had a chance to run.
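
A sketch of the suggested fix, assuming the suite's existing ScalaTest imports (`ScalaFutures`, `SpanSugar`): a small positive bound in place of `0`.

```scala
// whenReady polls until the future completes or the bound elapses; with a
// 0 bound some implementations may time out before the future ever runs.
whenReady(ci.client.requestTotalExecutors(3), timeout(10.seconds)) { success =>
  assert(success === false)
}
```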





[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...

2016-08-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14710#discussion_r76348848
  
--- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
@@ -269,20 +258,22 @@ private[spark] abstract class YarnSchedulerBackend(
       case AddWebUIFilter(filterName, filterParams, proxyBase) =>
         addWebUIFilter(filterName, filterParams, proxyBase)
 
-      case RemoveExecutor(executorId, reason) =>
+      case r @ RemoveExecutor(executorId, reason) =>
         logWarning(reason.toString)
-        removeExecutor(executorId, reason)
+        driverEndpoint.ask[Boolean](r).onFailure {
+          case e =>
+            logError(s"Error requesting driver to remove executor $executorId for reason $reason")
+        }
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
       case r: RequestExecutors =>
         amEndpoint match {
           case Some(am) =>
-            Future {
-              context.reply(am.askWithRetry[Boolean](r))
-            } onFailure {
-              case NonFatal(e) =>
+            am.ask[Boolean](r).andThen {
--- End diff --

Similarly here, could you replace `askAmExecutor` with 
`ThreadUtils.sameThreadExecutionContext` and get rid of another thread pool?





[GitHub] spark pull request #14710: [SPARK-16533][CORE] resolve deadlocking in driver...

2016-08-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14710#discussion_r76347979
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala ---
@@ -220,19 +225,13 @@ private[spark] class StandaloneAppClient(
         endpointRef: RpcEndpointRef,
         context: RpcCallContext,
         msg: T): Unit = {
-      // Create a thread to ask a message and reply with the result.  Allow thread to be
+      // Ask a message and create a thread to reply with the result.  Allow thread to be
       // interrupted during shutdown, otherwise context must be notified of NonFatal errors.
-      askAndReplyThreadPool.execute(new Runnable {
-        override def run(): Unit = {
-          try {
-            context.reply(endpointRef.askWithRetry[Boolean](msg))
-          } catch {
-            case ie: InterruptedException => // Cancelled
-            case NonFatal(t) =>
-              context.sendFailure(t)
-          }
-        }
-      })
+      endpointRef.ask[Boolean](msg).andThen {
+        case Success(b) => context.reply(b)
+        case Failure(ie: InterruptedException) => // Cancelled
+        case Failure(NonFatal(t)) => context.sendFailure(t)
+      }(askAndReplyExecutionContext)
--- End diff --

Do you need `askAndReplyExecutionContext` anymore? It seems now all the 
heavy lifting is being done in the RPC thread pool, and the `andThen` code 
could just use `ThreadUtils.sameThreadExecutionContext` since it doesn't do 
much.
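
A minimal sketch of that suggestion (the same-thread context is hand-rolled here; the exact `ThreadUtils` member name is assumed): `andThen` only runs a side-effecting callback and passes the result through unchanged, so work this small needs no dedicated pool.

```scala
import java.util.concurrent.Executor
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object AndThenSketch {
  // Runs the callback on the thread that completed the future.
  val sameThread: ExecutionContext =
    ExecutionContext.fromExecutor(new Executor {
      def execute(r: Runnable): Unit = r.run()
    })

  def reply(b: Boolean): Unit = println(s"reply: $b")
  def fail(t: Throwable): Unit = println(s"failure: $t")

  // The returned future carries the same result as `f`; the callback is
  // pure side effect, so a same-thread context is enough.
  def forward(f: Future[Boolean]): Future[Boolean] =
    f.andThen {
      case Success(b) => reply(b)
      case Failure(t) => fail(t)
    }(sameThread)
}
```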





[GitHub] spark pull request #14710: [SPARK-16533][CORE]

2016-08-18 Thread angolon
GitHub user angolon opened a pull request:

https://github.com/apache/spark/pull/14710

[SPARK-16533][CORE]

## What changes were proposed in this pull request?
This pull request reverts the changes made as part of #14605, which simply
side-stepped the deadlock issue. Instead, I propose the following approach:
* Use `scheduleWithFixedDelay` when calling `ExecutorAllocationManager.schedule`
to schedule executor requests. The intent is that if invocations are delayed
beyond the default schedule interval on account of lock contention, we avoid a
situation where calls to `schedule` are made back-to-back, potentially releasing
and then immediately reacquiring these locks and further exacerbating contention
(see the sketch after this list).
* Replace a number of calls to `askWithRetry` with `ask` inside of message
handling code in `CoarseGrainedSchedulerBackend` and its ilk. This allows us
to queue messages with the relevant endpoints, release whatever locks we might
be holding, and then block whilst awaiting the response. This change is made at
the cost of being able to retry should sending the message fail, as retrying
outside of the lock could easily cause race conditions if other conflicting
messages have been sent whilst awaiting a response. I believe this to be the
lesser of two evils, as in many cases these RPC calls are to process-local
components, so failures are more likely to be deterministic, and timeouts
are more likely to be caused by lock contention.
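
Below is a minimal, self-contained sketch (not the PR's code) of why `scheduleWithFixedDelay` eases the contention described in the first bullet:

```scala
import java.util.concurrent.{Executors, TimeUnit}

object ScheduleSketch extends App {
  val pool = Executors.newSingleThreadScheduledExecutor()

  val task = new Runnable {
    override def run(): Unit = {
      // Imagine this occasionally blocks on a contended lock for longer
      // than the 100ms interval.
      Thread.sleep(250)
      println("schedule ran")
    }
  }

  // scheduleAtFixedRate measures from each run's *start*: after a slow run
  // the next one fires immediately, reacquiring the locks back-to-back.
  // scheduleWithFixedDelay measures from each run's *completion*, so other
  // threads always get a full 100ms window to take the locks.
  pool.scheduleWithFixedDelay(task, 0L, 100L, TimeUnit.MILLISECONDS)

  Thread.sleep(1000)
  pool.shutdown()
}
```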

## How was this patch tested?
Existing tests, and manual tests under yarn-client mode.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/angolon/spark SPARK-16533

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14710.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #14710


commit cef69bf470199c63b6638933756b1d057dc890d1
Author: Angus Gerry 
Date:   2016-08-19T01:52:58Z

Revert "[SPARK-17022][YARN] Handle potential deadlock in driver handling 
messages"

This reverts commit ea0bf91b4a2ca3ef472906e50e31fd6268b6f53e.

commit 4970b3b0bcd834bbe5d5473a3065f04a48b12643
Author: Angus Gerry 
Date:   2016-08-09T04:45:29Z

[SPARK-16533][CORE] Use scheduleWithFixedDelay when calling 
ExecutorAllocatorManager.schedule to ease contention on locks.

commit 920274a3ed0b8278d38d721587a24c9441fa5ff3
Author: Angus Gerry 
Date:   2016-08-04T06:27:56Z

[SPARK-16533][CORE] Replace many calls to askWithRetry to plain old ask.



