liyuheng55555 commented on code in PR #13013:
URL: https://github.com/apache/iotdb/pull/13013#discussion_r1689577598
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java:
##########
@@ -702,126 +704,79 @@ public boolean hasNext() {
}
}
- // find all nodes of current wal file
- List<IConsensusRequest> tmpNodes = new ArrayList<>();
- long targetIndex = nextSearchIndex;
- try (WALByteBufReader walByteBufReader =
- new WALByteBufReader(filesToSearch[currentFileIndex])) {
- while (walByteBufReader.hasNext()) {
- ByteBuffer buffer = walByteBufReader.next();
- WALEntryType type = WALEntryType.valueOf(buffer.get());
- if (type.needSearch()) {
- // see WALInfoEntry#serialize, entry type + memtable id + plan
node type
- buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE +
PlanNodeType.BYTES);
- long currentIndex = buffer.getLong();
- buffer.clear();
- if (currentIndex == targetIndex) {
- tmpNodes.add(new IoTConsensusRequest(buffer));
- } else {
- // different search index, all slices found
- if (!tmpNodes.isEmpty()) {
- insertNodes.add(new IndexedConsensusRequest(targetIndex,
tmpNodes));
- tmpNodes = new ArrayList<>();
- }
- // remember to add current plan node
- if (currentIndex > targetIndex) {
- tmpNodes.add(new IoTConsensusRequest(buffer));
- targetIndex = currentIndex;
+ /* ------ find all nodes from all wal file ------ */
+
+ AtomicReference<List<IConsensusRequest>> tmpNodes = new
AtomicReference<>(new ArrayList<>());
+ AtomicBoolean notFirstFile = new AtomicBoolean(false);
+ AtomicBoolean hasCollectedSufficientData = new AtomicBoolean(false);
+
+ // try to collect current tmpNodes to insertNodes, return true if
successfully collect an
+ // insert node
+ Runnable tryToCollectInsertNodeAndBumpIndex =
+ () -> {
+ if (!tmpNodes.get().isEmpty()) {
+ insertNodes.add(new IndexedConsensusRequest(nextSearchIndex,
tmpNodes.get()));
+ tmpNodes.set(new ArrayList<>());
+ nextSearchIndex++;
+ if (notFirstFile.get()) {
+ hasCollectedSufficientData.set(true);
}
}
- } else if (!tmpNodes.isEmpty()) {
- // next entry doesn't need to be searched, all slices found
- insertNodes.add(new IndexedConsensusRequest(targetIndex,
tmpNodes));
- targetIndex++;
- tmpNodes = new ArrayList<>();
- }
- }
- } catch (FileNotFoundException e) {
- logger.debug(
- "WAL file {} has been deleted, try to find next {} again.",
- identifier,
- nextSearchIndex);
- reset();
- return hasNext();
- } catch (Exception e) {
- brokenFileId =
WALFileUtils.parseVersionId(filesToSearch[currentFileIndex].getName());
- logger.error(
- "Fail to read wal from wal file {}, skip this file.",
- filesToSearch[currentFileIndex],
- e);
- // skip this file when it's broken from the beginning
- if (insertNodes.isEmpty() && tmpNodes.isEmpty()) {
- currentFileIndex++;
- return hasNext();
+ };
+
+ COLLECT_FILE_LOOP:
+ for (; currentFileIndex < filesToSearch.length - 1; currentFileIndex++) {
+ // cannot find any in this file, so all slices of last plan node are
found
+ if
(WALFileUtils.parseStatusCode(filesToSearch[currentFileIndex].getName())
+ == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
+ tryToCollectInsertNodeAndBumpIndex.run();
+ continue;
}
- }
-
- // find remaining slices of last plan node of targetIndex
- if (tmpNodes.isEmpty()) { // all plan nodes scanned
- currentFileIndex++;
- } else {
- int fileIndex = currentFileIndex + 1;
- while (!tmpNodes.isEmpty() && fileIndex < filesToSearch.length - 1) {
- // cannot find any in this file, so all slices of last plan node are
found
- if (WALFileUtils.parseStatusCode(filesToSearch[fileIndex].getName())
- == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX) {
- insertNodes.add(new IndexedConsensusRequest(targetIndex,
tmpNodes));
- tmpNodes = Collections.emptyList();
- break;
- }
- // read until find all plan nodes whose search index equals target
index
- try (WALByteBufReader walByteBufReader = new
WALByteBufReader(filesToSearch[fileIndex])) {
- // first search index are different, so all slices of last plan
node are found
- if (walByteBufReader.getFirstSearchIndex() != targetIndex) {
- insertNodes.add(new IndexedConsensusRequest(targetIndex,
tmpNodes));
- tmpNodes = Collections.emptyList();
- break;
- } else {
- // read until one node has different search index
- while (walByteBufReader.hasNext()) {
- ByteBuffer buffer = walByteBufReader.next();
- WALEntryType type = WALEntryType.valueOf(buffer.get());
- if (type.needSearch()) {
- // see WALInfoEntry#serialize, entry type + memtable id +
plan node type
- buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE +
PlanNodeType.BYTES);
- long currentIndex = buffer.getLong();
- buffer.clear();
- if (currentIndex == targetIndex) {
- tmpNodes.add(new IoTConsensusRequest(buffer));
- } else { // find all slices of plan node
- insertNodes.add(new IndexedConsensusRequest(targetIndex,
tmpNodes));
- tmpNodes = Collections.emptyList();
- break;
- }
- } else { // find all slices of plan node
- insertNodes.add(new IndexedConsensusRequest(targetIndex,
tmpNodes));
- tmpNodes = Collections.emptyList();
- break;
+ try (WALByteBufReader walByteBufReader =
+ new WALByteBufReader(filesToSearch[currentFileIndex])) {
+ while (walByteBufReader.hasNext()) {
+ ByteBuffer buffer = walByteBufReader.next();
+ WALEntryType type = WALEntryType.valueOf(buffer.get());
+ if (type.needSearch()) {
+ // see WALInfoEntry#serialize, entry type + memtable id + plan
node type
+ buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE +
PlanNodeType.BYTES);
+ final long currentWalEntryIndex = buffer.getLong();
+ buffer.clear();
+ if (currentWalEntryIndex == -1) {
+ // WAL entry of targetIndex has been fully collected, so put
them into insertNodes
+ tryToCollectInsertNodeAndBumpIndex.run();
+ } else if (currentWalEntryIndex < nextSearchIndex) {
+ // WAL entry is outdated, do nothing, continue to see next WAL
entry
+ } else if (currentWalEntryIndex == nextSearchIndex) {
+ tmpNodes.get().add(new IoTConsensusRequest(buffer));
+ } else {
+ // currentWalEntryIndex > targetIndex
+ // WAL entry of targetIndex has been fully collected, put them
into insertNodes
+ tryToCollectInsertNodeAndBumpIndex.run();
+ if (currentWalEntryIndex != nextSearchIndex) {
+ logger.warn(
+ "The search index of next WAL entry should be {}, but
actually it's {}",
+ nextSearchIndex,
+ currentWalEntryIndex);
+ nextSearchIndex = currentWalEntryIndex;
}
+ tmpNodes.get().add(new IoTConsensusRequest(buffer));
}
+ } else {
+ tryToCollectInsertNodeAndBumpIndex.run();
+ }
+ if (hasCollectedSufficientData.get()) {
+ break COLLECT_FILE_LOOP;
Review Comment:
The outer loop traverses the WAL files, and the inner loop traverses the WAL
entries within a single file. Why do you think one of them needs to be removed?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]