Caideyipi commented on code in PR #15404:
URL: https://github.com/apache/iotdb/pull/15404#discussion_r2067791959
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java:
##########
@@ -231,41 +231,50 @@ public void discardEventsOfPipe(final String
pipeNameToDrop, int regionId) {
// Try to remove the events as much as possible
inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId);
- // synchronized to use the lastEvent & lastExceptionEvent
- synchronized (this) {
- // Here we discard the last event, and re-submit the pipe task to avoid
that the pipe task has
- // stopped submission but will not be stopped by critical exceptions,
because when it acquires
- // lock, the pipe is already dropped, thus it will do nothing.
- // Note that since we use a new thread to stop all the pipes, we will
not encounter deadlock
- // here. Or else we will.
- if (lastEvent instanceof EnrichedEvent
- && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName())
- && regionId == ((EnrichedEvent) lastEvent).getRegionId()) {
- // Do not clear last event's reference count because it may be on
transferring
- lastEvent = null;
- // Submit self to avoid that the lastEvent has been retried "max
times" times and has
- // stopped executing.
- // 1. If the last event is still on execution, or submitted by the
previous "onSuccess" or
- // "onFailure", the "submitSelf" cause nothing.
- // 2. If the last event is waiting the instance lock to call
"onSuccess", then the callback
- // method will skip this turn of submission.
- // 3. If the last event is waiting to call "onFailure", then it will
be ignored because the
- // last event has been set to null.
- // 4. If the last event has called "onFailure" and caused the subtask
to stop submission,
- // it's submitted here and the "report" will wait for the "drop
pipe" lock to stop all
- // the pipes with critical exceptions. As illustrated above, the
"report" will do
- // nothing.
- submitSelf();
- }
+ highPriorityLockTaskCount.incrementAndGet();
+ try {
+ // synchronized to use the lastEvent & lastExceptionEvent
+ synchronized (this) {
+ // Here we discard the last event, and re-submit the pipe task to
avoid that the pipe task
Review Comment:
Please consider tidying up the comments.
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java:
##########
@@ -240,4 +254,13 @@ protected synchronized void
clearReferenceCountAndReleaseLastExceptionEvent() {
lastExceptionEvent = null;
}
}
+
+ private void preScheduleLowPriorityTask(int maxRetries) {
+ while (highPriorityLockTaskCount.get() != 0L && maxRetries != 0) {
+ maxRetries--;
+ if (maxRetries == 0) {
+ Thread.yield();
Review Comment:
This may not be useful if CPU resources are abundant.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]