This is an automated email from the ASF dual-hosted git repository.

mkataria pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 92bb0b0518 Oak-10533: Merging Incremental Store failure: Operations, 
delete a node and adding node again is dumped as added node (#1189)
92bb0b0518 is described below

commit 92bb0b0518777318b08e19b43757e790784c7058
Author: Mohit Kataria <[email protected]>
AuthorDate: Thu Nov 16 11:42:40 2023 +0530

    Oak-10533: Merging Incremental Store failure: Operations, delete a node and 
adding node again is dumped as added node (#1189)
    
    * workaround for not consistent checkpoint diff
    
    * added log for bad deletion operand
    
    * updated comment
    
    * formatting
    
    * OAK-10533: updated warn messages
---
 .../MergeIncrementalFlatFileStore.java             | 72 +++++++++++++++++-----
 1 file changed, 58 insertions(+), 14 deletions(-)

diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/MergeIncrementalFlatFileStore.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/MergeIncrementalFlatFileStore.java
index b6cc8ecc6b..485105fa18 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/MergeIncrementalFlatFileStore.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/MergeIncrementalFlatFileStore.java
@@ -96,6 +96,13 @@ public class MergeIncrementalFlatFileStore implements 
MergeIncrementalStore {
         }
     }
 
+    /**
+     * Merges multiple index store files.
+     *
+     * This method is a little verbose, but I think this is fine
+     * as we are not getting consistent data from checkpoint diff
+     * and we need to handle cases differently.
+     */
     private void mergeIndexStoreFiles() throws IOException {
         Map<String, IncrementalStoreOperand> enumMap = 
Arrays.stream(IncrementalStoreOperand.values())
                 
.collect(Collectors.toUnmodifiableMap(IncrementalStoreOperand::toString, k -> 
IncrementalStoreOperand.valueOf(k.name())));
@@ -113,18 +120,24 @@ public class MergeIncrementalFlatFileStore implements 
MergeIncrementalStore {
                     compared = comparator.compare(new 
SimpleNodeStateHolder(baseFFSLine), new 
SimpleNodeStateHolder(incrementalFFSLine));
                     if (compared < 0) { // write baseFFSLine in merged file 
and advance line in baseFFS
                         baseFFSLine = writeAndAdvance(writer, 
baseFFSBufferedReader, baseFFSLine);
-                    } else if (compared > 0) { // write incrementalFFSline and 
advance line in incrementalFFS
-                        String[] incrementalFFSParts = 
IncrementalFlatFileStoreNodeStateEntryWriter.getParts(incrementalFFSLine);
-                        if 
(!IncrementalStoreOperand.ADD.toString().equals(getOperand(incrementalFFSParts)))
 {
-                            log.warn("Expected operand {} but got {} for 
incremental line {}. Merging will proceed as usual, but this needs to be looked 
into.",
-                                    IncrementalStoreOperand.ADD, 
getOperand(incrementalFFSParts), incrementalFFSLine);
-                        }
-                        incrementalFFSLine = writeAndAdvance(writer, 
incrementalFFSBufferedReader,
-                                
getFFSLineFromIncrementalFFSParts(incrementalFFSParts));
+                    }
+                    // We are adding warn logs instead of checkState e.g.
+                    // 1- delete a node
+                    // 2- add node at same path.
+                    // The incremental FFS with above operations are dumped as 
node added instead of modified.
+                    else if (compared > 0) { // write incrementalFFSline and 
advance line in incrementalFFS
+                        incrementalFFSLine = 
processIncrementalFFSLine(enumMap, writer, incrementalFFSBufferedReader, 
incrementalFFSLine);
                     } else {
                         String[] incrementalFFSParts = 
IncrementalFlatFileStoreNodeStateEntryWriter.getParts(incrementalFFSLine);
                         String operand = getOperand(incrementalFFSParts);
                         switch (enumMap.get(operand)) {
+                            case ADD:
+                                log.warn("Expected operand {} or {} but got {} 
for incremental line {}. " +
+                                                "Merging will proceed, but 
this is unexpected.",
+                                        IncrementalStoreOperand.MODIFY, 
IncrementalStoreOperand.DELETE, getOperand(incrementalFFSParts), 
incrementalFFSLine);
+                                incrementalFFSLine = writeAndAdvance(writer, 
incrementalFFSBufferedReader,
+                                        
getFFSLineFromIncrementalFFSParts(incrementalFFSParts));
+                                break;
                             case MODIFY:
                                 incrementalFFSLine = writeAndAdvance(writer, 
incrementalFFSBufferedReader,
                                         
getFFSLineFromIncrementalFFSParts(incrementalFFSParts));
@@ -149,6 +162,40 @@ public class MergeIncrementalFlatFileStore implements 
MergeIncrementalStore {
         }
     }
 
+    private String processIncrementalFFSLine(Map<String, 
IncrementalStoreOperand> enumMap, BufferedWriter writer,
+                                             BufferedReader 
incrementalFFSBufferedReader, String incrementalFFSLine) throws IOException {
+        String[] incrementalFFSParts = 
IncrementalFlatFileStoreNodeStateEntryWriter.getParts(incrementalFFSLine);
+        String operand = getOperand(incrementalFFSParts);
+        switch (enumMap.get(operand)) {
+            case ADD:
+                incrementalFFSLine = writeAndAdvance(writer, 
incrementalFFSBufferedReader,
+                        
getFFSLineFromIncrementalFFSParts(incrementalFFSParts));
+                break;
+            case MODIFY:
+                // this case should not happen. But in case this happens we 
consider modify as addition of node
+                // this implies node is not present in older FFS and in 
checkpointdiff this came as modified instead of
+                // node addition.
+                log.warn("Expected operand {} but got {} for incremental line 
{}. " +
+                                "Merging will proceed, but this is 
unexpected.",
+                        IncrementalStoreOperand.ADD, operand, 
incrementalFFSLine);
+                incrementalFFSLine = writeAndAdvance(writer, 
incrementalFFSBufferedReader,
+                        
getFFSLineFromIncrementalFFSParts(incrementalFFSParts));
+                break;
+            case DELETE:
+                // This case should not happen. If this happens, it means we 
don't have any such node in baseFFS
+                // but this node came as deletion of node for an already 
non-existing node.
+                // we just skip this node in this case.
+                log.warn("Expected operand {} but got {} for incremental line 
{}. Merging will proceed as usual, but this needs to be looked into.",
+                        IncrementalStoreOperand.ADD, operand, 
incrementalFFSLine);
+                incrementalFFSLine = incrementalFFSBufferedReader.readLine();
+                break;
+            default:
+                log.error("Wrong operand in incremental ffs: operand:{}, 
line:{}", operand, incrementalFFSLine);
+                throw new RuntimeException("wrong operand in incremental ffs: 
operand:" + operand + ", line:" + incrementalFFSLine);
+        }
+        return incrementalFFSLine;
+    }
+
     private IndexStoreMetadata getIndexStoreMetadataForMergedFile() throws 
IOException {
         File baseFFSMetadataFile = IndexStoreUtils.getMetadataFile(baseFFS, 
algorithm);
         File incrementalMetadataFile = 
IndexStoreUtils.getMetadataFile(incrementalFFS, algorithm);
@@ -202,13 +249,10 @@ public class MergeIncrementalFlatFileStore implements 
MergeIncrementalStore {
     }
 
     private String writeRestOfIncrementalFileAndAdvance(BufferedWriter writer, 
BufferedReader bufferedReader, String incrementalFFSLine) throws IOException {
+        Map<String, IncrementalStoreOperand> enumMap = 
Arrays.stream(IncrementalStoreOperand.values())
+                
.collect(Collectors.toUnmodifiableMap(IncrementalStoreOperand::toString, k -> 
IncrementalStoreOperand.valueOf(k.name())));
         do {
-            String[] incrementalFFSParts = 
IncrementalFlatFileStoreNodeStateEntryWriter.getParts(incrementalFFSLine);
-            String operand = getOperand(incrementalFFSParts);
-            
checkState(!IncrementalStoreOperand.MODIFY.toString().equals(operand)
-                            && 
!IncrementalStoreOperand.DELETE.toString().equals(operand),
-                    "incremental ffs should not have modify or delete 
operands: {}", incrementalFFSLine);
-            incrementalFFSLine = writeAndAdvance(writer, bufferedReader, 
getFFSLineFromIncrementalFFSParts(incrementalFFSParts));
+            incrementalFFSLine = processIncrementalFFSLine(enumMap, writer, 
bufferedReader, incrementalFFSLine);
         } while (incrementalFFSLine != null);
         return bufferedReader.readLine();
     }

Reply via email to