Copilot commented on code in PR #17332:
URL: https://github.com/apache/iotdb/pull/17332#discussion_r2969190222
##########
integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java:
##########
@@ -210,6 +217,187 @@ public void testReplicaConsistencyAfterLeaderStop()
throws Exception {
}
}
+ /**
+ * Test that DELETE TIMESERIES is properly replicated to all DataNode
replicas via IoTConsensusV2.
+ *
+ * <p>This test reproduces the scenario from the historical deletion
replication bug: when a
+ * timeseries is deleted after data insertion (with some unflushed data),
the deletion event must
+ * be consistently replicated to all replicas. After waiting for replication
to complete, stopping
+ * each DataNode in turn should show the same schema on all surviving nodes.
+ *
+ * <p>Scenario:
+ *
+ * <ol>
+ * <li>Insert data into root.sg.d1 with 3 measurements (speed,
temperature, power), flush
+ * <li>Insert more data (unflushed to create WAL-only entries)
+ * <li>DELETE TIMESERIES root.sg.d1.speed
+ * <li>Flush again to persist deletion
+ * <li>Wait for replication to complete on all DataNodes
+ * <li>Verify that every DataNode independently shows the same timeseries
(speed is gone)
+ * </ol>
+ */
+ public void testDeleteTimeSeriesReplicaConsistency() throws Exception {
+ try (Connection connection =
makeItCloseQuietly(EnvFactory.getEnv().getConnection());
+ Statement statement =
makeItCloseQuietly(connection.createStatement())) {
+
+ // Step 1: Insert data with 3 measurements and flush
+ LOGGER.info(
+ "Step 1: Inserting data with 3 measurements and flushing (mode:
{})...",
+ getIoTConsensusV2Mode());
+ statement.execute(INSERTION1);
+ statement.execute(INSERTION2);
+ statement.execute(FLUSH_COMMAND);
+
+ // Step 2: Insert more data without flush (creates WAL-only entries)
+ LOGGER.info("Step 2: Inserting more data without flush (WAL-only
entries)...");
+ statement.execute(INSERTION3);
+
+ // Step 3: Delete one timeseries
+ LOGGER.info("Step 3: Deleting timeseries root.sg.d1.speed...");
+ statement.execute(DELETE_TIMESERIES_SPEED);
+
+ // Step 4: Flush again to persist the deletion
+ LOGGER.info("Step 4: Flushing to persist deletion...");
+ statement.execute(FLUSH_COMMAND);
+
+ // Verify on the current connection: speed should be gone, 2 timeseries
remain
+ verifyTimeSeriesAfterDelete(statement, "via initial connection");
+
+ // Step 5: Wait for replication to complete on data region leaders
+ LOGGER.info("Step 5: Waiting for replication to complete on data region
leaders...");
+ Map<Integer, Pair<Integer, Set<Integer>>> dataRegionMap =
+ getDataRegionMapWithLeader(statement);
+ Set<Integer> leaderNodeIds = new HashSet<>();
+ for (Pair<Integer, Set<Integer>> leaderAndReplicas :
dataRegionMap.values()) {
+ if (leaderAndReplicas.getLeft() > 0) {
+ leaderNodeIds.add(leaderAndReplicas.getLeft());
+ }
+ }
+ for (int leaderNodeId : leaderNodeIds) {
+ EnvFactory.getEnv()
+ .dataNodeIdToWrapper(leaderNodeId)
+ .ifPresent(this::waitForReplicationComplete);
+ }
+
+ // Step 6: Verify schema consistency on each DataNode independently
+ LOGGER.info("Step 6: Verifying schema consistency on each DataNode
independently...");
+ List<DataNodeWrapper> dataNodeWrappers =
EnvFactory.getEnv().getDataNodeWrapperList();
+ for (DataNodeWrapper wrapper : dataNodeWrappers) {
+ String nodeDescription = "DataNode " + wrapper.getIp() + ":" +
wrapper.getPort();
+ LOGGER.info("Verifying schema on {}", nodeDescription);
+ Awaitility.await()
+ .atMost(60, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ try (Connection nodeConn =
+ makeItCloseQuietly(
+ EnvFactory.getEnv()
+ .getConnection(
+ wrapper,
+ SessionConfig.DEFAULT_USER,
+ SessionConfig.DEFAULT_PASSWORD,
+ BaseEnv.TREE_SQL_DIALECT));
+ Statement nodeStmt =
makeItCloseQuietly(nodeConn.createStatement())) {
+ verifyTimeSeriesAfterDelete(nodeStmt, nodeDescription);
+ }
+ });
+ }
+
+ // Step 7: Stop each DataNode one by one and verify remaining nodes
still consistent
+ LOGGER.info(
+ "Step 7: Stopping each DataNode in turn and verifying remaining
nodes show consistent schema...");
+ for (DataNodeWrapper stoppedNode : dataNodeWrappers) {
+ String stoppedDesc = "DataNode " + stoppedNode.getIp() + ":" +
stoppedNode.getPort();
+ LOGGER.info("Stopping {}", stoppedDesc);
+ stoppedNode.stopForcibly();
+ Assert.assertFalse(stoppedDesc + " should be stopped",
stoppedNode.isAlive());
Review Comment:
After calling stopForcibly(), the test immediately asserts !isAlive().
AbstractNodeWrapper.stopForcibly() only waits up to 10s and ignores the return
value, so the process may still be alive and this assertion can be flaky.
Prefer awaiting the node to actually stop (e.g., Awaitility.await().until(() ->
!stoppedNode.isAlive())) before proceeding.
```suggestion
Awaitility.await()
.atMost(60, TimeUnit.SECONDS)
.untilAsserted(
() ->
Assert.assertFalse(
stoppedDesc + " should be stopped",
stoppedNode.isAlive()));
```
##########
integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java:
##########
@@ -210,6 +217,187 @@ public void testReplicaConsistencyAfterLeaderStop()
throws Exception {
}
}
+ /**
+ * Test that DELETE TIMESERIES is properly replicated to all DataNode
replicas via IoTConsensusV2.
+ *
+ * <p>This test reproduces the scenario from the historical deletion
replication bug: when a
+ * timeseries is deleted after data insertion (with some unflushed data),
the deletion event must
+ * be consistently replicated to all replicas. After waiting for replication
to complete, stopping
+ * each DataNode in turn should show the same schema on all surviving nodes.
+ *
+ * <p>Scenario:
+ *
+ * <ol>
+ * <li>Insert data into root.sg.d1 with 3 measurements (speed,
temperature, power), flush
+ * <li>Insert more data (unflushed to create WAL-only entries)
+ * <li>DELETE TIMESERIES root.sg.d1.speed
+ * <li>Flush again to persist deletion
+ * <li>Wait for replication to complete on all DataNodes
+ * <li>Verify that every DataNode independently shows the same timeseries
(speed is gone)
+ * </ol>
+ */
+ public void testDeleteTimeSeriesReplicaConsistency() throws Exception {
+ try (Connection connection =
makeItCloseQuietly(EnvFactory.getEnv().getConnection());
+ Statement statement =
makeItCloseQuietly(connection.createStatement())) {
+
+ // Step 1: Insert data with 3 measurements and flush
+ LOGGER.info(
+ "Step 1: Inserting data with 3 measurements and flushing (mode:
{})...",
+ getIoTConsensusV2Mode());
+ statement.execute(INSERTION1);
+ statement.execute(INSERTION2);
+ statement.execute(FLUSH_COMMAND);
+
+ // Step 2: Insert more data without flush (creates WAL-only entries)
+ LOGGER.info("Step 2: Inserting more data without flush (WAL-only
entries)...");
+ statement.execute(INSERTION3);
+
+ // Step 3: Delete one timeseries
+ LOGGER.info("Step 3: Deleting timeseries root.sg.d1.speed...");
+ statement.execute(DELETE_TIMESERIES_SPEED);
+
+ // Step 4: Flush again to persist the deletion
+ LOGGER.info("Step 4: Flushing to persist deletion...");
+ statement.execute(FLUSH_COMMAND);
+
+ // Verify on the current connection: speed should be gone, 2 timeseries
remain
+ verifyTimeSeriesAfterDelete(statement, "via initial connection");
+
+ // Step 5: Wait for replication to complete on data region leaders
+ LOGGER.info("Step 5: Waiting for replication to complete on data region
leaders...");
+ Map<Integer, Pair<Integer, Set<Integer>>> dataRegionMap =
+ getDataRegionMapWithLeader(statement);
+ Set<Integer> leaderNodeIds = new HashSet<>();
+ for (Pair<Integer, Set<Integer>> leaderAndReplicas :
dataRegionMap.values()) {
+ if (leaderAndReplicas.getLeft() > 0) {
+ leaderNodeIds.add(leaderAndReplicas.getLeft());
+ }
+ }
+ for (int leaderNodeId : leaderNodeIds) {
+ EnvFactory.getEnv()
+ .dataNodeIdToWrapper(leaderNodeId)
+ .ifPresent(this::waitForReplicationComplete);
+ }
+
+ // Step 6: Verify schema consistency on each DataNode independently
+ LOGGER.info("Step 6: Verifying schema consistency on each DataNode
independently...");
+ List<DataNodeWrapper> dataNodeWrappers =
EnvFactory.getEnv().getDataNodeWrapperList();
+ for (DataNodeWrapper wrapper : dataNodeWrappers) {
+ String nodeDescription = "DataNode " + wrapper.getIp() + ":" +
wrapper.getPort();
+ LOGGER.info("Verifying schema on {}", nodeDescription);
+ Awaitility.await()
+ .atMost(60, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ try (Connection nodeConn =
+ makeItCloseQuietly(
+ EnvFactory.getEnv()
+ .getConnection(
+ wrapper,
+ SessionConfig.DEFAULT_USER,
+ SessionConfig.DEFAULT_PASSWORD,
+ BaseEnv.TREE_SQL_DIALECT));
+ Statement nodeStmt =
makeItCloseQuietly(nodeConn.createStatement())) {
+ verifyTimeSeriesAfterDelete(nodeStmt, nodeDescription);
+ }
+ });
+ }
+
+ // Step 7: Stop each DataNode one by one and verify remaining nodes
still consistent
+ LOGGER.info(
+ "Step 7: Stopping each DataNode in turn and verifying remaining
nodes show consistent schema...");
Review Comment:
This log message line is likely to exceed the project's 100-character
Checkstyle limit once indentation is included. Please wrap/split the string (or
use multiple LOGGER.info calls) to keep each line within the limit.
```suggestion
"Step 7: Stopping each DataNode in turn and verifying remaining
nodes "
+ "show consistent schema...");
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]