DanielWang2035 commented on code in PR #17238:
URL: https://github.com/apache/iotdb/pull/17238#discussion_r3279185039


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusSubscriptionCommitManager.java:
##########
@@ -0,0 +1,1253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.broker.consensus;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.consensus.ConfigRegionId;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
+import org.apache.iotdb.confignode.rpc.thrift.TGetCommitProgressReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetCommitProgressResp;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
+import org.apache.iotdb.mpp.rpc.thrift.TSyncSubscriptionProgressReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.subscription.payload.poll.RegionProgress;
+import org.apache.iotdb.rpc.subscription.payload.poll.WriterId;
+import org.apache.iotdb.rpc.subscription.payload.poll.WriterProgress;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Manages committed progress for consensus-based subscriptions.
+ *
+ * <p>State is maintained per {@code (consumerGroup, topic, region)} so each DataRegion can recover
+ * independently.
+ *
+ * <p>Committed progress is represented in per-writer terms via {@link WriterId} and {@link
+ * WriterProgress}. Outstanding deliveries are tracked by writer-local slots, while commit
+ * advancement is computed with ordered progress keys derived from {@code physicalTime}, {@code
+ * writerNodeId}, and {@code localSeq}. {@code searchIndex} is not the committed frontier here; it
+ * serves only as an implementation aid for WAL positioning elsewhere.
+ *
+ * <p>Key responsibilities:
+ *
+ * <ul>
+ *   <li>Track dispatched but uncommitted mappings per writer
+ *   <li>Advance committed progress idempotently and contiguously on ack/commit
+ *   <li>Persist, recover, and broadcast committed region progress
+ * </ul>
+ */
+public class ConsensusSubscriptionCommitManager {
+
+  /** Logger for commit tracking, progress persistence and progress broadcast. */
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(ConsensusSubscriptionCommitManager.class);
+
+  /**
+   * File-name prefix/suffix for locally persisted progress files. NOTE(review): presumably combined
+   * with the state key by {@code getProgressFile} (not visible in this excerpt) — confirm.
+   */
+  private static final String PROGRESS_FILE_PREFIX = 
"consensus_subscription_progress_";
+  private static final String PROGRESS_FILE_SUFFIX = ".dat";
+
+  /**
+   * Client manager for DataNode-to-ConfigNode RPC. NOTE(review): presumably used when recovering
+   * committed progress from the ConfigNode (cf. the TGetCommitProgressReq import) — confirm.
+   */
+  private static final IClientManager<ConfigRegionId, ConfigNodeClient> 
CONFIG_NODE_CLIENT_MANAGER =
+      ConfigNodeClientManager.getInstance();
+
+  /** Client manager for DataNode-to-DataNode RPC (progress broadcast). */
+  private static final IClientManager<TEndPoint, 
SyncDataNodeInternalServiceClient>
+      SYNC_DN_CLIENT_MANAGER =
+          new IClientManager.Factory<TEndPoint, 
SyncDataNodeInternalServiceClient>()
+              .createClientManager(
+                  new 
ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+
+  /** Minimum interval (ms) between broadcasts for the same (consumerGroup, topic, region). */
+  private static final long MIN_BROADCAST_INTERVAL_MS = 5000;
+
+  /**
+   * Rate-limiting: last broadcast timestamp per key (presumably the same key format as {@link
+   * #commitStates}, since {@code commit} passes that key to {@code maybeBroadcast}).
+   */
+  private final Map<String, Long> lastBroadcastTime = new 
ConcurrentHashMap<>();
+
+  /**
+   * Single-threaded executor for fire-and-forget broadcasts. Its thread is a daemon, so it does not
+   * keep the JVM alive. NOTE(review): no shutdown of this executor is visible in this excerpt.
+   */
+  private final ExecutorService broadcastExecutor =
+      Executors.newSingleThreadExecutor(
+          r -> {
+            final Thread t = new Thread(r, "SubscriptionProgressBroadcast");
+            t.setDaemon(true);
+            return t;
+          });
+
+  /**
+   * Key: "consumerGroupId##topicName##regionId" -> progress tracking state. Entries are created
+   * lazily by {@code getOrCreateState}; {@code commit} only looks existing entries up.
+   */
+  private final Map<String, ConsensusSubscriptionCommitState> commitStates =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Directory holding local progress files: {@code <systemDir>/subscription/consensus_progress}.
+   * It is created by the constructor if missing.
+   */
+  private final String persistDir;
+
+  /**
+   * Creates the manager and ensures the local progress directory {@code
+   * <systemDir>/subscription/consensus_progress} exists.
+   *
+   * <p>Failure to create the directory is logged rather than thrown, so construction still
+   * succeeds; later persistence attempts will then fail on their own.
+   */
+  private ConsensusSubscriptionCommitManager() {
+    this.persistDir =
+        IoTDBDescriptor.getInstance().getConfig().getSystemDir()
+            + File.separator
+            + "subscription"
+            + File.separator
+            + "consensus_progress";
+    final File dir = new File(persistDir);
+    // File#mkdirs() reports failure only via its return value. Re-check isDirectory() afterwards
+    // because another thread/process may have created the directory concurrently. This also
+    // flags the case where the path exists but is a regular file.
+    if (!dir.isDirectory() && !dir.mkdirs() && !dir.isDirectory()) {
+      LOGGER.warn(
+          "ConsensusSubscriptionCommitManager: Failed to create progress directory {}",
+          persistDir);
+    }
+  }
+
+  /**
+   * Gets or creates the commit state for a specific (consumerGroup, topic, region) triple.
+   *
+   * <p>When no state exists yet, one is built by trying, in order: recovery from locally persisted
+   * state, recovery from the ConfigNode, and finally a fresh, empty {@link
+   * SubscriptionConsensusProgress}.
+   *
+   * <p>The recovery runs inside {@link ConcurrentHashMap#computeIfAbsent}, so it holds that map
+   * bin's lock while doing local file access and (presumably) a ConfigNode RPC. An existing state
+   * is therefore returned via a plain {@code get} first, so the common path does not take the lock.
+   *
+   * @param consumerGroupId the consumer group ID
+   * @param topicName the topic name
+   * @param regionId the consensus group / data region ID
+   * @return the commit state, never {@code null}
+   */
+  public ConsensusSubscriptionCommitState getOrCreateState(
+      final String consumerGroupId, final String topicName, final ConsensusGroupId regionId) {
+    final String key = generateKey(consumerGroupId, topicName, regionId);
+    final String regionIdString = regionId.toString();
+    // Fast path: computeIfAbsent may lock the bin even when the key is already present, and this
+    // method is invoked for every recorded mapping.
+    final ConsensusSubscriptionCommitState existing = commitStates.get(key);
+    if (existing != null) {
+      return existing;
+    }
+    return commitStates.computeIfAbsent(
+        key,
+        k -> {
+          // Try to recover from persisted local state
+          final ConsensusSubscriptionCommitState recovered = tryRecover(k, regionIdString);
+          if (recovered != null) {
+            return recovered;
+          }
+          final ConsensusSubscriptionCommitState recoveredFromConfigNode =
+              queryCommitProgressStateFromConfigNode(consumerGroupId, topicName, regionId);
+          if (Objects.nonNull(recoveredFromConfigNode)) {
+            return recoveredFromConfigNode;
+          }
+          return new ConsensusSubscriptionCommitState(
+              regionIdString, new SubscriptionConsensusProgress());
+        });
+  }
+
+  /**
+   * Reports whether a locally persisted progress file exists for the given triple.
+   *
+   * @param consumerGroupId the consumer group ID
+   * @param topicName the topic name
+   * @param regionId the consensus group / data region ID
+   * @return {@code true} if the progress file for this key exists on disk
+   */
+  public boolean hasPersistedState(
+      final String consumerGroupId, final String topicName, final ConsensusGroupId regionId) {
+    final String stateKey = generateKey(consumerGroupId, topicName, regionId);
+    return getProgressFile(stateKey).exists();
+  }
+
+  /**
+   * Records a dispatched (not yet committed) writer progress for the given triple, creating or
+   * recovering the commit state first if necessary.
+   *
+   * @param consumerGroupId the consumer group ID
+   * @param topicName the topic name
+   * @param regionId the consensus group / data region ID
+   * @param writerId the writer that produced the delivered data
+   * @param writerProgress the writer-local progress of the delivery
+   */
+  public void recordMapping(
+      final String consumerGroupId,
+      final String topicName,
+      final ConsensusGroupId regionId,
+      final WriterId writerId,
+      final WriterProgress writerProgress) {
+    getOrCreateState(consumerGroupId, topicName, regionId)
+        .recordMapping(writerId, writerProgress);
+  }
+
+  /**
+   * Commits a writer's progress for an existing (consumerGroup, topic, region) state.
+   *
+   * <p>Unlike {@link #recordMapping}, this never creates state: if none exists, a warning is logged
+   * and {@code false} is returned. When the commit is handled, progress persistence is requested
+   * via {@code persistProgressIfNeeded}. If a writer's committed progress advanced, a broadcast is
+   * requested via {@code maybeBroadcast}.
+   *
+   * @param consumerGroupId the consumer group ID
+   * @param topicName the topic name
+   * @param regionId the consensus group / data region ID
+   * @param writerId the writer whose progress is being committed
+   * @param writerProgress the committed writer-local progress
+   * @return whether the commit was handled by the state
+   */
+  public boolean commit(
+      final String consumerGroupId,
+      final String topicName,
+      final ConsensusGroupId regionId,
+      final WriterId writerId,
+      final WriterProgress writerProgress) {
+    final String stateKey = generateKey(consumerGroupId, topicName, regionId);
+    final ConsensusSubscriptionCommitState commitState = commitStates.get(stateKey);
+    if (commitState == null) {
+      LOGGER.warn(
+          "ConsensusSubscriptionCommitManager: Cannot commit for unknown state, "
+              + "consumerGroupId={}, topicName={}, regionId={}, writerId={}, writerProgress={}",
+          consumerGroupId,
+          topicName,
+          regionId,
+          writerId,
+          writerProgress);
+      return false;
+    }
+
+    final CommitOperationResult outcome =
+        commitState.commitAndGetResult(writerId, writerProgress);
+    final boolean handled = outcome.isHandled();
+    if (!handled) {
+      return false;
+    }
+
+    // Periodically persist progress
+    persistProgressIfNeeded(stateKey, commitState);
+    if (outcome.hasAdvancedWriter()) {
+      maybeBroadcast(
+          stateKey,
+          consumerGroupId,
+          topicName,
+          regionId,
+          outcome.getAdvancedWriterProgress(),
+          outcome.getAdvancedWriterId());
+    }
+    return true;
+  }
+
+  public boolean commitWithoutOutstanding(
+      final String consumerGroupId,
+      final String topicName,
+      final ConsensusGroupId regionId,
+      final WriterId writerId,
+      final WriterProgress writerProgress) {

Review Comment:
   I kept this unbatched for now. The current path already rate-limits 
persistence and broadcasts, and local persistence can update unchanged-size 
region payloads in place, so batching would add coordination complexity without 
a clear benefit yet.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to