This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit ba6b23bc13cbe391a04089856cc6a64f4555f4f9 Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Thu Apr 23 10:38:15 2020 +0200 [FLINK-17308] Extract ExecutionGraphCache interface and rename impl into DefaultExecutionGraphCache This commit extracts the ExecutionGraphCache interface from the implementation and renames the latter into DefaultExecutionGraphCache. Moreover, it introduces the NoOpExecutionGraphCache implementation which is used for the DocumentingDispatcherRestEndpoint. --- .../flink/runtime/rest/RestEndpointFactory.java | 3 +- ...hCache.java => DefaultExecutionGraphCache.java} | 27 +---- .../rest/handler/legacy/ExecutionGraphCache.java | 128 ++------------------- .../rest/handler/job/JobConfigHandlerTest.java | 4 +- .../rest/handler/job/JobExceptionsHandlerTest.java | 4 +- .../SubtaskCurrentAttemptDetailsHandlerTest.java | 4 +- ...askExecutionAttemptAccumulatorsHandlerTest.java | 4 +- .../SubtaskExecutionAttemptDetailsHandlerTest.java | 4 +- .../metrics/JobVertexWatermarksHandlerTest.java | 4 +- ...st.java => DefaultExecutionGraphCacheTest.java} | 18 +-- .../util/DocumentingDispatcherRestEndpoint.java | 3 +- .../runtime/rest/util/NoOpExecutionGraphCache.java | 50 ++++++++ 12 files changed, 89 insertions(+), 164 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java index d148c24..b2e21cf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; +import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -52,7 +53,7 @@ public interface RestEndpointFactory<T extends RestfulGateway> { FatalErrorHandler fatalErrorHandler) throws Exception; static ExecutionGraphCache createExecutionGraphCache(RestHandlerConfiguration restConfiguration) { - return new ExecutionGraphCache( + return new DefaultExecutionGraphCache( restConfiguration.getTimeout(), Time.milliseconds(restConfiguration.getRefreshInterval())); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java similarity index 83% copy from flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java copy to flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java index f634a62..a01704e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCache.java @@ -25,7 +25,6 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.util.Preconditions; -import java.io.Closeable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; @@ -33,11 +32,9 @@ import java.util.function.Function; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * Cache for {@link ArchivedExecutionGraph} which are obtained from the Flink cluster. Every cache entry - * has an associated time to live after which a new request will trigger the reloading of the - * {@link ArchivedExecutionGraph} from the cluster. + * Default implementation of {@link ExecutionGraphCache}. */ -public class ExecutionGraphCache implements Closeable { +public class DefaultExecutionGraphCache implements ExecutionGraphCache { private final Time timeout; @@ -47,7 +44,7 @@ public class ExecutionGraphCache implements Closeable { private volatile boolean running = true; - public ExecutionGraphCache( + public DefaultExecutionGraphCache( Time timeout, Time timeToLive) { this.timeout = checkNotNull(timeout); @@ -64,22 +61,12 @@ public class ExecutionGraphCache implements Closeable { cachedExecutionGraphs.clear(); } - /** - * Gets the number of cache entries. - */ + @Override public int size() { return cachedExecutionGraphs.size(); } - /** - * Gets the {@link AccessExecutionGraph} for the given {@link JobID} and caches it. The - * {@link AccessExecutionGraph} will be requested again after the refresh interval has passed - * or if the graph could not be retrieved from the given gateway. - * - * @param jobId identifying the {@link ArchivedExecutionGraph} to get - * @param restfulGateway to request the {@link ArchivedExecutionGraph} from - * @return Future containing the requested {@link ArchivedExecutionGraph} - */ + @Override public CompletableFuture<AccessExecutionGraph> getExecutionGraph(JobID jobId, RestfulGateway restfulGateway) { return getExecutionGraphInternal(jobId, restfulGateway).thenApply(Function.identity()); } @@ -137,9 +124,7 @@ public class ExecutionGraphCache implements Closeable { } } - /** - * Perform the cleanup of out dated {@link ExecutionGraphEntry}. - */ + @Override public void cleanup() { long currentTime = System.currentTimeMillis(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java index f634a62..b8b9489 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,57 +19,24 @@ package org.apache.flink.runtime.rest.handler.legacy; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.webmonitor.RestfulGateway; -import org.apache.flink.util.Preconditions; import java.io.Closeable; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Function; - -import static org.apache.flink.util.Preconditions.checkNotNull; /** * Cache for {@link ArchivedExecutionGraph} which are obtained from the Flink cluster. Every cache entry * has an associated time to live after which a new request will trigger the reloading of the * {@link ArchivedExecutionGraph} from the cluster. */ -public class ExecutionGraphCache implements Closeable { - - private final Time timeout; - - private final Time timeToLive; - - private final ConcurrentHashMap<JobID, ExecutionGraphEntry> cachedExecutionGraphs; - - private volatile boolean running = true; - - public ExecutionGraphCache( - Time timeout, - Time timeToLive) { - this.timeout = checkNotNull(timeout); - this.timeToLive = checkNotNull(timeToLive); - - cachedExecutionGraphs = new ConcurrentHashMap<>(4); - } - - @Override - public void close() { - running = false; - - // clear all cached AccessExecutionGraphs - cachedExecutionGraphs.clear(); - } +public interface ExecutionGraphCache extends Closeable { /** * Gets the number of cache entries. */ - public int size() { - return cachedExecutionGraphs.size(); - } + int size(); /** * Gets the {@link AccessExecutionGraph} for the given {@link JobID} and caches it. The @@ -80,93 +47,16 @@ public class ExecutionGraphCache implements Closeable { * @param restfulGateway to request the {@link ArchivedExecutionGraph} from * @return Future containing the requested {@link ArchivedExecutionGraph} */ - public CompletableFuture<AccessExecutionGraph> getExecutionGraph(JobID jobId, RestfulGateway restfulGateway) { - return getExecutionGraphInternal(jobId, restfulGateway).thenApply(Function.identity()); - } - - private CompletableFuture<ArchivedExecutionGraph> getExecutionGraphInternal(JobID jobId, RestfulGateway restfulGateway) { - Preconditions.checkState(running, "ExecutionGraphCache is no longer running"); - - while (true) { - final ExecutionGraphEntry oldEntry = cachedExecutionGraphs.get(jobId); - - final long currentTime = System.currentTimeMillis(); - - if (oldEntry != null && currentTime < oldEntry.getTTL()) { - final CompletableFuture<ArchivedExecutionGraph> executionGraphFuture = oldEntry.getExecutionGraphFuture(); - if (!executionGraphFuture.isCompletedExceptionally()) { - return executionGraphFuture; - } - // otherwise it must be completed exceptionally - } - - final ExecutionGraphEntry newEntry = new ExecutionGraphEntry(currentTime + timeToLive.toMilliseconds()); - - final boolean successfulUpdate; - - if (oldEntry == null) { - successfulUpdate = cachedExecutionGraphs.putIfAbsent(jobId, newEntry) == null; - } else { - successfulUpdate = cachedExecutionGraphs.replace(jobId, oldEntry, newEntry); - // cancel potentially outstanding futures - oldEntry.getExecutionGraphFuture().cancel(false); - } - - if (successfulUpdate) { - final CompletableFuture<ArchivedExecutionGraph> executionGraphFuture = restfulGateway.requestJob(jobId, timeout); - - executionGraphFuture.whenComplete( - (ArchivedExecutionGraph executionGraph, Throwable throwable) -> { - if (throwable != null) { - newEntry.getExecutionGraphFuture().completeExceptionally(throwable); - - // remove exceptionally completed entry because it doesn't help - cachedExecutionGraphs.remove(jobId, newEntry); - } else { - newEntry.getExecutionGraphFuture().complete(executionGraph); - } - }); - - if (!running) { - // delete newly added entry in case of a concurrent stopping operation - cachedExecutionGraphs.remove(jobId, newEntry); - } - - return newEntry.getExecutionGraphFuture(); - } - } - } + CompletableFuture<AccessExecutionGraph> getExecutionGraph(JobID jobId, RestfulGateway restfulGateway); /** - * Perform the cleanup of out dated {@link ExecutionGraphEntry}. + * Perform the cleanup of out dated cache entries. */ - public void cleanup() { - long currentTime = System.currentTimeMillis(); - - // remove entries which have exceeded their time to live - cachedExecutionGraphs.values().removeIf( - (ExecutionGraphEntry entry) -> currentTime >= entry.getTTL()); - } + void cleanup(); /** - * Wrapper containing the current execution graph and it's time to live (TTL). + * Closes the execution graph cache. */ - private static final class ExecutionGraphEntry { - private final long ttl; - - private final CompletableFuture<ArchivedExecutionGraph> executionGraphFuture; - - ExecutionGraphEntry(long ttl) { - this.ttl = ttl; - this.executionGraphFuture = new CompletableFuture<>(); - } - - public long getTTL() { - return ttl; - } - - public CompletableFuture<ArchivedExecutionGraph> getExecutionGraphFuture() { - return executionGraphFuture; - } - } + @Override + void close(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandlerTest.java index 1dcd2b2..d75ed0f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandlerTest.java @@ -24,7 +24,7 @@ import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.HandlerRequestException; -import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionConfigBuilder; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; @@ -57,7 +57,7 @@ public class JobConfigHandlerTest extends TestLogger { TestingUtils.TIMEOUT(), Collections.emptyMap(), JobConfigHeaders.getInstance(), - new ExecutionGraphCache(TestingUtils.TIMEOUT(), TestingUtils.TIMEOUT()), + new DefaultExecutionGraphCache(TestingUtils.TIMEOUT(), TestingUtils.TIMEOUT()), TestingUtils.defaultExecutor()); final Map<String, String> globalJobParameters = new HashMap<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java index abbd3fc..ac59952 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java @@ -31,7 +31,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.HandlerRequestException; -import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders; @@ -65,7 +65,7 @@ public class JobExceptionsHandlerTest extends TestLogger { TestingUtils.TIMEOUT(), Collections.emptyMap(), JobExceptionsHeaders.getInstance(), - new ExecutionGraphCache(TestingUtils.TIMEOUT(), TestingUtils.TIMEOUT()), + new DefaultExecutionGraphCache(TestingUtils.TIMEOUT(), TestingUtils.TIMEOUT()), TestingUtils.defaultExecutor()); final int numExceptions = 20; final AccessExecutionGraph archivedExecutionGraph = createAccessExecutionGraph(numExceptions); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java index 56da33c..167b25b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java @@ -32,7 +32,7 @@ import org.apache.flink.runtime.executiongraph.IOMetrics; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; -import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; @@ -125,7 +125,7 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger { Time.milliseconds(100), Collections.emptyMap(), SubtaskCurrentAttemptDetailsHeaders.getInstance(), - new ExecutionGraphCache( + new DefaultExecutionGraphCache( restHandlerConfiguration.getTimeout(), Time.milliseconds(restHandlerConfiguration.getRefreshInterval())), TestingUtils.defaultExecutor(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java index 352b05f..a2bfb7b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java @@ -29,7 +29,7 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; -import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters; import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsHeaders; @@ -65,7 +65,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest extends TestLogger { Time.milliseconds(100L), Collections.emptyMap(), SubtaskExecutionAttemptAccumulatorsHeaders.getInstance(), - new ExecutionGraphCache( + new DefaultExecutionGraphCache( restHandlerConfiguration.getTimeout(), Time.milliseconds(restHandlerConfiguration.getRefreshInterval())), TestingUtils.defaultExecutor()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java index 59b560f..b2b4654f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java @@ -33,7 +33,7 @@ import org.apache.flink.runtime.executiongraph.IOMetrics; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; -import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; @@ -128,7 +128,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger { Time.milliseconds(100L), Collections.emptyMap(), SubtaskExecutionAttemptDetailsHeaders.getInstance(), - new ExecutionGraphCache( + new DefaultExecutionGraphCache( restHandlerConfiguration.getTimeout(), Time.milliseconds(restHandlerConfiguration.getRefreshInterval())), TestingUtils.defaultExecutor(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexWatermarksHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexWatermarksHandlerTest.java index 2fd08ae..8efbd95 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexWatermarksHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexWatermarksHandlerTest.java @@ -24,7 +24,6 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.handler.HandlerRequest; -import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; @@ -33,6 +32,7 @@ import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters; import org.apache.flink.runtime.rest.messages.job.metrics.Metric; import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody; +import org.apache.flink.runtime.rest.util.NoOpExecutionGraphCache; import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever; import org.hamcrest.BaseMatcher; @@ -87,7 +87,7 @@ public class JobVertexWatermarksHandlerTest { Time.seconds(1), Collections.emptyMap(), metricFetcher, - Mockito.mock(ExecutionGraphCache.class), + NoOpExecutionGraphCache.INSTANCE, Mockito.mock(Executor.class)); final Map<String, String> pathParameters = new HashMap<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCacheTest.java similarity index 93% rename from flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCacheTest.java index 1426ac7..b78248a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DefaultExecutionGraphCacheTest.java @@ -59,9 +59,9 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** - * Tests for the {@link ExecutionGraphCache}. + * Tests for the {@link DefaultExecutionGraphCache}. */ -public class ExecutionGraphCacheTest extends TestLogger { +public class DefaultExecutionGraphCacheTest extends TestLogger { private static ArchivedExecutionGraph expectedExecutionGraph; private static final JobID expectedJobId = new JobID(); @@ -81,7 +81,7 @@ public class ExecutionGraphCacheTest extends TestLogger { final CountingRestfulGateway restfulGateway = createCountingRestfulGateway(expectedJobId, CompletableFuture.completedFuture(expectedExecutionGraph)); - try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { + try (ExecutionGraphCache executionGraphCache = new DefaultExecutionGraphCache(timeout, timeToLive)) { CompletableFuture<AccessExecutionGraph> accessExecutionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); assertEquals(expectedExecutionGraph, accessExecutionGraphFuture.get()); @@ -107,7 +107,7 @@ public class ExecutionGraphCacheTest extends TestLogger { CompletableFuture.completedFuture(expectedExecutionGraph), CompletableFuture.completedFuture(expectedExecutionGraph)); - try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { + try (ExecutionGraphCache executionGraphCache = new DefaultExecutionGraphCache(timeout, timeToLive)) { CompletableFuture<AccessExecutionGraph> executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); assertEquals(expectedExecutionGraph, executionGraphFuture.get()); @@ -139,7 +139,7 @@ public class ExecutionGraphCacheTest extends TestLogger { FutureUtils.completedExceptionally(new FlinkJobNotFoundException(expectedJobId)), CompletableFuture.completedFuture(expectedExecutionGraph)); - try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { + try (ExecutionGraphCache executionGraphCache = new DefaultExecutionGraphCache(timeout, timeToLive)) { CompletableFuture<AccessExecutionGraph> executionGraphFuture = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); try { @@ -158,7 +158,7 @@ public class ExecutionGraphCacheTest extends TestLogger { /** * Tests that cache entries are cleaned up when their TTL has expired upon - * calling {@link ExecutionGraphCache#cleanup()}. + * calling {@link DefaultExecutionGraphCache#cleanup()}. */ @Test public void testCacheEntryCleanup() throws Exception { @@ -183,7 +183,7 @@ public class ExecutionGraphCacheTest extends TestLogger { ) .build(); - try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { + try (ExecutionGraphCache executionGraphCache = new DefaultExecutionGraphCache(timeout, timeToLive)) { CompletableFuture<AccessExecutionGraph> executionGraph1Future = executionGraphCache.getExecutionGraph(expectedJobId, restfulGateway); @@ -219,7 +219,7 @@ public class ExecutionGraphCacheTest extends TestLogger { final ExecutorService executor = java.util.concurrent.Executors.newFixedThreadPool(numConcurrentAccesses); - try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { + try (ExecutionGraphCache executionGraphCache = new DefaultExecutionGraphCache(timeout, timeToLive)) { for (int i = 0; i < numConcurrentAccesses; i++) { CompletableFuture<AccessExecutionGraph> executionGraphFuture = CompletableFuture .supplyAsync( @@ -286,7 +286,7 @@ public class ExecutionGraphCacheTest extends TestLogger { public SuspendableAccessExecutionGraph(JobID jobId) { super( jobId, - "ExecutionGraphCacheTest", + "DefaultExecutionGraphCacheTest", Collections.emptyMap(), Collections.emptyList(), new long[0], diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java index 9f84bce..92eedec 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java @@ -28,7 +28,6 @@ import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; -import org.apache.flink.runtime.rest.RestEndpointFactory; import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; @@ -86,7 +85,7 @@ public class DocumentingDispatcherRestEndpoint extends DispatcherRestEndpoint im Executors.newScheduledThreadPool(1), VoidMetricFetcher.INSTANCE, NoOpElectionService.INSTANCE, - RestEndpointFactory.createExecutionGraphCache(handlerConfig), + NoOpExecutionGraphCache.INSTANCE, NoOpFatalErrorHandler.INSTANCE); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/NoOpExecutionGraphCache.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/NoOpExecutionGraphCache.java new file mode 100644 index 0000000..db4ceec --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/NoOpExecutionGraphCache.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.util; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.webmonitor.RestfulGateway; + +import java.util.concurrent.CompletableFuture; + +/** + * {@link ExecutionGraphCache} which does nothing. + */ +public enum NoOpExecutionGraphCache implements ExecutionGraphCache { + INSTANCE; + + @Override + public int size() { + return 0; + } + + @Override + public CompletableFuture<AccessExecutionGraph> getExecutionGraph(JobID jobId, RestfulGateway restfulGateway) { + return FutureUtils.completedExceptionally(new UnsupportedOperationException("NoOpExecutionGraphCache does not support to retrieve execution graphs")); + } + + @Override + public void cleanup() {} + + @Override + public void close() {} +}