[FLINK-4355] [cluster management] Implement TaskManager side of registration at ResourceManager.

This closes #2353


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe90811a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe90811a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe90811a

Branch: refs/heads/flip-6
Commit: fe90811ad115abb0c95f47461ae6630cd994246f
Parents: 613f5a7
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 10 20:42:45 2016 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed Sep 21 11:39:14 2016 +0200

----------------------------------------------------------------------
 .../HighAvailabilityServices.java               |  39 +++
 .../runtime/highavailability/NonHaServices.java |  59 ++++
 .../StandaloneLeaderRetrievalService.java       |  72 +++--
 .../apache/flink/runtime/rpc/RpcEndpoint.java   |   1 -
 .../apache/flink/runtime/rpc/RpcService.java    |  27 ++
 .../flink/runtime/rpc/akka/AkkaRpcService.java  |  18 ++
 .../runtime/rpc/akka/messages/RunAsync.java     |   1 +
 .../rpc/registration/RegistrationResponse.java  |  84 ++++++
 .../rpc/registration/RetryingRegistration.java  | 292 +++++++++++++++++++
 .../rpc/resourcemanager/ResourceManager.java    |  23 ++
 .../resourcemanager/ResourceManagerGateway.java |  21 +-
 .../runtime/rpc/taskexecutor/SlotReport.java    |  38 +++
 .../runtime/rpc/taskexecutor/TaskExecutor.java  | 169 ++++++++---
 .../rpc/taskexecutor/TaskExecutorGateway.java   |  29 +-
 .../TaskExecutorRegistrationSuccess.java        |  75 +++++
 ...TaskExecutorToResourceManagerConnection.java | 194 ++++++++++++
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    |  51 +++-
 .../rpc/taskexecutor/TaskExecutorTest.java      |  87 +-----
 18 files changed, 1105 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fe90811a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
