keith-turner closed pull request #1031: fixes #1024 Do less work in the wait command
URL: https://github.com/apache/fluo/pull/1031
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff on merge, it is reproduced below
for the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below; GitHub does not otherwise display it for merged fork PRs:

diff --git 
a/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java 
b/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java
index 2bba0c77..9c0b191b 100644
--- a/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -15,72 +15,105 @@
 
 package org.apache.fluo.command;
 
-import com.google.common.collect.Iterables;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.exceptions.FluoException;
 import org.apache.fluo.core.client.FluoAdminImpl;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.Notification;
+import org.apache.fluo.core.util.UtilWaitThread;
+import org.apache.fluo.core.worker.finder.hash.TableRange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.util.concurrent.TimeUnit.MINUTES;
+
 public class FluoWait {
 
   private static final Logger log = LoggerFactory.getLogger(FluoWait.class);
-  private static final long MIN_SLEEP_SEC = 10;
-  private static final long MAX_SLEEP_SEC = 300;
-
-  private static long calculateSleep(long notifyCount) {
-    long sleep = notifyCount / 500;
-    if (sleep < MIN_SLEEP_SEC) {
-      return MIN_SLEEP_SEC;
-    } else if (sleep > MAX_SLEEP_SEC) {
-      return MAX_SLEEP_SEC;
-    }
-    return sleep;
+  private static final long MIN_SLEEP_MS = 250;
+  private static final long MAX_SLEEP_MS = MINUTES.toMillis(5);
+
+  private static List<TableRange> getRanges(Environment env)
+      throws TableNotFoundException, AccumuloSecurityException, 
AccumuloException {
+    List<TableRange> ranges =
+        
TableRange.fromTexts(env.getConnector().tableOperations().listSplits(env.getTable()));
+    Collections.shuffle(ranges);
+    return ranges;
   }
 
-  private static long countNotifications(Environment env) {
-    Scanner scanner;
+  private static boolean hasNotifications(Environment env, TableRange range)
+      throws TableNotFoundException {
+    Scanner scanner = null;
     try {
       scanner = env.getConnector().createScanner(env.getTable(), 
env.getAuthorizations());
-    } catch (TableNotFoundException e) {
-      log.error("An exception was thrown -", e);
-      throw new FluoException(e);
+      scanner.setRange(range.getRange());
+      Notification.configureScanner(scanner);
+
+      return scanner.iterator().hasNext();
+    } finally {
+      if (scanner != null) {
+        scanner.close();
+      }
     }
+  }
+
+  /**
+   * Wait until a range has no notifications.
+   *
+   * @return true if notifications were ever seen while waiting
+   */
+  private static boolean waitTillNoNotifications(Environment env, TableRange 
range)
+      throws TableNotFoundException {
+    boolean sawNotifications = false;
+    long retryTime = MIN_SLEEP_MS;
 
-    Notification.configureScanner(scanner);
+    log.debug("Scanning tablet {} for notifications", range);
 
-    return Iterables.size(scanner);
+    long start = System.currentTimeMillis();
+    while (hasNotifications(env, range)) {
+      sawNotifications = true;
+      long sleepTime = Math.max(System.currentTimeMillis() - start, retryTime);
+      log.debug("Tablet {} had notfications, will rescan in {}ms", range, 
sleepTime);
+      UtilWaitThread.sleep(sleepTime);
+      retryTime = Math.min(MAX_SLEEP_MS, (long) (retryTime * 1.5));
+      start = System.currentTimeMillis();
+    }
+
+    return sawNotifications;
   }
 
+  /**
+   * Wait until a scan of the table completes without seeing notifications AND 
without the Oracle
+   * issuing any timestamps during the scan.
+   */
   private static void waitUntilFinished(FluoConfiguration config) {
     try (Environment env = new Environment(config)) {
-      log.info("The wait command will exit when all notifications are 
processed");
-      while (true) {
+      List<TableRange> ranges = getRanges(env);
+
+      outer: while (true) {
         long ts1 = 
env.getSharedResources().getOracleClient().getStamp().getTxTimestamp();
-        long ntfyCount = countNotifications(env);
-        long ts2 = 
env.getSharedResources().getOracleClient().getStamp().getTxTimestamp();
-        if (ntfyCount == 0 && ts1 == (ts2 - 1)) {
-          log.info("All processing has finished!");
-          break;
+        for (TableRange range : ranges) {
+          boolean sawNotifications = waitTillNoNotifications(env, range);
+          if (sawNotifications) {
+            ranges = getRanges(env);
+            // This range had notifications. Processing those notifications 
may have created
+            // notifications in previously scanned ranges, so start over.
+            continue outer;
+          }
         }
+        long ts2 = 
env.getSharedResources().getOracleClient().getStamp().getTxTimestamp();
 
-        try {
-          long sleepSec = calculateSleep(ntfyCount);
-          log.info("{} notifications are still outstanding.  Will try again in 
{} seconds...",
-              ntfyCount, sleepSec);
-          Thread.sleep(1000 * sleepSec);
-        } catch (InterruptedException e) {
-          log.error("Sleep was interrupted!  Exiting...");
-          System.exit(-1);
+        // Check to ensure the Oracle issued no timestamps during the scan for 
notifications.
+        if (ts2 - ts1 == 1) {
+          break;
         }
       }
-    } catch (FluoException e) {
-      log.error(e.getMessage());
-      System.exit(-1);
     } catch (Exception e) {
       log.error("An exception was thrown -", e);
       System.exit(-1);
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java 
b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
index 4690fd5a..3cb9d497 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
@@ -26,6 +26,11 @@
 import java.util.Map;
 import java.util.function.Function;
 
+import com.google.common.collect.Iterables;
+import com.google.gson.FieldNamingPolicy;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonIOException;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.security.Authorizations;
@@ -40,12 +45,6 @@
 import org.apache.fluo.api.data.RowColumnValue;
 import org.apache.fluo.api.data.Span;
 
-import com.google.common.collect.Iterables;
-import com.google.gson.FieldNamingPolicy;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonIOException;
-
 public class ScanUtil {
   public static final String FLUO_VALUE = "value";
   public static final String FLUO_COLUMN_VISIBILITY = "visibility";
diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionManager.java
 
b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionManager.java
index 4474c780..89647f68 100644
--- 
a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionManager.java
+++ 
b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/PartitionManager.java
@@ -199,7 +199,7 @@ private void updatePartitionInfo() {
       List<Bytes> zkSplits = new ArrayList<>();
       SerializedSplits.deserialize(zkSplits::add, zkSplitData);
 
-      Collection<TableRange> tableRanges = TableRange.toTabletRanges(zkSplits);
+      Collection<TableRange> tableRanges = TableRange.fromBytes(zkSplits);
       PartitionInfo newPI = getGroupInfo(me, children, tableRanges, groupSize);
 
       setPartitionInfo(newPI);
diff --git 
a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java
 
b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java
index a7517f8d..a3c70bb7 100644
--- 
a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java
+++ 
b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/TableRange.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the 
License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
  * or implied. See the License for the specific language governing permissions 
and limitations under
@@ -20,10 +20,12 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.stream.Stream;
 
 import org.apache.accumulo.core.data.Range;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.core.util.ByteUtil;
+import org.apache.fluo.core.util.Hex;
 import org.apache.hadoop.io.Text;
 
 import static java.util.stream.Collectors.toList;
@@ -69,12 +71,14 @@ public boolean contains(Bytes row) {
 
   @Override
   public String toString() {
-    return getPrevEndRow() + " " + getEndRow();
-  }
+    String per = prevEndRow == null ? "-INF" : Hex.encNonAscii(prevEndRow);
+    String er = endRow == null ? "+INF" : Hex.encNonAscii(endRow);
 
+    return "(" + per + " " + er + "]";
+  }
 
-  public static Collection<TableRange> toTabletRanges(Collection<Bytes> rows) {
-    List<Bytes> sortedRows = rows.stream().sorted().collect(toList());
+  private static List<TableRange> fromStream(Stream<Bytes> stream) {
+    List<Bytes> sortedRows = stream.sorted().collect(toList());
     List<TableRange> tablets = new ArrayList<>(sortedRows.size() + 1);
     for (int i = 0; i < sortedRows.size(); i++) {
       tablets.add(new TableRange(i == 0 ? null : sortedRows.get(i - 1), 
sortedRows.get(i)));
@@ -83,9 +87,16 @@ public String toString() {
     tablets.add(new TableRange(
         sortedRows.size() == 0 ? null : sortedRows.get(sortedRows.size() - 1), 
null));
     return tablets;
+
   }
 
+  public static List<TableRange> fromBytes(Collection<Bytes> rows) {
+    return fromStream(rows.stream());
+  }
 
+  public static List<TableRange> fromTexts(Collection<Text> rows) {
+    return fromStream(rows.stream().map(ByteUtil::toBytes));
+  }
 
   public Range getRange() {
     Text tper = 
Optional.ofNullable(prevEndRow).map(ByteUtil::toText).orElse(null);
diff --git 
a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java
 
b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java
index 3bf5307f..8b380654 100644
--- 
a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java
+++ 
b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/PartitionManagerTest.java
@@ -50,7 +50,7 @@ public void testGrouping() {
 
           Collection<Bytes> rows = IntStream.iterate(0, i -> i + 
1000).limit(numSplits)
               .mapToObj(i -> String.format("r%06d", 
i)).map(Bytes::of).collect(toList());
-          Collection<TableRange> tablets = TableRange.toTabletRanges(rows);
+          Collection<TableRange> tablets = TableRange.fromBytes(rows);
 
           Set<String> idCombos = new HashSet<>();
           Map<Integer, RangeSet> groupTablets = new HashMap<>();
diff --git 
a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java
 
b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java
index 637186f7..0018d099 100644
--- 
a/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java
+++ 
b/modules/core/src/test/java/org/apache/fluo/core/worker/finder/hash/TableRangeTest.java
@@ -69,8 +69,7 @@ public void testMultiple() {
     Bytes sp2 = Bytes.of("m1");
     Bytes sp3 = Bytes.of("r1");
 
-    Collection<TableRange> trc1 =
-        new HashSet<>(TableRange.toTabletRanges(Arrays.asList(sp2, sp3, sp1)));
+    Collection<TableRange> trc1 = new 
HashSet<>(TableRange.fromBytes(Arrays.asList(sp2, sp3, sp1)));
 
     Assert.assertEquals(4, trc1.size());
     Assert.assertTrue(trc1.contains(new TableRange(null, sp1)));
@@ -78,7 +77,7 @@ public void testMultiple() {
     Assert.assertTrue(trc1.contains(new TableRange(sp2, sp3)));
     Assert.assertTrue(trc1.contains(new TableRange(sp3, null)));
 
-    Collection<TableRange> trc2 = new 
HashSet<>(TableRange.toTabletRanges(Collections.emptyList()));
+    Collection<TableRange> trc2 = new 
HashSet<>(TableRange.fromBytes(Collections.emptyList()));
     Assert.assertEquals(1, trc2.size());
     Assert.assertTrue(trc2.contains(new TableRange(null, null)));
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to