keith-turner commented on code in PR #2620:
URL: https://github.com/apache/accumulo/pull/2620#discussion_r852474542


##########
server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerTypes.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import org.apache.accumulo.core.clientImpl.thrift.ClientService;
+import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
+import org.apache.accumulo.core.compaction.thrift.CompactorService;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.gc.thrift.GCMonitorService;
+import org.apache.accumulo.core.manager.thrift.FateService;
+import org.apache.accumulo.core.manager.thrift.ManagerClientService;
+import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
+import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
+import org.apache.accumulo.core.rpc.ThriftClientTypes;
+import org.apache.accumulo.core.rpc.ThriftClientTypes.ThriftClientType;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.client.ClientServiceHandler;
+import org.apache.thrift.TBaseProcessor;
+import org.apache.thrift.TMultiplexedProcessor;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TServiceClient;
+import org.apache.thrift.TServiceClientFactory;
+
+public class ThriftServerTypes {

Review Comment:
   Given this class helps create TProcessor's seems like incorporating 
processor into the name rather than server would be more informative.  Some 
alternatives for the class name : `ThriftProcessorTypes` , 
`AccumuloTProcessors`, `TProcessorsTypes`.
   



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java:
##########
@@ -582,13 +582,17 @@ private ManagerClientService.Client 
managerConnection(HostAndPort address) {
         return null;
       }
       // log.info("Listener API to manager has been opened");
-      return ThriftUtil.getClient(new ManagerClientService.Client.Factory(), 
address, getContext());
+      return ThriftUtil.getClient(ThriftClientTypes.MANAGER, address, 
getContext());
     } catch (Exception e) {
       log.warn("Issue with managerConnection (" + address + ") " + e, e);
     }
     return null;
   }
 
