Caideyipi commented on code in PR #15112:
URL: https://github.com/apache/iotdb/pull/15112#discussion_r2048129416


##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java:
##########
@@ -74,6 +76,10 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
   protected String username = CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
   protected String password = CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
 
+  // Used to store the clusterId for location comparison
+  public static final Map<String, String> CLUSTER_ID_MAP = new HashMap<>();

Review Comment:
   Why not use a hashSet....



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java:
##########
@@ -233,16 +245,20 @@ public void setSearchIndex(final long searchIndex) {
   protected void serializeAttributes(final ByteBuffer byteBuffer) {
     PlanNodeType.PIPE_ENRICHED_INSERT_DATA.serialize(byteBuffer);
     insertNode.serialize(byteBuffer);
+    ReadWriteIOUtils.write(originClusterId, byteBuffer);
   }
 
   @Override
   protected void serializeAttributes(final DataOutputStream stream) throws 
IOException {
     PlanNodeType.PIPE_ENRICHED_INSERT_DATA.serialize(stream);
     insertNode.serialize(stream);
+    ReadWriteIOUtils.write(originClusterId, stream);
   }
 
   public static PipeEnrichedInsertNode deserialize(final ByteBuffer buffer) {
-    return new PipeEnrichedInsertNode((InsertNode) 
PlanNodeType.deserialize(buffer));
+    return new PipeEnrichedInsertNode(
+        (InsertNode) PlanNodeType.deserialize(buffer),
+        buffer.hasRemaining() ? ReadWriteIOUtils.readString(buffer) : null);

Review Comment:
   What about ignore children size?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java:
##########
@@ -1045,6 +1045,7 @@ private RegionExecutionResult 
executeCreateOrUpdateTableDevice(
     @Override
     public RegionExecutionResult visitPipeEnrichedWritePlanNode(
         final PipeEnrichedWritePlanNode node, final 
WritePlanNodeExecutionContext context) {
+      node.setOriginClusterId(node.getOriginClusterId());

Review Comment:
   Seemingly some extra operations are needed to pass the "originClusterId" to 
the state machine....



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedWritePlanNode.java:
##########
@@ -156,16 +170,20 @@ public <R, C> R accept(final PlanVisitor<R, C> visitor, 
final C context) {
   protected void serializeAttributes(final ByteBuffer byteBuffer) {
     PlanNodeType.PIPE_ENRICHED_WRITE.serialize(byteBuffer);
     writePlanNode.serialize(byteBuffer);
+    ReadWriteIOUtils.write(originClusterId, byteBuffer);
   }
 
   @Override
   protected void serializeAttributes(final DataOutputStream stream) throws 
IOException {
     PlanNodeType.PIPE_ENRICHED_WRITE.serialize(stream);
     writePlanNode.serialize(stream);
+    ReadWriteIOUtils.write(originClusterId, stream);
   }
 
   public static PipeEnrichedWritePlanNode deserialize(final ByteBuffer buffer) 
{
-    return new PipeEnrichedWritePlanNode((WritePlanNode) 
PlanNodeType.deserialize(buffer));
+    return new PipeEnrichedWritePlanNode(
+        (WritePlanNode) PlanNodeType.deserialize(buffer),

Review Comment:
   Will there be some ignored children sizes..



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedNonWritePlanNode.java:
##########
@@ -141,16 +154,20 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C 
context) {
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.PIPE_ENRICHED_NON_WRITE.serialize(byteBuffer);
     nonWritePlanNode.serialize(byteBuffer);
+    ReadWriteIOUtils.write(originClusterId, byteBuffer);
   }
 
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
     PlanNodeType.PIPE_ENRICHED_NON_WRITE.serialize(stream);
     nonWritePlanNode.serialize(stream);
+    ReadWriteIOUtils.write(originClusterId, stream);
   }
 
   public static PipeEnrichedNonWritePlanNode deserialize(ByteBuffer buffer) {
-    return new PipeEnrichedNonWritePlanNode(PlanNodeType.deserialize(buffer));
+    return new PipeEnrichedNonWritePlanNode(
+        PlanNodeType.deserialize(buffer),

Review Comment:
   What about ignored children size?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to