luoluoyuyu commented on code in PR #17734:
URL: https://github.com/apache/iotdb/pull/17734#discussion_r3309322090


##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java:
##########
@@ -176,11 +195,30 @@ public void stopCQScheduler() {
     try {
       previous = executor;
       executor = null;
+      locallyScheduledCQs.clear();
     } finally {
       lock.writeLock().unlock();
     }
     if (previous != null) {
       previous.shutdown();
     }
   }
+
+  public boolean markCQLocallyScheduled(String cqId, String md5) {
+    AtomicBoolean shouldSchedule = new AtomicBoolean(false);
+    locallyScheduledCQs.compute(
+        cqId,
+        (ignored, previousMd5) -> {
+          if (!md5.equals(previousMd5)) {

Review Comment:
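   When `previousMd5 == null` (first schedule), `!md5.equals(previousMd5)` is true, so the 
CQ is scheduled. This is as expected.
   
   When `previousMd5` is **equal** to `md5` (a repeated `startCQScheduler` or a repeated submit), 
the method returns false and the CQ is skipped. This is as expected.
   
   **Potential issue**: if the definition of a CQ with the same `cqId` is updated (md5 changes), it is 
scheduled again, but has the **old** `CQScheduleTask` been cancelled? If not, the two tasks may briefly 
run side by side. Please confirm that the `CreateCQProcedure` / `ActiveCQPlan` path drops the old task 
first, or that `CQScheduleTask` checks the md5 internally.
   
   **Suggestion**: add an IT or UT: `create CQ → restart scheduler → only 1 active task`; `drop 
CQ → entry removed from the map`.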
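
   To make the concern concrete, here is a minimal sketch of the marking logic in isolation. It copies the body of `markCQLocallyScheduled` from the diff; everything else (`LocalScheduleMarkSketch`, `submit`, `activeTasks`, `activeTaskCount`) is a hypothetical stand-in invented for illustration and is not the real `CQManager` or `CQScheduleTask` code. It only models bookkeeping and assumes nothing cancels an old task.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the CQManager bookkeeping; not the real class.
class LocalScheduleMarkSketch {
  private final ConcurrentMap<String, String> locallyScheduledCQs = new ConcurrentHashMap<>();
  // Hypothetical counter: tasks started per cqId and never cancelled in this model.
  private final ConcurrentMap<String, Integer> activeTasks = new ConcurrentHashMap<>();

  // Same logic as markCQLocallyScheduled in the diff.
  boolean markCQLocallyScheduled(String cqId, String md5) {
    AtomicBoolean shouldSchedule = new AtomicBoolean(false);
    locallyScheduledCQs.compute(
        cqId,
        (ignored, previousMd5) -> {
          if (!md5.equals(previousMd5)) {
            shouldSchedule.set(true);
            return md5;
          }
          return previousMd5;
        });
    return shouldSchedule.get();
  }

  // Hypothetical submit path: a task is started only when the mark succeeds.
  void submit(String cqId, String md5) {
    if (markCQLocallyScheduled(cqId, md5)) {
      activeTasks.merge(cqId, 1, Integer::sum);
    }
  }

  int activeTaskCount(String cqId) {
    return activeTasks.getOrDefault(cqId, 0);
  }

  public static void main(String[] args) {
    LocalScheduleMarkSketch sketch = new LocalScheduleMarkSketch();
    sketch.submit("cq1", "md5-a"); // first schedule
    sketch.submit("cq1", "md5-a"); // duplicate submit, skipped
    System.out.println("after duplicate submit: " + sketch.activeTaskCount("cq1"));
    sketch.submit("cq1", "md5-b"); // definition changed, scheduled again
    System.out.println("after md5 change: " + sketch.activeTaskCount("cq1"));
  }
}
```

   In this model a duplicate submit leaves one task, while an md5 change leaves two unless some other path cancels the first. Whether the real code has such a path is exactly the open question above.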



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java:
##########
@@ -176,11 +195,30 @@ public void stopCQScheduler() {
     try {
       previous = executor;
       executor = null;
+      locallyScheduledCQs.clear();
     } finally {
       lock.writeLock().unlock();
     }
     if (previous != null) {
       previous.shutdown();
     }
   }
+
+  public boolean markCQLocallyScheduled(String cqId, String md5) {
+    AtomicBoolean shouldSchedule = new AtomicBoolean(false);
+    locallyScheduledCQs.compute(
+        cqId,
+        (ignored, previousMd5) -> {
+          if (!md5.equals(previousMd5)) {
+            shouldSchedule.set(true);
+            return md5;
+          }
+          return previousMd5;
+        });
+    return shouldSchedule.get();
+  }
+
+  public void unmarkCQLocallyScheduled(String cqId, String md5) {
+    locallyScheduledCQs.remove(cqId, md5);

Review Comment:
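   `ConcurrentMap.remove(cqId, md5)` removes the entry only when the value matches exactly. Paired with the 
`unmarkCQLocallyScheduled(entry.getCqId(), entry.getMd5())` call made when `submitSelf` fails, this is correct 
and avoids deleting the md5 written by a newer schedule.
   
   👍 This is a nice concurrency detail.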
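
   For readers less familiar with the two-argument `remove`, a small sketch of the behavior being praised. It uses only the standard `java.util.concurrent.ConcurrentMap` API; the class name, method names, and the md5 strings are hypothetical and chosen for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical demo class; only ConcurrentMap.remove(key, value) is real API.
class ConditionalRemoveSketch {

  // A stale unmark (old md5) must not delete the entry written by a newer schedule.
  static String valueAfterStaleRemove() {
    ConcurrentMap<String, String> scheduled = new ConcurrentHashMap<>();
    scheduled.put("cq1", "md5-old");
    scheduled.put("cq1", "md5-new"); // a newer schedule overwrites the value
    scheduled.remove("cq1", "md5-old"); // value does not match, so nothing is removed
    return scheduled.get("cq1");
  }

  // An unmark carrying the current md5 removes the entry and reports success.
  static boolean matchingRemoveSucceeds() {
    ConcurrentMap<String, String> scheduled = new ConcurrentHashMap<>();
    scheduled.put("cq1", "md5-new");
    return scheduled.remove("cq1", "md5-new") && !scheduled.containsKey("cq1");
  }

  public static void main(String[] args) {
    System.out.println("value after stale remove: " + valueAfterStaleRemove());
    System.out.println("matching remove succeeded: " + matchingRemoveSucceeds());
  }
}
```

   A plain `remove(cqId)` would delete the newer entry in the first case, which is the accidental deletion the conditional form avoids.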



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
