Hi Nico,

> On Feb 26, 2018, at 9:41 AM, Nico Kruber <n...@data-artisans.com> wrote:
>
> Hi Ken,
> LocalFlinkMiniCluster should run checkpoints just fine. It looks like it
> was attempting to even create one but could not finish. Maybe your
> program was not fully running yet?

In the logs I see:

18/02/23 12:40:50 INFO taskmanager.Task:957 - Source: Seed urls source (1/2) (56fdede2f4783455b4ab8f290e700baa) switched from DEPLOYING to RUNNING.
18/02/23 12:40:50 DEBUG tasks.StreamTask:214 - Initializing Source: Seed urls source (1/2).
18/02/23 12:40:50 INFO checkpoint.CheckpointCoordinator:525 - Checkpoint triggering task Source: Seed urls source (1/2) is not being executed at the moment. Aborting checkpoint.

Maybe the checkpoint here is happening too soon after the “Initializing Source” message.

After that the source is done (it only triggers the iteration with a single starting tuple), so I wouldn’t expect checkpointing to actually do anything. I was just using these messages as indications that I had configured my workflow properly to actually do checkpointing.

> Can you tell us a little bit more about your set up and how you
> configured the LocalFlinkMiniCluster?

Potential issue #1 - I’ve got a workflow with multiple iterations.

For that reason I had to force checkpointing via:

    env.setStateBackend(new MemoryStateBackend());
    env.enableCheckpointing(100L, CheckpointingMode.AT_LEAST_ONCE, true);

Potential issue #2 - because of the fun with tracking iteration progress, I subclass LocalStreamEnvironment to add this async execution method:

    public JobSubmissionResult executeAsync(String jobName) throws Exception {
        // transform the streaming program into a JobGraph
        StreamGraph streamGraph = getStreamGraph();
        streamGraph.setJobName(jobName);

        JobGraph jobGraph = streamGraph.getJobGraph();

        Configuration configuration = new Configuration();
        configuration.addAll(jobGraph.getJobConfiguration());

        configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
        configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

        // add (and override) the settings with what the user defined
        configuration.addAll(_conf);

        _exec = new LocalFlinkMiniCluster(configuration, true);
        _exec.start(true);

        // The above code is all basically the same as Flink's LocalStreamEnvironment.
        // The change is that here we call submitJobDetached vs. submitJobAndWait.
        // We assume that eventually someone calls stop(job id), which then terminates
        // the LocalFlinkMiniCluster.
        return _exec.submitJobDetached(jobGraph);
    }

However I don’t think that would impact checkpointing.

Anything else I should do to debug whether checkpointing is operating as expected? In the logs, at DEBUG level, I don’t see any errors or warnings related to this.

Thanks,

-- Ken

>
>
> Nico
>
> On 23/02/18 21:42, Ken Krugler wrote:
>> Hi all,
>>
>> For testing checkpointing, is it possible to use LocalFlinkMiniCluster?
>>
>> Asking because I’m not seeing checkpoint calls being made to my custom
>> function (implements ListCheckpointed) when I’m running with
>> LocalFlinkMiniCluster.
>>
>> Though I do see entries like this logged:
>>
>> 18/02/23 12:40:50 INFO jobmanager.JobManager:246 - Using application-defined
>> state backend for checkpoint/savepoint metadata: MemoryStateBackend (data in
>> heap memory / checkpoints to JobManager).
>> 18/02/23 12:40:50 INFO checkpoint.CheckpointCoordinator:525 - Checkpoint
>> triggering task Source: Seed urls source (1/2) is not being executed at the
>> moment. Aborting checkpoint.
>>
>> But when I browse the Flink source, tests for checkpointing seem to be using
>> TestCluster, e.g. in ResumeCheckpointManuallyITCase
>>
>> Thanks,
>>
>> -- Ken
>>
>> --------------------------------------------
>> http://about.me/kkrugler
>> +1 530-210-6378
>>
>

--------------------------------------------
http://about.me/kkrugler
+1 530-210-6378