Copilot commented on code in PR #17238:
URL: https://github.com/apache/iotdb/pull/17238#discussion_r2870247778
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java:
##########
@@ -109,89 +169,160 @@ public List<SubscriptionCommitContext> commit(
final List<SubscriptionCommitContext> commitContexts,
final boolean nack) {
final String consumerGroupId = consumerConfig.getConsumerGroupId();
- final SubscriptionBroker broker =
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
- if (Objects.isNull(broker)) {
+ final String consumerId = consumerConfig.getConsumerId();
+ final List<SubscriptionCommitContext> allSuccessful = new ArrayList<>();
+
+ final SubscriptionBroker pipeBroker =
consumerGroupIdToPipeBroker.get(consumerGroupId);
+ final ConsensusSubscriptionBroker consensusBroker =
+ consumerGroupIdToConsensusBroker.get(consumerGroupId);
+
+ if (Objects.isNull(pipeBroker) && Objects.isNull(consensusBroker)) {
final String errorMessage =
- String.format(
- "Subscription: broker bound to consumer group [%s] does not
exist", consumerGroupId);
+ String.format("Subscription: no broker bound to consumer group
[%s]", consumerGroupId);
LOGGER.warn(errorMessage);
throw new SubscriptionException(errorMessage);
}
- final String consumerId = consumerConfig.getConsumerId();
- return broker.commit(consumerId, commitContexts, nack);
+
+ // Partition commit contexts by which broker owns the topic.
+ final List<SubscriptionCommitContext> pipeContexts = new ArrayList<>();
+ final List<SubscriptionCommitContext> consensusContexts = new
ArrayList<>();
+ for (final SubscriptionCommitContext ctx : commitContexts) {
+ final String topicName = ctx.getTopicName();
+ if (Objects.nonNull(consensusBroker) &&
consensusBroker.hasQueue(topicName)) {
+ consensusContexts.add(ctx);
+ } else {
+ pipeContexts.add(ctx);
+ }
+ }
+
+ if (Objects.nonNull(pipeBroker) && !pipeContexts.isEmpty()) {
+ allSuccessful.addAll(pipeBroker.commit(consumerId, pipeContexts, nack));
+ }
+ if (Objects.nonNull(consensusBroker) && !consensusContexts.isEmpty()) {
+ allSuccessful.addAll(consensusBroker.commit(consumerId,
consensusContexts, nack));
+ }
+
+ return allSuccessful;
}
public boolean isCommitContextOutdated(final SubscriptionCommitContext
commitContext) {
final String consumerGroupId = commitContext.getConsumerGroupId();
- final SubscriptionBroker broker =
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
- if (Objects.isNull(broker)) {
+ final String topicName = commitContext.getTopicName();
+
+ // Try consensus broker first
+ final ConsensusSubscriptionBroker consensusBroker =
+ consumerGroupIdToConsensusBroker.get(consumerGroupId);
+ if (Objects.nonNull(consensusBroker) &&
consensusBroker.hasQueue(topicName)) {
+ return consensusBroker.isCommitContextOutdated(commitContext);
+ }
+
+ // Fall back to pipe broker
+ final SubscriptionBroker pipeBroker =
consumerGroupIdToPipeBroker.get(consumerGroupId);
+ if (Objects.isNull(pipeBroker)) {
return true;
}
- return broker.isCommitContextOutdated(commitContext);
+ return pipeBroker.isCommitContextOutdated(commitContext);
}
public List<String> fetchTopicNamesToUnsubscribe(
final ConsumerConfig consumerConfig, final Set<String> topicNames) {
final String consumerGroupId = consumerConfig.getConsumerGroupId();
- final SubscriptionBroker broker =
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
- if (Objects.isNull(broker)) {
+
+ // Consensus-based subscription topics are unbounded streams, so they do
not trigger
+ // auto-unsubscribe.
+ final ConsensusSubscriptionBroker consensusBroker =
+ consumerGroupIdToConsensusBroker.get(consumerGroupId);
+ final Set<String> pipeOnlyTopicNames;
+ if (Objects.nonNull(consensusBroker)) {
+ pipeOnlyTopicNames = new java.util.HashSet<>(topicNames);
+ pipeOnlyTopicNames.removeIf(consensusBroker::hasQueue);
+ } else {
+ pipeOnlyTopicNames = topicNames;
+ }
+
+ if (pipeOnlyTopicNames.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ final SubscriptionBroker pipeBroker =
consumerGroupIdToPipeBroker.get(consumerGroupId);
+ if (Objects.isNull(pipeBroker)) {
return Collections.emptyList();
}
- return broker.fetchTopicNamesToUnsubscribe(topicNames);
+ return pipeBroker.fetchTopicNamesToUnsubscribe(pipeOnlyTopicNames);
}
/////////////////////////////// broker ///////////////////////////////
public boolean isBrokerExist(final String consumerGroupId) {
- return consumerGroupIdToSubscriptionBroker.containsKey(consumerGroupId);
+ return consumerGroupIdToPipeBroker.containsKey(consumerGroupId)
+ || consumerGroupIdToConsensusBroker.containsKey(consumerGroupId);
}
public void createBrokerIfNotExist(final String consumerGroupId) {
- consumerGroupIdToSubscriptionBroker.computeIfAbsent(consumerGroupId,
SubscriptionBroker::new);
- LOGGER.info("Subscription: create broker bound to consumer group [{}]",
consumerGroupId);
+ consumerGroupIdToPipeBroker.computeIfAbsent(consumerGroupId,
SubscriptionBroker::new);
+ LOGGER.info("Subscription: create pipe broker bound to consumer group
[{}]", consumerGroupId);
}
/**
* @return {@code true} if drop broker success, {@code false} otherwise
*/
public boolean dropBroker(final String consumerGroupId) {
final AtomicBoolean dropped = new AtomicBoolean(false);
- consumerGroupIdToSubscriptionBroker.compute(
+
+ // Drop pipe broker
+ consumerGroupIdToPipeBroker.compute(
consumerGroupId,
(id, broker) -> {
if (Objects.isNull(broker)) {
+ dropped.set(true);
Review Comment:
`dropBroker` sets `dropped=true` when the pipe broker entry is already null.
This causes `dropBroker` to return true even when nothing was removed, and can
also mask failure to drop a non-empty consensus broker (pipe broker null =>
dropped=true, consensus broker kept). Only mark `dropped` true when an existing
broker entry is actually removed.
```suggestion
// No existing broker to drop
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusSubscriptionCommitManager.java:
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.broker.consensus;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Manages commit state for consensus-based subscriptions.
+ *
+ * <p>This manager tracks which events have been committed by consumers and
maps commit IDs back to
+ * WAL search indices. It maintains the progress for each (consumerGroup,
topic, region) triple and
+ * supports persistence and recovery.
+ *
+ * <p>Progress is tracked <b>per-region</b> because searchIndex is
region-local — each DataRegion
+ * has its own independent WAL with its own searchIndex namespace. Using a
single state per topic
+ * would cause TreeSet deduplication bugs when different regions emit the same
searchIndex value.
+ *
+ * <p>Key responsibilities:
+ *
+ * <ul>
+ * <li>Track the mapping from commitId to searchIndex
+ * <li>Handle commit/ack from consumers
+ * <li>Persist and recover progress state
+ * </ul>
+ */
+public class ConsensusSubscriptionCommitManager {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ConsensusSubscriptionCommitManager.class);
+
+ private static final String PROGRESS_FILE_PREFIX =
"consensus_subscription_progress_";
+ private static final String PROGRESS_FILE_SUFFIX = ".dat";
+
+ /** Key: "consumerGroupId_topicName_regionId" -> progress tracking state */
+ private final Map<String, ConsensusSubscriptionCommitState> commitStates =
+ new ConcurrentHashMap<>();
+
+ private final String persistDir;
+
+ private ConsensusSubscriptionCommitManager() {
+ this.persistDir =
+ IoTDBDescriptor.getInstance().getConfig().getSystemDir()
+ + File.separator
+ + "subscription"
+ + File.separator
+ + "consensus_progress";
+ final File dir = new File(persistDir);
+ if (!dir.exists()) {
+ dir.mkdirs();
+ }
+ }
+
+ /**
+ * Gets or creates the commit state for a specific (consumerGroup, topic,
region) triple.
+ *
+ * @param consumerGroupId the consumer group ID
+ * @param topicName the topic name
+ * @param regionId the consensus group / data region ID string
+ * @return the commit state
+ */
+ public ConsensusSubscriptionCommitState getOrCreateState(
+ final String consumerGroupId, final String topicName, final String
regionId) {
+ final String key = generateKey(consumerGroupId, topicName, regionId);
+ return commitStates.computeIfAbsent(
+ key,
+ k -> {
+ // Try to recover from persisted state
+ final ConsensusSubscriptionCommitState recovered = tryRecover(key);
+ if (recovered != null) {
+ return recovered;
+ }
+ return new ConsensusSubscriptionCommitState(new
SubscriptionConsensusProgress(0L, 0L));
+ });
+ }
+
+ /**
+ * Records commitId to searchIndex mapping for later commit handling.
+ *
+ * @param consumerGroupId the consumer group ID
+ * @param topicName the topic name
+ * @param regionId the consensus group / data region ID string
+ * @param commitId the assigned commit ID
+ * @param searchIndex the WAL search index corresponding to this event
+ */
+ public void recordCommitMapping(
+ final String consumerGroupId,
+ final String topicName,
+ final String regionId,
+ final long commitId,
+ final long searchIndex) {
+ final ConsensusSubscriptionCommitState state =
+ getOrCreateState(consumerGroupId, topicName, regionId);
+ state.recordMapping(commitId, searchIndex);
+ }
+
+ /**
+ * Handles commit (ack) for an event. Updates the progress and potentially
advances the committed
+ * search index.
+ *
+ * @param consumerGroupId the consumer group ID
+ * @param topicName the topic name
+ * @param regionId the consensus group / data region ID string
+ * @param commitId the committed event's commit ID
+ * @return true if commit handled successfully
+ */
+ public boolean commit(
+ final String consumerGroupId,
+ final String topicName,
+ final String regionId,
+ final long commitId) {
+ final String key = generateKey(consumerGroupId, topicName, regionId);
+ final ConsensusSubscriptionCommitState state = commitStates.get(key);
+ if (state == null) {
+ LOGGER.warn(
+ "ConsensusSubscriptionCommitManager: Cannot commit for unknown
state, "
+ + "consumerGroupId={}, topicName={}, regionId={}, commitId={}",
+ consumerGroupId,
+ topicName,
+ regionId,
+ commitId);
+ return false;
+ }
+ final boolean success = state.commit(commitId);
+ if (success) {
+ // Periodically persist progress
+ persistProgressIfNeeded(key, state);
+ }
+ return success;
+ }
+
+ /**
+ * Gets the current committed search index for a specific region's state.
+ *
+ * @param consumerGroupId the consumer group ID
+ * @param topicName the topic name
+ * @param regionId the consensus group / data region ID string
+ * @return the committed search index, or -1 if no state exists
+ */
+ public long getCommittedSearchIndex(
+ final String consumerGroupId, final String topicName, final String
regionId) {
+ final String key = generateKey(consumerGroupId, topicName, regionId);
+ final ConsensusSubscriptionCommitState state = commitStates.get(key);
+ if (state == null) {
+ return -1;
+ }
+ return state.getCommittedSearchIndex();
+ }
+
+ /**
+ * Removes state for a specific (consumerGroup, topic, region) triple.
+ *
+ * @param consumerGroupId the consumer group ID
+ * @param topicName the topic name
+ * @param regionId the consensus group / data region ID string
+ */
+ public void removeState(
+ final String consumerGroupId, final String topicName, final String
regionId) {
+ final String key = generateKey(consumerGroupId, topicName, regionId);
+ commitStates.remove(key);
+ // Clean up persisted file
+ final File file = getProgressFile(key);
+ if (file.exists()) {
+ file.delete();
+ }
+ }
+
+ /**
+ * Removes all states for a given (consumerGroup, topic) pair across all
regions. Used during
+ * subscription teardown when the individual regionIds may not be readily
available.
+ *
+ * @param consumerGroupId the consumer group ID
+ * @param topicName the topic name
+ */
+ public void removeAllStatesForTopic(final String consumerGroupId, final
String topicName) {
+ final String prefix = consumerGroupId + "_" + topicName + "_";
+ final Iterator<Map.Entry<String, ConsensusSubscriptionCommitState>> it =
+ commitStates.entrySet().iterator();
+ while (it.hasNext()) {
+ final Map.Entry<String, ConsensusSubscriptionCommitState> entry =
it.next();
+ if (entry.getKey().startsWith(prefix)) {
+ it.remove();
+ final File file = getProgressFile(entry.getKey());
+ if (file.exists()) {
+ file.delete();
+ }
+ }
+ }
+ }
+
+ /** Persists all states. Should be called during graceful shutdown. */
+ public void persistAll() {
+ for (final Map.Entry<String, ConsensusSubscriptionCommitState> entry :
+ commitStates.entrySet()) {
+ persistProgress(entry.getKey(), entry.getValue());
+ }
+ }
+
+ // ======================== Helper Methods ========================
+
+ private String generateKey(
+ final String consumerGroupId, final String topicName, final String
regionId) {
+ return consumerGroupId + "_" + topicName + "_" + regionId;
+ }
+
+ private File getProgressFile(final String key) {
+ return new File(persistDir, PROGRESS_FILE_PREFIX + key +
PROGRESS_FILE_SUFFIX);
+ }
+
+ private ConsensusSubscriptionCommitState tryRecover(final String key) {
+ final File file = getProgressFile(key);
+ if (!file.exists()) {
+ return null;
+ }
+ try (final FileInputStream fis = new FileInputStream(file)) {
+ final byte[] bytes = new byte[(int) file.length()];
+ fis.read(bytes);
Review Comment:
`tryRecover` reads the progress file with a single `fis.read(bytes)` call,
which is not guaranteed to fill the buffer. This can lead to partial reads and
corrupted/failed deserialization. Use a readFully loop /
`DataInputStream.readFully`, and validate that the expected number of bytes was
read before deserializing.
```suggestion
int offset = 0;
while (offset < bytes.length) {
final int bytesRead = fis.read(bytes, offset, bytes.length - offset);
if (bytesRead < 0) {
break;
}
offset += bytesRead;
}
if (offset != bytes.length) {
LOGGER.warn(
"Failed to fully read consensus subscription progress from {}:
expected {} bytes, read {} bytes",
file,
bytes.length,
offset);
return null;
}
```
##########
example/session/pom.xml:
##########
@@ -40,4 +40,17 @@
<version>${project.version}</version>
</dependency>
</dependencies>
+ <!-- TODO: remove below -->
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>11</source>
+ <target>11</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
Review Comment:
This module overrides the example parent’s compiler level (example/pom.xml
sets source/target=8) to Java 11. That can break building examples under the
project’s expected Java version and makes the module inconsistent with other
examples. Prefer inheriting the parent compiler settings (or updating the
parent if the whole examples tree should move to 11) instead of overriding in
this module.
```suggestion
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusSubscriptionSetupHandler.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.broker.consensus;
+
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixTreePattern;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.iot.IoTConsensus;
+import org.apache.iotdb.consensus.iot.IoTConsensusServerImpl;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
+import org.apache.iotdb.rpc.subscription.config.TopicConfig;
+import org.apache.iotdb.rpc.subscription.config.TopicConstant;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Handles the setup and teardown of consensus-based subscription queues on
DataNode. When a
+ * real-time subscription is detected, this handler finds the local
IoTConsensus data regions,
+ * creates the appropriate converter, and binds prefetching queues to the
subscription broker.
+ */
+public class ConsensusSubscriptionSetupHandler {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ConsensusSubscriptionSetupHandler.class);
+
+ private static final IoTDBConfig IOTDB_CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+
+ private ConsensusSubscriptionSetupHandler() {
+ // utility class
+ }
+
+ /**
+ * Ensures that the IoTConsensus new-peer callback is set, so that when a
new DataRegion is
+ * created, all active consensus subscriptions are automatically bound to
the new region.
+ */
+ public static void ensureNewRegionListenerRegistered() {
+ if (IoTConsensus.onNewPeerCreated != null) {
+ return;
+ }
+ IoTConsensus.onNewPeerCreated =
ConsensusSubscriptionSetupHandler::onNewRegionCreated;
+ LOGGER.info(
+ "Set IoTConsensus.onNewPeerCreated callback for consensus subscription
auto-binding");
+ }
+
+ /**
+ * Callback invoked when a new DataRegion (IoTConsensusServerImpl) is
created locally. Queries
+ * existing subscription metadata to find all active consensus subscriptions
and binds prefetching
+ * queues to the new region.
+ */
+ private static void onNewRegionCreated(
+ final ConsensusGroupId groupId, final IoTConsensusServerImpl serverImpl)
{
+ if (!(groupId instanceof DataRegionId)) {
+ return;
+ }
+
+ // Query existing metadata keepers for all active subscriptions
+ final Map<String, java.util.Set<String>> allSubscriptions =
+ SubscriptionAgent.consumer().getAllSubscriptions();
+ if (allSubscriptions.isEmpty()) {
+ return;
+ }
+
+ final ConsensusSubscriptionCommitManager commitManager =
+ ConsensusSubscriptionCommitManager.getInstance();
+ final long startSearchIndex = serverImpl.getSearchIndex() + 1;
+
+ LOGGER.info(
+ "New DataRegion {} created, checking {} consumer group(s) for
auto-binding, "
+ + "startSearchIndex={}",
+ groupId,
+ allSubscriptions.size(),
+ startSearchIndex);
+
+ for (final Map.Entry<String, java.util.Set<String>> groupEntry :
allSubscriptions.entrySet()) {
+ final String consumerGroupId = groupEntry.getKey();
+ for (final String topicName : groupEntry.getValue()) {
+ if (!isConsensusBasedTopic(topicName)) {
+ continue;
+ }
+ try {
+ final Map<String, TopicConfig> topicConfigs =
+
SubscriptionAgent.topic().getTopicConfigs(java.util.Collections.singleton(topicName));
+ final TopicConfig topicConfig = topicConfigs.get(topicName);
+ if (topicConfig == null) {
+ continue;
+ }
+
+ // Resolve the new DataRegion's actual database name
+ final DataRegion dataRegion =
+ StorageEngine.getInstance().getDataRegion((DataRegionId)
groupId);
+ if (dataRegion == null) {
+ continue;
+ }
+ final String dbRaw = dataRegion.getDatabaseName();
+ final String dbTableModel = dbRaw.startsWith("root.") ?
dbRaw.substring(5) : dbRaw;
+
+ // For table topics, skip if this region's database doesn't match
the topic filter
+ if (topicConfig.isTableTopic()) {
+ final String topicDb =
+ topicConfig.getStringOrDefault(
+ TopicConstant.DATABASE_KEY,
TopicConstant.DATABASE_DEFAULT_VALUE);
+ if (topicDb != null
+ && !topicDb.isEmpty()
+ && !TopicConstant.DATABASE_DEFAULT_VALUE.equals(topicDb)
+ && !topicDb.equalsIgnoreCase(dbTableModel)) {
+ continue;
+ }
+ }
+
+ final String actualDbName = topicConfig.isTableTopic() ?
dbTableModel : null;
+ final ConsensusLogToTabletConverter converter =
buildConverter(topicConfig, actualDbName);
+
+ LOGGER.info(
+ "Auto-binding consensus queue for topic [{}] in group [{}] to
new region {} (database={})",
+ topicName,
+ consumerGroupId,
+ groupId,
+ dbTableModel);
+
+ SubscriptionAgent.broker()
+ .bindConsensusPrefetchingQueue(
+ consumerGroupId,
+ topicName,
+ groupId.toString(),
+ serverImpl,
+ converter,
+ commitManager,
+ startSearchIndex);
+ } catch (final Exception e) {
+ LOGGER.error(
+ "Failed to auto-bind topic [{}] in group [{}] to new region {}",
+ topicName,
+ consumerGroupId,
+ groupId,
+ e);
+ }
+ }
+ }
+ }
+
+ public static boolean isConsensusBasedTopic(final String topicName) {
+ try {
+ final String topicMode =
SubscriptionAgent.topic().getTopicMode(topicName);
+ final String topicFormat =
SubscriptionAgent.topic().getTopicFormat(topicName);
+ final boolean result =
+ TopicConstant.MODE_LIVE_VALUE.equalsIgnoreCase(topicMode)
+ &&
!TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equalsIgnoreCase(topicFormat);
+ LOGGER.info(
+ "isConsensusBasedTopic check for topic [{}]: mode={}, format={},
result={}",
+ topicName,
+ topicMode,
+ topicFormat,
+ result);
+ return result;
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Failed to check if topic [{}] is consensus-based, defaulting to
false", topicName, e);
+ return false;
+ }
+ }
+
+ public static void setupConsensusSubscriptions(
+ final String consumerGroupId, final Set<String> topicNames) {
+ final IConsensus dataRegionConsensus =
DataRegionConsensusImpl.getInstance();
+ if (!(dataRegionConsensus instanceof IoTConsensus)) {
+ LOGGER.warn(
+ "Data region consensus is not IoTConsensus (actual: {}), "
+ + "cannot set up consensus-based subscription for consumer group
[{}]",
+ dataRegionConsensus.getClass().getSimpleName(),
+ consumerGroupId);
+ return;
+ }
+
+ // Ensure the new-region listener is registered (idempotent)
+ ensureNewRegionListenerRegistered();
+
+ final IoTConsensus ioTConsensus = (IoTConsensus) dataRegionConsensus;
+ final ConsensusSubscriptionCommitManager commitManager =
+ ConsensusSubscriptionCommitManager.getInstance();
+
+ LOGGER.info(
+ "Setting up consensus subscriptions for consumer group [{}],
topics={}, "
+ + "total consensus groups={}",
+ consumerGroupId,
+ topicNames,
+ ioTConsensus.getAllConsensusGroupIds().size());
+
+ for (final String topicName : topicNames) {
+ if (!isConsensusBasedTopic(topicName)) {
+ continue;
+ }
+
+ try {
+ setupConsensusQueueForTopic(consumerGroupId, topicName, ioTConsensus,
commitManager);
+ } catch (final Exception e) {
+ LOGGER.error(
+ "Failed to set up consensus subscription for topic [{}] in
consumer group [{}]",
+ topicName,
+ consumerGroupId,
+ e);
+ }
+ }
+ }
+
+ /**
+ * Set up consensus queue for a single topic. Discovers all local data
region consensus groups and
+ * binds a ConsensusReqReader-based prefetching queue to every matching
region.
+ *
+ * <p>For table-model topics, only regions whose database matches the
topic's {@code DATABASE_KEY}
+ * filter are bound. For tree-model topics, all local data regions are
bound. Additionally, the
+ * {@link #onNewRegionCreated} callback ensures that regions created after
this method runs are
+ * also automatically bound.
+ */
+ private static void setupConsensusQueueForTopic(
+ final String consumerGroupId,
+ final String topicName,
+ final IoTConsensus ioTConsensus,
+ final ConsensusSubscriptionCommitManager commitManager) {
+
+ // Get topic config for building the converter
+ final Map<String, TopicConfig> topicConfigs =
+
SubscriptionAgent.topic().getTopicConfigs(java.util.Collections.singleton(topicName));
+ final TopicConfig topicConfig = topicConfigs.get(topicName);
+ if (topicConfig == null) {
+ LOGGER.warn(
+ "Topic config not found for topic [{}], cannot set up consensus
queue", topicName);
+ return;
+ }
+
+ // Build the converter based on topic config (path pattern, time range,
tree/table model)
+ LOGGER.info(
+ "Setting up consensus queue for topic [{}]: isTableTopic={},
config={}",
+ topicName,
+ topicConfig.isTableTopic(),
+ topicConfig.getAttribute());
+
+ // For table topics, extract the database filter from topic config
+ final String topicDatabaseFilter =
+ topicConfig.isTableTopic()
+ ? topicConfig.getStringOrDefault(
+ TopicConstant.DATABASE_KEY,
TopicConstant.DATABASE_DEFAULT_VALUE)
+ : null;
+
+ final List<ConsensusGroupId> allGroupIds =
ioTConsensus.getAllConsensusGroupIds();
+ LOGGER.info(
+ "Discovered {} consensus group(s) for topic [{}] in consumer group
[{}]: {}",
+ allGroupIds.size(),
+ topicName,
+ consumerGroupId,
+ allGroupIds);
+ boolean bound = false;
+
+ for (final ConsensusGroupId groupId : allGroupIds) {
+ if (!(groupId instanceof DataRegionId)) {
+ continue;
+ }
+
+ final IoTConsensusServerImpl serverImpl = ioTConsensus.getImpl(groupId);
+ if (serverImpl == null) {
+ continue;
+ }
+
+ // Resolve the DataRegion's actual database name
+ final DataRegion dataRegion =
+ StorageEngine.getInstance().getDataRegion((DataRegionId) groupId);
+ if (dataRegion == null) {
+ continue;
+ }
+ final String dbRaw = dataRegion.getDatabaseName();
+ final String dbTableModel = dbRaw.startsWith("root.") ?
dbRaw.substring(5) : dbRaw;
+
+ if (topicDatabaseFilter != null
+ && !topicDatabaseFilter.isEmpty()
+ && !TopicConstant.DATABASE_DEFAULT_VALUE.equals(topicDatabaseFilter)
+ && !topicDatabaseFilter.equalsIgnoreCase(dbTableModel)) {
+ LOGGER.info(
+ "Skipping region {} (database={}) for table topic [{}]
(DATABASE_KEY={})",
+ groupId,
+ dbTableModel,
+ topicName,
+ topicDatabaseFilter);
+ continue;
+ }
+
+ final String actualDbName = topicConfig.isTableTopic() ? dbTableModel :
null;
+ final ConsensusLogToTabletConverter converter =
buildConverter(topicConfig, actualDbName);
+
+ final long startSearchIndex = serverImpl.getSearchIndex() + 1;
+
Review Comment:
`startSearchIndex` is always initialized as `serverImpl.getSearchIndex() +
1`, which ignores any persisted subscription progress in
`ConsensusSubscriptionCommitManager`. After a DataNode restart (or queue
rebind), this will skip unconsumed WAL entries and break the at-least-once /
recovery semantics described in the PR. The start index should be derived from
the recovered committed search index when state exists (e.g., `max(committed+1,
subscribeStart)` depending on desired semantics).
```suggestion
long startSearchIndex = serverImpl.getSearchIndex() + 1;
// If we have recovered a committed search index for this group,
advance the
// startSearchIndex to at least committed + 1 to honor at-least-once
semantics.
if (commitManager != null) {
try {
            final long committedSearchIndex =
                commitManager.getCommittedSearchIndex(
                    consumerGroupId, topicName, groupId.toString());
if (committedSearchIndex >= 0) {
startSearchIndex = Math.max(startSearchIndex,
committedSearchIndex + 1);
}
} catch (Exception e) {
LOGGER.warn(
"Failed to load committed search index for consensus group {}
when binding "
+ "subscription prefetching queue, fallback to
startSearchIndex={}",
groupId,
startSearchIndex,
e);
}
}
```
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java:
##########
@@ -30,7 +30,7 @@ public class SubscriptionConfig {
private static final CommonConfig COMMON_CONFIG =
CommonDescriptor.getInstance().getConfig();
public boolean getSubscriptionEnabled() {
- return false;
+ return true; // TODO: make it configurable after subscription is stable
Review Comment:
`getSubscriptionEnabled()` is now hard-coded to always return true, which
changes the default behavior globally (multiple code paths gate subscription
operations on this flag). This should remain configurable (or default to the
previous disabled state) to avoid unexpectedly enabling an unstable feature in
production deployments.
```suggestion
return false; // TODO: make it configurable or enable by default after
subscription is stable
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/ConsensusSubscriptionBroker.java:
##########
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.broker;
+
+import org.apache.iotdb.consensus.iot.IoTConsensusServerImpl;
+import
org.apache.iotdb.db.subscription.broker.consensus.ConsensusLogToTabletConverter;
+import
org.apache.iotdb.db.subscription.broker.consensus.ConsensusPrefetchingQueue;
+import
org.apache.iotdb.db.subscription.broker.consensus.ConsensusSubscriptionCommitManager;
+import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
+import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+/**
+ * Consensus-based subscription broker that reads data directly from
IoTConsensus WAL. Each instance
+ * manages consensus prefetching queues for a single consumer group.
+ */
+public class ConsensusSubscriptionBroker implements ISubscriptionBroker {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ConsensusSubscriptionBroker.class);
+
+ private final String brokerId; // consumer group id
+
+ /** Maps topic name to a list of ConsensusPrefetchingQueues, one per data
region. */
+ private final Map<String, List<ConsensusPrefetchingQueue>>
topicNameToConsensusPrefetchingQueues;
+
+ /** Shared commit ID generators per topic. */
+ private final Map<String, AtomicLong> topicNameToCommitIdGenerator;
+
+ public ConsensusSubscriptionBroker(final String brokerId) {
+ this.brokerId = brokerId;
+ this.topicNameToConsensusPrefetchingQueues = new ConcurrentHashMap<>();
+ this.topicNameToCommitIdGenerator = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return topicNameToConsensusPrefetchingQueues.isEmpty();
+ }
+
+ @Override
+ public boolean hasQueue(final String topicName) {
+ final List<ConsensusPrefetchingQueue> queues =
+ topicNameToConsensusPrefetchingQueues.get(topicName);
+ return Objects.nonNull(queues)
+ && !queues.isEmpty()
+ && queues.stream().anyMatch(q -> !q.isClosed());
+ }
+
+ //////////////////////////// poll ////////////////////////////
+
+ @Override
+ public List<SubscriptionEvent> poll(
+ final String consumerId, final Set<String> topicNames, final long
maxBytes) {
+ LOGGER.debug(
+ "ConsensusSubscriptionBroker [{}]: poll called, consumerId={},
topicNames={}, "
+ + "queueCount={}, maxBytes={}",
+ brokerId,
+ consumerId,
+ topicNames,
+ topicNameToConsensusPrefetchingQueues.size(),
+ maxBytes);
+
+ final List<SubscriptionEvent> eventsToPoll = new ArrayList<>();
+ final List<SubscriptionEvent> eventsToNack = new ArrayList<>();
+ long totalSize = 0;
+
+ for (final String topicName : topicNames) {
+ final List<ConsensusPrefetchingQueue> queues =
+ topicNameToConsensusPrefetchingQueues.get(topicName);
+ if (Objects.isNull(queues) || queues.isEmpty()) {
+ continue;
+ }
+
+ // Poll from all region queues for this topic
+ for (final ConsensusPrefetchingQueue consensusQueue : queues) {
+ if (consensusQueue.isClosed()) {
+ continue;
+ }
+
+ final SubscriptionEvent event = consensusQueue.poll(consumerId);
+ if (Objects.isNull(event)) {
+ continue;
+ }
+
+ final long currentSize;
+ try {
+ currentSize = event.getCurrentResponseSize();
+ } catch (final IOException e) {
+ eventsToNack.add(event);
+ continue;
+ }
+
+ eventsToPoll.add(event);
+ totalSize += currentSize;
+
+ if (totalSize + currentSize > maxBytes) {
Review Comment:
The `maxBytes` limiting logic is incorrect: the event is added and
`totalSize` is incremented, then the code checks `if (totalSize + currentSize >
maxBytes)` which double-counts `currentSize` and can still return a batch
exceeding `maxBytes`. Check the size *before* adding the event (or adjust the
condition) and decide whether to stop/push back the event when it would exceed
the limit.
```suggestion
if (totalSize > maxBytes) {
```
##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java:
##########
@@ -879,10 +972,25 @@ void checkAndUpdateSafeDeletedSearchIndex() {
if (configuration.isEmpty()) {
logger.error(
"Configuration is empty, which is unexpected. Safe deleted search
index won't be updated this time.");
- } else if (configuration.size() == 1) {
+ return;
+ }
+
+ // Compute the minimum search index that subscription consumers still need.
+ // WAL entries at or after this index must be preserved.
+ long minSubscriptionIndex = Long.MAX_VALUE;
+ for (final LongSupplier supplier : subscriptionSyncIndexSuppliers) {
+ minSubscriptionIndex = Math.min(minSubscriptionIndex,
supplier.getAsLong());
+ }
+
+ if (configuration.size() == 1 && subscriptionSyncIndexSuppliers.isEmpty())
{
+ // Single replica, no subscription consumers => delete all WAL freely
consensusReqReader.setSafelyDeletedSearchIndex(Long.MAX_VALUE);
} else {
- consensusReqReader.setSafelyDeletedSearchIndex(getMinFlushedSyncIndex());
+ // min(replication progress, subscription progress) — preserve WAL for
both
+ final long replicationIndex =
+ configuration.size() > 1 ? getMinFlushedSyncIndex() : Long.MAX_VALUE;
+ consensusReqReader.setSafelyDeletedSearchIndex(
+ Math.min(replicationIndex, minSubscriptionIndex));
Review Comment:
`checkAndUpdateSafeDeletedSearchIndex()` now computes a subscription-aware
safe delete index, but other code paths (e.g., `LogDispatcher` periodically
calling `reader.setSafelyDeletedSearchIndex(impl.getMinFlushedSyncIndex())`)
can overwrite it with a replication-only value. That can allow WAL deletion
past the slowest subscription consumer, causing data loss. Consider
centralizing *all* updates to safelyDeletedSearchIndex through a single
subscription-aware method (or make `getMinFlushedSyncIndex()` incorporate
subscription progress).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]