This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ec3424  [FLINK-13256] Ensure periodical checkpointing continues when regional failover aborts pending checkpoints
1ec3424 is described below

commit 1ec34249a0303ae64d049d177057ef9b6c413ab5
Author: Yun Tang <myas...@live.com>
AuthorDate: Thu Jul 18 15:58:21 2019 +0800

    [FLINK-13256] Ensure periodical checkpointing continues when regional failover aborts pending checkpoints
    
    This closes #9128.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 139 ++++++++-------
 .../executiongraph/failover/FailoverRegion.java    |   2 +-
 .../FailoverStrategyCheckpointCoordinatorTest.java | 186 +++++++++++++++++++++
 3 files changed, 264 insertions(+), 63 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 3dc5c1d..682685c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -478,32 +478,9 @@ public class CheckpointCoordinator {
                                        throw new 
CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
                                }
 
-                               // if too many checkpoints are currently in 
progress, we need to mark that a request is queued
-                               if (pendingCheckpoints.size() >= 
maxConcurrentCheckpointAttempts) {
-                                       triggerRequestQueued = true;
-                                       if (currentPeriodicTrigger != null) {
-                                               
currentPeriodicTrigger.cancel(false);
-                                               currentPeriodicTrigger = null;
-                                       }
-                                       throw new 
CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
-                               }
+                               checkConcurrentCheckpoints();
 
-                               // make sure the minimum interval between 
checkpoints has passed
-                               final long earliestNext = 
lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
-                               final long durationTillNextMillis = 
(earliestNext - System.nanoTime()) / 1_000_000;
-
-                               if (durationTillNextMillis > 0) {
-                                       if (currentPeriodicTrigger != null) {
-                                               
currentPeriodicTrigger.cancel(false);
-                                               currentPeriodicTrigger = null;
-                                       }
-                                       // Reassign the new trigger to the 
currentPeriodicTrigger
-                                       currentPeriodicTrigger = 
timer.scheduleAtFixedRate(
-                                                       new ScheduledTrigger(),
-                                                       durationTillNextMillis, 
baseInterval, TimeUnit.MILLISECONDS);
-
-                                       throw new 
CheckpointException(CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
-                               }
+                               checkMinPauseBetweenCheckpoints();
                        }
                }
 
@@ -623,32 +600,9 @@ public class CheckpointCoordinator {
                                                        throw new 
CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
                                                }
 
