This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 1ec3424 [FLINK-13256] Ensure periodical checkpointing continues when regional failover aborts pending checkpoints 1ec3424 is described below commit 1ec34249a0303ae64d049d177057ef9b6c413ab5 Author: Yun Tang <myas...@live.com> AuthorDate: Thu Jul 18 15:58:21 2019 +0800 [FLINK-13256] Ensure periodical checkpointing continues when regional failover aborts pending checkpoints This closes #9128. --- .../runtime/checkpoint/CheckpointCoordinator.java | 139 ++++++++------- .../executiongraph/failover/FailoverRegion.java | 2 +- .../FailoverStrategyCheckpointCoordinatorTest.java | 186 +++++++++++++++++++++ 3 files changed, 264 insertions(+), 63 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 3dc5c1d..682685c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -478,32 +478,9 @@ public class CheckpointCoordinator { throw new CheckpointException(CheckpointFailureReason.ALREADY_QUEUED); } - // if too many checkpoints are currently in progress, we need to mark that a request is queued - if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) { - triggerRequestQueued = true; - if (currentPeriodicTrigger != null) { - currentPeriodicTrigger.cancel(false); - currentPeriodicTrigger = null; - } - throw new CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS); - } + checkConcurrentCheckpoints(); - // make sure the minimum interval between checkpoints has passed - final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos; - final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000; - - if (durationTillNextMillis 
> 0) { - if (currentPeriodicTrigger != null) { - currentPeriodicTrigger.cancel(false); - currentPeriodicTrigger = null; - } - // Reassign the new trigger to the currentPeriodicTrigger - currentPeriodicTrigger = timer.scheduleAtFixedRate( - new ScheduledTrigger(), - durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS); - - throw new CheckpointException(CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS); - } + checkMinPauseBetweenCheckpoints(); } } @@ -623,32 +600,9 @@ public class CheckpointCoordinator { throw new CheckpointException(CheckpointFailureReason.ALREADY_QUEUED); } - if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) { - triggerRequestQueued = true; - if (currentPeriodicTrigger != null) { - currentPeriodicTrigger.cancel(false); - currentPeriodicTrigger = null; - } - throw new CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS); - } - - // make sure the minimum interval between checkpoints has passed - final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos; - final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000; - - if (durationTillNextMillis > 0) { - if (currentPeriodicTrigger != null) { - currentPeriodicTrigger.cancel(false); - currentPeriodicTrigger = null; - } + checkConcurrentCheckpoints(); - // Reassign the new trigger to the currentPeriodicTrigger - currentPeriodicTrigger = timer.scheduleAtFixedRate( - new ScheduledTrigger(), - durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS); - - throw new CheckpointException(CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS); - } + checkMinPauseBetweenCheckpoints(); } LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job); @@ -1025,9 +979,7 @@ public class CheckpointCoordinator { if (currentPeriodicTrigger != null) { currentPeriodicTrigger.cancel(false); } - currentPeriodicTrigger = timer.scheduleAtFixedRate( - new ScheduledTrigger(), - 0L, 
baseInterval, TimeUnit.MILLISECONDS); + currentPeriodicTrigger = scheduleTriggerWithDelay(0L); } else { timer.execute(new ScheduledTrigger()); @@ -1224,6 +1176,11 @@ public class CheckpointCoordinator { return checkpointTimeout; } + @VisibleForTesting + boolean isCurrentPeriodicTriggerAvailable() { + return currentPeriodicTrigger != null; + } + /** * Returns whether periodic checkpointing has been configured. * @@ -1247,10 +1204,7 @@ public class CheckpointCoordinator { stopCheckpointScheduler(); periodicScheduling = true; - long initialDelay = ThreadLocalRandom.current().nextLong( - minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L); - currentPeriodicTrigger = timer.scheduleAtFixedRate( - new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS); + currentPeriodicTrigger = scheduleTriggerWithDelay(getRandomInitDelay()); } } @@ -1284,6 +1238,54 @@ public class CheckpointCoordinator { } } + /** + * If too many checkpoints are currently in progress, we need to mark that a request is queued + * + * @throws CheckpointException If too many checkpoints are currently in progress. + */ + private void checkConcurrentCheckpoints() throws CheckpointException { + if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) { + triggerRequestQueued = true; + if (currentPeriodicTrigger != null) { + currentPeriodicTrigger.cancel(false); + currentPeriodicTrigger = null; + } + throw new CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS); + } + } + + /** + * Make sure the minimum interval between checkpoints has passed + * + * @throws CheckpointException If the minimum interval between checkpoints has not passed. 
+ */ + private void checkMinPauseBetweenCheckpoints() throws CheckpointException { + final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos; + final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000; + + if (durationTillNextMillis > 0) { + if (currentPeriodicTrigger != null) { + currentPeriodicTrigger.cancel(false); + currentPeriodicTrigger = null; + } + // Reassign the new trigger to the currentPeriodicTrigger + currentPeriodicTrigger = scheduleTriggerWithDelay(durationTillNextMillis); + + throw new CheckpointException(CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS); + } + } + + private long getRandomInitDelay() { + return ThreadLocalRandom.current().nextLong( + minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L); + } + + private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) { + return timer.scheduleAtFixedRate( + new ScheduledTrigger(), + initDelay, baseInterval, TimeUnit.MILLISECONDS); + } + // ------------------------------------------------------------------------ // job status listener that schedules / cancels periodic checkpoints // ------------------------------------------------------------------------ @@ -1391,6 +1393,13 @@ public class CheckpointCoordinator { } private void failPendingCheckpoint( + final PendingCheckpoint pendingCheckpoint, + final CheckpointFailureReason reason) { + + failPendingCheckpoint(pendingCheckpoint, reason, null); + } + + private void failPendingCheckpoint( final PendingCheckpoint pendingCheckpoint, final CheckpointFailureReason reason, final Throwable cause) { @@ -1398,12 +1407,18 @@ public class CheckpointCoordinator { CheckpointException exception = new CheckpointException(reason, cause); pendingCheckpoint.abort(reason, cause); failureManager.handleCheckpointException(exception, pendingCheckpoint.getCheckpointId()); - } - private void failPendingCheckpoint( - final PendingCheckpoint pendingCheckpoint, - final 
CheckpointFailureReason reason) { + if (!shutdown && periodicScheduling && currentPeriodicTrigger == null) { + synchronized (lock) { + if (pendingCheckpoints.isEmpty() || allPendingCheckpointsDiscarded()) { + triggerRequestQueued = false; + currentPeriodicTrigger = scheduleTriggerWithDelay(getRandomInitDelay()); + } + } + } + } - failPendingCheckpoint(pendingCheckpoint, reason, null); + private boolean allPendingCheckpointsDiscarded() { + return pendingCheckpoints.values().stream().allMatch(PendingCheckpoint::isDiscarded); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java index 2c0c148..9a93bb8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java @@ -209,7 +209,7 @@ public class FailoverRegion { if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) { // if we have checkpointed state, reload it into the executions if (executionGraph.getCheckpointCoordinator() != null) { - // we restart the checkpoint scheduler for + // we abort pending checkpoints for // i) enable new checkpoint could be triggered without waiting for last checkpoint expired. // ii) ensure the EXACTLY_ONCE semantics if needed. 
executionGraph.getCheckpointCoordinator().abortPendingCheckpoints( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java new file mode 100644 index 0000000..1d77716 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.mock.Whitebox; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverRegion; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.concurrent.NeverCompleteFuture; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for the interaction between the {@link FailoverStrategy} and the {@link CheckpointCoordinator}. 
+ */ +public class FailoverStrategyCheckpointCoordinatorTest extends TestLogger { + private ManuallyTriggeredScheduledExecutor manualThreadExecutor; + + @Before + public void setUp() { + manualThreadExecutor = new ManuallyTriggeredScheduledExecutor(); + } + + /** + * Tests that {@link CheckpointCoordinator#abortPendingCheckpoints(CheckpointException)} + * called by {@link AdaptedRestartPipelinedRegionStrategyNG} or {@link FailoverRegion} correctly handles + * the case where {@code currentPeriodicTrigger} is null. + */ + @Test + public void testAbortPendingCheckpointsWithTriggerValidation() { + final int maxConcurrentCheckpoints = ThreadLocalRandom.current().nextInt(10) + 1; + ExecutionVertex executionVertex = mockExecutionVertex(); + CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = new CheckpointCoordinatorConfiguration( + Integer.MAX_VALUE, + Integer.MAX_VALUE, + 0, + maxConcurrentCheckpoints, + CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinator( + new JobID(), + checkpointCoordinatorConfiguration, + new ExecutionVertex[] { executionVertex }, + new ExecutionVertex[] { executionVertex }, + new ExecutionVertex[] { executionVertex }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + new MemoryStateBackend(), + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY, + mock(CheckpointFailureManager.class)); + + // switch the current execution's state to running so that checkpoints can be triggered. + mockExecutionRunning(executionVertex); + + // use a manual checkpoint timer so that periodic checkpoints are triggered exactly when we expect.
+ ManualCheckpointTimer manualCheckpointTimer = new ManualCheckpointTimer(manualThreadExecutor); + // set the initial delay to 0 so that the first checkpoint is triggered as soon as we trigger the manual executor + // this avoids the randomness of when the first checkpoint is triggered (introduced via FLINK-9352) + manualCheckpointTimer.setManualDelay(0L); + Whitebox.setInternalState(checkpointCoordinator, "timer", manualCheckpointTimer); + + checkpointCoordinator.startCheckpointScheduler(); + assertTrue(checkpointCoordinator.isCurrentPeriodicTriggerAvailable()); + manualThreadExecutor.triggerAll(); + manualThreadExecutor.triggerScheduledTasks(); + assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints()); + + for (int i = 1; i < maxConcurrentCheckpoints; i++) { + checkpointCoordinator.triggerCheckpoint(System.currentTimeMillis(), false); + assertEquals(i + 1, checkpointCoordinator.getNumberOfPendingCheckpoints()); + assertTrue(checkpointCoordinator.isCurrentPeriodicTriggerAvailable()); + } + + // as only a limited number of concurrent checkpoints is supported, once more checkpoints than the limit + // have been triggered, currentPeriodicTrigger is set to null. + checkpointCoordinator.triggerCheckpoint(System.currentTimeMillis(), false); + assertFalse(checkpointCoordinator.isCurrentPeriodicTriggerAvailable()); + assertEquals(maxConcurrentCheckpoints, checkpointCoordinator.getNumberOfPendingCheckpoints()); + + checkpointCoordinator.abortPendingCheckpoints( + new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION)); + // after aborting checkpoints, ensure currentPeriodicTrigger is available again.
+ assertTrue(checkpointCoordinator.isCurrentPeriodicTriggerAvailable()); + assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints()); + } + + private ExecutionVertex mockExecutionVertex() { + ExecutionAttemptID executionAttemptID = new ExecutionAttemptID(); + ExecutionVertex executionVertex = mock(ExecutionVertex.class); + Execution execution = Mockito.mock(Execution.class); + when(execution.getAttemptId()).thenReturn(executionAttemptID); + when(executionVertex.getCurrentExecutionAttempt()).thenReturn(execution); + return executionVertex; + } + + private void mockExecutionRunning(ExecutionVertex executionVertex) { + when(executionVertex.getCurrentExecutionAttempt().getState()).thenReturn(ExecutionState.RUNNING); + } + + public static class ManualCheckpointTimer extends ScheduledThreadPoolExecutor { + private final ScheduledExecutor scheduledExecutor; + private long manualDelay = 0; + + ManualCheckpointTimer(final ScheduledExecutor scheduledExecutor) { + super(0); + this.scheduledExecutor = scheduledExecutor; + } + + void setManualDelay(long manualDelay) { + this.manualDelay = manualDelay; + } + + @Override + public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { + // used as checkpoint canceller, as we don't want pending checkpoint cancelled, this should never be scheduled. + return new NeverCompleteFuture(delay); + } + + @Override + public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + @Override + public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + // used to schedule periodic checkpoints. + // this would use configured 'manualDelay' to let the task schedule with the wanted delay. 
+ return scheduledExecutor.scheduleWithFixedDelay(command, manualDelay, period, unit); + } + + @Override + public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + @Override + public void execute(Runnable command) { + scheduledExecutor.execute(command); + } + } +}