Caideyipi commented on code in PR #17734:
URL: https://github.com/apache/iotdb/pull/17734#discussion_r3332720561


##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java:
##########
@@ -79,7 +85,11 @@ public TSStatus createCQ(TCreateCQReq req) {
 
   public TSStatus dropCQ(TDropCQReq req) {
     try {
-      return configManager.getConsensusManager().write(new DropCQPlan(req.cqId));
+      TSStatus status = configManager.getConsensusManager().write(new DropCQPlan(req.cqId));
+      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        locallyScheduledCQs.remove(req.cqId);
+      }
+      return status;

Review Comment:
   Done in f09c3b0: dropCQ now coordinates with scheduler start/stop and 
cancels the local CQScheduleTask after the metadata drop succeeds, so queued 
future executions are aborted.
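
A minimal sketch of the drop ordering described above. It assumes a local registry that maps a CQ id to the `ScheduledFuture` of its task. `DropCQSketch` and the `metadataDrop` predicate are hypothetical names: the predicate stands in for the consensus write of `DropCQPlan` returning `SUCCESS_STATUS`. The locking against scheduler start/stop is left out.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Hypothetical stand-in for the dropCQ path; not the actual CQManager code.
class DropCQSketch {
  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
  // CQ id -> future of the locally scheduled execution.
  private final ConcurrentMap<String, ScheduledFuture<?>> localTasks = new ConcurrentHashMap<>();
  // Assumption: returns true when the metadata drop (consensus write) succeeded.
  private final Predicate<String> metadataDrop;

  DropCQSketch(Predicate<String> metadataDrop) {
    this.metadataDrop = metadataDrop;
  }

  void schedule(String cqId, Runnable body) {
    localTasks.put(cqId, executor.scheduleAtFixedRate(body, 1, 1, TimeUnit.HOURS));
  }

  ScheduledFuture<?> futureOf(String cqId) {
    return localTasks.get(cqId);
  }

  boolean dropCQ(String cqId) {
    // Drop the metadata first; on failure the CQ still exists, so its task keeps running.
    if (!metadataDrop.test(cqId)) {
      return false;
    }
    // Only after a successful drop: forget the task and cancel any queued execution.
    ScheduledFuture<?> future = localTasks.remove(cqId);
    if (future != null) {
      future.cancel(false);
    }
    return true;
  }

  void shutdown() {
    executor.shutdownNow();
  }
}
```

Cancelling only after the metadata write succeeds means a failed drop never leaves an existing CQ without its local task.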



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java:
##########
@@ -176,11 +195,30 @@ public void stopCQScheduler() {
     try {
       previous = executor;
       executor = null;
+      locallyScheduledCQs.clear();
     } finally {
       lock.writeLock().unlock();
     }
     if (previous != null) {
       previous.shutdown();
     }
   }
+
+  public boolean markCQLocallyScheduled(String cqId, String md5) {
+    AtomicBoolean shouldSchedule = new AtomicBoolean(false);
+    locallyScheduledCQs.compute(
+        cqId,
+        (ignored, previousMd5) -> {
+          if (!md5.equals(previousMd5)) {

Review Comment:
   Done in f09c3b0: the local schedule registry now stores the metadata token
together with the CQScheduleTask. If the same CQ id is registered with a
different token, the old task is cancelled before the new one is scheduled.
Drop, start and stop also cancel the local tasks, and a cancelled task's
callback no longer resubmits it.
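
A minimal sketch of a token-aware local registry. It assumes each entry pairs the metadata token with the `ScheduledFuture` of its task. `LocalScheduleRegistry` and `LocalEntry` are hypothetical names. The `LocallyScheduledCQ` value in the actual change holds a `CQScheduleTask` rather than a bare future.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical value type: the token a task was scheduled for, plus its future.
final class LocalEntry {
  final String cqToken;
  final ScheduledFuture<?> future;

  LocalEntry(String cqToken, ScheduledFuture<?> future) {
    this.cqToken = cqToken;
    this.future = future;
  }
}

// Hypothetical registry keyed by CQ id; every method holds the same monitor.
class LocalScheduleRegistry {
  private final ScheduledExecutorService executor;
  private final Map<String, LocalEntry> entries = new HashMap<>();

  LocalScheduleRegistry(ScheduledExecutorService executor) {
    this.executor = executor;
  }

  // Returns false when the same token is already scheduled, true when a task was (re)scheduled.
  synchronized boolean register(String cqId, String cqToken, Runnable body) {
    LocalEntry previous = entries.get(cqId);
    if (previous != null) {
      if (previous.cqToken.equals(cqToken)) {
        return false;
      }
      // Different token: the old task belongs to stale metadata, so cancel it first.
      previous.future.cancel(false);
    }
    ScheduledFuture<?> future = executor.scheduleAtFixedRate(body, 1, 1, TimeUnit.HOURS);
    entries.put(cqId, new LocalEntry(cqToken, future));
    return true;
  }

  // Drop path: remove and cancel the task for one CQ, if any.
  synchronized void unregister(String cqId) {
    LocalEntry removed = entries.remove(cqId);
    if (removed != null) {
      removed.future.cancel(false);
    }
  }

  // Scheduler start/stop path: cancel every local task.
  synchronized void cancelAll() {
    entries.values().forEach(e -> e.future.cancel(false));
    entries.clear();
  }

  synchronized ScheduledFuture<?> futureOf(String cqId) {
    LocalEntry entry = entries.get(cqId);
    return entry == null ? null : entry.future;
  }
}
```

A single monitor keeps check, cancel and replace atomic without putting side effects inside `ConcurrentHashMap.compute`, whose remapping function should be short and must not update other mappings of the map.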



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java:
##########
@@ -168,6 +177,42 @@ private void activeCQ(ConfigNodeProcedureEnv env) {
     }
   }
 
+  void recoverScheduledTask(ConfigNodeProcedureEnv env) throws ConsensusException {
+    Optional<CQInfo.CQEntry> cqEntry = getCurrentCQEntry(env);
+    if (!cqEntry.isPresent()) {
+      LOGGER.info(
+          "Skip recovering the schedule task of CQ {} because its metadata is unavailable.",
+          req.cqId);
+      return;
+    }
+    submitScheduleTask(env, new CQScheduleTask(cqEntry.get(), executor, env.getConfigManager()));
+  }
+
+  Optional<CQInfo.CQEntry> getCurrentCQEntry(ConfigNodeProcedureEnv env) throws ConsensusException {
+    ShowCQResp response =
+        (ShowCQResp) env.getConfigManager().getConsensusManager().read(new ShowCQPlan());
+    return response.getCqList().stream()
+        .filter(entry -> req.cqId.equals(entry.getCqId()) && md5.equals(entry.getMd5()))
+        .findFirst();
+  }
+
+  private static String generateCQToken(String cqId) {
+    return DigestUtils.md2Hex(cqId + "-" + UUID.randomUUID());
+  }

Review Comment:
   Done in f09c3b0: generateCQToken now uses UUID.randomUUID().toString() 
directly and the DigestUtils/md2Hex dependency was removed from 
CreateCQProcedure.
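
A minimal illustration of why a random token works better than the original `DigestUtils.md2Hex(req.cqId)`. A hash of the CQ id alone is identical every time the same CQ is dropped and re-created, whereas a random UUID gives each creation its own token. The no-argument `generateCQToken()` signature and the `ownsMetadata` helper are assumptions. The comment above only says the method now uses `UUID.randomUUID().toString()`.

```java
import java.util.UUID;

// Hypothetical helper mirroring the described change; not the exact IoTDB signature.
final class CQTokens {
  private CQTokens() {}

  // A fresh random (version 4) UUID per CQ creation, in its 36-character string form.
  static String generateCQToken() {
    return UUID.randomUUID().toString();
  }

  // A recovering procedure owns the stored metadata only if the tokens match.
  static boolean ownsMetadata(String procedureToken, String storedToken) {
    return procedureToken.equals(storedToken);
  }
}
```

For example, suppose a procedure created `cq1` with token A, and `cq1` was then dropped and re-created with token B. When that procedure is recovered, it finds B in the metadata, sees the mismatch, and skips scheduling.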



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java:
##########
@@ -168,6 +177,42 @@ private void activeCQ(ConfigNodeProcedureEnv env) {
     }
   }
 
+  void recoverScheduledTask(ConfigNodeProcedureEnv env) throws ConsensusException {
+    Optional<CQInfo.CQEntry> cqEntry = getCurrentCQEntry(env);
+    if (!cqEntry.isPresent()) {
+      LOGGER.info(
+          "Skip recovering the schedule task of CQ {} because its metadata is unavailable.",
+          req.cqId);
+      return;
+    }
+    submitScheduleTask(env, new CQScheduleTask(cqEntry.get(), executor, env.getConfigManager()));
+  }
+
+  Optional<CQInfo.CQEntry> getCurrentCQEntry(ConfigNodeProcedureEnv env) throws ConsensusException {
+    ShowCQResp response =
+        (ShowCQResp) env.getConfigManager().getConsensusManager().read(new ShowCQPlan());
+    return response.getCqList().stream()
+        .filter(entry -> req.cqId.equals(entry.getCqId()) && md5.equals(entry.getMd5()))
+        .findFirst();
+  }

Review Comment:
   Done in f09c3b0: ShowCQPlan now accepts an optional CQ id, and CQInfo 
filters by CQ id before returning the response. The recovery path now reads 
only the target CQ and then checks the token.
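
A minimal sketch of the filtered read. It assumes the optional CQ id is modeled as a nullable string, with `null` meaning all CQs. `CQInfoSketch` and `CQEntrySketch` are hypothetical names. The real path goes through `ShowCQPlan`, the consensus read and `ShowCQResp`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, heavily simplified CQ metadata entry.
final class CQEntrySketch {
  final String cqId;
  final String cqToken;

  CQEntrySketch(String cqId, String cqToken) {
    this.cqId = cqId;
    this.cqToken = cqToken;
  }
}

// Hypothetical stand-in for CQInfo's show path.
class CQInfoSketch {
  private final Map<String, CQEntrySketch> cqs = new LinkedHashMap<>();

  void put(CQEntrySketch entry) {
    cqs.put(entry.cqId, entry);
  }

  // cqId == null lists every CQ; otherwise only the matching CQ (if any) is returned.
  List<CQEntrySketch> show(String cqId) {
    if (cqId == null) {
      return new ArrayList<>(cqs.values());
    }
    List<CQEntrySketch> result = new ArrayList<>();
    CQEntrySketch entry = cqs.get(cqId);
    if (entry != null) {
      result.add(entry);
    }
    return result;
  }

  // Recovery: read only the target CQ, then keep it only if it still carries our token.
  Optional<CQEntrySketch> currentEntry(String cqId, String cqToken) {
    return show(cqId).stream().filter(e -> e.cqToken.equals(cqToken)).findFirst();
  }
}
```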



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java:
##########
@@ -57,11 +60,14 @@ public class CQManager {
 
   private final ReadWriteLock lock;
 
+  private final ConcurrentMap<String, String> locallyScheduledCQs;

Review Comment:
   Done in f09c3b0: added an inline explanation. The key is the CQ id, and the
value is the local task together with the metadata token it owns.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java:
##########
@@ -75,7 +81,7 @@ public CreateCQProcedure(ScheduledExecutorService executor) {
   public CreateCQProcedure(TCreateCQReq req, ScheduledExecutorService executor) {
     super();
     this.req = req;
-    this.md5 = DigestUtils.md2Hex(req.cqId);
+    this.md5 = generateCQToken(req.cqId);

Review Comment:
   Done in f09c3b0: renamed the per-CQ token field from md5 to cqToken across
the CQ plans, metadata, procedure and scheduler code.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java:
##########
@@ -155,8 +166,16 @@ public void startCQScheduler() {
       if (allCQs != null) {
         for (CQInfo.CQEntry entry : allCQs) {
           if (entry.getState() == CQState.ACTIVE) {
+            if (!markCQLocallyScheduled(entry.getCqId(), entry.getMd5())) {
+              continue;
+            }
             CQScheduleTask cqScheduleTask = new CQScheduleTask(entry, executor, configManager);
-            cqScheduleTask.submitSelf();
+            try {
+              cqScheduleTask.submitSelf();
+            } catch (RuntimeException e) {
+              unmarkCQLocallyScheduled(entry.getCqId(), entry.getMd5());
+              throw e;
+            }

Review Comment:
   Done in f09c3b0: locallyScheduledCQs now stores a LocallyScheduledCQ value 
containing the token and CQScheduleTask. CQScheduleTask owns its latest 
ScheduledFuture and exposes cancel(), so drop and token replacement can cancel 
it.
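
A minimal sketch of a self-resubmitting task that keeps its latest `ScheduledFuture` and exposes `cancel()`. It assumes each execution re-submits the next one with a one-shot `ScheduledExecutorService.schedule` call. `SelfRescheduleTask` is a hypothetical name, and the period and body are placeholders for the CQ's schedule and query execution.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical self-resubmitting task; not IoTDB's CQScheduleTask.
class SelfRescheduleTask implements Runnable {
  private final ScheduledExecutorService executor;
  private final long periodMs;
  private final Runnable body;
  private ScheduledFuture<?> future; // latest submission, guarded by this
  private boolean cancelled; // guarded by this

  SelfRescheduleTask(ScheduledExecutorService executor, long periodMs, Runnable body) {
    this.executor = executor;
    this.periodMs = periodMs;
    this.body = body;
  }

  synchronized void submitSelf() {
    if (cancelled) {
      return; // a cancelled task never schedules itself again
    }
    future = executor.schedule(this, periodMs, TimeUnit.MILLISECONDS);
  }

  @Override
  public void run() {
    synchronized (this) {
      if (cancelled) {
        return; // the future was already running or queued when cancel() happened
      }
    }
    body.run();
    submitSelf(); // resubmits only if cancel() has not been called in the meantime
  }

  synchronized void cancel() {
    cancelled = true;
    if (future != null) {
      future.cancel(false);
    }
  }
}
```

Checking the flag under the same monitor that publishes `future` closes the race where `cancel()` runs between one execution finishing and the next submission. Either `cancel()` sees the new future and cancels it, or `submitSelf()` sees the flag and does nothing.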



-- 
This is an automated message from the Apache Git Service.