DanielWang2035 commented on code in PR #17238:
URL: https://github.com/apache/iotdb/pull/17238#discussion_r3278680701


##########
iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java:
##########
@@ -1390,6 +1628,365 @@ private void unsubscribeWithRedirection(final 
Set<String> topicNames)
     throw new SubscriptionRuntimeCriticalException(errorMessage);
   }
 
+  /**
+   * Sends seek request to ALL available providers. Unlike 
subscribe/unsubscribe, seek is only
+   * considered successful if every available provider acknowledges it because 
data regions for the
+   * topic may be distributed across different nodes.
+   */
+  private void seekWithRedirection(
+      final String topicName, final short seekType, final long timestamp)
+      throws SubscriptionException {
+    final List<AbstractSubscriptionProvider> providers = 
this.providers.getAllAvailableProviders();
+    if (providers.isEmpty()) {
+      throw new SubscriptionConnectionException(
+          String.format(
+              "Cluster has no available subscription providers when %s seek 
topic %s",
+              this, topicName));
+    }
+    final List<AbstractSubscriptionProvider> failedProviders = new 
ArrayList<>();
+    Throwable firstFailure = null;
+    for (final AbstractSubscriptionProvider provider : providers) {
+      try {
+        provider.seek(topicName, seekType, timestamp);
+      } catch (final Exception e) {
+        failedProviders.add(provider);
+        if (Objects.isNull(firstFailure)) {
+          firstFailure = e;
+        }
+        LOGGER.warn(
+            "{} failed to seek topic {} from subscription provider {}; seek 
requires every provider to succeed, so the client will continue notifying the 
remaining providers before failing this seek.",
+            this,
+            topicName,
+            provider,
+            e);
+      }
+    }
+    if (!failedProviders.isEmpty()) {
+      final String errorMessage =
+          String.format(
+              "%s failed to seek topic %s on subscription providers %s; seek 
requires every available provider to succeed",
+              this, topicName, failedProviders);
+      LOGGER.warn(errorMessage);
+      throw new SubscriptionRuntimeCriticalException(errorMessage, 
firstFailure);
+    }
+  }
+
+  /** Same all-provider success requirement as {@link 
#seekWithRedirection(String, short, long)}. */
+  private void seekWithRedirectionTopicProgress(
+      final String topicName, final TopicProgress topicProgress) throws 
SubscriptionException {
+    final List<AbstractSubscriptionProvider> providers = 
this.providers.getAllAvailableProviders();
+    if (providers.isEmpty()) {
+      throw new SubscriptionConnectionException(
+          String.format(
+              "Cluster has no available subscription providers when %s seek 
topic %s",
+              this, topicName));
+    }
+    final List<AbstractSubscriptionProvider> failedProviders = new 
ArrayList<>();
+    Throwable firstFailure = null;
+    for (final AbstractSubscriptionProvider provider : providers) {
+      try {
+        provider.seekToTopicProgress(topicName, topicProgress);
+      } catch (final Exception e) {
+        failedProviders.add(provider);
+        if (Objects.isNull(firstFailure)) {
+          firstFailure = e;
+        }
+        LOGGER.warn(
+            "{} failed to seek topic {} to topicProgress(regionCount={}) from 
provider {}; seek requires every provider to succeed, so the client will 
continue notifying the remaining providers before failing this seek.",
+            this,
+            topicName,
+            topicProgress.getRegionProgress().size(),
+            provider,
+            e);
+      }
+    }
+    if (!failedProviders.isEmpty()) {
+      final String errorMessage =
+          String.format(
+              "%s failed to seek topic %s to topicProgress(regionCount=%d) on 
subscription providers %s; seek requires every available provider to succeed",
+              this, topicName, topicProgress.getRegionProgress().size(), 
failedProviders);
+      LOGGER.warn(errorMessage);
+      throw new SubscriptionRuntimeCriticalException(errorMessage, 
firstFailure);
+    }
+  }
+
+  /** Same all-provider success requirement as {@link 
#seekWithRedirection(String, short, long)}. */
+  private void seekAfterWithRedirectionTopicProgress(
+      final String topicName, final TopicProgress topicProgress) throws 
SubscriptionException {
+    final List<AbstractSubscriptionProvider> providers = 
this.providers.getAllAvailableProviders();
+    if (providers.isEmpty()) {
+      throw new SubscriptionConnectionException(
+          String.format(
+              "Cluster has no available subscription providers when %s 
seekAfter topic %s",
+              this, topicName));
+    }
+    final List<AbstractSubscriptionProvider> failedProviders = new 
ArrayList<>();
+    Throwable firstFailure = null;
+    for (final AbstractSubscriptionProvider provider : providers) {
+      try {
+        provider.seekAfterTopicProgress(topicName, topicProgress);
+      } catch (final Exception e) {
+        failedProviders.add(provider);
+        if (Objects.isNull(firstFailure)) {
+          firstFailure = e;
+        }
+        LOGGER.warn(
+            "{} failed to seekAfter topic {} to topicProgress(regionCount={}) 
from provider {}; seek requires every provider to succeed, so the client will 
continue notifying the remaining providers before failing this seekAfter.",
+            this,
+            topicName,
+            topicProgress.getRegionProgress().size(),
+            provider,
+            e);
+      }
+    }
+    if (!failedProviders.isEmpty()) {
+      final String errorMessage =
+          String.format(
+              "%s failed to seekAfter topic %s to 
topicProgress(regionCount=%d) on subscription providers %s; seek requires every 
available provider to succeed",
+              this, topicName, topicProgress.getRegionProgress().size(), 
failedProviders);
+      LOGGER.warn(errorMessage);
+      throw new SubscriptionRuntimeCriticalException(errorMessage, 
firstFailure);
+    }
+  }
+
+  private Map<String, TopicProgress> buildCurrentProgressByTopic(final 
Set<String> topicNames) {
+    final Map<String, TopicProgress> result = new HashMap<>();
+    for (final String topicName : topicNames) {
+      final TopicProgress topicProgress = 
currentPositionsByTopic.get(topicName);
+      if (Objects.isNull(topicProgress) || 
topicProgress.getRegionProgress().isEmpty()) {
+        continue;
+      }
+      result.put(topicName, new 
TopicProgress(topicProgress.getRegionProgress()));
+    }
+    return result;
+  }
+
+  private void advanceCurrentPositions(final List<SubscriptionMessage> 
messages) {
+    for (final SubscriptionMessage message : messages) {
+      final SubscriptionCommitContext commitContext = 
message.getCommitContext();
+      if (Objects.isNull(commitContext) || 
Objects.isNull(commitContext.getTopicName())) {
+        continue;
+      }
+      mergeTopicProgress(
+          currentPositionsByTopic,
+          commitContext.getTopicName(),
+          extractWriterId(commitContext),
+          extractWriterProgress(commitContext));
+    }
+  }
+
+  private void advanceCommittedPositions(
+      final List<SubscriptionCommitContext> subscriptionCommitContexts) {
+    for (final SubscriptionCommitContext commitContext : 
subscriptionCommitContexts) {
+      if (Objects.isNull(commitContext) || 
Objects.isNull(commitContext.getTopicName())) {
+        continue;
+      }
+      mergeTopicProgress(
+          committedPositionsByTopic,
+          commitContext.getTopicName(),
+          extractWriterId(commitContext),
+          extractWriterProgress(commitContext));
+    }
+  }
+
+  private boolean isConsensusCommitContext(final SubscriptionCommitContext 
commitContext) {
+    return Objects.nonNull(commitContext)
+        && Objects.nonNull(commitContext.getWriterId())
+        && Objects.nonNull(commitContext.getWriterProgress())
+        && Objects.nonNull(commitContext.getRegionId())
+        && !commitContext.getRegionId().isEmpty();
+  }
+
+  private String buildTopicRegionKey(final SubscriptionCommitContext 
commitContext) {
+    return commitContext.getTopicName() + '\u0001' + 
commitContext.getRegionId();
+  }
+
+  private void stagePendingRedirectAck(final SubscriptionCommitContext 
commitContext) {
+    pendingRedirectAcksByTopicRegion
+        .computeIfAbsent(
+            buildTopicRegionKey(commitContext), ignored -> 
ConcurrentHashMap.newKeySet())
+        .add(commitContext);
+  }
+
+  private void flushPendingRedirectAcks(final List<SubscriptionMessage> 
currentMessages) {
+    final Map<String, Integer> redirectTargetByTopicRegion = new HashMap<>();
+    for (final SubscriptionMessage message : currentMessages) {
+      final SubscriptionCommitContext commitContext = 
message.getCommitContext();
+      if (!isConsensusCommitContext(commitContext)) {
+        continue;
+      }
+      redirectTargetByTopicRegion.put(
+          buildTopicRegionKey(commitContext), commitContext.getDataNodeId());
+    }
+
+    for (final Entry<String, Integer> entry : 
redirectTargetByTopicRegion.entrySet()) {
+      final Set<SubscriptionCommitContext> pendingContexts =
+          pendingRedirectAcksByTopicRegion.get(entry.getKey());
+      if (Objects.isNull(pendingContexts) || pendingContexts.isEmpty()) {
+        continue;
+      }
+
+      final List<SubscriptionCommitContext> contextsToRedirect = new 
ArrayList<>(pendingContexts);
+      try {
+        commitInternal(entry.getValue(), contextsToRedirect, false);
+        advanceCommittedPositions(contextsToRedirect);
+        contextsToRedirect.forEach(pendingContexts::remove);
+        if (pendingContexts.isEmpty()) {
+          pendingRedirectAcksByTopicRegion.remove(entry.getKey(), 
pendingContexts);
+        }
+      } catch (final SubscriptionException e) {
+        LOGGER.warn(
+            "{} failed to redirect {} pending consensus ack(s) for {} via 
provider {}",
+            this,
+            contextsToRedirect.size(),
+            entry.getKey(),
+            entry.getValue(),
+            e);
+      }
+    }
+  }
+
+  private boolean isNewerWriterProgress(
+      final long newPhysicalTime,
+      final long newLocalSeq,
+      final long oldPhysicalTime,
+      final long oldLocalSeq) {
+    return newPhysicalTime > oldPhysicalTime
+        || (newPhysicalTime == oldPhysicalTime && newLocalSeq > oldLocalSeq);
+  }
+
+  private void clearCurrentPositions(final String topicName) {
+    currentPositionsByTopic.remove(topicName);
+  }
+
+  private void clearCommittedPositions(final String topicName) {
+    committedPositionsByTopic.remove(topicName);
+  }
+
+  private void clearPendingRedirectAcks(final String topicName) {
+    final String prefix = topicName + '\u0001';
+    pendingRedirectAcksByTopicRegion.keySet().removeIf(key -> 
key.startsWith(prefix));
+  }
+
+  private void setCurrentPositions(final String topicName, final TopicProgress 
topicProgress) {
+    if (Objects.isNull(topicProgress) || 
topicProgress.getRegionProgress().isEmpty()) {
+      currentPositionsByTopic.remove(topicName);
+      return;
+    }
+    currentPositionsByTopic.put(topicName, new 
TopicProgress(topicProgress.getRegionProgress()));
+  }
+
+  private void setCommittedPositions(final String topicName, final 
TopicProgress topicProgress) {
+    if (Objects.isNull(topicProgress) || 
topicProgress.getRegionProgress().isEmpty()) {
+      committedPositionsByTopic.remove(topicName);
+      return;
+    }
+    committedPositionsByTopic.put(topicName, new 
TopicProgress(topicProgress.getRegionProgress()));
+  }
+
+  private void overlayCurrentPositions(final String topicName, final 
TopicProgress topicProgress) {
+    overlayTopicProgress(currentPositionsByTopic, topicName, topicProgress);
+  }
+
+  private void overlayCommittedPositions(
+      final String topicName, final TopicProgress topicProgress) {
+    overlayTopicProgress(committedPositionsByTopic, topicName, topicProgress);
+  }
+
+  private void overlayTopicProgress(
+      final Map<String, TopicProgress> progressByTopic,
+      final String topicName,
+      final TopicProgress topicProgress) {
+    if (Objects.isNull(topicName)
+        || topicName.isEmpty()
+        || Objects.isNull(topicProgress)
+        || topicProgress.getRegionProgress().isEmpty()) {
+      return;
+    }
+    progressByTopic.compute(
+        topicName,
+        (ignored, oldTopicProgress) -> {
+          final Map<String, RegionProgress> mergedRegionProgress =
+              Objects.nonNull(oldTopicProgress)
+                  ? new HashMap<>(oldTopicProgress.getRegionProgress())
+                  : new HashMap<>();
+          topicProgress
+              .getRegionProgress()
+              .forEach(
+                  (regionId, regionProgress) -> {
+                    if (Objects.isNull(regionId)
+                        || regionId.isEmpty()
+                        || Objects.isNull(regionProgress)
+                        || regionProgress.getWriterPositions().isEmpty()) {
+                      return;
+                    }
+                    mergedRegionProgress.put(
+                        regionId,
+                        new RegionProgress(new 
HashMap<>(regionProgress.getWriterPositions())));
+                  });

Review Comment:
   This is done to avoid mutating a progress object that may still be 
referenced elsewhere; the merged result is treated as a new snapshot.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to