josh-mckenzie commented on a change in pull request #1045:
URL: https://github.com/apache/cassandra/pull/1045#discussion_r684458398



##########
File path: src/java/org/apache/cassandra/utils/NoWaitRateLimiter.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.NotThreadSafe;
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
+import com.google.common.math.LongMath;
+
+@ThreadSafe
+@SuppressWarnings("UnstableApiUsage")
+public class NoWaitRateLimiter
+{
+    @GuardedBy("this")
+    private Stopwatch stopwatch;
+
+    @GuardedBy("this")
+    private SmoothRateLimiter delegate;
+
+    private NoWaitRateLimiter(SmoothRateLimiter delegate, Ticker ticker)
+    {
+        this.delegate = delegate;
+        this.stopwatch = Stopwatch.createStarted(ticker);
+    }
+    
+    public static NoWaitRateLimiter create(double permitsPerSecond)
+    {
+        return create(permitsPerSecond, Ticker.systemTicker());
+    }
+
+    public static NoWaitRateLimiter create(double permitsPerSecond, Ticker ticker)
+    {
+        return create(permitsPerSecond, 1.0D, ticker);
+    }
+
+    public static NoWaitRateLimiter create(double permitsPerSecond, double burstSeconds)
+    {
+        return create(permitsPerSecond, burstSeconds, Ticker.systemTicker());
+    }
+
+    public static NoWaitRateLimiter create(double permitsPerSecond, double burstSeconds, Ticker ticker)
+    {
+        SmoothRateLimiter delegate = new SmoothRateLimiter(burstSeconds);
+        NoWaitRateLimiter limiter = new NoWaitRateLimiter(delegate, ticker);
+        limiter.setRate(permitsPerSecond);
+        return limiter;
+    }
+    
+    @VisibleForTesting
+    public synchronized void reset(double permitsPerSecond, double burstSeconds, Ticker ticker)

Review comment:
       If we want this usage to be for test only, consider renaming to 
`resetForTest`

##########
File path: src/java/org/apache/cassandra/utils/NoWaitRateLimiter.java
##########
@@ -0,0 +1,261 @@
+@ThreadSafe
+@SuppressWarnings("UnstableApiUsage")
+public class NoWaitRateLimiter

Review comment:
       Did we consider specializing Guava's
[SmoothRateLimiter](https://github.com/google/guava/blob/v30.1.1/guava/src/com/google/common/util/concurrent/SmoothRateLimiter.java)
 and decide to go our own way instead?
   
   It looks like
[SmoothBursty](https://github.com/google/guava/blob/master/guava/src/com/google/common/util/concurrent/SmoothRateLimiter.java#L353)
 might match what we're doing here, with the appropriate change to grant
permits immediately rather than delaying.
   
   Stylistically, I prefer their heavier class- and method-level Javadoc,
which gives a cognitive framework for understanding how the limiter is
structured. We may be able to borrow some of it and tune it to reflect our
behavior here.
    

##########
File path: src/java/org/apache/cassandra/transport/Dispatcher.java
##########
@@ -65,20 +73,38 @@ public Dispatcher(boolean useLegacyFlusher)
         this.useLegacyFlusher = useLegacyFlusher;
     }
 
-    public void dispatch(Channel channel, Message.Request request, FlushItemConverter forFlusher)
+    public void dispatch(Channel channel, Message.Request request, FlushItemConverter forFlusher, Overload backpressure)
     {
-        requestExecutor.submit(() -> processRequest(channel, request, forFlusher));
+        requestExecutor.submit(() -> processRequest(channel, request, forFlusher, backpressure));
     }
 
     /**
      * Note: this method may be executed on the netty event loop, during initial protocol negotiation
      */
-    static Message.Response processRequest(ServerConnection connection, Message.Request request)
+    static Message.Response processRequest(ServerConnection connection, Message.Request request, Overload backpressure)
     {
         long queryStartNanoTime = System.nanoTime();
         if (connection.getVersion().isGreaterOrEqualTo(ProtocolVersion.V4))
             ClientWarn.instance.captureWarnings();
 
+        if (backpressure == Overload.REQUESTS)
+        {
+            String message = String.format("Request breached global limit of %.2f requests/second and triggered backpressure.",
+                                           ClientResourceLimits.getNativeTransportRequestsPerSecond());
+            
+            NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, 1, TimeUnit.MINUTES, message);
+            ClientWarn.instance.warn(message);
+        }
+        else if (backpressure == Overload.BYTES_IN_FLIGHT)
+        {
+            NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, 1, TimeUnit.MINUTES,

Review comment:
       nit: is there a reason we didn't do the same here as above - construct 
the string and re-use it for both contexts?
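
A minimal sketch of the reuse this nit asks about: format the message once and hand the same string to both the log and the client warning. `BackpressureNotice` and its two lists are hypothetical stand-ins for `NoSpamLogger` and `ClientWarn`, not Cassandra APIs, and the sketch assumes the `BYTES_IN_FLIGHT` branch could take the same shape as the `REQUESTS` branch above it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-in for the two sinks; not a Cassandra API.
final class BackpressureNotice
{
    final List<String> logged = new ArrayList<>();
    final List<String> warned = new ArrayList<>();

    // Format once, then pass the same string to both sinks.
    String report(String template, double limit)
    {
        String message = String.format(Locale.ROOT, template, limit);
        logged.add(message); // stands in for NoSpamLogger.log(logger, INFO, 1, MINUTES, message)
        warned.add(message); // stands in for ClientWarn.instance.warn(message)
        return message;
    }
}
```

Building the string once keeps the logged text and the client-facing warning from drifting apart if one of them is edited later.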

##########
File path: src/java/org/apache/cassandra/utils/NoWaitRateLimiter.java
##########
@@ -0,0 +1,261 @@
+    @VisibleForTesting
+    public synchronized void reset(double permitsPerSecond, double burstSeconds, Ticker ticker)
+    {
+        this.stopwatch = Stopwatch.createStarted(ticker);
+        this.delegate = new SmoothRateLimiter(burstSeconds);
+        setRate(permitsPerSecond);
+    }
+
+    public final void setRate(double permitsPerSecond) 
+    {
+        Preconditions.checkArgument(permitsPerSecond > 0.0D && !Double.isNaN(permitsPerSecond), "rate must be positive");
+        
+        synchronized (this) 
+        {
+            delegate.doSetRate(permitsPerSecond, stopwatch.elapsed(TimeUnit.MICROSECONDS));
+        }
+    }
+
+    public synchronized double getRate() 
+    {
+        return delegate.getRate();
+    }
+
+    /**
+     * Forces the acquisition of a single permit.
+     * 
+     * @return microseconds until the acquired permit is available, and zero if it already is
+     */
+    public synchronized long reserveAndGetWaitLength() 
+    {
+        long nowMicros = this.stopwatch.elapsed(TimeUnit.MICROSECONDS);
+        long momentAvailable = delegate.reserveEarliestAvailable(1, nowMicros);
+        return Math.max(momentAvailable - nowMicros, 0L);
+    }
+
+    public synchronized boolean tryAcquire()
+    {
+        long nowMicros = this.stopwatch.elapsed(TimeUnit.MICROSECONDS);
+
+        if (!delegate.canAcquire(nowMicros)) 
+        {
+            return false;
+        }
+
+        delegate.reserveEarliestAvailable(1, nowMicros);
+            
+        return true;
+    }
+
+    public synchronized boolean canAcquire() 
+    {
+        long nowMicros = this.stopwatch.elapsed(TimeUnit.MICROSECONDS);
+        return delegate.canAcquire(nowMicros);
+    }
+
+    public synchronized long waitTimeMicros()
+    {
+        long nowMicros = this.stopwatch.elapsed(TimeUnit.MICROSECONDS);
+        return delegate.waitTimeMicros(nowMicros);
+    }
+
+    @VisibleForTesting
+    public synchronized long getNextFreeTicketMicros()
+    {
+        return delegate.nextFreeTicketMicros;
+    }
+
+    public synchronized double getStoredPermits()
+    {
+        return delegate.getStoredPermits();
+    }
+    
+    @Override
+    public String toString()
+    {
+        return String.format("Maximum requests/second is %.2f. %.2f stored permits available.", 
+                             getRate(), getStoredPermits());
+    }
+
+    @NotThreadSafe
+    private static class SmoothRateLimiter
+    {
+        private double storedPermits;
+        private double maxPermits;
+        private double stableIntervalMicros;
+        private long nextFreeTicketMicros;
+        private final double maxBurstSeconds;
+
+        private SmoothRateLimiter(double maxBurstSeconds)
+        {
+            this.nextFreeTicketMicros = 0L;
+            this.maxBurstSeconds = maxBurstSeconds;
+        }
+        
+        public double getStoredPermits()
+        {
+            return storedPermits;
+        }
+
+        public void doSetRate(double permitsPerSecond, long nowMicros)
+        {
+            resync(nowMicros);
+            this.stableIntervalMicros = (double) TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
+            doSetRate(permitsPerSecond);
+        }
+
+        private void doSetRate(double permitsPerSecond)
+        {
+            double oldMaxPermits = this.maxPermits;
+            this.maxPermits = this.maxBurstSeconds * permitsPerSecond;
+            this.storedPermits = oldMaxPermits == 0.0D ? 0.0D : this.storedPermits * this.maxPermits / oldMaxPermits;
+        }
+
+        final double getRate()
+        {
+            return (double) TimeUnit.SECONDS.toMicros(1L) / this.stableIntervalMicros;
+        }
+
+        long waitTimeMicros(long nowMicros)
+        {
+            return Math.max(this.nextFreeTicketMicros - nowMicros, 0L);
+        }
+
+        boolean canAcquire(long nowMicros)
+        {
+            return nowMicros >= this.nextFreeTicketMicros;
+        }
+
+        public long reserveEarliestAvailable(int requiredPermits, long nowMicros)

Review comment:
       Our implementation here drops the
[storedPermitsToWaitTime](https://github.com/google/guava/blob/v30.1.1/guava/src/com/google/common/util/concurrent/SmoothRateLimiter.java#L360)
 method; I believe that's because we intend to always [allow a single
greedy overflow on permit
allocation](https://github.com/google/guava/blob/88b1bdcb613de9f985cf3f873b7eec0a2e13741b/android/guava/src/com/google/common/util/concurrent/SmoothRateLimiter.java#L81-L119)
 before pausing the client channel.
   
   That said, it might be good to both 1) point to where our inspiration
came from, and 2) comment on the key ways our impl differs, if that makes
sense.
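
A simplified sketch of the greedy-overflow behavior described here, to make the difference concrete. Field and method names mirror the quoted diff, but `GreedyReserveSketch` itself is hypothetical, and the `resync` body and the last two lines of `reserveEarliestAvailable` are assumptions modeled on Guava's `SmoothRateLimiter` rather than code taken from this PR (the saturated add Guava uses is replaced by plain addition for brevity).

```java
// Hypothetical, single-threaded sketch; not the PR's implementation.
final class GreedyReserveSketch
{
    private final double stableIntervalMicros;
    private final double maxPermits;
    private double storedPermits;
    private long nextFreeTicketMicros;

    GreedyReserveSketch(double permitsPerSecond, double maxBurstSeconds)
    {
        this.stableIntervalMicros = 1_000_000.0 / permitsPerSecond;
        this.maxPermits = maxBurstSeconds * permitsPerSecond;
    }

    // Assumed behavior: credit permits accrued while idle, capped at maxPermits.
    private void resync(long nowMicros)
    {
        if (nowMicros > nextFreeTicketMicros)
        {
            double accrued = (nowMicros - nextFreeTicketMicros) / stableIntervalMicros;
            storedPermits = Math.min(maxPermits, storedPermits + accrued);
            nextFreeTicketMicros = nowMicros;
        }
    }

    boolean canAcquire(long nowMicros)
    {
        return nowMicros >= nextFreeTicketMicros;
    }

    // Always grants. Stored permits add no wait (there is no storedPermitsToWaitTime
    // term); fresh permits push nextFreeTicketMicros out for the next caller.
    long reserveEarliestAvailable(int requiredPermits, long nowMicros)
    {
        resync(nowMicros);
        long momentAvailableMicros = nextFreeTicketMicros;
        double storedPermitsToSpend = Math.min(requiredPermits, storedPermits);
        double freshPermits = requiredPermits - storedPermitsToSpend;
        long waitMicros = (long) (freshPermits * stableIntervalMicros);
        nextFreeTicketMicros += waitMicros;
        storedPermits -= storedPermitsToSpend;
        return momentAvailableMicros;
    }
}
```

With a rate of 1000 permits/second and zero burst, the first reservation at time 0 is granted immediately and moves the next free ticket to 1000 µs; a caller that checks `canAcquire` before then is told to back off, which is the point at which the channel would be paused.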

##########
File path: src/java/org/apache/cassandra/utils/NoWaitRateLimiter.java
##########
@@ -0,0 +1,261 @@
+        public void doSetRate(double permitsPerSecond, long nowMicros)
+        {
+            resync(nowMicros);

Review comment:
       Having two signatures for doSetRate, one of which takes a timestamp and 
_mutates internal state of the timer_ vs. one that just recalculates stored 
permits, is pretty confusing to me and seems like it'd be error prone on future 
use. Just double-checked the guava one and they don't have the signature below 
(sans nowMicros). Recommend renaming this one to something that indicates it 
recalculates permits prior to calling doSetRate or renaming the other to 
indicate it just recalculates permits.
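
One possible shape for the rename, as a hedged sketch. `resyncAndSetRate` and `rescaleStoredPermits` are hypothetical names, `RateSettingSketch` is not the PR's class, and the `resync` body is an assumption modeled on Guava, since the diff quoted above does not show it.

```java
// Hypothetical, single-threaded sketch of the two responsibilities with distinct names.
final class RateSettingSketch
{
    private double storedPermits;
    private double maxPermits;
    private double stableIntervalMicros;
    private long nextFreeTicketMicros;
    private final double maxBurstSeconds;

    RateSettingSketch(double maxBurstSeconds)
    {
        this.maxBurstSeconds = maxBurstSeconds;
    }

    // Mutates timer state: resyncs to nowMicros, then applies the new rate.
    void resyncAndSetRate(double permitsPerSecond, long nowMicros)
    {
        resync(nowMicros);
        stableIntervalMicros = 1_000_000.0 / permitsPerSecond;
        rescaleStoredPermits(permitsPerSecond);
    }

    // Only recalculates: scales stored permits to the new maximum, never touches the clock.
    private void rescaleStoredPermits(double permitsPerSecond)
    {
        double oldMaxPermits = maxPermits;
        maxPermits = maxBurstSeconds * permitsPerSecond;
        storedPermits = oldMaxPermits == 0.0 ? 0.0 : storedPermits * maxPermits / oldMaxPermits;
    }

    // Assumed behavior: credit permits accrued while idle, capped at maxPermits.
    private void resync(long nowMicros)
    {
        if (nowMicros > nextFreeTicketMicros)
        {
            double accrued = (nowMicros - nextFreeTicketMicros) / stableIntervalMicros;
            storedPermits = Math.min(maxPermits, storedPermits + accrued);
            nextFreeTicketMicros = nowMicros;
        }
    }

    double getRate()
    {
        return 1_000_000.0 / stableIntervalMicros;
    }

    double getStoredPermits()
    {
        return storedPermits;
    }
}
```

With distinct names, a future caller cannot pick the clock-mutating overload by accident just by adding or dropping an argument.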

##########
File path: src/java/org/apache/cassandra/utils/NoWaitRateLimiter.java
##########
@@ -0,0 +1,261 @@
+        public long reserveEarliestAvailable(int requiredPermits, long nowMicros)
+        {
+            resync(nowMicros);
+            long momentAvailableMicros = this.nextFreeTicketMicros;
+            double storedPermitsToSpend = Math.min(requiredPermits, this.storedPermits);
+            double freshPermits = (double) requiredPermits - storedPermitsToSpend;
+            long waitMicros = (long) (freshPermits * this.stableIntervalMicros);

Review comment:
       long-winded nit: Given freshPermits can basically be whatever arbitrary 
number gets passed in as requiredPermits that overflows storedPermitsToSpend, 
we don't really have anything in the call pipeline to prevent single large 
queries from heavily starving a single connection. While things are hard-coded 
to 1 for now, I see this as being something somewhat brittle in the pipeline 
for this API; might be worth either commenting on this at call sites, at method 
header here, or putting something in the code-base to indicate not just a max 
rate allowable per endpoint but perhaps a max permit request size and cap on 
that?
   
   I think that'd be a YAGNI violation to put in today, but at least commenting 
to that effect might help if someone else is working on this implementation in 
the future.
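
To illustrate the concern, a small sketch of how one large permit request inflates the wait, and what a cap might look like. `PermitRequestCap` and `maxPermitsPerRequest` are hypothetical and not part of this PR; `waitMicros` simply repeats the arithmetic from the quoted lines.

```java
// Hypothetical sketch; the cap is an illustration, not a proposal for this patch.
final class PermitRequestCap
{
    private final int maxPermitsPerRequest;

    PermitRequestCap(int maxPermitsPerRequest)
    {
        if (maxPermitsPerRequest <= 0)
            throw new IllegalArgumentException("cap must be positive");
        this.maxPermitsPerRequest = maxPermitsPerRequest;
    }

    // Same arithmetic as the quoted diff: fresh permits translate directly into wait time.
    static long waitMicros(int requiredPermits, double storedPermits, double stableIntervalMicros)
    {
        double storedPermitsToSpend = Math.min(requiredPermits, storedPermits);
        double freshPermits = requiredPermits - storedPermitsToSpend;
        return (long) (freshPermits * stableIntervalMicros);
    }

    // Bounds how far a single request can push out the next free ticket.
    int clamp(int requiredPermits)
    {
        if (requiredPermits <= 0)
            throw new IllegalArgumentException("requiredPermits must be positive");
        return Math.min(requiredPermits, maxPermitsPerRequest);
    }
}
```

At 1000 permits/second with no stored permits, a single request for 10,000 permits pushes the next free ticket out by 10 seconds; clamped to 100 permits it is 100 ms.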

##########
File path: test/burn/org/apache/cassandra/transport/SimpleClientPerfTest.java
##########
@@ -190,7 +199,9 @@ public int encodedSize(QueryMessage queryMessage, ProtocolVersion version)
         AtomicBoolean measure = new AtomicBoolean(false);
         DescriptiveStatistics stats = new DescriptiveStatistics();
         Lock lock = new ReentrantLock();
-
+        RateLimiter limiter = RateLimiter.create(2000);

Review comment:
       If we're testing perf here, should we be using the NoWaitRateLimiter 
instead of the guava RateLimiter?

##########
File path: test/unit/org/apache/cassandra/utils/NoWaitRateLimiterTest.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Ticker;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class NoWaitRateLimiterTest
+{
+    @Test
+    public void shouldProperlyInitializeRate()
+    {
+        NoWaitRateLimiter limiter = NoWaitRateLimiter.create(1000.0);
+        assertEquals(1000.0, limiter.getRate(), 0.0);
+    }
+
+    @SuppressWarnings("UnstableApiUsage")
+    @Test
+    public void shouldAdvanceWaitTime()
+    {
+        Ticker ticker = new Ticker()
+        {
+            @Override
+            public long read()
+            {
+                return 0;
+            }
+        };
+
+        double permitsPerSecond = 1000.0;
+        NoWaitRateLimiter limiter = NoWaitRateLimiter.create(permitsPerSecond, 0.0, ticker);
+        assertEquals(0.0, limiter.waitTimeMicros(), 0.0);
+        limiter.reserveAndGetWaitLength();
+        assertEquals(permitsPerSecond, limiter.waitTimeMicros(), 0.0);
+    }
+
+    @SuppressWarnings("UnstableApiUsage")
+    @Test
+    public void shouldAdvanceNextFreeTicketMicrosWithZeroBurst()
+    {
+        double permitsPerSecond = 1000.0;
+        
+        Ticker ticker = new Ticker()
+        {
+            @Override
+            public long read()
+            {
+                return 0;
+            }
+        };
+        
+        NoWaitRateLimiter limiter = NoWaitRateLimiter.create(permitsPerSecond, 0.0, ticker);
+        long start = limiter.getNextFreeTicketMicros();
+        limiter.reserveAndGetWaitLength();
+        assertEquals(permitsPerSecond, limiter.getNextFreeTicketMicros() - start, 0.0);
+    }
+
+    @SuppressWarnings("UnstableApiUsage")
+    @Test
+    public void shouldNotAcquireWhenPermitsExhaustedAndBeforeNextTicket()
+    {
+        double permitsPerSecond = 1000.0;
+        final AtomicLong tick = new AtomicLong(0);
+
+        Ticker ticker = new Ticker()
+        {
+            @Override
+            public long read()
+            {
+                return tick.get();
+            }
+        };
+
+        NoWaitRateLimiter limiter = NoWaitRateLimiter.create(permitsPerSecond, 0.0, ticker);
+        assertTrue(limiter.tryAcquire());
+        assertFalse(limiter.canAcquire());
+        
+        // Advance the clock to the next ticket time, and verify we can acquire again:
+        tick.addAndGet(TimeUnit.MICROSECONDS.toNanos((long) permitsPerSecond));
+        assertTrue(limiter.tryAcquire());
+        assertFalse(limiter.canAcquire());
+    }
+
+    @SuppressWarnings("UnstableApiUsage")
+    @Test
+    public void shouldNotAcquireAfterConumingAllStoredPermits()

Review comment:
       nit: spelling. Should be `shouldNotAcquireAfterConsumingAllStoredPermits`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
