OneSizeFitsQuorum commented on code in PR #13013:
URL: https://github.com/apache/iotdb/pull/13013#discussion_r1689820557


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java:
##########
@@ -702,126 +704,79 @@ public boolean hasNext() {
         }
       }
 
-      // find all nodes of current wal file
-      List<IConsensusRequest> tmpNodes = new ArrayList<>();
-      long targetIndex = nextSearchIndex;
-      try (WALByteBufReader walByteBufReader =
-          new WALByteBufReader(filesToSearch[currentFileIndex])) {
-        while (walByteBufReader.hasNext()) {
-          ByteBuffer buffer = walByteBufReader.next();
-          WALEntryType type = WALEntryType.valueOf(buffer.get());
-          if (type.needSearch()) {
-            // see WALInfoEntry#serialize, entry type + memtable id + plan 
node type
-            buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE + 
PlanNodeType.BYTES);
-            long currentIndex = buffer.getLong();
-            buffer.clear();
-            if (currentIndex == targetIndex) {
-              tmpNodes.add(new IoTConsensusRequest(buffer));
-            } else {
-              // different search index, all slices found
-              if (!tmpNodes.isEmpty()) {
-                insertNodes.add(new IndexedConsensusRequest(targetIndex, 
tmpNodes));
-                tmpNodes = new ArrayList<>();
-              }
-              // remember to add current plan node
-              if (currentIndex > targetIndex) {
-                tmpNodes.add(new IoTConsensusRequest(buffer));
-                targetIndex = currentIndex;
+      /* ------ find all nodes from all wal file ------ */
+
+      AtomicReference<List<IConsensusRequest>> tmpNodes = new 
AtomicReference<>(new ArrayList<>());
+      AtomicBoolean notFirstFile = new AtomicBoolean(false);
+      AtomicBoolean hasCollectedSufficientData = new AtomicBoolean(false);
+
+      // try to collect current tmpNodes to insertNodes, return true if 
successfully collect an
+      // insert node
+      Runnable tryToCollectInsertNodeAndBumpIndex =
+          () -> {
+            if (!tmpNodes.get().isEmpty()) {
+              insertNodes.add(new IndexedConsensusRequest(nextSearchIndex, 
tmpNodes.get()));
+              tmpNodes.set(new ArrayList<>());
+              nextSearchIndex++;
+              if (notFirstFile.get()) {
+                hasCollectedSufficientData.set(true);
               }
             }
-          } else if (!tmpNodes.isEmpty()) {
-            // next entry doesn't need to be searched, all slices found
-            insertNodes.add(new IndexedConsensusRequest(targetIndex, 
tmpNodes));
-            targetIndex++;
-            tmpNodes = new ArrayList<>();
-          }
-        }
-      } catch (FileNotFoundException e) {
-        logger.debug(
-            "WAL file {} has been deleted, try to find next {} again.",
-            identifier,
-            nextSearchIndex);
-        reset();
-        return hasNext();
-      } catch (Exception e) {
-        brokenFileId = 
WALFileUtils.parseVersionId(filesToSearch[currentFileIndex].getName());
-        logger.error(
-            "Fail to read wal from wal file {}, skip this file.",
-            filesToSearch[currentFileIndex],
-            e);
-        // skip this file when it's broken from the beginning
-        if (insertNodes.isEmpty() && tmpNodes.isEmpty()) {
-          currentFileIndex++;
-          return hasNext();
+          };
+
+      COLLECT_FILE_LOOP:
+      for (; currentFileIndex < filesToSearch.length - 1; currentFileIndex++) {
+        // cannot find any in this file, so all slices of last plan node are 
found
+        if 
(WALFileUtils.parseStatusCode(filesToSearch[currentFileIndex].getName())
+            == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
+          tryToCollectInsertNodeAndBumpIndex.run();
+          continue;
         }
-      }
-
-      // find remaining slices of last plan node of targetIndex
-      if (tmpNodes.isEmpty()) { // all plan nodes scanned
-        currentFileIndex++;
-      } else {
-        int fileIndex = currentFileIndex + 1;
-        while (!tmpNodes.isEmpty() && fileIndex < filesToSearch.length - 1) {
-          // cannot find any in this file, so all slices of last plan node are 
found
-          if (WALFileUtils.parseStatusCode(filesToSearch[fileIndex].getName())
-              == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
-            insertNodes.add(new IndexedConsensusRequest(targetIndex, 
tmpNodes));
-            tmpNodes = Collections.emptyList();
-            break;
-          }
-          // read until find all plan nodes whose search index equals target 
index
-          try (WALByteBufReader walByteBufReader = new 
WALByteBufReader(filesToSearch[fileIndex])) {
-            // first search index are different, so all slices of last plan 
node are found
-            if (walByteBufReader.getFirstSearchIndex() != targetIndex) {
-              insertNodes.add(new IndexedConsensusRequest(targetIndex, 
tmpNodes));
-              tmpNodes = Collections.emptyList();
-              break;
-            } else {
-              // read until one node has different search index
-              while (walByteBufReader.hasNext()) {
-                ByteBuffer buffer = walByteBufReader.next();
-                WALEntryType type = WALEntryType.valueOf(buffer.get());
-                if (type.needSearch()) {
-                  // see WALInfoEntry#serialize, entry type + memtable id + 
plan node type
-                  buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE + 
PlanNodeType.BYTES);
-                  long currentIndex = buffer.getLong();
-                  buffer.clear();
-                  if (currentIndex == targetIndex) {
-                    tmpNodes.add(new IoTConsensusRequest(buffer));
-                  } else { // find all slices of plan node
-                    insertNodes.add(new IndexedConsensusRequest(targetIndex, 
tmpNodes));
-                    tmpNodes = Collections.emptyList();
-                    break;
-                  }
-                } else { // find all slices of plan node
-                  insertNodes.add(new IndexedConsensusRequest(targetIndex, 
tmpNodes));
-                  tmpNodes = Collections.emptyList();
-                  break;
+        try (WALByteBufReader walByteBufReader =
+            new WALByteBufReader(filesToSearch[currentFileIndex])) {
+          while (walByteBufReader.hasNext()) {
+            ByteBuffer buffer = walByteBufReader.next();
+            WALEntryType type = WALEntryType.valueOf(buffer.get());
+            if (type.needSearch()) {
+              // see WALInfoEntry#serialize, entry type + memtable id + plan 
node type
+              buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE + 
PlanNodeType.BYTES);
+              final long currentWalEntryIndex = buffer.getLong();
+              buffer.clear();
+              if (currentWalEntryIndex == -1) {
+                // WAL entry of targetIndex has been fully collected, so put 
them into insertNodes
+                tryToCollectInsertNodeAndBumpIndex.run();
+              } else if (currentWalEntryIndex < nextSearchIndex) {
+                // WAL entry is outdated, do nothing, continue to see next WAL 
entry
+              } else if (currentWalEntryIndex == nextSearchIndex) {
+                tmpNodes.get().add(new IoTConsensusRequest(buffer));
+              } else {
+                // currentWalEntryIndex > targetIndex
+                // WAL entry of targetIndex has been fully collected, put them 
into insertNodes
+                tryToCollectInsertNodeAndBumpIndex.run();
+                if (currentWalEntryIndex != nextSearchIndex) {
+                  logger.warn(
+                      "The search index of next WAL entry should be {}, but 
actually it's {}",
+                      nextSearchIndex,
+                      currentWalEntryIndex);
+                  nextSearchIndex = currentWalEntryIndex;
                 }
+                tmpNodes.get().add(new IoTConsensusRequest(buffer));
               }
+            } else {
+              tryToCollectInsertNodeAndBumpIndex.run();
+            }
+            if (hasCollectedSufficientData.get()) {
+              break COLLECT_FILE_LOOP;

Review Comment:
   I mean, can this labeled-loop style (`break COLLECT_FILE_LOOP`) be replaced with something more graceful?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to