VGalaxies commented on code in PR #17238:
URL: https://github.com/apache/iotdb/pull/17238#discussion_r3346562437


##########
iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java:
##########
@@ -526,6 +546,37 @@ void commit(final List<SubscriptionCommitContext> 
subscriptionCommitContexts, fi
       throw new SubscriptionConnectionException(e.getMessage(), e);
     }
     verifyPipeSubscribeSuccess(resp.status);
+    final PipeSubscribeCommitResp commitResp = 
PipeSubscribeCommitResp.fromTPipeSubscribeResp(resp);
+    return new CommitResult(
+        commitResp.getAcceptedCommitContexts(), 
commitResp.getCommittedProgressByTopic());
+  }
+
+  static final class CommitResult {
+
+    private final List<SubscriptionCommitContext> acceptedCommitContexts;
+
+    private final Map<String, TopicProgress> committedProgressByTopic;
+
+    private CommitResult(
+        final List<SubscriptionCommitContext> acceptedCommitContexts,
+        final Map<String, TopicProgress> committedProgressByTopic) {
+      this.acceptedCommitContexts =
+          acceptedCommitContexts == null ? Collections.emptyList() : 
acceptedCommitContexts;
+      this.committedProgressByTopic =
+          committedProgressByTopic == null ? Collections.emptyMap() : 
committedProgressByTopic;
+    }
+
+    static CommitResult empty() {
+      return new CommitResult(Collections.emptyList(), Collections.emptyMap());
+    }

Review Comment:
   Done in 5ea428025e. CommitResult.empty() now returns a static reusable EMPTY 
instance.



##########
iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties:
##########
@@ -54,6 +54,12 @@ dn_data_region_consensus_port=10760
 schema_replication_factor=1
 data_replication_factor=1
 
+####################
+### Subscription Consensus Configuration
+####################
+
+# subscription_consensus_idle_safe_hlc_interval_ms=10000
+

Review Comment:
   Done in 5ea428025e. This property is no longer added to 
iotdb-system.properties; it remains in the template instead.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java:
##########
@@ -238,42 +225,136 @@ public List<SubscriptionCommitContext> commit(
     return allSuccessful;
   }
 
-  public void seek(
-      final ConsumerConfig consumerConfig, final String topicName, final short 
seekType) {
-    final String consumerGroupId = consumerConfig.getConsumerGroupId();
+  public int refreshInFlightEventLeases(
+      final ConsumerConfig consumerConfig,
+      final List<SubscriptionCommitContext> processorBufferedCommitContexts) {
+    if (Objects.isNull(processorBufferedCommitContexts)
+        || processorBufferedCommitContexts.isEmpty()) {
+      return 0;
+    }
 
-    final ConsensusSubscriptionBroker consensusBroker =
+    final String consumerGroupId = consumerConfig.getConsumerGroupId();
+    final String consumerId = consumerConfig.getConsumerId();
+    final ISubscriptionBroker pipeBroker = 
consumerGroupIdToPipeBroker.get(consumerGroupId);
+    final ISubscriptionBroker consensusBroker =
         consumerGroupIdToConsensusBroker.get(consumerGroupId);
-    if (Objects.nonNull(consensusBroker) && 
consensusBroker.hasQueue(topicName)) {
-      ensureConsensusSeekRuntimeAvailable(consumerGroupId, topicName, "seek");
-      if (seekType != PipeSubscribeSeekReq.SEEK_TO_BEGINNING
-          && seekType != PipeSubscribeSeekReq.SEEK_TO_END) {
-        final String errorMessage =
-            String.format(
-                "Subscription: consensus seek only supports beginning/end or 
topic progress, "
-                    + "consumerGroup=%s, topic=%s, seekType=%s",
-                consumerGroupId, topicName, seekType);
-        LOGGER.warn(errorMessage);
-        throw new SubscriptionException(errorMessage);
+
+    final List<SubscriptionCommitContext> pipeContexts = new ArrayList<>();
+    final List<SubscriptionCommitContext> consensusContexts = new 
ArrayList<>();
+    for (final SubscriptionCommitContext ctx : 
processorBufferedCommitContexts) {
+      if (Objects.isNull(ctx) || Objects.isNull(ctx.getTopicName())) {
+        continue;
       }
-      consensusBroker.seek(topicName, seekType);
+      if (Objects.nonNull(consensusBroker)
+          && 
ConsensusSubscriptionSetupHandler.isConsensusBasedTopic(ctx.getTopicName())) {
+        consensusContexts.add(ctx);
+      } else {
+        pipeContexts.add(ctx);
+      }
+    }
+
+    int refreshedCount = 0;
+    if (Objects.nonNull(pipeBroker) && !pipeContexts.isEmpty()) {
+      refreshedCount += pipeBroker.refreshInFlightEventLeases(consumerId, 
pipeContexts);
+    }
+    if (Objects.nonNull(consensusBroker) && !consensusContexts.isEmpty()) {
+      refreshedCount += consensusBroker.refreshInFlightEventLeases(consumerId, 
consensusContexts);
+    }
+    return refreshedCount;
+  }
+
+  public Map<String, TopicProgress> getConsensusCommittedProgressByTopic(
+      final ConsumerConfig consumerConfig,
+      final List<SubscriptionCommitContext> acceptedCommitContexts,
+      final boolean nack) {
+    if (nack || acceptedCommitContexts.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    final Map<String, Set<String>> regionIdsByTopic = new LinkedHashMap<>();
+    for (final SubscriptionCommitContext commitContext : 
acceptedCommitContexts) {
+      if (Objects.isNull(commitContext) || 
Objects.isNull(commitContext.getTopicName())) {
+        continue;
+      }
+      final String topicName = commitContext.getTopicName();
+      if (!ConsensusSubscriptionSetupHandler.isConsensusBasedTopic(topicName)) 
{
+        continue;
+      }
+      final String regionId = commitContext.getRegionId();
+      if (Objects.isNull(regionId) || regionId.isEmpty()) {
+        continue;
+      }
+      regionIdsByTopic.computeIfAbsent(topicName, ignored -> new 
LinkedHashSet<>()).add(regionId);
+    }
+
+    final String consumerGroupId = consumerConfig.getConsumerGroupId();
+    final Map<String, TopicProgress> result = new LinkedHashMap<>();
+    for (final Map.Entry<String, Set<String>> entry : 
regionIdsByTopic.entrySet()) {
+      final String topicName = entry.getKey();
+      final Map<String, RegionProgress> regionProgressById = new 
LinkedHashMap<>();
+      for (final String regionId : entry.getValue()) {
+        putRegionProgress(
+            regionProgressById,
+            regionId,
+            getConsensusCommittedRegionProgress(consumerGroupId, topicName, 
regionId));
+      }
+      if (!regionProgressById.isEmpty()) {
+        result.put(topicName, new TopicProgress(regionProgressById));
+      }
+    }
+    return result;
+  }
+
+  private RegionProgress getConsensusCommittedRegionProgress(
+      final String consumerGroupId, final String topicName, final String 
regionId) {
+    try {
+      return ConsensusSubscriptionCommitManager.getInstance()
+          .getCommittedRegionProgress(
+              consumerGroupId, topicName, 
ConsensusGroupId.Factory.createFromString(regionId));
+    } catch (final IllegalArgumentException e) {
+      LOGGER.warn(
+          "Subscription: failed to parse consensus region id {} for committed 
progress, topic={}, consumerGroup={}",
+          regionId,
+          topicName,
+          consumerGroupId,
+          e);
+      return null;
+    }
+  }
+
+  private void putRegionProgress(
+      final Map<String, RegionProgress> regionProgressById,
+      final String regionId,
+      final RegionProgress regionProgress) {
+    if (Objects.isNull(regionId)
+        || regionId.isEmpty()
+        || Objects.isNull(regionProgress)
+        || regionProgress.getWriterPositions().isEmpty()) {
       return;
     }
+    regionProgressById.put(regionId, regionProgress);
+  }
 
-    if (isConsensusRuntimeUnsupported(topicName)) {
+  public void seek(
+      final ConsumerConfig consumerConfig, final String topicName, final short 
seekType) {
+    final String consumerGroupId = consumerConfig.getConsumerGroupId();
+
+    if (seekType != SubscriptionSeekReq.SEEK_TO_BEGINNING
+        && seekType != SubscriptionSeekReq.SEEK_TO_END) {
       final String errorMessage =
-          buildUnsupportedConsensusRuntimeMessage(consumerGroupId, topicName, 
"seek");
+          String.format(
+              "Subscription: consensus seek only supports beginning/end or 
topic progress, "
+                  + "consumerGroup=%s, topic=%s, seekType=%s",
+              consumerGroupId, topicName, seekType);
       LOGGER.warn(errorMessage);
       throw new SubscriptionException(errorMessage);
     }
 
-    final String errorMessage =
-        String.format(
-            "Subscription: seek is only supported for consensus-based 
subscriptions, "
-                + "consumerGroup=%s, topic=%s",
-            consumerGroupId, topicName);
-    LOGGER.warn(errorMessage);
-    throw new SubscriptionException(errorMessage);
+    final ConsensusSubscriptionBroker consensusBroker =
+        getConsensusBrokerForSeekOrNoOp(consumerGroupId, topicName, "seek");
+    if (Objects.nonNull(consensusBroker)) {
+      consensusBroker.seek(topicName, seekType);
+    }
   }

Review Comment:
   Done in aa6810e062. ISubscriptionBroker now exposes 
acceptsTopic()/selectAcceptedCommitContexts(), so the agent delegates 
accept/reject routing to each broker instead of hard-coding broker type 
decisions in commit/lease paths.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java:
##########
@@ -507,16 +592,13 @@ public boolean dropBroker(final String consumerGroupId) {
   public void bindPrefetchingQueue(final SubscriptionSinkSubtask subtask) {
     final String consumerGroupId = subtask.getConsumerGroupId();
     consumerGroupIdToPipeBroker
-        .compute(
+        .computeIfAbsent(
             consumerGroupId,
-            (id, broker) -> {
-              if (Objects.isNull(broker)) {
-                LOGGER.info(
-                    "Subscription: pipe broker bound to consumer group [{}] 
does not exist, create new for binding prefetching queue",
-                    consumerGroupId);
-                return new SubscriptionBroker(consumerGroupId);
-              }
-              return broker;
+            id -> {
+              LOGGER.info(
+                  "Subscription: pipe broker bound to consumer group [{}] does 
not exist, create new for binding prefetching queue",
+                  consumerGroupId);

Review Comment:
   Done in 5ea428025e. The create/drop/non-empty broker log messages now use 
DataNodeMiscMessages en/zh constants.



##########
iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/ColumnAlignProcessor.java:
##########
@@ -63,39 +70,72 @@ public List<SubscriptionMessage> flush() {
     return Collections.emptyList();
   }
 
-  private void fillTablet(final Tablet tablet) {
-    final String deviceKey = getDeviceKey(tablet);
-    final Map<String, Object> cache = lastValues.computeIfAbsent(deviceKey, k 
-> new HashMap<>());
+  @Override
+  public void reset() {
+    lastValues.clear();
+  }
+
+  @Override
+  public boolean supportsTopicScopedReset() {
+    return true;
+  }
+
+  @Override
+  public void reset(final String topicName) {
+    lastValues.remove(topicName);
+  }
+
+  private void fillTablet(final String topicName, final Tablet tablet) {
+    final Map<String, Map<String, Pair<Long, Object>>> topicCache =
+        lastValues.computeIfAbsent(topicName, ignored -> new HashMap<>());
 
     final Object[] values = tablet.getValues();
     final BitMap[] bitMaps = tablet.getBitMaps();
     final int rowSize = tablet.getRowSize();
     final int columnCount = values.length;
 
     for (int row = 0; row < rowSize; row++) {
+      final long timestamp = tablet.getTimestamp(row);
+      final String deviceKey = getDeviceKey(tablet, row);
+      if (deviceKey == null) {
+        continue;
+      }
+      final Map<String, Pair<Long, Object>> cache =
+          topicCache.computeIfAbsent(deviceKey, ignored -> new HashMap<>());
       for (int col = 0; col < columnCount; col++) {
         final String columnKey = getColumnKey(tablet, col);
         final boolean isNull =
             bitMaps != null && bitMaps[col] != null && 
bitMaps[col].isMarked(row);
         if (isNull) {
-          // try forward-fill from cache
-          final Object cached = cache.get(columnKey);
-          if (cached != null) {
-            setValueAt(values[col], row, cached);
+          final Pair<Long, Object> cached = cache.get(columnKey);
+          if (cached != null && cached.left < timestamp) {
+            setValueAt(values[col], row, cached.right);
             bitMaps[col].unmark(row);
           }
         } else {
-          // update cache with this non-null value
-          cache.put(columnKey, getValueAt(values[col], row));
+          final Pair<Long, Object> cached = cache.get(columnKey);
+          if (cached == null || timestamp >= cached.left) {
+            cache.put(columnKey, new Pair<>(timestamp, getValueAt(values[col], 
row)));
+          }

Review Comment:
   Done in 5ea428025e. The cached Pair is now updated in place when an existing 
cache entry is reused.



##########
iotdb-client/subscription/src/test/java/org/apache/iotdb/session/subscription/consumer/base/ColumnAlignProcessorTest.java:
##########
@@ -80,6 +82,73 @@ public void testNewColumnDoesNotReuseOldIndexCache() {
     Assert.assertTrue(schemaChangedTablet.getBitMaps()[1].isMarked(0));
   }
 
+  @Test
+  public void testTableModelForwardFillDoesNotCrossDevices() {
+    final ColumnAlignProcessor processor = new ColumnAlignProcessor();
+
+    processor.process(
+        Collections.singletonList(
+            messageForTablet(
+                tableModelTablet(new long[] {1L}, new String[] {"id1"}, new 
Long[] {11L}, false))));
+
+    final Tablet differentDeviceTablet =
+        tableModelTablet(new long[] {2L}, new String[] {"id2"}, new Long[] 
{0L}, true);
+
+    
processor.process(Collections.singletonList(messageForTablet(differentDeviceTablet)));
+
+    Assert.assertTrue(differentDeviceTablet.getBitMaps()[1].isMarked(0));
+
+    final Tablet sameDeviceTablet =
+        tableModelTablet(new long[] {3L}, new String[] {"id1"}, new Long[] 
{0L}, true);
+
+    
processor.process(Collections.singletonList(messageForTablet(sameDeviceTablet)));
+
+    Assert.assertEquals(11L, ((long[]) sameDeviceTablet.getValues()[1])[0]);
+    Assert.assertFalse(sameDeviceTablet.getBitMaps()[1].isMarked(0));
+  }
+
+  @Test
+  public void testForwardFillOnlyUsesEarlierTimestamp() {
+    final ColumnAlignProcessor processor = new ColumnAlignProcessor();
+
+    processor.process(
+        Collections.singletonList(
+            messageForTablet(tabletWithSingleRow(new String[] {"s1"}, new 
long[] {100L}, 10L))));
+
+    final Tablet olderNullTablet =
+        tabletWithSingleRow(new String[] {"s1"}, new long[] {0L}, 5L, true);
+
+    
processor.process(Collections.singletonList(messageForTablet(olderNullTablet)));
+
+    Assert.assertTrue(olderNullTablet.getBitMaps()[0].isMarked(0));
+
+    processor.process(
+        Collections.singletonList(
+            messageForTablet(tabletWithSingleRow(new String[] {"s1"}, new 
long[] {50L}, 6L))));
+
+    final Tablet newerNullTablet =
+        tabletWithSingleRow(new String[] {"s1"}, new long[] {0L}, 11L, true);
+
+    
processor.process(Collections.singletonList(messageForTablet(newerNullTablet)));
+
+    Assert.assertEquals(100L, ((long[]) newerNullTablet.getValues()[0])[0]);
+    Assert.assertFalse(newerNullTablet.getBitMaps()[0].isMarked(0));
+  }
+
+  @Test
+  public void testTableModelWithoutDeviceIdentityDoesNotForwardFill() {
+    final ColumnAlignProcessor processor = new ColumnAlignProcessor();
+
+    processor.process(
+        
Collections.singletonList(messageForTablet(fieldOnlyTableModelTablet(1L, 11L, 
false))));
+
+    final Tablet ambiguousTablet = fieldOnlyTableModelTablet(2L, 0L, true);
+
+    
processor.process(Collections.singletonList(messageForTablet(ambiguousTablet)));
+
+    Assert.assertTrue(ambiguousTablet.getBitMaps()[0].isMarked(0));
+  }

Review Comment:
   Done in 5ea428025e. Table-mode tests now cover both no-tag and all-tag-null 
devices, and those rows are forward-filled as the same specific device.



##########
iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/ColumnAlignProcessor.java:
##########
@@ -63,39 +70,72 @@ public List<SubscriptionMessage> flush() {
     return Collections.emptyList();
   }
 
-  private void fillTablet(final Tablet tablet) {
-    final String deviceKey = getDeviceKey(tablet);
-    final Map<String, Object> cache = lastValues.computeIfAbsent(deviceKey, k 
-> new HashMap<>());
+  @Override
+  public void reset() {
+    lastValues.clear();
+  }
+
+  @Override
+  public boolean supportsTopicScopedReset() {
+    return true;
+  }
+
+  @Override
+  public void reset(final String topicName) {
+    lastValues.remove(topicName);
+  }
+
+  private void fillTablet(final String topicName, final Tablet tablet) {
+    final Map<String, Map<String, Pair<Long, Object>>> topicCache =
+        lastValues.computeIfAbsent(topicName, ignored -> new HashMap<>());
 
     final Object[] values = tablet.getValues();
     final BitMap[] bitMaps = tablet.getBitMaps();
     final int rowSize = tablet.getRowSize();
     final int columnCount = values.length;
 
     for (int row = 0; row < rowSize; row++) {
+      final long timestamp = tablet.getTimestamp(row);
+      final String deviceKey = getDeviceKey(tablet, row);
+      if (deviceKey == null) {
+        continue;
+      }
+      final Map<String, Pair<Long, Object>> cache =
+          topicCache.computeIfAbsent(deviceKey, ignored -> new HashMap<>());
       for (int col = 0; col < columnCount; col++) {
         final String columnKey = getColumnKey(tablet, col);
         final boolean isNull =
             bitMaps != null && bitMaps[col] != null && 
bitMaps[col].isMarked(row);
         if (isNull) {
-          // try forward-fill from cache
-          final Object cached = cache.get(columnKey);
-          if (cached != null) {
-            setValueAt(values[col], row, cached);
+          final Pair<Long, Object> cached = cache.get(columnKey);
+          if (cached != null && cached.left < timestamp) {
+            setValueAt(values[col], row, cached.right);
             bitMaps[col].unmark(row);
           }
         } else {
-          // update cache with this non-null value
-          cache.put(columnKey, getValueAt(values[col], row));
+          final Pair<Long, Object> cached = cache.get(columnKey);
+          if (cached == null || timestamp >= cached.left) {
+            cache.put(columnKey, new Pair<>(timestamp, getValueAt(values[col], 
row)));
+          }
         }
       }
     }
   }
 
-  private static String getDeviceKey(final Tablet tablet) {
-    // tree model uses deviceId; table model uses tableName
+  private static String getDeviceKey(final Tablet tablet, final int row) {
+    if (hasTableDeviceIdentity(tablet)) {
+      return "table:" + tablet.getDeviceID(row);
+    }
+
     final String deviceId = tablet.getDeviceId();
-    return deviceId != null ? deviceId : tablet.getTableName();
+    return deviceId != null && deviceId.startsWith(TREE_MODEL_DEVICE_PREFIX)
+        ? "tree:" + deviceId
+        : null;
+  }
+
+  private static boolean hasTableDeviceIdentity(final Tablet tablet) {
+    final List<ColumnCategory> columnTypes = tablet.getColumnTypes();
+    return columnTypes != null && columnTypes.stream().anyMatch(type -> type 
== ColumnCategory.TAG);

Review Comment:
   Done in 5ea428025e. ColumnAlignProcessor now takes the dialect in the 
constructor, no longer auto-detects tag columns, and in table mode keys the 
cache by IDeviceID.



##########
iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumer.java:
##########
@@ -279,6 +303,139 @@ protected AbstractSubscriptionPullConsumer addProcessor(
     return this;
   }
 
+  @Override
+  List<SubscriptionCommitContext> getProcessorBufferedCommitContexts(final int 
dataNodeId) {
+    if (processorBufferedCommitContexts.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    final List<SubscriptionCommitContext> result = new ArrayList<>();
+    for (final SubscriptionCommitContext commitContext : 
processorBufferedCommitContexts) {
+      if (Objects.nonNull(commitContext) && commitContext.getDataNodeId() == 
dataNodeId) {
+        result.add(commitContext);
+      }
+    }
+    return result;
+  }
+
+  private void refreshProcessorBufferedCommitContexts() {
+    processorBufferedCommitContexts.clear();
+    for (final SubscriptionMessageProcessor processor : processors) {
+      final List<SubscriptionCommitContext> bufferedCommitContexts =
+          processor.getBufferedCommitContexts();
+      if (Objects.isNull(bufferedCommitContexts)) {
+        continue;
+      }
+      for (final SubscriptionCommitContext commitContext : 
bufferedCommitContexts) {
+        if (Objects.nonNull(commitContext) && commitContext.isCommittable()) {
+          processorBufferedCommitContexts.add(commitContext);
+        }
+      }
+    }
+  }
+
+  private List<SubscriptionMessage> drainProcessorPipeline() {
+    List<SubscriptionMessage> drainedMessages = Collections.emptyList();
+    for (final SubscriptionMessageProcessor processor : processors) {
+      if (!drainedMessages.isEmpty()) {
+        drainedMessages = processor.process(drainedMessages);
+        if (Objects.isNull(drainedMessages)) {
+          drainedMessages = Collections.emptyList();
+        }
+      }
+
+      final List<SubscriptionMessage> flushedMessages = processor.flush();
+      if (Objects.nonNull(flushedMessages) && !flushedMessages.isEmpty()) {
+        if (drainedMessages.isEmpty()) {
+          drainedMessages = new ArrayList<>(flushedMessages);
+        } else {
+          drainedMessages = new ArrayList<>(drainedMessages);
+          drainedMessages.addAll(flushedMessages);
+        }
+      }
+    }

Review Comment:
   Done in 5ea428025e. The flush() contract now documents ownership transfer, 
and drainProcessorPipeline() avoids allocating a new base list unless it really 
needs to merge non-empty flush results.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusSubscriptionCommitManager.java:
##########
@@ -146,16 +165,20 @@ private ConsensusSubscriptionCommitManager() {
    */
   public ConsensusSubscriptionCommitState getOrCreateState(
       final String consumerGroupId, final String topicName, final 
ConsensusGroupId regionId) {
-    final String key = generateKey(consumerGroupId, topicName, regionId);
     final String regionIdString = regionId.toString();
+    final CommitStateKey stateKey = getCommitStateKey(consumerGroupId, 
topicName, regionIdString);
+    final ConsensusSubscriptionCommitState existing = 
commitStates.get(stateKey.stateKey);

Review Comment:
   Done in 5ea428025e. Renamed CommitStateKey.stateKey to encodedStateKey.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusSubscriptionCommitManager.java:
##########
@@ -110,17 +122,24 @@ public class ConsensusSubscriptionCommitManager {
 
   /** Single-threaded executor for fire-and-forget broadcasts. */
   private final ExecutorService broadcastExecutor =
-      Executors.newSingleThreadExecutor(
-          r -> {
-            final Thread t = new Thread(r, "SubscriptionProgressBroadcast");
-            t.setDaemon(true);
-            return t;
-          });
+      IoTDBThreadPoolFactory.newSingleThreadExecutor(
+          ThreadName.SUBSCRIPTION_CONSENSUS_PROGRESS_BROADCASTER.getName());
 
   /** Key: "consumerGroupId##topicName##regionId" -> progress tracking state */
   private final Map<String, ConsensusSubscriptionCommitState> commitStates =
       new ConcurrentHashMap<>();
 
+  private final Map<String, CommitStateKey> commitStateKeys = new 
ConcurrentHashMap<>();
+
+  private final Set<String> recoveredTopicKeys = ConcurrentHashMap.newKeySet();
+
+  private final Map<String, Object> topicPersistLocks = new 
ConcurrentHashMap<>();
+
+  // Runtime random-write index. The on-disk .meta file is the only source of 
truth; this map is
+  // rebuilt from .meta during recovery and is never persisted separately.
+  private final Map<String, Map<String, ProgressIndexEntry>> 
topicProgressIndexes =
+      new ConcurrentHashMap<>();
+

Review Comment:
   Done in 5ea428025e. Added comments explaining commitStateKeys and the 
outer/inner keys of topicProgressIndexes.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java:
##########
@@ -238,42 +225,136 @@ public List<SubscriptionCommitContext> commit(
     return allSuccessful;
   }
 
-  public void seek(
-      final ConsumerConfig consumerConfig, final String topicName, final short 
seekType) {
-    final String consumerGroupId = consumerConfig.getConsumerGroupId();
+  public int refreshInFlightEventLeases(
+      final ConsumerConfig consumerConfig,
+      final List<SubscriptionCommitContext> processorBufferedCommitContexts) {
+    if (Objects.isNull(processorBufferedCommitContexts)
+        || processorBufferedCommitContexts.isEmpty()) {
+      return 0;
+    }
 
-    final ConsensusSubscriptionBroker consensusBroker =
+    final String consumerGroupId = consumerConfig.getConsumerGroupId();
+    final String consumerId = consumerConfig.getConsumerId();
+    final ISubscriptionBroker pipeBroker = 
consumerGroupIdToPipeBroker.get(consumerGroupId);
+    final ISubscriptionBroker consensusBroker =
         consumerGroupIdToConsensusBroker.get(consumerGroupId);

Review Comment:
   Done in aa6810e062. SubscriptionBrokerAgent now stores brokers in one 
consumerGroupId -> broker list map instead of separate pipe/consensus maps.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusPrefetchingQueue.java:
##########
@@ -1129,7 +1125,40 @@ public SubscriptionEvent pollTablets(
                 "ConsensusPrefetchingQueue %s: no in-flight event for consumer 
%s, commit context %s",
                 this, consumerId, commitContext));
       }
-      return event;
+      if (Objects.isNull(event.getCurrentResponse())
+          || event.getCurrentResponse().getResponseType()
+              != SubscriptionPollResponseType.TABLETS.getType()
+          || !(event.getCurrentResponse().getPayload() instanceof 
TabletsPayload)) {
+        return generateErrorResponse(
+            String.format(
+                "ConsensusPrefetchingQueue %s: unexpected in-flight response 
for consumer %s, "
+                    + "commit context %s, offset %s",

Review Comment:
   Done in 5ea428025e. The unexpected in-flight consensus response message now 
uses a DataNodeMiscMessages i18n constant.



##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java:
##########


Review Comment:
   Done in 5ea428025e. The hard-coded consensus group message now uses the 
existing i18n message constant.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java:
##########
@@ -397,22 +473,32 @@ private String buildUnsupportedConsensusRuntimeMessage(
   }
 
   public boolean isCommitContextOutdated(final SubscriptionCommitContext 
commitContext) {
-    final String consumerGroupId = commitContext.getConsumerGroupId();
-    final String topicName = commitContext.getTopicName();
+    final ISubscriptionBroker broker =
+        getBrokerForTopic(commitContext.getConsumerGroupId(), 
commitContext.getTopicName());
+    return Objects.isNull(broker) || 
broker.isCommitContextOutdated(commitContext);
+  }
 
-    // Try consensus broker first
+  private ISubscriptionBroker getBrokerForTopic(
+      final String consumerGroupId, final String topicName) {
     final ConsensusSubscriptionBroker consensusBroker =
         consumerGroupIdToConsensusBroker.get(consumerGroupId);
     if (Objects.nonNull(consensusBroker) && 
consensusBroker.hasQueue(topicName)) {
-      return consensusBroker.isCommitContextOutdated(commitContext);
+      return consensusBroker;
     }
+    return consumerGroupIdToPipeBroker.get(consumerGroupId);
+  }
 
-    // Fall back to pipe broker
-    final SubscriptionBroker pipeBroker = 
consumerGroupIdToPipeBroker.get(consumerGroupId);
-    if (Objects.isNull(pipeBroker)) {
-      return true;
+  private ISubscriptionBroker getBrokerForTopicOrThrow(
+      final String consumerGroupId, final String topicName) {
+    final ISubscriptionBroker broker = getBrokerForTopic(consumerGroupId, 
topicName);
+    if (Objects.nonNull(broker)) {
+      return broker;
     }
-    return pipeBroker.isCommitContextOutdated(commitContext);
+    final String errorMessage =
+        String.format(
+            "Subscription: broker bound to consumer group [%s] does not 
exist", consumerGroupId);

Review Comment:
   Done in 5ea428025e. The broker messages in this area were moved to 
DataNodeMiscMessages with both en/zh entries.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusSubscriptionCommitManager.java:
##########
@@ -529,33 +570,210 @@ public void receiveProgressBroadcast(
 
   // ======================== Helper Methods ========================
 
-  // Use a separator that cannot appear in consumerGroupId, topicName, or 
regionId
-  // to prevent key collisions (e.g., "a_b" + "c" vs "a" + "b_c").
+  // Kept as the in-memory and ConfigNode sync key separator for the existing 
progress protocol.
   private static final String KEY_SEPARATOR = "##";
 
   private String generateKey(
       final String consumerGroupId, final String topicName, final 
ConsensusGroupId regionId) {
-    return consumerGroupId + KEY_SEPARATOR + topicName + KEY_SEPARATOR + 
regionId.toString();
+    return generateKey(consumerGroupId, topicName, regionId.toString());
   }
 
-  private File getProgressFile(final String key) {
-    return new File(persistDir, PROGRESS_FILE_PREFIX + key + 
PROGRESS_FILE_SUFFIX);
+  private String generateKey(
+      final String consumerGroupId, final String topicName, final String 
regionIdStr) {
+    return consumerGroupId + KEY_SEPARATOR + topicName + KEY_SEPARATOR + 
regionIdStr;
   }
 
-  private ConsensusSubscriptionCommitState tryRecover(final String key, final 
String regionIdStr) {
-    final File file = getProgressFile(key);
-    if (!file.exists()) {
+  private CommitStateKey getCommitStateKey(
+      final String consumerGroupId, final String topicName, final String 
regionIdStr) {
+    final String stateKey = generateKey(consumerGroupId, topicName, 
regionIdStr);
+    return commitStateKeys.computeIfAbsent(
+        stateKey,
+        ignored ->
+            new CommitStateKey(
+                consumerGroupId,
+                topicName,
+                regionIdStr,
+                stateKey,
+                generateTopicFileKey(consumerGroupId, topicName)));
+  }
+
+  private String generateTopicFileKey(final String consumerGroupId, final 
String topicName) {
+    return encodeFileNameComponent(consumerGroupId)
+        + KEY_SEPARATOR
+        + encodeFileNameComponent(topicName);
+  }
+
+  private static String encodeFileNameComponent(final String value) {
+    return Base64.getUrlEncoder()
+        .withoutPadding()
+        .encodeToString(String.valueOf(value).getBytes(StandardCharsets.UTF_8));
+  }
+
+  private static String decodeFileNameComponent(final String value) {
+    switch (value.length() & 3) {
+      case 0:
+        return new String(Base64.getUrlDecoder().decode(value), 
StandardCharsets.UTF_8);
+      case 2:
+        return new String(Base64.getUrlDecoder().decode(value + "=="), 
StandardCharsets.UTF_8);
+      case 3:
+        return new String(Base64.getUrlDecoder().decode(value + "="), 
StandardCharsets.UTF_8);
+      default:
+        throw new IllegalArgumentException("Invalid base64 url component length");
+    }
+  }
+
+  private static String[] decodeTopicFileKey(final String topicFileKey) {
+    final int separatorIndex = topicFileKey.indexOf(KEY_SEPARATOR);
+    if (separatorIndex < 0) {
       return null;
     }
-    try (final FileInputStream fis = new FileInputStream(file)) {
-      final byte[] bytes = new byte[(int) file.length()];
-      fis.read(bytes);
-      final ByteBuffer buffer = ByteBuffer.wrap(bytes);
-      return ConsensusSubscriptionCommitState.deserialize(regionIdStr, buffer);
-    } catch (final IOException e) {
-      LOGGER.warn("Failed to recover consensus subscription progress from {}", 
file, e);
+    try {
+      return new String[] {
+        decodeFileNameComponent(topicFileKey.substring(0, separatorIndex)),
+        decodeFileNameComponent(topicFileKey.substring(separatorIndex + 
KEY_SEPARATOR.length()))
+      };
+    } catch (final IllegalArgumentException e) {
+      return null;
+    }
+  }
+
+  private Object getTopicPersistLock(final String topicFileKey) {
+    return topicPersistLocks.computeIfAbsent(topicFileKey, ignored -> new 
Object());
+  }
+
+  private File getMetaFile(final String topicFileKey) {
+    return new File(persistDir, PROGRESS_FILE_PREFIX + topicFileKey + 
PROGRESS_META_FILE_SUFFIX);
+  }
+
+  private File getLegacyIndexFile(final String topicFileKey) {
+    return new File(
+        persistDir, PROGRESS_FILE_PREFIX + topicFileKey + 
LEGACY_PROGRESS_INDEX_FILE_SUFFIX);
+  }
+
+  private void recoverAllTopicStatesIfNeeded() {
+    final File dir = new File(persistDir);
+    final File[] metaFiles =
+        dir.listFiles(
+            (ignored, name) ->
+                name.startsWith(PROGRESS_FILE_PREFIX) && 
name.endsWith(PROGRESS_META_FILE_SUFFIX));
+    if (Objects.isNull(metaFiles)) {
+      return;
+    }
+    for (final File metaFile : metaFiles) {
+      final String topicFileKey = extractTopicFileKey(metaFile.getName());
+      if (Objects.isNull(topicFileKey) || 
recoveredTopicKeys.contains(topicFileKey)) {
+        continue;
+      }
+      final String[] decodedTopicKey = decodeTopicFileKey(topicFileKey);
+      if (Objects.isNull(decodedTopicKey)) {
+        LOGGER.warn(
+            "Skip malformed consensus subscription progress file name {}", 
metaFile.getName());
+        continue;
+      }
+      recoverTopicStatesIfNeeded(topicFileKey, decodedTopicKey[0], 
decodedTopicKey[1]);
+    }
+  }
+
+  private String extractTopicFileKey(final String metaFileName) {
+    if (!metaFileName.startsWith(PROGRESS_FILE_PREFIX)
+        || !metaFileName.endsWith(PROGRESS_META_FILE_SUFFIX)) {
       return null;
     }
+    return metaFileName.substring(
+        PROGRESS_FILE_PREFIX.length(), metaFileName.length() - 
PROGRESS_META_FILE_SUFFIX.length());
+  }
+
+  private void recoverTopicStatesIfNeeded(final CommitStateKey stateKey) {
+    recoverTopicStatesIfNeeded(stateKey.topicFileKey, 
stateKey.consumerGroupId, stateKey.topicName);
+  }
+
+  private void recoverTopicStatesIfNeeded(
+      final String topicFileKey, final String consumerGroupId, final String 
topicName) {
+    if (recoveredTopicKeys.contains(topicFileKey)) {
+      return;
+    }
+    synchronized (getTopicPersistLock(topicFileKey)) {
+      if (recoveredTopicKeys.contains(topicFileKey)) {
+        return;
+      }
+      try {
+        final TopicProgressSnapshot snapshot = 
readTopicProgressSnapshot(topicFileKey);
+        for (final Map.Entry<String, ConsensusSubscriptionCommitState> entry :
+            snapshot.states.entrySet()) {
+          final CommitStateKey recoveredStateKey =
+              getCommitStateKey(consumerGroupId, topicName, entry.getKey());
+          commitStates.putIfAbsent(recoveredStateKey.stateKey, 
entry.getValue());
+        }
+        topicProgressIndexes.put(topicFileKey, snapshot.indexEntries);
+      } catch (final IOException e) {
+        LOGGER.warn(
+            "Failed to recover consensus subscription progress for consumerGroupId={}, "
+                + "topicName={}",
+            consumerGroupId,
+            topicName,
+            e);
+        topicProgressIndexes.put(topicFileKey, Collections.emptyMap());
+      } finally {
+        recoveredTopicKeys.add(topicFileKey);
+      }
+    }
+  }
+
+  // Recovery reads .meta sequentially and rebuilds the runtime offset index.
+  private TopicProgressSnapshot readTopicProgressSnapshot(final String 
topicFileKey)
+      throws IOException {
+    final File metaFile = getMetaFile(topicFileKey);
+    if (!metaFile.exists()) {
+      return TopicProgressSnapshot.empty();
+    }
+    try {
+      final ByteBuffer buffer = 
ByteBuffer.wrap(Files.readAllBytes(metaFile.toPath()));
+      final int version = ReadWriteIOUtils.readInt(buffer);
+      if (version != TOPIC_PROGRESS_FILE_VERSION) {
+        throw new IOException(
+            "Unsupported consensus subscription progress file version " + 
version);

Review Comment:
   Done in 5ea428025e. The unsupported progress file version message now uses 
DataNodeMiscMessages en/zh constants.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to