This is an automated email from the ASF dual-hosted git repository.
baedke pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8f0bedf8a4 OAK-10356 - Adjust lower and upper bounds of auto-detected
memory limits in PipelinedStrategy (#1032)
8f0bedf8a4 is described below
commit 8f0bedf8a42bb7d7119495d8afe5f9422693ed6c
Author: Nuno Santos <[email protected]>
AuthorDate: Wed Jul 19 09:55:55 2023 +0200
OAK-10356 - Adjust lower and upper bounds of auto-detected memory limits in
PipelinedStrategy (#1032)
* Reduce the minimum bound (from 512 MB to 128 MB) and set a maximum bound
(4000 MB) for the auto-detected working memory in the Pipelined strategy.
* Reduce the default number of transform threads (from 3 to 2).
---
.../flatfile/pipelined/NodeStateEntryBatch.java | 6 ++++--
.../flatfile/pipelined/PipelinedStrategy.java | 20 +++++++++++++-------
.../flatfile/pipelined/NodeStateEntryBatchTest.java | 19 +++++++++----------
.../pipelined/PipelinedSortBatchTaskTest.java | 8 ++++----
4 files changed, 30 insertions(+), 23 deletions(-)
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java
index 6e4f508d84..5fb2ef0163 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java
@@ -22,9 +22,11 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
public class NodeStateEntryBatch {
+ // Must be large enough to hold a full node state entry
+ static final int MIN_BUFFER_SIZE = 256 * 1024;
public static NodeStateEntryBatch createNodeStateEntryBatch(int
bufferSize, int maxNumEntries) {
- if (bufferSize < 128) {
- throw new IllegalArgumentException("Buffer size must be at least
128 bytes");
+ if (bufferSize < MIN_BUFFER_SIZE) {
+ throw new IllegalArgumentException("Buffer size must be at least "
+ MIN_BUFFER_SIZE + " bytes");
}
if (maxNumEntries < 1) {
throw new IllegalArgumentException("Max number of entries must be
at least 1");
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
index 56c8f93b9e..bdd94d1f9e 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
@@ -18,12 +18,12 @@
*/
package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.mongodb.BasicDBObject;
import com.mongodb.client.MongoCollection;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.guava.common.base.Preconditions;
import org.apache.jackrabbit.guava.common.base.Stopwatch;
+import org.apache.jackrabbit.guava.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.jackrabbit.oak.commons.Compression;
import
org.apache.jackrabbit.oak.index.indexer.document.flatfile.NodeStateEntryWriter;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.SortStrategy;
@@ -116,7 +116,7 @@ public class PipelinedStrategy implements SortStrategy {
public static final String OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_SIZE =
"oak.indexer.pipelined.mongoDocBatchSize";
public static final int DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_SIZE
= 500;
public static final String OAK_INDEXER_PIPELINED_TRANSFORM_THREADS =
"oak.indexer.pipelined.transformThreads";
- public static final int DEFAULT_OAK_INDEXER_PIPELINED_TRANSFORM_THREADS =
3;
+ public static final int DEFAULT_OAK_INDEXER_PIPELINED_TRANSFORM_THREADS =
2;
public static final String OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB =
"oak.indexer.pipelined.workingMemoryMB";
// 0 means autodetect
public static final int DEFAULT_OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB =
0;
@@ -127,8 +127,10 @@ public class PipelinedStrategy implements SortStrategy {
static final Charset FLATFILESTORE_CHARSET = StandardCharsets.UTF_8;
private static final Logger LOG =
LoggerFactory.getLogger(PipelinedStrategy.class);
- private static final int MIN_ENTRY_BATCH_BUFFER_SIZE_MB = 64;
- private static final int MIN_WORKING_MEMORY_MB = 512;
+ // A MongoDB document is at most 16MB, so the buffer that holds node state
entries must be at least that big
+ private static final int MIN_ENTRY_BATCH_BUFFER_SIZE_MB = 16;
+ private static final int MIN_AUTODETECT_WORKING_MEMORY_MB = 128;
+ private static final int MAX_AUTODETECT_WORKING_MEMORY_MB = 4000;
private class MonitorTask implements Runnable {
private final ArrayBlockingQueue<BasicDBObject[]> mongoDocQueue;
@@ -219,9 +221,13 @@ public class PipelinedStrategy implements SortStrategy {
int maxHeapSizeMB = (int) (Runtime.getRuntime().maxMemory() /
FileUtils.ONE_MB);
int workingMemoryMB = maxHeapSizeMB - 2048;
LOG.info("Auto detecting working memory. Maximum heap size: {} MB,
selected working memory: {} MB", maxHeapSizeMB, workingMemoryMB);
- if (workingMemoryMB < MIN_WORKING_MEMORY_MB) {
- LOG.warn("Working memory too low. Setting to minimum: {} MB",
MIN_WORKING_MEMORY_MB);
- workingMemoryMB = MIN_WORKING_MEMORY_MB;
+ if (workingMemoryMB > MAX_AUTODETECT_WORKING_MEMORY_MB) {
+ LOG.warn("Auto-detected value for working memory too high, setting
to the maximum allowed for auto-detection: {} MB",
MAX_AUTODETECT_WORKING_MEMORY_MB);
+ return MAX_AUTODETECT_WORKING_MEMORY_MB;
+ }
+ if (workingMemoryMB < MIN_AUTODETECT_WORKING_MEMORY_MB) {
+ LOG.warn("Auto-detected value for working memory too low, setting
to the minimum allowed for auto-detection: {} MB",
MIN_AUTODETECT_WORKING_MEMORY_MB);
+ return MIN_AUTODETECT_WORKING_MEMORY_MB;
}
return workingMemoryMB;
}
diff --git
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java
index beb2fc0ac9..32cae0a0e7 100644
---
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java
+++
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java
@@ -33,7 +33,7 @@ public class NodeStateEntryBatchTest {
@Test
public void testMaximumNumberOfEntries() {
- NodeStateEntryBatch batch =
NodeStateEntryBatch.createNodeStateEntryBatch(1024, 2);
+ NodeStateEntryBatch batch =
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
2);
assertFalse(batch.isAtMaxEntries());
batch.addEntry("a", new byte[1]);
assertFalse(batch.isAtMaxEntries());
@@ -45,12 +45,12 @@ public class NodeStateEntryBatchTest {
@Test
public void testMaximumBufferSize() {
- NodeStateEntryBatch batch =
NodeStateEntryBatch.createNodeStateEntryBatch(128, 10);
- assertTrue(batch.hasSpaceForEntry(new byte[124])); // Needs 4 bytes
for the length
- assertFalse(batch.hasSpaceForEntry(new byte[125]));
+ NodeStateEntryBatch batch =
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
10);
+ assertTrue(batch.hasSpaceForEntry(new
byte[NodeStateEntryBatch.MIN_BUFFER_SIZE -4])); // Needs 4 bytes for the length
+ assertFalse(batch.hasSpaceForEntry(new
byte[NodeStateEntryBatch.MIN_BUFFER_SIZE]));
- batch.addEntry("a", new byte[124]);
- assertEquals(124 + 4, batch.sizeOfEntries());
+ batch.addEntry("a", new byte[NodeStateEntryBatch.MIN_BUFFER_SIZE -4]);
+ assertEquals(NodeStateEntryBatch.MIN_BUFFER_SIZE,
batch.sizeOfEntries());
assertEquals(1, batch.numberOfEntries());
assertFalse(batch.hasSpaceForEntry(new byte[1]));
assertThrows(BufferOverflowException.class, () -> batch.addEntry("b",
new byte[1]));
@@ -58,9 +58,8 @@ public class NodeStateEntryBatchTest {
@Test
public void flipAndResetBuffer() {
- int sizeOfEntry = 124;
- int bufferSize = 1024;
- NodeStateEntryBatch batch =
NodeStateEntryBatch.createNodeStateEntryBatch(bufferSize, 10);
+ int sizeOfEntry = NodeStateEntryBatch.MIN_BUFFER_SIZE-4;
+ NodeStateEntryBatch batch =
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
10);
byte[] testArray = new byte[sizeOfEntry];
for (int i = 0; i < sizeOfEntry; i++) {
testArray[i] = (byte) (i % 127);
@@ -83,6 +82,6 @@ public class NodeStateEntryBatchTest {
assertEquals(0, batch.numberOfEntries());
assertEquals(0, batch.getBuffer().position());
- assertEquals(bufferSize, batch.getBuffer().remaining());
+ assertEquals(NodeStateEntryBatch.MIN_BUFFER_SIZE,
batch.getBuffer().remaining());
}
}
\ No newline at end of file
diff --git
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
index 381d067838..7665929e28 100644
---
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
+++
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
@@ -79,7 +79,7 @@ public class PipelinedSortBatchTaskTest {
@Test
public void emptyBatch() throws Exception {
- NodeStateEntryBatch batch =
NodeStateEntryBatch.createNodeStateEntryBatch(1024, 10);
+ NodeStateEntryBatch batch =
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
10);
TestResult testResult = runTest(batch);
@@ -91,7 +91,7 @@ public class PipelinedSortBatchTaskTest {
@Test
public void oneBatch() throws Exception {
- NodeStateEntryBatch batch =
NodeStateEntryBatch.createNodeStateEntryBatch(1024, 10);
+ NodeStateEntryBatch batch =
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
10);
addEntry(batch, "/a0/b0", "{\"key\":2}");
addEntry(batch, "/a0", "{\"key\":1}");
addEntry(batch, "/a1/b0", "{\"key\":6}");
@@ -120,12 +120,12 @@ public class PipelinedSortBatchTaskTest {
@Test
public void twoBatches() throws Exception {
- NodeStateEntryBatch batch1 =
NodeStateEntryBatch.createNodeStateEntryBatch(1024, 10);
+ NodeStateEntryBatch batch1 =
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
10);
addEntry(batch1, "/a0/b0", "{\"key\":2}");
addEntry(batch1, "/a0", "{\"key\":1}");
addEntry(batch1, "/a1/b0", "{\"key\":6}");
- NodeStateEntryBatch batch2 =
NodeStateEntryBatch.createNodeStateEntryBatch(1024, 10);
+ NodeStateEntryBatch batch2 =
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
10);
addEntry(batch2, "/a0/b1", "{\"key\":5}");
addEntry(batch2, "/a0/b0/c1", "{\"key\":4}");
addEntry(batch2, "/a0/b0/c0", "{\"key\":3}");