maedhroz commented on code in PR #3274:
URL: https://github.com/apache/cassandra/pull/3274#discussion_r1592843183
##########
src/java/org/apache/cassandra/transport/CQLMessageHandler.java:
##########
@@ -188,56 +193,67 @@ protected boolean
processOneContainedMessage(ShareableBytes bytes, Limit endpoin
}
if (DatabaseDescriptor.getNativeTransportRateLimitingEnabled() &&
!requestRateLimiter.tryReserve())
+ backpressure = Overload.REQUESTS;
+ else if (!dispatcher.hasQueueCapacity())
+ backpressure = Overload.QUEUE_TIME;
+
+ if (backpressure != Overload.NONE)
{
// We've already allocated against the bytes-in-flight limits,
so release those resources.
release(header);
- discardAndThrow(endpointReserve, globalReserve, buf, header,
messageSize, Overload.REQUESTS);
+ discardAndThrow(endpointReserve, globalReserve, buf, header,
messageSize, backpressure);
return true;
}
}
else
{
- Overload backpressure = Overload.NONE;
+ long delay = -1;
if (!acquireCapacityAndQueueOnFailure(header, endpointReserve,
globalReserve))
- {
- if (processRequestAndUpdateMetrics(bytes, header, messageSize,
Overload.BYTES_IN_FLIGHT))
- {
- if (decoder.isActive())
- ClientMetrics.instance.pauseConnection();
- }
-
backpressure = Overload.BYTES_IN_FLIGHT;
- }
-
+
+ // Apply rate limiting, if enabled
if (DatabaseDescriptor.getNativeTransportRateLimitingEnabled())
{
// Reserve a permit even if we've already triggered
backpressure on bytes in flight.
- long delay =
requestRateLimiter.reserveAndGetDelay(RATE_LIMITER_DELAY_UNIT);
-
+ delay =
requestRateLimiter.reserveAndGetDelay(RATE_LIMITER_DELAY_UNIT);
+
if (backpressure == Overload.NONE && delay > 0)
+ backpressure = Overload.REQUESTS;
+ }
+
+ // Check queue time, if enabled
+ if (backpressure == Overload.NONE &&
!dispatcher.hasQueueCapacity())
+ {
+ delay =
queueBackpressure.markAndGetDelay(RATE_LIMITER_DELAY_UNIT);
+
+ if (delay > 0)
+ backpressure = Overload.QUEUE_TIME;
+ }
+
+ if (backpressure != Overload.NONE)
+ {
+ if (processRequestAndUpdateMetrics(bytes, header, messageSize,
backpressure))
{
- if (processRequestAndUpdateMetrics(bytes, header,
messageSize, Overload.REQUESTS))
- {
- if (decoder.isActive())
- ClientMetrics.instance.pauseConnection();
+ if (decoder.isActive())
+ ClientMetrics.instance.pauseConnection();
- // Schedule a wakup here if we process successfully.
The connection should be closing otherwise.
+ if (delay > 0)
+ {
+ // Schedule a wakeup here if we process successfully.
The connection should be closing otherwise.
scheduleConnectionWakeupTask(delay,
RATE_LIMITER_DELAY_UNIT);
}
-
- backpressure = Overload.REQUESTS;
}
- }
-
- // If we triggered backpressure, make sure the caller stops
processing frames after the request completes.
- if (backpressure != Overload.NONE)
+
+ // If we triggered backpressure, make sure the caller stops
processing frames after the request completes.
return false;
+ }
}
return processRequestAndUpdateMetrics(bytes, header, messageSize,
Overload.NONE);
}
+
Review Comment:
nit: unnecessary newline?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]