Gossip thread slows down when using batch commit log. Patch by jasobrown; reviewed by spodkowinski for CASSANDRA-12966
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ec85b4a9 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ec85b4a9 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ec85b4a9 Branch: refs/heads/trunk Commit: ec85b4a96390ef5bf85acac42f9c93a68620b668 Parents: dc32ed8 Author: Jason Brown <jasedbr...@gmail.com> Authored: Mon Nov 28 15:22:14 2016 -0800 Committer: Jason Brown <jasedbr...@gmail.com> Committed: Mon Aug 21 15:48:09 2017 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/SystemKeyspace.java | 22 +++++++++------ .../cassandra/service/StorageService.java | 28 +++++++++++--------- .../apache/cassandra/db/SystemKeyspaceTest.java | 6 ++++- .../cassandra/gms/FailureDetectorTest.java | 2 +- .../service/LeaveAndBootstrapTest.java | 10 +++++-- 6 files changed, 44 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec85b4a9/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a0a61ac..d8b22f0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.15 + * Gossip thread slows down when using batch commit log (CASSANDRA-12966) * Randomize batchlog endpoint selection with only 1 or 2 racks (CASSANDRA-12884) * Fix digest calculation for counter cells (CASSANDRA-13750) * Fix ColumnDefinition.cellValueType() for non-frozen collection and change SSTabledump to use type.toJSONString() (CASSANDRA-13573) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec85b4a9/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index cc21435..7ce74a1 
100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -23,11 +23,13 @@ import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.StreamSupport; import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; +import java.util.concurrent.Future; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableSet; @@ -36,6 +38,10 @@ import com.google.common.io.ByteStreams; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.util.concurrent.Futures; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.QueryProcessor; @@ -687,29 +693,29 @@ public final class SystemKeyspace /** * Record tokens being used by another node */ - public static synchronized void updateTokens(InetAddress ep, Collection<Token> tokens) + public static Future<?> updateTokens(final InetAddress ep, final Collection<Token> tokens, ExecutorService executorService) { if (ep.equals(FBUtilities.getBroadcastAddress())) - return; + return Futures.immediateFuture(null); String req = "INSERT INTO system.%s (peer, tokens) VALUES (?, ?)"; - executeInternal(String.format(req, PEERS), ep, tokensAsSet(tokens)); + return executorService.submit((Runnable) () -> executeInternal(String.format(req, PEERS), ep, tokensAsSet(tokens))); } - public static synchronized void updatePreferredIP(InetAddress ep, InetAddress preferred_ip) + public static void updatePreferredIP(InetAddress ep, InetAddress preferred_ip) { String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES (?, ?)"; 
executeInternal(String.format(req, PEERS), ep, preferred_ip); forceBlockingFlush(PEERS); } - public static synchronized void updatePeerInfo(InetAddress ep, String columnName, Object value) + public static Future<?> updatePeerInfo(final InetAddress ep, final String columnName, final Object value, ExecutorService executorService) { if (ep.equals(FBUtilities.getBroadcastAddress())) - return; + return Futures.immediateFuture(null); String req = "INSERT INTO system.%s (peer, %s) VALUES (?, ?)"; - executeInternal(String.format(req, PEERS, columnName), ep, value); + return executorService.submit((Runnable) () -> executeInternal(String.format(req, PEERS, columnName), ep, value)); } public static synchronized void updateHintsDropped(InetAddress ep, UUID timePeriod, int value) @@ -748,7 +754,7 @@ public final class SystemKeyspace /** * Remove stored tokens being used by another node */ - public static synchronized void removeEndpoint(InetAddress ep) + public static void removeEndpoint(InetAddress ep) { String req = "DELETE FROM system.%s WHERE peer = ?"; executeInternal(String.format(req, PEERS), ep); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec85b4a9/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 8a9113c..a1d1756 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1711,23 +1711,24 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (getTokenMetadata().isMember(endpoint)) { + final ExecutorService executor = StageManager.getStage(Stage.MUTATION); switch (state) { case RELEASE_VERSION: - SystemKeyspace.updatePeerInfo(endpoint, "release_version", value.value); + SystemKeyspace.updatePeerInfo(endpoint, "release_version", 
value.value, executor); break; case DC: updateTopology(endpoint); - SystemKeyspace.updatePeerInfo(endpoint, "data_center", value.value); + SystemKeyspace.updatePeerInfo(endpoint, "data_center", value.value, executor); break; case RACK: updateTopology(endpoint); - SystemKeyspace.updatePeerInfo(endpoint, "rack", value.value); + SystemKeyspace.updatePeerInfo(endpoint, "rack", value.value, executor); break; case RPC_ADDRESS: try { - SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(value.value)); + SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(value.value), executor); } catch (UnknownHostException e) { @@ -1735,11 +1736,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } break; case SCHEMA: - SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value)); + SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value), executor); MigrationManager.instance.scheduleSchemaPull(endpoint, epState); break; case HOST_ID: - SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value.value)); + SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value.value), executor); break; case RPC_READY: notifyRpcChange(endpoint, epState.isRpcReady()); @@ -1785,23 +1786,24 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private void updatePeerInfo(InetAddress endpoint) { EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + final ExecutorService executor = StageManager.getStage(Stage.MUTATION); for (Map.Entry<ApplicationState, VersionedValue> entry : epState.states()) { switch (entry.getKey()) { case RELEASE_VERSION: - SystemKeyspace.updatePeerInfo(endpoint, "release_version", entry.getValue().value); + SystemKeyspace.updatePeerInfo(endpoint, "release_version", entry.getValue().value, executor); break; case DC: - SystemKeyspace.updatePeerInfo(endpoint, 
"data_center", entry.getValue().value); + SystemKeyspace.updatePeerInfo(endpoint, "data_center", entry.getValue().value, executor); break; case RACK: - SystemKeyspace.updatePeerInfo(endpoint, "rack", entry.getValue().value); + SystemKeyspace.updatePeerInfo(endpoint, "rack", entry.getValue().value, executor); break; case RPC_ADDRESS: try { - SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(entry.getValue().value)); + SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(entry.getValue().value), executor); } catch (UnknownHostException e) { @@ -1809,10 +1811,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } break; case SCHEMA: - SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(entry.getValue().value)); + SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(entry.getValue().value), executor); break; case HOST_ID: - SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(entry.getValue().value)); + SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(entry.getValue().value), executor); break; } } @@ -2118,7 +2120,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE Gossiper.instance.replacementQuarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260 } if (!tokensToUpdateInSystemKeyspace.isEmpty()) - SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace); + SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace, StageManager.getStage(Stage.MUTATION)); if (isMoving || operationMode == Mode.MOVING) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec85b4a9/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java index 
bcbabfd..d151f59 100644 --- a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java +++ b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java @@ -24,11 +24,14 @@ import java.net.UnknownHostException; import java.nio.file.Path; import java.nio.file.Paths; import java.util.*; +import java.util.concurrent.Future; import org.apache.commons.io.FileUtils; import org.junit.BeforeClass; import org.junit.Test; +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; @@ -84,7 +87,8 @@ public class SystemKeyspaceTest { BytesToken token = new BytesToken(ByteBufferUtil.bytes("token3")); InetAddress address = InetAddress.getByName("127.0.0.2"); - SystemKeyspace.updateTokens(address, Collections.<Token>singletonList(token)); + Future<?> future = SystemKeyspace.updateTokens(address, Collections.singletonList(token), StageManager.getStage(Stage.MUTATION)); + FBUtilities.waitOnFuture(future); assert SystemKeyspace.loadTokens().get(address).contains(token); SystemKeyspace.removeEndpoint(address); assert !SystemKeyspace.loadTokens().containsValue(token); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec85b4a9/test/unit/org/apache/cassandra/gms/FailureDetectorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/gms/FailureDetectorTest.java b/test/unit/org/apache/cassandra/gms/FailureDetectorTest.java index af099b0..83c3500 100644 --- a/test/unit/org/apache/cassandra/gms/FailureDetectorTest.java +++ b/test/unit/org/apache/cassandra/gms/FailureDetectorTest.java @@ -45,8 +45,8 @@ public class FailureDetectorTest { // slow unit tests can cause problems with FailureDetector's GC pause handling System.setProperty("cassandra.max_local_pause_in_ms", "20000"); - DatabaseDescriptor.setDaemonInitialized(); + 
DatabaseDescriptor.createAllDirectories(); } @Test http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec85b4a9/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java index efab615..91a7ab2 100644 --- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java +++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java @@ -22,6 +22,8 @@ package org.apache.cassandra.service; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; @@ -33,6 +35,8 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.Util.PartitionerSwitcher; +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.dht.IPartitioner; @@ -46,6 +50,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.SimpleSnitch; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.utils.FBUtilities; import static org.junit.Assert.*; @@ -675,8 +680,9 @@ public class LeaveAndBootstrapTest Util.createInitialRing(ss, partitioner, endpointTokens, new ArrayList<Token>(), hosts, new ArrayList<UUID>(), 2); InetAddress toRemove = hosts.get(1); - SystemKeyspace.updatePeerInfo(toRemove, "data_center", "dc42"); - SystemKeyspace.updatePeerInfo(toRemove, "rack", "rack42"); + final ExecutorService executor = 
StageManager.getStage(Stage.MUTATION); + FBUtilities.waitOnFuture(SystemKeyspace.updatePeerInfo(toRemove, "data_center", "dc42", executor)); + FBUtilities.waitOnFuture(SystemKeyspace.updatePeerInfo(toRemove, "rack", "rack42", executor)); assertEquals("rack42", SystemKeyspace.loadDcRackInfo().get(toRemove).get("rack")); // mark the node as removed --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org