This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.1 by this push:
     new 428fa1f445 Make queries visible to the "system_views.queries" virtual table at the coordinator level
428fa1f445 is described below

commit 428fa1f4453735ef43d508621e5e7d9e0a054415
Author: Caleb Rackliffe <calebrackli...@gmail.com>
AuthorDate: Fri Apr 19 18:40:08 2024 -0500

    Make queries visible to the "system_views.queries" virtual table at the coordinator level
    
    patch by Caleb Rackliffe; reviewed by David Capwell and Chris Lohfink for CASSANDRA-19577
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/concurrent/TaskFactory.java   |  20 +-
 .../apache/cassandra/db/virtual/QueriesTable.java  |   2 +-
 .../distributed/test/QueriesTableTest.java         | 221 ++++++++++++++++-----
 4 files changed, 192 insertions(+), 52 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 801c6f2e48..e4de315818 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1.5
+ * Make queries visible to the "system_views.queries" virtual table at the coordinator level (CASSANDRA-19577)
  * Concurrent equivalent schema updates lead to unresolved disagreement (CASSANDRA-19578)
  * Fix hints delivery for a node going down repeatedly (CASSANDRA-19495)
  * Do not go to disk for reading hints file sizes (CASSANDRA-19477)
diff --git a/src/java/org/apache/cassandra/concurrent/TaskFactory.java b/src/java/org/apache/cassandra/concurrent/TaskFactory.java
index faeabe6c4c..9ed84f71c0 100644
--- a/src/java/org/apache/cassandra/concurrent/TaskFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/TaskFactory.java
@@ -32,7 +32,7 @@ import static 
org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 /**
  * A simple mechanism to impose our desired semantics on the execution of a 
task without requiring a specialised
  * executor service. We wrap tasks in a suitable {@link FutureTask} or 
encapsulating {@link Runnable}.
- *
+ * <p>
  * The encapsulations handle any exceptions in our standard way, as well as 
ensuring {@link ExecutorLocals} are
  * propagated in the case of {@link #localAware()}
  */
@@ -52,7 +52,7 @@ public interface TaskFactory
     static TaskFactory standard() { return Standard.INSTANCE; }
     static TaskFactory localAware() { return LocalAware.INSTANCE; }
 
-    public class Standard implements TaskFactory
+    class Standard implements TaskFactory
     {
         static final Standard INSTANCE = new Standard();
         protected Standard() {}
@@ -90,8 +90,8 @@ public interface TaskFactory
         @Override
         public <T> RunnableFuture<T> toSubmit(WithResources withResources, 
Runnable runnable)
         {
-            return withResources.isNoOp() ? newTask(callable(runnable))
-                                          : newTask(withResources, 
callable(runnable));
+            return withResources.isNoOp() ? newTask(runnable)
+                                          : newTask(withResources, runnable);
         }
 
         @Override
@@ -108,18 +108,28 @@ public interface TaskFactory
                                           : newTask(withResources, callable);
         }
 
+        protected <T> RunnableFuture<T> newTask(Runnable task)
+        {
+            return new FutureTask<>(task);
+        }
+
         protected <T> RunnableFuture<T> newTask(Callable<T> call)
         {
             return new FutureTask<>(call);
         }
 
+        protected <T> RunnableFuture<T> newTask(WithResources withResources, 
Runnable task)
+        {
+            return new FutureTaskWithResources<>(withResources, task);
+        }
+
         protected <T> RunnableFuture<T> newTask(WithResources withResources, 
Callable<T> call)
         {
             return new FutureTaskWithResources<>(withResources, call);
         }
     }
 
