hachikuji commented on a change in pull request #9802:
URL: https://github.com/apache/kafka/pull/9802#discussion_r552089415



##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -266,8 +266,16 @@ class MetadataCache(brokerId: Int) extends Logging {
 
   def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster = {
     val snapshot = metadataSnapshot
-    val nodes = snapshot.aliveNodes.map { case (id, nodes) => (id, nodes.get(listenerName).orNull) }
-    def node(id: Integer): Node = nodes.get(id.toLong).orNull
+    val nodes = snapshot.aliveNodes.flatMap { case (id, nodesByListener) =>
+      nodesByListener.get(listenerName).map { node =>
+        id -> node
+      }
+    }
+
+    def node(id: Integer): Node = {
+      nodes.getOrElse(id.toLong, new Node(id, "", -1))

Review comment:
   I think what we are implementing here is what is documented in `PartitionInfo`:
   ```java
       /**
        * The complete set of replicas for this partition regardless of whether they are alive or up-to-date
        */
       public Node[] replicas() {
           return replicas;
       }
   ```
   Even the internal logic in `PartitionInfo` assumes non-null values in the array (I caught this issue because of an NPE in its `toString`). We probably can't 100% guarantee that all quota callbacks have appropriate `isEmpty` checks on the nodes they receive, but my guess is that usage of this plugin is rare in any case, and reliance on the replica set is less likely than reliance on the leader. All in all, I think we're better off fixing the bug.
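
To make the intended behavior concrete, here is a rough sketch of the lookup after the fix. It does not use the real Kafka classes: `SketchNode` and `ReplicaLookupSketch` are hypothetical stand-ins, and the `isEmpty` logic is my assumption of what a callback would check on the real `Node`. The point is only that an offline replica becomes a placeholder with an empty host and port `-1` rather than `null`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the Kafka Node class, reduced to what this sketch needs.
final class SketchNode {
    final int id;
    final String host;
    final int port;

    SketchNode(int id, String host, int port) {
        this.id = id;
        this.host = host;
        this.port = port;
    }

    // Assumed check: a node with no usable host or port counts as empty.
    boolean isEmpty() {
        return host == null || host.isEmpty() || port < 0;
    }

    @Override
    public String toString() {
        return isEmpty() ? id + " (offline)" : id + " (" + host + ":" + port + ")";
    }
}

// Hypothetical stand-in for the part of the metadata cache touched by the diff.
final class ReplicaLookupSketch {
    // Brokers that are alive on the requested listener, keyed by broker id.
    private final Map<Integer, SketchNode> aliveNodes = new HashMap<>();

    void addAlive(SketchNode node) {
        aliveNodes.put(node.id, node);
    }

    // Mirrors the patched node(id): fall back to a placeholder, never null.
    SketchNode node(int id) {
        SketchNode alive = aliveNodes.get(id);
        return alive != null ? alive : new SketchNode(id, "", -1);
    }

    // Builds the complete replica set, whether or not each replica is alive.
    SketchNode[] replicas(int... ids) {
        SketchNode[] result = new SketchNode[ids.length];
        for (int i = 0; i < ids.length; i++) {
            result[i] = node(ids[i]);
        }
        return result;
    }

    public static void main(String[] args) {
        ReplicaLookupSketch cache = new ReplicaLookupSketch();
        cache.addAlive(new SketchNode(1, "broker1", 9092));

        // Broker 2 is not alive, so it is represented by a placeholder.
        SketchNode[] replicas = cache.replicas(1, 2);

        // Safe to print: no element is null, so toString cannot hit an NPE.
        System.out.println(Arrays.toString(replicas));
    }
}
```

A callback that only cares about reachable replicas would then skip entries where `isEmpty()` is true instead of null-checking each element.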



