xyuanlu commented on code in PR #2844: URL: https://github.com/apache/helix/pull/2844#discussion_r1688780575
########## helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java: ########## @@ -26,40 +29,85 @@ public class GatewayServiceManager { // TODO: add thread pool for init // single thread tp for update - public enum EventType { - CONNECT, // init connection to gateway service - UPDATE, // update state transition result - DISCONNECT // shutdown connection to gateway service. - } + // Map of cluster to HelixGatewayService - public class GateWayServiceEvent { - // event type - EventType eventType; - // event data - String clusterName; - String participantName; + // a single thread tp for event processing + ExecutorService _participantStateTransitionResultUpdator; - // todo: add more fields - } + // a thread pool for participant connecting + ExecutorService _participantConnectingTp; + + // grpc service + HelixGatewayServiceGrpcService _grpcService; public GatewayServiceManager() { _helixGatewayServiceMap = new ConcurrentHashMap<>(); + _participantStateTransitionResultUpdator = Executors.newSingleThreadExecutor(); + _participantConnectingTp = Executors.newFixedThreadPool(10); // todo: make it configurable + _grpcService = new HelixGatewayServiceGrpcService(this); } public AtomicBoolean sendTransitionRequestToApplicationInstance() { return null; } - public void updateShardState() { - + public void newGatewayServiceEvent(GatewayServiceEvent event) { + if (event.getEventType().equals(GatewayServiceEventType.UPDATE)) { + _participantStateTransitionResultUpdator.submit(new shardStateUpdator(event)); + } else { + _participantConnectingTp.submit(new participantConnectionProcessor(event)); + } } - public void newParticipantConnecting() { - + /** + * Update shard state + */ + class shardStateUpdator implements Runnable { + + GatewayServiceEvent _event; + + public shardStateUpdator(GatewayServiceEvent event) { + _event = event; + } + + @Override + public void run() { + HelixGatewayService helixGatewayService = _helixGatewayServiceMap.get(_event._clusterName); + if (helixGatewayService == null) { + // TODO: return error code and throw exception. + return; + } + helixGatewayService.receiveSTResponse(); + } } - public void participantDisconnected() { - + /** + * Create HelixGatewayService instance and register it to the manager. + * It includes waiting for ZK connection, and also wait for previous Liveinstance to expire. + */ + class participantConnectionProcessor implements Runnable { + GatewayServiceEvent _event; + + public participantConnectionProcessor(GatewayServiceEvent event) { + _event = event; + } + + @Override + public void run() { + HelixGatewayService helixGatewayService; + if (_event.getEventType().equals(GatewayServiceEventType.CONNECT)) { Review Comment: This is only for init connection and disconnect. update if handled in shardStateUpdator -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For additional commands, e-mail: reviews-h...@helix.apache.org