[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

2018-05-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6035


---


[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

2018-05-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6035#discussion_r189019837
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
@@ -947,6 +964,36 @@ private void closeResourceManagerConnection(Exception cause) {
             resourceManagerConnection.close();
             resourceManagerConnection = null;
         }
+
+        startRegistrationTimeout();
+    }
+
+    private void startRegistrationTimeout() {
+        final Time maxRegistrationDuration = taskManagerConfiguration.getMaxRegistrationDuration();
+
+        if (maxRegistrationDuration != null) {
+            final UUID newRegistrationTimeoutId = UUID.randomUUID();
+            currentRegistrationTimeoutId = newRegistrationTimeoutId;
+            scheduleRunAsync(() -> registrationTimeout(newRegistrationTimeoutId), maxRegistrationDuration);
+        }
+    }
+
+    private void stopRegistrationTimeout() {
+        currentRegistrationTimeoutId = null;
+    }
+
+    private void registrationTimeout(@Nonnull UUID registrationTimeoutId) {
+        if (registrationTimeoutId.equals(currentRegistrationTimeoutId)) {
+            final Time maxRegistrationDuration = taskManagerConfiguration.getMaxRegistrationDuration();
+
+            onFatalError(
+                new RegistrationTimeoutException(
+                    String.format("Could not register at the ResourceManager within the specified maximum " +
+                        "registration duration %s. This indicates a problem with this instance. Terminating now.",
+                        maxRegistrationDuration)));
+        } else {
+            log.debug("Ignoring outdated registration timeout.");
--- End diff --

True, will remove it.


---


[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

2018-05-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6035#discussion_r189019783
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -1605,9 +1609,16 @@ public void notifyHeartbeatTimeout(final ResourceID resourceId) {
         runAsync(() -> {
             log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);

-            closeResourceManagerConnection(
-                new TimeoutException(
-                    "The heartbeat of ResourceManager with id " + resourceId + " timed out."));
+            if (establishedResourceManagerConnection != null && establishedResourceManagerConnection.getResourceManagerResourceID().equals(resourceId)) {
+                final String resourceManagerAddress = establishedResourceManagerConnection.getResourceManagerGateway().getAddress();
--- End diff --

Actually not, because we set `establishedResourceManagerConnection` to `null` in the close method.
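
A trimmed-down, standalone sketch of that ordering (not the actual `JobMaster` code; field and method names are illustrative):

```java
// Illustration only: the RM address has to be read before the close step,
// because closing sets the established-connection field back to null.
public class ReconnectOrderSketch {

    private String establishedResourceManagerAddress = "akka.tcp://flink@rm:6123/user/resourcemanager";

    void onHeartbeatTimeout() {
        final String lastKnownAddress = establishedResourceManagerAddress; // read first
        closeResourceManagerConnection();                                  // this nulls the field
        reconnectTo(lastKnownAddress);                                     // reconnect to the saved address
    }

    private void closeResourceManagerConnection() {
        establishedResourceManagerAddress = null;
    }

    private void reconnectTo(String address) {
        System.out.println("Reconnecting to " + address);
    }

    public static void main(String[] args) {
        new ReconnectOrderSketch().onHeartbeatTimeout();
    }
}
```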


---


[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

2018-05-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6035#discussion_r189017088
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
@@ -947,6 +964,36 @@ private void closeResourceManagerConnection(Exception cause) {
             resourceManagerConnection.close();
             resourceManagerConnection = null;
         }
+
+        startRegistrationTimeout();
--- End diff --

The problem is that we want this timeout to start whenever the `TaskExecutor` loses its connection to the RM, and that is exactly when we close the RM connection. This also covers the case where we don't know the RM address (e.g. if the RM loses leadership).
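
A tiny standalone illustration of that idea, detached from Flink (class, scheduler, and the fatal-error handling are simplified placeholders): arm a fresh timeout whenever the RM connection is lost, and disarm it by clearing the id once a registration succeeds.

```java
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RegistrationTimeoutSketch {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // null means "no timeout armed"; a non-null id identifies the currently armed timeout
    private volatile UUID currentRegistrationTimeoutId;

    void onResourceManagerConnectionLost() {
        final UUID newTimeoutId = UUID.randomUUID();
        currentRegistrationTimeoutId = newTimeoutId;
        scheduler.schedule(() -> registrationTimeout(newTimeoutId), 5, TimeUnit.MINUTES);
    }

    void onRegistrationSucceeded() {
        currentRegistrationTimeoutId = null;
    }

    private void registrationTimeout(UUID timeoutId) {
        // only act if this is still the latest armed timeout
        if (timeoutId.equals(currentRegistrationTimeoutId)) {
            System.err.println("Could not register within the maximum registration duration, giving up.");
        }
    }
}
```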


---


[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

2018-05-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6035#discussion_r189016636
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java ---
@@ -50,8 +50,11 @@
     private final String[] tmpDirectories;

     private final Time timeout;
+
+    // null indicates an infinite duration
+    @Nullable
--- End diff --

Will change it.


---


[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

2018-05-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6035#discussion_r189016685
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -1605,9 +1609,16 @@ public void notifyHeartbeatTimeout(final ResourceID resourceId) {
         runAsync(() -> {
             log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);

-            closeResourceManagerConnection(
-                new TimeoutException(
-                    "The heartbeat of ResourceManager with id " + resourceId + " timed out."));
+            if (establishedResourceManagerConnection != null && establishedResourceManagerConnection.getResourceManagerResourceID().equals(resourceId)) {
+                final String resourceManagerAddress = establishedResourceManagerConnection.getResourceManagerGateway().getAddress();
--- End diff --

True, will change it.


---


[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

2018-05-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6035#discussion_r189016603
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
@@ -232,8 +239,10 @@ public TaskExecutor(
             rpcService.getScheduledExecutor(),
             log);

-        hardwareDescription = HardwareDescription.extractFromSystem(
+        this.hardwareDescription = HardwareDescription.extractFromSystem(
             taskExecutorServices.getMemoryManager().getMemorySize());
+
+        this.currentRegistrationTimeoutId = null;
--- End diff --

Will change it.


---


[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

2018-05-17 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6035#discussion_r189004536
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -1605,9 +1609,16 @@ public void notifyHeartbeatTimeout(final ResourceID resourceId) {
         runAsync(() -> {
             log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);

-            closeResourceManagerConnection(
-                new TimeoutException(
-                    "The heartbeat of ResourceManager with id " + resourceId + " timed out."));
+            if (establishedResourceManagerConnection != null && establishedResourceManagerConnection.getResourceManagerResourceID().equals(resourceId)) {
+                final String resourceManagerAddress = establishedResourceManagerConnection.getResourceManagerGateway().getAddress();
--- End diff --

Declaration and assignment can be moved closer to 
`createResourceManagerConnection`.


---


[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

2018-05-17 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6035#discussion_r189012246
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
@@ -947,6 +964,36 @@ private void closeResourceManagerConnection(Exception cause) {
             resourceManagerConnection.close();
             resourceManagerConnection = null;
         }
+
+        startRegistrationTimeout();
+    }
+
+    private void startRegistrationTimeout() {
+        final Time maxRegistrationDuration = taskManagerConfiguration.getMaxRegistrationDuration();
+
+        if (maxRegistrationDuration != null) {
+            final UUID newRegistrationTimeoutId = UUID.randomUUID();
+            currentRegistrationTimeoutId = newRegistrationTimeoutId;
+            scheduleRunAsync(() -> registrationTimeout(newRegistrationTimeoutId), maxRegistrationDuration);
+        }
+    }
+
+    private void stopRegistrationTimeout() {
+        currentRegistrationTimeoutId = null;
+    }
+
+    private void registrationTimeout(@Nonnull UUID registrationTimeoutId) {
+        if (registrationTimeoutId.equals(currentRegistrationTimeoutId)) {
+            final Time maxRegistrationDuration = taskManagerConfiguration.getMaxRegistrationDuration();
+
+            onFatalError(
+                new RegistrationTimeoutException(
+                    String.format("Could not register at the ResourceManager within the specified maximum " +
+                        "registration duration %s. This indicates a problem with this instance. Terminating now.",
+                        maxRegistrationDuration)));
+        } else {
+            log.debug("Ignoring outdated registration timeout.");
--- End diff --

I think this will be logged even if the registration succeeded, because `stopRegistrationTimeout()` sets `currentRegistrationTimeoutId` to `null` and the already scheduled callback will then always end up in the `else` branch.


---


[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

2018-05-17 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6035#discussion_r189013436
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
@@ -947,6 +964,36 @@ private void closeResourceManagerConnection(Exception cause) {
             resourceManagerConnection.close();
             resourceManagerConnection = null;
         }
+
+        startRegistrationTimeout();
--- End diff --

It looks weird that we call `startRegistrationTimeout()` in `closeResourceManagerConnection`. Can it be done in `createResourceManagerConnection` instead?


---


[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

2018-05-17 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6035#discussion_r188992099
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
@@ -232,8 +239,10 @@ public TaskExecutor(
             rpcService.getScheduledExecutor(),
             log);

-        hardwareDescription = HardwareDescription.extractFromSystem(
+        this.hardwareDescription = HardwareDescription.extractFromSystem(
             taskExecutorServices.getMemoryManager().getMemorySize());
+
+        this.currentRegistrationTimeoutId = null;
--- End diff --

It's already `null` by default.


---


[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

2018-05-17 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6035#discussion_r189004280
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java ---
@@ -50,8 +50,11 @@
     private final String[] tmpDirectories;

     private final Time timeout;
+
+    // null indicates an infinite duration
+    @Nullable
--- End diff --

The corresponding constructor parameter should also be annotated `@Nullable`.
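
For illustration, a cut-down version of how field, constructor parameter, and getter would line up (not the real `TaskManagerConfiguration`, just the relevant members):

```java
import javax.annotation.Nullable;

import org.apache.flink.api.common.time.Time;

class MaxRegistrationDurationHolder {

    // null indicates an infinite duration
    @Nullable
    private final Time maxRegistrationDuration;

    MaxRegistrationDurationHolder(@Nullable Time maxRegistrationDuration) {
        this.maxRegistrationDuration = maxRegistrationDuration;
    }

    @Nullable
    Time getMaxRegistrationDuration() {
        return maxRegistrationDuration;
    }
}
```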


---


[GitHub] flink pull request #6035: [FLINK-6160] Add reconnection attempts in case of ...

2018-05-17 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/6035

[FLINK-6160] Add reconnection attempts in case of heartbeat timeouts to 
JobMaster and TaskExecutor

## What is the purpose of the change

If a heartbeat timeout with the RM occurs on the JobMaster or the TaskExecutor, the affected component will now try to reconnect to the last known RM address.

Additionally, we now respect TaskManagerOptions#REGISTRATION_TIMEOUT on the TaskExecutor. This means that if the TaskExecutor cannot register with an RM within the given registration timeout, it fails with a fatal exception. This allows the TaskExecutor process to fail if it cannot establish a connection, which ultimately frees the occupied resources.

The commit also changes the default value of TaskManagerOptions#REGISTRATION_TIMEOUT from "Inf" to "5 min".
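
As a hypothetical usage sketch (the key "taskmanager.registration.timeout" is assumed here to be the key behind TaskManagerOptions#REGISTRATION_TIMEOUT; the class name below is made up), this is roughly how a deployment could raise the timeout again if 5 minutes is too aggressive:

```java
import org.apache.flink.configuration.Configuration;

public class RegistrationTimeoutConfigExample {

    public static void main(String[] args) {
        final Configuration config = new Configuration();
        // assumed config key; adjust if the option uses a different key
        config.setString("taskmanager.registration.timeout", "10 min");
        System.out.println(config.getString("taskmanager.registration.timeout", "5 min"));
    }
}
```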

cc @GJL.

## Brief change log

- Retry the connection to the RM in case of a heartbeat timeout on the `JobMaster` and the `TaskExecutor`
- Fail the `TaskExecutor` if it could not connect to the RM within `TaskManagerOptions#REGISTRATION_TIMEOUT`

## Verifying this change

- Adapted `JobMasterTest#testHeartbeatTimeoutWithResourceManager`
- Adapted `TaskExecutorTest#testHeartbeatTimeoutWithResourceManager`
- Added `TaskExecutorTest#testMaximumRegistrationDuration` and 
`TaskExecutorTest#testMaximumRegistrationDurationAfterConnectionLoss`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixReconnection

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6035.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6035


commit 6b45c84cf06688099e71c9e1809917653af43d31
Author: Till Rohrmann 
Date:   2018-05-17T12:44:14Z

[FLINK-6160] Add reconnection attempts in case of heartbeat timeouts to 
JobMaster and TaskExecutor

If a heartbeat timeout with the RM occurs on the JobMaster or the TaskExecutor, the affected component will now try to reconnect to the last known RM address.

Additionally, we now respect TaskManagerOptions#REGISTRATION_TIMEOUT on the TaskExecutor. This means that if the TaskExecutor cannot register with an RM within the given registration timeout, it fails with a fatal exception. This allows the TaskExecutor process to fail if it cannot establish a connection, which ultimately frees the occupied resources.

The commit also changes the default value of TaskManagerOptions#REGISTRATION_TIMEOUT from "Inf" to "5 min".




---