Allow overriding available processors with -Dcassandra.available_processors. Patch by brandonwilliams, reviewed by iamaleksey for CASSANDRA-4790.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5abeecc8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5abeecc8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5abeecc8 Branch: refs/heads/trunk Commit: 5abeecc8b51b896742403d48019300b68caceecb Parents: 2e27123 Author: Brandon Williams <brandonwilli...@apache.org> Authored: Wed Dec 5 13:18:00 2012 -0600 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Wed Dec 5 13:18:00 2012 -0600 ---------------------------------------------------------------------- .../apache/cassandra/concurrent/StageManager.java | 9 +++++---- src/java/org/apache/cassandra/config/Config.java | 3 ++- .../cassandra/config/DatabaseDescriptor.java | 6 +++--- .../PeriodicCommitLogExecutorService.java | 3 ++- .../db/compaction/ParallelCompactionIterable.java | 2 +- .../cassandra/hadoop/ColumnFamilyRecordWriter.java | 3 ++- .../apache/cassandra/io/sstable/SSTableReader.java | 2 +- .../org/apache/cassandra/service/StorageProxy.java | 2 +- .../apache/cassandra/thrift/CassandraDaemon.java | 3 ++- .../org/apache/cassandra/utils/FBUtilities.java | 8 ++++++++ 10 files changed, 27 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5abeecc8/src/java/org/apache/cassandra/concurrent/StageManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java index 4bcb75d..662be29 100644 --- a/src/java/org/apache/cassandra/concurrent/StageManager.java +++ b/src/java/org/apache/cassandra/concurrent/StageManager.java @@ -24,6 +24,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import static org.apache.cassandra.config.DatabaseDescriptor.*; +import 
org.apache.cassandra.utils.FBUtilities; /** @@ -37,14 +38,14 @@ public class StageManager public static final long KEEPALIVE = 60; // seconds to keep "extra" threads alive for when idle - public static final int MAX_REPLICATE_ON_WRITE_TASKS = 1024 * Runtime.getRuntime().availableProcessors(); + public static final int MAX_REPLICATE_ON_WRITE_TASKS = 1024 * FBUtilities.getAvailableProcessors(); static { stages.put(Stage.MUTATION, multiThreadedConfigurableStage(Stage.MUTATION, getConcurrentWriters())); stages.put(Stage.READ, multiThreadedConfigurableStage(Stage.READ, getConcurrentReaders())); - stages.put(Stage.REQUEST_RESPONSE, multiThreadedStage(Stage.REQUEST_RESPONSE, Runtime.getRuntime().availableProcessors())); - stages.put(Stage.INTERNAL_RESPONSE, multiThreadedStage(Stage.INTERNAL_RESPONSE, Runtime.getRuntime().availableProcessors())); + stages.put(Stage.REQUEST_RESPONSE, multiThreadedStage(Stage.REQUEST_RESPONSE, FBUtilities.getAvailableProcessors())); + stages.put(Stage.INTERNAL_RESPONSE, multiThreadedStage(Stage.INTERNAL_RESPONSE, FBUtilities.getAvailableProcessors())); stages.put(Stage.REPLICATE_ON_WRITE, multiThreadedConfigurableStage(Stage.REPLICATE_ON_WRITE, getConcurrentReplicators(), MAX_REPLICATE_ON_WRITE_TASKS)); // the rest are all single-threaded stages.put(Stage.STREAM, new JMXEnabledThreadPoolExecutor(Stage.STREAM)); @@ -52,7 +53,7 @@ public class StageManager stages.put(Stage.ANTI_ENTROPY, new JMXEnabledThreadPoolExecutor(Stage.ANTI_ENTROPY)); stages.put(Stage.MIGRATION, new JMXEnabledThreadPoolExecutor(Stage.MIGRATION)); stages.put(Stage.MISC, new JMXEnabledThreadPoolExecutor(Stage.MISC)); - stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR, Runtime.getRuntime().availableProcessors())); + stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR, FBUtilities.getAvailableProcessors())); } private static ThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5abeecc8/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index f4e7955..3170ef7 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -21,6 +21,7 @@ package org.apache.cassandra.config; */ import org.apache.cassandra.cache.SerializingCacheProvider; +import org.apache.cassandra.utils.FBUtilities; public class Config @@ -83,7 +84,7 @@ public class Config /* if the size of columns or super-columns are more than this, indexing will kick in */ public Integer column_index_size_in_kb = 64; public Integer in_memory_compaction_limit_in_mb = 64; - public Integer concurrent_compactors = Runtime.getRuntime().availableProcessors(); + public Integer concurrent_compactors = FBUtilities.getAvailableProcessors(); public Integer compaction_throughput_mb_per_sec = 16; public Boolean multithreaded_compaction = false; http://git-wip-us.apache.org/repos/asf/cassandra/blob/5abeecc8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 0e9d705..0742f34 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -367,7 +367,7 @@ public class DatabaseDescriptor } if (conf.concurrent_compactors == null) - conf.concurrent_compactors = Runtime.getRuntime().availableProcessors(); + conf.concurrent_compactors = FBUtilities.getAvailableProcessors(); if (conf.concurrent_compactors <= 0) throw new ConfigurationException("concurrent_compactors should be strictly greater than 0"); @@ -382,11 +382,11 @@ 
public class DatabaseDescriptor throw new ConfigurationException("Unknown rpc_server_type: " + conf.rpc_server_type); if (conf.rpc_min_threads == null) conf.rpc_min_threads = conf.rpc_server_type.toLowerCase().equals("hsha") - ? Runtime.getRuntime().availableProcessors() * 4 + ? FBUtilities.getAvailableProcessors() * 4 : 16; if (conf.rpc_max_threads == null) conf.rpc_max_threads = conf.rpc_server_type.toLowerCase().equals("hsha") - ? Runtime.getRuntime().availableProcessors() * 4 + ? FBUtilities.getAvailableProcessors() * 4 : Integer.MAX_VALUE; /* data file and commit log directories. they get created later, when they're needed. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/5abeecc8/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java index fb76f7b..907e1dd 100644 --- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java +++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.concurrent.*; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.WrappedRunnable; class PeriodicCommitLogExecutorService implements ICommitLogExecutorService @@ -36,7 +37,7 @@ class PeriodicCommitLogExecutorService implements ICommitLogExecutorService public PeriodicCommitLogExecutorService(final CommitLog commitLog) { - queue = new LinkedBlockingQueue<Runnable>(1024 * Runtime.getRuntime().availableProcessors()); + queue = new LinkedBlockingQueue<Runnable>(1024 * FBUtilities.getAvailableProcessors()); Runnable runnable = new WrappedRunnable() { public void runMayThrow() throws Exception 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5abeecc8/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java index 79b3396..fc52393 100644 --- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java +++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java @@ -138,7 +138,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable private final List<RowContainer> rows = new ArrayList<RowContainer>(); private int row = 0; - private final ThreadPoolExecutor executor = new DebuggableThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), + private final ThreadPoolExecutor executor = new DebuggableThreadPoolExecutor(FBUtilities.getAvailableProcessors(), Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), http://git-wip-us.apache.org/repos/asf/cassandra/blob/5abeecc8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java index 5e36a80..16ad402 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java @@ -33,6 +33,7 @@ import org.apache.cassandra.client.RingCache; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.thrift.*; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.RecordWriter; @@ -104,7 +105,7 @@ implements 
org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> { this.conf = conf; this.ringCache = new RingCache(conf); - this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * Runtime.getRuntime().availableProcessors()); + this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors()); this.clients = new HashMap<Range,RangeClient>(); batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32); consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getWriteConsistencyLevel(conf)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5abeecc8/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index a4cca37..a631241 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -229,7 +229,7 @@ public class SSTableReader extends SSTable { final Collection<SSTableReader> sstables = new LinkedBlockingQueue<SSTableReader>(); - ExecutorService executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", Runtime.getRuntime().availableProcessors()); + ExecutorService executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", FBUtilities.getAvailableProcessors()); for (final Map.Entry<Descriptor, Set<Component>> entry : entries) { Runnable runnable = new Runnable() http://git-wip-us.apache.org/repos/asf/cassandra/blob/5abeecc8/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 453e2b2..dbb36af 100644 --- 
a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -82,7 +82,7 @@ public class StorageProxy implements StorageProxyMBean private static volatile boolean hintedHandoffEnabled = DatabaseDescriptor.hintedHandoffEnabled(); private static volatile int maxHintWindow = DatabaseDescriptor.getMaxHintWindow(); - private static volatile int maxHintsInProgress = 1024 * Runtime.getRuntime().availableProcessors(); + private static volatile int maxHintsInProgress = 1024 * FBUtilities.getAvailableProcessors(); private static final AtomicInteger totalHintsInProgress = new AtomicInteger(); private static final Map<InetAddress, AtomicInteger> hintsInProgress = new MapMaker().concurrencyLevel(1).makeComputingMap(new Function<InetAddress, AtomicInteger>() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/5abeecc8/src/java/org/apache/cassandra/thrift/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraDaemon.java b/src/java/org/apache/cassandra/thrift/CassandraDaemon.java index 7153c08..572e3e0 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/thrift/CassandraDaemon.java @@ -27,6 +27,7 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import org.apache.cassandra.service.AbstractCassandraDaemon; +import org.apache.cassandra.utils.FBUtilities; import org.apache.thrift.server.TNonblockingServer; import org.apache.thrift.server.TThreadPoolServer; import org.slf4j.Logger; @@ -202,7 +203,7 @@ public class CassandraDaemon extends org.apache.cassandra.service.AbstractCassan .processor(processor); logger.info(String.format("Using custom half-sync/half-async thrift server on %s : %s", listenAddr, listenPort)); // Check for available processors in the system which will be equal to the IO Threads. 
- serverEngine = new CustomTHsHaServer(serverArgs, executorService, Runtime.getRuntime().availableProcessors()); + serverEngine = new CustomTHsHaServer(serverArgs, executorService, FBUtilities.getAvailableProcessors()); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5abeecc8/src/java/org/apache/cassandra/utils/FBUtilities.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index 106bedf..03ec757 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -70,6 +70,14 @@ public class FBUtilities private static volatile InetAddress localInetAddress_; private static volatile InetAddress broadcastInetAddress_; + public static int getAvailableProcessors() + { + if (System.getProperty("cassandra.available_processors") != null) + return Integer.parseInt(System.getProperty("cassandra.available_processors")); + else + return Runtime.getRuntime().availableProcessors(); + } + private static final ThreadLocal<MessageDigest> localMD5Digest = new ThreadLocal<MessageDigest>() { @Override