[GitHub] flink issue #3360: [FLINK-5830][Distributed Coordination] Handle OutOfMemory...
Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/3360

@StephanEwen, if the exception bubbles out and causes the TaskExecutor to exit as a result, I think the JobMaster can be assumed to end up in a sane state, based on its detection of the TaskExecutor failure. The current solution only covers OOM errors; maybe it should be extended to all exceptions, because it is difficult to confirm the consistency of the internal state, and the conservative approach is to let it terminate, as @tillrohrmann said.

If I understand @tillrohrmann's suggestions correctly, the **RpcEndpoint.runAsync** method would be modified to return a **Future**, similar to **RpcEndpoint.callAsync**, while remaining a **Tell** action in Akka. This **Future** would be set as a field in **RunAsync** so that it can be retrieved when the message is handled in **AkkaRpcActor**. The **Future** can help determine whether the message was executed successfully or lost, enhancing the **Tell** mechanism. If a **Future** combined with a **Tell** action is better than the current **RpcEndpoint.callAsync**, which uses an **Ask** action, I will try to implement that.

Another option is to tolerate lost messages in the current **RpcEndpoint.runAsync**, reserving it for scenarios where efficiency matters more than safety; important interactions should resort to **RpcEndpoint.callAsync** instead. What do you think?

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
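A hedged, JDK-only sketch of the idea above (`RunAsyncMessage` and the single-threaded "mailbox" are stand-ins, not the actual Flink/Akka types): `runAsync` stays fire-and-forget from the caller's perspective, but hands back a `Future` that the receiving side completes once the `Runnable` has actually run, so a lost or failed message becomes observable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RunAsyncSketch {

    // Stand-in for the RunAsync message: carries the Runnable together with
    // the Future that the "actor" completes after executing it.
    static final class RunAsyncMessage {
        final Runnable runnable;
        final CompletableFuture<Void> completion = new CompletableFuture<>();
        RunAsyncMessage(Runnable runnable) { this.runnable = runnable; }
    }

    // Single-threaded executor playing the role of the actor's mailbox.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();

    // Still a "Tell": nothing blocks here. The returned Future only signals
    // whether the Runnable was executed or failed on the receiving side.
    public CompletableFuture<Void> runAsync(Runnable runnable) {
        RunAsyncMessage msg = new RunAsyncMessage(runnable);
        mailbox.execute(() -> {
            try {
                msg.runnable.run();
                msg.completion.complete(null);
            } catch (Throwable t) {
                msg.completion.completeExceptionally(t);
            }
        });
        return msg.completion;
    }

    public void shutdown() { mailbox.shutdown(); }

    public static void main(String[] args) throws Exception {
        RunAsyncSketch endpoint = new RunAsyncSketch();
        endpoint.runAsync(() -> System.out.println("runnable executed")).get();
        endpoint.shutdown();
    }
}
```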
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3360

Thanks for the clarification @zhijiangW. I now understand the problem: via `RpcEndpoint.runAsync` we effectively introduce another message which might get "lost" (e.g. due to an OOM exception).

I agree with Stephan that it's hard to reason about the consistency of the `AkkaRpcActor`'s internal state once we see an exception. The conservative approach would probably be to let it terminate or to call `notifyFatalError` to handle it. Related to this is also how we handle exceptions in `AkkaRpcActor.handleRpcInvocation`. There we catch all exceptions and simply send them to the caller. I think in this method we should only send the non-fatal exceptions back and terminate otherwise.

To follow a similar pattern for `handleRunAsync`, we could think about returning a `Future` from `RpcEndpoint.runAsync` which is completed with non-fatal exceptions or once the `Runnable` has been executed. In case we see a fatal exception, we terminate or call `notifyFatalError`. What do you think?
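A minimal sketch of that policy (assumed names; not the actual `AkkaRpcActor` code, and the real fatality classification in Flink may differ): non-fatal exceptions complete the response `Future` exceptionally so the caller sees them, while fatal errors such as OOM escalate via a `notifyFatalError` callback instead of being replied.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

public class RpcInvocationSketch {

    // A simple fatality check for the sketch; VirtualMachineError covers
    // OutOfMemoryError, InternalError, and StackOverflowError.
    static boolean isFatal(Throwable t) {
        return t instanceof VirtualMachineError;
    }

    // Stand-in for handling one RPC invocation message: reply with the
    // result, reply with non-fatal exceptions, escalate fatal ones.
    static <T> CompletableFuture<T> handleRpcInvocation(
            Callable<T> rpc, Runnable notifyFatalError) {
        CompletableFuture<T> response = new CompletableFuture<>();
        try {
            response.complete(rpc.call());
        } catch (Throwable t) {
            if (isFatal(t)) {
                notifyFatalError.run();            // terminate / escalate, do not reply
            } else {
                response.completeExceptionally(t); // send the failure back to the caller
            }
        }
        return response;
    }

    public static void main(String[] args) {
        System.out.println(handleRpcInvocation(() -> 42, () -> {}).join());
    }
}
```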
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3360

Looking at this from another angle: if any scheduled Runnable ever lets an exception bubble out, can we still assume that the JobManager is in a sane state? Or should we actually make every uncaught exception in the RPC executors a fatal error and send a `notifyFatalError` to the `RpcEndpoint`?
Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/3360

Hi @tillrohrmann, thank you for the review and the constructive suggestions! Let me explain the root cause of this issue first.

On the JobMaster side, it sends the cancel RPC message and gets the acknowledgement from the TaskExecutor, after which the execution state transitions to **CANCELING**. On the TaskExecutor side, it notifies the JobMaster of the final state before the task exits. The **notifyFinalState** can be divided into two steps:

- The Akka actor executes the **RunAsync** message (a Tell action), which triggers **unregisterTaskAndNotifyFinalState**.
- **unregisterTaskAndNotifyFinalState** in turn triggers the **updateTaskExecutionState** RPC message, which is an Ask action, so that mechanism can detect lost messages.

The problem is that an OOM may occur before **updateTaskExecutionState** is triggered. This error is caught by **AkkaRpcActor**, which does nothing with it, interrupting the remaining steps, so **updateTaskExecutionState** is never executed. For this key interaction between TaskExecutor and JobMaster, lost messages should not be tolerated, and I agree with your suggestions above. So there are two possible directions for this improvement:

- Enhance the robustness of **notifyFinalState**. Rethrowing the OOM, as currently done, is an easy option, but it causes the TaskExecutor to exit; there should be other ways to reduce that cost.
- After receiving the cancel acknowledgement, the JobMaster starts a timeout to check for the execution's final state. If the execution has not reached a final state within the timeout, the JobMaster resends the message to the TaskExecutor to confirm the status.

I prefer the first way, which keeps the change on one side and avoids complex interaction between the TaskExecutor and the JobMaster. I look forward to your further suggestions or other ideas.
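The failure mode described above can be reduced to a small JDK-only sketch (hypothetical names, not the actual TaskExecutor code): the OOM hits in the first step, before the follow-up notification would be sent, and a catch-all that swallows the error leaves the second message unsent with no one the wiser.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class LostNotificationSketch {

    static final AtomicBoolean finalStateSent = new AtomicBoolean(false);

    // Stand-in for unregisterTaskAndNotifyFinalState: when the simulated OOM
    // fires, the follow-up "updateTaskExecutionState" line is never reached.
    static void unregisterTaskAndNotifyFinalState(boolean simulateOom) {
        if (simulateOom) {
            throw new OutOfMemoryError("simulated");
        }
        finalStateSent.set(true); // the Ask-based notification to the JobMaster
    }

    public static void main(String[] args) {
        try {
            unregisterTaskAndNotifyFinalState(true);
        } catch (Throwable t) {
            // AkkaRpcActor-style catch-all that swallows the error: the
            // JobMaster is never told the final state and keeps waiting.
        }
        System.out.println("final state sent: " + finalStateSent.get());
    }
}
```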
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3360

I think adding this safety net makes sense and protects against a corrupted state. However, isn't the root cause of the described problem that the JobMaster-TaskExecutor communication does not tolerate a lost message? Maybe we should introduce an acknowledge message which signals the correct reception of the status message. If the response times out, we can either retry sending it or fail.
Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/3360

@StephanEwen, I have already submitted the modifications.
Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/3360

@StephanEwen, thank you for the quick review! Adding a uniform helper to the utils is a good idea, so we can use it anywhere. I will fix it as you suggest later today.
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3360

I would suggest that we adopt the following pattern for all the places, like the one in this pull request, where we catch Throwables:

```java
try {
    ...
} catch (Throwable t) {
    ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
    // the other handling logic...
}
```

This requires adding the function `rethrowIfFatalErrorOrOOM(Throwable)` to the `ExceptionUtils`, similar to the method here: https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java#L109

It would be even nicer if we could do something like Scala supports, but I think there is no better way to do this in Java than the one suggested above:

```scala
try {
    ...
} catch {
    case NonFatal(t) => // does not include OOM and internal errors
}
```
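One possible shape for that helper, sketched with the caveat that the eventual `ExceptionUtils` method may classify errors differently: rethrow `VirtualMachineError` (which covers `OutOfMemoryError`, `InternalError`, and `StackOverflowError`) and return normally for everything else, so the caller's regular handling logic only ever sees non-fatal throwables.

```java
public class ExceptionUtilsSketch {

    // Sketch of the proposed helper, not the actual Flink implementation:
    // rethrows JVM-fatal errors (including OOM) unchanged and silently
    // ignores all other throwables, leaving them to the caller.
    public static void rethrowIfFatalErrorOrOOM(Throwable t) {
        if (t instanceof VirtualMachineError) {
            throw (VirtualMachineError) t;
        }
    }
}
```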