jt2594838 commented on code in PR #17238:
URL: https://github.com/apache/iotdb/pull/17238#discussion_r3345501697


##########
iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumer.java:
##########
@@ -279,6 +303,139 @@ protected AbstractSubscriptionPullConsumer addProcessor(
     return this;
   }
 
+  @Override
+  List<SubscriptionCommitContext> getProcessorBufferedCommitContexts(final int 
dataNodeId) {
+    if (processorBufferedCommitContexts.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    final List<SubscriptionCommitContext> result = new ArrayList<>();
+    for (final SubscriptionCommitContext commitContext : 
processorBufferedCommitContexts) {
+      if (Objects.nonNull(commitContext) && commitContext.getDataNodeId() == 
dataNodeId) {
+        result.add(commitContext);
+      }
+    }
+    return result;
+  }
+
+  private void refreshProcessorBufferedCommitContexts() {
+    processorBufferedCommitContexts.clear();
+    for (final SubscriptionMessageProcessor processor : processors) {
+      final List<SubscriptionCommitContext> bufferedCommitContexts =
+          processor.getBufferedCommitContexts();
+      if (Objects.isNull(bufferedCommitContexts)) {
+        continue;
+      }
+      for (final SubscriptionCommitContext commitContext : 
bufferedCommitContexts) {
+        if (Objects.nonNull(commitContext) && commitContext.isCommittable()) {
+          processorBufferedCommitContexts.add(commitContext);
+        }
+      }
+    }
+  }
+
+  private List<SubscriptionMessage> drainProcessorPipeline() {
+    List<SubscriptionMessage> drainedMessages = Collections.emptyList();
+    for (final SubscriptionMessageProcessor processor : processors) {
+      if (!drainedMessages.isEmpty()) {
+        drainedMessages = processor.process(drainedMessages);
+        if (Objects.isNull(drainedMessages)) {
+          drainedMessages = Collections.emptyList();
+        }
+      }
+
+      final List<SubscriptionMessage> flushedMessages = processor.flush();
+      if (Objects.nonNull(flushedMessages) && !flushedMessages.isEmpty()) {
+        if (drainedMessages.isEmpty()) {
+          drainedMessages = new ArrayList<>(flushedMessages);
+        } else {
+          drainedMessages = new ArrayList<>(drainedMessages);
+          drainedMessages.addAll(flushedMessages);
+        }
+      }
+    }

Review Comment:
   Why is a new `drainedMessages` list allocated every time?
   Will the processor modify the list after `flush()` is called?



##########
iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java:
##########
@@ -526,6 +546,37 @@ void commit(final List<SubscriptionCommitContext> 
subscriptionCommitContexts, fi
       throw new SubscriptionConnectionException(e.getMessage(), e);
     }
     verifyPipeSubscribeSuccess(resp.status);
+    final PipeSubscribeCommitResp commitResp = 
PipeSubscribeCommitResp.fromTPipeSubscribeResp(resp);
+    return new CommitResult(
+        commitResp.getAcceptedCommitContexts(), 
commitResp.getCommittedProgressByTopic());
+  }
+
+  static final class CommitResult {
+
+    private final List<SubscriptionCommitContext> acceptedCommitContexts;
+
+    private final Map<String, TopicProgress> committedProgressByTopic;
+
+    private CommitResult(
+        final List<SubscriptionCommitContext> acceptedCommitContexts,
+        final Map<String, TopicProgress> committedProgressByTopic) {
+      this.acceptedCommitContexts =
+          acceptedCommitContexts == null ? Collections.emptyList() : 
acceptedCommitContexts;
+      this.committedProgressByTopic =
+          committedProgressByTopic == null ? Collections.emptyMap() : 
committedProgressByTopic;
+    }
+
+    static CommitResult empty() {
+      return new CommitResult(Collections.emptyList(), Collections.emptyMap());
+    }

Review Comment:
   Consider adding a static member and reusing it if this method is called often.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java:
##########
@@ -397,22 +473,32 @@ private String buildUnsupportedConsensusRuntimeMessage(
   }
 
   public boolean isCommitContextOutdated(final SubscriptionCommitContext 
commitContext) {
-    final String consumerGroupId = commitContext.getConsumerGroupId();
-    final String topicName = commitContext.getTopicName();
+    final ISubscriptionBroker broker =
+        getBrokerForTopic(commitContext.getConsumerGroupId(), 
commitContext.getTopicName());
+    return Objects.isNull(broker) || 
broker.isCommitContextOutdated(commitContext);
+  }
 
-    // Try consensus broker first
+  private ISubscriptionBroker getBrokerForTopic(
+      final String consumerGroupId, final String topicName) {
     final ConsensusSubscriptionBroker consensusBroker =
         consumerGroupIdToConsensusBroker.get(consumerGroupId);
     if (Objects.nonNull(consensusBroker) && 
consensusBroker.hasQueue(topicName)) {
-      return consensusBroker.isCommitContextOutdated(commitContext);
+      return consensusBroker;
     }
+    return consumerGroupIdToPipeBroker.get(consumerGroupId);
+  }
 
-    // Fall back to pipe broker
-    final SubscriptionBroker pipeBroker = 
consumerGroupIdToPipeBroker.get(consumerGroupId);
-    if (Objects.isNull(pipeBroker)) {
-      return true;
+  private ISubscriptionBroker getBrokerForTopicOrThrow(
+      final String consumerGroupId, final String topicName) {
+    final ISubscriptionBroker broker = getBrokerForTopic(consumerGroupId, 
topicName);
+    if (Objects.nonNull(broker)) {
+      return broker;
     }
-    return pipeBroker.isCommitContextOutdated(commitContext);
+    final String errorMessage =
+        String.format(
+            "Subscription: broker bound to consumer group [%s] does not 
exist", consumerGroupId);

Review Comment:
   Use i18n for this error message.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java:
##########
@@ -507,16 +592,13 @@ public boolean dropBroker(final String consumerGroupId) {
   public void bindPrefetchingQueue(final SubscriptionSinkSubtask subtask) {
     final String consumerGroupId = subtask.getConsumerGroupId();
     consumerGroupIdToPipeBroker
-        .compute(
+        .computeIfAbsent(
             consumerGroupId,
-            (id, broker) -> {
-              if (Objects.isNull(broker)) {
-                LOGGER.info(
-                    "Subscription: pipe broker bound to consumer group [{}] 
does not exist, create new for binding prefetching queue",
-                    consumerGroupId);
-                return new SubscriptionBroker(consumerGroupId);
-              }
-              return broker;
+            id -> {
+              LOGGER.info(
+                  "Subscription: pipe broker bound to consumer group [{}] does 
not exist, create new for binding prefetching queue",
+                  consumerGroupId);

Review Comment:
   Use i18n for this log message.



##########
iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/ColumnAlignProcessor.java:
##########
@@ -63,39 +70,72 @@ public List<SubscriptionMessage> flush() {
     return Collections.emptyList();
   }
 
-  private void fillTablet(final Tablet tablet) {
-    final String deviceKey = getDeviceKey(tablet);
-    final Map<String, Object> cache = lastValues.computeIfAbsent(deviceKey, k 
-> new HashMap<>());
+  @Override
+  public void reset() {
+    lastValues.clear();
+  }
+
+  @Override
+  public boolean supportsTopicScopedReset() {
+    return true;
+  }
+
+  @Override
+  public void reset(final String topicName) {
+    lastValues.remove(topicName);
+  }
+
+  private void fillTablet(final String topicName, final Tablet tablet) {
+    final Map<String, Map<String, Pair<Long, Object>>> topicCache =
+        lastValues.computeIfAbsent(topicName, ignored -> new HashMap<>());
 
     final Object[] values = tablet.getValues();
     final BitMap[] bitMaps = tablet.getBitMaps();
     final int rowSize = tablet.getRowSize();
     final int columnCount = values.length;
 
     for (int row = 0; row < rowSize; row++) {
+      final long timestamp = tablet.getTimestamp(row);
+      final String deviceKey = getDeviceKey(tablet, row);
+      if (deviceKey == null) {
+        continue;
+      }
+      final Map<String, Pair<Long, Object>> cache =
+          topicCache.computeIfAbsent(deviceKey, ignored -> new HashMap<>());
       for (int col = 0; col < columnCount; col++) {
         final String columnKey = getColumnKey(tablet, col);
         final boolean isNull =
             bitMaps != null && bitMaps[col] != null && 
bitMaps[col].isMarked(row);
         if (isNull) {
-          // try forward-fill from cache
-          final Object cached = cache.get(columnKey);
-          if (cached != null) {
-            setValueAt(values[col], row, cached);
+          final Pair<Long, Object> cached = cache.get(columnKey);
+          if (cached != null && cached.left < timestamp) {
+            setValueAt(values[col], row, cached.right);
             bitMaps[col].unmark(row);
           }
         } else {
-          // update cache with this non-null value
-          cache.put(columnKey, getValueAt(values[col], row));
+          final Pair<Long, Object> cached = cache.get(columnKey);
+          if (cached == null || timestamp >= cached.left) {
+            cache.put(columnKey, new Pair<>(timestamp, getValueAt(values[col], 
row)));
+          }

Review Comment:
   If `cached != null`, could we update the cache entry directly?



##########
iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeHeartbeatReq.java:
##########
@@ -40,11 +56,43 @@ public static PipeSubscribeHeartbeatReq 
toTPipeSubscribeReq() {
     return req;
   }
 
+  /**
+   * Serialize the incoming parameters into `PipeSubscribeHeartbeatReq`, 
called by the subscription
+   * client.
+   */
+  public static PipeSubscribeHeartbeatReq toTPipeSubscribeReq(
+      final List<SubscriptionCommitContext> processorBufferedCommitContexts) 
throws IOException {
+    final PipeSubscribeHeartbeatReq req = toTPipeSubscribeReq();
+    req.processorBufferedCommitContexts =
+        Objects.nonNull(processorBufferedCommitContexts)
+            ? processorBufferedCommitContexts
+            : new ArrayList<>();

Review Comment:
   Could this use `Collections.emptyList()` instead?



##########
iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeHeartbeatReq.java:
##########
@@ -40,11 +56,43 @@ public static PipeSubscribeHeartbeatReq 
toTPipeSubscribeReq() {
     return req;
   }
 
+  /**
+   * Serialize the incoming parameters into `PipeSubscribeHeartbeatReq`, 
called by the subscription
+   * client.
+   */
+  public static PipeSubscribeHeartbeatReq toTPipeSubscribeReq(

Review Comment:
   Subscription is no longer restricted to Pipe; consider removing `Pipe` from the
   associated names.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java:
##########
@@ -238,42 +225,136 @@ public List<SubscriptionCommitContext> commit(
     return allSuccessful;
   }
 
-  public void seek(
-      final ConsumerConfig consumerConfig, final String topicName, final short 
seekType) {
-    final String consumerGroupId = consumerConfig.getConsumerGroupId();
+  public int refreshInFlightEventLeases(
+      final ConsumerConfig consumerConfig,
+      final List<SubscriptionCommitContext> processorBufferedCommitContexts) {
+    if (Objects.isNull(processorBufferedCommitContexts)
+        || processorBufferedCommitContexts.isEmpty()) {
+      return 0;
+    }
 
-    final ConsensusSubscriptionBroker consensusBroker =
+    final String consumerGroupId = consumerConfig.getConsumerGroupId();
+    final String consumerId = consumerConfig.getConsumerId();
+    final ISubscriptionBroker pipeBroker = 
consumerGroupIdToPipeBroker.get(consumerGroupId);
+    final ISubscriptionBroker consensusBroker =
         consumerGroupIdToConsensusBroker.get(consumerGroupId);

Review Comment:
   Is it possible to store both in the same map?



##########
iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/ColumnAlignProcessor.java:
##########
@@ -63,39 +70,72 @@ public List<SubscriptionMessage> flush() {
     return Collections.emptyList();
   }
 
-  private void fillTablet(final Tablet tablet) {
-    final String deviceKey = getDeviceKey(tablet);
-    final Map<String, Object> cache = lastValues.computeIfAbsent(deviceKey, k 
-> new HashMap<>());
+  @Override
+  public void reset() {
+    lastValues.clear();
+  }
+
+  @Override
+  public boolean supportsTopicScopedReset() {
+    return true;
+  }
+
+  @Override
+  public void reset(final String topicName) {
+    lastValues.remove(topicName);
+  }
+
+  private void fillTablet(final String topicName, final Tablet tablet) {
+    final Map<String, Map<String, Pair<Long, Object>>> topicCache =
+        lastValues.computeIfAbsent(topicName, ignored -> new HashMap<>());
 
     final Object[] values = tablet.getValues();
     final BitMap[] bitMaps = tablet.getBitMaps();
     final int rowSize = tablet.getRowSize();
     final int columnCount = values.length;
 
     for (int row = 0; row < rowSize; row++) {
+      final long timestamp = tablet.getTimestamp(row);
+      final String deviceKey = getDeviceKey(tablet, row);
+      if (deviceKey == null) {
+        continue;
+      }
+      final Map<String, Pair<Long, Object>> cache =
+          topicCache.computeIfAbsent(deviceKey, ignored -> new HashMap<>());
       for (int col = 0; col < columnCount; col++) {
         final String columnKey = getColumnKey(tablet, col);
         final boolean isNull =
             bitMaps != null && bitMaps[col] != null && 
bitMaps[col].isMarked(row);
         if (isNull) {
-          // try forward-fill from cache
-          final Object cached = cache.get(columnKey);
-          if (cached != null) {
-            setValueAt(values[col], row, cached);
+          final Pair<Long, Object> cached = cache.get(columnKey);
+          if (cached != null && cached.left < timestamp) {
+            setValueAt(values[col], row, cached.right);
             bitMaps[col].unmark(row);
           }
         } else {
-          // update cache with this non-null value
-          cache.put(columnKey, getValueAt(values[col], row));
+          final Pair<Long, Object> cached = cache.get(columnKey);
+          if (cached == null || timestamp >= cached.left) {
+            cache.put(columnKey, new Pair<>(timestamp, getValueAt(values[col], 
row)));
+          }
         }
       }
     }
   }
 
-  private static String getDeviceKey(final Tablet tablet) {
-    // tree model uses deviceId; table model uses tableName
+  private static String getDeviceKey(final Tablet tablet, final int row) {
+    if (hasTableDeviceIdentity(tablet)) {
+      return "table:" + tablet.getDeviceID(row);
+    }
+
     final String deviceId = tablet.getDeviceId();
-    return deviceId != null ? deviceId : tablet.getTableName();
+    return deviceId != null && deviceId.startsWith(TREE_MODEL_DEVICE_PREFIX)
+        ? "tree:" + deviceId
+        : null;
+  }
+
+  private static boolean hasTableDeviceIdentity(final Tablet tablet) {
+    final List<ColumnCategory> columnTypes = tablet.getColumnTypes();
+    return columnTypes != null && columnTypes.stream().anyMatch(type -> type 
== ColumnCategory.TAG);

Review Comment:
   Is it possible to pass the dialect to the processor during construction?
   
   Distinguishing a table from a tree like this is not valid, since a table may
   not have a tag column.
   
   Consider using the IDeviceId directly as the map key, because two devices may be
   represented by the same string.



##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java:
##########


Review Comment:
   Use i18n



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusSubscriptionCommitManager.java:
##########
@@ -529,33 +570,210 @@ public void receiveProgressBroadcast(
 
   // ======================== Helper Methods ========================
 
-  // Use a separator that cannot appear in consumerGroupId, topicName, or 
regionId
-  // to prevent key collisions (e.g., "a_b" + "c" vs "a" + "b_c").
+  // Kept as the in-memory and ConfigNode sync key separator for the existing 
progress protocol.
   private static final String KEY_SEPARATOR = "##";
 
   private String generateKey(
       final String consumerGroupId, final String topicName, final 
ConsensusGroupId regionId) {
-    return consumerGroupId + KEY_SEPARATOR + topicName + KEY_SEPARATOR + 
regionId.toString();
+    return generateKey(consumerGroupId, topicName, regionId.toString());
   }
 
-  private File getProgressFile(final String key) {
-    return new File(persistDir, PROGRESS_FILE_PREFIX + key + 
PROGRESS_FILE_SUFFIX);
+  private String generateKey(
+      final String consumerGroupId, final String topicName, final String 
regionIdStr) {
+    return consumerGroupId + KEY_SEPARATOR + topicName + KEY_SEPARATOR + 
regionIdStr;
   }
 
-  private ConsensusSubscriptionCommitState tryRecover(final String key, final 
String regionIdStr) {
-    final File file = getProgressFile(key);
-    if (!file.exists()) {
+  private CommitStateKey getCommitStateKey(
+      final String consumerGroupId, final String topicName, final String 
regionIdStr) {
+    final String stateKey = generateKey(consumerGroupId, topicName, 
regionIdStr);
+    return commitStateKeys.computeIfAbsent(
+        stateKey,
+        ignored ->
+            new CommitStateKey(
+                consumerGroupId,
+                topicName,
+                regionIdStr,
+                stateKey,
+                generateTopicFileKey(consumerGroupId, topicName)));
+  }
+
+  private String generateTopicFileKey(final String consumerGroupId, final 
String topicName) {
+    return encodeFileNameComponent(consumerGroupId)
+        + KEY_SEPARATOR
+        + encodeFileNameComponent(topicName);
+  }
+
+  private static String encodeFileNameComponent(final String value) {
+    return Base64.getUrlEncoder()
+        .withoutPadding()
+        
.encodeToString(String.valueOf(value).getBytes(StandardCharsets.UTF_8));
+  }
+
+  private static String decodeFileNameComponent(final String value) {
+    switch (value.length() & 3) {
+      case 0:
+        return new String(Base64.getUrlDecoder().decode(value), 
StandardCharsets.UTF_8);
+      case 2:
+        return new String(Base64.getUrlDecoder().decode(value + "=="), 
StandardCharsets.UTF_8);
+      case 3:
+        return new String(Base64.getUrlDecoder().decode(value + "="), 
StandardCharsets.UTF_8);
+      default:
+        throw new IllegalArgumentException("Invalid base64 url component 
length");
+    }
+  }
+
+  private static String[] decodeTopicFileKey(final String topicFileKey) {
+    final int separatorIndex = topicFileKey.indexOf(KEY_SEPARATOR);
+    if (separatorIndex < 0) {
       return null;
     }
-    try (final FileInputStream fis = new FileInputStream(file)) {
-      final byte[] bytes = new byte[(int) file.length()];
-      fis.read(bytes);
-      final ByteBuffer buffer = ByteBuffer.wrap(bytes);
-      return ConsensusSubscriptionCommitState.deserialize(regionIdStr, buffer);
-    } catch (final IOException e) {
-      LOGGER.warn("Failed to recover consensus subscription progress from {}", 
file, e);
+    try {
+      return new String[] {
+        decodeFileNameComponent(topicFileKey.substring(0, separatorIndex)),
+        decodeFileNameComponent(topicFileKey.substring(separatorIndex + 
KEY_SEPARATOR.length()))
+      };
+    } catch (final IllegalArgumentException e) {
+      return null;
+    }
+  }
+
+  private Object getTopicPersistLock(final String topicFileKey) {
+    return topicPersistLocks.computeIfAbsent(topicFileKey, ignored -> new 
Object());
+  }
+
+  private File getMetaFile(final String topicFileKey) {
+    return new File(persistDir, PROGRESS_FILE_PREFIX + topicFileKey + 
PROGRESS_META_FILE_SUFFIX);
+  }
+
+  private File getLegacyIndexFile(final String topicFileKey) {
+    return new File(
+        persistDir, PROGRESS_FILE_PREFIX + topicFileKey + 
LEGACY_PROGRESS_INDEX_FILE_SUFFIX);
+  }
+
+  private void recoverAllTopicStatesIfNeeded() {
+    final File dir = new File(persistDir);
+    final File[] metaFiles =
+        dir.listFiles(
+            (ignored, name) ->
+                name.startsWith(PROGRESS_FILE_PREFIX) && 
name.endsWith(PROGRESS_META_FILE_SUFFIX));
+    if (Objects.isNull(metaFiles)) {
+      return;
+    }
+    for (final File metaFile : metaFiles) {
+      final String topicFileKey = extractTopicFileKey(metaFile.getName());
+      if (Objects.isNull(topicFileKey) || 
recoveredTopicKeys.contains(topicFileKey)) {
+        continue;
+      }
+      final String[] decodedTopicKey = decodeTopicFileKey(topicFileKey);
+      if (Objects.isNull(decodedTopicKey)) {
+        LOGGER.warn(
+            "Skip malformed consensus subscription progress file name {}", 
metaFile.getName());
+        continue;
+      }
+      recoverTopicStatesIfNeeded(topicFileKey, decodedTopicKey[0], 
decodedTopicKey[1]);
+    }
+  }
+
+  private String extractTopicFileKey(final String metaFileName) {
+    if (!metaFileName.startsWith(PROGRESS_FILE_PREFIX)
+        || !metaFileName.endsWith(PROGRESS_META_FILE_SUFFIX)) {
       return null;
     }
+    return metaFileName.substring(
+        PROGRESS_FILE_PREFIX.length(), metaFileName.length() - 
PROGRESS_META_FILE_SUFFIX.length());
+  }
+
+  private void recoverTopicStatesIfNeeded(final CommitStateKey stateKey) {
+    recoverTopicStatesIfNeeded(stateKey.topicFileKey, 
stateKey.consumerGroupId, stateKey.topicName);
+  }
+
+  private void recoverTopicStatesIfNeeded(
+      final String topicFileKey, final String consumerGroupId, final String 
topicName) {
+    if (recoveredTopicKeys.contains(topicFileKey)) {
+      return;
+    }
+    synchronized (getTopicPersistLock(topicFileKey)) {
+      if (recoveredTopicKeys.contains(topicFileKey)) {
+        return;
+      }
+      try {
+        final TopicProgressSnapshot snapshot = 
readTopicProgressSnapshot(topicFileKey);
+        for (final Map.Entry<String, ConsensusSubscriptionCommitState> entry :
+            snapshot.states.entrySet()) {
+          final CommitStateKey recoveredStateKey =
+              getCommitStateKey(consumerGroupId, topicName, entry.getKey());
+          commitStates.putIfAbsent(recoveredStateKey.stateKey, 
entry.getValue());
+        }
+        topicProgressIndexes.put(topicFileKey, snapshot.indexEntries);
+      } catch (final IOException e) {
+        LOGGER.warn(
+            "Failed to recover consensus subscription progress for 
consumerGroupId={}, "
+                + "topicName={}",
+            consumerGroupId,
+            topicName,
+            e);
+        topicProgressIndexes.put(topicFileKey, Collections.emptyMap());
+      } finally {
+        recoveredTopicKeys.add(topicFileKey);
+      }
+    }
+  }
+
+  // Recovery reads .meta sequentially and rebuilds the runtime offset index.
+  private TopicProgressSnapshot readTopicProgressSnapshot(final String 
topicFileKey)
+      throws IOException {
+    final File metaFile = getMetaFile(topicFileKey);
+    if (!metaFile.exists()) {
+      return TopicProgressSnapshot.empty();
+    }
+    try {
+      final ByteBuffer buffer = 
ByteBuffer.wrap(Files.readAllBytes(metaFile.toPath()));
+      final int version = ReadWriteIOUtils.readInt(buffer);
+      if (version != TOPIC_PROGRESS_FILE_VERSION) {
+        throw new IOException(
+            "Unsupported consensus subscription progress file version " + 
version);

Review Comment:
   Use i18n for this exception message.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java:
##########
@@ -238,42 +225,136 @@ public List<SubscriptionCommitContext> commit(
     return allSuccessful;
   }
 
-  public void seek(
-      final ConsumerConfig consumerConfig, final String topicName, final short 
seekType) {
-    final String consumerGroupId = consumerConfig.getConsumerGroupId();
+  public int refreshInFlightEventLeases(
+      final ConsumerConfig consumerConfig,
+      final List<SubscriptionCommitContext> processorBufferedCommitContexts) {
+    if (Objects.isNull(processorBufferedCommitContexts)
+        || processorBufferedCommitContexts.isEmpty()) {
+      return 0;
+    }
 
-    final ConsensusSubscriptionBroker consensusBroker =
+    final String consumerGroupId = consumerConfig.getConsumerGroupId();
+    final String consumerId = consumerConfig.getConsumerId();
+    final ISubscriptionBroker pipeBroker = 
consumerGroupIdToPipeBroker.get(consumerGroupId);
+    final ISubscriptionBroker consensusBroker =
         consumerGroupIdToConsensusBroker.get(consumerGroupId);
-    if (Objects.nonNull(consensusBroker) && 
consensusBroker.hasQueue(topicName)) {
-      ensureConsensusSeekRuntimeAvailable(consumerGroupId, topicName, "seek");
-      if (seekType != PipeSubscribeSeekReq.SEEK_TO_BEGINNING
-          && seekType != PipeSubscribeSeekReq.SEEK_TO_END) {
-        final String errorMessage =
-            String.format(
-                "Subscription: consensus seek only supports beginning/end or 
topic progress, "
-                    + "consumerGroup=%s, topic=%s, seekType=%s",
-                consumerGroupId, topicName, seekType);
-        LOGGER.warn(errorMessage);
-        throw new SubscriptionException(errorMessage);
+
+    final List<SubscriptionCommitContext> pipeContexts = new ArrayList<>();
+    final List<SubscriptionCommitContext> consensusContexts = new 
ArrayList<>();
+    for (final SubscriptionCommitContext ctx : 
processorBufferedCommitContexts) {
+      if (Objects.isNull(ctx) || Objects.isNull(ctx.getTopicName())) {
+        continue;
       }
-      consensusBroker.seek(topicName, seekType);
+      if (Objects.nonNull(consensusBroker)
+          && 
ConsensusSubscriptionSetupHandler.isConsensusBasedTopic(ctx.getTopicName())) {
+        consensusContexts.add(ctx);
+      } else {
+        pipeContexts.add(ctx);
+      }
+    }
+
+    int refreshedCount = 0;
+    if (Objects.nonNull(pipeBroker) && !pipeContexts.isEmpty()) {
+      refreshedCount += pipeBroker.refreshInFlightEventLeases(consumerId, 
pipeContexts);
+    }
+    if (Objects.nonNull(consensusBroker) && !consensusContexts.isEmpty()) {
+      refreshedCount += consensusBroker.refreshInFlightEventLeases(consumerId, 
consensusContexts);
+    }
+    return refreshedCount;
+  }
+
+  public Map<String, TopicProgress> getConsensusCommittedProgressByTopic(
+      final ConsumerConfig consumerConfig,
+      final List<SubscriptionCommitContext> acceptedCommitContexts,
+      final boolean nack) {
+    if (nack || acceptedCommitContexts.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    final Map<String, Set<String>> regionIdsByTopic = new LinkedHashMap<>();
+    for (final SubscriptionCommitContext commitContext : 
acceptedCommitContexts) {
+      if (Objects.isNull(commitContext) || 
Objects.isNull(commitContext.getTopicName())) {
+        continue;
+      }
+      final String topicName = commitContext.getTopicName();
+      if (!ConsensusSubscriptionSetupHandler.isConsensusBasedTopic(topicName)) 
{
+        continue;
+      }
+      final String regionId = commitContext.getRegionId();
+      if (Objects.isNull(regionId) || regionId.isEmpty()) {
+        continue;
+      }
+      regionIdsByTopic.computeIfAbsent(topicName, ignored -> new 
LinkedHashSet<>()).add(regionId);
+    }
+
+    final String consumerGroupId = consumerConfig.getConsumerGroupId();
+    final Map<String, TopicProgress> result = new LinkedHashMap<>();
+    for (final Map.Entry<String, Set<String>> entry : 
regionIdsByTopic.entrySet()) {
+      final String topicName = entry.getKey();
+      final Map<String, RegionProgress> regionProgressById = new 
LinkedHashMap<>();
+      for (final String regionId : entry.getValue()) {
+        putRegionProgress(
+            regionProgressById,
+            regionId,
+            getConsensusCommittedRegionProgress(consumerGroupId, topicName, 
regionId));
+      }
+      if (!regionProgressById.isEmpty()) {
+        result.put(topicName, new TopicProgress(regionProgressById));
+      }
+    }
+    return result;
+  }
+
+  private RegionProgress getConsensusCommittedRegionProgress(
+      final String consumerGroupId, final String topicName, final String 
regionId) {
+    try {
+      return ConsensusSubscriptionCommitManager.getInstance()
+          .getCommittedRegionProgress(
+              consumerGroupId, topicName, 
ConsensusGroupId.Factory.createFromString(regionId));
+    } catch (final IllegalArgumentException e) {
+      LOGGER.warn(
+          "Subscription: failed to parse consensus region id {} for committed 
progress, topic={}, consumerGroup={}",
+          regionId,
+          topicName,
+          consumerGroupId,
+          e);
+      return null;
+    }
+  }
+
+  private void putRegionProgress(
+      final Map<String, RegionProgress> regionProgressById,
+      final String regionId,
+      final RegionProgress regionProgress) {
+    if (Objects.isNull(regionId)
+        || regionId.isEmpty()
+        || Objects.isNull(regionProgress)
+        || regionProgress.getWriterPositions().isEmpty()) {
       return;
     }
+    regionProgressById.put(regionId, regionProgress);
+  }
 
-    if (isConsensusRuntimeUnsupported(topicName)) {
+  public void seek(
+      final ConsumerConfig consumerConfig, final String topicName, final short 
seekType) {
+    final String consumerGroupId = consumerConfig.getConsumerGroupId();
+
+    if (seekType != SubscriptionSeekReq.SEEK_TO_BEGINNING
+        && seekType != SubscriptionSeekReq.SEEK_TO_END) {
       final String errorMessage =
-          buildUnsupportedConsensusRuntimeMessage(consumerGroupId, topicName, 
"seek");
+          String.format(
+              "Subscription: consensus seek only supports beginning/end or 
topic progress, "
+                  + "consumerGroup=%s, topic=%s, seekType=%s",
+              consumerGroupId, topicName, seekType);
       LOGGER.warn(errorMessage);
       throw new SubscriptionException(errorMessage);
     }
 
-    final String errorMessage =
-        String.format(
-            "Subscription: seek is only supported for consensus-based 
subscriptions, "
-                + "consumerGroup=%s, topic=%s",
-            consumerGroupId, topicName);
-    LOGGER.warn(errorMessage);
-    throw new SubscriptionException(errorMessage);
+    final ConsensusSubscriptionBroker consensusBroker =
+        getConsensusBrokerForSeekOrNoOp(consumerGroupId, topicName, "seek");
+    if (Objects.nonNull(consensusBroker)) {
+      consensusBroker.seek(topicName, seekType);
+    }
   }

Review Comment:
   Is it possible to implement the logic (accepting or rejecting) inside each broker?
   Then the agent would no longer need to know which broker it is dealing with.



##########
iotdb-client/subscription/src/test/java/org/apache/iotdb/session/subscription/consumer/base/ColumnAlignProcessorTest.java:
##########
@@ -80,6 +82,73 @@ public void testNewColumnDoesNotReuseOldIndexCache() {
     Assert.assertTrue(schemaChangedTablet.getBitMaps()[1].isMarked(0));
   }
 
+  @Test
+  public void testTableModelForwardFillDoesNotCrossDevices() {
+    final ColumnAlignProcessor processor = new ColumnAlignProcessor();
+
+    processor.process(
+        Collections.singletonList(
+            messageForTablet(
+                tableModelTablet(new long[] {1L}, new String[] {"id1"}, new 
Long[] {11L}, false))));
+
+    final Tablet differentDeviceTablet =
+        tableModelTablet(new long[] {2L}, new String[] {"id2"}, new Long[] 
{0L}, true);
+
+    
processor.process(Collections.singletonList(messageForTablet(differentDeviceTablet)));
+
+    Assert.assertTrue(differentDeviceTablet.getBitMaps()[1].isMarked(0));
+
+    final Tablet sameDeviceTablet =
+        tableModelTablet(new long[] {3L}, new String[] {"id1"}, new Long[] 
{0L}, true);
+
+    
processor.process(Collections.singletonList(messageForTablet(sameDeviceTablet)));
+
+    Assert.assertEquals(11L, ((long[]) sameDeviceTablet.getValues()[1])[0]);
+    Assert.assertFalse(sameDeviceTablet.getBitMaps()[1].isMarked(0));
+  }
+
+  @Test
+  public void testForwardFillOnlyUsesEarlierTimestamp() {
+    final ColumnAlignProcessor processor = new ColumnAlignProcessor();
+
+    processor.process(
+        Collections.singletonList(
+            messageForTablet(tabletWithSingleRow(new String[] {"s1"}, new 
long[] {100L}, 10L))));
+
+    final Tablet olderNullTablet =
+        tabletWithSingleRow(new String[] {"s1"}, new long[] {0L}, 5L, true);
+
+    
processor.process(Collections.singletonList(messageForTablet(olderNullTablet)));
+
+    Assert.assertTrue(olderNullTablet.getBitMaps()[0].isMarked(0));
+
+    processor.process(
+        Collections.singletonList(
+            messageForTablet(tabletWithSingleRow(new String[] {"s1"}, new 
long[] {50L}, 6L))));
+
+    final Tablet newerNullTablet =
+        tabletWithSingleRow(new String[] {"s1"}, new long[] {0L}, 11L, true);
+
+    
processor.process(Collections.singletonList(messageForTablet(newerNullTablet)));
+
+    Assert.assertEquals(100L, ((long[]) newerNullTablet.getValues()[0])[0]);
+    Assert.assertFalse(newerNullTablet.getBitMaps()[0].isMarked(0));
+  }
+
+  @Test
+  public void testTableModelWithoutDeviceIdentityDoesNotForwardFill() {
+    final ColumnAlignProcessor processor = new ColumnAlignProcessor();
+
+    processor.process(
+        
Collections.singletonList(messageForTablet(fieldOnlyTableModelTablet(1L, 11L, 
false))));
+
+    final Tablet ambiguousTablet = fieldOnlyTableModelTablet(2L, 0L, true);
+
+    
processor.process(Collections.singletonList(messageForTablet(ambiguousTablet)));
+
+    Assert.assertTrue(ambiguousTablet.getBitMaps()[0].isMarked(0));
+  }

Review Comment:
   A device with no tags (or with all tags null) is still a specific device, 
so it should be forward-filled as well.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusSubscriptionCommitManager.java:
##########
@@ -146,16 +165,20 @@ private ConsensusSubscriptionCommitManager() {
    */
   public ConsensusSubscriptionCommitState getOrCreateState(
       final String consumerGroupId, final String topicName, final 
ConsensusGroupId regionId) {
-    final String key = generateKey(consumerGroupId, topicName, regionId);
     final String regionIdString = regionId.toString();
+    final CommitStateKey stateKey = getCommitStateKey(consumerGroupId, 
topicName, regionIdString);
+    final ConsensusSubscriptionCommitState existing = 
commitStates.get(stateKey.stateKey);

Review Comment:
   `stateKey.stateKey` reads oddly; please rename one of them.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusSubscriptionCommitManager.java:
##########
@@ -110,17 +122,24 @@ public class ConsensusSubscriptionCommitManager {
 
   /** Single-threaded executor for fire-and-forget broadcasts. */
   private final ExecutorService broadcastExecutor =
-      Executors.newSingleThreadExecutor(
-          r -> {
-            final Thread t = new Thread(r, "SubscriptionProgressBroadcast");
-            t.setDaemon(true);
-            return t;
-          });
+      IoTDBThreadPoolFactory.newSingleThreadExecutor(
+          ThreadName.SUBSCRIPTION_CONSENSUS_PROGRESS_BROADCASTER.getName());
 
   /** Key: "consumerGroupId##topicName##regionId" -> progress tracking state */
   private final Map<String, ConsensusSubscriptionCommitState> commitStates =
       new ConcurrentHashMap<>();
 
+  private final Map<String, CommitStateKey> commitStateKeys = new 
ConcurrentHashMap<>();
+
+  private final Set<String> recoveredTopicKeys = ConcurrentHashMap.newKeySet();
+
+  private final Map<String, Object> topicPersistLocks = new 
ConcurrentHashMap<>();
+
+  // Runtime random-write index. The on-disk .meta file is the only source of 
truth; this map is
+  // rebuilt from .meta during recovery and is never persisted separately.
+  private final Map<String, Map<String, ProgressIndexEntry>> 
topicProgressIndexes =
+      new ConcurrentHashMap<>();
+

Review Comment:
   Explain the key of `commitStateKeys` and the second key of 
`topicProgressIndexes`.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusPrefetchingQueue.java:
##########
@@ -1129,7 +1125,40 @@ public SubscriptionEvent pollTablets(
                 "ConsensusPrefetchingQueue %s: no in-flight event for consumer 
%s, commit context %s",
                 this, consumerId, commitContext));
       }
-      return event;
+      if (Objects.isNull(event.getCurrentResponse())
+          || event.getCurrentResponse().getResponseType()
+              != SubscriptionPollResponseType.TABLETS.getType()
+          || !(event.getCurrentResponse().getPayload() instanceof 
TabletsPayload)) {
+        return generateErrorResponse(
+            String.format(
+                "ConsensusPrefetchingQueue %s: unexpected in-flight response 
for consumer %s, "
+                    + "commit context %s, offset %s",

Review Comment:
   i18n?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusLogToTabletConverter.java:
##########
@@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.broker.consensus;
+
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
+import 
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowsNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
+
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.regex.Pattern;
+
+/** Converts IoTConsensus WAL log entries (InsertNode) to Tablet format for 
subscription. */
+public class ConsensusLogToTabletConverter {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ConsensusLogToTabletConverter.class);
+
+  private final TreePattern treePattern;
+  private final TablePattern tablePattern;
+  private final Pattern tableColumnPattern;
+
+  /**
+   * The actual database name of the DataRegion this converter processes 
(table-model format without
+   * "root." prefix). Null for tree-model topics.
+   */
+  private final String databaseName;
+
+  public ConsensusLogToTabletConverter(
+      final TreePattern treePattern,
+      final TablePattern tablePattern,
+      final Pattern tableColumnPattern,
+      final String databaseName) {
+    this.treePattern = treePattern;
+    this.tablePattern = tablePattern;
+    this.tableColumnPattern = tableColumnPattern;
+    this.databaseName = databaseName;
+  }
+
+  public String getDatabaseName() {
+    return databaseName;
+  }
+
+  static String safeDeviceIdForLog(final InsertNode node) {
+    try {
+      final Object deviceId = node.getDeviceID();
+      return deviceId != null ? deviceId.toString() : "null";
+    } catch (final Exception e) {
+      return "N/A(" + node.getType() + ")";
+    }
+  }
+
+  public List<Tablet> convert(final InsertNode insertNode) {
+    if (Objects.isNull(insertNode)) {
+      return Collections.emptyList();
+    }
+
+    final PlanNodeType nodeType = insertNode.getType();
+    if (nodeType == null) {
+      LOGGER.warn("InsertNode type is null, skipping conversion");
+      return Collections.emptyList();
+    }
+
+    LOGGER.debug(
+        "ConsensusLogToTabletConverter: converting InsertNode type={}, 
deviceId={}",
+        nodeType,
+        safeDeviceIdForLog(insertNode));
+
+    switch (nodeType) {
+      case INSERT_ROW:
+        return convertInsertRowNode((InsertRowNode) insertNode);
+      case INSERT_TABLET:
+        return convertInsertTabletNode((InsertTabletNode) insertNode);
+      case INSERT_ROWS:
+        return convertInsertRowsNode((InsertRowsNode) insertNode);
+      case INSERT_ROWS_OF_ONE_DEVICE:
+        return convertInsertRowsOfOneDeviceNode((InsertRowsOfOneDeviceNode) 
insertNode);
+      case INSERT_MULTI_TABLET:
+        return convertInsertMultiTabletsNode((InsertMultiTabletsNode) 
insertNode);
+      case RELATIONAL_INSERT_ROW:
+        return convertRelationalInsertRowNode((RelationalInsertRowNode) 
insertNode);
+      case RELATIONAL_INSERT_TABLET:
+        return convertRelationalInsertTabletNode((RelationalInsertTabletNode) 
insertNode);
+      case RELATIONAL_INSERT_ROWS:
+        return convertRelationalInsertRowsNode((RelationalInsertRowsNode) 
insertNode);
+      default:
+        LOGGER.debug("Unsupported InsertNode type for subscription: {}", 
nodeType);
+        return Collections.emptyList();
+    }
+  }
+
+  // ======================== Tree Model Conversion ========================
+
+  private List<Tablet> convertInsertRowNode(final InsertRowNode node) {
+    final IDeviceID deviceId = node.getDeviceID();
+
+    // Device-level path filtering
+    if (treePattern != null && !treePattern.mayOverlapWithDevice(deviceId)) {
+      return Collections.emptyList();
+    }
+
+    final long time = node.getTime();
+
+    // Determine which columns match the pattern
+    final String[] measurements = node.getMeasurements();
+    final TSDataType[] dataTypes = node.getDataTypes();
+    final Object[] values = node.getValues();
+    final List<Integer> matchedColumnIndices = 
getMatchedTreeColumnIndices(deviceId, measurements);
+
+    if (matchedColumnIndices.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    // Build Tablet with matched columns
+    final int columnCount = matchedColumnIndices.size();
+    final List<IMeasurementSchema> schemas = new ArrayList<>(columnCount);
+    for (final int colIdx : matchedColumnIndices) {
+      schemas.add(new MeasurementSchema(measurements[colIdx], 
dataTypes[colIdx]));
+    }
+
+    final Tablet tablet = new Tablet(deviceId.toString(), schemas, 1 /* 
maxRowNumber */);
+    tablet.addTimestamp(0, time);
+
+    for (int i = 0; i < columnCount; i++) {
+      final int originalColIdx = matchedColumnIndices.get(i);
+      final Object value = values[originalColIdx];
+      if (value == null) {
+        if (tablet.getBitMaps() == null) {
+          tablet.initBitMaps();
+        }
+        tablet.getBitMaps()[i].mark(0);
+      } else {
+        addValueToTablet(tablet, 0, i, dataTypes[originalColIdx], value);
+      }
+    }
+    tablet.setRowSize(1);
+
+    return Collections.singletonList(tablet);
+  }
+
+  private List<Tablet> convertInsertTabletNode(final InsertTabletNode node) {
+    if (node instanceof RelationalInsertTabletNode) {
+      return convertRelationalInsertTabletNode((RelationalInsertTabletNode) 
node);
+    }
+
+    final IDeviceID deviceId = node.getDeviceID();
+
+    // Device-level path filtering
+    if (treePattern != null && !treePattern.mayOverlapWithDevice(deviceId)) {
+      return Collections.emptyList();
+    }
+
+    final String[] measurements = node.getMeasurements();
+    final TSDataType[] dataTypes = node.getDataTypes();
+    final long[] times = node.getTimes();
+    final Object[] columns = node.getColumns();
+    final BitMap[] bitMaps = node.getBitMaps();
+    final int rowCount = node.getRowCount();
+
+    // Column filtering
+    final List<Integer> matchedColumnIndices = 
getMatchedTreeColumnIndices(deviceId, measurements);
+    if (matchedColumnIndices.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    final int columnCount = matchedColumnIndices.size();
+    final boolean allColumnsMatch = (columnCount == measurements.length);
+
+    // Build schemas (always needed)
+    final List<IMeasurementSchema> schemas = new ArrayList<>(columnCount);
+    for (final int colIdx : matchedColumnIndices) {
+      schemas.add(new MeasurementSchema(measurements[colIdx], 
dataTypes[colIdx]));
+    }
+
+    // Build column arrays and bitmaps using bulk copy
+    final long[] newTimes = Arrays.copyOf(times, rowCount);
+    final Object[] newColumns = new Object[columnCount];
+    final BitMap[] newBitMaps = new BitMap[columnCount];
+
+    for (int i = 0; i < columnCount; i++) {
+      final int originalColIdx = allColumnsMatch ? i : 
matchedColumnIndices.get(i);
+      newColumns[i] = copyColumnArray(dataTypes[originalColIdx], 
columns[originalColIdx], rowCount);
+      if (bitMaps != null && bitMaps[originalColIdx] != null) {
+        newBitMaps[i] = new BitMap(rowCount);
+        BitMap.copyOfRange(bitMaps[originalColIdx], 0, newBitMaps[i], 0, 
rowCount);
+      }
+    }

Review Comment:
   This will consume considerable memory bandwidth when multiple subscriptions 
are present.
   We had better figure out which paths may modify the content of a tablet 
and use a copy-on-write mechanism.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to