Allow overriding available processors with
-Dcassandra.available_processors
Patch by brandonwilliams, reviewed by iamaleksey for CASSANDRA-4790


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5abeecc8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5abeecc8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5abeecc8

Branch: refs/heads/trunk
Commit: 5abeecc8b51b896742403d48019300b68caceecb
Parents: 2e27123
Author: Brandon Williams <brandonwilli...@apache.org>
Authored: Wed Dec 5 13:18:00 2012 -0600
Committer: Brandon Williams <brandonwilli...@apache.org>
Committed: Wed Dec 5 13:18:00 2012 -0600

----------------------------------------------------------------------
 .../apache/cassandra/concurrent/StageManager.java  |    9 +++++----
 src/java/org/apache/cassandra/config/Config.java   |    3 ++-
 .../cassandra/config/DatabaseDescriptor.java       |    6 +++---
 .../PeriodicCommitLogExecutorService.java          |    3 ++-
 .../db/compaction/ParallelCompactionIterable.java  |    2 +-
 .../cassandra/hadoop/ColumnFamilyRecordWriter.java |    3 ++-
 .../apache/cassandra/io/sstable/SSTableReader.java |    2 +-
 .../org/apache/cassandra/service/StorageProxy.java |    2 +-
 .../apache/cassandra/thrift/CassandraDaemon.java   |    3 ++-
 .../org/apache/cassandra/utils/FBUtilities.java    |    8 ++++++++
 10 files changed, 27 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5abeecc8/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index 4bcb75d..662be29 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.cassandra.config.DatabaseDescriptor.*;
+import org.apache.cassandra.utils.FBUtilities;
 
 
 /**
@@ -37,14 +38,14 @@ public class StageManager
 
     public static final long KEEPALIVE = 60; // seconds to keep "extra" threads alive for when idle
 
-    public static final int MAX_REPLICATE_ON_WRITE_TASKS = 1024 * Runtime.getRuntime().availableProcessors();
+    public static final int MAX_REPLICATE_ON_WRITE_TASKS = 1024 * FBUtilities.getAvailableProcessors();
 
     static
     {
         stages.put(Stage.MUTATION, multiThreadedConfigurableStage(Stage.MUTATION, getConcurrentWriters()));
         stages.put(Stage.READ, multiThreadedConfigurableStage(Stage.READ, getConcurrentReaders()));
-        stages.put(Stage.REQUEST_RESPONSE, multiThreadedStage(Stage.REQUEST_RESPONSE, Runtime.getRuntime().availableProcessors()));
-        stages.put(Stage.INTERNAL_RESPONSE, multiThreadedStage(Stage.INTERNAL_RESPONSE, Runtime.getRuntime().availableProcessors()));
+        stages.put(Stage.REQUEST_RESPONSE, multiThreadedStage(Stage.REQUEST_RESPONSE, FBUtilities.getAvailableProcessors()));
+        stages.put(Stage.INTERNAL_RESPONSE, multiThreadedStage(Stage.INTERNAL_RESPONSE, FBUtilities.getAvailableProcessors()));
         stages.put(Stage.REPLICATE_ON_WRITE, multiThreadedConfigurableStage(Stage.REPLICATE_ON_WRITE, getConcurrentReplicators(), MAX_REPLICATE_ON_WRITE_TASKS));
         // the rest are all single-threaded
         stages.put(Stage.STREAM, new JMXEnabledThreadPoolExecutor(Stage.STREAM));
@@ -52,7 +53,7 @@ public class StageManager
         stages.put(Stage.ANTI_ENTROPY, new JMXEnabledThreadPoolExecutor(Stage.ANTI_ENTROPY));
         stages.put(Stage.MIGRATION, new JMXEnabledThreadPoolExecutor(Stage.MIGRATION));
         stages.put(Stage.MISC, new JMXEnabledThreadPoolExecutor(Stage.MISC));
-        stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR, Runtime.getRuntime().availableProcessors()));
+        stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR, FBUtilities.getAvailableProcessors()));
     }
 
     private static ThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5abeecc8/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index f4e7955..3170ef7 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.config;
  */
 
 import org.apache.cassandra.cache.SerializingCacheProvider;
+import org.apache.cassandra.utils.FBUtilities;
 
 
 public class Config
@@ -83,7 +84,7 @@ public class Config
     /* if the size of columns or super-columns are more than this, indexing will kick in */
     public Integer column_index_size_in_kb = 64;
     public Integer in_memory_compaction_limit_in_mb = 64;
-    public Integer concurrent_compactors = Runtime.getRuntime().availableProcessors();
+    public Integer concurrent_compactors = FBUtilities.getAvailableProcessors();
     public Integer compaction_throughput_mb_per_sec = 16;
     public Boolean multithreaded_compaction = false;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5abeecc8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 0e9d705..0742f34 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -367,7 +367,7 @@ public class DatabaseDescriptor
             }
 
             if (conf.concurrent_compactors == null)
-                conf.concurrent_compactors = Runtime.getRuntime().availableProcessors();
+                conf.concurrent_compactors = FBUtilities.getAvailableProcessors();
 
             if (conf.concurrent_compactors <= 0)
                 throw new ConfigurationException("concurrent_compactors should be strictly greater than 0");
