This is an automated email from the ASF dual-hosted git repository.
fortino pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new 546e889da6 OAK-10460 - PIPELINED strategy: support eager merging of intermediate sorted files (#1156)
546e889da6 is described below
commit 546e889da6782d8b67f5c9427bdfd3e496e66f7f
Author: Nuno Santos <[email protected]>
AuthorDate: Thu Oct 26 09:34:05 2023 +0200
OAK-10460 - PIPELINED strategy: support eager merging of intermediate sorted files (#1156)
* Refactor the indexing logic to take only a MongoDatabase instance instead of
a MongoConnection, which is a more complex object with more fields than just
the MongoDatabase.
* Remove unused getter.
* Add eager merging of intermediate sorted files in Pipeline indexing
strategy.
Add an optimized variant of external merge sort which avoids converting the
lines read from the files being merged from binary to UTF-8 and back to
binary.
* Retrigger CI.
* Add missing license files.
* Improve documentation and refactor code.
Fix wrong termination condition when computing list of candidate files to
merge.
* Make internal helper class private.
* Improve docs, refactor test to use Path instead of File.
* Remove uses of deprecated methods.
* Clarify purpose of disabled test.
* Address review comments.
* Improve logging of byte sizes.
* Remove obsolete comment.
* Reduce the default of the minimum number of files to merge from 8 to 4.
* Trigger CI.
* Add a system property to configure the size of the read buffer in external
merge sort: "oak.indexer.pipelined.externalMerge.readBufferSize", default 16 KB
(a usage sketch follows this list).
* Fix a missing explicit charset when converting from byte array to String.
* Fix
* Retrigger CI.
* Retrigger CI.
* Increase version of oak.commons.sort
* Add test to ExternalSortByteArray
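
A minimal sketch (not part of the commit) of how the new read-buffer property
could be set before an indexing run; the property name and the 16 KB default
are taken from the diff below, the 64 KB value is only an example:

    // Hypothetical tuning: enlarge the read buffer of the external merge sort.
    System.setProperty("oak.indexer.pipelined.externalMerge.readBufferSize", "65536");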
---
.../jackrabbit/oak/commons/sort/ExternalSort.java | 2 +-
.../oak/commons/sort/ExternalSortByteArray.java | 223 +++++++++++++++++
.../jackrabbit/oak/commons/sort/package-info.java | 2 +-
.../commons/sort/ExternalSortByteArrayTest.java | 183 ++++++++++++++
.../oak/commons/sort/ExternalSortTest.java | 4 +-
.../document/flatfile/FlatFileStoreUtils.java | 6 -
.../flatfile/pipelined/NodeStateHolder.java | 23 +-
.../flatfile/pipelined/NodeStateHolderFactory.java | 84 +++++++
.../flatfile/pipelined/PathElementComparator.java | 7 +-
.../flatfile/pipelined/PipelinedMergeSortTask.java | 273 +++++++++++++++++----
.../flatfile/pipelined/PipelinedSortBatchTask.java | 25 +-
.../flatfile/pipelined/PipelinedStrategy.java | 32 ++-
.../document/indexstore/IndexStoreUtils.java | 7 +
.../document/flatfile/pipelined/PipelinedIT.java | 10 +-
.../PipelinedMergeSortTaskParameterizedTest.java | 117 +++++++++
.../pipelined/PipelinedMergeSortTaskTest.java | 132 ++++++----
.../pipelined/PipelinedMergeSortTaskTestBase.java | 129 ++++++++++
.../pipelined/PipelinedSortBatchTaskTest.java | 39 +--
18 files changed, 1123 insertions(+), 175 deletions(-)
diff --git a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSort.java b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSort.java
index 03a6404009..706bae7aca 100644
--- a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSort.java
+++ b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSort.java
@@ -627,7 +627,7 @@ public class ExternalSort {
                // Write if filterPredicate returns true and line is not a duplicate
                if (!distinct || (lastLine == null || (lastLine != null && cmp.compare(r, lastLine) != 0))) {
fbw.write(typeToString.apply(r));
- fbw.newLine();
+ fbw.write('\n');
lastLine = r;
}
}
diff --git a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortByteArray.java b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortByteArray.java
new file mode 100644
index 0000000000..10c3d7735e
--- /dev/null
+++ b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortByteArray.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.commons.sort;
+
+import org.apache.jackrabbit.guava.common.base.Preconditions;
+import org.apache.jackrabbit.oak.commons.Compression;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.function.Function;
+
+/**
+ * Variation of ExternalSort that stores the lines read from intermediate files as byte arrays, to avoid the
+ * conversion from byte[] to String and then back.
+ */
+public class ExternalSortByteArray {
+ private final static int DEFAULT_BUFFER_SIZE = 16 * 1024;
+
+    public static <T> void mergeSortedFilesBinary(List<Path> files, BufferedOutputStream fbw, final Comparator<T> cmp,
+                                                  boolean distinct, Compression algorithm,
+                                                  Function<T, byte[]> typeToByteArray, Function<byte[], T> byteArrayToType)
+            throws IOException {
+        mergeSortedFilesBinary(files, fbw, cmp, distinct, algorithm, typeToByteArray, byteArrayToType, DEFAULT_BUFFER_SIZE);
+    }
+
+    public static <T> void mergeSortedFilesBinary(List<Path> files, BufferedOutputStream fbw, final Comparator<T> cmp,
+                                                  boolean distinct, Compression algorithm,
+                                                  Function<T, byte[]> typeToByteArray, Function<byte[], T> byteArrayToType, int readBufferSize)
+            throws IOException {
+        ArrayList<BinaryFileBuffer<T>> bfbs = new ArrayList<>();
+        try {
+            for (Path f : files) {
+                InputStream in = algorithm.getInputStream(Files.newInputStream(f));
+                bfbs.add(new BinaryFileBuffer<>(in, byteArrayToType, readBufferSize));
+            }
+ }
+ mergeBinary(fbw, cmp, distinct, bfbs, typeToByteArray);
+ } finally {
+ for (BinaryFileBuffer<T> buffer : bfbs) {
+ try {
+ buffer.close();
+ } catch (Exception ignored) {
+ }
+ }
+ for (Path f : files) {
+ Files.deleteIfExists(f);
+ }
+ }
+ }
+
+    private static <T> int mergeBinary(BufferedOutputStream fbw, final Comparator<T> cmp, boolean distinct,
+                                       List<BinaryFileBuffer<T>> buffers, Function<T, byte[]> typeToByteArray)
+ throws IOException {
+ PriorityQueue<BinaryFileBuffer<T>> pq = new PriorityQueue<>(
+ 11,
+ (i, j) -> cmp.compare(i.peek(), j.peek())
+ );
+ for (BinaryFileBuffer<T> bfb : buffers) {
+ if (!bfb.empty()) {
+ pq.add(bfb);
+ }
+ }
+ int rowcounter = 0;
+ T lastLine = null;
+ while (!pq.isEmpty()) {
+ BinaryFileBuffer<T> bfb = pq.poll();
+ T r = bfb.pop();
+ // Skip duplicate lines
+            if (!distinct || lastLine == null || cmp.compare(r, lastLine) != 0) {
+ fbw.write(typeToByteArray.apply(r));
+ fbw.write('\n');
+ lastLine = r;
+ }
+ ++rowcounter;
+ if (bfb.empty()) {
+ bfb.fbr.close();
+ } else {
+ pq.add(bfb); // add it back
+ }
+ }
+ return rowcounter;
+ }
+
+    /**
+     * WARNING: uses '\n' as the line separator; it will not work with other line separators.
+     */
+ private static class BinaryFileBuffer<T> {
+ public final InputStream fbr;
+ private final Function<byte[], T> byteArrayToType;
+ private T cache;
+ private boolean empty;
+
+ // Used to reassemble the lines read from the source input stream.
+ private final ByteArrayOutputStream bais = new ByteArrayOutputStream();
+ private final byte[] buffer;
+ private int bufferPos = 0;
+ private int bufferLimit = 0;
+
+        public BinaryFileBuffer(InputStream r, Function<byte[], T> byteArrayToType, int bufferSize)
+                throws IOException {
+            Preconditions.checkArgument(bufferSize > 1024, "Buffer size must be greater than 1024 bytes");
+ this.fbr = r;
+ this.byteArrayToType = byteArrayToType;
+ this.buffer = new byte[bufferSize];
+ reload();
+ }
+
+ public boolean empty() {
+ return this.empty;
+ }
+
+ private void reload() throws IOException {
+ try {
+ byte[] line = readLine();
+ this.cache = byteArrayToType.apply(line);
+ this.empty = this.cache == null;
+            } catch (EOFException eof) {
+ this.empty = true;
+ this.cache = null;
+ }
+ }
+
+ private boolean bufferIsEmpty() {
+ return bufferPos >= bufferLimit;
+ }
+
+        /*
+         * Read a line from the source input as a byte array. This is adapted from the implementation of
+         * BufferedReader#readLine(), but without converting the line to a String.
+         */
+ private byte[] readLine() throws IOException {
+ bais.reset();
+
+ for (; ; ) {
+ if (bufferIsEmpty()) {
+ bufferLimit = fbr.read(buffer);
+ bufferPos = 0;
+ }
+                if (bufferIsEmpty()) { /* EOF */
+                    // The buffer is still empty even after trying to read from the input stream: we have reached EOF.
+                    // Return whatever is left in the bais.
+ if (bais.size() == 0) {
+ return null;
+ } else {
+ return bais.toByteArray();
+ }
+ }
+ // Start reading a new line
+ int startByte = bufferPos;
+ while (!bufferIsEmpty()) {
+ byte c = buffer[bufferPos];
+ bufferPos++;
+                    if (c == '\n') { /* EOL */
+                        int lineSegmentSize = bufferPos - startByte - 1; // exclude \n
+                        if (bais.size() == 0) {
+                            // There is no partial data in the bais, which means that the whole line is in the
+                            // buffer. In this case, we can extract the line directly from the buffer without
+                            // copying first to the bais.
+                            if (lineSegmentSize == 0) {
+                                return null;
+                            } else {
+                                // Copy the line from the buffer to a new byte array and return it
+                                byte[] line = new byte[lineSegmentSize];
+                                System.arraycopy(buffer, startByte, line, 0, lineSegmentSize);
+                                return line;
+                            }
+                        } else {
+                            // The first section of the line is in the bais, the remainder in the buffer. Finish
+                            // reassembling the line in the bais and return it.
+                            bais.write(buffer, startByte, lineSegmentSize);
+                            if (bais.size() == 0) {
+                                return null;
+                            } else {
+                                return bais.toByteArray();
+                            }
+                        }
+ }
+ }
+                // Reached the end of the buffer. Copy whatever is left in the buffer and read more from the source stream.
+ bais.write(buffer, startByte, bufferPos - startByte);
+ }
+ }
+
+ public void close() throws IOException {
+ this.fbr.close();
+ }
+
+ public T peek() {
+ if (empty()) {
+ return null;
+ }
+ return this.cache;
+ }
+
+ public T pop() throws IOException {
+ T answer = peek();
+ reload();
+ return answer;
+ }
+ }
+}
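
For orientation, here is a minimal usage sketch of the new API (not part of the
commit; the file names are hypothetical and T is instantiated to String). Note
that mergeSortedFilesBinary deletes the input files when it finishes, and that
the byteArrayToType function must tolerate a null input, which signals EOF:

    import org.apache.jackrabbit.oak.commons.Compression;
    import org.apache.jackrabbit.oak.commons.sort.ExternalSortByteArray;

    import java.io.BufferedOutputStream;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Comparator;
    import java.util.List;

    public class BinaryMergeSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical pre-sorted input chunks and output file
            List<Path> chunks = List.of(Path.of("chunk-0.txt"), Path.of("chunk-1.txt"));
            Path merged = Path.of("merged.txt");
            try (BufferedOutputStream out = new BufferedOutputStream(Files.newOutputStream(merged))) {
                ExternalSortByteArray.<String>mergeSortedFilesBinary(
                        chunks,
                        out,
                        Comparator.naturalOrder(),
                        true,              // distinct: skip duplicate lines
                        Compression.NONE,
                        s -> s.getBytes(StandardCharsets.UTF_8),                        // typeToByteArray
                        b -> b == null ? null : new String(b, StandardCharsets.UTF_8)); // byteArrayToType
            }
        }
    }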
diff --git a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java
index 37c743fdaa..10f91cc554 100644
--- a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java
+++ b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-@Version("1.2.0")
+@Version("1.3.0")
package org.apache.jackrabbit.oak.commons.sort;
import org.osgi.annotation.versioning.Version;
diff --git a/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortByteArrayTest.java b/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortByteArrayTest.java
new file mode 100644
index 0000000000..3279a582a6
--- /dev/null
+++ b/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortByteArrayTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.commons.sort;
+
+import org.apache.jackrabbit.oak.commons.Compression;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertArrayEquals;
+
+public class ExternalSortByteArrayTest {
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+ private final static Charset charset = StandardCharsets.UTF_8;
+    private final static Comparator<BinaryTestLine> cmp = Comparator.naturalOrder();
+    private final static Function<byte[], BinaryTestLine> byteArrayToType = line -> line != null ? new BinaryTestLine(line) : null;
+    private final static Function<BinaryTestLine, byte[]> typeToByteArray = tl -> tl != null ? tl.bytes : null;
+
+ @Test
+ public void sortManyFilesNoCompression() throws Exception {
+ sortManyFiles(Compression.NONE);
+ }
+
+ @Test
+ public void sortManyFilesGzipCompression() throws Exception {
+ sortManyFiles(Compression.GZIP);
+ }
+
+ @Test
+ public void sortManyFilesLZ4Compression() throws Exception {
+ sortManyFiles(ExternalSortTest.LZ4());
+ }
+
+ public void sortManyFiles(Compression compression) throws Exception {
+ int testCount = 1000;
+ List<BinaryTestLine> testLines = generateTestLines(testCount);
+
+ List<BinaryTestLine> testLinesShuffled = new ArrayList<>(testLines);
+ Collections.shuffle(testLinesShuffled);
+
+        List<Path> intermediateFiles = createIntermediateFiles(testLinesShuffled, 10, compression);
+        Path resultFile = folder.newFile(compression.addSuffix("sorted.json")).toPath();
+
+        try (BufferedOutputStream bos = new BufferedOutputStream(compression.getOutputStream(Files.newOutputStream(resultFile)))) {
+ ExternalSortByteArray.mergeSortedFilesBinary(intermediateFiles,
+ bos,
+ cmp,
+ true,
+ compression,
+ typeToByteArray,
+ byteArrayToType);
+ }
+
+ ArrayList<String> lines = new ArrayList<>();
+        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(compression.getInputStream(Files.newInputStream(resultFile)), charset))) {
+ while (true) {
+ String line = bufferedReader.readLine();
+ if (line == null) {
+ break;
+ }
+ lines.add(line);
+ }
+ }
+ String[] actual = lines.toArray(new String[0]);
+        String[] expected = testLines.stream().map(tl -> tl.line).toArray(String[]::new);
+ assertArrayEquals(expected, actual);
+ }
+
+ private List<BinaryTestLine> generateTestLines(int numberOfLines) {
+ List<BinaryTestLine> testLines = new ArrayList<>(numberOfLines);
+ for (int i = 0; i < numberOfLines; i++) {
+ testLines.add(new BinaryTestLine(i + ":" + "foo-" + i));
+ }
+ return testLines;
+ }
+
+    private List<Path> createIntermediateFiles(List<BinaryTestLine> ffsLines, int numberOfFiles, Compression compression) throws Exception {
+ Iterator<BinaryTestLine> ffsIter = ffsLines.iterator();
+ Path workFolder = folder.newFolder("merge_many_test").toPath();
+ ArrayList<Path> intermediateFiles = new ArrayList<>(numberOfFiles);
+ int linesPerFile = ffsLines.size() / numberOfFiles;
+
+ for (int fileIdx = 0; fileIdx < numberOfFiles; fileIdx++) {
+            Path intermediateFile = workFolder.resolve(compression.addSuffix("intermediate-" + fileIdx + ".json"));
+            ArrayList<BinaryTestLine> binaryTestLinesInFile = new ArrayList<>();
+            while (binaryTestLinesInFile.size() < linesPerFile && ffsIter.hasNext()) {
+ binaryTestLinesInFile.add(ffsIter.next());
+ }
+ if (fileIdx == numberOfFiles - 1) {
+ // Add the remaining elements to the last file
+ while (ffsIter.hasNext()) {
+ binaryTestLinesInFile.add(ffsIter.next());
+ }
+ }
+ binaryTestLinesInFile.sort(cmp);
+
+            try (BufferedWriter bw = new BufferedWriter(
+                    new OutputStreamWriter(
+                            compression.getOutputStream(
+                                    Files.newOutputStream(intermediateFile))))) {
+ for (BinaryTestLine binaryTestLine : binaryTestLinesInFile) {
+ bw.write(binaryTestLine.line);
+ bw.write("\n");
+ }
+ }
+ intermediateFiles.add(intermediateFile);
+ }
+ return intermediateFiles;
+ }
+
+ private static class BinaryTestLine implements Comparable<BinaryTestLine> {
+ final String line;
+ final int value;
+ final byte[] bytes;
+
+ public BinaryTestLine(String line) {
+ this.line = line;
+            this.value = Integer.parseInt(line.substring(0, line.indexOf(':')));
+ this.bytes = line.getBytes(StandardCharsets.UTF_8);
+ }
+
+ public BinaryTestLine(byte[] bytes) {
+ this.bytes = bytes;
+ this.line = new String(bytes, StandardCharsets.UTF_8);
+            this.value = Integer.parseInt(line.substring(0, line.indexOf(':')));
+ }
+
+ @Override
+ public int compareTo(BinaryTestLine o) {
+ return Integer.compare(value, o.value);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ BinaryTestLine binaryTestLine = (BinaryTestLine) o;
+ return line.equals(binaryTestLine.line);
+ }
+
+ @Override
+ public int hashCode() {
+ return line.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return line;
+ }
+ }
+}
\ No newline at end of file
diff --git a/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortTest.java b/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortTest.java
index 359bda6646..47530a471b 100644
--- a/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortTest.java
+++ b/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/sort/ExternalSortTest.java
@@ -518,7 +518,7 @@ public class ExternalSortTest {
}
- private static class TestLine implements Comparable<TestLine> {
+ static class TestLine implements Comparable<TestLine> {
final String line;
final int value;
@@ -589,7 +589,7 @@ public class ExternalSortTest {
}
}
- private static Compression LZ4() {
+ static Compression LZ4() {
return new Compression() {
@Override
            public InputStream getInputStream(InputStream in) throws IOException {
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreUtils.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreUtils.java
index 06fa755d16..2b5b36298c 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreUtils.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreUtils.java
@@ -21,7 +21,6 @@ package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
import org.apache.jackrabbit.oak.commons.Compression;
-import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
@@ -70,11 +69,6 @@ public class FlatFileStoreUtils {
        return new BufferedWriter(new OutputStreamWriter(algorithm.getOutputStream(out)));
    }
-    public static BufferedOutputStream createOutputStream(File file, Compression algorithm) throws IOException {
-        OutputStream out = new FileOutputStream(file);
-        return new BufferedOutputStream(algorithm.getOutputStream(out));
-    }
-
public static long sizeOf(List<File> sortedFiles) {
return sortedFiles.stream().mapToLong(File::length).sum();
}
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateHolder.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateHolder.java
index c45f33e124..59e18c20fa 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateHolder.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateHolder.java
@@ -18,35 +18,22 @@
*/
package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
-import org.apache.jackrabbit.oak.commons.PathUtils;
-import org.apache.jackrabbit.oak.index.indexer.document.flatfile.NodeStateEntryWriter;
-import java.util.Iterator;
+public final class NodeStateHolder {
-final class NodeStateHolder {
-
- private final String line;
+ private final byte[] line;
private final String[] pathElements;
- public NodeStateHolder(String line) {
+ public NodeStateHolder(byte[] line, String[] pathElements) {
this.line = line;
- String path = NodeStateEntryWriter.getPath(line);
- int depth = PathUtils.getDepth(path);
- this.pathElements = new String[depth];
- Iterator<String> iter = PathUtils.elements(path).iterator();
- for (int i = 0; i < pathElements.length; i++) {
- pathElements[i] = iter.next();
- }
+ this.pathElements = pathElements;
}
public String[] getPathElements() {
return pathElements;
}
- /**
- * Line here contains the path also
- */
- public String getLine() {
+ public byte[] getLine() {
return line;
}
}
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateHolderFactory.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateHolderFactory.java
new file mode 100644
index 0000000000..45e02a1b6e
--- /dev/null
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateHolderFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
+
+import java.util.ArrayList;
+import java.util.function.Function;
+
+import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy.FLATFILESTORE_CHARSET;
+
+final class NodeStateHolderFactory implements Function<byte[], NodeStateHolder> {
+ private final static byte PIPE = (byte) '|';
+ private final static byte PATH_SEPARATOR = (byte) '/';
+
+    // In UTF-8, ASCII characters have the same encoding as in ASCII, so we can search for them without decoding the
+    // stream. Characters encoded as multibyte sequences will not have any of their bytes matching an ASCII character,
+    // because the bytes of a multibyte encoding all start with 1, while ASCII characters start with 0.
+    // https://en.wikipedia.org/wiki/UTF-8#Encoding
+ private static int indexOf(byte[] ffsLine, byte ch, int from, int to) {
+ for (int i = from; i < to; i++) {
+ if (ffsLine[i] == ch) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+
+ private static int indexOf(byte[] ffsLine, byte ch) {
+ return indexOf(ffsLine, ch, 0, ffsLine.length);
+ }
+
+ private static boolean isAbsolutePath(String path) {
+ return !path.isEmpty() && path.charAt(0) == '/';
+ }
+
+ private final ArrayList<String> partsBuffer = new ArrayList<>(16);
+
+ public NodeStateHolder apply(byte[] ffsLine) {
+        return ffsLine == null ? null : new NodeStateHolder(ffsLine, parts(ffsLine));
+ }
+
+    // We are only interested in the path section of the line; we do not need to convert the full line to a String.
+    // So we search for the pipe and then convert only this section to a String.
+    private String[] parts(byte[] ffsLine) {
+        partsBuffer.clear();
+        int pipeIdx = indexOf(ffsLine, PIPE);
+        if (pipeIdx < 0) {
+            throw new IllegalStateException("Line does not contain a pipe: " + new String(ffsLine, FLATFILESTORE_CHARSET));
+        }
+        String path = new String(ffsLine, 0, pipeIdx, FLATFILESTORE_CHARSET);
+
+ int pos = isAbsolutePath(path) ? 1 : 0;
+ while (true) {
+ if (pos >= path.length()) {
+ return partsBuffer.toArray(new String[0]);
+ }
+ int i = path.indexOf(PATH_SEPARATOR, pos);
+ if (i < 0) {
+ // Add last part and exit
+ partsBuffer.add(path.substring(pos));
+ pos = path.length();
+ } else {
+ partsBuffer.add(path.substring(pos, i));
+ pos = i + 1;
+ }
+ }
+ }
+}
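
As an illustration (not from the commit; the sample line is hypothetical), the
factory splits the path section of a flat-file-store line without decoding the
rest of it:

    // Hypothetical line: path and serialized node state separated by a pipe.
    byte[] line = "/content/dam/asset.jpg|{}".getBytes(StandardCharsets.UTF_8);
    NodeStateHolder holder = new NodeStateHolderFactory().apply(line);
    // holder.getPathElements() is ["content", "dam", "asset.jpg"];
    // holder.getLine() is the original byte[], written back verbatim during the merge.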
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PathElementComparator.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PathElementComparator.java
index 7172fe4bbc..803e95f9e5 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PathElementComparator.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PathElementComparator.java
@@ -18,21 +18,18 @@
*/
package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
-import org.apache.jackrabbit.guava.common.collect.ImmutableSet;
-
import java.util.Comparator;
import java.util.Set;
-class PathElementComparator implements Comparator<String[]> {
+public class PathElementComparator implements Comparator<String[]> {
private final Set<String> preferred;
public PathElementComparator(Set<String> preferredPathElements) {
- this.preferred = ImmutableSet.copyOf(preferredPathElements);
+ this.preferred = Set.copyOf(preferredPathElements);
}
@Override
public int compare(String[] p1, String[] p2) {
-
int i1 = 0;
int i2 = 0;
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
index a857a4422c..d14fc0f8c0 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
@@ -18,111 +18,223 @@
*/
package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.guava.common.base.Preconditions;
import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.oak.commons.Compression;
-import org.apache.jackrabbit.oak.commons.sort.ExternalSort;
-import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
+import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.apache.jackrabbit.oak.commons.sort.ExternalSortByteArray;
+import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreUtils;
import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
+import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
+import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedWriter;
-import java.io.File;
+import java.io.BufferedOutputStream;
import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
+import java.util.PriorityQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
+import java.util.stream.Collectors;
-import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCountBin;
-import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.createWriter;
-import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.getSortedStoreFileName;
-import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.sizeOf;
-import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy.FLATFILESTORE_CHARSET;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy.SENTINEL_SORTED_FILES_QUEUE;
+import static org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreUtils.getSortedStoreFileName;
/**
 * Accumulates the intermediate sorted files and, when all files are generated, merges them into a single sorted file,
 * the flat file store.
 */
-class PipelinedMergeSortTask implements Callable<PipelinedMergeSortTask.Result> {
-    // TODO: start merging small files into larger files to avoid having too many "small" files at the end.
-    //  Idea: when there are more than k (for instance, 10) intermediate files whose size is under a certain limit
-    //  (for instance, 1GB), merge them into a single file. And repeat the test whenever this task receives a new
-    //  intermediate file.
+public class PipelinedMergeSortTask implements Callable<PipelinedMergeSortTask.Result> {
+    /**
+     * Minimum number of intermediate files that must exist before trying to do an eager merge.
+     */
+    public static final String OAK_INDEXER_PIPELINED_EAGER_MERGE_TRIGGER_THRESHOLD = "oak.indexer.pipelined.eagerMergeTriggerThreshold";
+    public static final int DEFAULT_OAK_INDEXER_PIPELINED_EAGER_MERGE_TRIGGER_THRESHOLD = 64;
+    /*
+     * Maximum number of files to eagerly merge. This is to keep the eager merges efficient, as efficiency decreases
+     * with the number of files in a merge.
+     */
+    public static final String OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_FILES_TO_MERGE = "oak.indexer.pipelined.eagerMergeMaxFilesToMerge";
+    public static final int DEFAULT_OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_FILES_TO_MERGE = 32;
+    /*
+     * Minimum number of files to eagerly merge at a time.
+     */
+    public static final String OAK_INDEXER_PIPELINED_EAGER_MERGE_MIN_FILES_TO_MERGE = "oak.indexer.pipelined.eagerMergeMinFilesToMerge";
+    public static final int DEFAULT_OAK_INDEXER_PIPELINED_EAGER_MERGE_MIN_FILES_TO_MERGE = 4;
+    /*
+     * Maximum total size of intermediate files that can be eagerly merged.
+     */
+    public static final String OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_SIZE_TO_MERGE_MB = "oak.indexer.pipelined.eagerMergeMaxSizeToMergeMB";
+    public static final int DEFAULT_OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_SIZE_TO_MERGE_MB = 2048;
+
+    public final static String OAK_INDEXER_PIPELINED_EXTERNAL_MERGE_READ_BUFFER_SIZE = "oak.indexer.pipelined.externalMerge.readBufferSize";
+    public final static int DEFAULT_OAK_INDEXER_PIPELINED_EXTERNAL_MERGE_READ_BUFFER_SIZE = 16 * 1024;
+
public static class Result {
- private final File flatFileStoreFile;
- private final int filesMerged;
+ private final Path flatFileStoreFile;
+ private final int intermediateFilesCount;
+ private final int finalMergeFilesCount;
+ private final int eagerMergeRuns;
- public Result(File flatFileStoreFile, int filesMerged) {
+        public Result(Path flatFileStoreFile, int intermediateFilesCount, int finalMergeFilesCount, int eagerMergeRuns) {
this.flatFileStoreFile = flatFileStoreFile;
- this.filesMerged = filesMerged;
+ this.intermediateFilesCount = intermediateFilesCount;
+ this.finalMergeFilesCount = finalMergeFilesCount;
+ this.eagerMergeRuns = eagerMergeRuns;
}
- public File getFlatFileStoreFile() {
+ public Path getFlatFileStoreFile() {
return flatFileStoreFile;
}
- public int getFilesMerged() {
- return filesMerged;
+ public int getFinalMergeFilesCount() {
+ return finalMergeFilesCount;
+ }
+
+ public int getEagerMergeRuns() {
+ return eagerMergeRuns;
+ }
+
+ public int getIntermediateFilesCount() {
+ return intermediateFilesCount;
+ }
+
+ @Override
+ public String toString() {
+ return "Result{" +
+ "flatFileStoreFile=" + flatFileStoreFile +
+ ", intermediateFilesCount=" + intermediateFilesCount +
+ ", finalMergeFilesCount=" + finalMergeFilesCount +
+ ", eagerMergeRuns=" + eagerMergeRuns +
+ '}';
}
}
-    private static final Logger LOG = LoggerFactory.getLogger(PipelinedMergeSortTask.class);
+ private static class PathAndSize implements Comparable<PathAndSize> {
+ final Path file;
+ final long size;
+
+ public PathAndSize(Path file, long size) {
+ this.file = file;
+ this.size = size;
+ }
+
+ @Override
+ public String toString() {
+            return "PathAndSize{" +
+ "file=" + file.toString() +
+ ", size=" + IOUtils.humanReadableByteCountBin(size) +
+ '}';
+ }
+ @Override
+        public int compareTo(@NotNull PipelinedMergeSortTask.PathAndSize other) {
+ return Long.compare(this.size, other.size);
+ }
+ }
+
+    private static final Logger LOG = LoggerFactory.getLogger(PipelinedMergeSortTask.class);
private static final String THREAD_NAME = "mongo-merge-sort-files";
- private final File storeDir;
+ private final Path storeDir;
private final Comparator<NodeStateHolder> comparator;
private final Compression algorithm;
- private final BlockingQueue<File> sortedFilesQueue;
- private final ArrayList<File> sortedFiles = new ArrayList<>();
+ private final BlockingQueue<Path> sortedFilesQueue;
+    private final PriorityQueue<PathAndSize> sortedFiles = new PriorityQueue<>();
+ private final AtomicBoolean stopEagerMerging = new AtomicBoolean(false);
+ private final int mergeTriggerThreshold;
+ private final int minFilesToMerge;
+ private final int maxFilesToMerge;
+ private final int maxSizeToMergeMB;
+ private final int externalMergeReadBufferSize;
+ private int eagerMergeRuns;
+ private int mergedFilesCounter = 0;
- public PipelinedMergeSortTask(File storeDir,
+ public PipelinedMergeSortTask(Path storeDir,
PathElementComparator pathComparator,
Compression algorithm,
- BlockingQueue<File> sortedFilesQueue) {
+ BlockingQueue<Path> sortedFilesQueue) {
this.storeDir = storeDir;
        this.comparator = (e1, e2) -> pathComparator.compare(e1.getPathElements(), e2.getPathElements());
this.algorithm = algorithm;
this.sortedFilesQueue = sortedFilesQueue;
+
+        this.mergeTriggerThreshold = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_EAGER_MERGE_TRIGGER_THRESHOLD, DEFAULT_OAK_INDEXER_PIPELINED_EAGER_MERGE_TRIGGER_THRESHOLD);
+        Preconditions.checkArgument(mergeTriggerThreshold >= 16,
+                "Invalid value for property " + OAK_INDEXER_PIPELINED_EAGER_MERGE_TRIGGER_THRESHOLD + ": " + mergeTriggerThreshold + ". Must be >= 16");
+
+        this.minFilesToMerge = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_EAGER_MERGE_MIN_FILES_TO_MERGE, DEFAULT_OAK_INDEXER_PIPELINED_EAGER_MERGE_MIN_FILES_TO_MERGE);
+        Preconditions.checkArgument(minFilesToMerge >= 2,
+                "Invalid value for property " + OAK_INDEXER_PIPELINED_EAGER_MERGE_MIN_FILES_TO_MERGE + ": " + minFilesToMerge + ". Must be >= 2");
+
+        this.maxFilesToMerge = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_FILES_TO_MERGE, DEFAULT_OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_FILES_TO_MERGE);
+        Preconditions.checkArgument(maxFilesToMerge >= minFilesToMerge,
+                "Invalid value for property " + OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_FILES_TO_MERGE + ": " + maxFilesToMerge + ". Must be >= " + OAK_INDEXER_PIPELINED_EAGER_MERGE_MIN_FILES_TO_MERGE + " (" + minFilesToMerge + ")");
+
+        this.maxSizeToMergeMB = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_SIZE_TO_MERGE_MB, DEFAULT_OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_SIZE_TO_MERGE_MB);
+        Preconditions.checkArgument(maxSizeToMergeMB >= 1,
+                "Invalid value for property " + OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_SIZE_TO_MERGE_MB + ": " + maxSizeToMergeMB + ". Must be >= 1");
+
+        this.externalMergeReadBufferSize = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_EXTERNAL_MERGE_READ_BUFFER_SIZE, DEFAULT_OAK_INDEXER_PIPELINED_EXTERNAL_MERGE_READ_BUFFER_SIZE);
+        Preconditions.checkArgument(externalMergeReadBufferSize >= FileUtils.ONE_KB,
+                "Invalid value for property " + OAK_INDEXER_PIPELINED_EXTERNAL_MERGE_READ_BUFFER_SIZE + ": " + externalMergeReadBufferSize + ". Must be >= 1 KB");
}
@Override
public Result call() throws Exception {
+ this.eagerMergeRuns = 0;
String originalName = Thread.currentThread().getName();
Thread.currentThread().setName(THREAD_NAME);
+ int intermediateFilesCount = 0;
try {
            LOG.info("[TASK:{}:START] Starting merge sort task", THREAD_NAME.toUpperCase(Locale.ROOT));
            while (true) {
-                LOG.info("Waiting for next intermediate sorted file");
-                File sortedIntermediateFile = sortedFilesQueue.take();
+                LOG.debug("Waiting for next intermediate sorted file");
+                Path sortedIntermediateFile = sortedFilesQueue.take();
                if (sortedIntermediateFile == SENTINEL_SORTED_FILES_QUEUE) {
                    long sortedFilesSizeBytes = sizeOf(sortedFiles);
-                    LOG.info("Going to sort {} files, total size {}", sortedFiles.size(), humanReadableByteCountBin(sortedFilesSizeBytes));
+                    LOG.info("Going to sort {} files, total size {}", sortedFiles.size(), IOUtils.humanReadableByteCountBin(sortedFilesSizeBytes));
                    Stopwatch w = Stopwatch.createStarted();
-                    File flatFileStore = sortStoreFile(sortedFiles);
-                    LOG.info("Final merge completed in {}. Created file: {}", FormattingUtils.formatToSeconds(w), flatFileStore.getAbsolutePath());
-                    long ffsSizeBytes = flatFileStore.length();
+                    List<Path> simpleFileList = sortedFiles.stream().map(f -> f.file).collect(Collectors.toList());
+                    Path flatFileStore = sortStoreFile(simpleFileList);
+
+                    LOG.info("Final merge completed in {}. Created file: {}", FormattingUtils.formatToSeconds(w), flatFileStore.toAbsolutePath());
+                    long ffsSizeBytes = Files.size(flatFileStore);
                    String metrics = MetricsFormatter.newBuilder()
                            .add("duration", FormattingUtils.formatToSeconds(w))
                            .add("durationSeconds", w.elapsed(TimeUnit.SECONDS))
                            .add("filesMerged", sortedFiles.size())
                            .add("ffsSizeBytes", ffsSizeBytes)
-                            .add("ffsSize", humanReadableByteCountBin(ffsSizeBytes))
+                            .add("ffsSize", IOUtils.humanReadableByteCountBin(ffsSizeBytes))
                            .build();
                    LOG.info("[TASK:{}:END] Metrics: {}", THREAD_NAME.toUpperCase(Locale.ROOT), metrics);
-                    return new Result(flatFileStore, sortedFiles.size());
+                    return new Result(flatFileStore, intermediateFilesCount, sortedFiles.size(), eagerMergeRuns);
+
+                } else {
+                    sortedFiles.add(new PathAndSize(sortedIntermediateFile, Files.size(sortedIntermediateFile)));
+                    intermediateFilesCount++;
+                    LOG.info("Received new intermediate sorted file {}. Size: {}. Total files: {} of size {}",
+                            sortedIntermediateFile, IOUtils.humanReadableByteCountBin(Files.size(sortedIntermediateFile)),
+                            sortedFiles.size(), IOUtils.humanReadableByteCountBin(sizeOf(sortedFiles)));
+                    // No point in doing eager merging if we have already finished downloading from Mongo.
+                    // In this case, we do only the final merge.
+                    if (stopEagerMerging.get()) {
+                        LOG.debug("Skipping eager merging because download from Mongo has finished");
+                    } else {
+                        tryMergeIntermediateFilesEagerly();
+                    }
                }
-                sortedFiles.add(sortedIntermediateFile);
-                LOG.info("Received new intermediate sorted file {}. Size: {}. Total files: {} of size {}",
-                        sortedIntermediateFile, humanReadableByteCountBin(sortedIntermediateFile.length()),
-                        sortedFiles.size(), humanReadableByteCountBin(sizeOf(sortedFiles)));
            }
} catch (InterruptedException t) {
LOG.warn("Thread interrupted", t);
@@ -135,19 +247,84 @@ class PipelinedMergeSortTask implements Callable<PipelinedMergeSortTask.Result>
}
}
-    private File sortStoreFile(List<File> sortedFilesBatch) throws IOException {
-        File sortedFile = new File(storeDir, getSortedStoreFileName(algorithm));
-        try (BufferedWriter writer = createWriter(sortedFile, algorithm)) {
-            Function<String, NodeStateHolder> stringToType = (line) -> line == null ? null : new NodeStateHolder(line);
-            Function<NodeStateHolder, String> typeToString = holder -> holder == null ? null : holder.getLine();
-            ExternalSort.mergeSortedFiles(sortedFilesBatch,
+    /**
+     * Stops eager merging. After this method is called, eager merging will no longer be performed, even if the
+     * conditions for merging are met.
+     * This method should be called when the download terminates, to avoid starting new eager merges which would
+     * delay the final merge.
+     */
+    public void stopEagerMerging() {
+        stopEagerMerging.set(true);
+    }
+
+    private static long sizeOf(PriorityQueue<PathAndSize> sortedFiles) {
+        return sortedFiles.stream().mapToLong(f -> f.size).sum();
+    }
+
+    private void tryMergeIntermediateFilesEagerly() throws IOException {
+        if (sortedFiles.size() < mergeTriggerThreshold) {
+            // Not enough intermediate files to merge.
+            return;
+        }
+
+        ArrayList<PathAndSize> filesAndSizeToSort = new ArrayList<>();
+        long sumOfSizesBytes = 0;
+        while (!sortedFiles.isEmpty() &&
+                filesAndSizeToSort.size() < maxFilesToMerge &&
+                sumOfSizesBytes / FileUtils.ONE_MB < maxSizeToMergeMB) {
+            // Get the next candidate. Do not remove the file from the queue yet, because it may be too large to merge eagerly.
+            PathAndSize pathAndSize = sortedFiles.peek();
+            if (pathAndSize.size / FileUtils.ONE_MB > maxSizeToMergeMB) {
+                LOG.debug("File {} is too large to be merged. Size: {}, max allowed: {} MB. Stopping the search for intermediate files to merge, because all remaining files are larger.",
+                        pathAndSize.file.toAbsolutePath(),
+                        IOUtils.humanReadableByteCountBin(pathAndSize.size),
+                        maxSizeToMergeMB);
+                break;
+            }
+            // Remove the file from the sorted files queue and add it to the list of files to sort
+            sortedFiles.poll();
+            filesAndSizeToSort.add(pathAndSize);
+            sumOfSizesBytes += pathAndSize.size;
+        }
+        if (filesAndSizeToSort.size() < minFilesToMerge) {
+            // Not enough candidate files to merge. Put the candidate files back in the sorted files queue.
+            sortedFiles.addAll(filesAndSizeToSort);
+            LOG.debug("Not enough candidate files to merge. Found {} candidates of size {}, minimum for merging is {}",
+                    filesAndSizeToSort.size(), IOUtils.humanReadableByteCountBin(sumOfSizesBytes), minFilesToMerge);
+            return;
+        }
+        LOG.info("Merge threshold reached: {} > {}. Going to merge the following {} files {} of total size {}.",
+                sortedFiles.size() + filesAndSizeToSort.size(), mergeTriggerThreshold,
+                filesAndSizeToSort.size(),
+                filesAndSizeToSort.stream()
+                        .map(fs -> fs.file.getFileName() + ": " + IOUtils.humanReadableByteCountBin(fs.size))
+                        .collect(Collectors.joining(", ", "[", "]")),
+                IOUtils.humanReadableByteCountBin(sumOfSizesBytes));
+        Stopwatch start = Stopwatch.createStarted();
+        Path mergedFile = sortStoreFile(filesAndSizeToSort.stream().map(f -> f.file).collect(Collectors.toList()));
+        eagerMergeRuns++;
+        Path destFile = mergedFile.getParent().resolve("merged-" + mergedFilesCounter++);
+        Files.move(mergedFile, destFile);
+        PathAndSize mergedPathAndSize = new PathAndSize(destFile, Files.size(destFile));
+        sortedFiles.add(mergedPathAndSize);
+        LOG.info("{} files merged in {} seconds. New file {}, size: {}",
+                filesAndSizeToSort.size(), start.elapsed(TimeUnit.SECONDS),
+                mergedPathAndSize.file.getFileName(), IOUtils.humanReadableByteCountBin(mergedPathAndSize.size));
+    }
+
+    private Path sortStoreFile(List<Path> sortedFilesBatch) throws IOException {
+        Path sortedFile = storeDir.resolve(getSortedStoreFileName(algorithm));
+        try (BufferedOutputStream writer = IndexStoreUtils.createOutputStream(sortedFile, algorithm)) {
+            Function<byte[], NodeStateHolder> byteArrayToType = new NodeStateHolderFactory();
+            Function<NodeStateHolder, byte[]> typeToByteArray = holder -> holder == null ? null : holder.getLine();
+            ExternalSortByteArray.mergeSortedFilesBinary(sortedFilesBatch,
                    writer,
                    comparator,
-                    FLATFILESTORE_CHARSET,
                    true, //distinct
                    algorithm,
-                    typeToString,
-                    stringToType
+                    typeToByteArray,
+                    byteArrayToType,
+                    externalMergeReadBufferSize
            );
        }
        return sortedFile;
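
A rough tuning sketch (not part of the commit) using the property names and
defaults introduced above; the values shown simply make the defaults explicit:

    // With these settings, once 64 intermediate files have accumulated, between 4 and 32
    // of the smallest ones (the queue is a PriorityQueue ordered by size) are merged into
    // a "merged-N" file, skipping any file larger than 2048 MB.
    System.setProperty("oak.indexer.pipelined.eagerMergeTriggerThreshold", "64");
    System.setProperty("oak.indexer.pipelined.eagerMergeMinFilesToMerge", "4");
    System.setProperty("oak.indexer.pipelined.eagerMergeMaxFilesToMerge", "32");
    System.setProperty("oak.indexer.pipelined.eagerMergeMaxSizeToMergeMB", "2048");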
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
index 22a7552445..ab5fca9bee 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
@@ -26,9 +26,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedOutputStream;
-import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Locale;
@@ -36,8 +37,8 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCountBin;
-import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.createOutputStream;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy.SENTINEL_NSE_BUFFER;
+import static org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreUtils.createOutputStream;
/**
 * Receives batches of node state entries, sorts them in memory, and finally writes them to a file.
@@ -63,18 +64,18 @@ class PipelinedSortBatchTask implements Callable<PipelinedSortBatchTask.Result>
private final Compression algorithm;
private final BlockingQueue<NodeStateEntryBatch> emptyBuffersQueue;
private final BlockingQueue<NodeStateEntryBatch> nonEmptyBuffersQueue;
- private final BlockingQueue<File> sortedFilesQueue;
- private final File sortWorkDir;
+ private final BlockingQueue<Path> sortedFilesQueue;
+ private final Path sortWorkDir;
private final byte[] copyBuffer = new byte[4096];
private long entriesProcessed = 0;
private long batchesProcessed = 0;
-    public PipelinedSortBatchTask(File storeDir,
+    public PipelinedSortBatchTask(Path storeDir,
                                  PathElementComparator pathComparator,
                                  Compression algorithm,
                                  BlockingQueue<NodeStateEntryBatch> emptyBuffersQueue,
                                  BlockingQueue<NodeStateEntryBatch> nonEmptyBuffersQueue,
-                                  BlockingQueue<File> sortedFilesQueue) throws IOException {
+                                  BlockingQueue<Path> sortedFilesQueue) throws IOException {
        this.pathComparator = (e1, e2) -> pathComparator.compare(e1.getPathElements(), e2.getPathElements());
this.algorithm = algorithm;
this.emptyBuffersQueue = emptyBuffersQueue;
@@ -127,7 +128,7 @@ class PipelinedSortBatchTask implements Callable<PipelinedSortBatchTask.Result>
sortBuffer.sort(pathComparator);
LOG.info("Sorted batch in {}. Saving to disk", sortClock);
Stopwatch saveClock = Stopwatch.createStarted();
-        File newtmpfile = File.createTempFile("sortInBatch", "flatfile", sortWorkDir);
+        Path newtmpfile = Files.createTempFile(sortWorkDir, "sortInBatch", "flatfile");
long textSize = 0;
batchesProcessed++;
        try (BufferedOutputStream writer = createOutputStream(newtmpfile, algorithm)) {
@@ -146,20 +147,20 @@ class PipelinedSortBatchTask implements Callable<PipelinedSortBatchTask.Result>
writer.write(copyBuffer, 0, bytesRead);
bytesRemaining -= bytesRead;
}
- writer.write('\n');
+ writer.write(PipelinedStrategy.FLATFILESTORE_LINE_SEPARATOR);
textSize += entrySize + 1;
}
}
        LOG.info("Stored batch of size {} (uncompressed {}) with {} entries in {}",
-                humanReadableByteCountBin(newtmpfile.length()),
+                humanReadableByteCountBin(Files.size(newtmpfile)),
                humanReadableByteCountBin(textSize),
                sortBuffer.size(), saveClock);
sortedFilesQueue.put(newtmpfile);
}
- private static File createdSortWorkDir(File storeDir) throws IOException {
- File sortedFileDir = new File(storeDir, "sort-work-dir");
- FileUtils.forceMkdir(sortedFileDir);
+ private static Path createdSortWorkDir(Path storeDir) throws IOException {
+ Path sortedFileDir = storeDir.resolve("sort-work-dir");
+ FileUtils.forceMkdir(sortedFileDir.toFile());
return sortedFileDir;
}
}
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
index 5f343601a8..b7f1a8dc25 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
@@ -24,6 +24,7 @@ import org.apache.jackrabbit.guava.common.base.Preconditions;
import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.guava.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.jackrabbit.oak.commons.Compression;
+import org.apache.jackrabbit.oak.commons.IOUtils;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.NodeStateEntryWriter;
import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreSortStrategyBase;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
@@ -42,6 +43,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
@@ -127,8 +131,9 @@ public class PipelinedStrategy extends IndexStoreSortStrategyBase {
static final NodeDocument[] SENTINEL_MONGO_DOCUMENT = new NodeDocument[0];
    static final NodeStateEntryBatch SENTINEL_NSE_BUFFER = new NodeStateEntryBatch(ByteBuffer.allocate(0), 0);
-    static final File SENTINEL_SORTED_FILES_QUEUE = new File("SENTINEL");
+    static final Path SENTINEL_SORTED_FILES_QUEUE = Paths.get("SENTINEL");
    static final Charset FLATFILESTORE_CHARSET = StandardCharsets.UTF_8;
+    static final char FLATFILESTORE_LINE_SEPARATOR = '\n';
    private static final Logger LOG = LoggerFactory.getLogger(PipelinedStrategy.class);
    // A MongoDB document is at most 16 MB, so the buffer that holds node state entries must be at least that big
@@ -142,13 +147,13 @@ public class PipelinedStrategy extends IndexStoreSortStrategyBase {
        private final ArrayBlockingQueue<T[]> mongoDocQueue;
        private final ArrayBlockingQueue<NodeStateEntryBatch> emptyBatchesQueue;
        private final ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBatchesQueue;
-        private final ArrayBlockingQueue<File> sortedFilesQueue;
+        private final ArrayBlockingQueue<Path> sortedFilesQueue;
        private final TransformStageStatistics transformStageStatistics;
        public MonitorTask(ArrayBlockingQueue<T[]> mongoDocQueue,
                           ArrayBlockingQueue<NodeStateEntryBatch> emptyBatchesQueue,
                           ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBatchesQueue,
-                           ArrayBlockingQueue<File> sortedFilesQueue,
+                           ArrayBlockingQueue<Path> sortedFilesQueue,
                           TransformStageStatistics transformStageStatistics) {
this.mongoDocQueue = mongoDocQueue;
this.emptyBatchesQueue = emptyBatchesQueue;
@@ -170,7 +175,7 @@ public class PipelinedStrategy extends IndexStoreSortStrategyBase {
    private static <T> void printStatistics(ArrayBlockingQueue<T[]> mongoDocQueue,
                                            ArrayBlockingQueue<NodeStateEntryBatch> emptyBuffersQueue,
                                            ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBuffersQueue,
-                                            ArrayBlockingQueue<File> sortedFilesQueue,
+                                            ArrayBlockingQueue<Path> sortedFilesQueue,
                                            TransformStageStatistics transformStageStatistics,
                                            boolean printHistogramsAtInfo) {
@@ -299,10 +304,10 @@ public class PipelinedStrategy extends IndexStoreSortStrategyBase {
                mongoDocQueueReservedMemoryMB,
                mongoDocBatchMaxSizeMB,
                mongoDocQueueSize);
-        LOG.info("NodeStateEntryBuffers: [ workingMemory: {} MB, numberOfBuffers: {}, bufferSize: {} MB, maxEntriesPerBuffer: {} ]",
+        LOG.info("NodeStateEntryBuffers: [ workingMemory: {} MB, numberOfBuffers: {}, bufferSize: {}, maxEntriesPerBuffer: {} ]",
                nseBuffersReservedMemoryMB,
                nseBuffersCount,
-                nseBuffersSizeBytes / FileUtils.ONE_MB,
+                IOUtils.humanReadableByteCountBin(nseBuffersSizeBytes),
                nseBufferMaxEntriesPerBuffer);
    }
@@ -368,7 +373,7 @@ public class PipelinedStrategy extends IndexStoreSortStrategyBase {
        ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBatchesQueue = new ArrayBlockingQueue<>(nseBuffersCount + 1);
        // Queue between sort-and-save thread and the merge-sorted-files thread
-        ArrayBlockingQueue<File> sortedFilesQueue = new ArrayBlockingQueue<>(64);
+        ArrayBlockingQueue<Path> sortedFilesQueue = new ArrayBlockingQueue<>(64);
        TransformStageStatistics transformStageStatistics = new TransformStageStatistics();
@@ -393,7 +398,7 @@ public class PipelinedStrategy extends IndexStoreSortStrategyBase {
        );
        ecs.submit(downloadTask);
-        File flatFileStore = null;
+        Path flatFileStore = null;
        for (int i = 0; i < numberOfTransformThreads; i++) {
            NodeStateEntryWriter entryWriter = new NodeStateEntryWriter(blobStore);
@@ -412,11 +417,11 @@ public class PipelinedStrategy extends IndexStoreSortStrategyBase {
        }
        PipelinedSortBatchTask sortTask = new PipelinedSortBatchTask(
-                this.getStoreDir(), pathComparator, this.getAlgorithm(), emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue
+                this.getStoreDir().toPath(), pathComparator, this.getAlgorithm(), emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue
        );
        ecs.submit(sortTask);
-        PipelinedMergeSortTask mergeSortTask = new PipelinedMergeSortTask(this.getStoreDir(), pathComparator,
+        PipelinedMergeSortTask mergeSortTask = new PipelinedMergeSortTask(this.getStoreDir().toPath(), pathComparator,
                this.getAlgorithm(), sortedFilesQueue);
        ecs.submit(mergeSortTask);
@@ -436,6 +441,7 @@ public class PipelinedStrategy extends IndexStoreSortStrategyBase {
                        for (int i = 0; i < numberOfTransformThreads; i++) {
                            mongoDocQueue.put(SENTINEL_MONGO_DOCUMENT);
                        }
+                        mergeSortTask.stopEagerMerging();
                    } else if (result instanceof PipelinedTransformTask.Result) {
                        PipelinedTransformTask.Result transformResult = (PipelinedTransformTask.Result) result;
@@ -459,8 +465,8 @@ public class PipelinedStrategy extends IndexStoreSortStrategyBase {
                    } else if (result instanceof PipelinedMergeSortTask.Result) {
                        PipelinedMergeSortTask.Result mergeSortedFilesTask = (PipelinedMergeSortTask.Result) result;
-                        File ffs = mergeSortedFilesTask.getFlatFileStoreFile();
-                        LOG.info("Merge-sort task finished. FFS: {}, Size: {}", ffs, humanReadableByteCountBin(ffs.length()));
+                        Path ffs = mergeSortedFilesTask.getFlatFileStoreFile();
+                        LOG.info("Merge-sort task finished. FFS: {}, Size: {}", ffs, humanReadableByteCountBin(Files.size(ffs)));
                        flatFileStore = mergeSortedFilesTask.getFlatFileStoreFile();
                    } else {
@@ -497,7 +503,7 @@ public class PipelinedStrategy extends IndexStoreSortStrategyBase {
            // No longer need to monitor the size of the queues,
            monitorFuture.cancel(true);
        }
-        return flatFileStore;
+        return flatFileStore.toFile();
} finally {
threadPool.shutdown();
monitorThreadPool.shutdown();
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/indexstore/IndexStoreUtils.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/indexstore/IndexStoreUtils.java
index 8dea97d36e..b901621887 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/indexstore/IndexStoreUtils.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/indexstore/IndexStoreUtils.java
@@ -33,6 +33,8 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.List;
import static org.apache.jackrabbit.guava.common.base.Preconditions.checkState;
@@ -91,6 +93,11 @@ public class IndexStoreUtils {
return new BufferedOutputStream(algorithm.getOutputStream(out));
}
+    public static BufferedOutputStream createOutputStream(Path file, Compression algorithm) throws IOException {
+ OutputStream out = Files.newOutputStream(file);
+ return new BufferedOutputStream(algorithm.getOutputStream(out));
+ }
+
public static long sizeOf(List<File> sortedFiles) {
return sortedFiles.stream().mapToLong(File::length).sum();
}
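
A usage sketch for the Path-based overload added above; the temp-file setup
and the choice of Compression.NONE are illustrative assumptions, only the
createOutputStream(Path, Compression) signature comes from this hunk.

    import java.io.BufferedOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import org.apache.jackrabbit.oak.commons.Compression;
    import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreUtils;

    public class CreateOutputStreamSketch {
        public static void main(String[] args) throws IOException {
            Path out = Files.createTempFile("ffs-", ".json");
            // The Compression argument decides whether the raw stream is wrapped
            // in a compressor; NONE yields a plain buffered stream.
            try (BufferedOutputStream bos = IndexStoreUtils.createOutputStream(out, Compression.NONE)) {
                bos.write("/a|{}\n".getBytes(StandardCharsets.UTF_8));
            }
        }
    }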
diff --git
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedIT.java
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedIT.java
index 84b86a6089..e1356a6d7d 100644
---
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedIT.java
+++
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedIT.java
@@ -41,6 +41,7 @@ import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Assume;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
@@ -60,6 +61,7 @@ import java.util.function.Predicate;
import static
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedMongoDownloadTask.OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING;
import static
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedMongoDownloadTask.OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@@ -89,7 +91,7 @@ public class PipelinedIT {
}
}
- @After
+ @After @Before
public void tear() {
MongoConnection c = connectionFactory.getConnection();
if (c != null) {
@@ -248,8 +250,8 @@ public class PipelinedIT {
System.setProperty(PipelinedStrategy.OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_MAX_SIZE_MB,
"1");
System.setProperty(PipelinedStrategy.OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_RESERVED_MEMORY_MB,
"32");
- Predicate<String> pathPredicate = s -> true;
- List<PathFilter> pathFilters = List.of(contentDamPathFilter);
+ Predicate<String> pathPredicate = s -> contentDamPathFilter.filter(s)
!= PathFilter.Result.EXCLUDE;
+ List<PathFilter> pathFilters = null;
Backend rwStore = createNodeStore(false);
@NotNull NodeBuilder rootBuilder =
rwStore.documentNodeStore.getRoot().builder();
@@ -278,7 +280,7 @@ public class PipelinedIT {
File file = pipelinedStrategy.createSortedStoreFile();
assertTrue(file.exists());
- assertEquals(expected, Files.readAllLines(file.toPath()));
+ assertArrayEquals(expected.toArray(new String[0]),
Files.readAllLines(file.toPath()).toArray(new String[0]));
}
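
The hunk above derives the path predicate from a PathFilter instead of
accepting everything. A small sketch of that adaptation; the constructor
arguments here are hypothetical examples, not the test's actual filter.

    import java.util.List;
    import java.util.function.Predicate;
    import org.apache.jackrabbit.oak.spi.filter.PathFilter;

    public class PathFilterPredicateSketch {
        public static void main(String[] args) {
            // Hypothetical filter: include /content/dam, exclude nothing.
            PathFilter filter = new PathFilter(List.of("/content/dam"), List.of());
            // Same adaptation as in the hunk: anything the filter does not
            // explicitly exclude passes (both INCLUDE and TRAVERSE results pass).
            Predicate<String> pathPredicate = s -> filter.filter(s) != PathFilter.Result.EXCLUDE;
            System.out.println(pathPredicate.test("/content/dam/a")); // true
            System.out.println(pathPredicate.test("/var/foo"));       // false, outside the include tree
        }
    }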
diff --git
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTaskParameterizedTest.java
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTaskParameterizedTest.java
new file mode 100644
index 0000000000..18b8fef779
--- /dev/null
+++
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTaskParameterizedTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
+
+import org.apache.jackrabbit.oak.commons.Compression;
+import
org.apache.jackrabbit.oak.index.indexer.document.flatfile.LZ4Compression;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy.FLATFILESTORE_CHARSET;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class PipelinedMergeSortTaskParameterizedTest extends
PipelinedMergeSortTaskTestBase {
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> data() {
+ // numberOfIntermediateFiles, eagerMergeTriggerThreshold
+ return Arrays.asList(new Object[][]{
+ {256, 50},
+ {256, 300}
+ });
+ }
+
+ private final int numberOfIntermediateFiles;
+ private final int eagerMergeTriggerThreshold;
+
+ public PipelinedMergeSortTaskParameterizedTest(int
numberOfIntermediateFiles, int eagerMergeTriggerThreshold) {
+ this.numberOfIntermediateFiles = numberOfIntermediateFiles;
+ this.eagerMergeTriggerThreshold = eagerMergeTriggerThreshold;
+ }
+
+ @Test
+ public void manyFilesToMerge() throws Exception {
+ log.info("Running with intermediateFiles: {},
eagerMergeTriggerThreshold: {}", numberOfIntermediateFiles,
eagerMergeTriggerThreshold);
+
System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_TRIGGER_THRESHOLD,
Integer.toString(eagerMergeTriggerThreshold));
+
System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_FILES_TO_MERGE,
"32");
+
System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_SIZE_TO_MERGE_MB,
"512");
+
System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_MIN_FILES_TO_MERGE,
"8");
+
+ // Generate FFS
+ List<String> ffs = generateFFS(LINES_IN_FFS);
+        // Shuffle entries to simulate retrieving from MongoDB in an arbitrary order
+        Collections.shuffle(ffs);
+
+        // Generate the expected results by sorting with the node state entries comparator
+        List<NodeStateHolder> nodesOrdered = sortAsNodeStateEntries(ffs);
+        // Convert back to a list of Strings, using the FFS charset explicitly
+        String[] expectedFFS = nodesOrdered.stream().map(f -> new String(f.getLine(), FLATFILESTORE_CHARSET)).toArray(String[]::new);
+
+ // Write intermediate files
+ List<Path> intermediateFiles = createIntermediateFiles(ffs,
this.numberOfIntermediateFiles);
+
+ // Run test
+ PipelinedMergeSortTask.Result result =
runTestLargeFiles(Compression.NONE, intermediateFiles.toArray(new Path[0]));
+ Path resultFile = result.getFlatFileStoreFile();
+
+ assertEquals(this.numberOfIntermediateFiles,
result.getIntermediateFilesCount());
+ if (this.numberOfIntermediateFiles > eagerMergeTriggerThreshold) {
+ assertTrue(result.getEagerMergeRuns() > 0);
+ assertTrue(result.getFinalMergeFilesCount() <
eagerMergeTriggerThreshold);
+ } else {
+ assertEquals(0, result.getEagerMergeRuns());
+ assertEquals(this.numberOfIntermediateFiles,
result.getFinalMergeFilesCount());
+ }
+ // Verify result
+ List<String> actualFFS = Files.readAllLines(resultFile);
+ assertArrayEquals(expectedFFS, actualFFS.toArray(new String[0]));
+ }
+
+    /**
+     * For manual testing only: point {@code dirWithFilesToMerge} below at a local
+     * directory containing intermediate FFS files, then add {@code @Test} to run it.
+     */
+ @Ignore
+ public void manyFilesToMergeManual() throws Exception {
+        System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_TRIGGER_THRESHOLD, "50");
+        System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_FILES_TO_MERGE, "32");
+        System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_SIZE_TO_MERGE_MB, "512");
+        System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_MIN_FILES_TO_MERGE, "8");
+        Path dirWithFilesToMerge = Paths.get("/path/to/ffs/intermediate/files");
+ Path[] files = Files.list(dirWithFilesToMerge)
+ .filter(Files::isRegularFile)
+ .toArray(Path[]::new);
+
+ PipelinedMergeSortTask.Result result = runTestLargeFiles(new
LZ4Compression(), files);
+ Path resultFile = result.getFlatFileStoreFile();
+ log.info("Result: {}\n{}", resultFile, Files.readString(resultFile,
FLATFILESTORE_CHARSET));
+ }
+}
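
For context on what the parameterized test above exercises: once the number of
accumulated intermediate files reaches the trigger threshold, the merge task
eagerly merges a bounded batch so that the final merge's fan-in stays below
the threshold. The selection sketch below is illustrative only; the names and
the smallest-files-first heuristic are assumptions, not the shipped logic.

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Comparator;
    import java.util.List;
    import java.util.stream.Collectors;

    public class EagerMergeSelectionSketch {
        // Return the files to merge eagerly, or an empty list if merging should
        // wait. The thresholds mirror the system properties set in the test above.
        static List<Path> candidates(List<Path> intermediate,
                                     int triggerThreshold,
                                     int minFilesToMerge,
                                     int maxFilesToMerge) {
            if (intermediate.size() < triggerThreshold || intermediate.size() < minFilesToMerge) {
                return List.of(); // not enough files yet; defer to the final merge
            }
            // Assumed heuristic: merge the smallest files first to bound I/O cost.
            return intermediate.stream()
                    .sorted(Comparator.comparingLong(EagerMergeSelectionSketch::sizeOf))
                    .limit(maxFilesToMerge)
                    .collect(Collectors.toList());
        }

        private static long sizeOf(Path p) {
            try {
                return Files.size(p);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }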
diff --git
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTaskTest.java
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTaskTest.java
index 5275474457..226ae89a98 100644
---
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTaskTest.java
+++
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTaskTest.java
@@ -19,103 +19,143 @@
package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
import org.apache.jackrabbit.oak.commons.Compression;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.BufferedWriter;
-import java.io.File;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.Set;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy.FLATFILESTORE_CHARSET;
import static
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy.SENTINEL_SORTED_FILES_QUEUE;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-public class PipelinedMergeSortTaskTest {
- private static final Logger LOG =
LoggerFactory.getLogger(PipelinedMergeSortTaskTest.class);
+public class PipelinedMergeSortTaskTest extends PipelinedMergeSortTaskTestBase
{
private final ClassLoader classLoader = getClass().getClassLoader();
- private final PathElementComparator pathComparator = new
PathElementComparator(Set.of());
private final Compression algorithm = Compression.NONE;
- @Rule
- public TemporaryFolder sortFolder = new TemporaryFolder();
@Test
public void noFileToMerge() throws Exception {
- PipelinedMergeSortTask.Result result = runTest();
- Path resultFile = result.getFlatFileStoreFile().toPath();
+ PipelinedMergeSortTask.Result result = runTest(algorithm);
+ Path resultFile = result.getFlatFileStoreFile();
assertEquals(0, Files.size(resultFile));
}
@Test
public void oneFileToMerge() throws Exception {
- File singleFileToMerge = getTestFile("pipelined/merge-stage-1.json");
- PipelinedMergeSortTask.Result result = runTest(singleFileToMerge);
- Path resultFile = result.getFlatFileStoreFile().toPath();
- assertEquals(Files.readAllLines(singleFileToMerge.toPath(),
FLATFILESTORE_CHARSET), Files.readAllLines(resultFile, FLATFILESTORE_CHARSET));
+ Path singleFileToMerge = getTestFile("pipelined/merge-stage-1.json");
+ PipelinedMergeSortTask.Result result = runTest(algorithm,
singleFileToMerge);
+ Path resultFile = result.getFlatFileStoreFile();
+ assertEquals(Files.readAllLines(singleFileToMerge,
FLATFILESTORE_CHARSET), Files.readAllLines(resultFile, FLATFILESTORE_CHARSET));
}
@Test
public void twoFilesToMerge() throws Exception {
- File merge1 = getTestFile("pipelined/merge-stage-1.json");
- File merge2 = getTestFile("pipelined/merge-stage-2.json");
- File expected = getTestFile("pipelined/merge-expected.json");
-
- PipelinedMergeSortTask.Result result = runTest(merge1, merge2);
- Path resultFile = result.getFlatFileStoreFile().toPath();
- LOG.info("Result: {}\n{}", resultFile, Files.readString(resultFile,
FLATFILESTORE_CHARSET));
- assertEquals(Files.readAllLines(expected.toPath(),
FLATFILESTORE_CHARSET), Files.readAllLines(resultFile, FLATFILESTORE_CHARSET));
+ Path merge1 = getTestFile("pipelined/merge-stage-1.json");
+ Path merge2 = getTestFile("pipelined/merge-stage-2.json");
+ Path expected = getTestFile("pipelined/merge-expected.json");
+
+ PipelinedMergeSortTask.Result result = runTest(algorithm, merge1,
merge2);
+ Path resultFile = result.getFlatFileStoreFile();
+ log.info("Result: {}\n{}", resultFile, Files.readString(resultFile,
FLATFILESTORE_CHARSET));
+ assertEquals(Files.readAllLines(expected, FLATFILESTORE_CHARSET),
Files.readAllLines(resultFile, FLATFILESTORE_CHARSET));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void invalidReadBufferSize() throws Exception {
+        System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EXTERNAL_MERGE_READ_BUFFER_SIZE, "10");
+ Path singleFileToMerge = getTestFile("pipelined/merge-stage-1.json");
+ runTest(algorithm, singleFileToMerge);
}
- private File getTestFile(String name) {
+ private Path getTestFile(String name) {
URL url = classLoader.getResource(name);
        if (url == null) throw new IllegalArgumentException("Test file not found: " + name);
- return new File(url.getPath());
+ return Paths.get(url.getPath());
}
- private PipelinedMergeSortTask.Result runTest(File... files) throws
Exception {
- File sortRoot = sortFolder.getRoot();
+ private PipelinedMergeSortTask.Result runTest(Compression algorithm,
Path... files) throws Exception {
+ Path sortRoot = sortFolder.getRoot().toPath();
// +1 for the Sentinel.
- ArrayBlockingQueue<File> sortedFilesQueue = new
ArrayBlockingQueue<>(files.length + 1);
+ ArrayBlockingQueue<Path> sortedFilesQueue = new
ArrayBlockingQueue<>(files.length + 1);
PipelinedMergeSortTask mergeSortTask = new
PipelinedMergeSortTask(sortRoot, pathComparator, algorithm, sortedFilesQueue);
// Enqueue all the files that are to be merged
- for (File file : files) {
+ for (Path file : files) {
// The intermediate files are deleted after being merged, so we
should copy them to the temporary sort root folder
- Path workDirCopy = Files.copy(file.toPath(),
sortRoot.toPath().resolve(file.getName()));
- sortedFilesQueue.put(workDirCopy.toFile());
+ Path workDirCopy = Files.copy(file,
sortRoot.resolve(file.getFileName()));
+ sortedFilesQueue.put(workDirCopy);
}
// Signal end of files to merge
sortedFilesQueue.put(SENTINEL_SORTED_FILES_QUEUE);
// Run the merge task
PipelinedMergeSortTask.Result result = mergeSortTask.call();
- File[] filesInWorkDir = sortRoot.listFiles();
- if (filesInWorkDir == null)
- throw new IllegalStateException("The sort work directory is not a
directory: " + sortRoot);
- assertEquals("The sort work directory should contain only the flat
file store, the intermediate files should have been deleted after merged.
Instead it contains: " + Arrays.toString(filesInWorkDir),
- 1, filesInWorkDir.length);
- assertTrue(result.getFlatFileStoreFile().exists());
+
+ try (Stream<Path> fileStream = Files.list(sortRoot)) {
+ List<String> filesInWorkDir = fileStream
+ .map(path -> path.getFileName().toString())
+ .collect(Collectors.toList());
+ assertEquals("The sort work directory should contain only the flat
file store, the intermediate files should have been deleted after merged.
Instead it contains: " + filesInWorkDir,
+ 1, filesInWorkDir.size());
+ }
+ assertTrue(Files.exists(result.getFlatFileStoreFile()));
return result;
}
@Test(expected = IllegalStateException.class)
public void badInputFile() throws Exception {
- File singleFileToMerge = createFileWithWrongFormat();
- runTest(singleFileToMerge);
+ Path singleFileToMerge = createFileWithWrongFormat();
+ runTest(algorithm, singleFileToMerge);
}
- private File createFileWithWrongFormat() throws Exception {
- File file = Files.createTempFile("merge-stage-input",
".json").toFile();
- try (BufferedWriter bw = Files.newBufferedWriter(file.toPath(),
FLATFILESTORE_CHARSET)) {
+ private Path createFileWithWrongFormat() throws Exception {
+ Path file = Files.createTempFile(sortFolder.getRoot().toPath(),
"merge-stage-input", ".json");
+ try (BufferedWriter bw = Files.newBufferedWriter(file,
FLATFILESTORE_CHARSET)) {
bw.write("/a/b/c\n");
}
- file.deleteOnExit();
return file;
}
+
+ @Test
+ public void manyFilesToMergeDidNotMerge() throws Exception {
+ int intermediateFilesCount = 256;
+        System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_TRIGGER_THRESHOLD, "20");
+        System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_FILES_TO_MERGE, "1000");
+        System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_MAX_SIZE_TO_MERGE_MB, "1");
+        System.setProperty(PipelinedMergeSortTask.OAK_INDEXER_PIPELINED_EAGER_MERGE_MIN_FILES_TO_MERGE, "1000");
+
+ // Generate FFS
+ List<String> ffs = generateFFS(LINES_IN_FFS);
+        // Shuffle entries to simulate retrieving from MongoDB in an arbitrary order
+        Collections.shuffle(ffs);
+
+        // Generate the expected results by sorting with the node state entries comparator
+ List<NodeStateHolder> nodesOrdered = sortAsNodeStateEntries(ffs);
+ // Convert back to a list of Strings to use as expected result
+ String[] expectedFFS = nodesOrdered.stream()
+ .map(f -> new String(f.getLine(), FLATFILESTORE_CHARSET))
+ .toArray(String[]::new);
+
+ // Write intermediate files
+ List<Path> intermediateFiles = createIntermediateFiles(ffs,
intermediateFilesCount);
+
+ // Run test
+ PipelinedMergeSortTask.Result result =
runTestLargeFiles(Compression.NONE, intermediateFiles.toArray(new Path[0]));
+ Path resultFile = result.getFlatFileStoreFile();
+
+ assertEquals(intermediateFilesCount,
result.getIntermediateFilesCount());
+ assertEquals(0, result.getEagerMergeRuns());
+ assertEquals(intermediateFilesCount, result.getFinalMergeFilesCount());
+
+ // Verify result
+ List<String> actualFFS = Files.readAllLines(resultFile);
+ assertArrayEquals(expectedFFS, actualFFS.toArray(new String[0]));
+ }
}
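
On the invalidReadBufferSize test above: it sets the
"oak.indexer.pipelined.externalMerge.readBufferSize" property (default 16KB,
per the commit message) to 10 bytes and expects an IllegalArgumentException.
A sketch of the kind of up-front validation that implies; the minimum value
and method name here are assumptions, not the shipped parsing code.

    public class ReadBufferSizeSketch {
        static int validateReadBufferSize(int configuredBytes) {
            int minimumBytes = 1024; // hypothetical lower bound
            if (configuredBytes < minimumBytes) {
                throw new IllegalArgumentException(
                        "Read buffer too small: " + configuredBytes + " bytes (minimum: " + minimumBytes + ")");
            }
            return configuredBytes;
        }

        public static void main(String[] args) {
            System.out.println(validateReadBufferSize(16 * 1024)); // default-sized buffer passes
            validateReadBufferSize(10); // throws IllegalArgumentException
        }
    }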
diff --git
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTaskTestBase.java
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTaskTestBase.java
new file mode 100644
index 0000000000..b135ccf195
--- /dev/null
+++
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTaskTestBase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
+
+import org.apache.jackrabbit.oak.commons.Compression;
+import org.junit.Rule;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy.FLATFILESTORE_CHARSET;
+import static
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy.SENTINEL_SORTED_FILES_QUEUE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class PipelinedMergeSortTaskTestBase {
+ static final int LINES_IN_FFS = 100000;
+ static final PathElementComparator pathComparator = new
PathElementComparator(Set.of());
+
+ protected final Logger log = LoggerFactory.getLogger(this.getClass());
+ @Rule
+ public TemporaryFolder sortFolder = new TemporaryFolder();
+ @Rule
+ public final RestoreSystemProperties restoreSystemProperties = new
RestoreSystemProperties();
+
+
+ protected List<NodeStateHolder> sortAsNodeStateEntries(List<String>
ffsLines) {
+ Comparator<NodeStateHolder> comparatorBinary = (e1, e2) ->
pathComparator.compare(e1.getPathElements(), e2.getPathElements());
+ NodeStateHolderFactory nodeFactory = new NodeStateHolderFactory();
+ List<NodeStateHolder> nodesOrdered = ffsLines.stream()
+ .map(ffsLine ->
nodeFactory.apply(ffsLine.getBytes(FLATFILESTORE_CHARSET)))
+ .sorted(comparatorBinary)
+ .collect(Collectors.toList());
+ return nodesOrdered;
+ }
+
+
+ protected List<String> generateFFS(int numberOfLines) {
+ List<String> ffsLines = new ArrayList<>(numberOfLines);
+ for (int i = 0; i < numberOfLines; i++) {
+ String path =
"/a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z/" + i;
+ String entry = "{\"_id\":\"" + path +
"\",\"property\":[{\"name\":\"jcr:primaryType\",\"values\":[\"nt:unstructured\"]}]}";
+ ffsLines.add(path + "|" + entry);
+ }
+ return ffsLines;
+ }
+
+ protected List<Path> createIntermediateFiles(List<String> ffsLines, int
numberOfFiles) throws Exception {
+ Iterator<String> ffsIter = ffsLines.iterator();
+ Path workFolder = sortFolder.newFolder("merge_many_test").toPath();
+ ArrayList<Path> intermediateFiles = new ArrayList<>(numberOfFiles);
+ int linesPerFile = ffsLines.size() / numberOfFiles;
+
+ for (int fileIdx = 0; fileIdx < numberOfFiles; fileIdx++) {
+ Path intermediateFile = workFolder.resolve("intermediate-" +
fileIdx + ".json");
+ ArrayList<String> linesInIntermediateFile = new ArrayList<>();
+ while (linesInIntermediateFile.size() < linesPerFile &&
ffsIter.hasNext()) {
+ linesInIntermediateFile.add(ffsIter.next());
+ }
+ if (fileIdx == numberOfFiles - 1) {
+ // Add the remaining elements to the last file
+ while (ffsIter.hasNext()) {
+ linesInIntermediateFile.add(ffsIter.next());
+ }
+ }
+ List<NodeStateHolder> nodesSorted =
sortAsNodeStateEntries(linesInIntermediateFile);
+ try (BufferedWriter bw = Files.newBufferedWriter(intermediateFile,
FLATFILESTORE_CHARSET)) {
+ for (NodeStateHolder node : nodesSorted) {
+                    bw.write(new String(node.getLine(), FLATFILESTORE_CHARSET));
+ bw.write("\n");
+ }
+ }
+ intermediateFiles.add(intermediateFile);
+ }
+ return intermediateFiles;
+ }
+
+ protected PipelinedMergeSortTask.Result runTestLargeFiles(Compression
algorithm, Path... files) throws Exception {
+ Path sortRoot = sortFolder.getRoot().toPath();
+ // +1 for the Sentinel.
+ ArrayBlockingQueue<Path> sortedFilesQueue = new
ArrayBlockingQueue<>(files.length + 1);
+ PipelinedMergeSortTask mergeSortTask = new
PipelinedMergeSortTask(sortRoot, pathComparator, algorithm, sortedFilesQueue);
+ // Enqueue all the files that are to be merged
+ for (Path file : files) {
+ sortedFilesQueue.put(file);
+ }
+ // Signal end of files to merge
+ sortedFilesQueue.put(SENTINEL_SORTED_FILES_QUEUE);
+ // Run the merge task
+ PipelinedMergeSortTask.Result result = mergeSortTask.call();
+ List<Path> filesInWorkDir;
+ try (Stream<Path> stream = Files.list(sortRoot)) {
+ filesInWorkDir =
stream.filter(Files::isRegularFile).collect(Collectors.toList());
+ }
+ assertEquals("The sort work directory should contain only the flat
file store, the intermediate files should have been deleted after merged.
Instead it contains: " + filesInWorkDir,
+ 1, filesInWorkDir.size());
+ assertTrue(Files.exists(result.getFlatFileStoreFile()));
+ return result;
+ }
+}
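
The test base above compares lines as byte arrays, which matches the commit's
stated optimization of merging without a binary-to-UTF-8 round trip. A
conceptual sketch of that k-way merge idea follows; it is illustrative, not
the shipped ExternalSortByteArray code, and all names in it are assumptions.

    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class ByteArrayMergeSketch {

        private static final class Head {
            byte[] line;
            final Iterator<byte[]> source;
            Head(byte[] line, Iterator<byte[]> source) { this.line = line; this.source = source; }
        }

        // Merge k sorted inputs; lines stay as byte[] from read to write, so no
        // UTF-8 decode/encode happens anywhere in the merge loop.
        static void merge(List<Iterator<byte[]>> inputs, Comparator<byte[]> cmp, OutputStream out)
                throws IOException {
            PriorityQueue<Head> heads = new PriorityQueue<>((a, b) -> cmp.compare(a.line, b.line));
            for (Iterator<byte[]> it : inputs) {
                if (it.hasNext()) heads.add(new Head(it.next(), it));
            }
            while (!heads.isEmpty()) {
                Head smallest = heads.poll();
                out.write(smallest.line);  // raw bytes, no String conversion
                out.write('\n');
                if (smallest.source.hasNext()) {
                    smallest.line = smallest.source.next();
                    heads.add(smallest);   // reinsert with its next line
                }
            }
        }

        public static void main(String[] args) throws IOException {
            Comparator<byte[]> lexicographic = Arrays::compare;
            List<Iterator<byte[]>> inputs = List.of(
                    List.of("/a|1".getBytes(StandardCharsets.UTF_8),
                            "/c|3".getBytes(StandardCharsets.UTF_8)).iterator(),
                    List.of("/b|2".getBytes(StandardCharsets.UTF_8)).iterator());
            merge(inputs, lexicographic, System.out); // prints /a|1, /b|2, /c|3 in order
        }
    }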
diff --git
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
index 7665929e28..892b7a3c10 100644
---
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
+++
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
@@ -28,11 +28,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
-import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -42,9 +42,9 @@ import static org.junit.Assert.assertEquals;
public class PipelinedSortBatchTaskTest {
static class TestResult {
final PipelinedSortBatchTask.Result result;
- final BlockingQueue<File> sortedFilesQueue;
+ final BlockingQueue<Path> sortedFilesQueue;
- public TestResult(PipelinedSortBatchTask.Result result,
BlockingQueue<File> sortedFilesQueue) {
+ public TestResult(PipelinedSortBatchTask.Result result,
BlockingQueue<Path> sortedFilesQueue) {
this.result = result;
this.sortedFilesQueue = sortedFilesQueue;
}
@@ -53,10 +53,11 @@ public class PipelinedSortBatchTaskTest {
return result;
}
- public BlockingQueue<File> getSortedFilesQueue() {
+ public BlockingQueue<Path> getSortedFilesQueue() {
return sortedFilesQueue;
}
}
+
private static final Logger LOG =
LoggerFactory.getLogger(PipelinedMergeSortTaskTest.class);
@Rule
@@ -72,7 +73,7 @@ public class PipelinedSortBatchTaskTest {
TestResult testResult = runTest();
PipelinedSortBatchTask.Result result = testResult.getResult();
- BlockingQueue<File> sortedFilesQueue =
testResult.getSortedFilesQueue();
+ BlockingQueue<Path> sortedFilesQueue =
testResult.getSortedFilesQueue();
assertEquals(0, result.getTotalEntries());
assertEquals(0, sortedFilesQueue.size());
}
@@ -84,7 +85,7 @@ public class PipelinedSortBatchTaskTest {
TestResult testResult = runTest(batch);
PipelinedSortBatchTask.Result result = testResult.getResult();
- BlockingQueue<File> sortedFilesQueue =
testResult.getSortedFilesQueue();
+ BlockingQueue<Path> sortedFilesQueue =
testResult.getSortedFilesQueue();
assertEquals(0, result.getTotalEntries());
assertEquals(0, sortedFilesQueue.size());
}
@@ -102,19 +103,19 @@ public class PipelinedSortBatchTaskTest {
TestResult testResult = runTest(batch);
PipelinedSortBatchTask.Result result = testResult.getResult();
- BlockingQueue<File> sortedFilesQueue =
testResult.getSortedFilesQueue();
+ BlockingQueue<Path> sortedFilesQueue =
testResult.getSortedFilesQueue();
assertEquals(6, result.getTotalEntries());
assertEquals(1, sortedFilesQueue.size());
- File sortedFile = sortedFilesQueue.take();
- LOG.info("Sorted file:\n{}", Files.readString(sortedFile.toPath()));
+ Path sortedFile = sortedFilesQueue.take();
+ LOG.info("Sorted file:\n{}", Files.readString(sortedFile));
assertEquals("/a0|{\"key\":1}\n" +
"/a0/b0|{\"key\":2}\n" +
"/a0/b0/c0|{\"key\":3}\n" +
"/a0/b0/c1|{\"key\":4}\n" +
"/a0/b1|{\"key\":5}\n" +
"/a1/b0|{\"key\":6}\n",
- Files.readString(sortedFile.toPath())
+ Files.readString(sortedFile)
);
}
@@ -133,23 +134,23 @@ public class PipelinedSortBatchTaskTest {
TestResult testResult = runTest(batch1, batch2);
PipelinedSortBatchTask.Result result = testResult.getResult();
- BlockingQueue<File> sortedFilesQueue =
testResult.getSortedFilesQueue();
+ BlockingQueue<Path> sortedFilesQueue =
testResult.getSortedFilesQueue();
assertEquals(6, result.getTotalEntries());
assertEquals(2, sortedFilesQueue.size());
- File sortedFile1 = sortedFilesQueue.take();
- File sortedFile2 = sortedFilesQueue.take();
+ Path sortedFile1 = sortedFilesQueue.take();
+ Path sortedFile2 = sortedFilesQueue.take();
- LOG.info("Sorted file:\n{}", Files.readString(sortedFile1.toPath()));
- LOG.info("Sorted file:\n{}", Files.readString(sortedFile2.toPath()));
+ LOG.info("Sorted file:\n{}", Files.readString(sortedFile1));
+ LOG.info("Sorted file:\n{}", Files.readString(sortedFile2));
assertEquals("/a0|{\"key\":1}\n" +
"/a0/b0|{\"key\":2}\n" +
"/a1/b0|{\"key\":6}\n",
- Files.readString(sortedFile1.toPath()));
+ Files.readString(sortedFile1));
assertEquals("/a0/b0/c0|{\"key\":3}\n" +
"/a0/b0/c1|{\"key\":4}\n" +
"/a0/b1|{\"key\":5}\n",
- Files.readString(sortedFile2.toPath())
+ Files.readString(sortedFile2)
);
}
@@ -162,11 +163,11 @@ public class PipelinedSortBatchTaskTest {
}
private TestResult runTest(NodeStateEntryBatch... nodeStateEntryBatches)
throws Exception {
- File sortRoot = sortFolder.getRoot();
+ Path sortRoot = sortFolder.getRoot().toPath();
int numberOfBuffers = nodeStateEntryBatches.length + 1; // +1 for the
sentinel
ArrayBlockingQueue<NodeStateEntryBatch> emptyBatchesQueue = new
ArrayBlockingQueue<>(numberOfBuffers);
ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBatchesQueue = new
ArrayBlockingQueue<>(numberOfBuffers);
- ArrayBlockingQueue<File> sortedFilesQueue = new
ArrayBlockingQueue<>(numberOfBuffers);
+ ArrayBlockingQueue<Path> sortedFilesQueue = new
ArrayBlockingQueue<>(numberOfBuffers);
for (NodeStateEntryBatch nodeStateEntryBatch : nodeStateEntryBatches) {
nonEmptyBatchesQueue.put(nodeStateEntryBatch);