[FLINK-3051] [streaming] Add mechanisms to control the maximum number of concurrent checkpoints

This closes #1408


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/55fd5f32
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/55fd5f32
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/55fd5f32

Branch: refs/heads/master
Commit: 55fd5f32d7ef0292a01192ab08456fae49b91791
Parents: 4097666
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 19 19:05:47 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 26 17:16:29 2015 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 402 ++++++++++++-------
 .../CheckpointCoordinatorDeActivator.java       |  11 +-
 .../runtime/checkpoint/PendingCheckpoint.java   |  10 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  22 +-
 .../jobgraph/tasks/JobSnapshottingSettings.java |  55 ++-
 .../flink/runtime/jobmanager/JobManager.scala   |   2 +
 .../checkpoint/CheckpointCoordinatorTest.java   | 357 +++++++++++++++-
 .../checkpoint/CheckpointStateRestoreTest.java  |   6 +-
 .../checkpoint/CoordinatorShutdownTest.java     |   6 +-
 .../api/environment/CheckpointConfig.java       | 221 ++++++++++
 .../environment/StreamExecutionEnvironment.java |  61 +--
 .../flink/streaming/api/graph/StreamGraph.java  | 105 ++---
 .../api/graph/StreamGraphGenerator.java         |  11 -
 .../api/graph/StreamingJobGraphGenerator.java   |  36 +-
 .../api/scala/StreamExecutionEnvironment.scala  |   7 +
 15 files changed, 990 insertions(+), 322 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 09dd2d9..454b88a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
