DanielWang2035 commented on code in PR #17238: URL: https://github.com/apache/iotdb/pull/17238#discussion_r3279186571
########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusSubscriptionCommitManager.java: ########## @@ -0,0 +1,1253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.subscription.broker.consensus; + +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.client.ClientPoolFactory; +import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.exception.ClientManagerException; +import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; +import org.apache.iotdb.commons.consensus.ConfigRegionId; +import org.apache.iotdb.commons.consensus.ConsensusGroupId; +import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; +import org.apache.iotdb.confignode.rpc.thrift.TGetCommitProgressReq; +import org.apache.iotdb.confignode.rpc.thrift.TGetCommitProgressResp; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.protocol.client.ConfigNodeClient; +import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; +import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; +import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher; +import org.apache.iotdb.mpp.rpc.thrift.TSyncSubscriptionProgressReq; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.rpc.subscription.payload.poll.RegionProgress; +import org.apache.iotdb.rpc.subscription.payload.poll.WriterId; +import org.apache.iotdb.rpc.subscription.payload.poll.WriterProgress; + +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Manages committed progress for consensus-based subscriptions. + * + * <p>State is maintained per {@code (consumerGroup, topic, region)} so each DataRegion can recover + * independently. + * + * <p>Committed progress is represented in per-writer terms via {@link WriterId} and {@link + * WriterProgress}. Outstanding deliveries are tracked by writer-local slots, while commit + * advancement is computed with ordered progress keys derived from {@code physicalTime}, {@code + * writerNodeId}, and {@code localSeq}. {@code searchIndex} is not the committed frontier here; it + * only remains an implementation aid for WAL positioning elsewhere. + * + * <p>Key responsibilities: + * + * <ul> + * <li>Track dispatched but uncommitted mappings per writer + * <li>Advance committed progress idempotently and contiguously on ack/commit + * <li>Persist, recover, and broadcast committed region progress + * </ul> + */ +public class ConsensusSubscriptionCommitManager { + + private static final Logger LOGGER = + LoggerFactory.getLogger(ConsensusSubscriptionCommitManager.class); + + private static final String PROGRESS_FILE_PREFIX = "consensus_subscription_progress_"; + private static final String PROGRESS_FILE_SUFFIX = ".dat"; + + private static final IClientManager<ConfigRegionId, ConfigNodeClient> CONFIG_NODE_CLIENT_MANAGER = + ConfigNodeClientManager.getInstance(); + + /** Client manager for DataNode-to-DataNode RPC (progress broadcast). */ + private static final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> + SYNC_DN_CLIENT_MANAGER = + new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>() + .createClientManager( + new ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory()); + + /** Minimum interval (ms) between broadcasts for the same (consumerGroup, topic, region). */ + private static final long MIN_BROADCAST_INTERVAL_MS = 5000; + + /** Rate-limiting: last broadcast timestamp per key. */ + private final Map<String, Long> lastBroadcastTime = new ConcurrentHashMap<>(); + + /** Single-threaded executor for fire-and-forget broadcasts. */ + private final ExecutorService broadcastExecutor = + Executors.newSingleThreadExecutor( + r -> { + final Thread t = new Thread(r, "SubscriptionProgressBroadcast"); + t.setDaemon(true); + return t; + }); + + /** Key: "consumerGroupId##topicName##regionId" -> progress tracking state */ + private final Map<String, ConsensusSubscriptionCommitState> commitStates = + new ConcurrentHashMap<>(); + + private final String persistDir; + + private ConsensusSubscriptionCommitManager() { + this.persistDir = + IoTDBDescriptor.getInstance().getConfig().getSystemDir() + + File.separator + + "subscription" + + File.separator + + "consensus_progress"; + final File dir = new File(persistDir); + if (!dir.exists()) { + dir.mkdirs(); + } + } + + /** + * Gets or creates the commit state for a specific (consumerGroup, topic, region) triple. + * + * @param consumerGroupId the consumer group ID + * @param topicName the topic name + * @param regionId the consensus group / data region ID + * @return the commit state + */ + public ConsensusSubscriptionCommitState getOrCreateState( + final String consumerGroupId, final String topicName, final ConsensusGroupId regionId) { + final String key = generateKey(consumerGroupId, topicName, regionId); + final String regionIdString = regionId.toString(); + return commitStates.computeIfAbsent( + key, + k -> { + // Try to recover from persisted local state + final ConsensusSubscriptionCommitState recovered = tryRecover(key, regionIdString); + if (recovered != null) { + return recovered; + } + final ConsensusSubscriptionCommitState recoveredFromConfigNode = + queryCommitProgressStateFromConfigNode(consumerGroupId, topicName, regionId); + if (Objects.nonNull(recoveredFromConfigNode)) { + return recoveredFromConfigNode; + } + return new ConsensusSubscriptionCommitState( + regionIdString, new SubscriptionConsensusProgress()); + }); + } + + public boolean hasPersistedState( + final String consumerGroupId, final String topicName, final ConsensusGroupId regionId) { + return getProgressFile(generateKey(consumerGroupId, topicName, regionId)).exists(); + } + + public void recordMapping( + final String consumerGroupId, + final String topicName, + final ConsensusGroupId regionId, + final WriterId writerId, + final WriterProgress writerProgress) { + final ConsensusSubscriptionCommitState state = + getOrCreateState(consumerGroupId, topicName, regionId); + state.recordMapping(writerId, writerProgress); + } + + public boolean commit( + final String consumerGroupId, + final String topicName, + final ConsensusGroupId regionId, + final WriterId writerId, + final WriterProgress writerProgress) { + final String key = generateKey(consumerGroupId, topicName, regionId); + final ConsensusSubscriptionCommitState state = commitStates.get(key); + if (state == null) { + LOGGER.warn( + "ConsensusSubscriptionCommitManager: Cannot commit for unknown state, " + + "consumerGroupId={}, topicName={}, regionId={}, writerId={}, writerProgress={}", + consumerGroupId, + topicName, + regionId, + writerId, + writerProgress); + return false; + } + final CommitOperationResult result = state.commitAndGetResult(writerId, writerProgress); + if (result.isHandled()) { + // Periodically persist progress + persistProgressIfNeeded(key, state); + if (result.hasAdvancedWriter()) { + maybeBroadcast( + key, + consumerGroupId, + topicName, + regionId, + result.getAdvancedWriterProgress(), + result.getAdvancedWriterId()); + } + } + return result.isHandled(); + } + + public boolean commitWithoutOutstanding( + final String consumerGroupId, + final String topicName, + final ConsensusGroupId regionId, + final WriterId writerId, + final WriterProgress writerProgress) { + final String key = generateKey(consumerGroupId, topicName, regionId); + final ConsensusSubscriptionCommitState state = commitStates.get(key); + if (state == null) { + LOGGER.warn( + "ConsensusSubscriptionCommitManager: Cannot direct-commit for unknown state, " + + "consumerGroupId={}, topicName={}, regionId={}, writerId={}, writerProgress={}", + consumerGroupId, + topicName, + regionId, + writerId, + writerProgress); + return false; + } + final CommitOperationResult result = + state.commitWithoutOutstandingAndGetResult(writerId, writerProgress); + if (result.isHandled()) { + persistProgressIfNeeded(key, state); + if (result.hasAdvancedWriter()) { + maybeBroadcast( + key, + consumerGroupId, + topicName, + regionId, + result.getAdvancedWriterProgress(), + result.getAdvancedWriterId()); + } + } + return result.isHandled(); + } + + public long getCommittedPhysicalTime( + final String consumerGroupId, final String topicName, final ConsensusGroupId regionId) { + final String key = generateKey(consumerGroupId, topicName, regionId); + final ConsensusSubscriptionCommitState state = commitStates.get(key); + return state != null ? state.getCommittedPhysicalTime() : 0L; + } + + public long getCommittedLocalSeq( + final String consumerGroupId, final String topicName, final ConsensusGroupId regionId) { + final String key = generateKey(consumerGroupId, topicName, regionId); + final ConsensusSubscriptionCommitState state = commitStates.get(key); + return state != null ? state.getCommittedLocalSeq() : -1L; + } + + public int getCommittedWriterNodeId( + final String consumerGroupId, final String topicName, final ConsensusGroupId regionId) { + final String key = generateKey(consumerGroupId, topicName, regionId); + final ConsensusSubscriptionCommitState state = commitStates.get(key); + return state != null ? state.getCommittedWriterNodeId() : -1; + } + + public WriterId getCommittedWriterId( + final String consumerGroupId, final String topicName, final ConsensusGroupId regionId) { + final String key = generateKey(consumerGroupId, topicName, regionId); + final ConsensusSubscriptionCommitState state = commitStates.get(key); + return state != null ? state.getCommittedWriterId() : null; + } + + public WriterProgress getCommittedWriterProgress( + final String consumerGroupId, final String topicName, final ConsensusGroupId regionId) { + final String key = generateKey(consumerGroupId, topicName, regionId); + final ConsensusSubscriptionCommitState state = commitStates.get(key); + return state != null ? state.getCommittedWriterProgress() : null; + } + + public RegionProgress getCommittedRegionProgress( + final String consumerGroupId, final String topicName, final ConsensusGroupId regionId) { + final String key = generateKey(consumerGroupId, topicName, regionId); + final ConsensusSubscriptionCommitState state = commitStates.get(key); + if (state == null) { + return new RegionProgress(Collections.emptyMap()); + } + return state.getCommittedRegionProgress(); + } + + /** + * Removes state for a specific (consumerGroup, topic, region) triple. + * + * @param consumerGroupId the consumer group ID + * @param topicName the topic name + * @param regionId the consensus group / data region ID + */ + public void removeState( + final String consumerGroupId, final String topicName, final ConsensusGroupId regionId) { + final String key = generateKey(consumerGroupId, topicName, regionId); + commitStates.remove(key); + // Clean up persisted file + final File file = getProgressFile(key); + if (file.exists()) { + file.delete(); + } + } + + /** + * Removes all states for a given (consumerGroup, topic) pair across all regions. Used during + * subscription teardown when the individual regionIds may not be readily available. + * + * @param consumerGroupId the consumer group ID + * @param topicName the topic name + */ + public void removeAllStatesForTopic(final String consumerGroupId, final String topicName) { + final String prefix = consumerGroupId + KEY_SEPARATOR + topicName + KEY_SEPARATOR; + final Iterator<Map.Entry<String, ConsensusSubscriptionCommitState>> it = + commitStates.entrySet().iterator(); + while (it.hasNext()) { + final Map.Entry<String, ConsensusSubscriptionCommitState> entry = it.next(); + if (entry.getKey().startsWith(prefix)) { + it.remove(); + final File file = getProgressFile(entry.getKey()); + if (file.exists()) { + file.delete(); + } + } + } + } + + public void resetState( + final String consumerGroupId, + final String topicName, + final ConsensusGroupId regionId, + final RegionProgress regionProgress) { + final String key = generateKey(consumerGroupId, topicName, regionId); + final ConsensusSubscriptionCommitState state = commitStates.get(key); + if (state == null) { + LOGGER.warn( + "ConsensusSubscriptionCommitManager: Cannot reset unknown state, " + + "consumerGroupId={}, topicName={}, regionId={}", + consumerGroupId, + topicName, + regionId); + return; + } + state.resetForSeek(regionProgress); + persistProgress(key, state); + } + + /** Persists all states. Should be called during graceful shutdown. */ + public void persistAll() { + for (final Map.Entry<String, ConsensusSubscriptionCommitState> entry : + commitStates.entrySet()) { + persistProgress(entry.getKey(), entry.getValue()); + } + } + + public Map<String, ByteBuffer> collectAllRegionProgress(final int dataNodeId) { + final Map<String, ByteBuffer> result = new ConcurrentHashMap<>(); + final String suffix = KEY_SEPARATOR + dataNodeId; + for (final Map.Entry<String, ConsensusSubscriptionCommitState> entry : + commitStates.entrySet()) { + final RegionProgress regionProgress = entry.getValue().getCommittedRegionProgress(); + final ByteBuffer serialized = serializeRegionProgress(regionProgress); + if (Objects.nonNull(serialized)) { + result.put(entry.getKey() + suffix, serialized); + } + } + return result; + } + + // ======================== Progress Broadcast (Leader → Follower) ======================== + + /** + * Broadcasts committed progress to followers if enough time has elapsed since the last broadcast + * for this key. The broadcast is async and fire-and-forget. + */ + private void maybeBroadcast( + final String key, + final String consumerGroupId, + final String topicName, + final ConsensusGroupId regionId, + final WriterProgress committedWriterProgress, + final WriterId committedWriterId) { + if (Objects.isNull(committedWriterId) || Objects.isNull(committedWriterProgress)) { + return; + } + final String broadcastKey = buildBroadcastKey(key, committedWriterId); + final long now = System.currentTimeMillis(); + final Long last = lastBroadcastTime.get(broadcastKey); + if (last != null && now - last < MIN_BROADCAST_INTERVAL_MS) { + return; + } + lastBroadcastTime.put(broadcastKey, now); + broadcastExecutor.submit( + () -> + doBroadcast( + consumerGroupId, topicName, regionId, committedWriterProgress, committedWriterId)); + } + + /** + * Sends committed progress to all follower replicas of the given region. Uses the partition cache + * to discover replica endpoints and skips the local DataNode. + */ + private void doBroadcast( + final String consumerGroupId, + final String topicName, + final ConsensusGroupId regionId, + final WriterProgress writerProgress, + final WriterId writerId) { + final int localDataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId(); + try { + final List<TRegionReplicaSet> replicaSets = + ClusterPartitionFetcher.getInstance() + .getRegionReplicaSet( + Collections.singletonList(regionId.convertToTConsensusGroupId())); + if (replicaSets.isEmpty()) { + return; + } + final String regionIdStr = regionId.toString(); + final int writerNodeId = + Objects.nonNull(writerId) && writerId.getNodeId() >= 0 ? writerId.getNodeId() : -1; + final TSyncSubscriptionProgressReq req = + new TSyncSubscriptionProgressReq( + consumerGroupId, + topicName, + regionIdStr, + Objects.nonNull(writerProgress) ? writerProgress.getPhysicalTime() : 0L, + writerNodeId, + Objects.nonNull(writerProgress) ? writerProgress.getLocalSeq() : -1L); + + for (final TDataNodeLocation location : replicaSets.get(0).getDataNodeLocations()) { + if (location.getDataNodeId() == localDataNodeId) { + continue; // skip self + } + final TEndPoint endpoint = location.getInternalEndPoint(); + try (final SyncDataNodeInternalServiceClient client = + SYNC_DN_CLIENT_MANAGER.borrowClient(endpoint)) { + client.syncSubscriptionProgress(req); + } catch (final ClientManagerException | TException e) { + LOGGER.debug( + "Failed to broadcast subscription progress to DataNode {} at {}: {}", + location.getDataNodeId(), + endpoint, + e.getMessage()); + } + } + } catch (final Exception e) { + LOGGER.debug( + "Failed to broadcast subscription progress for region {}: {}", regionId, e.getMessage()); + } + } + + /** + * Receives a committed progress broadcast from another DataNode (Leader). Updates local state if + * the broadcast progress is ahead of the current local progress. + */ + public void receiveProgressBroadcast( + final String consumerGroupId, + final String topicName, + final String regionIdStr, + final long physicalTime, + final int writerNodeId, + final long localSeq) { + receiveProgressBroadcast( + consumerGroupId, + topicName, + regionIdStr, + buildWriterId(regionIdStr, writerNodeId), + new WriterProgress(physicalTime, localSeq)); + } + + public void receiveProgressBroadcast( + final String consumerGroupId, + final String topicName, + final String regionIdStr, + final WriterId writerId, + final WriterProgress writerProgress) { + if (Objects.isNull(writerId) || Objects.isNull(writerProgress)) { + LOGGER.warn( + "ConsensusSubscriptionCommitManager: ignore broadcast without writer identity, " + + "consumerGroupId={}, topicName={}, regionId={}, writerId={}, writerProgress={}", + consumerGroupId, + topicName, + regionIdStr, + writerId, + writerProgress); + return; + } + final String key = consumerGroupId + KEY_SEPARATOR + topicName + KEY_SEPARATOR + regionIdStr; + final ConsensusSubscriptionCommitState state = commitStates.get(key); + if (state != null) { + // Update only if broadcast is ahead + state.updateFromBroadcast(writerId, writerProgress); + persistProgressIfNeeded(key, state); + } else { + // Create a new state from the broadcast progress + final ConsensusSubscriptionCommitState newState = + new ConsensusSubscriptionCommitState( + regionIdStr, + new SubscriptionConsensusProgress( + new RegionProgress(Collections.singletonMap(writerId, writerProgress)), 0L)); + newState.updateFromBroadcast(writerId, writerProgress); + commitStates.putIfAbsent(key, newState); + persistProgress(key, commitStates.get(key)); + } + LOGGER.debug( + "Received subscription progress broadcast: consumerGroupId={}, topicName={}, " + + "regionId={}, physicalTime={}, localSeq={}", + consumerGroupId, + topicName, + regionIdStr, + writerProgress != null ? writerProgress.getPhysicalTime() : 0L, + writerProgress != null ? writerProgress.getLocalSeq() : -1L); + } + + // ======================== Helper Methods ======================== + + // Use a separator that cannot appear in consumerGroupId, topicName, or regionId + // to prevent key collisions (e.g., "a_b" + "c" vs "a" + "b_c"). + private static final String KEY_SEPARATOR = "##"; + + private String generateKey( + final String consumerGroupId, final String topicName, final ConsensusGroupId regionId) { + return consumerGroupId + KEY_SEPARATOR + topicName + KEY_SEPARATOR + regionId.toString(); Review Comment: Added a test covering OS-forbidden characters plus recovery. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
