keith-turner closed pull request #549:  ACCUMULO-4074 Added support for 
multiple scan executors
URL: https://github.com/apache/accumulo/pull/549
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/pom.xml b/core/pom.xml
index cf45283272..4535f502d2 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -212,6 +212,25 @@
               </allows>
             </configuration>
           </execution>
+          <execution>
+            <id>apilyzer-spi</id>
+            <goals>
+              <goal>analyze</goal>
+            </goals>
+            <configuration>
+              
<outputFile>${project.build.directory}/apilyzer-spi.txt</outputFile>
+              <includes>
+                <include>org[.]apache[.]accumulo[.]core[.]spi[.].*</include>
+              </includes>
+              <excludes />
+              <allows>
+                <allow>org[.]apache[.]hadoop[.]io[.]Text</allow>
+                
<allow>org[.]apache[.]accumulo[.]core[.]client(?!.*[.](impl|thrift)[.].*)[.].*</allow>
+                
<allow>org[.]apache[.]accumulo[.]core[.]data(?!.*[.](impl|thrift)[.].*)[.].*</allow>
+                
<allow>org[.]apache[.]accumulo[.]core[.]security(?!.*[.](crypto)[.].*)[.].*</allow>
+              </allows>
+            </configuration>
+          </execution>
         </executions>
       </plugin>
       <plugin>
