netudima commented on code in PR #4003: URL: https://github.com/apache/cassandra/pull/4003#discussion_r2032078397
########## src/java/org/apache/cassandra/metrics/DecayingEstimatedHistogramReservoir.java: ########## @@ -855,4 +855,319 @@ public String toString() return "[" + min + ',' + max + ']'; } } + + interface MetricCleaner + { + void clean(); + } + + private static class BucketsPhantomReference extends PhantomReference<Object> implements MetricCleaner + { + private final MetricCleaner cleaner; + + public BucketsPhantomReference(ReferenceQueue<? super Object> q, MetricCleaner cleaner) + { + super(Thread.currentThread(), q); + this.cleaner = cleaner; + } + + public void clean() + { + cleaner.clean(); + } + } + + /** + * Writes are exclusive to the thread-local buckets, so we can use a single updater for all threads. + * Readers will see a consistent view of the buckets and could be blocked for a while. + * <p> + * The class is also being tracked by a phantom reference queue to release the accumulated buckets when the thread is dead. + */ + protected class BucketsThreadLocal + { + // try to use int[] instead of long[] to reduce memory usage, and move to the sum array when overflow + private final AtomicReference<DecayingArray> decayingRef; + private final long[] estimated; + private volatile boolean writing; + + public BucketsThreadLocal(int size) + { + this.decayingRef = new AtomicReference<>(new DecayingArray(size, decayingEstimatedBuckets.decayLandmark)); + this.estimated = new long[size]; + } + + public void update(int index, long now) + { + // This is only called by the thread that owns the thread local, so we don't need to worry about contention. + // Once the rescaling has occurred, we need to flush the values to the decayingBucket and report that the values are no longer in use. 
+ writing = true; + try + { + DecayingArray decaying = decayingRef.get(); + if (decaying.decayLandmark != decayingEstimatedBuckets.decayLandmark) + decayingEstimatedBuckets.flush(this, decaying); + + decayingRef.get().update(index, now); + estimated[index]++; + } + finally + { + writing = false; + } + } + + public void release() + { + // The release method could be called by the FastThreadLocal#onRemoval or by the PhantomReference queue. + // We need to make sure we transfer the values to the decayingBuckets only once. + // There is also no need for the BucketsThreadLocal#inUse check since the thread is dead and no one will update the values. + if (!bucketsThreadLocals.contains(this)) + return; + long stamp = bucketsStampedLock.writeLock(); + try + { + if (!bucketsThreadLocals.remove(this)) + return; + DecayingArray locDecaying = decayingRef.get(); + // The same write lock is used to flush the values to the decaying buckets and to rescale the values, + // so the landmark is safe to use from the decaying array. + decayingEstimatedBuckets.updateExclusive((index, value) -> locDecaying.data[(int) index] + value, + (index, value) -> estimated[(int) index] + value, + locDecaying.decayLandmark); + } + finally + { + bucketsStampedLock.unlockWrite(stamp); + } + } + } + + private static class DecayingEstimatedBuckets + { + private final int size; + /** Lock to protect the decaying {@code buckets}. Only one thread can update the buckets at a time. */ + private final StampedLock stampedLock; + private final ConcurrentLinkedQueue<DecayingArray> decayingPending = new ConcurrentLinkedQueue<>(); + /** + * The buckets array is used to store the decaying and estimated buckets, we can use the same array for both. + * The actual size of the array is twice the size of the decaying buckets or the estimated buckets. + * <p> + * The first half is used for the decaying buckets and the second half is used for the estimated buckets. 
+ */ + private final long[] buckets; + private volatile long decayLandmark; + + public DecayingEstimatedBuckets(StampedLock shared, int size, long now) + { + this.size = size; + this.buckets = new long[size * 2]; + this.decayLandmark = now; + this.stampedLock = shared; + } + + public void rescale(Set<BucketsThreadLocal> locals, long now) + { + if (now - decayLandmark <= LANDMARK_RESET_INTERVAL_IN_NS) + return; + long stamp = stampedLock.writeLock(); + try + { + long previousDecayLandmark = decayLandmark; + decayLandmark = now; + + // The list of thread locals should be fetched after the lock is taken and decayLandmark is updated. + for (BucketsThreadLocal local : locals) + { + while (true) + { + DecayingArray prev = local.decayingRef.get(); + // Skip the thread local if it was created after the decayLandmark was updated. + if (prev.decayLandmark == now) + break; + if (local.decayingRef.compareAndSet(prev, new DecayingArray(size, now))) + { + // We successfully switched the thread local to the new decayLandmark, wait for the thread to finish updating. + while (local.writing) + LockSupport.parkNanos(50); + decayingPending.offer(prev); + break; + } + } + } + flushPendingExclusive(); + DecayingEstimatedHistogramReservoir.decay(LongBuffer.wrap(buckets, 0, size), previousDecayLandmark, now); + } + finally + { + stampedLock.unlockWrite(stamp); + } + } + + public void updateExclusive(LongBinaryOperator decayingOp, LongBinaryOperator estimatedOp, long decayLandmark) + { + assert stampedLock.isWriteLocked(); + this.decayLandmark = decayLandmark; + for (int i = 0; i < size; i++) + { + buckets[i] = decayingOp.applyAsLong(i, buckets[i]); + buckets[size + i] = estimatedOp.applyAsLong(i, buckets[size + i]); + } + } + + /** + * Used only by a thread-local writer to flush the values to the buffer. 
+ */ + public void flush(BucketsThreadLocal local, DecayingArray decaying) + { + long stamp = stampedLock.tryWriteLock(); + if (stamp > 0) + { + try + { + boolean success = local.decayingRef.compareAndSet(decaying, new DecayingArray(size, decayLandmark)); + assert success : "The thread local was updated by another thread"; + decayingPending.offer(decaying); + flushPendingExclusive(); + } + finally + { + stampedLock.unlockWrite(stamp); + } + } + else + { + boolean success = local.decayingRef.compareAndSet(decaying, new DecayingArray(size, decayLandmark)); + // If the CAS failed, the thread local was updated by the rescale thread and all the values were already flushed. + if (success) + decayingPending.offer(decaying); + } + } + + public DecayingEstimatedArray snapshot(Set<BucketsThreadLocal> locals) + { + long[] decaying = new long[size]; + long[] estimated = new long[size]; + long resultLandmark = this.decayLandmark; + long stamp; + do + { + stamp = stampedLock.tryWriteLock(); + if (stamp > 0) + { + try + { + flushPendingExclusive(); + } + finally + { + stampedLock.unlockWrite(stamp); + } + } + // If the write lock is not available, we need to use the optimistic read lock. + // This will allow us to read the buckets without blocking other threads. + // We need to make sure that the buckets and the list of thread locals are consistent, + // while we are reading them, so we won't miss any updates or overlap with thread locals being released. 
+ stamp = stampedLock.tryOptimisticRead(); + if (stamp == 0) + { + LockSupport.parkNanos(this, 100); + continue; + } + Arrays.fill(decaying, 0); + Arrays.fill(estimated, 0); + resultLandmark = this.decayLandmark; + for (int i = 0; i < size; i++) + { + decaying[i] = buckets[i]; + estimated[i] = buckets[size + i]; + } + for (BucketsThreadLocal local : locals) + { + DecayingArray decayingLoc = local.decayingRef.get(); + long[] estimatedLoc = local.estimated; + for (int i = 0; i < size; i++) + { + decaying[i] += decayingLoc.data[i]; + estimated[i] += estimatedLoc[i]; + } + } + } while (!stampedLock.validate(stamp)); + + return new DecayingEstimatedArray(decaying, estimated, resultLandmark); + } + + private void flushPendingExclusive() + { + assert stampedLock.isWriteLocked(); + DecayingArray arr; + while ((arr = decayingPending.poll()) != null) + { + // We need to flush the values to the decaying buckets only, which is a half of the buckets array. + for (int i = 0; i < arr.data.length; i++) + buckets[i] += arr.data[i]; + } + } + } + + private static class DecayingEstimatedArray + { + private final long[] decaying; + private final long[] estimated; + private final long decayLandmark; + + public DecayingEstimatedArray(long[] decaying, long[] estimated, long decayLandmark) + { + this.decaying = decaying; + this.estimated = estimated; + this.decayLandmark = decayLandmark; + } + + public long[] estimated() + { + return estimated; + } + + public long[] decaying() + { + return decaying; + } + + public long landmark() + { + return decayLandmark; + } + } + + /** + * This class is used to store the decaying buckets in a thread local variable along with the landmark. + * No concurrency issues are expected here, as the thread local is only used by one thread at a time. 
+ */ + private static class DecayingArray + { + private final long[] data; + private final long decayLandmark; + /** + * As SampledClock is used to register the last time the decay weight was sampled, + * and the precision of the clock is not guaranteed to be nanoseconds (approximately 2ms), + * we can avoid calculating the decay weight for every sample and instead use the last calculated weight. + */ + private long lastSampledClock; + private long lastDecayedWeight; + + public DecayingArray(int size, long decayLandmark) + { + this.data = new long[size]; + this.decayLandmark = decayLandmark; + } + + public void update(int index, long now) + { + if (lastSampledClock != now) Review Comment: forwardDecayWeight actually works at seconds granularity, so we can reuse the cached value for much longer than a single clock sample. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org