josh-mckenzie commented on a change in pull request #1045:
URL: https://github.com/apache/cassandra/pull/1045#discussion_r686976650
##########
File path: src/java/org/apache/cassandra/transport/CQLMessageHandler.java
##########
@@ -157,41 +160,97 @@ protected boolean processOneContainedMessage(ShareableBytes bytes, Limit endpoin
// max CQL message size defaults to 256mb, so should be safe to downcast
int messageSize = Ints.checkedCast(header.bodySizeInBytes);
+
if (throwOnOverload)
{
if (!acquireCapacity(header, endpointReserve, globalReserve))
{
- // discard the request and throw an exception
- ClientMetrics.instance.markRequestDiscarded();
- logger.trace("Discarded request of size: {}. InflightChannelRequestPayload: {}, " +
- "InflightEndpointRequestPayload: {}, InflightOverallRequestPayload: {}, Header: {}",
- messageSize,
- channelPayloadBytesInFlight,
- endpointReserve.using(),
- globalReserve.using(),
- header);
-
- handleError(new OverloadedException("Server is in overloaded state. " +
- "Cannot accept more requests at this point"), header);
-
- // Don't stop processing incoming messages, rely on the client to apply
- // backpressure when it receives OverloadedException
- // but discard this message as we're responding with the overloaded error
- incrementReceivedMessageMetrics(messageSize);
- buf.position(buf.position() + Envelope.Header.LENGTH + messageSize);
+ discardAndThrow(endpointReserve, globalReserve, buf, header, messageSize, Overload.BYTES_IN_FLIGHT);
+ return true;
+ }
+
+ if (DatabaseDescriptor.getNativeTransportRateLimitingEnabled() && !requestRateLimiter.tryReserve())
+ {
+ discardAndThrow(endpointReserve, globalReserve, buf, header, messageSize, Overload.REQUESTS);
return true;
}
}
- else if (!acquireCapacityAndQueueOnFailure(header, endpointReserve, globalReserve))
+ else
Review comment:
Use `else if` here and remove a level of nesting.
##########
File path: src/java/org/apache/cassandra/transport/CQLMessageHandler.java
##########
@@ -424,21 +482,46 @@ protected boolean processFirstFrameOfLargeMessage(IntactFrame frame, Limit endpo
// concurrently.
if (throwOnOverload)
{
- // discard the request and throw an exception
+ // Mark as overloaded so that discard the message after consuming any subsequent frames.
ClientMetrics.instance.markRequestDiscarded();
- logger.trace("Discarded request of size: {}. InflightChannelRequestPayload: {}, " +
- "InflightEndpointRequestPayload: {}, InflightOverallRequestPayload: {}, Header: {}",
- messageSize,
- channelPayloadBytesInFlight,
- endpointReserve.using(),
- globalReserve.using(),
- header);
-
- // mark as overloaded so that discard the message
- // after consuming any subsequent frames
- largeMessage.markOverloaded();
+ logOverload(endpointReserve, globalReserve, header, messageSize);
+ largeMessage.markOverloaded(Overload.BYTES_IN_FLIGHT);
+ }
+ }
+ else if (DatabaseDescriptor.getNativeTransportRateLimitingEnabled())
+ {
+ if (throwOnOverload)
Review comment:
Can combine this with the `!requestRateLimiter.tryReserve()` to remove a
level of nesting.
##########
File path: src/java/org/apache/cassandra/transport/CQLMessageHandler.java
##########
@@ -157,41 +160,97 @@ protected boolean processOneContainedMessage(ShareableBytes bytes, Limit endpoin
// max CQL message size defaults to 256mb, so should be safe to downcast
int messageSize = Ints.checkedCast(header.bodySizeInBytes);
+
if (throwOnOverload)
Review comment:
Might want to add a comment here about what we discussed offline: the two
modes of operation are expected to handle too much inbound traffic
differently ("process anyway, then apply backpressure" vs. "don't process,
and throw when `throwOnOverload` is set").
##########
File path: src/java/org/apache/cassandra/utils/concurrent/NonBlockingRateLimiter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.concurrent;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A rate limiter implementation that allows callers to reserve permits that may only be available
+ * in the future, delegating to them decisions about how to schedule/delay work and whether or not
+ * to block execution to do so.
+ */
+@SuppressWarnings("UnstableApiUsage")
+@ThreadSafe
+public class NonBlockingRateLimiter
+{
+ private static final long DEFAULT_BURST_NANOS = TimeUnit.SECONDS.toNanos(1);
+
+ /** a starting time for elapsed time calculations */
+ private final long startedNanos;
+
+ /**
+ * nanoseconds of "burst time"
+ *
+ * ex. If this is configured at the default of 1 second, the limiter will allow a
+ * number of permits equal to the configured number of permits/second to be issued
+ * in a "burst" prior to the limiter reaching a steady state. This allows the limiter
+ * to adhere more closely to the configured rate, as it can bring forward short
+ * periods of inactivity if the rate of requests oscillates around the configured limit.
+ */
+ private final long burstNanos;
+
+ /** nanoseconds from start time corresponding to the next available permit */
+ private final AtomicLong nextAvailable = new AtomicLong();
+
+ private volatile Ticker ticker;
+
+ private volatile int permitsPerSecond;
+
+ /** time in nanoseconds between permits on the timeline */
+ private volatile long intervalNanos;
+
+ public NonBlockingRateLimiter(int permitsPerSecond)
+ {
+ this(permitsPerSecond, DEFAULT_BURST_NANOS, Ticker.systemTicker());
+ }
+
+ @VisibleForTesting
+ public NonBlockingRateLimiter(int permitsPerSecond, long burstNanos, Ticker ticker)
+ {
+ this.startedNanos = ticker.read();
+ this.burstNanos = burstNanos;
+ setRate(permitsPerSecond, ticker);
+ }
+
+ public void setRate(int permitsPerSecond)
+ {
+ setRate(permitsPerSecond, Ticker.systemTicker());
+ }
+
+ @VisibleForTesting
+ public synchronized void setRate(int permitsPerSecond, Ticker ticker)
+ {
+ Preconditions.checkArgument(permitsPerSecond > 0, "permits/second must be positive");
+
+ this.ticker = ticker;
+ this.permitsPerSecond = permitsPerSecond;
+ intervalNanos = Math.max(TimeUnit.SECONDS.toNanos(1) / permitsPerSecond, 1);
Review comment:
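If someone provides a rate so high that the interval rounds down to zero
(more than 1,000,000,000 permits/second, since the rate is an `int` and must
be positive) and we override it to 1ns, we should probably log a warning to
let them know that's happened.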
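A minimal sketch of that warning in plain Java. `IntervalCalculator` is a
hypothetical helper, `java.util.logging` stands in for the project's logger,
and a plain `IllegalArgumentException` replaces Guava's `Preconditions` so the
example has no dependencies:

```java
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

// Hypothetical helper (not part of the PR): computes the gap between permits and
// warns whenever the requested rate has to be overridden.
class IntervalCalculator
{
    private static final Logger logger = Logger.getLogger(IntervalCalculator.class.getName());

    static long intervalNanos(int permitsPerSecond)
    {
        if (permitsPerSecond <= 0)
            throw new IllegalArgumentException("permits/second must be positive");

        // Integer division rounds down, so any rate above 10^9 permits/second yields 0ns.
        long computed = TimeUnit.SECONDS.toNanos(1) / permitsPerSecond;
        if (computed < 1)
        {
            logger.warning(String.format("Requested rate of %d permits/second is finer than 1ns; " +
                                         "clamping the interval to 1ns (1,000,000,000 permits/second)",
                                         permitsPerSecond));
            return 1;
        }
        return computed;
    }
}
```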
##########
File path: src/java/org/apache/cassandra/net/FrameDecoder.java
##########
@@ -190,6 +190,11 @@ boolean isConsumed()
abstract void decode(Collection<Frame> into, ShareableBytes bytes);
abstract void addLastTo(ChannelPipeline pipeline);
+ public boolean isActive()
Review comment:
Consider a javadoc comment here articulating when we expect the
`FrameDecoder` to be active vs. inactive. We didn't have one before, but we
also didn't expose this as a public API or use it this heavily.
##########
File path: src/java/org/apache/cassandra/transport/PreV5Handlers.java
##########
@@ -63,19 +64,21 @@
* Note: should only be accessed while on the netty event loop.
*/
private long channelPayloadBytesInFlight;
- private boolean paused;
+
+ /** The cause of the current connection pause, or {@link Overload#NONE} if it is unpaused. */
+ private Overload pauseTrigger = Overload.NONE;
Review comment:
We're conceptually overloading a couple of things here. Consider adding an
`isPaused()` method that checks this variable, and renaming the variable to
`overloadState` or something similar. Or not. The change from `paused` to
`pauseTrigger` gave me... pause. 😁
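One way the suggested naming could look, as a standalone sketch.
`ConnectionOverloadState` is a hypothetical holder (in `PreV5Handlers` this
would just be the field plus the helper), and the `Overload` enum below only
mirrors the constants visible in this diff (`NONE`, `BYTES_IN_FLIGHT`,
`REQUESTS`), so the real enum may differ:

```java
// Mirrors only the Overload constants visible in this diff; the real enum may have more.
enum Overload { NONE, REQUESTS, BYTES_IN_FLIGHT }

// Hypothetical holder for the renamed field and the isPaused() helper.
class ConnectionOverloadState
{
    /** The kind of overload currently pausing the connection, or NONE if it is unpaused. */
    private Overload overloadState = Overload.NONE;

    boolean isPaused()
    {
        return overloadState != Overload.NONE;
    }

    void pause(Overload cause)
    {
        if (cause == Overload.NONE)
            throw new IllegalArgumentException("use resume() to unpause");
        overloadState = cause;
    }

    void resume()
    {
        overloadState = Overload.NONE;
    }

    Overload overloadState()
    {
        return overloadState;
    }
}
```

Call sites then ask `isPaused()` for the yes/no question and only read
`overloadState()` when they need the cause (e.g. for metrics).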
##########
File path: src/java/org/apache/cassandra/transport/ClientResourceLimits.java
##########
@@ -95,6 +100,19 @@ public static Snapshot getCurrentIpUsage()
return histogram.getSnapshot();
}
+ public static int getNativeTransportMaxRequestsPerSecond()
+ {
+ return DatabaseDescriptor.getNativeTransportMaxRequestsPerSecond();
+ }
+
+ public static void setNativeTransportMaxRequestsPerSecond(int newPerSecond)
+ {
+ double existingPerSecond = getNativeTransportMaxRequestsPerSecond();
Review comment:
Should this be an `int`? `getNativeTransportMaxRequestsPerSecond()` returns
an `int`.
##########
File path: src/java/org/apache/cassandra/utils/concurrent/NonBlockingRateLimiter.java
##########
@@ -0,0 +1,183 @@
+package org.apache.cassandra.utils.concurrent;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A rate limiter implementation that allows callers to reserve permits that may only be available
+ * in the future, delegating to them decisions about how to schedule/delay work and whether or not
+ * to block execution to do so.
+ */
+@SuppressWarnings("UnstableApiUsage")
+@ThreadSafe
+public class NonBlockingRateLimiter
+{
+ private static final long DEFAULT_BURST_NANOS = TimeUnit.SECONDS.toNanos(1);
+
+ /** a starting time for elapsed time calculations */
+ private final long startedNanos;
+
+ /**
+ * nanoseconds of "burst time"
+ *
+ * ex. If this is configured at the default of 1 second, the limiter will allow a
+ * number of permits equal to the configured number of permits/second to be issued
+ * in a "burst" prior to the limiter reaching a steady state. This allows the limiter
+ * to adhere more closely to the configured rate, as it can bring forward short
+ * periods of inactivity if the rate of requests oscillates around the configured limit.
+ */
+ private final long burstNanos;
+
+ /** nanoseconds from start time corresponding to the next available permit */
+ private final AtomicLong nextAvailable = new AtomicLong();
+
+ private volatile Ticker ticker;
+
+ private volatile int permitsPerSecond;
+
+ /** time in nanoseconds between permits on the timeline */
+ private volatile long intervalNanos;
+
+ public NonBlockingRateLimiter(int permitsPerSecond)
+ {
+ this(permitsPerSecond, DEFAULT_BURST_NANOS, Ticker.systemTicker());
+ }
+
+ @VisibleForTesting
+ public NonBlockingRateLimiter(int permitsPerSecond, long burstNanos, Ticker ticker)
+ {
+ this.startedNanos = ticker.read();
+ this.burstNanos = burstNanos;
+ setRate(permitsPerSecond, ticker);
+ }
+
+ public void setRate(int permitsPerSecond)
+ {
+ setRate(permitsPerSecond, Ticker.systemTicker());
+ }
+
+ @VisibleForTesting
+ public synchronized void setRate(int permitsPerSecond, Ticker ticker)
+ {
+ Preconditions.checkArgument(permitsPerSecond > 0, "permits/second must be positive");
+
+ this.ticker = ticker;
+ this.permitsPerSecond = permitsPerSecond;
+ intervalNanos = Math.max(TimeUnit.SECONDS.toNanos(1) / permitsPerSecond, 1);
+ nextAvailable.set(nanosElapsed());
+ }
+
+ /**
+ * @return the number of available permits per second
+ */
+ public int getRate()
+ {
+ return permitsPerSecond;
+ }
+
+ /**
+ * Reserves a single permit slot on the timeline which may not yet be available.
+ *
+ * @return microseconds until the reserved permit will be available (or zero if it already is)
+ */
+ public long reserveAndGetWaitMicros()
+ {
+ long nowNanos = nanosElapsed();
+
+ for (;;)
+ {
+ long prev = nextAvailable.get();
+ long interval = this.intervalNanos;
+
+ // Push the first available permit slot up to the burst window if necessary.
+ long firstAvailable = Math.max(prev, nowNanos - burstNanos);
+
+ // Advance the configured interval starting from the bounded previous permit slot.
+ if (nextAvailable.compareAndSet(prev, firstAvailable + interval))
+ // If the time now is before the first available slot, return the delay.
+ return TimeUnit.NANOSECONDS.toMicros(Math.max(0, firstAvailable - nowNanos));
+ }
+ }
+
+ /**
+ * Reserves a single permit slot on the timeline, but only if one is available.
+ *
+ * @return true if a permit is available, false if one is not
+ */
+ public boolean tryReserve()
Review comment:
Having duplicated logic in `reserveAndGetWaitMicros` and `tryReserve`
makes it a little subtle to see that certain paths in the message handler force
a permit reservation and process the query, while others do not. While I don't
love it, another option here would be to unify these two methods with something
like the following:
```
/**
 * Reserves a single permit slot on the timeline which may not yet be available.
 *
 * @return microseconds until the reserved permit will be available, 0 if available, -1 on failure
 */
public long tryReserve(boolean force)
{
    long nowNanos = nanosElapsed();
    for (;;)
    {
        long prev = nextAvailable.get();
        long interval = this.intervalNanos;
        // Push the first available permit slot up to the burst window if necessary.
        long firstAvailable = Math.max(prev, nowNanos - burstNanos);
        // If we haven't reached the time for the first available permit, we've failed to reserve.
        if (!force && nowNanos < firstAvailable)
            return -1;
        // Advance the configured interval starting from the bounded previous permit slot.
        // If another thread has already taken the next slot, retry.
        if (nextAvailable.compareAndSet(prev, firstAvailable + interval))
            return TimeUnit.NANOSECONDS.toMicros(Math.max(0, firstAvailable - nowNanos));
    }
}
```
For the record: I don't love that return API. I just wanted to surface two
points: a) we should think twice before keeping our "permit" reservation logic
in two different places, and b) a clearer API distinguishing "reserve whether
the time is right or not" from "maybe reserve" would be helpful.
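To address both points at once, one option (a sketch only, with hypothetical
class and field names) keeps a single private reservation loop and exposes two
methods whose names say whether the reservation is forced. A `LongSupplier`
clock stands in for Guava's `Ticker` so the example runs without dependencies:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical sketch: one private reservation loop, two intention-revealing public methods.
class SketchRateLimiter
{
    private final LongSupplier clock;
    private final long startedNanos;
    private final long burstNanos;
    private final long intervalNanos;
    // Elapsed nanos of the next free slot; 0 means "available from construction time".
    private final AtomicLong nextAvailable = new AtomicLong();

    SketchRateLimiter(int permitsPerSecond, long burstNanos, LongSupplier clock)
    {
        this.clock = clock;
        this.startedNanos = clock.getAsLong();
        this.burstNanos = burstNanos;
        this.intervalNanos = Math.max(TimeUnit.SECONDS.toNanos(1) / permitsPerSecond, 1);
    }

    /** Always reserves a slot; returns microseconds until it becomes usable (0 if usable now). */
    long reserveAndGetWaitMicros()
    {
        return reserve(true);
    }

    /** Reserves a slot only if one is usable right now. */
    boolean tryReserve()
    {
        return reserve(false) >= 0;
    }

    /** Shared loop; returns -1 only when force is false and no slot is usable yet. */
    private long reserve(boolean force)
    {
        long nowNanos = clock.getAsLong() - startedNanos;
        for (;;)
        {
            long prev = nextAvailable.get();
            // Bound how far in the past the next slot can be (the burst window).
            long firstAvailable = Math.max(prev, nowNanos - burstNanos);
            if (!force && nowNanos < firstAvailable)
                return -1;
            // Claim the slot; if another thread won the race, reload and retry.
            if (nextAvailable.compareAndSet(prev, firstAvailable + intervalNanos))
                return TimeUnit.NANOSECONDS.toMicros(Math.max(0, firstAvailable - nowNanos));
        }
    }
}
```

Callers then never see the `-1` sentinel: `reserveAndGetWaitMicros()` is the
"reserve whether the time is right or not" path and `tryReserve()` is the
"maybe reserve" path, while the timeline logic lives in one place.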
##########
File path: src/java/org/apache/cassandra/utils/concurrent/NonBlockingRateLimiter.java
##########
@@ -0,0 +1,183 @@
+package org.apache.cassandra.utils.concurrent;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A rate limiter implementation that allows callers to reserve permits that may only be available
+ * in the future, delegating to them decisions about how to schedule/delay work and whether or not
+ * to block execution to do so.
+ */
+@SuppressWarnings("UnstableApiUsage")
+@ThreadSafe
+public class NonBlockingRateLimiter
+{
+ private static final long DEFAULT_BURST_NANOS = TimeUnit.SECONDS.toNanos(1);
+
+ /** a starting time for elapsed time calculations */
+ private final long startedNanos;
+
+ /**
+ * nanoseconds of "burst time"
+ *
+ * ex. If this is configured at the default of 1 second, the limiter will allow a
+ * number of permits equal to the configured number of permits/second to be issued
Review comment:
I'm about 75% sure it doesn't allow a permits/second count, but instead
allows `burstNanos / interval` permits to be allocated. For example, if
`nextAvailable` is some time far in the past, you allocate at
`nowNanos - burstNanos`, setting the next allocation at that sentinel +
`interval`. Assuming a microburst of requests, the next one allocates at that
new sentinel (the old `nowNanos - burstNanos` plus `interval`) and again adds
`interval`. Rinse and repeat in increments of `interval` (i.e. `burstNanos`
as your original window divided by `interval`) until you cross `nowNanos` and
get throttled.
Which may very well be fine. But _if_ that's how the logic works, we
should probably update the comment here to explain that.
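Working that scenario through, assuming `tryReserve` grants a slot whenever
the bounded slot is not in the future (as in the `tryReserve(boolean force)`
sketch earlier in this review) and that nothing was reserved for at least
`burstNanos`: write $B$ = `burstNanos`, $I$ = `intervalNanos`, $t$ =
`nowNanos`, and $s_k$ for the slot seen by the $k$-th back-to-back request
($k = 0, 1, 2, \dots$).

$$
\begin{aligned}
s_0 &= \max(\text{nextAvailable},\; t - B) = t - B, \qquad s_{k+1} = s_k + I \;\Rightarrow\; s_k = t - B + kI \\
s_k \le t &\iff k \le B / I \\
\text{permits granted at } t &= \left\lfloor B / I \right\rfloor + 1
\end{aligned}
$$

With the default $B = 10^9$ ns and a rate $r$ that divides $10^9$,
$I = 10^9 / r$, so the burst is $r + 1$ permits; for any other $B$ it scales
as $\lfloor B / I \rfloor + 1$ rather than with the configured permits/second.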
##########
File path: test/unit/org/apache/cassandra/utils/concurrent/NonBlockingRateLimiterTest.java
##########
@@ -0,0 +1,85 @@
+package org.apache.cassandra.utils.concurrent;
+
+import com.google.common.base.Ticker;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@SuppressWarnings("UnstableApiUsage")
+public class NonBlockingRateLimiterTest
Review comment:
Related to another comment above, but we could use a test that confirms
the burst logic works as we expect.
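A sketch of such a test, written as plain Java with a manual clock so it runs
without JUnit (each check maps directly onto an `assertEquals`).
`BurstLimiter` and `BurstCheck` are hypothetical: `BurstLimiter` copies only
the reservation loop and assumes `tryReserve` grants a slot whenever the
burst-bounded slot is not in the future, so the expected counts would change
if the real limiter differs. With 10 permits/second (a 100ms interval) and a
500ms burst window, an idle limiter should grant 500ms / 100ms + 1 = 6 permits
at once, then one more per 100ms:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical stand-in for NonBlockingRateLimiter#tryReserve, driven by a manual clock.
class BurstLimiter
{
    private final LongSupplier clock;
    private final long startedNanos;
    private final long burstNanos;
    private final long intervalNanos;
    private final AtomicLong nextAvailable = new AtomicLong(); // next slot, in elapsed nanos

    BurstLimiter(int permitsPerSecond, long burstNanos, LongSupplier clock)
    {
        this.clock = clock;
        this.startedNanos = clock.getAsLong();
        this.burstNanos = burstNanos;
        this.intervalNanos = Math.max(TimeUnit.SECONDS.toNanos(1) / permitsPerSecond, 1);
    }

    boolean tryReserve()
    {
        long nowNanos = clock.getAsLong() - startedNanos;
        for (;;)
        {
            long prev = nextAvailable.get();
            // Slots older than the burst window are pulled forward to nowNanos - burstNanos.
            long firstAvailable = Math.max(prev, nowNanos - burstNanos);
            if (nowNanos < firstAvailable)
                return false;
            if (nextAvailable.compareAndSet(prev, firstAvailable + intervalNanos))
                return true;
        }
    }

    /** Counts how many permits are granted back-to-back without the clock advancing. */
    int drain()
    {
        int granted = 0;
        while (tryReserve())
            granted++;
        return granted;
    }
}

class BurstCheck
{
    /** Leaves a fresh limiter idle for 10s, then returns the size of the first burst. */
    static int burstAfterIdle(int permitsPerSecond, long burstNanos)
    {
        long[] now = { 0L };
        BurstLimiter limiter = new BurstLimiter(permitsPerSecond, burstNanos, () -> now[0]);
        now[0] = TimeUnit.SECONDS.toNanos(10);
        return limiter.drain();
    }
}
```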
##########
File path: src/java/org/apache/cassandra/transport/PreV5Handlers.java
##########
@@ -123,48 +129,113 @@ private void releaseItem(Flusher.FlushItem<Message.Response> item)
* purpose since it does not have the message envelope associated with the exception.
* <p>
* Note: this method should execute on the netty event loop.
+ *
+ * @return the type of {@link Overload} triggered
*/
- private boolean shouldHandleRequest(ChannelHandlerContext ctx, Message.Request request)
+ private Overload checkLimits(ChannelHandlerContext ctx, Message.Request request)
Review comment:
Consider mentioning in the javadoc comment here why some of the paths set
backpressure and others don't. Without tracing how the return value is used,
it's not clear why some conditional branches set it and others don't, and that
subtlety is liable to cause issues if someone later comes in here to modify it.
--
This is an automated message from the Apache Git Service.