ekaterinadimitrova2 commented on code in PR #1804:
URL: https://github.com/apache/cassandra/pull/1804#discussion_r953106530


##########
src/java/org/apache/cassandra/config/CassandraRelevantProperties.java:
##########
@@ -235,6 +235,7 @@
     MEMTABLE_OVERHEAD_SIZE("cassandra.memtable.row_overhead_size", "-1"),
     
MEMTABLE_OVERHEAD_COMPUTE_STEPS("cassandra.memtable_row_overhead_computation_step",
 "100000"),
     MIGRATION_DELAY("cassandra.migration_delay_ms", "60000"),
+    SCHEMA_PULL_INTERVAL("cassandra.schema_pull_interval_ms", "60000"),

Review Comment:
   How about some Javadoc?  



##########
src/java/org/apache/cassandra/schema/DefaultSchemaUpdateHandler.java:
##########
@@ -253,20 +264,28 @@ private synchronized SchemaTransformationResult reload()
     }
 
     @Override
-    public SchemaTransformationResult reset(boolean local)
+    public void reset(boolean local)
     {
         if (local)
-            return reload();
-
-        Collection<Mutation> mutations = 
migrationCoordinator.pullSchemaFromAnyNode().awaitThrowUncheckedOnInterrupt().getNow();
-        return applyMutations(mutations);
+        {
+            reload();

Review Comment:
   Do we still need to return in `reload()`? Seems like this is the only usage



##########
src/java/org/apache/cassandra/schema/DefaultSchemaUpdateHandler.java:
##########
@@ -232,12 +240,15 @@ public synchronized SchemaTransformationResult 
apply(SchemaTransformation transf
 
     private void updateSchema(SchemaTransformationResult update, boolean local)
     {
-        this.schema = update.after;
-        logger.debug("Schema updated: {}", update);
-        updateCallback.accept(update, true);
-        if (!local)
+        if (!update.diff.isEmpty())
         {
-            migrationCoordinator.announce(update.after.getVersion());
+            this.schema = update.after;
+            logger.debug("Schema updated: {}", update);
+            updateCallback.accept(update, true);
+            if (!local)
+            {
+                migrationCoordinator.announce(update.after.getVersion());
+            }

Review Comment:
   Shall we add some logging in case no update has happened?  



##########
src/java/org/apache/cassandra/schema/MigrationCoordinator.java:
##########
@@ -323,7 +372,7 @@ private boolean shouldPullFromEndpoint(InetAddressAndPort 
endpoint)
 
         if (!messagingService.versions.knows(endpoint))
         {
-            logger.debug("Not pulling schema from {} because their messaging 
version is unknown", endpoint);
+            logger.trace("Not pulling schema from {} because their messaging 
version is unknown", endpoint);

Review Comment:
   Why this was switched to trace? 



##########
src/java/org/apache/cassandra/schema/MigrationCoordinator.java:
##########
@@ -180,6 +199,18 @@ boolean wasReceived()
         {
             return receivedSchema;
         }
+
+        @Override
+        public String toString()

Review Comment:
   Do we actually use this one anywhere? 



##########
src/java/org/apache/cassandra/schema/MigrationCoordinator.java:
##########
@@ -149,12 +153,27 @@ private static Set<InetAddressAndPort> 
getIgnoredEndpoints()
     {
         final UUID version;
 
+        /**
+         * The set of endpoints containing this schema version
+         */
         final Set<InetAddressAndPort> endpoints           = 
Sets.newConcurrentHashSet();
+        /**
+         * The set of endpoints from which we are already fetching the schema
+         */
         final Set<InetAddressAndPort> outstandingRequests = 
Sets.newConcurrentHashSet();
+        /**
+         * The queue of endpoint from which we are going to fetch the schema

Review Comment:
   nit: endpoints



##########
src/java/org/apache/cassandra/schema/MigrationCoordinator.java:
##########
@@ -403,16 +461,61 @@ private synchronized void 
removeEndpointFromVersion(InetAddressAndPort endpoint,
             return;
 
         info.endpoints.remove(endpoint);
+        logger.trace("Removed endpoint {} from schema {}: {}", endpoint, 
version, info);
         if (info.endpoints.isEmpty())
         {
             info.waitQueue.signalAll();
             versionInfo.remove(version);
+            logger.trace("Removed schema info: {}", info);
         }
     }
 
+    /**
+     * Resets the migration coordinator by notifying all waiting threads and 
removing all the existing version info.
+     * Then, it is populated with the information about schema versions on 
different endpoints provided by Gossiper.
+     * Each version is marked as unreceived so the migration coordinator will 
start pulling schemas from other nodes.
+     */
+    synchronized void reset()
+    {
+        logger.info("Resetting migration coordinator...");
+
+        this.endpointVersions.clear();
+
+        Iterator<Map.Entry<UUID, VersionInfo>> it = 
versionInfo.entrySet().iterator();
+        while (it.hasNext())
+        {
+            Map.Entry<UUID, VersionInfo> entry = it.next();
+            it.remove();
+            entry.getValue().waitQueue.signal();
+        }
+
+        // now report again the versions we are aware of
+        gossiper.getLiveMembers().forEach(endpoint -> {
+            if (FBUtilities.getBroadcastAddressAndPort().equals(endpoint))
+            {
+                reportEndpointVersion(endpoint, schemaVersion.get());
+            }
+            else
+            {
+                EndpointState state = 
gossiper.getEndpointStateForEndpoint(endpoint);
+                if (state != null)
+                {
+                    UUID v = state.getSchemaVersion();
+                    if (v != null)
+                    {
+                        reportEndpointVersion(endpoint, v);
+                    }
+                }
+            }
+        });
+    }
+
     synchronized void removeAndIgnoreEndpoint(InetAddressAndPort endpoint)
     {
+        logger.debug("Removing and ignoring endpoint {}", endpoint);
         Preconditions.checkArgument(endpoint != null);
+        // TODO The endpoint address is now ignored but when a node with the 
same address is added again later,
+        //  there will be no way to include it in schema synchronization other 
than restarting each other node

Review Comment:
   I think I mentioned in the previous version but for completeness - I think 
this TO DO deserves to be documented actually



##########
test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java:
##########
@@ -95,21 +98,37 @@ private void selectSilent(Cluster cluster, String name)
     @Test
     public void schemaReset() throws Throwable
     {
+        CassandraRelevantProperties.MIGRATION_DELAY.setLong(10000);
+        CassandraRelevantProperties.SCHEMA_PULL_INTERVAL.setLong(10000);
         try (Cluster cluster = init(Cluster.build(2).withConfig(cfg -> 
cfg.with(Feature.GOSSIP, Feature.NETWORK)).start()))
         {
             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk INT 
PRIMARY KEY, v TEXT)");
 
             assertTrue(cluster.get(1).callOnInstance(() -> 
Schema.instance.getTableMetadata(KEYSPACE, "tbl") != null));
             assertTrue(cluster.get(2).callOnInstance(() -> 
Schema.instance.getTableMetadata(KEYSPACE, "tbl") != null));
 
+            // now we have a table tbl in the schema of both nodes
+
             cluster.get(2).shutdown().get();
 
-            // when schema is removed and there is no other node to fetch it 
from, node 1 should be left with clean schema
-            cluster.get(1).runOnInstance(() -> 
Schema.instance.resetLocalSchema());
+            // when there is no node to fetch the schema from, reset local 
schema should immediately fail

Review Comment:
   How does this work in previous versions?



##########
src/java/org/apache/cassandra/schema/DefaultSchemaUpdateHandler.java:
##########
@@ -253,20 +264,28 @@ private synchronized SchemaTransformationResult reload()
     }
 
     @Override
-    public SchemaTransformationResult reset(boolean local)
+    public void reset(boolean local)
     {
         if (local)
-            return reload();
-
-        Collection<Mutation> mutations = 
migrationCoordinator.pullSchemaFromAnyNode().awaitThrowUncheckedOnInterrupt().getNow();
-        return applyMutations(mutations);
+        {
+            reload();
+        }
+        else
+        {
+            migrationCoordinator.reset();
+            if 
(!migrationCoordinator.awaitSchemaRequests(CassandraRelevantProperties.MIGRATION_DELAY.getLong()))
+            {
+                logger.error("Timeout exceeded when waiting for schema from 
other nodes");
+            }
+        }
     }
 
     @Override
     public synchronized void clear()
     {
+        schema = DistributedSchema.EMPTY;

Review Comment:
   Why did you decide to move it up? 



##########
src/java/org/apache/cassandra/schema/Schema.java:
##########
@@ -611,18 +613,42 @@ public SchemaTransformationResult 
transform(SchemaTransformation transformation,
      * Clear all locally stored schema information and fetch schema from 
another node.
      * Called by user (via JMX) who wants to get rid of schema disagreement.
      */
-    public synchronized void resetLocalSchema()
+    public void resetLocalSchema()
     {
         logger.debug("Clearing local schema...");
-        updateHandler.clear();
 
-        logger.debug("Clearing local schema keyspace instances...");
-        distributedKeyspaces.forEach(this::unload);

Review Comment:
   Where did the unload go? 



##########
src/java/org/apache/cassandra/schema/Schema.java:
##########
@@ -611,18 +613,42 @@ public SchemaTransformationResult 
transform(SchemaTransformation transformation,
      * Clear all locally stored schema information and fetch schema from 
another node.
      * Called by user (via JMX) who wants to get rid of schema disagreement.
      */
-    public synchronized void resetLocalSchema()

Review Comment:
   Why this change?



##########
src/java/org/apache/cassandra/schema/MigrationCoordinator.java:
##########
@@ -369,22 +418,31 @@ private synchronized boolean 
shouldApplySchemaFor(VersionInfo info)
 
     synchronized Future<Void> reportEndpointVersion(InetAddressAndPort 
endpoint, UUID version)
     {
+        logger.debug("Reported schema {} at endpoint {}", version, endpoint);
         if (ignoredEndpoints.contains(endpoint) || 
IGNORED_VERSIONS.contains(version))
         {
             endpointVersions.remove(endpoint);
             removeEndpointFromVersion(endpoint, null);
+            logger.debug("Discarding endpoint {} or schema {} because either 
endpoint or schema version were marked as ignored", endpoint, version);
             return FINISHED_FUTURE;
         }
 
         UUID current = endpointVersions.put(endpoint, version);
         if (current != null && current.equals(version))
+        {
+            logger.trace("Skipping report of schema {} from {} because we 
already know that", version, endpoint);
             return FINISHED_FUTURE;
+        }
 
         VersionInfo info = versionInfo.computeIfAbsent(version, 
VersionInfo::new);
         if (Objects.equals(schemaVersion.get(), version))
+        {
             info.markReceived();
+            logger.trace("Schema {} from {} has been marked as recevied 
because it is equal the local schema", version, endpoint);
+        }
         info.endpoints.add(endpoint);
-        info.requestQueue.addFirst(endpoint);
+        info.requestQueue.addFirst(endpoint); // TODO not sure if it is 
correct - given we've just marked this schema version as received, why do we 
add a request to receive it?

Review Comment:
   Have to think a bit about this TO DO



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to