This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to 
refs/heads/1451-external-compactions-feature by this push:
     new d27649c  Moved metrics for coordinator to test coordinator class
d27649c is described below

commit d27649cb67f4fcf0c472ad5138ae0ac6ecef9e42
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Tue Apr 27 21:14:09 2021 +0000

    Moved metrics for coordinator to test coordinator class
---
 server/compaction-coordinator/pom.xml              |  17 --
 .../coordinator/CompactionCoordinator.java         |  78 ----------
 .../coordinator/CompactionCoordinatorTest.java     |   6 -
 test/pom.xml                                       |  13 ++
 .../apache/accumulo/test/ExternalCompactionIT.java |   8 +-
 .../accumulo/test/TestCompactionCoordinator.java   | 171 +++++++++++++++++++++
 6 files changed, 188 insertions(+), 105 deletions(-)

diff --git a/server/compaction-coordinator/pom.xml 
b/server/compaction-coordinator/pom.xml
index 0793491..93c12b4 100644
--- a/server/compaction-coordinator/pom.xml
+++ b/server/compaction-coordinator/pom.xml
@@ -36,10 +36,6 @@
       <optional>true</optional>
     </dependency>
     <dependency>
-      <groupId>com.google.code.gson</groupId>
-      <artifactId>gson</artifactId>
-    </dependency>
-    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
@@ -68,19 +64,6 @@
       <artifactId>zookeeper</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.eclipse.jetty</groupId>
-      <artifactId>jetty-server</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.eclipse.jetty</groupId>
-      <artifactId>jetty-util</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.eclipse.jetty.toolchain</groupId>
-      <artifactId>jetty-jakarta-servlet-api</artifactId>
-      <version>5.0.2</version>
-    </dependency>
-    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 1107e4d..b8b1fa3 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -20,7 +20,6 @@ package org.apache.accumulo.coordinator;
 
 import static 
org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
 
-import java.io.IOException;
 import java.net.UnknownHostException;
 import java.time.Duration;
 import java.util.List;
@@ -30,10 +29,6 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
-import jakarta.servlet.ServletException;
-import jakarta.servlet.http.HttpServletRequest;
-import jakarta.servlet.http.HttpServletResponse;
-
 import org.apache.accumulo.coordinator.QueueSummaries.PrioTserver;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -83,18 +78,9 @@ import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.zookeeper.KeeperException;
-import org.eclipse.jetty.server.Request;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ServerConnector;
-import org.eclipse.jetty.server.handler.AbstractHandler;
-import org.eclipse.jetty.server.handler.ContextHandler;
-import org.eclipse.jetty.server.handler.ContextHandlerCollection;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.gson.Gson;
-
 public class CompactionCoordinator extends AbstractServer
     implements 
org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Iface,
     LiveTServerSet.Listener {
@@ -107,8 +93,6 @@ public class CompactionCoordinator extends AbstractServer
 
   protected static final QueueSummaries QUEUE_SUMMARIES = new QueueSummaries();
 
-  private static final Gson GSON = new Gson();
-
   /* Map of compactionId to RunningCompactions */
   protected static final Map<ExternalCompactionId,RunningCompaction> RUNNING =
       new ConcurrentHashMap<>();
@@ -116,7 +100,6 @@ public class CompactionCoordinator extends AbstractServer
   /* Map of queue name to last time compactor called to get a compaction job */
   private static final Map<String,Long> TIME_COMPACTOR_LAST_CHECKED = new 
ConcurrentHashMap<>();
 
-  private final ExternalCompactionMetrics metrics = new 
ExternalCompactionMetrics();
   private final GarbageCollectionLogger gcLogger = new 
GarbageCollectionLogger();
   protected SecurityOperation security;
   protected final AccumuloConfiguration aconf;
@@ -238,50 +221,6 @@ public class CompactionCoordinator extends AbstractServer
     return sp;
   }
 
