dlmarion commented on code in PR #3425:
URL: https://github.com/apache/accumulo/pull/3425#discussion_r1205362670


##########
server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java:
##########
@@ -217,10 +218,16 @@ private static boolean shouldReturnDueToSplit(final 
TabletMetadata tm,
         .collect(Collectors.summarizingLong(Long::longValue)).getSum() > 
splitThreshold;
   }
 
-  private static boolean shouldReturnDueToLocation(final TabletMetadata tm,
+  private boolean shouldReturnDueToLocation(final TabletMetadata tm,
       final Set<TableId> onlineTables, final Set<TServerInstance> current, 
final boolean debug) {
+
+    if (migrations.contains(tm.getExtent())) {
+      return true;
+    }
+
     // is the table supposed to be online or offline?
-    final boolean shouldBeOnline = onlineTables.contains(tm.getTableId());
+    final boolean shouldBeOnline =
+        onlineTables.contains(tm.getTableId()) && tm.getOperationId() == null;

Review Comment:
   You should be able to modify TabletManagementIteratorIT to insert an OPID 
column into the tablet metadata and check that the tablet gets returned by the 
iterator.



##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -706,6 +706,10 @@ TabletGoalState getGoalState(TabletMetadata tm, MergeInfo 
mergeInfo) {
         return TabletGoalState.UNASSIGNED;
       }
 
+      if (tm.getOperationId() != null) {

Review Comment:
   You might be able to modify ManagerAssignmentIT.test to insert an OPID into 
the tablet metadata and test this.



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java:
##########
@@ -435,198 +432,143 @@ String doFateOperation(FateOperation op, 
List<ByteBuffer> args, Map<String,Strin
     }
   }
 
-  private static class SplitEnv {
-    private final String tableName;
-    private final TableId tableId;
-    private final ExecutorService executor;
-    private final CountDownLatch latch;
-    private final AtomicReference<Exception> exception;
-
-    SplitEnv(String tableName, TableId tableId, ExecutorService executor, 
CountDownLatch latch,
-        AtomicReference<Exception> exception) {
-      this.tableName = tableName;
-      this.tableId = tableId;
-      this.executor = executor;
-      this.latch = latch;
-      this.exception = exception;
-    }
-  }
-
-  private class SplitTask implements Runnable {
-
-    private List<Text> splits;
-    private SplitEnv env;
-
-    SplitTask(SplitEnv env, List<Text> splits) {
-      this.env = env;
-      this.splits = splits;
-    }
+  /**
+   * On the server side the fate operation will exit w/o an error if the 
tablet requested to split
+   * does not exist. When this happens it will also return an empty string. In 
the case where the
+   * fate operation successfully splits the tablet it will return the 
following string. This code
+   * uses this return value to see if it needs to retry finding the tablet.
+   */
+  public static final String SPLIT_SUCCESS_MSG = "SPLIT_SUCCEEDED";
 
-    @Override
-    public void run() {
-      try {
-        if (env.exception.get() != null) {
-          return;
-        }
+  @Override
+  public void addSplits(String tableName, SortedSet<Text> splits)
+      throws AccumuloException, TableNotFoundException, 
AccumuloSecurityException {
 
-        if (splits.size() <= 2) {
-          addSplits(env, new TreeSet<>(splits));
-          splits.forEach(s -> env.latch.countDown());
-          return;
-        }
+    EXISTING_TABLE_NAME.validate(tableName);
 
-        int mid = splits.size() / 2;
+    TableId tableId = context.getTableId(tableName);
 
-        // split the middle split point to ensure that child task split
-        // different tablets and can therefore run in parallel
-        addSplits(env, new TreeSet<>(splits.subList(mid, mid + 1)));
-        env.latch.countDown();
+    // TODO should there be a server side check for this?
+    context.requireNotOffline(tableId, tableName);
 
-        env.executor.execute(new SplitTask(env, splits.subList(0, mid)));
-        env.executor.execute(new SplitTask(env, splits.subList(mid + 1, 
splits.size())));
+    ClientTabletCache tabLocator = ClientTabletCache.getInstance(context, 
tableId);
 
-      } catch (Exception t) {
-        env.exception.compareAndSet(null, t);
-      }
-    }
+    SortedSet<Text> splitsTodo = new TreeSet<>(splits);
+    ExecutorService executor = context.threadPools().createFixedThreadPool(16, 
"addSplits", false);
+    try {
+      while (!splitsTodo.isEmpty()) {
 
-  }
+        tabLocator.invalidateCache();
 
-  @Override
-  public void addSplits(String tableName, SortedSet<Text> partitionKeys)
-      throws TableNotFoundException, AccumuloException, 
AccumuloSecurityException {
-    EXISTING_TABLE_NAME.validate(tableName);
+        Map<KeyExtent,List<Text>> tabletSplits =
+            mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo);
 
-    TableId tableId = context.getTableId(tableName);
-    List<Text> splits = new ArrayList<>(partitionKeys);
+        List<Future<List<Text>>> splitTasks = new ArrayList<>();
 
-    // should be sorted because we copied from a sorted set, but that makes
-    // assumptions about how the copy was done so resort to be sure.
-    Collections.sort(splits);
-    CountDownLatch latch = new CountDownLatch(splits.size());
-    AtomicReference<Exception> exception = new AtomicReference<>(null);
+        for (Entry<KeyExtent,List<Text>> splitsForTablet : 
tabletSplits.entrySet()) {
+          Callable<List<Text>> splitTask = createSplitTask(tableName, 
splitsForTablet);
+          splitTasks.add(executor.submit(splitTask));
+        }
 
-    ExecutorService executor = context.threadPools().createFixedThreadPool(16, 
"addSplits", false);
-    try {
-      executor.execute(
-          new SplitTask(new SplitEnv(tableName, tableId, executor, latch, 
exception), splits));
-
-      while (!latch.await(100, MILLISECONDS)) {
-        if (exception.get() != null) {
-          executor.shutdownNow();
-          Throwable excep = exception.get();
-          // Below all exceptions are wrapped and rethrown. This is done so 
that the user knows what
-          // code path got them here. If the wrapping was not done, the
-          // user would only have the stack trace for the background thread.
-          if (excep instanceof TableNotFoundException) {
-            TableNotFoundException tnfe = (TableNotFoundException) excep;
-            throw new TableNotFoundException(tableId.canonical(), tableName,
-                "Table not found by background thread", tnfe);
-          } else if (excep instanceof TableOfflineException) {
-            log.debug("TableOfflineException occurred in background thread. 
Throwing new exception",
-                excep);
-            throw new TableOfflineException(tableId, tableName);
-          } else if (excep instanceof AccumuloSecurityException) {
-            // base == background accumulo security exception
-            AccumuloSecurityException base = (AccumuloSecurityException) excep;
-            throw new AccumuloSecurityException(base.getUser(), 
base.asThriftException().getCode(),
-                base.getTableInfo(), excep);
-          } else if (excep instanceof AccumuloServerException) {
-            throw new AccumuloServerException((AccumuloServerException) excep);
-          } else if (excep instanceof Error) {
-            throw new Error(excep);
-          } else {
-            throw new AccumuloException(excep);
+        for (var future : splitTasks) {
+          try {
+            var completedSplits = future.get();
+            completedSplits.forEach(splitsTodo::remove);
+          } catch (ExecutionException ee) {
+            Throwable excep = ee.getCause();
+            // Below all exceptions are wrapped and rethrown. This is done so 
that the user knows
+            // what
+            // code path got them here. If the wrapping was not done, the user 
would only have the
+            // stack trace for the background thread.
+            if (excep instanceof TableNotFoundException) {
+              TableNotFoundException tnfe = (TableNotFoundException) excep;
+              throw new TableNotFoundException(tableId.canonical(), tableName,
+                  "Table not found by background thread", tnfe);
+            } else if (excep instanceof TableOfflineException) {
+              log.debug(
+                  "TableOfflineException occurred in background thread. 
Throwing new exception",
+                  excep);
+              throw new TableOfflineException(tableId, tableName);
+            } else if (excep instanceof AccumuloSecurityException) {
+              // base == background accumulo security exception
+              AccumuloSecurityException base = (AccumuloSecurityException) 
excep;
+              throw new AccumuloSecurityException(base.getUser(),
+                  base.asThriftException().getCode(), base.getTableInfo(), 
excep);
+            } else if (excep instanceof AccumuloServerException) {
+              throw new AccumuloServerException((AccumuloServerException) 
excep);
+            } else {
+              throw new AccumuloException(excep);
+            }
+          } catch (InterruptedException e) {
+            throw new IllegalStateException(e);
           }
         }
       }
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(e);
     } finally {
-      executor.shutdown();
+      executor.shutdownNow();
     }
   }
 
-  private void addSplits(SplitEnv env, SortedSet<Text> partitionKeys)
-      throws AccumuloException, AccumuloSecurityException, 
TableNotFoundException,
-      AccumuloServerException, InvalidTabletHostingRequestException {
-
-    ClientTabletCache tabLocator = ClientTabletCache.getInstance(context, 
env.tableId);
-    for (Text split : partitionKeys) {
-      boolean successful = false;
-      int attempt = 0;
-      long locationFailures = 0;
+  private Map<KeyExtent,List<Text>> mapSplitsToTablets(String tableName, 
TableId tableId,
+      ClientTabletCache tabLocator, SortedSet<Text> splitsTodo)
+      throws AccumuloException, AccumuloSecurityException, 
TableNotFoundException {
+    Map<KeyExtent,List<Text>> tabletSplits = new HashMap<>();
 
-      while (!successful) {
+    var iterator = splitsTodo.iterator();
+    while (iterator.hasNext()) {
+      var split = iterator.next();
 
-        if (attempt > 0) {
-          sleepUninterruptibly(100, MILLISECONDS);
+      try {
+        var tablet = tabLocator.findTablet(context, split, false, 
LocationNeed.NOT_REQUIRED);
+        if (tablet == null) {
+          context.requireTableExists(tableId, tableName);

Review Comment:
   Are you checking this inside the loop in case the table is deleted by 
something else concurrently?



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java:
##########
@@ -435,198 +432,143 @@ String doFateOperation(FateOperation op, 
List<ByteBuffer> args, Map<String,Strin
     }
   }
 
-  private static class SplitEnv {
-    private final String tableName;
-    private final TableId tableId;
-    private final ExecutorService executor;
-    private final CountDownLatch latch;
-    private final AtomicReference<Exception> exception;
-
-    SplitEnv(String tableName, TableId tableId, ExecutorService executor, 
CountDownLatch latch,
-        AtomicReference<Exception> exception) {
-      this.tableName = tableName;
-      this.tableId = tableId;
-      this.executor = executor;
-      this.latch = latch;
-      this.exception = exception;
-    }
-  }
-
-  private class SplitTask implements Runnable {
-
-    private List<Text> splits;
-    private SplitEnv env;
-
-    SplitTask(SplitEnv env, List<Text> splits) {
-      this.env = env;
-      this.splits = splits;
-    }
+  /**
+   * On the server side the fate operation will exit w/o an error if the 
tablet requested to split
+   * does not exist. When this happens it will also return an empty string. In 
the case where the
+   * fate operation successfully splits the tablet it will return the 
following string. This code
+   * uses this return value to see if it needs to retry finding the tablet.
+   */
+  public static final String SPLIT_SUCCESS_MSG = "SPLIT_SUCCEEDED";
 
-    @Override
-    public void run() {
-      try {
-        if (env.exception.get() != null) {
-          return;
-        }
+  @Override
+  public void addSplits(String tableName, SortedSet<Text> splits)
+      throws AccumuloException, TableNotFoundException, 
AccumuloSecurityException {
 
-        if (splits.size() <= 2) {
-          addSplits(env, new TreeSet<>(splits));
-          splits.forEach(s -> env.latch.countDown());
-          return;
-        }
+    EXISTING_TABLE_NAME.validate(tableName);
 
-        int mid = splits.size() / 2;
+    TableId tableId = context.getTableId(tableName);
 
-        // split the middle split point to ensure that child task split
-        // different tablets and can therefore run in parallel
-        addSplits(env, new TreeSet<>(splits.subList(mid, mid + 1)));
-        env.latch.countDown();
+    // TODO should there be a server side check for this?
+    context.requireNotOffline(tableId, tableName);
 
-        env.executor.execute(new SplitTask(env, splits.subList(0, mid)));
-        env.executor.execute(new SplitTask(env, splits.subList(mid + 1, 
splits.size())));
+    ClientTabletCache tabLocator = ClientTabletCache.getInstance(context, 
tableId);
 
-      } catch (Exception t) {
-        env.exception.compareAndSet(null, t);
-      }
-    }
+    SortedSet<Text> splitsTodo = new TreeSet<>(splits);
+    ExecutorService executor = context.threadPools().createFixedThreadPool(16, 
"addSplits", false);
+    try {
+      while (!splitsTodo.isEmpty()) {
 
-  }
+        tabLocator.invalidateCache();
 
-  @Override
-  public void addSplits(String tableName, SortedSet<Text> partitionKeys)
-      throws TableNotFoundException, AccumuloException, 
AccumuloSecurityException {
-    EXISTING_TABLE_NAME.validate(tableName);
+        Map<KeyExtent,List<Text>> tabletSplits =
+            mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo);
 
-    TableId tableId = context.getTableId(tableName);
-    List<Text> splits = new ArrayList<>(partitionKeys);
+        List<Future<List<Text>>> splitTasks = new ArrayList<>();
 
-    // should be sorted because we copied from a sorted set, but that makes
-    // assumptions about how the copy was done so resort to be sure.
-    Collections.sort(splits);
-    CountDownLatch latch = new CountDownLatch(splits.size());
-    AtomicReference<Exception> exception = new AtomicReference<>(null);
+        for (Entry<KeyExtent,List<Text>> splitsForTablet : 
tabletSplits.entrySet()) {
+          Callable<List<Text>> splitTask = createSplitTask(tableName, 
splitsForTablet);
+          splitTasks.add(executor.submit(splitTask));
+        }
 
-    ExecutorService executor = context.threadPools().createFixedThreadPool(16, 
"addSplits", false);
-    try {
-      executor.execute(
-          new SplitTask(new SplitEnv(tableName, tableId, executor, latch, 
exception), splits));
-
-      while (!latch.await(100, MILLISECONDS)) {
-        if (exception.get() != null) {
-          executor.shutdownNow();
-          Throwable excep = exception.get();
-          // Below all exceptions are wrapped and rethrown. This is done so 
that the user knows what
-          // code path got them here. If the wrapping was not done, the
-          // user would only have the stack trace for the background thread.
-          if (excep instanceof TableNotFoundException) {
-            TableNotFoundException tnfe = (TableNotFoundException) excep;
-            throw new TableNotFoundException(tableId.canonical(), tableName,
-                "Table not found by background thread", tnfe);
-          } else if (excep instanceof TableOfflineException) {
-            log.debug("TableOfflineException occurred in background thread. 
Throwing new exception",
-                excep);
-            throw new TableOfflineException(tableId, tableName);
-          } else if (excep instanceof AccumuloSecurityException) {
-            // base == background accumulo security exception
-            AccumuloSecurityException base = (AccumuloSecurityException) excep;
-            throw new AccumuloSecurityException(base.getUser(), 
base.asThriftException().getCode(),
-                base.getTableInfo(), excep);
-          } else if (excep instanceof AccumuloServerException) {
-            throw new AccumuloServerException((AccumuloServerException) excep);
-          } else if (excep instanceof Error) {
-            throw new Error(excep);
-          } else {
-            throw new AccumuloException(excep);
+        for (var future : splitTasks) {
+          try {
+            var completedSplits = future.get();
+            completedSplits.forEach(splitsTodo::remove);
+          } catch (ExecutionException ee) {
+            Throwable excep = ee.getCause();
+            // Below all exceptions are wrapped and rethrown. This is done so 
that the user knows
+            // what
+            // code path got them here. If the wrapping was not done, the user 
would only have the
+            // stack trace for the background thread.
+            if (excep instanceof TableNotFoundException) {
+              TableNotFoundException tnfe = (TableNotFoundException) excep;
+              throw new TableNotFoundException(tableId.canonical(), tableName,
+                  "Table not found by background thread", tnfe);
+            } else if (excep instanceof TableOfflineException) {
+              log.debug(
+                  "TableOfflineException occurred in background thread. 
Throwing new exception",
+                  excep);
+              throw new TableOfflineException(tableId, tableName);
+            } else if (excep instanceof AccumuloSecurityException) {
+              // base == background accumulo security exception
+              AccumuloSecurityException base = (AccumuloSecurityException) 
excep;
+              throw new AccumuloSecurityException(base.getUser(),
+                  base.asThriftException().getCode(), base.getTableInfo(), 
excep);
+            } else if (excep instanceof AccumuloServerException) {
+              throw new AccumuloServerException((AccumuloServerException) 
excep);
+            } else {
+              throw new AccumuloException(excep);
+            }
+          } catch (InterruptedException e) {
+            throw new IllegalStateException(e);
           }
         }
       }
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(e);
     } finally {
-      executor.shutdown();
+      executor.shutdownNow();
     }
   }
 
-  private void addSplits(SplitEnv env, SortedSet<Text> partitionKeys)
-      throws AccumuloException, AccumuloSecurityException, 
TableNotFoundException,
-      AccumuloServerException, InvalidTabletHostingRequestException {
-
-    ClientTabletCache tabLocator = ClientTabletCache.getInstance(context, 
env.tableId);
-    for (Text split : partitionKeys) {
-      boolean successful = false;
-      int attempt = 0;
-      long locationFailures = 0;
+  private Map<KeyExtent,List<Text>> mapSplitsToTablets(String tableName, 
TableId tableId,
+      ClientTabletCache tabLocator, SortedSet<Text> splitsTodo)
+      throws AccumuloException, AccumuloSecurityException, 
TableNotFoundException {
+    Map<KeyExtent,List<Text>> tabletSplits = new HashMap<>();
 
-      while (!successful) {
+    var iterator = splitsTodo.iterator();
+    while (iterator.hasNext()) {
+      var split = iterator.next();
 
-        if (attempt > 0) {
-          sleepUninterruptibly(100, MILLISECONDS);
+      try {
+        var tablet = tabLocator.findTablet(context, split, false, 
LocationNeed.NOT_REQUIRED);
+        if (tablet == null) {
+          context.requireTableExists(tableId, tableName);
+          throw new IllegalStateException("Unable to find a tablet for split " 
+ split
+              + " int table " + tableName + " " + tableId);

Review Comment:
   ```suggestion
                 + " in table " + tableName + " " + tableId);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to