adelapena commented on code in PR #1804:
URL: https://github.com/apache/cassandra/pull/1804#discussion_r958439764


##########
src/java/org/apache/cassandra/schema/MigrationCoordinator.java:
##########
@@ -180,6 +199,18 @@ boolean wasReceived()
         {
             return receivedSchema;
         }
+
+        @Override
+        public String toString()
+        {
+            return "VersionInfo{" +
+                   "version=" + version +
+                   ", outstandingRequests=" + outstandingRequests +
+                   ", requestQueue=" + requestQueue +
+                   ", waitQueue=" + waitQueue.getWaiting() +

Review Comment:
   Maybe it should be `waitQueueSize`?
   ```suggestion
                      ", waitQueueSize=" + waitQueue.getWaiting() +
   ```



##########
src/java/org/apache/cassandra/schema/MigrationCoordinator.java:
##########
@@ -23,24 +23,24 @@
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;

Review Comment:
   Nit: unused import



##########
src/java/org/apache/cassandra/config/CassandraRelevantProperties.java:
##########
@@ -235,6 +235,7 @@
     MEMTABLE_OVERHEAD_SIZE("cassandra.memtable.row_overhead_size", "-1"),
     MEMTABLE_OVERHEAD_COMPUTE_STEPS("cassandra.memtable_row_overhead_computation_step", "100000"),
     MIGRATION_DELAY("cassandra.migration_delay_ms", "60000"),
+    SCHEMA_PULL_INTERVAL("cassandra.schema_pull_interval_ms", "60000"),

Review Comment:
   I'd add a brief JavaDoc line saying what the property does.



##########
src/java/org/apache/cassandra/config/CassandraRelevantProperties.java:
##########
@@ -235,6 +235,7 @@
     MEMTABLE_OVERHEAD_SIZE("cassandra.memtable.row_overhead_size", "-1"),
     MEMTABLE_OVERHEAD_COMPUTE_STEPS("cassandra.memtable_row_overhead_computation_step", "100000"),
     MIGRATION_DELAY("cassandra.migration_delay_ms", "60000"),
+    SCHEMA_PULL_INTERVAL("cassandra.schema_pull_interval_ms", "60000"),

Review Comment:
   We should probably include the unit in the name, as in `SCHEMA_PULL_INTERVAL_MS`.
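
To illustrate both suggestions on this line (a JavaDoc line plus the unit in the constant name), here is a minimal sketch. `PropertyNamingSketch` and its nested `Props` enum are hypothetical stand-ins, not the real `CassandraRelevantProperties`, and the JavaDoc wording reflects my assumption about what the property controls.

```java
public class PropertyNamingSketch
{
    /** Hypothetical stand-in for the real properties enum. */
    enum Props
    {
        /** Assumed meaning: interval, in milliseconds, between periodic schema pulls. */
        SCHEMA_PULL_INTERVAL_MS("cassandra.schema_pull_interval_ms", "60000");

        private final String key;
        private final String defaultValue;

        Props(String key, String defaultValue)
        {
            this.key = key;
            this.defaultValue = defaultValue;
        }

        /** Reads the system property, falling back to the default when it is unset. */
        long getLong()
        {
            return Long.parseLong(System.getProperty(key, defaultValue));
        }
    }

    public static void main(String[] args)
    {
        // The unit is visible at the call site without opening the enum.
        long intervalMs = Props.SCHEMA_PULL_INTERVAL_MS.getLong();
        System.out.println("schema pull interval (ms): " + intervalMs);
    }
}
```

With the suffix in place, a call site such as `SCHEMA_PULL_INTERVAL_MS.getLong()` documents its own unit.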



##########
test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java:
##########
@@ -95,25 +107,54 @@ private void selectSilent(Cluster cluster, String name)
     @Test
     public void schemaReset() throws Throwable
     {
+        CassandraRelevantProperties.MIGRATION_DELAY.setLong(10000);
+        CassandraRelevantProperties.SCHEMA_PULL_INTERVAL.setLong(10000);
         try (Cluster cluster = init(Cluster.build(2).withConfig(cfg -> cfg.with(Feature.GOSSIP, Feature.NETWORK)).start()))
         {
             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk INT PRIMARY KEY, v TEXT)");
 
             assertTrue(cluster.get(1).callOnInstance(() -> Schema.instance.getTableMetadata(KEYSPACE, "tbl") != null));
             assertTrue(cluster.get(2).callOnInstance(() -> Schema.instance.getTableMetadata(KEYSPACE, "tbl") != null));
 
+            // now we have a table tbl in the schema of both nodes
+
             cluster.get(2).shutdown().get();
 
-            // when schema is removed and there is no other node to fetch it from, node 1 should be left with clean schema
-            cluster.get(1).runOnInstance(() -> Schema.instance.resetLocalSchema());
+            Awaitility.await().atMost(Duration.ofSeconds(30)) // wait until node 1 notices that node 2 is dead
+                      .until(() -> cluster.get(1).callOnInstance(() -> Gossiper.instance.getLiveMembers().stream().allMatch(e -> e.equals(FBUtilities.getBroadcastAddressAndPort()))));
+
+            // when there is no node to fetch the schema from, reset local schema should immediately fail
+            Assertions.assertThatExceptionOfType(RuntimeException.class).isThrownBy(() -> {
+                cluster.get(1).runOnInstance(() -> Schema.instance.resetLocalSchema());

Review Comment:
   ```suggestion
                   cluster.get(1).runOnInstance(Schema.instance::resetLocalSchema);
   ```
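
One caveat worth checking before applying this: in plain Java, a bound method reference evaluates its receiver once, when the reference is created, whereas the lambda re-reads the field on every call. The sketch below shows only that language-level difference. `MethodRefSketch`, `Service`, and `instance` are hypothetical names, and whether the distinction matters for `runOnInstance` is not something this example establishes.

```java
import java.util.function.Supplier;

public class MethodRefSketch
{
    /** Hypothetical singleton-style holder, loosely mimicking a static `instance` field. */
    static class Service
    {
        final String name;

        Service(String name)
        {
            this.name = name;
        }

        String describe()
        {
            return name;
        }
    }

    static Service instance = new Service("first");

    /** Returns { result via lambda, result via bound method reference }. */
    static String[] demo()
    {
        instance = new Service("first");

        // The lambda reads the static field each time it runs.
        Supplier<String> viaLambda = () -> instance.describe();
        // The bound reference captures the current receiver right here.
        Supplier<String> viaRef = instance::describe;

        instance = new Service("second");

        return new String[]{ viaLambda.get(), viaRef.get() };
    }

    public static void main(String[] args)
    {
        String[] results = demo();
        System.out.println("lambda: " + results[0] + ", method reference: " + results[1]);
    }
}
```

If the receiver never changes between creation and invocation, the two forms behave identically and the method reference is simply shorter.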



##########
src/java/org/apache/cassandra/schema/MigrationCoordinator.java:
##########
@@ -85,6 +85,10 @@
  * schema. It performs periodic checks and if there is a schema version mismatch between the current node and the other
  * node, it pulls the schema and applies the changes locally through the callback.
  *
+ * In particular the Migration Coordinator keeps track of all schema versions reported from each node in the cluster.
+ * As long as the certain version is advertised by some node, it is being tracked. As long as a version is tracked,

Review Comment:
   ```suggestion
    * As long as a certain version is advertised by some node, it is being tracked. As long as a version is tracked,
   ```



##########
src/java/org/apache/cassandra/schema/DefaultSchemaUpdateHandler.java:
##########
@@ -85,8 +90,21 @@ public DefaultSchemaUpdateHandler(MigrationCoordinator migrationCoordinator,
         this.updateCallback = updateCallback;
         this.migrationCoordinator = migrationCoordinator == null ? createMigrationCoordinator(messagingService) : migrationCoordinator;
         Gossiper.instance.register(this);
-        SchemaPushVerbHandler.instance.register(msg -> applyMutations(msg.payload));
-        SchemaPullVerbHandler.instance.register(msg -> messagingService.send(msg.responseWith(getSchemaMutations()), msg.from()));
+        SchemaPushVerbHandler.instance.register(msg -> {
+            synchronized (this)
+            {
+                if (requestedReset == null)
+                    applyMutations(msg.payload);
+            }
+        });
+        SchemaPullVerbHandler.instance.register(msg -> {
+            try
+            {
+                messagingService.send(msg.responseWith(getSchemaMutations()), msg.from());
+            } catch (RuntimeException ex) {

Review Comment:
   ```suggestion
               }
               catch (RuntimeException ex)
               {
   ```



##########
src/java/org/apache/cassandra/schema/SchemaUpdateHandler.java:
##########
@@ -63,14 +64,15 @@
      * refreshed, the callbacks provided in the factory method are executed, and the updated schema version is announced.
      *
      * @param local whether we should reset with locally stored schema or fetch the schema from other nodes
-     * @return transformation result
      */
-    SchemaTransformationResult reset(boolean local);
+    void reset(boolean local);
 
     /**
      * Clears the locally stored schema entirely. After this operation the schema is equal to {@link DistributedSchema#EMPTY}.
      * The method does not execute any callback. It is indended to reinitialize the schema later using the method
      * {@link #reset(boolean)}.
+     *
+     * @return

Review Comment:
   I think we should either describe the return value in the `@return` tag or remove the tag.



##########
src/java/org/apache/cassandra/schema/Schema.java:
##########
@@ -611,18 +615,27 @@ public SchemaTransformationResult transform(SchemaTransformation transformation,
      * Clear all locally stored schema information and fetch schema from another node.
      * Called by user (via JMX) who wants to get rid of schema disagreement.
      */
-    public synchronized void resetLocalSchema()
+    public void resetLocalSchema()
     {
         logger.debug("Clearing local schema...");
-        updateHandler.clear();
 
-        logger.debug("Clearing local schema keyspace instances...");
-        distributedKeyspaces.forEach(this::unload);
-        updateVersion(SchemaConstants.emptyVersion);
-        SchemaDiagnostics.schemaCleared(this);
+        if (Gossiper.instance.getLiveMembers().stream().allMatch(ep -> FBUtilities.getBroadcastAddressAndPort().equals(ep)))
+            throw new InvalidRequestException("Cannot reset local schema when there are no other live nodes");
 
-        updateHandler.reset(false);
-        logger.info("Local schema reset is complete.");
+        Awaitable clearCompletion = updateHandler.clear();
+        try
+        {
+            if (!clearCompletion.await(StorageService.SCHEMA_DELAY_MILLIS, TimeUnit.MILLISECONDS)) {

Review Comment:
   ```suggestion
               if (!clearCompletion.await(StorageService.SCHEMA_DELAY_MILLIS, TimeUnit.MILLISECONDS))
               {
   ```
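
For reference, this is the timed-wait shape with the brace placement suggested above, sketched in isolation. `java.util.concurrent.CountDownLatch` stands in for the `Awaitable` returned by `updateHandler.clear()`, and `TimedAwaitSketch` and `awaitCompletion` are hypothetical names, so treat this as an illustration of the pattern rather than the PR's code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TimedAwaitSketch
{
    /**
     * Waits up to timeoutMillis for the latch to reach zero.
     * Returns true if it completed in time, false on timeout or interruption.
     */
    static boolean awaitCompletion(CountDownLatch completion, long timeoutMillis)
    {
        try
        {
            if (!completion.await(timeoutMillis, TimeUnit.MILLISECONDS))
            {
                // Timed out: the caller decides whether to throw or retry.
                return false;
            }
            return true;
        }
        catch (InterruptedException e)
        {
            // Restore the interrupt flag so callers can observe it.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args)
    {
        CountDownLatch done = new CountDownLatch(1);
        System.out.println("before countDown: " + awaitCompletion(done, 10));
        done.countDown();
        System.out.println("after countDown: " + awaitCompletion(done, 10));
    }
}
```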



