keith-turner commented on a change in pull request #1366: Fix #1365 2.1 Upgrade 
processing for #1043 ~del
URL: https://github.com/apache/accumulo/pull/1366#discussion_r326713563
 
 

 ##########
 File path: 
server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader9to10.java
 ##########
 @@ -352,4 +372,87 @@ MetadataTime computeRootTabletTime(ServerContext context, 
Collection<String> goo
     }
   }
 
+  static public void upgradeFileDeletes(ServerContext ctx, Ample.DataLevel 
level) {
+
+    String tableName = level.metaTable();
+    AccumuloClient c = ctx;
+
+    // find all deletes
+    try (BatchWriter writer = c.createBatchWriter(tableName, new 
BatchWriterConfig())) {
+      String continuePoint = "";
+      boolean stillDeletes = true;
+
+      while (stillDeletes) {
+        List<String> deletes = new ArrayList<>();
+        log.info("looking for candidates");
+        stillDeletes = getOldCandidates(ctx, tableName, continuePoint, 
deletes);
+        log.info("found {} deletes to upgrade", deletes.size());
+        for (String olddelete : deletes) {
+          // create new formatted delete
+          writer.addMutation(upgradeDeleteMutation(olddelete));
+        }
+        writer.flush();
+
+        // if nothing thrown then we're good so mark all deleted
+        for (String olddelete : deletes) {
+          writer.addMutation(deleteOldDeleteMutation(olddelete));
+        }
+        writer.flush();
+
+        // give it some time for memory to clean itself up if needed
+        sleepUninterruptibly(5, TimeUnit.SECONDS);
+        continuePoint = deletes.get(deletes.size() - 1);
+        log.debug("continuing from {}", continuePoint);
+      }
+    } catch (Exception e) {
+      ;
+    }
+  }
+
+  static boolean getOldCandidates(ServerContext ctx, String tableName, String 
continuePoint,
+      List<String> result) throws TableNotFoundException {
+
+    Range range = MetadataSchema.DeletesSection.getRange();
+    if (continuePoint != null && !continuePoint.isEmpty()) {
+      String continueRow = OLD_DELETE_PREFIX + continuePoint;
+      range = new Range(new Key(continueRow).followingKey(PartialKey.ROW), 
true, range.getEndKey(),
+          range.isEndKeyInclusive());
+    }
+
+    Scanner scanner = ctx.createScanner(tableName, Authorizations.EMPTY);
+    scanner.setRange(range);
+
+    // find old candidates for deletion; chop off the prefix
+    for (Map.Entry<Key,Value> entry : scanner) {
+      if (!entry.getValue().toString().equals(UPGRADED)) {
+        String cand = 
entry.getKey().getRow().toString().substring(OLD_DELETE_PREFIX.length());
+        result.add(cand);
+      }
+      if (almostOutOfMemory(Runtime.getRuntime())) {
+        log.info("List of delete candidates has exceeded the memory"
+            + " threshold. Attempting to delete what has been gathered so 
far.");
+        return true;
+      }
+    }
+    return false;
+  }
+
+  static private Mutation deleteOldDeleteMutation(final String delete) {
+    Mutation m = new Mutation(OLD_DELETE_PREFIX + delete);
+    m.putDelete(EMPTY_TEXT, EMPTY_TEXT);
+    return m;
+  }
+
+  static private Mutation upgradeDeleteMutation(final String delete) {
+    // cannot use the Ample implementation because we want to modify the value 
field
+    Mutation m = new Mutation(MetadataSchema.DeletesSection.encodeRow(delete));
+    m.put(EMPTY_TEXT, EMPTY_TEXT, new Value(UPGRADED)); // new Value(new 
byte[] {}));
 
 Review comment:
   Could the constant UPGRADE be a Value?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to