keith-turner commented on a change in pull request #2096: URL: https://github.com/apache/accumulo/pull/2096#discussion_r633923100
########## File path: server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java ########## @@ -0,0 +1,889 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.compactor; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; +import java.util.function.Supplier; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; +import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinator; 
+import org.apache.accumulo.core.compaction.thrift.Compactor.Iface; +import org.apache.accumulo.core.compaction.thrift.TCompactionState; +import org.apache.accumulo.core.compaction.thrift.UnknownCompactionIdException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.TabletFile; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.securityImpl.thrift.TCredentials; +import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction; +import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind; +import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; +import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.accumulo.core.trace.thrift.TInfo; +import org.apache.accumulo.core.util.Halt; +import org.apache.accumulo.core.util.HostAndPort; +import org.apache.accumulo.core.util.ServerServices; +import org.apache.accumulo.core.util.ServerServices.Service; +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; +import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.fate.util.UtilWaitThread; +import org.apache.accumulo.fate.zookeeper.ServiceLock; +import 
org.apache.accumulo.fate.zookeeper.ServiceLock.LockLossReason; +import org.apache.accumulo.fate.zookeeper.ServiceLock.LockWatcher; +import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.server.AbstractServer; +import org.apache.accumulo.server.GarbageCollectionLogger; +import org.apache.accumulo.server.ServerOpts; +import org.apache.accumulo.server.compaction.CompactionInfo; +import org.apache.accumulo.server.compaction.RetryableThriftCall; +import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException; +import org.apache.accumulo.server.compaction.RetryableThriftFunction; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.rpc.ServerAddress; +import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper; +import org.apache.accumulo.server.rpc.TServerUtils; +import org.apache.accumulo.server.rpc.ThriftServerType; +import org.apache.accumulo.server.security.AuditedSecurityOperation; +import org.apache.accumulo.server.security.SecurityOperation; +import org.apache.hadoop.fs.Path; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.Parameter; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +public class Compactor extends AbstractServer + implements org.apache.accumulo.core.compaction.thrift.Compactor.Iface { + + public static class CompactorServerOpts extends ServerOpts { + @Parameter(required = true, names = {"-q", "--queue"}, description = "compaction queue name") + private String queueName = null; + + public String getQueueName() { + return queueName; + } + } + + private static final Logger LOG = LoggerFactory.getLogger(Compactor.class); + private static final long TIME_BETWEEN_GC_CHECKS = 5000; + private 
static final long TIME_BETWEEN_CANCEL_CHECKS = 5 * 60 * 1000; + + private static final long TEN_MEGABYTES = 10485760; + private static final CompactionCoordinator.Client.Factory COORDINATOR_CLIENT_FACTORY = + new CompactionCoordinator.Client.Factory(); + + protected static final CompactionJobHolder JOB_HOLDER = new CompactionJobHolder(); + + private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger(); + private final UUID compactorId = UUID.randomUUID(); + private final AccumuloConfiguration aconf; + private final String queueName; + private final AtomicReference<CompactionCoordinator.Client> coordinatorClient = + new AtomicReference<>(); + protected final AtomicReference<ExternalCompactionId> currentCompactionId = + new AtomicReference<>(); + + private SecurityOperation security; + private ServiceLock compactorLock; + private ServerAddress compactorAddress = null; + + // Exposed for tests + protected volatile Boolean shutdown = false; + + protected Compactor(CompactorServerOpts opts, String[] args) { + super("compactor", opts, args); + queueName = opts.getQueueName(); + aconf = getConfiguration(); + setupSecurity(); + var schedExecutor = ThreadPools.createGeneralScheduledExecutorService(aconf); + startGCLogger(schedExecutor); + startCancelChecker(schedExecutor, TIME_BETWEEN_CANCEL_CHECKS); + printStartupMsg(); + } + + protected Compactor(CompactorServerOpts opts, String[] args, AccumuloConfiguration conf) { + super("compactor", opts, args); + queueName = opts.getQueueName(); + aconf = conf; + setupSecurity(); + var schedExecutor = ThreadPools.createGeneralScheduledExecutorService(aconf); + startGCLogger(schedExecutor); + startCancelChecker(schedExecutor, TIME_BETWEEN_CANCEL_CHECKS); + printStartupMsg(); + } + + protected void setupSecurity() { + getContext().setupCrypto(); + security = AuditedSecurityOperation.getInstance(getContext()); + } + + protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) { + 
schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0, + TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS); + } + + protected void startCancelChecker(ScheduledThreadPoolExecutor schedExecutor, + long timeBetweenChecks) { + schedExecutor.scheduleWithFixedDelay(() -> checkIfCanceled(), 0, timeBetweenChecks, + TimeUnit.MILLISECONDS); + } + + protected void checkIfCanceled() { + TExternalCompactionJob job = JOB_HOLDER.getJob(); + if (job != null) { + try { + var extent = KeyExtent.fromThrift(job.getExtent()); + var ecid = ExternalCompactionId.of(job.getExternalCompactionId()); + + TabletMetadata tabletMeta = + getContext().getAmple().readTablet(extent, ColumnType.ECOMP, ColumnType.PREV_ROW); + if (tabletMeta == null || !tabletMeta.getExtent().equals(extent) + || !tabletMeta.getExternalCompactions().containsKey(ecid)) { + // table was deleted OR tablet was split or merged OR tablet no longer thinks compaction + // is running for some reason + LOG.info("Cancelling compaction {} that no longer has a metadata entry at {}", ecid, Review comment: @ctubbsii I am currently working through a complete review of the code. You had mentioned having a very generic per-tserver plugin to generally handle external compactions. Thinking about that when looking at this code, it cancels a running external compaction for a tablet that was deleted (split/merge/table deletion). To handle this case in a highly pluggable environment, this functionality would need to be present in some form in the SPI. This would be unrelated to any pluggable component on a tserver; it's more related to running external compactions. The tablet for a running external compaction could be unloaded and then the tablet deleted, and no tserver would be aware of this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: users@infra.apache.org