-    public class LocalAware extends Standard
+    class LocalAware extends Standard
     {
         static final LocalAware INSTANCE = new LocalAware();
 
diff --git a/src/java/org/apache/cassandra/db/virtual/QueriesTable.java b/src/java/org/apache/cassandra/db/virtual/QueriesTable.java
index aeba61c004..9031bbd70b 100644
--- a/src/java/org/apache/cassandra/db/virtual/QueriesTable.java
+++ b/src/java/org/apache/cassandra/db/virtual/QueriesTable.java
@@ -37,7 +37,7 @@ import static 
org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
  *
  *  thread_id                   | queued_micros |  running_micros | task
  * 
------------------------------+---------------+-----------------+--------------------------------------------------------------------------------
- *  Native-Transport-Requests-7 |         72923 |            7611 |            
          QUERY select * from system_views.queries; [pageSize = 100]
+ *  Native-Transport-Requests-7 |         72923 |            7611 |            
          QUERY SELECT * FROM system_views.queries; [pageSize = 100]
  *              MutationStage-2 |         18249 |            2084 | 
Mutation(keyspace='distributed_test_keyspace', key='000000f8', modifications...
  *                  ReadStage-2 |         72447 |           10121 |            
                             SELECT * FROM keyspace.table LIMIT 5000
  * </pre>
diff --git a/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java b/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java
index 09e56e0b61..b0c3902ad1 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java
@@ -18,72 +18,201 @@
 
 package org.apache.cassandra.distributed.test;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadExecutionController;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.Row;
 import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.utils.Throwables;
 
-import static org.assertj.core.api.Assertions.assertThat;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
 import static org.junit.Assert.assertTrue;
 
 public class QueriesTableTest extends TestBaseImpl
 {
-    public static final int ITERATIONS = 256;
+    private static Cluster SHARED_CLUSTER;
+    private static com.datastax.driver.core.Cluster DRIVER_CLUSTER;
+    private static Session SESSION;
+
+    @BeforeClass
+    public static void createCluster() throws IOException
+    {
+        SHARED_CLUSTER = 
init(Cluster.build(1).withInstanceInitializer(QueryDelayHelper::install)
+                                              .withConfig(c -> 
c.with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP)).start());
+        DRIVER_CLUSTER = JavaDriverUtils.create(SHARED_CLUSTER);
+        SESSION = DRIVER_CLUSTER.connect();
+    }
+
+    @AfterClass
+    public static void closeCluster()
+    {
+        if (SESSION != null)
+            SESSION.close();
+
+        if (DRIVER_CLUSTER != null)
+            DRIVER_CLUSTER.close();
+
+        if (SHARED_CLUSTER != null)
+            SHARED_CLUSTER.close();
+    }
 
     @Test
     public void shouldExposeReadsAndWrites() throws Throwable
     {
-        try (Cluster cluster = init(Cluster.build(1).start()))
+        SHARED_CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (k int 
primary key, v int)");
+
+        boolean readVisible = false;
+        boolean coordinatorReadVisible = false;
+        boolean writeVisible = false;
+        boolean coordinatorWriteVisible = false;
+        
+        SESSION.executeAsync("INSERT INTO " + KEYSPACE + ".tbl (k, v) VALUES 
(0, 0)");
+        SESSION.executeAsync("SELECT * FROM " + KEYSPACE + ".tbl WHERE k = 0");
+
+        // Wait until the coordinator/local read and write are visible:
+        SimpleQueryResult result = 
SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * FROM 
system_views.queries");
+        while (result.toObjectArrays().length < 4)
+            result = SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * 
FROM system_views.queries");
+
+        while (result.hasNext())
+        {
+            Row row = result.next();
+            String threadId = row.get("thread_id").toString();
+            String task = row.get("task").toString();
+
+            readVisible |= threadId.contains("Read") && 
task.contains("SELECT");
+            coordinatorReadVisible |= 
threadId.contains("Native-Transport-Requests") && task.contains("SELECT");
+            writeVisible |= threadId.contains("Mutation") && 
task.contains("Mutation");
+            coordinatorWriteVisible |= 
threadId.contains("Native-Transport-Requests") && task.contains("INSERT");
+        }
+
+        // Issue another read and write to unblock the original queries in 
progress:
+        SESSION.execute("INSERT INTO " + KEYSPACE + ".tbl (k, v) VALUES (0, 
0)");
+        SESSION.execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE k = 0");
+
+        assertTrue(readVisible);
+        assertTrue(coordinatorReadVisible);
+        assertTrue(writeVisible);
+        assertTrue(coordinatorWriteVisible);
+
+        waitForQueriesToFinish();
+    }
+
+    @Test
+    public void shouldExposeCAS() throws Throwable
+    {
+        SHARED_CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".cas_tbl (k 
int primary key, v int)");
+
+        boolean readVisible = false;
+        boolean coordinatorUpdateVisible = false;
+
+        SESSION.executeAsync("UPDATE " + KEYSPACE + ".cas_tbl SET v = 10 WHERE 
k = 0 IF v = 0");
+
+        // Wait until the coordinator update and local read required by the 
CAS operation are visible:
+        SimpleQueryResult result = 
SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * FROM 
system_views.queries");
+        while (result.toObjectArrays().length < 2)
+            result = SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * 
FROM system_views.queries");
+
+        while (result.hasNext())
+        {
+            Row row = result.next();
+            String threadId = row.get("thread_id").toString();
+            String task = row.get("task").toString();
+
+            readVisible |= threadId.contains("Read") && 
task.contains("SELECT");
+            coordinatorUpdateVisible |= 
threadId.contains("Native-Transport-Requests") && task.contains("UPDATE");
+        }
+
+        // Issue a read to unblock the read generated by the original CAS 
operation:
+        SESSION.executeAsync("SELECT * FROM " + KEYSPACE + ".cas_tbl WHERE k = 
0");
+
+        assertTrue(readVisible);
+        assertTrue(coordinatorUpdateVisible);
+
+        waitForQueriesToFinish();
+    }
+
+    private static void waitForQueriesToFinish() throws InterruptedException
+    {
+        // Continue to query the "queries" table until nothing is in 
progress...
+        SimpleQueryResult result = 
SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * FROM 
system_views.queries");
+        while (result.hasNext())
         {
-            ExecutorService executor = Executors.newFixedThreadPool(16);
-            
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (k int 
primary key, v int)");
-
-            AtomicInteger reads = new AtomicInteger(0);
-            AtomicInteger writes = new AtomicInteger(0);
-            AtomicInteger paxos = new AtomicInteger(0);
-            
-            for (int i = 0; i < ITERATIONS; i++)
+            TimeUnit.SECONDS.sleep(1);
+            result = SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * 
FROM system_views.queries");
+        }
+    }
+
+    public static class QueryDelayHelper
+    {
+        private static final CyclicBarrier readBarrier = new CyclicBarrier(2);
+        private static final CyclicBarrier writeBarrier = new CyclicBarrier(2);
+
+        static void install(ClassLoader cl, int nodeNumber)
+        {
+            new ByteBuddy().rebase(Mutation.class)
+                           .method(named("apply").and(takesArguments(3)))
+                           
.intercept(MethodDelegation.to(QueryDelayHelper.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+
+            new ByteBuddy().rebase(ReadCommand.class)
+                           
.method(named("executeLocally").and(takesArguments(1)))
+                           
.intercept(MethodDelegation.to(QueryDelayHelper.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        @SuppressWarnings("unused")
+        public static void apply(Keyspace keyspace, boolean durableWrites, 
boolean isDroppable, @SuperCall Callable<Void> zuper)
+        {
+            try
             {
-                int k = i;
-                executor.execute(() -> cluster.coordinator(1).execute("INSERT 
INTO " + KEYSPACE + ".tbl (k, v) VALUES (" + k + ", 0)", ConsistencyLevel.ALL));
-                executor.execute(() -> cluster.coordinator(1).execute("UPDATE 
" + KEYSPACE + ".tbl SET v = 10 WHERE k = " + (k - 1) + " IF v = 0", 
ConsistencyLevel.ALL));
-                executor.execute(() -> cluster.coordinator(1).execute("SELECT 
* FROM " + KEYSPACE + ".tbl WHERE k = " + (k - 1), ConsistencyLevel.ALL));
-
-                executor.execute(() ->
-                {
-                    SimpleQueryResult result = 
cluster.get(1).executeInternalWithResult("SELECT * FROM system_views.queries");
-                    
-                    while (result.hasNext())
-                    {
-                        Row row = result.next();
-                        String threadId = row.get("thread_id").toString();
-                        String task = row.get("task").toString();
-
-                        if (threadId.contains("Read") && 
task.contains("SELECT"))
-                            reads.incrementAndGet();
-                        else if (threadId.contains("Mutation") && 
task.contains("Mutation"))
-                            writes.incrementAndGet();
-                        else if (threadId.contains("Mutation") && 
task.contains("Paxos"))
-                            paxos.incrementAndGet();
-                    }
-                });
+                if (keyspace.getName().contains(KEYSPACE))
+                    writeBarrier.await();
+
+                zuper.call();
             }
+            catch (Exception e)
+            {
+                throw Throwables.unchecked(e);
+            }
+        }
 
-            executor.shutdown();
-            assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES));
-            
-            // We should see at least one read, write, and conditional update 
in the "queries" table.
-            
assertThat(reads.get()).isGreaterThan(0).isLessThanOrEqualTo(ITERATIONS);
-            
assertThat(writes.get()).isGreaterThan(0).isLessThanOrEqualTo(ITERATIONS);
-            
assertThat(paxos.get()).isGreaterThan(0).isLessThanOrEqualTo(ITERATIONS);
+        @SuppressWarnings("unused")
+        public static UnfilteredPartitionIterator 
executeLocally(ReadExecutionController executionController,
+                                                                 @SuperCall 
Callable<UnfilteredPartitionIterator> zuper)
+        {
+            try
+            {
+                if (executionController.metadata().keyspace.contains(KEYSPACE))
+                    readBarrier.await();
+
+                return zuper.call();
+            }
+            catch (Exception e)
+            {
+                throw Throwables.unchecked(e);
+            }
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to