Hello,

We're seeing some strange behavior with Flink's KafkaConnector010 (Kafka 0.10.1.1) arbitrarily skipping data.
*Context*
KafkaConnector010 is used as the source and StreamingFileSink/BulkPartWriter (S3) as the sink, with no intermediate operators. Recently, we noticed that millions of Kafka records were missing for *one* topic partition (the job runs over 100+ topic partitions, and this behavior was observed for only one). The job runs on YARN, and the hosts were healthy with no hardware faults observed. There were no exceptions in the jobmanager or taskmanager logs at the time.

*How was this detected?*
As a sanity check, we dual-write Kafka metadata (offsets) to a separate location in S3, and have monitoring to ensure that the written offsets are contiguous with no duplicates. Each Kafka record is bucketed into hourly datetime partitions (UTC) in S3.

*(Condensed) Taskmanager logs*
2019-11-24 02:36:50,140 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5252 with MPU ID 3XG...
2019-11-24 02:41:27,966 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5253 with MPU ID 9MW...
2019-11-24 02:46:29,153 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5254 with MPU ID 7AP...
2019-11-24 02:51:32,602 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5255 with MPU ID xQU...
*2019-11-24 02:56:35,183 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5256 with MPU ID pDL...*
*2019-11-24 03:01:26,059 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5257 with MPU ID Itf...*
*2019-11-24 03:01:26,510 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5263 with MPU ID e3l...*
2019-11-24 03:06:26,230 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5264 with MPU ID 5z4...
2019-11-24 03:11:22,711 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5265 with MPU ID NfP...

Two observations stand out from the above logs:
- The datetime partitions *2019-11-24T01* and *2019-11-24T02* are skipped entirely, resulting in millions of missing offsets. They are never written in later commits (and the data in S3 confirms this).
- Two commits for the *same* topic partition ("digest_features", partition 4) happened nearly simultaneously at 2019-11-24 03:01, despite our commit interval being set to 5 minutes. Why was the same TopicPartition read from and committed twice in such a short interval?

We would greatly appreciate it if anyone could shed light on this issue. Happy to provide full logs if needed.

Thanks
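
The offset contiguity check described under "How was this detected?" can be sketched roughly as follows. This is a minimal, hypothetical illustration in Python, not the actual monitoring job: the function name `find_offset_anomalies` and the sample offsets are made up, and it assumes the topic is neither compacted nor written transactionally, so that consecutive records in one partition have consecutive offsets.

```python
from typing import Iterable, List, Tuple


def find_offset_anomalies(
    offsets: Iterable[int],
) -> Tuple[List[Tuple[int, int]], List[int]]:
    """Check the offsets recorded for ONE topic partition.

    Returns (gaps, duplicates):
      gaps       - inclusive (first_missing, last_missing) offset ranges
      duplicates - offsets seen more than once (listed once per extra copy)
    """
    gaps: List[Tuple[int, int]] = []
    duplicates: List[int] = []
    previous = None
    # Sort first so the check does not depend on the order files were listed.
    for offset in sorted(offsets):
        if previous is not None:
            if offset == previous:
                duplicates.append(offset)
            elif offset > previous + 1:
                gaps.append((previous + 1, offset - 1))
        previous = offset
    return gaps, duplicates


if __name__ == "__main__":
    # Hypothetical offsets: 103-105 are missing and 107 was written twice.
    gaps, duplicates = find_offset_anomalies([100, 101, 102, 106, 107, 107, 108])
    print("gaps:", gaps)
    print("duplicates:", duplicates)
```

A check of this shape flags a skipped range, such as the missing T01 and T02 hours, as a single large gap for the affected partition.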