Improve schema propagation performance. Patch by jbellis; reviewed by Chris Herron and xedin for CASSANDRA-5025.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8d55474f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8d55474f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8d55474f Branch: refs/heads/trunk Commit: 8d55474febf31980f2eb158ae0ff9717b0761f1e Parents: 5abeecc Author: Jonathan Ellis <jbel...@apache.org> Authored: Thu Dec 6 16:12:31 2012 -0600 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Thu Dec 6 16:12:38 2012 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/service/MigrationManager.java | 39 ++++++++++----- 2 files changed, 28 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8d55474f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a0e3eb7..5d1fd8d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.1.8 + * Improve schema propagation performance (CASSANDRA-5025) * Fall back to old describe_splits if d_s_ex is not available (CASSANDRA-4803) * Improve error reporting when streaming ranges fail (CASSANDRA-5009) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8d55474f/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index 94c0dcc..102ea12 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -25,12 +25,12 @@ import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; -import java.util.concurrent.Callable; import 
java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.ArrayList; import java.util.Collection; import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +72,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber if (state != ApplicationState.SCHEMA || endpoint.equals(FBUtilities.getBroadcastAddress())) return; - rectifySchema(UUID.fromString(value.value), endpoint); + maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint); } public void onAlive(InetAddress endpoint, EndpointState state) @@ -80,7 +80,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA); if (value != null) - rectifySchema(UUID.fromString(value.value), endpoint); + maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint); } public void onDead(InetAddress endpoint, EndpointState state) @@ -92,7 +92,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber public void onRemove(InetAddress endpoint) {} - private static void rectifySchema(UUID theirVersion, final InetAddress endpoint) + private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint) { // Can't request migrations from nodes with versions younger than 1.1.7 if (Gossiper.instance.getVersion(endpoint) < MessagingService.VERSION_117) @@ -101,14 +101,29 @@ public class MigrationManager implements IEndpointStateChangeSubscriber if (Schema.instance.getVersion().equals(theirVersion)) return; - /** - * if versions differ this node sends request with local migration list to the endpoint - * and expecting to receive a list of migrations to apply locally. - * - * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are - * running in the gossip stage. 
- */ - StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint)); + // check our schema vs theirs, after a delay to make sure we have a chance to apply any changes + // being pushed out simultaneously. See CASSANDRA-5025 + Runnable runnable = new Runnable() + { + public void run() + { + // grab the latest version of the schema since it may have changed again since the initial scheduling + VersionedValue value = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA); + UUID currentVersion = UUID.fromString(value.value); + if (Schema.instance.getVersion().equals(currentVersion)) + return; + + /** + * if versions differ this node sends request with local migration list to the endpoint + * and expecting to receive a list of migrations to apply locally. + * + * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are + * running in the gossip stage. + */ + StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint)); + } + }; + StorageService.optionalTasks.schedule(runnable, 1, TimeUnit.MINUTES); } public static boolean isReadyForBootstrap()