SteveYurongSu commented on code in PR #12138: URL: https://github.com/apache/iotdb/pull/12138#discussion_r1531844113
########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java: ########## @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.subscription.receiver; + +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; +import org.apache.iotdb.commons.consensus.ConfigRegionId; +import org.apache.iotdb.commons.exception.SubscriptionException; +import org.apache.iotdb.confignode.rpc.thrift.TDataNodeConfigurationResp; +import org.apache.iotdb.db.protocol.client.ConfigNodeClient; +import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; +import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; +import org.apache.iotdb.db.subscription.agent.SubscriptionAgent; +import org.apache.iotdb.db.subscription.broker.SerializedEnrichedEvent; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.rpc.subscription.payload.config.ConsumerConfig; +import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeCloseReq; +import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeCommitReq; +import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeHandshakeReq; +import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeHeartbeatReq; +import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribePollReq; +import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeRequestType; +import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeRequestVersion; +import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeSubscribeReq; +import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeUnsubscribeReq; +import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeCloseResp; +import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeCommitResp; +import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeHandshakeResp; +import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeHeartbeatResp; +import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribePollResp; +import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeResponseType; +import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeResponseVersion; +import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeSubscribeResp; +import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeUnsubscribeResp; +import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq; +import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp; + +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +public class SubscriptionReceiverV1 implements SubscriptionReceiver { + + private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionReceiverV1.class); + + private final ThreadLocal<ConsumerConfig> consumerConfigThreadLocal = new ThreadLocal<>(); + + private static final IClientManager<ConfigRegionId, ConfigNodeClient> CONFIG_NODE_CLIENT_MANAGER = + ConfigNodeClientManager.getInstance(); + + private static final TPipeSubscribeResp SUBSCRIPTION_MISSING_CUSTOMER_RESP = + new TPipeSubscribeResp( + RpcUtils.getStatus( + TSStatusCode.SUBSCRIPTION_MISSING_CUSTOMER, + "Missing consumer config, please handshake first."), + PipeSubscribeResponseVersion.VERSION_1.getVersion(), + PipeSubscribeResponseType.ACK.getType()); + + @Override + public PipeSubscribeRequestVersion getVersion() { + return PipeSubscribeRequestVersion.VERSION_1; + } + + @Override + public final TPipeSubscribeResp handle(TPipeSubscribeReq req) { + final short reqType = req.getType(); + if (PipeSubscribeRequestType.isValidatedRequestType(reqType)) { + switch (PipeSubscribeRequestType.valueOf(reqType)) { + case HANDSHAKE: + return handlePipeSubscribeHandshake(PipeSubscribeHandshakeReq.fromTPipeSubscribeReq(req)); + case HEARTBEAT: + return handlePipeSubscribeHeartbeat(PipeSubscribeHeartbeatReq.fromTPipeSubscribeReq(req)); + case SUBSCRIBE: + return handlePipeSubscribeSubscribe(PipeSubscribeSubscribeReq.fromTPipeSubscribeReq(req)); + case UNSUBSCRIBE: + return handlePipeSubscribeUnsubscribe( + PipeSubscribeUnsubscribeReq.fromTPipeSubscribeReq(req)); + case POLL: + return handlePipeSubscribePoll(PipeSubscribePollReq.fromTPipeSubscribeReq(req)); + case COMMIT: + return handlePipeSubscribeCommit(PipeSubscribeCommitReq.fromTPipeSubscribeReq(req)); + case CLOSE: + return handlePipeSubscribeClose(PipeSubscribeCloseReq.fromTPipeSubscribeReq(req)); + default: + break; + } + } + + final TSStatus status = + RpcUtils.getStatus( + TSStatusCode.SUBSCRIPTION_TYPE_ERROR, + String.format("Unknown PipeSubscribeRequestType %s.", reqType)); + LOGGER.warn("Subscription: Unknown PipeSubscribeRequestType, response status = {}.", status); + return new TPipeSubscribeResp( + status, + PipeSubscribeResponseVersion.VERSION_1.getVersion(), + PipeSubscribeResponseType.ACK.getType()); + } + + private TPipeSubscribeResp handlePipeSubscribeHandshake(PipeSubscribeHandshakeReq req) { + try { + return handlePipeSubscribeHandshakeInternal(req); + } catch (SubscriptionException e) { + final String exceptionMessage = + String.format( + "Subscription: something unexpected happened when handshaking: %s, req: %s", + e.getMessage(), req); + LOGGER.warn(exceptionMessage); + return PipeSubscribeHandshakeResp.toTPipeSubscribeResp( + RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_HANDSHAKE_ERROR, exceptionMessage)); + } + } + + private TPipeSubscribeResp handlePipeSubscribeHandshakeInternal(PipeSubscribeHandshakeReq req) + throws SubscriptionException { + // set consumer config thread local + ConsumerConfig existedConsumerConfig = consumerConfigThreadLocal.get(); + ConsumerConfig consumerConfig = req.getConsumerConfig(); + + if (Objects.isNull(existedConsumerConfig)) { + consumerConfigThreadLocal.set(consumerConfig); + } else { + if (!existedConsumerConfig.equals(consumerConfig)) { + LOGGER.warn( + "Subscription: Detect stale consumer config when handshaking, stale consumer config {} will be cleared, consumer config will set to the incoming consumer config {}.", + existedConsumerConfig, + consumerConfig); + // drop stale consumer + SubscriptionAgent.consumer().dropConsumer(existedConsumerConfig); + consumerConfigThreadLocal.set(consumerConfig); + } + } + + // create consumer if not existed + if (!SubscriptionAgent.consumer() + .isConsumerExisted(consumerConfig.getConsumerId(), consumerConfig.getConsumerGroupId())) { + SubscriptionAgent.consumer().createConsumer(consumerConfig); + } else { + LOGGER.info( + "Subscription: Detect the same consumer {} when handshaking, skip the creation of consumer.", + consumerConfig); + } + + // fetch DN endPoints by CN + // TODO: cache result and listen changes + try (ConfigNodeClient configNodeClient = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + TDataNodeConfigurationResp resp = configNodeClient.getDataNodeConfiguration(-1); + if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != resp.getStatus().getCode()) { + final String exceptionMessage = + String.format( + "Subscription: Failed to get data node configuration in config node, status is %s.", + resp.getStatus()); + LOGGER.warn(exceptionMessage); + throw new SubscriptionException(exceptionMessage); + } + + Map<Integer, TEndPoint> endPoints = + Objects.isNull(resp.dataNodeConfigurationMap) + ? Collections.emptyMap() + : resp.dataNodeConfigurationMap.entrySet().stream() + .collect( + Collectors.toMap( + Entry::getKey, entry -> entry.getValue().location.clientRpcEndPoint)); + + LOGGER.info( + "Subscription: consumer {} handshake successfully, get DN endPoints: {}", + req.getConsumerConfig(), + endPoints); + return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS, endPoints); + } catch (ClientManagerException | TException e) { + final String exceptionMessage = + String.format( + "Subscription: Failed to get data node configuration in config node, exception is %s.", + e.getMessage()); + LOGGER.warn(exceptionMessage); + throw new SubscriptionException(exceptionMessage); + } + } + + private TPipeSubscribeResp handlePipeSubscribeHeartbeat(PipeSubscribeHeartbeatReq req) { + try { + return handlePipeSubscribeHeartbeatInternal(req); + } catch (SubscriptionException e) { + final String exceptionMessage = + String.format( + "Subscription: something unexpected happened when heartbeat: %s, req: %s", + e.getMessage(), req); + LOGGER.warn(exceptionMessage); + return PipeSubscribeHandshakeResp.toTPipeSubscribeResp( + RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_HEARTBEAT_ERROR, exceptionMessage)); + } + } + + private TPipeSubscribeResp handlePipeSubscribeHeartbeatInternal(PipeSubscribeHeartbeatReq req) { + // check consumer config thread local + ConsumerConfig consumerConfig = consumerConfigThreadLocal.get(); + if (Objects.isNull(consumerConfig)) { + LOGGER.warn( + "Subscription: missing consumer config when handling PipeSubscribeHeartbeatReq: {}", req); + return SUBSCRIPTION_MISSING_CUSTOMER_RESP; + } + + // TODO: do something + + LOGGER.info("Subscription: consumer {} heartbeat successfully", consumerConfig); + return PipeSubscribeHeartbeatResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS); + } + + private TPipeSubscribeResp handlePipeSubscribeSubscribe(PipeSubscribeSubscribeReq req) { + try { + return handlePipeSubscribeSubscribeInternal(req); + } catch (SubscriptionException e) { + final String exceptionMessage = + String.format( + "Subscription: something unexpected happened when subscribing: %s, req: %s", + e.getMessage(), req); + LOGGER.warn(exceptionMessage); + return PipeSubscribeHandshakeResp.toTPipeSubscribeResp( + RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_SUBSCRIBE_ERROR, exceptionMessage)); + } + } + + private TPipeSubscribeResp handlePipeSubscribeSubscribeInternal(PipeSubscribeSubscribeReq req) { + // check consumer config thread local + ConsumerConfig consumerConfig = consumerConfigThreadLocal.get(); + if (Objects.isNull(consumerConfig)) { + LOGGER.warn( + "Subscription: missing consumer config when handling PipeSubscribeSubscribeReq: {}", req); + return SUBSCRIPTION_MISSING_CUSTOMER_RESP; + } + + // subscribe topics + Set<String> topicNames = req.getTopicNames(); + SubscriptionAgent.consumer().subscribe(consumerConfig, topicNames); + + LOGGER.info("Subscription: consumer {} subscribe {} successfully", consumerConfig, topicNames); + return PipeSubscribeSubscribeResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS); + } + + private TPipeSubscribeResp handlePipeSubscribeUnsubscribe(PipeSubscribeUnsubscribeReq req) { + try { + return handlePipeSubscribeUnsubscribeInternal(req); + } catch (SubscriptionException e) { + final String exceptionMessage = + String.format( + "Subscription: something unexpected happened when unsubscribing: %s, req: %s", + e.getMessage(), req); + LOGGER.warn(exceptionMessage); + return PipeSubscribeHandshakeResp.toTPipeSubscribeResp( + RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_UNSUBSCRIBE_ERROR, exceptionMessage)); + } + } + + private TPipeSubscribeResp handlePipeSubscribeUnsubscribeInternal( + PipeSubscribeUnsubscribeReq req) { + // check consumer config thread local + ConsumerConfig consumerConfig = consumerConfigThreadLocal.get(); + if (Objects.isNull(consumerConfig)) { + LOGGER.warn( + "Subscription: missing consumer config when handling PipeSubscribeUnsubscribeReq: {}", + req); + return SUBSCRIPTION_MISSING_CUSTOMER_RESP; + } + + // unsubscribe topics + Set<String> topicNames = req.getTopicNames(); + SubscriptionAgent.consumer().unsubscribe(consumerConfig, topicNames); + + LOGGER.info( Review Comment: Some loggers in this file should be downgraded. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
