[jira] [Updated] (SPARK-17582) Error Executor info in SparkUI ExecutorsTab
[ https://issues.apache.org/jira/browse/SPARK-17582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xukun updated SPARK-17582: -- Description: When executor is losted, SparkUI ExecutorsTab still show its executor info. class HeartbeatReceiver.scala {code:borderStyle=solid} override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { // Messages sent and received locally case ExecutorRegistered(executorId) => executorLastSeen(executorId) = clock.getTimeMillis() context.reply(true) case ExecutorRemoved(executorId) => executorLastSeen.remove(executorId) context.reply(true) case TaskSchedulerIsSet => scheduler = sc.taskScheduler context.reply(true) case ExpireDeadHosts => expireDeadHosts() context.reply(true) // Messages received from executors case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) => if (scheduler != null) { if (executorLastSeen.contains(executorId)) { executorLastSeen(executorId) = clock.getTimeMillis() eventLoopThread.submit(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { val unknownExecutor = !scheduler.executorHeartbeatReceived( executorId, taskMetrics, blockManagerId) val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor) context.reply(response) } }) } else { // This may happen if we get an executor's in-flight heartbeat immediately // after we just removed it. It's not really an error condition so we should // not log warning here. Otherwise there may be a lot of noise especially if // we explicitly remove executors (SPARK-4134). logDebug(s"Received heartbeat from unknown executor $executorId") context.reply(HeartbeatResponse(reregisterBlockManager = false)) } } else { // Because Executor will sleep several seconds before sending the first "Heartbeat", this // case rarely happens. However, if it really happens, log it and ask the executor to // register itself again. 
logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet") context.reply(HeartbeatResponse(reregisterBlockManager = true)) } } {code} Consider the following sequence: 1. A Heartbeat is processed and handed to eventLoopThread, which has not yet returned a result. 2. The executor is lost. The variable unknownExecutor will then be true, which leads to reregisterBlockManager being set to true. The result is that dead executors are still shown in the SparkUI. was: When executor is losted, SparkUI ExecutorsTab still show its executor info. class HeartbeatReceiver.scala {code:borderStyle=solid} override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { // Messages sent and received locally case ExecutorRegistered(executorId) => executorLastSeen(executorId) = clock.getTimeMillis() context.reply(true) case ExecutorRemoved(executorId) => executorLastSeen.remove(executorId) context.reply(true) case TaskSchedulerIsSet => scheduler = sc.taskScheduler context.reply(true) case ExpireDeadHosts => expireDeadHosts() context.reply(true) // Messages received from executors case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) => if (scheduler != null) { if (executorLastSeen.contains(executorId)) { executorLastSeen(executorId) = clock.getTimeMillis() eventLoopThread.submit(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { val unknownExecutor = !scheduler.executorHeartbeatReceived( executorId, taskMetrics, blockManagerId) val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor) context.reply(response) } }) } else { // This may happen if we get an executor's in-flight heartbeat immediately // after we just removed it. It's not really an error condition so we should // not log warning here. Otherwise there may be a lot of noise especially if // we explicitly remove executors (SPARK-4134). 
logDebug(s"Received heartbeat from unknown executor $executorId") context.reply(HeartbeatResponse(reregisterBlockManager = false)) } } else { // Because Executor will sleep several seconds before sending the first "Heartbeat", this // case rarely happens. However, if it really happens, log it and ask the executor to // register itself again. logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet") context.reply(HeartbeatResponse(reregisterBlockManager = true)) } } {code} HeartbeatReceiver receive
[jira] [Updated] (SPARK-17582) Error Executor info in SparkUI ExecutorsTab
[ https://issues.apache.org/jira/browse/SPARK-17582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xukun updated SPARK-17582: -- Description: When executor is losted, SparkUI ExecutorsTab still show its executor info. class HeartbeatReceiver.scala {code:borderStyle=solid} override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { // Messages sent and received locally case ExecutorRegistered(executorId) => executorLastSeen(executorId) = clock.getTimeMillis() context.reply(true) case ExecutorRemoved(executorId) => executorLastSeen.remove(executorId) context.reply(true) case TaskSchedulerIsSet => scheduler = sc.taskScheduler context.reply(true) case ExpireDeadHosts => expireDeadHosts() context.reply(true) // Messages received from executors case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) => if (scheduler != null) { if (executorLastSeen.contains(executorId)) { executorLastSeen(executorId) = clock.getTimeMillis() eventLoopThread.submit(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { val unknownExecutor = !scheduler.executorHeartbeatReceived( executorId, taskMetrics, blockManagerId) val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor) context.reply(response) } }) } else { // This may happen if we get an executor's in-flight heartbeat immediately // after we just removed it. It's not really an error condition so we should // not log warning here. Otherwise there may be a lot of noise especially if // we explicitly remove executors (SPARK-4134). logDebug(s"Received heartbeat from unknown executor $executorId") context.reply(HeartbeatResponse(reregisterBlockManager = false)) } } else { // Because Executor will sleep several seconds before sending the first "Heartbeat", this // case rarely happens. However, if it really happens, log it and ask the executor to // register itself again. 
logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet") context.reply(HeartbeatResponse(reregisterBlockManager = true)) } } {code} HeartbeatReceiver receives two messages: Heartbeat and ExecutorRemoved. Consider the following sequence: 1. Heartbeat is processed, but eventLoopThread has not yet returned a result. 2. ExecutorRemoved is processed. The variable unknownExecutor will then be true, which leads to reregisterBlockManager being set to true. The result is that the executor is lost but its blockManager is still alive. was: When executor is losted, SparkUI ExecutorsTab still show its executor info. class HeartbeatReceiver.scala ``` override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { // Messages sent and received locally case ExecutorRegistered(executorId) => executorLastSeen(executorId) = clock.getTimeMillis() context.reply(true) case ExecutorRemoved(executorId) => executorLastSeen.remove(executorId) context.reply(true) case TaskSchedulerIsSet => scheduler = sc.taskScheduler context.reply(true) case ExpireDeadHosts => expireDeadHosts() context.reply(true) // Messages received from executors case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) => if (scheduler != null) { if (executorLastSeen.contains(executorId)) { executorLastSeen(executorId) = clock.getTimeMillis() eventLoopThread.submit(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { val unknownExecutor = !scheduler.executorHeartbeatReceived( executorId, taskMetrics, blockManagerId) val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor) context.reply(response) } }) } else { // This may happen if we get an executor's in-flight heartbeat immediately // after we just removed it. It's not really an error condition so we should // not log warning here. Otherwise there may be a lot of noise especially if // we explicitly remove executors (SPARK-4134). 
logDebug(s"Received heartbeat from unknown executor $executorId") context.reply(HeartbeatResponse(reregisterBlockManager = false)) } } else { // Because Executor will sleep several seconds before sending the first "Heartbeat", this // case rarely happens. However, if it really happens, log it and ask the executor to // register itself again. logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet") context.reply(HeartbeatResponse(reregisterBlockManager =
[jira] [Updated] (SPARK-17582) Error Executor info in SparkUI ExecutorsTab
[ https://issues.apache.org/jira/browse/SPARK-17582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xukun updated SPARK-17582: -- Description: When executor is losted, SparkUI ExecutorsTab still show its executor info. class HeartbeatReceiver.scala ``` override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { // Messages sent and received locally case ExecutorRegistered(executorId) => executorLastSeen(executorId) = clock.getTimeMillis() context.reply(true) case ExecutorRemoved(executorId) => executorLastSeen.remove(executorId) context.reply(true) case TaskSchedulerIsSet => scheduler = sc.taskScheduler context.reply(true) case ExpireDeadHosts => expireDeadHosts() context.reply(true) // Messages received from executors case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) => if (scheduler != null) { if (executorLastSeen.contains(executorId)) { executorLastSeen(executorId) = clock.getTimeMillis() eventLoopThread.submit(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { val unknownExecutor = !scheduler.executorHeartbeatReceived( executorId, taskMetrics, blockManagerId) val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor) context.reply(response) } }) } else { // This may happen if we get an executor's in-flight heartbeat immediately // after we just removed it. It's not really an error condition so we should // not log warning here. Otherwise there may be a lot of noise especially if // we explicitly remove executors (SPARK-4134). logDebug(s"Received heartbeat from unknown executor $executorId") context.reply(HeartbeatResponse(reregisterBlockManager = false)) } } else { // Because Executor will sleep several seconds before sending the first "Heartbeat", this // case rarely happens. However, if it really happens, log it and ask the executor to // register itself again. 
logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet") context.reply(HeartbeatResponse(reregisterBlockManager = true)) } } ``` HeartbeatReceiver receives two messages: Heartbeat and ExecutorRemoved. Consider the following sequence: 1. Heartbeat is processed, but eventLoopThread has not yet returned a result. 2. ExecutorRemoved is processed. The variable unknownExecutor will then be true, which leads to reregisterBlockManager being set to true. The result is that the executor is lost but its blockManager is still alive. was: When executor is losted, SparkUI ExecutorsTab still show its executor info. class HeartbeatReceiver.scala ``` override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { // Messages sent and received locally case ExecutorRegistered(executorId) => executorLastSeen(executorId) = clock.getTimeMillis() context.reply(true) case ExecutorRemoved(executorId) => executorLastSeen.remove(executorId) context.reply(true) case TaskSchedulerIsSet => scheduler = sc.taskScheduler context.reply(true) case ExpireDeadHosts => expireDeadHosts() context.reply(true) // Messages received from executors case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) => if (scheduler != null) { if (executorLastSeen.contains(executorId)) { executorLastSeen(executorId) = clock.getTimeMillis() eventLoopThread.submit(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { val unknownExecutor = !scheduler.executorHeartbeatReceived( executorId, taskMetrics, blockManagerId) val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor) context.reply(response) } }) } else { // This may happen if we get an executor's in-flight heartbeat immediately // after we just removed it. It's not really an error condition so we should // not log warning here. Otherwise there may be a lot of noise especially if // we explicitly remove executors (SPARK-4134). 
logDebug(s"Received heartbeat from unknown executor $executorId") context.reply(HeartbeatResponse(reregisterBlockManager = false)) } } else { // Because Executor will sleep several seconds before sending the first "Heartbeat", this // case rarely happens. However, if it really happens, log it and ask the executor to // register itself again. logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet") context.reply(HeartbeatResponse(reregisterBlockManager = true)) } } ```