jt2594838 commented on code in PR #17238:
URL: https://github.com/apache/iotdb/pull/17238#discussion_r3352929737


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java:
##########
@@ -731,22 +728,19 @@ public void updateCompletedTopicNames(final String 
consumerGroupId, final String
   }
 
   public void unbindPrefetchingQueue(final String consumerGroupId, final 
String topicName) {
-    // Try consensus broker first
-    final ConsensusSubscriptionBroker consensusBroker =
-        consumerGroupIdToConsensusBroker.get(consumerGroupId);
-    if (Objects.nonNull(consensusBroker) && 
consensusBroker.hasQueue(topicName)) {
-      consensusBroker.unbindConsensusPrefetchingQueue(topicName);
-      prefetchingQueueCount.invalidate();
-      return;
-    }
-    // Fall back to pipe broker
-    final SubscriptionBroker pipeBroker = 
consumerGroupIdToPipeBroker.get(consumerGroupId);
-    if (Objects.isNull(pipeBroker)) {
+    final ISubscriptionBroker broker = getBrokerForTopic(consumerGroupId, 
topicName);
+    if (Objects.isNull(broker)) {
       LOGGER.warn(
           "Subscription: broker bound to consumer group [{}] does not exist", 
consumerGroupId);
       return;
     }
-    pipeBroker.unbindPrefetchingQueue(topicName);
+    if (broker instanceof ConsensusSubscriptionBroker) {
+      ((ConsensusSubscriptionBroker) 
broker).unbindConsensusPrefetchingQueue(topicName);
+    } else if (broker instanceof SubscriptionBroker) {
+      ((SubscriptionBroker) broker).unbindPrefetchingQueue(topicName);
+    } else {
+      broker.removeQueue(topicName);
+    }

Review Comment:
   Could this be abstracted as `ISubscriptionBroker.unbind(String topicName)`?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java:
##########
@@ -797,23 +786,27 @@ public int getPrefetchingQueueCount() {
 
   public Map<String, Long> getConsensusLagSummary() {
     final Map<String, Long> result = new ConcurrentHashMap<>();
-    for (final Map.Entry<String, ConsensusSubscriptionBroker> entry :
-        consumerGroupIdToConsensusBroker.entrySet()) {
-      final String groupId = entry.getKey();
-      for (final Map.Entry<String, Long> lag : 
entry.getValue().getLagSummary().entrySet()) {
-        result.put(groupId + "/" + lag.getKey(), lag.getValue());
+    for (final Map.Entry<String, List<ISubscriptionBroker>> entry :
+        consumerGroupIdToBrokers.entrySet()) {
+      for (final ISubscriptionBroker broker : entry.getValue()) {
+        if (!(broker instanceof ConsensusSubscriptionBroker)) {
+          continue;
+        }
+        for (final Map.Entry<String, Long> lag :
+            ((ConsensusSubscriptionBroker) broker).getLagSummary().entrySet()) 
{
+          result.put(entry.getKey() + "/" + lag.getKey(), lag.getValue());
+        }
       }

Review Comment:
   We could also let PipeBroker return an empty summary for now.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALFileUtils.java:
##########
@@ -182,4 +197,243 @@ public static String getTsFileRelativePath(String 
absolutePath) {
     Path path = new File(absolutePath).toPath();
     return path.subpath(path.getNameCount() - 5, 
path.getNameCount()).toString();
   }
+
+  /**
+   * Locate the first local searchIndex whose writer progress is equal to or 
strictly greater than
+   * the given writer-local frontier. This is currently used by single-writer 
recovery paths, so it
+   * matches only entries from the supplied nodeId.
+   *
+   * @return [targetSearchIndex, exactMatchFlag], or null if no matching/later 
entry exists
+   */
+  public static long[] locateByWriterProgress(
+      final File logDir, final int nodeId, final long physicalTime, final long 
localSeq) {
+    final long[] exactSearchIndex = new long[] {-1L};
+    final long[] firstAfterSearchIndex = new long[] {-1L};
+    final long[] firstAfterPhysicalTime = new long[] {Long.MAX_VALUE};
+    final long[] firstAfterLocalSeq = new long[] {Long.MAX_VALUE};
+
+    forEachSealedSearchableRequest(
+        logDir,
+        request -> {
+          if (request.nodeId != nodeId) {
+            return true;
+          }
+          final int cmp =
+              compareWriterProgress(
+                  request.physicalTime,
+                  request.nodeId,
+                  request.localSeq,
+                  physicalTime,
+                  nodeId,
+                  localSeq);
+          if (cmp == 0) {
+            exactSearchIndex[0] = request.searchIndex;
+            return false;
+          }
+          if (cmp > 0
+              && (firstAfterSearchIndex[0] < 0L
+                  || compareWriterProgress(
+                          request.physicalTime,
+                          request.nodeId,
+                          request.localSeq,
+                          firstAfterPhysicalTime[0],
+                          nodeId,
+                          firstAfterLocalSeq[0])
+                      < 0)) {
+            firstAfterSearchIndex[0] = request.searchIndex;
+            firstAfterPhysicalTime[0] = request.physicalTime;
+            firstAfterLocalSeq[0] = request.localSeq;
+          }
+          return true;
+        });
+
+    if (exactSearchIndex[0] >= 0L) {
+      return new long[] {exactSearchIndex[0], 1L};
+    }
+    if (firstAfterSearchIndex[0] >= 0L) {
+      return new long[] {firstAfterSearchIndex[0], 0L};
+    }
+    return null;
+  }
+
+  public static long findSearchIndexByWriterProgress(
+      final File logDir, final int nodeId, final long physicalTime, final long 
localSeq) {
+    final long[] located = locateByWriterProgress(logDir, nodeId, 
physicalTime, localSeq);
+    return located != null && located[1] == 1L ? located[0] : -1L;
+  }
+
+  public static long findSearchIndexAfterWriterProgress(
+      final File logDir, final int nodeId, final long physicalTime, final long 
localSeq) {
+    final long[] bestSearchIndex = new long[] {-1L};
+    final long[] bestPhysicalTime = new long[] {Long.MAX_VALUE};
+    final long[] bestLocalSeq = new long[] {Long.MAX_VALUE};
+    forEachSealedSearchableRequest(
+        logDir,
+        request -> {
+          if (request.nodeId != nodeId) {
+            return true;
+          }
+          if (compareWriterProgress(
+                  request.physicalTime,
+                  request.nodeId,
+                  request.localSeq,
+                  physicalTime,
+                  nodeId,
+                  localSeq)
+              <= 0) {
+            return true;
+          }
+          if (bestSearchIndex[0] < 0L
+              || compareWriterProgress(
+                      request.physicalTime,
+                      request.nodeId,
+                      request.localSeq,
+                      bestPhysicalTime[0],
+                      nodeId,
+                      bestLocalSeq[0])
+                  < 0) {
+            bestSearchIndex[0] = request.searchIndex;
+            bestPhysicalTime[0] = request.physicalTime;
+            bestLocalSeq[0] = request.localSeq;
+          }
+          return true;
+        });
+    return bestSearchIndex[0];
+  }
+
+  private interface SearchableRequestVisitor {
+    boolean onRequest(SearchableRequestMeta request);

Review Comment:
   I mean `boolean onRequest(SearchableRequestMeta request)`;



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusLogToTabletConverter.java:
##########
@@ -297,17 +297,25 @@ private List<Tablet> convertInsertTabletNode(final 
InsertTabletNode node) {
       schemas.add(new MeasurementSchema(measurements[colIdx], 
dataTypes[colIdx]));
     }
 
-    // Build column arrays and bitmaps using bulk copy
-    final long[] newTimes = Arrays.copyOf(times, rowCount);
+    // Reuse complete tablets on the server-side read path. Column filtering 
changes tablet shape
+    // and still needs copies for the selected columns.
+    final long[] newTimes = allColumnsMatch ? times : Arrays.copyOf(times, 
rowCount);

Review Comment:
   Why does column filtering need to copy the content of a column?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to