This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b682e9a316669f30fa4bfcaa32a8fa0d3ac1dc02
Author: Andrey Zagrebin <azagre...@gmail.com>
AuthorDate: Mon Aug 19 16:20:39 2019 +0200

    [FLINK-13769][Coordination] Close RM connection in TaskExecutor.onStop and 
do not reconnect
    
    This prevents JM from acquiring slots which belong to the stopped TM.
---
 .../apache/flink/runtime/taskexecutor/TaskExecutor.java    | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index c8bcaf9..b1238ec 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -340,11 +340,10 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
                Throwable jobManagerDisconnectThrowable = null;
 
-               if (resourceManagerConnection != null) {
-                       resourceManagerConnection.close();
-               }
-
                FlinkException cause = new FlinkException("The TaskExecutor is 
shutting down.");
+
+               closeResourceManagerConnection(cause);
+
                for (JobManagerConnection jobManagerConnection : 
jobManagerConnections.values()) {
                        try {
                                
disassociateFromJobManager(jobManagerConnection, cause);
@@ -958,7 +957,9 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
        @Override
        public void disconnectResourceManager(Exception cause) {
-               reconnectToResourceManager(cause);
+               if (isRunning()) {
+                       reconnectToResourceManager(cause);
+               }
        }
 
        // 
======================================================================
@@ -986,6 +987,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
        private void reconnectToResourceManager(Exception cause) {
                closeResourceManagerConnection(cause);
+               startRegistrationTimeout();
                tryConnectToResourceManager();
        }
 
@@ -1098,8 +1100,6 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                        resourceManagerConnection.close();
                        resourceManagerConnection = null;
                }
-
-               startRegistrationTimeout();
        }
 
        private void startRegistrationTimeout() {

Reply via email to