kamalcph commented on code in PR #16602:
URL: https://github.com/apache/kafka/pull/16602#discussion_r1759283199
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1336,34 +1336,39 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar
    // lookup the position of batch to avoid extra I/O
    val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
-   latestTimestampSegment.log.batchesFrom(position.position).asScala
+   val timestampAndOffsetOpt = latestTimestampSegment.log.batchesFrom(position.position).asScala
      .find(_.maxTimestamp() == maxTimestampSoFar.timestamp)
      .flatMap(batch => batch.offsetOfMaxTimestamp().asScala.map(new TimestampAndOffset(batch.maxTimestamp(), _, Optional.of[Integer](batch.partitionLeaderEpoch()).filter(_ >= 0))))
+ OffsetResultHolder(timestampAndOffsetOpt)
Review Comment:
Went over
[KIP-734](https://cwiki.apache.org/confluence/display/KAFKA/KIP-734%3A+Improve+AdminClient.listOffsets+to+return+timestamp+and+offset+for+the+record+with+the+largest+timestamp);
the purpose of `MAX_TIMESTAMP` is to get the offset of the record with the
highest timestamp in the partition:
```
Used to retrieve the offset with the largest timestamp of a partition as
message timestamps can be specified client side this may not match the log end
offset returned by LatestSpec
```
With remote storage enabled, all of the passive segments might be uploaded to
remote storage while the local log contains only a single **empty** active
segment, so we have to handle the `MAX_TIMESTAMP` case for remote storage as
well. Thanks for catching this! Filed KAFKA-17552 to track this issue
separately.
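For reference, the lookup the patch performs can be sketched roughly like this: scan batches starting from the looked-up index position, find the batch whose max timestamp equals the segment-wide maximum, and return that batch's offset-of-max-timestamp if one was recorded. This is a minimal stand-alone sketch; the `Batch` and `TimestampAndOffset` types and all field names here are hypothetical stand-ins, not Kafka's actual classes:

```java
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;

// Hypothetical stand-ins for batch metadata (not Kafka's real classes):
// each batch records its max timestamp and, optionally, the offset of the
// record that carries that timestamp.
record Batch(long maxTimestamp, OptionalLong offsetOfMaxTimestamp) {}

record TimestampAndOffset(long timestamp, long offset) {}

public class MaxTimestampLookup {

    // Mirrors the shape of the patched Scala code: among the batches read
    // from the index position onward, find the first one whose maxTimestamp
    // matches the segment-wide max, then map its offsetOfMaxTimestamp (if
    // present) into a TimestampAndOffset result.
    static Optional<TimestampAndOffset> find(List<Batch> batchesFromPosition,
                                             long maxTimestampSoFar) {
        return batchesFromPosition.stream()
                .filter(b -> b.maxTimestamp() == maxTimestampSoFar)
                .findFirst()
                .flatMap(b -> b.offsetOfMaxTimestamp().isPresent()
                        ? Optional.of(new TimestampAndOffset(
                                b.maxTimestamp(),
                                b.offsetOfMaxTimestamp().getAsLong()))
                        : Optional.empty());
    }

    public static void main(String[] args) {
        List<Batch> batches = List.of(
                new Batch(100L, OptionalLong.of(5L)),
                new Batch(250L, OptionalLong.of(12L)), // segment max lives here
                new Batch(180L, OptionalLong.of(20L)));
        Optional<TimestampAndOffset> result = find(batches, 250L);
        System.out.println(result
                .map(r -> r.timestamp() + "@" + r.offset())
                .orElse("none"));
    }
}
```

Note that when the matching batch has no recorded offset-of-max-timestamp, the result is empty rather than a default offset, which is why wrapping the result in an optional-bearing holder (as the patch does with `OffsetResultHolder`) fits naturally.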
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]