chia7712 commented on code in PR #20461:
URL: https://github.com/apache/kafka/pull/20461#discussion_r2376370868


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -3108,37 +3108,57 @@ public DescribeLogDirsRequest.Builder createRequest(int 
timeoutMs) {
                 @Override
                 public void handleResponse(AbstractResponse abstractResponse) {
                     DescribeLogDirsResponse response = 
(DescribeLogDirsResponse) abstractResponse;
-                    for (Map.Entry<String, LogDirDescription> responseEntry : 
logDirDescriptions(response).entrySet()) {
+
+                    Set<TopicPartition> pendingPartitions = new 
HashSet<>(replicaDirInfoByPartition.keySet());
+                    Map<String, Throwable> directoryFailures = new HashMap<>();
+                    Map<String, LogDirDescription> descriptions = 
logDirDescriptions(response);
+                    if (descriptions.isEmpty()) {
+                        Errors error = response.data().errorCode() == 
Errors.NONE.code()
+                                ? Errors.CLUSTER_AUTHORIZATION_FAILED
+                                : Errors.forCode(response.data().errorCode());
+                        handleFailure(error.exception(), pendingPartitions);
+                    }
+
+                    for (Map.Entry<String, LogDirDescription> responseEntry : 
descriptions.entrySet()) {
                         String logDir = responseEntry.getKey();
                         LogDirDescription logDirInfo = 
responseEntry.getValue();
 
                         // No replica info will be provided if the log 
directory is offline
                         if (logDirInfo.error() instanceof 
KafkaStorageException)
                             continue;
-                        if (logDirInfo.error() != null)
-                            handleFailure(new IllegalStateException(
-                                "The error " + 
logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in 
the response from broker " + brokerId + " is illegal"));
-
-                        for (Map.Entry<TopicPartition, ReplicaInfo> 
replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) {
-                            TopicPartition tp = replicaInfoEntry.getKey();
-                            ReplicaInfo replicaInfo = 
replicaInfoEntry.getValue();
-                            ReplicaLogDirInfo replicaLogDirInfo = 
replicaDirInfoByPartition.get(tp);
-                            if (replicaLogDirInfo == null) {
-                                log.warn("Server response from broker {} 
mentioned unknown partition {}", brokerId, tp);
-                            } else if (replicaInfo.isFuture()) {
-                                replicaDirInfoByPartition.put(tp, new 
ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(),
-                                    
replicaLogDirInfo.getCurrentReplicaOffsetLag(),
-                                    logDir,
-                                    replicaInfo.offsetLag()));
-                            } else {
-                                replicaDirInfoByPartition.put(tp, new 
ReplicaLogDirInfo(logDir,
-                                    replicaInfo.offsetLag(),
-                                    replicaLogDirInfo.getFutureReplicaLogDir(),
-                                    
replicaLogDirInfo.getFutureReplicaOffsetLag()));
+
+                        if (logDirInfo.error() == null) {
+                            for (Map.Entry<TopicPartition, ReplicaInfo> 
replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) {
+                                TopicPartition tp = replicaInfoEntry.getKey();
+                                ReplicaInfo replicaInfo = 
replicaInfoEntry.getValue();
+                                ReplicaLogDirInfo replicaLogDirInfo = 
replicaDirInfoByPartition.get(tp);
+                                if (replicaLogDirInfo == null) {
+                                    log.warn("Server response from broker {} 
mentioned unknown partition {}", brokerId, tp);
+                                } else if (replicaInfo.isFuture()) {
+                                    replicaDirInfoByPartition.put(tp, new 
ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(),
+                                            
replicaLogDirInfo.getCurrentReplicaOffsetLag(),
+                                            logDir,
+                                            replicaInfo.offsetLag()));
+                                } else {
+                                    replicaDirInfoByPartition.put(tp, new 
ReplicaLogDirInfo(logDir,
+                                            replicaInfo.offsetLag(),
+                                            
replicaLogDirInfo.getFutureReplicaLogDir(),
+                                            
replicaLogDirInfo.getFutureReplicaOffsetLag()));
+                                }
+                                pendingPartitions.remove(tp);
                             }
+                        } else {
+                            directoryFailures.put(logDir, logDirInfo.error());
                         }
                     }
 
+                    if (!pendingPartitions.isEmpty() && 
!directoryFailures.isEmpty()) {

Review Comment:
   This change is a bit complicated for me. Could you try collecting the topic 
partitions for each directory error, and then use that collection to complete 
the `futures`? for example:
   ```java
                       final Map<TopicPartition, Exception> failedTps = new 
HashMap<>();
                       for (Map.Entry<String, LogDirDescription> responseEntry 
: logDirDescriptions(response).entrySet()) {
                           String logDir = responseEntry.getKey();
                           LogDirDescription logDirInfo = 
responseEntry.getValue();
   
                           // No replica info will be provided if the log 
directory is offline
                           if (logDirInfo.error() instanceof 
KafkaStorageException)
                               continue;
                           if (logDirInfo.error() != null)
                               handleFailure(new IllegalStateException(
                                   "The error " + 
logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in 
the response from broker " + brokerId + " is illegal"));
   
                           for (Map.Entry<TopicPartition, ReplicaInfo> 
replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) {
                               TopicPartition tp = replicaInfoEntry.getKey();
                               if (logDirInfo.error() != null) {
                                   failedTps.put(tp, logDirInfo.error());
                                   continue;
                               }
                               ...
                           }
                       }
   
                       for (var entry : failedTps.entrySet()) {
                           var tp = entry.getKey();
                           var future = futures.get(new 
TopicPartitionReplica(tp.topic(), tp.partition(), brokerId));
                           future.completeExceptionally(entry.getValue());
                       }
   
   ```



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -3108,37 +3108,57 @@ public DescribeLogDirsRequest.Builder createRequest(int 
timeoutMs) {
                 @Override
                 public void handleResponse(AbstractResponse abstractResponse) {
                     DescribeLogDirsResponse response = 
(DescribeLogDirsResponse) abstractResponse;
-                    for (Map.Entry<String, LogDirDescription> responseEntry : 
logDirDescriptions(response).entrySet()) {
+
+                    Set<TopicPartition> pendingPartitions = new 
HashSet<>(replicaDirInfoByPartition.keySet());
+                    Map<String, Throwable> directoryFailures = new HashMap<>();
+                    Map<String, LogDirDescription> descriptions = 
logDirDescriptions(response);
+                    if (descriptions.isEmpty()) {
+                        Errors error = response.data().errorCode() == 
Errors.NONE.code()
+                                ? Errors.CLUSTER_AUTHORIZATION_FAILED
+                                : Errors.forCode(response.data().errorCode());
+                        handleFailure(error.exception(), pendingPartitions);
+                    }
+
+                    for (Map.Entry<String, LogDirDescription> responseEntry : 
descriptions.entrySet()) {
                         String logDir = responseEntry.getKey();
                         LogDirDescription logDirInfo = 
responseEntry.getValue();
 
                         // No replica info will be provided if the log 
directory is offline
                         if (logDirInfo.error() instanceof 
KafkaStorageException)
                             continue;
-                        if (logDirInfo.error() != null)
-                            handleFailure(new IllegalStateException(
-                                "The error " + 
logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in 
the response from broker " + brokerId + " is illegal"));
-
-                        for (Map.Entry<TopicPartition, ReplicaInfo> 
replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) {
-                            TopicPartition tp = replicaInfoEntry.getKey();
-                            ReplicaInfo replicaInfo = 
replicaInfoEntry.getValue();
-                            ReplicaLogDirInfo replicaLogDirInfo = 
replicaDirInfoByPartition.get(tp);
-                            if (replicaLogDirInfo == null) {
-                                log.warn("Server response from broker {} 
mentioned unknown partition {}", brokerId, tp);
-                            } else if (replicaInfo.isFuture()) {
-                                replicaDirInfoByPartition.put(tp, new 
ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(),
-                                    
replicaLogDirInfo.getCurrentReplicaOffsetLag(),
-                                    logDir,
-                                    replicaInfo.offsetLag()));
-                            } else {
-                                replicaDirInfoByPartition.put(tp, new 
ReplicaLogDirInfo(logDir,
-                                    replicaInfo.offsetLag(),
-                                    replicaLogDirInfo.getFutureReplicaLogDir(),
-                                    
replicaLogDirInfo.getFutureReplicaOffsetLag()));
+
+                        if (logDirInfo.error() == null) {
+                            for (Map.Entry<TopicPartition, ReplicaInfo> 
replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) {
+                                TopicPartition tp = replicaInfoEntry.getKey();
+                                ReplicaInfo replicaInfo = 
replicaInfoEntry.getValue();
+                                ReplicaLogDirInfo replicaLogDirInfo = 
replicaDirInfoByPartition.get(tp);
+                                if (replicaLogDirInfo == null) {
+                                    log.warn("Server response from broker {} 
mentioned unknown partition {}", brokerId, tp);
+                                } else if (replicaInfo.isFuture()) {
+                                    replicaDirInfoByPartition.put(tp, new 
ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(),
+                                            
replicaLogDirInfo.getCurrentReplicaOffsetLag(),
+                                            logDir,
+                                            replicaInfo.offsetLag()));
+                                } else {
+                                    replicaDirInfoByPartition.put(tp, new 
ReplicaLogDirInfo(logDir,
+                                            replicaInfo.offsetLag(),
+                                            
replicaLogDirInfo.getFutureReplicaLogDir(),
+                                            
replicaLogDirInfo.getFutureReplicaOffsetLag()));
+                                }
+                                pendingPartitions.remove(tp);
                             }
+                        } else {
+                            directoryFailures.put(logDir, logDirInfo.error());
                         }
                     }
 
+                    if (!pendingPartitions.isEmpty() && 
!directoryFailures.isEmpty()) {
+                        List<String> errorAtDir = new ArrayList<>();
+                        directoryFailures.forEach((k, v) -> 
errorAtDir.add(v.getClass().getName() + " at " + k));
+                        Throwable error = new IllegalStateException("The error 
" + String.join(", ", errorAtDir) + " in the response from broker " + brokerId 
+ " is illegal");

Review Comment:
   we could generate the string directly. for example:
   ```java
   directoryFailures.entrySet().stream().map(entry -> 
entry.getValue().getClass().getName() + " at " + entry.getKey())
                                   .collect(Collectors.joining(","))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to