dimitarndimitrov commented on code in PR #17221:
URL: https://github.com/apache/kafka/pull/17221#discussion_r1771776869


##########
coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/HdrHistogramTest.java:
##########
@@ -172,4 +177,39 @@ public void testHistogramDataReset() {
         assertEquals(numEventsInFirstCycle, hdrHistogram.count(now + maxSnapshotAgeMs));
         assertEquals(numEventsInSecondCycle, hdrHistogram.count(now + 1 + maxSnapshotAgeMs));
     }
+
+    @Test
+    public void testLatestHistogramRace() throws InterruptedException, ExecutionException {
+        long maxSnapshotAgeMs = 10L;
+        long now = System.currentTimeMillis();
+        HdrHistogram hdrHistogram = new HdrHistogram(maxSnapshotAgeMs, MAX_VALUE, 1);
+        ExecutorService countExecutor = Executors.newFixedThreadPool(2);
+        for (int i = 1; i < 10000; i++) {
+            int numEvents = 2;
+            for (int j = 0; j < numEvents; j++) {
+                hdrHistogram.record(i);
+            }
+            final long moreThanMaxAge = now + maxSnapshotAgeMs + 1;
+            now = moreThanMaxAge;
+            CountDownLatch latch = new CountDownLatch(1);
+            Callable<Long> countTask = () -> {
+                try {
+                    assertTrue(latch.await(500, TimeUnit.MILLISECONDS));

Review Comment:
   Ah, thanks for raising that. I actually considered this, and there are two reasons why I didn't go with a barrier:
   1. The latch is more familiar to people - e.g. in the AK codebase I can see >100 matches in >30 files for `CountDownLatch` and <10 matches in only 2 files for `CyclicBarrier`.
   2. On my local machine the latch is faster, resulting in a local run of the test between 400 and 500 ms, while the barrier results in a local run between 500 and 700 ms, regardless of whether the barrier is reused and reset. This matches my previous experience and can be explained by the fact that the barrier takes an explicit lock internally, while the latch is built directly on AQS (`AbstractQueuedSynchronizer`, the main building block of the various locking and synchronization utilities the JDK provides).
       - Note that we could also use a `Phaser` here, and while it's faster than the barrier, it's even less well known, it requires 2 API calls if we want a timed wait, and is still not really as fast as the latch.
   
   Just to clarify, what we are doing here is a canonical usage of a 
`CountDownLatch` - signalling the worker threads from the external test thread.
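   To make the comparison concrete, here's a small sketch I put together (not code from the PR) contrasting the two patterns: with a latch the worker makes a single timed `await` call, while a timed wait on a `Phaser` takes two calls, `arrive()` followed by `awaitAdvanceInterruptibly()`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Main {
    public static void main(String[] args) throws Exception {
        // Latch: the worker needs a single timed call, and the external
        // thread signals it with countDown().
        CountDownLatch latch = new CountDownLatch(1);
        Thread latchWorker = new Thread(() -> {
            try {
                if (!latch.await(500, TimeUnit.MILLISECONDS)) {
                    throw new IllegalStateException("latch timed out");
                }
                System.out.println("latch released");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        latchWorker.start();
        latch.countDown(); // external thread signals the worker
        latchWorker.join();

        // Phaser: a timed wait takes two calls - arrive() to record this
        // party's arrival, then awaitAdvanceInterruptibly() with a timeout.
        Phaser phaser = new Phaser(2); // worker + signalling thread
        Thread phaserWorker = new Thread(() -> {
            try {
                int phase = phaser.arrive();
                phaser.awaitAdvanceInterruptibly(phase, 500, TimeUnit.MILLISECONDS);
                System.out.println("phaser advanced");
            } catch (InterruptedException | TimeoutException e) {
                throw new RuntimeException(e);
            }
        });
        phaserWorker.start();
        phaser.arrive(); // second arrival advances the phase
        phaserWorker.join();
    }
}
```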



##########
coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/HdrHistogramTest.java:
##########
@@ -172,4 +177,39 @@ public void testHistogramDataReset() {
         assertEquals(numEventsInFirstCycle, hdrHistogram.count(now + maxSnapshotAgeMs));
         assertEquals(numEventsInSecondCycle, hdrHistogram.count(now + 1 + maxSnapshotAgeMs));
     }
+
+    @Test
+    public void testLatestHistogramRace() throws InterruptedException, ExecutionException {
+        long maxSnapshotAgeMs = 10L;
+        long now = System.currentTimeMillis();
+        HdrHistogram hdrHistogram = new HdrHistogram(maxSnapshotAgeMs, MAX_VALUE, 1);
+        ExecutorService countExecutor = Executors.newFixedThreadPool(2);
+        for (int i = 1; i < 10000; i++) {
+            int numEvents = 2;
+            for (int j = 0; j < numEvents; j++) {
+                hdrHistogram.record(i);
+            }
+            final long moreThanMaxAge = now + maxSnapshotAgeMs + 1;
+            now = moreThanMaxAge;
+            CountDownLatch latch = new CountDownLatch(1);
+            Callable<Long> countTask = () -> {
+                try {
+                    assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
+                    return hdrHistogram.count(moreThanMaxAge);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            };
+            Future<Long> t1Future = countExecutor.submit(countTask);
+            Future<Long> t2Future = countExecutor.submit(countTask);
+            latch.countDown();
+            long t1Count = t1Future.get();
+            long t2Count = t2Future.get();
+            assertTrue(
+                numEvents == t1Count && numEvents == t2Count,

Review Comment:
   > actually, is this what we want? my understanding is that if we have two 
threads concurrently calling latestHistogram, we want the first thread to reset 
the recorder and return the previous histogram where the other thread should 
return the reset-ed histogram.
   
   Yes, this is what we want for this particular test. We want the threads to race for the snapshot reset, we want the thread that wins the race to be guaranteed to reset the snapshot, and we want the thread that loses the race to be guaranteed to accept the reset snapshot rather than reset the snapshot itself.
   
   > i may be wrong but both thread counts returning 2 doesn't seem right
   
   It is actually the correct behavior given the current API. We read from both threads using the same timestamp, which is why the last condition I mentioned above is guaranteed - that the thread that loses the race will not reset the snapshot itself.
   
   I think some confusion might be arising from the fact that the histogram relies on user-thread collaboration for snapshot freshness (e.g. like Guava Cache implementations rely on user-thread collaboration for entry expiration). So if no reads are performed, the actual max age of the histogram snapshot is unbounded.
   - We don't really want to do that maintenance on the write path, and while it's possible to do it separately and not rely on user threads (e.g. the same way later versions of Caffeine, a Guava Cache successor, allow specifying an executor/scheduler for entry expiration), I don't think this is a necessary feature for the wrapper.
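   A hypothetical, heavily simplified sketch of the read-path collaboration (none of these names come from the actual wrapper, and a `String` stands in for the histogram snapshot): readers race on a CAS over the last-reset timestamp, the winner resets the snapshot, and the loser simply reads whatever snapshot is current.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

public class Main {
    static final long MAX_SNAPSHOT_AGE_MS = 10L;
    static final AtomicLong lastResetMs = new AtomicLong(0L);
    static final AtomicReference<String> snapshot = new AtomicReference<>("initial");

    // Called on the read path. If the snapshot is stale, readers race via
    // compareAndSet: the winner resets the snapshot, losers see the CAS fail
    // and just return the (freshly reset) snapshot without resetting it again.
    static String latestSnapshot(long nowMs) {
        long last = lastResetMs.get();
        if (nowMs - last > MAX_SNAPSHOT_AGE_MS
                && lastResetMs.compareAndSet(last, nowMs)) {
            snapshot.set("reset@" + nowMs); // winner takes a fresh snapshot
        }
        return snapshot.get();
    }

    public static void main(String[] args) {
        System.out.println(latestSnapshot(100L)); // stale -> this call resets
        System.out.println(latestSnapshot(105L)); // fresh -> no reset happens
    }
}
```

   With the same timestamp passed from two threads, at most one CAS can succeed, which is why both threads in the test observe the same count.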
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org