[GitHub] spark issue #21486: [SPARK-24387][Core] Heartbeat-timeout executor is added ...

2018-06-10 Thread lirui-apache
Github user lirui-apache commented on the issue:

https://github.com/apache/spark/pull/21486
  
cc @vanzin @andrewor14 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21486: [SPARK-24387][Core] Heartbeat-timeout executor is added ...

2018-06-08 Thread lirui-apache
Github user lirui-apache commented on the issue:

https://github.com/apache/spark/pull/21486
  
Hi guys, any further comments?


---




[GitHub] spark pull request #21486: [SPARK-24387][Core] Heartbeat-timeout executor is...

2018-06-05 Thread lirui-apache
Github user lirui-apache commented on a diff in the pull request:

https://github.com/apache/spark/pull/21486#discussion_r193044306
  
--- Diff: core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala ---
@@ -207,6 +210,54 @@ class HeartbeatReceiverSuite
     assert(fakeClusterManager.getExecutorIdsToKill === Set(executorId1, executorId2))
   }
 
+  test("expired executor should not be offered again (SPARK-24387)") {
+    scheduler = spy(new TaskSchedulerImpl(sc))
--- End diff --

I need a functioning scheduler rather than a plain mock, and I also need to 
stub the `executorHeartbeatReceived` method, so I use a spy here.
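
To illustrate the distinction, here is a minimal hand-written analogy in plain Scala. It is not Mockito and not the code in this PR: `ToyTaskScheduler` and `SpyAnalogy` are hypothetical names, and the anonymous subclass only approximates what a spy provides, namely a real object with one method replaced.

```scala
// Hypothetical scheduler for illustration only; this is not Spark's TaskSchedulerImpl.
class ToyTaskScheduler {
  private var offersHandled = 0

  // Real behavior that the test still relies on.
  def resourceOffers(executors: Seq[String]): Int = {
    offersHandled += executors.size
    offersHandled
  }

  // Behavior the test wants to replace, because the real one needs more wiring.
  def executorHeartbeatReceived(executorId: String): Boolean =
    sys.error("needs a fully wired scheduler")
}

object SpyAnalogy {
  // Spy-like object: one method is stubbed, every other method keeps its real logic.
  def partiallyStubbed(): ToyTaskScheduler = new ToyTaskScheduler {
    override def executorHeartbeatReceived(executorId: String): Boolean = true
  }

  def main(args: Array[String]): Unit = {
    val scheduler = partiallyStubbed()
    println(scheduler.executorHeartbeatReceived("exec-1"))
    println(scheduler.resourceOffers(Seq("exec-1", "exec-2")))
  }
}
```

A plain mock would return default values from every method, including the scheduling logic the test is meant to exercise, which is why a partial stub fits better here.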


---




[GitHub] spark pull request #21486: [SPARK-24387][Core] Heartbeat-timeout executor is...

2018-06-03 Thread lirui-apache
Github user lirui-apache commented on a diff in the pull request:

https://github.com/apache/spark/pull/21486#discussion_r192614888
  
--- Diff: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ---
@@ -197,14 +197,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
       if (now - lastSeenMs > executorTimeoutMs) {
         logWarning(s"Removing executor $executorId with no recent heartbeats: " +
           s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
-        scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
-          s"timed out after ${now - lastSeenMs} ms"))
           // Asynchronously kill the executor to avoid blocking the current thread
         killExecutorThread.submit(new Runnable {
           override def run(): Unit = Utils.tryLogNonFatalError {
             // Note: we want to get an executor back after expiring this one,
             // so do not simply call `sc.killExecutor` here (SPARK-8119)
             sc.killAndReplaceExecutor(executorId)
--- End diff --

Yes:
```
  private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
    schedulerBackend match {
      case b: ExecutorAllocationClient =>
        b.killExecutors(Seq(executorId), adjustTargetNumExecutors = false, countFailures = true,
          force = true).nonEmpty
      case _ =>
        logWarning("Killing executors is not supported by current scheduler.")
        false
    }
  }
```


---




[GitHub] spark pull request #21486: [SPARK-24387][Core] Heartbeat-timeout executor is...

2018-06-03 Thread lirui-apache
Github user lirui-apache commented on a diff in the pull request:

https://github.com/apache/spark/pull/21486#discussion_r192614874
  
--- Diff: core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala ---
@@ -207,6 +210,55 @@ class HeartbeatReceiverSuite
     assert(fakeClusterManager.getExecutorIdsToKill === Set(executorId1, executorId2))
   }
 
+  test("expired host should not be offered again") {
+    scheduler = spy(new TaskSchedulerImpl(sc))
+    scheduler.setDAGScheduler(sc.dagScheduler)
+    when(sc.taskScheduler).thenReturn(scheduler)
+    doReturn(true).when(scheduler).executorHeartbeatReceived(any(), any(), any())
+
+    // Set up a fake backend and cluster manager to simulate killing executors
+    val rpcEnv = sc.env.rpcEnv
+    val fakeClusterManager = new FakeClusterManager(rpcEnv)
+    val fakeClusterManagerRef = rpcEnv.setupEndpoint("fake-cm", fakeClusterManager)
+    val fakeSchedulerBackend = new FakeSchedulerBackend(scheduler, rpcEnv, fakeClusterManagerRef)
+    when(sc.schedulerBackend).thenReturn(fakeSchedulerBackend)
+
+    fakeSchedulerBackend.start()
+    val dummyExecutorEndpoint1 = new FakeExecutorEndpoint(rpcEnv)
+    val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
+    fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
+      RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 2, Map.empty))
+    heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
+    addExecutorAndVerify(executorId1)
+    triggerHeartbeat(executorId1, executorShouldReregister = false)
+
+    scheduler.initialize(fakeSchedulerBackend)
+    sc.requestTotalExecutors(0, 0, Map.empty)
--- End diff --

I thought we didn't need a replacement executor for this test case. I will 
update the total number to 1 to be closer to the real use case.


---




[GitHub] spark pull request #21486: [SPARK-24387][Core] Heartbeat-timeout executor is...

2018-06-03 Thread lirui-apache
Github user lirui-apache commented on a diff in the pull request:

https://github.com/apache/spark/pull/21486#discussion_r192614878
  
--- Diff: core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala ---
@@ -207,6 +210,55 @@ class HeartbeatReceiverSuite
     assert(fakeClusterManager.getExecutorIdsToKill === Set(executorId1, executorId2))
   }
 
+  test("expired host should not be offered again") {
--- End diff --

It should be `executor`. Thanks for catching that; I will update it.


---




[GitHub] spark pull request #21486: [SPARK-24387][Core] Heartbeat-timeout executor is...

2018-06-02 Thread lirui-apache
GitHub user lirui-apache opened a pull request:

https://github.com/apache/spark/pull/21486

[SPARK-24387][Core] Heartbeat-timeout executor is added back and used again

## What changes were proposed in this pull request?

When an executor's heartbeat is lost, we call scheduler.executorLost before 
we tell the backend to kill the executor. TaskSchedulerImpl asks the backend to 
revive offers in executorLost. If this is the only executor, it's possible the 
backend will offer it again to TaskSchedulerImpl, and the retried task is 
scheduled to this executor.

This patch proposes to call scheduler.executorLost after the executor is 
killed. At this point, the executor has been marked as pending-to-remove and 
won't be offered again.
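
The ordering issue can be sketched with a toy model. This is a simplified illustration under assumptions, not Spark's implementation: `ToyBackend`, `ToyScheduler` and `HeartbeatOrdering` are hypothetical names, the kill is synchronous here (the real one is submitted to a separate thread), and "reviving offers" is reduced to placing the retried task on the first executor the backend still offers.

```scala
import scala.collection.mutable

// Toy stand-in for the scheduler backend; not Spark's class.
final class ToyBackend {
  private val registered = mutable.LinkedHashSet.empty[String]
  private val pendingToRemove = mutable.Set.empty[String]

  def register(id: String): Unit = registered += id

  // Killing marks the executor as pending-to-remove, which excludes it from later offers.
  def kill(id: String): Unit = if (registered.contains(id)) pendingToRemove += id

  // Executors the backend would still offer to the scheduler.
  def offers(): Seq[String] = registered.toSeq.filterNot(pendingToRemove.contains)
}

// Toy stand-in for the task scheduler; not Spark's TaskSchedulerImpl.
final class ToyScheduler(backend: ToyBackend) {
  val launchedOn = mutable.ArrayBuffer.empty[String]

  // Losing an executor revives offers; the retried task goes to the first offered executor.
  def executorLost(id: String): Unit =
    backend.offers().headOption.foreach(offered => launchedOn += offered)
}

object HeartbeatOrdering {
  // Expires the only executor and returns where the retried task was launched.
  def expire(killFirst: Boolean): Seq[String] = {
    val backend = new ToyBackend
    backend.register("exec-1")
    val scheduler = new ToyScheduler(backend)
    if (killFirst) {
      backend.kill("exec-1")           // proposed order: kill, then report the loss
      scheduler.executorLost("exec-1")
    } else {
      scheduler.executorLost("exec-1") // old order: report the loss, then kill
      backend.kill("exec-1")
    }
    scheduler.launchedOn.toSeq
  }

  def main(args: Array[String]): Unit = {
    println(s"executorLost first: ${expire(killFirst = false)}")
    println(s"kill first: ${expire(killFirst = true)}")
  }
}
```

In this model, the old order places the retried task back on the expired executor, while the proposed order leaves nothing to offer until a replacement registers.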

## How was this patch tested?

Added a new test case in HeartbeatReceiverSuite. Without the fix, this test 
case fails.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lirui-apache/spark SPARK-24387

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21486.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21486


commit 189f2696dab47a23b3f2a48a313a72dc4ec77c80
Author: Rui Li 
Date:   2018-06-02T08:25:10Z

Call executorLost after the executor is killed




---
