Caideyipi commented on code in PR #11994:
URL: https://github.com/apache/iotdb/pull/11994#discussion_r1471043511


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java:
##########
@@ -46,6 +49,7 @@ public class PipeRuntimeAgent implements IService {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeRuntimeAgent.class);
   private static final int DATA_NODE_ID = 
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
+  private static volatile String clusterId = null;

Review Comment:
   With synchronized, "volatile" may not be needed, since synchronized provides visibility by itself?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/common/PipeConstant.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.payload.evolvable.common;
+
+public class PipeConstant {

Review Comment:
   The name could be a little less generic, since other constants like the 
"ExtractorConstant"s are not in here, and this class is only in DataNode scope.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeV2Req.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
+
+import 
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.thrift.TException;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+public class PipeTransferHandshakeV2Req extends TPipeTransferReq {
+  private transient Map<String, String> params;
+
+  private PipeTransferHandshakeV2Req() {
+    // Empty constructor
+  }
+
+  public Map<String, String> getParams() {
+    return params;
+  }
+
+  /////////////////////////////// Thrift ///////////////////////////////
+
+  public static PipeTransferHandshakeV2Req toTPipeTransferReq(HashMap<String, 
String> params)
+      throws TException {
+    final PipeTransferHandshakeV2Req handshakeReq = new 
PipeTransferHandshakeV2Req();
+
+    handshakeReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
+    handshakeReq.type = PipeRequestType.HANDSHAKE_V2.getType();
+    handshakeReq.body = ByteBuffer.wrap(SerializationUtils.serialize(params));
+
+    handshakeReq.params = params;
+
+    return handshakeReq;
+  }
+
+  public static PipeTransferHandshakeV2Req 
fromTPipeTransferReq(TPipeTransferReq transferReq) {
+    final PipeTransferHandshakeV2Req handshakeReq = new 
PipeTransferHandshakeV2Req();
+
+    handshakeReq.params = 
SerializationUtils.deserialize(transferReq.body.array());
+
+    handshakeReq.version = transferReq.version;
+    handshakeReq.type = transferReq.type;
+    handshakeReq.body = transferReq.body;
+
+    return handshakeReq;
+  }
+
+  /////////////////////////////// Air Gap ///////////////////////////////
+
+  public static byte[] toTransferHandshakeBytes(HashMap<String, String> 
params) throws IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), 
outputStream);
+      ReadWriteIOUtils.write(PipeRequestType.HANDSHAKE_V2.getType(), 
outputStream);
+      ReadWriteIOUtils.write(params, outputStream);

Review Comment:
   ReadWriteIOUtils may not be compatible with SerializationUtils, since the 
latter uses the "Serializable" API from Java...



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java:
##########
@@ -132,14 +143,42 @@ private void reconstructClient(TEndPoint endPoint) throws 
IOException {
               endPoint.getPort()),
           e);
     }
+  }
 
+  public void sendHandshakeReq(
+      Pair<IoTDBThriftSyncConnectorClient, Boolean> clientAndStatus, TEndPoint 
endPoint) {
     try {
-      final TPipeTransferResp resp =
+      HashMap<String, String> params = new HashMap<>();
+      params.put(
+          PipeConstant.HANDSHAKE_KEY_TIME_PRECISION,
+          CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+      params.put(PipeConstant.HANDSHAKE_KEY_CLUSTER_ID, 
PipeRuntimeAgent.getClusterId());
+
+      TPipeTransferResp resp =
           clientAndStatus
               .getLeft()
-              .pipeTransfer(
-                  PipeTransferHandshakeReq.toTPipeTransferReq(
-                      
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
+              
.pipeTransfer(PipeTransferHandshakeV2Req.toTPipeTransferReq(params));
+      if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        // Retry to handshake by PipeTransferHandshakeV1Req.
+        if (resp.getStatus().getCode() == 
TSStatusCode.PIPE_TYPE_ERROR.getStatusCode()) {
+          LOGGER.warn(

Review Comment:
   I think the "info" level is better than "warn" here, since this fallback exists for compatibility.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java:
##########
@@ -246,6 +251,35 @@ private TPipeTransferResp 
handleTransferHandshake(PipeTransferHandshakeReq req)
     return new TPipeTransferResp(RpcUtils.SUCCESS_STATUS);
   }
 
+  private TPipeTransferResp 
handleTransferHandshakeV2(PipeTransferHandshakeV2Req req)
+      throws IOException {
+    // Reject to handshake if the receiver can not take clusterId from 
configNode.
+    String clusterId = PipeRuntimeAgent.getClusterId();
+    if (clusterId == null) {
+      final TSStatus status =
+          RpcUtils.getStatus(
+              TSStatusCode.PIPE_REJECT_ERROR, "Unable to get clusterId in 
receiver.");
+      LOGGER.warn("Handshake failed, response status = {}.", status);
+      return new TPipeTransferResp(status);
+    }
+
+    // Reject to handshake if the receiver and sender are from the same 
cluster.
+    if 
(req.getParams().get(PipeConstant.HANDSHAKE_KEY_CLUSTER_ID).equals(clusterId)) {
+      final TSStatus status =
+          RpcUtils.getStatus(
+              TSStatusCode.PIPE_REJECT_ERROR,
+              String.format("Unable to transfer data to IoTDB cluster %s 
itself", clusterId));
+      LOGGER.warn("Handshake failed, response status = {}.", status);
+      return new TPipeTransferResp(status);
+    }
+
+    // Handle the rest part by handleTransferHandshakeV1.

Review Comment:
   rest of the parts



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to