[jira] [Commented] (FLINK-4851) Add FatalErrorHandler and MetricRegistry to ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15599397#comment-15599397 ]

ASF GitHub Bot commented on FLINK-4851:
---

Github user tillrohrmann closed the pull request at:

    https://github.com/apache/flink/pull/2655

> Add FatalErrorHandler and MetricRegistry to ResourceManager
> ---
>
> Key: FLINK-4851
> URL: https://issues.apache.org/jira/browse/FLINK-4851
> Project: Flink
> Issue Type: Sub-task
> Components: ResourceManager
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
>
> The {{ResourceManager}} currently does not contain a {{FatalErrorHandler}}.
> In order to harmonize the fatal error handling across all components, we
> should introduce a {{FatalErrorHandler}}, which handles fatal errors.
> Additionally, we should also give a {{MetricRegistry}} to the
> {{ResourceManager}} so that it can report metrics.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
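The idea in the issue description, injecting a fatal error handler and a metric registry into the ResourceManager instead of letting it shut itself down, can be illustrated in plain Java. This is a minimal sketch under stated assumptions: FatalErrorHandler here is a local one-method interface modeled on the onFatalError(Throwable) callback visible in the TestingFatalErrorHandler diff, while SimpleMetricRegistry and SketchResourceManager are hypothetical stand-ins, not Flink's actual MetricRegistry or ResourceManager APIs.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Local stand-in modeled on the onFatalError(Throwable) callback seen in the PR's diffs.
interface FatalErrorHandler {
    void onFatalError(Throwable exception);
}

// Hypothetical minimal registry of named counters; NOT Flink's MetricRegistry API.
class SimpleMetricRegistry {
    private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

    AtomicLong counter(String name) {
        return counters.computeIfAbsent(name, key -> new AtomicLong());
    }

    long value(String name) {
        AtomicLong counter = counters.get(name);
        return counter == null ? 0L : counter.get();
    }
}

// Hypothetical component: both collaborators are injected through the constructor,
// so a test can pass a recording handler instead of one that terminates the process.
class SketchResourceManager {
    private final FatalErrorHandler fatalErrorHandler;
    private final SimpleMetricRegistry metrics;
    private final Set<String> taskExecutors = ConcurrentHashMap.newKeySet();

    SketchResourceManager(FatalErrorHandler fatalErrorHandler, SimpleMetricRegistry metrics) {
        this.fatalErrorHandler = fatalErrorHandler;
        this.metrics = metrics;
    }

    void registerTaskExecutor(String resourceId) {
        // Count only first-time registrations of a given resource ID.
        if (taskExecutors.add(resourceId)) {
            metrics.counter("registeredTaskExecutors").incrementAndGet();
        }
    }

    // Rather than shutting itself down, the component hands the error to the handler.
    void handleLeaderElectionError(Exception exception) {
        metrics.counter("fatalErrors").incrementAndGet();
        fatalErrorHandler.onFatalError(
            new RuntimeException("Received an error from the LeaderElectionService.", exception));
    }
}
```

Because the handler is a constructor argument, production code can pass a handler that terminates the process while a test passes one that only records the error.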
[jira] [Commented] (FLINK-4851) Add FatalErrorHandler and MetricRegistry to ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15599396#comment-15599396 ]

ASF GitHub Bot commented on FLINK-4851:
---

Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2655

    Merged
[jira] [Commented] (FLINK-4851) Add FatalErrorHandler and MetricRegistry to ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15597489#comment-15597489 ]

ASF GitHub Bot commented on FLINK-4851:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2655#discussion_r84573260

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java ---
    @@ -145,6 +171,10 @@ public void testRegisterJobMasterWithFailureLeaderListener() throws Exception {
             Future<RegistrationResponse> declineFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderSessionId, jobMasterAddress, unknownJobIDToHAServices);
             RegistrationResponse response = declineFuture.get(5, TimeUnit.SECONDS);
             assertTrue(response instanceof RegistrationResponse.Decline);
    +
    +        if (testingFatalErrorHandler.hasExceptionOccurred()) {
    +            testingFatalErrorHandler.rethrowError();
    +        }
    --- End diff --

    Yes, I think this makes sense for all of the distributed components (RM, JM and TM). It would be good to instantiate default components, each of which can be replaced with a special version when a test needs it.
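The suggestion to instantiate default components that a test can selectively replace could look roughly like the builder below. This is only a sketch: TestComponents, its Builder, and the LeaderElectionService interface shown here are hypothetical names chosen for illustration, and FatalErrorHandler is a local stand-in for the interface used in the PR.

```java
// Stand-in for the handler interface used in the PR.
interface FatalErrorHandler {
    void onFatalError(Throwable exception);
}

// Hypothetical collaborator, standing in for whatever services an RM, JM or TM needs.
interface LeaderElectionService {
    boolean hasLeadership();
}

// Hypothetical bundle of test components: every component has a default,
// and a test replaces only the one it cares about.
final class TestComponents {
    final FatalErrorHandler fatalErrorHandler;
    final LeaderElectionService leaderElectionService;

    private TestComponents(Builder builder) {
        this.fatalErrorHandler = builder.fatalErrorHandler;
        this.leaderElectionService = builder.leaderElectionService;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        // Default: any fatal error reported on the calling thread fails the test.
        private FatalErrorHandler fatalErrorHandler = exception -> {
            throw new AssertionError("Unexpected fatal error", exception);
        };
        // Default: the component under test is always the leader.
        private LeaderElectionService leaderElectionService = () -> true;

        Builder withFatalErrorHandler(FatalErrorHandler handler) {
            this.fatalErrorHandler = handler;
            return this;
        }

        Builder withLeaderElectionService(LeaderElectionService service) {
            this.leaderElectionService = service;
            return this;
        }

        TestComponents build() {
            return new TestComponents(this);
        }
    }
}
```

A throwing default only fails the test when the error is reported on the test thread; for components that report errors asynchronously, a recording handler such as TestingFatalErrorHandler is the safer default.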
[jira] [Commented] (FLINK-4851) Add FatalErrorHandler and MetricRegistry to ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15595486#comment-15595486 ]

ASF GitHub Bot commented on FLINK-4851:
---

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2655#discussion_r84507291

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/TestingFatalErrorHandler.java ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.util;
    +
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Testing fatal error handler which records the occurred exceptions during the execution of the
    + * tests. Captured exceptions are thrown as a {@link TestingException}.
    + */
    +public class TestingFatalErrorHandler implements FatalErrorHandler {
    +    private static final Logger LOG = LoggerFactory.getLogger(TestingFatalErrorHandler.class);
    +    private final AtomicReference<Throwable> atomicThrowable;
    +
    +    public TestingFatalErrorHandler() {
    +        atomicThrowable = new AtomicReference<>(null);
    +    }
    +
    +    public void rethrowError() throws TestingException {
    +        Throwable throwable = atomicThrowable.get();
    +
    +        if (throwable != null) {
    +            throw new TestingException(throwable);
    +        }
    +    }
    +
    +    public boolean hasExceptionOccurred() {
    +        return atomicThrowable.get() != null;
    +    }
    +
    +    public Throwable getException() {
    +        return atomicThrowable.get();
    +    }
    +
    +    @Override
    +    public void onFatalError(Throwable exception) {
    +        LOG.error("OnFatalError:", exception);
    +
    +        if (!atomicThrowable.compareAndSet(null, exception)) {
    +            atomicThrowable.get().addSuppressed(exception);
    +        }
    --- End diff --

    Oh, I didn't know you could do that. That's neat.
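The pattern mxm remarks on records the first fatal error with a compare-and-set and attaches every later one to it via Throwable.addSuppressed, so no error is lost. A self-contained sketch of the same pattern follows; RecordingFatalErrorHandler is a hypothetical name, logging is left out, and the recorded error is wrapped in a plain Exception where the original uses TestingException. One detail the sketch adds: addSuppressed throws IllegalArgumentException on self-suppression, so a repeated report of the same instance is skipped.

```java
import java.util.concurrent.atomic.AtomicReference;

// Stand-in for the handler interface used in the PR.
interface FatalErrorHandler {
    void onFatalError(Throwable exception);
}

// Hypothetical variant of TestingFatalErrorHandler without logging: the first error
// wins the compare-and-set, and later errors are attached to it as suppressed exceptions.
class RecordingFatalErrorHandler implements FatalErrorHandler {
    private final AtomicReference<Throwable> firstError = new AtomicReference<>(null);

    @Override
    public void onFatalError(Throwable exception) {
        if (!firstError.compareAndSet(null, exception)) {
            Throwable first = firstError.get();
            // Throwable.addSuppressed rejects self-suppression, so skip a repeated
            // report of the very same instance.
            if (first != exception) {
                first.addSuppressed(exception);
            }
        }
    }

    Throwable getException() {
        return firstError.get();
    }

    // Wraps the recorded error (the original wraps it in a TestingException instead).
    void rethrowError() throws Exception {
        Throwable throwable = firstError.get();
        if (throwable != null) {
            throw new Exception("A fatal error occurred during the test.", throwable);
        }
    }
}
```

Since Throwable.addSuppressed is synchronized, concurrent reports from several threads can safely be attached to the same first error.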
[jira] [Commented] (FLINK-4851) Add FatalErrorHandler and MetricRegistry to ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15595485#comment-15595485 ]

ASF GitHub Bot commented on FLINK-4851:
---

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2655#discussion_r84506385

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java ---
    @@ -145,6 +171,10 @@ public void testRegisterJobMasterWithFailureLeaderListener() throws Exception {
             Future<RegistrationResponse> declineFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderSessionId, jobMasterAddress, unknownJobIDToHAServices);
             RegistrationResponse response = declineFuture.get(5, TimeUnit.SECONDS);
             assertTrue(response instanceof RegistrationResponse.Decline);
    +
    +        if (testingFatalErrorHandler.hasExceptionOccurred()) {
    +            testingFatalErrorHandler.rethrowError();
    +        }
    --- End diff --

    This seems like a lot of boilerplate that we could abstract away using a base testing class for ResourceManager tests.
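A base testing class along the lines mxm suggests could own the recording handler and perform the check once, after every test. The sketch below is hypothetical (ResourceManagerTestBase, verifyNoFatalErrors and SampleResourceManagerTest are illustrative names, and FatalErrorHandler is a local stand-in); with JUnit 4 the check would typically sit in an @After method, but here it is called explicitly to stay self-contained. Note that rethrowError() in the diff already does nothing when no error was recorded, so the hasExceptionOccurred() guard in each test is redundant.

```java
import java.util.concurrent.atomic.AtomicReference;

// Stand-in for the handler interface used in the PR.
interface FatalErrorHandler {
    void onFatalError(Throwable exception);
}

// Hypothetical base class for RM tests: it owns the recording handler and the
// post-test check, so individual tests no longer repeat the if/rethrow block.
abstract class ResourceManagerTestBase {
    private final AtomicReference<Throwable> fatalError = new AtomicReference<>(null);

    // Handler that subclasses pass to the ResourceManager under test.
    protected final FatalErrorHandler testingFatalErrorHandler = exception -> {
        if (!fatalError.compareAndSet(null, exception) && fatalError.get() != exception) {
            fatalError.get().addSuppressed(exception);
        }
    };

    // With JUnit 4 this method would carry @After; here it is called explicitly.
    protected final void verifyNoFatalErrors() throws Exception {
        Throwable throwable = fatalError.get();
        if (throwable != null) {
            throw new Exception("A fatal error occurred during the test.", throwable);
        }
    }
}

// Example subclass: test bodies only exercise the component and report through the handler.
class SampleResourceManagerTest extends ResourceManagerTestBase {
    void testThatSucceeds() {
        // nothing goes wrong
    }

    void testThatReportsAFatalError() {
        testingFatalErrorHandler.onFatalError(new IllegalStateException("leader election failed"));
    }
}
```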
[jira] [Commented] (FLINK-4851) Add FatalErrorHandler and MetricRegistry to ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15588696#comment-15588696 ]

ASF GitHub Bot commented on FLINK-4851:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2655#discussion_r84063789

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -408,106 +528,98 @@ public void run() {
          */
         @Override
         public void handleError(final Exception exception) {
    -        log.error("ResourceManager received an error from the LeaderElectionService.", exception);
    -        // terminate ResourceManager in case of an error
    -        shutDown();
    +        onFatalErrorAsync(new ResourceManagerException("Received an error from the LeaderElectionService.", exception));
         }
         /**
    -     * Registers an infoMessage listener
    +     * This method should be called by the framework once it detects that a currently registered
    +     * task executor has failed.
          *
    -     * @param infoMessageListenerAddress address of infoMessage listener to register to this resource manager
    +     * @param resourceID Id of the worker that has failed.
    +     * @param message An informational message that explains why the worker failed.
          */
    -    @RpcMethod
    -    public void registerInfoMessageListener(final String infoMessageListenerAddress) {
    -        if(infoMessageListeners.containsKey(infoMessageListenerAddress)) {
    -            log.warn("Receive a duplicate registration from info message listener on ({})", infoMessageListenerAddress);
    -        } else {
    -            Future<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture = getRpcService().connect(infoMessageListenerAddress, InfoMessageListenerRpcGateway.class);
    -
    -            infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new AcceptFunction<InfoMessageListenerRpcGateway>() {
    -                @Override
    -                public void accept(InfoMessageListenerRpcGateway gateway) {
    -                    log.info("Receive a registration from info message listener on ({})", infoMessageListenerAddress);
    -                    infoMessageListeners.put(infoMessageListenerAddress, gateway);
    -                }
    -            }, getMainThreadExecutor());
    +    public void notifyWorkerFailed(final ResourceID resourceID, final String message) {
    +        runAsync(new Runnable() {
    --- End diff --

    I actually just added logging. I guess one has to add the registration ID of the TM to filter out outdated notifyWorkerFailed calls.
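The registration-ID idea from this comment can be sketched as follows: every (re-)registration of a task executor gets a fresh ID, and a notifyWorkerFailed call is applied only if it carries the currently valid one. This is a hypothetical sketch, not the code that was merged: WorkerRegistry is an illustrative name, resource IDs are plain strings instead of Flink's ResourceID, and the class is not thread-safe, assuming (as the runAsync call in the diff suggests) that all calls run on a single main thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical registry; not thread-safe, so it assumes all calls run on a single
// (main) thread.
class WorkerRegistry {
    // resource ID -> ID of the currently valid registration
    private final Map<String, UUID> registrations = new HashMap<>();

    // Each (re-)registration gets a fresh ID that replaces the previous one.
    UUID register(String resourceId) {
        UUID registrationId = UUID.randomUUID();
        registrations.put(resourceId, registrationId);
        return registrationId;
    }

    // Returns true if the notification matched the current registration and the
    // worker was removed; false if it was outdated and therefore ignored.
    boolean notifyWorkerFailed(String resourceId, UUID registrationId, String message) {
        // Map.remove(key, value) only removes the entry if it still maps to this ID.
        if (!registrations.remove(resourceId, registrationId)) {
            return false;
        }
        System.out.println("Worker " + resourceId + " failed: " + message);
        return true;
    }

    boolean isRegistered(String resourceId) {
        return registrations.containsKey(resourceId);
    }
}
```

A late failure notification for an earlier registration of the same task executor therefore no longer removes its newer, healthy registration.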
[jira] [Commented] (FLINK-4851) Add FatalErrorHandler and MetricRegistry to ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585538#comment-15585538 ]

ASF GitHub Bot commented on FLINK-4851:
---

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/2655

    [FLINK-4851] [rm] Introduce FatalErrorHandler and MetricRegistry to RM

    This PR is based on #2651

    This PR introduces a `FatalErrorHandler` and the `MetricRegistry` to the RM. The `FatalErrorHandler` is used to handle fatal errors. Additionally, the PR adds the `MetricRegistry` to the RM, which can be used to register metrics.

    Apart from these changes, the PR restructures the code of the RM a little bit and fixes some blocking operations.

    The PR also moves the `TestingFatalErrorHandler` into the util package of the flink-runtime tests. That way, it is usable across multiple tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink resourceManagerImprovements

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2655.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2655

commit ddf35c4ddb04629cddebb2401488effe93416b70
Author: Till Rohrmann
Date:   2016-10-17T14:22:16Z

    [FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions

    Allowing RpcEndpoint.start/shutDown to throw exceptions lets RPC endpoints fail quickly without having to use a callback like the FatalErrorHandler.

commit 9fdcc11f7730b3c7a6f061c17d0463ea3f21a9f9
Author: Till Rohrmann
Date:   2016-10-17T14:03:02Z

    [FLINK-4851] [rm] Introduce FatalErrorHandler and MetricRegistry to RM

    This PR introduces a FatalErrorHandler and the MetricRegistry to the RM. The FatalErrorHandler is used to handle fatal errors. Additionally, the PR adds the MetricRegistry to the RM, which can be used to register metrics.

    Apart from these changes, the PR restructures the code of the RM a little bit and fixes some blocking operations.

    The PR also moves the TestingFatalErrorHandler into the util package of the flink-runtime tests. That way, it is usable across multiple tests.
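The first commit lets RpcEndpoint.start/shutDown throw exceptions so that a failing endpoint surfaces the error directly to its caller instead of reporting it through a callback. A rough sketch of that contract follows; SketchEndpoint, FailingEndpoint and WorkingEndpoint are hypothetical classes for illustration only and do not reproduce Flink's actual RpcEndpoint signatures.

```java
// Hypothetical endpoint base class, NOT Flink's RpcEndpoint: start() and shutDown()
// declare checked exceptions, so a failed start reaches the caller directly.
abstract class SketchEndpoint {
    private boolean running;

    final void start() throws Exception {
        onStart();       // may throw; the endpoint is only marked running on success
        running = true;
    }

    final void shutDown() throws Exception {
        if (running) {
            running = false;
            onStop();
        }
    }

    final boolean isRunning() {
        return running;
    }

    protected abstract void onStart() throws Exception;

    protected void onStop() throws Exception {
        // no resources to release by default
    }
}

// Example endpoint whose startup fails, e.g. because a required service is unavailable.
class FailingEndpoint extends SketchEndpoint {
    @Override
    protected void onStart() throws Exception {
        throw new IllegalStateException("Could not start the leader election service.");
    }
}

// Example endpoint that starts and stops cleanly.
class WorkingEndpoint extends SketchEndpoint {
    boolean stopped;

    @Override
    protected void onStart() {
    }

    @Override
    protected void onStop() {
        stopped = true;
    }
}
```

Errors during start and shutdown thus fail fast at the call site, while errors that occur later, asynchronously, still need a handler such as the FatalErrorHandler.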