CRZbulabula commented on code in PR #17609:
URL: https://github.com/apache/iotdb/pull/17609#discussion_r3200312085


##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java:
##########
@@ -121,6 +121,13 @@ public TSStatus write(IConsensusRequest request) {
 
   /** Transmit {@link ConfigPhysicalPlan} to {@link ConfigPlanExecutor} */
   protected TSStatus write(ConfigPhysicalPlan plan) {
+    if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) {
+      final TSStatus persistStatus = persistPlanForSimpleConsensus(plan);
+      if (persistStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return persistStatus;
+      }
+    }

Review Comment:
   **WAL replay idempotency requirement**
   
   With the new write-ahead ordering (persist → execute), if 
`persistPlanForSimpleConsensus` succeeds at line 125 but 
`executor.executeNonQueryPlan` fails at line 133, the plan is already persisted 
in the WAL and **will be replayed on restart**.
   
   This is correct WAL semantics, but it introduces a hard requirement: **all 
`ConfigPhysicalPlan` implementations must be idempotent under replay.** For 
example, a `RegisterDataNodePlan` replayed against an already-registered node 
must not fail or double-register.
   
   Is this idempotency property currently guaranteed across all 
`ConfigPhysicalPlan` types? If not, replaying a plan that was already applied 
before the crash could fail or double-apply its effect during crash recovery.
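A minimal sketch of what replay-safe application could look like, assuming a hypothetical in-memory `NodeRegistry` keyed by node ID and a hypothetical `RegisterNodePlan`; neither is IoTDB's actual `NodeInfo` or `RegisterDataNodePlan`. Applying the same plan a second time, as a WAL replay would, reports success without adding a second entry, while a conflicting plan for an already-used ID is rejected instead of silently overwriting it.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: a "register node" apply step that is safe to replay from a WAL. */
final class IdempotentReplaySketch {

  /** Hypothetical plan: register node {@code nodeId} at {@code endpoint}. */
  static final class RegisterNodePlan {
    final int nodeId;
    final String endpoint;

    RegisterNodePlan(int nodeId, String endpoint) {
      this.nodeId = nodeId;
      this.endpoint = endpoint;
    }
  }

  /** Hypothetical state-machine state: registered nodes keyed by ID. */
  static final class NodeRegistry {
    private final Map<Integer, String> registered = new ConcurrentHashMap<>();

    /**
     * Re-applying an identical plan is a no-op that still reports success; a conflicting plan for
     * an already-used ID is rejected rather than overwriting the existing entry.
     */
    boolean apply(RegisterNodePlan plan) {
      String previous = registered.putIfAbsent(plan.nodeId, plan.endpoint);
      return previous == null || previous.equals(plan.endpoint);
    }

    int size() {
      return registered.size();
    }
  }

  /** Replays every logged plan in order, as a restart after a crash would. */
  static boolean replay(NodeRegistry registry, List<RegisterNodePlan> wal) {
    boolean allOk = true;
    for (RegisterNodePlan plan : wal) {
      allOk &= registry.apply(plan);
    }
    return allOk;
  }

  public static void main(String[] args) {
    List<RegisterNodePlan> wal = Collections.singletonList(new RegisterNodePlan(1, "datanode-1"));
    NodeRegistry registry = new NodeRegistry();
    registry.apply(wal.get(0)); // applied once before the crash
    boolean replayOk = replay(registry, wal); // replayed again on restart
    System.out.println(replayOk + " " + registry.size()); // prints "true 1"
  }
}
```

The key design choice is that "already in the desired state" counts as success, which is what lets the WAL be replayed blindly from the start.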



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java:
##########
@@ -351,56 +360,48 @@ public boolean isReadOnly() {
     return CommonDescriptor.getInstance().getConfig().isReadOnly();
   }
 
-  private void writeLogForSimpleConsensus(ConfigPhysicalPlan plan) {
-    if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) {
-      try {
-        simpleLogWriter.force();
-        File completedFilePath = new File(FILE_PATH + startIndex + "_" + endIndex);
-        Files.move(
-            simpleLogFile.toPath(), completedFilePath.toPath(), StandardCopyOption.ATOMIC_MOVE);
-      } catch (IOException e) {
-        LOGGER.error("Can't force logWriter for ConfigNode SimpleConsensus mode", e);
+  private TSStatus persistPlanForSimpleConsensus(ConfigPhysicalPlan plan) {
+    try {
+      if (simpleLogWriter == null || simpleLogFile == null) {
+        throw new IOException("SimpleConsensus log writer is not initialized.");
       }
-      for (int retry = 0; retry < 5; retry++) {
-        try {
-          simpleLogWriter.close();
-        } catch (IOException e) {
-          LOGGER.warn(
-              "Can't close StandAloneLog for ConfigNode SimpleConsensus mode, "
-                  + "filePath: {}, retry: {}",
-              simpleLogFile.getAbsolutePath(),
-              retry);
-          try {
-            // Sleep 1s and retry
-            TimeUnit.SECONDS.sleep(1);
-          } catch (InterruptedException e2) {
-            Thread.currentThread().interrupt();
-            LOGGER.warn("Unexpected interruption during the close method of logWriter");
-          }
-          continue;
-        }
-        break;
+
+      if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) {
+        rollSimpleConsensusLogFile();
       }
-      startIndex = endIndex + 1;
-      createLogFile(startIndex);
-    }
 
-    try {
       ByteBuffer buffer = plan.serializeToByteBuffer();
       buffer.position(buffer.limit());
       simpleLogWriter.write(buffer);
+      simpleLogWriter.force();

Review Comment:
   **`force()` on every write makes scheduled flush redundant**
   
   Now that every write is synchronously forced here, the scheduled 
`flushWALForSimpleConsensus` thread (lines 447-455) becomes redundant — it 
calls `simpleLogWriter.force()` periodically, but there's nothing left to flush.
   
   This is harmless (double-force is idempotent), but it is worth either:
   1. Removing the scheduled thread to avoid confusion, or
   2. Adding a brief comment explaining the intentional belt-and-suspenders 
approach.
   
   For ConfigNode metadata operations (low write frequency), the per-write 
`force()` is the right durability choice.
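For reference, a minimal sketch of the persist-then-execute ordering with a synchronous force per record, assuming a plain `java.nio.channels.FileChannel` and a hypothetical length-prefixed record format rather than the actual `simpleLogWriter` type. Once `append` returns, the record has already been forced, so a periodic `force()` running afterwards has nothing new to flush for it.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical sketch: persist-then-execute with a synchronous force() per appended record. */
final class ForcePerWriteSketch implements AutoCloseable {
  private final FileChannel channel;

  ForcePerWriteSketch(Path logFile) throws IOException {
    this.channel =
        FileChannel.open(
            logFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
  }

  /** Appends one length-prefixed record and forces it to storage before returning. */
  void append(byte[] record) throws IOException {
    ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + record.length);
    buffer.putInt(record.length).put(record).flip();
    while (buffer.hasRemaining()) {
      channel.write(buffer);
    }
    // Durable once this returns, so a later periodic force() has nothing left to flush for it.
    channel.force(true);
  }

  @Override
  public void close() throws IOException {
    channel.close();
  }

  public static void main(String[] args) throws IOException {
    Path log = Files.createTempFile("simple-consensus", ".log");
    try (ForcePerWriteSketch wal = new ForcePerWriteSketch(log)) {
      wal.append("plan-1".getBytes(StandardCharsets.UTF_8));
      // Only after append() returns would the plan be handed to the executor.
    }
    System.out.println(Files.size(log)); // prints 10 (4-byte length prefix + 6 payload bytes)
    Files.delete(log);
  }
}
```

`force(true)` also flushes file metadata such as the length, which matters for an append-only log; the cost is acceptable for low-frequency metadata writes.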



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java:
##########
@@ -351,56 +360,48 @@ public boolean isReadOnly() {
     return CommonDescriptor.getInstance().getConfig().isReadOnly();
   }
 
-  private void writeLogForSimpleConsensus(ConfigPhysicalPlan plan) {
-    if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) {
-      try {
-        simpleLogWriter.force();
-        File completedFilePath = new File(FILE_PATH + startIndex + "_" + endIndex);
-        Files.move(
-            simpleLogFile.toPath(), completedFilePath.toPath(), StandardCopyOption.ATOMIC_MOVE);
-      } catch (IOException e) {
-        LOGGER.error("Can't force logWriter for ConfigNode SimpleConsensus mode", e);
+  private TSStatus persistPlanForSimpleConsensus(ConfigPhysicalPlan plan) {
+    try {
+      if (simpleLogWriter == null || simpleLogFile == null) {
+        throw new IOException("SimpleConsensus log writer is not initialized.");
       }
-      for (int retry = 0; retry < 5; retry++) {
-        try {
-          simpleLogWriter.close();
-        } catch (IOException e) {
-          LOGGER.warn(
-              "Can't close StandAloneLog for ConfigNode SimpleConsensus mode, "
-                  + "filePath: {}, retry: {}",
-              simpleLogFile.getAbsolutePath(),
-              retry);
-          try {
-            // Sleep 1s and retry
-            TimeUnit.SECONDS.sleep(1);
-          } catch (InterruptedException e2) {
-            Thread.currentThread().interrupt();
-            LOGGER.warn("Unexpected interruption during the close method of logWriter");
-          }
-          continue;
-        }
-        break;
+
+      if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) {
+        rollSimpleConsensusLogFile();
       }
-      startIndex = endIndex + 1;
-      createLogFile(startIndex);
-    }
 
-    try {
       ByteBuffer buffer = plan.serializeToByteBuffer();
       buffer.position(buffer.limit());
       simpleLogWriter.write(buffer);
+      simpleLogWriter.force();
 
       endIndex = endIndex + 1;
     } catch (Exception e) {
       LOGGER.error(
-          "Can't serialize current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode", e);
+          "Persist current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode failed", e);
+      return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+          .setMessage(
+              "Persist ConfigNode SimpleConsensus log failed: " + String.valueOf(e.getMessage()));

Review Comment:
   Nit: `String.valueOf(e.getMessage())` is redundant — string concatenation 
with `+` already handles `null` by converting to `"null"`. Can simplify to:
   ```java
   "Persist ConfigNode SimpleConsensus log failed: " + e.getMessage()
   ```
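A quick check of the nit: under the Java Language Specification's string conversion rules, a `null` reference operand of `+` is rendered as `"null"`, so both forms produce the same message. The class name `NullConcatSketch` is purely illustrative.

```java
import java.io.IOException;

/** Illustrative check that concatenation already renders a null message as "null". */
final class NullConcatSketch {
  static String withValueOf(Exception e) {
    return "Persist ConfigNode SimpleConsensus log failed: " + String.valueOf(e.getMessage());
  }

  static String withConcat(Exception e) {
    return "Persist ConfigNode SimpleConsensus log failed: " + e.getMessage();
  }

  public static void main(String[] args) {
    Exception noMessage = new IOException(); // getMessage() returns null
    System.out.println(withConcat(noMessage)); // prints "Persist ConfigNode SimpleConsensus log failed: null"
    System.out.println(withValueOf(noMessage).equals(withConcat(noMessage))); // prints true
  }
}
```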



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java:
##########
@@ -1303,6 +1322,144 @@ public TDataNodeLocation getLowestLoadDataNode(Set<Integer> nodes) {
     return getRegisteredDataNode(dataNodeId).getLocation();
   }
 
+  private TSStatus writeConfigPhysicalPlan(ConfigPhysicalPlan plan) {
+    try {
+      return getConsensusManager().write(plan);
+    } catch (ConsensusException e) {
+      LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
+      return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+          .setMessage(e.getMessage());
+    }
+  }
+
+  private boolean isConsensusWriteSuccessful(TSStatus status) {
+    return status != null && status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+  }
+
+  private TSStatus rollbackDataNodeRegistration(
+      TDataNodeLocation dataNodeLocation, TSStatus versionUpdateStatus) {
+    final TSStatus rollbackStatus =
+        writeConfigPhysicalPlan(
+            new RemoveDataNodePlan(Collections.singletonList(dataNodeLocation)));

Review Comment:
   **Rollback safety verified** — `RemoveDataNodePlan` (via 
`NodeInfo.removeDataNode()`) is a pure metadata removal from 
`registeredDataNodes` and `nodeVersionInfo` maps, with no side effects like 
region migration. Same for `RemoveAINodePlan` and `RemoveConfigNodePlan`. The 
rollback approach is safe.
   
   One question: after a successful rollback, the `nodeId` that was generated 
via `nodeInfo.generateNextNodeId()` (line 326) is consumed but the node is 
removed. Does this leave a gap in the node ID sequence? If so, is that 
acceptable, or should `generateNextNodeId` be reverted on rollback to avoid ID 
fragmentation?
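One way to reason about the question, sketched with a hypothetical `AtomicInteger`-backed allocator (not the actual `nodeInfo.generateNextNodeId()` implementation): handing an ID back is only safe when no later ID has been allocated, which a compare-and-set can check. If another registration slipped in between, the rolled-back ID simply stays consumed, so tolerating gaps may be the simpler option.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: a monotonically increasing node-ID allocator with a best-effort revert. */
final class NodeIdAllocatorSketch {
  private final AtomicInteger nextNodeId = new AtomicInteger(0);

  /** Allocates and returns a fresh node ID. */
  int generateNextNodeId() {
    return nextNodeId.getAndIncrement();
  }

  /**
   * Tries to give {@code nodeId} back after a rolled-back registration. Succeeds only if no other
   * ID was allocated afterwards; otherwise the ID stays consumed and a gap remains.
   */
  boolean tryRevert(int nodeId) {
    return nextNodeId.compareAndSet(nodeId + 1, nodeId);
  }

  public static void main(String[] args) {
    NodeIdAllocatorSketch allocator = new NodeIdAllocatorSketch();
    int a = allocator.generateNextNodeId(); // 0
    System.out.println(allocator.tryRevert(a)); // prints true: nothing allocated since
    int b = allocator.generateNextNodeId(); // 0 again, reused
    allocator.generateNextNodeId(); // 1 is handed to another registration
    System.out.println(allocator.tryRevert(b)); // prints false: 0 stays consumed, leaving a gap
  }
}
```

In a replicated ConfigNode the counter would itself be consensus state, so any revert would also need to go through the log; that extra write is another argument for simply accepting gaps.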



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java:
##########
@@ -80,9 +80,9 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, AddConfigNodeState s
           LOG.info("Successfully ADD_PEER {}", tConfigNodeLocation);
           break;
         case REGISTER_SUCCESS:
-          env.notifyRegisterSuccess(tConfigNodeLocation);
-          env.createConfigNodeHeartbeatCache(tConfigNodeLocation.getConfigNodeId());
           env.applyConfigNode(tConfigNodeLocation, versionInfo);

Review Comment:
   **`applyConfigNode` can now throw `IllegalStateException`**
   
   After the refactoring in `NodeManager.applyConfigNode()`, this call can 
throw `IllegalStateException` if the rollback after a version-write failure 
succeeds. The exception will be caught by the `catch (Exception e)` at line 89, 
which calls `setFailure(...)` only if `isRollbackSupported(state)` returns true 
for `REGISTER_SUCCESS`.
   
   Please verify that 
`isRollbackSupported(AddConfigNodeState.REGISTER_SUCCESS)` returns `true`, 
otherwise the exception might be silently swallowed and the procedure could 
continue in an inconsistent state.
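To make the concern concrete, a hypothetical sketch of the catch path. The state names are modeled on the hunk, `failures` stands in for `setFailure(...)`, and the else branch is deliberately empty, so this is not IoTDB's procedure framework. When `isRollbackSupported(REGISTER_SUCCESS)` is false, the `IllegalStateException` from the stand-in `applyConfigNode()` is caught and nothing records it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the catch path under review; not IoTDB's procedure framework. */
final class RollbackGuardSketch {
  enum State {
    ADD_PEER,
    REGISTER_SUCCESS
  }

  private final boolean rollbackSupportedOnRegisterSuccess;
  private final List<String> failures = new ArrayList<>();

  RollbackGuardSketch(boolean rollbackSupportedOnRegisterSuccess) {
    this.rollbackSupportedOnRegisterSuccess = rollbackSupportedOnRegisterSuccess;
  }

  boolean isRollbackSupported(State state) {
    switch (state) {
      case ADD_PEER:
        return true;
      case REGISTER_SUCCESS:
        return rollbackSupportedOnRegisterSuccess;
      default:
        return false;
    }
  }

  /** Stand-in for applyConfigNode(): the registration was rolled back, so it throws. */
  private void applyConfigNode() {
    throw new IllegalStateException("version write failed; registration rolled back");
  }

  /** Mirrors the reviewed shape: the failure is recorded only when rollback is supported. */
  void executeFromState(State state) {
    try {
      if (state == State.REGISTER_SUCCESS) {
        applyConfigNode();
      }
    } catch (Exception e) {
      if (isRollbackSupported(state)) {
        failures.add(e.getMessage()); // stand-in for setFailure(...)
      }
      // Otherwise nothing records the exception: the case the review wants ruled out.
    }
  }

  boolean hasFailed() {
    return !failures.isEmpty();
  }

  public static void main(String[] args) {
    RollbackGuardSketch supported = new RollbackGuardSketch(true);
    supported.executeFromState(State.REGISTER_SUCCESS);
    RollbackGuardSketch unsupported = new RollbackGuardSketch(false);
    unsupported.executeFromState(State.REGISTER_SUCCESS);
    System.out.println(supported.hasFailed() + " " + unsupported.hasFailed()); // prints "true false"
  }
}
```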



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java:
##########
@@ -1008,9 +1010,14 @@ public boolean processTakeSnapshot(File snapshotDir) throws TException, IOExcept
         databasePartitionTableEntry.getValue().serialize(bufferedOutputStream, protocol);
       }
 
+      final List<RegionMaintainTask> copiedRegionMaintainTaskList;
+      synchronized (regionMaintainTaskList) {
+        copiedRegionMaintainTaskList = new ArrayList<>(regionMaintainTaskList);
+      }

Review Comment:
   Good fix — `synchronized` copy before iterating. One minor observation: this 
copies the entire list under the lock. If `regionMaintainTaskList` can grow 
very large, holding the lock for the whole copy could briefly block threads that 
modify the list. In practice, for ConfigNode this list is typically small, so 
this is fine. Just noting for future awareness.
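The pattern in the fix, reduced to a sketch with a hypothetical `List<String>` standing in for `regionMaintainTaskList` and a string prefix standing in for task serialization. It assumes every writer also synchronizes on the same list: the lock is held only while copying, and iteration happens on the private copy, so concurrent `addTask` calls cannot trigger a `ConcurrentModificationException`.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: copy a shared list under its monitor, then iterate the copy without the lock. */
final class CopyUnderLockSketch {
  // Hypothetical stand-in for regionMaintainTaskList, guarded by its own monitor.
  private final List<String> tasks = new ArrayList<>();

  void addTask(String task) {
    synchronized (tasks) {
      tasks.add(task);
    }
  }

  /** Writers are blocked only for the O(n) copy, not for the serialization loop. */
  List<String> snapshot() {
    final List<String> copied;
    synchronized (tasks) {
      copied = new ArrayList<>(tasks);
    }
    List<String> serialized = new ArrayList<>(copied.size());
    for (String task : copied) {
      serialized.add("task:" + task); // stands in for serializing each task to the snapshot stream
    }
    return serialized;
  }

  public static void main(String[] args) throws InterruptedException {
    CopyUnderLockSketch sketch = new CopyUnderLockSketch();
    Thread writer =
        new Thread(
            () -> {
              for (int k = 0; k < 10_000; k++) {
                sketch.addTask("t" + k);
              }
            });
    writer.start();
    for (int j = 0; j < 100; j++) {
      sketch.snapshot(); // safe while the writer runs: no ConcurrentModificationException
    }
    writer.join();
    System.out.println(sketch.snapshot().size()); // prints 10000
  }
}
```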



-- 
This is an automated message from the Apache Git Service.