This is an automated email from the ASF dual-hosted git repository.
mkataria pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new 92bb0b0518 Oak-10533: Merging Incremental Store failure: Operations,
delete a node and adding node again is dumped as added node (#1189)
92bb0b0518 is described below
commit 92bb0b0518777318b08e19b43757e790784c7058
Author: Mohit Kataria <[email protected]>
AuthorDate: Thu Nov 16 11:42:40 2023 +0530
Oak-10533: Merging Incremental Store failure: Operations, delete a node and
adding node again is dumped as added node (#1189)
* workaround for not consistent checkpoint diff
* added log for bad deletion operand
* updated comment
* formatting
* OAK-10533: updated warn messages
---
.../MergeIncrementalFlatFileStore.java | 72 +++++++++++++++++-----
1 file changed, 58 insertions(+), 14 deletions(-)
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/MergeIncrementalFlatFileStore.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/MergeIncrementalFlatFileStore.java
index b6cc8ecc6b..485105fa18 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/MergeIncrementalFlatFileStore.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/MergeIncrementalFlatFileStore.java
@@ -96,6 +96,13 @@ public class MergeIncrementalFlatFileStore implements
MergeIncrementalStore {
}
}
+ /**
+ * Merges multiple index store files.
+ *
+ * This method is a little verbose, but I think this is fine
+ * as we are not getting consistent data from checkpoint diff
+ * and we need to handle cases differently.
+ */
private void mergeIndexStoreFiles() throws IOException {
Map<String, IncrementalStoreOperand> enumMap =
Arrays.stream(IncrementalStoreOperand.values())
.collect(Collectors.toUnmodifiableMap(IncrementalStoreOperand::toString, k ->
IncrementalStoreOperand.valueOf(k.name())));
@@ -113,18 +120,24 @@ public class MergeIncrementalFlatFileStore implements
MergeIncrementalStore {
compared = comparator.compare(new
SimpleNodeStateHolder(baseFFSLine), new
SimpleNodeStateHolder(incrementalFFSLine));
if (compared < 0) { // write baseFFSLine in merged file
and advance line in baseFFS
baseFFSLine = writeAndAdvance(writer,
baseFFSBufferedReader, baseFFSLine);
- } else if (compared > 0) { // write incrementalFFSline and
advance line in incrementalFFS
- String[] incrementalFFSParts =
IncrementalFlatFileStoreNodeStateEntryWriter.getParts(incrementalFFSLine);
- if
(!IncrementalStoreOperand.ADD.toString().equals(getOperand(incrementalFFSParts)))
{
- log.warn("Expected operand {} but got {} for
incremental line {}. Merging will proceed as usual, but this needs to be looked
into.",
- IncrementalStoreOperand.ADD,
getOperand(incrementalFFSParts), incrementalFFSLine);
- }
- incrementalFFSLine = writeAndAdvance(writer,
incrementalFFSBufferedReader,
-
getFFSLineFromIncrementalFFSParts(incrementalFFSParts));
+ }
+ // We are adding warn logs instead of checkState e.g.
+ // 1- delete a node
+ // 2- add node at same path.
+ // The incremental FFS with above operations are dumped as
node added instead of modified.
+ else if (compared > 0) { // write incrementalFFSline and
advance line in incrementalFFS
+ incrementalFFSLine =
processIncrementalFFSLine(enumMap, writer, incrementalFFSBufferedReader,
incrementalFFSLine);
} else {
String[] incrementalFFSParts =
IncrementalFlatFileStoreNodeStateEntryWriter.getParts(incrementalFFSLine);
String operand = getOperand(incrementalFFSParts);
switch (enumMap.get(operand)) {
+ case ADD:
+ log.warn("Expected operand {} or {} but got {}
for incremental line {}. " +
+ "Merging will proceed, but
this is unexpected.",
+ IncrementalStoreOperand.MODIFY,
IncrementalStoreOperand.DELETE, getOperand(incrementalFFSParts),
incrementalFFSLine);
+ incrementalFFSLine = writeAndAdvance(writer,
incrementalFFSBufferedReader,
+
getFFSLineFromIncrementalFFSParts(incrementalFFSParts));
+ break;
case MODIFY:
incrementalFFSLine = writeAndAdvance(writer,
incrementalFFSBufferedReader,
getFFSLineFromIncrementalFFSParts(incrementalFFSParts));
@@ -149,6 +162,40 @@ public class MergeIncrementalFlatFileStore implements
MergeIncrementalStore {
}
}
+ private String processIncrementalFFSLine(Map<String,
IncrementalStoreOperand> enumMap, BufferedWriter writer,
+ BufferedReader
incrementalFFSBufferedReader, String incrementalFFSLine) throws IOException {
+ String[] incrementalFFSParts =
IncrementalFlatFileStoreNodeStateEntryWriter.getParts(incrementalFFSLine);
+ String operand = getOperand(incrementalFFSParts);
+ switch (enumMap.get(operand)) {
+ case ADD:
+ incrementalFFSLine = writeAndAdvance(writer,
incrementalFFSBufferedReader,
+
getFFSLineFromIncrementalFFSParts(incrementalFFSParts));
+ break;
+ case MODIFY:
+ // this case should not happen. But in case this happens we
consider modify as addition of node
+ // this implies node is not present in older FFS and in
checkpointdiff this came as modified instead of
+ // node addition.
+ log.warn("Expected operand {} but got {} for incremental line
{}. " +
+ "Merging will proceed, but this is
unexpected.",
+ IncrementalStoreOperand.ADD, operand,
incrementalFFSLine);
+ incrementalFFSLine = writeAndAdvance(writer,
incrementalFFSBufferedReader,
+
getFFSLineFromIncrementalFFSParts(incrementalFFSParts));
+ break;
+ case DELETE:
+ // This case should not happen. If this happens, it means we
don't have any such node in baseFFS
+ // but this node came as deletion of node for an already
non-existing node.
+ // we just skip this node in this case.
+ log.warn("Expected operand {} but got {} for incremental line
{}. Merging will proceed as usual, but this needs to be looked into.",
+ IncrementalStoreOperand.ADD, operand,
incrementalFFSLine);
+ incrementalFFSLine = incrementalFFSBufferedReader.readLine();
+ break;
+ default:
+ log.error("Wrong operand in incremental ffs: operand:{},
line:{}", operand, incrementalFFSLine);
+ throw new RuntimeException("wrong operand in incremental ffs:
operand:" + operand + ", line:" + incrementalFFSLine);
+ }
+ return incrementalFFSLine;
+ }
+
private IndexStoreMetadata getIndexStoreMetadataForMergedFile() throws
IOException {
File baseFFSMetadataFile = IndexStoreUtils.getMetadataFile(baseFFS,
algorithm);
File incrementalMetadataFile =
IndexStoreUtils.getMetadataFile(incrementalFFS, algorithm);
@@ -202,13 +249,10 @@ public class MergeIncrementalFlatFileStore implements
MergeIncrementalStore {
}
private String writeRestOfIncrementalFileAndAdvance(BufferedWriter writer,
BufferedReader bufferedReader, String incrementalFFSLine) throws IOException {
+ Map<String, IncrementalStoreOperand> enumMap =
Arrays.stream(IncrementalStoreOperand.values())
+
.collect(Collectors.toUnmodifiableMap(IncrementalStoreOperand::toString, k ->
IncrementalStoreOperand.valueOf(k.name())));
do {
- String[] incrementalFFSParts =
IncrementalFlatFileStoreNodeStateEntryWriter.getParts(incrementalFFSLine);
- String operand = getOperand(incrementalFFSParts);
-
checkState(!IncrementalStoreOperand.MODIFY.toString().equals(operand)
- &&
!IncrementalStoreOperand.DELETE.toString().equals(operand),
- "incremental ffs should not have modify or delete
operands: {}", incrementalFFSLine);
- incrementalFFSLine = writeAndAdvance(writer, bufferedReader,
getFFSLineFromIncrementalFFSParts(incrementalFFSParts));
+ incrementalFFSLine = processIncrementalFFSLine(enumMap, writer,
bufferedReader, incrementalFFSLine);
} while (incrementalFFSLine != null);
return bufferedReader.readLine();
}