DanielWang2035 commented on code in PR #17238:
URL: https://github.com/apache/iotdb/pull/17238#discussion_r3279162731


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusPrefetchingQueue.java:
##########
@@ -0,0 +1,3443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.broker.consensus;
+
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode;
+import 
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.commons.request.IConsensusRequest;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IoTConsensusRequest;
+import org.apache.iotdb.consensus.iot.IoTConsensusServerImpl;
+import org.apache.iotdb.consensus.iot.SubscriptionWalRetentionPolicy;
+import org.apache.iotdb.consensus.iot.WriterSafeFrontierTracker;
+import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
+import org.apache.iotdb.db.storageengine.dataregion.wal.io.ProgressWALReader;
+import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALMetaData;
+import org.apache.iotdb.db.storageengine.dataregion.wal.node.WALNode;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils;
+import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
+import 
org.apache.iotdb.db.subscription.metric.ConsensusSubscriptionPrefetchingQueueMetrics;
+import 
org.apache.iotdb.db.subscription.task.execution.ConsensusSubscriptionPrefetchExecutor;
+import 
org.apache.iotdb.db.subscription.task.execution.ConsensusSubscriptionPrefetchExecutorManager;
+import org.apache.iotdb.db.subscription.task.subtask.ConsensusPrefetchSubtask;
+import org.apache.iotdb.rpc.subscription.config.TopicConfig;
+import org.apache.iotdb.rpc.subscription.config.TopicConstant;
+import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload;
+import org.apache.iotdb.rpc.subscription.payload.poll.RegionProgress;
+import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
+import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
+import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload;
+import org.apache.iotdb.rpc.subscription.payload.poll.WatermarkPayload;
+import org.apache.iotdb.rpc.subscription.payload.poll.WriterId;
+import org.apache.iotdb.rpc.subscription.payload.poll.WriterProgress;
+
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.record.Tablet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import static 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext.INVALID_COMMIT_ID;
+
+public class ConsensusPrefetchingQueue {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ConsensusPrefetchingQueue.class);
+
+  // ======================== Identity & Collaborators ========================
+
+  private final String brokerId; // consumer group id
+  private final String topicName;
+  private final ConsensusGroupId consensusGroupId;
+
+  private final IoTConsensusServerImpl serverImpl;
+
+  /**
+   * Obtained from {@code serverImpl.getConsensusReqReader()} in the constructor. WAL replay is only
+   * possible when this is a {@code WALNode}: initialization with non-empty region progress fails
+   * otherwise.
+   */
+  private final ConsensusReqReader consensusReqReader;
+
+  private final SubscriptionWalRetentionPolicy retentionPolicy;
+
+  /**
+   * Realtime request buffer handed to {@code serverImpl.registerSubscriptionQueue(...)} in the
+   * constructor; its overridden insertion methods wake the prefetch worker via {@code
+   * requestPrefetch()}.
+   */
+  private final WakeableIndexedConsensusQueue pendingEntries;
+
+  /** Capacity bound of {@link #pendingEntries}. */
+  private static final int PENDING_QUEUE_CAPACITY = 4096;
+
+  private final ConsensusLogToTabletConverter converter;
+
+  private final ConsensusSubscriptionCommitManager commitManager;
+
+  /**
+   * Seek generation counter. As described on {@link #pendingSubscriptionWalResetSearchIndex}, a
+   * prefetch round applies a recorded WAL reset once it observes a new generation.
+   */
+  private final AtomicLong seekGeneration;
+
+  /** Internal WAL reader cursor used only for local replay positioning and deduplication. */
+  private final AtomicLong nextExpectedSearchIndex;
+
+  /** Events ready for delivery, ordered by {@code SubscriptionEvent}'s natural ordering. */
+  private final PriorityBlockingQueue<SubscriptionEvent> prefetchingQueue;
+
+  // NOTE(review): key is presumably (consumerId, commitContext) of delivered-but-uncommitted
+  // events; confirm against the poll/commit paths.
+  private final Map<Pair<String, SubscriptionCommitContext>, SubscriptionEvent> inFlightEvents;
+
+  /**
+   * Bound used by {@code poll}: after a successful poll another prefetch is only requested while
+   * {@link #prefetchingQueue} holds fewer events than this. Read once from {@link
+   * SubscriptionConfig} at class initialization.
+   */
+  private static final int MAX_PREFETCHING_QUEUE_SIZE =
+      SubscriptionConfig.getInstance().getSubscriptionConsensusPrefetchingQueueCapacity();
+
+  private final AtomicLong walGapSkippedEntries = new AtomicLong(0);
+
+  /**
+   * Guards queue state transitions that touch replay positioning, seek state, and lane buffers.
+   * Constructed in fair mode.
+   */
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+  // Lifecycle flags: poll() and requestPrefetch() treat closeRequested or isClosed as closed;
+  // poll() additionally returns nothing while isActive is false.
+  private volatile boolean isClosed = false;
+
+  private volatile boolean closeRequested = false;
+
+  private volatile boolean isActive = true;
+
+  private volatile Set<Integer> activeWriterNodeIds = Collections.emptySet();
+
+  private volatile Set<Integer> runtimeActiveWriterNodeIds = Collections.emptySet();
+
+  // NOTE(review): -1 presumably means "no preferred writer"; confirm with the routing code.
+  private volatile int preferredWriterNodeId = -1;
+
+  private volatile int previousPreferredWriterNodeId = -1;
+
+  // ======================== Routing Runtime Version ========================
+
+  /** Overwritten with {@code initialRuntimeVersion} in the constructor. */
+  private volatile long runtimeVersion = 0;
+
+  private final AtomicLong runtimeVersionChangeCount = new AtomicLong(0);
+
+  // ======================== Unified WAL / Release State ========================
+
+  /**
+   * WAL iterator used for replay. {@code initPrefetch} creates it only when {@link
+   * #consensusReqReader} is a {@code WALNode}.
+   */
+  private volatile ProgressWALIterator subscriptionWALIterator;
+
+  /**
+   * Seek requests must not close/reset the WAL iterator from RPC threads because the prefetch
+   * worker may be reading it concurrently. Instead, seek only records the latest desired reset and
+   * the queue's next prefetch round applies it after observing the new seek generation.
+   */
+  private volatile long pendingSubscriptionWalResetSearchIndex = Long.MIN_VALUE;
+
+  // Seek generation paired with pendingSubscriptionWalResetSearchIndex. Long.MIN_VALUE presumably
+  // means "no pending reset" (same sentinel as the search index); confirm.
+  private volatile long pendingSubscriptionWalResetGeneration = Long.MIN_VALUE;
+
+  // ======================== Watermark ========================
+
+  /** Maximum data timestamp observed across all InsertNodes processed by this queue. */
+  private volatile long maxObservedTimestamp = Long.MIN_VALUE;
+
+  /** Wall-clock time (ms) of last watermark injection. 0 means never injected. */
+  private volatile long lastWatermarkEmitTimeMs = 0;
+
+  /** Number of entries accepted from realtime pending queue. */
+  private final AtomicLong pendingPathAcceptedEntries = new AtomicLong(0);
+
+  /** Number of entries accepted from WAL-backed paths (historical or catch-up). */
+  private final AtomicLong walPathAcceptedEntries = new AtomicLong(0);
+
+  // ======================== Prefetch Worker Binding ========================
+
+  /**
+   * Monitor for (re)binding {@link #prefetchSubtask} and {@link #prefetchExecutor}. It is held by
+   * the slow path of {@code ensurePrefetchSubtaskBound()} and by {@code detachPrefetchSubtask()}.
+   */
+  private final Object prefetchBindingLock = new Object();
+
+  /** Subtask registered with {@link #prefetchExecutor}; volatile for lock-free fast-path reads. */
+  private volatile ConsensusPrefetchSubtask prefetchSubtask;
+
+  /** Executor {@link #prefetchSubtask} is registered with; {@code null} after a detach. */
+  private volatile ConsensusSubscriptionPrefetchExecutor prefetchExecutor;
+
+  /**
+   * Whether the prefetch runtime has been initialized. Starts as false (dormant). Set to true by
+   * {@code initPrefetch}, which {@code poll} invokes on the first poll of an open, active queue
+   * (whether or not that poll carries a region progress hint), or when a seek installs a pending
+   * reset. This keeps queue creation cheap: realtime entries can be buffered immediately while WAL
+   * replay state is only built once the queue is actually activated.
+   */
+  private volatile boolean prefetchInitialized = false;
+
+  /** Non-null while a seek is waiting to be applied; {@code poll} returns nothing meanwhile. */
+  private volatile PendingSeekRequest pendingSeekRequest;
+
+  /** Presumably the batch lingering before delivery; reset by {@code initPrefetch}. */
+  private final DeliveryBatchState lingerBatch = new DeliveryBatchState();
+
+  /** Last {@link #seekGeneration} value taken into account; refreshed by {@code initPrefetch}. */
+  private volatile long observedSeekGeneration;
+
+  // Bookkeeping for periodic stats logging and WAL-gap waiting (meaning inferred from the names;
+  // these fields are maintained by code elsewhere in this class).
+  private volatile long lastStatsLogTimeMs = System.currentTimeMillis();
+
+  private volatile long lastPendingAcceptedEntries = 0L;
+
+  private volatile long lastWalAcceptedEntries = 0L;
+
+  private volatile boolean pendingWalGapRetryRequested = false;
+
+  private volatile long walGapWaitStartTimeMs = 0L;
+
+  private volatile long lastWalGapWaitLogTimeMs = 0L;
+
+  /** Fallback committed region progress from local persisted state. */
+  private final RegionProgress fallbackCommittedRegionProgress;
+
+  /**
+   * Recovery-time per-writer frontiers used to skip already committed entries after restart.
+   * {@code initPrefetch} calls {@code clearRecoveryWriterProgress()} and {@code
+   * installRecoveryWriterProgress(...)} before logging this map's size as recoveryWriterCount.
+   */
+  private final Map<WriterId, WriterProgress> recoveryWriterProgressByWriter =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Source-level dedup frontier for follower-origin entries that do not carry a local searchIndex.
+   * The same request may first arrive through pendingEntries and later become visible from WAL;
+   * once a follower-origin localSeq has already been materialized into queue state, the WAL path
+   * must not materialize it again.
+   */
+  private final Map<WriterLaneId, Long> materializedFollowerProgressByWriter =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Lane state keyed by writer identity. Release gating reasons in terms of writer lanes and safe
+   * frontiers instead of a region-level committed frontier.
+   */
+  private final Map<WriterLaneId, WriterLaneState> writerLanes = new ConcurrentHashMap<>();
+
+  /**
+   * Realtime lane buffers used by both pending replay and WAL catch-up so queue materialization
+   * converges on the same per-writer lane representation before batch delivery.
+   */
+  private final Map<WriterLaneId, NavigableMap<Long, PreparedEntry>> realtimeEntriesByLane =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Local tail position. Initialization starts replay here when no usable region progress exists,
+   * neither persisted nor supplied as a consumer hint. It is also the constructor's initial value
+   * of {@link #nextExpectedSearchIndex}.
+   */
+  private final long fallbackTailSearchIndex;
+
+  /** Local sequence used to represent the position immediately before a writer's first record. */
+  private static final long BEFORE_FIRST_LOCAL_SEQ = -1L;
+
+  // Writer-progress metadata for the current pending/WAL batch being assembled
+  // (batchPhysicalTime and batchWriterNodeId).
+  private volatile long batchPhysicalTime = 0L;
+
+  private volatile int batchWriterNodeId = -1;
+
+  /** Topic order mode; the constructor overwrites this default with the normalized value. */
+  private volatile String orderMode = TopicConstant.ORDER_MODE_DEFAULT_VALUE;
+
+  /**
+   * Outcome of locating a WAL replay start position for a region progress. During initialization
+   * {@code FOUND} and {@code AT_END} both yield a usable start search index, while {@code
+   * LOCATE_MISS} makes initialization fail.
+   */
+  protected enum ReplayLocateStatus {
+    FOUND, AT_END, LOCATE_MISS
+  }
+
+  /** Immutable result of resolving where WAL replay should start. */
+  protected static final class ReplayLocateDecision {
+
+    /** Outcome of the lookup. */
+    private final ReplayLocateStatus status;
+
+    /** Replay start position; {@code Long.MIN_VALUE} for a locate miss. */
+    private final long startSearchIndex;
+
+    /** Region progress to install as the recovery frontier. */
+    private final RegionProgress recoveryRegionProgress;
+
+    /** Human-readable explanation, used in log and error messages. */
+    private final String detail;
+
+    private ReplayLocateDecision(
+        final ReplayLocateStatus status,
+        final long startSearchIndex,
+        final RegionProgress recoveryRegionProgress,
+        final String detail) {
+      this.status = status;
+      this.startSearchIndex = startSearchIndex;
+      this.recoveryRegionProgress = recoveryRegionProgress;
+      this.detail = detail;
+    }
+
+    /** Shared construction path for the named factories below. */
+    private static ReplayLocateDecision of(
+        final ReplayLocateStatus status,
+        final long startSearchIndex,
+        final RegionProgress recoveryRegionProgress,
+        final String detail) {
+      return new ReplayLocateDecision(status, startSearchIndex, recoveryRegionProgress, detail);
+    }
+
+    /** Creates a {@link ReplayLocateStatus#FOUND} decision starting at {@code startSearchIndex}. */
+    static ReplayLocateDecision found(
+        final long startSearchIndex,
+        final RegionProgress recoveryRegionProgress,
+        final String detail) {
+      return of(ReplayLocateStatus.FOUND, startSearchIndex, recoveryRegionProgress, detail);
+    }
+
+    /** Creates a {@link ReplayLocateStatus#AT_END} decision starting at {@code startSearchIndex}. */
+    static ReplayLocateDecision atEnd(
+        final long startSearchIndex,
+        final RegionProgress recoveryRegionProgress,
+        final String detail) {
+      return of(ReplayLocateStatus.AT_END, startSearchIndex, recoveryRegionProgress, detail);
+    }
+
+    /** Creates a {@link ReplayLocateStatus#LOCATE_MISS} decision with a sentinel start index. */
+    static ReplayLocateDecision locateMiss(
+        final RegionProgress recoveryRegionProgress, final String detail) {
+      return of(ReplayLocateStatus.LOCATE_MISS, Long.MIN_VALUE, recoveryRegionProgress, detail);
+    }
+
+    protected ReplayLocateStatus getStatus() {
+      return status;
+    }
+
+    protected long getStartSearchIndex() {
+      return startSearchIndex;
+    }
+
+    protected RegionProgress getRecoveryRegionProgress() {
+      return recoveryRegionProgress;
+    }
+
+    protected String getDetail() {
+      return detail;
+    }
+  }
+
+  private static final class WakeableIndexedConsensusQueue
+      extends LinkedBlockingDeque<IndexedConsensusRequest> {
+
+    private final Runnable wakeupHook;
+
+    private WakeableIndexedConsensusQueue(final int capacity, final Runnable 
wakeupHook) {
+      super(capacity);
+      this.wakeupHook = wakeupHook;
+    }
+
+    @Override
+    public boolean offer(final IndexedConsensusRequest request) {
+      final boolean offered = super.offer(request);
+      if (offered) {
+        wakeupHook.run();
+      }
+      return offered;
+    }
+
+    @Override
+    public void put(final IndexedConsensusRequest request) throws 
InterruptedException {
+      super.put(request);
+      wakeupHook.run();
+    }
+  }
+
+  /**
+   * One-shot completion handle for a seek that is recorded now and applied later.
+   *
+   * <p>{@link #awaitCompletion()} blocks, re-checking every 50 ms, until {@link #complete()} or
+   * {@link #fail(RuntimeException)} has been called. It then rethrows the recorded failure, if any.
+   */
+  private static final class PendingSeekRequest {
+
+    // Seek parameters plus a snapshot of queue state taken when the seek was requested.
+    private final long targetSearchIndex;
+    private final RegionProgress committedRegionProgress;
+    private final String seekReason;
+    private final boolean previousPrefetchInitialized;
+    private final long previousSeekGeneration;
+    private final long targetSeekGeneration;
+
+    // Completion state; the methods of this class access it only under this object's monitor.
+    private boolean completed = false;
+    private RuntimeException failure;
+
+    private PendingSeekRequest(
+        final long targetSearchIndex,
+        final RegionProgress committedRegionProgress,
+        final String seekReason,
+        final boolean previousPrefetchInitialized,
+        final long previousSeekGeneration,
+        final long targetSeekGeneration) {
+      this.targetSearchIndex = targetSearchIndex;
+      this.committedRegionProgress = committedRegionProgress;
+      this.seekReason = seekReason;
+      this.previousPrefetchInitialized = previousPrefetchInitialized;
+      this.previousSeekGeneration = previousSeekGeneration;
+      this.targetSeekGeneration = targetSeekGeneration;
+    }
+
+    /** Marks the seek as done (keeping any recorded failure) and wakes all waiters. */
+    private synchronized void complete() {
+      completed = true;
+      notifyAll();
+    }
+
+    /** Records {@code failure}, then marks the seek as done and wakes all waiters. */
+    private synchronized void fail(final RuntimeException failure) {
+      this.failure = failure;
+      complete();
+    }
+
+    /**
+     * Blocks until the seek is done.
+     *
+     * @throws RuntimeException the recorded failure, or a wrapper if the wait is interrupted (the
+     *     interrupt flag is restored)
+     */
+    private synchronized void awaitCompletion() {
+      try {
+        while (!completed) {
+          wait(50L);
+        }
+      } catch (final InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("Interrupted while waiting for seek application", e);
+      }
+      if (failure != null) {
+        throw failure;
+      }
+    }
+  }
+
+  public ConsensusPrefetchingQueue(
+      final String brokerId,
+      final String topicName,
+      final String orderMode,
+      final ConsensusGroupId consensusGroupId,
+      final IoTConsensusServerImpl serverImpl,
+      final SubscriptionWalRetentionPolicy retentionPolicy,
+      final ConsensusLogToTabletConverter converter,
+      final ConsensusSubscriptionCommitManager commitManager,
+      final RegionProgress fallbackCommittedRegionProgress,
+      final long tailStartSearchIndex,
+      final long initialRuntimeVersion,
+      final boolean initialActive) {
+    this.brokerId = brokerId;
+    this.topicName = topicName;
+    this.consensusGroupId = consensusGroupId;
+    this.serverImpl = serverImpl;
+    this.consensusReqReader = serverImpl.getConsensusReqReader();
+    this.retentionPolicy = retentionPolicy;
+    this.converter = converter;
+    this.commitManager = commitManager;
+    this.fallbackCommittedRegionProgress = fallbackCommittedRegionProgress;
+    this.fallbackTailSearchIndex = tailStartSearchIndex;
+    this.runtimeVersion = initialRuntimeVersion;
+    this.isActive = initialActive;
+    this.orderMode = TopicConfig.normalizeOrderMode(orderMode);
+
+    this.seekGeneration = new AtomicLong(0);
+    this.nextExpectedSearchIndex = new AtomicLong(tailStartSearchIndex);
+
+    this.prefetchingQueue = new PriorityBlockingQueue<>();
+    this.inFlightEvents = new ConcurrentHashMap<>();
+    this.observedSeekGeneration = seekGeneration.get();
+
+    // Register pending queue early so we don't miss real-time writes
+    this.pendingEntries =
+        new WakeableIndexedConsensusQueue(PENDING_QUEUE_CAPACITY, 
this::requestPrefetch);
+    serverImpl.registerSubscriptionQueue(pendingEntries, retentionPolicy);
+
+    LOGGER.info(
+        "ConsensusPrefetchingQueue created (dormant): brokerId={}, 
topicName={}, "
+            + "orderMode={}, consensusGroupId={}, 
fallbackCommittedRegionProgress={}, "
+            + "fallbackTailSearchIndex={}, initialRuntimeVersion={}, 
initialActive={}",
+        brokerId,
+        topicName,
+        this.orderMode,
+        consensusGroupId,
+        fallbackCommittedRegionProgress,
+        tailStartSearchIndex,
+        initialRuntimeVersion,
+        initialActive);
+
+    // Register metrics
+    ConsensusSubscriptionPrefetchingQueueMetrics.getInstance().register(this);
+  }
+
+  // ======================== Lock Operations ========================
+
+  private void acquireReadLock() {
+    lock.readLock().lock();
+  }
+
+  private void releaseReadLock() {
+    lock.readLock().unlock();
+  }
+
+  private void acquireWriteLock() {
+    lock.writeLock().lock();
+  }
+
+  private void releaseWriteLock() {
+    lock.writeLock().unlock();
+  }
+
+  /**
+   * Wakes the prefetch worker, binding a subtask to the current executor first if needed. This is
+   * a no-op once the queue is closing or closed, or when no subtask can be bound.
+   */
+  private void requestPrefetch() {
+    if (!closeRequested && !isClosed) {
+      final ConsensusPrefetchSubtask boundSubtask = ensurePrefetchSubtaskBound();
+      if (boundSubtask != null) {
+        boundSubtask.requestWakeupNow();
+      }
+    }
+  }
+
+  /**
+   * Returns the prefetch subtask bound to the executor currently published by {@link
+   * ConsensusSubscriptionPrefetchExecutorManager}. A new subtask is created and registered when the
+   * existing binding is missing, closed, or belongs to a different executor.
+   *
+   * <p>A lock-free fast path reads the volatile binding fields. The slow path re-checks them under
+   * {@code prefetchBindingLock} before rebinding.
+   *
+   * @return the bound subtask, or {@code null} if the queue is closing/closed, no executor is
+   *     available, or registration with the current executor fails
+   */
+  private ConsensusPrefetchSubtask ensurePrefetchSubtaskBound() {
+    if (closeRequested || isClosed) {
+      return null;
+    }
+
+    final ConsensusSubscriptionPrefetchExecutor currentExecutor =
+        ConsensusSubscriptionPrefetchExecutorManager.getInstance().getExecutor();
+    if (Objects.isNull(currentExecutor)) {
+      return null;
+    }
+
+    // Fast path: reuse a live subtask that is already registered with the current executor.
+    final ConsensusPrefetchSubtask currentSubtask = prefetchSubtask;
+    if (Objects.nonNull(currentSubtask)
+        && prefetchExecutor == currentExecutor
+        && !currentSubtask.isClosed()) {
+      return currentSubtask;
+    }
+
+    synchronized (prefetchBindingLock) {
+      // Re-check under the lock: another thread may have closed the queue or rebound meanwhile.
+      if (closeRequested || isClosed) {
+        return null;
+      }
+
+      if (Objects.nonNull(prefetchSubtask)
+          && prefetchExecutor == currentExecutor
+          && !prefetchSubtask.isClosed()) {
+        return prefetchSubtask;
+      }
+
+      // Deregister a stale binding (different executor or closed subtask), unless its executor has
+      // already been shut down.
+      final ConsensusPrefetchSubtask staleSubtask = prefetchSubtask;
+      final ConsensusSubscriptionPrefetchExecutor staleExecutor = prefetchExecutor;
+      if (Objects.nonNull(staleSubtask)
+          && Objects.nonNull(staleExecutor)
+          && (staleExecutor != currentExecutor || staleSubtask.isClosed())
+          && !staleExecutor.isShutdown()) {
+        staleExecutor.deregister(staleSubtask.getTaskId());
+      }
+
+      final ConsensusPrefetchSubtask newSubtask = new ConsensusPrefetchSubtask(this);
+      if (!currentExecutor.register(newSubtask)) {
+        // NOTE(review): on failure newSubtask is dropped, and prefetchSubtask/prefetchExecutor still
+        // reference the (possibly just deregistered) stale binding. Confirm that newSubtask needs
+        // no cleanup and that a later call is the intended recovery.
+        return null;
+      }
+      prefetchExecutor = currentExecutor;
+      prefetchSubtask = newSubtask;
+      return newSubtask;
+    }
+  }
+
+  /**
+   * Clears the current prefetch binding under {@code prefetchBindingLock} and hands back what was
+   * bound.
+   *
+   * @return pair of (executor, subtask) bound before the call; either element may be {@code null}
+   */
+  private Pair<ConsensusSubscriptionPrefetchExecutor, ConsensusPrefetchSubtask>
+      detachPrefetchSubtask() {
+    synchronized (prefetchBindingLock) {
+      final ConsensusSubscriptionPrefetchExecutor detachedExecutor = prefetchExecutor;
+      final ConsensusPrefetchSubtask detachedSubtask = prefetchSubtask;
+      prefetchExecutor = null;
+      prefetchSubtask = null;
+      return new Pair<>(detachedExecutor, detachedSubtask);
+    }
+  }
+
+  /**
+   * Decides whether an empty poll should call {@code requestPrefetch()} to repair a lost prefetch
+   * binding.
+   *
+   * <p>This returns true only when all of the following hold:
+   *
+   * <ul>
+   *   <li>The queue is initialized and open, and no seek is pending.
+   *   <li>An executor is available.
+   *   <li>The current binding is missing or stale: no subtask, a closed subtask, no executor, a
+   *       shut-down executor, or an executor other than the current one.
+   *   <li>There is still work the worker would act on.
+   * </ul>
+   *
+   * <p>{@code prefetchExecutor} is volatile and may be cleared concurrently by {@code
+   * detachPrefetchSubtask()}. It is therefore read exactly once into a local. Re-reading it between
+   * the null check and {@code isShutdown()} could throw a {@link NullPointerException}.
+   */
+  private boolean shouldRecoverPrefetchBindingAfterEmptyPoll() {
+    if (!prefetchInitialized || isClosed || closeRequested || pendingSeekRequest != null) {
+      return false;
+    }
+
+    final ConsensusSubscriptionPrefetchExecutor currentExecutor =
+        ConsensusSubscriptionPrefetchExecutorManager.getInstance().getExecutor();
+    if (Objects.isNull(currentExecutor)) {
+      return false;
+    }
+
+    // Read each volatile binding field exactly once.
+    final ConsensusPrefetchSubtask currentSubtask = prefetchSubtask;
+    final ConsensusSubscriptionPrefetchExecutor boundExecutor = prefetchExecutor;
+    final boolean bindingMissing =
+        Objects.isNull(currentSubtask)
+            || currentSubtask.isClosed()
+            || Objects.isNull(boundExecutor)
+            || boundExecutor.isShutdown()
+            || boundExecutor != currentExecutor;
+    if (!bindingMissing) {
+      return false;
+    }
+
+    // Only worth waking the worker when it would find something to do.
+    return hasImmediatePrefetchableWork()
+        || hasHistoricalWalLag()
+        || !lingerBatch.isEmpty()
+        || !inFlightEvents.isEmpty()
+        || computeWatermarkDelayMs() > 0L;
+  }
+
+  // ======================== Poll ========================
+
+  /** Polls without a region progress hint; equivalent to {@code poll(consumerId, null)}. */
+  public SubscriptionEvent poll(final String consumerId) {
+    final RegionProgress noProgressHint = null;
+    return poll(consumerId, noProgressHint);
+  }
+
+  /**
+   * Polls the next event for {@code consumerId} while holding the shared side of the queue lock.
+   *
+   * <p>Returns {@code null} when the queue is closed, closing, inactive, or waiting for a seek to
+   * be applied. On the first poll of an open, active queue, prefetching is initialized lazily,
+   * using {@code regionProgress} as an optional hint. After a successful poll, another prefetch is
+   * requested while the prefetching queue is below its capacity. After an empty poll, one is
+   * requested only if the prefetch binding needs recovery.
+   *
+   * @throws IllegalStateException propagated from initialization when the replay start cannot be
+   *     resolved
+   */
+  public SubscriptionEvent poll(final String consumerId, final RegionProgress regionProgress) {
+    acquireReadLock();
+    try {
+      final boolean unavailable = isClosed || closeRequested || !isActive;
+      if (unavailable) {
+        return null;
+      }
+      if (!prefetchInitialized) {
+        initPrefetch(regionProgress);
+      }
+      if (pendingSeekRequest != null) {
+        return null;
+      }
+
+      final SubscriptionEvent event = pollInternal(consumerId);
+      final boolean wantPrefetch =
+          Objects.nonNull(event)
+              ? prefetchingQueue.size() < MAX_PREFETCHING_QUEUE_SIZE
+              : shouldRecoverPrefetchBindingAfterEmptyPoll();
+      if (wantPrefetch) {
+        requestPrefetch();
+      }
+      return event;
+    } finally {
+      releaseReadLock();
+    }
+  }
+
+  /**
+   * Lazily builds the replay/prefetch runtime on first use. It runs these steps in order:
+   *
+   * <ol>
+   *   <li>Resolves the recovery region progress: the committed progress, merged with the consumer
+   *       hint when that hint has a writer ahead of, or missing from, the committed progress.
+   *   <li>Locates the WAL replay start and installs per-writer recovery frontiers.
+   *   <li>Creates the WAL iterator when the reader is a {@link WALNode}.
+   *   <li>Requests a prefetch round.
+   * </ol>
+   *
+   * <p>{@code prefetchInitialized} is volatile and is read without this monitor, for example by
+   * {@code poll}. It is therefore published only after every other piece of init state has been
+   * written: the replay cursor, iterator, observed seek generation, linger batch and batch writer
+   * progress. A thread that observes {@code true} thus also observes that state.
+   *
+   * @param regionProgress optional consumer progress hint; may be {@code null} or empty
+   * @throws IllegalStateException if non-empty recovery progress cannot be mapped to a replay start
+   */
+  private synchronized void initPrefetch(final RegionProgress regionProgress) {
+    if (prefetchInitialized) {
+      return; // double-check under synchronization
+    }
+
+    final RegionProgress committedRegionProgress = resolveCommittedRegionProgressForInit();
+    final boolean useConsumerHint =
+        shouldUseConsumerRegionProgressHint(regionProgress, committedRegionProgress);
+    final RegionProgress recoveryRegionProgress =
+        useConsumerHint
+            ? mergeRecoveryRegionProgress(committedRegionProgress, regionProgress)
+            : committedRegionProgress;
+    final String progressSource =
+        describeInitProgressSource(useConsumerHint, committedRegionProgress);
+    final ReplayLocateDecision resolvedStart =
+        resolveInitReplayStartDecision(recoveryRegionProgress, progressSource);
+
+    clearRecoveryWriterProgress();
+    final RegionProgress effectiveRecoveryRegionProgress =
+        resolvedStart.getRecoveryRegionProgress();
+    if (Objects.nonNull(effectiveRecoveryRegionProgress)
+        && !effectiveRecoveryRegionProgress.getWriterPositions().isEmpty()) {
+      installRecoveryWriterProgress(effectiveRecoveryRegionProgress);
+    }
+
+    final long startSearchIndex = resolvedStart.getStartSearchIndex();
+    this.nextExpectedSearchIndex.set(startSearchIndex);
+    if (consensusReqReader instanceof WALNode) {
+      this.subscriptionWALIterator =
+          new ProgressWALIterator((WALNode) consensusReqReader, startSearchIndex);
+    }
+    this.observedSeekGeneration = seekGeneration.get();
+    this.lingerBatch.reset();
+    resetBatchWriterProgress();
+    // Publish last: the flag is read without this monitor, so it must not become visible while the
+    // state above is still being reset.
+    this.prefetchInitialized = true;
+
+    LOGGER.info(
+        "ConsensusPrefetchingQueue {}: prefetch initialized, startSearchIndex={}, "
+            + "progressSource={}, recoveryWriterCount={}",
+        this,
+        startSearchIndex,
+        resolvedStart.getDetail(),
+        recoveryWriterProgressByWriter.size());
+
+    requestPrefetch();
+  }
+
+  /** Describes, for logging, where the recovery region progress used by initPrefetch came from. */
+  private static String describeInitProgressSource(
+      final boolean useConsumerHint, final RegionProgress committedRegionProgress) {
+    if (!useConsumerHint) {
+      return "committed region progress fallback";
+    }
+    return Objects.nonNull(committedRegionProgress)
+            && !committedRegionProgress.getWriterPositions().isEmpty()
+        ? "merged committed region progress with consumer topic progress hint"
+        : "consumer topic progress hint";
+  }
+
+  private ReplayLocateDecision resolveInitReplayStartDecision(
+      final RegionProgress recoveryRegionProgress, final String 
progressSource) {
+    if (Objects.isNull(recoveryRegionProgress)
+        || recoveryRegionProgress.getWriterPositions().isEmpty()) {
+      return ReplayLocateDecision.found(
+          fallbackTailSearchIndex,
+          new RegionProgress(Collections.emptyMap()),
+          progressSource + " (tail start without progress)");
+    }
+    if (!(consensusReqReader instanceof WALNode)) {
+      throw new IllegalStateException(
+          String.format(
+              "ConsensusPrefetchingQueue %s: cannot recover from non-empty 
region progress without WAL access: %s",
+              this, recoveryRegionProgress));
+    }
+
+    final ReplayLocateDecision replayTarget =
+        locateReplayStartForRegionProgress(recoveryRegionProgress, true);
+    switch (replayTarget.getStatus()) {
+      case FOUND:
+      case AT_END:
+        return new ReplayLocateDecision(
+            replayTarget.getStatus(),
+            replayTarget.getStartSearchIndex(),
+            replayTarget.getRecoveryRegionProgress(),
+            progressSource + " (" + replayTarget.getDetail() + ")");
+      case LOCATE_MISS:
+      default:
+        throw new IllegalStateException(
+            String.format(
+                "ConsensusPrefetchingQueue %s: cannot initialize replay start 
from region progress %s: %s",
+                this, recoveryRegionProgress, replayTarget.getDetail()));
+    }
+  }
+
+  private boolean shouldUseConsumerRegionProgressHint(
+      final RegionProgress regionProgress, final RegionProgress 
committedRegionProgress) {
+    if (Objects.isNull(regionProgress) || 
regionProgress.getWriterPositions().isEmpty()) {
+      return false;
+    }
+    if (Objects.isNull(committedRegionProgress)
+        || committedRegionProgress.getWriterPositions().isEmpty()) {
+      return true;
+    }
+    for (final Map.Entry<WriterId, WriterProgress> entry :
+        regionProgress.getWriterPositions().entrySet()) {
+      if (Objects.isNull(entry.getKey()) || Objects.isNull(entry.getValue())) {
+        continue;
+      }
+      final WriterProgress committedWriterProgress =
+          committedRegionProgress.getWriterPositions().get(entry.getKey());
+      if (Objects.isNull(committedWriterProgress)
+          || compareWriterProgress(entry.getValue(), committedWriterProgress) 
> 0) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private RegionProgress mergeRecoveryRegionProgress(
+      final RegionProgress committedRegionProgress, final RegionProgress 
consumerRegionProgress) {
+    if (Objects.isNull(committedRegionProgress)
+        || committedRegionProgress.getWriterPositions().isEmpty()) {
+      return consumerRegionProgress;
+    }
+    if (Objects.isNull(consumerRegionProgress)
+        || consumerRegionProgress.getWriterPositions().isEmpty()) {
+      return committedRegionProgress;
+    }
+
+    final Map<WriterId, WriterProgress> mergedWriterProgress = new 
LinkedHashMap<>();
+    committedRegionProgress
+        .getWriterPositions()
+        .forEach(
+            (writerId, writerProgress) -> {
+              if (Objects.nonNull(writerId) && 
Objects.nonNull(writerProgress)) {
+                mergedWriterProgress.put(writerId, writerProgress);
+              }
+            });
+    consumerRegionProgress
+        .getWriterPositions()
+        .forEach(
+            (writerId, writerProgress) -> {
+              if (Objects.isNull(writerId) || Objects.isNull(writerProgress)) {
+                return;
+              }
+              mergedWriterProgress.merge(
+                  writerId,
+                  writerProgress,
+                  (committedWriterProgress, consumerWriterProgress) ->
+                      compareWriterProgress(consumerWriterProgress, 
committedWriterProgress) > 0
+                          ? consumerWriterProgress
+                          : committedWriterProgress);
+            });
+    return new RegionProgress(mergedWriterProgress);
+  }
+
+  protected RegionProgress resolveCommittedRegionProgressForInit() {
+    commitManager.getOrCreateState(brokerId, topicName, consensusGroupId);
+    final RegionProgress latestCommittedRegionProgress =
+        commitManager.getCommittedRegionProgress(brokerId, topicName, 
consensusGroupId);
+    if (Objects.nonNull(latestCommittedRegionProgress)
+        && !latestCommittedRegionProgress.getWriterPositions().isEmpty()) {
+      return latestCommittedRegionProgress;
+    }
+    return Objects.nonNull(fallbackCommittedRegionProgress)
+            && !fallbackCommittedRegionProgress.getWriterPositions().isEmpty()
+        ? fallbackCommittedRegionProgress
+        : null;
+  }
+
+  private void installRecoveryWriterProgress(final RegionProgress 
regionProgress) {
+    recoveryWriterProgressByWriter.clear();
+    recoveryWriterProgressByWriter.putAll(regionProgress.getWriterPositions());
+    regionProgress
+        .getWriterPositions()
+        .keySet()
+        .forEach(writerId -> trackWriterLane(writerId.getNodeId()));
+  }
+
+  private void clearRecoveryWriterProgress() {
+    recoveryWriterProgressByWriter.clear();
+  }
+
+  private boolean shouldSkipForRecoveryProgress(final IndexedConsensusRequest 
request) {
+    if (recoveryWriterProgressByWriter.isEmpty()) {
+      return false;
+    }
+    return isRequestCoveredByRegionProgress(request, 
recoveryWriterProgressByWriter, true);
+  }
+
+  private boolean hasComparableWriterProgress(final IndexedConsensusRequest 
request) {
+    return request.getNodeId() >= 0
+        && request.getPhysicalTime() > 0
+        && request.getProgressLocalSeq() >= 0;
+  }
+
+  private WriterId toWriterId(final IndexedConsensusRequest request) {
+    return new WriterId(consensusGroupId.toString(), request.getNodeId());
+  }
+
+  private WriterProgress toWriterProgress(final IndexedConsensusRequest 
request) {
+    return new WriterProgress(request.getPhysicalTime(), 
request.getProgressLocalSeq());
+  }
+
+  private boolean isRequestCoveredByRegionProgress(
+      final IndexedConsensusRequest request,
+      final Map<WriterId, WriterProgress> regionProgressByWriter,
+      final boolean seekAfter) {
+    if (!hasComparableWriterProgress(request)) {
+      return false;
+    }
+    final WriterProgress committedProgress = 
regionProgressByWriter.get(toWriterId(request));
+    if (Objects.isNull(committedProgress)) {
+      return false;
+    }
+    final int cmp = compareWriterProgress(toWriterProgress(request), 
committedProgress);
+    return seekAfter ? cmp <= 0 : cmp < 0;
+  }
+
+  private WriterProgress decrementWriterProgress(final WriterProgress 
writerProgress) {
+    return new WriterProgress(
+        writerProgress.getPhysicalTime(),
+        writerProgress.getLocalSeq() > 0L
+            ? writerProgress.getLocalSeq() - 1L
+            : BEFORE_FIRST_LOCAL_SEQ);
+  }
+
+  protected ReplayLocateDecision scanReplayStartForRequests(
+      final Iterable<IndexedConsensusRequest> requests,
+      final RegionProgress regionProgress,
+      final boolean seekAfter) {
+    final Map<WriterId, WriterProgress> requestedWriterProgress = new 
LinkedHashMap<>();
+    if (Objects.nonNull(regionProgress)) {
+      regionProgress
+          .getWriterPositions()
+          .forEach(
+              (writerId, writerProgress) -> {
+                if (Objects.nonNull(writerId) && 
Objects.nonNull(writerProgress)) {
+                  requestedWriterProgress.put(writerId, writerProgress);
+                }
+              });
+    }
+    final Map<WriterId, WriterProgress> effectiveRecoveryWriterProgress =
+        new LinkedHashMap<>(requestedWriterProgress);
+    final Set<WriterId> exactVisibleWriterIds = new LinkedHashSet<>();
+    Long firstUncoveredReplayableSearchIndex = null;
+    boolean sawBlockingNonReplayableUncovered = false;
+
+    for (final IndexedConsensusRequest request : requests) {
+      if (!hasComparableWriterProgress(request)) {
+        continue;
+      }
+
+      final WriterId writerId = toWriterId(request);
+      final WriterProgress requestProgress = toWriterProgress(request);
+      final WriterProgress storedWriterProgress = 
requestedWriterProgress.get(writerId);
+      if (!seekAfter
+          && Objects.nonNull(storedWriterProgress)
+          && compareWriterProgress(requestProgress, storedWriterProgress) == 
0) {
+        exactVisibleWriterIds.add(writerId);
+      }
+
+      if (isRequestCoveredByRegionProgress(request, requestedWriterProgress, 
seekAfter)) {
+        continue;
+      }
+
+      if (request.getSearchIndex() >= 0) {
+        if (Objects.isNull(firstUncoveredReplayableSearchIndex)) {
+          firstUncoveredReplayableSearchIndex = request.getSearchIndex();
+        }
+      } else if (Objects.isNull(firstUncoveredReplayableSearchIndex)) {
+        sawBlockingNonReplayableUncovered = true;
+      }
+    }
+
+    if (!seekAfter && !exactVisibleWriterIds.isEmpty()) {
+      for (final WriterId writerId : exactVisibleWriterIds) {
+        final WriterProgress writerProgress = 
requestedWriterProgress.get(writerId);
+        if (Objects.nonNull(writerProgress)) {
+          effectiveRecoveryWriterProgress.put(writerId, 
decrementWriterProgress(writerProgress));
+        }
+      }
+    }
+    final RegionProgress effectiveRecoveryRegionProgress =
+        new RegionProgress(effectiveRecoveryWriterProgress);
+
+    if (sawBlockingNonReplayableUncovered) {
+      return ReplayLocateDecision.locateMiss(
+          effectiveRecoveryRegionProgress,
+          "uncovered non-replayable WAL records appear before the first local 
replayable record");
+    }
+    if (Objects.nonNull(firstUncoveredReplayableSearchIndex)) {
+      return ReplayLocateDecision.found(
+          firstUncoveredReplayableSearchIndex,
+          effectiveRecoveryRegionProgress,
+          "resolved first uncovered replayable WAL record");
+    }
+    return ReplayLocateDecision.atEnd(
+        consensusReqReader.getCurrentSearchIndex(),
+        computeTailRegionProgress(),
+        "all locally replayable WAL records are already covered");
+  }
+
+  protected ReplayLocateDecision locateReplayStartForRegionProgress(
+      final RegionProgress regionProgress, final boolean seekAfter) {
+    if (!(consensusReqReader instanceof WALNode)) {
+      return ReplayLocateDecision.locateMiss(
+          regionProgress, "WAL access is unavailable for region-level replay 
lookup");
+    }
+
+    final WALNode walNode = (WALNode) consensusReqReader;
+    final List<IndexedConsensusRequest> replayRequests = new ArrayList<>();
+    try (final ProgressWALIterator iterator = new ProgressWALIterator(walNode, 
Long.MIN_VALUE)) {
+      while (iterator.hasNext()) {
+        replayRequests.add(iterator.next());
+      }
+      if (iterator.hasIncompleteScan()) {
+        return ReplayLocateDecision.locateMiss(
+            regionProgress,
+            "replay lookup did not complete: " + 
iterator.getIncompleteScanDetail());
+      }
+      return scanReplayStartForRequests(replayRequests, regionProgress, 
seekAfter);
+    } catch (final IOException e) {
+      return ReplayLocateDecision.locateMiss(
+          regionProgress, "failed to close replay lookup iterator: " + 
e.getMessage());
+    }
+  }
+
+  private boolean shouldTrackFollowerProgressForDedup(final 
IndexedConsensusRequest request) {
+    return request.getSearchIndex() < 0
+        && request.getNodeId() >= 0
+        && request.getProgressLocalSeq() >= 0;
+  }
+
+  private boolean shouldSkipForMaterializedFollowerProgress(final 
IndexedConsensusRequest request) {
+    if (!shouldTrackFollowerProgressForDedup(request)) {
+      return false;
+    }
+    final Long materializedLocalSeq =
+        materializedFollowerProgressByWriter.get(new 
WriterLaneId(request.getNodeId()));
+    return Objects.nonNull(materializedLocalSeq)
+        && request.getProgressLocalSeq() <= materializedLocalSeq;
+  }
+
+  private void markMaterializedFollowerProgress(final IndexedConsensusRequest 
request) {
+    if (!shouldTrackFollowerProgressForDedup(request)) {
+      return;
+    }
+    materializedFollowerProgressByWriter.merge(
+        new WriterLaneId(request.getNodeId()), request.getProgressLocalSeq(), 
Math::max);
+  }
+
+  private int compareWriterProgress(
+      final WriterProgress leftProgress, final WriterProgress rightProgress) {
+    int cmp = Long.compare(leftProgress.getPhysicalTime(), 
rightProgress.getPhysicalTime());
+    if (cmp != 0) {
+      return cmp;
+    }
+    return Long.compare(leftProgress.getLocalSeq(), 
rightProgress.getLocalSeq());
+  }
+
+  private WriterLaneState trackWriterLane(final int writerNodeId) {
+    return writerLanes.computeIfAbsent(
+        new WriterLaneId(writerNodeId), ignored -> new WriterLaneState());
+  }
+
+  private void refreshWriterLaneSafeFrontiers() {
+    final Map<WriterSafeFrontierTracker.WriterIdentity, Long> safePts =
+        serverImpl.getWriterSafeFrontierTracker().snapshotEffectiveSafePts();
+    for (final Map.Entry<WriterSafeFrontierTracker.WriterIdentity, Long> entry 
:
+        safePts.entrySet()) {
+      final WriterLaneState laneState = 
trackWriterLane(entry.getKey().getWriterNodeId());
+      laneState.effectiveSafePt = Math.max(laneState.effectiveSafePt, 
entry.getValue());
+    }
+  }
+
+  private <T extends LaneBufferedEntry> PriorityQueue<LaneFrontier> 
buildLaneFrontiers(
+      final Map<WriterLaneId, ?> laneEntriesByLane, final 
Function<WriterLaneId, T> headSupplier) {
+    refreshWriterLaneSafeFrontiers();
+    final PriorityQueue<LaneFrontier> frontiers = new PriorityQueue<>();
+    final boolean useActiveWriterBarriers = shouldUseActiveWriterBarriers();
+    final Set<WriterLaneId> laneIds = ConcurrentHashMap.newKeySet();
+    final Set<Integer> seenActiveWriterNodeIds = ConcurrentHashMap.newKeySet();
+    laneIds.addAll(writerLanes.keySet());
+    laneIds.addAll(laneEntriesByLane.keySet());
+    for (final WriterLaneId laneId : laneIds) {
+      final WriterLaneState laneState = writerLanes.get(laneId);
+      if (Objects.nonNull(laneState) && laneState.closed) {
+        continue;
+      }
+      final T head = headSupplier.apply(laneId);
+      if (Objects.nonNull(head)) {
+        if (isLaneRuntimeActive(laneId)) {
+          seenActiveWriterNodeIds.add(laneId.writerNodeId);
+        }
+        frontiers.add(LaneFrontier.forHead(laneId, head));
+        continue;
+      }
+      if (Objects.nonNull(laneState)
+          && laneState.effectiveSafePt > 0
+          && useActiveWriterBarriers
+          && isLaneRuntimeActive(laneId)) {
+        seenActiveWriterNodeIds.add(laneId.writerNodeId);
+        frontiers.add(LaneFrontier.forBarrier(laneId, 
laneState.effectiveSafePt));
+      }
+    }
+    if (useActiveWriterBarriers) {
+      for (final Integer activeWriterNodeId : activeWriterNodeIds) {
+        if (!seenActiveWriterNodeIds.contains(activeWriterNodeId)) {
+          frontiers.add(
+              LaneFrontier.forBarrier(new WriterLaneId(activeWriterNodeId), 
Long.MIN_VALUE));
+          break;
+        }
+      }
+    }
+    return frontiers;
+  }
+
+  private boolean shouldUseActiveWriterBarriers() {
+    return !TopicConstant.ORDER_MODE_PER_WRITER_VALUE.equals(orderMode);
+  }
+
+  private void bufferRealtimeEntry(final PreparedEntry entry) {
+    final WriterLaneId laneId = new WriterLaneId(entry.writerNodeId);
+    realtimeEntriesByLane
+        .computeIfAbsent(laneId, ignored -> new TreeMap<>())
+        .put(entry.localSeq, entry);
+  }
+
+  private PreparedEntry peekRealtimeEntry(final WriterLaneId laneId) {
+    final NavigableMap<Long, PreparedEntry> laneEntries = 
realtimeEntriesByLane.get(laneId);
+    if (Objects.isNull(laneEntries) || laneEntries.isEmpty()) {
+      return null;
+    }
+    final Map.Entry<Long, PreparedEntry> firstEntry = laneEntries.firstEntry();
+    return Objects.nonNull(firstEntry) ? firstEntry.getValue() : null;
+  }
+
+  private void removeRealtimeEntry(final WriterLaneId laneId, final long 
localSeq) {
+    final NavigableMap<Long, PreparedEntry> laneEntries = 
realtimeEntriesByLane.get(laneId);
+    if (Objects.isNull(laneEntries)) {
+      return;
+    }
+    laneEntries.remove(localSeq);
+    if (laneEntries.isEmpty()) {
+      realtimeEntriesByLane.remove(laneId);
+    }
+  }
+
+  private PriorityQueue<LaneFrontier> buildRealtimeLaneFrontiers() {
+    return buildLaneFrontiers(realtimeEntriesByLane, this::peekRealtimeEntry);
+  }
+
+  private SubscriptionEvent pollInternal(final String consumerId) {
+    final long size = prefetchingQueue.size();
+    if (size == 0) {
+      LOGGER.debug(
+          "ConsensusPrefetchingQueue {}: prefetching queue is empty for 
consumerId={}, "
+              + "pendingEntriesSize={}, nextExpected={}, isClosed={}, 
prefetchInitialized={}, subtaskScheduled={}",
+          this,
+          consumerId,
+          pendingEntries.size(),
+          nextExpectedSearchIndex.get(),
+          isClosed,
+          prefetchInitialized,
+          Objects.nonNull(prefetchSubtask) && 
prefetchSubtask.isScheduledOrRunning());
+      return null;
+    }
+
+    LOGGER.debug(
+        "ConsensusPrefetchingQueue {}: polling, queue size={}, consumerId={}",
+        this,
+        size,
+        consumerId);
+    long count = 0;
+
+    SubscriptionEvent event;
+    try {
+      while (count++ < size
+          && Objects.nonNull(
+              event =
+                  prefetchingQueue.poll(
+                      
SubscriptionConfig.getInstance().getSubscriptionPollMaxBlockingTimeMs(),
+                      TimeUnit.MILLISECONDS))) {
+        // Metadata events (currently WATERMARK) are fire-and-forget:
+        // skip inFlightEvents tracking so they are not recycled and 
re-delivered indefinitely.
+        if (event.getCurrentResponse().getResponseType()
+            == SubscriptionPollResponseType.WATERMARK.getType()) {
+          return event;
+        }
+
+        if (event.isCommitted()) {
+          LOGGER.warn(
+              "ConsensusPrefetchingQueue {} poll committed event {} (broken 
invariant), remove it",
+              this,
+              event);
+          continue;
+        }
+
+        if (!event.pollable()) {
+          LOGGER.warn(
+              "ConsensusPrefetchingQueue {} poll non-pollable event {} (broken 
invariant), nack it",
+              this,
+              event);
+          event.nack();
+          continue;
+        }
+
+        // Mark as polled before updating inFlightEvents
+        event.recordLastPolledTimestamp();
+        inFlightEvents.put(new Pair<>(consumerId, event.getCommitContext()), 
event);
+        event.recordLastPolledConsumerId(consumerId);
+        return event;
+      }
+    } catch (final InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOGGER.warn("ConsensusPrefetchingQueue {} interrupted while polling", 
this, e);
+    }
+
+    return null;
+  }
+
+  public SubscriptionEvent pollTablets(
+      final String consumerId, final SubscriptionCommitContext commitContext, 
final int offset) {

Review Comment:
   `pollTablets` is currently not supported for consensus subscriptions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to