This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
commit b682e9a316669f30fa4bfcaa32a8fa0d3ac1dc02 Author: Andrey Zagrebin <azagre...@gmail.com> AuthorDate: Mon Aug 19 16:20:39 2019 +0200 [FLINK-13769][Coordination] Close RM connection in TaskExecutor.onStop and do not reconnect This prevents JM from acquiring slots which belong to the stopped TM. --- .../apache/flink/runtime/taskexecutor/TaskExecutor.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index c8bcaf9..b1238ec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -340,11 +340,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { Throwable jobManagerDisconnectThrowable = null; - if (resourceManagerConnection != null) { - resourceManagerConnection.close(); - } - FlinkException cause = new FlinkException("The TaskExecutor is shutting down."); + + closeResourceManagerConnection(cause); + for (JobManagerConnection jobManagerConnection : jobManagerConnections.values()) { try { disassociateFromJobManager(jobManagerConnection, cause); @@ -958,7 +957,9 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { @Override public void disconnectResourceManager(Exception cause) { - reconnectToResourceManager(cause); + if (isRunning()) { + reconnectToResourceManager(cause); + } } // ====================================================================== @@ -986,6 +987,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { private void reconnectToResourceManager(Exception cause) { closeResourceManagerConnection(cause); + startRegistrationTimeout(); tryConnectToResourceManager(); } @@ -1098,8 +1100,6 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { resourceManagerConnection.close(); resourceManagerConnection = null; } - - startRegistrationTimeout(); } private void startRegistrationTimeout() {