Author: catholicon
Date: Tue Sep  5 03:56:06 2017
New Revision: 1807310

URL: http://svn.apache.org/viewvc?rev=1807310&view=rev
Log:
OAK-6576: Refactor OakDirectory to be more manageable

Added:
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/BlobFactory.java
   (with props)
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexFile.java
   (with props)
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexInput.java
   (with props)
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexOutput.java
   (with props)
Modified:
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectory.java
    
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectoryTest.java

Added: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/BlobFactory.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/BlobFactory.java?rev=1807310&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/BlobFactory.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/BlobFactory.java
 Tue Sep  5 03:56:06 2017
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.lucene;
+
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
+import org.apache.jackrabbit.oak.spi.blob.BlobOptions;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static 
org.apache.jackrabbit.oak.spi.blob.BlobOptions.UploadType.SYNCHRONOUS;
+
+public interface BlobFactory {
+    Boolean ENABLE_ASYNC_DS = Boolean.getBoolean("oak.lucene.ds.async");
+
+    Blob createBlob(InputStream in) throws IOException;
+
+    static BlobFactory getNodeBuilderBlobFactory(final NodeBuilder builder) {
+        return builder::createBlob;
+    }
+
+    static BlobFactory getBlobStoreBlobFactory(final BlobStore store) {
+        return in -> {
+            String blobId;
+            if (!ENABLE_ASYNC_DS) {
+                blobId = store.writeBlob(in, new 
BlobOptions().setUpload(SYNCHRONOUS));
+            } else {
+                blobId = store.writeBlob(in);
+            }
+            return new BlobStoreBlob(store, blobId);
+        };
+    }
+}

Propchange: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/BlobFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java?rev=1807310&r1=1807309&r2=1807310&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java
 Tue Sep  5 03:56:06 2017
@@ -16,38 +16,17 @@
  */
 package org.apache.jackrabbit.oak.plugins.index.lucene;
 
-import java.io.ByteArrayInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.SequenceInputStream;
-import java.security.SecureRandom;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import com.google.common.io.ByteStreams;
-import com.google.common.primitives.Ints;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.commons.StringUtils;
-import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
+import org.apache.jackrabbit.oak.commons.benchmark.PerfLogger;
 import 
org.apache.jackrabbit.oak.plugins.index.lucene.directory.ActiveDeletedBlobCollectorFactory.BlobDeletionCallback;
-import org.apache.jackrabbit.oak.spi.blob.BlobOptions;
-import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
-import org.apache.jackrabbit.oak.commons.benchmark.PerfLogger;
-import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
@@ -55,32 +34,30 @@ import org.apache.lucene.store.IndexOutp
 import org.apache.lucene.store.Lock;
 import org.apache.lucene.store.LockFactory;
 import org.apache.lucene.store.NoLockFactory;
-import org.apache.lucene.util.WeakIdentityMap;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.Collection;
+import java.util.Set;
+
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkElementIndex;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkPositionIndexes;
-import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.collect.Lists.newArrayList;
 import static org.apache.jackrabbit.JcrConstants.JCR_DATA;
-import static org.apache.jackrabbit.JcrConstants.JCR_LASTMODIFIED;
 import static org.apache.jackrabbit.oak.api.Type.BINARIES;
 import static org.apache.jackrabbit.oak.api.Type.BINARY;
 import static org.apache.jackrabbit.oak.api.Type.STRINGS;
 import static 
org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.INDEX_DATA_CHILD_NAME;
 import static 
org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 import static 
org.apache.jackrabbit.oak.plugins.memory.PropertyStates.createProperty;
-import static 
org.apache.jackrabbit.oak.spi.blob.BlobOptions.UploadType.SYNCHRONOUS;
 
 /**
  * Implementation of the Lucene {@link Directory} (a flat list of files)
  * based on an Oak {@link NodeBuilder}.
  */
 public class OakDirectory extends Directory {
-    private static final Boolean ENABLE_AYNC_DS = 
Boolean.getBoolean("oak.lucene.ds.async");
-
     static final PerfLogger PERF_LOGGER = new 
PerfLogger(LoggerFactory.getLogger(OakDirectory.class.getName() + ".perf"));
     static final String PROP_DIR_LISTING = "dirListing";
     static final String PROP_BLOB_SIZE = "blobSize";
@@ -107,7 +84,7 @@ public class OakDirectory extends Direct
     }
 
     public OakDirectory(NodeBuilder builder, String dataNodeName, 
IndexDefinition definition, boolean readOnly) {
-        this(builder, dataNodeName, definition, readOnly, new 
NodeBuilderBlobFactory(builder));
+        this(builder, dataNodeName, definition, readOnly, 
BlobFactory.getNodeBuilderBlobFactory(builder));
     }
 
     public OakDirectory(NodeBuilder builder, String dataNodeName, 
IndexDefinition definition,
@@ -119,7 +96,7 @@ public class OakDirectory extends Direct
                         boolean readOnly, @Nullable 
GarbageCollectableBlobStore blobStore,
                         @Nonnull BlobDeletionCallback blobDeletionCallback) {
         this(builder, dataNodeName, definition, readOnly,
-                blobStore != null ? new BlobStoreBlobFactory(blobStore) : new 
NodeBuilderBlobFactory(builder),
+                blobStore != null ? 
BlobFactory.getBlobStoreBlobFactory(blobStore) : 
BlobFactory.getNodeBuilderBlobFactory(builder),
                 blobDeletionCallback);
     }
 
