chia7712 commented on code in PR #16876:
URL: https://github.com/apache/kafka/pull/16876#discussion_r1720792871


##########
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java:
##########
@@ -65,17 +66,25 @@ public static Builder forReplica(short allowedVersion, int replicaId) {
         public static Builder forConsumer(boolean requireTimestamp,
                                           IsolationLevel isolationLevel,
                                           boolean requireMaxTimestamp,
-                                          boolean requireTieredStorageTimestamp) {
+                                          boolean requireEarliestLocalTimestamp,
+                                          boolean requireTieredStorageTimestamp,
+                                          List<ListOffsetsTopic> topics) {
             short minVersion = 0;
             if (requireTieredStorageTimestamp)
                 minVersion = 9;
+            else if (requireEarliestLocalTimestamp)
+                minVersion = 8;
             else if (requireMaxTimestamp)
                 minVersion = 7;
             else if (isolationLevel == IsolationLevel.READ_COMMITTED)
                 minVersion = 2;
             else if (requireTimestamp)
                 minVersion = 1;
-            return new Builder(minVersion, ApiKeys.LIST_OFFSETS.latestVersion(), CONSUMER_REPLICA_ID, isolationLevel);
+
+            short version = ApiKeys.LIST_OFFSETS.latestVersion();
+            topics.forEach(topic -> topic.partitions().forEach(partition -> checkVersion(version, topic.name(), partition)));

Review Comment:
   After setting the min version, the version check can be handled by 
`NetworkClient` [0]. Hence, we don't need to add an extra check here.
   
   [0] https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L528
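
For context, here is a minimal sketch of the kind of range check this relies on. It assumes the client picks the highest version that lies inside both the builder's allowed range and the range the broker advertises, and fails when the two ranges do not overlap. `VersionRangeSketch`, `pickUsableVersion`, and the use of `UnsupportedOperationException` are hypothetical stand-ins for illustration, not the actual `NetworkClient` code.

```java
// Hypothetical illustration only; this is not the real Kafka implementation.
final class VersionRangeSketch {

    /**
     * Returns the highest version supported by both sides, or throws if the
     * builder's [oldestAllowed, latestAllowed] range and the broker's
     * [brokerMin, brokerMax] range do not overlap.
     */
    static short pickUsableVersion(short oldestAllowed, short latestAllowed,
                                   short brokerMin, short brokerMax) {
        short lowest = (short) Math.max(oldestAllowed, brokerMin);
        short highest = (short) Math.min(latestAllowed, brokerMax);
        if (lowest > highest) {
            // Stand-in for the unsupported-version error a real client would raise.
            throw new UnsupportedOperationException(
                "No usable version: client allows " + oldestAllowed + ".." + latestAllowed
                    + ", broker supports " + brokerMin + ".." + brokerMax);
        }
        return highest;
    }

    public static void main(String[] args) {
        // Builder needs at least v7, broker supports up to v8: a version is found.
        System.out.println(pickUsableVersion((short) 7, (short) 9, (short) 0, (short) 8));
        // Builder needs at least v9, broker supports up to v8: the request is rejected.
        try {
            pickUsableVersion((short) 9, (short) 9, (short) 0, (short) 8);
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under that assumption, raising the builder's minimum version is enough to make an incompatible broker fail the request, so a per-partition check in the builder would be redundant.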



##########
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java:
##########
@@ -65,17 +66,25 @@ public static Builder forReplica(short allowedVersion, int 
replicaId) {
         public static Builder forConsumer(boolean requireTimestamp,

Review Comment:
   There are too many boolean arguments. Could you please refactor it? For 
instance:
   ```java
           public static Builder forTieredStorageTimestamp(IsolationLevel isolationLevel) {
               return new Builder((short) 9, ApiKeys.LIST_OFFSETS.latestVersion(), CONSUMER_REPLICA_ID, isolationLevel);
           }
   ```
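
The same idea could extend to the other flags. Below is a rough, self-contained sketch, assuming one named factory per special timestamp type with the minimum versions taken from the diff above (9, 8, 7). `ListOffsetsBuilderSketch`, the local `IsolationLevel` enum, `LATEST_VERSION`, and the factory names other than `forTieredStorageTimestamp` are hypothetical stand-ins, not existing Kafka APIs.

```java
// Hypothetical stand-ins for illustration; these are not the real Kafka classes.
enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

final class ListOffsetsBuilderSketch {
    // Assumed value for this sketch; the real code would ask ApiKeys for it.
    static final short LATEST_VERSION = 9;

    final short oldestAllowedVersion;
    final short latestAllowedVersion;
    final IsolationLevel isolationLevel;

    private ListOffsetsBuilderSketch(short oldestAllowedVersion, IsolationLevel isolationLevel) {
        this.oldestAllowedVersion = oldestAllowedVersion;
        this.latestAllowedVersion = LATEST_VERSION;
        this.isolationLevel = isolationLevel;
    }

    // Each factory names its intent, so call sites carry no positional booleans.
    static ListOffsetsBuilderSketch forTieredStorageTimestamp(IsolationLevel isolationLevel) {
        return new ListOffsetsBuilderSketch((short) 9, isolationLevel);
    }

    static ListOffsetsBuilderSketch forEarliestLocalTimestamp(IsolationLevel isolationLevel) {
        return new ListOffsetsBuilderSketch((short) 8, isolationLevel);
    }

    static ListOffsetsBuilderSketch forMaxTimestamp(IsolationLevel isolationLevel) {
        return new ListOffsetsBuilderSketch((short) 7, isolationLevel);
    }

    public static void main(String[] args) {
        ListOffsetsBuilderSketch builder = forMaxTimestamp(IsolationLevel.READ_COMMITTED);
        System.out.println("min=" + builder.oldestAllowedVersion
            + " max=" + builder.latestAllowedVersion
            + " isolation=" + builder.isolationLevel);
    }
}
```

A call such as `forMaxTimestamp(isolationLevel)` reads unambiguously, whereas a call with several positional `true`/`false` arguments makes it easy to swap two flags without the compiler noticing.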


