luoluoyuyu commented on code in PR #14922:
URL: https://github.com/apache/iotdb/pull/14922#discussion_r1971037770


##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java:
##########
@@ -155,97 +151,119 @@ private void doTransferWrapper(
       return;
     }
     try {
-      doTransfer(socket, pipeConfigRegionWritePlanEvent);
+      doTransfer(pipeConfigRegionWritePlanEvent);
     } finally {
       pipeConfigRegionWritePlanEvent.decreaseReferenceCount(
           IoTDBConfigRegionAirGapConnector.class.getName(), false);
     }
   }
 
-  private void doTransfer(
-      final AirGapSocket socket,
-      final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent)
+  private void doTransfer(final PipeConfigRegionWritePlanEvent 
pipeConfigRegionWritePlanEvent)
       throws PipeException, IOException {
-    if (!send(
-        pipeConfigRegionWritePlanEvent.getPipeName(),
-        pipeConfigRegionWritePlanEvent.getCreationTime(),
-        socket,
-        PipeTransferConfigPlanReq.toTPipeTransferBytes(
-            pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()))) {
-      final String errorMessage =
-          String.format(
-              "Transfer config region write plan %s error. Socket: %s.",
-              
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().getType(), socket);
-      // Send handshake because we don't know whether the receiver side 
configNode
-      // has set up a new one
-      sendHandshakeReq(socket);
-      receiverStatusHandler.handle(
-          new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
-              .setMessage(errorMessage),
-          errorMessage,
-          pipeConfigRegionWritePlanEvent.toString());
+    final List<Integer> socketIndexes =
+        shouldSendToAllClients
+            ? allAliveSocketsIndex()
+            : Collections.singletonList(nextSocketIndex());
+    for (final int socketIndex : socketIndexes) {
+      final AirGapSocket socket = sockets.get(socketIndex);
+      try {
+        if (!send(
+            pipeConfigRegionWritePlanEvent.getPipeName(),
+            pipeConfigRegionWritePlanEvent.getCreationTime(),
+            socket,
+            PipeTransferConfigPlanReq.toTPipeTransferBytes(
+                pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()))) {

Review Comment:
   I think this byte[] only needs to be constructed once, which can reduce a 
lot of unnecessary overhead



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java:
##########
@@ -144,45 +146,48 @@ private void doTransferWrapper(
 
   private void doTransfer(final PipeConfigRegionWritePlanEvent 
pipeConfigRegionWritePlanEvent)
       throws PipeException {
-    final Pair<IoTDBSyncClient, Boolean> clientAndStatus = 
clientManager.getClient();
+    final List<Pair<IoTDBSyncClient, Boolean>> clientsAndStatuses =
+        shouldSendToAllClients
+            ? clientManager.getAllClients()
+            : Collections.singletonList(clientManager.getClient());
+    for (final Pair<IoTDBSyncClient, Boolean> clientAndStatus : 
clientsAndStatuses) {
+      final TPipeTransferResp resp;
+      try {
+        final TPipeTransferReq req =
+            compressIfNeeded(
+                PipeTransferConfigPlanReq.toTPipeTransferReq(
+                    pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()));

Review Comment:
   If the byte[] here can also be reused, the sending performance can be 
greatly improved. However, it should be noted that the internal ByteBuffer 
needs to copy the internal array.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java:
##########
@@ -155,97 +151,119 @@ private void doTransferWrapper(
       return;
     }
     try {
-      doTransfer(socket, pipeConfigRegionWritePlanEvent);
+      doTransfer(pipeConfigRegionWritePlanEvent);
     } finally {
       pipeConfigRegionWritePlanEvent.decreaseReferenceCount(
           IoTDBConfigRegionAirGapConnector.class.getName(), false);
     }
   }
 
-  private void doTransfer(
-      final AirGapSocket socket,
-      final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent)
+  private void doTransfer(final PipeConfigRegionWritePlanEvent 
pipeConfigRegionWritePlanEvent)
       throws PipeException, IOException {
-    if (!send(
-        pipeConfigRegionWritePlanEvent.getPipeName(),
-        pipeConfigRegionWritePlanEvent.getCreationTime(),
-        socket,
-        PipeTransferConfigPlanReq.toTPipeTransferBytes(
-            pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()))) {
-      final String errorMessage =
-          String.format(
-              "Transfer config region write plan %s error. Socket: %s.",
-              
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().getType(), socket);
-      // Send handshake because we don't know whether the receiver side 
configNode
-      // has set up a new one
-      sendHandshakeReq(socket);
-      receiverStatusHandler.handle(
-          new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
-              .setMessage(errorMessage),
-          errorMessage,
-          pipeConfigRegionWritePlanEvent.toString());
+    final List<Integer> socketIndexes =
+        shouldSendToAllClients
+            ? allAliveSocketsIndex()
+            : Collections.singletonList(nextSocketIndex());
+    for (final int socketIndex : socketIndexes) {
+      final AirGapSocket socket = sockets.get(socketIndex);
+      try {
+        if (!send(
+            pipeConfigRegionWritePlanEvent.getPipeName(),
+            pipeConfigRegionWritePlanEvent.getCreationTime(),
+            socket,
+            PipeTransferConfigPlanReq.toTPipeTransferBytes(
+                pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()))) {
+          final String errorMessage =
+              String.format(
+                  "Transfer config region write plan %s error. Socket: %s.",
+                  
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().getType(), socket);
+          // Send handshake because we don't know whether the receiver side 
configNode
+          // has set up a new one
+          sendHandshakeReq(socket);
+          receiverStatusHandler.handle(
+              new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+                  .setMessage(errorMessage),
+              errorMessage,
+              pipeConfigRegionWritePlanEvent.toString());
+        }
+      } catch (final IOException e) {
+        isSocketAlive.set(socketIndex, false);
+        throw e;
+      }
     }
   }
 
-  private void doTransferWrapper(
-      final AirGapSocket socket, final PipeConfigRegionSnapshotEvent 
pipeConfigRegionSnapshotEvent)
+  private void doTransferWrapper(final PipeConfigRegionSnapshotEvent 
pipeConfigRegionSnapshotEvent)
       throws PipeException, IOException {
     // We increase the reference count for this event to determine if the 
event may be released.
     if (!pipeConfigRegionSnapshotEvent.increaseReferenceCount(
         IoTDBConfigRegionAirGapConnector.class.getName())) {
       return;
     }
     try {
-      doTransfer(socket, pipeConfigRegionSnapshotEvent);
+      doTransfer(pipeConfigRegionSnapshotEvent);
     } finally {
       pipeConfigRegionSnapshotEvent.decreaseReferenceCount(
           IoTDBConfigRegionAirGapConnector.class.getName(), false);
     }
   }
 
-  private void doTransfer(
-      final AirGapSocket socket, final PipeConfigRegionSnapshotEvent 
pipeConfigRegionSnapshotEvent)
+  private void doTransfer(final PipeConfigRegionSnapshotEvent 
pipeConfigRegionSnapshotEvent)
       throws PipeException, IOException {
     final String pipeName = pipeConfigRegionSnapshotEvent.getPipeName();
     final long creationTime = pipeConfigRegionSnapshotEvent.getCreationTime();
     final File snapshot = pipeConfigRegionSnapshotEvent.getSnapshotFile();
     final File templateFile = pipeConfigRegionSnapshotEvent.getTemplateFile();
 
-    // 1. Transfer snapshotFile, and template file if exists
-    transferFilePieces(pipeName, creationTime, snapshot, socket, true);
-    if (Objects.nonNull(templateFile)) {
-      transferFilePieces(pipeName, creationTime, templateFile, socket, true);
-    }
-    // 2. Transfer file seal signal, which means the snapshots are transferred 
completely
-    if (!send(
-        pipeName,
-        creationTime,
-        socket,
-        PipeTransferConfigSnapshotSealReq.toTPipeTransferBytes(
-            // The pattern is surely Non-null
-            pipeConfigRegionSnapshotEvent.getTreePatternString(),
-            
pipeConfigRegionSnapshotEvent.getTablePattern().getDatabasePattern(),
-            pipeConfigRegionSnapshotEvent.getTablePattern().getTablePattern(),
-            
pipeConfigRegionSnapshotEvent.getTreePattern().isTreeModelDataAllowedToBeCaptured(),
-            
pipeConfigRegionSnapshotEvent.getTablePattern().isTableModelDataAllowedToBeCaptured(),
-            snapshot.getName(),
-            snapshot.length(),
-            Objects.nonNull(templateFile) ? templateFile.getName() : null,
-            Objects.nonNull(templateFile) ? templateFile.length() : 0,
-            pipeConfigRegionSnapshotEvent.getFileType(),
-            pipeConfigRegionSnapshotEvent.toSealTypeString()))) {
-      final String errorMessage =
-          String.format("Seal config region snapshot %s error. Socket %s.", 
snapshot, socket);
-      // Send handshake because we don't know whether the receiver side 
configNode
-      // has set up a new one
-      sendHandshakeReq(socket);
-      receiverStatusHandler.handle(
-          new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
-              .setMessage(errorMessage),
-          errorMessage,
-          pipeConfigRegionSnapshotEvent.toString());
-    } else {
-      LOGGER.info("Successfully transferred config region snapshot {}.", 
snapshot);
+    final List<Integer> socketIndexes =
+        shouldSendToAllClients
+            ? allAliveSocketsIndex()
+            : Collections.singletonList(nextSocketIndex());
+    for (final int socketIndex : socketIndexes) {
+      final AirGapSocket socket = sockets.get(socketIndex);
+      try {
+        // 1. Transfer snapshotFile, and template file if exists
+        transferFilePieces(pipeName, creationTime, snapshot, socket, true);
+        if (Objects.nonNull(templateFile)) {
+          transferFilePieces(pipeName, creationTime, templateFile, socket, 
true);
+        }
+        // 2. Transfer file seal signal, which means the snapshots are 
transferred completely
+        if (!send(
+            pipeName,
+            creationTime,
+            socket,
+            PipeTransferConfigSnapshotSealReq.toTPipeTransferBytes(
+                // The pattern is surely Non-null
+                pipeConfigRegionSnapshotEvent.getTreePatternString(),
+                
pipeConfigRegionSnapshotEvent.getTablePattern().getDatabasePattern(),
+                
pipeConfigRegionSnapshotEvent.getTablePattern().getTablePattern(),
+                
pipeConfigRegionSnapshotEvent.getTreePattern().isTreeModelDataAllowedToBeCaptured(),
+                pipeConfigRegionSnapshotEvent
+                    .getTablePattern()
+                    .isTableModelDataAllowedToBeCaptured(),
+                snapshot.getName(),
+                snapshot.length(),
+                Objects.nonNull(templateFile) ? templateFile.getName() : null,
+                Objects.nonNull(templateFile) ? templateFile.length() : 0,
+                pipeConfigRegionSnapshotEvent.getFileType(),
+                pipeConfigRegionSnapshotEvent.toSealTypeString()))) {

Review Comment:
   Same here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to