This is an automated email from the ASF dual-hosted git repository.

blerer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e08053b  Fix race-conditions in ConnectionTest
e08053b is described below

commit e08053b77cac4ec91fd398d7bad65bba1394f45f
Author: yifan-c <yc25c...@gmail.com>
AuthorDate: Fri Mar 13 11:30:43 2020 -0700

    Fix race-conditions in ConnectionTest
    
    patch by Yifan Cai; reviewed by Benjamin Lerer for CASSANDRA-15630
---
 test/unit/org/apache/cassandra/Util.java           | 22 +++++---
 .../org/apache/cassandra/net/ConnectionUtils.java  | 65 ++++++++++------------
 2 files changed, 44 insertions(+), 43 deletions(-)

diff --git a/test/unit/org/apache/cassandra/Util.java 
b/test/unit/org/apache/cassandra/Util.java
index 3dcaff7..c989407 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -41,6 +42,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import afu.org.checkerframework.checker.oigj.qual.O;
 import org.apache.cassandra.db.compaction.ActiveCompactionsTracker;
 import org.apache.cassandra.db.compaction.CompactionTasks;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -579,18 +581,24 @@ public class Util
         }
     }
 
-    public static void spinAssertEquals(Object expected, Supplier<Object> s, 
int timeoutInSeconds)
+    public static void spinAssertEquals(Object expected, Supplier<Object> 
actualSupplier, int timeoutInSeconds)
     {
-        long start = System.currentTimeMillis();
-        Object lastValue = null;
-        while (System.currentTimeMillis() < start + (1000 * timeoutInSeconds))
+        spinAssertEquals(null, expected, actualSupplier, timeoutInSeconds, 
TimeUnit.SECONDS);
+    }
+
+    public static <T> void spinAssertEquals(String message, T expected, 
Supplier<? extends T> actualSupplier, long timeout, TimeUnit timeUnit)
+    {
+        long startNano = System.nanoTime();
+        long expireAtNano = startNano + timeUnit.toNanos(timeout);
+        T actual = null;
+        while (System.nanoTime() < expireAtNano)
         {
-            lastValue = s.get();
-            if (lastValue.equals(expected))
+            actual = actualSupplier.get();
+            if (actual.equals(expected))
                 break;
             Thread.yield();
         }
-        assertEquals(expected, lastValue);
+        assertEquals(message, expected, actual);
     }
 
     public static void joinThread(Thread thread) throws InterruptedException
diff --git a/test/unit/org/apache/cassandra/net/ConnectionUtils.java 
b/test/unit/org/apache/cassandra/net/ConnectionUtils.java
index e391785..5aff390 100644
--- a/test/unit/org/apache/cassandra/net/ConnectionUtils.java
+++ b/test/unit/org/apache/cassandra/net/ConnectionUtils.java
@@ -18,19 +18,17 @@
 
 package org.apache.cassandra.net;
 
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
-import com.google.common.util.concurrent.Uninterruptibles;
-import org.junit.Assert;
-
-import org.apache.cassandra.net.InboundMessageHandlers;
-import org.apache.cassandra.net.OutboundConnection;
+import static org.apache.cassandra.Util.spinAssertEquals;
 
 public class ConnectionUtils
 {
     public interface FailCheck
     {
-        public void accept(String message, long expected, long actual);
+        public void accept(String message, Long expected, Supplier<Long> 
actualSupplier);
     }
 
     public static class OutboundCountChecker
@@ -98,44 +96,44 @@ public class ConnectionUtils
 
         public void check()
         {
-            doCheck(Assert::assertEquals);
+            doCheck((message, expected, actual) -> spinAssertEquals(message, 
expected, actual, 5, TimeUnit.SECONDS));
         }
 
         public void check(FailCheck failCheck)
         {
-            doCheck((message, expect, actual) -> { if (expect != actual) 
failCheck.accept(message, expect, actual); });
+            doCheck((message, expect, actual) -> { if (!Objects.equals(expect, 
actual.get())) failCheck.accept(message, expect, actual); });
         }
 
         private void doCheck(FailCheck testAndFailCheck)
         {
             if (checkSubmitted)
             {
-                testAndFailCheck.accept("submitted count values don't match", 
submitted, connection.submittedCount());
+                testAndFailCheck.accept("submitted count values don't match", 
submitted, connection::submittedCount);
             }
             if (checkPending)
             {
-                testAndFailCheck.accept("pending count values don't match", 
pending, connection.pendingCount());
-                testAndFailCheck.accept("pending bytes values don't match", 
pendingBytes, connection.pendingBytes());
+                testAndFailCheck.accept("pending count values don't match", 
pending, () -> (long) connection.pendingCount());
+                testAndFailCheck.accept("pending bytes values don't match", 
pendingBytes, connection::pendingBytes);
             }
             if (checkSent)
             {
-                testAndFailCheck.accept("sent count values don't match", sent, 
connection.sentCount());
-                testAndFailCheck.accept("sent bytes values don't match", 
sentBytes, connection.sentBytes());
+                testAndFailCheck.accept("sent count values don't match", sent, 
connection::sentCount);
+                testAndFailCheck.accept("sent bytes values don't match", 
sentBytes, connection::sentBytes);
             }
             if (checkOverload)
             {
-                testAndFailCheck.accept("overload count values don't match", 
overload, connection.overloadedCount());
-                testAndFailCheck.accept("overload bytes values don't match", 
overloadBytes, connection.overloadedBytes());
+                testAndFailCheck.accept("overload count values don't match", 
overload, connection::overloadedCount);
+                testAndFailCheck.accept("overload bytes values don't match", 
overloadBytes, connection::overloadedBytes);
             }
             if (checkExpired)
             {
-                testAndFailCheck.accept("expired count values don't match", 
expired, connection.expiredCount());
-                testAndFailCheck.accept("expired bytes values don't match", 
expiredBytes, connection.expiredBytes());
+                testAndFailCheck.accept("expired count values don't match", 
expired, connection::expiredCount);
+                testAndFailCheck.accept("expired bytes values don't match", 
expiredBytes, connection::expiredBytes);
             }
             if (checkError)
             {
-                testAndFailCheck.accept("error count values don't match", 
error, connection.errorCount());
-                testAndFailCheck.accept("error bytes values don't match", 
errorBytes, connection.errorBytes());
+                testAndFailCheck.accept("error count values don't match", 
error, connection::errorCount);
+                testAndFailCheck.accept("error bytes values don't match", 
errorBytes, connection::errorBytes);
             }
         }
     }
