[FLINK-9042][tests] Port ResumeCheckpointManuallyITCase to flip6. This closes #5736.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c553ba4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c553ba4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c553ba4 Branch: refs/heads/master Commit: 7c553ba45b44145ea09e4d9ccb0bdf64df7ee076 Parents: db366cd Author: zentol <ches...@apache.org> Authored: Wed Mar 21 13:31:56 2018 +0100 Committer: zentol <ches...@apache.org> Committed: Wed Apr 4 08:59:05 2018 +0200 ---------------------------------------------------------------------- .../ResumeCheckpointManuallyITCase.java | 146 +++++++++++++------ 1 file changed, 104 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7c553ba4/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java index 537f864..add4243 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java @@ -18,25 +18,26 @@ package org.apache.flink.test.checkpointing; -import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.CheckpointingOptions; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend; -import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.test.state.ManualWindowSpeedITCase; +import org.apache.flink.test.util.MiniClusterResource; import org.apache.flink.util.TestLogger; import org.apache.curator.test.TestingServer; @@ -44,9 +45,17 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import javax.annotation.Nullable; + import java.io.File; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Optional; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertNotNull; /** * IT case for resuming from checkpoints manually via their external pointer, rather than automatic @@ -240,14 +249,10 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { final Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TASK_MANAGER); - final File savepointDir = temporaryFolder.newFolder(); 
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString()); config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString()); - config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString()); if (localRecovery) { config.setString( @@ -263,56 +268,113 @@ public class ResumeCheckpointManuallyITCase extends TestLogger { config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString()); } - TestingCluster cluster = new TestingCluster(config); - cluster.start(); + MiniClusterResource cluster = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + config, + NUM_TASK_MANAGERS, + SLOTS_PER_TASK_MANAGER), + true); + + cluster.before(); - String externalCheckpoint = null; + ClusterClient<?> client = cluster.getClusterClient(); + client.setDetached(true); try { + // main test sequence: start job -> eCP -> restore job -> eCP -> restore job + String firstExternalCheckpoint = runJobAndGetExternalizedCheckpoint(backend, checkpointDir, null, client); + assertNotNull(firstExternalCheckpoint); + + String secondExternalCheckpoint = runJobAndGetExternalizedCheckpoint(backend, checkpointDir, firstExternalCheckpoint, client); + assertNotNull(secondExternalCheckpoint); - // main test sequence: start job -> eCP -> restore job -> eCP -> restore job -> eCP - for (int i = 0; i < 3; ++i) { - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + String thirdExternalCheckpoint = runJobAndGetExternalizedCheckpoint(backend, checkpointDir, secondExternalCheckpoint, client); + assertNotNull(thirdExternalCheckpoint); + } finally { + cluster.after(); + } + } - env.setStateBackend(backend); - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); - env.setParallelism(PARALLELISM); + private static String runJobAndGetExternalizedCheckpoint(StateBackend backend, File checkpointDir, @Nullable String 
externalCheckpoint, ClusterClient<?> client) throws Exception { + JobGraph initialJobGraph = getJobGraph(backend, externalCheckpoint); + NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(PARALLELISM); - // initialize count down latch - NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(PARALLELISM); + client.submitJob(initialJobGraph, ResumeCheckpointManuallyITCase.class.getClassLoader()); - env.addSource(new NotifyingInfiniteTupleSource(10_000)) - .keyBy(0) - .timeWindow(Time.seconds(3)) - .reduce((value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1)) - .filter(value -> value.f0.startsWith("Tuple 0")); + // wait until all sources have been started + NotifyingInfiniteTupleSource.countDownLatch.await(); - StreamGraph streamGraph = env.getStreamGraph(); - streamGraph.setJobName("Test"); + waitUntilExternalizedCheckpointCreated(checkpointDir, initialJobGraph.getJobID()); + client.cancel(initialJobGraph.getJobID()); + waitUntilCanceled(initialJobGraph.getJobID(), client); - JobGraph jobGraph = streamGraph.getJobGraph(); + return getExternalizedCheckpointCheckpointPath(checkpointDir, initialJobGraph.getJobID()); + } + + private static String getExternalizedCheckpointCheckpointPath(File checkpointDir, JobID jobId) throws IOException { + Optional<Path> checkpoint = findExternalizedCheckpoint(checkpointDir, jobId); + if (!checkpoint.isPresent()) { + throw new AssertionError("No complete checkpoint could be found."); + } else { + return checkpoint.get().toString(); + } + } - // recover from previous iteration? 
- if (externalCheckpoint != null) { - jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(externalCheckpoint)); + private static void waitUntilExternalizedCheckpointCreated(File checkpointDir, JobID jobId) throws InterruptedException, IOException { + while (true) { + Thread.sleep(50); + Optional<Path> externalizedCheckpoint = findExternalizedCheckpoint(checkpointDir, jobId); + if (externalizedCheckpoint.isPresent()) { + break; + } + } + } + + private static Optional<Path> findExternalizedCheckpoint(File checkpointDir, JobID jobId) throws IOException { + return Files.list(checkpointDir.toPath().resolve(jobId.toString())) + .filter(path -> path.getFileName().toString().startsWith("chk-")) + .filter(path -> { + try { + return Files.list(path).anyMatch(child -> child.getFileName().toString().contains("meta")); + } catch (IOException ignored) { + return false; } + }) + .findAny(); + } - config.addAll(jobGraph.getJobConfiguration()); - JobSubmissionResult submissionResult = cluster.submitJobDetached(jobGraph); + private static void waitUntilCanceled(JobID jobId, ClusterClient<?> client) throws ExecutionException, InterruptedException { + while (client.getJobStatus(jobId).get() != JobStatus.CANCELLING) { + Thread.sleep(50); + } + } - // wait until all sources have been started - NotifyingInfiniteTupleSource.countDownLatch.await(); + private static JobGraph getJobGraph(StateBackend backend, @Nullable String externalCheckpoint) { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - externalCheckpoint = cluster.requestCheckpoint( - submissionResult.getJobID(), - CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION); + env.enableCheckpointing(500); + env.setStateBackend(backend); + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + env.setParallelism(PARALLELISM); + env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); - 
cluster.cancelJob(submissionResult.getJobID()); - } - } finally { - cluster.stop(); - cluster.awaitTermination(); + env.addSource(new NotifyingInfiniteTupleSource(10_000)) + .keyBy(0) + .timeWindow(Time.seconds(3)) + .reduce((value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1)) + .filter(value -> value.f0.startsWith("Tuple 0")); + + StreamGraph streamGraph = env.getStreamGraph(); + streamGraph.setJobName("Test"); + + JobGraph jobGraph = streamGraph.getJobGraph(); + + // recover from previous iteration? + if (externalCheckpoint != null) { + jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(externalCheckpoint)); } + + return jobGraph; } /**