This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ba6b23bc13cbe391a04089856cc6a64f4555f4f9
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Thu Apr 23 10:38:15 2020 +0200

    [FLINK-17308] Extract ExecutionGraphCache interface and rename impl into 
DefaultExecutionGraphCache
    
    This commit extracts the ExecutionGraphCache interface from the 
implementation and renames
    the latter into DefaultExecutionGraphCache. Moreover, it introduces the 
NoOpExecutionGraphCache
    implementation which is used for the DocumentingDispatcherRestEndpoint.
---
 .../flink/runtime/rest/RestEndpointFactory.java    |   3 +-
 ...hCache.java => DefaultExecutionGraphCache.java} |  27 +----
 .../rest/handler/legacy/ExecutionGraphCache.java   | 128 ++-------------------
 .../rest/handler/job/JobConfigHandlerTest.java     |   4 +-
 .../rest/handler/job/JobExceptionsHandlerTest.java |   4 +-
 .../SubtaskCurrentAttemptDetailsHandlerTest.java   |   4 +-
 ...askExecutionAttemptAccumulatorsHandlerTest.java |   4 +-
 .../SubtaskExecutionAttemptDetailsHandlerTest.java |   4 +-
 .../metrics/JobVertexWatermarksHandlerTest.java    |   4 +-
 ...st.java => DefaultExecutionGraphCacheTest.java} |  18 +--
 .../util/DocumentingDispatcherRestEndpoint.java    |   3 +-
 .../runtime/rest/util/NoOpExecutionGraphCache.java |  50 ++++++++
 12 files changed, 89 insertions(+), 164 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
index d148c24..b2e21cf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -52,7 +53,7 @@ public interface RestEndpointFactory<T extends 
RestfulGateway> {
                FatalErrorHandler fatalErrorHandler) throws Exception;
 
        static ExecutionGraphCache 
