junkaixue commented on code in PR #2844: URL: https://github.com/apache/helix/pull/2844#discussion_r1688972784
########## helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java: ########## @@ -0,0 +1,122 @@ +package org.apache.helix.gateway.grpcservice; + +import io.grpc.stub.StreamObserver; +import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; + +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.helix.gateway.service.GatewayServiceEvent; +import org.apache.helix.gateway.service.GatewayServiceManager; +import org.apache.helix.gateway.service.HelixGatewayServiceProcessor; +import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil; +import proto.org.apache.helix.gateway.*; +import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.*; + +import java.util.Map; + + +/** + * Helix Gateway Service GRPC UI implementation. + */ +public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase + implements HelixGatewayServiceProcessor { + + // Map to store the observer for each instance + Map<String, StreamObserver<TransitionMessage>> _observerMap = new HashMap<>(); + // A reverse map to store the instance name for each observer. It is used to find the instance when connection is closed. 
+ Map<StreamObserver<TransitionMessage>, Pair<String, String>> _reversedObserverMap = new HashMap<>(); + + ReadWriteLock _observerMapLock; + GatewayServiceManager _manager; + + public HelixGatewayServiceGrpcService(GatewayServiceManager manager) { + // register manager + _manager = manager; + _observerMapLock = new ReentrantReadWriteLock(); + } + + @Override + public StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage> report( + StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage> responseObserver) { + + return new StreamObserver<ShardStateMessage>() { + + @Override + public void onNext(ShardStateMessage request) { + if (request.hasShardState()) { + ShardState shardState = request.getShardState(); + // Process shardState + updateObserver(shardState.getInstanceName(), shardState.getClusterName(), responseObserver); + } + _manager.newGatewayServiceEvent(StateTransitionMessageTranslateUtil.translateShardStateMessageToEvent(request)); + } + + @Override + public void onError(Throwable t) { + onClientClose(responseObserver); + } + + @Override + public void onCompleted() { + // called when the client completes + onClientClose(responseObserver); + } + }; + } + + @Override + public boolean sendStateTransitionMessage(String instanceName) { + StreamObserver<TransitionMessage> observer; + observer = _observerMap.get(instanceName); + _observerMapLock.readLock().lock(); Review Comment: use fine-grained lock for each entry. ########## helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java: ########## @@ -0,0 +1,77 @@ +package org.apache.helix.gateway.service; + +import java.util.List; +import java.util.Map; +import org.apache.helix.gateway.constant.GatewayServiceEventType; +import org.apache.helix.gateway.constant.MessageStatus; + + +public class GatewayServiceEvent { Review Comment: white space. 
########## helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java: ########## @@ -16,50 +19,85 @@ */ public class GatewayServiceManager { - - HelixGatewayServiceService _helixGatewayServiceService; - - HelixGatewayServiceProcessor _helixGatewayServiceProcessor; - Map<String, HelixGatewayService> _helixGatewayServiceMap; - // TODO: add thread pool for init - // single thread tp for update + // a single thread tp for event processing + ExecutorService _participantStateTransitionResultUpdator; - public enum EventType { - CONNECT, // init connection to gateway service - UPDATE, // update state transition result - DISCONNECT // shutdown connection to gateway service. - } + // a thread pool for participant connecting + ExecutorService _participantConnectingTp; - public class GateWayServiceEvent { - // event type - EventType eventType; - // event data - String clusterName; - String participantName; - - // todo: add more fields - } + // grpc service + HelixGatewayServiceGrpcService _grpcService; public GatewayServiceManager() { _helixGatewayServiceMap = new ConcurrentHashMap<>(); + _participantStateTransitionResultUpdator = Executors.newSingleThreadExecutor(); + _participantConnectingTp = Executors.newFixedThreadPool(10); // todo: make it configurable Review Comment: Define a named constant for the thread pool size instead of the hard-coded value; use a DEFAULT value for now.
########## helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java: ########## @@ -0,0 +1,122 @@ +package org.apache.helix.gateway.grpcservice; + +import io.grpc.stub.StreamObserver; +import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; + +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.helix.gateway.service.GatewayServiceEvent; +import org.apache.helix.gateway.service.GatewayServiceManager; +import org.apache.helix.gateway.service.HelixGatewayServiceProcessor; +import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil; +import proto.org.apache.helix.gateway.*; +import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.*; + +import java.util.Map; + + +/** + * Helix Gateway Service GRPC UI implementation. + */ +public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase + implements HelixGatewayServiceProcessor { + + // Map to store the observer for each instance + Map<String, StreamObserver<TransitionMessage>> _observerMap = new HashMap<>(); + // A reverse map to store the instance name for each observer. It is used to find the instance when connection is closed. 
+ Map<StreamObserver<TransitionMessage>, Pair<String, String>> _reversedObserverMap = new HashMap<>(); + + ReadWriteLock _observerMapLock; + GatewayServiceManager _manager; + + public HelixGatewayServiceGrpcService(GatewayServiceManager manager) { + // register manager + _manager = manager; + _observerMapLock = new ReentrantReadWriteLock(); + } + + @Override + public StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage> report( + StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage> responseObserver) { + + return new StreamObserver<ShardStateMessage>() { + + @Override + public void onNext(ShardStateMessage request) { + if (request.hasShardState()) { + ShardState shardState = request.getShardState(); + // Process shardState + updateObserver(shardState.getInstanceName(), shardState.getClusterName(), responseObserver); + } + _manager.newGatewayServiceEvent(StateTransitionMessageTranslateUtil.translateShardStateMessageToEvent(request)); + } + + @Override + public void onError(Throwable t) { + onClientClose(responseObserver); + } + + @Override + public void onCompleted() { + // called when the client completes + onClientClose(responseObserver); + } + }; + } + + @Override + public boolean sendStateTransitionMessage(String instanceName) { + StreamObserver<TransitionMessage> observer; + observer = _observerMap.get(instanceName); + _observerMapLock.readLock().lock(); Review Comment: Also, there is no need for a read-write lock here. A read-write lock is meant to guarantee that data is read after it has been written, which does not apply in this case.
########## helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java: ########## @@ -0,0 +1,77 @@ +package org.apache.helix.gateway.service; + +import java.util.List; +import java.util.Map; +import org.apache.helix.gateway.constant.GatewayServiceEventType; +import org.apache.helix.gateway.constant.MessageStatus; + + +public class GatewayServiceEvent { + // event type + GatewayServiceEventType _eventType; + // event data + String _clusterName; + String _instanceName; + Map<String, Map<String, String>> _shardStateMap; Review Comment: Make it private. ########## helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java: ########## @@ -16,50 +19,85 @@ */ public class GatewayServiceManager { - - HelixGatewayServiceService _helixGatewayServiceService; - - HelixGatewayServiceProcessor _helixGatewayServiceProcessor; - Map<String, HelixGatewayService> _helixGatewayServiceMap; - // TODO: add thread pool for init - // single thread tp for update + // a single thread tp for event processing + ExecutorService _participantStateTransitionResultUpdator; - public enum EventType { - CONNECT, // init connection to gateway service - UPDATE, // update state transition result - DISCONNECT // shutdown connection to gateway service. 
- } + // a thread pool for participant connecting + ExecutorService _participantConnectingTp; - public class GateWayServiceEvent { - // event type - EventType eventType; - // event data - String clusterName; - String participantName; - - // todo: add more fields - } + // grpc service + HelixGatewayServiceGrpcService _grpcService; public GatewayServiceManager() { _helixGatewayServiceMap = new ConcurrentHashMap<>(); + _participantStateTransitionResultUpdator = Executors.newSingleThreadExecutor(); + _participantConnectingTp = Executors.newFixedThreadPool(10); // todo: make it configurable + _grpcService = new HelixGatewayServiceGrpcService(this); } public AtomicBoolean sendTransitionRequestToApplicationInstance() { return null; } - public void updateShardState() { - + public void newGatewayServiceEvent(GatewayServiceEvent event) { + if (event.getEventType().equals(GatewayServiceEventType.UPDATE)) { + _participantStateTransitionResultUpdator.submit(new shardStateUpdator(event)); + } else { + _participantConnectingTp.submit(new participantConnectionProcessor(event)); + } } - public void newParticipantConnecting() { - + /** + * Update shard state + */ + class shardStateUpdator implements Runnable { + + GatewayServiceEvent _event; + + public shardStateUpdator(GatewayServiceEvent event) { + _event = event; + } + + @Override + public void run() { + HelixGatewayService helixGatewayService = _helixGatewayServiceMap.get(_event._clusterName); + if (helixGatewayService == null) { + // TODO: return error code and throw exception. + return; + } + helixGatewayService.receiveSTResponse(); + } } - public void participantDisconnected() { - + /** + * Create HelixGatewayService instance and register it to the manager. + * It includes waiting for ZK connection, and also wait for previous Liveinstance to expire. 
+ */ + class participantConnectionProcessor implements Runnable { + GatewayServiceEvent _event; + + public participantConnectionProcessor(GatewayServiceEvent event) { + _event = event; + } + + @Override + public void run() { + HelixGatewayService helixGatewayService; + if (_event.getEventType().equals(GatewayServiceEventType.CONNECT)) { + if (_helixGatewayServiceMap.containsKey(_event._clusterName)) { + helixGatewayService = _helixGatewayServiceMap.get(_event._clusterName); + } else { + helixGatewayService = new HelixGatewayService(GatewayServiceManager.this, _event._clusterName); + _helixGatewayServiceMap.put(_event._clusterName, helixGatewayService); + } + helixGatewayService.registerParticipant(); + } else { // disconnected + helixGatewayService = _helixGatewayServiceMap.get(_event._clusterName); Review Comment: NPE? ########## helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java: ########## @@ -16,50 +19,85 @@ */ public class GatewayServiceManager { - - HelixGatewayServiceService _helixGatewayServiceService; - - HelixGatewayServiceProcessor _helixGatewayServiceProcessor; - Map<String, HelixGatewayService> _helixGatewayServiceMap; - // TODO: add thread pool for init - // single thread tp for update + // a single thread tp for event processing + ExecutorService _participantStateTransitionResultUpdator; - public enum EventType { - CONNECT, // init connection to gateway service - UPDATE, // update state transition result - DISCONNECT // shutdown connection to gateway service. 
- } + // a thread pool for participant connecting + ExecutorService _participantConnectingTp; - public class GateWayServiceEvent { - // event type - EventType eventType; - // event data - String clusterName; - String participantName; - - // todo: add more fields - } + // grpc service + HelixGatewayServiceGrpcService _grpcService; public GatewayServiceManager() { _helixGatewayServiceMap = new ConcurrentHashMap<>(); + _participantStateTransitionResultUpdator = Executors.newSingleThreadExecutor(); + _participantConnectingTp = Executors.newFixedThreadPool(10); // todo: make it configurable + _grpcService = new HelixGatewayServiceGrpcService(this); } public AtomicBoolean sendTransitionRequestToApplicationInstance() { return null; } - public void updateShardState() { - + public void newGatewayServiceEvent(GatewayServiceEvent event) { + if (event.getEventType().equals(GatewayServiceEventType.UPDATE)) { + _participantStateTransitionResultUpdator.submit(new shardStateUpdator(event)); + } else { + _participantConnectingTp.submit(new participantConnectionProcessor(event)); + } } - public void newParticipantConnecting() { - + /** + * Update shard state + */ + class shardStateUpdator implements Runnable { + + GatewayServiceEvent _event; + + public shardStateUpdator(GatewayServiceEvent event) { + _event = event; + } + + @Override + public void run() { + HelixGatewayService helixGatewayService = _helixGatewayServiceMap.get(_event._clusterName); + if (helixGatewayService == null) { + // TODO: return error code and throw exception. + return; + } + helixGatewayService.receiveSTResponse(); + } } - public void participantDisconnected() { - + /** + * Create HelixGatewayService instance and register it to the manager. + * It includes waiting for ZK connection, and also wait for previous Liveinstance to expire. 
+ */ + class participantConnectionProcessor implements Runnable { + GatewayServiceEvent _event; + + public participantConnectionProcessor(GatewayServiceEvent event) { + _event = event; + } + + @Override + public void run() { + HelixGatewayService helixGatewayService; + if (_event.getEventType().equals(GatewayServiceEventType.CONNECT)) { + if (_helixGatewayServiceMap.containsKey(_event._clusterName)) { + helixGatewayService = _helixGatewayServiceMap.get(_event._clusterName); + } else { + helixGatewayService = new HelixGatewayService(GatewayServiceManager.this, _event._clusterName); + _helixGatewayServiceMap.put(_event._clusterName, helixGatewayService); Review Comment: if (! contains) { new object } do xxxxxx -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For additional commands, e-mail: reviews-h...@helix.apache.org