DanielWang2035 commented on code in PR #17238:
URL: https://github.com/apache/iotdb/pull/17238#discussion_r3279131397


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java:
##########
@@ -200,41 +522,183 @@ public void bindPrefetchingQueue(final 
SubscriptionSinkSubtask subtask) {
     prefetchingQueueCount.invalidate();
   }
 
-  public void updateCompletedTopicNames(final String consumerGroupId, final 
String topicName) {
-    final SubscriptionBroker broker = 
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
+  public void bindConsensusPrefetchingQueue(
+      final String consumerGroupId,
+      final String topicName,
+      final String orderMode,
+      final ConsensusGroupId consensusGroupId,
+      final IoTConsensusServerImpl serverImpl,
+      final SubscriptionWalRetentionPolicy retentionPolicy,
+      final ConsensusLogToTabletConverter converter,
+      final ConsensusSubscriptionCommitManager commitManager,
+      final RegionProgress fallbackCommittedRegionProgress,
+      final long tailStartSearchIndex,
+      final long initialRuntimeVersion,
+      final boolean initialActive) {
+    consumerGroupIdToConsensusBroker
+        .compute(
+            consumerGroupId,
+            (id, broker) -> {
+              if (Objects.isNull(broker)) {
+                LOGGER.info(
+                    "Subscription: consensus broker bound to consumer group 
[{}] does not exist, create new for binding consensus prefetching queue",
+                    consumerGroupId);
+                return new ConsensusSubscriptionBroker(consumerGroupId);
+              }
+              return broker;
+            })
+        .bindConsensusPrefetchingQueue(
+            topicName,
+            orderMode,
+            consensusGroupId,
+            serverImpl,
+            retentionPolicy,
+            converter,
+            commitManager,
+            fallbackCommittedRegionProgress,
+            tailStartSearchIndex,
+            initialRuntimeVersion,
+            initialActive);
+    prefetchingQueueCount.invalidate();
+  }
+
+  public void refreshConsensusQueueOrderMode(final String topicName, final 
String orderMode) {
+    LOGGER.info(
+        "SubscriptionBrokerAgent: refreshing consensus queue order-mode for 
topic [{}] to [{}]",
+        topicName,
+        orderMode);
+    for (final ConsensusSubscriptionBroker broker : 
consumerGroupIdToConsensusBroker.values()) {
+      broker.refreshConsensusQueueOrderMode(topicName, orderMode);
+    }
+  }
+
+  public void unbindConsensusPrefetchingQueue(
+      final String consumerGroupId, final String topicName) {
+    final ConsensusSubscriptionBroker broker =
+        consumerGroupIdToConsensusBroker.get(consumerGroupId);
     if (Objects.isNull(broker)) {
       LOGGER.warn(
-          "Subscription: broker bound to consumer group [{}] does not exist", 
consumerGroupId);
+          "Subscription: consensus broker bound to consumer group [{}] does 
not exist",
+          consumerGroupId);
       return;
     }
-    broker.updateCompletedTopicNames(topicName);
+    broker.unbindConsensusPrefetchingQueue(topicName);
+    prefetchingQueueCount.invalidate();
+  }
+
+  public void unbindByRegion(final ConsensusGroupId regionId) {
+    int totalClosed = 0;
+    for (final ConsensusSubscriptionBroker broker : 
consumerGroupIdToConsensusBroker.values()) {
+      totalClosed += broker.unbindByRegion(regionId);
+    }
+    if (totalClosed > 0) {
+      prefetchingQueueCount.invalidate();
+      LOGGER.info(
+          "Subscription: unbound {} consensus prefetching queue(s) for removed 
region [{}]",
+          totalClosed,
+          regionId);
+    }
+  }
+
+  /**
+   * Activates or deactivates all consensus prefetching queues bound to {@code 
regionId} across all
+   * consumer groups. Called on leader migration to ensure only the preferred 
writer serves
+   * subscription data.
+   */
+  public void setActiveForRegion(final ConsensusGroupId regionId, final 
boolean active) {
+    LOGGER.info(
+        "SubscriptionBrokerAgent: setActiveForRegion regionId={}, active={}", 
regionId, active);
+    for (final ConsensusSubscriptionBroker broker : 
consumerGroupIdToConsensusBroker.values()) {
+      broker.setActiveForRegion(regionId, active);
+    }
+  }
+
+  public void setActiveWritersForRegion(
+      final ConsensusGroupId regionId, final Set<Integer> activeWriterNodeIds) 
{
+    LOGGER.info(
+        "SubscriptionBrokerAgent: setActiveWritersForRegion regionId={}, 
activeWriterNodeIds={}",
+        regionId,
+        activeWriterNodeIds);
+    for (final ConsensusSubscriptionBroker broker : 
consumerGroupIdToConsensusBroker.values()) {
+      broker.setActiveWritersForRegion(regionId, activeWriterNodeIds);
+    }
+  }
+
+  public void applyRuntimeStateForRegion(
+      final ConsensusGroupId regionId, final ConsensusRegionRuntimeState 
runtimeState) {
+    LOGGER.info(
+        "SubscriptionBrokerAgent: applyRuntimeStateForRegion regionId={}, 
runtimeState={}",
+        regionId,
+        runtimeState);
+    for (final ConsensusSubscriptionBroker broker : 
consumerGroupIdToConsensusBroker.values()) {
+      broker.applyRuntimeStateForRegion(regionId, runtimeState);
+    }
+  }
+
+  public void abortConsensusPendingSeeksForRuntimeStop() {
+    for (final ConsensusSubscriptionBroker broker : 
consumerGroupIdToConsensusBroker.values()) {
+      broker.abortPendingSeeksForRuntimeStop();
+    }
+  }
+
+  public void updateCompletedTopicNames(final String consumerGroupId, final 
String topicName) {
+    final SubscriptionBroker pipeBroker = 
consumerGroupIdToPipeBroker.get(consumerGroupId);
+    if (Objects.isNull(pipeBroker)) {
+      LOGGER.warn(
+          "Subscription: pipe broker bound to consumer group [{}] does not 
exist", consumerGroupId);
+      return;
+    }
+    pipeBroker.updateCompletedTopicNames(topicName);
   }
 
   public void unbindPrefetchingQueue(final String consumerGroupId, final 
String topicName) {
-    final SubscriptionBroker broker = 
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
-    if (Objects.isNull(broker)) {
+    // Try consensus broker first
+    final ConsensusSubscriptionBroker consensusBroker =
+        consumerGroupIdToConsensusBroker.get(consumerGroupId);
+    if (Objects.nonNull(consensusBroker) && 
consensusBroker.hasQueue(topicName)) {
+      consensusBroker.unbindConsensusPrefetchingQueue(topicName);
+      prefetchingQueueCount.invalidate();
+      return;
+    }
+    // Fall back to pipe broker
+    final SubscriptionBroker pipeBroker = 
consumerGroupIdToPipeBroker.get(consumerGroupId);
+    if (Objects.isNull(pipeBroker)) {
       LOGGER.warn(
           "Subscription: broker bound to consumer group [{}] does not exist", 
consumerGroupId);
       return;
     }
-    broker.unbindPrefetchingQueue(topicName);
+    pipeBroker.unbindPrefetchingQueue(topicName);

Review Comment:
   Extracted the shared broker operations into `ISubscriptionBroker`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to