Hi Robert, 

Thanks for the feedback.

You are correct, the behavior is indeed different: when I make the source 
bounded, the application eventually stops, whereas without that setting it 
runs forever.
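
For clarity, the only difference between the two runs is this single builder 
call; everything else is identical to the code quoted below:

```
    // Bounded run: stop once the offsets that are current at startup are reached.
    KafkaSource<DynamicMessage> kafkaSource = KafkaSource.<DynamicMessage>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("some-topic")
        .setGroupId("my-group")
        .setValueOnlyDeserializer(new DynamicProtobufFlinkSchema())
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .setBounded(OffsetsInitializer.latest())  // removed entirely for the unbounded run
        .build();
```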

In both cases, neither checkpoints nor data are written to the filesystem.

I re-ran the experiment to get more info:

* when making the kafka source unbounded (which is not what I want), I notice 
logs at the beginning telling that the tasks associated with the Kafka source 
get unregistered:

2021-08-04 10:55:16,112 [INFO org.apa.fli.run.tas.Task Source: Kafka Source -> Sink: Unnamed (3/8)#0 – Freeing task resources for Source: Kafka Source -> Sink: Unnamed (3/8)#0 (70e93520406e12a1e7480a3344f4064d).
2021-08-04 10:55:16,112 [INFO org.apa.fli.run.tas.TaskExecutor flink-akka.actor.default-dispatcher-6 – Un-registering task and sending final execution state FINISHED to JobManager for task Source: Kafka Source -> Sink: Unnamed (8/8)#0 f02843c05005daced3d5271832f25559.

Later on, each checkpoint seems to fail, complaining that some tasks are not 
running => could that be caused by the finished tasks above?

2021-08-04 10:55:31,463 [INFO org.apa.fli.run.che.CheckpointCoordinator Checkpoint Timer – Failed to trigger checkpoint for job 321fc6243d8aa091bdb8b913b7c3a679 since some tasks of job 321fc6243d8aa091bdb8b913b7c3a679 has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.


* when setting the kafka source back to bounded, I also notice such an 
"unregistered" log message, followed by one "Checkpoint Timer – Failed to 
trigger checkpoint" log, and then the job finishes


Digging a little, this seems similar to this bug: 
https://issues.apache.org/jira/browse/FLINK-2491

I've tried adding a supplementary ProcessFunction to the pipeline, just to 
have one more stateful operator in the hope of triggering the checkpoint, 
though without success.
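
For reference, the attempt looked roughly like this (a sketch: the 
CountingPassThrough name and the counter state are just illustrative; the 
function forwards every record unchanged):

```
import com.google.protobuf.DynamicMessage;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// A pass-through function holding a record counter as operator state,
// added purely so that one more stateful operator takes part in checkpoints.
public class CountingPassThrough
        extends ProcessFunction<DynamicMessage, DynamicMessage>
        implements CheckpointedFunction {

    private transient ListState<Long> countState;
    private long count;

    @Override
    public void processElement(DynamicMessage value, Context ctx, Collector<DynamicMessage> out) {
        count++;              // the only "state": a running record counter
        out.collect(value);   // forward every record unchanged
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        countState.clear();
        countState.add(count);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        countState = context.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("count", Long.class));
        for (Long restored : countState.get()) {
            count += restored;
        }
    }
}
```

It was wired in between the source and the sink:

```
    env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
        .process(new CountingPassThrough())
        .sinkTo(parquetSink);
```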




On Tue, 3 Aug 2021, at 1:33 PM, Robert Metzger wrote:
> Hi Svend,
> I'm a bit confused by this statement:
> 
>> * In streaming mode, with checkpointing but removing the `setBounded()` on 
>> the kafka source yields the same result
> 
> My expectation would be that the source runs forever, if it is not bounded.
> Are you sure this error message is not coming from another task?
> 
> 
> On Sat, Jul 31, 2021 at 11:23 AM Svend <stream...@svend.xyz> wrote:
>> Hi everyone,
>> 
>> I'm failing to write a simple Flink 1.13.1 application with the DataStream 
>> API that reads from kafka and writes to parquet.
>> 
>> My intention is to run this as a job that reads what is currently in kafka, 
>> shuts down when reaching the current end of each partition, and picks up from 
>> there the next time it's started.
>> 
>> I've tried several variations, but I can't get anything to work properly:
>> 
>> * In streaming mode, enabling checkpointing and setting the kafkaSource to 
>> bounded (see code sample below), the application fails to perform checkpoints, 
>> complaining:
>> 
>> "Checkpoint Timer Failed to trigger checkpoint for job ... since some tasks 
>> of job ... has been finished"
>> 
>> => no parquet part gets written, no checkpoint gets written and no kafka 
>> offsets get committed
>>  
>> * In streaming mode, with checkpointing but removing the `setBounded()` on 
>> the kafka source yields the same result
>> 
>> * I also tried batch mode, removing the checkpointing, switching the 
>> StreamingFileSink for a FileSink and using Kafka's "auto.commit.interval.ms" 
>> (sketched below) => in that case I'm getting some parquet output and kafka 
>> offsets are committed, but the application shuts down before committing the 
>> offsets of the last events read, s.t. the latest kafka events will be read 
>> again at the next start.
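>> 
>> For reference, the batch variant looked roughly like this (a sketch: the 
>> auto-commit property values are illustrative, and `params`, the serializer 
>> and the parquet writer builder are the same as in the streaming code below):
>> 
>> ```
>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>     env.setRuntimeMode(RuntimeExecutionMode.BATCH);  // no checkpointing in batch mode
>> 
>>     KafkaSource<DynamicMessage> kafkaSource = KafkaSource.<DynamicMessage>builder()
>>         .setBootstrapServers("localhost:9092")
>>         .setTopics("some-topic")
>>         .setGroupId("my-group")
>>         .setValueOnlyDeserializer(new DynamicProtobufFlinkSchema())
>>         .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>>         .setBounded(OffsetsInitializer.latest())
>>         // rely on the Kafka client's periodic auto-commit instead of checkpoints
>>         .setProperty("enable.auto.commit", "true")
>>         .setProperty("auto.commit.interval.ms", "1000")
>>         .build();
>> 
>>     FileSink<DynamicMessage> parquetSink = FileSink
>>         .forBulkFormat(
>>             Path.fromLocalFile(new File("/tmp/job-output/some-topic.parquet")),
>>             new ParquetWriterFactory<>((out) -> new ParquetDynamicMessageWriterBuilder(out, params).build()))
>>         .build();
>> 
>>     env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
>>         .sinkTo(parquetSink);
>> 
>>     env.execute("my-batch-job");
>> ```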
>> 
>> This all sounds very basic, I see other people doing this kind of thing 
>> (recently, [1]), and I was really expecting the combination of KafkaSource 
>> with StreamingFileSink and checkpointing to work, since all those are 
>> streaming concepts. Hopefully I'm doing something wrong?
>> 
>> [1] http://mail-archives.apache.org/mod_mbox/flink-user/202106.mbox/browser
>> 
>> Thanks a lot in advance,
>> 
>> Svend
>> 
>> ```
>>     // I'm testing this by launching the app from an IDE
>> 
>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>     env.getConfig().registerTypeWithKryoSerializer(DynamicMessage.class, new DynamicProtobufKryoSerializer(params));
>> 
>>     env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>     env.enableCheckpointing(1000);
>>     env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
>>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> 
>>     KafkaSource<DynamicMessage> kafkaSource = KafkaSource.<DynamicMessage>builder()
>>         .setBootstrapServers("localhost:9092")
>>         .setTopics("some-topic")
>>         .setGroupId("my-group")
>>         .setValueOnlyDeserializer(new DynamicProtobufFlinkSchema())
>>         .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>>         .setBounded(OffsetsInitializer.latest())
>>         .build();
>> 
>>     StreamingFileSink<DynamicMessage> parquetSink = StreamingFileSink
>>         .forBulkFormat(
>>             Path.fromLocalFile(new File("/tmp/job-output/some-topic.parquet")),
>>             new ParquetWriterFactory<>((out) -> new ParquetDynamicMessageWriterBuilder(out, params).build()))
>>         .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>         .build();
>> 
>>     env
>>         .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
>>         .sinkTo(parquetSink);
>> 
>>     env.execute("my-job");
>> ```
