Manno15 commented on a change in pull request #2275:
URL: https://github.com/apache/accumulo/pull/2275#discussion_r712393544



##########
File path: 
test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
##########
@@ -160,7 +212,231 @@ public void testBadSelector() throws Exception {
   }
 
   @Test
-  public void test() throws Exception {
+  public void testCompactionWithTableIterator() throws Exception {
+    String table1 = this.getUniqueNames(1)[0];
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(table1);
+      try (BatchWriter bw = client.createBatchWriter(table1)) {
+        for (int i = 1; i <= 4; i++) {
+          Mutation m = new Mutation(Integer.toString(i));
+          m.put("cf", "cq", new Value());
+          bw.addMutation(m);
+          bw.flush();
+          client.tableOperations().flush(table1, null, null, true);
+        }
+      }
+
+      IteratorSetting setting = new IteratorSetting(50, "delete", 
DevNull.class);
+      client.tableOperations().attachIterator(table1, setting, 
EnumSet.of(IteratorScope.majc));
+      client.tableOperations().compact(table1, new 
CompactionConfig().setWait(true));
+
+      try (Scanner s = client.createScanner(table1)) {
+        assertFalse(s.iterator().hasNext());
+      }
+    }
+  }
+
+  @Test
+  public void testUserCompactionCancellation() throws Exception {
+    final String table1 = this.getUniqueNames(1)[0];
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(table1);
+      try (BatchWriter bw = client.createBatchWriter(table1)) {
+        for (int i = 1; i <= MAX_DATA; i++) {
+          Mutation m = new Mutation(Integer.toString(i));
+          m.put("cf", "cq", new Value());
+          bw.addMutation(m);
+          bw.flush();
+          client.tableOperations().flush(table1, null, null, true);
+        }
+      }
+
+      final AtomicReference<Exception> error = new AtomicReference<>();
+      final AtomicBoolean started = new AtomicBoolean(false);
+      Thread t = new Thread(() -> {
+        try {
+          started.set(true);
+          IteratorSetting setting = new IteratorSetting(50, "sleepy", 
SlowIterator.class);
+          setting.addOption("sleepTime", "3000");
+          setting.addOption("seekSleepTime", "3000");
+          client.tableOperations().attachIterator(table1, setting, 
EnumSet.of(IteratorScope.majc));
+          client.tableOperations().compact(table1, new 
CompactionConfig().setWait(true));
+        } catch (AccumuloSecurityException | TableNotFoundException | 
AccumuloException e) {
+          error.set(e);
+        }
+      });
+      t.start();
+      while (!started.get()) {
+        Thread.sleep(1000);
+      }
+      client.tableOperations().cancelCompaction(table1);
+      t.join();
+      Exception e = error.get();
+      assertNotNull(e);
+      assertEquals("Compaction canceled", e.getMessage());
+

Review comment:
       ```suggestion
   ```

##########
File path: 
test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
##########
@@ -160,7 +212,231 @@ public void testBadSelector() throws Exception {
   }
 
   @Test
-  public void test() throws Exception {
+  public void testCompactionWithTableIterator() throws Exception {
+    String table1 = this.getUniqueNames(1)[0];
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(table1);
+      try (BatchWriter bw = client.createBatchWriter(table1)) {
+        for (int i = 1; i <= 4; i++) {
+          Mutation m = new Mutation(Integer.toString(i));
+          m.put("cf", "cq", new Value());
+          bw.addMutation(m);
+          bw.flush();
+          client.tableOperations().flush(table1, null, null, true);
+        }
+      }
+
+      IteratorSetting setting = new IteratorSetting(50, "delete", 
DevNull.class);
+      client.tableOperations().attachIterator(table1, setting, 
EnumSet.of(IteratorScope.majc));
+      client.tableOperations().compact(table1, new 
CompactionConfig().setWait(true));
+
+      try (Scanner s = client.createScanner(table1)) {
+        assertFalse(s.iterator().hasNext());
+      }
+    }
+  }
+
+  @Test
+  public void testUserCompactionCancellation() throws Exception {
+    final String table1 = this.getUniqueNames(1)[0];
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(table1);
+      try (BatchWriter bw = client.createBatchWriter(table1)) {
+        for (int i = 1; i <= MAX_DATA; i++) {
+          Mutation m = new Mutation(Integer.toString(i));
+          m.put("cf", "cq", new Value());
+          bw.addMutation(m);
+          bw.flush();
+          client.tableOperations().flush(table1, null, null, true);
+        }
+      }
+
+      final AtomicReference<Exception> error = new AtomicReference<>();
+      final AtomicBoolean started = new AtomicBoolean(false);
+      Thread t = new Thread(() -> {
+        try {
+          started.set(true);
+          IteratorSetting setting = new IteratorSetting(50, "sleepy", 
SlowIterator.class);
+          setting.addOption("sleepTime", "3000");
+          setting.addOption("seekSleepTime", "3000");
+          client.tableOperations().attachIterator(table1, setting, 
EnumSet.of(IteratorScope.majc));
+          client.tableOperations().compact(table1, new 
CompactionConfig().setWait(true));
+        } catch (AccumuloSecurityException | TableNotFoundException | 
AccumuloException e) {
+          error.set(e);
+        }
+      });
+      t.start();
+      while (!started.get()) {
+        Thread.sleep(1000);
+      }
+      client.tableOperations().cancelCompaction(table1);
+      t.join();
+      Exception e = error.get();
+      assertNotNull(e);
+      assertEquals("Compaction canceled", e.getMessage());
+
+    }
+  }
+
+  @Test
+  public void testTableDeletedDuringUserCompaction() throws Exception {
+    final String table1 = this.getUniqueNames(1)[0];
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(table1);
+      try (BatchWriter bw = client.createBatchWriter(table1)) {
+        for (int i = 1; i <= MAX_DATA; i++) {
+          Mutation m = new Mutation(Integer.toString(i));
+          m.put("cf", "cq", new Value());
+          bw.addMutation(m);
+          bw.flush();
+          client.tableOperations().flush(table1, null, null, true);
+        }
+      }
+
+      final AtomicReference<Exception> error = new AtomicReference<>();
+      final AtomicBoolean started = new AtomicBoolean(false);
+      Thread t = new Thread(() -> {
+        try {
+          started.set(true);
+          IteratorSetting setting = new IteratorSetting(50, "sleepy", 
SlowIterator.class);
+          setting.addOption("sleepTime", "3000");
+          setting.addOption("seekSleepTime", "3000");
+          client.tableOperations().attachIterator(table1, setting, 
EnumSet.of(IteratorScope.majc));
+          client.tableOperations().compact(table1, new 
CompactionConfig().setWait(true));
+        } catch (AccumuloSecurityException | TableNotFoundException | 
AccumuloException e) {
+          error.set(e);
+        }
+      });
+      t.start();
+      while (!started.get()) {
+        Thread.sleep(1000);
+      }
+      client.tableOperations().delete(table1);
+      t.join();
+      Exception e = error.get();
+      assertNotNull(e);
+      assertEquals("Compaction canceled", e.getMessage());
+

Review comment:
       ```suggestion
   ```

##########
File path: 
test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
##########
@@ -160,7 +212,231 @@ public void testBadSelector() throws Exception {
   }
 
   @Test
-  public void test() throws Exception {
+  public void testCompactionWithTableIterator() throws Exception {
+    String table1 = this.getUniqueNames(1)[0];
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(table1);
+      try (BatchWriter bw = client.createBatchWriter(table1)) {
+        for (int i = 1; i <= 4; i++) {
+          Mutation m = new Mutation(Integer.toString(i));
+          m.put("cf", "cq", new Value());
+          bw.addMutation(m);
+          bw.flush();
+          client.tableOperations().flush(table1, null, null, true);
+        }
+      }
+
+      IteratorSetting setting = new IteratorSetting(50, "delete", 
DevNull.class);
+      client.tableOperations().attachIterator(table1, setting, 
EnumSet.of(IteratorScope.majc));
+      client.tableOperations().compact(table1, new 
CompactionConfig().setWait(true));
+
+      try (Scanner s = client.createScanner(table1)) {
+        assertFalse(s.iterator().hasNext());
+      }
+    }
+  }
+
+  @Test
+  public void testUserCompactionCancellation() throws Exception {
+    final String table1 = this.getUniqueNames(1)[0];
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(table1);
+      try (BatchWriter bw = client.createBatchWriter(table1)) {
+        for (int i = 1; i <= MAX_DATA; i++) {
+          Mutation m = new Mutation(Integer.toString(i));
+          m.put("cf", "cq", new Value());
+          bw.addMutation(m);
+          bw.flush();
+          client.tableOperations().flush(table1, null, null, true);
+        }
+      }
+
+      final AtomicReference<Exception> error = new AtomicReference<>();
+      final AtomicBoolean started = new AtomicBoolean(false);
+      Thread t = new Thread(() -> {
+        try {
+          started.set(true);
+          IteratorSetting setting = new IteratorSetting(50, "sleepy", 
SlowIterator.class);
+          setting.addOption("sleepTime", "3000");
+          setting.addOption("seekSleepTime", "3000");
+          client.tableOperations().attachIterator(table1, setting, 
EnumSet.of(IteratorScope.majc));
+          client.tableOperations().compact(table1, new 
CompactionConfig().setWait(true));
+        } catch (AccumuloSecurityException | TableNotFoundException | 
AccumuloException e) {
+          error.set(e);
+        }
+      });
+      t.start();
+      while (!started.get()) {
+        Thread.sleep(1000);
+      }
+      client.tableOperations().cancelCompaction(table1);
+      t.join();
+      Exception e = error.get();
+      assertNotNull(e);
+      assertEquals("Compaction canceled", e.getMessage());
+
+    }
+  }
+
+  @Test
+  public void testTableDeletedDuringUserCompaction() throws Exception {
+    final String table1 = this.getUniqueNames(1)[0];
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(table1);
+      try (BatchWriter bw = client.createBatchWriter(table1)) {
+        for (int i = 1; i <= MAX_DATA; i++) {
+          Mutation m = new Mutation(Integer.toString(i));
+          m.put("cf", "cq", new Value());
+          bw.addMutation(m);
+          bw.flush();
+          client.tableOperations().flush(table1, null, null, true);
+        }
+      }
+
+      final AtomicReference<Exception> error = new AtomicReference<>();
+      final AtomicBoolean started = new AtomicBoolean(false);
+      Thread t = new Thread(() -> {
+        try {
+          started.set(true);
+          IteratorSetting setting = new IteratorSetting(50, "sleepy", 
SlowIterator.class);
+          setting.addOption("sleepTime", "3000");
+          setting.addOption("seekSleepTime", "3000");
+          client.tableOperations().attachIterator(table1, setting, 
EnumSet.of(IteratorScope.majc));
+          client.tableOperations().compact(table1, new 
CompactionConfig().setWait(true));
+        } catch (AccumuloSecurityException | TableNotFoundException | 
AccumuloException e) {
+          error.set(e);
+        }
+      });
+      t.start();
+      while (!started.get()) {
+        Thread.sleep(1000);
+      }
+      client.tableOperations().delete(table1);
+      t.join();
+      Exception e = error.get();
+      assertNotNull(e);
+      assertEquals("Compaction canceled", e.getMessage());
+
+    }
+  }
+
+  @Test
+  public void testPartialCompaction() throws Exception {
+    String tableName = getUniqueNames(1)[0];
+    try (final AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+
+      client.tableOperations().create(tableName);
+
+      // Insert MAX_DATA rows
+      try (BatchWriter bw = client.createBatchWriter(tableName)) {
+        for (int i = 0; i < MAX_DATA; i++) {
+          Mutation m = new Mutation(String.format("r:%04d", i));
+          m.put("", "", "" + i);
+          bw.addMutation(m);
+        }
+      }
+      client.tableOperations().flush(tableName);
+      IteratorSetting iterSetting = new IteratorSetting(100, TestFilter.class);
+      // make sure iterator options make it to compactor process
+      iterSetting.addOption("modulus", 17 + "");
+      CompactionConfig config =
+          new 
CompactionConfig().setIterators(List.of(iterSetting)).setWait(true);
+      client.tableOperations().compact(tableName, config);
+
+      // Insert 2 * MAX_DATA rows
+      try (BatchWriter bw = client.createBatchWriter(tableName)) {
+        for (int i = MAX_DATA; i < MAX_DATA * 2; i++) {
+          Mutation m = new Mutation(String.format("r:%04d", i));
+          m.put("", "", "" + i);
+          bw.addMutation(m);
+        }
+      }
+      // this should create an F file
+      client.tableOperations().flush(tableName);
+
+      // run a compaction that only compacts F files
+      iterSetting = new IteratorSetting(100, TestFilter.class);
+      // compact F file w/ different modulus and user pmodulus option for 
partial compaction
+      iterSetting.addOption("pmodulus", 19 + "");
+      config = new 
CompactionConfig().setIterators(List.of(iterSetting)).setWait(true)
+          .setSelector(new PluginConfig(FSelector.class.getName()));
+      client.tableOperations().compact(tableName, config);
+
+      try (Scanner scanner = client.createScanner(tableName)) {
+        int count = 0;
+        for (Entry<Key,Value> entry : scanner) {
+
+          int v = Integer.parseInt(entry.getValue().toString());
+          int modulus = v < MAX_DATA ? 17 : 19;
+
+          assertTrue(String.format("%s %s %d != 0", entry.getValue(), "%", 
modulus),
+              Integer.parseInt(entry.getValue().toString()) % modulus == 0);

Review comment:
       This doesn't matter too much, but this assertion could be converted to
       an `assertEquals`, e.g. `assertEquals(0, v % modulus)`, reusing the
       already-parsed `v`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to