maedhroz commented on a change in pull request #1045:
URL: https://github.com/apache/cassandra/pull/1045#discussion_r651433016
##########
File path: src/java/org/apache/cassandra/transport/PreV5Handlers.java
##########
@@ -123,48 +127,112 @@ private void
releaseItem(Flusher.FlushItem<Message.Response> item)
* purpose since it does not have the message envelope associated with
the exception.
* <p>
* Note: this method should execute on the netty event loop.
+ *
+ * @return the type of {@link Overload} triggered
*/
- private boolean shouldHandleRequest(ChannelHandlerContext ctx,
Message.Request request)
+ @SuppressWarnings("UnstableApiUsage")
+ private Overload checkLimits(ChannelHandlerContext ctx,
Message.Request request)
{
long requestSize = request.getSource().header.bodySizeInBytes;
+ Overload backpressure = Overload.NONE;
// check for overloaded state by trying to allocate the message
size from inflight payload trackers
if (endpointPayloadTracker.tryAllocate(requestSize) !=
ResourceLimits.Outcome.SUCCESS)
{
if (request.connection.isThrowOnOverload())
{
- // discard the request and throw an exception
- ClientMetrics.instance.markRequestDiscarded();
- logger.trace("Discarded request of size: {}.
InflightChannelRequestPayload: {}, {}, Request: {}",
- requestSize,
- channelPayloadBytesInFlight,
- endpointPayloadTracker.toString(),
- request);
- throw ErrorMessage.wrap(new OverloadedException("Server is
in overloaded state. Cannot accept more requests at this point"),
-
request.getSource().header.streamId);
+ discardAndThrow(request, requestSize,
Overload.BYTES_IN_FLIGHT);
}
else
{
// set backpressure on the channel, and handle the request
endpointPayloadTracker.allocate(requestSize);
- ctx.channel().config().setAutoRead(false);
- ClientMetrics.instance.pauseConnection();
- paused = true;
+ pauseConnection(ctx);
+ backpressure = Overload.BYTES_IN_FLIGHT;
+ }
+ }
+
+ if (DatabaseDescriptor.getNativeTransportRateLimitingEnabled())
+ {
+ // Make sure we haven't breached the configured request-level
rate limit.
+ if (request.connection.isThrowOnOverload())
+ {
+ if (!GLOBAL_REQUEST_LIMITER.tryAcquire())
+ {
+ // We've already allocated against the payload tracker
here, so release those
+ // resources before aborting.
+ endpointPayloadTracker.release(requestSize);
+ discardAndThrow(request, requestSize,
Overload.REQUESTS);
+ }
+ }
+ else if (GLOBAL_REQUEST_LIMITER.reserveAndGetWaitLength() > 0)
+ {
+ // Force acquire a permit and handle the request, but set
backpressure on the channel first.
+ backpressure = Overload.REQUESTS;
+ pauseConnection(ctx);
+
+ CONNECTION_WAKER.submit(() ->
+ {
+ // We should be able to unpause if either rate
limiting is disabled or a permits is available.
Review comment:
```suggestion
// We should be able to unpause if either rate
limiting is disabled or a permit is available.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]