keith-turner commented on a change in pull request #2096:
URL: https://github.com/apache/accumulo/pull/2096#discussion_r633713455



##########
File path: 
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
##########
@@ -0,0 +1,766 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.coordinator;
+
+import static 
org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
+
+import java.net.UnknownHostException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.coordinator.QueueSummaries.PrioTserver;
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.clientImpl.ThriftTransportPool;
+import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
+import org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Iface;
+import org.apache.accumulo.core.compaction.thrift.Compactor;
+import org.apache.accumulo.core.compaction.thrift.TCompactionState;
+import org.apache.accumulo.core.compaction.thrift.UnknownCompactionIdException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
+import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
+import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
+import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary;
+import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
+import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.trace.thrift.TInfo;
+import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.ServiceLock;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.server.AbstractServer;
+import org.apache.accumulo.server.GarbageCollectionLogger;
+import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.compaction.RetryableThriftCall;
+import 
org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException;
+import org.apache.accumulo.server.compaction.RetryableThriftFunction;
+import org.apache.accumulo.server.manager.LiveTServerSet;
+import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection;
+import org.apache.accumulo.server.rpc.ServerAddress;
+import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
+import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.accumulo.server.security.AuditedSecurityOperation;
+import org.apache.accumulo.server.security.SecurityOperation;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CompactionCoordinator extends AbstractServer
+    implements 
org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Iface,
+    LiveTServerSet.Listener {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CompactionCoordinator.class);
+  private static final long TIME_BETWEEN_GC_CHECKS = 5000;
+  private static final long FIFTEEN_MINUTES =
+      TimeUnit.MILLISECONDS.convert(Duration.of(15, 
TimeUnit.MINUTES.toChronoUnit()));
+
+  protected static final QueueSummaries QUEUE_SUMMARIES = new QueueSummaries();
+
+  /* Map of compactionId to RunningCompactions */
+  protected static final Map<ExternalCompactionId,RunningCompaction> RUNNING =
+      new ConcurrentHashMap<>();
+
+  /* Map of queue name to last time compactor called to get a compaction job */
+  private static final Map<String,Long> TIME_COMPACTOR_LAST_CHECKED = new 
ConcurrentHashMap<>();
+
+  private final GarbageCollectionLogger gcLogger = new 
GarbageCollectionLogger();
+  protected SecurityOperation security;
+  protected final AccumuloConfiguration aconf;
+  protected CompactionFinalizer compactionFinalizer;
+  protected LiveTServerSet tserverSet;
+
+  private ServiceLock coordinatorLock;
+
+  // Exposed for tests
+  protected volatile Boolean shutdown = false;
+
+  private ScheduledThreadPoolExecutor schedExecutor;
+
+  protected CompactionCoordinator(ServerOpts opts, String[] args) {
+    super("compaction-coordinator", opts, args);
+    aconf = getConfiguration();
+    schedExecutor = ThreadPools.createGeneralScheduledExecutorService(aconf);
+    compactionFinalizer = createCompactionFinalizer(schedExecutor);
+    tserverSet = createLiveTServerSet();
+    setupSecurity();
+    startGCLogger(schedExecutor);
+    printStartupMsg();
+    startCompactionCleaner(schedExecutor);
+  }
+
+  protected CompactionCoordinator(ServerOpts opts, String[] args, 
AccumuloConfiguration conf) {
+    super("compaction-coordinator", opts, args);
+    aconf = conf;
+    schedExecutor = ThreadPools.createGeneralScheduledExecutorService(aconf);
+    compactionFinalizer = createCompactionFinalizer(schedExecutor);
+    tserverSet = createLiveTServerSet();
+    setupSecurity();
+    startGCLogger(schedExecutor);
+    printStartupMsg();
+    startCompactionCleaner(schedExecutor);
+  }
+
+  protected CompactionFinalizer
+      createCompactionFinalizer(ScheduledThreadPoolExecutor schedExecutor) {
+    return new CompactionFinalizer(getContext(), schedExecutor);
+  }
+
+  protected LiveTServerSet createLiveTServerSet() {
+    return new LiveTServerSet(getContext(), this);
+  }
+
+  protected void setupSecurity() {
+    getContext().setupCrypto();
+    security = AuditedSecurityOperation.getInstance(getContext());
+  }
+
+  protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {
+    schedExecutor.scheduleWithFixedDelay(() -> 
gcLogger.logGCInfo(getConfiguration()), 0,
+        TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
+  }
+
+  private void startCompactionCleaner(ScheduledThreadPoolExecutor 
schedExecutor) {
+    schedExecutor.scheduleWithFixedDelay(() -> cleanUpCompactors(), 0, 5, 
TimeUnit.MINUTES);
+  }
+
+  protected void printStartupMsg() {
+    LOG.info("Version " + Constants.VERSION);
+    LOG.info("Instance " + getContext().getInstanceID());
+  }
+
+  /**
+   * Set up nodes and locks in ZooKeeper for this CompactionCoordinator
+   *
+   * @param clientAddress
+   *          address of this Compactor
+   * @throws KeeperException
+   *           zookeeper error
+   * @throws InterruptedException
+   *           thread interrupted
+   */
+  protected void getCoordinatorLock(HostAndPort clientAddress)
+      throws KeeperException, InterruptedException {
+    LOG.info("trying to get coordinator lock");
+
+    final String coordinatorClientAddress = 
ExternalCompactionUtil.getHostPortString(clientAddress);
+    final String lockPath = getContext().getZooKeeperRoot() + 
Constants.ZCOORDINATOR_LOCK;
+    final UUID zooLockUUID = UUID.randomUUID();
+
+    while (true) {
+
+      CoordinatorLockWatcher coordinatorLockWatcher = new 
CoordinatorLockWatcher();
+      coordinatorLock = new 
ServiceLock(getContext().getZooReaderWriter().getZooKeeper(),
+          ServiceLock.path(lockPath), zooLockUUID);
+      coordinatorLock.lock(coordinatorLockWatcher, 
coordinatorClientAddress.getBytes());
+
+      coordinatorLockWatcher.waitForChange();
+      if (coordinatorLockWatcher.isAcquiredLock()) {
+        break;
+      }
+      if (!coordinatorLockWatcher.isFailedToAcquireLock()) {
+        throw new IllegalStateException("manager lock in unknown state");
+      }
+      coordinatorLock.tryToCancelAsyncLockOrUnlock();
+
+      sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
+    }
+  }
+
+  /**
+   * Start this CompactionCoordinator thrift service to handle incoming client 
requests
+   *
+   * @return address of this CompactionCoordinator client service
+   * @throws UnknownHostException
+   *           host unknown
+   */
+  protected ServerAddress startCoordinatorClientService() throws 
UnknownHostException {
+    Iface rpcProxy = TraceUtil.wrapService(this);
+    if (getContext().getThriftServerType() == ThriftServerType.SASL) {
+      rpcProxy = TCredentialsUpdatingWrapper.service(rpcProxy, 
CompactionCoordinator.class,
+          getConfiguration());
+    }
+    final 
org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Processor<
+        Iface> processor =
+            new 
org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Processor<>(
+                rpcProxy);
+    Property maxMessageSizeProperty =
+        (aconf.get(Property.COORDINATOR_THRIFTCLIENT_MAX_MESSAGE_SIZE) != null
+            ? Property.COORDINATOR_THRIFTCLIENT_MAX_MESSAGE_SIZE
+            : Property.GENERAL_MAX_MESSAGE_SIZE);
+    ServerAddress sp = TServerUtils.startServer(getMetricsSystem(), 
getContext(), getHostname(),
+        Property.COORDINATOR_THRIFTCLIENT_CLIENTPORT, processor, 
this.getClass().getSimpleName(),
+        "Thrift Client Server", Property.COORDINATOR_THRIFTCLIENT_PORTSEARCH,
+        Property.COORDINATOR_THRIFTCLIENT_MINTHREADS,
+        Property.COORDINATOR_THRIFTCLIENT_MINTHREADS_TIMEOUT,
+        Property.COORDINATOR_THRIFTCLIENT_THREADCHECK, maxMessageSizeProperty);
+    LOG.info("address = {}", sp.address);
+    return sp;
+  }
+
+  @Override
+  public void run() {
+
+    ServerAddress coordinatorAddress = null;
+    try {
+      coordinatorAddress = startCoordinatorClientService();
+    } catch (UnknownHostException e1) {
+      throw new RuntimeException("Failed to start the coordinator service", 
e1);
+    }
+    final HostAndPort clientAddress = coordinatorAddress.address;
+
+    try {
+      getCoordinatorLock(clientAddress);
+    } catch (KeeperException | InterruptedException e) {
+      throw new IllegalStateException("Exception getting Coordinator lock", e);
+    }
+
+    // On a re-start of the coordinator it's possible that external 
compactions are in-progress.
+    // Attempt to get the running compactions on the compactors and then 
resolve which tserver
+    // the external compaction came from to re-populate the RUNNING collection.
+    LOG.info("Checking for running external compactions");
+    tserverSet.scanServers();
+    final Set<TServerInstance> tservers = tserverSet.getCurrentServers();
+    if (null != tservers && !tservers.isEmpty()) {
+      // On re-start contact the running Compactors to try and seed the list 
of running compactions
+      Map<HostAndPort,TExternalCompactionJob> running =
+          
ExternalCompactionUtil.getCompactionsRunningOnCompactors(getContext());
+      if (running.isEmpty()) {
+        LOG.info("No compactions running on Compactors.");
+      } else {
+        LOG.info("Found {} running external compactions", running.size());
+        running.forEach((hp, job) -> {
+          // Find the tserver that has this compaction id
+          boolean matchFound = false;
+
+          // Attempt to find the TServer hosting the tablet based on the 
metadata table
+          // TODO use #1974 for more efficient metadata reads
+          KeyExtent extent = KeyExtent.fromThrift(job.getExtent());
+          LOG.debug("Getting tablet metadata for extent: {}", extent);
+          TabletMetadata tabletMetadata = getMetadataEntryForExtent(extent);
+
+          if (tabletMetadata != null && 
tabletMetadata.getExtent().equals(extent)
+              && tabletMetadata.getLocation() != null
+              && tabletMetadata.getLocation().getType() == 
LocationType.CURRENT) {
+
+            TServerInstance tsi = tservers.stream()
+                .filter(
+                    t -> 
t.getHostAndPort().equals(tabletMetadata.getLocation().getHostAndPort()))
+                .findFirst().orElse(null);
+
+            if (null != tsi) {
+              TabletClientService.Client client = null;
+              try {
+                LOG.debug(
+                    "Checking to see if tserver {} is running external 
compaction for extent: {}",
+                    tsi.getHostAndPort(), extent);
+                client = getTabletServerConnection(tsi);
+                boolean tserverMatch = 
client.isRunningExternalCompaction(TraceUtil.traceInfo(),
+                    getContext().rpcCreds(), job.getExternalCompactionId(), 
job.getExtent());
+                if (tserverMatch) {
+                  LOG.debug(
+                      "Tablet server {} is running external compaction for 
extent: {}, adding to running list",
+                      tsi.getHostAndPort(), extent);
+                  
RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()),
+                      new RunningCompaction(job, 
ExternalCompactionUtil.getHostPortString(hp),
+                          tsi));
+                  matchFound = true;
+                } else {
+                  LOG.debug("Tablet server {} is NOT running external 
compaction for extent: {}",
+                      tsi.getHostAndPort(), extent);
+                }
+              } catch (TException e) {
+                LOG.warn("Failed to notify tserver {}",
+                    tabletMetadata.getLocation().getHostAndPort(), e);
+              } finally {
+                ThriftUtil.returnClient(client);
+              }
+            } else {
+              LOG.info("Tablet server {} is not currently in live tserver set",
+                  tabletMetadata.getLocation().getHostAndPort());
+            }
+          } else {
+            LOG.info("No current location for extent: {}", extent);
+          }

Review comment:
       I may be missing something.  Looking at the code, I saw that a good bit of 
effort went into finding `tsi`; however, it seems that after the information was 
found, it was never used.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to