ekaterinadimitrova2 commented on code in PR #1804:
URL: https://github.com/apache/cassandra/pull/1804#discussion_r957753652


##########
src/java/org/apache/cassandra/schema/MigrationCoordinator.java:
##########
@@ -220,18 +251,23 @@ boolean wasReceived()
 
     void start()
     {
+        logger.info("Starting migration coordinator and scheduling pulling 
schema versions every 1 minute");

Review Comment:
   I think we need to make it print the SCHEMA_PULL_INTERVAL as it is mutable, 
not 1 minute necessarily anymore



##########
test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java:
##########
@@ -95,25 +107,54 @@ private void selectSilent(Cluster cluster, String name)
     @Test
     public void schemaReset() throws Throwable
     {
+        CassandraRelevantProperties.MIGRATION_DELAY.setLong(10000);
+        CassandraRelevantProperties.SCHEMA_PULL_INTERVAL.setLong(10000);
         try (Cluster cluster = init(Cluster.build(2).withConfig(cfg -> 
cfg.with(Feature.GOSSIP, Feature.NETWORK)).start()))
         {
             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk INT 
PRIMARY KEY, v TEXT)");
 
             assertTrue(cluster.get(1).callOnInstance(() -> 
Schema.instance.getTableMetadata(KEYSPACE, "tbl") != null));
             assertTrue(cluster.get(2).callOnInstance(() -> 
Schema.instance.getTableMetadata(KEYSPACE, "tbl") != null));
 
+            // now we have a table tbl in the schema of both nodes
+
             cluster.get(2).shutdown().get();
 
-            // when schema is removed and there is no other node to fetch it 
from, node 1 should be left with clean schema
-            cluster.get(1).runOnInstance(() -> 
Schema.instance.resetLocalSchema());
+            Awaitility.await().atMost(Duration.ofSeconds(30)) // wait until 
node 1 notices that node 2 is dead
+                      .until(() -> cluster.get(1).callOnInstance(() -> 
Gossiper.instance.getLiveMembers().stream().allMatch(e -> 
e.equals(FBUtilities.getBroadcastAddressAndPort()))));
+
+            // when there is no node to fetch the schema from, reset local 
schema should immediately fail
+            
Assertions.assertThatExceptionOfType(RuntimeException.class).isThrownBy(() -> {
+                cluster.get(1).runOnInstance(() -> 
Schema.instance.resetLocalSchema());
+            }).withMessageContaining("Cannot reset local schema when there are 
no other live nodes");
+
+            // now, let's make a disagreement, the shutdown node 2 has a 
definition of tbl, while the running node 1 does not
+            cluster.get(1).runOnInstance(() -> {
+                Schema.instance.transform(current -> 
Schema.instance.distributedKeyspaces().without(KEYSPACE), false);
+             });
+
             assertTrue(cluster.get(1).callOnInstance(() -> 
Schema.instance.getTableMetadata(KEYSPACE, "tbl") == null));
 
+            // clear will wait until it receives schema from some other node
+            // also if we start the node2 first, schema of node2 will be 
synced to schema of node1 because node1 has the newest change (dropping the 
table)
+            // if we run clean on node1 first, it will advertise empty schema 
to node2 and apply the schema from it without merging
+            CompletableFuture<Boolean> clear1 = 
CompletableFuture.supplyAsync(() -> cluster.get(1).callOnInstance(() -> 
Schema.instance.updateHandler.clear().awaitUninterruptibly(1, 
TimeUnit.MINUTES)));

Review Comment:
   This line can also be split in a few lines so we don't have to scroll right



##########
test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java:
##########
@@ -19,15 +19,27 @@
 package org.apache.cassandra.distributed.test;
 
 import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Test;
 
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.assertj.core.api.Assertions;
 import org.awaitility.Awaitility;
 
+import static org.apache.cassandra.schema.SchemaKeyspaceTables.ALL;

Review Comment:
   Unused import



##########
test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java:
##########
@@ -19,15 +19,27 @@
 package org.apache.cassandra.distributed.test;
 
 import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Test;
 
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.service.StorageService;

Review Comment:
   Unused imports



##########
src/java/org/apache/cassandra/schema/MigrationCoordinator.java:
##########
@@ -426,49 +529,19 @@ private Future<Void> 
scheduleSchemaPull(InetAddressAndPort endpoint, VersionInfo
         FutureTask<Void> task = new FutureTask<>(() -> pullSchema(endpoint, 
new Callback(endpoint, info)));
 
         if (shouldPullImmediately(endpoint, info.version))
+        {
+            logger.debug("Pulling {} immediatelly from {}", info, endpoint);

Review Comment:
   immediately



##########
test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java:
##########
@@ -95,25 +107,54 @@ private void selectSilent(Cluster cluster, String name)
     @Test
     public void schemaReset() throws Throwable
     {
+        CassandraRelevantProperties.MIGRATION_DELAY.setLong(10000);
+        CassandraRelevantProperties.SCHEMA_PULL_INTERVAL.setLong(10000);
         try (Cluster cluster = init(Cluster.build(2).withConfig(cfg -> 
cfg.with(Feature.GOSSIP, Feature.NETWORK)).start()))
         {
             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk INT 
PRIMARY KEY, v TEXT)");
 
             assertTrue(cluster.get(1).callOnInstance(() -> 
Schema.instance.getTableMetadata(KEYSPACE, "tbl") != null));
             assertTrue(cluster.get(2).callOnInstance(() -> 
Schema.instance.getTableMetadata(KEYSPACE, "tbl") != null));
 
+            // now we have a table tbl in the schema of both nodes
+
             cluster.get(2).shutdown().get();
 
-            // when schema is removed and there is no other node to fetch it 
from, node 1 should be left with clean schema
-            cluster.get(1).runOnInstance(() -> 
Schema.instance.resetLocalSchema());
+            Awaitility.await().atMost(Duration.ofSeconds(30)) // wait until 
node 1 notices that node 2 is dead
+                      .until(() -> cluster.get(1).callOnInstance(() -> 
Gossiper.instance.getLiveMembers().stream().allMatch(e -> 
e.equals(FBUtilities.getBroadcastAddressAndPort()))));

Review Comment:
   I think this can be split in a few lines as it becomes too long



##########
src/java/org/apache/cassandra/schema/MigrationCoordinator.java:
##########
@@ -403,16 +461,61 @@ private synchronized void 
removeEndpointFromVersion(InetAddressAndPort endpoint,
             return;
 
         info.endpoints.remove(endpoint);
+        logger.trace("Removed endpoint {} from schema {}: {}", endpoint, 
version, info);
         if (info.endpoints.isEmpty())
         {
             info.waitQueue.signalAll();
             versionInfo.remove(version);
+            logger.trace("Removed schema info: {}", info);
         }
     }
 
+    /**
+     * Resets the migration coordinator by notifying all waiting threads and 
removing all the existing version info.
+     * Then, it is populated with the information about schema versions on 
different endpoints provided by Gossiper.
+     * Each version is marked as unreceived so the migration coordinator will 
start pulling schemas from other nodes.
+     */
+    synchronized void reset()
+    {
+        logger.info("Resetting migration coordinator...");
+
+        this.endpointVersions.clear();
+
+        Iterator<Map.Entry<UUID, VersionInfo>> it = 
versionInfo.entrySet().iterator();
+        while (it.hasNext())
+        {
+            Map.Entry<UUID, VersionInfo> entry = it.next();
+            it.remove();
+            entry.getValue().waitQueue.signal();
+        }

Review Comment:
   This feels like it deserves its own method?



##########
test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java:
##########
@@ -95,25 +107,54 @@ private void selectSilent(Cluster cluster, String name)
     @Test
     public void schemaReset() throws Throwable
     {
+        CassandraRelevantProperties.MIGRATION_DELAY.setLong(10000);
+        CassandraRelevantProperties.SCHEMA_PULL_INTERVAL.setLong(10000);
         try (Cluster cluster = init(Cluster.build(2).withConfig(cfg -> 
cfg.with(Feature.GOSSIP, Feature.NETWORK)).start()))
         {
             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk INT 
PRIMARY KEY, v TEXT)");
 
             assertTrue(cluster.get(1).callOnInstance(() -> 
Schema.instance.getTableMetadata(KEYSPACE, "tbl") != null));
             assertTrue(cluster.get(2).callOnInstance(() -> 
Schema.instance.getTableMetadata(KEYSPACE, "tbl") != null));
 
+            // now we have a table tbl in the schema of both nodes
+
             cluster.get(2).shutdown().get();
 
-            // when schema is removed and there is no other node to fetch it 
from, node 1 should be left with clean schema
-            cluster.get(1).runOnInstance(() -> 
Schema.instance.resetLocalSchema());
+            Awaitility.await().atMost(Duration.ofSeconds(30)) // wait until 
node 1 notices that node 2 is dead
+                      .until(() -> cluster.get(1).callOnInstance(() -> 
Gossiper.instance.getLiveMembers().stream().allMatch(e -> 
e.equals(FBUtilities.getBroadcastAddressAndPort()))));
+
+            // when there is no node to fetch the schema from, reset local 
schema should immediately fail
+            
Assertions.assertThatExceptionOfType(RuntimeException.class).isThrownBy(() -> {
+                cluster.get(1).runOnInstance(() -> 
Schema.instance.resetLocalSchema());
+            }).withMessageContaining("Cannot reset local schema when there are 
no other live nodes");
+
+            // now, let's make a disagreement, the shutdown node 2 has a 
definition of tbl, while the running node 1 does not
+            cluster.get(1).runOnInstance(() -> {
+                Schema.instance.transform(current -> 
Schema.instance.distributedKeyspaces().without(KEYSPACE), false);
+             });
+
             assertTrue(cluster.get(1).callOnInstance(() -> 
Schema.instance.getTableMetadata(KEYSPACE, "tbl") == null));
 
+            // clear will wait until it receives schema from some other node
+            // also if we start the node2 first, schema of node2 will be 
synced to schema of node1 because node1 has the newest change (dropping the 
table)
+            // if we run clean on node1 first, it will advertise empty schema 
to node2 and apply the schema from it without merging
+            CompletableFuture<Boolean> clear1 = 
CompletableFuture.supplyAsync(() -> cluster.get(1).callOnInstance(() -> 
Schema.instance.updateHandler.clear().awaitUninterruptibly(1, 
TimeUnit.MINUTES)));
+            assertFalse(clear1.isDone());
+
             // when the other node is started, schema should be back in sync
             cluster.get(2).startup();
+            assertTrue(clear1.get());
+
+            // this proves node1 reset schema works - the most recent change 
should be discarded because it receives
+            // the schema from node2 and applies it on a clean schema
             Awaitility.waitAtMost(Duration.ofMinutes(1))
                       .pollDelay(Duration.ofSeconds(1))
                       .until(() -> cluster.get(1).callOnInstance(() -> 
Schema.instance.getTableMetadata(KEYSPACE, "tbl") != null));
 
+            // now let's break schema locally and let it be reset
+            cluster.get(1).runOnInstance(() -> 
Schema.instance.getLocalKeyspaces()
+                                                              
.get(SchemaConstants.SCHEMA_KEYSPACE_NAME)
+                                                              
.get().tables.forEach(t -> ColumnFamilyStore.getIfExists(t.keyspace, 
t.name).truncateBlockingWithoutSnapshot()));

Review Comment:
   This line also seems too long



##########
src/java/org/apache/cassandra/schema/MigrationCoordinator.java:
##########
@@ -403,16 +461,61 @@ private synchronized void 
removeEndpointFromVersion(InetAddressAndPort endpoint,
             return;
 
         info.endpoints.remove(endpoint);
+        logger.trace("Removed endpoint {} from schema {}: {}", endpoint, 
version, info);
         if (info.endpoints.isEmpty())
         {
             info.waitQueue.signalAll();
             versionInfo.remove(version);
+            logger.trace("Removed schema info: {}", info);
         }
     }
 
+    /**
+     * Resets the migration coordinator by notifying all waiting threads and 
removing all the existing version info.
+     * Then, it is populated with the information about schema versions on 
different endpoints provided by Gossiper.
+     * Each version is marked as unreceived so the migration coordinator will 
start pulling schemas from other nodes.
+     */
+    synchronized void reset()
+    {
+        logger.info("Resetting migration coordinator...");
+
+        this.endpointVersions.clear();
+
+        Iterator<Map.Entry<UUID, VersionInfo>> it = 
versionInfo.entrySet().iterator();
+        while (it.hasNext())
+        {
+            Map.Entry<UUID, VersionInfo> entry = it.next();
+            it.remove();
+            entry.getValue().waitQueue.signal();
+        }
+
+        // now report again the versions we are aware of
+        gossiper.getLiveMembers().forEach(endpoint -> {
+            if (FBUtilities.getBroadcastAddressAndPort().equals(endpoint))
+            {
+                reportEndpointVersion(endpoint, schemaVersion.get());
+            }
+            else
+            {
+                EndpointState state = 
gossiper.getEndpointStateForEndpoint(endpoint);
+                if (state != null)
+                {
+                    UUID v = state.getSchemaVersion();
+                    if (v != null)
+                    {
+                        reportEndpointVersion(endpoint, v);
+                    }
+                }
+            }
+        });

Review Comment:
   This one too
   Not that I am super opinionated but it just feels like it can improve 
readability that way



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to