ekaterinadimitrova2 commented on code in PR #1804:
URL: https://github.com/apache/cassandra/pull/1804#discussion_r960732388


##########
src/java/org/apache/cassandra/schema/MigrationCoordinator.java:
##########
@@ -403,16 +461,61 @@ private synchronized void 
removeEndpointFromVersion(InetAddressAndPort endpoint,
             return;
 
         info.endpoints.remove(endpoint);
+        logger.trace("Removed endpoint {} from schema {}: {}", endpoint, 
version, info);
         if (info.endpoints.isEmpty())
         {
             info.waitQueue.signalAll();
             versionInfo.remove(version);
+            logger.trace("Removed schema info: {}", info);
         }
     }
 
+    /**
+     * Resets the migration coordinator by notifying all waiting threads and 
removing all the existing version info.
+     * Then, it is populated with the information about schema versions on 
different endpoints provided by Gossiper.
+     * Each version is marked as unreceived so the migration coordinator will 
start pulling schemas from other nodes.
+     */
+    synchronized void reset()
+    {
+        logger.info("Resetting migration coordinator...");
+
+        this.endpointVersions.clear();
+
+        Iterator<Map.Entry<UUID, VersionInfo>> it = 
versionInfo.entrySet().iterator();
+        while (it.hasNext())
+        {
+            Map.Entry<UUID, VersionInfo> entry = it.next();
+            it.remove();
+            entry.getValue().waitQueue.signal();
+        }
+
+        // now report again the versions we are aware of
+        gossiper.getLiveMembers().forEach(endpoint -> {
+            if (FBUtilities.getBroadcastAddressAndPort().equals(endpoint))
+            {
+                reportEndpointVersion(endpoint, schemaVersion.get());
+            }
+            else
+            {
+                EndpointState state = 
gossiper.getEndpointStateForEndpoint(endpoint);
+                if (state != null)
+                {
+                    UUID v = state.getSchemaVersion();
+                    if (v != null)
+                    {
+                        reportEndpointVersion(endpoint, v);
+                    }
+                }
+            }
+        });
+    }
+
     synchronized void removeAndIgnoreEndpoint(InetAddressAndPort endpoint)
     {
+        logger.debug("Removing and ignoring endpoint {}", endpoint);
         Preconditions.checkArgument(endpoint != null);
+        // TODO The endpoint address is now ignored but when a node with the 
same address is added again later,
+        //  there will be no way to include it in schema synchronization other 
than restarting each other node

Review Comment:
   Then in a follow up ticket in order to be fixed and add the number in this 
comment? WDYT?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to