Hi,

Sorry to hear the option was hard to find. It is part of the 1.14
release[1] as ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
and the documentation describes how to enable it[2]. I'd be happy to hear
how we could make it easier to discover.
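
For reference, here is a minimal sketch of enabling it programmatically.
It assumes the 1.14 ExecutionCheckpointingOptions class linked in [1] and
a local environment like the one in your snippet; the class name
EnableCheckpointsAfterTasksFinish and the tiny fromElements pipeline are
placeholders for illustration only:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableCheckpointsAfterTasksFinish {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // Typed constant for "execution.checkpointing.checkpoints-after-tasks-finish.enabled"
        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

        // Pass the same Configuration object that carries the option
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
        env.enableCheckpointing(30 * 1000);

        // Placeholder bounded pipeline; replace with the real sources and sinks
        env.fromElements(1, 2, 3).print();
        env.execute();
    }
}
```

Setting the string key with setBoolean, as you did, should be equivalent as
long as the Configuration that is modified is the one handed to the
environment.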


As for the exception: are you seeing it occur repeatedly for the same
task? I can imagine that, with frequent checkpoints, a task might finish
while an RPC trigger request for it is still pending somewhere on the
wire. In that case that one checkpoint could fail, but the next
checkpoint should no longer try to trigger the FINISHED task.
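
To illustrate the race I have in mind, here is a toy model in plain Java.
It is not Flink code and does not reflect which tasks Flink actually
triggers; every name in it (CheckpointRaceSketch, task-A, task-B, the two
"views") is made up for the illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of a trigger request racing with a task finishing. */
public class CheckpointRaceSketch {

    /** Simulates two consecutive checkpoints and returns their outcomes. */
    static List<String> simulate() {
        // What the (hypothetical) coordinator believes is still running
        Set<String> coordinatorView = new HashSet<>(Arrays.asList("task-A", "task-B"));
        // What the (hypothetical) TaskManager actually still hosts
        Set<String> taskManagerView = new HashSet<>(Arrays.asList("task-A", "task-B"));
        List<String> outcomes = new ArrayList<>();

        // Checkpoint 1: trigger requests are sent based on the coordinator's view ...
        List<String> inFlight = new ArrayList<>(coordinatorView);
        // ... but task-A finishes while those requests are still on the wire.
        taskManagerView.remove("task-A");
        outcomes.add(deliver(1, inFlight, taskManagerView));

        // Only now does the FINISHED state change reach the coordinator.
        coordinatorView.remove("task-A");

        // Checkpoint 2: the finished task is no longer a trigger target.
        outcomes.add(deliver(2, new ArrayList<>(coordinatorView), taskManagerView));
        return outcomes;
    }

    /** A request for a task the TaskManager no longer hosts fails the checkpoint. */
    static String deliver(int id, List<String> targets, Set<String> taskManagerView) {
        for (String task : targets) {
            if (!taskManagerView.contains(task)) {
                return "checkpoint " + id + " failed: unknown task " + task;
            }
        }
        return "checkpoint " + id + " completed";
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

In this model only the checkpoint whose trigger was already in flight
fails. If you instead see the failure on every checkpoint for the same
task, something else is going on.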


Does it cause problems in your pipeline, or are you just concerned about
the entry in the logs?

Best,

Dawid


[1]
https://github.com/apache/flink/blob/ef0e17ad6319175ce0054fc3c4db14b78e690dd6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java#L236

[2]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/checkpointing/#checkpointing-with-parts-of-the-graph-finished-beta


On 06/12/2021 18:24, James Sandys-Lumsdaine wrote:
> Hello again,
>
> We recently upgraded from Flink 1.12.3 to 1.14.0 and we were hoping it
> would solve our issue with checkpointing with finished data sources.
> We need the checkpointing to work to trigger Flink's
> GenericWriteAheadSink class.
>
> Firstly, the constant mentioned on FLIP-147 that enables the feature
> isn't available as far as we can see
> (ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH). It's not in ConfigConstants
> or CheckpointConfig for example. So instead we enabled with the following:
>
> conf.setBoolean("execution.checkpointing.checkpoints-after-tasks-finish.enabled", true);
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
> env.enableCheckpointing(30 * 1000);
> ...
>
> We can see the constant available in 1.15 on Google but not the
> version we were expecting (1.14.0).
>
> Previously we had to add long Thread.sleep(x) calls to keep the sources
> alive while checkpoints were taken. When we enabled this feature using
> the explicit string and removed these hacks, we started seeing these errors:
>
> INFO  [flink-akka.actor.default-dispatcher-7] o.a.f.r.e.ExecutionGraph
> Source: Order JDBC Source (1/1) (e015c4f0910fb27e15fec063616ab785)
> switched from RUNNING to FINISHED.
>
>  
>
> [some lines removed for brevity]
>
> INFO  [flink-akka.actor.default-dispatcher-7] o.a.f.r.c.CheckpointCoordinator
> Triggering Checkpoint 5 for job 53d42ae669fad6cc8df2fe8f5706b48d
> failed due to {}
>
> org.apache.flink.runtime.checkpoint.CheckpointException: TaskManager
> received a checkpoint request for unknown
> task e015c4f0910fb27e15fec063616ab785. Failure reason: Task local
> checkpoint failure.
>
>      at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:966)
> ~[flink-runtime-1.14.0.jar:1.14.0]
>
>      at sun.reflect.GeneratedMethodAccessor58.invoke(Unknown Source)
> ~[na:na]
>
>      at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[na:1.8.0_91]
>
>      at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_91]
>
>      at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
> ~[na:na]
>
>      at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
> ~[na:na]
>
>      at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
> ~[na:na]
>
>      at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
> ~[na:na]
>
>      at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
> ~[na:na]
>
>      at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> ~[na:na]
>
>      at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> ~[na:na]
>
>      at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> ~[scala-library-2.11.12.jar:na]
>
>      at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> ~[scala-library-2.11.12.jar:na]
>
>      at
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
> ~[na:na]
>
>      at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> ~[scala-library-2.11.12.jar:na]
>
>      at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> ~[scala-library-2.11.12.jar:na]
>
>      at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> ~[scala-library-2.11.12.jar:na]
>
>      at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[na:na]
>
>      at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[na:na]
>
>      at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[na:na]
>
>      at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) ~[na:na]
>
>      at akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[na:na]
>
>      at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[na:na]
>
>      at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[na:na]
>
>      at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[na:na]
>
>      at
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> ~[na:1.8.0_91]
>
>      at
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> ~[na:1.8.0_91]
>
>      at
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> ~[na:1.8.0_91]
>
>      at
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
> ~[na:1.8.0_91]
>
>
> FYI, if we don't enable this feature we see a different error
> consistent with the older version of Flink:
>
>     INFO  [Checkpoint Timer]
>     o.a.f.r.c.CheckpointCoordinator           Failed to trigger
>     checkpoint for job 532b20d72d407bf82570c86f672ccf2c because Some
>     tasks of the job have already finished and checkpointing with
>     finished tasks is not enabled. Failure reason: Not all required
>     tasks are currently running.
>
>
> Can anyone advise if this feature is indeed available and working in
> 1.14.0 and how to correctly enable?
>
> Thanks,
>
> James.
>
> ------------------------------------------------------------------------
> *From:* Austin Cawley-Edwards <austin.caw...@gmail.com>
> *Sent:* 04 November 2021 18:46
> *To:* James Sandys-Lumsdaine <jas...@hotmail.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: GenericWriteAheadSink, declined checkpoint for a
> finished source
>  
> Hi James,
>
> You are correct that since Flink 1.14 [1] (which included FLIP-147
> [2]) there is support for checkpointing after some tasks have finished,
> which sounds like it will solve this use case.
>
> You may also want to look at the JDBC sink[3] which also supports
> batching, as well as some other nice things like retries and batch
> intervals.
>
> Hope that helps,
> Austin
>
>
> [1]:
> https://flink.apache.org/news/2021/09/29/release-1.14.0.html#checkpointing-and-bounded-streams
> [2]:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
> [3]:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/jdbc/
>
>
> On Wed, Nov 3, 2021 at 12:25 PM James Sandys-Lumsdaine
> <jas...@hotmail.com> wrote:
>
>     Hello,
>
>     I have a Flink workflow where I need to upload the output data
>     into a legacy SQL Server database and so I have read the section
>     in the Flink book about data sinks and utilizing the
>     GenericWriteAheadSink base class. I am currently using Flink
>     1.12.3 although we plan to upgrade to 1.14 shortly. 
>
>     Firstly, given I will be generating a large amount of data I feel
>     it best to use the GenericWriteAheadSink base class so I can bulk
>     copy all the data into my SQL Server database rather than attempt
>     a row by row insertion which would be too slow. Hopefully this is
>     a good use case for this class or is there now a better approach?
>
>     Secondly, one thing I noticed is my JDBC source emits ~50,000 rows
>     but the program actually exits before a final checkpoint is taken
>     so I miss many of the final rows - I have to put in a
>     Thread.sleep(5000) before allowing the JDBC source to exit. This
>     might be related to FLINK-21215 as I see the following error:
>     org.apache.flink.util.SerializedThrowable: Task NameSource: Trade
>     JDBC Source (1/1)#0 Failure reason: Checkpoint was declined (tasks
>     not ready)
>     With the extra Thread.sleep(5000) I see all the rows handled by
>     the sendValues() method.
>
>     I have included the test code below which just logs the
>     "insertions" for now (and doesn't do real db access) but
>     demonstrates the problem:
>
>     private void checkpointTest() throws Exception {
>         Configuration conf = new Configuration();
>         conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
>         final StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>         env.setParallelism(1);
>         env.enableCheckpointing(500);
>
>         MyJDBCSource myJDBCSource =
>             new MyJDBCSource(tradesDBConnectionParams, fromDttm, toDttm, asOf);
>         DataStream<MyObj> jdbcStreamIn = env.addSource(myJDBCSource, "My JDBC Source");
>
>         jdbcStreamIn.transform("SqlServerSink", TypeInformation.of(MyObj.class),
>             new SqlServerBulkCopySink(
>                 new FileCheckpointCommitter("c:\\temp\\FlinkTemp"),
>                 TypeExtractor.createTypeInfo(MyObj.class).createSerializer(new ExecutionConfig()),
>                 UUID.randomUUID().toString()));
>
>         env.execute();
>     }
>
>     private static class SqlServerBulkCopySink extends GenericWriteAheadSink<MyObj> {
>         public SqlServerBulkCopySink(CheckpointCommitter committer,
>                 TypeSerializer<MyObj> serializer, String jobID) throws Exception {
>             super(committer, serializer, jobID);
>         }
>
>         @Override
>         protected boolean sendValues(Iterable<MyObj> objects, long checkpointId, long timestamp) {
>             logger.info("Sending {},{}-----------------------------------------------",
>                 checkpointId, timestamp);
>             for (MyObj myObj : objects) {
>                 // this will eventually be a bulk copy insert into the SQL Server database
>                 logger.info("  {},{}: {}", checkpointId, timestamp, myObj);
>             }
>             return true;
>         }
>     }
>
>      
>
>     Am I right in thinking the latest versions of Flink will not
>     suffer from this problem or am I hitting something else? To be
>     clear, I am expecting a checkpoint to be invoked by Flink to cover
>     all the data I want to insert into my DB - how else would I do the
>     final bulk copy if my sendValues() is not called?
>
>
>     I have more questions about my data sink but I will wait to hear
>     your answers.
>
>
>     Many thanks in advance,
>
>
>     James.
>
>
