VGalaxies commented on code in PR #17238:
URL: https://github.com/apache/iotdb/pull/17238#discussion_r3353686592


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java:
##########
@@ -731,22 +728,19 @@ public void updateCompletedTopicNames(final String 
consumerGroupId, final String
   }
 
   public void unbindPrefetchingQueue(final String consumerGroupId, final 
String topicName) {
-    // Try consensus broker first
-    final ConsensusSubscriptionBroker consensusBroker =
-        consumerGroupIdToConsensusBroker.get(consumerGroupId);
-    if (Objects.nonNull(consensusBroker) && 
consensusBroker.hasQueue(topicName)) {
-      consensusBroker.unbindConsensusPrefetchingQueue(topicName);
-      prefetchingQueueCount.invalidate();
-      return;
-    }
-    // Fall back to pipe broker
-    final SubscriptionBroker pipeBroker = 
consumerGroupIdToPipeBroker.get(consumerGroupId);
-    if (Objects.isNull(pipeBroker)) {
+    final ISubscriptionBroker broker = getBrokerForTopic(consumerGroupId, 
topicName);
+    if (Objects.isNull(broker)) {
       LOGGER.warn(
           "Subscription: broker bound to consumer group [{}] does not exist", 
consumerGroupId);
       return;
     }
-    pipeBroker.unbindPrefetchingQueue(topicName);
+    if (broker instanceof ConsensusSubscriptionBroker) {
+      ((ConsensusSubscriptionBroker) 
broker).unbindConsensusPrefetchingQueue(topicName);
+    } else if (broker instanceof SubscriptionBroker) {
+      ((SubscriptionBroker) broker).unbindPrefetchingQueue(topicName);
+    } else {
+      broker.removeQueue(topicName);
+    }

Review Comment:
   Done in 75bacb1. Added `ISubscriptionBroker.unbind(String topicName)` and 
the agent now calls the broker interface directly.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java:
##########
@@ -797,23 +786,27 @@ public int getPrefetchingQueueCount() {
 
   public Map<String, Long> getConsensusLagSummary() {
     final Map<String, Long> result = new ConcurrentHashMap<>();
-    for (final Map.Entry<String, ConsensusSubscriptionBroker> entry :
-        consumerGroupIdToConsensusBroker.entrySet()) {
-      final String groupId = entry.getKey();
-      for (final Map.Entry<String, Long> lag : 
entry.getValue().getLagSummary().entrySet()) {
-        result.put(groupId + "/" + lag.getKey(), lag.getValue());
+    for (final Map.Entry<String, List<ISubscriptionBroker>> entry :
+        consumerGroupIdToBrokers.entrySet()) {
+      for (final ISubscriptionBroker broker : entry.getValue()) {
+        if (!(broker instanceof ConsensusSubscriptionBroker)) {
+          continue;
+        }
+        for (final Map.Entry<String, Long> lag :
+            ((ConsensusSubscriptionBroker) broker).getLagSummary().entrySet()) 
{
+          result.put(entry.getKey() + "/" + lag.getKey(), lag.getValue());
+        }
       }

Review Comment:
   Done in 75bacb1. `ISubscriptionBroker.getLagSummary()` now defaults to an 
empty map, so the pipe broker returns an empty summary through the interface.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusLogToTabletConverter.java:
##########
@@ -297,17 +297,25 @@ private List<Tablet> convertInsertTabletNode(final 
InsertTabletNode node) {
       schemas.add(new MeasurementSchema(measurements[colIdx], 
dataTypes[colIdx]));
     }
 
-    // Build column arrays and bitmaps using bulk copy
-    final long[] newTimes = Arrays.copyOf(times, rowCount);
+    // Reuse complete tablets on the server-side read path. Column filtering 
changes tablet shape
+    // and still needs copies for the selected columns.
+    final long[] newTimes = allColumnsMatch ? times : Arrays.copyOf(times, 
rowCount);

Review Comment:
   Done in 75bacb1. Column filtering now reuses the selected timestamp, value, 
and bitmap arrays from WAL InsertNodes instead of copying column contents.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALFileUtils.java:
##########
@@ -182,4 +197,243 @@ public static String getTsFileRelativePath(String 
absolutePath) {
     Path path = new File(absolutePath).toPath();
     return path.subpath(path.getNameCount() - 5, 
path.getNameCount()).toString();
   }
+
+  /**
+   * Locate the first local searchIndex whose writer progress is equal to or 
strictly greater than
+   * the given writer-local frontier. This is currently used by single-writer 
recovery paths, so it
+   * matches only entries from the supplied nodeId.
+   *
+   * @return [targetSearchIndex, exactMatchFlag], or null if no matching/later 
entry exists
+   */
+  public static long[] locateByWriterProgress(
+      final File logDir, final int nodeId, final long physicalTime, final long 
localSeq) {
+    final long[] exactSearchIndex = new long[] {-1L};
+    final long[] firstAfterSearchIndex = new long[] {-1L};
+    final long[] firstAfterPhysicalTime = new long[] {Long.MAX_VALUE};
+    final long[] firstAfterLocalSeq = new long[] {Long.MAX_VALUE};
+
+    forEachSealedSearchableRequest(
+        logDir,
+        request -> {
+          if (request.nodeId != nodeId) {
+            return true;
+          }
+          final int cmp =
+              compareWriterProgress(
+                  request.physicalTime,
+                  request.nodeId,
+                  request.localSeq,
+                  physicalTime,
+                  nodeId,
+                  localSeq);
+          if (cmp == 0) {
+            exactSearchIndex[0] = request.searchIndex;
+            return false;
+          }
+          if (cmp > 0
+              && (firstAfterSearchIndex[0] < 0L
+                  || compareWriterProgress(
+                          request.physicalTime,
+                          request.nodeId,
+                          request.localSeq,
+                          firstAfterPhysicalTime[0],
+                          nodeId,
+                          firstAfterLocalSeq[0])
+                      < 0)) {
+            firstAfterSearchIndex[0] = request.searchIndex;
+            firstAfterPhysicalTime[0] = request.physicalTime;
+            firstAfterLocalSeq[0] = request.localSeq;
+          }
+          return true;
+        });
+
+    if (exactSearchIndex[0] >= 0L) {
+      return new long[] {exactSearchIndex[0], 1L};
+    }
+    if (firstAfterSearchIndex[0] >= 0L) {
+      return new long[] {firstAfterSearchIndex[0], 0L};
+    }
+    return null;
+  }
+
+  public static long findSearchIndexByWriterProgress(
+      final File logDir, final int nodeId, final long physicalTime, final long 
localSeq) {
+    final long[] located = locateByWriterProgress(logDir, nodeId, 
physicalTime, localSeq);
+    return located != null && located[1] == 1L ? located[0] : -1L;
+  }
+
+  public static long findSearchIndexAfterWriterProgress(
+      final File logDir, final int nodeId, final long physicalTime, final long 
localSeq) {
+    final long[] bestSearchIndex = new long[] {-1L};
+    final long[] bestPhysicalTime = new long[] {Long.MAX_VALUE};
+    final long[] bestLocalSeq = new long[] {Long.MAX_VALUE};
+    forEachSealedSearchableRequest(
+        logDir,
+        request -> {
+          if (request.nodeId != nodeId) {
+            return true;
+          }
+          if (compareWriterProgress(
+                  request.physicalTime,
+                  request.nodeId,
+                  request.localSeq,
+                  physicalTime,
+                  nodeId,
+                  localSeq)
+              <= 0) {
+            return true;
+          }
+          if (bestSearchIndex[0] < 0L
+              || compareWriterProgress(
+                      request.physicalTime,
+                      request.nodeId,
+                      request.localSeq,
+                      bestPhysicalTime[0],
+                      nodeId,
+                      bestLocalSeq[0])
+                  < 0) {
+            bestSearchIndex[0] = request.searchIndex;
+            bestPhysicalTime[0] = request.physicalTime;
+            bestLocalSeq[0] = request.localSeq;
+          }
+          return true;
+        });
+    return bestSearchIndex[0];
+  }
+
+  private interface SearchableRequestVisitor {
+    boolean onRequest(SearchableRequestMeta request);

Review Comment:
   Done in 75bacb1. Added return-value javadoc to 
`SearchableRequestVisitor.onRequest(SearchableRequestMeta request)` explaining 
continue vs stop semantics.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to