keith-turner commented on a change in pull request #2096: URL: https://github.com/apache/accumulo/pull/2096#discussion_r633719112
########## File path: server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java ########## @@ -0,0 +1,766 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.coordinator; + +import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; + +import java.net.UnknownHostException; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.coordinator.QueueSummaries.PrioTserver; +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.clientImpl.ThriftTransportPool; +import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Iface; +import org.apache.accumulo.core.compaction.thrift.Compactor; +import org.apache.accumulo.core.compaction.thrift.TCompactionState; +import 
org.apache.accumulo.core.compaction.thrift.UnknownCompactionIdException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; +import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.securityImpl.thrift.TCredentials; +import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary; +import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; +import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.accumulo.core.trace.thrift.TInfo; +import org.apache.accumulo.core.util.HostAndPort; +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; +import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.fate.util.UtilWaitThread; +import org.apache.accumulo.fate.zookeeper.ServiceLock; +import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.server.AbstractServer; +import org.apache.accumulo.server.GarbageCollectionLogger; +import org.apache.accumulo.server.ServerOpts; +import org.apache.accumulo.server.compaction.RetryableThriftCall; +import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException; +import org.apache.accumulo.server.compaction.RetryableThriftFunction; +import org.apache.accumulo.server.manager.LiveTServerSet; +import 
org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection; +import org.apache.accumulo.server.rpc.ServerAddress; +import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper; +import org.apache.accumulo.server.rpc.TServerUtils; +import org.apache.accumulo.server.rpc.ThriftServerType; +import org.apache.accumulo.server.security.AuditedSecurityOperation; +import org.apache.accumulo.server.security.SecurityOperation; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CompactionCoordinator extends AbstractServer + implements org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Iface, + LiveTServerSet.Listener { + + private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class); + private static final long TIME_BETWEEN_GC_CHECKS = 5000; + private static final long FIFTEEN_MINUTES = + TimeUnit.MILLISECONDS.convert(Duration.of(15, TimeUnit.MINUTES.toChronoUnit())); + + protected static final QueueSummaries QUEUE_SUMMARIES = new QueueSummaries(); + + /* Map of compactionId to RunningCompactions */ + protected static final Map<ExternalCompactionId,RunningCompaction> RUNNING = + new ConcurrentHashMap<>(); + + /* Map of queue name to last time compactor called to get a compaction job */ + private static final Map<String,Long> TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>(); + + private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger(); + protected SecurityOperation security; + protected final AccumuloConfiguration aconf; + protected CompactionFinalizer compactionFinalizer; + protected LiveTServerSet tserverSet; + + private ServiceLock coordinatorLock; + + // Exposed for tests + protected volatile Boolean shutdown = false; + + private ScheduledThreadPoolExecutor schedExecutor; + + 
protected CompactionCoordinator(ServerOpts opts, String[] args) { + super("compaction-coordinator", opts, args); + aconf = getConfiguration(); + schedExecutor = ThreadPools.createGeneralScheduledExecutorService(aconf); + compactionFinalizer = createCompactionFinalizer(schedExecutor); + tserverSet = createLiveTServerSet(); + setupSecurity(); + startGCLogger(schedExecutor); + printStartupMsg(); + startCompactionCleaner(schedExecutor); + } + + protected CompactionCoordinator(ServerOpts opts, String[] args, AccumuloConfiguration conf) { + super("compaction-coordinator", opts, args); + aconf = conf; + schedExecutor = ThreadPools.createGeneralScheduledExecutorService(aconf); + compactionFinalizer = createCompactionFinalizer(schedExecutor); + tserverSet = createLiveTServerSet(); + setupSecurity(); + startGCLogger(schedExecutor); + printStartupMsg(); + startCompactionCleaner(schedExecutor); + } + + protected CompactionFinalizer + createCompactionFinalizer(ScheduledThreadPoolExecutor schedExecutor) { + return new CompactionFinalizer(getContext(), schedExecutor); + } + + protected LiveTServerSet createLiveTServerSet() { + return new LiveTServerSet(getContext(), this); + } + + protected void setupSecurity() { + getContext().setupCrypto(); + security = AuditedSecurityOperation.getInstance(getContext()); + } + + protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) { + schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0, + TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS); + } + + private void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) { + schedExecutor.scheduleWithFixedDelay(() -> cleanUpCompactors(), 0, 5, TimeUnit.MINUTES); + } + + protected void printStartupMsg() { + LOG.info("Version " + Constants.VERSION); + LOG.info("Instance " + getContext().getInstanceID()); + } + + /** + * Set up nodes and locks in ZooKeeper for this CompactionCoordinator + * + * @param clientAddress + * address of this Compactor + 
* @throws KeeperException + * zookeeper error + * @throws InterruptedException + * thread interrupted + */ + protected void getCoordinatorLock(HostAndPort clientAddress) + throws KeeperException, InterruptedException { + LOG.info("trying to get coordinator lock"); + + final String coordinatorClientAddress = ExternalCompactionUtil.getHostPortString(clientAddress); + final String lockPath = getContext().getZooKeeperRoot() + Constants.ZCOORDINATOR_LOCK; + final UUID zooLockUUID = UUID.randomUUID(); + + while (true) { + + CoordinatorLockWatcher coordinatorLockWatcher = new CoordinatorLockWatcher(); + coordinatorLock = new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), + ServiceLock.path(lockPath), zooLockUUID); + coordinatorLock.lock(coordinatorLockWatcher, coordinatorClientAddress.getBytes()); + + coordinatorLockWatcher.waitForChange(); + if (coordinatorLockWatcher.isAcquiredLock()) { + break; + } + if (!coordinatorLockWatcher.isFailedToAcquireLock()) { + throw new IllegalStateException("manager lock in unknown state"); + } + coordinatorLock.tryToCancelAsyncLockOrUnlock(); + + sleepUninterruptibly(1000, TimeUnit.MILLISECONDS); + } + } + + /** + * Start this CompactionCoordinator thrift service to handle incoming client requests + * + * @return address of this CompactionCoordinator client service + * @throws UnknownHostException + * host unknown + */ + protected ServerAddress startCoordinatorClientService() throws UnknownHostException { + Iface rpcProxy = TraceUtil.wrapService(this); + if (getContext().getThriftServerType() == ThriftServerType.SASL) { + rpcProxy = TCredentialsUpdatingWrapper.service(rpcProxy, CompactionCoordinator.class, + getConfiguration()); + } + final org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Processor< + Iface> processor = + new org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Processor<>( + rpcProxy); + Property maxMessageSizeProperty = + 
(aconf.get(Property.COORDINATOR_THRIFTCLIENT_MAX_MESSAGE_SIZE) != null + ? Property.COORDINATOR_THRIFTCLIENT_MAX_MESSAGE_SIZE + : Property.GENERAL_MAX_MESSAGE_SIZE); + ServerAddress sp = TServerUtils.startServer(getMetricsSystem(), getContext(), getHostname(), + Property.COORDINATOR_THRIFTCLIENT_CLIENTPORT, processor, this.getClass().getSimpleName(), + "Thrift Client Server", Property.COORDINATOR_THRIFTCLIENT_PORTSEARCH, + Property.COORDINATOR_THRIFTCLIENT_MINTHREADS, + Property.COORDINATOR_THRIFTCLIENT_MINTHREADS_TIMEOUT, + Property.COORDINATOR_THRIFTCLIENT_THREADCHECK, maxMessageSizeProperty); + LOG.info("address = {}", sp.address); + return sp; + } + + @Override + public void run() { + + ServerAddress coordinatorAddress = null; + try { + coordinatorAddress = startCoordinatorClientService(); + } catch (UnknownHostException e1) { + throw new RuntimeException("Failed to start the coordinator service", e1); + } + final HostAndPort clientAddress = coordinatorAddress.address; + + try { + getCoordinatorLock(clientAddress); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException("Exception getting Coordinator lock", e); + } + + // On a re-start of the coordinator it's possible that external compactions are in-progress. + // Attempt to get the running compactions on the compactors and then resolve which tserver + // the external compaction came from to re-populate the RUNNING collection. 
+ LOG.info("Checking for running external compactions"); + tserverSet.scanServers(); + final Set<TServerInstance> tservers = tserverSet.getCurrentServers(); + if (null != tservers && !tservers.isEmpty()) { + // On re-start contact the running Compactors to try and seed the list of running compactions + Map<HostAndPort,TExternalCompactionJob> running = + ExternalCompactionUtil.getCompactionsRunningOnCompactors(getContext()); + if (running.isEmpty()) { + LOG.info("No compactions running on Compactors."); + } else { + LOG.info("Found {} running external compactions", running.size()); + running.forEach((hp, job) -> { + // Find the tserver that has this compaction id + boolean matchFound = false; + + // Attempt to find the TServer hosting the tablet based on the metadata table + // TODO use #1974 for more efficient metadata reads + KeyExtent extent = KeyExtent.fromThrift(job.getExtent()); + LOG.debug("Getting tablet metadata for extent: {}", extent); + TabletMetadata tabletMetadata = getMetadataEntryForExtent(extent); + + if (tabletMetadata != null && tabletMetadata.getExtent().equals(extent) + && tabletMetadata.getLocation() != null + && tabletMetadata.getLocation().getType() == LocationType.CURRENT) { + + TServerInstance tsi = tservers.stream() + .filter( + t -> t.getHostAndPort().equals(tabletMetadata.getLocation().getHostAndPort())) + .findFirst().orElse(null); + + if (null != tsi) { + TabletClientService.Client client = null; + try { + LOG.debug( + "Checking to see if tserver {} is running external compaction for extent: {}", + tsi.getHostAndPort(), extent); + client = getTabletServerConnection(tsi); + boolean tserverMatch = client.isRunningExternalCompaction(TraceUtil.traceInfo(), + getContext().rpcCreds(), job.getExternalCompactionId(), job.getExtent()); + if (tserverMatch) { + LOG.debug( + "Tablet server {} is running external compaction for extent: {}, adding to running list", + tsi.getHostAndPort(), extent); + 
RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()), + new RunningCompaction(job, ExternalCompactionUtil.getHostPortString(hp), + tsi)); + matchFound = true; + } else { + LOG.debug("Tablet server {} is NOT running external compaction for extent: {}", + tsi.getHostAndPort(), extent); + } + } catch (TException e) { + LOG.warn("Failed to notify tserver {}", + tabletMetadata.getLocation().getHostAndPort(), e); + } finally { + ThriftUtil.returnClient(client); + } + } else { + LOG.info("Tablet server {} is not currently in live tserver set", + tabletMetadata.getLocation().getHostAndPort()); + } + } else { + LOG.info("No current location for extent: {}", extent); + } Review comment: To be more specific, my IDE indicated nothing calls [RunningCompaction.getTserver()](https://github.com/apache/accumulo/blob/cdfec2afccd22ea344ad8fd3a3ce622ee5b87324/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/RunningCompaction.java#L58) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
