CRZbulabula commented on code in PR #17735:
URL: https://github.com/apache/iotdb/pull/17735#discussion_r3279367681
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java:
##########
@@ -92,6 +93,20 @@ public TSStatus setTTL(SetTTLPlan plan) {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
+ private int calculateNewTTLRuleCount(SetTTLPlan plan) {
+ int newTTLRuleCount = getNewTTLRuleCount(plan.getPathPattern());
+ if (plan.isDataBase()) {
+ String[] pathNodes = Arrays.copyOf(plan.getPathPattern(),
plan.getPathPattern().length + 1);
+ pathNodes[pathNodes.length - 1] =
IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+ newTTLRuleCount += getNewTTLRuleCount(pathNodes);
+ }
+ return newTTLRuleCount;
+ }
+
+ private int getNewTTLRuleCount(String[] pathNodes) {
+ return ttlCache.getLastNodeTTL(pathNodes) == TTLCache.NULL_TTL ? 1 : 0;
Review Comment:
`getNewTTLRuleCount` is slightly mis-named — it returns a 0/1 indicator, not
a count. Consider renaming to `isNewTTLRule` (return `boolean`) and
accumulating `boolean ? 1 : 0` in `calculateNewTTLRuleCount`. Reads more
naturally at the call site.
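A minimal sketch of the suggested shape. `TtlRuleCounter` and its in-memory map are hypothetical stand-ins for `TTLInfo` / `TTLCache`, and the `-1` and `"**"` constants are assumed to mirror `TTLCache.NULL_TTL` and `IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD`:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for TTLInfo/TTLCache; not the real IoTDB classes.
final class TtlRuleCounter {
  static final long NULL_TTL = -1L; // assumed to mirror TTLCache.NULL_TTL
  static final String MULTI_LEVEL_PATH_WILDCARD = "**"; // assumed wildcard token

  private final Map<String, Long> ttlByPath = new HashMap<>();

  void put(String path, long ttl) {
    ttlByPath.put(path, ttl);
  }

  // Boolean predicate replaces the 0/1 "count" helper.
  boolean isNewTTLRule(String[] pathNodes) {
    return ttlByPath.getOrDefault(String.join(".", pathNodes), NULL_TTL) == NULL_TTL;
  }

  // The caller accumulates the predicate explicitly.
  int calculateNewTTLRuleCount(String[] pathPattern, boolean isDataBase) {
    int count = isNewTTLRule(pathPattern) ? 1 : 0;
    if (isDataBase) {
      String[] wildcard = Arrays.copyOf(pathPattern, pathPattern.length + 1);
      wildcard[wildcard.length - 1] = MULTI_LEVEL_PATH_WILDCARD;
      count += isNewTTLRule(wildcard) ? 1 : 0;
    }
    return count;
  }
}
```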
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java:
##########
@@ -70,7 +70,8 @@ public TSStatus setTTL(SetTTLPlan plan) {
try {
// check ttl rule capacity
final int tTlRuleCapacity =
CommonDescriptor.getInstance().getConfig().getTTlRuleCapacity();
- if (getTTLCount() >= tTlRuleCapacity) {
+ final int newTTLRuleCount = calculateNewTTLRuleCount(plan);
+ if (newTTLRuleCount > 0 && ttlCache.getTtlCount() + newTTLRuleCount >
tTlRuleCapacity) {
Review Comment:
**Capacity check looks correct.** Short-circuiting on `newTTLRuleCount > 0`
is what lets `testUpdateExistingTTLWhenCurrentStateIsAlreadyOversize` pass — an
update of an existing rule no longer trips the limit even when the current
count is already over capacity.
One diagnostic suggestion: when the limit is hit, include both
`tTlRuleCapacity` and `ttlCache.getTtlCount() + newTTLRuleCount` in the error
message. It makes it much easier to tell at a glance whether the cluster
genuinely filled the table, or whether the capacity was tightened by a config
change.
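Roughly what I have in mind, as a standalone sketch. `TtlCapacityCheck` and the message wording are hypothetical; the real code would put the text into the `TSStatus` it returns:

```java
import java.util.Optional;

// Hypothetical helper; only the condition matches the diff above.
final class TtlCapacityCheck {
  // Empty when the write may proceed, otherwise a diagnostic message.
  static Optional<String> check(int currentCount, int newRuleCount, int capacity) {
    int projectedCount = currentCount + newRuleCount;
    if (newRuleCount > 0 && projectedCount > capacity) {
      return Optional.of(
          "TTL rule capacity exceeded: capacity="
              + capacity
              + ", projected count="
              + projectedCount);
    }
    return Optional.empty();
  }
}
```

An update of an existing rule (`newRuleCount == 0`) passes even when `currentCount` is already above `capacity`, which matches the behavior the test relies on.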
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java:
##########
@@ -86,18 +93,9 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env,
SetTTLState state)
}
}
- private void setConfigNodeTTL(ConfigNodeProcedureEnv env) {
- TSStatus res;
- try {
- res =
- env.getConfigManager()
- .getConsensusManager()
- .write(isGeneratedByPipe ? new PipeEnrichedPlan(this.plan) :
this.plan);
- } catch (ConsensusException e) {
-
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE,
e);
- res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
- res.setMessage(e.getMessage());
- }
+ protected void setConfigNodeTTL(final ConfigNodeProcedureEnv env) {
Review Comment:
These methods (`setConfigNodeTTL`, `updateDataNodeTTL`,
`writeConfigNodePlan`, `sendTTLRequest`) are made `protected` only so that
`TestingSetTTLProcedure` in the test file can override them — no production
subclass needs them. Consider making them package-private and putting the test
class in the same package; that keeps the public/protected surface narrower.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java:
##########
@@ -86,18 +93,9 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env,
SetTTLState state)
}
}
- private void setConfigNodeTTL(ConfigNodeProcedureEnv env) {
- TSStatus res;
- try {
- res =
- env.getConfigManager()
- .getConsensusManager()
- .write(isGeneratedByPipe ? new PipeEnrichedPlan(this.plan) :
this.plan);
- } catch (ConsensusException e) {
-
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE,
e);
- res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
- res.setMessage(e.getMessage());
- }
+ protected void setConfigNodeTTL(final ConfigNodeProcedureEnv env) {
+ capturePreviousTTLState(env);
+ final TSStatus res = writeConfigNodePlan(env, plan);
Review Comment:
**(medium) `previousTTL` can be overwritten with the new value after a
crash.**
`capturePreviousTTLState` and `writeConfigNodePlan` both run inside one
`executeFromState(SET_CONFIGNODE_TTL)` invocation, but the procedure framework
only persists `previousTTLStateCaptured` / `previousTTL` on state transitions.
If the ConfigNode crashes after the consensus write succeeds but before
`setNextState(UPDATE_DATANODE_CACHE)` is durable, recovery replays
`SET_CONFIGNODE_TTL`: `previousTTLStateCaptured` is still `false`, so we re-run
capture — and `getAllTTL()` now returns the **new** value, which gets recorded
as the "previous" TTL. If the DataNode update then fails, the rollback restores
the new value over itself, losing the original.
Suggestion: split capture into its own pre-state (e.g.
`CAPTURE_PREVIOUS_TTL` → `SET_CONFIGNODE_TTL` → `UPDATE_DATANODE_CACHE`) so the
captured snapshot is persisted with procedure state *before* the consensus
write.
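To make the failure mode concrete, here is a toy model of the two layouts. It is not the procedure framework: it only assumes, as described above, that the captured fields become durable when a state transition is persisted. All names are hypothetical.

```java
import java.util.Map;

// Toy model only; the "store" map stands in for the ConfigNode TTL table.
final class CaptureReplaySketch {
  static final long TTL_NOT_EXIST = Long.MIN_VALUE;

  // Capture and write share one state. The crash hits after the write but
  // before the transition (and therefore the captured value) is durable.
  static long recoveredPreviousTtlSingleState(
      Map<String, Long> store, String path, long newTtl) {
    boolean persistedCaptured = false; // what recovery will see
    long inMemoryPrevious = store.getOrDefault(path, TTL_NOT_EXIST);
    store.put(path, newTtl); // consensus write succeeds
    // Crash here: inMemoryPrevious is lost, persistedCaptured is still false.
    long recoveredPrevious = TTL_NOT_EXIST;
    if (!persistedCaptured) {
      // Replayed capture now reads the NEW value.
      recoveredPrevious = store.getOrDefault(path, TTL_NOT_EXIST);
    }
    store.put(path, newTtl); // replayed write
    return recoveredPrevious;
  }

  // Capture runs in its own state, so the snapshot is durable before the write.
  static long recoveredPreviousTtlSplitState(
      Map<String, Long> store, String path, long newTtl) {
    // CAPTURE_PREVIOUS_TTL: capture, then the transition persists the snapshot.
    long persistedPrevious = store.getOrDefault(path, TTL_NOT_EXIST);
    // SET_CONFIGNODE_TTL: write, crash, replayed write.
    store.put(path, newTtl);
    store.put(path, newTtl);
    return persistedPrevious;
  }
}
```

With the split layout, a crash inside the capture state is also harmless: the replay re-reads the table before any write has happened.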
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java:
##########
@@ -47,14 +49,19 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
public class SetTTLProcedure extends
StateMachineProcedure<ConfigNodeProcedureEnv, SetTTLState> {
private static final Logger LOGGER =
LoggerFactory.getLogger(SetTTLProcedure.class);
+ private static final long TTL_NOT_EXIST = Long.MIN_VALUE;
private SetTTLPlan plan;
+ private long previousTTL = TTL_NOT_EXIST;
Review Comment:
Worth a one-line comment explaining why this sentinel is `Long.MIN_VALUE`
rather than `TTLCache.NULL_TTL` (`-1`). They have different meanings on the
rollback path: `TTL_NOT_EXIST` means "no TTL was set before this procedure",
while `NULL_TTL` is the explicit "unset" marker that `ConfigPlanExecutor`
interprets to route to `unsetTTL`. Conflating the two would corrupt rollback
behavior.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java:
##########
@@ -106,35 +104,171 @@ private void setConfigNodeTTL(ConfigNodeProcedureEnv
env) {
}
}
- private void updateDataNodeTTL(ConfigNodeProcedureEnv env) {
- Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ protected void updateDataNodeTTL(final ConfigNodeProcedureEnv env) {
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
- DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
- new DataNodeAsyncRequestContext<>(
- CnToDnAsyncRequestType.SET_TTL,
- new TSetTTLReq(
- Collections.singletonList(String.join(".",
plan.getPathPattern())),
- plan.getTTL(),
- plan.isDataBase()),
- dataNodeLocationMap);
+ final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+ sendTTLRequest(
+ dataNodeLocationMap,
+ buildSetTTLReq(plan.getPathPattern(), plan.getTTL(),
plan.isDataBase()));
+ if (hasFailedDataNode(clientHandler)) {
+ LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
+ setFailure(
+ new ProcedureException(
+ new
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
+ }
+ }
+
+ private void capturePreviousTTLState(final ConfigNodeProcedureEnv env) {
+ if (previousTTLStateCaptured) {
+ return;
+ }
+ final Map<String, Long> ttlMap =
env.getConfigManager().getTTLManager().getAllTTL();
Review Comment:
**Performance: a full consensus read for one or two entries.**
`TTLManager.getAllTTL()` goes through
`getConsensusManager().read(showTTLPlan)`, which reads the entire TTL table
across consensus and serializes the DFS result into a `Map`. Every
`SetTTLProcedure` now pays this cost, which becomes noticeable on clusters
approaching `tTlRuleCapacity`.
Consider adding a targeted `getTTL(String[] pathPattern)` to `TTLManager` /
`TTLInfo` (e.g. wrapping `TTLCache.getLastNodeTTL`) and calling it twice when
`isDataBase()` is true.
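A sketch of the call pattern, assuming a point lookup exists. `TargetedTtlLookup`, its `getTTL` method, and the backing map are hypothetical; in the real code the lookup would presumably delegate to `TTLCache.getLastNodeTTL`:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a targeted lookup on TTLManager/TTLInfo.
final class TargetedTtlLookup {
  static final long TTL_NOT_EXIST = Long.MIN_VALUE;
  private final Map<String, Long> ttlTable = new HashMap<>();

  void put(String path, long ttl) {
    ttlTable.put(path, ttl);
  }

  // Point lookup for one path instead of materializing the whole table.
  long getTTL(String[] pathPattern) {
    return ttlTable.getOrDefault(String.join(".", pathPattern), TTL_NOT_EXIST);
  }

  // Returns {previousTTL, previousDatabaseWildcardTTL}.
  long[] capturePrevious(String[] pathPattern, boolean isDataBase) {
    long previous = getTTL(pathPattern);
    long previousWildcard = TTL_NOT_EXIST;
    if (isDataBase) {
      String[] wildcard = Arrays.copyOf(pathPattern, pathPattern.length + 1);
      wildcard[wildcard.length - 1] = "**"; // assumed MULTI_LEVEL_PATH_WILDCARD
      previousWildcard = getTTL(wildcard);
    }
    return new long[] {previous, previousWildcard};
  }
}
```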
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java:
##########
@@ -159,14 +293,24 @@ public void serialize(DataOutputStream stream) throws
IOException {
: ProcedureType.SET_TTL_PROCEDURE.getTypeCode());
super.serialize(stream);
ReadWriteIOUtils.write(plan.serializeToByteBuffer(), stream);
+ stream.writeBoolean(previousTTLStateCaptured);
+ stream.writeLong(previousTTL);
+ stream.writeLong(previousDatabaseWildcardTTL);
}
@Override
public void deserialize(ByteBuffer byteBuffer) {
super.deserialize(byteBuffer);
try {
- ReadWriteIOUtils.readInt(byteBuffer);
+ final int length = ReadWriteIOUtils.readInt(byteBuffer);
+ final int position = byteBuffer.position();
this.plan = (SetTTLPlan) ConfigPhysicalPlan.Factory.create(byteBuffer);
+ byteBuffer.position(position + length);
+ if (byteBuffer.remaining() >= 17) {
Review Comment:
`17` is a magic number (`1 byte boolean + 2 * 8 byte long`). Extract a
constant:
```java
private static final int ROLLBACK_STATE_BYTES = Byte.BYTES + Long.BYTES * 2;
...
if (byteBuffer.remaining() >= ROLLBACK_STATE_BYTES) { ... }
```
A named constant ties the guard to the field layout, so adding a rollback
field later is less likely to leave the guard stale.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java:
##########
@@ -159,14 +293,24 @@ public void serialize(DataOutputStream stream) throws
IOException {
: ProcedureType.SET_TTL_PROCEDURE.getTypeCode());
super.serialize(stream);
ReadWriteIOUtils.write(plan.serializeToByteBuffer(), stream);
+ stream.writeBoolean(previousTTLStateCaptured);
+ stream.writeLong(previousTTL);
+ stream.writeLong(previousDatabaseWildcardTTL);
}
@Override
public void deserialize(ByteBuffer byteBuffer) {
super.deserialize(byteBuffer);
try {
- ReadWriteIOUtils.readInt(byteBuffer);
+ final int length = ReadWriteIOUtils.readInt(byteBuffer);
+ final int position = byteBuffer.position();
this.plan = (SetTTLPlan) ConfigPhysicalPlan.Factory.create(byteBuffer);
+ byteBuffer.position(position + length);
Review Comment:
This `byteBuffer.position(position + length)` is necessary (not redundant) —
worth a comment so a future reader doesn't "clean it up".
Reason: `serializeToByteBuffer()` returns
`ByteBuffer.wrap(publicBAOS.getBuf(), 0, size())`.
`ReadWriteIOUtils.write(ByteBuffer, OutputStream)` then writes
`byteBuffer.capacity()` (the *full backing array length*, which can be greater
than the actual data size) followed by the whole array, padding included.
Without advancing the position to `position + length`, the new `boolean +
2*long` trailing fields would be read from those zero padding bytes.
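The JDK side of this is easy to check in isolation. The sketch below uses only `java.nio.ByteBuffer`; the class name, the 3-byte payload, and the 8-byte padded length are made up for illustration and do not reproduce the IoTDB serialization format:

```java
import java.nio.ByteBuffer;

// Standalone illustration; not the IoTDB wire format.
final class PaddedPayloadDemo {
  // ByteBuffer.wrap(array, offset, length) keeps capacity == array.length,
  // while remaining() reflects only the requested window.
  static int[] capacityAndRemaining(int backingLength, int dataSize) {
    ByteBuffer wrapped = ByteBuffer.wrap(new byte[backingLength], 0, dataSize);
    return new int[] {wrapped.capacity(), wrapped.remaining()};
  }

  // Writes [length:int][payload + zero padding][flag:byte] and reads the flag
  // back, with or without skipping past the padding.
  static boolean readTrailingFlag(boolean skipPadding) {
    byte[] payload = {1, 2, 3};
    int paddedLength = 8; // length prefix covers payload plus padding
    ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + paddedLength + 1);
    buffer.putInt(paddedLength);
    buffer.put(payload);
    buffer.position(Integer.BYTES + paddedLength); // leave zero padding in place
    buffer.put((byte) 1); // trailing flag = true
    buffer.flip();

    int length = buffer.getInt();
    int position = buffer.position();
    buffer.get(new byte[payload.length]); // parser consumes only the real data
    if (skipPadding) {
      buffer.position(position + length);
    }
    return buffer.get() != 0;
  }
}
```

With `skipPadding == false` the flag is read from a padding byte and comes back `false`, which is the silent corruption the position reset prevents.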
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java:
##########
@@ -180,12 +324,21 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
- return this.plan.equals(((SetTTLProcedure) o).plan)
- && this.isGeneratedByPipe == (((SetTTLProcedure) o).isGeneratedByPipe);
+ final SetTTLProcedure that = (SetTTLProcedure) o;
+ return this.isGeneratedByPipe == that.isGeneratedByPipe
+ && this.previousTTL == that.previousTTL
+ && this.previousDatabaseWildcardTTL == that.previousDatabaseWildcardTTL
+ && this.previousTTLStateCaptured == that.previousTTLStateCaptured
Review Comment:
Including `previousTTL` / `previousDatabaseWildcardTTL` /
`previousTTLStateCaptured` in `equals`/`hashCode` is consistent with the
serialization change, but please confirm no caller dedups procedures by
`equals` on the wait queue. If something does, two procedures with the same
`plan` but different captured state would now be considered distinct, which
could change idempotency behavior. If there's no such caller, ignore this
comment.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java:
##########
@@ -106,35 +104,171 @@ private void setConfigNodeTTL(ConfigNodeProcedureEnv
env) {
}
}
- private void updateDataNodeTTL(ConfigNodeProcedureEnv env) {
- Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ protected void updateDataNodeTTL(final ConfigNodeProcedureEnv env) {
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
- DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
- new DataNodeAsyncRequestContext<>(
- CnToDnAsyncRequestType.SET_TTL,
- new TSetTTLReq(
- Collections.singletonList(String.join(".",
plan.getPathPattern())),
- plan.getTTL(),
- plan.isDataBase()),
- dataNodeLocationMap);
+ final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+ sendTTLRequest(
+ dataNodeLocationMap,
+ buildSetTTLReq(plan.getPathPattern(), plan.getTTL(),
plan.isDataBase()));
+ if (hasFailedDataNode(clientHandler)) {
+ LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
+ setFailure(
+ new ProcedureException(
+ new
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
+ }
+ }
+
+ private void capturePreviousTTLState(final ConfigNodeProcedureEnv env) {
+ if (previousTTLStateCaptured) {
+ return;
+ }
+ final Map<String, Long> ttlMap =
env.getConfigManager().getTTLManager().getAllTTL();
+ previousTTL = getTTLOrDefault(ttlMap, plan.getPathPattern());
+ if (plan.isDataBase()) {
+ previousDatabaseWildcardTTL =
+ getTTLOrDefault(ttlMap,
getDatabaseWildcardPathPattern(plan.getPathPattern()));
+ }
+ previousTTLStateCaptured = true;
+ }
+
+ protected TSStatus writeConfigNodePlan(
+ final ConfigNodeProcedureEnv env, final SetTTLPlan setTTLPlan) {
+ try {
+ return env.getConfigManager()
+ .getConsensusManager()
+ .write(isGeneratedByPipe ? new PipeEnrichedPlan(setTTLPlan) :
setTTLPlan);
+ } catch (ConsensusException e) {
+
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE,
e);
+ final TSStatus res = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+ res.setMessage(e.getMessage());
+ return res;
+ }
+ }
+
+ protected DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> sendTTLRequest(
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap, final
TSetTTLReq req) {
+ final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+ new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SET_TTL, req,
dataNodeLocationMap);
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
- Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
- for (TSStatus status : statusMap.values()) {
- // all dataNodes must clear the related schemaengine cache
+ return clientHandler;
+ }
+
+ private TSetTTLReq buildSetTTLReq(
+ final String[] pathPattern, final long ttl, final boolean isDataBase) {
+ return new TSetTTLReq(
+ Collections.singletonList(String.join(".", pathPattern)), ttl,
isDataBase);
+ }
+
+ private boolean hasFailedDataNode(
+ final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler) {
+ if (!clientHandler.getRequestIndices().isEmpty()) {
+ return true;
+ }
+ for (TSStatus status : clientHandler.getResponseMap().values()) {
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
- setFailure(
- new ProcedureException(
- new
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
- return;
+ return true;
}
}
+ return false;
+ }
+
+ private long getTTLOrDefault(final Map<String, Long> ttlMap, final String[]
pathPattern) {
+ return ttlMap.getOrDefault(String.join(".", pathPattern), TTL_NOT_EXIST);
+ }
+
+ private String[] getDatabaseWildcardPathPattern(final String[] pathPattern) {
+ final String[] pathNodes = Arrays.copyOf(pathPattern, pathPattern.length +
1);
+ pathNodes[pathNodes.length - 1] = IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+ return pathNodes;
+ }
+
+ private void rollbackConfigNodeTTL(final ConfigNodeProcedureEnv env) throws
ProcedureException {
+ restoreTTLOnConfigNode(env, plan.getPathPattern(), previousTTL);
+ if (plan.isDataBase()) {
+ restoreTTLOnConfigNode(
+ env, getDatabaseWildcardPathPattern(plan.getPathPattern()),
previousDatabaseWildcardTTL);
+ }
+ }
+
+ private void restoreTTLOnConfigNode(
+ final ConfigNodeProcedureEnv env, final String[] pathPattern, final long
ttl)
+ throws ProcedureException {
+ final SetTTLPlan rollbackPlan =
+ new SetTTLPlan(pathPattern, ttl == TTL_NOT_EXIST ? TTLCache.NULL_TTL :
ttl);
Review Comment:
The `ttl == TTL_NOT_EXIST ? TTLCache.NULL_TTL : ttl` translation deserves an
inline comment. It encodes a cross-file convention: a `SetTTLPlan` whose `ttl
== NULL_TTL` is interpreted by `ConfigPlanExecutor.SetTTL` (file
`ConfigPlanExecutor.java:430`) as a signal to call `ttlInfo.unsetTTL` rather
than `setTTL`, and similarly `StorageEngine.setTTL` (`StorageEngine.java:973`)
routes to `unsetTTLForTree`. Reading the rollback code in isolation, it's not
obvious that writing `-1` is what undoes the previous SET — a one-line note
here would save a lot of code-spelunking.
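For illustration, the convention boils down to something like the following. The `apply` method is a hypothetical stand-in for the executor-side routing described above, and the constant values are assumed from the diff and from `TTLCache.NULL_TTL` being `-1`:

```java
import java.util.Map;

// Hypothetical sketch of the sentinel translation and its consumer.
final class RollbackTtlTranslation {
  static final long TTL_NOT_EXIST = Long.MIN_VALUE; // "no TTL existed before"
  static final long NULL_TTL = -1L; // assumed "unset" marker

  // Procedure side: translate the internal sentinel into the wire marker.
  static long toRollbackPlanTtl(long previousTtl) {
    return previousTtl == TTL_NOT_EXIST ? NULL_TTL : previousTtl;
  }

  // Executor side (stand-in): NULL_TTL routes to "unset", anything else to "set".
  static void apply(Map<String, Long> table, String path, long planTtl) {
    if (planTtl == NULL_TTL) {
      table.remove(path);
    } else {
      table.put(path, planTtl);
    }
  }
}
```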
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java:
##########
@@ -106,35 +104,171 @@ private void setConfigNodeTTL(ConfigNodeProcedureEnv
env) {
}
}
- private void updateDataNodeTTL(ConfigNodeProcedureEnv env) {
- Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ protected void updateDataNodeTTL(final ConfigNodeProcedureEnv env) {
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
- DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
- new DataNodeAsyncRequestContext<>(
- CnToDnAsyncRequestType.SET_TTL,
- new TSetTTLReq(
- Collections.singletonList(String.join(".",
plan.getPathPattern())),
- plan.getTTL(),
- plan.isDataBase()),
- dataNodeLocationMap);
+ final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+ sendTTLRequest(
+ dataNodeLocationMap,
+ buildSetTTLReq(plan.getPathPattern(), plan.getTTL(),
plan.isDataBase()));
+ if (hasFailedDataNode(clientHandler)) {
+ LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
+ setFailure(
+ new ProcedureException(
+ new
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
+ }
+ }
+
+ private void capturePreviousTTLState(final ConfigNodeProcedureEnv env) {
+ if (previousTTLStateCaptured) {
+ return;
+ }
+ final Map<String, Long> ttlMap =
env.getConfigManager().getTTLManager().getAllTTL();
+ previousTTL = getTTLOrDefault(ttlMap, plan.getPathPattern());
+ if (plan.isDataBase()) {
+ previousDatabaseWildcardTTL =
+ getTTLOrDefault(ttlMap,
getDatabaseWildcardPathPattern(plan.getPathPattern()));
+ }
+ previousTTLStateCaptured = true;
+ }
+
+ protected TSStatus writeConfigNodePlan(
+ final ConfigNodeProcedureEnv env, final SetTTLPlan setTTLPlan) {
+ try {
+ return env.getConfigManager()
+ .getConsensusManager()
+ .write(isGeneratedByPipe ? new PipeEnrichedPlan(setTTLPlan) :
setTTLPlan);
+ } catch (ConsensusException e) {
+
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE,
e);
+ final TSStatus res = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+ res.setMessage(e.getMessage());
+ return res;
+ }
+ }
+
+ protected DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> sendTTLRequest(
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap, final
TSetTTLReq req) {
+ final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+ new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SET_TTL, req,
dataNodeLocationMap);
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
- Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
- for (TSStatus status : statusMap.values()) {
- // all dataNodes must clear the related schemaengine cache
+ return clientHandler;
+ }
+
+ private TSetTTLReq buildSetTTLReq(
+ final String[] pathPattern, final long ttl, final boolean isDataBase) {
+ return new TSetTTLReq(
+ Collections.singletonList(String.join(".", pathPattern)), ttl,
isDataBase);
+ }
+
+ private boolean hasFailedDataNode(
+ final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler) {
+ if (!clientHandler.getRequestIndices().isEmpty()) {
+ return true;
+ }
+ for (TSStatus status : clientHandler.getResponseMap().values()) {
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
- setFailure(
- new ProcedureException(
- new
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
- return;
+ return true;
}
}
+ return false;
+ }
+
+ private long getTTLOrDefault(final Map<String, Long> ttlMap, final String[]
pathPattern) {
+ return ttlMap.getOrDefault(String.join(".", pathPattern), TTL_NOT_EXIST);
+ }
+
+ private String[] getDatabaseWildcardPathPattern(final String[] pathPattern) {
+ final String[] pathNodes = Arrays.copyOf(pathPattern, pathPattern.length +
1);
+ pathNodes[pathNodes.length - 1] = IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+ return pathNodes;
+ }
+
+ private void rollbackConfigNodeTTL(final ConfigNodeProcedureEnv env) throws
ProcedureException {
+ restoreTTLOnConfigNode(env, plan.getPathPattern(), previousTTL);
+ if (plan.isDataBase()) {
+ restoreTTLOnConfigNode(
+ env, getDatabaseWildcardPathPattern(plan.getPathPattern()),
previousDatabaseWildcardTTL);
+ }
+ }
+
+ private void restoreTTLOnConfigNode(
+ final ConfigNodeProcedureEnv env, final String[] pathPattern, final long
ttl)
+ throws ProcedureException {
+ final SetTTLPlan rollbackPlan =
+ new SetTTLPlan(pathPattern, ttl == TTL_NOT_EXIST ? TTLCache.NULL_TTL :
ttl);
+ rollbackPlan.setDataBase(false);
Review Comment:
The explicit `setDataBase(false)` here is correct but non-obvious — please
add a comment. The reason: in the database case, `rollbackConfigNodeTTL`
already issues two separate `restoreTTLOnConfigNode` calls (one for `db`, one
for `db.**`). If this inner `SetTTLPlan` carried `isDataBase=true`,
`TTLInfo.setTTL` would auto-expand the wildcard *again*, overwriting the
just-restored `db.**` value with the value being applied to `db`. Forcing
`isDataBase=false` keeps each path independent.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java:
##########
@@ -106,35 +104,171 @@ private void setConfigNodeTTL(ConfigNodeProcedureEnv
env) {
}
}
- private void updateDataNodeTTL(ConfigNodeProcedureEnv env) {
- Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ protected void updateDataNodeTTL(final ConfigNodeProcedureEnv env) {
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
- DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
- new DataNodeAsyncRequestContext<>(
- CnToDnAsyncRequestType.SET_TTL,
- new TSetTTLReq(
- Collections.singletonList(String.join(".",
plan.getPathPattern())),
- plan.getTTL(),
- plan.isDataBase()),
- dataNodeLocationMap);
+ final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+ sendTTLRequest(
+ dataNodeLocationMap,
+ buildSetTTLReq(plan.getPathPattern(), plan.getTTL(),
plan.isDataBase()));
+ if (hasFailedDataNode(clientHandler)) {
+ LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
+ setFailure(
+ new ProcedureException(
+ new
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
+ }
+ }
+
+ private void capturePreviousTTLState(final ConfigNodeProcedureEnv env) {
+ if (previousTTLStateCaptured) {
+ return;
+ }
+ final Map<String, Long> ttlMap =
env.getConfigManager().getTTLManager().getAllTTL();
+ previousTTL = getTTLOrDefault(ttlMap, plan.getPathPattern());
+ if (plan.isDataBase()) {
+ previousDatabaseWildcardTTL =
+ getTTLOrDefault(ttlMap,
getDatabaseWildcardPathPattern(plan.getPathPattern()));
+ }
+ previousTTLStateCaptured = true;
+ }
+
+ protected TSStatus writeConfigNodePlan(
+ final ConfigNodeProcedureEnv env, final SetTTLPlan setTTLPlan) {
+ try {
+ return env.getConfigManager()
+ .getConsensusManager()
+ .write(isGeneratedByPipe ? new PipeEnrichedPlan(setTTLPlan) :
setTTLPlan);
+ } catch (ConsensusException e) {
+
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE,
e);
+ final TSStatus res = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+ res.setMessage(e.getMessage());
+ return res;
+ }
+ }
+
+ protected DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> sendTTLRequest(
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap, final
TSetTTLReq req) {
+ final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+ new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SET_TTL, req,
dataNodeLocationMap);
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
- Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
- for (TSStatus status : statusMap.values()) {
- // all dataNodes must clear the related schemaengine cache
+ return clientHandler;
+ }
+
+ private TSetTTLReq buildSetTTLReq(
+ final String[] pathPattern, final long ttl, final boolean isDataBase) {
+ return new TSetTTLReq(
+ Collections.singletonList(String.join(".", pathPattern)), ttl,
isDataBase);
+ }
+
+ private boolean hasFailedDataNode(
+ final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler) {
+ if (!clientHandler.getRequestIndices().isEmpty()) {
+ return true;
+ }
+ for (TSStatus status : clientHandler.getResponseMap().values()) {
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
- setFailure(
- new ProcedureException(
- new
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
- return;
+ return true;
}
}
+ return false;
+ }
+
+ private long getTTLOrDefault(final Map<String, Long> ttlMap, final String[]
pathPattern) {
+ return ttlMap.getOrDefault(String.join(".", pathPattern), TTL_NOT_EXIST);
+ }
+
+ private String[] getDatabaseWildcardPathPattern(final String[] pathPattern) {
+ final String[] pathNodes = Arrays.copyOf(pathPattern, pathPattern.length +
1);
+ pathNodes[pathNodes.length - 1] = IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+ return pathNodes;
+ }
+
+ private void rollbackConfigNodeTTL(final ConfigNodeProcedureEnv env) throws
ProcedureException {
+ restoreTTLOnConfigNode(env, plan.getPathPattern(), previousTTL);
+ if (plan.isDataBase()) {
+ restoreTTLOnConfigNode(
+ env, getDatabaseWildcardPathPattern(plan.getPathPattern()),
previousDatabaseWildcardTTL);
+ }
+ }
+
+ private void restoreTTLOnConfigNode(
+ final ConfigNodeProcedureEnv env, final String[] pathPattern, final long
ttl)
+ throws ProcedureException {
+ final SetTTLPlan rollbackPlan =
+ new SetTTLPlan(pathPattern, ttl == TTL_NOT_EXIST ? TTLCache.NULL_TTL :
ttl);
+ rollbackPlan.setDataBase(false);
+ final TSStatus status = writeConfigNodePlan(env, rollbackPlan);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new ProcedureException(
+ new MetadataException(
+ "Rollback ConfigNode ttl failed for "
+ + String.join(".", pathPattern)
+ + ": "
+ + status.getMessage()));
+ }
+ }
+
+ private void rollbackDataNodeTTL(final ConfigNodeProcedureEnv env) throws
ProcedureException {
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
+ restoreTTLOnDataNodes(dataNodeLocationMap, plan.getPathPattern(),
previousTTL);
+ if (plan.isDataBase()) {
+ restoreTTLOnDataNodes(
+ dataNodeLocationMap,
+ getDatabaseWildcardPathPattern(plan.getPathPattern()),
+ previousDatabaseWildcardTTL);
+ }
+ }
+
+ private void restoreTTLOnDataNodes(
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+ final String[] pathPattern,
+ final long ttl)
+ throws ProcedureException {
+ if (dataNodeLocationMap.isEmpty()) {
+ return;
+ }
+ final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+ sendTTLRequest(
+ dataNodeLocationMap,
+ buildSetTTLReq(pathPattern, ttl == TTL_NOT_EXIST ?
TTLCache.NULL_TTL : ttl, false));
+ if (hasFailedDataNode(clientHandler)) {
+ throw new ProcedureException(
+ new MetadataException(
+ "Rollback dataNode ttl cache failed for " + String.join(".",
pathPattern)));
+ }
}
@Override
- protected void rollbackState(
- ConfigNodeProcedureEnv configNodeProcedureEnv, SetTTLState setTTLState)
- throws IOException, InterruptedException, ProcedureException {}
+ protected void rollbackState(final ConfigNodeProcedureEnv env, final
SetTTLState setTTLState)
+ throws IOException, InterruptedException, ProcedureException {
+ if (setTTLState != SetTTLState.UPDATE_DATANODE_CACHE ||
!previousTTLStateCaptured) {
+ return;
+ }
+ ProcedureException rollbackFailure = null;
+ try {
+ rollbackConfigNodeTTL(env);
+ } catch (ProcedureException e) {
+ LOGGER.error("Failed to rollback ConfigNode ttl state.", e);
+ rollbackFailure = e;
+ }
+ try {
+ rollbackDataNodeTTL(env);
Review Comment:
Two suggestions on the rollback error-aggregation strategy:
1. Only the *first* exception survives; the second is logged but lost. When
`rollbackFailure` is already non-null, call `rollbackFailure.addSuppressed(e)`
so a postmortem can still see both.
2. Add a method-level comment clarifying the policy: "best-effort — if one
side fails, the other is still attempted; the earliest exception is what
propagates." Otherwise readers might expect fail-fast semantics.
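A generic sketch of that policy, independent of the procedure classes. `BestEffortRollback` and `Step` are hypothetical names; `Throwable.addSuppressed` is the standard JDK call:

```java
// Hypothetical helper illustrating best-effort rollback with suppressed exceptions.
final class BestEffortRollback {
  interface Step {
    void run() throws Exception;
  }

  // Best-effort: every step is attempted. The earliest failure propagates and
  // later failures are attached to it as suppressed exceptions.
  static void runAll(Step... steps) throws Exception {
    Exception first = null;
    for (Step step : steps) {
      try {
        step.run();
      } catch (Exception e) {
        if (first == null) {
          first = e;
        } else {
          first.addSuppressed(e);
        }
      }
    }
    if (first != null) {
      throw first;
    }
  }
}
```

In `rollbackState` the two steps would be `rollbackConfigNodeTTL(env)` and `rollbackDataNodeTTL(env)`.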
##########
iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java:
##########
@@ -65,4 +83,234 @@ public void serializeDeserializeTest() throws IOException,
IllegalPathException
buffer.clear();
byteArrayOutputStream.reset();
}
+
+ @Test
+ public void serializeDeserializeTestWithCapturedRollbackState() throws
Exception {
+ final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream);
+
+ final SetTTLPlan setTTLPlan =
+ new SetTTLPlan(Arrays.asList(new PartialPath("root.db").getNodes()),
2000L);
+ setTTLPlan.setDataBase(true);
+ final TestingSetTTLProcedure procedure = new
TestingSetTTLProcedure(setTTLPlan);
+
+ final Map<String, Long> ttlMap = new HashMap<>();
+ ttlMap.put("root.**", Long.MAX_VALUE);
+ ttlMap.put("root.db", 500L);
+ ttlMap.put("root.db.**", 600L);
+
+ procedure.executeFromState(mockProcedureEnv(ttlMap),
SetTTLState.SET_CONFIGNODE_TTL);
+
+ procedure.serialize(outputStream);
+ final ByteBuffer buffer =
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ final SetTTLProcedure deserializedProcedure =
+ (SetTTLProcedure) ProcedureFactory.getInstance().create(buffer);
+ assertSerializedProcedure(
+ deserializedProcedure, "root.db", 2000L, true, true, 500L, 600L,
false);
+ }
Review Comment:
Please add a backward-compatibility deserialization test: construct an "old
format" byte stream by hand — `[procedureType:short][super
state...][length:int][plan bytes]` with no trailing 17 bytes — feed it through
`deserialize`, and assert `previousTTLStateCaptured == false`, `previousTTL ==
Long.MIN_VALUE`, `previousDatabaseWildcardTTL == Long.MIN_VALUE`. The current
round-trip test only exercises new-write/new-read; without an old-format
fixture, a rolling upgrade where an in-flight pre-PR procedure is persisted and
then resumed on a post-PR ConfigNode is not covered.
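The core of such a test can be sketched without the procedure classes. Below, `RollbackStateReader` is a hypothetical reduction of `deserialize` to the length-prefixed plan plus the optional trailing fields; the real test would build the full byte stream, including the procedure type and the super-class state:

```java
import java.nio.ByteBuffer;

// Hypothetical reduction of the deserialize guard; not the real SetTTLProcedure.
final class RollbackStateReader {
  static final long TTL_NOT_EXIST = Long.MIN_VALUE;
  static final int ROLLBACK_STATE_BYTES = Byte.BYTES + Long.BYTES * 2;

  boolean previousTTLStateCaptured = false;
  long previousTTL = TTL_NOT_EXIST;
  long previousDatabaseWildcardTTL = TTL_NOT_EXIST;

  static RollbackStateReader read(ByteBuffer buffer) {
    int length = buffer.getInt();
    buffer.position(buffer.position() + length); // skip the plan bytes
    RollbackStateReader state = new RollbackStateReader();
    // Old-format streams end here, so the defaults stay in place.
    if (buffer.remaining() >= ROLLBACK_STATE_BYTES) {
      state.previousTTLStateCaptured = buffer.get() != 0;
      state.previousTTL = buffer.getLong();
      state.previousDatabaseWildcardTTL = buffer.getLong();
    }
    return state;
  }
}
```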