Caideyipi commented on code in PR #12018:
URL: https://github.com/apache/iotdb/pull/12018#discussion_r1475707402


##########
integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeAlterIT.java:
##########
@@ -103,16 +104,62 @@ public void testBasicPipeAlter() throws Exception {
       List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
       Assert.assertEquals(1, showPipeResult.size());
       
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=true"));
+      // alter pipe (modify) will keep unspecified configurations
+      Assert.assertTrue(
+          showPipeResult
+              .get(0)
+              .pipeConnector
+              .contains(String.format("node-urls=%s", 
receiverDataNode.getIpAndPortString())));
+      // alter pipe will keep user stopped status
       Assert.assertEquals("STOPPED", showPipeResult.get(0).state);
       // alter pipe will reset pipe creation time
-      Assert.assertTrue(showPipeResult.get(0).creationTime > 
firstCreationTime);
+      Assert.assertTrue(showPipeResult.get(0).creationTime > lastCreationTime);
+      lastCreationTime = showPipeResult.get(0).creationTime;
+      // alter pipe will clear exception messages
+      Assert.assertEquals("", showPipeResult.get(0).exceptionMessage);
+    }
+
+    // start pipe
+    try (Connection connection = senderEnv.getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("start pipe a2b");
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+
+    // alter pipe (replace)
+    try (Connection connection = senderEnv.getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("alter pipe a2b replace processor 
('processor'='down-sampling-processor')");
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+
+    // show pipe
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      Assert.assertEquals(1, showPipeResult.size());
+      Assert.assertTrue(
+          
showPipeResult.get(0).pipeProcessor.contains("processor=down-sampling-processor"));
+      // alter pipe (without sink clause) will not modify sink's configurations
+      
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=true"));
+      Assert.assertTrue(
+          showPipeResult
+              .get(0)
+              .pipeConnector
+              .contains(String.format("node-urls=%s", 
receiverDataNode.getIpAndPortString())));
+      // alter pipe will keep running status
+      Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
+      // alter pipe will reset pipe creation time
+      Assert.assertTrue(showPipeResult.get(0).creationTime > lastCreationTime);
       // alter pipe will clear exception messages
       Assert.assertEquals("", showPipeResult.get(0).exceptionMessage);

Review Comment:
   The exception message may always be empty, whether or not "alter pipe" 
clears it, so this assertion does not actually verify the clearing behavior.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java:
##########
@@ -180,30 +180,36 @@ private void checkBeforeAlterPipeInternal(TAlterPipeReq 
alterPipeRequest) throws
       throw new PipeException(exceptionMessage);
     }
 
-    PipeMeta pipeMetaFromCoordinator = 
getPipeMetaByPipeName(alterPipeRequest.getPipeName());
-    PipeStaticMeta pipeStaticMetaFromCoordinator = 
pipeMetaFromCoordinator.getStaticMeta();
-    // fill empty attributes and check useless alter from the perspective of CN
-    boolean needToAlter = false;
+    PipeStaticMeta pipeStaticMetaFromCoordinator =
+        getPipeMetaByPipeName(alterPipeRequest.getPipeName()).getStaticMeta();
+    // deep copy current pipe static meta
+    PipeStaticMeta copiedPipeStaticMetaFromCoordinator =
+        new PipeStaticMeta(
+            pipeStaticMetaFromCoordinator.getPipeName(),
+            pipeStaticMetaFromCoordinator.getCreationTime(),
+            new 
HashMap<>(pipeStaticMetaFromCoordinator.getExtractorParameters().getAttribute()),
+            new 
HashMap<>(pipeStaticMetaFromCoordinator.getProcessorParameters().getAttribute()),
+            new 
HashMap<>(pipeStaticMetaFromCoordinator.getConnectorParameters().getAttribute()));
+    // fill or update attributes
     if (alterPipeRequest.getProcessorAttributes().isEmpty()) {
       alterPipeRequest.setProcessorAttributes(
-          
pipeStaticMetaFromCoordinator.getProcessorParameters().getAttribute());
-    } else if (!(new PipeParameters(alterPipeRequest.getProcessorAttributes()))
-        .isEquivalent(pipeStaticMetaFromCoordinator.getProcessorParameters())) 
{
-      needToAlter = true;
+          
copiedPipeStaticMetaFromCoordinator.getProcessorParameters().getAttribute());
+    } else if (!alterPipeRequest.isReplaceAllProcessorAttributes) {
+      alterPipeRequest.setProcessorAttributes(
+          copiedPipeStaticMetaFromCoordinator
+              .getProcessorParameters()
+              
.updateEquivalentAttributes(alterPipeRequest.getProcessorAttributes())
+              .getAttribute());
     }
     if (alterPipeRequest.getConnectorAttributes().isEmpty()) {
       alterPipeRequest.setConnectorAttributes(
-          
pipeStaticMetaFromCoordinator.getConnectorParameters().getAttribute());
-    } else if (!(new PipeParameters(alterPipeRequest.getConnectorAttributes())
-        
.isEquivalent(pipeStaticMetaFromCoordinator.getConnectorParameters()))) {
-      needToAlter = true;
-    }
-    if (!needToAlter) {
-      final String exceptionMessage =
-          String.format(
-              "Failed to alter pipe %s, nothing to alter", 
alterPipeRequest.getPipeName());
-      LOGGER.warn(exceptionMessage);
-      throw new PipeException(exceptionMessage);
+          
copiedPipeStaticMetaFromCoordinator.getConnectorParameters().getAttribute());
+    } else if (!alterPipeRequest.isReplaceAllConnectorAttributes) {
+      alterPipeRequest.setConnectorAttributes(
+          copiedPipeStaticMetaFromCoordinator
+              .getConnectorParameters()
+              
.updateEquivalentAttributes(alterPipeRequest.getConnectorAttributes())

Review Comment:
   What if the user wants to add some params to an existing plugin?



##########
integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeAlterIT.java:
##########
@@ -138,17 +185,60 @@ public void testPipeAlterFailure() {
     } catch (SQLException e) {
       fail(e.getMessage());
     }
+  }
 
-    // useless alter
-    sql =
+  @Test
+  public void testAlterPipeProcessor() {
+    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    // create pipe
+    String sql =
         String.format(
-            "alter pipe a2b modify sink ('node-urls'='%s', 
'batch.enable'='false')",
+            "create pipe a2b with processor 
('processor'='down-sampling-processor', 'down-sampling.interval-seconds'='1', 
'down-sampling.split-file'='true') with sink ('node-urls'='%s', 
'batch.enable'='false')",
             receiverDataNode.getIpAndPortString());
     try (Connection connection = senderEnv.getConnection();
         Statement statement = connection.createStatement()) {
       statement.execute(sql);
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+
+    // insert history data on sender
+    if (!TestUtils.tryExecuteNonQueriesWithRetry(
+        senderEnv,
+        Arrays.asList(
+            "insert into root.db.d1 (time, at1) values (1000, 1), (1500, 2), 
(2000, 3), (2500, 4), (3000, 5)",
+            "flush"))) {
       fail();
-    } catch (SQLException ignore) {
     }
+
+    // check data on receiver
+    Set<String> expectedResSet = new HashSet<>();
+    expectedResSet.add("1000,1.0,");
+    expectedResSet.add("2000,3.0,");
+    expectedResSet.add("3000,5.0,");
+    TestUtils.assertDataOnEnv(
+        receiverEnv, "select * from root.**", "Time,root.db.d1.at1,", 
expectedResSet);
+
+    // clear data on receiver
+    if (!TestUtils.tryExecuteNonQueriesWithRetry(
+        receiverEnv, Collections.singletonList("delete from root.**"))) {
+      fail();
+    }
+
+    // alter pipe (modify 'down-sampling.interval-seconds')
+    try (Connection connection = senderEnv.getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("alter pipe a2b modify processor 
('down-sampling.interval-seconds'='2')");
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+
+    // check data on receiver
+    expectedResSet.clear();
+    expectedResSet.add("1000,1.0,");
+    expectedResSet.add("3000,5.0,");
+    TestUtils.assertDataOnEnv(

Review Comment:
   The previous progressIndex has already been reported, though the data might 
be sent again when flushed to a single TsFile. However, asserting this 
"retransmission" in the test may not be a good idea.



##########
iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java:
##########
@@ -307,6 +307,24 @@ public boolean isEquivalent(Object obj) {
             .toString());
   }
 
+  /**
+   * This method uses {@link KeyReducer} to check for equivalent keys in 
this.attributes and
+   * attributes, and updates the values of this.attributes accordingly.
+   *
+   * @param attributes Provide the key that needs to be updated along with the 
new value.
+   * @return this pipe parameters
+   */
+  public PipeParameters updateEquivalentAttributes(Map<String, String> 
attributes) {
+    attributes.forEach(
+        (k, v) ->
+            this.attributes.entrySet().stream()
+                .filter(
+                    entry ->
+                        entry.getKey().equals(k) || 
entry.getKey().equals(KeyReducer.reduce(k)))

Review Comment:
   What if entry.getKey().equals("sink.ip") and k.equals("connector.ip")?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to