This is an automated email from the ASF dual-hosted git repository.

fortino pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d7edc02a09 OAK-10538 - Pipeline strategy: eliminate unnecessary intermediate copy of entries in transform stage (#1194)
d7edc02a09 is described below

commit d7edc02a092c94141ee0db585d847ec07c9aa25a
Author: Nuno Santos <[email protected]>
AuthorDate: Fri Nov 10 09:16:34 2023 +0100

    OAK-10538 - Pipeline strategy: eliminate unnecessary intermediate copy of entries in transform stage (#1194)
    
    * Remove intermediate buffer used to serialize entries in transform thread.
    
    * Better variable name.
    
    * Minor reformatting to trigger a new run of the CI.
    
    * Minor reformatting to trigger a new run of the CI.
    
    * Minor reformatting to trigger a new run of the CI.
---
 .../document/flatfile/NodeStateEntryWriter.java    | 27 +++++---------
 .../flatfile/pipelined/NodeStateEntryBatch.java    | 41 ++++++++++++++++------
 .../flatfile/pipelined/PipelinedTransformTask.java | 24 ++++++-------
 .../pipelined/NodeStateEntryBatchTest.java         | 34 +++++++++++-------
 .../pipelined/PipelinedSortBatchTaskTest.java      | 16 ++-------
 5 files changed, 73 insertions(+), 69 deletions(-)

diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntryWriter.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntryWriter.java
index d5bfe31bea..48cfbcc49c 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntryWriter.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntryWriter.java
@@ -28,12 +28,9 @@ import 
org.apache.jackrabbit.oak.plugins.blob.serializer.BlobIdSerializer;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 
-import java.io.IOException;
-import java.io.Writer;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
 
 import static org.apache.jackrabbit.guava.common.base.Preconditions.checkState;
 
@@ -67,16 +64,6 @@ public class NodeStateEntryWriter {
         return path + DELIMITER + nodeStateAsJson;
     }
 
-    public void writeTo(Writer writer, NodeStateEntry nse) throws IOException {
-        writeTo(writer, nse.getPath(), asJson(nse.getNodeState()));
-    }
-
-    public void writeTo(Writer writer, String path, String value) throws 
IOException {
-        writer.write(path);
-        writer.write(DELIMITER);
-        writer.write(value);
-    }
-
     public String toString(List<String> pathElements, String nodeStateAsJson) {
         int pathStringSize = 
pathElements.stream().mapToInt(String::length).sum();
         StringBuilder sb = new StringBuilder(nodeStateAsJson.length() + 
pathStringSize + pathElements.size() + 1);
@@ -90,18 +77,20 @@ public class NodeStateEntryWriter {
         if (SORTED_PROPERTIES) {
             return asSortedJson(nodeState);
         }
-        return 
asJson(StreamSupport.stream(nodeState.getProperties().spliterator(), false));
+        return asJson(nodeState.getProperties());
     }
 
     String asSortedJson(NodeState nodeState) {
-        return 
asJson(StreamSupport.stream(nodeState.getProperties().spliterator(), false)
-                .sorted(Comparator.comparing(PropertyState::getName)));
+        List<PropertyState> properties = new ArrayList<>();
+        nodeState.getProperties().forEach(properties::add);
+        properties.sort(Comparator.comparing(PropertyState::getName));
+        return asJson(properties);
     }
 
-    private String asJson(Stream<? extends PropertyState> stream) {
+    private String asJson(Iterable<? extends PropertyState> properties) {
         jw.resetWriter();
         jw.object();
-        stream.forEach(ps -> {
+        properties.forEach(ps -> {
             String name = ps.getName();
             if (include(name)) {
                 jw.key(name);
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java
index 2c68f1e2e0..7acf8feeb6 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java
@@ -18,12 +18,27 @@
  */
 package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
 
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 
 public class NodeStateEntryBatch {
+
+    public static class BufferFullException extends RuntimeException {
+        public BufferFullException(String message) {
+            super(message);
+        }
+
+        public BufferFullException(String message, Throwable cause) {
+            super(message, cause);
+        }
+    }
+
+    public static final byte DELIMITER = '|';
     // Must be large enough to hold a full node state entry
     static final int MIN_BUFFER_SIZE = 256 * 1024;
+
     public static NodeStateEntryBatch createNodeStateEntryBatch(int 
bufferSizeBytes, int maxNumEntries) {
         if (bufferSizeBytes < MIN_BUFFER_SIZE) {
             throw new IllegalArgumentException("Buffer size must be at least " 
+ MIN_BUFFER_SIZE + " bytes");
@@ -53,15 +68,25 @@ public class NodeStateEntryBatch {
         return buffer;
     }
 
-    public void addEntry(String path, byte[] entryData) {
+    public int addEntry(String path, byte[] entryData) throws 
BufferFullException {
         if (numberOfEntries() == maxEntries) {
-            throw new IllegalStateException("Sort buffer size exceeded max 
entries: " + sortBuffer.size() + " > " + maxEntries);
+            throw new BufferFullException("Sort buffer size is full, reached 
max entries: " + sortBuffer.size());
         }
         int bufferPos = buffer.position();
-        buffer.putInt(entryData.length);
-        buffer.put(entryData);
-        String[] key = SortKey.genSortKeyPathElements(path);
-        sortBuffer.add(new SortKey(key, bufferPos));
+        byte[] pathBytes = path.getBytes(StandardCharsets.UTF_8);
+        int entrySize = pathBytes.length + 1 + entryData.length;
+        try {
+            buffer.putInt(entrySize);
+            buffer.put(pathBytes);
+            buffer.put(DELIMITER);
+            buffer.put(entryData);
+            String[] key = SortKey.genSortKeyPathElements(path);
+            sortBuffer.add(new SortKey(key, bufferPos));
+            return entrySize;
+        } catch (BufferOverflowException e) {
+            buffer.position(bufferPos);
+            throw new BufferFullException("Buffer full while adding entry: " + 
path + " size: " + entrySize, e);
+        }
     }
 
     public boolean isAtMaxEntries() {
@@ -71,10 +96,6 @@ public class NodeStateEntryBatch {
         return sortBuffer.size() == maxEntries;
     }
 
-    public boolean hasSpaceForEntry(byte[] entryData) {
-        return !isAtMaxEntries() && entryData.length + 4 <= buffer.remaining();
-    }
-
     public void flip() {
         buffer.flip();
     }
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
index 782a1d956b..ee98999da6 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
@@ -36,8 +36,7 @@ import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayOutputStream;
-import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Locale;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -124,10 +123,6 @@ class PipelinedTransformTask implements 
Callable<PipelinedTransformTask.Result>
             long mongoObjectsProcessed = 0;
             LOG.debug("Waiting for an empty buffer");
             NodeStateEntryBatch nseBatch = emptyBatchesQueue.take();
-
-            // Used to serialize a node state entry before writing it to the 
buffer
-            ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
-            OutputStreamWriter writer = new OutputStreamWriter(baos, 
PipelinedStrategy.FLATFILESTORE_CHARSET);
             LOG.debug("Obtained an empty buffer. Starting to convert Mongo 
documents to node state entries");
 
             ArrayList<NodeStateEntry> nodeStateEntries = new ArrayList<>();
@@ -181,21 +176,22 @@ class PipelinedTransformTask implements 
Callable<PipelinedTransformTask.Result>
                                         statistics.incrementEntriesAccepted();
                                         totalEntryCount++;
                                         // Serialize entry
-                                        entryWriter.writeTo(writer, nse);
-                                        writer.flush();
-                                        byte[] entryData = baos.toByteArray();
-                                        baos.reset();
-                                        
statistics.incrementTotalExtractedEntriesSize(entryData.length);
-                                        if 
(!nseBatch.hasSpaceForEntry(entryData)) {
+                                        byte[] jsonBytes = 
entryWriter.asJson(nse.getNodeState()).getBytes(StandardCharsets.UTF_8);
+                                        int entrySize;
+                                        try {
+                                            entrySize = 
nseBatch.addEntry(path, jsonBytes);
+                                        } catch 
(NodeStateEntryBatch.BufferFullException e) {
                                             LOG.info("Buffer full, passing 
buffer to sort task. Total entries: {}, entries in buffer {}, buffer size: {}",
                                                     totalEntryCount, 
nseBatch.numberOfEntries(), nseBatch.sizeOfEntries());
                                             nseBatch.flip();
                                             tryEnqueue(nseBatch);
                                             // Get an empty buffer
                                             nseBatch = 
emptyBatchesQueue.take();
+                                            // Now it must fit, otherwise it 
means that the buffer is smaller than a single
+                                            // entry, which is an error.
+                                            entrySize = 
nseBatch.addEntry(path, jsonBytes);
                                         }
-                                        // Write entry to buffer
-                                        nseBatch.addEntry(nse.getPath(), 
entryData);
+                                        
statistics.incrementTotalExtractedEntriesSize(entrySize);
                                     } else {
                                         statistics.incrementEntriesRejected();
                                         if (NodeStateUtils.isHiddenPath(path)) 
{
diff --git 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java
 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java
index 32cae0a0e7..8051f2c3f3 100644
--- 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java
+++ 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java
@@ -20,8 +20,10 @@ package 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
 
 import org.junit.Test;
 
-import java.nio.BufferOverflowException;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -40,31 +42,37 @@ public class NodeStateEntryBatchTest {
         batch.addEntry("b", new byte[1]);
         assertEquals(2, batch.numberOfEntries());
         assertTrue(batch.isAtMaxEntries());
-        assertThrows(IllegalStateException.class, () -> batch.addEntry("c", 
new byte[1]));
+        assertThrows(NodeStateEntryBatch.BufferFullException.class, () -> 
batch.addEntry("c", new byte[1]));
     }
 
     @Test
     public void testMaximumBufferSize() {
         NodeStateEntryBatch batch = 
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
 10);
-        assertTrue(batch.hasSpaceForEntry(new 
byte[NodeStateEntryBatch.MIN_BUFFER_SIZE -4])); // Needs 4 bytes for the length
-        assertFalse(batch.hasSpaceForEntry(new 
byte[NodeStateEntryBatch.MIN_BUFFER_SIZE]));
+        batch.addEntry("a", new byte[NodeStateEntryBatch.MIN_BUFFER_SIZE - 4 - 
2]); // Needs 4 bytes for the length and 2 for the key
+        assertThrows(NodeStateEntryBatch.BufferFullException.class, () -> 
batch.addEntry("b", new byte[NodeStateEntryBatch.MIN_BUFFER_SIZE]));
 
-        batch.addEntry("a", new byte[NodeStateEntryBatch.MIN_BUFFER_SIZE -4]);
         assertEquals(NodeStateEntryBatch.MIN_BUFFER_SIZE, 
batch.sizeOfEntries());
         assertEquals(1, batch.numberOfEntries());
-        assertFalse(batch.hasSpaceForEntry(new byte[1]));
-        assertThrows(BufferOverflowException.class, () -> batch.addEntry("b", 
new byte[1]));
+        assertThrows(NodeStateEntryBatch.BufferFullException.class, () -> 
batch.addEntry("b", new byte[1]));
     }
 
     @Test
-    public void flipAndResetBuffer() {
+    public void flipAndResetBuffer() throws IOException {
         int sizeOfEntry = NodeStateEntryBatch.MIN_BUFFER_SIZE-4;
+        String key = "a";
+        int jsonNodeLength = sizeOfEntry-2; // minus key and pipe characters
+
         NodeStateEntryBatch batch = 
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
 10);
-        byte[] testArray = new byte[sizeOfEntry];
-        for (int i = 0; i < sizeOfEntry; i++) {
-            testArray[i] = (byte) (i % 127);
+        byte[] jsonNodeBytes = new byte[jsonNodeLength];
+        for (int i = 0; i < jsonNodeLength; i++) {
+            jsonNodeBytes[i] = (byte) (i % 127);
         }
-        batch.addEntry("a", testArray);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        baos.write("a|".getBytes(StandardCharsets.UTF_8));
+        baos.write(jsonNodeBytes);
+        byte[] expectedContents = baos.toByteArray();
+        int size = batch.addEntry(key, jsonNodeBytes);
+        assertEquals(size, sizeOfEntry);
         assertEquals(batch.getBuffer().position(), sizeOfEntry + 4);
 
         batch.flip();
@@ -76,7 +84,7 @@ public class NodeStateEntryBatchTest {
         byte[] entryData = new byte[sizeOfEntry];
         buffer.get(entryData);
         assertEquals(buffer.position(), sizeOfEntry + 4);
-        assertArrayEquals(testArray, entryData);
+        assertArrayEquals(expectedContents, entryData);
 
         batch.reset();
 
diff --git 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
index 892b7a3c10..70cf463ffa 100644
--- 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
+++ 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
@@ -19,18 +19,13 @@
 package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
 
 import org.apache.jackrabbit.oak.commons.Compression;
-import 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.NodeStateEntryWriter;
-import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Set;
@@ -65,7 +60,6 @@ public class PipelinedSortBatchTaskTest {
 
     private final PathElementComparator pathComparator = new 
PathElementComparator(Set.of());
     private final Compression algorithm = Compression.NONE;
-    private final NodeStateEntryWriter nodeStateEntryWriter = new 
NodeStateEntryWriter(new MemoryBlobStore());
 
     @Test
     public void noBatch() throws Exception {
@@ -154,12 +148,8 @@ public class PipelinedSortBatchTaskTest {
         );
     }
 
-    private void addEntry(NodeStateEntryBatch batch, String path, String 
entry) throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        Writer writer = new OutputStreamWriter(baos);
-        nodeStateEntryWriter.writeTo(writer, path, entry);
-        writer.close();
-        batch.addEntry(path, baos.toByteArray());
+    private void addEntry(NodeStateEntryBatch batch, String path, String 
entry) {
+        batch.addEntry(path, entry.getBytes(StandardCharsets.UTF_8));
     }
 
     private TestResult runTest(NodeStateEntryBatch... nodeStateEntryBatches) 
throws Exception {

Reply via email to