DanielWang2035 commented on code in PR #17238:
URL: https://github.com/apache/iotdb/pull/17238#discussion_r3279131397
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java:
##########
@@ -200,41 +522,183 @@ public void bindPrefetchingQueue(final
SubscriptionSinkSubtask subtask) {
prefetchingQueueCount.invalidate();
}
- public void updateCompletedTopicNames(final String consumerGroupId, final
String topicName) {
- final SubscriptionBroker broker =
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
+ public void bindConsensusPrefetchingQueue(
+ final String consumerGroupId,
+ final String topicName,
+ final String orderMode,
+ final ConsensusGroupId consensusGroupId,
+ final IoTConsensusServerImpl serverImpl,
+ final SubscriptionWalRetentionPolicy retentionPolicy,
+ final ConsensusLogToTabletConverter converter,
+ final ConsensusSubscriptionCommitManager commitManager,
+ final RegionProgress fallbackCommittedRegionProgress,
+ final long tailStartSearchIndex,
+ final long initialRuntimeVersion,
+ final boolean initialActive) {
+ consumerGroupIdToConsensusBroker
+ .compute(
+ consumerGroupId,
+ (id, broker) -> {
+ if (Objects.isNull(broker)) {
+ LOGGER.info(
+ "Subscription: consensus broker bound to consumer group
[{}] does not exist, create new for binding consensus prefetching queue",
+ consumerGroupId);
+ return new ConsensusSubscriptionBroker(consumerGroupId);
+ }
+ return broker;
+ })
+ .bindConsensusPrefetchingQueue(
+ topicName,
+ orderMode,
+ consensusGroupId,
+ serverImpl,
+ retentionPolicy,
+ converter,
+ commitManager,
+ fallbackCommittedRegionProgress,
+ tailStartSearchIndex,
+ initialRuntimeVersion,
+ initialActive);
+ prefetchingQueueCount.invalidate();
+ }
+
+ public void refreshConsensusQueueOrderMode(final String topicName, final
String orderMode) {
+ LOGGER.info(
+ "SubscriptionBrokerAgent: refreshing consensus queue order-mode for
topic [{}] to [{}]",
+ topicName,
+ orderMode);
+ for (final ConsensusSubscriptionBroker broker :
consumerGroupIdToConsensusBroker.values()) {
+ broker.refreshConsensusQueueOrderMode(topicName, orderMode);
+ }
+ }
+
+ public void unbindConsensusPrefetchingQueue(
+ final String consumerGroupId, final String topicName) {
+ final ConsensusSubscriptionBroker broker =
+ consumerGroupIdToConsensusBroker.get(consumerGroupId);
if (Objects.isNull(broker)) {
LOGGER.warn(
- "Subscription: broker bound to consumer group [{}] does not exist",
consumerGroupId);
+ "Subscription: consensus broker bound to consumer group [{}] does
not exist",
+ consumerGroupId);
return;
}
- broker.updateCompletedTopicNames(topicName);
+ broker.unbindConsensusPrefetchingQueue(topicName);
+ prefetchingQueueCount.invalidate();
+ }
+
+ public void unbindByRegion(final ConsensusGroupId regionId) {
+ int totalClosed = 0;
+ for (final ConsensusSubscriptionBroker broker :
consumerGroupIdToConsensusBroker.values()) {
+ totalClosed += broker.unbindByRegion(regionId);
+ }
+ if (totalClosed > 0) {
+ prefetchingQueueCount.invalidate();
+ LOGGER.info(
+ "Subscription: unbound {} consensus prefetching queue(s) for removed
region [{}]",
+ totalClosed,
+ regionId);
+ }
+ }
+
+ /**
+ * Activates or deactivates all consensus prefetching queues bound to {@code
regionId} across all
+ * consumer groups. Called on leader migration to ensure only the preferred
writer serves
+ * subscription data.
+ */
+ public void setActiveForRegion(final ConsensusGroupId regionId, final
boolean active) {
+ LOGGER.info(
+ "SubscriptionBrokerAgent: setActiveForRegion regionId={}, active={}",
regionId, active);
+ for (final ConsensusSubscriptionBroker broker :
consumerGroupIdToConsensusBroker.values()) {
+ broker.setActiveForRegion(regionId, active);
+ }
+ }
+
+ public void setActiveWritersForRegion(
+ final ConsensusGroupId regionId, final Set<Integer> activeWriterNodeIds)
{
+ LOGGER.info(
+ "SubscriptionBrokerAgent: setActiveWritersForRegion regionId={},
activeWriterNodeIds={}",
+ regionId,
+ activeWriterNodeIds);
+ for (final ConsensusSubscriptionBroker broker :
consumerGroupIdToConsensusBroker.values()) {
+ broker.setActiveWritersForRegion(regionId, activeWriterNodeIds);
+ }
+ }
+
+ public void applyRuntimeStateForRegion(
+ final ConsensusGroupId regionId, final ConsensusRegionRuntimeState
runtimeState) {
+ LOGGER.info(
+ "SubscriptionBrokerAgent: applyRuntimeStateForRegion regionId={},
runtimeState={}",
+ regionId,
+ runtimeState);
+ for (final ConsensusSubscriptionBroker broker :
consumerGroupIdToConsensusBroker.values()) {
+ broker.applyRuntimeStateForRegion(regionId, runtimeState);
+ }
+ }
+
+ public void abortConsensusPendingSeeksForRuntimeStop() {
+ for (final ConsensusSubscriptionBroker broker :
consumerGroupIdToConsensusBroker.values()) {
+ broker.abortPendingSeeksForRuntimeStop();
+ }
+ }
+
+ public void updateCompletedTopicNames(final String consumerGroupId, final
String topicName) {
+ final SubscriptionBroker pipeBroker =
consumerGroupIdToPipeBroker.get(consumerGroupId);
+ if (Objects.isNull(pipeBroker)) {
+ LOGGER.warn(
+ "Subscription: pipe broker bound to consumer group [{}] does not
exist", consumerGroupId);
+ return;
+ }
+ pipeBroker.updateCompletedTopicNames(topicName);
}
public void unbindPrefetchingQueue(final String consumerGroupId, final
String topicName) {
- final SubscriptionBroker broker =
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
- if (Objects.isNull(broker)) {
+ // Try consensus broker first
+ final ConsensusSubscriptionBroker consensusBroker =
+ consumerGroupIdToConsensusBroker.get(consumerGroupId);
+ if (Objects.nonNull(consensusBroker) &&
consensusBroker.hasQueue(topicName)) {
+ consensusBroker.unbindConsensusPrefetchingQueue(topicName);
+ prefetchingQueueCount.invalidate();
+ return;
+ }
+ // Fall back to pipe broker
+ final SubscriptionBroker pipeBroker =
consumerGroupIdToPipeBroker.get(consumerGroupId);
+ if (Objects.isNull(pipeBroker)) {
LOGGER.warn(
"Subscription: broker bound to consumer group [{}] does not exist",
consumerGroupId);
return;
}
- broker.unbindPrefetchingQueue(topicName);
+ pipeBroker.unbindPrefetchingQueue(topicName);
Review Comment:
Extracted the shared broker operations into `ISubscriptionBroker`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org