This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to 
refs/heads/1451-external-compactions-feature by this push:
     new 70e106f  Added test for deleting a table during a system compaction
70e106f is described below

commit 70e106f794c3fbd338bf7735dc7cab39d926bfc7
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Wed Apr 21 16:30:30 2021 +0000

    Added test for deleting a table during a system compaction
---
 .../org/apache/accumulo/compactor/Compactor.java   | 26 ++++---
 .../apache/accumulo/test/ExternalCompactionIT.java | 42 ++++++++++-
 .../accumulo/test/ExternalDoNothingCompactor.java  | 83 ++++++++++++++++++++++
 3 files changed, 140 insertions(+), 11 deletions(-)

diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 10d1cfa..528810f 100644
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -102,7 +102,7 @@ import com.beust.jcommander.Parameter;
 public class Compactor extends AbstractServer
     implements org.apache.accumulo.core.compaction.thrift.Compactor.Iface {
 
-  static class CompactorServerOpts extends ServerOpts {
+  public static class CompactorServerOpts extends ServerOpts {
     @Parameter(required = true, names = {"-q", "--queue"}, description = 
"compaction queue name")
     private String queueName = null;
 
@@ -113,11 +113,12 @@ public class Compactor extends AbstractServer
 
   private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
   private static final long TIME_BETWEEN_GC_CHECKS = 5000;
-  private static final CompactionJobHolder JOB_HOLDER = new 
CompactionJobHolder();
   private static final long TEN_MEGABYTES = 10485760;
   private static final CompactionCoordinator.Client.Factory 
COORDINATOR_CLIENT_FACTORY =
       new CompactionCoordinator.Client.Factory();
 
+  protected static final CompactionJobHolder JOB_HOLDER = new 
CompactionJobHolder();
+
   private final GarbageCollectionLogger gcLogger = new 
GarbageCollectionLogger();
   private final UUID compactorId = UUID.randomUUID();
   private final AccumuloConfiguration aconf;
@@ -134,7 +135,7 @@ public class Compactor extends AbstractServer
   // Exposed for tests
   protected volatile Boolean shutdown = false;
 
-  Compactor(CompactorServerOpts opts, String[] args) {
+  protected Compactor(CompactorServerOpts opts, String[] args) {
     super("compactor", opts, args);
     queueName = opts.getQueueName();
     aconf = getConfiguration();
@@ -599,8 +600,8 @@ public class Compactor extends AbstractServer
         final CountDownLatch stopped = new CountDownLatch(1);
 
         final Thread compactionThread = Threads.createThread(
-            "Compaction job for tablet " + job.getExtent().toString(), 
this.createCompactionJob(job,
-                totalInputEntries, totalInputBytes, started, stopped, err));
+            "Compaction job for tablet " + job.getExtent().toString(),
+            createCompactionJob(job, totalInputEntries, totalInputBytes, 
started, stopped, err));
 
         synchronized (JOB_HOLDER) {
           JOB_HOLDER.set(job, compactionThread);
@@ -608,7 +609,8 @@ public class Compactor extends AbstractServer
 
         final String tableId = new String(job.getExtent().getTable(), UTF_8);
         final ServerContext ctxRef = getContext();
-        String tablePath = getContext().getZooKeeperRoot() + Constants.ZTABLES 
+ "/" + tableId;
+        final String tablePath =
+            getContext().getZooKeeperRoot() + Constants.ZTABLES + "/" + 
tableId;
         Watcher tableNodeWatcher = new Watcher() {
           @Override
           public void process(WatchedEvent event) {
@@ -682,12 +684,14 @@ public class Compactor extends AbstractServer
                       e.getMessage());
                 }
               }
+            } else {
+              LOG.warn("Waiting on compaction thread to finish, but no RUNNING 
compaction");
             }
           }
           compactionThread.join();
           LOG.info("Compaction thread finished.");
 
-          if (compactionThread.isInterrupted()
+          if (compactionThread.isInterrupted() || JOB_HOLDER.isCancelled()
               || ((err.get() != null && 
err.get().getClass().equals(InterruptedException.class)))) {
             LOG.warn("Compaction thread was interrupted, sending CANCELLED 
state");
             try {
@@ -736,8 +740,12 @@ public class Compactor extends AbstractServer
             LOG.error("Error cancelling compaction.", e2);
           }
         } finally {
-          
getContext().getZooReaderWriter().getZooKeeper().removeWatches(tablePath,
-              tableNodeWatcher, WatcherType.Any, true);
+          try {
+            
getContext().getZooReaderWriter().getZooKeeper().removeWatches(tablePath,
+                tableNodeWatcher, WatcherType.Any, true);
+          } catch (KeeperException e) {
+            LOG.error("Error removing watch from {}.", tablePath, e);
+          }
           currentCompactionId.set(null);
         }
 
diff --git 
a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java 
b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
index bffd526..32566b1 100644
--- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
@@ -44,6 +44,7 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
@@ -192,13 +193,51 @@ public class ExternalCompactionIT extends 
ConfigurableMacBase {
     }
   }
 
+  @Test
+  public void testDeleteTableDuringExternalCompaction() throws Exception {
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProperties()).build()) {
+
+      String table1 = "ectt5";
+      createTable(client, table1, "cs1");
+      // set compaction ratio to 1 so that majc occurs naturally, not user 
compaction
+      client.tableOperations().setProperty(table1, 
Property.TABLE_MAJC_RATIO.toString(), "1.0");
+      // cause multiple rfiles to be created
+      writeData(client, table1);
+      writeData(client, table1);
+      writeData(client, table1);
+      writeData(client, table1);
+
+      // The ExternalDoNothingCompactor creates a compaction thread that
+      // sleeps until cancelled. The compaction should occur naturally.
+      // Wait for the coordinator to insert the running compaction metadata
+      // entry into the metadata table, then delete the table.
+      cluster.exec(ExternalDoNothingCompactor.class, "-q", "DCQ1");
+      cluster.exec(CompactionCoordinator.class);
+
+      List<TabletMetadata> md = new ArrayList<>();
+      TabletsMetadata tm = 
getCluster().getServerContext().getAmple().readTablets()
+          .forLevel(DataLevel.USER).fetch(ColumnType.ECOMP).build();
+      tm.forEach(t -> md.add(t));
+
+      while (md.size() == 0) {
+        tm.close();
+        md.clear();
+        tm = 
getCluster().getServerContext().getAmple().readTablets().forLevel(DataLevel.USER)
+            .fetch(ColumnType.ECOMP).build();
+        tm.forEach(t -> md.add(t));
+      }
+      client.tableOperations().delete(table1);
+      // CBUG: How to verify? Metadata tablets are gone...
+      UtilWaitThread.sleep(1000); // to see the logs
+    }
+  }
+
   // CBUG add test that configures output file for external compaction
 
   // CBUG add test that verifies iterators configured on table (not on user 
compaction) are used in
   // external compaction
 
   @Test
-  // @Ignore // waiting for solution to issue #2019
   public void testExternalCompactionDeadTServer() throws Exception {
     // Shut down the normal TServers
     getCluster().getProcesses().get(TABLET_SERVER).forEach(p -> {
@@ -230,7 +269,6 @@ public class ExternalCompactionIT extends 
ConfigurableMacBase {
         fs = 
getCluster().getServerContext().getAmple().getExternalCompactionFinalStates();
       }
 
-      // We need to wait until the metadata entries show up
       LOG.info("Validating metadata table contents.");
       TabletsMetadata tm = 
getCluster().getServerContext().getAmple().readTablets()
           .forLevel(DataLevel.USER).fetch(ColumnType.ECOMP).build();
diff --git 
a/test/src/main/java/org/apache/accumulo/test/ExternalDoNothingCompactor.java 
b/test/src/main/java/org/apache/accumulo/test/ExternalDoNothingCompactor.java
new file mode 100644
index 0000000..49f6acb
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/ExternalDoNothingCompactor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.accumulo.compactor.Compactor;
+import org.apache.accumulo.core.compaction.thrift.CompactionState;
+import org.apache.accumulo.core.compaction.thrift.Compactor.Iface;
+import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import 
org.apache.accumulo.server.compaction.Compactor.CompactionCanceledException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExternalDoNothingCompactor extends Compactor implements Iface {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExternalDoNothingCompactor.class);
+
+  ExternalDoNothingCompactor(CompactorServerOpts opts, String[] args) {
+    super(opts, args);
+  }
+
+  @Override
+  protected Runnable createCompactionJob(TExternalCompactionJob job, LongAdder 
totalInputEntries,
+      LongAdder totalInputBytes, CountDownLatch started, CountDownLatch 
stopped,
+      AtomicReference<Throwable> err) {
+
+    return new Runnable() {
+      @Override
+      public void run() {
+        try {
+          LOG.info("Starting up compaction runnable for job: {}", job);
+          updateCompactionState(job, CompactionState.STARTED, "Compaction 
started");
+
+          LOG.info("Starting compactor");
+          started.countDown();
+
+          while (!JOB_HOLDER.isCancelled()) {
+            LOG.info("Sleeping while job is not cancelled");
+            UtilWaitThread.sleep(1000);
+          }
+          // Compactor throws this exception when cancelled
+          throw new CompactionCanceledException();
+
+        } catch (Exception e) {
+          LOG.error("Compaction failed", e);
+          err.set(e);
+          throw new RuntimeException("Compaction failed", e);
+        } finally {
+          stopped.countDown();
+        }
+      }
+    };
+
+  }
+
+  public static void main(String[] args) throws Exception {
+    try (ExternalDoNothingCompactor compactor =
+        new ExternalDoNothingCompactor(new CompactorServerOpts(), args)) {
+      compactor.runServer();
+    }
+  }
+
+}

Reply via email to