keith-turner commented on code in PR #3954:
URL: https://github.com/apache/accumulo/pull/3954#discussion_r1402430312


##########
server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader11to12Test.java:
##########
@@ -62,198 +67,334 @@ public class Upgrader11to12Test {
   private static final Logger LOG = 
LoggerFactory.getLogger(Upgrader11to12Test.class);
 
   @Test
-  void upgradeDataFileCFTest() throws Exception {
+  void upgradeDataFileCF2Test() {
     Upgrader11to12 upgrader = new Upgrader11to12();
 
-    BatchWriter bw = createMock(BatchWriter.class);
-    Capture<Mutation> capturedAdd = newCapture();
-    bw.addMutation(capture(capturedAdd));
-    expectLastCall();
-
-    Capture<Mutation> capturedDelete = newCapture();
-    bw.addMutation(capture(capturedDelete));
-    expectLastCall();
-
-    replay(bw);
-
     String fileName = 
"hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf";
     Key k = Key.builder().row(new 
Text("12;")).family(DataFileColumnFamily.NAME)
         .qualifier(new Text(fileName)).build();
     Value v = new Value("1234,5678");
 
-    upgrader.upgradeDataFileCF(k, v, bw, "aTable");
+    Mutation upgrade = new Mutation(k.getRow());
+    upgrader.upgradeDataFileCF(k, v, upgrade);
 
-    StoredTabletFile stf = StoredTabletFile.of(new Path(fileName));
-    Mutation add = new 
Mutation(k.getRow()).at().family(DataFileColumnFamily.NAME)
-        .qualifier(stf.getMetadataText()).put(v);
-    LOG.debug("add mutation to be expected: {}", add.prettyPrint());
+    var pending = upgrade.getUpdates();

Review Comment:
   could check that the size of pending is 2



##########
server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java:
##########
@@ -84,111 +91,100 @@ public void upgradeZookeeper(@NonNull ServerContext 
context) {
   public void upgradeRoot(@NonNull ServerContext context) {
     log.debug("Upgrade root: upgrading to data version {}", 
METADATA_FILE_JSON_ENCODING);
     var rootName = Ample.DataLevel.METADATA.metaTable();
-    processReferences(context, rootName);
+    // not using ample to avoid StoredTabletFile because old file ref is 
incompatible
+    try (AccumuloClient c = 
Accumulo.newClient().from(context.getProperties()).build();
+        BatchWriter batchWriter = c.createBatchWriter(rootName); Scanner 
scanner =
+            new IsolatedScanner(context.createScanner(rootName, 
Authorizations.EMPTY))) {
+      processReferences(batchWriter, scanner, rootName);
+    } catch (TableNotFoundException ex) {
+      throw new IllegalStateException("Failed to find table " + rootName, ex);
+    } catch (MutationsRejectedException mex) {
+      log.warn("Failed to update reference for table: " + rootName);
+      log.warn("Constraint violations: {}", 
mex.getConstraintViolationSummaries());
+      throw new IllegalStateException("Failed to process table: " + rootName, 
mex);
+    }
   }
 
   @Override
   public void upgradeMetadata(@NonNull ServerContext context) {
     log.debug("Upgrade metadata: upgrading to data version {}", 
METADATA_FILE_JSON_ENCODING);
     var metaName = Ample.DataLevel.USER.metaTable();
-    processReferences(context, metaName);
+    try (AccumuloClient c = 
Accumulo.newClient().from(context.getProperties()).build();
+        BatchWriter batchWriter = c.createBatchWriter(metaName); Scanner 
scanner =
+            new IsolatedScanner(context.createScanner(metaName, 
Authorizations.EMPTY))) {
+      processReferences(batchWriter, scanner, metaName);
+    } catch (TableNotFoundException ex) {
+      throw new IllegalStateException("Failed to find table " + metaName, ex);
+    } catch (MutationsRejectedException mex) {
+      log.warn("Failed to update reference for table: " + metaName);
+      log.warn("Constraint violations: {}", 
mex.getConstraintViolationSummaries());
+      throw new IllegalStateException("Failed to process table: " + metaName, 
mex);
+    }
   }
 
-  private void processReferences(ServerContext context, String tableName) {
-    // not using ample to avoid StoredTabletFile because old file ref is 
incompatible
-    try (AccumuloClient c = 
Accumulo.newClient().from(context.getProperties()).build();
-        BatchWriter batchWriter = c.createBatchWriter(tableName); Scanner 
scanner =
-            new IsolatedScanner(context.createScanner(tableName, 
Authorizations.EMPTY))) {
+  void processReferences(BatchWriter batchWriter, Scanner scanner, String 
tableName) {
+    scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+    scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+    scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+    try {
+      Mutation update = null;
+      for (Map.Entry<Key,Value> entry : scanner) {
+        Key key = entry.getKey();
+        Value value = entry.getValue();
+        Preconditions.checkState(key.getColumnVisibilityData().length() == 0,
+            "Expected empty visibility, saw %s ", 
key.getColumnVisibilityData());
+        // on new row, write current mutation and prepare a new one.
+        Text r = key.getRow();
+        if (update == null) {
+          update = new Mutation(r);
+        } else if (!Arrays.equals(update.getRow(), TextUtil.getBytes(r))) {
+          log.trace("table: {}, update: {}", tableName, update.prettyPrint());

Review Comment:
   May want to wrap this in `if (log.isTraceEnabled())` because the 
`update.prettyPrint()` call does work to compute the string even when trace 
logging is disabled.



##########
server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader11to12Test.java:
##########
@@ -62,198 +67,334 @@ public class Upgrader11to12Test {
   private static final Logger LOG = 
LoggerFactory.getLogger(Upgrader11to12Test.class);
 
   @Test
-  void upgradeDataFileCFTest() throws Exception {
+  void upgradeDataFileCF2Test() {
     Upgrader11to12 upgrader = new Upgrader11to12();
 
-    BatchWriter bw = createMock(BatchWriter.class);
-    Capture<Mutation> capturedAdd = newCapture();
-    bw.addMutation(capture(capturedAdd));
-    expectLastCall();
-
-    Capture<Mutation> capturedDelete = newCapture();
-    bw.addMutation(capture(capturedDelete));
-    expectLastCall();
-
-    replay(bw);
-
     String fileName = 
"hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf";
     Key k = Key.builder().row(new 
Text("12;")).family(DataFileColumnFamily.NAME)
         .qualifier(new Text(fileName)).build();
     Value v = new Value("1234,5678");
 
-    upgrader.upgradeDataFileCF(k, v, bw, "aTable");
+    Mutation upgrade = new Mutation(k.getRow());
+    upgrader.upgradeDataFileCF(k, v, upgrade);
 
-    StoredTabletFile stf = StoredTabletFile.of(new Path(fileName));
-    Mutation add = new 
Mutation(k.getRow()).at().family(DataFileColumnFamily.NAME)
-        .qualifier(stf.getMetadataText()).put(v);
-    LOG.debug("add mutation to be expected: {}", add.prettyPrint());
+    var pending = upgrade.getUpdates();
 
-    Mutation delete = new 
Mutation(k.getRow()).at().family(DataFileColumnFamily.NAME)
-        .qualifier(new Text(fileName)).delete();
-    LOG.debug("delete mutation to be expected: {}", delete.prettyPrint());
+    // leverage sort order for "expected" values
+    // check file entry converted is in the mutation
+    Iterator<ColumnUpdate> m = pending.iterator();
+    var cu1 = m.next();
+    assertEquals("file", new Text(cu1.getColumnFamily()).toString());
 
-    assertEquals(add, capturedAdd.getValue());
-    assertEquals(delete, capturedDelete.getValue());
+    StoredTabletFile oldFileEntry = StoredTabletFile.of(new Path(fileName));
+    StoredTabletFile updateEnry = StoredTabletFile.of(new 
String(cu1.getColumnQualifier(), UTF_8));
 
-    verify(bw);
-  }
+    assertEquals(oldFileEntry, updateEnry);
+    assertFalse(cu1.isDeleted());
 
-  @Test
-  void upgradeDataFileCFSkipConvertedTest() {
-    Upgrader11to12 upgrader = new Upgrader11to12();
+    // check old file entry is deleted is in the mutation
 
-    BatchWriter bw = createMock(BatchWriter.class);
+    var cu2 = m.next();
+    assertEquals("file", new Text(cu1.getColumnFamily()).toString());
+    assertEquals(fileName, new String(cu2.getColumnQualifier(), UTF_8));
+    assertTrue(cu2.isDeleted());
 
-    replay(bw);
+  }
 
-    String fileName = 
"hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf";
-    StoredTabletFile stf = StoredTabletFile.of(new Path(fileName));
+  @Test
+  public void processReferencesTest() throws Exception {
+    BatchWriter batchWriter = mock(BatchWriter.class);
+    Capture<Mutation> capturedUpdate1 = newCapture();
+    batchWriter.addMutation(capture(capturedUpdate1));
+    expectLastCall().once();
+
+    Capture<Mutation> capturedUpdate2 = newCapture();
+    batchWriter.addMutation(capture(capturedUpdate2));
+    expectLastCall().once();
+
+    // create sample data "served" by the mocked scanner
+    TreeMap<Key,Value> scanData = new TreeMap<>();
+    Text row1 = new Text("123");
+
+    String fileName1 = 
"hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf";
+    Key key1 =
+        
Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName1).build();
+    Value value1 = new Value("123,456");
+    scanData.put(key1, value1);
+
+    String fileName2 = 
"hdfs://localhost:8020/accumulo/tables/12/default_tablet/B000000v.rf";
+    Key key2 =
+        
Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName2).build();
+    Value value2 = new Value("321,654");
+    scanData.put(key2, value2);
+
+    Key chop1 = Key.builder(false).row(row1).family(ChoppedColumnFamily.NAME)
+        .qualifier(ChoppedColumnFamily.NAME).build();
+    scanData.put(chop1, null);
+
+    Key extern1 = 
Key.builder(false).row(row1).family(ExternalCompactionColumnFamily.NAME)
+        .qualifier(ExternalCompactionColumnFamily.NAME).build();
+    scanData.put(extern1, null);
 
-    Key k = Key.builder().row(new 
Text("12;")).family(DataFileColumnFamily.NAME)
-        .qualifier(stf.getMetadataText()).build();
-    Value v = new Value("1234,5678");
+    Text row2 = new Text("234");
 
-    upgrader.upgradeDataFileCF(k, v, bw, "aTable");
+    String fileName3 = 
"hdfs://localhost:8020/accumulo/tables/13/default_tablet/C000000v.rf";
+    Key key3 =
+        
Key.builder(false).row(row2).family(DataFileColumnFamily.NAME).qualifier(fileName3).build();
+    Value value3 = new Value("1,2");
+    scanData.put(key3, value3);
 
-    // with file entry in correct formation, no mutations are expected.
-    verify(bw);
-  }
+    Scanner scanner = mock(Scanner.class);
+    scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+    expectLastCall();
+
+    
expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once();
+    replay(batchWriter, scanner);
 
-  @Test
-  void upgradeDataFileCFInvalidMutationTest() throws Exception {
     Upgrader11to12 upgrader = new Upgrader11to12();
+    upgrader.processReferences(batchWriter, scanner, "accumulo.metadata");
 
-    BatchWriter bw = createMock(BatchWriter.class);
-    Capture<Mutation> capturedAdd = newCapture();
-    bw.addMutation(capture(capturedAdd));
-    expectLastCall().andThrow(new MutationsRejectedException(null, List.of(), 
Map.of(), List.of(),
-        0, new NullPointerException()));
+    LOG.info("c:{}", capturedUpdate1.getValue().prettyPrint());
+    var u1 = capturedUpdate1.getValue();
+    // 2 file add, 2 file delete. 1 chop delete, 1 ext comp delete
+    assertEquals(6, u1.getUpdates().size());
 
-    replay(bw);
+    LOG.info("c:{}", capturedUpdate2.getValue().prettyPrint());
+    var u2 = capturedUpdate2.getValue();
+    // 1 add, 1 delete
+    assertEquals(2, u2.getUpdates().size());
 
-    String fileName = 
"hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf";
-    Key k = Key.builder().row(new 
Text("12;")).family(DataFileColumnFamily.NAME)
-        .qualifier(new Text(fileName)).build();
-    Value v = new Value("1234,5678");
+    verify(batchWriter, scanner);
 
-    assertThrows(IllegalStateException.class, () -> 
upgrader.upgradeDataFileCF(k, v, bw, "aTable"));
-
-    verify(bw);
   }
 
   @Test
-  void upgradeDataFileCFInvalidPathTest() {
-    Upgrader11to12 upgrader = new Upgrader11to12();
-
-    BatchWriter bw = createMock(BatchWriter.class);
-
-    replay(bw);
+  public void skipConvertedFileTest() throws Exception {
+    BatchWriter batchWriter = mock(BatchWriter.class);
+    Capture<Mutation> capturedUpdate1 = newCapture();
+    batchWriter.addMutation(capture(capturedUpdate1));
+    expectLastCall().once();
+    // create sample data "served" by the mocked scanner
+    TreeMap<Key,Value> scanData = new TreeMap<>();
+    Text row1 = new Text("123");
+
+    // reference already in expected form with fence info.
+    String fileName1 =
+        
"{\"path\":\"hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf\",\"startRow\":\"\",\"endRow\":\"\"}";
+    Key key1 =
+        
Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName1).build();
+    Value value1 = new Value("123,456");
+    scanData.put(key1, value1);
+
+    String fileName2 = 
"hdfs://localhost:8020/accumulo/tables/12/default_tablet/B000000v.rf";
+    Key key2 =
+        
Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName2).build();
+    Value value2 = new Value("321,654");
+    scanData.put(key2, value2);
+
+    Scanner scanner = mock(Scanner.class);
+    scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+    expectLastCall();
 
-    String invalidPath = "badPath";
+    
expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once();
+    replay(batchWriter, scanner);
 
-    Key k = Key.builder().row(new 
Text("12;")).family(DataFileColumnFamily.NAME)
-        .qualifier(new Text(invalidPath)).build();
-    Value v = new Value("1234,5678");
+    Upgrader11to12 upgrader = new Upgrader11to12();
+    upgrader.processReferences(batchWriter, scanner, "accumulo.metadata");
 
-    assertThrows(IllegalArgumentException.class,
-        () -> upgrader.upgradeDataFileCF(k, v, bw, "aTable"));
+    LOG.info("c:{}", capturedUpdate1.getValue().prettyPrint());
+    var u1 = capturedUpdate1.getValue();
+    // 1 add, 1 delete
+    assertEquals(2, u1.getUpdates().size());

Review Comment:
   could also stream and count deletes here



##########
server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader11to12Test.java:
##########
@@ -62,198 +67,334 @@ public class Upgrader11to12Test {
   private static final Logger LOG = 
LoggerFactory.getLogger(Upgrader11to12Test.class);
 
   @Test
-  void upgradeDataFileCFTest() throws Exception {
+  void upgradeDataFileCF2Test() {
     Upgrader11to12 upgrader = new Upgrader11to12();
 
-    BatchWriter bw = createMock(BatchWriter.class);
-    Capture<Mutation> capturedAdd = newCapture();
-    bw.addMutation(capture(capturedAdd));
-    expectLastCall();
-
-    Capture<Mutation> capturedDelete = newCapture();
-    bw.addMutation(capture(capturedDelete));
-    expectLastCall();
-
-    replay(bw);
-
     String fileName = 
"hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf";
     Key k = Key.builder().row(new 
Text("12;")).family(DataFileColumnFamily.NAME)
         .qualifier(new Text(fileName)).build();
     Value v = new Value("1234,5678");
 
-    upgrader.upgradeDataFileCF(k, v, bw, "aTable");
+    Mutation upgrade = new Mutation(k.getRow());
+    upgrader.upgradeDataFileCF(k, v, upgrade);
 
-    StoredTabletFile stf = StoredTabletFile.of(new Path(fileName));
-    Mutation add = new 
Mutation(k.getRow()).at().family(DataFileColumnFamily.NAME)
-        .qualifier(stf.getMetadataText()).put(v);
-    LOG.debug("add mutation to be expected: {}", add.prettyPrint());
+    var pending = upgrade.getUpdates();
 
-    Mutation delete = new 
Mutation(k.getRow()).at().family(DataFileColumnFamily.NAME)
-        .qualifier(new Text(fileName)).delete();
-    LOG.debug("delete mutation to be expected: {}", delete.prettyPrint());
+    // leverage sort order for "expected" values
+    // check file entry converted is in the mutation
+    Iterator<ColumnUpdate> m = pending.iterator();
+    var cu1 = m.next();
+    assertEquals("file", new Text(cu1.getColumnFamily()).toString());
 
-    assertEquals(add, capturedAdd.getValue());
-    assertEquals(delete, capturedDelete.getValue());
+    StoredTabletFile oldFileEntry = StoredTabletFile.of(new Path(fileName));
+    StoredTabletFile updateEnry = StoredTabletFile.of(new 
String(cu1.getColumnQualifier(), UTF_8));
 
-    verify(bw);
-  }
+    assertEquals(oldFileEntry, updateEnry);
+    assertFalse(cu1.isDeleted());
 
-  @Test
-  void upgradeDataFileCFSkipConvertedTest() {
-    Upgrader11to12 upgrader = new Upgrader11to12();
+    // check old file entry is deleted is in the mutation
 
-    BatchWriter bw = createMock(BatchWriter.class);
+    var cu2 = m.next();
+    assertEquals("file", new Text(cu1.getColumnFamily()).toString());
+    assertEquals(fileName, new String(cu2.getColumnQualifier(), UTF_8));
+    assertTrue(cu2.isDeleted());
 
-    replay(bw);
+  }
 
-    String fileName = 
"hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf";
-    StoredTabletFile stf = StoredTabletFile.of(new Path(fileName));
+  @Test
+  public void processReferencesTest() throws Exception {
+    BatchWriter batchWriter = mock(BatchWriter.class);
+    Capture<Mutation> capturedUpdate1 = newCapture();
+    batchWriter.addMutation(capture(capturedUpdate1));
+    expectLastCall().once();
+
+    Capture<Mutation> capturedUpdate2 = newCapture();
+    batchWriter.addMutation(capture(capturedUpdate2));
+    expectLastCall().once();
+
+    // create sample data "served" by the mocked scanner
+    TreeMap<Key,Value> scanData = new TreeMap<>();
+    Text row1 = new Text("123");
+
+    String fileName1 = 
"hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf";
+    Key key1 =
+        
Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName1).build();
+    Value value1 = new Value("123,456");
+    scanData.put(key1, value1);
+
+    String fileName2 = 
"hdfs://localhost:8020/accumulo/tables/12/default_tablet/B000000v.rf";
+    Key key2 =
+        
Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName2).build();
+    Value value2 = new Value("321,654");
+    scanData.put(key2, value2);
+
+    Key chop1 = Key.builder(false).row(row1).family(ChoppedColumnFamily.NAME)
+        .qualifier(ChoppedColumnFamily.NAME).build();
+    scanData.put(chop1, null);
+
+    Key extern1 = 
Key.builder(false).row(row1).family(ExternalCompactionColumnFamily.NAME)
+        .qualifier(ExternalCompactionColumnFamily.NAME).build();
+    scanData.put(extern1, null);
 
-    Key k = Key.builder().row(new 
Text("12;")).family(DataFileColumnFamily.NAME)
-        .qualifier(stf.getMetadataText()).build();
-    Value v = new Value("1234,5678");
+    Text row2 = new Text("234");
 
-    upgrader.upgradeDataFileCF(k, v, bw, "aTable");
+    String fileName3 = 
"hdfs://localhost:8020/accumulo/tables/13/default_tablet/C000000v.rf";
+    Key key3 =
+        
Key.builder(false).row(row2).family(DataFileColumnFamily.NAME).qualifier(fileName3).build();
+    Value value3 = new Value("1,2");
+    scanData.put(key3, value3);
 
-    // with file entry in correct formation, no mutations are expected.
-    verify(bw);
-  }
+    Scanner scanner = mock(Scanner.class);
+    scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+    expectLastCall();
+
+    
expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once();
+    replay(batchWriter, scanner);
 
-  @Test
-  void upgradeDataFileCFInvalidMutationTest() throws Exception {
     Upgrader11to12 upgrader = new Upgrader11to12();
+    upgrader.processReferences(batchWriter, scanner, "accumulo.metadata");
 
-    BatchWriter bw = createMock(BatchWriter.class);
-    Capture<Mutation> capturedAdd = newCapture();
-    bw.addMutation(capture(capturedAdd));
-    expectLastCall().andThrow(new MutationsRejectedException(null, List.of(), 
Map.of(), List.of(),
-        0, new NullPointerException()));
+    LOG.info("c:{}", capturedUpdate1.getValue().prettyPrint());
+    var u1 = capturedUpdate1.getValue();
+    // 2 file add, 2 file delete. 1 chop delete, 1 ext comp delete
+    assertEquals(6, u1.getUpdates().size());
 
-    replay(bw);
+    LOG.info("c:{}", capturedUpdate2.getValue().prettyPrint());
+    var u2 = capturedUpdate2.getValue();
+    // 1 add, 1 delete
+    assertEquals(2, u2.getUpdates().size());

Review Comment:
   could also stream over the updates, filter the deletes, and assert that the 
count is 1
   ```suggestion
       assertEquals(2, u2.getUpdates().size());
       assertEquals(1, 
u2.getUpdates().stream().filter(cup->cup.isDeleted()).count());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to