new file mode 100644
index 0000000..094d36f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+/**
+ * This interface gives access to all services needed for:
+ *
+ * <ul>
+ *     <li>ResourceManager leader election and leader retrieval</li>
+ *     <li>JobManager leader election and leader retrieval</li>
+ *     <li>Persistence for checkpoint metadata</li>
+ *     <li>Registering the latest completed checkpoint(s)</li>
+ * </ul>
+ *
+ * <p>At present, only the ResourceManager leader retrieval is exposed.
+ */
+public interface HighAvailabilityServices {
+
+       /**
+        * Gets the leader retriever for the cluster's resource manager.
+        *
+        * @return The leader retrieval service for the ResourceManager
+        * @throws Exception Thrown if the leader retrieval service could not be created
+        */
+       LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe90811a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
new file mode 100644
index 0000000..b8c2ed8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link HighAvailabilityServices} implementation for setups without high availability.
+ *
+ * <p>Use it in tests, or in clusters that do not need to tolerate failures of the
+ * master processes (JobManager, ResourceManager).
+ *
+ * <p>No external services are involved: the leaders are fixed and pre-configured, and
+ * checkpoints and metadata are meant to be kept on the heap, i.e. in volatile memory.
+ */
+public class NonHaServices implements HighAvailabilityServices {
+
+       /** Fixed, pre-configured address of the ResourceManager (never null) */
+       private final String resourceManagerAddress;
+
+       /**
+        * Creates the services for a fixed, pre-configured ResourceManager.
+        *
+        * @param resourceManagerAddress The fixed address of the ResourceManager; must not be null
+        */
+       public NonHaServices(String resourceManagerAddress) {
+               final String validatedAddress = checkNotNull(resourceManagerAddress);
+               this.resourceManagerAddress = validatedAddress;
+       }
+
+       // ------------------------------------------------------------------------
+       //  Services
+       // ------------------------------------------------------------------------
+
+       /**
+        * Returns a new standalone retriever that reports the configured ResourceManager
+        * address together with the all-zero leader ID.
+        */
+       @Override
+       public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+               // a fresh UUID instance per retriever, with both halves set to zero
+               final UUID fixedLeaderId = new UUID(0L, 0L);
+               return new StandaloneLeaderRetrievalService(resourceManagerAddress, fixedLeaderId);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe90811a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
index 26a34aa..16b163c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
@@ -18,44 +18,74 @@
 
 package org.apache.flink.runtime.leaderretrieval;
 
-import org.apache.flink.util.Preconditions;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * Standalone implementation of the {@link LeaderRetrievalService}. The 
standalone implementation
- * assumes that there is only a single {@link 
org.apache.flink.runtime.jobmanager.JobManager} whose
- * address is given to the service when creating it. This address is directly 
given to the
- * {@link LeaderRetrievalListener} when the service is started.
+ * Standalone implementation of the {@link LeaderRetrievalService}. This implementation
+ * assumes that there is only a single contender for leadership
+ * (e.g., a single JobManager or ResourceManager process) and that this process is
+ * reachable under a constant address.
+ *
+ * <p>As soon as this service is started, it immediately notifies the leader listener
+ * of the leader contender with the pre-configured address and leader ID.
+ *
+ * <p>Starting and stopping are synchronized on an internal lock.
  */
 public class StandaloneLeaderRetrievalService implements 
LeaderRetrievalService {
 
-       /** Address of the only JobManager */
-       private final String jobManagerAddress;
+       /** Lock guarding the {@code started} flag in start() and stop() */
+       private final Object startStopLock = new Object();
+       
+       /** The fixed address of the leader */
+       private final String leaderAddress;
+
+       /** The fixed leader ID (leader lock fencing token); null if none was configured */
+       private final UUID leaderId;
 
-       /** Listener which wants to be notified about the new leader */
-       private LeaderRetrievalListener leaderListener;
+       /** Flag whether this service is started; only accessed while holding startStopLock */
+       private boolean started;
 
        /**
-        * Creates a StandaloneLeaderRetrievalService with the given JobManager 
address.
+        * Creates a StandaloneLeaderRetrievalService with the given leader address.
+        * The leader ID passed to the listener will be {@code null}.
         *
-        * @param jobManagerAddress The JobManager's address which is returned 
to the
-        *                                                      {@link 
LeaderRetrievalListener}
+        * @param leaderAddress The leader's pre-configured address; must not be null
         */
-       public StandaloneLeaderRetrievalService(String jobManagerAddress) {
-               this.jobManagerAddress = jobManagerAddress;
+       public StandaloneLeaderRetrievalService(String leaderAddress) {
+               this.leaderAddress = checkNotNull(leaderAddress);
+               this.leaderId = null;
        }
 
+       /**
+        * Creates a StandaloneLeaderRetrievalService with the given leader address and leader ID.
+        *
+        * @param leaderAddress The leader's pre-configured address; must not be null
+        * @param leaderId      The constant leader ID; must not be null
+        */
+       public StandaloneLeaderRetrievalService(String leaderAddress, UUID 
leaderId) {
+               this.leaderAddress = checkNotNull(leaderAddress);
+               this.leaderId = checkNotNull(leaderId);
+       }
+
+       // ------------------------------------------------------------------------
+
        @Override
        public void start(LeaderRetrievalListener listener) {
-               Preconditions.checkNotNull(listener, "Listener must not be 
null.");
-               Preconditions.checkState(leaderListener == null, 
"StandaloneLeaderRetrievalService can " +
-                               "only be started once.");
+               checkNotNull(listener, "Listener must not be null.");
 
-               leaderListener = listener;
+               synchronized (startStopLock) {
+                       checkState(!started, "StandaloneLeaderRetrievalService 
can only be started once.");
+                       started = true;
 
-               // directly notify the listener, because we already know the 
leading JobManager's address
-               leaderListener.notifyLeaderAddress(jobManagerAddress, null);
+                       // directly notify the listener, because the leader's address is fixed and known up front.
+                       // NOTE(review): the listener runs while startStopLock is held, so a slow listener
+                       // delays concurrent stop() calls - confirm this is intended.
+                       listener.notifyLeaderAddress(leaderAddress, leaderId);
+               }
        }
 
        @Override
-       public void stop() {}
+       public void stop() {
+               // NOTE(review): resetting the flag allows start() to be called again after stop(),
+               // although start() reports that the service "can only be started once" - confirm intent.
+               synchronized (startStopLock) {
+                       started = false;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe90811a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 67ac182..a28bc14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -237,7 +237,6 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
         * }</pre>
         */
        public void validateRunsInMainThread() {
-               // because the initialization is lazy, it can be that certain 
methods are
+               // only evaluated when the JVM runs with assertions enabled (e.g. -ea)
                assert currentMainThread.get() == Thread.currentThread();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe90811a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index f93be83..fabdb05 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -18,8 +18,11 @@
 
 package org.apache.flink.runtime.rpc;
 
+import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * Interface for rpc services. An rpc service is used to start and connect to a {@link RpcEndpoint}.
  * Connecting to a rpc server will return a {@link RpcGateway} which can be used to call remote
@@ -71,4 +74,28 @@ public interface RpcService {
         * @return Fully qualified address
         */
        <C extends RpcGateway> String getAddress(C selfGateway);
+
+       /**
+        * Gets the execution context, provided by this RPC service. This execution
+        * context can be used for example for the {@code onComplete(...)} or {@code onSuccess(...)}
+        * methods of Futures.
+        *
+        * <p><b>IMPORTANT:</b> This execution context does not isolate the method invocations against
+        * any concurrent invocations and is therefore not suitable to run completion methods of futures
+        * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the
+        * {@link RpcEndpoint#getMainThreadExecutionContext() MainThreadExecutionContext} of that
+        * {@code RpcEndpoint}.
+        *
+        * @return The execution context provided by the RPC service
+        */
+       ExecutionContext getExecutionContext();
+
+       /**
+        * Executes the runnable in the execution context of this RPC Service, as returned by
+        * {@link #getExecutionContext()}, after a scheduled delay.
+        *
+        * @param runnable Runnable to be executed
+        * @param delay    The delay after which the runnable will be executed
+        * @param unit     The time unit of {@code delay}
+        */
+       void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe90811a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 7b33524..b647bbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -38,14 +38,18 @@ import org.apache.flink.runtime.rpc.StartStoppable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 import javax.annotation.concurrent.ThreadSafe;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Proxy;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -199,4 +203,18 @@ public class AkkaRpcService implements RpcService {
                        throw new IllegalArgumentException("Cannot get address 
for non " + className + '.');
                }
        }
+
+       /**
+        * Returns the dispatcher of the underlying actor system.
+        */
+       @Override
+       public ExecutionContext getExecutionContext() {
+               final ExecutionContext dispatcher = actorSystem.dispatcher();
+               return dispatcher;
+       }
+
+       /**
+        * Schedules a one-off execution of the given runnable on the actor system's scheduler;
+        * the runnable is run in {@link #getExecutionContext()} once the delay has expired.
+        */
+       @Override
+       public void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) {
+               // validate all arguments before touching the scheduler
+               checkNotNull(runnable, "runnable");
+               checkNotNull(unit, "unit");
+               checkArgument(delay >= 0, "delay must be zero or larger");
+
+               final FiniteDuration scheduledDelay = new FiniteDuration(delay, unit);
+               final ExecutionContext callbackContext = getExecutionContext();
+               actorSystem.scheduler().scheduleOnce(scheduledDelay, runnable, callbackContext);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe90811a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
index c18906c..ce4f9d6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
@@ -36,6 +36,7 @@ public final class RunAsync implements Serializable {
        private final long delay;
 
        /**
+        * Creates a new {@code RunAsync} message.
         * 
         * @param runnable  The Runnable to run.
         * @param delay     The delay in milliseconds. Zero indicates immediate execution.

http://git-wip-us.apache.org/repos/asf/flink/blob/fe90811a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java
new file mode 100644
index 0000000..2de560a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.registration;
+
+import java.io.Serializable;
+
+/**
+ * Base class for responses given to registration attempts from {@link RetryingRegistration}.
+ */
+public abstract class RegistrationResponse implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       // ----------------------------------------------------------------------------
+
+       /**
+        * Base class for a successful registration. Concrete registration implementations
+        * will typically extend this class to attach more information.
+        */
+       public static class Success extends RegistrationResponse {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public String toString() {
+                       return "Registration Successful";
+               }
+       }
+
+       // ----------------------------------------------------------------------------
+
+       /**
+        * A rejected (declined) registration, carrying a human-readable reason.
+        */
+       public static final class Decline extends RegistrationResponse {
+               private static final long serialVersionUID = 1L;
+
+               /** The rejection reason; never null */
+               private final String reason;
+
+               /**
+                * Creates a new rejection message.
+                *
+                * @param reason The reason for the rejection. If null, the reason is
+                *               recorded as {@code "(unknown)"}.
+                */
+               public Decline(String reason) {
+                       this.reason = reason != null ? reason : "(unknown)";
+               }
+
+               /**
+                * Gets the reason for the rejection.
+                *
+                * @return The rejection reason, never null
+                */
+               public String getReason() {
+                       return reason;
+               }
+
+               @Override
+               public String toString() {
+                       return "Registration Declined (" + reason + ')';
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe90811a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
new file mode 100644
index 0000000..4c93684
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.registration;
+
+import akka.dispatch.OnFailure;
+import akka.dispatch.OnSuccess;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+import scala.concurrent.impl.Promise.DefaultPromise;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+
+/**
+ * This utility class implements the basis of registering one component at another component,
+ * for example registering the TaskExecutor at the ResourceManager.
+ * This {@code RetryingRegistration} implements both the initial address resolution
+ * and the retries-with-backoff strategy.
+ *
+ * <p>The registration gives access to a future that is completed upon successful registration.
+ * The registration can be canceled, for example when the target it tries to register
+ * at loses leader status.
+ *
+ * @param <Gateway> The type of the gateway to connect to.
+ * @param <Success> The type of the successful registration responses.
+ */
+public abstract class RetryingRegistration<Gateway extends RpcGateway, Success 
extends RegistrationResponse.Success> {
+
+       // 
------------------------------------------------------------------------
+       //  default configuration values
+       // 
------------------------------------------------------------------------
+
+       private static final long INITIAL_REGISTRATION_TIMEOUT_MILLIS = 100;
+
+       private static final long MAX_REGISTRATION_TIMEOUT_MILLIS = 30000;
+
+       private static final long ERROR_REGISTRATION_DELAY_MILLIS = 10000;
+
+       private static final long REFUSED_REGISTRATION_DELAY_MILLIS = 30000;
+
+       // 
------------------------------------------------------------------------
+       // Fields
+       // 
------------------------------------------------------------------------
+
+       private final Logger log;
+
+       private final RpcService rpcService;
+
+       private final String targetName;
+
+       private final Class<Gateway> targetType;
+
+       private final String targetAddress;
+
+       private final UUID leaderId;
+
+       private final Promise<Tuple2<Gateway, Success>> completionPromise;
+
+       private final long initialRegistrationTimeout;
+
+       private final long maxRegistrationTimeout;
+
+       private final long delayOnError;
+
+       private final long delayOnRefusedRegistration;
+
+       private volatile boolean canceled;
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a new registration that uses the default timeouts and delays
+        * ({@code INITIAL_REGISTRATION_TIMEOUT_MILLIS}, {@code MAX_REGISTRATION_TIMEOUT_MILLIS},
+        * {@code ERROR_REGISTRATION_DELAY_MILLIS} and {@code REFUSED_REGISTRATION_DELAY_MILLIS}).
+        *
+        * @param log           The logger that registration progress is reported to
+        * @param rpcService    The RPC service used to resolve the target address and to run callbacks
+        * @param targetName    A name for the target, used in log messages
+        * @param targetType    The gateway type that the target address is resolved to
+        * @param targetAddress The address of the target to register at
+        * @param leaderId      The leader ID that is passed to every registration attempt
+        */
+       public RetryingRegistration(
+                       Logger log,
+                       RpcService rpcService,
+                       String targetName,
+                       Class<Gateway> targetType,
+                       String targetAddress,
+                       UUID leaderId) {
+               // delegate to the fully parameterized constructor with the default values
+               this(log, rpcService, targetName, targetType, targetAddress, 
leaderId,
+                               INITIAL_REGISTRATION_TIMEOUT_MILLIS, 
MAX_REGISTRATION_TIMEOUT_MILLIS,
+                               ERROR_REGISTRATION_DELAY_MILLIS, 
REFUSED_REGISTRATION_DELAY_MILLIS);
+       }
+
+       /**
+        * Creates a new registration with explicitly configured timeouts and delays,
+        * all given in milliseconds (matching the {@code *_MILLIS} defaults).
+        *
+        * @param log                        The logger that registration progress is reported to
+        * @param rpcService                 The RPC service used to resolve the target address and to run callbacks
+        * @param targetName                 A name for the target, used in log messages
+        * @param targetType                 The gateway type that the target address is resolved to
+        * @param targetAddress              The address of the target to register at
+        * @param leaderId                   The leader ID that is passed to every registration attempt
+        * @param initialRegistrationTimeout The timeout of the first registration attempt; must be positive
+        * @param maxRegistrationTimeout     The maximum registration timeout; must be positive
+        *                                   (NOTE(review): presumably caps the backoff of later attempts -
+        *                                   that code is not visible here)
+        * @param delayOnError               The delay used after an error; must be non-negative
+        * @param delayOnRefusedRegistration The delay used after a declined registration; must be non-negative
+        */
+       public RetryingRegistration(
+                       Logger log,
+                       RpcService rpcService,
+                       String targetName, 
+                       Class<Gateway> targetType,
+                       String targetAddress,
+                       UUID leaderId,
+                       long initialRegistrationTimeout,
+                       long maxRegistrationTimeout,
+                       long delayOnError,
+                       long delayOnRefusedRegistration) {
+
+               // validate numeric settings first, then the object references
+               checkArgument(initialRegistrationTimeout > 0, "initial 
registration timeout must be greater than zero");
+               checkArgument(maxRegistrationTimeout > 0, "maximum registration 
timeout must be greater than zero");
+               checkArgument(delayOnError >= 0, "delay on error must be 
non-negative");
+               checkArgument(delayOnRefusedRegistration >= 0, "delay on 
refused registration must be non-negative");
+
+               this.log = checkNotNull(log);
+               this.rpcService = checkNotNull(rpcService);
+               this.targetName = checkNotNull(targetName);
+               this.targetType = checkNotNull(targetType);
+               this.targetAddress = checkNotNull(targetAddress);
+               this.leaderId = checkNotNull(leaderId);
+               this.initialRegistrationTimeout = initialRegistrationTimeout;
+               this.maxRegistrationTimeout = maxRegistrationTimeout;
+               this.delayOnError = delayOnError;
+               this.delayOnRefusedRegistration = delayOnRefusedRegistration;
+
+               this.completionPromise = new DefaultPromise<>();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  completion and cancellation
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Gets the future that is completed with the resolved gateway and the success
+        * response once a registration attempt succeeds. It is also failed if triggering
+        * the address resolution in {@link #startRegistration()} throws.
+        *
+        * <p>Note that {@link #cancel()} does not complete this future.
+        */
+       public Future<Tuple2<Gateway, Success>> getFuture() {
+               return completionPromise.future();
+       }
+
+       /**
+        * Cancels the registration procedure.
+        *
+        * <p>This only sets a flag that pending callbacks and attempts check; it does not
+        * complete the future returned by {@link #getFuture()}.
+        */
+       public void cancel() {
+               canceled = true;
+       }
+
+       /**
+        * Checks if the registration was canceled.
+        *
+        * @return True if the registration was canceled, false otherwise.
+        */
+       public boolean isCanceled() {
+               return canceled;
+       }
+
+       // ------------------------------------------------------------------------
+       //  registration
+       // ------------------------------------------------------------------------
+
+       /**
+        * Performs a single registration call against the resolved gateway.
+        * Implemented by the concrete registrations.
+        *
+        * @param gateway       The gateway of the target to register at
+        * @param leaderId      The leader ID this registration was created with
+        * @param timeoutMillis The timeout for this attempt, in milliseconds
+        * @return A future with the response; a {@link RegistrationResponse.Success} completes
+        *         the registration, other responses are logged (and presumably retried)
+        * @throws Exception If the registration call cannot be issued
+        */
+       protected abstract Future<RegistrationResponse> invokeRegistration(
+                       Gateway gateway, UUID leaderId, long timeoutMillis) 
throws Exception;
+
+       /**
+        * Resolves the target address to a callable gateway and starts the registration
+        * attempts once the gateway is available.
+        *
+        * <p>If the address cannot be resolved, resolution is retried after {@code delayOnError}
+        * milliseconds, unless the registration has been canceled in the meantime. If triggering
+        * the resolution itself throws, the registration is canceled and the future returned by
+        * {@link #getFuture()} is failed with that exception.
+        */
+       @SuppressWarnings("unchecked")
+       public void startRegistration() {
+               try {
+                       // trigger resolution of the target address to a callable gateway
+                       Future<Gateway> targetGatewayFuture = rpcService.connect(targetAddress, targetType);
+
+                       // upon success, start the registration attempts
+                       targetGatewayFuture.onSuccess(new OnSuccess<Gateway>() {
+                               @Override
+                               public void onSuccess(Gateway result) {
+                                       log.info("Resolved {} address, beginning registration", targetName);
+                                       register(result, 1, initialRegistrationTimeout);
+                               }
+                       }, rpcService.getExecutionContext());
+
+                       // upon failure, retry after a delay unless canceled; retrying immediately
+                       // would spin in a tight loop for as long as the target is unreachable
+                       targetGatewayFuture.onFailure(new OnFailure() {
+                               @Override
+                               public void onFailure(Throwable failure) {
+                                       if (!isCanceled()) {
+                                               log.warn("Could not resolve {} address {}, retrying in {} ms",
+                                                               targetName, targetAddress, delayOnError, failure);
+                                               rpcService.scheduleRunnable(new Runnable() {
+                                                       @Override
+                                                       public void run() {
+                                                               // the registration may have been canceled while waiting
+                                                               if (!isCanceled()) {
+                                                                       startRegistration();
+                                                               }
+                                                       }
+                                               }, delayOnError, TimeUnit.MILLISECONDS);
+                                       }
+                               }
+                       }, rpcService.getExecutionContext());
+               }
+               catch (Throwable t) {
+                       cancel();
+                       completionPromise.tryFailure(t);
+               }
+       }
+
+       /**
+        * This method performs a registration attempt and triggers either a 
success notification or a retry,
+        * depending on the result.
+        */
+       @SuppressWarnings("unchecked")
+       private void register(final Gateway gateway, final int attempt, final 
long timeoutMillis) {
+               // eager check for canceling to avoid some unnecessary work
+               if (canceled) {
+                       return;
+               }
+
+               try {
+                       log.info("Registration at {} attempt {} 
(timeout={}ms)", targetName, attempt, timeoutMillis);
+                       Future<RegistrationResponse> registrationFuture = 
invokeRegistration(gateway, leaderId, timeoutMillis);
+       
+                       // if the registration was successful, let the 
TaskExecutor know
+                       registrationFuture.onSuccess(new 
OnSuccess<RegistrationResponse>() {
+                               
+                               @Override
+                               public void onSuccess(RegistrationResponse 
result) throws Throwable {
+                                       if (!isCanceled()) {
+                                               if (result instanceof 
RegistrationResponse.Success) {
+                                                       // registration 
successful!
+                                                       Success success = 
(Success) result;
+                                                       
completionPromise.success(new Tuple2<>(gateway, success));
+                                               }
+                                               else {
+                                                       // registration refused 
or unknown
+                                                       if (result instanceof 
RegistrationResponse.Decline) {
+                                                               
RegistrationResponse.Decline decline = (RegistrationResponse.Decline) result;
+                                                               
log.info("Registration at {} was declined: {}", targetName, 
decline.getReason());
+                                                       } else {
+                                                               
log.error("Received unknown response to registration attempt: " + result);
+                                                       }
+
+                                                       log.info("Pausing and 
re-attempting registration in {} ms", delayOnRefusedRegistration);
+                                                       registerLater(gateway, 
1, initialRegistrationTimeout, delayOnRefusedRegistration);
+                                               }
+                                       }
+                               }
+                       }, rpcService.getExecutionContext());
+       
+                       // upon failure, retry
+                       registrationFuture.onFailure(new OnFailure() {
+                               @Override
+                               public void onFailure(Throwable failure) {
+                                       if (!isCanceled()) {
+                                               if (failure instanceof 
TimeoutException) {
+                                                       // we simply have not 
received a response in time. maybe the timeout was
+                                                       // very low (initial 
fast registration attempts), maybe the target endpoint is
+                                                       // currently down.
+                                                       if 
(log.isDebugEnabled()) {
+                                                               
log.debug("Registration at {} ({}) attempt {} timed out after {} ms",
+                                                                               
targetName, targetAddress, attempt, timeoutMillis);
+                                                       }
+       
+                                                       long newTimeoutMillis = 
Math.min(2 * timeoutMillis, maxRegistrationTimeout);
+                                                       register(gateway, 
attempt + 1, newTimeoutMillis);
+                                               }
+                                               else {
+                                                       // a serious failure 
occurred. we still should not give up, but keep trying
+                                                       log.error("Registration 
at " + targetName + " failed due to an error", failure);
+                                                       log.info("Pausing and 
re-attempting registration in {} ms", delayOnError);
+       
+                                                       registerLater(gateway, 
1, initialRegistrationTimeout, delayOnError);
+                                               }
+                                       }
+                               }
+                       }, rpcService.getExecutionContext());
+               }
+               catch (Throwable t) {
+                       cancel();
+                       completionPromise.tryFailure(t);
+               }
+       }
+
+       private void registerLater(final Gateway gateway, final int attempt, 
final long timeoutMillis, long delay) {
+               rpcService.scheduleRunnable(new Runnable() {
+                       @Override
+                       public void run() {
+                               register(gateway, attempt, timeoutMillis);
+                       }
+               }, delay, TimeUnit.MILLISECONDS);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe90811a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
index 729ef0c..6f34465 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
@@ -19,19 +19,24 @@
 package org.apache.flink.runtime.rpc.resourcemanager;
 
 import akka.dispatch.Mapper;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
 import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
+import 
org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.util.Preconditions;
+
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.ExecutionContext$;
 import scala.concurrent.Future;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 
 /**
@@ -93,4 +98,22 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> {
                System.out.println("SlotRequest: " + slotRequest);
                return new SlotAssignment();
        }
+
+
+       /**
+        *
+        * @param resourceManagerLeaderId  The fencing token for the 
ResourceManager leader 
+        * @param taskExecutorAddress      The address of the TaskExecutor that 
registers
+        * @param resourceID               The resource ID of the TaskExecutor 
that registers
+        *
+        * @return The response by the ResourceManager.
+        */
+       @RpcMethod
+       public org.apache.flink.runtime.rpc.registration.RegistrationResponse 
registerTaskExecutor(
+                       UUID resourceManagerLeaderId,
+                       String taskExecutorAddress,
+                       ResourceID resourceID) {
+
+               return new TaskExecutorRegistrationSuccess(new InstanceID(), 
5000);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe90811a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
index 464a261..afddb01 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
@@ -18,14 +18,18 @@
 
 package org.apache.flink.runtime.rpc.resourcemanager;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
+
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.util.UUID;
+
 /**
- * {@link ResourceManager} rpc gateway interface.
+ * The {@link ResourceManager}'s RPC gateway interface.
  */
 public interface ResourceManagerGateway extends RpcGateway {
 
@@ -55,4 +59,19 @@ public interface ResourceManagerGateway extends RpcGateway {
         * @return Future slot assignment
         */
        Future<SlotAssignment> requestSlot(SlotRequest slotRequest);
+
+       /**
+        * 
+        * @param resourceManagerLeaderId  The fencing token for the 
ResourceManager leader 
+        * @param taskExecutorAddress      The address of the TaskExecutor that 
registers
+        * @param resourceID               The resource ID of the TaskExecutor 
that registers
+        * @param timeout                  The timeout for the response.
+        * 
+        * @return The future to the response by the ResourceManager.
+        */
+       Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> 
registerTaskExecutor(
+                       UUID resourceManagerLeaderId,
+                       String taskExecutorAddress,
+                       ResourceID resourceID,
+                       @RpcTimeout FiniteDuration timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe90811a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
new file mode 100644
index 0000000..e42fa4a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import java.io.Serializable;
+
/**
 * Describes the current state of all slots of a TaskExecutor: which slots are available, which
 * are allocated, and to which jobs (JobManagers) the allocated slots have been given.
 *
 * <p>The report does not carry any data yet. It is {@link Serializable}.
 */
public class SlotReport implements Serializable {

    private static final long serialVersionUID = 1L;

    // ------------------------------------------------------------------------

    @Override
    public String toString() {
        final String description = "SlotReport";
        return description;
    }
}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe90811a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
index 3a7dd9f..1a637bb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
@@ -18,67 +18,152 @@
 
 package org.apache.flink.runtime.rpc.taskexecutor;
 
-import akka.dispatch.ExecutionContexts$;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.util.Preconditions;
-import scala.concurrent.ExecutionContext;
 
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * TaskExecutor implementation. The task executor is responsible for the 
execution of multiple
  * {@link org.apache.flink.runtime.taskmanager.Task}.
- *
- * It offers the following methods as part of its rpc interface to interact 
with him remotely:
- * <ul>
- *     <li>{@link #executeTask(TaskDeploymentDescriptor)} executes a given 
task on the TaskExecutor</li>
- *     <li>{@link #cancelTask(ExecutionAttemptID)} cancels a given task 
identified by the {@link ExecutionAttemptID}</li>
- * </ul>
  */
 public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
-       private final ExecutionContext executionContext;
-       private final Set<ExecutionAttemptID> tasks = new HashSet<>();
 
-       public TaskExecutor(RpcService rpcService, ExecutorService 
executorService) {
+       /** The unique resource ID of this TaskExecutor */
+       private final ResourceID resourceID;
+
+       /** The access to the leader election and metadata storage services */
+       private final HighAvailabilityServices haServices;
+
+       // --------- resource manager --------
+
+       private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
+
+       // 
------------------------------------------------------------------------
+
+       public TaskExecutor(
+                       RpcService rpcService,
+                       HighAvailabilityServices haServices,
+                       ResourceID resourceID) {
+
                super(rpcService);
-               this.executionContext = ExecutionContexts$.MODULE$.fromExecutor(
-                       Preconditions.checkNotNull(executorService));
+
+               this.haServices = checkNotNull(haServices);
+               this.resourceID = checkNotNull(resourceID);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Properties
+       // 
------------------------------------------------------------------------
+
+       public ResourceID getResourceID() {
+               return resourceID;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Life cycle
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void start() {
+               // start by connecting to the ResourceManager
+               try {
+                       
haServices.getResourceManagerLeaderRetriever().start(new 
ResourceManagerLeaderListener());
+               } catch (Exception e) {
+                       onFatalErrorAsync(e);
+               }
+       }
+
+
+       // 
------------------------------------------------------------------------
+       //  RPC methods - ResourceManager related
+       // 
------------------------------------------------------------------------
+
+       @RpcMethod
+       public void notifyOfNewResourceManagerLeader(String newLeaderAddress, 
UUID newLeaderId) {
+               if (resourceManagerConnection != null) {
+                       if (newLeaderAddress != null) {
+                               // the resource manager switched to a new leader
+                               log.info("ResourceManager leader changed from 
{} to {}. Registering at new leader.",
+                                               
resourceManagerConnection.getResourceManagerAddress(), newLeaderAddress);
+                       }
+                       else {
+                               // address null means that the current leader 
is lost without a new leader being there, yet
+                               log.info("Current ResourceManager {} lost 
leader status. Waiting for new ResourceManager leader.",
+                                               
resourceManagerConnection.getResourceManagerAddress());
+                       }
+
+                       // drop the current connection or connection attempt
+                       if (resourceManagerConnection != null) {
+                               resourceManagerConnection.close();
+                               resourceManagerConnection = null;
+                       }
+               }
+
+               // establish a connection to the new leader
+               if (newLeaderAddress != null) {
+                       log.info("Attempting to register at ResourceManager 
{}", newLeaderAddress);
+                       resourceManagerConnection = 
+                                       new 
TaskExecutorToResourceManagerConnection(log, this, newLeaderAddress, 
newLeaderId);
+                       resourceManagerConnection.start();
+               }
        }
 
+       // 
------------------------------------------------------------------------
+       //  Error handling
+       // 
------------------------------------------------------------------------
+
        /**
-        * Execute the given task on the task executor. The task is described 
by the provided
-        * {@link TaskDeploymentDescriptor}.
-        *
-        * @param taskDeploymentDescriptor Descriptor for the task to be 
executed
-        * @return Acknowledge the start of the task execution
+        * Notifies the TaskExecutor that a fatal error has occurred and it 
cannot proceed.
+        * This method should be used when asynchronous threads want to notify 
the
+        * TaskExecutor of a fatal error.
+        * 
+        * @param t The exception describing the fatal error
         */
-       @RpcMethod
-       public Acknowledge executeTask(TaskDeploymentDescriptor 
taskDeploymentDescriptor) {
-               tasks.add(taskDeploymentDescriptor.getExecutionId());
-               return Acknowledge.get();
+       void onFatalErrorAsync(final Throwable t) {
+               runAsync(new Runnable() {
+                       @Override
+                       public void run() {
+                               onFatalError(t);
+                       }
+               });
        }
 
        /**
-        * Cancel a task identified by it {@link ExecutionAttemptID}. If the 
task cannot be found, then
-        * the method throws an {@link Exception}.
-        *
-        * @param executionAttemptId Execution attempt ID identifying the task 
to be canceled.
-        * @return Acknowledge the task canceling
-        * @throws Exception if the task with the given execution attempt id 
could not be found
+        * Notifies the TaskExecutor that a fatal error has occurred and it 
cannot proceed.
+        * This method must only be called from within the TaskExecutor's main 
thread.
+        * 
+        * @param t The exception describing the fatal error
         */
-       @RpcMethod
-       public Acknowledge cancelTask(ExecutionAttemptID executionAttemptId) 
throws Exception {
-               if (tasks.contains(executionAttemptId)) {
-                       return Acknowledge.get();
-               } else {
-                       throw new Exception("Could not find task.");
+       void onFatalError(Throwable t) {
+               // to be determined, probably delegate to a fatal error handler 
that 
+               // would either log (mini cluster) ot kill the process (yarn, 
mesos, ...)
+               log.error("FATAL ERROR", t);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utility classes
+       // 
------------------------------------------------------------------------
+
+       /**
+        * The listener for leader changes of the resource manager
+        */
+       private class ResourceManagerLeaderListener implements 
LeaderRetrievalListener {
+
+               @Override
+               public void notifyLeaderAddress(String leaderAddress, UUID 
leaderSessionID) {
+                       
getSelf().notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID);
+               }
+
+               @Override
+               public void handleError(Exception exception) {
+                       onFatalErrorAsync(exception);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe90811a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
index 450423e..b0b21b0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
@@ -18,31 +18,18 @@
 
 package org.apache.flink.runtime.rpc.taskexecutor;
 
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.RpcGateway;
-import scala.concurrent.Future;
+
+import java.util.UUID;
 
 /**
- * {@link TaskExecutor} rpc gateway interface
+ * {@link TaskExecutor} RPC gateway interface
  */
 public interface TaskExecutorGateway extends RpcGateway {
-       /**
-        * Execute the given task on the task executor. The task is described 
by the provided
-        * {@link TaskDeploymentDescriptor}.
-        *
-        * @param taskDeploymentDescriptor Descriptor for the task to be 
executed
-        * @return Future acknowledge of the start of the task execution
-        */
-       Future<Acknowledge> executeTask(TaskDeploymentDescriptor 
taskDeploymentDescriptor);
 
-       /**
-        * Cancel a task identified by it {@link ExecutionAttemptID}. If the 
task cannot be found, then
-        * the method throws an {@link Exception}.
-        *
-        * @param executionAttemptId Execution attempt ID identifying the task 
to be canceled.
-        * @return Future acknowledge of the task canceling
-        */
-       Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptId);
+       // 
------------------------------------------------------------------------
+       //  ResourceManager handlers
+       // 
------------------------------------------------------------------------
+
+       void notifyOfNewResourceManagerLeader(String address, UUID 
resourceManagerLeaderId);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe90811a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorRegistrationSuccess.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorRegistrationSuccess.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorRegistrationSuccess.java
new file mode 100644
index 0000000..641102d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorRegistrationSuccess.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.rpc.registration.RegistrationResponse;
+
+import java.io.Serializable;
+
+/**
+ * Base class for responses from the ResourceManager to a registration attempt 
by a
+ * TaskExecutor.
+ */
+public final class TaskExecutorRegistrationSuccess extends 
RegistrationResponse.Success implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       private final InstanceID registrationId;
+
+       private final long heartbeatInterval;
+
+       /**
+        * Create a new {@code TaskExecutorRegistrationSuccess} message.
+        * 
+        * @param registrationId     The ID that the ResourceManager assigned 
the registration.
+        * @param heartbeatInterval  The interval in which the ResourceManager 
will heartbeat the TaskExecutor.
+        */
+       public TaskExecutorRegistrationSuccess(InstanceID registrationId, long 
heartbeatInterval) {
+               this.registrationId = registrationId;
+               this.heartbeatInterval = heartbeatInterval;
+       }
+
+       /**
+        * Gets the ID that the ResourceManager assigned the registration.
+        */
+       public InstanceID getRegistrationId() {
+               return registrationId;
+       }
+
+       /**
+        * Gets the interval in which the ResourceManager will heartbeat the 
TaskExecutor.
+        */
+       public long getHeartbeatInterval() {
+               return heartbeatInterval;
+       }
+
+       @Override
+       public String toString() {
+               return "TaskExecutorRegistrationSuccess (" + registrationId + " 
/ " + heartbeatInterval + ')';
+       }
+
+}
+
+
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/fe90811a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java
new file mode 100644
index 0000000..ef75862
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import akka.dispatch.OnFailure;
+import akka.dispatch.OnSuccess;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.registration.RegistrationResponse;
+import org.apache.flink.runtime.rpc.registration.RetryingRegistration;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+
+import org.slf4j.Logger;
+
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The connection between a {@link TaskExecutor} and a ResourceManager.
+ *
+ * <p>{@link #start()} begins a {@link RetryingRegistration} of the TaskExecutor at the
+ * ResourceManager with the given address and leader ID. When the registration succeeds, the
+ * ResourceManager's gateway and the assigned registration ID are stored (via the TaskExecutor's
+ * main thread execution context) and can be obtained through {@link #getResourceManager()} and
+ * {@link #getRegistrationId()}. {@link #close()} cancels a registration that is still pending.
+ */
+public class TaskExecutorToResourceManagerConnection {
+
+       /** the logger for all log messages of this class */
+       private final Logger log;
+
+       /** the TaskExecutor whose connection to the ResourceManager this represents */
+       private final TaskExecutor taskExecutor;
+
+       /** the leader ID of the ResourceManager to register at */
+       private final UUID resourceManagerLeaderId;
+
+       /** the address of the ResourceManager to register at */
+       private final String resourceManagerAddress;
+
+       /** the registration attempt created by {@link #start()}; null before the connection is started */
+       private ResourceManagerRegistration pendingRegistration;
+
+       /** the gateway of the ResourceManager; null until the registration succeeded */
+       private ResourceManagerGateway registeredResourceManager;
+
+       /** the ID assigned by the ResourceManager; null until the registration succeeded */
+       private InstanceID registrationId;
+
+       /** flag indicating that the connection is closed */
+       private volatile boolean closed;
+
+
+       /**
+        * Creates a new connection that is not yet started.
+        *
+        * @param log                      the logger for this connection and its registration
+        * @param taskExecutor             the TaskExecutor that registers at the ResourceManager
+        * @param resourceManagerAddress   the address of the ResourceManager
+        * @param resourceManagerLeaderId  the leader ID of the ResourceManager
+        * @throws NullPointerException if any of the arguments is null
+        */
+       public TaskExecutorToResourceManagerConnection(
+                       Logger log,
+                       TaskExecutor taskExecutor,
+                       String resourceManagerAddress,
+                       UUID resourceManagerLeaderId) {
+
+               this.log = checkNotNull(log);
+               this.taskExecutor = checkNotNull(taskExecutor);
+               this.resourceManagerAddress = checkNotNull(resourceManagerAddress);
+               this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId);
+       }
+
+       // ------------------------------------------------------------------------
+       //  Life cycle
+       // ------------------------------------------------------------------------
+
+       /**
+        * Starts the registration at the ResourceManager.
+        *
+        * <p>On success, the ResourceManager's gateway and the registration ID are stored in the
+        * TaskExecutor's main thread execution context. A failure of the registration future is
+        * reported to the TaskExecutor as a fatal error.
+        *
+        * @throws IllegalStateException if the connection is closed or has already been started
+        */
+       @SuppressWarnings("unchecked")
+       public void start() {
+               checkState(!closed, "The connection is already closed");
+               checkState(!isRegistered() && pendingRegistration == null, "The connection is already started");
+
+               // keep the registration, so that close() can cancel it and a second
+               // call to start() is rejected by the check above
+               pendingRegistration = new ResourceManagerRegistration(
+                               log, taskExecutor.getRpcService(),
+                               resourceManagerAddress, resourceManagerLeaderId,
+                               taskExecutor.getAddress(), taskExecutor.getResourceID());
+
+               // without this call, no registration attempt would ever be made
+               pendingRegistration.startRegistration();
+
+               Future<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>> future = pendingRegistration.getFuture();
+
+               future.onSuccess(new OnSuccess<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>>() {
+                       @Override
+                       public void onSuccess(Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess> result) {
+                               registeredResourceManager = result.f0;
+                               registrationId = result.f1.getRegistrationId();
+                       }
+               }, taskExecutor.getMainThreadExecutionContext());
+
+               // this future should only ever fail if there is a bug, not if the registration is declined
+               future.onFailure(new OnFailure() {
+                       @Override
+                       public void onFailure(Throwable failure) {
+                               taskExecutor.onFatalError(failure);
+                       }
+               }, taskExecutor.getMainThreadExecutionContext());
+       }
+
+       /**
+        * Marks the connection as closed and cancels the registration attempt, if one was started.
+        */
+       public void close() {
+               closed = true;
+
+               // make sure we do not keep re-trying forever
+               if (pendingRegistration != null) {
+                       pendingRegistration.cancel();
+               }
+       }
+
+       /**
+        * @return true, if {@link #close()} has been called on this connection
+        */
+       public boolean isClosed() {
+               return closed;
+       }
+
+       // ------------------------------------------------------------------------
+       //  Properties
+       // ------------------------------------------------------------------------
+
+       /**
+        * @return the leader ID of the ResourceManager this connection registers at
+        */
+       public UUID getResourceManagerLeaderId() {
+               return resourceManagerLeaderId;
+       }
+
+       /**
+        * @return the address of the ResourceManager this connection registers at
+        */
+       public String getResourceManagerAddress() {
+               return resourceManagerAddress;
+       }
+
+       /**
+        * Gets the ResourceManagerGateway. This returns null until the registration is completed.
+        */
+       public ResourceManagerGateway getResourceManager() {
+               return registeredResourceManager;
+       }
+
+       /**
+        * Gets the ID under which the TaskExecutor is registered at the ResourceManager.
+        * This returns null until the registration is completed.
+        */
+       public InstanceID getRegistrationId() {
+               return registrationId;
+       }
+
+       /**
+        * @return true, once the registration succeeded and the ResourceManager's gateway is known
+        */
+       public boolean isRegistered() {
+               return registeredResourceManager != null;
+       }
+
+       // ------------------------------------------------------------------------
+
+       @Override
+       public String toString() {
+               return String.format("Connection to ResourceManager %s (leaderId=%s)",
+                               resourceManagerAddress, resourceManagerLeaderId);
+       }
+
+       // ------------------------------------------------------------------------
+       //  Utilities
+       // ------------------------------------------------------------------------
+
+       /**
+        * A {@link RetryingRegistration} of a TaskExecutor at a ResourceManager. Each attempt calls
+        * {@link ResourceManagerGateway#registerTaskExecutor} with the TaskExecutor's address and
+        * resource ID.
+        */
+       static class ResourceManagerRegistration
+                       extends RetryingRegistration<ResourceManagerGateway, TaskExecutorRegistrationSuccess> {
+
+               /** the address of the TaskExecutor that registers */
+               private final String taskExecutorAddress;
+
+               /** the resource ID of the TaskExecutor that registers */
+               private final ResourceID resourceID;
+
+               public ResourceManagerRegistration(
+                               Logger log,
+                               RpcService rpcService,
+                               String targetAddress,
+                               UUID leaderId,
+                               String taskExecutorAddress,
+                               ResourceID resourceID) {
+
+                       super(log, rpcService, "ResourceManager", ResourceManagerGateway.class, targetAddress, leaderId);
+                       this.taskExecutorAddress = checkNotNull(taskExecutorAddress);
+                       this.resourceID = checkNotNull(resourceID);
+               }
+
+               @Override
+               protected Future<RegistrationResponse> invokeRegistration(
+                               ResourceManagerGateway resourceManager, UUID leaderId, long timeoutMillis) throws Exception {
+
+                       // the registration API takes the timeout as a FiniteDuration
+                       FiniteDuration timeout = new FiniteDuration(timeoutMillis, TimeUnit.MILLISECONDS);
+                       return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, timeout);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe90811a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index fd55904..7b4ab89 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -20,15 +20,17 @@ package org.apache.flink.runtime.rpc.akka;
 
 import akka.actor.ActorSystem;
 import akka.util.Timeout;
+
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
 import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
 import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
 import org.junit.Test;
+
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -41,6 +43,49 @@ import static org.junit.Assert.assertTrue;
 
 public class AkkaRpcServiceTest extends TestLogger {
 
+       // ------------------------------------------------------------------------
+       //  shared test members
+       // ------------------------------------------------------------------------
+
+       /** actor system shared by the tests of this class; shut down in {@link #shutdown()} */
+       private static final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
+
+       /** RPC service on top of {@link #actorSystem}, created with a 10000 ms timeout */
+       private static final AkkaRpcService akkaRpcService =
+                       new AkkaRpcService(actorSystem, new Timeout(10000, TimeUnit.MILLISECONDS));
+
+       /**
+        * Stops the shared RPC service and shuts down the shared actor system.
+        */
+       @AfterClass
+       public static void shutdown() {
+               try {
+                       akkaRpcService.stopService();
+               } finally {
+                       // shut the actor system down even if stopping the RPC service fails,
+                       // otherwise the actor system would be leaked by this test class
+                       actorSystem.shutdown();
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  tests
+       // ------------------------------------------------------------------------
+
+       /**
+        * Tests that a runnable scheduled via {@code scheduleRunnable} does not run before the
+        * requested delay has elapsed.
+        */
+       @Test
+       public void testScheduleRunnable() throws Exception {
+               final long delay = 100;
+               final OneShotLatch latch = new OneShotLatch();
+               final long startNanos = System.nanoTime();
+
+               // the scheduled action only releases the latch
+               final Runnable releaseLatch = new Runnable() {
+                       @Override
+                       public void run() {
+                               latch.trigger();
+                       }
+               };
+               akkaRpcService.scheduleRunnable(releaseLatch, delay, TimeUnit.MILLISECONDS);
+
+               latch.await();
+               final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+
+               assertTrue("call was not properly delayed", elapsedMillis >= delay);
+       }
+
+       // ------------------------------------------------------------------------
+       //  specific component tests - should be moved to the test classes
+       //  for those components
+       // ------------------------------------------------------------------------
+
        /**
         * Tests that the {@link JobMaster} can connect to the {@link 
ResourceManager} using the
         * {@link AkkaRpcService}.

http://git-wip-us.apache.org/repos/asf/flink/blob/fe90811a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
index c96f4f6..9f9bab3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
@@ -18,93 +18,8 @@
 
 package org.apache.flink.runtime.rpc.taskexecutor;
 
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.MainThreadExecutor;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.StartStoppable;
-import org.apache.flink.runtime.util.DirectExecutorService;
-import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.cglib.proxy.InvocationHandler;
-import org.mockito.cglib.proxy.Proxy;
-import scala.concurrent.Future;
-
-import java.net.URL;
-import java.util.Collections;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public class TaskExecutorTest extends TestLogger {
-
-       /**
-        * Tests that we can deploy and cancel a task on the TaskExecutor 
without exceptions
-        */
-       @Test
-       public void testTaskExecution() throws Exception {
-               RpcService testingRpcService = mock(RpcService.class);
-               InvocationHandler invocationHandler = 
mock(InvocationHandler.class);
-               Object selfGateway = 
Proxy.newProxyInstance(ClassLoader.getSystemClassLoader(), new Class<?>[] 
{TaskExecutorGateway.class, MainThreadExecutor.class, StartStoppable.class}, 
invocationHandler);
-               
when(testingRpcService.startServer(Matchers.any(RpcEndpoint.class))).thenReturn((RpcGateway)selfGateway);
-
-               DirectExecutorService directExecutorService = new 
DirectExecutorService();
-               TaskExecutor taskExecutor = new TaskExecutor(testingRpcService, 
directExecutorService);
-               taskExecutor.start();
-
-               TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
-                       new JobID(),
-                       "Test job",
-                       new JobVertexID(),
-                       new ExecutionAttemptID(),
-                       new SerializedValue<ExecutionConfig>(null),
-                       "Test task",
-                       0,
-                       1,
-                       0,
-                       new Configuration(),
-                       new Configuration(),
-                       "Invokable",
-                       
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-                       Collections.<InputGateDeploymentDescriptor>emptyList(),
-                       Collections.<BlobKey>emptyList(),
-                       Collections.<URL>emptyList(),
-                       0
-               );
-
-               Acknowledge ack = taskExecutor.executeTask(tdd);
-
-               ack = taskExecutor.cancelTask(tdd.getExecutionId());
-       }
-
-       /**
-        * Tests that cancelling a non-existing task will return an exception
-        */
-       @Test(expected=Exception.class)
-       public void testWrongTaskCancellation() throws Exception {
-               RpcService testingRpcService = mock(RpcService.class);
-               InvocationHandler invocationHandler = 
mock(InvocationHandler.class);
-               Object selfGateway = 
Proxy.newProxyInstance(ClassLoader.getSystemClassLoader(), new Class<?>[] 
{TaskExecutorGateway.class, MainThreadExecutor.class, StartStoppable.class}, 
invocationHandler);
-               
when(testingRpcService.startServer(Matchers.any(RpcEndpoint.class))).thenReturn((RpcGateway)selfGateway);
-               DirectExecutorService directExecutorService = null;
-               TaskExecutor taskExecutor = new TaskExecutor(testingRpcService, 
directExecutorService);
-               taskExecutor.start();
-
-               taskExecutor.cancelTask(new ExecutionAttemptID());
-
-               fail("The cancellation should have thrown an exception.");
-       }
+       // NOTE(review): this change removes all previous test cases of this class (task deployment
+       // and cancellation), leaving it without tests; TaskExecutor tests should be re-added here.
 }

Reply via email to