splett2 commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r516997443



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -697,6 +714,31 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
         close(endPoint.listenerName, socketChannel)
         None
+      case e: ConnectionThrottledException =>
+        val ip = socketChannel.socket.getInetAddress
+        debug(s"Delaying closing of connection from $ip for ${e.throttleTimeMs} ms")
+        val delayQueue = throttledSockets.computeIfAbsent(ip, _ => new mutable.Queue[DelayedCloseSocket])

Review comment:
   Okay, I replaced the map/queue with a priority queue.
   I did not use the Java `DelayQueue`, as `ClientQuotaManager` does, because 
this throttling implementation neither uses timeout-based polling nor 
requires a synchronized data structure, and a `DelayQueue` needs 
significantly more boilerplate.
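
   For illustration only, here is a minimal sketch of what a priority queue 
ordered by throttle deadline could look like. It is not the code in this PR: 
`DelayedClose`, `ThrottledCloseQueue`, `endThrottleTimeMs` and `drainExpired` 
are hypothetical names, and the entry holds a plain `id` where the real 
`DelayedCloseSocket` would hold the socket channel. It assumes a single thread 
owns the queue and checks it on each pass of its loop.

```scala
import scala.collection.mutable

// Hypothetical stand-in for DelayedCloseSocket; a real entry would carry the SocketChannel.
final case class DelayedClose(id: String, endThrottleTimeMs: Long)

// Assumed to be owned by a single thread, so there is no locking and no blocking poll.
final class ThrottledCloseQueue {
  // mutable.PriorityQueue is a max-heap, so the ordering is reversed
  // to make the entry with the earliest deadline the head.
  private val queue = mutable.PriorityQueue.empty[DelayedClose](
    Ordering.by[DelayedClose, Long](_.endThrottleTimeMs).reverse)

  def add(entry: DelayedClose): Unit = queue.enqueue(entry)

  // Removes and returns every entry whose deadline has passed, earliest first.
  def drainExpired(nowMs: Long): List[DelayedClose] = {
    val expired = List.newBuilder[DelayedClose]
    while (queue.nonEmpty && queue.head.endThrottleTimeMs <= nowMs)
      expired += queue.dequeue()
    expired.result()
  }

  def size: Int = queue.size
}
```

   The owning thread would call `drainExpired(now)` once per loop iteration 
and close whatever it returns. A `DelayQueue` would instead require each 
entry to implement `Delayed` (`getDelay` and `compareTo`) and would bring 
internal locking that a single-threaded caller does not need.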




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