-  protected Server startHttpMetricServer() throws Exception {
-    int port = 
getContext().getConfiguration().getPortStream(Property.COORDINATOR_METRICPORT)
-        .iterator().next();
-    String hostname = getHostname();
-    Server metricServer = new Server(new QueuedThreadPool(4, 1));
-    ServerConnector c = new ServerConnector(metricServer);
-    c.setHost(hostname);
-    c.setPort(port);
-    metricServer.addConnector(c);
-    ContextHandlerCollection handlers = new ContextHandlerCollection();
-    metricServer.setHandler(handlers);
-    ContextHandler metricContext = new ContextHandler("/metrics");
-    metricContext.setHandler(new AbstractHandler() {
-      @Override
-      public void handle(String target, Request baseRequest, 
HttpServletRequest request,
-          HttpServletResponse response) throws IOException, ServletException {
-        baseRequest.setHandled(true);
-        response.setStatus(200);
-        response.setContentType("application/json");
-        metrics.setRunning(RUNNING.size());
-        LOG.debug("Returning metrics: {}", metrics);
-        response.getWriter().print(GSON.toJson(metrics));
-      }
-    });
-    handlers.addHandler(metricContext);
-
-    ContextHandler detailsContext = new ContextHandler("/details");
-    detailsContext.setHandler(new AbstractHandler() {
-      @Override
-      public void handle(String target, Request baseRequest, 
HttpServletRequest request,
-          HttpServletResponse response) throws IOException, ServletException {
-        baseRequest.setHandled(true);
-        response.setStatus(200);
-        response.setContentType("application/json");
-        response.getWriter().print(GSON.toJson(RUNNING));
-      }
-    });
-    handlers.addHandler(detailsContext);
-
-    metricServer.start();
-    LOG.info("Metrics HTTP server listening on {}:{}", hostname, port);
-    return metricServer;
-  }
-
   @Override
   public void run() {
 
@@ -293,13 +232,6 @@ public class CompactionCoordinator extends AbstractServer
     }
     final HostAndPort clientAddress = coordinatorAddress.address;
 
-    Server metricServer = null;
-    try {
-      metricServer = startHttpMetricServer();
-    } catch (Exception e1) {
-      throw new RuntimeException("Failed to start metric http server", e1);
-    }
-
     try {
       getCoordinatorLock(clientAddress);
     } catch (KeeperException | InterruptedException e) {
@@ -470,13 +402,6 @@ public class CompactionCoordinator extends AbstractServer
     }
 
     LOG.info("Shutting down");
-    if (null != metricServer) {
-      try {
-        metricServer.stop();
-      } catch (Exception e) {
-        LOG.error("Error stopping metric server", e);
-      }
-    }
   }
 
   protected long getMissingCompactorWarningTime() {
@@ -560,7 +485,6 @@ public class CompactionCoordinator extends AbstractServer
         }
         RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()),
             new RunningCompaction(job, compactorAddress, tserver));
-        metrics.incrementStarted();
         LOG.debug("Returning external job {} to {}", job.externalCompactionId, 
compactorAddress);
         result = job;
         break;
@@ -701,7 +625,6 @@ public class CompactionCoordinator extends AbstractServer
           SecurityErrorCode.PERMISSION_DENIED).asThriftException();
     }
     LOG.info("Compaction completed, id: {}, stats: {}", externalCompactionId, 
stats);
-    metrics.incrementCompleted();
     final var ecid = ExternalCompactionId.of(externalCompactionId);
     final RunningCompaction rc = RUNNING.get(ecid);
     if (null != rc) {
@@ -727,7 +650,6 @@ public class CompactionCoordinator extends AbstractServer
           SecurityErrorCode.PERMISSION_DENIED).asThriftException();
     }
     LOG.info("Compaction failed, id: {}", externalCompactionId);
