luoluoyuyu commented on code in PR #14922:
URL: https://github.com/apache/iotdb/pull/14922#discussion_r1971037770
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java:
##########
@@ -155,97 +151,119 @@ private void doTransferWrapper(
return;
}
try {
- doTransfer(socket, pipeConfigRegionWritePlanEvent);
+ doTransfer(pipeConfigRegionWritePlanEvent);
} finally {
pipeConfigRegionWritePlanEvent.decreaseReferenceCount(
IoTDBConfigRegionAirGapConnector.class.getName(), false);
}
}
- private void doTransfer(
- final AirGapSocket socket,
- final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent)
+ private void doTransfer(final PipeConfigRegionWritePlanEvent
pipeConfigRegionWritePlanEvent)
throws PipeException, IOException {
- if (!send(
- pipeConfigRegionWritePlanEvent.getPipeName(),
- pipeConfigRegionWritePlanEvent.getCreationTime(),
- socket,
- PipeTransferConfigPlanReq.toTPipeTransferBytes(
- pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()))) {
- final String errorMessage =
- String.format(
- "Transfer config region write plan %s error. Socket: %s.",
-
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().getType(), socket);
- // Send handshake because we don't know whether the receiver side
configNode
- // has set up a new one
- sendHandshakeReq(socket);
- receiverStatusHandler.handle(
- new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
- .setMessage(errorMessage),
- errorMessage,
- pipeConfigRegionWritePlanEvent.toString());
+ final List<Integer> socketIndexes =
+ shouldSendToAllClients
+ ? allAliveSocketsIndex()
+ : Collections.singletonList(nextSocketIndex());
+ for (final int socketIndex : socketIndexes) {
+ final AirGapSocket socket = sockets.get(socketIndex);
+ try {
+ if (!send(
+ pipeConfigRegionWritePlanEvent.getPipeName(),
+ pipeConfigRegionWritePlanEvent.getCreationTime(),
+ socket,
+ PipeTransferConfigPlanReq.toTPipeTransferBytes(
+ pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()))) {
Review Comment:
I think this byte[] only needs to be constructed once, which can reduce a
lot of unnecessary overhead
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java:
##########
@@ -144,45 +146,48 @@ private void doTransferWrapper(
private void doTransfer(final PipeConfigRegionWritePlanEvent
pipeConfigRegionWritePlanEvent)
throws PipeException {
- final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
clientManager.getClient();
+ final List<Pair<IoTDBSyncClient, Boolean>> clientsAndStatuses =
+ shouldSendToAllClients
+ ? clientManager.getAllClients()
+ : Collections.singletonList(clientManager.getClient());
+ for (final Pair<IoTDBSyncClient, Boolean> clientAndStatus :
clientsAndStatuses) {
+ final TPipeTransferResp resp;
+ try {
+ final TPipeTransferReq req =
+ compressIfNeeded(
+ PipeTransferConfigPlanReq.toTPipeTransferReq(
+ pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()));
Review Comment:
If the byte[] here can also be reused, the sending performance can be
greatly improved. However, it should be noted that the internal ByteBuffer
needs to copy the internal array.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java:
##########
@@ -155,97 +151,119 @@ private void doTransferWrapper(
return;
}
try {
- doTransfer(socket, pipeConfigRegionWritePlanEvent);
+ doTransfer(pipeConfigRegionWritePlanEvent);
} finally {
pipeConfigRegionWritePlanEvent.decreaseReferenceCount(
IoTDBConfigRegionAirGapConnector.class.getName(), false);
}
}
- private void doTransfer(
- final AirGapSocket socket,
- final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent)
+ private void doTransfer(final PipeConfigRegionWritePlanEvent
pipeConfigRegionWritePlanEvent)
throws PipeException, IOException {
- if (!send(
- pipeConfigRegionWritePlanEvent.getPipeName(),
- pipeConfigRegionWritePlanEvent.getCreationTime(),
- socket,
- PipeTransferConfigPlanReq.toTPipeTransferBytes(
- pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()))) {
- final String errorMessage =
- String.format(
- "Transfer config region write plan %s error. Socket: %s.",
-
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().getType(), socket);
- // Send handshake because we don't know whether the receiver side
configNode
- // has set up a new one
- sendHandshakeReq(socket);
- receiverStatusHandler.handle(
- new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
- .setMessage(errorMessage),
- errorMessage,
- pipeConfigRegionWritePlanEvent.toString());
+ final List<Integer> socketIndexes =
+ shouldSendToAllClients
+ ? allAliveSocketsIndex()
+ : Collections.singletonList(nextSocketIndex());
+ for (final int socketIndex : socketIndexes) {
+ final AirGapSocket socket = sockets.get(socketIndex);
+ try {
+ if (!send(
+ pipeConfigRegionWritePlanEvent.getPipeName(),
+ pipeConfigRegionWritePlanEvent.getCreationTime(),
+ socket,
+ PipeTransferConfigPlanReq.toTPipeTransferBytes(
+ pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()))) {
+ final String errorMessage =
+ String.format(
+ "Transfer config region write plan %s error. Socket: %s.",
+
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().getType(), socket);
+ // Send handshake because we don't know whether the receiver side
configNode
+ // has set up a new one
+ sendHandshakeReq(socket);
+ receiverStatusHandler.handle(
+ new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+ .setMessage(errorMessage),
+ errorMessage,
+ pipeConfigRegionWritePlanEvent.toString());
+ }
+ } catch (final IOException e) {
+ isSocketAlive.set(socketIndex, false);
+ throw e;
+ }
}
}
- private void doTransferWrapper(
- final AirGapSocket socket, final PipeConfigRegionSnapshotEvent
pipeConfigRegionSnapshotEvent)
+ private void doTransferWrapper(final PipeConfigRegionSnapshotEvent
pipeConfigRegionSnapshotEvent)
throws PipeException, IOException {
// We increase the reference count for this event to determine if the
event may be released.
if (!pipeConfigRegionSnapshotEvent.increaseReferenceCount(
IoTDBConfigRegionAirGapConnector.class.getName())) {
return;
}
try {
- doTransfer(socket, pipeConfigRegionSnapshotEvent);
+ doTransfer(pipeConfigRegionSnapshotEvent);
} finally {
pipeConfigRegionSnapshotEvent.decreaseReferenceCount(
IoTDBConfigRegionAirGapConnector.class.getName(), false);
}
}
- private void doTransfer(
- final AirGapSocket socket, final PipeConfigRegionSnapshotEvent
pipeConfigRegionSnapshotEvent)
+ private void doTransfer(final PipeConfigRegionSnapshotEvent
pipeConfigRegionSnapshotEvent)
throws PipeException, IOException {
final String pipeName = pipeConfigRegionSnapshotEvent.getPipeName();
final long creationTime = pipeConfigRegionSnapshotEvent.getCreationTime();
final File snapshot = pipeConfigRegionSnapshotEvent.getSnapshotFile();
final File templateFile = pipeConfigRegionSnapshotEvent.getTemplateFile();
- // 1. Transfer snapshotFile, and template file if exists
- transferFilePieces(pipeName, creationTime, snapshot, socket, true);
- if (Objects.nonNull(templateFile)) {
- transferFilePieces(pipeName, creationTime, templateFile, socket, true);
- }
- // 2. Transfer file seal signal, which means the snapshots are transferred
completely
- if (!send(
- pipeName,
- creationTime,
- socket,
- PipeTransferConfigSnapshotSealReq.toTPipeTransferBytes(
- // The pattern is surely Non-null
- pipeConfigRegionSnapshotEvent.getTreePatternString(),
-
pipeConfigRegionSnapshotEvent.getTablePattern().getDatabasePattern(),
- pipeConfigRegionSnapshotEvent.getTablePattern().getTablePattern(),
-
pipeConfigRegionSnapshotEvent.getTreePattern().isTreeModelDataAllowedToBeCaptured(),
-
pipeConfigRegionSnapshotEvent.getTablePattern().isTableModelDataAllowedToBeCaptured(),
- snapshot.getName(),
- snapshot.length(),
- Objects.nonNull(templateFile) ? templateFile.getName() : null,
- Objects.nonNull(templateFile) ? templateFile.length() : 0,
- pipeConfigRegionSnapshotEvent.getFileType(),
- pipeConfigRegionSnapshotEvent.toSealTypeString()))) {
- final String errorMessage =
- String.format("Seal config region snapshot %s error. Socket %s.",
snapshot, socket);
- // Send handshake because we don't know whether the receiver side
configNode
- // has set up a new one
- sendHandshakeReq(socket);
- receiverStatusHandler.handle(
- new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
- .setMessage(errorMessage),
- errorMessage,
- pipeConfigRegionSnapshotEvent.toString());
- } else {
- LOGGER.info("Successfully transferred config region snapshot {}.",
snapshot);
+ final List<Integer> socketIndexes =
+ shouldSendToAllClients
+ ? allAliveSocketsIndex()
+ : Collections.singletonList(nextSocketIndex());
+ for (final int socketIndex : socketIndexes) {
+ final AirGapSocket socket = sockets.get(socketIndex);
+ try {
+ // 1. Transfer snapshotFile, and template file if exists
+ transferFilePieces(pipeName, creationTime, snapshot, socket, true);
+ if (Objects.nonNull(templateFile)) {
+ transferFilePieces(pipeName, creationTime, templateFile, socket,
true);
+ }
+ // 2. Transfer file seal signal, which means the snapshots are
transferred completely
+ if (!send(
+ pipeName,
+ creationTime,
+ socket,
+ PipeTransferConfigSnapshotSealReq.toTPipeTransferBytes(
+ // The pattern is surely Non-null
+ pipeConfigRegionSnapshotEvent.getTreePatternString(),
+
pipeConfigRegionSnapshotEvent.getTablePattern().getDatabasePattern(),
+
pipeConfigRegionSnapshotEvent.getTablePattern().getTablePattern(),
+
pipeConfigRegionSnapshotEvent.getTreePattern().isTreeModelDataAllowedToBeCaptured(),
+ pipeConfigRegionSnapshotEvent
+ .getTablePattern()
+ .isTableModelDataAllowedToBeCaptured(),
+ snapshot.getName(),
+ snapshot.length(),
+ Objects.nonNull(templateFile) ? templateFile.getName() : null,
+ Objects.nonNull(templateFile) ? templateFile.length() : 0,
+ pipeConfigRegionSnapshotEvent.getFileType(),
+ pipeConfigRegionSnapshotEvent.toSealTypeString()))) {
Review Comment:
Same here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]