[FLINK-4351] [cluster management] JobManager handle TaskManager's registration


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a19cae3b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a19cae3b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a19cae3b

Branch: refs/heads/flip-6
Commit: a19cae3b07963776c07c0aae7bee806004f59429
Parents: e91b82d
Author: Kurt Young <ykt...@gmail.com>
Authored: Sun Oct 16 23:00:57 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Oct 16 22:14:41 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobMaster.java      | 75 +++++++++++++++++---
 .../runtime/jobmaster/JobMasterGateway.java     | 15 ++--
 .../runtime/taskexecutor/JobLeaderService.java  | 55 ++++++--------
 .../runtime/taskexecutor/TaskExecutor.java      |  2 +-
 .../taskexecutor/TaskManagerServices.java       |  2 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 10 ++-
 6 files changed, 102 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 05c20d3..8cb9946 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
@@ -36,7 +37,9 @@ import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.Execution;
@@ -81,7 +84,9 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.StartStoppable;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedValue;
@@ -89,8 +94,10 @@ import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -122,6 +129,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
        /** Configuration of the JobManager */
        private final Configuration configuration;
 
+       private final Time rpcTimeout;
+
        /** Service to contend for and retrieve the leadership of JM and RM */
        private final HighAvailabilityServices highAvailabilityServices;
 
@@ -152,7 +161,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
        private volatile UUID leaderSessionID;
 
-       // --------- resource manager --------
+       // --------- ResourceManager --------
 
        /** Leader retriever service used to locate ResourceManager's address */
        private LeaderRetrievalService resourceManagerLeaderRetriever;
@@ -160,6 +169,9 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
        /** Connection with ResourceManager, null if not located address yet or 
we close it initiative */
        private ResourceManagerConnection resourceManagerConnection;
 
+       // --------- TaskManagers --------
+
+       private final Map<ResourceID, Tuple2<TaskManagerLocation, 
TaskExecutorGateway>> registeredTaskManagers;
 
        // 
------------------------------------------------------------------------
 
@@ -181,6 +193,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
                this.jobGraph = checkNotNull(jobGraph);
                this.configuration = checkNotNull(configuration);
+               this.rpcTimeout = rpcAskTimeout;
                this.highAvailabilityServices = 
checkNotNull(highAvailabilityService);
                this.libraryCacheManager = checkNotNull(libraryCacheManager);
                this.executionContext = checkNotNull(executorService);
@@ -243,6 +256,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
                this.slotPool = new SlotPool(executorService);
                this.allocationTimeout = Time.of(5, TimeUnit.SECONDS);
+
+               this.registeredTaskManagers = new HashMap<>(4);
        }
 
        
//----------------------------------------------------------------------------------------------
@@ -379,8 +394,10 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
                }
                closeResourceManagerConnection();
 
-               // TODO: disconnect from all registered task managers
-
+               for (ResourceID taskManagerId : 
registeredTaskManagers.keySet()) {
+                       slotPool.releaseResource(taskManagerId);
+               }
+               registeredTaskManagers.clear();
        }
 
        
//----------------------------------------------------------------------------------------------
@@ -662,11 +679,53 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
        }
 
        @RpcMethod
