This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c4b1c0614e Read/Write/Truncate throw RequestFailure in a race 
condition with callback timeouts, should return Timeout instead
c4b1c0614e is described below

commit c4b1c0614e42b4ea2064822d31c28aa5d4f1450a
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Fri Aug 19 16:42:56 2022 -0700

    Read/Write/Truncate throw RequestFailure in a race condition with callback 
timeouts, should return Timeout instead
    
    patch by David Capwell; reviewed by Caleb Rackliffe for CASSANDRA-17828
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/net/RequestCallback.java  |  17 ++
 .../service/AbstractWriteResponseHandler.java      |  40 ++--
 .../cassandra/service/TruncateResponseHandler.java |  29 ++-
 .../cassandra/service/reads/ReadCallback.java      |  13 +-
 .../test/metrics/RequestTimeoutTest.java           | 241 +++++++++++++++++++++
 .../org/apache/cassandra/utils/AssertionUtils.java | 124 +++++++++++
 .../apache/cassandra/utils/AssertionUtilsTest.java |  45 ++++
 8 files changed, 483 insertions(+), 27 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 36beb3c27f..3fd1a8c747 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Read/Write/Truncate throw RequestFailure in a race condition with callback 
timeouts, should return Timeout instead (CASSANDRA-17828)
  * Add ability to log load profiles at fixed intervals (CASSANDRA-17821)
  * Protect against Gossip backing up due to a quarantined endpoint without 
version information (CASSANDRA-17830)
  * NPE in org.apache.cassandra.cql3.Attributes.getTimeToLive (CASSANDRA-17822)
diff --git a/src/java/org/apache/cassandra/net/RequestCallback.java 
b/src/java/org/apache/cassandra/net/RequestCallback.java
index bd14cae1d0..14e0169b85 100644
--- a/src/java/org/apache/cassandra/net/RequestCallback.java
+++ b/src/java/org/apache/cassandra/net/RequestCallback.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.net;
 
+import java.util.Map;
+
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.locator.InetAddressAndPort;
 
@@ -63,4 +65,19 @@ public interface RequestCallback<T>
         return false;
     }
 
+    static boolean isTimeout(Map<InetAddressAndPort, RequestFailureReason> 
failureReasonByEndpoint)
+    {
+        // The reason that all must be timeout to be called a timeout is as 
follows
+        // Assume RF=6, QUORUM, and failureReasonByEndpoint.size() == 3
+        // R1 -> TIMEOUT
+        // R2 -> TIMEOUT
+        // R3 -> READ_TOO_MANY_TOMBSTONES
+        // Since we got a reply back, and that was a failure, we should return 
a failure letting the user know.
+        // When all failures are a timeout, then this is a race condition with
+        // org.apache.cassandra.utils.concurrent.Awaitable.await(long, 
java.util.concurrent.TimeUnit)
+        // The race is that the message expire path runs and expires all 
messages, this then causes the condition
+        // to signal telling the caller "got all replies!".
+        return 
failureReasonByEndpoint.values().stream().allMatch(RequestFailureReason.TIMEOUT::equals);
+    }
+
 }
diff --git 
a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 4d75f19bca..76ad4c2ff8 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -17,12 +17,16 @@
  */
 package org.apache.cassandra.service;
 
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
 
