[FLINK-3051] [streaming] Add mechanisms to control the maximum number of concurrent checkpoints
This closes #1408 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/55fd5f32 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/55fd5f32 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/55fd5f32 Branch: refs/heads/master Commit: 55fd5f32d7ef0292a01192ab08456fae49b91791 Parents: 4097666 Author: Stephan Ewen <se...@apache.org> Authored: Thu Nov 19 19:05:47 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Nov 26 17:16:29 2015 +0100 ---------------------------------------------------------------------- .../checkpoint/CheckpointCoordinator.java | 402 ++++++++++++------- .../CheckpointCoordinatorDeActivator.java | 11 +- .../runtime/checkpoint/PendingCheckpoint.java | 10 +- .../runtime/executiongraph/ExecutionGraph.java | 22 +- .../jobgraph/tasks/JobSnapshottingSettings.java | 55 ++- .../flink/runtime/jobmanager/JobManager.scala | 2 + .../checkpoint/CheckpointCoordinatorTest.java | 357 +++++++++++++++- .../checkpoint/CheckpointStateRestoreTest.java | 6 +- .../checkpoint/CoordinatorShutdownTest.java | 6 +- .../api/environment/CheckpointConfig.java | 221 ++++++++++ .../environment/StreamExecutionEnvironment.java | 61 +-- .../flink/streaming/api/graph/StreamGraph.java | 105 ++--- .../api/graph/StreamGraphGenerator.java | 11 - .../api/graph/StreamingJobGraphGenerator.java | 36 +- .../api/scala/StreamExecutionEnvironment.scala | 7 + 15 files changed, 990 insertions(+), 322 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 
09dd2d9..454b88a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.actor.Props; + import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.Execution; @@ -34,6 +35,7 @@ import org.apache.flink.runtime.jobmanager.RecoveryMode; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete; import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +48,6 @@ import java.util.Map; import java.util.Timer; import java.util.TimerTask; import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -63,11 +64,12 @@ import static com.google.common.base.Preconditions.checkNotNull; */ public class CheckpointCoordinator { - private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class); + static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class); /** The number of recent checkpoints whose IDs are remembered */ private static final int NUM_GHOST_CHECKPOINT_IDS = 16; + /** Coordinator-wide lock to safeguard the checkpoint updates */ private final Object lock = new Object(); @@ -83,35 +85,58 @@ public class CheckpointCoordinator { /** Tasks who need to be sent a message when a checkpoint is confirmed */ private final ExecutionVertex[] tasksToCommitTo; + /** Map from checkpoint ID to the pending checkpoint */ private final Map<Long, PendingCheckpoint> 
pendingCheckpoints; - /** - * Completed checkpoints. Implementations can be blocking. Make sure calls to methods - * accessing this don't block the job manager actor and run asynchronously. - */ + /** Completed checkpoints. Implementations can be blocking. Make sure calls to methods + * accessing this don't block the job manager actor and run asynchronously. */ private final CompletedCheckpointStore completedCheckpointStore; + /** A list of recent checkpoint IDs, to identify late messages (vs invalid ones) */ private final ArrayDeque<Long> recentPendingCheckpoints; - /** - * Checkpoint ID counter to ensure ascending IDs. In case of job manager failures, these - * need to be ascending across job managers. - */ + /** Checkpoint ID counter to ensure ascending IDs. In case of job manager failures, these + * need to be ascending across job managers. */ private final CheckpointIDCounter checkpointIdCounter; - private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(); + /** Class loader used to deserialize the state handles (as they may be user-defined) */ + private final ClassLoader userClassLoader; - /** The timer that handles the checkpoint timeouts and triggers periodic checkpoints */ - private final Timer timer; + /** The base checkpoint interval. Actual trigger time may be affected by the + * max concurrent checkpoints and minimum-pause values */ + private final long baseInterval; + /** The max time (in ms) that a checkpoint may take */ private final long checkpointTimeout; + + /** The min time(in ms) to delay after a checkpoint could be triggered. 
Allows to + * enforce minimum processing time between checkpoint attempts */ + private final long minPauseBetweenCheckpoints; + + /** The maximum number of checkpoints that may be in progress at the same time */ + private final int maxConcurrentCheckpointAttempts; - private TimerTask periodicScheduler; + /** The timer that handles the checkpoint timeouts and triggers periodic checkpoints */ + private final Timer timer; + /** Actor that receives status updates from the execution graph this coordinator works for */ private ActorGateway jobStatusListener; + + /** The number of consecutive failed trigger attempts */ + private int numUnsuccessfulCheckpointsTriggers; + + + private ScheduledTrigger currentPeriodicTrigger; + + /** Flag whether a triggered checkpoint should immediately schedule the next checkpoint. + * Non-volatile, because only accessed in synchronized scope */ + private boolean periodicScheduling; - private ClassLoader userClassLoader; + /** Flag whether a trigger request could not be handled immediately. Non-volatile, because only + * accessed in synchronized scope */ + private boolean triggerRequestQueued; + /** Flag marking the coordinator as shut down (not accepting any messages any more) */ private volatile boolean shutdown; /** Shutdown hook thread to clean up state handles. 
*/ @@ -121,6 +146,7 @@ public class CheckpointCoordinator { public CheckpointCoordinator( JobID job, + long baseInterval, long checkpointTimeout, ExecutionVertex[] tasksToTrigger, ExecutionVertex[] tasksToWaitFor, @@ -130,11 +156,36 @@ public class CheckpointCoordinator { CompletedCheckpointStore completedCheckpointStore, RecoveryMode recoveryMode) throws Exception { + this(job, baseInterval, checkpointTimeout, 0L, Integer.MAX_VALUE, + tasksToTrigger, tasksToWaitFor, tasksToCommitTo, + userClassLoader, checkpointIDCounter, completedCheckpointStore, recoveryMode); + } + + public CheckpointCoordinator( + JobID job, + long baseInterval, + long checkpointTimeout, + long minPauseBetweenCheckpoints, + int maxConcurrentCheckpointAttempts, + ExecutionVertex[] tasksToTrigger, + ExecutionVertex[] tasksToWaitFor, + ExecutionVertex[] tasksToCommitTo, + ClassLoader userClassLoader, + CheckpointIDCounter checkpointIDCounter, + CompletedCheckpointStore completedCheckpointStore, + RecoveryMode recoveryMode) throws Exception { + // Sanity check + checkArgument(baseInterval > 0, "Checkpoint timeout must be larger than zero"); checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must be larger than zero"); + checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0"); + checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1"); this.job = checkNotNull(job); + this.baseInterval = baseInterval; this.checkpointTimeout = checkpointTimeout; + this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints; + this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts; this.tasksToTrigger = checkNotNull(tasksToTrigger); this.tasksToWaitFor = checkNotNull(tasksToWaitFor); this.tasksToCommitTo = checkNotNull(tasksToCommitTo); @@ -143,10 +194,11 @@ public class CheckpointCoordinator { this.recentPendingCheckpoints = new ArrayDeque<Long>(NUM_GHOST_CHECKPOINT_IDS); this.userClassLoader = userClassLoader; 
this.checkpointIdCounter = checkNotNull(checkpointIDCounter); + checkpointIDCounter.start(); this.timer = new Timer("Checkpoint Timer", true); - + if (recoveryMode == RecoveryMode.STANDALONE) { // Add shutdown hook to clean up state handles when no checkpoint recovery is // possible. In case of another configured recovery mode, the checkpoints need to be @@ -158,7 +210,7 @@ public class CheckpointCoordinator { CheckpointCoordinator.this.shutdown(); } catch (Throwable t) { - LOG.error("Error during shutdown of checkpoint coordniator via " + + LOG.error("Error during shutdown of checkpoint coordinator via " + "JVM shutdown hook: " + t.getMessage(), t); } } @@ -197,7 +249,10 @@ public class CheckpointCoordinator { shutdown = true; LOG.info("Stopping checkpoint coordinator for job " + job); - // shut down the thread that handles the timeouts + periodicScheduling = false; + triggerRequestQueued = false; + + // shut down the thread that handles the timeouts and pending triggers timer.cancel(); // make sure that the actor does not linger @@ -206,17 +261,11 @@ public class CheckpointCoordinator { jobStatusListener = null; } - // the scheduling thread needs also to go away - if (periodicScheduler != null) { - periodicScheduler.cancel(); - periodicScheduler = null; - } - checkpointIdCounter.stop(); // clear and discard all pending checkpoints for (PendingCheckpoint pending : pendingCheckpoints.values()) { - pending.discard(userClassLoader, true); + pending.discard(userClassLoader); } pendingCheckpoints.clear(); @@ -235,7 +284,7 @@ public class CheckpointCoordinator { // race, JVM is in shutdown already, we can safely ignore this } catch (Throwable t) { - LOG.warn("Error unregistering checkpoint cooordniator shutdown hook.", t); + LOG.warn("Error unregistering checkpoint coordinator shutdown hook.", t); } } } @@ -251,93 +300,136 @@ public class CheckpointCoordinator { // -------------------------------------------------------------------------------------------- /** - * 
Triggers a new checkpoint and uses the current system time as the - * checkpoint time. - */ - public void triggerCheckpoint() throws Exception { - triggerCheckpoint(System.currentTimeMillis()); - } - - /** * Triggers a new checkpoint and uses the given timestamp as the checkpoint * timestamp. * * @param timestamp The timestamp for the checkpoint. */ - public boolean triggerCheckpoint(final long timestamp) throws Exception { - if (shutdown) { - LOG.error("Cannot trigger checkpoint, checkpoint coordinator has been shutdown."); - return false; - } - - final long checkpointID = checkpointIdCounter.getAndIncrement(); - LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp); - - try { - // first check if all tasks that we need to trigger are running. - // if not, abort the checkpoint - ExecutionAttemptID[] triggerIDs = new ExecutionAttemptID[tasksToTrigger.length]; - for (int i = 0; i < tasksToTrigger.length; i++) { - Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt(); - if (ee != null && ee.getState() == ExecutionState.RUNNING) { - triggerIDs[i] = ee.getAttemptId(); - } else { - LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.", - tasksToTrigger[i].getSimpleName()); - return false; - } + public boolean triggerCheckpoint(long timestamp) throws Exception { + // make some eager pre-checks + synchronized (lock) { + // abort if the coordinator has been shutdown in the meantime + if (shutdown) { + return false; + } + + // sanity check: there should never be more than one trigger request queued + if (triggerRequestQueued) { + LOG.warn("Trying to trigger another checkpoint while one was queued already"); + return false; } - // next, check if all tasks that need to acknowledge the checkpoint are running. 
- // if not, abort the checkpoint - Map<ExecutionAttemptID, ExecutionVertex> ackTasks = - new HashMap<ExecutionAttemptID, ExecutionVertex>(tasksToWaitFor.length); - - for (ExecutionVertex ev : tasksToWaitFor) { - Execution ee = ev.getCurrentExecutionAttempt(); - if (ee != null) { - ackTasks.put(ee.getAttemptId(), ev); - } else { - LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.", - ev.getSimpleName()); - return false; + // if too many checkpoints are currently in progress, we need to mark that a request is queued + if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) { + triggerRequestQueued = true; + if (currentPeriodicTrigger != null) { + currentPeriodicTrigger.cancel(); + currentPeriodicTrigger = null; } + return false; } - - // register a new pending checkpoint. this makes sure we can properly receive acknowledgements - final PendingCheckpoint checkpoint = new PendingCheckpoint(job, checkpointID, timestamp, ackTasks); + } - // schedule the timer that will clean up the expired checkpoints - TimerTask canceller = new TimerTask() { - @Override - public void run() { - try { - synchronized (lock) { - // only do the work if the checkpoint is not discarded anyways - // note that checkpoint completion discards the pending checkpoint object - if (!checkpoint.isDiscarded()) { - LOG.info("Checkpoint " + checkpointID + " expired before completing."); - - checkpoint.discard(userClassLoader, true); - - pendingCheckpoints.remove(checkpointID); - rememberRecentCheckpointId(checkpointID); - } + // first check if all tasks that we need to trigger are running. 
+ // if not, abort the checkpoint + ExecutionAttemptID[] triggerIDs = new ExecutionAttemptID[tasksToTrigger.length]; + for (int i = 0; i < tasksToTrigger.length; i++) { + Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt(); + if (ee != null && ee.getState() == ExecutionState.RUNNING) { + triggerIDs[i] = ee.getAttemptId(); + } else { + LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.", + tasksToTrigger[i].getSimpleName()); + return false; + } + } + + // next, check if all tasks that need to acknowledge the checkpoint are running. + // if not, abort the checkpoint + Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length); + + for (ExecutionVertex ev : tasksToWaitFor) { + Execution ee = ev.getCurrentExecutionAttempt(); + if (ee != null) { + ackTasks.put(ee.getAttemptId(), ev); + } else { + LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.", + ev.getSimpleName()); + return false; + } + } + + // we will actually trigger this checkpoint! + + final long checkpointID; + try { + // this must happen outside the locked scope, because it communicates + // with external services (in HA mode) and may block for a while. 
+ checkpointID = checkpointIdCounter.getAndIncrement(); + } + catch (Throwable t) { + int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers; + LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t); + return false; + } + + LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp); + + final PendingCheckpoint checkpoint = new PendingCheckpoint(job, checkpointID, timestamp, ackTasks); + + // schedule the timer that will clean up the expired checkpoints + TimerTask canceller = new TimerTask() { + @Override + public void run() { + try { + synchronized (lock) { + // only do the work if the checkpoint is not discarded anyways + // note that checkpoint completion discards the pending checkpoint object + if (!checkpoint.isDiscarded()) { + LOG.info("Checkpoint " + checkpointID + " expired before completing."); + + checkpoint.discard(userClassLoader); + pendingCheckpoints.remove(checkpointID); + rememberRecentCheckpointId(checkpointID); + + triggerQueuedRequests(); } } - catch (Throwable t) { - LOG.error("Exception while handling checkpoint timeout", t); - } } - }; - + catch (Throwable t) { + LOG.error("Exception while handling checkpoint timeout", t); + } + } + }; + + try { + // re-acquire the lock synchronized (lock) { + // since we released the lock in the meantime, we need to re-check + // that the conditions still hold. 
this is clumsy, but it allows us to + // release the lock in the meantime while calls to external services are + // blocking progress, and still gives us early checks that skip work + // if no checkpoint can happen anyways if (shutdown) { - throw new IllegalStateException("Checkpoint coordinator has been shutdown."); + return false; + } + else if (triggerRequestQueued) { + LOG.warn("Trying to trigger another checkpoint while one was queued already"); + return false; + } + else if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) { + triggerRequestQueued = true; + if (currentPeriodicTrigger != null) { + currentPeriodicTrigger.cancel(); + currentPeriodicTrigger = null; + } + return false; } + pendingCheckpoints.put(checkpointID, checkpoint); timer.schedule(canceller, checkpointTimeout); } + // end of lock scope // send the messages to the tasks that trigger their checkpoint for (int i = 0; i < tasksToTrigger.length; i++) { @@ -345,21 +437,21 @@ public class CheckpointCoordinator { TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp); tasksToTrigger[i].sendMessageToCurrentExecution(message, id); } - - numUnsuccessfulCheckpointsTriggers.set(0); + + numUnsuccessfulCheckpointsTriggers = 0; return true; } catch (Throwable t) { - int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet(); - LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t); - + // guard the map against concurrent modifications synchronized (lock) { - PendingCheckpoint checkpoint = pendingCheckpoints.remove(checkpointID); - if (checkpoint != null && !checkpoint.isDiscarded()) { - checkpoint.discard(userClassLoader, true); - } + pendingCheckpoints.remove(checkpointID); } + int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers; + LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t); + if (!checkpoint.isDiscarded()) { + 
checkpoint.discard(userClassLoader); + } return false; } } @@ -401,6 +493,8 @@ public class CheckpointCoordinator { rememberRecentCheckpointId(checkpointId); dropSubsumedCheckpoints(completed.getTimestamp()); + + triggerQueuedRequests(); } } else { @@ -455,13 +549,38 @@ public class CheckpointCoordinator { if (p.getCheckpointTimestamp() < timestamp) { rememberRecentCheckpointId(p.getCheckpointId()); - p.discard(userClassLoader, true); + p.discard(userClassLoader); entries.remove(); } } } + /** + * Triggers the queued request, if there is one. + * + * <p>NOTE: The caller of this method must hold the lock when invoking the method! + */ + private void triggerQueuedRequests() throws Exception { + if (triggerRequestQueued) { + triggerRequestQueued = false; + + // trigger the checkpoint from the trigger timer, to finish the work of this thread before + // starting with the next checkpoint + ScheduledTrigger trigger = new ScheduledTrigger(); + if (periodicScheduling) { + if (currentPeriodicTrigger != null) { + currentPeriodicTrigger.cancel(); + } + currentPeriodicTrigger = trigger; + timer.scheduleAtFixedRate(trigger, 0L, baseInterval); + } + else { + timer.schedule(trigger, 0L); + } + } + } + // -------------------------------------------------------------------------------------------- // Checkpoint State Restoring // -------------------------------------------------------------------------------------------- @@ -557,63 +676,74 @@ public class CheckpointCoordinator { // Periodic scheduling of checkpoints // -------------------------------------------------------------------------------------------- - public void startPeriodicCheckpointScheduler(long interval) { + public void startCheckpointScheduler() { synchronized (lock) { if (shutdown) { throw new IllegalArgumentException("Checkpoint coordinator is shut down"); } - // cancel any previous scheduler - stopPeriodicCheckpointScheduler(); + // make sure all prior timers are cancelled + stopCheckpointScheduler(); - // 
start a new scheduler - periodicScheduler = new TimerTask() { - @Override - public void run() { - try { - triggerCheckpoint(); - } - catch (Exception e) { - LOG.error("Exception while triggering checkpoint", e); - } - } - }; - timer.scheduleAtFixedRate(periodicScheduler, interval, interval); + periodicScheduling = true; + currentPeriodicTrigger = new ScheduledTrigger(); + timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval); } } - public void stopPeriodicCheckpointScheduler() { + public void stopCheckpointScheduler() { synchronized (lock) { - if (periodicScheduler != null) { - periodicScheduler.cancel(); - periodicScheduler = null; + triggerRequestQueued = false; + periodicScheduling = false; + + if (currentPeriodicTrigger != null) { + currentPeriodicTrigger.cancel(); + currentPeriodicTrigger = null; + } + + for (PendingCheckpoint p : pendingCheckpoints.values()) { + p.discard(userClassLoader); } + pendingCheckpoints.clear(); + + numUnsuccessfulCheckpointsTriggers = 0; } } - public ActorGateway createJobStatusListener( - ActorSystem actorSystem, - long checkpointInterval, - UUID leaderSessionID) { + // ------------------------------------------------------------------------ + // job status listener that schedules / cancels periodic checkpoints + // ------------------------------------------------------------------------ + + public ActorGateway createActivatorDeactivator(ActorSystem actorSystem, UUID leaderSessionID) { + synchronized (lock) { if (shutdown) { throw new IllegalArgumentException("Checkpoint coordinator is shut down"); } if (jobStatusListener == null) { - Props props = Props.create( - CheckpointCoordinatorDeActivator.class, - this, - checkpointInterval, - leaderSessionID); + Props props = Props.create(CheckpointCoordinatorDeActivator.class, this, leaderSessionID); // wrap the ActorRef in a AkkaActorGateway to support message decoration - jobStatusListener = new AkkaActorGateway( - actorSystem.actorOf(props), - leaderSessionID); 
+ jobStatusListener = new AkkaActorGateway(actorSystem.actorOf(props), leaderSessionID); } return jobStatusListener; } } + + // ------------------------------------------------------------------------ + + private class ScheduledTrigger extends TimerTask { + + @Override + public void run() { + try { + triggerCheckpoint(System.currentTimeMillis()); + } + catch (Exception e) { + LOG.error("Exception while triggering checkpoint", e); + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java index 7e32b72..8bdab7f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java @@ -32,19 +32,15 @@ import java.util.UUID; public class CheckpointCoordinatorDeActivator extends FlinkUntypedActor { private final CheckpointCoordinator coordinator; - private final long interval; private final UUID leaderSessionID; public CheckpointCoordinatorDeActivator( CheckpointCoordinator coordinator, - long interval, UUID leaderSessionID) { LOG.info("Create CheckpointCoordinatorDeActivator"); this.coordinator = Preconditions.checkNotNull(coordinator, "The checkpointCoordinator must not be null."); - - this.interval = interval; this.leaderSessionID = leaderSessionID; } @@ -55,11 +51,10 @@ public class CheckpointCoordinatorDeActivator extends FlinkUntypedActor { if (status == JobStatus.RUNNING) { // start the checkpoint scheduler - coordinator.startPeriodicCheckpointScheduler(interval); - } - else { + 
coordinator.startCheckpointScheduler(); + } else { // anything else should stop the trigger for now - coordinator.stopPeriodicCheckpointScheduler(); + coordinator.stopCheckpointScheduler(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 19c65d4..b94e5bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -117,7 +117,7 @@ public class PendingCheckpoint { if (notYetAcknowledgedTasks.isEmpty()) { CompletedCheckpoint completed = new CompletedCheckpoint(jobId, checkpointId, checkpointTimestamp, new ArrayList<StateForTask>(collectedStates)); - discard(null, false); + dispose(null, false); return completed; } @@ -150,11 +150,15 @@ public class PendingCheckpoint { /** * Discards the pending checkpoint, releasing all held resources. 
*/ - public void discard(ClassLoader userClassLoader, boolean discardStateHandle) { + public void discard(ClassLoader userClassLoader) { + dispose(userClassLoader, true); + } + + private void dispose(ClassLoader userClassLoader, boolean releaseState) { synchronized (lock) { discarded = true; numAcknowledgedTasks = -1; - if (discardStateHandle) { + if (releaseState) { for (StateForTask state : collectedStates) { state.discard(userClassLoader); } http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index d10aac1..9218fe4 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -182,9 +182,6 @@ public class ExecutionGraph implements Serializable { * from results than need to be materialized. 
*/ private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES; - /** Flag that indicate whether the executed dataflow should be periodically snapshotted */ - private boolean snapshotCheckpointsEnabled; - /** Flag to indicate whether the Graph has been archived */ private boolean isArchived = false; @@ -341,9 +338,12 @@ public class ExecutionGraph implements Serializable { public boolean isArchived() { return isArchived; } + public void enableSnapshotCheckpointing( long interval, long checkpointTimeout, + long minPauseBetweenCheckpoints, + int maxConcurrentCheckpoints, List<ExecutionJobVertex> verticesToTrigger, List<ExecutionJobVertex> verticesToWaitFor, List<ExecutionJobVertex> verticesToCommitTo, @@ -368,11 +368,13 @@ public class ExecutionGraph implements Serializable { // disable to make sure existing checkpoint coordinators are cleared disableSnaphotCheckpointing(); - // create the coordinator that triggers and commits checkpoints and holds the state - snapshotCheckpointsEnabled = true; + // create the coordinator that triggers and commits checkpoints and holds the state checkpointCoordinator = new CheckpointCoordinator( jobID, + interval, checkpointTimeout, + minPauseBetweenCheckpoints, + maxConcurrentCheckpoints, tasksToTrigger, tasksToWaitFor, tasksToCommitTo, @@ -384,10 +386,7 @@ public class ExecutionGraph implements Serializable { // the periodic checkpoint scheduler is activated and deactivated as a result of // job status changes (running -> on, all other states -> off) registerJobStatusListener( - checkpointCoordinator.createJobStatusListener( - actorSystem, - interval, - leaderSessionID)); + checkpointCoordinator.createActivatorDeactivator(actorSystem, leaderSessionID)); } /** @@ -401,16 +400,11 @@ public class ExecutionGraph implements Serializable { throw new IllegalStateException("Job must be in CREATED state"); } - snapshotCheckpointsEnabled = false; if (checkpointCoordinator != null) { checkpointCoordinator.shutdown(); checkpointCoordinator = 
null; } } - - public boolean isSnapshotCheckpointsEnabled() { - return snapshotCheckpointsEnabled; - } public CheckpointCoordinator getCheckpointCoordinator() { return checkpointCoordinator; http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java index 86c9b60..d58be52 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java @@ -22,17 +22,17 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import java.util.List; +import static java.util.Objects.requireNonNull; + /** - * The JobSnapshottingSettings are attached to a JobGraph and describe the settings - * for the asynchronous snapshotting of the JobGraph, such as interval, and which vertices + * The JobCheckpointingSettings are attached to a JobGraph and describe the settings + * for the asynchronous checkpoints of the JobGraph, such as interval, and which vertices * need to participate. 
*/ public class JobSnapshottingSettings implements java.io.Serializable{ private static final long serialVersionUID = -2593319571078198180L; - /** The default time in which pending checkpoints need to be acknowledged before timing out */ - public static final long DEFAULT_SNAPSHOT_TIMEOUT = 10 * 60 * 1000; // 10 minutes private final List<JobVertexID> verticesToTrigger; @@ -43,26 +43,32 @@ public class JobSnapshottingSettings implements java.io.Serializable{ private final long checkpointInterval; private final long checkpointTimeout; - - - public JobSnapshottingSettings(List<JobVertexID> verticesToTrigger, - List<JobVertexID> verticesToAcknowledge, - List<JobVertexID> verticesToConfirm, - long checkpointInterval) - { - this(verticesToTrigger, verticesToAcknowledge, verticesToConfirm, checkpointInterval, DEFAULT_SNAPSHOT_TIMEOUT); - } + + private final long minPauseBetweenCheckpoints; + + private final int maxConcurrentCheckpoints; + public JobSnapshottingSettings(List<JobVertexID> verticesToTrigger, List<JobVertexID> verticesToAcknowledge, List<JobVertexID> verticesToConfirm, - long checkpointInterval, long checkpointTimeout) + long checkpointInterval, long checkpointTimeout, + long minPauseBetweenCheckpoints, int maxConcurrentCheckpoints) { - this.verticesToTrigger = verticesToTrigger; - this.verticesToAcknowledge = verticesToAcknowledge; - this.verticesToConfirm = verticesToConfirm; + // sanity checks + if (checkpointInterval < 1 || checkpointTimeout < 1 || + minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1) + { + throw new IllegalArgumentException(); + } + + this.verticesToTrigger = requireNonNull(verticesToTrigger); + this.verticesToAcknowledge = requireNonNull(verticesToAcknowledge); + this.verticesToConfirm = requireNonNull(verticesToConfirm); this.checkpointInterval = checkpointInterval; this.checkpointTimeout = checkpointTimeout; + this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints; + this.maxConcurrentCheckpoints = 
maxConcurrentCheckpoints; } // -------------------------------------------------------------------------------------------- @@ -87,11 +93,22 @@ public class JobSnapshottingSettings implements java.io.Serializable{ return checkpointTimeout; } + public long getMinPauseBetweenCheckpoints() { + return minPauseBetweenCheckpoints; + } + + public int getMaxConcurrentCheckpoints() { + return maxConcurrentCheckpoints; + } + // -------------------------------------------------------------------------------------------- @Override public String toString() { - return String.format("SnapshotSettings: interval=%d, timeout=%d, trigger=%s, ack=%s, commit=%s", - checkpointInterval, checkpointTimeout, verticesToTrigger, verticesToAcknowledge, verticesToConfirm); + return String.format("SnapshotSettings: interval=%d, timeout=%d, pause-between=%d, " + + "maxConcurrent=%d, trigger=%s, ack=%s, commit=%s", + checkpointInterval, checkpointTimeout, + minPauseBetweenCheckpoints, maxConcurrentCheckpoints, + verticesToTrigger, verticesToAcknowledge, verticesToConfirm); } } http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index c4d0fbb..8cbb13a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -913,6 +913,8 @@ class JobManager( executionGraph.enableSnapshotCheckpointing( snapshotSettings.getCheckpointInterval, snapshotSettings.getCheckpointTimeout, + snapshotSettings.getMinPauseBetweenCheckpoints, + snapshotSettings.getMaxConcurrentCheckpoints, triggerVertices, ackVertices, confirmVertices,
http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index f6ee5c5..cd52fd4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -28,10 +28,17 @@ import org.apache.flink.runtime.jobmanager.RecoveryMode; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete; import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint; + import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.Serializable; import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -39,6 +46,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -70,7 +78,7 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, 600000, + jid, 600000, 600000, new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, new ExecutionVertex[] {
ackVertex1, ackVertex2 }, new ExecutionVertex[] {}, cl, @@ -116,7 +124,7 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, 600000, + jid, 600000, 600000, new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, new ExecutionVertex[] { ackVertex1, ackVertex2 }, new ExecutionVertex[] {}, cl, @@ -160,7 +168,7 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, 600000, + jid, 600000, 600000, new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, new ExecutionVertex[] { ackVertex1, ackVertex2 }, new ExecutionVertex[] {}, cl, new StandaloneCheckpointIDCounter(), new @@ -199,7 +207,7 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, 600000, + jid, 600000, 600000, new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, cl, @@ -343,7 +351,7 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, 600000, + jid, 600000, 600000, new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 }, new ExecutionVertex[] { commitVertex }, cl, @@ -472,7 +480,7 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, 600000, + jid, 600000, 600000, new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 }, new ExecutionVertex[] { commitVertex }, cl, @@ -587,7 +595,7 @@ public class CheckpointCoordinatorTest { // the timeout for the checkpoint is
a 200 milliseconds CheckpointCoordinator coord = new CheckpointCoordinator( - jid, 200, + jid, 600000, 200, new ExecutionVertex[] { triggerVertex }, new ExecutionVertex[] { ackVertex1, ackVertex2 }, new ExecutionVertex[] { commitVertex }, cl, @@ -649,7 +657,7 @@ public class CheckpointCoordinatorTest { ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID); CheckpointCoordinator coord = new CheckpointCoordinator( - jid, 200000, + jid, 200000, 200000, new ExecutionVertex[] { triggerVertex }, new ExecutionVertex[] { ackVertex1, ackVertex2 }, new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter @@ -680,14 +688,343 @@ public class CheckpointCoordinatorTest { } } + @Test + public void testPeriodicTriggering() { + try { + final JobID jid = new JobID(); + final long start = System.currentTimeMillis(); + + // create some mock execution vertices and trigger some checkpoint + + final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID(); + final ExecutionAttemptID ackAttemptID = new ExecutionAttemptID(); + final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID(); + + ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptID); + ExecutionVertex ackVertex = mockExecutionVertex(ackAttemptID); + ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID); + + final AtomicInteger numCalls = new AtomicInteger(); + + doAnswer(new Answer<Void>() { + + private long lastId = -1; + private long lastTs = -1; + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + TriggerCheckpoint message = (TriggerCheckpoint) invocation.getArguments()[0]; + long id = message.getCheckpointId(); + long ts = message.getTimestamp(); + + assertTrue(id > lastId); + assertTrue(ts >= lastTs); + assertTrue(ts >= start); + + lastId = id; + lastTs = ts; + numCalls.incrementAndGet(); + return null; + } + }).when(triggerVertex).sendMessageToCurrentExecution(any(Serializable.class),
any(ExecutionAttemptID.class)); + + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + 10, // periodic interval is 10 ms + 200000, // timeout is very long (200 s) + new ExecutionVertex[] { triggerVertex }, + new ExecutionVertex[] { ackVertex }, + new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter + (), new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE); + + + coord.startCheckpointScheduler(); + + long timeout = System.currentTimeMillis() + 60000; + do { + Thread.sleep(20); + } + while (timeout > System.currentTimeMillis() && numCalls.get() < 5); + assertTrue(numCalls.get() >= 5); + + coord.stopCheckpointScheduler(); + + + // for 400 ms, no further calls may come. + // there may be the case that one trigger was fired and about to + // acquire the lock, such that after cancelling it will still do + // the remainder of its work + int numCallsSoFar = numCalls.get(); + Thread.sleep(400); + assertTrue(numCallsSoFar == numCalls.get() || + numCallsSoFar+1 == numCalls.get()); + + // start another sequence of periodic scheduling + numCalls.set(0); + coord.startCheckpointScheduler(); + + timeout = System.currentTimeMillis() + 60000; + do { + Thread.sleep(20); + } + while (timeout > System.currentTimeMillis() && numCalls.get() < 5); + assertTrue(numCalls.get() >= 5); + + coord.stopCheckpointScheduler(); + + // for 400 ms, no further calls may come + // there may be the case that one trigger was fired and about to + // acquire the lock, such that after cancelling it will still do + // the remainder of its work + numCallsSoFar = numCalls.get(); + Thread.sleep(400); + assertTrue(numCallsSoFar == numCalls.get() || + numCallsSoFar + 1 == numCalls.get()); + + coord.shutdown(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testMaxConcurrentAttempts1() { + testMaxConcurrentAttemps(1); + } + + @Test + public void testMaxConcurrentAttempts2() { +
testMaxConcurrentAttemps(2); + } + + @Test + public void testMaxConcurrentAttempts5() { + testMaxConcurrentAttemps(5); + } + + private void testMaxConcurrentAttemps(int maxConcurrentAttempts) { + try { + final JobID jid = new JobID(); + + // create some mock execution vertices and trigger some checkpoint + final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID(); + final ExecutionAttemptID ackAttemptID = new ExecutionAttemptID(); + final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID(); + + ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptID); + ExecutionVertex ackVertex = mockExecutionVertex(ackAttemptID); + ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID); + + final AtomicInteger numCalls = new AtomicInteger(); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + numCalls.incrementAndGet(); + return null; + } + }).when(triggerVertex).sendMessageToCurrentExecution(any(Serializable.class), any(ExecutionAttemptID.class)); + + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + 10, // periodic interval is 10 ms + 200000, // timeout is very long (200 s) + 0L, // no extra delay + maxConcurrentAttempts, + new ExecutionVertex[] { triggerVertex }, + new ExecutionVertex[] { ackVertex }, + new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter + (), new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE); + + + coord.startCheckpointScheduler(); + + // after a while, there should be exactly as many checkpoints + // as concurrently permitted + long now = System.currentTimeMillis(); + long timeout = now + 60000; + long minDuration = now + 100; + do { + Thread.sleep(20); + } + while ((now = System.currentTimeMillis()) < minDuration || + (numCalls.get() < maxConcurrentAttempts && now < timeout)); + + assertEquals(maxConcurrentAttempts, numCalls.get()); + + verify(triggerVertex,
times(maxConcurrentAttempts)) + .sendMessageToCurrentExecution(any(TriggerCheckpoint.class), eq(triggerAttemptID)); + + // now, once we acknowledge one checkpoint, it should trigger the next one + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 1L)); + + // this should have immediately triggered a new checkpoint + now = System.currentTimeMillis(); + timeout = now + 60000; + do { + Thread.sleep(20); + } + while (numCalls.get() < maxConcurrentAttempts + 1 && System.currentTimeMillis() < timeout); + + assertEquals(maxConcurrentAttempts + 1, numCalls.get()); + + // no further checkpoints should happen + Thread.sleep(200); + assertEquals(maxConcurrentAttempts + 1, numCalls.get()); + + coord.shutdown(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testMaxConcurrentAttempsWithSubsumption() { + try { + final int maxConcurrentAttempts = 2; + final JobID jid = new JobID(); + + // create some mock execution vertices and trigger some checkpoint + final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID(); + final ExecutionAttemptID ackAttemptID = new ExecutionAttemptID(); + final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID(); + + ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptID); + ExecutionVertex ackVertex = mockExecutionVertex(ackAttemptID); + ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID); + + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + 10, // periodic interval is 10 ms + 200000, // timeout is very long (200 s) + 0L, // no extra delay + maxConcurrentAttempts, // max two concurrent checkpoints + new ExecutionVertex[] { triggerVertex }, + new ExecutionVertex[] { ackVertex }, + new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter + (), new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE); + + + coord.startCheckpointScheduler(); + + // after a while, there should be exactly as
many checkpoints + // as concurrently permitted + long now = System.currentTimeMillis(); + long timeout = now + 60000; + long minDuration = now + 100; + do { + Thread.sleep(20); + } + while ((now = System.currentTimeMillis()) < minDuration || + (coord.getNumberOfPendingCheckpoints() < maxConcurrentAttempts && now < timeout)); + + // validate that the pending checkpoints are there + assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints()); + assertNotNull(coord.getPendingCheckpoints().get(1L)); + assertNotNull(coord.getPendingCheckpoints().get(2L)); + + // now we acknowledge the second checkpoint, which should subsume the first checkpoint + // and allow two more checkpoints to be triggered + // (this frees both concurrency slots, so checkpoints 3 and 4 should follow) + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 2L)); + + // after a while, there should be the new checkpoints + final long newTimeout = System.currentTimeMillis() + 60000; + do { + Thread.sleep(20); + } + while (coord.getPendingCheckpoints().get(4L) == null && + System.currentTimeMillis() < newTimeout); + + // do the final check + assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints()); + assertNotNull(coord.getPendingCheckpoints().get(3L)); + assertNotNull(coord.getPendingCheckpoints().get(4L)); + + coord.shutdown(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testPeriodicSchedulingWithInactiveTasks() { + try { + final JobID jid = new JobID(); + + // create some mock execution vertices and trigger some checkpoint + final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID(); + final ExecutionAttemptID ackAttemptID = new ExecutionAttemptID(); + final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID(); + + ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptID); + ExecutionVertex ackVertex = mockExecutionVertex(ackAttemptID); +
ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID); + + final AtomicReference<ExecutionState> currentState = new AtomicReference<>(ExecutionState.CREATED); + when(triggerVertex.getCurrentExecutionAttempt().getState()).thenAnswer( + new Answer<ExecutionState>() { + @Override + public ExecutionState answer(InvocationOnMock invocation){ + return currentState.get(); + } + }); + + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + 10, // periodic interval is 10 ms + 200000, // timeout is very long (200 s) + 0L, // no extra delay + 2, // max two concurrent checkpoints + new ExecutionVertex[] { triggerVertex }, + new ExecutionVertex[] { ackVertex }, + new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE); + + coord.startCheckpointScheduler(); + + // no checkpoint should have started so far + Thread.sleep(200); + assertEquals(0, coord.getNumberOfPendingCheckpoints()); + + // now move the state to RUNNING + currentState.set(ExecutionState.RUNNING); + + // the coordinator should start checkpointing now + final long timeout = System.currentTimeMillis() + 10000; + do { + Thread.sleep(20); + } + while (System.currentTimeMillis() < timeout && + coord.getNumberOfPendingCheckpoints() == 0); + assertTrue(coord.getNumberOfPendingCheckpoints() > 0); + coord.shutdown(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) { return mockExecutionVertex(attemptID, ExecutionState.RUNNING); } - private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID, ExecutionState state) { + private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID, + ExecutionState
state, ExecutionState ... successiveStates) { final Execution exec = mock(Execution.class); when(exec.getAttemptId()).thenReturn(attemptID); - when(exec.getState()).thenReturn(state); + when(exec.getState()).thenReturn(state, successiveStates); ExecutionVertex vertex = mock(ExecutionVertex.class); when(vertex.getJobvertexId()).thenReturn(new JobVertexID()); http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java index 7b2c2d4..bec04bd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java @@ -80,7 +80,7 @@ public class CheckpointStateRestoreTest { map.put(statelessId, stateless); - CheckpointCoordinator coord = new CheckpointCoordinator(jid, 200000L, + CheckpointCoordinator coord = new CheckpointCoordinator(jid, 200000L, 200000L, new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, new ExecutionVertex[0], cl, @@ -151,7 +151,7 @@ public class CheckpointStateRestoreTest { map.put(statelessId, stateless); - CheckpointCoordinator coord = new CheckpointCoordinator(jid, 200000L, + CheckpointCoordinator coord = new CheckpointCoordinator(jid, 200000L, 200000L, new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, new ExecutionVertex[0], cl, @@ -193,7 +193,7 @@ public class CheckpointStateRestoreTest { @Test public void
testNoCheckpointAvailable() { try { - CheckpointCoordinator coord = new CheckpointCoordinator(new JobID(), 200000L, + CheckpointCoordinator coord = new CheckpointCoordinator(new JobID(), 200000L, 200000L, new ExecutionVertex[] { mock(ExecutionVertex.class) }, new ExecutionVertex[] { mock(ExecutionVertex.class) }, new ExecutionVertex[0], cl, http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java index f6e4ab8..1c666e5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java @@ -61,7 +61,8 @@ public class CoordinatorShutdownTest { List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID()); JobGraph testGraph = new JobGraph("test job", vertex); - testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000)); + testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, + 5000, 60000, 0L, Integer.MAX_VALUE)); ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION()); @@ -112,7 +113,8 @@ public class CoordinatorShutdownTest { List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID()); JobGraph testGraph = new JobGraph("test job", vertex); - testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000)); + testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, + 5000, 60000, 0L, Integer.MAX_VALUE)); ActorGateway jmGateway =
cluster.getLeaderGateway(TestingUtils.TESTING_DURATION()); http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java new file mode 100644 index 0000000..0320d6b --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.environment; + +import org.apache.flink.streaming.api.CheckpointingMode; + +import static java.util.Objects.requireNonNull; + +/** + * Configuration that captures all checkpointing related settings.
+ */ +public class CheckpointConfig implements java.io.Serializable { + + private static final long serialVersionUID = -750378776078908147L; + + /** The default checkpoint mode: exactly once */ + public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE; + + /** The default timeout of a checkpoint attempt: 10 minutes */ + public static final long DEFAULT_TIMEOUT = 10 * 60 * 1000; + + /** The default minimum pause to be made between checkpoints: none */ + public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS = 0; + + /** The default limit of concurrently happening checkpoints: one */ + public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1; + + // ------------------------------------------------------------------------ + + /** Checkpointing mode (exactly-once vs. at-least-once). */ + private CheckpointingMode checkpointingMode = DEFAULT_MODE; + + /** Periodic checkpoint triggering interval */ + private long checkpointInterval = -1; // disabled + + /** Maximum time checkpoint may take before being discarded */ + private long checkpointTimeout = DEFAULT_TIMEOUT; + + /** Minimal pause between checkpointing attempts */ + private long minPauseBetweenCheckpoints = DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS; + + /** Maximum number of checkpoint attempts in progress at the same time */ + private int maxConcurrentCheckpoints = DEFAULT_MAX_CONCURRENT_CHECKPOINTS; + + /** Flag to force checkpointing in iterative jobs */ + private boolean forceCheckpointing; + + // ------------------------------------------------------------------------ + + /** + * Checks whether checkpointing is enabled. + * + * @return True if checkpointing is enabled, false otherwise. + */ + public boolean isCheckpointingEnabled() { + return checkpointInterval > 0; + } + + /** + * Gets the checkpointing mode (exactly-once vs. at-least-once). + * + * @return The checkpointing mode.
+ */ + public CheckpointingMode getCheckpointingMode() { + return checkpointingMode; + } + + /** + * Sets the checkpointing mode (exactly-once vs. at-least-once). + * + * @param checkpointingMode The checkpointing mode. + */ + public void setCheckpointingMode(CheckpointingMode checkpointingMode) { + this.checkpointingMode = requireNonNull(checkpointingMode); + } + + /** + * Gets the interval in which checkpoints are periodically scheduled. + * + * <p>This setting defines the base interval. Checkpoint triggering may be delayed by the settings + * {@link #getMaxConcurrentCheckpoints()} and {@link #getMinPauseBetweenCheckpoints()}. + * + * @return The checkpoint interval, in milliseconds. + */ + public long getCheckpointInterval() { + return checkpointInterval; + } + + /** + * Sets the interval in which checkpoints are periodically scheduled. + * + * <p>This setting defines the base interval. Checkpoint triggering may be delayed by the settings + * {@link #setMaxConcurrentCheckpoints(int)} and {@link #getMinPauseBetweenCheckpoints()}. + * + * @param checkpointInterval The checkpoint interval, in milliseconds. + */ + public void setCheckpointInterval(long checkpointInterval) { + if (checkpointInterval <= 0) { + throw new IllegalArgumentException("Checkpoint interval must be larger than zero"); + } + this.checkpointInterval = checkpointInterval; + } + + /** + * Gets the maximum time that a checkpoint may take before being discarded. + * + * @return The checkpoint timeout, in milliseconds. + */ + public long getCheckpointTimeout() { + return checkpointTimeout; + } + + /** + * Sets the maximum time that a checkpoint may take before being discarded. + * + * @param checkpointTimeout The checkpoint timeout, in milliseconds.
+ */ + public void setCheckpointTimeout(long checkpointTimeout) { + if (checkpointTimeout <= 0) { + throw new IllegalArgumentException("Checkpoint timeout must be larger than zero"); + } + this.checkpointTimeout = checkpointTimeout; + } + + /** + * Gets the minimal pause between checkpointing attempts. This setting defines how soon the + * checkpoint coordinator may trigger another checkpoint after it becomes possible to trigger + * another checkpoint with respect to the maximum number of concurrent checkpoints + * (see {@link #getMaxConcurrentCheckpoints()}). + * + * @return The minimal pause before the next checkpoint is triggered. + */ + public long getMinPauseBetweenCheckpoints() { + return minPauseBetweenCheckpoints; + } + +// /** +// * Sets the minimal pause between checkpointing attempts. This setting defines how soon the +// * checkpoint coordinator may trigger another checkpoint after it becomes possible to trigger +// * another checkpoint with respect to the maximum number of concurrent checkpoints +// * (see {@link #setMaxConcurrentCheckpoints(int)}). +// * +// * <p>If the maximum number of concurrent checkpoints is set to one, this setting makes effectively sure +// * that a minimum amount of time passes where no checkpoint is in progress at all. +// * +// * @param minPauseBetweenCheckpoints The minimal pause before the next checkpoint is triggered. +// */ +// public void setMinPauseBetweenCheckpoints(long minPauseBetweenCheckpoints) { +// if (minPauseBetweenCheckpoints < 0) { +// throw new IllegalArgumentException("Pause value must be zero or positive"); +// } +// this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints; +// } + + /** + * Gets the maximum number of checkpoint attempts that may be in progress at the same time. If this + * value is <i>n</i>, then no checkpoints will be triggered while <i>n</i> checkpoint attempts are + * currently in flight.
For the next checkpoint to be triggered, one checkpoint attempt would need + * to finish or expire. + * + * @return The maximum number of concurrent checkpoint attempts. + */ + public int getMaxConcurrentCheckpoints() { + return maxConcurrentCheckpoints; + } + + /** + * Sets the maximum number of checkpoint attempts that may be in progress at the same time. If this + * value is <i>n</i>, then no checkpoints will be triggered while <i>n</i> checkpoint attempts are + * currently in flight. For the next checkpoint to be triggered, one checkpoint attempt would need + * to finish or expire. + * + * @param maxConcurrentCheckpoints The maximum number of concurrent checkpoint attempts. + */ + public void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints) { + if (maxConcurrentCheckpoints < 1) { + throw new IllegalArgumentException("The maximum number of concurrent attempts must be at least one."); + } + this.maxConcurrentCheckpoints = maxConcurrentCheckpoints; + } + + /** + * Checks whether checkpointing is forced, despite currently non-checkpointable iteration feedback. + * + * @return True, if checkpointing is forced, false otherwise. + * + * @deprecated This will be removed once iterations properly participate in checkpointing. + */ + @Deprecated + public boolean isForceCheckpointing() { + return forceCheckpointing; + } + + /** + * Sets whether checkpointing is forced, despite currently non-checkpointable iteration feedback. + * + * @param forceCheckpointing The flag to force checkpointing. + * + * @deprecated This will be removed once iterations properly participate in checkpointing.
+ */ + @Deprecated + public void setForceCheckpointing(boolean forceCheckpointing) { + this.forceCheckpointing = forceCheckpointing; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 72722bf..cb5fce5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -114,17 +114,14 @@ public abstract class StreamExecutionEnvironment { /** The execution configuration for this environment */ private final ExecutionConfig config = new ExecutionConfig(); + /** Settings that control the checkpointing behavior */ + private final CheckpointConfig checkpointCfg = new CheckpointConfig(); + protected final List<StreamTransformation<?>> transformations = new ArrayList<>(); private long bufferTimeout = DEFAULT_NETWORK_BUFFER_TIMEOUT; protected boolean isChainingEnabled = true; - - protected long checkpointInterval = -1; // disabled - - protected CheckpointingMode checkpointingMode; - - protected boolean forceCheckpointing = false; /** The state backend used for storing k/v state and state snapshots */ private StateBackend<?> defaultStateBackend; @@ -239,7 +236,17 @@ public abstract class StreamExecutionEnvironment { // ------------------------------------------------------------------------ // Checkpointing Settings // ------------------------------------------------------------------------ - + + /** + * Gets the checkpoint config, which defines values like checkpoint
interval, delay between + * checkpoints, etc. + * + * @return The checkpoint config. + */ + public CheckpointConfig getCheckpointConfig() { + return checkpointCfg; + } + /** * Enables checkpointing for the streaming job. The distributed state of the streaming * dataflow will be periodically snapshotted. In case of a failure, the streaming @@ -257,7 +264,8 @@ public abstract class StreamExecutionEnvironment { * @param interval Time interval between state checkpoints in milliseconds. */ public StreamExecutionEnvironment enableCheckpointing(long interval) { - return enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE); + checkpointCfg.setCheckpointInterval(interval); + return this; } /** @@ -280,15 +288,8 @@ public abstract class StreamExecutionEnvironment { * The checkpointing mode, selecting between "exactly once" and "at least once" guaranteed. */ public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode) { - if (mode == null) { - throw new NullPointerException("checkpoint mode must not be null"); - } - if (interval <= 0) { - throw new IllegalArgumentException("the checkpoint interval must be positive"); - } - - this.checkpointInterval = interval; - this.checkpointingMode = mode; + checkpointCfg.setCheckpointingMode(mode); + checkpointCfg.setCheckpointInterval(interval); return this; } @@ -312,10 +313,11 @@ public abstract class StreamExecutionEnvironment { * If true checkpointing will be enabled for iterative jobs as well. 
*/ @Deprecated + @SuppressWarnings("deprecation") public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode, boolean force) { - this.enableCheckpointing(interval, mode); - - this.forceCheckpointing = force; + checkpointCfg.setCheckpointingMode(mode); + checkpointCfg.setCheckpointInterval(interval); + checkpointCfg.setForceCheckpointing(force); return this; } @@ -337,32 +339,39 @@ public abstract class StreamExecutionEnvironment { */ @Deprecated public StreamExecutionEnvironment enableCheckpointing() { - enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE); + checkpointCfg.setCheckpointInterval(500); return this; } /** * Returns the checkpointing interval or -1 if checkpointing is disabled. + * + * <p>Shorthand for {@code getCheckpointConfig().getCheckpointInterval()}. * * @return The checkpointing interval or -1 */ public long getCheckpointInterval() { - return checkpointInterval; + return checkpointCfg.getCheckpointInterval(); } - /** * Returns whether checkpointing is force-enabled. */ + @Deprecated + @SuppressWarnings("deprecation") public boolean isForceCheckpointing() { - return forceCheckpointing; + return checkpointCfg.isForceCheckpointing(); } /** - * Returns the {@link CheckpointingMode}. + * Returns the checkpointing mode (exactly-once vs. at-least-once). + * + * <p>Shorthand for {@code getCheckpointConfig().getCheckpointingMode()}. 
+ * + * @return The checkpointing mode. */ public CheckpointingMode getCheckpointingMode() { - return checkpointingMode; + return checkpointCfg.getCheckpointingMode(); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 7d8f9f9..bc3acb7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -23,15 +23,12 @@ import java.io.IOException; import java.io.PrintWriter; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -43,8 +40,8 @@ import org.apache.flink.api.java.typeutils.MissingTypeInfo; import org.apache.flink.optimizer.plan.StreamingPlan; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.collector.selector.OutputSelector; +import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; import
org.apache.flink.streaming.api.operators.StreamOperator; @@ -59,7 +56,6 @@ import org.apache.flink.streaming.runtime.tasks.SourceStreamTask; import org.apache.flink.streaming.runtime.tasks.StreamIterationHead; import org.apache.flink.streaming.runtime.tasks.StreamIterationTail; import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask; -import org.apache.sling.commons.json.JSONException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,9 +66,6 @@ import org.slf4j.LoggerFactory; * */ public class StreamGraph extends StreamingPlan { - - /** The default interval for checkpoints, in milliseconds */ - public static final int DEFAULT_CHECKPOINTING_INTERVAL_MS = 5000; private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class); @@ -80,11 +73,9 @@ public class StreamGraph extends StreamingPlan { private final StreamExecutionEnvironment environemnt; private final ExecutionConfig executionConfig; - - private CheckpointingMode checkpointingMode; - private boolean checkpointingEnabled = false; - private long checkpointingInterval = DEFAULT_CHECKPOINTING_INTERVAL_MS; - private boolean chaining = true; + private final CheckpointConfig checkpointConfig; + + private boolean chaining; private Map<Integer, StreamNode> streamNodes; private Set<Integer> sources; @@ -97,12 +88,11 @@ public class StreamGraph extends StreamingPlan { private StateBackend<?> stateBackend; private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs; - private boolean forceCheckpoint = false; public StreamGraph(StreamExecutionEnvironment environment) { - this.environemnt = environment; - executionConfig = environment.getConfig(); + this.executionConfig = environment.getConfig(); + this.checkpointConfig = environment.getCheckpointConfig(); // create an empty new stream graph. clear(); @@ -112,19 +102,23 @@ public class StreamGraph extends StreamingPlan { * Remove all registered nodes etc. 
*/ public void clear() { - streamNodes = Maps.newHashMap(); - virtualSelectNodes = Maps.newHashMap(); - virtuaPartitionNodes = Maps.newHashMap(); - vertexIDtoBrokerID = Maps.newHashMap(); - vertexIDtoLoopTimeout = Maps.newHashMap(); - iterationSourceSinkPairs = Sets.newHashSet(); - sources = Sets.newHashSet(); - sinks = Sets.newHashSet(); + streamNodes = new HashMap<>(); + virtualSelectNodes = new HashMap<>(); + virtuaPartitionNodes = new HashMap<>(); + vertexIDtoBrokerID = new HashMap<>(); + vertexIDtoLoopTimeout = new HashMap<>(); + iterationSourceSinkPairs = new HashSet<>(); + sources = new HashSet<>(); + sinks = new HashSet<>(); } - protected ExecutionConfig getExecutionConfig() { + public ExecutionConfig getExecutionConfig() { return executionConfig; } + + public CheckpointConfig getCheckpointConfig() { + return checkpointConfig; + } public String getJobName() { return jobName; @@ -138,18 +132,6 @@ public class StreamGraph extends StreamingPlan { this.chaining = chaining; } - public void setCheckpointingEnabled(boolean checkpointingEnabled) { - this.checkpointingEnabled = checkpointingEnabled; - } - - public void setCheckpointingInterval(long checkpointingInterval) { - this.checkpointingInterval = checkpointingInterval; - } - - public void forceCheckpoint() { - this.forceCheckpoint = true; - } - public void setStateBackend(StateBackend<?> backend) { this.stateBackend = backend; } @@ -158,27 +140,11 @@ public class StreamGraph extends StreamingPlan { return this.stateBackend; } - public long getCheckpointingInterval() { - return checkpointingInterval; - } - // Checkpointing public boolean isChainingEnabled() { return chaining; } - - public boolean isCheckpointingEnabled() { - return checkpointingEnabled; - } - - public CheckpointingMode getCheckpointingMode() { - return checkpointingMode; - } - - public void setCheckpointingMode(CheckpointingMode checkpointingMode) { - this.checkpointingMode = checkpointingMode; - } public boolean isIterative() { @@ -322,7 
+288,7 @@ public class StreamGraph extends StreamingPlan { downStreamVertexID, typeNumber, null, - Lists.<String>newArrayList()); + new ArrayList<String>()); } @@ -463,10 +429,7 @@ public class StreamGraph extends StreamingPlan { } public StreamEdge getStreamEdge(int sourceId, int targetId) { - Iterator<StreamEdge> outIterator = getStreamNode(sourceId).getOutEdges().iterator(); - while (outIterator.hasNext()) { - StreamEdge edge = outIterator.next(); - + for (StreamEdge edge : getStreamNode(sourceId).getOutEdges()) { if (edge.getTargetId() == targetId) { return edge; } @@ -505,8 +468,7 @@ public class StreamGraph extends StreamingPlan { return vertexIDtoLoopTimeout.get(vertexID); } - public Tuple2<StreamNode, StreamNode> createIterationSourceAndSink(int loopId, int sourceId, int sinkId, long timeout, int parallelism) { - + public Tuple2<StreamNode, StreamNode> createIterationSourceAndSink(int loopId, int sourceId, int sinkId, long timeout, int parallelism) { StreamNode source = this.addNode(sourceId, StreamIterationHead.class, null, @@ -537,15 +499,12 @@ public class StreamGraph extends StreamingPlan { return iterationSourceSinkPairs; } - protected void removeEdge(StreamEdge edge) { - + private void removeEdge(StreamEdge edge) { edge.getSourceVertex().getOutEdges().remove(edge); edge.getTargetVertex().getInEdges().remove(edge); - } - protected void removeVertex(StreamNode toRemove) { - + private void removeVertex(StreamNode toRemove) { Set<StreamEdge> edgesToRemove = new HashSet<StreamEdge>(); edgesToRemove.addAll(toRemove.getInEdges()); @@ -560,9 +519,10 @@ public class StreamGraph extends StreamingPlan { /** * Gets the assembled {@link JobGraph}. 
*/ + @SuppressWarnings("deprecation") public JobGraph getJobGraph() { // temporarily forbid checkpointing for iterative jobs - if (isIterative() && isCheckpointingEnabled() && !forceCheckpoint) { + if (isIterative() && checkpointConfig.isCheckpointingEnabled() && !checkpointConfig.isForceCheckpointing()) { throw new UnsupportedOperationException( "Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. " + "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. " @@ -576,16 +536,12 @@ public class StreamGraph extends StreamingPlan { @Override public String getStreamingPlanAsJSON() { - try { return new JSONGenerator(this).getJSON(); - } catch (JSONException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("JSON plan creation failed: {}", e); - } - return ""; } - + catch (Exception e) { + throw new RuntimeException("JSON plan creation failed", e); + } } @Override @@ -606,5 +562,4 @@ public class StreamGraph extends StreamingPlan { public static enum ResourceStrategy { DEFAULT, ISOLATE, NEWGROUP } - } http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index 8bd0e48..4bd7a73 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -98,17 +98,6 @@ public class StreamGraphGenerator { private StreamGraphGenerator(StreamExecutionEnvironment env) { this.streamGraph = new StreamGraph(env); 
this.streamGraph.setChaining(env.isChainingEnabled()); - - if (env.getCheckpointInterval() > 0) { - this.streamGraph.setCheckpointingEnabled(true); - this.streamGraph.setCheckpointingInterval(env.getCheckpointInterval()); - this.streamGraph.setCheckpointingMode(env.getCheckpointingMode()); - } - this.streamGraph.setStateBackend(env.getStateBackend()); - if (env.isForceCheckpointing()) { - this.streamGraph.forceCheckpoint(); - } - this.env = env; this.alreadyTransformed = new HashMap<>(); } http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 613d381..515e362 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.api.graph; import java.io.IOException; -import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -27,10 +26,10 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; + import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; -import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.DistributionPattern; @@ -45,6 +44,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; @@ -52,6 +52,7 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.tasks.StreamIterationHead; import org.apache.flink.streaming.runtime.tasks.StreamIterationTail; import org.apache.flink.util.InstantiationUtil; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -274,18 +275,20 @@ public class StreamingJobGraphGenerator { config.setNonChainedOutputs(nonChainableOutputs); config.setChainedOutputs(chainableOutputs); - config.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled()); - if (streamGraph.isCheckpointingEnabled()) { - config.setCheckpointMode(streamGraph.getCheckpointingMode()); + final CheckpointConfig ceckpointCfg = streamGraph.getCheckpointConfig(); + + config.setCheckpointingEnabled(ceckpointCfg.isCheckpointingEnabled()); + if (ceckpointCfg.isCheckpointingEnabled()) { + config.setCheckpointMode(ceckpointCfg.getCheckpointingMode()); config.setStateBackend(streamGraph.getStateBackend()); - } else { - // the at least once input handler is slightly cheaper (in the absence of checkpoints), + } + else { + // the "at-least-once" input handler is slightly cheaper (in the absence of checkpoints), // so we use that one if checkpointing is not enabled config.setCheckpointMode(CheckpointingMode.AT_LEAST_ONCE); } - config.setStatePartitioner((KeySelector<?, Serializable>) vertex.getStatePartitioner()); + config.setStatePartitioner(vertex.getStatePartitioner()); 
config.setStateKeySerializer(vertex.getStateKeySerializer()); - Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass(); @@ -385,8 +388,10 @@ public class StreamingJobGraphGenerator { } private void configureCheckpointing() { - if (streamGraph.isCheckpointingEnabled()) { - long interval = streamGraph.getCheckpointingInterval(); + CheckpointConfig cfg = streamGraph.getCheckpointConfig(); + + if (cfg.isCheckpointingEnabled()) { + long interval = cfg.getCheckpointInterval(); if (interval < 1) { throw new IllegalArgumentException("The checkpoint interval must be positive"); } @@ -400,10 +405,9 @@ public class StreamingJobGraphGenerator { List<JobVertexID> ackVertices = new ArrayList<JobVertexID>(jobVertices.size()); // collect the vertices that receive "commit checkpoint" messages - // currently, these are all certices + // currently, these are all vertices List<JobVertexID> commitVertices = new ArrayList<JobVertexID>(); - for (JobVertex vertex : jobVertices.values()) { if (vertex.isInputVertex()) { triggerVertices.add(vertex.getID()); @@ -414,7 +418,9 @@ public class StreamingJobGraphGenerator { } JobSnapshottingSettings settings = new JobSnapshottingSettings( - triggerVertices, ackVertices, commitVertices, interval); + triggerVertices, ackVertices, commitVertices, interval, + cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(), + cfg.getMaxConcurrentCheckpoints()); jobGraph.setSnapshotSettings(settings); // if the user enabled checkpointing, the default number of exec retries is infinitive. 
http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 14f23e1..69147f6 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -96,6 +96,13 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { // ------------------------------------------------------------------------ // Checkpointing Settings // ------------------------------------------------------------------------ + + /** + * Gets the checkpoint config, which defines values like checkpoint interval, delay between + * checkpoints, etc. + */ + def getCheckpointConfig = javaEnv.getCheckpointConfig() + /** * Enables checkpointing for the streaming job. The distributed state of the streaming * dataflow will be periodically snapshotted. In case of a failure, the streaming