ctubbsii commented on a change in pull request #1031: fixes #1024 Do less work 
in the wait command
URL: https://github.com/apache/fluo/pull/1031#discussion_r181485475
 
 

 ##########
 File path: modules/command/src/main/java/org/apache/fluo/command/FluoWait.java
 ##########
 @@ -15,72 +15,90 @@
 
 package org.apache.fluo.command;
 
-import com.google.common.collect.Iterables;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.exceptions.FluoException;
 import org.apache.fluo.core.client.FluoAdminImpl;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.Notification;
+import org.apache.fluo.core.util.UtilWaitThread;
+import org.apache.fluo.core.worker.finder.hash.TableRange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class FluoWait {
 
   private static final Logger log = LoggerFactory.getLogger(FluoWait.class);
-  private static final long MIN_SLEEP_SEC = 10;
-  private static final long MAX_SLEEP_SEC = 300;
+  private static final long MIN_SLEEP_MS = 250;
+  private static final long MAX_SLEEP_MS = 300 * 1000;
 
-  private static long calculateSleep(long notifyCount) {
-    long sleep = notifyCount / 500;
-    if (sleep < MIN_SLEEP_SEC) {
-      return MIN_SLEEP_SEC;
-    } else if (sleep > MAX_SLEEP_SEC) {
-      return MAX_SLEEP_SEC;
-    }
-    return sleep;
+  private static List<TableRange> getRanges(Environment env)
+      throws TableNotFoundException, AccumuloSecurityException, 
AccumuloException {
+    List<TableRange> ranges =
+        
TableRange.fromTexts(env.getConnector().tableOperations().listSplits(env.getTable()));
+    Collections.shuffle(ranges);
+    return ranges;
   }
 
-  private static long countNotifications(Environment env) {
-    Scanner scanner;
+  private static boolean hasNotifications(Environment env, TableRange range)
+      throws TableNotFoundException {
+    Scanner scanner = null;
     try {
       scanner = env.getConnector().createScanner(env.getTable(), 
env.getAuthorizations());
-    } catch (TableNotFoundException e) {
-      log.error("An exception was thrown -", e);
-      throw new FluoException(e);
+      scanner.setRange(range.getRange());
+      Notification.configureScanner(scanner);
+
+      return scanner.iterator().hasNext();
+    } finally {
+      if (scanner != null) {
+        scanner.close();
+      }
     }
+  }
+
+  private static boolean waitTillNoNotifications(Environment env, TableRange 
range)
+      throws TableNotFoundException {
+    boolean sawNotifications = false;
+    long retryTime = MIN_SLEEP_MS;
 
-    Notification.configureScanner(scanner);
+    log.debug("Scanning tablet {} for notifications", range);
 
-    return Iterables.size(scanner);
+    long start = System.currentTimeMillis();
+    while (hasNotifications(env, range)) {
+      sawNotifications = true;
+      long sleepTime = Math.max(System.currentTimeMillis() - start, retryTime);
+      log.debug("Tablet {} had notfications, will rescan in {}ms", range, 
sleepTime);
+      UtilWaitThread.sleep(sleepTime);
+      retryTime = Math.min(MAX_SLEEP_MS, (long) (retryTime * 1.5));
+      start = System.currentTimeMillis();
+    }
+
+    return sawNotifications;
   }
 
   private static void waitUntilFinished(FluoConfiguration config) {
     try (Environment env = new Environment(config)) {
-      log.info("The wait command will exit when all notifications are 
processed");
-      while (true) {
+      List<TableRange> ranges = getRanges(env);
+      outer: while (true) {
         long ts1 = 
env.getSharedResources().getOracleClient().getStamp().getTxTimestamp();
-        long ntfyCount = countNotifications(env);
-        long ts2 = 
env.getSharedResources().getOracleClient().getStamp().getTxTimestamp();
-        if (ntfyCount == 0 && ts1 == (ts2 - 1)) {
-          log.info("All processing has finished!");
-          break;
+        for (TableRange range : ranges) {
+          boolean sawNotifications = waitTillNoNotifications(env, range);
+          if (sawNotifications) {
+            ranges = getRanges(env);
+            continue outer;
+          }
         }
+        long ts2 = 
env.getSharedResources().getOracleClient().getStamp().getTxTimestamp();
 
-        try {
-          long sleepSec = calculateSleep(ntfyCount);
-          log.info("{} notifications are still outstanding.  Will try again in 
{} seconds...",
-              ntfyCount, sleepSec);
-          Thread.sleep(1000 * sleepSec);
-        } catch (InterruptedException e) {
-          log.error("Sleep was interrupted!  Exiting...");
-          System.exit(-1);
+        if (ts1 == (ts2 - 1)) {
 
 Review comment:
   Consider writing this condition as `ts2 - ts1 == 1`; it is slightly more readable than `ts1 == (ts2 - 1)`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to