@@ -322,456 +299,4 @@ public class OakDirectory extends Direct
         return result;
     }
 
-    /**
-     * Size of the blob entries to which the Lucene files are split.
-     * Set to higher than the 4kB inline limit for the BlobStore,
-     */
-    static final int DEFAULT_BLOB_SIZE = 32 * 1024;
-
-    /**
-     * A file, which might be split into multiple blobs.
-     */
-    private static class OakIndexFile {
-
-        /**
-         * The file name.
-         */
-        private final String name;
-
-        /**
-         * The node that contains the data for this file.
-         */
-        private final NodeBuilder file;
-
-        /**
-         * The maximum size of each blob.
-         */
-        private final int blobSize;
-        
-        /**
-         * The current position within the file (for positioned read and write
-         * operations).
-         */
-        private long position = 0;
-
-        /**
-         * The length of the file.
-         */
-        private long length;
-
-        /**
-         * The list of blobs (might be empty).
-         * The last blob has a size of 1 up to blobSize.
-         * All other blobs have a size of blobSize.
-         */
-        private List<Blob> data;
-
-        /**
-         * Whether the data was modified since it was last flushed. If yes, on 
a
-         * flush, the metadata, and the list of blobs need to be stored.
-         */
-        private boolean dataModified = false;
-
-        /**
-         * The index of the currently loaded blob.
-         */
-        private int index = -1;
-
-        /**
-         * The data of the currently loaded blob.
-         */
-        private byte[] blob;
-        
-        /**
-         * The unique key that is used to make the content unique (to allow 
removing binaries from the blob store without risking to remove binaries that 
are still needed).
-         */
-        private final byte[] uniqueKey;
-
-        /**
-         * Whether the currently loaded blob was modified since the blob was
-         * flushed.
-         */
-        private boolean blobModified = false;
-
-        private final String dirDetails;
-
-        private final BlobFactory blobFactory;
-
-        public OakIndexFile(String name, NodeBuilder file, String dirDetails,
-            @Nonnull BlobFactory blobFactory) {
-            this.name = name;
-            this.file = file;
-            this.dirDetails = dirDetails;
-            this.blobSize = determineBlobSize(file);
-            this.uniqueKey = readUniqueKey(file);
-            this.blob = new byte[blobSize];
-            this.blobFactory = checkNotNull(blobFactory);
-
-            PropertyState property = file.getProperty(JCR_DATA);
-            if (property != null && property.getType() == BINARIES) {
-                this.data = newArrayList(property.getValue(BINARIES));
-            } else {
-                this.data = newArrayList();
-            }
-
-            this.length = (long)data.size() * blobSize;
-            if (!data.isEmpty()) {
-                Blob last = data.get(data.size() - 1);
-                this.length -= blobSize - last.length();
-                if (uniqueKey != null) {
-                    this.length -= uniqueKey.length;
-                }
-            }
-        }
-
-        private OakIndexFile(OakIndexFile that) {
-            this.name = that.name;
-            this.file = that.file;
-            this.dirDetails = that.dirDetails;
-            this.blobSize = that.blobSize;
-            this.uniqueKey = that.uniqueKey;
-            this.blob = new byte[blobSize];
-
-            this.position = that.position;
-            this.length = that.length;
-            this.data = newArrayList(that.data);
-            this.dataModified = that.dataModified;
-            this.blobFactory = that.blobFactory;
-        }
-
-        private void loadBlob(int i) throws IOException {
-            checkElementIndex(i, data.size());
-            if (index != i) {
-                flushBlob();
-                checkState(!blobModified);
-
-                int n = (int) Math.min(blobSize, length - (long)i * blobSize);
-                InputStream stream = data.get(i).getNewStream();
-                try {
-                    ByteStreams.readFully(stream, blob, 0, n);
-                } finally {
-                    stream.close();
-                }
-                index = i;
-            }
-        }
-
-        private void flushBlob() throws IOException {
-            if (blobModified) {
-                int n = (int) Math.min(blobSize, length - (long)index * 
blobSize);
-                InputStream in = new ByteArrayInputStream(blob, 0, n);
-                if (uniqueKey != null) {
-                    in = new SequenceInputStream(in, 
-                            new ByteArrayInputStream(uniqueKey));
-                }
-
-                Blob b = blobFactory.createBlob(in);
-                if (index < data.size()) {
-                    data.set(index, b);
-                } else {
-                    checkState(index == data.size());
-                    data.add(b);
-                }
-                dataModified = true;
-                blobModified = false;
-            }
-        }
-
-        public void seek(long pos) throws IOException {
-            // seek() may be called with pos == length
-            // see https://issues.apache.org/jira/browse/LUCENE-1196
-            if (pos < 0 || pos > length) {
-                String msg = String.format("Invalid seek request for [%s][%s], 
" +
-                        "position: %d, file length: %d", dirDetails, name, 
pos, length);
-                throw new IOException(msg);                
-            } else {
-                position = pos;
-            }
-        }
-
-        public void readBytes(byte[] b, int offset, int len)
-                throws IOException {
-            checkPositionIndexes(offset, offset + len, checkNotNull(b).length);
-
-            if (len < 0 || position + len > length) {
-                String msg = String.format("Invalid byte range request for 
[%s][%s], " +
-                        "position: %d, file length: %d, len: %d", dirDetails, 
name, position, length, len);
-                throw new IOException(msg);
-            }
-
-            int i = (int) (position / blobSize);
-            int o = (int) (position % blobSize);
-            while (len > 0) {
-                loadBlob(i);
-
-                int l = Math.min(len, blobSize - o);
-                System.arraycopy(blob, o, b, offset, l);
-
-                offset += l;
-                len -= l;
-                position += l;
-                // next block
-                i++;
-                // for the next block, we read from the beginning
-                o = 0;
-            }
-        }
-
-        public void writeBytes(byte[] b, int offset, int len)
-                throws IOException {
-            int i = (int) (position / blobSize);
-            int o = (int) (position % blobSize);
-            while (len > 0) {
-                int l = Math.min(len, blobSize - o);
-
-                if (index != i) {
-                    if (o > 0 || (l < blobSize && position + l < length)) {
-                        // loadBlob first flushes the previous block,
-                        // and it sets the index
-                        loadBlob(i);
-                    } else {
-                        // we don't need to load the block,
-                        // as we anyway overwrite it fully, if:
-                        // o == 0 (start writing at a block boundary)
-                        // and either: l is the blockSize, or
-                        // we write at least to the end of the file
-                        flushBlob();
-                        index = i;
-                    }
-                }
-                System.arraycopy(b, offset, blob, o, l);
-                blobModified = true;
-
-                offset += l;
-                len -= l;
-                position += l;
-                length = Math.max(length, position);
-
-                i++;
-                o = 0;
-            }
-        }
-
-        private static int determineBlobSize(NodeBuilder file){
-            if (file.hasProperty(PROP_BLOB_SIZE)){
-                return 
Ints.checkedCast(file.getProperty(PROP_BLOB_SIZE).getValue(Type.LONG));
-            }
-            return DEFAULT_BLOB_SIZE;
-        }
-
-        private static byte[] readUniqueKey(NodeBuilder file) {
-            if (file.hasProperty(PROP_UNIQUE_KEY)) {
-                String key = file.getString(PROP_UNIQUE_KEY);
-                return StringUtils.convertHexToBytes(key);
-            }
-            return null;
-        }
-
-        public void flush() throws IOException {
-            flushBlob();
-            if (dataModified) {
-                file.setProperty(JCR_LASTMODIFIED, System.currentTimeMillis());
-                file.setProperty(JCR_DATA, data, BINARIES);
-                dataModified = false;
-            }
-        }
-
-        @Override
-        public String toString() {
-            return name;
-        }
-
-        public String getName() {
-            return name;
-        }
-    }
-
-    private static class OakIndexInput extends IndexInput {
-
-        private final OakIndexFile file;
-        private boolean isClone = false;
-        private final WeakIdentityMap<OakIndexInput, Boolean> clones;
-        private final String dirDetails;
-
-        public OakIndexInput(String name, NodeBuilder file, String dirDetails,
-            BlobFactory blobFactory) {
-            super(name);
-            this.dirDetails = dirDetails;
-            this.file = new OakIndexFile(name, file, dirDetails, blobFactory);
-            clones = WeakIdentityMap.newConcurrentHashMap();
-        }
-
-        private OakIndexInput(OakIndexInput that) {
-            super(that.toString());
-            this.file = new OakIndexFile(that.file);
-            clones = null;
-            this.dirDetails = that.dirDetails;
-        }
-
-        @Override
-        public OakIndexInput clone() {
-            // TODO : shouldn't we call super#clone ?
-            OakIndexInput clonedIndexInput = new OakIndexInput(this);
-            clonedIndexInput.isClone = true;
-            if (clones != null) {
-                clones.put(clonedIndexInput, Boolean.TRUE);
-            }
-            return clonedIndexInput;
-        }
-
-        @Override
-        public void readBytes(byte[] b, int o, int n) throws IOException {
-            checkNotClosed();
-            file.readBytes(b, o, n);
-        }
-
-        @Override
-        public byte readByte() throws IOException {
-            checkNotClosed();
-            byte[] b = new byte[1];
-            readBytes(b, 0, 1);
-            return b[0];
-        }
-
-        @Override
-        public void seek(long pos) throws IOException {
-            checkNotClosed();
-            file.seek(pos);
-        }
-
-        @Override
-        public long length() {
-            checkNotClosed();
-            return file.length;
-        }
-
-        @Override
-        public long getFilePointer() {
-            checkNotClosed();
-            return file.position;
-        }
-
-        @Override
-        public void close() {
-            file.blob = null;
-            file.data = null;
-
-            if (clones != null) {
-                for (Iterator<OakIndexInput> it = clones.keyIterator(); 
it.hasNext();) {
-                    final OakIndexInput clone = it.next();
-                    assert clone.isClone;
-                    clone.close();
-                }
-            }
-        }
-
-        private void checkNotClosed() {
-            if (file.blob == null && file.data == null) {
-                throw new AlreadyClosedException("Already closed: [" + 
dirDetails + "] " + this);
-            }
-        }
-
-    }
-
-    private final class OakIndexOutput extends IndexOutput {
-        private final String dirDetails;
-        private final OakIndexFile file;
-
-        public OakIndexOutput(String name, NodeBuilder file, String dirDetails,
-                              BlobFactory blobFactory) throws IOException {
-            this.dirDetails = dirDetails;
-            this.file = new OakIndexFile(name, file, dirDetails, blobFactory);
-        }
-
-        @Override
-        public long length() {
-            return file.length;
-        }
-
-        @Override
-        public long getFilePointer() {
-            return file.position;
-        }
-
-        @Override
-        public void seek(long pos) throws IOException {
-            file.seek(pos);
-        }
-
-        @Override
-        public void writeBytes(byte[] b, int offset, int length)
-                throws IOException {
-            try {
-                file.writeBytes(b, offset, length);
-            } catch (IOException e) {
-                throw wrapWithDetails(e);
-            }
-        }
-
-        @Override
-        public void writeByte(byte b) throws IOException {
-            writeBytes(new byte[] { b }, 0, 1);
-        }
-
-        @Override
-        public void flush() throws IOException {
-            try {
-                file.flush();
-            } catch (IOException e) {
-                throw wrapWithDetails(e);
-            }
-        }
-
-        @Override
-        public void close() throws IOException {
-            flush();
-            file.blob = null;
-            file.data = null;
-        }
-
-        private IOException wrapWithDetails(IOException e) {
-            String msg = String.format("Error occurred while writing to blob 
[%s][%s]", dirDetails, file.getName());
-            return new IOException(msg, e);
-        }
-
-    }
-
-    public interface BlobFactory {
-
-        Blob createBlob(InputStream in) throws IOException;
-    }
-
-    public static final class NodeBuilderBlobFactory implements BlobFactory {
-
-        private final NodeBuilder builder;
-
-        public NodeBuilderBlobFactory(NodeBuilder builder) {
-            this.builder = builder;
-        }
-
-        @Override
-        public Blob createBlob(InputStream in) throws IOException {
-            return builder.createBlob(in);
-        }
-    }
-
-    public static final class BlobStoreBlobFactory implements BlobFactory {
-
-        private final BlobStore store;
-
-        public BlobStoreBlobFactory(BlobStore store) {
-            this.store = store;
-        }
-
-        @Override
-        public Blob createBlob(InputStream in) throws IOException {
-            String blobId;
-            if (!ENABLE_AYNC_DS) {
-                blobId = store.writeBlob(in, new 
BlobOptions().setUpload(SYNCHRONOUS));
-            } else {
-                blobId = store.writeBlob(in);
-            }
-            return new BlobStoreBlob(store, blobId);
-        }
-    }
 }

