Author: catholicon
Date: Sun Oct  1 00:42:02 2017
New Revision: 1810242

URL: http://svn.apache.org/viewvc?rev=1810242&view=rev
Log:
OAK-6269: Support non chunk storage in OakDirectory

Implement OakStreamingIndexFile and changes required along with that.

Added:
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakStreamingIndexFile.java
   (with props)
Modified:
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectory.java
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakBufferedIndexFile.java
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakDirectory.java
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexFile.java
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexInput.java
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexOutput.java

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java?rev=1810242&r1=1810241&r2=1810242&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
 Sun Oct  1 00:42:02 2017
@@ -59,6 +59,7 @@ import org.apache.jackrabbit.oak.plugins
 import 
org.apache.jackrabbit.oak.plugins.index.fulltext.PreExtractedTextProvider;
 import org.apache.jackrabbit.oak.plugins.index.importer.IndexImporterProvider;
 import 
org.apache.jackrabbit.oak.plugins.index.lucene.directory.ActiveDeletedBlobCollectorFactory;
+import 
org.apache.jackrabbit.oak.plugins.index.lucene.directory.BufferedOakDirectory;
 import 
org.apache.jackrabbit.oak.plugins.index.lucene.directory.LuceneIndexImporter;
 import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.DocumentQueue;
 import 
org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.ExternalObserverBuilder;
@@ -254,6 +255,16 @@ public class LuceneIndexProviderService
     final long MIN_BLOB_AGE_TO_ACTIVELY_DELETE = 
Long.getLong("oak.active.deletion.minAge",
             TimeUnit.HOURS.toSeconds(24));
 
+
+    private static final boolean 
PROP_ENABLE_SINGLE_BLOB_PER_INDEX_FILE_DEFAULT = true;
+    @Property(
+            boolValue = PROP_ENABLE_SINGLE_BLOB_PER_INDEX_FILE_DEFAULT,
+            label = "With CoW enabled, should index files by written as single 
blobs",
+            description = "Index files can be written as single blobs as 
chunked into smaller blobs. Enable" +
+                    " this to write single blob per index file. This would 
reduce number of blobs in the data store."
+    )
+    private static final String PROP_NAME_ENABLE_SINGLE_BLOB_PER_INDEX_FILE = 
"enableSingleBlobIndexFiles";
+
     private final Clock clock = Clock.SIMPLE;
 
     private Whiteboard whiteboard;
@@ -334,6 +345,13 @@ public class LuceneIndexProviderService
         configureIndexDefinitionStorage(config);
         configureBooleanClauseLimit(config);
         initializeFactoryClassLoaders(getClass().getClassLoader());
