Pengzna commented on code in PR #12355:
URL: https://github.com/apache/iotdb/pull/12355#discussion_r1618144182


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java:
##########
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import 
org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.response.PipeConsensusTransferFilePieceResp;
+import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
+import org.apache.iotdb.consensus.pipe.PipeConsensus;
+import org.apache.iotdb.consensus.pipe.client.SyncPipeConsensusServiceClient;
+import org.apache.iotdb.consensus.pipe.thrift.TCommitId;
+import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusBatchTransferResp;
+import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.builder.PipeConsensusSyncBatchReqBuilder;
+import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletBinaryReq;
+import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletInsertNodeReq;
+import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletRawReq;
+import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFilePieceReq;
+import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFilePieceWithModReq;
+import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealReq;
+import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealWithModReq;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** This connector is used for PipeConsensus to transfer queued event. */
+public class PipeConsensusSyncConnector extends IoTDBConnector {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeConsensusSyncConnector.class);
+
+  private static final String PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT =
+      "PipeConsensus: syncClient connection to %s:%s failed when %s, because: 
%s";
+
+  private static final String TABLET_INSERTION_NODE_SCENARIO = "transfer 
insertionNode tablet";
+
+  private static final String TABLET_RAW_SCENARIO = "transfer raw tablet";
+
+  private static final String TSFILE_SCENARIO = "transfer tsfile";
+
+  private static final String TABLET_BATCH_SCENARIO = "transfer tablet batch";
+
+  private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient>
+      syncRetryAndHandshakeClientManager;
+
+  private final List<TEndPoint> peers;
+
+  private final int thisDataNodeId;
+
+  private String consensusGroupId;
+
+  private PipeConsensusSyncBatchReqBuilder tabletBatchBuilder;
+
+  public PipeConsensusSyncConnector(
+      List<TEndPoint> peers, String consensusGroupId, int thisDataNodeId) {
+    // In PipeConsensus, one pipeConsensusTask corresponds to a 
pipeConsensusConnector. Thus,
+    // `peers` here actually is a singletonList that contains one peer's 
TEndPoint. But here we
+    // retain the implementation of list to cope with possible future expansion
+    this.peers = peers;
+    this.consensusGroupId = consensusGroupId;
+    this.thisDataNodeId = thisDataNodeId;
+    this.syncRetryAndHandshakeClientManager =
+        ((PipeConsensus) 
DataRegionConsensusImpl.getInstance()).getSyncClientManager();
+  }
+
+  @Override
+  public void customize(PipeParameters parameters, 
PipeConnectorRuntimeConfiguration configuration)
+      throws Exception {
+    super.customize(parameters, configuration);
+    if (isTabletBatchModeEnabled) {
+      tabletBatchBuilder =
+          new PipeConsensusSyncBatchReqBuilder(
+              parameters,
+              new TConsensusGroupId(
+                  TConsensusGroupType.DataRegion, 
Integer.parseInt(consensusGroupId)),
+              thisDataNodeId);
+    }
+    // currently, tablet batch is false by default in PipeConsensus;
+    isTabletBatchModeEnabled = false;
+  }
+
+  @Override
+  public void handshake() throws Exception {
+    // do nothing
+    // PipeConsensus doesn't need to do handshake, since nodes in same 
consensusGroup/cluster
+    // usually have same configuration.
+  }
+
+  @Override
+  public void heartbeat() throws Exception {
+    // do nothing
+  }
+
+  @Override
+  public void transfer(TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
+    // Note: here we don't need to do type judgment here, because 
PipeConsensus uses DO_NOTHING
+    // processor and will not change the event type like
+    // 
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector
+    try {
+      if (isTabletBatchModeEnabled) {
+        if (tabletBatchBuilder.onEvent(tabletInsertionEvent)) {
+          doTransfer();
+        }
+      } else {
+        if (tabletInsertionEvent instanceof 
PipeInsertNodeTabletInsertionEvent) {
+          doTransferWrapper((PipeInsertNodeTabletInsertionEvent) 
tabletInsertionEvent);
+        } else {
+          doTransferWrapper((PipeRawTabletInsertionEvent) 
tabletInsertionEvent);
+        }
+      }
+    } catch (Exception e) {
+      throw new PipeConnectionException(
+          String.format(
+              "Failed to transfer tablet insertion event %s, because %s.",
+              tabletInsertionEvent, e.getMessage()),
+          e);
+    }
+  }
+
+  @Override
+  public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception {
+    // Note: here we don't need to do type judgment here, because 
PipeConsensus uses DO_NOTHING
+    // processor and will not change the event type like
+    // 
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector
+    try {
+      // In order to commit in order
+      if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
+        doTransfer();
+      }
+
+      doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent);
+    } catch (Exception e) {
+      throw new PipeConnectionException(
+          String.format(
+              "Failed to transfer tsfile insertion event %s, because %s.",
+              tsFileInsertionEvent, e.getMessage()),
+          e);
+    }
+  }
+
+  @Override
+  public void transfer(Event event) throws Exception {
+    // in order to commit in order
+    if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
+      doTransfer();
+    }
+
+    if (!(event instanceof PipeHeartbeatEvent)) {
+      LOGGER.warn(
+          "PipeConsensusSyncConnector does not support transferring generic 
event: {}.", event);
+    }
+  }
+
+  private void doTransfer() {
+    try (final SyncPipeConsensusServiceClient syncPipeConsensusServiceClient =
+        syncRetryAndHandshakeClientManager.borrowClient(getFollowerUrl())) {
+      final TPipeConsensusBatchTransferResp resp;
+      resp =
+          syncPipeConsensusServiceClient.pipeConsensusBatchTransfer(
+              tabletBatchBuilder.toTPipeConsensusBatchTransferReq());
+
+      final List<TSStatus> statusList =
+          resp.getBatchResps().stream()
+              .map(TPipeConsensusTransferResp::getStatus)
+              .collect(Collectors.toList());
+
+      // TODO(support batch): handle retry logic
+      // Only handle the failed statuses to avoid string format performance 
overhead
+      //      if (status.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()
+      //          && status.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+      //        receiverStatusHandler.handle(
+      //            resp.getStatus(),
+      //            String.format(
+      //                "Transfer PipeConsensusTransferTabletBatchReq error, 
result status %s",
+      //                resp.status),
+      //            tabletBatchBuilder.deepCopyEvents().toString());
+      //      }
+
+      tabletBatchBuilder.onSuccess();
+    } catch (Exception e) {
+      throw new PipeConnectionException(
+          String.format(
+              PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
+              getFollowerUrl().getIp(),
+              getFollowerUrl().getPort(),
+              e.getMessage(),
+              TABLET_BATCH_SCENARIO),

Review Comment:
   good catch, fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to