Hello,

We're seeing some strange behavior with Flink's KafkaConnector010 (Kafka
0.10.1.1) arbitrarily skipping data.

*Context*
KafkaConnector010 is used as the source and StreamingFileSink/BulkPartWriter
(S3) as the sink, with no intermediate operators. Recently, we noticed that
millions of Kafka records were missing for *one* topic partition (the job
consumes 100+ topic partitions, and this behavior was observed for only one
of them). The job runs on YARN; the hosts were healthy, with no hardware
faults observed, and there were no exceptions in the jobmanager or
taskmanager logs at the time.
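
For reference, a minimal sketch of the job shape (illustrative only, not
our actual code: the bootstrap servers, group id, and bulkWriterFactory
below are placeholders, and Flink 1.9-era APIs are assumed):

    import java.time.ZoneId;
    import java.util.Properties;
    import org.apache.flink.api.common.serialization.BulkWriter;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

    public class KafkaToS3Job {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "digest-features-s3");   // placeholder

        // Source: FlinkKafkaConsumer010; the real job covers 100+ topic
        // partitions. SimpleStringSchema stands in for our actual schema.
        FlinkKafkaConsumer010<String> source =
            new FlinkKafkaConsumer010<>("digest_features",
                new SimpleStringSchema(), props);

        // Placeholder for our actual bulk writer factory; won't run as-is.
        BulkWriter.Factory<String> bulkWriterFactory = null;

        // Sink: bulk-format StreamingFileSink writing hourly UTC buckets
        // (dt=YYYY-MM-DDTHH) to S3, no intermediate operators.
        StreamingFileSink<String> sink = StreamingFileSink
            .forBulkFormat(new Path("s3://bucket/kafka_V2"), bulkWriterFactory)
            .withBucketAssigner(
                new DateTimeBucketAssigner<>("'dt='yyyy-MM-dd'T'HH",
                    ZoneId.of("UTC")))
            .build();

        env.addSource(source).addSink(sink);
        env.execute("kafka-to-s3");
      }
    }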

*How was this detected?*
As a sanity check, we dual-write Kafka metadata (offsets) to a separate
location in S3 and run monitoring that verifies the written offsets are
contiguous, with no gaps or duplicates.
Each Kafka record is bucketed into hourly datetime partitions (UTC) in S3.
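
The check itself is conceptually just the following (hypothetical names;
the real monitoring reads the dual-written offset ranges per topic
partition back from S3):

    import java.util.List;

    public class OffsetContiguityCheck {

      /**
       * ranges: one {minOffset, maxOffset} pair per committed part file of
       * a single topic partition, sorted by minOffset.
       */
      public static void verifyContiguous(List<long[]> ranges) {
        for (int i = 1; i < ranges.size(); i++) {
          long prevMax = ranges.get(i - 1)[1];
          long curMin = ranges.get(i)[0];
          if (curMin != prevMax + 1) {
            throw new IllegalStateException(
                "Offset gap or duplicate: previous part ends at " + prevMax
                    + ", next part starts at " + curMin);
          }
        }
      }
    }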

*(Condensed) Taskmanager logs*
2019-11-24 02:36:50,140 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5252 with MPU ID 3XG...
2019-11-24 02:41:27,966 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5253 with MPU ID 9MW...
2019-11-24 02:46:29,153 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5254 with MPU ID 7AP...
2019-11-24 02:51:32,602 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5255 with MPU ID xQU...

*2019-11-24 02:56:35,183 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5256 with MPU ID pDL...*

*2019-11-24 03:01:26,059 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5257 with MPU ID Itf...*
*2019-11-24 03:01:26,510 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5263 with MPU ID e3l...*
2019-11-24 03:06:26,230 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5264 with MPU ID 5z4...
2019-11-24 03:11:22,711 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5265 with MPU ID NfP...

Two observations stand out from the above logs:
- Datetime buckets *2019-11-24T01* and *2019-11-24T02* are skipped
entirely, resulting in millions of missing offsets. They are never written
in any later commit (and the data in S3 confirms this). Note that the part
counter also jumps from part-12-5257 to part-12-5263 at this point.
- Two commits for the *same* topic partition ("digest_features", partition
4) happened nearly simultaneously at 2019-11-24 03:01:26, despite our
commit interval being set at 5 minutes (see the snippet below). Why was the
same TopicPartition read from and committed twice within half a second?
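
For clarity on the 5-minute figure: with bulk formats, StreamingFileSink
rolls and commits part files on each checkpoint, so our "commit interval"
is really the checkpoint interval, i.e. (continuing the sketch above):

    // Bulk-format StreamingFileSink commits a part file per subtask on
    // every successful checkpoint, so this sets the commit cadence.
    env.enableCheckpointing(5 * 60 * 1000L); // 5 minutes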

We'd greatly appreciate it if anyone can shed light on this issue.
Happy to provide full logs if needed.
Thanks