createExecutionGraphCache(RestHandlerConfiguration restConfiguration) {
-               return new ExecutionGraphCache(
+               return new DefaultExecutionGraphCache(
                        restConfiguration.getTimeout(),
                        
Time.milliseconds(restConfiguration.getRefreshInterval()));
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java
similarity index 83%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java
index f634a62..a01704e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.util.Preconditions;
 
-import java.io.Closeable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
@@ -33,11 +32,9 @@ import java.util.function.Function;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Cache for {@link ArchivedExecutionGraph} which are obtained from the Flink 
cluster. Every cache entry
- * has an associated time to live after which a new request will trigger the 
reloading of the
- * {@link ArchivedExecutionGraph} from the cluster.
+ * Default implementation of {@link ExecutionGraphCache}.
  */
-public class ExecutionGraphCache implements Closeable {
+public class DefaultExecutionGraphCache implements ExecutionGraphCache {
 
        private final Time timeout;
 
@@ -47,7 +44,7 @@ public class ExecutionGraphCache implements Closeable {
 
        private volatile boolean running = true;
 
-       public ExecutionGraphCache(
+       public DefaultExecutionGraphCache(
                        Time timeout,
                        Time timeToLive) {
                this.timeout = checkNotNull(timeout);
@@ -64,22 +61,12 @@ public class ExecutionGraphCache implements Closeable {
                cachedExecutionGraphs.clear();
        }
 
-       /**
-        * Gets the number of cache entries.
-        */
+       @Override
        public int size() {
                return cachedExecutionGraphs.size();
        }
 
-       /**
-        * Gets the {@link AccessExecutionGraph} for the given {@link JobID} 
and caches it. The
-        * {@link AccessExecutionGraph} will be requested again after the 
refresh interval has passed
-        * or if the graph could not be retrieved from the given gateway.
-        *
-        * @param jobId identifying the {@link ArchivedExecutionGraph} to get
-        * @param restfulGateway to request the {@link ArchivedExecutionGraph} 
from
-        * @return Future containing the requested {@link 
ArchivedExecutionGraph}
-        */
+       @Override
        public CompletableFuture<AccessExecutionGraph> getExecutionGraph(JobID 
jobId, RestfulGateway restfulGateway) {
                return getExecutionGraphInternal(jobId, 
restfulGateway).thenApply(Function.identity());
        }
@@ -137,9 +124,7 @@ public class ExecutionGraphCache implements Closeable {
                }
        }
 
-       /**
-        * Perform the cleanup of out dated {@link ExecutionGraphEntry}.
-        */
+       @Override
        public void cleanup() {
                long currentTime = System.currentTimeMillis();
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
index f634a62..b8b9489 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,57 +19,24 @@
 package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
-import org.apache.flink.util.Preconditions;
 
 import java.io.Closeable;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Function;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Cache for {@link ArchivedExecutionGraph} which are obtained from the Flink 
cluster. Every cache entry
  * has an associated time to live after which a new request will trigger the 
reloading of the
  * {@link ArchivedExecutionGraph} from the cluster.
  */
-public class ExecutionGraphCache implements Closeable {
-
-       private final Time timeout;
-
-       private final Time timeToLive;
-
-       private final ConcurrentHashMap<JobID, ExecutionGraphEntry> 
cachedExecutionGraphs;
-
-       private volatile boolean running = true;
-
-       public ExecutionGraphCache(
-                       Time timeout,
-                       Time timeToLive) {
-               this.timeout = checkNotNull(timeout);
-               this.timeToLive = checkNotNull(timeToLive);
-
-               cachedExecutionGraphs = new ConcurrentHashMap<>(4);
-       }
-
-       @Override
-       public void close() {
-               running = false;
-
-               // clear all cached AccessExecutionGraphs
-               cachedExecutionGraphs.clear();
-       }
+public interface ExecutionGraphCache extends Closeable {
 
        /**
         * Gets the number of cache entries.
         */
-       public int size() {
-               return cachedExecutionGraphs.size();
-       }
+       int size();
 
        /**
         * Gets the {@link AccessExecutionGraph} for the given {@link JobID} 
and caches it. The
@@ -80,93 +47,16 @@ public class ExecutionGraphCache implements Closeable {
         * @param restfulGateway to request the {@link ArchivedExecutionGraph} 
from
         * @return Future containing the requested {@link 
ArchivedExecutionGraph}
         */
-       public CompletableFuture<AccessExecutionGraph> getExecutionGraph(JobID 
jobId, RestfulGateway restfulGateway) {
-               return getExecutionGraphInternal(jobId, 
restfulGateway).thenApply(Function.identity());
-       }
-
-       private CompletableFuture<ArchivedExecutionGraph> 
getExecutionGraphInternal(JobID jobId, RestfulGateway restfulGateway) {
-               Preconditions.checkState(running, "ExecutionGraphCache is no 
longer running");
-
-               while (true) {
-                       final ExecutionGraphEntry oldEntry = 
cachedExecutionGraphs.get(jobId);
-
-                       final long currentTime = System.currentTimeMillis();
-
-                       if (oldEntry != null && currentTime < 
oldEntry.getTTL()) {
-                               final CompletableFuture<ArchivedExecutionGraph> 
executionGraphFuture = oldEntry.getExecutionGraphFuture();
-                               if 
(!executionGraphFuture.isCompletedExceptionally()) {
-                                       return executionGraphFuture;
-                               }
-                               // otherwise it must be completed exceptionally
-                       }
-
-                       final ExecutionGraphEntry newEntry = new 
ExecutionGraphEntry(currentTime + timeToLive.toMilliseconds());
-
-                       final boolean successfulUpdate;
-
-                       if (oldEntry == null) {
-                               successfulUpdate = 
cachedExecutionGraphs.putIfAbsent(jobId, newEntry) == null;
-                       } else {
-                               successfulUpdate = 
cachedExecutionGraphs.replace(jobId, oldEntry, newEntry);
-                               // cancel potentially outstanding futures
-                               
oldEntry.getExecutionGraphFuture().cancel(false);
-                       }
-
-                       if (successfulUpdate) {
-                               final CompletableFuture<ArchivedExecutionGraph> 
executionGraphFuture = restfulGateway.requestJob(jobId, timeout);
-
-                               executionGraphFuture.whenComplete(
-                                       (ArchivedExecutionGraph executionGraph, 
Throwable throwable) -> {
-                                               if (throwable != null) {
-                                                       
newEntry.getExecutionGraphFuture().completeExceptionally(throwable);
-
-                                                       // remove exceptionally 
completed entry because it doesn't help
-                                                       
cachedExecutionGraphs.remove(jobId, newEntry);
-                                               } else {
-                                                       
newEntry.getExecutionGraphFuture().complete(executionGraph);
-                                               }
-                                       });
-
-                               if (!running) {
-                                       // delete newly added entry in case of 
a concurrent stopping operation
-                                       cachedExecutionGraphs.remove(jobId, 
newEntry);
-                               }
-
-                               return newEntry.getExecutionGraphFuture();
-                       }
-               }
-       }
+       CompletableFuture<AccessExecutionGraph> getExecutionGraph(JobID jobId, 
RestfulGateway restfulGateway);
 
        /**
-        * Perform the cleanup of out dated {@link ExecutionGraphEntry}.
+        * Perform the cleanup of out dated cache entries.
         */
-       public void cleanup() {
-               long currentTime = System.currentTimeMillis();
-
-               // remove entries which have exceeded their time to live
-               cachedExecutionGraphs.values().removeIf(
-                       (ExecutionGraphEntry entry) -> currentTime >= 
entry.getTTL());
-       }
+       void cleanup();
 
        /**
-        * Wrapper containing the current execution graph and it's time to live 
(TTL).
+        * Closes the execution graph cache.
         */
-       private static final class ExecutionGraphEntry {
-               private final long ttl;
-
-               private final CompletableFuture<ArchivedExecutionGraph> 
executionGraphFuture;
-
-               ExecutionGraphEntry(long ttl) {
-                       this.ttl = ttl;
-                       this.executionGraphFuture = new CompletableFuture<>();
-               }
-
-               public long getTTL() {
-                       return ttl;
-               }
-
-               public CompletableFuture<ArchivedExecutionGraph> 
getExecutionGraphFuture() {
-                       return executionGraphFuture;
-               }
-       }
+       @Override
+       void close();
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandlerTest.java
index 1dcd2b2..d75ed0f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandlerTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.HandlerRequestException;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionConfigBuilder;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
@@ -57,7 +57,7 @@ public class JobConfigHandlerTest extends TestLogger {
                        TestingUtils.TIMEOUT(),
                        Collections.emptyMap(),
                        JobConfigHeaders.getInstance(),
-                       new ExecutionGraphCache(TestingUtils.TIMEOUT(), 
TestingUtils.TIMEOUT()),
+                       new DefaultExecutionGraphCache(TestingUtils.TIMEOUT(), 
TestingUtils.TIMEOUT()),
                        TestingUtils.defaultExecutor());
 
                final Map<String, String> globalJobParameters = new HashMap<>();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
index abbd3fc..ac59952 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
@@ -31,7 +31,7 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.HandlerRequestException;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
@@ -65,7 +65,7 @@ public class JobExceptionsHandlerTest extends TestLogger {
                        TestingUtils.TIMEOUT(),
                        Collections.emptyMap(),
                        JobExceptionsHeaders.getInstance(),
-                       new ExecutionGraphCache(TestingUtils.TIMEOUT(), 
TestingUtils.TIMEOUT()),
+                       new DefaultExecutionGraphCache(TestingUtils.TIMEOUT(), 
TestingUtils.TIMEOUT()),
                        TestingUtils.defaultExecutor());
                final int numExceptions = 20;
                final AccessExecutionGraph archivedExecutionGraph = 
createAccessExecutionGraph(numExceptions);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
index 56da33c..167b25b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
@@ -125,7 +125,7 @@ public class SubtaskCurrentAttemptDetailsHandlerTest 
extends TestLogger {
                        Time.milliseconds(100),
                        Collections.emptyMap(),
                        SubtaskCurrentAttemptDetailsHeaders.getInstance(),
-                       new ExecutionGraphCache(
+                       new DefaultExecutionGraphCache(
                                restHandlerConfiguration.getTimeout(),
                                
Time.milliseconds(restHandlerConfiguration.getRefreshInterval())),
                        TestingUtils.defaultExecutor(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
index 352b05f..a2bfb7b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -29,7 +29,7 @@ import 
org.apache.flink.runtime.executiongraph.ArchivedExecution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import 
org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters;
 import 
org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsHeaders;
@@ -65,7 +65,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest 
extends TestLogger {
                        Time.milliseconds(100L),
                        Collections.emptyMap(),
                        
SubtaskExecutionAttemptAccumulatorsHeaders.getInstance(),
-                       new ExecutionGraphCache(
+                       new DefaultExecutionGraphCache(
                                restHandlerConfiguration.getTimeout(),
                                
Time.milliseconds(restHandlerConfiguration.getRefreshInterval())),
                        TestingUtils.defaultExecutor());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index 59b560f..b2b4654f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
@@ -128,7 +128,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest 
extends TestLogger {
                        Time.milliseconds(100L),
                        Collections.emptyMap(),
                        SubtaskExecutionAttemptDetailsHeaders.getInstance(),
-                       new ExecutionGraphCache(
+                       new DefaultExecutionGraphCache(
                                restHandlerConfiguration.getTimeout(),
                                
Time.milliseconds(restHandlerConfiguration.getRefreshInterval())),
                        TestingUtils.defaultExecutor(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexWatermarksHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexWatermarksHandlerTest.java
index 2fd08ae..8efbd95 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexWatermarksHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexWatermarksHandlerTest.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
@@ -33,6 +32,7 @@ import 
org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
 import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
 import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
+import org.apache.flink.runtime.rest.util.NoOpExecutionGraphCache;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 
 import org.hamcrest.BaseMatcher;
@@ -87,7 +87,7 @@ public class JobVertexWatermarksHandlerTest {
                        Time.seconds(1),
                        Collections.emptyMap(),
                        metricFetcher,
-                       Mockito.mock(ExecutionGraphCache.class),
+                       NoOpExecutionGraphCache.INSTANCE,
                        Mockito.mock(Executor.class));
 
                final Map<String, String> pathParameters = new HashMap<>();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCacheTest.java
similarity index 93%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCacheTest.java
index 1426ac7..b78248a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCacheTest.java
@@ -59,9 +59,9 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
- * Tests for the {@link ExecutionGraphCache}.
+ * Tests for the {@link DefaultExecutionGraphCache}.
  */
-public class ExecutionGraphCacheTest extends TestLogger {
+public class DefaultExecutionGraphCacheTest extends TestLogger {
 
        private static ArchivedExecutionGraph expectedExecutionGraph;
        private static final JobID expectedJobId = new JobID();
@@ -81,7 +81,7 @@ public class ExecutionGraphCacheTest extends TestLogger {
 
                final CountingRestfulGateway restfulGateway = 
createCountingRestfulGateway(expectedJobId, 
CompletableFuture.completedFuture(expectedExecutionGraph));
 
-               try (ExecutionGraphCache executionGraphCache = new 
ExecutionGraphCache(timeout, timeToLive)) {
+               try (ExecutionGraphCache executionGraphCache = new 
DefaultExecutionGraphCache(timeout, timeToLive)) {
                        CompletableFuture<AccessExecutionGraph> 
accessExecutionGraphFuture = 
executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway);
 
                        assertEquals(expectedExecutionGraph, 
accessExecutionGraphFuture.get());
@@ -107,7 +107,7 @@ public class ExecutionGraphCacheTest extends TestLogger {
                        
CompletableFuture.completedFuture(expectedExecutionGraph),
                        
CompletableFuture.completedFuture(expectedExecutionGraph));
 
-               try (ExecutionGraphCache executionGraphCache = new 
ExecutionGraphCache(timeout, timeToLive)) {
+               try (ExecutionGraphCache executionGraphCache = new 
DefaultExecutionGraphCache(timeout, timeToLive)) {
                        CompletableFuture<AccessExecutionGraph> 
executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, 
restfulGateway);
 
                        assertEquals(expectedExecutionGraph, 
executionGraphFuture.get());
@@ -139,7 +139,7 @@ public class ExecutionGraphCacheTest extends TestLogger {
                        FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(expectedJobId)),
                        
CompletableFuture.completedFuture(expectedExecutionGraph));
 
-               try (ExecutionGraphCache executionGraphCache = new 
ExecutionGraphCache(timeout, timeToLive)) {
+               try (ExecutionGraphCache executionGraphCache = new 
DefaultExecutionGraphCache(timeout, timeToLive)) {
                        CompletableFuture<AccessExecutionGraph> 
executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, 
restfulGateway);
 
                        try {
@@ -158,7 +158,7 @@ public class ExecutionGraphCacheTest extends TestLogger {
 
        /**
         * Tests that cache entries are cleaned up when their TTL has expired 
upon
-        * calling {@link ExecutionGraphCache#cleanup()}.
+        * calling {@link DefaultExecutionGraphCache#cleanup()}.
         */
        @Test
        public void testCacheEntryCleanup() throws Exception {
@@ -183,7 +183,7 @@ public class ExecutionGraphCacheTest extends TestLogger {
                        )
                        .build();
 
-               try (ExecutionGraphCache executionGraphCache = new 
ExecutionGraphCache(timeout, timeToLive)) {
+               try (ExecutionGraphCache executionGraphCache = new 
DefaultExecutionGraphCache(timeout, timeToLive)) {
 
                        CompletableFuture<AccessExecutionGraph> 
executionGraph1Future = executionGraphCache.getExecutionGraph(expectedJobId, 
restfulGateway);
 
@@ -219,7 +219,7 @@ public class ExecutionGraphCacheTest extends TestLogger {
 
                final ExecutorService executor = 
java.util.concurrent.Executors.newFixedThreadPool(numConcurrentAccesses);
 
-               try (ExecutionGraphCache executionGraphCache = new 
ExecutionGraphCache(timeout, timeToLive)) {
+               try (ExecutionGraphCache executionGraphCache = new 
DefaultExecutionGraphCache(timeout, timeToLive)) {
                        for (int i = 0; i < numConcurrentAccesses; i++) {
                                CompletableFuture<AccessExecutionGraph> 
executionGraphFuture = CompletableFuture
                                        .supplyAsync(
@@ -286,7 +286,7 @@ public class ExecutionGraphCacheTest extends TestLogger {
                public SuspendableAccessExecutionGraph(JobID jobId) {
                        super(
                                jobId,
-                               "ExecutionGraphCacheTest",
+                               "DefaultExecutionGraphCacheTest",
                                Collections.emptyMap(),
                                Collections.emptyList(),
                                new long[0],
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
index 9f84bce..92eedec 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
@@ -28,7 +28,6 @@ import 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rest.RestEndpointFactory;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
@@ -86,7 +85,7 @@ public class DocumentingDispatcherRestEndpoint extends 
DispatcherRestEndpoint im
                        Executors.newScheduledThreadPool(1),
                        VoidMetricFetcher.INSTANCE,
                        NoOpElectionService.INSTANCE,
-                       
RestEndpointFactory.createExecutionGraphCache(handlerConfig),
+                       NoOpExecutionGraphCache.INSTANCE,
                        NoOpFatalErrorHandler.INSTANCE);
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/NoOpExecutionGraphCache.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/NoOpExecutionGraphCache.java
new file mode 100644
index 0000000..db4ceec
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/NoOpExecutionGraphCache.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.util;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link ExecutionGraphCache} which does nothing.
+ */
+public enum NoOpExecutionGraphCache implements ExecutionGraphCache {
+       INSTANCE;
+
+       @Override
+       public int size() {
+               return 0;
+       }
+
+       @Override
+       public CompletableFuture<AccessExecutionGraph> getExecutionGraph(JobID 
jobId, RestfulGateway restfulGateway) {
+               return FutureUtils.completedExceptionally(new 
UnsupportedOperationException("NoOpExecutionGraphCache does not support to 
retrieve execution graphs"));
+       }
+
+       @Override
+       public void cleanup() {}
+
+       @Override
+       public void close() {}
+}

Reply via email to