Pengzna commented on code in PR #12355:
URL: https://github.com/apache/iotdb/pull/12355#discussion_r1617099794


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java:
##########
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import 
org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.response.PipeConsensusTransferFilePieceResp;
+import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
+import org.apache.iotdb.consensus.pipe.PipeConsensus;
+import org.apache.iotdb.consensus.pipe.client.SyncPipeConsensusServiceClient;
+import org.apache.iotdb.consensus.pipe.thrift.TCommitId;
+import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusBatchTransferResp;
+import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.builder.PipeConsensusSyncBatchReqBuilder;
+import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletBinaryReq;
+import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletInsertNodeReq;
+import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletRawReq;
+import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFilePieceReq;
+import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFilePieceWithModReq;
+import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealReq;
+import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealWithModReq;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** This connector is used for PipeConsensus to transfer queued event. */
+public class PipeConsensusSyncConnector extends IoTDBConnector {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeConsensusSyncConnector.class);
+
+  private static final String PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT =
+      "PipeConsensus: syncClient connection to %s:%s failed when %s, because: 
%s";
+
+  private static final String TABLET_INSERTION_NODE_SCENARIO = "transfer 
insertionNode tablet";
+
+  private static final String TABLET_RAW_SCENARIO = "transfer raw tablet";
+
+  private static final String TSFILE_SCENARIO = "transfer tsfile";
+
+  private static final String TABLET_BATCH_SCENARIO = "transfer tablet batch";
+
+  private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient>
+      syncRetryAndHandshakeClientManager;
+
+  private final List<TEndPoint> peers;
+
+  private final int thisDataNodeId;
+
+  private String consensusGroupId;
+
+  private PipeConsensusSyncBatchReqBuilder tabletBatchBuilder;
+
+  public PipeConsensusSyncConnector(
+      List<TEndPoint> peers, String consensusGroupId, int thisDataNodeId) {
+    // In PipeConsensus, one pipeConsensusTask corresponds to a 
pipeConsensusConnector. Thus,
+    // `peers` here actually is a singletonList that contains one peer's 
TEndPoint. But here we
+    // retain the implementation of list to cope with possible future expansion
+    this.peers = peers;
+    this.consensusGroupId = consensusGroupId;
+    this.thisDataNodeId = thisDataNodeId;
+    this.syncRetryAndHandshakeClientManager =
+        ((PipeConsensus) 
DataRegionConsensusImpl.getInstance()).getSyncClientManager();
+  }
+
+  @Override
+  public void customize(PipeParameters parameters, 
PipeConnectorRuntimeConfiguration configuration)
+      throws Exception {
+    super.customize(parameters, configuration);
+    if (isTabletBatchModeEnabled) {
+      tabletBatchBuilder =
+          new PipeConsensusSyncBatchReqBuilder(
+              parameters,
+              new TConsensusGroupId(
+                  TConsensusGroupType.DataRegion, 
Integer.parseInt(consensusGroupId)),
+              thisDataNodeId);
+    }
+    // currently, tablet batch is false by default in PipeConsensus;
+    isTabletBatchModeEnabled = false;
+  }
+
+  @Override
+  public void handshake() throws Exception {
+    // do nothing
+    // PipeConsensus doesn't need to do handshake, since nodes in same 
consensusGroup/cluster
+    // usually have same configuration.
+  }
+
+  @Override
+  public void heartbeat() throws Exception {
+    // do nothing
+  }
+
+  @Override
+  public void transfer(TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
+    // Note: here we don't need to do type judgment here, because 
PipeConsensus uses DO_NOTHING
+    // processor and will not change the event type like
+    // 
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector
+    try {
+      if (isTabletBatchModeEnabled) {
+        if (tabletBatchBuilder.onEvent(tabletInsertionEvent)) {
+          doTransfer();
+        }
+      } else {
+        if (tabletInsertionEvent instanceof 
PipeInsertNodeTabletInsertionEvent) {
+          doTransferWrapper((PipeInsertNodeTabletInsertionEvent) 
tabletInsertionEvent);
+        } else {
+          doTransferWrapper((PipeRawTabletInsertionEvent) 
tabletInsertionEvent);
+        }
+      }
+    } catch (Exception e) {
+      throw new PipeConnectionException(
+          String.format(
+              "Failed to transfer tablet insertion event %s, because %s.",
+              tabletInsertionEvent, e.getMessage()),
+          e);
+    }
+  }
+
+  @Override
+  public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception {
+    // Note: here we don't need to do type judgment here, because 
PipeConsensus uses DO_NOTHING
+    // processor and will not change the event type like
+    // 
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector
+    try {
+      // In order to commit in order
+      if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
+        doTransfer();
+      }
+
+      doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent);
+    } catch (Exception e) {
+      throw new PipeConnectionException(
+          String.format(
+              "Failed to transfer tsfile insertion event %s, because %s.",
+              tsFileInsertionEvent, e.getMessage()),
+          e);
+    }
+  }
+
+  @Override
+  public void transfer(Event event) throws Exception {
+    // in order to commit in order
+    if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
+      doTransfer();
+    }
+
+    if (!(event instanceof PipeHeartbeatEvent)) {
+      LOGGER.warn(
+          "PipeConsensusSyncConnector does not support transferring generic 
event: {}.", event);
+    }
+  }
+
+  private void doTransfer() {
+    try (final SyncPipeConsensusServiceClient syncPipeConsensusServiceClient =
+        syncRetryAndHandshakeClientManager.borrowClient(getFollowerUrl())) {
+      final TPipeConsensusBatchTransferResp resp;
+      resp =
+          syncPipeConsensusServiceClient.pipeConsensusBatchTransfer(
+              tabletBatchBuilder.toTPipeConsensusBatchTransferReq());
+
+      final List<TSStatus> statusList =
+          resp.getBatchResps().stream()
+              .map(TPipeConsensusTransferResp::getStatus)
+              .collect(Collectors.toList());
+
+      // TODO(support batch): handle retry logic
+      // Only handle the failed statuses to avoid string format performance 
overhead
+      //      if (status.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()
+      //          && status.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+      //        receiverStatusHandler.handle(
+      //            resp.getStatus(),
+      //            String.format(
+      //                "Transfer PipeConsensusTransferTabletBatchReq error, 
result status %s",
+      //                resp.status),
+      //            tabletBatchBuilder.deepCopyEvents().toString());
+      //      }
+
+      tabletBatchBuilder.onSuccess();
+    } catch (Exception e) {
+      throw new PipeConnectionException(
+          String.format(
+              PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
+              getFollowerUrl().getIp(),
+              getFollowerUrl().getPort(),
+              e.getMessage(),
+              TABLET_BATCH_SCENARIO),
+          e);
+    }
+  }
+
+  private void doTransferWrapper(
+      final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent)
+      throws PipeException {
+    try {
+      // We increase the reference count for this event to determine if the 
event may be released.
+      if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
+          PipeConsensusSyncConnector.class.getName())) {
+        return;
+      }
+      doTransfer(pipeInsertNodeTabletInsertionEvent);
+    } finally {
+      pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(
+          PipeConsensusSyncConnector.class.getName(), false);
+    }
+  }
+
+  private void doTransfer(PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent)
+      throws PipeException {
+    final InsertNode insertNode;
+    final ProgressIndex progressIndex;
+    final TPipeConsensusTransferResp resp;
+    TCommitId tCommitId =
+        new TCommitId(
+            pipeInsertNodeTabletInsertionEvent.getCommitId(),
+            pipeInsertNodeTabletInsertionEvent.getRebootTimes());
+    TConsensusGroupId tConsensusGroupId =
+        new TConsensusGroupId(TConsensusGroupType.DataRegion, 
Integer.parseInt(consensusGroupId));
+
+    try (final SyncPipeConsensusServiceClient syncPipeConsensusServiceClient =
+        syncRetryAndHandshakeClientManager.borrowClient(getFollowerUrl())) {
+      insertNode = 
pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
+      progressIndex = pipeInsertNodeTabletInsertionEvent.getProgressIndex();
+
+      if (insertNode != null) {
+        resp =
+            syncPipeConsensusServiceClient.pipeConsensusTransfer(
+                PipeConsensusTabletInsertNodeReq.toTPipeConsensusTransferReq(
+                    insertNode, tCommitId, tConsensusGroupId, progressIndex, 
thisDataNodeId));
+      } else {
+        resp =
+            syncPipeConsensusServiceClient.pipeConsensusTransfer(
+                PipeConsensusTabletBinaryReq.toTPipeConsensusTransferReq(
+                    pipeInsertNodeTabletInsertionEvent.getByteBuffer(),
+                    tCommitId,
+                    tConsensusGroupId,
+                    progressIndex,
+                    thisDataNodeId));
+      }
+    } catch (Exception e) {
+      throw new PipeConnectionException(
+          String.format(
+              PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
+              getFollowerUrl().getIp(),
+              getFollowerUrl().getPort(),
+              e.getMessage(),
+              TABLET_INSERTION_NODE_SCENARIO),
+          e);
+    }
+
+    final TSStatus status = resp.getStatus();
+    // Only handle the failed statuses to avoid string format performance 
overhead
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        && status.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+      receiverStatusHandler.handle(
+          status,
+          String.format(
+              "PipeConsensus transfer PipeInsertNodeTabletInsertionEvent %s 
error, result status %s",
+              pipeInsertNodeTabletInsertionEvent, status),
+          pipeInsertNodeTabletInsertionEvent.toString());
+    }
+  }
+
+  private void doTransferWrapper(final PipeRawTabletInsertionEvent 
pipeRawTabletInsertionEvent)
+      throws PipeException {
+    try {
+      // We increase the reference count for this event to determine if the 
event may be released.
+      if (!pipeRawTabletInsertionEvent.increaseReferenceCount(
+          PipeConsensusSyncConnector.class.getName())) {
+        return;
+      }
+      doTransfer(pipeRawTabletInsertionEvent);
+    } finally {
+      pipeRawTabletInsertionEvent.decreaseReferenceCount(
+          PipeConsensusSyncConnector.class.getName(), false);
+    }
+  }
+
+  private void doTransfer(PipeRawTabletInsertionEvent 
pipeRawTabletInsertionEvent)
+      throws PipeException {
+    final TPipeConsensusTransferResp resp;
+
+    try (final SyncPipeConsensusServiceClient syncPipeConsensusServiceClient =
+        syncRetryAndHandshakeClientManager.borrowClient(getFollowerUrl()); ) {
+      TCommitId tCommitId =
+          new TCommitId(
+              pipeRawTabletInsertionEvent.getCommitId(),
+              pipeRawTabletInsertionEvent.getRebootTimes());
+      TConsensusGroupId tConsensusGroupId =
+          new TConsensusGroupId(TConsensusGroupType.DataRegion, 
Integer.parseInt(consensusGroupId));
+
+      resp =
+          syncPipeConsensusServiceClient.pipeConsensusTransfer(
+              PipeConsensusTabletRawReq.toTPipeConsensusTransferReq(
+                  pipeRawTabletInsertionEvent.convertToTablet(),
+                  pipeRawTabletInsertionEvent.isAligned(),
+                  tCommitId,
+                  tConsensusGroupId,
+                  thisDataNodeId));
+    } catch (Exception e) {
+      throw new PipeConnectionException(
+          String.format(
+              PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT,
+              getFollowerUrl().getIp(),
+              getFollowerUrl().getPort(),
+              e.getMessage(),
+              TABLET_RAW_SCENARIO),
+          e);
+    }
+
+    final TSStatus status = resp.getStatus();
+    // Only handle the failed statuses to avoid string format performance 
overhead
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        && status.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+      receiverStatusHandler.handle(
+          status,
+          String.format(
+              "PipeConsensus transfer PipeRawTabletInsertionEvent %s error, 
result status %s",
+              pipeRawTabletInsertionEvent, status),
+          pipeRawTabletInsertionEvent.toString());
+    }
+  }
+
+  private void doTransfer(PipeTsFileInsertionEvent pipeTsFileInsertionEvent)
+      throws PipeException, IOException {
+    final File tsFile = pipeTsFileInsertionEvent.getTsFile();
+    final File modFile = pipeTsFileInsertionEvent.getModFile();
+    final TPipeConsensusTransferResp resp;
+
+    try (final SyncPipeConsensusServiceClient syncPipeConsensusServiceClient =
+        syncRetryAndHandshakeClientManager.borrowClient(getFollowerUrl())) {
+      final TCommitId tCommitId =
+          new TCommitId(
+              pipeTsFileInsertionEvent.getCommitId(), 
pipeTsFileInsertionEvent.getRebootTimes());
+      final TConsensusGroupId tConsensusGroupId =
+          new TConsensusGroupId(TConsensusGroupType.DataRegion, 
Integer.parseInt(consensusGroupId));
+
+      // 1. Transfer tsFile, and mod file if exists and receiver's version >= 2
+      if (pipeTsFileInsertionEvent.isWithMod()) {
+        transferFilePieces(
+            modFile, syncPipeConsensusServiceClient, true, tCommitId, 
tConsensusGroupId);
+        transferFilePieces(
+            tsFile, syncPipeConsensusServiceClient, true, tCommitId, 
tConsensusGroupId);
+        // 2. Transfer filre seal signal with mod, which means the file is 
tansferred completely

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to