@@ -113,34 +117,42 @@ public abstract class AbstractWriteResponseHandler<T> 
implements RequestCallback
     {
         long timeoutNanos = currentTimeoutNanos();
 
-        boolean success;
+        boolean signaled;
         try
         {
-            success = condition.await(timeoutNanos, NANOSECONDS);
+            signaled = condition.await(timeoutNanos, NANOSECONDS);
         }
         catch (InterruptedException e)
         {
             throw new UncheckedInterruptedException(e);
         }
 
-        if (!success)
-        {
-            int blockedFor = blockFor();
-            int acks = ackCount();
-            // It's pretty unlikely, but we can race between exiting await 
above and here, so
-            // that we could now have enough acks. In that case, we "lie" on 
the acks count to
-            // avoid sending confusing info to the user (see CASSANDRA-6491).
-            if (acks >= blockedFor)
-                acks = blockedFor - 1;
-            throw new WriteTimeoutException(writeType, 
replicaPlan.consistencyLevel(), acks, blockedFor);
-        }
+        if (!signaled)
+            throwTimeout();
 
         if (blockFor() + failures > candidateReplicaCount())
         {
-            throw new WriteFailureException(replicaPlan.consistencyLevel(), 
ackCount(), blockFor(), writeType, failureReasonByEndpoint);
+            if 
(RequestCallback.isTimeout(this.failureReasonByEndpoint.keySet().stream()
+                                                                      
.filter(this::waitingFor) // DatacenterWriteResponseHandler filters errors from 
remote DCs
+                                                                      
.collect(Collectors.toMap(Function.identity(), 
this.failureReasonByEndpoint::get))))
+                throwTimeout();
+
+            throw new WriteFailureException(replicaPlan.consistencyLevel(), 
ackCount(), blockFor(), writeType, this.failureReasonByEndpoint);
         }
     }
 
