[jira] [Commented] (FLINK-4851) Add FatalErrorHandler and MetricRegistry to ResourceManager

2016-10-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15599397#comment-15599397
 ] 

ASF GitHub Bot commented on FLINK-4851:
---

Github user tillrohrmann closed the pull request at:

https://github.com/apache/flink/pull/2655


> Add FatalErrorHandler and MetricRegistry to ResourceManager
> ---
>
> Key: FLINK-4851
> URL: https://issues.apache.org/jira/browse/FLINK-4851
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{ResourceManager}} currently does not contain a {{FatalErrorHandler}}. 
> In order to harmonize the fatal error handling across all components, we 
> should introduce a {{FatalErrorHandler}}, which handles fatal errors. 
> Additionally, we should also give a {{MetricRegistry}} to the 
> {{ResourceManager}} so that it can report metrics.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4851) Add FatalErrorHandler and MetricRegistry to ResourceManager

2016-10-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15599396#comment-15599396
 ] 

ASF GitHub Bot commented on FLINK-4851:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2655
  
Merged




[jira] [Commented] (FLINK-4851) Add FatalErrorHandler and MetricRegistry to ResourceManager

2016-10-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15597489#comment-15597489
 ] 

ASF GitHub Bot commented on FLINK-4851:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2655#discussion_r84573260
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java ---
@@ -145,6 +171,10 @@ public void testRegisterJobMasterWithFailureLeaderListener() throws Exception {
        Future<RegistrationResponse> declineFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderSessionId, jobMasterAddress, unknownJobIDToHAServices);
        RegistrationResponse response = declineFuture.get(5, TimeUnit.SECONDS);
        assertTrue(response instanceof RegistrationResponse.Decline);
+
+       if (testingFatalErrorHandler.hasExceptionOccurred()) {
+           testingFatalErrorHandler.rethrowError();
+       }
--- End diff --

Yes, I think this makes sense for all of the distributed components: RM, JM 
and TM. It would be good to instantiate default components that you can 
replace with a special version of a component in case you need it for a test. 




[jira] [Commented] (FLINK-4851) Add FatalErrorHandler and MetricRegistry to ResourceManager

2016-10-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15595486#comment-15595486
 ] 

ASF GitHub Bot commented on FLINK-4851:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2655#discussion_r84507291
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingFatalErrorHandler.java ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Testing fatal error handler which records the occurred exceptions during the execution of the
+ * tests. Captured exceptions are thrown as a {@link TestingException}.
+ */
+public class TestingFatalErrorHandler implements FatalErrorHandler {
+   private static final Logger LOG = LoggerFactory.getLogger(TestingFatalErrorHandler.class);
+   private final AtomicReference<Throwable> atomicThrowable;
+
+   public TestingFatalErrorHandler() {
+       atomicThrowable = new AtomicReference<>(null);
+   }
+
+   public void rethrowError() throws TestingException {
+       Throwable throwable = atomicThrowable.get();
+
+       if (throwable != null) {
+           throw new TestingException(throwable);
+       }
+   }
+
+   public boolean hasExceptionOccurred() {
+       return atomicThrowable.get() != null;
+   }
+
+   public Throwable getException() {
+       return atomicThrowable.get();
+   }
+
+   @Override
+   public void onFatalError(Throwable exception) {
+       LOG.error("OnFatalError:", exception);
+
+       if (!atomicThrowable.compareAndSet(null, exception)) {
+           atomicThrowable.get().addSuppressed(exception);
+       }
--- End diff --

Oh, I didn't know you could do that. That's neat.
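
For readers unfamiliar with the idiom praised here, the following is a minimal stand-alone sketch of "first error wins, later errors are attached as suppressed". The class name FirstErrorRecorder is hypothetical and not part of the PR; the self-suppression guard is an addition, since Throwable.addSuppressed rejects suppressing an exception with itself.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-alone recorder (not Flink code); it mirrors the
// compareAndSet/addSuppressed idea from the diff above.
final class FirstErrorRecorder {
    private final AtomicReference<Throwable> first = new AtomicReference<>(null);

    void record(Throwable error) {
        // Only the first caller wins the CAS; later errors are attached to the winner.
        if (!first.compareAndSet(null, error)) {
            Throwable primary = first.get();
            // addSuppressed throws IllegalArgumentException on self-suppression, so guard it.
            if (primary != error) {
                primary.addSuppressed(error);
            }
        }
    }

    Throwable get() {
        return first.get();
    }
}
```

Recording two distinct exceptions leaves the first one as the primary error and exposes the second through getSuppressed(), so a single rethrow carries both.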




[jira] [Commented] (FLINK-4851) Add FatalErrorHandler and MetricRegistry to ResourceManager

2016-10-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15595485#comment-15595485
 ] 

ASF GitHub Bot commented on FLINK-4851:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2655#discussion_r84506385
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java ---
@@ -145,6 +171,10 @@ public void testRegisterJobMasterWithFailureLeaderListener() throws Exception {
        Future<RegistrationResponse> declineFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderSessionId, jobMasterAddress, unknownJobIDToHAServices);
        RegistrationResponse response = declineFuture.get(5, TimeUnit.SECONDS);
        assertTrue(response instanceof RegistrationResponse.Decline);
+
+       if (testingFatalErrorHandler.hasExceptionOccurred()) {
+           testingFatalErrorHandler.rethrowError();
+       }
--- End diff --

This seems like a lot of boilerplate that we could abstract using a base 
testing class for ResourceManager tests.
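
One possible shape for such a base class is sketched below. All names here (RecordingErrorHandler, ResourceManagerTestBase, runAndVerify) are hypothetical and do not come from the PR; with JUnit the check would more naturally live in an @After method, which this plain-Java sketch only imitates.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical minimal handler standing in for TestingFatalErrorHandler.
final class RecordingErrorHandler {
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    void onFatalError(Throwable t) {
        // Keep only the first reported error.
        error.compareAndSet(null, t);
    }

    Throwable getError() {
        return error.get();
    }
}

// Hypothetical base class: every test body runs through runAndVerify, so the
// "did a fatal error occur?" check is written once instead of in each test.
abstract class ResourceManagerTestBase {
    final RecordingErrorHandler fatalErrorHandler = new RecordingErrorHandler();

    interface TestBody {
        void run() throws Exception;
    }

    final void runAndVerify(TestBody body) throws Exception {
        body.run();
        Throwable t = fatalErrorHandler.getError();
        if (t != null) {
            throw new Exception("Fatal error reported during test", t);
        }
    }
}
```

A test that reports a fatal error to the shared handler then fails at the end of runAndVerify without repeating the if-block from the diff.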




[jira] [Commented] (FLINK-4851) Add FatalErrorHandler and MetricRegistry to ResourceManager

2016-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15588696#comment-15588696
 ] 

ASF GitHub Bot commented on FLINK-4851:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2655#discussion_r84063789
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
@@ -408,106 +528,98 @@ public void run() {
     */
    @Override
    public void handleError(final Exception exception) {
-       log.error("ResourceManager received an error from the LeaderElectionService.", exception);
-       // terminate ResourceManager in case of an error
-       shutDown();
+       onFatalErrorAsync(new ResourceManagerException("Received an error from the LeaderElectionService.", exception));
    }
 
    /**
-    * Registers an infoMessage listener
+    * This method should be called by the framework once it detects that a currently registered
+    * task executor has failed.
     *
-    * @param infoMessageListenerAddress address of infoMessage listener to register to this resource manager
+    * @param resourceID Id of the worker that has failed.
+    * @param message An informational message that explains why the worker failed.
     */
-   @RpcMethod
-   public void registerInfoMessageListener(final String infoMessageListenerAddress) {
-       if(infoMessageListeners.containsKey(infoMessageListenerAddress)) {
-           log.warn("Receive a duplicate registration from info message listener on ({})", infoMessageListenerAddress);
-       } else {
-           Future<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture = getRpcService().connect(infoMessageListenerAddress, InfoMessageListenerRpcGateway.class);
-
-           infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new AcceptFunction<InfoMessageListenerRpcGateway>() {
-               @Override
-               public void accept(InfoMessageListenerRpcGateway gateway) {
-                   log.info("Receive a registration from info message listener on ({})", infoMessageListenerAddress);
-                   infoMessageListeners.put(infoMessageListenerAddress, gateway);
-               }
-           }, getMainThreadExecutor());
+   public void notifyWorkerFailed(final ResourceID resourceID, final String message) {
+       runAsync(new Runnable() {

I actually just added logging. I guess that one has to add the 
registration ID of the TM to filter out outdated notifyWorkerFailed calls.
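
The filtering idea mentioned here could look roughly like the sketch below. It is not what the PR implements (the PR only adds logging); WorkerRegistry, the String resource ids and the long registration ids are all hypothetical stand-ins for Flink's actual types.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry: a String stands in for ResourceID and a long for the
// registration id; neither is taken from the PR.
final class WorkerRegistry {
    private final Map<String, Long> registrations = new HashMap<>();
    private long nextRegistrationId = 0;

    // Each (re-)registration of a worker gets a fresh id that replaces the old one.
    long register(String resourceId) {
        long id = nextRegistrationId++;
        registrations.put(resourceId, id);
        return id;
    }

    // Returns true only if the failure refers to the worker's current registration.
    boolean notifyWorkerFailed(String resourceId, long registrationId) {
        Long current = registrations.get(resourceId);
        if (current == null || current != registrationId) {
            return false; // unknown worker or outdated notification: ignore it
        }
        registrations.remove(resourceId);
        return true;
    }
}
```

A failure notification that carries the id of an earlier registration is dropped, while one carrying the current id removes the worker.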




[jira] [Commented] (FLINK-4851) Add FatalErrorHandler and MetricRegistry to ResourceManager

2016-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585538#comment-15585538
 ] 

ASF GitHub Bot commented on FLINK-4851:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2655

[FLINK-4851] [rm] Introduce FatalErrorHandler and MetricRegistry to RM

This PR is based on #2651 

This PR introduces a `FatalErrorHandler` and the `MetricRegistry` to the 
RM. The `FatalErrorHandler` is used to handle fatal errors. Additionally, the 
PR adds the `MetricRegistry` to the RM which can be used to register metrics.
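
As a rough illustration of the delegation described above, the sketch below shows a component that forwards errors to an injected handler instead of shutting itself down. The interface is a local stand-in that only mirrors the `onFatalError(Throwable)` method visible in the PR's diff; `SketchComponent` is hypothetical and is not Flink's `ResourceManager`.

```java
import java.util.Objects;

// Local stand-in mirroring the onFatalError(Throwable) method seen in the diff.
interface FatalErrorHandler {
    void onFatalError(Throwable exception);
}

// Hypothetical component (not Flink's ResourceManager): it wraps the cause and
// hands it to the injected handler, leaving the reaction to the caller.
final class SketchComponent {
    private final FatalErrorHandler fatalErrorHandler;

    SketchComponent(FatalErrorHandler fatalErrorHandler) {
        this.fatalErrorHandler = Objects.requireNonNull(fatalErrorHandler);
    }

    void handleError(Exception cause) {
        fatalErrorHandler.onFatalError(
            new RuntimeException("Received an error from the LeaderElectionService.", cause));
    }
}
```

Because the handler is passed in through the constructor, a test can supply a recording handler while production code supplies one that terminates the process.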

Apart from these changes the PR restructures the code of the RM a little 
bit and fixes some
blocking operations.

The PR also moves the `TestingFatalErrorHandler` into the util package of 
flink-runtime test. That way, it is usable across multiple tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink resourceManagerImprovements

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2655.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2655


commit ddf35c4ddb04629cddebb2401488effe93416b70
Author: Till Rohrmann 
Date:   2016-10-17T14:22:16Z

[FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions

Allowing RpcEndpoint.start/shutDown to throw exceptions helps RPC 
endpoints fail quickly without having to use a callback like the 
FatalErrorHandler.

commit 9fdcc11f7730b3c7a6f061c17d0463ea3f21a9f9
Author: Till Rohrmann 
Date:   2016-10-17T14:03:02Z

[FLINK-4851] [rm] Introduce FatalErrorHandler and MetricRegistry to RM

This PR introduces a FatalErrorHandler and the MetricRegistry to the RM. 
The FatalErrorHandler is used to handle fatal errors. Additionally, the PR adds 
the MetricRegistry to the RM which can be used
to register metrics.

Apart from these changes the PR restructures the code of the RM a little 
bit and fixes some
blocking operations.

The PR also moves the TestingFatalErrorHandler into the util package of 
flink-runtime test. That way,
it is usable across multiple tests.



