cshannon commented on code in PR #3954:
URL: https://github.com/apache/accumulo/pull/3954#discussion_r1397727130


##########
server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java:
##########
@@ -84,111 +89,97 @@ public void upgradeZookeeper(@NonNull ServerContext 
context) {
   public void upgradeRoot(@NonNull ServerContext context) {
     log.debug("Upgrade root: upgrading to data version {}", 
METADATA_FILE_JSON_ENCODING);
     var rootName = Ample.DataLevel.METADATA.metaTable();
-    processReferences(context, rootName);
+    // not using ample to avoid StoredTabletFile because old file ref is 
incompatible
+    try (AccumuloClient c = 
Accumulo.newClient().from(context.getProperties()).build();
+        BatchWriter batchWriter = c.createBatchWriter(rootName); Scanner 
scanner =
+            new IsolatedScanner(context.createScanner(rootName, 
Authorizations.EMPTY))) {
+      processReferences(batchWriter, scanner, rootName);
+    } catch (TableNotFoundException ex) {
+      throw new IllegalStateException("Failed to find table " + rootName, ex);
+    } catch (MutationsRejectedException mex) {
+      log.warn("Failed to update reference for table: " + rootName);
+      log.warn("Constraint violations: {}", 
mex.getConstraintViolationSummaries());
+      throw new IllegalStateException("Failed to process table: " + rootName, 
mex);
+    }
   }
 
   @Override
   public void upgradeMetadata(@NonNull ServerContext context) {
     log.debug("Upgrade metadata: upgrading to data version {}", 
METADATA_FILE_JSON_ENCODING);
     var metaName = Ample.DataLevel.USER.metaTable();
-    processReferences(context, metaName);
+    try (AccumuloClient c = 
Accumulo.newClient().from(context.getProperties()).build();
+        BatchWriter batchWriter = c.createBatchWriter(metaName); Scanner 
scanner =
+            new IsolatedScanner(context.createScanner(metaName, 
Authorizations.EMPTY))) {
+      processReferences(batchWriter, scanner, metaName);
+    } catch (TableNotFoundException ex) {
+      throw new IllegalStateException("Failed to find table " + metaName, ex);
+    } catch (MutationsRejectedException mex) {
+      log.warn("Failed to update reference for table: " + metaName);
+      log.warn("Constraint violations: {}", 
mex.getConstraintViolationSummaries());
+      throw new IllegalStateException("Failed to process table: " + metaName, 
mex);
+    }
   }
 
-  private void processReferences(ServerContext context, String tableName) {
-    // not using ample to avoid StoredTabletFile because old file ref is 
incompatible
-    try (AccumuloClient c = 
Accumulo.newClient().from(context.getProperties()).build();
-        BatchWriter batchWriter = c.createBatchWriter(tableName); Scanner 
scanner =
-            new IsolatedScanner(context.createScanner(tableName, 
Authorizations.EMPTY))) {
-
-      scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
-      scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
-      scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
-      scanner.forEach((k, v) -> {
-        var family = k.getColumnFamily();
+  void processReferences(BatchWriter batchWriter, Scanner scanner, String 
tableName) {
+    scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+    scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+    scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+    try {
+      Mutation update = null;
+      for (Map.Entry<Key,Value> entry : scanner) {
+        Key key = entry.getKey();
+        Value value = entry.getValue();
+
+        // on new row, write current mutation and prepare a new one.
+        Text r = key.getRow();
+        if (update == null) {
+          update = new Mutation(r);
+        } else if (!Arrays.equals(update.getRow(), r.getBytes())) {
+          log.trace("table: {}, update: {}", tableName, update.prettyPrint());
+          batchWriter.addMutation(update);
+          update = new Mutation(r);
+        }
+
+        var family = key.getColumnFamily();
         if (family.equals(DataFileColumnFamily.NAME)) {
-          upgradeDataFileCF(k, v, batchWriter, tableName);
+          upgradeDataFileCF(key, value, update);
         } else if (family.equals(ChoppedColumnFamily.NAME)) {
-          removeChoppedCF(k, batchWriter, tableName);
+          log.warn(
+              "Deleting chopped reference from:{}. Previous split or delete 
may not have completed cleanly. Ref: {}",
+              tableName, key.getRow());
+          
update.at().family(ChoppedColumnFamily.STR_NAME).qualifier(ChoppedColumnFamily.STR_NAME)
+              .delete();
         } else if (family.equals(ExternalCompactionColumnFamily.NAME)) {
-          removeExternalCompactionCF(k, batchWriter, tableName);
+          log.debug(
+              "Deleting external compaction reference from:{}. Previous 
compaction may not have completed. Ref: {}",
+              tableName, key.getRow());
+          update.at().family(ExternalCompactionColumnFamily.NAME)
+              .qualifier(key.getColumnQualifier()).delete();
         } else {
           throw new IllegalStateException("Processing: " + tableName
               + " Received unexpected column family processing references: " + 
family);
         }
-      });
+      }
+      // send last mutation
+      if (update != null) {
+        log.trace("table: {}, update: {}", tableName, update.prettyPrint());
+        batchWriter.addMutation(update);

Review Comment:
   ```suggestion
           if (!update.getUpdates().isEmpty()) {
             batchWriter.addMutation(update);
           }
   ```
   
   Same as the other comment.



##########
server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java:
##########
@@ -84,111 +89,97 @@ public void upgradeZookeeper(@NonNull ServerContext 
context) {
   public void upgradeRoot(@NonNull ServerContext context) {
     log.debug("Upgrade root: upgrading to data version {}", 
METADATA_FILE_JSON_ENCODING);
     var rootName = Ample.DataLevel.METADATA.metaTable();
-    processReferences(context, rootName);
+    // not using ample to avoid StoredTabletFile because old file ref is 
incompatible
+    try (AccumuloClient c = 
Accumulo.newClient().from(context.getProperties()).build();
+        BatchWriter batchWriter = c.createBatchWriter(rootName); Scanner 
scanner =
+            new IsolatedScanner(context.createScanner(rootName, 
Authorizations.EMPTY))) {
+      processReferences(batchWriter, scanner, rootName);
+    } catch (TableNotFoundException ex) {
+      throw new IllegalStateException("Failed to find table " + rootName, ex);
+    } catch (MutationsRejectedException mex) {
+      log.warn("Failed to update reference for table: " + rootName);
+      log.warn("Constraint violations: {}", 
mex.getConstraintViolationSummaries());
+      throw new IllegalStateException("Failed to process table: " + rootName, 
mex);
+    }
   }
 
   @Override
   public void upgradeMetadata(@NonNull ServerContext context) {
     log.debug("Upgrade metadata: upgrading to data version {}", 
METADATA_FILE_JSON_ENCODING);
     var metaName = Ample.DataLevel.USER.metaTable();
-    processReferences(context, metaName);
+    try (AccumuloClient c = 
Accumulo.newClient().from(context.getProperties()).build();
+        BatchWriter batchWriter = c.createBatchWriter(metaName); Scanner 
scanner =
+            new IsolatedScanner(context.createScanner(metaName, 
Authorizations.EMPTY))) {
+      processReferences(batchWriter, scanner, metaName);
+    } catch (TableNotFoundException ex) {
+      throw new IllegalStateException("Failed to find table " + metaName, ex);
+    } catch (MutationsRejectedException mex) {
+      log.warn("Failed to update reference for table: " + metaName);
+      log.warn("Constraint violations: {}", 
mex.getConstraintViolationSummaries());
+      throw new IllegalStateException("Failed to process table: " + metaName, 
mex);
+    }
   }
 
-  private void processReferences(ServerContext context, String tableName) {
-    // not using ample to avoid StoredTabletFile because old file ref is 
incompatible
-    try (AccumuloClient c = 
Accumulo.newClient().from(context.getProperties()).build();
-        BatchWriter batchWriter = c.createBatchWriter(tableName); Scanner 
scanner =
-            new IsolatedScanner(context.createScanner(tableName, 
Authorizations.EMPTY))) {
-
-      scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
-      scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
-      scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
-      scanner.forEach((k, v) -> {
-        var family = k.getColumnFamily();
+  void processReferences(BatchWriter batchWriter, Scanner scanner, String 
tableName) {
+    scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+    scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+    scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+    try {
+      Mutation update = null;
+      for (Map.Entry<Key,Value> entry : scanner) {
+        Key key = entry.getKey();
+        Value value = entry.getValue();
+
+        // on new row, write current mutation and prepare a new one.
+        Text r = key.getRow();
+        if (update == null) {
+          update = new Mutation(r);
+        } else if (!Arrays.equals(update.getRow(), r.getBytes())) {
+          log.trace("table: {}, update: {}", tableName, update.prettyPrint());
+          batchWriter.addMutation(update);

Review Comment:
   ```suggestion
           if (!update.getUpdates().isEmpty()) {
             batchWriter.addMutation(update);
           }
   ```
   
   I'm wondering if we need to check whether the mutation is empty here before 
adding it. I don't _think_ this is necessary the way things are currently written, 
as we are only fetching 3 column families and each of those should cause an update 
to be added to the mutation. Still, it might not hurt to check, since adding an 
empty mutation would throw an exception.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to