diff --git 
a/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java 
b/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
index 5a9ce21724..5427d79310 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
@@ -16,12 +16,17 @@
  */
 package org.apache.accumulo.core.conf;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.TreeMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -29,10 +34,12 @@
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.conf.PropertyType.PortRange;
+import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher;
 import org.apache.accumulo.core.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 
 /**
@@ -348,6 +355,135 @@ public int getMaxFilesPerTablet() {
     return maxFilesPerTablet;
   }
 
+  public class ScanExecutorConfig {
+    public final String name;
+    public final int maxThreads;
+    public final OptionalInt priority;
+    public final Optional<String> prioritizerClass;
+    public final Map<String,String> prioritizerOpts;
+
+    public ScanExecutorConfig(String name, int maxThreads, OptionalInt 
priority,
+        Optional<String> comparatorFactory, Map<String,String> 
comparatorFactoryOpts) {
+      this.name = name;
+      this.maxThreads = maxThreads;
+      this.priority = priority;
+      this.prioritizerClass = comparatorFactory;
+      this.prioritizerOpts = comparatorFactoryOpts;
+    }
+
+    /**
+     * Re-reads the max threads from the configuration that created this class
+     */
+    public int getCurrentMaxThreads() {
+      Integer depThreads = getDeprecatedScanThreads(name);
+      if (depThreads != null) {
+        return depThreads;
+      }
+
+      String prop = Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + name + "." 
+ SCAN_EXEC_THREADS;
+      String val = 
getAllPropertiesWithPrefix(Property.TSERV_SCAN_EXECUTORS_PREFIX).get(prop);
+      return Integer.parseInt(val);
+    }
+  }
+
+  public boolean isPropertySet(Property prop) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Checks whether a deprecated read-ahead property should supply the thread count for the
+   * named scan executor. Only the default executor and the executor named {@code meta} have a
+   * deprecated counterpart.
+   *
+   * @param name
+   *          the scan executor name
+   * @return the value of the deprecated property when it is set and its replacement is not set;
+   *         {@code null} in every other case, including when both are set (the deprecated one
+   *         is then ignored with a warning)
+   */
+  @SuppressWarnings("deprecation")
+  Integer getDeprecatedScanThreads(String name) {
+
+    Property prop;
+    Property deprecatedProp;
+
+    if (name.equals(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME)) {
+      prop = Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS;
+      deprecatedProp = Property.TSERV_READ_AHEAD_MAXCONCURRENT;
+    } else if (name.equals("meta")) {
+      prop = Property.TSERV_SCAN_EXECUTORS_META_THREADS;
+      deprecatedProp = Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT;
+    } else {
+      return null;
+    }
+
+    if (!isPropertySet(prop) && isPropertySet(deprecatedProp)) {
+      // The first placeholder names the deprecated property, the second its replacement.
+      log.warn("Property {} is deprecated, use {} instead.", deprecatedProp.getKey(),
+          prop.getKey());
+      return Integer.valueOf(get(deprecatedProp));
+    } else if (isPropertySet(prop) && isPropertySet(deprecatedProp)) {
+      log.warn("Deprecated property {} ignored because {} is set", deprecatedProp.getKey(),
+          prop.getKey());
+    }
+
+    return null;
+  }
+
+  private static final String SCAN_EXEC_THREADS = "threads";
+  private static final String SCAN_EXEC_PRIORITY = "priority";
+  private static final String SCAN_EXEC_PRIORITIZER = "prioritizer";
+  private static final String SCAN_EXEC_PRIORITIZER_OPTS = "prioritizer.opts.";
+
+  /**
+   * Builds one {@link ScanExecutorConfig} for each executor named under the
+   * {@code tserver.scan.executors.} prefix. Properties are grouped by executor name and the
+   * {@code threads}, {@code priority}, {@code prioritizer} and {@code prioritizer.opts.<key>}
+   * options of each group are parsed. A deprecated thread setting returned by
+   * {@link #getDeprecatedScanThreads(String)} takes precedence over the {@code threads} option.
+   *
+   * @return the configured scan executors
+   * @throws IllegalStateException
+   *           if a property has no option after the executor name, has an unknown option, or
+   *           has an empty prioritizer option key
+   * @throws IllegalArgumentException
+   *           if an executor has no thread count or one that is not greater than zero
+   * @throws NumberFormatException
+   *           if a {@code threads} or {@code priority} value is not an integer
+   */
+  public Collection<ScanExecutorConfig> getScanExecutors() {
+
+    Map<String,Map<String,String>> propsByName = new HashMap<>();
+
+    List<ScanExecutorConfig> scanResources = new ArrayList<>();
+
+    for (Entry<String,String> entry : getAllPropertiesWithPrefix(
+        Property.TSERV_SCAN_EXECUTORS_PREFIX).entrySet()) {
+
+      String suffix = entry.getKey()
+          .substring(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey().length());
+      // Split into <executor name> and <option>; the option itself may contain dots.
+      String[] tokens = suffix.split("\\.", 2);
+      if (tokens.length < 2) {
+        // Nothing follows the executor name, so there is no option to read.
+        throw new IllegalStateException("Invalid scan executor property : " + entry.getKey());
+      }
+      String name = tokens[0];
+
+      propsByName.computeIfAbsent(name, k -> new HashMap<>()).put(tokens[1], entry.getValue());
+    }
+
+    for (Entry<String,Map<String,String>> entry : propsByName.entrySet()) {
+      String name = entry.getKey();
+      Integer threads = null;
+      Integer prio = null;
+      String prioritizerClass = null;
+      Map<String,String> prioritizerOpts = new HashMap<>();
+
+      for (Entry<String,String> subEntry : entry.getValue().entrySet()) {
+        String opt = subEntry.getKey();
+        String val = subEntry.getValue();
+
+        if (opt.equals(SCAN_EXEC_THREADS)) {
+          // A deprecated setting, when in effect, overrides the configured thread count.
+          Integer depThreads = getDeprecatedScanThreads(name);
+          if (depThreads == null) {
+            threads = Integer.parseInt(val);
+          } else {
+            threads = depThreads;
+          }
+        } else if (opt.equals(SCAN_EXEC_PRIORITY)) {
+          prio = Integer.parseInt(val);
+        } else if (opt.equals(SCAN_EXEC_PRIORITIZER)) {
+          prioritizerClass = val;
+        } else if (opt.startsWith(SCAN_EXEC_PRIORITIZER_OPTS)) {
+          String key = opt.substring(SCAN_EXEC_PRIORITIZER_OPTS.length());
+          if (key.isEmpty()) {
+            throw new IllegalStateException("Invalid scan executor option : " + opt);
+          }
+          prioritizerOpts.put(key, val);
+        } else {
+          throw new IllegalStateException("Unknown scan executor option : " + opt);
+        }
+      }
+
+      Preconditions.checkArgument(threads != null && threads > 0,
+          "Scan resource %s incorrectly specified threads", name);
+
+      scanResources.add(new ScanExecutorConfig(name, threads,
+          prio == null ? OptionalInt.empty() : OptionalInt.of(prio),
+          Optional.ofNullable(prioritizerClass), prioritizerOpts));
+    }
+
+    return scanResources;
+  }
+
   /**
    * Invalidates the <code>ZooCache</code> used for storage and quick 
retrieval of properties for
    * this configuration.
diff --git 
a/core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java 
b/core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java
index fc2891e2ad..54b87ccbda 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java
@@ -52,4 +52,9 @@ public void getProperties(Map<String,String> props, 
Predicate<String> filter) {
     resolvedProps.entrySet().stream().filter(p -> filter.test(p.getKey()))
         .forEach(e -> props.put(e.getKey(), e.getValue()));
   }
+
+  @Override
+  public boolean isPropertySet(Property prop) {
+    return false;
+  }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index ed17bedbd4..b7ff210fb0 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -30,6 +30,9 @@
 import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher;
+import org.apache.accumulo.core.spi.scan.ScanPrioritizer;
+import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher;
 import org.apache.accumulo.core.util.format.DefaultFormatter;
 import org.apache.accumulo.core.util.interpret.DefaultScanInterpreter;
 import org.apache.accumulo.start.classloader.AccumuloClassLoader;
@@ -430,12 +433,37 @@
       "When a tablet server's SimpleTimer thread triggers to check idle"
           + " sessions, this configurable option will be used to evaluate 
update"
           + " sessions to determine if they can be closed due to inactivity"),
+  @Deprecated
   TSERV_READ_AHEAD_MAXCONCURRENT("tserver.readahead.concurrent.max", "16", PropertyType.COUNT,
-      "The maximum number of concurrent read ahead that will execute. This effectively"
-          + " limits the number of long running scans that can run concurrently per tserver."),
+      "This property is deprecated since 2.0.0, use tserver.scan.executors.default.threads "
+          + "instead. The maximum number of concurrent read ahead that will execute. This "
+          + "effectively limits the number of long running scans that can run concurrently "
+          + "per tserver."),
+  @Deprecated
   
TSERV_METADATA_READ_AHEAD_MAXCONCURRENT("tserver.metadata.readahead.concurrent.max",
 "8",
       PropertyType.COUNT,
-      "The maximum number of concurrent metadata read ahead that will 
execute."),
+      "This property is deprecated since 2.0.0, use 
tserver.scan.executors.meta.threads instead. "
+          + "The maximum number of concurrent metadata read ahead that will 
execute."),
+  TSERV_SCAN_EXECUTORS_PREFIX("tserver.scan.executors.", null, 
PropertyType.PREFIX,
+      "Prefix for defining executors to service scans.  For each executor the 
number of threads, "
+          + "thread priority, and an optional prioritizer can be configured.  
The prioritizer "
+          + "determines which scan an executor should run first and must 
implement "
+          + ScanPrioritizer.class.getName() + ". Tables can select an executor 
by setting"
+          + " table.scan.dispatcher. To configure a new executor, set "
+          + "tserver.scan.executors.<name>.threads=<number>.  Optionally, can 
also set "
+          + "tserver.scan.executors.<name>.priority=<number 1 to 10>, "
+          + "tserver.scan.executors.<name>.prioritizer=<class name>, and "
+          + "tserver.scan.executors.<name>.prioritizer.opts.<key>=<value>"),
+  
TSERV_SCAN_EXECUTORS_DEFAULT_THREADS("tserver.scan.executors.default.threads", 
"16",
+      PropertyType.COUNT,
+      "The number of threads for the scan executor that tables use by 
default."),
+  
TSERV_SCAN_EXECUTORS_DEFAULT_PRIORITIZER("tserver.scan.executors.default.prioritizer",
 "",
+      PropertyType.STRING,
+      "Prioritizer for the default scan executor.  Defaults to none which "
+          + "results in FIFO priority.  Set to a class that implements "
+          + ScanPrioritizer.class.getName() + " to configure one."),
+  TSERV_SCAN_EXECUTORS_META_THREADS("tserver.scan.executors.meta.threads", 
"8", PropertyType.COUNT,
+      "The number of threads for the metadata table scan executor."),
   TSERV_MIGRATE_MAXCONCURRENT("tserver.migrations.concurrent.max", "1", 
PropertyType.COUNT,
       "The maximum number of concurrent tablet migrations for a tablet 
server"),
   TSERV_MAJC_MAXCONCURRENT("tserver.compaction.major.concurrent.max", "3", 
PropertyType.COUNT,
@@ -664,6 +692,19 @@
       PropertyType.BYTES,
       "The max RFile size used for a merging minor compaction. The default"
           + " value of 0 disables a max file size."),
+  TABLE_SCAN_DISPATCHER("table.scan.dispatcher", 
SimpleScanDispatcher.class.getName(),
+      PropertyType.CLASSNAME,
+      "This class is used to dynamically dispatch scans to configured scan 
executors.  This setting"
+          + " defaults to " + SimpleScanDispatcher.class.getSimpleName()
+          + " which dispatches to an executor"
+          + " named 'default' when it is optionless. Setting the option "
+          + "'table.scan.dispatcher.opts.executor=<name>' causes "
+          + SimpleScanDispatcher.class.getSimpleName() + " to dispatch to the 
specified executor. "
+          + "It has more options listed in its javadoc. Configured classes 
must implement "
+          + ScanDispatcher.class.getName() + ".  This property is ignored for 
the root and metadata"
+          + " table.  The metadata table always dispatches to a scan executor 
named `meta`."),
+  TABLE_SCAN_DISPATCHER_OPTS("table.scan.dispatcher.opts.", null, 
PropertyType.PREFIX,
+      "Options for the table scan dispatcher"),
   TABLE_SCAN_MAXMEM("table.scan.max.memory", "512K", PropertyType.BYTES,
       "The maximum amount of memory that will be used to cache results of a 
client query/scan. "
           + "Once this limit is reached, the buffered data is sent to the 
client."),
@@ -1148,7 +1189,8 @@ public static boolean isValidTablePropertyKey(String key) 
{
             || key.startsWith(Property.TABLE_REPLICATION_TARGET.getKey())
             || key.startsWith(Property.TABLE_ARBITRARY_PROP_PREFIX.getKey())
             || key.startsWith(TABLE_SAMPLER_OPTS.getKey())
-            || key.startsWith(TABLE_SUMMARIZER_PREFIX.getKey())));
+            || key.startsWith(TABLE_SUMMARIZER_PREFIX.getKey())
+            || key.startsWith(TABLE_SCAN_DISPATCHER_OPTS.getKey())));
   }
 
   /**
diff --git 
a/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java 
b/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java
index 001b1dbbba..cf0fe25353 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java
@@ -33,6 +33,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * An {@link AccumuloConfiguration} which first loads any properties set on 
the command-line (using
  * the -o option) and then from an XML file, usually accumulo-site.xml. This 
implementation supports
@@ -164,6 +166,14 @@ public String get(Property property) {
     return value;
   }
 
+  @Override
+  public boolean isPropertySet(Property prop) {
+    Preconditions.checkArgument(!prop.isSensitive(),
+        "This method not implemented for sensitive props");
+    return CliConfiguration.get(prop) != null || 
staticConfigs.containsKey(prop.getKey())
+        || getXmlConfig().get(prop.getKey()) != null || 
parent.isPropertySet(prop);
+  }
+
   @Override
   public void getProperties(Map<String,String> props, Predicate<String> 
filter) {
     getProperties(props, filter, true);
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/common/IteratorConfiguration.java
 
b/core/src/main/java/org/apache/accumulo/core/spi/common/IteratorConfiguration.java
new file mode 100644
index 0000000000..40f05e5050
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/common/IteratorConfiguration.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.common;
+
+import java.util.Map;
+
+/**
+ * Provides information about a configured Accumulo Iterator
+ *
+ * @since 2.0.0
+ */
+public interface IteratorConfiguration {
+  String getIteratorClass();
+
+  String getName();
+
+  int getPriority();
+
+  Map<String,String> getOptions();
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/common/Stats.java 
b/core/src/main/java/org/apache/accumulo/core/spi/common/Stats.java
new file mode 100644
index 0000000000..a1b9322129
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/common/Stats.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.common;
+
+/**
+ * @since 2.0.0
+ */
+public interface Stats {
+
+  /**
+   * @return the minimum data point seen, or 0 if no data was seen
+   */
+  long min();
+
+  /**
+   * @return the maximum data point seen, or 0 if no data was seen
+   */
+  long max();
+
+  /**
+   * @return the mean of the data points seen, or {@link Double#NaN} if no 
data was seen
+   */
+  double mean();
+
+  /**
+   * @return the sum of the data points seen, or 0 if no data was seen
+   */
+  long sum();
+
+  /**
+   * @return the number of data points seen
+   */
+  long num();
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java
 
b/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java
new file mode 100644
index 0000000000..c567460bf0
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Comparator;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Prioritize scans based on the ratio of runTime/idleTime. Scans with a lower 
ratio have a higher
+ * priority. When the ratio is equal, the scan with the oldest last run time 
has the highest
+ * priority. If neither have run, then the oldest gets priority.
+ *
+ * @since 2.0.0
+ */
+public class IdleRatioScanPrioritizer implements ScanPrioritizer {
+  private static double idleRatio(long currTime, ScanInfo si) {
+    double totalRunTime = si.getRunTimeStats().sum();
+    double totalIdleTime = Math.max(1, si.getIdleTimeStats(currTime).sum());
+    return totalRunTime / totalIdleTime;
+  }
+
+  @Override
+  public Comparator<ScanInfo> createComparator(Map<String,String> options) {
+    Preconditions.checkArgument(options.isEmpty());
+
+    Comparator<ScanInfo> c1 = (si1, si2) -> {
+      long currTime = System.currentTimeMillis();
+      return Double.compare(idleRatio(currTime, si1), idleRatio(currTime, 
si2));
+    };
+
+    return c1.thenComparingLong(si -> si.getLastRunTime().orElse(0))
+        .thenComparingLong(si -> si.getCreationTime());
+  }
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java 
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java
new file mode 100644
index 0000000000..a3bc3f1e10
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A per table scan dispatcher that decides which executor should be used to 
process a scan. For
+ * information about configuring, find the documentation for the {@code 
table.scan.dispatcher} and
+ * {@code table.scan.dispatcher.opts.} properties.
+ *
+ * @since 2.0.0
+ */
+public interface ScanDispatcher {
+  /**
+   * This method is called once after a ScanDispatcher is instantiated.
+   *
+   * @param options
+   *          The configured options. For example if the table properties
+   *          {@code table.scan.dispatcher.opts.p1=abc} and
+   *          {@code table.scan.dispatcher.opts.p9=123} were set, then this 
map would contain
+   *          {@code p1=abc} and {@code p9=123}.
+   */
+  public default void init(Map<String,String> options) {
+    Preconditions.checkArgument(options.isEmpty(), "No options expected");
+  }
+
+  /**
+   * @param scanInfo
+   *          Information about the scan.
+   * @param scanExecutors
+   *          Information about the currently configured executors.
+   * @return Should return one of the executors named in scanExecutors.keySet()
+   */
+  String dispatch(ScanInfo scanInfo, Map<String,ScanExecutor> scanExecutors);
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanExecutor.java 
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanExecutor.java
new file mode 100644
index 0000000000..a55b090b9b
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanExecutor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Interface for obtaining information about a scan executor
+ *
+ * @since 2.0.0
+ */
+public interface ScanExecutor {
+
+  interface Config {
+    /**
+     * @return the unique name used to identify the executor in config
+     */
+    String getName();
+
+    /**
+     * @return the max number of threads that were configured
+     */
+    int getMaxThreads();
+
+    /**
+     * @return the prioritizer that was configured
+     */
+    Optional<String> getPrioritizerClass();
+
+    /**
+     * @return the prioritizer options
+     */
+    Map<String,String> getPrioritizerOptions();
+  }
+
+  /**
+   * @return The number of tasks queued for the executor
+   */
+  int getQueued();
+
+  /**
+   * @return The configuration used to create the executor
+   */
+  Config getConfig();
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanInfo.java 
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanInfo.java
new file mode 100644
index 0000000000..7961c21b3c
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanInfo.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Collection;
+import java.util.OptionalLong;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.spi.common.IteratorConfiguration;
+import org.apache.accumulo.core.spi.common.Stats;
+
+/**
+ * Provides information about an active Accumulo scan against a tablet. 
Accumulo scans operate by
+ * repeatedly gathering batches of data and returning those to the client.
+ *
+ * <p>
+ * All times are in milliseconds and obtained using System.currentTimeMillis().
+ *
+ * @since 2.0.0
+ */
+public interface ScanInfo {
+
+  enum Type {
+    /**
+     * A single range scan started using a {@link Scanner}
+     */
+    SINGLE,
+    /**
+     * A multi range scan started using a {@link BatchScanner}
+     */
+    MULTI
+  }
+
+  Type getScanType();
+
+  String getTableId();
+
+  /**
+   * Returns the first time a tablet knew about a scan over its portion of 
data.
+   */
+  long getCreationTime();
+
+  /**
+   * If the scan has run, returns the last run time.
+   */
+  OptionalLong getLastRunTime();
+
+  /**
+   * Returns timing statistics about running and gathering batches of data.
+   */
+  Stats getRunTimeStats();
+
+  /**
+   * Returns statistics about the time between running. These stats are only 
about the idle times
+   * before the last run time. The idle time after the last run time is not 
included. If the scan
+   * has never run, then there are no stats.
+   */
+  Stats getIdleTimeStats();
+
+  /**
+   * This method is similar to {@link #getIdleTimeStats()}, but it also 
includes the time period
+   * between the last run time and now in the stats. If the scan has never 
run, then the stats are
+   * computed using only {@code currentTime - creationTime}.
+   */
+  Stats getIdleTimeStats(long currentTime);
+
+  /**
+   * This method returns which columns were fetched by a scan. When a family is 
fetched, a Column
+   * object where everything but the family is null is in the set.
+   *
+   * <p>
+   * The following example code shows how this method can be used to check if 
a family was fetched
+   * or a family+qualifier was fetched. If continually checking for the same 
column, should probably
+   * create a constant.
+   *
+   * <pre>
+   * <code>
+   *   boolean wasFamilyFetched(ScanInfo si, byte[] fam) {
+   *     Column family = new Column(fam, null, null);
+   *     return si.getFetchedColumns().contains(family);
+   *   }
+   *
+   *   boolean wasColumnFetched(ScanInfo si, byte[] fam, byte[] qual) {
+   *     Column col = new Column(fam, qual, null);
+   *     return si.getFetchedColumns().contains(col);
+   *   }
+   * </code>
+   * </pre>
+   *
+   *
+   * @return The family and family+qualifier pairs fetched.
+   */
+  Set<Column> getFetchedColumns();
+
+  /**
+   * @return iterators that were configured on the client side scanner
+   */
+  Collection<IteratorConfiguration> getClientScanIterators();
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java 
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java
new file mode 100644
index 0000000000..51a4254a32
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Comparator;
+import java.util.Map;
+
+/**
+ * A factory for creating comparators used for prioritizing scans. For 
information about
+ * configuring, find the documentation for the {@code tserver.scan.executors.} 
property.
+ *
+ * @since 2.0.0
+ */
+public interface ScanPrioritizer {
+  Comparator<ScanInfo> createComparator(Map<String,String> options);
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
 
b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
new file mode 100644
index 0000000000..96e7d2cf17
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+/**
+ * If no options are given, then this will dispatch to an executor named 
{@code default}. This
+ * dispatcher supports the following options.
+ *
+ * <UL>
+ * <LI>{@code table.scan.dispatcher.opts.executor=<scan executor name>} : 
dispatches all scans to
+ * the named executor.</LI>
+ * <LI>{@code table.scan.dispatcher.opts.multi_executor=<scan executor name>} 
: dispatches batch
+ * scans to the named executor.</LI>
+ * <LI>{@code table.scan.dispatcher.opts.single_executor=<scan executor name>} 
: dispatches regular
+ * scans to the named executor.</LI>
+ * </UL>
+ *
+ * The {@code multi_executor} and {@code single_executor} options override the 
{@code executor}
+ * option.
+ */
+
+public class SimpleScanDispatcher implements ScanDispatcher {
+
+  private final Set<String> VALID_OPTS = ImmutableSet.of("executor", 
"multi_executor",
+      "single_executor");
+  private String multiExecutor;
+  private String singleExecutor;
+
+  public static final String DEFAULT_SCAN_EXECUTOR_NAME = "default";
+
+  @Override
+  public void init(Map<String,String> options) {
+    Set<String> invalidOpts = Sets.difference(options.keySet(), VALID_OPTS);
+    Preconditions.checkArgument(invalidOpts.size() == 0, "Invalid options : 
%s", invalidOpts);
+
+    String base = options.getOrDefault("executor", DEFAULT_SCAN_EXECUTOR_NAME);
+    multiExecutor = options.getOrDefault("multi_executor", base);
+    singleExecutor = options.getOrDefault("single_executor", base);
+  }
+
+  @Override
+  public String dispatch(ScanInfo scanInfo, Map<String,ScanExecutor> 
scanExecutors) {
+    switch (scanInfo.getScanType()) {
+      case MULTI:
+        return multiExecutor;
+      case SINGLE:
+        return singleExecutor;
+      default:
+        throw new IllegalArgumentException("Unexpected scan type " + 
scanInfo.getScanType());
+
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
 
b/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
new file mode 100644
index 0000000000..ed9c150792
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AccumuloUncaughtExceptionHandler implements 
UncaughtExceptionHandler {
+
+  private static final Logger log = 
LoggerFactory.getLogger(AccumuloUncaughtExceptionHandler.class);
+
+  @Override
+  public void uncaughtException(Thread t, Throwable e) {
+    log.error(String.format("Caught an exception in %s.  Shutting down.", t), 
e);
+  }
+
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java 
b/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
index d8b307c36c..57d429bfa8 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
@@ -16,6 +16,8 @@
  */
 package org.apache.accumulo.core.util;
 
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.OptionalInt;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -26,16 +28,31 @@
 public class NamingThreadFactory implements ThreadFactory {
   private static final Logger log = 
LoggerFactory.getLogger(NamingThreadFactory.class);
 
+  private static final UncaughtExceptionHandler UEH = new 
AccumuloUncaughtExceptionHandler();
+
   private AtomicInteger threadNum = new AtomicInteger(1);
   private String name;
+  private OptionalInt priority;
 
   public NamingThreadFactory(String name) {
     this.name = name;
+    this.priority = OptionalInt.empty();
+  }
+
+  public NamingThreadFactory(String name, OptionalInt priority) {
+    this.name = name;
+    this.priority = priority;
   }
 
   @Override
   public Thread newThread(Runnable r) {
-    return new Daemon(new LoggingRunnable(log, r), name + " " + 
threadNum.getAndIncrement());
+    Thread thread = new Daemon(new LoggingRunnable(log, r),
+        name + " " + threadNum.getAndIncrement());
+    thread.setUncaughtExceptionHandler(UEH);
+    if (priority.isPresent()) {
+      thread.setPriority(priority.getAsInt());
+    }
+    return thread;
   }
 
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/util/Stat.java 
b/core/src/main/java/org/apache/accumulo/core/util/Stat.java
index 8bdeb63312..704622b005 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/Stat.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/Stat.java
@@ -16,67 +16,70 @@
  */
 package org.apache.accumulo.core.util;
 
-import org.apache.commons.math3.stat.descriptive.StorelessUnivariateStatistic;
+import org.apache.accumulo.core.spi.common.Stats;
 import org.apache.commons.math3.stat.descriptive.moment.Mean;
-import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
-import org.apache.commons.math3.stat.descriptive.rank.Max;
-import org.apache.commons.math3.stat.descriptive.rank.Min;
-import org.apache.commons.math3.stat.descriptive.summary.Sum;
 
-public class Stat {
-  Min min;
-  Max max;
-  Sum sum;
+public class Stat implements Stats {
+  long min;
+  long max;
+  long sum;
   Mean mean;
-  StandardDeviation sd;
-
-  StorelessUnivariateStatistic[] stats;
 
   public Stat() {
-    min = new Min();
-    max = new Max();
-    sum = new Sum();
     mean = new Mean();
-    sd = new StandardDeviation();
-
-    stats = new StorelessUnivariateStatistic[] {min, max, sum, mean, sd};
+    clear();
   }
 
   public void addStat(long stat) {
-    for (StorelessUnivariateStatistic statistic : stats) {
-      statistic.increment(stat);
-    }
+    min = Math.min(min, stat);
+    max = Math.max(max, stat);
+    sum += stat;
+    mean.increment(stat);
   }
 
-  public long getMin() {
-    return (long) min.getResult();
+  @Override
+  public long min() {
+    return num() == 0 ? 0L : min;
   }
 
-  public long getMax() {
-    return (long) max.getResult();
+  @Override
+  public long max() {
+    return num() == 0 ? 0L : max;
   }
 
-  public long getSum() {
-    return (long) sum.getResult();
+  @Override
+  public long sum() {
+    return sum;
   }
 
-  public double getAverage() {
+  @Override
+  public double mean() {
     return mean.getResult();
   }
 
-  public double getStdDev() {
-    return sd.getResult();
-  }
-
   @Override
   public String toString() {
-    return String.format("%,d %,d %,.2f %,d", getMin(), getMax(), 
getAverage(), mean.getN());
+    return String.format("%,d %,d %,.2f %,d", min(), max(), mean(), 
mean.getN());
   }
 
   public void clear() {
-    for (StorelessUnivariateStatistic statistic : stats) {
-      statistic.clear();
-    }
+    min = Long.MAX_VALUE;
+    max = Long.MIN_VALUE;
+    sum = 0;
+    mean.clear();
+  }
+
+  @Override
+  public long num() {
+    return mean.getN();
   }
 
+  public Stat copy() {
+    Stat stat = new Stat();
+    stat.min = this.min;
+    stat.max = this.max;
+    stat.sum = this.sum;
+    stat.mean = this.mean.copy();
+    return stat;
+  }
 }
diff --git 
a/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java
 
b/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java
index 9f77834f4b..0fcfebdbcf 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java
@@ -21,11 +21,15 @@
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.function.Predicate;
 
+import org.apache.accumulo.core.conf.AccumuloConfiguration.ScanExecutorConfig;
+import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher;
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -131,12 +135,26 @@ public void testGetPortInvalidSyntax() {
 
     private HashMap<String,String> props = new HashMap<>();
     private int upCount = 0;
+    private AccumuloConfiguration parent;
+
+    TestConfiguration() {
+      parent = null;
+    }
+
+    TestConfiguration(AccumuloConfiguration parent) {
+      this.parent = parent;
+    }
 
     public void set(String p, String v) {
       props.put(p, v);
       upCount++;
     }
 
+    @Override
+    public boolean isPropertySet(Property prop) {
+      return props.containsKey(prop.getKey());
+    }
+
     @Override
     public long getUpdateCount() {
       return upCount;
@@ -144,11 +162,18 @@ public long getUpdateCount() {
 
     @Override
     public String get(Property property) {
-      return props.get(property.getKey());
+      String v = props.get(property.getKey());
+      if (v == null & parent != null) {
+        v = parent.get(property);
+      }
+      return v;
     }
 
     @Override
     public void getProperties(Map<String,String> output, Predicate<String> 
filter) {
+      if (parent != null) {
+        parent.getProperties(output, filter);
+      }
       for (Entry<String,String> entry : props.entrySet()) {
         if (filter.test(entry.getKey())) {
           output.put(entry.getKey(), entry.getValue());
@@ -267,4 +292,89 @@ public void testGetByPrefix() {
     Map<String,String> pmL = 
tc.getAllPropertiesWithPrefix(Property.TABLE_ITERATOR_SCAN_PREFIX);
     assertSame(pmG, pmL);
   }
+
+  @Test
+  public void testScanExecutors() {
+    String defName = SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME;
+
+    TestConfiguration tc = new 
TestConfiguration(DefaultConfiguration.getInstance());
+
+    Collection<ScanExecutorConfig> executors = tc.getScanExecutors();
+
+    Assert.assertEquals(2, executors.size());
+
+    ScanExecutorConfig sec = executors.stream().filter(c -> 
c.name.equals(defName)).findFirst()
+        .get();
+    Assert.assertEquals(
+        
Integer.parseInt(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getDefaultValue()),
+        sec.maxThreads);
+    Assert.assertFalse(sec.priority.isPresent());
+    Assert.assertTrue(sec.prioritizerClass.get().isEmpty());
+    Assert.assertTrue(sec.prioritizerOpts.isEmpty());
+
+    // ensure deprecated prop is read if nothing else is set
+    tc.set("tserver.readahead.concurrent.max", "6");
+    Assert.assertEquals(6, sec.getCurrentMaxThreads());
+    Assert.assertEquals(
+        
Integer.parseInt(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getDefaultValue()),
+        sec.maxThreads);
+    ScanExecutorConfig sec2 = tc.getScanExecutors().stream().filter(c -> 
c.name.equals(defName))
+        .findFirst().get();
+    Assert.assertEquals(6, sec2.maxThreads);
+
+    // ensure new prop overrides deprecated prop
+    tc.set(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getKey(), "9");
+    Assert.assertEquals(9, sec.getCurrentMaxThreads());
+    Assert.assertEquals(
+        
Integer.parseInt(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getDefaultValue()),
+        sec.maxThreads);
+    ScanExecutorConfig sec3 = tc.getScanExecutors().stream().filter(c -> 
c.name.equals(defName))
+        .findFirst().get();
+    Assert.assertEquals(9, sec3.maxThreads);
+
+    ScanExecutorConfig sec4 = executors.stream().filter(c -> 
c.name.equals("meta")).findFirst()
+        .get();
+    Assert.assertEquals(
+        
Integer.parseInt(Property.TSERV_SCAN_EXECUTORS_META_THREADS.getDefaultValue()),
+        sec4.maxThreads);
+    Assert.assertFalse(sec4.priority.isPresent());
+    Assert.assertFalse(sec4.prioritizerClass.isPresent());
+    Assert.assertTrue(sec4.prioritizerOpts.isEmpty());
+
+    tc.set("tserver.metadata.readahead.concurrent.max", "2");
+    Assert.assertEquals(2, sec4.getCurrentMaxThreads());
+    ScanExecutorConfig sec5 = tc.getScanExecutors().stream().filter(c -> 
c.name.equals("meta"))
+        .findFirst().get();
+    Assert.assertEquals(2, sec5.maxThreads);
+
+    tc.set(Property.TSERV_SCAN_EXECUTORS_META_THREADS.getKey(), "3");
+    Assert.assertEquals(3, sec4.getCurrentMaxThreads());
+    ScanExecutorConfig sec6 = tc.getScanExecutors().stream().filter(c -> 
c.name.equals("meta"))
+        .findFirst().get();
+    Assert.assertEquals(3, sec6.maxThreads);
+
+    String prefix = Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey();
+    tc.set(prefix + "hulksmash.threads", "66");
+    tc.set(prefix + "hulksmash.priority", "3");
+    tc.set(prefix + "hulksmash.prioritizer", "com.foo.ScanPrioritizer");
+    tc.set(prefix + "hulksmash.prioritizer.opts.k1", "v1");
+    tc.set(prefix + "hulksmash.prioritizer.opts.k2", "v3");
+
+    executors = tc.getScanExecutors();
+    Assert.assertEquals(3, executors.size());
+    ScanExecutorConfig sec7 = executors.stream().filter(c -> 
c.name.equals("hulksmash")).findFirst()
+        .get();
+    Assert.assertEquals(66, sec7.maxThreads);
+    Assert.assertEquals(3, sec7.priority.getAsInt());
+    Assert.assertEquals("com.foo.ScanPrioritizer", 
sec7.prioritizerClass.get());
+    Assert.assertEquals(ImmutableMap.of("k1", "v1", "k2", "v3"), 
sec7.prioritizerOpts);
+
+    tc.set(prefix + "hulksmash.threads", "44");
+    Assert.assertEquals(66, sec7.maxThreads);
+    Assert.assertEquals(44, sec7.getCurrentMaxThreads());
+
+    ScanExecutorConfig sec8 = tc.getScanExecutors().stream().filter(c -> 
c.name.equals("hulksmash"))
+        .findFirst().get();
+    Assert.assertEquals(44, sec8.maxThreads);
+  }
 }
diff --git 
a/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java
 
b/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java
new file mode 100644
index 0000000000..0128b20954
--- /dev/null
+++ 
b/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.accumulo.core.spi.scan.ScanInfo.Type;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class IdleRatioScanPrioritizerTest {
+
+  @Test
+  public void testSort() {
+    long now = System.currentTimeMillis();
+
+    List<TestScanInfo> scans = new ArrayList<>();
+
+    // Two following have never run, so oldest should go first
+    scans.add(new TestScanInfo("a", Type.SINGLE, now - 3));
+    scans.add(new TestScanInfo("b", Type.SINGLE, now - 8));
+    // Two following have different idle ratio and same last run times
+    scans.add(new TestScanInfo("c", Type.SINGLE, now - 16, 2, 10));
+    scans.add(new TestScanInfo("d", Type.SINGLE, now - 16, 5, 10));
+    // Two following have same idle ratio and different last run times
+    scans.add(new TestScanInfo("e", Type.SINGLE, now - 12, 5, 9));
+    scans.add(new TestScanInfo("f", Type.SINGLE, now - 12, 3, 7));
+
+    Collections.shuffle(scans);
+
+    Comparator<ScanInfo> comparator = new IdleRatioScanPrioritizer()
+        .createComparator(Collections.emptyMap());
+
+    Collections.sort(scans, comparator);
+
+    Assert.assertEquals("b", scans.get(0).testId);
+    Assert.assertEquals("a", scans.get(1).testId);
+    Assert.assertEquals("f", scans.get(2).testId);
+    Assert.assertEquals("e", scans.get(3).testId);
+    Assert.assertEquals("d", scans.get(4).testId);
+    Assert.assertEquals("c", scans.get(5).testId);
+  }
+}
diff --git 
a/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
 
b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
new file mode 100644
index 0000000000..b59858a9c0
--- /dev/null
+++ 
b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.spi.scan.ScanInfo.Type;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+public class SimpleScanDispatcherTest {
+  @Test
+  public void testProps() {
+    Assert.assertTrue(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getKey()
+        .endsWith(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME + 
".threads"));
+    
Assert.assertTrue(Property.TSERV_SCAN_EXECUTORS_DEFAULT_PRIORITIZER.getKey()
+        .endsWith(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME + 
".prioritizer"));
+  }
+
+  private void runTest(Map<String,String> opts, String expectedSingle, String 
expectedMulti) {
+    ScanInfo msi = new TestScanInfo("a", Type.MULTI, 4);
+    ScanInfo ssi = new TestScanInfo("a", Type.SINGLE, 4);
+
+    SimpleScanDispatcher ssd1 = new SimpleScanDispatcher();
+    ssd1.init(opts);
+    Assert.assertEquals(expectedMulti, ssd1.dispatch(msi, null));
+    Assert.assertEquals(expectedSingle, ssd1.dispatch(ssi, null));
+  }
+
+  @Test
+  public void testBasic() {
+    String dname = SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME;
+
+    runTest(Collections.emptyMap(), dname, dname);
+    runTest(ImmutableMap.of("executor", "E1"), "E1", "E1");
+    runTest(ImmutableMap.of("single_executor", "E2"), "E2", dname);
+    runTest(ImmutableMap.of("multi_executor", "E3"), dname, "E3");
+    runTest(ImmutableMap.of("executor", "E1", "single_executor", "E2"), "E2", 
"E1");
+    runTest(ImmutableMap.of("executor", "E1", "multi_executor", "E3"), "E1", 
"E3");
+    runTest(ImmutableMap.of("single_executor", "E2", "multi_executor", "E3"), 
"E2", "E3");
+    runTest(ImmutableMap.of("executor", "E1", "single_executor", "E2", 
"multi_executor", "E3"),
+        "E2", "E3");
+  }
+}
diff --git 
a/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java 
b/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java
new file mode 100644
index 0000000000..68a96502f8
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Collection;
+import java.util.OptionalLong;
+import java.util.Set;
+
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.spi.common.IteratorConfiguration;
+import org.apache.accumulo.core.spi.common.Stats;
+import org.apache.accumulo.core.util.Stat;
+
+public class TestScanInfo implements ScanInfo {
+
+  String testId;
+  Type scanType;
+  long creationTime;
+  OptionalLong lastRunTime = OptionalLong.empty();
+  Stat runTimeStats = new Stat();
+  Stat idleTimeStats = new Stat();
+
+  TestScanInfo(String testId, Type scanType, long creationTime, int... times) {
+    this.testId = testId;
+    this.scanType = scanType;
+    this.creationTime = creationTime;
+
+    for (int i = 0; i < times.length; i += 2) {
+      long idleDuration = times[i] - (i == 0 ? 0 : times[i - 1]);
+      long runDuration = times[i + 1] - times[i];
+      runTimeStats.addStat(runDuration);
+      idleTimeStats.addStat(idleDuration);
+    }
+
+    if (times.length > 0) {
+      lastRunTime = OptionalLong.of(times[times.length - 1] + creationTime);
+    }
+  }
+
+  @Override
+  public Type getScanType() {
+    return scanType;
+  }
+
+  @Override
+  public String getTableId() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getCreationTime() {
+    return creationTime;
+  }
+
+  @Override
+  public OptionalLong getLastRunTime() {
+    return lastRunTime;
+  }
+
+  @Override
+  public Stats getRunTimeStats() {
+    return runTimeStats;
+  }
+
+  @Override
+  public Stats getIdleTimeStats() {
+    return idleTimeStats;
+  }
+
+  @Override
+  public Stats getIdleTimeStats(long currentTime) {
+    Stat copy = idleTimeStats.copy();
+    copy.addStat(currentTime - lastRunTime.orElse(creationTime));
+    return copy;
+  }
+
+  @Override
+  public Set<Column> getFetchedColumns() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<IteratorConfiguration> getClientScanIterators() {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/core/src/test/java/org/apache/accumulo/core/util/StatTest.java 
b/core/src/test/java/org/apache/accumulo/core/util/StatTest.java
index 69df38d4e6..a2986acc50 100644
--- a/core/src/test/java/org/apache/accumulo/core/util/StatTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/util/StatTest.java
@@ -43,32 +43,26 @@ public void setUp() {
 
   @Test
   public void testGetMin() {
-    assertEquals(0, zero.getMin());
-    assertEquals(3677, stat.getMin());
+    assertEquals(0, zero.min());
+    assertEquals(3677, stat.min());
   }
 
   @Test
   public void testGetMax() {
-    assertEquals(0, zero.getMax());
-    assertEquals(9792, stat.getMax());
+    assertEquals(0, zero.max());
+    assertEquals(9792, stat.max());
   }
 
   @Test
   public void testGetAverage() {
-    assertEquals(0, zero.getAverage(), delta);
-    assertEquals(5529, stat.getAverage(), delta);
-  }
-
-  @Test
-  public void testGetStdDev() {
-    assertEquals(0, zero.getStdDev(), delta);
-    assertEquals(2073.7656569632, stat.getStdDev(), delta);
+    assertEquals(0, zero.mean(), delta);
+    assertEquals(5529, stat.mean(), delta);
   }
 
   @Test
   public void testGetSum() {
-    assertEquals(0, zero.getSum());
-    assertEquals(38703, stat.getSum());
+    assertEquals(0, zero.sum());
+    assertEquals(38703, stat.sum());
   }
 
   @Test
@@ -76,16 +70,14 @@ public void testClear() {
     zero.clear();
     stat.clear();
 
-    assertEquals(0, zero.getMax());
-    assertEquals(zero.getMax(), stat.getMax());
-    assertEquals(0, zero.getMin());
-    assertEquals(zero.getMin(), stat.getMin());
-    assertEquals(0, zero.getSum());
-    assertEquals(zero.getSum(), stat.getSum());
+    assertEquals(0, zero.max());
+    assertEquals(zero.max(), stat.max());
+    assertEquals(0, zero.min());
+    assertEquals(zero.min(), stat.min());
+    assertEquals(0, zero.sum());
+    assertEquals(zero.sum(), stat.sum());
 
-    assertEquals(Double.NaN, zero.getAverage(), 0);
-    assertEquals(zero.getAverage(), stat.getAverage(), 0);
-    assertEquals(Double.NaN, zero.getStdDev(), 0);
-    assertEquals(zero.getStdDev(), stat.getStdDev(), 0);
+    assertEquals(Double.NaN, zero.mean(), 0);
+    assertEquals(zero.mean(), stat.mean(), 0);
   }
 }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
 
b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
index dbb7ca49ec..cc67feb484 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
@@ -19,6 +19,7 @@
 import static java.util.Objects.requireNonNull;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.List;
@@ -36,6 +37,7 @@
 import org.apache.accumulo.core.data.thrift.IterInfo;
 import org.apache.accumulo.core.iterators.IteratorUtil;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.zookeeper.ZooCache;
 import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
@@ -216,4 +218,39 @@ public ParsedIteratorConfig 
getParsedIteratorConfig(IteratorScope scope) {
 
     return pic;
   }
+
+  public static class TablesScanDispatcher {
+    public final ScanDispatcher dispatcher;
+    public final long count;
+
+    public TablesScanDispatcher(ScanDispatcher dispatcher, long count) {
+      this.dispatcher = dispatcher;
+      this.count = count;
+    }
+  }
+
+  private AtomicReference<TablesScanDispatcher> scanDispatcherRef = new 
AtomicReference<>();
+
+  public ScanDispatcher getScanDispatcher() {
+    long count = getUpdateCount();
+    TablesScanDispatcher currRef = scanDispatcherRef.get();
+    if (currRef == null || currRef.count != count) {
+      ScanDispatcher newDispatcher = 
Property.createTableInstanceFromPropertyName(this,
+          Property.TABLE_SCAN_DISPATCHER, ScanDispatcher.class, null);
+
+      Map<String,String> opts = new HashMap<>();
+      
getAllPropertiesWithPrefix(Property.TABLE_SCAN_DISPATCHER_OPTS).forEach((k, v) 
-> {
+        String optKey = 
k.substring(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey().length());
+        opts.put(optKey, v);
+      });
+
+      newDispatcher.init(Collections.unmodifiableMap(opts));
+
+      TablesScanDispatcher newRef = new TablesScanDispatcher(newDispatcher, 
count);
+      scanDispatcherRef.compareAndSet(currRef, newRef);
+      currRef = newRef;
+    }
+
+    return currRef.dispatcher;
+  }
 }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
 
b/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
index 1973fd8e24..ab41141817 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
@@ -81,11 +81,12 @@ private String _get(Property property) {
   @Override
   public String get(Property property) {
     if (Property.isFixedZooPropertyKey(property)) {
-      if (fixedProps.containsKey(property.getKey())) {
-        return fixedProps.get(property.getKey());
+      String val = fixedProps.get(property.getKey());
+      if (val != null) {
+        return val;
       } else {
         synchronized (fixedProps) {
-          String val = _get(property);
+          val = _get(property);
           fixedProps.put(property.getKey(), val);
           return val;
         }
@@ -96,6 +97,12 @@ public String get(Property property) {
     }
   }
 
+  @Override
+  public boolean isPropertySet(Property prop) {
+    return fixedProps.containsKey(prop.getKey()) || getRaw(prop.getKey()) != 
null
+        || parent.isPropertySet(prop);
+  }
+
   private String getRaw(String key) {
     String zPath = propPathPrefix + "/" + key;
     byte[] v = propCache.get(zPath);
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 6e67cf045e..14c0306e22 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -130,6 +130,7 @@
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher;
 import org.apache.accumulo.core.summary.Gatherer;
 import org.apache.accumulo.core.summary.Gatherer.FileSystemResolver;
 import org.apache.accumulo.core.summary.SummaryCollection;
@@ -250,9 +251,9 @@
 import org.apache.accumulo.tserver.scan.ScanRunState;
 import org.apache.accumulo.tserver.session.ConditionalSession;
 import org.apache.accumulo.tserver.session.MultiScanSession;
-import org.apache.accumulo.tserver.session.ScanSession;
 import org.apache.accumulo.tserver.session.Session;
 import org.apache.accumulo.tserver.session.SessionManager;
+import org.apache.accumulo.tserver.session.SingleScanSession;
 import org.apache.accumulo.tserver.session.SummarySession;
 import org.apache.accumulo.tserver.session.UpdateSession;
 import org.apache.accumulo.tserver.tablet.BulkImportCacheCleaner;
@@ -543,6 +544,16 @@ public void loadFiles(TInfo tinfo, TCredentials credentials, long tid, TKeyExten
       }
     }
 
+    private ScanDispatcher getScanDispatcher(KeyExtent extent) {
+      if (extent.isRootTablet() || extent.isMeta()) {
+        // dispatcher is only for user tables
+        return null;
+      }
+
+      return getServerConfigurationFactory().getTableConfiguration(extent.getTableId())
+          .getScanDispatcher();
+    }
+
     @Override
     public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent,
         TRange range, List<TColumn> columns, int batchSize, List<IterInfo> ssiList,
@@ -587,13 +598,14 @@ public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent t
       if (tablet == null)
         throw new NotServingTabletException(textent);
 
-      Set<Column> columnSet = new HashSet<>();
+      HashSet<Column> columnSet = new HashSet<>();
       for (TColumn tcolumn : columns) {
         columnSet.add(new Column(tcolumn));
       }
 
-      final ScanSession scanSession = new ScanSession(credentials, extent, columnSet, ssiList, ssio,
-          new Authorizations(authorizations), readaheadThreshold, batchTimeOut, context);
+      final SingleScanSession scanSession = new SingleScanSession(credentials, extent, columnSet,
+          ssiList, ssio, new Authorizations(authorizations), readaheadThreshold, batchTimeOut,
+          context);
       scanSession.scanner = tablet.createScanner(new Range(range), batchSize, scanSession.columnSet,
           scanSession.auths, ssiList, ssio, isolated, scanSession.interruptFlag,
           SamplerConfigurationImpl.fromThrift(tSamplerConfig), scanSession.batchTimeOut,
@@ -619,7 +631,7 @@ public ScanResult continueScan(TInfo tinfo, long scanID)
         throws NoSuchScanIDException, NotServingTabletException,
         org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
         TSampleNotPresentException {
-      ScanSession scanSession = (ScanSession) sessionManager.reserveSession(scanID);
+      SingleScanSession scanSession = (SingleScanSession) sessionManager.reserveSession(scanID);
       if (scanSession == null) {
         throw new NoSuchScanIDException();
       }
@@ -631,7 +643,7 @@ public ScanResult continueScan(TInfo tinfo, long scanID)
       }
     }
 
-    private ScanResult continueScan(TInfo tinfo, long scanID, ScanSession scanSession)
+    private ScanResult continueScan(TInfo tinfo, long scanID, SingleScanSession scanSession)
         throws NoSuchScanIDException, NotServingTabletException,
         org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
         TSampleNotPresentException {
@@ -639,7 +651,8 @@ private ScanResult continueScan(TInfo tinfo, long scanID, ScanSession scanSessio
       if (scanSession.nextBatchTask == null) {
         scanSession.nextBatchTask = new NextBatchTask(TabletServer.this, scanID,
             scanSession.interruptFlag);
-        resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
+        resourceManager.executeReadAhead(scanSession.extent, getScanDispatcher(scanSession.extent),
+            scanSession, scanSession.nextBatchTask);
       }
 
       ScanBatch bresult;
@@ -694,7 +707,8 @@ else if (e.getCause() instanceof IOException) {
         // to client
         scanSession.nextBatchTask = new NextBatchTask(TabletServer.this, scanID,
             scanSession.interruptFlag);
-        resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
+        resourceManager.executeReadAhead(scanSession.extent, getScanDispatcher(scanSession.extent),
+            scanSession, scanSession.nextBatchTask);
       }
 
       if (!scanResult.more)
@@ -705,14 +719,14 @@ else if (e.getCause() instanceof IOException) {
 
     @Override
     public void closeScan(TInfo tinfo, long scanID) {
-      final ScanSession ss = (ScanSession) sessionManager.removeSession(scanID);
+      final SingleScanSession ss = (SingleScanSession) sessionManager.removeSession(scanID);
       if (ss != null) {
         long t2 = System.currentTimeMillis();
 
         if (log.isTraceEnabled()) {
           log.trace(String.format("ScanSess tid %s %s %,d entries in %.2f secs, nbTimes = [%s] ",
               TServerUtils.clientAddress.get(), ss.extent.getTableId(), ss.entriesReturned,
-              (t2 - ss.startTime) / 1000.0, ss.nbTimes.toString()));
+              (t2 - ss.startTime) / 1000.0, ss.runStats.toString()));
         }
 
         if (scanMetrics.isEnabled()) {
@@ -818,7 +832,8 @@ private MultiScanResult continueMultiScan(TInfo tinfo, long scanID, MultiScanSes
 
       if (session.lookupTask == null) {
         session.lookupTask = new LookupTask(TabletServer.this, scanID);
-        resourceManager.executeReadAhead(session.threadPoolExtent, session.lookupTask);
+        resourceManager.executeReadAhead(session.threadPoolExtent,
+            getScanDispatcher(session.threadPoolExtent), session, session.lookupTask);
       }
 
       try {
@@ -1174,8 +1189,8 @@ public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws NoSuchScanIDE
             String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs lt=%.3fs ct=%.3fs)",
                 TServerUtils.clientAddress.get(), us.totalUpdates,
                 (System.currentTimeMillis() - us.startTime) / 1000.0, us.authTimes.toString(),
-                us.flushTime / 1000.0, us.prepareTimes.getSum() / 1000.0,
-                us.walogTimes.getSum() / 1000.0, us.commitTimes.getSum() / 1000.0));
+                us.flushTime / 1000.0, us.prepareTimes.sum() / 1000.0, us.walogTimes.sum() / 1000.0,
+                us.commitTimes.sum() / 1000.0));
       }
       if (us.failures.size() > 0) {
         Entry<KeyExtent,Long> first = us.failures.entrySet().iterator().next();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 71cebbe014..5ab6e27f4b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -21,10 +21,15 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Queue;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
@@ -32,11 +37,17 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.IntSupplier;
 
+import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.AccumuloConfiguration.ScanExecutorConfig;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
@@ -45,6 +56,11 @@
 import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
 import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher;
+import org.apache.accumulo.core.spi.scan.ScanExecutor;
+import org.apache.accumulo.core.spi.scan.ScanInfo;
+import org.apache.accumulo.core.spi.scan.ScanPrioritizer;
+import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.fate.util.LoggingRunnable;
@@ -62,14 +78,18 @@
 import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy;
 import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
 import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
+import org.apache.accumulo.tserver.session.ScanSession;
 import org.apache.accumulo.tserver.tablet.Tablet;
 import org.apache.htrace.wrappers.TraceExecutorService;
+import org.apache.htrace.wrappers.TraceRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
 
 /**
  * ResourceManager is responsible for managing the resources of all tablets within a tablet server.
@@ -88,13 +108,14 @@
   private final ExecutorService migrationPool;
   private final ExecutorService assignmentPool;
   private final ExecutorService assignMetaDataPool;
-  private final ExecutorService readAheadThreadPool;
-  private final ExecutorService defaultReadAheadThreadPool;
   private final ExecutorService summaryRetrievalPool;
   private final ExecutorService summaryParitionPool;
   private final ExecutorService summaryRemotePool;
   private final Map<String,ExecutorService> threadPools = new TreeMap<>();
 
+  private final Map<String,ExecutorService> scanExecutors;
+  private final Map<String,ScanExecutor> scanExecutorChoices;
+
   private final ConcurrentHashMap<KeyExtent,RunnableStartedAt> activeAssignments;
 
   private final FileManager fileManager;
@@ -120,16 +141,15 @@ private ExecutorService addEs(String name, ExecutorService tp) {
     return tp;
   }
 
-  private ExecutorService addEs(final Property maxThreads, String name,
-      final ThreadPoolExecutor tp) {
+  private ExecutorService addEs(IntSupplier maxThreads, String name, final ThreadPoolExecutor tp) {
     ExecutorService result = addEs(name, tp);
     SimpleTimer.getInstance(tserver.getConfiguration()).schedule(new Runnable() {
       @Override
       public void run() {
         try {
-          int max = tserver.getConfiguration().getCount(maxThreads);
+          int max = maxThreads.getAsInt();
           if (tp.getMaximumPoolSize() != max) {
-            log.info("Changing {} to {}", maxThreads.getKey(), max);
+            log.info("Changing max threads for {} to {}", name, max);
             tp.setCorePoolSize(max);
             tp.setMaximumPoolSize(max);
           }
@@ -142,6 +162,16 @@ public void run() {
     return result;
   }
 
+  private ExecutorService createIdlingEs(Property max, String name, long timeout,
+      TimeUnit timeUnit) {
+    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+    int maxThreads = conf.getSystemConfiguration().getCount(max);
+    ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, timeout, timeUnit, queue,
+        new NamingThreadFactory(name));
+    tp.allowCoreThreadTimeOut(true);
+    return addEs(() -> conf.getSystemConfiguration().getCount(max), name, tp);
+  }
+
   private ExecutorService createEs(int max, String name) {
     return addEs(name, Executors.newFixedThreadPool(max, new NamingThreadFactory(name)));
   }
@@ -150,21 +180,52 @@ private ExecutorService createEs(Property max, String name) {
     return createEs(max, name, new LinkedBlockingQueue<>());
   }
 
-  private ExecutorService createIdlingEs(Property max, String name, long timeout,
-      TimeUnit timeUnit) {
-    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
-    int maxThreads = conf.getSystemConfiguration().getCount(max);
-    ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, timeout, timeUnit, queue,
-        new NamingThreadFactory(name));
-    tp.allowCoreThreadTimeOut(true);
-    return addEs(max, name, tp);
+  private ExecutorService createPriorityExecutor(ScanExecutorConfig sec,
+      Map<String,Queue<?>> scanExecQueues) {
+
+    BlockingQueue<Runnable> queue;
+
+    if (sec.prioritizerClass.orElse("").isEmpty()) {
+      queue = new LinkedBlockingQueue<>();
+    } else {
+      ScanPrioritizer factory = null;
+      try {
+        factory = ConfigurationTypeHelper.getClassInstance(null, sec.prioritizerClass.get(),
+            ScanPrioritizer.class);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+
+      if (factory == null) {
+        queue = new LinkedBlockingQueue<>();
+      } else {
+        Comparator<ScanInfo> comparator = factory.createComparator(sec.prioritizerOpts);
+
+        // function to extract scan scan session from runnable
+        Function<Runnable,ScanInfo> extractor = r -> ((ScanSession.ScanMeasurer) ((TraceRunnable) r)
+            .getRunnable()).getScanInfo();
+
+        queue = new PriorityBlockingQueue<>(sec.maxThreads,
+            Comparator.comparing(extractor, comparator));
+      }
+    }
+
+    scanExecQueues.put(sec.name, queue);
+
+    return createEs(() -> sec.getCurrentMaxThreads(), "scan-" + sec.name, queue, sec.priority);
   }
 
-  private ExecutorService createEs(Property max, String name, BlockingQueue<Runnable> queue) {
-    int maxThreads = conf.getSystemConfiguration().getCount(max);
+  private ExecutorService createEs(IntSupplier maxThreadsSupplier, String name,
+      BlockingQueue<Runnable> queue, OptionalInt priority) {
+    int maxThreads = maxThreadsSupplier.getAsInt();
     ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, 0L,
-        TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name));
-    return addEs(max, name, tp);
+        TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name, priority));
+    return addEs(maxThreadsSupplier, name, tp);
+  }
+
+  private ExecutorService createEs(Property max, String name, BlockingQueue<Runnable> queue) {
+    IntSupplier maxThreadsSupplier = () -> conf.getSystemConfiguration().getCount(max);
+    return createEs(maxThreadsSupplier, name, queue, OptionalInt.empty());
   }
 
   private ExecutorService createEs(int min, int max, int timeout, String name) {
@@ -172,6 +233,80 @@ private ExecutorService createEs(int min, int max, int timeout, String name) {
         new LinkedBlockingQueue<>(), new NamingThreadFactory(name)));
   }
 
+  protected Map<String,ExecutorService> createScanExecutors(Instance instance,
+      Collection<ScanExecutorConfig> scanExecCfg, Map<String,Queue<?>> scanExecQueues) {
+    Builder<String,ExecutorService> builder = ImmutableMap.builder();
+
+    for (ScanExecutorConfig sec : scanExecCfg) {
+      builder.put(sec.name, createPriorityExecutor(sec, scanExecQueues));
+    }
+
+    return builder.build();
+  }
+
+  private static class ScanExecutorImpl implements ScanExecutor {
+
+    private static class ConfigImpl implements ScanExecutor.Config {
+
+      final ScanExecutorConfig cfg;
+
+      public ConfigImpl(ScanExecutorConfig sec) {
+        this.cfg = sec;
+      }
+
+      @Override
+      public String getName() {
+        return cfg.name;
+      }
+
+      @Override
+      public int getMaxThreads() {
+        return cfg.maxThreads;
+      }
+
+      @Override
+      public Optional<String> getPrioritizerClass() {
+        return cfg.prioritizerClass;
+      }
+
+      @Override
+      public Map<String,String> getPrioritizerOptions() {
+        return cfg.prioritizerOpts;
+      }
+
+    }
+
+    private final ConfigImpl config;
+    private final Queue<?> queue;
+
+    ScanExecutorImpl(ScanExecutorConfig sec, Queue<?> q) {
+      this.config = new ConfigImpl(sec);
+      this.queue = q;
+    }
+
+    @Override
+    public int getQueued() {
+      return queue.size();
+    }
+
+    @Override
+    public Config getConfig() {
+      return config;
+    }
+
+  }
+
+  private Map<String,ScanExecutor> createScanExecutorChoices(
+      Collection<ScanExecutorConfig> scanExecCfg, Map<String,Queue<?>> scanExecQueues) {
+    Builder<String,ScanExecutor> builder = ImmutableMap.builder();
+
+    for (ScanExecutorConfig sec : scanExecCfg) {
+      builder.put(sec.name, new ScanExecutorImpl(sec, scanExecQueues.get(sec.name)));
+    }
+
+    return builder.build();
+  }
+
   public TabletServerResourceManager(TabletServer tserver, VolumeManager fs) {
     this.tserver = tserver;
     this.conf = tserver.getServerConfigurationFactory();
@@ -251,10 +386,6 @@ public TabletServerResourceManager(TabletServer tserver, VolumeManager fs) {
 
     activeAssignments = new ConcurrentHashMap<>();
 
-    readAheadThreadPool = createEs(Property.TSERV_READ_AHEAD_MAXCONCURRENT, "tablet read ahead");
-    defaultReadAheadThreadPool = createEs(Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT,
-        "metadata tablets read ahead");
-
     summaryRetrievalPool = createIdlingEs(Property.TSERV_SUMMARY_RETRIEVAL_THREADS,
         "summary file retriever", 60, TimeUnit.SECONDS);
     summaryRemotePool = createIdlingEs(Property.TSERV_SUMMARY_REMOTE_THREADS, "summary remote", 60,
@@ -262,6 +393,11 @@ public TabletServerResourceManager(TabletServer tserver, VolumeManager fs) {
     summaryParitionPool = createIdlingEs(Property.TSERV_SUMMARY_PARTITION_THREADS,
         "summary partition", 60, TimeUnit.SECONDS);
 
+    Collection<ScanExecutorConfig> scanExecCfg = acuConf.getScanExecutors();
+    Map<String,Queue<?>> scanExecQueues = new HashMap<>();
+    scanExecutors = createScanExecutors(tserver.getInstance(), scanExecCfg, scanExecQueues);
+    scanExecutorChoices = createScanExecutorChoices(scanExecCfg, scanExecQueues);
+
     int maxOpenFiles = acuConf.getCount(Property.TSERV_SCAN_MAX_OPENFILES);
 
     Cache<String,Long> fileLenCache = CacheBuilder.newBuilder()
@@ -791,13 +927,29 @@ public void executeMajorCompaction(KeyExtent tablet, Runnable compactionTask) {
     }
   }
 
-  public void executeReadAhead(KeyExtent tablet, Runnable task) {
+  public void executeReadAhead(KeyExtent tablet, ScanDispatcher dispatcher, ScanSession scanInfo,
+      Runnable task) {
+
+    task = ScanSession.wrap(scanInfo, task);
+
     if (tablet.isRootTablet()) {
       task.run();
     } else if (tablet.isMeta()) {
-      defaultReadAheadThreadPool.execute(task);
+      scanExecutors.get("meta").execute(task);
     } else {
-      readAheadThreadPool.execute(task);
+      String scanExecutorName = dispatcher.dispatch(scanInfo, scanExecutorChoices);
+      ExecutorService executor = scanExecutors.get(scanExecutorName);
+      if (executor == null) {
+        log.warn(
+            "For table id {}, {} dispatched to non-existant executor {} Using default executor.",
+            tablet.getTableId(), dispatcher.getClass().getName(), scanExecutorName);
+        executor = scanExecutors.get(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME);
+      } else if ("meta".equals(scanExecutorName)) {
+        log.warn("For table id {}, {} dispatched to meta executor. Using default executor.",
+            tablet.getTableId(), dispatcher.getClass().getName());
+        executor = scanExecutors.get(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME);
+      }
+      executor.execute(task);
     }
   }
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
index aee4477475..16b9c55581 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
@@ -23,7 +23,7 @@
 import org.apache.accumulo.server.util.Halt;
 import org.apache.accumulo.tserver.TabletServer;
 import org.apache.accumulo.tserver.TooManyFilesException;
-import org.apache.accumulo.tserver.session.ScanSession;
+import org.apache.accumulo.tserver.session.SingleScanSession;
 import org.apache.accumulo.tserver.tablet.ScanBatch;
 import org.apache.accumulo.tserver.tablet.Tablet;
 import org.apache.accumulo.tserver.tablet.TabletClosedException;
@@ -48,7 +48,7 @@ public NextBatchTask(TabletServer server, long scanID, AtomicBoolean interruptFl
   @Override
   public void run() {
 
-    final ScanSession scanSession = (ScanSession) server.getSession(scanID);
+    final SingleScanSession scanSession = (SingleScanSession) server.getSession(scanID);
     String oldThreadName = Thread.currentThread().getName();
 
     try {
@@ -69,10 +69,7 @@ public void run() {
         return;
       }
 
-      long t1 = System.currentTimeMillis();
       ScanBatch batch = scanSession.scanner.read();
-      long t2 = System.currentTimeMillis();
-      scanSession.nbTimes.addStat(t2 - t1);
 
       // there should only be one thing on the queue at a time, so
       // it should be ok to call add()
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
index 4285a51446..f8db1f42c5 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
@@ -21,7 +21,6 @@
 import java.util.Map;
 
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
-import org.apache.accumulo.core.data.Column;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.data.thrift.IterInfo;
@@ -30,13 +29,9 @@
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.tserver.scan.ScanTask;
 
-public class MultiScanSession extends Session {
+public class MultiScanSession extends ScanSession {
   public final KeyExtent threadPoolExtent;
-  public final HashSet<Column> columnSet = new HashSet<>();
   public final Map<KeyExtent,List<Range>> queries;
-  public final List<IterInfo> ssiList;
-  public final Map<String,Map<String,String>> ssio;
-  public final Authorizations auths;
   public final SamplerConfiguration samplerConfig;
   public final long batchTimeOut;
   public final String context;
@@ -53,17 +48,24 @@ public MultiScanSession(TCredentials credentials, KeyExtent threadPoolExtent,
       Map<KeyExtent,List<Range>> queries, List<IterInfo> ssiList,
       Map<String,Map<String,String>> ssio, Authorizations authorizations,
       SamplerConfiguration samplerConfig, long batchTimeOut, String context) {
-    super(credentials);
+    super(credentials, new HashSet<>(), ssiList, ssio, authorizations);
     this.queries = queries;
-    this.ssiList = ssiList;
-    this.ssio = ssio;
-    this.auths = authorizations;
     this.threadPoolExtent = threadPoolExtent;
     this.samplerConfig = samplerConfig;
     this.batchTimeOut = batchTimeOut;
     this.context = context;
   }
 
+  @Override
+  public Type getScanType() {
+    return Type.MULTI;
+  }
+
+  @Override
+  public String getTableId() {
+    return threadPoolExtent.getTableId().canonicalID();
+  }
+
   @Override
   public boolean cleanup() {
     if (lookupTask != null)
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
index a55de1e39f..5cde06c657 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
@@ -16,64 +16,148 @@
  */
 package org.apache.accumulo.tserver.session;
 
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.data.Column;
-import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.data.thrift.IterInfo;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.spi.common.IteratorConfiguration;
+import org.apache.accumulo.core.spi.common.Stats;
+import org.apache.accumulo.core.spi.scan.ScanInfo;
 import org.apache.accumulo.core.util.Stat;
-import org.apache.accumulo.tserver.scan.ScanTask;
-import org.apache.accumulo.tserver.tablet.ScanBatch;
-import org.apache.accumulo.tserver.tablet.Scanner;
-
-public class ScanSession extends Session {
-  public final Stat nbTimes = new Stat();
-  public final KeyExtent extent;
-  public final Set<Column> columnSet;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public abstract class ScanSession extends Session implements ScanInfo {
+
+  public static class ScanMeasurer implements Runnable {
+
+    private ScanSession session;
+    private Runnable task;
+
+    ScanMeasurer(ScanSession session, Runnable task) {
+      this.session = session;
+      this.task = task;
+    }
+
+    @Override
+    public void run() {
+      long t1 = System.currentTimeMillis();
+      task.run();
+      long t2 = System.currentTimeMillis();
+      session.finishedRun(t1, t2);
+    }
+
+    public ScanInfo getScanInfo() {
+      return session;
+    }
+  }
+
+  public static ScanMeasurer wrap(ScanSession scanInfo, Runnable r) {
+    return new ScanMeasurer(scanInfo, r);
+  }
+
+  private OptionalLong lastRunTime = OptionalLong.empty();
+  private Stat idleStats = new Stat();
+  public Stat runStats = new Stat();
+
+  public final HashSet<Column> columnSet;
   public final List<IterInfo> ssiList;
   public final Map<String,Map<String,String>> ssio;
   public final Authorizations auths;
-  public final AtomicBoolean interruptFlag = new AtomicBoolean();
-  public long entriesReturned = 0;
-  public long batchCount = 0;
-  public volatile ScanTask<ScanBatch> nextBatchTask;
-  public Scanner scanner;
-  public final long readaheadThreshold;
-  public final long batchTimeOut;
-  public final String context;
-
-  public ScanSession(TCredentials credentials, KeyExtent extent, Set<Column> columnSet,
-      List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, Authorizations authorizations,
-      long readaheadThreshold, long batchTimeOut, String context) {
+
+  ScanSession(TCredentials credentials, HashSet<Column> cols, List<IterInfo> ssiList,
+      Map<String,Map<String,String>> ssio, Authorizations auths) {
     super(credentials);
-    this.extent = extent;
-    this.columnSet = columnSet;
+    this.columnSet = cols;
     this.ssiList = ssiList;
     this.ssio = ssio;
-    this.auths = authorizations;
-    this.readaheadThreshold = readaheadThreshold;
-    this.batchTimeOut = batchTimeOut;
-    this.context = context;
+    this.auths = auths;
+  }
+
+  @Override
+  public long getCreationTime() {
+    return startTime;
+  }
+
+  @Override
+  public OptionalLong getLastRunTime() {
+    return lastRunTime;
   }
 
   @Override
-  public boolean cleanup() {
-    final boolean ret;
-    try {
-      if (nextBatchTask != null)
-        nextBatchTask.cancel(true);
-    } finally {
-      if (scanner != null)
-        ret = scanner.close();
-      else
-        ret = true;
+  public Stats getRunTimeStats() {
+    return runStats;
+  }
+
+  @Override
+  public Stats getIdleTimeStats() {
+    return idleStats;
+  }
+
+  @Override
+  public Stats getIdleTimeStats(long currentTime) {
+    long idleTime = currentTime - getLastRunTime().orElse(getCreationTime());
+    Preconditions.checkArgument(idleTime >= 0);
+    Stat copy = idleStats.copy();
+    copy.addStat(idleTime);
+    return copy;
+  }
+
+  @Override
+  public Set<Column> getFetchedColumns() {
+    return Collections.unmodifiableSet(columnSet);
+  }
+
+  private class IterConfImpl implements IteratorConfiguration {
+
+    private IterInfo ii;
+
+    IterConfImpl(IterInfo ii) {
+      this.ii = ii;
+    }
+
+    @Override
+    public String getIteratorClass() {
+      return ii.className;
+    }
+
+    @Override
+    public String getName() {
+      return ii.iterName;
+    }
+
+    @Override
+    public int getPriority() {
+      return ii.priority;
+    }
+
+    @Override
+    public Map<String,String> getOptions() {
+      Map<String,String> opts = ssio.get(ii.iterName);
+      return opts == null || opts.isEmpty() ? Collections.emptyMap()
+          : Collections.unmodifiableMap(opts);
     }
-    return ret;
   }
 
+  @Override
+  public Collection<IteratorConfiguration> getClientScanIterators() {
+    return Lists.transform(ssiList, IterConfImpl::new);
+  }
+
+  public void finishedRun(long start, long finish) {
+    long idleTime = start - getLastRunTime().orElse(getCreationTime());
+    long runTime = finish - start;
+    lastRunTime = OptionalLong.of(finish);
+    idleStats.addStat(idleTime);
+    runStats.addStat(runTime);
+  }
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
index eed45cf073..6bd52e4686 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
@@ -26,7 +26,7 @@
   }
 
   public final String client;
-  long lastAccessTime;
+  public long lastAccessTime;
   public long startTime;
   State state = State.NEW;
   private final TCredentials credentials;
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index 724d23c18f..1ba2491d5c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@ -315,8 +315,8 @@ public void run() {
       ScanTask nbt = null;
       Table.ID tableID = null;
 
-      if (session instanceof ScanSession) {
-        ScanSession ss = (ScanSession) session;
+      if (session instanceof SingleScanSession) {
+        SingleScanSession ss = (SingleScanSession) session;
         nbt = ss.nextBatchTask;
         tableID = ss.extent.getTableId();
       } else if (session instanceof MultiScanSession) {
@@ -362,8 +362,8 @@ public void run() {
 
     for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), copiedIdleSessions)) {
       Session session = entry.getValue();
-      if (session instanceof ScanSession) {
-        ScanSession ss = (ScanSession) session;
+      if (session instanceof SingleScanSession) {
+        SingleScanSession ss = (SingleScanSession) session;
 
         ScanState state = ScanState.RUNNING;
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java
new file mode 100644
index 0000000000..fb3e29fe56
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.session;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.tserver.scan.ScanTask;
+import org.apache.accumulo.tserver.tablet.ScanBatch;
+import org.apache.accumulo.tserver.tablet.Scanner;
+
+public class SingleScanSession extends ScanSession { // ScanSession variant tied to one KeyExtent; reports Type.SINGLE
+  public final KeyExtent extent; // fixed at construction; getTableId() is derived from it
+  public final AtomicBoolean interruptFlag = new AtomicBoolean(); // starts false; not touched in this class - presumably set by callers to interrupt the scan (confirm)
+  public long entriesReturned = 0; // counter starting at 0; not updated in this class
+  public long batchCount = 0; // counter starting at 0; not updated in this class
+  public volatile ScanTask<ScanBatch> nextBatchTask; // null until assigned by external code; cancelled by cleanup()
+  public Scanner scanner; // null until assigned by external code; closed by cleanup()
+  public final long readaheadThreshold; // constructor-supplied; meaning/units not visible here
+  public final long batchTimeOut; // constructor-supplied; time unit not visible here - confirm with callers
+  public final String context; // constructor-supplied; semantics defined by callers
+
+  public SingleScanSession(TCredentials credentials, KeyExtent extent, 
HashSet<Column> columnSet,
+      List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, 
Authorizations authorizations,
+      long readaheadThreshold, long batchTimeOut, String context) {
+    super(credentials, columnSet, ssiList, ssio, authorizations); // everything except extent/readahead/timeout/context is handed to ScanSession
+    this.extent = extent;
+    this.readaheadThreshold = readaheadThreshold;
+    this.batchTimeOut = batchTimeOut;
+    this.context = context;
+  }
+
+  @Override
+  public Type getScanType() {
+    return Type.SINGLE; // constant for every instance of this class
+  }
+
+  @Override
+  public String getTableId() {
+    return extent.getTableId().canonicalID(); // string form of the table id carried by the extent
+  }
+
+  @Override
+  public boolean cleanup() {
+    final boolean ret; // assigned exactly once, in the finally block below
+    try {
+      if (nextBatchTask != null)
+        nextBatchTask.cancel(true); // argument semantics are defined by ScanTask; presumably "may interrupt if running" - confirm
+    } finally { // the scanner is closed even if cancel() throws
+      if (scanner != null)
+        ret = scanner.close(); // pass the scanner's own close result back to the caller
+      else
+        ret = true; // no scanner was ever attached, so report true
+    }
+    return ret;
+  }
+}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 2ef408754c..dc5d6842fd 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -1537,7 +1537,14 @@ public synchronized void initiateMajorCompaction(MajorCompactionReason reason) {
 
     majorCompactionQueued.add(reason);
 
-    getTabletResources().executeMajorCompaction(getExtent(), new 
CompactionRunner(this, reason));
+    try {
+      getTabletResources().executeMajorCompaction(getExtent(), new 
CompactionRunner(this, reason));
+    } catch (RuntimeException t) {
+      log.debug("removing {} because we encountered an exception enqueing the 
CompactionRunner",
+          reason, t);
+      majorCompactionQueued.remove(reason);
+      throw t;
+    }
   }
 
   /**
diff --git a/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java b/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java
index e807dfb17f..320b9c23d2 100644
--- a/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java
+++ b/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java
@@ -68,7 +68,7 @@ public static void main(String[] args) throws Exception {
     }
 
     for (Entry<String,Stat> entry : stats.entrySet()) {
-      System.out.printf("%20s : %6.2f\n", entry.getKey(), 
entry.getValue().getAverage());
+      System.out.printf("%20s : %6.2f\n", entry.getKey(), 
entry.getValue().mean());
     }
 
   }
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
index cbeb1e4fc8..907f2c60a0 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
@@ -606,9 +606,8 @@ private static void calcTabletStats(Connector conn, String table, Authorizations
   }
 
   private static void printStat(String desc, Stat s) {
-    System.out.printf(
-        "\t\tDescription: [%30s]  average: %,6.2f  std dev: %,6.2f  min: %,d  
max: %,d %n", desc,
-        s.getAverage(), s.getStdDev(), s.getMin(), s.getMax());
+    System.out.printf("\t\tDescription: [%30s]  average: %,6.2f min: %,d  max: 
%,d %n", desc,
+        s.mean(), s.min(), s.max());
 
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to