keith-turner commented on code in PR #4715:
URL: https://github.com/apache/accumulo/pull/4715#discussion_r1673275263


##########
server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java:
##########
@@ -429,11 +441,65 @@ public TNextCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials
       LOG.trace("No jobs found for group {}, returning empty job to compactor {}", groupName,
           compactorAddress);
       result = new TExternalCompactionJob();
+    } else {
+      LOG.info("Found job {}", result.externalCompactionId);
     }
 
     return new TNextCompactionJob(result, compactorCounts.get(groupName));
   }
 
+  protected CompletableFuture<TNextCompactionJob> getAsyncCompactionJob(TCredentials credentials,
+      String groupName, String compactorAddress, String externalCompactionId)
+      throws ThriftSecurityException {
+
+    // do not expect users to call this directly, expect compactors to call this method
+    if (!security.canPerformSystemActions(credentials)) {
+      throw new AccumuloSecurityException(credentials.getPrincipal(),
+          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
+    }
+    CompactorGroupId groupId = CompactorGroupId.of(groupName);
+    LOG.trace("getCompactionJob called for group {} by compactor {}", groupId, compactorAddress);
+    TIME_COMPACTOR_LAST_CHECKED.put(groupId, System.currentTimeMillis());
+
+    return jobQueues.getAsync(groupId).thenApply(metaJob -> {

Review Comment:
   We will eventually need to process these in a separate thread pool, rather
   than on whichever thread completes the future.
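
A minimal sketch of that idea, assuming a dedicated executor is available: `thenApplyAsync(fn, executor)` runs the mapping function on the supplied pool, whereas `thenApply` runs it on the thread that completes the future (or on the caller if the future is already complete). The class name, pool, and thread names below are hypothetical and are not part of the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AsyncJobHandoff {
  // Hypothetical stand-in for a pool that would process dequeued jobs.
  static final ExecutorService JOB_POOL = Executors.newFixedThreadPool(2, r -> {
    Thread t = new Thread(r, "job-processing");
    t.setDaemon(true); // do not keep the JVM alive for this sketch
    return t;
  });

  // Returns the name of the thread that ran the mapping function.
  static String processOnPool(CompletableFuture<String> queued) {
    return queued
        .thenApplyAsync(job -> Thread.currentThread().getName(), JOB_POOL)
        .join();
  }

  public static void main(String[] args) {
    CompletableFuture<String> queued = new CompletableFuture<>();
    // Simulate the job queue completing the future from another thread.
    new Thread(() -> queued.complete("ecid-1"), "queue-thread").start();
    // The mapping runs on JOB_POOL, not on "queue-thread".
    System.out.println(processOnPool(queued));
  }
}
```

With this shape, the thread that hands a job to the future returns immediately, and the per-job work is bounded by the size of the processing pool.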



##########
pom.xml:
##########
@@ -164,6 +164,13 @@
         <type>pom</type>
         <scope>import</scope>
       </dependency>
+      <dependency>
+        <groupId>io.grpc</groupId>
+        <artifactId>grpc-bom</artifactId>

Review Comment:
   If jars are needed at runtime for gRPC, the assembly pom may need to be
   modified to include them so that they show up in the tarball. Not sure
   what, if anything, needs to be done for this. Could be a follow-on issue.



##########
server/base/src/main/java/org/apache/accumulo/server/grpc/CompactionCoordinatorServiceServer.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.grpc;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.compaction.protobuf.CompactionCoordinatorServiceGrpc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.grpc.Grpc;
+import io.grpc.InsecureServerCredentials;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+
+/**
+ * Simple wrapper to start/stop the grpc server
+ */
+public class CompactionCoordinatorServiceServer {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(CompactionCoordinatorServiceServer.class);
+
+  private final int port;
+  private final Server server;
+
+  public CompactionCoordinatorServiceServer(
+      CompactionCoordinatorServiceGrpc.CompactionCoordinatorServiceImplBase service, int port)
+      throws IOException {
+    this(Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()), service, port);

Review Comment:
   Do you happen to know the threading model for processing incoming
   requests? It seems we could have two thread pools: one for processing
   incoming requests and another for async processing of responses. Not sure
   though; I need to look at the gRPC docs. For Thrift we currently have
   config for thread pool sizes, so I am wondering whether we will need
   config for this as well.
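
One way the configuration question could look, as a rough sketch only: two bounded pools whose sizes come from configuration, one for incoming requests and one for async response processing. The property names, defaults, and class name are hypothetical and do not exist in Accumulo or in this PR; a plain `Map` stands in for the real configuration object.

```java
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class GrpcPoolConfigSketch {
  // Hypothetical property names, used only for illustration.
  static final String REQUEST_THREADS = "example.grpc.request.threads";
  static final String RESPONSE_THREADS = "example.grpc.response.threads";

  // Reads a pool size from the config map, falling back to a default.
  static int poolSize(Map<String, String> conf, String key, int dflt) {
    int n = Integer.parseInt(conf.getOrDefault(key, Integer.toString(dflt)));
    if (n < 1) {
      throw new IllegalArgumentException(key + " must be >= 1, got " + n);
    }
    return n;
  }

  // Builds a fixed-size pool backed by an unbounded work queue.
  static ThreadPoolExecutor newPool(Map<String, String> conf, String key, int dflt) {
    int n = poolSize(conf, key, dflt);
    return new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>());
  }

  public static void main(String[] args) {
    Map<String, String> conf = Map.of(REQUEST_THREADS, "8");
    ThreadPoolExecutor requests = newPool(conf, REQUEST_THREADS, 4);
    ThreadPoolExecutor responses = newPool(conf, RESPONSE_THREADS, 4);
    System.out.println(requests.getCorePoolSize() + " request threads, "
        + responses.getCorePoolSize() + " response threads");
    requests.shutdown();
    responses.shutdown();
  }
}
```

In grpc-java, `ServerBuilder.executor(Executor)` accepts an application executor for running service callbacks, so a pool like the request pool above could be passed there. Whether that is the right fit for the coordinator would still need checking against the gRPC docs.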



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
