TaiJuWu commented on code in PR #20553:
URL: https://github.com/apache/kafka/pull/20553#discussion_r2412251186
##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java:
##########
@@ -202,23 +207,61 @@ private void handlePartitionError(
public Map<TopicPartition, Throwable> handleUnsupportedVersionException(
int brokerId, UnsupportedVersionException exception,
Set<TopicPartition> keys
) {
- log.warn("Broker {} does not support MAX_TIMESTAMP offset specs",
brokerId);
- Map<TopicPartition, Throwable> maxTimestampPartitions = new
HashMap<>();
+ log.warn("Broker {} does not support {} offset specs", brokerId,
timestampToString(currentUnsupportedVersion));
+ Map<TopicPartition, Throwable> supportedTimestampPartitions = new
HashMap<>();
+
for (TopicPartition topicPartition : keys) {
Long offsetTimestamp =
offsetTimestampsByPartition.get(topicPartition);
- if (offsetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
- maxTimestampPartitions.put(topicPartition, exception);
+ if (offsetTimestamp != null && offsetTimestamp ==
currentUnsupportedVersion &&
!accUnsupportedTimestampPartition.containsKey(topicPartition)) {
+ supportedTimestampPartitions.put(topicPartition, exception);
+ accUnsupportedTimestampPartition.put(topicPartition,
exception);
}
}
- // If there are no partitions with MAX_TIMESTAMP specs the
UnsupportedVersionException cannot be handled
+
+ // If there are no partitions with support specs the
UnsupportedVersionException cannot be handled
// and all partitions should be failed here.
- // Otherwise, just the partitions with MAX_TIMESTAMP specs should be
failed here and the fulfillment stage
- // will later be retried for the potentially empty set of partitions
with non-MAX_TIMESTAMP specs.
- if (maxTimestampPartitions.isEmpty()) {
+ // Otherwise, just the partitions with support specs should be failed
here and the fulfillment stage
+ // will later be retried for the potentially empty set of partitions
with non-support specs.
+ if (unsupportedVersionRetry == maxUnsupportedVersionRetry &&
accUnsupportedTimestampPartition.isEmpty()) {
Review Comment:
We need to check the number of retries.
For example, suppose there are 5 versions (-1, -2, -3, -4, -5), but the keys
we get are [-1, -2, -3, -4]. In that case we still need to fall back to the
next check instead of failing all of them.
I also added comments here to make the intention clear.
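
To make the retry-count argument concrete, here is a minimal, self-contained sketch of the idea. It is not the PR's code: the class name, the `SENTINELS` ordering, the string partition keys, and the `onUnsupportedVersion` method are all hypothetical stand-ins, and the real handler may order or count its retries differently. The sketch only shows why an empty match on one sentinel should lead to the next check, and why "fail all" is reserved for the last retry with nothing accumulated.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration only; names and ordering are assumptions, not the PR's code.
final class UnsupportedSpecFallbackSketch {
    // Assumed sentinel timestamps, checked one per retry.
    static final long[] SENTINELS = {-5L, -4L, -3L, -2L, -1L};

    private final Map<String, Long> requestedTimestamps;
    private final Set<String> accumulatedFailed = new HashSet<>();
    private int retry = 0;

    UnsupportedSpecFallbackSketch(Map<String, Long> requestedTimestamps) {
        this.requestedTimestamps = requestedTimestamps;
    }

    /** Returns the partitions that should be failed for this round. */
    Set<String> onUnsupportedVersion(Set<String> keys) {
        long current = SENTINELS[retry];
        Set<String> failedNow = new TreeSet<>();
        for (String partition : keys) {
            Long ts = requestedTimestamps.get(partition);
            // Fail only partitions that asked for the sentinel being checked now
            // and that were not already failed in an earlier round.
            if (ts != null && ts == current && !accumulatedFailed.contains(partition)) {
                failedNow.add(partition);
            }
        }
        accumulatedFailed.addAll(failedNow);

        boolean lastRetry = retry == SENTINELS.length - 1;
        if (lastRetry && accumulatedFailed.isEmpty()) {
            // No sentinel ever matched, so the exception cannot be handled: fail all.
            return new TreeSet<>(keys);
        }
        if (!lastRetry) {
            retry++; // Fall back to the next sentinel on the next exception.
        }
        return failedNow;
    }
}
```

With requested timestamps of -4 for one partition and an ordinary timestamp for another, the first call (checking -5) returns an empty set rather than failing everything, and the second call (checking -4) fails only the first partition.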
##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java:
##########
@@ -202,23 +207,61 @@ private void handlePartitionError(
public Map<TopicPartition, Throwable> handleUnsupportedVersionException(
int brokerId, UnsupportedVersionException exception,
Set<TopicPartition> keys
) {
- log.warn("Broker {} does not support MAX_TIMESTAMP offset specs",
brokerId);
- Map<TopicPartition, Throwable> maxTimestampPartitions = new
HashMap<>();
+ log.warn("Broker {} does not support {} offset specs", brokerId,
timestampToString(currentUnsupportedVersion));
+ Map<TopicPartition, Throwable> supportedTimestampPartitions = new
HashMap<>();
+
for (TopicPartition topicPartition : keys) {
Long offsetTimestamp =
offsetTimestampsByPartition.get(topicPartition);
- if (offsetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
- maxTimestampPartitions.put(topicPartition, exception);
+ if (offsetTimestamp != null && offsetTimestamp ==
currentUnsupportedVersion &&
!accUnsupportedTimestampPartition.containsKey(topicPartition)) {
+ supportedTimestampPartitions.put(topicPartition, exception);
+ accUnsupportedTimestampPartition.put(topicPartition,
exception);
Review Comment:
This is a naming mistake. I will fix it.