@@ -382,11 +382,11 @@ public class DatabaseDescriptor
                 throw new ConfigurationException("Unknown rpc_server_type: " + conf.rpc_server_type);
             if (conf.rpc_min_threads == null)
                 conf.rpc_min_threads = conf.rpc_server_type.toLowerCase().equals("hsha")
-                                     ? Runtime.getRuntime().availableProcessors() * 4
+                                     ? FBUtilities.getAvailableProcessors() * 4
                                      : 16;
             if (conf.rpc_max_threads == null)
                 conf.rpc_max_threads = conf.rpc_server_type.toLowerCase().equals("hsha")
-                                     ? Runtime.getRuntime().availableProcessors() * 4
+                                     ? FBUtilities.getAvailableProcessors() * 4
                                      : Integer.MAX_VALUE;
 
             /* data file and commit log directories. they get created later, when they're needed. */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5abeecc8/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
index fb76f7b..907e1dd 100644
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.concurrent.*;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 class PeriodicCommitLogExecutorService implements ICommitLogExecutorService
@@ -36,7 +37,7 @@ class PeriodicCommitLogExecutorService implements ICommitLogExecutorService
 
     public PeriodicCommitLogExecutorService(final CommitLog commitLog)
     {
-        queue = new LinkedBlockingQueue<Runnable>(1024 * Runtime.getRuntime().availableProcessors());
+        queue = new LinkedBlockingQueue<Runnable>(1024 * FBUtilities.getAvailableProcessors());
         Runnable runnable = new WrappedRunnable()
         {
             public void runMayThrow() throws Exception

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5abeecc8/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index 79b3396..fc52393 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -138,7 +138,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
         private final List<RowContainer> rows = new ArrayList<RowContainer>();
         private int row = 0;
 
-        private final ThreadPoolExecutor executor = new DebuggableThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
+        private final ThreadPoolExecutor executor = new DebuggableThreadPoolExecutor(FBUtilities.getAvailableProcessors(),
                                                                                
      Integer.MAX_VALUE,
                                                                                
      TimeUnit.MILLISECONDS,
                                                                                
      new SynchronousQueue<Runnable>(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5abeecc8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
index 5e36a80..16ad402 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.client.RingCache;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -104,7 +105,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
     {
         this.conf = conf;
         this.ringCache = new RingCache(conf);
-        this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * Runtime.getRuntime().availableProcessors());
+        this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
         this.clients = new HashMap<Range,RangeClient>();
         batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
         consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getWriteConsistencyLevel(conf));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5abeecc8/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index a4cca37..a631241 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -229,7 +229,7 @@ public class SSTableReader extends SSTable
     {
         final Collection<SSTableReader> sstables = new LinkedBlockingQueue<SSTableReader>();
 
-        ExecutorService executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", Runtime.getRuntime().availableProcessors());
+        ExecutorService executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", FBUtilities.getAvailableProcessors());
         for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
         {
             Runnable runnable = new Runnable()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5abeecc8/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 453e2b2..dbb36af 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -82,7 +82,7 @@ public class StorageProxy implements StorageProxyMBean
 
     private static volatile boolean hintedHandoffEnabled = DatabaseDescriptor.hintedHandoffEnabled();
     private static volatile int maxHintWindow = DatabaseDescriptor.getMaxHintWindow();
-    private static volatile int maxHintsInProgress = 1024 * Runtime.getRuntime().availableProcessors();
+    private static volatile int maxHintsInProgress = 1024 * FBUtilities.getAvailableProcessors();
     private static final AtomicInteger totalHintsInProgress = new AtomicInteger();
     private static final Map<InetAddress, AtomicInteger> hintsInProgress = new MapMaker().concurrencyLevel(1).makeComputingMap(new Function<InetAddress, AtomicInteger>()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5abeecc8/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraDaemon.java b/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
index 7153c08..572e3e0 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
@@ -27,6 +27,7 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.cassandra.service.AbstractCassandraDaemon;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.thrift.server.TNonblockingServer;
 import org.apache.thrift.server.TThreadPoolServer;
 import org.slf4j.Logger;
@@ -202,7 +203,7 @@ public class CassandraDaemon extends org.apache.cassandra.service.AbstractCassan
                                                                                
        .processor(processor);
                     logger.info(String.format("Using custom half-sync/half-async thrift server on %s : %s", listenAddr, listenPort));
                     // Check for available processors in the system which will be equal to the IO Threads.
-                    serverEngine = new CustomTHsHaServer(serverArgs, executorService, Runtime.getRuntime().availableProcessors());
+                    serverEngine = new CustomTHsHaServer(serverArgs, executorService, FBUtilities.getAvailableProcessors());
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5abeecc8/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 106bedf..03ec757 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -70,6 +70,14 @@ public class FBUtilities
     private static volatile InetAddress localInetAddress_;
     private static volatile InetAddress broadcastInetAddress_;
 
+    public static int getAvailableProcessors()
+    {
+        if (System.getProperty("cassandra.available_processors") != null)
+            return Integer.parseInt(System.getProperty("cassandra.available_processors"));
+        else
+            return Runtime.getRuntime().availableProcessors();
+    }
+
     private static final ThreadLocal<MessageDigest> localMD5Digest = new ThreadLocal<MessageDigest>()
     {
         @Override

Reply via email to