This is an automated email from the ASF dual-hosted git repository. maedhroz pushed a commit to branch cassandra-4.1 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.1 by this push: new 428fa1f445 Make queries visible to the "system_views.queries" virtual table at the coordinator level 428fa1f445 is described below commit 428fa1f4453735ef43d508621e5e7d9e0a054415 Author: Caleb Rackliffe <calebrackli...@gmail.com> AuthorDate: Fri Apr 19 18:40:08 2024 -0500 Make queries visible to the "system_views.queries" virtual table at the coordinator level patch by Caleb Rackliffe; reviewed by David Capwell and Chris Lohfink for CASSANDRA-19577 --- CHANGES.txt | 1 + .../apache/cassandra/concurrent/TaskFactory.java | 20 +- .../apache/cassandra/db/virtual/QueriesTable.java | 2 +- .../distributed/test/QueriesTableTest.java | 221 ++++++++++++++++----- 4 files changed, 192 insertions(+), 52 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 801c6f2e48..e4de315818 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.1.5 + * Make queries visible to the "system_views.queries" virtual table at the coordinator level (CASSANDRA-19577) * Concurrent equivalent schema updates lead to unresolved disagreement (CASSANDRA-19578) * Fix hints delivery for a node going down repeatedly (CASSANDRA-19495) * Do not go to disk for reading hints file sizes (CASSANDRA-19477) diff --git a/src/java/org/apache/cassandra/concurrent/TaskFactory.java b/src/java/org/apache/cassandra/concurrent/TaskFactory.java index faeabe6c4c..9ed84f71c0 100644 --- a/src/java/org/apache/cassandra/concurrent/TaskFactory.java +++ b/src/java/org/apache/cassandra/concurrent/TaskFactory.java @@ -32,7 +32,7 @@ import static org.apache.cassandra.utils.Shared.Scope.SIMULATION; /** * A simple mechanism to impose our desired semantics on the execution of a task without requiring a specialised * executor service. We wrap tasks in a suitable {@link FutureTask} or encapsulating {@link Runnable}. 
- * + * <p> * The encapsulations handle any exceptions in our standard way, as well as ensuring {@link ExecutorLocals} are * propagated in the case of {@link #localAware()} */ @@ -52,7 +52,7 @@ public interface TaskFactory static TaskFactory standard() { return Standard.INSTANCE; } static TaskFactory localAware() { return LocalAware.INSTANCE; } - public class Standard implements TaskFactory + class Standard implements TaskFactory { static final Standard INSTANCE = new Standard(); protected Standard() {} @@ -90,8 +90,8 @@ public interface TaskFactory @Override public <T> RunnableFuture<T> toSubmit(WithResources withResources, Runnable runnable) { - return withResources.isNoOp() ? newTask(callable(runnable)) - : newTask(withResources, callable(runnable)); + return withResources.isNoOp() ? newTask(runnable) + : newTask(withResources, runnable); } @Override @@ -108,18 +108,28 @@ public interface TaskFactory : newTask(withResources, callable); } + protected <T> RunnableFuture<T> newTask(Runnable task) + { + return new FutureTask<>(task); + } + protected <T> RunnableFuture<T> newTask(Callable<T> call) { return new FutureTask<>(call); } + protected <T> RunnableFuture<T> newTask(WithResources withResources, Runnable task) + { + return new FutureTaskWithResources<>(withResources, task); + } + protected <T> RunnableFuture<T> newTask(WithResources withResources, Callable<T> call) { return new FutureTaskWithResources<>(withResources, call); } } - public class LocalAware extends Standard + class LocalAware extends Standard { static final LocalAware INSTANCE = new LocalAware(); diff --git a/src/java/org/apache/cassandra/db/virtual/QueriesTable.java b/src/java/org/apache/cassandra/db/virtual/QueriesTable.java index aeba61c004..9031bbd70b 100644 --- a/src/java/org/apache/cassandra/db/virtual/QueriesTable.java +++ b/src/java/org/apache/cassandra/db/virtual/QueriesTable.java @@ -37,7 +37,7 @@ import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; * * thread_id | 
queued_micros | running_micros | task * ------------------------------+---------------+-----------------+-------------------------------------------------------------------------------- - * Native-Transport-Requests-7 | 72923 | 7611 | QUERY select * from system_views.queries; [pageSize = 100] + * Native-Transport-Requests-7 | 72923 | 7611 | QUERY SELECT * FROM system_views.queries; [pageSize = 100] * MutationStage-2 | 18249 | 2084 | Mutation(keyspace='distributed_test_keyspace', key='000000f8', modifications... * ReadStage-2 | 72447 | 10121 | SELECT * FROM keyspace.table LIMIT 5000 * </pre> diff --git a/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java b/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java index 09e56e0b61..b0c3902ad1 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java @@ -18,72 +18,201 @@ package org.apache.cassandra.distributed.test; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodDelegation; +import net.bytebuddy.implementation.bind.annotation.SuperCall; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; +import com.datastax.driver.core.Session; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.ReadExecutionController; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.distributed.Cluster; -import 
org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.Row; import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.utils.Throwables; -import static org.assertj.core.api.Assertions.assertThat; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import static org.junit.Assert.assertTrue; public class QueriesTableTest extends TestBaseImpl { - public static final int ITERATIONS = 256; + private static Cluster SHARED_CLUSTER; + private static com.datastax.driver.core.Cluster DRIVER_CLUSTER; + private static Session SESSION; + + @BeforeClass + public static void createCluster() throws IOException + { + SHARED_CLUSTER = init(Cluster.build(1).withInstanceInitializer(QueryDelayHelper::install) + .withConfig(c -> c.with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP)).start()); + DRIVER_CLUSTER = JavaDriverUtils.create(SHARED_CLUSTER); + SESSION = DRIVER_CLUSTER.connect(); + } + + @AfterClass + public static void closeCluster() + { + if (SESSION != null) + SESSION.close(); + + if (DRIVER_CLUSTER != null) + DRIVER_CLUSTER.close(); + + if (SHARED_CLUSTER != null) + SHARED_CLUSTER.close(); + } @Test public void shouldExposeReadsAndWrites() throws Throwable { - try (Cluster cluster = init(Cluster.build(1).start())) + SHARED_CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (k int primary key, v int)"); + + boolean readVisible = false; + boolean coordinatorReadVisible = false; + boolean writeVisible = false; + boolean coordinatorWriteVisible = false; + + SESSION.executeAsync("INSERT INTO " + KEYSPACE + ".tbl (k, v) VALUES (0, 0)"); + SESSION.executeAsync("SELECT * FROM " + KEYSPACE + ".tbl WHERE k = 0"); + + // Wait until the coordinator/local read and write are visible: + SimpleQueryResult result = SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * FROM 
system_views.queries"); + while (result.toObjectArrays().length < 4) + result = SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * FROM system_views.queries"); + + while (result.hasNext()) + { + Row row = result.next(); + String threadId = row.get("thread_id").toString(); + String task = row.get("task").toString(); + + readVisible |= threadId.contains("Read") && task.contains("SELECT"); + coordinatorReadVisible |= threadId.contains("Native-Transport-Requests") && task.contains("SELECT"); + writeVisible |= threadId.contains("Mutation") && task.contains("Mutation"); + coordinatorWriteVisible |= threadId.contains("Native-Transport-Requests") && task.contains("INSERT"); + } + + // Issue another read and write to unblock the original queries in progress: + SESSION.execute("INSERT INTO " + KEYSPACE + ".tbl (k, v) VALUES (0, 0)"); + SESSION.execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE k = 0"); + + assertTrue(readVisible); + assertTrue(coordinatorReadVisible); + assertTrue(writeVisible); + assertTrue(coordinatorWriteVisible); + + waitForQueriesToFinish(); + } + + @Test + public void shouldExposeCAS() throws Throwable + { + SHARED_CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".cas_tbl (k int primary key, v int)"); + + boolean readVisible = false; + boolean coordinatorUpdateVisible = false; + + SESSION.executeAsync("UPDATE " + KEYSPACE + ".cas_tbl SET v = 10 WHERE k = 0 IF v = 0"); + + // Wait until the coordinator update and local read required by the CAS operation are visible: + SimpleQueryResult result = SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * FROM system_views.queries"); + while (result.toObjectArrays().length < 2) + result = SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * FROM system_views.queries"); + + while (result.hasNext()) + { + Row row = result.next(); + String threadId = row.get("thread_id").toString(); + String task = row.get("task").toString(); + + readVisible |= threadId.contains("Read") && task.contains("SELECT"); 
+ coordinatorUpdateVisible |= threadId.contains("Native-Transport-Requests") && task.contains("UPDATE"); + } + + // Issue a read to unblock the read generated by the original CAS operation: + SESSION.executeAsync("SELECT * FROM " + KEYSPACE + ".cas_tbl WHERE k = 0"); + + assertTrue(readVisible); + assertTrue(coordinatorUpdateVisible); + + waitForQueriesToFinish(); + } + + private static void waitForQueriesToFinish() throws InterruptedException + { + // Continue to query the "queries" table until nothing is in progress... + SimpleQueryResult result = SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * FROM system_views.queries"); + while (result.hasNext()) { - ExecutorService executor = Executors.newFixedThreadPool(16); - - cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (k int primary key, v int)"); - - AtomicInteger reads = new AtomicInteger(0); - AtomicInteger writes = new AtomicInteger(0); - AtomicInteger paxos = new AtomicInteger(0); - - for (int i = 0; i < ITERATIONS; i++) + TimeUnit.SECONDS.sleep(1); + result = SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * FROM system_views.queries"); + } + } + + public static class QueryDelayHelper + { + private static final CyclicBarrier readBarrier = new CyclicBarrier(2); + private static final CyclicBarrier writeBarrier = new CyclicBarrier(2); + + static void install(ClassLoader cl, int nodeNumber) + { + new ByteBuddy().rebase(Mutation.class) + .method(named("apply").and(takesArguments(3))) + .intercept(MethodDelegation.to(QueryDelayHelper.class)) + .make() + .load(cl, ClassLoadingStrategy.Default.INJECTION); + + new ByteBuddy().rebase(ReadCommand.class) + .method(named("executeLocally").and(takesArguments(1))) + .intercept(MethodDelegation.to(QueryDelayHelper.class)) + .make() + .load(cl, ClassLoadingStrategy.Default.INJECTION); + } + + @SuppressWarnings("unused") + public static void apply(Keyspace keyspace, boolean durableWrites, boolean isDroppable, @SuperCall Callable<Void> zuper) + { + try 
{ - int k = i; - executor.execute(() -> cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (k, v) VALUES (" + k + ", 0)", ConsistencyLevel.ALL)); - executor.execute(() -> cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 10 WHERE k = " + (k - 1) + " IF v = 0", ConsistencyLevel.ALL)); - executor.execute(() -> cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE k = " + (k - 1), ConsistencyLevel.ALL)); - - executor.execute(() -> - { - SimpleQueryResult result = cluster.get(1).executeInternalWithResult("SELECT * FROM system_views.queries"); - - while (result.hasNext()) - { - Row row = result.next(); - String threadId = row.get("thread_id").toString(); - String task = row.get("task").toString(); - - if (threadId.contains("Read") && task.contains("SELECT")) - reads.incrementAndGet(); - else if (threadId.contains("Mutation") && task.contains("Mutation")) - writes.incrementAndGet(); - else if (threadId.contains("Mutation") && task.contains("Paxos")) - paxos.incrementAndGet(); - } - }); + if (keyspace.getName().contains(KEYSPACE)) + writeBarrier.await(); + + zuper.call(); } + catch (Exception e) + { + throw Throwables.unchecked(e); + } + } - executor.shutdown(); - assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES)); - - // We should see at least one read, write, and conditional update in the "queries" table. 
- assertThat(reads.get()).isGreaterThan(0).isLessThanOrEqualTo(ITERATIONS); - assertThat(writes.get()).isGreaterThan(0).isLessThanOrEqualTo(ITERATIONS); - assertThat(paxos.get()).isGreaterThan(0).isLessThanOrEqualTo(ITERATIONS); + @SuppressWarnings("unused") + public static UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController, + @SuperCall Callable<UnfilteredPartitionIterator> zuper) + { + try + { + if (executionController.metadata().keyspace.contains(KEYSPACE)) + readBarrier.await(); + + return zuper.call(); + } + catch (Exception e) + { + throw Throwables.unchecked(e); + } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org