keith-turner commented on a change in pull request #1366: Fix #1365 2.1 Upgrade processing for #1043 ~del URL: https://github.com/apache/accumulo/pull/1366#discussion_r326713563
########## File path: server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader9to10.java ########## @@ -352,4 +372,87 @@ MetadataTime computeRootTabletTime(ServerContext context, Collection<String> goo } } + static public void upgradeFileDeletes(ServerContext ctx, Ample.DataLevel level) { + + String tableName = level.metaTable(); + AccumuloClient c = ctx; + + // find all deletes + try (BatchWriter writer = c.createBatchWriter(tableName, new BatchWriterConfig())) { + String continuePoint = ""; + boolean stillDeletes = true; + + while (stillDeletes) { + List<String> deletes = new ArrayList<>(); + log.info("looking for candidates"); + stillDeletes = getOldCandidates(ctx, tableName, continuePoint, deletes); + log.info("found {} deletes to upgrade", deletes.size()); + for (String olddelete : deletes) { + // create new formatted delete + writer.addMutation(upgradeDeleteMutation(olddelete)); + } + writer.flush(); + + // if nothing thrown then we're good so mark all deleted + for (String olddelete : deletes) { + writer.addMutation(deleteOldDeleteMutation(olddelete)); + } + writer.flush(); + + // give it some time for memory to clean itself up if needed + sleepUninterruptibly(5, TimeUnit.SECONDS); + continuePoint = deletes.get(deletes.size() - 1); + log.debug("continuing from {}", continuePoint); + } + } catch (Exception e) { + ; + } + } + + static boolean getOldCandidates(ServerContext ctx, String tableName, String continuePoint, + List<String> result) throws TableNotFoundException { + + Range range = MetadataSchema.DeletesSection.getRange(); + if (continuePoint != null && !continuePoint.isEmpty()) { + String continueRow = OLD_DELETE_PREFIX + continuePoint; + range = new Range(new Key(continueRow).followingKey(PartialKey.ROW), true, range.getEndKey(), + range.isEndKeyInclusive()); + } + + Scanner scanner = ctx.createScanner(tableName, Authorizations.EMPTY); + scanner.setRange(range); + + // find old candidates for deletion; chop off the prefix + for 
(Map.Entry<Key,Value> entry : scanner) { + if (!entry.getValue().toString().equals(UPGRADED)) { + String cand = entry.getKey().getRow().toString().substring(OLD_DELETE_PREFIX.length()); + result.add(cand); + } + if (almostOutOfMemory(Runtime.getRuntime())) { + log.info("List of delete candidates has exceeded the memory" + + " threshold. Attempting to delete what has been gathered so far."); + return true; + } + } + return false; + } + + static private Mutation deleteOldDeleteMutation(final String delete) { + Mutation m = new Mutation(OLD_DELETE_PREFIX + delete); + m.putDelete(EMPTY_TEXT, EMPTY_TEXT); + return m; + } + + static private Mutation upgradeDeleteMutation(final String delete) { + // cannot use the Ample implementation because we want to modify the value field + Mutation m = new Mutation(MetadataSchema.DeletesSection.encodeRow(delete)); + m.put(EMPTY_TEXT, EMPTY_TEXT, new Value(UPGRADED)); // new Value(new byte[] {})); Review comment: Could the constant UPGRADED be a Value? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services