I posted this question on Stack Overflow (
https://stackoverflow.com/questions/49712817/two-questions-on-flink-externalized-checkpoints)
and am asking again on the mailing list in case you check mail more often.

I have two questions about Flink externalized checkpoints.

(Q1) I can set "state.checkpoints.dir" in flink-conf.yaml to get
externalized checkpoints to work, but how do I achieve the same thing
when I run Flink from an IDE? I tried the GlobalConfiguration approach
mentioned in (
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/state-checkpoints-dir-td17921.html)
but had no luck.

This is how I did it:

Configuration cfg = GlobalConfiguration.loadConfiguration();
cfg.setString("state.checkpoints.dir", "file:///tmp/checkpoints/state");
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

And this is the error message shown in the IDE:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Failed to submit job ef7050e2308a4787d983d80f3c07f55c (Long Taxi Rides (checkpointed))
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1325)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:447)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured. You can configure configure one via key 'state.checkpoints.dir'.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:211)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:478)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:291)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1277)
    ... 19 more

Process finished with exit code 1
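
A note on the snippet and the exception above: the snippet sets
"state.checkpoints.dir" on a Configuration object, but nothing in the lines
shown hands that object to the execution environment, which would be
consistent with the "no checkpoint directory has been configured" message.
Below is a minimal sketch of passing the configuration explicitly. It assumes
the StreamExecutionEnvironment.createLocalEnvironment(int, Configuration)
overload from the Flink 1.4 API; the class name, the 1000 ms checkpoint
interval, and the fromElements pipeline are hypothetical placeholders, and
whether this resolves the error in a given setup is an assumption rather than
a confirmed fix.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical class name, used only for this illustration.
public class LocalExternalizedCheckpoints {

    // Same directory as in the snippet from this mail.
    static final String CHECKPOINT_DIR = "file:///tmp/checkpoints/state";

    // Build a Configuration that carries the checkpoint directory key.
    static Configuration buildConfiguration() {
        Configuration cfg = new Configuration();
        cfg.setString("state.checkpoints.dir", CHECKPOINT_DIR);
        return cfg;
    }

    // Create a local environment and hand it the Configuration.
    // Assumption: passing cfg here is what makes the key visible to the
    // JobManager started inside the IDE process.
    static StreamExecutionEnvironment buildEnvironment(Configuration cfg) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, cfg);
        // Hypothetical interval: checkpoint every 1000 ms.
        env.enableCheckpointing(1000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        return env;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = buildEnvironment(buildConfiguration());
        // Placeholder pipeline so that there is a job to submit.
        env.fromElements(1, 2, 3).print();
        env.execute("externalized checkpoint sketch");
    }
}
```

The difference from the snippet in this mail is only where the Configuration
ends up: here it is an argument to the environment factory instead of a local
variable that is never used again.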

(Q2) The checkpointing documentation (
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/checkpointing.html)
says "This way, you will have a checkpoint around to resume from if your
job fails." What about cancelled jobs? Will a new job carry on from
the existing checkpoint, or will it start with its own checkpoints?


This is a UTF-8 formatted mail
-----------------------------------------------
James C.-C.Yu
+886988713275
