keith-turner commented on a change in pull request #2096: URL: https://github.com/apache/accumulo/pull/2096#discussion_r637238906
########## File path: server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java ########## @@ -0,0 +1,898 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.compactor; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; +import java.util.function.Supplier; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; +import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService; +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService.Client; +import org.apache.accumulo.core.compaction.thrift.CompactorService; +import org.apache.accumulo.core.compaction.thrift.CompactorService.Iface; +import org.apache.accumulo.core.compaction.thrift.TCompactionState; +import org.apache.accumulo.core.compaction.thrift.UnknownCompactionIdException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.TabletFile; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.securityImpl.thrift.TCredentials; +import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction; +import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind; +import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; +import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.accumulo.core.trace.thrift.TInfo; +import org.apache.accumulo.core.util.Halt; +import org.apache.accumulo.core.util.HostAndPort; +import org.apache.accumulo.core.util.ServerServices; +import org.apache.accumulo.core.util.ServerServices.Service; +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; +import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.fate.util.UtilWaitThread; +import org.apache.accumulo.fate.zookeeper.ServiceLock; +import org.apache.accumulo.fate.zookeeper.ServiceLock.LockLossReason; +import org.apache.accumulo.fate.zookeeper.ServiceLock.LockWatcher; +import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.server.AbstractServer; +import org.apache.accumulo.server.GarbageCollectionLogger; +import org.apache.accumulo.server.ServerOpts; +import org.apache.accumulo.server.compaction.CompactionInfo; +import org.apache.accumulo.server.compaction.RetryableThriftCall; +import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException; +import org.apache.accumulo.server.compaction.RetryableThriftFunction; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.rpc.ServerAddress; +import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper; +import org.apache.accumulo.server.rpc.TServerUtils; +import org.apache.accumulo.server.rpc.ThriftServerType; +import org.apache.accumulo.server.security.AuditedSecurityOperation; +import org.apache.accumulo.server.security.SecurityOperation; +import org.apache.hadoop.fs.Path; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.Parameter; +import com.google.common.base.Preconditions; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +public class Compactor extends AbstractServer implements CompactorService.Iface { + + public static class CompactorServerOpts extends ServerOpts { + @Parameter(required = true, names = {"-q", "--queue"}, description = "compaction queue name") + private String queueName = null; + + public String getQueueName() { + return queueName; + } + } + + private static final Logger LOG = LoggerFactory.getLogger(Compactor.class); + private static final long TIME_BETWEEN_GC_CHECKS = 5000; + private static final long TIME_BETWEEN_CANCEL_CHECKS = 5 * 60 * 1000; + + private static final long TEN_MEGABYTES = 10485760; + private static final CompactionCoordinatorService.Client.Factory COORDINATOR_CLIENT_FACTORY = + new CompactionCoordinatorService.Client.Factory(); + + protected static final CompactionJobHolder JOB_HOLDER = new CompactionJobHolder(); + + private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger(); + private final UUID compactorId = UUID.randomUUID(); + private final AccumuloConfiguration aconf; + private final String queueName; + protected final AtomicReference<ExternalCompactionId> currentCompactionId = + new AtomicReference<>(); + + private SecurityOperation security; + private ServiceLock compactorLock; + private ServerAddress compactorAddress = null; + + // Exposed for tests + protected volatile Boolean shutdown = false; + + private final AtomicBoolean compactionRunning = new AtomicBoolean(false); + + protected Compactor(CompactorServerOpts opts, String[] args) { + super("compactor", opts, args); + queueName = opts.getQueueName(); + aconf = getConfiguration(); + setupSecurity(); + var schedExecutor = ThreadPools.createGeneralScheduledExecutorService(aconf); + startGCLogger(schedExecutor); + startCancelChecker(schedExecutor, TIME_BETWEEN_CANCEL_CHECKS); + printStartupMsg(); + } + + protected Compactor(CompactorServerOpts opts, String[] args, AccumuloConfiguration conf) { + super("compactor", opts, args); + queueName = opts.getQueueName(); + aconf = conf; + setupSecurity(); + var schedExecutor = ThreadPools.createGeneralScheduledExecutorService(aconf); + startGCLogger(schedExecutor); + startCancelChecker(schedExecutor, TIME_BETWEEN_CANCEL_CHECKS); + printStartupMsg(); + } + + protected void setupSecurity() { + getContext().setupCrypto(); + security = AuditedSecurityOperation.getInstance(getContext()); + } + + protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) { + schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0, + TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS); + } + + protected void startCancelChecker(ScheduledThreadPoolExecutor schedExecutor, + long timeBetweenChecks) { + schedExecutor.scheduleWithFixedDelay(() -> checkIfCanceled(), 0, timeBetweenChecks, + TimeUnit.MILLISECONDS); + } + + protected void checkIfCanceled() { + TExternalCompactionJob job = JOB_HOLDER.getJob(); + if (job != null) { + try { + var extent = KeyExtent.fromThrift(job.getExtent()); + var ecid = ExternalCompactionId.of(job.getExternalCompactionId()); + + TabletMetadata tabletMeta = + getContext().getAmple().readTablet(extent, ColumnType.ECOMP, ColumnType.PREV_ROW); + if (tabletMeta == null || !tabletMeta.getExtent().equals(extent) + || !tabletMeta.getExternalCompactions().containsKey(ecid)) { + // table was deleted OR tablet was split or merged OR tablet no longer thinks compaction + // is running for some reason + LOG.info("Cancelling compaction {} that no longer has a metadata entry at {}", ecid, + extent); + JOB_HOLDER.cancel(job.getExternalCompactionId()); + return; + } + + if (job.getKind() == TCompactionKind.USER) { + String zTablePath = Constants.ZROOT + "/" + getContext().getInstanceID() + + Constants.ZTABLES + "/" + extent.tableId() + Constants.ZTABLE_COMPACT_CANCEL_ID; + byte[] id = getContext().getZooCache().get(zTablePath); + if (id == null) { + // table probably deleted + LOG.info("Cancelling compaction {} for table that no longer exists {}", ecid, extent); + JOB_HOLDER.cancel(job.getExternalCompactionId()); + return; + } else { + var cancelId = Long.parseLong(new String(id, UTF_8)); + + if (cancelId >= job.getUserCompactionId()) { + LOG.info("Cancelling compaction {} because user compaction was canceled"); + JOB_HOLDER.cancel(job.getExternalCompactionId()); + return; + } + } + } + } catch (RuntimeException e) { + LOG.warn("Failed to check if compaction {} for {} was canceled.", + job.getExternalCompactionId(), KeyExtent.fromThrift(job.getExtent()), e); + } + } + } + + protected void printStartupMsg() { + LOG.info("Version " + Constants.VERSION); + LOG.info("Instance " + getContext().getInstanceID()); + } + + /** + * Set up nodes and locks in ZooKeeper for this Compactor + * + * @param clientAddress + * address of this Compactor + * + * @throws KeeperException + * zookeeper error + * @throws InterruptedException + * thread interrupted + */ + protected void announceExistence(HostAndPort clientAddress) + throws KeeperException, InterruptedException { + + String hostPort = ExternalCompactionUtil.getHostPortString(clientAddress); + + ZooReaderWriter zoo = getContext().getZooReaderWriter(); + String compactorQueuePath = + getContext().getZooKeeperRoot() + Constants.ZCOMPACTORS + "/" + this.queueName; + String zPath = compactorQueuePath + "/" + hostPort; + + try { + zoo.mkdirs(compactorQueuePath); + zoo.putPersistentData(zPath, new byte[] {}, NodeExistsPolicy.SKIP); + } catch (KeeperException e) { + if (e.code() == KeeperException.Code.NOAUTH) { + LOG.error("Failed to write to ZooKeeper. Ensure that" + + " accumulo.properties, specifically instance.secret, is consistent."); + } + throw e; + } + + compactorLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), + ServiceLock.path(zPath), compactorId); + LockWatcher lw = new LockWatcher() { + @Override + public void lostLock(final LockLossReason reason) { + Halt.halt(1, () -> { + LOG.error("Compactor lost lock (reason = {}), exiting.", reason); + gcLogger.logGCInfo(getConfiguration()); + }); + } + + @Override + public void unableToMonitorLockNode(final Exception e) { + Halt.halt(1, () -> LOG.error("Lost ability to monitor Compactor lock, exiting.", e)); + } + }; + + try { + byte[] lockContent = + new ServerServices(hostPort, Service.COMPACTOR_CLIENT).toString().getBytes(UTF_8); + for (int i = 0; i < 25; i++) { + zoo.putPersistentData(zPath, new byte[0], NodeExistsPolicy.SKIP); + + if (compactorLock.tryLock(lw, lockContent)) { + LOG.debug("Obtained Compactor lock {}", compactorLock.getLockPath()); + return; + } + LOG.info("Waiting for Compactor lock"); + sleepUninterruptibly(5, TimeUnit.SECONDS); + } + String msg = "Too many retries, exiting."; + LOG.info(msg); + throw new RuntimeException(msg); + } catch (Exception e) { + LOG.info("Could not obtain tablet server lock, exiting.", e); + throw new RuntimeException(e); + } + } + + /** + * Start this Compactors thrift service to handle incoming client requests + * + * @return address of this compactor client service + * @throws UnknownHostException + * host unknown + */ + protected ServerAddress startCompactorClientService() throws UnknownHostException { + Iface rpcProxy = TraceUtil.wrapService(this); + if (getContext().getThriftServerType() == ThriftServerType.SASL) { + rpcProxy = TCredentialsUpdatingWrapper.service(rpcProxy, getClass(), getConfiguration()); + } + final CompactorService.Processor<Iface> processor = new CompactorService.Processor<>(rpcProxy); + Property maxMessageSizeProperty = (aconf.get(Property.COMPACTOR_MAX_MESSAGE_SIZE) != null + ? Property.COMPACTOR_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE); + ServerAddress sp = TServerUtils.startServer(getMetricsSystem(), getContext(), getHostname(), + Property.COMPACTOR_CLIENTPORT, processor, this.getClass().getSimpleName(), + "Thrift Client Server", Property.COMPACTOR_PORTSEARCH, Property.COMPACTOR_MINTHREADS, + Property.COMPACTOR_MINTHREADS_TIMEOUT, Property.COMPACTOR_THREADCHECK, + maxMessageSizeProperty); + LOG.info("address = {}", sp.address); + return sp; + } + + /** + * Called by a CompactionCoordinator to cancel the currently running compaction + * + * @param tinfo + * trace info + * @param credentials + * caller credentials + * @param externalCompactionId + * compaction id + * @throws UnknownCompactionIdException + * if the externalCompactionId does not match the currently executing compaction + */ + @Override + public void cancel(TInfo tinfo, TCredentials credentials, String externalCompactionId) + throws TException { + // do not expect users to call this directly, expect other tservers to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + cancel(externalCompactionId); + } + + /** + * Cancel the compaction with this id. + * + * @param externalCompactionId + * compaction id + * @throws UnknownCompactionIdException + * if the externalCompactionId does not match the currently executing compaction + * @throws TException + * thrift error + */ + private void cancel(String externalCompactionId) throws TException { + if (JOB_HOLDER.cancel(externalCompactionId)) { + LOG.info("Cancel requested for compaction job {}", externalCompactionId); + } else { + throw new UnknownCompactionIdException(); + } + } + + /** + * Send an update to the CompactionCoordinator for this job + * + * @param job + * compactionJob + * @param state + * updated state + * @param message + * updated message + * @throws RetriesExceededException + * thrown when retries have been exceeded + */ + protected void updateCompactionState(TExternalCompactionJob job, TCompactionState state, + String message) throws RetriesExceededException { + RetryableThriftCall<String> thriftCall = new RetryableThriftCall<>(1000, + RetryableThriftCall.MAX_WAIT_TIME, 25, new RetryableThriftFunction<String>() { + @Override + public String execute() throws TException { + Client coordinatorClient = getCoordinatorClient(); + try { + coordinatorClient.updateCompactionStatus(TraceUtil.traceInfo(), + getContext().rpcCreds(), job.getExternalCompactionId(), state, message, + System.currentTimeMillis()); + return ""; + } finally { + ThriftUtil.returnClient(coordinatorClient); + } + } + }); + thriftCall.run(); + } + + /** + * Notify the CompactionCoordinator the job failed + * + * @param job + * current compaction job + * @throws RetriesExceededException + * thrown when retries have been exceeded + */ + protected void updateCompactionFailed(TExternalCompactionJob job) + throws RetriesExceededException { + RetryableThriftCall<String> thriftCall = new RetryableThriftCall<>(1000, + RetryableThriftCall.MAX_WAIT_TIME, 25, new RetryableThriftFunction<String>() { + @Override + public String execute() throws TException { + Client coordinatorClient = getCoordinatorClient(); + try { + coordinatorClient.compactionFailed(TraceUtil.traceInfo(), getContext().rpcCreds(), + job.getExternalCompactionId(), job.extent); + return ""; + } finally { + ThriftUtil.returnClient(coordinatorClient); + } + } + }); + thriftCall.run(); + } + + /** + * Update the CompactionCoordinator with the stats from the completed job + * + * @param job + * current compaction job + * @param stats + * compaction stats + * @throws RetriesExceededException + * thrown when retries have been exceeded + */ + protected void updateCompactionCompleted(TExternalCompactionJob job, TCompactionStats stats) + throws RetriesExceededException { + RetryableThriftCall<String> thriftCall = new RetryableThriftCall<>(1000, + RetryableThriftCall.MAX_WAIT_TIME, 25, new RetryableThriftFunction<String>() { + @Override + public String execute() throws TException { + Client coordinatorClient = getCoordinatorClient(); + try { + coordinatorClient.compactionCompleted(TraceUtil.traceInfo(), getContext().rpcCreds(), + job.getExternalCompactionId(), job.extent, stats); + return ""; + } finally { + ThriftUtil.returnClient(coordinatorClient); + } + } + }); + thriftCall.run(); + } + + /** + * Get the next job to run + * + * @param uuid + * uuid supplier + * @return CompactionJob + * @throws RetriesExceededException + * thrown when retries have been exceeded + */ + protected TExternalCompactionJob getNextJob(Supplier<UUID> uuid) throws RetriesExceededException { + RetryableThriftCall<TExternalCompactionJob> nextJobThriftCall = + new RetryableThriftCall<>(1000, RetryableThriftCall.MAX_WAIT_TIME, 0, + new RetryableThriftFunction<TExternalCompactionJob>() { + @Override + public TExternalCompactionJob execute() throws TException { + Client coordinatorClient = getCoordinatorClient(); + try { + ExternalCompactionId eci = ExternalCompactionId.generate(uuid.get()); + LOG.trace("Attempting to get next job, eci = {}", eci); + currentCompactionId.set(eci); + return coordinatorClient.getCompactionJob(TraceUtil.traceInfo(), + getContext().rpcCreds(), queueName, + ExternalCompactionUtil.getHostPortString(compactorAddress.getAddress()), + eci.toString()); + } catch (Exception e) { + currentCompactionId.set(null); + throw e; + } finally { + ThriftUtil.returnClient(coordinatorClient); + } + } + }); + return nextJobThriftCall.run(); + } + + /** + * Get the client to the CompactionCoordinator + * + * @return compaction coordinator client + * @throws TTransportException + * when unable to get client + */ + protected CompactionCoordinatorService.Client getCoordinatorClient() throws TTransportException { + HostAndPort coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(getContext()); + if (null == coordinatorHost) { + throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper"); + } + LOG.trace("CompactionCoordinator address is: {}", coordinatorHost); + return ThriftUtil.getClient(COORDINATOR_CLIENT_FACTORY, coordinatorHost, getContext()); + } + + /** + * Create compaction runnable + * + * @param job + * compaction job + * @param totalInputEntries + * object to capture total entries + * @param totalInputBytes + * object to capture input file size + * @param started + * started latch + * @param stopped + * stopped latch + * @param err + * reference to error + * @return Runnable compaction job + */ + protected Runnable createCompactionJob(final TExternalCompactionJob job, + final LongAdder totalInputEntries, final LongAdder totalInputBytes, + final CountDownLatch started, final CountDownLatch stopped, + final AtomicReference<Throwable> err) { + + return new Runnable() { + @Override + public void run() { + // Its only expected that a single compaction runs at a time. Multiple compactions running + // at a time could cause odd behavior like out of order and unexpected thrift calls to the + // coordinator. This is a sanity check to ensure the expectation is met. Should this check + // ever fail, it means there is a bug elsewhere. + Preconditions.checkState(compactionRunning.compareAndSet(false, true)); + try { + LOG.info("Starting up compaction runnable for job: {}", job); + updateCompactionState(job, TCompactionState.STARTED, "Compaction started"); + + final AccumuloConfiguration tConfig; + + if (!job.getTableCompactionProperties().isEmpty()) { + tConfig = new ConfigurationCopy(DefaultConfiguration.getInstance()); Review comment: Looking how this works here and in the tserver code, thinking this is brittle and error prone. What if a non table props is used, like a general prop? That could cause silent failures in this code. Think we need to rework this, but not sure what to do ATM. The peer code in the tserver is in CompactableImpl.reserveExternalCompaction() -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
