SteveYurongSu commented on code in PR #11639:
URL: https://github.com/apache/iotdb/pull/11639#discussion_r1420956175
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java:
##########
@@ -143,6 +145,22 @@ protected void
executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env)
if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(response.getMessage());
}
+
+ // Create subtask of schema pipe here
Review Comment:
另外,也需要考虑留下一个钩子,来处理心跳收集异常信息、进度等信息。建议也做一个 PipeTaskAgent 到 CN 上,作为 CN 的一个任务调度入口
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java:
##########
@@ -143,6 +145,22 @@ protected void
executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env)
if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(response.getMessage());
}
+
+ // Create subtask of schema pipe here
Review Comment:
钩子加在 procedure 里面可能是不够的,还是需要有一个类似 DN 上 pipe task agent 的
handlePipeMetaChange 的统一入口,针对 CN 节点是否自己是 leader、自己身上的 pipe 运行状态、请求过来的 meta
的状态统一决定是怎么调动 PipeConfigNodeSubtaskExecutor。这样才能通过现有的 pipe procedure 框架处理切主等异常情景。
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java:
##########
@@ -143,6 +145,22 @@ protected void
executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env)
if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(response.getMessage());
}
+
+ // Create subtask of schema pipe here
Review Comment:
钩子加在 procedure 里面可能是不够的,还是需要有一个类似 DN 上 pipe task agent 的
handlePipeMetaChange 的统一入口,针对 CN 节点是否自己是 leader、自己身上的 pipe 运行状态、请求过来的 meta
的状态统一决定是怎么调动 PipeConfigNodeSubtaskExecutor。这样才能通过现有的 pipe procedure 框架处理切主等异常情景。
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java:
##########
@@ -143,6 +145,22 @@ protected void
executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env)
if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(response.getMessage());
}
+
+ // Create subtask of schema pipe here
Review Comment:
另外,也需要考虑留下一个钩子,来处理心跳收集异常信息、进度等信息。建议也做一个 PipeTaskAgent 到 CN 上,作为 CN 的一个任务调度入口
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]