Re: GenericWriteAheadSink, declined checkpoint for a finished source

2021-12-06 Thread Dawid Wysakowicz
>   at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[na:na]
>   at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[na:na]
>   at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[na:na]
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) ~[na:na]
>   at akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[na:na]
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[na:na]
>   at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[na:na]
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[na:na]
>   at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[na:1.8.0_91]
>   at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[na:1.8.0_91]
>   at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[na:1.8.0_91]
>   at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[na:1.8.0_91]
>
> FYI, if we don't enable this feature we see a different error
> consistent with the older version of Flink:
>
> INFO  [Checkpoint Timer] o.a.f.r.c.CheckpointCoordinator   Failed to trigger
> checkpoint for job 532b20d72d407bf82570c86f672ccf2c because Some tasks of
> the job have already finished and checkpointing with finished tasks is not
> enabled. Failure reason: Not all required tasks are currently running.
>
>
> Can anyone advise if this feature is indeed available and working in
> 1.14.0, and how to correctly enable it?
>
> Thanks,
>
> James.

Re: GenericWriteAheadSink, declined checkpoint for a finished source

2021-12-06 Thread James Sandys-Lumsdaine
 

Re: GenericWriteAheadSink, declined checkpoint for a finished source

2021-11-04 Thread Caizhi Weng
Hi!

Thanks Austin for the answer. I agree that FLIP-147 has solved the problem;
just set execution.checkpointing.checkpoints-after-tasks-finish.enabled to
true to enable this feature.
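
From code, if I read the 1.14 docs correctly, enabling it looks roughly like
this (the typed constant below is just the programmatic form of that key; I
would expect the same option to also work in the Configuration passed to
createLocalEnvironmentWithWebUI):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

// The environment picks the flag up from the configuration it is created with.
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment(config);
env.enableCheckpointing(500);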

The JDBC sink solves this problem in a different way: it flushes when closed
(see JdbcOutputFormat#close [1]). Please note, however, that unlike
GenericWriteAheadSink, JdbcOutputFormat flushes data once its buffer
threshold is reached, so users might see inconsistent data after a
checkpoint failure and before the job finishes (the sink requires a primary
key to reach eventual consistency).

[1]
https://github.com/apache/flink/blob/d502353acd03428b9befa4ec970191b757b6c8c3/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java#L253
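
For illustration, wiring up the JDBC sink with batching, retries and a keyed
upsert might look roughly like this (the table, columns, MyObj getters and
connection details are placeholders; jdbcStreamIn is James's DataStream<MyObj>
of results):

import java.sql.PreparedStatement;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;

// A MERGE keyed on the primary key makes re-sent batches idempotent, which is
// what gives the eventual consistency mentioned above.
jdbcStreamIn.addSink(
    JdbcSink.sink(
        "MERGE INTO trades AS t "
            + "USING (VALUES (?, ?)) AS s (id, price) ON t.id = s.id "
            + "WHEN MATCHED THEN UPDATE SET t.price = s.price "
            + "WHEN NOT MATCHED THEN INSERT (id, price) VALUES (s.id, s.price);",
        (PreparedStatement stmt, MyObj obj) -> {
            stmt.setLong(1, obj.getId());
            stmt.setBigDecimal(2, obj.getPrice());
        },
        JdbcExecutionOptions.builder()
            .withBatchSize(1000)       // flush after 1000 buffered rows...
            .withBatchIntervalMs(200)  // ...or after 200 ms, whichever comes first
            .withMaxRetries(3)
            .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withUrl("jdbc:sqlserver://myhost:1433;databaseName=mydb")
            .withDriverName("com.microsoft.sqlserver.jdbc.SQLServerDriver")
            .withUsername("user")
            .withPassword("password")
            .build()));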


Re: GenericWriteAheadSink, declined checkpoint for a finished source

2021-11-04 Thread Austin Cawley-Edwards
Hi James,

You are correct that since Flink 1.14 [1] (which included FLIP-147 [2])
there is support for checkpointing after some tasks have finished, which
sounds like it will solve this use case.

You may also want to look at the JDBC sink [3], which also supports batching,
as well as some other nice things like retries and batch intervals.

Hope that helps,
Austin


[1]:
https://flink.apache.org/news/2021/09/29/release-1.14.0.html#checkpointing-and-bounded-streams
[2]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
[3]:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/jdbc/


GenericWriteAheadSink, declined checkpoint for a finished source

2021-11-03 Thread James Sandys-Lumsdaine
Hello,

I have a Flink workflow where I need to upload the output data into a legacy
SQL Server database, so I have read the section in the Flink book about data
sinks and utilizing the GenericWriteAheadSink base class. I am currently using
Flink 1.12.3, although we plan to upgrade to 1.14 shortly.

Firstly, given I will be generating a large amount of data, I feel it best to
use the GenericWriteAheadSink base class so I can bulk copy all the data into
my SQL Server database rather than attempt a row-by-row insertion, which would
be too slow. Hopefully this is a good use case for this class, or is there now
a better approach?

Secondly, one thing I noticed is that my JDBC source emits ~50,000 rows but the
program actually exits before a final checkpoint is taken, so I miss many of
the final rows; I have to put in a Thread.sleep(5000) before allowing the JDBC
source to exit. This might be related to FLINK-21215, as I see the following
error:
org.apache.flink.util.SerializedThrowable: Task NameSource: Trade JDBC Source
(1/1)#0 Failure reason: Checkpoint was declined (tasks not ready)
With the extra Thread.sleep(5000) I see all the rows handled by the
sendValues() method.
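
The workaround currently sits at the end of the source's run() method, roughly
like this (only an outline; the real MyJDBCSource does the actual SQL Server
querying):

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

public class MyJDBCSource extends RichSourceFunction<MyObj> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<MyObj> ctx) throws Exception {
        // ... open the JDBC connection, run the query and
        // ctx.collect(...) each mapped row while 'running' is true ...

        // Workaround on 1.12: keep the source task alive long enough for one more
        // checkpoint to cover the last emitted rows before the task finishes.
        Thread.sleep(5000);
    }

    @Override
    public void cancel() {
        running = false;
    }
}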

I have included the test code below which just logs the "insertions" for now 
(and doesn't do real db access) but demonstrates the problem:

private void checkpointTest() throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    env.setParallelism(1);
    env.enableCheckpointing(500);

    MyJDBCSource myJDBCSource =
        new MyJDBCSource(tradesDBConnectionParams, fromDttm, toDttm, asOf);
    DataStream<MyObj> jdbcStreamIn = env.addSource(myJDBCSource, "My JDBC Source");

    jdbcStreamIn.transform("SqlServerSink", TypeInformation.of(MyObj.class),
        new SqlServerBulkCopySink(
            new FileCheckpointCommitter("c:\\temp\\FlinkTemp"),
            TypeExtractor.createTypeInfo(MyObj.class).createSerializer(new ExecutionConfig()),
            UUID.randomUUID().toString()));

    env.execute();
}

private static class SqlServerBulkCopySink extends GenericWriteAheadSink<MyObj> {
    public SqlServerBulkCopySink(CheckpointCommitter committer,
            TypeSerializer<MyObj> serializer, String jobID) throws Exception {
        super(committer, serializer, jobID);
    }

    @Override
    protected boolean sendValues(Iterable<MyObj> objects, long checkpointId, long timestamp) {
        logger.info("Sending {},{} ---", checkpointId, timestamp);
        for (MyObj myObj : objects)
            logger.info("  {},{}: {}", checkpointId, timestamp, myObj); // this will eventually
                // be a bulk copy insert into the SQL Server database
        return true;
    }
}
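
To make the intent of sendValues() concrete, this is the rough shape I have in
mind for the real implementation, with a plain JDBC batched insert standing in
for the eventual bulk copy (the table, columns and MyObj getters are made up,
and connection handling is simplified):

// Requires java.sql.Connection, DriverManager, PreparedStatement, SQLException.
@Override
protected boolean sendValues(Iterable<MyObj> objects, long checkpointId, long timestamp) {
    String url = "jdbc:sqlserver://myhost:1433;databaseName=mydb";
    try (Connection conn = DriverManager.getConnection(url, "user", "password");
         PreparedStatement stmt =
             conn.prepareStatement("INSERT INTO trades (id, price) VALUES (?, ?)")) {
        conn.setAutoCommit(false);
        for (MyObj myObj : objects) {
            stmt.setLong(1, myObj.getId());
            stmt.setBigDecimal(2, myObj.getPrice());
            stmt.addBatch();
        }
        stmt.executeBatch();
        conn.commit();   // all rows belonging to this checkpoint go in one transaction
        return true;     // signals that this checkpoint's data has been persisted
    } catch (SQLException e) {
        logger.error("Failed to write checkpoint {} to SQL Server", checkpointId, e);
        return false;    // the sink will retry this checkpoint's data later
    }
}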



Am I right in thinking the latest versions of Flink will not suffer from this
problem, or am I hitting something else? To be clear, I am expecting a
checkpoint to be invoked by Flink to cover all the data I want to insert into
my DB; how else would I do the final bulk copy if my sendValues() is not
called?


I have more questions about my data sink but I will wait to hear your answers.


Many thanks in advance,


James.