This is an automated email from the ASF dual-hosted git repository.

fortino pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 546e889da6 OAK-10460 - PIPELINED strategy: support eager merging of intermediate sorted files (#1156)
546e889da6 is described below

commit 546e889da6782d8b67f5c9427bdfd3e496e66f7f
Author: Nuno Santos <[email protected]>
AuthorDate: Thu Oct 26 09:34:05 2023 +0200

    OAK-10460 - PIPELINED strategy: support eager merging of intermediate sorted files (#1156)
    
    * Refactor indexing logic to take only a MongoDatabase instance instead of a MongoConnection, which is a more complex object that carries several other fields besides the MongoDatabase.
    
    * Remove unused getter.
    
    * Add eager merging of intermediate sorted files in the Pipelined indexing strategy.
    Add an optimized variant of external merge sort that avoids converting the lines read from the files being merged from binary to UTF-8 and back to binary (a minimal sketch of the idea follows the diffstat below).
    
    * Retrigger CI.
    
    * Add missing license files.
    
    * Improve documentation and refactor code.
    Fix wrong termination condition when computing the list of candidate files to merge.
    
    * Make internal helper class private.
    
    * Improve docs, refactor test to use Path instead of File.
    
    * Remove uses of deprecated methods.
    
    * Clarify purpose of disabled test.
    
    * Address review comments.
    
    * Improve logging of byte sizes.
    
    * Remove obsolete comment.
    
    * Reduce the default minimum number of files to merge from 8 to 4.
    
    * Trigger CI.
    
    * - Add system property to configure the size of the read buffer in external merge sort: "oak.indexer.pipelined.externalMerge.readBufferSize", default 16 KB.
    - Fix: missing explicit charset when converting from byte array to String.
    
    * Fix
    
    * Retrigger CI.
    
    * Retrigger CI.
    
    * Increase version of oak.commons.sort.
    
    * Add test to ExternalSortByteArray.
---
 .../jackrabbit/oak/commons/sort/ExternalSort.java  |   2 +-
 .../oak/commons/sort/ExternalSortByteArray.java    | 223 +++++++++++++++++
 .../jackrabbit/oak/commons/sort/package-info.java  |   2 +-
 .../commons/sort/ExternalSortByteArrayTest.java    | 183 ++++++++++++++
 .../oak/commons/sort/ExternalSortTest.java         |   4 +-
 .../document/flatfile/FlatFileStoreUtils.java      |   6 -
 .../flatfile/pipelined/NodeStateHolder.java        |  23 +-
 .../flatfile/pipelined/NodeStateHolderFactory.java |  84 +++++++
 .../flatfile/pipelined/PathElementComparator.java  |   7 +-
 .../flatfile/pipelined/PipelinedMergeSortTask.java | 273 +++++++++++++++++----
 .../flatfile/pipelined/PipelinedSortBatchTask.java |  25 +-
 .../flatfile/pipelined/PipelinedStrategy.java      |  32 ++-
 .../document/indexstore/IndexStoreUtils.java       |   7 +
 .../document/flatfile/pipelined/PipelinedIT.java   |  10 +-
 .../PipelinedMergeSortTaskParameterizedTest.java   | 117 +++++++++
 .../pipelined/PipelinedMergeSortTaskTest.java      | 132 ++++++----
 .../pipelined/PipelinedMergeSortTaskTestBase.java  | 129 ++++++++++
 .../pipelined/PipelinedSortBatchTaskTest.java      |  39 +--
 18 files changed, 1123 insertions(+), 175 deletions(-)
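
The heart of the optimized merge mentioned above is a k-way merge that keeps every line as a byte[] from read to write, so no byte[] -> String -> byte[] round trip is needed. A minimal sketch of that idea, simplified from the ExternalSortByteArray class added below (LineSource is a helper invented for this sketch, not part of the commit):

    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.Comparator;
    import java.util.List;
    import java.util.PriorityQueue;

    // Minimal k-way merge over pre-sorted sources of byte[] lines: lines are
    // compared and written as raw bytes, never decoded to String.
    final class BinaryKWayMergeSketch {
        interface LineSource {
            byte[] peek();                    // next line, or null when exhausted
            byte[] pop() throws IOException;  // consume and return the next line
        }

        static void merge(List<LineSource> sources, OutputStream out,
                          Comparator<byte[]> cmp) throws IOException {
            PriorityQueue<LineSource> pq =
                    new PriorityQueue<>((a, b) -> cmp.compare(a.peek(), b.peek()));
            for (LineSource s : sources) {
                if (s.peek() != null) pq.add(s);
            }
            while (!pq.isEmpty()) {
                LineSource s = pq.poll();
                out.write(s.pop()); // raw bytes, no charset conversion
                out.write('\n');
                if (s.peek() != null) pq.add(s); // re-queue while lines remain
            }
        }
    }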

diff --git a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSort.java b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSort.java
index 03a6404009..706bae7aca 100644
--- a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSort.java
+++ b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSort.java
@@ -627,7 +627,7 @@ public class ExternalSort {
                 // Write if  filterPredicate return true and line is not duplicate
                 if (!distinct || (lastLine == null || (lastLine != null && cmp.compare(r, lastLine) != 0))) {
                     fbw.write(typeToString.apply(r));
-                    fbw.newLine();
+                    fbw.write('\n');
                     lastLine = r;
                 }
             }
diff --git a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortByteArray.java b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortByteArray.java
new file mode 100644
index 0000000000..10c3d7735e
--- /dev/null
+++ b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortByteArray.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.commons.sort;
+
+import org.apache.jackrabbit.guava.common.base.Preconditions;
+import org.apache.jackrabbit.oak.commons.Compression;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.function.Function;
+
+/**
+ * Variation of ExternalSort that stores the lines read from intermediate files as byte arrays to avoid the conversion
+ * from byte[] to String and then back.
+ */
+public class ExternalSortByteArray {
+    private final static int DEFAULT_BUFFER_SIZE = 16 * 1024;
+
+    public static <T> void mergeSortedFilesBinary(List<Path> files, BufferedOutputStream fbw, final Comparator<T> cmp,
+                                                  boolean distinct, Compression algorithm,
+                                                  Function<T, byte[]> typeToByteArray, Function<byte[], T> byteArrayToType)
+            throws IOException {
+        mergeSortedFilesBinary(files, fbw, cmp, distinct, algorithm, typeToByteArray, byteArrayToType, DEFAULT_BUFFER_SIZE);
+    }
+
+    public static <T> void mergeSortedFilesBinary(List<Path> files, BufferedOutputStream fbw, final Comparator<T> cmp,
+                                                  boolean distinct, Compression algorithm,
+                                                  Function<T, byte[]> typeToByteArray, Function<byte[], T> byteArrayToType, int readBufferSize)
+            throws IOException {
+        ArrayList<BinaryFileBuffer<T>> bfbs = new ArrayList<>();
+        try {
+            for (Path f : files) {
+                InputStream in = algorithm.getInputStream(Files.newInputStream(f));
+                bfbs.add(new BinaryFileBuffer<>(in, byteArrayToType, readBufferSize));
+            }
+            mergeBinary(fbw, cmp, distinct, bfbs, typeToByteArray);
+        } finally {
+            for (BinaryFileBuffer<T> buffer : bfbs) {
+                try {
+                    buffer.close();
+                } catch (Exception ignored) {
+                }
+            }
+            for (Path f : files) {
+                Files.deleteIfExists(f);
+            }
+        }
+    }
+
+    private static <T> int mergeBinary(BufferedOutputStream fbw, final Comparator<T> cmp, boolean distinct,
+                                       List<BinaryFileBuffer<T>> buffers, Function<T, byte[]> typeToByteArray)
+            throws IOException {
+        PriorityQueue<BinaryFileBuffer<T>> pq = new PriorityQueue<>(
+                11,
+                (i, j) -> cmp.compare(i.peek(), j.peek())
+        );
+        for (BinaryFileBuffer<T> bfb : buffers) {
+            if (!bfb.empty()) {
+                pq.add(bfb);
+            }
+        }
+        int rowcounter = 0;
+        T lastLine = null;
+        while (!pq.isEmpty()) {
+            BinaryFileBuffer<T> bfb = pq.poll();
+            T r = bfb.pop();
+            // Skip duplicate lines
+            if (!distinct || lastLine == null || cmp.compare(r, lastLine) != 0) {
+                fbw.write(typeToByteArray.apply(r));
+                fbw.write('\n');
+                lastLine = r;
+            }
+            ++rowcounter;
+            if (bfb.empty()) {
+                bfb.fbr.close();
+            } else {
+                pq.add(bfb); // add it back
+            }
+        }
+        return rowcounter;
+    }
+
+    /**
+     * WARNING: Uses '\n' as a line separator, it will not work with other line separators.
+     */
+    private static class BinaryFileBuffer<T> {
+        public final InputStream fbr;
+        private final Function<byte[], T> byteArrayToType;
+        private T cache;
+        private boolean empty;
+
+        // Used to reassemble the lines read from the source input stream.
+        private final ByteArrayOutputStream bais = new ByteArrayOutputStream();
+        private final byte[] buffer;
+        private int bufferPos = 0;
+        private int bufferLimit = 0;
+
+        public BinaryFileBuffer(InputStream r, Function<byte[], T> byteArrayToType, int bufferSize)
+                throws IOException {
+            Preconditions.checkArgument(bufferSize > 1024, "Buffer size must be greater than 1024 bytes");
+            this.fbr = r;
+            this.byteArrayToType = byteArrayToType;
+            this.buffer = new byte[bufferSize];
+            reload();
+        }
+
+        public boolean empty() {
+            return this.empty;
+        }
+
+        private void reload() throws IOException {
+            try {
+                byte[] line = readLine();
+                this.cache = byteArrayToType.apply(line);
+                this.empty = this.cache == null;
+            } catch (EOFException eof) {
+                this.empty = true;
+                this.cache = null;
+            }
+        }
+
+        private boolean bufferIsEmpty() {
+            return bufferPos >= bufferLimit;
+        }
+
+        /*
+         * Read a line from the source input as a byte array. This is adapted from the implementation of
+         * BufferedReader#readLine() but without converting the line to a String.
+         */
+        private byte[] readLine() throws IOException {
+            bais.reset();
+
+            for (; ; ) {
+                if (bufferIsEmpty()) {
+                    bufferLimit = fbr.read(buffer);
+                    bufferPos = 0;
+                }
+                if (bufferIsEmpty()) { /* EOF */
+                    // Buffer is still empty even after trying to read from input stream. We have reached the EOF
+                    // Return whatever is left on the bais
+                    if (bais.size() == 0) {
+                        return null;
+                    } else {
+                        return bais.toByteArray();
+                    }
+                }
+                // Start reading a new line
+                int startByte = bufferPos;
+                while (!bufferIsEmpty()) {
+                    byte c = buffer[bufferPos];
+                    bufferPos++;
+                    if (c == '\n') { /* EOL */
+                        int lineSegmentSize = bufferPos - startByte - 1; // exclude \n
+                        if (bais.size() == 0) {
+                            // There is no partial data on the bais, which means that the whole line is in the
+                            // buffer. In this case, we can extract the line directly from the buffer without
+                            // copying first to the bais
+                            if (lineSegmentSize == 0) {
+                                return null;
+                            } else {
+                                // Copy the line from the buffer to a new byte array and return it
+                                byte[] line = new byte[lineSegmentSize];
+                                System.arraycopy(buffer, startByte, line, 0, lineSegmentSize);
+                                return line;
+                            }
+                        } else {
+                            // The first section of the line is in the bais, the remainder in the buffer. Finish
+                            // reassembling the line in the bais and return it
+                            bais.write(buffer, startByte, lineSegmentSize);
+                            if (bais.size() == 0) {
+                                return null;
+                            } else {
+                                return bais.toByteArray();
+                            }
+                        }
+                    }
+                }
+                // Reached the end of the buffer. Copy whatever is left in the buffer and read more from the source stream
+                bais.write(buffer, startByte, bufferPos - startByte);
+            }
+        }
+
+        public void close() throws IOException {
+            this.fbr.close();
+        }
+
+        public T peek() {
+            if (empty()) {
+                return null;
+            }
+            return this.cache;
+        }
+
+        public T pop() throws IOException {
+            T answer = peek();
+            reload();
+            return answer;
+        }
+    }
+}
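
A minimal usage sketch for the new API, with T = String and UTF-8 lines (the file names are hypothetical; note that mergeSortedFilesBinary deletes the input files once the merge finishes):

    import org.apache.jackrabbit.oak.commons.Compression;
    import org.apache.jackrabbit.oak.commons.sort.ExternalSortByteArray;

    import java.io.BufferedOutputStream;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.Comparator;
    import java.util.List;

    public class MergeUsageSketch {
        public static void main(String[] args) throws Exception {
            // Already-sorted intermediate files (hypothetical names).
            List<Path> sorted = List.of(Paths.get("part-0.txt"), Paths.get("part-1.txt"));
            Path merged = Paths.get("merged.txt");
            try (BufferedOutputStream out = new BufferedOutputStream(Files.newOutputStream(merged))) {
                ExternalSortByteArray.mergeSortedFilesBinary(
                        sorted,
                        out,
                        Comparator.<String>naturalOrder(),
                        true,             // distinct: skip duplicate lines
                        Compression.NONE,
                        s -> s.getBytes(StandardCharsets.UTF_8),                        // T -> byte[]
                        b -> b == null ? null : new String(b, StandardCharsets.UTF_8)); // byte[] -> T
            }
        }
    }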
diff --git a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java
index 37c743fdaa..10f91cc554 100644
--- a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java
+++ b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-@Version("1.2.0")
+@Version("1.3.0")
 package org.apache.jackrabbit.oak.commons.sort;
 
 import org.osgi.annotation.versioning.Version;
diff --git a/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortByteArrayTest.java b/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortByteArrayTest.java
new file mode 100644
index 0000000000..3279a582a6
--- /dev/null
+++ b/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortByteArrayTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.commons.sort;
+
+import org.apache.jackrabbit.oak.commons.Compression;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertArrayEquals;
+
+public class ExternalSortByteArrayTest {
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+    private final static Charset charset = StandardCharsets.UTF_8;
+    private final static Comparator<BinaryTestLine> cmp = Comparator.naturalOrder();
+    private final static Function<byte[], BinaryTestLine> byteArrayToType = line -> line != null ? new BinaryTestLine(line) : null;
+    private final static Function<BinaryTestLine, byte[]> typeToByteArray = tl -> tl != null ? tl.bytes : null;
+
+    @Test
+    public void sortManyFilesNoCompression() throws Exception {
+        sortManyFiles(Compression.NONE);
+    }
+
+    @Test
+    public void sortManyFilesGzipCompression() throws Exception {
+        sortManyFiles(Compression.GZIP);
+    }
+
+    @Test
+    public void sortManyFilesLZ4Compression() throws Exception {
+        sortManyFiles(ExternalSortTest.LZ4());
+    }
+
+    public void sortManyFiles(Compression compression) throws Exception {
+        int testCount = 1000;
+        List<BinaryTestLine> testLines = generateTestLines(testCount);
+
+        List<BinaryTestLine> testLinesShuffled = new ArrayList<>(testLines);
+        Collections.shuffle(testLinesShuffled);
+
+        List<Path> intermediateFiles = createIntermediateFiles(testLinesShuffled, 10, compression);
+        Path resultFile = folder.newFile(compression.addSuffix("sorted.json")).toPath();
+
+        try (BufferedOutputStream bos = new BufferedOutputStream(compression.getOutputStream(Files.newOutputStream(resultFile)))) {
+            ExternalSortByteArray.mergeSortedFilesBinary(intermediateFiles,
+                    bos,
+                    cmp,
+                    true,
+                    compression,
+                    typeToByteArray,
+                    byteArrayToType);
+        }
+
+        ArrayList<String> lines = new ArrayList<>();
+        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(compression.getInputStream(Files.newInputStream(resultFile)), charset))) {
+            while (true) {
+                String line = bufferedReader.readLine();
+                if (line == null) {
+                    break;
+                }
+                lines.add(line);
+            }
+        }
+        String[] actual = lines.toArray(new String[0]);
+        String[] expected = testLines.stream().map(tl -> tl.line).toArray(String[]::new);
+        assertArrayEquals(expected, actual);
+    }
+
+    private List<BinaryTestLine> generateTestLines(int numberOfLines) {
+        List<BinaryTestLine> testLines = new ArrayList<>(numberOfLines);
+        for (int i = 0; i < numberOfLines; i++) {
+            testLines.add(new BinaryTestLine(i + ":" + "foo-" + i));
+        }
+        return testLines;
+    }
+
+    private List<Path> createIntermediateFiles(List<BinaryTestLine> ffsLines, int numberOfFiles, Compression compression) throws Exception {
+        Iterator<BinaryTestLine> ffsIter = ffsLines.iterator();
+        Path workFolder = folder.newFolder("merge_many_test").toPath();
+        ArrayList<Path> intermediateFiles = new ArrayList<>(numberOfFiles);
+        int linesPerFile = ffsLines.size() / numberOfFiles;
+
+        for (int fileIdx = 0; fileIdx < numberOfFiles; fileIdx++) {
+            Path intermediateFile = workFolder.resolve(compression.addSuffix("intermediate-" + fileIdx + ".json"));
+            ArrayList<BinaryTestLine> binaryTestLinesInFile = new ArrayList<>();
+            while (binaryTestLinesInFile.size() < linesPerFile && ffsIter.hasNext()) {
+                binaryTestLinesInFile.add(ffsIter.next());
+            }
+            if (fileIdx == numberOfFiles - 1) {
+                // Add the remaining elements to the last file
+                while (ffsIter.hasNext()) {
+                    binaryTestLinesInFile.add(ffsIter.next());
+                }
+            }
+            binaryTestLinesInFile.sort(cmp);
+
+            try (BufferedWriter bw = new BufferedWriter(
+                    new OutputStreamWriter(
+                            compression.getOutputStream(
+                                    Files.newOutputStream(intermediateFile))))) {
+                for (BinaryTestLine binaryTestLine : binaryTestLinesInFile) {
+                    bw.write(binaryTestLine.line);
+                    bw.write("\n");
+                }
+            }
+            intermediateFiles.add(intermediateFile);
+        }
+        return intermediateFiles;
+    }
+
+    private static class BinaryTestLine implements Comparable<BinaryTestLine> {
+        final String line;
+        final int value;
+        final byte[] bytes;
+
+        public BinaryTestLine(String line) {
+            this.line = line;
+            this.value = Integer.parseInt(line.substring(0, line.indexOf(':')));
+            this.bytes = line.getBytes(StandardCharsets.UTF_8);
+        }
+
+        public BinaryTestLine(byte[] bytes) {
+            this.bytes = bytes;
+            this.line = new String(bytes, StandardCharsets.UTF_8);
+            this.value = Integer.parseInt(line.substring(0, line.indexOf(':')));
+        }
+
+        @Override
+        public int compareTo(BinaryTestLine o) {
+            return Integer.compare(value, o.value);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            BinaryTestLine binaryTestLine = (BinaryTestLine) o;
+            return line.equals(binaryTestLine.line);
+        }
+
+        @Override
+        public int hashCode() {
+            return line.hashCode();
+        }
+
+        @Override
+        public String toString() {
+            return line;
+        }
+    }
+}
\ No newline at end of file
diff --git a/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortTest.java b/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortTest.java
index 359bda6646..47530a471b 100644
--- a/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortTest.java
+++ b/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortTest.java
@@ -518,7 +518,7 @@ public class ExternalSortTest {
 
     }
 
-    private static class TestLine implements Comparable<TestLine> {
+    static class TestLine implements Comparable<TestLine> {
         final String line;
         final int value;
 
@@ -589,7 +589,7 @@ public class ExternalSortTest {
         }
     }
 
-    private static Compression LZ4() {
+    static Compression LZ4() {
         return new Compression() {
             @Override
             public InputStream getInputStream(InputStream in) throws IOException {
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreUtils.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreUtils.java
index 06fa755d16..2b5b36298c 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreUtils.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreUtils.java
@@ -21,7 +21,6 @@ package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
 
 import org.apache.jackrabbit.oak.commons.Compression;
 
-import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
@@ -70,11 +69,6 @@ public class FlatFileStoreUtils {
         return new BufferedWriter(new OutputStreamWriter(algorithm.getOutputStream(out)));
     }
 
-    public static BufferedOutputStream createOutputStream(File file, Compression algorithm) throws IOException {
-        OutputStream out = new FileOutputStream(file);
-        return new BufferedOutputStream(algorithm.getOutputStream(out));
-    }
-
     public static long sizeOf(List<File> sortedFiles) {
         return sortedFiles.stream().mapToLong(File::length).sum();
     }
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateHolder.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateHolder.java
index c45f33e124..59e18c20fa 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateHolder.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateHolder.java
@@ -18,35 +18,22 @@
  */
 package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
 
-import org.apache.jackrabbit.oak.commons.PathUtils;
-import org.apache.jackrabbit.oak.index.indexer.document.flatfile.NodeStateEntryWriter;
 
-import java.util.Iterator;
+public final class NodeStateHolder {
 
-final class NodeStateHolder {
-
-    private final String line;
+    private final byte[] line;
     private final String[] pathElements;
 
-    public NodeStateHolder(String line) {
+    public NodeStateHolder(byte[] line, String[] pathElements) {
         this.line = line;
-        String path = NodeStateEntryWriter.getPath(line);
-        int depth = PathUtils.getDepth(path);
-        this.pathElements = new String[depth];
-        Iterator<String> iter = PathUtils.elements(path).iterator();
-        for (int i = 0; i < pathElements.length; i++) {
-            pathElements[i] = iter.next();
-        }
+        this.pathElements = pathElements;
     }
 
     public String[] getPathElements() {
         return pathElements;
     }
 
-    /**
-     * Line here contains the path also
-     */
-    public String getLine() {
+    public byte[] getLine() {
         return line;
     }
 }
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateHolderFactory.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateHolderFactory.java
new file mode 100644
index 0000000000..45e02a1b6e
--- /dev/null
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateHolderFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
+
+import java.util.ArrayList;
+import java.util.function.Function;
+
+import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy.FLATFILESTORE_CHARSET;
+
+final class NodeStateHolderFactory implements Function<byte[], NodeStateHolder> {
+    private final static byte PIPE = (byte) '|';
+    private final static byte PATH_SEPARATOR = (byte) '/';
+
+    // In UTF-8, ASCII characters keep the same encoding as in ASCII, so we can search for them without decoding the
+    // stream. And no byte of a character encoded as multibyte can match an ASCII character, because the bytes in a
+    // multibyte encoding all start with 1, while ASCII characters start with 0.
+    // https://en.wikipedia.org/wiki/UTF-8#Encoding
+    private static int indexOf(byte[] ffsLine, byte ch, int from, int to) {
+        for (int i = from; i < to; i++) {
+            if (ffsLine[i] == ch) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+
+    private static int indexOf(byte[] ffsLine, byte ch) {
+        return indexOf(ffsLine, ch, 0, ffsLine.length);
+    }
+
+    private static boolean isAbsolutePath(String path) {
+        return !path.isEmpty() && path.charAt(0) == '/';
+    }
+
+    private final ArrayList<String> partsBuffer = new ArrayList<>(16);
+
+    public NodeStateHolder apply(byte[] ffsLine) {
+        return ffsLine == null ? null : new NodeStateHolder(ffsLine, parts(ffsLine));
+    }
+
+    // We are only interested in the path section of the line, so we do not need to convert the full line to a String.
+    // We search for the pipe and then convert only this section to a String.
+    private String[] parts(byte[] ffsLine) {
+        partsBuffer.clear();
+        int pipeIdx = indexOf(ffsLine, PIPE);
+        if (pipeIdx < 0) {
+            throw new IllegalStateException("Line does not contain a pipe: " + new String(ffsLine, FLATFILESTORE_CHARSET));
+        }
+        String path = new String(ffsLine, 0, pipeIdx, FLATFILESTORE_CHARSET);
+
+        int pos = isAbsolutePath(path) ? 1 : 0;
+        while (true) {
+            if (pos >= path.length()) {
+                return partsBuffer.toArray(new String[0]);
+            }
+            int i = path.indexOf(PATH_SEPARATOR, pos);
+            if (i < 0) {
+                // Add last part and exit
+                partsBuffer.add(path.substring(pos));
+                pos = path.length();
+            } else {
+                partsBuffer.add(path.substring(pos, i));
+                pos = i + 1;
+            }
+        }
+    }
+}
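
The UTF-8 property that the comment above relies on can be shown in isolation: every byte of a multi-byte UTF-8 sequence has its high bit set, so scanning for an ASCII byte such as '|' can never match inside one. A standalone sketch (the sample path is made up):

    import java.nio.charset.StandardCharsets;

    public class Utf8ScanSketch {
        public static void main(String[] args) {
            // The non-ASCII characters below encode to bytes >= 0x80, so the scan
            // for the ASCII pipe (0x7C) cannot produce a false match inside them.
            byte[] line = "/content/ü日本|{...}".getBytes(StandardCharsets.UTF_8);
            int pipeIdx = -1;
            for (int i = 0; i < line.length; i++) {
                if (line[i] == (byte) '|') {
                    pipeIdx = i;
                    break;
                }
            }
            // Decode only the path section, as NodeStateHolderFactory does.
            String path = new String(line, 0, pipeIdx, StandardCharsets.UTF_8);
            System.out.println(path); // prints /content/ü日本
        }
    }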
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PathElementComparator.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PathElementComparator.java
index 7172fe4bbc..803e95f9e5 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PathElementComparator.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PathElementComparator.java
@@ -18,21 +18,18 @@
  */
 package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
 
-import org.apache.jackrabbit.guava.common.collect.ImmutableSet;
-
 import java.util.Comparator;
 import java.util.Set;
 
-class PathElementComparator implements Comparator<String[]> {
+public class PathElementComparator implements Comparator<String[]> {
     private final Set<String> preferred;
 
     public PathElementComparator(Set<String> preferredPathElements) {
-        this.preferred = ImmutableSet.copyOf(preferredPathElements);
+        this.preferred = Set.copyOf(preferredPathElements);
     }
 
     @Override
     public int compare(String[] p1, String[] p2) {
-
         int i1 = 0;
         int i2 = 0;
 
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
index a857a4422c..d14fc0f8c0 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
@@ -18,111 +18,223 @@
  */
 package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.guava.common.base.Preconditions;
 import org.apache.jackrabbit.guava.common.base.Stopwatch;
 import org.apache.jackrabbit.oak.commons.Compression;
-import org.apache.jackrabbit.oak.commons.sort.ExternalSort;
-import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
+import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.apache.jackrabbit.oak.commons.sort.ExternalSortByteArray;
+import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreUtils;
 import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
+import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedWriter;
-import java.io.File;
+import java.io.BufferedOutputStream;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Locale;
+import java.util.PriorityQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
-import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCountBin;
-import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.createWriter;
-import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.getSortedStoreFileName;
-import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.sizeOf;
-import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy.FLATFILESTORE_CHARSET;
 import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy.SENTINEL_SORTED_FILES_QUEUE;
+import static org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreUtils.getSortedStoreFileName;
 
 /**
  * Accumulates the intermediate sorted files and, when all files are generated, merges them into a single sorted file,
  * the flat file store
  */
-class PipelinedMergeSortTask implements Callable<PipelinedMergeSortTask.Result> {
-    // TODO: start merging small files into larger files to avoid having too many "small" files at the end.
-    //  Idea: when there are more than k (for instance, 10) intermediate files whose size is under a certain limit
-    //  (for instance, 1GB),  merge them into a  single file. And repeat the test whenever this task receives a new
-    //  intermediate file.
+public class PipelinedMergeSortTask implements Callable<PipelinedMergeSortTask.Result> {
+    /**
+     * Minimum number of intermediate files that must exist before trying to do an eager merge
+     */
+    public static final String OAK_INDEXER_PIPELINED_EAGER_MERGE_TRIGGER_THRESHOLD = "oak.indexer.pipelined.eagerMergeTriggerThreshold";
+    public static final int DEFAULT_OAK_INDEXER_PIPELINED_EAGER_MERGE_TRIGGER_THRESHOLD = 64;
+    /*
+     * Maximum number of files to eagerly merge. This is to keep the eager merges efficient, as efficiency decreases
+     * with the number of files in a merge
+     */
+    public static final String OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_FILES_TO_MERGE = "oak.indexer.pipelined.eagerMergeMaxFilesToMerge";
+    public static final int DEFAULT_OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_FILES_TO_MERGE = 32;
+    /*
+     * Minimum number of files to eagerly merge at a time.
+     */
+    public static final String OAK_INDEXER_PIPELINED_EAGER_MERGE_MIN_FILES_TO_MERGE = "oak.indexer.pipelined.eagerMergeMinFilesToMerge";
+    public static final int DEFAULT_OAK_INDEXER_PIPELINED_EAGER_MERGE_MIN_FILES_TO_MERGE = 4;
+    /*
+     * Maximum total size of intermediate files that can be eagerly merged.
+     */
+    public static final String OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_SIZE_TO_MERGE_MB = "oak.indexer.pipelined.eagerMergeMaxSizeToMergeMB";
+    public static final int DEFAULT_OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_SIZE_TO_MERGE_MB = 2048;
+
+    public final static String OAK_INDEXER_PIPELINED_EXTERNAL_MERGE_READ_BUFFER_SIZE = "oak.indexer.pipelined.externalMerge.readBufferSize";
+    public final static int DEFAULT_OAK_INDEXER_PIPELINED_EXTERNAL_MERGE_READ_BUFFER_SIZE = 16 * 1024;
+
     public static class Result {
-        private final File flatFileStoreFile;
-        private final int filesMerged;
+        private final Path flatFileStoreFile;
+        private final int intermediateFilesCount;
+        private final int finalMergeFilesCount;
+        private final int eagerMergeRuns;
 
-        public Result(File flatFileStoreFile, int filesMerged) {
+        public Result(Path flatFileStoreFile, int intermediateFilesCount, int finalMergeFilesCount, int eagerMergeRuns) {
             this.flatFileStoreFile = flatFileStoreFile;
-            this.filesMerged = filesMerged;
+            this.intermediateFilesCount = intermediateFilesCount;
+            this.finalMergeFilesCount = finalMergeFilesCount;
+            this.eagerMergeRuns = eagerMergeRuns;
         }
 
-        public File getFlatFileStoreFile() {
+        public Path getFlatFileStoreFile() {
             return flatFileStoreFile;
         }
 
-        public int getFilesMerged() {
-            return filesMerged;
+        public int getFinalMergeFilesCount() {
+            return finalMergeFilesCount;
+        }
+
+        public int getEagerMergeRuns() {
+            return eagerMergeRuns;
+        }
+
+        public int getIntermediateFilesCount() {
+            return intermediateFilesCount;
+        }
+
+        @Override
+        public String toString() {
+            return "Result{" +
+                    "flatFileStoreFile=" + flatFileStoreFile +
+                    ", intermediateFilesCount=" + intermediateFilesCount +
+                    ", finalMergeFilesCount=" + finalMergeFilesCount +
+                    ", eagerMergeRuns=" + eagerMergeRuns +
+                    '}';
         }
     }
 
-    private static final Logger LOG = LoggerFactory.getLogger(PipelinedMergeSortTask.class);
+    private static class PathAndSize implements Comparable<PathAndSize> {
+        final Path file;
+        final long size;
+
+        public PathAndSize(Path file, long size) {
+            this.file = file;
+            this.size = size;
+        }
+
+        @Override
+        public String toString() {
+            return "FileAndSize{" +
+                    "file=" + file.toString() +
+                    ", size=" + IOUtils.humanReadableByteCountBin(size) +
+                    '}';
+        }
 
+        @Override
+        public int compareTo(@NotNull PipelinedMergeSortTask.PathAndSize other) {
+            return Long.compare(this.size, other.size);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(PipelinedMergeSortTask.class);
     private static final String THREAD_NAME = "mongo-merge-sort-files";
 
-    private final File storeDir;
+    private final Path storeDir;
     private final Comparator<NodeStateHolder> comparator;
     private final Compression algorithm;
-    private final BlockingQueue<File> sortedFilesQueue;
-    private final ArrayList<File> sortedFiles = new ArrayList<>();
+    private final BlockingQueue<Path> sortedFilesQueue;
+    private final PriorityQueue<PathAndSize> sortedFiles = new PriorityQueue<>();
+    private final AtomicBoolean stopEagerMerging = new AtomicBoolean(false);
+    private final int mergeTriggerThreshold;
+    private final int minFilesToMerge;
+    private final int maxFilesToMerge;
+    private final int maxSizeToMergeMB;
+    private final int externalMergeReadBufferSize;
+    private int eagerMergeRuns;
+    private int mergedFilesCounter = 0;
 
-    public PipelinedMergeSortTask(File storeDir,
+    public PipelinedMergeSortTask(Path storeDir,
                                   PathElementComparator pathComparator,
                                   Compression algorithm,
-                                  BlockingQueue<File> sortedFilesQueue) {
+                                  BlockingQueue<Path> sortedFilesQueue) {
         this.storeDir = storeDir;
         this.comparator = (e1, e2) -> pathComparator.compare(e1.getPathElements(), e2.getPathElements());
         this.algorithm = algorithm;
         this.sortedFilesQueue = sortedFilesQueue;
+
+        this.mergeTriggerThreshold = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_EAGER_MERGE_TRIGGER_THRESHOLD, DEFAULT_OAK_INDEXER_PIPELINED_EAGER_MERGE_TRIGGER_THRESHOLD);
+        Preconditions.checkArgument(mergeTriggerThreshold >= 16,
+                "Invalid value for property " + OAK_INDEXER_PIPELINED_EAGER_MERGE_TRIGGER_THRESHOLD + ": " + mergeTriggerThreshold + ". Must be >= 16");
+
+        this.minFilesToMerge = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_EAGER_MERGE_MIN_FILES_TO_MERGE, DEFAULT_OAK_INDEXER_PIPELINED_EAGER_MERGE_MIN_FILES_TO_MERGE);
+        Preconditions.checkArgument(minFilesToMerge >= 2,
+                "Invalid value for property " + OAK_INDEXER_PIPELINED_EAGER_MERGE_MIN_FILES_TO_MERGE + ": " + minFilesToMerge + ". Must be >= 2");
+
+        this.maxFilesToMerge = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_FILES_TO_MERGE, DEFAULT_OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_FILES_TO_MERGE);
+        Preconditions.checkArgument(maxFilesToMerge >= minFilesToMerge,
+                "Invalid value for property " + OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_FILES_TO_MERGE + ": " + maxFilesToMerge + ". Must be >= " + OAK_INDEXER_PIPELINED_EAGER_MERGE_MIN_FILES_TO_MERGE + " (" + minFilesToMerge + ")");
+
+        this.maxSizeToMergeMB = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_SIZE_TO_MERGE_MB, DEFAULT_OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_SIZE_TO_MERGE_MB);
+        Preconditions.checkArgument(maxSizeToMergeMB >= 1,
+                "Invalid value for property " + OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_SIZE_TO_MERGE_MB + ": " + maxSizeToMergeMB + ". Must be >= 1");
+
+        this.externalMergeReadBufferSize = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_EXTERNAL_MERGE_READ_BUFFER_SIZE, DEFAULT_OAK_INDEXER_PIPELINED_EXTERNAL_MERGE_READ_BUFFER_SIZE);
+        Preconditions.checkArgument(externalMergeReadBufferSize >= FileUtils.ONE_KB,
+                "Invalid value for property " + OAK_INDEXER_PIPELINED_EXTERNAL_MERGE_READ_BUFFER_SIZE + ": " + externalMergeReadBufferSize + ". Must be >= 1 KB");
     }
 
     @Override
     public Result call() throws Exception {
+        this.eagerMergeRuns = 0;
         String originalName = Thread.currentThread().getName();
         Thread.currentThread().setName(THREAD_NAME);
+        int intermediateFilesCount = 0;
         try {
             LOG.info("[TASK:{}:START] Starting merge sort task", 
THREAD_NAME.toUpperCase(Locale.ROOT));
             while (true) {
-                LOG.info("Waiting for next intermediate sorted file");
-                File sortedIntermediateFile = sortedFilesQueue.take();
+                LOG.debug("Waiting for next intermediate sorted file");
+                Path sortedIntermediateFile = sortedFilesQueue.take();
                 if (sortedIntermediateFile == SENTINEL_SORTED_FILES_QUEUE) {
                     long sortedFilesSizeBytes = sizeOf(sortedFiles);
-                    LOG.info("Going to sort {} files, total size {}", 
sortedFiles.size(), humanReadableByteCountBin(sortedFilesSizeBytes));
+                    LOG.info("Going to sort {} files, total size {}", 
sortedFiles.size(), IOUtils.humanReadableByteCountBin(sortedFilesSizeBytes));
                     Stopwatch w = Stopwatch.createStarted();
-                    File flatFileStore = sortStoreFile(sortedFiles);
-                    LOG.info("Final merge completed in {}. Created file: {}", 
FormattingUtils.formatToSeconds(w), flatFileStore.getAbsolutePath());
-                    long ffsSizeBytes = flatFileStore.length();
+                    List<Path> simpleFileList = sortedFiles.stream().map(f -> 
f.file).collect(Collectors.toList());
+                    Path flatFileStore = sortStoreFile(simpleFileList);
+
+                    LOG.info("Final merge completed in {}. Created file: {}", 
FormattingUtils.formatToSeconds(w), flatFileStore.toAbsolutePath());
+                    long ffsSizeBytes = Files.size(flatFileStore);
                     String metrics = MetricsFormatter.newBuilder()
                             .add("duration", 
FormattingUtils.formatToSeconds(w))
                             .add("durationSeconds", 
w.elapsed(TimeUnit.SECONDS))
                             .add("filesMerged", sortedFiles.size())
                             .add("ffsSizeBytes", ffsSizeBytes)
-                            .add("ffsSize", 
humanReadableByteCountBin(ffsSizeBytes))
+                            .add("ffsSize", 
IOUtils.humanReadableByteCountBin(ffsSizeBytes))
                             .build();
 
                     LOG.info("[TASK:{}:END] Metrics: {}", 
THREAD_NAME.toUpperCase(Locale.ROOT), metrics);
-                    return new Result(flatFileStore, sortedFiles.size());
+                    return new Result(flatFileStore, intermediateFilesCount, 
sortedFiles.size(), eagerMergeRuns);
+
+                } else {
+                    sortedFiles.add(new PathAndSize(sortedIntermediateFile, Files.size(sortedIntermediateFile)));
+                    intermediateFilesCount++;
+                    LOG.info("Received new intermediate sorted file {}. Size: {}. Total files: {} of size {}",
+                            sortedIntermediateFile, IOUtils.humanReadableByteCountBin(Files.size(sortedIntermediateFile)),
+                            sortedFiles.size(), IOUtils.humanReadableByteCountBin(sizeOf(sortedFiles)));
+                    // No point in doing eager merging if we already finished downloading from Mongo.
+                    // In this case, we do only the final merge.
+                    if (stopEagerMerging.get()) {
+                        LOG.debug("Skipping eager merging because download from Mongo has finished");
+                    } else {
+                        tryMergeIntermediateFilesEagerly();
+                    }
+                }
-                sortedFiles.add(sortedIntermediateFile);
-                LOG.info("Received new intermediate sorted file {}. Size: {}. Total files: {} of size {}",
-                        sortedIntermediateFile, humanReadableByteCountBin(sortedIntermediateFile.length()),
-                        sortedFiles.size(), humanReadableByteCountBin(sizeOf(sortedFiles)));
             }
         } catch (InterruptedException t) {
             LOG.warn("Thread interrupted", t);
@@ -135,19 +247,84 @@ class PipelinedMergeSortTask implements Callable<PipelinedMergeSortTask.Result>
         }
     }
 
-    private File sortStoreFile(List<File> sortedFilesBatch) throws IOException {
-        File sortedFile = new File(storeDir, getSortedStoreFileName(algorithm));
-        try (BufferedWriter writer = createWriter(sortedFile, algorithm)) {
-            Function<String, NodeStateHolder> stringToType = (line) -> line == null ? null : new NodeStateHolder(line);
-            Function<NodeStateHolder, String> typeToString = holder -> holder == null ? null : holder.getLine();
-            ExternalSort.mergeSortedFiles(sortedFilesBatch,
+    /**
+     * Stops eager merging. After this method is called, eager merging will no longer be performed, even if the
+     * conditions for merging are met.
+     * This method should be called when the download terminates, to avoid starting new eager merges which would delay
+     * the final merge.
+     */
+    public void stopEagerMerging() {
+        stopEagerMerging.set(true);
+    }
+
+    private static long sizeOf(PriorityQueue<PathAndSize> sortedFiles) {
+        return sortedFiles.stream().mapToLong(f -> f.size).sum();
+    }
+
+    private void tryMergeIntermediateFilesEagerly() throws IOException {
+        if (sortedFiles.size() < mergeTriggerThreshold) {
+            // Not enough intermediate files to merge.
+            return;
+        }
+
+        ArrayList<PathAndSize> filesAndSizeToSort = new ArrayList<>();
+        long sumOfSizesBytes = 0;
+        while (!sortedFiles.isEmpty() &&
+                filesAndSizeToSort.size() < maxFilesToMerge &&
+                sumOfSizesBytes / FileUtils.ONE_MB < maxSizeToMergeMB) {
+            // Get the next candidate. Do not remove the file from the queue because it may be too large for merging eagerly
+            PathAndSize pathAndSize = sortedFiles.peek();
+            if (pathAndSize.size / FileUtils.ONE_MB > maxSizeToMergeMB) {
+                LOG.debug("File {} is too large to be merged. Size: {}, max 
allowed: {} MB. Stopping searching for intermediate files to merge because all 
other files are larger.",
+                        pathAndSize.file.toAbsolutePath(),
+                        IOUtils.humanReadableByteCountBin(pathAndSize.size),
+                        maxSizeToMergeMB);
+                break;
+            }
+            // Remove the file from the sorted files queue and add it to the list of files to sort
+            sortedFiles.poll();
+            filesAndSizeToSort.add(pathAndSize);
+            sumOfSizesBytes += pathAndSize.size;
+        }
+        if (filesAndSizeToSort.size() < minFilesToMerge) {
+            // Not enough candidate files to merge. Put back the candidate files in the sorted files queue
+            sortedFiles.addAll(filesAndSizeToSort);
+            LOG.debug("Not enough candidate files to merge. Found {} candidates of size {}, minimum for merging is {}",
+                    filesAndSizeToSort.size(), IOUtils.humanReadableByteCountBin(sumOfSizesBytes), minFilesToMerge);
+            return;
+        }
+        LOG.info("Merge threshold reached: {} > {}. Going to merge the 
following {} files {} of total size {}.",
+                sortedFiles.size() + filesAndSizeToSort.size(), 
mergeTriggerThreshold,
+                filesAndSizeToSort.size(),
+                filesAndSizeToSort.stream()
+                        .map(fs -> fs.file.getFileName() + ": " + 
IOUtils.humanReadableByteCountBin(fs.size))
+                        .collect(Collectors.joining(", ", "[", "]")),
+                IOUtils.humanReadableByteCountBin(sumOfSizesBytes));
+        Stopwatch start = Stopwatch.createStarted();
+        Path mergedFile = sortStoreFile(filesAndSizeToSort.stream().map(f -> f.file).collect(Collectors.toList()));
+        eagerMergeRuns++;
+        Path destFile = mergedFile.getParent().resolve("merged-" + mergedFilesCounter++);
+        Files.move(mergedFile, destFile);
+        PathAndSize mergedPathAndSize = new PathAndSize(destFile, Files.size(destFile));
+        sortedFiles.add(mergedPathAndSize);
+        LOG.info("{} files merged in {} seconds. New file {}, size: {}",
+                filesAndSizeToSort.size(), start.elapsed(TimeUnit.SECONDS),
+                mergedPathAndSize.file.getFileName(), IOUtils.humanReadableByteCountBin(mergedPathAndSize.size));
+    }
+
+    private Path sortStoreFile(List<Path> sortedFilesBatch) throws IOException {
+        Path sortedFile = storeDir.resolve(getSortedStoreFileName(algorithm));
+        try (BufferedOutputStream writer = IndexStoreUtils.createOutputStream(sortedFile, algorithm)) {
+            Function<byte[], NodeStateHolder> byteArrayToType = new NodeStateHolderFactory();
+            Function<NodeStateHolder, byte[]> typeToByteArray = holder -> holder == null ? null : holder.getLine();
+            ExternalSortByteArray.mergeSortedFilesBinary(sortedFilesBatch,
                     writer,
                     comparator,
-                    FLATFILESTORE_CHARSET,
                     true, //distinct
                     algorithm,
-                    typeToString,
-                    stringToType
+                    typeToByteArray,
+                    byteArrayToType,
+                    externalMergeReadBufferSize
             );
         }
         return sortedFile;
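
The eager merge is tunable through the system properties introduced above. They are read in the PipelinedMergeSortTask constructor, so they must be set before the task is created; a sketch with the defaults from this commit:

    // Values shown are the defaults introduced by this commit.
    System.setProperty("oak.indexer.pipelined.eagerMergeTriggerThreshold", "64");       // min. intermediate files before eager merging starts
    System.setProperty("oak.indexer.pipelined.eagerMergeMinFilesToMerge", "4");         // smallest eager merge worth doing
    System.setProperty("oak.indexer.pipelined.eagerMergeMaxFilesToMerge", "32");        // cap on merge fan-in
    System.setProperty("oak.indexer.pipelined.eagerMergeMaxSizeToMergeMB", "2048");     // total size limit for one eager merge
    System.setProperty("oak.indexer.pipelined.externalMerge.readBufferSize", "16384");  // 16 KB read buffer for the external merge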
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
index 22a7552445..ab5fca9bee 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
@@ -26,9 +26,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Locale;
@@ -36,8 +37,8 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 
 import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCountBin;
-import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.createOutputStream;
 import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy.SENTINEL_NSE_BUFFER;
+import static org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreUtils.createOutputStream;
 
 /**
  * Receives batches of node state entries, sorts them in memory, and finally writes them to a file.
@@ -63,18 +64,18 @@ class PipelinedSortBatchTask implements Callable<PipelinedSortBatchTask.Result>
     private final Compression algorithm;
     private final BlockingQueue<NodeStateEntryBatch> emptyBuffersQueue;
     private final BlockingQueue<NodeStateEntryBatch> nonEmptyBuffersQueue;
-    private final BlockingQueue<File> sortedFilesQueue;
-    private final File sortWorkDir;
+    private final BlockingQueue<Path> sortedFilesQueue;
+    private final Path sortWorkDir;
     private final byte[] copyBuffer = new byte[4096];
     private long entriesProcessed = 0;
     private long batchesProcessed = 0;
 
-    public PipelinedSortBatchTask(File storeDir,
+    public PipelinedSortBatchTask(Path storeDir,
                                   PathElementComparator pathComparator,
                                   Compression algorithm,
                                   BlockingQueue<NodeStateEntryBatch> emptyBuffersQueue,
                                   BlockingQueue<NodeStateEntryBatch> nonEmptyBuffersQueue,
-                                  BlockingQueue<File> sortedFilesQueue) throws IOException {
+                                  BlockingQueue<Path> sortedFilesQueue) throws IOException {
         this.pathComparator = (e1, e2) -> pathComparator.compare(e1.getPathElements(), e2.getPathElements());
         this.algorithm = algorithm;
         this.emptyBuffersQueue = emptyBuffersQueue;
@@ -127,7 +128,7 @@ class PipelinedSortBatchTask implements Callable<PipelinedSortBatchTask.Result>
         sortBuffer.sort(pathComparator);
         LOG.info("Sorted batch in {}. Saving to disk", sortClock);
         Stopwatch saveClock = Stopwatch.createStarted();
-        File newtmpfile = File.createTempFile("sortInBatch", "flatfile", sortWorkDir);
+        Path newtmpfile = Files.createTempFile(sortWorkDir, "sortInBatch", "flatfile");
         long textSize = 0;
         batchesProcessed++;
         try (BufferedOutputStream writer = createOutputStream(newtmpfile, algorithm)) {
@@ -146,20 +147,20 @@ class PipelinedSortBatchTask implements Callable<PipelinedSortBatchTask.Result>
                     writer.write(copyBuffer, 0, bytesRead);
                     bytesRemaining -= bytesRead;
                 }
-                writer.write('\n');
+                writer.write(PipelinedStrategy.FLATFILESTORE_LINE_SEPARATOR);
                 textSize += entrySize + 1;
             }
         }
         LOG.info("Stored batch of size {} (uncompressed {}) with {} entries in 
{}",
-                humanReadableByteCountBin(newtmpfile.length()),
+                humanReadableByteCountBin(Files.size(newtmpfile)),
                 humanReadableByteCountBin(textSize),
                 sortBuffer.size(), saveClock);
         sortedFilesQueue.put(newtmpfile);
     }
 
-    private static File createdSortWorkDir(File storeDir) throws IOException {
-        File sortedFileDir = new File(storeDir, "sort-work-dir");
-        FileUtils.forceMkdir(sortedFileDir);
+    private static Path createdSortWorkDir(Path storeDir) throws IOException {
+        Path sortedFileDir = storeDir.resolve("sort-work-dir");
+        FileUtils.forceMkdir(sortedFileDir.toFile());
         return sortedFileDir;
     }
 }
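
A note on the hunk above: the migration from java.io.File to java.nio.file.Path is mostly mechanical, but File.createTempFile and Files.createTempFile disagree on argument order, with the target directory moving from the last to the first position. A minimal, self-contained sketch of the two equivalent calls (the prefix and suffix strings are the ones used above):

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class TempFileApiSketch {
        public static void main(String[] args) throws IOException {
            Path workDir = Files.createTempDirectory("sort-work-dir");

            // Legacy java.io API: the directory is the LAST argument.
            File legacy = File.createTempFile("sortInBatch", "flatfile", workDir.toFile());

            // java.nio.file API: the directory is the FIRST argument, and a Path is returned.
            Path modern = Files.createTempFile(workDir, "sortInBatch", "flatfile");

            System.out.println(legacy + "\n" + modern);
        }
    }
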
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
index 5f343601a8..b7f1a8dc25 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
@@ -24,6 +24,7 @@ import org.apache.jackrabbit.guava.common.base.Preconditions;
 import org.apache.jackrabbit.guava.common.base.Stopwatch;
 import org.apache.jackrabbit.guava.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.jackrabbit.oak.commons.Compression;
+import org.apache.jackrabbit.oak.commons.IOUtils;
 import 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.NodeStateEntryWriter;
 import 
org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreSortStrategyBase;
 import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
@@ -42,6 +43,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -127,8 +131,9 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
 
     static final NodeDocument[] SENTINEL_MONGO_DOCUMENT = new NodeDocument[0];
     static final NodeStateEntryBatch SENTINEL_NSE_BUFFER = new 
NodeStateEntryBatch(ByteBuffer.allocate(0), 0);
-    static final File SENTINEL_SORTED_FILES_QUEUE = new File("SENTINEL");
+    static final Path SENTINEL_SORTED_FILES_QUEUE = Paths.get("SENTINEL");
     static final Charset FLATFILESTORE_CHARSET = StandardCharsets.UTF_8;
+    static final char FLATFILESTORE_LINE_SEPARATOR = '\n';
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PipelinedStrategy.class);
     // A MongoDB document is at most 16MB, so the buffer that holds node state 
entries must be at least that big
@@ -142,13 +147,13 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
         private final ArrayBlockingQueue<T[]> mongoDocQueue;
         private final ArrayBlockingQueue<NodeStateEntryBatch> 
emptyBatchesQueue;
         private final ArrayBlockingQueue<NodeStateEntryBatch> 
nonEmptyBatchesQueue;
-        private final ArrayBlockingQueue<File> sortedFilesQueue;
+        private final ArrayBlockingQueue<Path> sortedFilesQueue;
         private final TransformStageStatistics transformStageStatistics;
 
         public MonitorTask(ArrayBlockingQueue<T[]> mongoDocQueue,
                            ArrayBlockingQueue<NodeStateEntryBatch> 
emptyBatchesQueue,
                            ArrayBlockingQueue<NodeStateEntryBatch> 
nonEmptyBatchesQueue,
-                           ArrayBlockingQueue<File> sortedFilesQueue,
+                           ArrayBlockingQueue<Path> sortedFilesQueue,
                            TransformStageStatistics transformStageStatistics) {
             this.mongoDocQueue = mongoDocQueue;
             this.emptyBatchesQueue = emptyBatchesQueue;
@@ -170,7 +175,7 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
     private static <T> void printStatistics(ArrayBlockingQueue<T[]> 
mongoDocQueue,
                                             
ArrayBlockingQueue<NodeStateEntryBatch> emptyBuffersQueue,
                                             
ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBuffersQueue,
-                                            ArrayBlockingQueue<File> 
sortedFilesQueue,
+                                            ArrayBlockingQueue<Path> 
sortedFilesQueue,
                                             TransformStageStatistics 
transformStageStatistics,
                                             boolean printHistogramsAtInfo) {
 
@@ -299,10 +304,10 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
                 mongoDocQueueReservedMemoryMB,
                 mongoDocBatchMaxSizeMB,
                 mongoDocQueueSize);
-        LOG.info("NodeStateEntryBuffers: [ workingMemory: {} MB, 
numberOfBuffers: {}, bufferSize: {} MB, maxEntriesPerBuffer: {} ]",
+        LOG.info("NodeStateEntryBuffers: [ workingMemory: {} MB, 
numberOfBuffers: {}, bufferSize: {}, maxEntriesPerBuffer: {} ]",
                 nseBuffersReservedMemoryMB,
                 nseBuffersCount,
-                nseBuffersSizeBytes / FileUtils.ONE_MB,
+                IOUtils.humanReadableByteCountBin(nseBuffersSizeBytes),
                 nseBufferMaxEntriesPerBuffer);
     }
 
@@ -368,7 +373,7 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
             ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBatchesQueue = new 
ArrayBlockingQueue<>(nseBuffersCount + 1);
 
             // Queue between sort-and-save thread and the merge-sorted-files 
thread
-            ArrayBlockingQueue<File> sortedFilesQueue = new 
ArrayBlockingQueue<>(64);
+            ArrayBlockingQueue<Path> sortedFilesQueue = new 
ArrayBlockingQueue<>(64);
 
             TransformStageStatistics transformStageStatistics = new 
TransformStageStatistics();
 
@@ -393,7 +398,7 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
             );
             ecs.submit(downloadTask);
 
-            File flatFileStore = null;
+            Path flatFileStore = null;
 
             for (int i = 0; i < numberOfTransformThreads; i++) {
                 NodeStateEntryWriter entryWriter = new 
NodeStateEntryWriter(blobStore);
@@ -412,11 +417,11 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
             }
 
             PipelinedSortBatchTask sortTask = new PipelinedSortBatchTask(
-                    this.getStoreDir(), pathComparator, this.getAlgorithm(), 
emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue
+                    this.getStoreDir().toPath(), pathComparator, 
this.getAlgorithm(), emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue
             );
             ecs.submit(sortTask);
 
-            PipelinedMergeSortTask mergeSortTask = new 
PipelinedMergeSortTask(this.getStoreDir(), pathComparator,
+            PipelinedMergeSortTask mergeSortTask = new 
PipelinedMergeSortTask(this.getStoreDir().toPath(), pathComparator,
                     this.getAlgorithm(), sortedFilesQueue);
             ecs.submit(mergeSortTask);
 
@@ -436,6 +441,7 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
                             for (int i = 0; i < numberOfTransformThreads; i++) 
{
                                 mongoDocQueue.put(SENTINEL_MONGO_DOCUMENT);
                             }
+                            mergeSortTask.stopEagerMerging();
 
                         } else if (result instanceof 
PipelinedTransformTask.Result) {
                             PipelinedTransformTask.Result transformResult = 
(PipelinedTransformTask.Result) result;
@@ -459,8 +465,8 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
 
                         } else if (result instanceof 
PipelinedMergeSortTask.Result) {
                             PipelinedMergeSortTask.Result mergeSortedFilesTask 
= (PipelinedMergeSortTask.Result) result;
-                            File ffs = 
mergeSortedFilesTask.getFlatFileStoreFile();
-                            LOG.info("Merge-sort sort task finished. FFS: {}, 
Size: {}", ffs, humanReadableByteCountBin(ffs.length()));
+                            Path ffs = 
mergeSortedFilesTask.getFlatFileStoreFile();
+                            LOG.info("Merge-sort sort task finished. FFS: {}, 
Size: {}", ffs, humanReadableByteCountBin(Files.size(ffs)));
                             flatFileStore = 
mergeSortedFilesTask.getFlatFileStoreFile();
 
                         } else {
@@ -497,7 +503,7 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
                 // No longer need to monitor the size of the queues,
                 monitorFuture.cancel(true);
             }
-            return flatFileStore;
+            return flatFileStore.toFile();
         } finally {
             threadPool.shutdown();
             monitorThreadPool.shutdown();
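
One detail of the wiring above deserves a remark: end-of-input on the sorted-files queue is signalled with a sentinel Path (SENTINEL_SORTED_FILES_QUEUE), since BlockingQueue has no notion of being closed. The sketch below illustrates the poison-pill pattern; the assumption that the consumer recognizes the sentinel by reference equality is mine, but it is the usual way to use a static singleton marker that never names a real file:

    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class SentinelQueueSketch {
        // Marker value only; this Path never points to a file on disk.
        static final Path SENTINEL = Paths.get("SENTINEL");

        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<Path> sortedFiles = new ArrayBlockingQueue<>(4);
            sortedFiles.put(Paths.get("sorted-0.flatfile"));
            sortedFiles.put(Paths.get("sorted-1.flatfile"));
            sortedFiles.put(SENTINEL); // producer: no more files are coming

            while (true) {
                Path next = sortedFiles.take();
                if (next == SENTINEL) { // reference comparison is deliberate
                    break;
                }
                System.out.println("would merge: " + next);
            }
        }
    }
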
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/indexstore/IndexStoreUtils.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/indexstore/IndexStoreUtils.java
index 8dea97d36e..b901621887 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/indexstore/IndexStoreUtils.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/indexstore/IndexStoreUtils.java
@@ -33,6 +33,8 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.List;
 
 import static org.apache.jackrabbit.guava.common.base.Preconditions.checkState;
@@ -91,6 +93,11 @@ public class IndexStoreUtils {
         return new BufferedOutputStream(algorithm.getOutputStream(out));
     }
 
+    public static BufferedOutputStream createOutputStream(Path file, 
Compression algorithm) throws IOException {
+        OutputStream out = Files.newOutputStream(file);
+        return new BufferedOutputStream(algorithm.getOutputStream(out));
+    }
+
     public static long sizeOf(List<File> sortedFiles) {
         return sortedFiles.stream().mapToLong(File::length).sum();
     }
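
A usage sketch for the new Path-based overload, assuming oak-run-commons is on the classpath. Compression.NONE is the pass-through codec used by several tests in this commit; a real compression algorithm could be substituted without changing the caller:

    import org.apache.jackrabbit.oak.commons.Compression;
    import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreUtils;

    import java.io.BufferedOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class CreateOutputStreamSketch {
        public static void main(String[] args) throws IOException {
            Path out = Files.createTempFile("sorted", ".json");
            // The overload opens the Path and wraps the stream with the chosen codec.
            try (BufferedOutputStream bos = IndexStoreUtils.createOutputStream(out, Compression.NONE)) {
                bos.write("/a/b|{\"key\":1}\n".getBytes(StandardCharsets.UTF_8));
            }
            System.out.println("wrote " + Files.size(out) + " bytes to " + out);
        }
    }
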
diff --git 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedIT.java
 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedIT.java
index 84b86a6089..e1356a6d7d 100644
--- 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedIT.java
+++ 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedIT.java
@@ -41,6 +41,7 @@ import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.jetbrains.annotations.NotNull;
 import org.junit.After;
 import org.junit.Assume;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Rule;
@@ -60,6 +61,7 @@ import java.util.function.Predicate;
 
 import static 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedMongoDownloadTask.OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING;
 import static 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedMongoDownloadTask.OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
@@ -89,7 +91,7 @@ public class PipelinedIT {
         }
     }
 
-    @After
+    @After @Before
     public void tear() {
         MongoConnection c = connectionFactory.getConnection();
         if (c != null) {
@@ -248,8 +250,8 @@ public class PipelinedIT {
         
System.setProperty(PipelinedStrategy.OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_SIZE_MB,
 "1");
         
System.setProperty(PipelinedStrategy.OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_RESERVED_MEMORY_MB,
 "32");
 
-        Predicate<String> pathPredicate = s -> true;
-        List<PathFilter> pathFilters = List.of(contentDamPathFilter);
+        Predicate<String> pathPredicate = s -> contentDamPathFilter.filter(s) 
!= PathFilter.Result.EXCLUDE;
+        List<PathFilter> pathFilters = null;
 
         Backend rwStore = createNodeStore(false);
         @NotNull NodeBuilder rootBuilder = 
rwStore.documentNodeStore.getRoot().builder();
@@ -278,7 +280,7 @@ public class PipelinedIT {
 
         File file = pipelinedStrategy.createSortedStoreFile();
         assertTrue(file.exists());
-        assertEquals(expected, Files.readAllLines(file.toPath()));
+        assertArrayEquals(expected.toArray(new String[0]), 
Files.readAllLines(file.toPath()).toArray(new String[0]));
     }
 
 
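
The test change above replaces an accept-everything predicate with one derived from the path filter. A sketch of that adaptation (the include/exclude constructor is standard PathFilter API; the concrete paths are illustrative): treating TRAVERSE as accepted keeps the ancestors of the included subtree, so the resulting flat file store remains a connected tree.

    import org.apache.jackrabbit.oak.spi.filter.PathFilter;

    import java.util.List;
    import java.util.function.Predicate;

    public class PathFilterPredicateSketch {
        public static void main(String[] args) {
            // Include everything under /content/dam, exclude nothing.
            PathFilter contentDamPathFilter = new PathFilter(List.of("/content/dam"), List.of());

            // Accept a path unless the filter explicitly excludes it; TRAVERSE
            // results (ancestors of the included subtree) are accepted as well.
            Predicate<String> pathPredicate =
                    s -> contentDamPathFilter.filter(s) != PathFilter.Result.EXCLUDE;

            System.out.println(pathPredicate.test("/content/dam/asset.png")); // true  (INCLUDE)
            System.out.println(pathPredicate.test("/content"));               // true  (TRAVERSE)
            System.out.println(pathPredicate.test("/var/foo"));               // false (EXCLUDE)
        }
    }
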
diff --git 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTaskParameterizedTest.java
 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTaskParameterizedTest.java
new file mode 100644
index 0000000000..18b8fef779
--- /dev/null
+++ 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTaskParameterizedTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
+
+import org.apache.jackrabbit.oak.commons.Compression;
+import 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.LZ4Compression;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy.FLATFILESTORE_CHARSET;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class PipelinedMergeSortTaskParameterizedTest extends 
PipelinedMergeSortTaskTestBase {
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> data() {
+        // numberOfIntermediateFiles, eagerMergeTriggerThreshold
+        return Arrays.asList(new Object[][]{
+                {256, 50},
+                {256, 300}
+        });
+    }
+
+    private final int numberOfIntermediateFiles;
+    private final int eagerMergeTriggerThreshold;
+
+    public PipelinedMergeSortTaskParameterizedTest(int 
numberOfIntermediateFiles, int eagerMergeTriggerThreshold) {
+        this.numberOfIntermediateFiles = numberOfIntermediateFiles;
+        this.eagerMergeTriggerThreshold = eagerMergeTriggerThreshold;
+    }
+
+    @Test
+    public void manyFilesToMerge() throws Exception {
+        log.info("Running with intermediateFiles: {}, 
eagerMergeTriggerThreshold: {}", numberOfIntermediateFiles, 
eagerMergeTriggerThreshold);
+        
System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_TRIGGER_THRESHOLD,
 Integer.toString(eagerMergeTriggerThreshold));
+        
System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_FILES_TO_MERGE,
 "32");
+        
System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_SIZE_TO_MERGE_MB,
 "512");
+        
System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_MIN_FILES_TO_MERGE,
 "8");
+
+        // Generate FFS
+        List<String> ffs = generateFFS(LINES_IN_FFS);
+        // Shuffle entries to simulate retrieving from MongoDB in an arbitrary order
+        Collections.shuffle(ffs);
+
+        // Generate the expected results by sorting with the node state entry comparator
+        List<NodeStateHolder> nodesOrdered = sortAsNodeStateEntries(ffs);
+        // Convert back to a list of Strings
+        String[] expectedFFS = nodesOrdered.stream().map(f -> new String(f.getLine(), FLATFILESTORE_CHARSET)).toArray(String[]::new);
+
+        // Write intermediate files
+        List<Path> intermediateFiles = createIntermediateFiles(ffs, 
this.numberOfIntermediateFiles);
+
+        // Run test
+        PipelinedMergeSortTask.Result result = 
runTestLargeFiles(Compression.NONE, intermediateFiles.toArray(new Path[0]));
+        Path resultFile = result.getFlatFileStoreFile();
+
+        assertEquals(this.numberOfIntermediateFiles, 
result.getIntermediateFilesCount());
+        if (this.numberOfIntermediateFiles > eagerMergeTriggerThreshold) {
+            assertTrue(result.getEagerMergeRuns() > 0);
+            assertTrue(result.getFinalMergeFilesCount() < 
eagerMergeTriggerThreshold);
+        } else {
+            assertEquals(0, result.getEagerMergeRuns());
+            assertEquals(this.numberOfIntermediateFiles, 
result.getFinalMergeFilesCount());
+        }
+        // Verify result
+        List<String> actualFFS = Files.readAllLines(resultFile);
+        assertArrayEquals(expectedFFS, actualFFS.toArray(new String[0]));
+    }
+
+    /**
+     * For manual testing.
+     */
+    @Ignore
+    public void manyFilesToMergeManual() throws Exception {
+        
System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_TRIGGER_THRESHOLD,
 "50");
+        
System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_FILES_TO_MERGE,
 "32");
+        
System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_SIZE_TO_MERGE_MB,
 "512");
+        
System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_MIN_FILES_TO_MERGE,
 "8");
+        Path dirWithFilesToMerge = 
Paths.get("/path/to/ffs/intermediate/files");
+        Path[] files;
+        try (Stream<Path> stream = Files.list(dirWithFilesToMerge)) {
+            files = stream.filter(Files::isRegularFile).toArray(Path[]::new);
+        }
+
+        PipelinedMergeSortTask.Result result = runTestLargeFiles(new 
LZ4Compression(), files);
+        Path resultFile = result.getFlatFileStoreFile();
+        log.info("Result: {}\n{}", resultFile, Files.readString(resultFile, 
FLATFILESTORE_CHARSET));
+    }
+}
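
The four system properties exercised by this test control when the merge task merges intermediate files eagerly, while the download is still running. The actual selection logic lives in PipelinedMergeSortTask and is not shown in this hunk; the sketch below is only a hypothetical illustration of how such a policy can combine the knobs: do nothing until the trigger threshold is exceeded, then merge at least the minimum and at most the maximum number of files, capped by total size. It is consistent with the assertions above, which expect eager merging to keep the final merge below the trigger threshold.

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class EagerMergePolicySketch {
        // Names mirror the system properties used in the test; the values are examples.
        static final int TRIGGER_THRESHOLD = 50;
        static final int MIN_FILES_TO_MERGE = 4;    // the commit lowers this default from 8 to 4
        static final int MAX_FILES_TO_MERGE = 32;
        static final long MAX_SIZE_TO_MERGE_BYTES = 512L * 1024 * 1024;

        static class SortedFile {
            final String name;
            final long sizeBytes;
            SortedFile(String name, long sizeBytes) { this.name = name; this.sizeBytes = sizeBytes; }
        }

        // Hypothetical: pick the smallest files, bounded by count and total size.
        static List<SortedFile> selectCandidates(List<SortedFile> available) {
            if (available.size() <= TRIGGER_THRESHOLD) {
                return List.of(); // below the threshold, leave everything for the final merge
            }
            List<SortedFile> bySize = new ArrayList<>(available);
            bySize.sort(Comparator.comparingLong(f -> f.sizeBytes));
            List<SortedFile> candidates = new ArrayList<>();
            long total = 0;
            for (SortedFile f : bySize) {
                if (candidates.size() == MAX_FILES_TO_MERGE
                        || total + f.sizeBytes > MAX_SIZE_TO_MERGE_BYTES) {
                    break;
                }
                candidates.add(f);
                total += f.sizeBytes;
            }
            // Merging very few files is not worth the extra pass over the data.
            return candidates.size() >= MIN_FILES_TO_MERGE ? candidates : List.of();
        }
    }
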
diff --git 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTaskTest.java
 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTaskTest.java
index 5275474457..226ae89a98 100644
--- 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTaskTest.java
+++ 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTaskTest.java
@@ -19,103 +19,143 @@
 package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
 
 import org.apache.jackrabbit.oak.commons.Compression;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.BufferedWriter;
-import java.io.File;
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.Set;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy.FLATFILESTORE_CHARSET;
 import static 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy.SENTINEL_SORTED_FILES_QUEUE;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-public class PipelinedMergeSortTaskTest {
-    private static final Logger LOG = 
LoggerFactory.getLogger(PipelinedMergeSortTaskTest.class);
+public class PipelinedMergeSortTaskTest extends PipelinedMergeSortTaskTestBase 
{
     private final ClassLoader classLoader = getClass().getClassLoader();
-    private final PathElementComparator pathComparator = new 
PathElementComparator(Set.of());
     private final Compression algorithm = Compression.NONE;
-    @Rule
-    public TemporaryFolder sortFolder = new TemporaryFolder();
 
     @Test
     public void noFileToMerge() throws Exception {
-        PipelinedMergeSortTask.Result result = runTest();
-        Path resultFile = result.getFlatFileStoreFile().toPath();
+        PipelinedMergeSortTask.Result result = runTest(algorithm);
+        Path resultFile = result.getFlatFileStoreFile();
         assertEquals(0, Files.size(resultFile));
     }
 
     @Test
     public void oneFileToMerge() throws Exception {
-        File singleFileToMerge = getTestFile("pipelined/merge-stage-1.json");
-        PipelinedMergeSortTask.Result result = runTest(singleFileToMerge);
-        Path resultFile = result.getFlatFileStoreFile().toPath();
-        assertEquals(Files.readAllLines(singleFileToMerge.toPath(), 
FLATFILESTORE_CHARSET), Files.readAllLines(resultFile, FLATFILESTORE_CHARSET));
+        Path singleFileToMerge = getTestFile("pipelined/merge-stage-1.json");
+        PipelinedMergeSortTask.Result result = runTest(algorithm, 
singleFileToMerge);
+        Path resultFile = result.getFlatFileStoreFile();
+        assertEquals(Files.readAllLines(singleFileToMerge, 
FLATFILESTORE_CHARSET), Files.readAllLines(resultFile, FLATFILESTORE_CHARSET));
     }
 
     @Test
     public void twoFilesToMerge() throws Exception {
-        File merge1 = getTestFile("pipelined/merge-stage-1.json");
-        File merge2 = getTestFile("pipelined/merge-stage-2.json");
-        File expected = getTestFile("pipelined/merge-expected.json");
-
-        PipelinedMergeSortTask.Result result = runTest(merge1, merge2);
-        Path resultFile = result.getFlatFileStoreFile().toPath();
-        LOG.info("Result: {}\n{}", resultFile, Files.readString(resultFile, 
FLATFILESTORE_CHARSET));
-        assertEquals(Files.readAllLines(expected.toPath(), 
FLATFILESTORE_CHARSET), Files.readAllLines(resultFile, FLATFILESTORE_CHARSET));
+        Path merge1 = getTestFile("pipelined/merge-stage-1.json");
+        Path merge2 = getTestFile("pipelined/merge-stage-2.json");
+        Path expected = getTestFile("pipelined/merge-expected.json");
+
+        PipelinedMergeSortTask.Result result = runTest(algorithm, merge1, 
merge2);
+        Path resultFile = result.getFlatFileStoreFile();
+        log.info("Result: {}\n{}", resultFile, Files.readString(resultFile, 
FLATFILESTORE_CHARSET));
+        assertEquals(Files.readAllLines(expected, FLATFILESTORE_CHARSET), 
Files.readAllLines(resultFile, FLATFILESTORE_CHARSET));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void invalidReadBufferSize() throws Exception {
+        
System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EXTERNAL_MERGE_READ_BUFFER_SIZE,
 "10");
+        Path singleFileToMerge = getTestFile("pipelined/merge-stage-1.json");
+        runTest(algorithm, singleFileToMerge);
     }
 
-    private File getTestFile(String name) {
+    private Path getTestFile(String name) {
         URL url = classLoader.getResource(name);
         if (url == null) throw new IllegalArgumentException("Test file not 
found: " + name);
-        return new File(url.getPath());
+        return Paths.get(url.getPath());
     }
 
-    private PipelinedMergeSortTask.Result runTest(File... files) throws 
Exception {
-        File sortRoot = sortFolder.getRoot();
+    private PipelinedMergeSortTask.Result runTest(Compression algorithm, 
Path... files) throws Exception {
+        Path sortRoot = sortFolder.getRoot().toPath();
         // +1 for the Sentinel.
-        ArrayBlockingQueue<File> sortedFilesQueue = new 
ArrayBlockingQueue<>(files.length + 1);
+        ArrayBlockingQueue<Path> sortedFilesQueue = new 
ArrayBlockingQueue<>(files.length + 1);
         PipelinedMergeSortTask mergeSortTask = new 
PipelinedMergeSortTask(sortRoot, pathComparator, algorithm, sortedFilesQueue);
         // Enqueue all the files that are to be merged
-        for (File file : files) {
+        for (Path file : files) {
             // The intermediate files are deleted after being merged, so we 
should copy them to the temporary sort root folder
-            Path workDirCopy = Files.copy(file.toPath(), 
sortRoot.toPath().resolve(file.getName()));
-            sortedFilesQueue.put(workDirCopy.toFile());
+            Path workDirCopy = Files.copy(file, 
sortRoot.resolve(file.getFileName()));
+            sortedFilesQueue.put(workDirCopy);
         }
         // Signal end of files to merge
         sortedFilesQueue.put(SENTINEL_SORTED_FILES_QUEUE);
         // Run the merge task
         PipelinedMergeSortTask.Result result = mergeSortTask.call();
-        File[] filesInWorkDir = sortRoot.listFiles();
-        if (filesInWorkDir == null)
-            throw new IllegalStateException("The sort work directory is not a 
directory: " + sortRoot);
-        assertEquals("The sort work directory should contain only the flat 
file store, the intermediate files should have been deleted after merged. 
Instead it contains: " + Arrays.toString(filesInWorkDir),
-                1, filesInWorkDir.length);
-        assertTrue(result.getFlatFileStoreFile().exists());
+
+        try (Stream<Path> fileStream = Files.list(sortRoot)) {
+            List<String> filesInWorkDir = fileStream
+                    .map(path -> path.getFileName().toString())
+                    .collect(Collectors.toList());
+            assertEquals("The sort work directory should contain only the flat 
file store, the intermediate files should have been deleted after merged. 
Instead it contains: " + filesInWorkDir,
+                    1, filesInWorkDir.size());
+        }
+        assertTrue(Files.exists(result.getFlatFileStoreFile()));
         return result;
     }
 
     @Test(expected = IllegalStateException.class)
     public void badInputFile() throws Exception {
-        File singleFileToMerge = createFileWithWrongFormat();
-        runTest(singleFileToMerge);
+        Path singleFileToMerge = createFileWithWrongFormat();
+        runTest(algorithm, singleFileToMerge);
     }
 
-    private File createFileWithWrongFormat() throws Exception {
-        File file = Files.createTempFile("merge-stage-input", 
".json").toFile();
-        try (BufferedWriter bw = Files.newBufferedWriter(file.toPath(), 
FLATFILESTORE_CHARSET)) {
+    private Path createFileWithWrongFormat() throws Exception {
+        Path file = Files.createTempFile(sortFolder.getRoot().toPath(), 
"merge-stage-input", ".json");
+        try (BufferedWriter bw = Files.newBufferedWriter(file, 
FLATFILESTORE_CHARSET)) {
             bw.write("/a/b/c\n");
         }
-        file.deleteOnExit();
         return file;
     }
+
+    @Test
+    public void manyFilesToMergeDidNotMerge() throws Exception {
+        int intermediateFilesCount = 256;
+        
System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_TRIGGER_THRESHOLD,
 "20");
+        
System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_FILES_TO_MERGE,
 "1000");
+        
System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_SIZE_TO_MERGE_MB,
 "1");
+        
System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_MIN_FILES_TO_MERGE,
 "1000");
+
+        // Generate FFS
+        List<String> ffs = generateFFS(LINES_IN_FFS);
+        // Shuffle entries to simulate retrieving from MongoDB in an arbitrary order
+        Collections.shuffle(ffs);
+
+        // Generate the expected results by sorting with the node state entry comparator
+        List<NodeStateHolder> nodesOrdered = sortAsNodeStateEntries(ffs);
+        // Convert back to a list of Strings to use as expected result
+        String[] expectedFFS = nodesOrdered.stream()
+                .map(f -> new String(f.getLine(), FLATFILESTORE_CHARSET))
+                .toArray(String[]::new);
+
+        // Write intermediate files
+        List<Path> intermediateFiles = createIntermediateFiles(ffs, 
intermediateFilesCount);
+
+        // Run test
+        PipelinedMergeSortTask.Result result = 
runTestLargeFiles(Compression.NONE, intermediateFiles.toArray(new Path[0]));
+        Path resultFile = result.getFlatFileStoreFile();
+
+        assertEquals(intermediateFilesCount, 
result.getIntermediateFilesCount());
+        assertEquals(0, result.getEagerMergeRuns());
+        assertEquals(intermediateFilesCount, result.getFinalMergeFilesCount());
+
+        // Verify result
+        List<String> actualFFS = Files.readAllLines(resultFile);
+        assertArrayEquals(expectedFFS, actualFFS.toArray(new String[0]));
+    }
 }
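
The invalidReadBufferSize test above expects an IllegalArgumentException when the external-merge read buffer is set to an absurdly small value (10 bytes). A sketch of that kind of fail-fast property validation follows; the property name and the 16 KB default come from this commit, while the concrete minimum enforced here (1 KB) is an illustrative assumption, not necessarily the bound used by PipelinedMergeSortTask:

    public class ReadBufferSizeValidation {
        static final String PROP = "oak.indexer.pipelined.externalMerge.readBufferSize";
        static final int DEFAULT_READ_BUFFER_SIZE = 16 * 1024; // default from this commit
        static final int MIN_READ_BUFFER_SIZE = 1024;          // illustrative lower bound

        static int readBufferSize() {
            int size = Integer.getInteger(PROP, DEFAULT_READ_BUFFER_SIZE);
            if (size < MIN_READ_BUFFER_SIZE) {
                throw new IllegalArgumentException(
                        "Invalid value for " + PROP + ": " + size + " (minimum: " + MIN_READ_BUFFER_SIZE + ")");
            }
            return size;
        }

        public static void main(String[] args) {
            System.setProperty(PROP, "10");
            try {
                readBufferSize(); // throws, mirroring the test's expectation
            } catch (IllegalArgumentException e) {
                System.out.println(e.getMessage());
            }
        }
    }
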
diff --git 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTaskTestBase.java
 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTaskTestBase.java
new file mode 100644
index 0000000000..b135ccf195
--- /dev/null
+++ 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTaskTestBase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
+
+import org.apache.jackrabbit.oak.commons.Compression;
+import org.junit.Rule;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy.FLATFILESTORE_CHARSET;
+import static 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy.SENTINEL_SORTED_FILES_QUEUE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class PipelinedMergeSortTaskTestBase {
+    static final int LINES_IN_FFS = 100000;
+    static final PathElementComparator pathComparator = new 
PathElementComparator(Set.of());
+
+    protected final Logger log = LoggerFactory.getLogger(this.getClass());
+    @Rule
+    public TemporaryFolder sortFolder = new TemporaryFolder();
+    @Rule
+    public final RestoreSystemProperties restoreSystemProperties = new 
RestoreSystemProperties();
+
+
+    protected List<NodeStateHolder> sortAsNodeStateEntries(List<String> 
ffsLines) {
+        Comparator<NodeStateHolder> comparatorBinary = (e1, e2) -> 
pathComparator.compare(e1.getPathElements(), e2.getPathElements());
+        NodeStateHolderFactory nodeFactory = new NodeStateHolderFactory();
+        List<NodeStateHolder> nodesOrdered = ffsLines.stream()
+                .map(ffsLine -> 
nodeFactory.apply(ffsLine.getBytes(FLATFILESTORE_CHARSET)))
+                .sorted(comparatorBinary)
+                .collect(Collectors.toList());
+        return nodesOrdered;
+    }
+
+
+    protected List<String> generateFFS(int numberOfLines) {
+        List<String> ffsLines = new ArrayList<>(numberOfLines);
+        for (int i = 0; i < numberOfLines; i++) {
+            String path = 
"/a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z/" + i;
+            String entry = "{\"_id\":\"" + path + 
"\",\"property\":[{\"name\":\"jcr:primaryType\",\"values\":[\"nt:unstructured\"]}]}";
+            ffsLines.add(path + "|" + entry);
+        }
+        return ffsLines;
+    }
+
+    protected List<Path> createIntermediateFiles(List<String> ffsLines, int 
numberOfFiles) throws Exception {
+        Iterator<String> ffsIter = ffsLines.iterator();
+        Path workFolder = sortFolder.newFolder("merge_many_test").toPath();
+        ArrayList<Path> intermediateFiles = new ArrayList<>(numberOfFiles);
+        int linesPerFile = ffsLines.size() / numberOfFiles;
+
+        for (int fileIdx = 0; fileIdx < numberOfFiles; fileIdx++) {
+            Path intermediateFile = workFolder.resolve("intermediate-" + 
fileIdx + ".json");
+            ArrayList<String> linesInIntermediateFile = new ArrayList<>();
+            while (linesInIntermediateFile.size() < linesPerFile && 
ffsIter.hasNext()) {
+                linesInIntermediateFile.add(ffsIter.next());
+            }
+            if (fileIdx == numberOfFiles - 1) {
+                // Add the remaining elements to the last file
+                while (ffsIter.hasNext()) {
+                    linesInIntermediateFile.add(ffsIter.next());
+                }
+            }
+            List<NodeStateHolder> nodesSorted = 
sortAsNodeStateEntries(linesInIntermediateFile);
+            try (BufferedWriter bw = Files.newBufferedWriter(intermediateFile, 
FLATFILESTORE_CHARSET)) {
+                for (NodeStateHolder node : nodesSorted) {
+                    bw.write(new String(node.getLine(), FLATFILESTORE_CHARSET));
+                    bw.write("\n");
+                }
+            }
+            intermediateFiles.add(intermediateFile);
+        }
+        return intermediateFiles;
+    }
+
+    protected PipelinedMergeSortTask.Result runTestLargeFiles(Compression 
algorithm, Path... files) throws Exception {
+        Path sortRoot = sortFolder.getRoot().toPath();
+        // +1 for the Sentinel.
+        ArrayBlockingQueue<Path> sortedFilesQueue = new 
ArrayBlockingQueue<>(files.length + 1);
+        PipelinedMergeSortTask mergeSortTask = new 
PipelinedMergeSortTask(sortRoot, pathComparator, algorithm, sortedFilesQueue);
+        // Enqueue all the files that are to be merged
+        for (Path file : files) {
+            sortedFilesQueue.put(file);
+        }
+        // Signal end of files to merge
+        sortedFilesQueue.put(SENTINEL_SORTED_FILES_QUEUE);
+        // Run the merge task
+        PipelinedMergeSortTask.Result result = mergeSortTask.call();
+        List<Path> filesInWorkDir;
+        try (Stream<Path> stream = Files.list(sortRoot)) {
+            filesInWorkDir = 
stream.filter(Files::isRegularFile).collect(Collectors.toList());
+        }
+        assertEquals("The sort work directory should contain only the flat 
file store, the intermediate files should have been deleted after merged. 
Instead it contains: " + filesInWorkDir,
+                1, filesInWorkDir.size());
+        assertTrue(Files.exists(result.getFlatFileStoreFile()));
+        return result;
+    }
+}
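
The test base above hands the merge task lines as byte arrays produced by NodeStateHolderFactory, which is what allows the optimized external merge added by this commit to skip the binary-to-UTF-8-and-back round trip. At the core of any such merge is a k-way merge of sorted runs; the following is a self-contained, in-memory sketch of that core (the real code streams from files through sized read buffers, and orders lines by path elements rather than raw bytes):

    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class BinaryKWayMergeSketch {
        // One sorted run: its current head line plus an iterator over the remainder.
        static final class Run {
            byte[] head;
            final Iterator<byte[]> rest;
            Run(Iterator<byte[]> rest) { this.rest = rest; this.head = rest.next(); }
        }

        static List<byte[]> merge(List<List<byte[]>> sortedRuns, Comparator<byte[]> cmp) {
            PriorityQueue<Run> heap = new PriorityQueue<>((a, b) -> cmp.compare(a.head, b.head));
            for (List<byte[]> run : sortedRuns) {
                if (!run.isEmpty()) {
                    heap.add(new Run(run.iterator()));
                }
            }
            List<byte[]> merged = new ArrayList<>();
            while (!heap.isEmpty()) {
                Run smallest = heap.poll();
                merged.add(smallest.head);       // lines stay raw bytes end to end
                if (smallest.rest.hasNext()) {
                    smallest.head = smallest.rest.next();
                    heap.add(smallest);          // re-insert with the run's next line
                }
            }
            return merged;
        }

        public static void main(String[] args) {
            Charset utf8 = StandardCharsets.UTF_8;
            // Plain lexicographic byte order for the demo; the production comparator
            // orders by path elements instead.
            List<byte[]> merged = merge(
                    List.of(List.of("/a\n".getBytes(utf8), "/c\n".getBytes(utf8)),
                            List.of("/b\n".getBytes(utf8))),
                    Arrays::compare);
            merged.forEach(line -> System.out.print(new String(line, utf8)));
        }
    }
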
diff --git 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
index 7665929e28..892b7a3c10 100644
--- 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
+++ 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
@@ -28,11 +28,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -42,9 +42,9 @@ import static org.junit.Assert.assertEquals;
 public class PipelinedSortBatchTaskTest {
     static class TestResult {
         final PipelinedSortBatchTask.Result result;
-        final BlockingQueue<File> sortedFilesQueue;
+        final BlockingQueue<Path> sortedFilesQueue;
 
-        public TestResult(PipelinedSortBatchTask.Result result, 
BlockingQueue<File> sortedFilesQueue) {
+        public TestResult(PipelinedSortBatchTask.Result result, 
BlockingQueue<Path> sortedFilesQueue) {
             this.result = result;
             this.sortedFilesQueue = sortedFilesQueue;
         }
@@ -53,10 +53,11 @@ public class PipelinedSortBatchTaskTest {
             return result;
         }
 
-        public BlockingQueue<File> getSortedFilesQueue() {
+        public BlockingQueue<Path> getSortedFilesQueue() {
             return sortedFilesQueue;
         }
     }
+
     private static final Logger LOG = LoggerFactory.getLogger(PipelinedSortBatchTaskTest.class);
 
     @Rule
@@ -72,7 +73,7 @@ public class PipelinedSortBatchTaskTest {
         TestResult testResult = runTest();
 
         PipelinedSortBatchTask.Result result = testResult.getResult();
-        BlockingQueue<File> sortedFilesQueue = 
testResult.getSortedFilesQueue();
+        BlockingQueue<Path> sortedFilesQueue = 
testResult.getSortedFilesQueue();
         assertEquals(0, result.getTotalEntries());
         assertEquals(0, sortedFilesQueue.size());
     }
@@ -84,7 +85,7 @@ public class PipelinedSortBatchTaskTest {
         TestResult testResult = runTest(batch);
 
         PipelinedSortBatchTask.Result result = testResult.getResult();
-        BlockingQueue<File> sortedFilesQueue = 
testResult.getSortedFilesQueue();
+        BlockingQueue<Path> sortedFilesQueue = 
testResult.getSortedFilesQueue();
         assertEquals(0, result.getTotalEntries());
         assertEquals(0, sortedFilesQueue.size());
     }
@@ -102,19 +103,19 @@ public class PipelinedSortBatchTaskTest {
         TestResult testResult = runTest(batch);
 
         PipelinedSortBatchTask.Result result = testResult.getResult();
-        BlockingQueue<File> sortedFilesQueue = 
testResult.getSortedFilesQueue();
+        BlockingQueue<Path> sortedFilesQueue = 
testResult.getSortedFilesQueue();
         assertEquals(6, result.getTotalEntries());
         assertEquals(1, sortedFilesQueue.size());
 
-        File sortedFile = sortedFilesQueue.take();
-        LOG.info("Sorted file:\n{}", Files.readString(sortedFile.toPath()));
+        Path sortedFile = sortedFilesQueue.take();
+        LOG.info("Sorted file:\n{}", Files.readString(sortedFile));
         assertEquals("/a0|{\"key\":1}\n" +
                         "/a0/b0|{\"key\":2}\n" +
                         "/a0/b0/c0|{\"key\":3}\n" +
                         "/a0/b0/c1|{\"key\":4}\n" +
                         "/a0/b1|{\"key\":5}\n" +
                         "/a1/b0|{\"key\":6}\n",
-                Files.readString(sortedFile.toPath())
+                Files.readString(sortedFile)
         );
     }
 
@@ -133,23 +134,23 @@ public class PipelinedSortBatchTaskTest {
         TestResult testResult = runTest(batch1, batch2);
 
         PipelinedSortBatchTask.Result result = testResult.getResult();
-        BlockingQueue<File> sortedFilesQueue = 
testResult.getSortedFilesQueue();
+        BlockingQueue<Path> sortedFilesQueue = 
testResult.getSortedFilesQueue();
         assertEquals(6, result.getTotalEntries());
         assertEquals(2, sortedFilesQueue.size());
 
-        File sortedFile1 = sortedFilesQueue.take();
-        File sortedFile2 = sortedFilesQueue.take();
+        Path sortedFile1 = sortedFilesQueue.take();
+        Path sortedFile2 = sortedFilesQueue.take();
 
-        LOG.info("Sorted file:\n{}", Files.readString(sortedFile1.toPath()));
-        LOG.info("Sorted file:\n{}", Files.readString(sortedFile2.toPath()));
+        LOG.info("Sorted file:\n{}", Files.readString(sortedFile1));
+        LOG.info("Sorted file:\n{}", Files.readString(sortedFile2));
         assertEquals("/a0|{\"key\":1}\n" +
                         "/a0/b0|{\"key\":2}\n" +
                         "/a1/b0|{\"key\":6}\n",
-                Files.readString(sortedFile1.toPath()));
+                Files.readString(sortedFile1));
         assertEquals("/a0/b0/c0|{\"key\":3}\n" +
                         "/a0/b0/c1|{\"key\":4}\n" +
                         "/a0/b1|{\"key\":5}\n",
-                Files.readString(sortedFile2.toPath())
+                Files.readString(sortedFile2)
         );
     }
 
@@ -162,11 +163,11 @@ public class PipelinedSortBatchTaskTest {
     }
 
     private TestResult runTest(NodeStateEntryBatch... nodeStateEntryBatches) 
throws Exception {
-        File sortRoot = sortFolder.getRoot();
+        Path sortRoot = sortFolder.getRoot().toPath();
         int numberOfBuffers = nodeStateEntryBatches.length + 1; // +1 for the 
sentinel
         ArrayBlockingQueue<NodeStateEntryBatch> emptyBatchesQueue = new 
ArrayBlockingQueue<>(numberOfBuffers);
         ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBatchesQueue = new 
ArrayBlockingQueue<>(numberOfBuffers);
-        ArrayBlockingQueue<File> sortedFilesQueue = new 
ArrayBlockingQueue<>(numberOfBuffers);
+        ArrayBlockingQueue<Path> sortedFilesQueue = new 
ArrayBlockingQueue<>(numberOfBuffers);
 
         for (NodeStateEntryBatch nodeStateEntryBatch : nodeStateEntryBatches) {
             nonEmptyBatchesQueue.put(nodeStateEntryBatch);