@@ -34,6 +35,7 @@ import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +48,6 @@ import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -63,11 +64,12 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
  */
 public class CheckpointCoordinator {
        
-       private static final Logger LOG = 
LoggerFactory.getLogger(CheckpointCoordinator.class);
+       static final Logger LOG = 
LoggerFactory.getLogger(CheckpointCoordinator.class);
        
        /** The number of recent checkpoints whose IDs are remembered */
        private static final int NUM_GHOST_CHECKPOINT_IDS = 16;
        
+       
        /** Coordinator-wide lock to safeguard the checkpoint updates */
        private final Object lock = new Object();
        
@@ -83,35 +85,58 @@ public class CheckpointCoordinator {
        /** Tasks who need to be sent a message when a checkpoint is confirmed 
*/
        private final ExecutionVertex[] tasksToCommitTo;
 
+       /** Map from checkpoint ID to the pending checkpoint */
        private final Map<Long, PendingCheckpoint> pendingCheckpoints;
 
-       /**
-        * Completed checkpoints. Implementations can be blocking. Make sure 
calls to methods
-        * accessing this don't block the job manager actor and run 
asynchronously.
-        */
+       /** Completed checkpoints. Implementations can be blocking. Make sure 
calls to methods
+        * accessing this don't block the job manager actor and run 
asynchronously. */
        private final CompletedCheckpointStore completedCheckpointStore;
        
+       /** A list of recent checkpoint IDs, to identify late messages (vs 
invalid ones) */
        private final ArrayDeque<Long> recentPendingCheckpoints;
 
-       /**
-        * Checkpoint ID counter to ensure ascending IDs. In case of job 
manager failures, these
-        * need to be ascending across job managers.
-        */
+       /** Checkpoint ID counter to ensure ascending IDs. In case of job 
manager failures, these
+        * need to be ascending across job managers. */
        private final CheckpointIDCounter checkpointIdCounter;
 
-       private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new 
AtomicInteger();
+       /** Class loader used to deserialize the state handles (as they may be 
user-defined) */
+       private final ClassLoader userClassLoader;
 
-       /** The timer that handles the checkpoint timeouts and triggers 
periodic checkpoints */
-       private final Timer timer;
+       /** The base checkpoint interval. Actual trigger time may be affected 
by the
+        * max concurrent checkpoints and minimum-pause values */
+       private final long baseInterval;
        
+       /** The max time (in ms) that a checkpoint may take */
        private final long checkpointTimeout;
+
+       /** The minimum time (in ms) to delay after a checkpoint could be triggered. This enforces
+        * a minimum processing time between checkpoint attempts */
+       private final long minPauseBetweenCheckpoints;
+       
+       /** The maximum number of checkpoints that may be in progress at the 
same time */
+       private final int maxConcurrentCheckpointAttempts;
        
-       private TimerTask periodicScheduler;
+       /** The timer that handles the checkpoint timeouts and triggers 
periodic checkpoints */
+       private final Timer timer;
        
+       /** Actor that receives status updates from the execution graph this 
coordinator works for */ 
        private ActorGateway jobStatusListener;
+
+       /** The number of consecutive failed trigger attempts */ 
+       private int numUnsuccessfulCheckpointsTriggers;
+       
+       
+       private ScheduledTrigger currentPeriodicTrigger;
+
+       /** Flag whether a triggered checkpoint should immediately schedule the 
next checkpoint.
+        * Non-volatile, because only accessed in synchronized scope */
+       private boolean periodicScheduling;
        
-       private ClassLoader userClassLoader;
+       /** Flag whether a trigger request could not be handled immediately. 
Non-volatile, because only
+        * accessed in synchronized scope */
+       private boolean triggerRequestQueued;
        
+       /** Flag marking the coordinator as shut down (not accepting any 
messages any more) */
        private volatile boolean shutdown;
 
        /** Shutdown hook thread to clean up state handles. */
@@ -121,6 +146,7 @@ public class CheckpointCoordinator {
 
        public CheckpointCoordinator(
                        JobID job,
+                       long baseInterval,
                        long checkpointTimeout,
                        ExecutionVertex[] tasksToTrigger,
                        ExecutionVertex[] tasksToWaitFor,
@@ -130,11 +156,36 @@ public class CheckpointCoordinator {
                        CompletedCheckpointStore completedCheckpointStore,
                        RecoveryMode recoveryMode) throws Exception {
                
+               this(job, baseInterval, checkpointTimeout, 0L, 
Integer.MAX_VALUE,
+                               tasksToTrigger, tasksToWaitFor, tasksToCommitTo,
+                               userClassLoader, checkpointIDCounter, 
completedCheckpointStore, recoveryMode);
+       }
+       
+       public CheckpointCoordinator(
+                       JobID job,
+                       long baseInterval,
+                       long checkpointTimeout,
+                       long minPauseBetweenCheckpoints,
+                       int maxConcurrentCheckpointAttempts,
+                       ExecutionVertex[] tasksToTrigger,
+                       ExecutionVertex[] tasksToWaitFor,
+                       ExecutionVertex[] tasksToCommitTo,
+                       ClassLoader userClassLoader,
+                       CheckpointIDCounter checkpointIDCounter,
+                       CompletedCheckpointStore completedCheckpointStore,
+                       RecoveryMode recoveryMode) throws Exception {
+               
                // Sanity check
+               checkArgument(baseInterval > 0, "Checkpoint timeout must be 
larger than zero");
                checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must 
be larger than zero");
+               checkArgument(minPauseBetweenCheckpoints >= 0, 
"minPauseBetweenCheckpoints must be >= 0");
+               checkArgument(maxConcurrentCheckpointAttempts >= 1, 
"maxConcurrentCheckpointAttempts must be >= 1");
                
                this.job = checkNotNull(job);
+               this.baseInterval = baseInterval;
                this.checkpointTimeout = checkpointTimeout;
+               this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+               this.maxConcurrentCheckpointAttempts = 
maxConcurrentCheckpointAttempts;
                this.tasksToTrigger = checkNotNull(tasksToTrigger);
                this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
                this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
@@ -143,10 +194,11 @@ public class CheckpointCoordinator {
                this.recentPendingCheckpoints = new 
ArrayDeque<Long>(NUM_GHOST_CHECKPOINT_IDS);
                this.userClassLoader = userClassLoader;
                this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
+               
                checkpointIDCounter.start();
 
                this.timer = new Timer("Checkpoint Timer", true);
-
+               
                if (recoveryMode == RecoveryMode.STANDALONE) {
                        // Add shutdown hook to clean up state handles when no 
checkpoint recovery is
                        // possible. In case of another configured recovery 
mode, the checkpoints need to be
@@ -158,7 +210,7 @@ public class CheckpointCoordinator {
                                                
CheckpointCoordinator.this.shutdown();
                                        }
                                        catch (Throwable t) {
-                                               LOG.error("Error during 
shutdown of checkpoint coordniator via " +
+                                               LOG.error("Error during 
shutdown of checkpoint coordinator via " +
                                                                "JVM shutdown 
hook: " + t.getMessage(), t);
                                        }
                                }
@@ -197,7 +249,10 @@ public class CheckpointCoordinator {
                                        shutdown = true;
                                        LOG.info("Stopping checkpoint 
coordinator for job " + job);
 
-                                       // shut down the thread that handles 
the timeouts
+                                       periodicScheduling = false;
+                                       triggerRequestQueued = false;
+                                       
+                                       // shut down the thread that handles 
the timeouts and pending triggers
                                        timer.cancel();
 
                                        // make sure that the actor does not 
linger
@@ -206,17 +261,11 @@ public class CheckpointCoordinator {
                                                jobStatusListener = null;
                                        }
 
-                                       // the scheduling thread needs also to 
go away
-                                       if (periodicScheduler != null) {
-                                               periodicScheduler.cancel();
-                                               periodicScheduler = null;
-                                       }
-
                                        checkpointIdCounter.stop();
 
                                        // clear and discard all pending 
checkpoints
                                        for (PendingCheckpoint pending : 
pendingCheckpoints.values()) {
-                                                       
pending.discard(userClassLoader, true);
+                                               
pending.discard(userClassLoader);
                                        }
                                        pendingCheckpoints.clear();
 
@@ -235,7 +284,7 @@ public class CheckpointCoordinator {
                                                // race, JVM is in shutdown 
already, we can safely ignore this
                                        }
                                        catch (Throwable t) {
-                                               LOG.warn("Error unregistering 
checkpoint cooordniator shutdown hook.", t);
+                                               LOG.warn("Error unregistering 
checkpoint coordinator shutdown hook.", t);
                                        }
                                }
                        }
@@ -251,93 +300,136 @@ public class CheckpointCoordinator {
        // 
--------------------------------------------------------------------------------------------
        
        /**
-        * Triggers a new checkpoint and uses the current system time as the
-        * checkpoint time.
-        */
-       public void triggerCheckpoint() throws Exception {
-               triggerCheckpoint(System.currentTimeMillis());
-       }
-
-       /**
         * Triggers a new checkpoint and uses the given timestamp as the 
checkpoint
         * timestamp.
         * 
         * @param timestamp The timestamp for the checkpoint.
         */
-       public boolean triggerCheckpoint(final long timestamp) throws Exception 
{
-               if (shutdown) {
-                       LOG.error("Cannot trigger checkpoint, checkpoint 
coordinator has been shutdown.");
-                       return false;
-               }
-               
-               final long checkpointID = checkpointIdCounter.getAndIncrement();
-               LOG.info("Triggering checkpoint " + checkpointID + " @ " + 
timestamp);
-               
-               try {
-                       // first check if all tasks that we need to trigger are 
running.
-                       // if not, abort the checkpoint
-                       ExecutionAttemptID[] triggerIDs = new 
ExecutionAttemptID[tasksToTrigger.length];
-                       for (int i = 0; i < tasksToTrigger.length; i++) {
-                               Execution ee = 
tasksToTrigger[i].getCurrentExecutionAttempt();
-                               if (ee != null && ee.getState() == 
ExecutionState.RUNNING) {
-                                       triggerIDs[i] = ee.getAttemptId();
-                               } else {
-                                       LOG.info("Checkpoint triggering task {} 
is not being executed at the moment. Aborting checkpoint.",
-                                                       
tasksToTrigger[i].getSimpleName());
-                                       return false;
-                               }
+       public boolean triggerCheckpoint(long timestamp) throws Exception {
+               // make some eager pre-checks
+               synchronized (lock) {
+                       // abort if the coordinator has been shutdown in the 
meantime
+                       if (shutdown) {
+                               return false;
+                       }
+                       
+                       // sanity check: there should never be more than one 
trigger request queued
+                       if (triggerRequestQueued) {
+                               LOG.warn("Trying to trigger another checkpoint 
while one was queued already");
+                               return false;
                        }
 
-                       // next, check if all tasks that need to acknowledge 
the checkpoint are running.
-                       // if not, abort the checkpoint
-                       Map<ExecutionAttemptID, ExecutionVertex> ackTasks =
-                                                               new 
HashMap<ExecutionAttemptID, ExecutionVertex>(tasksToWaitFor.length);
-
-                       for (ExecutionVertex ev : tasksToWaitFor) {
-                               Execution ee = ev.getCurrentExecutionAttempt();
-                               if (ee != null) {
-                                       ackTasks.put(ee.getAttemptId(), ev);
-                               } else {
-                                       LOG.info("Checkpoint acknowledging task 
{} is not being executed at the moment. Aborting checkpoint.",
-                                                       ev.getSimpleName());
-                                       return false;
+                       // if too many checkpoints are currently in progress, 
we need to mark that a request is queued
+                       if (pendingCheckpoints.size() >= 
maxConcurrentCheckpointAttempts) {
+                               triggerRequestQueued = true;
+                               if (currentPeriodicTrigger != null) {
+                                       currentPeriodicTrigger.cancel();
+                                       currentPeriodicTrigger = null;
                                }
+                               return false;
                        }
-                       
-                       // register a new pending checkpoint. this makes sure 
we can properly receive acknowledgements
-                       final PendingCheckpoint checkpoint = new 
PendingCheckpoint(job, checkpointID, timestamp, ackTasks);
+               }
 
-                       // schedule the timer that will clean up the expired 
checkpoints
-                       TimerTask canceller = new TimerTask() {
-                               @Override
-                               public void run() {
-                                       try {
-                                               synchronized (lock) {
-                                                       // only do the work if 
the checkpoint is not discarded anyways
-                                                       // note that checkpoint 
completion discards the pending checkpoint object
-                                                       if 
(!checkpoint.isDiscarded()) {
-                                                               
LOG.info("Checkpoint " + checkpointID + " expired before completing.");
-                                                               
-                                                               
checkpoint.discard(userClassLoader, true);
-                                                               
-                                                               
pendingCheckpoints.remove(checkpointID);
-                                                               
rememberRecentCheckpointId(checkpointID);
-                                                       }
+               // first check if all tasks that we need to trigger are running.
+               // if not, abort the checkpoint
+               ExecutionAttemptID[] triggerIDs = new 
ExecutionAttemptID[tasksToTrigger.length];
+               for (int i = 0; i < tasksToTrigger.length; i++) {
+                       Execution ee = 
tasksToTrigger[i].getCurrentExecutionAttempt();
+                       if (ee != null && ee.getState() == 
ExecutionState.RUNNING) {
+                               triggerIDs[i] = ee.getAttemptId();
+                       } else {
+                               LOG.info("Checkpoint triggering task {} is not 
being executed at the moment. Aborting checkpoint.",
+                                               
tasksToTrigger[i].getSimpleName());
+                               return false;
+                       }
+               }
+       
+               // next, check if all tasks that need to acknowledge the 
checkpoint are running.
+               // if not, abort the checkpoint
+               Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new 
HashMap<>(tasksToWaitFor.length);
+
+               for (ExecutionVertex ev : tasksToWaitFor) {
+                       Execution ee = ev.getCurrentExecutionAttempt();
+                       if (ee != null) {
+                               ackTasks.put(ee.getAttemptId(), ev);
+                       } else {
+                               LOG.info("Checkpoint acknowledging task {} is 
not being executed at the moment. Aborting checkpoint.",
+                                               ev.getSimpleName());
+                               return false;
+                       }
+               }
+
+               // we will actually trigger this checkpoint!
+               
+               final long checkpointID;
+               try {
+                       // this must happen outside the locked scope, because 
it communicates
+                       // with external services (in HA mode) and may block 
for a while.
+                       checkpointID = checkpointIdCounter.getAndIncrement();
+               }
+               catch (Throwable t) {
+                       int numUnsuccessful = 
++numUnsuccessfulCheckpointsTriggers;
+                       LOG.warn("Failed to trigger checkpoint (" + 
numUnsuccessful + " consecutive failed attempts so far)", t);
+                       return false;
+               }
+
+               LOG.info("Triggering checkpoint " + checkpointID + " @ " + 
timestamp);
+               
+               final PendingCheckpoint checkpoint = new PendingCheckpoint(job, 
checkpointID, timestamp, ackTasks);
+
+               // schedule the timer that will clean up the expired checkpoints
+               TimerTask canceller = new TimerTask() {
+                       @Override
+                       public void run() {
+                               try {
+                                       synchronized (lock) {
+                                               // only do the work if the 
checkpoint is not discarded anyways
+                                               // note that checkpoint 
completion discards the pending checkpoint object
+                                               if (!checkpoint.isDiscarded()) {
+                                                       LOG.info("Checkpoint " 
+ checkpointID + " expired before completing.");
+
+                                                       
checkpoint.discard(userClassLoader);
+                                                       
pendingCheckpoints.remove(checkpointID);
+                                                       
rememberRecentCheckpointId(checkpointID);
+
+                                                       triggerQueuedRequests();
                                                }
                                        }
-                                       catch (Throwable t) {
-                                               LOG.error("Exception while 
handling checkpoint timeout", t);
-                                       }
                                }
-                       };
-                       
+                               catch (Throwable t) {
+                                       LOG.error("Exception while handling 
checkpoint timeout", t);
+                               }
+                       }
+               };
+               
+               try {
+                       // re-acquire the lock
                        synchronized (lock) {
+                               // since we released the lock in the meantime, 
we need to re-check
+                               // that the conditions still hold. this is 
clumsy, but it allows us to
+                               // release the lock in the meantime while calls 
to external services are
+                               // blocking progress, and still gives us early 
checks that skip work
+                               // if no checkpoint can happen anyways
                                if (shutdown) {
-                                       throw new 
IllegalStateException("Checkpoint coordinator has been shutdown.");
+                                       return false;
+                               }
+                               else if (triggerRequestQueued) {
+                                       LOG.warn("Trying to trigger another 
checkpoint while one was queued already");
+                                       return false;
+                               }
+                               else if (pendingCheckpoints.size() >= 
maxConcurrentCheckpointAttempts) {
+                                       triggerRequestQueued = true;
+                                       if (currentPeriodicTrigger != null) {
+                                               currentPeriodicTrigger.cancel();
+                                               currentPeriodicTrigger = null;
+                                       }
+                                       return false;
                                }
+                               
                                pendingCheckpoints.put(checkpointID, 
checkpoint);
                                timer.schedule(canceller, checkpointTimeout);
                        }
+                       // end of lock scope
 
                        // send the messages to the tasks that trigger their 
checkpoint
                        for (int i = 0; i < tasksToTrigger.length; i++) {
@@ -345,21 +437,21 @@ public class CheckpointCoordinator {
                                TriggerCheckpoint message = new 
TriggerCheckpoint(job, id, checkpointID, timestamp);
                                
tasksToTrigger[i].sendMessageToCurrentExecution(message, id);
                        }
-                       
-                       numUnsuccessfulCheckpointsTriggers.set(0);
+
+                       numUnsuccessfulCheckpointsTriggers = 0;
                        return true;
                }
                catch (Throwable t) {
-                       int numUnsuccessful = 
numUnsuccessfulCheckpointsTriggers.incrementAndGet();
-                       LOG.warn("Failed to trigger checkpoint (" + 
numUnsuccessful + " consecutive failed attempts so far)", t);
-                       
+                       // guard the map against concurrent modifications
                        synchronized (lock) {
-                               PendingCheckpoint checkpoint = 
pendingCheckpoints.remove(checkpointID);
-                               if (checkpoint != null && 
!checkpoint.isDiscarded()) {
-                                       checkpoint.discard(userClassLoader, 
true);
-                               }
+                               pendingCheckpoints.remove(checkpointID);
                        }
                        
+                       int numUnsuccessful = 
++numUnsuccessfulCheckpointsTriggers;
+                       LOG.warn("Failed to trigger checkpoint (" + 
numUnsuccessful + " consecutive failed attempts so far)", t);
+                       if (!checkpoint.isDiscarded()) {
+                               checkpoint.discard(userClassLoader);
+                       }
                        return false;
                }
        }
@@ -401,6 +493,8 @@ public class CheckpointCoordinator {
                                                
rememberRecentCheckpointId(checkpointId);
                                                
                                                
dropSubsumedCheckpoints(completed.getTimestamp());
+
+                                               triggerQueuedRequests();
                                        }
                                }
                                else {
@@ -455,13 +549,38 @@ public class CheckpointCoordinator {
                        if (p.getCheckpointTimestamp() < timestamp) {
                                rememberRecentCheckpointId(p.getCheckpointId());
 
-                               p.discard(userClassLoader, true);
+                               p.discard(userClassLoader);
 
                                entries.remove();
                        }
                }
        }
 
+       /**
+        * Triggers the queued request, if there is one.
+        * 
+        * <p>NOTE: The caller of this method must hold the lock when invoking the method!
+        */
+       private void triggerQueuedRequests() throws Exception {
+               if (triggerRequestQueued) {
+                       triggerRequestQueued = false;
+
+                       // trigger the checkpoint from the trigger timer, to finish the work of this thread before
+                       // starting with the next checkpoint
+                       ScheduledTrigger trigger = new ScheduledTrigger();
+                       if (periodicScheduling) {
+                               if (currentPeriodicTrigger != null) {
+                                       currentPeriodicTrigger.cancel();
+                               }
+                               currentPeriodicTrigger = trigger; 
+                               timer.scheduleAtFixedRate(trigger, 0L, baseInterval);
+                       }
+                       else {
+                               timer.schedule(trigger, 0L);
+                       }
+               }
+       }
+
        // --------------------------------------------------------------------------------------------
        //  Checkpoint State Restoring
        // --------------------------------------------------------------------------------------------
@@ -557,63 +676,74 @@ public class CheckpointCoordinator {
        //  Periodic scheduling of checkpoints
        // --------------------------------------------------------------------------------------------
        
-       public void startPeriodicCheckpointScheduler(long interval) {
+       public void startCheckpointScheduler() {
                synchronized (lock) {
                        if (shutdown) {
                                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
                        }
                        
-                       // cancel any previous scheduler
-                       stopPeriodicCheckpointScheduler();
+                       // make sure all prior timers are cancelled 
+                       stopCheckpointScheduler();
                        
-                       // start a new scheduler
-                       periodicScheduler = new TimerTask() {
-                               @Override
-                               public void run() {
-                                       try {
-                                               triggerCheckpoint();
-                                       }
-                                       catch (Exception e) {
-                                               LOG.error("Exception while triggering checkpoint", e);
-                                       }
-                               }
-                       };
-                       timer.scheduleAtFixedRate(periodicScheduler, interval, interval);
+                       periodicScheduling = true;
+                       currentPeriodicTrigger = new ScheduledTrigger();
+                       timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval);
                }
        }
        
-       public void stopPeriodicCheckpointScheduler() {
+       public void stopCheckpointScheduler() {
                synchronized (lock) {
-                       if (periodicScheduler != null) {
-                               periodicScheduler.cancel();
-                               periodicScheduler = null;
+                       triggerRequestQueued = false;
+                       periodicScheduling = false;
+                       
+                       if (currentPeriodicTrigger != null) {
+                               currentPeriodicTrigger.cancel();
+                               currentPeriodicTrigger = null;
+                       }
+                       
+                       for (PendingCheckpoint p : pendingCheckpoints.values()) {
+                               p.discard(userClassLoader);
                        }
+                       pendingCheckpoints.clear();
+                       
+                       numUnsuccessfulCheckpointsTriggers = 0;
                }
        }
        
-       public ActorGateway createJobStatusListener(
-                       ActorSystem actorSystem,
-                       long checkpointInterval,
-                       UUID leaderSessionID) {
+       // ------------------------------------------------------------------------
+       //  job status listener that schedules / cancels periodic checkpoints 
+       // ------------------------------------------------------------------------
+       
+       public ActorGateway createActivatorDeactivator(ActorSystem actorSystem, UUID leaderSessionID) {
+               
                synchronized (lock) {
                        if (shutdown) {
                                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
                        }
 
                        if (jobStatusListener == null) {
-                               Props props = Props.create(
-                                               CheckpointCoordinatorDeActivator.class,
-                                               this,
-                                               checkpointInterval,
-                                               leaderSessionID);
+                               Props props = Props.create(CheckpointCoordinatorDeActivator.class, this, leaderSessionID);
 
                                // wrap the ActorRef in a AkkaActorGateway to support message decoration
-                               jobStatusListener = new AkkaActorGateway(
-                                               actorSystem.actorOf(props),
-                                               leaderSessionID);
+                               jobStatusListener = new AkkaActorGateway(actorSystem.actorOf(props), leaderSessionID);
                        }
 
                        return jobStatusListener;
                }
        }
+       
+       // ------------------------------------------------------------------------
+       
+       private class ScheduledTrigger extends TimerTask {
+               
+               @Override
+               public void run() {
+                       try {
+                               triggerCheckpoint(System.currentTimeMillis());
+                       }
+                       catch (Exception e) {
+                               LOG.error("Exception while triggering checkpoint", e);
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
index 7e32b72..8bdab7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
@@ -32,19 +32,15 @@ import java.util.UUID;
 public class CheckpointCoordinatorDeActivator extends FlinkUntypedActor {
 
        private final CheckpointCoordinator coordinator;
-       private final long interval;
        private final UUID leaderSessionID;
        
        public CheckpointCoordinatorDeActivator(
                        CheckpointCoordinator coordinator,
-                       long interval,
                        UUID leaderSessionID) {
 
                LOG.info("Create CheckpointCoordinatorDeActivator");
 
                this.coordinator = Preconditions.checkNotNull(coordinator, "The checkpointCoordinator must not be null.");
-
-               this.interval = interval;
                this.leaderSessionID = leaderSessionID;
        }
 
@@ -55,11 +51,10 @@ public class CheckpointCoordinatorDeActivator extends FlinkUntypedActor {
                        
                        if (status == JobStatus.RUNNING) {
                                // start the checkpoint scheduler
-                               coordinator.startPeriodicCheckpointScheduler(interval);
-                       }
-                       else {
+                               coordinator.startCheckpointScheduler();
+                       } else {
                                // anything else should stop the trigger for now
-                               coordinator.stopPeriodicCheckpointScheduler();
+                               coordinator.stopCheckpointScheduler();
                        }
                }
                

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 19c65d4..b94e5bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -117,7 +117,7 @@ public class PendingCheckpoint {
                        if (notYetAcknowledgedTasks.isEmpty()) {
                                CompletedCheckpoint completed =  new CompletedCheckpoint(jobId, checkpointId,
                                                checkpointTimestamp, new ArrayList<StateForTask>(collectedStates));
-                               discard(null, false);
+                               dispose(null, false);
                                
                                return completed;
                        }
@@ -150,11 +150,15 @@ public class PendingCheckpoint {
        /**
         * Discards the pending checkpoint, releasing all held resources.
         */
-       public void discard(ClassLoader userClassLoader, boolean discardStateHandle) {
+       public void discard(ClassLoader userClassLoader) {
+               dispose(userClassLoader, true);
+       }
+
+       private void dispose(ClassLoader userClassLoader, boolean releaseState) {
                synchronized (lock) {
                        discarded = true;
                        numAcknowledgedTasks = -1;
-                       if (discardStateHandle) {
+                       if (releaseState) {
                                for (StateForTask state : collectedStates) {
                                        state.discard(userClassLoader);
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index d10aac1..9218fe4 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -182,9 +182,6 @@ public class ExecutionGraph implements Serializable {
         * from results than need to be materialized. */
        private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
 
-       /** Flag that indicate whether the executed dataflow should be periodically snapshotted */
-       private boolean snapshotCheckpointsEnabled;
-
        /** Flag to indicate whether the Graph has been archived */
        private boolean isArchived = false;
                
@@ -341,9 +338,12 @@ public class ExecutionGraph implements Serializable {
        public boolean isArchived() {
                return isArchived;
        }
+       
        public void enableSnapshotCheckpointing(
                        long interval,
                        long checkpointTimeout,
+                       long minPauseBetweenCheckpoints,
+                       int maxConcurrentCheckpoints,
                        List<ExecutionJobVertex> verticesToTrigger,
                        List<ExecutionJobVertex> verticesToWaitFor,
                        List<ExecutionJobVertex> verticesToCommitTo,
@@ -368,11 +368,13 @@ public class ExecutionGraph implements Serializable {
                // disable to make sure existing checkpoint coordinators are cleared
                disableSnaphotCheckpointing();
                
-               // create the coordinator that triggers and commits checkpoints and holds the state 
-               snapshotCheckpointsEnabled = true;
+               // create the coordinator that triggers and commits checkpoints and holds the state
                checkpointCoordinator = new CheckpointCoordinator(
                                jobID,
+                               interval,
                                checkpointTimeout,
+                               minPauseBetweenCheckpoints,
+                               maxConcurrentCheckpoints,
                                tasksToTrigger,
                                tasksToWaitFor,
                                tasksToCommitTo,
@@ -384,10 +386,7 @@ public class ExecutionGraph implements Serializable {
                // the periodic checkpoint scheduler is activated and deactivated as a result of
                // job status changes (running -> on, all other states -> off)
                registerJobStatusListener(
-                               checkpointCoordinator.createJobStatusListener(
-                                               actorSystem,
-                                               interval,
-                                               leaderSessionID));
+                               checkpointCoordinator.createActivatorDeactivator(actorSystem, leaderSessionID));
        }
 
        /**
@@ -401,16 +400,11 @@ public class ExecutionGraph implements Serializable {
                        throw new IllegalStateException("Job must be in CREATED state");
                }
                
-               snapshotCheckpointsEnabled = false;
                if (checkpointCoordinator != null) {
                        checkpointCoordinator.shutdown();
                        checkpointCoordinator = null;
                }
        }
-       
-       public boolean isSnapshotCheckpointsEnabled() {
-               return snapshotCheckpointsEnabled;
-       }
 
        public CheckpointCoordinator getCheckpointCoordinator() {
                return checkpointCoordinator;

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
index 86c9b60..d58be52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
@@ -22,17 +22,17 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import java.util.List;
 
+import static java.util.Objects.requireNonNull;
+
 /**
- * The JobSnapshottingSettings are attached to a JobGraph and describe the settings
- * for the asynchronous snapshotting of the JobGraph, such as interval, and which vertices
+ * The JobSnapshottingSettings are attached to a JobGraph and describe the settings
+ * for the asynchronous checkpoints of the JobGraph, such as interval, and which vertices
  * need to participate.
  */
 public class JobSnapshottingSettings implements java.io.Serializable{
        
        private static final long serialVersionUID = -2593319571078198180L;
 
-       /** The default time in which pending checkpoints need to be acknowledged before timing out */
-       public static final long DEFAULT_SNAPSHOT_TIMEOUT = 10 * 60 * 1000; // 10 minutes
        
        private final List<JobVertexID> verticesToTrigger;
 
@@ -43,26 +43,32 @@ public class JobSnapshottingSettings implements java.io.Serializable{
        private final long checkpointInterval;
        
        private final long checkpointTimeout;
-
-
-       public JobSnapshottingSettings(List<JobVertexID> verticesToTrigger,
-                                                                       List<JobVertexID> verticesToAcknowledge,
-                                                                       List<JobVertexID> verticesToConfirm,
-                                                                       long checkpointInterval)
-       {
-               this(verticesToTrigger, verticesToAcknowledge, verticesToConfirm, checkpointInterval, DEFAULT_SNAPSHOT_TIMEOUT);
-       }
+       
+       private final long minPauseBetweenCheckpoints;
+       
+       private final int maxConcurrentCheckpoints;
+       
        
        public JobSnapshottingSettings(List<JobVertexID> verticesToTrigger,
                                                                        List<JobVertexID> verticesToAcknowledge,
                                                                        List<JobVertexID> verticesToConfirm,
-                                                                       long checkpointInterval, long checkpointTimeout)
+                                                                       long checkpointInterval, long checkpointTimeout,
+                                                                       long minPauseBetweenCheckpoints, int maxConcurrentCheckpoints)
        {
-               this.verticesToTrigger = verticesToTrigger;
-               this.verticesToAcknowledge = verticesToAcknowledge;
-               this.verticesToConfirm = verticesToConfirm;
+               // sanity checks
+               if (checkpointInterval < 1 || checkpointTimeout < 1 ||
+                               minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1)
+               {
+                       throw new IllegalArgumentException(String.format("Invalid checkpoint settings: interval=%d, timeout=%d, minPauseBetweenCheckpoints=%d, maxConcurrentCheckpoints=%d (interval, timeout and maxConcurrentCheckpoints must be >= 1, minPauseBetweenCheckpoints must be >= 0)", checkpointInterval, checkpointTimeout, minPauseBetweenCheckpoints, maxConcurrentCheckpoints));
+               }
+               
+               this.verticesToTrigger = requireNonNull(verticesToTrigger, "verticesToTrigger");
+               this.verticesToAcknowledge = requireNonNull(verticesToAcknowledge, "verticesToAcknowledge");
+               this.verticesToConfirm = requireNonNull(verticesToConfirm, "verticesToConfirm");
                this.checkpointInterval = checkpointInterval;
                this.checkpointTimeout = checkpointTimeout;
+               this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+               this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
        }
        
        // --------------------------------------------------------------------------------------------
@@ -87,11 +93,22 @@ public class JobSnapshottingSettings implements java.io.Serializable{
                return checkpointTimeout;
        }
 
+       public long getMinPauseBetweenCheckpoints() {
+               return minPauseBetweenCheckpoints;
+       }
+
+       public int getMaxConcurrentCheckpoints() {
+               return maxConcurrentCheckpoints;
+       }
+
        // --------------------------------------------------------------------------------------------
        
        @Override
        public String toString() {
-               return String.format("SnapshotSettings: interval=%d, timeout=%d, trigger=%s, ack=%s, commit=%s",
-                               checkpointInterval, checkpointTimeout, verticesToTrigger, verticesToAcknowledge, verticesToConfirm);
+               return String.format("SnapshotSettings: interval=%d, timeout=%d, pause-between=%d, " +
+                                               "maxConcurrent=%d, trigger=%s, ack=%s, commit=%s",
+                                               checkpointInterval, checkpointTimeout,
+                                               minPauseBetweenCheckpoints, maxConcurrentCheckpoints,
+                                               verticesToTrigger, verticesToAcknowledge, verticesToConfirm);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index c4d0fbb..8cbb13a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -913,6 +913,8 @@ class JobManager(
           executionGraph.enableSnapshotCheckpointing(
             snapshotSettings.getCheckpointInterval,
             snapshotSettings.getCheckpointTimeout,
+            snapshotSettings.getMinPauseBetweenCheckpoints,
+            snapshotSettings.getMaxConcurrentCheckpoints,
             triggerVertices,
             ackVertices,
             confirmVertices,

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index f6ee5c5..cd52fd4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -28,10 +28,17 @@ import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
+
 import org.junit.Test;
 
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -39,6 +46,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -70,7 +78,7 @@ public class CheckpointCoordinatorTest {
 
                        // set up the coordinator and validate the initial state
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       jid, 600000,
+                                       jid, 600000, 600000,
                                        new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
                                        new ExecutionVertex[] { ackVertex1, ackVertex2 },
                                        new ExecutionVertex[] {}, cl,
@@ -116,7 +124,7 @@ public class CheckpointCoordinatorTest {
 
                        // set up the coordinator and validate the initial state
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       jid, 600000,
+                                       jid, 600000, 600000,
                                        new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
                                        new ExecutionVertex[] { ackVertex1, ackVertex2 },
                                        new ExecutionVertex[] {}, cl,
@@ -160,7 +168,7 @@ public class CheckpointCoordinatorTest {
 
                        // set up the coordinator and validate the initial state
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       jid, 600000,
+                                       jid, 600000, 600000,
                                        new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
                                        new ExecutionVertex[] { ackVertex1, ackVertex2 },
                                        new ExecutionVertex[] {}, cl, new StandaloneCheckpointIDCounter(), new
@@ -199,7 +207,7 @@ public class CheckpointCoordinatorTest {
 
                        // set up the coordinator and validate the initial state
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       jid, 600000,
+                                       jid, 600000, 600000,
                                        new ExecutionVertex[] { vertex1, vertex2 },
                                        new ExecutionVertex[] { vertex1, vertex2 },
                                        new ExecutionVertex[] { vertex1, vertex2 }, cl,
@@ -343,7 +351,7 @@ public class CheckpointCoordinatorTest {
 
                        // set up the coordinator and validate the initial state
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       jid, 600000,
+                                       jid, 600000, 600000,
                                        new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
                                        new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
                                        new ExecutionVertex[] { commitVertex }, cl,
@@ -472,7 +480,7 @@ public class CheckpointCoordinatorTest {
 
                        // set up the coordinator and validate the initial state
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       jid, 600000,
+                                       jid, 600000, 600000,
                                        new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
                                        new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
                                        new ExecutionVertex[] { commitVertex }, cl,
@@ -587,7 +595,7 @@ public class CheckpointCoordinatorTest {
                        // the timeout for the checkpoint is a 200 milliseconds
 
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       jid, 200,
+                                       jid, 600000, 200,
                                        new ExecutionVertex[] { triggerVertex },
                                        new ExecutionVertex[] { ackVertex1, ackVertex2 },
                                        new ExecutionVertex[] { commitVertex }, cl,
@@ -649,7 +657,7 @@ public class CheckpointCoordinatorTest {
                        ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
 
                        CheckpointCoordinator coord = new CheckpointCoordinator(
-                                       jid, 200000,
+                                       jid, 200000, 200000,
                                        new ExecutionVertex[] { triggerVertex },
                                        new ExecutionVertex[] { ackVertex1, ackVertex2 },
                                        new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
@@ -680,14 +688,343 @@ public class CheckpointCoordinatorTest {
                }
        }
 
+       @Test
+       public void testPeriodicTriggering() {
+               CheckpointCoordinator coord = null;
+               try {
+                       final JobID jid = new JobID();
+                       final long start = System.currentTimeMillis();
+
+                       // create some mock execution vertices and trigger some checkpoint
+
+                       final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
+                       final ExecutionAttemptID ackAttemptID = new ExecutionAttemptID();
+                       final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
+
+                       ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptID);
+                       ExecutionVertex ackVertex = mockExecutionVertex(ackAttemptID);
+                       ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
+
+                       final AtomicInteger numCalls = new AtomicInteger();
+
+                       // The trigger messages arrive while this thread is sleeping in the wait loops below,
+                       // i.e. on another thread. An assertion failing inside the Answer would be thrown on
+                       // that thread and could never fail this test, so the first failure is recorded here
+                       // and re-thrown on the test thread.
+                       final AtomicReference<Throwable> triggerFailure = new AtomicReference<>();
+
+                       doAnswer(new Answer<Void>() {
+
+                               private long lastId = -1;
+                               private long lastTs = -1;
+
+                               @Override
+                               public Void answer(InvocationOnMock invocation) throws Throwable {
+                                       try {
+                                               TriggerCheckpoint message = (TriggerCheckpoint) invocation.getArguments()[0];
+                                               long id = message.getCheckpointId();
+                                               long ts = message.getTimestamp();
+
+                                               // IDs must strictly increase, timestamps must never go backwards
+                                               assertTrue(id > lastId);
+                                               assertTrue(ts >= lastTs);
+                                               assertTrue(ts >= start);
+
+                                               lastId = id;
+                                               lastTs = ts;
+                                       }
+                                       catch (Throwable t) {
+                                               triggerFailure.compareAndSet(null, t);
+                                       }
+                                       numCalls.incrementAndGet();
+                                       return null;
+                               }
+                       }).when(triggerVertex).sendMessageToCurrentExecution(any(Serializable.class), any(ExecutionAttemptID.class));
+
+                       coord = new CheckpointCoordinator(
+                                       jid,
+                                       10,             // periodic interval is 10 ms
+                                       200000, // timeout is very long (200 s)
+                                       new ExecutionVertex[] { triggerVertex },
+                                       new ExecutionVertex[] { ackVertex },
+                                       new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter(),
+                                       new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE);
+
+                       coord.startCheckpointScheduler();
+
+                       long timeout = System.currentTimeMillis() + 60000;
+                       do {
+                               Thread.sleep(20);
+                       }
+                       while (timeout > System.currentTimeMillis() && numCalls.get() < 5);
+                       assertTrue(numCalls.get() >= 5);
+
+                       coord.stopCheckpointScheduler();
+
+                       // for 400 ms, no further calls may come.
+                       // there may be the case that one trigger was fired and about to
+                       // acquire the lock, such that after cancelling it will still do
+                       // the remainder of its work
+                       int numCallsSoFar = numCalls.get();
+                       Thread.sleep(400);
+                       int numCallsAfterPause = numCalls.get();
+                       assertTrue(numCallsAfterPause == numCallsSoFar || numCallsAfterPause == numCallsSoFar + 1);
+
+                       Throwable failure = triggerFailure.get();
+                       if (failure != null) {
+                               throw new AssertionError("Checkpoint trigger message failed validation", failure);
+                       }
+
+                       // start another sequence of periodic scheduling
+                       numCalls.set(0);
+                       coord.startCheckpointScheduler();
+
+                       timeout = System.currentTimeMillis() + 60000;
+                       do {
+                               Thread.sleep(20);
+                       }
+                       while (timeout > System.currentTimeMillis() && numCalls.get() < 5);
+                       assertTrue(numCalls.get() >= 5);
+
+                       coord.stopCheckpointScheduler();
+
+                       // for 400 ms, no further calls may come
+                       // there may be the case that one trigger was fired and about to
+                       // acquire the lock, such that after cancelling it will still do
+                       // the remainder of its work
+                       numCallsSoFar = numCalls.get();
+                       Thread.sleep(400);
+                       numCallsAfterPause = numCalls.get();
+                       assertTrue(numCallsAfterPause == numCallsSoFar || numCallsAfterPause == numCallsSoFar + 1);
+
+                       failure = triggerFailure.get();
+                       if (failure != null) {
+                               throw new AssertionError("Checkpoint trigger message failed validation", failure);
+                       }
+
+                       coord.shutdown();
+                       coord = null; // shut down cleanly, nothing left for the finally block to do
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       // if an assertion or exception aborted the test, still stop the coordinator so
+                       // that its periodic triggering does not outlive this test
+                       if (coord != null) {
+                               try {
+                                       coord.shutdown();
+                               }
+                               catch (Exception ignored) {
+                                       // best effort: the original failure is the one that gets reported
+                               }
+                       }
+               }
+       }
+
+       /** Runs the concurrent-checkpoint-limit scenario with a limit of one. */
+       @Test
+       public void testMaxConcurrentAttempts1() {
+               final int maxInFlight = 1;
+               testMaxConcurrentAttemps(maxInFlight);
+       }
+
+       /** Runs the concurrent-checkpoint-limit scenario with a limit of two. */
+       @Test
+       public void testMaxConcurrentAttempts2() {
+               final int maxInFlight = 2;
+               testMaxConcurrentAttemps(maxInFlight);
+       }
+
+       /** Runs the concurrent-checkpoint-limit scenario with a limit of five. */
+       @Test
+       public void testMaxConcurrentAttempts5() {
+               final int maxInFlight = 5;
+               testMaxConcurrentAttemps(maxInFlight);
+       }
+       
+       /**
+        * Starts periodic checkpointing with the given concurrency limit and checks that exactly
+        * {@code maxConcurrentAttempts} trigger messages are sent. It then checks that acknowledging
+        * checkpoint 1 leads to exactly one more trigger message.
+        *
+        * @param maxConcurrentAttempts The maximum number of concurrent checkpoint attempts to configure.
+        */
+       private void testMaxConcurrentAttemps(int maxConcurrentAttempts) {
+               CheckpointCoordinator coord = null;
+               try {
+                       final JobID jid = new JobID();
+
+                       // create some mock execution vertices and trigger some checkpoint
+                       final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
+                       final ExecutionAttemptID ackAttemptID = new ExecutionAttemptID();
+                       final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
+
+                       ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptID);
+                       ExecutionVertex ackVertex = mockExecutionVertex(ackAttemptID);
+                       ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
+
+                       final AtomicInteger numCalls = new AtomicInteger();
+
+                       doAnswer(new Answer<Void>() {
+                               @Override
+                               public Void answer(InvocationOnMock invocation) throws Throwable {
+                                       numCalls.incrementAndGet();
+                                       return null;
+                               }
+                       }).when(triggerVertex).sendMessageToCurrentExecution(any(Serializable.class), any(ExecutionAttemptID.class));
+
+                       coord = new CheckpointCoordinator(
+                                       jid,
+                                       10,             // periodic interval is 10 ms
+                                       200000, // timeout is very long (200 s)
+                                       0L,             // no extra delay
+                                       maxConcurrentAttempts,
+                                       new ExecutionVertex[] { triggerVertex },
+                                       new ExecutionVertex[] { ackVertex },
+                                       new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter(),
+                                       new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE);
+
+                       coord.startCheckpointScheduler();
+
+                       // after a while, there should be exactly as many checkpoints
+                       // as concurrently permitted
+                       long now = System.currentTimeMillis();
+                       long timeout = now + 60000;
+                       long minDuration = now + 100;
+                       do {
+                               Thread.sleep(20);
+                       }
+                       while ((now = System.currentTimeMillis()) < minDuration ||
+                                       (numCalls.get() < maxConcurrentAttempts && now < timeout));
+
+                       assertEquals(maxConcurrentAttempts, numCalls.get());
+
+                       verify(triggerVertex, times(maxConcurrentAttempts))
+                                       .sendMessageToCurrentExecution(any(TriggerCheckpoint.class), eq(triggerAttemptID));
+
+                       // now, once we acknowledge one checkpoint, it should trigger the next one
+                       coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 1L));
+
+                       // this should have immediately triggered a new checkpoint.
+                       // The clock must be re-read on every iteration, otherwise the timeout can never
+                       // expire and the test would hang if the new checkpoint is never triggered.
+                       timeout = System.currentTimeMillis() + 60000;
+                       do {
+                               Thread.sleep(20);
+                       }
+                       while (numCalls.get() < maxConcurrentAttempts + 1 && System.currentTimeMillis() < timeout);
+
+                       assertEquals(maxConcurrentAttempts + 1, numCalls.get());
+
+                       // no further checkpoints should happen
+                       Thread.sleep(200);
+                       assertEquals(maxConcurrentAttempts + 1, numCalls.get());
+
+                       coord.shutdown();
+                       coord = null; // shut down cleanly, nothing left for the finally block to do
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       // if an assertion or exception aborted the test, still stop the coordinator so
+                       // that its periodic triggering does not outlive this test
+                       if (coord != null) {
+                               try {
+                                       coord.shutdown();
+                               }
+                               catch (Exception ignored) {
+                                       // best effort: the original failure is the one that gets reported
+                               }
+                       }
+               }
+       }
+
+       @Test
+       public void testMaxConcurrentAttempsWithSubsumption() {
+               CheckpointCoordinator coord = null;
+               try {
+                       final int maxConcurrentAttempts = 2;
+                       final JobID jid = new JobID();
+
+                       // create some mock execution vertices and trigger some checkpoint
+                       final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
+                       final ExecutionAttemptID ackAttemptID = new ExecutionAttemptID();
+                       final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
+
+                       ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptID);
+                       ExecutionVertex ackVertex = mockExecutionVertex(ackAttemptID);
+                       ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
+
+                       coord = new CheckpointCoordinator(
+                                       jid,
+                                       10,             // periodic interval is 10 ms
+                                       200000, // timeout is very long (200 s)
+                                       0L,             // no extra delay
+                                       maxConcurrentAttempts, // max two concurrent checkpoints
+                                       new ExecutionVertex[] { triggerVertex },
+                                       new ExecutionVertex[] { ackVertex },
+                                       new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter(),
+                                       new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE);
+
+                       coord.startCheckpointScheduler();
+
+                       // after a while, there should be exactly as many checkpoints
+                       // as concurrently permitted
+                       long now = System.currentTimeMillis();
+                       long timeout = now + 60000;
+                       long minDuration = now + 100;
+                       do {
+                               Thread.sleep(20);
+                       }
+                       while ((now = System.currentTimeMillis()) < minDuration ||
+                                       (coord.getNumberOfPendingCheckpoints() < maxConcurrentAttempts && now < timeout));
+
+                       // validate that the pending checkpoints are there
+                       assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints());
+                       assertNotNull(coord.getPendingCheckpoints().get(1L));
+                       assertNotNull(coord.getPendingCheckpoints().get(2L));
+
+                       // now we acknowledge the second checkpoint, which should subsume the first checkpoint
+                       // and allow two more checkpoints to be triggered
+                       coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 2L));
+
+                       // after a while, there should be the new checkpoints
+                       final long newTimeout = System.currentTimeMillis() + 60000;
+                       do {
+                               Thread.sleep(20);
+                       }
+                       while (coord.getPendingCheckpoints().get(4L) == null &&
+                                       System.currentTimeMillis() < newTimeout);
+
+                       // do the final check
+                       assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints());
+                       assertNotNull(coord.getPendingCheckpoints().get(3L));
+                       assertNotNull(coord.getPendingCheckpoints().get(4L));
+
+                       coord.shutdown();
+                       coord = null; // shut down cleanly, nothing left for the finally block to do
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       // if an assertion or exception aborted the test, still stop the coordinator so
+                       // that its periodic triggering does not outlive this test
+                       if (coord != null) {
+                               try {
+                                       coord.shutdown();
+                               }
+                               catch (Exception ignored) {
+                                       // best effort: the original failure is the one that gets reported
+                               }
+                       }
+               }
+       }
+       
+       @Test
+       public void testPeriodicSchedulingWithInactiveTasks() {
+               CheckpointCoordinator coord = null;
+               try {
+                       final JobID jid = new JobID();
+
+                       // create some mock execution vertices and trigger some checkpoint
+                       final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
+                       final ExecutionAttemptID ackAttemptID = new ExecutionAttemptID();
+                       final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
+
+                       ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptID);
+                       ExecutionVertex ackVertex = mockExecutionVertex(ackAttemptID);
+                       ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
+
+                       // the trigger task reports whatever state is stored here (initially CREATED)
+                       final AtomicReference<ExecutionState> currentState = new AtomicReference<>(ExecutionState.CREATED);
+                       when(triggerVertex.getCurrentExecutionAttempt().getState()).thenAnswer(
+                                       new Answer<ExecutionState>() {
+                                               @Override
+                                               public ExecutionState answer(InvocationOnMock invocation){
+                                                       return currentState.get();
+                                               }
+                                       });
+
+                       coord = new CheckpointCoordinator(
+                                       jid,
+                                       10,             // periodic interval is 10 ms
+                                       200000, // timeout is very long (200 s)
+                                       0L,             // no extra delay
+                                       2, // max two concurrent checkpoints
+                                       new ExecutionVertex[] { triggerVertex },
+                                       new ExecutionVertex[] { ackVertex },
+                                       new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter(),
+                                       new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE);
+
+                       coord.startCheckpointScheduler();
+
+                       // no checkpoint should have started so far
+                       Thread.sleep(200);
+                       assertEquals(0, coord.getNumberOfPendingCheckpoints());
+
+                       // now move the state to RUNNING
+                       currentState.set(ExecutionState.RUNNING);
+
+                       // the coordinator should start checkpointing now
+                       final long timeout = System.currentTimeMillis() + 10000;
+                       do {
+                               Thread.sleep(20);
+                       }
+                       while (System.currentTimeMillis() < timeout &&
+                                       coord.getNumberOfPendingCheckpoints() == 0);
+
+                       assertTrue(coord.getNumberOfPendingCheckpoints() > 0);
+
+                       // the scheduler was started above and must be stopped again
+                       coord.shutdown();
+                       coord = null; // shut down cleanly, nothing left for the finally block to do
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       // if an assertion or exception aborted the test, still stop the coordinator so
+                       // that its periodic triggering does not outlive this test
+                       if (coord != null) {
+                               try {
+                                       coord.shutdown();
+                               }
+                               catch (Exception ignored) {
+                                       // best effort: the original failure is the one that gets reported
+                               }
+                       }
+               }
+       }
+       
+       // ------------------------------------------------------------------------
+       //  Utilities
+       // ------------------------------------------------------------------------
+
        /**
         * Creates a mock execution vertex for the given attempt ID by delegating to the overload
         * that also takes the execution state, passing {@link ExecutionState#RUNNING}.
         */
        private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) {
                final ExecutionState state = ExecutionState.RUNNING;
                return mockExecutionVertex(attemptID, state);
        }
 
-       private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID 
attemptID, ExecutionState state) {
+       private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID 
attemptID, 
+                                                                               
                                ExecutionState state, ExecutionState ... 
successiveStates) {
                final Execution exec = mock(Execution.class);
                when(exec.getAttemptId()).thenReturn(attemptID);
-               when(exec.getState()).thenReturn(state);
+               when(exec.getState()).thenReturn(state, successiveStates);
 
                ExecutionVertex vertex = mock(ExecutionVertex.class);
                when(vertex.getJobvertexId()).thenReturn(new JobVertexID());

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 7b2c2d4..bec04bd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -80,7 +80,7 @@ public class CheckpointStateRestoreTest {
                        map.put(statelessId, stateless);
 
 
-                       CheckpointCoordinator coord = new 
CheckpointCoordinator(jid, 200000L,
+                       CheckpointCoordinator coord = new 
CheckpointCoordinator(jid, 200000L, 200000L,
                                        new ExecutionVertex[] { stateful1, 
stateful2, stateful3, stateless1, stateless2 },
                                        new ExecutionVertex[] { stateful1, 
stateful2, stateful3, stateless1, stateless2 },
                                        new ExecutionVertex[0], cl,
@@ -151,7 +151,7 @@ public class CheckpointStateRestoreTest {
                        map.put(statelessId, stateless);
 
 
-                       CheckpointCoordinator coord = new 
CheckpointCoordinator(jid, 200000L,
+                       CheckpointCoordinator coord = new 
CheckpointCoordinator(jid, 200000L, 200000L,
                                        new ExecutionVertex[] { stateful1, 
stateful2, stateful3, stateless1, stateless2 },
                                        new ExecutionVertex[] { stateful1, 
stateful2, stateful3, stateless1, stateless2 },
                                        new ExecutionVertex[0], cl,
@@ -193,7 +193,7 @@ public class CheckpointStateRestoreTest {
        @Test
        public void testNoCheckpointAvailable() {
                try {
-                       CheckpointCoordinator coord = new 
CheckpointCoordinator(new JobID(), 200000L,
+                       CheckpointCoordinator coord = new 
CheckpointCoordinator(new JobID(), 200000L, 200000L,
                                        new ExecutionVertex[] { 
mock(ExecutionVertex.class) },
                                        new ExecutionVertex[] { 
mock(ExecutionVertex.class) },
                                        new ExecutionVertex[0], cl,

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index f6e4ab8..1c666e5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -61,7 +61,8 @@ public class CoordinatorShutdownTest {
                        List<JobVertexID> vertexIdList = 
Collections.singletonList(vertex.getID());
                        
                        JobGraph testGraph = new JobGraph("test job", vertex);
-                       testGraph.setSnapshotSettings(new 
JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000));
+                       testGraph.setSnapshotSettings(new 
JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 
+                                       5000, 60000, 0L, Integer.MAX_VALUE));
                        
                        ActorGateway jmGateway = 
cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 
@@ -112,7 +113,8 @@ public class CoordinatorShutdownTest {
                        List<JobVertexID> vertexIdList = 
Collections.singletonList(vertex.getID());
 
                        JobGraph testGraph = new JobGraph("test job", vertex);
-                       testGraph.setSnapshotSettings(new 
JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000));
+                       testGraph.setSnapshotSettings(new 
JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList,
+                                       5000, 60000, 0L, Integer.MAX_VALUE));
                        
                        ActorGateway jmGateway = 
cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
new file mode 100644
index 0000000..0320d6b
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import org.apache.flink.streaming.api.CheckpointingMode;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Configuration that captures all checkpointing related settings.
+ */
+public class CheckpointConfig implements java.io.Serializable {
+
+       private static final long serialVersionUID = -750378776078908147L;
+
+       /** The default checkpoint mode: exactly once */
+       public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE;
+
+       /** The default timeout of a checkpoint attempt: 10 minutes */
+       public static final long DEFAULT_TIMEOUT = 10 * 60 * 1000;
+
+       /** The default minimum pause to be made between checkpoints: none */
+       public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS = 0;
+
+       /** The default limit of concurrently happening checkpoints: one */
+       public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1;
+
+       // ------------------------------------------------------------------------
+
+       /** Checkpointing mode (exactly-once vs. at-least-once). */
+       private CheckpointingMode checkpointingMode = DEFAULT_MODE;
+
+       /** Periodic checkpoint triggering interval, in milliseconds; values <= 0 mean checkpointing is disabled */
+       private long checkpointInterval = -1; // disabled
+
+       /** Maximum time, in milliseconds, a checkpoint may take before being discarded */
+       private long checkpointTimeout = DEFAULT_TIMEOUT;
+
+       /** Minimal pause between checkpointing attempts */
+       private long minPauseBetweenCheckpoints = DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS;
+
+       /** Maximum number of checkpoint attempts in progress at the same time */
+       private int maxConcurrentCheckpoints = DEFAULT_MAX_CONCURRENT_CHECKPOINTS;
+
+       /** Flag to force checkpointing in iterative jobs */
+       private boolean forceCheckpointing;
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Checks whether checkpointing is enabled.
+        *
+        * @return True if checkpointing is enabled (the checkpoint interval is larger than zero), false otherwise.
+        */
+       public boolean isCheckpointingEnabled() {
+               return checkpointInterval > 0;
+       }
+
+       /**
+        * Gets the checkpointing mode (exactly-once vs. at-least-once).
+        *
+        * @return The checkpointing mode.
+        */
+       public CheckpointingMode getCheckpointingMode() {
+               return checkpointingMode;
+       }
+
+       /**
+        * Sets the checkpointing mode (exactly-once vs. at-least-once).
+        *
+        * @param checkpointingMode The checkpointing mode.
+        * @throws NullPointerException If the given mode is null.
+        */
+       public void setCheckpointingMode(CheckpointingMode checkpointingMode) {
+               this.checkpointingMode = requireNonNull(checkpointingMode);
+       }
+
+       /**
+        * Gets the interval in which checkpoints are periodically scheduled.
+        *
+        * <p>This setting defines the base interval. Checkpoint triggering may be delayed by the settings
+        * {@link #getMaxConcurrentCheckpoints()} and {@link #getMinPauseBetweenCheckpoints()}.
+        *
+        * @return The checkpoint interval, in milliseconds.
+        */
+       public long getCheckpointInterval() {
+               return checkpointInterval;
+       }
+
+       /**
+        * Sets the interval in which checkpoints are periodically scheduled.
+        *
+        * <p>This setting defines the base interval. Checkpoint triggering may be delayed by the settings
+        * {@link #setMaxConcurrentCheckpoints(int)} and the minimal pause between checkpoints
+        * (see {@link #getMinPauseBetweenCheckpoints()}).
+        *
+        * @param checkpointInterval The checkpoint interval, in milliseconds.
+        * @throws IllegalArgumentException If the interval is not larger than zero.
+        */
+       public void setCheckpointInterval(long checkpointInterval) {
+               if (checkpointInterval <= 0) {
+                       throw new IllegalArgumentException("Checkpoint interval must be larger than zero");
+               }
+               this.checkpointInterval = checkpointInterval;
+       }
+
+       /**
+        * Gets the maximum time that a checkpoint may take before being discarded.
+        *
+        * @return The checkpoint timeout, in milliseconds.
+        */
+       public long getCheckpointTimeout() {
+               return checkpointTimeout;
+       }
+
+       /**
+        * Sets the maximum time that a checkpoint may take before being discarded.
+        *
+        * @param checkpointTimeout The checkpoint timeout, in milliseconds.
+        * @throws IllegalArgumentException If the timeout is not larger than zero.
+        */
+       public void setCheckpointTimeout(long checkpointTimeout) {
+               // validate the argument itself, not the (unrelated) checkpoint interval
+               if (checkpointTimeout <= 0) {
+                       throw new IllegalArgumentException("Checkpoint timeout must be larger than zero");
+               }
+               this.checkpointTimeout = checkpointTimeout;
+       }
+
+       /**
+        * Gets the minimal pause between checkpointing attempts. This setting defines how soon the
+        * checkpoint coordinator may trigger another checkpoint after it becomes possible to trigger
+        * another checkpoint with respect to the maximum number of concurrent checkpoints
+        * (see {@link #getMaxConcurrentCheckpoints()}).
+        *
+        * @return The minimal pause before the next checkpoint is triggered.
+        */
+       public long getMinPauseBetweenCheckpoints() {
+               return minPauseBetweenCheckpoints;
+       }
+
+       // NOTE(review): the setter below is commented out. Nothing in this class assigns
+       // minPauseBetweenCheckpoints, so getMinPauseBetweenCheckpoints() always returns
+       // DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS. Re-enable it once the pause is meant to be configurable.
+//     /**
+//      * Sets the minimal pause between checkpointing attempts. This setting defines how soon the
+//      * checkpoint coordinator may trigger another checkpoint after it becomes possible to trigger
+//      * another checkpoint with respect to the maximum number of concurrent checkpoints
+//      * (see {@link #setMaxConcurrentCheckpoints(int)}).
+//      * 
+//      * <p>If the maximum number of concurrent checkpoints is set to one, this setting makes effectively sure
+//      * that a minimum amount of time passes where no checkpoint is in progress at all.
+//      * 
+//      * @param minPauseBetweenCheckpoints The minimal pause before the next checkpoint is triggered.
+//      */
+//     public void setMinPauseBetweenCheckpoints(long minPauseBetweenCheckpoints) {
+//             if (minPauseBetweenCheckpoints < 0) {
+//                     throw new IllegalArgumentException("Pause value must be zero or positive");
+//             }
+//             this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+//     }
+
+       /**
+        * Gets the maximum number of checkpoint attempts that may be in progress at the same time. If this
+        * value is <i>n</i>, then no checkpoints will be triggered while <i>n</i> checkpoint attempts are
+        * currently in flight. For the next checkpoint to be triggered, one checkpoint attempt would need
+        * to finish or expire.
+        *
+        * @return The maximum number of concurrent checkpoint attempts.
+        */
+       public int getMaxConcurrentCheckpoints() {
+               return maxConcurrentCheckpoints;
+       }
+
+       /**
+        * Sets the maximum number of checkpoint attempts that may be in progress at the same time. If this
+        * value is <i>n</i>, then no checkpoints will be triggered while <i>n</i> checkpoint attempts are
+        * currently in flight. For the next checkpoint to be triggered, one checkpoint attempt would need
+        * to finish or expire.
+        *
+        * @param maxConcurrentCheckpoints The maximum number of concurrent checkpoint attempts.
+        * @throws IllegalArgumentException If the value is smaller than one.
+        */
+       public void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints) {
+               if (maxConcurrentCheckpoints < 1) {
+                       throw new IllegalArgumentException("The maximum number of concurrent attempts must be at least one.");
+               }
+               this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
+       }
+
+       /**
+        * Checks whether checkpointing is forced, despite currently non-checkpointable iteration feedback.
+        *
+        * @return True, if checkpointing is forced, false otherwise.
+        *
+        * @deprecated This will be removed once iterations properly participate in checkpointing.
+        */
+       @Deprecated
+       public boolean isForceCheckpointing() {
+               return forceCheckpointing;
+       }
+
+       /**
+        * Sets whether checkpointing is forced, despite currently non-checkpointable iteration feedback.
+        *
+        * @param forceCheckpointing The flag to force checkpointing.
+        *
+        * @deprecated This will be removed once iterations properly participate in checkpointing.
+        */
+       @Deprecated
+       public void setForceCheckpointing(boolean forceCheckpointing) {
+               this.forceCheckpointing = forceCheckpointing;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 72722bf..cb5fce5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -114,17 +114,14 @@ public abstract class StreamExecutionEnvironment {
        /** The execution configuration for this environment */
        private final ExecutionConfig config = new ExecutionConfig();
        
+       /** Settings that control the checkpointing behavior */ 
+       private final CheckpointConfig checkpointCfg = new CheckpointConfig();
+       
        protected final List<StreamTransformation<?>> transformations = new 
ArrayList<>();
        
        private long bufferTimeout = DEFAULT_NETWORK_BUFFER_TIMEOUT;
        
        protected boolean isChainingEnabled = true;
-
-       protected long checkpointInterval = -1; // disabled
-
-       protected CheckpointingMode checkpointingMode;
-
-       protected boolean forceCheckpointing = false;
        
        /** The state backend used for storing k/v state and state snapshots */
        private StateBackend<?> defaultStateBackend;
@@ -239,7 +236,17 @@ public abstract class StreamExecutionEnvironment {
        // 
------------------------------------------------------------------------
        //  Checkpointing Settings
        // 
------------------------------------------------------------------------
-       
+
+       /**
+        * Gets the checkpoint config, which defines values like checkpoint 
interval, delay between
+        * checkpoints, etc.
+        * 
+        * @return The checkpoint config.
+        */
+       public CheckpointConfig getCheckpointConfig() {
+               return checkpointCfg;
+       }
+
        /**
         * Enables checkpointing for the streaming job. The distributed state 
of the streaming
         * dataflow will be periodically snapshotted. In case of a failure, the 
streaming
@@ -257,7 +264,8 @@ public abstract class StreamExecutionEnvironment {
         * @param interval Time interval between state checkpoints in 
milliseconds.
         */
        public StreamExecutionEnvironment enableCheckpointing(long interval) {
-               return enableCheckpointing(interval, 
CheckpointingMode.EXACTLY_ONCE);
+               checkpointCfg.setCheckpointInterval(interval);
+               return this;
        }
 
        /**
@@ -280,15 +288,8 @@ public abstract class StreamExecutionEnvironment {
         *             The checkpointing mode, selecting between "exactly once" 
and "at least once" guaranteed.
         */
        public StreamExecutionEnvironment enableCheckpointing(long interval, 
CheckpointingMode mode) {
-               if (mode == null) {
-                       throw new NullPointerException("checkpoint mode must 
not be null");
-               }
-               if (interval <= 0) {
-                       throw new IllegalArgumentException("the checkpoint 
interval must be positive");
-               }
-
-               this.checkpointInterval = interval;
-               this.checkpointingMode = mode;
+               checkpointCfg.setCheckpointingMode(mode);
+               checkpointCfg.setCheckpointInterval(interval);
                return this;
        }
        
@@ -312,10 +313,11 @@ public abstract class StreamExecutionEnvironment {
         *            If true checkpointing will be enabled for iterative jobs 
as well.
         */
        @Deprecated
+       @SuppressWarnings("deprecation")
        public StreamExecutionEnvironment enableCheckpointing(long interval, 
CheckpointingMode mode, boolean force) {
-               this.enableCheckpointing(interval, mode);
-
-               this.forceCheckpointing = force;
+               checkpointCfg.setCheckpointingMode(mode);
+               checkpointCfg.setCheckpointInterval(interval);
+               checkpointCfg.setForceCheckpointing(force);
                return this;
        }
 
@@ -337,32 +339,39 @@ public abstract class StreamExecutionEnvironment {
         */
        @Deprecated
        public StreamExecutionEnvironment enableCheckpointing() {
-               enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
+               checkpointCfg.setCheckpointInterval(500);
                return this;
        }
 
        /**
         * Returns the checkpointing interval or -1 if checkpointing is 
disabled.
+        * 
+        * <p>Shorthand for {@code 
getCheckpointConfig().getCheckpointInterval()}.
         *
         * @return The checkpointing interval or -1
         */
        public long getCheckpointInterval() {
-               return checkpointInterval;
+               return checkpointCfg.getCheckpointInterval();
        }
 
-
        /**
         * Returns whether checkpointing is force-enabled.
         */
+       @Deprecated
+       @SuppressWarnings("deprecation")
        public boolean isForceCheckpointing() {
-               return forceCheckpointing;
+               return checkpointCfg.isForceCheckpointing();
        }
 
        /**
-        * Returns the {@link CheckpointingMode}.
+        * Returns the checkpointing mode (exactly-once vs. at-least-once).
+        * 
+        * <p>Shorthand for {@code 
getCheckpointConfig().getCheckpointingMode()}.
+        * 
+        * @return The checkpoin
         */
        public CheckpointingMode getCheckpointingMode() {
-               return checkpointingMode;
+               return checkpointCfg.getCheckpointingMode();
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 7d8f9f9..bc3acb7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -23,15 +23,12 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -43,8 +40,8 @@ import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -59,7 +56,6 @@ import 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
 import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
-import org.apache.sling.commons.json.JSONException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,9 +66,6 @@ import org.slf4j.LoggerFactory;
  * 
  */
 public class StreamGraph extends StreamingPlan {
-
-       /** The default interval for checkpoints, in milliseconds */
-       public static final int DEFAULT_CHECKPOINTING_INTERVAL_MS = 5000;
        
        private static final Logger LOG = 
LoggerFactory.getLogger(StreamGraph.class);
 
@@ -80,11 +73,9 @@ public class StreamGraph extends StreamingPlan {
 
        private final StreamExecutionEnvironment environemnt;
        private final ExecutionConfig executionConfig;
-
-       private CheckpointingMode checkpointingMode;
-       private boolean checkpointingEnabled = false;
-       private long checkpointingInterval = DEFAULT_CHECKPOINTING_INTERVAL_MS;
-       private boolean chaining = true;
+       private final CheckpointConfig checkpointConfig;
+       
+       private boolean chaining;
 
        private Map<Integer, StreamNode> streamNodes;
        private Set<Integer> sources;
@@ -97,12 +88,11 @@ public class StreamGraph extends StreamingPlan {
        private StateBackend<?> stateBackend;
        private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
 
-       private boolean forceCheckpoint = false;
 
        public StreamGraph(StreamExecutionEnvironment environment) {
-
                this.environemnt = environment;
-               executionConfig = environment.getConfig();
+               this.executionConfig = environment.getConfig();
+               this.checkpointConfig = environment.getCheckpointConfig();
 
                // create an empty new stream graph.
                clear();
@@ -112,19 +102,23 @@ public class StreamGraph extends StreamingPlan {
         * Remove all registered nodes etc.
         */
        public void clear() {
-               streamNodes = Maps.newHashMap();
-               virtualSelectNodes = Maps.newHashMap();
-               virtuaPartitionNodes = Maps.newHashMap();
-               vertexIDtoBrokerID = Maps.newHashMap();
-               vertexIDtoLoopTimeout = Maps.newHashMap();
-               iterationSourceSinkPairs = Sets.newHashSet();
-               sources = Sets.newHashSet();
-               sinks = Sets.newHashSet();
+               streamNodes = new HashMap<>();
+               virtualSelectNodes = new HashMap<>();
+               virtuaPartitionNodes = new HashMap<>();
+               vertexIDtoBrokerID = new HashMap<>();
+               vertexIDtoLoopTimeout  = new HashMap<>();
+               iterationSourceSinkPairs = new HashSet<>();
+               sources = new HashSet<>();
+               sinks = new HashSet<>();
        }
 
-       protected ExecutionConfig getExecutionConfig() {
+       public ExecutionConfig getExecutionConfig() {
                return executionConfig;
        }
+       
+       public CheckpointConfig getCheckpointConfig() {
+               return checkpointConfig;
+       }
 
        public String getJobName() {
                return jobName;
@@ -138,18 +132,6 @@ public class StreamGraph extends StreamingPlan {
                this.chaining = chaining;
        }
 
-       public void setCheckpointingEnabled(boolean checkpointingEnabled) {
-               this.checkpointingEnabled = checkpointingEnabled;
-       }
-
-       public void setCheckpointingInterval(long checkpointingInterval) {
-               this.checkpointingInterval = checkpointingInterval;
-       }
-
-       public void forceCheckpoint() {
-               this.forceCheckpoint = true;
-       }
-
        public void setStateBackend(StateBackend<?> backend) {
                this.stateBackend = backend;
        }
@@ -158,27 +140,11 @@ public class StreamGraph extends StreamingPlan {
                return this.stateBackend;
        }
 
-       public long getCheckpointingInterval() {
-               return checkpointingInterval;
-       }
-
        // Checkpointing
        
        public boolean isChainingEnabled() {
                return chaining;
        }
-
-       public boolean isCheckpointingEnabled() {
-               return checkpointingEnabled;
-       }
-
-       public CheckpointingMode getCheckpointingMode() {
-               return checkpointingMode;
-       }
-
-       public void setCheckpointingMode(CheckpointingMode checkpointingMode) {
-               this.checkpointingMode = checkpointingMode;
-       }
        
 
        public boolean isIterative() {
@@ -322,7 +288,7 @@ public class StreamGraph extends StreamingPlan {
                                downStreamVertexID,
                                typeNumber,
                                null,
-                               Lists.<String>newArrayList());
+                               new ArrayList<String>());
 
        }
 
@@ -463,10 +429,7 @@ public class StreamGraph extends StreamingPlan {
        }
 
        public StreamEdge getStreamEdge(int sourceId, int targetId) {
-               Iterator<StreamEdge> outIterator = 
getStreamNode(sourceId).getOutEdges().iterator();
-               while (outIterator.hasNext()) {
-                       StreamEdge edge = outIterator.next();
-
+               for (StreamEdge edge : getStreamNode(sourceId).getOutEdges()) {
                        if (edge.getTargetId() == targetId) {
                                return edge;
                        }
@@ -505,8 +468,7 @@ public class StreamGraph extends StreamingPlan {
                return vertexIDtoLoopTimeout.get(vertexID);
        }
 
-       public  Tuple2<StreamNode, StreamNode> createIterationSourceAndSink(int 
loopId, int sourceId, int sinkId, long timeout, int parallelism) {
-
+       public Tuple2<StreamNode, StreamNode> createIterationSourceAndSink(int 
loopId, int sourceId, int sinkId, long timeout, int parallelism) {
                StreamNode source = this.addNode(sourceId,
                                StreamIterationHead.class,
                                null,
@@ -537,15 +499,12 @@ public class StreamGraph extends StreamingPlan {
                return iterationSourceSinkPairs;
        }
 
-       protected void removeEdge(StreamEdge edge) {
-
+       private void removeEdge(StreamEdge edge) {
                edge.getSourceVertex().getOutEdges().remove(edge);
                edge.getTargetVertex().getInEdges().remove(edge);
-
        }
 
-       protected void removeVertex(StreamNode toRemove) {
-
+       private void removeVertex(StreamNode toRemove) {
                Set<StreamEdge> edgesToRemove = new HashSet<StreamEdge>();
 
                edgesToRemove.addAll(toRemove.getInEdges());
@@ -560,9 +519,10 @@ public class StreamGraph extends StreamingPlan {
        /**
         * Gets the assembled {@link JobGraph}.
         */
+       @SuppressWarnings("deprecation")
        public JobGraph getJobGraph() {
                // temporarily forbid checkpointing for iterative jobs
-               if (isIterative() && isCheckpointingEnabled() && 
!forceCheckpoint) {
+               if (isIterative() && checkpointConfig.isCheckpointingEnabled() 
&& !checkpointConfig.isForceCheckpointing()) {
                        throw new UnsupportedOperationException(
                                        "Checkpointing is currently not 
supported by default for iterative jobs, as we cannot guarantee exactly once 
semantics. "
                                                        + "State checkpoints 
happen normally, but records in-transit during the snapshot will be lost upon 
failure. "
@@ -576,16 +536,12 @@ public class StreamGraph extends StreamingPlan {
 
        @Override
        public String getStreamingPlanAsJSON() {
-
                try {
                        return new JSONGenerator(this).getJSON();
-               } catch (JSONException e) {
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("JSON plan creation failed: {}", e);
-                       }
-                       return "";
                }
-
+               catch (Exception e) {
+                       throw new RuntimeException("JSON plan creation failed", 
e);
+               }
        }
 
        @Override
@@ -606,5 +562,4 @@ public class StreamGraph extends StreamingPlan {
        public static enum ResourceStrategy {
                DEFAULT, ISOLATE, NEWGROUP
        }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 8bd0e48..4bd7a73 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -98,17 +98,6 @@ public class StreamGraphGenerator {
        private StreamGraphGenerator(StreamExecutionEnvironment env) {
                this.streamGraph = new StreamGraph(env);
                this.streamGraph.setChaining(env.isChainingEnabled());
-               
-               if (env.getCheckpointInterval() > 0) {
-                       this.streamGraph.setCheckpointingEnabled(true);
-                       
this.streamGraph.setCheckpointingInterval(env.getCheckpointInterval());
-                       
this.streamGraph.setCheckpointingMode(env.getCheckpointingMode());
-               }
-               this.streamGraph.setStateBackend(env.getStateBackend());
-               if (env.isForceCheckpointing()) {
-                       this.streamGraph.forceCheckpoint();
-               }
-               
                this.env = env;
                this.alreadyTransformed = new HashMap<>();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 613d381..515e362 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.graph;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -27,10 +26,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
+
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -45,6 +44,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
@@ -52,6 +52,7 @@ import 
org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
 import org.apache.flink.util.InstantiationUtil;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -274,18 +275,20 @@ public class StreamingJobGraphGenerator {
                config.setNonChainedOutputs(nonChainableOutputs);
                config.setChainedOutputs(chainableOutputs);
 
-               
config.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled());
-               if (streamGraph.isCheckpointingEnabled()) {
-                       
config.setCheckpointMode(streamGraph.getCheckpointingMode());
+               final CheckpointConfig ceckpointCfg = 
streamGraph.getCheckpointConfig();
+               
+               
config.setCheckpointingEnabled(ceckpointCfg.isCheckpointingEnabled());
+               if (ceckpointCfg.isCheckpointingEnabled()) {
+                       
config.setCheckpointMode(ceckpointCfg.getCheckpointingMode());
                        config.setStateBackend(streamGraph.getStateBackend());
-               } else {
-                       // the at least once input handler is slightly cheaper 
(in the absence of checkpoints),
+               }
+               else {
+                       // the "at-least-once" input handler is slightly 
cheaper (in the absence of checkpoints),
                        // so we use that one if checkpointing is not enabled
                        
config.setCheckpointMode(CheckpointingMode.AT_LEAST_ONCE);
                }
-               config.setStatePartitioner((KeySelector<?, Serializable>) 
vertex.getStatePartitioner());
+               config.setStatePartitioner(vertex.getStatePartitioner());
                config.setStateKeySerializer(vertex.getStateKeySerializer());
-
                
                Class<? extends AbstractInvokable> vertexClass = 
vertex.getJobVertexClass();
 
@@ -385,8 +388,10 @@ public class StreamingJobGraphGenerator {
        }
        
        private void configureCheckpointing() {
-               if (streamGraph.isCheckpointingEnabled()) {
-                       long interval = streamGraph.getCheckpointingInterval();
+               CheckpointConfig cfg = streamGraph.getCheckpointConfig();
+               
+               if (cfg.isCheckpointingEnabled()) {
+                       long interval = cfg.getCheckpointInterval();
                        if (interval < 1) {
                                throw new IllegalArgumentException("The 
checkpoint interval must be positive");
                        }
@@ -400,10 +405,9 @@ public class StreamingJobGraphGenerator {
                        List<JobVertexID> ackVertices = new 
ArrayList<JobVertexID>(jobVertices.size());
 
                        // collect the vertices that receive "commit 
checkpoint" messages
-                       // currently, these are all certices
+                       // currently, these are all vertices
                        List<JobVertexID> commitVertices = new 
ArrayList<JobVertexID>();
                        
-                       
                        for (JobVertex vertex : jobVertices.values()) {
                                if (vertex.isInputVertex()) {
                                        triggerVertices.add(vertex.getID());
@@ -414,7 +418,9 @@ public class StreamingJobGraphGenerator {
                        }
 
                        JobSnapshottingSettings settings = new 
JobSnapshottingSettings(
-                                       triggerVertices, ackVertices, 
commitVertices, interval);
+                                       triggerVertices, ackVertices, 
commitVertices, interval,
+                                       cfg.getCheckpointTimeout(), 
cfg.getMinPauseBetweenCheckpoints(),
+                                       cfg.getMaxConcurrentCheckpoints());
                        jobGraph.setSnapshotSettings(settings);
 
                        // if the user enabled checkpointing, the default 
number of exec retries is infinitive.

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 14f23e1..69147f6 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -96,6 +96,13 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   // ------------------------------------------------------------------------
   //  Checkpointing Settings
   // ------------------------------------------------------------------------
+  
+  /**
+   * Gets the checkpoint config, which defines values like checkpoint 
interval, delay between
+   * checkpoints, etc.
+   */
+  def getCheckpointConfig = javaEnv.getCheckpointConfig()
+  
   /**
    * Enables checkpointing for the streaming job. The distributed state of the 
streaming
    * dataflow will be periodically snapshotted. In case of a failure, the 
streaming

Reply via email to