This is an automated email from the ASF dual-hosted git repository.
fortino pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new d7edc02a09 OAK-10538 - Pipeline strategy: eliminate unnecessary
intermediate copy of entries in transform stage (#1194)
d7edc02a09 is described below
commit d7edc02a092c94141ee0db585d847ec07c9aa25a
Author: Nuno Santos <[email protected]>
AuthorDate: Fri Nov 10 09:16:34 2023 +0100
OAK-10538 - Pipeline strategy: eliminate unnecessary intermediate copy of
entries in transform stage (#1194)
* Remove intermediate buffer used to serialize entries in transform thread.
* Better variable name.
* Minor reformatting to trigger a new run of the CI.
* Minor reformatting to trigger a new run of the CI.
* Minor reformatting to trigger a new run of the CI.
---
.../document/flatfile/NodeStateEntryWriter.java | 27 +++++---------
.../flatfile/pipelined/NodeStateEntryBatch.java | 41 ++++++++++++++++------
.../flatfile/pipelined/PipelinedTransformTask.java | 24 ++++++-------
.../pipelined/NodeStateEntryBatchTest.java | 34 +++++++++++-------
.../pipelined/PipelinedSortBatchTaskTest.java | 16 ++-------
5 files changed, 73 insertions(+), 69 deletions(-)
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntryWriter.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntryWriter.java
index d5bfe31bea..48cfbcc49c 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntryWriter.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntryWriter.java
@@ -28,12 +28,9 @@ import
org.apache.jackrabbit.oak.plugins.blob.serializer.BlobIdSerializer;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.state.NodeState;
-import java.io.IOException;
-import java.io.Writer;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
import static org.apache.jackrabbit.guava.common.base.Preconditions.checkState;
@@ -67,16 +64,6 @@ public class NodeStateEntryWriter {
return path + DELIMITER + nodeStateAsJson;
}
- public void writeTo(Writer writer, NodeStateEntry nse) throws IOException {
- writeTo(writer, nse.getPath(), asJson(nse.getNodeState()));
- }
-
- public void writeTo(Writer writer, String path, String value) throws
IOException {
- writer.write(path);
- writer.write(DELIMITER);
- writer.write(value);
- }
-
public String toString(List<String> pathElements, String nodeStateAsJson) {
int pathStringSize =
pathElements.stream().mapToInt(String::length).sum();
StringBuilder sb = new StringBuilder(nodeStateAsJson.length() +
pathStringSize + pathElements.size() + 1);
@@ -90,18 +77,20 @@ public class NodeStateEntryWriter {
if (SORTED_PROPERTIES) {
return asSortedJson(nodeState);
}
- return
asJson(StreamSupport.stream(nodeState.getProperties().spliterator(), false));
+ return asJson(nodeState.getProperties());
}
String asSortedJson(NodeState nodeState) {
- return
asJson(StreamSupport.stream(nodeState.getProperties().spliterator(), false)
- .sorted(Comparator.comparing(PropertyState::getName)));
+ List<PropertyState> properties = new ArrayList<>();
+ nodeState.getProperties().forEach(properties::add);
+ properties.sort(Comparator.comparing(PropertyState::getName));
+ return asJson(properties);
}
- private String asJson(Stream<? extends PropertyState> stream) {
+ private String asJson(Iterable<? extends PropertyState> properties) {
jw.resetWriter();
jw.object();
- stream.forEach(ps -> {
+ properties.forEach(ps -> {
String name = ps.getName();
if (include(name)) {
jw.key(name);
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java
index 2c68f1e2e0..7acf8feeb6 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java
@@ -18,12 +18,27 @@
*/
package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
+import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
public class NodeStateEntryBatch {
+
+ public static class BufferFullException extends RuntimeException {
+ public BufferFullException(String message) {
+ super(message);
+ }
+
+ public BufferFullException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ public static final byte DELIMITER = '|';
// Must be large enough to hold a full node state entry
static final int MIN_BUFFER_SIZE = 256 * 1024;
+
public static NodeStateEntryBatch createNodeStateEntryBatch(int
bufferSizeBytes, int maxNumEntries) {
if (bufferSizeBytes < MIN_BUFFER_SIZE) {
throw new IllegalArgumentException("Buffer size must be at least "
+ MIN_BUFFER_SIZE + " bytes");
@@ -53,15 +68,25 @@ public class NodeStateEntryBatch {
return buffer;
}
- public void addEntry(String path, byte[] entryData) {
+ public int addEntry(String path, byte[] entryData) throws
BufferFullException {
if (numberOfEntries() == maxEntries) {
- throw new IllegalStateException("Sort buffer size exceeded max
entries: " + sortBuffer.size() + " > " + maxEntries);
+ throw new BufferFullException("Sort buffer size is full, reached
max entries: " + sortBuffer.size());
}
int bufferPos = buffer.position();
- buffer.putInt(entryData.length);
- buffer.put(entryData);
- String[] key = SortKey.genSortKeyPathElements(path);
- sortBuffer.add(new SortKey(key, bufferPos));
+ byte[] pathBytes = path.getBytes(StandardCharsets.UTF_8);
+ int entrySize = pathBytes.length + 1 + entryData.length;
+ try {
+ buffer.putInt(entrySize);
+ buffer.put(pathBytes);
+ buffer.put(DELIMITER);
+ buffer.put(entryData);
+ String[] key = SortKey.genSortKeyPathElements(path);
+ sortBuffer.add(new SortKey(key, bufferPos));
+ return entrySize;
+ } catch (BufferOverflowException e) {
+ buffer.position(bufferPos);
+ throw new BufferFullException("Buffer full while adding entry: " +
path + " size: " + entrySize, e);
+ }
}
public boolean isAtMaxEntries() {
@@ -71,10 +96,6 @@ public class NodeStateEntryBatch {
return sortBuffer.size() == maxEntries;
}
- public boolean hasSpaceForEntry(byte[] entryData) {
- return !isAtMaxEntries() && entryData.length + 4 <= buffer.remaining();
- }
-
public void flip() {
buffer.flip();
}
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
index 782a1d956b..ee98999da6 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
@@ -36,8 +36,7 @@ import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayOutputStream;
-import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Locale;
import java.util.concurrent.ArrayBlockingQueue;
@@ -124,10 +123,6 @@ class PipelinedTransformTask implements
Callable<PipelinedTransformTask.Result>
long mongoObjectsProcessed = 0;
LOG.debug("Waiting for an empty buffer");
NodeStateEntryBatch nseBatch = emptyBatchesQueue.take();
-
- // Used to serialize a node state entry before writing it to the
buffer
- ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
- OutputStreamWriter writer = new OutputStreamWriter(baos,
PipelinedStrategy.FLATFILESTORE_CHARSET);
LOG.debug("Obtained an empty buffer. Starting to convert Mongo
documents to node state entries");
ArrayList<NodeStateEntry> nodeStateEntries = new ArrayList<>();
@@ -181,21 +176,22 @@ class PipelinedTransformTask implements
Callable<PipelinedTransformTask.Result>
statistics.incrementEntriesAccepted();
totalEntryCount++;
// Serialize entry
- entryWriter.writeTo(writer, nse);
- writer.flush();
- byte[] entryData = baos.toByteArray();
- baos.reset();
-
statistics.incrementTotalExtractedEntriesSize(entryData.length);
- if
(!nseBatch.hasSpaceForEntry(entryData)) {
+ byte[] jsonBytes =
entryWriter.asJson(nse.getNodeState()).getBytes(StandardCharsets.UTF_8);
+ int entrySize;
+ try {
+ entrySize =
nseBatch.addEntry(path, jsonBytes);
+ } catch
(NodeStateEntryBatch.BufferFullException e) {
LOG.info("Buffer full, passing
buffer to sort task. Total entries: {}, entries in buffer {}, buffer size: {}",
totalEntryCount,
nseBatch.numberOfEntries(), nseBatch.sizeOfEntries());
nseBatch.flip();
tryEnqueue(nseBatch);
// Get an empty buffer
nseBatch =
emptyBatchesQueue.take();
+ // Now it must fit, otherwise it
means that the buffer is smaller than a single
+ // entry, which is an error.
+ entrySize =
nseBatch.addEntry(path, jsonBytes);
}
- // Write entry to buffer
- nseBatch.addEntry(nse.getPath(),
entryData);
+
statistics.incrementTotalExtractedEntriesSize(entrySize);
} else {
statistics.incrementEntriesRejected();
if (NodeStateUtils.isHiddenPath(path))
{
diff --git
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java
index 32cae0a0e7..8051f2c3f3 100644
---
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java
+++
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java
@@ -20,8 +20,10 @@ package
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
import org.junit.Test;
-import java.nio.BufferOverflowException;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -40,31 +42,37 @@ public class NodeStateEntryBatchTest {
batch.addEntry("b", new byte[1]);
assertEquals(2, batch.numberOfEntries());
assertTrue(batch.isAtMaxEntries());
- assertThrows(IllegalStateException.class, () -> batch.addEntry("c",
new byte[1]));
+ assertThrows(NodeStateEntryBatch.BufferFullException.class, () ->
batch.addEntry("c", new byte[1]));
}
@Test
public void testMaximumBufferSize() {
NodeStateEntryBatch batch =
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
10);
- assertTrue(batch.hasSpaceForEntry(new
byte[NodeStateEntryBatch.MIN_BUFFER_SIZE -4])); // Needs 4 bytes for the length
- assertFalse(batch.hasSpaceForEntry(new
byte[NodeStateEntryBatch.MIN_BUFFER_SIZE]));
+ batch.addEntry("a", new byte[NodeStateEntryBatch.MIN_BUFFER_SIZE - 4 -
2]); // Needs 4 bytes for the length, 1 for the key and 1 for the delimiter
+ assertThrows(NodeStateEntryBatch.BufferFullException.class, () ->
batch.addEntry("b", new byte[NodeStateEntryBatch.MIN_BUFFER_SIZE]));
- batch.addEntry("a", new byte[NodeStateEntryBatch.MIN_BUFFER_SIZE -4]);
assertEquals(NodeStateEntryBatch.MIN_BUFFER_SIZE,
batch.sizeOfEntries());
assertEquals(1, batch.numberOfEntries());
- assertFalse(batch.hasSpaceForEntry(new byte[1]));
- assertThrows(BufferOverflowException.class, () -> batch.addEntry("b",
new byte[1]));
+ assertThrows(NodeStateEntryBatch.BufferFullException.class, () ->
batch.addEntry("b", new byte[1]));
}
@Test
- public void flipAndResetBuffer() {
+ public void flipAndResetBuffer() throws IOException {
int sizeOfEntry = NodeStateEntryBatch.MIN_BUFFER_SIZE-4;
+ String key = "a";
+ int jsonNodeLength = sizeOfEntry-2; // minus key and pipe characters
+
NodeStateEntryBatch batch =
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
10);
- byte[] testArray = new byte[sizeOfEntry];
- for (int i = 0; i < sizeOfEntry; i++) {
- testArray[i] = (byte) (i % 127);
+ byte[] jsonNodeBytes = new byte[jsonNodeLength];
+ for (int i = 0; i < jsonNodeLength; i++) {
+ jsonNodeBytes[i] = (byte) (i % 127);
}
- batch.addEntry("a", testArray);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ baos.write("a|".getBytes(StandardCharsets.UTF_8));
+ baos.write(jsonNodeBytes);
+ byte[] expectedContents = baos.toByteArray();
+ int size = batch.addEntry(key, jsonNodeBytes);
+ assertEquals(size, sizeOfEntry);
assertEquals(batch.getBuffer().position(), sizeOfEntry + 4);
batch.flip();
@@ -76,7 +84,7 @@ public class NodeStateEntryBatchTest {
byte[] entryData = new byte[sizeOfEntry];
buffer.get(entryData);
assertEquals(buffer.position(), sizeOfEntry + 4);
- assertArrayEquals(testArray, entryData);
+ assertArrayEquals(expectedContents, entryData);
batch.reset();
diff --git
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
index 892b7a3c10..70cf463ffa 100644
---
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
+++
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
@@ -19,18 +19,13 @@
package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
import org.apache.jackrabbit.oak.commons.Compression;
-import
org.apache.jackrabbit.oak.index.indexer.document.flatfile.NodeStateEntryWriter;
-import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
@@ -65,7 +60,6 @@ public class PipelinedSortBatchTaskTest {
private final PathElementComparator pathComparator = new
PathElementComparator(Set.of());
private final Compression algorithm = Compression.NONE;
- private final NodeStateEntryWriter nodeStateEntryWriter = new
NodeStateEntryWriter(new MemoryBlobStore());
@Test
public void noBatch() throws Exception {
@@ -154,12 +148,8 @@ public class PipelinedSortBatchTaskTest {
);
}
- private void addEntry(NodeStateEntryBatch batch, String path, String
entry) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- Writer writer = new OutputStreamWriter(baos);
- nodeStateEntryWriter.writeTo(writer, path, entry);
- writer.close();
- batch.addEntry(path, baos.toByteArray());
+ private void addEntry(NodeStateEntryBatch batch, String path, String
entry) {
+ batch.addEntry(path, entry.getBytes(StandardCharsets.UTF_8));
}
private TestResult runTest(NodeStateEntryBatch... nodeStateEntryBatches)
throws Exception {