C0urante commented on code in PR #14156:
URL: https://github.com/apache/kafka/pull/14156#discussion_r1287870397


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -227,7 +227,7 @@ private void clearSyncArray(OffsetSync[] syncs, OffsetSync 
offsetSync) {
         }
     }
 
-    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync[] original, 
OffsetSync offsetSync) {

Review Comment:
   Nit: feels a little strange that we're passing in a to-be-mutated sync 
array. Any reason not to alter `updateSyncArray` to only take in the original 
sync array and the new sync, and construct and return the new sync array?



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -153,31 +156,84 @@ public void testPastOffsetTranslation() {
     }
 
     @Test
-    public void testKeepMostDistinctSyncs() {
+    public void testConsistentlySpacedSyncs() {
         // Under normal operation, the incoming syncs will be regularly spaced 
and the store should keep a set of syncs
         // which provide the best translation accuracy (expires as few syncs 
as possible)
-        // Each new sync should be added to the cache and expire at most one 
other sync from the cache
-        long iterations = 10000;
+        long iterations = 100;
         long maxStep = Long.MAX_VALUE / iterations;
         // Test a variety of steps (corresponding to the offset.lag.max 
configuration)
         for (long step = 1; step < maxStep; step = (step * 2) + 1)  {
             for (long firstOffset = 0; firstOffset < 30; firstOffset++) {
-                try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
-                    int lastCount = 1;
-                    store.start();
-                    for (long offset = firstOffset; offset <= iterations; 
offset += step) {
-                        store.sync(tp, offset, offset);
-                        // Invariant A: the latest sync is present
-                        assertEquals(offset, store.syncFor(tp, 
0).upstreamOffset());
-                        // Invariant D: the earliest sync is present
-                        assertEquals(firstOffset, store.syncFor(tp, 
63).upstreamOffset());
-                        int count = countDistinctStoredSyncs(store, tp);
-                        int diff = count - lastCount;
-                        assertTrue(diff >= 0,
-                                "Store expired too many syncs: " + diff + " 
after receiving offset " + offset);
-                        lastCount = count;
-                    }
-                }
+                long finalStep = step;
+                // Generate a stream of consistently spaced syncs
+                // Each new sync should be added to the cache and expire at 
most one other sync from the cache
+                assertSyncSpacingHasBoundedExpirations(firstOffset, 
LongStream.generate(() -> finalStep).limit(iterations), 1);
+            }
+        }
+    }
+
+    @Test
+    public void testRandomlySpacedSyncs() {
+        Random random = new Random(0L); // arbitrary but deterministic seed
+        int iterationBits = 10;
+        long iterations = 1 << iterationBits;
+        for (int n = 1; n < Long.SIZE - iterationBits; n++) {
+            // A stream with at most n bits of difference between the largest 
and smallest steps
+            // will expire n + 2 syncs at once in the worst case, because the 
sync store is laid out exponentially.
+            long maximumDifference = 1L << n;
+            int maximumExpirations = n + 2;
+            assertSyncSpacingHasBoundedExpirations(0, random.longs(iterations, 
0L, maximumDifference), maximumExpirations);
+            // This holds true even if there is a larger minimum step size, 
such as caused by offsetLagMax
+            long offsetLagMax = 1L << 16;
+            assertSyncSpacingHasBoundedExpirations(0, random.longs(iterations, 
offsetLagMax, offsetLagMax + maximumDifference), maximumExpirations);
+        }
+    }
+
+    @Test
+    public void testDroppedSyncsSpacing() {
+        Random random = new Random(0L); // arbitrary but deterministic seed
+        long iterations = 10000;
+        long offsetLagMax = 100;
+        // Half of the gaps will be offsetLagMax, and half will be double 
that, as if one intervening sync was dropped.
+        LongStream stream = random.doubles()
+                .mapToLong(d -> (d < 0.5 ? 2 : 1) * offsetLagMax)
+                .limit(iterations);
+        // This will cause up to 2 syncs to be discarded, because a sequence 
of two adjacent syncs followed by a
+        // dropped sync will set up the following situation
+        // before [d....d,c,b,a....]
+        // after  [e......e,d,a....]
+        // and syncs b and c are discarded to make room for e and the demoted 
sync d.
+        assertSyncSpacingHasBoundedExpirations(0, stream, 2);
+    }
+
+    /**
+     * Simulate an OffsetSyncStore receiving a sequence of offset syncs as 
defined by their start offset and gaps.
+     * After processing each simulated sync, assert that the store has not 
expired more unique syncs than the bound.
+     * @param firstOffset First offset to give to the sync store after starting
+     * @param steps A finite stream of gaps between syncs with some known 
distribution
+     * @param maximumExpirations The maximum number of distinct syncs allowed 
to be expired after a single update.
+     */
+    private void assertSyncSpacingHasBoundedExpirations(long firstOffset, 
LongStream steps, int maximumExpirations) {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            store.start();
+            PrimitiveIterator.OfLong iterator = steps.iterator();
+            long offset = firstOffset;
+            int lastCount = 1;
+            while (iterator.hasNext()) {
+                store.sync(tp, offset, offset);
+                // Invariant A: the latest sync is present
+                assertEquals(offset, store.syncFor(tp, 0).upstreamOffset());
+                // Invariant D: the earliest sync is present
+                assertEquals(firstOffset, store.syncFor(tp, 
63).upstreamOffset());
+                int count = countDistinctStoredSyncs(store, tp);
+                // We are adding one sync, so if the count didn't change, then 
exactly one sync expired.
+                int expiredSyncs = lastCount - count + 1;
+                assertTrue(expiredSyncs <= maximumExpirations,
+                        "Store expired too many syncs: " + expiredSyncs + " > 
" + maximumExpirations
+                                + " after receiving offset " + offset);
+                lastCount = count;
+                offset += iterator.nextLong();

Review Comment:
   I think we could address this pretty easily if we dropped our assertions for 
the case where the store has just started and has only synced `firstOffset`:
   ```java
   try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
       store.start();
       store.sync(tp, firstOffset, firstOffset);
       PrimitiveIterator.OfLong iterator = steps.iterator();
       long offset = firstOffset;
       int lastCount = 1;
       while (iterator.hasNext()) {
           offset += iterator.nextLong();
           assertTrue(offset >= 0, "Test is invalid, offset overflowed");
           store.sync(tp, offset, offset);
           // ...
   ```



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -153,31 +156,84 @@ public void testPastOffsetTranslation() {
     }
 
     @Test
-    public void testKeepMostDistinctSyncs() {
+    public void testConsistentlySpacedSyncs() {
         // Under normal operation, the incoming syncs will be regularly spaced 
and the store should keep a set of syncs
         // which provide the best translation accuracy (expires as few syncs 
as possible)
-        // Each new sync should be added to the cache and expire at most one 
other sync from the cache
-        long iterations = 10000;
+        long iterations = 100;
         long maxStep = Long.MAX_VALUE / iterations;
         // Test a variety of steps (corresponding to the offset.lag.max 
configuration)
         for (long step = 1; step < maxStep; step = (step * 2) + 1)  {
             for (long firstOffset = 0; firstOffset < 30; firstOffset++) {
-                try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
-                    int lastCount = 1;
-                    store.start();
-                    for (long offset = firstOffset; offset <= iterations; 
offset += step) {
-                        store.sync(tp, offset, offset);
-                        // Invariant A: the latest sync is present
-                        assertEquals(offset, store.syncFor(tp, 
0).upstreamOffset());
-                        // Invariant D: the earliest sync is present
-                        assertEquals(firstOffset, store.syncFor(tp, 
63).upstreamOffset());
-                        int count = countDistinctStoredSyncs(store, tp);
-                        int diff = count - lastCount;
-                        assertTrue(diff >= 0,
-                                "Store expired too many syncs: " + diff + " 
after receiving offset " + offset);
-                        lastCount = count;
-                    }
-                }
+                long finalStep = step;
+                // Generate a stream of consistently spaced syncs
+                // Each new sync should be added to the cache and expire at 
most one other sync from the cache
+                assertSyncSpacingHasBoundedExpirations(firstOffset, 
LongStream.generate(() -> finalStep).limit(iterations), 1);
+            }
+        }
+    }
+
+    @Test
+    public void testRandomlySpacedSyncs() {
+        Random random = new Random(0L); // arbitrary but deterministic seed
+        int iterationBits = 10;
+        long iterations = 1 << iterationBits;
+        for (int n = 1; n < Long.SIZE - iterationBits; n++) {
+            // A stream with at most n bits of difference between the largest 
and smallest steps
+            // will expire n + 2 syncs at once in the worst case, because the 
sync store is laid out exponentially.
+            long maximumDifference = 1L << n;
+            int maximumExpirations = n + 2;
+            assertSyncSpacingHasBoundedExpirations(0, random.longs(iterations, 
0L, maximumDifference), maximumExpirations);
+            // This holds true even if there is a larger minimum step size, 
such as caused by offsetLagMax
+            long offsetLagMax = 1L << 16;
+            assertSyncSpacingHasBoundedExpirations(0, random.longs(iterations, 
offsetLagMax, offsetLagMax + maximumDifference), maximumExpirations);
+        }
+    }
+
+    @Test
+    public void testDroppedSyncsSpacing() {
+        Random random = new Random(0L); // arbitrary but deterministic seed
+        long iterations = 10000;
+        long offsetLagMax = 100;
+        // Half of the gaps will be offsetLagMax, and half will be double 
that, as if one intervening sync was dropped.
+        LongStream stream = random.doubles()
+                .mapToLong(d -> (d < 0.5 ? 2 : 1) * offsetLagMax)
+                .limit(iterations);
+        // This will cause up to 2 syncs to be discarded, because a sequence 
of two adjacent syncs followed by a
+        // dropped sync will set up the following situation
+        // before [d....d,c,b,a....]
+        // after  [e......e,d,a....]
+        // and syncs b and c are discarded to make room for e and the demoted 
sync d.
+        assertSyncSpacingHasBoundedExpirations(0, stream, 2);
+    }
+
+    /**
+     * Simulate an OffsetSyncStore receiving a sequence of offset syncs as 
defined by their start offset and gaps.
+     * After processing each simulated sync, assert that the store has not 
expired more unique syncs than the bound.
+     * @param firstOffset First offset to give to the sync store after starting
+     * @param steps A finite stream of gaps between syncs with some known 
distribution
+     * @param maximumExpirations The maximum number of distinct syncs allowed 
to be expired after a single update.
+     */
+    private void assertSyncSpacingHasBoundedExpirations(long firstOffset, 
LongStream steps, int maximumExpirations) {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            store.start();
+            PrimitiveIterator.OfLong iterator = steps.iterator();
+            long offset = firstOffset;
+            int lastCount = 1;
+            while (iterator.hasNext()) {
+                store.sync(tp, offset, offset);
+                // Invariant A: the latest sync is present
+                assertEquals(offset, store.syncFor(tp, 0).upstreamOffset());
+                // Invariant D: the earliest sync is present
+                assertEquals(firstOffset, store.syncFor(tp, 
63).upstreamOffset());
+                int count = countDistinctStoredSyncs(store, tp);
+                // We are adding one sync, so if the count didn't change, then 
exactly one sync expired.
+                int expiredSyncs = lastCount - count + 1;
+                assertTrue(expiredSyncs <= maximumExpirations,
+                        "Store expired too many syncs: " + expiredSyncs + " > 
" + maximumExpirations
+                                + " after receiving offset " + offset);
+                lastCount = count;
+                offset += iterator.nextLong();

Review Comment:
   Doesn't this technically omit the final offset since we'll return to the top 
of the while loop and `iterator::hasNext` will return false?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to