junkaixue commented on code in PR #2844: URL: https://github.com/apache/helix/pull/2844#discussion_r1688396795
########## helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java: ########## @@ -11,7 +15,24 @@ public static TransitionMessage translateSTMsgToProto() { return null; } - public static GatewayServiceManager.GateWayServiceEvent translateProtoToSTMsg(ShardStateMessage message) { - return null; + public static GatewayServiceEvent translateProtoToEvent(ShardStateMessage request) { + String instanceName = request.getInstanceName(); + String clusterName = request.getClusterName(); + + Map<String, String> shardStateMap = new HashMap<>(); + for (HelixGatewayServiceOuterClass.SingleShardState shardTransitionStatus : request.getShardStateList()) { + shardStateMap.put(shardTransitionStatus.getShardName(), shardTransitionStatus.getCurrentState()); + } + GatewayServiceEvent.GateWayServiceEventBuilder builder = + new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.CONNECT).setClusterName(clusterName); Review Comment: Why is this CONNECT and not UPDATE? ########## helix-gateway/src/main/java/org/apache/helix/gateway/constant/ShardState.java: ########## @@ -0,0 +1,5 @@ +package org.apache.helix.gateway.constant; + +public enum ShardState { Review Comment: Why not use the one from the state model? What about the error state? ########## helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java: ########## @@ -11,7 +15,24 @@ public static TransitionMessage translateSTMsgToProto() { return null; } - public static GatewayServiceManager.GateWayServiceEvent translateProtoToSTMsg(ShardStateMessage message) { - return null; + public static GatewayServiceEvent translateProtoToEvent(ShardStateMessage request) { Review Comment: I don't get this util at all... If you would like to differentiate the event type, you can just make one full util and selectively pass in null values for the arguments that are not necessary. 
########## helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java: ########## @@ -0,0 +1,95 @@ +package org.apache.helix.gateway.service; + +import java.util.Map; +import org.apache.helix.gateway.constant.GatewayServiceEventType; + + +public class GatewayServiceEvent { + // event type + GatewayServiceEventType _eventType; + // event data + String _clusterName; + String _instanceName; + Map<String, Map<String, String>> _shardStateMap; + + StateTransitionResult _StateTransitionResult; + + public class StateTransitionResult { + String stateTransitionId; + String stateTransitionStatus; + String shardState; + + public StateTransitionResult(String stateTransitionId, String stateTransitionStatus, String shardState) { + this.stateTransitionId = stateTransitionId; + this.stateTransitionStatus = stateTransitionStatus; + this.shardState = shardState; + } + } + + private GatewayServiceEvent(GatewayServiceEventType eventType, String clusterName, String instanceName) { + this._eventType = eventType; + this._clusterName = clusterName; + this._instanceName = instanceName; + } + + private GatewayServiceEvent(GatewayServiceEventType eventType, String clusterName, String instanceName, Map<String, Map<String, String>> shardStateMap) { + this._eventType = eventType; + this._clusterName = clusterName; + this._instanceName = instanceName; + this._shardStateMap = shardStateMap; + } + + private GatewayServiceEvent(GatewayServiceEventType eventType, String clusterName, String instanceName, String stateTransitionId, + String stateTransitionStatus, String shardState) { + this._eventType = eventType; + this._clusterName = clusterName; + this._instanceName = instanceName; + this._StateTransitionResult = new StateTransitionResult(stateTransitionId, stateTransitionStatus, shardState); + } + Review Comment: If we have a builder, there is no need to have so many constructors. Also, even if these were public constructors, they should call "this(xxxxxxx)" to reduce redundant code. 
########## helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java: ########## @@ -26,40 +29,85 @@ public class GatewayServiceManager { // TODO: add thread pool for init // single thread tp for update - public enum EventType { - CONNECT, // init connection to gateway service - UPDATE, // update state transition result - DISCONNECT // shutdown connection to gateway service. - } + // Map of cluster to HelixGatewayService - public class GateWayServiceEvent { - // event type - EventType eventType; - // event data - String clusterName; - String participantName; + // a single thread tp for event processing + ExecutorService _participantStateTransitionResultUpdator; - // todo: add more fields - } + // a thread pool for participant connecting + ExecutorService _participantConnectingTp; + + // grpc service + HelixGatewayServiceGrpcService _grpcService; public GatewayServiceManager() { _helixGatewayServiceMap = new ConcurrentHashMap<>(); + _participantStateTransitionResultUpdator = Executors.newSingleThreadExecutor(); + _participantConnectingTp = Executors.newFixedThreadPool(10); // todo: make it configurable + _grpcService = new HelixGatewayServiceGrpcService(this); } public AtomicBoolean sendTransitionRequestToApplicationInstance() { return null; } - public void updateShardState() { - + public void newGatewayServiceEvent(GatewayServiceEvent event) { + if (event.getEventType().equals(GatewayServiceEventType.UPDATE)) { + _participantStateTransitionResultUpdator.submit(new shardStateUpdator(event)); + } else { + _participantConnectingTp.submit(new participantConnectionProcessor(event)); + } } - public void newParticipantConnecting() { - + /** + * Update shard state + */ + class shardStateUpdator implements Runnable { + + GatewayServiceEvent _event; + + public shardStateUpdator(GatewayServiceEvent event) { + _event = event; + } + + @Override + public void run() { + HelixGatewayService helixGatewayService = 
_helixGatewayServiceMap.get(_event._clusterName); + if (helixGatewayService == null) { + // TODO: return error code and throw exception. + return; + } + helixGatewayService.receiveSTResponse(); + } } - public void participantDisconnected() { - + /** + * Create HelixGatewayService instance and register it to the manager. + * It includes waiting for ZK connection, and also wait for previous Liveinstance to expire. + */ + class participantConnectionProcessor implements Runnable { + GatewayServiceEvent _event; + + public participantConnectionProcessor(GatewayServiceEvent event) { + _event = event; + } + + @Override + public void run() { + HelixGatewayService helixGatewayService; + if (_event.getEventType().equals(GatewayServiceEventType.CONNECT)) { Review Comment: Where is UPDATE handled? ########## helix-gateway/src/main/java/org/apache/helix/gateway/constant/GatewayServiceEventType.java: ########## @@ -0,0 +1,7 @@ +package org.apache.helix.gateway.constant; + +public enum GatewayServiceEventType { Review Comment: Why do we need this new type? I remember we already had one in your last PR. ########## helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java: ########## @@ -11,7 +15,24 @@ public static TransitionMessage translateSTMsgToProto() { return null; } - public static GatewayServiceManager.GateWayServiceEvent translateProtoToSTMsg(ShardStateMessage message) { - return null; + public static GatewayServiceEvent translateProtoToEvent(ShardStateMessage request) { Review Comment: Why is it called proto to event? I don't get it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org For additional commands, e-mail: reviews-h...@helix.apache.org