keith-turner closed pull request #549: ACCUMULO-4074 Added support for
multiple scan executors
URL: https://github.com/apache/accumulo/pull/549
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/core/pom.xml b/core/pom.xml
index cf45283272..4535f502d2 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -212,6 +212,25 @@
</allows>
</configuration>
</execution>
+ <execution>
+ <id>apilyzer-spi</id>
+ <goals>
+ <goal>analyze</goal>
+ </goals>
+ <configuration>
+
<outputFile>${project.build.directory}/apilyzer-spi.txt</outputFile>
+ <includes>
+ <include>org[.]apache[.]accumulo[.]core[.]spi[.].*</include>
+ </includes>
+ <excludes />
+ <allows>
+ <allow>org[.]apache[.]hadoop[.]io[.]Text</allow>
+
<allow>org[.]apache[.]accumulo[.]core[.]client(?!.*[.](impl|thrift)[.].*)[.].*</allow>
+
<allow>org[.]apache[.]accumulo[.]core[.]data(?!.*[.](impl|thrift)[.].*)[.].*</allow>
+
<allow>org[.]apache[.]accumulo[.]core[.]security(?!.*[.](crypto)[.].*)[.].*</allow>
+ </allows>
+ </configuration>
+ </execution>
</executions>
</plugin>
<plugin>
diff --git
a/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
b/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
index 5a9ce21724..5427d79310 100644
---
a/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
+++
b/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
@@ -16,12 +16,17 @@
*/
package org.apache.accumulo.core.conf;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -29,10 +34,12 @@
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.conf.PropertyType.PortRange;
+import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher;
import org.apache.accumulo.core.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
/**
@@ -348,6 +355,135 @@ public int getMaxFilesPerTablet() {
return maxFilesPerTablet;
}
+ /**
+ * Holds the settings of a single scan executor: its name, the configured maximum number of
+ * threads, an optional thread priority, an optional prioritizer class name and the prioritizer
+ * options. This is an inner (non-static) class because {@link #getCurrentMaxThreads()} reads
+ * from the enclosing configuration instance.
+ */
+ public class ScanExecutorConfig {
+ public final String name;
+ public final int maxThreads;
+ public final OptionalInt priority;
+ public final Optional<String> prioritizerClass;
+ public final Map<String,String> prioritizerOpts;
+
+ /**
+ * Stores the given values as-is; no copies are made and nothing is validated here. Note that
+ * the comparatorFactory and comparatorFactoryOpts parameters populate the prioritizerClass
+ * and prioritizerOpts fields.
+ */
+ public ScanExecutorConfig(String name, int maxThreads, OptionalInt
priority,
+ Optional<String> comparatorFactory, Map<String,String>
comparatorFactoryOpts) {
+ this.name = name;
+ this.maxThreads = maxThreads;
+ this.priority = priority;
+ this.prioritizerClass = comparatorFactory;
+ this.prioritizerOpts = comparatorFactoryOpts;
+ }
+
+ /**
+ * Re-reads the max threads from the configuration that created this class, rather than
+ * returning the {@link #maxThreads} value captured at construction time.
+ *
+ * @return the value from getDeprecatedScanThreads(name) when that is non-null, otherwise the
+ *         parsed value of the {@code <prefix><name>.threads} property
+ */
+ public int getCurrentMaxThreads() {
+ // A non-null result from the deprecated-property lookup takes precedence.
+ Integer depThreads = getDeprecatedScanThreads(name);
+ if (depThreads != null) {
+ return depThreads;
+ }
+
+ String prop = Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + name + "."
+ SCAN_EXEC_THREADS;
+ String val =
getAllPropertiesWithPrefix(Property.TSERV_SCAN_EXECUTORS_PREFIX).get(prop);
+ // NOTE(review): parseInt throws NumberFormatException if val is null or not numeric;
+ // presumably that can happen if the property was removed after this object was built.
+ return Integer.parseInt(val);
+ }
+ }
+
+ /**
+ * Reports whether the given property is set. This base implementation does not support the
+ * query and always throws; subclasses that can answer it are expected to override it.
+ *
+ * @throws UnsupportedOperationException
+ *           always, in this base implementation
+ */
+ public boolean isPropertySet(Property prop) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+  * Looks up the thread count for a built-in scan executor from its deprecated property.
+  *
+  * <p>
+  * Only the executor named by {@link SimpleScanDispatcher#DEFAULT_SCAN_EXECUTOR_NAME} and the
+  * executor named {@code meta} have deprecated counterparts. The deprecated value is used only
+  * when the deprecated property is set and its replacement is not; if both are set the
+  * deprecated one is ignored. A warning is logged in either case.
+  *
+  * @param name
+  *          the scan executor name
+  * @return the value of the deprecated property when it alone is set, otherwise {@code null}
+  */
+ @SuppressWarnings("deprecation")
+ Integer getDeprecatedScanThreads(String name) {
+
+   Property prop;
+   Property deprecatedProp;
+
+   if (name.equals(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME)) {
+     prop = Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS;
+     deprecatedProp = Property.TSERV_READ_AHEAD_MAXCONCURRENT;
+   } else if (name.equals("meta")) {
+     prop = Property.TSERV_SCAN_EXECUTORS_META_THREADS;
+     deprecatedProp = Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT;
+   } else {
+     // No deprecated equivalent exists for user-defined executors.
+     return null;
+   }
+
+   boolean propSet = isPropertySet(prop);
+   boolean deprecatedSet = isPropertySet(deprecatedProp);
+
+   if (deprecatedSet) {
+     if (!propSet) {
+       // The deprecated key is the first placeholder and its replacement the second.
+       log.warn("Property {} is deprecated, use {} instead.", deprecatedProp.getKey(),
+           prop.getKey());
+       return Integer.valueOf(get(deprecatedProp));
+     }
+     log.warn("Deprecated property {} ignored because {} is set", deprecatedProp.getKey(),
+         prop.getKey());
+   }
+
+   return null;
+ }
+
+ // Option names that may follow "<scan executor prefix><executor name>." in a property key.
+ // SCAN_EXEC_PRIORITIZER_OPTS is itself a prefix; the text after it is the option key handed
+ // to the prioritizer.
+ private static final String SCAN_EXEC_THREADS = "threads";
+ private static final String SCAN_EXEC_PRIORITY = "priority";
+ private static final String SCAN_EXEC_PRIORITIZER = "prioritizer";
+ private static final String SCAN_EXEC_PRIORITIZER_OPTS = "prioritizer.opts.";
+
+ /**
+  * Parses every property under {@link Property#TSERV_SCAN_EXECUTORS_PREFIX} into one
+  * {@link ScanExecutorConfig} per executor name.
+  *
+  * <p>
+  * Keys are expected to have the form {@code <prefix><name>.<option>}, where the option is
+  * {@code threads}, {@code priority}, {@code prioritizer} or {@code prioritizer.opts.<key>}.
+  * When {@link #getDeprecatedScanThreads(String)} returns a value for an executor, that value
+  * replaces the configured {@code threads} value.
+  *
+  * @return the parsed executor configurations, in no particular order
+  * @throws IllegalStateException
+  *           if a key has no option after the executor name, names an unknown option, or has
+  *           an empty prioritizer option key
+  * @throws IllegalArgumentException
+  *           if an executor has no {@code threads} option or it is not positive
+  * @throws NumberFormatException
+  *           if a {@code threads} or {@code priority} value is not an integer
+  */
+ public Collection<ScanExecutorConfig> getScanExecutors() {
+
+   Map<String,Map<String,String>> propsByName = new HashMap<>();
+
+   List<ScanExecutorConfig> scanResources = new ArrayList<>();
+
+   // First pass: group the options by executor name.
+   for (Entry<String,String> entry : getAllPropertiesWithPrefix(
+       Property.TSERV_SCAN_EXECUTORS_PREFIX).entrySet()) {
+
+     String suffix = entry.getKey()
+         .substring(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey().length());
+     String[] tokens = suffix.split("\\.", 2);
+     if (tokens.length < 2) {
+       // A key such as "<prefix>name" has no option part; fail with a clear message instead
+       // of an ArrayIndexOutOfBoundsException.
+       throw new IllegalStateException("Invalid scan executor property : " + entry.getKey());
+     }
+     String name = tokens[0];
+
+     propsByName.computeIfAbsent(name, k -> new HashMap<>()).put(tokens[1], entry.getValue());
+   }
+
+   // Second pass: interpret each executor's options.
+   for (Entry<String,Map<String,String>> entry : propsByName.entrySet()) {
+     String name = entry.getKey();
+     Integer threads = null;
+     Integer prio = null;
+     String prioritizerClass = null;
+     Map<String,String> prioritizerOpts = new HashMap<>();
+
+     for (Entry<String,String> subEntry : entry.getValue().entrySet()) {
+       String opt = subEntry.getKey();
+       String val = subEntry.getValue();
+
+       if (opt.equals(SCAN_EXEC_THREADS)) {
+         Integer depThreads = getDeprecatedScanThreads(name);
+         if (depThreads == null) {
+           threads = Integer.parseInt(val);
+         } else {
+           threads = depThreads;
+         }
+       } else if (opt.equals(SCAN_EXEC_PRIORITY)) {
+         prio = Integer.parseInt(val);
+       } else if (opt.equals(SCAN_EXEC_PRIORITIZER)) {
+         prioritizerClass = val;
+       } else if (opt.startsWith(SCAN_EXEC_PRIORITIZER_OPTS)) {
+         String key = opt.substring(SCAN_EXEC_PRIORITIZER_OPTS.length());
+         if (key.isEmpty()) {
+           throw new IllegalStateException("Invalid scan executor option : " + opt);
+         }
+         prioritizerOpts.put(key, val);
+       } else {
+         throw new IllegalStateException("Unknown scan executor option : " + opt);
+       }
+     }
+
+     Preconditions.checkArgument(threads != null && threads > 0,
+         "Scan resource %s incorrectly specified threads", name);
+
+     scanResources.add(new ScanExecutorConfig(name, threads,
+         prio == null ? OptionalInt.empty() : OptionalInt.of(prio),
+         Optional.ofNullable(prioritizerClass), prioritizerOpts));
+   }
+
+   return scanResources;
+ }
+
/**
* Invalidates the <code>ZooCache</code> used for storage and quick
retrieval of properties for
* this configuration.
diff --git
a/core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java
b/core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java
index fc2891e2ad..54b87ccbda 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/DefaultConfiguration.java
@@ -52,4 +52,9 @@ public void getProperties(Map<String,String> props,
Predicate<String> filter) {
resolvedProps.entrySet().stream().filter(p -> filter.test(p.getKey()))
.forEach(e -> props.put(e.getKey(), e.getValue()));
}
+
+ /**
+ * Always reports {@code false}, whatever the property.
+ * NOTE(review): presumably because this configuration only carries built-in default values,
+ * so nothing counts as explicitly set here; confirm against the class's intent.
+ */
+ @Override
+ public boolean isPropertySet(Property prop) {
+ return false;
+ }
}
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index ed17bedbd4..b7ff210fb0 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -30,6 +30,9 @@
import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher;
+import org.apache.accumulo.core.spi.scan.ScanPrioritizer;
+import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher;
import org.apache.accumulo.core.util.format.DefaultFormatter;
import org.apache.accumulo.core.util.interpret.DefaultScanInterpreter;
import org.apache.accumulo.start.classloader.AccumuloClassLoader;
@@ -430,12 +433,37 @@
"When a tablet server's SimpleTimer thread triggers to check idle"
+ " sessions, this configurable option will be used to evaluate
update"
+ " sessions to determine if they can be closed due to inactivity"),
+ @Deprecated
TSERV_READ_AHEAD_MAXCONCURRENT("tserver.readahead.concurrent.max", "16",
PropertyType.COUNT,
- "The maximum number of concurrent read ahead that will execute. This
effectively"
- + " limits the number of long running scans that can run
concurrently per tserver."),
+ "This property is deprecated since 2.0.0, use
tserver.scan.executors.default.threads "
+ + "instead. The maximum number of concurrent read ahead that will
execute. This "
+ + "effectively limits the number of long running scans that can run
concurrently "
+ + "per tserver."),
+ @Deprecated
TSERV_METADATA_READ_AHEAD_MAXCONCURRENT("tserver.metadata.readahead.concurrent.max",
"8",
PropertyType.COUNT,
- "The maximum number of concurrent metadata read ahead that will
execute."),
+ "This property is deprecated since 2.0.0, use
tserver.scan.executors.meta.threads instead. "
+ + "The maximum number of concurrent metadata read ahead that will
execute."),
+ TSERV_SCAN_EXECUTORS_PREFIX("tserver.scan.executors.", null,
PropertyType.PREFIX,
+ "Prefix for defining executors to service scans. For each executor the
number of threads, "
+ + "thread priority, and an optional prioritizer can be configured.
The prioritizer "
+ + "determines which scan an executor should run first and must
implement "
+ + ScanPrioritizer.class.getName() + ". Tables can select an executor
by setting"
+ + " table.scan.dispatcher. To configure a new executor, set "
+ + "tserver.scan.executors.<name>.threads=<number>. Optionally, can
also set "
+ + "tserver.scan.executors.<name>.priority=<number 1 to 10>, "
+ + "tserver.scan.executors.<name>.prioritizer=<class name>, and "
+ + "tserver.scan.executors.<name>.prioritizer.opts.<key>=<value>"),
+
TSERV_SCAN_EXECUTORS_DEFAULT_THREADS("tserver.scan.executors.default.threads",
"16",
+ PropertyType.COUNT,
+ "The number of threads for the scan executor that tables use by
default."),
+
TSERV_SCAN_EXECUTORS_DEFAULT_PRIORITIZER("tserver.scan.executors.default.prioritizer",
"",
+ PropertyType.STRING,
+ "Prioritizer for the default scan executor. Defaults to none which "
+ + "results in FIFO priority. Set to a class that implements "
+ + ScanPrioritizer.class.getName() + " to configure one."),
+ TSERV_SCAN_EXECUTORS_META_THREADS("tserver.scan.executors.meta.threads",
"8", PropertyType.COUNT,
+ "The number of threads for the metadata table scan executor."),
TSERV_MIGRATE_MAXCONCURRENT("tserver.migrations.concurrent.max", "1",
PropertyType.COUNT,
"The maximum number of concurrent tablet migrations for a tablet
server"),
TSERV_MAJC_MAXCONCURRENT("tserver.compaction.major.concurrent.max", "3",
PropertyType.COUNT,
@@ -664,6 +692,19 @@
PropertyType.BYTES,
"The max RFile size used for a merging minor compaction. The default"
+ " value of 0 disables a max file size."),
+ TABLE_SCAN_DISPATCHER("table.scan.dispatcher",
SimpleScanDispatcher.class.getName(),
+ PropertyType.CLASSNAME,
+ "This class is used to dynamically dispatch scans to configured scan
executors. This setting"
+ + " defaults to " + SimpleScanDispatcher.class.getSimpleName()
+ + " which dispatches to an executor"
+ + " named 'default' when it is optionless. Setting the option "
+ + "'table.scan.dispatcher.opts.executor=<name>' causes "
+ + SimpleScanDispatcher.class.getSimpleName() + " to dispatch to the
specified executor. "
+ + "It has more options listed in its javadoc. Configured classes
must implement "
+ + ScanDispatcher.class.getName() + ". This property is ignored for
the root and metadata"
+ + " table. The metadata table always dispatches to a scan executor
named `meta`."),
+ TABLE_SCAN_DISPATCHER_OPTS("table.scan.dispatcher.opts.", null,
PropertyType.PREFIX,
+ "Options for the table scan dispatcher"),
TABLE_SCAN_MAXMEM("table.scan.max.memory", "512K", PropertyType.BYTES,
"The maximum amount of memory that will be used to cache results of a
client query/scan. "
+ "Once this limit is reached, the buffered data is sent to the
client."),
@@ -1148,7 +1189,8 @@ public static boolean isValidTablePropertyKey(String key)
{
|| key.startsWith(Property.TABLE_REPLICATION_TARGET.getKey())
|| key.startsWith(Property.TABLE_ARBITRARY_PROP_PREFIX.getKey())
|| key.startsWith(TABLE_SAMPLER_OPTS.getKey())
- || key.startsWith(TABLE_SUMMARIZER_PREFIX.getKey())));
+ || key.startsWith(TABLE_SUMMARIZER_PREFIX.getKey())
+ || key.startsWith(TABLE_SCAN_DISPATCHER_OPTS.getKey())));
}
/**
diff --git
a/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java
b/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java
index 001b1dbbba..cf0fe25353 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/SiteConfiguration.java
@@ -33,6 +33,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
/**
* An {@link AccumuloConfiguration} which first loads any properties set on
the command-line (using
* the -o option) and then from an XML file, usually accumulo-site.xml. This
implementation supports
@@ -164,6 +166,14 @@ public String get(Property property) {
return value;
}
+ /**
+ * Reports whether a non-sensitive property has a value in any of the sources consulted
+ * below, or is set in the parent configuration.
+ *
+ * @throws IllegalArgumentException
+ *           if the property is sensitive, which this method does not support
+ */
+ @Override
+ public boolean isPropertySet(Property prop) {
+ Preconditions.checkArgument(!prop.isSensitive(),
+ "This method not implemented for sensitive props");
+ // Checked in order: CliConfiguration, staticConfigs, the XML config, then the parent.
+ // NOTE(review): those sources are defined outside this hunk; presumably command-line
+ // overrides, programmatic overrides and the site XML file respectively.
+ return CliConfiguration.get(prop) != null ||
staticConfigs.containsKey(prop.getKey())
+ || getXmlConfig().get(prop.getKey()) != null ||
parent.isPropertySet(prop);
+ }
+
@Override
public void getProperties(Map<String,String> props, Predicate<String>
filter) {
getProperties(props, filter, true);
diff --git
a/core/src/main/java/org/apache/accumulo/core/spi/common/IteratorConfiguration.java
b/core/src/main/java/org/apache/accumulo/core/spi/common/IteratorConfiguration.java
new file mode 100644
index 0000000000..40f05e5050
--- /dev/null
+++
b/core/src/main/java/org/apache/accumulo/core/spi/common/IteratorConfiguration.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.common;
+
+import java.util.Map;
+
+/**
+ * Provides information about a configured Accumulo Iterator: its class, name, priority and
+ * options.
+ *
+ * @since 2.0.0
+ */
+public interface IteratorConfiguration {
+ /**
+ * @return the iterator's class, as a string (presumably the fully qualified class name)
+ */
+ String getIteratorClass();
+
+ /**
+ * @return the name the iterator was configured under
+ */
+ String getName();
+
+ /**
+ * @return the iterator's configured priority
+ */
+ int getPriority();
+
+ /**
+ * @return the iterator's configured options, as key/value pairs
+ */
+ Map<String,String> getOptions();
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/common/Stats.java
b/core/src/main/java/org/apache/accumulo/core/spi/common/Stats.java
new file mode 100644
index 0000000000..a1b9322129
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/common/Stats.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.common;
+
+/**
+ * Summary statistics (minimum, maximum, mean, sum and count) over a series of {@code long}
+ * data points.
+ *
+ * @since 2.0.0
+ */
+public interface Stats {
+
+ /**
+ * @return the minimum data point seen, or 0 if no data was seen
+ */
+ long min();
+
+ /**
+ * @return the maximum data point seen, or 0 if no data was seen
+ */
+ long max();
+
+ /**
+ * @return the mean of the data points seen, or {@link Double#NaN} if no data was seen
+ */
+ double mean();
+
+ /**
+ * @return the sum of the data points seen, or 0 if no data was seen
+ */
+ long sum();
+
+ /**
+ * @return the number of data points seen
+ */
+ long num();
+}
diff --git
a/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java
b/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java
new file mode 100644
index 0000000000..c567460bf0
--- /dev/null
+++
b/core/src/main/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Comparator;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Prioritize scans based on the ratio of runTime/idleTime. Scans with a lower ratio have a
+ * higher priority. When the ratio is equal, the scan with the oldest last run time has the
+ * highest priority. If neither have run, then the oldest gets priority.
+ *
+ * @since 2.0.0
+ */
+public class IdleRatioScanPrioritizer implements ScanPrioritizer {
+ /**
+ * Computes total run time divided by total idle time as of currTime. The idle time is
+ * clamped to at least 1 so the division never has a zero denominator.
+ */
+ private static double idleRatio(long currTime, ScanInfo si) {
+ double totalRunTime = si.getRunTimeStats().sum();
+ double totalIdleTime = Math.max(1, si.getIdleTimeStats(currTime).sum());
+ return totalRunTime / totalIdleTime;
+ }
+
+ /**
+ * Creates the comparator described in the class documentation.
+ *
+ * @param options
+ *          must be empty; this prioritizer accepts no options
+ * @throws IllegalArgumentException
+ *           if any option is supplied
+ */
+ @Override
+ public Comparator<ScanInfo> createComparator(Map<String,String> options) {
+ Preconditions.checkArgument(options.isEmpty());
+
+ // The ratio is evaluated against the wall clock at the moment of each comparison.
+ Comparator<ScanInfo> c1 = (si1, si2) -> {
+ long currTime = System.currentTimeMillis();
+ return Double.compare(idleRatio(currTime, si1), idleRatio(currTime,
si2));
+ };
+
+ // Ties: earlier last run time first (a scan that never ran counts as 0), then earlier
+ // creation time.
+ return c1.thenComparingLong(si -> si.getLastRunTime().orElse(0))
+ .thenComparingLong(si -> si.getCreationTime());
+ }
+}
diff --git
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java
new file mode 100644
index 0000000000..a3bc3f1e10
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanDispatcher.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A per table scan dispatcher that decides which executor should be used to process a scan.
+ * For information about configuring, find the documentation for the
+ * {@code table.scan.dispatcher} and {@code table.scan.dispatcher.opts.} properties.
+ *
+ * @since 2.0.0
+ */
+public interface ScanDispatcher {
+ /**
+ * This method is called once after a ScanDispatcher is instantiated. The default
+ * implementation accepts no options and rejects a non-empty map.
+ *
+ * @param options
+ *          The configured options. For example if the table properties
+ *          {@code table.scan.dispatcher.opts.p1=abc} and
+ *          {@code table.scan.dispatcher.opts.p9=123} were set, then this map would contain
+ *          {@code p1=abc} and {@code p9=123}.
+ * @throws IllegalArgumentException
+ *           in the default implementation, if any option is supplied
+ */
+ public default void init(Map<String,String> options) {
+ Preconditions.checkArgument(options.isEmpty(), "No options expected");
+ }
+
+ /**
+ * Chooses the executor that should run the given scan.
+ *
+ * @param scanInfo
+ *          Information about the scan.
+ * @param scanExecutors
+ *          Information about the currently configured executors.
+ * @return Should return one of the executors named in scanExecutors.keySet()
+ */
+ String dispatch(ScanInfo scanInfo, Map<String,ScanExecutor> scanExecutors);
+}
diff --git
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanExecutor.java
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanExecutor.java
new file mode 100644
index 0000000000..a55b090b9b
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanExecutor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Interface for obtaining information about a scan executor: how many tasks it has queued
+ * and the configuration it was created from.
+ *
+ * @since 2.0.0
+ */
+public interface ScanExecutor {
+
+ /**
+ * The configuration a scan executor was created with.
+ */
+ interface Config {
+ /**
+ * @return the unique name used to identify the executor in config
+ */
+ String getName();
+
+ /**
+ * @return the max number of threads that were configured
+ */
+ int getMaxThreads();
+
+ /**
+ * @return the prioritizer that was configured, if any
+ */
+ Optional<String> getPrioritizerClass();
+
+ /**
+ * @return the prioritizer options
+ */
+ Map<String,String> getPrioritizerOptions();
+ }
+
+ /**
+ * @return The number of tasks queued for the executor
+ */
+ int getQueued();
+
+ /**
+ * @return The configuration used to create the executor
+ */
+ Config getConfig();
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanInfo.java
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanInfo.java
new file mode 100644
index 0000000000..7961c21b3c
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanInfo.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Collection;
+import java.util.OptionalLong;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.spi.common.IteratorConfiguration;
+import org.apache.accumulo.core.spi.common.Stats;
+
+/**
+ * Provides information about an active Accumulo scan against a tablet. Accumulo scans operate
+ * by repeatedly gathering batches of data and returning those to the client.
+ *
+ * <p>
+ * All times are in milliseconds and obtained using System.currentTimeMillis().
+ *
+ * @since 2.0.0
+ */
+public interface ScanInfo {
+
+ /**
+ * The kind of client scanner that started a scan.
+ */
+ enum Type {
+ /**
+ * A single range scan started using a {@link Scanner}
+ */
+ SINGLE,
+ /**
+ * A multi range scan started using a {@link BatchScanner}
+ */
+ MULTI
+ }
+
+ /**
+ * @return whether this is a {@link Type#SINGLE} or {@link Type#MULTI} scan
+ */
+ Type getScanType();
+
+ /**
+ * @return the id of the table being scanned
+ */
+ String getTableId();
+
+ /**
+ * Returns the first time a tablet knew about a scan over its portion of data.
+ */
+ long getCreationTime();
+
+ /**
+ * If the scan has run, returns the last run time. Otherwise the result is empty.
+ */
+ OptionalLong getLastRunTime();
+
+ /**
+ * Returns timing statistics about running and gathering batches of data.
+ */
+ Stats getRunTimeStats();
+
+ /**
+ * Returns statistics about the time between running. These stats are only about the idle
+ * times before the last run time. The idle time after the last run time is not included. If
+ * the scan has never run, then there are no stats.
+ */
+ Stats getIdleTimeStats();
+
+ /**
+ * This method is similar to {@link #getIdleTimeStats()}, but it also includes the time period
+ * between the last run time and now in the stats. If the scan has never run, then the stats
+ * are computed using only {@code currentTime - creationTime}.
+ */
+ Stats getIdleTimeStats(long currentTime);
+
+ /**
+ * This method returns which columns were fetched by a scan. When a family is fetched, a
+ * Column object where everything but the family is null is in the set.
+ *
+ * <p>
+ * The following example code shows how this method can be used to check if a family was
+ * fetched or a family+qualifier was fetched. If continually checking for the same column,
+ * should probably create a constant.
+ *
+ * <pre>
+ * <code>
+ * boolean wasFamilyFetched(ScanInfo si, byte[] fam) {
+ * Column family = new Column(fam, null, null);
+ * return si.getFetchedColumns().contains(family);
+ * }
+ *
+ * boolean wasColumnFetched(ScanInfo si, byte[] fam, byte[] qual) {
+ * Column col = new Column(fam, qual, null);
+ * return si.getFetchedColumns().contains(col);
+ * }
+ * </code>
+ * </pre>
+ *
+ *
+ * @return The family and family+qualifier pairs fetched.
+ */
+ Set<Column> getFetchedColumns();
+
+ /**
+ * @return iterators that were configured on the client side scanner
+ */
+ Collection<IteratorConfiguration> getClientScanIterators();
+}
diff --git
a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java
b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java
new file mode 100644
index 0000000000..51a4254a32
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanPrioritizer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Comparator;
+import java.util.Map;
+
+/**
+ * A factory for creating comparators used for prioritizing scans. For information about
+ * configuring, find the documentation for the {@code tserver.scan.executors.} property.
+ *
+ * @since 2.0.0
+ */
+public interface ScanPrioritizer {
+ /**
+ * Creates the comparator used to order scans.
+ *
+ * @param options
+ *          the prioritizer options (presumably those configured under
+ *          {@code tserver.scan.executors.<name>.prioritizer.opts.}; confirm against the caller)
+ * @return a comparator over {@link ScanInfo}
+ */
+ Comparator<ScanInfo> createComparator(Map<String,String> options);
+}
diff --git
a/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
new file mode 100644
index 0000000000..96e7d2cf17
--- /dev/null
+++
b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+/**
+ * If no options are given, then this will dispatch to an executor named
{@code default}. This
+ * dispatcher supports the following options.
+ *
+ * <UL>
+ * <LI>{@code table.scan.dispatcher.opts.executor=<scan executor name>} :
dispatches all scans to
+ * the named executor.</LI>
+ * <LI>{@code table.scan.dispatcher.opts.multi_executor=<scan executor name>}
: dispatches batch
+ * scans to the named executor.</LI>
+ * <LI>{@code table.scan.dispatcher.opts.single_executor=<scan executor name>}
: dispatches regular
+ * scans to the named executor.</LI>
+ * </UL>
+ *
+ * The {@code multi_executor} and {@code single_executor} options override the
{@code executor}
+ * option.
+ */
+
+public class SimpleScanDispatcher implements ScanDispatcher {
+
+ private final Set<String> VALID_OPTS = ImmutableSet.of("executor",
"multi_executor",
+ "single_executor");
+ private String multiExecutor;
+ private String singleExecutor;
+
+ public static final String DEFAULT_SCAN_EXECUTOR_NAME = "default";
+
+ @Override
+ public void init(Map<String,String> options) {
+ Set<String> invalidOpts = Sets.difference(options.keySet(), VALID_OPTS);
+ Preconditions.checkArgument(invalidOpts.size() == 0, "Invalid options :
%s", invalidOpts);
+
+ String base = options.getOrDefault("executor", DEFAULT_SCAN_EXECUTOR_NAME);
+ multiExecutor = options.getOrDefault("multi_executor", base);
+ singleExecutor = options.getOrDefault("single_executor", base);
+ }
+
+ @Override
+ public String dispatch(ScanInfo scanInfo, Map<String,ScanExecutor>
scanExecutors) {
+ switch (scanInfo.getScanType()) {
+ case MULTI:
+ return multiExecutor;
+ case SINGLE:
+ return singleExecutor;
+ default:
+ throw new IllegalArgumentException("Unexpected scan type " +
scanInfo.getScanType());
+
+ }
+ }
+}
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
b/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
new file mode 100644
index 0000000000..ed9c150792
--- /dev/null
+++
b/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Uncaught exception handler that reports the failing thread and the exception through the
+ * class logger at error level.
+ */
+public class AccumuloUncaughtExceptionHandler implements
UncaughtExceptionHandler {
+
+ private static final Logger log =
LoggerFactory.getLogger(AccumuloUncaughtExceptionHandler.class);
+
+ /**
+ * Logs the thread and the throwable that escaped it.
+ *
+ * NOTE(review): the message says "Shutting down." but this method only logs; nothing here
+ * stops the thread or the process. Confirm whether a shutdown happens elsewhere.
+ */
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ log.error(String.format("Caught an exception in %s. Shutting down.", t),
e);
+ }
+
+}
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
b/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
index d8b307c36c..57d429bfa8 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
@@ -16,6 +16,8 @@
*/
package org.apache.accumulo.core.util;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.OptionalInt;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
@@ -26,16 +28,31 @@
public class NamingThreadFactory implements ThreadFactory {
private static final Logger log =
LoggerFactory.getLogger(NamingThreadFactory.class);
+ private static final UncaughtExceptionHandler UEH = new
AccumuloUncaughtExceptionHandler();
+
private AtomicInteger threadNum = new AtomicInteger(1);
private String name;
+ private OptionalInt priority;
public NamingThreadFactory(String name) {
this.name = name;
+ this.priority = OptionalInt.empty();
+ }
+
+ public NamingThreadFactory(String name, OptionalInt priority) {
+ this.name = name;
+ this.priority = priority;
}
@Override
public Thread newThread(Runnable r) {
- return new Daemon(new LoggingRunnable(log, r), name + " " +
threadNum.getAndIncrement());
+ Thread thread = new Daemon(new LoggingRunnable(log, r),
+ name + " " + threadNum.getAndIncrement());
+ thread.setUncaughtExceptionHandler(UEH);
+ if (priority.isPresent()) {
+ thread.setPriority(priority.getAsInt());
+ }
+ return thread;
}
}
diff --git a/core/src/main/java/org/apache/accumulo/core/util/Stat.java
b/core/src/main/java/org/apache/accumulo/core/util/Stat.java
index 8bdeb63312..704622b005 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/Stat.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/Stat.java
@@ -16,67 +16,70 @@
*/
package org.apache.accumulo.core.util;
-import org.apache.commons.math3.stat.descriptive.StorelessUnivariateStatistic;
+import org.apache.accumulo.core.spi.common.Stats;
import org.apache.commons.math3.stat.descriptive.moment.Mean;
-import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
-import org.apache.commons.math3.stat.descriptive.rank.Max;
-import org.apache.commons.math3.stat.descriptive.rank.Min;
-import org.apache.commons.math3.stat.descriptive.summary.Sum;
-public class Stat {
- Min min;
- Max max;
- Sum sum;
+public class Stat implements Stats {
+ long min;
+ long max;
+ long sum;
Mean mean;
- StandardDeviation sd;
-
- StorelessUnivariateStatistic[] stats;
public Stat() {
- min = new Min();
- max = new Max();
- sum = new Sum();
mean = new Mean();
- sd = new StandardDeviation();
-
- stats = new StorelessUnivariateStatistic[] {min, max, sum, mean, sd};
+ clear();
}
public void addStat(long stat) {
- for (StorelessUnivariateStatistic statistic : stats) {
- statistic.increment(stat);
- }
+ min = Math.min(min, stat);
+ max = Math.max(max, stat);
+ sum += stat;
+ mean.increment(stat);
}
- public long getMin() {
- return (long) min.getResult();
+ @Override
+ public long min() {
+ return num() == 0 ? 0L : min;
}
- public long getMax() {
- return (long) max.getResult();
+ @Override
+ public long max() {
+ return num() == 0 ? 0L : max;
}
- public long getSum() {
- return (long) sum.getResult();
+ @Override
+ public long sum() {
+ return sum;
}
- public double getAverage() {
+ @Override
+ public double mean() {
return mean.getResult();
}
- public double getStdDev() {
- return sd.getResult();
- }
-
@Override
public String toString() {
- return String.format("%,d %,d %,.2f %,d", getMin(), getMax(),
getAverage(), mean.getN());
+ return String.format("%,d %,d %,.2f %,d", min(), max(), mean(),
mean.getN());
}
public void clear() {
- for (StorelessUnivariateStatistic statistic : stats) {
- statistic.clear();
- }
+ min = Long.MAX_VALUE;
+ max = Long.MIN_VALUE;
+ sum = 0;
+ mean.clear();
+ }
+
+ @Override
+ public long num() {
+ return mean.getN();
}
+ public Stat copy() {
+ Stat stat = new Stat();
+ stat.min = this.min;
+ stat.max = this.max;
+ stat.sum = this.sum;
+ stat.mean = this.mean.copy();
+ return stat;
+ }
}
diff --git
a/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java
b/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java
index 9f77834f4b..0fcfebdbcf 100644
---
a/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java
+++
b/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java
@@ -21,11 +21,15 @@
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Predicate;
+import org.apache.accumulo.core.conf.AccumuloConfiguration.ScanExecutorConfig;
+import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher;
+import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -131,12 +135,26 @@ public void testGetPortInvalidSyntax() {
private HashMap<String,String> props = new HashMap<>();
private int upCount = 0;
+ private AccumuloConfiguration parent;
+
+ TestConfiguration() {
+ parent = null;
+ }
+
+ TestConfiguration(AccumuloConfiguration parent) {
+ this.parent = parent;
+ }
public void set(String p, String v) {
props.put(p, v);
upCount++;
}
+ @Override
+ public boolean isPropertySet(Property prop) {
+ return props.containsKey(prop.getKey());
+ }
+
@Override
public long getUpdateCount() {
return upCount;
@@ -144,11 +162,18 @@ public long getUpdateCount() {
@Override
public String get(Property property) {
- return props.get(property.getKey());
+ String v = props.get(property.getKey());
+ // Fall back to the parent configuration only when this one holds no value for the property.
+ // Use the short-circuit && so the second operand is skipped once the first is false.
+ if (v == null && parent != null) {
+ v = parent.get(property);
+ }
+ return v;
}
@Override
public void getProperties(Map<String,String> output, Predicate<String>
filter) {
+ if (parent != null) {
+ parent.getProperties(output, filter);
+ }
for (Entry<String,String> entry : props.entrySet()) {
if (filter.test(entry.getKey())) {
output.put(entry.getKey(), entry.getValue());
@@ -267,4 +292,89 @@ public void testGetByPrefix() {
Map<String,String> pmL =
tc.getAllPropertiesWithPrefix(Property.TABLE_ITERATOR_SCAN_PREFIX);
assertSame(pmG, pmL);
}
+
+ @Test
+ public void testScanExecutors() {
+ String defName = SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME;
+
+ TestConfiguration tc = new
TestConfiguration(DefaultConfiguration.getInstance());
+
+ Collection<ScanExecutorConfig> executors = tc.getScanExecutors();
+
+ Assert.assertEquals(2, executors.size());
+
+ ScanExecutorConfig sec = executors.stream().filter(c ->
c.name.equals(defName)).findFirst()
+ .get();
+ Assert.assertEquals(
+
Integer.parseInt(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getDefaultValue()),
+ sec.maxThreads);
+ Assert.assertFalse(sec.priority.isPresent());
+ Assert.assertTrue(sec.prioritizerClass.get().isEmpty());
+ Assert.assertTrue(sec.prioritizerOpts.isEmpty());
+
+ // ensure the deprecated prop is read if nothing else is set
+ tc.set("tserver.readahead.concurrent.max", "6");
+ Assert.assertEquals(6, sec.getCurrentMaxThreads());
+ Assert.assertEquals(
+
Integer.parseInt(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getDefaultValue()),
+ sec.maxThreads);
+ ScanExecutorConfig sec2 = tc.getScanExecutors().stream().filter(c ->
c.name.equals(defName))
+ .findFirst().get();
+ Assert.assertEquals(6, sec2.maxThreads);
+
+ // ensure new prop overrides deprecated prop
+ tc.set(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getKey(), "9");
+ Assert.assertEquals(9, sec.getCurrentMaxThreads());
+ Assert.assertEquals(
+
Integer.parseInt(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getDefaultValue()),
+ sec.maxThreads);
+ ScanExecutorConfig sec3 = tc.getScanExecutors().stream().filter(c ->
c.name.equals(defName))
+ .findFirst().get();
+ Assert.assertEquals(9, sec3.maxThreads);
+
+ ScanExecutorConfig sec4 = executors.stream().filter(c ->
c.name.equals("meta")).findFirst()
+ .get();
+ Assert.assertEquals(
+
Integer.parseInt(Property.TSERV_SCAN_EXECUTORS_META_THREADS.getDefaultValue()),
+ sec4.maxThreads);
+ Assert.assertFalse(sec4.priority.isPresent());
+ Assert.assertFalse(sec4.prioritizerClass.isPresent());
+ Assert.assertTrue(sec4.prioritizerOpts.isEmpty());
+
+ tc.set("tserver.metadata.readahead.concurrent.max", "2");
+ Assert.assertEquals(2, sec4.getCurrentMaxThreads());
+ ScanExecutorConfig sec5 = tc.getScanExecutors().stream().filter(c ->
c.name.equals("meta"))
+ .findFirst().get();
+ Assert.assertEquals(2, sec5.maxThreads);
+
+ tc.set(Property.TSERV_SCAN_EXECUTORS_META_THREADS.getKey(), "3");
+ Assert.assertEquals(3, sec4.getCurrentMaxThreads());
+ ScanExecutorConfig sec6 = tc.getScanExecutors().stream().filter(c ->
c.name.equals("meta"))
+ .findFirst().get();
+ Assert.assertEquals(3, sec6.maxThreads);
+
+ String prefix = Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey();
+ tc.set(prefix + "hulksmash.threads", "66");
+ tc.set(prefix + "hulksmash.priority", "3");
+ tc.set(prefix + "hulksmash.prioritizer", "com.foo.ScanPrioritizer");
+ tc.set(prefix + "hulksmash.prioritizer.opts.k1", "v1");
+ tc.set(prefix + "hulksmash.prioritizer.opts.k2", "v3");
+
+ executors = tc.getScanExecutors();
+ Assert.assertEquals(3, executors.size());
+ ScanExecutorConfig sec7 = executors.stream().filter(c ->
c.name.equals("hulksmash")).findFirst()
+ .get();
+ Assert.assertEquals(66, sec7.maxThreads);
+ Assert.assertEquals(3, sec7.priority.getAsInt());
+ Assert.assertEquals("com.foo.ScanPrioritizer",
sec7.prioritizerClass.get());
+ Assert.assertEquals(ImmutableMap.of("k1", "v1", "k2", "v3"),
sec7.prioritizerOpts);
+
+ tc.set(prefix + "hulksmash.threads", "44");
+ Assert.assertEquals(66, sec7.maxThreads);
+ Assert.assertEquals(44, sec7.getCurrentMaxThreads());
+
+ ScanExecutorConfig sec8 = tc.getScanExecutors().stream().filter(c ->
c.name.equals("hulksmash"))
+ .findFirst().get();
+ Assert.assertEquals(44, sec8.maxThreads);
+ }
}
diff --git
a/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java
b/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java
new file mode 100644
index 0000000000..0128b20954
--- /dev/null
+++
b/core/src/test/java/org/apache/accumulo/core/spi/scan/IdleRatioScanPrioritizerTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.accumulo.core.spi.scan.ScanInfo.Type;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks the ordering produced by the comparator that IdleRatioScanPrioritizer creates.
+ */
+public class IdleRatioScanPrioritizerTest {
+
+ /**
+ * Builds six scans, shuffles them, sorts them with the prioritizer's comparator and asserts
+ * the resulting order is b, a, f, e, d, c.
+ */
+ @Test
+ public void testSort() {
+ long now = System.currentTimeMillis();
+
+ List<TestScanInfo> scans = new ArrayList<>();
+
+ // Two following have never run, so oldest should go first
+ scans.add(new TestScanInfo("a", Type.SINGLE, now - 3));
+ scans.add(new TestScanInfo("b", Type.SINGLE, now - 8));
+ // Two following have different idle ratio and same last run times
+ scans.add(new TestScanInfo("c", Type.SINGLE, now - 16, 2, 10));
+ scans.add(new TestScanInfo("d", Type.SINGLE, now - 16, 5, 10));
+ // Two following have same idle ratio and different last run times
+ scans.add(new TestScanInfo("e", Type.SINGLE, now - 12, 5, 9));
+ scans.add(new TestScanInfo("f", Type.SINGLE, now - 12, 3, 7));
+
+ // Randomize the starting order so the assertions do not depend on insertion order.
+ Collections.shuffle(scans);
+
+ // The comparator is created with no options.
+ Comparator<ScanInfo> comparator = new IdleRatioScanPrioritizer()
+ .createComparator(Collections.emptyMap());
+
+ Collections.sort(scans, comparator);
+
+ Assert.assertEquals("b", scans.get(0).testId);
+ Assert.assertEquals("a", scans.get(1).testId);
+ Assert.assertEquals("f", scans.get(2).testId);
+ Assert.assertEquals("e", scans.get(3).testId);
+ Assert.assertEquals("d", scans.get(4).testId);
+ Assert.assertEquals("c", scans.get(5).testId);
+ }
+}
diff --git
a/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
new file mode 100644
index 0000000000..b59858a9c0
--- /dev/null
+++
b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.spi.scan.ScanInfo.Type;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Tests for SimpleScanDispatcher option handling.
+ */
+public class SimpleScanDispatcherTest {
+ /**
+ * Checks that the keys of the default executor's threads and prioritizer properties end with
+ * the default executor name followed by ".threads" / ".prioritizer".
+ */
+ @Test
+ public void testProps() {
+ Assert.assertTrue(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS.getKey()
+ .endsWith(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME +
".threads"));
+
Assert.assertTrue(Property.TSERV_SCAN_EXECUTORS_DEFAULT_PRIORITIZER.getKey()
+ .endsWith(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME +
".prioritizer"));
+ }
+
+ /**
+ * Initializes a fresh dispatcher with opts and asserts which executor name a MULTI scan and a
+ * SINGLE scan are dispatched to. Note the parameter order is (single, multi) while the
+ * assertions check multi first. The scan executor map passed to dispatch is null.
+ */
+ private void runTest(Map<String,String> opts, String expectedSingle, String
expectedMulti) {
+ ScanInfo msi = new TestScanInfo("a", Type.MULTI, 4);
+ ScanInfo ssi = new TestScanInfo("a", Type.SINGLE, 4);
+
+ SimpleScanDispatcher ssd1 = new SimpleScanDispatcher();
+ ssd1.init(opts);
+ Assert.assertEquals(expectedMulti, ssd1.dispatch(msi, null));
+ Assert.assertEquals(expectedSingle, ssd1.dispatch(ssi, null));
+ }
+
+ /**
+ * Exercises every combination of the executor, single_executor and multi_executor options,
+ * including none at all.
+ */
+ @Test
+ public void testBasic() {
+ String dname = SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME;
+
+ runTest(Collections.emptyMap(), dname, dname);
+ runTest(ImmutableMap.of("executor", "E1"), "E1", "E1");
+ runTest(ImmutableMap.of("single_executor", "E2"), "E2", dname);
+ runTest(ImmutableMap.of("multi_executor", "E3"), dname, "E3");
+ runTest(ImmutableMap.of("executor", "E1", "single_executor", "E2"), "E2",
"E1");
+ runTest(ImmutableMap.of("executor", "E1", "multi_executor", "E3"), "E1",
"E3");
+ runTest(ImmutableMap.of("single_executor", "E2", "multi_executor", "E3"),
"E2", "E3");
+ runTest(ImmutableMap.of("executor", "E1", "single_executor", "E2",
"multi_executor", "E3"),
+ "E2", "E3");
+ }
+}
diff --git
a/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java
b/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java
new file mode 100644
index 0000000000..68a96502f8
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.spi.scan;
+
+import java.util.Collection;
+import java.util.OptionalLong;
+import java.util.Set;
+
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.spi.common.IteratorConfiguration;
+import org.apache.accumulo.core.spi.common.Stats;
+import org.apache.accumulo.core.util.Stat;
+
+/**
+ * ScanInfo implementation for tests. Scan type, creation time and run/idle statistics are
+ * fixed at construction; table id, fetched columns and client iterators are unsupported.
+ */
+public class TestScanInfo implements ScanInfo {
+
+ String testId;
+ Type scanType;
+ long creationTime;
+ // Stays empty unless at least one time offset is passed to the constructor.
+ OptionalLong lastRunTime = OptionalLong.empty();
+ Stat runTimeStats = new Stat();
+ Stat idleTimeStats = new Stat();
+
+ /**
+ * @param testId
+ *          label stored so tests can identify this instance
+ * @param scanType
+ *          value returned by getScanType()
+ * @param creationTime
+ *          value returned by getCreationTime()
+ * @param times
+ *          consumed in pairs (times[i], times[i + 1]); each pair adds one run duration
+ *          (times[i + 1] - times[i]) and one idle duration (times[i] minus the previous pair's
+ *          second value, or minus 0 for the first pair). The last value plus creationTime
+ *          becomes the last run time. The loop reads times[i + 1] unconditionally, so an odd
+ *          number of values indexes past the end of the array.
+ */
+ TestScanInfo(String testId, Type scanType, long creationTime, int... times) {
+ this.testId = testId;
+ this.scanType = scanType;
+ this.creationTime = creationTime;
+
+ for (int i = 0; i < times.length; i += 2) {
+ long idleDuration = times[i] - (i == 0 ? 0 : times[i - 1]);
+ long runDuration = times[i + 1] - times[i];
+ runTimeStats.addStat(runDuration);
+ idleTimeStats.addStat(idleDuration);
+ }
+
+ if (times.length > 0) {
+ lastRunTime = OptionalLong.of(times[times.length - 1] + creationTime);
+ }
+ }
+
+ @Override
+ public Type getScanType() {
+ return scanType;
+ }
+
+ // Not supported by this test stub.
+ @Override
+ public String getTableId() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getCreationTime() {
+ return creationTime;
+ }
+
+ @Override
+ public OptionalLong getLastRunTime() {
+ return lastRunTime;
+ }
+
+ @Override
+ public Stats getRunTimeStats() {
+ return runTimeStats;
+ }
+
+ @Override
+ public Stats getIdleTimeStats() {
+ return idleTimeStats;
+ }
+
+ /**
+ * Returns a copy of the idle stats with one extra sample: the time from the last run (or from
+ * creation when the scan never ran) up to currentTime. The stored stats are not modified.
+ */
+ @Override
+ public Stats getIdleTimeStats(long currentTime) {
+ Stat copy = idleTimeStats.copy();
+ copy.addStat(currentTime - lastRunTime.orElse(creationTime));
+ return copy;
+ }
+
+ // Not supported by this test stub.
+ @Override
+ public Set<Column> getFetchedColumns() {
+ throw new UnsupportedOperationException();
+ }
+
+ // Not supported by this test stub.
+ @Override
+ public Collection<IteratorConfiguration> getClientScanIterators() {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/core/src/test/java/org/apache/accumulo/core/util/StatTest.java
b/core/src/test/java/org/apache/accumulo/core/util/StatTest.java
index 69df38d4e6..a2986acc50 100644
--- a/core/src/test/java/org/apache/accumulo/core/util/StatTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/util/StatTest.java
@@ -43,32 +43,26 @@ public void setUp() {
@Test
public void testGetMin() {
- assertEquals(0, zero.getMin());
- assertEquals(3677, stat.getMin());
+ assertEquals(0, zero.min());
+ assertEquals(3677, stat.min());
}
@Test
public void testGetMax() {
- assertEquals(0, zero.getMax());
- assertEquals(9792, stat.getMax());
+ assertEquals(0, zero.max());
+ assertEquals(9792, stat.max());
}
@Test
public void testGetAverage() {
- assertEquals(0, zero.getAverage(), delta);
- assertEquals(5529, stat.getAverage(), delta);
- }
-
- @Test
- public void testGetStdDev() {
- assertEquals(0, zero.getStdDev(), delta);
- assertEquals(2073.7656569632, stat.getStdDev(), delta);
+ assertEquals(0, zero.mean(), delta);
+ assertEquals(5529, stat.mean(), delta);
}
@Test
public void testGetSum() {
- assertEquals(0, zero.getSum());
- assertEquals(38703, stat.getSum());
+ assertEquals(0, zero.sum());
+ assertEquals(38703, stat.sum());
}
@Test
@@ -76,16 +70,14 @@ public void testClear() {
zero.clear();
stat.clear();
- assertEquals(0, zero.getMax());
- assertEquals(zero.getMax(), stat.getMax());
- assertEquals(0, zero.getMin());
- assertEquals(zero.getMin(), stat.getMin());
- assertEquals(0, zero.getSum());
- assertEquals(zero.getSum(), stat.getSum());
+ assertEquals(0, zero.max());
+ assertEquals(zero.max(), stat.max());
+ assertEquals(0, zero.min());
+ assertEquals(zero.min(), stat.min());
+ assertEquals(0, zero.sum());
+ assertEquals(zero.sum(), stat.sum());
- assertEquals(Double.NaN, zero.getAverage(), 0);
- assertEquals(zero.getAverage(), stat.getAverage(), 0);
- assertEquals(Double.NaN, zero.getStdDev(), 0);
- assertEquals(zero.getStdDev(), stat.getStdDev(), 0);
+ assertEquals(Double.NaN, zero.mean(), 0);
+ assertEquals(zero.mean(), stat.mean(), 0);
}
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
index dbb7ca49ec..cc67feb484 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
@@ -19,6 +19,7 @@
import static java.util.Objects.requireNonNull;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
@@ -36,6 +37,7 @@
import org.apache.accumulo.core.data.thrift.IterInfo;
import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooCache;
import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
@@ -216,4 +218,39 @@ public ParsedIteratorConfig
getParsedIteratorConfig(IteratorScope scope) {
return pic;
}
+
+ /**
+ * Immutable pair of a scan dispatcher and a count. getScanDispatcher() stores the
+ * configuration update count that was current when the dispatcher was built.
+ */
+ public static class TablesScanDispatcher {
+ public final ScanDispatcher dispatcher;
+ public final long count;
+
+ public TablesScanDispatcher(ScanDispatcher dispatcher, long count) {
+ this.dispatcher = dispatcher;
+ this.count = count;
+ }
+ }
+
+ private AtomicReference<TablesScanDispatcher> scanDispatcherRef = new
AtomicReference<>();
+
+ /**
+ * Returns the scan dispatcher for this table, rebuilding it when no dispatcher is cached or
+ * the cached one was built under a different configuration update count.
+ *
+ * A rebuild instantiates the class named by TABLE_SCAN_DISPATCHER and initializes it with
+ * the TABLE_SCAN_DISPATCHER_OPTS properties, with the prefix stripped from each key.
+ */
+ public ScanDispatcher getScanDispatcher() {
+ long count = getUpdateCount();
+ TablesScanDispatcher currRef = scanDispatcherRef.get();
+ if (currRef == null || currRef.count != count) {
+ // NOTE(review): the trailing null looks like a fallback instance; if this call can return
+ // null, the init() call below would throw a NullPointerException - confirm.
+ ScanDispatcher newDispatcher =
Property.createTableInstanceFromPropertyName(this,
+ Property.TABLE_SCAN_DISPATCHER, ScanDispatcher.class, null);
+
+ Map<String,String> opts = new HashMap<>();
+
getAllPropertiesWithPrefix(Property.TABLE_SCAN_DISPATCHER_OPTS).forEach((k, v)
-> {
+ String optKey =
k.substring(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey().length());
+ opts.put(optKey, v);
+ });
+
+ newDispatcher.init(Collections.unmodifiableMap(opts));
+
+ TablesScanDispatcher newRef = new TablesScanDispatcher(newDispatcher,
count);
+ // The compareAndSet result is not checked: this call returns the dispatcher it just built
+ // whether or not the swap succeeded.
+ scanDispatcherRef.compareAndSet(currRef, newRef);
+ currRef = newRef;
+ }
+
+ return currRef.dispatcher;
+ }
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
b/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
index 1973fd8e24..ab41141817 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
@@ -81,11 +81,12 @@ private String _get(Property property) {
@Override
public String get(Property property) {
if (Property.isFixedZooPropertyKey(property)) {
- if (fixedProps.containsKey(property.getKey())) {
- return fixedProps.get(property.getKey());
+ String val = fixedProps.get(property.getKey());
+ if (val != null) {
+ return val;
} else {
synchronized (fixedProps) {
- String val = _get(property);
+ val = _get(property);
fixedProps.put(property.getKey(), val);
return val;
}
@@ -96,6 +97,12 @@ public String get(Property property) {
}
}
+ /**
+ * Reports a property as set when its key is in the fixedProps cache, when getRaw returns a
+ * non-null value for the key, or when the parent configuration reports it as set.
+ */
+ @Override
+ public boolean isPropertySet(Property prop) {
+ return fixedProps.containsKey(prop.getKey()) || getRaw(prop.getKey()) !=
null
+ || parent.isPropertySet(prop);
+ }
+
private String getRaw(String key) {
String zPath = propPathPrefix + "/" + key;
byte[] v = propCache.get(zPath);
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 6e67cf045e..14c0306e22 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -130,6 +130,7 @@
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher;
import org.apache.accumulo.core.summary.Gatherer;
import org.apache.accumulo.core.summary.Gatherer.FileSystemResolver;
import org.apache.accumulo.core.summary.SummaryCollection;
@@ -250,9 +251,9 @@
import org.apache.accumulo.tserver.scan.ScanRunState;
import org.apache.accumulo.tserver.session.ConditionalSession;
import org.apache.accumulo.tserver.session.MultiScanSession;
-import org.apache.accumulo.tserver.session.ScanSession;
import org.apache.accumulo.tserver.session.Session;
import org.apache.accumulo.tserver.session.SessionManager;
+import org.apache.accumulo.tserver.session.SingleScanSession;
import org.apache.accumulo.tserver.session.SummarySession;
import org.apache.accumulo.tserver.session.UpdateSession;
import org.apache.accumulo.tserver.tablet.BulkImportCacheCleaner;
@@ -543,6 +544,16 @@ public void loadFiles(TInfo tinfo, TCredentials
credentials, long tid, TKeyExten
}
}
+ /**
+ * Looks up the scan dispatcher from the table configuration of the extent's table.
+ *
+ * @return the table's dispatcher, or null when the extent is the root tablet or a metadata
+ *         tablet
+ */
+ private ScanDispatcher getScanDispatcher(KeyExtent extent) {
+ if (extent.isRootTablet() || extent.isMeta()) {
+ // dispatcher is only for user tables
+ return null;
+ }
+
+ return
getServerConfigurationFactory().getTableConfiguration(extent.getTableId())
+ .getScanDispatcher();
+ }
+
@Override
public InitialScan startScan(TInfo tinfo, TCredentials credentials,
TKeyExtent textent,
TRange range, List<TColumn> columns, int batchSize, List<IterInfo>
ssiList,
@@ -587,13 +598,14 @@ public InitialScan startScan(TInfo tinfo, TCredentials
credentials, TKeyExtent t
if (tablet == null)
throw new NotServingTabletException(textent);
- Set<Column> columnSet = new HashSet<>();
+ HashSet<Column> columnSet = new HashSet<>();
for (TColumn tcolumn : columns) {
columnSet.add(new Column(tcolumn));
}
- final ScanSession scanSession = new ScanSession(credentials, extent,
columnSet, ssiList, ssio,
- new Authorizations(authorizations), readaheadThreshold,
batchTimeOut, context);
+ final SingleScanSession scanSession = new SingleScanSession(credentials,
extent, columnSet,
+ ssiList, ssio, new Authorizations(authorizations),
readaheadThreshold, batchTimeOut,
+ context);
scanSession.scanner = tablet.createScanner(new Range(range), batchSize,
scanSession.columnSet,
scanSession.auths, ssiList, ssio, isolated,
scanSession.interruptFlag,
SamplerConfigurationImpl.fromThrift(tSamplerConfig),
scanSession.batchTimeOut,
@@ -619,7 +631,7 @@ public ScanResult continueScan(TInfo tinfo, long scanID)
throws NoSuchScanIDException, NotServingTabletException,
org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
TSampleNotPresentException {
- ScanSession scanSession = (ScanSession)
sessionManager.reserveSession(scanID);
+ SingleScanSession scanSession = (SingleScanSession)
sessionManager.reserveSession(scanID);
if (scanSession == null) {
throw new NoSuchScanIDException();
}
@@ -631,7 +643,7 @@ public ScanResult continueScan(TInfo tinfo, long scanID)
}
}
- private ScanResult continueScan(TInfo tinfo, long scanID, ScanSession
scanSession)
+ private ScanResult continueScan(TInfo tinfo, long scanID,
SingleScanSession scanSession)
throws NoSuchScanIDException, NotServingTabletException,
org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
TSampleNotPresentException {
@@ -639,7 +651,8 @@ private ScanResult continueScan(TInfo tinfo, long scanID,
ScanSession scanSessio
if (scanSession.nextBatchTask == null) {
scanSession.nextBatchTask = new NextBatchTask(TabletServer.this,
scanID,
scanSession.interruptFlag);
- resourceManager.executeReadAhead(scanSession.extent,
scanSession.nextBatchTask);
+ resourceManager.executeReadAhead(scanSession.extent,
getScanDispatcher(scanSession.extent),
+ scanSession, scanSession.nextBatchTask);
}
ScanBatch bresult;
@@ -694,7 +707,8 @@ else if (e.getCause() instanceof IOException) {
// to client
scanSession.nextBatchTask = new NextBatchTask(TabletServer.this,
scanID,
scanSession.interruptFlag);
- resourceManager.executeReadAhead(scanSession.extent,
scanSession.nextBatchTask);
+ resourceManager.executeReadAhead(scanSession.extent,
getScanDispatcher(scanSession.extent),
+ scanSession, scanSession.nextBatchTask);
}
if (!scanResult.more)
@@ -705,14 +719,14 @@ else if (e.getCause() instanceof IOException) {
@Override
public void closeScan(TInfo tinfo, long scanID) {
- final ScanSession ss = (ScanSession)
sessionManager.removeSession(scanID);
+ final SingleScanSession ss = (SingleScanSession)
sessionManager.removeSession(scanID);
if (ss != null) {
long t2 = System.currentTimeMillis();
if (log.isTraceEnabled()) {
log.trace(String.format("ScanSess tid %s %s %,d entries in %.2f
secs, nbTimes = [%s] ",
TServerUtils.clientAddress.get(), ss.extent.getTableId(),
ss.entriesReturned,
- (t2 - ss.startTime) / 1000.0, ss.nbTimes.toString()));
+ (t2 - ss.startTime) / 1000.0, ss.runStats.toString()));
}
if (scanMetrics.isEnabled()) {
@@ -818,7 +832,8 @@ private MultiScanResult continueMultiScan(TInfo tinfo, long
scanID, MultiScanSes
if (session.lookupTask == null) {
session.lookupTask = new LookupTask(TabletServer.this, scanID);
- resourceManager.executeReadAhead(session.threadPoolExtent,
session.lookupTask);
+ resourceManager.executeReadAhead(session.threadPoolExtent,
+ getScanDispatcher(session.threadPoolExtent), session,
session.lookupTask);
}
try {
@@ -1174,8 +1189,8 @@ public UpdateErrors closeUpdate(TInfo tinfo, long
updateID) throws NoSuchScanIDE
String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs
lt=%.3fs ct=%.3fs)",
TServerUtils.clientAddress.get(), us.totalUpdates,
(System.currentTimeMillis() - us.startTime) / 1000.0,
us.authTimes.toString(),
- us.flushTime / 1000.0, us.prepareTimes.getSum() / 1000.0,
- us.walogTimes.getSum() / 1000.0, us.commitTimes.getSum() /
1000.0));
+ us.flushTime / 1000.0, us.prepareTimes.sum() / 1000.0,
us.walogTimes.sum() / 1000.0,
+ us.commitTimes.sum() / 1000.0));
}
if (us.failures.size() > 0) {
Entry<KeyExtent,Long> first = us.failures.entrySet().iterator().next();
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 71cebbe014..5ab6e27f4b 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -21,10 +21,15 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Queue;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
@@ -32,11 +37,17 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.IntSupplier;
+import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.AccumuloConfiguration.ScanExecutorConfig;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
@@ -45,6 +56,11 @@
import
org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
import
org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.spi.scan.ScanDispatcher;
+import org.apache.accumulo.core.spi.scan.ScanExecutor;
+import org.apache.accumulo.core.spi.scan.ScanInfo;
+import org.apache.accumulo.core.spi.scan.ScanPrioritizer;
+import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.NamingThreadFactory;
import org.apache.accumulo.fate.util.LoggingRunnable;
@@ -62,14 +78,18 @@
import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy;
import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
+import org.apache.accumulo.tserver.session.ScanSession;
import org.apache.accumulo.tserver.tablet.Tablet;
import org.apache.htrace.wrappers.TraceExecutorService;
+import org.apache.htrace.wrappers.TraceRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
/**
* ResourceManager is responsible for managing the resources of all tablets
within a tablet server.
@@ -88,13 +108,14 @@
private final ExecutorService migrationPool;
private final ExecutorService assignmentPool;
private final ExecutorService assignMetaDataPool;
- private final ExecutorService readAheadThreadPool;
- private final ExecutorService defaultReadAheadThreadPool;
private final ExecutorService summaryRetrievalPool;
private final ExecutorService summaryParitionPool;
private final ExecutorService summaryRemotePool;
private final Map<String,ExecutorService> threadPools = new TreeMap<>();
+ private final Map<String,ExecutorService> scanExecutors;
+ private final Map<String,ScanExecutor> scanExecutorChoices;
+
private final ConcurrentHashMap<KeyExtent,RunnableStartedAt>
activeAssignments;
private final FileManager fileManager;
@@ -120,16 +141,15 @@ private ExecutorService addEs(String name,
ExecutorService tp) {
return tp;
}
- private ExecutorService addEs(final Property maxThreads, String name,
- final ThreadPoolExecutor tp) {
+ private ExecutorService addEs(IntSupplier maxThreads, String name, final
ThreadPoolExecutor tp) {
ExecutorService result = addEs(name, tp);
SimpleTimer.getInstance(tserver.getConfiguration()).schedule(new
Runnable() {
@Override
public void run() {
try {
- int max = tserver.getConfiguration().getCount(maxThreads);
+ int max = maxThreads.getAsInt();
if (tp.getMaximumPoolSize() != max) {
- log.info("Changing {} to {}", maxThreads.getKey(), max);
+ log.info("Changing max threads for {} to {}", name, max);
tp.setCorePoolSize(max);
tp.setMaximumPoolSize(max);
}
@@ -142,6 +162,16 @@ public void run() {
return result;
}
+ private ExecutorService createIdlingEs(Property max, String name, long
timeout,
+ TimeUnit timeUnit) {
+ LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+ int maxThreads = conf.getSystemConfiguration().getCount(max);
+ ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads,
timeout, timeUnit, queue,
+ new NamingThreadFactory(name));
+ tp.allowCoreThreadTimeOut(true);
+ return addEs(() -> conf.getSystemConfiguration().getCount(max), name, tp);
+ }
+
private ExecutorService createEs(int max, String name) {
return addEs(name, Executors.newFixedThreadPool(max, new
NamingThreadFactory(name)));
}
@@ -150,21 +180,52 @@ private ExecutorService createEs(Property max, String
name) {
return createEs(max, name, new LinkedBlockingQueue<>());
}
- private ExecutorService createIdlingEs(Property max, String name, long
timeout,
- TimeUnit timeUnit) {
- LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
- int maxThreads = conf.getSystemConfiguration().getCount(max);
- ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads,
timeout, timeUnit, queue,
- new NamingThreadFactory(name));
- tp.allowCoreThreadTimeOut(true);
- return addEs(max, name, tp);
+ private ExecutorService createPriorityExecutor(ScanExecutorConfig sec,
+ Map<String,Queue<?>> scanExecQueues) {
+
+ BlockingQueue<Runnable> queue;
+
+ if (sec.prioritizerClass.orElse("").isEmpty()) {
+ queue = new LinkedBlockingQueue<>();
+ } else {
+ ScanPrioritizer factory = null;
+ try {
+ factory = ConfigurationTypeHelper.getClassInstance(null,
sec.prioritizerClass.get(),
+ ScanPrioritizer.class);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ if (factory == null) {
+ queue = new LinkedBlockingQueue<>();
+ } else {
+ Comparator<ScanInfo> comparator =
factory.createComparator(sec.prioritizerOpts);
+
+ // function to extract scan scan session from runnable
+ Function<Runnable,ScanInfo> extractor = r ->
((ScanSession.ScanMeasurer) ((TraceRunnable) r)
+ .getRunnable()).getScanInfo();
+
+ queue = new PriorityBlockingQueue<>(sec.maxThreads,
+ Comparator.comparing(extractor, comparator));
+ }
+ }
+
+ scanExecQueues.put(sec.name, queue);
+
+ return createEs(() -> sec.getCurrentMaxThreads(), "scan-" + sec.name,
queue, sec.priority);
}
- private ExecutorService createEs(Property max, String name,
BlockingQueue<Runnable> queue) {
- int maxThreads = conf.getSystemConfiguration().getCount(max);
+ private ExecutorService createEs(IntSupplier maxThreadsSupplier, String name,
+ BlockingQueue<Runnable> queue, OptionalInt priority) {
+ int maxThreads = maxThreadsSupplier.getAsInt();
ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, 0L,
- TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name));
- return addEs(max, name, tp);
+ TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name, priority));
+ return addEs(maxThreadsSupplier, name, tp);
+ }
+
+ private ExecutorService createEs(Property max, String name,
BlockingQueue<Runnable> queue) {
+ IntSupplier maxThreadsSupplier = () ->
conf.getSystemConfiguration().getCount(max);
+ return createEs(maxThreadsSupplier, name, queue, OptionalInt.empty());
}
private ExecutorService createEs(int min, int max, int timeout, String name)
{
@@ -172,6 +233,80 @@ private ExecutorService createEs(int min, int max, int
timeout, String name) {
new LinkedBlockingQueue<>(), new NamingThreadFactory(name)));
}
+ protected Map<String,ExecutorService> createScanExecutors(Instance instance,
+ Collection<ScanExecutorConfig> scanExecCfg, Map<String,Queue<?>>
scanExecQueues) {
+ Builder<String,ExecutorService> builder = ImmutableMap.builder();
+
+ for (ScanExecutorConfig sec : scanExecCfg) {
+ builder.put(sec.name, createPriorityExecutor(sec, scanExecQueues));
+ }
+
+ return builder.build();
+ }
+
+ private static class ScanExecutorImpl implements ScanExecutor {
+
+ private static class ConfigImpl implements ScanExecutor.Config {
+
+ final ScanExecutorConfig cfg;
+
+ public ConfigImpl(ScanExecutorConfig sec) {
+ this.cfg = sec;
+ }
+
+ @Override
+ public String getName() {
+ return cfg.name;
+ }
+
+ @Override
+ public int getMaxThreads() {
+ return cfg.maxThreads;
+ }
+
+ @Override
+ public Optional<String> getPrioritizerClass() {
+ return cfg.prioritizerClass;
+ }
+
+ @Override
+ public Map<String,String> getPrioritizerOptions() {
+ return cfg.prioritizerOpts;
+ }
+
+ }
+
+ private final ConfigImpl config;
+ private final Queue<?> queue;
+
+ ScanExecutorImpl(ScanExecutorConfig sec, Queue<?> q) {
+ this.config = new ConfigImpl(sec);
+ this.queue = q;
+ }
+
+ @Override
+ public int getQueued() {
+ return queue.size();
+ }
+
+ @Override
+ public Config getConfig() {
+ return config;
+ }
+
+ }
+
+ private Map<String,ScanExecutor> createScanExecutorChoices(
+ Collection<ScanExecutorConfig> scanExecCfg, Map<String,Queue<?>>
scanExecQueues) {
+ Builder<String,ScanExecutor> builder = ImmutableMap.builder();
+
+ for (ScanExecutorConfig sec : scanExecCfg) {
+ builder.put(sec.name, new ScanExecutorImpl(sec,
scanExecQueues.get(sec.name)));
+ }
+
+ return builder.build();
+ }
+
public TabletServerResourceManager(TabletServer tserver, VolumeManager fs) {
this.tserver = tserver;
this.conf = tserver.getServerConfigurationFactory();
@@ -251,10 +386,6 @@ public TabletServerResourceManager(TabletServer tserver,
VolumeManager fs) {
activeAssignments = new ConcurrentHashMap<>();
- readAheadThreadPool = createEs(Property.TSERV_READ_AHEAD_MAXCONCURRENT,
"tablet read ahead");
- defaultReadAheadThreadPool =
createEs(Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT,
- "metadata tablets read ahead");
-
summaryRetrievalPool =
createIdlingEs(Property.TSERV_SUMMARY_RETRIEVAL_THREADS,
"summary file retriever", 60, TimeUnit.SECONDS);
summaryRemotePool = createIdlingEs(Property.TSERV_SUMMARY_REMOTE_THREADS,
"summary remote", 60,
@@ -262,6 +393,11 @@ public TabletServerResourceManager(TabletServer tserver,
VolumeManager fs) {
summaryParitionPool =
createIdlingEs(Property.TSERV_SUMMARY_PARTITION_THREADS,
"summary partition", 60, TimeUnit.SECONDS);
+ Collection<ScanExecutorConfig> scanExecCfg = acuConf.getScanExecutors();
+ Map<String,Queue<?>> scanExecQueues = new HashMap<>();
+ scanExecutors = createScanExecutors(tserver.getInstance(), scanExecCfg,
scanExecQueues);
+ scanExecutorChoices = createScanExecutorChoices(scanExecCfg,
scanExecQueues);
+
int maxOpenFiles = acuConf.getCount(Property.TSERV_SCAN_MAX_OPENFILES);
Cache<String,Long> fileLenCache = CacheBuilder.newBuilder()
@@ -791,13 +927,29 @@ public void executeMajorCompaction(KeyExtent tablet,
Runnable compactionTask) {
}
}
- public void executeReadAhead(KeyExtent tablet, Runnable task) {
+ public void executeReadAhead(KeyExtent tablet, ScanDispatcher dispatcher,
ScanSession scanInfo,
+ Runnable task) {
+
+ task = ScanSession.wrap(scanInfo, task);
+
if (tablet.isRootTablet()) {
task.run();
} else if (tablet.isMeta()) {
- defaultReadAheadThreadPool.execute(task);
+ scanExecutors.get("meta").execute(task);
} else {
- readAheadThreadPool.execute(task);
+ String scanExecutorName = dispatcher.dispatch(scanInfo,
scanExecutorChoices);
+ ExecutorService executor = scanExecutors.get(scanExecutorName);
+ if (executor == null) {
+ log.warn(
+ "For table id {}, {} dispatched to non-existant executor {} Using
default executor.",
+ tablet.getTableId(), dispatcher.getClass().getName(),
scanExecutorName);
+ executor =
scanExecutors.get(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME);
+ } else if ("meta".equals(scanExecutorName)) {
+ log.warn("For table id {}, {} dispatched to meta executor. Using
default executor.",
+ tablet.getTableId(), dispatcher.getClass().getName());
+ executor =
scanExecutors.get(SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME);
+ }
+ executor.execute(task);
}
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
index aee4477475..16b9c55581 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
@@ -23,7 +23,7 @@
import org.apache.accumulo.server.util.Halt;
import org.apache.accumulo.tserver.TabletServer;
import org.apache.accumulo.tserver.TooManyFilesException;
-import org.apache.accumulo.tserver.session.ScanSession;
+import org.apache.accumulo.tserver.session.SingleScanSession;
import org.apache.accumulo.tserver.tablet.ScanBatch;
import org.apache.accumulo.tserver.tablet.Tablet;
import org.apache.accumulo.tserver.tablet.TabletClosedException;
@@ -48,7 +48,7 @@ public NextBatchTask(TabletServer server, long scanID,
AtomicBoolean interruptFl
@Override
public void run() {
- final ScanSession scanSession = (ScanSession) server.getSession(scanID);
+ final SingleScanSession scanSession = (SingleScanSession)
server.getSession(scanID);
String oldThreadName = Thread.currentThread().getName();
try {
@@ -69,10 +69,7 @@ public void run() {
return;
}
- long t1 = System.currentTimeMillis();
ScanBatch batch = scanSession.scanner.read();
- long t2 = System.currentTimeMillis();
- scanSession.nbTimes.addStat(t2 - t1);
// there should only be one thing on the queue at a time, so
// it should be ok to call add()
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
index 4285a51446..f8db1f42c5 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
@@ -21,7 +21,6 @@
import java.util.Map;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
-import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.data.thrift.IterInfo;
@@ -30,13 +29,9 @@
import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.tserver.scan.ScanTask;
-public class MultiScanSession extends Session {
+public class MultiScanSession extends ScanSession {
public final KeyExtent threadPoolExtent;
- public final HashSet<Column> columnSet = new HashSet<>();
public final Map<KeyExtent,List<Range>> queries;
- public final List<IterInfo> ssiList;
- public final Map<String,Map<String,String>> ssio;
- public final Authorizations auths;
public final SamplerConfiguration samplerConfig;
public final long batchTimeOut;
public final String context;
@@ -53,17 +48,24 @@ public MultiScanSession(TCredentials credentials, KeyExtent
threadPoolExtent,
Map<KeyExtent,List<Range>> queries, List<IterInfo> ssiList,
Map<String,Map<String,String>> ssio, Authorizations authorizations,
SamplerConfiguration samplerConfig, long batchTimeOut, String context) {
- super(credentials);
+ super(credentials, new HashSet<>(), ssiList, ssio, authorizations);
this.queries = queries;
- this.ssiList = ssiList;
- this.ssio = ssio;
- this.auths = authorizations;
this.threadPoolExtent = threadPoolExtent;
this.samplerConfig = samplerConfig;
this.batchTimeOut = batchTimeOut;
this.context = context;
}
+ @Override
+ public Type getScanType() {
+ return Type.MULTI;
+ }
+
+ @Override
+ public String getTableId() {
+ return threadPoolExtent.getTableId().canonicalID();
+ }
+
@Override
public boolean cleanup() {
if (lookupTask != null)
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
index a55de1e39f..5cde06c657 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
@@ -16,64 +16,148 @@
*/
package org.apache.accumulo.tserver.session;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.OptionalLong;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.data.Column;
-import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.data.thrift.IterInfo;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.spi.common.IteratorConfiguration;
+import org.apache.accumulo.core.spi.common.Stats;
+import org.apache.accumulo.core.spi.scan.ScanInfo;
import org.apache.accumulo.core.util.Stat;
-import org.apache.accumulo.tserver.scan.ScanTask;
-import org.apache.accumulo.tserver.tablet.ScanBatch;
-import org.apache.accumulo.tserver.tablet.Scanner;
-
-public class ScanSession extends Session {
- public final Stat nbTimes = new Stat();
- public final KeyExtent extent;
- public final Set<Column> columnSet;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public abstract class ScanSession extends Session implements ScanInfo {
+
+ public static class ScanMeasurer implements Runnable {
+
+ private ScanSession session;
+ private Runnable task;
+
+ ScanMeasurer(ScanSession session, Runnable task) {
+ this.session = session;
+ this.task = task;
+ }
+
+ @Override
+ public void run() {
+ long t1 = System.currentTimeMillis();
+ task.run();
+ long t2 = System.currentTimeMillis();
+ session.finishedRun(t1, t2);
+ }
+
+ public ScanInfo getScanInfo() {
+ return session;
+ }
+ }
+
+ public static ScanMeasurer wrap(ScanSession scanInfo, Runnable r) {
+ return new ScanMeasurer(scanInfo, r);
+ }
+
+ private OptionalLong lastRunTime = OptionalLong.empty();
+ private Stat idleStats = new Stat();
+ public Stat runStats = new Stat();
+
+ public final HashSet<Column> columnSet;
public final List<IterInfo> ssiList;
public final Map<String,Map<String,String>> ssio;
public final Authorizations auths;
- public final AtomicBoolean interruptFlag = new AtomicBoolean();
- public long entriesReturned = 0;
- public long batchCount = 0;
- public volatile ScanTask<ScanBatch> nextBatchTask;
- public Scanner scanner;
- public final long readaheadThreshold;
- public final long batchTimeOut;
- public final String context;
-
- public ScanSession(TCredentials credentials, KeyExtent extent, Set<Column>
columnSet,
- List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
Authorizations authorizations,
- long readaheadThreshold, long batchTimeOut, String context) {
+
+ ScanSession(TCredentials credentials, HashSet<Column> cols, List<IterInfo>
ssiList,
+ Map<String,Map<String,String>> ssio, Authorizations auths) {
super(credentials);
- this.extent = extent;
- this.columnSet = columnSet;
+ this.columnSet = cols;
this.ssiList = ssiList;
this.ssio = ssio;
- this.auths = authorizations;
- this.readaheadThreshold = readaheadThreshold;
- this.batchTimeOut = batchTimeOut;
- this.context = context;
+ this.auths = auths;
+ }
+
+ @Override
+ public long getCreationTime() {
+ return startTime;
+ }
+
+ @Override
+ public OptionalLong getLastRunTime() {
+ return lastRunTime;
}
@Override
- public boolean cleanup() {
- final boolean ret;
- try {
- if (nextBatchTask != null)
- nextBatchTask.cancel(true);
- } finally {
- if (scanner != null)
- ret = scanner.close();
- else
- ret = true;
+ public Stats getRunTimeStats() {
+ return runStats;
+ }
+
+ @Override
+ public Stats getIdleTimeStats() {
+ return idleStats;
+ }
+
+ @Override
+ public Stats getIdleTimeStats(long currentTime) {
+ long idleTime = currentTime - getLastRunTime().orElse(getCreationTime());
+ Preconditions.checkArgument(idleTime >= 0);
+ Stat copy = idleStats.copy();
+ copy.addStat(idleTime);
+ return copy;
+ }
+
+ @Override
+ public Set<Column> getFetchedColumns() {
+ return Collections.unmodifiableSet(columnSet);
+ }
+
+ private class IterConfImpl implements IteratorConfiguration {
+
+ private IterInfo ii;
+
+ IterConfImpl(IterInfo ii) {
+ this.ii = ii;
+ }
+
+ @Override
+ public String getIteratorClass() {
+ return ii.className;
+ }
+
+ @Override
+ public String getName() {
+ return ii.iterName;
+ }
+
+ @Override
+ public int getPriority() {
+ return ii.priority;
+ }
+
+ @Override
+ public Map<String,String> getOptions() {
+ Map<String,String> opts = ssio.get(ii.iterName);
+ return opts == null || opts.isEmpty() ? Collections.emptyMap()
+ : Collections.unmodifiableMap(opts);
}
- return ret;
}
+ @Override
+ public Collection<IteratorConfiguration> getClientScanIterators() {
+ return Lists.transform(ssiList, IterConfImpl::new);
+ }
+
+ public void finishedRun(long start, long finish) {
+ long idleTime = start - getLastRunTime().orElse(getCreationTime());
+ long runTime = finish - start;
+ lastRunTime = OptionalLong.of(finish);
+ idleStats.addStat(idleTime);
+ runStats.addStat(runTime);
+ }
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
index eed45cf073..6bd52e4686 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
@@ -26,7 +26,7 @@
}
public final String client;
- long lastAccessTime;
+ public long lastAccessTime;
public long startTime;
State state = State.NEW;
private final TCredentials credentials;
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index 724d23c18f..1ba2491d5c 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@ -315,8 +315,8 @@ public void run() {
ScanTask nbt = null;
Table.ID tableID = null;
- if (session instanceof ScanSession) {
- ScanSession ss = (ScanSession) session;
+ if (session instanceof SingleScanSession) {
+ SingleScanSession ss = (SingleScanSession) session;
nbt = ss.nextBatchTask;
tableID = ss.extent.getTableId();
} else if (session instanceof MultiScanSession) {
@@ -362,8 +362,8 @@ public void run() {
for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(),
copiedIdleSessions)) {
Session session = entry.getValue();
- if (session instanceof ScanSession) {
- ScanSession ss = (ScanSession) session;
+ if (session instanceof SingleScanSession) {
+ SingleScanSession ss = (SingleScanSession) session;
ScanState state = ScanState.RUNNING;
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java
new file mode 100644
index 0000000000..fb3e29fe56
--- /dev/null
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.session;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.tserver.scan.ScanTask;
+import org.apache.accumulo.tserver.tablet.ScanBatch;
+import org.apache.accumulo.tserver.tablet.Scanner;
+
+public class SingleScanSession extends ScanSession {
+ public final KeyExtent extent;
+ public final AtomicBoolean interruptFlag = new AtomicBoolean();
+ public long entriesReturned = 0;
+ public long batchCount = 0;
+ public volatile ScanTask<ScanBatch> nextBatchTask;
+ public Scanner scanner;
+ public final long readaheadThreshold;
+ public final long batchTimeOut;
+ public final String context;
+
+ public SingleScanSession(TCredentials credentials, KeyExtent extent,
HashSet<Column> columnSet,
+ List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
Authorizations authorizations,
+ long readaheadThreshold, long batchTimeOut, String context) {
+ super(credentials, columnSet, ssiList, ssio, authorizations);
+ this.extent = extent;
+ this.readaheadThreshold = readaheadThreshold;
+ this.batchTimeOut = batchTimeOut;
+ this.context = context;
+ }
+
+ @Override
+ public Type getScanType() {
+ return Type.SINGLE;
+ }
+
+ @Override
+ public String getTableId() {
+ return extent.getTableId().canonicalID();
+ }
+
+ @Override
+ public boolean cleanup() {
+ final boolean ret;
+ try {
+ if (nextBatchTask != null)
+ nextBatchTask.cancel(true);
+ } finally {
+ if (scanner != null)
+ ret = scanner.close();
+ else
+ ret = true;
+ }
+ return ret;
+ }
+}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 2ef408754c..dc5d6842fd 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -1537,7 +1537,14 @@ public synchronized void
initiateMajorCompaction(MajorCompactionReason reason) {
majorCompactionQueued.add(reason);
- getTabletResources().executeMajorCompaction(getExtent(), new
CompactionRunner(this, reason));
+ try {
+ getTabletResources().executeMajorCompaction(getExtent(), new
CompactionRunner(this, reason));
+ } catch (RuntimeException t) {
+ log.debug("removing {} because we encountered an exception enqueing the
CompactionRunner",
+ reason, t);
+ majorCompactionQueued.remove(reason);
+ throw t;
+ }
}
/**
diff --git a/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java
b/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java
index e807dfb17f..320b9c23d2 100644
--- a/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java
+++ b/test/src/main/java/org/apache/accumulo/test/IMMLGBenchmark.java
@@ -68,7 +68,7 @@ public static void main(String[] args) throws Exception {
}
for (Entry<String,Stat> entry : stats.entrySet()) {
- System.out.printf("%20s : %6.2f\n", entry.getKey(),
entry.getValue().getAverage());
+ System.out.printf("%20s : %6.2f\n", entry.getKey(),
entry.getValue().mean());
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
index cbeb1e4fc8..907f2c60a0 100644
---
a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
+++
b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
@@ -606,9 +606,8 @@ private static void calcTabletStats(Connector conn, String
table, Authorizations
}
private static void printStat(String desc, Stat s) {
- System.out.printf(
- "\t\tDescription: [%30s] average: %,6.2f std dev: %,6.2f min: %,d
max: %,d %n", desc,
- s.getAverage(), s.getStdDev(), s.getMin(), s.getMax());
+ System.out.printf("\t\tDescription: [%30s] average: %,6.2f min: %,d max:
%,d %n", desc,
+ s.mean(), s.min(), s.max());
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services