luoluoyuyu commented on code in PR #17748:
URL: https://github.com/apache/iotdb/pull/17748#discussion_r3309370308


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java:
##########
@@ -238,7 +238,10 @@ private void collectEvent(final Event event) {
       enrichedEvent.setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes());
 
       if (enrichedEvent.getPipeName() != null
-          && pendingQueue.isPipeDropped(enrichedEvent.getPipeName(), creationTime, regionId)) {
+          && (pendingQueue.isEventFromDroppedPipe(enrichedEvent)

Review Comment:
   Drop logic uses isEventFromDroppedPipe when possible, and falls back to 
isPipeDropped only when committerKey is null. Please document when committerKey 
can still be null at collection time; if that window is wide, the fallback 
could discard events for a recreated pipe with the same name and creationTime.
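
To make the concern concrete, here is a minimal sketch of the two-step check. Everything in it is a hypothetical stand-in: the two sets, pipeId, markDropped and shouldDiscard are not IoTDB APIs, and a null restartTimes is only a model for an event whose committerKey has not been assigned yet. The internals of isEventFromDroppedPipe and isPipeDropped are not visible in this hunk, so the matching rules below are assumptions.

```java
import java.util.HashSet;
import java.util.Set;

public class DropFallbackSketch {

  // Hypothetical registries; the real pendingQueue bookkeeping is not shown in this diff.
  static final Set<String> droppedCommitterKeys = new HashSet<>();
  static final Set<String> droppedPipes = new HashSet<>();

  // Coarse identity: the triple the old isPipeDropped signature received.
  static String pipeId(String pipeName, long creationTime, int regionId) {
    return pipeName + "|" + creationTime + "|" + regionId;
  }

  // Record a drop under both the coarse identity and the precise key.
  static void markDropped(String pipeName, long creationTime, int regionId, int restartTimes) {
    String id = pipeId(pipeName, creationTime, regionId);
    droppedPipes.add(id);
    droppedCommitterKeys.add(id + "|" + restartTimes);
  }

  // restartTimes == null models an event without a committerKey (assumption).
  static boolean shouldDiscard(
      String pipeName, long creationTime, int regionId, Integer restartTimes) {
    String id = pipeId(pipeName, creationTime, regionId);
    if (restartTimes != null) {
      // Precise path: only the exact dropped incarnation matches.
      return droppedCommitterKeys.contains(id + "|" + restartTimes);
    }
    // Fallback path: any event sharing name, creationTime and region matches.
    return droppedPipes.contains(id);
  }

  public static void main(String[] args) {
    markDropped("p", 100L, 1, 0);
    System.out.println(shouldDiscard("p", 100L, 1, 0));    // event of the dropped incarnation
    System.out.println(shouldDiscard("p", 100L, 1, 1));    // later incarnation, key present
    System.out.println(shouldDiscard("p", 100L, 1, null)); // later incarnation, key missing
  }
}
```

Under these assumptions the third call is the risky case: without a committerKey the event cannot be told apart from the dropped incarnation, so the fallback discards it.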



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java:
##########
@@ -252,11 +248,18 @@ public void discardEventsOfPipe(
     }
 
     if (outputPipeSink instanceof PipeConnectorWithEventDiscard) {
-      ((PipeConnectorWithEventDiscard) outputPipeSink)
-          .discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
+      ((PipeConnectorWithEventDiscard) outputPipeSink).discardEventsOfPipe(committerKey);
     }
   }
 
+  private static boolean isEventFromPipe(
+      final EnrichedEvent event, final CommitterKey committerKey) {
+    return committerKey.getPipeName().equals(event.getPipeName())
+        && committerKey.getCreationTime() == event.getCreationTime()
+        && committerKey.getRegionId() == event.getRegionId()
+        && (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey()));

Review Comment:
   restartTimes < 0 skips restart comparison for drop-all-restarts behavior. 
Consider a named constant (e.g. DROP_ALL_RESTARTS) instead of a magic -1, and 
add a test: drop pipe, recreate same name, verify new events are not discarded.
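
A rough sketch of what the constant and the test scenario could look like. Key and Event are simplified stand-ins for CommitterKey and EnrichedEvent, DROP_ALL_RESTARTS is the hypothetical name suggested above, and the recreated pipe is assumed to get a new creationTime. Note that the sketch compares with == against the sentinel, which is slightly stricter than the < 0 check in the diff.

```java
import java.util.Objects;

public class DropAllRestartsSketch {

  // Hypothetical sentinel from the review suggestion; not an existing IoTDB constant.
  static final int DROP_ALL_RESTARTS = -1;

  // Simplified stand-in for CommitterKey, reduced to the four fields the diff reads.
  static final class Key {
    final String pipeName;
    final long creationTime;
    final int regionId;
    final int restartTimes;

    Key(String pipeName, long creationTime, int regionId, int restartTimes) {
      this.pipeName = pipeName;
      this.creationTime = creationTime;
      this.regionId = regionId;
      this.restartTimes = restartTimes;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Key)) {
        return false;
      }
      Key k = (Key) o;
      return pipeName.equals(k.pipeName)
          && creationTime == k.creationTime
          && regionId == k.regionId
          && restartTimes == k.restartTimes;
    }

    @Override
    public int hashCode() {
      return Objects.hash(pipeName, creationTime, regionId, restartTimes);
    }
  }

  // Simplified stand-in for EnrichedEvent: identity is taken from its own key.
  static final class Event {
    final Key committerKey;

    Event(Key committerKey) {
      this.committerKey = committerKey;
    }
  }

  // Same shape as isEventFromPipe in the diff, with the sentinel named.
  static boolean isEventFromPipe(Event event, Key key) {
    return key.pipeName.equals(event.committerKey.pipeName)
        && key.creationTime == event.committerKey.creationTime
        && key.regionId == event.committerKey.regionId
        && (key.restartTimes == DROP_ALL_RESTARTS || key.equals(event.committerKey));
  }

  public static void main(String[] args) {
    Key dropAll = new Key("p", 100L, 1, DROP_ALL_RESTARTS);
    Event oldRestart0 = new Event(new Key("p", 100L, 1, 0));
    Event oldRestart2 = new Event(new Key("p", 100L, 1, 2));
    Event recreated = new Event(new Key("p", 200L, 1, 0)); // same name, new creationTime

    System.out.println(isEventFromPipe(oldRestart0, dropAll));
    System.out.println(isEventFromPipe(oldRestart2, dropAll));
    System.out.println(isEventFromPipe(recreated, dropAll));
  }
}
```

In this sketch both events of the old pipe match the drop-all key, while the event of the recreated pipe does not, which is the property the requested test should pin down.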



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.