DanielWang2035 commented on code in PR #17238:
URL: https://github.com/apache/iotdb/pull/17238#discussion_r3279128688


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java:
##########
@@ -109,89 +201,319 @@ public List<SubscriptionCommitContext> commit(
       final List<SubscriptionCommitContext> commitContexts,
       final boolean nack) {
     final String consumerGroupId = consumerConfig.getConsumerGroupId();
-    final SubscriptionBroker broker = 
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
-    if (Objects.isNull(broker)) {
+    final String consumerId = consumerConfig.getConsumerId();
+    final List<SubscriptionCommitContext> allSuccessful = new ArrayList<>();
+
+    final SubscriptionBroker pipeBroker = 
consumerGroupIdToPipeBroker.get(consumerGroupId);
+    final ConsensusSubscriptionBroker consensusBroker =
+        consumerGroupIdToConsensusBroker.get(consumerGroupId);
+
+    if (Objects.isNull(pipeBroker) && Objects.isNull(consensusBroker)) {
+      final String errorMessage =
+          String.format("Subscription: no broker bound to consumer group 
[%s]", consumerGroupId);
+      LOGGER.warn(errorMessage);
+      throw new SubscriptionException(errorMessage);
+    }
+
+    // Partition commit contexts by which broker owns the topic.
+    final List<SubscriptionCommitContext> pipeContexts = new ArrayList<>();
+    final List<SubscriptionCommitContext> consensusContexts = new 
ArrayList<>();
+    for (final SubscriptionCommitContext ctx : commitContexts) {
+      final String topicName = ctx.getTopicName();
+      if (Objects.nonNull(consensusBroker)
+          && 
ConsensusSubscriptionSetupHandler.isConsensusBasedTopic(topicName)) {
+        consensusContexts.add(ctx);
+      } else {
+        pipeContexts.add(ctx);
+      }
+    }
+
+    if (Objects.nonNull(pipeBroker) && !pipeContexts.isEmpty()) {
+      allSuccessful.addAll(pipeBroker.commit(consumerId, pipeContexts, nack));
+    }
+    if (Objects.nonNull(consensusBroker) && !consensusContexts.isEmpty()) {
+      allSuccessful.addAll(consensusBroker.commit(consumerId, 
consensusContexts, nack));
+    }
+
+    return allSuccessful;
+  }
+
+  public void seek(
+      final ConsumerConfig consumerConfig, final String topicName, final short 
seekType) {
+    final String consumerGroupId = consumerConfig.getConsumerGroupId();
+
+    final ConsensusSubscriptionBroker consensusBroker =
+        consumerGroupIdToConsensusBroker.get(consumerGroupId);
+    if (Objects.nonNull(consensusBroker) && 
consensusBroker.hasQueue(topicName)) {
+      ensureConsensusSeekRuntimeAvailable(consumerGroupId, topicName, "seek");
+      if (seekType != PipeSubscribeSeekReq.SEEK_TO_BEGINNING
+          && seekType != PipeSubscribeSeekReq.SEEK_TO_END) {
+        final String errorMessage =
+            String.format(
+                "Subscription: consensus seek only supports beginning/end or 
topic progress, "
+                    + "consumerGroup=%s, topic=%s, seekType=%s",
+                consumerGroupId, topicName, seekType);
+        LOGGER.warn(errorMessage);
+        throw new SubscriptionException(errorMessage);
+      }
+      consensusBroker.seek(topicName, seekType);
+      return;
+    }
+
+    if (isConsensusRuntimeUnsupported(topicName)) {
+      final String errorMessage =
+          buildUnsupportedConsensusRuntimeMessage(consumerGroupId, topicName, 
"seek");
+      LOGGER.warn(errorMessage);
+      throw new SubscriptionException(errorMessage);
+    }
+
+    final String errorMessage =
+        String.format(
+            "Subscription: seek is only supported for consensus-based 
subscriptions, "
+                + "consumerGroup=%s, topic=%s",
+            consumerGroupId, topicName);
+    LOGGER.warn(errorMessage);
+    throw new SubscriptionException(errorMessage);
+  }
+
+  public void seekToTopicProgress(
+      final ConsumerConfig consumerConfig,
+      final String topicName,
+      final TopicProgress topicProgress) {
+    final String consumerGroupId = consumerConfig.getConsumerGroupId();
+
+    final ConsensusSubscriptionBroker consensusBroker =
+        consumerGroupIdToConsensusBroker.get(consumerGroupId);
+    if (Objects.nonNull(consensusBroker) && 
consensusBroker.hasQueue(topicName)) {
+      ensureConsensusSeekRuntimeAvailable(consumerGroupId, topicName, 
"seek(topicProgress)");
+      consensusBroker.seek(topicName, topicProgress);
+      return;
+    }
+
+    if (isConsensusRuntimeUnsupported(topicName)) {
+      final String errorMessage =
+          buildUnsupportedConsensusRuntimeMessage(
+              consumerGroupId, topicName, "seek(topicProgress)");
+      LOGGER.warn(errorMessage);
+      throw new SubscriptionException(errorMessage);
+    }
+
+    final String errorMessage =
+        String.format(
+            "Subscription: seek(topicProgress) is only supported for 
consensus-based subscriptions, "
+                + "consumerGroup=%s, topic=%s",
+            consumerGroupId, topicName);
+    LOGGER.warn(errorMessage);
+    throw new SubscriptionException(errorMessage);
+  }
+
+  public void seekAfterTopicProgress(
+      final ConsumerConfig consumerConfig,
+      final String topicName,
+      final TopicProgress topicProgress) {
+    final String consumerGroupId = consumerConfig.getConsumerGroupId();
+
+    final ConsensusSubscriptionBroker consensusBroker =
+        consumerGroupIdToConsensusBroker.get(consumerGroupId);
+    if (Objects.nonNull(consensusBroker) && 
consensusBroker.hasQueue(topicName)) {
+      ensureConsensusSeekRuntimeAvailable(consumerGroupId, topicName, 
"seekAfter(topicProgress)");
+      consensusBroker.seekAfter(topicName, topicProgress);
+      return;
+    }
+
+    if (isConsensusRuntimeUnsupported(topicName)) {
+      final String errorMessage =
+          buildUnsupportedConsensusRuntimeMessage(
+              consumerGroupId, topicName, "seekAfter(topicProgress)");
+      LOGGER.warn(errorMessage);
+      throw new SubscriptionException(errorMessage);
+    }
+
+    final String errorMessage =
+        String.format(
+            "Subscription: seekAfter(topicProgress) is only supported for 
consensus-based subscriptions, "
+                + "consumerGroup=%s, topic=%s",
+            consumerGroupId, topicName);
+    LOGGER.warn(errorMessage);
+    throw new SubscriptionException(errorMessage);
+  }
+
+  private void ensureConsensusSeekRuntimeAvailable(
+      final String consumerGroupId, final String topicName, final String 
operation) {
+    if (!ConsensusSubscriptionPrefetchExecutorManager.getInstance().isStarted()
+        || SubscriptionAgent.runtime().isShutdown()) {
       final String errorMessage =
           String.format(
-              "Subscription: broker bound to consumer group [%s] does not 
exist", consumerGroupId);
+              "Subscription: consensus %s is unavailable because subscription 
runtime is stopped, "
+                  + "consumerGroup=%s, topic=%s",
+              operation, consumerGroupId, topicName);
       LOGGER.warn(errorMessage);
       throw new SubscriptionException(errorMessage);
     }
-    final String consumerId = consumerConfig.getConsumerId();
-    return broker.commit(consumerId, commitContexts, nack);
+  }
+
+  private boolean isConsensusRuntimeUnsupported(final String topicName) {
+    return !(DataRegionConsensusImpl.getInstance() instanceof IoTConsensus)
+        && ConsensusSubscriptionSetupHandler.isConsensusBasedTopic(topicName);
+  }
+
+  private List<String> getUnsupportedConsensusTopics(final Set<String> 
topicNames) {
+    if (DataRegionConsensusImpl.getInstance() instanceof IoTConsensus) {
+      return Collections.emptyList();
+    }
+
+    final List<String> unsupportedConsensusTopics = new ArrayList<>();
+    for (final String topicName : topicNames) {
+      if (ConsensusSubscriptionSetupHandler.isConsensusBasedTopic(topicName)) {
+        unsupportedConsensusTopics.add(topicName);
+      }
+    }
+    return unsupportedConsensusTopics;
+  }
+
+  private String buildUnsupportedConsensusRuntimeMessage(
+      final String consumerGroupId, final String topicName, final String 
operation) {
+    return buildUnsupportedConsensusRuntimeMessage(
+        consumerGroupId, Collections.singletonList(topicName), operation);
+  }
+
+  private String buildUnsupportedConsensusRuntimeMessage(
+      final String consumerGroupId, final List<String> topicNames, final 
String operation) {
+    final IConsensus dataRegionConsensus = 
DataRegionConsensusImpl.getInstance();
+    final String configuredProtocol =
+        
IoTDBDescriptor.getInstance().getConfig().getDataRegionConsensusProtocolClass();
+    final String runtimeConsensusImplementation =
+        Objects.nonNull(dataRegionConsensus) ? 
dataRegionConsensus.getClass().getName() : "null";
+    return String.format(
+        "Subscription: cannot %s consensus-based topic(s) %s in consumer group 
[%s] because "
+            + "mode=consensus only supports 
data_region_consensus_protocol_class=%s, but current "
+            + "configured value is %s (runtime consensus implementation: %s)",
+        operation,
+        topicNames,
+        consumerGroupId,
+        ConsensusFactory.IOT_CONSENSUS,
+        configuredProtocol,
+        runtimeConsensusImplementation);
   }
 
   public boolean isCommitContextOutdated(final SubscriptionCommitContext 
commitContext) {
     final String consumerGroupId = commitContext.getConsumerGroupId();
-    final SubscriptionBroker broker = 
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
-    if (Objects.isNull(broker)) {
+    final String topicName = commitContext.getTopicName();
+
+    // Try consensus broker first
+    final ConsensusSubscriptionBroker consensusBroker =
+        consumerGroupIdToConsensusBroker.get(consumerGroupId);
+    if (Objects.nonNull(consensusBroker) && 
consensusBroker.hasQueue(topicName)) {
+      return consensusBroker.isCommitContextOutdated(commitContext);
+    }
+
+    // Fall back to pipe broker
+    final SubscriptionBroker pipeBroker = 
consumerGroupIdToPipeBroker.get(consumerGroupId);
+    if (Objects.isNull(pipeBroker)) {
       return true;
     }
-    return broker.isCommitContextOutdated(commitContext);
+    return pipeBroker.isCommitContextOutdated(commitContext);
   }
 
   public List<String> fetchTopicNamesToUnsubscribe(
       final ConsumerConfig consumerConfig, final Set<String> topicNames) {
     final String consumerGroupId = consumerConfig.getConsumerGroupId();
-    final SubscriptionBroker broker = 
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
-    if (Objects.isNull(broker)) {
+
+    // Consensus-based subscription topics are unbounded streams, so they do 
not trigger
+    // auto-unsubscribe.
+    final ConsensusSubscriptionBroker consensusBroker =
+        consumerGroupIdToConsensusBroker.get(consumerGroupId);
+    final Set<String> pipeOnlyTopicNames;
+    if (Objects.nonNull(consensusBroker)) {
+      pipeOnlyTopicNames = new java.util.HashSet<>(topicNames);
+      pipeOnlyTopicNames.removeIf(consensusBroker::hasQueue);
+    } else {
+      pipeOnlyTopicNames = topicNames;
+    }
+
+    if (pipeOnlyTopicNames.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    final SubscriptionBroker pipeBroker = 
consumerGroupIdToPipeBroker.get(consumerGroupId);
+    if (Objects.isNull(pipeBroker)) {
       return Collections.emptyList();
     }
-    return broker.fetchTopicNamesToUnsubscribe(topicNames);
+    return pipeBroker.fetchTopicNamesToUnsubscribe(pipeOnlyTopicNames);
   }
 
   /////////////////////////////// broker ///////////////////////////////
 
   public boolean isBrokerExist(final String consumerGroupId) {
-    return consumerGroupIdToSubscriptionBroker.containsKey(consumerGroupId);
+    return consumerGroupIdToPipeBroker.containsKey(consumerGroupId)
+        || consumerGroupIdToConsensusBroker.containsKey(consumerGroupId);
   }
 
   public void createBrokerIfNotExist(final String consumerGroupId) {
-    consumerGroupIdToSubscriptionBroker.computeIfAbsent(consumerGroupId, 
SubscriptionBroker::new);
-    LOGGER.info("Subscription: create broker bound to consumer group [{}]", 
consumerGroupId);
+    consumerGroupIdToPipeBroker.computeIfAbsent(consumerGroupId, 
SubscriptionBroker::new);
+    LOGGER.info("Subscription: create pipe broker bound to consumer group 
[{}]", consumerGroupId);
   }

Review Comment:
   Renamed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to