Caideyipi commented on PR #17916:
URL: https://github.com/apache/iotdb/pull/17916#issuecomment-4688797655

   Thanks for the fix. I think the new drop order has a side effect for 
IoTConsensusV2.
   
   PipeTaskStage.drop() stops and then drops the stage, so with 
sinkStage.drop() first, PipeSinkSubtaskManager.deregister() now runs before the 
processor/source stages are dropped. That method deregisters 
PipeEventCommitManager and resets ReplicateProgressDataNodeManager, and its 
current comment explicitly assumes extractor and processor have already been 
dropped. With the new order, that assumption is no longer true.
   
   There is still a race window in which the processor execution thread may 
deliver events downstream after close, and the source can still assign 
IoTConsensusV2 replicate indexes while supply() is racing with drop. In that 
case the sink can reset the replicate index to 0 first, and the 
source/processor can then assign or process another event, defeating the 
reset and potentially leaving stale, non-zero leader replicate progress after 
the pipe has been dropped.
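
   To make the interleaving concrete, here is a minimal single-threaded sketch that replays it deterministically. ReplicateIndexSketch and its methods are hypothetical stand-ins, not the real ReplicateProgressDataNodeManager API; the only assumptions are that assignment increments a counter and that reset sets it to 0.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for per-pipe leader replicate progress; not an IoTDB class.
final class ReplicateIndexSketch {
    private final AtomicLong index = new AtomicLong(0);

    // Source side: assigns the next replicate index to an event.
    long assign() {
        return index.incrementAndGet();
    }

    // Sink side: resets progress to 0, as the deregister path is described as doing.
    void reset() {
        index.set(0);
    }

    long current() {
        return index.get();
    }

    // Replays the problematic order: the sink resets while the source is still live.
    static long sinkDroppedFirst() {
        ReplicateIndexSketch progress = new ReplicateIndexSketch();
        progress.assign();  // events forwarded while the pipe was running
        progress.assign();
        progress.reset();   // sink dropped first: progress reset to 0
        progress.assign();  // source still inside supply(): late assignment
        return progress.current();
    }

    // Replays the safe order: no assignment can follow the reset.
    static long sinkDroppedLast() {
        ReplicateIndexSketch progress = new ReplicateIndexSketch();
        progress.assign();
        progress.assign();
        // source and processor are dropped here, so assign() is never called again
        progress.reset();
        return progress.current();
    }

    public static void main(String[] args) {
        System.out.println("sink dropped first: " + sinkDroppedFirst());
        System.out.println("sink dropped last:  " + sinkDroppedLast());
    }
}
```

   In this sketch the first order leaves the index at 1 after the drop, while the second leaves it at 0.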
   
   Could we split the operation so that the sink is stopped first to stop 
forwarding promptly, then processor/source are dropped, and only then does the 
sink deregister and reset? Alternatively, move the IoTV2 replicate-index reset 
and committer deregistration to a point after processor/source are known to be 
dropped. A test covering DROP PIPE while a historical or realtime consensus 
pipe is actively forwarding would also help guard this ordering.
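
   A rough sketch of the split order, using a hypothetical RecordingStage that only logs lifecycle calls (it is not PipeTaskStage, and the relative order of processor and source is an arbitrary choice here):

```java
import java.util.ArrayList;
import java.util.List;

final class SplitDropSketch {

    // Hypothetical stage that records lifecycle calls instead of doing real work.
    static final class RecordingStage {
        private final String name;
        private final List<String> log;
        private boolean stopped = false;

        RecordingStage(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        // Idempotent stop, so a later drop() does not stop twice.
        void stop() {
            if (!stopped) {
                stopped = true;
                log.add(name + ".stop");
            }
        }

        // Mirrors the described behaviour: drop() stops first, then drops.
        // For the sink, this is where deregister/reset would happen.
        void drop() {
            stop();
            log.add(name + ".drop");
        }
    }

    // Proposed order: stop sink, drop upstream stages, then drop the sink.
    static List<String> dropPipe() {
        List<String> log = new ArrayList<>();
        RecordingStage source = new RecordingStage("source", log);
        RecordingStage processor = new RecordingStage("processor", log);
        RecordingStage sink = new RecordingStage("sink", log);

        sink.stop();       // stop forwarding promptly
        processor.drop();  // upstream stages can no longer produce events
        source.drop();
        sink.drop();       // deregister/reset only after upstream is gone
        return log;
    }

    public static void main(String[] args) {
        System.out.println(dropPipe());
    }
}
```

   The point of the sketch is only the ordering: the sink's drop step, which would carry the deregister/reset, is the last entry in the log.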


