This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new c4b1c0614e Read/Write/Truncate throw RequestFailure in a race condition with callback timeouts, should return Timeout instead c4b1c0614e is described below commit c4b1c0614e42b4ea2064822d31c28aa5d4f1450a Author: David Capwell <dcapw...@apache.org> AuthorDate: Fri Aug 19 16:42:56 2022 -0700 Read/Write/Truncate throw RequestFailure in a race condition with callback timeouts, should return Timeout instead patch by David Capwell; reviewed by Caleb Rackliffe for CASSANDRA-17828 --- CHANGES.txt | 1 + .../org/apache/cassandra/net/RequestCallback.java | 17 ++ .../service/AbstractWriteResponseHandler.java | 40 ++-- .../cassandra/service/TruncateResponseHandler.java | 29 ++- .../cassandra/service/reads/ReadCallback.java | 13 +- .../test/metrics/RequestTimeoutTest.java | 241 +++++++++++++++++++++ .../org/apache/cassandra/utils/AssertionUtils.java | 124 +++++++++++ .../apache/cassandra/utils/AssertionUtilsTest.java | 45 ++++ 8 files changed, 483 insertions(+), 27 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 36beb3c27f..3fd1a8c747 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.2 + * Read/Write/Truncate throw RequestFailure in a race condition with callback timeouts, should return Timeout instead (CASSANDRA-17828) * Add ability to log load profiles at fixed intervals (CASSANDRA-17821) * Protect against Gossip backing up due to a quarantined endpoint without version information (CASSANDRA-17830) * NPE in org.apache.cassandra.cql3.Attributes.getTimeToLive (CASSANDRA-17822) diff --git a/src/java/org/apache/cassandra/net/RequestCallback.java b/src/java/org/apache/cassandra/net/RequestCallback.java index bd14cae1d0..14e0169b85 100644 --- a/src/java/org/apache/cassandra/net/RequestCallback.java +++ b/src/java/org/apache/cassandra/net/RequestCallback.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.net; +import java.util.Map; + import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.locator.InetAddressAndPort; @@ -63,4 +65,19 @@ public interface RequestCallback<T> return false; } + static boolean isTimeout(Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint) + { + // The reason that all must be timeout to be called a timeout is as follows + // Assume RF=6, QUORUM, and failureReasonByEndpoint.size() == 3 + // R1 -> TIMEOUT + // R2 -> TIMEOUT + // R3 -> READ_TOO_MANY_TOMBSTONES + // Since we got a reply back, and that was a failure, we should return a failure letting the user know. + // When all failures are a timeout, then this is a race condition with + // org.apache.cassandra.utils.concurrent.Awaitable.await(long, java.util.concurrent.TimeUnit) + // The race is that the message expire path runs and expires all messages, this then casues the condition + // to signal telling the caller "got all replies!". + return failureReasonByEndpoint.values().stream().allMatch(RequestFailureReason.TIMEOUT::equals); + } + } diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java index 4d75f19bca..76ad4c2ff8 100644 --- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java @@ -17,12 +17,16 @@ */ package org.apache.cassandra.service; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -113,34 +117,42 @@ public abstract class AbstractWriteResponseHandler<T> implements RequestCallback { long timeoutNanos = currentTimeoutNanos(); - boolean success; + boolean signaled; try { - success = condition.await(timeoutNanos, NANOSECONDS); + signaled = condition.await(timeoutNanos, NANOSECONDS); } catch (InterruptedException e) { throw new UncheckedInterruptedException(e); } - if (!success) - { - int blockedFor = blockFor(); - int acks = ackCount(); - // It's pretty unlikely, but we can race between exiting await above and here, so - // that we could now have enough acks. In that case, we "lie" on the acks count to - // avoid sending confusing info to the user (see CASSANDRA-6491). - if (acks >= blockedFor) - acks = blockedFor - 1; - throw new WriteTimeoutException(writeType, replicaPlan.consistencyLevel(), acks, blockedFor); - } + if (!signaled) + throwTimeout(); if (blockFor() + failures > candidateReplicaCount()) { - throw new WriteFailureException(replicaPlan.consistencyLevel(), ackCount(), blockFor(), writeType, failureReasonByEndpoint); + if (RequestCallback.isTimeout(this.failureReasonByEndpoint.keySet().stream() + .filter(this::waitingFor) // DatacenterWriteResponseHandler filters errors from remote DCs + .collect(Collectors.toMap(Function.identity(), this.failureReasonByEndpoint::get)))) + throwTimeout(); + + throw new WriteFailureException(replicaPlan.consistencyLevel(), ackCount(), blockFor(), writeType, this.failureReasonByEndpoint); } } + private void throwTimeout() + { + int blockedFor = blockFor(); + int acks = ackCount(); + // It's pretty unlikely, but we can race between exiting await above and here, so + // that we could now have enough acks. In that case, we "lie" on the acks count to + // avoid sending confusing info to the user (see CASSANDRA-6491). + if (acks >= blockedFor) + acks = blockedFor - 1; + throw new WriteTimeoutException(writeType, replicaPlan.consistencyLevel(), acks, blockedFor); + } + public final long currentTimeoutNanos() { long requestTimeout = writeType == COUNTER diff --git a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java index 984ba5a10a..54b1241006 100644 --- a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java +++ b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java @@ -17,7 +17,9 @@ */ package org.apache.cassandra.service; -import java.net.InetAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -46,7 +48,7 @@ public class TruncateResponseHandler implements RequestCallback<TruncateResponse private final int responseCount; protected final AtomicInteger responses = new AtomicInteger(0); private final long start; - private volatile InetAddress truncateFailingReplica; + private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint = new ConcurrentHashMap<>(); public TruncateResponseHandler(int responseCount) { @@ -61,24 +63,31 @@ public class TruncateResponseHandler implements RequestCallback<TruncateResponse public void get() throws TimeoutException { long timeoutNanos = getTruncateRpcTimeout(NANOSECONDS) - (nanoTime() - start); - boolean completedInTime; + boolean signaled; try { - completedInTime = condition.await(timeoutNanos, NANOSECONDS); // TODO truncate needs a much longer timeout + signaled = condition.await(timeoutNanos, NANOSECONDS); // TODO truncate needs a much longer timeout } catch (InterruptedException e) { throw new UncheckedInterruptedException(e); } - if (!completedInTime) - { + if (!signaled) throw new TimeoutException("Truncate timed out - received only " + responses.get() + " responses"); - } - if (truncateFailingReplica != null) + if (!failureReasonByEndpoint.isEmpty()) { - throw new TruncateException("Truncate failed on replica " + truncateFailingReplica); + // clone to make sure no race condition happens + Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint = new HashMap<>(this.failureReasonByEndpoint); + if (RequestCallback.isTimeout(failureReasonByEndpoint)) + throw new TimeoutException("Truncate timed out - received only " + responses.get() + " responses"); + + StringBuilder sb = new StringBuilder("Truncate failed on "); + for (Map.Entry<InetAddressAndPort, RequestFailureReason> e : failureReasonByEndpoint.entrySet()) + sb.append("replica ").append(e.getKey()).append(" -> ").append(e.getValue()).append(", "); + sb.setLength(sb.length() - 2); + throw new TruncateException(sb.toString()); } } @@ -94,7 +103,7 @@ public class TruncateResponseHandler implements RequestCallback<TruncateResponse public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason) { // If the truncation hasn't succeeded on some replica, abort and indicate this back to the client. - truncateFailingReplica = from.getAddress(); + failureReasonByEndpoint.put(from, failureReason); condition.signalAll(); } diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java b/src/java/org/apache/cassandra/service/reads/ReadCallback.java index e69e6bd2b9..c25b1f0f02 100644 --- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.service.reads; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -120,6 +121,12 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead< */ int received = resolver.responses.size(); boolean failed = failures > 0 && (blockFor > received || !resolver.isDataPresent()); + // If all messages came back as a TIMEOUT then signaled=true and failed=true. + // Need to distinguish between a timeout and a failure (network, bad data, etc.), so store an extra field. + // see CASSANDRA-17828 + boolean timedout = !signaled; + if (failed) + timedout = RequestCallback.isTimeout(new HashMap<>(failureReasonByEndpoint)); WarningContext warnings = warningContext; // save the snapshot so abort state is not changed between now and when mayAbort gets called WarningsSnapshot snapshot = null; @@ -138,19 +145,19 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead< if (isTracing()) { String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : ""; - Tracing.trace("{}; received {} of {} responses{}", failed ? "Failed" : "Timed out", received, blockFor, gotData); + Tracing.trace("{}; received {} of {} responses{}", !timedout ? "Failed" : "Timed out", received, blockFor, gotData); } else if (logger.isDebugEnabled()) { String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : ""; - logger.debug("{}; received {} of {} responses{}", failed ? "Failed" : "Timed out", received, blockFor, gotData); + logger.debug("{}; received {} of {} responses{}", !timedout ? "Failed" : "Timed out", received, blockFor, gotData); } if (snapshot != null) snapshot.maybeAbort(command, replicaPlan().consistencyLevel(), received, blockFor, resolver.isDataPresent(), failureReasonByEndpoint); // Same as for writes, see AbstractWriteResponseHandler - throw failed + throw !timedout ? new ReadFailureException(replicaPlan().consistencyLevel(), received, blockFor, resolver.isDataPresent(), failureReasonByEndpoint) : new ReadTimeoutException(replicaPlan().consistencyLevel(), received, blockFor, resolver.isDataPresent()); } diff --git a/test/distributed/org/apache/cassandra/distributed/test/metrics/RequestTimeoutTest.java b/test/distributed/org/apache/cassandra/distributed/test/metrics/RequestTimeoutTest.java new file mode 100644 index 0000000000..2799fca66a --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/metrics/RequestTimeoutTest.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.metrics; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodDelegation; +import net.bytebuddy.implementation.bind.annotation.SuperCall; +import net.bytebuddy.implementation.bind.annotation.SuperMethod; +import net.bytebuddy.implementation.bind.annotation.This; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.cql3.statements.BatchStatement; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.RequestCallback; +import org.apache.cassandra.utils.AssertionUtils; +import org.apache.cassandra.exceptions.CasWriteTimeoutException; +import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.net.Verb; +import org.apache.cassandra.service.paxos.Paxos; +import org.apache.cassandra.utils.concurrent.Awaitable; +import org.apache.cassandra.utils.concurrent.Condition; +import org.assertj.core.api.Assertions; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; +import static org.apache.cassandra.utils.AssertionUtils.isThrowable; + +public class RequestTimeoutTest extends TestBaseImpl +{ + private static final AtomicInteger NEXT = new AtomicInteger(0); + public static final int COORDINATOR = 1; + private static Cluster CLUSTER; + + @BeforeClass + public static void init() throws IOException + { + CLUSTER = Cluster.build(3) + .withConfig(c -> c.set("truncate_request_timeout", "10s")) + .withInstanceInitializer(BB::install) + .start(); + init(CLUSTER); + CLUSTER.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int PRIMARY KEY, v int)")); + } + + @AfterClass + public static void cleanup() + { + if (CLUSTER != null) + CLUSTER.close(); + } + + @Before + public void before() + { + CLUSTER.get(COORDINATOR).runOnInstance(() -> MessagingService.instance().callbacks.unsafeClear()); + CLUSTER.filters().reset(); + BB.reset(); + } + + @Test + public void insert() + { + CLUSTER.filters().verbs(Verb.MUTATION_REQ.id).to(2).drop(); + Assertions.assertThatThrownBy(() -> CLUSTER.coordinator(COORDINATOR).execute(withKeyspace("INSERT INTO %s.tbl (pk, v) VALUES (?, ?)"), ConsistencyLevel.ALL, NEXT.getAndIncrement(), NEXT.getAndIncrement())) + .is(isThrowable(WriteTimeoutException.class)); + BB.assertIsTimeoutTrue(); + } + + @Test + public void update() + { + CLUSTER.filters().verbs(Verb.MUTATION_REQ.id).to(2).drop(); + Assertions.assertThatThrownBy(() -> CLUSTER.coordinator(COORDINATOR).execute(withKeyspace("UPDATE %s.tbl SET v=? WHERE pk=?"), ConsistencyLevel.ALL, NEXT.getAndIncrement(), NEXT.getAndIncrement())) + .is(isThrowable(WriteTimeoutException.class)); + BB.assertIsTimeoutTrue(); + } + + @Test + public void batchInsert() + { + CLUSTER.filters().verbs(Verb.MUTATION_REQ.id).to(2).drop(); + Assertions.assertThatThrownBy(() -> CLUSTER.coordinator(COORDINATOR).execute(batch(withKeyspace("INSERT INTO %s.tbl (pk, v) VALUES (?, ?)")), ConsistencyLevel.ALL, NEXT.getAndIncrement(), NEXT.getAndIncrement())) + .is(isThrowable(WriteTimeoutException.class)); + BB.assertIsTimeoutTrue(); + } + + @Test + public void rangeSelect() + { + CLUSTER.filters().verbs(Verb.RANGE_REQ.id).to(2).drop(); + Assertions.assertThatThrownBy(() -> CLUSTER.coordinator(COORDINATOR).execute(withKeyspace("SELECT * FROM %s.tbl"), ConsistencyLevel.ALL)) + .is(isThrowable(ReadTimeoutException.class)); + BB.assertIsTimeoutTrue(); + } + + @Test + public void select() + { + CLUSTER.filters().verbs(Verb.READ_REQ.id).to(2).drop(); + Assertions.assertThatThrownBy(() -> CLUSTER.coordinator(COORDINATOR).execute(withKeyspace("SELECT * FROM %s.tbl WHERE pk=?"), ConsistencyLevel.ALL, NEXT.getAndIncrement())) + .is(isThrowable(ReadTimeoutException.class)); + BB.assertIsTimeoutTrue(); + } + + @Test + public void truncate() + { + CLUSTER.filters().verbs(Verb.TRUNCATE_REQ.id).to(2).drop(); + Assertions.assertThatThrownBy(() -> CLUSTER.coordinator(COORDINATOR).execute(withKeyspace("TRUNCATE %s.tbl"), ConsistencyLevel.ALL)) + .is(AssertionUtils.rootCauseIs(TimeoutException.class)); + BB.assertIsTimeoutTrue(); + } + + // don't call BB.assertIsTimeoutTrue(); for CAS, as it has its own logic + + @Test + public void casV2PrepareInsert() + { + withPaxos(Config.PaxosVariant.v2); + + CLUSTER.filters().verbs(Verb.PAXOS2_PREPARE_REQ.id).to(2, 3).drop(); + Assertions.assertThatThrownBy(() -> CLUSTER.coordinator(COORDINATOR).execute(withKeyspace("INSERT INTO %s.tbl (pk, v) VALUES (?, ?) IF NOT EXISTS"), ConsistencyLevel.ALL, NEXT.getAndIncrement(), NEXT.getAndIncrement())) + .is(isThrowable(CasWriteTimeoutException.class)); + } + + @Test + public void casV2PrepareSelect() + { + withPaxos(Config.PaxosVariant.v2); + + CLUSTER.filters().verbs(Verb.PAXOS2_PREPARE_REQ.id).to(2, 3).drop(); + Assertions.assertThatThrownBy(() -> CLUSTER.coordinator(COORDINATOR).execute(withKeyspace("SELECT * FROM %s.tbl WHERE pk=?"), ConsistencyLevel.SERIAL, NEXT.getAndIncrement())) + .is(isThrowable(ReadTimeoutException.class)); // why does write have its own type but not read? + } + + @Test + public void casV2CommitInsert() + { + withPaxos(Config.PaxosVariant.v2); + + CLUSTER.filters().verbs(Verb.PAXOS_COMMIT_REQ.id).to(2, 3).drop(); + Assertions.assertThatThrownBy(() -> CLUSTER.coordinator(COORDINATOR).execute(withKeyspace("INSERT INTO %s.tbl (pk, v) VALUES (?, ?) IF NOT EXISTS"), ConsistencyLevel.ALL, NEXT.getAndIncrement(), NEXT.getAndIncrement())) + .is(isThrowable(CasWriteTimeoutException.class)); + } + + private static void withPaxos(Config.PaxosVariant variant) + { + CLUSTER.forEach(i -> i.runOnInstance(() -> Paxos.setPaxosVariant(variant))); + } + + private static String batch(String cql) + { + return "BEGIN " + BatchStatement.Type.UNLOGGED.name() + " BATCH\n" + cql + "\nAPPLY BATCH"; + } + + public static class BB + { + public static void install(ClassLoader cl, int num) + { + if (num != COORDINATOR) + return; + new ByteBuddy().rebase(Condition.Async.class) + .method(named("await").and(takesArguments(2))) + .intercept(MethodDelegation.to(BB.class)) + .make() + .load(cl, ClassLoadingStrategy.Default.INJECTION); + + new ByteBuddy().rebase(RequestCallback.class) + .method(named("isTimeout")) + .intercept(MethodDelegation.to(BB.class)) + .make() + .load(cl, ClassLoadingStrategy.Default.INJECTION); + } + + public static boolean await(long time, TimeUnit units, @This Awaitable self, @SuperMethod Method method) throws InterruptedException, InvocationTargetException, IllegalAccessException + { + // make sure that the underline condition is met before returnning true + // this way its know that the timeouts triggered! + while (!((boolean) method.invoke(self, time, units))) + { + } + return true; + } + + private static final AtomicInteger TIMEOUTS = new AtomicInteger(0); + public static boolean isTimeout(Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint, @SuperCall Callable<Boolean> fn) throws Exception + { + boolean timeout = fn.call(); + if (timeout) + TIMEOUTS.incrementAndGet(); + return timeout; + } + + public static void assertIsTimeoutTrue() + { + int timeouts = CLUSTER.get(COORDINATOR).callOnInstance(() -> TIMEOUTS.getAndSet(0)); + Assertions.assertThat(timeouts).isGreaterThan(0); + } + + public static void reset() + { + CLUSTER.get(COORDINATOR).runOnInstance(() -> TIMEOUTS.set(0)); + } + } +} diff --git a/test/unit/org/apache/cassandra/utils/AssertionUtils.java b/test/unit/org/apache/cassandra/utils/AssertionUtils.java new file mode 100644 index 0000000000..d5b1981fc1 --- /dev/null +++ b/test/unit/org/apache/cassandra/utils/AssertionUtils.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils; + +import com.google.common.base.Throwables; + +import org.assertj.core.api.Condition; + +public class AssertionUtils +{ + private AssertionUtils() + { + } + + /** + * When working with jvm-dtest the thrown error is in a different {@link ClassLoader} causing type checks + * to fail; this method relies on naming instead. + */ + public static Condition<Object> is(Class<?> klass) + { + String name = klass.getCanonicalName(); + return new Condition<Object>() { + @Override + public boolean matches(Object value) + { + return value.getClass().getCanonicalName().equals(name); + } + + @Override + public String toString() + { + return name; + } + }; + } + + public static <T extends Throwable> Condition<Throwable> isThrowable(Class<T> klass) + { + // org.assertj.core.api.AbstractAssert.is has <? super ? extends Throwable> which blocks <T>, so need to + // always return Throwable + return (Condition<Throwable>) (Condition<?>) is(klass); + } + + /** + * When working with jvm-dtest the thrown error is in a different {@link ClassLoader} causing type checks + * to fail; this method relies on naming instead. + * + * This method is different than {@link #is(Class)} as it tries to mimic instanceOf rather than equality. + */ + public static Condition<Object> isInstanceof(Class<?> klass) + { + String name = klass.getCanonicalName(); + return new Condition<Object>() { + @Override + public boolean matches(Object value) + { + if (value == null) + return false; + return matches(value.getClass()); + } + + private boolean matches(Class<?> input) + { + for (Class<?> klass = input; klass != null; klass = klass.getSuperclass()) + { + // extends + if (klass.getCanonicalName().equals(name)) + return true; + // implements + for (Class<?> i : klass.getInterfaces()) + { + if (matches(i)) + return true; + } + } + return false; + } + + @Override + public String toString() + { + return name; + } + }; + } + + public static Condition<Throwable> rootCause(Condition<Throwable> other) + { + return new Condition<Throwable>() { + @Override + public boolean matches(Throwable value) + { + return other.matches(Throwables.getRootCause(value)); + } + + @Override + public String toString() + { + return "Root cause " + other; + } + }; + } + + public static Condition<Throwable> rootCauseIs(Class<? extends Throwable> klass) + { + return rootCause((Condition<Throwable>) (Condition<?>) is(klass)); + } +} diff --git a/test/unit/org/apache/cassandra/utils/AssertionUtilsTest.java b/test/unit/org/apache/cassandra/utils/AssertionUtilsTest.java new file mode 100644 index 0000000000..e3ec93ab48 --- /dev/null +++ b/test/unit/org/apache/cassandra/utils/AssertionUtilsTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils; + +import org.junit.Test; + +import org.assertj.core.api.Assertions; + +public class AssertionUtilsTest +{ + @Test + public void isInstanceof() + { + Assertions.assertThat(new C()) + .is(AssertionUtils.isInstanceof(A.class)); + + Assertions.assertThat(new D()) + .is(AssertionUtils.isInstanceof(A.class)) + .is(AssertionUtils.isInstanceof(B.class)); + + Assertions.assertThat(null instanceof A) + .isEqualTo(AssertionUtils.isInstanceof(A.class).matches(null)); + } + + interface A {} + interface B extends A {} + static class C implements A {} + static class D implements B {} +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org