maedhroz commented on code in PR #3274:
URL: https://github.com/apache/cassandra/pull/3274#discussion_r1592843183


##########
src/java/org/apache/cassandra/transport/CQLMessageHandler.java:
##########
@@ -188,56 +193,67 @@ protected boolean 
processOneContainedMessage(ShareableBytes bytes, Limit endpoin
             }
 
             if (DatabaseDescriptor.getNativeTransportRateLimitingEnabled() && 
!requestRateLimiter.tryReserve())
+                backpressure = Overload.REQUESTS;
+            else if (!dispatcher.hasQueueCapacity())
+                backpressure = Overload.QUEUE_TIME;
+
+            if (backpressure != Overload.NONE)
             {
                 // We've already allocated against the bytes-in-flight limits, 
so release those resources.
                 release(header);
-                discardAndThrow(endpointReserve, globalReserve, buf, header, 
messageSize, Overload.REQUESTS);
+                discardAndThrow(endpointReserve, globalReserve, buf, header, 
messageSize, backpressure);
                 return true;
             }
         }
         else
         {
-            Overload backpressure = Overload.NONE;
+            long delay = -1;
 
             if (!acquireCapacityAndQueueOnFailure(header, endpointReserve, 
globalReserve))
-            {
-                if (processRequestAndUpdateMetrics(bytes, header, messageSize, 
Overload.BYTES_IN_FLIGHT))
-                {
-                    if (decoder.isActive())
-                        ClientMetrics.instance.pauseConnection();
-                }
-
                 backpressure = Overload.BYTES_IN_FLIGHT;
-            }
-            
+
+            // Apply rate limiting, if enabled
             if (DatabaseDescriptor.getNativeTransportRateLimitingEnabled())
             {
                 // Reserve a permit even if we've already triggered 
backpressure on bytes in flight.
-                long delay = 
requestRateLimiter.reserveAndGetDelay(RATE_LIMITER_DELAY_UNIT);
-                
+                delay = 
requestRateLimiter.reserveAndGetDelay(RATE_LIMITER_DELAY_UNIT);
+
                 if (backpressure == Overload.NONE && delay > 0)
+                    backpressure = Overload.REQUESTS;
+            }
+
+            // Check queue time, if enabled
+            if (backpressure == Overload.NONE && 
!dispatcher.hasQueueCapacity())
+            {
+                delay = 
queueBackpressure.markAndGetDelay(RATE_LIMITER_DELAY_UNIT);
+
+                if (delay > 0)
+                    backpressure = Overload.QUEUE_TIME;
+            }
+
+            if (backpressure != Overload.NONE)
+            {
+                if (processRequestAndUpdateMetrics(bytes, header, messageSize, 
backpressure))
                 {
-                    if (processRequestAndUpdateMetrics(bytes, header, 
messageSize, Overload.REQUESTS))
-                    {
-                        if (decoder.isActive())
-                            ClientMetrics.instance.pauseConnection();
+                    if (decoder.isActive())
+                        ClientMetrics.instance.pauseConnection();
 
-                        // Schedule a wakup here if we process successfully. 
The connection should be closing otherwise.  
+                    if (delay > 0)
+                    {
+                        // Schedule a wakeup here if we process successfully. 
The connection should be closing otherwise.
                         scheduleConnectionWakeupTask(delay, 
RATE_LIMITER_DELAY_UNIT);
                     }
-                    
-                    backpressure = Overload.REQUESTS;
                 }
-            }
-            
-            // If we triggered backpressure, make sure the caller stops 
processing frames after the request completes.
-            if (backpressure != Overload.NONE)
+
+                // If we triggered backpressure, make sure the caller stops 
processing frames after the request completes.
                 return false;
+            }
         }
 
         return processRequestAndUpdateMetrics(bytes, header, messageSize, 
Overload.NONE);
     }
 
+

Review Comment:
   nit: is this extra blank line (added after the method's closing brace) necessary?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to