DanielWang2035 commented on code in PR #17238:
URL: https://github.com/apache/iotdb/pull/17238#discussion_r3246260988


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java:
##########
@@ -109,89 +201,319 @@ public List<SubscriptionCommitContext> commit(
       final List<SubscriptionCommitContext> commitContexts,
       final boolean nack) {
     final String consumerGroupId = consumerConfig.getConsumerGroupId();
-    final SubscriptionBroker broker = 
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
-    if (Objects.isNull(broker)) {
+    final String consumerId = consumerConfig.getConsumerId();
+    final List<SubscriptionCommitContext> allSuccessful = new ArrayList<>();
+
+    final SubscriptionBroker pipeBroker = 
consumerGroupIdToPipeBroker.get(consumerGroupId);
+    final ConsensusSubscriptionBroker consensusBroker =
+        consumerGroupIdToConsensusBroker.get(consumerGroupId);
+
+    if (Objects.isNull(pipeBroker) && Objects.isNull(consensusBroker)) {
+      final String errorMessage =
+          String.format("Subscription: no broker bound to consumer group 
[%s]", consumerGroupId);
+      LOGGER.warn(errorMessage);
+      throw new SubscriptionException(errorMessage);
+    }
+
+    // Partition commit contexts by which broker owns the topic.
+    final List<SubscriptionCommitContext> pipeContexts = new ArrayList<>();
+    final List<SubscriptionCommitContext> consensusContexts = new 
ArrayList<>();
+    for (final SubscriptionCommitContext ctx : commitContexts) {
+      final String topicName = ctx.getTopicName();
+      if (Objects.nonNull(consensusBroker)
+          && 
ConsensusSubscriptionSetupHandler.isConsensusBasedTopic(topicName)) {
+        consensusContexts.add(ctx);
+      } else {
+        pipeContexts.add(ctx);
+      }
+    }
+
+    if (Objects.nonNull(pipeBroker) && !pipeContexts.isEmpty()) {
+      allSuccessful.addAll(pipeBroker.commit(consumerId, pipeContexts, nack));
+    }
+    if (Objects.nonNull(consensusBroker) && !consensusContexts.isEmpty()) {
+      allSuccessful.addAll(consensusBroker.commit(consumerId, 
consensusContexts, nack));
+    }
+
+    return allSuccessful;
+  }
+
+  public void seek(
+      final ConsumerConfig consumerConfig, final String topicName, final short 
seekType) {
+    final String consumerGroupId = consumerConfig.getConsumerGroupId();
+

Review Comment:
   Fixed. For consensus topics, non-owning DataNodes now treat seek/seekAfter 
as no-op success when no local queue exists, while owning DataNodes still apply 
the seek.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to