DanielWang2035 commented on code in PR #17238: URL: https://github.com/apache/iotdb/pull/17238#discussion_r3279206769
########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/ConsensusSubscriptionBroker.java: ########## @@ -0,0 +1,811 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.subscription.broker; + +import org.apache.iotdb.commons.consensus.ConsensusGroupId; +import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; +import org.apache.iotdb.consensus.iot.IoTConsensusServerImpl; +import org.apache.iotdb.consensus.iot.SubscriptionWalRetentionPolicy; +import org.apache.iotdb.db.subscription.broker.consensus.ConsensusLogToTabletConverter; +import org.apache.iotdb.db.subscription.broker.consensus.ConsensusPrefetchingQueue; +import org.apache.iotdb.db.subscription.broker.consensus.ConsensusRegionRuntimeState; +import org.apache.iotdb.db.subscription.broker.consensus.ConsensusSubscriptionCommitManager; +import org.apache.iotdb.db.subscription.event.SubscriptionEvent; +import org.apache.iotdb.rpc.subscription.payload.poll.RegionProgress; +import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext; +import org.apache.iotdb.rpc.subscription.payload.poll.TopicProgress; +import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeSeekReq; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +/** + * Consensus-based subscription broker that reads data directly from IoTConsensus WAL. Each instance + * manages consensus prefetching queues for a single consumer group. + */ +public class ConsensusSubscriptionBroker implements ISubscriptionBroker { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConsensusSubscriptionBroker.class); + + private final String brokerId; // consumer group id + + /** Maps topic name to a list of ConsensusPrefetchingQueues, one per data region. */ + private final Map<String, List<ConsensusPrefetchingQueue>> topicNameToConsensusPrefetchingQueues; + + /** Round-robin counter for fair polling among region queues already assigned to this consumer. */ + private final AtomicInteger pollRoundRobinIndex = new AtomicInteger(0); + + private final Map<String, ConcurrentHashMap<String, Long>> topicConsumerLastPollMs = + new ConcurrentHashMap<>(); + + private final Map<String, TopicOwnershipSnapshot> topicOwnershipSnapshots = + new ConcurrentHashMap<>(); + + public ConsensusSubscriptionBroker(final String brokerId) { + this.brokerId = brokerId; + this.topicNameToConsensusPrefetchingQueues = new ConcurrentHashMap<>(); + } + + @Override + public boolean isEmpty() { + return topicNameToConsensusPrefetchingQueues.isEmpty(); + } + + @Override + public boolean hasQueue(final String topicName) { + final List<ConsensusPrefetchingQueue> queues = + topicNameToConsensusPrefetchingQueues.get(topicName); + return Objects.nonNull(queues) + && !queues.isEmpty() + && queues.stream().anyMatch(q -> !q.isClosed()); + } + + //////////////////////////// poll //////////////////////////// + + @Override + public List<SubscriptionEvent> poll( + final String consumerId, final Set<String> topicNames, final long maxBytes) { + return poll(consumerId, topicNames, maxBytes, Collections.emptyMap()); + } + + public List<SubscriptionEvent> poll( + final String consumerId, + final Set<String> topicNames, + final long maxBytes, + final Map<String, TopicProgress> progressByTopic) { + LOGGER.debug( + "ConsensusSubscriptionBroker [{}]: poll called, consumerId={}, topicNames={}, " + + "queueCount={}, maxBytes={}", + brokerId, + consumerId, + topicNames, + topicNameToConsensusPrefetchingQueues.size(), + maxBytes); + + final List<SubscriptionEvent> eventsToPoll = new ArrayList<>(); + final List<SubscriptionEvent> eventsToNack = new ArrayList<>(); + long totalSize = 0; + + for (final String topicName : topicNames) { + final List<ConsensusPrefetchingQueue> queues = + topicNameToConsensusPrefetchingQueues.get(topicName); + if (Objects.isNull(queues) || queues.isEmpty()) { + continue; + } + + final TopicOwnershipSnapshot ownershipSnapshot = + refreshAndGetTopicOwnership(topicName, queues, consumerId); + final List<ConsensusPrefetchingQueue> assignedQueues = + getAssignedQueues(queues, consumerId, ownershipSnapshot); + if (assignedQueues.isEmpty()) { + continue; + } + + final List<ConsensusPrefetchingQueue> pollQueues = + buildPollOrderForAssignedQueues(assignedQueues, topicName); + final int eventsBeforeTopicPoll = eventsToPoll.size(); + + for (final ConsensusPrefetchingQueue consensusQueue : pollQueues) { + if (consensusQueue.isClosed()) { + continue; + } + + final String regionIdStr = consensusQueue.getConsensusGroupId().toString(); + final TopicProgress topicProgress = progressByTopic.get(topicName); + final RegionProgress regionProgress = + Objects.nonNull(topicProgress) + ? topicProgress.getRegionProgress().get(regionIdStr) + : null; + + final SubscriptionEvent event = consensusQueue.poll(consumerId, regionProgress); + if (Objects.isNull(event)) { + continue; + } + + final long currentSize; + try { + currentSize = event.getCurrentResponseSize(); + } catch (final IOException e) { + eventsToNack.add(event); + continue; + } + + eventsToPoll.add(event); + totalSize += currentSize; + + if (totalSize >= maxBytes) { + break; + } + } + if (totalSize >= maxBytes) { + break; + } + } + + // Nack any events that had errors + if (!eventsToNack.isEmpty()) { + commit( + consumerId, + eventsToNack.stream() + .map(SubscriptionEvent::getCommitContext) + .collect(Collectors.toList()), + true); + } + + LOGGER.debug( + "ConsensusSubscriptionBroker [{}]: poll result, consumerId={}, eventsPolled={}, eventsNacked={}", + brokerId, + consumerId, + eventsToPoll.size(), + eventsToNack.size()); + + return eventsToPoll; + } + + @Override + public List<SubscriptionEvent> pollTablets( + final String consumerId, final SubscriptionCommitContext commitContext, final int offset) { + final String topicName = commitContext.getTopicName(); + final List<ConsensusPrefetchingQueue> queues = + topicNameToConsensusPrefetchingQueues.get(topicName); + if (Objects.isNull(queues) || queues.isEmpty()) { + return Collections.emptyList(); + } + + final ConsensusPrefetchingQueue assignedQueue = + getAssignedQueueForConsumer( + queues, topicName, consumerId, commitContext.getRegionId(), "pollTablets"); + if (Objects.isNull(assignedQueue)) { + return Collections.emptyList(); + } + + final SubscriptionEvent event = assignedQueue.pollTablets(consumerId, commitContext, offset); + if (Objects.nonNull(event)) { + return Collections.singletonList(event); + } + return Collections.emptyList(); + } + + //////////////////////////// commit //////////////////////////// + + @Override + public List<SubscriptionCommitContext> commit( + final String consumerId, + final List<SubscriptionCommitContext> commitContexts, + final boolean nack) { + final List<SubscriptionCommitContext> successfulCommitContexts = new ArrayList<>(); + for (final SubscriptionCommitContext commitContext : commitContexts) { + final String topicName = commitContext.getTopicName(); + final List<ConsensusPrefetchingQueue> queues = + topicNameToConsensusPrefetchingQueues.get(topicName); + if (Objects.isNull(queues) || queues.isEmpty()) { + LOGGER.warn( + "ConsensusSubscriptionBroker [{}]: no queues for topic [{}] to commit", + brokerId, + topicName); + continue; + } + + final ConsensusPrefetchingQueue assignedQueue = + getAssignedQueueForConsumer( + queues, topicName, consumerId, commitContext.getRegionId(), nack ? "nack" : "ack"); + boolean handled = false; + if (Objects.nonNull(assignedQueue)) { + final boolean success; + if (!nack) { + success = assignedQueue.ackSilent(consumerId, commitContext); + } else { + success = assignedQueue.nackSilent(consumerId, commitContext); + } + if (success) { + successfulCommitContexts.add(commitContext); + handled = true; + } + } + if (!handled) { + LOGGER.warn( + "ConsensusSubscriptionBroker [{}]: commit context {} not found in any of {} region queue(s) for topic [{}]", + brokerId, + commitContext, + queues.size(), + topicName); + } + } + return successfulCommitContexts; + } + + @Override + public boolean isCommitContextOutdated(final SubscriptionCommitContext commitContext) { + final String topicName = commitContext.getTopicName(); + final List<ConsensusPrefetchingQueue> queues = + topicNameToConsensusPrefetchingQueues.get(topicName); + if (Objects.isNull(queues) || queues.isEmpty()) { + return true; + } + // Route directly to the correct region queue using regionId + final String regionId = commitContext.getRegionId(); + for (final ConsensusPrefetchingQueue q : queues) { + if (!regionId.isEmpty() && !regionId.equals(q.getConsensusGroupId().toString())) { + continue; + } + return q.isCommitContextOutdated(commitContext); + } + return true; + } + + //////////////////////////// seek //////////////////////////// + + public void seek(final String topicName, final short seekType) { + final List<ConsensusPrefetchingQueue> queues = + topicNameToConsensusPrefetchingQueues.get(topicName); + if (Objects.isNull(queues) || queues.isEmpty()) { + LOGGER.warn( + "ConsensusSubscriptionBroker [{}]: no queues for topic [{}] to seek", + brokerId, + topicName); + return; + } + + for (final ConsensusPrefetchingQueue queue : queues) { + if (queue.isClosed()) { + continue; + } + switch (seekType) { + case PipeSubscribeSeekReq.SEEK_TO_BEGINNING: + queue.seekToBeginning(); + break; + case PipeSubscribeSeekReq.SEEK_TO_END: + queue.seekToEnd(); + break; + default: + LOGGER.warn( + "ConsensusSubscriptionBroker [{}]: unsupported seekType {} for topic [{}]", + brokerId, + seekType, + topicName); + break; + } + } + } + + public void seek(final String topicName, final TopicProgress topicProgress) { + final TopicProgress safeProgress = + topicProgress != null ? topicProgress : new TopicProgress(Collections.emptyMap()); + final List<ConsensusPrefetchingQueue> queues = + topicNameToConsensusPrefetchingQueues.get(topicName); + if (Objects.isNull(queues) || queues.isEmpty()) { + LOGGER.warn( + "ConsensusSubscriptionBroker [{}]: no queues for topic [{}] to seek(topicProgress)", + brokerId, + topicName); + return; + } + for (final ConsensusPrefetchingQueue queue : queues) { + if (queue.isClosed()) { + continue; + } + final RegionProgress regionProgress = + safeProgress.getRegionProgress().get(queue.getConsensusGroupId().toString()); + seekQueueToRegionProgress(queue, regionProgress, false); + } + } + + public void seekAfter(final String topicName, final TopicProgress topicProgress) { + final TopicProgress safeProgress = + topicProgress != null ? topicProgress : new TopicProgress(Collections.emptyMap()); + final List<ConsensusPrefetchingQueue> queues = + topicNameToConsensusPrefetchingQueues.get(topicName); + if (Objects.isNull(queues) || queues.isEmpty()) { + LOGGER.warn( + "ConsensusSubscriptionBroker [{}]: no queues for topic [{}] to seekAfter(topicProgress)", + brokerId, + topicName); + return; + } + for (final ConsensusPrefetchingQueue queue : queues) { + if (queue.isClosed()) { + continue; + } + final RegionProgress regionProgress = + safeProgress.getRegionProgress().get(queue.getConsensusGroupId().toString()); + seekQueueToRegionProgress(queue, regionProgress, true); + } + } + + private void seekQueueToRegionProgress( + final ConsensusPrefetchingQueue queue, + final RegionProgress regionProgress, + final boolean seekAfter) { + if (Objects.isNull(regionProgress) || regionProgress.getWriterPositions().isEmpty()) { + return; + } + if (seekAfter) { + queue.seekAfterRegionProgress(regionProgress); + } else { + queue.seekToRegionProgress(regionProgress); + } + } + + //////////////////////////// prefetching //////////////////////////// + + @Override + public boolean executePrefetch(final String topicName) { + // Consensus prefetch is fully driven by queue-local wakeup sources and the dedicated delayed + // scheduler. This interface remains only to satisfy the shared broker contract used by + // pipe-based subscription. + return false; + } + + @Override + public int getEventCount(final String topicName) { + final List<ConsensusPrefetchingQueue> queues = + topicNameToConsensusPrefetchingQueues.get(topicName); + if (Objects.isNull(queues)) { + return 0; + } + return queues.stream() + .filter(queue -> !queue.isClosed()) + .mapToInt(ConsensusPrefetchingQueue::getPrefetchedEventCount) + .sum(); + } + + @Override + public int getQueueCount() { + return topicNameToConsensusPrefetchingQueues.size(); + } + + /** + * Returns per-region lag information for all topics managed by this broker. The result maps + * "topicName/regionId" to the lag (number of WAL entries behind). + */ + public Map<String, Long> getLagSummary() { + final Map<String, Long> lagMap = new ConcurrentHashMap<>(); + for (final Map.Entry<String, List<ConsensusPrefetchingQueue>> entry : + topicNameToConsensusPrefetchingQueues.entrySet()) { + for (final ConsensusPrefetchingQueue queue : entry.getValue()) { + if (!queue.isClosed()) { + lagMap.put(entry.getKey() + "/" + queue.getConsensusGroupId().toString(), queue.getLag()); + } + } + } + return lagMap; + } + + private TopicOwnershipSnapshot refreshAndGetTopicOwnership( + final String topicName, + final List<ConsensusPrefetchingQueue> queues, + final String consumerId) { + final ConcurrentHashMap<String, Long> consumerTimestamps = + topicConsumerLastPollMs.computeIfAbsent(topicName, ignored -> new ConcurrentHashMap<>()); + consumerTimestamps.put(consumerId, System.currentTimeMillis()); + evictInactiveConsumers(consumerTimestamps); + final List<String> sortedConsumers = new ArrayList<>(consumerTimestamps.keySet()); + Collections.sort(sortedConsumers); + + final List<String> activeRegionIds = + queues.stream() + .filter(q -> !q.isClosed()) + .map(q -> q.getConsensusGroupId().toString()) + .sorted() + .collect(Collectors.toList()); + + final TopicOwnershipSnapshot existingSnapshot = topicOwnershipSnapshots.get(topicName); + if (Objects.nonNull(existingSnapshot) + && existingSnapshot.hasSameConsumers(sortedConsumers) + && existingSnapshot.hasSameRegions(activeRegionIds)) { + return existingSnapshot; + } + + final TopicOwnershipSnapshot refreshedSnapshot = + TopicOwnershipSnapshot.create(sortedConsumers, activeRegionIds); + topicOwnershipSnapshots.put(topicName, refreshedSnapshot); + LOGGER.debug( + "ConsensusSubscriptionBroker [{}]: refreshed ownership for topic [{}], consumers={}, regions={}, generation={}", + brokerId, + topicName, + sortedConsumers, + activeRegionIds, + refreshedSnapshot.getGeneration()); + return refreshedSnapshot; + } + + private List<ConsensusPrefetchingQueue> getAssignedQueues( + final List<ConsensusPrefetchingQueue> queues, + final String consumerId, + final TopicOwnershipSnapshot ownershipSnapshot) { + if (Objects.isNull(ownershipSnapshot) || ownershipSnapshot.isEmpty()) { + return Collections.emptyList(); + } + final List<ConsensusPrefetchingQueue> assignedQueues = new ArrayList<>(); + for (final ConsensusPrefetchingQueue queue : queues) { + if (queue.isClosed()) { + continue; + } + if (consumerId.equals( + ownershipSnapshot.getOwnerConsumerId(queue.getConsensusGroupId().toString()))) { + assignedQueues.add(queue); + } + } + return assignedQueues; + } + + private List<ConsensusPrefetchingQueue> buildPollOrderForAssignedQueues( + final List<ConsensusPrefetchingQueue> assignedQueues, final String topicName) { + if (assignedQueues.size() <= 1) { + return assignedQueues; + } + final List<ConsensusPrefetchingQueue> pollQueues = new ArrayList<>(assignedQueues); + if (SubscriptionConfig.getInstance().isSubscriptionConsensusLagBasedPriority()) { + pollQueues.sort( + Comparator.comparingLong(ConsensusPrefetchingQueue::getLag) + .reversed() + .thenComparing(q -> q.getConsensusGroupId().toString())); + return pollQueues; + } + + final int startOffset = Math.floorMod(pollRoundRobinIndex.getAndIncrement(), pollQueues.size()); + final List<ConsensusPrefetchingQueue> orderedQueues = new ArrayList<>(pollQueues.size()); + for (int i = 0; i < pollQueues.size(); i++) { + orderedQueues.add(pollQueues.get((startOffset + i) % pollQueues.size())); + } + LOGGER.debug( + "ConsensusSubscriptionBroker [{}]: stable ownership poll order for topic [{}], assignedQueueCount={}", + brokerId, + topicName, + orderedQueues.size()); + return orderedQueues; + } + + private ConsensusPrefetchingQueue getAssignedQueueForConsumer( + final List<ConsensusPrefetchingQueue> queues, + final String topicName, + final String consumerId, + final String regionId, + final String action) { + final TopicOwnershipSnapshot ownershipSnapshot = + refreshAndGetTopicOwnership(topicName, queues, consumerId); + for (final ConsensusPrefetchingQueue queue : queues) { + if (queue.isClosed()) { + continue; + } + if (!regionId.isEmpty() && !regionId.equals(queue.getConsensusGroupId().toString())) { + continue; + } + if (consumerId.equals( + ownershipSnapshot.getOwnerConsumerId(queue.getConsensusGroupId().toString()))) { + return queue; + } + LOGGER.debug( + "ConsensusSubscriptionBroker [{}]: consumer [{}] skipped {} on topic [{}], region [{}] is currently owned by [{}]", + brokerId, + consumerId, + action, + topicName, + queue.getConsensusGroupId(), + ownershipSnapshot.getOwnerConsumerId(queue.getConsensusGroupId().toString())); + return null; + } + return null; + } + + /** Evicts consumers that have not polled within the configured eviction timeout. */ + private void evictInactiveConsumers(final ConcurrentHashMap<String, Long> consumerTimestamps) { + final long now = System.currentTimeMillis(); + final long timeout = + SubscriptionConfig.getInstance().getSubscriptionConsensusConsumerEvictionTimeoutMs(); + consumerTimestamps.entrySet().removeIf(entry -> (now - entry.getValue()) > timeout); + } + + //////////////////////////// queue management //////////////////////////// + + public void bindConsensusPrefetchingQueue( + final String topicName, + final String orderMode, + final ConsensusGroupId consensusGroupId, + final IoTConsensusServerImpl serverImpl, + final SubscriptionWalRetentionPolicy retentionPolicy, + final ConsensusLogToTabletConverter converter, + final ConsensusSubscriptionCommitManager commitManager, + final RegionProgress fallbackCommittedRegionProgress, + final long tailStartSearchIndex, + final long initialRuntimeVersion, + final boolean initialActive) { + // Get or create the list of queues for this topic + final List<ConsensusPrefetchingQueue> queues = + topicNameToConsensusPrefetchingQueues.computeIfAbsent( + topicName, k -> new CopyOnWriteArrayList<>()); + + // Check for duplicate region binding + for (final ConsensusPrefetchingQueue existing : queues) { + if (consensusGroupId.equals(existing.getConsensusGroupId()) && !existing.isClosed()) { + LOGGER.info( + "Subscription: consensus prefetching queue for topic [{}], region [{}] " + + "in consumer group [{}] already exists, skipping", + topicName, + consensusGroupId, + brokerId); + return; + } + } + + // Create the per-region consensus queue for this topic. + final ConsensusPrefetchingQueue consensusQueue = + new ConsensusPrefetchingQueue( + brokerId, + topicName, + orderMode, + consensusGroupId, + serverImpl, + retentionPolicy, + converter, + commitManager, + fallbackCommittedRegionProgress, + tailStartSearchIndex, + initialRuntimeVersion, + initialActive); + queues.add(consensusQueue); + LOGGER.info( + "Subscription: create consensus prefetching queue bound to topic [{}] for consumer group [{}], " + + "consensusGroupId={}, fallbackCommittedRegionProgress={}, " + + "tailStartSearchIndex={}, initialRuntimeVersion={}, initialActive={}, totalRegionQueues={}", + topicName, + brokerId, + consensusGroupId, + fallbackCommittedRegionProgress, + tailStartSearchIndex, + initialRuntimeVersion, + initialActive, + queues.size()); + } + + public void refreshConsensusQueueOrderMode(final String topicName, final String orderMode) { + final List<ConsensusPrefetchingQueue> queues = + topicNameToConsensusPrefetchingQueues.get(topicName); + if (Objects.isNull(queues) || queues.isEmpty()) { + return; + } + + for (final ConsensusPrefetchingQueue queue : queues) { + queue.setOrderMode(orderMode); + } + } + + public void unbindConsensusPrefetchingQueue(final String topicName) { + closeAndRemoveConsensusPrefetchingQueues(topicName, true); + } + + public int unbindByRegion(final ConsensusGroupId regionId) { + int closedCount = 0; + for (final Map.Entry<String, List<ConsensusPrefetchingQueue>> entry : + topicNameToConsensusPrefetchingQueues.entrySet()) { + final List<ConsensusPrefetchingQueue> queues = entry.getValue(); + final int beforeSize = queues.size(); + queues.removeIf( + q -> { + if (!regionId.equals(q.getConsensusGroupId())) { + return false; + } + q.close(); + LOGGER.info( + "Subscription: closed consensus prefetching queue for topic [{}] region [{}] " + + "in consumer group [{}] due to region removal", + entry.getKey(), + regionId, + brokerId); + return true; + }); + closedCount += beforeSize - queues.size(); + if (queues.isEmpty()) { + topicNameToConsensusPrefetchingQueues.remove(entry.getKey(), queues); + topicConsumerLastPollMs.remove(entry.getKey()); + topicOwnershipSnapshots.remove(entry.getKey()); + } else { + topicOwnershipSnapshots.remove(entry.getKey()); + } + } + return closedCount; + } + + /** + * Activates or deactivates all queues bound to {@code regionId}. Called on leader migration: + * {@code false} on old leader, {@code true} on new leader. Inactive queues skip prefetching and + * return null on poll, ensuring only the preferred writer serves subscription data. + */ + public void setActiveForRegion(final ConsensusGroupId regionId, final boolean active) { + for (final List<ConsensusPrefetchingQueue> queues : + topicNameToConsensusPrefetchingQueues.values()) { + for (final ConsensusPrefetchingQueue q : queues) { + if (regionId.equals(q.getConsensusGroupId())) { + q.setActive(active); + } + } + } + } + + public void setActiveWritersForRegion( + final ConsensusGroupId regionId, final Set<Integer> activeWriterNodeIds) { + final Set<Integer> normalizedActiveWriterNodeIds = + Collections.unmodifiableSet(new LinkedHashSet<>(activeWriterNodeIds)); + for (final List<ConsensusPrefetchingQueue> queues : + topicNameToConsensusPrefetchingQueues.values()) { + for (final ConsensusPrefetchingQueue q : queues) { + if (regionId.equals(q.getConsensusGroupId())) { + q.setActiveWriterNodeIds(normalizedActiveWriterNodeIds); + } + } + } + } + + public void applyRuntimeStateForRegion( + final ConsensusGroupId regionId, final ConsensusRegionRuntimeState runtimeState) { + for (final List<ConsensusPrefetchingQueue> queues : + topicNameToConsensusPrefetchingQueues.values()) { + for (final ConsensusPrefetchingQueue q : queues) { + if (regionId.equals(q.getConsensusGroupId())) { + q.applyRuntimeState(runtimeState); + } + } + } + } + + public void abortPendingSeeksForRuntimeStop() { + for (final List<ConsensusPrefetchingQueue> queues : + topicNameToConsensusPrefetchingQueues.values()) { + for (final ConsensusPrefetchingQueue q : queues) { + if (!q.isClosed()) { + q.abortPendingSeekForRuntimeStop(); + } + } + } + } + + @Override + public void removeQueue(final String topicName) { + final List<ConsensusPrefetchingQueue> queues = + topicNameToConsensusPrefetchingQueues.get(topicName); + if (Objects.nonNull(queues) && !queues.isEmpty()) { + LOGGER.info( + "Subscription: consensus prefetching queue(s) bound to topic [{}] for consumer group [{}] still exist, unbind before closing", + topicName, + brokerId); + } + closeAndRemoveConsensusPrefetchingQueues(topicName, false); + } Review Comment: Added a broker-level queue lifecycle lock. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
