Caideyipi commented on code in PR #12228:
URL: https://github.com/apache/iotdb/pull/12228#discussion_r1540602325


##########
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.subscription;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.isession.SessionConfig;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.subscription.SubscriptionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionConnection;
+import org.apache.iotdb.session.subscription.model.Subscription;
+import org.apache.iotdb.session.subscription.model.Topic;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import java.time.ZoneId;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+public class SubscriptionSession extends Session {
+
+  public SubscriptionSession(String host, int port) {
+    this(host, port, SessionConfig.DEFAULT_USER, 
SessionConfig.DEFAULT_PASSWORD);
+  }
+
+  public SubscriptionSession(String host, int port, String username, String 
password) {
+    // TODO: more configs control
+    super(
+        new Session.Builder()
+            .host(host)
+            .port(port)
+            .username(username)
+            .password(password)
+            // disable auto fetch
+            .enableAutoFetch(false));
+  }
+
+  @Override
+  public SessionConnection constructSessionConnection(
+      Session session, TEndPoint endpoint, ZoneId zoneId) throws 
IoTDBConnectionException {
+    if (endpoint == null) {
+      return new SubscriptionSessionConnection(
+          session, zoneId, availableNodes, maxRetryCount, retryIntervalInMs);
+    }
+    return new SubscriptionSessionConnection(
+        session, endpoint, zoneId, availableNodes, maxRetryCount, 
retryIntervalInMs);
+  }
+
+  /////////////////////////////// topic ///////////////////////////////
+
+  public void createTopic(String topicName)
+      throws IoTDBConnectionException, StatementExecutionException {
+    final String sql = String.format("CREATE TOPIC %s", topicName);
+    executeNonQueryStatement(sql);
+  }
+
+  public void createTopic(String topicName, Properties config)
+      throws IoTDBConnectionException, StatementExecutionException {
+    if (config.isEmpty()) {
+      createTopic(topicName);
+    }
+    final StringBuilder sb = new StringBuilder();
+    sb.append('(');
+    config.forEach(
+        (k, v) ->
+            sb.append('\'')
+                .append(k)
+                .append('\'')
+                .append('=')
+                .append('\'')
+                .append(v)
+                .append('\'')
+                .append(','));
+    sb.deleteCharAt(sb.length() - 1);
+    sb.append(')');
+    final String sql = String.format("CREATE TOPIC %s WITH %s", topicName, sb);
+    executeNonQueryStatement(sql);
+  }
+
+  public void drop(String topicName) throws IoTDBConnectionException, 
StatementExecutionException {
+    final String sql = String.format("DROP TOPIC %s", topicName);
+    executeNonQueryStatement(sql);
+  }
+
+  public Set<Topic> getTopics() throws IoTDBConnectionException, 
StatementExecutionException {
+    final String sql = "SHOW TOPICS";
+    try (SessionDataSet dataSet = executeQueryStatement(sql)) {
+      return convertDataSetToTopics(dataSet);
+    }
+  }
+
+  public Optional<Topic> getTopic(String topicName)
+      throws IoTDBConnectionException, StatementExecutionException {
+    final String sql = String.format("SHOW TOPIC %s", topicName);
+    try (SessionDataSet dataSet = executeQueryStatement(sql)) {
+      Set<Topic> topics = convertDataSetToTopics(dataSet);
+      if (topics.isEmpty()) {
+        return Optional.empty();
+      }
+      return Optional.of(topics.iterator().next());
+    }
+  }
+
+  /////////////////////////////// subscription ///////////////////////////////
+
+  public Set<Subscription> getSubscriptions()
+      throws IoTDBConnectionException, StatementExecutionException {
+    final String sql = "SHOW SUBSCRIPTIONS";
+    try (SessionDataSet dataSet = executeQueryStatement(sql)) {
+      return convertDataSetToSubscriptions(dataSet);
+    }
+  }
+
+  public Set<Subscription> getSubscriptions(String topicName)
+      throws IoTDBConnectionException, StatementExecutionException {
+    final String sql = String.format("SHOW SUBSCRIPTIONS ON %s", topicName);
+    try (SessionDataSet dataSet = executeQueryStatement(sql)) {
+      return convertDataSetToSubscriptions(dataSet);
+    }
+  }
+
+  /////////////////////////////// utility ///////////////////////////////
+
+  public Set<Topic> convertDataSetToTopics(SessionDataSet dataSet)
+      throws IoTDBConnectionException, StatementExecutionException {
+    Set<Topic> topics = new HashSet<>();
+    while (dataSet.hasNext()) {
+      RowRecord record = dataSet.next();
+      List<Field> fields = record.getFields();
+      if (fields.size() != 2) {
+        throw new SubscriptionException("something unexpected happened when 
get topics...");

Review Comment:
   The exception message could include the unexpected fields (or the whole 
record) so that the failure is easier to diagnose.



##########
integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionConsumerGroupIT.java:
##########
@@ -19,324 +19,317 @@
 
 package org.apache.iotdb.subscription.it;
 
+import org.apache.iotdb.db.it.utils.TestUtils;
 import org.apache.iotdb.isession.ISession;
-import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
-import org.apache.iotdb.itbase.category.LocalStandaloneIT;
-import org.apache.iotdb.rpc.subscription.payload.config.ConsumerConfig;
-import org.apache.iotdb.rpc.subscription.payload.config.ConsumerConstant;
-import org.apache.iotdb.rpc.subscription.payload.response.EnrichedTablets;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.itbase.category.MultiClusterIT2;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.session.subscription.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.SubscriptionPullConsumer;
+import org.apache.iotdb.session.subscription.SubscriptionSessionDataSet;
+import org.apache.iotdb.session.subscription.SubscriptionSessionDataSets;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
 
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
+import org.awaitility.Awaitility;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.sql.Connection;
+import java.sql.Statement;
+import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 
 import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
-@Category({LocalStandaloneIT.class})
-public class IoTDBSubscriptionConsumerGroupIT {
+@Category({MultiClusterIT2.class})
+public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(IoTDBSubscriptionConsumerGroupIT.class);
 
-  private static final int BASE = 233;
+  @Test
+  public void test3C1CGSubscribeOneTopic() throws Exception {
+    long currentTime = createTopics();
+    List<SubscriptionPullConsumer> consumers = new ArrayList<>();
+    consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
+    consumers.add(createConsumerAndSubscribeTopics("c2", "cg1", "topic1"));
+    consumers.add(createConsumerAndSubscribeTopics("c3", "cg1", "topic1"));
+    testMultiConsumersSubscribeMultiTopicsTemplate(
+        currentTime,
+        consumers,
+        () -> {
+          assertSingleDataEventuallyOnEnv(
+              receiverEnv,
+              "select count(*) from root.**",
+              new HashMap<String, String>() {
+                {
+                  put("count(root.topic1.s.cg1)", "100");
+                }
+              });
+          return null;
+        });
+  }
+
+  @Test
+  public void test3C3CGSubscribeOneTopic() throws Exception {
+    long currentTime = createTopics();
+    List<SubscriptionPullConsumer> consumers = new ArrayList<>();
+    consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
+    consumers.add(createConsumerAndSubscribeTopics("c2", "cg2", "topic1"));
+    consumers.add(createConsumerAndSubscribeTopics("c3", "cg3", "topic1"));
+    testMultiConsumersSubscribeMultiTopicsTemplate(
+        currentTime,
+        consumers,
+        () -> {
+          assertSingleDataEventuallyOnEnv(
+              receiverEnv,
+              "select count(*) from root.**",
+              new HashMap<String, String>() {
+                {
+                  put("count(root.topic1.s.cg1)", "100");
+                  put("count(root.topic1.s.cg2)", "100");
+                  put("count(root.topic1.s.cg3)", "100");
+                }
+              });
+          return null;
+        });
+  }
 
-  @Before
-  public void setUp() throws Exception {
-    EnvFactory.getEnv().initClusterEnvironment();
+  @Test
+  public void test3C1CGSubscribeTwoTopic() throws Exception {
+    long currentTime = createTopics();
+    List<SubscriptionPullConsumer> consumers = new ArrayList<>();
+    consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
+    consumers.add(createConsumerAndSubscribeTopics("c2", "cg1", "topic1", 
"topic2"));
+    consumers.add(createConsumerAndSubscribeTopics("c3", "cg1", "topic2"));
+    testMultiConsumersSubscribeMultiTopicsTemplate(
+        currentTime,
+        consumers,
+        () -> {
+          assertSingleDataEventuallyOnEnv(
+              receiverEnv,
+              "select count(*) from root.**",
+              new HashMap<String, String>() {
+                {
+                  put("count(root.topic1.s.cg1)", "100");
+                  put("count(root.topic2.s.cg1)", "100");
+                }
+              });
+          return null;
+        });
   }
 
-  @After
-  public void tearDown() throws Exception {
-    EnvFactory.getEnv().cleanClusterEnvironment();
+  @Test
+  public void test3C3CGSubscribeTwoTopic() throws Exception {
+    long currentTime = createTopics();
+    List<SubscriptionPullConsumer> consumers = new ArrayList<>();
+    consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
+    consumers.add(createConsumerAndSubscribeTopics("c2", "cg2", "topic1", 
"topic2"));
+    consumers.add(createConsumerAndSubscribeTopics("c3", "cg3", "topic2"));
+    testMultiConsumersSubscribeMultiTopicsTemplate(
+        currentTime,
+        consumers,
+        () -> {
+          assertSingleDataEventuallyOnEnv(
+              receiverEnv,
+              "select count(*) from root.**",
+              new HashMap<String, String>() {
+                {
+                  put("count(root.topic1.s.cg1)", "100");
+                  put("count(root.topic1.s.cg2)", "100");
+                  put("count(root.topic2.s.cg2)", "100");
+                  put("count(root.topic2.s.cg3)", "100");
+                }
+              });
+          return null;
+        });
   }
 
-  private void testMultiConsumersSubscribeMultiTopicsTemplate(
-      List<Pair<ConsumerConfig, Set<String>>> consumerConfigs, int factor) 
throws Exception {
-    ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>> 
consumerGroupIdToTimestamps =
-        new ConcurrentHashMap<>();
+  @Test
+  public void test4C2CGSubscribeTwoTopic() throws Exception {
+    long currentTime = createTopics();
+    List<SubscriptionPullConsumer> consumers = new ArrayList<>();
+    consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
+    consumers.add(createConsumerAndSubscribeTopics("c2", "cg2", "topic1", 
"topic2"));
+    consumers.add(createConsumerAndSubscribeTopics("c3", "cg1", "topic1"));
+    consumers.add(createConsumerAndSubscribeTopics("c4", "cg2", "topic2"));
+    testMultiConsumersSubscribeMultiTopicsTemplate(
+        currentTime,
+        consumers,
+        () -> {
+          assertSingleDataEventuallyOnEnv(
+              receiverEnv,
+              "select count(*) from root.**",
+              new HashMap<String, String>() {
+                {
+                  put("count(root.topic1.s.cg1)", "100");
+                  put("count(root.topic1.s.cg2)", "100");
+                  put("count(root.topic2.s.cg2)", "100");
+                }
+              });
+          return null;
+        });
+  }
 
-    try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
-      session.executeNonQueryStatement("create topic topic1 with 
('start-time'='now')");
-      session.executeNonQueryStatement("create topic topic2 with 
('end-time'='now')");
+  private long createTopics() {
+    // create topics on sender
+    long currentTime = System.currentTimeMillis();
+    try (ISession session = senderEnv.getSessionConnection()) {
+      session.executeNonQueryStatement(
+          String.format("create topic topic1 with ('end-time'='%s')", 
currentTime - 1));
+      session.executeNonQueryStatement(
+          String.format("create topic topic2 with ('start-time'='%s')", 
currentTime));
     } catch (Exception e) {
+      e.printStackTrace();
       fail(e.getMessage());
     }
+    return currentTime;
+  }
 
-    long currentTime = System.currentTimeMillis();
-    try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
-      for (int i = 0; i < BASE; ++i) {
+  private SubscriptionPullConsumer createConsumerAndSubscribeTopics(
+      String consumerId, String consumerGroupId, String... topicNames) throws 
Exception {
+    SubscriptionPullConsumer consumer =
+        new SubscriptionPullConsumer.Builder()
+            .host(senderEnv.getIP())
+            .port(Integer.parseInt(senderEnv.getPort()))
+            .consumerId(consumerId)
+            .consumerGroupId(consumerGroupId)
+            .autoCommit(false)
+            .buildPullConsumer();
+    consumer.open();
+    consumer.subscribe(topicNames);
+    return consumer;
+  }
+
+  private void testMultiConsumersSubscribeMultiTopicsTemplate(
+      long currentTime, List<SubscriptionPullConsumer> consumers, 
Supplier<Void> checker)
+      throws Exception {
+    // insert some history data on sender
+    try (ISession session = senderEnv.getSessionConnection()) {
+      for (int i = 0; i < 100; ++i) {
         session.executeNonQueryStatement(
-            String.format("insert into root.db.d1(time, s) values (%s, 1)", 
i));
-      }
-      for (int i = 0; i < BASE; ++i) {
+            String.format("insert into root.topic1(time, s) values (%s, 1)", 
i)); // topic1
         session.executeNonQueryStatement(
-            String.format("insert into root.db.d2(time, s) values (%s, 1)", 
currentTime + i));
+            String.format(
+                "insert into root.topic2(time, s) values (%s, 1)", currentTime 
+ i)); // topic2
       }
       session.executeNonQueryStatement("flush");
-      session.executeNonQueryStatement("flush");
     } catch (Exception e) {
+      e.printStackTrace();
       fail(e.getMessage());
     }
 
+    AtomicBoolean isClosed = new AtomicBoolean(false);

Review Comment:
   This part of the code could be extracted into a shared helper, since it is 
commonly used across the subscription ITs.



##########
integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionConsumerGroupIT.java:
##########
@@ -19,324 +19,317 @@
 
 package org.apache.iotdb.subscription.it;
 
+import org.apache.iotdb.db.it.utils.TestUtils;
 import org.apache.iotdb.isession.ISession;
-import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
-import org.apache.iotdb.itbase.category.LocalStandaloneIT;
-import org.apache.iotdb.rpc.subscription.payload.config.ConsumerConfig;
-import org.apache.iotdb.rpc.subscription.payload.config.ConsumerConstant;
-import org.apache.iotdb.rpc.subscription.payload.response.EnrichedTablets;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.itbase.category.MultiClusterIT2;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.session.subscription.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.SubscriptionPullConsumer;
+import org.apache.iotdb.session.subscription.SubscriptionSessionDataSet;
+import org.apache.iotdb.session.subscription.SubscriptionSessionDataSets;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
 
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
+import org.awaitility.Awaitility;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.sql.Connection;
+import java.sql.Statement;
+import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 
 import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
-@Category({LocalStandaloneIT.class})
-public class IoTDBSubscriptionConsumerGroupIT {
+@Category({MultiClusterIT2.class})
+public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(IoTDBSubscriptionConsumerGroupIT.class);
 
-  private static final int BASE = 233;
+  @Test
+  public void test3C1CGSubscribeOneTopic() throws Exception {
+    long currentTime = createTopics();
+    List<SubscriptionPullConsumer> consumers = new ArrayList<>();
+    consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
+    consumers.add(createConsumerAndSubscribeTopics("c2", "cg1", "topic1"));
+    consumers.add(createConsumerAndSubscribeTopics("c3", "cg1", "topic1"));
+    testMultiConsumersSubscribeMultiTopicsTemplate(
+        currentTime,
+        consumers,
+        () -> {
+          assertSingleDataEventuallyOnEnv(
+              receiverEnv,
+              "select count(*) from root.**",
+              new HashMap<String, String>() {
+                {
+                  put("count(root.topic1.s.cg1)", "100");
+                }
+              });
+          return null;
+        });
+  }
+
+  @Test
+  public void test3C3CGSubscribeOneTopic() throws Exception {
+    long currentTime = createTopics();
+    List<SubscriptionPullConsumer> consumers = new ArrayList<>();
+    consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
+    consumers.add(createConsumerAndSubscribeTopics("c2", "cg2", "topic1"));
+    consumers.add(createConsumerAndSubscribeTopics("c3", "cg3", "topic1"));
+    testMultiConsumersSubscribeMultiTopicsTemplate(
+        currentTime,
+        consumers,
+        () -> {
+          assertSingleDataEventuallyOnEnv(
+              receiverEnv,
+              "select count(*) from root.**",
+              new HashMap<String, String>() {
+                {
+                  put("count(root.topic1.s.cg1)", "100");
+                  put("count(root.topic1.s.cg2)", "100");
+                  put("count(root.topic1.s.cg3)", "100");
+                }
+              });
+          return null;
+        });
+  }
 
-  @Before
-  public void setUp() throws Exception {
-    EnvFactory.getEnv().initClusterEnvironment();
+  @Test
+  public void test3C1CGSubscribeTwoTopic() throws Exception {
+    long currentTime = createTopics();
+    List<SubscriptionPullConsumer> consumers = new ArrayList<>();
+    consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
+    consumers.add(createConsumerAndSubscribeTopics("c2", "cg1", "topic1", 
"topic2"));
+    consumers.add(createConsumerAndSubscribeTopics("c3", "cg1", "topic2"));
+    testMultiConsumersSubscribeMultiTopicsTemplate(
+        currentTime,
+        consumers,
+        () -> {
+          assertSingleDataEventuallyOnEnv(
+              receiverEnv,
+              "select count(*) from root.**",
+              new HashMap<String, String>() {
+                {
+                  put("count(root.topic1.s.cg1)", "100");
+                  put("count(root.topic2.s.cg1)", "100");
+                }
+              });
+          return null;
+        });
   }
 
-  @After
-  public void tearDown() throws Exception {
-    EnvFactory.getEnv().cleanClusterEnvironment();
+  @Test
+  public void test3C3CGSubscribeTwoTopic() throws Exception {
+    long currentTime = createTopics();
+    List<SubscriptionPullConsumer> consumers = new ArrayList<>();
+    consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
+    consumers.add(createConsumerAndSubscribeTopics("c2", "cg2", "topic1", 
"topic2"));
+    consumers.add(createConsumerAndSubscribeTopics("c3", "cg3", "topic2"));
+    testMultiConsumersSubscribeMultiTopicsTemplate(
+        currentTime,
+        consumers,
+        () -> {
+          assertSingleDataEventuallyOnEnv(
+              receiverEnv,
+              "select count(*) from root.**",
+              new HashMap<String, String>() {
+                {
+                  put("count(root.topic1.s.cg1)", "100");
+                  put("count(root.topic1.s.cg2)", "100");
+                  put("count(root.topic2.s.cg2)", "100");
+                  put("count(root.topic2.s.cg3)", "100");
+                }
+              });
+          return null;
+        });
   }
 
-  private void testMultiConsumersSubscribeMultiTopicsTemplate(
-      List<Pair<ConsumerConfig, Set<String>>> consumerConfigs, int factor) 
throws Exception {
-    ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>> 
consumerGroupIdToTimestamps =
-        new ConcurrentHashMap<>();
+  @Test
+  public void test4C2CGSubscribeTwoTopic() throws Exception {
+    long currentTime = createTopics();
+    List<SubscriptionPullConsumer> consumers = new ArrayList<>();
+    consumers.add(createConsumerAndSubscribeTopics("c1", "cg1", "topic1"));
+    consumers.add(createConsumerAndSubscribeTopics("c2", "cg2", "topic1", 
"topic2"));
+    consumers.add(createConsumerAndSubscribeTopics("c3", "cg1", "topic1"));
+    consumers.add(createConsumerAndSubscribeTopics("c4", "cg2", "topic2"));
+    testMultiConsumersSubscribeMultiTopicsTemplate(
+        currentTime,
+        consumers,
+        () -> {
+          assertSingleDataEventuallyOnEnv(
+              receiverEnv,
+              "select count(*) from root.**",
+              new HashMap<String, String>() {
+                {
+                  put("count(root.topic1.s.cg1)", "100");
+                  put("count(root.topic1.s.cg2)", "100");
+                  put("count(root.topic2.s.cg2)", "100");
+                }
+              });
+          return null;
+        });
+  }
 
-    try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
-      session.executeNonQueryStatement("create topic topic1 with 
('start-time'='now')");
-      session.executeNonQueryStatement("create topic topic2 with 
('end-time'='now')");
+  private long createTopics() {
+    // create topics on sender
+    long currentTime = System.currentTimeMillis();
+    try (ISession session = senderEnv.getSessionConnection()) {
+      session.executeNonQueryStatement(
+          String.format("create topic topic1 with ('end-time'='%s')", 
currentTime - 1));
+      session.executeNonQueryStatement(
+          String.format("create topic topic2 with ('start-time'='%s')", 
currentTime));
     } catch (Exception e) {
+      e.printStackTrace();
       fail(e.getMessage());
     }
+    return currentTime;
+  }
 
-    long currentTime = System.currentTimeMillis();
-    try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
-      for (int i = 0; i < BASE; ++i) {
+  private SubscriptionPullConsumer createConsumerAndSubscribeTopics(
+      String consumerId, String consumerGroupId, String... topicNames) throws 
Exception {
+    SubscriptionPullConsumer consumer =
+        new SubscriptionPullConsumer.Builder()
+            .host(senderEnv.getIP())
+            .port(Integer.parseInt(senderEnv.getPort()))
+            .consumerId(consumerId)
+            .consumerGroupId(consumerGroupId)
+            .autoCommit(false)
+            .buildPullConsumer();
+    consumer.open();
+    consumer.subscribe(topicNames);
+    return consumer;
+  }
+
+  private void testMultiConsumersSubscribeMultiTopicsTemplate(
+      long currentTime, List<SubscriptionPullConsumer> consumers, 
Supplier<Void> checker)
+      throws Exception {
+    // insert some history data on sender
+    try (ISession session = senderEnv.getSessionConnection()) {
+      for (int i = 0; i < 100; ++i) {
         session.executeNonQueryStatement(
-            String.format("insert into root.db.d1(time, s) values (%s, 1)", 
i));
-      }
-      for (int i = 0; i < BASE; ++i) {
+            String.format("insert into root.topic1(time, s) values (%s, 1)", 
i)); // topic1
         session.executeNonQueryStatement(
-            String.format("insert into root.db.d2(time, s) values (%s, 1)", 
currentTime + i));
+            String.format(
+                "insert into root.topic2(time, s) values (%s, 1)", currentTime 
+ i)); // topic2
       }
       session.executeNonQueryStatement("flush");
-      session.executeNonQueryStatement("flush");
     } catch (Exception e) {
+      e.printStackTrace();
       fail(e.getMessage());
     }
 
+    AtomicBoolean isClosed = new AtomicBoolean(false);
     List<Thread> threads = new ArrayList<>();
-    for (Pair<ConsumerConfig, Set<String>> consumerConfig : consumerConfigs) {
+    for (int i = 0; i < consumers.size(); ++i) {
+      final int index = i;
       Thread t =
           new Thread(
               () -> {
-                try (ISession session = 
EnvFactory.getEnv().getSessionConnection()) {
-                  session.createConsumer(consumerConfig.left);
-                  session.subscribe(consumerConfig.right);
-
-                  List<EnrichedTablets> enrichedTabletsList;
-                  while (true) {
-                    Thread.sleep(1000); // wait some time
-                    enrichedTabletsList = session.poll(consumerConfig.right);
-                    if (enrichedTabletsList.isEmpty()) {
+                try (SubscriptionPullConsumer consumer = consumers.get(index);
+                    ISession session = receiverEnv.getSessionConnection()) {
+                  while (!isClosed.get()) {
+                    try {
+                      Thread.sleep(1000); // wait some time
+                    } catch (InterruptedException e) {
                       break;
                     }
-                    Map<String, List<String>> topicNameToSubscriptionCommitIds 
= new HashMap<>();
-                    for (EnrichedTablets enrichedTablets : 
enrichedTabletsList) {
-                      for (Tablet tablet : enrichedTablets.getTablets()) {
-                        for (Long time : tablet.timestamps) {
-                          consumerGroupIdToTimestamps
-                              .computeIfAbsent(
-                                  consumerConfig.left.getConsumerGroupId(),
-                                  (consumerGroupId) -> new 
ConcurrentHashMap<>())
-                              .put(time, time);
+                    List<SubscriptionMessage> messages = 
consumer.poll(Duration.ofMillis(10000));
+                    if (messages.isEmpty()) {
+                      continue;
+                    }
+                    for (SubscriptionMessage message : messages) {
+                      SubscriptionSessionDataSets payload =
+                          (SubscriptionSessionDataSets) message.getPayload();
+                      for (SubscriptionSessionDataSet dataSet : payload) {
+                        List<String> columnNameList = dataSet.getColumnNames();
+                        while (dataSet.hasNext()) {
+                          RowRecord record = dataSet.next();
+                          insertRowRecordEnrichByConsumerGroupId(
+                              session, columnNameList, record, 
consumer.getConsumerGroupId());
                         }
                       }
-                      topicNameToSubscriptionCommitIds
-                          .computeIfAbsent(
-                              enrichedTablets.getTopicName(), (topicName) -> 
new ArrayList<>())
-                          .add(enrichedTablets.getSubscriptionCommitId());
                     }
-                    session.commit(topicNameToSubscriptionCommitIds);
+                    consumer.commitSync(messages);
                   }
-                  session.unsubscribe(consumerConfig.right);
-                  session.dropConsumer();
+                  // no need to unsubscribe
+                  LOGGER.info(
+                      "consumer {} (group {}) exiting...",
+                      consumer.getConsumerId(),
+                      consumer.getConsumerGroupId());
                 } catch (Exception e) {
-                  fail(e.getMessage());
+                  e.printStackTrace();
+                  // avoid fail
                 }
               });
       t.start();
       threads.add(t);
     }
 
+    // check data on receiver
+    checker.get();
+
+    isClosed.set(true);
     for (Thread thread : threads) {
       thread.join();
     }
-
-    Assert.assertEquals(
-        BASE * factor,
-        
consumerGroupIdToTimestamps.values().stream().mapToInt(Map::size).reduce(0, 
Integer::sum));
   }
 
-  @Test
-  public void test3C1CGSubscribeOneTopic() throws Exception {
-    List<Pair<ConsumerConfig, Set<String>>> consumerConfigs = new 
ArrayList<>();
-    consumerConfigs.add(
-        new Pair<>(
-            new ConsumerConfig(
-                new HashMap<String, String>() {
-                  {
-                    put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1");
-                    put(ConsumerConstant.CONSUMER_ID_KEY, "c1");
-                  }
-                }),
-            new HashSet<>(Collections.singletonList("topic1"))));
-    consumerConfigs.add(
-        new Pair<>(
-            new ConsumerConfig(
-                new HashMap<String, String>() {
-                  {
-                    put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1");
-                    put(ConsumerConstant.CONSUMER_ID_KEY, "c2");
-                  }
-                }),
-            new HashSet<>(Collections.singletonList("topic1"))));
-    consumerConfigs.add(
-        new Pair<>(
-            new ConsumerConfig(
-                new HashMap<String, String>() {
-                  {
-                    put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1");
-                    put(ConsumerConstant.CONSUMER_ID_KEY, "c3");
-                  }
-                }),
-            new HashSet<>(Collections.singletonList("topic1"))));
-    testMultiConsumersSubscribeMultiTopicsTemplate(consumerConfigs, 1);
-  }
-
-  @Test
-  public void test3C3CGSubscribeOneTopic() throws Exception {
-    List<Pair<ConsumerConfig, Set<String>>> consumerConfigs = new 
ArrayList<>();
-    consumerConfigs.add(
-        new Pair<>(
-            new ConsumerConfig(
-                new HashMap<String, String>() {
-                  {
-                    put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1");
-                    put(ConsumerConstant.CONSUMER_ID_KEY, "c1");
-                  }
-                }),
-            new HashSet<>(Collections.singletonList("topic1"))));
-    consumerConfigs.add(
-        new Pair<>(
-            new ConsumerConfig(
-                new HashMap<String, String>() {
-                  {
-                    put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg2");
-                    put(ConsumerConstant.CONSUMER_ID_KEY, "c2");
-                  }
-                }),
-            new HashSet<>(Collections.singletonList("topic1"))));
-    consumerConfigs.add(
-        new Pair<>(
-            new ConsumerConfig(
-                new HashMap<String, String>() {
-                  {
-                    put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg3");
-                    put(ConsumerConstant.CONSUMER_ID_KEY, "c3");
-                  }
-                }),
-            new HashSet<>(Collections.singletonList("topic1"))));
-    testMultiConsumersSubscribeMultiTopicsTemplate(consumerConfigs, 3);
-  }
-
-  @Test
-  public void test3C1CGSubscribeTwoTopic() throws Exception {
-    List<Pair<ConsumerConfig, Set<String>>> consumerConfigs = new 
ArrayList<>();
-    consumerConfigs.add(
-        new Pair<>(
-            new ConsumerConfig(
-                new HashMap<String, String>() {
-                  {
-                    put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1");
-                    put(ConsumerConstant.CONSUMER_ID_KEY, "c1");
-                  }
-                }),
-            new HashSet<>(Collections.singletonList("topic1"))));
-    consumerConfigs.add(
-        new Pair<>(
-            new ConsumerConfig(
-                new HashMap<String, String>() {
-                  {
-                    put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1");
-                    put(ConsumerConstant.CONSUMER_ID_KEY, "c2");
-                  }
-                }),
-            new HashSet<>(Arrays.asList("topic1", "topic2"))));
-    consumerConfigs.add(
-        new Pair<>(
-            new ConsumerConfig(
-                new HashMap<String, String>() {
-                  {
-                    put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1");
-                    put(ConsumerConstant.CONSUMER_ID_KEY, "c3");
-                  }
-                }),
-            new HashSet<>(Collections.singletonList("topic2"))));
-    testMultiConsumersSubscribeMultiTopicsTemplate(consumerConfigs, 2);
-  }
-
-  @Test
-  public void test3C3CGSubscribeTwoTopic() throws Exception {
-    List<Pair<ConsumerConfig, Set<String>>> consumerConfigs = new 
ArrayList<>();
-    consumerConfigs.add(
-        new Pair<>(
-            new ConsumerConfig(
-                new HashMap<String, String>() {
-                  {
-                    put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg1");
-                    put(ConsumerConstant.CONSUMER_ID_KEY, "c1");
-                  }
-                }),
-            new HashSet<>(Collections.singletonList("topic1"))));
-    consumerConfigs.add(
-        new Pair<>(
-            new ConsumerConfig(
-                new HashMap<String, String>() {
-                  {
-                    put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg2");
-                    put(ConsumerConstant.CONSUMER_ID_KEY, "c2");
-                  }
-                }),
-            new HashSet<>(Arrays.asList("topic1", "topic2"))));
-    consumerConfigs.add(
-        new Pair<>(
-            new ConsumerConfig(
-                new HashMap<String, String>() {
-                  {
-                    put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "cg3");
-                    put(ConsumerConstant.CONSUMER_ID_KEY, "c3");
-                  }
-                }),
-            new HashSet<>(Collections.singletonList("topic2"))));
-    testMultiConsumersSubscribeMultiTopicsTemplate(consumerConfigs, 4);
+  private void insertRowRecordEnrichByConsumerGroupId(

Review Comment:
   Consider "Enriched" rather than "Enrich" in this method name?



##########
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionConsumer.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.subscription;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.isession.SessionConfig;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
+
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public abstract class SubscriptionConsumer implements AutoCloseable {
+
+  private final TEndPoint defaultEndPoint;
+  private final String username;
+  private final String password;
+
+  private final String consumerId;
+  private final String consumerGroupId;
+
+  private Map<Integer, SubscriptionProvider>
+      subscriptionProviders; // contains default subscription provider, used 
for poll and commit
+  private SubscriptionProvider defaultSubscriptionProvider; // used for 
subscribe and unsubscribe
+
+  private static final long HEARTBEAT_INTERVAL = 5000; // unit: ms
+  private ScheduledExecutorService heartbeatWorkerExecutor;
+
+  private boolean isClosed = true;

Review Comment:
   Will this be concurrently altered?



##########
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.subscription;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.isession.SessionConfig;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.subscription.SubscriptionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionConnection;
+import org.apache.iotdb.session.subscription.model.Subscription;
+import org.apache.iotdb.session.subscription.model.Topic;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+import java.time.ZoneId;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+public class SubscriptionSession extends Session {
+
+  public SubscriptionSession(String host, int port) {
+    this(host, port, SessionConfig.DEFAULT_USER, 
SessionConfig.DEFAULT_PASSWORD);
+  }
+
+  public SubscriptionSession(String host, int port, String username, String 
password) {
+    // TODO: more configs control
+    super(
+        new Session.Builder()
+            .host(host)
+            .port(port)
+            .username(username)
+            .password(password)
+            // disable auto fetch
+            .enableAutoFetch(false));
+  }
+
+  @Override
+  public SessionConnection constructSessionConnection(
+      Session session, TEndPoint endpoint, ZoneId zoneId) throws 
IoTDBConnectionException {
+    if (endpoint == null) {
+      return new SubscriptionSessionConnection(
+          session, zoneId, availableNodes, maxRetryCount, retryIntervalInMs);
+    }
+    return new SubscriptionSessionConnection(
+        session, endpoint, zoneId, availableNodes, maxRetryCount, 
retryIntervalInMs);
+  }
+
+  /////////////////////////////// topic ///////////////////////////////
+
+  public void createTopic(String topicName)
+      throws IoTDBConnectionException, StatementExecutionException {
+    final String sql = String.format("CREATE TOPIC %s", topicName);
+    executeNonQueryStatement(sql);
+  }
+
+  public void createTopic(String topicName, Properties config)
+      throws IoTDBConnectionException, StatementExecutionException {
+    if (config.isEmpty()) {
+      createTopic(topicName);
+    }
+    final StringBuilder sb = new StringBuilder();
+    sb.append('(');
+    config.forEach(
+        (k, v) ->
+            sb.append('\'')
+                .append(k)
+                .append('\'')
+                .append('=')
+                .append('\'')
+                .append(v)
+                .append('\'')
+                .append(','));
+    sb.deleteCharAt(sb.length() - 1);
+    sb.append(')');
+    final String sql = String.format("CREATE TOPIC %s WITH %s", topicName, sb);
+    executeNonQueryStatement(sql);
+  }
+
+  public void drop(String topicName) throws IoTDBConnectionException, 
StatementExecutionException {
+    final String sql = String.format("DROP TOPIC %s", topicName);
+    executeNonQueryStatement(sql);
+  }
+
+  public Set<Topic> getTopics() throws IoTDBConnectionException, 
StatementExecutionException {
+    final String sql = "SHOW TOPICS";
+    try (SessionDataSet dataSet = executeQueryStatement(sql)) {
+      return convertDataSetToTopics(dataSet);
+    }
+  }
+
+  public Optional<Topic> getTopic(String topicName)
+      throws IoTDBConnectionException, StatementExecutionException {
+    final String sql = String.format("SHOW TOPIC %s", topicName);
+    try (SessionDataSet dataSet = executeQueryStatement(sql)) {
+      Set<Topic> topics = convertDataSetToTopics(dataSet);
+      if (topics.isEmpty()) {
+        return Optional.empty();
+      }
+      return Optional.of(topics.iterator().next());
+    }
+  }
+
+  /////////////////////////////// subscription ///////////////////////////////
+
+  public Set<Subscription> getSubscriptions()
+      throws IoTDBConnectionException, StatementExecutionException {
+    final String sql = "SHOW SUBSCRIPTIONS";
+    try (SessionDataSet dataSet = executeQueryStatement(sql)) {
+      return convertDataSetToSubscriptions(dataSet);
+    }
+  }
+
+  public Set<Subscription> getSubscriptions(String topicName)
+      throws IoTDBConnectionException, StatementExecutionException {
+    final String sql = String.format("SHOW SUBSCRIPTIONS ON %s", topicName);
+    try (SessionDataSet dataSet = executeQueryStatement(sql)) {
+      return convertDataSetToSubscriptions(dataSet);
+    }
+  }
+
+  /////////////////////////////// utility ///////////////////////////////
+
+  public Set<Topic> convertDataSetToTopics(SessionDataSet dataSet)
+      throws IoTDBConnectionException, StatementExecutionException {
+    Set<Topic> topics = new HashSet<>();
+    while (dataSet.hasNext()) {
+      RowRecord record = dataSet.next();
+      List<Field> fields = record.getFields();
+      if (fields.size() != 2) {
+        throw new SubscriptionException("something unexpected happened when 
get topics...");

Review Comment:
   Maybe "when getting topics" reads better than "when get topics".



##########
example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java:
##########
@@ -49,51 +50,89 @@ public static void main(String[] args) throws Exception {
             .build();
     session.open(false);
 
-    int count = 0;
-    session.executeNonQueryStatement("create topic topic1 with 
('start-time'='now')");
-
     // insert some history data

Review Comment:
   "Historical" may be better than "history" here.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SerializedEnrichedEvent.java:
##########
@@ -111,4 +111,21 @@ public boolean pollable() {
     return System.currentTimeMillis() - lastPolledTimestamp
         > 
SubscriptionConfig.getInstance().getSubscriptionRecycleUncommittedEventIntervalSeconds();
   }
+
+  @Override
+  public boolean equals(Object obj) {

Review Comment:
   What about hashCode()? Is "enrichedTablets" alone enough?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to