luoluoyuyu commented on code in PR #17734:
URL: https://github.com/apache/iotdb/pull/17734#discussion_r3309368816


##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java:
##########
@@ -176,11 +195,30 @@ public void stopCQScheduler() {
     try {
       previous = executor;
       executor = null;
+      locallyScheduledCQs.clear();
     } finally {
       lock.writeLock().unlock();
     }
     if (previous != null) {
       previous.shutdown();
     }
   }
+
+  public boolean markCQLocallyScheduled(String cqId, String md5) {
+    AtomicBoolean shouldSchedule = new AtomicBoolean(false);
+    locallyScheduledCQs.compute(
+        cqId,
+        (ignored, previousMd5) -> {
+          if (!md5.equals(previousMd5)) {
+            shouldSchedule.set(true);
+            return md5;
+          }
+          return previousMd5;
+        });
+    return shouldSchedule.get();
+  }
+
+  public void unmarkCQLocallyScheduled(String cqId, String md5) {
+    locallyScheduledCQs.remove(cqId, md5);

Review Comment:
   Using remove(cqId, md5) ensures we only unmark the entry we own after a 
failed submitSelf. Good defensive pattern.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java:
##########
@@ -176,11 +195,30 @@ public void stopCQScheduler() {
     try {
       previous = executor;
       executor = null;
+      locallyScheduledCQs.clear();
     } finally {
       lock.writeLock().unlock();
     }
     if (previous != null) {
       previous.shutdown();
     }
   }
+
+  public boolean markCQLocallyScheduled(String cqId, String md5) {
+    AtomicBoolean shouldSchedule = new AtomicBoolean(false);
+    locallyScheduledCQs.compute(
+        cqId,
+        (ignored, previousMd5) -> {
+          if (!md5.equals(previousMd5)) {

Review Comment:
   When previousMd5 is null, !md5.equals(previousMd5) schedules the CQ as 
expected. When previousMd5 equals md5, scheduling is skipped, which avoids 
duplicates.
   
   If the same cqId is updated with a new md5, this returns true and schedules 
again. Please confirm in this PR that any previously running CQScheduleTask for 
the old md5 is stopped or superseded; otherwise two tasks could run briefly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to