Caideyipi commented on code in PR #17238:
URL: https://github.com/apache/iotdb/pull/17238#discussion_r2928803499


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusPrefetchingQueue.java:
##########
@@ -0,0 +1,1420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.broker.consensus;
+
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IoTConsensusRequest;
+import org.apache.iotdb.consensus.iot.IoTConsensusServerImpl;
+import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
+import org.apache.iotdb.db.storageengine.dataregion.wal.node.WALNode;
+import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
+import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload;
+import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
+import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
+import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload;
+
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.record.Tablet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext.INVALID_COMMIT_ID;
+
+/**
+ * A prefetching queue that reads data from IoTConsensus using a hybrid 
approach:
+ *
+ * <ol>
+ *   <li><b>In-memory pending queue</b>: Registered with {@link 
IoTConsensusServerImpl}, receives
+ *       {@link IndexedConsensusRequest} in real-time from the write path 
(same mechanism as
+ *       LogDispatcher). This avoids waiting for WAL flush to disk.
+ *   <li><b>WAL fallback</b>: Uses {@link ConsensusReqReader.ReqIterator} to 
read from WAL files for
+ *       gap-filling (pending queue overflow) or catch-up scenarios.
+ *   <li><b>WAL pinning</b>: Supplies the earliest outstanding (uncommitted) 
search index to {@link
+ *       IoTConsensusServerImpl}, preventing WAL deletion of entries not yet 
consumed by the
+ *       subscription.
+ * </ol>
+ *
+ * <p>A background prefetch thread continuously drains the pending queue, 
converts InsertNode
+ * entries to Tablets via {@link ConsensusLogToTabletConverter}, and enqueues 
{@link
+ * SubscriptionEvent} objects into the prefetchingQueue for consumer polling.
+ *
+ * <p>This design mirrors LogDispatcher's dual-path (pendingEntries + WAL 
reader) but targets
+ * subscription delivery instead of replication.
+ *
+ * <p>Thread safety: Uses a fair {@link ReentrantReadWriteLock} to ensure 
mutual exclusion between
+ * cleanup and other operations (poll, ack, nack), consistent with the 
existing prefetching queue
+ * design.
+ */
+public class ConsensusPrefetchingQueue {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ConsensusPrefetchingQueue.class);
+
+  private final String brokerId; // consumer group id
+  private final String topicName;
+  private final String consensusGroupId;
+
+  private final IoTConsensusServerImpl serverImpl;
+
+  private final ConsensusReqReader consensusReqReader;
+
+  private volatile ConsensusReqReader.ReqIterator reqIterator;
+
+  /**
+   * In-memory pending queue registered with {@link 
IoTConsensusServerImpl#write}. Receives
+   * IndexedConsensusRequest in real-time without waiting for WAL flush. 
Capacity is bounded to
+   * apply back-pressure; overflows are filled from WAL.
+   */
+  private final BlockingQueue<IndexedConsensusRequest> pendingEntries;
+
+  private static final int PENDING_QUEUE_CAPACITY = 4096;
+
+  private final ConsensusLogToTabletConverter converter;
+
+  private final ConsensusSubscriptionCommitManager commitManager;
+
+  /** Commit ID generator, monotonically increasing within this queue's 
lifetime. */
+  private final AtomicLong commitIdGenerator;
+
+  /**
+   * Commit IDs less than or equal to this threshold are considered outdated. 
Updated on creation
+   * and on seek to invalidate all pre-seek events.
+   */
+  private volatile long outdatedCommitIdThreshold;
+
+  private final AtomicLong nextExpectedSearchIndex;
+
+  private final PriorityBlockingQueue<SubscriptionEvent> prefetchingQueue;
+
+  /**
+   * Tracks in-flight events that have been polled but not yet committed. Key: 
(consumerId,
+   * commitContext) -> event.
+   */
+  private final Map<Pair<String, SubscriptionCommitContext>, 
SubscriptionEvent> inFlightEvents;
+
+  /**
+   * Tracks outstanding (uncommitted) events for WAL pinning. Maps commitId to 
the startSearchIndex
+   * of that event batch. The earliest entry's value is supplied to 
IoTConsensusServerImpl to pin
+   * WAL files from deletion.
+   */
+  private final ConcurrentSkipListMap<Long, Long> 
outstandingCommitIdToStartIndex;

Review Comment:
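   Why is it named "outstanding"? The Javadoc describes these entries as uncommitted events that pin the WAL, so a name such as `uncommittedCommitIdToStartIndex` would be clearer.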



##########
iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumer.java:
##########
@@ -195,6 +219,35 @@ protected List<SubscriptionMessage> poll(final Set<String> 
topicNames, final lon
       return messages;
     }
 
+    // Apply processor chain if configured
+    List<SubscriptionMessage> processed = messages;
+    if (!processors.isEmpty()) {
+      for (final SubscriptionMessageProcessor processor : processors) {
+        processed = processor.process(processed);
+      }
+    }
+
+    // Update watermark timestamp before stripping watermark events
+    for (final SubscriptionMessage m : processed) {
+      if (m.getMessageType() == SubscriptionMessageType.WATERMARK.getType()) {
+        final long ts = m.getWatermarkTimestamp();
+        if (ts > latestWatermarkTimestamp) {
+          latestWatermarkTimestamp = ts;
+        }
+      }
+    }
+
+    // Strip system messages — they are only for processors, not for users
+    processed.removeIf(
+        m -> {
+          final short type = m.getMessageType();
+          return type == SubscriptionMessageType.WATERMARK.getType();
+        });
+
+    if (processed.isEmpty()) {
+      return processed;
+    }
+
     // add to uncommitted messages

Review Comment:
   `SubscriptionMessageProcessor` can buffer messages locally (for example `WatermarkProcessor`), but only the post-processor output is tracked/committed here.
   The server still considers the buffered events in-flight and can recycle/re-deliver them after the uncommitted timeout.
   That means the same event can enter the processor more than once, which breaks ordered-consumption correctness.
   We probably need a lease/heartbeat mechanism for processor-buffered events, or another way to keep them pinned until the processor releases them.



##########
iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumer.java:
##########
@@ -135,6 +141,24 @@ public synchronized void close() {
       return;
     }
 
+    // flush all processors and commit any remaining buffered messages
+    if (!processors.isEmpty()) {
+      final List<SubscriptionMessage> flushed = new ArrayList<>();
+      for (final SubscriptionMessageProcessor processor : processors) {
+        final List<SubscriptionMessage> out = processor.flush();
+        if (out != null) {
+          flushed.addAll(out);
+        }
+      }
+      if (!flushed.isEmpty() && autoCommit) {

Review Comment:
   On close we flush processor buffers, but we only commit the flushed messages when `autoCommit == true`.
   In manual-commit mode those flushed messages are neither returned to the caller nor preserved for redelivery, so they can be silently lost on close.
   We should do one of the following: reject close while processor state is buffered in manual mode, return the flushed messages to the caller, or keep them recoverable.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java:
##########
@@ -669,22 +700,90 @@ private TPipeSubscribeResp 
handlePipeSubscribeCommitInternal(final PipeSubscribe
 
     if (Objects.equals(successfulCommitContexts.size(), 
commitContexts.size())) {
       LOGGER.info(
-          "Subscription: consumer {} commit (nack: {}) successfully, commit 
contexts: {}",
+          "Subscription: consumer {} commit (nack: {}) successfully, summary: 
{}",
           consumerConfig,
           nack,
-          commitContexts);
+          summarizeCommitContexts(commitContexts));
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug(
+            "Subscription: consumer {} commit (nack: {}) full commit contexts: 
{}",
+            consumerConfig,
+            nack,
+            commitContexts);
+      }
     } else {
       LOGGER.warn(
-          "Subscription: consumer {} commit (nack: {}) partially successful, 
commit contexts: {}, successful commit contexts: {}",
+          "Subscription: consumer {} commit (nack: {}) partially successful, 
requested summary: {}, successful summary: {}",
           consumerConfig,
           nack,
-          commitContexts,
-          successfulCommitContexts);
+          summarizeCommitContexts(commitContexts),
+          summarizeCommitContexts(successfulCommitContexts));
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug(
+            "Subscription: consumer {} commit (nack: {}) full requested commit 
contexts: {}, full successful commit contexts: {}",
+            consumerConfig,
+            nack,
+            commitContexts,
+            successfulCommitContexts);
+      }
     }
 
     return 
PipeSubscribeCommitResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS);

Review Comment:
   `successfulCommitContexts` is already computed above, but the RPC still returns a generic success status even when only part of the batch was actually committed.
   The client advances its local committed positions after any successful RPC.
   This can move the client checkpoint ahead of the real committed frontier and cause skipped data after recovery/seek.
   We should either return the successful subset to the client or fail the RPC when the batch is only partially committed.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java:
##########
@@ -109,89 +201,319 @@ public List<SubscriptionCommitContext> commit(
       final List<SubscriptionCommitContext> commitContexts,
       final boolean nack) {
     final String consumerGroupId = consumerConfig.getConsumerGroupId();
-    final SubscriptionBroker broker = 
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
-    if (Objects.isNull(broker)) {
+    final String consumerId = consumerConfig.getConsumerId();
+    final List<SubscriptionCommitContext> allSuccessful = new ArrayList<>();
+
+    final SubscriptionBroker pipeBroker = 
consumerGroupIdToPipeBroker.get(consumerGroupId);
+    final ConsensusSubscriptionBroker consensusBroker =
+        consumerGroupIdToConsensusBroker.get(consumerGroupId);
+
+    if (Objects.isNull(pipeBroker) && Objects.isNull(consensusBroker)) {
+      final String errorMessage =
+          String.format("Subscription: no broker bound to consumer group 
[%s]", consumerGroupId);
+      LOGGER.warn(errorMessage);
+      throw new SubscriptionException(errorMessage);
+    }
+
+    // Partition commit contexts by which broker owns the topic.
+    final List<SubscriptionCommitContext> pipeContexts = new ArrayList<>();
+    final List<SubscriptionCommitContext> consensusContexts = new 
ArrayList<>();
+    for (final SubscriptionCommitContext ctx : commitContexts) {
+      final String topicName = ctx.getTopicName();
+      if (Objects.nonNull(consensusBroker)
+          && 
ConsensusSubscriptionSetupHandler.isConsensusBasedTopic(topicName)) {
+        consensusContexts.add(ctx);
+      } else {
+        pipeContexts.add(ctx);
+      }
+    }
+
+    if (Objects.nonNull(pipeBroker) && !pipeContexts.isEmpty()) {
+      allSuccessful.addAll(pipeBroker.commit(consumerId, pipeContexts, nack));
+    }
+    if (Objects.nonNull(consensusBroker) && !consensusContexts.isEmpty()) {
+      allSuccessful.addAll(consensusBroker.commit(consumerId, 
consensusContexts, nack));
+    }
+
+    return allSuccessful;
+  }
+
+  public void seek(
+      final ConsumerConfig consumerConfig, final String topicName, final short 
seekType) {
+    final String consumerGroupId = consumerConfig.getConsumerGroupId();
+

Review Comment:
   `seek/seekAfter` currently requires every available provider to succeed.
   On the server side, however, a DataNode only handles the request if it actually has a local consensus queue for this topic; otherwise it returns an error.
   In a multi-DataNode deployment this makes the operation fail even when some providers simply do not own any relevant local queue.
   The client/server contract needs to be aligned here: either route the request centrally, or treat non-owning nodes as a no-op success.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to