maedhroz commented on a change in pull request #1221:
URL: https://github.com/apache/cassandra/pull/1221#discussion_r727686236



##########
File path: src/java/org/apache/cassandra/schema/Schema.java
##########
@@ -85,25 +85,34 @@ private Schema()
      * 
      * See CASSANDRA-16856/16996. Make sure schema pulls are synchronized to prevent concurrent schema pull/writes
      */
-    public static synchronized void saveSystemKeyspace()
+    public static void saveSystemKeyspace()
     {
-        SchemaKeyspace.saveSystemKeyspacesSchema();
+        synchronized(instance)
+        {
+            SchemaKeyspace.saveSystemKeyspacesSchema();
+        }
     }
 
     /**
      * See CASSANDRA-16856/16996. Make sure schema pulls are synchronized to prevent concurrent schema pull/writes
      */
-    public static synchronized void truncateSystemKeyspace()
+    public static void truncateSystemKeyspace()
     {
-        SchemaKeyspace.truncate();
+        synchronized(instance)
+        {
+            SchemaKeyspace.truncate();
+        }
     }
 
     /**
      * See CASSANDRA-16856/16996. Make sure schema pulls are synchronized to prevent concurrent schema pull/writes
      */
-    public static synchronized Collection<Mutation> schemaKeyspaceAsMutations()
+    public static Collection<Mutation> schemaKeyspaceAsMutations()
     {
-        return SchemaKeyspace.convertSchemaToMutations();
+        synchronized(instance)
+        {
+            return SchemaKeyspace.convertSchemaToMutations();
+        }

Review comment:
       nit: Wouldn't it be much less invasive to leave the `synchronized` keyword
       in the method signatures and make these instance methods (accessed via
       `Schema.instance`, like the existing synchronized methods)?
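   
   For context, a minimal self-contained sketch of the two monitors involved. `MonitorDemo` and its methods are hypothetical stand-ins, not the real `Schema` class. A `static synchronized` method holds the monitor of the `Class` object, while an instance `synchronized` method holds the monitor of the instance, so mixing the two styles on a singleton gives no mutual exclusion between them.
   
   ```java
   public class MonitorDemo
   {
       // Hypothetical singleton, standing in for something like Schema.instance.
       public static final MonitorDemo instance = new MonitorDemo();

       private MonitorDemo() {}

       // A static synchronized method holds the monitor of MonitorDemo.class...
       public static synchronized boolean staticHoldsClassLock()
       {
           return Thread.holdsLock(MonitorDemo.class);
       }

       // ...and therefore does NOT hold the monitor of the singleton instance.
       public static synchronized boolean staticHoldsInstanceLock()
       {
           return Thread.holdsLock(instance);
       }

       // An instance synchronized method holds the monitor of `this`.
       public synchronized boolean instanceHoldsInstanceLock()
       {
           return Thread.holdsLock(this);
       }

       public static void main(String[] args)
       {
           System.out.println(staticHoldsClassLock());               // true
           System.out.println(staticHoldsInstanceLock());            // false
           System.out.println(instance.instanceHoldsInstanceLock()); // true
       }
   }
   ```
   
   In this sketch, only the instance method shares a lock with other instance `synchronized` methods on the singleton, which is why moving the methods onto the instance would line them up with the existing ones.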

##########
File path: src/java/org/apache/cassandra/schema/Schema.java
##########
@@ -85,25 +85,34 @@ private Schema()
      * 
      * See CASSANDRA-16856/16996. Make sure schema pulls are synchronized to prevent concurrent schema pull/writes
      */
-    public static synchronized void saveSystemKeyspace()
+    public static void saveSystemKeyspace()
     {
-        SchemaKeyspace.saveSystemKeyspacesSchema();
+        synchronized(instance)
+        {
+            SchemaKeyspace.saveSystemKeyspacesSchema();
+        }
     }
 
     /**
      * See CASSANDRA-16856/16996. Make sure schema pulls are synchronized to prevent concurrent schema pull/writes
      */
-    public static synchronized void truncateSystemKeyspace()
+    public static void truncateSystemKeyspace()

Review comment:
       `truncateSchemaKeyspace()`?

##########
File path: test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
##########
@@ -183,8 +269,32 @@ public void testSchemaNoColumn()
                                     KeyspaceParams.simple(1),
                                     SchemaLoader.standardCFMD(testKS, testTable));
         // Delete all colmns in the schema
-        String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS);
+        String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SystemKeyspaceConstants.COLUMNS);
         executeOnceInternal(query, testKS, testTable);
         SchemaKeyspace.fetchNonSystemKeyspaces();
     }
+
+    private void testMethodSynchronicity(Runnable method) throws Exception
+    {
+        ExecutorService pool = Executors.newFixedThreadPool(2);
+        CountDownLatch pendingStarts = new CountDownLatch(2);
+
+        threadBlock = new CountDownLatch(2);
+        pool.execute(() -> {
+            pendingStarts.countDown();
+            method.run();
+        });
+        pool.execute(() -> {
+            pendingStarts.countDown();
+            method.run();
+        });
+
+        // We have started 2 threads. 1 thread should be held by the latch threadBlock, the other should be held as
+        // proof of thread safety.
+        pendingStarts.await(10, TimeUnit.SECONDS);
+        Util.spinAssertEquals(1L, () -> threadBlock.getCount(), 10);
+
+        // Do not execute the tasks. Just kill it.
+        pool.shutdownNow();
+    }

Review comment:
       Thanks for the other minor fixes. I think we're almost there.
   
   I know we've gone back and forth quite a bit, but the tests are still more
   or less checking that we've synchronized the appropriate methods (although
   the latest one does at least act as a tripwire for the static vs. instance
   method synchronization issue).
   
   I've put some code behind the descriptions I've given in a couple of other places:
   
   ```
   @Test
   @BMRule(name = "delay partition updates to schema tables",
           targetClass = "CassandraTableWriteHandler",
           targetMethod = "write",
           action = "Thread.sleep(100);",
           targetLocation = "AT EXIT")
   public void testNoVisiblePartialSchemaUpdates() throws Exception
   {
       String keyspace = "sandbox";
       ExecutorService pool = Executors.newFixedThreadPool(2);
   
       Schema.truncateSystemKeyspace(); // Make sure there's nothing but the create we're about to do
       CyclicBarrier barrier = new CyclicBarrier(2);
   
       Future<Void> creation = pool.submit(() -> {
           barrier.await();
           createTable(keyspace, "CREATE TABLE test (a text primary key, b int, c int)");
           return null;
       });
   
       Future<Collection<Mutation>> mutationsFromThread = pool.submit(() -> {
           barrier.await();
           return Schema.schemaKeyspaceAsMutations();
       });
   
       creation.get(); // make sure the creation is finished
   
       Collection<Mutation> mutationsFromConcurrentAccess = mutationsFromThread.get();
       Collection<Mutation> settledMutations = Schema.schemaKeyspaceAsMutations();
   
       // If the worker thread picked up the creation at all, it should have the same modifications.
       // In other words, we should see all modifications or none.
       if (mutationsFromConcurrentAccess.size() == settledMutations.size())
       {
           assertEquals(1, settledMutations.size());
           Mutation mutationFromConcurrentAccess = mutationsFromConcurrentAccess.iterator().next();
           Mutation settledMutation = settledMutations.iterator().next();
   
           assertEquals("Read partial schema change!",
                        settledMutation.getTableIds(), mutationFromConcurrentAccess.getTableIds());
       }
   
       pool.shutdownNow();
   }
   ```
   This tests the public API of `Schema` for the particular use case that
   spawned this Jira, reliably identifies a partial schema update on creation,
   and passes reliably with proper synchronization. I would be perfectly happy
   with our coverage (i.e. +1 now, given it looks like we've cleaned up
   everything else) if the patch added just this test, although it could
   probably be generalized pretty easily to handle truncation as well.
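   
   The all-or-nothing property that test checks can be sketched without Cassandra or Byteman. Everything here is an assumption made up for illustration: the `AllOrNothingDemo` class, its three-step `create()`, and the 20 ms pauses that stand in for the injected delay. A writer applies several modifications while a reader, released by the same `CyclicBarrier`, takes a snapshot. With both methods on one monitor, the reader should observe either none of the modifications or all of them.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CyclicBarrier;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;

   public class AllOrNothingDemo
   {
       // Hypothetical toy store standing in for the schema tables.
       private final List<String> modifications = new ArrayList<>();

       // Writer: applies three modifications, pausing after each to widen the race window.
       public synchronized void create() throws InterruptedException
       {
           for (String m : new String[]{ "keyspace", "table", "columns" })
           {
               modifications.add(m);
               Thread.sleep(20);
           }
       }

       // Reader: copies the current state while holding the same monitor as the writer.
       public synchronized List<String> snapshot()
       {
           return new ArrayList<>(modifications);
       }

       // Runs the writer and the reader concurrently; returns how many modifications the reader saw.
       public static int observedSize()
       {
           AllOrNothingDemo store = new AllOrNothingDemo();
           ExecutorService pool = Executors.newFixedThreadPool(2);
           CyclicBarrier barrier = new CyclicBarrier(2);
           try
           {
               Future<Void> creation = pool.submit(() -> { barrier.await(); store.create(); return null; });
               Future<List<String>> read = pool.submit(() -> { barrier.await(); return store.snapshot(); });
               creation.get(); // make sure the write is finished
               return read.get().size();
           }
           catch (Exception e)
           {
               throw new RuntimeException(e);
           }
           finally
           {
               pool.shutdownNow();
           }
       }

       public static void main(String[] args)
       {
           System.out.println("reader saw " + observedSize() + " of 3 modifications");
       }
   }
   ```
   
   Dropping `synchronized` from `snapshot()` in this sketch would allow the reader to see 1 or 2 modifications, which is the partial read the real test is meant to catch.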

##########
File path: test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
##########
@@ -63,15 +84,112 @@ public static void defineSchema() throws ConfigurationException
                                     SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1));
     }
 
-    /** See CASSANDRA-16856. Make sure schema pulls are synchronized to prevent concurrent schema pull/writes
-     *
-     * @throws Exception
-     */
+    /** See CASSANDRA-16856/16996. Make sure schema pulls are synchronized to prevent concurrent schema pull/writes */
     @Test
     public void testSchemaPullSynchoricity() throws Exception
     {
-        Method method = SchemaKeyspace.class.getDeclaredMethod("convertSchemaToMutations");
+        for (String methodName : Arrays.asList("schemaKeyspaceAsMutations",
+                                               "truncateSchemaKeyspace",
+                                               "saveSystemKeyspace",
+                                               "updateVersion"))
+        {
+            Method method = Schema.class.getDeclaredMethod(methodName);
+            assertTrue(Modifier.isSynchronized(method.getModifiers()));
+        }
+
+        Method method = Schema.class.getDeclaredMethod("merge", Collection.class);
         assertTrue(Modifier.isSynchronized(method.getModifiers()));
+        method = Schema.class.getDeclaredMethod("transform", SchemaTransformation.class, boolean.class, long.class);
+        assertTrue(Modifier.isSynchronized(method.getModifiers()));
+    }
+
+    /** See CASSANDRA-16856/16996. Make sure schema pulls are synchronized to prevent concurrent schema pull/writes */
+    @Test
+    @BMRule(name = "Delay system ks schema",
+            targetClass = "SchemaKeyspace",
+            targetMethod = "applyChanges",
+            action = "org.apache.cassandra.schema.SchemaKeyspaceTest.threadBlock.countDown();"
+                     + "org.apache.cassandra.schema.SchemaKeyspaceTest.threadBlock.await();",
+            targetLocation = "AT EXIT")
+    public void testNoVisiblePartialSchemaUpdates() throws Exception
+    {
+        String keyspace = "sandbox";
+        AtomicBoolean foundCollision = new AtomicBoolean(false);
+        ExecutorService pool = Executors.newFixedThreadPool(2);
+        CountDownLatch pendingStarts = new CountDownLatch(2);
+        CountDownLatch blockedThreadFinished = new CountDownLatch(1);
+
+        threadBlock = new CountDownLatch(2);
+        pool.execute(() -> {
+            pendingStarts.countDown();
+            createTable(keyspace, "CREATE TABLE test (a text primary key, b int, c int)");
+        });
+        // Wait until table is created and that thread blocks
+        Util.spinAssertEquals(1L, () -> pendingStarts.getCount(), 5);
+        Util.spinAssertEquals(1L, () -> threadBlock.getCount(), 5);
+
+        pool.execute(() -> {
+            pendingStarts.countDown();
+            foundCollision.set(Schema.instance.schemaKeyspaceAsMutations()
+                                              .stream()
+                                              .filter(m -> m.getPartitionUpdates().toString().contains(keyspace))
+                                              .collect(Collectors.counting())
+                               > 0);
+            blockedThreadFinished.countDown();
+        });
+
+        pendingStarts.await(10, TimeUnit.SECONDS);
+        Util.spinAssertEquals(1L, () -> threadBlock.getCount(), 5);
+        assertFalse(blockedThreadFinished.await(5, TimeUnit.SECONDS));
+        assertEquals(false, foundCollision.get());
+
+        // Do not execute the tasks. Just kill it.
+        pool.shutdownNow();
+    }
+
+    @Test
+    @BMRule(name = "delay partition updates to schema tables",
+            targetClass = "CassandraTableWriteHandler",
+            targetMethod = "write",
+            action = "Thread.sleep(100);",

Review comment:
       Minor note: if we're concerned about making sure thread overlap is
       99.9% guaranteed (which is fine for a fuzz test), we could easily raise
       this sleep 10x, to 1000 ms.

##########
File path: test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
##########
@@ -183,8 +301,21 @@ public void testSchemaNoColumn()
                                     KeyspaceParams.simple(1),
                                     SchemaLoader.standardCFMD(testKS, testTable));
         // Delete all colmns in the schema
-        String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS);
+        String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS);
         executeOnceInternal(query, testKS, testTable);
         SchemaKeyspace.fetchNonSystemKeyspaces();
     }
+
+    private void testMethodSynchronicity(Runnable method) throws Exception

Review comment:
       This guy is dead code now?

##########
File path: test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
##########
@@ -183,8 +269,32 @@ public void testSchemaNoColumn()
                                     KeyspaceParams.simple(1),
                                     SchemaLoader.standardCFMD(testKS, testTable));
         // Delete all colmns in the schema
-        String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS);
+        String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SystemKeyspaceConstants.COLUMNS);
         executeOnceInternal(query, testKS, testTable);
         SchemaKeyspace.fetchNonSystemKeyspaces();
     }
+
+    private void testMethodSynchronicity(Runnable method) throws Exception
+    {
+        ExecutorService pool = Executors.newFixedThreadPool(2);
+        CountDownLatch pendingStarts = new CountDownLatch(2);
+
+        threadBlock = new CountDownLatch(2);
+        pool.execute(() -> {
+            pendingStarts.countDown();
+            method.run();
+        });
+        pool.execute(() -> {
+            pendingStarts.countDown();
+            method.run();
+        });
+
+        // We have started 2 threads. 1 thread should be held by the latch threadBlock, the other should be held as
+        // proof of thread safety.
+        pendingStarts.await(10, TimeUnit.SECONDS);
+        Util.spinAssertEquals(1L, () -> threadBlock.getCount(), 10);
+
+        // Do not execute the tasks. Just kill it.
+        pool.shutdownNow();
+    }

Review comment:
       > I disagree the tests only check the syncs. There is indeed a test that
       > tests the sync across the former static/instance problem but that's not
       > the point of that test. Here it tests if a partial update can be seen by
       > some other thread.
   
   1.) If you remove synchronization, it fails on `blockedThreadFinished`
       counting down, not on the partial schema mutation read.
   2.) The create table schema mutation has 3 modifications. I don't think
       checking for `> 0` is what we want, as 3 modifications is a perfectly
       valid outcome (i.e. the failure case is having the same number of
       mutations but different modifications within them).
   
   I can +1 if we remove this and `testSchemaPullSynchoricity()`, which, again,
   is literally a check for `synchronized`. Otherwise, let's just grab a
   second official (committer) reviewer?
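   
   To illustrate what a reflection-based check can and cannot see, here is a small sketch using a hypothetical `ModifierCheckDemo` class (none of these names come from the patch). `Modifier.isSynchronized` only reports whether the keyword appears on the method signature. It says nothing about which monitor is taken, and it does not see a `synchronized` block inside the method body.
   
   ```java
   import java.lang.reflect.Method;
   import java.lang.reflect.Modifier;

   public class ModifierCheckDemo
   {
       // Hypothetical singleton used as the intended lock.
       public static final ModifierCheckDemo instance = new ModifierCheckDemo();

       // Keyword present, but the monitor is ModifierCheckDemo.class, not `instance`.
       public static synchronized void staticSync() {}

       // Keyword present, monitor is the instance.
       public synchronized void instanceSync() {}

       // Locks the instance correctly, but carries no `synchronized` modifier.
       public static void blockSync()
       {
           synchronized (instance) {}
       }

       // Looks up a no-argument method by name and reports its `synchronized` modifier.
       public static boolean hasSynchronizedModifier(String name)
       {
           try
           {
               Method m = ModifierCheckDemo.class.getDeclaredMethod(name);
               return Modifier.isSynchronized(m.getModifiers());
           }
           catch (NoSuchMethodException e)
           {
               throw new IllegalArgumentException(e);
           }
       }

       public static void main(String[] args)
       {
           System.out.println(hasSynchronizedModifier("staticSync"));   // true
           System.out.println(hasSynchronizedModifier("instanceSync")); // true
           System.out.println(hasSynchronizedModifier("blockSync"));    // false
       }
   }
   ```
   
   In this sketch the check passes for `staticSync` even though it takes a different lock from `instanceSync`, and fails for `blockSync` even though it takes the intended lock, so the modifier alone is not evidence of thread safety.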
   
   > Also it is time dependent
   
   Sure, but we can easily make the sleep something like 1 second and cause
   overlap 99.99% of the time, which is probably fine for a fuzz test like this.

##########
File path: test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
##########
@@ -183,8 +269,32 @@ public void testSchemaNoColumn()
                                     KeyspaceParams.simple(1),
                                     SchemaLoader.standardCFMD(testKS, testTable));
         // Delete all colmns in the schema
-        String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS);
+        String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SystemKeyspaceConstants.COLUMNS);
         executeOnceInternal(query, testKS, testTable);
         SchemaKeyspace.fetchNonSystemKeyspaces();
     }
+
+    private void testMethodSynchronicity(Runnable method) throws Exception
+    {
+        ExecutorService pool = Executors.newFixedThreadPool(2);
+        CountDownLatch pendingStarts = new CountDownLatch(2);
+
+        threadBlock = new CountDownLatch(2);
+        pool.execute(() -> {
+            pendingStarts.countDown();
+            method.run();
+        });
+        pool.execute(() -> {
+            pendingStarts.countDown();
+            method.run();
+        });
+
+        // We have started 2 threads. 1 thread should be held by the latch threadBlock, the other should be held as
+        // proof of thread safety.
+        pendingStarts.await(10, TimeUnit.SECONDS);
+        Util.spinAssertEquals(1L, () -> threadBlock.getCount(), 10);
+
+        // Do not execute the tasks. Just kill it.
+        pool.shutdownNow();
+    }

Review comment:
       Outside the tests, everything LGTM



