Hi Robert,

Thanks for the feedback.
You are correct, the behavior is indeed different: when I make the source bounded, the application eventually stops, whereas without that setting it runs forever. In both cases neither checkpoints nor data is being written to the filesystem.

I re-ran the experiment to get more info:

* When making the Kafka source unbounded (which is not what I want), I notice log messages telling me that the tasks associated with the Kafka source get unregistered right at the beginning:

2021-08-04 10:55:16,112 [INFO org.apa.fli.run.tas.Task Source: Kafka Source -> Sink: Unnamed (3/8)#0 - Freeing task resources for Source: Kafka Source -> Sink: Unnamed (3/8)#0 (70e93520406e12a1e7480a3344f4064d).

2021-08-04 10:55:16,112 [INFO org.apa.fli.run.tas.TaskExecutor flink-akka.actor.default-dispatcher-6 - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Kafka Source -> Sink: Unnamed (8/8)#0 f02843c05005daced3d5271832f25559.

Later on, each checkpoint seems to fail, complaining that some tasks are not running => would that be caused by the finished tasks above?

2021-08-04 10:55:31,463 [INFO org.apa.fli.run.che.CheckpointCoordinator Checkpoint Timer - Failed to trigger checkpoint for job 321fc6243d8aa091bdb8b913b7c3a679 since some tasks of job 321fc6243d8aa091bdb8b913b7c3a679 has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.

* When setting the Kafka source back to bounded, I also notice such "unregistered" log messages, followed by one "Checkpoint Timer - Failed to trigger c..." log, and then the job finishes.

By digging a little, this seems similar to this bug: https://issues.apache.org/jira/browse/FLINK-2491

I've tried to add a supplementary ProcessFunction in the pipeline, just to have one more stateful operator and hopefully trigger the checkpoint, though without success.
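To make that concrete, what I tried looks roughly like the sketch below. This is only an illustration, not my exact code: I'm showing a keyed variant over a constant key so that it can hold a trivial piece of keyed state, and the class, state and key names are made up for this example.

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative stateful pass-through step: forwards every record unchanged and
// only keeps a trivial counter as keyed state.
public static class StatefulPassThrough
        extends KeyedProcessFunction<String, DynamicMessage, DynamicMessage> {

    private transient ValueState<Long> seenCount;

    @Override
    public void open(Configuration parameters) {
        seenCount = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen-count", Long.class));
    }

    @Override
    public void processElement(DynamicMessage msg, Context ctx, Collector<DynamicMessage> out)
            throws Exception {
        Long current = seenCount.value();
        seenCount.update(current == null ? 1L : current + 1);
        out.collect(msg);   // forward the record unchanged
    }
}

// Wired into the pipeline between the source and the sink:
env
    .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
    .keyBy(msg -> "constant-key")        // constant key, only there to allow keyed state
    .process(new StatefulPassThrough())
    .addSink(parquetSink);
```

Even with this extra operator in place, the checkpoints are still rejected with the same "Not all required tasks are currently running" message.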
On Tue, 3 Aug 2021, at 1:33 PM, Robert Metzger wrote:
> Hi Svend,
> I'm a bit confused by this statement:
>
>> * In streaming mode, keeping checkpointing but removing the `setBounded()` on the
>> Kafka source yields the same result
>
> My expectation would be that the source runs forever, if it is not bounded.
> Are you sure this error message is not coming from another task?
>
>
> On Sat, Jul 31, 2021 at 11:23 AM Svend <stream...@svend.xyz> wrote:
>> Hi everyone,
>>
>> I'm failing to write a simple Flink 1.13.1 application with the DataStream API
>> that reads from Kafka and writes to Parquet.
>>
>> My intention is to run this as a job that reads what is currently in Kafka,
>> shuts down when reaching the current end of each partition, and picks up from
>> there the next time it is started.
>>
>> I've tried several variations, but I can't get anything to work properly:
>>
>> * In streaming mode, enabling checkpointing and setting the kafkaSource to
>> bounded (see code sample below), the application fails to perform checkpoints,
>> complaining about:
>>
>> "Checkpoint Timer Failed to trigger checkpoint for job ... since some tasks
>> of job ... has been finished"
>>
>> => no Parquet part gets written, no checkpoint gets written and no Kafka
>> offsets get committed
>>
>> * In streaming mode, keeping checkpointing but removing the `setBounded()` on the
>> Kafka source yields the same result
>>
>> * I also tried batch mode, removing the checkpointing, switching the
>> StreamingFileSink for a FileSink and using Kafka's "auto.commit.interval.ms"
>> => in that case I'm getting some Parquet output and Kafka offsets are
>> committed, but the application shuts down before flushing the offsets of what
>> has been read, so that the latest Kafka events will be read again at the next
>> start.
>>
>> This all sounds very basic, I see other people doing this kind of thing
>> (recently, [1]), and I was really expecting the combination of KafkaSource
>> with StreamingFileSink and checkpointing to work, since all of those are
>> streaming concepts. Hopefully I'm doing something wrong?
>>
>> [1] http://mail-archives.apache.org/mod_mbox/flink-user/202106.mbox/browser
>>
>> Thanks a lot in advance,
>>
>> Svend
>>
>> ```
>> // I'm testing this by launching the app from an IDE
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> env.getConfig().registerTypeWithKryoSerializer(DynamicMessage.class,
>>     new DynamicProtobufKryoSerializer(params));
>>
>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>> env.enableCheckpointing(1000);
>> env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>
>> KafkaSource<DynamicMessage> kafkaSource = KafkaSource.<DynamicMessage>builder()
>>     .setBootstrapServers("localhost:9092")
>>     .setTopics("some-topic")
>>     .setGroupId("my-group")
>>     .setValueOnlyDeserializer(new DynamicProtobufFlinkSchema())
>>     .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>>     .setBounded(OffsetsInitializer.latest())   // stop at the partitions' latest offsets (bounded read)
>>     .build();
>>
>> StreamingFileSink<DynamicMessage> parquetSink = StreamingFileSink
>>     .forBulkFormat(
>>         Path.fromLocalFile(new File("/tmp/job-output/some-topic.parquet")),
>>         new ParquetWriterFactory<>((out) ->
>>             new ParquetDynamicMessageWriterBuilder(out, params).build()))
>>     .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>     .build();
>>
>> env
>>     .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
>>     .addSink(parquetSink);   // StreamingFileSink is a SinkFunction, hence addSink rather than sinkTo
>>
>> env.execute("my-job");
>> ```
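PS: for reference, the batch-mode variant mentioned in the third bullet of my original message above looked roughly like the following. Again, this is just a sketch rather than the exact code; the property values and the output path are illustrative.

```
// Batch-mode sketch: no checkpointing, FileSink instead of StreamingFileSink,
// and plain Kafka auto-commit for the offsets.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().registerTypeWithKryoSerializer(DynamicMessage.class,
    new DynamicProtobufKryoSerializer(params));
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

KafkaSource<DynamicMessage> kafkaSource = KafkaSource.<DynamicMessage>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("some-topic")
    .setGroupId("my-group")
    .setValueOnlyDeserializer(new DynamicProtobufFlinkSchema())
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    .setBounded(OffsetsInitializer.latest())
    // let the Kafka consumer commit offsets periodically on its own
    .setProperty("enable.auto.commit", "true")
    .setProperty("auto.commit.interval.ms", "1000")
    .build();

FileSink<DynamicMessage> parquetSink = FileSink
    .forBulkFormat(
        Path.fromLocalFile(new File("/tmp/job-output/some-topic.parquet")),
        new ParquetWriterFactory<>((out) ->
            new ParquetDynamicMessageWriterBuilder(out, params).build()))
    .build();

env
    .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
    .sinkTo(parquetSink);   // FileSink implements the unified Sink interface

env.execute("my-job");
```

As described above, this variant does produce Parquet output and commits offsets, but the job exits before the offsets of the last records read are flushed.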