maedhroz commented on a change in pull request #1045:
URL: https://github.com/apache/cassandra/pull/1045#discussion_r678573497



##########
File path: src/java/org/apache/cassandra/transport/PreV5Handlers.java
##########
@@ -95,23 +95,26 @@ private void 
releaseItem(Flusher.FlushItem<Message.Response> item)
 
             // since the request has been processed, decrement inflight 
payload at channel, endpoint and global levels
             channelPayloadBytesInFlight -= itemSize;
-            ResourceLimits.Outcome endpointGlobalReleaseOutcome = 
endpointPayloadTracker.release(itemSize);
+            boolean globalInFlightBytesBelowLimit = 
endpointPayloadTracker.release(itemSize) == ResourceLimits.Outcome.BELOW_LIMIT;
 
-            // now check to see if we need to reenable the channel's autoRead.
-            // If the current payload side is zero, we must reenable autoread 
as
+            // Now check to see if we need to reenable the channel's autoRead.
+            //
+            // If the current payload bytes in flight is zero, we must 
reenable autoread as
             // 1) we allow no other thread/channel to do it, and
-            // 2) there's no other events following this one (becuase we're at 
zero bytes in flight),
-            // so no successive to trigger the other clause in this if-block
+            // 2) there are no other events following this one (becuase we're 
at zero bytes in flight),
+            // so no successive to trigger the other clause in this if-block.
+            //
+            // The only exception to this is if the global request rate limit 
has been breached, which means
+            // we'll have to wait until a scheduled wakeup task unpauses the 
connection.
             //
             // note: this path is only relevant when part of a pre-V5 
pipeline, as only in this case is
             // paused ever set to true. In pipelines configured for V5 or 
later, backpressure and control
             // over the inbound pipeline's autoread status are handled by the 
FrameDecoder/FrameProcessor.
             ChannelConfig config = item.channel.config();
-            if (paused && (channelPayloadBytesInFlight == 0 || 
endpointGlobalReleaseOutcome == ResourceLimits.Outcome.BELOW_LIMIT))
+
+            if (!config.isAutoRead() && (channelPayloadBytesInFlight == 0 || 
globalInFlightBytesBelowLimit) && GLOBAL_REQUEST_LIMITER.canAcquire())

Review comment:
       Let me describe this in prose first. Assuming auto-read is currently 
disabled...
   
   1.) If the wait time until the next permit is > 0, schedule unpause on the 
event loop after that amount of time has passed. (It shouldn't be necessary to 
take a permit here, given the next request we process will take one itself.) If 
an unpause is already scheduled, this shouldn't be harmful, and one of them 
will be a no-op.
   
   2.) If the wait time is <= 0, immediately unpause if the bytes in flight == 
0 or we've at least fallen below the limit. (i.e. the logic before this patch)
   
   In code...
   
   ```
   if (!config.isAutoRead())
   {
       long waitTimeMicros = GLOBAL_REQUEST_LIMITER.waitTimeMicros();
       
       if (waitTimeMicros > 0)
       {
           item.channel.eventLoop().schedule(() -> unpauseConnection(config), 
waitTimeMicros, TimeUnit.MICROSECONDS);
       }
       else if (channelPayloadBytesInFlight == 0 || 
globalInFlightBytesBelowLimit)
       {
           unpauseConnection(config);
       }
   }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to