[FLINK-4535] rebase and refine

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4076bd74
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4076bd74
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4076bd74

Branch: refs/heads/flip-6
Commit: 4076bd748f325a1b9c5342b1a214ccf4d15660d1
Parents: a31cf00
Author: Maximilian Michels <m...@apache.org>
Authored: Wed Sep 21 20:20:25 2016 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Wed Sep 21 21:58:33 2016 +0200

----------------------------------------------------------------------
 .../resourcemanager/JobMasterRegistration.java  |  64 ----
 .../resourcemanager/ResourceManager.java        | 322 ++++++++++++-------
 .../resourcemanager/ResourceManagerGateway.java |  36 +--
 .../TaskExecutorRegistration.java               |   2 +-
 .../slotmanager/SlotManager.java                |   1 -
 .../ResourceManagerJobMasterTest.java           | 174 ++++++++++
 .../ResourceManagerTaskExecutorTest.java        | 135 ++++++++
 .../resourcemanager/ResourceManagerTest.java    | 141 --------
 .../slotmanager/SlotProtocolTest.java           |  43 ++-
 9 files changed, 574 insertions(+), 344 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4076bd74/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
deleted file mode 100644
index 981441f..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-
-import java.util.UUID;
-
-/**
- * This class is responsible for group the JobMasterGateway and the 
LeaderSessionID of a registered job master
- */
-public class JobMasterRegistration implements LeaderRetrievalListener {
-
-       private final JobMasterGateway gateway;
-       private final JobID jobID;
-       private final UUID leaderSessionID;
-       private LeaderRetrievalListener retriever;
-
-       public JobMasterRegistration(JobMasterGateway gateway, JobID jobID, 
UUID leaderSessionID) {
-               this.gateway = gateway;
-               this.jobID = jobID;
-               this.leaderSessionID = leaderSessionID;
-       }
-
-       public JobMasterGateway getGateway() {
-               return gateway;
-       }
-
-       public UUID getLeaderSessionID() {
-               return leaderSessionID;
-       }
-
-       public JobID getJobID() {
-               return jobID;
-       }
-
-       @Override
-       public void notifyLeaderAddress(String leaderAddress, UUID 
leaderSessionID) {
-               
-       }
-
-       @Override
-       public void handleError(Exception exception) {
-
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4076bd74/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 15692b6..88b8a11 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -18,29 +18,41 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
-import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
-
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
-import org.apache.flink.runtime.registration.RegistrationResponse;
-import scala.concurrent.Future;
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -50,25 +62,38 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *
  * It offers the following methods as part of its rpc interface to interact 
with the him remotely:
  * <ul>
- *     <li>{@link #registerJobMaster(JobMasterRegistration)} registers a 
{@link JobMaster} at the resource manager</li>
+ *     <li>{@link #registerJobMaster(UUID, UUID, String, JobID)} registers a 
{@link JobMaster} at the resource manager</li>
  *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource 
manager</li>
  * </ul>
  */
-public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
-       private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
+public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> 
implements LeaderContender {
+
+       private final Logger LOG = LoggerFactory.getLogger(getClass());
 
-       /** ResourceID and TaskExecutorRegistration mapping relationship of 
registered taskExecutors */
-       private final Map<ResourceID, TaskExecutorRegistration>  
startedTaskExecutorGateways;
+       private final Map<JobID, JobMasterGateway> jobMasterGateways;
+
+       private final Set<LeaderRetrievalListener> 
jobMasterLeaderRetrievalListeners;
+
+       private final Map<ResourceID, TaskExecutorRegistration> 
taskExecutorGateways;
 
        private final HighAvailabilityServices highAvailabilityServices;
-       private LeaderElectionService leaderElectionService = null;
-       private UUID leaderSessionID = null;
 
-       public ResourceManager(RpcService rpcService, HighAvailabilityServices 
highAvailabilityServices) {
+       private LeaderElectionService leaderElectionService;
+
+       private final SlotManager slotManager;
+
+       private UUID leaderSessionID;
+
+       public ResourceManager(
+                       RpcService rpcService,
+                       HighAvailabilityServices highAvailabilityServices,
+                       SlotManager slotManager) {
                super(rpcService);
                this.highAvailabilityServices = 
checkNotNull(highAvailabilityServices);
-               this.jobMasterGateways = new HashMap<>(16);
-               this.startedTaskExecutorGateways = new HashMap<>(16);
+               this.jobMasterGateways = new HashMap<>();
+               this.slotManager = slotManager;
+               this.jobMasterLeaderRetrievalListeners = new HashSet<>();
+               this.taskExecutorGateways = new HashMap<>();
        }
 
        @Override
@@ -77,7 +102,7 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> {
                try {
                        super.start();
                        leaderElectionService = 
highAvailabilityServices.getResourceManagerLeaderElectionService();
-                       leaderElectionService.start(new 
ResourceManagerLeaderContender());
+                       leaderElectionService.start(this);
                } catch (Throwable e) {
                        log.error("A fatal error happened when starting the 
ResourceManager", e);
                        throw new RuntimeException("A fatal error happened when 
starting the ResourceManager", e);
@@ -88,8 +113,11 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> {
        public void shutDown() {
                try {
                        leaderElectionService.stop();
+                       for(JobID jobID : jobMasterGateways.keySet()) {
+                               
highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop();
+                       }
                        super.shutDown();
-               } catch(Throwable e) {
+               } catch (Throwable e) {
                        log.error("A fatal error happened when shutdown the 
ResourceManager", e);
                        throw new RuntimeException("A fatal error happened when 
shutdown the ResourceManager", e);
                }
@@ -102,48 +130,79 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> {
         */
        @VisibleForTesting
        UUID getLeaderSessionID() {
-               return leaderSessionID;
+               return this.leaderSessionID;
        }
 
        /**
         * Register a {@link JobMaster} at the resource manager.
         *
-        * @param jobMasterRegistration Job master registration information
+        * @param resourceManagerLeaderId The fencing token for the 
ResourceManager leader
+        * @param jobMasterAddress        The address of the JobMaster that 
registers
+        * @param jobID                   The Job ID of the JobMaster that 
registers
         * @return Future registration response
         */
        @RpcMethod
-       public Future<RegistrationResponse> 
registerJobMaster(JobMasterRegistration jobMasterRegistration) {
-               Future<JobMasterGateway> jobMasterFuture = 
getRpcService().connect(jobMasterRegistration.getAddress(), 
JobMasterGateway.class);
+       public Future<RegistrationResponse> registerJobMaster(
+               final UUID resourceManagerLeaderId, final UUID 
jobMasterLeaderId,
+               final String jobMasterAddress, final JobID jobID) {
 
-               return jobMasterFuture.map(new Mapper<JobMasterGateway, 
RegistrationResponse>() {
-                       @Override
-                       public RegistrationResponse apply(final 
JobMasterGateway jobMasterGateway) {
-                               InstanceID instanceID;
+               checkNotNull(jobMasterAddress);
+               checkNotNull(jobID);
 
-                               if 
(jobMasterGateways.containsKey(jobMasterGateway)) {
-                                       instanceID = 
jobMasterGateways.get(jobMasterGateway);
-                               } else {
-                                       instanceID = new InstanceID();
-                                       jobMasterGateways.put(jobMasterGateway, 
instanceID);
-                               }
+               return getRpcService()
+                       .execute(new Callable<JobMasterGateway>() {
+                               @Override
+                               public JobMasterGateway call() throws Exception 
{
 
-                               return new 
TaskExecutorRegistrationSuccess(instanceID, 5000);
-                       }
-               }, getMainThreadExecutionContext());
-       }
+                                       if 
(!leaderSessionID.equals(resourceManagerLeaderId)) {
+                                               log.warn("Discard registration 
from JobMaster {} at ({}) because the expected leader session ID {}" +
+                                                               " did not equal 
the received leader session ID  {}",
+                                                       jobID, 
jobMasterAddress, leaderSessionID, resourceManagerLeaderId);
+                                               throw new Exception("Invalid 
leader session id");
+                                       }
 
-       /**
-        * Requests a slot from the resource manager.
-        *
-        * @param slotRequest Slot request
-        * @return Slot assignment
-        */
-       @RpcMethod
-       public SlotAssignment requestSlot(SlotRequest slotRequest) {
-               System.out.println("SlotRequest: " + slotRequest);
-               return new SlotAssignment();
-       }
+                                       final LeaderConnectionInfo 
jobMasterLeaderInfo;
+                                       try {
+                                               jobMasterLeaderInfo = 
LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
+                                                       
highAvailabilityServices.getJobMasterLeaderRetriever(jobID), new 
FiniteDuration(5, TimeUnit.SECONDS));
+                                       } catch (Exception e) {
+                                               LOG.warn("Failed to start 
JobMasterLeaderRetriever for JobID {}", jobID);
+                                               throw new Exception("Failed to 
retrieve JobMasterLeaderRetriever");
+                                       }
+
+                                       if 
(!jobMasterLeaderId.equals(jobMasterLeaderInfo.getLeaderSessionID())) {
+                                               LOG.info("Declining 
registration request from non-leading JobManager {}", jobMasterAddress);
+                                               throw new Exception("JobManager 
is not leading");
+                                       }
 
+                                       return 
getRpcService().connect(jobMasterAddress, JobMasterGateway.class).get(5, 
TimeUnit.SECONDS);
+                               }
+                       })
+                       .handleAsync(new BiFunction<JobMasterGateway, 
Throwable, RegistrationResponse>() {
+                               @Override
+                               public RegistrationResponse 
apply(JobMasterGateway jobMasterGateway, Throwable throwable) {
+                                       
+                                       if (throwable != null) {
+                                               return new 
RegistrationResponse.Decline(throwable.getMessage());
+                                       } else {
+                                               JobMasterLeaderListener 
jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
+                                               try {
+                                                       LeaderRetrievalService 
jobMasterLeaderRetriever = 
highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
+                                                       
jobMasterLeaderRetriever.start(jobMasterLeaderListener);
+                                               } catch (Exception e) {
+                                                       LOG.warn("Failed to 
start JobMasterLeaderRetriever for JobID {}", jobID);
+                                                       return new 
RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
+                                               }
+                                               
jobMasterLeaderRetrievalListeners.add(jobMasterLeaderListener);
+                                               final JobMasterGateway 
existingGateway = jobMasterGateways.put(jobID, jobMasterGateway);
+                                               if (existingGateway != null) {
+                                                       log.info("Replacing 
gateway for registered JobID {}.", jobID);
+                                               }
+                                               return new 
JobMasterRegistrationSuccess(5000);
+                                       }
+                               }
+                       }, getMainThreadExecutor());
+       }
 
        /**
         * Register a {@link 
org.apache.flink.runtime.taskexecutor.TaskExecutor} at the resource manager
@@ -160,90 +219,129 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> {
                final String taskExecutorAddress,
                final ResourceID resourceID) {
 
-               if(!leaderSessionID.equals(resourceManagerLeaderId)) {
-                       log.warn("Discard registration from TaskExecutor {} at 
({}) because the expected leader session ID {} did not equal the received 
leader session ID  {}",
-                               resourceID, taskExecutorAddress, 
leaderSessionID, resourceManagerLeaderId);
-                       return Futures.failed(new 
LeaderSessionIDException(leaderSessionID, resourceManagerLeaderId));
+               return getRpcService().execute(new 
Callable<TaskExecutorGateway>() {
+                       @Override
+                       public TaskExecutorGateway call() throws Exception {
+                               if 
(!leaderSessionID.equals(resourceManagerLeaderId)) {
+                                       log.warn("Discard registration from 
TaskExecutor {} at ({}) because the expected leader session ID {} did " +
+                                                       "not equal the received 
leader session ID  {}",
+                                               resourceID, 
taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
+                                       throw new Exception("Invalid leader 
session id");
+                               }
+
+                               return 
getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class).get(5, 
TimeUnit.SECONDS);
+                       }
+               }).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, 
RegistrationResponse>() {
+                       @Override
+                       public RegistrationResponse apply(TaskExecutorGateway 
taskExecutorGateway, Throwable throwable) {
+                               if (throwable != null) {
+                                       return new 
RegistrationResponse.Decline(throwable.getMessage());
+                               } else {
+                                       InstanceID id = new InstanceID();
+                                       TaskExecutorRegistration 
oldTaskExecutor =
+                                               
taskExecutorGateways.put(resourceID, new 
TaskExecutorRegistration(taskExecutorGateway, id));
+                                       if (oldTaskExecutor != null) {
+                                               log.warn("Receive a duplicate 
registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress);
+                                       }
+                                       return new 
TaskExecutorRegistrationSuccess(id, 5000);
+                               }
+                       }
+               }, getMainThreadExecutor());
+       }
+
+       /**
+        * Requests a slot from the resource manager.
+        *
+        * @param slotRequest Slot request
+        * @return Slot assignment
+        */
+       @RpcMethod
+       public SlotRequestReply requestSlot(SlotRequest slotRequest) {
+               final JobID jobId = slotRequest.getJobId();
+               final JobMasterGateway jobMasterGateway = 
jobMasterGateways.get(jobId);
+
+               if (jobMasterGateway != null) {
+                       return slotManager.requestSlot(slotRequest);
+               } else {
+                       LOG.info("Ignoring slot request for unknown JobMaster 
with JobID {}", jobId);
+                       return new 
SlotRequestRejected(slotRequest.getAllocationId());
                }
+       }
 
-               Future<TaskExecutorGateway> taskExecutorGatewayFuture = 
getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
 
-               return taskExecutorGatewayFuture.map(new 
Mapper<TaskExecutorGateway, RegistrationResponse>() {
 
+
+       // 
------------------------------------------------------------------------
+       //  Leader Contender
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Callback method when current resourceManager is granted leadership
+        *
+        * @param leaderSessionID unique leadershipID
+        */
+       @Override
+       public void grantLeadership(final UUID leaderSessionID) {
+               runAsync(new Runnable() {
                        @Override
-                       public RegistrationResponse apply(final 
TaskExecutorGateway taskExecutorGateway) {
-                               InstanceID instanceID = null;
-                               TaskExecutorRegistration 
taskExecutorRegistration = startedTaskExecutorGateways.get(resourceID);
-                               if(taskExecutorRegistration != null) {
-                                       log.warn("Receive a duplicate 
registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress);
-                                       instanceID = 
taskExecutorRegistration.getInstanceID();
-                               } else {
-                                       instanceID = new InstanceID();
-                                       
startedTaskExecutorGateways.put(resourceID, new 
TaskExecutorRegistration(taskExecutorGateway, instanceID));
-                               }
+                       public void run() {
+                               log.info("ResourceManager {} was granted 
leadership with leader session ID {}", getAddress(), leaderSessionID);
+                               // confirming the leader session ID might be 
blocking,
+                               
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
+                               // notify SlotManager
+                               slotManager.setLeaderUUID(leaderSessionID);
+                               ResourceManager.this.leaderSessionID = 
leaderSessionID;
+                       }
+               });
+       }
 
-                               return new 
TaskExecutorRegistrationSuccess(instanceID, 5000);
+       /**
+        * Callback method when current resourceManager loses leadership.
+        */
+       @Override
+       public void revokeLeadership() {
+               runAsync(new Runnable() {
+                       @Override
+                       public void run() {
+                               log.info("ResourceManager {} was revoked 
leadership.", getAddress());
+                               jobMasterGateways.clear();
+                               taskExecutorGateways.clear();
+                               slotManager.clearState();
+                               leaderSessionID = null;
                        }
-               }, getMainThreadExecutionContext());
+               });
        }
 
+       /**
+        * Handles error occurring in the leader election service
+        *
+        * @param exception Exception being thrown in the leader election 
service
+        */
+       @Override
+       public void handleError(final Exception exception) {
+               log.error("ResourceManager received an error from the 
LeaderElectionService.", exception);
+               // terminate ResourceManager in case of an error
+               shutDown();
+       }
 
-       private class ResourceManagerLeaderContender implements LeaderContender 
{
+       private static class JobMasterLeaderListener implements 
LeaderRetrievalListener {
 
-               /**
-                * Callback method when current resourceManager is granted 
leadership
-                *
-                * @param leaderSessionID unique leadershipID
-                */
-               @Override
-               public void grantLeadership(final UUID leaderSessionID) {
-                       runAsync(new Runnable() {
-                               @Override
-                               public void run() {
-                                       log.info("ResourceManager {} was 
granted leadership with leader session ID {}", getAddress(), leaderSessionID);
-                                       ResourceManager.this.leaderSessionID = 
leaderSessionID;
-                                       // confirming the leader session ID 
might be blocking,
-                                       
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
-                               }
-                       });
-               }
+               private final JobID jobID;
+               private UUID leaderID;
 
-               /**
-                * Callback method when current resourceManager lose leadership.
-                */
-               @Override
-               public void revokeLeadership() {
-                       runAsync(new Runnable() {
-                               @Override
-                               public void run() {
-                                       log.info("ResourceManager {} was 
revoked leadership.", getAddress());
-                                       jobMasterGateways.clear();
-                                       startedTaskExecutorGateways.clear();
-                                       leaderSessionID = null;
-                               }
-                       });
+               private JobMasterLeaderListener(JobID jobID) {
+                       this.jobID = jobID;
                }
 
                @Override
-               public String getAddress() {
-                       return ResourceManager.this.getAddress();
+               public void notifyLeaderAddress(final String leaderAddress, 
final UUID leaderSessionID) {
+                       this.leaderID = leaderSessionID;
                }
 
-               /**
-                * Handles error occurring in the leader election service
-                *
-                * @param exception Exception being thrown in the leader 
election service
-                */
                @Override
                public void handleError(final Exception exception) {
-                       runAsync(new Runnable() {
-                               @Override
-                               public void run() {
-                                       log.error("ResourceManager received an 
error from the LeaderElectionService.", exception);
-                                       // terminate ResourceManager in case of 
an error
-                                       shutDown();
-                               }
-                       });
+                       // TODO
                }
        }
 }
+

http://git-wip-us.apache.org/repos/asf/flink/blob/4076bd74/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 30a096f..d8b8ebe 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -18,15 +18,16 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.jobmaster.JobMaster;
-
 import org.apache.flink.runtime.registration.RegistrationResponse;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
+import org.apache.flink.runtime.registration.RegistrationResponse;
 import java.util.UUID;
 
 /**
@@ -37,21 +38,18 @@ public interface ResourceManagerGateway extends RpcGateway {
        /**
         * Register a {@link JobMaster} at the resource manager.
         *
-        * @param jobMasterRegistration Job master registration information
-        * @param timeout Timeout for the future to complete
+        * @param resourceManagerLeaderId The fencing token for the 
ResourceManager leader
+        * @param jobMasterAddress        The address of the JobMaster that 
registers
+        * @param jobID                   The Job ID of the JobMaster that 
registers
+        * @param timeout                 Timeout for the future to complete
         * @return Future registration response
         */
        Future<RegistrationResponse> registerJobMaster(
-               JobMasterRegistration jobMasterRegistration,
-               @RpcTimeout FiniteDuration timeout);
+               UUID resourceManagerLeaderId,
+               String jobMasterAddress,
+               JobID jobID,
+               @RpcTimeout Time timeout);
 
-       /**
-        * Register a {@link JobMaster} at the resource manager.
-        *
-        * @param jobMasterRegistration Job master registration information
-        * @return Future registration response
-        */
-       Future<RegistrationResponse> registerJobMaster(JobMasterRegistration 
jobMasterRegistration);
 
        /**
         * Requests a slot from the resource manager.
@@ -59,15 +57,15 @@ public interface ResourceManagerGateway extends RpcGateway {
         * @param slotRequest Slot request
         * @return Future slot assignment
         */
-       Future<SlotAssignment> requestSlot(SlotRequest slotRequest);
+       Future<SlotRequestReply> requestSlot(SlotRequest slotRequest);
 
        /**
         * Register a {@link 
org.apache.flink.runtime.taskexecutor.TaskExecutor} at the resource manager.
         *
         * @param resourceManagerLeaderId  The fencing token for the 
ResourceManager leader
-        * @param taskExecutorAddress      The address of the TaskExecutor that 
registers
-        * @param resourceID               The resource ID of the TaskExecutor 
that registers
-        * @param timeout                  The timeout for the response.
+        * @param taskExecutorAddress     The address of the TaskExecutor that 
registers
+        * @param resourceID              The resource ID of the TaskExecutor 
that registers
+        * @param timeout                 The timeout for the response.
         *
         * @return The future to the response by the ResourceManager.
         */
@@ -75,5 +73,5 @@ public interface ResourceManagerGateway extends RpcGateway {
                UUID resourceManagerLeaderId,
                String taskExecutorAddress,
                ResourceID resourceID,
-               @RpcTimeout FiniteDuration timeout);
+               @RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4076bd74/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
index bd78a47..f8dfdc7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
@@ -35,7 +35,7 @@ public class TaskExecutorRegistration implements Serializable 
{
        private InstanceID instanceID;
 
        public TaskExecutorRegistration(TaskExecutorGateway taskExecutorGateway,
-               InstanceID instanceID) {
+                                                                       
InstanceID instanceID) {
                this.taskExecutorGateway = taskExecutorGateway;
                this.instanceID = instanceID;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/4076bd74/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 5d0013c..a6d2196 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -26,7 +26,6 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
 import org.apache.flink.runtime.resourcemanager.SlotRequestReply;

http://git-wip-us.apache.org/repos/asf/flink/blob/4076bd74/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
new file mode 100644
index 0000000..332c093
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Future;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SimpleSlotManager;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
+
+public class ResourceManagerJobMasterTest {
+
+       private TestingSerialRpcService rpcService;
+
+       @Before
+       public void setup() throws Exception {
+               rpcService = new TestingSerialRpcService();
+       }
+
+       @After
+       public void teardown() throws Exception {
+               rpcService.stopService();
+       }
+
+       /**
+        * Tests that a job master registering with the granted resource 
manager leader id and the job master leader id given to the leader retrieval service is accepted.
+        */
+       @Test
+       public void testRegisterJobMaster() throws Exception {
+               String jobMasterAddress = "/jobMasterAddress1";
+               JobID jobID = mockJobMaster(jobMasterAddress);
+               TestingLeaderElectionService 
resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+               UUID jmLeaderID = UUID.randomUUID();
+               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
+               final ResourceManager resourceManager = 
createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, 
jobMasterLeaderRetrievalService);
+               final UUID rmLeaderSessionId = 
grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+
+               // all ids and the address match what was set up above, so the 
response must be a JobMasterRegistrationSuccess
+               Future<RegistrationResponse> successfulFuture = 
resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderID, 
jobMasterAddress, jobID);
+               RegistrationResponse response = successfulFuture.get(5, 
TimeUnit.SECONDS);
+               assertTrue(response instanceof JobMasterRegistrationSuccess);
+       }
+
+       /**
+        * Tests that a registration carrying a resource manager leader id 
other than the granted one is declined.
+        */
+       @Test
+       public void testRegisterJobMasterWithUnmatchedLeaderSessionId1() throws 
Exception {
+               String jobMasterAddress = "/jobMasterAddress1";
+               JobID jobID = mockJobMaster(jobMasterAddress);
+               TestingLeaderElectionService 
resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+               UUID jmLeaderID = UUID.randomUUID();
+               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
+               final ResourceManager resourceManager = 
createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, 
jobMasterLeaderRetrievalService);
+               final UUID rmLeaderSessionId = 
grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+
+               // register with a random resource manager leader id instead of 
the granted one; the response must be a Decline
+               UUID differentLeaderSessionID = UUID.randomUUID();
+               Future<RegistrationResponse> unMatchedLeaderFuture = 
resourceManager.registerJobMaster(differentLeaderSessionID, jmLeaderID, 
jobMasterAddress, jobID);
+               assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) 
instanceof RegistrationResponse.Decline);
+       }
+
+       /**
+        * Tests that a registration carrying a job master leader id other than 
the one the leader retrieval service was created with is declined.
+        */
+       @Test
+       public void testRegisterJobMasterWithUnmatchedLeaderSessionId2() throws 
Exception {
+               String jobMasterAddress = "/jobMasterAddress1";
+               JobID jobID = mockJobMaster(jobMasterAddress);
+               TestingLeaderElectionService 
resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+               final UUID jmLeaderSessionId = UUID.randomUUID();
+               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderSessionId);
+               final ResourceManager resourceManager = 
createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, 
jobMasterLeaderRetrievalService);
+               final UUID rmLeaderSessionId = 
grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+
+               // RM leadership is granted exactly once above, so 
rmLeaderSessionId is still valid and only the job master leader id mismatches; the response must be a Decline
+               UUID differentLeaderSessionID = UUID.randomUUID();
+               Future<RegistrationResponse> unMatchedLeaderFuture = 
resourceManager.registerJobMaster(rmLeaderSessionId, differentLeaderSessionID, 
jobMasterAddress, jobID);
+               assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) 
instanceof RegistrationResponse.Decline);
+       }
+
+       /**
+        * Tests that a registration naming a job master address for which no 
gateway was registered at the rpc service is declined.
+        */
+       @Test
+       public void testRegisterJobMasterFromInvalidAddress() throws Exception {
+               String jobMasterAddress = "/jobMasterAddress1";
+               JobID jobID = mockJobMaster(jobMasterAddress);
+               TestingLeaderElectionService 
resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+               final UUID jmLeaderSessionId = UUID.randomUUID();
+               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderSessionId);
+               final ResourceManager resourceManager = 
createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, 
jobMasterLeaderRetrievalService);
+               final UUID rmLeaderSessionId = 
grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+
+               // RM leadership is granted exactly once above and both leader 
ids match, so the unregistered address is the only invalid input; the response must be a Decline
+               String invalidAddress = "/jobMasterAddress2";
+               Future<RegistrationResponse> invalidAddressFuture = 
resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderSessionId, 
invalidAddress, jobID);
+               assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) 
instanceof RegistrationResponse.Decline);
+       }
+
+       /**
+        * Tests that a registration is declined when the job id has no job 
master leader retriever configured in the high availability services.
+        */
+       @Test
+       public void testRegisterJobMasterWithFailureLeaderListener() throws 
Exception {
+               String jobMasterAddress = "/jobMasterAddress1";
+               JobID jobID = mockJobMaster(jobMasterAddress);
+               TestingLeaderElectionService 
resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService();
+               final ResourceManager resourceManager = 
createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, 
jobMasterLeaderRetrievalService);
+               final UUID rmLeaderSessionId = 
grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+               final UUID jmLeaderSessionId = UUID.randomUUID();
+
+               JobID unknownJobIDToHAServices = new JobID();
+               // RM leadership is granted exactly once above, so 
rmLeaderSessionId is still valid and the unknown job id is what must cause the Decline
+               Future<RegistrationResponse> declineFuture = 
resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderSessionId, 
jobMasterAddress, unknownJobIDToHAServices);
+               RegistrationResponse response = declineFuture.get(5, 
TimeUnit.SECONDS);
+               assertTrue(response instanceof RegistrationResponse.Decline);
+       }
+
+       private JobID mockJobMaster(String jobMasterAddress) {
+               JobID jobID = new JobID();
+               JobMasterGateway jobMasterGateway = 
mock(JobMasterGateway.class);
+               rpcService.registerGateway(jobMasterAddress, jobMasterGateway);
+               return jobID;
+       }
+
+       private ResourceManager 
createAndStartResourceManager(TestingLeaderElectionService 
resourceManagerLeaderElectionService, JobID jobID, 
TestingLeaderRetrievalService jobMasterLeaderRetrievalService) {
+               TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
+               
highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
+               highAvailabilityServices.setJobMasterLeaderRetriever(jobID, 
jobMasterLeaderRetrievalService);
+               ResourceManager resourceManager = new 
ResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager());
+               resourceManager.start();
+               return resourceManager;
+       }
+
+       private UUID 
grantResourceManagerLeadership(TestingLeaderElectionService 
resourceManagerLeaderElectionService) {
+               UUID leaderSessionId = UUID.randomUUID();
+               resourceManagerLeaderElectionService.isLeader(leaderSessionId);
+               return leaderSessionId;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4076bd74/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
new file mode 100644
index 0000000..ed7c7d7
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SimpleSlotManager;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class ResourceManagerTaskExecutorTest {
+
+       private TestingSerialRpcService rpcService;
+
+       @Before
+       public void setup() throws Exception {
+               rpcService = new TestingSerialRpcService();
+       }
+
+       @After
+       public void teardown() throws Exception {
+               rpcService.stopService();
+       }
+
+       /**
+        * Tests that a task executor registration succeeds and that a repeated 
registration of the same task executor succeeds with a different registration id.
+        */
+       @Test
+       public void testRegisterTaskExecutor() throws Exception {
+               String taskExecutorAddress = "/taskExecutor1";
+               ResourceID taskExecutorResourceID = 
mockTaskExecutor(taskExecutorAddress);
+               TestingLeaderElectionService rmLeaderElectionService = new 
TestingLeaderElectionService();
+               final ResourceManager resourceManager = 
createAndStartResourceManager(rmLeaderElectionService);
+               final UUID leaderSessionId = 
grantLeadership(rmLeaderElectionService);
+
+               // test response successful
+               Future<RegistrationResponse> successfulFuture = 
resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, 
taskExecutorResourceID);
+               RegistrationResponse response = successfulFuture.get(5, 
TimeUnit.SECONDS);
+               assertTrue(response instanceof TaskExecutorRegistrationSuccess);
+
+               // test response successful with instanceID not equal to 
previous when receive duplicate registration from taskExecutor
+               Future<RegistrationResponse> duplicateFuture = 
resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, 
taskExecutorResourceID);
+               RegistrationResponse duplicateResponse = duplicateFuture.get(5, 
TimeUnit.SECONDS);
+               assertTrue(duplicateResponse instanceof 
TaskExecutorRegistrationSuccess);
+               assertNotEquals(((TaskExecutorRegistrationSuccess) 
response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) 
duplicateResponse).getRegistrationId());
+       }
+
+       /**
+        * Tests that a task executor registration carrying a resource manager 
leader id other than the granted one is declined.
+        */
+       @Test
+       public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() 
throws Exception {
+               String taskExecutorAddress = "/taskExecutor1";
+               ResourceID taskExecutorResourceID = 
mockTaskExecutor(taskExecutorAddress);
+               TestingLeaderElectionService rmLeaderElectionService = new 
TestingLeaderElectionService();
+               final ResourceManager resourceManager = 
createAndStartResourceManager(rmLeaderElectionService);
+               final UUID leaderSessionId = 
grantLeadership(rmLeaderElectionService);
+
+               // register with a random leader id instead of the granted one; 
the response must be a Decline
+               UUID differentLeaderSessionID = UUID.randomUUID();
+               Future<RegistrationResponse> unMatchedLeaderFuture = 
resourceManager.registerTaskExecutor(differentLeaderSessionID, 
taskExecutorAddress, taskExecutorResourceID);
+               assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) 
instanceof RegistrationResponse.Decline);
+       }
+
+       /**
+        * Tests that a registration naming a task executor address for which 
no gateway was registered at the rpc service is declined.
+        */
+       @Test
+       public void testRegisterTaskExecutorFromInvalidAddress() throws 
Exception {
+               String taskExecutorAddress = "/taskExecutor1";
+               ResourceID taskExecutorResourceID = 
mockTaskExecutor(taskExecutorAddress);
+               TestingLeaderElectionService leaderElectionService = new 
TestingLeaderElectionService();
+               final ResourceManager resourceManager = 
createAndStartResourceManager(leaderElectionService);
+               final UUID leaderSessionId = 
grantLeadership(leaderElectionService);
+
+               // register with the granted leader id but an address that was 
never registered as a gateway; the response must be a Decline
+               String invalidAddress = "/taskExecutor2";
+               Future<RegistrationResponse> invalidAddressFuture = 
resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, 
taskExecutorResourceID);
+               assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) 
instanceof RegistrationResponse.Decline);
+       }
+
+       private ResourceID mockTaskExecutor(String taskExecutorAddress) {
+               TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
+               ResourceID taskExecutorResourceID = ResourceID.generate();
+               rpcService.registerGateway(taskExecutorAddress, 
taskExecutorGateway);
+               return taskExecutorResourceID;
+       }
+
+       private ResourceManager 
createAndStartResourceManager(TestingLeaderElectionService 
rmLeaderElectionService) {
+               TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
+               
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
+               ResourceManager resourceManager = new 
ResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager());
+               resourceManager.start();
+               return resourceManager;
+       }
+
+       private UUID grantLeadership(TestingLeaderElectionService 
leaderElectionService) {
+               UUID leaderSessionId = UUID.randomUUID();
+               leaderElectionService.isLeader(leaderSessionId);
+               return leaderSessionId;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4076bd74/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
deleted file mode 100644
index b75d9b8..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.rpc.TestingSerialRpcService;
-import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-public class ResourceManagerTest {
-
-       private TestingSerialRpcService rpcService;
-
-       @Before
-       public void setup() throws Exception {
-               rpcService = new TestingSerialRpcService();
-       }
-
-       @After
-       public void teardown() throws Exception {
-               rpcService.stopService();
-       }
-
-       /**
-        * Test receive normal registration from task executor and receive 
duplicate registration from task executor
-        *
-        * @throws Exception
-        */
-       @Test
-       public void testRegisterTaskExecutor() throws Exception {
-               String taskExecutorAddress = "/taskExecutor1";
-               ResourceID taskExecutorResourceID = 
mockTaskExecutor(taskExecutorAddress);
-               TestingLeaderElectionService leaderElectionService = new 
TestingLeaderElectionService();
-               final ResourceManager resourceManager = 
createAndStartResourceManager(leaderElectionService);
-               final UUID leaderSessionId = 
grantResourceManagerLeadership(leaderElectionService);
-
-               // test response successful
-               Future<RegistrationResponse> successfulFuture = 
resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, 
taskExecutorResourceID);
-               RegistrationResponse response = Await.result(successfulFuture, 
new FiniteDuration(0, TimeUnit.SECONDS));
-               assertTrue(response instanceof TaskExecutorRegistrationSuccess);
-
-               // test response successful with previous instanceID when 
receive duplicate registration from taskExecutor
-               Future<RegistrationResponse> duplicateFuture = 
resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, 
taskExecutorResourceID);
-               RegistrationResponse duplicateResponse = 
Await.result(duplicateFuture, new FiniteDuration(0, TimeUnit.SECONDS));
-               assertTrue(duplicateResponse instanceof 
TaskExecutorRegistrationSuccess);
-               assertEquals(((TaskExecutorRegistrationSuccess) 
response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) 
duplicateResponse).getRegistrationId());
-       }
-
-       /**
-        * Test receive registration with unmatched leadershipId from task 
executor
-        *
-        * @throws Exception
-        */
-       @Test(expected = LeaderSessionIDException.class)
-       public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() 
throws Exception {
-               String taskExecutorAddress = "/taskExecutor1";
-               ResourceID taskExecutorResourceID = 
mockTaskExecutor(taskExecutorAddress);
-               TestingLeaderElectionService leaderElectionService = new 
TestingLeaderElectionService();
-               final ResourceManager resourceManager = 
createAndStartResourceManager(leaderElectionService);
-               final UUID leaderSessionId = 
grantResourceManagerLeadership(leaderElectionService);
-
-               // test throw exception when receive a registration from 
taskExecutor which takes unmatched leaderSessionId
-               UUID differentLeaderSessionID = UUID.randomUUID();
-               Future<RegistrationResponse> unMatchedLeaderFuture = 
resourceManager.registerTaskExecutor(differentLeaderSessionID, 
taskExecutorAddress, taskExecutorResourceID);
-               Await.result(unMatchedLeaderFuture, new FiniteDuration(200, 
TimeUnit.MILLISECONDS));
-       }
-
-       /**
-        * Test receive registration with invalid address from task executor
-        *
-        * @throws Exception
-        */
-       @Test(expected = Exception.class)
-       public void testRegisterTaskExecutorFromInvalidAddress() throws 
Exception {
-               String taskExecutorAddress = "/taskExecutor1";
-               ResourceID taskExecutorResourceID = 
mockTaskExecutor(taskExecutorAddress);
-               TestingLeaderElectionService leaderElectionService = new 
TestingLeaderElectionService();
-               final ResourceManager resourceManager = 
createAndStartResourceManager(leaderElectionService);
-               final UUID leaderSessionId = 
grantResourceManagerLeadership(leaderElectionService);
-
-               // test throw exception when receive a registration from 
taskExecutor which takes invalid address
-               String invalidAddress = "/taskExecutor2";
-               Future<RegistrationResponse> invalidAddressFuture = 
resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, 
taskExecutorResourceID);
-               Await.result(invalidAddressFuture, new FiniteDuration(200, 
TimeUnit.MILLISECONDS));
-       }
-
-       private ResourceID mockTaskExecutor(String taskExecutorAddress) {
-               TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
-               ResourceID taskExecutorResourceID = ResourceID.generate();
-               rpcService.registerGateway(taskExecutorAddress, 
taskExecutorGateway);
-               return taskExecutorResourceID;
-       }
-
-       private ResourceManager 
createAndStartResourceManager(TestingLeaderElectionService 
leaderElectionService) {
-               TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
-               
highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
-               ResourceManager resourceManager = new 
ResourceManager(rpcService, highAvailabilityServices);
-               resourceManager.start();
-               return resourceManager;
-       }
-
-       private UUID 
grantResourceManagerLeadership(TestingLeaderElectionService 
leaderElectionService) {
-               UUID leaderSessionId = UUID.randomUUID();
-               leaderElectionService.isLeader(leaderSessionId);
-               return leaderSessionId;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4076bd74/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 1f9e7e8..0232fab 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -24,10 +24,14 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.NonHaServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
-import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
@@ -88,14 +92,20 @@ public class SlotProtocolTest extends TestLogger {
 
                testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
 
+               final TestingHighAvailabilityServices testingHaServices = new TestingHighAvailabilityServices();
+               final UUID rmLeaderID = UUID.randomUUID();
+               final UUID jmLeaderID = UUID.randomUUID();
+               TestingLeaderElectionService rmLeaderElectionService =
+                       configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID);
 
                TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
                ResourceManager resourceManager =
-                       new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager);
+                       new ResourceManager(testRpcService, testingHaServices, slotManager);
                resourceManager.start();
+               rmLeaderElectionService.isLeader(rmLeaderID);
 
                Future<RegistrationResponse> registrationFuture =
-                       resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID));
+                       resourceManager.registerJobMaster(rmLeaderID, jmLeaderID, jmAddress, jobID);
                try {
                        registrationFuture.get(5, TimeUnit.SECONDS);
                } catch (Exception e) {
@@ -158,16 +168,23 @@ public class SlotProtocolTest extends TestLogger {
 
                testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
 
+               final TestingHighAvailabilityServices testingHaServices = new TestingHighAvailabilityServices();
+               final UUID rmLeaderID = UUID.randomUUID();
+               final UUID jmLeaderID = UUID.randomUUID();
+               TestingLeaderElectionService rmLeaderElectionService =
+                       configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID);
+
                TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
                testRpcService.registerGateway(tmAddress, taskExecutorGateway);
 
                TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
                ResourceManager resourceManager =
-                       new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager);
+                       new ResourceManager(testRpcService, testingHaServices, slotManager);
                resourceManager.start();
+               rmLeaderElectionService.isLeader(rmLeaderID);
 
                Future<RegistrationResponse> registrationFuture =
-                       resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID));
+                       resourceManager.registerJobMaster(rmLeaderID, jmLeaderID, jmAddress, jobID);
                try {
                        registrationFuture.get(5, TimeUnit.SECONDS);
                } catch (Exception e) {
@@ -208,6 +225,20 @@ public class SlotProtocolTest extends TestLogger {
                verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(Time.class));
        }
 
+       private static TestingLeaderElectionService configureHA(
+                       TestingHighAvailabilityServices testingHA, JobID jobID, String rmAddress, UUID rmID, String jmAddress, UUID jmID) {
+               final TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
+               testingHA.setResourceManagerLeaderElectionService(rmLeaderElectionService);
+               final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService(rmAddress, rmID);
+               testingHA.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
+
+               final TestingLeaderElectionService jmLeaderElectionService = new TestingLeaderElectionService();
+               testingHA.setJobMasterLeaderElectionService(jmLeaderElectionService);
+               final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService(jmAddress, jmID);
+               testingHA.setJobMasterLeaderRetriever(jobID, jmLeaderRetrievalService);
+
+               return rmLeaderElectionService;
+       }
 
        private static class TestingSlotManager extends SimpleSlotManager {
 

Reply via email to