TaiJuWu commented on code in PR #20553:
URL: https://github.com/apache/kafka/pull/20553#discussion_r2412251186


##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java:
##########
@@ -202,23 +207,61 @@ private void handlePartitionError(
     public Map<TopicPartition, Throwable> handleUnsupportedVersionException(
         int brokerId, UnsupportedVersionException exception, Set<TopicPartition> keys
     ) {
-        log.warn("Broker {} does not support MAX_TIMESTAMP offset specs", brokerId);
-        Map<TopicPartition, Throwable> maxTimestampPartitions = new HashMap<>();
+        log.warn("Broker {} does not support {} offset specs", brokerId, timestampToString(currentUnsupportedVersion));
+        Map<TopicPartition, Throwable> supportedTimestampPartitions = new HashMap<>();
+
         for (TopicPartition topicPartition : keys) {
             Long offsetTimestamp = offsetTimestampsByPartition.get(topicPartition);
-            if (offsetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
-                maxTimestampPartitions.put(topicPartition, exception);
+            if (offsetTimestamp != null && offsetTimestamp == currentUnsupportedVersion && !accUnsupportedTimestampPartition.containsKey(topicPartition)) {
+                supportedTimestampPartitions.put(topicPartition, exception);
+                accUnsupportedTimestampPartition.put(topicPartition, exception);
             }
         }
-        // If there are no partitions with MAX_TIMESTAMP specs the UnsupportedVersionException cannot be handled
+
+        // If there are no partitions with support specs the UnsupportedVersionException cannot be handled
         // and all partitions should be failed here.
-        // Otherwise, just the partitions with MAX_TIMESTAMP specs should be failed here and the fulfillment stage
-        // will later be retried for the potentially empty set of partitions with non-MAX_TIMESTAMP specs.
-        if (maxTimestampPartitions.isEmpty()) {
+        // Otherwise, just the partitions with support specs should be failed here and the fulfillment stage
+        // will later be retried for the potentially empty set of partitions with non-support specs.
+        if (unsupportedVersionRetry == maxUnsupportedVersionRetry && accUnsupportedTimestampPartition.isEmpty()) {

Review Comment:
   We need to check the number of retries.
   For example, suppose there are 5 versions (-1, -2, -3, -4, -5) but the keys
we get are [-1, -2, -3, -4]. In that case, we still need to fall back to the
next check instead of failing all partitions.
   
   I also added comments here to make the intention clear.
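
   As a rough illustration of the retry check described above (a standalone
sketch, not the PR's code): the class `FallbackSketch`, the `SPECS` array, the
string partition names, and the method `onUnsupportedVersion` are all
hypothetical. The sketch assumes one special timestamp is tried per
UnsupportedVersionException and that every partition is failed only once all
specs have been tried without any match.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class FallbackSketch {
    // Hypothetical special timestamps, using the values from the example above.
    static final long[] SPECS = {-5L, -4L, -3L, -2L, -1L};

    private final Map<String, Long> specByPartition;
    private final Set<String> failedSoFar = new HashSet<>();
    private int attempt = 0;

    FallbackSketch(Map<String, Long> specByPartition) {
        this.specByPartition = specByPartition;
    }

    /** Returns the partitions to fail for one UnsupportedVersionException. */
    Set<String> onUnsupportedVersion(Set<String> keys) {
        if (attempt >= SPECS.length) {
            return new HashSet<>(keys); // nothing left to fall back to
        }
        long current = SPECS[attempt];
        Set<String> failedNow = new HashSet<>();
        for (String key : keys) {
            Long spec = specByPartition.get(key);
            // Fail only partitions asking for the spec currently being ruled out.
            if (spec != null && spec == current && failedSoFar.add(key)) {
                failedNow.add(key);
            }
        }
        attempt++;
        boolean exhausted = attempt == SPECS.length;
        if (exhausted && failedSoFar.isEmpty()) {
            // Every spec was tried and none matched: the exception cannot be handled.
            return new HashSet<>(keys);
        }
        // May be empty: the caller falls back to the next check and retries.
        return failedNow;
    }

    public static void main(String[] args) {
        FallbackSketch sketch = new FallbackSketch(Map.of("p0", -1L, "p1", -4L));
        Set<String> keys = Set.of("p0", "p1");
        for (int i = 0; i < SPECS.length; i++) {
            System.out.println(SPECS[i] + " -> " + sketch.onUnsupportedVersion(keys));
        }
    }
}
```

   In this sketch, an empty result on an early attempt means "try the next
spec", which is the case the comment describes: no key uses -5, but the keys
using -1 to -4 must still be checked before anything is failed wholesale.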



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java:
##########
@@ -202,23 +207,61 @@ private void handlePartitionError(
     public Map<TopicPartition, Throwable> handleUnsupportedVersionException(
         int brokerId, UnsupportedVersionException exception, Set<TopicPartition> keys
     ) {
-        log.warn("Broker {} does not support MAX_TIMESTAMP offset specs", brokerId);
-        Map<TopicPartition, Throwable> maxTimestampPartitions = new HashMap<>();
+        log.warn("Broker {} does not support {} offset specs", brokerId, timestampToString(currentUnsupportedVersion));
+        Map<TopicPartition, Throwable> supportedTimestampPartitions = new HashMap<>();
+
         for (TopicPartition topicPartition : keys) {
             Long offsetTimestamp = offsetTimestampsByPartition.get(topicPartition);
-            if (offsetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
-                maxTimestampPartitions.put(topicPartition, exception);
+            if (offsetTimestamp != null && offsetTimestamp == currentUnsupportedVersion && !accUnsupportedTimestampPartition.containsKey(topicPartition)) {
+                supportedTimestampPartitions.put(topicPartition, exception);
+                accUnsupportedTimestampPartition.put(topicPartition, exception);

Review Comment:
   This is a naming mistake. I'll fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