-    metrics.incrementFailed();
     final var ecid = ExternalCompactionId.of(externalCompactionId);
     final RunningCompaction rc = RUNNING.get(ecid);
     if (null != rc) {
diff --git 
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
 
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
index 8f1bee3..2aa2b4f 100644
--- 
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
+++ 
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
@@ -60,7 +60,6 @@ import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.zookeeper.KeeperException;
 import org.easymock.EasyMock;
-import org.eclipse.jetty.server.Server;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
@@ -186,11 +185,6 @@ public class CompactionCoordinatorTest {
       };
     }
 
-    @Override
-    protected Server startHttpMetricServer() throws Exception {
-      return null;
-    }
-
   }
 
   @Test
diff --git a/test/pom.xml b/test/pom.xml
index 921425e..0a1c788 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -187,6 +187,19 @@
       <artifactId>easymock</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-server</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-util</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty.toolchain</groupId>
+      <artifactId>jetty-jakarta-servlet-api</artifactId>
+      <version>5.0.2</version>
+    </dependency>
+    <dependency>
       <groupId>org.jline</groupId>
       <artifactId>jline</artifactId>
     </dependency>
diff --git 
a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java 
b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
index efaf042..27898d9 100644
--- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
@@ -188,7 +188,7 @@ public class ExternalCompactionIT extends 
ConfigurableMacBase {
       writeData(client, table1);
 
       cluster.exec(ExternalDoNothingCompactor.class, "-q", "DCQ1");
-      cluster.exec(CompactionCoordinator.class);
+      cluster.exec(TestCompactionCoordinator.class);
       compact(client, table1, 2, "DCQ1", false);
 
       // Wait for the compaction to start by waiting for 1 external compaction 
column
@@ -260,7 +260,7 @@ public class ExternalCompactionIT extends 
ConfigurableMacBase {
       TableId tid = Tables.getTableId(getCluster().getServerContext(), table1);
 
       cluster.exec(ExternalDoNothingCompactor.class, "-q", "DCQ1");
-      cluster.exec(CompactionCoordinator.class);
+      cluster.exec(TestCompactionCoordinator.class);
 
       // Wait for the compaction to start by waiting for 1 external compaction 
column
       List<TabletMetadata> md = new ArrayList<>();
@@ -353,7 +353,7 @@ public class ExternalCompactionIT extends 
ConfigurableMacBase {
       // Wait for the coordinator to insert the running compaction metadata
       // entry into the metadata table, then cancel the compaction
       cluster.exec(ExternalDoNothingCompactor.class, "-q", "DCQ1");
-      cluster.exec(CompactionCoordinator.class);
+      cluster.exec(TestCompactionCoordinator.class);
 
       compact(client, table1, 2, "DCQ1", false);
 
@@ -404,7 +404,7 @@ public class ExternalCompactionIT extends 
ConfigurableMacBase {
       // Wait for the coordinator to insert the running compaction metadata
       // entry into the metadata table, then delete the table.
       cluster.exec(ExternalDoNothingCompactor.class, "-q", "DCQ1");
-      cluster.exec(CompactionCoordinator.class);
+      cluster.exec(TestCompactionCoordinator.class);
 
       List<TabletMetadata> md = new ArrayList<>();
       TabletsMetadata tm = 
getCluster().getServerContext().getAmple().readTablets()
diff --git 
a/test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinator.java 
b/test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinator.java
new file mode 100644
index 0000000..0d3e708
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinator.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+
+import jakarta.servlet.ServletException;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+
+import org.apache.accumulo.coordinator.CompactionCoordinator;
+import org.apache.accumulo.coordinator.ExternalCompactionMetrics;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
+import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
+import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
+import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
+import org.apache.accumulo.core.trace.thrift.TInfo;
+import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.rpc.ServerAddress;
+import org.apache.thrift.TException;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import org.eclipse.jetty.server.handler.ContextHandlerCollection;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+
+public class TestCompactionCoordinator extends CompactionCoordinator
+    implements 
org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Iface {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestCompactionCoordinator.class);
+  private static final Gson GSON = new Gson();
+
+  private final ExternalCompactionMetrics metrics = new 
ExternalCompactionMetrics();
+  private Server metricServer = null;
+
+  protected TestCompactionCoordinator(ServerOpts opts, String[] args) {
+    super(opts, args);
+  }
+
+  private Server startHttpMetricServer() throws Exception {
+    int port = 
getContext().getConfiguration().getPortStream(Property.COORDINATOR_METRICPORT)
+        .iterator().next();
+    String hostname = getHostname();
+    Server metricServer = new Server(new QueuedThreadPool(4, 1));
+    ServerConnector c = new ServerConnector(metricServer);
+    c.setHost(hostname);
+    c.setPort(port);
+    metricServer.addConnector(c);
+    ContextHandlerCollection handlers = new ContextHandlerCollection();
+    metricServer.setHandler(handlers);
+    ContextHandler metricContext = new ContextHandler("/metrics");
+    metricContext.setHandler(new AbstractHandler() {
+      @Override
+      public void handle(String target, Request baseRequest, 
HttpServletRequest request,
+          HttpServletResponse response) throws IOException, ServletException {
+        baseRequest.setHandled(true);
+        response.setStatus(200);
+        response.setContentType("application/json");
+        metrics.setRunning(RUNNING.size());
+        LOG.debug("Returning metrics: {}", metrics);
+        response.getWriter().print(GSON.toJson(metrics));
+      }
+    });
+    handlers.addHandler(metricContext);
+
+    ContextHandler detailsContext = new ContextHandler("/details");
+    detailsContext.setHandler(new AbstractHandler() {
+      @Override
+      public void handle(String target, Request baseRequest, 
HttpServletRequest request,
+          HttpServletResponse response) throws IOException, ServletException {
+        baseRequest.setHandled(true);
+        response.setStatus(200);
+        response.setContentType("application/json");
+        response.getWriter().print(GSON.toJson(RUNNING));
+      }
+    });
+    handlers.addHandler(detailsContext);
+
+    metricServer.start();
+    LOG.info("Metrics HTTP server listening on {}:{}", hostname, port);
+    return metricServer;
+  }
+
+  @Override
+  protected ServerAddress startCoordinatorClientService() throws 
UnknownHostException {
+    try {
+      return super.startCoordinatorClientService();
+    } finally {
+      try {
+        metricServer = startHttpMetricServer();
+      } catch (Exception e1) {
+        throw new RuntimeException("Failed to start metric http server", e1);
+      }
+    }
+  }
+
+  @Override
+  public TExternalCompactionJob getCompactionJob(TInfo tinfo, TCredentials 
credentials,
+      String queueName, String compactorAddress, String externalCompactionId) 
throws TException {
+    TExternalCompactionJob job = super.getCompactionJob(tinfo, credentials, 
queueName,
+        compactorAddress, externalCompactionId);
+    if (null != job && null != job.getExternalCompactionId()) {
+      metrics.incrementStarted();
+    }
+    return job;
+  }
+
+  @Override
+  public void compactionCompleted(TInfo tinfo, TCredentials credentials,
+      String externalCompactionId, TKeyExtent textent, TCompactionStats stats) 
throws TException {
+    try {
+      super.compactionCompleted(tinfo, credentials, externalCompactionId, 
textent, stats);
+    } finally {
+      metrics.incrementCompleted();
+    }
+  }
+
+  @Override
+  public void compactionFailed(TInfo tinfo, TCredentials credentials, String 
externalCompactionId,
+      TKeyExtent extent) throws TException {
+    try {
+      super.compactionFailed(tinfo, credentials, externalCompactionId, extent);
+    } finally {
+      metrics.incrementFailed();
+    }
+  }
+
+  @Override
+  public void close() {
+    super.close();
+    if (null != metricServer) {
+      try {
+        metricServer.stop();
+      } catch (Exception e) {
+        LOG.error("Error stopping metric server", e);
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    try (TestCompactionCoordinator compactor =
+        new TestCompactionCoordinator(new ServerOpts(), args)) {
+      compactor.runServer();
+    }
+  }
+
+}

Reply via email to