This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 5b1c0b5  Fix #1628 add test where selector throws an error (#2098)
5b1c0b5 is described below

commit 5b1c0b54e33ee108a45b2126bd194472703822f2
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Wed May 12 15:05:16 2021 -0400

    Fix #1628 add test where selector throws an error (#2098)
    
    
    Co-authored-by: Christopher Tubbs <ctubb...@apache.org>
---
 .../accumulo/test/functional/CompactionIT.java     | 91 ++++++++++++++++++++++
 .../test/functional/FunctionalTestUtils.java       | 15 ++++
 2 files changed, 106 insertions(+)

diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index e01fc39..a1b221d 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -18,9 +18,14 @@
  */
 package org.apache.accumulo.test.functional;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -28,8 +33,16 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.PluginConfig;
+import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
+import org.apache.accumulo.core.client.admin.compaction.CompactionSelector;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
@@ -48,9 +61,43 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 
 @SuppressWarnings("removal")
 public class CompactionIT extends AccumuloClusterHarness {
+
+  public static class RandomErrorThrowingSelector implements 
CompactionSelector {
+
+    public static final String FILE_LIST_PARAM = "filesToCompact";
+    private static Boolean ERROR_THROWN = Boolean.FALSE;
+
+    private List<String> filesToCompact;
+
+    @Override
+    public void init(InitParamaters iparams) {
+      String files = iparams.getOptions().get(FILE_LIST_PARAM);
+      Objects.requireNonNull(files);
+      String[] f = files.split(",");
+      filesToCompact = Lists.newArrayList(f);
+    }
+
+    @Override
+    public Selection select(SelectionParameters sparams) {
+      if (!ERROR_THROWN) {
+        ERROR_THROWN = Boolean.TRUE;
+        throw new RuntimeException("Exception for test");
+      }
+      List<CompactableFile> matches = new ArrayList<>();
+      sparams.getAvailableFiles().forEach(cf -> {
+        if (filesToCompact.contains(cf.getFileName())) {
+          matches.add(cf);
+        }
+      });
+      return new Selection(matches);
+    }
+
+  }
+
   private static final Logger log = 
LoggerFactory.getLogger(CompactionIT.class);
 
   @Override
@@ -69,6 +116,50 @@ public class CompactionIT extends AccumuloClusterHarness {
   }
 
   @Test
+  public void testBadSelector() throws Exception {
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+      final String tableName = getUniqueNames(1)[0];
+      NewTableConfiguration tc = new NewTableConfiguration();
+      // Ensure compactions don't kick off
+      tc.setProperties(Map.of(Property.TABLE_MAJC_RATIO.getKey(), "10.0"));
+      c.tableOperations().create(tableName, tc);
+      // Create multiple RFiles
+      try (BatchWriter bw = c.createBatchWriter(tableName)) {
+        for (int i = 1; i <= 4; i++) {
+          Mutation m = new Mutation(Integer.toString(i));
+          m.put("cf", "cq", new Value());
+          bw.addMutation(m);
+          bw.flush();
+          c.tableOperations().flush(tableName, null, null, true);
+        }
+      }
+
+      List<String> files = FunctionalTestUtils.getRFilePaths(c, tableName);
+      assertEquals(4, files.size());
+
+      String subset = files.get(0).substring(files.get(0).lastIndexOf('/') + 
1) + ","
+          + files.get(3).substring(files.get(3).lastIndexOf('/') + 1);
+
+      CompactionConfig config = new CompactionConfig()
+          .setSelector(new 
PluginConfig(RandomErrorThrowingSelector.class.getName(),
+              Map.of(RandomErrorThrowingSelector.FILE_LIST_PARAM, subset)))
+          .setWait(true);
+      c.tableOperations().compact(tableName, config);
+
+      // check that the subset of files selected are compacted, but the others 
remain untouched
+      List<String> filesAfterCompact = FunctionalTestUtils.getRFilePaths(c, 
tableName);
+      assertFalse(filesAfterCompact.contains(files.get(0)));
+      assertTrue(filesAfterCompact.contains(files.get(1)));
+      assertTrue(filesAfterCompact.contains(files.get(2)));
+      assertFalse(filesAfterCompact.contains(files.get(3)));
+
+      List<String> rows = new ArrayList<>();
+      c.createScanner(tableName).forEach((k, v) -> 
rows.add(k.getRow().toString()));
+      assertEquals(List.of("1", "2", "3", "4"), rows);
+    }
+  }
+
+  @Test
   public void test() throws Exception {
     try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
       final String tableName = getUniqueNames(1)[0];
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
index 735c966..6113c01 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
@@ -30,8 +30,10 @@ import java.net.http.HttpClient;
 import java.net.http.HttpRequest;
 import java.net.http.HttpResponse;
 import java.net.http.HttpResponse.BodyHandlers;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
@@ -81,6 +83,19 @@ public class FunctionalTestUtils {
     }
   }
 
+  public static List<String> getRFilePaths(AccumuloClient c, String tableName) 
throws Exception {
+    List<String> files = new ArrayList<>();
+    try (Scanner scanner = c.createScanner(MetadataTable.NAME, 
Authorizations.EMPTY)) {
+      TableId tableId = 
TableId.of(c.tableOperations().tableIdMap().get(tableName));
+      scanner.setRange(TabletsSection.getRange(tableId));
+      scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+      scanner.forEach(entry -> {
+        files.add(entry.getKey().getColumnQualifier().toString());
+      });
+    }
+    return files;
+  }
+
   static void checkRFiles(AccumuloClient c, String tableName, int minTablets, 
int maxTablets,
       int minRFiles, int maxRFiles) throws Exception {
     try (Scanner scanner = c.createScanner(MetadataTable.NAME, 
Authorizations.EMPTY)) {

Reply via email to