This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
     new b21af9f  ref #2026 ref #1992 ref #1993 exposed metrics and details via http to support verification in ITs
b21af9f is described below

commit b21af9fea21bff53f3779c057ddc5bc16c730ac9
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Mon Apr 26 18:44:21 2021 +0000

    ref #2026 ref #1992 ref #1993 exposed metrics and details via http to support verification in ITs
---
 .../org/apache/accumulo/core/conf/Property.java    |  4 +-
 server/compaction-coordinator/pom.xml              |  4 +
 .../coordinator/CompactionCoordinator.java         | 78 ++++++++++++++++++
 .../coordinator/ExternalCompactionMetrics.java     | 92 ++++++++++++++++++++++
 .../coordinator/CompactionCoordinatorTest.java     |  6 ++
 .../org/apache/accumulo/compactor/Compactor.java   |  8 +-
 .../apache/accumulo/test/ExternalCompactionIT.java | 78 +++++++++++++++++-
 .../accumulo/test/ExternalDoNothingCompactor.java  |  8 ++
 test/src/main/resources/log4j2-test.properties     |  3 +
 9 files changed, 276 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index f0713c2..aff4398 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -1043,7 +1043,9 @@ public enum Property {
   COORDINATOR_PORTSEARCH("coordinator.port.search", "false", 
PropertyType.BOOLEAN,
       "if the ports above are in use, search higher ports until one is 
available"),
   COORDINATOR_CLIENTPORT("coordinator.port.client", "9100", PropertyType.PORT,
-      "The port used for handling client connections on the compactor 
servers"),
+      "The port used for handling Thrift client connections on the compaction 
coordinator server"),
+  COORDINATOR_METRICPORT("coordinator.port.metrics", "9099", PropertyType.PORT,
+      "The port used for the metric http server on the compaction coordinator 
server"),
   COORDINATOR_MINTHREADS("coordinator.server.threads.minimum", "1", 
PropertyType.COUNT,
       "The minimum number of threads to use to handle incoming requests."),
   COORDINATOR_MINTHREADS_TIMEOUT("coordinator.server.threads.timeout", "0s",
diff --git a/server/compaction-coordinator/pom.xml 
b/server/compaction-coordinator/pom.xml
index 66f98b6..4ed49e3 100644
--- a/server/compaction-coordinator/pom.xml
+++ b/server/compaction-coordinator/pom.xml
@@ -60,6 +60,10 @@
       <artifactId>zookeeper</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-server</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index e6e7549..bbf23a2 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.coordinator;
 
 import static 
org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
 
+import java.io.IOException;
 import java.net.UnknownHostException;
 import java.time.Duration;
 import java.util.List;
@@ -29,6 +30,10 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
+import jakarta.servlet.ServletException;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+
 import org.apache.accumulo.coordinator.QueueSummaries.PrioTserver;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -74,13 +79,23 @@ import org.apache.accumulo.server.rpc.TServerUtils;
 import org.apache.accumulo.server.rpc.ThriftServerType;
 import org.apache.accumulo.server.security.AuditedSecurityOperation;
 import org.apache.accumulo.server.security.SecurityOperation;
+import org.apache.http.entity.ContentType;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.zookeeper.KeeperException;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import org.eclipse.jetty.server.handler.ContextHandlerCollection;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.gson.Gson;
+
 public class CompactionCoordinator extends AbstractServer
     implements 
org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Iface,
     LiveTServerSet.Listener {
@@ -93,6 +108,8 @@ public class CompactionCoordinator extends AbstractServer
 
   protected static final QueueSummaries QUEUE_SUMMARIES = new QueueSummaries();
 
+  private static final Gson GSON = new Gson();
+
   /* Map of compactionId to RunningCompactions */
   protected static final Map<ExternalCompactionId,RunningCompaction> RUNNING =
       new ConcurrentHashMap<>();
@@ -100,6 +117,7 @@ public class CompactionCoordinator extends AbstractServer
   /* Map of queue name to last time compactor called to get a compaction job */
   private static final Map<String,Long> TIME_COMPACTOR_LAST_CHECKED = new 
ConcurrentHashMap<>();
 
+  private final ExternalCompactionMetrics metrics = new 
ExternalCompactionMetrics();
   private final GarbageCollectionLogger gcLogger = new 
GarbageCollectionLogger();
   protected SecurityOperation security;
   protected final AccumuloConfiguration aconf;
@@ -221,6 +239,48 @@ public class CompactionCoordinator extends AbstractServer
     return sp;
   }
 
+  protected Server startHttpMetricServer() throws Exception {
+    int port = 
getContext().getConfiguration().getPortStream(Property.COORDINATOR_METRICPORT)
+        .iterator().next();
+    String hostname = getHostname();
+    Server metricServer = new Server(new QueuedThreadPool(4, 1));
+    ServerConnector c = new ServerConnector(metricServer);
+    c.setHost(hostname);
+    c.setPort(port);
+    metricServer.addConnector(c);
+    ContextHandlerCollection handlers = new ContextHandlerCollection();
+    metricServer.setHandler(handlers);
+    ContextHandler metricContext = new ContextHandler("/metrics");
+    metricContext.setHandler(new AbstractHandler() {
+      @Override
+      public void handle(String target, Request baseRequest, 
HttpServletRequest request,
+          HttpServletResponse response) throws IOException, ServletException {
+        baseRequest.setHandled(true);
+        response.setStatus(200);
+        response.setContentType(ContentType.APPLICATION_JSON.toString());
+        response.getWriter().print(metrics.toJson(GSON));
+      }
+    });
+    handlers.addHandler(metricContext);
+
+    ContextHandler detailsContext = new ContextHandler("/details");
+    detailsContext.setHandler(new AbstractHandler() {
+      @Override
+      public void handle(String target, Request baseRequest, 
HttpServletRequest request,
+          HttpServletResponse response) throws IOException, ServletException {
+        baseRequest.setHandled(true);
+        response.setStatus(200);
+        response.setContentType(ContentType.APPLICATION_JSON.toString());
+        response.getWriter().print(GSON.toJson(RUNNING));
+      }
+    });
+    handlers.addHandler(detailsContext);
+
+    metricServer.start();
+    LOG.info("Metrics HTTP server listening on {}:{}", hostname, port);
+    return metricServer;
+  }
+
   @Override
   public void run() {
 
@@ -232,6 +292,13 @@ public class CompactionCoordinator extends AbstractServer
     }
     final HostAndPort clientAddress = coordinatorAddress.address;
 
+    Server metricServer = null;
+    try {
+      metricServer = startHttpMetricServer();
+    } catch (Exception e1) {
+      throw new RuntimeException("Failed to start metric http server", e1);
+    }
+
     try {
       getCoordinatorLock(clientAddress);
     } catch (KeeperException | InterruptedException e) {
@@ -400,7 +467,15 @@ public class CompactionCoordinator extends AbstractServer
         UtilWaitThread.sleep(checkInterval - duration);
       }
     }
+
     LOG.info("Shutting down");
+    if (null != metricServer) {
+      try {
+        metricServer.stop();
+      } catch (Exception e) {
+        LOG.error("Error stopping metric server", e);
+      }
+    }
   }
 
   protected long getMissingCompactorWarningTime() {
@@ -484,6 +559,7 @@ public class CompactionCoordinator extends AbstractServer
         }
         RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()),
             new RunningCompaction(job, compactorAddress, tserver));
+        metrics.incrementStarted();
         LOG.debug("Returning external job {} to {}", job.externalCompactionId, 
compactorAddress);
         result = job;
         break;
@@ -632,6 +708,7 @@ public class CompactionCoordinator extends AbstractServer
       rc.setCompleted();
       compactionFinalizer.commitCompaction(ecid, 
KeyExtent.fromThrift(textent), stats.fileSize,
           stats.entriesWritten);
+      metrics.incrementCompleted();
     } else {
       LOG.error(
           "Compaction completed called by Compactor for {}, but no running 
compaction for that id.",
@@ -655,6 +732,7 @@ public class CompactionCoordinator extends AbstractServer
       // CBUG: Should we remove rc from RUNNING here and remove the 
isCompactionCompleted method?
       rc.setCompleted();
       compactionFinalizer.failCompactions(Map.of(ecid, 
KeyExtent.fromThrift(extent)));
+      metrics.incrementFailed();
     } else {
       LOG.error(
           "Compaction failed called by Compactor for {}, but no running 
compaction for that id.",
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/ExternalCompactionMetrics.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/ExternalCompactionMetrics.java
new file mode 100644
index 0000000..2343583
--- /dev/null
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/ExternalCompactionMetrics.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.coordinator;
+
+import com.google.gson.Gson;
+
+public class ExternalCompactionMetrics {
+
+  private long started = 0;
+  private long running = 0;
+  private long completed = 0;
+  private long failed = 0;
+
+  public long getStarted() {
+    return started;
+  }
+
+  public void setStarted(long started) {
+    this.started = started;
+  }
+
+  public void incrementStarted() {
+    this.started++;
+  }
+
+  public long getRunning() {
+    return running;
+  }
+
+  public void setRunning(long running) {
+    this.running = running;
+  }
+
+  public void incrementRunning() {
+    this.running++;
+  }
+
+  public long getCompleted() {
+    return completed;
+  }
+
+  public void setCompleted(long completed) {
+    this.completed = completed;
+  }
+
+  public void incrementCompleted() {
+    this.completed++;
+  }
+
+  public long getFailed() {
+    return failed;
+  }
+
+  public void setFailed(long failed) {
+    this.failed = failed;
+  }
+
+  public void incrementFailed() {
+    this.failed++;
+  }
+
+  public String toJson(Gson gson) {
+    return gson.toJson(this);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append("started: ").append(started);
+    buf.append("running: ").append(running);
+    buf.append("completed: ").append(completed);
+    buf.append("failed: ").append(failed);
+    return buf.toString();
+  }
+
+}
diff --git 
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
 
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
index e1ec07d..7e1d27c 100644
--- 
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
+++ 
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
@@ -61,6 +61,7 @@ import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.zookeeper.KeeperException;
 import org.easymock.EasyMock;
+import org.eclipse.jetty.server.Server;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
@@ -184,6 +185,11 @@ public class CompactionCoordinatorTest {
       };
     }
 
+    @Override
+    protected Server startHttpMetricServer() throws Exception {
+      return null;
+    }
+
   }
 
   @Test
diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 6f98923..74344b7 100644
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -68,6 +68,7 @@ import org.apache.accumulo.core.util.ServerServices;
 import org.apache.accumulo.core.util.ServerServices.Service;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.util.threads.Threads;
+import org.apache.accumulo.fate.util.UtilWaitThread;
 import org.apache.accumulo.fate.zookeeper.ServiceLock;
 import org.apache.accumulo.fate.zookeeper.ServiceLock.LockLossReason;
 import org.apache.accumulo.fate.zookeeper.ServiceLock.LockWatcher;
@@ -174,7 +175,7 @@ public class Compactor extends AbstractServer
         TimeUnit.MILLISECONDS);
   }
 
