OneSizeFitsQuorum commented on code in PR #14505:
URL: https://github.com/apache/iotdb/pull/14505#discussion_r1893586877
##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java:
##########
@@ -145,6 +146,15 @@ public interface IConsensus {
*/
void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws
ConsensusException;
+ /**
+ * Record the correct peer list (likely got from the ConfigNode) for future
use in resetPeerList.
+ * Only use this method if necessary. If it is called, it should be called
before {@link
+ * #start()}.
+ *
+ * @param correctPeerList The correct consensus group member list
+ */
+ void recordCorrectPeerListBeforeStart(Map<ConsensusGroupId, List<Peer>>
correctPeerList);
Review Comment:
BeforeStarting?
##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java:
##########
@@ -178,10 +180,30 @@ private void initAndRecover() throws IOException {
syncClientManager,
config);
stateMachineMap.put(consensusGroupId, consensus);
- consensus.start();
}
}
}
+ if (correctPeerListBeforeStart != null) {
+ BiConsumer<ConsensusGroupId, List<Peer>> resetPeerListWithoutThrow =
+ (consensusGroupId, peers) -> {
+ try {
+ resetPeerList(consensusGroupId, peers);
+ } catch (ConsensusGroupNotExistException ignore) {
+
+ } catch (Exception e) {
+ logger.warn("Failed to reset peer list while start", e);
+ }
+ };
+ // make peers which are in list correct
+ correctPeerListBeforeStart.forEach(resetPeerListWithoutThrow);
+ // clear peers which are not in the list
+ stateMachineMap.keySet().stream()
Review Comment:
Will this modify the stateMachineMap during iteration? Could that cause an exception?
##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java:
##########
@@ -178,10 +180,30 @@ private void initAndRecover() throws IOException {
syncClientManager,
config);
stateMachineMap.put(consensusGroupId, consensus);
- consensus.start();
}
}
}
+ if (correctPeerListBeforeStart != null) {
+ BiConsumer<ConsensusGroupId, List<Peer>> resetPeerListWithoutThrow =
Review Comment:
make it a class field?
##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java:
##########
@@ -483,10 +475,14 @@ public void reloadConsensusConfig(ConsensusConfig
consensusConfig) {
.init(config.getReplication().getRegionMigrationSpeedLimitBytesPerSecond());
}
+ @Override
+ public void recordCorrectPeerListBeforeStart(Map<ConsensusGroupId,
List<Peer>> correctPeerList) {
Review Comment:
Add some UT for this
##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java:
##########
@@ -146,33 +147,61 @@ private void initAndRecover() throws IOException {
}
} else {
// asynchronously recover, retry logic is implemented at
PipeConsensusImpl
- CompletableFuture.runAsync(
- () -> {
- try (DirectoryStream<Path> stream =
Files.newDirectoryStream(storageDir.toPath())) {
- for (Path path : stream) {
- ConsensusGroupId consensusGroupId =
- parsePeerFileName(path.getFileName().toString());
- PipeConsensusServerImpl consensus =
- new PipeConsensusServerImpl(
- new Peer(consensusGroupId, thisNodeId, thisNode),
- registry.apply(consensusGroupId),
- path.toString(),
- new ArrayList<>(),
- config,
- consensusPipeManager,
- syncClientManager);
- stateMachineMap.put(consensusGroupId, consensus);
- consensus.start(true);
- }
- } catch (Exception e) {
- LOGGER.error("Failed to recover consensus from {}",
storageDir, e);
- }
- })
- .exceptionally(
- e -> {
- LOGGER.error("Failed to recover consensus from {}",
storageDir, e);
- return null;
- });
+ CompletableFuture<Void> future =
+ CompletableFuture.runAsync(
+ () -> {
+ try (DirectoryStream<Path> stream =
+ Files.newDirectoryStream(storageDir.toPath())) {
+ for (Path path : stream) {
+ ConsensusGroupId consensusGroupId =
+ parsePeerFileName(path.getFileName().toString());
+ PipeConsensusServerImpl consensus =
+ new PipeConsensusServerImpl(
+ new Peer(consensusGroupId, thisNodeId,
thisNode),
+ registry.apply(consensusGroupId),
+ path.toString(),
+ new ArrayList<>(),
+ config,
+ consensusPipeManager,
+ syncClientManager);
+ stateMachineMap.put(consensusGroupId, consensus);
+ consensus.start(true);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Failed to recover consensus from {}",
storageDir, e);
+ }
+ })
+ .exceptionally(
+ e -> {
+ LOGGER.error("Failed to recover consensus from {}",
storageDir, e);
+ return null;
+ });
+
+ if (correctPeerListBeforeStart != null) {
+ try {
+ future.get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ BiConsumer<ConsensusGroupId, List<Peer>> resetPeerListWithoutThrow =
+ (consensusGroupId, peers) -> {
+ try {
+ resetPeerList(consensusGroupId, peers);
+ } catch (ConsensusGroupNotExistException ignore) {
+
+ } catch (Exception e) {
+ LOGGER.warn("Failed to reset peer list while start", e);
+ }
+ };
+ // make peers which are in list correct
+ correctPeerListBeforeStart.forEach(resetPeerListWithoutThrow);
+ // clear peers which are not in the list
+ stateMachineMap.keySet().stream()
Review Comment:
same
##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java:
##########
@@ -211,6 +210,13 @@ public void removeRemotePeer(ConsensusGroupId groupId,
Peer peer) throws Consens
throw new ConsensusException("SimpleConsensus does not support membership
changes");
}
+ @Override
+ public void recordCorrectPeerListBeforeStarting(
+ Map<ConsensusGroupId, List<Peer>> correctPeerList) {
+ throw new UnsupportedOperationException(
Review Comment:
Just do nothing? Or SimpleConsensus init may fail?
##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java:
##########
@@ -146,33 +147,61 @@ private void initAndRecover() throws IOException {
}
} else {
// asynchronously recover, retry logic is implemented at
PipeConsensusImpl
- CompletableFuture.runAsync(
- () -> {
- try (DirectoryStream<Path> stream =
Files.newDirectoryStream(storageDir.toPath())) {
- for (Path path : stream) {
- ConsensusGroupId consensusGroupId =
- parsePeerFileName(path.getFileName().toString());
- PipeConsensusServerImpl consensus =
- new PipeConsensusServerImpl(
- new Peer(consensusGroupId, thisNodeId, thisNode),
- registry.apply(consensusGroupId),
- path.toString(),
- new ArrayList<>(),
- config,
- consensusPipeManager,
- syncClientManager);
- stateMachineMap.put(consensusGroupId, consensus);
- consensus.start(true);
- }
- } catch (Exception e) {
- LOGGER.error("Failed to recover consensus from {}",
storageDir, e);
- }
- })
- .exceptionally(
- e -> {
- LOGGER.error("Failed to recover consensus from {}",
storageDir, e);
- return null;
- });
+ CompletableFuture<Void> future =
+ CompletableFuture.runAsync(
+ () -> {
+ try (DirectoryStream<Path> stream =
+ Files.newDirectoryStream(storageDir.toPath())) {
+ for (Path path : stream) {
+ ConsensusGroupId consensusGroupId =
+ parsePeerFileName(path.getFileName().toString());
+ PipeConsensusServerImpl consensus =
+ new PipeConsensusServerImpl(
+ new Peer(consensusGroupId, thisNodeId,
thisNode),
+ registry.apply(consensusGroupId),
+ path.toString(),
+ new ArrayList<>(),
+ config,
+ consensusPipeManager,
+ syncClientManager);
+ stateMachineMap.put(consensusGroupId, consensus);
+ consensus.start(true);
Review Comment:
Can we start them after resetPeerList?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java:
##########
@@ -64,6 +69,10 @@ public void createPipe(
TSStatus status = configNodeClient.createPipe(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) {
LOGGER.warn("Failed to create consensus pipe-{}, status: {}",
pipeName, status);
Review Comment:
move to line 75?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeDispatcher.java:
##########
@@ -111,6 +120,10 @@ public void dropPipe(ConsensusPipeName pipeName) throws
Exception {
final TSStatus status = configNodeClient.dropPipe(pipeName.toString());
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) {
LOGGER.warn("Failed to drop consensus pipe-{}, status: {}", pipeName,
status);
Review Comment:
move to line 126?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]