+  protected ClientServiceHandler getClientHandler() {

Review Comment:
   ```suggestion
     protected ClientServiceHandler newClientHandler() {
   ```



##########
server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerTypes.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import org.apache.accumulo.core.clientImpl.thrift.ClientService;
+import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
+import org.apache.accumulo.core.compaction.thrift.CompactorService;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.gc.thrift.GCMonitorService;
+import org.apache.accumulo.core.manager.thrift.FateService;
+import org.apache.accumulo.core.manager.thrift.ManagerClientService;
+import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
+import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
+import org.apache.accumulo.core.rpc.ThriftClientTypes;
+import org.apache.accumulo.core.rpc.ThriftClientTypes.ThriftClientType;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.client.ClientServiceHandler;
+import org.apache.thrift.TBaseProcessor;
+import org.apache.thrift.TMultiplexedProcessor;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TServiceClient;
+import org.apache.thrift.TServiceClientFactory;
+
+public class ThriftServerTypes {
+
+  private static class ServerType<C extends TServiceClient,F extends 
TServiceClientFactory<C>>
+      extends ThriftClientType<C,F> {
+
+    public ServerType(ThriftClientType<C,F> type) {
+      super(type.getServiceName(), type.isMultiplexed(), 
type.getClientFactory());
+    }
+
+    private <I,H extends I,P extends TBaseProcessor<?>> TProcessor getServer(
+        Class<P> processorClass, Class<I> interfaceClass, H serviceHandler, 
ServerContext context,
+        AccumuloConfiguration conf) throws Exception {
+      I rpcProxy = TraceUtil.wrapService(serviceHandler);
+      if (context.getThriftServerType() == ThriftServerType.SASL) {
+        @SuppressWarnings("unchecked")
+        Class<H> clazz = (Class<H>) serviceHandler.getClass();
+        rpcProxy = TCredentialsUpdatingWrapper.service(rpcProxy, clazz, conf);
+      }
+      return 
processorClass.getConstructor(interfaceClass).newInstance(rpcProxy);
+    }
+  }
+
+  private static final 
ServerType<ClientService.Client,ClientService.Client.Factory> CLIENT =
+      new ServerType<>(ThriftClientTypes.CLIENT);
+
+  private static final ServerType<CompactorService.Client,
+      CompactorService.Client.Factory> COMPACTOR = new 
ServerType<>(ThriftClientTypes.COMPACTOR);
+
+  private static final ServerType<CompactionCoordinatorService.Client,
+      CompactionCoordinatorService.Client.Factory> COORDINATOR =
+          new ServerType<>(ThriftClientTypes.COORDINATOR);
+
+  private static final 
ServerType<FateService.Client,FateService.Client.Factory> FATE =
+      new ServerType<>(ThriftClientTypes.FATE);
+
+  private static final 
ServerType<GCMonitorService.Client,GCMonitorService.Client.Factory> GC =
+      new ServerType<>(ThriftClientTypes.GC);
+
+  private static final ServerType<ManagerClientService.Client,
+      ManagerClientService.Client.Factory> MANAGER = new 
ServerType<>(ThriftClientTypes.MANAGER);
+
+  private static final ServerType<ReplicationCoordinator.Client,
+      ReplicationCoordinator.Client.Factory> REPLICATION_COORDINATOR =
+          new ServerType<>(ThriftClientTypes.REPLICATION_COORDINATOR);
+
+  private static final ServerType<ReplicationServicer.Client,
+      ReplicationServicer.Client.Factory> REPLICATION_SERVICER =
+          new ServerType<>(ThriftClientTypes.REPLICATION_SERVICER);
+
+  private static final ServerType<TabletClientService.Client,
+      TabletClientService.Client.Factory> TABLET_SERVER =
+          new ServerType<>(ThriftClientTypes.TABLET_SERVER);
+
+  public static TProcessor getCompactorThriftServer(CompactorService.Iface 
serviceHandler,
+      ServerContext context, AccumuloConfiguration conf) throws Exception {
+    return COMPACTOR.getServer(CompactorService.Processor.class, 
CompactorService.Iface.class,
+        serviceHandler, context, conf);
+  }
+
+  public static TProcessor getCoordinatorThriftServer(

Review Comment:
   These could also substitute server with processor in the name, maybe 
something like :
   
   ```suggestion
     public static TProcessor getCoordinatorTProcessor(
   ```
   
   or if we are always using the class name like 
`AccumuloTProcessors.getCoordinator()`, could drop TProcessor from the method 
name because its redundant.
   
   ```suggestion
     public static TProcessor getCoordinator(
   ```



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java:
##########
@@ -162,17 +164,22 @@
 import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.context.Scope;
 
-public class ThriftClientHandler extends ClientServiceHandler implements 
TabletClientService.Iface {
+public class ThriftClientHandler implements TabletClientService.Iface {

Review Comment:
   The existing name for this class was not good, `TabletClientHandler` would 
be a better name.



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java:
##########
@@ -600,30 +604,32 @@ private void 
returnManagerConnection(ManagerClientService.Client client) {
 
   private HostAndPort startTabletClientService() throws UnknownHostException {
     // start listening for client connection last
-    clientHandler = getThriftClientHandler();
-    Iface rpcProxy = TraceUtil.wrapService(clientHandler);
-    final Processor<Iface> processor;
-    if (getContext().getThriftServerType() == ThriftServerType.SASL) {
-      Iface tcredProxy = TCredentialsUpdatingWrapper.service(rpcProxy, 
ThriftClientHandler.class,
-          getConfiguration());
-      processor = new Processor<>(tcredProxy);
-    } else {

Review Comment:
   Did this SASL processing move elsewhere?



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java:
##########
@@ -162,17 +164,22 @@
 import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.context.Scope;
 
-public class ThriftClientHandler extends ClientServiceHandler implements 
TabletClientService.Iface {
+public class ThriftClientHandler implements TabletClientService.Iface {
 
   private static final Logger log = 
LoggerFactory.getLogger(ThriftClientHandler.class);
   private final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS;
   private static final long RECENTLY_SPLIT_MILLIES = MINUTES.toMillis(1);
   private final TabletServer server;
+  protected final TransactionWatcher transactionWatcher;
+  protected final ServerContext context;
+  protected final SecurityOperation security;
   private final WriteTracker writeTracker = new WriteTracker();
   private final RowLocks rowLocks = new RowLocks();
 
   public ThriftClientHandler(TabletServer server) {
-    super(server.getContext(), new TransactionWatcher(server.getContext()));
+    this.context = server.getContext();
+    this.transactionWatcher = new TransactionWatcher(server.getContext());

Review Comment:
   Should not create a new TransactionWatcher here.  The ClientHandler has an 
`isActive()` method that consults the transaction watcher, so both handlers 
needs the same TransactionWatcher object for bulk import correctness.  In 
another comment in TabletServer code I suggested a change to create the 
TransactionWatcher there and pass it to both handlers.



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java:
##########
@@ -600,30 +604,32 @@ private void 
returnManagerConnection(ManagerClientService.Client client) {
 
   private HostAndPort startTabletClientService() throws UnknownHostException {
     // start listening for client connection last
-    clientHandler = getThriftClientHandler();
-    Iface rpcProxy = TraceUtil.wrapService(clientHandler);
-    final Processor<Iface> processor;
-    if (getContext().getThriftServerType() == ThriftServerType.SASL) {
-      Iface tcredProxy = TCredentialsUpdatingWrapper.service(rpcProxy, 
ThriftClientHandler.class,
-          getConfiguration());
-      processor = new Processor<>(tcredProxy);
-    } else {
-      processor = new Processor<>(rpcProxy);
+    clientHandler = getClientHandler();
+    thriftClientHandler = getThriftClientHandler();

Review Comment:
   The handlers need the same transaction watcher instance
   
   ```suggestion
       var transactionWatcher = new TransactionWatcher(context);
       clientHandler = getClientHandler(transactionWatcher );
       thriftClientHandler = getThriftClientHandler(transactionWatcher );
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to