Added: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexFile.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexFile.java?rev=1807310&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexFile.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexFile.java
 Tue Sep  5 03:56:06 2017
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.lucene;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.primitives.Ints;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.commons.StringUtils;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkElementIndex;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkPositionIndexes;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Lists.newArrayList;
+import static org.apache.jackrabbit.JcrConstants.JCR_DATA;
+import static org.apache.jackrabbit.JcrConstants.JCR_LASTMODIFIED;
+import static org.apache.jackrabbit.oak.api.Type.BINARIES;
+
+/**
+ * A file, which might be split into multiple blobs.
+ */
+class OakIndexFile {
+    static Logger LOG = LoggerFactory.getLogger(OakIndexFile.class);
+
+    /**
+     * Size of the blob entries to which the Lucene files are split.
+     * Set to higher than the 4kB inline limit for the BlobStore,
+     */
+    static final int DEFAULT_BLOB_SIZE = 32 * 1024;
+
+    /**
+     * The file name.
+     */
+    private final String name;
+
+    /**
+     * The node that contains the data for this file.
+     */
+    private final NodeBuilder file;
+
+    /**
+     * The maximum size of each blob.
+     */
+    private final int blobSize;
+
+    /**
+     * The current position within the file (for positioned read and write
+     * operations).
+     */
+    private long position = 0;
+
+    /**
+     * The length of the file.
+     */
+    private long length;
+
+    /**
+     * The list of blobs (might be empty).
+     * The last blob has a size of 1 up to blobSize.
+     * All other blobs have a size of blobSize.
+     */
+    private List<Blob> data;
+
+    /**
+     * Whether the data was modified since it was last flushed. If yes, on a
+     * flush, the metadata, and the list of blobs need to be stored.
+     */
+    private boolean dataModified = false;
+
+    /**
+     * The index of the currently loaded blob.
+     */
+    private int index = -1;
+
+    /**
+     * The data of the currently loaded blob.
+     */
+    private byte[] blob;
+
+    /**
+     * The unique key that is used to make the content unique (to allow 
removing binaries from the blob store without risking to remove binaries that 
are still needed).
+     */
+    private final byte[] uniqueKey;
+
+    /**
+     * Whether the currently loaded blob was modified since the blob was
+     * flushed.
+     */
+    private boolean blobModified = false;
+
+    private final String dirDetails;
+
+    private final BlobFactory blobFactory;
+
+    public OakIndexFile(String name, NodeBuilder file, String dirDetails,
+                        @Nonnull BlobFactory blobFactory) {
+        this.name = name;
+        this.file = file;
+        this.dirDetails = dirDetails;
+        this.blobSize = determineBlobSize(file);
+        this.uniqueKey = readUniqueKey(file);
+        this.blob = new byte[blobSize];
+        this.blobFactory = checkNotNull(blobFactory);
+
+        PropertyState property = file.getProperty(JCR_DATA);
+        if (property != null && property.getType() == BINARIES) {
+            this.data = newArrayList(property.getValue(BINARIES));
+        } else {
+            this.data = newArrayList();
+        }
+
+        this.length = (long)data.size() * blobSize;
+        if (!data.isEmpty()) {
+            Blob last = data.get(data.size() - 1);
+            this.length -= blobSize - last.length();
+            if (uniqueKey != null) {
+                this.length -= uniqueKey.length;
+            }
+        }
+    }
+
+    private OakIndexFile(OakIndexFile that) {
+        this.name = that.name;
+        this.file = that.file;
+        this.dirDetails = that.dirDetails;
+        this.blobSize = that.blobSize;
+        this.uniqueKey = that.uniqueKey;
+        this.blob = new byte[blobSize];
+
+        this.position = that.position;
+        this.length = that.length;
+        this.data = newArrayList(that.data);
+        this.dataModified = that.dataModified;
+        this.blobFactory = that.blobFactory;
+    }
+
+    private void loadBlob(int i) throws IOException {
+        checkElementIndex(i, data.size());
+        if (index != i) {
+            LOG.info("Load {}th blob from {}", i, name);
+            flushBlob();
+            checkState(!blobModified);
+
+            int n = (int) Math.min(blobSize, length - (long)i * blobSize);
+            InputStream stream = data.get(i).getNewStream();
+            try {
+                ByteStreams.readFully(stream, blob, 0, n);
+            } finally {
+                stream.close();
+            }
+            index = i;
+        }
+    }
+
+    private void flushBlob() throws IOException {
+        LOG.info("Flushing blob {}. Modified: {}", name, blobModified);
+        if (blobModified) {
+            int n = (int) Math.min(blobSize, length - (long)index * blobSize);
+            InputStream in = new ByteArrayInputStream(blob, 0, n);
+            if (uniqueKey != null) {
+                in = new SequenceInputStream(in,
+                        new ByteArrayInputStream(uniqueKey));
+            }
+
+            Blob b = blobFactory.createBlob(in);
+            if (index < data.size()) {
+                data.set(index, b);
+            } else {
+                checkState(index == data.size());
+                data.add(b);
+            }
+            dataModified = true;
+            blobModified = false;
+        }
+    }
+
+    /**
+     * Duplicates this instance so it can be used by a different
+     * consumer/thread. The state of the cloned instance is initially the same
+     * as the original's; once cloned, the two states change independently
+     * according to how each instance is accessed.
+     *
+     * @return cloned instance
+     */
+    @Override
+    public OakIndexFile clone() {
+        return new OakIndexFile(this);
+    }
+
+    /**
+     * Returns the current length of this index file.
+     *
+     * @return length of index file
+     */
+    public long length() {
+        return this.length;
+    }
+
+    /**
+     * Returns the position at which the next read or write takes place.
+     *
+     * @return current location of access
+     */
+    public long position() {
+        return this.position;
+    }
+
+    /**
+     * Closes this file by dropping the references to the blob buffer and the
+     * blob list. Nothing is flushed here.
+     */
+    public void close() {
+        blob = null;
+        data = null;
+    }
+
+    /**
+     * @return {@code true} when both the blob buffer and the blob list have
+     *         been released
+     */
+    public boolean isClosed() {
+        if (blob != null) {
+            return false;
+        }
+        return data == null;
+    }
+
+    /**
+     * Moves the current location of access to {@code pos}.
+     *
+     * @param pos new position; must be between 0 and the file length, both
+     *            inclusive
+     * @throws IOException if {@code pos} is negative or beyond the file length
+     */
+    public void seek(long pos) throws IOException {
+        // seek is issued very frequently; log at debug level, not INFO
+        LOG.debug("Seek to {} in {}", pos, name);
+        // seek() may be called with pos == length
+        // see https://issues.apache.org/jira/browse/LUCENE-1196
+        if (pos < 0 || pos > length) {
+            String msg = String.format("Invalid seek request for [%s][%s], " +
+                    "position: %d, file length: %d", dirDetails, name, pos, length);
+            throw new IOException(msg);
+        }
+        position = pos;
+    }
+
+    /**
+     * Reads {@code len} bytes from the underlying storage, starting at the
+     * current position, and copies them into {@code b} starting at
+     * {@code offset}. The position advances by the number of bytes read.
+     *
+     * @param b byte array to copy contents read from storage
+     * @param offset index into {@code b} where the copy begins
+     * @param len number of bytes to be read from storage
+     * @throws IOException if {@code len} is negative, if the request reaches
+     *                     beyond the end of the file, or if loading a blob fails
+     */
+    public void readBytes(byte[] b, int offset, int len)
+            throws IOException {
+        checkPositionIndexes(offset, offset + len, checkNotNull(b).length);
+        // Reads are the hottest path (single-byte reads end up here as well);
+        // logging each one at INFO would flood the log, so use debug.
+        LOG.debug("read {} bytes from {}", len, name);
+
+        if (len < 0 || position + len > length) {
+            String msg = String.format("Invalid byte range request for [%s][%s], " +
+                    "position: %d, file length: %d, len: %d", dirDetails, name, position, length, len);
+            throw new IOException(msg);
+        }
+
+        // i: index of the blob holding the current position, o: offset within it
+        int i = (int) (position / blobSize);
+        int o = (int) (position % blobSize);
+        while (len > 0) {
+            loadBlob(i);
+
+            // copy at most up to the end of the current blob
+            int l = Math.min(len, blobSize - o);
+            System.arraycopy(blob, o, b, offset, l);
+
+            offset += l;
+            len -= l;
+            position += l;
+            // next block
+            i++;
+            // for the next block, we read from the beginning
+            o = 0;
+        }
+    }
+
+    /**
+     * Writes {@code len} bytes from {@code b}, starting at {@code offset},
+     * into the underlying storage at the current position. The position
+     * advances by the number of bytes written and the file length grows when
+     * the write goes past the current end. Written content stays in memory
+     * until the affected blob is flushed.
+     *
+     * @param b byte array to copy contents into the storage
+     * @param offset index into {@code b} where the copy begins
+     * @param len number of bytes to be written to storage
+     * @throws IOException if loading or flushing a blob fails
+     */
+    public void writeBytes(byte[] b, int offset, int len)
+            throws IOException {
+        // Debug level: this runs for every write. The "at" value is the file
+        // position being written to, not the offset into the source array.
+        LOG.debug("Write {} bytes at {} in {}", len, position, name);
+        // i: index of the blob holding the current position, o: offset within it
+        int i = (int) (position / blobSize);
+        int o = (int) (position % blobSize);
+        while (len > 0) {
+            // write at most up to the end of the current blob
+            int l = Math.min(len, blobSize - o);
+
+            if (index != i) {
+                if (o > 0 || (l < blobSize && position + l < length)) {
+                    // loadBlob first flushes the previous block,
+                    // and it sets the index
+                    loadBlob(i);
+                } else {
+                    // we don't need to load the block,
+                    // as we anyway overwrite it fully, if:
+                    // o == 0 (start writing at a block boundary)
+                    // and either: l is the blockSize, or
+                    // we write at least to the end of the file
+                    flushBlob();
+                    index = i;
+                }
+            }
+            System.arraycopy(b, offset, blob, o, l);
+            blobModified = true;
+
+            offset += l;
+            len -= l;
+            position += l;
+            length = Math.max(length, position);
+
+            // continue at the start of the next block
+            i++;
+            o = 0;
+        }
+    }
+
+    /**
+     * Determines the blob size to use for the given file node: the value of
+     * its {@code OakDirectory.PROP_BLOB_SIZE} property when present,
+     * {@code DEFAULT_BLOB_SIZE} otherwise.
+     *
+     * @param file node to inspect
+     * @return blob size as an {@code int}
+     */
+    private static int determineBlobSize(NodeBuilder file){
+        if (!file.hasProperty(OakDirectory.PROP_BLOB_SIZE)) {
+            return DEFAULT_BLOB_SIZE;
+        }
+        long configuredSize = file.getProperty(OakDirectory.PROP_BLOB_SIZE).getValue(Type.LONG);
+        return Ints.checkedCast(configuredSize);
+    }
+
+    /**
+     * Reads the hex-encoded {@code OakDirectory.PROP_UNIQUE_KEY} property of
+     * the given file node and converts it to bytes.
+     *
+     * @param file node to inspect
+     * @return the decoded key, or {@code null} when the property is absent
+     */
+    private static byte[] readUniqueKey(NodeBuilder file) {
+        if (!file.hasProperty(OakDirectory.PROP_UNIQUE_KEY)) {
+            return null;
+        }
+        String hexKey = file.getString(OakDirectory.PROP_UNIQUE_KEY);
+        return StringUtils.convertHexToBytes(hexKey);
+    }
+
+    /**
+     * Flushes the content into storage. Until this method is called, written
+     * content exists only in memory.
+     * <p>
+     * The buffered blob is flushed first; if the blob list changed, the
+     * {@code JCR_LASTMODIFIED} and {@code JCR_DATA} properties of the file
+     * node are updated.
+     *
+     * @throws IOException if flushing the buffered blob fails
+     */
+    public void flush() throws IOException {
+        flushBlob();
+        if (!dataModified) {
+            // blob list unchanged, nothing to persist
+            return;
+        }
+        file.setProperty(JCR_LASTMODIFIED, System.currentTimeMillis());
+        file.setProperty(JCR_DATA, data, BINARIES);
+        dataModified = false;
+    }
+
+    /**
+     * @return the name of this file
+     */
+    @Override
+    public String toString() {
+        return this.name;
+    }
+
+    /**
+     * Returns the name this file was created with.
+     *
+     * @return name of the index being accessed
+     */
+    public String getName() {
+        return this.name;
+    }
+}

