[FLINK-4535] [cluster management] ResourceManager processes the registration from the TaskExecutor


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a31cf004
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a31cf004
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a31cf004

Branch: refs/heads/flip-6
Commit: a31cf00438e018d05c5890af142d00b4a8b51b78
Parents: b905fdb
Author: beyond1920 <beyond1...@126.com>
Authored: Thu Sep 1 11:14:00 2016 +0800
Committer: Maximilian Michels <m...@apache.org>
Committed: Wed Sep 21 21:57:04 2016 +0200

----------------------------------------------------------------------
 .../resourcemanager/RegistrationResponse.java   |  36 ---
 .../resourcemanager/ResourceManager.java        | 288 ++++++++-----------
 .../resourcemanager/ResourceManagerGateway.java |  45 +--
 .../TaskExecutorRegistration.java               |  51 ++++
 .../exceptions/LeaderSessionIDException.java    |   1 +
 .../resourcemanager/ResourceManagerTest.java    | 119 ++++----
 6 files changed, 241 insertions(+), 299 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a31cf004/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
deleted file mode 100644
index 796e634..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import java.io.Serializable;
-
-public class RegistrationResponse implements Serializable {
-       private static final long serialVersionUID = -2379003255993119993L;
-
-       private final boolean isSuccess;
-
-       public RegistrationResponse(boolean isSuccess) {
-               this.isSuccess = isSuccess;
-       }
-
-       public boolean isSuccess() {
-               return isSuccess;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a31cf004/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index aae4874..15692b6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -18,41 +18,29 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import akka.dispatch.Futures;
+import akka.dispatch.Mapper;
+
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.registration.RegistrationResponse;
-
-import org.apache.flink.runtime.concurrent.Future;
-
-import org.apache.flink.runtime.util.LeaderConnectionInfo;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.FiniteDuration;
+import scala.concurrent.Future;
 
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -62,35 +50,25 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *
  * It offers the following methods as part of its rpc interface to interact 
with the him remotely:
  * <ul>
- *     <li>{@link #registerJobMaster(UUID, String, JobID)} registers a {@link 
JobMaster} at the resource manager</li>
+ *     <li>{@link #registerJobMaster(JobMasterRegistration)} registers a 
{@link JobMaster} at the resource manager</li>
  *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource 
manager</li>
  * </ul>
  */
-public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> 
implements LeaderContender {
-
-       private final Logger LOG = LoggerFactory.getLogger(getClass());
+public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
+       private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
 
-       private final Map<JobID, JobMasterGateway> jobMasterGateways;
-
-       private final Set<LeaderRetrievalListener> 
jobMasterLeaderRetrievalListeners;
+       /** ResourceID and TaskExecutorRegistration mapping relationship of 
registered taskExecutors */
+       private final Map<ResourceID, TaskExecutorRegistration>  
startedTaskExecutorGateways;
 
        private final HighAvailabilityServices highAvailabilityServices;
+       private LeaderElectionService leaderElectionService = null;
+       private UUID leaderSessionID = null;
 
-       private LeaderElectionService leaderElectionService;
-
-       private final SlotManager slotManager;
-
-       private UUID leaderSessionID;
-
-       public ResourceManager(
-                       RpcService rpcService,
-                       HighAvailabilityServices highAvailabilityServices,
-                       SlotManager slotManager) {
+       public ResourceManager(RpcService rpcService, HighAvailabilityServices 
highAvailabilityServices) {
                super(rpcService);
                this.highAvailabilityServices = 
checkNotNull(highAvailabilityServices);
-               this.jobMasterGateways = new HashMap<>();
-               this.slotManager = slotManager;
-               this.jobMasterLeaderRetrievalListeners = new HashSet<>();
+               this.jobMasterGateways = new HashMap<>(16);
+               this.startedTaskExecutorGateways = new HashMap<>(16);
        }
 
        @Override
@@ -99,7 +77,7 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> impleme
                try {
                        super.start();
                        leaderElectionService = 
highAvailabilityServices.getResourceManagerLeaderElectionService();
-                       leaderElectionService.start(this);
+                       leaderElectionService.start(new 
ResourceManagerLeaderContender());
                } catch (Throwable e) {
                        log.error("A fatal error happened when starting the 
ResourceManager", e);
                        throw new RuntimeException("A fatal error happened when 
starting the ResourceManager", e);
@@ -110,11 +88,8 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> impleme
        public void shutDown() {
                try {
                        leaderElectionService.stop();
-                       for(JobID jobID : jobMasterGateways.keySet()) {
-                               
highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop();
-                       }
                        super.shutDown();
-               } catch (Throwable e) {
+               } catch(Throwable e) {
                        log.error("A fatal error happened when shutdown the 
ResourceManager", e);
                        throw new RuntimeException("A fatal error happened when 
shutdown the ResourceManager", e);
                }
@@ -127,78 +102,34 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> impleme
         */
        @VisibleForTesting
        UUID getLeaderSessionID() {
-               return this.leaderSessionID;
+               return leaderSessionID;
        }
 
        /**
         * Register a {@link JobMaster} at the resource manager.
         *
-        * @param resourceManagerLeaderId The fencing token for the 
ResourceManager leader
-        * @param jobMasterAddress        The address of the JobMaster that 
registers
-        * @param jobID                   The Job ID of the JobMaster that 
registers
+        * @param jobMasterRegistration Job master registration information
         * @return Future registration response
         */
        @RpcMethod
-       public Future<RegistrationResponse> registerJobMaster(
-               final UUID resourceManagerLeaderId, final UUID 
jobMasterLeaderId,
-               final String jobMasterAddress, final JobID jobID) {
-
-               checkNotNull(resourceManagerLeaderId);
-               checkNotNull(jobMasterAddress);
-               checkNotNull(jobID);
+       public Future<RegistrationResponse> 
registerJobMaster(JobMasterRegistration jobMasterRegistration) {
+               Future<JobMasterGateway> jobMasterFuture = 
getRpcService().connect(jobMasterRegistration.getAddress(), 
JobMasterGateway.class);
 
-               // TODO mxm The leader retrieval needs to be split up in an 
async part which runs outside the main execution thread
-               // The state updates should be performed inside the main thread
-
-               final FlinkCompletableFuture<RegistrationResponse> future = new 
FlinkCompletableFuture<>();
-
-               if(!leaderSessionID.equals(resourceManagerLeaderId)) {
-                       log.warn("Discard registration from JobMaster {} at 
({}) because the expected leader session ID {}" +
-                                       " did not equal the received leader 
session ID  {}",
-                               jobID, jobMasterAddress, leaderSessionID, 
resourceManagerLeaderId);
-                       future.complete(new 
RegistrationResponse.Decline("Invalid leader session id"));
-                       return future;
-               }
-
-               final LeaderConnectionInfo jobMasterLeaderInfo;
-               try {
-                       jobMasterLeaderInfo = 
LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
-                               
highAvailabilityServices.getJobMasterLeaderRetriever(jobID), new 
FiniteDuration(5, TimeUnit.SECONDS));
-               } catch (Exception e) {
-                       LOG.warn("Failed to start JobMasterLeaderRetriever for 
JobID {}", jobID);
-                       future.complete(new 
RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"));
-                       return future;
-               }
-
-               if 
(!jobMasterLeaderId.equals(jobMasterLeaderInfo.getLeaderSessionID())) {
-                       LOG.info("Declining registration request from 
non-leading JobManager {}", jobMasterAddress);
-                       future.complete(new 
RegistrationResponse.Decline("JobManager is not leading"));
-                       return future;
-               }
-
-               Future<JobMasterGateway> jobMasterGatewayFuture =
-                       getRpcService().connect(jobMasterAddress, 
JobMasterGateway.class);
-
-               return jobMasterGatewayFuture.thenApplyAsync(new 
ApplyFunction<JobMasterGateway, RegistrationResponse>() {
+               return jobMasterFuture.map(new Mapper<JobMasterGateway, 
RegistrationResponse>() {
                        @Override
-                       public RegistrationResponse apply(JobMasterGateway 
jobMasterGateway) {
-
-                               final JobMasterLeaderListener 
jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
-                               try {
-                                       LeaderRetrievalService 
jobMasterLeaderRetriever = 
highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
-                                       
jobMasterLeaderRetriever.start(jobMasterLeaderListener);
-                               } catch (Exception e) {
-                                       LOG.warn("Failed to start 
JobMasterLeaderRetriever for JobID {}", jobID);
-                                       return new 
RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
-                               }
-                               
jobMasterLeaderRetrievalListeners.add(jobMasterLeaderListener);
-                               final JobMasterGateway existingGateway = 
jobMasterGateways.put(jobID, jobMasterGateway);
-                               if (existingGateway != null) {
-                                       log.info("Replacing gateway for 
registered JobID {}.", jobID);
+                       public RegistrationResponse apply(final 
JobMasterGateway jobMasterGateway) {
+                               InstanceID instanceID;
+
+                               if 
(jobMasterGateways.containsKey(jobMasterGateway)) {
+                                       instanceID = 
jobMasterGateways.get(jobMasterGateway);
+                               } else {
+                                       instanceID = new InstanceID();
+                                       jobMasterGateways.put(jobMasterGateway, 
instanceID);
                                }
-                               return new JobMasterRegistrationSuccess(5000);
+
+                               return new 
TaskExecutorRegistrationSuccess(instanceID, 5000);
                        }
-               }, getMainThreadExecutor());
+               }, getMainThreadExecutionContext());
        }
 
        /**
@@ -208,104 +139,111 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> impleme
         * @return Slot assignment
         */
        @RpcMethod
-       public SlotRequestReply requestSlot(SlotRequest slotRequest) {
-               final JobID jobId = slotRequest.getJobId();
-               final JobMasterGateway jobMasterGateway = 
jobMasterGateways.get(jobId);
-
-               if (jobMasterGateway != null) {
-                       return slotManager.requestSlot(slotRequest);
-               } else {
-                       LOG.info("Ignoring slot request for unknown JobMaster 
with JobID {}", jobId);
-                       return new 
SlotRequestRejected(slotRequest.getAllocationId());
-               }
+       public SlotAssignment requestSlot(SlotRequest slotRequest) {
+               System.out.println("SlotRequest: " + slotRequest);
+               return new SlotAssignment();
        }
 
 
        /**
-        * @param resourceManagerLeaderId The fencing token for the 
ResourceManager leader
-        * @param taskExecutorAddress     The address of the TaskExecutor that 
registers
-        * @param resourceID              The resource ID of the TaskExecutor 
that registers
+        * Register a {@link 
org.apache.flink.runtime.taskexecutor.TaskExecutor} at the resource manager
+        *
+        * @param resourceManagerLeaderId  The fencing token for the 
ResourceManager leader
+        * @param taskExecutorAddress      The address of the TaskExecutor that 
registers
+        * @param resourceID               The resource ID of the TaskExecutor 
that registers
+        *
         * @return The response by the ResourceManager.
         */
        @RpcMethod
-       public RegistrationResponse registerTaskExecutor(
-               UUID resourceManagerLeaderId,
-               String taskExecutorAddress,
-               ResourceID resourceID) {
+       public Future<RegistrationResponse> registerTaskExecutor(
+               final UUID resourceManagerLeaderId,
+               final String taskExecutorAddress,
+               final ResourceID resourceID) {
 
-               return new TaskExecutorRegistrationSuccess(new InstanceID(), 
5000);
-       }
+               if(!leaderSessionID.equals(resourceManagerLeaderId)) {
+                       log.warn("Discard registration from TaskExecutor {} at 
({}) because the expected leader session ID {} did not equal the received 
leader session ID  {}",
+                               resourceID, taskExecutorAddress, 
leaderSessionID, resourceManagerLeaderId);
+                       return Futures.failed(new 
LeaderSessionIDException(leaderSessionID, resourceManagerLeaderId));
+               }
 
+               Future<TaskExecutorGateway> taskExecutorGatewayFuture = 
getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
 
-       // 
------------------------------------------------------------------------
-       //  Leader Contender
-       // 
------------------------------------------------------------------------
+               return taskExecutorGatewayFuture.map(new 
Mapper<TaskExecutorGateway, RegistrationResponse>() {
 
-       /**
-        * Callback method when current resourceManager is granted leadership
-        *
-        * @param leaderSessionID unique leadershipID
-        */
-       @Override
-       public void grantLeadership(final UUID leaderSessionID) {
-               runAsync(new Runnable() {
                        @Override
-                       public void run() {
-                               log.info("ResourceManager {} was granted 
leadership with leader session ID {}", getAddress(), leaderSessionID);
-                               // confirming the leader session ID might be 
blocking,
-                               
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
-                               // notify SlotManager
-                               slotManager.setLeaderUUID(leaderSessionID);
-                               ResourceManager.this.leaderSessionID = 
leaderSessionID;
-                       }
-               });
-       }
+                       public RegistrationResponse apply(final 
TaskExecutorGateway taskExecutorGateway) {
+                               InstanceID instanceID = null;
+                               TaskExecutorRegistration 
taskExecutorRegistration = startedTaskExecutorGateways.get(resourceID);
+                               if(taskExecutorRegistration != null) {
+                                       log.warn("Receive a duplicate 
registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress);
+                                       instanceID = 
taskExecutorRegistration.getInstanceID();
+                               } else {
+                                       instanceID = new InstanceID();
+                                       
startedTaskExecutorGateways.put(resourceID, new 
TaskExecutorRegistration(taskExecutorGateway, instanceID));
+                               }
 
-       /**
-        * Callback method when current resourceManager lose leadership.
-        */
-       @Override
-       public void revokeLeadership() {
-               runAsync(new Runnable() {
-                       @Override
-                       public void run() {
-                               log.info("ResourceManager {} was revoked 
leadership.", getAddress());
-                               jobMasterGateways.clear();
-                               slotManager.clearState();
-                               leaderSessionID = null;
+                               return new 
TaskExecutorRegistrationSuccess(instanceID, 5000);
                        }
-               });
+               }, getMainThreadExecutionContext());
        }
 
-       /**
-        * Handles error occurring in the leader election service
-        *
-        * @param exception Exception being thrown in the leader election 
service
-        */
-       @Override
-       public void handleError(final Exception exception) {
-               log.error("ResourceManager received an error from the 
LeaderElectionService.", exception);
-               // terminate ResourceManager in case of an error
-               shutDown();
-       }
 
-       private static class JobMasterLeaderListener implements 
LeaderRetrievalListener {
+       private class ResourceManagerLeaderContender implements LeaderContender 
{
 
-               private final JobID jobID;
-               private UUID leaderID;
+               /**
+                * Callback method when current resourceManager is granted 
leadership
+                *
+                * @param leaderSessionID unique leadershipID
+                */
+               @Override
+               public void grantLeadership(final UUID leaderSessionID) {
+                       runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       log.info("ResourceManager {} was 
granted leadership with leader session ID {}", getAddress(), leaderSessionID);
+                                       ResourceManager.this.leaderSessionID = 
leaderSessionID;
+                                       // confirming the leader session ID 
might be blocking,
+                                       
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
+                               }
+                       });
+               }
 
-               private JobMasterLeaderListener(JobID jobID) {
-                       this.jobID = jobID;
+               /**
+                * Callback method when current resourceManager lose leadership.
+                */
+               @Override
+               public void revokeLeadership() {
+                       runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       log.info("ResourceManager {} was 
revoked leadership.", getAddress());
+                                       jobMasterGateways.clear();
+                                       startedTaskExecutorGateways.clear();
+                                       leaderSessionID = null;
+                               }
+                       });
                }
 
                @Override
-               public void notifyLeaderAddress(final String leaderAddress, 
final UUID leaderSessionID) {
-                       this.leaderID = leaderSessionID;
+               public String getAddress() {
+                       return ResourceManager.this.getAddress();
                }
 
+               /**
+                * Handles error occurring in the leader election service
+                *
+                * @param exception Exception being thrown in the leader 
election service
+                */
                @Override
                public void handleError(final Exception exception) {
-                       // TODO
+                       runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       log.error("ResourceManager received an 
error from the LeaderElectionService.", exception);
+                                       // terminate ResourceManager in case of 
an error
+                                       shutDown();
+                               }
+                       });
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a31cf004/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 1ee11a1..30a096f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.jobmaster.JobMaster;
+
 import org.apache.flink.runtime.registration.RegistrationResponse;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.util.UUID;
 
@@ -36,18 +37,21 @@ public interface ResourceManagerGateway extends RpcGateway {
        /**
         * Register a {@link JobMaster} at the resource manager.
         *
-        * @param resourceManagerLeaderId The fencing token for the 
ResourceManager leader
-        * @param jobMasterAddress        The address of the JobMaster that 
registers
-        * @param jobID                   The Job ID of the JobMaster that 
registers
-        * @param timeout                 Timeout for the future to complete
+        * @param jobMasterRegistration Job master registration information
+        * @param timeout Timeout for the future to complete
         * @return Future registration response
         */
        Future<RegistrationResponse> registerJobMaster(
-               UUID resourceManagerLeaderId,
-               String jobMasterAddress,
-               JobID jobID,
-                               @RpcTimeout Time timeout);
+               JobMasterRegistration jobMasterRegistration,
+               @RpcTimeout FiniteDuration timeout);
 
+       /**
+        * Register a {@link JobMaster} at the resource manager.
+        *
+        * @param jobMasterRegistration Job master registration information
+        * @return Future registration response
+        */
+       Future<RegistrationResponse> registerJobMaster(JobMasterRegistration 
jobMasterRegistration);
 
        /**
         * Requests a slot from the resource manager.
@@ -55,18 +59,21 @@ public interface ResourceManagerGateway extends RpcGateway {
         * @param slotRequest Slot request
         * @return Future slot assignment
         */
-       Future<SlotRequestReply> requestSlot(SlotRequest slotRequest);
+       Future<SlotAssignment> requestSlot(SlotRequest slotRequest);
 
        /**
-        * @param resourceManagerLeaderId The fencing token for the 
ResourceManager leader
-        * @param taskExecutorAddress     The address of the TaskExecutor that 
registers
-        * @param resourceID              The resource ID of the TaskExecutor 
that registers
-        * @param timeout                 The timeout for the response.
+        * Register a {@link 
org.apache.flink.runtime.taskexecutor.TaskExecutor} at the resource manager.
+        *
+        * @param resourceManagerLeaderId  The fencing token for the 
ResourceManager leader
+        * @param taskExecutorAddress      The address of the TaskExecutor that 
registers
+        * @param resourceID               The resource ID of the TaskExecutor 
that registers
+        * @param timeout                  The timeout for the response.
+        *
         * @return The future to the response by the ResourceManager.
         */
        Future<RegistrationResponse> registerTaskExecutor(
-                       UUID resourceManagerLeaderId,
-                       String taskExecutorAddress,
-                       ResourceID resourceID,
-                       @RpcTimeout Time timeout);
+               UUID resourceManagerLeaderId,
+               String taskExecutorAddress,
+               ResourceID resourceID,
+               @RpcTimeout FiniteDuration timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a31cf004/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
new file mode 100644
index 0000000..bd78a47
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Groups the {@link TaskExecutorGateway} and the {@link InstanceID} of a registered task executor.
+ *
+ * <p>Both components are assigned once at construction time and never change afterwards.
+ *
+ * <p>NOTE(review): this class is {@link Serializable} but holds a non-transient
+ * {@link TaskExecutorGateway} reference; confirm the gateway is serializable before
+ * shipping instances over the wire.
+ */
+public class TaskExecutorRegistration implements Serializable {
+
+       private static final long serialVersionUID = -2062957799469434614L;
+
+       /** Gateway of the registered task executor. */
+       private final TaskExecutorGateway taskExecutorGateway;
+
+       /** Instance ID assigned to the task executor when it registered. */
+       private final InstanceID instanceID;
+
+       /**
+        * Creates a new registration record.
+        *
+        * @param taskExecutorGateway gateway of the registered task executor, must not be null
+        * @param instanceID          instance ID assigned to the task executor, must not be null
+        * @throws NullPointerException if either argument is null
+        */
+       public TaskExecutorRegistration(TaskExecutorGateway taskExecutorGateway,
+               InstanceID instanceID) {
+               this.taskExecutorGateway = checkNotNull(taskExecutorGateway);
+               this.instanceID = checkNotNull(instanceID);
+       }
+
+       /**
+        * Returns the instance ID assigned to the registered task executor.
+        *
+        * @return instance ID of the task executor, never null
+        */
+       public InstanceID getInstanceID() {
+               return instanceID;
+       }
+
+       /**
+        * Returns the gateway of the registered task executor.
+        *
+        * @return gateway of the task executor, never null
+        */
+       public TaskExecutorGateway getTaskExecutorGateway() {
+               return taskExecutorGateway;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a31cf004/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java
index cd14a0d..d3ba9a9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rpc.exceptions;
 
 import java.util.UUID;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/a31cf004/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index 4d04001..b75d9b8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -18,15 +18,14 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
-import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -37,8 +36,9 @@ import scala.concurrent.duration.FiniteDuration;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
 
 public class ResourceManagerTest {
 
@@ -55,105 +55,86 @@ public class ResourceManagerTest {
        }
 
        /**
-        * Test receive normal registration from job master and receive 
duplicate registration from job master
+        * Test receive normal registration from task executor and receive 
duplicate registration from task executor
         *
         * @throws Exception
         */
        @Test
-       public void testRegisterJobMaster() throws Exception {
-               String jobMasterAddress = "/jobMasterAddress1";
-               JobID jobID = mockJobMaster(jobMasterAddress);
-               TestingLeaderElectionService 
resourceManagerLeaderElectionService = new TestingLeaderElectionService();
-               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService();
-               final ResourceManager resourceManager = 
createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, 
jobMasterLeaderRetrievalService);
-               final UUID leaderSessionId = 
grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+       public void testRegisterTaskExecutor() throws Exception {
+               String taskExecutorAddress = "/taskExecutor1";
+               ResourceID taskExecutorResourceID = 
mockTaskExecutor(taskExecutorAddress);
+               TestingLeaderElectionService leaderElectionService = new 
TestingLeaderElectionService();
+               final ResourceManager resourceManager = 
createAndStartResourceManager(leaderElectionService);
+               final UUID leaderSessionId = 
grantResourceManagerLeadership(leaderElectionService);
 
                // test response successful
-               Future<RegistrationResponse> successfulFuture = 
resourceManager.registerJobMaster(leaderSessionId, jobMasterAddress, jobID);
+               Future<RegistrationResponse> successfulFuture = 
resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, 
taskExecutorResourceID);
                RegistrationResponse response = Await.result(successfulFuture, 
new FiniteDuration(0, TimeUnit.SECONDS));
-               assertTrue(response instanceof JobMasterRegistrationSuccess);
+               assertTrue(response instanceof TaskExecutorRegistrationSuccess);
+
+               // test response successful with previous instanceID when 
receive duplicate registration from taskExecutor
+               Future<RegistrationResponse> duplicateFuture = 
resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, 
taskExecutorResourceID);
+               RegistrationResponse duplicateResponse = 
Await.result(duplicateFuture, new FiniteDuration(0, TimeUnit.SECONDS));
+               assertTrue(duplicateResponse instanceof 
TaskExecutorRegistrationSuccess);
+               assertEquals(((TaskExecutorRegistrationSuccess) 
response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) 
duplicateResponse).getRegistrationId());
        }
 
        /**
-        * Test receive registration with unmatched leadershipId from job master
+        * Test receive registration with unmatched leadershipId from task 
executor
         *
         * @throws Exception
         */
        @Test(expected = LeaderSessionIDException.class)
-       public void testRegisterJobMasterWithUnmatchedLeaderSessionId() throws 
Exception {
-               String jobMasterAddress = "/jobMasterAddress1";
-               JobID jobID = mockJobMaster(jobMasterAddress);
-               TestingLeaderElectionService 
resourceManagerLeaderElectionService = new TestingLeaderElectionService();
-               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService();
-               final ResourceManager resourceManager = 
createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, 
jobMasterLeaderRetrievalService);
-               final UUID leaderSessionId = 
grantResourceManagerLeadership(resourceManagerLeaderElectionService);
-
-               // test throw exception when receive a registration from job 
master which takes unmatched leaderSessionId
+       public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() 
throws Exception {
+               String taskExecutorAddress = "/taskExecutor1";
+               ResourceID taskExecutorResourceID = 
mockTaskExecutor(taskExecutorAddress);
+               TestingLeaderElectionService leaderElectionService = new 
TestingLeaderElectionService();
+               final ResourceManager resourceManager = 
createAndStartResourceManager(leaderElectionService);
+               final UUID leaderSessionId = 
grantResourceManagerLeadership(leaderElectionService);
+
+               // test throw exception when receive a registration from 
taskExecutor which takes unmatched leaderSessionId
                UUID differentLeaderSessionID = UUID.randomUUID();
-               Future<RegistrationResponse> unMatchedLeaderFuture = 
resourceManager.registerJobMaster(differentLeaderSessionID, jobMasterAddress, 
jobID);
+               Future<RegistrationResponse> unMatchedLeaderFuture = 
resourceManager.registerTaskExecutor(differentLeaderSessionID, 
taskExecutorAddress, taskExecutorResourceID);
                Await.result(unMatchedLeaderFuture, new FiniteDuration(200, 
TimeUnit.MILLISECONDS));
        }
 
        /**
-        * Test receive registration with invalid address from job master
+        * Test receive registration with invalid address from task executor
         *
         * @throws Exception
         */
        @Test(expected = Exception.class)
-       public void testRegisterJobMasterFromInvalidAddress() throws Exception {
-               String jobMasterAddress = "/jobMasterAddress1";
-               JobID jobID = mockJobMaster(jobMasterAddress);
-               TestingLeaderElectionService 
resourceManagerLeaderElectionService = new TestingLeaderElectionService();
-               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService();
-               final ResourceManager resourceManager = 
createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, 
jobMasterLeaderRetrievalService);
-               final UUID leaderSessionId = 
grantResourceManagerLeadership(resourceManagerLeaderElectionService);
-
-               // test throw exception when receive a registration from job 
master which takes invalid address
-               String invalidAddress = "/jobMasterAddress2";
-               Future<RegistrationResponse> invalidAddressFuture = 
resourceManager.registerJobMaster(leaderSessionId, invalidAddress, jobID);
+       public void testRegisterTaskExecutorFromInvalidAddress() throws 
Exception {
+               String taskExecutorAddress = "/taskExecutor1";
+               ResourceID taskExecutorResourceID = 
mockTaskExecutor(taskExecutorAddress);
+               TestingLeaderElectionService leaderElectionService = new 
TestingLeaderElectionService();
+               final ResourceManager resourceManager = 
createAndStartResourceManager(leaderElectionService);
+               final UUID leaderSessionId = 
grantResourceManagerLeadership(leaderElectionService);
+
+               // test throw exception when receive a registration from 
taskExecutor which takes invalid address
+               String invalidAddress = "/taskExecutor2";
+               Future<RegistrationResponse> invalidAddressFuture = 
resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, 
taskExecutorResourceID);
                Await.result(invalidAddressFuture, new FiniteDuration(200, 
TimeUnit.MILLISECONDS));
        }
 
-       /**
-        * Check and verify return RegistrationResponse.Decline when failed to 
start a job master Leader retrieval listener
-        *
-        * @throws Exception
-        */
-       @Test
-       public void testRegisterJobMasterWithFailureLeaderListener() throws 
Exception {
-               String jobMasterAddress = "/jobMasterAddress1";
-               JobID jobID = mockJobMaster(jobMasterAddress);
-               TestingLeaderElectionService 
resourceManagerLeaderElectionService = new TestingLeaderElectionService();
-               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService();
-               final ResourceManager resourceManager = 
createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, 
jobMasterLeaderRetrievalService);
-               final UUID leaderSessionId = 
grantResourceManagerLeadership(resourceManagerLeaderElectionService);
-
-               JobID unknownJobIDToHAServices = new JobID();
-               // verify return RegistrationResponse.Decline when failed to 
start a job master Leader retrieval listener
-               Future<RegistrationResponse> declineFuture = 
resourceManager.registerJobMaster(leaderSessionId, jobMasterAddress, 
unknownJobIDToHAServices);
-               RegistrationResponse response = Await.result(declineFuture, new 
FiniteDuration(0, TimeUnit.SECONDS));
-               assertTrue(response instanceof RegistrationResponse.Decline);
-       }
-
-       private JobID mockJobMaster(String jobMasterAddress) {
-               JobID jobID = new JobID();
-               JobMasterGateway jobMasterGateway = 
mock(JobMasterGateway.class);
-               rpcService.registerGateway(jobMasterAddress, jobMasterGateway);
-               return jobID;
+       private ResourceID mockTaskExecutor(String taskExecutorAddress) {
+               TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
+               ResourceID taskExecutorResourceID = ResourceID.generate();
+               rpcService.registerGateway(taskExecutorAddress, 
taskExecutorGateway);
+               return taskExecutorResourceID;
        }
 
-       private ResourceManager 
createAndStartResourceManager(TestingLeaderElectionService 
resourceManagerLeaderElectionService, JobID jobID, 
TestingLeaderRetrievalService jobMasterLeaderRetrievalService) {
+       private ResourceManager 
createAndStartResourceManager(TestingLeaderElectionService 
leaderElectionService) {
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
-               
highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
-               highAvailabilityServices.setJobMasterLeaderRetriever(jobID, 
jobMasterLeaderRetrievalService);
+               
highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
                ResourceManager resourceManager = new 
ResourceManager(rpcService, highAvailabilityServices);
                resourceManager.start();
                return resourceManager;
        }
 
-       private UUID 
grantResourceManagerLeadership(TestingLeaderElectionService 
resourceManagerLeaderElectionService) {
+       private UUID 
grantResourceManagerLeadership(TestingLeaderElectionService 
leaderElectionService) {
                UUID leaderSessionId = UUID.randomUUID();
-               resourceManagerLeaderElectionService.isLeader(leaderSessionId);
+               leaderElectionService.isLeader(leaderSessionId);
                return leaderSessionId;
        }
 

Reply via email to