VGalaxies commented on code in PR #17238:
URL: https://github.com/apache/iotdb/pull/17238#discussion_r3346562437
##########
iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java:
##########
@@ -526,6 +546,37 @@ void commit(final List<SubscriptionCommitContext>
subscriptionCommitContexts, fi
throw new SubscriptionConnectionException(e.getMessage(), e);
}
verifyPipeSubscribeSuccess(resp.status);
+ final PipeSubscribeCommitResp commitResp =
PipeSubscribeCommitResp.fromTPipeSubscribeResp(resp);
+ return new CommitResult(
+ commitResp.getAcceptedCommitContexts(),
commitResp.getCommittedProgressByTopic());
+ }
+
+ static final class CommitResult {
+
+ private final List<SubscriptionCommitContext> acceptedCommitContexts;
+
+ private final Map<String, TopicProgress> committedProgressByTopic;
+
+ private CommitResult(
+ final List<SubscriptionCommitContext> acceptedCommitContexts,
+ final Map<String, TopicProgress> committedProgressByTopic) {
+ this.acceptedCommitContexts =
+ acceptedCommitContexts == null ? Collections.emptyList() :
acceptedCommitContexts;
+ this.committedProgressByTopic =
+ committedProgressByTopic == null ? Collections.emptyMap() :
committedProgressByTopic;
+ }
+
+ static CommitResult empty() {
+ return new CommitResult(Collections.emptyList(), Collections.emptyMap());
+ }
Review Comment:
Done in 5ea428025e. CommitResult.empty() now returns a static reusable EMPTY
instance.
##########
iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties:
##########
@@ -54,6 +54,12 @@ dn_data_region_consensus_port=10760
schema_replication_factor=1
data_replication_factor=1
+####################
+### Subscription Consensus Configuration
+####################
+
+# subscription_consensus_idle_safe_hlc_interval_ms=10000
+
Review Comment:
Done in 5ea428025e. This property is no longer added to
iotdb-system.properties; it remains in the template instead.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java:
##########
@@ -238,42 +225,136 @@ public List<SubscriptionCommitContext> commit(
return allSuccessful;
}
- public void seek(
- final ConsumerConfig consumerConfig, final String topicName, final short
seekType) {
- final String consumerGroupId = consumerConfig.getConsumerGroupId();
+ public int refreshInFlightEventLeases(
+ final ConsumerConfig consumerConfig,
+ final List<SubscriptionCommitContext> processorBufferedCommitContexts) {
+ if (Objects.isNull(processorBufferedCommitContexts)
+ || processorBufferedCommitContexts.isEmpty()) {
+ return 0;
+ }
- final ConsensusSubscriptionBroker consensusBroker =
+ final String consumerGroupId = consumerConfig.getConsumerGroupId();
+ final String consumerId = consumerConfig.getConsumerId();
+ final ISubscriptionBroker pipeBroker =
consumerGroupIdToPipeBroker.get(consumerGroupId);
+ final ISubscriptionBroker consensusBroker =
consumerGroupIdToConsensusBroker.get(consumerGroupId);
- if (Objects.nonNull(consensusBroker) &&
consensusBroker.hasQueue(topicName)) {
- ensureConsensusSeekRuntimeAvailable(consumerGroupId, topicName, "seek");
- if (seekType != PipeSubscribeSeekReq.SEEK_TO_BEGINNING
- && seekType != PipeSubscribeSeekReq.SEEK_TO_END) {
- final String errorMessage =
- String.format(
- "Subscription: consensus seek only supports beginning/end or
topic progress, "
- + "consumerGroup=%s, topic=%s, seekType=%s",
- consumerGroupId, topicName, seekType);
- LOGGER.warn(errorMessage);
- throw new SubscriptionException(errorMessage);
+
+ final List<SubscriptionCommitContext> pipeContexts = new ArrayList<>();
+ final List<SubscriptionCommitContext> consensusContexts = new
ArrayList<>();
+ for (final SubscriptionCommitContext ctx :
processorBufferedCommitContexts) {
+ if (Objects.isNull(ctx) || Objects.isNull(ctx.getTopicName())) {
+ continue;
}
- consensusBroker.seek(topicName, seekType);
+ if (Objects.nonNull(consensusBroker)
+ &&
ConsensusSubscriptionSetupHandler.isConsensusBasedTopic(ctx.getTopicName())) {
+ consensusContexts.add(ctx);
+ } else {
+ pipeContexts.add(ctx);
+ }
+ }
+
+ int refreshedCount = 0;
+ if (Objects.nonNull(pipeBroker) && !pipeContexts.isEmpty()) {
+ refreshedCount += pipeBroker.refreshInFlightEventLeases(consumerId,
pipeContexts);
+ }
+ if (Objects.nonNull(consensusBroker) && !consensusContexts.isEmpty()) {
+ refreshedCount += consensusBroker.refreshInFlightEventLeases(consumerId,
consensusContexts);
+ }
+ return refreshedCount;
+ }
+
+ public Map<String, TopicProgress> getConsensusCommittedProgressByTopic(
+ final ConsumerConfig consumerConfig,
+ final List<SubscriptionCommitContext> acceptedCommitContexts,
+ final boolean nack) {
+ if (nack || acceptedCommitContexts.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ final Map<String, Set<String>> regionIdsByTopic = new LinkedHashMap<>();
+ for (final SubscriptionCommitContext commitContext :
acceptedCommitContexts) {
+ if (Objects.isNull(commitContext) ||
Objects.isNull(commitContext.getTopicName())) {
+ continue;
+ }
+ final String topicName = commitContext.getTopicName();
+ if (!ConsensusSubscriptionSetupHandler.isConsensusBasedTopic(topicName))
{
+ continue;
+ }
+ final String regionId = commitContext.getRegionId();
+ if (Objects.isNull(regionId) || regionId.isEmpty()) {
+ continue;
+ }
+ regionIdsByTopic.computeIfAbsent(topicName, ignored -> new
LinkedHashSet<>()).add(regionId);
+ }
+
+ final String consumerGroupId = consumerConfig.getConsumerGroupId();
+ final Map<String, TopicProgress> result = new LinkedHashMap<>();
+ for (final Map.Entry<String, Set<String>> entry :
regionIdsByTopic.entrySet()) {
+ final String topicName = entry.getKey();
+ final Map<String, RegionProgress> regionProgressById = new
LinkedHashMap<>();
+ for (final String regionId : entry.getValue()) {
+ putRegionProgress(
+ regionProgressById,
+ regionId,
+ getConsensusCommittedRegionProgress(consumerGroupId, topicName,
regionId));
+ }
+ if (!regionProgressById.isEmpty()) {
+ result.put(topicName, new TopicProgress(regionProgressById));
+ }
+ }
+ return result;
+ }
+
+ private RegionProgress getConsensusCommittedRegionProgress(
+ final String consumerGroupId, final String topicName, final String
regionId) {
+ try {
+ return ConsensusSubscriptionCommitManager.getInstance()
+ .getCommittedRegionProgress(
+ consumerGroupId, topicName,
ConsensusGroupId.Factory.createFromString(regionId));
+ } catch (final IllegalArgumentException e) {
+ LOGGER.warn(
+ "Subscription: failed to parse consensus region id {} for committed
progress, topic={}, consumerGroup={}",
+ regionId,
+ topicName,
+ consumerGroupId,
+ e);
+ return null;
+ }
+ }
+
+ private void putRegionProgress(
+ final Map<String, RegionProgress> regionProgressById,
+ final String regionId,
+ final RegionProgress regionProgress) {
+ if (Objects.isNull(regionId)
+ || regionId.isEmpty()
+ || Objects.isNull(regionProgress)
+ || regionProgress.getWriterPositions().isEmpty()) {
return;
}
+ regionProgressById.put(regionId, regionProgress);
+ }
- if (isConsensusRuntimeUnsupported(topicName)) {
+ public void seek(
+ final ConsumerConfig consumerConfig, final String topicName, final short
seekType) {
+ final String consumerGroupId = consumerConfig.getConsumerGroupId();
+
+ if (seekType != SubscriptionSeekReq.SEEK_TO_BEGINNING
+ && seekType != SubscriptionSeekReq.SEEK_TO_END) {
final String errorMessage =
- buildUnsupportedConsensusRuntimeMessage(consumerGroupId, topicName,
"seek");
+ String.format(
+ "Subscription: consensus seek only supports beginning/end or
topic progress, "
+ + "consumerGroup=%s, topic=%s, seekType=%s",
+ consumerGroupId, topicName, seekType);
LOGGER.warn(errorMessage);
throw new SubscriptionException(errorMessage);
}
- final String errorMessage =
- String.format(
- "Subscription: seek is only supported for consensus-based
subscriptions, "
- + "consumerGroup=%s, topic=%s",
- consumerGroupId, topicName);
- LOGGER.warn(errorMessage);
- throw new SubscriptionException(errorMessage);
+ final ConsensusSubscriptionBroker consensusBroker =
+ getConsensusBrokerForSeekOrNoOp(consumerGroupId, topicName, "seek");
+ if (Objects.nonNull(consensusBroker)) {
+ consensusBroker.seek(topicName, seekType);
+ }
}
Review Comment:
Done in aa6810e062. ISubscriptionBroker now exposes
acceptsTopic()/selectAcceptedCommitContexts(), so the agent delegates
accept/reject routing to each broker instead of hard-coding broker type
decisions in commit/lease paths.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java:
##########
@@ -507,16 +592,13 @@ public boolean dropBroker(final String consumerGroupId) {
public void bindPrefetchingQueue(final SubscriptionSinkSubtask subtask) {
final String consumerGroupId = subtask.getConsumerGroupId();
consumerGroupIdToPipeBroker
- .compute(
+ .computeIfAbsent(
consumerGroupId,
- (id, broker) -> {
- if (Objects.isNull(broker)) {
- LOGGER.info(
- "Subscription: pipe broker bound to consumer group [{}]
does not exist, create new for binding prefetching queue",
- consumerGroupId);
- return new SubscriptionBroker(consumerGroupId);
- }
- return broker;
+ id -> {
+ LOGGER.info(
+ "Subscription: pipe broker bound to consumer group [{}] does
not exist, create new for binding prefetching queue",
+ consumerGroupId);
Review Comment:
Done in 5ea428025e. The create/drop/non-empty broker log messages now use
DataNodeMiscMessages en/zh constants.
##########
iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/ColumnAlignProcessor.java:
##########
@@ -63,39 +70,72 @@ public List<SubscriptionMessage> flush() {
return Collections.emptyList();
}
- private void fillTablet(final Tablet tablet) {
- final String deviceKey = getDeviceKey(tablet);
- final Map<String, Object> cache = lastValues.computeIfAbsent(deviceKey, k
-> new HashMap<>());
+ @Override
+ public void reset() {
+ lastValues.clear();
+ }
+
+ @Override
+ public boolean supportsTopicScopedReset() {
+ return true;
+ }
+
+ @Override
+ public void reset(final String topicName) {
+ lastValues.remove(topicName);
+ }
+
+ private void fillTablet(final String topicName, final Tablet tablet) {
+ final Map<String, Map<String, Pair<Long, Object>>> topicCache =
+ lastValues.computeIfAbsent(topicName, ignored -> new HashMap<>());
final Object[] values = tablet.getValues();
final BitMap[] bitMaps = tablet.getBitMaps();
final int rowSize = tablet.getRowSize();
final int columnCount = values.length;
for (int row = 0; row < rowSize; row++) {
+ final long timestamp = tablet.getTimestamp(row);
+ final String deviceKey = getDeviceKey(tablet, row);
+ if (deviceKey == null) {
+ continue;
+ }
+ final Map<String, Pair<Long, Object>> cache =
+ topicCache.computeIfAbsent(deviceKey, ignored -> new HashMap<>());
for (int col = 0; col < columnCount; col++) {
final String columnKey = getColumnKey(tablet, col);
final boolean isNull =
bitMaps != null && bitMaps[col] != null &&
bitMaps[col].isMarked(row);
if (isNull) {
- // try forward-fill from cache
- final Object cached = cache.get(columnKey);
- if (cached != null) {
- setValueAt(values[col], row, cached);
+ final Pair<Long, Object> cached = cache.get(columnKey);
+ if (cached != null && cached.left < timestamp) {
+ setValueAt(values[col], row, cached.right);
bitMaps[col].unmark(row);
}
} else {
- // update cache with this non-null value
- cache.put(columnKey, getValueAt(values[col], row));
+ final Pair<Long, Object> cached = cache.get(columnKey);
+ if (cached == null || timestamp >= cached.left) {
+ cache.put(columnKey, new Pair<>(timestamp, getValueAt(values[col],
row)));
+ }
Review Comment:
Done in 5ea428025e. The cached Pair is now updated in place when an existing
cache entry is reused.
##########
iotdb-client/subscription/src/test/java/org/apache/iotdb/session/subscription/consumer/base/ColumnAlignProcessorTest.java:
##########
@@ -80,6 +82,73 @@ public void testNewColumnDoesNotReuseOldIndexCache() {
Assert.assertTrue(schemaChangedTablet.getBitMaps()[1].isMarked(0));
}
+ @Test
+ public void testTableModelForwardFillDoesNotCrossDevices() {
+ final ColumnAlignProcessor processor = new ColumnAlignProcessor();
+
+ processor.process(
+ Collections.singletonList(
+ messageForTablet(
+ tableModelTablet(new long[] {1L}, new String[] {"id1"}, new
Long[] {11L}, false))));
+
+ final Tablet differentDeviceTablet =
+ tableModelTablet(new long[] {2L}, new String[] {"id2"}, new Long[]
{0L}, true);
+
+
processor.process(Collections.singletonList(messageForTablet(differentDeviceTablet)));
+
+ Assert.assertTrue(differentDeviceTablet.getBitMaps()[1].isMarked(0));
+
+ final Tablet sameDeviceTablet =
+ tableModelTablet(new long[] {3L}, new String[] {"id1"}, new Long[]
{0L}, true);
+
+
processor.process(Collections.singletonList(messageForTablet(sameDeviceTablet)));
+
+ Assert.assertEquals(11L, ((long[]) sameDeviceTablet.getValues()[1])[0]);
+ Assert.assertFalse(sameDeviceTablet.getBitMaps()[1].isMarked(0));
+ }
+
+ @Test
+ public void testForwardFillOnlyUsesEarlierTimestamp() {
+ final ColumnAlignProcessor processor = new ColumnAlignProcessor();
+
+ processor.process(
+ Collections.singletonList(
+ messageForTablet(tabletWithSingleRow(new String[] {"s1"}, new
long[] {100L}, 10L))));
+
+ final Tablet olderNullTablet =
+ tabletWithSingleRow(new String[] {"s1"}, new long[] {0L}, 5L, true);
+
+
processor.process(Collections.singletonList(messageForTablet(olderNullTablet)));
+
+ Assert.assertTrue(olderNullTablet.getBitMaps()[0].isMarked(0));
+
+ processor.process(
+ Collections.singletonList(
+ messageForTablet(tabletWithSingleRow(new String[] {"s1"}, new
long[] {50L}, 6L))));
+
+ final Tablet newerNullTablet =
+ tabletWithSingleRow(new String[] {"s1"}, new long[] {0L}, 11L, true);
+
+
processor.process(Collections.singletonList(messageForTablet(newerNullTablet)));
+
+ Assert.assertEquals(100L, ((long[]) newerNullTablet.getValues()[0])[0]);
+ Assert.assertFalse(newerNullTablet.getBitMaps()[0].isMarked(0));
+ }
+
+ @Test
+ public void testTableModelWithoutDeviceIdentityDoesNotForwardFill() {
+ final ColumnAlignProcessor processor = new ColumnAlignProcessor();
+
+ processor.process(
+
Collections.singletonList(messageForTablet(fieldOnlyTableModelTablet(1L, 11L,
false))));
+
+ final Tablet ambiguousTablet = fieldOnlyTableModelTablet(2L, 0L, true);
+
+
processor.process(Collections.singletonList(messageForTablet(ambiguousTablet)));
+
+ Assert.assertTrue(ambiguousTablet.getBitMaps()[0].isMarked(0));
+ }
Review Comment:
Done in 5ea428025e. Table-mode tests now cover both no-tag and all-tag-null
devices, and those rows are forward-filled as the same specific device.
##########
iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/ColumnAlignProcessor.java:
##########
@@ -63,39 +70,72 @@ public List<SubscriptionMessage> flush() {
return Collections.emptyList();
}
- private void fillTablet(final Tablet tablet) {
- final String deviceKey = getDeviceKey(tablet);
- final Map<String, Object> cache = lastValues.computeIfAbsent(deviceKey, k
-> new HashMap<>());
+ @Override
+ public void reset() {
+ lastValues.clear();
+ }
+
+ @Override
+ public boolean supportsTopicScopedReset() {
+ return true;
+ }
+
+ @Override
+ public void reset(final String topicName) {
+ lastValues.remove(topicName);
+ }
+
+ private void fillTablet(final String topicName, final Tablet tablet) {
+ final Map<String, Map<String, Pair<Long, Object>>> topicCache =
+ lastValues.computeIfAbsent(topicName, ignored -> new HashMap<>());
final Object[] values = tablet.getValues();
final BitMap[] bitMaps = tablet.getBitMaps();
final int rowSize = tablet.getRowSize();
final int columnCount = values.length;
for (int row = 0; row < rowSize; row++) {
+ final long timestamp = tablet.getTimestamp(row);
+ final String deviceKey = getDeviceKey(tablet, row);
+ if (deviceKey == null) {
+ continue;
+ }
+ final Map<String, Pair<Long, Object>> cache =
+ topicCache.computeIfAbsent(deviceKey, ignored -> new HashMap<>());
for (int col = 0; col < columnCount; col++) {
final String columnKey = getColumnKey(tablet, col);
final boolean isNull =
bitMaps != null && bitMaps[col] != null &&
bitMaps[col].isMarked(row);
if (isNull) {
- // try forward-fill from cache
- final Object cached = cache.get(columnKey);
- if (cached != null) {
- setValueAt(values[col], row, cached);
+ final Pair<Long, Object> cached = cache.get(columnKey);
+ if (cached != null && cached.left < timestamp) {
+ setValueAt(values[col], row, cached.right);
bitMaps[col].unmark(row);
}
} else {
- // update cache with this non-null value
- cache.put(columnKey, getValueAt(values[col], row));
+ final Pair<Long, Object> cached = cache.get(columnKey);
+ if (cached == null || timestamp >= cached.left) {
+ cache.put(columnKey, new Pair<>(timestamp, getValueAt(values[col],
row)));
+ }
}
}
}
}
- private static String getDeviceKey(final Tablet tablet) {
- // tree model uses deviceId; table model uses tableName
+ private static String getDeviceKey(final Tablet tablet, final int row) {
+ if (hasTableDeviceIdentity(tablet)) {
+ return "table:" + tablet.getDeviceID(row);
+ }
+
final String deviceId = tablet.getDeviceId();
- return deviceId != null ? deviceId : tablet.getTableName();
+ return deviceId != null && deviceId.startsWith(TREE_MODEL_DEVICE_PREFIX)
+ ? "tree:" + deviceId
+ : null;
+ }
+
+ private static boolean hasTableDeviceIdentity(final Tablet tablet) {
+ final List<ColumnCategory> columnTypes = tablet.getColumnTypes();
+ return columnTypes != null && columnTypes.stream().anyMatch(type -> type
== ColumnCategory.TAG);
Review Comment:
Done in 5ea428025e. ColumnAlignProcessor now takes the dialect in the
constructor, removed tag-column auto detection, and table mode keys the cache
by IDeviceID.
##########
iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumer.java:
##########
@@ -279,6 +303,139 @@ protected AbstractSubscriptionPullConsumer addProcessor(
return this;
}
+ @Override
+ List<SubscriptionCommitContext> getProcessorBufferedCommitContexts(final int
dataNodeId) {
+ if (processorBufferedCommitContexts.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ final List<SubscriptionCommitContext> result = new ArrayList<>();
+ for (final SubscriptionCommitContext commitContext :
processorBufferedCommitContexts) {
+ if (Objects.nonNull(commitContext) && commitContext.getDataNodeId() ==
dataNodeId) {
+ result.add(commitContext);
+ }
+ }
+ return result;
+ }
+
+ private void refreshProcessorBufferedCommitContexts() {
+ processorBufferedCommitContexts.clear();
+ for (final SubscriptionMessageProcessor processor : processors) {
+ final List<SubscriptionCommitContext> bufferedCommitContexts =
+ processor.getBufferedCommitContexts();
+ if (Objects.isNull(bufferedCommitContexts)) {
+ continue;
+ }
+ for (final SubscriptionCommitContext commitContext :
bufferedCommitContexts) {
+ if (Objects.nonNull(commitContext) && commitContext.isCommittable()) {
+ processorBufferedCommitContexts.add(commitContext);
+ }
+ }
+ }
+ }
+
+ private List<SubscriptionMessage> drainProcessorPipeline() {
+ List<SubscriptionMessage> drainedMessages = Collections.emptyList();
+ for (final SubscriptionMessageProcessor processor : processors) {
+ if (!drainedMessages.isEmpty()) {
+ drainedMessages = processor.process(drainedMessages);
+ if (Objects.isNull(drainedMessages)) {
+ drainedMessages = Collections.emptyList();
+ }
+ }
+
+ final List<SubscriptionMessage> flushedMessages = processor.flush();
+ if (Objects.nonNull(flushedMessages) && !flushedMessages.isEmpty()) {
+ if (drainedMessages.isEmpty()) {
+ drainedMessages = new ArrayList<>(flushedMessages);
+ } else {
+ drainedMessages = new ArrayList<>(drainedMessages);
+ drainedMessages.addAll(flushedMessages);
+ }
+ }
+ }
Review Comment:
Done in 5ea428025e. The flush() contract now documents ownership transfer,
and drainProcessorPipeline() avoids allocating a new base list unless it really
needs to merge non-empty flush results.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusSubscriptionCommitManager.java:
##########
@@ -146,16 +165,20 @@ private ConsensusSubscriptionCommitManager() {
*/
public ConsensusSubscriptionCommitState getOrCreateState(
final String consumerGroupId, final String topicName, final
ConsensusGroupId regionId) {
- final String key = generateKey(consumerGroupId, topicName, regionId);
final String regionIdString = regionId.toString();
+ final CommitStateKey stateKey = getCommitStateKey(consumerGroupId,
topicName, regionIdString);
+ final ConsensusSubscriptionCommitState existing =
commitStates.get(stateKey.stateKey);
Review Comment:
Done in 5ea428025e. Renamed CommitStateKey.stateKey to encodedStateKey.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusSubscriptionCommitManager.java:
##########
@@ -110,17 +122,24 @@ public class ConsensusSubscriptionCommitManager {
/** Single-threaded executor for fire-and-forget broadcasts. */
private final ExecutorService broadcastExecutor =
- Executors.newSingleThreadExecutor(
- r -> {
- final Thread t = new Thread(r, "SubscriptionProgressBroadcast");
- t.setDaemon(true);
- return t;
- });
+ IoTDBThreadPoolFactory.newSingleThreadExecutor(
+ ThreadName.SUBSCRIPTION_CONSENSUS_PROGRESS_BROADCASTER.getName());
/** Key: "consumerGroupId##topicName##regionId" -> progress tracking state */
private final Map<String, ConsensusSubscriptionCommitState> commitStates =
new ConcurrentHashMap<>();
+ private final Map<String, CommitStateKey> commitStateKeys = new
ConcurrentHashMap<>();
+
+ private final Set<String> recoveredTopicKeys = ConcurrentHashMap.newKeySet();
+
+ private final Map<String, Object> topicPersistLocks = new
ConcurrentHashMap<>();
+
+ // Runtime random-write index. The on-disk .meta file is the only source of
truth; this map is
+ // rebuilt from .meta during recovery and is never persisted separately.
+ private final Map<String, Map<String, ProgressIndexEntry>>
topicProgressIndexes =
+ new ConcurrentHashMap<>();
+
Review Comment:
Done in 5ea428025e. Added comments explaining commitStateKeys and the
outer/inner keys of topicProgressIndexes.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java:
##########
@@ -238,42 +225,136 @@ public List<SubscriptionCommitContext> commit(
return allSuccessful;
}
- public void seek(
- final ConsumerConfig consumerConfig, final String topicName, final short
seekType) {
- final String consumerGroupId = consumerConfig.getConsumerGroupId();
+ public int refreshInFlightEventLeases(
+ final ConsumerConfig consumerConfig,
+ final List<SubscriptionCommitContext> processorBufferedCommitContexts) {
+ if (Objects.isNull(processorBufferedCommitContexts)
+ || processorBufferedCommitContexts.isEmpty()) {
+ return 0;
+ }
- final ConsensusSubscriptionBroker consensusBroker =
+ final String consumerGroupId = consumerConfig.getConsumerGroupId();
+ final String consumerId = consumerConfig.getConsumerId();
+ final ISubscriptionBroker pipeBroker =
consumerGroupIdToPipeBroker.get(consumerGroupId);
+ final ISubscriptionBroker consensusBroker =
consumerGroupIdToConsensusBroker.get(consumerGroupId);
Review Comment:
Done in aa6810e062. SubscriptionBrokerAgent now stores brokers in one
consumerGroupId -> broker list map instead of separate pipe/consensus maps.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusPrefetchingQueue.java:
##########
@@ -1129,7 +1125,40 @@ public SubscriptionEvent pollTablets(
"ConsensusPrefetchingQueue %s: no in-flight event for consumer
%s, commit context %s",
this, consumerId, commitContext));
}
- return event;
+ if (Objects.isNull(event.getCurrentResponse())
+ || event.getCurrentResponse().getResponseType()
+ != SubscriptionPollResponseType.TABLETS.getType()
+ || !(event.getCurrentResponse().getPayload() instanceof
TabletsPayload)) {
+ return generateErrorResponse(
+ String.format(
+ "ConsensusPrefetchingQueue %s: unexpected in-flight response
for consumer %s, "
+ + "commit context %s, offset %s",
Review Comment:
Done in 5ea428025e. The unexpected in-flight consensus response message now
uses a DataNodeMiscMessages i18n constant.
##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java:
##########
Review Comment:
Done in 5ea428025e. The hard-coded consensus group message now uses the
existing i18n message constant.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java:
##########
@@ -397,22 +473,32 @@ private String buildUnsupportedConsensusRuntimeMessage(
}
public boolean isCommitContextOutdated(final SubscriptionCommitContext
commitContext) {
- final String consumerGroupId = commitContext.getConsumerGroupId();
- final String topicName = commitContext.getTopicName();
+ final ISubscriptionBroker broker =
+ getBrokerForTopic(commitContext.getConsumerGroupId(),
commitContext.getTopicName());
+ return Objects.isNull(broker) ||
broker.isCommitContextOutdated(commitContext);
+ }
- // Try consensus broker first
+ private ISubscriptionBroker getBrokerForTopic(
+ final String consumerGroupId, final String topicName) {
final ConsensusSubscriptionBroker consensusBroker =
consumerGroupIdToConsensusBroker.get(consumerGroupId);
if (Objects.nonNull(consensusBroker) &&
consensusBroker.hasQueue(topicName)) {
- return consensusBroker.isCommitContextOutdated(commitContext);
+ return consensusBroker;
}
+ return consumerGroupIdToPipeBroker.get(consumerGroupId);
+ }
- // Fall back to pipe broker
- final SubscriptionBroker pipeBroker =
consumerGroupIdToPipeBroker.get(consumerGroupId);
- if (Objects.isNull(pipeBroker)) {
- return true;
+ private ISubscriptionBroker getBrokerForTopicOrThrow(
+ final String consumerGroupId, final String topicName) {
+ final ISubscriptionBroker broker = getBrokerForTopic(consumerGroupId,
topicName);
+ if (Objects.nonNull(broker)) {
+ return broker;
}
- return pipeBroker.isCommitContextOutdated(commitContext);
+ final String errorMessage =
+ String.format(
+ "Subscription: broker bound to consumer group [%s] does not
exist", consumerGroupId);
Review Comment:
Done in 5ea428025e. The broker messages in this area were moved to
DataNodeMiscMessages with both en/zh entries.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusSubscriptionCommitManager.java:
##########
@@ -529,33 +570,210 @@ public void receiveProgressBroadcast(
// ======================== Helper Methods ========================
- // Use a separator that cannot appear in consumerGroupId, topicName, or
regionId
- // to prevent key collisions (e.g., "a_b" + "c" vs "a" + "b_c").
+ // Kept as the in-memory and ConfigNode sync key separator for the existing
progress protocol.
private static final String KEY_SEPARATOR = "##";
private String generateKey(
final String consumerGroupId, final String topicName, final
ConsensusGroupId regionId) {
- return consumerGroupId + KEY_SEPARATOR + topicName + KEY_SEPARATOR +
regionId.toString();
+ return generateKey(consumerGroupId, topicName, regionId.toString());
}
- private File getProgressFile(final String key) {
- return new File(persistDir, PROGRESS_FILE_PREFIX + key +
PROGRESS_FILE_SUFFIX);
+ private String generateKey(
+ final String consumerGroupId, final String topicName, final String
regionIdStr) {
+ return consumerGroupId + KEY_SEPARATOR + topicName + KEY_SEPARATOR +
regionIdStr;
}
- private ConsensusSubscriptionCommitState tryRecover(final String key, final
String regionIdStr) {
- final File file = getProgressFile(key);
- if (!file.exists()) {
+ private CommitStateKey getCommitStateKey(
+ final String consumerGroupId, final String topicName, final String
regionIdStr) {
+ final String stateKey = generateKey(consumerGroupId, topicName,
regionIdStr);
+ return commitStateKeys.computeIfAbsent(
+ stateKey,
+ ignored ->
+ new CommitStateKey(
+ consumerGroupId,
+ topicName,
+ regionIdStr,
+ stateKey,
+ generateTopicFileKey(consumerGroupId, topicName)));
+ }
+
+ private String generateTopicFileKey(final String consumerGroupId, final
String topicName) {
+ return encodeFileNameComponent(consumerGroupId)
+ + KEY_SEPARATOR
+ + encodeFileNameComponent(topicName);
+ }
+
+ private static String encodeFileNameComponent(final String value) {
+ return Base64.getUrlEncoder()
+ .withoutPadding()
+
.encodeToString(String.valueOf(value).getBytes(StandardCharsets.UTF_8));
+ }
+
+ private static String decodeFileNameComponent(final String value) {
+ switch (value.length() & 3) {
+ case 0:
+ return new String(Base64.getUrlDecoder().decode(value),
StandardCharsets.UTF_8);
+ case 2:
+ return new String(Base64.getUrlDecoder().decode(value + "=="),
StandardCharsets.UTF_8);
+ case 3:
+ return new String(Base64.getUrlDecoder().decode(value + "="),
StandardCharsets.UTF_8);
+ default:
+ throw new IllegalArgumentException("Invalid base64 url component
length");
+ }
+ }
+
+ private static String[] decodeTopicFileKey(final String topicFileKey) {
+ final int separatorIndex = topicFileKey.indexOf(KEY_SEPARATOR);
+ if (separatorIndex < 0) {
return null;
}
- try (final FileInputStream fis = new FileInputStream(file)) {
- final byte[] bytes = new byte[(int) file.length()];
- fis.read(bytes);
- final ByteBuffer buffer = ByteBuffer.wrap(bytes);
- return ConsensusSubscriptionCommitState.deserialize(regionIdStr, buffer);
- } catch (final IOException e) {
- LOGGER.warn("Failed to recover consensus subscription progress from {}",
file, e);
+ try {
+ return new String[] {
+ decodeFileNameComponent(topicFileKey.substring(0, separatorIndex)),
+ decodeFileNameComponent(topicFileKey.substring(separatorIndex +
KEY_SEPARATOR.length()))
+ };
+ } catch (final IllegalArgumentException e) {
+ return null;
+ }
+ }
+
+ private Object getTopicPersistLock(final String topicFileKey) {
+ return topicPersistLocks.computeIfAbsent(topicFileKey, ignored -> new
Object());
+ }
+
+ private File getMetaFile(final String topicFileKey) {
+ return new File(persistDir, PROGRESS_FILE_PREFIX + topicFileKey +
PROGRESS_META_FILE_SUFFIX);
+ }
+
+ private File getLegacyIndexFile(final String topicFileKey) {
+ return new File(
+ persistDir, PROGRESS_FILE_PREFIX + topicFileKey +
LEGACY_PROGRESS_INDEX_FILE_SUFFIX);
+ }
+
+ private void recoverAllTopicStatesIfNeeded() {
+ final File dir = new File(persistDir);
+ final File[] metaFiles =
+ dir.listFiles(
+ (ignored, name) ->
+ name.startsWith(PROGRESS_FILE_PREFIX) &&
name.endsWith(PROGRESS_META_FILE_SUFFIX));
+ if (Objects.isNull(metaFiles)) {
+ return;
+ }
+ for (final File metaFile : metaFiles) {
+ final String topicFileKey = extractTopicFileKey(metaFile.getName());
+ if (Objects.isNull(topicFileKey) ||
recoveredTopicKeys.contains(topicFileKey)) {
+ continue;
+ }
+ final String[] decodedTopicKey = decodeTopicFileKey(topicFileKey);
+ if (Objects.isNull(decodedTopicKey)) {
+ LOGGER.warn(
+ "Skip malformed consensus subscription progress file name {}",
metaFile.getName());
+ continue;
+ }
+ recoverTopicStatesIfNeeded(topicFileKey, decodedTopicKey[0],
decodedTopicKey[1]);
+ }
+ }
+
+ private String extractTopicFileKey(final String metaFileName) {
+ if (!metaFileName.startsWith(PROGRESS_FILE_PREFIX)
+ || !metaFileName.endsWith(PROGRESS_META_FILE_SUFFIX)) {
return null;
}
+ return metaFileName.substring(
+ PROGRESS_FILE_PREFIX.length(), metaFileName.length() -
PROGRESS_META_FILE_SUFFIX.length());
+ }
+
+ private void recoverTopicStatesIfNeeded(final CommitStateKey stateKey) {
+ recoverTopicStatesIfNeeded(stateKey.topicFileKey,
stateKey.consumerGroupId, stateKey.topicName);
+ }
+
+ private void recoverTopicStatesIfNeeded(
+ final String topicFileKey, final String consumerGroupId, final String
topicName) {
+ if (recoveredTopicKeys.contains(topicFileKey)) {
+ return;
+ }
+ synchronized (getTopicPersistLock(topicFileKey)) {
+ if (recoveredTopicKeys.contains(topicFileKey)) {
+ return;
+ }
+ try {
+ final TopicProgressSnapshot snapshot =
readTopicProgressSnapshot(topicFileKey);
+ for (final Map.Entry<String, ConsensusSubscriptionCommitState> entry :
+ snapshot.states.entrySet()) {
+ final CommitStateKey recoveredStateKey =
+ getCommitStateKey(consumerGroupId, topicName, entry.getKey());
+ commitStates.putIfAbsent(recoveredStateKey.stateKey,
entry.getValue());
+ }
+ topicProgressIndexes.put(topicFileKey, snapshot.indexEntries);
+ } catch (final IOException e) {
+ LOGGER.warn(
+ "Failed to recover consensus subscription progress for
consumerGroupId={}, "
+ + "topicName={}",
+ consumerGroupId,
+ topicName,
+ e);
+ topicProgressIndexes.put(topicFileKey, Collections.emptyMap());
+ } finally {
+ recoveredTopicKeys.add(topicFileKey);
+ }
+ }
+ }
+
+ // Recovery reads .meta sequentially and rebuilds the runtime offset index.
+ private TopicProgressSnapshot readTopicProgressSnapshot(final String
topicFileKey)
+ throws IOException {
+ final File metaFile = getMetaFile(topicFileKey);
+ if (!metaFile.exists()) {
+ return TopicProgressSnapshot.empty();
+ }
+ try {
+ final ByteBuffer buffer =
ByteBuffer.wrap(Files.readAllBytes(metaFile.toPath()));
+ final int version = ReadWriteIOUtils.readInt(buffer);
+ if (version != TOPIC_PROGRESS_FILE_VERSION) {
+ throw new IOException(
+ "Unsupported consensus subscription progress file version " +
version);
Review Comment:
Done in 5ea428025e. The unsupported progress file version message now uses
DataNodeMiscMessages en/zh constants.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]