Avoids having replicate on write tasks stacking up at CL.ONE. Patch by slebresne; reviewed by jbellis for CASSANDRA-2889
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2ca2fb3f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2ca2fb3f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2ca2fb3f Branch: refs/heads/trunk Commit: 2ca2fb3fdc1636e2d3d7feb446f66f6ed8043cf4 Parents: 2af8591 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Fri Apr 27 19:39:31 2012 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Fri Apr 27 19:39:31 2012 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/concurrent/StageManager.java | 14 +++++++++++++- 2 files changed, 14 insertions(+), 1 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ca2fb3f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 91a8fbe..918c146 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -19,6 +19,7 @@ * Move CfDef and KsDef validation out of thrift (CASSANDRA-4037) * Expose repairing by a user provided range (CASSANDRA-3912) * Add way to force the cassandra-cli to refresh it's schema (CASSANDRA-4052) + * Avoids having replicate on write tasks stacking up at CL.ONE (CASSANDRA-2889) Merged from 1.0: * Fix super columns bug where cache is not updated (CASSANDRA-4190) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ca2fb3f/src/java/org/apache/cassandra/concurrent/StageManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java index c57b593..4bcb75d 100644 --- a/src/java/org/apache/cassandra/concurrent/StageManager.java +++ b/src/java/org/apache/cassandra/concurrent/StageManager.java @@ -37,13 +37,15 @@ public class StageManager 
public static final long KEEPALIVE = 60; // seconds to keep "extra" threads alive for when idle + public static final int MAX_REPLICATE_ON_WRITE_TASKS = 1024 * Runtime.getRuntime().availableProcessors(); + static { stages.put(Stage.MUTATION, multiThreadedConfigurableStage(Stage.MUTATION, getConcurrentWriters())); stages.put(Stage.READ, multiThreadedConfigurableStage(Stage.READ, getConcurrentReaders())); stages.put(Stage.REQUEST_RESPONSE, multiThreadedStage(Stage.REQUEST_RESPONSE, Runtime.getRuntime().availableProcessors())); stages.put(Stage.INTERNAL_RESPONSE, multiThreadedStage(Stage.INTERNAL_RESPONSE, Runtime.getRuntime().availableProcessors())); - stages.put(Stage.REPLICATE_ON_WRITE, multiThreadedConfigurableStage(Stage.REPLICATE_ON_WRITE, getConcurrentReplicators())); + stages.put(Stage.REPLICATE_ON_WRITE, multiThreadedConfigurableStage(Stage.REPLICATE_ON_WRITE, getConcurrentReplicators(), MAX_REPLICATE_ON_WRITE_TASKS)); // the rest are all single-threaded stages.put(Stage.STREAM, new JMXEnabledThreadPoolExecutor(Stage.STREAM)); stages.put(Stage.GOSSIP, new JMXEnabledThreadPoolExecutor(Stage.GOSSIP)); @@ -73,6 +75,16 @@ public class StageManager stage.getJmxType()); } + private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads, int maxTasksBeforeBlock) + { + return new JMXConfigurableThreadPoolExecutor(numThreads, + KEEPALIVE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(maxTasksBeforeBlock), + new NamedThreadFactory(stage.getJmxName()), + stage.getJmxType()); + } + /** * Retrieve a stage from the StageManager * @param stage name of the stage to be retrieved.