maedhroz commented on a change in pull request #1045:
URL: https://github.com/apache/cassandra/pull/1045#discussion_r651434081



##########
File path: src/java/org/apache/cassandra/transport/CQLMessageHandler.java
##########
@@ -157,43 +163,131 @@ protected boolean processOneContainedMessage(ShareableBytes bytes, Limit endpoin
 
         // max CQL message size defaults to 256mb, so should be safe to downcast
         int messageSize = Ints.checkedCast(header.bodySizeInBytes);
+        
         if (throwOnOverload)
         {
+            if (DatabaseDescriptor.getNativeTransportRateLimitingEnabled() && !requestRateLimiter.tryAcquire())
+            {
+                discardAndThrow(endpointReserve, globalReserve, buf, header, messageSize, Overload.REQUESTS);
+                return true;
+            }
+            
             if (!acquireCapacity(header, endpointReserve, globalReserve))
             {
-                // discard the request and throw an exception
-                ClientMetrics.instance.markRequestDiscarded();
-                logger.trace("Discarded request of size: {}. InflightChannelRequestPayload: {}, " +
-                             "InflightEndpointRequestPayload: {}, InflightOverallRequestPayload: {}, Header: {}",
-                             messageSize,
-                             channelPayloadBytesInFlight,
-                             endpointReserve.using(),
-                             globalReserve.using(),
-                             header);
-
-                handleError(new OverloadedException("Server is in overloaded state. " +
-                                                    "Cannot accept more requests at this point"), header);
-
-                // Don't stop processing incoming messages, rely on the client to apply
-                // backpressure when it receives OverloadedException
-                // but discard this message as we're responding with the overloaded error
-                incrementReceivedMessageMetrics(messageSize);
-                buf.position(buf.position() + Envelope.Header.LENGTH + messageSize);
+                // TODO: Should we release a stored permit back to the rate limiter?
+                discardAndThrow(endpointReserve, globalReserve, buf, header, messageSize, Overload.BYTES_IN_FLIGHT);
                 return true;
             }
         }
-        else if (!acquireCapacityAndQueueOnFailure(header, endpointReserve, globalReserve))
-        {
-            // set backpressure on the channel, queuing the request until we have capacity
-            ClientMetrics.instance.pauseConnection();
-            return false;
+        else
+        {   
+            Overload result = maybeTriggerBackpressure(endpointReserve, globalReserve, header);
+            
+            // Stop processing if we triggered backpressure...
+            if (result != Overload.NONE)
+            {
+                // Indicate that the handler is backpressured so we can throw a client warning later.
+                backpressure = result;
+                return false;
+            }
         }
 
         channelPayloadBytesInFlight += messageSize;
         incrementReceivedMessageMetrics(messageSize);
         return processRequest(composeRequest(header, bytes));
     }
 
+    private Overload maybeTriggerBackpressure(Limit endpointReserve, Limit globalReserve, Envelope.Header header)
+    {
+        if (DatabaseDescriptor.getNativeTransportRateLimitingEnabled() && !requestRateLimiter.tryAcquire())
+        {
+            if (decoder.isActive())
+                ClientMetrics.instance.pauseConnection();
+
+            submitBackpressureWakeupTask();

Review comment:
       One concern I had early on was that we might saturate the wake thread
with wakeup requests when we hit the rate limit. However, given that each
connection stops processing messages once backpressure kicks in, the number
of outstanding wakeups should naturally be bounded by the number of
connections (and those should be handled in FIFO order).
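
A minimal sketch of that bound, not the code in this PR. It assumes a single wake thread backed by a FIFO queue, and a connection that cannot submit a second wakeup while it is still paused. `SketchConnection`, `WakeupSketch`, `onRateLimitHit`, and the `paused` flag are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a per-connection handler (not CQLMessageHandler).
class SketchConnection
{
    final int id;
    // True while the connection is paused and its wakeup is still outstanding.
    private final AtomicBoolean paused = new AtomicBoolean(false);

    SketchConnection(int id)
    {
        this.id = id;
    }

    // Returns true if a wakeup task was queued. A paused connection reads no
    // further messages, so it cannot queue a second task (assumption).
    boolean onRateLimitHit(ExecutorService wakeThread, List<Integer> wakeOrder)
    {
        if (!paused.compareAndSet(false, true))
            return false;

        wakeThread.execute(() -> {
            wakeOrder.add(id);  // record the order in which connections are woken
            paused.set(false);  // resume processing for this connection
        });
        return true;
    }
}

class WakeupSketch
{
    // Every connection hits the rate limit several times while the wake thread
    // is held back; returns the order in which connections were woken.
    static List<Integer> run(int connections, int attemptsPerConnection)
    {
        ExecutorService wakeThread = Executors.newSingleThreadExecutor();
        List<Integer> wakeOrder = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch gate = new CountDownLatch(1);

        // Hold the wake thread so that every wakeup below stays queued.
        wakeThread.execute(() -> {
            try { gate.await(); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });

        int queued = 0;
        for (int i = 0; i < connections; i++)
        {
            SketchConnection connection = new SketchConnection(i);
            for (int attempt = 0; attempt < attemptsPerConnection; attempt++)
                if (connection.onRateLimitHit(wakeThread, wakeOrder))
                    queued++;
        }

        // The queue depth is bounded by the number of connections.
        if (queued > connections)
            throw new IllegalStateException("more wakeups queued than connections");

        gate.countDown();
        wakeThread.shutdown();
        try
        {
            if (!wakeThread.awaitTermination(10, TimeUnit.SECONDS))
                throw new IllegalStateException("wake thread did not drain");
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return wakeOrder;
    }
}
```

Calling `WakeupSketch.run(3, 5)` queues one wakeup per connection rather than fifteen, and the single-threaded executor drains them in submission order. Whether the real wakeup task guarantees the same "one outstanding task per connection" property depends on how `submitBackpressureWakeupTask()` is implemented, which this sketch does not model.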






