Copilot commented on code in PR #16250:
URL: https://github.com/apache/iotdb/pull/16250#discussion_r2297396040


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java:
##########
@@ -2007,6 +2007,78 @@ public SettableFuture<ConfigTaskResult> createPipe(
       return future;
     }
 
+    // Syntactic sugar: if full-sync mode is detected (i.e. not snapshot mode, or both realtime
+    // and history are true), the pipe is split into history-only and realtime–only modes.
+    final PipeParameters extractorPipeParameters =
+        new PipeParameters(createPipeStatement.getExtractorAttributes());
+    if (PipeDataNodeAgent.task().isFullSync(extractorPipeParameters)) {
+      try (final ConfigNodeClient configNodeClient =
+          CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+        // 1. Send request to create the historical data synchronization pipeline
+        final TCreatePipeReq historyReq =
+            new TCreatePipeReq()
+                // Append suffix to the pipeline name for historical data
+                .setPipeName(createPipeStatement.getPipeName() + "_history")
+                .setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
+                // Use extractor parameters for historical data
+                .setExtractorAttributes(
+                    extractorPipeParameters
+                        .addOrReplaceEquivalentAttributesWithClone(
+                            new PipeParameters(
+                                Collections.singletonMap(
+                                    PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY, "true")))
+                        .addOrReplaceEquivalentAttributesWithClone(
+                            new PipeParameters(
+                                Collections.singletonMap(
+                                    PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY, "false")))

Review Comment:
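   [nitpick] The string literals "true" and "false" should be derived from
   boolean values, e.g. Boolean.toString(true), instead of being written by
   hand, to improve type safety and consistency. The values still have to be
   strings: assuming PipeParameters is built from a Map<String, String>, as
   the surrounding calls indicate, passing the raw boolean true would not
   compile.
   ```suggestion
                                       PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY, Boolean.toString(true))))
                           .addOrReplaceEquivalentAttributesWithClone(
                               new PipeParameters(
                                   Collections.singletonMap(
                                       PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY, Boolean.toString(false))))
   ```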
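
For reference, a minimal self-contained sketch of the `Boolean.toString` approach. It assumes the attribute map is a `Map<String, String>`; the class `EnableFlagSketch`, the helper `flag`, and both key strings are hypothetical stand-ins for illustration, not IoTDB identifiers.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for illustration; not part of IoTDB.
final class EnableFlagSketch {

  // Placeholder key names; the real keys live in PipeSourceConstant.
  static final String HISTORY_ENABLE_KEY = "example.history.enable";
  static final String REALTIME_ENABLE_KEY = "example.realtime.enable";

  // Builds a single-entry String-to-String map from a typed boolean flag.
  static Map<String, String> flag(final String key, final boolean enabled) {
    // Boolean.toString(boolean) yields "true" or "false", so the stored value
    // is the same string as the hand-written literal.
    return Collections.singletonMap(key, Boolean.toString(enabled));
  }

  public static void main(final String[] args) {
    System.out.println(flag(HISTORY_ENABLE_KEY, true));
    System.out.println(flag(REALTIME_ENABLE_KEY, false));
  }
}
```

The stored values are identical to the literals; the only gain is that a misspelling such as "ture" can no longer slip in.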



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java:
##########
@@ -2007,6 +2007,78 @@ public SettableFuture<ConfigTaskResult> createPipe(
       return future;
     }
 
+    // Syntactic sugar: if full-sync mode is detected (i.e. not snapshot mode, or both realtime
+    // and history are true), the pipe is split into history-only and realtime–only modes.
+    final PipeParameters extractorPipeParameters =
+        new PipeParameters(createPipeStatement.getExtractorAttributes());
+    if (PipeDataNodeAgent.task().isFullSync(extractorPipeParameters)) {
+      try (final ConfigNodeClient configNodeClient =
+          CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+        // 1. Send request to create the historical data synchronization pipeline
+        final TCreatePipeReq historyReq =
+            new TCreatePipeReq()
+                // Append suffix to the pipeline name for historical data
+                .setPipeName(createPipeStatement.getPipeName() + "_history")
+                .setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
+                // Use extractor parameters for historical data
+                .setExtractorAttributes(
+                    extractorPipeParameters
+                        .addOrReplaceEquivalentAttributesWithClone(
+                            new PipeParameters(
+                                Collections.singletonMap(
+                                    PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY, "true")))
+                        .addOrReplaceEquivalentAttributesWithClone(
+                            new PipeParameters(
+                                Collections.singletonMap(
+                                    PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY, "false")))
+                        .getAttribute())
+                .setProcessorAttributes(createPipeStatement.getProcessorAttributes())
+                .setConnectorAttributes(createPipeStatement.getConnectorAttributes());
+
+        final TSStatus historyTsStatus = configNodeClient.createPipe(historyReq);
+        // If creation fails, immediately return with exception
+        if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != historyTsStatus.getCode()) {
+          future.setException(new IoTDBException(historyTsStatus));
+          return future;
+        }
+
+        // 2. Send request to create the real-time data synchronization pipeline
+        final TCreatePipeReq realtimeReq =
+            new TCreatePipeReq()
+                // Append suffix to the pipeline name for real-time data
+                .setPipeName(createPipeStatement.getPipeName() + "_realtime")
+                .setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
+                // Use extractor parameters for real-time data
+                .setExtractorAttributes(
+                    extractorPipeParameters
+                        .addOrReplaceEquivalentAttributesWithClone(
+                            new PipeParameters(
+                                Collections.singletonMap(
+                                    PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY, "false")))
+                        .addOrReplaceEquivalentAttributesWithClone(
+                            new PipeParameters(
+                                Collections.singletonMap(
+                                    PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY, "true")))

Review Comment:
   [nitpick] The string literals "false" and "true" should be produced with
   Boolean.toString(...) instead of being written by hand, to improve type
   safety and consistency while keeping the attribute values as strings.
   ```suggestion
                                       PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY, Boolean.toString(false))))
                           .addOrReplaceEquivalentAttributesWithClone(
                               new PipeParameters(
                                   Collections.singletonMap(
                                       PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY, Boolean.toString(true))))
   ```
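
The split described in the hunk's leading comment (one full-sync pipe becomes a history-only pipe and a realtime-only pipe) can be sketched in isolation as follows. This is only an illustration under stated assumptions: plain `java.util` maps stand in for `PipeParameters`, and the class `FullSyncSplitSketch`, its methods, and the key strings are hypothetical, not IoTDB APIs. Only the `_history` and `_realtime` suffixes are taken from the diff.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; none of these names are IoTDB APIs.
final class FullSyncSplitSketch {

  // Placeholder key names; the real keys live in PipeSourceConstant.
  static final String HISTORY_ENABLE_KEY = "example.history.enable";
  static final String REALTIME_ENABLE_KEY = "example.realtime.enable";

  // Returns a copy of base with both enable flags overridden; base is untouched.
  static Map<String, String> withFlags(
      final Map<String, String> base, final boolean history, final boolean realtime) {
    final Map<String, String> copy = new HashMap<>(base);
    copy.put(HISTORY_ENABLE_KEY, Boolean.toString(history));
    copy.put(REALTIME_ENABLE_KEY, Boolean.toString(realtime));
    return copy;
  }

  // Maps each derived pipe name to its extractor attributes, history first.
  static Map<String, Map<String, String>> split(
      final String pipeName, final Map<String, String> extractorAttributes) {
    final Map<String, Map<String, String>> result = new LinkedHashMap<>();
    result.put(pipeName + "_history", withFlags(extractorAttributes, true, false));
    result.put(pipeName + "_realtime", withFlags(extractorAttributes, false, true));
    return result;
  }

  public static void main(final String[] args) {
    final Map<String, String> attrs = new HashMap<>();
    attrs.put("example.pattern", "root.sg");
    split("p1", attrs).forEach((name, a) -> System.out.println(name + " -> " + a));
  }
}
```

Centralizing the flag overrides in one helper would also remove the four near-identical `singletonMap` calls, so the string-versus-boolean question would need to be settled in only one place.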



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
