ctubbsii commented on code in PR #2620:
URL: https://github.com/apache/accumulo/pull/2620#discussion_r853263712
##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -1014,30 +1014,32 @@ public void run() {
// ACCUMULO-4424 Put up the Thrift servers before getting the lock as a
sign of process health
// when a hot-standby
//
+ // Start the Manager's Fate Service
+ fateServiceHandler = new FateServiceHandler(this);
+ managerClientHandler = new ManagerClientServiceHandler(this);
// Start the Manager's Client service
- clientHandler = new ManagerClientServiceHandler(this);
// Ensure that calls before the manager gets the lock fail
- Iface haProxy = HighlyAvailableServiceWrapper.service(clientHandler, this);
- Iface rpcProxy = TraceUtil.wrapService(haProxy);
- final Processor<Iface> processor;
- if (context.getThriftServerType() == ThriftServerType.SASL) {
- Iface tcredsProxy = TCredentialsUpdatingWrapper.service(rpcProxy,
clientHandler.getClass(),
- getConfiguration());
- processor = new Processor<>(tcredsProxy);
- } else {
- processor = new Processor<>(rpcProxy);
- }
+ ManagerClientService.Iface haProxy =
+ HighlyAvailableServiceWrapper.service(managerClientHandler, this);
+
ServerAddress sa;
try {
- sa = TServerUtils.startServer(context, getHostname(),
Property.MANAGER_CLIENTPORT, processor,
- "Manager", "Manager Client Service Handler", null,
Property.MANAGER_MINTHREADS,
- Property.MANAGER_MINTHREADS_TIMEOUT, Property.MANAGER_THREADCHECK,
- Property.GENERAL_MAX_MESSAGE_SIZE);
- } catch (UnknownHostException e) {
- throw new IllegalStateException("Unable to start server on host " +
getHostname(), e);
+ TProcessor processor =
ThriftProcessorTypes.getManagerTProcessor(fateServiceHandler, haProxy,
+ getContext(), getConfiguration());
+
+ try {
+ sa = TServerUtils.startServer(context, getHostname(),
Property.MANAGER_CLIENTPORT,
+ processor, "Manager", "Manager Client Service Handler", null,
+ Property.MANAGER_MINTHREADS, Property.MANAGER_MINTHREADS_TIMEOUT,
+ Property.MANAGER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
+ } catch (UnknownHostException e) {
+ throw new IllegalStateException("Unable to start server on host " +
getHostname(), e);
+ }
+ clientService = sa.server;
+ log.info("Started Manager client service at {}", sa.address);
+ } catch (Exception e2) {
+ throw new RuntimeException("Error creating thrift server processor", e2);
Review Comment:
Can this catch something more specific?
##########
core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java:
##########
@@ -91,63 +92,67 @@ public static TTransportFactory transportFactory() {
/**
* Create a Thrift client using the given factory and transport
*/
- public static <T extends TServiceClient> T
createClient(TServiceClientFactory<T> factory,
+ public static <T extends TServiceClient> T
createClient(ThriftClientType<T,?> type,
TTransport transport) {
- return factory.getClient(protocolFactory.getProtocol(transport),
- protocolFactory.getProtocol(transport));
+
+ TProtocol protocol = protocolFactory.getProtocol(transport);
+ if (type.isMultiplexed()) {
Review Comment:
I think this can be simplified slightly if we just make everything
multiplexed (since it's basically no cost, and can help make rpc extensible in
future).
##########
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java:
##########
@@ -600,30 +604,33 @@ private void
returnManagerConnection(ManagerClientService.Client client) {
private HostAndPort startTabletClientService() throws UnknownHostException {
// start listening for client connection last
- clientHandler = getThriftClientHandler();
- Iface rpcProxy = TraceUtil.wrapService(clientHandler);
- final Processor<Iface> processor;
- if (getContext().getThriftServerType() == ThriftServerType.SASL) {
- Iface tcredProxy = TCredentialsUpdatingWrapper.service(rpcProxy,
ThriftClientHandler.class,
- getConfiguration());
- processor = new Processor<>(tcredProxy);
- } else {
- processor = new Processor<>(rpcProxy);
+ TransactionWatcher watcher = new TransactionWatcher(context);
+ clientHandler = newClientHandler(watcher);
+ thriftClientHandler = newTabletClientHandler(watcher);
+
+ try {
+ TProcessor processor =
ThriftProcessorTypes.getTabletServerTProcessor(clientHandler,
+ thriftClientHandler, getContext(), getConfiguration());
+ HostAndPort address = startServer(getConfiguration(),
clientAddress.getHost(), processor);
+ log.info("address = {}", address);
+ return address;
+ } catch (Exception e) {
+ throw new RuntimeException("Error creating thrift server processor", e);
Review Comment:
Can this be a more specific exception?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]