Hi Dan,

Can you describe under which conditions you are missing records (after a machine failure, after a Kafka failure, after taking and restoring from a savepoint, ...)? Are many records missing? Are "the first records" or the "latest records" missing? Are individual records missing, or larger blocks of data?
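One way to answer the "individual records or larger blocks" question is to compare the Kafka offsets you expected for a partition with the offsets that actually made it into the S3 output, and group the gaps into contiguous ranges. Below is a minimal sketch in plain Java. It assumes you can obtain the offsets per partition on both sides (for example by writing the offset into each record), which the job as posted may not do yet. The class and method names are hypothetical and are not part of Flink or Kafka.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class MissingOffsetRanges {

    /**
     * Returns the offsets in [first, last] that are absent from `seen`,
     * grouped into contiguous inclusive ranges {start, end}.
     * Offsets are assumed to be non-negative, as Kafka offsets are.
     */
    static List<long[]> missingRanges(long first, long last, Set<Long> seen) {
        List<long[]> ranges = new ArrayList<>();
        long start = -1; // -1 means "not currently inside a gap"
        for (long offset = first; offset <= last; offset++) {
            boolean missing = !seen.contains(offset);
            if (missing && start < 0) {
                start = offset; // a new gap begins here
            } else if (!missing && start >= 0) {
                ranges.add(new long[] {start, offset - 1}); // the gap ended at the previous offset
                start = -1;
            }
        }
        if (start >= 0) {
            ranges.add(new long[] {start, last}); // the gap runs to the end of the range
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Hypothetical data: offsets 0..9 were expected, only these were found in the output.
        Set<Long> seen = Set.of(2L, 3L, 4L, 6L, 9L);
        for (long[] r : missingRanges(0, 9, seen)) {
            System.out.println(r[0] + ".." + r[1]);
        }
    }
}
```

A gap that starts at the first expected offset points at "the first records", a gap that ends at the last expected offset points at "the latest records", and many single-offset gaps suggest individual records rather than blocks.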
I don't think that there's a bug in Flink or the Kafka connector. Maybe it's just a configuration or systems design issue.

On Sun, Apr 25, 2021 at 9:56 AM Dan Hill <quietgol...@gmail.com> wrote:

> Hi!
>
> Have any other devs noticed issues with Flink missing Kafka records with
> long-running Flink jobs? When I re-run my Flink job and start from the
> earliest Kafka offset, Flink processes the events correctly. I'm using
> Flink v1.11.1.
>
> I have a simple job that takes records (Requests) from Kafka and
> serializes them to S3. Pretty basic. No related issues in the text logs.
> I'm hoping I just have a configuration issue. I'm guessing idleness is
> working in a way that I'm not expecting.
>
> Any ideas?
> - Dan
>
> void createLogRequestJob(StreamExecutionEnvironment env) throws Exception {
>   Properties kafkaSourceProperties =
>       getKafkaSourceProperties("logrequest");
>   SingleOutputStreamOperator<Request> rawRequestInput = env.addSource(
>       new FlinkKafkaConsumer(getInputRequestTopic(),
>           getProtoDeserializationSchema(Request.class), kafkaSourceProperties))
>       .uid("source-request")
>       .name("Request")
>       .assignTimestampsAndWatermarks(
>           WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness)
>               .withIdleness(Duration.ofMinutes(1)));
>
>   executeLogRequest(rawRequestInput);
>   env.execute("log-request");
> }
>
> void executeLogRequest(SingleOutputStreamOperator<Request> rawRequestInput) {
>   AvroWriterFactory<Request> factory =
>       getAvroWriterFactory(Request.class);
>   rawRequestInput.addSink(StreamingFileSink
>       .forBulkFormat(new Path(getS3OutputDirectory(), "raw/request"), factory)
>       .withBucketAssigner(new DateHourBucketAssigner<Request>(request ->
>           request.getTiming().getEventApiTimestamp()))
>       .withRollingPolicy(OnCheckpointRollingPolicy.build())
>       .withOutputFileConfig(createOutputFileConfig())
>       .build())
>       .uid("sink-s3-raw-request")
>       .name("S3 Raw Request");
> }