maedhroz commented on a change in pull request #1221:
URL: https://github.com/apache/cassandra/pull/1221#discussion_r727686236
##########
File path: src/java/org/apache/cassandra/schema/Schema.java
##########
@@ -85,25 +85,34 @@ private Schema()
*
* See CASSANDRA-16856/16996. Make sure schema pulls are synchronized to prevent concurrent schema pull/writes
*/
- public static synchronized void saveSystemKeyspace()
+ public static void saveSystemKeyspace()
{
- SchemaKeyspace.saveSystemKeyspacesSchema();
+ synchronized(instance)
+ {
+ SchemaKeyspace.saveSystemKeyspacesSchema();
+ }
}
/**
* See CASSANDRA-16856/16996. Make sure schema pulls are synchronized to prevent concurrent schema pull/writes
*/
- public static synchronized void truncateSystemKeyspace()
+ public static void truncateSystemKeyspace()
{
- SchemaKeyspace.truncate();
+ synchronized(instance)
+ {
+ SchemaKeyspace.truncate();
+ }
}
/**
* See CASSANDRA-16856/16996. Make sure schema pulls are synchronized to prevent concurrent schema pull/writes
*/
- public static synchronized Collection<Mutation> schemaKeyspaceAsMutations()
+ public static Collection<Mutation> schemaKeyspaceAsMutations()
{
- return SchemaKeyspace.convertSchemaToMutations();
+ synchronized(instance)
+ {
+ return SchemaKeyspace.convertSchemaToMutations();
+ }
Review comment:
nit: It seems like it would be much less invasive to leave the
`synchronized` keyword in the method signatures and make them instance methods
(then access via `Schema.instance` like the existing synchronized methods)?
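To spell out why the choice of lock matters, here is a minimal standalone sketch. `LockOwnerDemo` and its method names are hypothetical stand-ins, not the real `Schema` class. A `static synchronized` method takes the monitor of the `Class` object, while an instance `synchronized` method and a `synchronized(instance)` block both take the monitor of the singleton, so only the latter two exclude each other:

```java
public class LockOwnerDemo
{
    // Hypothetical stand-in for the Schema singleton.
    public static final LockOwnerDemo instance = new LockOwnerDemo();

    // A static synchronized method holds the monitor of the Class object, not of the singleton.
    public static synchronized boolean staticHoldsClassLock()
    {
        return Thread.holdsLock(LockOwnerDemo.class) && !Thread.holdsLock(instance);
    }

    // An instance synchronized method holds the monitor of `this`.
    public synchronized boolean instanceHoldsInstanceLock()
    {
        return Thread.holdsLock(this) && !Thread.holdsLock(LockOwnerDemo.class);
    }

    // A static method using synchronized(instance) shares the monitor of the instance methods.
    public static boolean blockHoldsInstanceLock()
    {
        synchronized (instance)
        {
            return Thread.holdsLock(instance) && !Thread.holdsLock(LockOwnerDemo.class);
        }
    }

    public static void main(String[] args)
    {
        System.out.println(staticHoldsClassLock());
        System.out.println(instance.instanceHoldsInstanceLock());
        System.out.println(blockHoldsInstanceLock());
    }
}
```

Either of the last two forms gives mutual exclusion with the existing instance methods; the instance-method form just touches fewer lines.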
##########
File path: src/java/org/apache/cassandra/schema/Schema.java
##########
@@ -85,25 +85,34 @@ private Schema()
*
* See CASSANDRA-16856/16996. Make sure schema pulls are synchronized to prevent concurrent schema pull/writes
*/
- public static synchronized void saveSystemKeyspace()
+ public static void saveSystemKeyspace()
{
- SchemaKeyspace.saveSystemKeyspacesSchema();
+ synchronized(instance)
+ {
+ SchemaKeyspace.saveSystemKeyspacesSchema();
+ }
}
/**
* See CASSANDRA-16856/16996. Make sure schema pulls are synchronized to prevent concurrent schema pull/writes
*/
- public static synchronized void truncateSystemKeyspace()
+ public static void truncateSystemKeyspace()
Review comment:
Should this be named `truncateSchemaKeyspace()`? It delegates to `SchemaKeyspace.truncate()`.
##########
File path: test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
##########
@@ -183,8 +269,32 @@ public void testSchemaNoColumn()
KeyspaceParams.simple(1),
SchemaLoader.standardCFMD(testKS,
testTable));
// Delete all colmns in the schema
- String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS);
+ String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SystemKeyspaceConstants.COLUMNS);
executeOnceInternal(query, testKS, testTable);
SchemaKeyspace.fetchNonSystemKeyspaces();
}
+
+ private void testMethodSynchronicity(Runnable method) throws Exception
+ {
+ ExecutorService pool = Executors.newFixedThreadPool(2);
+ CountDownLatch pendingStarts = new CountDownLatch(2);
+
+ threadBlock = new CountDownLatch(2);
+ pool.execute(() -> {
+ pendingStarts.countDown();
+ method.run();
+ });
+ pool.execute(() -> {
+ pendingStarts.countDown();
+ method.run();
+ });
+
+ // We have started 2 threads. 1 thread should be held by the latch threadBlock, the other should be held as
+ // proof of thread safety.
+ pendingStarts.await(10, TimeUnit.SECONDS);
+ Util.spinAssertEquals(1L, () -> threadBlock.getCount(), 10);
+
+ // Do not execute the tasks. Just kill it.
+ pool.shutdownNow();
+ }
Review comment:
Thanks for the other minor fixes. I think we're almost there.
I know we've gone back and forth quite a bit, but the tests still mostly
check that the appropriate methods are synchronized (although the latest one
does at least act as a tripwire for the static vs. instance method
synchronization issue).
I turned the descriptions I've given in a couple of other places into code:
```
@Test
@BMRule(name = "delay partition updates to schema tables",
        targetClass = "CassandraTableWriteHandler",
        targetMethod = "write",
        action = "Thread.sleep(100);",
        targetLocation = "AT EXIT")
public void testNoVisiblePartialSchemaUpdates() throws Exception
{
    String keyspace = "sandbox";
    ExecutorService pool = Executors.newFixedThreadPool(2);

    Schema.truncateSystemKeyspace(); // Make sure there's nothing but the create we're about to do

    CyclicBarrier barrier = new CyclicBarrier(2);

    Future<Void> creation = pool.submit(() -> {
        barrier.await();
        createTable(keyspace, "CREATE TABLE test (a text primary key, b int, c int)");
        return null;
    });

    Future<Collection<Mutation>> mutationsFromThread = pool.submit(() -> {
        barrier.await();
        return Schema.schemaKeyspaceAsMutations();
    });

    creation.get(); // make sure the creation is finished

    Collection<Mutation> mutationsFromConcurrentAccess = mutationsFromThread.get();
    Collection<Mutation> settledMutations = Schema.schemaKeyspaceAsMutations();

    // If the worker thread picked up the creation at all, it should have the same modifications.
    // In other words, we should see all modifications or none.
    if (mutationsFromConcurrentAccess.size() == settledMutations.size())
    {
        assertEquals(1, settledMutations.size());
        Mutation mutationFromConcurrentAccess = mutationsFromConcurrentAccess.iterator().next();
        Mutation settledMutation = settledMutations.iterator().next();

        assertEquals("Read partial schema change!", settledMutation.getTableIds(), mutationFromConcurrentAccess.getTableIds());
    }

    pool.shutdownNow();
}
```
This tests the public API of `Schema` for the particular use case that spawned
this Jira, reliably identifies a partial schema update on creation, and passes
reliably with proper synchronization. I would be perfectly happy with our
coverage (i.e. +1 now, given it looks like we've cleaned up everything else) if
the patch added just this test, although it could probably be generalized
pretty easily to handle truncation as well.
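To illustrate the all-or-nothing property outside of Cassandra, here is a standalone sketch. `AllOrNothingDemo`, `applyThree()`, `snapshot()`, and `observedSize()` are made-up names, and the `Thread.sleep(50)` merely stands in for the Byteman-injected delay. With both methods synchronized on the same object, a reader racing the writer can only ever observe zero or three entries:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AllOrNothingDemo
{
    private final List<String> modifications = new ArrayList<>();

    // Writer: applies three modifications with a delay after each one (stand-in for the delayed writes).
    public synchronized void applyThree() throws InterruptedException
    {
        for (String modification : new String[]{ "keyspaces", "tables", "columns" })
        {
            modifications.add(modification);
            Thread.sleep(50);
        }
    }

    // Reader: copies the current state while holding the same monitor as the writer.
    public synchronized List<String> snapshot()
    {
        return new ArrayList<>(modifications);
    }

    // Races one writer against one reader and returns how many entries the reader saw.
    public static int observedSize()
    {
        AllOrNothingDemo store = new AllOrNothingDemo();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CyclicBarrier barrier = new CyclicBarrier(2);
        try
        {
            Future<Void> writer = pool.submit(() -> { barrier.await(); store.applyThree(); return null; });
            Future<List<String>> reader = pool.submit(() -> { barrier.await(); return store.snapshot(); });
            writer.get(); // make sure the write is finished
            return reader.get().size();
        }
        catch (Exception e)
        {
            throw new RuntimeException(e);
        }
        finally
        {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args)
    {
        int size = observedSize();
        System.out.println(size == 0 || size == 3);
    }
}
```

The barrier only makes overlap likely, not certain, which is the same trade-off the proposed test makes.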
##########
File path: test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
##########
@@ -63,15 +84,112 @@ public static void defineSchema() throws ConfigurationException
SchemaLoader.standardCFMD(KEYSPACE1,
CF_STANDARD1));
}
- /** See CASSANDRA-16856. Make sure schema pulls are synchronized to prevent concurrent schema pull/writes
- *
- * @throws Exception
- */
+ /** See CASSANDRA-16856/16996. Make sure schema pulls are synchronized to prevent concurrent schema pull/writes */
@Test
public void testSchemaPullSynchoricity() throws Exception
{
- Method method = SchemaKeyspace.class.getDeclaredMethod("convertSchemaToMutations");
+ for (String methodName : Arrays.asList("schemaKeyspaceAsMutations",
+ "truncateSchemaKeyspace",
+ "saveSystemKeyspace",
+ "updateVersion"))
+ {
+ Method method = Schema.class.getDeclaredMethod(methodName);
+ assertTrue(Modifier.isSynchronized(method.getModifiers()));
+ }
+
+ Method method = Schema.class.getDeclaredMethod("merge", Collection.class);
assertTrue(Modifier.isSynchronized(method.getModifiers()));
+ method = Schema.class.getDeclaredMethod("transform", SchemaTransformation.class, boolean.class, long.class);
+ assertTrue(Modifier.isSynchronized(method.getModifiers()));
+ }
+
+ /** See CASSANDRA-16856/16996. Make sure schema pulls are synchronized to prevent concurrent schema pull/writes */
+ @Test
+ @BMRule(name = "Delay system ks schema",
+ targetClass = "SchemaKeyspace",
+ targetMethod = "applyChanges",
+ action = "org.apache.cassandra.schema.SchemaKeyspaceTest.threadBlock.countDown();"
+ + "org.apache.cassandra.schema.SchemaKeyspaceTest.threadBlock.await();",
+ targetLocation = "AT EXIT")
+ public void testNoVisiblePartialSchemaUpdates() throws Exception
+ {
+ String keyspace = "sandbox";
+ AtomicBoolean foundCollision = new AtomicBoolean(false);
+ ExecutorService pool = Executors.newFixedThreadPool(2);
+ CountDownLatch pendingStarts = new CountDownLatch(2);
+ CountDownLatch blockedThreadFinished = new CountDownLatch(1);
+
+ threadBlock = new CountDownLatch(2);
+ pool.execute(() -> {
+ pendingStarts.countDown();
+ createTable(keyspace, "CREATE TABLE test (a text primary key, b int, c int)");
+ });
+ // Wait until table is created and that thread blocks
+ Util.spinAssertEquals(1L, () -> pendingStarts.getCount(), 5);
+ Util.spinAssertEquals(1L, () -> threadBlock.getCount(), 5);
+
+ pool.execute(() -> {
+ pendingStarts.countDown();
+ foundCollision.set(Schema.instance.schemaKeyspaceAsMutations()
+ .stream()
+ .filter(m -> m.getPartitionUpdates().toString().contains(keyspace))
+ .collect(Collectors.counting())
+ > 0);
+ blockedThreadFinished.countDown();
+ });
+
+ pendingStarts.await(10, TimeUnit.SECONDS);
+ Util.spinAssertEquals(1L, () -> threadBlock.getCount(), 5);
+ assertFalse(blockedThreadFinished.await(5, TimeUnit.SECONDS));
+ assertEquals(false, foundCollision.get());
+
+ // Do not execute the tasks. Just kill it.
+ pool.shutdownNow();
+ }
+
+ @Test
+ @BMRule(name = "delay partition updates to schema tables",
+ targetClass = "CassandraTableWriteHandler",
+ targetMethod = "write",
+ action = "Thread.sleep(100);",
Review comment:
Minor note: if we're concerned about making thread overlap 99.9% guaranteed
(which is fine for a fuzz test), we could easily raise this sleep 10x, to
1000 ms.
##########
File path: test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
##########
@@ -183,8 +301,21 @@ public void testSchemaNoColumn()
KeyspaceParams.simple(1),
SchemaLoader.standardCFMD(testKS,
testTable));
// Delete all colmns in the schema
- String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS);
+ String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspaceTables.COLUMNS);
executeOnceInternal(query, testKS, testTable);
SchemaKeyspace.fetchNonSystemKeyspaces();
}
+
+ private void testMethodSynchronicity(Runnable method) throws Exception
Review comment:
Is `testMethodSynchronicity()` dead code now?
##########
File path: test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
##########
@@ -183,8 +269,32 @@ public void testSchemaNoColumn()
KeyspaceParams.simple(1),
SchemaLoader.standardCFMD(testKS,
testTable));
// Delete all colmns in the schema
- String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS);
+ String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SystemKeyspaceConstants.COLUMNS);
executeOnceInternal(query, testKS, testTable);
SchemaKeyspace.fetchNonSystemKeyspaces();
}
+
+ private void testMethodSynchronicity(Runnable method) throws Exception
+ {
+ ExecutorService pool = Executors.newFixedThreadPool(2);
+ CountDownLatch pendingStarts = new CountDownLatch(2);
+
+ threadBlock = new CountDownLatch(2);
+ pool.execute(() -> {
+ pendingStarts.countDown();
+ method.run();
+ });
+ pool.execute(() -> {
+ pendingStarts.countDown();
+ method.run();
+ });
+
+ // We have started 2 threads. 1 thread should be held by the latch threadBlock, the other should be held as
+ // proof of thread safety.
+ pendingStarts.await(10, TimeUnit.SECONDS);
+ Util.spinAssertEquals(1L, () -> threadBlock.getCount(), 10);
+
+ // Do not execute the tasks. Just kill it.
+ pool.shutdownNow();
+ }
Review comment:
> I disagree the tests only check the syncs. There is indeed a test that
> tests the sync across the former static/instance problem but that's not the
> point of that test. Here it tests if a partial update can be seen by some other
> thread.
1.) If you remove synchronization, it fails on `blockedThreadFinished`
counting down, not on the partial schema mutation read.
2.) The create table schema mutation has 3 modifications. I don't think
checking for `> 0` is what we want, as 3 modifications is a perfectly valid
outcome (i.e. the failure case is having the same number of mutations but
different modifications within them).
I can +1 if we remove this and `testSchemaPullSynchoricity()`, which, again,
is literally a check for the `synchronized` keyword. Otherwise, let's just
grab a second official (committer) reviewer?
> Also it is time dependent
Sure, but we can easily raise the sleep to something like 1 second and get
overlap 99.99% of the time, which is probably fine for a fuzz test like this.
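As a small illustration of why a reflective check for `synchronized` says little about behavior, here is a standalone sketch (`ModifierCheckDemo` and its methods are hypothetical). `Modifier.isSynchronized` only reports whether the keyword appears in the method signature; it does not see a `synchronized` block in the body, and it does not tell you which monitor is taken:

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class ModifierCheckDemo
{
    private static final Object LOCK = new Object();

    // Synchronized via the method modifier: visible to reflection.
    public static synchronized void keywordSync()
    {
    }

    // Synchronized via a block in the body: invisible to Modifier.isSynchronized.
    public static void blockSync()
    {
        synchronized (LOCK)
        {
        }
    }

    // Returns true only if the named no-arg method carries the synchronized modifier.
    public static boolean hasSynchronizedKeyword(String methodName)
    {
        try
        {
            Method method = ModifierCheckDemo.class.getDeclaredMethod(methodName);
            return Modifier.isSynchronized(method.getModifiers());
        }
        catch (NoSuchMethodException e)
        {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args)
    {
        System.out.println(hasSynchronizedKeyword("keywordSync")); // true
        System.out.println(hasSynchronizedKeyword("blockSync"));   // false
    }
}
```

So a check like this passes or fails on how the locking is spelled, not on whether a partial update can actually be observed.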
##########
File path: test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
##########
@@ -183,8 +269,32 @@ public void testSchemaNoColumn()
KeyspaceParams.simple(1),
SchemaLoader.standardCFMD(testKS,
testTable));
// Delete all colmns in the schema
- String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.COLUMNS);
+ String query = String.format("DELETE FROM %s.%s WHERE keyspace_name=? and table_name=?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SystemKeyspaceConstants.COLUMNS);
executeOnceInternal(query, testKS, testTable);
SchemaKeyspace.fetchNonSystemKeyspaces();
}
+
+ private void testMethodSynchronicity(Runnable method) throws Exception
+ {
+ ExecutorService pool = Executors.newFixedThreadPool(2);
+ CountDownLatch pendingStarts = new CountDownLatch(2);
+
+ threadBlock = new CountDownLatch(2);
+ pool.execute(() -> {
+ pendingStarts.countDown();
+ method.run();
+ });
+ pool.execute(() -> {
+ pendingStarts.countDown();
+ method.run();
+ });
+
+ // We have started 2 threads. 1 thread should be held by the latch threadBlock, the other should be held as
+ // proof of thread safety.
+ pendingStarts.await(10, TimeUnit.SECONDS);
+ Util.spinAssertEquals(1L, () -> threadBlock.getCount(), 10);
+
+ // Do not execute the tasks. Just kill it.
+ pool.shutdownNow();
+ }
Review comment:
Outside the tests, everything LGTM
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]