CRZbulabula commented on code in PR #17609:
URL: https://github.com/apache/iotdb/pull/17609#discussion_r3200312085
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java:
##########
@@ -121,6 +121,13 @@ public TSStatus write(IConsensusRequest request) {
/** Transmit {@link ConfigPhysicalPlan} to {@link ConfigPlanExecutor} */
protected TSStatus write(ConfigPhysicalPlan plan) {
+ if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) {
+ final TSStatus persistStatus = persistPlanForSimpleConsensus(plan);
+ if (persistStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return persistStatus;
+ }
+ }
Review Comment:
**WAL replay idempotency requirement**
With the new write-ahead ordering (persist → execute), if
`persistPlanForSimpleConsensus` succeeds at line 125 but
`executor.executeNonQueryPlan` fails at line 133, the plan is already persisted
in the WAL and **will be replayed on restart**.
This is correct WAL semantics, but it introduces a hard requirement: **all
`ConfigPhysicalPlan` implementations must be idempotent under replay.** For
example, a `RegisterDataNodePlan` replayed against an already-registered node
must not fail or double-register.
Is this idempotency property currently guaranteed across all
`ConfigPhysicalPlan` types? If not, replaying a non-idempotent plan during crash
recovery could fail or apply its effect twice.
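To make the requirement concrete, here is a minimal sketch of a replay-safe apply for a registration-style plan. `IdempotentReplaySketch`, `NodeRegistry`, and `registerDataNode` are hypothetical stand-ins, not IoTDB's `NodeInfo` API; the only point is that applying the same plan twice leaves the state unchanged and still reports success, while a genuinely conflicting plan is rejected.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotentReplaySketch {

  /** Hypothetical stand-in for node metadata held by the ConfigNode. */
  static final class NodeRegistry {
    private final Map<Integer, String> registeredDataNodes = new ConcurrentHashMap<>();

    /**
     * Applies a "register data node" plan. Replaying an identical plan is a
     * no-op that still returns true, so WAL replay cannot fail or
     * double-register. A different endpoint for the same ID returns false.
     */
    boolean registerDataNode(int nodeId, String endpoint) {
      String previous = registeredDataNodes.putIfAbsent(nodeId, endpoint);
      return previous == null || previous.equals(endpoint);
    }

    int size() {
      return registeredDataNodes.size();
    }
  }

  public static void main(String[] args) {
    NodeRegistry registry = new NodeRegistry();
    boolean first = registry.registerDataNode(1, "127.0.0.1:10730");
    // Simulated WAL replay of the same plan after a crash.
    boolean replayed = registry.registerDataNode(1, "127.0.0.1:10730");
    System.out.println(first + " " + replayed + " " + registry.size());
  }
}
```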
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java:
##########
@@ -351,56 +360,48 @@ public boolean isReadOnly() {
return CommonDescriptor.getInstance().getConfig().isReadOnly();
}
- private void writeLogForSimpleConsensus(ConfigPhysicalPlan plan) {
- if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) {
- try {
- simpleLogWriter.force();
- File completedFilePath = new File(FILE_PATH + startIndex + "_" + endIndex);
- Files.move(
- simpleLogFile.toPath(), completedFilePath.toPath(), StandardCopyOption.ATOMIC_MOVE);
- } catch (IOException e) {
- LOGGER.error("Can't force logWriter for ConfigNode SimpleConsensus mode", e);
+ private TSStatus persistPlanForSimpleConsensus(ConfigPhysicalPlan plan) {
+ try {
+ if (simpleLogWriter == null || simpleLogFile == null) {
+ throw new IOException("SimpleConsensus log writer is not initialized.");
}
- for (int retry = 0; retry < 5; retry++) {
- try {
- simpleLogWriter.close();
- } catch (IOException e) {
- LOGGER.warn(
- "Can't close StandAloneLog for ConfigNode SimpleConsensus mode, "
- + "filePath: {}, retry: {}",
- simpleLogFile.getAbsolutePath(),
- retry);
- try {
- // Sleep 1s and retry
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e2) {
- Thread.currentThread().interrupt();
- LOGGER.warn("Unexpected interruption during the close method of logWriter");
- }
- continue;
- }
- break;
+
+ if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) {
+ rollSimpleConsensusLogFile();
}
- startIndex = endIndex + 1;
- createLogFile(startIndex);
- }
- try {
ByteBuffer buffer = plan.serializeToByteBuffer();
buffer.position(buffer.limit());
simpleLogWriter.write(buffer);
+ simpleLogWriter.force();
Review Comment:
**`force()` on every write makes scheduled flush redundant**
Now that every write is synchronously forced here, the scheduled
`flushWALForSimpleConsensus` thread (lines 447-455) becomes redundant — it
calls `simpleLogWriter.force()` periodically, but there's nothing left to flush.
This is harmless (double-force is idempotent) but worth either:
1. Removing the scheduled thread to avoid confusion, or
2. Adding a brief comment explaining the intentional belt-and-suspenders
approach.
For ConfigNode metadata operations (low write frequency), the per-write
`force()` is the right durability choice.
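To make the durability argument concrete, here is a minimal sketch of the persist → force → execute ordering using plain `java.nio` (`FileChannel.write` followed by `FileChannel.force(true)`) rather than IoTDB's `simpleLogWriter`. `ForcedWalSketch`, `persistThenExecute`, and the `;`-terminated record format are hypothetical. Once every append is forced before execution, a periodic `force()` has nothing new to flush, and a plan whose execution throws is already on disk for replay.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.function.Consumer;

public class ForcedWalSketch {

  /**
   * Appends one serialized plan, forces it to stable storage, and only then
   * runs the executor. If the executor throws, the record is already durable
   * and would be replayed on restart.
   */
  static void persistThenExecute(FileChannel wal, String plan, Consumer<String> executor)
      throws IOException {
    ByteBuffer buffer = ByteBuffer.wrap(plan.getBytes(StandardCharsets.UTF_8));
    while (buffer.hasRemaining()) {
      wal.write(buffer);
    }
    // Synchronous durability point: content and metadata reach the device here,
    // so a separate scheduled force() would find nothing new to flush.
    wal.force(true);
    executor.accept(plan);
  }

  public static void main(String[] args) throws IOException {
    Path walFile = Files.createTempFile("simple-consensus", ".log");
    try (FileChannel wal =
        FileChannel.open(walFile, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
      persistThenExecute(wal, "plan-1;", p -> System.out.println("executed " + p));
      persistThenExecute(wal, "plan-2;", p -> System.out.println("executed " + p));
    }
    System.out.println(Files.readString(walFile));
    Files.delete(walFile);
  }
}
```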
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java:
##########
@@ -351,56 +360,48 @@ public boolean isReadOnly() {
return CommonDescriptor.getInstance().getConfig().isReadOnly();
}
- private void writeLogForSimpleConsensus(ConfigPhysicalPlan plan) {
- if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) {
- try {
- simpleLogWriter.force();
- File completedFilePath = new File(FILE_PATH + startIndex + "_" + endIndex);
- Files.move(
- simpleLogFile.toPath(), completedFilePath.toPath(), StandardCopyOption.ATOMIC_MOVE);
- } catch (IOException e) {
- LOGGER.error("Can't force logWriter for ConfigNode SimpleConsensus mode", e);
+ private TSStatus persistPlanForSimpleConsensus(ConfigPhysicalPlan plan) {
+ try {
+ if (simpleLogWriter == null || simpleLogFile == null) {
+ throw new IOException("SimpleConsensus log writer is not initialized.");
}
- for (int retry = 0; retry < 5; retry++) {
- try {
- simpleLogWriter.close();
- } catch (IOException e) {
- LOGGER.warn(
- "Can't close StandAloneLog for ConfigNode SimpleConsensus mode, "
- + "filePath: {}, retry: {}",
- simpleLogFile.getAbsolutePath(),
- retry);
- try {
- // Sleep 1s and retry
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e2) {
- Thread.currentThread().interrupt();
- LOGGER.warn("Unexpected interruption during the close method of logWriter");
- }
- continue;
- }
- break;
+
+ if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) {
+ rollSimpleConsensusLogFile();
}
- startIndex = endIndex + 1;
- createLogFile(startIndex);
- }
- try {
ByteBuffer buffer = plan.serializeToByteBuffer();
buffer.position(buffer.limit());
simpleLogWriter.write(buffer);
+ simpleLogWriter.force();
endIndex = endIndex + 1;
} catch (Exception e) {
LOGGER.error(
- "Can't serialize current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode", e);
+ "Persist current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode failed", e);
+ return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+ .setMessage(
+ "Persist ConfigNode SimpleConsensus log failed: " + String.valueOf(e.getMessage()));
Review Comment:
Nit: `String.valueOf(e.getMessage())` is redundant — string concatenation
with `+` already handles `null` by converting to `"null"`. Can simplify to:
```java
"Persist ConfigNode SimpleConsensus log failed: " + e.getMessage()
```
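A quick check of the nit in plain Java: for an exception whose `getMessage()` is `null`, both forms produce the same string, because string concatenation converts a `null` reference to `"null"`. `NullConcatCheck`, `withValueOf`, and `plain` are illustrative names only.

```java
public class NullConcatCheck {
  static String withValueOf(Exception e) {
    return "Persist ConfigNode SimpleConsensus log failed: " + String.valueOf(e.getMessage());
  }

  static String plain(Exception e) {
    return "Persist ConfigNode SimpleConsensus log failed: " + e.getMessage();
  }

  public static void main(String[] args) {
    Exception noMessage = new RuntimeException((String) null); // getMessage() returns null
    System.out.println(plain(noMessage)); // prints "Persist ConfigNode SimpleConsensus log failed: null"
    System.out.println(withValueOf(noMessage).equals(plain(noMessage))); // prints "true"
  }
}
```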
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java:
##########
@@ -1303,6 +1322,144 @@ public TDataNodeLocation getLowestLoadDataNode(Set<Integer> nodes) {
return getRegisteredDataNode(dataNodeId).getLocation();
}
+ private TSStatus writeConfigPhysicalPlan(ConfigPhysicalPlan plan) {
+ try {
+ return getConsensusManager().write(plan);
+ } catch (ConsensusException e) {
+ LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+ return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+ .setMessage(e.getMessage());
+ }
+ }
+
+ private boolean isConsensusWriteSuccessful(TSStatus status) {
+ return status != null && status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+ }
+
+ private TSStatus rollbackDataNodeRegistration(
+ TDataNodeLocation dataNodeLocation, TSStatus versionUpdateStatus) {
+ final TSStatus rollbackStatus =
+ writeConfigPhysicalPlan(
+ new RemoveDataNodePlan(Collections.singletonList(dataNodeLocation)));
Review Comment:
**Rollback safety verified** — `RemoveDataNodePlan` (via
`NodeInfo.removeDataNode()`) is a pure metadata removal from
`registeredDataNodes` and `nodeVersionInfo` maps, with no side effects like
region migration. Same for `RemoveAINodePlan` and `RemoveConfigNodePlan`. The
rollback approach is safe.
One question: after a successful rollback, the `nodeId` that was generated
via `nodeInfo.generateNextNodeId()` (line 326) is consumed but the node is
removed. Does this leave a gap in the node ID sequence? If so, is that
acceptable, or should `generateNextNodeId` be reverted on rollback to avoid ID
fragmentation?
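To illustrate the ID-gap question under one assumption: if node IDs come from a monotonically increasing counter (modelled here with a hypothetical `AtomicInteger`, not necessarily how `nodeInfo.generateNextNodeId()` works), a rollback that only removes metadata does not return the ID to the counter, so the next registration skips it. `NodeIdGapSketch`, `register`, `rollback`, and `registeredIds` are illustrative names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;

public class NodeIdGapSketch {
  // Hypothetical generator: IDs are handed out monotonically and never reused.
  private final AtomicInteger nextNodeId = new AtomicInteger(0);
  private final ConcurrentSkipListMap<Integer, String> registeredNodes =
      new ConcurrentSkipListMap<>();

  int register(String endpoint) {
    int nodeId = nextNodeId.getAndIncrement();
    registeredNodes.put(nodeId, endpoint);
    return nodeId;
  }

  /** Rollback removes the metadata but deliberately leaves the counter alone. */
  void rollback(int nodeId) {
    registeredNodes.remove(nodeId);
  }

  Set<Integer> registeredIds() {
    return registeredNodes.keySet();
  }

  public static void main(String[] args) {
    NodeIdGapSketch nodes = new NodeIdGapSketch();
    nodes.register("dn-a");              // id 0
    int failed = nodes.register("dn-b"); // id 1, then the version write fails
    nodes.rollback(failed);
    nodes.register("dn-c");              // id 2, so id 1 is left as a gap
    System.out.println(nodes.registeredIds()); // prints "[0, 2]"
  }
}
```

Decrementing the counter on rollback would only be safe if no other registration had taken an ID in between, which is one reason a gap is often the simpler trade-off.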
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java:
##########
@@ -80,9 +80,9 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, AddConfigNodeState s
LOG.info("Successfully ADD_PEER {}", tConfigNodeLocation);
break;
case REGISTER_SUCCESS:
- env.notifyRegisterSuccess(tConfigNodeLocation);
- env.createConfigNodeHeartbeatCache(tConfigNodeLocation.getConfigNodeId());
env.applyConfigNode(tConfigNodeLocation, versionInfo);
Review Comment:
**`applyConfigNode` can now throw `IllegalStateException`**
After the refactoring in `NodeManager.applyConfigNode()`, this call can
throw `IllegalStateException` if the rollback after a version-write failure
succeeds. The exception will be caught by the `catch (Exception e)` at line 89,
which calls `setFailure(...)` only if `isRollbackSupported(state)` returns true
for `REGISTER_SUCCESS`.
Please verify that
`isRollbackSupported(AddConfigNodeState.REGISTER_SUCCESS)` returns `true`,
otherwise the exception might be silently swallowed and the procedure could
continue in an inconsistent state.
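A simplified, hypothetical model of the control flow in question (not the actual procedure framework code): the exception from `applyConfigNode` is recorded as a failure only when `isRollbackSupported(state)` is true for the current state; in the other branch this model records nothing, which is the "silently swallowed" outcome the comment worries about. `RollbackGuardSketch` and its `failure` field are illustrative names.

```java
import java.util.EnumSet;
import java.util.Set;

public class RollbackGuardSketch {

  enum AddConfigNodeState { ADD_PEER, REGISTER_SUCCESS }

  // Hypothetical: the states this procedure claims it can roll back from.
  private final Set<AddConfigNodeState> rollbackSupported;
  private String failure; // null means "no failure recorded"

  RollbackGuardSketch(Set<AddConfigNodeState> rollbackSupported) {
    this.rollbackSupported = rollbackSupported;
  }

  boolean isRollbackSupported(AddConfigNodeState state) {
    return rollbackSupported.contains(state);
  }

  /** Mirrors the shape of the catch block discussed in the review. */
  void executeFromState(AddConfigNodeState state, Runnable step) {
    try {
      step.run();
    } catch (Exception e) {
      if (isRollbackSupported(state)) {
        failure = state + " failed: " + e.getMessage(); // stands in for setFailure(...)
      }
      // Otherwise this model records nothing: the error is effectively swallowed.
    }
  }

  String failure() {
    return failure;
  }

  public static void main(String[] args) {
    Runnable applyConfigNode =
        () -> { throw new IllegalStateException("version write rolled back"); };

    RollbackGuardSketch guarded =
        new RollbackGuardSketch(EnumSet.of(AddConfigNodeState.REGISTER_SUCCESS));
    guarded.executeFromState(AddConfigNodeState.REGISTER_SUCCESS, applyConfigNode);
    System.out.println(guarded.failure());

    RollbackGuardSketch unguarded =
        new RollbackGuardSketch(EnumSet.noneOf(AddConfigNodeState.class));
    unguarded.executeFromState(AddConfigNodeState.REGISTER_SUCCESS, applyConfigNode);
    System.out.println(unguarded.failure());
  }
}
```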
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java:
##########
@@ -1008,9 +1010,14 @@ public boolean processTakeSnapshot(File snapshotDir) throws TException, IOExcept
databasePartitionTableEntry.getValue().serialize(bufferedOutputStream, protocol);
}
+ final List<RegionMaintainTask> copiedRegionMaintainTaskList;
+ synchronized (regionMaintainTaskList) {
+ copiedRegionMaintainTaskList = new ArrayList<>(regionMaintainTaskList);
+ }
Review Comment:
Good fix — `synchronized` copy before iterating. One minor observation: this
copies the entire list under the lock. If `regionMaintainTaskList` can grow
very large, this could cause a brief pause. In practice, for ConfigNode this
list is typically small, so this is fine. Just noting for future awareness.
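The pattern in isolation: hold the list's monitor only long enough to copy it, then do the slow serialization from the copy outside the lock. This sketch assumes that writers lock on the same list object (here via `Collections.synchronizedList`, whose wrapper synchronizes on itself); `SnapshotCopySketch` is an illustrative name, and `String` stands in for `RegionMaintainTask`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SnapshotCopySketch {
  // Writers and the snapshot reader all lock on this list object.
  private final List<String> regionMaintainTaskList =
      Collections.synchronizedList(new ArrayList<>());

  void addTask(String task) {
    regionMaintainTaskList.add(task); // the synchronizedList wrapper locks on itself
  }

  /** Copies under the lock (O(n) while held), then serializes without it. */
  String takeSnapshot() {
    final List<String> copied;
    synchronized (regionMaintainTaskList) {
      copied = new ArrayList<>(regionMaintainTaskList);
    }
    StringBuilder out = new StringBuilder();
    for (String task : copied) {
      out.append(task).append('\n'); // stands in for task.serialize(stream, protocol)
    }
    return out.toString();
  }

  public static void main(String[] args) {
    SnapshotCopySketch info = new SnapshotCopySketch();
    info.addTask("CREATE region-1");
    info.addTask("DELETE region-2");
    System.out.print(info.takeSnapshot());
  }
}
```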
--
This is an automated message from the Apache Git Service.