-       public RegistrationResponse registerTaskManager(
-               final String taskManagerAddress,
-               final ResourceID taskManagerProcessId,
-               final UUID leaderId) {
-               throw new UnsupportedOperationException("Has to be 
implemented.");
+       public Future<RegistrationResponse> registerTaskManager(
+                       final TaskManagerLocation taskManagerLocation,
+                       final UUID leaderId) throws Exception
+       {
+               if (!JobMaster.this.leaderSessionID.equals(leaderId)) {
+                       log.warn("Discard registration from TaskExecutor {} at 
({}) because the expected " +
+                                                       "leader session ID {} 
did not equal the received leader session ID {}.",
+                                       taskManagerLocation.getResourceID(), 
taskManagerLocation.addressString(),
+                                       JobMaster.this.leaderSessionID, 
leaderId);
+                       throw new Exception("Leader id not match, expected: " + 
JobMaster.this.leaderSessionID
+                                       + ", actual: " + leaderId);
+               }
+
+               final ResourceID taskManagerId = 
taskManagerLocation.getResourceID();
+
+               if (registeredTaskManagers.containsKey(taskManagerId)) {
+                       final RegistrationResponse response = new 
JMTMRegistrationSuccess(
+                                       taskManagerId, 
libraryCacheManager.getBlobServerPort());
+                       return FlinkCompletableFuture.completed(response);
+               } else {
+                       return getRpcService().execute(new 
Callable<TaskExecutorGateway>() {
+                               @Override
+                               public TaskExecutorGateway call() throws 
Exception {
+                                       return 
getRpcService().connect(taskManagerLocation.addressString(), 
TaskExecutorGateway.class)
+                                                       
.get(rpcTimeout.getSize(), rpcTimeout.getUnit());
+                               }
+                       }).handleAsync(new BiFunction<TaskExecutorGateway, 
Throwable, RegistrationResponse>() {
+                               @Override
+                               public RegistrationResponse 
apply(TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
+                                       if (throwable != null) {
+                                               return new 
RegistrationResponse.Decline(throwable.getMessage());
+                                       }
+
+                                       if 
(!JobMaster.this.leaderSessionID.equals(leaderId)) {
+                                               log.warn("Discard registration 
from TaskExecutor {} at ({}) because the expected " +
+                                                                               
"leader session ID {} did not equal the received leader session ID {}.",
+                                                               
taskManagerLocation.getResourceID(), taskManagerLocation.addressString(),
+                                                               
JobMaster.this.leaderSessionID, leaderId);
+                                               return new 
RegistrationResponse.Decline("Invalid leader session id");
+                                       }
+
+                                       
slotPool.registerResource(taskManagerId);
+                                       
registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, 
taskExecutorGateway));
+                                       return new 
JMTMRegistrationSuccess(taskManagerId, libraryCacheManager.getBlobServerPort());
+                               }
+                       }, getMainThreadExecutor());
+               }
        }
 
        
//----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 0f155a4..4c85839 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import java.util.UUID;
 
