keith-turner commented on code in PR #2620: URL: https://github.com/apache/accumulo/pull/2620#discussion_r852474542
########## server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerTypes.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.rpc; + +import org.apache.accumulo.core.clientImpl.thrift.ClientService; +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService; +import org.apache.accumulo.core.compaction.thrift.CompactorService; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.gc.thrift.GCMonitorService; +import org.apache.accumulo.core.manager.thrift.FateService; +import org.apache.accumulo.core.manager.thrift.ManagerClientService; +import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator; +import org.apache.accumulo.core.replication.thrift.ReplicationServicer; +import org.apache.accumulo.core.rpc.ThriftClientTypes; +import org.apache.accumulo.core.rpc.ThriftClientTypes.ThriftClientType; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.client.ClientServiceHandler; +import 
org.apache.thrift.TBaseProcessor; +import org.apache.thrift.TMultiplexedProcessor; +import org.apache.thrift.TProcessor; +import org.apache.thrift.TServiceClient; +import org.apache.thrift.TServiceClientFactory; + +public class ThriftServerTypes { Review Comment: Given this class helps create TProcessor's seems like incorporating processor into the name rather than server would be more informative. Some alternatives for the class name : `ThriftProcessorTypes` , `AccumuloTProcessors`, `TProcessorsTypes`. ########## server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java: ########## @@ -582,13 +582,17 @@ private ManagerClientService.Client managerConnection(HostAndPort address) { return null; } // log.info("Listener API to manager has been opened"); - return ThriftUtil.getClient(new ManagerClientService.Client.Factory(), address, getContext()); + return ThriftUtil.getClient(ThriftClientTypes.MANAGER, address, getContext()); } catch (Exception e) { log.warn("Issue with managerConnection (" + address + ") " + e, e); } return null; } + protected ClientServiceHandler getClientHandler() { Review Comment: ```suggestion protected ClientServiceHandler newClientHandler() { ``` ########## server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerTypes.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.rpc; + +import org.apache.accumulo.core.clientImpl.thrift.ClientService; +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService; +import org.apache.accumulo.core.compaction.thrift.CompactorService; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.gc.thrift.GCMonitorService; +import org.apache.accumulo.core.manager.thrift.FateService; +import org.apache.accumulo.core.manager.thrift.ManagerClientService; +import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator; +import org.apache.accumulo.core.replication.thrift.ReplicationServicer; +import org.apache.accumulo.core.rpc.ThriftClientTypes; +import org.apache.accumulo.core.rpc.ThriftClientTypes.ThriftClientType; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.client.ClientServiceHandler; +import org.apache.thrift.TBaseProcessor; +import org.apache.thrift.TMultiplexedProcessor; +import org.apache.thrift.TProcessor; +import org.apache.thrift.TServiceClient; +import org.apache.thrift.TServiceClientFactory; + +public class ThriftServerTypes { + + private static class ServerType<C extends TServiceClient,F extends TServiceClientFactory<C>> + extends ThriftClientType<C,F> { + + public ServerType(ThriftClientType<C,F> type) { + super(type.getServiceName(), type.isMultiplexed(), type.getClientFactory()); + } + + private <I,H 
extends I,P extends TBaseProcessor<?>> TProcessor getServer( + Class<P> processorClass, Class<I> interfaceClass, H serviceHandler, ServerContext context, + AccumuloConfiguration conf) throws Exception { + I rpcProxy = TraceUtil.wrapService(serviceHandler); + if (context.getThriftServerType() == ThriftServerType.SASL) { + @SuppressWarnings("unchecked") + Class<H> clazz = (Class<H>) serviceHandler.getClass(); + rpcProxy = TCredentialsUpdatingWrapper.service(rpcProxy, clazz, conf); + } + return processorClass.getConstructor(interfaceClass).newInstance(rpcProxy); + } + } + + private static final ServerType<ClientService.Client,ClientService.Client.Factory> CLIENT = + new ServerType<>(ThriftClientTypes.CLIENT); + + private static final ServerType<CompactorService.Client, + CompactorService.Client.Factory> COMPACTOR = new ServerType<>(ThriftClientTypes.COMPACTOR); + + private static final ServerType<CompactionCoordinatorService.Client, + CompactionCoordinatorService.Client.Factory> COORDINATOR = + new ServerType<>(ThriftClientTypes.COORDINATOR); + + private static final ServerType<FateService.Client,FateService.Client.Factory> FATE = + new ServerType<>(ThriftClientTypes.FATE); + + private static final ServerType<GCMonitorService.Client,GCMonitorService.Client.Factory> GC = + new ServerType<>(ThriftClientTypes.GC); + + private static final ServerType<ManagerClientService.Client, + ManagerClientService.Client.Factory> MANAGER = new ServerType<>(ThriftClientTypes.MANAGER); + + private static final ServerType<ReplicationCoordinator.Client, + ReplicationCoordinator.Client.Factory> REPLICATION_COORDINATOR = + new ServerType<>(ThriftClientTypes.REPLICATION_COORDINATOR); + + private static final ServerType<ReplicationServicer.Client, + ReplicationServicer.Client.Factory> REPLICATION_SERVICER = + new ServerType<>(ThriftClientTypes.REPLICATION_SERVICER); + + private static final ServerType<TabletClientService.Client, + TabletClientService.Client.Factory> TABLET_SERVER = + new 
ServerType<>(ThriftClientTypes.TABLET_SERVER); + + public static TProcessor getCompactorThriftServer(CompactorService.Iface serviceHandler, + ServerContext context, AccumuloConfiguration conf) throws Exception { + return COMPACTOR.getServer(CompactorService.Processor.class, CompactorService.Iface.class, + serviceHandler, context, conf); + } + + public static TProcessor getCoordinatorThriftServer( Review Comment: These could also substitute server with processor in the name, maybe something like : ```suggestion public static TProcessor getCoordinatorTProcessor( ``` or if we are always using the class name like `AccumuloTProcessors.getCoordinator()`, could drop TProcessor from the method name because it's redundant. ```suggestion public static TProcessor getCoordinator( ``` ########## server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java: ########## @@ -162,17 +164,22 @@ import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; -public class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface { +public class ThriftClientHandler implements TabletClientService.Iface { Review Comment: The existing name for this class was not good, `TabletClientHandler` would be a better name. 
########## server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java: ########## @@ -600,30 +604,32 @@ private void returnManagerConnection(ManagerClientService.Client client) { private HostAndPort startTabletClientService() throws UnknownHostException { // start listening for client connection last - clientHandler = getThriftClientHandler(); - Iface rpcProxy = TraceUtil.wrapService(clientHandler); - final Processor<Iface> processor; - if (getContext().getThriftServerType() == ThriftServerType.SASL) { - Iface tcredProxy = TCredentialsUpdatingWrapper.service(rpcProxy, ThriftClientHandler.class, - getConfiguration()); - processor = new Processor<>(tcredProxy); - } else { Review Comment: Did this SASL processing move elsewhere? ########## server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java: ########## @@ -162,17 +164,22 @@ import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; -public class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface { +public class ThriftClientHandler implements TabletClientService.Iface { private static final Logger log = LoggerFactory.getLogger(ThriftClientHandler.class); private final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS; private static final long RECENTLY_SPLIT_MILLIES = MINUTES.toMillis(1); private final TabletServer server; + protected final TransactionWatcher transactionWatcher; + protected final ServerContext context; + protected final SecurityOperation security; private final WriteTracker writeTracker = new WriteTracker(); private final RowLocks rowLocks = new RowLocks(); public ThriftClientHandler(TabletServer server) { - super(server.getContext(), new TransactionWatcher(server.getContext())); + this.context = server.getContext(); + this.transactionWatcher = new TransactionWatcher(server.getContext()); Review Comment: Should not create a new TransactionWatcher here. 
The ClientHandler has an `isActive()` method that consults the transaction watcher, so both handlers need the same TransactionWatcher object for bulk import correctness. In another comment in TabletServer code I suggested a change to create the TransactionWatcher there and pass it to both handlers. ########## server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java: ########## @@ -600,30 +604,32 @@ private void returnManagerConnection(ManagerClientService.Client client) { private HostAndPort startTabletClientService() throws UnknownHostException { // start listening for client connection last - clientHandler = getThriftClientHandler(); - Iface rpcProxy = TraceUtil.wrapService(clientHandler); - final Processor<Iface> processor; - if (getContext().getThriftServerType() == ThriftServerType.SASL) { - Iface tcredProxy = TCredentialsUpdatingWrapper.service(rpcProxy, ThriftClientHandler.class, - getConfiguration()); - processor = new Processor<>(tcredProxy); - } else { - processor = new Processor<>(rpcProxy); + clientHandler = getClientHandler(); + thriftClientHandler = getThriftClientHandler(); Review Comment: The handlers need the same transaction watcher instance ```suggestion var transactionWatcher = new TransactionWatcher(context); clientHandler = getClientHandler(transactionWatcher ); thriftClientHandler = getThriftClientHandler(transactionWatcher ); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
