This is an automated email from the ASF dual-hosted git repository.
thomasm pushed a commit to branch OAK-10341
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/OAK-10341 by this push:
new 5fb931c539 OAK-10341 TreeStore
5fb931c539 is described below
commit 5fb931c5395feac3541ad1fa3f4c05d61a6b66f7
Author: Thomas Mueller <[email protected]>
AuthorDate: Fri Jul 7 09:30:17 2023 +0200
OAK-10341 TreeStore
---
.../indexer/document/tree/store/AzureStore.java | 220 ++++++
.../indexer/document/tree/store/Compression.java | 98 +++
.../document/tree/store/DiskCacheStore.java | 226 ++++++
.../indexer/document/tree/store/FileStore.java | 279 +++++++
.../indexer/document/tree/store/LogStore.java | 122 +++
.../indexer/document/tree/store/MemoryStore.java | 99 +++
.../indexer/document/tree/store/PageFile.java | 376 ++++++++++
.../index/indexer/document/tree/store/Session.java | 829 +++++++++++++++++++++
.../indexer/document/tree/store/SlowStore.java | 140 ++++
.../indexer/document/tree/store/StatsStore.java | 198 +++++
.../index/indexer/document/tree/store/Store.java | 125 ++++
.../indexer/document/tree/store/StoreBuilder.java | 88 +++
.../indexer/document/tree/store/utils/Cache.java | 43 ++
.../document/tree/store/utils/Position.java | 30 +
.../document/tree/store/utils/SortedStream.java | 75 ++
.../indexer/document/tree/store/utils/Uuid.java | 181 +++++
16 files changed, 3129 insertions(+)
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/AzureStore.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/AzureStore.java
new file mode 100644
index 0000000000..31d90f0173
--- /dev/null
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/AzureStore.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.jackrabbit.oak.index.indexer.document.tree.store.utils.Uuid;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudAppendBlob;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.blob.ListBlobItem;
+
+/**
+ * A {@link Store} that keeps every page as a block blob inside one directory
+ * of an Azure Blob Storage container.
+ * <p>
+ * Configuration properties read by the constructor: {@code storageAccount}
+ * (the connection string), {@code container}, {@code directory} and,
+ * optionally, {@code maximumExecutionTimeInMs}.
+ * <p>
+ * The read / write counters are plain fields and are not synchronized.
+ */
+public class AzureStore implements Store {
+
+    // HTTP status code the service reports for a blob that does not exist
+    private static final int HTTP_NOT_FOUND = 404;
+
+    private final Properties config;
+    private final CloudStorageAccount cloud;
+    private final CloudBlobClient cloudBlobClient;
+    private final CloudBlobContainer container;
+    private final CloudBlobDirectory dir;
+    private Compression compression = Compression.NO;
+    private long writeCount;
+    private long readCount;
+
+    @Override
+    public String toString() {
+        return "azure";
+    }
+
+    /**
+     * Connect to the storage account and create the container if needed.
+     *
+     * @param config the configuration (see the class documentation)
+     * @throws IllegalArgumentException if the account, container or directory
+     *             can not be opened
+     */
+    public AzureStore(Properties config) {
+        this.config = config;
+        try {
+            cloud = CloudStorageAccount.parse(config.getProperty("storageAccount"));
+            cloudBlobClient = cloud.createCloudBlobClient();
+            String maximumExecutionTimeInMs = config.getProperty("maximumExecutionTimeInMs");
+            if (maximumExecutionTimeInMs != null) {
+                cloudBlobClient.getDefaultRequestOptions().setMaximumExecutionTimeInMs(
+                        Integer.parseInt(maximumExecutionTimeInMs));
+            }
+            container = cloudBlobClient.getContainerReference(config.getProperty("container"));
+            container.createIfNotExists();
+            dir = container.getDirectoryReference(config.getProperty("directory"));
+        } catch (Exception e) {
+            // do not put the whole configuration into the message:
+            // it contains the storage account connection string
+            throw new IllegalArgumentException(
+                    "Can not open Azure container " + config.getProperty("container")
+                    + ", directory " + config.getProperty("directory"), e);
+        }
+    }
+
+    @Override
+    public void setWriteCompression(Compression compression) {
+        this.compression = compression;
+    }
+
+    /**
+     * Download and decode a page.
+     *
+     * @param key the blob name, relative to the configured directory
+     * @return the page, or null if the blob does not exist or is empty
+     * @throws IllegalArgumentException if the download fails for another reason
+     */
+    @Override
+    public PageFile getIfExists(String key) {
+        try {
+            readCount++;
+            // put() and putBytes() write block blobs,
+            // so the blob has to be read via a block blob reference as well
+            CloudBlockBlob blob = dir.getBlockBlobReference(key);
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            blob.download(out);
+            byte[] data = out.toByteArray();
+            if (data.length == 0) {
+                // nothing to decode (FileStore treats empty files the same way)
+                return null;
+            }
+            Compression c = Compression.getCompressionFromData(data[0]);
+            data = c.expand(data);
+            return PageFile.fromBytes(data);
+        } catch (StorageException e) {
+            if (e.getHttpStatusCode() == HTTP_NOT_FOUND) {
+                return null;
+            }
+            throw new IllegalArgumentException(key, e);
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException(key, e);
+        }
+    }
+
+    @Override
+    public boolean supportsByteOperations() {
+        return true;
+    }
+
+    /**
+     * Download the raw (possibly compressed) content of a blob.
+     *
+     * @param key the blob name, relative to the configured directory
+     * @return the content exactly as stored
+     * @throws IllegalArgumentException if the download fails
+     */
+    @Override
+    public byte[] getBytes(String key) {
+        try {
+            readCount++;
+            CloudBlockBlob blob = dir.getBlockBlobReference(key);
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            blob.download(out);
+            return out.toByteArray();
+        } catch (URISyntaxException | StorageException e) {
+            throw new IllegalArgumentException(key, e);
+        }
+    }
+
+    /**
+     * Upload raw content as a block blob, without compressing it.
+     *
+     * @param key the blob name, relative to the configured directory
+     * @param data the content to store as is
+     * @throws IllegalArgumentException if the upload fails
+     */
+    @Override
+    public void putBytes(String key, byte[] data) {
+        try {
+            writeCount++;
+            CloudBlockBlob blob = dir.getBlockBlobReference(key);
+            blob.upload(new ByteArrayInputStream(data), data.length);
+        } catch (URISyntaxException | StorageException | IOException e) {
+            throw new IllegalArgumentException(key, e);
+        }
+    }
+
+    /**
+     * Serialize a page, compress it with the configured write compression,
+     * and upload it as a block blob.
+     *
+     * @param key the blob name, relative to the configured directory
+     * @param value the page to store
+     * @throws IllegalArgumentException if the upload fails
+     */
+    @Override
+    public void put(String key, PageFile value) {
+        byte[] data = value.toBytes();
+        data = compression.compress(data);
+        try {
+            writeCount++;
+            CloudBlockBlob blob = dir.getBlockBlobReference(key);
+            long start = System.nanoTime();
+            blob.uploadFromByteArray(data, 0, data.length);
+            long time = System.nanoTime() - start;
+            // TODO proper logging
+            System.out.println("Azure upload " + key + " size " + data.length + " nanos " + time);
+        } catch (URISyntaxException | StorageException | IOException e) {
+            throw new IllegalArgumentException(key, e);
+        }
+    }
+
+    @Override
+    public String newFileName() {
+        return Uuid.timeBasedVersion7().toShortString();
+    }
+
+    /**
+     * List the names of the block blobs in the configured directory.
+     *
+     * @return a new set with the blob names (the part after the last '/')
+     * @throws IllegalArgumentException if listing fails
+     */
+    @Override
+    public Set<String> keySet() {
+        try {
+            HashSet<String> set = new HashSet<>();
+            for (ListBlobItem item : dir.listBlobs()) {
+                if (item instanceof CloudBlockBlob) {
+                    String name = ((CloudBlockBlob) item).getName();
+                    // strip the directory prefix
+                    int index = name.lastIndexOf('/');
+                    if (index >= 0) {
+                        name = name.substring(index + 1);
+                    }
+                    set.add(name);
+                }
+            }
+            return set;
+        } catch (StorageException | URISyntaxException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    /**
+     * Delete the given blobs; blobs that do not exist are ignored.
+     *
+     * @param set the blob names to delete
+     * @throws IllegalArgumentException if a delete request fails
+     *             (entries after the failing one are not processed)
+     */
+    @Override
+    public void remove(Set<String> set) {
+        try {
+            for (String key : set) {
+                CloudBlockBlob blob = dir.getBlockBlobReference(key);
+                writeCount++;
+                blob.deleteIfExists();
+            }
+        } catch (StorageException | URISyntaxException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    @Override
+    public void removeAll() {
+        remove(keySet());
+    }
+
+    @Override
+    public long getWriteCount() {
+        return writeCount;
+    }
+
+    @Override
+    public long getReadCount() {
+        return readCount;
+    }
+
+    @Override
+    public void close() {
+        // nothing to release
+    }
+
+    @Override
+    public Properties getConfig() {
+        return config;
+    }
+
+}
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/Compression.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/Compression.java
new file mode 100644
index 0000000000..da76027dc2
--- /dev/null
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/Compression.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store;
+
+import java.util.Arrays;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+
+/**
+ * The compression algorithms that can be applied to serialized pages.
+ * <p>
+ * Data compressed with {@link #LZ4} starts with the byte '4', followed by the
+ * uncompressed length (4 bytes, big-endian) and the LZ4 block. Data written
+ * with {@link #NO} carries no header at all.
+ */
+public enum Compression {
+
+    /** Pass-through: both methods return the array they were given. */
+    NO {
+        @Override
+        byte[] compress(byte[] data) {
+            return data;
+        }
+
+        @Override
+        byte[] expand(byte[] data) {
+            return data;
+        }
+    },
+
+    /** LZ4 block compression with a 5 byte header (marker and length). */
+    LZ4 {
+        private final LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor();
+        private final LZ4FastDecompressor decompressor = LZ4Factory.fastestInstance().fastDecompressor();
+
+        // scratch buffer shared by all callers; guarded by the compressor lock
+        private byte[] compressBuffer = new byte[1024 * 1024];
+
+        @Override
+        byte[] compress(byte[] data) {
+            // TODO synchronization is needed because we share the buffer
+            synchronized (compressor) {
+                int wanted = 2 * data.length;
+                if (compressBuffer.length < wanted) {
+                    // grow the shared buffer; it is never shrunk again
+                    compressBuffer = new byte[wanted];
+                }
+                byte[] scratch = compressBuffer;
+                scratch[0] = '4';
+                writeInt(scratch, 1, data.length);
+                int compressedLength = compressor.compress(
+                        data, 0, data.length, scratch, 5, scratch.length - 5);
+                return Arrays.copyOf(scratch, 5 + compressedLength);
+            }
+        }
+
+        @Override
+        byte[] expand(byte[] data) {
+            // TODO synchronization is needed because we share the buffer
+            synchronized (decompressor) {
+                // the uncompressed length is stored right after the marker byte
+                int uncompressedLength = readInt(data, 1);
+                byte[] expanded = new byte[uncompressedLength];
+                decompressor.decompress(data, 5, expanded, 0, uncompressedLength);
+                return expanded;
+            }
+        }
+    };
+
+    /**
+     * Compress the data.
+     *
+     * @param data the uncompressed data
+     * @return the compressed data, including any header
+     */
+    abstract byte[] compress(byte[] data);
+
+    /**
+     * Expand data that was produced by {@link #compress(byte[])} of the same
+     * constant.
+     *
+     * @param data the compressed data, including any header
+     * @return the uncompressed data
+     */
+    abstract byte[] expand(byte[] data);
+
+    /**
+     * Write an int as 4 bytes, most significant byte first.
+     *
+     * @param buff the target array
+     * @param pos the index of the first byte to write
+     * @param x the value
+     */
+    public static void writeInt(byte[] buff, int pos, int x) {
+        buff[pos] = (byte) (x >> 24);
+        buff[pos + 1] = (byte) (x >> 16);
+        buff[pos + 2] = (byte) (x >> 8);
+        buff[pos + 3] = (byte) x;
+    }
+
+    /**
+     * Read an int stored as 4 bytes, most significant byte first.
+     *
+     * @param buff the source array
+     * @param pos the index of the first byte to read
+     * @return the value
+     */
+    public static int readInt(byte[] buff, int pos) {
+        int highest = buff[pos] << 24;
+        int high = (buff[pos + 1] & 0xff) << 16;
+        int low = (buff[pos + 2] & 0xff) << 8;
+        int lowest = buff[pos + 3] & 0xff;
+        return highest + high + low + lowest;
+    }
+
+    /**
+     * Detect the compression from the first byte of stored data.
+     *
+     * @param data the first byte
+     * @return {@link #LZ4} if the byte is '4', otherwise {@link #NO}
+     */
+    public static Compression getCompressionFromData(byte data) {
+        if (data == '4') {
+            return Compression.LZ4;
+        }
+        return Compression.NO;
+    }
+
+}
\ No newline at end of file
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/DiskCacheStore.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/DiskCacheStore.java
new file mode 100644
index 0000000000..e25ff91a77
--- /dev/null
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/DiskCacheStore.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jackrabbit.oak.index.indexer.document.tree.store.utils.Cache;
+
+/**
+ * A {@link Store} that combines a frontend store (built from the
+ * {@code front.} sub-properties) with a backend store (built from the
+ * {@code back.} sub-properties).
+ * <p>
+ * Writes go to the frontend first and are then copied to the backend by a
+ * pool of daemon threads. Reads that miss the frontend are fetched from the
+ * backend and stored in the frontend; for inner nodes, the children are
+ * prefetched in the background.
+ * <p>
+ * Further configuration properties: {@code cacheSize} (passed to the cache of
+ * downloaded entries), {@code readAhead} (maximum number of children to
+ * prefetch per inner node) and {@code threads} (size of the thread pool).
+ */
+public class DiskCacheStore implements Store {
+
+    private final Properties config;
+
+    // the frontend (local disk)
+    private final Store frontend;
+
+    // the backend (azure or s3)
+    private final Store backend;
+
+    // set of entries that are not yet uploaded
+    private final ConcurrentHashMap<String, String> uploading = new ConcurrentHashMap<>();
+
+    // executor service that uploads files
+    private final ExecutorService executor;
+
+    // cache for entries that were downloaded
+    // (doesn't contain entries that are not yet uploaded)
+    private final Cache<String, String> cache;
+
+    private final int readAhead;
+
+    @Override
+    public String toString() {
+        return "diskCache(" + frontend + ", " + backend + ")";
+    }
+
+    /**
+     * Build the frontend and backend stores and start the thread pool.
+     *
+     * @param config the configuration (see the class documentation)
+     */
+    public DiskCacheStore(Properties config) {
+        this.config = config;
+        frontend = StoreBuilder.build(StoreBuilder.subProperties(config, "front."));
+
+        // cache size, in number of files on disk
+        int cacheSize = Integer.parseInt(config.getProperty("cacheSize", "1000000000"));
+
+        // read ahead this number of children
+        this.readAhead = Integer.parseInt(config.getProperty("readAhead", "1000000"));
+
+        // using this number of threads at most
+        int threads = Integer.parseInt(config.getProperty("threads", "20"));
+
+        cache = new Cache<String, String>(cacheSize) {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public boolean removeEldestEntry(Map.Entry<String, String> eldest) {
+                boolean result = super.removeEldestEntry(eldest);
+                if (result) {
+                    // the entry is evicted: also drop the local copy
+                    frontend.remove(Collections.singleton(eldest.getKey()));
+                }
+                return result;
+            }
+        };
+        executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<>(),
+                r -> {
+                    Thread thread = new Thread(r, "DiskCacheStore");
+                    thread.setDaemon(true);
+                    return thread;
+                });
+        backend = StoreBuilder.build(StoreBuilder.subProperties(config, "back."));
+    }
+
+    /**
+     * Read a page from the frontend, or download it from the backend and
+     * keep a copy in the frontend.
+     *
+     * @param key the file name
+     * @return the page, or null if neither store returned data for the key
+     */
+    @Override
+    public PageFile getIfExists(String key) {
+        PageFile result = frontend.getIfExists(key);
+        if (result != null) {
+            return result;
+        }
+        if (backend.supportsByteOperations() && frontend.supportsByteOperations()) {
+            byte[] data = backend.getBytes(key);
+            if (data == null) {
+                // a backend may return null for a missing entry
+                // (FileStore.getBytes does); there is nothing to cache
+                return null;
+            }
+            frontend.putBytes(key, data);
+            result = frontend.get(key);
+        } else {
+            result = backend.get(key);
+            if (result != null) {
+                frontend.put(key, result);
+            }
+        }
+        if (result == null) {
+            return null;
+        }
+        synchronized (cache) {
+            cache.put(key, key);
+        }
+        if (result.isInnerNode()) {
+            for (int i = 0; i < readAhead && i < result.getValueCount(); i++) {
+                String childKey = result.getChildValue(i);
+                // this will put the entry in the cache
+                // and also read the children if needed
+                executor.submit(() -> {
+                    getIfExists(childKey);
+                });
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Store a page in the frontend and schedule the upload to the backend.
+     * If an upload of the same key is still running, this method waits until
+     * it has finished.
+     *
+     * @param key the file name
+     * @param value the page
+     */
+    @Override
+    public void put(String key, PageFile value) {
+        markUploading(key);
+        try {
+            frontend.put(key, value);
+            executor.submit(() -> upload(key));
+        } catch (RuntimeException e) {
+            // no upload task owns the marker: release it,
+            // otherwise the next put of this key would wait forever
+            uploading.remove(key);
+            throw e;
+        }
+    }
+
+    /**
+     * Atomically register the key as "upload in progress", waiting for a
+     * running upload of the same key to finish first. An interrupt does not
+     * abort the wait; the interrupt status is restored before returning.
+     */
+    private void markUploading(String key) {
+        boolean warned = false;
+        boolean interrupted = false;
+        // putIfAbsent both checks and sets the marker in one step
+        while (uploading.putIfAbsent(key, key) != null) {
+            if (!warned) {
+                warned = true;
+                System.out.println("WARNING: upload is in progress: " + key +
+                        " and a new upload is scheduled. waiting for the upload to finish, to avoid concurrency issues.");
+            }
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                interrupted = true;
+            }
+        }
+        if (interrupted) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Copy one entry from the frontend to the backend (runs in the thread
+     * pool). Failures are reported but not propagated.
+     */
+    private void upload(String key) {
+        try {
+            if (frontend.supportsByteOperations() && backend.supportsByteOperations()) {
+                byte[] data = frontend.getBytes(key);
+                backend.putBytes(key, data);
+            } else {
+                backend.put(key, frontend.get(key));
+            }
+        } catch (Exception e) {
+            // TODO proper logging
+            e.printStackTrace();
+        } finally {
+            uploading.remove(key);
+        }
+    }
+
+    @Override
+    public String newFileName() {
+        return frontend.newFileName();
+    }
+
+    @Override
+    public Set<String> keySet() {
+        return backend.keySet();
+    }
+
+    @Override
+    public void remove(Set<String> set) {
+        synchronized (cache) {
+            set.forEach(k -> cache.remove(k));
+        }
+        frontend.remove(set);
+        backend.remove(set);
+    }
+
+    @Override
+    public void removeAll() {
+        // the cache must not keep keys of files that no longer exist
+        synchronized (cache) {
+            cache.clear();
+        }
+        frontend.removeAll();
+        backend.removeAll();
+    }
+
+    @Override
+    public long getWriteCount() {
+        return frontend.getWriteCount();
+    }
+
+    @Override
+    public long getReadCount() {
+        return frontend.getReadCount();
+    }
+
+    @Override
+    public void setWriteCompression(Compression compression) {
+        frontend.setWriteCompression(compression);
+        backend.setWriteCompression(compression);
+    }
+
+    /**
+     * Wait (up to one day) for the scheduled tasks to finish, then close
+     * both stores. If the wait is interrupted, the interrupt status is
+     * restored and the stores are closed anyway.
+     */
+    @Override
+    public void close() {
+        executor.shutdown();
+        try {
+            executor.awaitTermination(1, TimeUnit.DAYS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+        frontend.close();
+        backend.close();
+    }
+
+    @Override
+    public Properties getConfig() {
+        return config;
+    }
+
+}
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/FileStore.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/FileStore.java
new file mode 100644
index 0000000000..9c72f8d6f7
--- /dev/null
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/FileStore.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.jackrabbit.oak.index.indexer.document.tree.store.utils.Uuid;
+
+/**
+ * A {@link Store} that keeps each page in one file of a local directory.
+ * <p>
+ * Configuration properties: {@code dir} (the directory, created if needed)
+ * and {@code async} (default {@code false}). With {@code async=true}, pages
+ * are written by a background daemon thread; until a page is on disk it is
+ * served from an in-memory map of pending writes.
+ */
+public class FileStore implements Store {
+
+    // marker that tells the background thread to exit
+    private static final WriteOperation STOP = new WriteOperation();
+
+    private final Properties config;
+    private final String directory;
+    private Compression compression = Compression.NO;
+    private long writeCount, readCount;
+    // null unless asynchronous writing is enabled
+    private Thread backgroundThread;
+    // pages that are queued but possibly not yet on disk
+    private ConcurrentHashMap<String, PageFile> pendingWrites = new ConcurrentHashMap<>();
+    private LinkedBlockingQueue<WriteOperation> queue = new LinkedBlockingQueue<>(100);
+
+    /** One queued write: the file name, the serialized page, and the page itself. */
+    static class WriteOperation {
+        String key;
+        byte[] value;
+        // the entry stored in pendingWrites for this operation
+        PageFile page;
+    }
+
+    @Override
+    public String toString() {
+        return "file(" + directory + ")";
+    }
+
+    /**
+     * Create the directory if needed, and start the background writer if
+     * {@code async} is set.
+     *
+     * @param config the configuration (see the class documentation)
+     */
+    public FileStore(Properties config) {
+        this.config = config;
+        this.directory = config.getProperty("dir");
+        new File(directory).mkdirs();
+        boolean asyncWrite = Boolean.parseBoolean(config.getProperty("async", "false"));
+        if (asyncWrite) {
+            startAsyncWriter();
+        }
+    }
+
+    private void startAsyncWriter() {
+        backgroundThread = new Thread(() -> {
+            try {
+                while (true) {
+                    WriteOperation op = queue.take();
+                    if (op == STOP) {
+                        break;
+                    }
+                    try {
+                        writeFile(op.key, op.value);
+                        // only remove the entry this operation added: a newer
+                        // page for the same key may already be pending
+                        pendingWrites.remove(op.key, op.page);
+                    } catch (RuntimeException e) {
+                        // keep the writer alive, otherwise the bounded queue
+                        // fills up and put() blocks; the page stays readable
+                        // via pendingWrites
+                        // TODO proper logging
+                        e.printStackTrace();
+                    }
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        });
+        backgroundThread.setDaemon(true);
+        backgroundThread.start();
+    }
+
+    /**
+     * Wait until the background writer (if any) has processed all queued
+     * writes. If the wait is interrupted, the interrupt status is restored.
+     */
+    @Override
+    public void close() {
+        if (backgroundThread == null) {
+            return;
+        }
+        try {
+            queue.put(STOP);
+            backgroundThread.join();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public void setWriteCompression(Compression compression) {
+        this.compression = compression;
+    }
+
+    /**
+     * Read a page, either from the pending writes or from disk.
+     *
+     * @param key the file name
+     * @return the page, or null if the file does not exist or is empty
+     * @throws IllegalArgumentException if reading the file fails
+     */
+    @Override
+    public PageFile getIfExists(String key) {
+        PageFile pending = pendingWrites.get(key);
+        if (pending != null) {
+            return pending;
+        }
+        readCount++;
+        File f = getFile(key);
+        if (!f.exists()) {
+            return null;
+        }
+        try (RandomAccessFile file = new RandomAccessFile(f, "r")) {
+            long length = file.length();
+            if (length == 0) {
+                // deleted in the meantime
+                return null;
+            }
+            byte[] data = new byte[(int) length];
+            file.readFully(data);
+            Compression c = Compression.getCompressionFromData(data[0]);
+            data = c.expand(data);
+            return PageFile.fromBytes(data);
+        } catch (IOException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    @Override
+    public void put(String key, PageFile value) {
+        writeCount++;
+        if (backgroundThread != null) {
+            // queue a copy, not the caller's instance
+            writeFileAsync(key, value.copy());
+        } else {
+            writeFile(key, value.toBytes());
+        }
+    }
+
+    /**
+     * Register the page as pending and queue it for the background writer.
+     * Blocks while the queue is full. An interrupt does not drop the write;
+     * the interrupt status is restored once the write is queued.
+     */
+    private void writeFileAsync(String key, PageFile value) {
+        WriteOperation op = new WriteOperation();
+        op.key = key;
+        op.page = value;
+        op.value = value.toBytes();
+        pendingWrites.put(key, value);
+        boolean interrupted = false;
+        while (true) {
+            try {
+                queue.put(op);
+                break;
+            } catch (InterruptedException e) {
+                interrupted = true;
+            }
+        }
+        if (interrupted) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public boolean supportsByteOperations() {
+        return true;
+    }
+
+    /**
+     * Read the raw (possibly compressed) content of a file.
+     * NOTE(review): this does not consult the pending writes, so with
+     * async=true a page that was just put may not be visible here yet.
+     *
+     * @param key the file name
+     * @return the content, or null if the file does not exist or is empty
+     * @throws IllegalArgumentException if reading the file fails
+     */
+    @Override
+    public byte[] getBytes(String key) {
+        File f = getFile(key);
+        if (!f.exists()) {
+            return null;
+        }
+        readCount++;
+        try (RandomAccessFile file = new RandomAccessFile(f, "r")) {
+            long length = file.length();
+            if (length == 0) {
+                // deleted in the meantime
+                return null;
+            }
+            byte[] data = new byte[(int) length];
+            file.readFully(data);
+            return data;
+        } catch (IOException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    /**
+     * Write raw content to a file, replacing any existing content.
+     *
+     * @param key the file name
+     * @param data the content to store as is
+     * @throws IllegalArgumentException if writing the file fails
+     */
+    @Override
+    public void putBytes(String key, byte[] data) {
+        try (FileOutputStream out = new FileOutputStream(getFile(key))) {
+            out.write(data);
+        } catch (IOException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    private void writeFile(String key, byte[] data) {
+        data = compression.compress(data);
+        // TODO write to a temporary file and move it atomically, see
+        // https://stackoverflow.com/questions/595631/how-to-atomically-rename-a-file-in-java-even-if-the-dest-file-already-exists
+        putBytes(key, data);
+    }
+
+    private File getFile(String key) {
+        return new File(directory, key);
+    }
+
+    @Override
+    public String newFileName() {
+        return Uuid.timeBasedVersion7().toShortString();
+    }
+
+    /**
+     * List the regular files in the directory.
+     *
+     * @return a new set with the file names; empty if the directory does
+     *         not exist or can not be listed
+     */
+    @Override
+    public Set<String> keySet() {
+        File dir = new File(directory);
+        FilenameFilter filesOnly = (parent, name) -> new File(parent, name).isFile();
+        // list() returns null if the directory is missing or unreadable
+        String[] list = dir.list(filesOnly);
+        if (list == null) {
+            return new HashSet<>();
+        }
+        return new HashSet<>(Arrays.asList(list));
+    }
+
+    @Override
+    public void remove(Set<String> set) {
+        // TODO keep for some time if the file is relatively new?
+        for (String key : set) {
+            writeCount++;
+            getFile(key).delete();
+        }
+    }
+
+    @Override
+    public void removeAll() {
+        // listFiles() returns null if the directory is missing or unreadable
+        File[] files = new File(directory).listFiles();
+        if (files == null) {
+            return;
+        }
+        for (File f : files) {
+            f.delete();
+        }
+    }
+
+    @Override
+    public long getWriteCount() {
+        return writeCount;
+    }
+
+    @Override
+    public long getReadCount() {
+        return readCount;
+    }
+
+    @Override
+    public Properties getConfig() {
+        return config;
+    }
+
+}
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/LogStore.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/LogStore.java
new file mode 100644
index 0000000000..1d9824673c
--- /dev/null
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/LogStore.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * A {@link Store} decorator that prints most calls to standard output and
+ * then forwards them to the wrapped store. The counters,
+ * {@code newFileName} and {@code supportsByteOperations} are forwarded
+ * without being printed.
+ */
+public class LogStore implements Store {
+
+    private final Properties config;
+    private final Store backend;
+
+    /**
+     * Wrap a store. The configuration is taken from the wrapped store.
+     *
+     * @param backend the store that does the actual work
+     */
+    LogStore(Store backend) {
+        this.config = backend.getConfig();
+        this.backend = backend;
+    }
+
+    @Override
+    public String toString() {
+        return "log(" + backend + ")";
+    }
+
+    /**
+     * Print one line of the form {@code <backend>.<message> [<args>]}.
+     */
+    private void log(String message, Object... args) {
+        StringBuilder line = new StringBuilder();
+        line.append(backend).append('.').append(message);
+        line.append(' ').append(Arrays.toString(args));
+        System.out.println(line);
+    }
+
+    // ---- page operations ----
+
+    @Override
+    public PageFile getIfExists(String key) {
+        log("getIfExists", key);
+        PageFile page = backend.getIfExists(key);
+        return page;
+    }
+
+    @Override
+    public void put(String key, PageFile value) {
+        log("put", key);
+        backend.put(key, value);
+    }
+
+    @Override
+    public String newFileName() {
+        // not logged
+        return backend.newFileName();
+    }
+
+    @Override
+    public Set<String> keySet() {
+        log("keySet");
+        Set<String> keys = backend.keySet();
+        return keys;
+    }
+
+    @Override
+    public void remove(Set<String> set) {
+        log("remove", set);
+        backend.remove(set);
+    }
+
+    @Override
+    public void removeAll() {
+        log("removeAll");
+        backend.removeAll();
+    }
+
+    // ---- statistics (not logged) ----
+
+    @Override
+    public long getWriteCount() {
+        return backend.getWriteCount();
+    }
+
+    @Override
+    public long getReadCount() {
+        return backend.getReadCount();
+    }
+
+    // ---- configuration and lifecycle ----
+
+    @Override
+    public void setWriteCompression(Compression compression) {
+        log("setWriteCompression", compression);
+        backend.setWriteCompression(compression);
+    }
+
+    @Override
+    public void close() {
+        log("close");
+        backend.close();
+    }
+
+    @Override
+    public Properties getConfig() {
+        return config;
+    }
+
+    // ---- raw byte operations ----
+
+    @Override
+    public boolean supportsByteOperations() {
+        // not logged
+        return backend.supportsByteOperations();
+    }
+
+    @Override
+    public byte[] getBytes(String key) {
+        log("getBytes", key);
+        byte[] bytes = backend.getBytes(key);
+        return bytes;
+    }
+
+    @Override
+    public void putBytes(String key, byte[] data) {
+        // only the length of the data is printed
+        log("putBytes", key, data.length);
+        backend.putBytes(key, data);
+    }
+
+}
\ No newline at end of file
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/MemoryStore.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/MemoryStore.java
new file mode 100644
index 0000000000..7892dfcf2d
--- /dev/null
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/MemoryStore.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * A {@link Store} that keeps all pages in a {@link HashMap}. Pages are stored
+ * by reference (they are not copied or serialized), and the write
+ * compression setting is ignored.
+ * <p>
+ * The map and the counters are not synchronized.
+ */
+public class MemoryStore implements Store {
+
+    private final Properties config;
+    private final HashMap<String, PageFile> map = new HashMap<>();
+    // counter used to generate the file names "f0", "f1", ...
+    private long nextFileName;
+    private long writeCount, readCount;
+
+    /** Create a store with an empty configuration. */
+    public MemoryStore() {
+        this(new Properties());
+    }
+
+    /**
+     * Create a store.
+     *
+     * @param config the configuration; only returned by {@link #getConfig()}
+     */
+    public MemoryStore(Properties config) {
+        this.config = config;
+    }
+
+    @Override
+    public void setWriteCompression(Compression compression) {
+        // ignore
+    }
+
+    @Override
+    public PageFile getIfExists(String key) {
+        readCount++;
+        return map.get(key);
+    }
+
+    @Override
+    public void put(String key, PageFile file) {
+        writeCount++;
+        map.put(key, file);
+    }
+
+    @Override
+    public String toString() {
+        return "files: " + map.size();
+    }
+
+    @Override
+    public String newFileName() {
+        return "f" + nextFileName++;
+    }
+
+    /**
+     * Get the names of all stored files.
+     *
+     * @return a new set that is independent of the store, as in the other
+     *         stores; this way {@code remove(keySet())} is safe, and changing
+     *         the returned set does not change the store
+     */
+    @Override
+    public Set<String> keySet() {
+        return new HashSet<>(map.keySet());
+    }
+
+    @Override
+    public void remove(Set<String> set) {
+        for (String key : set) {
+            writeCount++;
+            map.remove(key);
+        }
+    }
+
+    @Override
+    public void removeAll() {
+        map.clear();
+        // file names start at "f0" again
+        nextFileName = 0;
+    }
+
+    @Override
+    public long getWriteCount() {
+        return writeCount;
+    }
+
+    @Override
+    public long getReadCount() {
+        return readCount;
+    }
+
+    @Override
+    public void close() {
+        // nothing to release
+    }
+
+    @Override
+    public Properties getConfig() {
+        return config;
+    }
+
+}
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/PageFile.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/PageFile.java
new file mode 100644
index 0000000000..f1e2c4de8b
--- /dev/null
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/PageFile.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
/**
 * A B-tree page (leaf, or inner node).
 * An inner node contains one more value than keys.
 * A leaf page has the same number of keys and values.
 * <p>
 * Values of an inner node are the names of the child pages. Values of a leaf
 * may be null; null is serialized as such and read back as null.
 * <p>
 * The page keeps a running size estimate (see {@link #sizeInBytes()}). The
 * estimate counts UTF-16 chars plus a fixed overhead per entry; it is not the
 * exact serialized length.
 */
public class PageFile {

    private static final boolean VERIFY_SIZE = false;
    private static final int INITIAL_SIZE_IN_BYTES = 24;

    // scratch buffer shared by all pages; only used while holding the
    // PageFile.class monitor (see toBytes)
    private static ByteBuffer REUSED_BUFFER = ByteBuffer.allocate(1024 * 1024);

    private final boolean innerNode;

    private ArrayList<String> keys = new ArrayList<>();
    private ArrayList<String> values = new ArrayList<>();
    private long update;
    private String nextRoot;
    private int sizeInBytes = INITIAL_SIZE_IN_BYTES;

    // outcome of the previous getKeyIndex call:
    // -1: beginning; 0: middle; 1: end
    private int lastSearchIndex;

    // contains unwritten modifications
    private boolean modified;

    /**
     * Create an empty page.
     *
     * @param innerNode true for an inner node, false for a leaf
     */
    public PageFile(boolean innerNode) {
        this.innerNode = innerNode;
    }

    /**
     * Set the update id of this page and mark the page as modified.
     *
     * @param update the update id
     */
    public void setUpdate(long update) {
        modified = true;
        this.update = update;
    }

    /**
     * Deserialize a page written by {@link #toBytes()}.
     * The returned page is not marked as modified.
     *
     * @param data the serialized page
     * @return the page
     */
    public static PageFile fromBytes(byte[] data) {
        ByteBuffer buff = ByteBuffer.wrap(data);
        int type = buff.get();
        String nextRoot = readString(buff);
        long update = buff.getLong();
        String prefix = readString(buff);
        int len = buff.getInt();
        boolean inner = type == 0;
        PageFile result = new PageFile(inner);
        for (int i = 0; i < len; i++) {
            // the key is stored before the value
            String key = prefix + readString(buff);
            String value = readString(buff);
            result.appendRecord(key, value);
        }
        if (inner) {
            // an inner node has one more value than keys
            String lastChild = readString(buff);
            result.values.add(lastChild);
            result.sizeInBytes += sizeInBytes(lastChild);
        }
        if (!nextRoot.isEmpty()) {
            result.setNextRoot(nextRoot);
        }
        result.setUpdate(update);
        result.modified = false;
        return result;
    }

    /**
     * Serialize this page. The layout is: type byte (0: inner node, 1: leaf),
     * next root ("" if not set), update id, common key prefix, key count,
     * then for each key the key suffix and the value, and for an inner node
     * the trailing child.
     *
     * @return the serialized page
     */
    public byte[] toBytes() {
        // synchronization is needed because the scratch buffer is shared
        synchronized (PageFile.class) {
            ByteBuffer buff = REUSED_BUFFER;
            if (buff.capacity() < sizeInBytes * 2) {
                buff = REUSED_BUFFER = ByteBuffer.allocate(sizeInBytes * 2);
            }
            while (true) {
                buff.clear();
                try {
                    writeTo(buff);
                    break;
                } catch (java.nio.BufferOverflowException e) {
                    // sizeInBytes counts chars, but UTF-8 may need up to
                    // 3 bytes per char: retry with a larger buffer
                    buff = REUSED_BUFFER = ByteBuffer.allocate(buff.capacity() * 2);
                }
            }
            buff.flip();
            byte[] array = new byte[buff.remaining()];
            buff.get(array);
            return array;
        }
    }

    /**
     * Write the serialized form of this page to the buffer.
     */
    private void writeTo(ByteBuffer buff) {
        // first byte may not be '4', as that is used for LZ4 compression
        buff.put((byte) (innerNode ? 0 : 1));
        writeString(buff, nextRoot == null ? "" : nextRoot);
        buff.putLong(update);
        String prefix = keys.size() < 2
                ? ""
                : commonPrefix(keys.get(0), keys.get(keys.size() - 1));
        writeString(buff, prefix);
        buff.putInt(keys.size());
        for (int i = 0; i < keys.size(); i++) {
            writeString(buff, keys.get(i).substring(prefix.length()));
            writeString(buff, values.get(i));
        }
        if (innerNode) {
            writeString(buff, values.get(values.size() - 1));
        }
    }

    /**
     * Write a string: a short length followed by the UTF-8 bytes. A length of
     * -2 stands for null; -1 means an int length follows.
     */
    private static void writeString(ByteBuffer buff, String s) {
        if (s == null) {
            buff.putShort((short) -2);
            return;
        }
        byte[] data = s.getBytes(StandardCharsets.UTF_8);
        if (data.length < Short.MAX_VALUE) {
            // could get a bit larger, but some negative values are reserved
            buff.putShort((short) data.length);
        } else {
            buff.putShort((short) -1);
            buff.putInt(data.length);
        }
        buff.put(data);
    }

    /**
     * Read a string written by writeString.
     */
    private static String readString(ByteBuffer buff) {
        int len = buff.getShort();
        if (len == -2) {
            return null;
        }
        if (len == -1) {
            len = buff.getInt();
        }
        int pos = buff.position();
        buff.position(pos + len);
        return new String(buff.array(), pos, len, StandardCharsets.UTF_8);
    }

    /**
     * Get the longest common prefix of the two strings that does not end in
     * the middle of a surrogate pair.
     */
    private static String commonPrefix(String prefix, String x) {
        if (prefix == null) {
            return x;
        }
        int i = 0;
        for (; i < prefix.length() && i < x.length(); i++) {
            if (prefix.charAt(i) != x.charAt(i)) {
                break;
            }
        }
        // a lone surrogate can not be encoded as UTF-8 (it would be replaced
        // on write), so the prefix and the key suffixes must not split a pair
        if (i > 0 && Character.isHighSurrogate(prefix.charAt(i - 1))) {
            i--;
        }
        return prefix.substring(0, i);
    }

    @Override
    public String toString() {
        return keys + "" + values;
    }

    /**
     * Create a copy of this page with its own key and value lists. The
     * modified flag, the size estimate and the next root are copied; the
     * update id is not.
     *
     * @return the copy
     */
    public PageFile copy() {
        PageFile result = new PageFile(innerNode);
        result.modified = modified;
        result.keys = new ArrayList<>(keys);
        result.values = new ArrayList<>(values);
        result.sizeInBytes = sizeInBytes;
        result.nextRoot = nextRoot;
        return result;
    }

    /**
     * Add a child to an inner node.
     *
     * @param index the position of the new value; if larger than 0, the key
     *            is inserted at position index - 1, otherwise no key is added
     * @param childKey the key (not used if index is 0)
     * @param newChildFileName the name of the child page
     */
    public void addChild(int index, String childKey, String newChildFileName) {
        modified = true;
        if (index > 0) {
            keys.add(index - 1, childKey);
            sizeInBytes += 4;
            sizeInBytes += childKey.length();
        }
        values.add(index, newChildFileName);
        sizeInBytes += sizeInBytes(newChildFileName);
    }

    /**
     * Replace the value at the given position.
     *
     * @param index the value index
     * @param value the new value (may be null)
     */
    public void setValue(int index, String value) {
        modified = true;
        sizeInBytes -= sizeInBytes(values.get(index));
        sizeInBytes += sizeInBytes(value);
        values.set(index, value);
    }

    /**
     * The contribution of one value to the size estimate.
     */
    private static int sizeInBytes(String value) {
        return value == null ? 5 : value.length() + 2;
    }

    /**
     * Remove the key and the value at the given position.
     *
     * @param index the index
     */
    public void removeRecord(int index) {
        modified = true;
        String key = keys.remove(index);
        String value = values.remove(index);
        sizeInBytes -= 4;
        sizeInBytes -= key.length();
        sizeInBytes -= sizeInBytes(value);
    }

    /**
     * Append a key-value pair at the end. Keys are searched with a binary
     * search, so the caller has to keep them sorted.
     *
     * @param k the key
     * @param v the value (may be null)
     */
    public void appendRecord(String k, String v) {
        modified = true;
        keys.add(k);
        values.add(v);
        sizeInBytes += 4;
        sizeInBytes += k.length();
        sizeInBytes += sizeInBytes(v);
    }

    /**
     * Insert a key-value pair at the given position.
     *
     * @param index the index
     * @param key the key
     * @param value the value (may be null)
     */
    public void insertRecord(int index, String key, String value) {
        modified = true;
        keys.add(index, key);
        values.add(index, value);
        sizeInBytes += 4;
        sizeInBytes += key.length();
        sizeInBytes += sizeInBytes(value);
    }

    public long getUpdate() {
        return update;
    }

    /**
     * Get the estimated size of this page: a fixed base, plus 4 and the
     * length for each key, plus 2 and the length for each value (5 for a null
     * value). Lengths are in chars.
     *
     * @return the estimated size
     */
    public int sizeInBytes() {
        if (VERIFY_SIZE) {
            int size = INITIAL_SIZE_IN_BYTES;
            for (String p : keys) {
                size += p.length();
                size += 4;
            }
            for (String o : values) {
                size += sizeInBytes(o);
            }
            if (size != sizeInBytes) {
                throw new AssertionError();
            }
        }
        return sizeInBytes;
    }

    /**
     * Whether the page has enough keys to be split: more than 2 for an inner
     * node, more than 1 for a leaf.
     *
     * @return true if it can be split
     */
    public boolean canSplit() {
        return keys.size() > (innerNode ? 2 : 1);
    }

    /**
     * Get the keys.
     *
     * @return the internal key list (not a copy)
     */
    public List<String> getKeys() {
        return keys;
    }

    /**
     * Search for a key. The result follows the convention of
     * {@link Collections#binarySearch(List, Object)}: the index if found,
     * otherwise (-(insertion point) - 1). For an empty page, -1 is returned.
     * <p>
     * If the previous search ended before the first or after the last key,
     * that boundary is checked first, which avoids the binary search when
     * keys are looked up in sequential order.
     *
     * @param key the key
     * @return the index, or a negative value if not found
     */
    public int getKeyIndex(String key) {
        if (keys.isEmpty()) {
            return -1;
        }
        int index;
        if (lastSearchIndex == 1 && key.compareTo(keys.get(keys.size() - 1)) > 0) {
            // larger than the last key
            index = -(keys.size() + 1);
        } else if (lastSearchIndex == -1 && key.compareTo(keys.get(0)) < 0) {
            // smaller than the first key
            index = -1;
        } else {
            index = Collections.binarySearch(keys, key);
        }
        if (index == -(keys.size() + 1)) {
            lastSearchIndex = 1;
        } else if (index == -1) {
            lastSearchIndex = -1;
        } else {
            lastSearchIndex = 0;
        }
        return index;
    }

    public String getValue(int index) {
        return values.get(index);
    }

    public String getChildValue(int index) {
        return values.get(index);
    }

    /**
     * Get the smallest key of this page that is larger than the given one.
     *
     * @param largerThan the lower bound (exclusive), or null for the first key
     * @return the key, or null if there is none
     */
    public String getNextKey(String largerThan) {
        int index;
        if (largerThan == null) {
            index = 0;
        } else {
            index = getKeyIndex(largerThan);
            if (index < 0) {
                index = -index - 1;
            } else {
                index++;
            }
        }
        if (index < 0 || index >= keys.size()) {
            return null;
        }
        return keys.get(index);
    }

    public String getNextRoot() {
        return nextRoot;
    }

    /**
     * Set the name of the next root and mark the page as modified.
     *
     * @param nextRoot the file name of the next root, or null
     */
    public void setNextRoot(String nextRoot) {
        modified = true;
        this.nextRoot = nextRoot;
    }

    /**
     * Remove a key (but not a value).
     *
     * @param index the key index
     */
    public void removeKey(int index) {
        modified = true;
        String key = keys.remove(index);
        sizeInBytes -= 4;
        sizeInBytes -= key.length();
    }

    /**
     * Remove a value (but not a key).
     *
     * @param index the value index
     */
    public void removeValue(int index) {
        modified = true;
        String x = values.remove(index);
        sizeInBytes -= sizeInBytes(x);
    }

    public boolean isInnerNode() {
        return innerNode;
    }

    public int getValueCount() {
        return values.size();
    }

    public String getKey(int index) {
        return keys.get(index);
    }

    public void setModified(boolean modified) {
        this.modified = modified;
    }

    public boolean getModified() {
        return modified;
    }

}
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/Session.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/Session.java
new file mode 100644
index 0000000000..bf68a6d773
--- /dev/null
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/Session.java
@@ -0,0 +1,829 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+import java.util.Properties;
+
+import org.apache.jackrabbit.oak.index.indexer.document.tree.store.utils.Cache;
+import
org.apache.jackrabbit.oak.index.indexer.document.tree.store.utils.Position;
+import
org.apache.jackrabbit.oak.index.indexer.document.tree.store.utils.SortedStream;
+
+/**
+ * Read and write keys and values.
+ */
+public class Session {
+
+ private static final int DEFAULT_CACHE_SIZE = 128;
+ private static final int DEFAULT_MAX_FILE_SIZE = 16 * 1024;
+ private static final int DEFAULT_CACHE_SIZE_MB = 16;
+ private static final int DEFAULT_MAX_ROOTS = 10;
+
+ static final String ROOT_NAME = "root";
+ static final String INNER_NODE_PREFIX = "node_";
+ static final String LEAF_PREFIX = "data_";
+ static final String DELETED = new String("DELETED");
+
+ static final boolean MULTI_ROOT = true;
+
+ private final Store store;
+ private final Cache<String, PageFile> cache = new
Cache<>(DEFAULT_CACHE_SIZE) {
+ private static final long serialVersionUID = 1L;
+
+ public boolean removeEldestEntry(Map.Entry<String, PageFile> eldest) {
+ boolean result = super.removeEldestEntry(eldest);
+ if (result) {
+ String key = eldest.getKey();
+ PageFile value = eldest.getValue();
+ if(value.getModified()) {
+ store.put(key, value);
+ // not strictly needed as it's no longer referenced
+ value.setModified(false);
+ }
+ }
+ return result;
+ }
+ };
+ private long updateId;
+ private int maxFileSize;
+ private int cacheSizeMB;
+ private int maxRoots = DEFAULT_MAX_ROOTS;
+ private long fileReadCount;
+
+ public Session() {
+ this(new MemoryStore(new Properties()));
+ }
+
+ public Session(Store store) {
+ this.store = store;
+ maxFileSize =
Integer.parseInt(store.getConfig().getProperty("maxFileSize", "" +
DEFAULT_MAX_FILE_SIZE));
+ cacheSizeMB =
Integer.parseInt(store.getConfig().getProperty("cacheSizeMB", "" +
DEFAULT_CACHE_SIZE_MB));
+ changeCacheSize();
+ }
+
+ /**
+ * Set the maximum number of roots.
+ *
+ * @param maxRoots the new value
+ */
+ public void setMaxRoots(int maxRoots) {
+ this.maxRoots = maxRoots;
+ }
+
+ public int getMaxRoots() {
+ return maxRoots;
+ }
+
+ /**
+ * Set the cache size in MB.
+ *
+ * @param mb the value
+ */
+ public void setCacheSizeMB(int mb) {
+ this.cacheSizeMB = mb;
+ }
+
+ /**
+ * Set the maximum file size. Files might be slightly larger than that,
but not a lot.
+ *
+ * @param sizeBytes the file size in bytes
+ */
+ public void setMaxFileSize(int sizeBytes) {
+ this.maxFileSize = sizeBytes;
+ changeCacheSize();
+ }
+
+ private void changeCacheSize() {
+ int cacheEntryCount = (int) (cacheSizeMB * 1024L * 1024 / maxFileSize);
+ cache.setSize(cacheEntryCount);
+ }
+
+ /**
+ * Get the number of files read from the cache or storage.
+ *
+ * @return the result
+ */
+ public long getFileReadCount() {
+ return fileReadCount;
+ }
+
+ private void mergeRootsIfNeeded() {
+ int count = 0;
+ String nextRoot = ROOT_NAME;
+ do {
+ PageFile root = getFile(nextRoot);
+ nextRoot = root.getNextRoot();
+ count++;
+ } while (nextRoot != null);
+ if (count > maxRoots) {
+ mergeRoots();
+ }
+ }
+
+ /**
+ * Initialize the storage, creating a root.
+ */
+ public void init() {
+ PageFile root = newPageFile(false);
+ putFile(ROOT_NAME, root);
+ }
+
+ private PageFile copyPageFile(PageFile old) {
+ PageFile result = old.copy();
+ result.setUpdate(updateId);
+ return result;
+ }
+
+ private PageFile newPageFile(boolean isInternalNode) {
+ PageFile result = new PageFile(isInternalNode);
+ result.setUpdate(updateId);
+ return result;
+ }
+
+ /**
+ * Get an entry.
+ *
+ * @param key the key
+ * @return the value, or null
+ */
+ public String get(String key) {
+ if (key == null) {
+ throw new NullPointerException();
+ }
+ String fileName = ROOT_NAME;
+ do {
+ PageFile file = getFile(fileName);
+ String nextRoot = file.getNextRoot();
+ String result = get(file, key);
+ if (result != null) {
+ return result == DELETED ? null : result;
+ }
+ fileName = nextRoot;
+ } while (fileName != null);
+ return null;
+ }
+
+ /**
+ * Get the entry if it exists.
+ *
+ * @param root the root file
+ * @param k the key
+ * @return null if not found, DELETED if removed, or the value
+ */
+ private String get(PageFile root, String k) {
+ while (true) {
+ if (!root.isInnerNode()) {
+ int index = root.getKeyIndex(k);
+ if (index >= 0) {
+ String result = root.getValue(index);
+ return result == null ? DELETED : result;
+ }
+ return null;
+ }
+ int index = root.getKeyIndex(k);
+ if (index < 0) {
+ index = -index - 2;
+ }
+ index++;
+ String fileName = root.getChildValue(index);
+ root = getFile(fileName);
+ // continue with the new file
+ }
+ }
+
+ /**
+ * Put a value.
+ * To remove an entry, the value needs to be null.
+ *
+ * @param key the key
+ * @param value the value
+ */
+ public void put(String key, String value) {
+ if (key == null) {
+ throw new NullPointerException();
+ }
+ if (value == null) {
+ PageFile root = getFile(ROOT_NAME);
+ if (root.getNextRoot() != null) {
+ value = DELETED;
+ }
+ }
+ put(ROOT_NAME, key, value);
+ }
+
+ /**
+ * Put a value.
+ *
+ * @param rootFileName
+ * @param key
+ * @param value (DELETED if we need to put that, or null to remove the
entry)
+ * @return the file name of the root (different than the passed file name,
if the file was copied)
+ */
+ private void put(String rootFileName, String key, String value) {
+ String fileName = rootFileName;
+ PageFile file = getFile(fileName);
+ if (file.getUpdate() < updateId) {
+ fileName = store.newFileName();
+ file = copyPageFile(file);
+ putFile(fileName, file);
+ }
+ ArrayList<String> parents = new ArrayList<>();
+ String k = key;
+ while (true) {
+ int index = file.getKeyIndex(k);
+ if (!file.isInnerNode()) {
+ if (index >= 0) {
+ if (value == null) {
+ file.removeRecord(index);
+ } else {
+ file.setValue(index, value == DELETED ? null : value);
+ }
+ } else {
+ // not found
+ if (value == null) {
+ // nothing to do
+ return;
+ }
+ file.insertRecord(-index - 1, k, value == DELETED ? null :
value);
+ }
+ break;
+ }
+ parents.add(fileName);
+ if (index < 0) {
+ index = -index - 2;
+ }
+ index++;
+ fileName = file.getChildValue(index);
+ file = getFile(fileName);
+ // continue with the new file
+ }
+ putFile(fileName, file);
+ splitOrMerge(fileName, file, parents);
+ }
+
+ private void splitOrMerge(String fileName, PageFile file,
ArrayList<String> parents) {
+ int size = file.sizeInBytes();
+ if (size > maxFileSize && file.canSplit()) {
+ split(fileName, file, parents);
+ } else if (file.getKeys().size() == 0) {
+ merge(fileName, file, parents);
+ }
+ }
+
+ private void merge(String fileName, PageFile file, ArrayList<String>
parents) {
+ if (file.getValueCount() > 0) {
+ return;
+ }
+ if (parents.isEmpty()) {
+ // root: ignore
+ return;
+ }
+ String parentFileName = parents.remove(parents.size() - 1);
+ PageFile parentFile = getFile(parentFileName);
+ for (int i = 0; i < parentFile.getValueCount(); i++) {
+ String pf = parentFile.getChildValue(i);
+ if (pf.equals(fileName)) {
+ if (parentFile.getValueCount() == 1) {
+ parentFile = newPageFile(false);
+ if (!parentFileName.startsWith(ROOT_NAME)) {
+ String newParentFileName = LEAF_PREFIX +
parentFileName.substring(INNER_NODE_PREFIX.length());
+ putFile(newParentFileName, parentFile);
+ updateChildFileName(parents.get(parents.size() - 1),
parentFileName, newParentFileName);
+ parentFileName = newParentFileName;
+ }
+ } else if (i == parentFile.getValueCount() - 1) {
+ // remove the last entry
+ parentFile.removeKey(i - 1);
+ parentFile.removeValue(i);
+ } else {
+ parentFile.removeKey(i);
+ parentFile.removeValue(i);
+ }
+ putFile(parentFileName, parentFile);
+ merge(parentFileName, parentFile, parents);
+ break;
+ }
+ }
+ }
+
+ private void updateChildFileName(String fileName, String oldChild, String
newChild) {
+ PageFile file = getFile(fileName);
+ for (int i = 0; i < file.getValueCount(); i++) {
+ if (file.getChildValue(i).equals(oldChild)) {
+ file.setValue(i, newChild);
+ putFile(fileName, file);
+ return;
+ }
+ }
+ }
+
+ private void split(String fileName, PageFile file, ArrayList<String>
parents) {
+ List<String> keys = new ArrayList<>(file.getKeys());
+ String parentFileName, newFileName1, newFileName2;
+ PageFile parentFile, newFile1, newFile2;
+ boolean isInternalNode = file.isInnerNode();
+ if (parents.isEmpty()) {
+ // new root
+ parentFileName = fileName;
+ parentFile = newPageFile(true);
+ parentFile.setNextRoot(file.getNextRoot());
+ newFileName1 = (isInternalNode ? INNER_NODE_PREFIX : LEAF_PREFIX) +
+ store.newFileName();
+ parentFile.addChild(0, null, newFileName1);
+ } else {
+ parentFileName = parents.remove(parents.size() - 1);
+ parentFile = getFile(parentFileName);
+ newFileName1 = fileName;
+ }
+ newFile1 = newPageFile(isInternalNode);
+ newFileName2 = (isInternalNode ? INNER_NODE_PREFIX : LEAF_PREFIX) +
+ store.newFileName();
+ newFile2 = newPageFile(isInternalNode);
+ int sentinelIndex = keys.size() / 2;
+ String sentinel = keys.get(sentinelIndex);
+ // shorten the sentinel if possible
+ String beforeSentinal = keys.get(sentinelIndex - 1);
+ while (sentinel.length() > 0 && !isInternalNode) {
+ // for internal nodes, there might be other keys on the left side
+ // that might be shoter than the entry before the sentinel
+ String oneShorter = sentinel.substring(0, sentinel.length() - 1);
+ if (beforeSentinal.compareTo(oneShorter) >= 0) {
+ break;
+ }
+ sentinel = oneShorter;
+ }
+ if (!isInternalNode) {
+ // leaf
+ for (int i = 0; i < keys.size() / 2; i++) {
+ String k = keys.get(i);
+ String v = file.getValue(i);
+ newFile1.appendRecord(k, v);
+ }
+ for (int i = keys.size() / 2; i < keys.size(); i++) {
+ String k = keys.get(i);
+ String v = file.getValue(i);
+ newFile2.appendRecord(k, v);
+ }
+ } else {
+ // inner node
+ newFile1.addChild(0, null, file.getChildValue(0));
+ for (int i = 1; i <= keys.size() / 2; i++) {
+ String p = keys.get(i - 1);
+ newFile1.appendRecord(p, file.getChildValue(i));
+ }
+ newFile2.addChild(0, null, file.getChildValue(keys.size() / 2 +
1));
+ for (int i = keys.size() / 2 + 2; i <= keys.size(); i++) {
+ String p = keys.get(i - 1);
+ newFile2.appendRecord(p, file.getChildValue(i));
+ }
+ }
+ // insert sentinel into parent
+ int index = parentFile.getKeyIndex(sentinel);
+ parentFile.addChild(-index, sentinel, newFileName2);
+ putFile(newFileName1, newFile1);
+ putFile(newFileName2, newFile2);
+ putFile(parentFileName, parentFile);
+ splitOrMerge(parentFileName, parentFile, parents);
+ }
+
+ private void putFile(String fileName, PageFile file) {
+ cache.put(fileName, file);
+ }
+
+ private PageFile getFile(String key) {
+ fileReadCount++;
+ PageFile result = cache.get(key);
+ if (result == null) {
+ result = store.get(key);
+ cache.put(key, result);
+ }
+ return result;
+ }
+
+ /**
+ * Merge all roots.
+ */
+ public void mergeRoots() {
+ PageFile root = getFile(ROOT_NAME);
+ String rootFileCopy = ROOT_NAME + "_" + updateId;
+ root = copyPageFile(root);
+ putFile(rootFileCopy, root);
+ Iterator<Entry<String, String>> it = iterator();
+ PageFile newRoot = newPageFile(false);
+ newRoot.setNextRoot(rootFileCopy);
+ putFile(ROOT_NAME, newRoot);
+ while(it.hasNext()) {
+ Entry<String, String> e = it.next();
+ put(e.getKey(), e.getValue());
+ }
+ newRoot = getFile(ROOT_NAME);
+ newRoot.setNextRoot(null);
+ flush();
+ }
+
+ /**
+ * Make the current tree read-only and switch to a new root.
+ * If there are already too many roots, then they will be merged.
+ * All changes are flushed to storage.
+ */
+ public void checkpoint() {
+ mergeRootsIfNeeded();
+ PageFile root = getFile(ROOT_NAME);
+ String rootFileCopy = ROOT_NAME + "_" + updateId;
+ putFile(rootFileCopy, root);
+ updateId++;
+ if (MULTI_ROOT) {
+ root = newPageFile(false);
+ root.setNextRoot(rootFileCopy);
+ putFile(ROOT_NAME, root);
+ // need to flush here
+ // so that GC does not collect rootFileCopy
+ flush();
+ root = copyPageFile(root);
+ putFile(ROOT_NAME, root);
+
+ } else {
+ flush();
+ root = copyPageFile(root);
+ putFile(ROOT_NAME, root);
+ }
+ }
+
+ /**
+ * Flush all changes to storage.
+ */
+ public void flush() {
+ Entry<String, PageFile> changedRoot = null;
+ for(Entry<String, PageFile> e : cache.entrySet()) {
+ String k = e.getKey();
+ PageFile v = e.getValue();
+ if (!v.getModified()) {
+ continue;
+ }
+ if (k.equals(ROOT_NAME)) {
+ // don't store the changed root yet
+ changedRoot = e;
+ } else {
+ store.put(k, v);
+ // here we have to reset the flag
+ v.setModified(false);
+ }
+ }
+ // now store the changed root
+ if (changedRoot != null) {
+ String k = changedRoot.getKey();
+ PageFile v = changedRoot.getValue();
+ cache.put(k, v);
+ store.put(k, v);
+ }
+ }
+
+ // ===============================================================
+ // iteration over entries
+ // this is fast: internally, a stack of Position object is kept
+
+ /**
+ * Get all entries. Do not add or move entries while
+ * iterating.
+ *
+ * @return the result
+ */
+ public Iterator<Entry<String, String>> iterator() {
+ return iterator(null);
+ }
+
+ /**
+ * Get all entries. Do not add or move entries while iterating.
+ *
+ * @param largerThan all returned keys are larger than this; null to start
at
+ * the beginning
+ * @return the result
+ */
+
+ public Iterator<Entry<String, String>> iterator(String largerThan) {
+ ArrayList<SortedStream> streams = new ArrayList<>();
+ String next = ROOT_NAME;
+ while (true) {
+ streams.add(new SortedStream(next, immutableRootIterator(next,
largerThan)));
+ next = getFile(next).getNextRoot();
+ if (next == null) {
+ break;
+ }
+ }
+ PriorityQueue<SortedStream> pq = new PriorityQueue<>(streams);
+ return new Iterator<Entry<String, String>>() {
+
+ Entry<String, String> current;
+ String lastKey;
+
+ {
+ fetchNext();
+ }
+
+ private void fetchNext() {
+ while (pq.size() > 0) {
+ SortedStream s = pq.poll();
+ if (s.currentKey == null) {
+ // if this is null, it must be the last stream
+ break;
+ }
+ String key = s.currentKey;
+ if (key.equals(lastKey)) {
+ continue;
+ }
+ String value = s.currentValue;
+ s.next();
+ pq.add(s);
+ if (value == DELETED) {
+ continue;
+ }
+ lastKey = key;
+ current = new Entry<>() {
+
+ @Override
+ public String getKey() {
+ return key;
+ }
+
+ @Override
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public String setValue(String value) {
+ throw new UnsupportedOperationException();
+ }
+
+ };
+ return;
+ }
+ current = null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return current != null;
+ }
+
+ @Override
+ public Entry<String, String> next() {
+ Entry<String, String> result = current;
+ fetchNext();
+ return result;
+ }
+
+ };
+ }
+
+ private Iterator<Position> immutableRootIterator(String rootFileName,
String largerThan) {
+
+ return new Iterator<Position>() {
+ private final ArrayList<Position> stack = new ArrayList<>();
+ private Position current;
+
+ {
+ current = new Position();
+ current.file = getFile(rootFileName);
+ current.valuePos = index(current.file, largerThan);
+ down(largerThan);
+ if (current.valuePos >= current.file.getValueCount()) {
+ next();
+ }
+ }
+
+ private int index(PageFile file, String largerThan) {
+ if (largerThan == null) {
+ return 0;
+ }
+ int index = file.getKeyIndex(largerThan);
+ if (file.isInnerNode()) {
+ if (index < 0) {
+ index = -index - 2;
+ }
+ index++;
+ } else {
+ if (index < 0) {
+ index = -index - 1;
+ } else {
+ index++;
+ }
+ }
+ return index;
+ }
+
+ @Override
+ public String toString() {
+ return stack + " " + current;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return current != null;
+ }
+
+ @Override
+ public Position next() {
+ if (current == null) {
+ throw new NoSuchElementException();
+ }
+ Position result = current;
+ current = new Position();
+ current.file = result.file;
+ current.valuePos = result.valuePos + 1;
+ while (true) {
+ if (!current.file.isInnerNode() && current.valuePos <
result.file.getValueCount()) {
+ break;
+ }
+ if (stack.size() == 0) {
+ current = null;
+ break;
+ }
+ current = stack.remove(stack.size() - 1);
+ current.valuePos++;
+ if (current.valuePos < current.file.getValueCount()) {
+ down(null);
+ break;
+ }
+ }
+ return result;
+ }
+
+ private void down(String largerThan) {
+ while (current.file.isInnerNode()) {
+ stack.add(current);
+ Position pos = new Position();
+ PageFile file =
getFile(current.file.getChildValue(current.valuePos));
+ pos.file = file;
+ pos.valuePos = index(pos.file, largerThan);
+ current = pos;
+ }
+ }
+ };
+ }
+
+ // ===============================================================
+ // iteration over keys, over all roots
+ // this is a bit slow: internally, *for each key*
+ // it will traverse from all roots down to the leaf
+
+ /**
+ * Return all keys in sorted order. Roots don't need to be merged.
+ *
+ * @return all keys
+ */
+ public Iterable<String> keys() {
+ return keys(null);
+ }
+
+ /**
+ * Return all keys in sorted order.
+ *
+ * @param largerThan all returned keys are larger than this; null to start
at
+ * the beginning
+ * @return the keys
+ */
+ public Iterable<String> keys(String largerThan) {
+ final String next = getNextKey(largerThan);
+ return new Iterable<String>() {
+
+ @Override
+ public Iterator<String> iterator() {
+ return new Iterator<String>() {
+
+ private String current = next;
+
+ @Override
+ public boolean hasNext() {
+ return current != null;
+ }
+
+ @Override
+ public String next() {
+ if (current == null) {
+ throw new NoSuchElementException();
+ }
+ String result = current;
+ current = getNextKey(current);
+ return result;
+ }
+
+ };
+ }
+
+ };
+ }
+
+ private String getNextKey(String largerThan) {
+ if (MULTI_ROOT) {
+ String fileName = ROOT_NAME;
+ String result = null;
+ do {
+ String next = getNextKey(largerThan, fileName);
+ if (result == null) {
+ result = next;
+ } else if (next != null && next.compareTo(result) < 0) {
+ result = next;
+ }
+ PageFile file = getFile(fileName);
+ fileName = file.getNextRoot();
+ } while (fileName != null);
+ return result;
+ } else {
+ return getNextKey(largerThan, ROOT_NAME);
+ }
+ }
+
+ private String getNextKey(String largerThan, String fileName) {
+ PageFile file = getFile(fileName);
+ if (!file.isInnerNode()) {
+ String nextKey = file.getNextKey(largerThan);
+ if (nextKey != null) {
+ return nextKey;
+ }
+ return null;
+ }
+ int index;
+ index = largerThan == null ? -1 : file.getKeyIndex(largerThan);
+ if (index < 0) {
+ index = -index - 2;
+ }
+ index++;
+ for (; index < file.getValueCount(); index++) {
+ fileName = file.getChildValue(index);
+ String result = getNextKey(largerThan, fileName);
+ if (result != null) {
+ return result;
+ }
+ }
+ return null;
+ }
+
+ // ===============================================================
+ // partitioning
+
+ public String getMinKey() {
+ return getNextKey(null);
+ }
+
+ public String getMaxKey() {
+ if (getFile(ROOT_NAME).getNextRoot() != null) {
+ throw new UnsupportedOperationException("Not fully merged");
+ }
+ String fileName = ROOT_NAME;
+ while (true) {
+ PageFile file = getFile(fileName);
+ if (!file.isInnerNode()) {
+ return file.getKey(file.getKeys().size() - 1);
+ }
+ fileName = file.getChildValue(file.getValueCount() - 1);
+ }
+ }
+
+ public String getApproximateMedianKey(String low, String high) {
+ if (getFile(ROOT_NAME).getNextRoot() != null) {
+ throw new UnsupportedOperationException("Not fully merged");
+ }
+ String fileName = ROOT_NAME;
+ while (true) {
+ PageFile file = getFile(fileName);
+ if (!file.isInnerNode()) {
+ return file.getKey(0);
+ }
+ int i1 = file.getKeyIndex(low);
+ int i2 = file.getKeyIndex(high);
+ if (i1 < 0) {
+ i1 = -i1 - 1;
+ }
+ if (i2 < 0) {
+ i2 = -i2 - 1;
+ }
+ if (i2 != i1) {
+ int middle = (i1 + i2) / 2;
+ return file.getKey(middle);
+ }
+ fileName = file.getChildValue(i1);
+ }
+ }
+
+}
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/SlowStore.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/SlowStore.java
new file mode 100644
index 0000000000..8169091d21
--- /dev/null
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/SlowStore.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store;
+
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * A store wrapper that slows down reads and writes of the wrapped store.
+ *
+ * Each read or write is padded with a sleep so that it takes at least as
+ * long as transferring the payload plus a fixed overhead at a configured
+ * throughput. Both settings are read from the configuration of the wrapped
+ * store: "mbOverhead" (default 3) and "mbPerSecond" (default 70).
+ * "mbPerSecond" must not be 0, because it is used as a divisor.
+ */
+public class SlowStore implements Store {
+
+    private final Properties config;
+    private final Store backend;
+    private final long mbOverhead;
+    private final long mbPerSecond;
+
+    @Override
+    public String toString() {
+        return "slow(" + backend + ")";
+    }
+
+    SlowStore(Store backend) {
+        this.config = backend.getConfig();
+        this.backend = backend;
+        this.mbOverhead = Integer.parseInt(
+                config.getProperty("mbOverhead", "3"));
+        this.mbPerSecond = Integer.parseInt(
+                config.getProperty("mbPerSecond", "70"));
+    }
+
+    /**
+     * Sleep for the time the operation should have taken, minus the time
+     * it actually took. Delays of less than one millisecond are skipped.
+     *
+     * @param nanos the time already spent in the wrapped store
+     * @param sizeInBytes the payload size
+     */
+    private void delay(long nanos, int sizeInBytes) {
+        long bytes = sizeInBytes + 1_000_000 * mbOverhead;
+        // bytes / (MB per second) = microseconds; times 1000 = nanoseconds
+        long nanosRequired = 1_000 * bytes / mbPerSecond;
+        long delayMillis = (nanosRequired - nanos) / 1_000_000;
+        if (delayMillis > 0) {
+            try {
+                Thread.sleep(delayMillis);
+            } catch (InterruptedException e) {
+                // stop waiting, but keep the interrupt visible to the caller
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    @Override
+    public PageFile getIfExists(String key) {
+        long start = System.nanoTime();
+        PageFile result = backend.getIfExists(key);
+        long time = System.nanoTime() - start;
+        delay(time, result == null ? 0 : result.sizeInBytes());
+        return result;
+    }
+
+    @Override
+    public void put(String key, PageFile value) {
+        long start = System.nanoTime();
+        backend.put(key, value);
+        long time = System.nanoTime() - start;
+        delay(time, value.sizeInBytes());
+    }
+
+    @Override
+    public String newFileName() {
+        return backend.newFileName();
+    }
+
+    @Override
+    public Set<String> keySet() {
+        return backend.keySet();
+    }
+
+    @Override
+    public void remove(Set<String> set) {
+        backend.remove(set);
+    }
+
+    @Override
+    public void removeAll() {
+        backend.removeAll();
+    }
+
+    @Override
+    public long getWriteCount() {
+        return backend.getWriteCount();
+    }
+
+    @Override
+    public long getReadCount() {
+        return backend.getReadCount();
+    }
+
+    @Override
+    public void setWriteCompression(Compression compression) {
+        backend.setWriteCompression(compression);
+    }
+
+    @Override
+    public void close() {
+        backend.close();
+    }
+
+    @Override
+    public Properties getConfig() {
+        return config;
+    }
+
+    @Override
+    public boolean supportsByteOperations() {
+        return backend.supportsByteOperations();
+    }
+
+    @Override
+    public byte[] getBytes(String key) {
+        long start = System.nanoTime();
+        byte[] result = backend.getBytes(key);
+        long time = System.nanoTime() - start;
+        // same null handling as getIfExists: a missing entry has size 0
+        delay(time, result == null ? 0 : result.length);
+        return result;
+    }
+
+    @Override
+    public void putBytes(String key, byte[] data) {
+        long start = System.nanoTime();
+        backend.putBytes(key, data);
+        long time = System.nanoTime() - start;
+        delay(time, data.length);
+    }
+
+}
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/StatsStore.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/StatsStore.java
new file mode 100644
index 0000000000..ce5b39dc96
--- /dev/null
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/StatsStore.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store;
+
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A store wrapper that collects statistics about the wrapped store.
+ *
+ * For each of the operations getIfExists, put, getBytes and putBytes, the
+ * number of calls, the duration and the payload size are recorded. At most
+ * once every 10 seconds (checked when an operation completes), a summary
+ * and the number of pending operations is printed to System.out.
+ */
+public class StatsStore implements Store {
+
+    private static final long LOG_INTERVAL_NANOS = 10_000_000_000L;
+
+    private final Properties config;
+    private final Store backend;
+    private final AtomicLong pending = new AtomicLong();
+    private final ConcurrentHashMap<String, Stats> map =
+            new ConcurrentHashMap<>();
+
+    // guarded by "this"
+    private long lastLog;
+
+    @Override
+    public String toString() {
+        return "stats(" + backend + ")";
+    }
+
+    StatsStore(Store backend) {
+        this.config = backend.getConfig();
+        this.backend = backend;
+    }
+
+    @Override
+    public PageFile getIfExists(String key) {
+        long start = start();
+        long sizeInBytes = 0;
+        try {
+            PageFile result = backend.getIfExists(key);
+            sizeInBytes = result == null ? 0 : result.sizeInBytes();
+            return result;
+        } finally {
+            add("getIfExists", start, sizeInBytes);
+        }
+    }
+
+    /**
+     * Mark the start of an operation.
+     *
+     * @return the start time (System.nanoTime)
+     */
+    private long start() {
+        pending.incrementAndGet();
+        return System.nanoTime();
+    }
+
+    /**
+     * Mark the end of an operation and record its duration and size.
+     *
+     * @param key the operation name
+     * @param start the value returned by start()
+     * @param bytes the payload size
+     */
+    private void add(String key, long start, long bytes) {
+        long now = System.nanoTime();
+        pending.decrementAndGet();
+        Stats stats = map.computeIfAbsent(key, s -> new Stats(key));
+        stats.record(now - start, bytes);
+        logIfDue(start, now);
+    }
+
+    /**
+     * Print the statistics if the last output is more than 10 seconds ago.
+     * The first call only remembers the start time.
+     */
+    private synchronized void logIfDue(long start, long now) {
+        if (lastLog == 0) {
+            lastLog = start;
+        }
+        if (now - lastLog > LOG_INTERVAL_NANOS) {
+            // copy into a TreeMap to print the entries sorted by name
+            TreeMap<String, Stats> sorted = new TreeMap<>(map);
+            System.out.print(backend.toString());
+            System.out.println(sorted.values().toString()
+                    + " pending " + pending);
+            lastLog = now;
+        }
+    }
+
+    @Override
+    public void put(String key, PageFile value) {
+        long start = start();
+        try {
+            backend.put(key, value);
+        } finally {
+            add("put", start, value.sizeInBytes());
+        }
+    }
+
+    @Override
+    public String newFileName() {
+        return backend.newFileName();
+    }
+
+    @Override
+    public Set<String> keySet() {
+        return backend.keySet();
+    }
+
+    @Override
+    public void remove(Set<String> set) {
+        backend.remove(set);
+    }
+
+    @Override
+    public void removeAll() {
+        backend.removeAll();
+    }
+
+    @Override
+    public long getWriteCount() {
+        return backend.getWriteCount();
+    }
+
+    @Override
+    public long getReadCount() {
+        return backend.getReadCount();
+    }
+
+    @Override
+    public void setWriteCompression(Compression compression) {
+        backend.setWriteCompression(compression);
+    }
+
+    @Override
+    public void close() {
+        backend.close();
+    }
+
+    @Override
+    public Properties getConfig() {
+        return config;
+    }
+
+    @Override
+    public boolean supportsByteOperations() {
+        return backend.supportsByteOperations();
+    }
+
+    @Override
+    public byte[] getBytes(String key) {
+        long start = start();
+        long len = 0;
+        try {
+            byte[] result = backend.getBytes(key);
+            // same null handling as getIfExists: a missing entry has size 0
+            len = result == null ? 0 : result.length;
+            return result;
+        } finally {
+            add("getBytes", start, len);
+        }
+    }
+
+    @Override
+    public void putBytes(String key, byte[] data) {
+        long start = start();
+        try {
+            backend.putBytes(key, data);
+        } finally {
+            add("putBytes", start, data.length);
+        }
+    }
+
+    /**
+     * The statistics of one operation type. Updates go through
+     * {@link #record(long, long)}, which is synchronized because the same
+     * instance is shared by all callers of the store.
+     */
+    static class Stats {
+        final String key;
+        long count;
+        // minimums start at the largest value, so the first sample wins
+        long bytesMin = Long.MAX_VALUE;
+        long bytesMax;
+        long bytesTotal;
+        long nanosMin = Long.MAX_VALUE;
+        long nanosMax;
+        long nanosTotal;
+
+        public Stats(String key) {
+            this.key = key;
+        }
+
+        /**
+         * Add one sample.
+         *
+         * @param nanos the duration of the operation
+         * @param bytes the payload size of the operation
+         */
+        synchronized void record(long nanos, long bytes) {
+            count++;
+            nanosMax = Math.max(nanosMax, nanos);
+            nanosMin = Math.min(nanosMin, nanos);
+            nanosTotal += nanos;
+            bytesMax = Math.max(bytesMax, bytes);
+            bytesMin = Math.min(bytesMin, bytes);
+            bytesTotal += bytes;
+        }
+
+        @Override
+        public synchronized String toString() {
+            if (count == 0) {
+                return "";
+            }
+            String result = key + " " + count + " calls";
+            if (bytesTotal > 0 && nanosTotal > 0) {
+                // bytes per nanosecond, times 1000, is MB per second
+                result += " " + (bytesTotal / count / 1_000_000) + " avgMB"
+                        + " " + ((bytesTotal * 1_000) / nanosTotal) + " MB/s";
+            }
+            return result;
+        }
+
+    }
+
+}
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/Store.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/Store.java
new file mode 100644
index 0000000000..37e70560ac
--- /dev/null
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/Store.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store;
+
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Storage for files.
+ */
+public interface Store {
+
+    /**
+     * Get a file that is expected to exist.
+     *
+     * @param key the file name
+     * @return the file (never null)
+     * @throws IllegalStateException if there is no file with this name
+     */
+    default PageFile get(String key) {
+        PageFile file = getIfExists(key);
+        if (file != null) {
+            return file;
+        }
+        throw new IllegalStateException("Not found: " + key);
+    }
+
+    /**
+     * Get a file if it exists.
+     *
+     * @param key the file name
+     * @return the file, or null
+     */
+    PageFile getIfExists(String key);
+
+    /**
+     * Store a file.
+     *
+     * @param key the file name
+     * @param value the file
+     */
+    void put(String key, PageFile value);
+
+    /**
+     * Generate a new file name.
+     *
+     * @return the new file name
+     */
+    String newFileName();
+
+    /**
+     * Get the list of files.
+     *
+     * @return the file names
+     */
+    Set<String> keySet();
+
+    /**
+     * Remove a number of files.
+     *
+     * @param set the names of the files to remove
+     */
+    void remove(Set<String> set);
+
+    /**
+     * Remove all files.
+     */
+    void removeAll();
+
+    /**
+     * Get the number of files written.
+     *
+     * @return the number of files written
+     */
+    long getWriteCount();
+
+    /**
+     * Get the number of files read.
+     *
+     * @return the number of files read
+     */
+    long getReadCount();
+
+    /**
+     * Set the compression algorithm used for writing from now on.
+     *
+     * @param compression the compression algorithm
+     */
+    void setWriteCompression(Compression compression);
+
+    /**
+     * Close the store.
+     */
+    void close();
+
+    /**
+     * Get the configuration of this store.
+     *
+     * @return the configuration properties
+     */
+    Properties getConfig();
+
+    /**
+     * Whether {@link #getBytes(String)} and
+     * {@link #putBytes(String, byte[])} may be called.
+     *
+     * @return false, unless overridden
+     */
+    default boolean supportsByteOperations() {
+        return false;
+    }
+
+    /**
+     * Read the raw bytes of a file.
+     *
+     * @param key the file name
+     * @return the data
+     * @throws UnsupportedOperationException unless overridden
+     */
+    default byte[] getBytes(String key) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Write the raw bytes of a file.
+     *
+     * @param key the file name
+     * @param data the data
+     * @throws UnsupportedOperationException unless overridden
+     */
+    default void putBytes(String key, byte[] data) {
+        throw new UnsupportedOperationException();
+    }
+
+}
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/StoreBuilder.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/StoreBuilder.java
new file mode 100644
index 0000000000..ccab3e112c
--- /dev/null
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/StoreBuilder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Properties;
+
+public class StoreBuilder {
+
+ /**
+ * Build a store. The configuration options are passed as a list of
properties.
+ *
+ * - empty string or null: in-memory.
+ * - "type=file": file system, with "dir" directory
+ * - "type=azure": Azure
+ * - "type=s3": Amazon S3
+ * - "type=diskCache": disk cache, with "dir" and "backend=azure" or "s3"
+ *
+ * @param config the config
+ * @return a store
+ * @throws IllegalArgumentException
+ */
+ public static Store build(String config) throws IllegalArgumentException {
+ if (config == null || config.isEmpty()) {
+ return new MemoryStore(new Properties());
+ }
+ Properties prop = new Properties();
+ try {
+ prop.load(new StringReader(config));
+ } catch (IOException e) {
+ throw new IllegalArgumentException(config, e);
+ }
+ return build(prop);
+ }
+
+ public static Store build(Properties config) {
+ String type = config.getProperty("type");
+ switch (type) {
+ case "memory":
+ return new MemoryStore(config);
+ case "file":
+ return new FileStore(config);
+ case "azure":
+ return new AzureStore(config);
+ case "diskCache":
+ return new DiskCacheStore(config);
+ }
+ if (type.startsWith("stats.")) {
+ config.put("type", type.substring("stats.".length()));
+ return new StatsStore(build(config));
+ } else if (type.startsWith("slow.")) {
+ config.put("type", type.substring("slow.".length()));
+ return new SlowStore(build(config));
+ } else if (type.startsWith("log.")) {
+ config.put("type", type.substring("log.".length()));
+ return new LogStore(build(config));
+ }
+ throw new IllegalArgumentException(config.toString());
+ }
+
+ public static Properties subProperties(Properties p, String prefix) {
+ Properties p2 = new Properties();
+ for (Object k : p.keySet()) {
+ if (k.toString().startsWith(prefix)) {
+ p2.put(k.toString().substring(prefix.length()), p.get(k));
+ }
+ }
+ return p2;
+ }
+
+}
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/utils/Cache.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/utils/Cache.java
new file mode 100644
index 0000000000..fcb2e4bf49
--- /dev/null
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/utils/Cache.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store.utils;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class Cache<K, V> extends LinkedHashMap<K, V> {
+
+ private static final long serialVersionUID = 1L;
+ private int maxSize;
+
+ public Cache(int maxSize) {
+ super(16, 0.75f, true);
+ this.maxSize = maxSize;
+ }
+
+ public void setSize(int maxSize) {
+ this.maxSize = maxSize;
+ }
+
+ @Override
+ public boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+ return size() > maxSize;
+ }
+
+}
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/utils/Position.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/utils/Position.java
new file mode 100644
index 0000000000..2fc6b6e51a
--- /dev/null
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/utils/Position.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store.utils;
+
+import org.apache.jackrabbit.oak.index.indexer.document.tree.store.PageFile;
+
+public class Position {
+ public PageFile file;
+ public int valuePos;
+
+ public String toString() {
+ return (file.isInnerNode() ? "internal" : "leaf") + " " + valuePos + "
" + file.getKeys() + "\n";
+ }
+}
\ No newline at end of file
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/utils/SortedStream.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/utils/SortedStream.java
new file mode 100644
index 0000000000..8659becb19
--- /dev/null
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/utils/SortedStream.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store.utils;
+
+import java.util.Iterator;
+
+/**
+ * A stream of key-value pairs read from an iterator of positions.
+ *
+ * The stream always exposes the current entry; both the key and the value
+ * are null once the iterator is exhausted. Streams are ordered by their
+ * current key (exhausted streams last), then by the root file name.
+ */
+public class SortedStream implements Comparable<SortedStream> {
+
+    private final String rootFileName;
+    private final Iterator<Position> it;
+    public String currentKey;
+    public String currentValue;
+
+    /**
+     * Create a stream and load its first entry.
+     *
+     * @param rootFileName the name of the root file (used to break ties)
+     * @param it the positions to read
+     */
+    public SortedStream(String rootFileName, Iterator<Position> it) {
+        this.rootFileName = rootFileName;
+        this.it = it;
+        next();
+    }
+
+    @Override
+    public String toString() {
+        return "file " + rootFileName
+                + " key " + currentKey
+                + " value " + currentValue;
+    }
+
+    String currentKeyOrNull() {
+        return currentKey;
+    }
+
+    Object currentValue() {
+        return currentValue;
+    }
+
+    /**
+     * Advance to the next entry, or set the current key and value to null
+     * if there are no more entries.
+     */
+    public void next() {
+        if (!it.hasNext()) {
+            currentKey = null;
+            currentValue = null;
+            return;
+        }
+        Position pos = it.next();
+        currentKey = pos.file.getKey(pos.valuePos);
+        currentValue = pos.file.getValue(pos.valuePos);
+    }
+
+    @Override
+    public int compareTo(SortedStream o) {
+        String mine = currentKey;
+        String theirs = o.currentKey;
+        if (mine != null && theirs != null) {
+            int byKey = mine.compareTo(theirs);
+            if (byKey != 0) {
+                return byKey;
+            }
+        } else if (mine != null) {
+            // only the other stream is exhausted: this one comes first
+            return -1;
+        } else if (theirs != null) {
+            // only this stream is exhausted: it comes last
+            return 1;
+        }
+        // same key, or both exhausted
+        return rootFileName.compareTo(o.rootFileName);
+    }
+}
\ No newline at end of file
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/utils/Uuid.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/utils/Uuid.java
new file mode 100644
index 0000000000..2e2873f6ae
--- /dev/null
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/utils/Uuid.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store.utils;
+
+import java.security.SecureRandom;
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A UUID implementation.
+ *
+ * It supports creating version 7 UUIDs, which are time-ordered.
+ * See also draft-ietf-uuidrev-rfc4122bis-00
+ *
+ * Unlike java.util.UUID, the comparison is unsigned,
+ * so that the string comparison yields the same result.
+ */
+public class Uuid implements Comparable<Uuid> {
+
+    private static final AtomicLong UUID_LAST_MILLIS_AND_COUNT =
+            new AtomicLong(0);
+
+    private static final SecureRandom RANDOM = new SecureRandom();
+
+    // the counter occupies the lowest 12 bits
+    private static final int COUNTER_BITS = 12;
+    private static final long COUNTER_MASK = (1L << COUNTER_BITS) - 1;
+
+    // most significant bits
+    private final long msb;
+
+    // least significant bits
+    private final long lsb;
+
+    Uuid(long msb, long lsb) {
+        this.msb = msb;
+        this.lsb = lsb;
+    }
+
+    /**
+     * Format as 5 groups of hex digits (8-4-4-4-12), separated by "-".
+     */
+    @Override
+    public String toString() {
+        long g1 = msb >>> 32;
+        long g2 = (msb >>> 16) & 0xffff;
+        long g3 = msb & 0xffff;
+        long g4 = (lsb >>> 48) & 0xffff;
+        long g5 = lsb & 0xffffffffffffL;
+        return String.format("%08x-%04x-%04x-%04x-%012x",
+                g1, g2, g3, g4, g5);
+    }
+
+    /**
+     * Format as timestamp, counter, and random part, in hex, without
+     * separators.
+     *
+     * @return the short string
+     */
+    public String toShortString() {
+        long timestamp = getTimestampPart();
+        long counter = getCounterPart();
+        long random = getRandomPart();
+        return String.format("%012x%03x%016x", timestamp, counter, random);
+    }
+
+    /**
+     * Format with the timestamp part interpreted as milliseconds since the
+     * epoch, followed by the counter and the random part in hex.
+     *
+     * @return the human readable string
+     */
+    public String toHumanReadableString() {
+        String time = Instant.ofEpochMilli(getTimestampPart()).toString();
+        return String.format("%s %03x %016x",
+                time, getCounterPart(), getRandomPart());
+    }
+
+    /**
+     * Get the timestamp part (48 bits).
+     *
+     * @return the timestamp part
+     */
+    public long getTimestampPart() {
+        return msb >>> 16;
+    }
+
+    /**
+     * Get the counter part (12 bits).
+     *
+     * @return counter part
+     */
+    public long getCounterPart() {
+        return msb & COUNTER_MASK;
+    }
+
+    /**
+     * Get the random part (62 bits).
+     * The first 2 bits are fixed.
+     *
+     * @return the random part
+     */
+    public long getRandomPart() {
+        return lsb;
+    }
+
+    /**
+     * Unlike java.util.UUID, the comparison is unsigned,
+     * so that the string comparison yields the same result.
+     */
+    @Override
+    public int compareTo(Uuid o) {
+        int high = Long.compareUnsigned(msb, o.msb);
+        return high != 0 ? high : Long.compareUnsigned(lsb, o.lsb);
+    }
+
+    @Override
+    public int hashCode() {
+        return Long.hashCode(lsb ^ msb);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        Uuid that = (Uuid) obj;
+        return msb == that.msb && lsb == that.lsb;
+    }
+
+    /**
+     * Get the next timestamp (in milliseconds) and counter.
+     * The lowest 12 bits of the returned value is the counter.
+     * The milliseconds are shifted by 12 bits.
+     *
+     * The returned value is always larger than the last returned value:
+     * if the shifted time is not larger, the last value plus one is used.
+     *
+     * @param now the milliseconds
+     * @param lastMillisAndCount the last returned value
+     * @return the new value
+     */
+    static long getMillisAndCountIncreasing(long now,
+            AtomicLong lastMillisAndCount) {
+        long candidate = now << COUNTER_BITS;
+        for (;;) {
+            long previous = lastMillisAndCount.get();
+            // ensure it is non-decrementing
+            candidate = candidate <= previous ? previous + 1 : candidate;
+            if (lastMillisAndCount.compareAndSet(previous, candidate)) {
+                return candidate;
+            }
+            // another thread was faster: retry against its value
+        }
+    }
+
+    /**
+     * Create a version 7 UUID from the given parts.
+     *
+     * @param millisAndCount the milliseconds, shifted by 12 bits, plus the
+     *            counter in the lowest 12 bits
+     * @param random the random value (the lowest 62 bits are used)
+     * @return the UUID
+     */
+    static Uuid timeBasedVersion7(long millisAndCount,
+            long random) {
+        final long version = 7;
+        final long variant = 2;
+        long millis = millisAndCount >>> COUNTER_BITS;
+        long counter = millisAndCount & COUNTER_MASK;
+        long high = (millis << 16) | (version << 12) | counter;
+        long low = (variant << 62) | (random & ((1L << 62) - 1));
+        return new Uuid(high, low);
+    }
+
+    /**
+     * Create a version 7 UUID using the current time and a secure random
+     * number.
+     *
+     * @return the UUID
+     */
+    public static Uuid timeBasedVersion7() {
+        long now = System.currentTimeMillis();
+        long millisAndCount = getMillisAndCountIncreasing(
+                now, UUID_LAST_MILLIS_AND_COUNT);
+        return timeBasedVersion7(millisAndCount, RANDOM.nextLong());
+    }
+
+    public long getMostSignificantBits() {
+        return msb;
+    }
+
+    public long getLeastSignificantBits() {
+        return lsb;
+    }
+
+}