jt2594838 commented on code in PR #17734:
URL: https://github.com/apache/iotdb/pull/17734#discussion_r3331337314


##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java:
##########
@@ -57,11 +60,14 @@ public class CQManager {
 
   private final ReadWriteLock lock;
 
+  private final ConcurrentMap<String, String> locallyScheduledCQs;

Review Comment:
   Explain the key and the value.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java:
##########
@@ -79,7 +85,11 @@ public TSStatus createCQ(TCreateCQReq req) {
 
   public TSStatus dropCQ(TDropCQReq req) {
     try {
-      return configManager.getConsensusManager().write(new 
DropCQPlan(req.cqId));
+      TSStatus status = configManager.getConsensusManager().write(new 
DropCQPlan(req.cqId));
+      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        locallyScheduledCQs.remove(req.cqId);
+      }
+      return status;

Review Comment:
   Does dropping a CQ not also abort its scheduled task?



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java:
##########
@@ -155,8 +166,16 @@ public void startCQScheduler() {
       if (allCQs != null) {
         for (CQInfo.CQEntry entry : allCQs) {
           if (entry.getState() == CQState.ACTIVE) {
+            if (!markCQLocallyScheduled(entry.getCqId(), entry.getMd5())) {
+              continue;
+            }
             CQScheduleTask cqScheduleTask = new CQScheduleTask(entry, 
executor, configManager);
-            cqScheduleTask.submitSelf();
+            try {
+              cqScheduleTask.submitSelf();
+            } catch (RuntimeException e) {
+              unmarkCQLocallyScheduled(entry.getCqId(), entry.getMd5());
+              throw e;
+            }

Review Comment:
   Could the task be stored as part of the value in locallyScheduledCQs,
   so that it can be aborted when the CQ is dropped?



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java:
##########
@@ -168,6 +177,42 @@ private void activeCQ(ConfigNodeProcedureEnv env) {
     }
   }
 
+  void recoverScheduledTask(ConfigNodeProcedureEnv env) throws 
ConsensusException {
+    Optional<CQInfo.CQEntry> cqEntry = getCurrentCQEntry(env);
+    if (!cqEntry.isPresent()) {
+      LOGGER.info(
+          "Skip recovering the schedule task of CQ {} because its metadata is 
unavailable.",
+          req.cqId);
+      return;
+    }
+    submitScheduleTask(env, new CQScheduleTask(cqEntry.get(), executor, 
env.getConfigManager()));
+  }
+
+  Optional<CQInfo.CQEntry> getCurrentCQEntry(ConfigNodeProcedureEnv env) 
throws ConsensusException {
+    ShowCQResp response =
+        (ShowCQResp) env.getConfigManager().getConsensusManager().read(new 
ShowCQPlan());
+    return response.getCqList().stream()
+        .filter(entry -> req.cqId.equals(entry.getCqId()) && 
md5.equals(entry.getMd5()))
+        .findFirst();
+  }
+
+  private static String generateCQToken(String cqId) {
+    return DigestUtils.md2Hex(cqId + "-" + UUID.randomUUID());
+  }

Review Comment:
   Now that you use UUID, it is no longer necessary to use md5.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java:
##########
@@ -75,7 +81,7 @@ public CreateCQProcedure(ScheduledExecutorService executor) {
   public CreateCQProcedure(TCreateCQReq req, ScheduledExecutorService 
executor) {
     super();
     this.req = req;
-    this.md5 = DigestUtils.md2Hex(req.cqId);
+    this.md5 = generateCQToken(req.cqId);

Review Comment:
   Improper to call it md5 now.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java:
##########
@@ -168,6 +177,42 @@ private void activeCQ(ConfigNodeProcedureEnv env) {
     }
   }
 
+  void recoverScheduledTask(ConfigNodeProcedureEnv env) throws 
ConsensusException {
+    Optional<CQInfo.CQEntry> cqEntry = getCurrentCQEntry(env);
+    if (!cqEntry.isPresent()) {
+      LOGGER.info(
+          "Skip recovering the schedule task of CQ {} because its metadata is 
unavailable.",
+          req.cqId);
+      return;
+    }
+    submitScheduleTask(env, new CQScheduleTask(cqEntry.get(), executor, 
env.getConfigManager()));
+  }
+
+  Optional<CQInfo.CQEntry> getCurrentCQEntry(ConfigNodeProcedureEnv env) 
throws ConsensusException {
+    ShowCQResp response =
+        (ShowCQResp) env.getConfigManager().getConsensusManager().read(new 
ShowCQPlan());
+    return response.getCqList().stream()
+        .filter(entry -> req.cqId.equals(entry.getCqId()) && 
md5.equals(entry.getMd5()))
+        .findFirst();
+  }

Review Comment:
   Is it possible to modify ShowCQPlan so that it returns only the desired CQs,
   instead of filtering the full list in the caller?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to