dimitarndimitrov commented on code in PR #17221:
URL: https://github.com/apache/kafka/pull/17221#discussion_r1771776869
##########
coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/HdrHistogramTest.java:
##########
@@ -172,4 +177,39 @@ public void testHistogramDataReset() {
         assertEquals(numEventsInFirstCycle, hdrHistogram.count(now + maxSnapshotAgeMs));
         assertEquals(numEventsInSecondCycle, hdrHistogram.count(now + 1 + maxSnapshotAgeMs));
     }
+
+    @Test
+    public void testLatestHistogramRace() throws InterruptedException, ExecutionException {
+        long maxSnapshotAgeMs = 10L;
+        long now = System.currentTimeMillis();
+        HdrHistogram hdrHistogram = new HdrHistogram(maxSnapshotAgeMs, MAX_VALUE, 1);
+        ExecutorService countExecutor = Executors.newFixedThreadPool(2);
+        for (int i = 1; i < 10000; i++) {
+            int numEvents = 2;
+            for (int j = 0; j < numEvents; j++) {
+                hdrHistogram.record(i);
+            }
+            final long moreThanMaxAge = now + maxSnapshotAgeMs + 1;
+            now = moreThanMaxAge;
+            CountDownLatch latch = new CountDownLatch(1);
+            Callable<Long> countTask = () -> {
+                try {
+                    assertTrue(latch.await(500, TimeUnit.MILLISECONDS));

Review Comment:
   Ah, thanks for raising that. I actually considered this, and there are two reasons why I didn't go with a barrier:
   1. The latch is more familiar to people - e.g. in the AK codebase I can see >100 matches in >30 files for `CountDownLatch` and <10 matches in only 2 files for `CyclicBarrier`.
   2. On my local machine the latch is faster: the test runs in 400-500 ms with the latch versus 500-700 ms with the barrier, regardless of whether the barrier is reused and reset. This matches my previous experience and can be explained by the fact that the barrier locks manually, while the latch uses AQS (the main building block of the JDK's various locking and synchronization utilities) directly.
      - Note that we could also use a `Phaser` here; while it's faster than the barrier, it's even less well-known than the barrier, it requires 2 API calls if we want a timed wait, and it's still not as fast as the latch.
   
   Just to clarify, what we are doing here is a canonical use of a `CountDownLatch` - signalling the worker threads from the external test thread (a standalone sketch of the pattern follows below).
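For reference, the latch-signalling pattern described above boils down to the following self-contained sketch. This is an illustrative example, not code from the PR (the class and variable names are made up): the external thread counts down a single-use latch, releasing all workers parked on `await` at once.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class LatchSignalSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        // Single-use start signal: count of 1, counted down exactly once
        // by the external (main) thread.
        CountDownLatch startSignal = new CountDownLatch(1);
        Callable<Long> task = () -> {
            // Both workers park here until the signal fires, so they resume
            // as close to simultaneously as the scheduler allows.
            if (!startSignal.await(500, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("start signal not received in time");
            }
            return System.nanoTime(); // stand-in for the racing read
        };
        Future<Long> f1 = executor.submit(task);
        Future<Long> f2 = executor.submit(task);
        startSignal.countDown(); // release both workers at once
        System.out.println(f1.get() + " / " + f2.get());
        executor.shutdown();
    }
}
```

The one-way, single-shot semantics are also what keep the latch cheap: in the OpenJDK implementation, `CountDownLatch` delegates directly to an AQS subclass, whereas `CyclicBarrier.await` acquires a `ReentrantLock` on every call to coordinate the parties waiting on each other.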
##########
coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/HdrHistogramTest.java:
##########
@@ -172,4 +177,39 @@ public void testHistogramDataReset() {
         assertEquals(numEventsInFirstCycle, hdrHistogram.count(now + maxSnapshotAgeMs));
         assertEquals(numEventsInSecondCycle, hdrHistogram.count(now + 1 + maxSnapshotAgeMs));
     }
+
+    @Test
+    public void testLatestHistogramRace() throws InterruptedException, ExecutionException {
+        long maxSnapshotAgeMs = 10L;
+        long now = System.currentTimeMillis();
+        HdrHistogram hdrHistogram = new HdrHistogram(maxSnapshotAgeMs, MAX_VALUE, 1);
+        ExecutorService countExecutor = Executors.newFixedThreadPool(2);
+        for (int i = 1; i < 10000; i++) {
+            int numEvents = 2;
+            for (int j = 0; j < numEvents; j++) {
+                hdrHistogram.record(i);
+            }
+            final long moreThanMaxAge = now + maxSnapshotAgeMs + 1;
+            now = moreThanMaxAge;
+            CountDownLatch latch = new CountDownLatch(1);
+            Callable<Long> countTask = () -> {
+                try {
+                    assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
+                    return hdrHistogram.count(moreThanMaxAge);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            };
+            Future<Long> t1Future = countExecutor.submit(countTask);
+            Future<Long> t2Future = countExecutor.submit(countTask);
+            latch.countDown();
+            long t1Count = t1Future.get();
+            long t2Count = t2Future.get();
+            assertTrue(
+                numEvents == t1Count && numEvents == t2Count,

Review Comment:
   > actually, is this what we want? my understanding is that if we have two threads concurrently calling latestHistogram, we want the first thread to reset the recorder and return the previous histogram where the other thread should return the reset-ed histogram.
   
   Yes, this is what we want for this particular test. We want the threads to race for the snapshot reset, we want the thread that wins the race to be guaranteed to reset the snapshot, and we want the thread that loses the race to be guaranteed to accept the reset snapshot and not reset the snapshot itself (see the sketch after this comment).
   
   > i may be wrong but both thread counts returning 2 doesn't seem right
   
   It is actually the correct behavior given the current API. We read from both threads using the same timestamp, which is why the last condition I mentioned above is guaranteed - the thread that loses the race will not reset the snapshot itself.
   
   I think some of the confusion might arise from the fact that the histogram relies on user-thread collaboration for snapshot freshness (e.g. the way Guava Cache implementations rely on user-thread collaboration for entry expiration). So if no reads are performed, the actual max age of the histogram snapshot is unbounded.
   - We don't really want to do that maintenance on the write path, and while it's possible to do it separately and not rely on user threads (e.g. the same way later versions of Caffeine, a Guava Cache successor, allow specifying an executor/scheduler for entry expiration), I don't think that's a necessary feature for the wrapper.
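To make the winner-resets / loser-accepts semantics concrete, here is a hypothetical model of that read path. It is not the PR's actual implementation - `TimestampedSnapshotModel`, `Snapshot`, and the `count` signature are invented for illustration, and a CAS over an `AtomicReference` is just one way to obtain these guarantees:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the "winner resets, loser accepts" race; none of
// these names come from the PR, and the real wrapper's read path may differ.
final class TimestampedSnapshotModel {
    static final class Snapshot {
        final long timestampMs;
        final long count;

        Snapshot(long timestampMs, long count) {
            this.timestampMs = timestampMs;
            this.count = count;
        }
    }

    private final long maxSnapshotAgeMs;
    private final AtomicReference<Snapshot> latest =
        new AtomicReference<>(new Snapshot(0L, 0L));

    TimestampedSnapshotModel(long maxSnapshotAgeMs) {
        this.maxSnapshotAgeMs = maxSnapshotAgeMs;
    }

    long count(long nowMs, long recordedSoFar) {
        Snapshot current = latest.get();
        if (nowMs - current.timestampMs <= maxSnapshotAgeMs) {
            return current.count; // fresh enough: no reset attempted
        }
        // Stale: race to install ("reset to") a fresh snapshot. Exactly one
        // thread wins the CAS for a given stale snapshot.
        Snapshot fresh = new Snapshot(nowMs, recordedSoFar);
        if (latest.compareAndSet(current, fresh)) {
            return fresh.count; // winner: performed the reset
        }
        // Loser: a concurrent caller won the CAS. Since both callers used
        // the same nowMs, the winner's snapshot passes the freshness check,
        // so the loser adopts it instead of resetting again.
        return latest.get().count;
    }
}
```

With two threads calling `count` using the same `nowMs` (as the test does via `moreThanMaxAge`), both return the same value: the winner installs the fresh snapshot and the loser adopts it, which is exactly the behavior the test's assertion pins down.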