Hi Dan,

Can you describe the conditions under which you are missing records (after a
machine failure, after a Kafka failure, after taking and restoring from a
savepoint, ...)?
Are many records missing? Are the "first" records or the "latest" records
missing? Are individual records missing, or larger blocks of data?

I don't think there's a bug in Flink or the Kafka connector. Maybe it's
just a configuration or system design issue.
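
If it is configuration, the two things I would check first are (a) whether
checkpointing is enabled, since with OnCheckpointRollingPolicy the
StreamingFileSink only finalizes part files when a checkpoint completes, and
(b) how the consumer chooses its start offsets when there is no restored
state. A minimal sketch of those settings (assuming Flink 1.11; the group id
and broker address are placeholders, the topic name is taken from your code):

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaOffsetCheck {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // StreamingFileSink + OnCheckpointRollingPolicy only finalizes part files when a
    // checkpoint completes, so checkpointing must be enabled for data to show up in S3.
    env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092");   // placeholder
    props.setProperty("group.id", "log-request-consumer");  // placeholder

    FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("logrequest", new SimpleStringSchema(), props);

    // When restoring from a checkpoint/savepoint, offsets come from Flink's state,
    // not from these settings. Without restored state, the default startFromGroupOffsets()
    // uses the offsets committed in Kafka and falls back to auto.offset.reset, which can
    // silently skip records if the committed offsets are ahead of what you expect.
    consumer.setStartFromGroupOffsets();
    // Offsets are only committed back to Kafka when a checkpoint completes:
    consumer.setCommitOffsetsOnCheckpoints(true);

    env.addSource(consumer)
        .assignTimestampsAndWatermarks(
            WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(30))
                .withIdleness(Duration.ofMinutes(1)))
        .print();

    env.execute("kafka-offset-check");
  }
}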


On Sun, Apr 25, 2021 at 9:56 AM Dan Hill <quietgol...@gmail.com> wrote:

> Hi!
>
> Have any other devs noticed issues with Flink missing Kafka records with
> long-running Flink jobs?  When I re-run my Flink job and start from the
> earliest Kafka offset, Flink processes the events correctly.  I'm using
> Flink v1.11.1.
>
> I have a simple job that takes records (Requests) from Kafka and
> serializes them to S3.  Pretty basic.  No related issues in the text logs.
> I'm hoping I just have a configuration issue.  I'm guessing idleness is
> working in a way that I'm not expecting.
>
> Any ideas?
> - Dan
>
>
> void createLogRequestJob(StreamExecutionEnvironment env) throws Exception {
>   Properties kafkaSourceProperties = getKafkaSourceProperties("logrequest");
>
>   SingleOutputStreamOperator<Request> rawRequestInput = env.addSource(
>       new FlinkKafkaConsumer(getInputRequestTopic(),
>           getProtoDeserializationSchema(Request.class), kafkaSourceProperties))
>     .uid("source-request")
>     .name("Request")
>     .assignTimestampsAndWatermarks(
>       WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness)
>           .withIdleness(Duration.ofMinutes(1)));
>
>   executeLogRequest(rawRequestInput);
>   env.execute("log-request");
> }
>
> void executeLogRequest(SingleOutputStreamOperator<Request> rawRequestInput) {
>   AvroWriterFactory<Request> factory = getAvroWriterFactory(Request.class);
>
>   rawRequestInput.addSink(StreamingFileSink
>       .forBulkFormat(new Path(getS3OutputDirectory(), "raw/request"), factory)
>       .withBucketAssigner(new DateHourBucketAssigner<Request>(request ->
>           request.getTiming().getEventApiTimestamp()))
>       .withRollingPolicy(OnCheckpointRollingPolicy.build())
>       .withOutputFileConfig(createOutputFileConfig())
>       .build())
>     .uid("sink-s3-raw-request")
>     .name("S3 Raw Request");
> }
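
One more note on the idleness guess: withIdleness() only marks a stream as
idle for downstream watermark calculation; it does not stop the consumer from
reading records, and since the snippet above has no windowing I would not
expect watermarks to drop data either. If idle partitions are the concern, the
WatermarkStrategy can also be assigned directly on the FlinkKafkaConsumer so
that watermarks are generated per Kafka partition. A rough sketch against the
code above (same names, assuming Flink 1.11):

FlinkKafkaConsumer<Request> consumer = new FlinkKafkaConsumer<>(
    getInputRequestTopic(),
    getProtoDeserializationSchema(Request.class),
    kafkaSourceProperties);

// Assigning the strategy on the consumer (instead of on the DataStream) makes the
// Kafka source generate watermarks per partition, so one empty or stalled partition
// is handled by the idleness timeout instead of skewing the source's watermark.
consumer.assignTimestampsAndWatermarks(
    WatermarkStrategy.<Request>forBoundedOutOfOrderness(maxOutOfOrderness)
        .withIdleness(Duration.ofMinutes(1)));

SingleOutputStreamOperator<Request> rawRequestInput = env.addSource(consumer)
    .uid("source-request")
    .name("Request");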
