Author: catholicon
Date: Tue Sep 5 03:56:06 2017
New Revision: 1807310
URL: http://svn.apache.org/viewvc?rev=1807310&view=rev
Log:
OAK-6576: Refactor OakDirectory to be more manageable
Added:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/BlobFactory.java
(with props)
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexFile.java
(with props)
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexInput.java
(with props)
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexOutput.java
(with props)
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectory.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectoryTest.java
Added:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/BlobFactory.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/BlobFactory.java?rev=1807310&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/BlobFactory.java
(added)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/BlobFactory.java
Tue Sep 5 03:56:06 2017
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.lucene;
+
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
+import org.apache.jackrabbit.oak.spi.blob.BlobOptions;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.apache.jackrabbit.oak.spi.blob.BlobOptions.UploadType.SYNCHRONOUS;
+
+public interface BlobFactory {
+ Boolean ENABLE_ASYNC_DS = Boolean.getBoolean("oak.lucene.ds.async");
+
+ Blob createBlob(InputStream in) throws IOException;
+
+ static BlobFactory getNodeBuilderBlobFactory(final NodeBuilder builder) {
+ return builder::createBlob;
+ }
+
+ static BlobFactory getBlobStoreBlobFactory(final BlobStore store) {
+ return in -> {
+ String blobId;
+ if (!ENABLE_ASYNC_DS) {
+ blobId = store.writeBlob(in, new BlobOptions().setUpload(SYNCHRONOUS));
+ } else {
+ blobId = store.writeBlob(in);
+ }
+ return new BlobStoreBlob(store, blobId);
+ };
+ }
+}
Propchange:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/BlobFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java?rev=1807310&r1=1807309&r2=1807310&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java
Tue Sep 5 03:56:06 2017
@@ -16,38 +16,17 @@
*/
package org.apache.jackrabbit.oak.plugins.index.lucene;
-import java.io.ByteArrayInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.SequenceInputStream;
-import java.security.SecureRandom;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import com.google.common.io.ByteStreams;
-import com.google.common.primitives.Ints;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.commons.StringUtils;
-import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
+import org.apache.jackrabbit.oak.commons.benchmark.PerfLogger;
import org.apache.jackrabbit.oak.plugins.index.lucene.directory.ActiveDeletedBlobCollectorFactory.BlobDeletionCallback;
-import org.apache.jackrabbit.oak.spi.blob.BlobOptions;
-import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
-import org.apache.jackrabbit.oak.commons.benchmark.PerfLogger;
-import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
@@ -55,32 +34,30 @@ import org.apache.lucene.store.IndexOutp
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.NoLockFactory;
-import org.apache.lucene.util.WeakIdentityMap;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.Collection;
+import java.util.Set;
+
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkElementIndex;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkPositionIndexes;
-import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.collect.Lists.newArrayList;
import static org.apache.jackrabbit.JcrConstants.JCR_DATA;
-import static org.apache.jackrabbit.JcrConstants.JCR_LASTMODIFIED;
import static org.apache.jackrabbit.oak.api.Type.BINARIES;
import static org.apache.jackrabbit.oak.api.Type.BINARY;
import static org.apache.jackrabbit.oak.api.Type.STRINGS;
import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.INDEX_DATA_CHILD_NAME;
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
import static org.apache.jackrabbit.oak.plugins.memory.PropertyStates.createProperty;
-import static org.apache.jackrabbit.oak.spi.blob.BlobOptions.UploadType.SYNCHRONOUS;
/**
* Implementation of the Lucene {@link Directory} (a flat list of files)
* based on an Oak {@link NodeBuilder}.
*/
public class OakDirectory extends Directory {
- private static final Boolean ENABLE_AYNC_DS = Boolean.getBoolean("oak.lucene.ds.async");
-
static final PerfLogger PERF_LOGGER = new PerfLogger(LoggerFactory.getLogger(OakDirectory.class.getName() + ".perf"));
static final String PROP_DIR_LISTING = "dirListing";
static final String PROP_BLOB_SIZE = "blobSize";
@@ -107,7 +84,7 @@ public class OakDirectory extends Direct
}
public OakDirectory(NodeBuilder builder, String dataNodeName,
IndexDefinition definition, boolean readOnly) {
- this(builder, dataNodeName, definition, readOnly, new NodeBuilderBlobFactory(builder));
+ this(builder, dataNodeName, definition, readOnly, BlobFactory.getNodeBuilderBlobFactory(builder));
}
public OakDirectory(NodeBuilder builder, String dataNodeName,
IndexDefinition definition,
@@ -119,7 +96,7 @@ public class OakDirectory extends Direct
boolean readOnly, @Nullable
GarbageCollectableBlobStore blobStore,
@Nonnull BlobDeletionCallback blobDeletionCallback) {
this(builder, dataNodeName, definition, readOnly,
- blobStore != null ? new BlobStoreBlobFactory(blobStore) : new NodeBuilderBlobFactory(builder),
+ blobStore != null ? BlobFactory.getBlobStoreBlobFactory(blobStore) : BlobFactory.getNodeBuilderBlobFactory(builder),
blobDeletionCallback);
}
@@ -322,456 +299,4 @@ public class OakDirectory extends Direct
return result;
}
- /**
- * Size of the blob entries to which the Lucene files are split.
- * Set to higher than the 4kB inline limit for the BlobStore,
- */
- static final int DEFAULT_BLOB_SIZE = 32 * 1024;
-
- /**
- * A file, which might be split into multiple blobs.
- */
- private static class OakIndexFile {
-
- /**
- * The file name.
- */
- private final String name;
-
- /**
- * The node that contains the data for this file.
- */
- private final NodeBuilder file;
-
- /**
- * The maximum size of each blob.
- */
- private final int blobSize;
-
- /**
- * The current position within the file (for positioned read and write
- * operations).
- */
- private long position = 0;
-
- /**
- * The length of the file.
- */
- private long length;
-
- /**
- * The list of blobs (might be empty).
- * The last blob has a size of 1 up to blobSize.
- * All other blobs have a size of blobSize.
- */
- private List<Blob> data;
-
- /**
- * Whether the data was modified since it was last flushed. If yes, on a
- * flush, the metadata, and the list of blobs need to be stored.
- */
- private boolean dataModified = false;
-
- /**
- * The index of the currently loaded blob.
- */
- private int index = -1;
-
- /**
- * The data of the currently loaded blob.
- */
- private byte[] blob;
-
- /**
- * The unique key that is used to make the content unique (to allow removing binaries from the blob store without risking to remove binaries that are still needed).
- */
- private final byte[] uniqueKey;
-
- /**
- * Whether the currently loaded blob was modified since the blob was
- * flushed.
- */
- private boolean blobModified = false;
-
- private final String dirDetails;
-
- private final BlobFactory blobFactory;
-
- public OakIndexFile(String name, NodeBuilder file, String dirDetails,
- @Nonnull BlobFactory blobFactory) {
- this.name = name;
- this.file = file;
- this.dirDetails = dirDetails;
- this.blobSize = determineBlobSize(file);
- this.uniqueKey = readUniqueKey(file);
- this.blob = new byte[blobSize];
- this.blobFactory = checkNotNull(blobFactory);
-
- PropertyState property = file.getProperty(JCR_DATA);
- if (property != null && property.getType() == BINARIES) {
- this.data = newArrayList(property.getValue(BINARIES));
- } else {
- this.data = newArrayList();
- }
-
- this.length = (long)data.size() * blobSize;
- if (!data.isEmpty()) {
- Blob last = data.get(data.size() - 1);
- this.length -= blobSize - last.length();
- if (uniqueKey != null) {
- this.length -= uniqueKey.length;
- }
- }
- }
-
- private OakIndexFile(OakIndexFile that) {
- this.name = that.name;
- this.file = that.file;
- this.dirDetails = that.dirDetails;
- this.blobSize = that.blobSize;
- this.uniqueKey = that.uniqueKey;
- this.blob = new byte[blobSize];
-
- this.position = that.position;
- this.length = that.length;
- this.data = newArrayList(that.data);
- this.dataModified = that.dataModified;
- this.blobFactory = that.blobFactory;
- }
-
- private void loadBlob(int i) throws IOException {
- checkElementIndex(i, data.size());
- if (index != i) {
- flushBlob();
- checkState(!blobModified);
-
- int n = (int) Math.min(blobSize, length - (long)i * blobSize);
- InputStream stream = data.get(i).getNewStream();
- try {
- ByteStreams.readFully(stream, blob, 0, n);
- } finally {
- stream.close();
- }
- index = i;
- }
- }
-
- private void flushBlob() throws IOException {
- if (blobModified) {
- int n = (int) Math.min(blobSize, length - (long)index * blobSize);
- InputStream in = new ByteArrayInputStream(blob, 0, n);
- if (uniqueKey != null) {
- in = new SequenceInputStream(in,
- new ByteArrayInputStream(uniqueKey));
- }
-
- Blob b = blobFactory.createBlob(in);
- if (index < data.size()) {
- data.set(index, b);
- } else {
- checkState(index == data.size());
- data.add(b);
- }
- dataModified = true;
- blobModified = false;
- }
- }
-
- public void seek(long pos) throws IOException {
- // seek() may be called with pos == length
- // see https://issues.apache.org/jira/browse/LUCENE-1196
- if (pos < 0 || pos > length) {
- String msg = String.format("Invalid seek request for [%s][%s], " +
- "position: %d, file length: %d", dirDetails, name, pos, length);
- throw new IOException(msg);
- } else {
- position = pos;
- }
- }
-
- public void readBytes(byte[] b, int offset, int len)
- throws IOException {
- checkPositionIndexes(offset, offset + len, checkNotNull(b).length);
-
- if (len < 0 || position + len > length) {
- String msg = String.format("Invalid byte range request for [%s][%s], " +
- "position: %d, file length: %d, len: %d", dirDetails, name, position, length, len);
- throw new IOException(msg);
- }
-
- int i = (int) (position / blobSize);
- int o = (int) (position % blobSize);
- while (len > 0) {
- loadBlob(i);
-
- int l = Math.min(len, blobSize - o);
- System.arraycopy(blob, o, b, offset, l);
-
- offset += l;
- len -= l;
- position += l;
- // next block
- i++;
- // for the next block, we read from the beginning
- o = 0;
- }
- }
-
- public void writeBytes(byte[] b, int offset, int len)
- throws IOException {
- int i = (int) (position / blobSize);
- int o = (int) (position % blobSize);
- while (len > 0) {
- int l = Math.min(len, blobSize - o);
-
- if (index != i) {
- if (o > 0 || (l < blobSize && position + l < length)) {
- // loadBlob first flushes the previous block,
- // and it sets the index
- loadBlob(i);
- } else {
- // we don't need to load the block,
- // as we anyway overwrite it fully, if:
- // o == 0 (start writing at a block boundary)
- // and either: l is the blockSize, or
- // we write at least to the end of the file
- flushBlob();
- index = i;
- }
- }
- System.arraycopy(b, offset, blob, o, l);
- blobModified = true;
-
- offset += l;
- len -= l;
- position += l;
- length = Math.max(length, position);
-
- i++;
- o = 0;
- }
- }
-
- private static int determineBlobSize(NodeBuilder file){
- if (file.hasProperty(PROP_BLOB_SIZE)){
- return Ints.checkedCast(file.getProperty(PROP_BLOB_SIZE).getValue(Type.LONG));
- }
- return DEFAULT_BLOB_SIZE;
- }
-
- private static byte[] readUniqueKey(NodeBuilder file) {
- if (file.hasProperty(PROP_UNIQUE_KEY)) {
- String key = file.getString(PROP_UNIQUE_KEY);
- return StringUtils.convertHexToBytes(key);
- }
- return null;
- }
-
- public void flush() throws IOException {
- flushBlob();
- if (dataModified) {
- file.setProperty(JCR_LASTMODIFIED, System.currentTimeMillis());
- file.setProperty(JCR_DATA, data, BINARIES);
- dataModified = false;
- }
- }
-
- @Override
- public String toString() {
- return name;
- }
-
- public String getName() {
- return name;
- }
- }
-
- private static class OakIndexInput extends IndexInput {
-
- private final OakIndexFile file;
- private boolean isClone = false;
- private final WeakIdentityMap<OakIndexInput, Boolean> clones;
- private final String dirDetails;
-
- public OakIndexInput(String name, NodeBuilder file, String dirDetails,
- BlobFactory blobFactory) {
- super(name);
- this.dirDetails = dirDetails;
- this.file = new OakIndexFile(name, file, dirDetails, blobFactory);
- clones = WeakIdentityMap.newConcurrentHashMap();
- }
-
- private OakIndexInput(OakIndexInput that) {
- super(that.toString());
- this.file = new OakIndexFile(that.file);
- clones = null;
- this.dirDetails = that.dirDetails;
- }
-
- @Override
- public OakIndexInput clone() {
- // TODO : shouldn't we call super#clone ?
- OakIndexInput clonedIndexInput = new OakIndexInput(this);
- clonedIndexInput.isClone = true;
- if (clones != null) {
- clones.put(clonedIndexInput, Boolean.TRUE);
- }
- return clonedIndexInput;
- }
-
- @Override
- public void readBytes(byte[] b, int o, int n) throws IOException {
- checkNotClosed();
- file.readBytes(b, o, n);
- }
-
- @Override
- public byte readByte() throws IOException {
- checkNotClosed();
- byte[] b = new byte[1];
- readBytes(b, 0, 1);
- return b[0];
- }
-
- @Override
- public void seek(long pos) throws IOException {
- checkNotClosed();
- file.seek(pos);
- }
-
- @Override
- public long length() {
- checkNotClosed();
- return file.length;
- }
-
- @Override
- public long getFilePointer() {
- checkNotClosed();
- return file.position;
- }
-
- @Override
- public void close() {
- file.blob = null;
- file.data = null;
-
- if (clones != null) {
- for (Iterator<OakIndexInput> it = clones.keyIterator(); it.hasNext();) {
- final OakIndexInput clone = it.next();
- assert clone.isClone;
- clone.close();
- }
- }
- }
-
- private void checkNotClosed() {
- if (file.blob == null && file.data == null) {
- throw new AlreadyClosedException("Already closed: [" + dirDetails + "] " + this);
- }
- }
-
- }
-
- private final class OakIndexOutput extends IndexOutput {
- private final String dirDetails;
- private final OakIndexFile file;
-
- public OakIndexOutput(String name, NodeBuilder file, String dirDetails,
- BlobFactory blobFactory) throws IOException {
- this.dirDetails = dirDetails;
- this.file = new OakIndexFile(name, file, dirDetails, blobFactory);
- }
-
- @Override
- public long length() {
- return file.length;
- }
-
- @Override
- public long getFilePointer() {
- return file.position;
- }
-
- @Override
- public void seek(long pos) throws IOException {
- file.seek(pos);
- }
-
- @Override
- public void writeBytes(byte[] b, int offset, int length)
- throws IOException {
- try {
- file.writeBytes(b, offset, length);
- } catch (IOException e) {
- throw wrapWithDetails(e);
- }
- }
-
- @Override
- public void writeByte(byte b) throws IOException {
- writeBytes(new byte[] { b }, 0, 1);
- }
-
- @Override
- public void flush() throws IOException {
- try {
- file.flush();
- } catch (IOException e) {
- throw wrapWithDetails(e);
- }
- }
-
- @Override
- public void close() throws IOException {
- flush();
- file.blob = null;
- file.data = null;
- }
-
- private IOException wrapWithDetails(IOException e) {
- String msg = String.format("Error occurred while writing to blob [%s][%s]", dirDetails, file.getName());
- return new IOException(msg, e);
- }
-
- }
-
- public interface BlobFactory {
-
- Blob createBlob(InputStream in) throws IOException;
- }
-
- public static final class NodeBuilderBlobFactory implements BlobFactory {
-
- private final NodeBuilder builder;
-
- public NodeBuilderBlobFactory(NodeBuilder builder) {
- this.builder = builder;
- }
-
- @Override
- public Blob createBlob(InputStream in) throws IOException {
- return builder.createBlob(in);
- }
- }
-
- public static final class BlobStoreBlobFactory implements BlobFactory {
-
- private final BlobStore store;
-
- public BlobStoreBlobFactory(BlobStore store) {
- this.store = store;
- }
-
- @Override
- public Blob createBlob(InputStream in) throws IOException {
- String blobId;
- if (!ENABLE_AYNC_DS) {
- blobId = store.writeBlob(in, new BlobOptions().setUpload(SYNCHRONOUS));
- } else {
- blobId = store.writeBlob(in);
- }
- return new BlobStoreBlob(store, blobId);
- }
- }
}
Added:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexFile.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexFile.java?rev=1807310&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexFile.java
(added)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexFile.java
Tue Sep 5 03:56:06 2017
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.lucene;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.primitives.Ints;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.commons.StringUtils;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkElementIndex;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkPositionIndexes;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Lists.newArrayList;
+import static org.apache.jackrabbit.JcrConstants.JCR_DATA;
+import static org.apache.jackrabbit.JcrConstants.JCR_LASTMODIFIED;
+import static org.apache.jackrabbit.oak.api.Type.BINARIES;
+
+/**
+ * A file, which might be split into multiple blobs.
+ */
+class OakIndexFile {
+ static Logger LOG = LoggerFactory.getLogger(OakIndexFile.class);
+
+ /**
+ * Size of the blob entries to which the Lucene files are split.
+ * Set to higher than the 4kB inline limit for the BlobStore,
+ */
+ static final int DEFAULT_BLOB_SIZE = 32 * 1024;
+
+ /**
+ * The file name.
+ */
+ private final String name;
+
+ /**
+ * The node that contains the data for this file.
+ */
+ private final NodeBuilder file;
+
+ /**
+ * The maximum size of each blob.
+ */
+ private final int blobSize;
+
+ /**
+ * The current position within the file (for positioned read and write
+ * operations).
+ */
+ private long position = 0;
+
+ /**
+ * The length of the file.
+ */
+ private long length;
+
+ /**
+ * The list of blobs (might be empty).
+ * The last blob has a size of 1 up to blobSize.
+ * All other blobs have a size of blobSize.
+ */
+ private List<Blob> data;
+
+ /**
+ * Whether the data was modified since it was last flushed. If yes, on a
+ * flush, the metadata, and the list of blobs need to be stored.
+ */
+ private boolean dataModified = false;
+
+ /**
+ * The index of the currently loaded blob.
+ */
+ private int index = -1;
+
+ /**
+ * The data of the currently loaded blob.
+ */
+ private byte[] blob;
+
+ /**
+ * The unique key that is used to make the content unique (to allow removing binaries from the blob store without risking to remove binaries that are still needed).
+ */
+ private final byte[] uniqueKey;
+
+ /**
+ * Whether the currently loaded blob was modified since the blob was
+ * flushed.
+ */
+ private boolean blobModified = false;
+
+ private final String dirDetails;
+
+ private final BlobFactory blobFactory;
+
+ public OakIndexFile(String name, NodeBuilder file, String dirDetails,
+ @Nonnull BlobFactory blobFactory) {
+ this.name = name;
+ this.file = file;
+ this.dirDetails = dirDetails;
+ this.blobSize = determineBlobSize(file);
+ this.uniqueKey = readUniqueKey(file);
+ this.blob = new byte[blobSize];
+ this.blobFactory = checkNotNull(blobFactory);
+
+ PropertyState property = file.getProperty(JCR_DATA);
+ if (property != null && property.getType() == BINARIES) {
+ this.data = newArrayList(property.getValue(BINARIES));
+ } else {
+ this.data = newArrayList();
+ }
+
+ this.length = (long)data.size() * blobSize;
+ if (!data.isEmpty()) {
+ Blob last = data.get(data.size() - 1);
+ this.length -= blobSize - last.length();
+ if (uniqueKey != null) {
+ this.length -= uniqueKey.length;
+ }
+ }
+ }
+
+ private OakIndexFile(OakIndexFile that) {
+ this.name = that.name;
+ this.file = that.file;
+ this.dirDetails = that.dirDetails;
+ this.blobSize = that.blobSize;
+ this.uniqueKey = that.uniqueKey;
+ this.blob = new byte[blobSize];
+
+ this.position = that.position;
+ this.length = that.length;
+ this.data = newArrayList(that.data);
+ this.dataModified = that.dataModified;
+ this.blobFactory = that.blobFactory;
+ }
+
+ private void loadBlob(int i) throws IOException {
+ checkElementIndex(i, data.size());
+ if (index != i) {
+ LOG.info("Load {}th blob from {}", i, name);
+ flushBlob();
+ checkState(!blobModified);
+
+ int n = (int) Math.min(blobSize, length - (long)i * blobSize);
+ InputStream stream = data.get(i).getNewStream();
+ try {
+ ByteStreams.readFully(stream, blob, 0, n);
+ } finally {
+ stream.close();
+ }
+ index = i;
+ }
+ }
+
+ private void flushBlob() throws IOException {
+ LOG.info("Flushing blob {}. Modified: {}", name, blobModified);
+ if (blobModified) {
+ int n = (int) Math.min(blobSize, length - (long)index * blobSize);
+ InputStream in = new ByteArrayInputStream(blob, 0, n);
+ if (uniqueKey != null) {
+ in = new SequenceInputStream(in,
+ new ByteArrayInputStream(uniqueKey));
+ }
+
+ Blob b = blobFactory.createBlob(in);
+ if (index < data.size()) {
+ data.set(index, b);
+ } else {
+ checkState(index == data.size());
+ data.add(b);
+ }
+ dataModified = true;
+ blobModified = false;
+ }
+ }
+
+ /**
+ * Duplicates this instance to be used by a different consumer/thread.
+ * State of the cloned instance is same as original. Once cloned, the states
+ * would change separately according to how are they accessed.
+ *
+ * @return cloned instance
+ */
+ public OakIndexFile clone() {
+ return new OakIndexFile(this);
+ }
+
+ /**
+ * @return length of index file
+ */
+ public long length() {
+ return length;
+ }
+
+ /**
+ * @return current location of access
+ */
+ public long position() {
+ return position;
+ }
+
+ public void close() {
+ this.blob = null;
+ this.data = null;
+ }
+
+ public boolean isClosed() {
+ return blob == null && data == null;
+ }
+
+ /**
+ * Seek current location of access to {@code pos}
+ * @param pos
+ * @throws IOException
+ */
+ public void seek(long pos) throws IOException {
+ LOG.info("Seek to {} in {}", pos, name);
+ // seek() may be called with pos == length
+ // see https://issues.apache.org/jira/browse/LUCENE-1196
+ if (pos < 0 || pos > length) {
+ String msg = String.format("Invalid seek request for [%s][%s], " +
+ "position: %d, file length: %d", dirDetails, name, pos, length);
+ throw new IOException(msg);
+ } else {
+ position = pos;
+ }
+ }
+
+ /**
+ * Read {@code len} number of bytes from underlying storage and copy them
+ * into byte array {@code b} starting at {@code offset}
+ * @param b byte array to copy contents read from storage
+ * @param offset index into {@code b} where the copy begins
+ * @param len numeber of bytes to be read from storage
+ * @throws IOException
+ */
+ public void readBytes(byte[] b, int offset, int len)
+ throws IOException {
+ checkPositionIndexes(offset, offset + len, checkNotNull(b).length);
+ LOG.info("read {} bytes from {}", len, name);
+
+ if (len < 0 || position + len > length) {
+ String msg = String.format("Invalid byte range request for [%s][%s], " +
+ "position: %d, file length: %d, len: %d", dirDetails, name, position, length, len);
+ throw new IOException(msg);
+ }
+
+ int i = (int) (position / blobSize);
+ int o = (int) (position % blobSize);
+ while (len > 0) {
+ loadBlob(i);
+
+ int l = Math.min(len, blobSize - o);
+ System.arraycopy(blob, o, b, offset, l);
+
+ offset += l;
+ len -= l;
+ position += l;
+ // next block
+ i++;
+ // for the next block, we read from the beginning
+ o = 0;
+ }
+ }
+
+ /**
+ * Writes {@code len} number of bytes from byte array {@code b}
+ * starting at {@code offset} into the underlying storage
+ * @param b byte array to copy contents into the storage
+ * @param offset index into {@code b} where the copy begins
+ * @param len numeber of bytes to be written to storage
+ * @throws IOException
+ */
+ public void writeBytes(byte[] b, int offset, int len)
+ throws IOException {
+ LOG.info("Write {} bytes at {} in {}", len, offset, name);
+ int i = (int) (position / blobSize);
+ int o = (int) (position % blobSize);
+ while (len > 0) {
+ int l = Math.min(len, blobSize - o);
+
+ if (index != i) {
+ if (o > 0 || (l < blobSize && position + l < length)) {
+ // loadBlob first flushes the previous block,
+ // and it sets the index
+ loadBlob(i);
+ } else {
+ // we don't need to load the block,
+ // as we anyway overwrite it fully, if:
+ // o == 0 (start writing at a block boundary)
+ // and either: l is the blockSize, or
+ // we write at least to the end of the file
+ flushBlob();
+ index = i;
+ }
+ }
+ System.arraycopy(b, offset, blob, o, l);
+ blobModified = true;
+
+ offset += l;
+ len -= l;
+ position += l;
+ length = Math.max(length, position);
+
+ i++;
+ o = 0;
+ }
+ }
+
+ private static int determineBlobSize(NodeBuilder file){
+ if (file.hasProperty(OakDirectory.PROP_BLOB_SIZE)){
+ return Ints.checkedCast(file.getProperty(OakDirectory.PROP_BLOB_SIZE).getValue(Type.LONG));
+ }
+ return DEFAULT_BLOB_SIZE;
+ }
+
+ private static byte[] readUniqueKey(NodeBuilder file) {
+ if (file.hasProperty(OakDirectory.PROP_UNIQUE_KEY)) {
+ String key = file.getString(OakDirectory.PROP_UNIQUE_KEY);
+ return StringUtils.convertHexToBytes(key);
+ }
+ return null;
+ }
+
+ /**
+ * Flushes the content into storage. Before calling this method, written
+ * content only exist in memory
+ * @throws IOException
+ */
+ public void flush() throws IOException {
+ flushBlob();
+ if (dataModified) {
+ file.setProperty(JCR_LASTMODIFIED, System.currentTimeMillis());
+ file.setProperty(JCR_DATA, data, BINARIES);
+ dataModified = false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+
+ /**
+ * @return name of the index being accessed
+ */
+ public String getName() {
+ return name;
+ }
+}
Propchange:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexFile.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexInput.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexInput.java?rev=1807310&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexInput.java
(added)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexInput.java
Tue Sep 5 03:56:06 2017
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.lucene;
+
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.util.WeakIdentityMap;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * {@link IndexInput} that delegates all reads and positioning to an
+ * {@link OakIndexFile}. Clones created through {@link #clone()} are
+ * registered with the instance they were cloned from and are closed
+ * together with it.
+ */
+class OakIndexInput extends IndexInput {
+
+    /** Backing file that serves reads and tracks position and closed state. */
+    private final OakIndexFile file;
+
+    /** {@code true} only for instances produced by {@link #clone()}. */
+    private final boolean isClone;
+
+    /** Registry of clones handed out by this input; {@code null} on clones. */
+    private final WeakIdentityMap<OakIndexInput, Boolean> clones;
+
+    /** Directory description included in the "already closed" message. */
+    private final String dirDetails;
+
+    /**
+     * Creates an input over the given file node.
+     *
+     * @param name        name passed to {@link IndexInput} and to the backing file
+     * @param file        node builder of the file
+     * @param dirDetails  directory description used in error messages
+     * @param blobFactory factory handed to the backing {@link OakIndexFile}
+     */
+    public OakIndexInput(String name, NodeBuilder file, String dirDetails,
+                         BlobFactory blobFactory) {
+        super(name);
+        this.dirDetails = dirDetails;
+        this.file = new OakIndexFile(name, file, dirDetails, blobFactory);
+        this.isClone = false;
+        this.clones = WeakIdentityMap.newConcurrentHashMap();
+    }
+
+    /**
+     * Copy constructor used by {@link #clone()}; the new instance works on a
+     * clone of the source's backing file and keeps no clone registry.
+     */
+    private OakIndexInput(OakIndexInput that) {
+        super(that.toString());
+        this.file = that.file.clone();
+        this.isClone = true;
+        this.clones = null;
+        this.dirDetails = that.dirDetails;
+    }
+
+    /**
+     * Creates a clone and, when this instance keeps a registry, records the
+     * clone in it so that {@link #close()} can close it later.
+     */
+    @Override
+    public OakIndexInput clone() {
+        // TODO : shouldn't we call super#clone ?
+        OakIndexInput copy = new OakIndexInput(this);
+        if (clones != null) {
+            clones.put(copy, Boolean.TRUE);
+        }
+        return copy;
+    }
+
+    @Override
+    public void readBytes(byte[] b, int o, int n) throws IOException {
+        checkNotClosed();
+        file.readBytes(b, o, n);
+    }
+
+    @Override
+    public byte readByte() throws IOException {
+        checkNotClosed();
+        // Read through readBytes using a one-element buffer.
+        final byte[] single = new byte[1];
+        readBytes(single, 0, 1);
+        return single[0];
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+        checkNotClosed();
+        file.seek(pos);
+    }
+
+    @Override
+    public long length() {
+        checkNotClosed();
+        return file.length();
+    }
+
+    @Override
+    public long getFilePointer() {
+        checkNotClosed();
+        return file.position();
+    }
+
+    /**
+     * Closes the backing file and then every clone still present in the
+     * registry. Instances without a registry only close their own file.
+     */
+    @Override
+    public void close() {
+        file.close();
+        if (clones == null) {
+            return;
+        }
+        Iterator<OakIndexInput> pending = clones.keyIterator();
+        while (pending.hasNext()) {
+            final OakIndexInput tracked = pending.next();
+            assert tracked.isClone;
+            tracked.close();
+        }
+    }
+
+    /**
+     * @throws AlreadyClosedException if the backing file reports it is closed
+     */
+    private void checkNotClosed() {
+        if (!file.isClosed()) {
+            return;
+        }
+        throw new AlreadyClosedException("Already closed: [" + dirDetails + "] " + this);
+    }
+
+}
Propchange:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexInput.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexOutput.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexOutput.java?rev=1807310&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexOutput.java
(added)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexOutput.java
Tue Sep 5 03:56:06 2017
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.lucene;
+
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.lucene.store.IndexOutput;
+
+import java.io.IOException;
+
+/**
+ * {@link IndexOutput} that delegates all writes and positioning to an
+ * {@link OakIndexFile}. I/O failures raised while writing or flushing are
+ * rethrown with the directory details and file name added to the message.
+ */
+final class OakIndexOutput extends IndexOutput {
+    /** Directory description included in wrapped error messages. */
+    private final String dirDetails;
+
+    /** Backing file that buffers written content until it is flushed. */
+    private final OakIndexFile file;
+
+    /**
+     * Creates an output over the given file node.
+     *
+     * @param name        name handed to the backing file
+     * @param file        node builder of the file
+     * @param dirDetails  directory description used in error messages
+     * @param blobFactory factory handed to the backing {@link OakIndexFile}
+     * @throws IOException declared for callers; not thrown by the statements visible here
+     */
+    public OakIndexOutput(String name, NodeBuilder file, String dirDetails,
+                          BlobFactory blobFactory) throws IOException {
+        this.dirDetails = dirDetails;
+        this.file = new OakIndexFile(name, file, dirDetails, blobFactory);
+    }
+
+    @Override
+    public long length() {
+        return file.length();
+    }
+
+    @Override
+    public long getFilePointer() {
+        return file.position();
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+        file.seek(pos);
+    }
+
+    /**
+     * Writes {@code length} bytes of {@code b}, starting at {@code offset},
+     * to the backing file.
+     *
+     * @throws IOException the underlying failure wrapped with directory and file details
+     */
+    @Override
+    public void writeBytes(byte[] b, int offset, int length)
+            throws IOException {
+        try {
+            file.writeBytes(b, offset, length);
+        } catch (IOException e) {
+            throw wrapWithDetails(e);
+        }
+    }
+
+    @Override
+    public void writeByte(byte b) throws IOException {
+        writeBytes(new byte[] { b }, 0, 1);
+    }
+
+    /**
+     * Flushes the backing file.
+     *
+     * @throws IOException the underlying failure wrapped with directory and file details
+     */
+    @Override
+    public void flush() throws IOException {
+        try {
+            file.flush();
+        } catch (IOException e) {
+            throw wrapWithDetails(e);
+        }
+    }
+
+    /**
+     * Flushes pending content and closes the backing file. The file is closed
+     * even when the flush fails; the flush failure is still propagated.
+     *
+     * @throws IOException if the flush fails
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            flush();
+        } finally {
+            // Always release the backing file, otherwise a failed flush leaks it.
+            file.close();
+        }
+    }
+
+    /**
+     * Wraps {@code e} in a new {@link IOException} whose message names the
+     * directory and file; the original exception is kept as the cause.
+     */
+    private IOException wrapWithDetails(IOException e) {
+        String msg = String.format("Error occurred while writing to blob [%s][%s]",
+                dirDetails, file.getName());
+        return new IOException(msg, e);
+    }
+
+}
Propchange:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakIndexOutput.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectory.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectory.java?rev=1807310&r1=1807309&r2=1807310&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectory.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectory.java
Tue Sep 5 03:56:06 2017
@@ -16,21 +16,11 @@
*/
package org.apache.jackrabbit.oak.plugins.index.lucene.directory;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Set;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
import com.google.common.collect.Sets;
-
-import
org.apache.jackrabbit.oak.plugins.index.lucene.directory.ActiveDeletedBlobCollectorFactory.BlobDeletionCallback;
+import org.apache.jackrabbit.oak.plugins.index.lucene.BlobFactory;
import org.apache.jackrabbit.oak.plugins.index.lucene.IndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.lucene.OakDirectory;
-import org.apache.jackrabbit.oak.plugins.index.lucene.OakDirectory.BlobFactory;
+import
org.apache.jackrabbit.oak.plugins.index.lucene.directory.ActiveDeletedBlobCollectorFactory.BlobDeletionCallback;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.lucene.store.Directory;
@@ -42,6 +32,14 @@ import org.apache.lucene.store.LockFacto
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Set;
+
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Arrays.asList;
import static
org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
@@ -89,8 +87,8 @@ public final class BufferedOakDirectory
@Nullable BlobStore blobStore,
@Nonnull BlobDeletionCallback
blobDeletionCallback) {
this.blobFactory = blobStore != null ?
- new OakDirectory.BlobStoreBlobFactory(blobStore) :
- new OakDirectory.NodeBuilderBlobFactory(builder);
+ BlobFactory.getBlobStoreBlobFactory(blobStore) :
+ BlobFactory.getNodeBuilderBlobFactory(builder);
this.blobDeletionCallback = blobDeletionCallback;
this.dataNodeName = checkNotNull(dataNodeName);
this.definition = checkNotNull(definition);
Modified:
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectoryTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectoryTest.java?rev=1807310&r1=1807309&r2=1807310&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectoryTest.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectoryTest.java
Tue Sep 5 03:56:06 2017
@@ -104,9 +104,9 @@ public class OakDirectoryTest {
@Test
public void testCompatibility() throws Exception{
- builder.setProperty(LuceneIndexConstants.BLOB_SIZE,
OakDirectory.DEFAULT_BLOB_SIZE);
+ builder.setProperty(LuceneIndexConstants.BLOB_SIZE,
OakIndexFile.DEFAULT_BLOB_SIZE);
Directory dir = createDir(builder, false, "/foo");
- byte[] data = assertWrites(dir, OakDirectory.DEFAULT_BLOB_SIZE);
+ byte[] data = assertWrites(dir, OakIndexFile.DEFAULT_BLOB_SIZE);
NodeBuilder testNode =
builder.child(INDEX_DATA_CHILD_NAME).child("test");
//Remove the size property to simulate old behaviour
@@ -473,7 +473,7 @@ public class OakDirectoryTest {
final AtomicBoolean identifiableBlob = new AtomicBoolean(false);
IndexDefinition def = new IndexDefinition(root,
builder.getNodeState(), "/foo");
- OakDirectory.BlobFactory factory = new OakDirectory.BlobFactory() {
+ BlobFactory factory = new BlobFactory() {
@Override
public Blob createBlob(InputStream in) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -521,7 +521,7 @@ public class OakDirectoryTest {
final AtomicInteger numBlobs = new AtomicInteger();
final int fileSize = 1024;
IndexDefinition def = new IndexDefinition(root,
builder.getNodeState(), "/foo");
- OakDirectory.BlobFactory factory = new OakDirectory.BlobFactory() {
+ BlobFactory factory = new BlobFactory() {
@Override
public Blob createBlob(InputStream in) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();