@@ -195,15 +196,13 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
        /**
         * Register the task manager at the job manager.
         *
-        * @param taskManagerAddress address of the task manager
-        * @param taskManagerProcessId identifying the task manager
-        * @param leaderId identifying the job leader
-        * @param timeout for the rpc call
+        * @param taskManagerLocation location of the task manager
+        * @param leaderId            identifying the job leader
+        * @param timeout             for the rpc call
         * @return Future registration response indicating whether the 
registration was successful or not
         */
        Future<RegistrationResponse> registerTaskManager(
-               final String taskManagerAddress,
-               final ResourceID taskManagerProcessId,
-               final UUID leaderId,
-               @RpcTimeout final Time timeout);
+                       final TaskManagerLocation taskManagerLocation,
+                       final UUID leaderId,
+                       @RpcTimeout final Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index e7f52e2..14d36ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
@@ -32,6 +31,7 @@ import org.apache.flink.runtime.registration.RegisteredRpcConnection;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.registration.RetryingRegistration;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,8 +53,8 @@ public class JobLeaderService {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(JobLeaderService.class);
 
-       /** Process id of the owning process */
-       private final ResourceID ownerProcessId;
+       /** Self's location, used for the job manager connection */
+       private final TaskManagerLocation ownLocation;
 
        /** The leader retrieval service and listener for each registered job */
        private final Map<JobID, Tuple2<LeaderRetrievalService, 
JobLeaderService.JobManagerLeaderListener>> jobLeaderServices;
@@ -62,9 +62,6 @@ public class JobLeaderService {
        /** Internal state of the service */
        private volatile JobLeaderService.State state;
 
-       /** Address of the owner of this service. This address is used for the 
job manager connection */
-       private String ownerAddress;
-
        /** Rpc service to use for establishing connections */
        private RpcService rpcService;
 
@@ -74,14 +71,13 @@ public class JobLeaderService {
        /** Job leader listener listening for job leader changes */
        private JobLeaderListener jobLeaderListener;
 
-       public JobLeaderService(ResourceID ownerProcessId) {
-               this.ownerProcessId = 
Preconditions.checkNotNull(ownerProcessId);
+       public JobLeaderService(TaskManagerLocation location) {
+               this.ownLocation = Preconditions.checkNotNull(location);
 
                jobLeaderServices = new HashMap<>(4);
 
                state = JobLeaderService.State.CREATED;
 
-               ownerAddress = null;
                rpcService = null;
                highAvailabilityServices = null;
                jobLeaderListener = null;
@@ -94,13 +90,11 @@ public class JobLeaderService {
        /**
         * Start the job leader service with the given services.
         *
-        * @param initialOwnerAddress to be used for establishing connections 
(source address)
         * @param initialRpcService to be used to create rpc connections
         * @param initialHighAvailabilityServices to create leader retrieval 
services for the different jobs
         * @param initialJobLeaderListener listening for job leader changes
         */
        public void start(
-               final String initialOwnerAddress,
                final RpcService initialRpcService,
                final HighAvailabilityServices initialHighAvailabilityServices,
                final JobLeaderListener initialJobLeaderListener) {
@@ -110,7 +104,6 @@ public class JobLeaderService {
                } else {
                        LOG.info("Start job leader service.");
 
-                       this.ownerAddress = 
Preconditions.checkNotNull(initialOwnerAddress);
                        this.rpcService = 
Preconditions.checkNotNull(initialRpcService);
                        this.highAvailabilityServices = 
Preconditions.checkNotNull(initialHighAvailabilityServices);
                        this.jobLeaderListener = 
Preconditions.checkNotNull(initialJobLeaderListener);
@@ -311,14 +304,13 @@ public class JobLeaderService {
                        @Override
                        protected RetryingRegistration<JobMasterGateway, 
JMTMRegistrationSuccess> generateRegistration() {
                                return new 
JobLeaderService.JobManagerRetryingRegistration(
-                                       LOG,
-                                       rpcService,
-                                       "JobManager",
-                                       JobMasterGateway.class,
-                                       getTargetAddress(),
-                                       getTargetLeaderId(),
-                                       ownerAddress,
-                                       ownerProcessId);
+                                               LOG,
+                                               rpcService,
+                                               "JobManager",
+                                               JobMasterGateway.class,
+                                               getTargetAddress(),
+                                               getTargetLeaderId(),
+                                               ownLocation);
                        }
 
                        @Override
@@ -349,10 +341,11 @@ public class JobLeaderService {
        /**
         * Retrying registration for the job manager <--> task manager 
connection.
         */
-       private static final class JobManagerRetryingRegistration extends 
RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess> {
+       private static final class JobManagerRetryingRegistration
+                       extends RetryingRegistration<JobMasterGateway, 
JMTMRegistrationSuccess>
+       {
 
-               private final String taskManagerAddress;
-               private final ResourceID taskManagerProcessId;
+               private final TaskManagerLocation taskManagerLocation;
 
                JobManagerRetryingRegistration(
                        Logger log,
@@ -361,22 +354,18 @@ public class JobLeaderService {
                        Class<JobMasterGateway> targetType,
                        String targetAddress,
                        UUID leaderId,
-                       String taskManagerAddress,
-                       ResourceID taskManagerProcessId) {
+                       TaskManagerLocation taskManagerLocation) {
 
                        super(log, rpcService, targetName, targetType, 
targetAddress, leaderId);
 
-                       this.taskManagerAddress = 
Preconditions.checkNotNull(taskManagerAddress);
-                       this.taskManagerProcessId = 
Preconditions.checkNotNull(taskManagerProcessId);
+                       this.taskManagerLocation = 
Preconditions.checkNotNull(taskManagerLocation);
                }
 
                @Override
-               protected Future<RegistrationResponse> 
invokeRegistration(JobMasterGateway gateway, UUID leaderId, long timeoutMillis) 
throws Exception {
-                       return gateway.registerTaskManager(
-                               taskManagerAddress,
-                               taskManagerProcessId,
-                               leaderId,
-                               Time.milliseconds(timeoutMillis));
+               protected Future<RegistrationResponse> invokeRegistration(
+                               JobMasterGateway gateway, UUID leaderId, long 
timeoutMillis) throws Exception
+               {
+                       return gateway.registerTaskManager(taskManagerLocation, 
leaderId, Time.milliseconds(timeoutMillis));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 3e3a544..1201281 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -206,7 +206,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
                taskSlotTable.start(new SlotActionsImpl());
 
                // start the job leader service
-               jobLeaderService.start(getAddress(), getRpcService(), 
haServices, new JobLeaderListenerImpl());
+               jobLeaderService.start(getRpcService(), haServices, new 
JobLeaderListenerImpl());
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 7575ba3..e8de1b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -207,7 +207,7 @@ public class TaskManagerServices {
 
                final JobManagerTable jobManagerTable = new JobManagerTable();
 
-               final JobLeaderService jobLeaderService = new 
JobLeaderService(resourceID);
+               final JobLeaderService jobLeaderService = new 
JobLeaderService(taskManagerLocation);
                
                return new TaskManagerServices(
                        taskManagerLocation,

http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 23c6833..2220f12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -350,7 +350,7 @@ public class TaskExecutorTest extends TestLogger {
                final TimerService<AllocationID> timerService = 
mock(TimerService.class);
                final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Arrays.asList(mock(ResourceProfile.class)), timerService);
                final JobManagerTable jobManagerTable = new JobManagerTable();
-               final JobLeaderService jobLeaderService = new 
JobLeaderService(resourceId);
+               final JobLeaderService jobLeaderService = new 
JobLeaderService(taskManagerLocation);
                final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
 
                final TestingLeaderRetrievalService 
resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService();
@@ -379,8 +379,7 @@ public class TaskExecutorTest extends TestLogger {
                final JobMasterGateway jobMasterGateway = 
mock(JobMasterGateway.class);
 
                when(jobMasterGateway.registerTaskManager(
-                       any(String.class),
-                       eq(resourceId),
+                       eq(taskManagerLocation),
                        eq(jobManagerLeaderId),
                        any(Time.class)
                
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
JMTMRegistrationSuccess(jmResourceId, blobPort)));
@@ -451,7 +450,7 @@ public class TaskExecutorTest extends TestLogger {
                final TimerService<AllocationID> timerService = 
mock(TimerService.class);
                final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Arrays.asList(mock(ResourceProfile.class), 
mock(ResourceProfile.class)), timerService);
                final JobManagerTable jobManagerTable = new JobManagerTable();
-               final JobLeaderService jobLeaderService = new 
JobLeaderService(resourceId);
+               final JobLeaderService jobLeaderService = new 
JobLeaderService(taskManagerLocation);
                final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
 
                final String resourceManagerAddress = "rm";
@@ -484,8 +483,7 @@ public class TaskExecutorTest extends TestLogger {
                final JobMasterGateway jobMasterGateway = 
mock(JobMasterGateway.class);
 
                when(jobMasterGateway.registerTaskManager(
-                       any(String.class),
-                       eq(resourceId),
+                       eq(taskManagerLocation),
                        eq(jobManagerLeaderId),
                        any(Time.class)
                
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
JMTMRegistrationSuccess(jmResourceId, blobPort)));

Reply via email to