Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentPropertyState.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentPropertyState.java?rev=1746410&r1=1746409&r2=1746410&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentPropertyState.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentPropertyState.java Wed Jun 1 07:48:51 2016 @@ -62,7 +62,7 @@ import org.apache.jackrabbit.oak.plugins */ public class SegmentPropertyState extends Record implements PropertyState { @Nonnull - private final SegmentStore store; + private final SegmentReader reader; @Nonnull private final String name; @@ -70,17 +70,17 @@ public class SegmentPropertyState extend @Nonnull private final Type<?> type; - SegmentPropertyState(@Nonnull SegmentStore store, @Nonnull RecordId id, + SegmentPropertyState(@Nonnull SegmentReader reader, @Nonnull RecordId id, @Nonnull String name, @Nonnull Type<?> type) { super(id); - this.store = checkNotNull(store); + this.reader = checkNotNull(reader); this.name = checkNotNull(name); this.type = checkNotNull(type); } - SegmentPropertyState(@Nonnull SegmentStore store, @Nonnull RecordId id, + SegmentPropertyState(@Nonnull SegmentReader reader, @Nonnull RecordId id, @Nonnull PropertyTemplate template) { - this(store, id, template.getName(), template.getType()); + this(reader, id, template.getName(), template.getType()); } private ListRecord getValueList(Segment segment) { @@ -106,7 +106,7 @@ public class SegmentPropertyState extend ListRecord values = getValueList(segment); for (int i = 0; i < values.size(); i++) { RecordId valueId = values.getEntry(i); - String value = store.getReader().readString(valueId); + String value = 
reader.readString(valueId); map.put(value, valueId); } @@ -186,10 +186,10 @@ public class SegmentPropertyState extend @SuppressWarnings("unchecked") private <T> T getValue(RecordId id, Type<T> type) { if (type == BINARY) { - return (T) new SegmentBlob(store, id); // load binaries lazily + return (T) reader.readBlob(id); // load binaries lazily } - String value = store.getReader().readString(id); + String value = reader.readString(id); if (type == STRING || type == URI || type == DATE || type == NAME || type == PATH || type == REFERENCE || type == WEAKREFERENCE) { @@ -222,7 +222,7 @@ public class SegmentPropertyState extend RecordId entry = values.getEntry(index); if (getType().equals(BINARY) || getType().equals(BINARIES)) { - return new SegmentBlob(store, entry).length(); + return reader.readBlob(entry).length(); } return getSegment().readLength(entry);
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentReader.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentReader.java?rev=1746410&r1=1746409&r2=1746410&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentReader.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentReader.java Wed Jun 1 07:48:51 2016 @@ -21,8 +21,6 @@ package org.apache.jackrabbit.oak.segmen import javax.annotation.Nonnull; -import org.apache.jackrabbit.oak.cache.CacheStats; - public interface SegmentReader { @Nonnull String readString(@Nonnull RecordId id); @@ -33,7 +31,18 @@ public interface SegmentReader { @Nonnull Template readTemplate(@Nonnull RecordId id); - // FIXME OAK-4373 remove from this interface @Nonnull - CacheStats getStringCacheStats(); + SegmentNodeState readNode(@Nonnull RecordId id); + + @Nonnull + SegmentNodeState readHeadState(); + + @Nonnull + SegmentPropertyState readProperty( + @Nonnull RecordId id, + @Nonnull PropertyTemplate template); + + @Nonnull + SegmentBlob readBlob(@Nonnull RecordId id); + } Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentReaders.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentReaders.java?rev=1746410&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentReaders.java (added) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentReaders.java Wed Jun 1 07:48:51 2016 @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.segment; + +import javax.annotation.Nonnull; + +import com.google.common.base.Supplier; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.apache.jackrabbit.oak.segment.http.HttpStore; +import org.apache.jackrabbit.oak.segment.memory.MemoryStore; + +public final class SegmentReaders { + private SegmentReaders() {} + + @Nonnull + public static SegmentReader segmentReader(@Nonnull FileStore store, long stringCacheMB) { + return new CachingSegmentReader(getWriter(store), store.getRevisions(), store.getBlobStore(), stringCacheMB); + } + + private static Supplier<SegmentWriter> getWriter(final FileStore store) { + return new Supplier<SegmentWriter>() { + @Override + public SegmentWriter get() { + return store.getWriter(); + } + }; + } + + @Nonnull + public static SegmentReader segmentReader(@Nonnull MemoryStore store, long stringCacheMB) { + return new CachingSegmentReader(getWriter(store), store.getRevisions(), store.getBlobStore(), stringCacheMB); + } + + private static Supplier<SegmentWriter> getWriter(final MemoryStore store) { + return new Supplier<SegmentWriter>() { + @Override + public SegmentWriter get() { + return store.getWriter(); + } + }; + } + + 
@Nonnull + public static SegmentReader segmentReader(@Nonnull HttpStore store, long stringCacheMB) { + return new CachingSegmentReader(getWriter(store), store.getRevisions(), store.getBlobStore(), stringCacheMB); + } + + private static Supplier<SegmentWriter> getWriter(final HttpStore store) { + return new Supplier<SegmentWriter>() { + @Override + public SegmentWriter get() { + return store.getWriter(); + } + }; + } + +} Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentStore.java?rev=1746410&r1=1746409&r2=1746410&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentStore.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentStore.java Wed Jun 1 07:48:51 2016 @@ -18,38 +18,14 @@ */ package org.apache.jackrabbit.oak.segment; -import java.io.Closeable; import java.io.IOException; -import javax.annotation.CheckForNull; import javax.annotation.Nonnull; -import org.apache.jackrabbit.oak.api.Blob; -import org.apache.jackrabbit.oak.spi.blob.BlobStore; - /** * The backend storage interface used by the segment node store. */ -public interface SegmentStore extends Closeable { - - @Nonnull - SegmentTracker getTracker(); - - @Nonnull - SegmentWriter getWriter(); - - @Nonnull - SegmentReader getReader(); - - /** - * Returns the head state. - * - * @return head state - */ - @Nonnull - SegmentNodeState getHead(); - - boolean setHead(SegmentNodeState base, SegmentNodeState head); +public interface SegmentStore { /** * Checks whether the identified segment exists in this store. 
@@ -78,25 +54,4 @@ public interface SegmentStore extends Cl */ void writeSegment(SegmentId id, byte[] bytes, int offset, int length) throws IOException; - void close(); - - /** - * Read a blob from external storage. - * - * @param reference blob reference - * @return external blob - */ - Blob readBlob(String reference); - - /** - * Returns the external BlobStore (if configured) with this store - */ - @CheckForNull - BlobStore getBlobStore(); - - /** - * Triggers removal of segments that are no longer referenceable. - */ - void gc(); - } Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java?rev=1746410&r1=1746409&r2=1746410&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java Wed Jun 1 07:48:51 2016 @@ -80,9 +80,6 @@ import org.slf4j.LoggerFactory; * {@code WriteOperationHandler} passed to the constructor is thread safe. */ public class SegmentWriter { -// FIXME OAK-4102: Break cyclic dependency of FileStore and SegmentTracker -// Improve the way how SegmentWriter instances are created. 
(OAK-4102) - private static final Logger LOG = LoggerFactory.getLogger(SegmentWriter.class); static final int BLOCK_SIZE = 1 << 12; // 4kB @@ -94,6 +91,15 @@ public class SegmentWriter { private final SegmentStore store; @Nonnull + private final SegmentReader reader; + + @CheckForNull + private final BlobStore blobStore; + + @Nonnull + private final SegmentTracker tracker; + + @Nonnull private final WriteOperationHandler writeOperationHandler; /** @@ -101,11 +107,20 @@ public class SegmentWriter { * pointed out in the class comment. * * @param store store to write to + * @param reader segment reader for the {@code store} + * @param blobStore the blob store or {@code null} for inlined blobs + * @param tracker segment tracker for {@code store} * @param writeOperationHandler handler for write operations. */ public SegmentWriter(@Nonnull SegmentStore store, + @Nonnull SegmentReader reader, + @Nullable BlobStore blobStore, + @Nonnull SegmentTracker tracker, @Nonnull WriteOperationHandler writeOperationHandler) { this.store = checkNotNull(store); + this.reader = checkNotNull(reader); + this.blobStore = blobStore; + this.tracker = checkNotNull(tracker); this.writeOperationHandler = checkNotNull(writeOperationHandler); this.cacheManager = new WriterCacheManager(); } @@ -138,7 +153,7 @@ public class SegmentWriter { return with(writer).writeMap(base, changes); } }); - return new MapRecord(store, mapId); + return new MapRecord(reader, mapId); } /** @@ -187,7 +202,7 @@ public class SegmentWriter { return with(writer).writeBlob(blob); } }); - return new SegmentBlob(store, blobId); + return new SegmentBlob(blobStore, blobId); } /** @@ -225,11 +240,11 @@ public class SegmentWriter { return with(writer).writeStream(stream); } }); - return new SegmentBlob(store, blobId); + return new SegmentBlob(blobStore, blobId); } /** - * Write a propery. + * Write a property.
* @param state the property to write * @return the property state written * @throws IOException @@ -243,7 +258,7 @@ public class SegmentWriter { return with(writer).writeProperty(state); } }); - return new SegmentPropertyState(store, id, state.getName(), state.getType()); + return new SegmentPropertyState(reader, id, state.getName(), state.getType()); } /** @@ -260,7 +275,7 @@ public class SegmentWriter { return with(writer).writeNode(state, 0); } }); - return new SegmentNodeState(store, nodeId); + return new SegmentNodeState(reader, this, nodeId); } /** @@ -287,7 +302,7 @@ public class SegmentWriter { } }); writeOperationHandler.flush(); - return new SegmentNodeState(store, nodeId); + return new SegmentNodeState(reader, this, nodeId); } catch (SegmentWriteOperation.CancelledWriteException ignore) { return null; } @@ -344,11 +359,11 @@ public class SegmentWriter { if (base != null && base.isDiff()) { Segment segment = base.getSegment(); RecordId key = segment.readRecordId(base.getOffset(8)); - String name = store.getReader().readString(key); + String name = reader.readString(key); if (!changes.containsKey(name)) { changes.put(name, segment.readRecordId(base.getOffset(8, 1))); } - base = new MapRecord(store, segment.readRecordId(base.getOffset(8, 2))); + base = new MapRecord(reader, segment.readRecordId(base.getOffset(8, 2))); } if (base != null && changes.size() == 1) { @@ -384,7 +399,7 @@ public class SegmentWriter { } if (keyId != null) { - entries.add(new MapEntry(store, key, keyId, entry.getValue())); + entries.add(new MapEntry(reader, key, keyId, entry.getValue())); } } return writeMapBucket(base, entries, 0); @@ -497,7 +512,7 @@ public class SegmentWriter { } private MapRecord mapRecordOrNull(RecordId id) { - return id == null ? null : new MapRecord(store, id); + return id == null ? 
null : new MapRecord(reader, id); } /** @@ -585,7 +600,7 @@ public class SegmentWriter { // write as many full bulk segments as possible while (pos + Segment.MAX_SEGMENT_SIZE <= data.length) { - SegmentId bulkId = store.getTracker().newBulkSegmentId(); + SegmentId bulkId = tracker.newBulkSegmentId(); store.writeSegment(bulkId, data, pos, Segment.MAX_SEGMENT_SIZE); for (int i = 0; i < Segment.MAX_SEGMENT_SIZE; i += BLOCK_SIZE) { blockIds.add(new RecordId(bulkId, i)); @@ -626,8 +641,8 @@ public class SegmentWriter { } String reference = blob.getReference(); - if (reference != null && store.getBlobStore() != null) { - String blobId = store.getBlobStore().getBlobId(reference); + if (reference != null && blobStore != null) { + String blobId = blobStore.getBlobId(reference); if (blobId != null) { return writeBlobId(blobId); } else { @@ -697,7 +712,6 @@ public class SegmentWriter { return writeValueRecord(n, data); } - BlobStore blobStore = store.getBlobStore(); if (blobStore != null) { String blobId = blobStore.writeBlob(new SequenceInputStream( new ByteArrayInputStream(data, 0, n), stream)); @@ -712,7 +726,7 @@ public class SegmentWriter { // Write the data to bulk segments and collect the list of block ids while (n != 0) { - SegmentId bulkId = store.getTracker().newBulkSegmentId(); + SegmentId bulkId = tracker.newBulkSegmentId(); int len = Segment.align(n, 1 << Segment.RECORD_ALIGN_BITS); LOG.debug("Writing bulk segment {} ({} bytes)", bulkId, n); store.writeSegment(bulkId, data, 0, len); @@ -893,7 +907,7 @@ public class SegmentWriter { } List<RecordId> ids = newArrayList(); - Template template = new Template(store, state); + Template template = new Template(reader, state); if (template.equals(beforeTemplate)) { ids.add(before.getTemplateId()); } else { Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriters.java URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriters.java?rev=1746410&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriters.java (added) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriters.java Wed Jun 1 07:48:51 2016 @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.jackrabbit.oak.segment; + +import javax.annotation.Nonnull; + +import com.google.common.base.Supplier; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.apache.jackrabbit.oak.segment.http.HttpStore; +import org.apache.jackrabbit.oak.segment.memory.MemoryStore; + +public final class SegmentWriters { + private SegmentWriters() {} + + @Nonnull + public static SegmentWriter pooledSegmentWriter(@Nonnull FileStore store, + @Nonnull SegmentVersion version, + @Nonnull String name, + @Nonnull Supplier<Integer> generation) { + return new SegmentWriter(store, store.getReader(), store.getBlobStore(), store.getTracker(), + new SegmentBufferWriterPool(store, store.getTracker(), store.getReader(), version, name, generation)); + } + + @Nonnull + public static SegmentWriter pooledSegmentWriter(@Nonnull MemoryStore store, + @Nonnull SegmentVersion version, + @Nonnull String name, + @Nonnull Supplier<Integer> generation) { + return new SegmentWriter(store, store.getReader(), store.getBlobStore(), store.getTracker(), + new SegmentBufferWriterPool(store, store.getTracker(), store.getReader(), version, name, generation)); + } + + @Nonnull + public static SegmentWriter pooledSegmentWriter(@Nonnull HttpStore store, + @Nonnull SegmentVersion version, + @Nonnull String name, + @Nonnull Supplier<Integer> generation) { + return new SegmentWriter(store, store.getReader(), store.getBlobStore(), store.getTracker(), + new SegmentBufferWriterPool(store, store.getTracker(), store.getReader(), version, name, generation)); + } + + @Nonnull + public static SegmentWriter segmentWriter(@Nonnull FileStore store, + @Nonnull SegmentVersion version, + @Nonnull String name, + int generation) { + return new SegmentWriter(store, store.getReader(), store.getBlobStore(), store.getTracker(), + new SegmentBufferWriter(store, store.getTracker(), store.getReader(), version, name, generation)); + } + + @Nonnull + public static SegmentWriter segmentWriter(@Nonnull 
MemoryStore store, + @Nonnull SegmentVersion version, + @Nonnull String name, + int generation) { + return new SegmentWriter(store, store.getReader(), store.getBlobStore(), store.getTracker(), + new SegmentBufferWriter(store, store.getTracker(), store.getReader(), version, name, generation)); + } + + @Nonnull + public static SegmentWriter segmentWriter(@Nonnull HttpStore store, + @Nonnull SegmentVersion version, + @Nonnull String name, + int generation) { + return new SegmentWriter(store, store.getReader(), store.getBlobStore(), store.getTracker(), + new SegmentBufferWriter(store, store.getTracker(), store.getReader(), version, name, generation)); + } + +} Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Template.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Template.java?rev=1746410&r1=1746409&r2=1746410&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Template.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Template.java Wed Jun 1 07:48:51 2016 @@ -59,7 +59,7 @@ public class Template { static final String MANY_CHILD_NODES = ""; @Nonnull - private final SegmentStore store; + private final SegmentReader reader; /** * The {@code jcr:primaryType} property, if present as a single-valued @@ -90,12 +90,12 @@ public class Template { @CheckForNull private final String childName; - Template(@Nonnull SegmentStore store, + Template(@Nonnull SegmentReader reader, @Nullable PropertyState primaryType, @Nullable PropertyState mixinTypes, @Nullable PropertyTemplate[] properties, @Nullable String childName) { - this.store = store; + this.reader = checkNotNull(reader); this.primaryType = primaryType; this.mixinTypes = mixinTypes; if (properties != null) { @@ -107,8 +107,9 @@ 
public class Template { this.childName = childName; } - Template(@Nonnull SegmentStore store, @Nonnull NodeState state) { - this.store = store; + Template(@Nonnull SegmentReader reader, @Nonnull NodeState state) { + this.reader = checkNotNull(reader); + checkNotNull(state); PropertyState primary = null; PropertyState mixins = null; List<PropertyTemplate> templates = Lists.newArrayList(); @@ -198,7 +199,7 @@ public class Template { RecordId lid = segment.readRecordId(offset); ListRecord props = new ListRecord(lid, properties.length); RecordId rid = props.getEntry(index); - return new SegmentPropertyState(store, rid, properties[index]); + return reader.readProperty(rid, properties[index]); } MapRecord getChildNodeMap(RecordId recordId) { @@ -206,7 +207,7 @@ public class Template { Segment segment = recordId.getSegment(); int offset = recordId.getOffset() + 2 * RECORD_ID_BYTES; RecordId childNodesId = segment.readRecordId(offset); - return store.getReader().readMap(childNodesId); + return reader.readMap(childNodesId); } public NodeState getChildNode(String name, RecordId recordId) { @@ -224,7 +225,7 @@ public class Template { Segment segment = recordId.getSegment(); int offset = recordId.getOffset() + 2 * RECORD_ID_BYTES; RecordId childNodeId = segment.readRecordId(offset); - return new SegmentNodeState(store, childNodeId); + return reader.readNode(childNodeId); } else { return MISSING_NODE; } @@ -241,7 +242,7 @@ public class Template { int offset = recordId.getOffset() + 2 * RECORD_ID_BYTES; RecordId childNodeId = segment.readRecordId(offset); return Collections.singletonList(new MemoryChildNodeEntry( - childName, new SegmentNodeState(store, childNodeId))); + childName, reader.readNode(childNodeId))); } } Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java?rev=1746410&r1=1746409&r2=1746410&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java Wed Jun 1 07:48:51 2016 @@ -33,10 +33,15 @@ import static java.util.Collections.empt import static java.util.Collections.singletonMap; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount; import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE; +import static org.apache.jackrabbit.oak.segment.CachingSegmentReader.DEFAULT_STRING_CACHE_MB; import static org.apache.jackrabbit.oak.segment.SegmentId.isDataSegmentId; -import static org.apache.jackrabbit.oak.segment.SegmentReaderImpl.DEFAULT_STRING_CACHE_MB; +import static org.apache.jackrabbit.oak.segment.SegmentVersion.LATEST_VERSION; +import static org.apache.jackrabbit.oak.segment.SegmentWriters.pooledSegmentWriter; +import static org.apache.jackrabbit.oak.segment.SegmentWriters.segmentWriter; +import static org.apache.jackrabbit.oak.segment.file.TarRevisions.timeout; import java.io.Closeable; import java.io.File; @@ -54,30 +59,28 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import 
java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Matcher; import java.util.regex.Pattern; +import javax.annotation.CheckForNull; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; -import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.cache.CacheStats; -import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob; import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector; +import org.apache.jackrabbit.oak.segment.CachingSegmentReader; import org.apache.jackrabbit.oak.segment.RecordId; import org.apache.jackrabbit.oak.segment.Segment; import org.apache.jackrabbit.oak.segment.SegmentBufferWriter; -import org.apache.jackrabbit.oak.segment.SegmentBufferWriterPool; import org.apache.jackrabbit.oak.segment.SegmentCache; import org.apache.jackrabbit.oak.segment.SegmentGraph.SegmentGraphVisitor; import org.apache.jackrabbit.oak.segment.SegmentId; @@ -85,7 +88,7 @@ import org.apache.jackrabbit.oak.segment import org.apache.jackrabbit.oak.segment.SegmentNodeStore; import org.apache.jackrabbit.oak.segment.SegmentNotFoundException; import org.apache.jackrabbit.oak.segment.SegmentReader; -import org.apache.jackrabbit.oak.segment.SegmentReaderImpl; +import org.apache.jackrabbit.oak.segment.SegmentReaders; import org.apache.jackrabbit.oak.segment.SegmentStore; import org.apache.jackrabbit.oak.segment.SegmentTracker; import org.apache.jackrabbit.oak.segment.SegmentVersion; @@ -102,9 +105,7 @@ import org.slf4j.LoggerFactory; /** * The storage implementation for tar files. 
*/ -public class FileStore implements SegmentStore { - - /** Logger instance */ +public class FileStore implements SegmentStore, Closeable { private static final Logger log = LoggerFactory.getLogger(FileStore.class); private static final int MB = 1024 * 1024; @@ -114,8 +115,6 @@ public class FileStore implements Segmen private static final String FILE_NAME_FORMAT = "data%05d%s.tar"; - private static final String JOURNAL_FILE_NAME = "journal.log"; - private static final String LOCK_FILE_NAME = "repo.lock"; /** @@ -149,24 +148,14 @@ public class FileStore implements Segmen private volatile File writeFile; - private volatile TarWriter writer; - - private final RandomAccessFile journalFile; + private volatile TarWriter tarWriter; private final RandomAccessFile lockFile; private final FileLock lock; - /** - * The latest head state. - */ - private final AtomicReference<RecordId> head; - - /** - * The persisted head of the root journal, used to determine whether the - * latest {@link #head} value should be written to the disk. - */ - private final AtomicReference<RecordId> persistedHead; + @Nonnull + private final TarRevisions revisions; /** * The background flush thread. Automatically flushes the TarMK state @@ -254,8 +243,6 @@ public class FileStore implements Segmen private BlobStore blobStore; // null -> store blobs inline - private NodeState root = EMPTY_NODE; - private int maxFileSize = 256; private int cacheSize; // 0 -> DEFAULT_MEMORY_CACHE_SIZE @@ -274,6 +261,8 @@ public class FileStore implements Segmen this.directory = directory; } + private TarRevisions revisions; + /** * Specify the {@link BlobStore}. 
* @param blobStore @@ -286,17 +275,6 @@ public class FileStore implements Segmen } /** - * Specify the initial root node state for the file store - * @param root - * @return this instance - */ - @Nonnull - public Builder withRoot(@Nonnull NodeState root) { - this.root = checkNotNull(root); - return this; - } - - /** * Maximal size of the generated tar files in MB. * @param maxFileSize * @return this instance @@ -384,7 +362,7 @@ public class FileStore implements Segmen @Nonnull public Builder withGCOptions(SegmentGCOptions gcOptions) { - this.gcOptions = gcOptions; + this.gcOptions = checkNotNull(gcOptions); return this; } @@ -407,34 +385,49 @@ public class FileStore implements Segmen */ @Nonnull public FileStore build() throws IOException { - return new FileStore(this, false); + directory.mkdir(); + revisions = new TarRevisions(false, directory); + FileStore store = new FileStore(this, false); + revisions.bind(store, store.getTracker(), initialNode(store)); + return store; } + @Nonnull public ReadOnlyStore buildReadOnly() throws IOException { - return new ReadOnlyStore(this); + checkState(directory.exists() && directory.isDirectory()); + revisions = new TarRevisions(true, directory); + ReadOnlyStore store = new ReadOnlyStore(this); + revisions.bind(store, store.getTracker(), initialNode(store)); + return store; } + @Nonnull + private static Supplier<RecordId> initialNode(final FileStore store) { + return new Supplier<RecordId>() { + @Override + public RecordId get() { + try { + SegmentWriter writer = segmentWriter(store, LATEST_VERSION, "init", 0); + NodeBuilder builder = EMPTY_NODE.builder(); + builder.setChildNode("root", EMPTY_NODE); + SegmentNodeState node = writer.writeNode(builder.getNodeState()); + writer.flush(); + return node.getRecordId(); + } catch (IOException e) { + String msg = "Failed to write initial node"; + log.error(msg, e); + throw new IllegalStateException(msg, e); + } + } + }; + } } private FileStore(Builder builder, boolean readOnly) throws 
IOException { this.version = builder.version; - - if (readOnly) { - checkNotNull(builder.directory); - checkState(builder.directory.exists() && builder.directory.isDirectory()); - } else { - checkNotNull(builder.directory).mkdirs(); - } - - // FIXME OAK-4102: Break cyclic dependency of FileStore and SegmentTracker - // SegmentTracker and FileStore have a cyclic dependency, which we should - // try to break. Here we pass along a not fully initialised instances of the - // FileStore to the SegmentTracker, which in turn is in later invoked to write - // the initial node state. Notably before this instance is fully initialised! - // Once consequence of this is that we cannot reliably determine the current - // GC generation while writing the initial head state. See further below. - this.tracker = new SegmentTracker(this); + this.revisions = builder.revisions; + this.blobStore = builder.blobStore; // FIXME OAK-4373 refactor cache size configurations if (builder.cacheSize < 0) { @@ -445,34 +438,24 @@ public class FileStore implements Segmen this.segmentCache = new SegmentCache(DEFAULT_STRING_CACHE_MB); } if (builder.cacheSize < 0) { - this.segmentReader = new SegmentReaderImpl(this, 0); + this.segmentReader = SegmentReaders.segmentReader(this, 0); } else if (builder.cacheSize > 0) { - this.segmentReader = new SegmentReaderImpl(this, builder.cacheSize); + this.segmentReader = SegmentReaders.segmentReader(this, builder.cacheSize); } else { - this.segmentReader = new SegmentReaderImpl(this); + this.segmentReader = SegmentReaders.segmentReader(this, DEFAULT_STRING_CACHE_MB); } - this.segmentWriter = new SegmentWriter(this, - new SegmentBufferWriterPool(this, version, "sys", new Supplier<Integer>() { + this.segmentWriter = pooledSegmentWriter(this, version, "sys", new Supplier<Integer>() { @Override public Integer get() { return getGcGeneration(); } - })); - this.blobStore = builder.blobStore; + }); this.directory = builder.directory; this.maxFileSize = builder.maxFileSize * 
MB; this.memoryMapping = builder.memoryMapping; this.gcMonitor = builder.gcMonitor; this.gcOptions = builder.gcOptions; - if (readOnly) { - journalFile = new RandomAccessFile(new File(directory, - JOURNAL_FILE_NAME), "r"); - } else { - journalFile = new RandomAccessFile(new File(directory, - JOURNAL_FILE_NAME), "rw"); - } - Map<Integer, Map<Character, File>> map = collectFiles(directory); this.readers = newArrayListWithCapacity(map.size()); Integer[] indices = map.keySet().toArray(new Integer[map.size()]); @@ -501,52 +484,17 @@ public class FileStore implements Segmen } this.writeFile = new File(directory, String.format( FILE_NAME_FORMAT, writeNumber, "a")); - this.writer = new TarWriter(writeFile, stats); - } - - RecordId id = null; - try (JournalReader journalReader = new JournalReader(new File(directory, JOURNAL_FILE_NAME))) { - Iterator<String> heads = journalReader.iterator(); - while (id == null && heads.hasNext()) { - String head = heads.next(); - try { - RecordId last = RecordId.fromString(tracker, head); - SegmentId segmentId = last.getSegmentId(); - if (containsSegment( - segmentId.getMostSignificantBits(), - segmentId.getLeastSignificantBits())) { - id = last; - } else { - log.warn("Unable to access revision {}, rewinding...", last); - } - } catch (IllegalArgumentException ignore) { - log.warn("Skipping invalid record id {}", head); - } - } + this.tarWriter = new TarWriter(writeFile, stats); } - journalFile.seek(journalFile.length()); - if (!readOnly) { - lockFile = new RandomAccessFile( - new File(directory, LOCK_FILE_NAME), "rw"); + lockFile = new RandomAccessFile(new File(directory, LOCK_FILE_NAME), "rw"); lock = lockFile.getChannel().lock(); } else { lockFile = null; lock = null; } - if (id != null) { - head = new AtomicReference<>(id); - persistedHead = new AtomicReference<>(id); - } else { - NodeBuilder nodeBuilder = EMPTY_NODE.builder(); - nodeBuilder.setChildNode("root", builder.root); - head = new AtomicReference<>(writeNode( - builder.root, 
segmentWriter, new SegmentBufferWriter(this, version, "init", 0))); - persistedHead = new AtomicReference<>(null); - } - // FIXME OAK-3468 Replace BackgroundThread with Scheduler // Externalise these background operations if (!readOnly) { @@ -601,19 +549,8 @@ public class FileStore implements Segmen log.debug("TarMK readers {}", this.readers); } - @Nonnull - private static RecordId writeNode(NodeState root, SegmentWriter writer, - SegmentBufferWriter bufferWriter) - throws IOException { - NodeBuilder nodeBuilder = EMPTY_NODE.builder(); - nodeBuilder.setChildNode("root", root); - SegmentNodeState node = writer.writeNode(nodeBuilder.getNodeState(), bufferWriter, Suppliers.ofInstance(false)); - assert node != null; - return node.getRecordId(); - } - private int getGcGeneration() { - return head.get().getSegment().getGcGeneration(); + return revisions.getHead().getSegment().getGcGeneration(); } @Nonnull @@ -621,6 +558,12 @@ public class FileStore implements Segmen return segmentCache.getCacheStats(); } + // FIXME OAK-4373 move access to the cache stats to the segment reader and avoid casting to implementation + @Nonnull + public CacheStats getStringCacheStats() { + return ((CachingSegmentReader)segmentReader).getStringCacheStats(); + } + public void maybeCompact(boolean cleanup) throws IOException { gcMonitor.info("TarMK GC #{}: started", GC_COUNT.incrementAndGet()); @@ -768,7 +711,7 @@ public class FileStore implements Segmen return dataFiles; } - public long size() { + public final long size() { fileStoreLock.readLock().lock(); try { long size = writeFile != null ? 
writeFile.length() : 0; @@ -799,8 +742,8 @@ public class FileStore implements Segmen fileStoreLock.readLock().lock(); try { int count = 0; - if (writer != null) { - count += writer.count(); + if (tarWriter != null) { + count += tarWriter.count(); } for (TarReader reader : readers) { count += reader.count(); @@ -818,7 +761,7 @@ public class FileStore implements Segmen * @return compaction gain estimate */ CompactionGainEstimate estimateCompactionGain(Supplier<Boolean> stop) { - CompactionGainEstimate estimate = new CompactionGainEstimate(getHead(), count(), stop); + CompactionGainEstimate estimate = new CompactionGainEstimate(segmentReader.readHeadState(), count(), stop); fileStoreLock.readLock().lock(); try { for (TarReader reader : readers) { @@ -838,52 +781,32 @@ public class FileStore implements Segmen } public void flush() throws IOException { - flush(cleanupNeeded.getAndSet(false)); - } - - public void flush(boolean cleanup) throws IOException { - synchronized (persistedHead) { - RecordId before = persistedHead.get(); - RecordId after = head.get(); - - if (cleanup || !after.equals(before)) { - segmentWriter.flush(); - + revisions.flush(new Callable<Void>() { + @Override + public Void call() throws Exception { // FIXME OAK-4291: FileStore.flush prone to races leading to corruption // There is a small windows that could lead to a corrupted store: // if we crash right after setting the persisted head but before any delay-flushed // SegmentBufferWriter instance flushes (see SegmentBufferWriterPool.returnWriter()) // then that data is lost although it might be referenced from the persisted head already. - // Need a test case. Possible fix: return a future from flush() and set the persisted head // in the completion handler. 
- writer.flush(); - - fileStoreLock.writeLock().lock(); - try { - log.debug("TarMK journal update {} -> {}", before, after); - journalFile.writeBytes(after.toString10() + " root " + System.currentTimeMillis()+"\n"); - journalFile.getChannel().force(false); - persistedHead.set(after); - } finally { - fileStoreLock.writeLock().unlock(); - } - - if (cleanup) { - // Explicitly give up reference to the previous root state - // otherwise they could block cleanup. See OAK-3347 - before = null; - after = null; - pendingRemove.addAll(cleanup()); - } + segmentWriter.flush(); + tarWriter.flush(); + return null; } + }); - // remove all obsolete tar generations + if (cleanupNeeded.getAndSet(false)) { + pendingRemove.addAll(cleanup()); + } + + // remove all obsolete tar generations + synchronized (pendingRemove) { Iterator<File> iterator = pendingRemove.iterator(); while (iterator.hasNext()) { File file = iterator.next(); - log.debug("TarMK GC: Attempting to remove old file {}", - file); + log.debug("TarMK GC: Attempting to remove old file {}", file); if (!file.exists() || file.delete()) { log.debug("TarMK GC: Removed old file {}", file); iterator.remove(); @@ -1103,7 +1026,7 @@ public class FileStore implements Segmen gcMonitor.info("TarMK GC #{}: compaction started, gc options={}", GC_COUNT, gcOptions); Stopwatch watch = Stopwatch.createStarted(); - SegmentNodeState before = getHead(); + SegmentNodeState before = segmentReader.readHeadState(); long existing = before.getChildNode(SegmentNodeStore.CHECKPOINTS) .getChildNodeCount(Long.MAX_VALUE); if (existing > 1) { @@ -1116,7 +1039,7 @@ public class FileStore implements Segmen final int newGeneration = getGcGeneration() + 1; SegmentBufferWriter bufferWriter = new SegmentBufferWriter( - this, version, "c", newGeneration); + this, tracker, segmentReader, version, "c", newGeneration); Supplier<Boolean> cancel = newCancelCompactionCondition(); SegmentNodeState after = compact(bufferWriter, before, cancel); if (after == null) { @@ 
-1130,13 +1053,14 @@ public class FileStore implements Segmen try { int cycles = 0; boolean success = false; - while (cycles++ < gcOptions.getRetryCount() && !(success = setHead(before, after))) { + while (cycles++ < gcOptions.getRetryCount() && + !(success = revisions.setHead(before.getRecordId(), after.getRecordId()))) { // Some other concurrent changes have been made. // Rebase (and compact) those changes on top of the // compacted state before retrying to set the head. gcMonitor.info("TarMK GC #{}: compaction detected concurrent commits while compacting. " + "Compacting these commits. Cycle {}", GC_COUNT, cycles); - SegmentNodeState head = getHead(); + SegmentNodeState head = segmentReader.readHeadState(); after = compact(bufferWriter, head, cancel); if (after == null) { gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT); @@ -1215,32 +1139,38 @@ public class FileStore implements Segmen return segmentWriter.writeNode(node, bufferWriter, cancel); } - private boolean forceCompact(SegmentBufferWriter bufferWriter, Supplier<Boolean> cancel) - throws InterruptedException, IOException { - if (rwLock.writeLock().tryLock(gcOptions.getLockWaitTime(), TimeUnit.SECONDS)) { - try { - SegmentNodeState head = getHead(); - SegmentNodeState after = compact(bufferWriter, head, cancel); - if (after == null) { - gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT); - return false; - } else { - return setHead(head, after); + private boolean forceCompact(@Nonnull final SegmentBufferWriter bufferWriter, + @Nonnull final Supplier<Boolean> cancel) + throws InterruptedException { + return revisions. 
+ setHead(new Function<RecordId, RecordId>() { + @Nullable + @Override + public RecordId apply(RecordId base) { + try { + SegmentNodeState after = compact(bufferWriter, + segmentReader.readNode(base), cancel); + if (after == null) { + gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT); + return null; + } else { + return after.getRecordId(); + } + } catch (IOException e) { + gcMonitor.error("TarMK GC #{" + GC_COUNT + "}: Error during forced compaction.", e); + return null; + } } - } finally { - rwLock.writeLock().unlock(); - } - } else { - return false; - } + }, + timeout(gcOptions.getLockWaitTime(), SECONDS)); } public Iterable<SegmentId> getSegmentIds() { fileStoreLock.readLock().lock(); try { List<SegmentId> ids = newArrayList(); - if (writer != null) { - for (UUID uuid : writer.getUUIDs()) { + if (tarWriter != null) { + for (UUID uuid : tarWriter.getUUIDs()) { ids.add(tracker.getSegmentId( uuid.getMostSignificantBits(), uuid.getLeastSignificantBits())); @@ -1259,43 +1189,24 @@ public class FileStore implements Segmen } } - @Override @Nonnull public SegmentTracker getTracker() { return tracker; } - @Override @Nonnull public SegmentWriter getWriter() { return segmentWriter; } - @Override @Nonnull public SegmentReader getReader() { return segmentReader; } - @Override - public SegmentNodeState getHead() { - return new SegmentNodeState(this, head.get()); - } - - // FIXME OAK-4015: Expedite commits from the compactor - // use a lock that can expedite important commits like compaction and checkpoints. 
- private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); - - @Override - public boolean setHead(SegmentNodeState base, SegmentNodeState head) { - rwLock.readLock().lock(); - try { - RecordId id = this.head.get(); - return id.equals(base.getRecordId()) - && this.head.compareAndSet(id, head.getRecordId()); - } finally { - rwLock.readLock().unlock(); - } + @Nonnull + public TarRevisions getRevisions() { + return revisions; } @Override @@ -1310,12 +1221,13 @@ public class FileStore implements Segmen closeAndLogOnFail(diskSpaceThread); try { flush(); + revisions.close(); // FIXME OAK-4291: FileStore.flush prone to races leading to corruption // Replace this with a way to "close" the underlying SegmentBufferWriter(s) // tracker.getWriter().dropCache(); fileStoreLock.writeLock().lock(); try { - closeAndLogOnFail(writer); + closeAndLogOnFail(tarWriter); List<TarReader> list = readers; readers = newArrayList(); @@ -1327,7 +1239,6 @@ public class FileStore implements Segmen lock.release(); } closeAndLogOnFail(lockFile); - closeAndLogOnFail(journalFile); } finally { fileStoreLock.writeLock().unlock(); } @@ -1355,10 +1266,10 @@ public class FileStore implements Segmen } } - if (writer != null) { + if (tarWriter != null) { fileStoreLock.readLock().lock(); try { - if (writer.containsEntry(msb, lsb)) { + if (tarWriter.containsEntry(msb, lsb)) { return true; } } finally { @@ -1398,23 +1309,23 @@ public class FileStore implements Segmen ByteBuffer buffer = reader.readEntry(msb, lsb); if (buffer != null) { - return new Segment(FileStore.this, id, buffer); + return new Segment(tracker, segmentReader, id, buffer); } } catch (IOException e) { log.warn("Failed to read from tar file {}", reader, e); } } - if (writer != null) { + if (tarWriter != null) { fileStoreLock.readLock().lock(); try { try { - ByteBuffer buffer = writer.readEntry(msb, lsb); + ByteBuffer buffer = tarWriter.readEntry(msb, lsb); if (buffer != null) { - return new Segment(FileStore.this, id, buffer); + 
return new Segment(tracker, segmentReader, id, buffer); } } catch (IOException e) { - log.warn("Failed to read from tar file {}", writer, e); + log.warn("Failed to read from tar file {}", tarWriter, e); } } finally { fileStoreLock.readLock().unlock(); @@ -1434,7 +1345,7 @@ public class FileStore implements Segmen ByteBuffer buffer = reader.readEntry(msb, lsb); if (buffer != null) { - return new Segment(FileStore.this, id, buffer); + return new Segment(tracker, segmentReader, id, buffer); } } catch (IOException e) { log.warn("Failed to read from tar file {}", reader, e); @@ -1456,7 +1367,7 @@ public class FileStore implements Segmen fileStoreLock.writeLock().lock(); try { int generation = Segment.getGcGeneration(wrap(buffer, offset, length), id.asUUID()); - long size = writer.writeEntry( + long size = tarWriter.writeEntry( id.getMostSignificantBits(), id.getLeastSignificantBits(), buffer, offset, length, generation); @@ -1478,7 +1389,7 @@ public class FileStore implements Segmen } else { data = ByteBuffer.wrap(buffer, offset, length); } - segmentCache.putSegment(new Segment(this, id, data)); + segmentCache.putSegment(new Segment(tracker, segmentReader, id, data)); } } @@ -1488,8 +1399,8 @@ public class FileStore implements Segmen * @throws IOException */ private void newWriter() throws IOException { - if (writer.isDirty()) { - writer.close(); + if (tarWriter.isDirty()) { + tarWriter.close(); List<TarReader> list = newArrayListWithCapacity(1 + readers.size()); @@ -1501,25 +1412,21 @@ public class FileStore implements Segmen writeFile = new File( directory, String.format(FILE_NAME_FORMAT, writeNumber, "a")); - writer = new TarWriter(writeFile, stats); + tarWriter = new TarWriter(writeFile, stats); } } - @Override - public Blob readBlob(String blobId) { - if (blobStore != null) { - return new BlobStoreBlob(blobStore, blobId); - } - throw new IllegalStateException("Attempt to read external blob with blobId [" + blobId + "] " + - "without specifying BlobStore"); - } - - 
@Override + /** + * @return the external BlobStore (if configured) with this store, {@code null} otherwise. + */ + @CheckForNull public BlobStore getBlobStore() { return blobStore; } - @Override + /** + * Trigger a garbage collection cycle + */ public void gc() { compactionThread.trigger(); } @@ -1552,9 +1459,7 @@ public class FileStore implements Segmen private void setRevision(String rootRevision) { fileStoreLock.writeLock().lock(); try { - RecordId id = RecordId.fromString(tracker, rootRevision); - head.set(id); - persistedHead.set(id); + revisions.setHeadId(RecordId.fromString(tracker, rootRevision)); } finally { fileStoreLock.writeLock().unlock(); } @@ -1637,11 +1542,6 @@ public class FileStore implements Segmen } @Override - public boolean setHead(SegmentNodeState base, SegmentNodeState head) { - throw new UnsupportedOperationException("Read Only Store"); - } - - @Override public void writeSegment(SegmentId id, byte[] data, int offset, int length) { throw new UnsupportedOperationException("Read Only Store"); Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarRevisions.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarRevisions.java?rev=1746410&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarRevisions.java (added) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarRevisions.java Wed Jun 1 07:48:51 2016 @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.segment.file; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Throwables.propagate; +import static com.google.common.base.Throwables.propagateIfInstanceOf; +import static java.lang.Long.MAX_VALUE; +import static java.util.concurrent.TimeUnit.DAYS; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.Iterator; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import javax.annotation.CheckForNull; +import javax.annotation.Nonnull; + +import com.google.common.base.Function; +import com.google.common.base.Supplier; +import org.apache.jackrabbit.oak.segment.RecordId; +import org.apache.jackrabbit.oak.segment.Revisions; +import org.apache.jackrabbit.oak.segment.SegmentStore; +import org.apache.jackrabbit.oak.segment.SegmentTracker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TarRevisions implements Revisions, Closeable { + private static final 
Logger LOG = LoggerFactory.getLogger(TarRevisions.class); + + public static final String JOURNAL_FILE_NAME = "journal.log"; + + private final boolean readOnly; + + /** + * The latest head state. + */ + @Nonnull + private final AtomicReference<RecordId> head; + + @Nonnull + private final File directory; + + @Nonnull + private final RandomAccessFile journalFile; + + /** + * The persisted head of the root journal, used to determine whether the + * latest {@link #head} value should be written to the disk. + */ + @Nonnull + private final AtomicReference<RecordId> persistedHead; + + // FIXME OAK-4015: Expedite commits from the compactor + // use a lock that can expedite important commits like compaction and checkpoints. + @Nonnull + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + + private static class TimeOutOption implements Option { + private final long time; + + @Nonnull + private final TimeUnit unit; + + TimeOutOption(long time, @Nonnull TimeUnit unit) { + this.time = time; + this.unit = unit; + } + + @Nonnull + public static TimeOutOption from(@CheckForNull Option option) { + if (option instanceof TimeOutOption) { + return (TimeOutOption) option; + } else { + throw new IllegalArgumentException("Invalid option " + option); + } + } + } + + public static final Option INFINITY = new TimeOutOption(MAX_VALUE, DAYS); + + public static Option timeout(long time, TimeUnit unit) { + return new TimeOutOption(time, unit); + } + + public TarRevisions(boolean readOnly, @Nonnull File directory) + throws IOException { + this.readOnly = readOnly; + this.directory = checkNotNull(directory); + this.journalFile = new RandomAccessFile(new File(directory, JOURNAL_FILE_NAME), + readOnly ? 
"r" : "rw"); + this.journalFile.seek(journalFile.length()); + this.head = new AtomicReference<>(null); + this.persistedHead = new AtomicReference<>(null); + } + + synchronized void bind(@Nonnull SegmentStore store, + @Nonnull SegmentTracker tracker, + @Nonnull Supplier<RecordId> writeInitialNode) + throws IOException { + if (head.get() == null) { + RecordId persistedId = null; + try (JournalReader journalReader = new JournalReader(new File(directory, JOURNAL_FILE_NAME))) { + Iterator<String> entries = journalReader.iterator(); + while (persistedId == null && entries.hasNext()) { + String entry = entries.next(); + try { + RecordId id = RecordId.fromString(tracker, entry); + if (store.containsSegment(id.getSegmentId())) { + persistedId = id; + } else { + LOG.warn("Unable to access revision {}, rewinding...", id); + } + } catch (IllegalArgumentException ignore) { + LOG.warn("Skipping invalid record id {}", entry); + } + } + } + + if (persistedId == null) { + head.set(writeInitialNode.get()); + } else { + persistedHead.set(persistedId); + head.set(persistedId); + } + } + } + + private void checkBound() { + checkState(head.get() != null, "Revisions not bound to a store"); + } + + private final Lock flushLock = new ReentrantLock(); + + public void flush(@Nonnull Callable<Void> persisted) throws IOException { + checkBound(); + if (flushLock.tryLock()) { + try { + RecordId before = persistedHead.get(); + RecordId after = getHead(); + if (!after.equals(before)) { + persisted.call(); + + LOG.debug("TarMK journal update {} -> {}", before, after); + journalFile.writeBytes(after.toString10() + " root " + System.currentTimeMillis() + "\n"); + journalFile.getChannel().force(false); + persistedHead.set(after); + } + } catch (Exception e) { + propagateIfInstanceOf(e, IOException.class); + propagate(e); + } finally { + flushLock.unlock(); + } + } + } + + @Nonnull + @Override + public RecordId getHead() { + checkBound(); + return head.get(); + } + + @Override + public boolean 
setHead( + @Nonnull RecordId base, + @Nonnull RecordId head, + @Nonnull Option... options) { + checkBound(); + rwLock.readLock().lock(); + try { + RecordId id = this.head.get(); + return id.equals(base) && this.head.compareAndSet(id, head); + } finally { + rwLock.readLock().unlock(); + } + } + + @Override + public boolean setHead( + @Nonnull Function<RecordId, RecordId> newHead, + @Nonnull Option... options) + throws InterruptedException { + checkBound(); + TimeOutOption timeout = getTimeout(options); + if (rwLock.writeLock().tryLock(timeout.time, timeout.unit)) { + try { + RecordId after = newHead.apply(getHead()); + if (after != null) { + head.set(after); + return true; + } else { + return false; + } + } finally { + rwLock.writeLock().unlock(); + } + } else { + return false; + } + } + + @Nonnull + private static TimeOutOption getTimeout(@Nonnull Option[] options) { + if (options.length == 0) { + return TimeOutOption.from(INFINITY); + } else if (options.length == 1) { + return TimeOutOption.from(options[0]); + } else { + throw new IllegalArgumentException("Expected zero or one options, got " + options.length); + } + } + + @Override + public void close() throws IOException { + journalFile.close(); + } + + void setHeadId(@Nonnull RecordId headId) { + checkState(readOnly, "Cannot set revision on a writable store"); + head.set(headId); + persistedHead.set(headId); + } +} Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/ConsistencyChecker.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/ConsistencyChecker.java?rev=1746410&r1=1746409&r2=1746410&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/ConsistencyChecker.java (original) +++ 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/ConsistencyChecker.java Wed Jun 1 07:48:51 2016 @@ -38,7 +38,7 @@ import org.apache.jackrabbit.oak.api.Blo import org.apache.jackrabbit.oak.api.PropertyState; import org.apache.jackrabbit.oak.api.Type; import org.apache.jackrabbit.oak.segment.SegmentBlob; -import org.apache.jackrabbit.oak.segment.SegmentNodeStore; +import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; import org.apache.jackrabbit.oak.segment.file.FileStore; import org.apache.jackrabbit.oak.segment.file.FileStore.ReadOnlyStore; import org.apache.jackrabbit.oak.segment.file.JournalReader; @@ -145,7 +145,7 @@ public class ConsistencyChecker { private String checkPath(String path, long binLen) { try { print("Checking {}", path); - NodeState root = SegmentNodeStore.builder(store).build().getRoot(); + NodeState root = SegmentNodeStoreBuilders.builder(store).build().getRoot(); String parentPath = getParentPath(path); String name = getName(path); NodeState parent = getNode(root, parentPath); @@ -173,7 +173,8 @@ public class ConsistencyChecker { store.setRevision(revision); nodeCount = 0; propertyCount = 0; - String result = traverse(SegmentNodeStore.builder(store).build().getRoot(), "/", true, binLen); + String result = traverse(SegmentNodeStoreBuilders.builder(store).build() + .getRoot(), "/", true, binLen); print("Traversed {} nodes and {} properties", nodeCount, propertyCount); return result; } catch (RuntimeException e) { Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/RevisionHistory.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/RevisionHistory.java?rev=1746410&r1=1746409&r2=1746410&view=diff ============================================================================== --- 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/RevisionHistory.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/RevisionHistory.java Wed Jun 1 07:48:51 2016 @@ -81,7 +81,7 @@ public class RevisionHistory { @Nullable @Override public HistoryElement apply(String revision) { store.setRevision(revision); - NodeState node = getNode(store.getHead(), path); + NodeState node = getNode(store.getReader().readHeadState(), path); return new HistoryElement(revision, node); } }); Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStore.java?rev=1746410&r1=1746409&r2=1746410&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStore.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStore.java Wed Jun 1 07:48:51 2016 @@ -18,13 +18,12 @@ */ package org.apache.jackrabbit.oak.segment.http; -import static com.google.common.base.Charsets.UTF_8; +import static org.apache.jackrabbit.oak.segment.CachingSegmentReader.DEFAULT_STRING_CACHE_MB; import static org.apache.jackrabbit.oak.segment.SegmentVersion.LATEST_VERSION; +import static org.apache.jackrabbit.oak.segment.SegmentWriters.pooledSegmentWriter; -import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; import java.io.OutputStream; import java.net.MalformedURLException; import java.net.URL; @@ -34,16 +33,14 @@ import java.nio.ByteBuffer; import javax.annotation.CheckForNull; import javax.annotation.Nonnull; +import com.google.common.base.Suppliers; import 
com.google.common.io.ByteStreams; -import org.apache.jackrabbit.oak.api.Blob; -import org.apache.jackrabbit.oak.segment.RecordId; +import org.apache.jackrabbit.oak.segment.Revisions; import org.apache.jackrabbit.oak.segment.Segment; -import org.apache.jackrabbit.oak.segment.SegmentBufferWriterPool; import org.apache.jackrabbit.oak.segment.SegmentId; -import org.apache.jackrabbit.oak.segment.SegmentNodeState; import org.apache.jackrabbit.oak.segment.SegmentNotFoundException; import org.apache.jackrabbit.oak.segment.SegmentReader; -import org.apache.jackrabbit.oak.segment.SegmentReaderImpl; +import org.apache.jackrabbit.oak.segment.SegmentReaders; import org.apache.jackrabbit.oak.segment.SegmentStore; import org.apache.jackrabbit.oak.segment.SegmentTracker; import org.apache.jackrabbit.oak.segment.SegmentWriter; @@ -55,11 +52,14 @@ public class HttpStore implements Segmen private final SegmentTracker tracker = new SegmentTracker(this); @Nonnull - private final SegmentWriter segmentWriter = new SegmentWriter(this, - new SegmentBufferWriterPool(this, LATEST_VERSION, "sys")); + private final HttpStoreRevisions revisions = new HttpStoreRevisions(this); @Nonnull - private final SegmentReader segmentReader = new SegmentReaderImpl(this); + private final SegmentReader segmentReader = SegmentReaders.segmentReader(this, DEFAULT_STRING_CACHE_MB); + + @Nonnull + private final SegmentWriter segmentWriter = pooledSegmentWriter(this, + LATEST_VERSION, "sys", Suppliers.ofInstance(0)); private final URL base; @@ -72,30 +72,32 @@ public class HttpStore implements Segmen this.base = base; } - @Override @Nonnull public SegmentTracker getTracker() { return tracker; } - @Override @Nonnull public SegmentWriter getWriter() { return segmentWriter; } - @Override @Nonnull public SegmentReader getReader() { return segmentReader; } + @Nonnull + public Revisions getRevisions() { + return revisions; + } + /** * Builds a simple URLConnection. 
This method can be extended to add * authorization headers if needed. * */ - protected URLConnection get(String fragment) throws MalformedURLException, + URLConnection get(String fragment) throws MalformedURLException, IOException { final URL url; if (fragment == null) { @@ -107,33 +109,6 @@ public class HttpStore implements Segmen } @Override - public SegmentNodeState getHead() { - try { - URLConnection connection = get(null); - InputStream stream = connection.getInputStream(); - try { - BufferedReader reader = new BufferedReader( - new InputStreamReader(stream, UTF_8)); - return new SegmentNodeState(this, RecordId.fromString(tracker, reader.readLine())); - } finally { - stream.close(); - } - } catch (IllegalArgumentException e) { - throw new IllegalStateException(e); - } catch (MalformedURLException e) { - throw new IllegalStateException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public boolean setHead(SegmentNodeState base, SegmentNodeState head) { - // TODO throw new UnsupportedOperationException(); - return true; - } - - @Override // FIXME OAK-4396: HttpStore.containsSegment throws SNFE instead of returning false for non existing segments public boolean containsSegment(SegmentId id) { return id.sameStore(this) || readSegment(id) != null; @@ -147,7 +122,7 @@ public class HttpStore implements Segmen InputStream stream = connection.getInputStream(); try { byte[] data = ByteStreams.toByteArray(stream); - return new Segment(this, id, ByteBuffer.wrap(data)); + return new Segment(tracker, segmentReader, id, ByteBuffer.wrap(data)); } finally { stream.close(); } @@ -176,23 +151,12 @@ public class HttpStore implements Segmen } } - @Override - public void close() { - } - - @Override @CheckForNull - public Blob readBlob(String reference) { - return null; - } - - @Override @CheckForNull + /** + * @return {@code null} + */ + @CheckForNull public BlobStore getBlobStore() { return null; } - @Override - public void gc() { - // 
TODO: distributed gc - } - } Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStoreRevisions.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStoreRevisions.java?rev=1746410&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStoreRevisions.java (added) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStoreRevisions.java Wed Jun 1 07:48:51 2016 @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.jackrabbit.oak.segment.http; + +import static com.google.common.base.Charsets.UTF_8; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.MalformedURLException; +import java.net.URLConnection; + +import javax.annotation.Nonnull; + +import com.google.common.base.Function; +import org.apache.jackrabbit.oak.segment.RecordId; +import org.apache.jackrabbit.oak.segment.Revisions; + +public class HttpStoreRevisions implements Revisions { + + @Nonnull + private final HttpStore store; + + public HttpStoreRevisions(@Nonnull HttpStore store) { + this.store = store; + } + + @Nonnull + @Override + public RecordId getHead() { + try { + URLConnection connection = store.get(null); + try ( + InputStream stream = connection.getInputStream(); + BufferedReader reader = new BufferedReader(new InputStreamReader(stream, UTF_8)) + ) { + return RecordId.fromString(store.getTracker(), reader.readLine()); + } + } catch (IllegalArgumentException | MalformedURLException e) { + throw new IllegalStateException(e); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean setHead( + @Nonnull RecordId base, @Nonnull RecordId head, + @Nonnull Option... options) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean setHead( + @Nonnull Function<RecordId, RecordId> newHead, + @Nonnull Option... 
options) throws InterruptedException { + throw new UnsupportedOperationException(); + } +} Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java?rev=1746410&r1=1746409&r2=1746410&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java Wed Jun 1 07:48:51 2016 @@ -18,30 +18,28 @@ */ package org.apache.jackrabbit.oak.segment.memory; -import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE; import static org.apache.jackrabbit.oak.segment.SegmentVersion.LATEST_VERSION; +import static org.apache.jackrabbit.oak.segment.SegmentWriters.pooledSegmentWriter; import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentMap; +import javax.annotation.CheckForNull; import javax.annotation.Nonnull; +import com.google.common.base.Suppliers; import com.google.common.collect.Maps; -import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.segment.Revisions; import org.apache.jackrabbit.oak.segment.Segment; -import org.apache.jackrabbit.oak.segment.SegmentBufferWriterPool; import org.apache.jackrabbit.oak.segment.SegmentId; -import org.apache.jackrabbit.oak.segment.SegmentNodeState; import org.apache.jackrabbit.oak.segment.SegmentNotFoundException; import org.apache.jackrabbit.oak.segment.SegmentReader; -import org.apache.jackrabbit.oak.segment.SegmentReaderImpl; +import org.apache.jackrabbit.oak.segment.SegmentReaders; import org.apache.jackrabbit.oak.segment.SegmentStore; import org.apache.jackrabbit.oak.segment.SegmentTracker; import 
org.apache.jackrabbit.oak.segment.SegmentWriter; import org.apache.jackrabbit.oak.spi.blob.BlobStore; -import org.apache.jackrabbit.oak.spi.state.NodeBuilder; -import org.apache.jackrabbit.oak.spi.state.NodeState; /** * A store used for in-memory operations. @@ -52,60 +50,44 @@ public class MemoryStore implements Segm private final SegmentTracker tracker = new SegmentTracker(this); @Nonnull - private final SegmentWriter segmentWriter = new SegmentWriter(this, - new SegmentBufferWriterPool(this, LATEST_VERSION, "sys")); + private final MemoryStoreRevisions revisions; @Nonnull - private final SegmentReader segmentReader = new SegmentReaderImpl(this, 16); + private final SegmentReader segmentReader; - private SegmentNodeState head; + @Nonnull + private final SegmentWriter segmentWriter; private final ConcurrentMap<SegmentId, Segment> segments = Maps.newConcurrentMap(); - public MemoryStore(NodeState root) throws IOException { - NodeBuilder builder = EMPTY_NODE.builder(); - builder.setChildNode("root", root); - - this.head = segmentWriter.writeNode(builder.getNodeState()); - segmentWriter.flush(); - } - public MemoryStore() throws IOException { - this(EMPTY_NODE); + this.revisions = new MemoryStoreRevisions(); + this.segmentReader = SegmentReaders.segmentReader(this, 16); + this.segmentWriter = pooledSegmentWriter(this, + LATEST_VERSION, "sys", Suppliers.ofInstance(0)); + revisions.bind(this); + segmentWriter.flush(); } - @Override @Nonnull public SegmentTracker getTracker() { return tracker; } - @Override @Nonnull public SegmentWriter getWriter() { return segmentWriter; } - @Override @Nonnull public SegmentReader getReader() { return segmentReader; } - @Override - public synchronized SegmentNodeState getHead() { - return head; - } - - @Override - public synchronized boolean setHead(SegmentNodeState base, SegmentNodeState head) { - if (this.head.getRecordId().equals(base.getRecordId())) { - this.head = head; - return true; - } else { - return false; - } + @Nonnull + 
public Revisions getRevisions() { + return revisions; } @Override @@ -128,27 +110,20 @@ public class MemoryStore implements Segm ByteBuffer buffer = ByteBuffer.allocate(length); buffer.put(data, offset, length); buffer.rewind(); - Segment segment = new Segment(this, id, buffer); + Segment segment = new Segment(tracker, segmentReader, id, buffer); if (segments.putIfAbsent(id, segment) != null) { throw new IOException("Segment override: " + id); } } - @Override - public void close() { - } - - @Override - public Blob readBlob(String reference) { - return null; - } - - @Override + /** + * @return {@code null} + */ + @CheckForNull public BlobStore getBlobStore() { return null; } - @Override public void gc() { System.gc(); segments.keySet().retainAll(tracker.getReferencedSegmentIds()); Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStoreRevisions.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStoreRevisions.java?rev=1746410&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStoreRevisions.java (added) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStoreRevisions.java Wed Jun 1 07:48:51 2016 @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.jackrabbit.oak.segment.memory;

import static com.google.common.base.Preconditions.checkState;
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;

import java.io.IOException;

import javax.annotation.Nonnull;

import com.google.common.base.Function;
import org.apache.jackrabbit.oak.segment.RecordId;
import org.apache.jackrabbit.oak.segment.Revisions;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;

/**
 * {@link Revisions} implementation that keeps the current head record id in
 * a plain field.
 * <p>
 * An instance starts out unbound (no head). {@link #bind(MemoryStore)} must
 * be called once before {@link #getHead()} or
 * {@link #setHead(RecordId, RecordId, Option...)} is used; until then those
 * methods fail with an {@link IllegalStateException}.
 * <p>
 * All accesses to the head go through methods synchronized on this instance.
 */
public class MemoryStoreRevisions implements Revisions {

    /** Current head; {@code null} until {@link #bind(MemoryStore)} has run. */
    private RecordId head;

    /**
     * Binds these revisions to {@code store} by creating the initial head.
     * <p>
     * If no head has been set yet, a node with a single empty child named
     * {@code "root"} is written through the store's writer, its record id
     * becomes the head, and the writer is flushed. If a head is already
     * present the call does nothing.
     * <p>
     * The method is synchronized so that the null check and the assignment
     * of {@code head} happen under the same monitor used by
     * {@link #getHead()} and {@link #setHead(RecordId, RecordId, Option...)};
     * this makes the bound head visible to other threads and prevents two
     * concurrent callers from both initialising it.
     *
     * @param store  the store whose writer is used to persist the initial node
     * @throws IOException  if writing or flushing the initial node fails
     */
    public synchronized void bind(MemoryStore store) throws IOException {
        if (head == null) {
            NodeBuilder builder = EMPTY_NODE.builder();
            builder.setChildNode("root", EMPTY_NODE);
            head = store.getWriter().writeNode(builder.getNodeState()).getRecordId();
            store.getWriter().flush();
        }
    }

    /**
     * Fails with an {@link IllegalStateException} when no head has been set.
     * Callers must hold this instance's monitor.
     */
    private void checkBound() {
        checkState(head != null, "Revisions not bound to a store");
    }

    /**
     * @return the current head record id
     * @throws IllegalStateException  if {@link #bind(MemoryStore)} has not been called
     */
    @Nonnull
    @Override
    public synchronized RecordId getHead() {
        checkBound();
        return head;
    }

    /**
     * Replaces the head with {@code head} if, and only if, the current head
     * equals {@code base}.
     *
     * @param base     the head the caller expects to be current
     * @param head     the new head to install
     * @param options  not consulted by this implementation
     * @return {@code true} if the head was replaced, {@code false} if the
     *         current head did not equal {@code base}
     * @throws IllegalStateException  if {@link #bind(MemoryStore)} has not been called
     */
    @Override
    public synchronized boolean setHead(
            @Nonnull RecordId base, @Nonnull RecordId head,
            @Nonnull Option... options) {
        checkBound();
        if (this.head.equals(base)) {
            this.head = head;
            return true;
        } else {
            return false;
        }
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException  always
     */
    @Override
    public boolean setHead(
            @Nonnull Function<RecordId, RecordId> newHead,
            @Nonnull Option... options) throws InterruptedException {
        throw new UnsupportedOperationException();
    }
}
