This is an automated email from the ASF dual-hosted git repository.

thomasm pushed a commit to branch OAK-10341
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/OAK-10341 by this push:
     new 5fb931c539 OAK-10341 TreeStore
5fb931c539 is described below

commit 5fb931c5395feac3541ad1fa3f4c05d61a6b66f7
Author: Thomas Mueller <[email protected]>
AuthorDate: Fri Jul 7 09:30:17 2023 +0200

    OAK-10341 TreeStore
---
 .../indexer/document/tree/store/AzureStore.java    | 220 ++++++
 .../indexer/document/tree/store/Compression.java   |  98 +++
 .../document/tree/store/DiskCacheStore.java        | 226 ++++++
 .../indexer/document/tree/store/FileStore.java     | 279 +++++++
 .../indexer/document/tree/store/LogStore.java      | 122 +++
 .../indexer/document/tree/store/MemoryStore.java   |  99 +++
 .../indexer/document/tree/store/PageFile.java      | 376 ++++++++++
 .../index/indexer/document/tree/store/Session.java | 829 +++++++++++++++++++++
 .../indexer/document/tree/store/SlowStore.java     | 140 ++++
 .../indexer/document/tree/store/StatsStore.java    | 198 +++++
 .../index/indexer/document/tree/store/Store.java   | 125 ++++
 .../indexer/document/tree/store/StoreBuilder.java  |  88 +++
 .../indexer/document/tree/store/utils/Cache.java   |  43 ++
 .../document/tree/store/utils/Position.java        |  30 +
 .../document/tree/store/utils/SortedStream.java    |  75 ++
 .../indexer/document/tree/store/utils/Uuid.java    | 181 +++++
 16 files changed, 3129 insertions(+)

diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/AzureStore.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/AzureStore.java
new file mode 100644
index 0000000000..31d90f0173
--- /dev/null
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/AzureStore.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.jackrabbit.oak.index.indexer.document.tree.store.utils.Uuid;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudAppendBlob;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.blob.ListBlobItem;
+
+/**
+ * A {@link Store} that keeps every entry as a block blob inside one
+ * directory of an Azure blob storage container.
+ * <p>
+ * Configuration properties read by the constructor:
+ * {@code storageAccount} (the connection string), {@code container},
+ * {@code directory} and, optionally, {@code maximumExecutionTimeInMs}.
+ * <p>
+ * The read and write counters are plain {@code long} fields incremented
+ * without synchronization, so the reported numbers are only approximate
+ * when the store is called from several threads.
+ */
+public class AzureStore implements Store {
+
+    /** HTTP status code of a storage error that means "blob not found". */
+    private static final int HTTP_NOT_FOUND = 404;
+
+    private final Properties config;
+    private final CloudStorageAccount cloud;
+    private final CloudBlobClient cloudBlobClient;
+    private final CloudBlobContainer container;
+    private final CloudBlobDirectory dir;
+    private Compression compression = Compression.NO;
+    private long writeCount;
+    private long readCount;
+
+    @Override
+    public String toString() {
+        return "azure";
+    }
+
+    /**
+     * Connects to the storage account and creates the container if needed.
+     *
+     * @param config the store configuration (see the class description)
+     * @throws IllegalArgumentException if the configuration is invalid or
+     *             the container can not be accessed
+     */
+    public AzureStore(Properties config) {
+        this.config = config;
+        try {
+            cloud = CloudStorageAccount.parse(
+                    config.getProperty("storageAccount"));
+            cloudBlobClient = cloud.createCloudBlobClient();
+            String maximumExecutionTimeInMs =
+                    config.getProperty("maximumExecutionTimeInMs");
+            if (maximumExecutionTimeInMs != null) {
+                cloudBlobClient.getDefaultRequestOptions()
+                        .setMaximumExecutionTimeInMs(
+                                Integer.parseInt(maximumExecutionTimeInMs));
+            }
+            container = cloudBlobClient.getContainerReference(
+                    config.getProperty("container"));
+            container.createIfNotExists();
+            dir = container.getDirectoryReference(
+                    config.getProperty("directory"));
+        } catch (Exception e) {
+            // do not put the whole configuration into the message:
+            // "storageAccount" is a connection string that holds the key
+            throw new IllegalArgumentException(
+                    "Can not open Azure store; container="
+                    + config.getProperty("container")
+                    + ", directory=" + config.getProperty("directory"), e);
+        }
+    }
+
+    @Override
+    public void setWriteCompression(Compression compression) {
+        this.compression = compression;
+    }
+
+    /**
+     * Downloads and decodes the page file stored under the given key.
+     *
+     * @param key the blob name, relative to the configured directory
+     * @return the page file, or null if the blob is missing or empty
+     * @throws IllegalArgumentException if the download fails for any
+     *             reason other than the blob not being found
+     */
+    @Override
+    public PageFile getIfExists(String key) {
+        try {
+            readCount++;
+            // entries are written as block blobs, so read them as such
+            CloudBlockBlob blob = dir.getBlockBlobReference(key);
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            blob.download(out);
+            byte[] data = out.toByteArray();
+            if (data.length == 0) {
+                // nothing to decode (the first byte selects the codec)
+                return null;
+            }
+            Compression c = Compression.getCompressionFromData(data[0]);
+            data = c.expand(data);
+            return PageFile.fromBytes(data);
+        } catch (StorageException e) {
+            if (e.getHttpStatusCode() == HTTP_NOT_FOUND) {
+                return null;
+            }
+            throw new IllegalArgumentException(key, e);
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException(key, e);
+        }
+    }
+
+    @Override
+    public boolean supportsByteOperations() {
+        return true;
+    }
+
+    /**
+     * Downloads the raw (possibly compressed) content of a blob.
+     *
+     * @param key the blob name, relative to the configured directory
+     * @return the blob content
+     * @throws IllegalArgumentException if the download fails
+     */
+    @Override
+    public byte[] getBytes(String key) {
+        try {
+            readCount++;
+            CloudBlockBlob blob = dir.getBlockBlobReference(key);
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            blob.download(out);
+            return out.toByteArray();
+        } catch (URISyntaxException | StorageException e) {
+            throw new IllegalArgumentException(key, e);
+        }
+    }
+
+    /**
+     * Uploads raw bytes as they are; no compression is applied here.
+     *
+     * @param key the blob name, relative to the configured directory
+     * @param data the content to upload
+     * @throws IllegalArgumentException if the upload fails
+     */
+    @Override
+    public void putBytes(String key, byte[] data) {
+        try {
+            writeCount++;
+            CloudBlockBlob blob = dir.getBlockBlobReference(key);
+            blob.upload(new ByteArrayInputStream(data), data.length);
+        } catch (URISyntaxException | StorageException | IOException e) {
+            throw new IllegalArgumentException(key, e);
+        }
+    }
+
+    /**
+     * Serializes the page file, compresses it with the configured write
+     * compression and uploads the result.
+     *
+     * @param key the blob name, relative to the configured directory
+     * @param value the page file to store
+     * @throws IllegalArgumentException if the upload fails
+     */
+    @Override
+    public void put(String key, PageFile value) {
+        byte[] data = compression.compress(value.toBytes());
+        try {
+            writeCount++;
+            CloudBlockBlob blob = dir.getBlockBlobReference(key);
+            long start = System.nanoTime();
+            blob.uploadFromByteArray(data, 0, data.length);
+            long time = System.nanoTime() - start;
+            // TODO replace with proper logging
+            System.out.println("Azure upload " + key
+                    + " size " + data.length + " nanos " + time);
+        } catch (URISyntaxException | StorageException | IOException e) {
+            throw new IllegalArgumentException(key, e);
+        }
+    }
+
+    @Override
+    public String newFileName() {
+        return Uuid.timeBasedVersion7().toShortString();
+    }
+
+    /**
+     * Lists the block blobs of the configured directory.
+     *
+     * @return the blob names, without the part up to the last '/'
+     * @throws IllegalArgumentException if listing fails
+     */
+    @Override
+    public Set<String> keySet() {
+        try {
+            HashSet<String> set = new HashSet<>();
+            for (ListBlobItem item : dir.listBlobs()) {
+                if (item instanceof CloudBlockBlob) {
+                    String name = ((CloudBlockBlob) item).getName();
+                    // keep only the part after the last path separator
+                    int index = name.lastIndexOf('/');
+                    if (index >= 0) {
+                        name = name.substring(index + 1);
+                    }
+                    set.add(name);
+                }
+            }
+            return set;
+        } catch (StorageException | URISyntaxException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    /**
+     * Deletes the given blobs; keys without a blob are ignored.
+     *
+     * @param set the keys to delete
+     * @throws IllegalArgumentException if a delete request fails; keys
+     *             that come later in the iteration are then not processed
+     */
+    @Override
+    public void remove(Set<String> set) {
+        for (String key : set) {
+            try {
+                CloudBlockBlob blob = dir.getBlockBlobReference(key);
+                writeCount++;
+                blob.deleteIfExists();
+            } catch (StorageException | URISyntaxException e) {
+                throw new IllegalArgumentException(key, e);
+            }
+        }
+    }
+
+    @Override
+    public void removeAll() {
+        remove(keySet());
+    }
+
+    @Override
+    public long getWriteCount() {
+        return writeCount;
+    }
+
+    @Override
+    public long getReadCount() {
+        return readCount;
+    }
+
+    @Override
+    public void close() {
+        // nothing to release
+    }
+
+    @Override
+    public Properties getConfig() {
+        return config;
+    }
+
+}
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/Compression.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/Compression.java
new file mode 100644
index 0000000000..da76027dc2
--- /dev/null
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/Compression.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store;
+
+import java.util.Arrays;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+
+/**
+ * The compression algorithms a store can apply to serialized data.
+ * <p>
+ * LZ4 output starts with a 5 byte header: the marker byte {@code '4'}
+ * followed by the uncompressed length as a big-endian int. {@link #NO}
+ * adds no header, so {@link #getCompressionFromData(byte)} treats every
+ * first byte other than {@code '4'} as "not compressed"; uncompressed
+ * data that happens to start with {@code '4'} would be misdetected.
+ */
+public enum Compression {
+    NO {
+        @Override
+        byte[] compress(byte[] data) {
+            return data;
+        }
+
+        @Override
+        byte[] expand(byte[] data) {
+            return data;
+        }
+    },
+    LZ4 {
+        private final LZ4Compressor compressor =
+                LZ4Factory.fastestInstance().fastCompressor();
+        private final LZ4FastDecompressor decompressor =
+                LZ4Factory.fastestInstance().fastDecompressor();
+
+        // scratch buffer reused by all compress calls; guarded by the
+        // lock on "compressor"
+        private byte[] compressBuffer = new byte[1024 * 1024];
+
+        @Override
+        byte[] compress(byte[] data) {
+            // the lock is needed because the scratch buffer is shared
+            synchronized (compressor) {
+                // size the buffer by the documented worst case rather
+                // than by 2 * length, which overflows for large inputs
+                int required = HEADER_LENGTH
+                        + compressor.maxCompressedLength(data.length);
+                byte[] buffer = compressBuffer;
+                if (buffer.length < required) {
+                    buffer = new byte[required];
+                    compressBuffer = buffer;
+                }
+                buffer[0] = '4';
+                writeInt(buffer, 1, data.length);
+                int len = HEADER_LENGTH + compressor.compress(
+                        data, 0, data.length,
+                        buffer, HEADER_LENGTH,
+                        buffer.length - HEADER_LENGTH);
+                return Arrays.copyOf(buffer, len);
+            }
+        }
+
+        @Override
+        byte[] expand(byte[] data) {
+            if (data.length < HEADER_LENGTH) {
+                throw new IllegalArgumentException(
+                        "LZ4 data too short: " + data.length);
+            }
+            int len = readInt(data, 1);
+            if (len < 0) {
+                throw new IllegalArgumentException(
+                        "Invalid uncompressed length: " + len);
+            }
+            // no lock: a new target array is allocated for each call
+            // and lz4-java decompressor instances are thread-safe
+            byte[] target = new byte[len];
+            decompressor.decompress(data, HEADER_LENGTH, target, 0, len);
+            return target;
+        }
+    };
+
+    /** Size of the LZ4 header: the marker byte plus the 4 byte length. */
+    private static final int HEADER_LENGTH = 5;
+
+    /**
+     * Compresses the data.
+     *
+     * @param data the uncompressed data
+     * @return the compressed data, including the header if there is one
+     */
+    abstract byte[] compress(byte[] data);
+
+    /**
+     * Reverses {@link #compress(byte[])}.
+     *
+     * @param data the compressed data, including the header if any
+     * @return the uncompressed data
+     */
+    abstract byte[] expand(byte[] data);
+
+    /**
+     * Writes an int in big-endian byte order.
+     *
+     * @param buff the target array
+     * @param pos the index of the first of the four bytes written
+     * @param x the value
+     */
+    public static void writeInt(byte[] buff, int pos, int x) {
+        buff[pos] = (byte) (x >> 24);
+        buff[pos + 1] = (byte) (x >> 16);
+        buff[pos + 2] = (byte) (x >> 8);
+        buff[pos + 3] = (byte) x;
+    }
+
+    /**
+     * Reads an int in big-endian byte order.
+     *
+     * @param buff the source array
+     * @param pos the index of the first of the four bytes read
+     * @return the value
+     */
+    public static int readInt(byte[] buff, int pos) {
+        return (buff[pos] << 24)
+                + ((buff[pos + 1] & 0xff) << 16)
+                + ((buff[pos + 2] & 0xff) << 8)
+                + (buff[pos + 3] & 0xff);
+    }
+
+    /**
+     * Detects the algorithm from the first byte of stored data.
+     *
+     * @param data the first byte of the stored data
+     * @return {@link #LZ4} for the marker {@code '4'}, otherwise
+     *         {@link #NO}
+     */
+    public static Compression getCompressionFromData(byte data) {
+        switch (data) {
+        case '4':
+            return Compression.LZ4;
+        default:
+            return Compression.NO;
+        }
+    }
+
+}
\ No newline at end of file
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/DiskCacheStore.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/DiskCacheStore.java
new file mode 100644
index 0000000000..e25ff91a77
--- /dev/null
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/DiskCacheStore.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jackrabbit.oak.index.indexer.document.tree.store.utils.Cache;
+
+public class DiskCacheStore implements Store {
+
+    private final Properties config;
+
+    // the frontend (local disk)
+    private Store frontend;
+
+    // the backend (azure or s3)
+    private Store backend;
+
+    // set of entries that are not yet uploaded
+    private ConcurrentHashMap<String, String> uploading = new 
ConcurrentHashMap<>();
+
+    // executor service that uploads files
+    private ExecutorService executor;
+
+    // cache for entries that were downloaded
+    // (doesn't contain entries that are not yet uploaded)
+    private Cache<String, String> cache;
+
+    private int readAhead;
+
+    public String toString() {
+        return "diskCache(" + frontend + ", " + backend + ")";
+    }
+
+    public DiskCacheStore(Properties config) {
+        this.config = config;
+        frontend = StoreBuilder.build(StoreBuilder.subProperties(config, 
"front."));
+
+        // cache size, in number of files on disk
+        int cacheSize = Integer.parseInt(config.getProperty("cacheSize", 
"1000000000"));
+
+        // read ahead this number of children
+        this.readAhead = Integer.parseInt(config.getProperty("readAhead", 
"1000000"));
+
+        // using this number of threads at most
+        int threads = Integer.parseInt(config.getProperty("threads", "20"));
+
+        cache = new Cache<String, String>(cacheSize) {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public boolean removeEldestEntry(Map.Entry<String, String> eldest) 
{
+                boolean result = super.removeEldestEntry(eldest);
+                if (result) {
+                    frontend.remove(Collections.singleton(eldest.getKey()));
+                }
+                return result;
+            }
+        };
+        executor = new ThreadPoolExecutor(threads, threads, 0L, 
TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<>(),
+                r -> {
+                    Thread thread = new Thread(r, "DiskCacheStore");
+                    thread.setDaemon(true);
+                    return thread;
+                });
+        backend = StoreBuilder.build(StoreBuilder.subProperties(config, 
"back."));
+    }
+
+    @Override
+    public PageFile getIfExists(String key) {
+        PageFile result = frontend.getIfExists(key);
+        if (result != null) {
+            return result;
+        }
+        if (backend.supportsByteOperations() && 
frontend.supportsByteOperations()) {
+            byte[] data = backend.getBytes(key);
+            frontend.putBytes(key, data);
+            result = frontend.get(key);
+        } else {
+            result = backend.get(key);
+            frontend.put(key, result);
+        }
+        synchronized (cache) {
+            cache.put(key, key);
+        }
+        if (result.isInnerNode()) {
+            for (int i = 0; i < readAhead && i < result.getValueCount(); i++) {
+                String childKey = result.getChildValue(i);
+                // System.out.println("    prefetching " + childKey + " 
because we read " + key);
+                executor.submit(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        // this will put the entry in the cache
+                        // and also read the children if needed
+                        getIfExists(childKey);
+                        // System.out.println("    prefetch done for " + 
childKey + " because we read " + key);
+                    }
+
+                });
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public void put(String key, PageFile value) {
+        if (uploading.containsKey(key)) {
+            System.out.println("WARNING: upload is in progress: " + key +
+                    " and a new upload is scheduled. waiting for the upload to 
finish, to avoid concurrency issues.");
+            while (uploading.containsKey(key)) {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    // TODO Auto-generated catch block
+                    e.printStackTrace();
+                }
+            }
+        }
+        uploading.put(key, key);
+        frontend.put(key, value);
+        executor.submit(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    if (frontend.supportsByteOperations() && 
backend.supportsByteOperations()) {
+                        byte[] data = frontend.getBytes(key);
+                        backend.putBytes(key, data);
+                    } else {
+                        backend.put(key, frontend.get(key));
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+                uploading.remove(key);
+            }
+
+        });
+    }
+
+    @Override
+    public String newFileName() {
+        return frontend.newFileName();
+    }
+
+    @Override
+    public Set<String> keySet() {
+        return backend.keySet();
+    }
+
+    @Override
+    public void remove(Set<String> set) {
+        synchronized (cache) {
+            set.forEach(k -> cache.remove(k));
+        }
+        frontend.remove(set);
+        backend.remove(set);
+    }
+
+    @Override
+    public void removeAll() {
+        frontend.removeAll();
+        backend.removeAll();
+    }
+
+    @Override
+    public long getWriteCount() {
+        return frontend.getWriteCount();
+    }
+
+    @Override
+    public long getReadCount() {
+        return frontend.getReadCount();
+    }
+
+    @Override
+    public void setWriteCompression(Compression compression) {
+        frontend.setWriteCompression(compression);
+        backend.setWriteCompression(compression);
+    }
+
+    @Override
+    public void close() {
+        try {
+            executor.shutdown();
+            executor.awaitTermination(1, TimeUnit.DAYS);
+        } catch (InterruptedException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+        frontend.close();
+        backend.close();
+    }
+
+    @Override
+    public Properties getConfig() {
+        return config;
+    }
+
+}
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/FileStore.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/FileStore.java
new file mode 100644
index 0000000000..9c72f8d6f7
--- /dev/null
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/FileStore.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.jackrabbit.oak.index.indexer.document.tree.store.utils.Uuid;
+
+/**
+ * A {@link Store} that keeps every entry as one file in a directory.
+ * <p>
+ * Configuration properties: {@code dir} (the directory, created if
+ * missing) and {@code async} (default {@code false}). With
+ * {@code async=true}, {@link #put(String, PageFile)} hands the data to a
+ * background thread through a bounded queue; entries that are not yet on
+ * disk are served by {@link #getIfExists(String)} from memory.
+ * <p>
+ * NOTE(review): {@link #getBytes(String)} and {@link #remove(Set)} do not
+ * look at the pending writes, so in async mode they may miss an entry
+ * that is still queued - confirm this is acceptable for the callers.
+ * NOTE(review): an unchecked exception thrown by a background write ends
+ * the writer thread; writers then block once the queue is full.
+ */
+public class FileStore implements Store {
+
+    private final Properties config;
+    private final String directory;
+    private Compression compression = Compression.NO;
+    private long writeCount, readCount;
+    private Thread backgroundThread;
+
+    // entries handed to the background writer that are not yet on disk
+    private final ConcurrentHashMap<String, PageFile> pendingWrites =
+            new ConcurrentHashMap<>();
+
+    // bounded, so that writers block if the disk can not keep up
+    private final LinkedBlockingQueue<WriteOperation> queue =
+            new LinkedBlockingQueue<>(100);
+
+    // marker that tells the background writer to stop
+    private static final WriteOperation STOP = new WriteOperation();
+
+    /** One queued write: the key, the page and its serialized form. */
+    static class WriteOperation {
+        String key;
+        // the page registered in pendingWrites for this operation
+        PageFile page;
+        byte[] value;
+    }
+
+    @Override
+    public String toString() {
+        return "file(" + directory + ")";
+    }
+
+    /**
+     * Creates the directory if needed and, if configured, starts the
+     * background writer.
+     *
+     * @param config the configuration (see the class description)
+     * @throws IllegalArgumentException if the {@code dir} property is
+     *             not set
+     */
+    public FileStore(Properties config) {
+        this.config = config;
+        this.directory = config.getProperty("dir");
+        if (directory == null) {
+            throw new IllegalArgumentException(
+                    "Property \"dir\" is not set");
+        }
+        new File(directory).mkdirs();
+        boolean asyncWrite = Boolean.parseBoolean(
+                config.getProperty("async", "false"));
+        if (asyncWrite) {
+            startAsyncWriter();
+        }
+    }
+
+    /**
+     * Starts the daemon thread that writes queued operations to disk
+     * until it takes the stop marker from the queue or is interrupted.
+     */
+    private void startAsyncWriter() {
+        backgroundThread = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    while (true) {
+                        WriteOperation op = queue.take();
+                        if (op == STOP) {
+                            break;
+                        }
+                        writeFile(op.key, op.value);
+                        // remove only our own page: a newer write of
+                        // the same key may already be registered, and
+                        // must stay visible until it is on disk
+                        pendingWrites.remove(op.key, op.page);
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+
+        });
+        backgroundThread.setDaemon(true);
+        backgroundThread.start();
+    }
+
+    /**
+     * Stops the background writer, if there is one, after it has
+     * written everything queued before this call. If the calling thread
+     * is interrupted while waiting, the method returns early with the
+     * interrupt flag set.
+     */
+    @Override
+    public void close() {
+        if (backgroundThread == null) {
+            return;
+        }
+        try {
+            queue.put(STOP);
+            backgroundThread.join();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public void setWriteCompression(Compression compression) {
+        this.compression = compression;
+    }
+
+    /**
+     * Reads an entry, from the pending writes if it is still queued,
+     * otherwise from disk.
+     *
+     * @param key the file name
+     * @return the page file, or null if the file is missing or empty
+     * @throws IllegalArgumentException if reading the file fails
+     */
+    @Override
+    public PageFile getIfExists(String key) {
+        PageFile pending = pendingWrites.get(key);
+        if (pending != null) {
+            return pending;
+        }
+        readCount++;
+        File f = getFile(key);
+        if (!f.exists()) {
+            return null;
+        }
+        try (RandomAccessFile file = new RandomAccessFile(f, "r")) {
+            long length = file.length();
+            if (length == 0) {
+                // deleted in the meantime
+                return null;
+            }
+            byte[] data = new byte[(int) length];
+            file.readFully(data);
+            // the first byte tells how the data was compressed
+            Compression c = Compression.getCompressionFromData(data[0]);
+            data = c.expand(data);
+            return PageFile.fromBytes(data);
+        } catch (IOException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    /**
+     * Stores an entry: through the background writer if it is running
+     * (a copy of the page is queued), otherwise directly.
+     *
+     * @param key the file name
+     * @param value the page file to store
+     */
+    @Override
+    public void put(String key, PageFile value) {
+        writeCount++;
+        if (backgroundThread != null) {
+            writeFileAsync(key, value.copy());
+        } else {
+            writeFile(key, value.toBytes());
+        }
+    }
+
+    /**
+     * Registers the page as pending and queues it for the background
+     * writer, blocking while the queue is full.
+     *
+     * @throws IllegalStateException if the thread is interrupted while
+     *             waiting; the write is then not queued
+     */
+    private void writeFileAsync(String key, PageFile value) {
+        pendingWrites.put(key, value);
+        WriteOperation op = new WriteOperation();
+        op.key = key;
+        op.page = value;
+        op.value = value.toBytes();
+        try {
+            queue.put(op);
+        } catch (InterruptedException e) {
+            // the write was not queued: do not pretend it is pending,
+            // and tell the caller instead of dropping it silently
+            pendingWrites.remove(key, value);
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException(
+                    "Interrupted while queueing write of " + key, e);
+        }
+    }
+
+    @Override
+    public boolean supportsByteOperations() {
+        return true;
+    }
+
+    /**
+     * Reads the raw (possibly compressed) content of a file.
+     *
+     * @param key the file name
+     * @return the content, or null if the file is missing or empty
+     * @throws IllegalArgumentException if reading the file fails
+     */
+    @Override
+    public byte[] getBytes(String key) {
+        File f = getFile(key);
+        if (!f.exists()) {
+            return null;
+        }
+        readCount++;
+        try (RandomAccessFile file = new RandomAccessFile(f, "r")) {
+            long length = file.length();
+            if (length == 0) {
+                // deleted in the meantime
+                return null;
+            }
+            byte[] data = new byte[(int) length];
+            file.readFully(data);
+            return data;
+        } catch (IOException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    /**
+     * Writes raw bytes as they are; no compression is applied here.
+     *
+     * @param key the file name
+     * @param data the content to write
+     * @throws IllegalArgumentException if writing the file fails
+     */
+    @Override
+    public void putBytes(String key, byte[] data) {
+        try (FileOutputStream out = new FileOutputStream(getFile(key))) {
+            out.write(data);
+        } catch (IOException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    /**
+     * Compresses the data with the configured write compression and
+     * writes it to the file of the given key.
+     */
+    private void writeFile(String key, byte[] data) {
+        // TODO the file is overwritten in place; consider writing to a
+        // temporary file and moving it atomically to the target
+        data = compression.compress(data);
+        putBytes(key, data);
+    }
+
+    private File getFile(String key) {
+        return new File(directory, key);
+    }
+
+    @Override
+    public String newFileName() {
+        return Uuid.timeBasedVersion7().toShortString();
+    }
+
+    /**
+     * Lists the regular files in the directory.
+     *
+     * @return the file names; empty if the directory does not exist or
+     *         can not be listed
+     */
+    @Override
+    public Set<String> keySet() {
+        File dir = new File(directory);
+        if (!dir.exists()) {
+            return Collections.emptySet();
+        }
+        String[] list = dir.list(new FilenameFilter() {
+
+            @Override
+            public boolean accept(File dir, String name) {
+                return new File(dir, name).isFile();
+            }
+
+        });
+        if (list == null) {
+            // File.list returns null on an I/O error or if the
+            // directory was removed after the check above
+            return Collections.emptySet();
+        }
+        return new HashSet<>(Arrays.asList(list));
+    }
+
+    /**
+     * Deletes the files of the given keys; each key counts as a write.
+     *
+     * @param set the keys to remove
+     */
+    @Override
+    public void remove(Set<String> set) {
+        // TODO keep for some time if the file is relatively new?
+        for (String key : set) {
+            writeCount++;
+            getFile(key).delete();
+        }
+    }
+
+    /**
+     * Tries to delete every entry of the directory; does nothing if the
+     * directory can not be listed.
+     */
+    @Override
+    public void removeAll() {
+        File[] files = new File(directory).listFiles();
+        if (files == null) {
+            // not a directory, or an I/O error: nothing to delete
+            return;
+        }
+        for (File f : files) {
+            f.delete();
+        }
+    }
+
+    @Override
+    public long getWriteCount() {
+        return writeCount;
+    }
+
+    @Override
+    public long getReadCount() {
+        return readCount;
+    }
+
+    @Override
+    public Properties getConfig() {
+        return config;
+    }
+
+}
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/LogStore.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/LogStore.java
new file mode 100644
index 0000000000..1d9824673c
--- /dev/null
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/LogStore.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * A store wrapper that prints most operations to System.out, and then
+ * delegates them to the backend store.
+ */
+public class LogStore implements Store {
+
+    private final Properties config;
+    private final Store backend;
+
+    /**
+     * Wrap the given store. The configuration of the backend is reused.
+     *
+     * @param backend the store that performs the actual operations
+     */
+    LogStore(Store backend) {
+        this.config = backend.getConfig();
+        this.backend = backend;
+    }
+
+    @Override
+    public String toString() {
+        return "log(" + backend + ")";
+    }
+
+    /**
+     * Print one line: the backend, the operation name, and the arguments.
+     *
+     * @param message the operation name
+     * @param args the arguments of the operation
+     */
+    private void log(String message, Object... args) {
+        System.out.println(backend + "." + message + " " + Arrays.toString(args));
+    }
+
+    @Override
+    public PageFile getIfExists(String key) {
+        log("getIfExists", key);
+        return backend.getIfExists(key);
+    }
+
+    @Override
+    public void put(String key, PageFile value) {
+        // only the key is logged, not the page content
+        log("put", key);
+        backend.put(key, value);
+    }
+
+    @Override
+    public String newFileName() {
+        // not logged
+        return backend.newFileName();
+    }
+
+    @Override
+    public Set<String> keySet() {
+        log("keySet");
+        return backend.keySet();
+    }
+
+    @Override
+    public void remove(Set<String> set) {
+        log("remove", set);
+        backend.remove(set);
+    }
+
+    @Override
+    public void removeAll() {
+        log("removeAll");
+        backend.removeAll();
+    }
+
+    @Override
+    public long getWriteCount() {
+        return backend.getWriteCount();
+    }
+
+    @Override
+    public long getReadCount() {
+        return backend.getReadCount();
+    }
+
+    @Override
+    public void setWriteCompression(Compression compression) {
+        log("setWriteCompression", compression);
+        backend.setWriteCompression(compression);
+    }
+
+    @Override
+    public void close() {
+        log("close");
+        backend.close();
+    }
+
+    @Override
+    public Properties getConfig() {
+        return config;
+    }
+
+    @Override
+    public boolean supportsByteOperations() {
+        return backend.supportsByteOperations();
+    }
+
+    @Override
+    public byte[] getBytes(String key) {
+        log("getBytes", key);
+        return backend.getBytes(key);
+    }
+
+    @Override
+    public void putBytes(String key, byte[] data) {
+        // only the key and the length are logged, not the data
+        log("putBytes", key, data.length);
+        backend.putBytes(key, data);
+    }
+
+}
\ No newline at end of file
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/MemoryStore.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/MemoryStore.java
new file mode 100644
index 0000000000..7892dfcf2d
--- /dev/null
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/MemoryStore.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * A store that keeps all page files in a map in memory.
+ * File names are "f0", "f1", and so on.
+ */
+public class MemoryStore implements Store {
+
+    private final Properties config;
+    private final HashMap<String, PageFile> map = new HashMap<>();
+    private long nextFileName;
+    private long writeCount, readCount;
+
+    /**
+     * Create a store with an empty configuration.
+     */
+    public MemoryStore() {
+        this(new Properties());
+    }
+
+    /**
+     * Create a store with the given configuration.
+     *
+     * @param config the configuration, as returned by getConfig
+     */
+    public MemoryStore(Properties config) {
+        this.config = config;
+    }
+
+    @Override
+    public void setWriteCompression(Compression compression) {
+        // ignore
+    }
+
+    @Override
+    public PageFile getIfExists(String key) {
+        readCount++;
+        return map.get(key);
+    }
+
+    @Override
+    public void put(String key, PageFile file) {
+        writeCount++;
+        map.put(key, file);
+    }
+
+    @Override
+    public String toString() {
+        return "files: " + map.size();
+    }
+
+    @Override
+    public String newFileName() {
+        return "f" + nextFileName++;
+    }
+
+    /**
+     * Get the names of all stored files.
+     *
+     * @return a snapshot of the keys; later changes of the store are not
+     *         reflected, so it is safe to iterate over the result while
+     *         adding or removing entries
+     */
+    @Override
+    public Set<String> keySet() {
+        // a copy, not the live view of the map: otherwise, iterating over the
+        // result while calling put or remove would fail
+        return new HashSet<>(map.keySet());
+    }
+
+    @Override
+    public void remove(Set<String> set) {
+        // each key counts as one write, even if it was not stored
+        for (String key : set) {
+            writeCount++;
+            map.remove(key);
+        }
+    }
+
+    @Override
+    public void removeAll() {
+        map.clear();
+        // file names start again at "f0"
+        nextFileName = 0;
+    }
+
+    @Override
+    public long getWriteCount() {
+        return writeCount;
+    }
+
+    @Override
+    public long getReadCount() {
+        return readCount;
+    }
+
+    @Override
+    public void close() {
+        // nothing to release
+    }
+
+    @Override
+    public Properties getConfig() {
+        return config;
+    }
+
+}
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/PageFile.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/PageFile.java
new file mode 100644
index 0000000000..f1e2c4de8b
--- /dev/null
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/PageFile.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A B-tree page (leaf, or inner node).
+ * An inner node contains one more value than keys.
+ * A leaf page has the same number of keys and values.
+ *
+ * The page keeps an estimate of its size (see sizeInBytes): 24 for the page
+ * itself, 4 plus the number of characters for each key, 2 plus the number of
+ * characters for each value, and 5 for a null value.
+ */
+public class PageFile {
+
+    private static final boolean VERIFY_SIZE = false;
+    private static final int INITIAL_SIZE_IN_BYTES = 24;
+
+    // buffer shared by all pages for serialization;
+    // only accessed while holding the lock on PageFile.class
+    private static ByteBuffer REUSED_BUFFER = ByteBuffer.allocate(1024 * 1024);
+
+    private final boolean innerNode;
+    private final List<String> keys = new ArrayList<>();
+    private final List<String> values = new ArrayList<>();
+    private long update;
+    private String nextRoot;
+    private int sizeInBytes = INITIAL_SIZE_IN_BYTES;
+
+    // result of the last key search: -1: beginning; 0: middle; 1: end
+    private int lastSearchIndex;
+
+    // contains unwritten modifications
+    private boolean modified;
+
+    /**
+     * Create an empty page.
+     *
+     * @param innerNode true for an inner node, false for a leaf
+     */
+    public PageFile(boolean innerNode) {
+        this.innerNode = innerNode;
+    }
+
+    public void setUpdate(long update) {
+        modified = true;
+        this.update = update;
+    }
+
+    /**
+     * Read a page from the format written by toBytes.
+     * The resulting page is marked as unmodified.
+     *
+     * @param data the serialized page
+     * @return the page
+     */
+    public static PageFile fromBytes(byte[] data) {
+        ByteBuffer buff = ByteBuffer.wrap(data);
+        // type 0 is an inner node, everything else a leaf
+        boolean inner = buff.get() == 0;
+        String nextRoot = readString(buff);
+        long update = buff.getLong();
+        String prefix = readString(buff);
+        int len = buff.getInt();
+        PageFile result = new PageFile(inner);
+        for (int i = 0; i < len; i++) {
+            String key = prefix + readString(buff);
+            String value = readString(buff);
+            result.appendRecord(key, value);
+        }
+        if (inner) {
+            // an inner node has one more value than keys
+            String last = readString(buff);
+            result.values.add(last);
+            result.sizeInBytes += sizeInBytes(last);
+        }
+        if (nextRoot != null && !nextRoot.isEmpty()) {
+            result.setNextRoot(nextRoot);
+        }
+        result.setUpdate(update);
+        result.modified = false;
+        return result;
+    }
+
+    /**
+     * Serialize the page: type, next root, update id, the common prefix of
+     * the keys, the number of keys, then the keys (without prefix) and values.
+     *
+     * @return the serialized page
+     */
+    public byte[] toBytes() {
+        // synchronization is needed because we share the buffer
+        synchronized (PageFile.class) {
+            int maxSize = maxSerializedSize();
+            ByteBuffer buff = REUSED_BUFFER;
+            if (buff.capacity() < maxSize) {
+                buff = REUSED_BUFFER = ByteBuffer.allocate(maxSize);
+            }
+            // reset position and limit
+            buff.clear();
+            // first byte may not be '4', as that is used for LZ4 compression
+            buff.put((byte) (innerNode ? 0 : 1));
+            writeString(buff, nextRoot == null ? "" : nextRoot);
+            buff.putLong(update);
+            // the keys are sorted, so the common prefix of the first and the
+            // last key is a prefix of all keys
+            String prefix = keys.size() < 2 ? "" : commonPrefix(keys.get(0), keys.get(keys.size() - 1));
+            writeString(buff, prefix);
+            buff.putInt(keys.size());
+            for (int i = 0; i < keys.size(); i++) {
+                writeString(buff, keys.get(i).substring(prefix.length()));
+                writeString(buff, values.get(i));
+            }
+            if (innerNode) {
+                // the one value that has no key
+                writeString(buff, values.get(values.size() - 1));
+            }
+            buff.flip();
+            byte[] array = new byte[buff.remaining()];
+            buff.get(array);
+            return array;
+        }
+    }
+
+    /**
+     * Get an upper bound of the serialized size. The size estimate of the
+     * page can not be used for this, as it counts characters and not bytes.
+     */
+    private int maxSerializedSize() {
+        // type, update id, number of keys
+        long size = 1 + 8 + 4;
+        size += maxStringSize(nextRoot == null ? "" : nextRoot);
+        // the prefix is never longer than the first key
+        size += maxStringSize(keys.isEmpty() ? "" : keys.get(0));
+        for (String k : keys) {
+            size += maxStringSize(k);
+        }
+        for (String v : values) {
+            size += maxStringSize(v);
+        }
+        return Math.toIntExact(size);
+    }
+
+    private static long maxStringSize(String s) {
+        // 2 bytes marker, possibly 4 bytes length,
+        // and at most 3 bytes UTF-8 for each char
+        return s == null ? 2 : 6 + 3L * s.length();
+    }
+
+    private static void writeString(ByteBuffer buff, String s) {
+        if (s == null) {
+            buff.putShort((short) -2);
+            return;
+        }
+        byte[] data = s.getBytes(StandardCharsets.UTF_8);
+        if (data.length < Short.MAX_VALUE) {
+            // could get a bit larger, but some negative values are reserved
+            buff.putShort((short) data.length);
+        } else {
+            buff.putShort((short) -1);
+            buff.putInt(data.length);
+        }
+        buff.put(data);
+    }
+
+    private static String readString(ByteBuffer buff) {
+        int len = buff.getShort();
+        if (len == -2) {
+            return null;
+        }
+        if (len == -1) {
+            // long string: the real length follows
+            len = buff.getInt();
+        }
+        int pos = buff.position();
+        buff.position(pos + len);
+        return new String(buff.array(), buff.arrayOffset() + pos, len, StandardCharsets.UTF_8);
+    }
+
+    private static String commonPrefix(String prefix, String x) {
+        if (prefix == null) {
+            return x;
+        }
+        int i = 0;
+        for (; i < prefix.length() && i < x.length(); i++) {
+            if (prefix.charAt(i) != x.charAt(i)) {
+                break;
+            }
+        }
+        // do not cut a surrogate pair in half: prefix and remainder are
+        // encoded separately, and a lone surrogate can not be encoded
+        if (i > 0 && Character.isHighSurrogate(prefix.charAt(i - 1))) {
+            i--;
+        }
+        return prefix.substring(0, i);
+    }
+
+    @Override
+    public String toString() {
+        return keys + "" + values;
+    }
+
+    /**
+     * Copy the keys, values, next root, size, and the modified flag.
+     * The update id is not copied.
+     *
+     * @return the copy
+     */
+    public PageFile copy() {
+        PageFile result = new PageFile(innerNode);
+        result.modified = modified;
+        result.keys.addAll(keys);
+        result.values.addAll(values);
+        result.sizeInBytes = sizeInBytes;
+        result.nextRoot = nextRoot;
+        return result;
+    }
+
+    /**
+     * Add a child to an inner node.
+     *
+     * @param index the index of the new value; the key is added
+     *            at index - 1, except for index 0, where no key is added
+     * @param childKey the key (ignored if index is 0)
+     * @param newChildFileName the value
+     */
+    public void addChild(int index, String childKey, String newChildFileName) {
+        modified = true;
+        if (index > 0) {
+            keys.add(index - 1, childKey);
+            sizeInBytes += 4 + childKey.length();
+        }
+        values.add(index, newChildFileName);
+        sizeInBytes += sizeInBytes(newChildFileName);
+    }
+
+    public void setValue(int index, String value) {
+        modified = true;
+        sizeInBytes -= sizeInBytes(values.get(index));
+        sizeInBytes += sizeInBytes(value);
+        values.set(index, value);
+    }
+
+    private static int sizeInBytes(String obj) {
+        return obj == null ? 5 : obj.length() + 2;
+    }
+
+    public void removeRecord(int index) {
+        modified = true;
+        String key = keys.remove(index);
+        String value = values.remove(index);
+        sizeInBytes -= 4 + key.length();
+        sizeInBytes -= sizeInBytes(value);
+    }
+
+    public void appendRecord(String k, String v) {
+        modified = true;
+        keys.add(k);
+        values.add(v);
+        sizeInBytes += 4 + k.length();
+        sizeInBytes += sizeInBytes(v);
+    }
+
+    public void insertRecord(int index, String key, String value) {
+        modified = true;
+        keys.add(index, key);
+        values.add(index, value);
+        sizeInBytes += 4 + key.length();
+        sizeInBytes += sizeInBytes(value);
+    }
+
+    public long getUpdate() {
+        return update;
+    }
+
+    /**
+     * Get the estimated size of the page.
+     *
+     * @return the estimated size
+     */
+    public int sizeInBytes() {
+        if (VERIFY_SIZE) {
+            int size = INITIAL_SIZE_IN_BYTES;
+            for (String p : keys) {
+                size += p.length();
+                size += 4;
+            }
+            for (String o : values) {
+                size += sizeInBytes(o);
+            }
+            if (size != sizeInBytes) {
+                throw new AssertionError();
+            }
+        }
+        return sizeInBytes;
+    }
+
+    public boolean canSplit() {
+        if (innerNode) {
+            return keys.size() > 2;
+        } else {
+            return keys.size() > 1;
+        }
+    }
+
+    public List<String> getKeys() {
+        return keys;
+    }
+
+    /**
+     * Search for a key.
+     *
+     * @param key the key
+     * @return the index of the key if found, otherwise -(insertion point) - 1
+     *         as for Collections.binarySearch; -1 if the page has no keys
+     */
+    public int getKeyIndex(String key) {
+        if (keys.isEmpty()) {
+            return -1;
+        }
+        int index;
+        // if the last search ended past the last key (or before the first
+        // key), then first check if this is the case again
+        if (lastSearchIndex == 1 && key.compareTo(keys.get(keys.size() - 1)) > 0) {
+            index = -(keys.size() + 1);
+        } else if (lastSearchIndex == -1 && key.compareTo(keys.get(0)) < 0) {
+            index = -1;
+        } else {
+            index = Collections.binarySearch(keys, key);
+        }
+        if (index == -(keys.size() + 1)) {
+            lastSearchIndex = 1;
+        } else if (index == -1) {
+            lastSearchIndex = -1;
+        } else {
+            lastSearchIndex = 0;
+        }
+        return index;
+    }
+
+    public String getValue(int index) {
+        return values.get(index);
+    }
+
+    public String getChildValue(int index) {
+        return values.get(index);
+    }
+
+    /**
+     * Get the smallest key of this page that is larger than the given key.
+     *
+     * @param largerThan the key, or null to get the first key
+     * @return the next key, or null if there is none
+     */
+    public String getNextKey(String largerThan) {
+        int index;
+        if (largerThan == null) {
+            index = 0;
+        } else {
+            index = getKeyIndex(largerThan);
+            if (index < 0) {
+                index = -index - 1;
+            } else {
+                index++;
+            }
+        }
+        if (index < 0 || index >= keys.size()) {
+            return null;
+        }
+        return keys.get(index);
+    }
+
+    public String getNextRoot() {
+        return nextRoot;
+    }
+
+    public void setNextRoot(String nextRoot) {
+        modified = true;
+        this.nextRoot = nextRoot;
+    }
+
+    public void removeKey(int index) {
+        modified = true;
+        String key = keys.remove(index);
+        sizeInBytes -= 4 + key.length();
+    }
+
+    public void removeValue(int index) {
+        modified = true;
+        String x = values.remove(index);
+        sizeInBytes -= sizeInBytes(x);
+    }
+
+    public boolean isInnerNode() {
+        return innerNode;
+    }
+
+    public int getValueCount() {
+        return values.size();
+    }
+
+    public String getKey(int index) {
+        return keys.get(index);
+    }
+
+    public void setModified(boolean modified) {
+        this.modified = modified;
+    }
+
+    public boolean getModified() {
+        return modified;
+    }
+
+}
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/Session.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/Session.java
new file mode 100644
index 0000000000..bf68a6d773
--- /dev/null
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/Session.java
@@ -0,0 +1,829 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+import java.util.Properties;
+
+import org.apache.jackrabbit.oak.index.indexer.document.tree.store.utils.Cache;
+import 
org.apache.jackrabbit.oak.index.indexer.document.tree.store.utils.Position;
+import 
org.apache.jackrabbit.oak.index.indexer.document.tree.store.utils.SortedStream;
+
+/**
+ * Read and write keys and values.
+ */
+public class Session {
+
+    // the number of entries the cache is created with;
+    // changeCacheSize() replaces it with a value calculated from
+    // cacheSizeMB and maxFileSize
+    private static final int DEFAULT_CACHE_SIZE = 128;
+    private static final int DEFAULT_MAX_FILE_SIZE = 16 * 1024;
+    private static final int DEFAULT_CACHE_SIZE_MB = 16;
+    private static final int DEFAULT_MAX_ROOTS = 10;
+
+    static final String ROOT_NAME = "root";
+    static final String INNER_NODE_PREFIX = "node_";
+    static final String LEAF_PREFIX = "data_";
+    // a new String instance on purpose: get and put compare against this
+    // marker by identity (==), so a regular value "DELETED" is not affected
+    static final String DELETED = new String("DELETED");
+
+    static final boolean MULTI_ROOT = true;
+
+    private final Store store;
+    // when a modified page is evicted from the cache, it is written to the
+    // store first
+    private final Cache<String, PageFile> cache = new 
Cache<>(DEFAULT_CACHE_SIZE)  {
+        private static final long serialVersionUID = 1L;
+
+        public boolean removeEldestEntry(Map.Entry<String, PageFile> eldest) {
+            boolean result = super.removeEldestEntry(eldest);
+            if (result) {
+                String key = eldest.getKey();
+                PageFile value = eldest.getValue();
+                if(value.getModified()) {
+                    store.put(key, value);
+                    // not strictly needed as it's no longer referenced
+                    value.setModified(false);
+                }
+            }
+            return result;
+        }
+    };
+    // pages with a smaller update id are copied before they are changed (see put)
+    private long updateId;
+    private int maxFileSize;
+    private int cacheSizeMB;
+    private int maxRoots = DEFAULT_MAX_ROOTS;
+    // the number of getFile calls
+    private long fileReadCount;
+
+    /**
+     * Create a session that uses a new in-memory store with an empty
+     * configuration.
+     */
+    public Session() {
+        this(new MemoryStore());
+    }
+
+    /**
+     * Create a session for the given store. The settings "maxFileSize" and
+     * "cacheSizeMB" are read from the configuration of the store; the
+     * defaults are used for settings that are not set.
+     *
+     * @param store the store
+     */
+    public Session(Store store) {
+        this.store = store;
+        Properties settings = store.getConfig();
+        String fileSizeSetting = settings.getProperty("maxFileSize", "" + DEFAULT_MAX_FILE_SIZE);
+        String cacheSizeSetting = settings.getProperty("cacheSizeMB", "" + DEFAULT_CACHE_SIZE_MB);
+        this.maxFileSize = Integer.parseInt(fileSizeSetting);
+        this.cacheSizeMB = Integer.parseInt(cacheSizeSetting);
+        changeCacheSize();
+    }
+
+    /**
+     * Set the maximum number of roots. If the chain of roots is longer
+     * than this, mergeRootsIfNeeded merges the roots.
+     *
+     * @param maxRoots the new value
+     */
+    public void setMaxRoots(int maxRoots) {
+        this.maxRoots = maxRoots;
+    }
+
+    /**
+     * Get the maximum number of roots.
+     *
+     * @return the value
+     */
+    public int getMaxRoots() {
+        return maxRoots;
+    }
+
+    /**
+     * Set the cache size in MB. The number of cache entries is adjusted
+     * immediately, using the current maximum file size.
+     *
+     * @param mb the value
+     */
+    public void setCacheSizeMB(int mb) {
+        this.cacheSizeMB = mb;
+        // without this, the new value would only take effect
+        // once the maximum file size is changed
+        changeCacheSize();
+    }
+
+    /**
+     * Set the maximum file size. Files might be slightly larger than that,
+     * but not a lot. The number of cache entries is recalculated, as it
+     * depends on the file size.
+     *
+     * @param sizeBytes the file size in bytes
+     */
+    public void setMaxFileSize(int sizeBytes) {
+        this.maxFileSize = sizeBytes;
+        changeCacheSize();
+    }
+
+    /**
+     * Set the number of cache entries to the cache size in bytes, divided by
+     * the maximum file size.
+     */
+    private void changeCacheSize() {
+        long cacheSizeBytes = cacheSizeMB * 1024L * 1024;
+        long entryCount = cacheSizeBytes / maxFileSize;
+        cache.setSize((int) entryCount);
+    }
+
+    /**
+     * Get the number of files read from the cache or storage. Each page
+     * lookup is counted, whether or not the page was in the cache.
+     *
+     * @return the result
+     */
+    public long getFileReadCount() {
+        return fileReadCount;
+    }
+
+    /**
+     * Count the roots, following the chain of next roots starting at the
+     * current root, and merge them if there are more than the maximum.
+     */
+    private void mergeRootsIfNeeded() {
+        int rootCount = 0;
+        for (String name = ROOT_NAME; name != null; name = getFile(name).getNextRoot()) {
+            rootCount++;
+        }
+        if (rootCount <= maxRoots) {
+            return;
+        }
+        mergeRoots();
+    }
+
+    /**
+     * Initialize the storage: an empty leaf page becomes the root.
+     */
+    public void init() {
+        putFile(ROOT_NAME, newPageFile(false));
+    }
+
+    /**
+     * Copy a page. The copy gets the current update id.
+     *
+     * @param old the page to copy
+     * @return the copy
+     */
+    private PageFile copyPageFile(PageFile old) {
+        PageFile copy = old.copy();
+        copy.setUpdate(updateId);
+        return copy;
+    }
+
+    /**
+     * Create an empty page with the current update id.
+     *
+     * @param isInternalNode true for an inner node, false for a leaf
+     * @return the new page
+     */
+    private PageFile newPageFile(boolean isInternalNode) {
+        PageFile page = new PageFile(isInternalNode);
+        page.setUpdate(updateId);
+        return page;
+    }
+
+    /**
+     * Get an entry. The roots are searched one after the other, starting
+     * with the current root; the first root that has an entry for the key
+     * (a value, or a deletion marker) decides.
+     *
+     * @param key the key
+     * @return the value, or null
+     */
+    public String get(String key) {
+        if (key == null) {
+            throw new NullPointerException();
+        }
+        String rootName = ROOT_NAME;
+        while (rootName != null) {
+            PageFile root = getFile(rootName);
+            String olderRoot = root.getNextRoot();
+            String found = get(root, key);
+            // compared by identity, as DELETED is a marker object
+            if (found == DELETED) {
+                return null;
+            }
+            if (found != null) {
+                return found;
+            }
+            rootName = olderRoot;
+        }
+        return null;
+    }
+
+    /**
+     * Get the entry if it exists, within the tree of one root.
+     *
+     * @param root the root file
+     * @param k the key
+     * @return null if not found, DELETED if removed, or the value
+     */
+    private String get(PageFile root, String k) {
+        PageFile page = root;
+        // descend until a leaf is reached
+        while (page.isInnerNode()) {
+            int pos = page.getKeyIndex(k);
+            // key found: the child to the right of the key;
+            // otherwise the child at the insertion point
+            int child = pos < 0 ? -pos - 1 : pos + 1;
+            page = getFile(page.getChildValue(child));
+        }
+        int pos = page.getKeyIndex(k);
+        if (pos < 0) {
+            return null;
+        }
+        String value = page.getValue(pos);
+        // a null value in a leaf means the entry was removed
+        return value == null ? DELETED : value;
+    }
+
+    /**
+     * Put a value.
+     * To remove an entry, the value needs to be null.
+     * If the current root has a next root, a removal is stored as a deletion
+     * marker, instead of removing the entry from the current root.
+     *
+     * @param key the key
+     * @param value the value
+     */
+    public void put(String key, String value) {
+        if (key == null) {
+            throw new NullPointerException();
+        }
+        if (value == null && getFile(ROOT_NAME).getNextRoot() != null) {
+            value = DELETED;
+        }
+        put(ROOT_NAME, key, value);
+    }
+
+    /**
+     * Put a value into the tree of the given root: descend to the leaf that
+     * is responsible for the key, change the leaf, and then split or merge
+     * pages as needed.
+     *
+     * @param rootFileName the file name of the root
+     * @param key the key
+     * @param value the value (DELETED to store a deletion marker,
+     *            or null to remove the entry)
+     */
+    private void put(String rootFileName, String key, String value) {
+        String fileName = rootFileName;
+        PageFile file = getFile(fileName);
+        if (file.getUpdate() < updateId) {
+            // the page is from an older update: change a copy instead
+            // NOTE(review): the new file name is not returned, and not
+            // stored anywhere in the code visible here - confirm that the
+            // copy can be found again
+            fileName = store.newFileName();
+            file = copyPageFile(file);
+            putFile(fileName, file);
+        }
+        // the file names of the inner nodes visited, root first
+        ArrayList<String> parents = new ArrayList<>();
+        String k = key;
+        while (true) {
+            int index = file.getKeyIndex(k);
+            if (!file.isInnerNode()) {
+                if (index >= 0) {
+                    if (value == null) {
+                        file.removeRecord(index);
+                    } else {
+                        // a deletion marker is stored as a null value
+                        file.setValue(index, value == DELETED ? null : value);
+                    }
+                } else {
+                    // not found
+                    if (value == null) {
+                        // nothing to do
+                        return;
+                    }
+                    // -index - 1 is the insertion point
+                    file.insertRecord(-index - 1, k, value == DELETED ? null : 
value);
+                }
+                break;
+            }
+            parents.add(fileName);
+            // key found: the child to the right of the key;
+            // otherwise the child at the insertion point
+            if (index < 0) {
+                index = -index - 2;
+            }
+            index++;
+            fileName = file.getChildValue(index);
+            file = getFile(fileName);
+            // continue with the new file
+        }
+        putFile(fileName, file);
+        splitOrMerge(fileName, file, parents);
+    }
+
+    /**
+     * Split the page if it is larger than the maximum file size and can be
+     * split; otherwise, if it has no keys, try to remove it from its parent.
+     *
+     * @param fileName the file name of the page
+     * @param file the page
+     * @param parents the file names of the ancestors, root first
+     */
+    private void splitOrMerge(String fileName, PageFile file, ArrayList<String> parents) {
+        boolean tooLarge = file.sizeInBytes() > maxFileSize;
+        if (tooLarge && file.canSplit()) {
+            split(fileName, file, parents);
+            return;
+        }
+        if (file.getKeys().isEmpty()) {
+            merge(fileName, file, parents);
+        }
+    }
+
+    /**
+     * Remove an empty page from its parent. If the parent becomes empty as
+     * well, it is replaced by an empty leaf, and the process continues with
+     * the parent of the parent.
+     *
+     * @param fileName the file name of the page
+     * @param file the page; nothing is done unless it has no values
+     * @param parents the file names of the ancestors, root first; the last
+     *            entry is removed
+     */
+    private void merge(String fileName, PageFile file, ArrayList<String> 
parents) {
+        if (file.getValueCount() > 0) {
+            return;
+        }
+        if (parents.isEmpty()) {
+            // root: ignore
+            return;
+        }
+        String parentFileName = parents.remove(parents.size() - 1);
+        PageFile parentFile = getFile(parentFileName);
+        // search for the entry of the parent that references this page
+        for (int i = 0; i < parentFile.getValueCount(); i++) {
+            String pf = parentFile.getChildValue(i);
+            if (pf.equals(fileName)) {
+                if (parentFile.getValueCount() == 1) {
+                    // the only child: the parent becomes an empty leaf
+                    parentFile = newPageFile(false);
+                    if (!parentFileName.startsWith(ROOT_NAME)) {
+                        // the file name changes from an inner node name to
+                        // a leaf name, so the reference to it needs to be
+                        // updated as well
+                        // NOTE(review): assumes parents is not empty here,
+                        // and that the name starts with INNER_NODE_PREFIX
+                        String newParentFileName = LEAF_PREFIX + 
parentFileName.substring(INNER_NODE_PREFIX.length());
+                        putFile(newParentFileName, parentFile);
+                        updateChildFileName(parents.get(parents.size() - 1), 
parentFileName, newParentFileName);
+                        parentFileName = newParentFileName;
+                    }
+                } else if (i == parentFile.getValueCount() - 1) {
+                    // remove the last entry
+                    parentFile.removeKey(i - 1);
+                    parentFile.removeValue(i);
+                } else {
+                    parentFile.removeKey(i);
+                    parentFile.removeValue(i);
+                }
+                putFile(parentFileName, parentFile);
+                // returns immediately unless the parent has no values
+                merge(parentFileName, parentFile, parents);
+                break;
+            }
+        }
+    }
+
+    /**
+     * Replace the first reference to a child in the given page.
+     * Nothing is changed if the page does not reference the child.
+     *
+     * @param fileName the file name of the page that references the child
+     * @param oldChild the current file name of the child
+     * @param newChild the new file name of the child
+     */
+    private void updateChildFileName(String fileName, String oldChild, String newChild) {
+        PageFile parent = getFile(fileName);
+        for (int pos = 0; pos < parent.getValueCount(); pos++) {
+            if (!parent.getChildValue(pos).equals(oldChild)) {
+                continue;
+            }
+            parent.setValue(pos, newChild);
+            putFile(fileName, parent);
+            return;
+        }
+    }
+
+    /**
+     * Split a page into two pages, each with about half of the entries, and
+     * add the key that separates them (the sentinel) to the parent. When
+     * splitting a root, the file name of the root is used for a new inner
+     * node that becomes the parent. The parent is then split as well if
+     * needed.
+     *
+     * @param fileName the file name of the page
+     * @param file the page
+     * @param parents the file names of the ancestors, root first; the last
+     *            entry is removed
+     */
+    private void split(String fileName, PageFile file, ArrayList<String> 
parents) {
+        List<String> keys = new ArrayList<>(file.getKeys());
+        String parentFileName, newFileName1, newFileName2;
+        PageFile parentFile, newFile1, newFile2;
+        boolean isInternalNode = file.isInnerNode();
+        if (parents.isEmpty()) {
+            // new root
+            parentFileName = fileName;
+            parentFile = newPageFile(true);
+            parentFile.setNextRoot(file.getNextRoot());
+            newFileName1 = (isInternalNode ? INNER_NODE_PREFIX : LEAF_PREFIX) +
+                    store.newFileName();
+            parentFile.addChild(0, null, newFileName1);
+        } else {
+            // the first half keeps the file name of the page
+            parentFileName = parents.remove(parents.size() - 1);
+            parentFile = getFile(parentFileName);
+            newFileName1 = fileName;
+        }
+        newFile1 = newPageFile(isInternalNode);
+        newFileName2 = (isInternalNode ? INNER_NODE_PREFIX : LEAF_PREFIX) +
+                store.newFileName();
+        newFile2 = newPageFile(isInternalNode);
+        int sentinelIndex = keys.size() / 2;
+        String sentinel = keys.get(sentinelIndex);
+        // shorten the sentinel if possible
+        String beforeSentinal = keys.get(sentinelIndex - 1);
+        while (sentinel.length() > 0 && !isInternalNode) {
+            // for internal nodes, there might be other keys on the left side
+            // that might be shorter than the entry before the sentinel
+            String oneShorter = sentinel.substring(0, sentinel.length() - 1);
+            // the sentinel needs to stay larger than the key before it
+            if (beforeSentinal.compareTo(oneShorter) >= 0) {
+                break;
+            }
+            sentinel = oneShorter;
+        }
+        if (!isInternalNode) {
+            // leaf
+            for (int i = 0; i < keys.size() / 2; i++) {
+                String k = keys.get(i);
+                String v = file.getValue(i);
+                newFile1.appendRecord(k, v);
+            }
+            for (int i = keys.size() / 2; i < keys.size(); i++) {
+                String k = keys.get(i);
+                String v = file.getValue(i);
+                newFile2.appendRecord(k, v);
+            }
+        } else {
+            // inner node
+            // the key in the middle (the sentinel) is not copied to either
+            // half: it is only added to the parent
+            newFile1.addChild(0, null, file.getChildValue(0));
+            for (int i = 1; i <= keys.size() / 2; i++) {
+                String p = keys.get(i - 1);
+                newFile1.appendRecord(p, file.getChildValue(i));
+            }
+            newFile2.addChild(0, null, file.getChildValue(keys.size() / 2 + 
1));
+            for (int i = keys.size() / 2 + 2; i <= keys.size(); i++) {
+                String p = keys.get(i - 1);
+                newFile2.appendRecord(p, file.getChildValue(i));
+            }
+        }
+        // insert sentinel into parent
+        // NOTE(review): assumes the sentinel is not yet a key of the parent,
+        // so that index is negative and -index is the index of the new value
+        int index = parentFile.getKeyIndex(sentinel);
+        parentFile.addChild(-index, sentinel, newFileName2);
+        putFile(newFileName1, newFile1);
+        putFile(newFileName2, newFile2);
+        putFile(parentFileName, parentFile);
+        splitOrMerge(parentFileName, parentFile, parents);
+    }
+
+    /**
+     * Put a page into the cache under the given name.
+     * This method itself does not write the page to the store.
+     *
+     * @param fileName the name of the page
+     * @param file the page
+     */
+    private void putFile(String fileName, PageFile file) {
+        cache.put(fileName, file);
+    }
+
+    /**
+     * Get a page, from the cache if possible, otherwise from the store.
+     * A page that was read from the store is added to the cache.
+     * Each call increments the file read counter, also for cache hits.
+     *
+     * @param key the name of the page
+     * @return the page
+     */
+    private PageFile getFile(String key) {
+        fileReadCount++;
+        PageFile cached = cache.get(key);
+        if (cached != null) {
+            return cached;
+        }
+        // cache miss: read from the store, and remember the page
+        PageFile loaded = store.get(key);
+        cache.put(key, loaded);
+        return loaded;
+    }
+
+    /**
+     * Merge all roots: all entries that are visible via the iterator (over
+     * all roots) are written into a new, single root. At the end, the new
+     * root no longer links to a next root, and all changes are flushed.
+     */
+    public void mergeRoots() {
+        // keep a copy of the current root under a separate name
+        PageFile root = getFile(ROOT_NAME);
+        String rootFileCopy = ROOT_NAME + "_" + updateId;
+        root = copyPageFile(root);
+        putFile(rootFileCopy, root);
+        // the iterator needs to be created now,
+        // while ROOT_NAME still refers to the old root
+        Iterator<Entry<String, String>> it = iterator();
+        // replace the root with a new, empty leaf
+        // that links to the copy of the old root
+        PageFile newRoot = newPageFile(false);
+        newRoot.setNextRoot(rootFileCopy);
+        putFile(ROOT_NAME, newRoot);
+        // re-insert all entries into the new root
+        while(it.hasNext()) {
+            Entry<String, String> e = it.next();
+            put(e.getKey(), e.getValue());
+        }
+        // read the root again (presumably because put() may have replaced
+        // the root page, for example when splitting), and unlink the old roots
+        newRoot = getFile(ROOT_NAME);
+        newRoot.setNextRoot(null);
+        flush();
+    }
+
+    /**
+     * Make the current tree read-only and switch to a new root.
+     * If there are already too many roots, then they will be merged.
+     * All changes are flushed to storage.
+     * <p>
+     * The current root is additionally registered under a name that contains
+     * the current update id, and the update id is incremented.
+     */
+    public void checkpoint() {
+        mergeRootsIfNeeded();
+        PageFile current = getFile(ROOT_NAME);
+        String snapshotName = ROOT_NAME + "_" + updateId;
+        putFile(snapshotName, current);
+        updateId++;
+        if (MULTI_ROOT) {
+            // start a new, empty leaf root that links to the snapshot
+            current = newPageFile(false);
+            current.setNextRoot(snapshotName);
+            putFile(ROOT_NAME, current);
+        }
+        // need to flush here (in the multi-root case,
+        // so that GC does not collect the snapshot)
+        flush();
+        // continue with a copy of the root page
+        putFile(ROOT_NAME, copyPageFile(current));
+    }
+
+    /**
+     * Flush all changes to storage: all modified pages in the cache are
+     * written to the store. A modified root page is written last, after all
+     * other modified pages.
+     */
+    public void flush() {
+        String modifiedRootName = null;
+        PageFile modifiedRoot = null;
+        for (Entry<String, PageFile> entry : cache.entrySet()) {
+            String name = entry.getKey();
+            PageFile page = entry.getValue();
+            if (page.getModified()) {
+                if (name.equals(ROOT_NAME)) {
+                    // don't store the changed root yet
+                    modifiedRootName = name;
+                    modifiedRoot = page;
+                } else {
+                    store.put(name, page);
+                    // here we have to reset the flag
+                    page.setModified(false);
+                }
+            }
+        }
+        if (modifiedRoot == null) {
+            return;
+        }
+        // now store the changed root
+        // (the modified flag of the root is left unchanged)
+        cache.put(modifiedRootName, modifiedRoot);
+        store.put(modifiedRootName, modifiedRoot);
+    }
+
+    // ===============================================================
+    // iteration over entries
+    // this is fast: internally, a stack of Position objects is kept
+
+    /**
+     * Get all entries, starting at the beginning. Do not add or move entries
+     * while iterating. This is the same as calling {@code iterator(null)}.
+     *
+     * @return the result
+     */
+    public Iterator<Entry<String, String>> iterator() {
+        return iterator(null);
+    }
+
+    /**
+     * Get all entries. Do not add or move entries while iterating.
+     * <p>
+     * Each root (the current root, and all roots reachable via "next root")
+     * contributes one sorted stream. The streams are merged using a priority
+     * queue. If the same key is returned by multiple streams, only the entry
+     * that is polled first is returned. Entries with the DELETED marker as
+     * the value are not returned.
+     *
+     * @param largerThan all returned keys are larger than this; null to start at
+     *                   the beginning
+     * @return the result
+     */
+    public Iterator<Entry<String, String>> iterator(String largerThan) {
+        ArrayList<SortedStream> streams = new ArrayList<>();
+        String next = ROOT_NAME;
+        // one stream per root
+        while (true) {
+            streams.add(new SortedStream(next, immutableRootIterator(next, largerThan)));
+            next = getFile(next).getNextRoot();
+            if (next == null) {
+                break;
+            }
+        }
+        PriorityQueue<SortedStream> pq = new PriorityQueue<>(streams);
+        return new Iterator<Entry<String, String>>() {
+
+            // the entry that is returned by the next call to next(),
+            // or null if there are no more entries
+            Entry<String, String> current;
+            // the key of the entry that was fetched last
+            String lastKey;
+
+            {
+                fetchNext();
+            }
+
+            private void fetchNext() {
+                while (pq.size() > 0) {
+                    SortedStream s = pq.poll();
+                    if (s.currentKey == null) {
+                        // if this is null, it must be the last stream
+                        break;
+                    }
+                    String key = s.currentKey;
+                    String value = s.currentValue;
+                    // always advance the stream and add it back to the queue;
+                    // otherwise the remaining entries of this stream are lost
+                    // (this also applies to duplicate keys)
+                    s.next();
+                    pq.add(s);
+                    if (key.equals(lastKey)) {
+                        // the same key was already returned
+                        continue;
+                    }
+                    // the marker is compared by identity
+                    // NOTE(review): lastKey is not updated for deleted
+                    // entries, so an entry with the same key in a different
+                    // root is not skipped - confirm this is intended
+                    if (value == DELETED) {
+                        continue;
+                    }
+                    lastKey = key;
+                    current = new Entry<>() {
+
+                        @Override
+                        public String getKey() {
+                            return key;
+                        }
+
+                        @Override
+                        public String getValue() {
+                            return value;
+                        }
+
+                        @Override
+                        public String setValue(String value) {
+                            throw new UnsupportedOperationException();
+                        }
+
+                    };
+                    return;
+                }
+                current = null;
+            }
+
+            @Override
+            public boolean hasNext() {
+                return current != null;
+            }
+
+            @Override
+            public Entry<String, String> next() {
+                if (current == null) {
+                    throw new NoSuchElementException();
+                }
+                Entry<String, String> result = current;
+                fetchNext();
+                return result;
+            }
+
+        };
+    }
+
+    /**
+     * Iterate over the positions (page and index within the page) of all
+     * entries of the tree with the given root, in the order in which they
+     * are stored. Only this one root is read; the "next root" link is not
+     * followed.
+     *
+     * @param rootFileName the name of the root page
+     * @param largerThan the key after which to start, or null to start at the
+     *            beginning
+     * @return the iterator
+     */
+    private Iterator<Position> immutableRootIterator(String rootFileName, 
String largerThan) {
+
+        return new Iterator<Position>() {
+            // the positions in the inner nodes
+            // on the path from the root to the current page
+            private final ArrayList<Position> stack = new ArrayList<>();
+            // the position that is returned next, or null if there is none
+            private Position current;
+
+            {
+                // start at the root, and descend to the leaf
+                current = new Position();
+                current.file = getFile(rootFileName);
+                current.valuePos = index(current.file, largerThan);
+                down(largerThan);
+                // the start position might be after the last entry of
+                // this leaf: move to the next leaf (if there is one)
+                if (current.valuePos >= current.file.getValueCount()) {
+                    next();
+                }
+            }
+
+            /**
+             * Get the index at which to start within the page: for an inner
+             * node, the index of the child to descend into; for a leaf, the
+             * index of the first entry after the given key. This presumably
+             * relies on getKeyIndex returning (-insertionPoint - 1) if the
+             * key is not found - verify against PageFile.
+             */
+            private int index(PageFile file, String largerThan) {
+                if (largerThan == null) {
+                    return 0;
+                }
+                int index = file.getKeyIndex(largerThan);
+                if (file.isInnerNode()) {
+                    if (index < 0) {
+                        index = -index - 2;
+                    }
+                    index++;
+                } else {
+                    if (index < 0) {
+                        index = -index - 1;
+                    } else {
+                        // exact match: start after this entry
+                        index++;
+                    }
+                }
+                return index;
+            }
+
+            @Override
+            public String toString() {
+                return stack + " " + current;
+            }
+
+            @Override
+            public boolean hasNext() {
+                return current != null;
+            }
+
+            @Override
+            public Position next() {
+                if (current == null) {
+                    throw new NoSuchElementException();
+                }
+                Position result = current;
+                // try the next entry in the same page first
+                current = new Position();
+                current.file = result.file;
+                current.valuePos = result.valuePos + 1;
+                while (true) {
+                    if (!current.file.isInnerNode() && current.valuePos < 
result.file.getValueCount()) {
+                        break;
+                    }
+                    if (stack.size() == 0) {
+                        // no more entries
+                        current = null;
+                        break;
+                    }
+                    // go up one level, and move to the next child
+                    current = stack.remove(stack.size() - 1);
+                    current.valuePos++;
+                    if (current.valuePos < current.file.getValueCount()) {
+                        // descend to the first entry of that child
+                        down(null);
+                        break;
+                    }
+                }
+                return result;
+            }
+
+            /**
+             * Descend from the current position to a leaf, and push the
+             * positions in the inner nodes onto the stack.
+             */
+            private void down(String largerThan) {
+                while (current.file.isInnerNode()) {
+                    stack.add(current);
+                    Position pos = new Position();
+                    PageFile file = 
getFile(current.file.getChildValue(current.valuePos));
+                    pos.file = file;
+                    pos.valuePos = index(pos.file, largerThan);
+                    current = pos;
+                }
+            }
+        };
+    }
+
+    // ===============================================================
+    // iteration over keys, over all roots
+    // this is a bit slow: internally, *for each key*
+    // it will traverse from all roots down to the leaf
+
+    /**
+     * Return all keys in sorted order. Roots don't need to be merged.
+     * This is the same as calling {@code keys(null)}.
+     *
+     * @return all keys
+     */
+    public Iterable<String> keys() {
+        return keys(null);
+    }
+
+    /**
+     * Return all keys in sorted order.
+     * <p>
+     * The first key is looked up when this method is called; each further
+     * key is looked up when the previous key is returned by the iterator.
+     *
+     * @param largerThan all returned keys are larger than this; null to start at
+     *                   the beginning
+     * @return the keys
+     */
+    public Iterable<String> keys(String largerThan) {
+        final String first = getNextKey(largerThan);
+        return () -> new Iterator<String>() {
+
+            // the key to return next, or null if there are no more keys
+            private String upcoming = first;
+
+            @Override
+            public boolean hasNext() {
+                return upcoming != null;
+            }
+
+            @Override
+            public String next() {
+                String result = upcoming;
+                if (result == null) {
+                    throw new NoSuchElementException();
+                }
+                upcoming = getNextKey(result);
+                return result;
+            }
+
+        };
+    }
+
+    /**
+     * Get the next key. If multiple roots are enabled, all roots are
+     * searched, and the smallest key found is returned.
+     *
+     * @param largerThan the key after which to search, or null to start at
+     *            the beginning
+     * @return the next key, or null if none was found
+     */
+    private String getNextKey(String largerThan) {
+        if (!MULTI_ROOT) {
+            return getNextKey(largerThan, ROOT_NAME);
+        }
+        String smallest = null;
+        String rootName = ROOT_NAME;
+        do {
+            String candidate = getNextKey(largerThan, rootName);
+            if (candidate != null
+                    && (smallest == null || candidate.compareTo(smallest) < 0)) {
+                smallest = candidate;
+            }
+            // continue with the next (older) root, if there is one
+            rootName = getFile(rootName).getNextRoot();
+        } while (rootName != null);
+        return smallest;
+    }
+
+    /**
+     * Get the next key within the subtree of the given page.
+     *
+     * @param largerThan the key after which to search, or null to start at
+     *            the beginning
+     * @param fileName the name of the page where the search starts
+     * @return the next key, or null if none was found in this subtree
+     */
+    private String getNextKey(String largerThan, String fileName) {
+        PageFile page = getFile(fileName);
+        if (!page.isInnerNode()) {
+            // leaf: the page itself knows the answer (possibly null)
+            return page.getNextKey(largerThan);
+        }
+        // inner node: find the first child that may contain the key
+        int start = largerThan == null ? -1 : page.getKeyIndex(largerThan);
+        if (start < 0) {
+            start = -start - 2;
+        }
+        start++;
+        // search this child, and then the children to the right
+        for (int child = start; child < page.getValueCount(); child++) {
+            String found = getNextKey(largerThan, page.getChildValue(child));
+            if (found != null) {
+                return found;
+            }
+        }
+        return null;
+    }
+
+    // ===============================================================
+    // partitioning
+
+    /**
+     * Get the smallest key. This is the same as searching for the next key
+     * starting at the beginning.
+     *
+     * @return the smallest key, or null if no key was found
+     */
+    public String getMinKey() {
+        return getNextKey(null);
+    }
+
+    /**
+     * Get the largest key. This follows the last child of each inner node,
+     * starting at the root, and returns the last key of the leaf reached.
+     *
+     * @return the largest key, or null if the leaf reached contains no keys
+     *         (for example if the tree is empty)
+     * @throws UnsupportedOperationException if the root links to a next root,
+     *             that is, if the roots are not fully merged
+     */
+    public String getMaxKey() {
+        if (getFile(ROOT_NAME).getNextRoot() != null) {
+            throw new UnsupportedOperationException("Not fully merged");
+        }
+        String fileName = ROOT_NAME;
+        while (true) {
+            PageFile file = getFile(fileName);
+            if (!file.isInnerNode()) {
+                int keyCount = file.getKeys().size();
+                // an empty leaf has no last key:
+                // don't try to access the key at index -1
+                return keyCount == 0 ? null : file.getKey(keyCount - 1);
+            }
+            fileName = file.getChildValue(file.getValueCount() - 1);
+        }
+    }
+
+    /**
+     * Get a key that is approximately in the middle between the two given
+     * keys. Starting at the root, the positions of both keys are looked up in
+     * each inner node. If the positions differ, the key in the middle of the
+     * two positions is returned; otherwise the search continues in the child
+     * at that position. If a leaf is reached, its first key is returned.
+     *
+     * @param low the lower bound
+     * @param high the upper bound
+     * @return the approximate median key
+     * @throws UnsupportedOperationException if the root links to a next root,
+     *             that is, if the roots are not fully merged
+     */
+    public String getApproximateMedianKey(String low, String high) {
+        if (getFile(ROOT_NAME).getNextRoot() != null) {
+            throw new UnsupportedOperationException("Not fully merged");
+        }
+        String pageName = ROOT_NAME;
+        for (;;) {
+            PageFile page = getFile(pageName);
+            if (!page.isInnerNode()) {
+                return page.getKey(0);
+            }
+            int lowPos = page.getKeyIndex(low);
+            int highPos = page.getKeyIndex(high);
+            // convert "not found" results to positions
+            lowPos = lowPos < 0 ? -lowPos - 1 : lowPos;
+            highPos = highPos < 0 ? -highPos - 1 : highPos;
+            if (lowPos != highPos) {
+                return page.getKey((lowPos + highPos) / 2);
+            }
+            // both keys belong to the same child: continue there
+            pageName = page.getChildValue(lowPos);
+        }
+    }
+
+}
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/SlowStore.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/SlowStore.java
new file mode 100644
index 0000000000..8169091d21
--- /dev/null
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/SlowStore.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store;
+
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * A store that wraps a backend store, and delays read and write operations
+ * in order to simulate a storage with a limited throughput.
+ * <p>
+ * The following settings are read from the configuration of the backend:
+ * "mbOverhead" (default 3) is added to the size of each operation, in
+ * megabytes; "mbPerSecond" (default 70) is the simulated throughput.
+ */
+public class SlowStore implements Store {
+
+    private final Properties config;
+    private final Store backend;
+    private final long mbOverhead;
+    private final long mbPerSecond;
+
+    @Override
+    public String toString() {
+        return "slow(" + backend + ")";
+    }
+
+    SlowStore(Store backend) {
+        this.config = backend.getConfig();
+        this.backend = backend;
+        this.mbOverhead = Integer.parseInt(config.getProperty("mbOverhead", "3"));
+        this.mbPerSecond = Integer.parseInt(config.getProperty("mbPerSecond", "70"));
+    }
+
+    /**
+     * Sleep, if the operation was faster than the simulated storage would be.
+     *
+     * @param nanos the time the operation took in the backend, in nanoseconds
+     * @param sizeInBytes the number of bytes read or written
+     */
+    private void delay(long nanos, int sizeInBytes) {
+        long bytes = sizeInBytes + 1_000_000 * mbOverhead;
+        // bytes / (MB per second) = microseconds; times 1000 = nanoseconds
+        long nanosRequired = 1_000 * bytes / mbPerSecond;
+        long delayNanos = nanosRequired - nanos;
+        long delayMillis = delayNanos / 1_000_000;
+        if (delayMillis > 0) {
+            try {
+                Thread.sleep(delayMillis);
+            } catch (InterruptedException e) {
+                // the delay is best effort, so it is cut short;
+                // but restore the interrupt flag, so that
+                // the caller can still detect the interruption
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    @Override
+    public PageFile getIfExists(String key) {
+        long start = System.nanoTime();
+        PageFile result = backend.getIfExists(key);
+        long time = System.nanoTime() - start;
+        delay(time, result == null ? 0 : result.sizeInBytes());
+        return result;
+    }
+
+    @Override
+    public void put(String key, PageFile value) {
+        long start = System.nanoTime();
+        backend.put(key, value);
+        long time = System.nanoTime() - start;
+        delay(time, value.sizeInBytes());
+    }
+
+    @Override
+    public String newFileName() {
+        return backend.newFileName();
+    }
+
+    @Override
+    public Set<String> keySet() {
+        return backend.keySet();
+    }
+
+    @Override
+    public void remove(Set<String> set) {
+        backend.remove(set);
+    }
+
+    @Override
+    public void removeAll() {
+        backend.removeAll();
+    }
+
+    @Override
+    public long getWriteCount() {
+        return backend.getWriteCount();
+    }
+
+    @Override
+    public long getReadCount() {
+        return backend.getReadCount();
+    }
+
+    @Override
+    public void setWriteCompression(Compression compression) {
+        backend.setWriteCompression(compression);
+    }
+
+    @Override
+    public void close() {
+        backend.close();
+    }
+
+    @Override
+    public Properties getConfig() {
+        return config;
+    }
+
+    @Override
+    public boolean supportsByteOperations() {
+        return backend.supportsByteOperations();
+    }
+
+    @Override
+    public byte[] getBytes(String key) {
+        long start = System.nanoTime();
+        byte[] result = backend.getBytes(key);
+        long time = System.nanoTime() - start;
+        // tolerate a missing result, the same as in getIfExists
+        delay(time, result == null ? 0 : result.length);
+        return result;
+    }
+
+    @Override
+    public void putBytes(String key, byte[] data) {
+        long start = System.nanoTime();
+        backend.putBytes(key, data);
+        long time = System.nanoTime() - start;
+        delay(time, data.length);
+    }
+
+}
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/StatsStore.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/StatsStore.java
new file mode 100644
index 0000000000..ce5b39dc96
--- /dev/null
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/StatsStore.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store;
+
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A store that wraps a backend store, and collects statistics (number of
+ * calls, time, and number of bytes) for the read and write operations.
+ * The statistics are printed to system out, at most about every 10 seconds.
+ */
+public class StatsStore implements Store {
+
+    private final Properties config;
+    private final Store backend;
+    // System.nanoTime() of the last time the statistics were printed
+    private long lastLog;
+    // the number of operations that are currently in progress
+    private final AtomicLong pending = new AtomicLong();
+
+    // the statistics, by operation name
+    private final ConcurrentHashMap<String, Stats> map = new ConcurrentHashMap<>();
+
+    @Override
+    public String toString() {
+        return "stats(" + backend + ")";
+    }
+
+    StatsStore(Store backend) {
+        this.config = backend.getConfig();
+        this.backend = backend;
+    }
+
+    @Override
+    public PageFile getIfExists(String key) {
+        long start = start();
+        long sizeInBytes = 0;
+        try {
+            PageFile result = backend.getIfExists(key);
+            sizeInBytes = result == null ? 0 : result.sizeInBytes();
+            return result;
+        } finally {
+            add("getIfExists", start, sizeInBytes);
+        }
+    }
+
+    /**
+     * Register the start of an operation.
+     *
+     * @return the start time (System.nanoTime())
+     */
+    private long start() {
+        pending.incrementAndGet();
+        return System.nanoTime();
+    }
+
+    /**
+     * Register the end of an operation, and print the statistics if they
+     * were not printed for more than 10 seconds.
+     *
+     * @param key the name of the operation
+     * @param start the start time, as returned by start()
+     * @param bytes the number of bytes read or written
+     */
+    private void add(String key, long start, long bytes) {
+        long now = System.nanoTime();
+        pending.decrementAndGet();
+        long nanos = now - start;
+        Stats stats = map.computeIfAbsent(key, s -> new Stats(key));
+        // NOTE(review): the fields below (and lastLog) are updated without
+        // synchronization; the numbers might be slightly off if the store
+        // is used concurrently
+        stats.count++;
+        stats.nanosMax = Math.max(stats.nanosMax, nanos);
+        stats.nanosMin = Math.min(stats.nanosMin, nanos);
+        stats.nanosTotal += nanos;
+        stats.bytesMax = Math.max(stats.bytesMax, bytes);
+        stats.bytesMin = Math.min(stats.bytesMin, bytes);
+        stats.bytesTotal += bytes;
+        if (lastLog == 0) {
+            lastLog = start;
+        }
+        if (now - lastLog > 10_000_000_000L) {
+            TreeMap<String, Stats> sorted = new TreeMap<>(map);
+            System.out.print(backend.toString());
+            System.out.println(sorted.values().toString() + " pending " + pending);
+            lastLog = now;
+        }
+    }
+
+    @Override
+    public void put(String key, PageFile value) {
+        long start = start();
+        try {
+            backend.put(key, value);
+        } finally {
+            add("put", start, value.sizeInBytes());
+        }
+    }
+
+    @Override
+    public String newFileName() {
+        return backend.newFileName();
+    }
+
+    @Override
+    public Set<String> keySet() {
+        return backend.keySet();
+    }
+
+    @Override
+    public void remove(Set<String> set) {
+        backend.remove(set);
+    }
+
+    @Override
+    public void removeAll() {
+        backend.removeAll();
+    }
+
+    @Override
+    public long getWriteCount() {
+        return backend.getWriteCount();
+    }
+
+    @Override
+    public long getReadCount() {
+        return backend.getReadCount();
+    }
+
+    @Override
+    public void setWriteCompression(Compression compression) {
+        backend.setWriteCompression(compression);
+    }
+
+    @Override
+    public void close() {
+        backend.close();
+    }
+
+    @Override
+    public Properties getConfig() {
+        return config;
+    }
+
+    @Override
+    public boolean supportsByteOperations() {
+        return backend.supportsByteOperations();
+    }
+
+    @Override
+    public byte[] getBytes(String key) {
+        long start = start();
+        long len = 0;
+        try {
+            byte[] result = backend.getBytes(key);
+            // tolerate a missing result, the same as in getIfExists
+            len = result == null ? 0 : result.length;
+            return result;
+        } finally {
+            add("getBytes", start, len);
+        }
+    }
+
+    @Override
+    public void putBytes(String key, byte[] data) {
+        long start = start();
+        try {
+            backend.putBytes(key, data);
+        } finally {
+            add("putBytes", start, data.length);
+        }
+    }
+
+    /**
+     * The statistics for one operation.
+     */
+    static class Stats {
+        final String key;
+        long count;
+        // the minimum values start at the largest possible value,
+        // so that the first call sets them
+        long bytesMin = Long.MAX_VALUE;
+        long bytesMax;
+        long bytesTotal;
+        long nanosMin = Long.MAX_VALUE;
+        long nanosMax;
+        long nanosTotal;
+
+        public Stats(String key) {
+            this.key = key;
+        }
+
+        @Override
+        public String toString() {
+            if (count == 0) {
+                return "";
+            }
+            String result = key;
+            result += " " + count + " calls";
+            if (bytesTotal > 0 && nanosTotal > 0) {
+                // bytes per nanosecond, times 1000, is megabytes per second
+                result += " " + (bytesTotal / count / 1_000_000) + " avgMB" +
+                        " " + ((bytesTotal * 1_000) / nanosTotal) + " MB/s";
+            }
+            return result;
+        }
+
+    }
+
+}
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/Store.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/Store.java
new file mode 100644
index 0000000000..37e70560ac
--- /dev/null
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/Store.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store;
+
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Storage for files. Files are identified by name (the key).
+ */
+public interface Store {
+
+    /**
+     * Get a file. Unlike {@link #getIfExists(String)}, this method does not
+     * return null if the file does not exist.
+     *
+     * @param key the file name
+     * @return the file
+     * @throws IllegalStateException if the file does not exist
+     */
+    default PageFile get(String key) {
+        PageFile result = getIfExists(key);
+        if (result == null) {
+            throw new IllegalStateException("Not found: " + key);
+        }
+        return result;
+    }
+
+    /**
+     * Get a file if it exists.
+     *
+     * @param key the file name
+     * @return the file, or null
+     */
+    PageFile getIfExists(String key);
+
+    /**
+     * Store a file.
+     *
+     * @param key the file name
+     * @param value the file
+     */
+    void put(String key, PageFile value);
+
+    /**
+     * Generate a new file name.
+     *
+     * @return the new file name
+     */
+    String newFileName();
+
+    /**
+     * Get the list of files.
+     *
+     * @return the file names
+     */
+    Set<String> keySet();
+
+    /**
+     * Remove a number of files.
+     *
+     * @param set the names of the files to remove
+     */
+    void remove(Set<String> set);
+
+    /**
+     * Remove all files.
+     */
+    void removeAll();
+
+    /**
+     * Get the number of files written.
+     *
+     * @return the result
+     */
+    long getWriteCount();
+
+    /**
+     * Get the number of files read.
+     *
+     * @return the result
+     */
+    long getReadCount();
+
+    /**
+     * Set the compression algorithm used for writing from now on.
+     *
+     * @param compression the compression algorithm
+     */
+    void setWriteCompression(Compression compression);
+
+    /**
+     * Close the store.
+     */
+    void close();
+
+    /**
+     * Get the configuration of this store.
+     *
+     * @return the configuration
+     */
+    Properties getConfig();
+
+    /**
+     * Whether {@link #getBytes(String)} and {@link #putBytes(String, byte[])}
+     * are supported. By default, they are not.
+     *
+     * @return true if supported
+     */
+    default boolean supportsByteOperations() {
+        return false;
+    }
+
+    /**
+     * Get the content of a file as a byte array. Optional operation; see
+     * {@link #supportsByteOperations()}.
+     *
+     * @param key the file name
+     * @return the data
+     * @throws UnsupportedOperationException if not overridden
+     */
+    default byte[] getBytes(String key) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Store the content of a file, given as a byte array. Optional operation;
+     * see {@link #supportsByteOperations()}.
+     *
+     * @param key the file name
+     * @param data the data
+     * @throws UnsupportedOperationException if not overridden
+     */
+    default void putBytes(String key, byte[] data) {
+        throw new UnsupportedOperationException();
+    }
+
+}
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/StoreBuilder.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/StoreBuilder.java
new file mode 100644
index 0000000000..ccab3e112c
--- /dev/null
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/StoreBuilder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Properties;
+
+/**
+ * A builder for stores. The type of store is selected using the "type"
+ * configuration property.
+ */
+public class StoreBuilder {
+
+    /**
+     * Build a store. The configuration options are passed as a list of properties.
+     *
+     * - empty string or null: in-memory.
+     * - "type=memory": in-memory
+     * - "type=file": file system, with "dir" directory
+     * - "type=azure": Azure
+     * - "type=diskCache": disk cache, with "dir" and "backend=azure" or "s3"
+     * - "type=stats.x", "type=slow.x", "type=log.x": a store of type "x",
+     *   wrapped in a StatsStore, SlowStore, or LogStore
+     *
+     * @param config the config
+     * @return a store
+     * @throws IllegalArgumentException if the config can not be parsed, or
+     *             if the type is missing or not supported
+     */
+    public static Store build(String config) throws IllegalArgumentException {
+        if (config == null || config.isEmpty()) {
+            return new MemoryStore(new Properties());
+        }
+        Properties prop = new Properties();
+        try {
+            prop.load(new StringReader(config));
+        } catch (IOException e) {
+            throw new IllegalArgumentException(config, e);
+        }
+        return build(prop);
+    }
+
+    /**
+     * Build a store from the given properties; see {@link #build(String)}
+     * for the supported types.
+     * <p>
+     * For the wrapper types ("stats.", "slow.", "log."), the "type" property
+     * of the passed object is changed: the prefix is removed before the
+     * wrapped store is built.
+     *
+     * @param config the config
+     * @return a store
+     * @throws IllegalArgumentException if the type is missing or not supported
+     */
+    public static Store build(Properties config) {
+        String type = config.getProperty("type");
+        if (type == null) {
+            // fail with the documented exception,
+            // instead of a NullPointerException in the switch below
+            throw new IllegalArgumentException("Missing type: " + config);
+        }
+        switch (type) {
+        case "memory":
+            return new MemoryStore(config);
+        case "file":
+            return new FileStore(config);
+        case "azure":
+            return new AzureStore(config);
+        case "diskCache":
+            return new DiskCacheStore(config);
+        default:
+            // might be a wrapper type: see below
+            break;
+        }
+        if (type.startsWith("stats.")) {
+            config.put("type", type.substring("stats.".length()));
+            return new StatsStore(build(config));
+        } else if (type.startsWith("slow.")) {
+            config.put("type", type.substring("slow.".length()));
+            return new SlowStore(build(config));
+        } else if (type.startsWith("log.")) {
+            config.put("type", type.substring("log.".length()));
+            return new LogStore(build(config));
+        }
+        throw new IllegalArgumentException(config.toString());
+    }
+
+    /**
+     * Get the properties whose key starts with the given prefix, with the
+     * prefix removed from the key. The passed properties are not changed.
+     *
+     * @param p the properties
+     * @param prefix the prefix
+     * @return a new properties object
+     */
+    public static Properties subProperties(Properties p, String prefix) {
+        Properties p2 = new Properties();
+        for (Object k : p.keySet()) {
+            if (k.toString().startsWith(prefix)) {
+                p2.put(k.toString().substring(prefix.length()), p.get(k));
+            }
+        }
+        return p2;
+    }
+
+}
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/utils/Cache.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/utils/Cache.java
new file mode 100644
index 0000000000..fcb2e4bf49
--- /dev/null
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/utils/Cache.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store.utils;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A simple least-recently-used cache, based on an access-ordered
+ * LinkedHashMap. When an entry is added and the map then contains
+ * more entries than the configured maximum, the eldest entry is
+ * removed.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public class Cache<K, V> extends LinkedHashMap<K, V> {
+
+    private static final long serialVersionUID = 1L;
+
+    // arguments for the LinkedHashMap constructor
+    private static final int INITIAL_CAPACITY = 16;
+    private static final float LOAD_FACTOR = 0.75f;
+    private static final boolean ACCESS_ORDER = true;
+
+    // the number of entries above which the eldest entry is removed
+    private int maxSize;
+
+    /**
+     * Create a new cache.
+     *
+     * @param maxSize the maximum number of entries to keep
+     */
+    public Cache(int maxSize) {
+        super(INITIAL_CAPACITY, LOAD_FACTOR, ACCESS_ORDER);
+        this.maxSize = maxSize;
+    }
+
+    /**
+     * Change the maximum number of entries. This method only stores
+     * the new limit; it does not remove any entries itself.
+     *
+     * @param maxSize the new maximum number of entries
+     */
+    public void setSize(int maxSize) {
+        this.maxSize = maxSize;
+    }
+
+    /**
+     * Whether the eldest entry should be removed.
+     *
+     * @param eldest the eldest entry (not used)
+     * @return true if the map has more entries than the maximum
+     */
+    @Override
+    public boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+        return maxSize < size();
+    }
+
+}
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/utils/Position.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/utils/Position.java
new file mode 100644
index 0000000000..2fc6b6e51a
--- /dev/null
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/utils/Position.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store.utils;
+
+import org.apache.jackrabbit.oak.index.indexer.document.tree.store.PageFile;
+
+/**
+ * A position: a page file, plus a position within that file
+ * (presumably the index of an entry; see the users of valuePos).
+ */
+public class Position {
+    public PageFile file;
+    public int valuePos;
+
+    /**
+     * Get a string representation: the node kind ("internal" or
+     * "leaf"), the value position, and the keys of the file,
+     * followed by a newline.
+     *
+     * @return the string representation
+     */
+    @Override
+    public String toString() {
+        String kind = file.isInnerNode() ? "internal" : "leaf";
+        return kind + " " + valuePos + " " + file.getKeys() + "\n";
+    }
+}
\ No newline at end of file
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/utils/SortedStream.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/utils/SortedStream.java
new file mode 100644
index 0000000000..8659becb19
--- /dev/null
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/utils/SortedStream.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store.utils;
+
+import java.util.Iterator;
+
+/**
+ * A stream of key-value pairs, read from an iterator of positions.
+ * The stream exposes the current entry. Streams are ordered by the
+ * current key, then by the root file name; a stream without current
+ * key (the iterator is exhausted) is sorted after streams that still
+ * have one.
+ */
+public class SortedStream implements Comparable<SortedStream> {
+
+    private final String rootFileName;
+    private final Iterator<Position> it;
+    public String currentKey;
+    public String currentValue;
+
+    /**
+     * Create a stream and read the first entry, if there is one.
+     *
+     * @param rootFileName the root file name (used to break ties)
+     * @param it the source of the entries
+     */
+    public SortedStream(String rootFileName, Iterator<Position> it) {
+        this.rootFileName = rootFileName;
+        this.it = it;
+        next();
+    }
+
+    @Override
+    public String toString() {
+        return "file " + rootFileName + " key " + currentKey
+                + " value " + currentValue;
+    }
+
+    String currentKeyOrNull() {
+        return currentKey;
+    }
+
+    Object currentValue() {
+        return currentValue;
+    }
+
+    /**
+     * Advance to the next entry. If there are no more entries,
+     * the current key and value are set to null.
+     */
+    public void next() {
+        if (!it.hasNext()) {
+            currentKey = null;
+            currentValue = null;
+            return;
+        }
+        Position pos = it.next();
+        currentKey = pos.file.getKey(pos.valuePos);
+        currentValue = pos.file.getValue(pos.valuePos);
+    }
+
+    @Override
+    public int compareTo(SortedStream o) {
+        String mine = currentKey;
+        String theirs = o.currentKey;
+        if (mine != null && theirs != null) {
+            int byKey = mine.compareTo(theirs);
+            if (byKey != 0) {
+                return byKey;
+            }
+            return rootFileName.compareTo(o.rootFileName);
+        }
+        if (mine == null && theirs == null) {
+            return rootFileName.compareTo(o.rootFileName);
+        }
+        // exactly one of the two has no current key: it is sorted last
+        return mine == null ? 1 : -1;
+    }
+}
\ No newline at end of file
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/utils/Uuid.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/utils/Uuid.java
new file mode 100644
index 0000000000..2e2873f6ae
--- /dev/null
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/store/utils/Uuid.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.tree.store.utils;
+
+import java.security.SecureRandom;
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A UUID implementation.
+ *
+ * It supports creating version 7 UUIDs, which are time-ordered.
+ * See also draft-ietf-uuidrev-rfc4122bis-00
+ *
+ * Unlike java.util.UUID, the comparison is unsigned,
+ * so that the string comparison yields the same result.
+ */
+public class Uuid implements Comparable<Uuid> {
+
+    // the number of bits used for the counter
+    private static final int COUNTER_BITS = 12;
+
+    // the bit mask to extract the counter
+    private static final long COUNTER_MASK = (1L << COUNTER_BITS) - 1;
+
+    // the bit mask for the 62 random bits of the lower half
+    private static final long RANDOM_MASK = (1L << 62) - 1;
+
+    // the version and variant used for time-ordered UUIDs
+    private static final long VERSION = 7;
+    private static final long VARIANT = 2;
+
+    // the last "milliseconds and counter" value that was handed out
+    private static final AtomicLong UUID_LAST_MILLIS_AND_COUNT =
+            new AtomicLong(0);
+
+    private static final SecureRandom RANDOM = new SecureRandom();
+
+    // most significant bits
+    private final long msb;
+
+    // least significant bits
+    private final long lsb;
+
+    Uuid(long msb, long lsb) {
+        this.msb = msb;
+        this.lsb = lsb;
+    }
+
+    /**
+     * Get the string representation, as five groups of
+     * 8, 4, 4, 4, and 12 lowercase hex digits separated by "-".
+     *
+     * @return the string representation
+     */
+    @Override
+    public String toString() {
+        long group1 = msb >>> 32;
+        long group2 = (msb >>> 16) & 0xffff;
+        long group3 = msb & 0xffff;
+        long group4 = (lsb >>> 48) & 0xffff;
+        long group5 = lsb & 0xffffffffffffL;
+        return String.format("%08x-%04x-%04x-%04x-%012x",
+                group1, group2, group3, group4, group5);
+    }
+
+    /**
+     * Get a compact hex representation: the timestamp part,
+     * the counter part, and the random part, without separators.
+     *
+     * @return the short string
+     */
+    public String toShortString() {
+        long timestamp = getTimestampPart();
+        long counter = getCounterPart();
+        long random = getRandomPart();
+        return String.format("%012x%03x%016x",
+                timestamp, counter, random);
+    }
+
+    /**
+     * Get a representation where the timestamp part is formatted
+     * as an instant, followed by the counter and the random part
+     * in hex.
+     *
+     * @return the human readable string
+     */
+    public String toHumanReadableString() {
+        String time = Instant.ofEpochMilli(getTimestampPart()).toString();
+        return String.format("%s %03x %016x",
+                time, getCounterPart(), getRandomPart());
+    }
+
+    /**
+     * Get the timestamp part (48 bits).
+     *
+     * @return the timestamp part
+     */
+    public long getTimestampPart() {
+        return msb >>> 16;
+    }
+
+    /**
+     * Get the counter part (12 bits).
+     *
+     * @return counter part
+     */
+    public long getCounterPart() {
+        return msb & COUNTER_MASK;
+    }
+
+    /**
+     * Get the random part (62 bits).
+     * The first 2 bits are fixed.
+     *
+     * @return the random part
+     */
+    public long getRandomPart() {
+        return lsb;
+    }
+
+    /**
+     * Unlike java.util.UUID, the comparison is unsigned,
+     * so that the string comparison yields the same result.
+     */
+    @Override
+    public int compareTo(Uuid o) {
+        int high = Long.compareUnsigned(msb, o.msb);
+        return high != 0 ? high : Long.compareUnsigned(lsb, o.lsb);
+    }
+
+    @Override
+    public int hashCode() {
+        return Long.hashCode(lsb ^ msb);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        Uuid other = (Uuid) obj;
+        return msb == other.msb && lsb == other.lsb;
+    }
+
+    /**
+     * Get the next timestamp (in milliseconds) and counter.
+     * The lowest 12 bits of the returned value is the counter.
+     * The milliseconds are shifted by 12 bits.
+     *
+     * The returned value is always larger than the value previously
+     * stored in lastMillisAndCount; the new value is stored there.
+     *
+     * @param now the milliseconds
+     * @param lastMillisAndCount the last returned value
+     * @return the new value
+     */
+    static long getMillisAndCountIncreasing(long now,
+            AtomicLong lastMillisAndCount) {
+        long candidate = now << COUNTER_BITS;
+        for (;;) {
+            long previous = lastMillisAndCount.get();
+            if (candidate <= previous) {
+                // never go backwards, and never return a value twice
+                candidate = previous + 1;
+            }
+            if (lastMillisAndCount.compareAndSet(previous, candidate)) {
+                return candidate;
+            }
+            // a concurrent update happened: retry with the new value
+        }
+    }
+
+    /**
+     * Create a version 7 UUID from the given parts.
+     *
+     * @param millisAndCount the milliseconds, shifted by 12 bits,
+     *            plus the counter in the lowest 12 bits
+     * @param random the random value (the lowest 62 bits are used)
+     * @return the UUID
+     */
+    static Uuid timeBasedVersion7(long millisAndCount,
+            long random) {
+        long millis = millisAndCount >>> COUNTER_BITS;
+        long counter = millisAndCount & COUNTER_MASK;
+        long high = (millis << 16) | (VERSION << 12) | counter;
+        long low = (VARIANT << 62) | (random & RANDOM_MASK);
+        return new Uuid(high, low);
+    }
+
+    /**
+     * Create a new version 7 UUID using the current time, the
+     * shared counter, and a random value from a SecureRandom.
+     *
+     * @return the UUID
+     */
+    public static Uuid timeBasedVersion7() {
+        long now = System.currentTimeMillis();
+        long millisAndCount = getMillisAndCountIncreasing(
+                now, UUID_LAST_MILLIS_AND_COUNT);
+        return timeBasedVersion7(millisAndCount, RANDOM.nextLong());
+    }
+
+    public long getMostSignificantBits() {
+        return msb;
+    }
+
+    public long getLeastSignificantBits() {
+        return lsb;
+    }
+
+}

Reply via email to