+        if 
(System.getProperty(BufferedOakDirectory.ENABLE_WRITING_SINGLE_BLOB_INDEX_FILE_PARAM)
 == null) {
+            
BufferedOakDirectory.setEnableWritingSingleBlobIndexFile(PropertiesUtil.toBoolean(
+                    config.get(PROP_NAME_ENABLE_SINGLE_BLOB_PER_INDEX_FILE),
+                    PROP_ENABLE_SINGLE_BLOB_PER_INDEX_FILE_DEFAULT));
+        } else {
+            log.info("Not setting config for single blob for an index file as 
it's set by command line!");
+        }
 
         whiteboard = new OsgiWhiteboard(bundleContext);
         threadPoolSize = 
PropertiesUtil.toInteger(config.get(PROP_THREAD_POOL_SIZE), 
PROP_THREAD_POOL_SIZE_DEFAULT);

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectory.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectory.java?rev=1810242&r1=1810241&r2=1810242&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectory.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectory.java
 Sun Oct  1 00:42:02 2017
@@ -48,6 +48,35 @@ import static org.apache.jackrabbit.oak.
  * except for blob values. Those are written immediately to the store.
  */
 public final class BufferedOakDirectory extends Directory {
+    public static final String ENABLE_WRITING_SINGLE_BLOB_INDEX_FILE_PARAM = 
"oak.lucene.enableSingleBlobIndexFiles";
+    private static boolean enableWritingSingleBlobIndexFile = 
Boolean.parseBoolean(
+            System.getProperty(ENABLE_WRITING_SINGLE_BLOB_INDEX_FILE_PARAM, 
"true"));
+    public static void setEnableWritingSingleBlobIndexFile (boolean val) {
+        String cliValStr = 
System.getProperty(ENABLE_WRITING_SINGLE_BLOB_INDEX_FILE_PARAM);
+
+        if (cliValStr != null) {
+            boolean cliVal = Boolean.parseBoolean(cliValStr);
+
+            if (cliVal != val) {
+                LOG.warn("Ignoring configuration {} as CLI param overrides 
with a different value", val);
+                if (cliVal != enableWritingSingleBlobIndexFile) {
+                    enableWritingSingleBlobIndexFile = cliVal;
+                }
+                return;
+            }
+        }
+        enableWritingSingleBlobIndexFile = val;
+    }
+    public static boolean isEnableWritingSingleBlobIndexFile() {
+        return enableWritingSingleBlobIndexFile;
+    }
+    // for test
+    static void reReadCommandLineParam() {
+        String val = 
System.getProperty(ENABLE_WRITING_SINGLE_BLOB_INDEX_FILE_PARAM);
+        if (val != null) {
+            enableWritingSingleBlobIndexFile = Boolean.parseBoolean(val);
+        }
+    }
 
     static final int DELETE_THRESHOLD_UNTIL_REOPEN = 100;
 
@@ -91,7 +120,7 @@ public final class BufferedOakDirectory
         this.dataNodeName = checkNotNull(dataNodeName);
         this.definition = checkNotNull(definition);
         this.base = new OakDirectory(checkNotNull(builder), dataNodeName,
-                definition, false, blobFactory, blobDeletionCallback);
+                definition, false, blobFactory, blobDeletionCallback, 
isEnableWritingSingleBlobIndexFile());
         reopenBuffered();
     }
 
@@ -224,6 +253,6 @@ public final class BufferedOakDirectory
         // those are files that were created and later deleted again
         bufferedBuilder = squeeze(bufferedBuilder.getNodeState()).builder();
         buffered = new OakDirectory(bufferedBuilder, dataNodeName,
-                definition, false, blobFactory, blobDeletionCallback);
+                definition, false, blobFactory, blobDeletionCallback, 
isEnableWritingSingleBlobIndexFile());
     }
 }

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakBufferedIndexFile.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakBufferedIndexFile.java?rev=1810242&r1=1810241&r2=1810242&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakBufferedIndexFile.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakBufferedIndexFile.java
 Sun Oct  1 00:42:02 2017
@@ -23,6 +23,7 @@ import org.apache.jackrabbit.oak.api.Pro
 import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.commons.StringUtils;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.lucene.store.DataInput;
 
 import javax.annotation.Nonnull;
 import java.io.ByteArrayInputStream;
@@ -298,6 +299,16 @@ class OakBufferedIndexFile implements Oa
         }
     }
 
