This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new 70e106f Added test for deleting a table during a system compaction 70e106f is described below commit 70e106f794c3fbd338bf7735dc7cab39d926bfc7 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Wed Apr 21 16:30:30 2021 +0000 Added test for deleting a table during a system compaction --- .../org/apache/accumulo/compactor/Compactor.java | 26 ++++--- .../apache/accumulo/test/ExternalCompactionIT.java | 42 ++++++++++- .../accumulo/test/ExternalDoNothingCompactor.java | 83 ++++++++++++++++++++++ 3 files changed, 140 insertions(+), 11 deletions(-) diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 10d1cfa..528810f 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -102,7 +102,7 @@ import com.beust.jcommander.Parameter; public class Compactor extends AbstractServer implements org.apache.accumulo.core.compaction.thrift.Compactor.Iface { - static class CompactorServerOpts extends ServerOpts { + public static class CompactorServerOpts extends ServerOpts { @Parameter(required = true, names = {"-q", "--queue"}, description = "compaction queue name") private String queueName = null; @@ -113,11 +113,12 @@ public class Compactor extends AbstractServer private static final Logger LOG = LoggerFactory.getLogger(Compactor.class); private static final long TIME_BETWEEN_GC_CHECKS = 5000; - private static final CompactionJobHolder JOB_HOLDER = new CompactionJobHolder(); private static final long TEN_MEGABYTES = 10485760; private static final CompactionCoordinator.Client.Factory COORDINATOR_CLIENT_FACTORY = new CompactionCoordinator.Client.Factory(); + protected static final CompactionJobHolder JOB_HOLDER = new CompactionJobHolder(); + private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger(); private final UUID compactorId = UUID.randomUUID(); private final AccumuloConfiguration aconf; @@ -134,7 +135,7 @@ public class Compactor extends AbstractServer // Exposed for tests protected volatile Boolean shutdown = false; - Compactor(CompactorServerOpts opts, String[] args) { + protected Compactor(CompactorServerOpts opts, String[] args) { super("compactor", opts, args); queueName = opts.getQueueName(); aconf = getConfiguration(); @@ -599,8 +600,8 @@ public class Compactor extends AbstractServer final CountDownLatch stopped = new CountDownLatch(1); final Thread compactionThread = Threads.createThread( - "Compaction job for tablet " + job.getExtent().toString(), this.createCompactionJob(job, - totalInputEntries, totalInputBytes, started, stopped, err)); + "Compaction job for tablet " + job.getExtent().toString(), + createCompactionJob(job, totalInputEntries, totalInputBytes, started, stopped, err)); synchronized (JOB_HOLDER) { JOB_HOLDER.set(job, compactionThread); @@ -608,7 +609,8 @@ public class Compactor extends AbstractServer final String tableId = new String(job.getExtent().getTable(), UTF_8); final ServerContext ctxRef = getContext(); - String tablePath = getContext().getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId; + final String tablePath = + getContext().getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId; Watcher tableNodeWatcher = new Watcher() { @Override public void process(WatchedEvent event) { @@ -682,12 +684,14 @@ public class Compactor extends AbstractServer e.getMessage()); } } + } else { + LOG.warn("Waiting on compaction thread to finish, but no RUNNING compaction"); } } compactionThread.join(); LOG.info("Compaction thread finished."); - if (compactionThread.isInterrupted() + if (compactionThread.isInterrupted() || JOB_HOLDER.isCancelled() || ((err.get() != null && err.get().getClass().equals(InterruptedException.class)))) { LOG.warn("Compaction thread was interrupted, sending CANCELLED state"); try { @@ -736,8 +740,12 @@ public class Compactor extends AbstractServer LOG.error("Error cancelling compaction.", e2); } } finally { - getContext().getZooReaderWriter().getZooKeeper().removeWatches(tablePath, - tableNodeWatcher, WatcherType.Any, true); + try { + getContext().getZooReaderWriter().getZooKeeper().removeWatches(tablePath, + tableNodeWatcher, WatcherType.Any, true); + } catch (KeeperException e) { + LOG.error("Error removing watch from {}.", tablePath, e); + } currentCompactionId.set(null); } diff --git a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java index bffd526..32566b1 100644 --- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java @@ -44,6 +44,7 @@ import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; @@ -192,13 +193,51 @@ public class ExternalCompactionIT extends ConfigurableMacBase { } } + @Test + public void testDeleteTableDuringExternalCompaction() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + + String table1 = "ectt5"; + createTable(client, table1, "cs1"); + // set compaction ratio to 1 so that majc occurs naturally, not user compaction + client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.toString(), "1.0"); + // cause multiple rfiles to be created + writeData(client, table1); + writeData(client, table1); + writeData(client, table1); + writeData(client, table1); + + // The ExternalDoNothingCompactor creates a compaction thread that + // sleeps for 5 minutes. The compaction should occur naturally. + // Wait for the coordinator to insert the running compaction metadata + // entry into the metadata table, then delete the table. + cluster.exec(ExternalDoNothingCompactor.class, "-q", "DCQ1"); + cluster.exec(CompactionCoordinator.class); + + List<TabletMetadata> md = new ArrayList<>(); + TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets() + .forLevel(DataLevel.USER).fetch(ColumnType.ECOMP).build(); + tm.forEach(t -> md.add(t)); + + while (md.size() == 0) { + tm.close(); + md.clear(); + tm = getCluster().getServerContext().getAmple().readTablets().forLevel(DataLevel.USER) + .fetch(ColumnType.ECOMP).build(); + tm.forEach(t -> md.add(t)); + } + client.tableOperations().delete(table1); + // CBUG: How to verify? Metadata tablets are gone... + UtilWaitThread.sleep(1000); // to see the logs + } + } + // CBUG add test that configures output file for external compaction // CBUG add test that verifies iterators configured on table (not on user compaction) are used in // external compaction @Test - // @Ignore // waiting for solution to issue #2019 public void testExternalCompactionDeadTServer() throws Exception { // Shut down the normal TServers getCluster().getProcesses().get(TABLET_SERVER).forEach(p -> { @@ -230,7 +269,6 @@ public class ExternalCompactionIT extends ConfigurableMacBase { fs = getCluster().getServerContext().getAmple().getExternalCompactionFinalStates(); } - // We need to wait until the metadata entries show up LOG.info("Validating metadata table contents."); TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets() .forLevel(DataLevel.USER).fetch(ColumnType.ECOMP).build(); diff --git a/test/src/main/java/org/apache/accumulo/test/ExternalDoNothingCompactor.java b/test/src/main/java/org/apache/accumulo/test/ExternalDoNothingCompactor.java new file mode 100644 index 0000000..49f6acb --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/ExternalDoNothingCompactor.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; + +import org.apache.accumulo.compactor.Compactor; +import org.apache.accumulo.core.compaction.thrift.CompactionState; +import org.apache.accumulo.core.compaction.thrift.Compactor.Iface; +import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; +import org.apache.accumulo.fate.util.UtilWaitThread; +import org.apache.accumulo.server.compaction.Compactor.CompactionCanceledException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ExternalDoNothingCompactor extends Compactor implements Iface { + + private static final Logger LOG = LoggerFactory.getLogger(ExternalDoNothingCompactor.class); + + ExternalDoNothingCompactor(CompactorServerOpts opts, String[] args) { + super(opts, args); + } + + @Override + protected Runnable createCompactionJob(TExternalCompactionJob job, LongAdder totalInputEntries, + LongAdder totalInputBytes, CountDownLatch started, CountDownLatch stopped, + AtomicReference<Throwable> err) { + + return new Runnable() { + @Override + public void run() { + try { + LOG.info("Starting up compaction runnable for job: {}", job); + updateCompactionState(job, CompactionState.STARTED, "Compaction started"); + + LOG.info("Starting compactor"); + started.countDown(); + + while (!JOB_HOLDER.isCancelled()) { + LOG.info("Sleeping while job is not cancelled"); + UtilWaitThread.sleep(1000); + } + // Compactor throws this exception when cancelled + throw new CompactionCanceledException(); + + } catch (Exception e) { + LOG.error("Compaction failed", e); + err.set(e); + throw new RuntimeException("Compaction failed", e); + } finally { + stopped.countDown(); + } + } + }; + + } + + public static void main(String[] args) throws Exception { + try (ExternalDoNothingCompactor compactor = + new ExternalDoNothingCompactor(new CompactorServerOpts(), args)) { + compactor.runServer(); + } + } + +}