Caideyipi commented on code in PR #12969:
URL: https://github.com/apache/iotdb/pull/12969#discussion_r1687340566


##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java:
##########
@@ -160,18 +161,32 @@ public TSStatus stopPipe(String pipeName) {
   }
 
   /** Caller should ensure that the method is called in the lock {@link 
#lock()}. */
-  public TSStatus dropPipe(String pipeName) {
-    final boolean isPipeExistedBeforeDrop = 
pipeTaskInfo.isPipeExisted(pipeName);
-    final TSStatus status = 
configManager.getProcedureManager().dropPipe(pipeName);
+  public TSStatus dropPipe(TDropPipeReq req) {
+    final boolean isPipeExistedBeforeDrop = 
pipeTaskInfo.isPipeExisted(req.getPipeName());
+    final TSStatus status = 
configManager.getProcedureManager().dropPipe(req.getPipeName());
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      LOGGER.warn("Failed to drop pipe {}. Result status: {}.", pipeName, 
status);
+      LOGGER.warn("Failed to drop pipe {}. Result status: {}.", 
req.getPipeName(), status);
     }
-    return isPipeExistedBeforeDrop
-        ? status
-        : RpcUtils.getStatus(
-            TSStatusCode.PIPE_NOT_EXIST_ERROR,
-            String.format(
-                "Failed to drop pipe %s. Failures: %s does not exist.", 
pipeName, pipeName));
+
+    // After the deletion operation is completed, handle the situation where 
the pipe does not
+    // exist
+    if (!isPipeExistedBeforeDrop) {
+      // If the IF EXISTS condition is set in the request, return the current 
status.
+      if (req.isIfExistsCondition()) {
+        return status;

Review Comment:
   The conditions can somehow be compacted, since there are 2 "return status"es.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java:
##########
@@ -149,11 +151,31 @@ public TSStatus createTopic(TCreateTopicReq req) {
     return status;
   }
 
-  public TSStatus dropTopic(String topicName) {
-    final TSStatus status = 
configManager.getProcedureManager().dropTopic(topicName);
+  public TSStatus dropTopic(TDropTopicReq req) {
+    final boolean isTopicExistedBeforeDrop = 
subscriptionInfo.isTopicExisted(req.getTopicName());
+    final TSStatus status = 
configManager.getProcedureManager().dropTopic(req.getTopicName());
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      LOGGER.warn("Failed to drop topic {}. Result status: {}.", topicName, 
status);
+      LOGGER.warn("Failed to drop topic {}. Result status: {}.", 
req.getTopicName(), status);
     }
+
+    // After the deletion operation is completed, handle the situation where 
the topic does not
+    // exist
+    if (!isTopicExistedBeforeDrop) {
+      // If the IF EXISTS condition is set in the request, return the current 
status.
+      if (req.isIfExistsCondition()) {
+        return status;

Review Comment:
   Same as above.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java:
##########
@@ -179,7 +182,10 @@ protected Flow executeFromState(ConfigNodeProcedureEnv 
env, OperateSubscriptionS
     try {
       switch (state) {
         case VALIDATE:
-          executeFromValidate(env);
+          if (!executeFromValidate(env)) {

Review Comment:
   If the procedure shall be skipped, AbstractOperatePipeProcedureV2 returns 
true, however AbstractSubscriptionProcedure returns false here. Better change 
one of them.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java:
##########
@@ -110,6 +111,17 @@ protected PipeTaskOperation getOperation() {
     return PipeTaskOperation.CREATE_PIPE;
   }
 
+  /**
+   * Check the {@link PipePlugin} configuration in the Pipe, if there is an 
error then throw a
+   * {@link PipeException}, if there is the same Pipe name and there is no 
IfNotExists condition in
+   * the {@link #createPipeRequest} then throw a {@link PipeException}, if 
there is an IfNotExists
+   * condition then end normally. {@link CreatePipeProcedureV2} process, if 
there is no Pipe with

Review Comment:
   What does "{@link CreatePipeProcedureV2} process," mean?



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java:
##########
@@ -91,19 +91,28 @@ public void releasePipePluginInfoLock() {
 
   /////////////////////////////// Validator ///////////////////////////////
 
-  public void validateBeforeCreatingPipePlugin(
-      final String pluginName, final String jarName, final String jarMD5) {
+  public boolean validateBeforeCreatingPipePlugin(
+      final String pluginName,
+      final String jarName,
+      final String jarMD5,
+      final boolean ifNotExistsCondition) {
     // both build-in and user defined pipe plugin should be unique
     if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
+      if (ifNotExistsCondition) {
+        return false;
+      }
       throw new PipeException(
           String.format(
               "Failed to create PipePlugin [%s], the same name PipePlugin has 
been created",
               pluginName));
     }
+    return true;
   }
 
-  public void validateBeforeDroppingPipePlugin(final String pluginName) {
+  public boolean validateBeforeDroppingPipePlugin(
+      final String pluginName, boolean ifExistsCondition) {
     if (!pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
+      if (ifExistsCondition) return false;

Review Comment:
   Use {} like other codes....



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java:
##########
@@ -110,6 +111,17 @@ protected PipeTaskOperation getOperation() {
     return PipeTaskOperation.CREATE_PIPE;
   }
 
+  /**
+   * Check the {@link PipePlugin} configuration in the Pipe, if there is an 
error then throw a
+   * {@link PipeException}, if there is the same Pipe name and there is no 
IfNotExists condition in
+   * the {@link #createPipeRequest} then throw a {@link PipeException}, if 
there is an IfNotExists
+   * condition then end normally. {@link CreatePipeProcedureV2} process, if 
there is no Pipe with

Review Comment:
   What does "{@link CreatePipeProcedureV2} process," mean?



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java:
##########
@@ -110,6 +111,17 @@ protected PipeTaskOperation getOperation() {
     return PipeTaskOperation.CREATE_PIPE;
   }
 
+  /**
+   * Check the {@link PipePlugin} configuration in the Pipe, if there is an 
error then throw a
+   * {@link PipeException}, if there is the same Pipe name and there is no 
IfNotExists condition in
+   * the {@link #createPipeRequest} then throw a {@link PipeException}, if 
there is an IfNotExists
+   * condition then end normally. {@link CreatePipeProcedureV2} process, if 
there is no Pipe with

Review Comment:
   What does "{@link CreatePipeProcedureV2} process," mean?



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java:
##########
@@ -128,6 +134,18 @@ private Flow executeFromLock(ConfigNodeProcedureEnv env) {
       return Flow.NO_MORE_STATE;
     }
 
+    // If there is a Plugin with the same name and the IFExists condition 
exists, the process ends
+    // normally.
+    if (!notExists) {
+      LOGGER.info(
+          "Pipe plugin {} is already exist, end the 
DropPipePluginProcedure({})",

Review Comment:
   ....



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java:
##########
@@ -53,6 +53,9 @@ public abstract class AbstractOperateSubscriptionProcedure
   private static final Logger LOGGER =
       LoggerFactory.getLogger(AbstractOperateSubscriptionProcedure.class);
 
+  private static final String SKIP_PROCEDURE_MESSAGE =
+      "Skip the following Procedure execution steps.";

Review Comment:
   Better add some possible reasons here.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java:
##########
@@ -110,6 +111,17 @@ protected PipeTaskOperation getOperation() {
     return PipeTaskOperation.CREATE_PIPE;
   }
 
+  /**
+   * Check the {@link PipePlugin} configuration in the Pipe, if there is an 
error then throw a
+   * {@link PipeException}, if there is the same Pipe name and there is no 
IfNotExists condition in
+   * the {@link #createPipeRequest} then throw a {@link PipeException}, if 
there is an IfNotExists
+   * condition then end normally. {@link CreatePipeProcedureV2} process, if 
there is no Pipe with

Review Comment:
   What does "{@link CreatePipeProcedureV2} process," mean?



##########
integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeCreateAndDropIT.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.autocreate;
+
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2AutoCreateSchema.class})
+public class IoTDBPipeCreateAndDropIT extends AbstractPipeDualAutoIT {
+
+  @Test
+  public void testBasicCreatePipe() throws Exception {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    // Create pipe
+    String sql =
+        String.format(
+            "create pipe a2b with source ('source'='iotdb-source', 
'source.path'='root.test1.**') with processor 
('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
+            receiverDataNode.getIpAndPortString());
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(sql);
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+
+    // show pipe
+    long creationTime;
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      Assert.assertEquals(1, showPipeResult.size());
+      // Check status
+      Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
+      // Check configurations
+      
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+      
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.path=root.test1.**"));
+      Assert.assertTrue(
+          
showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor"));
+      Assert.assertTrue(
+          showPipeResult
+              .get(0)
+              .pipeConnector
+              .contains(String.format("node-urls=%s", 
receiverDataNode.getIpAndPortString())));
+      // Record last creation time
+      creationTime = showPipeResult.get(0).creationTime;
+    }
+
+    // Create pipe If Not Exists
+    sql =

Review Comment:
   Better test the "if not exists" / "if exists" only, because the normal 
processes are already tested... We shall save time for github IT...



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java:
##########
@@ -116,8 +118,12 @@ private Flow executeFromLock(ConfigNodeProcedureEnv env) {
     final AtomicReference<PipeTaskInfo> pipeTaskInfo = 
pipeTaskCoordinator.lock();
     pipePluginCoordinator.lock();
 
+    boolean notExists = true;
     try {
-      
pipePluginCoordinator.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName);
+      notExists =

Review Comment:
   This return values means "exists", not "not exists"....



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java:
##########
@@ -155,19 +155,24 @@ public boolean canSkipNextSync() {
 
   /////////////////////////////// Validator ///////////////////////////////
 
-  public void checkBeforeCreatePipe(final TCreatePipeReq createPipeRequest) 
throws PipeException {
+  public boolean checkBeforeCreatePipe(final TCreatePipeReq createPipeRequest)
+      throws PipeException {
     acquireReadLock();
     try {
-      checkBeforeCreatePipeInternal(createPipeRequest);
+      return checkBeforeCreatePipeInternal(createPipeRequest);
     } finally {
       releaseReadLock();
     }
   }
 
-  private void checkBeforeCreatePipeInternal(final TCreatePipeReq 
createPipeRequest)
+  private boolean checkBeforeCreatePipeInternal(final TCreatePipeReq 
createPipeRequest)
       throws PipeException {
     if (!isPipeExisted(createPipeRequest.getPipeName())) {
-      return;
+      return true;
+    } else {
+      if (createPipeRequest.ifNotExistsCondition) {

Review Comment:
   Can use “else if” directly.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java:
##########
@@ -110,6 +111,17 @@ protected PipeTaskOperation getOperation() {
     return PipeTaskOperation.CREATE_PIPE;
   }
 
+  /**
+   * Check the {@link PipePlugin} configuration in the Pipe, if there is an 
error then throw a
+   * {@link PipeException}, if there is the same Pipe name and there is no 
IfNotExists condition in
+   * the {@link #createPipeRequest} then throw a {@link PipeException}, if 
there is an IfNotExists
+   * condition then end normally. {@link CreatePipeProcedureV2} process, if 
there is no Pipe with

Review Comment:
   What does "{@link CreatePipeProcedureV2} process," mean?



##########
iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java:
##########
@@ -250,6 +250,7 @@ public enum TSStatusCode {
   ALTER_TOPIC_ERROR(2002),
   SHOW_TOPIC_ERROR(2003),
   TOPIC_PUSH_META_ERROR(2004),
+  TOPIC_NOT_EXIST_ERROR(2005),

Review Comment:
   Is it used?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to