netudima commented on code in PR #4003:
URL: https://github.com/apache/cassandra/pull/4003#discussion_r2032078397


##########
src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java:
##########
@@ -855,4 +855,319 @@ public String toString()
             return "[" + min + ',' + max + ']';
         }
     }
+
+    /**
+     * Single-method cleanup hook. In this file it is wrapped by {@code BucketsPhantomReference},
+     * which ties it to the lifetime of the thread that created the reference.
+     */
+    @FunctionalInterface
+    interface MetricCleaner
+    {
+        /** Performs the cleanup this hook was created for. */
+        void clean();
+    }
+
+    /**
+     * Phantom reference to the thread that constructs it, registered with the supplied queue and
+     * carrying the {@link MetricCleaner} that should run for that thread (presumably once the
+     * reference is dequeued after the thread becomes unreachable; the polling site is not shown here).
+     */
+    private static class BucketsPhantomReference extends PhantomReference<Object> implements MetricCleaner
+    {
+        /** Cleanup logic that {@link #clean()} forwards to. */
+        private final MetricCleaner delegate;
+
+        public BucketsPhantomReference(ReferenceQueue<? super Object> queue, MetricCleaner delegate)
+        {
+            // The referent is the constructing thread itself, not the metric.
+            super(Thread.currentThread(), queue);
+            this.delegate = delegate;
+        }
+
+        /** Forwards to the wrapped cleaner. */
+        @Override
+        public void clean()
+        {
+            delegate.clean();
+        }
+    }
+
+    /**
+     * Writes are exclusive to the thread-local buckets, so we can use a 
single updater for all threads.
+     * Readers will see a consistent view of the buckets and could be blocked 
for a while.
+     * <p>
+     * The class is aslso being tracked by a phantom reference queue to 
release the accumulated buckets when the thread is dead.
+     */
+    protected class BucketsThreadLocal
+    {
+        // try to use int[] instead of long[] to reduce memory usage, and move 
to the sum array when overflow
+        private final AtomicReference<DecayingArray> decayingRef;
+        private final long[] estimated;
+        private volatile boolean writing;
+
+        public BucketsThreadLocal(int size)
+        {
+            this.decayingRef = new AtomicReference<>(new DecayingArray(size, 
decayingEstimatedBuckets.decayLandmark));
+            this.estimated = new long[size];
+        }
+
+        public void update(int index, long now)
+        {
+            // This is only called by the thread that owns the thread local, 
so we don't need to worry about contention.
+            // Once the rescaling has occurred, we need to flush the values to 
the decayingBucket and report that the values are no longer in use.
+            writing = true;
+            try
+            {
+                DecayingArray decaying = decayingRef.get();
+                if (decaying.decayLandmark != 
decayingEstimatedBuckets.decayLandmark)
+                    decayingEstimatedBuckets.flush(this, decaying);
+
+                decayingRef.get().update(index, now);
+                estimated[index]++;
+            }
+            finally
+            {
+                writing = false;
+            }
+        }
+
+        public void release()
+        {
+            // The release method could be called by the 
FastThreadLocal#onRemoval or by the PhantomReference queue.
+            // We need to make sure we transfer the values to the 
decayingBuckets only once.
+            // There is also no need for the BucketsThreadLocal#inUse check 
since the thread is dead and no one will update the values.
+            if (!bucketsThreadLocals.contains(this))
+                return;
+            long stamp = bucketsStampedLock.writeLock();
+            try
+            {
+                if (!bucketsThreadLocals.remove(this))
+                    return;
+                DecayingArray locDecaying = decayingRef.get();
+                // The same write lock is used to flush the values to the 
decaying buckets and to rescale the values,
+                // so the landmark is safe to use from the decaying array.
+                decayingEstimatedBuckets.updateExclusive((index, value) -> 
locDecaying.data[(int) index] + value,
+                                                         (index, value) -> 
estimated[(int) index] + value,
+                                                         
locDecaying.decayLandmark);
+            }
+            finally
+            {
+                bucketsStampedLock.unlockWrite(stamp);
+            }
+        }
+    }
+
+    /**
+     * Shared (non thread-local) storage for the merged decaying and estimated buckets.
+     * <p>
+     * {@code buckets} is only mutated while holding the write lock of {@code stampedLock}. Readers use
+     * an optimistic read and retry on interference. Decaying arrays detached from thread locals are
+     * queued in {@code decayingPending} and merged by whichever thread next holds the write lock.
+     */
+    private static class DecayingEstimatedBuckets
+    {
+        private final int size;
+        /** Lock protecting the shared {@code buckets}; supplied by the owner (constructor argument {@code shared}). */
+        private final StampedLock stampedLock;
+        /** Decaying arrays detached from thread locals, waiting to be added to {@code buckets}. */
+        private final ConcurrentLinkedQueue<DecayingArray> decayingPending = new ConcurrentLinkedQueue<>();
+        /**
+         * Decaying and estimated buckets stored back to back in one array of length {@code 2 * size}:
+         * indices {@code [0, size)} hold the decaying buckets and {@code [size, 2 * size)} the estimated buckets.
+         */
+        private final long[] buckets;
+        private volatile long decayLandmark;
+
+        public DecayingEstimatedBuckets(StampedLock shared, int size, long now)
+        {
+            this.size = size;
+            this.buckets = new long[size * 2];
+            this.decayLandmark = now;
+            this.stampedLock = shared;
+        }
+
+        /**
+         * Moves the landmark to {@code now} and decays the shared decaying buckets, provided more than
+         * {@code LANDMARK_RESET_INTERVAL_IN_NS} has elapsed since the current landmark.
+         * <p>
+         * Every registered thread local is switched to a fresh array for the new landmark. The detached
+         * arrays are merged (after any in-flight update on them completes) before the decay is applied.
+         *
+         * @param locals the currently registered thread locals
+         * @param now    candidate new landmark, in the same time base as {@code decayLandmark}
+         */
+        public void rescale(Set<BucketsThreadLocal> locals, long now)
+        {
+            // Unlocked pre-check so the common case never touches the lock.
+            if (now - decayLandmark <= LANDMARK_RESET_INTERVAL_IN_NS)
+                return;
+            long stamp = stampedLock.writeLock();
+            try
+            {
+                // Re-check under the lock: another caller may have rescaled while we waited for it.
+                // Without this, racing callers each rescale in turn, re-swapping every thread local and
+                // decaying again, and a caller holding an older 'now' would move the landmark backwards.
+                if (now - decayLandmark <= LANDMARK_RESET_INTERVAL_IN_NS)
+                    return;
+
+                long previousDecayLandmark = decayLandmark;
+                decayLandmark = now;
+
+                // Iterate the thread locals only after the lock is held and decayLandmark is updated.
+                for (BucketsThreadLocal local : locals)
+                {
+                    while (true)
+                    {
+                        DecayingArray prev = local.decayingRef.get();
+                        // Already on the new landmark (created or flushed after the update above).
+                        if (prev.decayLandmark == now)
+                            break;
+                        if (local.decayingRef.compareAndSet(prev, new DecayingArray(size, now)))
+                        {
+                            // Detached; wait for the owner to finish any in-flight update on 'prev'.
+                            while (local.writing)
+                                LockSupport.parkNanos(50);
+                            decayingPending.offer(prev);
+                            break;
+                        }
+                        // CAS lost to the owner's flush(); re-read and retry.
+                    }
+                }
+                // Merge the detached arrays while they are still relative to the old landmark, then decay
+                // the range [0, size) of buckets (the decaying half) to the new landmark.
+                flushPendingExclusive();
+                DecayingEstimatedHistogramReservoir.decay(LongBuffer.wrap(buckets, 0, size), previousDecayLandmark, now);
+            }
+            finally
+            {
+                stampedLock.unlockWrite(stamp);
+            }
+        }
+
+        /**
+         * Replaces every bucket value using the supplied operators and records {@code decayLandmark}.
+         * The caller must hold the write lock.
+         *
+         * @param decayingOp    maps (index, current decaying value) to the new decaying value
+         * @param estimatedOp   maps (index, current estimated value) to the new estimated value
+         * @param decayLandmark landmark to record for the resulting decaying values
+         */
+        public void updateExclusive(LongBinaryOperator decayingOp, LongBinaryOperator estimatedOp, long decayLandmark)
+        {
+            assert stampedLock.isWriteLocked();
+            this.decayLandmark = decayLandmark;
+            for (int i = 0; i < size; i++)
+            {
+                buckets[i] = decayingOp.applyAsLong(i, buckets[i]);
+                buckets[size + i] = estimatedOp.applyAsLong(i, buckets[size + i]);
+            }
+        }
+
+        /**
+         * Detaches a stale decaying array from {@code local} and queues it for merging. The write lock is
+         * only tried, never awaited: if it is free the queue is merged immediately, otherwise the current
+         * holder (or a later one) merges it.
+         * <p>
+         * Used only by a thread local's owning thread.
+         */
+        public void flush(BucketsThreadLocal local, DecayingArray decaying)
+        {
+            long stamp = stampedLock.tryWriteLock();
+            // StampedLock signals "not acquired" with zero only; any other value is a valid stamp.
+            if (stamp != 0L)
+            {
+                try
+                {
+                    boolean success = local.decayingRef.compareAndSet(decaying, new DecayingArray(size, decayLandmark));
+                    assert success : "The thread local was updated by another thread";
+                    decayingPending.offer(decaying);
+                    flushPendingExclusive();
+                }
+                finally
+                {
+                    stampedLock.unlockWrite(stamp);
+                }
+            }
+            else
+            {
+                boolean success = local.decayingRef.compareAndSet(decaying, new DecayingArray(size, decayLandmark));
+                // If the CAS failed, the rescale thread already swapped this thread local and merged the values.
+                if (success)
+                    decayingPending.offer(decaying);
+            }
+        }
+
+        /**
+         * Returns a merged copy of the shared buckets plus all registered thread locals.
+         * <p>
+         * Pending arrays are merged first whenever the write lock can be taken without blocking. The copy
+         * itself is taken under an optimistic read and retried if a writer interfered. Thread-local counters
+         * are read without synchronization, so increments racing with the snapshot may be missed.
+         */
+        public DecayingEstimatedArray snapshot(Set<BucketsThreadLocal> locals)
+        {
+            long[] decaying = new long[size];
+            long[] estimated = new long[size];
+            long resultLandmark = this.decayLandmark;
+            long stamp;
+            do
+            {
+                stamp = stampedLock.tryWriteLock();
+                if (stamp != 0L)
+                {
+                    try
+                    {
+                        flushPendingExclusive();
+                    }
+                    finally
+                    {
+                        stampedLock.unlockWrite(stamp);
+                    }
+                }
+                // Read under an optimistic stamp so other threads are never blocked by the snapshot.
+                // The shared buckets and the set of thread locals must be read consistently, so we neither
+                // miss updates nor double count a thread local that is concurrently being released.
+                stamp = stampedLock.tryOptimisticRead();
+                if (stamp == 0L)
+                {
+                    LockSupport.parkNanos(this, 100);
+                    continue;
+                }
+                resultLandmark = this.decayLandmark;
+                // Both destination arrays are fully overwritten here, so no clearing is needed on retry.
+                System.arraycopy(buckets, 0, decaying, 0, size);
+                System.arraycopy(buckets, size, estimated, 0, size);
+                for (BucketsThreadLocal local : locals)
+                {
+                    DecayingArray decayingLoc = local.decayingRef.get();
+                    long[] estimatedLoc = local.estimated;
+                    for (int i = 0; i < size; i++)
+                    {
+                        decaying[i] += decayingLoc.data[i];
+                        estimated[i] += estimatedLoc[i];
+                    }
+                }
+            } while (!stampedLock.validate(stamp));
+
+            return new DecayingEstimatedArray(decaying, estimated, resultLandmark);
+        }
+
+        /** Adds every queued decaying array into the decaying half of {@code buckets}; requires the write lock. */
+        private void flushPendingExclusive()
+        {
+            assert stampedLock.isWriteLocked();
+            DecayingArray arr;
+            while ((arr = decayingPending.poll()) != null)
+            {
+                // Only the decaying half [0, size) is affected; estimated values never pass through this queue.
+                for (int i = 0; i < arr.data.length; i++)
+                    buckets[i] += arr.data[i];
+            }
+        }
+    }
+
+    /**
+     * Holder returned by {@code DecayingEstimatedBuckets#snapshot}: the merged decaying and estimated
+     * bucket counts plus the landmark the decaying values are relative to. The arrays are stored and
+     * returned as-is, without defensive copies.
+     */
+    private static class DecayingEstimatedArray
+    {
+        private final long decayLandmark;
+        private final long[] decaying;
+        private final long[] estimated;
+
+        public DecayingEstimatedArray(long[] decaying, long[] estimated, long decayLandmark)
+        {
+            this.decayLandmark = decayLandmark;
+            this.estimated = estimated;
+            this.decaying = decaying;
+        }
+
+        /** @return the landmark that the {@link #decaying()} values are relative to */
+        public long landmark()
+        {
+            return this.decayLandmark;
+        }
+
+        /** @return the decaying bucket values (not copied) */
+        public long[] decaying()
+        {
+            return this.decaying;
+        }
+
+        /** @return the estimated bucket values, which rescaling does not decay (not copied) */
+        public long[] estimated()
+        {
+            return this.estimated;
+        }
+    }
+
+    /**
+     * This class is used to store the decaying buckets in a thread local 
variable along with the landmark.
+     * No concurrency issues are expected here, as the thread local is only 
used by one thread at a time.
+     */
+    private static class DecayingArray
+    {
+        private final long[] data;
+        private final long decayLandmark;
+        /**
+         * As SampledClock is used to register the last time the decay weight 
was sampled,
+         * and the precision of the clock is not guaranteed to be nanoseconds 
(approximately 2ms),
+         * we can avoid calculating the decay weight for every sample and 
instead use the last calculated weight.
+         */
+        private long lastSampledClock;
+        private long lastDecayedWeight;
+
+        public DecayingArray(int size, long decayLandmark)
+        {
+            this.data = new long[size];
+            this.decayLandmark = decayLandmark;
+        }
+
+        public void update(int index, long now)
+        {
+            if (lastSampledClock != now)

Review Comment:
   `forwardDecayWeight` actually uses seconds granularity, so we can reuse the value for much longer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to