This is an automated email from the ASF dual-hosted git repository.

nfsantos pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8494c94520 OAK-10541 - Pipelined strategy: improve memory management 
of transform stage (#1202)
8494c94520 is described below

commit 8494c94520e92374d658ca697f43daca26f61478
Author: Nuno Santos <[email protected]>
AuthorDate: Fri Nov 17 13:39:19 2023 +0100

    OAK-10541 - Pipelined strategy: improve memory management of transform 
stage (#1202)
    
    * Remove intermediate buffer used to serialize entries in transform thread.
    *  Improve how memory is reserved for the sort buffer in the sort batch 
thread. Make it configurable.
    * When writing to compressed files, buffer between the compression streams 
and the file output stream instead of buffering on top of the compression 
stream. The compression streams already do their own buffering.
    * Print more metrics at the end of the sort-batch task: time creating sort 
array, time sorting, time writing.
---
 .../oak/commons/sort/ExternalSortByteArray.java    |  8 +-
 .../jackrabbit/oak/commons/sort/package-info.java  |  2 +-
 .../flatfile/pipelined/NodeStateEntryBatch.java    | 54 ++++++-------
 .../flatfile/pipelined/PipelinedMergeSortTask.java |  4 +-
 .../pipelined/PipelinedMongoDownloadTask.java      | 13 +--
 .../flatfile/pipelined/PipelinedSortBatchTask.java | 94 +++++++++++++++++-----
 .../flatfile/pipelined/PipelinedStrategy.java      | 87 +++++++++++---------
 .../flatfile/pipelined/PipelinedTransformTask.java | 19 +++--
 .../flatfile/pipelined/PipelinedUtils.java         | 58 +++++++++++++
 .../pipelined/TransformStageStatistics.java        |  6 +-
 .../document/indexstore/IndexStoreUtils.java       | 14 ++--
 .../pipelined/NodeStateEntryBatchTest.java         | 49 +++++------
 .../pipelined/PipelinedSortBatchTaskTest.java      |  4 +
 .../flatfile/pipelined/PipelinedUtilsTest.java     | 66 +++++++++++++++
 14 files changed, 336 insertions(+), 142 deletions(-)

diff --git 
a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortByteArray.java
 
b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortByteArray.java
index 10c3d7735e..8dd1c590d2 100644
--- 
a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortByteArray.java
+++ 
b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortByteArray.java
@@ -19,11 +19,11 @@ package org.apache.jackrabbit.oak.commons.sort;
 import org.apache.jackrabbit.guava.common.base.Preconditions;
 import org.apache.jackrabbit.oak.commons.Compression;
 
-import java.io.BufferedOutputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -39,14 +39,14 @@ import java.util.function.Function;
 public class ExternalSortByteArray {
     private final static int DEFAULT_BUFFER_SIZE = 16 * 1024;
 
-    public static <T> void mergeSortedFilesBinary(List<Path> files, 
BufferedOutputStream fbw, final Comparator<T> cmp,
+    public static <T> void mergeSortedFilesBinary(List<Path> files, 
OutputStream fbw, final Comparator<T> cmp,
                                                   boolean distinct, 
Compression algorithm,
                                                   Function<T, byte[]> 
typeToByteArray, Function<byte[], T> byteArrayToType)
             throws IOException {
         mergeSortedFilesBinary(files, fbw, cmp, distinct, algorithm, 
typeToByteArray, byteArrayToType, DEFAULT_BUFFER_SIZE);
     }
 
-    public static <T> void mergeSortedFilesBinary(List<Path> files, 
BufferedOutputStream fbw, final Comparator<T> cmp,
+    public static <T> void mergeSortedFilesBinary(List<Path> files, 
OutputStream fbw, final Comparator<T> cmp,
                                                   boolean distinct, 
Compression algorithm,
                                                   Function<T, byte[]> 
typeToByteArray, Function<byte[], T> byteArrayToType, int readBufferSize)
             throws IOException {
@@ -70,7 +70,7 @@ public class ExternalSortByteArray {
         }
     }
 
-    private static <T> int mergeBinary(BufferedOutputStream fbw, final 
Comparator<T> cmp, boolean distinct,
+    private static <T> int mergeBinary(OutputStream fbw, final Comparator<T> 
cmp, boolean distinct,
                                        List<BinaryFileBuffer<T>> buffers, 
Function<T, byte[]> typeToByteArray)
             throws IOException {
         PriorityQueue<BinaryFileBuffer<T>> pq = new PriorityQueue<>(
diff --git 
a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java
 
b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java
index 10f91cc554..7c3993f7de 100644
--- 
a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java
+++ 
b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-@Version("1.3.0")
+@Version("2.0.0")
 package org.apache.jackrabbit.oak.commons.sort;
 
 import org.osgi.annotation.versioning.Version;
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java
index 7acf8feeb6..bfddbd45e7 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java
@@ -21,7 +21,6 @@ package 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 
 public class NodeStateEntryBatch {
 
@@ -35,7 +34,6 @@ public class NodeStateEntryBatch {
         }
     }
 
-    public static final byte DELIMITER = '|';
     // Must be large enough to hold a full node state entry
     static final int MIN_BUFFER_SIZE = 256 * 1024;
 
@@ -51,49 +49,46 @@ public class NodeStateEntryBatch {
     }
 
     private final ByteBuffer buffer;
-    private final ArrayList<SortKey> sortBuffer;
     private final int maxEntries;
+    private int numberOfEntries = 0;
+    private int sizeOfEntriesBytes = 0;
 
     public NodeStateEntryBatch(ByteBuffer buffer, int maxEntries) {
         this.buffer = buffer;
         this.maxEntries = maxEntries;
-        this.sortBuffer = new ArrayList<>(maxEntries);
-    }
-
-    public ArrayList<SortKey> getSortBuffer() {
-        return sortBuffer;
-    }
-
-    public ByteBuffer getBuffer() {
-        return buffer;
     }
 
     public int addEntry(String path, byte[] entryData) throws 
BufferFullException {
-        if (numberOfEntries() == maxEntries) {
-            throw new BufferFullException("Sort buffer size is full, reached 
max entries: " + sortBuffer.size());
+        if (numberOfEntries == maxEntries) {
+            throw new BufferFullException("Sort buffer size is full, reached 
max entries: " + numberOfEntries);
         }
         int bufferPos = buffer.position();
         byte[] pathBytes = path.getBytes(StandardCharsets.UTF_8);
-        int entrySize = pathBytes.length + 1 + entryData.length;
+        int totalSize = 4 + pathBytes.length + 4 + entryData.length;
         try {
-            buffer.putInt(entrySize);
+            // Each nse is written as: "len(path)|path|len(json)|json"
+            buffer.putInt(pathBytes.length);
             buffer.put(pathBytes);
-            buffer.put(DELIMITER);
+            buffer.putInt(entryData.length);
             buffer.put(entryData);
-            String[] key = SortKey.genSortKeyPathElements(path);
-            sortBuffer.add(new SortKey(key, bufferPos));
-            return entrySize;
+            numberOfEntries++;
+            sizeOfEntriesBytes += totalSize;
+            return totalSize;
         } catch (BufferOverflowException e) {
             buffer.position(bufferPos);
-            throw new BufferFullException("Buffer full while adding entry: " + 
path + " size: " + entrySize, e);
+            throw new BufferFullException("while adding entry " + path + " of 
size: " + totalSize, e);
         }
     }
 
+    public ByteBuffer getBuffer() {
+        return buffer;
+    }
+
     public boolean isAtMaxEntries() {
-        if (sortBuffer.size() > maxEntries) {
-            throw new AssertionError("Sort buffer size exceeded max entries: " 
+ sortBuffer.size() + " > " + maxEntries);
+        if (numberOfEntries > maxEntries) {
+            throw new AssertionError("Sort buffer size exceeded max entries: " 
+ numberOfEntries + " > " + maxEntries);
         }
-        return sortBuffer.size() == maxEntries;
+        return numberOfEntries == maxEntries;
     }
 
     public void flip() {
@@ -102,18 +97,19 @@ public class NodeStateEntryBatch {
 
     public void reset() {
         buffer.clear();
-        sortBuffer.clear();
+        numberOfEntries = 0;
+        sizeOfEntriesBytes = 0;
     }
 
-    public int sizeOfEntries() {
-        return buffer.position();
+    public int sizeOfEntriesBytes() {
+        return sizeOfEntriesBytes;
     }
 
     public int numberOfEntries() {
-        return sortBuffer.size();
+        return numberOfEntries;
     }
 
     public int capacity() {
         return buffer.capacity();
     }
-}
\ No newline at end of file
+}
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
index 7090fcdba6..bf4083e796 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
@@ -33,8 +33,8 @@ import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -328,7 +328,7 @@ public class PipelinedMergeSortTask implements 
Callable<PipelinedMergeSortTask.R
 
     private Path sortStoreFile(List<Path> sortedFilesBatch) throws IOException 
{
         Path sortedFile = storeDir.resolve(getSortedStoreFileName(algorithm));
-        try (BufferedOutputStream writer = 
IndexStoreUtils.createOutputStream(sortedFile, algorithm)) {
+        try (OutputStream writer = 
IndexStoreUtils.createOutputStream(sortedFile, algorithm)) {
             Function<byte[], NodeStateHolder> byteArrayToType = new 
NodeStateHolderFactory();
             Function<NodeStateHolder, byte[]> typeToByteArray = holder -> 
holder == null ? null : holder.getLine();
             ExternalSortByteArray.mergeSortedFilesBinary(sortedFilesBatch,
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
index a9a8e863a2..6c16faf703 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
@@ -196,18 +196,19 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
             } else {
                 downloadWithNaturalOrdering();
             }
-            double enqueueingDelayPercentage = (100.0 * 
totalEnqueueWaitTimeMillis) / downloadStartWatch.elapsed(TimeUnit.MILLISECONDS);
+            long durationMillis = 
downloadStartWatch.elapsed(TimeUnit.MILLISECONDS);
+            String enqueueingDelayPercentage = 
PipelinedUtils.formatAsPercentage(totalEnqueueWaitTimeMillis, durationMillis);
             String metrics = MetricsFormatter.newBuilder()
                     .add("duration", 
FormattingUtils.formatToSeconds(downloadStartWatch))
-                    .add("durationSeconds", 
downloadStartWatch.elapsed(TimeUnit.SECONDS))
+                    .add("durationSeconds", durationMillis/1000)
                     .add("documentsDownloaded", documentsRead)
-                    .add("enqueueingDelayMs", totalEnqueueWaitTimeMillis)
-                    .add("enqueueingDelayPercentage", String.format("%1.2f", 
enqueueingDelayPercentage))
+                    .add("enqueueingDelayMillis", totalEnqueueWaitTimeMillis)
+                    .add("enqueueingDelayPercentage", 
enqueueingDelayPercentage)
                     .build();
 
             MetricsUtils.setCounterOnce(statisticsProvider,
                     
PipelinedMetrics.OAK_INDEXER_PIPELINED_MONGO_DOWNLOAD_ENQUEUE_DELAY_PERCENTAGE,
-                    Math.round(enqueueingDelayPercentage)
+                    PipelinedUtils.toPercentage(totalEnqueueWaitTimeMillis, 
durationMillis)
             );
             LOG.info("[TASK:{}:END] Metrics: {}", 
THREAD_NAME.toUpperCase(Locale.ROOT), metrics);
             return new Result(documentsRead);
@@ -225,7 +226,7 @@ public class PipelinedMongoDownloadTask implements 
Callable<PipelinedMongoDownlo
     private void reportProgress(String id) {
         if (this.documentsRead % 10000 == 0) {
             double rate = ((double) this.documentsRead) / 
downloadStartWatch.elapsed(TimeUnit.SECONDS);
-            String formattedRate = String.format("%1.2f nodes/s, %1.2f 
nodes/hr", rate, rate * 3600);
+            String formattedRate = String.format(Locale.ROOT, "%1.2f nodes/s, 
%1.2f nodes/hr", rate, rate * 3600);
             LOG.info("Dumping from NSET Traversed #{} {} [{}] (Elapsed {})",
                     this.documentsRead, id, formattedRate, 
FormattingUtils.formatToSeconds(downloadStartWatch));
         }
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
index ab5fca9bee..c53c72a955 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
@@ -21,13 +21,15 @@ package 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
 import org.apache.commons.io.FileUtils;
 import org.apache.jackrabbit.guava.common.base.Stopwatch;
 import org.apache.jackrabbit.oak.commons.Compression;
+import 
org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreUtils;
 import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -38,7 +40,7 @@ import java.util.concurrent.Callable;
 
 import static 
org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCountBin;
 import static 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy.SENTINEL_NSE_BUFFER;
-import static 
org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreUtils.createOutputStream;
+import static 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedUtils.formatAsPercentage;
 
 /**
  * Receives batches of node state entries, sorts then in memory, and finally 
writes them to a file.
@@ -66,9 +68,12 @@ class PipelinedSortBatchTask implements 
Callable<PipelinedSortBatchTask.Result>
     private final BlockingQueue<NodeStateEntryBatch> nonEmptyBuffersQueue;
     private final BlockingQueue<Path> sortedFilesQueue;
     private final Path sortWorkDir;
-    private final byte[] copyBuffer = new byte[4096];
+    private final ArrayList<SortKey> sortBuffer = new ArrayList<>(32 * 1024);
     private long entriesProcessed = 0;
     private long batchesProcessed = 0;
+    private long timeCreatingSortArrayMillis = 0;
+    private long timeSortingMillis = 0;
+    private long timeWritingMillis = 0;
 
     public PipelinedSortBatchTask(Path storeDir,
                                   PathElementComparator pathComparator,
@@ -86,6 +91,7 @@ class PipelinedSortBatchTask implements 
Callable<PipelinedSortBatchTask.Result>
 
     @Override
     public Result call() throws Exception {
+        Stopwatch taskStartTime = Stopwatch.createStarted();
         String originalName = Thread.currentThread().getName();
         Thread.currentThread().setName(THREAD_NAME);
         try {
@@ -94,9 +100,22 @@ class PipelinedSortBatchTask implements 
Callable<PipelinedSortBatchTask.Result>
                 LOG.info("Waiting for next batch");
                 NodeStateEntryBatch nseBuffer = nonEmptyBuffersQueue.take();
                 if (nseBuffer == SENTINEL_NSE_BUFFER) {
+                    long totalTimeMillis = taskStartTime.elapsed().toMillis();
+                    sortBuffer.clear(); // It should be empty already
+                    sortBuffer.trimToSize();  // Release the internal array 
which may be very large, several millions
+                    String timeCreatingSortArrayPercentage = 
formatAsPercentage(timeCreatingSortArrayMillis, totalTimeMillis);
+                    String timeSortingPercentage = 
formatAsPercentage(timeSortingMillis, totalTimeMillis);
+                    String timeWritingPercentage = 
formatAsPercentage(timeWritingMillis, totalTimeMillis);
                     String metrics = MetricsFormatter.newBuilder()
                             .add("batchesProcessed", batchesProcessed)
                             .add("entriesProcessed", entriesProcessed)
+                            .add("timeCreatingSortArrayMillis", 
timeCreatingSortArrayMillis)
+                            .add("timeCreatingSortArrayPercentage", 
timeCreatingSortArrayPercentage)
+                            .add("timeSortingMillis", timeSortingMillis)
+                            .add("timeSortingPercentage", 
timeSortingPercentage)
+                            .add("timeWritingMillis", timeWritingMillis)
+                            .add("timeWritingPercentage", 
timeWritingPercentage)
+                            .add("totalTimeSeconds", totalTimeMillis / 1000)
                             .build();
                     LOG.info("[TASK:{}:END] Metrics: {}", 
THREAD_NAME.toUpperCase(Locale.ROOT), metrics);
                     return new Result(entriesProcessed);
@@ -116,48 +135,81 @@ class PipelinedSortBatchTask implements 
Callable<PipelinedSortBatchTask.Result>
         }
     }
 
+    private void buildSortArray(NodeStateEntryBatch nseb) {
+        Stopwatch startTime = Stopwatch.createStarted();
+        ByteBuffer buffer = nseb.getBuffer();
+        int totalPathSize = 0;
+        while (buffer.hasRemaining()) {
+            int positionInBuffer = buffer.position();
+            // Read the next key from the buffer
+            int pathLength = buffer.getInt();
+            totalPathSize += pathLength;
+            // Create the String directly from the buffer without creating an 
intermediate byte[]
+            String path = new String(buffer.array(), buffer.position(), 
pathLength, StandardCharsets.UTF_8);
+            buffer.position(buffer.position() + pathLength);
+            // Skip the json
+            int entryLength = buffer.getInt();
+            buffer.position(buffer.position() + entryLength);
+            String[] pathSegments = SortKey.genSortKeyPathElements(path);
+            sortBuffer.add(new SortKey(pathSegments, positionInBuffer));
+        }
+        timeCreatingSortArrayMillis += startTime.elapsed().toMillis();
+        LOG.info("Built sort array in {}. Entries: {}, Total size of path 
strings: {}",
+                startTime, sortBuffer.size(), 
humanReadableByteCountBin(totalPathSize));
+    }
+
     private void sortAndSaveBatch(NodeStateEntryBatch nseb) throws Exception {
-        ArrayList<SortKey> sortBuffer = nseb.getSortBuffer();
         ByteBuffer buffer = nseb.getBuffer();
         LOG.info("Going to sort batch in memory. Entries: {}, Size: {}",
-                sortBuffer.size(), 
humanReadableByteCountBin(buffer.remaining()));
+                nseb.numberOfEntries(), 
humanReadableByteCountBin(nseb.sizeOfEntriesBytes()));
+        sortBuffer.clear();
+        buildSortArray(nseb);
         if (sortBuffer.isEmpty()) {
             return;
         }
         Stopwatch sortClock = Stopwatch.createStarted();
         sortBuffer.sort(pathComparator);
+        timeSortingMillis += sortClock.elapsed().toMillis();
         LOG.info("Sorted batch in {}. Saving to disk", sortClock);
         Stopwatch saveClock = Stopwatch.createStarted();
         Path newtmpfile = Files.createTempFile(sortWorkDir, "sortInBatch", 
"flatfile");
         long textSize = 0;
         batchesProcessed++;
-        try (BufferedOutputStream writer = createOutputStream(newtmpfile, 
algorithm)) {
+        try (OutputStream os = IndexStoreUtils.createOutputStream(newtmpfile, 
algorithm)) {
             for (SortKey entry : sortBuffer) {
                 entriesProcessed++;
                 // Retrieve the entry from the buffer
                 int posInBuffer = entry.getBufferPos();
                 buffer.position(posInBuffer);
-                int entrySize = buffer.getInt();
-
-                // Write the entry to the file without creating intermediate 
byte[]
-                int bytesRemaining = entrySize;
-                while (bytesRemaining > 0) {
-                    int bytesRead = Math.min(copyBuffer.length, 
bytesRemaining);
-                    buffer.get(copyBuffer, 0, bytesRead);
-                    writer.write(copyBuffer, 0, bytesRead);
-                    bytesRemaining -= bytesRead;
-                }
-                writer.write(PipelinedStrategy.FLATFILESTORE_LINE_SEPARATOR);
-                textSize += entrySize + 1;
+
+                int pathSize = buffer.getInt();
+                copyField(os, buffer, pathSize);
+                os.write(PipelinedStrategy.FLATFILESTORE_DELIMITER);
+                int jsonSize = buffer.getInt();
+                copyField(os, buffer, jsonSize);
+                os.write(PipelinedStrategy.FLATFILESTORE_LINE_SEPARATOR);
+                textSize += pathSize + jsonSize + 2;
             }
         }
-        LOG.info("Stored batch of size {} (uncompressed {}) with {} entries in 
{}",
-                humanReadableByteCountBin(Files.size(newtmpfile)),
+        timeWritingMillis += saveClock.elapsed().toMillis();
+        long compressedSize = Files.size(newtmpfile);
+        LOG.info("Wrote batch of size {} (uncompressed {}) with {} entries in 
{} at {}",
+                humanReadableByteCountBin(compressedSize),
                 humanReadableByteCountBin(textSize),
-                sortBuffer.size(), saveClock);
+                sortBuffer.size(), saveClock,
+                PipelinedUtils.formatAsTransferSpeedMBs(compressedSize, 
saveClock.elapsed().toMillis())
+        );
+        // Free the memory taken by the entries in the buffer
+        sortBuffer.clear();
         sortedFilesQueue.put(newtmpfile);
     }
 
+    private void copyField(OutputStream writer, ByteBuffer buffer, int 
fieldSize) throws IOException {
+        // Write the entry to the file without creating intermediate byte[]
+        writer.write(buffer.array(), buffer.position(), fieldSize);
+        buffer.position(buffer.position() + fieldSize);
+    }
+
     private static Path createdSortWorkDir(Path storeDir) throws IOException {
         Path sortedFileDir = storeDir.resolve("sort-work-dir");
         FileUtils.forceMkdir(sortedFileDir.toFile());
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
index 57369d701f..14bf4e8bdb 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
@@ -131,13 +131,16 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
     public static final String OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB = 
"oak.indexer.pipelined.workingMemoryMB";
     // 0 means autodetect
     public static final int DEFAULT_OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB = 
0;
+    // Between 1 and 100
+    public static final String 
OAK_INDEXER_PIPELINED_SORT_BUFFER_MEMORY_PERCENTAGE = 
"oak.indexer.pipelined.sortBufferMemoryPercentage";
+    public static final int 
DEFAULT_OAK_INDEXER_PIPELINED_SORT_BUFFER_MEMORY_PERCENTAGE = 25;
 
     static final NodeDocument[] SENTINEL_MONGO_DOCUMENT = new NodeDocument[0];
     static final NodeStateEntryBatch SENTINEL_NSE_BUFFER = new 
NodeStateEntryBatch(ByteBuffer.allocate(0), 0);
     static final Path SENTINEL_SORTED_FILES_QUEUE = Paths.get("SENTINEL");
     static final Charset FLATFILESTORE_CHARSET = StandardCharsets.UTF_8;
     static final char FLATFILESTORE_LINE_SEPARATOR = '\n';
-
+    static final byte FLATFILESTORE_DELIMITER = '|';
     private static final Logger LOG = 
LoggerFactory.getLogger(PipelinedStrategy.class);
     // A MongoDB document is at most 16MB, so the buffer that holds node state 
entries must be at least that big
     private static final int MIN_MONGO_DOC_QUEUE_RESERVED_MEMORY_MB = 16;
@@ -219,12 +222,10 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
     private final int mongoDocBatchMaxSizeMB;
     private final int mongoDocBatchMaxNumberOfDocuments;
     private final int nseBuffersCount;
-    private final int nseBufferMaxEntriesPerBuffer;
     private final int nseBuffersSizeBytes;
 
     private long nodeStateEntriesExtracted;
 
-
     /**
      * @param pathPredicate      Used by the transform stage to test if a node 
should be kept or discarded.
      * @param pathFilters        If non-empty, the download stage will use 
these filters to try to create a query that downloads
@@ -270,10 +271,9 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
         Preconditions.checkArgument(numberOfTransformThreads > 0,
                 "Invalid value for property " + 
OAK_INDEXER_PIPELINED_TRANSFORM_THREADS + ": " + numberOfTransformThreads + ". 
Must be > 0");
 
-        // Derived values for transform <-> sort-save
-        int nseBuffersReservedMemoryMB = readNSEBuffersReservedMemory();
-
-        // Calculate values derived from the configuration settings
+        int sortBufferMemoryPercentage = 
ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_SORT_BUFFER_MEMORY_PERCENTAGE,
 DEFAULT_OAK_INDEXER_PIPELINED_SORT_BUFFER_MEMORY_PERCENTAGE);
+        Preconditions.checkArgument(sortBufferMemoryPercentage > 0 && 
sortBufferMemoryPercentage <= 100,
+                "Invalid value for property " + 
OAK_INDEXER_PIPELINED_SORT_BUFFER_MEMORY_PERCENTAGE + ": " + 
numberOfTransformThreads + ". Must be between 1 and 100");
 
         // mongo-dump  <-> transform threads
         Preconditions.checkArgument(mongoDocQueueReservedMemoryMB >= 8 * 
mongoDocBatchMaxSizeMB,
@@ -283,21 +283,19 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
         );
         this.mongoDocQueueSize = mongoDocQueueReservedMemoryMB / 
mongoDocBatchMaxSizeMB;
 
-        // Transform threads <-> merge-sort
+        // Derived values for transform <-> sort-save
+        int nseWorkingMemoryMB = readNSEBuffersReservedMemory();
         this.nseBuffersCount = 1 + numberOfTransformThreads;
-
-        long nseBuffersReservedMemoryBytes = nseBuffersReservedMemoryMB * 
FileUtils.ONE_MB;
+        long nseWorkingMemoryBytes = (long) nseWorkingMemoryMB * 
FileUtils.ONE_MB;
         // The working memory is divided in the following regions:
         // - #transforThreads   NSE Binary buffers
-        // - 1x                 Metadata of NSE entries in Binary buffers, 
list of SortKeys
-        // A ByteBuffer can be at most Integer.MAX_VALUE bytes long
-        this.nseBuffersSizeBytes = 
limitToIntegerRange(nseBuffersReservedMemoryBytes / (nseBuffersCount + 1));
+        // - x1                 Memory reserved for the array created by the 
sort-batch thread with the keys of the entries
+        //                      in the batch that is being sorted
+        long memoryReservedForSortKeysArray = 
estimateMaxSizeOfSortKeyArray(nseWorkingMemoryBytes, nseBuffersCount, 
sortBufferMemoryPercentage);
+        long memoryReservedForBuffers = nseWorkingMemoryBytes - 
memoryReservedForSortKeysArray;
 
-        // Assuming 1 instance of SortKey takes around 256 bytes. We have 
#transformThreads + 1 regions of nseBufferSizeBytes.
-        // The extra region is for the SortKey instances. Below we compute the 
total number of SortKey instances that
-        // fit in the memory region reserved for them, assuming that each 
SortKey instance takes 256 bytes. Then we
-        // distribute equally these available entries among the nse buffers
-        this.nseBufferMaxEntriesPerBuffer = (this.nseBuffersSizeBytes / 256) / 
this.nseBuffersCount;
+        // A ByteBuffer can be at most Integer.MAX_VALUE bytes long
+        this.nseBuffersSizeBytes = 
limitToIntegerRange(memoryReservedForBuffers / nseBuffersCount);
 
         if (nseBuffersSizeBytes < MIN_ENTRY_BATCH_BUFFER_SIZE_MB * 
FileUtils.ONE_MB) {
             throw new IllegalArgumentException("Entry batch buffer size too 
small: " + nseBuffersSizeBytes +
@@ -311,11 +309,21 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
                 mongoDocQueueReservedMemoryMB,
                 mongoDocBatchMaxSizeMB,
                 mongoDocQueueSize);
-        LOG.info("NodeStateEntryBuffers: [ workingMemory: {} MB, 
numberOfBuffers: {}, bufferSize: {}, maxEntriesPerBuffer: {} ]",
-                nseBuffersReservedMemoryMB,
+        LOG.info("NodeStateEntryBuffers: [ workingMemory: {} MB, 
numberOfBuffers: {}, bufferSize: {}, sortBufferReservedMemory: {} ]",
+                nseWorkingMemoryMB,
                 nseBuffersCount,
                 IOUtils.humanReadableByteCountBin(nseBuffersSizeBytes),
-                nseBufferMaxEntriesPerBuffer);
+                
IOUtils.humanReadableByteCountBin(memoryReservedForSortKeysArray)
+        );
+    }
+
+    static long estimateMaxSizeOfSortKeyArray(long nseWorkingMemoryBytes, long 
nseBuffersCount, int sortBufferMemoryPercentage) {
+        // We reserve a percentage of the size of a buffer for the sort keys 
array. That is, we are assuming that for every line
+        // in the sort buffer, the memory needed to store the SortKey of the 
path section of the line will not be more
+        // than sortBufferMemoryPercentage of the total size of the line in 
average
+        // Estimate memory needed by the sort keys array. We assume each entry 
requires 256 bytes.
+        long approxNseBufferSize = limitToIntegerRange(nseWorkingMemoryBytes / 
nseBuffersCount);
+        return approxNseBufferSize * sortBufferMemoryPercentage / 100;
     }
 
     private int readNSEBuffersReservedMemory() {
@@ -390,12 +398,13 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
 
             // Create empty buffers
             for (int i = 0; i < nseBuffersCount; i++) {
-                
emptyBatchesQueue.add(NodeStateEntryBatch.createNodeStateEntryBatch(nseBuffersSizeBytes,
 nseBufferMaxEntriesPerBuffer));
+                // No limits on the number of entries, only on their total 
size. This might be revised later.
+                
emptyBatchesQueue.add(NodeStateEntryBatch.createNodeStateEntryBatch(nseBuffersSizeBytes,
 Integer.MAX_VALUE));
             }
 
             LOG.info("[TASK:PIPELINED-DUMP:START] Starting to build FFS");
             Stopwatch start = Stopwatch.createStarted();
-            PipelinedMongoDownloadTask downloadTask = new 
PipelinedMongoDownloadTask(
+            ecs.submit(new PipelinedMongoDownloadTask(
                     mongoDatabase,
                     docStore,
                     (int) (mongoDocBatchMaxSizeMB * FileUtils.ONE_MB),
@@ -403,14 +412,11 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
                     mongoDocQueue,
                     pathFilters,
                     statisticsProvider
-            );
-            ecs.submit(downloadTask);
-
-            Path flatFileStore = null;
+            ));
 
             for (int i = 0; i < numberOfTransformThreads; i++) {
                 NodeStateEntryWriter entryWriter = new 
NodeStateEntryWriter(blobStore);
-                PipelinedTransformTask transformTask = new 
PipelinedTransformTask(
+                ecs.submit(new PipelinedTransformTask(
                         docStore,
                         documentNodeStore,
                         rootRevision,
@@ -420,30 +426,27 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
                         emptyBatchesQueue,
                         nonEmptyBatchesQueue,
                         transformStageStatistics
-                );
-                ecs.submit(transformTask);
+                ));
             }
 
-            PipelinedSortBatchTask sortTask = new PipelinedSortBatchTask(
+            ecs.submit(new PipelinedSortBatchTask(
                     this.getStoreDir().toPath(),
                     pathComparator,
                     this.getAlgorithm(),
                     emptyBatchesQueue,
                     nonEmptyBatchesQueue,
                     sortedFilesQueue
-            );
-            ecs.submit(sortTask);
+            ));
 
             PipelinedMergeSortTask mergeSortTask = new PipelinedMergeSortTask(
                     this.getStoreDir().toPath(),
                     pathComparator,
                     this.getAlgorithm(),
-                    sortedFilesQueue,
-                    statisticsProvider
-            );
-            ecs.submit(mergeSortTask);
+                    sortedFilesQueue, statisticsProvider);
 
+            ecs.submit(mergeSortTask);
 
+            Path flatFileStore = null;
             try {
                 LOG.info("Waiting for tasks to complete");
                 int tasksFinished = 0;
@@ -480,8 +483,16 @@ public class PipelinedStrategy extends 
IndexStoreSortStrategyBase {
                         } else if (result instanceof 
PipelinedSortBatchTask.Result) {
                             PipelinedSortBatchTask.Result sortTaskResult = 
(PipelinedSortBatchTask.Result) result;
                             LOG.info("Sort batch task finished. Entries 
processed: {}", sortTaskResult.getTotalEntries());
-                            printStatistics(mongoDocQueue, emptyBatchesQueue, 
nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, true);
                             sortedFilesQueue.put(SENTINEL_SORTED_FILES_QUEUE);
+                            // The buffers between transform and merge sort 
tasks are no longer needed, so remove them
+                            // from the queues so they can be garbage 
collected.
+                            // These buffers can be very large, so this is 
important to avoid running out of memory in
+                            // the merge-sort phase
+                            if (!nonEmptyBatchesQueue.isEmpty()) {
+                                LOG.warn("emptyBatchesQueue is not empty. 
Size: {}", emptyBatchesQueue.size());
+                            }
+                            emptyBatchesQueue.clear();
+                            printStatistics(mongoDocQueue, emptyBatchesQueue, 
nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, true);
 
                         } else if (result instanceof 
PipelinedMergeSortTask.Result) {
                             PipelinedMergeSortTask.Result mergeSortedFilesTask 
= (PipelinedMergeSortTask.Result) result;
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
index f3f269984c..9d9f94f849 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
@@ -20,6 +20,7 @@ package 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.jackrabbit.guava.common.base.Stopwatch;
+import org.apache.jackrabbit.oak.commons.IOUtils;
 import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
 import 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.NodeStateEntryWriter;
 import org.apache.jackrabbit.oak.plugins.document.DocumentNodeState;
@@ -88,6 +89,7 @@ class PipelinedTransformTask implements 
Callable<PipelinedTransformTask.Result>
     private final TransformStageStatistics statistics;
     private final int threadId = threadIdGenerator.getAndIncrement();
     private long totalEnqueueDelayMillis = 0;
+    private long totalEmptyBatchQueueWaitTimeMillis = 0;
 
     public PipelinedTransformTask(MongoDocumentStore mongoStore,
                                   DocumentNodeStore documentNodeStore,
@@ -132,16 +134,20 @@ class PipelinedTransformTask implements 
Callable<PipelinedTransformTask.Result>
                 NodeDocument[] nodeDocumentBatch = mongoDocQueue.take();
                 totalDocumentQueueWaitTimeMillis += 
docQueueWaitStopwatch.elapsed(TimeUnit.MILLISECONDS);
                 if (nodeDocumentBatch == SENTINEL_MONGO_DOCUMENT) {
-                    String totalDocumentQueueWaitPercentage = 
String.format("%1.2f", (100.0 * totalDocumentQueueWaitTimeMillis) / 
taskStartWatch.elapsed(TimeUnit.MILLISECONDS));
-                    String totalEnqueueDelayPercentage = 
String.format("%1.2f", (100.0 * totalEnqueueDelayMillis) / 
taskStartWatch.elapsed(TimeUnit.MILLISECONDS));
+                    long totalDurationMillis = 
taskStartWatch.elapsed(TimeUnit.MILLISECONDS);
+                    String totalDocumentQueueWaitPercentage = 
PipelinedUtils.formatAsPercentage(totalDocumentQueueWaitTimeMillis, 
totalDurationMillis);
+                    String totalEnqueueDelayPercentage = 
PipelinedUtils.formatAsPercentage(totalEnqueueDelayMillis, totalDurationMillis);
+                    String totalEmptyBatchQueueWaitPercentage = 
PipelinedUtils.formatAsPercentage(totalEmptyBatchQueueWaitTimeMillis, 
totalDurationMillis);
                     String metrics = MetricsFormatter.newBuilder()
                             .add("duration", 
FormattingUtils.formatToSeconds(taskStartWatch))
-                            .add("durationSeconds", 
taskStartWatch.elapsed(TimeUnit.SECONDS))
+                            .add("durationSeconds", totalDurationMillis / 1000)
                             .add("nodeStateEntriesGenerated", totalEntryCount)
                             .add("enqueueDelayMillis", totalEnqueueDelayMillis)
                             .add("enqueueDelayPercentage", 
totalEnqueueDelayPercentage)
                             .add("documentQueueWaitMillis", 
totalDocumentQueueWaitTimeMillis)
                             .add("documentQueueWaitPercentage", 
totalDocumentQueueWaitPercentage)
+                            .add("totalEmptyBatchQueueWaitTimeMillis", 
totalEmptyBatchQueueWaitTimeMillis)
+                            .add("totalEmptyBatchQueueWaitPercentage", 
totalEmptyBatchQueueWaitPercentage)
                             .build();
                     LOG.info("[TASK:{}:END] Metrics: {}", 
threadName.toUpperCase(Locale.ROOT), metrics);
                     //Save the last batch
@@ -155,7 +161,7 @@ class PipelinedTransformTask implements 
Callable<PipelinedTransformTask.Result>
                         if (mongoObjectsProcessed % 50000 == 0) {
                             LOG.info("Mongo objects: {}, total entries: {}, 
current batch: {}, Size: {}/{} MB",
                                     mongoObjectsProcessed, totalEntryCount, 
nseBatch.numberOfEntries(),
-                                    nseBatch.sizeOfEntries() / 
FileUtils.ONE_MB,
+                                    nseBatch.sizeOfEntriesBytes() / 
FileUtils.ONE_MB,
                                     nseBatch.capacity() / FileUtils.ONE_MB
                             );
                         }
@@ -182,11 +188,14 @@ class PipelinedTransformTask implements 
Callable<PipelinedTransformTask.Result>
                                             entrySize = 
nseBatch.addEntry(path, jsonBytes);
                                         } catch 
(NodeStateEntryBatch.BufferFullException e) {
                                             LOG.info("Buffer full, passing 
buffer to sort task. Total entries: {}, entries in buffer {}, buffer size: {}",
-                                                    totalEntryCount, 
nseBatch.numberOfEntries(), nseBatch.sizeOfEntries());
+                                                    totalEntryCount, 
nseBatch.numberOfEntries(), 
IOUtils.humanReadableByteCountBin(nseBatch.sizeOfEntriesBytes()));
                                             nseBatch.flip();
                                             tryEnqueue(nseBatch);
                                             // Get an empty buffer
+                                            Stopwatch 
emptyBatchesQueueStopwatch = Stopwatch.createStarted();
                                             nseBatch = 
emptyBatchesQueue.take();
+                                            totalEmptyBatchQueueWaitTimeMillis 
+= emptyBatchesQueueStopwatch.elapsed(TimeUnit.MILLISECONDS);
+
                                             // Now it must fit, otherwise it 
means that the buffer is smaller than a single
                                             // entry, which is an error.
                                             entrySize = 
nseBatch.addEntry(path, jsonBytes);
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedUtils.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedUtils.java
new file mode 100644
index 0000000000..85fad87ae0
--- /dev/null
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedUtils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
+
+import org.apache.commons.io.FileUtils;
+
+import java.util.Locale;
+
+public class PipelinedUtils {
+    /**
+     * <p>Format a percentage as a string with 2 decimal places. For instance:
+     * <code>formatAsPercentage(52, 1000)</code> returns 
<code>"5.20"</code>.</p>
+     */
+    public static String formatAsPercentage(long numerator, long denominator) {
+        if (denominator == 0) {
+            return "N/A";
+        } else {
+            return String.format(Locale.ROOT, "%1.2f", (100.0 * numerator) / 
denominator);
+        }
+    }
+
+    /**
+     * <p>Convert to a percentage as an integer from 0 to 100, with -1 
representing an undefined value.</p>
+     */
+    public static int toPercentage(long numerator, long denominator) {
+        if (denominator == 0) {
+            return -1;
+        } else {
+            return (int) Math.round((100.0 * numerator) / denominator);
+        }
+    }
+
+    public static String formatAsTransferSpeedMBs(long numberOfBytes, long 
timeMillis) {
+        if (timeMillis == 0) {
+            return "N/A";
+        } else {
+            double speed = 1000 * (((double) numberOfBytes) / timeMillis) / 
FileUtils.ONE_MB;
+            return String.format(Locale.ROOT, "%1.2f MB/s", speed);
+        }
+    }
+
+}
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/TransformStageStatistics.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/TransformStageStatistics.java
index 0b586c77e4..61cf67e02a 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/TransformStageStatistics.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/TransformStageStatistics.java
@@ -169,10 +169,8 @@ public class TransformStageStatistics {
         long documentsRejectedTotal = documentsRejectedSplitSum + 
documentsRejectedEmptyNodeStateSum;
         long documentsAcceptedTotal = mongoDocumentsTraversedSum - 
documentsRejectedTotal;
         long totalEntries = entriesAcceptedSum + entriesRejectedSum;
-        String documentsAcceptedPercentage = mongoDocumentsTraversedSum == 0 ? 
"N/A" :
-                String.format("%2.1f%%", (100.0 * documentsAcceptedTotal) / 
mongoDocumentsTraversedSum);
-        String entriesAcceptedPercentage = totalEntries == 0 ? "N/A" :
-                String.format("%1.1f%%", (100.0 * entriesAcceptedSum) / 
totalEntries);
+        String documentsAcceptedPercentage = 
PipelinedUtils.formatAsPercentage(documentsAcceptedTotal, 
mongoDocumentsTraversedSum);
+        String entriesAcceptedPercentage = 
PipelinedUtils.formatAsPercentage(entriesAcceptedSum, totalEntries);
         long avgEntrySize = entriesAcceptedSum == 0 ? -1 :
                 extractedEntriesTotalSizeSum / entriesAcceptedSum;
         return MetricsFormatter.newBuilder()
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/indexstore/IndexStoreUtils.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/indexstore/IndexStoreUtils.java
index b901621887..cdb3c1f78d 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/indexstore/IndexStoreUtils.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/indexstore/IndexStoreUtils.java
@@ -88,14 +88,12 @@ public class IndexStoreUtils {
         return new BufferedWriter(new 
OutputStreamWriter(algorithm.getOutputStream(out)));
     }
 
-    public static BufferedOutputStream createOutputStream(File file, 
Compression algorithm) throws IOException {
-        OutputStream out = new FileOutputStream(file);
-        return new BufferedOutputStream(algorithm.getOutputStream(out));
-    }
-
-    public static BufferedOutputStream createOutputStream(Path file, 
Compression algorithm) throws IOException {
-        OutputStream out = Files.newOutputStream(file);
-        return new BufferedOutputStream(algorithm.getOutputStream(out));
+    public static OutputStream createOutputStream(Path file, Compression 
algorithm) throws IOException {
+        // The output streams created by LZ4 and GZIP buffer their input, so 
we should not wrap then again.
+        // However, the implementation of the compression streams may make 
small writes to the underlying stream,
+        // so we buffer the FileOutputStream
+        OutputStream out = new 
BufferedOutputStream(Files.newOutputStream(file));
+        return algorithm.getOutputStream(out);
     }
 
     public static long sizeOf(List<File> sortedFiles) {
diff --git 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java
 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java
index 8051f2c3f3..4d7857c8b3 100644
--- 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java
+++ 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java
@@ -20,8 +20,6 @@ package 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
 
 import org.junit.Test;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 
@@ -48,43 +46,46 @@ public class NodeStateEntryBatchTest {
     @Test
     public void testMaximumBufferSize() {
         NodeStateEntryBatch batch = 
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
 10);
-        batch.addEntry("a", new byte[NodeStateEntryBatch.MIN_BUFFER_SIZE - 4 - 
2]); // Needs 4 bytes for the length and 2 for the key
+        int keySize = "a".getBytes(StandardCharsets.UTF_8).length;
+        batch.addEntry("a", new byte[NodeStateEntryBatch.MIN_BUFFER_SIZE - 4 - 
4 - keySize]); // Needs 4 bytes for the length of the path and of the entry data
         assertThrows(NodeStateEntryBatch.BufferFullException.class, () -> 
batch.addEntry("b", new byte[NodeStateEntryBatch.MIN_BUFFER_SIZE]));
 
-        assertEquals(NodeStateEntryBatch.MIN_BUFFER_SIZE, 
batch.sizeOfEntries());
+        assertEquals(NodeStateEntryBatch.MIN_BUFFER_SIZE, 
batch.sizeOfEntriesBytes());
         assertEquals(1, batch.numberOfEntries());
         assertThrows(NodeStateEntryBatch.BufferFullException.class, () -> 
batch.addEntry("b", new byte[1]));
     }
 
     @Test
-    public void flipAndResetBuffer() throws IOException {
-        int sizeOfEntry = NodeStateEntryBatch.MIN_BUFFER_SIZE-4;
-        String key = "a";
-        int jsonNodeLength = sizeOfEntry-2; // minus key and pipe characters
-
+    public void flipAndResetBuffer() {
         NodeStateEntryBatch batch = 
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
 10);
-        byte[] jsonNodeBytes = new byte[jsonNodeLength];
-        for (int i = 0; i < jsonNodeLength; i++) {
+        int expectedBytesWrittenToBuffer = NodeStateEntryBatch.MIN_BUFFER_SIZE;
+
+        String key = "a";
+        int keyLength = "a".getBytes(StandardCharsets.UTF_8).length;
+        byte[] jsonNodeBytes = new byte[batch.capacity()-4-4-keyLength];
+        for (int i = 0; i < jsonNodeBytes.length; i++) {
             jsonNodeBytes[i] = (byte) (i % 127);
         }
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        baos.write("a|".getBytes(StandardCharsets.UTF_8));
-        baos.write(jsonNodeBytes);
-        byte[] expectedContents = baos.toByteArray();
-        int size = batch.addEntry(key, jsonNodeBytes);
-        assertEquals(size, sizeOfEntry);
-        assertEquals(batch.getBuffer().position(), sizeOfEntry + 4);
+
+        int bytesWrittenToBuffer = batch.addEntry(key, jsonNodeBytes);
+        assertEquals(bytesWrittenToBuffer, expectedBytesWrittenToBuffer);
+        assertEquals(batch.getBuffer().position(), 
expectedBytesWrittenToBuffer);
 
         batch.flip();
 
         ByteBuffer buffer = batch.getBuffer();
         assertEquals(buffer.position(), 0);
-        assertEquals(buffer.remaining(), sizeOfEntry + 4);
-        assertEquals(sizeOfEntry, buffer.getInt());
-        byte[] entryData = new byte[sizeOfEntry];
-        buffer.get(entryData);
-        assertEquals(buffer.position(), sizeOfEntry + 4);
-        assertArrayEquals(expectedContents, entryData);
+        assertEquals(buffer.remaining(), expectedBytesWrittenToBuffer);
+        assertEquals(keyLength, buffer.getInt());
+        byte[] keyBytes = new byte[keyLength];
+        buffer.get(keyBytes);
+        assertEquals(key, new String(keyBytes, StandardCharsets.UTF_8));
+
+        int jsonLength = buffer.getInt();
+        byte[] jsonBytes = new byte[jsonLength];
+        buffer.get(jsonBytes);
+        assertEquals(buffer.position(), expectedBytesWrittenToBuffer);
+        assertArrayEquals(jsonNodeBytes, jsonBytes);
 
         batch.reset();
 
diff --git 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
index 70cf463ffa..2e88536fd0 100644
--- 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
+++ 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
@@ -75,6 +75,7 @@ public class PipelinedSortBatchTaskTest {
     @Test
     public void emptyBatch() throws Exception {
         NodeStateEntryBatch batch = 
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
 10);
+        batch.flip();
 
         TestResult testResult = runTest(batch);
 
@@ -93,6 +94,7 @@ public class PipelinedSortBatchTaskTest {
         addEntry(batch, "/a0/b1", "{\"key\":5}");
         addEntry(batch, "/a0/b0/c1", "{\"key\":4}");
         addEntry(batch, "/a0/b0/c0", "{\"key\":3}");
+        batch.flip();
 
         TestResult testResult = runTest(batch);
 
@@ -119,11 +121,13 @@ public class PipelinedSortBatchTaskTest {
         addEntry(batch1, "/a0/b0", "{\"key\":2}");
         addEntry(batch1, "/a0", "{\"key\":1}");
         addEntry(batch1, "/a1/b0", "{\"key\":6}");
+        batch1.flip();
 
         NodeStateEntryBatch batch2 = 
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
 10);
         addEntry(batch2, "/a0/b1", "{\"key\":5}");
         addEntry(batch2, "/a0/b0/c1", "{\"key\":4}");
         addEntry(batch2, "/a0/b0/c0", "{\"key\":3}");
+        batch2.flip();
 
         TestResult testResult = runTest(batch1, batch2);
 
diff --git 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedUtilsTest.java
 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedUtilsTest.java
new file mode 100644
index 0000000000..6749f0e2bb
--- /dev/null
+++ 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedUtilsTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
+
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedUtils.formatAsPercentage;
+import static 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedUtils.formatAsTransferSpeedMBs;
+import static 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedUtils.toPercentage;
+import static org.junit.Assert.assertEquals;
+
+public class PipelinedUtilsTest {
+    @Test
+    public void testFormatAsPercentage() {
+        assertEquals("0.00", formatAsPercentage(0, 100));
+        assertEquals("1.00", formatAsPercentage(1, 100));
+        assertEquals("0.10", formatAsPercentage(1, 1000));
+        assertEquals("0.01", formatAsPercentage(1, 10_000));
+        assertEquals("N/A", formatAsPercentage(1, 0));
+        assertEquals("100.00", formatAsPercentage(100, 100));
+        assertEquals("120.00", formatAsPercentage(120, 100));
+        assertEquals("314.16", formatAsPercentage(355, 113));
+    }
+
+    @Test
+    public void testFormatAsTransferSpeedMBs() {
+        assertEquals("0.95 MB/s", formatAsTransferSpeedMBs(1_000_000, 
TimeUnit.SECONDS.toMillis(1)));
+        assertEquals("0.00 MB/s", formatAsTransferSpeedMBs(0, 
TimeUnit.SECONDS.toMillis(1)));
+        assertEquals("0.00 MB/s", formatAsTransferSpeedMBs(1, 
TimeUnit.SECONDS.toMillis(1)));
+        assertEquals("N/A", formatAsTransferSpeedMBs(1_000_000, 
TimeUnit.SECONDS.toMillis(0)));
+        assertEquals("8796093022208.00 MB/s", 
formatAsTransferSpeedMBs(Long.MAX_VALUE, TimeUnit.SECONDS.toMillis(1)));
+        assertEquals("-8796093022208.00 MB/s", 
formatAsTransferSpeedMBs(Long.MIN_VALUE, TimeUnit.SECONDS.toMillis(1)));
+        assertEquals("0.00 MB/s", formatAsTransferSpeedMBs(1_000_000, 
Long.MAX_VALUE));
+        assertEquals("-0.00 MB/s", formatAsTransferSpeedMBs(1_000_000, 
Long.MIN_VALUE));
+    }
+
+    @Test
+    public void testToPercentage() {
+        assertEquals(0, toPercentage (0, 100));
+        assertEquals(1, toPercentage(1, 100));
+        assertEquals(0, toPercentage(1, 1000));
+        assertEquals(0, toPercentage(1, 10_000));
+        assertEquals(-1, toPercentage(1, 0));
+        assertEquals(100, toPercentage(100, 100));
+        assertEquals(120, toPercentage(120, 100));
+        assertEquals(314, toPercentage(355, 113));
+    }
+}
\ No newline at end of file

Reply via email to