Hi, I have a Kafka topic configured with:
message.timestamp.type=LogAppendTime I'm using the "brod" [1] Kafka client and I have noticed that it does return the CreateTime instead of LogAppendTime when fetching the messages. I have tracked down that the "kafka_protocol" library (used by the brod client) always uses the firstTimestamp from the Record Batch and timestampDelta from the Record to compute each record's timestamp [2]. This always gives the CreateTime. In the official Java client, it looks like that when LogAppendTime is in effect (determined by the attribute timestampType in the Record Batch), it uses the maxTimestamp from the Record Batch [3] to set the timestamp in each Record [4]. Is this the exact behaviour which is expected to be followed by clients? I've come just across several resources which gave me few hints: * KIP-32 [5], which just talks about the Message format with magic < 2. * KAFKA-5353 [6], which changed the baseTimeStamp to always be the create timestamp. On the other hand, the documentation does not give a clue that clients should use the maxTimestamp when LogAppendTime is in use: * The Record Batch documentation [7] does not explain the individual fields semantics. * Wiki page "A Guide To The Kafka Protocol" [8] is more detailed on the FirstTimestamp, TimestampDelta and MaxTimestamp, but does not mention what implications does have the timestamp type on those fields. From my point of view, this is either a deficiency in Kafka, which should instead always provide the correct authoritative timestamp to consumers. Or if it is indeed expected that this logic is handled by clients, it should be explicitly written in the official documentation. For the record, here's a pull request [9] to the kafka_protocol library. [1] https://github.com/klarna/brod [2] https://github.com/klarna/kafka_protocol/blob/cc13902191b9ca3970a65388697c1069ae68fd2a/src/kpro_batch.erl#L249 [3] https://github.com/apache/kafka/blob/1f1179ea64bbaf068d759aae988bd2a6fe966161/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java#L558 [4] https://github.com/apache/kafka/blob/1f1179ea64bbaf068d759aae988bd2a6fe966161/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L330-L331 [5] https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message [6] https://issues.apache.org/jira/browse/KAFKA-5353 [7] http://kafka.apache.org/documentation/#recordbatch [8] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets [9] https://github.com/klarna/kafka_protocol/pull/60 -- Jan Hruban
signature.asc
Description: PGP signature