Caideyipi commented on code in PR #12647:
URL: https://github.com/apache/iotdb/pull/12647#discussion_r1628640019
##########
integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java:
##########
@@ -543,7 +551,112 @@ public void testTopicInvalidProcessorConfig() throws
Exception {
config.put("processor", "tumbling-time-sampling-processor");
config.put("processor.tumbling-time.interval-seconds", "1");
config.put("processor.down-sampling.split-file", "true");
- testTopicInvalidRuntimeConfigTemplate("topic4", config);
+ testTopicInvalidRuntimeConfigTemplate("topic10", config);
+ }
+
+ @Test
+ public void testTopicWithQueryMode() throws Exception {
+ // Insert some historical data
+ try (final ISession session = senderEnv.getSessionConnection()) {
+ for (int i = 0; i < 100; ++i) {
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d1(time, s1) values (%s, 1)",
i));
+ }
+ session.executeNonQueryStatement("flush");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Create topic
+ final String topicName = "topic11";
+ final String host = senderEnv.getIP();
+ final int port = Integer.parseInt(senderEnv.getPort());
+ try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
+ session.open();
+ final Properties config = new Properties();
+ config.put(TopicConstant.FORMAT_KEY,
TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+ config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_QUERY_VALUE);
+ session.createTopic(topicName, config);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Subscription
+ final AtomicInteger rowCount = new AtomicInteger();
+ final AtomicBoolean isClosed = new AtomicBoolean(false);
+ final Thread thread =
+ new Thread(
+ () -> {
+ try (final SubscriptionPullConsumer consumer =
+ new SubscriptionPullConsumer.Builder()
+ .host(host)
+ .port(port)
+ .consumerId("c1")
+ .consumerGroupId("cg1")
+ .autoCommit(false)
+ .buildPullConsumer()) {
+ consumer.open();
+ consumer.subscribe(topicName);
+ while (!isClosed.get()) {
+ LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS);
// wait some time
+ final List<SubscriptionMessage> messages =
+
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
+ for (final SubscriptionMessage message : messages) {
+ try (final TsFileReader tsFileReader =
+ message.getTsFileHandler().openReader()) {
+ final Path path = new Path("root.db.d1", "s1", true);
+ final QueryDataSet dataSet =
+ tsFileReader.query(
+
QueryExpression.create(Collections.singletonList(path), null));
+ while (dataSet.hasNext()) {
+ dataSet.next();
+ rowCount.addAndGet(1);
+ }
+ }
+ }
+ consumer.commitSync(messages);
+ }
+ consumer.unsubscribe(topicName);
Review Comment:
This test will pass regardless of the topic mode, not only in query mode,
since "unsubscribe" is explicitly called...
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java:
##########
@@ -74,6 +74,31 @@ public long getCreationTime() {
return creationTime;
}
+ public static /* @NonNull */ Set<String> getTopicsUnsubByGroup(
+ final ConsumerGroupMeta currentMeta, final ConsumerGroupMeta
updatedMeta) {
+ if (!Objects.equals(currentMeta.consumerGroupId,
updatedMeta.consumerGroupId)) {
+ return Collections.emptySet();
+ }
+ if (!Objects.equals(currentMeta.creationTime, updatedMeta.creationTime)) {
+ return Collections.emptySet();
+ }
+ if (!Objects.equals(
+ currentMeta.consumerIdToConsumerMeta,
updatedMeta.consumerIdToConsumerMeta)) {
Review Comment:
Consider the case where the consumer is closed directly and the
consumerIdToConsumerMeta is altered by the DropConsumerProcedure... In that
case the queue cannot be removed.
##########
iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java:
##########
@@ -336,18 +336,23 @@ protected List<SubscriptionMessage> poll(final
Set<String> topicNames, final lon
case ERROR:
final ErrorPayload payload = (ErrorPayload)
pollResponse.getPayload();
final String errorMessage = payload.getErrorMessage();
- final boolean critical = payload.isCritical();
- LOGGER.warn(
- "Error occurred when SubscriptionConsumer {} polling topics
{}: {}, critical: {}",
- this,
- topicNames,
- errorMessage,
- critical);
- if (critical) {
+ if (payload.isCritical()) {
throw new SubscriptionRuntimeCriticalException(errorMessage);
} else {
throw new
SubscriptionRuntimeNonCriticalException(errorMessage);
}
+ case TERMINATION:
+ final SubscriptionCommitContext commitContext =
pollResponse.getCommitContext();
+ final String topicNameToUnsubscribe =
commitContext.getTopicName();
+ LOGGER.info(
+ "Termination occurred when SubscriptionConsumer {} polling
topics {}, unsubscribe topic {} on DN {} automatically",
+ this,
+ topicNames,
+ topicNameToUnsubscribe,
+ commitContext.getDataNodeId());
+ unsubscribe(topicNameToUnsubscribe);
Review Comment:
Note that since the pipe is automatically dropped, the "DropPipeProcedure"s
in "DropSubscriptionProcedure" will fail validation... We need to add some
checks, or else the procedure will fail.
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java:
##########
@@ -74,6 +74,31 @@ public long getCreationTime() {
return creationTime;
}
+ public static /* @NonNull */ Set<String> getTopicsUnsubByGroup(
+ final ConsumerGroupMeta currentMeta, final ConsumerGroupMeta
updatedMeta) {
+ if (!Objects.equals(currentMeta.consumerGroupId,
updatedMeta.consumerGroupId)) {
+ return Collections.emptySet();
+ }
+ if (!Objects.equals(currentMeta.creationTime, updatedMeta.creationTime)) {
+ return Collections.emptySet();
+ }
+ if (!Objects.equals(
+ currentMeta.consumerIdToConsumerMeta,
updatedMeta.consumerIdToConsumerMeta)) {
Review Comment:
TODO: Remove "topic" key in ConsumerGroupMeta#removeConsumer.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java:
##########
@@ -162,8 +162,30 @@ public void unbindPrefetchingQueue(final String topicName)
{
LOGGER.warn("Subscription: prefetching queue bound to topic [{}] does
not exist", topicName);
return;
}
+
// clean up uncommitted events
prefetchingQueue.cleanup();
+ prefetchingQueue.markCompleted();
+
+ // we remove this prefetching queue and deregister metrics when consumers
dropping related
+ // subscription...
+ }
+
+ public void removePrefetchingQueue(final String topicName) {
+ final SubscriptionPrefetchingQueue prefetchingQueue =
+ topicNameToPrefetchingQueue.get(topicName);
+ if (Objects.isNull(prefetchingQueue)) {
+ LOGGER.warn("Subscription: prefetching queue bound to topic [{}] does
not exist", topicName);
+ return;
+ }
+
+ if (!prefetchingQueue.isCompleted()) {
Review Comment:
It seems that "completed" is marked when the pipe is dropped. However, in
the normal scenario (namely, when the unsubscription is manually triggered by
the client) the "drop pipe" is executed after updating the consumer group meta.
That is, "isCompleted" may be false when a topic's last subscription is removed.
##########
integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java:
##########
@@ -543,7 +551,112 @@ public void testTopicInvalidProcessorConfig() throws
Exception {
config.put("processor", "tumbling-time-sampling-processor");
config.put("processor.tumbling-time.interval-seconds", "1");
config.put("processor.down-sampling.split-file", "true");
- testTopicInvalidRuntimeConfigTemplate("topic4", config);
+ testTopicInvalidRuntimeConfigTemplate("topic10", config);
+ }
+
+ @Test
+ public void testTopicWithQueryMode() throws Exception {
+ // Insert some historical data
+ try (final ISession session = senderEnv.getSessionConnection()) {
+ for (int i = 0; i < 100; ++i) {
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d1(time, s1) values (%s, 1)",
i));
+ }
+ session.executeNonQueryStatement("flush");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Create topic
+ final String topicName = "topic11";
+ final String host = senderEnv.getIP();
+ final int port = Integer.parseInt(senderEnv.getPort());
+ try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
+ session.open();
+ final Properties config = new Properties();
+ config.put(TopicConstant.FORMAT_KEY,
TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+ config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_QUERY_VALUE);
+ session.createTopic(topicName, config);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Subscription
+ final AtomicInteger rowCount = new AtomicInteger();
+ final AtomicBoolean isClosed = new AtomicBoolean(false);
+ final Thread thread =
+ new Thread(
+ () -> {
+ try (final SubscriptionPullConsumer consumer =
+ new SubscriptionPullConsumer.Builder()
+ .host(host)
+ .port(port)
+ .consumerId("c1")
+ .consumerGroupId("cg1")
+ .autoCommit(false)
+ .buildPullConsumer()) {
+ consumer.open();
+ consumer.subscribe(topicName);
+ while (!isClosed.get()) {
+ LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS);
// wait some time
+ final List<SubscriptionMessage> messages =
+
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
+ for (final SubscriptionMessage message : messages) {
+ try (final TsFileReader tsFileReader =
+ message.getTsFileHandler().openReader()) {
+ final Path path = new Path("root.db.d1", "s1", true);
+ final QueryDataSet dataSet =
+ tsFileReader.query(
+
QueryExpression.create(Collections.singletonList(path), null));
+ while (dataSet.hasNext()) {
+ dataSet.next();
+ rowCount.addAndGet(1);
+ }
+ }
+ }
+ consumer.commitSync(messages);
+ }
+ consumer.unsubscribe(topicName);
Review Comment:
It would also be better to test the "TerminationPayload" as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]