Hello,
I have a Flink workflow that needs to write its output to a legacy SQL Server
database, so I have read the section of the Flink book on data sinks and the
GenericWriteAheadSink base class. I am currently using Flink 1.12.3, although
we plan to upgrade to 1.14 shortly.
Firstly, since I will be generating a large amount of data, I think it is best
to extend GenericWriteAheadSink so that I can bulk copy the data into my SQL
Server database rather than insert row by row, which would be too slow. Is
this a good use case for this class, or is there now a better approach?
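
For illustration only, the "bulk rather than row by row" idea can be sketched
in plain Java as grouping rows into fixed-size batches, each of which would
then be handed to a single bulk insert call. The RowBatcher class and its
toBatches method are hypothetical names introduced for this sketch; no
database or Flink API is involved, and the batch size is an assumption to be
tuned.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Flink or any JDBC driver): splits rows
// into fixed-size batches so that each batch can be written with one bulk
// call instead of one call per row.
final class RowBatcher {
    private RowBatcher() {}

    static <T> List<List<T>> toBatches(Iterable<T> rows, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>(batchSize);
        for (T row : rows) {
            current.add(row);
            if (current.size() == batchSize) {
                batches.add(current);
                current = new ArrayList<>(batchSize);
            }
        }
        if (!current.isEmpty()) {
            batches.add(current); // final, possibly smaller, batch
        }
        return batches;
    }
}
```

Inside sendValues() the Iterable passed in by Flink could be fed through such
a helper, with the logging replaced by one bulk write per batch.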
Secondly, I noticed that my JDBC source emits ~50,000 rows but the program
exits before a final checkpoint is taken, so many of the final rows are
missed. As a workaround I have to add a Thread.sleep(5000) before allowing the
JDBC source to exit. This might be related to FLINK-21215, as I see the
following error:
org.apache.flink.util.SerializedThrowable: Task NameSource: Trade JDBC Source
(1/1)#0 Failure reason: Checkpoint was declined (tasks not ready)
With the extra Thread.sleep(5000) I see all the rows handled by the
sendValues() method.
I have included the test code below, which for now just logs the "insertions"
(it does no real DB access) but demonstrates the problem:
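// Imports used by this snippet (Flink 1.12). MyJDBCSource, MyObj,
// FileCheckpointCommitter, logger and the source parameters are not shown here.
import java.util.UUID;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;

private void checkpointTest() throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    env.setParallelism(1);
    env.enableCheckpointing(500); // checkpoint interval in milliseconds

    MyJDBCSource myJDBCSource = new MyJDBCSource(tradesDBConnectionParams,
        fromDttm, toDttm, asOf);
    DataStream<MyObj> jdbcStreamIn = env.addSource(myJDBCSource, "My JDBC Source");

    jdbcStreamIn.transform("SqlServerSink",
        TypeInformation.of(MyObj.class),
        new SqlServerBulkCopySink(
            new FileCheckpointCommitter("c:\\temp\\FlinkTemp"),
            TypeExtractor.createTypeInfo(MyObj.class).createSerializer(new ExecutionConfig()),
            UUID.randomUUID().toString()));
    env.execute();
}

private static class SqlServerBulkCopySink extends GenericWriteAheadSink<MyObj> {

    public SqlServerBulkCopySink(CheckpointCommitter committer,
            TypeSerializer<MyObj> serializer, String jobID) throws Exception {
        super(committer, serializer, jobID);
    }

    @Override
    protected boolean sendValues(Iterable<MyObj> objects, long checkpointId,
            long timestamp) {
        logger.info("Sending {},{}-----------------------------------------------",
            checkpointId, timestamp);
        for (MyObj myObj : objects) {
            // this will eventually be a bulk copy insert into the SQL Server database
            logger.info(" {},{}: {}", checkpointId, timestamp, myObj);
        }
        return true;
    }
}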
Am I right in thinking the latest versions of Flink will not suffer from this
problem or am I hitting something else? To be clear, I am expecting a
checkpoint to be invoked by Flink to cover all the data I want to insert into
my DB - how else would I do the final bulk copy if my sendValues() is not
called?
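
The behaviour described above can be reproduced with a toy model in plain
Java. This is a simplified sketch, not Flink's GenericWriteAheadSink
implementation: the ToyWriteAheadBuffer class and its method names are
hypothetical, and it collapses taking a checkpoint and being notified of its
completion into a single step. It only shows why records that arrive after
the last completed checkpoint never reach the delivery step.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only (hypothetical names, not Flink code): records are buffered
// until a checkpoint completes and are only then handed on for delivery.
final class ToyWriteAheadBuffer<T> {
    private final List<T> pending = new ArrayList<>();   // since the last checkpoint
    private final List<T> delivered = new ArrayList<>(); // handed to the "send" step

    // Buffer an incoming record; nothing is sent yet.
    void processElement(T value) {
        pending.add(value);
    }

    // Stand-in for a completed checkpoint: everything buffered so far is sent.
    void checkpointCompleted() {
        delivered.addAll(pending);
        pending.clear();
    }

    // Copy of the records that have been sent so far.
    List<T> delivered() {
        return new ArrayList<>(delivered);
    }

    // Number of records still waiting for a checkpoint.
    int pendingCount() {
        return pending.size();
    }
}
```

If five records are processed, a checkpoint completes, and three more records
arrive before the job ends, the model has delivered five records and still
holds three, which matches the missing final rows in my test.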
I have more questions about my data sink but I will wait to hear your answers.
Many thanks in advance,
James.