Author: mreutegg
Date: Mon Oct 26 14:25:54 2015
New Revision: 1710606
URL: http://svn.apache.org/viewvc?rev=1710606&view=rev
Log:
OAK-3549: Initial read of _lastRev creates incorrect RevisionComparator
Implement fix and enable test
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeRestartTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1710606&r1=1710605&r2=1710606&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
Mon Oct 26 14:25:54 2015
@@ -127,7 +127,7 @@ public class DocumentMK {
}
void backgroundRead() {
- nodeStore.backgroundRead(true);
+ nodeStore.backgroundRead();
}
void backgroundWrite() {
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1710606&r1=1710605&r2=1710606&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
Mon Oct 26 14:25:54 2015
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.plugin
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.toArray;
import static com.google.common.collect.Iterables.transform;
@@ -487,27 +488,29 @@ public final class DocumentNodeStore
setHeadRevision(commit.getRevision());
// make sure _lastRev is written back to store
backgroundWrite();
- } else {
- // initialize branchCommits
- branches.init(store, this);
- // initial reading of the revisions of other cluster nodes
- backgroundRead(false);
- if (headRevision == null) {
- // no revision read from other cluster nodes
- setHeadRevision(newRevision());
+ rootDoc = store.find(NODES, Utils.getIdFromPath("/"));
+ // at this point the root document must exist
+ if (rootDoc == null) {
+ throw new IllegalStateException("Root document does not
exist");
}
+ } else {
+ checkLastRevRecovery();
+ initializeHeadRevision(rootDoc);
// check if _lastRev for our clusterId exists
if (!rootDoc.getLastRev().containsKey(clusterId)) {
unsavedLastRevisions.put("/", headRevision);
backgroundWrite();
}
}
- checkLastRevRecovery();
+
// Renew the lease because it may have been stale
renewClusterIdLease();
getRevisionComparator().add(headRevision, Revision.newRevision(0));
+ // initialize branchCommits
+ branches.init(store, this);
+
dispatcher = new ChangeDispatcher(getRoot());
commitQueue = new CommitQueue(this);
String threadNamePostfix = "(" + clusterId + ")";
@@ -1693,7 +1696,7 @@ public final class DocumentNodeStore
synchronized (backgroundReadMonitor) {
long start = clock.getTime();
// pull in changes from other cluster nodes
- BackgroundReadStats readStats = backgroundRead(true);
+ BackgroundReadStats readStats = backgroundRead();
long readTime = clock.getTime() - start;
String msg = "Background read operations stats (read:{} {})";
if (clock.getTime() - start > TimeUnit.SECONDS.toMillis(10)) {
@@ -1762,11 +1765,8 @@ public final class DocumentNodeStore
/**
* Perform a background read and make external changes visible.
- *
- * @param dispatchChange whether to dispatch external changes
- * to {@link #dispatcher}.
*/
- BackgroundReadStats backgroundRead(boolean dispatchChange) {
+ BackgroundReadStats backgroundRead() {
BackgroundReadStats stats = new BackgroundReadStats();
long time = clock.getTime();
String id = Utils.getIdFromPath("/");
@@ -1774,30 +1774,7 @@ public final class DocumentNodeStore
if (doc == null) {
return stats;
}
- Map<Integer, Revision> lastRevMap = doc.getLastRev();
- try {
- long externalTime =
Utils.getMaxExternalTimestamp(lastRevMap.values(), clusterId);
- long localTime = clock.getTime();
- if (localTime < externalTime) {
- LOG.warn("Detected clock differences. Local time is '{}', " +
- "while most recent external time is '{}'. " +
- "Current _lastRev entries: {}",
- new Date(localTime), new Date(externalTime),
lastRevMap.values());
- double delay = ((double) externalTime - localTime) / 1000d;
- String msg = String.format("Background read will be delayed by
%.1f seconds. " +
- "Please check system time on cluster nodes.", delay);
- LOG.warn(msg);
- clock.waitUntil(externalTime + 1);
- } else if (localTime == externalTime) {
- // make sure local time is past external time
- // but only log at debug
- LOG.debug("Local and external time are equal. Waiting until
local" +
- "time is more recent than external reported time.");
- clock.waitUntil(externalTime + 1);
- }
- } catch (InterruptedException e) {
- throw new RuntimeException("Background read interrupted", e);
- }
+ alignWithExternalRevisions(doc);
Revision.RevisionComparator revisionComparator =
getRevisionComparator();
// the (old) head occurred first
@@ -1807,6 +1784,7 @@ public final class DocumentNodeStore
StringSort externalSort = JournalEntry.newSorter();
+ Map<Integer, Revision> lastRevMap = doc.getLastRev();
try {
Map<Revision, Revision> externalChanges = Maps.newHashMap();
for (Map.Entry<Integer, Revision> e : lastRevMap.entrySet()) {
@@ -1899,23 +1877,21 @@ public final class DocumentNodeStore
Revision oldHead = headRevision;
// the new head revision is after other revisions
setHeadRevision(newRevision());
- if (dispatchChange) {
- commitQueue.headRevisionChanged();
- time = clock.getTime();
- if (externalSort != null) {
- // then there were external changes and reading
them
- // was successful -> apply them to the diff cache
- try {
- JournalEntry.applyTo(externalSort, diffCache,
oldHead, headRevision);
- } catch (Exception e1) {
- LOG.error("backgroundRead: Exception while
processing external changes from journal: {}", e1, e1);
- }
+ commitQueue.headRevisionChanged();
+ time = clock.getTime();
+ if (externalSort != null) {
+ // then there were external changes and reading them
+ // was successful -> apply them to the diff cache
+ try {
+ JournalEntry.applyTo(externalSort, diffCache,
oldHead, headRevision);
+ } catch (Exception e1) {
+ LOG.error("backgroundRead: Exception while
processing external changes from journal: {}", e1, e1);
}
- stats.populateDiffCache = clock.getTime() - time;
- time = clock.getTime();
-
-
dispatcher.contentChanged(getRoot().fromExternalChange(), null);
}
+ stats.populateDiffCache = clock.getTime() - time;
+ time = clock.getTime();
+
+ dispatcher.contentChanged(getRoot().fromExternalChange(),
null);
} finally {
backgroundOperationLock.writeLock().unlock();
}
@@ -2054,6 +2030,68 @@ public final class DocumentNodeStore
//-----------------------------< internal
>---------------------------------
+ /**
+ * Performs an initial read of the _lastRevs on the root document,
+ * initializes the {@link #revisionComparator} and sets the head revision.
+ *
+ * @param rootDoc the current root document.
+ */
+ private void initializeHeadRevision(NodeDocument rootDoc) {
+ checkState(headRevision == null);
+
+ alignWithExternalRevisions(rootDoc);
+ Map<Integer, Revision> lastRevMap = rootDoc.getLastRev();
+ Revision seenAt = Revision.newRevision(0);
+ long purgeMillis = revisionPurgeMillis();
+ for (Map.Entry<Integer, Revision> entry : lastRevMap.entrySet()) {
+ Revision r = entry.getValue();
+ if (r.getTimestamp() > purgeMillis) {
+ revisionComparator.add(r, seenAt);
+ }
+ if (entry.getKey() == clusterId) {
+ continue;
+ }
+ lastKnownRevision.put(entry.getKey(), entry.getValue());
+ }
+ revisionComparator.purge(purgeMillis);
+ setHeadRevision(newRevision());
+ }
+
+ /**
+ * Makes sure the current time is after the most recent external revision
+ * timestamp in the _lastRev map of the given root document. If necessary
+ * the current thread waits until {@link #clock} is after the external
+ * revision timestamp.
+ *
+ * @param rootDoc the root document.
+ */
+ private void alignWithExternalRevisions(@Nonnull NodeDocument rootDoc) {
+ Map<Integer, Revision> lastRevMap = checkNotNull(rootDoc).getLastRev();
+ try {
+ long externalTime =
Utils.getMaxExternalTimestamp(lastRevMap.values(), clusterId);
+ long localTime = clock.getTime();
+ if (localTime < externalTime) {
+ LOG.warn("Detected clock differences. Local time is '{}', " +
+ "while most recent external time is '{}'. " +
+ "Current _lastRev entries: {}",
+ new Date(localTime), new Date(externalTime),
lastRevMap.values());
+ double delay = ((double) externalTime - localTime) / 1000d;
+ String msg = String.format("Background read will be delayed by
%.1f seconds. " +
+ "Please check system time on cluster nodes.", delay);
+ LOG.warn(msg);
+ clock.waitUntil(externalTime + 1);
+ } else if (localTime == externalTime) {
+ // make sure local time is past external time
+ // but only log at debug
+ LOG.debug("Local and external time are equal. Waiting until
local" +
+ "time is more recent than external reported time.");
+ clock.waitUntil(externalTime + 1);
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Background read interrupted", e);
+ }
+ }
+
@Nonnull
private Commit newTrunkCommit(@Nonnull Revision base) {
checkArgument(!checkNotNull(base).isBranch(),
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeRestartTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeRestartTest.java?rev=1710606&r1=1710605&r2=1710606&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeRestartTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeRestartTest.java
Mon Oct 26 14:25:54 2015
@@ -22,7 +22,6 @@ import org.apache.jackrabbit.oak.spi.com
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
@@ -34,7 +33,6 @@ public class ClusterNodeRestartTest {
public DocumentMKBuilderProvider builderProvider = new
DocumentMKBuilderProvider();
// OAK-3549
- @Ignore
@Test
public void restart() throws Exception {
MemoryDocumentStore docStore = new MemoryDocumentStore();