apovzner commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r512887499



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -697,6 +714,31 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
         close(endPoint.listenerName, socketChannel)
         None
+      case e: ConnectionThrottledException =>
+        val ip = socketChannel.socket.getInetAddress
+        debug(s"Delaying closing of connection from $ip for ${e.throttleTimeMs} ms")
+        val delayQueue = throttledSockets.computeIfAbsent(ip, _ => new mutable.Queue[DelayedCloseSocket])

Review comment:
       I agree that it is better to optimize for checking connections than 
for the overhead of adding to or removing from the queue, because 
`closeThrottledConnections` runs often (in a loop). I think that also means it 
will commonly find no connections ready to unthrottle, or only very few. So, 
after reading all your evaluations above, I am also leaning towards using a 
delay queue here.
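
   To illustrate why the common "nothing to unthrottle yet" case stays cheap, here is a minimal sketch, assuming a `java.util.concurrent.DelayQueue` is used. `DelayedClose`, `ThrottledCloseSketch`, `closeExpired`, and the injectable `nowMs` clock are hypothetical names for illustration, not code from this PR. `poll()` only inspects the head of the queue and returns `null` when its delay has not expired.

```scala
import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

// Hypothetical stand-in for the PR's DelayedCloseSocket: it carries only an
// identifier and the time at which the socket may be closed. The clock is
// injected (an assumption for this sketch) so the behaviour is easy to test.
final class DelayedClose(val id: String, val closeAtMs: Long, nowMs: () => Long)
    extends Delayed {

  // Remaining delay; zero or negative means the entry is ready to be closed.
  override def getDelay(unit: TimeUnit): Long =
    unit.convert(closeAtMs - nowMs(), TimeUnit.MILLISECONDS)

  // Order entries by close time so the earliest one is at the head.
  override def compareTo(other: Delayed): Int = other match {
    case that: DelayedClose =>
      java.lang.Long.compare(closeAtMs, that.closeAtMs)
    case _ =>
      java.lang.Long.compare(
        getDelay(TimeUnit.MILLISECONDS),
        other.getDelay(TimeUnit.MILLISECONDS))
  }
}

object ThrottledCloseSketch {
  // Drains every entry whose delay has expired and returns the ids in expiry
  // order. When nothing has expired, the first poll() returns null and the
  // loop body never runs.
  def closeExpired(queue: DelayQueue[DelayedClose]): List[String] = {
    val closed = List.newBuilder[String]
    var next = queue.poll()
    while (next != null) {
      closed += next.id // a real implementation would close the socket here
      next = queue.poll()
    }
    closed.result()
  }
}
```

   With this shape, each pass of the loop costs one head check when no throttle has expired, at the price of ordered insertion when a connection is throttled.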
   
   Not sure whether question #1, about why we need to delay closing a 
connection, got answered, so answering just in case. Since we want to throttle 
accepting connections from an IP, closing a connection right away when the IP 
quota is reached would not help, because that IP is going to reconnect 
immediately. 
   
   If we close the connection right away, that IP is going to reconnect right 
away, and 





----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