Propchange: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexFile.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexInput.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexInput.java?rev=1807310&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexInput.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexInput.java
 Tue Sep  5 03:56:06 2017
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.lucene;
+
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.util.WeakIdentityMap;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * {@link IndexInput} that reads from an {@link OakIndexFile}.
+ * <p>
+ * An instance created through the public constructor keeps track of the
+ * clones handed out by {@link #clone()} and closes them when it is closed
+ * itself. Clones do not track their own clones.
+ */
+class OakIndexInput extends IndexInput {
+
+    private final OakIndexFile file;
+    private final String dirDetails;
+    /** {@code true} only for instances created by {@link #clone()}. */
+    private final boolean isClone;
+    /** Clones handed out by this instance; {@code null} on clones. */
+    private final WeakIdentityMap<OakIndexInput, Boolean> clones;
+
+    public OakIndexInput(String name, NodeBuilder file, String dirDetails,
+                         BlobFactory blobFactory) {
+        super(name);
+        this.dirDetails = dirDetails;
+        this.file = new OakIndexFile(name, file, dirDetails, blobFactory);
+        this.isClone = false;
+        this.clones = WeakIdentityMap.newConcurrentHashMap();
+    }
+
+    /**
+     * Copy constructor backing {@link #clone()}: works on a clone of the
+     * underlying file and does not track clones of its own.
+     */
+    private OakIndexInput(OakIndexInput that) {
+        super(that.toString());
+        this.file = that.file.clone();
+        this.isClone = true;
+        this.clones = null;
+        this.dirDetails = that.dirDetails;
+    }
+
+    @Override
+    public OakIndexInput clone() {
+        // TODO : shouldn't we call super#clone ?
+        OakIndexInput copy = new OakIndexInput(this);
+        if (clones != null) {
+            // remember the clone so close() can close it too
+            clones.put(copy, Boolean.TRUE);
+        }
+        return copy;
+    }
+
+    @Override
+    public void readBytes(byte[] b, int o, int n) throws IOException {
+        checkNotClosed();
+        file.readBytes(b, o, n);
+    }
+
+    @Override
+    public byte readByte() throws IOException {
+        checkNotClosed();
+        byte[] single = new byte[1];
+        readBytes(single, 0, 1);
+        return single[0];
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+        checkNotClosed();
+        file.seek(pos);
+    }
+
+    @Override
+    public long length() {
+        checkNotClosed();
+        return file.length();
+    }
+
+    @Override
+    public long getFilePointer() {
+        checkNotClosed();
+        return file.position();
+    }
+
+    @Override
+    public void close() {
+        file.close();
+
+        if (clones == null) {
+            return;
+        }
+        Iterator<OakIndexInput> it = clones.keyIterator();
+        while (it.hasNext()) {
+            final OakIndexInput clone = it.next();
+            assert clone.isClone;
+            clone.close();
+        }
+    }
+
+    /**
+     * @throws AlreadyClosedException if the underlying file has been closed
+     */
+    private void checkNotClosed() {
+        if (file.isClosed()) {
+            throw new AlreadyClosedException("Already closed: [" + dirDetails + "] " + this);
+        }
+    }
+
+}

Propchange: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexInput.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexOutput.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexOutput.java?rev=1807310&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexOutput.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexOutput.java
 Tue Sep  5 03:56:06 2017
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.lucene;
+
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.lucene.store.IndexOutput;
+
+import java.io.IOException;
+
+final class OakIndexOutput extends IndexOutput {
+    private final String dirDetails;
+    private final OakIndexFile file;
+
+    public OakIndexOutput(String name, NodeBuilder file, String dirDetails,
+                          BlobFactory blobFactory) throws IOException {
+        this.dirDetails = dirDetails;
+        this.file = new OakIndexFile(name, file, dirDetails, blobFactory);
+    }
+
+    @Override
+    public long length() {
+        return file.length();
+    }
+
+    @Override
+    public long getFilePointer() {
+        return file.position();
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+        file.seek(pos);
+    }
+
+    @Override
+    public void writeBytes(byte[] b, int offset, int length)
+            throws IOException {
+        try {
+            file.writeBytes(b, offset, length);
+        } catch (IOException e) {
+            throw wrapWithDetails(e);
+        }
+    }
+
+    @Override
+    public void writeByte(byte b) throws IOException {
+        writeBytes(new byte[] { b }, 0, 1);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        try {
+            file.flush();
+        } catch (IOException e) {
+            throw wrapWithDetails(e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        flush();
+        file.close();
+    }
+
+    private IOException wrapWithDetails(IOException e) {
+        String msg = String.format("Error occurred while writing to blob 
[%s][%s]", dirDetails, file.getName());
+        return new IOException(msg, e);
+    }
+
+}

Propchange: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexOutput.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectory.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectory.java?rev=1807310&r1=1807309&r2=1807310&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectory.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectory.java
 Tue Sep  5 03:56:06 2017
@@ -16,21 +16,11 @@
  */
 package org.apache.jackrabbit.oak.plugins.index.lucene.directory;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Set;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
 import com.google.common.collect.Sets;
-
-import 
org.apache.jackrabbit.oak.plugins.index.lucene.directory.ActiveDeletedBlobCollectorFactory.BlobDeletionCallback;
+import org.apache.jackrabbit.oak.plugins.index.lucene.BlobFactory;
 import org.apache.jackrabbit.oak.plugins.index.lucene.IndexDefinition;
 import org.apache.jackrabbit.oak.plugins.index.lucene.OakDirectory;
-import org.apache.jackrabbit.oak.plugins.index.lucene.OakDirectory.BlobFactory;
+import 
org.apache.jackrabbit.oak.plugins.index.lucene.directory.ActiveDeletedBlobCollectorFactory.BlobDeletionCallback;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.lucene.store.Directory;
@@ -42,6 +32,14 @@ import org.apache.lucene.store.LockFacto
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Set;
+
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.Arrays.asList;
 import static 
org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
@@ -89,8 +87,8 @@ public final class BufferedOakDirectory
                                 @Nullable BlobStore blobStore,
                                 @Nonnull BlobDeletionCallback 
blobDeletionCallback) {
         this.blobFactory = blobStore != null ?
-                new OakDirectory.BlobStoreBlobFactory(blobStore) :
-                new OakDirectory.NodeBuilderBlobFactory(builder);
+                BlobFactory.getBlobStoreBlobFactory(blobStore) :
+                BlobFactory.getNodeBuilderBlobFactory(builder);
         this.blobDeletionCallback = blobDeletionCallback;
         this.dataNodeName = checkNotNull(dataNodeName);
         this.definition = checkNotNull(definition);

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectoryTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectoryTest.java?rev=1807310&r1=1807309&r2=1807310&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectoryTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectoryTest.java
 Tue Sep  5 03:56:06 2017
@@ -104,9 +104,9 @@ public class OakDirectoryTest {
 
     @Test
     public void testCompatibility() throws Exception{
-        builder.setProperty(LuceneIndexConstants.BLOB_SIZE, 
OakDirectory.DEFAULT_BLOB_SIZE);
+        builder.setProperty(LuceneIndexConstants.BLOB_SIZE, 
OakIndexFile.DEFAULT_BLOB_SIZE);
         Directory dir = createDir(builder, false, "/foo");
-        byte[] data = assertWrites(dir, OakDirectory.DEFAULT_BLOB_SIZE);
+        byte[] data = assertWrites(dir, OakIndexFile.DEFAULT_BLOB_SIZE);
 
         NodeBuilder testNode = 
builder.child(INDEX_DATA_CHILD_NAME).child("test");
         //Remove the size property to simulate old behaviour
@@ -473,7 +473,7 @@ public class OakDirectoryTest {
         final AtomicBoolean identifiableBlob = new AtomicBoolean(false);
 
         IndexDefinition def = new IndexDefinition(root, 
builder.getNodeState(), "/foo");
-        OakDirectory.BlobFactory factory = new OakDirectory.BlobFactory() {
+        BlobFactory factory = new BlobFactory() {
             @Override
             public Blob createBlob(InputStream in) throws IOException {
                 ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -521,7 +521,7 @@ public class OakDirectoryTest {
         final AtomicInteger numBlobs = new AtomicInteger();
         final int fileSize = 1024;
         IndexDefinition def = new IndexDefinition(root, 
builder.getNodeState(), "/foo");
-        OakDirectory.BlobFactory factory = new OakDirectory.BlobFactory() {
+        BlobFactory factory = new BlobFactory() {
             @Override
             public Blob createBlob(InputStream in) throws IOException {
                 ByteArrayOutputStream out = new ByteArrayOutputStream();


Reply via email to