http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java index d1aeea4..e2437e6 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import com.fasterxml.jackson.core.JsonGenerator; @@ -45,7 +45,7 @@ public class JobManagerConfigHandler extends AbstractJsonRequestHandler { } @Override - public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java index c8ec689..3526734 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java @@ -19,8 +19,9 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; import java.util.Map; @@ -33,17 +34,23 @@ public class JobStoppingHandler extends AbstractJsonRequestHandler { private static final String JOB_STOPPING_REST_PATH = "/jobs/:jobid/stop"; private static final String JOB_STOPPING_YARN_REST_PATH = "/jobs/:jobid/yarn-stop"; + private final Time timeout; + + public JobStoppingHandler(Time timeout) { + this.timeout = Preconditions.checkNotNull(timeout); + } + @Override public String[] getPaths() { return new String[]{JOB_STOPPING_REST_PATH, JOB_STOPPING_YARN_REST_PATH}; } @Override - public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception { try { - JobID jobid = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid"))); - if (jobManager != null) { - jobManager.tell(new JobManagerMessages.StopJob(jobid)); + JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid"))); + if (jobManagerGateway != null) { + jobManagerGateway.stopJob(jobId, timeout); return "{}"; } else { http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java index 8646df9..079be8f 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java @@ -18,7 +18,8 @@ package org.apache.flink.runtime.webmonitor.handlers; -import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.webmonitor.NotFoundException; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse; @@ -40,16 +41,16 @@ public interface RequestHandler { * * @param pathParams The map of REST path parameters, decoded by the router. * @param queryParams The map of query parameters. - * @param jobManager The JobManager actor. + * @param jobManagerGateway to talk to the JobManager. * * @return The full http response. * * @throws Exception Handlers may forward exceptions. Exceptions of type - * {@link org.apache.flink.runtime.webmonitor.NotFoundException} will cause a HTTP 404 + * {@link NotFoundException} will cause a HTTP 404 * response with the exception message, other exceptions will cause a HTTP 500 response * with the exception stack trace. */ - FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception; + FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception; /** * Returns an array of REST URL's under which this handler can be registered. http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java index 3562874..b7fee2d 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java @@ -33,13 +33,11 @@ import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.blob.BlobView; import org.apache.flink.runtime.concurrent.FlinkFutureException; -import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceID; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.webmonitor.JobManagerRetriever; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase; +import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; @@ -62,7 +60,6 @@ import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile; import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future; import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener; -import akka.dispatch.Mapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,13 +71,10 @@ import java.net.InetSocketAddress; import java.nio.channels.FileChannel; import java.util.HashMap; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; - -import scala.Option; -import scala.concurrent.ExecutionContextExecutor; -import scala.concurrent.duration.FiniteDuration; -import scala.reflect.ClassTag$; +import java.util.concurrent.Executor; import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; @@ -115,9 +109,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase { /** Indicates which log file should be displayed. */ private FileMode fileMode; - private final ExecutionContextExecutor executor; - - private final Time timeTimeout; + private final Executor executor; private final BlobView blobView; @@ -129,9 +121,9 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase { public TaskManagerLogHandler( JobManagerRetriever retriever, - ExecutionContextExecutor executor, - scala.concurrent.Future<String> localJobManagerAddressPromise, - FiniteDuration timeout, + Executor executor, + CompletableFuture<String> localJobManagerAddressPromise, + Time timeout, FileMode fileMode, Configuration config, boolean httpsEnabled, @@ -143,8 +135,6 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase { this.fileMode = fileMode; this.blobView = Preconditions.checkNotNull(blobView, "blobView"); - - timeTimeout = Time.milliseconds(timeout.toMillis()); } @Override @@ -162,20 +152,18 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase { * Response when running with leading JobManager. */ @Override - protected void respondAsLeader(final ChannelHandlerContext ctx, final Routed routed, final ActorGateway jobManager) { + protected void respondAsLeader(final ChannelHandlerContext ctx, final Routed routed, final JobManagerGateway jobManagerGateway) { if (cache == null) { - scala.concurrent.Future<Object> portFuture = jobManager.ask(JobManagerMessages.getRequestBlobManagerPort(), timeout); - scala.concurrent.Future<BlobCache> cacheFuture = portFuture.map(new Mapper<Object, BlobCache>() { - @Override - public BlobCache checkedApply(Object result) throws IOException { - Option<String> hostOption = jobManager.actor().path().address().host(); - String host = hostOption.isDefined() ? hostOption.get() : "localhost"; - int port = (int) result; - return new BlobCache(new InetSocketAddress(host, port), config, blobView); - } - }, executor); - - cache = FutureUtils.toJava(cacheFuture); + CompletableFuture<Integer> blobPortFuture = jobManagerGateway.requestBlobServerPort(timeout); + cache = blobPortFuture.thenApplyAsync( + (Integer port) -> { + try { + return new BlobCache(new InetSocketAddress(jobManagerGateway.getHostname(), port), config, blobView); + } catch (IOException e) { + throw new FlinkFutureException("Could not create BlobCache.", e); + } + }, + executor); } final String taskManagerID = routed.pathParams().get(TaskManagersHandler.TASK_MANAGER_ID_KEY); @@ -185,22 +173,18 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase { if (lastRequestPending.putIfAbsent(taskManagerID, true) == null) { try { InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID)); - scala.concurrent.Future<JobManagerMessages.TaskManagerInstance> scalaTaskManagerFuture = jobManager - .ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout) - .mapTo(ClassTag$.MODULE$.<JobManagerMessages.TaskManagerInstance>apply(JobManagerMessages.TaskManagerInstance.class)); - - CompletableFuture<JobManagerMessages.TaskManagerInstance> taskManagerFuture = FutureUtils.toJava(scalaTaskManagerFuture); + CompletableFuture<Optional<Instance>> taskManagerFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout); CompletableFuture<BlobKey> blobKeyFuture = taskManagerFuture.thenCompose( - taskManagerInstance -> { - Instance taskManager = taskManagerInstance.instance().get(); - + (Optional<Instance> optTMInstance) -> { + Instance taskManagerInstance = optTMInstance.orElseThrow( + () -> new FlinkFutureException("Could not find instance with " + instanceID + '.')); switch (fileMode) { case LOG: - return taskManager.getTaskManagerGateway().requestTaskManagerLog(timeTimeout); + return taskManagerInstance.getTaskManagerGateway().requestTaskManagerLog(timeout); case STDOUT: default: - return taskManager.getTaskManagerGateway().requestTaskManagerStdout(timeTimeout); + return taskManagerInstance.getTaskManagerGateway().requestTaskManagerStdout(timeout); } } ); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java index 6ad490e..a8ab7a3 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java @@ -18,12 +18,10 @@ package org.apache.flink.runtime.webmonitor.handlers; -import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceID; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.messages.JobManagerMessages.RegisteredTaskManagers; -import org.apache.flink.runtime.messages.JobManagerMessages.TaskManagerInstance; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; import org.apache.flink.runtime.webmonitor.metrics.MetricStore; import org.apache.flink.util.StringUtils; @@ -32,12 +30,12 @@ import com.fasterxml.jackson.core.JsonGenerator; import java.io.StringWriter; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; - -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import static java.util.Objects.requireNonNull; @@ -51,11 +49,11 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler { public static final String TASK_MANAGER_ID_KEY = "taskmanagerid"; - private final FiniteDuration timeout; + private final Time timeout; private final MetricFetcher fetcher; - public TaskManagersHandler(FiniteDuration timeout, MetricFetcher fetcher) { + public TaskManagersHandler(Time timeout, MetricFetcher fetcher) { this.timeout = requireNonNull(timeout); this.fetcher = fetcher; } @@ -66,9 +64,9 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler { } @Override - public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception { try { - if (jobManager != null) { + if (jobManagerGateway != null) { // whether one task manager's metrics are requested, or all task manager, we // return them in an array. This avoids unnecessary code complexity. // If only one task manager is requested, we only fetch one task manager metrics. @@ -76,20 +74,21 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler { if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) { try { InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY))); - Future<Object> future = jobManager.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout); - TaskManagerInstance instance = (TaskManagerInstance) Await.result(future, timeout); - if (instance.instance().nonEmpty()) { - instances.add(instance.instance().get()); - } + CompletableFuture<Optional<Instance>> tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout); + + Optional<Instance> instance = tmInstanceFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + + instance.ifPresent(instances::add); } // this means the id string was invalid. Keep the list empty. catch (IllegalArgumentException e){ // do nothing. } } else { - Future<Object> future = jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout); - RegisteredTaskManagers taskManagers = (RegisteredTaskManagers) Await.result(future, timeout); - instances.addAll(taskManagers.asJavaCollection()); + CompletableFuture<Collection<Instance>> tmInstancesFuture = jobManagerGateway.requestTaskManagerInstances(timeout); + + Collection<Instance> tmInstances = tmInstancesFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + instances.addAll(tmInstances); } StringWriter writer = new StringWriter(); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java index f96e0c2..d116c56 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java @@ -25,8 +25,8 @@ import org.apache.flink.runtime.checkpoint.MinMaxAvgStats; import org.apache.flink.runtime.checkpoint.SubtaskStateStats; import org.apache.flink.runtime.checkpoint.TaskStateStats; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler; import org.apache.flink.runtime.webmonitor.handlers.AbstractJobVertexRequestHandler; @@ -71,8 +71,8 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap public String handleJsonRequest( Map<String, String> pathParams, Map<String, String> queryParams, - ActorGateway jobManager) throws Exception { - return super.handleJsonRequest(pathParams, queryParams, jobManager); + JobManagerGateway jobManagerGateway) throws Exception { + return super.handleJsonRequest(pathParams, queryParams, jobManagerGateway); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java index 94b135d..b95f2c4 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.webmonitor.metrics; -import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.webmonitor.handlers.AbstractJsonRequestHandler; import org.apache.flink.runtime.webmonitor.handlers.JsonFactory; import org.apache.flink.util.Preconditions; @@ -48,7 +48,7 @@ public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler } @Override - public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception { fetcher.update(); String requestedMetricsList = queryParams.get("get"); return requestedMetricsList != null http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java index 95398b5..3af9c56 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java @@ -18,37 +18,29 @@ package org.apache.flink.runtime.webmonitor.metrics; -import org.apache.flink.configuration.AkkaOptions; -import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.instance.Instance; -import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; -import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails; import org.apache.flink.runtime.metrics.dump.MetricDump; import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization; import org.apache.flink.runtime.metrics.dump.MetricQueryService; -import org.apache.flink.runtime.webmonitor.JobManagerRetriever; +import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; import org.apache.flink.util.Preconditions; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.dispatch.OnFailure; -import akka.dispatch.OnSuccess; -import akka.pattern.Patterns; -import akka.util.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collection; import java.util.List; -import java.util.concurrent.TimeUnit; - -import scala.Option; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; -import scala.concurrent.duration.Duration; -import scala.concurrent.duration.FiniteDuration; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer; @@ -61,20 +53,25 @@ import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.Metr public class MetricFetcher { private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class); - private final ActorSystem actorSystem; private final JobManagerRetriever retriever; - private final ExecutionContext ctx; - private final FiniteDuration timeout = new FiniteDuration(Duration.create(AkkaOptions.ASK_TIMEOUT.defaultValue()).toMillis(), TimeUnit.MILLISECONDS); + private final MetricQueryServiceRetriever queryServiceRetriever; + private final Executor executor; + private final Time timeout; private MetricStore metrics = new MetricStore(); private MetricDumpDeserializer deserializer = new MetricDumpDeserializer(); private long lastUpdateTime; - public MetricFetcher(ActorSystem actorSystem, JobManagerRetriever retriever, ExecutionContext ctx) { - this.actorSystem = Preconditions.checkNotNull(actorSystem); + public MetricFetcher( + JobManagerRetriever retriever, + MetricQueryServiceRetriever queryServiceRetriever, + Executor executor, + Time timeout) { this.retriever = Preconditions.checkNotNull(retriever); - this.ctx = Preconditions.checkNotNull(ctx); + this.queryServiceRetriever = Preconditions.checkNotNull(queryServiceRetriever); + this.executor = Preconditions.checkNotNull(executor); + this.timeout = Preconditions.checkNotNull(timeout); } /** @@ -101,38 +98,38 @@ public class MetricFetcher { private void fetchMetrics() { try { - Option<scala.Tuple2<ActorGateway, Integer>> jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort(); - if (jobManagerGatewayAndWebPort.isDefined()) { - ActorGateway jobManager = jobManagerGatewayAndWebPort.get()._1(); + Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow(); + if (optJobManagerGateway.isPresent()) { + final JobManagerGateway jobManagerGateway = optJobManagerGateway.get(); /** * Remove all metrics that belong to a job that is not running and no longer archived. */ - Future<Object> jobDetailsFuture = jobManager.ask(new RequestJobDetails(true, true), timeout); - jobDetailsFuture - .onSuccess(new OnSuccess<Object>() { - @Override - public void onSuccess(Object result) throws Throwable { - MultipleJobsDetails details = (MultipleJobsDetails) result; + CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout); + + jobDetailsFuture.whenCompleteAsync( + (MultipleJobsDetails jobDetails, Throwable throwable) -> { + if (throwable != null) { + LOG.debug("Fetching of JobDetails failed.", throwable); + } else { ArrayList<String> toRetain = new ArrayList<>(); - for (JobDetails job : details.getRunningJobs()) { + for (JobDetails job : jobDetails.getRunningJobs()) { toRetain.add(job.getJobId().toString()); } - for (JobDetails job : details.getFinishedJobs()) { + for (JobDetails job : jobDetails.getFinishedJobs()) { toRetain.add(job.getJobId().toString()); } synchronized (metrics) { metrics.jobs.keySet().retainAll(toRetain); } } - }, ctx); - logErrorOnFailure(jobDetailsFuture, "Fetching of JobDetails failed."); + }, + executor); - String jobManagerPath = jobManager.path(); - String queryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME; - ActorRef jobManagerQueryService = actorSystem.actorFor(queryServicePath); + String jobManagerPath = jobManagerGateway.getAddress(); + String jmQueryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME; - queryMetrics(jobManagerQueryService); + retrieveAndQueryMetrics(jmQueryServicePath); /** * We first request the list of all registered task managers from the job manager, and then @@ -140,88 +137,75 @@ public class MetricFetcher { * * <p>All stored metrics that do not belong to a registered task manager will be removed. */ - Future<Object> registeredTaskManagersFuture = jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout); - registeredTaskManagersFuture - .onSuccess(new OnSuccess<Object>() { - @Override - public void onSuccess(Object result) throws Throwable { - Iterable<Instance> taskManagers = ((JobManagerMessages.RegisteredTaskManagers) result).asJavaIterable(); - List<String> activeTaskManagers = new ArrayList<>(); - for (Instance taskManager : taskManagers) { - activeTaskManagers.add(taskManager.getId().toString()); - - String taskManagerPath = taskManager.getTaskManagerGateway().getAddress(); - String queryServicePath = taskManagerPath.substring(0, taskManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManager.getTaskManagerID().getResourceIdString(); - ActorRef taskManagerQueryService = actorSystem.actorFor(queryServicePath); - - queryMetrics(taskManagerQueryService); - } - synchronized (metrics) { // remove all metrics belonging to unregistered task managers + CompletableFuture<Collection<Instance>> taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout); + + taskManagersFuture.whenCompleteAsync( + (Collection<Instance> taskManagers, Throwable throwable) -> { + if (throwable != null) { + LOG.debug("Fetching list of registered TaskManagers failed.", throwable); + } else { + List<String> activeTaskManagers = taskManagers.stream().map( + taskManagerInstance -> { + final String taskManagerAddress = taskManagerInstance.getTaskManagerGateway().getAddress(); + final String tmQueryServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManagerInstance.getTaskManagerID().getResourceIdString(); + + retrieveAndQueryMetrics(tmQueryServicePath); + + return taskManagerInstance.getId().toString(); + }).collect(Collectors.toList()); + + synchronized (metrics) { metrics.taskManagers.keySet().retainAll(activeTaskManagers); } } - }, ctx); - logErrorOnFailure(registeredTaskManagersFuture, "Fetchin list of registered TaskManagers failed."); + }, + executor); } } catch (Exception e) { LOG.warn("Exception while fetching metrics.", e); } } - private void logErrorOnFailure(Future<Object> future, final String message) { - future.onFailure(new OnFailure() { - @Override - public void onFailure(Throwable failure) throws Throwable { - LOG.debug(message, failure); - } - }, ctx); - } - /** - * Requests a metric dump from the given actor. + * Retrieves and queries the specified QueryServiceGateway. * - * @param actor ActorRef to request the dump from - */ - private void queryMetrics(ActorRef actor) { - Future<Object> metricQueryFuture = new BasicGateway(actor).ask(MetricQueryService.getCreateDump(), timeout); - metricQueryFuture - .onSuccess(new OnSuccess<Object>() { - @Override - public void onSuccess(Object result) throws Throwable { - addMetrics(result); + * @param queryServicePath specifying the QueryServiceGateway + */ + private void retrieveAndQueryMetrics(String queryServicePath) { + final CompletableFuture<MetricQueryServiceGateway> queryServiceGatewayFuture = queryServiceRetriever.retrieveService(queryServicePath); + + queryServiceGatewayFuture.whenCompleteAsync( + (MetricQueryServiceGateway queryServiceGateway, Throwable t) -> { + if (t != null) { + LOG.debug("Could not retrieve QueryServiceGateway.", t); + } else { + queryMetrics(queryServiceGateway); } - }, ctx); - logErrorOnFailure(metricQueryFuture, "Fetching metrics failed."); - } - - private void addMetrics(Object result) { - MetricDumpSerialization.MetricSerializationResult data = (MetricDumpSerialization.MetricSerializationResult) result; - List<MetricDump> dumpedMetrics = deserializer.deserialize(data); - for (MetricDump metric : dumpedMetrics) { - metrics.add(metric); - } + }, + executor); } /** - * Helper class that allows mocking of the answer. - */ - static class BasicGateway { - private final ActorRef actor; - - private BasicGateway(ActorRef actor) { - this.actor = actor; - } - - /** - * Sends a message asynchronously and returns its response. The response to the message is - * returned as a future. - * - * @param message Message to be sent - * @param timeout Timeout until the Future is completed with an AskTimeoutException - * @return Future which contains the response to the sent message - */ - public Future<Object> ask(Object message, FiniteDuration timeout) { - return Patterns.ask(actor, message, new Timeout(timeout)); - } + * Query the metrics from the given QueryServiceGateway. + * + * @param queryServiceGateway to query for metrics + */ + private void queryMetrics(final MetricQueryServiceGateway queryServiceGateway) { + queryServiceGateway + .queryMetrics(timeout) + .whenCompleteAsync( + (MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> { + if (t != null) { + LOG.debug("Fetching metrics failed.", t); + } else { + List<MetricDump> dumpedMetrics = deserializer.deserialize(result); + synchronized (metrics) { + for (MetricDump metric : dumpedMetrics) { + metrics.add(metric); + } + } + } + }, + executor); } } http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java index 7cd2932..b3ce135 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java @@ -158,25 +158,20 @@ public class WebFrontendITCase extends TestLogger { } @Test - public void getTaskmanagers() { - try { - String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/"); + public void getTaskmanagers() throws Exception { + String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/"); - ObjectMapper mapper = new ObjectMapper(); - JsonNode parsed = mapper.readTree(json); - ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers"); + ObjectMapper mapper = new ObjectMapper(); + JsonNode parsed = mapper.readTree(json); + ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers"); - assertNotNull(taskManagers); - assertEquals(cluster.numTaskManagers(), taskManagers.size()); + assertNotNull(taskManagers); + assertEquals(cluster.numTaskManagers(), taskManagers.size()); - JsonNode taskManager = taskManagers.get(0); - assertNotNull(taskManager); - assertEquals(NUM_SLOTS, taskManager.get("slotsNumber").asInt()); - assertTrue(taskManager.get("freeSlots").asInt() <= NUM_SLOTS); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + JsonNode taskManager = taskManagers.get(0); + assertNotNull(taskManager); + assertEquals(NUM_SLOTS, taskManager.get("slotsNumber").asInt()); + assertTrue(taskManager.get("freeSlots").asInt() <= NUM_SLOTS); } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java index fe16445..5829d1c 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.webmonitor; +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.WebOptions; @@ -27,12 +28,15 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.leaderelection.TestingListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.ZooKeeperTestUtils; import org.apache.flink.runtime.webmonitor.files.MimeTypes; +import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever; +import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever; import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient; import org.apache.flink.util.TestLogger; @@ -44,11 +48,12 @@ import org.apache.curator.test.TestingServer; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import org.powermock.reflect.Whitebox; import java.io.File; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Objects; +import java.util.Optional; import java.util.Scanner; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -71,7 +76,9 @@ public class WebRuntimeMonitorITCase extends TestLogger { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); - private static final FiniteDuration TestTimeout = new FiniteDuration(2, TimeUnit.MINUTES); + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(2L, TimeUnit.MINUTES); + + private static final Time TIMEOUT = Time.milliseconds(TEST_TIMEOUT.toMillis()); private final String mainResourcesPath = getClass().getResource("/web").getPath(); @@ -80,7 +87,7 @@ public class WebRuntimeMonitorITCase extends TestLogger { */ @Test public void testStandaloneWebRuntimeMonitor() throws Exception { - final Deadline deadline = TestTimeout.fromNow(); + final Deadline deadline = TEST_TIMEOUT.fromNow(); TestingCluster flink = null; WebRuntimeMonitor webMonitor = null; @@ -89,7 +96,7 @@ public class WebRuntimeMonitorITCase extends TestLogger { // Flink w/o a web monitor flink = new TestingCluster(new Configuration()); flink.start(true); - webMonitor = startWebRuntimeMonitor(flink); + webMonitor = startWebRuntimeMonitor(flink, TIMEOUT); try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort())) { String expected = new Scanner(new File(mainResourcesPath + "/index.html")) @@ -129,10 +136,11 @@ public class WebRuntimeMonitorITCase extends TestLogger { */ @Test public void testRedirectToLeader() throws Exception { - final Deadline deadline = TestTimeout.fromNow(); + final Deadline deadline = TEST_TIMEOUT.fromNow(); ActorSystem[] jobManagerSystem = new ActorSystem[2]; WebRuntimeMonitor[] webMonitor = new WebRuntimeMonitor[2]; + AkkaJobManagerRetriever[] jobManagerRetrievers = new AkkaJobManagerRetriever[2]; HighAvailabilityServices highAvailabilityServices = null; try (TestingServer zooKeeper = new TestingServer()) { @@ -157,11 +165,16 @@ public class WebRuntimeMonitorITCase extends TestLogger { } for (int i = 0; i < webMonitor.length; i++) { + jobManagerRetrievers[i] = new AkkaJobManagerRetriever(jobManagerSystem[i], TIMEOUT); + webMonitor[i] = new WebRuntimeMonitor( config, highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), highAvailabilityServices.createBlobStore(), - jobManagerSystem[i]); + jobManagerRetrievers[i], + new AkkaQueryServiceRetriever(jobManagerSystem[i], TIMEOUT), + TIMEOUT, + TestingUtils.defaultExecutor()); } ActorRef[] jobManager = new ActorRef[2]; @@ -196,22 +209,18 @@ public class WebRuntimeMonitorITCase extends TestLogger { int followerIndex = (leaderIndex + 1) % 2; ActorSystem leadingSystem = jobManagerSystem[leaderIndex]; - ActorSystem followerSystem = jobManagerSystem[followerIndex]; WebMonitor leadingWebMonitor = webMonitor[leaderIndex]; WebMonitor followerWebMonitor = webMonitor[followerIndex]; // For test stability reason we have to wait until we are sure that both leader // listeners have been notified. - JobManagerRetriever leadingRetriever = Whitebox - .getInternalState(leadingWebMonitor, "retriever"); - - JobManagerRetriever followerRetriever = Whitebox - .getInternalState(followerWebMonitor, "retriever"); + AkkaJobManagerRetriever leadingRetriever = jobManagerRetrievers[leaderIndex]; + AkkaJobManagerRetriever followerRetriever = jobManagerRetrievers[followerIndex]; // Wait for the initial notifications - waitForLeaderNotification(leadingSystem, jobManager[leaderIndex], leadingRetriever, deadline); - waitForLeaderNotification(leadingSystem, jobManager[leaderIndex], followerRetriever, deadline); + waitForLeaderNotification(jobManager[leaderIndex].path().toString(), leadingRetriever, deadline); + waitForLeaderNotification(AkkaUtils.getAkkaURL(leadingSystem, jobManager[leaderIndex]), followerRetriever, deadline); try ( HttpTestClient leaderClient = new HttpTestClient( @@ -241,7 +250,7 @@ public class WebRuntimeMonitorITCase extends TestLogger { leadingSystem.shutdown(); // Wait for the notification of the follower - waitForLeaderNotification(followerSystem, jobManager[followerIndex], followerRetriever, deadline); + waitForLeaderNotification(jobManager[followerIndex].path().toString(), followerRetriever, deadline); // Same request to the new leader followingClient.sendGetRequest("index.html", deadline.timeLeft()); @@ -282,7 +291,7 @@ public class WebRuntimeMonitorITCase extends TestLogger { @Test public void testLeaderNotAvailable() throws Exception { - final Deadline deadline = TestTimeout.fromNow(); + final Deadline deadline = TEST_TIMEOUT.fromNow(); ActorSystem actorSystem = null; WebRuntimeMonitor webRuntimeMonitor = null; @@ -305,7 +314,10 @@ public class WebRuntimeMonitorITCase extends TestLogger { config, mock(LeaderRetrievalService.class), mock(BlobView.class), - actorSystem); + new AkkaJobManagerRetriever(actorSystem, TIMEOUT), + new AkkaQueryServiceRetriever(actorSystem, TIMEOUT), + TIMEOUT, + TestingUtils.defaultExecutor()); webRuntimeMonitor.start("akka://schmakka"); @@ -343,7 +355,7 @@ public class WebRuntimeMonitorITCase extends TestLogger { */ @Test public void testNoEscape() throws Exception { - final Deadline deadline = TestTimeout.fromNow(); + final Deadline deadline = TEST_TIMEOUT.fromNow(); TestingCluster flink = null; WebRuntimeMonitor webMonitor = null; @@ -351,7 +363,7 @@ public class WebRuntimeMonitorITCase extends TestLogger { try { flink = new TestingCluster(new Configuration()); flink.start(true); - webMonitor = startWebRuntimeMonitor(flink); + webMonitor = startWebRuntimeMonitor(flink, TIMEOUT); try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort())) { String expectedIndex = new Scanner(new File(mainResourcesPath + "/index.html")) @@ -405,7 +417,7 @@ public class WebRuntimeMonitorITCase extends TestLogger { */ @Test public void testNoCopyFromJar() throws Exception { - final Deadline deadline = TestTimeout.fromNow(); + final Deadline deadline = TEST_TIMEOUT.fromNow(); TestingCluster flink = null; WebRuntimeMonitor webMonitor = null; @@ -413,7 +425,7 @@ public class WebRuntimeMonitorITCase extends TestLogger { try { flink = new TestingCluster(new Configuration()); flink.start(true); - webMonitor = startWebRuntimeMonitor(flink); + webMonitor = startWebRuntimeMonitor(flink, TIMEOUT); try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort())) { String expectedIndex = new Scanner(new File(mainResourcesPath + "/index.html")) @@ -459,7 +471,8 @@ public class WebRuntimeMonitorITCase extends TestLogger { } private WebRuntimeMonitor startWebRuntimeMonitor( - TestingCluster flink) throws Exception { + TestingCluster flink, + Time timeout) throws Exception { ActorSystem jmActorSystem = flink.jobManagerActorSystems().get().head(); ActorRef jmActor = flink.jobManagerActors().get().head(); @@ -482,7 +495,10 @@ public class WebRuntimeMonitorITCase extends TestLogger { config, highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), highAvailabilityServices.createBlobStore(), - jmActorSystem); + new AkkaJobManagerRetriever(jmActorSystem, timeout), + new AkkaQueryServiceRetriever(jmActorSystem, timeout), + timeout, + TestingUtils.defaultExecutor()); webMonitor.start(jobManagerAddress); flink.waitForActorsToBeAlive(); @@ -492,17 +508,14 @@ public class WebRuntimeMonitorITCase extends TestLogger { // ------------------------------------------------------------------------ private void waitForLeaderNotification( - ActorSystem system, - ActorRef expectedLeader, - JobManagerRetriever retriever, + String expectedJobManagerURL, + AkkaJobManagerRetriever retriever, Deadline deadline) throws Exception { - String expectedJobManagerUrl = AkkaUtils.getAkkaURL(system, expectedLeader); - while (deadline.hasTimeLeft()) { - ActorRef leaderRef = retriever.awaitJobManagerGatewayAndWebPort()._1().actor(); + Optional<JobManagerGateway> optJobManagerGateway = retriever.getJobManagerGatewayNow(); - if (AkkaUtils.getAkkaURL(system, leaderRef).equals(expectedJobManagerUrl)) { + if (optJobManagerGateway.isPresent() && Objects.equals(expectedJobManagerURL, optJobManagerGateway.get().getAddress())) { return; } else { http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java index 19e8a49..865385f 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java @@ -18,20 +18,18 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.api.common.time.Time; + import org.junit.Assert; import org.junit.Test; -import java.util.concurrent.TimeUnit; - -import scala.concurrent.duration.FiniteDuration; - /** * Tests for the ClusterOverviewHandler. */ public class ClusterOverviewHandlerTest { @Test public void testGetPaths() { - ClusterOverviewHandler handler = new ClusterOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS)); + ClusterOverviewHandler handler = new ClusterOverviewHandler(Time.seconds(0L)); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/overview", paths[0]); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java index e108774..ea26f5d 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java @@ -18,20 +18,18 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.api.common.time.Time; + import org.junit.Assert; import org.junit.Test; -import java.util.concurrent.TimeUnit; - -import scala.concurrent.duration.FiniteDuration; - /** * Tests for the CurrentJobIdsHandler. */ public class CurrentJobIdsHandlerTest { @Test public void testGetPaths() { - CurrentJobIdsHandler handler = new CurrentJobIdsHandler(new FiniteDuration(0, TimeUnit.SECONDS)); + CurrentJobIdsHandler handler = new CurrentJobIdsHandler(Time.seconds(0L)); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs", paths[0]); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java index 9f3d362..64360d3 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.messages.webmonitor.JobDetails; @@ -35,9 +36,6 @@ import org.junit.Test; import java.io.IOException; import java.io.StringWriter; import java.util.Collection; -import java.util.concurrent.TimeUnit; - -import scala.concurrent.duration.FiniteDuration; /** * Tests for the CurrentJobsOverviewHandler. @@ -68,17 +66,17 @@ public class CurrentJobsOverviewHandlerTest { @Test public void testGetPaths() { - CurrentJobsOverviewHandler handlerAll = new CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), true, true); + CurrentJobsOverviewHandler handlerAll = new CurrentJobsOverviewHandler(Time.seconds(0L), true, true); String[] pathsAll = handlerAll.getPaths(); Assert.assertEquals(1, pathsAll.length); Assert.assertEquals("/joboverview", pathsAll[0]); - CurrentJobsOverviewHandler handlerRunning = new CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), true, false); + CurrentJobsOverviewHandler handlerRunning = new CurrentJobsOverviewHandler(Time.seconds(0L), true, false); String[] pathsRunning = handlerRunning.getPaths(); Assert.assertEquals(1, pathsRunning.length); Assert.assertEquals("/joboverview/running", pathsRunning[0]); - CurrentJobsOverviewHandler handlerCompleted = new CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), false, true); + CurrentJobsOverviewHandler handlerCompleted = new CurrentJobsOverviewHandler(Time.seconds(0L), false, true); String[] pathsCompleted = handlerCompleted.getPaths(); Assert.assertEquals(1, pathsCompleted.length); Assert.assertEquals("/joboverview/completed", pathsCompleted[0]); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java index 4ddddca..ac8d934 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtilsTest.java @@ -18,41 +18,54 @@ package org.apache.flink.runtime.webmonitor.handlers; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.instance.DummyActorGateway; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.util.TestLogger; import org.junit.Assert; import org.junit.Test; -import scala.Tuple2; +import java.util.concurrent.CompletableFuture; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Tests for the HandlerRedirectUtils. */ -public class HandlerRedirectUtilsTest { +public class HandlerRedirectUtilsTest extends TestLogger { private static final String localJobManagerAddress = "akka.tcp://flink@127.0.0.1:1234/user/foobar"; - private static final String remoteURL = "127.0.0.2:1235"; + private static final String remoteHostname = "127.0.0.2"; + private static final int webPort = 1235; + private static final String remoteURL = remoteHostname + ':' + webPort; private static final String remotePath = "akka.tcp://flink@" + remoteURL + "/user/jobmanager"; @Test public void testGetRedirectAddressWithLocalAkkaPath() throws Exception { - ActorGateway leaderGateway = new DummyActorGateway("akka://flink/user/foobar"); - - Tuple2<ActorGateway, Integer> leader = new Tuple2<>(leaderGateway, 1235); + JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); + when(jobManagerGateway.getAddress()).thenReturn("akka://flink/user/foobar"); - String redirectingAddress = HandlerRedirectUtils.getRedirectAddress(localJobManagerAddress, leader); + String redirectingAddress = HandlerRedirectUtils.getRedirectAddress( + localJobManagerAddress, + jobManagerGateway, + Time.seconds(3L)); Assert.assertNull(redirectingAddress); } @Test public void testGetRedirectAddressWithRemoteAkkaPath() throws Exception { - ActorGateway leaderGateway = new DummyActorGateway(remotePath); - - Tuple2<ActorGateway, Integer> leader = new Tuple2<>(leaderGateway, 1235); + JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); + when(jobManagerGateway.getAddress()).thenReturn(remotePath); + when(jobManagerGateway.getHostname()).thenReturn(remoteHostname); + when(jobManagerGateway.requestWebPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(webPort)); - String redirectingAddress = HandlerRedirectUtils.getRedirectAddress(localJobManagerAddress, leader); + String redirectingAddress = HandlerRedirectUtils.getRedirectAddress( + localJobManagerAddress, + jobManagerGateway, + Time.seconds(3L)); Assert.assertEquals(remoteURL, redirectingAddress); } http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java index fcbfa02..82aa87a 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java @@ -18,6 +18,9 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; + import org.junit.Assert; import org.junit.Test; @@ -27,7 +30,7 @@ import org.junit.Test; public class JarRunHandlerTest { @Test public void testGetPaths() { - JarRunHandler handler = new JarRunHandler(null, null, null); + JarRunHandler handler = new JarRunHandler(null, Time.seconds(0L), new Configuration()); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jars/:jarid/run", paths[0]); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java index 25fca9b..fe55f51 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; @@ -31,6 +32,8 @@ import org.junit.Test; import java.io.IOException; import java.util.Collection; +import static org.mockito.Mockito.mock; + /** * Tests for the JobAccumulatorsHandler. */ @@ -51,7 +54,7 @@ public class JobAccumulatorsHandlerTest { @Test public void testGetPaths() { - JobAccumulatorsHandler handler = new JobAccumulatorsHandler(null); + JobAccumulatorsHandler handler = new JobAccumulatorsHandler(mock(ExecutionGraphHolder.class)); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/accumulators", paths[0]); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java index ed54000..98d9353 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.runtime.testingUtils.TestingUtils; + import com.google.common.collect.Lists; import org.junit.Assert; import org.junit.Test; @@ -30,7 +32,7 @@ import java.util.List; public class JobCancellationHandlerTest { @Test public void testGetPaths() { - JobCancellationHandler handler = new JobCancellationHandler(); + JobCancellationHandler handler = new JobCancellationHandler(TestingUtils.TIMEOUT()); String[] paths = handler.getPaths(); Assert.assertEquals(2, paths.length); List<String> pathsList = Lists.newArrayList(paths); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java index 64a07c8..b48ee66 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java @@ -19,21 +19,20 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint; -import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; -import akka.dispatch.ExecutionContexts$; -import akka.dispatch.Futures; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.Assert; @@ -45,15 +44,14 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; - -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; -import scala.concurrent.impl.Promise; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -62,13 +60,13 @@ import static org.mockito.Mockito.when; /** * Tests for the JobCancellationWithSavepointHandler. */ -public class JobCancellationWithSavepointHandlersTest { +public class JobCancellationWithSavepointHandlersTest extends TestLogger { - private static final ExecutionContext EC = ExecutionContexts$.MODULE$.fromExecutor(Executors.directExecutor()); + private static final Executor executor = Executors.directExecutor(); @Test public void testGetPaths() { - JobCancellationWithSavepointHandlers handler = new JobCancellationWithSavepointHandlers(mock(ExecutionGraphHolder.class), EC); + JobCancellationWithSavepointHandlers handler = new JobCancellationWithSavepointHandlers(mock(ExecutionGraphHolder.class), executor); JobCancellationWithSavepointHandlers.TriggerHandler triggerHandler = handler.getTriggerHandler(); String[] triggerPaths = triggerHandler.getPaths(); @@ -94,25 +92,23 @@ public class JobCancellationWithSavepointHandlersTest { ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class); ExecutionGraph graph = mock(ExecutionGraph.class); CheckpointCoordinator coord = mock(CheckpointCoordinator.class); - when(holder.getExecutionGraph(eq(jobId), any(ActorGateway.class))).thenReturn(graph); + when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(Optional.of(graph)); when(graph.getCheckpointCoordinator()).thenReturn(coord); when(coord.getCheckpointTimeout()).thenReturn(timeout); - JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, EC); + JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor); JobCancellationWithSavepointHandlers.TriggerHandler handler = handlers.getTriggerHandler(); Map<String, String> params = new HashMap<>(); params.put("jobid", jobId.toString()); params.put("targetDirectory", "placeholder"); - ActorGateway jobManager = mock(ActorGateway.class); - - Future<Object> future = Futures.successful((Object) new CancellationSuccess(jobId, null)); - when(jobManager.ask(any(Object.class), any(FiniteDuration.class))).thenReturn(future); + JobManagerGateway jobManager = mock(JobManagerGateway.class); + when(jobManager.cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class))).thenReturn(CompletableFuture.completedFuture("foobar")); - handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager); + handler.handleRequest(params, Collections.emptyMap(), jobManager); - verify(jobManager).ask(any(CancelJobWithSavepoint.class), eq(FiniteDuration.apply(timeout, "ms"))); + verify(jobManager).cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class)); } /** @@ -125,36 +121,34 @@ public class JobCancellationWithSavepointHandlersTest { ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class); ExecutionGraph graph = mock(ExecutionGraph.class); CheckpointCoordinator coord = mock(CheckpointCoordinator.class); - when(holder.getExecutionGraph(eq(jobId), any(ActorGateway.class))).thenReturn(graph); + when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(Optional.of(graph)); when(graph.getCheckpointCoordinator()).thenReturn(coord); when(coord.getCheckpointTimeout()).thenReturn(timeout); - JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, EC, "the-default-directory"); + JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor, "the-default-directory"); JobCancellationWithSavepointHandlers.TriggerHandler handler = handlers.getTriggerHandler(); Map<String, String> params = new HashMap<>(); params.put("jobid", jobId.toString()); - ActorGateway jobManager = mock(ActorGateway.class); - - Future<Object> future = Futures.successful((Object) new CancellationSuccess(jobId, null)); - when(jobManager.ask(any(Object.class), any(FiniteDuration.class))).thenReturn(future); + JobManagerGateway jobManager = mock(JobManagerGateway.class); + when(jobManager.cancelJobWithSavepoint(eq(jobId), anyString(), any(Time.class))).thenReturn(CompletableFuture.completedFuture("foobar")); // 1. Use targetDirectory path param params.put("targetDirectory", "custom-directory"); handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager); - verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "custom-directory")), eq(FiniteDuration.apply(timeout, "ms"))); + verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class)); // 2. Use default params.remove("targetDirectory"); handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager); - verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "the-default-directory")), eq(FiniteDuration.apply(timeout, "ms"))); + verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("the-default-directory"), any(Time.class)); // 3. Throw Exception - handlers = new JobCancellationWithSavepointHandlers(holder, EC, null); + handlers = new JobCancellationWithSavepointHandlers(holder, executor, null); handler = handlers.getTriggerHandler(); try { @@ -175,10 +169,10 @@ public class JobCancellationWithSavepointHandlersTest { ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class); ExecutionGraph graph = mock(ExecutionGraph.class); CheckpointCoordinator coord = mock(CheckpointCoordinator.class); - when(holder.getExecutionGraph(eq(jobId), any(ActorGateway.class))).thenReturn(graph); + when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(Optional.of(graph)); when(graph.getCheckpointCoordinator()).thenReturn(coord); - JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, EC); + JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor); JobCancellationWithSavepointHandlers.TriggerHandler trigger = handlers.getTriggerHandler(); JobCancellationWithSavepointHandlers.InProgressHandler progress = handlers.getInProgressHandler(); @@ -186,16 +180,16 @@ public class JobCancellationWithSavepointHandlersTest { params.put("jobid", jobId.toString()); params.put("targetDirectory", "custom-directory"); - ActorGateway jobManager = mock(ActorGateway.class); + JobManagerGateway jobManager = mock(JobManagerGateway.class); // Successful - Promise<Object> promise = new Promise.DefaultPromise<>(); - when(jobManager.ask(any(Object.class), any(FiniteDuration.class))).thenReturn(promise); + CompletableFuture<String> successfulCancelWithSavepoint = new CompletableFuture<>(); + when(jobManager.cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class))).thenReturn(successfulCancelWithSavepoint); // Trigger FullHttpResponse response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager); - verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "custom-directory")), any(FiniteDuration.class)); + verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class)); String location = String.format("/jobs/%s/cancel-with-savepoint/in-progress/1", jobId); @@ -226,7 +220,7 @@ public class JobCancellationWithSavepointHandlersTest { assertEquals(location, root.get("location").asText()); // Only single actual request - verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "custom-directory")), any(FiniteDuration.class)); + verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class)); // Query progress params.put("requestId", "1"); @@ -243,7 +237,7 @@ public class JobCancellationWithSavepointHandlersTest { assertEquals("1", root.get("request-id").asText()); // Complete - promise.success(new CancellationSuccess(jobId, "_path-savepoint_")); + successfulCancelWithSavepoint.complete("_path-savepoint_"); response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager); @@ -301,10 +295,10 @@ public class JobCancellationWithSavepointHandlersTest { ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class); ExecutionGraph graph = mock(ExecutionGraph.class); CheckpointCoordinator coord = mock(CheckpointCoordinator.class); - when(holder.getExecutionGraph(eq(jobId), any(ActorGateway.class))).thenReturn(graph); + when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(Optional.of(graph)); when(graph.getCheckpointCoordinator()).thenReturn(coord); - JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, EC); + JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor); JobCancellationWithSavepointHandlers.TriggerHandler trigger = handlers.getTriggerHandler(); JobCancellationWithSavepointHandlers.InProgressHandler progress = handlers.getInProgressHandler(); @@ -312,15 +306,15 @@ public class JobCancellationWithSavepointHandlersTest { params.put("jobid", jobId.toString()); params.put("targetDirectory", "custom-directory"); - ActorGateway jobManager = mock(ActorGateway.class); + JobManagerGateway jobManager = mock(JobManagerGateway.class); // Successful - Future<Object> future = Futures.failed(new Exception("Test Exception")); - when(jobManager.ask(any(Object.class), any(FiniteDuration.class))).thenReturn(future); + CompletableFuture<String> unsuccessfulCancelWithSavepoint = FutureUtils.completedExceptionally(new Exception("Test Exception")); + when(jobManager.cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class))).thenReturn(unsuccessfulCancelWithSavepoint); // Trigger trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager); - verify(jobManager).ask(eq(new CancelJobWithSavepoint(jobId, "custom-directory")), any(FiniteDuration.class)); + verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class)); // Query progress params.put("requestId", "1"); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java index ad9da6b..104b0a3 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.api.common.ArchivedExecutionConfig; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; @@ -32,6 +33,8 @@ import java.io.IOException; import java.util.Collection; import java.util.Map; +import static org.mockito.Mockito.mock; + /** * Tests for the JobConfigHandler. */ @@ -52,7 +55,7 @@ public class JobConfigHandlerTest { @Test public void testGetPaths() { - JobConfigHandler handler = new JobConfigHandler(null); + JobConfigHandler handler = new JobConfigHandler(mock(ExecutionGraphHolder.class)); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/config", paths[0]); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java index d830707..f3f5943 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.executiongraph.IOMetrics; import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; @@ -39,6 +40,8 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; +import static org.mockito.Mockito.mock; + /** * Tests for the JobDetailsHandler. */ @@ -64,7 +67,7 @@ public class JobDetailsHandlerTest { @Test public void testGetPaths() { - JobDetailsHandler handler = new JobDetailsHandler(null, null); + JobDetailsHandler handler = new JobDetailsHandler(mock(ExecutionGraphHolder.class), null); String[] paths = handler.getPaths(); Assert.assertEquals(2, paths.length); List<String> pathsList = Lists.newArrayList(paths); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java index 6016d01..f54ab06 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; @@ -35,6 +36,8 @@ import org.junit.Test; import java.io.IOException; import java.util.Collection; +import static org.mockito.Mockito.mock; + /** * Tests for the JobExceptionsHandler. */ @@ -55,7 +58,7 @@ public class JobExceptionsHandlerTest { @Test public void testGetPaths() { - JobExceptionsHandler handler = new JobExceptionsHandler(null); + JobExceptionsHandler handler = new JobExceptionsHandler(mock(ExecutionGraphHolder.class)); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/exceptions", paths[0]); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java index a5ea2b3..17b4c44 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; @@ -28,6 +29,8 @@ import org.junit.Test; import java.util.Collection; +import static org.mockito.Mockito.mock; + /** * Tests for the JobPlanHandler. */ @@ -48,7 +51,7 @@ public class JobPlanHandlerTest { @Test public void testGetPaths() { - JobPlanHandler handler = new JobPlanHandler(null); + JobPlanHandler handler = new JobPlanHandler(mock(ExecutionGraphHolder.class)); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/plan", paths[0]); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java index cac0b10..ee47ee9 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java @@ -18,6 +18,9 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.TestLogger; + import com.google.common.collect.Lists; import org.junit.Assert; import org.junit.Test; @@ -27,10 +30,10 @@ import java.util.List; /** * Tests for the JobStoppingHandler. */ -public class JobStoppingHandlerTest { +public class JobStoppingHandlerTest extends TestLogger { @Test public void testGetPaths() { - JobStoppingHandler handler = new JobStoppingHandler(); + JobStoppingHandler handler = new JobStoppingHandler(TestingUtils.TIMEOUT()); String[] paths = handler.getPaths(); Assert.assertEquals(2, paths.length); List<String> pathsList = Lists.newArrayList(paths); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java index c57aa09..b7af323 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; @@ -33,6 +34,8 @@ import org.junit.Test; import java.io.IOException; import java.util.Collection; +import static org.mockito.Mockito.mock; + /** * Tests for the JobVertexAccumulatorsHandler. */ @@ -54,7 +57,7 @@ public class JobVertexAccumulatorsHandlerTest { @Test public void testGetPaths() { - JobVertexAccumulatorsHandler handler = new JobVertexAccumulatorsHandler(null); + JobVertexAccumulatorsHandler handler = new JobVertexAccumulatorsHandler(mock(ExecutionGraphHolder.class)); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/accumulators", paths[0]); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java index 8985d89..d2ac0d6 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java @@ -46,7 +46,7 @@ import static org.mockito.Mockito.when; public class JobVertexBackPressureHandlerTest { @Test public void testGetPaths() { - JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(null, mock(BackPressureStatsTracker.class), 0); + JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(mock(ExecutionGraphHolder.class), mock(BackPressureStatsTracker.class), 0); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/backpressure", paths[0]); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java index bde6a84..bc4fe9c 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; @@ -35,6 +36,8 @@ import org.junit.Test; import java.io.IOException; import java.util.Collection; +import static org.mockito.Mockito.mock; + /** * Tests for the JobVertexDetailsHandler. */ @@ -56,7 +59,7 @@ public class JobVertexDetailsHandlerTest { @Test public void testGetPaths() { - JobVertexDetailsHandler handler = new JobVertexDetailsHandler(null, null); + JobVertexDetailsHandler handler = new JobVertexDetailsHandler(mock(ExecutionGraphHolder.class), null); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid", paths[0]); http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java index 8954844..d5d877a 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.executiongraph.IOMetrics; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; @@ -36,6 +37,8 @@ import org.junit.Test; import java.io.IOException; import java.util.Collection; +import static org.mockito.Mockito.mock; + /** * Tests for the JobVertexTaskManagersHandler. */ @@ -58,7 +61,7 @@ public class JobVertexTaskManagersHandlerTest { @Test public void testGetPaths() { - JobVertexTaskManagersHandler handler = new JobVertexTaskManagersHandler(null, null); + JobVertexTaskManagersHandler handler = new JobVertexTaskManagersHandler(mock(ExecutionGraphHolder.class), null); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/taskmanagers", paths[0]);