Hi Harrison,

One thing to keep in mind is that Flink will only write files if there is data to write. If, for example, a partition is not active for a period of time, then no files will be written for it. Could this explain why dt=2019-11-24T01 and dt=2019-11-24T02 are entirely skipped?
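To illustrate the point about inactive buckets, here is a minimal, hypothetical sketch (plain Python, not Flink's actual BucketAssigner API): events are grouped into hourly UTC dt=... buckets by timestamp, and an hour that sees no events never produces a bucket, so no file is created for it. The bucket naming mirrors the paths in the logs below; the helper names and timestamps are made up for illustration.

```python
from datetime import datetime, timezone

def bucket_id(epoch_ms: int) -> str:
    """Format an event timestamp into an hourly UTC bucket, e.g. 'dt=2019-11-24T00'."""
    dt = datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)
    return dt.strftime("dt=%Y-%m-%dT%H")

def buckets_written(event_timestamps_ms):
    """Only hours that actually saw events produce a bucket (and hence part files)."""
    return sorted({bucket_id(ts) for ts in event_timestamps_ms})

# Hypothetical events at 00:00, 00:10, and 03:00 UTC on 2019-11-24;
# hours 01 and 02 are idle, so no dt=...T01 or dt=...T02 bucket appears.
events = [1574553600000, 1574554200000, 1574564400000]
print(buckets_written(events))  # ['dt=2019-11-24T00', 'dt=2019-11-24T03']
```

So if the source genuinely received no records for those hours, the missing directories would be expected behavior rather than data loss.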
In addition, for the "duplicates", it would help if you could share a bit more information about your BucketAssigner. How are these names assigned to the files, and what does TT stand for? Could it be that there are enough events for partition 4 to fill up two part files for that duration? I am asking because the counters of the two part files differ.

Cheers,
Kostas

On Tue, Nov 26, 2019 at 1:09 AM Harrison Xu <h...@quora.com> wrote:
>
> Hello,
>
> We're seeing some strange behavior with Flink's KafkaConnector010 (Kafka
> 0.10.1.1) arbitrarily skipping data.
>
> Context
> KafkaConnector010 is used as the source, and StreamingFileSink/BulkPartWriter
> (S3) as the sink, with no intermediate operators. Recently, we noticed that
> millions of Kafka records were missing for one topic partition (this job
> reads 100+ topic partitions, and such behavior was observed for only one).
> The job runs on YARN, and the hosts were healthy with no hardware faults
> observed. There were no exceptions in the jobmanager or taskmanager logs at
> the time.
>
> How was this detected?
> As a sanity check, we dual-write Kafka metadata (offsets) to a separate
> location in S3, and have monitoring to ensure that the written offsets are
> contiguous with no duplicates.
> Each Kafka record is bucketed into hourly datetime partitions (UTC) in S3.
>
> (Condensed) taskmanager logs
> 2019-11-24 02:36:50,140 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5252 with MPU ID 3XG...
> 2019-11-24 02:41:27,966 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5253 with MPU ID 9MW...
> 2019-11-24 02:46:29,153 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5254 with MPU ID 7AP...
> 2019-11-24 02:51:32,602 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5255 with MPU ID xQU...
> 2019-11-24 02:56:35,183 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5256 with MPU ID pDL...
> 2019-11-24 03:01:26,059 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5257 with MPU ID Itf...
> 2019-11-24 03:01:26,510 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5263 with MPU ID e3l...
> 2019-11-24 03:06:26,230 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5264 with MPU ID 5z4...
> 2019-11-24 03:11:22,711 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5265 with MPU ID NfP...
>
> Two observations stand out from the above logs:
> - The datetime buckets 2019-11-24T01 and 2019-11-24T02 are entirely skipped,
> resulting in millions of missing offsets. They are never written by later
> commits (and the data in S3 confirms this).
> - Two commits for the same topic partition ("digest_features", partition 4)
> happened nearly simultaneously, at 2019-11-24 03:01:26, despite our commit
> interval being set to 5 minutes. Why was the same TopicPartition read from
> and committed twice in such a short interval?
>
> Would greatly appreciate it if anyone is able to shed light on this issue.
> Happy to provide full logs if needed.
> Thanks
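The offset sanity check Harrison describes can be sketched as follows. This is a hypothetical standalone version of such monitoring (the function name and sample offsets are invented for illustration): given the offsets recorded for one topic partition, it reports both gaps (missing ranges) and duplicates.

```python
def check_offsets(offsets):
    """Return (gaps, duplicates) found in a partition's written offsets.

    Offsets are expected to be contiguous once sorted: each exactly one
    greater than the previous, with no repeats.
    """
    gaps, duplicates = [], []
    ordered = sorted(offsets)
    for prev, cur in zip(ordered, ordered[1:]):
        if cur == prev:
            duplicates.append(cur)
        elif cur > prev + 1:
            gaps.append((prev + 1, cur - 1))  # inclusive missing range
    return gaps, duplicates

# Illustrative offsets: 103 is missing (a gap) and 104 was written twice.
print(check_offsets([100, 101, 102, 104, 104]))  # ([(103, 103)], [104])
```

A check of this shape distinguishes the two symptoms in the report: skipped hourly buckets surface as a large missing range, while a double commit of the same data would surface as duplicates.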