Author: catholicon
Date: Sun Oct 1 00:42:02 2017
New Revision: 1810242
URL: http://svn.apache.org/viewvc?rev=1810242&view=rev
Log:
OAK-6269: Support non chunk storage in OakDirectory
Implment OakStreamingIndexFile and changes required along with that.
Added:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakStreamingIndexFile.java
(with props)
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectory.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakBufferedIndexFile.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakDirectory.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexFile.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexInput.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexOutput.java
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java?rev=1810242&r1=1810241&r2=1810242&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
Sun Oct 1 00:42:02 2017
@@ -59,6 +59,7 @@ import org.apache.jackrabbit.oak.plugins
import
org.apache.jackrabbit.oak.plugins.index.fulltext.PreExtractedTextProvider;
import org.apache.jackrabbit.oak.plugins.index.importer.IndexImporterProvider;
import
org.apache.jackrabbit.oak.plugins.index.lucene.directory.ActiveDeletedBlobCollectorFactory;
+import
org.apache.jackrabbit.oak.plugins.index.lucene.directory.BufferedOakDirectory;
import
org.apache.jackrabbit.oak.plugins.index.lucene.directory.LuceneIndexImporter;
import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.DocumentQueue;
import
org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.ExternalObserverBuilder;
@@ -254,6 +255,16 @@ public class LuceneIndexProviderService
final long MIN_BLOB_AGE_TO_ACTIVELY_DELETE =
Long.getLong("oak.active.deletion.minAge",
TimeUnit.HOURS.toSeconds(24));
+
+ private static final boolean
PROP_ENABLE_SINGLE_BLOB_PER_INDEX_FILE_DEFAULT = true;
+ @Property(
+ boolValue = PROP_ENABLE_SINGLE_BLOB_PER_INDEX_FILE_DEFAULT,
+ label = "With CoW enabled, should index files by written as single
blobs",
+ description = "Index files can be written as single blobs as
chunked into smaller blobs. Enable" +
+ " this to write single blob per index file. This would
reduce number of blobs in the data store."
+ )
+ private static final String PROP_NAME_ENABLE_SINGLE_BLOB_PER_INDEX_FILE =
"enableSingleBlobIndexFiles";
+
private final Clock clock = Clock.SIMPLE;
private Whiteboard whiteboard;
@@ -334,6 +345,13 @@ public class LuceneIndexProviderService
configureIndexDefinitionStorage(config);
configureBooleanClauseLimit(config);
initializeFactoryClassLoaders(getClass().getClassLoader());
+ if
(System.getProperty(BufferedOakDirectory.ENABLE_WRITING_SINGLE_BLOB_INDEX_FILE_PARAM)
== null) {
+
BufferedOakDirectory.setEnableWritingSingleBlobIndexFile(PropertiesUtil.toBoolean(
+ config.get(PROP_NAME_ENABLE_SINGLE_BLOB_PER_INDEX_FILE),
+ PROP_ENABLE_SINGLE_BLOB_PER_INDEX_FILE_DEFAULT));
+ } else {
+ log.info("Not setting config for single blob for an index file as
it's set by command line!");
+ }
whiteboard = new OsgiWhiteboard(bundleContext);
threadPoolSize =
PropertiesUtil.toInteger(config.get(PROP_THREAD_POOL_SIZE),
PROP_THREAD_POOL_SIZE_DEFAULT);
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectory.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectory.java?rev=1810242&r1=1810241&r2=1810242&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectory.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectory.java
Sun Oct 1 00:42:02 2017
@@ -48,6 +48,35 @@ import static org.apache.jackrabbit.oak.
* except for blob values. Those are written immediately to the store.
*/
public final class BufferedOakDirectory extends Directory {
+ public static final String ENABLE_WRITING_SINGLE_BLOB_INDEX_FILE_PARAM =
"oak.lucene.enableSingleBlobIndexFiles";
+ private static boolean enableWritingSingleBlobIndexFile =
Boolean.parseBoolean(
+ System.getProperty(ENABLE_WRITING_SINGLE_BLOB_INDEX_FILE_PARAM,
"true"));
+ public static void setEnableWritingSingleBlobIndexFile (boolean val) {
+ String cliValStr =
System.getProperty(ENABLE_WRITING_SINGLE_BLOB_INDEX_FILE_PARAM);
+
+ if (cliValStr != null) {
+ boolean cliVal = Boolean.parseBoolean(cliValStr);
+
+ if (cliVal != val) {
+ LOG.warn("Ignoring configuration {} as CLI param overrides
with a different value", val);
+ if (cliVal != enableWritingSingleBlobIndexFile) {
+ enableWritingSingleBlobIndexFile = cliVal;
+ }
+ return;
+ }
+ }
+ enableWritingSingleBlobIndexFile = val;
+ }
+ public static boolean isEnableWritingSingleBlobIndexFile() {
+ return enableWritingSingleBlobIndexFile;
+ }
+ // for test
+ static void reReadCommandLineParam() {
+ String val =
System.getProperty(ENABLE_WRITING_SINGLE_BLOB_INDEX_FILE_PARAM);
+ if (val != null) {
+ enableWritingSingleBlobIndexFile = Boolean.parseBoolean(val);
+ }
+ }
static final int DELETE_THRESHOLD_UNTIL_REOPEN = 100;
@@ -91,7 +120,7 @@ public final class BufferedOakDirectory
this.dataNodeName = checkNotNull(dataNodeName);
this.definition = checkNotNull(definition);
this.base = new OakDirectory(checkNotNull(builder), dataNodeName,
- definition, false, blobFactory, blobDeletionCallback);
+ definition, false, blobFactory, blobDeletionCallback,
isEnableWritingSingleBlobIndexFile());
reopenBuffered();
}
@@ -224,6 +253,6 @@ public final class BufferedOakDirectory
// those are files that were created and later deleted again
bufferedBuilder = squeeze(bufferedBuilder.getNodeState()).builder();
buffered = new OakDirectory(bufferedBuilder, dataNodeName,
- definition, false, blobFactory, blobDeletionCallback);
+ definition, false, blobFactory, blobDeletionCallback,
isEnableWritingSingleBlobIndexFile());
}
}
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakBufferedIndexFile.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakBufferedIndexFile.java?rev=1810242&r1=1810241&r2=1810242&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakBufferedIndexFile.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakBufferedIndexFile.java
Sun Oct 1 00:42:02 2017
@@ -23,6 +23,7 @@ import org.apache.jackrabbit.oak.api.Pro
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.commons.StringUtils;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.lucene.store.DataInput;
import javax.annotation.Nonnull;
import java.io.ByteArrayInputStream;
@@ -298,6 +299,16 @@ class OakBufferedIndexFile implements Oa
}
}
+ @Override
+ public boolean supportsCopyFromDataInput() {
+ return false;
+ }
+
+ @Override
+ public void copyBytes(DataInput input, long numBytes) throws IOException {
+ throw new IllegalArgumentException("Don't call copyBytes for buffered
case");
+ }
+
private static int determineBlobSize(NodeBuilder file){
if (file.hasProperty(OakDirectory.PROP_BLOB_SIZE)){
return
Ints.checkedCast(file.getProperty(OakDirectory.PROP_BLOB_SIZE).getValue(Type.LONG));
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakDirectory.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakDirectory.java?rev=1810242&r1=1810241&r2=1810242&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakDirectory.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakDirectory.java
Sun Oct 1 00:42:02 2017
@@ -73,6 +73,7 @@ public class OakDirectory extends Direct
private final IndexDefinition definition;
private LockFactory lockFactory;
private final boolean readOnly;
+ private final boolean streamingWriteEnabled;
private final Set<String> fileNames = Sets.newConcurrentHashSet();
private final Set<String> fileNamesAtStart;
private final String indexName;
@@ -109,6 +110,14 @@ public class OakDirectory extends Direct
public OakDirectory(NodeBuilder builder, String dataNodeName,
IndexDefinition definition,
boolean readOnly, BlobFactory blobFactory,
@Nonnull BlobDeletionCallback blobDeletionCallback) {
+ this(builder, dataNodeName, definition, readOnly, blobFactory,
blobDeletionCallback, false);
+ }
+
+ public OakDirectory(NodeBuilder builder, String dataNodeName,
IndexDefinition definition,
+ boolean readOnly, BlobFactory blobFactory,
+ @Nonnull BlobDeletionCallback blobDeletionCallback,
+ boolean streamingWriteEnabled) {
+
this.lockFactory = NoLockFactory.getNoLockFactory();
this.builder = builder;
this.dataNodeName = dataNodeName;
@@ -120,6 +129,7 @@ public class OakDirectory extends Direct
this.indexName = definition.getIndexName();
this.blobFactory = blobFactory;
this.blobDeletionCallback = blobDeletionCallback;
+ this.streamingWriteEnabled = streamingWriteEnabled;
}
@Override
@@ -191,7 +201,7 @@ public class OakDirectory extends Direct
fileNames.add(name);
markDirty();
- return new OakIndexOutput(name, file, indexName, blobFactory);
+ return new OakIndexOutput(name, file, indexName, blobFactory,
streamingWriteEnabled);
}
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexFile.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexFile.java?rev=1810242&r1=1810241&r2=1810242&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexFile.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexFile.java
Sun Oct 1 00:42:02 2017
@@ -1,8 +1,42 @@
package org.apache.jackrabbit.oak.plugins.index.lucene.directory;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.lucene.store.DataInput;
+
+import javax.annotation.Nonnull;
import java.io.IOException;
+import static org.apache.jackrabbit.JcrConstants.JCR_DATA;
+import static org.apache.jackrabbit.oak.api.Type.BINARY;
+
public interface OakIndexFile {
+ static OakIndexFile getOakIndexFile(String name, NodeBuilder file, String
dirDetails,
+ @Nonnull BlobFactory blobFactory) {
+ return getOakIndexFile(name, file, dirDetails, blobFactory, false);
+ }
+
+ static OakIndexFile getOakIndexFile(String name, NodeBuilder file, String
dirDetails,
+ @Nonnull BlobFactory blobFactory,
boolean streamingWriteEnabled) {
+
+ boolean useStreaming;
+ PropertyState property = file.getProperty(JCR_DATA);
+ if (property != null) { //reading
+ useStreaming = property.getType() == BINARY;
+ } else { //writing
+ useStreaming = streamingWriteEnabled;
+ }
+
+ return useStreaming ?
+ new OakStreamingIndexFile(name, file, dirDetails, blobFactory)
:
+ new OakBufferedIndexFile(name, file, dirDetails, blobFactory);
+ }
+
+ /**
+ * @return if the file implementation supports copying data from {@link
DataInput} directly.
+ */
+ boolean supportsCopyFromDataInput();
+
/**
* @return name of the index being accessed
*/
@@ -60,6 +94,10 @@ public interface OakIndexFile {
void writeBytes(byte[] b, int offset, int len)
throws IOException;
+ /** Copy numBytes bytes from input to ourself. */
+ void copyBytes(DataInput input, long numBytes) throws IOException;
+
+
/**
* Flushes the content into storage. Before calling this method, written
* content only exist in memory
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexInput.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexInput.java?rev=1810242&r1=1810241&r2=1810242&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexInput.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexInput.java
Sun Oct 1 00:42:02 2017
@@ -24,9 +24,11 @@ import org.apache.lucene.util.WeakIdenti
import java.io.IOException;
import java.util.Iterator;
+import static
org.apache.jackrabbit.oak.plugins.index.lucene.directory.OakIndexFile.getOakIndexFile;
+
class OakIndexInput extends IndexInput {
- private final OakIndexFile file;
+ final OakIndexFile file;
private boolean isClone = false;
private final WeakIdentityMap<OakIndexInput, Boolean> clones;
private final String dirDetails;
@@ -35,7 +37,7 @@ class OakIndexInput extends IndexInput {
BlobFactory blobFactory) {
super(name);
this.dirDetails = dirDetails;
- this.file = new OakBufferedIndexFile(name, file, dirDetails,
blobFactory);
+ this.file = getOakIndexFile(name, file, dirDetails, blobFactory);
clones = WeakIdentityMap.newConcurrentHashMap();
}
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexOutput.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexOutput.java?rev=1810242&r1=1810241&r2=1810242&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexOutput.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexOutput.java
Sun Oct 1 00:42:02 2017
@@ -17,18 +17,21 @@
package org.apache.jackrabbit.oak.plugins.index.lucene.directory;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.IndexOutput;
import java.io.IOException;
+import static
org.apache.jackrabbit.oak.plugins.index.lucene.directory.OakIndexFile.getOakIndexFile;
+
final class OakIndexOutput extends IndexOutput {
private final String dirDetails;
- private final OakIndexFile file;
+ final OakIndexFile file;
public OakIndexOutput(String name, NodeBuilder file, String dirDetails,
- BlobFactory blobFactory) throws IOException {
+ BlobFactory blobFactory, boolean
streamingWriteEnabled) throws IOException {
this.dirDetails = dirDetails;
- this.file = new OakBufferedIndexFile(name, file, dirDetails,
blobFactory);
+ this.file = getOakIndexFile(name, file, dirDetails, blobFactory,
streamingWriteEnabled);
}
@Override
@@ -62,6 +65,16 @@ final class OakIndexOutput extends Index
}
@Override
+ public void copyBytes(DataInput input, long numBytes) throws IOException {
+ //TODO: Do we know that copyBytes would always reach us via copy??
+ if (file.supportsCopyFromDataInput()) {
+ file.copyBytes(input, numBytes);
+ } else {
+ super.copyBytes(input, numBytes);
+ }
+ }
+
+ @Override
public void flush() throws IOException {
try {
file.flush();
Added:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakStreamingIndexFile.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakStreamingIndexFile.java?rev=1810242&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakStreamingIndexFile.java
(added)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakStreamingIndexFile.java
Sun Oct 1 00:42:02 2017
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.lucene.directory;
+
+import com.google.common.io.ByteStreams;
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.commons.StringUtils;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.lucene.store.DataInput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkPositionIndexes;
+import static org.apache.jackrabbit.JcrConstants.JCR_DATA;
+import static org.apache.jackrabbit.JcrConstants.JCR_LASTMODIFIED;
+import static org.apache.jackrabbit.oak.api.Type.BINARY;
+
+/**
+ * A file which streams blob directly off of storage.
+ */
+class OakStreamingIndexFile implements OakIndexFile, AutoCloseable {
+ private static final Logger LOG =
LoggerFactory.getLogger(OakStreamingIndexFile.class.getName());
+
+ /**
+ * The file name.
+ */
+ private final String name;
+
+ /**
+ * The node that contains the blob for this file.
+ */
+ private final NodeBuilder file;
+
+ /**
+ * The current position within the file (in streaming case, useful only
for reading).
+ */
+ private long position = 0;
+
+ /**
+ * The length of the file.
+ */
+ private long length;
+
+ /**
+ * The blob which has been read for reading case.
+ * For writing case, it contains the blob that's pushed to repository
+ */
+ private Blob blob;
+
+ /**
+ * Whether the blob was modified since it was last flushed. If yes, on a
+ * flush the metadata and the blob to the store.
+ */
+ private boolean blobModified = false;
+
+ /**
+ * The {@link InputStream} to read blob from blob.
+ */
+ private InputStream blobInputStream;
+
+ /**
+ * The unique key that is used to make the content unique (to allow
removing binaries from the blob store without
+ * risking to remove binaries that are still needed).
+ */
+ private final byte[] uniqueKey;
+
+ private final String dirDetails;
+
+ private final BlobFactory blobFactory;
+
+ OakStreamingIndexFile(String name, NodeBuilder file, String dirDetails,
+ @Nonnull BlobFactory blobFactory) {
+ this.name = name;
+ this.file = file;
+ this.dirDetails = dirDetails;
+ this.uniqueKey = readUniqueKey(file);
+ this.blobFactory = checkNotNull(blobFactory);
+
+ PropertyState property = file.getProperty(JCR_DATA);
+ if (property != null) {
+ if (property.getType() == BINARY) {
+ this.blob = property.getValue(BINARY);
+ } else {
+ throw new IllegalArgumentException("Can't load blob for
streaming for " + name + " under " + file);
+ }
+ } else {
+ this.blob = null;
+ }
+
+ if (blob != null) {
+ this.length = blob.length();
+ if (uniqueKey != null) {
+ this.length -= uniqueKey.length;
+ }
+ }
+
+ this.blobInputStream = null;
+ }
+
+ private OakStreamingIndexFile(OakStreamingIndexFile that) {
+ this.name = that.name;
+ this.file = that.file;
+ this.dirDetails = that.dirDetails;
+ this.uniqueKey = that.uniqueKey;
+
+ this.position = that.position;
+ this.length = that.length;
+ this.blob = that.blob;
+ this.blobModified = that.blobModified;
+ this.blobFactory = that.blobFactory;
+ }
+
+ private void setupInputStream() throws IOException {
+ if (blobInputStream == null) {
+ blobInputStream = blob.getNewStream();
+
+ if (position > 0) {
+ long pos = position;
+ position = 0;
+ seek(pos);
+ }
+ }
+ }
+
+ private void releaseInputStream() {
+ if (blobInputStream != null) {
+ try {
+ blobInputStream.close();
+ } catch (Exception ignored) {
+ //ignore
+ }
+ blobInputStream = null;
+ }
+ }
+
+ @Override
+ public OakIndexFile clone() {
+ return new OakStreamingIndexFile(this);
+ }
+
+ @Override
+ public long length() {
+ return length;
+ }
+
+ @Override
+ public long position() {
+ return position;
+ }
+
+ @Override
+ public void close() {
+ IOUtils.closeQuietly(blobInputStream);
+ this.blob = null;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return blobInputStream == null && blob == null;
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ // seek() may be called with pos == length
+ // see https://issues.apache.org/jira/browse/LUCENE-1196
+ if (pos < 0 || pos > length) {
+ String msg = String.format("Invalid seek request for [%s][%s], " +
+ "position: %d, file length: %d", dirDetails, name, pos,
length);
+ releaseInputStream();
+ throw new IOException(msg);
+ } else {
+ if (blobInputStream == null) {
+ position = pos;
+ } else if (pos < position) {
+ LOG.warn("Seeking back on streaming index file {}. Current
position {}, requested position {}." +
+ "Please make sure that CopyOnRead and prefetch
of index files are enabled.",
+ getName(), position(), pos);
+
+ // seeking back on input stream. Close current one
+ IOUtils.closeQuietly(blobInputStream);
+ blobInputStream = null;
+ position = pos;
+ } else {
+ while (position < pos) {
+ long skipCnt = blobInputStream.skip(pos - position());
+ if (skipCnt <= 0) {
+ String msg = String.format("Seek request for [%s][%s],
" +
+ "position: %d, file length: %d failed.
InputStream.skip returned %d",
+ dirDetails, name, pos, length, skipCnt);
+ releaseInputStream();
+ throw new IOException(msg);
+ }
+ position += skipCnt;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void readBytes(byte[] b, int offset, int len)
+ throws IOException {
+ checkPositionIndexes(offset, offset + len, checkNotNull(b).length);
+
+ if (len < 0 || position + len > length) {
+ String msg = String.format("Invalid byte range request for
[%s][%s], " +
+ "position: %d, file length: %d, len: %d", dirDetails,
name, position, length, len);
+ releaseInputStream();
+ throw new IOException(msg);
+ }
+
+ setupInputStream();
+ int readCnt = ByteStreams.read(blobInputStream, b, offset, len);
+ if (readCnt < len) {
+ String msg = String.format("Couldn't read byte range request for
[%s][%s], " +
+ "position: %d, file length: %d, len: %d. Actual read bytes
%d",
+ dirDetails, name, position, length, len, readCnt);
+ releaseInputStream();
+ throw new IOException(msg);
+ }
+
+ position += len;
+ }
+
+ @Override
+ public void writeBytes(final byte[] b, final int offset, final int len)
+ throws IOException {
+ if (blobModified) {
+ throw new IllegalArgumentException("Can't do piece wise upload
with streaming access");
+ }
+
+ InputStream in = new InputStream() {
+ int position = offset;
+
+ @Override
+ public int available() throws IOException {
+ return offset + len - position;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (available() <= 0) {
+ return -1;
+ } else {
+ int ret = b[position++];
+ return ret < 0 ? 256 + ret: ret;
+ }
+ }
+
+ @Override
+ public int read(@Nonnull byte[] target, int off, int len) throws
IOException {
+ if (available() <= 0) {
+ return -1;
+ }
+
+ int read = (int)Math.min((long)len, available());
+ System.arraycopy(b, position, target, off, read);
+
+ position += read;
+
+ return read;
+ }
+ };
+
+ pushData(in);
+ }
+
+ @Override
+ public boolean supportsCopyFromDataInput() {
+ return true;
+ }
+
+ @Override
+ public void copyBytes(DataInput input, final long numBytes) throws
IOException {
+ InputStream in = new InputStream() {
+ long bytesLeftToRead = numBytes;
+
+ @Override
+ public int read() throws IOException {
+ if (bytesLeftToRead <= 0) {
+ return -1;
+ } else {
+ bytesLeftToRead--;
+ int ret = input.readByte();
+ return ret < 0 ? 256 + ret: ret;
+ }
+ }
+
+ @Override
+ public int read(@Nonnull byte[] b, int off, int len) throws
IOException {
+ if (bytesLeftToRead == 0) {
+ return -1;
+ }
+
+ int read = (int)Math.min((long)len, bytesLeftToRead);
+ input.readBytes(b, off, read);
+
+ bytesLeftToRead -= read;
+
+ return read;
+ }
+ };
+
+ pushData(in);
+ }
+
+ private void pushData(InputStream in) throws IOException {
+ if (uniqueKey != null) {
+ in = new SequenceInputStream(in,
+ new ByteArrayInputStream(uniqueKey));
+ }
+
+ blob = blobFactory.createBlob(in);
+ blobModified = true;
+ }
+
+ private static byte[] readUniqueKey(NodeBuilder file) {
+ if (file.hasProperty(OakDirectory.PROP_UNIQUE_KEY)) {
+ String key = file.getString(OakDirectory.PROP_UNIQUE_KEY);
+ return StringUtils.convertHexToBytes(key);
+ }
+ return null;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (blobModified) {
+ file.setProperty(JCR_LASTMODIFIED, System.currentTimeMillis());
+ file.setProperty(JCR_DATA, blob, BINARY);
+ blobModified = false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+}
Propchange:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakStreamingIndexFile.java
------------------------------------------------------------------------------
svn:eol-style = native