Author: mreutegg
Date: Wed Mar 5 08:03:55 2014
New Revision: 1574391
URL: http://svn.apache.org/r1574391
Log:
OAK-1429: Slow event listeners do not scale as expected
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java?rev=1574391&r1=1574390&r2=1574391&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
Wed Mar 5 08:03:55 2014
@@ -103,8 +103,8 @@ public class NodeDocument extends Docume
*/
public static final String MODIFIED = "_modified";
- private static final SortedMap<Revision, Range> EMPTY_RANGE_MAP =
- Collections.unmodifiableSortedMap(new TreeMap<Revision, Range>());
+ private static final NavigableMap<Revision, Range> EMPTY_RANGE_MAP =
+ Maps.unmodifiableNavigableMap(new TreeMap<Revision, Range>());
/**
* The list of revision to root commit depth mappings to find out if a
@@ -170,7 +170,7 @@ public class NodeDocument extends Docume
/**
* Parsed and sorted set of previous revisions.
*/
- private SortedMap<Revision, Range> previous;
+ private NavigableMap<Revision, Range> previous;
/**
* Time at which this object was check for cache consistency
@@ -358,9 +358,17 @@ public class NodeDocument extends Docume
*/
@CheckForNull
public String getCommitRootPath(Revision revision) {
- Map<Revision, String> valueMap = getCommitRoot();
- String depth = valueMap.get(revision);
+ // check local map first
+ Map<Revision, String> local = getLocalCommitRoot();
+ String depth = local.get(revision);
+ if (depth == null) {
+ // check full map
+ depth = getCommitRoot().get(revision);
+ }
if (depth != null) {
+ if (depth.equals("0")) {
+ return "/";
+ }
String p = Utils.getPathFromId(getId());
return PathUtils.getAncestorPath(p,
PathUtils.getDepth(p) - Integer.parseInt(depth));
@@ -392,7 +400,7 @@ public class NodeDocument extends Docume
Arrays.asList(revisions.keySet(), commitRoots.keySet()),
revisions.comparator())) {
if (!r.equals(changeRev)) {
- if (isValidRevision(context, r, changeRev, new
HashSet<Revision>())) {
+ if (isValidRevision(context, r, null, changeRev, new
HashSet<Revision>())) {
newestRev = r;
// found newest revision, no need to check more revisions
// revisions are sorted newest first
@@ -426,6 +434,8 @@ public class NodeDocument extends Docume
* history of <code>readRevision</code>.
*
* @param rev revision to check.
+ * @param commitValue the commit value of the revision to check or
+ * <code>null</code> if unknown.
* @param readRevision the read revision of the client.
* @param validRevisions set of revisions already checked against
* <code>readRevision</code> and considered valid.
@@ -434,6 +444,7 @@ public class NodeDocument extends Docume
*/
boolean isValidRevision(@Nonnull RevisionContext context,
@Nonnull Revision rev,
+ @Nullable String commitValue,
@Nonnull Revision readRevision,
@Nonnull Set<Revision> validRevisions) {
if (validRevisions.contains(rev)) {
@@ -443,7 +454,7 @@ public class NodeDocument extends Docume
if (doc == null) {
return false;
}
- if (doc.isCommitted(context, rev, readRevision)) {
+ if (doc.isCommitted(context, rev, commitValue, readRevision)) {
validRevisions.add(rev);
return true;
}
@@ -727,20 +738,20 @@ public class NodeDocument extends Docume
* @return the previous ranges for this document.
*/
@Nonnull
- SortedMap<Revision, Range> getPreviousRanges() {
+ NavigableMap<Revision, Range> getPreviousRanges() {
if (previous == null) {
Map<Revision, String> map = getLocalMap(PREVIOUS);
if (map.isEmpty()) {
previous = EMPTY_RANGE_MAP;
} else {
- SortedMap<Revision, Range> transformed = new TreeMap<Revision,
Range>(
+ NavigableMap<Revision, Range> transformed = new
TreeMap<Revision, Range>(
StableRevisionComparator.REVERSE);
for (Map.Entry<Revision, String> entry : map.entrySet()) {
Revision high = entry.getKey();
Revision low = Revision.fromString(entry.getValue());
transformed.put(high, new Range(high, low));
}
- previous = Collections.unmodifiableSortedMap(transformed);
+ previous = Maps.unmodifiableNavigableMap(transformed);
}
}
return previous;
@@ -766,7 +777,22 @@ public class NodeDocument extends Docume
if (revision == null) {
return new PropertyHistory(store, this, property);
} else {
- return filter(transform(getPreviousRanges().entrySet(),
+ // first try to lookup revision directly
+ Revision r = getPreviousRanges().floorKey(revision);
+ if (r != null) {
+ String prevId = Utils.getPreviousIdFor(getId(), r);
+ NodeDocument prev = store.find(Collection.NODES, prevId);
+ if (prev != null) {
+ if (prev.getValueMap(property).containsKey(revision)) {
+ return Collections.singleton(prev);
+ }
+ } else {
+ LOG.warn("Document with previous revisions not found: " +
prevId);
+ }
+ }
+
+ // didn't find entry -> scan through remaining head ranges
+ return
filter(transform(getPreviousRanges().headMap(revision).entrySet(),
new Function<Map.Entry<Revision, Range>, NodeDocument>() {
@Override
public NodeDocument apply(Map.Entry<Revision, Range> input) {
@@ -955,44 +981,49 @@ public class NodeDocument extends Docume
* <code>readRevision</code>.
*
* @param revision the revision to check.
+ * @param commitValue the commit value of the revision to check or
+ * <code>null</code> if unknown.
* @param readRevision the read revision.
* @return <code>true</code> if the revision is committed, otherwise
* <code>false</code>.
*/
private boolean isCommitted(@Nonnull RevisionContext context,
@Nonnull Revision revision,
+ @Nullable String commitValue,
@Nonnull Revision readRevision) {
if (revision.equalsIgnoreBranch(readRevision)) {
return true;
}
- String value = getCommitValue(revision);
- if (value == null) {
+ if (commitValue == null) {
+ commitValue = getCommitValue(revision);
+ }
+ if (commitValue == null) {
return false;
}
- if (Utils.isCommitted(value)) {
+ if (Utils.isCommitted(commitValue)) {
if (context.getBranches().getBranch(readRevision) == null
&& !readRevision.isBranch()) {
// resolve commit revision
- revision = Utils.resolveCommitRevision(revision, value);
+ revision = Utils.resolveCommitRevision(revision, commitValue);
// readRevision is not from a branch
// compare resolved revision as is
return !isRevisionNewer(context, revision, readRevision);
} else {
// on same merged branch?
- if
(value.equals(getCommitValue(readRevision.asTrunkRevision()))) {
+ if
(commitValue.equals(getCommitValue(readRevision.asTrunkRevision()))) {
// compare unresolved revision
return !isRevisionNewer(context, revision, readRevision);
}
}
} else {
// branch commit (not merged)
- if (Revision.fromString(value).getClusterId() !=
context.getClusterId()) {
+ if (Revision.fromString(commitValue).getClusterId() !=
context.getClusterId()) {
// this is an unmerged branch commit from another cluster node,
// hence never visible to us
return false;
}
}
- return includeRevision(context, Utils.resolveCommitRevision(revision,
value), readRevision);
+ return includeRevision(context, Utils.resolveCommitRevision(revision,
commitValue), readRevision);
}
/**
@@ -1077,7 +1108,7 @@ public class NodeDocument extends Docume
Utils.resolveCommitRevision(propRev, commitValue))) {
continue;
}
- if (isValidRevision(context, propRev, readRevision,
validRevisions)) {
+ if (isValidRevision(context, propRev, commitValue, readRevision,
validRevisions)) {
// TODO: need to check older revisions as well?
latestRev = Utils.resolveCommitRevision(propRev, commitValue);
value = entry.getValue();