[GitHub] flink issue #3360: [FLINK-5830][Distributed Coordination] Handle OutOfMemory...

2017-02-23 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/3360
  
@StephanEwen , if the exception bubbles out and causes the TaskExecutor to 
exit as a result, I think the JobMaster can eventually be assumed to be in a 
sane state, based on its detection of the TaskExecutor failure.

The current solution only covers the OOM error; maybe it can be extended to 
any exception, because it is difficult to confirm the consistency of the 
internal state, and the conservative approach is to let it terminate, as 
@tillrohrmann said.

If I understand @tillrohrmann 's suggestions correctly, the 
**RpcEndpoint.runAsync** method would be modified to return a **Future**, 
similar to **RpcEndpoint.callAsync**, while still being a **Tell** action in 
akka. This **Future** should be set as a field in **RunAsync** so it can be 
retrieved when the message is handled in **AkkaRpcActor**. The **Future** can 
help determine whether the message was executed successfully or lost, 
enhancing the **Tell** mechanism. If this **Future**-with-**Tell** approach is 
better than the current **RpcEndpoint.callAsync**, which relies on an **Ask** 
action, I will try to implement it. Another option is to tolerate lost 
messages in the current **RpcEndpoint.runAsync**, restricting it to scenarios 
where efficiency matters more than safety; for the important interactions, 
callers should resort to **RpcEndpoint.callAsync**. What do you think?
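
To make the idea concrete, here is a rough sketch of what it could look like 
(illustrative only: the shapes of **RunAsync** and the actor's handler below 
are assumptions, not the actual Flink code):

```java
import java.util.concurrent.CompletableFuture;

// Illustrative sketch: a RunAsync envelope that carries a future so the
// actor can report whether the Runnable was actually executed.
public class RunAsync {

    private final Runnable runnable;

    // carried with the message so AkkaRpcActor can report the outcome back
    private final CompletableFuture<Void> completionFuture = new CompletableFuture<>();

    public RunAsync(Runnable runnable) {
        this.runnable = runnable;
    }

    public Runnable getRunnable() {
        return runnable;
    }

    public CompletableFuture<Void> getCompletionFuture() {
        return completionFuture;
    }
}
```

and on the actor side:

```java
// Still a Tell, but the outcome is now observable by the sender.
private void handleRunAsync(RunAsync runAsync) {
    try {
        runAsync.getRunnable().run();
        runAsync.getCompletionFuture().complete(null); // executed successfully
    } catch (Throwable t) {
        // the sender can now see that the Runnable failed or was lost
        runAsync.getCompletionFuture().completeExceptionally(t);
    }
}
```

One caveat: a future carried inside the message can only be completed this way 
for local endpoints; for remote endpoints, completing it would itself require 
a response message, at which point the approach converges with the 
**Ask**-based **callAsync**.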





[GitHub] flink issue #3360: [FLINK-5830][Distributed Coordination] Handle OutOfMemory...

2017-02-22 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3360
  
Thanks for the clarification @zhijiangW. I now understand the problem: via 
`RpcEndpoint.runAsync` we effectively introduce another message which might 
get "lost" (e.g. due to an OOM exception).

I agree with Stephan that it's hard to reason about the consistency of the 
`AkkaRpcActor`'s internal state once we see an exception. The conservative 
approach would probably be to let it terminate or to call `notifyFatalError` 
to handle it.

Related to this is also how we handle exceptions in 
`AkkaRpcActor.handleRpcInvocation`. There we catch all exceptions and simply 
send them to the caller. I think in this method we should only send the 
non-fatal exceptions back and terminate otherwise.
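
A minimal sketch of that branching (simplified: `invokeRpc` and 
`notifyFatalError` are stand-ins for the real `AkkaRpcActor`/`RpcEndpoint` 
internals):

```java
import akka.actor.Status;

// Simplified sketch: only non-fatal exceptions travel back to the caller;
// fatal errors terminate the endpoint instead of masking the problem.
private void handleRpcInvocation(RpcInvocation rpcInvocation) {
    try {
        Object result = invokeRpc(rpcInvocation); // reflective call on the endpoint
        getSender().tell(new Status.Success(result), getSelf());
    } catch (Throwable t) {
        if (t instanceof VirtualMachineError || t instanceof ThreadDeath || t instanceof LinkageError) {
            // fatal (includes OutOfMemoryError): terminate rather than reply
            rpcEndpoint.notifyFatalError(t);
        } else {
            // non-fatal: report back to the caller as before
            getSender().tell(new Status.Failure(t), getSelf());
        }
    }
}
```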

To follow a similar pattern for `handleRunAsync`, we could think about 
returning a `Future` from `RpcEndpoint.runAsync` which will be completed 
either with a non-fatal exception or once the `Runnable` has been executed. 
And in case we see a fatal exception, we terminate or call 
`notifyFatalError`. What do you think?




[GitHub] flink issue #3360: [FLINK-5830][Distributed Coordination] Handle OutOfMemory...

2017-02-22 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3360
  
Looking at this from another angle: if any Runnable that is scheduled ever 
lets an exception bubble out, can we still assume that the JobManager is in a 
sane state? Or should we actually make every uncaught exception in the RPC 
executors a fatal error and send a `notifyFatalError` to the `RpcEndpoint`?
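
For illustration, one way to make every uncaught exception fatal would be an 
`Executor` decorator like the following (hypothetical names, not an existing 
Flink interface):

```java
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical sketch: decorates an Executor so any uncaught Throwable is
// escalated to a fatal-error handler instead of being swallowed.
public class FatalErrorReportingExecutor implements Executor {

    private final Executor delegate;
    private final Consumer<Throwable> fatalErrorHandler; // e.g. rpcEndpoint::notifyFatalError

    public FatalErrorReportingExecutor(Executor delegate, Consumer<Throwable> fatalErrorHandler) {
        this.delegate = delegate;
        this.fatalErrorHandler = fatalErrorHandler;
    }

    @Override
    public void execute(Runnable command) {
        delegate.execute(() -> {
            try {
                command.run();
            } catch (Throwable t) {
                fatalErrorHandler.accept(t); // every uncaught exception becomes fatal
            }
        });
    }
}
```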




[GitHub] flink issue #3360: [FLINK-5830][Distributed Coordination] Handle OutOfMemory...

2017-02-21 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/3360
  
Hi @tillrohrmann , thank you for the review and constructive suggestions!

Let me explain the root cause of this issue first:

On the JobMaster side, it sends the cancel RPC message and gets the 
acknowledgement from the TaskExecutor; the execution state then transitions 
to **CANCELING**.

On the TaskExecutor side, it notifies the final state to the JobMaster before 
the task exits. The **notifyFinalState** can be divided into two steps:

- The Akka actor executes the **RunAsync** message, which is a tell action 
and triggers **unregisterTaskAndNotifyFinalState** to run.
- In the process of **unregisterTaskAndNotifyFinalState**, it triggers the 
RPC message **updateTaskExecutionState**, which is an ask action, so that 
mechanism can avoid lost messages.

The problem is that an OOM may occur before **updateTaskExecutionState** is 
triggered; this error is caught by **AkkaRpcActor**, which does nothing with 
it and thereby interrupts the rest of the process, so 
**updateTaskExecutionState** will never be executed (see the sketch below).
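
Roughly, the problematic pattern is the following (simplified for 
illustration; not the exact Flink code):

```java
// Simplified illustration of the described behavior: the broad catch in
// AkkaRpcActor swallows the OutOfMemoryError, so the follow-up
// updateTaskExecutionState RPC is silently never sent.
private void handleRunAsync(RunAsync runAsync) {
    try {
        // runs unregisterTaskAndNotifyFinalState, which may hit an OOM
        // before it gets to trigger updateTaskExecutionState
        runAsync.getRunnable().run();
    } catch (Throwable t) {
        // logged and dropped: the JobMaster never learns the message was lost
        log.error("Error while executing runAsync message.", t);
    }
}
```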

For this key interaction between TaskExecutor and JobMaster, lost messages 
should not be tolerated, and I agree with your suggestions above. There may 
be two ideas for this improvement:

- Enhance the robustness of **notifyFinalState**. The current rethrow of the 
OOM is an easy option, but it causes the TaskExecutor to exit; there should 
be other ways to reduce that cost.
- After getting the cancel acknowledgement on the JobMaster side, trigger a 
timeout to check the execution's final state. If the execution has not 
entered a final state within the timeout, the JobMaster can resend the 
message to the TaskExecutor to confirm the status.

I prefer the first way, which keeps the handling on one side and avoids the 
complex interaction between TaskExecutor and JobMaster.

Looking forward to your further suggestions or any other ideas.




[GitHub] flink issue #3360: [FLINK-5830][Distributed Coordination] Handle OutOfMemory...

2017-02-21 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3360
  
I think adding this safety net makes sense and protects against a corrupted 
state. 

However, isn't the root cause of the described problem that the 
JobMaster-TaskExecutor communication does not tolerate a lost message? Maybe 
we should introduce an acknowledgement message which signals the correct 
reception of the status message. If the response times out, we can either 
retry sending it or fail.
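
Sketched from the TaskExecutor side with a plain `CompletableFuture` (the 
gateway method and signatures here are assumptions for illustration):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hedged sketch of the acknowledge/retry idea: resend the status message
// on timeout, fail once the retries are exhausted.
private void notifyFinalStateWithRetry(TaskExecutionState finalState, int retriesLeft) {
    // ask-based call, completed once the JobMaster acknowledges reception
    CompletableFuture<Acknowledge> ack = jobMasterGateway.updateTaskExecutionState(finalState);

    ack.whenComplete((acknowledge, failure) -> {
        if (failure == null) {
            return; // status message confirmed
        } else if (failure instanceof TimeoutException && retriesLeft > 0) {
            notifyFinalStateWithRetry(finalState, retriesLeft - 1); // retry the send
        } else {
            onFatalError(failure); // out of retries, or a non-timeout error
        }
    });
}
```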




[GitHub] flink issue #3360: [FLINK-5830][Distributed Coordination] Handle OutOfMemory...

2017-02-20 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/3360
  
@StephanEwen , I have already submitted the modifications.




[GitHub] flink issue #3360: [FLINK-5830][Distributed Coordination] Handle OutOfMemory...

2017-02-20 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/3360
  
@StephanEwen , thank you for the quick review! 

That is a good idea to add a uniform helper to the utils, so we can use it 
anywhere.

I will fix it as you suggested later today.




[GitHub] flink issue #3360: [FLINK-5830][Distributed Coordination] Handle OutOfMemory...

2017-02-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3360
  
I would suggest that we adopt the following pattern for all the places like 
the one in this pull request where we catch Throwables:

```java
try {
    ...
} catch (Throwable t) {
    ExceptionUtils.rethrowIfFatalErrorOrOOM(t);

    // the other handling logic...
}
```

This requires adding the function `rethrowIfFatalErrorOrOOM(Throwable)` to 
`ExceptionUtils`, similar to the method here: 
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java#L109
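
One possible shape for that helper (the exact set of fatal error types is an 
assumption and should mirror the existing method linked above):

```java
// Sketch of the proposed ExceptionUtils addition: rethrow fatal JVM errors
// and OOMs, let all other Throwables fall through to the catch body.
public static void rethrowIfFatalErrorOrOOM(Throwable t) {
    if (t instanceof InternalError
            || t instanceof UnknownError
            || t instanceof ThreadDeath
            || t instanceof OutOfMemoryError) {
        throw (Error) t;
    }
}
```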

It would be even nicer if we could do something like Scala supports, but I 
think there is no better way to do this in Java than the one suggested above:
```scala
try {
  ...
} catch {
  case NonFatal(t) => // does not include OOM and internal errors
}
```

