Hi Harrison,

One thing to keep in mind is that Flink's StreamingFileSink only writes
files when there is data to write: if a bucket receives no records for
some period of time, no part files are created for it.
Could this explain why dt=2019-11-24T01 and 2019-11-24T02 are entirely
skipped?
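
One way to check is to ask Kafka directly whether the partition carried
records with timestamps inside the skipped hours. A minimal sketch using
the plain consumer API (offsetsForTimes is available from Kafka clients
0.10.1+; the broker address is a placeholder):

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class SkippedHourProbe {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker:9092"); // placeholder
            props.put("key.deserializer", ByteArrayDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("digest_features", 4);
                long hourStart = 1574557200000L; // 2019-11-24T01:00:00Z
                Map<TopicPartition, OffsetAndTimestamp> result =
                        consumer.offsetsForTimes(Collections.singletonMap(tp, hourStart));
                // A non-null result whose timestamp falls within that hour means
                // the partition was active then, and the sink should have
                // produced files for it.
                System.out.println(result.get(tp));
            }
        }
    }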

In addition, regarding the "duplicates": it would help if you could
share a bit more information about your BucketAssigner.
How are these names assigned to the files, and what does TT stand for?
Could it be that there are a lot of events for partition 4 that fill up
two part files for that duration? I am
asking because the counters of the two part files differ.
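
For reference, an hourly, UTC, event-time bucket assigner for the
StreamingFileSink usually looks roughly like the sketch below (MyRecord
and getEventTimeMillis() are placeholders, not your actual code). It
would help to see where yours differs:

    import java.time.Instant;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;
    import org.apache.flink.core.io.SimpleVersionedSerializer;
    import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

    public class HourlyUtcBucketAssigner implements BucketAssigner<MyRecord, String> {

        private static final DateTimeFormatter HOUR =
                DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH").withZone(ZoneOffset.UTC);

        @Override
        public String getBucketId(MyRecord element, Context context) {
            // The bucket is derived from each record, so buckets (and their
            // part files) only exist for hours that actually receive records.
            return "dt=" + HOUR.format(Instant.ofEpochMilli(element.getEventTimeMillis()));
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }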

Cheers,
Kostas

On Tue, Nov 26, 2019 at 1:09 AM Harrison Xu <h...@quora.com> wrote:
>
> Hello,
>
> We're seeing some strange behavior with Flink's KafkaConnector010 (Kafka 0.10.1.1) arbitrarily skipping data.
>
> Context
> KafkaConnector010 is used as source, and StreamingFileSink/BulkPartWriter (S3) as sink, with no intermediate operators. Recently we noticed that millions of Kafka records were missing from one topic partition (the job reads 100+ topic partitions, and this behavior was observed for only one). The job runs on YARN; the hosts were healthy, with no hardware faults observed, and there were no exceptions in the jobmanager or taskmanager logs at the time.
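>
> For concreteness, the job is wired roughly like this (a simplified sketch; the deserialization schema, Kafka properties, writer factory, bucket assigner, and S3 path are placeholders, not our exact code):
>
>     import org.apache.flink.core.fs.Path;
>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>     import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>     import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
>
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     // Bulk part files are committed on checkpoint, so this is our 5-minute commit interval.
>     env.enableCheckpointing(5 * 60 * 1000);
>
>     FlinkKafkaConsumer010<MyRecord> source =
>             new FlinkKafkaConsumer010<>("digest_features", deserSchema, kafkaProps);
>
>     StreamingFileSink<MyRecord> sink = StreamingFileSink
>             .forBulkFormat(new Path("s3://<bucket>/kafka_V2"), bulkWriterFactory)
>             .withBucketAssigner(hourlyUtcBucketAssigner)
>             .build();
>
>     env.addSource(source).addSink(sink);
>     env.execute();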
>
> How was this detected?
> As a sanity check, we dual-write Kafka metadata (offsets) to a separate 
> location in S3, and have monitoring to ensure that written offsets are 
> contiguous with no duplicates.
> Each Kafka record is bucketed into hourly datetime partitions (UTC) in S3.
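>
> The monitoring boils down to a check like the sketch below (simplified; the real job reads the offset files back from S3 per topic partition):
>
>     // Offsets recorded for one topic partition, sorted ascending.
>     static void assertContiguous(long[] offsets) {
>         for (int i = 1; i < offsets.length; i++) {
>             if (offsets[i] == offsets[i - 1]) {
>                 throw new IllegalStateException("duplicate offset " + offsets[i]);
>             }
>             if (offsets[i] != offsets[i - 1] + 1) {
>                 throw new IllegalStateException("gap: " + offsets[i - 1] + " -> " + offsets[i]);
>             }
>         }
>     }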
>
> (Condensed) Taskmanager logs
> 2019-11-24 02:36:50,140 INFO  org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5252 with MPU ID 3XG...
> 2019-11-24 02:41:27,966 INFO  org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5253 with MPU ID 9MW...
> 2019-11-24 02:46:29,153 INFO  org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5254 with MPU ID 7AP...
> 2019-11-24 02:51:32,602 INFO  org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5255 with MPU ID xQU...
> 2019-11-24 02:56:35,183 INFO  org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5256 with MPU ID pDL...
> 2019-11-24 03:01:26,059 INFO  org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5257 with MPU ID Itf...
> 2019-11-24 03:01:26,510 INFO  org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5263 with MPU ID e3l...
> 2019-11-24 03:06:26,230 INFO  org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5264 with MPU ID 5z4...
> 2019-11-24 03:11:22,711 INFO  org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5265 with MPU ID NfP...
>
> Two observations stand out from the above logs:
> - Datetimes 2019-11-24T01 and 2019-11-24T02 are entirely skipped, resulting in millions of missing offsets. They are never written in later commits (and the data in S3 confirms this).
> - Two commits for the same topic partition ("digest_features", partition 4) happened nearly simultaneously at 2019-11-24 03:01:26 (part-12-5257 and part-12-5263, about half a second apart), despite our commit interval being set to 5 minutes. Why was the same TopicPartition read from and committed twice in such a short interval?
>
> Would greatly appreciate it if anyone can shed light on this issue. Happy to provide full logs if needed.
> Thanks
