VGalaxies commented on code in PR #12647:
URL: https://github.com/apache/iotdb/pull/12647#discussion_r1628743055


##########
integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java:
##########
@@ -543,7 +551,112 @@ public void testTopicInvalidProcessorConfig() throws 
Exception {
     config.put("processor", "tumbling-time-sampling-processor");
     config.put("processor.tumbling-time.interval-seconds", "1");
     config.put("processor.down-sampling.split-file", "true");
-    testTopicInvalidRuntimeConfigTemplate("topic4", config);
+    testTopicInvalidRuntimeConfigTemplate("topic10", config);
+  }
+
+  @Test
+  public void testTopicWithQueryMode() throws Exception {
+    // Insert some historical data
+    try (final ISession session = senderEnv.getSessionConnection()) {
+      for (int i = 0; i < 100; ++i) {
+        session.executeNonQueryStatement(
+            String.format("insert into root.db.d1(time, s1) values (%s, 1)", 
i));
+      }
+      session.executeNonQueryStatement("flush");
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Create topic
+    final String topicName = "topic11";
+    final String host = senderEnv.getIP();
+    final int port = Integer.parseInt(senderEnv.getPort());
+    try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
+      session.open();
+      final Properties config = new Properties();
+      config.put(TopicConstant.FORMAT_KEY, 
TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+      config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_QUERY_VALUE);
+      session.createTopic(topicName, config);
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Subscription
+    final AtomicInteger rowCount = new AtomicInteger();
+    final AtomicBoolean isClosed = new AtomicBoolean(false);
+    final Thread thread =
+        new Thread(
+            () -> {
+              try (final SubscriptionPullConsumer consumer =
+                  new SubscriptionPullConsumer.Builder()
+                      .host(host)
+                      .port(port)
+                      .consumerId("c1")
+                      .consumerGroupId("cg1")
+                      .autoCommit(false)
+                      .buildPullConsumer()) {
+                consumer.open();
+                consumer.subscribe(topicName);
+                while (!isClosed.get()) {
+                  LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); 
// wait some time
+                  final List<SubscriptionMessage> messages =
+                      
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
+                  for (final SubscriptionMessage message : messages) {
+                    try (final TsFileReader tsFileReader =
+                        message.getTsFileHandler().openReader()) {
+                      final Path path = new Path("root.db.d1", "s1", true);
+                      final QueryDataSet dataSet =
+                          tsFileReader.query(
+                              
QueryExpression.create(Collections.singletonList(path), null));
+                      while (dataSet.hasNext()) {
+                        dataSet.next();
+                        rowCount.addAndGet(1);
+                      }
+                    }
+                  }
+                  consumer.commitSync(messages);
+                }
+                consumer.unsubscribe(topicName);

Review Comment:
   > This will work not only in the query mode since the "unsubscribe" is 
explicitly called...
   
   Until the Awaitility assertion passes, the consumer is still inside its poll 
loop and has not yet reached the explicit "unsubscribe" call after the loop, so 
it can be confirmed that the subscription drop is triggered by receiving a 
TerminationPayload.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to