-  private void checkIfCanceled() {
+  protected void checkIfCanceled() {
     TExternalCompactionJob job = JOB_HOLDER.getJob();
     if (job != null) {
       try {
@@ -600,6 +601,10 @@ public class Compactor extends AbstractServer
     return supplier;
   }
 
+  protected long getWaitTimeBetweenCompactionChecks() {
+    return 3000;
+  }
+
   @Override
   public void run() {
 
@@ -631,6 +636,7 @@ public class Compactor extends AbstractServer
           job = getNextJob(getNextId());
           if (!job.isSetExternalCompactionId()) {
             LOG.info("No external compactions in queue {}", this.queueName);
+            UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks());
             continue;
           }
           if 
(!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) {
diff --git 
a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java 
b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
index 32566b1..a2b7a3c 100644
--- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
@@ -21,6 +21,11 @@ package org.apache.accumulo.test;
 import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.net.http.HttpResponse.BodyHandlers;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -33,6 +38,7 @@ import java.util.stream.Stream;
 import org.apache.accumulo.compactor.CompactionEnvironment.CompactorIterEnv;
 import org.apache.accumulo.compactor.Compactor;
 import org.apache.accumulo.coordinator.CompactionCoordinator;
+import org.apache.accumulo.coordinator.ExternalCompactionMetrics;
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -75,6 +81,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
 
 public class ExternalCompactionIT extends ConfigurableMacBase {
 
@@ -194,6 +201,56 @@ public class ExternalCompactionIT extends 
ConfigurableMacBase {
   }
 
   @Test
+  public void testUserCompactionCancellation() throws Exception {
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProperties()).build()) {
+
+      String table1 = "ectt6";
+      createTable(client, table1, "cs1");
+      writeData(client, table1);
+
+      // The ExternalDoNothingCompactor creates a compaction thread that
+      // sleeps for 5 minutes.
+      // Wait for the coordinator to insert the running compaction metadata
+      // entry into the metadata table, then cancel the compaction
+      cluster.exec(ExternalDoNothingCompactor.class, "-q", "DCQ1");
+      cluster.exec(CompactionCoordinator.class);
+
+      compact(client, table1, 2, "DCQ1", false);
+
+      List<TabletMetadata> md = new ArrayList<>();
+      TabletsMetadata tm = 
getCluster().getServerContext().getAmple().readTablets()
+          .forLevel(DataLevel.USER).fetch(ColumnType.ECOMP).build();
+      tm.forEach(t -> md.add(t));
+
+      while (md.size() == 0) {
+        tm.close();
+        tm = 
getCluster().getServerContext().getAmple().readTablets().forLevel(DataLevel.USER)
+            .fetch(ColumnType.ECOMP).build();
+        tm.forEach(t -> md.add(t));
+      }
+      client.tableOperations().cancelCompaction(table1);
+
+      // ExternalDoNothingCompactor runs the cancel checker every 5s and the 
compaction thread
+      // sleeps for 1s between checks to see if it's canceled or not.
+      UtilWaitThread.sleep(8000);
+
+      // The compaction was canceled by the user, so it is expected to be counted as failed
+      // rather than completed.
+      // Verify that the compaction failed by looking at the metrics in the Coordinator.
+      HttpRequest req =
+          HttpRequest.newBuilder().GET().uri(new URI("http://localhost:9099/metrics")).build();
+      HttpClient hc = HttpClient.newHttpClient();
+      HttpResponse<String> res = hc.send(req, BodyHandlers.ofString());
+      ExternalCompactionMetrics metrics =
+          new Gson().fromJson(res.body(), ExternalCompactionMetrics.class);
+      Assert.assertEquals(1, metrics.getStarted());
+      Assert.assertEquals(0, metrics.getRunning());
+      Assert.assertEquals(0, metrics.getCompleted());
+      Assert.assertEquals(1, metrics.getFailed());
+    }
+  }
+
+  @Test
   public void testDeleteTableDuringExternalCompaction() throws Exception {
     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProperties()).build()) {
 
@@ -221,14 +278,29 @@ public class ExternalCompactionIT extends 
ConfigurableMacBase {
 
       while (md.size() == 0) {
         tm.close();
-        md.clear();
         tm = 
getCluster().getServerContext().getAmple().readTablets().forLevel(DataLevel.USER)
             .fetch(ColumnType.ECOMP).build();
         tm.forEach(t -> md.add(t));
       }
       client.tableOperations().delete(table1);
-      // CBUG: How to verify? Metadata tablets are gone...
-      UtilWaitThread.sleep(1000); // to see the logs
+
+      // ExternalDoNothingCompactor runs the cancel checker every 5s and the 
compaction thread
+      // sleeps for 1s between checks to see if it's canceled or not.
+      UtilWaitThread.sleep(8000);
+
+      // The metadata tablets will be deleted from the metadata table because 
we have deleted the
+      // table
+      // Verify that the compaction failed by looking at the metrics in the 
Coordinator.
+      HttpRequest req =
+          HttpRequest.newBuilder().GET().uri(new URI("http://localhost:9099/metrics")).build();
+      HttpClient hc = HttpClient.newHttpClient();
+      HttpResponse<String> res = hc.send(req, BodyHandlers.ofString());
+      ExternalCompactionMetrics metrics =
+          new Gson().fromJson(res.body(), ExternalCompactionMetrics.class);
+      Assert.assertEquals(1, metrics.getStarted());
+      Assert.assertEquals(0, metrics.getRunning());
+      Assert.assertEquals(0, metrics.getCompleted());
+      Assert.assertEquals(1, metrics.getFailed());
     }
   }
 
diff --git 
a/test/src/main/java/org/apache/accumulo/test/ExternalDoNothingCompactor.java 
b/test/src/main/java/org/apache/accumulo/test/ExternalDoNothingCompactor.java
index b1441ae..9199c44 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/ExternalDoNothingCompactor.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/ExternalDoNothingCompactor.java
@@ -19,6 +19,8 @@
 package org.apache.accumulo.test;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.LongAdder;
 
@@ -40,6 +42,12 @@ public class ExternalDoNothingCompactor extends Compactor 
implements Iface {
   }
 
   @Override
+  protected void startCancelChecker(ScheduledThreadPoolExecutor schedExecutor,
+      long timeBetweenChecks) {
+    schedExecutor.scheduleWithFixedDelay(() -> checkIfCanceled(), 0, 5000, 
TimeUnit.MILLISECONDS);
+  }
+
+  @Override
   protected Runnable createCompactionJob(TExternalCompactionJob job, LongAdder 
totalInputEntries,
       LongAdder totalInputBytes, CountDownLatch started, CountDownLatch 
stopped,
       AtomicReference<Throwable> err) {
diff --git a/test/src/main/resources/log4j2-test.properties 
b/test/src/main/resources/log4j2-test.properties
index 35891b8..b61cdc1 100644
--- a/test/src/main/resources/log4j2-test.properties
+++ b/test/src/main/resources/log4j2-test.properties
@@ -143,6 +143,9 @@ logger.35.level = info
 logger.36.name = org.apache.thrift.transport.TIOStreamTransport
 logger.36.level = error
 
+logger.37.name = org.eclipse.jetty
+logger.37.level = warn
+
 rootLogger.level = debug
 rootLogger.appenderRef.console.ref = STDOUT
 

Reply via email to