DanielWang2035 commented on code in PR #17238:
URL: https://github.com/apache/iotdb/pull/17238#discussion_r3279122167


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALFileUtils.java:
##########
@@ -182,4 +197,243 @@ public static String getTsFileRelativePath(String 
absolutePath) {
     Path path = new File(absolutePath).toPath();
     return path.subpath(path.getNameCount() - 5, 
path.getNameCount()).toString();
   }
+
+  /**
+   * Locate the first local searchIndex whose writer progress is equal to or strictly greater than
+   * the given writer-local frontier. This is currently used by single-writer recovery paths, so it
+   * matches only entries from the supplied nodeId.
+   *
+   * <p>The scan stops as soon as an exact match is seen. Otherwise every request is visited and,
+   * among those ordered after the frontier, the one with the smallest progress is kept.
+   *
+   * @param logDir directory handed to the sealed-WAL scan
+   * @param nodeId writer whose requests are considered; requests of other nodes are skipped
+   * @param physicalTime physical-time component of the frontier
+   * @param localSeq local-sequence component of the frontier
+   * @return [targetSearchIndex, exactMatchFlag] where the flag is 1 for an exact match and 0 for a
+   *     later entry, or null if no matching/later entry exists
+   */
+  public static long[] locateByWriterProgress(
+      final File logDir, final int nodeId, final long physicalTime, final long 
localSeq) {
+    // Single-element arrays are used as mutable holders because the lambda below can only
+    // capture effectively final locals. -1 means "nothing found yet".
+    final long[] exactSearchIndex = new long[] {-1L};
+    final long[] firstAfterSearchIndex = new long[] {-1L};
+    final long[] firstAfterPhysicalTime = new long[] {Long.MAX_VALUE};
+    final long[] firstAfterLocalSeq = new long[] {Long.MAX_VALUE};
+
+    forEachSealedSearchableRequest(
+        logDir,
+        request -> {
+          // Ignore other writers but keep scanning.
+          if (request.nodeId != nodeId) {
+            return true;
+          }
+          final int cmp =
+              compareWriterProgress(
+                  request.physicalTime,
+                  request.nodeId,
+                  request.localSeq,
+                  physicalTime,
+                  nodeId,
+                  localSeq);
+          if (cmp == 0) {
+            // Exact hit: remember it and return false to abort the scan.
+            exactSearchIndex[0] = request.searchIndex;
+            return false;
+          }
+          // Request is after the frontier: keep it if it is the first candidate or if it is
+          // ordered before the best candidate recorded so far.
+          if (cmp > 0
+              && (firstAfterSearchIndex[0] < 0L
+                  || compareWriterProgress(
+                          request.physicalTime,
+                          request.nodeId,
+                          request.localSeq,
+                          firstAfterPhysicalTime[0],
+                          nodeId,
+                          firstAfterLocalSeq[0])
+                      < 0)) {
+            firstAfterSearchIndex[0] = request.searchIndex;
+            firstAfterPhysicalTime[0] = request.physicalTime;
+            firstAfterLocalSeq[0] = request.localSeq;
+          }
+          return true;
+        });
+
+    // NOTE(review): a negative searchIndex is indistinguishable from "not found" in the checks
+    // below, because -1 doubles as the sentinel.
+    if (exactSearchIndex[0] >= 0L) {
+      return new long[] {exactSearchIndex[0], 1L};
+    }
+    if (firstAfterSearchIndex[0] >= 0L) {
+      return new long[] {firstAfterSearchIndex[0], 0L};
+    }
+    return null;
+  }
+
+  /**
+   * Find the searchIndex of the request whose writer progress exactly equals the given one.
+   *
+   * <p>Delegates to {@link #locateByWriterProgress} and accepts its result only when the
+   * exact-match flag is set; a "first later entry" result is treated as not found.
+   *
+   * @return the matching searchIndex, or -1 if there is no exact match
+   */
+  public static long findSearchIndexByWriterProgress(
+      final File logDir, final int nodeId, final long physicalTime, final long 
localSeq) {
+    final long[] located = locateByWriterProgress(logDir, nodeId, 
physicalTime, localSeq);
+    // located[1] == 1L marks an exact match; anything else maps to -1.
+    return located != null && located[1] == 1L ? located[0] : -1L;
+  }
+
+  /**
+   * Find the searchIndex of the request from {@code nodeId} whose writer progress is the smallest
+   * one strictly greater than the given progress.
+   *
+   * <p>Unlike {@link #locateByWriterProgress}, an exact match is skipped rather than returned,
+   * and the scan never stops early: every request is visited.
+   *
+   * @return the searchIndex of that request, or -1 if no request is ordered after the given
+   *     progress
+   */
+  public static long findSearchIndexAfterWriterProgress(
+      final File logDir, final int nodeId, final long physicalTime, final long 
localSeq) {
+    // Single-element arrays act as mutable holders writable from the lambda; -1 = none found.
+    final long[] bestSearchIndex = new long[] {-1L};
+    final long[] bestPhysicalTime = new long[] {Long.MAX_VALUE};
+    final long[] bestLocalSeq = new long[] {Long.MAX_VALUE};
+    forEachSealedSearchableRequest(
+        logDir,
+        request -> {
+          // Ignore other writers but keep scanning.
+          if (request.nodeId != nodeId) {
+            return true;
+          }
+          // Skip requests at or before the given progress.
+          if (compareWriterProgress(
+                  request.physicalTime,
+                  request.nodeId,
+                  request.localSeq,
+                  physicalTime,
+                  nodeId,
+                  localSeq)
+              <= 0) {
+            return true;
+          }
+          // Keep the request if it is the first candidate or ordered before the current best.
+          if (bestSearchIndex[0] < 0L
+              || compareWriterProgress(
+                      request.physicalTime,
+                      request.nodeId,
+                      request.localSeq,
+                      bestPhysicalTime[0],
+                      nodeId,
+                      bestLocalSeq[0])
+                  < 0) {
+            bestSearchIndex[0] = request.searchIndex;
+            bestPhysicalTime[0] = request.physicalTime;
+            bestLocalSeq[0] = request.localSeq;
+          }
+          return true;
+        });
+    return bestSearchIndex[0];
+  }
+
+  /** Callback invoked by {@code forEachSealedSearchableRequest} once per coalesced request. */
+  private interface SearchableRequestVisitor {
+    /**
+     * @param request metadata of the visited request
+     * @return true to continue scanning, false to stop the whole scan
+     */
+    boolean onRequest(SearchableRequestMeta request);
+  }
+
+  /**
+   * Immutable holder for the metadata of one searchable request: its searchIndex plus the
+   * (physicalTime, nodeId, localSeq) triple that the callers compare as writer progress.
+   */
+  private static final class SearchableRequestMeta {
+    private final long searchIndex;
+    private final long physicalTime;
+    private final int nodeId;
+    private final long localSeq;
+
+    private SearchableRequestMeta(
+        final long searchIndex, final long physicalTime, final int nodeId, 
final long localSeq) {
+      this.searchIndex = searchIndex;
+      this.physicalTime = physicalTime;
+      this.nodeId = nodeId;
+      this.localSeq = localSeq;
+    }
+  }
+
+  /**
+   * Scan the files returned by {@code listSealedWALFiles(logDir)} and hand every searchable
+   * request to the visitor.
+   *
+   * <p>Entries whose type does not need search are skipped. Consecutive entries in one file that
+   * share the same (physicalTime, nodeId, localSeq) are coalesced into a single request, which is
+   * emitted when a different entry follows or the file ends. Coalescing state is per file, so
+   * entries are never merged across files.
+   *
+   * <p>If the visitor returns false, the whole scan ends immediately. An {@link IOException}
+   * while reading a file is logged and the scan continues with the next file.
+   *
+   * @param logDir directory whose sealed WAL files are scanned
+   * @param visitor callback receiving each coalesced request
+   */
+  private static void forEachSealedSearchableRequest(
+      final File logDir, final SearchableRequestVisitor visitor) {
+    final File[] walFiles = listSealedWALFiles(logDir);
+    if (walFiles == null || walFiles.length == 0) {
+      return;
+    }
+
+    for (final File walFile : walFiles) {
+      try (final ProgressWALReader reader = new ProgressWALReader(walFile)) {
+        // State of the request being accumulated; valid only while hasPending is true.
+        long pendingSearchIndex = Long.MIN_VALUE;
+        long pendingPhysicalTime = 0L;
+        int pendingNodeId = -1;
+        long pendingLocalSeq = Long.MIN_VALUE;
+        boolean hasPending = false;
+
+        while (reader.hasNext()) {
+          final ByteBuffer buffer = reader.next();
+          // The first byte carries the entry type; clear() only rewinds position/limit and
+          // leaves the content untouched.
+          final WALEntryType type = WALEntryType.valueOf(buffer.get());
+          buffer.clear();
+          if (!type.needSearch()) {
+            continue;
+          }
+
+          final long currentLocalSeq = reader.getCurrentEntryLocalSeq();
+          final long currentPhysicalTime = 
reader.getCurrentEntryPhysicalTime();
+          final int currentNodeId = reader.getCurrentEntryNodeId();
+
+          buffer.position(SEARCH_INDEX_OFFSET);
+          final long bodySearchIndex = buffer.getLong();
+          buffer.clear();
+          // A negative searchIndex in the entry body falls back to the entry's localSeq.
+          final long currentSearchIndex = bodySearchIndex >= 0 ? 
bodySearchIndex : currentLocalSeq;
+
+          // Same progress triple as the pending request: merge instead of emitting, adopting
+          // this entry's searchIndex only if the pending one is still negative.
+          if (hasPending
+              && pendingPhysicalTime == currentPhysicalTime
+              && pendingNodeId == currentNodeId
+              && pendingLocalSeq == currentLocalSeq) {
+            if (pendingSearchIndex < 0 && currentSearchIndex >= 0) {
+              pendingSearchIndex = currentSearchIndex;
+            }
+            continue;
+          }
+
+          // A different request starts: emit the pending one first; false aborts the scan.
+          if (hasPending
+              && !visitor.onRequest(
+                  new SearchableRequestMeta(
+                      pendingSearchIndex >= 0 ? pendingSearchIndex : 
pendingLocalSeq,
+                      pendingPhysicalTime,
+                      pendingNodeId,
+                      pendingLocalSeq))) {
+            return;
+          }
+
+          hasPending = true;
+          pendingSearchIndex = currentSearchIndex;
+          pendingPhysicalTime = currentPhysicalTime;
+          pendingNodeId = currentNodeId;
+          pendingLocalSeq = currentLocalSeq;
+        }
+
+        // End of file: emit the last accumulated request, if any.
+        if (hasPending
+            && !visitor.onRequest(
+                new SearchableRequestMeta(
+                    pendingSearchIndex >= 0 ? pendingSearchIndex : 
pendingLocalSeq,
+                    pendingPhysicalTime,
+                    pendingNodeId,
+                    pendingLocalSeq))) {
+          return;
+        }
+      } catch (final IOException e) {
+        // Best effort: log and move on to the next file.
+        logger.warn("Failed to scan WAL file {} for searchable request 
metadata", walFile, e);
+      }
+    }
+  }
+
+  /**
+   * Compare two progress positions by physicalTime first and localSeq second.
+   *
+   * <p>NOTE(review): {@code leftNodeId} is accepted but never read by this method.
+   *
+   * @return a negative value, zero, or a positive value if the left position is ordered before,
+   *     equal to, or after the right position
+   */
+  private static int compareCompatibleProgress(
+      final long leftPhysicalTime,
+      final int leftNodeId,
+      final long leftLocalSeq,
+      final long rightPhysicalTime,
+      final long rightLocalSeq) {
+    if (leftPhysicalTime != rightPhysicalTime) {
+      return Long.compare(leftPhysicalTime, rightPhysicalTime);
+    }
+    if (leftLocalSeq != rightLocalSeq) {
+      return Long.compare(leftLocalSeq, rightLocalSeq);
+    }
+    return 0;
+  }

Review Comment:
   Removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to