xyuanlu commented on code in PR #2844:
URL: https://github.com/apache/helix/pull/2844#discussion_r1688449541
##########
helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java:
##########
@@ -26,40 +29,85 @@ public class GatewayServiceManager {
// TODO: add thread pool for init
// single thread tp for update
- public enum EventType {
- CONNECT, // init connection to gateway service
- UPDATE, // update state transition result
- DISCONNECT // shutdown connection to gateway service.
- }
+ // Map of cluster to HelixGatewayService
- public class GateWayServiceEvent {
- // event type
- EventType eventType;
- // event data
- String clusterName;
- String participantName;
+ // a single thread tp for event processing
+ ExecutorService _participantStateTransitionResultUpdator;
- // todo: add more fields
- }
+ // a thread pool for participant connecting
+ ExecutorService _participantConnectingTp;
+
+ // grpc service
+ HelixGatewayServiceGrpcService _grpcService;
public GatewayServiceManager() {
_helixGatewayServiceMap = new ConcurrentHashMap<>();
+ _participantStateTransitionResultUpdator =
Executors.newSingleThreadExecutor();
+ _participantConnectingTp = Executors.newFixedThreadPool(10); // todo: make
it configurable
+ _grpcService = new HelixGatewayServiceGrpcService(this);
}
public AtomicBoolean sendTransitionRequestToApplicationInstance() {
return null;
}
- public void updateShardState() {
-
+ public void newGatewayServiceEvent(GatewayServiceEvent event) {
+ if (event.getEventType().equals(GatewayServiceEventType.UPDATE)) {
+ _participantStateTransitionResultUpdator.submit(new
shardStateUpdator(event));
+ } else {
+ _participantConnectingTp.submit(new
participantConnectionProcessor(event));
+ }
}
- public void newParticipantConnecting() {
-
+ /**
+ * Update shard state
+ */
+ class shardStateUpdator implements Runnable {
+
+ GatewayServiceEvent _event;
+
+ public shardStateUpdator(GatewayServiceEvent event) {
+ _event = event;
+ }
+
+ @Override
+ public void run() {
+ HelixGatewayService helixGatewayService =
_helixGatewayServiceMap.get(_event._clusterName);
+ if (helixGatewayService == null) {
+ // TODO: return error code and throw exception.
+ return;
+ }
+ helixGatewayService.receiveSTResponse();
+ }
}
- public void participantDisconnected() {
-
+ /**
+ * Create HelixGatewayService instance and register it to the manager.
+ * It includes waiting for ZK connection, and also wait for previous
Liveinstance to expire.
+ */
+ class participantConnectionProcessor implements Runnable {
+ GatewayServiceEvent _event;
+
+ public participantConnectionProcessor(GatewayServiceEvent event) {
+ _event = event;
+ }
+
+ @Override
+ public void run() {
+ HelixGatewayService helixGatewayService;
+ if (_event.getEventType().equals(GatewayServiceEventType.CONNECT)) {
Review Comment:
Updated. Commits are messed up when I split pr.....
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]