[FLINK-4656] [rpc] Port the existing code to Flink's own future abstraction

This closes #2530.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31a091b9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31a091b9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31a091b9

Branch: refs/heads/flip-6
Commit: 31a091b930178bf2aec2881ee273fe0e5e17464d
Parents: 04fbdb3
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed Sep 21 17:26:21 2016 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed Sep 21 18:20:17 2016 +0200

----------------------------------------------------------------------
 .../concurrent/impl/FlinkCompletableFuture.java | 22 +++++-
 .../runtime/concurrent/impl/FlinkFuture.java    |  4 ++
 .../flink/runtime/jobmaster/JobMaster.java      |  2 +-
 .../runtime/jobmaster/JobMasterGateway.java     |  2 +-
 .../registration/RetryingRegistration.java      | 65 ++++++++---------
 .../resourcemanager/ResourceManager.java        | 13 ++--
 .../resourcemanager/ResourceManagerGateway.java |  9 ++-
 .../slotmanager/SlotManager.java                |  9 ++-
 .../flink/runtime/rpc/MainThreadExecutable.java | 64 +++++++++++++++++
 .../flink/runtime/rpc/MainThreadExecutor.java   | 64 -----------------
 .../apache/flink/runtime/rpc/RpcEndpoint.java   | 60 ++++++----------
 .../apache/flink/runtime/rpc/RpcService.java    | 17 +++--
 .../runtime/rpc/akka/AkkaInvocationHandler.java | 42 +++++------
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    | 21 +++++-
 .../flink/runtime/rpc/akka/AkkaRpcService.java  | 28 ++++----
 .../runtime/taskexecutor/TaskExecutor.java      | 12 ++--
 .../taskexecutor/TaskExecutorGateway.java       |  6 +-
 ...TaskExecutorToResourceManagerConnection.java | 34 +++++----
 .../registration/RetryingRegistrationTest.java  | 75 ++++++++++----------
 .../registration/TestRegistrationGateway.java   |  6 +-
 .../resourcemanager/ResourceManagerHATest.java  |  4 +-
 .../slotmanager/SlotProtocolTest.java           | 14 ++--
 .../flink/runtime/rpc/AsyncCallsTest.java       | 13 ++--
 .../flink/runtime/rpc/RpcCompletenessTest.java  |  9 +--
 .../flink/runtime/rpc/TestingGatewayBase.java   | 18 ++---
 .../flink/runtime/rpc/TestingRpcService.java    | 20 +++---
 .../runtime/rpc/TestingSerialRpcService.java    | 54 +++++++-------
 .../runtime/rpc/akka/AkkaRpcActorTest.java      | 19 ++---
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    |  4 +-
 .../rpc/akka/MainThreadValidationTest.java      |  7 +-
 .../rpc/akka/MessageSerializationTest.java      | 19 +++--
 .../runtime/taskexecutor/TaskExecutorTest.java  |  9 ++-
 32 files changed, 376 insertions(+), 369 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
index 5566880..e648a71 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.runtime.concurrent.impl;
 
+import akka.dispatch.Futures;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.util.Preconditions;
 import scala.concurrent.Promise;
+import scala.concurrent.Promise$;
 
 import java.util.concurrent.CancellationException;
 
@@ -34,7 +36,17 @@ public class FlinkCompletableFuture<T> extends 
FlinkFuture<T> implements Complet
        private final Promise<T> promise;
 
        public FlinkCompletableFuture() {
-               promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
+               promise = Futures.promise();
+               scalaFuture = promise.future();
+       }
+
+       private FlinkCompletableFuture(T value) {
+               promise = Promise$.MODULE$.successful(value);
+               scalaFuture = promise.future();
+       }
+
+       private FlinkCompletableFuture(Throwable t) {
+               promise = Promise$.MODULE$.failed(t);
                scalaFuture = promise.future();
        }
 
