This is an automated email from the ASF dual-hosted git repository.
nfsantos pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8494c94520 OAK-10541 - Pipelined strategy: improve memory management
of transform stage (#1202)
8494c94520 is described below
commit 8494c94520e92374d658ca697f43daca26f61478
Author: Nuno Santos <[email protected]>
AuthorDate: Fri Nov 17 13:39:19 2023 +0100
OAK-10541 - Pipelined strategy: improve memory management of transform
stage (#1202)
* Remove intermediate buffer used to serialize entries in transform thread.
* Improve how memory is reserved for the sort buffer in the sort batch
thread. Make it configurable.
* When writing to compressed files, buffer between the compression streams
and the file output stream instead of buffering on top of the compression
stream. The compression streams already do their own buffering.
* Print more metrics at the end of the sort-batch task: time creating sort
array, time sorting, time writing.
---
.../oak/commons/sort/ExternalSortByteArray.java | 8 +-
.../jackrabbit/oak/commons/sort/package-info.java | 2 +-
.../flatfile/pipelined/NodeStateEntryBatch.java | 54 ++++++-------
.../flatfile/pipelined/PipelinedMergeSortTask.java | 4 +-
.../pipelined/PipelinedMongoDownloadTask.java | 13 +--
.../flatfile/pipelined/PipelinedSortBatchTask.java | 94 +++++++++++++++++-----
.../flatfile/pipelined/PipelinedStrategy.java | 87 +++++++++++---------
.../flatfile/pipelined/PipelinedTransformTask.java | 19 +++--
.../flatfile/pipelined/PipelinedUtils.java | 58 +++++++++++++
.../pipelined/TransformStageStatistics.java | 6 +-
.../document/indexstore/IndexStoreUtils.java | 14 ++--
.../pipelined/NodeStateEntryBatchTest.java | 49 +++++------
.../pipelined/PipelinedSortBatchTaskTest.java | 4 +
.../flatfile/pipelined/PipelinedUtilsTest.java | 66 +++++++++++++++
14 files changed, 336 insertions(+), 142 deletions(-)
diff --git
a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortByteArray.java
b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortByteArray.java
index 10c3d7735e..8dd1c590d2 100644
---
a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortByteArray.java
+++
b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortByteArray.java
@@ -19,11 +19,11 @@ package org.apache.jackrabbit.oak.commons.sort;
import org.apache.jackrabbit.guava.common.base.Preconditions;
import org.apache.jackrabbit.oak.commons.Compression;
-import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -39,14 +39,14 @@ import java.util.function.Function;
public class ExternalSortByteArray {
private final static int DEFAULT_BUFFER_SIZE = 16 * 1024;
- public static <T> void mergeSortedFilesBinary(List<Path> files,
BufferedOutputStream fbw, final Comparator<T> cmp,
+ public static <T> void mergeSortedFilesBinary(List<Path> files,
OutputStream fbw, final Comparator<T> cmp,
boolean distinct,
Compression algorithm,
Function<T, byte[]>
typeToByteArray, Function<byte[], T> byteArrayToType)
throws IOException {
mergeSortedFilesBinary(files, fbw, cmp, distinct, algorithm,
typeToByteArray, byteArrayToType, DEFAULT_BUFFER_SIZE);
}
- public static <T> void mergeSortedFilesBinary(List<Path> files,
BufferedOutputStream fbw, final Comparator<T> cmp,
+ public static <T> void mergeSortedFilesBinary(List<Path> files,
OutputStream fbw, final Comparator<T> cmp,
boolean distinct,
Compression algorithm,
Function<T, byte[]>
typeToByteArray, Function<byte[], T> byteArrayToType, int readBufferSize)
throws IOException {
@@ -70,7 +70,7 @@ public class ExternalSortByteArray {
}
}
- private static <T> int mergeBinary(BufferedOutputStream fbw, final
Comparator<T> cmp, boolean distinct,
+ private static <T> int mergeBinary(OutputStream fbw, final Comparator<T>
cmp, boolean distinct,
List<BinaryFileBuffer<T>> buffers,
Function<T, byte[]> typeToByteArray)
throws IOException {
PriorityQueue<BinaryFileBuffer<T>> pq = new PriorityQueue<>(
diff --git
a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java
b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java
index 10f91cc554..7c3993f7de 100644
---
a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java
+++
b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-@Version("1.3.0")
+@Version("2.0.0")
package org.apache.jackrabbit.oak.commons.sort;
import org.osgi.annotation.versioning.Version;
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java
index 7acf8feeb6..bfddbd45e7 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java
@@ -21,7 +21,6 @@ package
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
public class NodeStateEntryBatch {
@@ -35,7 +34,6 @@ public class NodeStateEntryBatch {
}
}
- public static final byte DELIMITER = '|';
// Must be large enough to hold a full node state entry
static final int MIN_BUFFER_SIZE = 256 * 1024;
@@ -51,49 +49,46 @@ public class NodeStateEntryBatch {
}
private final ByteBuffer buffer;
- private final ArrayList<SortKey> sortBuffer;
private final int maxEntries;
+ private int numberOfEntries = 0;
+ private int sizeOfEntriesBytes = 0;
public NodeStateEntryBatch(ByteBuffer buffer, int maxEntries) {
this.buffer = buffer;
this.maxEntries = maxEntries;
- this.sortBuffer = new ArrayList<>(maxEntries);
- }
-
- public ArrayList<SortKey> getSortBuffer() {
- return sortBuffer;
- }
-
- public ByteBuffer getBuffer() {
- return buffer;
}
public int addEntry(String path, byte[] entryData) throws
BufferFullException {
- if (numberOfEntries() == maxEntries) {
- throw new BufferFullException("Sort buffer size is full, reached
max entries: " + sortBuffer.size());
+ if (numberOfEntries == maxEntries) {
+ throw new BufferFullException("Sort buffer size is full, reached
max entries: " + numberOfEntries);
}
int bufferPos = buffer.position();
byte[] pathBytes = path.getBytes(StandardCharsets.UTF_8);
- int entrySize = pathBytes.length + 1 + entryData.length;
+ int totalSize = 4 + pathBytes.length + 4 + entryData.length;
try {
- buffer.putInt(entrySize);
+ // Each nse is written as: "len(path)|path|len(json)|json"
+ buffer.putInt(pathBytes.length);
buffer.put(pathBytes);
- buffer.put(DELIMITER);
+ buffer.putInt(entryData.length);
buffer.put(entryData);
- String[] key = SortKey.genSortKeyPathElements(path);
- sortBuffer.add(new SortKey(key, bufferPos));
- return entrySize;
+ numberOfEntries++;
+ sizeOfEntriesBytes += totalSize;
+ return totalSize;
} catch (BufferOverflowException e) {
buffer.position(bufferPos);
- throw new BufferFullException("Buffer full while adding entry: " +
path + " size: " + entrySize, e);
+ throw new BufferFullException("while adding entry " + path + " of
size: " + totalSize, e);
}
}
+ public ByteBuffer getBuffer() {
+ return buffer;
+ }
+
public boolean isAtMaxEntries() {
- if (sortBuffer.size() > maxEntries) {
- throw new AssertionError("Sort buffer size exceeded max entries: "
+ sortBuffer.size() + " > " + maxEntries);
+ if (numberOfEntries > maxEntries) {
+ throw new AssertionError("Sort buffer size exceeded max entries: "
+ numberOfEntries + " > " + maxEntries);
}
- return sortBuffer.size() == maxEntries;
+ return numberOfEntries == maxEntries;
}
public void flip() {
@@ -102,18 +97,19 @@ public class NodeStateEntryBatch {
public void reset() {
buffer.clear();
- sortBuffer.clear();
+ numberOfEntries = 0;
+ sizeOfEntriesBytes = 0;
}
- public int sizeOfEntries() {
- return buffer.position();
+ public int sizeOfEntriesBytes() {
+ return sizeOfEntriesBytes;
}
public int numberOfEntries() {
- return sortBuffer.size();
+ return numberOfEntries;
}
public int capacity() {
return buffer.capacity();
}
-}
\ No newline at end of file
+}
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
index 7090fcdba6..bf4083e796 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
@@ -33,8 +33,8 @@ import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -328,7 +328,7 @@ public class PipelinedMergeSortTask implements
Callable<PipelinedMergeSortTask.R
private Path sortStoreFile(List<Path> sortedFilesBatch) throws IOException
{
Path sortedFile = storeDir.resolve(getSortedStoreFileName(algorithm));
- try (BufferedOutputStream writer =
IndexStoreUtils.createOutputStream(sortedFile, algorithm)) {
+ try (OutputStream writer =
IndexStoreUtils.createOutputStream(sortedFile, algorithm)) {
Function<byte[], NodeStateHolder> byteArrayToType = new
NodeStateHolderFactory();
Function<NodeStateHolder, byte[]> typeToByteArray = holder ->
holder == null ? null : holder.getLine();
ExternalSortByteArray.mergeSortedFilesBinary(sortedFilesBatch,
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
index a9a8e863a2..6c16faf703 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
@@ -196,18 +196,19 @@ public class PipelinedMongoDownloadTask implements
Callable<PipelinedMongoDownlo
} else {
downloadWithNaturalOrdering();
}
- double enqueueingDelayPercentage = (100.0 *
totalEnqueueWaitTimeMillis) / downloadStartWatch.elapsed(TimeUnit.MILLISECONDS);
+ long durationMillis =
downloadStartWatch.elapsed(TimeUnit.MILLISECONDS);
+ String enqueueingDelayPercentage =
PipelinedUtils.formatAsPercentage(totalEnqueueWaitTimeMillis, durationMillis);
String metrics = MetricsFormatter.newBuilder()
.add("duration",
FormattingUtils.formatToSeconds(downloadStartWatch))
- .add("durationSeconds",
downloadStartWatch.elapsed(TimeUnit.SECONDS))
+ .add("durationSeconds", durationMillis/1000)
.add("documentsDownloaded", documentsRead)
- .add("enqueueingDelayMs", totalEnqueueWaitTimeMillis)
- .add("enqueueingDelayPercentage", String.format("%1.2f",
enqueueingDelayPercentage))
+ .add("enqueueingDelayMillis", totalEnqueueWaitTimeMillis)
+ .add("enqueueingDelayPercentage",
enqueueingDelayPercentage)
.build();
MetricsUtils.setCounterOnce(statisticsProvider,
PipelinedMetrics.OAK_INDEXER_PIPELINED_MONGO_DOWNLOAD_ENQUEUE_DELAY_PERCENTAGE,
- Math.round(enqueueingDelayPercentage)
+ PipelinedUtils.toPercentage(totalEnqueueWaitTimeMillis,
durationMillis)
);
LOG.info("[TASK:{}:END] Metrics: {}",
THREAD_NAME.toUpperCase(Locale.ROOT), metrics);
return new Result(documentsRead);
@@ -225,7 +226,7 @@ public class PipelinedMongoDownloadTask implements
Callable<PipelinedMongoDownlo
private void reportProgress(String id) {
if (this.documentsRead % 10000 == 0) {
double rate = ((double) this.documentsRead) /
downloadStartWatch.elapsed(TimeUnit.SECONDS);
- String formattedRate = String.format("%1.2f nodes/s, %1.2f
nodes/hr", rate, rate * 3600);
+ String formattedRate = String.format(Locale.ROOT, "%1.2f nodes/s,
%1.2f nodes/hr", rate, rate * 3600);
LOG.info("Dumping from NSET Traversed #{} {} [{}] (Elapsed {})",
this.documentsRead, id, formattedRate,
FormattingUtils.formatToSeconds(downloadStartWatch));
}
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
index ab5fca9bee..c53c72a955 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
@@ -21,13 +21,15 @@ package
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.oak.commons.Compression;
+import
org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreUtils;
import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -38,7 +40,7 @@ import java.util.concurrent.Callable;
import static
org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCountBin;
import static
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy.SENTINEL_NSE_BUFFER;
-import static
org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreUtils.createOutputStream;
+import static
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedUtils.formatAsPercentage;
/**
 * Receives batches of node state entries, sorts them in memory, and finally
writes them to a file.
@@ -66,9 +68,12 @@ class PipelinedSortBatchTask implements
Callable<PipelinedSortBatchTask.Result>
private final BlockingQueue<NodeStateEntryBatch> nonEmptyBuffersQueue;
private final BlockingQueue<Path> sortedFilesQueue;
private final Path sortWorkDir;
- private final byte[] copyBuffer = new byte[4096];
+ private final ArrayList<SortKey> sortBuffer = new ArrayList<>(32 * 1024);
private long entriesProcessed = 0;
private long batchesProcessed = 0;
+ private long timeCreatingSortArrayMillis = 0;
+ private long timeSortingMillis = 0;
+ private long timeWritingMillis = 0;
public PipelinedSortBatchTask(Path storeDir,
PathElementComparator pathComparator,
@@ -86,6 +91,7 @@ class PipelinedSortBatchTask implements
Callable<PipelinedSortBatchTask.Result>
@Override
public Result call() throws Exception {
+ Stopwatch taskStartTime = Stopwatch.createStarted();
String originalName = Thread.currentThread().getName();
Thread.currentThread().setName(THREAD_NAME);
try {
@@ -94,9 +100,22 @@ class PipelinedSortBatchTask implements
Callable<PipelinedSortBatchTask.Result>
LOG.info("Waiting for next batch");
NodeStateEntryBatch nseBuffer = nonEmptyBuffersQueue.take();
if (nseBuffer == SENTINEL_NSE_BUFFER) {
+ long totalTimeMillis = taskStartTime.elapsed().toMillis();
+ sortBuffer.clear(); // It should be empty already
+ sortBuffer.trimToSize(); // Release the internal array
which may be very large, several millions
+ String timeCreatingSortArrayPercentage =
formatAsPercentage(timeCreatingSortArrayMillis, totalTimeMillis);
+ String timeSortingPercentage =
formatAsPercentage(timeSortingMillis, totalTimeMillis);
+ String timeWritingPercentage =
formatAsPercentage(timeWritingMillis, totalTimeMillis);
String metrics = MetricsFormatter.newBuilder()
.add("batchesProcessed", batchesProcessed)
.add("entriesProcessed", entriesProcessed)
+ .add("timeCreatingSortArrayMillis",
timeCreatingSortArrayMillis)
+ .add("timeCreatingSortArrayPercentage",
timeCreatingSortArrayPercentage)
+ .add("timeSortingMillis", timeSortingMillis)
+ .add("timeSortingPercentage",
timeSortingPercentage)
+ .add("timeWritingMillis", timeWritingMillis)
+ .add("timeWritingPercentage",
timeWritingPercentage)
+ .add("totalTimeSeconds", totalTimeMillis / 1000)
.build();
LOG.info("[TASK:{}:END] Metrics: {}",
THREAD_NAME.toUpperCase(Locale.ROOT), metrics);
return new Result(entriesProcessed);
@@ -116,48 +135,81 @@ class PipelinedSortBatchTask implements
Callable<PipelinedSortBatchTask.Result>
}
}
+ private void buildSortArray(NodeStateEntryBatch nseb) {
+ Stopwatch startTime = Stopwatch.createStarted();
+ ByteBuffer buffer = nseb.getBuffer();
+ int totalPathSize = 0;
+ while (buffer.hasRemaining()) {
+ int positionInBuffer = buffer.position();
+ // Read the next key from the buffer
+ int pathLength = buffer.getInt();
+ totalPathSize += pathLength;
+ // Create the String directly from the buffer without creating an
intermediate byte[]
+ String path = new String(buffer.array(), buffer.position(),
pathLength, StandardCharsets.UTF_8);
+ buffer.position(buffer.position() + pathLength);
+ // Skip the json
+ int entryLength = buffer.getInt();
+ buffer.position(buffer.position() + entryLength);
+ String[] pathSegments = SortKey.genSortKeyPathElements(path);
+ sortBuffer.add(new SortKey(pathSegments, positionInBuffer));
+ }
+ timeCreatingSortArrayMillis += startTime.elapsed().toMillis();
+ LOG.info("Built sort array in {}. Entries: {}, Total size of path
strings: {}",
+ startTime, sortBuffer.size(),
humanReadableByteCountBin(totalPathSize));
+ }
+
private void sortAndSaveBatch(NodeStateEntryBatch nseb) throws Exception {
- ArrayList<SortKey> sortBuffer = nseb.getSortBuffer();
ByteBuffer buffer = nseb.getBuffer();
LOG.info("Going to sort batch in memory. Entries: {}, Size: {}",
- sortBuffer.size(),
humanReadableByteCountBin(buffer.remaining()));
+ nseb.numberOfEntries(),
humanReadableByteCountBin(nseb.sizeOfEntriesBytes()));
+ sortBuffer.clear();
+ buildSortArray(nseb);
if (sortBuffer.isEmpty()) {
return;
}
Stopwatch sortClock = Stopwatch.createStarted();
sortBuffer.sort(pathComparator);
+ timeSortingMillis += sortClock.elapsed().toMillis();
LOG.info("Sorted batch in {}. Saving to disk", sortClock);
Stopwatch saveClock = Stopwatch.createStarted();
Path newtmpfile = Files.createTempFile(sortWorkDir, "sortInBatch",
"flatfile");
long textSize = 0;
batchesProcessed++;
- try (BufferedOutputStream writer = createOutputStream(newtmpfile,
algorithm)) {
+ try (OutputStream os = IndexStoreUtils.createOutputStream(newtmpfile,
algorithm)) {
for (SortKey entry : sortBuffer) {
entriesProcessed++;
// Retrieve the entry from the buffer
int posInBuffer = entry.getBufferPos();
buffer.position(posInBuffer);
- int entrySize = buffer.getInt();
-
- // Write the entry to the file without creating intermediate
byte[]
- int bytesRemaining = entrySize;
- while (bytesRemaining > 0) {
- int bytesRead = Math.min(copyBuffer.length,
bytesRemaining);
- buffer.get(copyBuffer, 0, bytesRead);
- writer.write(copyBuffer, 0, bytesRead);
- bytesRemaining -= bytesRead;
- }
- writer.write(PipelinedStrategy.FLATFILESTORE_LINE_SEPARATOR);
- textSize += entrySize + 1;
+
+ int pathSize = buffer.getInt();
+ copyField(os, buffer, pathSize);
+ os.write(PipelinedStrategy.FLATFILESTORE_DELIMITER);
+ int jsonSize = buffer.getInt();
+ copyField(os, buffer, jsonSize);
+ os.write(PipelinedStrategy.FLATFILESTORE_LINE_SEPARATOR);
+ textSize += pathSize + jsonSize + 2;
}
}
- LOG.info("Stored batch of size {} (uncompressed {}) with {} entries in
{}",
- humanReadableByteCountBin(Files.size(newtmpfile)),
+ timeWritingMillis += saveClock.elapsed().toMillis();
+ long compressedSize = Files.size(newtmpfile);
+ LOG.info("Wrote batch of size {} (uncompressed {}) with {} entries in
{} at {}",
+ humanReadableByteCountBin(compressedSize),
humanReadableByteCountBin(textSize),
- sortBuffer.size(), saveClock);
+ sortBuffer.size(), saveClock,
+ PipelinedUtils.formatAsTransferSpeedMBs(compressedSize,
saveClock.elapsed().toMillis())
+ );
+ // Free the memory taken by the entries in the buffer
+ sortBuffer.clear();
sortedFilesQueue.put(newtmpfile);
}
+ private void copyField(OutputStream writer, ByteBuffer buffer, int
fieldSize) throws IOException {
+ // Write the entry to the file without creating intermediate byte[]
+ writer.write(buffer.array(), buffer.position(), fieldSize);
+ buffer.position(buffer.position() + fieldSize);
+ }
+
private static Path createdSortWorkDir(Path storeDir) throws IOException {
Path sortedFileDir = storeDir.resolve("sort-work-dir");
FileUtils.forceMkdir(sortedFileDir.toFile());
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
index 57369d701f..14bf4e8bdb 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
@@ -131,13 +131,16 @@ public class PipelinedStrategy extends
IndexStoreSortStrategyBase {
public static final String OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB =
"oak.indexer.pipelined.workingMemoryMB";
// 0 means autodetect
public static final int DEFAULT_OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB =
0;
+ // Between 1 and 100
+ public static final String
OAK_INDEXER_PIPELINED_SORT_BUFFER_MEMORY_PERCENTAGE =
"oak.indexer.pipelined.sortBufferMemoryPercentage";
+ public static final int
DEFAULT_OAK_INDEXER_PIPELINED_SORT_BUFFER_MEMORY_PERCENTAGE = 25;
static final NodeDocument[] SENTINEL_MONGO_DOCUMENT = new NodeDocument[0];
static final NodeStateEntryBatch SENTINEL_NSE_BUFFER = new
NodeStateEntryBatch(ByteBuffer.allocate(0), 0);
static final Path SENTINEL_SORTED_FILES_QUEUE = Paths.get("SENTINEL");
static final Charset FLATFILESTORE_CHARSET = StandardCharsets.UTF_8;
static final char FLATFILESTORE_LINE_SEPARATOR = '\n';
-
+ static final byte FLATFILESTORE_DELIMITER = '|';
private static final Logger LOG =
LoggerFactory.getLogger(PipelinedStrategy.class);
// A MongoDB document is at most 16MB, so the buffer that holds node state
entries must be at least that big
private static final int MIN_MONGO_DOC_QUEUE_RESERVED_MEMORY_MB = 16;
@@ -219,12 +222,10 @@ public class PipelinedStrategy extends
IndexStoreSortStrategyBase {
private final int mongoDocBatchMaxSizeMB;
private final int mongoDocBatchMaxNumberOfDocuments;
private final int nseBuffersCount;
- private final int nseBufferMaxEntriesPerBuffer;
private final int nseBuffersSizeBytes;
private long nodeStateEntriesExtracted;
-
/**
* @param pathPredicate Used by the transform stage to test if a node
should be kept or discarded.
* @param pathFilters If non-empty, the download stage will use
these filters to try to create a query that downloads
@@ -270,10 +271,9 @@ public class PipelinedStrategy extends
IndexStoreSortStrategyBase {
Preconditions.checkArgument(numberOfTransformThreads > 0,
"Invalid value for property " +
OAK_INDEXER_PIPELINED_TRANSFORM_THREADS + ": " + numberOfTransformThreads + ".
Must be > 0");
- // Derived values for transform <-> sort-save
- int nseBuffersReservedMemoryMB = readNSEBuffersReservedMemory();
-
- // Calculate values derived from the configuration settings
+ int sortBufferMemoryPercentage =
ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_SORT_BUFFER_MEMORY_PERCENTAGE,
DEFAULT_OAK_INDEXER_PIPELINED_SORT_BUFFER_MEMORY_PERCENTAGE);
+ Preconditions.checkArgument(sortBufferMemoryPercentage > 0 &&
sortBufferMemoryPercentage <= 100,
+ "Invalid value for property " +
OAK_INDEXER_PIPELINED_SORT_BUFFER_MEMORY_PERCENTAGE + ": " +
numberOfTransformThreads + ". Must be between 1 and 100");
// mongo-dump <-> transform threads
Preconditions.checkArgument(mongoDocQueueReservedMemoryMB >= 8 *
mongoDocBatchMaxSizeMB,
@@ -283,21 +283,19 @@ public class PipelinedStrategy extends
IndexStoreSortStrategyBase {
);
this.mongoDocQueueSize = mongoDocQueueReservedMemoryMB /
mongoDocBatchMaxSizeMB;
- // Transform threads <-> merge-sort
+ // Derived values for transform <-> sort-save
+ int nseWorkingMemoryMB = readNSEBuffersReservedMemory();
this.nseBuffersCount = 1 + numberOfTransformThreads;
-
- long nseBuffersReservedMemoryBytes = nseBuffersReservedMemoryMB *
FileUtils.ONE_MB;
+ long nseWorkingMemoryBytes = (long) nseWorkingMemoryMB *
FileUtils.ONE_MB;
// The working memory is divided in the following regions:
// - #transforThreads NSE Binary buffers
- // - 1x Metadata of NSE entries in Binary buffers,
list of SortKeys
- // A ByteBuffer can be at most Integer.MAX_VALUE bytes long
- this.nseBuffersSizeBytes =
limitToIntegerRange(nseBuffersReservedMemoryBytes / (nseBuffersCount + 1));
+ // - x1 Memory reserved for the array created by the
sort-batch thread with the keys of the entries
+ // in the batch that is being sorted
+ long memoryReservedForSortKeysArray =
estimateMaxSizeOfSortKeyArray(nseWorkingMemoryBytes, nseBuffersCount,
sortBufferMemoryPercentage);
+ long memoryReservedForBuffers = nseWorkingMemoryBytes -
memoryReservedForSortKeysArray;
- // Assuming 1 instance of SortKey takes around 256 bytes. We have
#transformThreads + 1 regions of nseBufferSizeBytes.
- // The extra region is for the SortKey instances. Below we compute the
total number of SortKey instances that
- // fit in the memory region reserved for them, assuming that each
SortKey instance takes 256 bytes. Then we
- // distribute equally these available entries among the nse buffers
- this.nseBufferMaxEntriesPerBuffer = (this.nseBuffersSizeBytes / 256) /
this.nseBuffersCount;
+ // A ByteBuffer can be at most Integer.MAX_VALUE bytes long
+ this.nseBuffersSizeBytes =
limitToIntegerRange(memoryReservedForBuffers / nseBuffersCount);
if (nseBuffersSizeBytes < MIN_ENTRY_BATCH_BUFFER_SIZE_MB *
FileUtils.ONE_MB) {
throw new IllegalArgumentException("Entry batch buffer size too
small: " + nseBuffersSizeBytes +
@@ -311,11 +309,21 @@ public class PipelinedStrategy extends
IndexStoreSortStrategyBase {
mongoDocQueueReservedMemoryMB,
mongoDocBatchMaxSizeMB,
mongoDocQueueSize);
- LOG.info("NodeStateEntryBuffers: [ workingMemory: {} MB,
numberOfBuffers: {}, bufferSize: {}, maxEntriesPerBuffer: {} ]",
- nseBuffersReservedMemoryMB,
+ LOG.info("NodeStateEntryBuffers: [ workingMemory: {} MB,
numberOfBuffers: {}, bufferSize: {}, sortBufferReservedMemory: {} ]",
+ nseWorkingMemoryMB,
nseBuffersCount,
IOUtils.humanReadableByteCountBin(nseBuffersSizeBytes),
- nseBufferMaxEntriesPerBuffer);
+
IOUtils.humanReadableByteCountBin(memoryReservedForSortKeysArray)
+ );
+ }
+
+ static long estimateMaxSizeOfSortKeyArray(long nseWorkingMemoryBytes, long
nseBuffersCount, int sortBufferMemoryPercentage) {
+ // We reserve a percentage of the size of a buffer for the sort keys
array. That is, we are assuming that for every line
+ // in the sort buffer, the memory needed to store the SortKey of the
path section of the line will not be more
+ // than sortBufferMemoryPercentage of the total size of the line in
average
+ // Estimate memory needed by the sort keys array. We assume each entry
requires 256 bytes.
+ long approxNseBufferSize = limitToIntegerRange(nseWorkingMemoryBytes /
nseBuffersCount);
+ return approxNseBufferSize * sortBufferMemoryPercentage / 100;
}
private int readNSEBuffersReservedMemory() {
@@ -390,12 +398,13 @@ public class PipelinedStrategy extends
IndexStoreSortStrategyBase {
// Create empty buffers
for (int i = 0; i < nseBuffersCount; i++) {
-
emptyBatchesQueue.add(NodeStateEntryBatch.createNodeStateEntryBatch(nseBuffersSizeBytes,
nseBufferMaxEntriesPerBuffer));
+ // No limits on the number of entries, only on their total
size. This might be revised later.
+
emptyBatchesQueue.add(NodeStateEntryBatch.createNodeStateEntryBatch(nseBuffersSizeBytes,
Integer.MAX_VALUE));
}
LOG.info("[TASK:PIPELINED-DUMP:START] Starting to build FFS");
Stopwatch start = Stopwatch.createStarted();
- PipelinedMongoDownloadTask downloadTask = new
PipelinedMongoDownloadTask(
+ ecs.submit(new PipelinedMongoDownloadTask(
mongoDatabase,
docStore,
(int) (mongoDocBatchMaxSizeMB * FileUtils.ONE_MB),
@@ -403,14 +412,11 @@ public class PipelinedStrategy extends
IndexStoreSortStrategyBase {
mongoDocQueue,
pathFilters,
statisticsProvider
- );
- ecs.submit(downloadTask);
-
- Path flatFileStore = null;
+ ));
for (int i = 0; i < numberOfTransformThreads; i++) {
NodeStateEntryWriter entryWriter = new
NodeStateEntryWriter(blobStore);
- PipelinedTransformTask transformTask = new
PipelinedTransformTask(
+ ecs.submit(new PipelinedTransformTask(
docStore,
documentNodeStore,
rootRevision,
@@ -420,30 +426,27 @@ public class PipelinedStrategy extends
IndexStoreSortStrategyBase {
emptyBatchesQueue,
nonEmptyBatchesQueue,
transformStageStatistics
- );
- ecs.submit(transformTask);
+ ));
}
- PipelinedSortBatchTask sortTask = new PipelinedSortBatchTask(
+ ecs.submit(new PipelinedSortBatchTask(
this.getStoreDir().toPath(),
pathComparator,
this.getAlgorithm(),
emptyBatchesQueue,
nonEmptyBatchesQueue,
sortedFilesQueue
- );
- ecs.submit(sortTask);
+ ));
PipelinedMergeSortTask mergeSortTask = new PipelinedMergeSortTask(
this.getStoreDir().toPath(),
pathComparator,
this.getAlgorithm(),
- sortedFilesQueue,
- statisticsProvider
- );
- ecs.submit(mergeSortTask);
+ sortedFilesQueue, statisticsProvider);
+ ecs.submit(mergeSortTask);
+ Path flatFileStore = null;
try {
LOG.info("Waiting for tasks to complete");
int tasksFinished = 0;
@@ -480,8 +483,16 @@ public class PipelinedStrategy extends
IndexStoreSortStrategyBase {
} else if (result instanceof
PipelinedSortBatchTask.Result) {
PipelinedSortBatchTask.Result sortTaskResult =
(PipelinedSortBatchTask.Result) result;
LOG.info("Sort batch task finished. Entries
processed: {}", sortTaskResult.getTotalEntries());
- printStatistics(mongoDocQueue, emptyBatchesQueue,
nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, true);
sortedFilesQueue.put(SENTINEL_SORTED_FILES_QUEUE);
+ // The buffers between transform and merge sort
tasks are no longer needed, so remove them
+ // from the queues so they can be garbage
collected.
+ // These buffers can be very large, so this is
important to avoid running out of memory in
+ // the merge-sort phase
+ if (!nonEmptyBatchesQueue.isEmpty()) {
+ LOG.warn("emptyBatchesQueue is not empty.
Size: {}", emptyBatchesQueue.size());
+ }
+ emptyBatchesQueue.clear();
+ printStatistics(mongoDocQueue, emptyBatchesQueue,
nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, true);
} else if (result instanceof
PipelinedMergeSortTask.Result) {
PipelinedMergeSortTask.Result mergeSortedFilesTask
= (PipelinedMergeSortTask.Result) result;
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
index f3f269984c..9d9f94f849 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
@@ -20,6 +20,7 @@ package
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.guava.common.base.Stopwatch;
+import org.apache.jackrabbit.oak.commons.IOUtils;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
import
org.apache.jackrabbit.oak.index.indexer.document.flatfile.NodeStateEntryWriter;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeState;
@@ -88,6 +89,7 @@ class PipelinedTransformTask implements
Callable<PipelinedTransformTask.Result>
private final TransformStageStatistics statistics;
private final int threadId = threadIdGenerator.getAndIncrement();
private long totalEnqueueDelayMillis = 0;
+ private long totalEmptyBatchQueueWaitTimeMillis = 0;
public PipelinedTransformTask(MongoDocumentStore mongoStore,
DocumentNodeStore documentNodeStore,
@@ -132,16 +134,20 @@ class PipelinedTransformTask implements
Callable<PipelinedTransformTask.Result>
NodeDocument[] nodeDocumentBatch = mongoDocQueue.take();
totalDocumentQueueWaitTimeMillis +=
docQueueWaitStopwatch.elapsed(TimeUnit.MILLISECONDS);
if (nodeDocumentBatch == SENTINEL_MONGO_DOCUMENT) {
- String totalDocumentQueueWaitPercentage =
String.format("%1.2f", (100.0 * totalDocumentQueueWaitTimeMillis) /
taskStartWatch.elapsed(TimeUnit.MILLISECONDS));
- String totalEnqueueDelayPercentage =
String.format("%1.2f", (100.0 * totalEnqueueDelayMillis) /
taskStartWatch.elapsed(TimeUnit.MILLISECONDS));
+ long totalDurationMillis =
taskStartWatch.elapsed(TimeUnit.MILLISECONDS);
+ String totalDocumentQueueWaitPercentage =
PipelinedUtils.formatAsPercentage(totalDocumentQueueWaitTimeMillis,
totalDurationMillis);
+ String totalEnqueueDelayPercentage =
PipelinedUtils.formatAsPercentage(totalEnqueueDelayMillis, totalDurationMillis);
+ String totalEmptyBatchQueueWaitPercentage =
PipelinedUtils.formatAsPercentage(totalEmptyBatchQueueWaitTimeMillis,
totalDurationMillis);
String metrics = MetricsFormatter.newBuilder()
.add("duration",
FormattingUtils.formatToSeconds(taskStartWatch))
- .add("durationSeconds",
taskStartWatch.elapsed(TimeUnit.SECONDS))
+ .add("durationSeconds", totalDurationMillis / 1000)
.add("nodeStateEntriesGenerated", totalEntryCount)
.add("enqueueDelayMillis", totalEnqueueDelayMillis)
.add("enqueueDelayPercentage",
totalEnqueueDelayPercentage)
.add("documentQueueWaitMillis",
totalDocumentQueueWaitTimeMillis)
.add("documentQueueWaitPercentage",
totalDocumentQueueWaitPercentage)
+ .add("totalEmptyBatchQueueWaitTimeMillis",
totalEmptyBatchQueueWaitTimeMillis)
+ .add("totalEmptyBatchQueueWaitPercentage",
totalEmptyBatchQueueWaitPercentage)
.build();
LOG.info("[TASK:{}:END] Metrics: {}",
threadName.toUpperCase(Locale.ROOT), metrics);
//Save the last batch
@@ -155,7 +161,7 @@ class PipelinedTransformTask implements
Callable<PipelinedTransformTask.Result>
if (mongoObjectsProcessed % 50000 == 0) {
LOG.info("Mongo objects: {}, total entries: {},
current batch: {}, Size: {}/{} MB",
mongoObjectsProcessed, totalEntryCount,
nseBatch.numberOfEntries(),
- nseBatch.sizeOfEntries() /
FileUtils.ONE_MB,
+ nseBatch.sizeOfEntriesBytes() /
FileUtils.ONE_MB,
nseBatch.capacity() / FileUtils.ONE_MB
);
}
@@ -182,11 +188,14 @@ class PipelinedTransformTask implements
Callable<PipelinedTransformTask.Result>
entrySize =
nseBatch.addEntry(path, jsonBytes);
} catch
(NodeStateEntryBatch.BufferFullException e) {
LOG.info("Buffer full, passing
buffer to sort task. Total entries: {}, entries in buffer {}, buffer size: {}",
- totalEntryCount,
nseBatch.numberOfEntries(), nseBatch.sizeOfEntries());
+ totalEntryCount,
nseBatch.numberOfEntries(),
IOUtils.humanReadableByteCountBin(nseBatch.sizeOfEntriesBytes()));
nseBatch.flip();
tryEnqueue(nseBatch);
// Get an empty buffer
+ Stopwatch
emptyBatchesQueueStopwatch = Stopwatch.createStarted();
nseBatch =
emptyBatchesQueue.take();
+ totalEmptyBatchQueueWaitTimeMillis
+= emptyBatchesQueueStopwatch.elapsed(TimeUnit.MILLISECONDS);
+
// Now it must fit, otherwise it
means that the buffer is smaller than a single
// entry, which is an error.
entrySize =
nseBatch.addEntry(path, jsonBytes);
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedUtils.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedUtils.java
new file mode 100644
index 0000000000..85fad87ae0
--- /dev/null
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedUtils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
+
+import org.apache.commons.io.FileUtils;
+
+import java.util.Locale;
+
+public class PipelinedUtils {
+ /**
+ * <p>Format a percentage as a string with 2 decimal places. For instance:
+ * <code>formatAsPercentage(52, 1000)</code> returns
<code>"5.20"</code>.</p>
+ */
+ public static String formatAsPercentage(long numerator, long denominator) {
+ if (denominator == 0) {
+ return "N/A";
+ } else {
+ return String.format(Locale.ROOT, "%1.2f", (100.0 * numerator) /
denominator);
+ }
+ }
+
+ /**
+ * <p>Convert to a percentage as an integer from 0 to 100, with -1
representing an undefined value.</p>
+ */
+ public static int toPercentage(long numerator, long denominator) {
+ if (denominator == 0) {
+ return -1;
+ } else {
+ return (int) Math.round((100.0 * numerator) / denominator);
+ }
+ }
+
+ public static String formatAsTransferSpeedMBs(long numberOfBytes, long
timeMillis) {
+ if (timeMillis == 0) {
+ return "N/A";
+ } else {
+ double speed = 1000 * (((double) numberOfBytes) / timeMillis) /
FileUtils.ONE_MB;
+ return String.format(Locale.ROOT, "%1.2f MB/s", speed);
+ }
+ }
+
+}
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/TransformStageStatistics.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/TransformStageStatistics.java
index 0b586c77e4..61cf67e02a 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/TransformStageStatistics.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/TransformStageStatistics.java
@@ -169,10 +169,8 @@ public class TransformStageStatistics {
long documentsRejectedTotal = documentsRejectedSplitSum +
documentsRejectedEmptyNodeStateSum;
long documentsAcceptedTotal = mongoDocumentsTraversedSum -
documentsRejectedTotal;
long totalEntries = entriesAcceptedSum + entriesRejectedSum;
- String documentsAcceptedPercentage = mongoDocumentsTraversedSum == 0 ?
"N/A" :
- String.format("%2.1f%%", (100.0 * documentsAcceptedTotal) /
mongoDocumentsTraversedSum);
- String entriesAcceptedPercentage = totalEntries == 0 ? "N/A" :
- String.format("%1.1f%%", (100.0 * entriesAcceptedSum) /
totalEntries);
+ String documentsAcceptedPercentage =
PipelinedUtils.formatAsPercentage(documentsAcceptedTotal,
mongoDocumentsTraversedSum);
+ String entriesAcceptedPercentage =
PipelinedUtils.formatAsPercentage(entriesAcceptedSum, totalEntries);
long avgEntrySize = entriesAcceptedSum == 0 ? -1 :
extractedEntriesTotalSizeSum / entriesAcceptedSum;
return MetricsFormatter.newBuilder()
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/indexstore/IndexStoreUtils.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/indexstore/IndexStoreUtils.java
index b901621887..cdb3c1f78d 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/indexstore/IndexStoreUtils.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/indexstore/IndexStoreUtils.java
@@ -88,14 +88,12 @@ public class IndexStoreUtils {
return new BufferedWriter(new
OutputStreamWriter(algorithm.getOutputStream(out)));
}
- public static BufferedOutputStream createOutputStream(File file,
Compression algorithm) throws IOException {
- OutputStream out = new FileOutputStream(file);
- return new BufferedOutputStream(algorithm.getOutputStream(out));
- }
-
- public static BufferedOutputStream createOutputStream(Path file,
Compression algorithm) throws IOException {
- OutputStream out = Files.newOutputStream(file);
- return new BufferedOutputStream(algorithm.getOutputStream(out));
+ public static OutputStream createOutputStream(Path file, Compression
algorithm) throws IOException {
+        // The output streams created by LZ4 and GZIP buffer their input, so
we should not wrap them again.
+ // However, the implementation of the compression streams may make
small writes to the underlying stream,
+ // so we buffer the FileOutputStream
+ OutputStream out = new
BufferedOutputStream(Files.newOutputStream(file));
+ return algorithm.getOutputStream(out);
}
public static long sizeOf(List<File> sortedFiles) {
diff --git
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java
index 8051f2c3f3..4d7857c8b3 100644
---
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java
+++
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java
@@ -20,8 +20,6 @@ package
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
import org.junit.Test;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@@ -48,43 +46,46 @@ public class NodeStateEntryBatchTest {
@Test
public void testMaximumBufferSize() {
NodeStateEntryBatch batch =
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
10);
- batch.addEntry("a", new byte[NodeStateEntryBatch.MIN_BUFFER_SIZE - 4 -
2]); // Needs 4 bytes for the length and 2 for the key
+ int keySize = "a".getBytes(StandardCharsets.UTF_8).length;
+ batch.addEntry("a", new byte[NodeStateEntryBatch.MIN_BUFFER_SIZE - 4 -
4 - keySize]); // Needs 4 bytes for the length of the path and of the entry data
assertThrows(NodeStateEntryBatch.BufferFullException.class, () ->
batch.addEntry("b", new byte[NodeStateEntryBatch.MIN_BUFFER_SIZE]));
- assertEquals(NodeStateEntryBatch.MIN_BUFFER_SIZE,
batch.sizeOfEntries());
+ assertEquals(NodeStateEntryBatch.MIN_BUFFER_SIZE,
batch.sizeOfEntriesBytes());
assertEquals(1, batch.numberOfEntries());
assertThrows(NodeStateEntryBatch.BufferFullException.class, () ->
batch.addEntry("b", new byte[1]));
}
@Test
- public void flipAndResetBuffer() throws IOException {
- int sizeOfEntry = NodeStateEntryBatch.MIN_BUFFER_SIZE-4;
- String key = "a";
- int jsonNodeLength = sizeOfEntry-2; // minus key and pipe characters
-
+ public void flipAndResetBuffer() {
NodeStateEntryBatch batch =
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
10);
- byte[] jsonNodeBytes = new byte[jsonNodeLength];
- for (int i = 0; i < jsonNodeLength; i++) {
+ int expectedBytesWrittenToBuffer = NodeStateEntryBatch.MIN_BUFFER_SIZE;
+
+ String key = "a";
+ int keyLength = "a".getBytes(StandardCharsets.UTF_8).length;
+ byte[] jsonNodeBytes = new byte[batch.capacity()-4-4-keyLength];
+ for (int i = 0; i < jsonNodeBytes.length; i++) {
jsonNodeBytes[i] = (byte) (i % 127);
}
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- baos.write("a|".getBytes(StandardCharsets.UTF_8));
- baos.write(jsonNodeBytes);
- byte[] expectedContents = baos.toByteArray();
- int size = batch.addEntry(key, jsonNodeBytes);
- assertEquals(size, sizeOfEntry);
- assertEquals(batch.getBuffer().position(), sizeOfEntry + 4);
+
+ int bytesWrittenToBuffer = batch.addEntry(key, jsonNodeBytes);
+ assertEquals(bytesWrittenToBuffer, expectedBytesWrittenToBuffer);
+ assertEquals(batch.getBuffer().position(),
expectedBytesWrittenToBuffer);
batch.flip();
ByteBuffer buffer = batch.getBuffer();
assertEquals(buffer.position(), 0);
- assertEquals(buffer.remaining(), sizeOfEntry + 4);
- assertEquals(sizeOfEntry, buffer.getInt());
- byte[] entryData = new byte[sizeOfEntry];
- buffer.get(entryData);
- assertEquals(buffer.position(), sizeOfEntry + 4);
- assertArrayEquals(expectedContents, entryData);
+ assertEquals(buffer.remaining(), expectedBytesWrittenToBuffer);
+ assertEquals(keyLength, buffer.getInt());
+ byte[] keyBytes = new byte[keyLength];
+ buffer.get(keyBytes);
+ assertEquals(key, new String(keyBytes, StandardCharsets.UTF_8));
+
+ int jsonLength = buffer.getInt();
+ byte[] jsonBytes = new byte[jsonLength];
+ buffer.get(jsonBytes);
+ assertEquals(buffer.position(), expectedBytesWrittenToBuffer);
+ assertArrayEquals(jsonNodeBytes, jsonBytes);
batch.reset();
diff --git
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
index 70cf463ffa..2e88536fd0 100644
---
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
+++
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
@@ -75,6 +75,7 @@ public class PipelinedSortBatchTaskTest {
@Test
public void emptyBatch() throws Exception {
NodeStateEntryBatch batch =
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
10);
+ batch.flip();
TestResult testResult = runTest(batch);
@@ -93,6 +94,7 @@ public class PipelinedSortBatchTaskTest {
addEntry(batch, "/a0/b1", "{\"key\":5}");
addEntry(batch, "/a0/b0/c1", "{\"key\":4}");
addEntry(batch, "/a0/b0/c0", "{\"key\":3}");
+ batch.flip();
TestResult testResult = runTest(batch);
@@ -119,11 +121,13 @@ public class PipelinedSortBatchTaskTest {
addEntry(batch1, "/a0/b0", "{\"key\":2}");
addEntry(batch1, "/a0", "{\"key\":1}");
addEntry(batch1, "/a1/b0", "{\"key\":6}");
+ batch1.flip();
NodeStateEntryBatch batch2 =
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
10);
addEntry(batch2, "/a0/b1", "{\"key\":5}");
addEntry(batch2, "/a0/b0/c1", "{\"key\":4}");
addEntry(batch2, "/a0/b0/c0", "{\"key\":3}");
+ batch2.flip();
TestResult testResult = runTest(batch1, batch2);
diff --git
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedUtilsTest.java
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedUtilsTest.java
new file mode 100644
index 0000000000..6749f0e2bb
--- /dev/null
+++
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedUtilsTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
+
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedUtils.formatAsPercentage;
+import static
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedUtils.formatAsTransferSpeedMBs;
+import static
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedUtils.toPercentage;
+import static org.junit.Assert.assertEquals;
+
+public class PipelinedUtilsTest {
+ @Test
+ public void testFormatAsPercentage() {
+ assertEquals("0.00", formatAsPercentage(0, 100));
+ assertEquals("1.00", formatAsPercentage(1, 100));
+ assertEquals("0.10", formatAsPercentage(1, 1000));
+ assertEquals("0.01", formatAsPercentage(1, 10_000));
+ assertEquals("N/A", formatAsPercentage(1, 0));
+ assertEquals("100.00", formatAsPercentage(100, 100));
+ assertEquals("120.00", formatAsPercentage(120, 100));
+ assertEquals("314.16", formatAsPercentage(355, 113));
+ }
+
+ @Test
+ public void testFormatAsTransferSpeedMBs() {
+ assertEquals("0.95 MB/s", formatAsTransferSpeedMBs(1_000_000,
TimeUnit.SECONDS.toMillis(1)));
+ assertEquals("0.00 MB/s", formatAsTransferSpeedMBs(0,
TimeUnit.SECONDS.toMillis(1)));
+ assertEquals("0.00 MB/s", formatAsTransferSpeedMBs(1,
TimeUnit.SECONDS.toMillis(1)));
+ assertEquals("N/A", formatAsTransferSpeedMBs(1_000_000,
TimeUnit.SECONDS.toMillis(0)));
+ assertEquals("8796093022208.00 MB/s",
formatAsTransferSpeedMBs(Long.MAX_VALUE, TimeUnit.SECONDS.toMillis(1)));
+ assertEquals("-8796093022208.00 MB/s",
formatAsTransferSpeedMBs(Long.MIN_VALUE, TimeUnit.SECONDS.toMillis(1)));
+ assertEquals("0.00 MB/s", formatAsTransferSpeedMBs(1_000_000,
Long.MAX_VALUE));
+ assertEquals("-0.00 MB/s", formatAsTransferSpeedMBs(1_000_000,
Long.MIN_VALUE));
+ }
+
+ @Test
+ public void testToPercentage() {
+ assertEquals(0, toPercentage (0, 100));
+ assertEquals(1, toPercentage(1, 100));
+ assertEquals(0, toPercentage(1, 1000));
+ assertEquals(0, toPercentage(1, 10_000));
+ assertEquals(-1, toPercentage(1, 0));
+ assertEquals(100, toPercentage(100, 100));
+ assertEquals(120, toPercentage(120, 100));
+ assertEquals(314, toPercentage(355, 113));
+ }
+}
\ No newline at end of file