ekaterinadimitrova2 commented on code in PR #1804:
URL: https://github.com/apache/cassandra/pull/1804#discussion_r953106530
##########
src/java/org/apache/cassandra/config/CassandraRelevantProperties.java:
##########
@@ -235,6 +235,7 @@
MEMTABLE_OVERHEAD_SIZE("cassandra.memtable.row_overhead_size", "-1"),
MEMTABLE_OVERHEAD_COMPUTE_STEPS("cassandra.memtable_row_overhead_computation_step",
"100000"),
MIGRATION_DELAY("cassandra.migration_delay_ms", "60000"),
+ SCHEMA_PULL_INTERVAL("cassandra.schema_pull_interval_ms", "60000"),
Review Comment:
How about some Javadoc?
##########
src/java/org/apache/cassandra/schema/DefaultSchemaUpdateHandler.java:
##########
@@ -253,20 +264,28 @@ private synchronized SchemaTransformationResult reload()
}
@Override
- public SchemaTransformationResult reset(boolean local)
+ public void reset(boolean local)
{
if (local)
- return reload();
-
- Collection<Mutation> mutations =
migrationCoordinator.pullSchemaFromAnyNode().awaitThrowUncheckedOnInterrupt().getNow();
- return applyMutations(mutations);
+ {
+ reload();
Review Comment:
Do we still need `reload()` to return a value? It seems this is its only usage.
##########
src/java/org/apache/cassandra/schema/DefaultSchemaUpdateHandler.java:
##########
@@ -232,12 +240,15 @@ public synchronized SchemaTransformationResult
apply(SchemaTransformation transf
private void updateSchema(SchemaTransformationResult update, boolean local)
{
- this.schema = update.after;
- logger.debug("Schema updated: {}", update);
- updateCallback.accept(update, true);
- if (!local)
+ if (!update.diff.isEmpty())
{
- migrationCoordinator.announce(update.after.getVersion());
+ this.schema = update.after;
+ logger.debug("Schema updated: {}", update);
+ updateCallback.accept(update, true);
+ if (!local)
+ {
+ migrationCoordinator.announce(update.after.getVersion());
+ }
Review Comment:
Shall we add some logging in case no update has happened?
##########
src/java/org/apache/cassandra/schema/MigrationCoordinator.java:
##########
@@ -323,7 +372,7 @@ private boolean shouldPullFromEndpoint(InetAddressAndPort
endpoint)
if (!messagingService.versions.knows(endpoint))
{
- logger.debug("Not pulling schema from {} because their messaging
version is unknown", endpoint);
+ logger.trace("Not pulling schema from {} because their messaging
version is unknown", endpoint);
Review Comment:
Why was this switched to trace?
##########
src/java/org/apache/cassandra/schema/MigrationCoordinator.java:
##########
@@ -180,6 +199,18 @@ boolean wasReceived()
{
return receivedSchema;
}
+
+ @Override
+ public String toString()
Review Comment:
Do we actually use this one anywhere?
##########
src/java/org/apache/cassandra/schema/MigrationCoordinator.java:
##########
@@ -149,12 +153,27 @@ private static Set<InetAddressAndPort>
getIgnoredEndpoints()
{
final UUID version;
+ /**
+ * The set of endpoints containing this schema version
+ */
final Set<InetAddressAndPort> endpoints =
Sets.newConcurrentHashSet();
+ /**
+ * The set of endpoints from which we are already fetching the schema
+ */
final Set<InetAddressAndPort> outstandingRequests =
Sets.newConcurrentHashSet();
+ /**
+ * The queue of endpoint from which we are going to fetch the schema
Review Comment:
nit: endpoints
##########
src/java/org/apache/cassandra/schema/MigrationCoordinator.java:
##########
@@ -403,16 +461,61 @@ private synchronized void
removeEndpointFromVersion(InetAddressAndPort endpoint,
return;
info.endpoints.remove(endpoint);
+ logger.trace("Removed endpoint {} from schema {}: {}", endpoint,
version, info);
if (info.endpoints.isEmpty())
{
info.waitQueue.signalAll();
versionInfo.remove(version);
+ logger.trace("Removed schema info: {}", info);
}
}
+ /**
+ * Resets the migration coordinator by notifying all waiting threads and
removing all the existing version info.
+ * Then, it is populated with the information about schema versions on
different endpoints provided by Gossiper.
+ * Each version is marked as unreceived so the migration coordinator will
start pulling schemas from other nodes.
+ */
+ synchronized void reset()
+ {
+ logger.info("Resetting migration coordinator...");
+
+ this.endpointVersions.clear();
+
+ Iterator<Map.Entry<UUID, VersionInfo>> it =
versionInfo.entrySet().iterator();
+ while (it.hasNext())
+ {
+ Map.Entry<UUID, VersionInfo> entry = it.next();
+ it.remove();
+ entry.getValue().waitQueue.signal();
+ }
+
+ // now report again the versions we are aware of
+ gossiper.getLiveMembers().forEach(endpoint -> {
+ if (FBUtilities.getBroadcastAddressAndPort().equals(endpoint))
+ {
+ reportEndpointVersion(endpoint, schemaVersion.get());
+ }
+ else
+ {
+ EndpointState state =
gossiper.getEndpointStateForEndpoint(endpoint);
+ if (state != null)
+ {
+ UUID v = state.getSchemaVersion();
+ if (v != null)
+ {
+ reportEndpointVersion(endpoint, v);
+ }
+ }
+ }
+ });
+ }
+
synchronized void removeAndIgnoreEndpoint(InetAddressAndPort endpoint)
{
+ logger.debug("Removing and ignoring endpoint {}", endpoint);
Preconditions.checkArgument(endpoint != null);
+ // TODO The endpoint address is now ignored but when a node with the
same address is added again later,
+ // there will be no way to include it in schema synchronization other
than restarting each other node
Review Comment:
I think I mentioned this in the previous version, but for completeness: I think
this TODO actually deserves to be documented.
##########
test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java:
##########
@@ -95,21 +98,37 @@ private void selectSilent(Cluster cluster, String name)
@Test
public void schemaReset() throws Throwable
{
+ CassandraRelevantProperties.MIGRATION_DELAY.setLong(10000);
+ CassandraRelevantProperties.SCHEMA_PULL_INTERVAL.setLong(10000);
try (Cluster cluster = init(Cluster.build(2).withConfig(cfg ->
cfg.with(Feature.GOSSIP, Feature.NETWORK)).start()))
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk INT
PRIMARY KEY, v TEXT)");
assertTrue(cluster.get(1).callOnInstance(() ->
Schema.instance.getTableMetadata(KEYSPACE, "tbl") != null));
assertTrue(cluster.get(2).callOnInstance(() ->
Schema.instance.getTableMetadata(KEYSPACE, "tbl") != null));
+ // now we have a table tbl in the schema of both nodes
+
cluster.get(2).shutdown().get();
- // when schema is removed and there is no other node to fetch it
from, node 1 should be left with clean schema
- cluster.get(1).runOnInstance(() ->
Schema.instance.resetLocalSchema());
+ // when there is no node to fetch the schema from, reset local
schema should immediately fail
Review Comment:
How does this work in previous versions?
##########
src/java/org/apache/cassandra/schema/DefaultSchemaUpdateHandler.java:
##########
@@ -253,20 +264,28 @@ private synchronized SchemaTransformationResult reload()
}
@Override
- public SchemaTransformationResult reset(boolean local)
+ public void reset(boolean local)
{
if (local)
- return reload();
-
- Collection<Mutation> mutations =
migrationCoordinator.pullSchemaFromAnyNode().awaitThrowUncheckedOnInterrupt().getNow();
- return applyMutations(mutations);
+ {
+ reload();
+ }
+ else
+ {
+ migrationCoordinator.reset();
+ if
(!migrationCoordinator.awaitSchemaRequests(CassandraRelevantProperties.MIGRATION_DELAY.getLong()))
+ {
+ logger.error("Timeout exceeded when waiting for schema from
other nodes");
+ }
+ }
}
@Override
public synchronized void clear()
{
+ schema = DistributedSchema.EMPTY;
Review Comment:
Why did you decide to move it up?
##########
src/java/org/apache/cassandra/schema/Schema.java:
##########
@@ -611,18 +613,42 @@ public SchemaTransformationResult
transform(SchemaTransformation transformation,
* Clear all locally stored schema information and fetch schema from
another node.
* Called by user (via JMX) who wants to get rid of schema disagreement.
*/
- public synchronized void resetLocalSchema()
+ public void resetLocalSchema()
{
logger.debug("Clearing local schema...");
- updateHandler.clear();
- logger.debug("Clearing local schema keyspace instances...");
- distributedKeyspaces.forEach(this::unload);
Review Comment:
Where did the unload go?
##########
src/java/org/apache/cassandra/schema/Schema.java:
##########
@@ -611,18 +613,42 @@ public SchemaTransformationResult
transform(SchemaTransformation transformation,
* Clear all locally stored schema information and fetch schema from
another node.
* Called by user (via JMX) who wants to get rid of schema disagreement.
*/
- public synchronized void resetLocalSchema()
Review Comment:
Why this change?
##########
src/java/org/apache/cassandra/schema/MigrationCoordinator.java:
##########
@@ -369,22 +418,31 @@ private synchronized boolean
shouldApplySchemaFor(VersionInfo info)
synchronized Future<Void> reportEndpointVersion(InetAddressAndPort
endpoint, UUID version)
{
+ logger.debug("Reported schema {} at endpoint {}", version, endpoint);
if (ignoredEndpoints.contains(endpoint) ||
IGNORED_VERSIONS.contains(version))
{
endpointVersions.remove(endpoint);
removeEndpointFromVersion(endpoint, null);
+ logger.debug("Discarding endpoint {} or schema {} because either
endpoint or schema version were marked as ignored", endpoint, version);
return FINISHED_FUTURE;
}
UUID current = endpointVersions.put(endpoint, version);
if (current != null && current.equals(version))
+ {
+ logger.trace("Skipping report of schema {} from {} because we
already know that", version, endpoint);
return FINISHED_FUTURE;
+ }
VersionInfo info = versionInfo.computeIfAbsent(version,
VersionInfo::new);
if (Objects.equals(schemaVersion.get(), version))
+ {
info.markReceived();
+ logger.trace("Schema {} from {} has been marked as recevied
because it is equal the local schema", version, endpoint);
+ }
info.endpoints.add(endpoint);
- info.requestQueue.addFirst(endpoint);
+ info.requestQueue.addFirst(endpoint); // TODO not sure if it is
correct - given we've just marked this schema version as received, why do we
add a request to receive it?
Review Comment:
I have to think a bit more about this TODO.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]