luoluoyuyu commented on code in PR #17748:
URL: https://github.com/apache/iotdb/pull/17748#discussion_r3309326152
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java:
##########
@@ -252,11 +248,18 @@ public void discardEventsOfPipe(
}
if (outputPipeSink instanceof PipeConnectorWithEventDiscard) {
- ((PipeConnectorWithEventDiscard) outputPipeSink)
- .discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
+ ((PipeConnectorWithEventDiscard)
outputPipeSink).discardEventsOfPipe(committerKey);
}
}
+ private static boolean isEventFromPipe(
+ final EnrichedEvent event, final CommitterKey committerKey) {
+ return committerKey.getPipeName().equals(event.getPipeName())
+ && committerKey.getCreationTime() == event.getCreationTime()
+ && committerKey.getRegionId() == event.getRegionId()
+ && (committerKey.getRestartTimes() < 0 ||
committerKey.equals(event.getCommitterKey()));
Review Comment:
`committerKey.getRestartTimes() < 0` 时跳过 restartTimes 比较,用于 drop 时丢弃该
creationTime 下所有 restart 实例。
**建议**:
1. 在 `CommitterKey` 或调用处用常量/document 说明 `-1` 语义(如 `DROP_ALL_RESTARTS`)
2. 补 IT:`CREATE pipe A → DROP → CREATE 同名 pipe B(新 creationTime)→ 验证 B 的事件不被
discard`
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java:
##########
@@ -238,7 +238,10 @@ private void collectEvent(final Event event) {
enrichedEvent.setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes());
if (enrichedEvent.getPipeName() != null
- && pendingQueue.isPipeDropped(enrichedEvent.getPipeName(),
creationTime, regionId)) {
+ && (pendingQueue.isEventFromDroppedPipe(enrichedEvent)
Review Comment:
丢弃逻辑分两支:
1. `isEventFromDroppedPipe(enrichedEvent)` — 有 committerKey 的精确匹配
2. `committerKey == null` 时回退到 `(pipeName, creationTime, regionId)`
**请确认**:`committerKey == null` 的事件是否只可能来自「尚未 enrich」的瞬时状态?若某些路径长期为 null,可能在
drop 后仍按旧三元组误丢**新** pipe(若 creationTime 复用)。
建议在注释中写明 null committerKey 的生命周期,或保证 enrich 在入队前完成。
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]