@@ -197,45 +195,40 @@ public class ConnectionUtils
 
         public void check()
         {
-            doCheck(Assert::assertEquals);
+            doCheck((message, expected, actual) -> spinAssertEquals(message, 
expected, actual, 5, TimeUnit.SECONDS));
         }
 
         public void check(FailCheck failCheck)
         {
-            doCheck((message, expect, actual) -> { if (expect != actual) 
failCheck.accept(message, expect, actual); });
+            doCheck((message, expect, actual) -> { if (!Objects.equals(expect, 
actual.get())) failCheck.accept(message, expect, actual); });
         }
 
         private void doCheck(FailCheck testAndFailCheck)
         {
             if (checkReceived)
             {
-                testAndFailCheck.accept("received count values don't match", 
received, connection.receivedCount());
-                testAndFailCheck.accept("received bytes values don't match", 
receivedBytes, connection.receivedBytes());
+                testAndFailCheck.accept("received count values don't match", 
received, connection::receivedCount);
+                testAndFailCheck.accept("received bytes values don't match", 
receivedBytes, connection::receivedBytes);
             }
             if (checkProcessed)
             {
-                testAndFailCheck.accept("processed count values don't match", 
processed, connection.processedCount());
-                testAndFailCheck.accept("processed bytes values don't match", 
processedBytes, connection.processedBytes());
+                testAndFailCheck.accept("processed count values don't match", 
processed, connection::processedCount);
+                testAndFailCheck.accept("processed bytes values don't match", 
processedBytes, connection::processedBytes);
             }
             if (checkExpired)
             {
-                testAndFailCheck.accept("expired count values don't match", 
expired, connection.expiredCount());
-                testAndFailCheck.accept("expired bytes values don't match", 
expiredBytes, connection.expiredBytes());
+                testAndFailCheck.accept("expired count values don't match", 
expired, connection::expiredCount);
+                testAndFailCheck.accept("expired bytes values don't match", 
expiredBytes, connection::expiredBytes);
             }
             if (checkError)
             {
-                testAndFailCheck.accept("error count values don't match", 
error, connection.errorCount());
-                testAndFailCheck.accept("error bytes values don't match", 
errorBytes, connection.errorBytes());
+                testAndFailCheck.accept("error count values don't match", 
error, connection::errorCount);
+                testAndFailCheck.accept("error bytes values don't match", 
errorBytes, connection::errorBytes);
             }
             if (checkScheduled)
             {
-                // scheduled cannot relied upon to not race with completion of 
the task,
-                // so if it is currently above the value we expect, sleep for 
a bit
-                if (scheduled < connection.scheduledCount())
-                    for (int i = 0; i < 10 && scheduled < 
connection.scheduledCount() ; ++i)
-                        Uninterruptibles.sleepUninterruptibly(1L, 
TimeUnit.MILLISECONDS);
-                testAndFailCheck.accept("scheduled count values don't match", 
scheduled, connection.scheduledCount());
-                testAndFailCheck.accept("scheduled bytes values don't match", 
scheduledBytes, connection.scheduledBytes());
+                testAndFailCheck.accept("scheduled count values don't match", 
scheduled, connection::scheduledCount);
+                testAndFailCheck.accept("scheduled bytes values don't match", 
scheduledBytes, connection::scheduledBytes);
             }
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to