edoardocomar commented on code in PR #15530:
URL: https://github.com/apache/kafka/pull/15530#discussion_r1526169481


##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -617,14 +617,21 @@ class KafkaServer(
             }
           }
         }
-        socketServer.enableRequestProcessing(authorizerFutures)
+        val enableRequestProcessingFuture = 
socketServer.enableRequestProcessing(authorizerFutures)
         // Block here until all the authorizer futures are complete
         try {
           CompletableFuture.allOf(authorizerFutures.values.toSeq: _*).join()
         } catch {
           case t: Throwable => throw new RuntimeException("Received a fatal 
error while " +
             "waiting for all of the authorizer futures to be completed.", t)
         }
+        // Wait for all the SocketServer ports to be open, and the Acceptors 
to be started.
+        try {
+          enableRequestProcessingFuture.join()
+        } catch {
+          case t: Throwable => throw new RuntimeException("Received a fatal 
error while " +
+            "waiting for the SocketServer Acceptors to be started.", t)
+        }

Review Comment:
   @showuon I prefer for consistency to use the same waiting as done on the 
`authorizerFutures` in the same `KafkaServer`.
   
   The `BrokerServer` approach requires the `startupDeadline` which should be 
used in the other waits, for consistency. It is computed from 
`config.serverMaxStartupTimeMs` which although being set by default as 
Long>MAX_LONG, its is annotated as KRaft config.
   
   Maybe another PR could be used to make `KafkaServer` waits more consistent 
with `BrokerServer`.
   However, with ZK mode now deprecated, I don't think this my PR of mine is 
the right place for that change.
   
   In any case, I find the current log output shows clearly the address in use 
and for which listener, e.g.:
   
   ```
   [2024-03-15 12:54:56,944] INFO App info kafka.server for 0 unregistered 
(org.apache.kafka.common.utils.AppInfoParser)
   [2024-03-15 12:54:56,944] INFO [KafkaServer id=0] shut down completed 
(kafka.server.KafkaServer)
   [2024-03-15 12:54:56,944] ERROR Exiting Kafka due to fatal exception during 
startup. (kafka.Kafka$)
   java.lang.RuntimeException: Received a fatal error while waiting for the 
SocketServer Acceptors to be started.
        at kafka.server.KafkaServer.startup(KafkaServer.scala:633)
        at kafka.Kafka$.main(Kafka.scala:112)
        at kafka.Kafka.main(Kafka.scala)
   Caused by: java.util.concurrent.CompletionException: 
java.lang.RuntimeException: Unable to start acceptor for ListenerName(PLAINTEXT)
        at 
java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
        at 
java.base/java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1527)
        at 
java.base/java.util.concurrent.CompletableFuture.allOf(CompletableFuture.java:2419)
        at 
kafka.network.SocketServer.enableRequestProcessing(SocketServer.scala:238)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:620)
        ... 2 more
   Caused by: java.lang.RuntimeException: Unable to start acceptor for 
ListenerName(PLAINTEXT)
        at kafka.network.Acceptor.liftedTree1$1(SocketServer.scala:647)
        at kafka.network.Acceptor.start(SocketServer.scala:627)
        at 
kafka.network.SocketServer.$anonfun$enableRequestProcessing$2(SocketServer.scala:223)
        at 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
        at 
java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887)
        at 
java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2325)
        at 
kafka.network.SocketServer.chainAcceptorFuture$1(SocketServer.scala:216)
        at 
kafka.network.SocketServer.$anonfun$enableRequestProcessing$5(SocketServer.scala:230)
        at 
java.base/java.util.concurrent.ConcurrentHashMap$ValuesView.forEach(ConcurrentHashMap.java:4780)
        at 
kafka.network.SocketServer.enableRequestProcessing(SocketServer.scala:230)
        ... 3 more
   Caused by: org.apache.kafka.common.KafkaException: Socket server failed to 
bind to localhost:9092: Address already in use.
        at kafka.network.Acceptor.openServerSocket(SocketServer.scala:729)
        at kafka.network.Acceptor.liftedTree1$1(SocketServer.scala:632)
        ... 12 more
   Caused by: java.net.BindException: Address already in use
        at java.base/sun.nio.ch.Net.bind0(Native Method)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to