Caideyipi commented on code in PR #15404:
URL: https://github.com/apache/iotdb/pull/15404#discussion_r2067791959


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java:
##########
@@ -231,41 +231,50 @@ public void discardEventsOfPipe(final String 
pipeNameToDrop, int regionId) {
     // Try to remove the events as much as possible
     inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId);
 
-    // synchronized to use the lastEvent & lastExceptionEvent
-    synchronized (this) {
-      // Here we discard the last event, and re-submit the pipe task to avoid 
that the pipe task has
-      // stopped submission but will not be stopped by critical exceptions, 
because when it acquires
-      // lock, the pipe is already dropped, thus it will do nothing.
-      // Note that since we use a new thread to stop all the pipes, we will 
not encounter deadlock
-      // here. Or else we will.
-      if (lastEvent instanceof EnrichedEvent
-          && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName())
-          && regionId == ((EnrichedEvent) lastEvent).getRegionId()) {
-        // Do not clear last event's reference count because it may be on 
transferring
-        lastEvent = null;
-        // Submit self to avoid that the lastEvent has been retried "max 
times" times and has
-        // stopped executing.
-        // 1. If the last event is still on execution, or submitted by the 
previous "onSuccess" or
-        //    "onFailure", the "submitSelf" cause nothing.
-        // 2. If the last event is waiting the instance lock to call 
"onSuccess", then the callback
-        //    method will skip this turn of submission.
-        // 3. If the last event is waiting to call "onFailure", then it will 
be ignored because the
-        //    last event has been set to null.
-        // 4. If the last event has called "onFailure" and caused the subtask 
to stop submission,
-        //    it's submitted here and the "report" will wait for the "drop 
pipe" lock to stop all
-        //    the pipes with critical exceptions. As illustrated above, the 
"report" will do
-        //    nothing.
-        submitSelf();
-      }
+    highPriorityLockTaskCount.incrementAndGet();
+    try {
+      // synchronized to use the lastEvent & lastExceptionEvent
+      synchronized (this) {
+        // Here we discard the last event, and re-submit the pipe task to 
avoid that the pipe task

Review Comment:
   May consider the tidiness of the comments....



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java:
##########
@@ -240,4 +254,13 @@ protected synchronized void 
clearReferenceCountAndReleaseLastExceptionEvent() {
       lastExceptionEvent = null;
     }
   }
+
+  private void preScheduleLowPriorityTask(int maxRetries) {
+    while (highPriorityLockTaskCount.get() != 0L && maxRetries != 0) {
+      maxRetries--;
+      if (maxRetries == 0) {
+        Thread.yield();

Review Comment:
   This may not be useful is the CPU resource is abundant



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to