-                                               if (pendingCheckpoints.size() 
>= maxConcurrentCheckpointAttempts) {
-                                                       triggerRequestQueued = 
true;
-                                                       if 
(currentPeriodicTrigger != null) {
-                                                               
currentPeriodicTrigger.cancel(false);
-                                                               
currentPeriodicTrigger = null;
-                                                       }
-                                                       throw new 
CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
-                                               }
-
-                                               // make sure the minimum 
interval between checkpoints has passed
-                                               final long earliestNext = 
lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
-                                               final long 
durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
-
-                                               if (durationTillNextMillis > 0) 
{
-                                                       if 
(currentPeriodicTrigger != null) {
-                                                               
currentPeriodicTrigger.cancel(false);
-                                                               
currentPeriodicTrigger = null;
-                                                       }
+                                               checkConcurrentCheckpoints();
 
-                                                       // Reassign the new 
trigger to the currentPeriodicTrigger
-                                                       currentPeriodicTrigger 
= timer.scheduleAtFixedRate(
-                                                                       new 
ScheduledTrigger(),
-                                                                       
durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
-
-                                                       throw new 
CheckpointException(CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
-                                               }
+                                               
checkMinPauseBetweenCheckpoints();
                                        }
 
                                        LOG.info("Triggering checkpoint {} @ {} 
for job {}.", checkpointID, timestamp, job);
@@ -1025,9 +979,7 @@ public class CheckpointCoordinator {
                                if (currentPeriodicTrigger != null) {
                                        currentPeriodicTrigger.cancel(false);
                                }
-                               currentPeriodicTrigger = 
timer.scheduleAtFixedRate(
-                                               new ScheduledTrigger(),
-                                               0L, baseInterval, 
TimeUnit.MILLISECONDS);
+                               currentPeriodicTrigger = 
scheduleTriggerWithDelay(0L);
                        }
                        else {
                                timer.execute(new ScheduledTrigger());
@@ -1224,6 +1176,11 @@ public class CheckpointCoordinator {
                return checkpointTimeout;
        }
 
+       @VisibleForTesting
+       boolean isCurrentPeriodicTriggerAvailable() { // exposed for tests: true while currentPeriodicTrigger holds a trigger future (non-null)
+               return currentPeriodicTrigger != null;
+       }
+
        /**
         * Returns whether periodic checkpointing has been configured.
         *
@@ -1247,10 +1204,7 @@ public class CheckpointCoordinator {
                        stopCheckpointScheduler();
 
                        periodicScheduling = true;
-                       long initialDelay = 
ThreadLocalRandom.current().nextLong(
-                               minPauseBetweenCheckpointsNanos / 1_000_000L, 
baseInterval + 1L);
-                       currentPeriodicTrigger = timer.scheduleAtFixedRate(
-                                       new ScheduledTrigger(), initialDelay, 
baseInterval, TimeUnit.MILLISECONDS);
+                       currentPeriodicTrigger = 
scheduleTriggerWithDelay(getRandomInitDelay());
                }
        }
 
@@ -1284,6 +1238,54 @@ public class CheckpointCoordinator {
                }
        }
 
+       /**
+        * Checks whether the number of pending checkpoints has reached {@code maxConcurrentCheckpointAttempts}; if so, marks a trigger request as queued and cancels the periodic trigger.
+        *
+        * @throws CheckpointException If too many checkpoints are currently in progress.
+        */
+       private void checkConcurrentCheckpoints() throws CheckpointException {
+               if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
+                       triggerRequestQueued = true;
+                       if (currentPeriodicTrigger != null) {
+                               currentPeriodicTrigger.cancel(false);
+                               currentPeriodicTrigger = null;
+                       }
+                       throw new CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
+               }
+       }
+
+       /**
+        * Makes sure the minimum pause since the last completed checkpoint has passed; otherwise re-schedules the periodic trigger to first fire after the remaining pause.
+        *
+        * @throws CheckpointException If the minimum interval between checkpoints has not passed.
+        */
+       private void checkMinPauseBetweenCheckpoints() throws CheckpointException {
+               final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
+               final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
+
+               if (durationTillNextMillis > 0) {
+                       if (currentPeriodicTrigger != null) {
+                               currentPeriodicTrigger.cancel(false);
+                               currentPeriodicTrigger = null;
+                       }
+                       // restart the periodic trigger with the remaining pause (in ms, truncated) as its initial delay
+                       currentPeriodicTrigger = scheduleTriggerWithDelay(durationTillNextMillis);
+
+                       throw new CheckpointException(CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
+               }
+       }
+
+       private long getRandomInitDelay() { // random delay in ms, within [minPauseBetweenCheckpoints, baseInterval] inclusive
+               return ThreadLocalRandom.current().nextLong(
+                       minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
+       }
+
+       private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) { // initDelay and the baseInterval period are in milliseconds
+               return timer.scheduleAtFixedRate(
+                       new ScheduledTrigger(),
+                       initDelay, baseInterval, TimeUnit.MILLISECONDS);
+       }
+
        // 
------------------------------------------------------------------------
        //  job status listener that schedules / cancels periodic checkpoints
        // 
------------------------------------------------------------------------
@@ -1391,6 +1393,13 @@ public class CheckpointCoordinator {
        }
 
        private void failPendingCheckpoint(
+               final PendingCheckpoint pendingCheckpoint,
+               final CheckpointFailureReason reason) {
+
+               failPendingCheckpoint(pendingCheckpoint, reason, null);
+       }
+
+       private void failPendingCheckpoint(
                        final PendingCheckpoint pendingCheckpoint,
                        final CheckpointFailureReason reason,
                        final Throwable cause) {
@@ -1398,12 +1407,18 @@ public class CheckpointCoordinator {
                CheckpointException exception = new CheckpointException(reason, cause);
                pendingCheckpoint.abort(reason, cause);
                failureManager.handleCheckpointException(exception, pendingCheckpoint.getCheckpointId());
-       }

-       private void failPendingCheckpoint(
-                       final PendingCheckpoint pendingCheckpoint,
-                       final CheckpointFailureReason reason) {
+               // re-arm periodic triggering once no live pending checkpoint remains; every condition is evaluated under the lock to avoid a check-then-act race
+               synchronized (lock) {
+                       if (!shutdown && periodicScheduling && currentPeriodicTrigger == null
+                                       && (pendingCheckpoints.isEmpty() || allPendingCheckpointsDiscarded())) {
+                               triggerRequestQueued = false;
+                               currentPeriodicTrigger = scheduleTriggerWithDelay(getRandomInitDelay());
+                       }
+               }
+       }

-               failPendingCheckpoint(pendingCheckpoint, reason, null);
+       private boolean allPendingCheckpointsDiscarded() {
+               return pendingCheckpoints.values().stream().allMatch(PendingCheckpoint::isDiscarded);
        }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
index 2c0c148..9a93bb8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
@@ -209,7 +209,7 @@ public class FailoverRegion {
                        if (transitionState(JobStatus.CREATED, 
JobStatus.RUNNING)) {
                                // if we have checkpointed state, reload it 
into the executions
                                if (executionGraph.getCheckpointCoordinator() 
!= null) {
-                                       // we restart the checkpoint scheduler 
for
+                                       // we abort pending checkpoints for
                                        // i) enable new checkpoint could be 
triggered without waiting for last checkpoint expired.
                                        // ii) ensure the EXACTLY_ONCE 
semantics if needed.
                                        
executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java
new file mode 100644
index 0000000..1d77716
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.mock.Whitebox;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.NeverCompleteFuture;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the interaction between the {@link FailoverStrategy} and the {@link CheckpointCoordinator}.
+ */
+public class FailoverStrategyCheckpointCoordinatorTest extends TestLogger {
+       private ManuallyTriggeredScheduledExecutor manualThreadExecutor;
+
+       @Before
+       public void setUp() {
+               manualThreadExecutor = new ManuallyTriggeredScheduledExecutor();
+       }
+
+       /**
+        * Tests that {@link CheckpointCoordinator#abortPendingCheckpoints(CheckpointException)}, as called by
+        * {@link AdaptedRestartPipelinedRegionStrategyNG} or {@link FailoverRegion}, makes the periodic trigger
+        * available again after it had been cleared (i.e. {@code currentPeriodicTrigger} was null).
+        */
+       @Test
+       public void testAbortPendingCheckpointsWithTriggerValidation() {
+               final int maxConcurrentCheckpoints = ThreadLocalRandom.current().nextInt(10) + 1;
+               ExecutionVertex executionVertex = mockExecutionVertex();
+               CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = new CheckpointCoordinatorConfiguration(
+                       Integer.MAX_VALUE,
+                       Integer.MAX_VALUE,
+                       0,
+                       maxConcurrentCheckpoints,
+                       CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+                       true,
+                       false,
+                       0);
+               CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinator(
+                       new JobID(),
+                       checkpointCoordinatorConfiguration,
+                       new ExecutionVertex[] { executionVertex },
+                       new ExecutionVertex[] { executionVertex },
+                       new ExecutionVertex[] { executionVertex },
+                       new StandaloneCheckpointIDCounter(),
+                       new StandaloneCompletedCheckpointStore(1),
+                       new MemoryStateBackend(),
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY,
+                       mock(CheckpointFailureManager.class));
+
+               // switch the current execution's state to RUNNING so that checkpoints can be triggered.
+               mockExecutionRunning(executionVertex);
+
+               // use a manual checkpoint timer so that periodic checkpoints are triggered only when this test says so.
+               ManualCheckpointTimer manualCheckpointTimer = new ManualCheckpointTimer(manualThreadExecutor);
+               // set the initial delay to 0 so that the first checkpoint is triggered as soon as the manual executor runs;
+               // this avoids the randomized delay of the first checkpoint (introduced via FLINK-9352).
+               manualCheckpointTimer.setManualDelay(0L);
+               Whitebox.setInternalState(checkpointCoordinator, "timer", manualCheckpointTimer);
+
+               checkpointCoordinator.startCheckpointScheduler();
+               assertTrue(checkpointCoordinator.isCurrentPeriodicTriggerAvailable());
+               manualThreadExecutor.triggerAll();
+               manualThreadExecutor.triggerScheduledTasks();
+               assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+
+               for (int i = 1; i < maxConcurrentCheckpoints; i++) {
+                       checkpointCoordinator.triggerCheckpoint(System.currentTimeMillis(), false);
+                       assertEquals(i + 1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+                       assertTrue(checkpointCoordinator.isCurrentPeriodicTriggerAvailable());
+               }
+
+               // the number of concurrent checkpoints is limited; triggering one more checkpoint beyond that limit
+               // is expected to cancel the periodic trigger and set currentPeriodicTrigger to null.
+               checkpointCoordinator.triggerCheckpoint(System.currentTimeMillis(), false);
+               assertFalse(checkpointCoordinator.isCurrentPeriodicTriggerAvailable());
+               assertEquals(maxConcurrentCheckpoints, checkpointCoordinator.getNumberOfPendingCheckpoints());
+
+               checkpointCoordinator.abortPendingCheckpoints(
+                       new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));
+               // after aborting all pending checkpoints, the periodic trigger must be available again.
+               assertTrue(checkpointCoordinator.isCurrentPeriodicTriggerAvailable());
+               assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+       }
+
+       private ExecutionVertex mockExecutionVertex() {
+               ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
+               ExecutionVertex executionVertex = mock(ExecutionVertex.class);
+               Execution execution = mock(Execution.class);
+               when(execution.getAttemptId()).thenReturn(executionAttemptID);
+               when(executionVertex.getCurrentExecutionAttempt()).thenReturn(execution);
+               return executionVertex;
+       }
+
+       private void mockExecutionRunning(ExecutionVertex executionVertex) {
+               when(executionVertex.getCurrentExecutionAttempt().getState()).thenReturn(ExecutionState.RUNNING);
+       }
+
+       /** Checkpoint timer that forwards periodic triggers to a manually triggered {@link ScheduledExecutor}. */
+       public static class ManualCheckpointTimer extends ScheduledThreadPoolExecutor {
+               private final ScheduledExecutor scheduledExecutor;
+               private long manualDelay = 0;
+
+               ManualCheckpointTimer(final ScheduledExecutor scheduledExecutor) {
+                       super(0);
+                       this.scheduledExecutor = scheduledExecutor;
+               }
+
+               void setManualDelay(long manualDelay) {
+                       this.manualDelay = manualDelay;
+               }
+
+               @Override
+               public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+                       // used for the checkpoint canceller; pending checkpoints must not be cancelled here, so the task never runs.
+                       return new NeverCompleteFuture(delay);
+               }
+
+               @Override
+               public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+                       // used to schedule periodic checkpoints.
+                       // the configured 'manualDelay' replaces the requested initial delay.
+                       return scheduledExecutor.scheduleWithFixedDelay(command, manualDelay, period, unit);
+               }
+
+               @Override
+               public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public void execute(Runnable command) {
+                       scheduledExecutor.execute(command);
+               }
+       }
+}

Reply via email to