frankvicky commented on code in PR #16873:
URL: https://github.com/apache/kafka/pull/16873#discussion_r1722613611
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1170,6 +1174,8 @@ class KafkaApis(val requestChannel: RequestChannel,
           debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " +
             s"failed because the partition is duplicated in the request.")
           buildErrorResponse(Errors.INVALID_REQUEST, partition)
+        } else if (timestampMinSupportedVersion.contains(partition.timestamp()) && version < timestampMinSupportedVersion(partition.timestamp())) {
Review Comment:
Hi @FrankYang0529 and @chia7712,
In `ListOffsetsRequest`, the `forConsumer` factory method was recently
updated to pick a minimum request version for each special timestamp. Perhaps
we could define the broker-side behavior based on the same logic:
```java
public static Builder forConsumer(boolean requireTimestamp,
                                  IsolationLevel isolationLevel,
                                  boolean requireMaxTimestamp,
                                  boolean requireEarliestLocalTimestamp,
                                  boolean requireTieredStorageTimestamp) {
    short minVersion = 0;
    if (requireTieredStorageTimestamp)
        minVersion = 9;
    else if (requireEarliestLocalTimestamp)
        minVersion = 8;
    else if (requireMaxTimestamp)
        minVersion = 7;
    else if (isolationLevel == IsolationLevel.READ_COMMITTED)
        minVersion = 2;
    else if (requireTimestamp)
        minVersion = 1;
    return new Builder(minVersion, ApiKeys.LIST_OFFSETS.latestVersion(), CONSUMER_REPLICA_ID, isolationLevel);
}
```
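To make the idea concrete, here is a rough sketch of how the broker-side check could reuse those thresholds. The class name `TimestampMinVersionSketch` and the helper `isUnsupported` are hypothetical, and the sentinel values (-3, -4, -5) are my assumption of what the constants in `ListOffsetsRequest` are, so please verify them against the real class. Only the 7/8/9 version numbers come from `forConsumer` above.

```java
import java.util.Map;

public class TimestampMinVersionSketch {
    // Assumed sentinel values; verify against the constants in ListOffsetsRequest.
    static final long MAX_TIMESTAMP = -3L;
    static final long EARLIEST_LOCAL_TIMESTAMP = -4L;
    static final long LATEST_TIERED_TIMESTAMP = -5L;

    // Minimum request version per special timestamp, taken from forConsumer.
    static final Map<Long, Short> MIN_SUPPORTED_VERSION = Map.of(
        MAX_TIMESTAMP, (short) 7,
        EARLIEST_LOCAL_TIMESTAMP, (short) 8,
        LATEST_TIERED_TIMESTAMP, (short) 9
    );

    // Hypothetical helper: true when the request version is too old
    // for the special timestamp it asks for.
    static boolean isUnsupported(long timestamp, short version) {
        Short minVersion = MIN_SUPPORTED_VERSION.get(timestamp);
        return minVersion != null && version < minVersion;
    }

    public static void main(String[] args) {
        System.out.println(isUnsupported(MAX_TIMESTAMP, (short) 6));
        System.out.println(isUnsupported(MAX_TIMESTAMP, (short) 7));
    }
}
```

Timestamps that are not in the map (ordinary timestamps, earliest, latest) are never rejected by this check, which matches the `contains(...) && version < ...` condition in the diff.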
WDYT?