VGalaxies commented on code in PR #12724:
URL: https://github.com/apache/iotdb/pull/12724#discussion_r1639173737


##########
example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java:
##########
@@ -226,10 +233,109 @@ private static void dataSubscription2() throws Exception 
{
     }
   }
 
+  /** multi push consumer subscribe topic with tsfile format and query mode */
+  private static void dataSubscription3() throws Exception {
+    try (final SubscriptionSession subscriptionSession = new 
SubscriptionSession(HOST, PORT)) {
+      subscriptionSession.open();
+      final Properties config = new Properties();
+      config.put(TopicConstant.FORMAT_KEY, 
TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+      config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_QUERY_VALUE);
+      subscriptionSession.createTopic(TOPIC_3, config);
+    }
+
+    final List<Thread> threads = new ArrayList<>();
+    for (int i = 0; i < PARALLELISM; ++i) {
+      final int idx = i;
+      final Thread thread =
+          new Thread(
+              () -> {
+                // Subscription: builder-style ctor
+                try (final SubscriptionPushConsumer consumer3 =
+                    new SubscriptionPushConsumer.Builder()
+                        .consumerId("c" + idx)
+                        .consumerGroupId("cg3")
+                        .ackStrategy(AckStrategy.AFTER_CONSUME)
+                        .consumeListener(
+                            message -> {
+                              // do something for SubscriptionTsFileHandler
+                              System.out.println(
+                                  
message.getTsFileHandler().getFile().getAbsolutePath());
+                              return ConsumeResult.SUCCESS;
+                            })
+                        .buildPushConsumer()) {
+                  consumer3.open();
+                  consumer3.subscribe(TOPIC_3);
+                  while (!consumer3.hasNoTopicsSubscribed()) {
+                    LockSupport.parkNanos(SLEEP_NS); // wait some time
+                  }
+                }
+              });
+      thread.start();
+      threads.add(thread);
+    }
+
+    for (final Thread thread : threads) {
+      thread.join();
+    }
+  }
+
+  /** multi pull consumer subscribe topic with tsfile format and query mode */
+  private static void dataSubscription4() throws Exception {
+    try (final SubscriptionSession subscriptionSession = new 
SubscriptionSession(HOST, PORT)) {
+      subscriptionSession.open();
+      final Properties config = new Properties();
+      config.put(TopicConstant.FORMAT_KEY, 
TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+      config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_QUERY_VALUE);
+      subscriptionSession.createTopic(TOPIC_4, config);
+    }
+
+    final List<Thread> threads = new ArrayList<>();
+    for (int i = 0; i < PARALLELISM; ++i) {
+      final int idx = i;
+      final Thread thread =
+          new Thread(
+              () -> {
+                // Subscription: builder-style ctor
+                try (final SubscriptionPullConsumer consumer4 =
+                    new SubscriptionPullConsumer.Builder()
+                        .consumerId("c" + idx)
+                        .consumerGroupId("cg4")
+                        .autoCommit(true)
+                        .fileSaveFsync(true)
+                        .buildPullConsumer()) {
+                  consumer4.open();
+                  consumer4.subscribe(TOPIC_4);
+                  while (true) {
+                    LockSupport.parkNanos(SLEEP_NS); // wait some time
+                    if (consumer4.hasNoTopicsSubscribed()) {

Review Comment:
   has been renamed, it can't negate here...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to