Hi Nico,

> On Feb 26, 2018, at 9:41 AM, Nico Kruber <n...@data-artisans.com> wrote:
> 
> Hi Ken,
> LocalFlinkMiniCluster should run checkpoints just fine. It looks like it
> was attempting to even create one but could not finish. Maybe your
> program was not fully running yet?

In the logs I see:

18/02/23 12:40:50 INFO taskmanager.Task:957 - Source: Seed urls source (1/2) (56fdede2f4783455b4ab8f290e700baa) switched from DEPLOYING to RUNNING.
18/02/23 12:40:50 DEBUG tasks.StreamTask:214 - Initializing Source: Seed urls source (1/2).
18/02/23 12:40:50 INFO checkpoint.CheckpointCoordinator:525 - Checkpoint triggering task Source: Seed urls source (1/2) is not being executed at the moment. Aborting checkpoint.

Maybe the checkpoint here is happening too soon after the “Initializing Source” 
message.

After that the source is done (it only triggers the iteration with a single 
starting tuple), so I wouldn’t expect checkpointing to actually do anything. I 
was just using these messages as indications that I had configured my workflow 
properly to actually do checkpointing.
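Here is a simplified model of that trigger check, for reference. It assumes, based only on the log text, that the coordinator aborts a checkpoint whenever a source task it has to trigger is not RUNNING. `SimpleCheckpointTrigger`, `TaskState` and `tryTrigger` are hypothetical names, not Flink classes. Under this model, a source that has already reached FINISHED aborts every later checkpoint as well, not just the early ones.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified, hypothetical model of a checkpoint trigger check; not Flink's CheckpointCoordinator. */
public class SimpleCheckpointTrigger {

    /** The few task states this model cares about. */
    enum TaskState { DEPLOYING, RUNNING, FINISHED }

    // Source task name -> last state reported to the coordinator.
    private final Map<String, TaskState> sourceTasks = new LinkedHashMap<>();

    void updateState(String taskName, TaskState state) {
        sourceTasks.put(taskName, state);
    }

    /** Returns true if the checkpoint is triggered, false if it is aborted. */
    boolean tryTrigger(long checkpointId) {
        for (Map.Entry<String, TaskState> entry : sourceTasks.entrySet()) {
            if (entry.getValue() != TaskState.RUNNING) {
                System.out.println("Checkpoint " + checkpointId + ": triggering task " + entry.getKey()
                        + " is not being executed at the moment. Aborting checkpoint.");
                return false;
            }
        }
        System.out.println("Checkpoint " + checkpointId + " triggered on " + sourceTasks.size() + " source task(s).");
        return true;
    }

    public static void main(String[] args) {
        SimpleCheckpointTrigger coordinator = new SimpleCheckpointTrigger();
        String source = "Source: Seed urls source (1/2)";

        coordinator.updateState(source, TaskState.DEPLOYING);
        coordinator.tryTrigger(1); // aborted: not RUNNING yet

        coordinator.updateState(source, TaskState.RUNNING);
        coordinator.tryTrigger(2); // triggered

        coordinator.updateState(source, TaskState.FINISHED);
        coordinator.tryTrigger(3); // aborted: a finished source cannot be triggered any more
    }
}
```

If that assumption matches the real coordinator, a short-lived seed source would explain why the ListCheckpointed callbacks never fire.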

> Can you tell us a little bit more about your set up and how you
> configured the LocalFlinkMiniCluster?

Potential issue #1 - I’ve got a workflow with multiple iterations.

For that reason I had to force checkpointing via:

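        // Keep checkpoint data in JobManager heap memory (fine for local testing).
        env.setStateBackend(new MemoryStateBackend());
        // Checkpoint every 100 ms; the third argument (force = true) is needed
        // because the job contains iterations.
        env.enableCheckpointing(100L, CheckpointingMode.AT_LEAST_ONCE, true);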
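The reason `force` is needed, as far as I understand it: when checkpointing is enabled on an iterative job, Flink rejects the job graph unless checkpointing is forced, because records in transit on the feedback edge are not included in the snapshot and can be lost on failure. A stdlib-only sketch of that rule follows; `IterationCheckpointCheck`, `JobSettings` and `validate` are hypothetical names, not Flink's API.

```java
/** Hypothetical model of the "iterative jobs need forced checkpointing" rule; not Flink's API. */
public class IterationCheckpointCheck {

    /** The three job settings the rule depends on. */
    static final class JobSettings {
        final boolean hasIterations;
        final boolean checkpointingEnabled;
        final boolean forceCheckpointing;

        JobSettings(boolean hasIterations, boolean checkpointingEnabled, boolean forceCheckpointing) {
            this.hasIterations = hasIterations;
            this.checkpointingEnabled = checkpointingEnabled;
            this.forceCheckpointing = forceCheckpointing;
        }
    }

    /** Rejects an iterative job that enables checkpointing without forcing it. */
    static void validate(JobSettings settings) {
        if (settings.hasIterations && settings.checkpointingEnabled && !settings.forceCheckpointing) {
            throw new UnsupportedOperationException(
                    "Checkpointing an iterative job must be forced: records in transit on the"
                            + " feedback edge are not part of the snapshot and may be lost on failure.");
        }
    }

    public static void main(String[] args) {
        // Forced, as with enableCheckpointing(100L, CheckpointingMode.AT_LEAST_ONCE, true): accepted.
        validate(new JobSettings(true, true, true));
        System.out.println("forced: accepted");

        // Not forced: rejected.
        try {
            validate(new JobSettings(true, true, false));
        } catch (UnsupportedOperationException e) {
            System.out.println("not forced: rejected (" + e.getMessage() + ")");
        }
    }
}
```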


Potential issue #2 - because of the fun with tracking iteration progress, I 
subclass LocalStreamEnvironment to add this async execution method:

        public JobSubmissionResult executeAsync(String jobName) throws Exception {
                // transform the streaming program into a JobGraph
                StreamGraph streamGraph = getStreamGraph();
                streamGraph.setJobName(jobName);

                JobGraph jobGraph = streamGraph.getJobGraph();

                Configuration configuration = new Configuration();
                configuration.addAll(jobGraph.getJobConfiguration());

                configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
                configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
                                jobGraph.getMaximumParallelism());

                // add (and override) the settings with what the user defined
                configuration.addAll(_conf);

                _exec = new LocalFlinkMiniCluster(configuration, true);
                _exec.start(true);

                // The above code is all basically the same as Flink's LocalStreamEnvironment.
                // The change is that here we call submitJobDetached vs. submitJobAndWait.
                // We assume that eventually someone calls stop(job id), which then terminates
                // the LocalFlinkMiniCluster.
                return _exec.submitJobDetached(jobGraph);
        }

However I don’t think that would impact checkpointing.
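A stdlib-only analogy for the attached vs. detached difference, using an `ExecutorService` in place of the mini cluster. `DetachedRunner`, `submitDetached` and `stop` are hypothetical names standing in for `submitJobDetached` and the `stop(job id)` call; this is not how LocalFlinkMiniCluster works internally. The point is only that detached submission returns a handle right away, and the caller then owns stopping the job and shutting the cluster down.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/** Stdlib-only analogy for attached vs. detached job submission; not Flink's API. */
public class DetachedRunner {

    // Stands in for the mini cluster that actually executes the job.
    private final ExecutorService cluster = Executors.newSingleThreadExecutor();
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);

    /** Analogous to submitJobDetached: start the job and return a handle immediately. */
    Future<?> submitDetached(Runnable step) {
        return cluster.submit(() -> {
            // An iterative streaming job never finishes on its own, so loop until stopped.
            while (!stopRequested.get()) {
                step.run();
            }
        });
    }

    /** Analogous to stop(job id): request termination, wait for it, then shut the cluster down. */
    void stop(Future<?> job) {
        stopRequested.set(true);
        try {
            job.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while stopping", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Job did not stop cleanly", e);
        } finally {
            cluster.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DetachedRunner runner = new DetachedRunner();
        AtomicLong passes = new AtomicLong();
        Future<?> job = runner.submitDetached(passes::incrementAndGet);
        // Control returns here right away; an attached submit would block until the job ended.
        Thread.sleep(50);
        runner.stop(job);
        System.out.println("stopped after " + passes.get() + " passes, done=" + job.isDone());
    }
}
```

Nothing in this pattern changes what happens inside the running job, which is consistent with the detached submission being unrelated to checkpointing.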

Anything else I should do to debug whether checkpointing is operating as 
expected? In the logs, at DEBUG level, I don’t see any errors or warnings 
related to this.
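One way to check this directly, rather than through log messages, is to record snapshot calls in the function itself and have the test wait for the first one with a timeout. The sketch below uses only the JDK: `ListCheckpointedLike` is a hypothetical stand-in that mirrors the two methods of Flink's `ListCheckpointed<T>` (`snapshotState(long checkpointId, long timestamp)` and `restoreState(List<T> state)`), and `CountingFunction` is a made-up example function. The static `CountDownLatch` is there because the framework runs a deserialized copy of the function, so instance fields aren't visible to the test; a static field only helps when everything runs in one JVM, as with LocalFlinkMiniCluster.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in mirroring the method signatures of Flink's ListCheckpointed<T>. */
interface ListCheckpointedLike<T> {
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

    void restoreState(List<T> state) throws Exception;
}

/** A made-up counting function that records every snapshot call. */
public class CountingFunction implements ListCheckpointedLike<Long> {

    // Static so a test can observe it even though the framework works on a deserialized copy
    // (only effective when everything runs in one JVM, as with a local mini cluster).
    static final CountDownLatch FIRST_SNAPSHOT = new CountDownLatch(1);

    private long count;

    void process(String record) {
        count++;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        System.out.println("snapshotState: checkpoint " + checkpointId + ", count=" + count);
        FIRST_SNAPSHOT.countDown();
        return Collections.singletonList(count);
    }

    @Override
    public void restoreState(List<Long> state) {
        // Sum the entries, since after rescaling one instance may receive several of them.
        count = 0;
        for (Long c : state) {
            count += c;
        }
    }

    long getCount() {
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        CountingFunction fn = new CountingFunction();
        fn.process("a");
        fn.process("b");

        // In a real job the framework calls snapshotState; here it is called directly.
        List<Long> snapshot = fn.snapshotState(1L, System.currentTimeMillis());
        boolean sawSnapshot = FIRST_SNAPSHOT.await(10, TimeUnit.SECONDS);
        System.out.println("snapshot seen: " + sawSnapshot);

        CountingFunction restored = new CountingFunction();
        restored.restoreState(snapshot);
        System.out.println("restored count: " + restored.getCount());
    }
}
```

In an actual test, the `await` would run in the test thread after submitting the job, with the real ListCheckpointed hooks doing the `countDown()`.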

Thanks,

— Ken

> 
> 
> Nico
> 
> On 23/02/18 21:42, Ken Krugler wrote:
>> Hi all,
>> 
>> For testing checkpointing, is it possible to use LocalFlinkMiniCluster?
>> 
>> Asking because I’m not seeing checkpoint calls being made to my custom 
>> function (implements ListCheckpointed) when I’m running with 
>> LocalFlinkMiniCluster.
>> 
>> Though I do see entries like this logged:
>> 
>> 18/02/23 12:40:50 INFO jobmanager.JobManager:246 - Using application-defined state backend for checkpoint/savepoint metadata: MemoryStateBackend (data in heap memory / checkpoints to JobManager).
>> 18/02/23 12:40:50 INFO checkpoint.CheckpointCoordinator:525 - Checkpoint triggering task Source: Seed urls source (1/2) is not being executed at the moment. Aborting checkpoint.
>> 
>> But when I browse the Flink source, tests for checkpointing seem to be using 
>> TestCluster, e.g. in ResumeCheckpointManuallyITCase.
>> 
>> Thanks,
>> 
>> — Ken
>> 
>> --------------------------------------------
>> http://about.me/kkrugler
>> +1 530-210-6378
>> 
> 

