Author: chetanm
Date: Mon Mar 24 06:19:23 2014
New Revision: 1580769
URL: http://svn.apache.org/r1580769
Log:
OAK-1586 - Implement checkpoint support in DocumentNodeStore
Initial implementation which stores the checkpoint data as part of NODES
collection
-- Using Clock for determining current time to simplify testing
-- Custom Clock can be specified via Builder
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Checkpoints.java
(with props)
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CheckpointsTest.java
(with props)
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Checkpoints.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Checkpoints.java?rev=1580769&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Checkpoints.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Checkpoints.java
Mon Mar 24 06:19:23 2014
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.Map;
+import java.util.SortedMap;
+
+import javax.annotation.CheckForNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
+
+/**
+ * Checkpoints provide details around which revision are to be kept. Currently
these
+ * are stored in NODES collection itself.
+ */
+class Checkpoints {
+ /**
+ * Id of checkpoint document. It differs from normal convention of ID used
for NodeDocument
+ * which back JCR Nodes as it is internal to DocumentNodeStore
+ */
+ private static final String ID = "/checkpoint";
+
+ /**
+ * Property name to store all checkpoint data. The data is stored as
Revision => expiryTime
+ */
+ private static final String PROP_CHECKPOINT = "checkpoint";
+
+ private final DocumentNodeStore nodeStore;
+
+ private final DocumentStore store;
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ Checkpoints(DocumentNodeStore store) {
+ this.nodeStore = store;
+ this.store = store.getDocumentStore();
+ createIfNotExist();
+ }
+
+ public Revision create(long lifetimeInMillis) {
+ Revision r = nodeStore.getHeadRevision();
+ UpdateOp op = new UpdateOp(ID, false);
+ long endTime = nodeStore.getClock().getTime() + lifetimeInMillis;
+ op.setMapEntry(PROP_CHECKPOINT, r, Long.toString(endTime));
+ store.createOrUpdate(NODES, op);
+ return r;
+ }
+
+
+ /**
+ * Returns the oldest valid checkpoint registered.
+ *
+ * @return oldest valid checkpoint registered. Might return null if no
valid
+ * checkpoint found
+ */
+ @CheckForNull
+ public Revision getOldestRevisionToKeep() {
+ //Get uncached doc
+ NodeDocument cdoc = store.find(NODES, ID, 0);
+ SortedMap<Revision, String> checkpoints =
cdoc.getLocalMap(PROP_CHECKPOINT);
+
+ final long currentTime = nodeStore.getClock().getTime();
+ UpdateOp op = new UpdateOp(ID, false);
+ Revision lastAliveRevision = null;
+ long oldestExpiryTime = 0;
+
+ for (Map.Entry<Revision, String> e : checkpoints.entrySet()) {
+ final long expiryTime = Long.parseLong(e.getValue());
+ if (currentTime > expiryTime) {
+ op.removeMapEntry(PROP_CHECKPOINT, e.getKey());
+ } else if (expiryTime > oldestExpiryTime) {
+ oldestExpiryTime = expiryTime;
+ lastAliveRevision = e.getKey();
+ }
+ }
+
+ if (op.hasChanges()) {
+ store.findAndUpdate(NODES, op);
+ log.info("Purged {} expired checkpoints", op.getChanges().size());
+ }
+
+ return lastAliveRevision;
+ }
+
+ private void createIfNotExist() {
+ if (store.find(NODES, ID) == null) {
+ UpdateOp updateOp = new UpdateOp(ID, true);
+ store.createOrUpdate(NODES, updateOp);
+ }
+ }
+}
Propchange:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Checkpoints.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1580769&r1=1580768&r2=1580769&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
Mon Mar 24 06:19:23 2014
@@ -45,6 +45,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.rdb.RDBBlobStore;
import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore;
+import org.apache.jackrabbit.oak.stats.Clock;
/**
* A MicroKernel implementation that stores the data in a {@link
DocumentStore}.
@@ -460,6 +461,7 @@ public class DocumentMK implements Micro
private boolean useSimpleRevision;
private long splitDocumentAgeMillis = 5 * 60 * 1000;
private long offHeapCacheSize = -1;
+ private Clock clock = Clock.SIMPLE;
public Builder() {
memoryCacheSize(DEFAULT_MEMORY_CACHE_SIZE);
@@ -717,6 +719,15 @@ public class DocumentMK implements Micro
return this;
}
+ public Builder clock(Clock clock){
+ this.clock = clock;
+ return this;
+ }
+
+ public Clock getClock(){
+ return clock;
+ }
+
/**
* Open the DocumentMK instance using the configured options.
*
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1580769&r1=1580768&r2=1580769&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
Mon Mar 24 06:19:23 2014
@@ -80,6 +80,7 @@ import org.apache.jackrabbit.oak.spi.com
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.stats.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -277,6 +278,10 @@ public final class DocumentNodeStore
}
};
+ private final Clock clock;
+
+ private final Checkpoints checkpoints;
+
public DocumentNodeStore(DocumentMK.Builder builder) {
this.blobStore = builder.getBlobStore();
if (builder.isUseSimpleRevision()) {
@@ -290,6 +295,7 @@ public final class DocumentNodeStore
s = new LoggingDocumentStoreWrapper(s);
}
this.store = s;
+ this.clock = builder.getClock();
int cid = builder.getClusterId();
cid = Integer.getInteger("oak.documentMK.clusterId", cid);
if (cid == 0) {
@@ -327,6 +333,7 @@ public final class DocumentNodeStore
builder.getWeigher(), builder.getDocChildrenCacheSize());
diffCache = builder.getDiffCache();
+ checkpoints = new Checkpoints(this);
// check if root node exists
if (store.find(Collection.NODES, Utils.getIdFromPath("/")) == null) {
@@ -352,6 +359,7 @@ public final class DocumentNodeStore
}
}
getRevisionComparator().add(headRevision, Revision.newRevision(0));
+
dispatcher = new ChangeDispatcher(getRoot());
commitQueue = new CommitQueue(this, dispatcher);
batchCommitQueue = new BatchCommitQueue(store, revisionComparator);
@@ -1224,9 +1232,7 @@ public final class DocumentNodeStore
@Nonnull
@Override
public String checkpoint(long lifetime) {
- // FIXME: need to signal to the garbage collector that this revision
- // should not be collected until the requested lifetime is over
- return getHeadRevision().toString();
+ return checkpoints.create(lifetime).toString();
}
@CheckForNull
@@ -1370,7 +1376,7 @@ public final class DocumentNodeStore
NodeDocument after = store.find(Collection.NODES,
op.getId());
if (after != null) {
LOG.info("Split operation on {}. Size before: {},
after: {}",
- new Object[]{id, before.getMemory(),
after.getMemory()});
+ id, before.getMemory(), after.getMemory());
}
}
}
@@ -1631,4 +1637,12 @@ public final class DocumentNodeStore
public DiffCache getDiffCache() {
return diffCache;
}
+
+ public Clock getClock() {
+ return clock;
+ }
+
+ public Checkpoints getCheckpoints() {
+ return checkpoints;
+ }
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java?rev=1580769&r1=1580768&r2=1580769&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java
Mon Mar 24 06:19:23 2014
@@ -113,6 +113,16 @@ public final class UpdateOp {
}
/**
+ * Checks if the UpdateOp has any change operation is registered with
+ * current update operation
+ *
+ * @return true if any change operation is created
+ */
+ public boolean hasChanges(){
+ return !changes.isEmpty();
+ }
+
+ /**
* Add a new or update an existing map entry.
* The property is a map of revisions / values.
*
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CheckpointsTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CheckpointsTest.java?rev=1580769&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CheckpointsTest.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CheckpointsTest.java
Mon Mar 24 06:19:23 2014
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+
+public class CheckpointsTest {
+
+ private final SimulatedClock clock = new SimulatedClock();
+
+ private final DocumentNodeStore store = new
DocumentMK.Builder().clock(clock).getNodeStore();
+
+ @Test
+ public void testCheckpointPurge() throws Exception {
+ long expiryTime = 1000;
+ Revision r1 = Revision.fromString(store.checkpoint(expiryTime));
+ assertEquals(r1, store.getCheckpoints().getOldestRevisionToKeep());
+
+ //Trigger expiry by forwarding the clock to future
+ clock.forwardPast(expiryTime);
+ assertNull(store.getCheckpoints().getOldestRevisionToKeep());
+ }
+
+ @Test
+ public void testGetOldestRevisionToKeep() throws Exception {
+ long et1 = 1000, et2 = et1 + 1000;
+
+ Revision r1 = Revision.fromString(store.checkpoint(et1));
+
+ //Do some commit to change headRevision
+ NodeBuilder b2 = store.getRoot().builder();
+ b2.child("x");
+ store.merge(b2, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ Revision r2 = Revision.fromString(store.checkpoint(et2));
+ assertNotSame(r1, r2);
+
+ //r2 has the later expiry
+ assertEquals(r2, store.getCheckpoints().getOldestRevisionToKeep());
+
+ //Trigger expiry by forwarding the clock to future e1
+ clock.forwardPast(et1);
+ assertEquals(r2, store.getCheckpoints().getOldestRevisionToKeep());
+
+ //Trigger expiry by forwarding the clock to future e2
+ //This time no valid checkpoint
+ clock.forwardPast(et2);
+ assertNull(store.getCheckpoints().getOldestRevisionToKeep());
+ }
+
+ private static class SimulatedClock extends Clock {
+ private final AtomicLong time = new AtomicLong();
+
+ @Override
+ public long getTime() {
+ return time.incrementAndGet();
+ }
+
+ /**
+ * Fast forwards the clock to some future time past the given time
+ */
+ public void forwardPast(long futureTime) {
+ time.set(futureTime + 100);
+ }
+ }
+}
Propchange:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CheckpointsTest.java
------------------------------------------------------------------------------
svn:eol-style = native