bbux-atg closed pull request #510: ACCUMULO-4074
URL: https://github.com/apache/accumulo/pull/510
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index fc423acca5..42fc87a817 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -436,6 +436,8 @@
   TSERV_READ_AHEAD_MAXCONCURRENT("tserver.readahead.concurrent.max", "16", 
PropertyType.COUNT,
       "The maximum number of concurrent read ahead that will execute. This 
effectively"
           + " limits the number of long running scans that can run 
concurrently per tserver."),
+  TSERV_READ_AHEAD_PREFIX("tserver.readahead.concurrent.table.", null, 
PropertyType.PREFIX,
+      "Properties in this category allow overriding of table specific read 
ahead pools"),
   
TSERV_METADATA_READ_AHEAD_MAXCONCURRENT("tserver.metadata.readahead.concurrent.max",
 "8",
       PropertyType.COUNT,
       "The maximum number of concurrent metadata read ahead that will 
execute."),
@@ -542,6 +544,9 @@
   TSERV_SUMMARY_RETRIEVAL_THREADS("tserver.summary.retrieval.threads", "10", 
PropertyType.COUNT,
       "The number of threads on each tablet server available to retrieve"
           + " summary data, that is not currently in cache, from RFiles."),
+  TSERV_SESSION_COMPARATOR_CLASS("tserver.summary.comparator.class", "", 
PropertyType.CLASSNAME,
+      "A customizable Scan session comparator. Note that by default, the value 
is empty"
+          + " and thus uses no session comparator"),
 
   // accumulo garbage collector properties
   GC_PREFIX("gc.", null, PropertyType.PREFIX,
diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
 
b/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
new file mode 100644
index 0000000000..f688010f16
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AccumuloUncaughtExceptionHandler implements 
UncaughtExceptionHandler {
+
+  private static final Logger log = 
LoggerFactory.getLogger(AccumuloUncaughtExceptionHandler.class);
+
+  @Override
+  public void uncaughtException(Thread t, Throwable e) {
+
+    log.error(String.format("Caught an exception in %s.  Shutting down.", t), 
e);
+  }
+
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java 
b/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
index d8b307c36c..0b4730cc70 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
@@ -26,6 +26,8 @@
 public class NamingThreadFactory implements ThreadFactory {
   private static final Logger log = 
LoggerFactory.getLogger(NamingThreadFactory.class);
 
+  private static final AccumuloUncaughtExceptionHandler uncaughtHandler = new 
AccumuloUncaughtExceptionHandler();
+
   private AtomicInteger threadNum = new AtomicInteger(1);
   private String name;
 
@@ -35,7 +37,10 @@ public NamingThreadFactory(String name) {
 
   @Override
   public Thread newThread(Runnable r) {
-    return new Daemon(new LoggingRunnable(log, r), name + " " + 
threadNum.getAndIncrement());
+    Thread thread = new Daemon(new LoggingRunnable(log, r),
+        name + " " + threadNum.getAndIncrement());
+    thread.setUncaughtExceptionHandler(uncaughtHandler);
+    return thread;
   }
 
 }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 71cebbe014..9a91e8723d 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -32,10 +32,15 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.NamespaceNotFoundException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.impl.KeyExtent;
@@ -62,6 +67,7 @@
 import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy;
 import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
 import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
+import org.apache.accumulo.tserver.session.SessionComparator;
 import org.apache.accumulo.tserver.tablet.Tablet;
 import org.apache.htrace.wrappers.TraceExecutorService;
 import org.slf4j.Logger;
@@ -95,6 +101,8 @@
   private final ExecutorService summaryRemotePool;
   private final Map<String,ExecutorService> threadPools = new TreeMap<>();
 
+  private final Map<String,ExecutorService> tableThreadPools = new TreeMap<>();
+
   private final ConcurrentHashMap<KeyExtent,RunnableStartedAt> 
activeAssignments;
 
   private final FileManager fileManager;
@@ -142,12 +150,34 @@ public void run() {
     return result;
   }
 
-  private ExecutorService createEs(int max, String name) {
-    return addEs(name, Executors.newFixedThreadPool(max, new 
NamingThreadFactory(name)));
-  }
+  private ExecutorService addEs(final int maxThreads, final Property prefix,
+      final String propertyName, String name, final ThreadPoolExecutor tp) {
+    ExecutorService result = addEs(name, tp);
+    SimpleTimer.getInstance(tserver.getConfiguration()).schedule(new 
Runnable() {
+      @Override
+      public void run() {
+        try {
+          int max = maxThreads;
+          for (Entry<String,String> entry : conf.getSystemConfiguration()
+              .getAllPropertiesWithPrefix(prefix).entrySet()) {
+            if (entry.getKey().equals(propertyName)) {
+              if (null != entry.getValue() && entry.getValue().length() != 0)
+                max = Integer.parseInt(entry.getValue());
+              break;
+            }
+          }
+          if (tp.getMaximumPoolSize() != max) {
+            log.info("Changing {} to {}", maxThreads, max);
+            tp.setCorePoolSize(max);
+            tp.setMaximumPoolSize(max);
+          }
+        } catch (Throwable t) {
+          log.error("Failed to change thread pool size", t);
+        }
+      }
 
-  private ExecutorService createEs(Property max, String name) {
-    return createEs(max, name, new LinkedBlockingQueue<>());
+    }, 1000, 10 * 1000);
+    return result;
   }
 
   private ExecutorService createIdlingEs(Property max, String name, long 
timeout,
@@ -160,6 +190,86 @@ private ExecutorService createIdlingEs(Property max, 
String name, long timeout,
     return addEs(max, name, tp);
   }
 
+  private ExecutorService createEs(int max, String name) {
+    return addEs(name, Executors.newFixedThreadPool(max, new 
NamingThreadFactory(name)));
+  }
+
+  private ExecutorService createEs(Property max, String name) {
+    return createEs(max, name, new LinkedBlockingQueue<>());
+  }
+
+  private ExecutorService createEs(int maxThreads, Property prefix, String 
propertyName,
+      String name, BlockingQueue<Runnable> queue) {
+    ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, 0L,
+        TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name));
+    return addEs(maxThreads, prefix, propertyName, name, tp);
+  }
+
+  /**
+   * If we cannot instantiate the comparator we will default to the linked 
blocking queue comparator
+   * 
+   * @param max
+   *          max number of threads
+   * @param comparator
+   *          comparator property
+   * @param name
+   *          name passed to the thread factory
+   * @return priority executor
+   */
+  private ExecutorService createPriorityExecutor(Property prefix, String 
propertyName,
+      final int maxThreads, Property comparator, String name) {
+
+    String comparatorClazz = conf.getSystemConfiguration().get(comparator);
+
+    if (null == comparatorClazz || comparatorClazz.length() == 0) {
+      log.debug("Using no comparator");
+      return createEs(maxThreads, prefix, propertyName, name, new 
LinkedBlockingQueue<>());
+    } else {
+      SessionComparator comparatorObj = 
Property.createInstanceFromPropertyName(
+          conf.getSystemConfiguration(), comparator, SessionComparator.class, 
null);
+      if (null != comparatorObj) {
+        log.debug("Using priority based scheduler {}", comparatorClazz);
+        return createEs(maxThreads, prefix, propertyName, name,
+            new PriorityBlockingQueue<>(maxThreads, comparatorObj));
+      } else {
+        log.debug("Using no comparator");
+        return createEs(maxThreads, prefix, propertyName, name, new 
LinkedBlockingQueue<>());
+      }
+    }
+  }
+
+  /**
+   * If we cannot instantiate the comparator we will default to the linked 
blocking queue comparator
+   * 
+   * @param max
+   *          max number of threads
+   * @param comparator
+   *          comparator property
+   * @param name
+   *          name passed to the thread factory
+   * @return priority executor
+   */
+  private ExecutorService createPriorityExecutor(Property max, Property 
comparator, String name) {
+    int maxThreads = conf.getSystemConfiguration().getCount(max);
+
+    String comparatorClazz = conf.getSystemConfiguration().get(comparator);
+
+    if (null == comparatorClazz || comparatorClazz.length() == 0) {
+      log.debug("Using no comparator");
+      return createEs(max, name, new LinkedBlockingQueue<>());
+    } else {
+      SessionComparator comparatorObj = 
Property.createInstanceFromPropertyName(
+          conf.getSystemConfiguration(), comparator, SessionComparator.class, 
null);
+      if (null != comparatorObj) {
+        log.debug("Using priority based scheduler {}", comparatorClazz);
+        return createEs(max, name, new PriorityBlockingQueue<>(maxThreads, 
comparatorObj));
+      } else {
+        log.debug("Using no comparator");
+        return createEs(max, name, new LinkedBlockingQueue<>());
+      }
+    }
+  }
+
   private ExecutorService createEs(Property max, String name, 
BlockingQueue<Runnable> queue) {
     int maxThreads = conf.getSystemConfiguration().getCount(max);
     ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, 0L,
@@ -172,6 +282,35 @@ private ExecutorService createEs(int min, int max, int 
timeout, String name) {
         new LinkedBlockingQueue<>(), new NamingThreadFactory(name)));
   }
 
+  /**
+   * Creates table specific thread pool for executing scan threads
+   * 
+   * @param instance
+   *          ZK instance.
+   * @param acuConf
+   *          accumulo configuration.
+   * @throws NamespaceNotFoundException
+   *           Error thrown by tables.getTableId when a name space is not 
found.
+   * @throws TableNotFoundException
+   *           Error thrown by tables.getTableId when a table is not found.
+   */
+  protected void createTablePools(Instance instance, AccumuloConfiguration 
acuConf)
+      throws NamespaceNotFoundException, TableNotFoundException {
+    for (Entry<String,String> entry : acuConf
+        
.getAllPropertiesWithPrefix(Property.TSERV_READ_AHEAD_PREFIX).entrySet()) {
+      final String tableName = entry.getKey()
+          .substring(Property.TSERV_READ_AHEAD_PREFIX.getKey().length());
+      if (null == entry.getValue() || entry.getValue().length() == 0) {
+        throw new RuntimeException("Read ahead prefix is inproperly 
configured");
+      }
+      final int maxThreads = Integer.parseInt(entry.getValue());
+      final String tableId = Tables.getTableId(instance, 
tableName).canonicalID();
+      tableThreadPools.put(tableId,
+          createPriorityExecutor(Property.TSERV_READ_AHEAD_PREFIX, 
entry.getKey(), maxThreads,
+              Property.TSERV_SESSION_COMPARATOR_CLASS, tableName + " specific 
read ahead"));
+    }
+  }
+
   public TabletServerResourceManager(TabletServer tserver, VolumeManager fs) {
     this.tserver = tserver;
     this.conf = tserver.getServerConfigurationFactory();
@@ -251,7 +390,8 @@ public TabletServerResourceManager(TabletServer tserver, 
VolumeManager fs) {
 
     activeAssignments = new ConcurrentHashMap<>();
 
-    readAheadThreadPool = createEs(Property.TSERV_READ_AHEAD_MAXCONCURRENT, 
"tablet read ahead");
+    readAheadThreadPool = 
createPriorityExecutor(Property.TSERV_READ_AHEAD_MAXCONCURRENT,
+        Property.TSERV_SESSION_COMPARATOR_CLASS, "tablet read ahead");
     defaultReadAheadThreadPool = 
createEs(Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT,
         "metadata tablets read ahead");
 
@@ -262,6 +402,13 @@ public TabletServerResourceManager(TabletServer tserver, 
VolumeManager fs) {
     summaryParitionPool = 
createIdlingEs(Property.TSERV_SUMMARY_PARTITION_THREADS,
         "summary partition", 60, TimeUnit.SECONDS);
 
+    try {
+      createTablePools(tserver.getInstance(), acuConf);
+    } catch (NamespaceNotFoundException e) {
+      throw new RuntimeException(e);
+    } catch (TableNotFoundException e) {
+      throw new RuntimeException(e);
+    }
     int maxOpenFiles = acuConf.getCount(Property.TSERV_SCAN_MAX_OPENFILES);
 
     Cache<String,Long> fileLenCache = CacheBuilder.newBuilder()
@@ -792,7 +939,10 @@ public void executeMajorCompaction(KeyExtent tablet, 
Runnable compactionTask) {
   }
 
   public void executeReadAhead(KeyExtent tablet, Runnable task) {
-    if (tablet.isRootTablet()) {
+    ExecutorService service = 
tableThreadPools.get(tablet.getTableId().canonicalID());
+    if (null != service) {
+      service.execute(task);
+    } else if (tablet.isRootTablet()) {
       task.run();
     } else if (tablet.isMeta()) {
       defaultReadAheadThreadPool.execute(task);
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/DefaultSessionComparator.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/DefaultSessionComparator.java
new file mode 100644
index 0000000000..d584242fab
--- /dev/null
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/DefaultSessionComparator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.session;
+
+public class DefaultSessionComparator extends SessionComparator {
+
+  @Override
+  public int compareSession(Session sessionA, Session sessionB) {
+
+    final long startTimeFirst = sessionA.startTime;
+    final long startTimeSecond = sessionB.startTime;
+
+    // use the lowest max idle time
+    final long maxIdle = sessionA.maxIdleAccessTime < 
sessionB.maxIdleAccessTime
+        ? sessionA.maxIdleAccessTime
+        : sessionB.maxIdleAccessTime;
+
+    final long currentTime = System.currentTimeMillis();
+
+    /*
+     * Multiply by -1 so that we have a sensical comparison. This means that 
if comparison < 0,
+     * sessionA is newer. If comparison > 0, this means that session B is newer
+     */
+    int comparison = -1 * Long.compare(startTimeFirst, startTimeSecond);
+
+    if (!(sessionA.lastExecTime == -1 && sessionB.lastExecTime == -1)) {
+      if (comparison >= 0) {
+        long idleTimeA = currentTime - sessionA.lastExecTime;
+
+        /*
+         * If session B is newer, let's make sure that we haven't reached the 
max idle time, where
+         * we have to begin aging A
+         */
+        if (idleTimeA > sessionA.maxIdleAccessTime) {
+          comparison = -1 * Long.valueOf(idleTimeA - maxIdle).intValue();
+        }
+      } else {
+        long idleTimeB = currentTime - sessionB.lastExecTime;
+
+        /*
+         * If session A is newer, let's make sure that B hasn't reached the 
max idle time, where we
+         * have to begin aging A
+         */
+        if (idleTimeB > sessionA.maxIdleAccessTime) {
+          comparison = 1 * Long.valueOf(idleTimeB - maxIdle).intValue();
+        }
+      }
+    }
+
+    return comparison;
+  }
+
+}
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
index 4285a51446..981832af6a 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
@@ -71,4 +71,14 @@ public boolean cleanup() {
     // the cancellation should provide us the safety to return true here
     return true;
   }
+
+  /**
+   * Ensure that the runnable actually runs
+   */
+  @Override
+  public void run() {
+    super.run();
+    lookupTask.run();
+  }
+
 }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
index eed45cf073..32f7e34851 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
@@ -19,15 +19,17 @@
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.server.rpc.TServerUtils;
 
-public class Session {
+public abstract class Session implements Runnable {
 
   enum State {
     NEW, UNRESERVED, RESERVED, REMOVED
   }
 
   public final String client;
-  long lastAccessTime;
+  public long lastAccessTime;
+  protected volatile long lastExecTime = -1;
   public long startTime;
+  public long maxIdleAccessTime;
   State state = State.NEW;
   private final TCredentials credentials;
 
@@ -47,4 +49,18 @@ public TCredentials getCredentials() {
   public boolean cleanup() {
     return true;
   }
+
+  @Override
+  public void run() {
+    lastExecTime = System.currentTimeMillis();
+  }
+
+  public void setLastExecutionTime(long lastExecTime) {
+    this.lastExecTime = lastExecTime;
+  }
+
+  public long getLastExecutionTime() {
+    return lastExecTime;
+  }
+
 }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionComparator.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionComparator.java
new file mode 100644
index 0000000000..dcfa1d4bbd
--- /dev/null
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionComparator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.session;
+
+import java.util.Comparator;
+
+public abstract class SessionComparator implements Comparator<Runnable> {
+
+  @Override
+  public int compare(Runnable sessionA, Runnable sessionB) {
+    if (sessionA instanceof Session && sessionB instanceof Session)
+      return compareSession((Session) sessionA, (Session) sessionB);
+    else
+      return 0;
+  }
+
+  public abstract int compareSession(final Session sessionA, final Session 
sessionB);
+}
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index 724d23c18f..c48569824d 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@ -231,6 +231,9 @@ private void sweep(final long maxIdle, final long 
maxUpdateIdle) {
             configuredIdle = maxUpdateIdle;
           }
           long idleTime = System.currentTimeMillis() - session.lastAccessTime;
+          if (idleTime > session.maxIdleAccessTime) {
+            session.maxIdleAccessTime = idleTime;
+          }
           if (idleTime > configuredIdle) {
             log.info("Closing idle session from user={}, client={}, 
idle={}ms", session.getUser(),
                 session.client, idleTime);
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleRangePriorityComparator.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleRangePriorityComparator.java
new file mode 100644
index 0000000000..28e6ef22bb
--- /dev/null
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleRangePriorityComparator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.session;
+
+public class SingleRangePriorityComparator extends DefaultSessionComparator {
+
+  @Override
+  public int compareSession(Session sessionA, Session sessionB) {
+    int priority = super.compareSession(sessionA, sessionB);
+
+    if (sessionA instanceof MultiScanSession && sessionB instanceof 
ScanSession) {
+      if (priority < 0) {
+        priority *= -1;
+      }
+    } else if (sessionB instanceof MultiScanSession && sessionA instanceof 
ScanSession) {
+      if (priority > 0) {
+        priority *= -1;
+      }
+    }
+    return priority;
+  }
+}
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 9ad246cc3a..083ea77251 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -1537,7 +1537,13 @@ public synchronized void 
initiateMajorCompaction(MajorCompactionReason reason) {
 
     majorCompactionQueued.add(reason);
 
-    getTabletResources().executeMajorCompaction(getExtent(), new 
CompactionRunner(this, reason));
+    try {
+      getTabletResources().executeMajorCompaction(getExtent(), new 
CompactionRunner(this, reason));
+    } catch (RuntimeException t) {
+      log.debug("removing {} because we encountered an exception enqueing the 
CompactionRunner", reason, t);
+      majorCompactionQueued.remove(reason);
+      throw t;
+    }
   }
 
   /**
diff --git 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionComparatorTest.java
 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionComparatorTest.java
new file mode 100644
index 0000000000..f7cc5cd2aa
--- /dev/null
+++ 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionComparatorTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.session;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+public class SessionComparatorTest {
+
+  @Test
+  public void testSingleScanMultiScanNoRun() {
+    long time = System.currentTimeMillis();
+    ScanSession sessionA = emptyScanSession();
+    sessionA.lastAccessTime = 0;
+    sessionA.maxIdleAccessTime = 0;
+    sessionA.startTime = time - 1000;
+
+    MultiScanSession sessionB = emptyMultiScanSession();
+    sessionB.lastAccessTime = 0;
+    sessionB.maxIdleAccessTime = 1000;
+    sessionB.startTime = time - 800;
+
+    ScanSession sessionC = emptyScanSession();
+    sessionC.lastAccessTime = 0;
+    sessionC.maxIdleAccessTime = 1000;
+    sessionC.startTime = time - 800;
+
+    // a has never run, so it should be given priority
+    SingleRangePriorityComparator comparator = new 
SingleRangePriorityComparator();
+    assertEquals(-1, comparator.compareSession(sessionA, sessionB));
+
+    // b is before a in queue, b has never run, but because a is single
+    // we should be given priority
+    assertEquals(1, comparator.compareSession(sessionB, sessionA));
+
+    // now let's assume they have been executed
+
+    assertEquals(1, comparator.compareSession(sessionA, sessionC));
+
+    assertEquals(0, comparator.compareSession(sessionC, sessionC));
+
+  }
+
+  @Test
+  public void testSingleScanRun() {
+    long time = System.currentTimeMillis();
+    ScanSession sessionA = emptyScanSession();
+    sessionA.lastAccessTime = 0;
+    sessionA.setLastExecutionTime(time);
+    sessionA.maxIdleAccessTime = 1000;
+    sessionA.startTime = time - 1000;
+
+    ScanSession sessionB = emptyScanSession();
+    sessionB.lastAccessTime = 0;
+    sessionB.setLastExecutionTime(time - 2000);
+    sessionB.maxIdleAccessTime = 1000;
+    sessionB.startTime = time - 800;
+
+    // b is newer
+    SingleRangePriorityComparator comparator = new 
SingleRangePriorityComparator();
+    assertEquals(1, comparator.compareSession(sessionA, sessionB));
+
+    // b is before a in queue, b has never run, but because a is single
+    // we should be given priority
+    assertTrue(comparator.compareSession(sessionB, sessionA) < 0);
+
+    sessionB.setLastExecutionTime(time);
+    sessionA.setLastExecutionTime(time - 2000);
+
+    assertTrue(comparator.compareSession(sessionA, sessionB) < 0);
+
+    // b is before a in queue, b has never run, but because a is single
+    // we should be given priority
+    int comp = comparator.compareSession(sessionB, sessionA);
+    assertTrue("comparison is " + comp, comp >= 1);
+  }
+
+  @Test
+  public void testSingleScanMultiScanRun() {
+    long time = System.currentTimeMillis();
+    ScanSession sessionA = emptyScanSession();
+    sessionA.lastAccessTime = 0;
+    sessionA.setLastExecutionTime(time);
+    sessionA.maxIdleAccessTime = 1000;
+    sessionA.startTime = time - 1000;
+
+    MultiScanSession sessionB = emptyMultiScanSession();
+    sessionB.lastAccessTime = 0;
+    sessionB.setLastExecutionTime(time - 2000);
+    sessionB.maxIdleAccessTime = 1000;
+    sessionB.startTime = time - 800;
+
+    // b is newer
+    SingleRangePriorityComparator comparator = new 
SingleRangePriorityComparator();
+    assertEquals(-1, comparator.compareSession(sessionA, sessionB));
+
+    // b is before a in queue, b has never run, but because a is single
+    // we should be given priority
+    assertTrue(comparator.compareSession(sessionB, sessionA) > 0);
+
+    sessionB.setLastExecutionTime(time);
+    sessionA.setLastExecutionTime(time - 2000);
+
+    assertTrue(comparator.compareSession(sessionA, sessionB) < 0);
+
+    // b is before a in queue, b has never run, but because a is single
+    // we should be given priority
+    int comp = comparator.compareSession(sessionB, sessionA);
+    assertTrue("comparison is " + comp, comp > 0);
+  }
+
+  @Test
+  public void testMultiScanRun() {
+    long time = System.currentTimeMillis();
+    ScanSession sessionA = emptyScanSession();
+    sessionA.lastAccessTime = 0;
+    sessionA.setLastExecutionTime(time);
+    sessionA.maxIdleAccessTime = 1000;
+    sessionA.startTime = time - 1000;
+
+    ScanSession sessionB = emptyScanSession();
+    sessionB.lastAccessTime = 0;
+    sessionB.setLastExecutionTime(time - 2000);
+    sessionB.maxIdleAccessTime = 1000;
+    sessionB.startTime = time - 800;
+
+    // b is newer
+    SingleRangePriorityComparator comparator = new 
SingleRangePriorityComparator();
+    assertEquals(1, comparator.compareSession(sessionA, sessionB));
+
+    // b is before a in queue, b has never run, but because a is single
+    // we should be given priority
+    assertTrue(comparator.compareSession(sessionB, sessionA) < 0);
+
+    sessionB.setLastExecutionTime(time);
+    sessionA.setLastExecutionTime(time - 2000);
+
+    
+    assertTrue(comparator.compareSession(sessionA, sessionB) < 0);
+
+    // b is before a in queue, b has never run, but because a is single
+    // we should be given priority
+    int comp = comparator.compareSession(sessionB, sessionA);
+    assertTrue("comparison is " + comp, comp >= 1);
+  }
+
+  private static ScanSession emptyScanSession() {
+    return new ScanSession(null, null, null, null, null, null, 0, 0, null);
+  }
+
+  private static MultiScanSession emptyMultiScanSession() {
+    return new MultiScanSession(null, null, null, null, null, null, null, 0, 
null);
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to