+    @Override
+    public boolean supportsCopyFromDataInput() {
+        return false;
+    }
+
+    @Override
+    public void copyBytes(DataInput input, long numBytes) throws IOException {
+        throw new IllegalArgumentException("Don't call copyBytes for buffered 
case");
+    }
+
     private static int determineBlobSize(NodeBuilder file){
         if (file.hasProperty(OakDirectory.PROP_BLOB_SIZE)){
             return 
Ints.checkedCast(file.getProperty(OakDirectory.PROP_BLOB_SIZE).getValue(Type.LONG));

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakDirectory.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakDirectory.java?rev=1810242&r1=1810241&r2=1810242&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakDirectory.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakDirectory.java
 Sun Oct  1 00:42:02 2017
@@ -73,6 +73,7 @@ public class OakDirectory extends Direct
     private final IndexDefinition definition;
     private LockFactory lockFactory;
     private final boolean readOnly;
+    private final boolean streamingWriteEnabled;
     private final Set<String> fileNames = Sets.newConcurrentHashSet();
     private final Set<String> fileNamesAtStart;
     private final String indexName;
@@ -109,6 +110,14 @@ public class OakDirectory extends Direct
     public OakDirectory(NodeBuilder builder, String dataNodeName, 
IndexDefinition definition,
                         boolean readOnly, BlobFactory blobFactory,
                         @Nonnull BlobDeletionCallback blobDeletionCallback) {
+        this(builder, dataNodeName, definition, readOnly, blobFactory, 
blobDeletionCallback, false);
+    }
+
+    public OakDirectory(NodeBuilder builder, String dataNodeName, 
IndexDefinition definition,
+                        boolean readOnly, BlobFactory blobFactory,
+                        @Nonnull BlobDeletionCallback blobDeletionCallback,
+                        boolean streamingWriteEnabled) {
+
         this.lockFactory = NoLockFactory.getNoLockFactory();
         this.builder = builder;
         this.dataNodeName = dataNodeName;
@@ -120,6 +129,7 @@ public class OakDirectory extends Direct
         this.indexName = definition.getIndexName();
         this.blobFactory = blobFactory;
         this.blobDeletionCallback = blobDeletionCallback;
+        this.streamingWriteEnabled = streamingWriteEnabled;
     }
 
     @Override
@@ -191,7 +201,7 @@ public class OakDirectory extends Direct
 
         fileNames.add(name);
         markDirty();
-        return new OakIndexOutput(name, file, indexName, blobFactory);
+        return new OakIndexOutput(name, file, indexName, blobFactory, 
streamingWriteEnabled);
     }
 
 

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexFile.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexFile.java?rev=1810242&r1=1810241&r2=1810242&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexFile.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexFile.java
 Sun Oct  1 00:42:02 2017
@@ -1,8 +1,42 @@
 package org.apache.jackrabbit.oak.plugins.index.lucene.directory;
 
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.lucene.store.DataInput;
+
+import javax.annotation.Nonnull;
 import java.io.IOException;
 
+import static org.apache.jackrabbit.JcrConstants.JCR_DATA;
+import static org.apache.jackrabbit.oak.api.Type.BINARY;
+
 public interface OakIndexFile {
+    static OakIndexFile getOakIndexFile(String name, NodeBuilder file, String 
dirDetails,
+                                        @Nonnull BlobFactory blobFactory) {
+        return getOakIndexFile(name, file, dirDetails, blobFactory, false);
+    }
+
+    static OakIndexFile getOakIndexFile(String name, NodeBuilder file, String 
dirDetails,
+                                        @Nonnull BlobFactory blobFactory, 
boolean streamingWriteEnabled) {
+
+        boolean useStreaming;
+        PropertyState property = file.getProperty(JCR_DATA);
+        if (property != null) { //reading
+                useStreaming = property.getType() == BINARY;
+        } else { //writing
+            useStreaming = streamingWriteEnabled;
+        }
+
+        return useStreaming ?
+                new OakStreamingIndexFile(name, file, dirDetails, blobFactory) 
:
+                new OakBufferedIndexFile(name, file, dirDetails, blobFactory);
+    }
+
+    /**
+     * @return if the file implementation supports copying data from {@link 
DataInput} directly.
+     */
+    boolean supportsCopyFromDataInput();
+
     /**
      * @return name of the index being accessed
      */
@@ -60,6 +94,10 @@ public interface OakIndexFile {
     void writeBytes(byte[] b, int offset, int len)
             throws IOException;
 
+    /** Copy numBytes bytes from input to ourself. */
+    void copyBytes(DataInput input, long numBytes) throws IOException;
+
+
     /**
      * Flushes the content into storage. Before calling this method, written
      * content only exist in memory

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexInput.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexInput.java?rev=1810242&r1=1810241&r2=1810242&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexInput.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexInput.java
 Sun Oct  1 00:42:02 2017
@@ -24,9 +24,11 @@ import org.apache.lucene.util.WeakIdenti
 import java.io.IOException;
 import java.util.Iterator;
 
+import static 
org.apache.jackrabbit.oak.plugins.index.lucene.directory.OakIndexFile.getOakIndexFile;
+
 class OakIndexInput extends IndexInput {
 
-    private final OakIndexFile file;
+    final OakIndexFile file;
     private boolean isClone = false;
     private final WeakIdentityMap<OakIndexInput, Boolean> clones;
     private final String dirDetails;
@@ -35,7 +37,7 @@ class OakIndexInput extends IndexInput {
                          BlobFactory blobFactory) {
         super(name);
         this.dirDetails = dirDetails;
-        this.file = new OakBufferedIndexFile(name, file, dirDetails, 
blobFactory);
+        this.file = getOakIndexFile(name, file, dirDetails, blobFactory);
         clones = WeakIdentityMap.newConcurrentHashMap();
     }
 

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexOutput.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexOutput.java?rev=1810242&r1=1810241&r2=1810242&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexOutput.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexOutput.java
 Sun Oct  1 00:42:02 2017
@@ -17,18 +17,21 @@
 package org.apache.jackrabbit.oak.plugins.index.lucene.directory;
 
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.lucene.store.DataInput;
 import org.apache.lucene.store.IndexOutput;
 
 import java.io.IOException;
 
+import static 
org.apache.jackrabbit.oak.plugins.index.lucene.directory.OakIndexFile.getOakIndexFile;
+
 final class OakIndexOutput extends IndexOutput {
     private final String dirDetails;
-    private final OakIndexFile file;
+    final OakIndexFile file;
 
     public OakIndexOutput(String name, NodeBuilder file, String dirDetails,
-                          BlobFactory blobFactory) throws IOException {
+                          BlobFactory blobFactory, boolean 
streamingWriteEnabled) throws IOException {
         this.dirDetails = dirDetails;
-        this.file = new OakBufferedIndexFile(name, file, dirDetails, 
blobFactory);
+        this.file = getOakIndexFile(name, file, dirDetails, blobFactory, 
streamingWriteEnabled);
     }
 
     @Override
@@ -62,6 +65,16 @@ final class OakIndexOutput extends Index
     }
 
     @Override
+    public void copyBytes(DataInput input, long numBytes) throws IOException {
+        //TODO: Do we know that copyBytes would always reach us via copy??
+        if (file.supportsCopyFromDataInput()) {
+            file.copyBytes(input, numBytes);
+        } else {
+            super.copyBytes(input, numBytes);
+        }
+    }
+
+    @Override
     public void flush() throws IOException {
         try {
             file.flush();

Added: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakStreamingIndexFile.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakStreamingIndexFile.java?rev=1810242&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakStreamingIndexFile.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakStreamingIndexFile.java
 Sun Oct  1 00:42:02 2017
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.lucene.directory;
+
+import com.google.common.io.ByteStreams;
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.commons.StringUtils;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.lucene.store.DataInput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkPositionIndexes;
+import static org.apache.jackrabbit.JcrConstants.JCR_DATA;
+import static org.apache.jackrabbit.JcrConstants.JCR_LASTMODIFIED;
+import static org.apache.jackrabbit.oak.api.Type.BINARY;
+
+/**
+ * A file which streams blob directly off of storage.
+ */
+class OakStreamingIndexFile implements OakIndexFile, AutoCloseable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(OakStreamingIndexFile.class.getName());
+
+    /**
+     * The file name.
+     */
+    private final String name;
+
+    /**
+     * The node that contains the blob for this file.
+     */
+    private final NodeBuilder file;
+
+    /**
+     * The current position within the file (in streaming case, useful only 
for reading).
+     */
+    private long position = 0;
+
+    /**
+     * The length of the file.
+     */
+    private long length;
+
+    /**
+     * The blob which has been read for reading case.
+     * For writing case, it contains the blob that's pushed to repository
+     */
+    private Blob blob;
+
+    /**
+     * Whether the blob was modified since it was last flushed. If yes, on a
+     * flush the metadata and the blob to the store.
+     */
+    private boolean blobModified = false;
+
+    /**
+     * The {@link InputStream} to read blob from blob.
+     */
+    private InputStream blobInputStream;
+
+    /**
+     * The unique key that is used to make the content unique (to allow 
removing binaries from the blob store without
+     * risking to remove binaries that are still needed).
+     */
+    private final byte[] uniqueKey;
+
+    private final String dirDetails;
+
+    private final BlobFactory blobFactory;
+
+    OakStreamingIndexFile(String name, NodeBuilder file, String dirDetails,
+                                 @Nonnull BlobFactory blobFactory) {
+        this.name = name;
+        this.file = file;
+        this.dirDetails = dirDetails;
+        this.uniqueKey = readUniqueKey(file);
+        this.blobFactory = checkNotNull(blobFactory);
+
+        PropertyState property = file.getProperty(JCR_DATA);
+        if (property != null) {
+            if (property.getType() == BINARY) {
+                this.blob = property.getValue(BINARY);
+            } else {
+                throw new IllegalArgumentException("Can't load blob for 
streaming for " + name + " under " + file);
+            }
+        } else {
+            this.blob = null;
+        }
+
+        if (blob != null) {
+            this.length = blob.length();
+            if (uniqueKey != null) {
+                this.length -= uniqueKey.length;
+            }
+        }
+
+        this.blobInputStream = null;
+    }
+
+    private OakStreamingIndexFile(OakStreamingIndexFile that) {
+        this.name = that.name;
+        this.file = that.file;
+        this.dirDetails = that.dirDetails;
+        this.uniqueKey = that.uniqueKey;
+
+        this.position = that.position;
+        this.length = that.length;
+        this.blob = that.blob;
+        this.blobModified = that.blobModified;
+        this.blobFactory = that.blobFactory;
+    }
+
+    private void setupInputStream() throws IOException {
+        if (blobInputStream == null) {
+            blobInputStream = blob.getNewStream();
+
+            if (position > 0) {
+                long pos = position;
+                position = 0;
+                seek(pos);
+            }
+        }
+    }
+
+    private void releaseInputStream() {
+        if (blobInputStream != null) {
+            try {
+                blobInputStream.close();
+            } catch (Exception ignored) {
+                //ignore
+            }
+            blobInputStream = null;
+        }
+    }
+
+    @Override
+    public OakIndexFile clone() {
+        return new OakStreamingIndexFile(this);
+    }
+
+    @Override
+    public long length() {
+        return length;
+    }
+
+    @Override
+    public long position() {
+        return position;
+    }
+
+    @Override
+    public void close() {
+        IOUtils.closeQuietly(blobInputStream);
+        this.blob = null;
+    }
+
+    @Override
+    public boolean isClosed() {
+        return blobInputStream == null && blob == null;
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+        // seek() may be called with pos == length
+        // see https://issues.apache.org/jira/browse/LUCENE-1196
+        if (pos < 0 || pos > length) {
+            String msg = String.format("Invalid seek request for [%s][%s], " +
+                    "position: %d, file length: %d", dirDetails, name, pos, 
length);
+            releaseInputStream();
+            throw new IOException(msg);
+        } else {
+            if (blobInputStream == null) {
+                position = pos;
+            } else if (pos < position) {
+                LOG.warn("Seeking back on streaming index file {}. Current 
position {}, requested position {}." +
+                                "Please make sure that CopyOnRead and prefetch 
of index files are enabled.",
+                        getName(), position(), pos);
+
+                // seeking back on input stream. Close current one
+                IOUtils.closeQuietly(blobInputStream);
+                blobInputStream = null;
+                position = pos;
+            } else {
+                while (position < pos) {
+                    long skipCnt = blobInputStream.skip(pos - position());
+                    if (skipCnt <= 0) {
+                        String msg = String.format("Seek request for [%s][%s], 
" +
+                                "position: %d, file length: %d failed. 
InputStream.skip returned %d",
+                                dirDetails, name, pos, length, skipCnt);
+                        releaseInputStream();
+                        throw new IOException(msg);
+                    }
+                    position += skipCnt;
+                }
+            }
+        }
+    }
+
+    @Override
+    public void readBytes(byte[] b, int offset, int len)
+            throws IOException {
+        checkPositionIndexes(offset, offset + len, checkNotNull(b).length);
+
+        if (len < 0 || position + len > length) {
+            String msg = String.format("Invalid byte range request for 
[%s][%s], " +
+                    "position: %d, file length: %d, len: %d", dirDetails, 
name, position, length, len);
+            releaseInputStream();
+            throw new IOException(msg);
+        }
+
+        setupInputStream();
+        int readCnt = ByteStreams.read(blobInputStream, b, offset, len);
+        if (readCnt < len) {
+            String msg = String.format("Couldn't read byte range request for 
[%s][%s], " +
+                    "position: %d, file length: %d, len: %d. Actual read bytes 
%d",
+                    dirDetails, name, position, length, len, readCnt);
+            releaseInputStream();
+            throw new IOException(msg);
+        }
+
+        position += len;
+    }
+
+    @Override
+    public void writeBytes(final byte[] b, final int offset, final int len)
+            throws IOException {
+        if (blobModified) {
+            throw new IllegalArgumentException("Can't do piece wise upload 
with streaming access");
+        }
+
+        InputStream in = new InputStream() {
+            int position = offset;
+
+            @Override
+            public int available() throws IOException {
+                return offset + len - position;
+            }
+
+            @Override
+            public int read() throws IOException {
+                if (available() <= 0) {
+                    return -1;
+                } else {
+                    int ret = b[position++];
+                    return ret < 0 ? 256 + ret: ret;
+                }
+            }
+
+            @Override
+            public int read(@Nonnull byte[] target, int off, int len) throws 
IOException {
+                if (available() <= 0) {
+                    return -1;
+                }
+
+                int read = (int)Math.min((long)len, available());
+                System.arraycopy(b, position, target, off, read);
+
+                position += read;
+
+                return read;
+            }
+        };
+
+        pushData(in);
+    }
+
+    @Override
+    public boolean supportsCopyFromDataInput() {
+        return true;
+    }
+
+    @Override
+    public void copyBytes(DataInput input, final long numBytes) throws 
IOException {
+        InputStream in = new InputStream() {
+            long bytesLeftToRead = numBytes;
+
+            @Override
+            public int read() throws IOException {
+                if (bytesLeftToRead <= 0) {
+                    return -1;
+                } else {
+                    bytesLeftToRead--;
+                    int ret = input.readByte();
+                    return ret < 0 ? 256 + ret: ret;
+                }
+            }
+
+            @Override
+            public int read(@Nonnull byte[] b, int off, int len) throws 
IOException {
+                if (bytesLeftToRead == 0) {
+                    return -1;
+                }
+
+                int read = (int)Math.min((long)len, bytesLeftToRead);
+                input.readBytes(b, off, read);
+
+                bytesLeftToRead -= read;
+
+                return read;
+            }
+        };
+
+        pushData(in);
+    }
+
+    private void pushData(InputStream in) throws IOException {
+        if (uniqueKey != null) {
+            in = new SequenceInputStream(in,
+                    new ByteArrayInputStream(uniqueKey));
+        }
+
+        blob = blobFactory.createBlob(in);
+        blobModified = true;
+    }
+
+    private static byte[] readUniqueKey(NodeBuilder file) {
+        if (file.hasProperty(OakDirectory.PROP_UNIQUE_KEY)) {
+            String key = file.getString(OakDirectory.PROP_UNIQUE_KEY);
+            return StringUtils.convertHexToBytes(key);
+        }
+        return null;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        if (blobModified) {
+            file.setProperty(JCR_LASTMODIFIED, System.currentTimeMillis());
+            file.setProperty(JCR_DATA, blob, BINARY);
+            blobModified = false;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return name;
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+}

Propchange: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakStreamingIndexFile.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to