+    private void throwTimeout()
+    {
+        int blockedFor = blockFor();
+        int acks = ackCount();
+        // It's pretty unlikely, but we can race between exiting await above 
and here, so
+        // that we could now have enough acks. In that case, we "lie" on the 
acks count to
+        // avoid sending confusing info to the user (see CASSANDRA-6491).
+        if (acks >= blockedFor)
+            acks = blockedFor - 1;
+        throw new WriteTimeoutException(writeType, 
replicaPlan.consistencyLevel(), acks, blockedFor);
+    }
+
     public final long currentTimeoutNanos()
     {
         long requestTimeout = writeType == COUNTER
diff --git a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java 
b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
index 984ba5a10a..54b1241006 100644
--- a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
@@ -17,7 +17,9 @@
  */
 package org.apache.cassandra.service;
 
-import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -46,7 +48,7 @@ public class TruncateResponseHandler implements 
RequestCallback<TruncateResponse
     private final int responseCount;
     protected final AtomicInteger responses = new AtomicInteger(0);
     private final long start;
-    private volatile InetAddress truncateFailingReplica;
+    private final Map<InetAddressAndPort, RequestFailureReason> 
failureReasonByEndpoint = new ConcurrentHashMap<>();
 
     public TruncateResponseHandler(int responseCount)
     {
@@ -61,24 +63,31 @@ public class TruncateResponseHandler implements 
RequestCallback<TruncateResponse
     public void get() throws TimeoutException
     {
         long timeoutNanos = getTruncateRpcTimeout(NANOSECONDS) - (nanoTime() - 
start);
-        boolean completedInTime;
+        boolean signaled;
         try
         {
-            completedInTime = condition.await(timeoutNanos, NANOSECONDS); // 
TODO truncate needs a much longer timeout
+            signaled = condition.await(timeoutNanos, NANOSECONDS); // TODO 
truncate needs a much longer timeout
         }
         catch (InterruptedException e)
         {
             throw new UncheckedInterruptedException(e);
         }
 
-        if (!completedInTime)
-        {
+        if (!signaled)
             throw new TimeoutException("Truncate timed out - received only " + 
responses.get() + " responses");
-        }
 
-        if (truncateFailingReplica != null)
+        if (!failureReasonByEndpoint.isEmpty())
         {
-            throw new TruncateException("Truncate failed on replica " + 
truncateFailingReplica);
+            // clone to make sure no race condition happens
+            Map<InetAddressAndPort, RequestFailureReason> 
failureReasonByEndpoint = new HashMap<>(this.failureReasonByEndpoint);
+            if (RequestCallback.isTimeout(failureReasonByEndpoint))
+                throw new TimeoutException("Truncate timed out - received only 
" + responses.get() + " responses");
+
+            StringBuilder sb = new StringBuilder("Truncate failed on ");
+            for (Map.Entry<InetAddressAndPort, RequestFailureReason> e : 
failureReasonByEndpoint.entrySet())
+                sb.append("replica ").append(e.getKey()).append(" -> 
").append(e.getValue()).append(", ");
+            sb.setLength(sb.length() - 2);
+            throw new TruncateException(sb.toString());
         }
     }
 
@@ -94,7 +103,7 @@ public class TruncateResponseHandler implements 
RequestCallback<TruncateResponse
     public void onFailure(InetAddressAndPort from, RequestFailureReason 
failureReason)
     {
         // If the truncation hasn't succeeded on some replica, abort and 
indicate this back to the client.
-        truncateFailingReplica = from.getAddress();
+        failureReasonByEndpoint.put(from, failureReason);
         condition.signalAll();
     }
 
diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java 
b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
index e69e6bd2b9..c25b1f0f02 100644
--- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.service.reads;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -120,6 +121,12 @@ public class ReadCallback<E extends Endpoints<E>, P 
extends ReplicaPlan.ForRead<
          */
         int received = resolver.responses.size();
         boolean failed = failures > 0 && (blockFor > received || 
!resolver.isDataPresent());
+        // If all messages came back as a TIMEOUT then signaled=true and 
failed=true.
+        // Need to distinguish between a timeout and a failure (network, bad 
data, etc.), so store an extra field.
+        // see CASSANDRA-17828
+        boolean timedout = !signaled;
+        if (failed)
+            timedout = RequestCallback.isTimeout(new 
HashMap<>(failureReasonByEndpoint));
         WarningContext warnings = warningContext;
         // save the snapshot so abort state is not changed between now and 
when mayAbort gets called
         WarningsSnapshot snapshot = null;
@@ -138,19 +145,19 @@ public class ReadCallback<E extends Endpoints<E>, P 
extends ReplicaPlan.ForRead<
         if (isTracing())
         {
             String gotData = received > 0 ? (resolver.isDataPresent() ? " 
(including data)" : " (only digests)") : "";
-            Tracing.trace("{}; received {} of {} responses{}", failed ? 
"Failed" : "Timed out", received, blockFor, gotData);
+            Tracing.trace("{}; received {} of {} responses{}", !timedout ? 
"Failed" : "Timed out", received, blockFor, gotData);
         }
         else if (logger.isDebugEnabled())
         {
             String gotData = received > 0 ? (resolver.isDataPresent() ? " 
(including data)" : " (only digests)") : "";
-            logger.debug("{}; received {} of {} responses{}", failed ? 
"Failed" : "Timed out", received, blockFor, gotData);
+            logger.debug("{}; received {} of {} responses{}", !timedout ? 
"Failed" : "Timed out", received, blockFor, gotData);
         }
 
         if (snapshot != null)
             snapshot.maybeAbort(command, replicaPlan().consistencyLevel(), 
received, blockFor, resolver.isDataPresent(), failureReasonByEndpoint);
 
         // Same as for writes, see AbstractWriteResponseHandler
-        throw failed
+        throw !timedout
             ? new ReadFailureException(replicaPlan().consistencyLevel(), 
received, blockFor, resolver.isDataPresent(), failureReasonByEndpoint)
             : new ReadTimeoutException(replicaPlan().consistencyLevel(), 
received, blockFor, resolver.isDataPresent());
     }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/metrics/RequestTimeoutTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/metrics/RequestTimeoutTest.java
new file mode 100644
index 0000000000..2799fca66a
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/metrics/RequestTimeoutTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.metrics;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import net.bytebuddy.implementation.bind.annotation.SuperMethod;
+import net.bytebuddy.implementation.bind.annotation.This;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.cql3.statements.BatchStatement;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.utils.AssertionUtils;
+import org.apache.cassandra.exceptions.CasWriteTimeoutException;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.service.paxos.Paxos;
+import org.apache.cassandra.utils.concurrent.Awaitable;
+import org.apache.cassandra.utils.concurrent.Condition;
+import org.assertj.core.api.Assertions;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.apache.cassandra.utils.AssertionUtils.isThrowable;
+
+public class RequestTimeoutTest extends TestBaseImpl
+{
+    private static final AtomicInteger NEXT = new AtomicInteger(0);
+    public static final int COORDINATOR = 1;
+    private static Cluster CLUSTER;
+
+    @BeforeClass
+    public static void init() throws IOException
+    {
+        CLUSTER = Cluster.build(3)
+                         .withConfig(c -> c.set("truncate_request_timeout", 
"10s"))
+                         .withInstanceInitializer(BB::install)
+                         .start();
+        init(CLUSTER);
+        CLUSTER.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int PRIMARY 
KEY, v int)"));
+    }
+
+    @AfterClass
+    public static void cleanup()
+    {
+        if (CLUSTER != null)
+            CLUSTER.close();
+    }
+
+    @Before
+    public void before()
+    {
+        CLUSTER.get(COORDINATOR).runOnInstance(() -> 
MessagingService.instance().callbacks.unsafeClear());
+        CLUSTER.filters().reset();
+        BB.reset();
+    }
+
+    @Test
+    public void insert()
+    {
+        CLUSTER.filters().verbs(Verb.MUTATION_REQ.id).to(2).drop();
+        Assertions.assertThatThrownBy(() -> 
CLUSTER.coordinator(COORDINATOR).execute(withKeyspace("INSERT INTO %s.tbl (pk, 
v) VALUES (?, ?)"), ConsistencyLevel.ALL, NEXT.getAndIncrement(), 
NEXT.getAndIncrement()))
+                  .is(isThrowable(WriteTimeoutException.class));
+        BB.assertIsTimeoutTrue();
+    }
+
+    @Test
+    public void update()
+    {
+        CLUSTER.filters().verbs(Verb.MUTATION_REQ.id).to(2).drop();
+        Assertions.assertThatThrownBy(() -> 
CLUSTER.coordinator(COORDINATOR).execute(withKeyspace("UPDATE %s.tbl SET v=? 
WHERE pk=?"), ConsistencyLevel.ALL, NEXT.getAndIncrement(), 
NEXT.getAndIncrement()))
+                  .is(isThrowable(WriteTimeoutException.class));
+        BB.assertIsTimeoutTrue();
+    }
+
+    @Test
+    public void batchInsert()
+    {
+        CLUSTER.filters().verbs(Verb.MUTATION_REQ.id).to(2).drop();
+        Assertions.assertThatThrownBy(() -> 
CLUSTER.coordinator(COORDINATOR).execute(batch(withKeyspace("INSERT INTO %s.tbl 
(pk, v) VALUES (?, ?)")), ConsistencyLevel.ALL, NEXT.getAndIncrement(), 
NEXT.getAndIncrement()))
+                  .is(isThrowable(WriteTimeoutException.class));
+        BB.assertIsTimeoutTrue();
+    }
+
+    @Test
+    public void rangeSelect()
+    {
+        CLUSTER.filters().verbs(Verb.RANGE_REQ.id).to(2).drop();
+        Assertions.assertThatThrownBy(() -> 
CLUSTER.coordinator(COORDINATOR).execute(withKeyspace("SELECT * FROM %s.tbl"), 
ConsistencyLevel.ALL))
+                  .is(isThrowable(ReadTimeoutException.class));
+        BB.assertIsTimeoutTrue();
+    }
+
+    @Test
+    public void select()
+    {
+        CLUSTER.filters().verbs(Verb.READ_REQ.id).to(2).drop();
+        Assertions.assertThatThrownBy(() -> 
CLUSTER.coordinator(COORDINATOR).execute(withKeyspace("SELECT * FROM %s.tbl 
WHERE pk=?"), ConsistencyLevel.ALL, NEXT.getAndIncrement()))
+                  .is(isThrowable(ReadTimeoutException.class));
+        BB.assertIsTimeoutTrue();
+    }
+
+    @Test
+    public void truncate()
+    {
+        CLUSTER.filters().verbs(Verb.TRUNCATE_REQ.id).to(2).drop();
+        Assertions.assertThatThrownBy(() -> 
CLUSTER.coordinator(COORDINATOR).execute(withKeyspace("TRUNCATE %s.tbl"), 
ConsistencyLevel.ALL))
+                  .is(AssertionUtils.rootCauseIs(TimeoutException.class));
+        BB.assertIsTimeoutTrue();
+    }
+
+    // don't call BB.assertIsTimeoutTrue(); for CAS, as it has its own logic
+
+    @Test
+    public void casV2PrepareInsert()
+    {
+        withPaxos(Config.PaxosVariant.v2);
+
+        CLUSTER.filters().verbs(Verb.PAXOS2_PREPARE_REQ.id).to(2, 3).drop();
+        Assertions.assertThatThrownBy(() -> 
CLUSTER.coordinator(COORDINATOR).execute(withKeyspace("INSERT INTO %s.tbl (pk, 
v) VALUES (?, ?) IF NOT EXISTS"), ConsistencyLevel.ALL, NEXT.getAndIncrement(), 
NEXT.getAndIncrement()))
+                  .is(isThrowable(CasWriteTimeoutException.class));
+    }
+
+    @Test
+    public void casV2PrepareSelect()
+    {
+        withPaxos(Config.PaxosVariant.v2);
+
+        CLUSTER.filters().verbs(Verb.PAXOS2_PREPARE_REQ.id).to(2, 3).drop();
+        Assertions.assertThatThrownBy(() -> 
CLUSTER.coordinator(COORDINATOR).execute(withKeyspace("SELECT * FROM %s.tbl 
WHERE pk=?"), ConsistencyLevel.SERIAL, NEXT.getAndIncrement()))
+                  .is(isThrowable(ReadTimeoutException.class)); // why does 
write have its own type but not read?
+    }
+
+    @Test
+    public void casV2CommitInsert()
+    {
+        withPaxos(Config.PaxosVariant.v2);
+
+        CLUSTER.filters().verbs(Verb.PAXOS_COMMIT_REQ.id).to(2, 3).drop();
+        Assertions.assertThatThrownBy(() -> 
CLUSTER.coordinator(COORDINATOR).execute(withKeyspace("INSERT INTO %s.tbl (pk, 
v) VALUES (?, ?) IF NOT EXISTS"), ConsistencyLevel.ALL, NEXT.getAndIncrement(), 
NEXT.getAndIncrement()))
+                  .is(isThrowable(CasWriteTimeoutException.class));
+    }
+
+    private static void withPaxos(Config.PaxosVariant variant)
+    {
+        CLUSTER.forEach(i -> i.runOnInstance(() -> 
Paxos.setPaxosVariant(variant)));
+    }
+
+    private static String batch(String cql)
+    {
+        return "BEGIN " + BatchStatement.Type.UNLOGGED.name() + " BATCH\n" + 
cql + "\nAPPLY BATCH";
+    }
+
+    public static class BB
+    {
+        public static void install(ClassLoader cl, int num)
+        {
+            if (num != COORDINATOR)
+                return;
+            new ByteBuddy().rebase(Condition.Async.class)
+                           .method(named("await").and(takesArguments(2)))
+                           .intercept(MethodDelegation.to(BB.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+
+            new ByteBuddy().rebase(RequestCallback.class)
+                           .method(named("isTimeout"))
+                           .intercept(MethodDelegation.to(BB.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        public static boolean await(long time, TimeUnit units, @This Awaitable 
self, @SuperMethod Method method) throws InterruptedException, 
InvocationTargetException, IllegalAccessException
+        {
+            // make sure that the underlying condition is met before returning 
true
+            // this way it's known that the timeouts triggered!
+            while (!((boolean) method.invoke(self, time, units)))
+            {
+            }
+            return true;
+        }
+
+        private static final AtomicInteger TIMEOUTS = new AtomicInteger(0);
+        public static boolean isTimeout(Map<InetAddressAndPort, 
RequestFailureReason> failureReasonByEndpoint, @SuperCall Callable<Boolean> fn) 
throws Exception
+        {
+            boolean timeout = fn.call();
+            if (timeout)
+                TIMEOUTS.incrementAndGet();
+            return timeout;
+        }
+
+        public static void assertIsTimeoutTrue()
+        {
+            int timeouts = CLUSTER.get(COORDINATOR).callOnInstance(() -> 
TIMEOUTS.getAndSet(0));
+            Assertions.assertThat(timeouts).isGreaterThan(0);
+        }
+
+        public static void reset()
+        {
+            CLUSTER.get(COORDINATOR).runOnInstance(() -> TIMEOUTS.set(0));
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/utils/AssertionUtils.java 
b/test/unit/org/apache/cassandra/utils/AssertionUtils.java
new file mode 100644
index 0000000000..d5b1981fc1
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/AssertionUtils.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import com.google.common.base.Throwables;
+
+import org.assertj.core.api.Condition;
+
+public class AssertionUtils
+{
+    private AssertionUtils()
+    {
+    }
+
+    /**
+     * When working with jvm-dtest the thrown error is in a different {@link 
ClassLoader} causing type checks
+     * to fail; this method relies on naming instead.
+     */
+    public static Condition<Object> is(Class<?> klass)
+    {
+        String name = klass.getCanonicalName();
+        return new Condition<Object>() {
+            @Override
+            public boolean matches(Object value)
+            {
+                return value.getClass().getCanonicalName().equals(name);
+            }
+
+            @Override
+            public String toString()
+            {
+                return name;
+            }
+        };
+    }
+
+    public static <T extends Throwable> Condition<Throwable> 
isThrowable(Class<T> klass)
+    {
+        // org.assertj.core.api.AbstractAssert.is has <? super ? extends 
Throwable> which blocks <T>, so need to
+        // always return Throwable
+        return (Condition<Throwable>) (Condition<?>) is(klass);
+    }
+
+    /**
+     * When working with jvm-dtest the thrown error is in a different {@link 
ClassLoader} causing type checks
+     * to fail; this method relies on naming instead.
+     *
+     * This method is different than {@link #is(Class)} as it tries to mimic 
instanceOf rather than equality.
+     */
+    public static Condition<Object> isInstanceof(Class<?> klass)
+    {
+        String name = klass.getCanonicalName();
+        return new Condition<Object>() {
+            @Override
+            public boolean matches(Object value)
+            {
+                if (value == null)
+                    return false;
+                return matches(value.getClass());
+            }
+
+            private boolean matches(Class<?> input)
+            {
+                for (Class<?> klass = input; klass != null; klass = 
klass.getSuperclass())
+                {
+                    // extends
+                    if (klass.getCanonicalName().equals(name))
+                        return true;
+                    // implements
+                    for (Class<?> i : klass.getInterfaces())
+                    {
+                        if (matches(i))
+                            return true;
+                    }
+                }
+                return false;
+            }
+
+            @Override
+            public String toString()
+            {
+                return name;
+            }
+        };
+    }
+
+    public static Condition<Throwable> rootCause(Condition<Throwable> other)
+    {
+        return new Condition<Throwable>() {
+            @Override
+            public boolean matches(Throwable value)
+            {
+                return other.matches(Throwables.getRootCause(value));
+            }
+
+            @Override
+            public String toString()
+            {
+                return "Root cause " + other;
+            }
+        };
+    }
+
+    public static Condition<Throwable> rootCauseIs(Class<? extends Throwable> 
klass)
+    {
+        return rootCause((Condition<Throwable>) (Condition<?>) is(klass));
+    }
+}
diff --git a/test/unit/org/apache/cassandra/utils/AssertionUtilsTest.java 
b/test/unit/org/apache/cassandra/utils/AssertionUtilsTest.java
new file mode 100644
index 0000000000..e3ec93ab48
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/AssertionUtilsTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import org.junit.Test;
+
+import org.assertj.core.api.Assertions;
+
+public class AssertionUtilsTest
+{
+    @Test
+    public void isInstanceof()
+    {
+        Assertions.assertThat(new C())
+                  .is(AssertionUtils.isInstanceof(A.class));
+
+        Assertions.assertThat(new D())
+                  .is(AssertionUtils.isInstanceof(A.class))
+                  .is(AssertionUtils.isInstanceof(B.class));
+
+        Assertions.assertThat(null instanceof A)
+                  
.isEqualTo(AssertionUtils.isInstanceof(A.class).matches(null));
+    }
+
+    interface A {}
+    interface B extends A {}
+    static class C implements A {}
+    static class D implements B {}
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to