Caideyipi commented on code in PR #17734:
URL: https://github.com/apache/iotdb/pull/17734#discussion_r3332720561
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java:
##########
@@ -79,7 +85,11 @@ public TSStatus createCQ(TCreateCQReq req) {
public TSStatus dropCQ(TDropCQReq req) {
try {
- return configManager.getConsensusManager().write(new
DropCQPlan(req.cqId));
+ TSStatus status = configManager.getConsensusManager().write(new
DropCQPlan(req.cqId));
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ locallyScheduledCQs.remove(req.cqId);
+ }
+ return status;
Review Comment:
Done in f09c3b0: dropCQ now coordinates with scheduler start/stop and
cancels the local CQScheduleTask after the metadata drop succeeds, so queued
future executions are aborted.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java:
##########
@@ -176,11 +195,30 @@ public void stopCQScheduler() {
try {
previous = executor;
executor = null;
+ locallyScheduledCQs.clear();
} finally {
lock.writeLock().unlock();
}
if (previous != null) {
previous.shutdown();
}
}
+
+ public boolean markCQLocallyScheduled(String cqId, String md5) {
+ AtomicBoolean shouldSchedule = new AtomicBoolean(false);
+ locallyScheduledCQs.compute(
+ cqId,
+ (ignored, previousMd5) -> {
+ if (!md5.equals(previousMd5)) {
Review Comment:
Done in f09c3b0: the local schedule registry now stores the metadata token
together with the CQScheduleTask. If the same CQ id is registered with a
different token, the old task is cancelled before the new one is scheduled.
Drop/start/stop also cancel local tasks, and cancelled callbacks no longer
resubmit.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java:
##########
@@ -168,6 +177,42 @@ private void activeCQ(ConfigNodeProcedureEnv env) {
}
}
+ void recoverScheduledTask(ConfigNodeProcedureEnv env) throws
ConsensusException {
+ Optional<CQInfo.CQEntry> cqEntry = getCurrentCQEntry(env);
+ if (!cqEntry.isPresent()) {
+ LOGGER.info(
+ "Skip recovering the schedule task of CQ {} because its metadata is
unavailable.",
+ req.cqId);
+ return;
+ }
+ submitScheduleTask(env, new CQScheduleTask(cqEntry.get(), executor,
env.getConfigManager()));
+ }
+
+ Optional<CQInfo.CQEntry> getCurrentCQEntry(ConfigNodeProcedureEnv env)
throws ConsensusException {
+ ShowCQResp response =
+ (ShowCQResp) env.getConfigManager().getConsensusManager().read(new
ShowCQPlan());
+ return response.getCqList().stream()
+ .filter(entry -> req.cqId.equals(entry.getCqId()) &&
md5.equals(entry.getMd5()))
+ .findFirst();
+ }
+
+ private static String generateCQToken(String cqId) {
+ return DigestUtils.md2Hex(cqId + "-" + UUID.randomUUID());
+ }
Review Comment:
Done in f09c3b0: generateCQToken now uses UUID.randomUUID().toString()
directly and the DigestUtils/md2Hex dependency was removed from
CreateCQProcedure.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java:
##########
@@ -168,6 +177,42 @@ private void activeCQ(ConfigNodeProcedureEnv env) {
}
}
+ void recoverScheduledTask(ConfigNodeProcedureEnv env) throws
ConsensusException {
+ Optional<CQInfo.CQEntry> cqEntry = getCurrentCQEntry(env);
+ if (!cqEntry.isPresent()) {
+ LOGGER.info(
+ "Skip recovering the schedule task of CQ {} because its metadata is
unavailable.",
+ req.cqId);
+ return;
+ }
+ submitScheduleTask(env, new CQScheduleTask(cqEntry.get(), executor,
env.getConfigManager()));
+ }
+
+ Optional<CQInfo.CQEntry> getCurrentCQEntry(ConfigNodeProcedureEnv env)
throws ConsensusException {
+ ShowCQResp response =
+ (ShowCQResp) env.getConfigManager().getConsensusManager().read(new
ShowCQPlan());
+ return response.getCqList().stream()
+ .filter(entry -> req.cqId.equals(entry.getCqId()) &&
md5.equals(entry.getMd5()))
+ .findFirst();
+ }
Review Comment:
Done in f09c3b0: ShowCQPlan now accepts an optional CQ id, and CQInfo
filters by CQ id before returning the response. The recovery path now reads
only the target CQ and then checks the token.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java:
##########
@@ -57,11 +60,14 @@ public class CQManager {
private final ReadWriteLock lock;
+ private final ConcurrentMap<String, String> locallyScheduledCQs;
Review Comment:
Done in f09c3b0: added an inline explanation. The key is CQ id, and the
value is the local task plus the metadata token it owns.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java:
##########
@@ -75,7 +81,7 @@ public CreateCQProcedure(ScheduledExecutorService executor) {
public CreateCQProcedure(TCreateCQReq req, ScheduledExecutorService
executor) {
super();
this.req = req;
- this.md5 = DigestUtils.md2Hex(req.cqId);
+ this.md5 = generateCQToken(req.cqId);
Review Comment:
Done in f09c3b0: renamed the CQ identifier from md5 to cqToken across the CQ
plans, metadata, procedure and scheduler code.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java:
##########
@@ -155,8 +166,16 @@ public void startCQScheduler() {
if (allCQs != null) {
for (CQInfo.CQEntry entry : allCQs) {
if (entry.getState() == CQState.ACTIVE) {
+ if (!markCQLocallyScheduled(entry.getCqId(), entry.getMd5())) {
+ continue;
+ }
CQScheduleTask cqScheduleTask = new CQScheduleTask(entry,
executor, configManager);
- cqScheduleTask.submitSelf();
+ try {
+ cqScheduleTask.submitSelf();
+ } catch (RuntimeException e) {
+ unmarkCQLocallyScheduled(entry.getCqId(), entry.getMd5());
+ throw e;
+ }
Review Comment:
Done in f09c3b0: locallyScheduledCQs now stores a LocallyScheduledCQ value
containing the token and CQScheduleTask. CQScheduleTask owns its latest
ScheduledFuture and exposes cancel(), so drop and token replacement can cancel
it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]