chia7712 commented on code in PR #20461:
URL: https://github.com/apache/kafka/pull/20461#discussion_r2376370868
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -3108,37 +3108,57 @@ public DescribeLogDirsRequest.Builder createRequest(int
timeoutMs) {
@Override
public void handleResponse(AbstractResponse abstractResponse) {
DescribeLogDirsResponse response =
(DescribeLogDirsResponse) abstractResponse;
- for (Map.Entry<String, LogDirDescription> responseEntry :
logDirDescriptions(response).entrySet()) {
+
+ Set<TopicPartition> pendingPartitions = new
HashSet<>(replicaDirInfoByPartition.keySet());
+ Map<String, Throwable> directoryFailures = new HashMap<>();
+ Map<String, LogDirDescription> descriptions =
logDirDescriptions(response);
+ if (descriptions.isEmpty()) {
+ Errors error = response.data().errorCode() ==
Errors.NONE.code()
+ ? Errors.CLUSTER_AUTHORIZATION_FAILED
+ : Errors.forCode(response.data().errorCode());
+ handleFailure(error.exception(), pendingPartitions);
+ }
+
+ for (Map.Entry<String, LogDirDescription> responseEntry :
descriptions.entrySet()) {
String logDir = responseEntry.getKey();
LogDirDescription logDirInfo =
responseEntry.getValue();
// No replica info will be provided if the log
directory is offline
if (logDirInfo.error() instanceof
KafkaStorageException)
continue;
- if (logDirInfo.error() != null)
- handleFailure(new IllegalStateException(
- "The error " +
logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in
the response from broker " + brokerId + " is illegal"));
-
- for (Map.Entry<TopicPartition, ReplicaInfo>
replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) {
- TopicPartition tp = replicaInfoEntry.getKey();
- ReplicaInfo replicaInfo =
replicaInfoEntry.getValue();
- ReplicaLogDirInfo replicaLogDirInfo =
replicaDirInfoByPartition.get(tp);
- if (replicaLogDirInfo == null) {
- log.warn("Server response from broker {}
mentioned unknown partition {}", brokerId, tp);
- } else if (replicaInfo.isFuture()) {
- replicaDirInfoByPartition.put(tp, new
ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(),
-
replicaLogDirInfo.getCurrentReplicaOffsetLag(),
- logDir,
- replicaInfo.offsetLag()));
- } else {
- replicaDirInfoByPartition.put(tp, new
ReplicaLogDirInfo(logDir,
- replicaInfo.offsetLag(),
- replicaLogDirInfo.getFutureReplicaLogDir(),
-
replicaLogDirInfo.getFutureReplicaOffsetLag()));
+
+ if (logDirInfo.error() == null) {
+ for (Map.Entry<TopicPartition, ReplicaInfo>
replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) {
+ TopicPartition tp = replicaInfoEntry.getKey();
+ ReplicaInfo replicaInfo =
replicaInfoEntry.getValue();
+ ReplicaLogDirInfo replicaLogDirInfo =
replicaDirInfoByPartition.get(tp);
+ if (replicaLogDirInfo == null) {
+ log.warn("Server response from broker {}
mentioned unknown partition {}", brokerId, tp);
+ } else if (replicaInfo.isFuture()) {
+ replicaDirInfoByPartition.put(tp, new
ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(),
+
replicaLogDirInfo.getCurrentReplicaOffsetLag(),
+ logDir,
+ replicaInfo.offsetLag()));
+ } else {
+ replicaDirInfoByPartition.put(tp, new
ReplicaLogDirInfo(logDir,
+ replicaInfo.offsetLag(),
+
replicaLogDirInfo.getFutureReplicaLogDir(),
+
replicaLogDirInfo.getFutureReplicaOffsetLag()));
+ }
+ pendingPartitions.remove(tp);
}
+ } else {
+ directoryFailures.put(logDir, logDirInfo.error());
}
}
+ if (!pendingPartitions.isEmpty() &&
!directoryFailures.isEmpty()) {
Review Comment:
This change is a bit complicated for me. Could you try collecting the topic
partitions for each directory error, and then use that collection to complete
the `futures`? for example:
```java
final Map<TopicPartition, Exception> failedTps = new
HashMap<>();
for (Map.Entry<String, LogDirDescription> responseEntry
: logDirDescriptions(response).entrySet()) {
String logDir = responseEntry.getKey();
LogDirDescription logDirInfo =
responseEntry.getValue();
// No replica info will be provided if the log
directory is offline
if (logDirInfo.error() instanceof
KafkaStorageException)
continue;
if (logDirInfo.error() != null)
handleFailure(new IllegalStateException(
"The error " +
logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in
the response from broker " + brokerId + " is illegal"));
for (Map.Entry<TopicPartition, ReplicaInfo>
replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) {
TopicPartition tp = replicaInfoEntry.getKey();
if (logDirInfo.error() != null) {
failedTps.put(tp, logDirInfo.error());
continue;
}
...
}
}
for (var entry : failedTps.entrySet()) {
var tp = entry.getKey();
var future = futures.get(new
TopicPartitionReplica(tp.topic(), tp.partition(), brokerId));
future.completeExceptionally(entry.getValue());
}
```
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -3108,37 +3108,57 @@ public DescribeLogDirsRequest.Builder createRequest(int
timeoutMs) {
@Override
public void handleResponse(AbstractResponse abstractResponse) {
DescribeLogDirsResponse response =
(DescribeLogDirsResponse) abstractResponse;
- for (Map.Entry<String, LogDirDescription> responseEntry :
logDirDescriptions(response).entrySet()) {
+
+ Set<TopicPartition> pendingPartitions = new
HashSet<>(replicaDirInfoByPartition.keySet());
+ Map<String, Throwable> directoryFailures = new HashMap<>();
+ Map<String, LogDirDescription> descriptions =
logDirDescriptions(response);
+ if (descriptions.isEmpty()) {
+ Errors error = response.data().errorCode() ==
Errors.NONE.code()
+ ? Errors.CLUSTER_AUTHORIZATION_FAILED
+ : Errors.forCode(response.data().errorCode());
+ handleFailure(error.exception(), pendingPartitions);
+ }
+
+ for (Map.Entry<String, LogDirDescription> responseEntry :
descriptions.entrySet()) {
String logDir = responseEntry.getKey();
LogDirDescription logDirInfo =
responseEntry.getValue();
// No replica info will be provided if the log
directory is offline
if (logDirInfo.error() instanceof
KafkaStorageException)
continue;
- if (logDirInfo.error() != null)
- handleFailure(new IllegalStateException(
- "The error " +
logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in
the response from broker " + brokerId + " is illegal"));
-
- for (Map.Entry<TopicPartition, ReplicaInfo>
replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) {
- TopicPartition tp = replicaInfoEntry.getKey();
- ReplicaInfo replicaInfo =
replicaInfoEntry.getValue();
- ReplicaLogDirInfo replicaLogDirInfo =
replicaDirInfoByPartition.get(tp);
- if (replicaLogDirInfo == null) {
- log.warn("Server response from broker {}
mentioned unknown partition {}", brokerId, tp);
- } else if (replicaInfo.isFuture()) {
- replicaDirInfoByPartition.put(tp, new
ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(),
-
replicaLogDirInfo.getCurrentReplicaOffsetLag(),
- logDir,
- replicaInfo.offsetLag()));
- } else {
- replicaDirInfoByPartition.put(tp, new
ReplicaLogDirInfo(logDir,
- replicaInfo.offsetLag(),
- replicaLogDirInfo.getFutureReplicaLogDir(),
-
replicaLogDirInfo.getFutureReplicaOffsetLag()));
+
+ if (logDirInfo.error() == null) {
+ for (Map.Entry<TopicPartition, ReplicaInfo>
replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) {
+ TopicPartition tp = replicaInfoEntry.getKey();
+ ReplicaInfo replicaInfo =
replicaInfoEntry.getValue();
+ ReplicaLogDirInfo replicaLogDirInfo =
replicaDirInfoByPartition.get(tp);
+ if (replicaLogDirInfo == null) {
+ log.warn("Server response from broker {}
mentioned unknown partition {}", brokerId, tp);
+ } else if (replicaInfo.isFuture()) {
+ replicaDirInfoByPartition.put(tp, new
ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(),
+
replicaLogDirInfo.getCurrentReplicaOffsetLag(),
+ logDir,
+ replicaInfo.offsetLag()));
+ } else {
+ replicaDirInfoByPartition.put(tp, new
ReplicaLogDirInfo(logDir,
+ replicaInfo.offsetLag(),
+
replicaLogDirInfo.getFutureReplicaLogDir(),
+
replicaLogDirInfo.getFutureReplicaOffsetLag()));
+ }
+ pendingPartitions.remove(tp);
}
+ } else {
+ directoryFailures.put(logDir, logDirInfo.error());
}
}
+ if (!pendingPartitions.isEmpty() &&
!directoryFailures.isEmpty()) {
+ List<String> errorAtDir = new ArrayList<>();
+ directoryFailures.forEach((k, v) ->
errorAtDir.add(v.getClass().getName() + " at " + k));
+ Throwable error = new IllegalStateException("The error
" + String.join(", ", errorAtDir) + " in the response from broker " + brokerId
+ " is illegal");
Review Comment:
we could generate the string directly. for example:
```java
directoryFailures.entrySet().stream().map(entry ->
entry.getValue().getClass().getName() + " at " + entry.getKey())
.collect(Collectors.joining(","))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]