http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
index d1aeea4..e2437e6 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -45,7 +45,7 @@ public class JobManagerConfigHandler extends 
AbstractJsonRequestHandler {
        }
 
        @Override
-       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws 
Exception {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
index c8ec689..3526734 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
@@ -19,8 +19,9 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
 import java.util.Map;
@@ -33,17 +34,23 @@ public class JobStoppingHandler extends 
AbstractJsonRequestHandler {
        private static final String JOB_STOPPING_REST_PATH = 
"/jobs/:jobid/stop";
        private static final String JOB_STOPPING_YARN_REST_PATH = 
"/jobs/:jobid/yarn-stop";
 
+       private final Time timeout;
+
+       public JobStoppingHandler(Time timeout) {
+               this.timeout = Preconditions.checkNotNull(timeout);
+       }
+
        @Override
        public String[] getPaths() {
                return new String[]{JOB_STOPPING_REST_PATH, 
JOB_STOPPING_YARN_REST_PATH};
        }
 
        @Override
-       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws 
Exception {
                try {
-                       JobID jobid = new 
JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
-                       if (jobManager != null) {
-                               jobManager.tell(new 
JobManagerMessages.StopJob(jobid));
+                       JobID jobId = new 
JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
+                       if (jobManagerGateway != null) {
+                               jobManagerGateway.stopJob(jobId, timeout);
                                return "{}";
                        }
                        else {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
index 8646df9..079be8f 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.webmonitor.NotFoundException;
 
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
 
@@ -40,16 +41,16 @@ public interface RequestHandler {
         *
         * @param pathParams The map of REST path parameters, decoded by the 
router.
         * @param queryParams The map of query parameters.
-        * @param jobManager The JobManager actor.
+        * @param jobManagerGateway to talk to the JobManager.
         *
         * @return The full http response.
         *
         * @throws Exception Handlers may forward exceptions. Exceptions of type
-        *         {@link 
org.apache.flink.runtime.webmonitor.NotFoundException} will cause a HTTP 404
+        *         {@link NotFoundException} will cause a HTTP 404
         *         response with the exception message, other exceptions will 
cause a HTTP 500 response
         *         with the exception stack trace.
         */
-       FullHttpResponse handleRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception;
+       FullHttpResponse handleRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws 
Exception;
 
        /**
         * Returns an array of REST URL's under which this handler can be 
registered.

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index 3562874..b7fee2d 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -33,13 +33,11 @@ import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobView;
 import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
@@ -62,7 +60,6 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
 import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future;
 import 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener;
 
-import akka.dispatch.Mapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,13 +71,10 @@ import java.net.InetSocketAddress;
 import java.nio.channels.FileChannel;
 import java.util.HashMap;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-
-import scala.Option;
-import scala.concurrent.ExecutionContextExecutor;
-import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
+import java.util.concurrent.Executor;
 
 import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
 import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
@@ -115,9 +109,7 @@ public class TaskManagerLogHandler extends 
RuntimeMonitorHandlerBase {
        /** Indicates which log file should be displayed. */
        private FileMode fileMode;
 
-       private final ExecutionContextExecutor executor;
-
-       private final Time timeTimeout;
+       private final Executor executor;
 
        private final BlobView blobView;
 
@@ -129,9 +121,9 @@ public class TaskManagerLogHandler extends 
RuntimeMonitorHandlerBase {
 
        public TaskManagerLogHandler(
                JobManagerRetriever retriever,
-               ExecutionContextExecutor executor,
-               scala.concurrent.Future<String> localJobManagerAddressPromise,
-               FiniteDuration timeout,
+               Executor executor,
+               CompletableFuture<String> localJobManagerAddressPromise,
+               Time timeout,
                FileMode fileMode,
                Configuration config,
                boolean httpsEnabled,
@@ -143,8 +135,6 @@ public class TaskManagerLogHandler extends 
RuntimeMonitorHandlerBase {
                this.fileMode = fileMode;
 
                this.blobView = Preconditions.checkNotNull(blobView, 
"blobView");
-
-               timeTimeout = Time.milliseconds(timeout.toMillis());
        }
 
        @Override
@@ -162,20 +152,18 @@ public class TaskManagerLogHandler extends 
RuntimeMonitorHandlerBase {
         * Response when running with leading JobManager.
         */
        @Override
-       protected void respondAsLeader(final ChannelHandlerContext ctx, final 
Routed routed, final ActorGateway jobManager) {
+       protected void respondAsLeader(final ChannelHandlerContext ctx, final 
Routed routed, final JobManagerGateway jobManagerGateway) {
                if (cache == null) {
-                       scala.concurrent.Future<Object> portFuture = 
jobManager.ask(JobManagerMessages.getRequestBlobManagerPort(), timeout);
-                       scala.concurrent.Future<BlobCache> cacheFuture = 
portFuture.map(new Mapper<Object, BlobCache>() {
-                               @Override
-                               public BlobCache checkedApply(Object result) 
throws IOException {
-                                       Option<String> hostOption = 
jobManager.actor().path().address().host();
-                                       String host = hostOption.isDefined() ? 
hostOption.get() : "localhost";
-                                       int port = (int) result;
-                                       return new BlobCache(new 
InetSocketAddress(host, port), config, blobView);
-                               }
-                       }, executor);
-
-                       cache = FutureUtils.toJava(cacheFuture);
+                       CompletableFuture<Integer> blobPortFuture = 
jobManagerGateway.requestBlobServerPort(timeout);
+                       cache = blobPortFuture.thenApplyAsync(
+                               (Integer port) -> {
+                                       try {
+                                               return new BlobCache(new 
InetSocketAddress(jobManagerGateway.getHostname(), port), config, blobView);
+                                       } catch (IOException e) {
+                                               throw new 
FlinkFutureException("Could not create BlobCache.", e);
+                                       }
+                               },
+                               executor);
                }
 
                final String taskManagerID = 
routed.pathParams().get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
@@ -185,22 +173,18 @@ public class TaskManagerLogHandler extends 
RuntimeMonitorHandlerBase {
                if (lastRequestPending.putIfAbsent(taskManagerID, true) == 
null) {
                        try {
                                InstanceID instanceID = new 
InstanceID(StringUtils.hexStringToByte(taskManagerID));
-                               
scala.concurrent.Future<JobManagerMessages.TaskManagerInstance> 
scalaTaskManagerFuture = jobManager
-                                       .ask(new 
JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout)
-                                       
.mapTo(ClassTag$.MODULE$.<JobManagerMessages.TaskManagerInstance>apply(JobManagerMessages.TaskManagerInstance.class));
-
-                               
CompletableFuture<JobManagerMessages.TaskManagerInstance> taskManagerFuture = 
FutureUtils.toJava(scalaTaskManagerFuture);
+                               CompletableFuture<Optional<Instance>> 
taskManagerFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, 
timeout);
 
                                CompletableFuture<BlobKey> blobKeyFuture = 
taskManagerFuture.thenCompose(
-                                       taskManagerInstance -> {
-                                               Instance taskManager = 
taskManagerInstance.instance().get();
-
+                                       (Optional<Instance> optTMInstance) -> {
+                                               Instance taskManagerInstance = 
optTMInstance.orElseThrow(
+                                                       () -> new 
FlinkFutureException("Could not find instance with " + instanceID + '.'));
                                                switch (fileMode) {
                                                        case LOG:
-                                                               return 
taskManager.getTaskManagerGateway().requestTaskManagerLog(timeTimeout);
+                                                               return 
taskManagerInstance.getTaskManagerGateway().requestTaskManagerLog(timeout);
                                                        case STDOUT:
                                                        default:
-                                                               return 
taskManager.getTaskManagerGateway().requestTaskManagerStdout(timeTimeout);
+                                                               return 
taskManagerInstance.getTaskManagerGateway().requestTaskManagerStdout(timeout);
                                                }
                                        }
                                );

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
index 6ad490e..a8ab7a3 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
@@ -18,12 +18,10 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.RegisteredTaskManagers;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.TaskManagerInstance;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
 import org.apache.flink.util.StringUtils;
@@ -32,12 +30,12 @@ import com.fasterxml.jackson.core.JsonGenerator;
 
 import java.io.StringWriter;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static java.util.Objects.requireNonNull;
 
@@ -51,11 +49,11 @@ public class TaskManagersHandler extends 
AbstractJsonRequestHandler  {
 
        public static final String TASK_MANAGER_ID_KEY = "taskmanagerid";
 
-       private final FiniteDuration timeout;
+       private final Time timeout;
 
        private final MetricFetcher fetcher;
 
-       public TaskManagersHandler(FiniteDuration timeout, MetricFetcher 
fetcher) {
+       public TaskManagersHandler(Time timeout, MetricFetcher fetcher) {
                this.timeout = requireNonNull(timeout);
                this.fetcher = fetcher;
        }
@@ -66,9 +64,9 @@ public class TaskManagersHandler extends 
AbstractJsonRequestHandler  {
        }
 
        @Override
-       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws 
Exception {
                try {
-                       if (jobManager != null) {
+                       if (jobManagerGateway != null) {
                                // whether one task manager's metrics are 
requested, or all task manager, we
                                // return them in an array. This avoids 
unnecessary code complexity.
                                // If only one task manager is requested, we 
only fetch one task manager metrics.
@@ -76,20 +74,21 @@ public class TaskManagersHandler extends 
AbstractJsonRequestHandler  {
                                if 
(pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
                                        try {
                                                InstanceID instanceID = new 
InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY)));
-                                               Future<Object> future = 
jobManager.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), 
timeout);
-                                               TaskManagerInstance instance = 
(TaskManagerInstance) Await.result(future, timeout);
-                                               if 
(instance.instance().nonEmpty()) {
-                                                       
instances.add(instance.instance().get());
-                                               }
+                                               
CompletableFuture<Optional<Instance>> tmInstanceFuture = 
jobManagerGateway.requestTaskManagerInstance(instanceID, timeout);
+
+                                               Optional<Instance> instance = 
tmInstanceFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+                                               
instance.ifPresent(instances::add);
                                        }
                                        // this means the id string was 
invalid. Keep the list empty.
                                        catch (IllegalArgumentException e){
                                                // do nothing.
                                        }
                                } else {
-                                       Future<Object> future = 
jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout);
-                                       RegisteredTaskManagers taskManagers = 
(RegisteredTaskManagers) Await.result(future, timeout);
-                                       
instances.addAll(taskManagers.asJavaCollection());
+                                       CompletableFuture<Collection<Instance>> 
tmInstancesFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
+
+                                       Collection<Instance> tmInstances = 
tmInstancesFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+                                       instances.addAll(tmInstances);
                                }
 
                                StringWriter writer = new StringWriter();

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
index f96e0c2..d116c56 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -25,8 +25,8 @@ import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
 import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import 
org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
 import 
org.apache.flink.runtime.webmonitor.handlers.AbstractJobVertexRequestHandler;
@@ -71,8 +71,8 @@ public class CheckpointStatsDetailsSubtasksHandler extends 
AbstractExecutionGrap
        public String handleJsonRequest(
                Map<String, String> pathParams,
                Map<String, String> queryParams,
-               ActorGateway jobManager) throws Exception {
-               return super.handleJsonRequest(pathParams, queryParams, 
jobManager);
+               JobManagerGateway jobManagerGateway) throws Exception {
+               return super.handleJsonRequest(pathParams, queryParams, 
jobManagerGateway);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
index 94b135d..b95f2c4 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.metrics;
 
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.handlers.AbstractJsonRequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
 import org.apache.flink.util.Preconditions;
@@ -48,7 +48,7 @@ public abstract class AbstractMetricsHandler extends 
AbstractJsonRequestHandler
        }
 
        @Override
-       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws 
Exception {
                fetcher.update();
                String requestedMetricsList = queryParams.get("get");
                return requestedMetricsList != null

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
index 95398b5..3af9c56 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
@@ -18,37 +18,29 @@
 
 package org.apache.flink.runtime.webmonitor.metrics;
 
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
 import org.apache.flink.runtime.metrics.dump.MetricDump;
 import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
-import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.Preconditions;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.OnFailure;
-import akka.dispatch.OnSuccess;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import scala.Option;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 import static 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer;
 
@@ -61,20 +53,25 @@ import static 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.Metr
 public class MetricFetcher {
        private static final Logger LOG = 
LoggerFactory.getLogger(MetricFetcher.class);
 
-       private final ActorSystem actorSystem;
        private final JobManagerRetriever retriever;
-       private final ExecutionContext ctx;
-       private final FiniteDuration timeout = new 
FiniteDuration(Duration.create(AkkaOptions.ASK_TIMEOUT.defaultValue()).toMillis(),
 TimeUnit.MILLISECONDS);
+       private final MetricQueryServiceRetriever queryServiceRetriever;
+       private final Executor executor;
+       private final Time timeout;
 
        private MetricStore metrics = new MetricStore();
        private MetricDumpDeserializer deserializer = new 
MetricDumpDeserializer();
 
        private long lastUpdateTime;
 
-       public MetricFetcher(ActorSystem actorSystem, JobManagerRetriever 
retriever, ExecutionContext ctx) {
-               this.actorSystem = Preconditions.checkNotNull(actorSystem);
+       public MetricFetcher(
+                       JobManagerRetriever retriever,
+                       MetricQueryServiceRetriever queryServiceRetriever,
+                       Executor executor,
+                       Time timeout) {
                this.retriever = Preconditions.checkNotNull(retriever);
-               this.ctx = Preconditions.checkNotNull(ctx);
+               this.queryServiceRetriever = 
Preconditions.checkNotNull(queryServiceRetriever);
+               this.executor = Preconditions.checkNotNull(executor);
+               this.timeout = Preconditions.checkNotNull(timeout);
        }
 
        /**
@@ -101,38 +98,38 @@ public class MetricFetcher {
 
        private void fetchMetrics() {
                try {
-                       Option<scala.Tuple2<ActorGateway, Integer>> 
jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort();
-                       if (jobManagerGatewayAndWebPort.isDefined()) {
-                               ActorGateway jobManager = 
jobManagerGatewayAndWebPort.get()._1();
+                       Optional<JobManagerGateway> optJobManagerGateway = 
retriever.getJobManagerGatewayNow();
+                       if (optJobManagerGateway.isPresent()) {
+                               final JobManagerGateway jobManagerGateway = 
optJobManagerGateway.get();
 
                                /**
                                 * Remove all metrics that belong to a job that 
is not running and no longer archived.
                                 */
-                               Future<Object> jobDetailsFuture = 
jobManager.ask(new RequestJobDetails(true, true), timeout);
-                               jobDetailsFuture
-                                       .onSuccess(new OnSuccess<Object>() {
-                                               @Override
-                                               public void onSuccess(Object 
result) throws Throwable {
-                                                       MultipleJobsDetails 
details = (MultipleJobsDetails) result;
+                               CompletableFuture<MultipleJobsDetails> 
jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout);
+
+                               jobDetailsFuture.whenCompleteAsync(
+                                       (MultipleJobsDetails jobDetails, 
Throwable throwable) -> {
+                                               if (throwable != null) {
+                                                       LOG.debug("Fetching of 
JobDetails failed.", throwable);
+                                               } else {
                                                        ArrayList<String> 
toRetain = new ArrayList<>();
-                                                       for (JobDetails job : 
details.getRunningJobs()) {
+                                                       for (JobDetails job : 
jobDetails.getRunningJobs()) {
                                                                
toRetain.add(job.getJobId().toString());
                                                        }
-                                                       for (JobDetails job : 
details.getFinishedJobs()) {
+                                                       for (JobDetails job : 
jobDetails.getFinishedJobs()) {
                                                                
toRetain.add(job.getJobId().toString());
                                                        }
                                                        synchronized (metrics) {
                                                                
metrics.jobs.keySet().retainAll(toRetain);
                                                        }
                                                }
-                                       }, ctx);
-                               logErrorOnFailure(jobDetailsFuture, "Fetching 
of JobDetails failed.");
+                                       },
+                                       executor);
 
-                               String jobManagerPath = jobManager.path();
-                               String queryServicePath = 
jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME;
-                               ActorRef jobManagerQueryService = 
actorSystem.actorFor(queryServicePath);
+                               String jobManagerPath = 
jobManagerGateway.getAddress();
+                               String jmQueryServicePath = 
jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME;
 
-                               queryMetrics(jobManagerQueryService);
+                               retrieveAndQueryMetrics(jmQueryServicePath);
 
                                /**
                                 * We first request the list of all registered 
task managers from the job manager, and then
@@ -140,88 +137,75 @@ public class MetricFetcher {
                                 *
                                 * <p>All stored metrics that do not belong to 
a registered task manager will be removed.
                                 */
-                               Future<Object> registeredTaskManagersFuture = 
jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout);
-                               registeredTaskManagersFuture
-                                       .onSuccess(new OnSuccess<Object>() {
-                                               @Override
-                                               public void onSuccess(Object 
result) throws Throwable {
-                                                       Iterable<Instance> 
taskManagers = ((JobManagerMessages.RegisteredTaskManagers) 
result).asJavaIterable();
-                                                       List<String> 
activeTaskManagers = new ArrayList<>();
-                                                       for (Instance 
taskManager : taskManagers) {
-                                                               
activeTaskManagers.add(taskManager.getId().toString());
-
-                                                               String 
taskManagerPath = taskManager.getTaskManagerGateway().getAddress();
-                                                               String 
queryServicePath = taskManagerPath.substring(0, 
taskManagerPath.lastIndexOf('/') + 1) + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + 
taskManager.getTaskManagerID().getResourceIdString();
-                                                               ActorRef 
taskManagerQueryService = actorSystem.actorFor(queryServicePath);
-
-                                                               
queryMetrics(taskManagerQueryService);
-                                                       }
-                                                       synchronized (metrics) 
{ // remove all metrics belonging to unregistered task managers
+                               CompletableFuture<Collection<Instance>> 
taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
+
+                               taskManagersFuture.whenCompleteAsync(
+                                       (Collection<Instance> taskManagers, 
Throwable throwable) -> {
+                                               if (throwable != null) {
+                                                       LOG.debug("Fetching 
list of registered TaskManagers failed.", throwable);
+                                               } else {
+                                                       List<String> 
activeTaskManagers = taskManagers.stream().map(
+                                                               
taskManagerInstance -> {
+                                                                       final 
String taskManagerAddress = 
taskManagerInstance.getTaskManagerGateway().getAddress();
+                                                                       final 
String tmQueryServicePath = taskManagerAddress.substring(0, 
taskManagerAddress.lastIndexOf('/') + 1) + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + 
taskManagerInstance.getTaskManagerID().getResourceIdString();
+
+                                                                       
retrieveAndQueryMetrics(tmQueryServicePath);
+
+                                                                       return 
taskManagerInstance.getId().toString();
+                                                               
}).collect(Collectors.toList());
+
+                                                       synchronized (metrics) {
                                                                
metrics.taskManagers.keySet().retainAll(activeTaskManagers);
                                                        }
                                                }
-                                       }, ctx);
-                               logErrorOnFailure(registeredTaskManagersFuture, 
"Fetchin list of registered TaskManagers failed.");
+                                       },
+                                       executor);
                        }
                } catch (Exception e) {
                        LOG.warn("Exception while fetching metrics.", e);
                }
        }
 
-       private void logErrorOnFailure(Future<Object> future, final String 
message) {
-               future.onFailure(new OnFailure() {
-                       @Override
-                       public void onFailure(Throwable failure) throws 
Throwable {
-                               LOG.debug(message, failure);
-                       }
-               }, ctx);
-       }
-
        /**
-        * Requests a metric dump from the given actor.
+        * Retrieves and queries the specified QueryServiceGateway.
         *
-        * @param actor ActorRef to request the dump from
-     */
-       private void queryMetrics(ActorRef actor) {
-               Future<Object> metricQueryFuture = new 
BasicGateway(actor).ask(MetricQueryService.getCreateDump(), timeout);
-               metricQueryFuture
-                       .onSuccess(new OnSuccess<Object>() {
-                               @Override
-                               public void onSuccess(Object result) throws 
Throwable {
-                                       addMetrics(result);
+        * @param queryServicePath specifying the QueryServiceGateway
+        */
+       private void retrieveAndQueryMetrics(String queryServicePath) {
+               final CompletableFuture<MetricQueryServiceGateway> 
queryServiceGatewayFuture = 
queryServiceRetriever.retrieveService(queryServicePath);
+
+               queryServiceGatewayFuture.whenCompleteAsync(
+                       (MetricQueryServiceGateway queryServiceGateway, 
Throwable t) -> {
+                               if (t != null) {
+                                       LOG.debug("Could not retrieve 
QueryServiceGateway.", t);
+                               } else {
+                                       queryMetrics(queryServiceGateway);
                                }
-                       }, ctx);
-               logErrorOnFailure(metricQueryFuture, "Fetching metrics 
failed.");
-       }
-
-       private void addMetrics(Object result) {
-               MetricDumpSerialization.MetricSerializationResult data = 
(MetricDumpSerialization.MetricSerializationResult) result;
-               List<MetricDump> dumpedMetrics = deserializer.deserialize(data);
-               for (MetricDump metric : dumpedMetrics) {
-                       metrics.add(metric);
-               }
+                       },
+                       executor);
        }
 
        /**
-        * Helper class that allows mocking of the answer.
-     */
-       static class BasicGateway {
-               private final ActorRef actor;
-
-               private BasicGateway(ActorRef actor) {
-                       this.actor = actor;
-               }
-
-               /**
-                * Sends a message asynchronously and returns its response. The 
response to the message is
-                * returned as a future.
-                *
-                * @param message Message to be sent
-                * @param timeout Timeout until the Future is completed with an 
AskTimeoutException
-                * @return Future which contains the response to the sent 
message
-                */
-               public Future<Object> ask(Object message, FiniteDuration 
timeout) {
-                       return Patterns.ask(actor, message, new 
Timeout(timeout));
-               }
+        * Query the metrics from the given QueryServiceGateway.
+        *
+        * @param queryServiceGateway to query for metrics
+        */
+       private void queryMetrics(final MetricQueryServiceGateway 
queryServiceGateway) {
+               queryServiceGateway
+                       .queryMetrics(timeout)
+                       .whenCompleteAsync(
+                               
(MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
+                                       if (t != null) {
+                                               LOG.debug("Fetching metrics 
failed.", t);
+                                       } else {
+                                               List<MetricDump> dumpedMetrics 
= deserializer.deserialize(result);
+                                               synchronized (metrics) {
+                                                       for (MetricDump metric 
: dumpedMetrics) {
+                                                               
metrics.add(metric);
+                                                       }
+                                               }
+                                       }
+                               },
+                               executor);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index 7cd2932..b3ce135 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -158,25 +158,20 @@ public class WebFrontendITCase extends TestLogger {
        }
 
        @Test
-       public void getTaskmanagers() {
-               try {
-                       String json = 
TestBaseUtils.getFromHTTP("http://localhost:"; + port + "/taskmanagers/");
+       public void getTaskmanagers() throws Exception {
+               String json = TestBaseUtils.getFromHTTP("http://localhost:"; + 
port + "/taskmanagers/");
 
-                       ObjectMapper mapper = new ObjectMapper();
-                       JsonNode parsed = mapper.readTree(json);
-                       ArrayNode taskManagers = (ArrayNode) 
parsed.get("taskmanagers");
+               ObjectMapper mapper = new ObjectMapper();
+               JsonNode parsed = mapper.readTree(json);
+               ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
 
-                       assertNotNull(taskManagers);
-                       assertEquals(cluster.numTaskManagers(), 
taskManagers.size());
+               assertNotNull(taskManagers);
+               assertEquals(cluster.numTaskManagers(), taskManagers.size());
 
-                       JsonNode taskManager = taskManagers.get(0);
-                       assertNotNull(taskManager);
-                       assertEquals(NUM_SLOTS, 
taskManager.get("slotsNumber").asInt());
-                       assertTrue(taskManager.get("freeSlots").asInt() <= 
NUM_SLOTS);
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
+               JsonNode taskManager = taskManagers.get(0);
+               assertNotNull(taskManager);
+               assertEquals(NUM_SLOTS, taskManager.get("slotsNumber").asInt());
+               assertTrue(taskManager.get("freeSlots").asInt() <= NUM_SLOTS);
        }
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index fe16445..5829d1c 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.WebOptions;
@@ -27,12 +28,15 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.webmonitor.files.MimeTypes;
+import 
org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
 import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
 import org.apache.flink.util.TestLogger;
 
@@ -44,11 +48,12 @@ import org.apache.curator.test.TestingServer;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.powermock.reflect.Whitebox;
 
 import java.io.File;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.Scanner;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -71,7 +76,9 @@ public class WebRuntimeMonitorITCase extends TestLogger {
        @Rule
        public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-       private static final FiniteDuration TestTimeout = new FiniteDuration(2, 
TimeUnit.MINUTES);
+       private static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(2L, TimeUnit.MINUTES);
+
+       private static final Time TIMEOUT = 
Time.milliseconds(TEST_TIMEOUT.toMillis());
 
        private final String mainResourcesPath = 
getClass().getResource("/web").getPath();
 
@@ -80,7 +87,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
         */
        @Test
        public void testStandaloneWebRuntimeMonitor() throws Exception {
-               final Deadline deadline = TestTimeout.fromNow();
+               final Deadline deadline = TEST_TIMEOUT.fromNow();
 
                TestingCluster flink = null;
                WebRuntimeMonitor webMonitor = null;
@@ -89,7 +96,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
                        // Flink w/o a web monitor
                        flink = new TestingCluster(new Configuration());
                        flink.start(true);
-                       webMonitor = startWebRuntimeMonitor(flink);
+                       webMonitor = startWebRuntimeMonitor(flink, TIMEOUT);
 
                        try (HttpTestClient client = new 
HttpTestClient("localhost", webMonitor.getServerPort())) {
                                String expected = new Scanner(new 
File(mainResourcesPath + "/index.html"))
@@ -129,10 +136,11 @@ public class WebRuntimeMonitorITCase extends TestLogger {
         */
        @Test
        public void testRedirectToLeader() throws Exception {
-               final Deadline deadline = TestTimeout.fromNow();
+               final Deadline deadline = TEST_TIMEOUT.fromNow();
 
                ActorSystem[] jobManagerSystem = new ActorSystem[2];
                WebRuntimeMonitor[] webMonitor = new WebRuntimeMonitor[2];
+               AkkaJobManagerRetriever[] jobManagerRetrievers = new 
AkkaJobManagerRetriever[2];
                HighAvailabilityServices highAvailabilityServices = null;
 
                try (TestingServer zooKeeper = new TestingServer()) {
@@ -157,11 +165,16 @@ public class WebRuntimeMonitorITCase extends TestLogger {
                        }
 
                        for (int i = 0; i < webMonitor.length; i++) {
+                               jobManagerRetrievers[i] = new 
AkkaJobManagerRetriever(jobManagerSystem[i], TIMEOUT);
+
                                webMonitor[i] = new WebRuntimeMonitor(
                                        config,
                                        
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
                                        
highAvailabilityServices.createBlobStore(),
-                                       jobManagerSystem[i]);
+                                       jobManagerRetrievers[i],
+                                       new 
AkkaQueryServiceRetriever(jobManagerSystem[i], TIMEOUT),
+                                       TIMEOUT,
+                                       TestingUtils.defaultExecutor());
                        }
 
                        ActorRef[] jobManager = new ActorRef[2];
@@ -196,22 +209,18 @@ public class WebRuntimeMonitorITCase extends TestLogger {
                        int followerIndex = (leaderIndex + 1) % 2;
 
                        ActorSystem leadingSystem = 
jobManagerSystem[leaderIndex];
-                       ActorSystem followerSystem = 
jobManagerSystem[followerIndex];
 
                        WebMonitor leadingWebMonitor = webMonitor[leaderIndex];
                        WebMonitor followerWebMonitor = 
webMonitor[followerIndex];
 
                        // For test stability reason we have to wait until we 
are sure that both leader
                        // listeners have been notified.
-                       JobManagerRetriever leadingRetriever = Whitebox
-                                       .getInternalState(leadingWebMonitor, 
"retriever");
-
-                       JobManagerRetriever followerRetriever = Whitebox
-                                       .getInternalState(followerWebMonitor, 
"retriever");
+                       AkkaJobManagerRetriever leadingRetriever = 
jobManagerRetrievers[leaderIndex];
+                       AkkaJobManagerRetriever followerRetriever = 
jobManagerRetrievers[followerIndex];
 
                        // Wait for the initial notifications
-                       waitForLeaderNotification(leadingSystem, 
jobManager[leaderIndex], leadingRetriever, deadline);
-                       waitForLeaderNotification(leadingSystem, 
jobManager[leaderIndex], followerRetriever, deadline);
+                       
waitForLeaderNotification(jobManager[leaderIndex].path().toString(), 
leadingRetriever, deadline);
+                       
waitForLeaderNotification(AkkaUtils.getAkkaURL(leadingSystem, 
jobManager[leaderIndex]), followerRetriever, deadline);
 
                        try (
                                        HttpTestClient leaderClient = new 
HttpTestClient(
@@ -241,7 +250,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
                                leadingSystem.shutdown();
 
                                // Wait for the notification of the follower
-                               waitForLeaderNotification(followerSystem, 
jobManager[followerIndex], followerRetriever, deadline);
+                               
waitForLeaderNotification(jobManager[followerIndex].path().toString(), 
followerRetriever, deadline);
 
                                // Same request to the new leader
                                followingClient.sendGetRequest("index.html", 
deadline.timeLeft());
@@ -282,7 +291,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 
        @Test
        public void testLeaderNotAvailable() throws Exception {
-               final Deadline deadline = TestTimeout.fromNow();
+               final Deadline deadline = TEST_TIMEOUT.fromNow();
 
                ActorSystem actorSystem = null;
                WebRuntimeMonitor webRuntimeMonitor = null;
@@ -305,7 +314,10 @@ public class WebRuntimeMonitorITCase extends TestLogger {
                                config,
                                mock(LeaderRetrievalService.class),
                                mock(BlobView.class),
-                               actorSystem);
+                               new AkkaJobManagerRetriever(actorSystem, 
TIMEOUT),
+                               new AkkaQueryServiceRetriever(actorSystem, 
TIMEOUT),
+                               TIMEOUT,
+                               TestingUtils.defaultExecutor());
 
                        webRuntimeMonitor.start("akka://schmakka");
 
@@ -343,7 +355,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
         */
        @Test
        public void testNoEscape() throws Exception {
-               final Deadline deadline = TestTimeout.fromNow();
+               final Deadline deadline = TEST_TIMEOUT.fromNow();
 
                TestingCluster flink = null;
                WebRuntimeMonitor webMonitor = null;
@@ -351,7 +363,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
                try {
                        flink = new TestingCluster(new Configuration());
                        flink.start(true);
-                       webMonitor = startWebRuntimeMonitor(flink);
+                       webMonitor = startWebRuntimeMonitor(flink, TIMEOUT);
 
                        try (HttpTestClient client = new 
HttpTestClient("localhost", webMonitor.getServerPort())) {
                                String expectedIndex = new Scanner(new 
File(mainResourcesPath + "/index.html"))
@@ -405,7 +417,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
         */
        @Test
        public void testNoCopyFromJar() throws Exception {
-               final Deadline deadline = TestTimeout.fromNow();
+               final Deadline deadline = TEST_TIMEOUT.fromNow();
 
                TestingCluster flink = null;
                WebRuntimeMonitor webMonitor = null;
@@ -413,7 +425,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
                try {
                        flink = new TestingCluster(new Configuration());
                        flink.start(true);
-                       webMonitor = startWebRuntimeMonitor(flink);
+                       webMonitor = startWebRuntimeMonitor(flink, TIMEOUT);
 
                        try (HttpTestClient client = new 
HttpTestClient("localhost", webMonitor.getServerPort())) {
                                String expectedIndex = new Scanner(new 
File(mainResourcesPath + "/index.html"))
@@ -459,7 +471,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
        }
 
        private WebRuntimeMonitor startWebRuntimeMonitor(
-               TestingCluster flink) throws Exception {
+                       TestingCluster flink,
+                       Time timeout) throws Exception {
 
                ActorSystem jmActorSystem = 
flink.jobManagerActorSystems().get().head();
                ActorRef jmActor = flink.jobManagerActors().get().head();
@@ -482,7 +495,10 @@ public class WebRuntimeMonitorITCase extends TestLogger {
                        config,
                        
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
                        highAvailabilityServices.createBlobStore(),
-                       jmActorSystem);
+                       new AkkaJobManagerRetriever(jmActorSystem, timeout),
+                       new AkkaQueryServiceRetriever(jmActorSystem, timeout),
+                       timeout,
+                       TestingUtils.defaultExecutor());
 
                webMonitor.start(jobManagerAddress);
                flink.waitForActorsToBeAlive();
@@ -492,17 +508,14 @@ public class WebRuntimeMonitorITCase extends TestLogger {
        // 
------------------------------------------------------------------------
 
        private void waitForLeaderNotification(
-                       ActorSystem system,
-                       ActorRef expectedLeader,
-                       JobManagerRetriever retriever,
+                       String expectedJobManagerURL,
+                       AkkaJobManagerRetriever retriever,
                        Deadline deadline) throws Exception {
 
-               String expectedJobManagerUrl = AkkaUtils.getAkkaURL(system, 
expectedLeader);
-
                while (deadline.hasTimeLeft()) {
-                       ActorRef leaderRef = 
retriever.awaitJobManagerGatewayAndWebPort()._1().actor();
+                       Optional<JobManagerGateway> optJobManagerGateway = 
retriever.getJobManagerGatewayNow();
 
-                       if (AkkaUtils.getAkkaURL(system, 
leaderRef).equals(expectedJobManagerUrl)) {
+                       if (optJobManagerGateway.isPresent() && 
Objects.equals(expectedJobManagerURL, optJobManagerGateway.get().getAddress())) 
{
                                return;
                        }
                        else {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
index 19e8a49..865385f 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
@@ -18,20 +18,18 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.api.common.time.Time;
+
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.duration.FiniteDuration;
-
 /**
  * Tests for the ClusterOverviewHandler.
  */
 public class ClusterOverviewHandlerTest {
        @Test
        public void testGetPaths() {
-               ClusterOverviewHandler handler = new ClusterOverviewHandler(new 
FiniteDuration(0, TimeUnit.SECONDS));
+               ClusterOverviewHandler handler = new 
ClusterOverviewHandler(Time.seconds(0L));
                String[] paths = handler.getPaths();
                Assert.assertEquals(1, paths.length);
                Assert.assertEquals("/overview", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
index e108774..ea26f5d 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
@@ -18,20 +18,18 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.api.common.time.Time;
+
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.duration.FiniteDuration;
-
 /**
  * Tests for the CurrentJobIdsHandler.
  */
 public class CurrentJobIdsHandlerTest {
        @Test
        public void testGetPaths() {
-               CurrentJobIdsHandler handler = new CurrentJobIdsHandler(new 
FiniteDuration(0, TimeUnit.SECONDS));
+               CurrentJobIdsHandler handler = new 
CurrentJobIdsHandler(Time.seconds(0L));
                String[] paths = handler.getPaths();
                Assert.assertEquals(1, paths.length);
                Assert.assertEquals("/jobs", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
index 9f3d362..64360d3 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
@@ -35,9 +36,6 @@ import org.junit.Test;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Tests for the CurrentJobsOverviewHandler.
@@ -68,17 +66,17 @@ public class CurrentJobsOverviewHandlerTest {
 
        @Test
        public void testGetPaths() {
-               CurrentJobsOverviewHandler handlerAll = new 
CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), true, true);
+               CurrentJobsOverviewHandler handlerAll = new 
CurrentJobsOverviewHandler(Time.seconds(0L), true, true);
                String[] pathsAll = handlerAll.getPaths();
                Assert.assertEquals(1, pathsAll.length);
                Assert.assertEquals("/joboverview", pathsAll[0]);
 
-               CurrentJobsOverviewHandler handlerRunning = new 
CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), true, 
false);
+               CurrentJobsOverviewHandler handlerRunning = new 
CurrentJobsOverviewHandler(Time.seconds(0L), true, false);
                String[] pathsRunning = handlerRunning.getPaths();
                Assert.assertEquals(1, pathsRunning.length);
                Assert.assertEquals("/joboverview/running", pathsRunning[0]);
 
-               CurrentJobsOverviewHandler handlerCompleted = new 
CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), false, 
true);
+               CurrentJobsOverviewHandler handlerCompleted = new 
CurrentJobsOverviewHandler(Time.seconds(0L), false, true);
                String[] pathsCompleted = handlerCompleted.getPaths();
                Assert.assertEquals(1, pathsCompleted.length);
                Assert.assertEquals("/joboverview/completed", 
pathsCompleted[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
index 4ddddca..ac8d934 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java
@@ -18,41 +18,54 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.DummyActorGateway;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
 import org.junit.Test;
 
-import scala.Tuple2;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for the HandlerRedirectUtils.
  */
-public class HandlerRedirectUtilsTest {
+public class HandlerRedirectUtilsTest extends TestLogger {
 
        private static final String localJobManagerAddress = 
"akka.tcp://flink@127.0.0.1:1234/user/foobar";
-       private static final String remoteURL = "127.0.0.2:1235";
+       private static final String remoteHostname = "127.0.0.2";
+       private static final int webPort = 1235;
+       private static final String remoteURL = remoteHostname + ':' + webPort;
        private static final String remotePath = "akka.tcp://flink@" + 
remoteURL + "/user/jobmanager";
 
        @Test
        public void testGetRedirectAddressWithLocalAkkaPath() throws Exception {
-               ActorGateway leaderGateway = new 
DummyActorGateway("akka://flink/user/foobar");
-
-               Tuple2<ActorGateway, Integer> leader = new 
Tuple2<>(leaderGateway, 1235);
+               JobManagerGateway jobManagerGateway = 
mock(JobManagerGateway.class);
+               
when(jobManagerGateway.getAddress()).thenReturn("akka://flink/user/foobar");
 
-               String redirectingAddress = 
HandlerRedirectUtils.getRedirectAddress(localJobManagerAddress, leader);
+               String redirectingAddress = 
HandlerRedirectUtils.getRedirectAddress(
+                       localJobManagerAddress,
+                       jobManagerGateway,
+                       Time.seconds(3L));
 
                Assert.assertNull(redirectingAddress);
        }
 
        @Test
        public void testGetRedirectAddressWithRemoteAkkaPath() throws Exception 
{
-               ActorGateway leaderGateway = new DummyActorGateway(remotePath);
-
-               Tuple2<ActorGateway, Integer> leader = new 
Tuple2<>(leaderGateway, 1235);
+               JobManagerGateway jobManagerGateway = 
mock(JobManagerGateway.class);
+               when(jobManagerGateway.getAddress()).thenReturn(remotePath);
+               
when(jobManagerGateway.getHostname()).thenReturn(remoteHostname);
+               
when(jobManagerGateway.requestWebPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(webPort));
 
-               String redirectingAddress = 
HandlerRedirectUtils.getRedirectAddress(localJobManagerAddress, leader);
+               String redirectingAddress = 
HandlerRedirectUtils.getRedirectAddress(
+                       localJobManagerAddress,
+                       jobManagerGateway,
+                       Time.seconds(3L));
 
                Assert.assertEquals(remoteURL, redirectingAddress);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
index fcbfa02..82aa87a 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -27,7 +30,7 @@ import org.junit.Test;
 public class JarRunHandlerTest {
        @Test
        public void testGetPaths() {
-               JarRunHandler handler = new JarRunHandler(null, null, null);
+               JarRunHandler handler = new JarRunHandler(null, 
Time.seconds(0L), new Configuration());
                String[] paths = handler.getPaths();
                Assert.assertEquals(1, paths.length);
                Assert.assertEquals("/jars/:jarid/run", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
index 25fca9b..fe55f51 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -31,6 +32,8 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Collection;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the JobAccumulatorsHandler.
  */
@@ -51,7 +54,7 @@ public class JobAccumulatorsHandlerTest {
 
        @Test
        public void testGetPaths() {
-               JobAccumulatorsHandler handler = new 
JobAccumulatorsHandler(null);
+               JobAccumulatorsHandler handler = new 
JobAccumulatorsHandler(mock(ExecutionGraphHolder.class));
                String[] paths = handler.getPaths();
                Assert.assertEquals(1, paths.length);
                Assert.assertEquals("/jobs/:jobid/accumulators", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
index ed54000..98d9353 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+
 import com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.Test;
@@ -30,7 +32,7 @@ import java.util.List;
 public class JobCancellationHandlerTest {
        @Test
        public void testGetPaths() {
-               JobCancellationHandler handler = new JobCancellationHandler();
+               JobCancellationHandler handler = new 
JobCancellationHandler(TestingUtils.TIMEOUT());
                String[] paths = handler.getPaths();
                Assert.assertEquals(2, paths.length);
                List<String> pathsList = Lists.newArrayList(paths);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
index 64a07c8..b48ee66 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
@@ -19,21 +19,20 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.instance.ActorGateway;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.util.TestLogger;
 
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
-import akka.dispatch.ExecutionContexts$;
-import akka.dispatch.Futures;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.junit.Assert;
@@ -45,15 +44,14 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-import scala.concurrent.impl.Promise;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -62,13 +60,13 @@ import static org.mockito.Mockito.when;
 /**
  * Tests for the JobCancellationWithSavepointHandler.
  */
-public class JobCancellationWithSavepointHandlersTest {
+public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 
-       private static final ExecutionContext EC = 
ExecutionContexts$.MODULE$.fromExecutor(Executors.directExecutor());
+       private static final Executor executor = Executors.directExecutor();
 
        @Test
        public void testGetPaths() {
-               JobCancellationWithSavepointHandlers handler = new 
JobCancellationWithSavepointHandlers(mock(ExecutionGraphHolder.class), EC);
+               JobCancellationWithSavepointHandlers handler = new 
JobCancellationWithSavepointHandlers(mock(ExecutionGraphHolder.class), 
executor);
 
                JobCancellationWithSavepointHandlers.TriggerHandler 
triggerHandler = handler.getTriggerHandler();
                String[] triggerPaths = triggerHandler.getPaths();
@@ -94,25 +92,23 @@ public class JobCancellationWithSavepointHandlersTest {
                ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
                ExecutionGraph graph = mock(ExecutionGraph.class);
                CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
-               when(holder.getExecutionGraph(eq(jobId), 
any(ActorGateway.class))).thenReturn(graph);
+               when(holder.getExecutionGraph(eq(jobId), 
any(JobManagerGateway.class))).thenReturn(Optional.of(graph));
                when(graph.getCheckpointCoordinator()).thenReturn(coord);
                when(coord.getCheckpointTimeout()).thenReturn(timeout);
 
-               JobCancellationWithSavepointHandlers handlers = new 
JobCancellationWithSavepointHandlers(holder, EC);
+               JobCancellationWithSavepointHandlers handlers = new 
JobCancellationWithSavepointHandlers(holder, executor);
                JobCancellationWithSavepointHandlers.TriggerHandler handler = 
handlers.getTriggerHandler();
 
                Map<String, String> params = new HashMap<>();
                params.put("jobid", jobId.toString());
                params.put("targetDirectory", "placeholder");
 
-               ActorGateway jobManager = mock(ActorGateway.class);
-
-               Future<Object> future = Futures.successful((Object) new 
CancellationSuccess(jobId, null));
-               when(jobManager.ask(any(Object.class), 
any(FiniteDuration.class))).thenReturn(future);
+               JobManagerGateway jobManager = mock(JobManagerGateway.class);
+               when(jobManager.cancelJobWithSavepoint(eq(jobId), anyString(), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture("foobar"));
 
-               handler.handleRequest(params, Collections.<String, 
String>emptyMap(), jobManager);
+               handler.handleRequest(params, Collections.emptyMap(), 
jobManager);
 
-               verify(jobManager).ask(any(CancelJobWithSavepoint.class), 
eq(FiniteDuration.apply(timeout, "ms")));
+               verify(jobManager).cancelJobWithSavepoint(eq(jobId), 
anyString(), any(Time.class));
        }
 
        /**
@@ -125,36 +121,34 @@ public class JobCancellationWithSavepointHandlersTest {
                ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
                ExecutionGraph graph = mock(ExecutionGraph.class);
                CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
-               when(holder.getExecutionGraph(eq(jobId), 
any(ActorGateway.class))).thenReturn(graph);
+               when(holder.getExecutionGraph(eq(jobId), 
any(JobManagerGateway.class))).thenReturn(Optional.of(graph));
                when(graph.getCheckpointCoordinator()).thenReturn(coord);
                when(coord.getCheckpointTimeout()).thenReturn(timeout);
 
-               JobCancellationWithSavepointHandlers handlers = new 
JobCancellationWithSavepointHandlers(holder, EC, "the-default-directory");
+               JobCancellationWithSavepointHandlers handlers = new 
JobCancellationWithSavepointHandlers(holder, executor, "the-default-directory");
                JobCancellationWithSavepointHandlers.TriggerHandler handler = 
handlers.getTriggerHandler();
 
                Map<String, String> params = new HashMap<>();
                params.put("jobid", jobId.toString());
 
-               ActorGateway jobManager = mock(ActorGateway.class);
-
-               Future<Object> future = Futures.successful((Object) new 
CancellationSuccess(jobId, null));
-               when(jobManager.ask(any(Object.class), 
any(FiniteDuration.class))).thenReturn(future);
+               JobManagerGateway jobManager = mock(JobManagerGateway.class);
+               when(jobManager.cancelJobWithSavepoint(eq(jobId), anyString(), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture("foobar"));
 
                // 1. Use targetDirectory path param
                params.put("targetDirectory", "custom-directory");
                handler.handleRequest(params, Collections.<String, 
String>emptyMap(), jobManager);
 
-               verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, 
"custom-directory")), eq(FiniteDuration.apply(timeout, "ms")));
+               verify(jobManager).cancelJobWithSavepoint(eq(jobId), 
eq("custom-directory"), any(Time.class));
 
                // 2. Use default
                params.remove("targetDirectory");
 
                handler.handleRequest(params, Collections.<String, 
String>emptyMap(), jobManager);
 
-               verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, 
"the-default-directory")), eq(FiniteDuration.apply(timeout, "ms")));
+               verify(jobManager).cancelJobWithSavepoint(eq(jobId), 
eq("the-default-directory"), any(Time.class));
 
                // 3. Throw Exception
-               handlers = new JobCancellationWithSavepointHandlers(holder, EC, 
null);
+               handlers = new JobCancellationWithSavepointHandlers(holder, 
executor, null);
                handler = handlers.getTriggerHandler();
 
                try {
@@ -175,10 +169,10 @@ public class JobCancellationWithSavepointHandlersTest {
                ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
                ExecutionGraph graph = mock(ExecutionGraph.class);
                CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
-               when(holder.getExecutionGraph(eq(jobId), 
any(ActorGateway.class))).thenReturn(graph);
+               when(holder.getExecutionGraph(eq(jobId), 
any(JobManagerGateway.class))).thenReturn(Optional.of(graph));
                when(graph.getCheckpointCoordinator()).thenReturn(coord);
 
-               JobCancellationWithSavepointHandlers handlers = new 
JobCancellationWithSavepointHandlers(holder, EC);
+               JobCancellationWithSavepointHandlers handlers = new 
JobCancellationWithSavepointHandlers(holder, executor);
                JobCancellationWithSavepointHandlers.TriggerHandler trigger = 
handlers.getTriggerHandler();
                JobCancellationWithSavepointHandlers.InProgressHandler progress 
= handlers.getInProgressHandler();
 
@@ -186,16 +180,16 @@ public class JobCancellationWithSavepointHandlersTest {
                params.put("jobid", jobId.toString());
                params.put("targetDirectory", "custom-directory");
 
-               ActorGateway jobManager = mock(ActorGateway.class);
+               JobManagerGateway jobManager = mock(JobManagerGateway.class);
 
                // Successful
-               Promise<Object> promise = new Promise.DefaultPromise<>();
-               when(jobManager.ask(any(Object.class), 
any(FiniteDuration.class))).thenReturn(promise);
+               CompletableFuture<String> successfulCancelWithSavepoint = new 
CompletableFuture<>();
+               when(jobManager.cancelJobWithSavepoint(eq(jobId), 
eq("custom-directory"), 
any(Time.class))).thenReturn(successfulCancelWithSavepoint);
 
                // Trigger
                FullHttpResponse response = trigger.handleRequest(params, 
Collections.<String, String>emptyMap(), jobManager);
 
-               verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, 
"custom-directory")), any(FiniteDuration.class));
+               verify(jobManager).cancelJobWithSavepoint(eq(jobId), 
eq("custom-directory"), any(Time.class));
 
                String location = 
String.format("/jobs/%s/cancel-with-savepoint/in-progress/1", jobId);
 
@@ -226,7 +220,7 @@ public class JobCancellationWithSavepointHandlersTest {
                assertEquals(location, root.get("location").asText());
 
                // Only single actual request
-               verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, 
"custom-directory")), any(FiniteDuration.class));
+               verify(jobManager).cancelJobWithSavepoint(eq(jobId), 
eq("custom-directory"), any(Time.class));
 
                // Query progress
                params.put("requestId", "1");
@@ -243,7 +237,7 @@ public class JobCancellationWithSavepointHandlersTest {
                assertEquals("1", root.get("request-id").asText());
 
                // Complete
-               promise.success(new CancellationSuccess(jobId, 
"_path-savepoint_"));
+               successfulCancelWithSavepoint.complete("_path-savepoint_");
 
                response = progress.handleRequest(params, Collections.<String, 
String>emptyMap(), jobManager);
 
@@ -301,10 +295,10 @@ public class JobCancellationWithSavepointHandlersTest {
                ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
                ExecutionGraph graph = mock(ExecutionGraph.class);
                CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
-               when(holder.getExecutionGraph(eq(jobId), 
any(ActorGateway.class))).thenReturn(graph);
+               when(holder.getExecutionGraph(eq(jobId), 
any(JobManagerGateway.class))).thenReturn(Optional.of(graph));
                when(graph.getCheckpointCoordinator()).thenReturn(coord);
 
-               JobCancellationWithSavepointHandlers handlers = new 
JobCancellationWithSavepointHandlers(holder, EC);
+               JobCancellationWithSavepointHandlers handlers = new 
JobCancellationWithSavepointHandlers(holder, executor);
                JobCancellationWithSavepointHandlers.TriggerHandler trigger = 
handlers.getTriggerHandler();
                JobCancellationWithSavepointHandlers.InProgressHandler progress 
= handlers.getInProgressHandler();
 
@@ -312,15 +306,15 @@ public class JobCancellationWithSavepointHandlersTest {
                params.put("jobid", jobId.toString());
                params.put("targetDirectory", "custom-directory");
 
-               ActorGateway jobManager = mock(ActorGateway.class);
+               JobManagerGateway jobManager = mock(JobManagerGateway.class);
 
                // Successful
-               Future<Object> future = Futures.failed(new Exception("Test 
Exception"));
-               when(jobManager.ask(any(Object.class), 
any(FiniteDuration.class))).thenReturn(future);
+               CompletableFuture<String> unsuccessfulCancelWithSavepoint = 
FutureUtils.completedExceptionally(new Exception("Test Exception"));
+               when(jobManager.cancelJobWithSavepoint(eq(jobId), 
eq("custom-directory"), 
any(Time.class))).thenReturn(unsuccessfulCancelWithSavepoint);
 
                // Trigger
                trigger.handleRequest(params, Collections.<String, 
String>emptyMap(), jobManager);
-               verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, 
"custom-directory")), any(FiniteDuration.class));
+               verify(jobManager).cancelJobWithSavepoint(eq(jobId), 
eq("custom-directory"), any(Time.class));
 
                // Query progress
                params.put("requestId", "1");

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
index ad9da6b..104b0a3 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -32,6 +33,8 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the JobConfigHandler.
  */
@@ -52,7 +55,7 @@ public class JobConfigHandlerTest {
 
        @Test
        public void testGetPaths() {
-               JobConfigHandler handler = new JobConfigHandler(null);
+               JobConfigHandler handler = new 
JobConfigHandler(mock(ExecutionGraphHolder.class));
                String[] paths = handler.getPaths();
                Assert.assertEquals(1, paths.length);
                Assert.assertEquals("/jobs/:jobid/config", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
index d830707..f3f5943 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -39,6 +40,8 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the JobDetailsHandler.
  */
@@ -64,7 +67,7 @@ public class JobDetailsHandlerTest {
 
        @Test
        public void testGetPaths() {
-               JobDetailsHandler handler = new JobDetailsHandler(null, null);
+               JobDetailsHandler handler = new 
JobDetailsHandler(mock(ExecutionGraphHolder.class), null);
                String[] paths = handler.getPaths();
                Assert.assertEquals(2, paths.length);
                List<String> pathsList = Lists.newArrayList(paths);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
index 6016d01..f54ab06 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -35,6 +36,8 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Collection;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the JobExceptionsHandler.
  */
@@ -55,7 +58,7 @@ public class JobExceptionsHandlerTest {
 
        @Test
        public void testGetPaths() {
-               JobExceptionsHandler handler = new JobExceptionsHandler(null);
+               JobExceptionsHandler handler = new 
JobExceptionsHandler(mock(ExecutionGraphHolder.class));
                String[] paths = handler.getPaths();
                Assert.assertEquals(1, paths.length);
                Assert.assertEquals("/jobs/:jobid/exceptions", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
index a5ea2b3..17b4c44 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -28,6 +29,8 @@ import org.junit.Test;
 
 import java.util.Collection;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the JobPlanHandler.
  */
@@ -48,7 +51,7 @@ public class JobPlanHandlerTest {
 
        @Test
        public void testGetPaths() {
-               JobPlanHandler handler = new JobPlanHandler(null);
+               JobPlanHandler handler = new 
JobPlanHandler(mock(ExecutionGraphHolder.class));
                String[] paths = handler.getPaths();
                Assert.assertEquals(1, paths.length);
                Assert.assertEquals("/jobs/:jobid/plan", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
index cac0b10..ee47ee9 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
 import com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.Test;
@@ -27,10 +30,10 @@ import java.util.List;
 /**
  * Tests for the JobStoppingHandler.
  */
-public class JobStoppingHandlerTest {
+public class JobStoppingHandlerTest extends TestLogger {
        @Test
        public void testGetPaths() {
-               JobStoppingHandler handler = new JobStoppingHandler();
+               JobStoppingHandler handler = new 
JobStoppingHandler(TestingUtils.TIMEOUT());
                String[] paths = handler.getPaths();
                Assert.assertEquals(2, paths.length);
                List<String> pathsList = Lists.newArrayList(paths);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
index c57aa09..b7af323 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -33,6 +34,8 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Collection;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the JobVertexAccumulatorsHandler.
  */
@@ -54,7 +57,7 @@ public class JobVertexAccumulatorsHandlerTest {
 
        @Test
        public void testGetPaths() {
-               JobVertexAccumulatorsHandler handler = new 
JobVertexAccumulatorsHandler(null);
+               JobVertexAccumulatorsHandler handler = new 
JobVertexAccumulatorsHandler(mock(ExecutionGraphHolder.class));
                String[] paths = handler.getPaths();
                Assert.assertEquals(1, paths.length);
                
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/accumulators", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
index 8985d89..d2ac0d6 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
@@ -46,7 +46,7 @@ import static org.mockito.Mockito.when;
 public class JobVertexBackPressureHandlerTest {
        @Test
        public void testGetPaths() {
-               JobVertexBackPressureHandler handler = new 
JobVertexBackPressureHandler(null, mock(BackPressureStatsTracker.class), 0);
+               JobVertexBackPressureHandler handler = new 
JobVertexBackPressureHandler(mock(ExecutionGraphHolder.class), 
mock(BackPressureStatsTracker.class), 0);
                String[] paths = handler.getPaths();
                Assert.assertEquals(1, paths.length);
                
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/backpressure", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
index bde6a84..bc4fe9c 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -35,6 +36,8 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Collection;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the JobVertexDetailsHandler.
  */
@@ -56,7 +59,7 @@ public class JobVertexDetailsHandlerTest {
 
        @Test
        public void testGetPaths() {
-               JobVertexDetailsHandler handler = new 
JobVertexDetailsHandler(null, null);
+               JobVertexDetailsHandler handler = new 
JobVertexDetailsHandler(mock(ExecutionGraphHolder.class), null);
                String[] paths = handler.getPaths();
                Assert.assertEquals(1, paths.length);
                Assert.assertEquals("/jobs/:jobid/vertices/:vertexid", 
paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
index 8954844..d5d877a 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -36,6 +37,8 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.Collection;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Tests for the JobVertexTaskManagersHandler.
  */
@@ -58,7 +61,7 @@ public class JobVertexTaskManagersHandlerTest {
 
        @Test
        public void testGetPaths() {
-               JobVertexTaskManagersHandler handler = new 
JobVertexTaskManagersHandler(null, null);
+               JobVertexTaskManagersHandler handler = new 
JobVertexTaskManagersHandler(mock(ExecutionGraphHolder.class), null);
                String[] paths = handler.getPaths();
                Assert.assertEquals(1, paths.length);
                
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/taskmanagers", paths[0]);

Reply via email to