@@ -68,4 +80,12 @@ public class FlinkCompletableFuture<T> extends 
FlinkFuture<T> implements Complet
        public boolean cancel(boolean mayInterruptIfRunning) {
                return completeExceptionally(new CancellationException("Future 
has been canceled."));
        }
+
+       public static <T> FlinkCompletableFuture<T> completed(T value) {
+               return new FlinkCompletableFuture<>(value);
+       }
+
+       public static <T> FlinkCompletableFuture<T> 
completedExceptionally(Throwable t) {
+               return new FlinkCompletableFuture<>(t);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
index 361cd3d..b28a1bd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
@@ -59,6 +59,10 @@ public class FlinkFuture<T> implements Future<T> {
                this.scalaFuture = Preconditions.checkNotNull(scalaFuture);
        }
 
+       public scala.concurrent.Future<T> getScalaFuture() {
+               return scalaFuture;
+       }
+
        
//-----------------------------------------------------------------------------------
        // Future's methods
        
//-----------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 0a6a7ef..1537396 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -36,7 +36,7 @@ import java.util.UUID;
 
 /**
  * JobMaster implementation. The job master is responsible for the execution 
of a single
- * {@link org.apache.flink.runtime.jobgraph.JobGraph}.
+ * {@link JobGraph}.
  * <p>
  * It offers the following methods as part of its rpc interface to interact 
with the JobMaster
  * remotely:

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index a53e383..86bf17c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.runtime.jobmaster;
 
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import scala.concurrent.Future;
 
 /**
  * {@link JobMaster} rpc gateway interface

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
index ea49e42..32dd978 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
@@ -18,19 +18,17 @@
 
 package org.apache.flink.runtime.registration;
 
-import akka.dispatch.OnFailure;
-import akka.dispatch.OnSuccess;
-
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcService;
 
 import org.slf4j.Logger;
 
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.impl.Promise.DefaultPromise;
-
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -86,7 +84,7 @@ public abstract class RetryingRegistration<Gateway extends 
RpcGateway, Success e
 
        private final UUID leaderId;
 
-       private final Promise<Tuple2<Gateway, Success>> completionPromise;
+       private final CompletableFuture<Tuple2<Gateway, Success>> 
completionFuture;
 
        private final long initialRegistrationTimeout;
 
@@ -140,7 +138,7 @@ public abstract class RetryingRegistration<Gateway extends 
RpcGateway, Success e
                this.delayOnError = delayOnError;
                this.delayOnRefusedRegistration = delayOnRefusedRegistration;
 
-               this.completionPromise = new DefaultPromise<>();
+               this.completionFuture = new FlinkCompletableFuture<>();
        }
 
        // 
------------------------------------------------------------------------
@@ -148,7 +146,7 @@ public abstract class RetryingRegistration<Gateway extends 
RpcGateway, Success e
        // 
------------------------------------------------------------------------
 
        public Future<Tuple2<Gateway, Success>> getFuture() {
-               return completionPromise.future();
+               return completionFuture;
        }
 
        /**
@@ -184,28 +182,30 @@ public abstract class RetryingRegistration<Gateway 
extends RpcGateway, Success e
                        Future<Gateway> resourceManagerFuture = 
rpcService.connect(targetAddress, targetType);
        
                        // upon success, start the registration attempts
-                       resourceManagerFuture.onSuccess(new 
OnSuccess<Gateway>() {
+                       resourceManagerFuture.thenAcceptAsync(new 
AcceptFunction<Gateway>() {
                                @Override
-                               public void onSuccess(Gateway result) {
+                               public void accept(Gateway result) {
                                        log.info("Resolved {} address, 
beginning registration", targetName);
                                        register(result, 1, 
initialRegistrationTimeout);
                                }
-                       }, rpcService.getExecutionContext());
-       
+                       }, rpcService.getExecutor());
+
                        // upon failure, retry, unless this is cancelled
-                       resourceManagerFuture.onFailure(new OnFailure() {
+                       resourceManagerFuture.exceptionallyAsync(new 
ApplyFunction<Throwable, Void>() {
                                @Override
-                               public void onFailure(Throwable failure) {
+                               public Void apply(Throwable failure) {
                                        if (!isCanceled()) {
                                                log.warn("Could not resolve {} 
address {}, retrying...", targetName, targetAddress, failure);
                                                startRegistration();
                                        }
+
+                                       return null;
                                }
-                       }, rpcService.getExecutionContext());
+                       }, rpcService.getExecutor());
                }
                catch (Throwable t) {
                        cancel();
-                       completionPromise.tryFailure(t);
+                       completionFuture.completeExceptionally(t);
                }
        }
 
@@ -225,15 +225,14 @@ public abstract class RetryingRegistration<Gateway 
extends RpcGateway, Success e
                        Future<RegistrationResponse> registrationFuture = 
invokeRegistration(gateway, leaderId, timeoutMillis);
        
                        // if the registration was successful, let the 
TaskExecutor know
-                       registrationFuture.onSuccess(new 
OnSuccess<RegistrationResponse>() {
-                               
+                       registrationFuture.thenAcceptAsync(new 
AcceptFunction<RegistrationResponse>() {
                                @Override
-                               public void onSuccess(RegistrationResponse 
result) throws Throwable {
+                               public void accept(RegistrationResponse result) 
{
                                        if (!isCanceled()) {
                                                if (result instanceof 
RegistrationResponse.Success) {
                                                        // registration 
successful!
                                                        Success success = 
(Success) result;
-                                                       
completionPromise.success(new Tuple2<>(gateway, success));
+                                                       
completionFuture.complete(Tuple2.of(gateway, success));
                                                }
                                                else {
                                                        // registration refused 
or unknown
@@ -241,7 +240,7 @@ public abstract class RetryingRegistration<Gateway extends 
RpcGateway, Success e
                                                                
RegistrationResponse.Decline decline = (RegistrationResponse.Decline) result;
                                                                
log.info("Registration at {} was declined: {}", targetName, 
decline.getReason());
                                                        } else {
-                                                               
log.error("Received unknown response to registration attempt: " + result);
+                                                               
log.error("Received unknown response to registration attempt: {}", result);
                                                        }
 
                                                        log.info("Pausing and 
re-attempting registration in {} ms", delayOnRefusedRegistration);
@@ -249,12 +248,12 @@ public abstract class RetryingRegistration<Gateway 
extends RpcGateway, Success e
                                                }
                                        }
                                }
-                       }, rpcService.getExecutionContext());
+                       }, rpcService.getExecutor());
        
                        // upon failure, retry
-                       registrationFuture.onFailure(new OnFailure() {
+                       registrationFuture.exceptionallyAsync(new 
ApplyFunction<Throwable, Void>() {
                                @Override
-                               public void onFailure(Throwable failure) {
+                               public Void apply(Throwable failure) {
                                        if (!isCanceled()) {
                                                if (failure instanceof 
TimeoutException) {
                                                        // we simply have not 
received a response in time. maybe the timeout was
@@ -262,26 +261,28 @@ public abstract class RetryingRegistration<Gateway 
extends RpcGateway, Success e
                                                        // currently down.
                                                        if 
(log.isDebugEnabled()) {
                                                                
log.debug("Registration at {} ({}) attempt {} timed out after {} ms",
-                                                                               
targetName, targetAddress, attempt, timeoutMillis);
+                                                                       
targetName, targetAddress, attempt, timeoutMillis);
                                                        }
-       
+
                                                        long newTimeoutMillis = 
Math.min(2 * timeoutMillis, maxRegistrationTimeout);
                                                        register(gateway, 
attempt + 1, newTimeoutMillis);
                                                }
                                                else {
                                                        // a serious failure 
occurred. we still should not give up, but keep trying
-                                                       log.error("Registration 
at " + targetName + " failed due to an error", failure);
+                                                       log.error("Registration 
at {} failed due to an error", targetName, failure);
                                                        log.info("Pausing and 
re-attempting registration in {} ms", delayOnError);
-       
+
                                                        registerLater(gateway, 
1, initialRegistrationTimeout, delayOnError);
                                                }
                                        }
+
+                                       return null;
                                }
-                       }, rpcService.getExecutionContext());
+                       }, rpcService.getExecutor());
                }
                catch (Throwable t) {
                        cancel();
-                       completionPromise.tryFailure(t);
+                       completionFuture.completeExceptionally(t);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index d9a7134..5370710 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -18,12 +18,11 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
-import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
-
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
@@ -38,7 +37,6 @@ import 
org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -126,10 +124,9 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> impleme
                        
getRpcService().connect(jobMasterRegistration.getAddress(), 
JobMasterGateway.class);
                final JobID jobID = jobMasterRegistration.getJobID();
 
-               return jobMasterFuture.map(new Mapper<JobMasterGateway, 
RegistrationResponse>() {
+               return jobMasterFuture.thenApplyAsync(new 
ApplyFunction<JobMasterGateway, RegistrationResponse>() {
                        @Override
-                       public RegistrationResponse apply(final 
JobMasterGateway jobMasterGateway) {
-
+                       public RegistrationResponse apply(JobMasterGateway 
jobMasterGateway) {
                                final JobMasterGateway existingGateway = 
jobMasterGateways.put(jobID, jobMasterGateway);
                                if (existingGateway != null) {
                                        LOG.info("Replacing existing gateway {} 
for JobID {} with  {}.",
@@ -137,7 +134,7 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> impleme
                                }
                                return new RegistrationResponse(true);
                        }
-               }, getMainThreadExecutionContext());
+               }, getMainThreadExecutor());
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index c8e3488..5c8786c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -18,14 +18,13 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.UUID;
 
 /**
@@ -42,7 +41,7 @@ public interface ResourceManagerGateway extends RpcGateway {
         */
        Future<RegistrationResponse> registerJobMaster(
                JobMasterRegistration jobMasterRegistration,
-               @RpcTimeout FiniteDuration timeout);
+               @RpcTimeout Time timeout);
 
        /**
         * Register a {@link JobMaster} at the resource manager.
@@ -73,5 +72,5 @@ public interface ResourceManagerGateway extends RpcGateway {
                        UUID resourceManagerLeaderId,
                        String taskExecutorAddress,
                        ResourceID resourceID,
-                       @RpcTimeout FiniteDuration timeout);
+                       @RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 96fde7d..97176b2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -19,11 +19,13 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
@@ -33,14 +35,11 @@ import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -79,7 +78,7 @@ public abstract class SlotManager implements 
LeaderRetrievalListener {
        /** All allocations, we can lookup allocations either by SlotID or 
AllocationID */
        private final AllocationMap allocationMap;
 
-       private final FiniteDuration timeout;
+       private final Time timeout;
 
        /** The current leader id set by the ResourceManager */
        private UUID leaderID;
@@ -90,7 +89,7 @@ public abstract class SlotManager implements 
LeaderRetrievalListener {
                this.freeSlots = new HashMap<>(16);
                this.allocationMap = new AllocationMap();
                this.taskManagerGateways = new HashMap<>();
-               this.timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+               this.timeout = Time.seconds(10);
        }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java
new file mode 100644
index 0000000..ec1c984
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Future;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Interface to execute {@link Runnable} and {@link Callable} in the main 
thread of the underlying
+ * RPC endpoint.
+ *
+ * <p>This interface is intended to be implemented by the self gateway in a 
{@link RpcEndpoint}
+ * implementation, which allows dispatching local procedures to the main thread 
of the underlying
+ * RPC endpoint.
+ */
+public interface MainThreadExecutable {
+
+       /**
+        * Execute the runnable in the main thread of the underlying RPC 
endpoint.
+        *
+        * @param runnable Runnable to be executed
+        */
+       void runAsync(Runnable runnable);
+
+       /**
+        * Execute the callable in the main thread of the underlying RPC 
endpoint and return a future for
+        * the callable result. If the future is not completed within the given 
timeout, the returned
+        * future will throw a {@link TimeoutException}.
+        *
+        * @param callable Callable to be executed
+        * @param callTimeout Timeout for the future to complete
+        * @param <V> Return value of the callable
+        * @return Future of the callable result
+        */
+       <V> Future<V> callAsync(Callable<V> callable, Time callTimeout);
+
+       /**
+        * Execute the runnable in the main thread of the underlying RPC 
endpoint, with
+        * a delay of the given number of milliseconds.
+        *
+        * @param runnable Runnable to be executed
+        * @param delay    The delay, in milliseconds, after which the runnable 
will be executed
+        */
+       void scheduleRunAsync(Runnable runnable, long delay);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
deleted file mode 100644
index 5e4fead..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc;
-
-import akka.util.Timeout;
-import scala.concurrent.Future;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Interface to execute {@link Runnable} and {@link Callable} in the main 
thread of the underlying
- * RPC endpoint.
- *
- * <p>This interface is intended to be implemented by the self gateway in a 
{@link RpcEndpoint}
- * implementation which allows to dispatch local procedures to the main thread 
of the underlying
- * RPC endpoint.
- */
-public interface MainThreadExecutor {
-
-       /**
-        * Execute the runnable in the main thread of the underlying RPC 
endpoint.
-        *
-        * @param runnable Runnable to be executed
-        */
-       void runAsync(Runnable runnable);
-
-       /**
-        * Execute the callable in the main thread of the underlying RPC 
endpoint and return a future for
-        * the callable result. If the future is not completed within the given 
timeout, the returned
-        * future will throw a {@link TimeoutException}.
-        *
-        * @param callable Callable to be executed
-        * @param callTimeout Timeout for the future to complete
-        * @param <V> Return value of the callable
-        * @return Future of the callable result
-        */
-       <V> Future<V> callAsync(Callable<V> callable, Timeout callTimeout);
-
-       /**
-        * Execute the runnable in the main thread of the underlying RPC 
endpoint, with
-        * a delay of the given number of milliseconds.
-        *
-        * @param runnable Runnable to be executed
-        * @param delay    The delay, in milliseconds, after which the runnable 
will be executed
-        */
-       void scheduleRunAsync(Runnable runnable, long delay);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index e9e2b2c..4e5e49a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -18,16 +18,15 @@
 
 package org.apache.flink.runtime.rpc;
 
-import akka.util.Timeout;
-
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ReflectionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-
 import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -49,8 +48,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * thread, we don't have to reason about concurrent accesses, in the same way 
in the Actor Model
  * of Erlang or Akka.
  *
- * <p>The RPC endpoint provides provides {@link #runAsync(Runnable)}, {@link 
#callAsync(Callable, Timeout)}
-  * and the {@link #getMainThreadExecutionContext()} to execute code in the 
RPC endoint's main thread.
+ * <p>The RPC endpoint provides {@link #runAsync(Runnable)}, {@link 
#callAsync(Callable, Time)}
+  * and the {@link #getMainThreadExecutor()} to execute code in the RPC 
endpoint's main thread.
  *
  * @param <C> The RPC gateway counterpart for the implementing RPC endpoint
  */
@@ -69,9 +68,9 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
        /** Self gateway which can be used to schedule asynchronous calls on 
yourself */
        private final C self;
 
-       /** The main thread execution context to be used to execute future 
callbacks in the main thread
+       /** The main thread executor to be used to execute future callbacks in 
the main thread
         * of the executing rpc server. */
-       private final ExecutionContext mainThreadExecutionContext;
+       private final Executor mainThreadExecutor;
 
        /** A reference to the endpoint's main thread, if the current method is 
called by the main thread */
        final AtomicReference<Thread> currentMainThread = new 
AtomicReference<>(null); 
@@ -89,7 +88,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
                this.selfGatewayType = 
ReflectionUtil.getTemplateType1(getClass());
                this.self = rpcService.startServer(this);
                
-               this.mainThreadExecutionContext = new 
MainThreadExecutionContext((MainThreadExecutor) self);
+               this.mainThreadExecutor = new 
MainThreadExecutor((MainThreadExecutable) self);
        }
 
        /**
@@ -120,7 +119,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
         * Shuts down the underlying RPC endpoint via the RPC service.
         * After this method was called, the RPC endpoint will no longer be 
reachable, neither remotely,
         * not via its {@link #getSelf() self gateway}. It will also not 
accepts executions in main thread
-        * any more (via {@link #callAsync(Callable, Timeout)} and {@link 
#runAsync(Runnable)}).
+        * any more (via {@link #callAsync(Callable, Time)} and {@link 
#runAsync(Runnable)}).
         * 
         * <p>This method can be overridden to add RPC endpoint specific shut 
down code.
         * The overridden method should always call the parent shut down method.
@@ -161,8 +160,8 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
         *
         * @return Main thread execution context
         */
-       protected ExecutionContext getMainThreadExecutionContext() {
-               return mainThreadExecutionContext;
+       protected Executor getMainThreadExecutor() {
+               return mainThreadExecutor;
        }
 
        /**
@@ -185,7 +184,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
         * @param runnable Runnable to be executed in the main thread of the 
underlying RPC endpoint
         */
        protected void runAsync(Runnable runnable) {
-               ((MainThreadExecutor) self).runAsync(runnable);
+               ((MainThreadExecutable) self).runAsync(runnable);
        }
 
        /**
@@ -196,7 +195,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
         * @param delay    The delay after which the runnable will be executed
         */
        protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit 
unit) {
-               ((MainThreadExecutor) self).scheduleRunAsync(runnable, 
unit.toMillis(delay));
+               ((MainThreadExecutable) self).scheduleRunAsync(runnable, 
unit.toMillis(delay));
        }
 
        /**
@@ -209,8 +208,8 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
         * @param <V> Return type of the callable
         * @return Future for the result of the callable.
         */
-       protected <V> Future<V> callAsync(Callable<V> callable, Timeout 
timeout) {
-               return ((MainThreadExecutor) self).callAsync(callable, timeout);
+       protected <V> Future<V> callAsync(Callable<V> callable, Time timeout) {
+               return ((MainThreadExecutable) self).callAsync(callable, 
timeout);
        }
 
        // 
------------------------------------------------------------------------
@@ -241,36 +240,19 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
        // 
------------------------------------------------------------------------
        
        /**
-        * Execution context which executes runnables in the main thread 
context. A reported failure
-        * will cause the underlying rpc server to shut down.
+        * Executor which executes runnables in the main thread context.
         */
-       private class MainThreadExecutionContext implements ExecutionContext {
+       private class MainThreadExecutor implements Executor {
 
-               private final MainThreadExecutor gateway;
+               private final MainThreadExecutable gateway;
 
-               MainThreadExecutionContext(MainThreadExecutor gateway) {
-                       this.gateway = gateway;
+               MainThreadExecutor(MainThreadExecutable gateway) {
+                       this.gateway = Preconditions.checkNotNull(gateway);
                }
 
                @Override
                public void execute(Runnable runnable) {
                        gateway.runAsync(runnable);
                }
-
-               @Override
-               public void reportFailure(final Throwable t) {
-                       gateway.runAsync(new Runnable() {
-                               @Override
-                               public void run() {
-                                       log.error("Encountered failure in the 
main thread execution context.", t);
-                                       shutDown();
-                               }
-                       });
-               }
-
-               @Override
-               public ExecutionContext prepare() {
-                       return this;
-               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 78c1cec..a367ff2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.runtime.rpc;
 
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
 
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -68,23 +68,22 @@ public interface RpcService {
        void stopService();
 
        /**
-        * Gets the execution context, provided by this RPC service. This 
execution
-        * context can be used for example for the {@code onComplete(...)} or 
{@code onSuccess(...)}
-        * methods of Futures.
+        * Gets the executor, provided by this RPC service. This executor can 
be used for example for
+        * the {@code handleAsync(...)} or {@code thenAcceptAsync(...)} methods 
of futures.
         * 
-        * <p><b>IMPORTANT:</b> This execution context does not isolate the 
method invocations against
+        * <p><b>IMPORTANT:</b> This executor does not isolate the method 
invocations against
         * any concurrent invocations and is therefore not suitable to run 
completion methods of futures
         * that modify state of an {@link RpcEndpoint}. For such operations, 
one needs to use the
-        * {@link RpcEndpoint#getMainThreadExecutionContext() 
MainThreadExecutionContext} of that
+        * {@link RpcEndpoint#getMainThreadExecutor() 
MainThreadExecutor} of that
         * {@code RpcEndpoint}.
         * 
         * @return The execution context provided by the RPC service
         */
-       ExecutionContext getExecutionContext();
+       Executor getExecutor();
 
        /**
         * Execute the runnable in the execution context of this RPC Service, 
as returned by
-        * {@link #getExecutionContext()}, after a scheduled delay.
+        * {@link #getExecutor()}, after a scheduled delay.
         *
         * @param runnable Runnable to be executed
         * @param delay    The delay after which the runnable will be executed

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index bfa04f6..8f4deff 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -20,9 +20,11 @@ package org.apache.flink.runtime.rpc.akka;
 
 import akka.actor.ActorRef;
 import akka.pattern.Patterns;
-import akka.util.Timeout;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.StartStoppable;
@@ -34,9 +36,6 @@ import 
org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
 import org.apache.flink.util.Preconditions;
 import org.apache.log4j.Logger;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
 import java.lang.annotation.Annotation;
@@ -53,7 +52,7 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  * rpc in a {@link LocalRpcInvocation} message and then sends it to the {@link 
AkkaRpcActor} where it is
  * executed.
  */
-class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, 
MainThreadExecutor, StartStoppable {
+class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, 
MainThreadExecutable, StartStoppable {
        private static final Logger LOG = 
Logger.getLogger(AkkaInvocationHandler.class);
 
        private final String address;
@@ -64,11 +63,11 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaGateway, MainThrea
        private final boolean isLocal;
 
        // default timeout for asks
-       private final Timeout timeout;
+       private final Time timeout;
 
        private final long maximumFramesize;
 
-       AkkaInvocationHandler(String address, ActorRef rpcEndpoint, Timeout 
timeout, long maximumFramesize) {
+       AkkaInvocationHandler(String address, ActorRef rpcEndpoint, Time 
timeout, long maximumFramesize) {
                this.address = Preconditions.checkNotNull(address);
                this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint);
                this.isLocal = 
this.rpcEndpoint.path().address().hasLocalScope();
@@ -82,7 +81,7 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaGateway, MainThrea
 
                Object result;
 
-               if (declaringClass.equals(AkkaGateway.class) || 
declaringClass.equals(MainThreadExecutor.class) ||
+               if (declaringClass.equals(AkkaGateway.class) || 
declaringClass.equals(MainThreadExecutable.class) ||
                        declaringClass.equals(Object.class) || 
declaringClass.equals(StartStoppable.class) ||
                        declaringClass.equals(RpcGateway.class)) {
                        result = method.invoke(this, args);
@@ -90,7 +89,7 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaGateway, MainThrea
                        String methodName = method.getName();
                        Class<?>[] parameterTypes = method.getParameterTypes();
                        Annotation[][] parameterAnnotations = 
method.getParameterAnnotations();
-                       Timeout futureTimeout = 
extractRpcTimeout(parameterAnnotations, args, timeout);
+                       Time futureTimeout = 
extractRpcTimeout(parameterAnnotations, args, timeout);
 
                        Tuple2<Class<?>[], Object[]> filteredArguments = 
filterArguments(
                                parameterTypes,
@@ -130,13 +129,14 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaGateway, MainThrea
                                result = null;
                        } else if (returnType.equals(Future.class)) {
                                // execute an asynchronous call
-                               result = Patterns.ask(rpcEndpoint, 
rpcInvocation, futureTimeout);
+                               result = new 
FlinkFuture<>(Patterns.ask(rpcEndpoint, rpcInvocation, 
futureTimeout.toMilliseconds()));
                        } else {
                                // execute a synchronous call
-                               Future<?> futureResult = 
Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout);
-                               FiniteDuration duration = timeout.duration();
+                               scala.concurrent.Future<?> scalaFuture = 
Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout.toMilliseconds());
 
-                               result = Await.result(futureResult, duration);
+                               Future<?> futureResult = new 
FlinkFuture<>(scalaFuture);
+
+                               return 
futureResult.get(futureTimeout.getSize(), futureTimeout.getUnit());
                        }
                }
 
@@ -167,12 +167,12 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaGateway, MainThrea
        }
 
        @Override
-       public <V> Future<V> callAsync(Callable<V> callable, Timeout 
callTimeout) {
+       public <V> Future<V> callAsync(Callable<V> callable, Time callTimeout) {
                if(isLocal) {
                        @SuppressWarnings("unchecked")
-                       Future<V> result = (Future<V>) 
Patterns.ask(rpcEndpoint, new CallAsync(callable), callTimeout);
+                       scala.concurrent.Future<V> result = 
(scala.concurrent.Future<V>) Patterns.ask(rpcEndpoint, new CallAsync(callable), 
callTimeout.toMilliseconds());
 
-                       return result;
+                       return new FlinkFuture<>(result);
                } else {
                        throw new RuntimeException("Trying to send a Callable 
to a remote actor at " +
                                rpcEndpoint.path() + ". This is not 
supported.");
@@ -204,17 +204,17 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaGateway, MainThrea
         *                       has been found
         * @return Timeout extracted from the array of arguments or the default 
timeout
         */
-       private static Timeout extractRpcTimeout(Annotation[][] 
parameterAnnotations, Object[] args, Timeout defaultTimeout) {
+       private static Time extractRpcTimeout(Annotation[][] 
parameterAnnotations, Object[] args, Time defaultTimeout) {
                if (args != null) {
                        Preconditions.checkArgument(parameterAnnotations.length 
== args.length);
 
                        for (int i = 0; i < parameterAnnotations.length; i++) {
                                if (isRpcTimeout(parameterAnnotations[i])) {
-                                       if (args[i] instanceof FiniteDuration) {
-                                               return new 
Timeout((FiniteDuration) args[i]);
+                                       if (args[i] instanceof Time) {
+                                               return (Time) args[i];
                                        } else {
                                                throw new RuntimeException("The 
rpc timeout parameter must be of type " +
-                                                       
FiniteDuration.class.getName() + ". The type " + args[i].getClass().getName() +
+                                                       Time.class.getName() + 
". The type " + args[i].getClass().getName() +
                                                        " is not supported.");
                                        }
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 2373be9..59daa46 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -21,8 +21,11 @@ package org.apache.flink.runtime.rpc.akka;
 import akka.actor.ActorRef;
 import akka.actor.Status;
 import akka.actor.UntypedActorWithStash;
+import akka.dispatch.Futures;
 import akka.japi.Procedure;
 import akka.pattern.Patterns;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -35,7 +38,6 @@ import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
@@ -146,8 +148,23 @@ class AkkaRpcActor<C extends RpcGateway, T extends 
RpcEndpoint<C>> extends Untyp
                                        Object result = 
rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
 
                                        if (result instanceof Future) {
+                                               final Future<?> future = 
(Future<?>) result;
+
                                                // pipe result to sender
-                                               Patterns.pipe((Future<?>) 
result, getContext().dispatcher()).to(getSender());
+                                               if (future instanceof 
FlinkFuture) {
+                                                       // FlinkFutures are 
currently backed by Scala's futures
+                                                       FlinkFuture<?> 
flinkFuture = (FlinkFuture<?>) future;
+
+                                                       
Patterns.pipe(flinkFuture.getScalaFuture(), 
getContext().dispatcher()).to(getSender());
+                                               } else {
+                                                       // We have to unpack 
the Flink future and pack it into a Scala future
+                                                       
Patterns.pipe(Futures.future(new Callable<Object>() {
+                                                               @Override
+                                                               public Object 
call() throws Exception {
+                                                                       return 
future.get();
+                                                               }
+                                                       }, 
getContext().dispatcher()), getContext().dispatcher());
+                                               }
                                        } else {
                                                // tell the sender the result 
of the computation
                                                getSender().tell(new 
Status.Success(result), getSelf());

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 060a1ef..36f1115 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -26,11 +26,13 @@ import akka.actor.Identify;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.dispatch.Mapper;
-import akka.pattern.AskableActorSelection;
-import akka.util.Timeout;
 
+import akka.pattern.Patterns;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -39,8 +41,6 @@ import 
org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 import javax.annotation.concurrent.ThreadSafe;
@@ -48,6 +48,7 @@ import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Proxy;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -68,13 +69,13 @@ public class AkkaRpcService implements RpcService {
        private final Object lock = new Object();
 
        private final ActorSystem actorSystem;
-       private final Timeout timeout;
+       private final Time timeout;
        private final Set<ActorRef> actors = new HashSet<>(4);
        private final long maximumFramesize;
 
        private volatile boolean stopped;
 
-       public AkkaRpcService(final ActorSystem actorSystem, final Timeout 
timeout) {
+       public AkkaRpcService(final ActorSystem actorSystem, final Time 
timeout) {
                this.actorSystem = checkNotNull(actorSystem, "actor system");
                this.timeout = checkNotNull(timeout, "timeout");
 
@@ -95,10 +96,9 @@ public class AkkaRpcService implements RpcService {
                                address, clazz.getName());
 
                final ActorSelection actorSel = 
actorSystem.actorSelection(address);
-               final AskableActorSelection asker = new 
AskableActorSelection(actorSel);
 
-               final Future<Object> identify = asker.ask(new Identify(42), 
timeout);
-               return identify.map(new Mapper<Object, C>(){
+               final scala.concurrent.Future<Object> identify = 
Patterns.ask(actorSel, new Identify(42), timeout.toMilliseconds());
+               final scala.concurrent.Future<C> resultFuture = 
identify.map(new Mapper<Object, C>(){
                        @Override
                        public C checkedApply(Object obj) throws Exception {
 
@@ -128,6 +128,8 @@ public class AkkaRpcService implements RpcService {
                                }
                        }
                }, actorSystem.dispatcher());
+
+               return new FlinkFuture<>(resultFuture);
        }
 
        @Override
@@ -159,7 +161,7 @@ public class AkkaRpcService implements RpcService {
                        classLoader,
                        new Class<?>[]{
                                rpcEndpoint.getSelfGatewayType(),
-                               MainThreadExecutor.class,
+                               MainThreadExecutable.class,
                                StartStoppable.class,
                                AkkaGateway.class},
                        akkaInvocationHandler);
@@ -209,7 +211,7 @@ public class AkkaRpcService implements RpcService {
        }
 
        @Override
-       public ExecutionContext getExecutionContext() {
+       public Executor getExecutor() {
                return actorSystem.dispatcher();
        }
 
@@ -219,6 +221,6 @@ public class AkkaRpcService implements RpcService {
                checkNotNull(unit, "unit");
                checkArgument(delay >= 0, "delay must be zero or larger");
 
-               actorSystem.scheduler().scheduleOnce(new FiniteDuration(delay, 
unit), runnable, getExecutionContext());
+               actorSystem.scheduler().scheduleOnce(new FiniteDuration(delay, 
unit), runnable, actorSystem.dispatcher());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index fadae5f..d84a6a9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import akka.actor.ActorSystem;
-import akka.util.Timeout;
 import com.typesafe.config.Config;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.LocalConnectionManager;
@@ -35,6 +35,7 @@ import 
org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
 import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
+import org.jboss.netty.channel.ChannelException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,6 +79,7 @@ import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.UUID;
@@ -198,7 +200,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                                        this,
                                        newLeaderAddress,
                                        newLeaderId,
-                                       getMainThreadExecutionContext());
+                                       getMainThreadExecutor());
                        resourceManagerConnection.start();
                }
        }
@@ -302,9 +304,9 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                        LOG.debug("Using akka configuration\n " + akkaConfig);
                        taskManagerSystem = 
AkkaUtils.createActorSystem(akkaConfig);
                } catch (Throwable t) {
-                       if (t instanceof 
org.jboss.netty.channel.ChannelException) {
+                       if (t instanceof ChannelException) {
                                Throwable cause = t.getCause();
-                               if (cause != null && t.getCause() instanceof 
java.net.BindException) {
+                               if (cause != null && t.getCause() instanceof 
BindException) {
                                        String address = 
NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort);
                                        throw new IOException("Unable to bind 
TaskManager actor system to address " +
                                                address + " - " + 
cause.getMessage(), t);
@@ -314,7 +316,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                }
 
                // start akka rpc service based on actor system
-               final Timeout timeout = new 
Timeout(AkkaUtils.getTimeout(configuration).toMillis(), TimeUnit.MILLISECONDS);
+               final Time timeout = 
Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
                final AkkaRpcService akkaRpcService = new 
AkkaRpcService(taskManagerSystem, timeout);
 
                // start high availability service to implement 
getResourceManagerLeaderRetriever method only

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 65323a8..0962802 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.util.UUID;
 
@@ -48,5 +48,5 @@ public interface TaskExecutorGateway extends RpcGateway {
        Future<SlotRequestReply> requestSlot(
                AllocationID allocationID,
                UUID resourceManagerLeaderID,
-               @RpcTimeout FiniteDuration timeout);
+               @RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index 28062b6..647359d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -18,11 +18,12 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
-import akka.dispatch.OnFailure;
-import akka.dispatch.OnSuccess;
-
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -31,12 +32,8 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 
 import org.slf4j.Logger;
 
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -57,7 +54,7 @@ public class TaskExecutorToResourceManagerConnection {
        private final String resourceManagerAddress;
 
        /** Execution context to be used to execute the on complete action of 
the ResourceManagerRegistration */
-       private final ExecutionContext executionContext;
+       private final Executor executor;
 
        private 
TaskExecutorToResourceManagerConnection.ResourceManagerRegistration 
pendingRegistration;
 
@@ -74,13 +71,13 @@ public class TaskExecutorToResourceManagerConnection {
                TaskExecutor taskExecutor,
                String resourceManagerAddress,
                UUID resourceManagerLeaderId,
-               ExecutionContext executionContext) {
+               Executor executor) {
 
                this.log = checkNotNull(log);
                this.taskExecutor = checkNotNull(taskExecutor);
                this.resourceManagerAddress = 
checkNotNull(resourceManagerAddress);
                this.resourceManagerLeaderId = 
checkNotNull(resourceManagerLeaderId);
-               this.executionContext = checkNotNull(executionContext);
+               this.executor = checkNotNull(executor);
        }
 
        // 
------------------------------------------------------------------------
@@ -100,21 +97,22 @@ public class TaskExecutorToResourceManagerConnection {
 
                Future<Tuple2<ResourceManagerGateway, 
TaskExecutorRegistrationSuccess>> future = pendingRegistration.getFuture();
 
-               future.onSuccess(new OnSuccess<Tuple2<ResourceManagerGateway, 
TaskExecutorRegistrationSuccess>>() {
+               future.thenAcceptAsync(new 
AcceptFunction<Tuple2<ResourceManagerGateway, 
TaskExecutorRegistrationSuccess>>() {
                        @Override
-                       public void onSuccess(Tuple2<ResourceManagerGateway, 
TaskExecutorRegistrationSuccess> result) {
+                       public void accept(Tuple2<ResourceManagerGateway, 
TaskExecutorRegistrationSuccess> result) {
                                registrationId = result.f1.getRegistrationId();
                                registeredResourceManager = result.f0;
                        }
-               }, executionContext);
+               }, executor);
                
                // this future should only ever fail if there is a bug, not if 
the registration is declined
-               future.onFailure(new OnFailure() {
+               future.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
                        @Override
-                       public void onFailure(Throwable failure) {
+                       public Void apply(Throwable failure) {
                                taskExecutor.onFatalErrorAsync(failure);
+                               return null;
                        }
-               }, executionContext);
+               }, executor);
        }
 
        public void close() {
@@ -197,7 +195,7 @@ public class TaskExecutorToResourceManagerConnection {
                protected Future<RegistrationResponse> invokeRegistration(
                                ResourceManagerGateway resourceManager, UUID 
leaderId, long timeoutMillis) throws Exception {
 
-                       FiniteDuration timeout = new 
FiniteDuration(timeoutMillis, TimeUnit.MILLISECONDS);
+                       Time timeout = Time.milliseconds(timeoutMillis);
                        return resourceManager.registerTaskExecutor(leaderId, 
taskExecutorAddress, resourceID, timeout);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
index 80fa19c..e56a9ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.registration;
 
-import akka.dispatch.Futures;
-
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.util.TestLogger;
@@ -29,18 +29,13 @@ import org.junit.Test;
 
 import org.slf4j.LoggerFactory;
 
-import scala.concurrent.Await;
-import scala.concurrent.ExecutionContext$;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
@@ -71,8 +66,8 @@ public class RetryingRegistrationTest extends TestLogger {
                        // multiple accesses return the same future
                        assertEquals(future, registration.getFuture());
 
-                       Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success = 
-                                       Await.result(future, new 
FiniteDuration(10, SECONDS));
+                       Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success =
+                                       future.get(10L, TimeUnit.SECONDS);
 
                        // validate correct invocation and result
                        assertEquals(testId, success.f1.getCorrelationId());
@@ -83,7 +78,7 @@ public class RetryingRegistrationTest extends TestLogger {
                        rpc.stopService();
                }
        }
-       
+
        @Test
        public void testPropagateFailures() throws Exception {
                final String testExceptionMessage = "testExceptionMessage";
@@ -96,9 +91,15 @@ public class RetryingRegistrationTest extends TestLogger {
                registration.startRegistration();
 
                Future<?> future = registration.getFuture();
-               assertTrue(future.failed().isCompleted());
+               assertTrue(future.isDone());
 
-               assertEquals(testExceptionMessage, 
future.failed().value().get().get().getMessage());
+               try {
+                       future.get();
+
+                       fail("We expected an ExecutionException.");
+               } catch (ExecutionException e) {
+                       assertEquals(testExceptionMessage, 
e.getCause().getMessage());
+               }
        }
 
        @Test
@@ -113,16 +114,16 @@ public class RetryingRegistrationTest extends TestLogger {
                        // RPC service that fails upon the first connection, 
but succeeds on the second
                        RpcService rpc = mock(RpcService.class);
                        when(rpc.connect(anyString(), 
any(Class.class))).thenReturn(
-                                       Futures.failed(new Exception("test 
connect failure")),  // first connection attempt fails
-                                       Futures.successful(testGateway)         
                // second connection attempt succeeds
+                                       
FlinkCompletableFuture.completedExceptionally(new Exception("test connect 
failure")),  // first connection attempt fails
+                                       
FlinkCompletableFuture.completed(testGateway)                         // second 
connection attempt succeeds
                        );
-                       
when(rpc.getExecutionContext()).thenReturn(ExecutionContext$.MODULE$.fromExecutor(executor));
+                       when(rpc.getExecutor()).thenReturn(executor);
 
                        TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, "foobar address", leaderId);
                        registration.startRegistration();
 
                        Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success =
-                                       Await.result(registration.getFuture(), 
new FiniteDuration(10, SECONDS));
+                               registration.getFuture().get(10L, 
TimeUnit.SECONDS);
 
                        // validate correct invocation and result
                        assertEquals(testId, success.f1.getCorrelationId());
@@ -151,23 +152,23 @@ public class RetryingRegistrationTest extends TestLogger {
 
                try {
                        rpc.registerGateway(testEndpointAddress, testGateway);
-       
+
                        TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
-       
+
                        long started = System.nanoTime();
                        registration.startRegistration();
-       
+
                        Future<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
                        Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success =
-                                       Await.result(future, new 
FiniteDuration(10, SECONDS));
-       
+                                       future.get(10L, TimeUnit.SECONDS);
+
                        long finished = System.nanoTime();
                        long elapsedMillis = (finished - started) / 1000000;
-       
+
                        // validate correct invocation and result
                        assertEquals(testId, success.f1.getCorrelationId());
                        assertEquals(leaderId, 
testGateway.getInvocations().take().leaderId());
-       
+
                        // validate that some retry-delay / back-off behavior 
happened
                        assertTrue("retries did not properly back off", 
elapsedMillis >= 3 * TestRetryingRegistration.INITIAL_TIMEOUT);
                }
@@ -199,10 +200,10 @@ public class RetryingRegistrationTest extends TestLogger {
 
                        long started = System.nanoTime();
                        registration.startRegistration();
-       
+
                        Future<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
                        Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success =
-                                       Await.result(future, new 
FiniteDuration(10, SECONDS));
+                                       future.get(10L, TimeUnit.SECONDS);
 
                        long finished = System.nanoTime();
                        long elapsedMillis = (finished - started) / 1000000;
@@ -212,7 +213,7 @@ public class RetryingRegistrationTest extends TestLogger {
                        assertEquals(leaderId, 
testGateway.getInvocations().take().leaderId());
 
                        // validate that some retry-delay / back-off behavior 
happened
-                       assertTrue("retries did not properly back off", 
elapsedMillis >= 
+                       assertTrue("retries did not properly back off", 
elapsedMillis >=
                                        2 * 
TestRetryingRegistration.INITIAL_TIMEOUT + 
TestRetryingRegistration.DELAY_ON_DECLINE);
                }
                finally {
@@ -220,7 +221,7 @@ public class RetryingRegistrationTest extends TestLogger {
                        rpc.stopService();
                }
        }
-       
+
        @Test
        @SuppressWarnings("unchecked")
        public void testRetryOnError() throws Exception {
@@ -235,9 +236,9 @@ public class RetryingRegistrationTest extends TestLogger {
                        TestRegistrationGateway testGateway = 
mock(TestRegistrationGateway.class);
 
                        when(testGateway.registrationCall(any(UUID.class), 
anyLong())).thenReturn(
-                                       
Futures.<RegistrationResponse>failed(new Exception("test exception")),
-                                       
Futures.<RegistrationResponse>successful(new TestRegistrationSuccess(testId)));
-                       
+                                       
FlinkCompletableFuture.<RegistrationResponse>completedExceptionally(new 
Exception("test exception")),
+                                       
FlinkCompletableFuture.<RegistrationResponse>completed(new 
TestRegistrationSuccess(testId)));
+
                        rpc.registerGateway(testEndpointAddress, testGateway);
 
                        TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
@@ -247,11 +248,11 @@ public class RetryingRegistrationTest extends TestLogger {
 
                        Future<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
                        Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success =
-                                       Await.result(future, new 
FiniteDuration(10, SECONDS));
+                                       future.get(10, TimeUnit.SECONDS);
 
                        long finished = System.nanoTime();
                        long elapsedMillis = (finished - started) / 1000000;
-                       
+
                        assertEquals(testId, success.f1.getCorrelationId());
 
                        // validate that some retry-delay / back-off behavior 
happened
@@ -271,10 +272,10 @@ public class RetryingRegistrationTest extends TestLogger {
                TestingRpcService rpc = new TestingRpcService();
 
                try {
-                       Promise<RegistrationResponse> result = 
Futures.promise();
+                       FlinkCompletableFuture<RegistrationResponse> result = 
new FlinkCompletableFuture<>();
 
                        TestRegistrationGateway testGateway = 
mock(TestRegistrationGateway.class);
-                       when(testGateway.registrationCall(any(UUID.class), 
anyLong())).thenReturn(result.future());
+                       when(testGateway.registrationCall(any(UUID.class), 
anyLong())).thenReturn(result);
 
                        rpc.registerGateway(testEndpointAddress, testGateway);
 
@@ -283,7 +284,7 @@ public class RetryingRegistrationTest extends TestLogger {
 
                        // cancel and fail the current registration attempt
                        registration.cancel();
-                       result.failure(new TimeoutException());
+                       result.completeExceptionally(new TimeoutException());
 
                        // there should not be a second registration attempt
                        verify(testGateway, 
atMost(1)).registrationCall(any(UUID.class), anyLong());

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
index 431fbe8..2843aeb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
@@ -20,11 +20,11 @@ package org.apache.flink.runtime.registration;
 
 import akka.dispatch.Futures;
 
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.rpc.TestingGatewayBase;
 import org.apache.flink.util.Preconditions;
 
-import scala.concurrent.Future;
-
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -56,7 +56,7 @@ public class TestRegistrationGateway extends TestingGatewayBase {
                }
 
                // return a completed future (for a proper value), or one that 
never completes and will time out (for null)
-               return response != null ? Futures.successful(response) : 
this.<RegistrationResponse>futureWithTimeout(timeout);
+               return response != null ? 
FlinkCompletableFuture.completed(response) : 
this.<RegistrationResponse>futureWithTimeout(timeout);
        }
 
        public BlockingQueue<RegistrationCall> getInvocations() {

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 8183c0a..64a1191 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.resourcemanager;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -68,7 +68,7 @@ public class ResourceManagerHATest {
                Assert.assertNull(resourceManager.getLeaderSessionID());
        }
 
-       private static abstract class TestingResourceManagerGatewayProxy 
implements MainThreadExecutor, StartStoppable, RpcGateway {
+       private static abstract class TestingResourceManagerGatewayProxy 
implements MainThreadExecutable, StartStoppable, RpcGateway {
                @Override
                public void runAsync(Runnable runnable) {
                        runnable.run();

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 85d2880..1f9e7e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -18,10 +18,12 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.NonHaServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
@@ -40,10 +42,6 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.util.Collections;
 import java.util.UUID;
@@ -99,7 +97,7 @@ public class SlotProtocolTest extends TestLogger {
                Future<RegistrationResponse> registrationFuture =
                        resourceManager.registerJobMaster(new 
JobMasterRegistration(jmAddress, jobID));
                try {
-                       Await.ready(registrationFuture, Duration.create(5, 
TimeUnit.SECONDS));
+                       registrationFuture.get(5, TimeUnit.SECONDS);
                } catch (Exception e) {
                        Assert.fail("JobManager registration Future didn't 
become ready.");
                }
@@ -141,7 +139,7 @@ public class SlotProtocolTest extends TestLogger {
                slotManager.updateSlotStatus(slotReport);
 
                // 4) Slot becomes available and TaskExecutor gets a SlotRequest
-               verify(taskExecutorGateway, 
timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), 
any(FiniteDuration.class));
+               verify(taskExecutorGateway, 
timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(Time.class));
        }
 
        /**
@@ -171,7 +169,7 @@ public class SlotProtocolTest extends TestLogger {
                Future<RegistrationResponse> registrationFuture =
                        resourceManager.registerJobMaster(new 
JobMasterRegistration(jmAddress, jobID));
                try {
-                       Await.ready(registrationFuture, Duration.create(5, 
TimeUnit.SECONDS));
+                       registrationFuture.get(5, TimeUnit.SECONDS);
                } catch (Exception e) {
                        Assert.fail("JobManager registration Future didn't 
become ready.");
                }
@@ -207,7 +205,7 @@ public class SlotProtocolTest extends TestLogger {
 
 
                // 4) a SlotRequest is routed to the TaskExecutor
-               verify(taskExecutorGateway, 
timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), 
any(FiniteDuration.class));
+               verify(taskExecutorGateway, 
timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(Time.class));
        }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index 1791056..7c6b0ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -21,18 +21,16 @@ package org.apache.flink.runtime.rpc;
 import akka.actor.ActorSystem;
 import akka.util.Timeout;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
 
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.Test;
 
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -49,7 +47,7 @@ public class AsyncCallsTest extends TestLogger {
        private static ActorSystem actorSystem = 
AkkaUtils.createDefaultActorSystem();
 
        private static AkkaRpcService akkaRpcService =
-                       new AkkaRpcService(actorSystem, new Timeout(10000, 
TimeUnit.MILLISECONDS));
+                       new AkkaRpcService(actorSystem, 
Time.milliseconds(10000L));
 
        @AfterClass
        public static void shutdown() {
@@ -104,8 +102,9 @@ public class AsyncCallsTest extends TestLogger {
                                }
                                return "test";
                        }
-               }, new Timeout(30, TimeUnit.SECONDS));
-               String str = Await.result(result, new FiniteDuration(30, 
TimeUnit.SECONDS));
+               }, Time.seconds(30L));
+
+               String str = result.get(30, TimeUnit.SECONDS);
                assertEquals("test", str);
 
                // validate that no concurrent access happened

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index b431eb9..ee3f784 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -18,14 +18,14 @@
 
 package org.apache.flink.runtime.rpc;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.util.ReflectionUtil;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import org.reflections.Reflections;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Method;
@@ -43,6 +43,7 @@ import static org.junit.Assert.fail;
 public class RpcCompletenessTest extends TestLogger {
 
        private static final Class<?> futureClass = Future.class;
+       private static final Class<?> timeoutClass = Time.class;
 
        @Test
        @SuppressWarnings({"rawtypes", "unchecked"})
@@ -147,8 +148,8 @@ public class RpcCompletenessTest extends TestLogger {
                for (int i = 0; i < parameterAnnotations.length; i++) {
                        if 
(RpcCompletenessTest.isRpcTimeout(parameterAnnotations[i])) {
                                assertTrue(
-                                       "The rpc timeout has to be of type " + 
FiniteDuration.class.getName() + ".",
-                                       
parameterTypes[i].equals(FiniteDuration.class));
+                                       "The rpc timeout has to be of type " + 
timeoutClass.getName() + ".",
+                                       parameterTypes[i].equals(timeoutClass));
 
                                rpcTimeoutParameters++;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
index 8133a87..caf5e81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.rpc;
 
-import akka.dispatch.Futures;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -73,25 +73,25 @@ public abstract class TestingGatewayBase implements RpcGateway {
        // 
------------------------------------------------------------------------
 
        public <T> Future<T> futureWithTimeout(long timeoutMillis) {
-               Promise<T> promise = Futures.<T>promise();
-               executor.schedule(new FutureTimeout(promise), timeoutMillis, 
TimeUnit.MILLISECONDS);
-               return promise.future();
+               FlinkCompletableFuture<T> future = new 
FlinkCompletableFuture<>();
+               executor.schedule(new FutureTimeout(future), timeoutMillis, 
TimeUnit.MILLISECONDS);
+               return future;
        }
 
        // 
------------------------------------------------------------------------
        
        private static final class FutureTimeout implements Runnable {
 
-               private final Promise<?> promise;
+               private final CompletableFuture<?> promise;
 
-               private FutureTimeout(Promise<?> promise) {
+               private FutureTimeout(CompletableFuture<?> promise) {
                        this.promise = promise;
                }
 
                @Override
                public void run() {
                        try {
-                               promise.failure(new TimeoutException());
+                               promise.completeExceptionally(new 
TimeoutException());
                        } catch (Throwable t) {
                                System.err.println("CAUGHT AN ERROR IN THE 
TEST: " + t.getMessage());
                                t.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/31a091b9/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index 2212680..f164056 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -18,18 +18,14 @@
 
 package org.apache.flink.runtime.rpc;
 
-import akka.dispatch.Futures;
-import akka.util.Timeout;
-
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -69,7 +65,7 @@ public class TestingRpcService extends AkkaRpcService {
         * Creates a new {@code TestingRpcService}, using the given 
configuration. 
         */
        public TestingRpcService(Configuration configuration) {
-               super(AkkaUtils.createLocalActorSystem(configuration), new 
Timeout(new FiniteDuration(10, TimeUnit.SECONDS)));
+               super(AkkaUtils.createLocalActorSystem(configuration), 
Time.seconds(10));
 
                this.registeredConnections = new ConcurrentHashMap<>();
        }
@@ -103,13 +99,13 @@ public class TestingRpcService extends AkkaRpcService {
                        if (clazz.isAssignableFrom(gateway.getClass())) {
                                @SuppressWarnings("unchecked")
                                C typedGateway = (C) gateway;
-                               return Futures.successful(typedGateway);
+                               return 
FlinkCompletableFuture.completed(typedGateway);
                        } else {
-                               return Futures.failed(
-                                               new Exception("Gateway 
registered under " + address + " is not of type " + clazz));
+                               return 
FlinkCompletableFuture.completedExceptionally(
+                                       new Exception("Gateway registered under 
" + address + " is not of type " + clazz));
                        }
                } else {
-                       return Futures.failed(new Exception("No gateway 
registered under that name"));
+                       return 
FlinkCompletableFuture.completedExceptionally(new Exception("No gateway 
registered under that name"));
                }
        }
 

Reply via email to