http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequest.java
new file mode 100644
index 0000000..896421b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Describes the slot a JobManager requests from the ResourceManager; every field is passed through checkNotNull on construction.
+ */
+public class SlotRequest implements Serializable {
+
+       private static final long serialVersionUID = -6586877187990445986L;
+
+       /** The id of the job that the slot is requested for. */
+       private final JobID jobId;
+
+       /** The allocation id that serves as the unique identification of this request. */
+       private final AllocationID allocationId;
+
+       /** The resource profile that the requested slot is required to have. */
+       private final ResourceProfile resourceProfile;
+
+       public SlotRequest(JobID jobId, AllocationID allocationId, 
ResourceProfile resourceProfile) {
+               this.jobId = checkNotNull(jobId);
+               this.allocationId = checkNotNull(allocationId);
+               this.resourceProfile = checkNotNull(resourceProfile);
+       }
+
+       /**
+        * Gets the id of the job that this slot is requested for.
+        * @return The job id given to the constructor
+        */
+       public JobID getJobId() {
+               return jobId;
+       }
+
+       /**
+        * Gets the allocation id that uniquely identifies this request.
+        * @return The allocation id given to the constructor
+        */
+       public AllocationID getAllocationId() {
+               return allocationId;
+       }
+
+       /**
+        * Gets the resource profile that the desired slot must have.
+        * @return The resource profile given to the constructor
+        */
+       public ResourceProfile getResourceProfile() {
+               return resourceProfile;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
deleted file mode 100644
index a046cb8..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.jobmaster;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.leaderelection.LeaderContender;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.RpcMethod;
-import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.util.Preconditions;
-
-import java.util.UUID;
-
-/**
- * JobMaster implementation. The job master is responsible for the execution 
of a single
- * {@link org.apache.flink.runtime.jobgraph.JobGraph}.
- * <p>
- * It offers the following methods as part of its rpc interface to interact 
with the JobMaster
- * remotely:
- * <ul>
- *     <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the 
task execution state for
- * given task</li>
- * </ul>
- */
-public class JobMaster extends RpcEndpoint<JobMasterGateway> {
-
-       /** Gateway to connected resource manager, null iff not connected */
-       private ResourceManagerGateway resourceManager = null;
-
-       /** Logical representation of the job */
-       private final JobGraph jobGraph;
-       private final JobID jobID;
-
-       /** Configuration of the job */
-       private final Configuration configuration;
-
-       /** Service to contend for and retrieve the leadership of JM and RM */
-       private final HighAvailabilityServices highAvailabilityServices;
-
-       /** Leader Management */
-       private LeaderElectionService leaderElectionService = null;
-       private UUID leaderSessionID;
-
-       /**
-        * The JM's Constructor
-        *
-        * @param jobGraph The representation of the job's execution plan
-        * @param configuration The job's configuration
-        * @param rpcService The RPC service at which the JM serves
-        * @param highAvailabilityService The cluster's HA service from the JM 
can elect and retrieve leaders.
-        */
-       public JobMaster(
-               JobGraph jobGraph,
-               Configuration configuration,
-               RpcService rpcService,
-               HighAvailabilityServices highAvailabilityService) {
-
-               super(rpcService);
-
-               this.jobGraph = Preconditions.checkNotNull(jobGraph);
-               this.jobID = Preconditions.checkNotNull(jobGraph.getJobID());
-
-               this.configuration = Preconditions.checkNotNull(configuration);
-
-               this.highAvailabilityServices = 
Preconditions.checkNotNull(highAvailabilityService);
-       }
-
-       public ResourceManagerGateway getResourceManager() {
-               return resourceManager;
-       }
-
-       
//----------------------------------------------------------------------------------------------
-       // Initialization methods
-       
//----------------------------------------------------------------------------------------------
-       public void start() {
-               super.start();
-
-               // register at the election once the JM starts
-               registerAtElectionService();
-       }
-
-
-       
//----------------------------------------------------------------------------------------------
-       // JobMaster Leadership methods
-       
//----------------------------------------------------------------------------------------------
-
-       /**
-        * Retrieves the election service and contend for the leadership.
-        */
-       private void registerAtElectionService() {
-               try {
-                       leaderElectionService = 
highAvailabilityServices.getJobMasterLeaderElectionService(jobID);
-                       leaderElectionService.start(new 
JobMasterLeaderContender());
-               } catch (Exception e) {
-                       throw new RuntimeException("Fail to register at the 
election of JobMaster", e);
-               }
-       }
-
-       /**
-        * Start the execution when the leadership is granted.
-        *
-        * @param newLeaderSessionID The identifier of the new leadership 
session
-        */
-       public void grantJobMasterLeadership(final UUID newLeaderSessionID) {
-               runAsync(new Runnable() {
-                       @Override
-                       public void run() {
-                               log.info("JobManager {} grants leadership with 
session id {}.", getAddress(), newLeaderSessionID);
-
-                               // The operation may be blocking, but since JM 
is idle before it grants the leadership, it's okay that
-                               // JM waits here for the operation's 
completeness.
-                               leaderSessionID = newLeaderSessionID;
-                               
leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
-
-                               // TODO:: execute the job when the leadership 
is granted.
-                       }
-               });
-       }
-
-       /**
-        * Stop the execution when the leadership is revoked.
-        */
-       public void revokeJobMasterLeadership() {
-               runAsync(new Runnable() {
-                       @Override
-                       public void run() {
-                               log.info("JobManager {} was revoked 
leadership.", getAddress());
-
-                               // TODO:: cancel the job's execution and notify 
all listeners
-                               cancelAndClearEverything(new 
Exception("JobManager is no longer the leader."));
-
-                               leaderSessionID = null;
-                       }
-               });
-       }
-
-       /**
-        * Handles error occurring in the leader election service
-        *
-        * @param exception Exception thrown in the leader election service
-        */
-       public void onJobMasterElectionError(final Exception exception) {
-               runAsync(new Runnable() {
-                       @Override
-                       public void run() {
-                               log.error("Received an error from the 
LeaderElectionService.", exception);
-
-                               // TODO:: cancel the job's execution and 
shutdown the JM
-                               cancelAndClearEverything(exception);
-
-                               leaderSessionID = null;
-                       }
-               });
-
-       }
-
-       
//----------------------------------------------------------------------------------------------
-       // RPC methods
-       
//----------------------------------------------------------------------------------------------
-
-       /**
-        * Updates the task execution state for a given task.
-        *
-        * @param taskExecutionState New task execution state for a given task
-        * @return Acknowledge the task execution state update
-        */
-       @RpcMethod
-       public Acknowledge updateTaskExecutionState(TaskExecutionState 
taskExecutionState) {
-               System.out.println("TaskExecutionState: " + taskExecutionState);
-               return Acknowledge.get();
-       }
-
-       /**
-        * Triggers the registration of the job master at the resource manager.
-        *
-        * @param address Address of the resource manager
-        */
-       @RpcMethod
-       public void registerAtResourceManager(final String address) {
-               //TODO:: register at the RM
-       }
-
-       
//----------------------------------------------------------------------------------------------
-       // Helper methods
-       
//----------------------------------------------------------------------------------------------
-
-       /**
-        * Cancel the current job and notify all listeners the job's 
cancellation.
-        *
-        * @param cause Cause for the cancelling.
-        */
-       private void cancelAndClearEverything(Throwable cause) {
-               // currently, nothing to do here
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Utility classes
-       // 
------------------------------------------------------------------------
-       private class JobMasterLeaderContender implements LeaderContender {
-
-               @Override
-               public void grantLeadership(UUID leaderSessionID) {
-                       
JobMaster.this.grantJobMasterLeadership(leaderSessionID);
-               }
-
-               @Override
-               public void revokeLeadership() {
-                       JobMaster.this.revokeJobMasterLeadership();
-               }
-
-               @Override
-               public String getAddress() {
-                       return JobMaster.this.getAddress();
-               }
-
-               @Override
-               public void handleError(Exception exception) {
-                       onJobMasterElectionError(exception);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java
deleted file mode 100644
index 17a4c3a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.jobmaster;
-
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import scala.concurrent.Future;
-
-/**
- * {@link JobMaster} rpc gateway interface
- */
-public interface JobMasterGateway extends RpcGateway {
-
-       /**
-        * Updates the task execution state for a given task.
-        *
-        * @param taskExecutionState New task execution state for a given task
-        * @return Future acknowledge of the task execution state update
-        */
-       Future<Acknowledge> updateTaskExecutionState(TaskExecutionState 
taskExecutionState);
-
-       /**
-        * Triggers the registration of the job master at the resource manager.
-        *
-        * @param address Address of the resource manager
-        */
-       void registerAtResourceManager(final String address);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java
deleted file mode 100644
index 2de560a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.registration;
-
-import java.io.Serializable;
-
-/**
- * Base class for responses given to registration attempts from {@link 
RetryingRegistration}.
- */
-public abstract class RegistrationResponse implements Serializable {
-
-       private static final long serialVersionUID = 1L;
-
-       // 
----------------------------------------------------------------------------
-       
-       /**
-        * Base class for a successful registration. Concrete registration 
implementations
-        * will typically extend this class to attach more information.
-        */
-       public static class Success extends RegistrationResponse {
-               private static final long serialVersionUID = 1L;
-               
-               @Override
-               public String toString() {
-                       return "Registration Successful";
-               }
-       }
-
-       // 
----------------------------------------------------------------------------
-
-       /**
-        * A rejected (declined) registration.
-        */
-       public static final class Decline extends RegistrationResponse {
-               private static final long serialVersionUID = 1L;
-
-               /** the rejection reason */
-               private final String reason;
-
-               /**
-                * Creates a new rejection message.
-                * 
-                * @param reason The reason for the rejection.
-                */
-               public Decline(String reason) {
-                       this.reason = reason != null ? reason : "(unknown)";
-               }
-
-               /**
-                * Gets the reason for the rejection.
-                */
-               public String getReason() {
-                       return reason;
-               }
-
-               @Override
-               public String toString() {
-                       return "Registration Declined (" + reason + ')';
-               }
-       }
-}
-
-
-
-
-
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
deleted file mode 100644
index dcb5011..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.registration;
-
-import akka.dispatch.OnFailure;
-import akka.dispatch.OnSuccess;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcService;
-
-import org.slf4j.Logger;
-
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.impl.Promise.DefaultPromise;
-
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-
-/**
- * This utility class implements the basis of registering one component at 
another component,
- * for example registering the TaskExecutor at the ResourceManager.
- * This {@code RetryingRegistration} implements both the initial address 
resolution
- * and the retries-with-backoff strategy.
- * 
- * <p>The registration gives access to a future that is completed upon 
successful registration.
- * The registration can be canceled, for example when the target where it 
tries to register
- * at looses leader status.
- * 
- * @param <Gateway> The type of the gateway to connect to.
- * @param <Success> The type of the successful registration responses.
- */
-public abstract class RetryingRegistration<Gateway extends RpcGateway, Success 
extends RegistrationResponse.Success> {
-
-       // 
------------------------------------------------------------------------
-       //  default configuration values
-       // 
------------------------------------------------------------------------
-
-       /** default value for the initial registration timeout (milliseconds) */
-       private static final long INITIAL_REGISTRATION_TIMEOUT_MILLIS = 100;
-
-       /** default value for the maximum registration timeout, after 
exponential back-off (milliseconds) */
-       private static final long MAX_REGISTRATION_TIMEOUT_MILLIS = 30000;
-
-       /** The pause (milliseconds) made after an registration attempt caused 
an exception (other than timeout) */
-       private static final long ERROR_REGISTRATION_DELAY_MILLIS = 10000;
-
-       /** The pause (milliseconds) made after the registration attempt was 
refused */
-       private static final long REFUSED_REGISTRATION_DELAY_MILLIS = 30000;
-
-       // 
------------------------------------------------------------------------
-       // Fields
-       // 
------------------------------------------------------------------------
-
-       private final Logger log;
-
-       private final RpcService rpcService;
-
-       private final String targetName;
-
-       private final Class<Gateway> targetType;
-
-       private final String targetAddress;
-
-       private final UUID leaderId;
-
-       private final Promise<Tuple2<Gateway, Success>> completionPromise;
-
-       private final long initialRegistrationTimeout;
-
-       private final long maxRegistrationTimeout;
-
-       private final long delayOnError;
-
-       private final long delayOnRefusedRegistration;
-
-       private volatile boolean canceled;
-
-       // 
------------------------------------------------------------------------
-
-       public RetryingRegistration(
-                       Logger log,
-                       RpcService rpcService,
-                       String targetName,
-                       Class<Gateway> targetType,
-                       String targetAddress,
-                       UUID leaderId) {
-               this(log, rpcService, targetName, targetType, targetAddress, 
leaderId,
-                               INITIAL_REGISTRATION_TIMEOUT_MILLIS, 
MAX_REGISTRATION_TIMEOUT_MILLIS,
-                               ERROR_REGISTRATION_DELAY_MILLIS, 
REFUSED_REGISTRATION_DELAY_MILLIS);
-       }
-
-       public RetryingRegistration(
-                       Logger log,
-                       RpcService rpcService,
-                       String targetName, 
-                       Class<Gateway> targetType,
-                       String targetAddress,
-                       UUID leaderId,
-                       long initialRegistrationTimeout,
-                       long maxRegistrationTimeout,
-                       long delayOnError,
-                       long delayOnRefusedRegistration) {
-
-               checkArgument(initialRegistrationTimeout > 0, "initial 
registration timeout must be greater than zero");
-               checkArgument(maxRegistrationTimeout > 0, "maximum registration 
timeout must be greater than zero");
-               checkArgument(delayOnError >= 0, "delay on error must be 
non-negative");
-               checkArgument(delayOnRefusedRegistration >= 0, "delay on 
refused registration must be non-negative");
-
-               this.log = checkNotNull(log);
-               this.rpcService = checkNotNull(rpcService);
-               this.targetName = checkNotNull(targetName);
-               this.targetType = checkNotNull(targetType);
-               this.targetAddress = checkNotNull(targetAddress);
-               this.leaderId = checkNotNull(leaderId);
-               this.initialRegistrationTimeout = initialRegistrationTimeout;
-               this.maxRegistrationTimeout = maxRegistrationTimeout;
-               this.delayOnError = delayOnError;
-               this.delayOnRefusedRegistration = delayOnRefusedRegistration;
-
-               this.completionPromise = new DefaultPromise<>();
-       }
-
-       // 
------------------------------------------------------------------------
-       //  completion and cancellation
-       // 
------------------------------------------------------------------------
-
-       public Future<Tuple2<Gateway, Success>> getFuture() {
-               return completionPromise.future();
-       }
-
-       /**
-        * Cancels the registration procedure.
-        */
-       public void cancel() {
-               canceled = true;
-       }
-
-       /**
-        * Checks if the registration was canceled.
-        * @return True if the registration was canceled, false otherwise.
-        */
-       public boolean isCanceled() {
-               return canceled;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  registration
-       // 
------------------------------------------------------------------------
-
-       protected abstract Future<RegistrationResponse> invokeRegistration(
-                       Gateway gateway, UUID leaderId, long timeoutMillis) 
throws Exception;
-
-       /**
-        * This method resolves the target address to a callable gateway and 
starts the
-        * registration after that.
-        */
-       @SuppressWarnings("unchecked")
-       public void startRegistration() {
-               try {
-                       // trigger resolution of the resource manager address 
to a callable gateway
-                       Future<Gateway> resourceManagerFuture = 
rpcService.connect(targetAddress, targetType);
-       
-                       // upon success, start the registration attempts
-                       resourceManagerFuture.onSuccess(new 
OnSuccess<Gateway>() {
-                               @Override
-                               public void onSuccess(Gateway result) {
-                                       log.info("Resolved {} address, 
beginning registration", targetName);
-                                       register(result, 1, 
initialRegistrationTimeout);
-                               }
-                       }, rpcService.getExecutionContext());
-       
-                       // upon failure, retry, unless this is cancelled
-                       resourceManagerFuture.onFailure(new OnFailure() {
-                               @Override
-                               public void onFailure(Throwable failure) {
-                                       if (!isCanceled()) {
-                                               log.warn("Could not resolve {} 
address {}, retrying...", targetName, targetAddress);
-                                               startRegistration();
-                                       }
-                               }
-                       }, rpcService.getExecutionContext());
-               }
-               catch (Throwable t) {
-                       cancel();
-                       completionPromise.tryFailure(t);
-               }
-       }
-
-       /**
-        * This method performs a registration attempt and triggers either a 
success notification or a retry,
-        * depending on the result.
-        */
-       @SuppressWarnings("unchecked")
-       private void register(final Gateway gateway, final int attempt, final 
long timeoutMillis) {
-               // eager check for canceling to avoid some unnecessary work
-               if (canceled) {
-                       return;
-               }
-
-               try {
-                       log.info("Registration at {} attempt {} 
(timeout={}ms)", targetName, attempt, timeoutMillis);
-                       Future<RegistrationResponse> registrationFuture = 
invokeRegistration(gateway, leaderId, timeoutMillis);
-       
-                       // if the registration was successful, let the 
TaskExecutor know
-                       registrationFuture.onSuccess(new 
OnSuccess<RegistrationResponse>() {
-                               
-                               @Override
-                               public void onSuccess(RegistrationResponse 
result) throws Throwable {
-                                       if (!isCanceled()) {
-                                               if (result instanceof 
RegistrationResponse.Success) {
-                                                       // registration 
successful!
-                                                       Success success = 
(Success) result;
-                                                       
completionPromise.success(new Tuple2<>(gateway, success));
-                                               }
-                                               else {
-                                                       // registration refused 
or unknown
-                                                       if (result instanceof 
RegistrationResponse.Decline) {
-                                                               
RegistrationResponse.Decline decline = (RegistrationResponse.Decline) result;
-                                                               
log.info("Registration at {} was declined: {}", targetName, 
decline.getReason());
-                                                       } else {
-                                                               
log.error("Received unknown response to registration attempt: " + result);
-                                                       }
-
-                                                       log.info("Pausing and 
re-attempting registration in {} ms", delayOnRefusedRegistration);
-                                                       registerLater(gateway, 
1, initialRegistrationTimeout, delayOnRefusedRegistration);
-                                               }
-                                       }
-                               }
-                       }, rpcService.getExecutionContext());
-       
-                       // upon failure, retry
-                       registrationFuture.onFailure(new OnFailure() {
-                               @Override
-                               public void onFailure(Throwable failure) {
-                                       if (!isCanceled()) {
-                                               if (failure instanceof 
TimeoutException) {
-                                                       // we simply have not 
received a response in time. maybe the timeout was
-                                                       // very low (initial 
fast registration attempts), maybe the target endpoint is
-                                                       // currently down.
-                                                       if 
(log.isDebugEnabled()) {
-                                                               
log.debug("Registration at {} ({}) attempt {} timed out after {} ms",
-                                                                               
targetName, targetAddress, attempt, timeoutMillis);
-                                                       }
-       
-                                                       long newTimeoutMillis = 
Math.min(2 * timeoutMillis, maxRegistrationTimeout);
-                                                       register(gateway, 
attempt + 1, newTimeoutMillis);
-                                               }
-                                               else {
-                                                       // a serious failure 
occurred. we still should not give up, but keep trying
-                                                       log.error("Registration 
at " + targetName + " failed due to an error", failure);
-                                                       log.info("Pausing and 
re-attempting registration in {} ms", delayOnError);
-       
-                                                       registerLater(gateway, 
1, initialRegistrationTimeout, delayOnError);
-                                               }
-                                       }
-                               }
-                       }, rpcService.getExecutionContext());
-               }
-               catch (Throwable t) {
-                       cancel();
-                       completionPromise.tryFailure(t);
-               }
-       }
-
-       private void registerLater(final Gateway gateway, final int attempt, 
final long timeoutMillis, long delay) {
-               rpcService.scheduleRunnable(new Runnable() {
-                       @Override
-                       public void run() {
-                               register(gateway, attempt, timeoutMillis);
-                       }
-               }, delay, TimeUnit.MILLISECONDS);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java
deleted file mode 100644
index 7a2deae..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.resourcemanager;
-
-import java.io.Serializable;
-
-public class JobMasterRegistration implements Serializable {
-       private static final long serialVersionUID = 8411214999193765202L;
-
-       private final String address;
-
-       public JobMasterRegistration(String address) {
-               this.address = address;
-       }
-
-       public String getAddress() {
-               return address;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java
deleted file mode 100644
index 8ac9e49..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.resourcemanager;
-
-import org.apache.flink.runtime.instance.InstanceID;
-
-import java.io.Serializable;
-
-public class RegistrationResponse implements Serializable {
-       private static final long serialVersionUID = -2379003255993119993L;
-
-       private final boolean isSuccess;
-       private final InstanceID instanceID;
-
-       public RegistrationResponse(boolean isSuccess, InstanceID instanceID) {
-               this.isSuccess = isSuccess;
-               this.instanceID = instanceID;
-       }
-
-       public boolean isSuccess() {
-               return isSuccess;
-       }
-
-       public InstanceID getInstanceID() {
-               return instanceID;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
deleted file mode 100644
index f7147c9..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.resourcemanager;
-
-import akka.dispatch.Mapper;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.leaderelection.LeaderContender;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.rpc.RpcMethod;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
-import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
-import 
org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorRegistrationSuccess;
-
-import scala.concurrent.Future;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * ResourceManager implementation. The resource manager is responsible for 
resource de-/allocation
- * and bookkeeping.
- *
- * It offers the following methods as part of its rpc interface to interact 
with the him remotely:
- * <ul>
- *     <li>{@link #registerJobMaster(JobMasterRegistration)} registers a 
{@link JobMaster} at the resource manager</li>
- *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource 
manager</li>
- * </ul>
- */
-public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
-       private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
-       private final HighAvailabilityServices highAvailabilityServices;
-       private LeaderElectionService leaderElectionService = null;
-       private UUID leaderSessionID = null;
-
-       public ResourceManager(RpcService rpcService, HighAvailabilityServices 
highAvailabilityServices) {
-               super(rpcService);
-               this.highAvailabilityServices = 
checkNotNull(highAvailabilityServices);
-               this.jobMasterGateways = new HashMap<>();
-       }
-
-       @Override
-       public void start() {
-               // start a leader
-               try {
-                       super.start();
-                       leaderElectionService = 
highAvailabilityServices.getResourceManagerLeaderElectionService();
-                       leaderElectionService.start(new 
ResourceManagerLeaderContender());
-               } catch (Throwable e) {
-                       log.error("A fatal error happened when starting the 
ResourceManager", e);
-                       throw new RuntimeException("A fatal error happened when 
starting the ResourceManager", e);
-               }
-       }
-
-       @Override
-       public void shutDown() {
-               try {
-                       leaderElectionService.stop();
-                       super.shutDown();
-               } catch(Throwable e) {
-                       log.error("A fatal error happened when shutdown the 
ResourceManager", e);
-                       throw new RuntimeException("A fatal error happened when 
shutdown the ResourceManager", e);
-               }
-       }
-
-       /**
-        * Gets the leader session id of current resourceManager.
-        *
-        * @return return the leaderSessionId of current resourceManager, this 
returns null until the current resourceManager is granted leadership.
-        */
-       @VisibleForTesting
-       UUID getLeaderSessionID() {
-               return leaderSessionID;
-       }
-
-       /**
-        * Register a {@link JobMaster} at the resource manager.
-        *
-        * @param jobMasterRegistration Job master registration information
-        * @return Future registration response
-        */
-       @RpcMethod
-       public Future<RegistrationResponse> 
registerJobMaster(JobMasterRegistration jobMasterRegistration) {
-               Future<JobMasterGateway> jobMasterFuture = 
getRpcService().connect(jobMasterRegistration.getAddress(), 
JobMasterGateway.class);
-
-               return jobMasterFuture.map(new Mapper<JobMasterGateway, 
RegistrationResponse>() {
-                       @Override
-                       public RegistrationResponse apply(final 
JobMasterGateway jobMasterGateway) {
-                               InstanceID instanceID;
-
-                               if 
(jobMasterGateways.containsKey(jobMasterGateway)) {
-                                       instanceID = 
jobMasterGateways.get(jobMasterGateway);
-                               } else {
-                                       instanceID = new InstanceID();
-                                       jobMasterGateways.put(jobMasterGateway, 
instanceID);
-                               }
-
-                               return new RegistrationResponse(true, 
instanceID);
-                       }
-               }, getMainThreadExecutionContext());
-       }
-
-       /**
-        * Requests a slot from the resource manager.
-        *
-        * @param slotRequest Slot request
-        * @return Slot assignment
-        */
-       @RpcMethod
-       public SlotAssignment requestSlot(SlotRequest slotRequest) {
-               System.out.println("SlotRequest: " + slotRequest);
-               return new SlotAssignment();
-       }
-
-
-       /**
-        *
-        * @param resourceManagerLeaderId  The fencing token for the 
ResourceManager leader 
-        * @param taskExecutorAddress      The address of the TaskExecutor that 
registers
-        * @param resourceID               The resource ID of the TaskExecutor 
that registers
-        *
-        * @return The response by the ResourceManager.
-        */
-       @RpcMethod
-       public org.apache.flink.runtime.rpc.registration.RegistrationResponse 
registerTaskExecutor(
-                       UUID resourceManagerLeaderId,
-                       String taskExecutorAddress,
-                       ResourceID resourceID) {
-
-               return new TaskExecutorRegistrationSuccess(new InstanceID(), 
5000);
-       }
-
-       private class ResourceManagerLeaderContender implements LeaderContender 
{
-
-               /**
-                * Callback method when current resourceManager is granted 
leadership
-                *
-                * @param leaderSessionID unique leadershipID
-                */
-               @Override
-               public void grantLeadership(final UUID leaderSessionID) {
-                       runAsync(new Runnable() {
-                               @Override
-                               public void run() {
-                                       log.info("ResourceManager {} was 
granted leadership with leader session ID {}", getAddress(), leaderSessionID);
-                                       ResourceManager.this.leaderSessionID = 
leaderSessionID;
-                                       // confirming the leader session ID 
might be blocking,
-                                       
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
-                               }
-                       });
-               }
-
-               /**
-                * Callback method when current resourceManager lose leadership.
-                */
-               @Override
-               public void revokeLeadership() {
-                       runAsync(new Runnable() {
-                               @Override
-                               public void run() {
-                                       log.info("ResourceManager {} was 
revoked leadership.", getAddress());
-                                       jobMasterGateways.clear();
-                                       leaderSessionID = null;
-                               }
-                       });
-               }
-
-               @Override
-               public String getAddress() {
-                       return ResourceManager.this.getAddress();
-               }
-
-               /**
-                * Handles error occurring in the leader election service
-                *
-                * @param exception Exception being thrown in the leader 
election service
-                */
-               @Override
-               public void handleError(final Exception exception) {
-                       runAsync(new Runnable() {
-                               @Override
-                               public void run() {
-                                       log.error("ResourceManager received an 
error from the LeaderElectionService.", exception);
-                                       // terminate ResourceManager in case of 
an error
-                                       shutDown();
-                               }
-                       });
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
deleted file mode 100644
index afddb01..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.resourcemanager;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcTimeout;
-import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
-
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.UUID;
-
-/**
- * The {@link ResourceManager}'s RPC gateway interface.
- */
-public interface ResourceManagerGateway extends RpcGateway {
-
-       /**
-        * Register a {@link JobMaster} at the resource manager.
-        *
-        * @param jobMasterRegistration Job master registration information
-        * @param timeout Timeout for the future to complete
-        * @return Future registration response
-        */
-       Future<RegistrationResponse> registerJobMaster(
-               JobMasterRegistration jobMasterRegistration,
-               @RpcTimeout FiniteDuration timeout);
-
-       /**
-        * Register a {@link JobMaster} at the resource manager.
-        *
-        * @param jobMasterRegistration Job master registration information
-        * @return Future registration response
-        */
-       Future<RegistrationResponse> registerJobMaster(JobMasterRegistration 
jobMasterRegistration);
-
-       /**
-        * Requests a slot from the resource manager.
-        *
-        * @param slotRequest Slot request
-        * @return Future slot assignment
-        */
-       Future<SlotAssignment> requestSlot(SlotRequest slotRequest);
-
-       /**
-        * 
-        * @param resourceManagerLeaderId  The fencing token for the 
ResourceManager leader 
-        * @param taskExecutorAddress      The address of the TaskExecutor that 
registers
-        * @param resourceID               The resource ID of the TaskExecutor 
that registers
-        * @param timeout                  The timeout for the response.
-        * 
-        * @return The future to the response by the ResourceManager.
-        */
-       Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> 
registerTaskExecutor(
-                       UUID resourceManagerLeaderId,
-                       String taskExecutorAddress,
-                       ResourceID resourceID,
-                       @RpcTimeout FiniteDuration timeout);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java
deleted file mode 100644
index 86cd8b7..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.resourcemanager;
-
-import java.io.Serializable;
-
-public class SlotAssignment implements Serializable{
-       private static final long serialVersionUID = -6990813455942742322L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
deleted file mode 100644
index 74c7c39..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.resourcemanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-
-import java.io.Serializable;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * This describes the requirement of the slot, mainly used by JobManager 
requesting slot from ResourceManager.
- */
-public class SlotRequest implements Serializable {
-
-       private static final long serialVersionUID = -6586877187990445986L;
-
-       /** The JobID of the slot requested for */
-       private final JobID jobId;
-
-       /** The unique identification of this request */
-       private final AllocationID allocationId;
-
-       /** The resource profile of the required slot */
-       private final ResourceProfile resourceProfile;
-
-       public SlotRequest(JobID jobId, AllocationID allocationId, 
ResourceProfile resourceProfile) {
-               this.jobId = checkNotNull(jobId);
-               this.allocationId = checkNotNull(allocationId);
-               this.resourceProfile = checkNotNull(resourceProfile);
-       }
-
-       /**
-        * Get the JobID of the slot requested for.
-        * @return The job id
-        */
-       public JobID getJobId() {
-               return jobId;
-       }
-
-       /**
-        * Get the unique identification of this request
-        * @return the allocation id
-        */
-       public AllocationID getAllocationId() {
-               return allocationId;
-       }
-
-       /**
-        * Get the resource profile of the desired slot
-        * @return The resource profile
-        */
-       public ResourceProfile getResourceProfile() {
-               return resourceProfile;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
deleted file mode 100644
index c372ecb..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.taskexecutor;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-
-import java.io.Serializable;
-import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A report about the current status of all slots of the TaskExecutor, 
describing
- * which slots are available and allocated, and what jobs (JobManagers) the 
allocated slots
- * have been allocated to.
- */
-public class SlotReport implements Serializable {
-
-       private static final long serialVersionUID = -3150175198722481689L;
-
-       /** The slots status of the TaskManager */
-       private final List<SlotStatus> slotsStatus;
-
-       /** The resource id which identifies the TaskManager */
-       private final ResourceID resourceID;
-
-       public SlotReport(final List<SlotStatus> slotsStatus, final ResourceID 
resourceID) {
-               this.slotsStatus = checkNotNull(slotsStatus);
-               this.resourceID = checkNotNull(resourceID);
-       }
-
-       public List<SlotStatus> getSlotsStatus() {
-               return slotsStatus;
-       }
-
-       public ResourceID getResourceID() {
-               return resourceID;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java
deleted file mode 100644
index e8e2084..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.taskexecutor;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-
-import java.io.Serializable;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * This describes the slot current status which located in TaskManager.
- */
-public class SlotStatus implements Serializable {
-
-       private static final long serialVersionUID = 5099191707339664493L;
-
-       /** slotID to identify a slot */
-       private final SlotID slotID;
-
-       /** the resource profile of the slot */
-       private final ResourceProfile profiler;
-
-       /** if the slot is allocated, allocationId identify its allocation; 
else, allocationId is null */
-       private final AllocationID allocationID;
-
-       /** if the slot is allocated, jobId identify which job this slot is 
allocated to; else, jobId is null */
-       private final JobID jobID;
-
-       public SlotStatus(SlotID slotID, ResourceProfile profiler) {
-               this(slotID, profiler, null, null);
-       }
-
-       public SlotStatus(SlotID slotID, ResourceProfile profiler, AllocationID 
allocationID, JobID jobID) {
-               this.slotID = checkNotNull(slotID, "slotID cannot be null");
-               this.profiler = checkNotNull(profiler, "profile cannot be 
null");
-               this.allocationID = allocationID;
-               this.jobID = jobID;
-       }
-
-       /**
-        * Get the unique identification of this slot
-        *
-        * @return The slot id
-        */
-       public SlotID getSlotID() {
-               return slotID;
-       }
-
-       /**
-        * Get the resource profile of this slot
-        *
-        * @return The resource profile
-        */
-       public ResourceProfile getProfiler() {
-               return profiler;
-       }
-
-       /**
-        * Get the allocation id of this slot
-        *
-        * @return The allocation id if this slot is allocated, otherwise null
-        */
-       public AllocationID getAllocationID() {
-               return allocationID;
-       }
-
-       /**
-        * Get the job id of the slot allocated for
-        *
-        * @return The job id if this slot is allocated, otherwise null
-        */
-       public JobID getJobID() {
-               return jobID;
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-
-               SlotStatus that = (SlotStatus) o;
-
-               if (!slotID.equals(that.slotID)) {
-                       return false;
-               }
-               if (!profiler.equals(that.profiler)) {
-                       return false;
-               }
-               if (allocationID != null ? 
!allocationID.equals(that.allocationID) : that.allocationID != null) {
-                       return false;
-               }
-               return jobID != null ? jobID.equals(that.jobID) : that.jobID == 
null;
-
-       }
-
-       @Override
-       public int hashCode() {
-               int result = slotID.hashCode();
-               result = 31 * result + profiler.hashCode();
-               result = 31 * result + (allocationID != null ? 
allocationID.hashCode() : 0);
-               result = 31 * result + (jobID != null ? jobID.hashCode() : 0);
-               return result;
-       }
-
-}

Reply via email to