Author: mreutegg
Date: Thu Feb 13 15:03:00 2014
New Revision: 1567943
URL: http://svn.apache.org/r1567943
Log:
OAK-1420: ConcurrentAddIT fails on buildbot
Potential fix. With these changes ConcurrentConflictTest.concurrentUpdates()
now runs successful -> enabled.
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConcurrentConflictTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java?rev=1567943&r1=1567942&r2=1567943&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java
Thu Feb 13 15:03:00 2014
@@ -36,6 +36,8 @@ import org.apache.jackrabbit.oak.commons
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
+import static
org.apache.jackrabbit.oak.plugins.document.NodeDocument.COLLISIONS;
import static
org.apache.jackrabbit.oak.plugins.document.NodeDocument.SPLIT_CANDIDATE_THRESHOLD;
/**
@@ -243,7 +245,7 @@ public class Commit {
try {
if (newNodes.size() > 0) {
// set commit root on new nodes
- if (!store.create(Collection.NODES, newNodes)) {
+ if (!store.create(NODES, newNodes)) {
// some of the documents already exist:
// try to apply all changes one by one
for (UpdateOp op : newNodes) {
@@ -276,14 +278,41 @@ public class Commit {
if (changedNodes.size() > 0 || !commitRoot.isNew()) {
NodeDocument.setRevision(commitRoot, revision, commitValue);
opLog.add(commitRoot);
- createOrUpdateNode(store, commitRoot);
+ if (baseBranchRevision == null) {
+ // create a clone of the commitRoot in order
+ // to set isNew to false. If we get here the
+ // commitRoot document already exists and
+ // only needs an update
+ UpdateOp commit = commitRoot.clone(commitRoot.getId());
+ commit.setNew(false);
+ // only set revision on commit root when there is
+ // no collision for this commit revision
+ commit.containsMapEntry(COLLISIONS, revision, false);
+ NodeDocument before = store.findAndUpdate(NODES, commit);
+ if (before == null) {
+ String msg = "Conflicting concurrent change. " +
+ "Update operation failed: " + commitRoot;
+ throw new MicroKernelException(msg);
+ } else {
+ // if we get here the commit was successful and
+ // the commit revision is set on the commitRoot
+ // document for this commit.
+ // now check for conflicts/collisions by other commits.
+ // use original commitRoot operation with
+ // correct isNew flag.
+ checkConflicts(commitRoot, before);
+ checkSplitCandidate(before);
+ }
+ } else {
+ // this is a branch commit, do not fail on collisions now
+ // trying to merge the branch will fail later
+ createOrUpdateNode(store, commitRoot);
+ }
operations.put(commitRootPath, commitRoot);
}
} catch (MicroKernelException e) {
- rollback(newNodes, opLog);
- String msg = "Exception committing " + diff.toString();
- LOG.debug(msg, e);
- throw new MicroKernelException(msg, e);
+ rollback(newNodes, opLog, commitRoot);
+ throw e;
}
}
@@ -309,14 +338,14 @@ public class Commit {
if (op.isNew()) {
NodeDocument.setChildrenFlag(op, true);
} else {
- NodeDocument nd = store.getIfCached(Collection.NODES,
Utils.getIdFromPath(parentPath));
+ NodeDocument nd = store.getIfCached(NODES,
Utils.getIdFromPath(parentPath));
if (nd != null && nd.hasChildren()) {
continue;
}
NodeDocument.setChildrenFlag(op, true);
}
} else {
- NodeDocument nd = store.getIfCached(Collection.NODES,
Utils.getIdFromPath(parentPath));
+ NodeDocument nd = store.getIfCached(NODES,
Utils.getIdFromPath(parentPath));
if (nd != null && nd.hasChildren()) {
//Flag already set to true. Nothing to do
continue;
@@ -328,15 +357,20 @@ public class Commit {
}
}
- private void rollback(ArrayList<UpdateOp> newDocuments,
ArrayList<UpdateOp> changed) {
+ private void rollback(List<UpdateOp> newDocuments,
+ List<UpdateOp> changed,
+ UpdateOp commitRoot) {
DocumentStore store = nodeStore.getDocumentStore();
for (UpdateOp op : changed) {
UpdateOp reverse = op.getReverseOperation();
- store.createOrUpdate(Collection.NODES, reverse);
+ store.createOrUpdate(NODES, reverse);
}
for (UpdateOp op : newDocuments) {
- store.remove(Collection.NODES, op.id);
+ store.remove(NODES, op.id);
}
+ UpdateOp removeCollision = new UpdateOp(commitRoot.getId(), false);
+ NodeDocument.removeCollision(removeCollision, revision);
+ store.createOrUpdate(NODES, removeCollision);
}
/**
@@ -346,13 +380,35 @@ public class Commit {
* @param store the store
* @param op the operation
*/
- public void createOrUpdateNode(DocumentStore store, UpdateOp op) {
+ private void createOrUpdateNode(DocumentStore store, UpdateOp op) {
+ NodeDocument doc = store.createOrUpdate(NODES, op);
+ checkConflicts(op, doc);
+ checkSplitCandidate(doc);
+ }
+
+ private void checkSplitCandidate(@Nullable NodeDocument doc) {
+ if (doc != null && doc.getMemory() > SPLIT_CANDIDATE_THRESHOLD) {
+ nodeStore.addSplitCandidate(doc.getId());
+ }
+ }
+
+ /**
+ * Checks if the update operation introduced any conflicts on the given
+ * document. The document shows the state right before the operation was
+ * applied.
+ *
+ * @param op the update operation.
+ * @param before how the document looked before the update was applied or
+ * {@code null} if it didn't exist before.
+ */
+ private void checkConflicts(@Nonnull UpdateOp op,
+ @Nullable NodeDocument before) {
+ DocumentStore store = nodeStore.getDocumentStore();
collisions.clear();
- NodeDocument doc = store.createOrUpdate(Collection.NODES, op);
if (baseRevision != null) {
Revision newestRev = null;
- if (doc != null) {
- newestRev = doc.getNewestRevision(nodeStore, revision,
+ if (before != null) {
+ newestRev = before.getNewestRevision(nodeStore, revision,
new CollisionHandler() {
@Override
void concurrentModification(Revision other) {
@@ -363,19 +419,19 @@ public class Commit {
String conflictMessage = null;
if (newestRev == null) {
if (op.isDelete() || !op.isNew()) {
- conflictMessage = "The node " +
+ conflictMessage = "The node " +
op.getId() + " does not exist or is already
deleted";
}
} else {
if (op.isNew()) {
- conflictMessage = "The node " +
+ conflictMessage = "The node " +
op.getId() + " was already added in revision\n" +
newestRev;
} else if (nodeStore.isRevisionNewer(newestRev, baseRevision)
- && (op.isDelete() || isConflicting(doc, op))) {
- conflictMessage = "The node " +
+ && (op.isDelete() || isConflicting(before, op))) {
+ conflictMessage = "The node " +
op.getId() + " was changed in revision\n" +
newestRev +
- ", which was applied after the base revision\n" +
+ ", which was applied after the base revision\n" +
baseRevision;
}
}
@@ -384,10 +440,10 @@ public class Commit {
// -> check for collisions and conflict (concurrent updates
// on a node are possible if property updates do not overlap)
// TODO: unify above conflict detection and isConflicting()
- if (!collisions.isEmpty() && isConflicting(doc, op)) {
+ if (!collisions.isEmpty() && isConflicting(before, op)) {
for (Revision r : collisions) {
// mark collisions on commit root
- Collision c = new Collision(doc, r, op, revision,
nodeStore);
+ Collision c = new Collision(before, r, op, revision,
nodeStore);
if (c.mark(store).equals(revision)) {
// our revision was marked
if (baseRevision.isBranch()) {
@@ -405,16 +461,12 @@ public class Commit {
}
}
if (conflictMessage != null) {
- conflictMessage += ", before\n" + revision +
- "; document:\n" + (doc == null ? "" : doc.format()) +
+ conflictMessage += ", before\n" + revision +
+ "; document:\n" + (before == null ? "" :
before.format()) +
",\nrevision order:\n" +
nodeStore.getRevisionComparator();
throw new MicroKernelException(conflictMessage);
}
}
-
- if (doc != null && doc.getMemory() > SPLIT_CANDIDATE_THRESHOLD) {
- nodeStore.addSplitCandidate(doc.getId());
- }
}
/**
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java?rev=1567943&r1=1567942&r2=1567943&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UpdateOp.java
Thu Feb 13 15:03:00 2014
@@ -74,6 +74,10 @@ public final class UpdateOp {
public boolean isNew() {
return isNew;
}
+
+ public void setNew(boolean isNew) {
+ this.isNew = isNew;
+ }
void setDelete(boolean isDelete) {
this.isDelete = isDelete;
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConcurrentConflictTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConcurrentConflictTest.java?rev=1567943&r1=1567942&r2=1567943&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConcurrentConflictTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConcurrentConflictTest.java
Thu Feb 13 15:03:00 2014
@@ -39,7 +39,8 @@ import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
/**
- * <code>ConcurrentConflictTest</code>...
+ * Updates multiple nodes in the same commit with multiple threads and verifies
+ * the commit is atomic.
*/
public class ConcurrentConflictTest extends BaseDocumentMKTest {
@@ -47,7 +48,7 @@ public class ConcurrentConflictTest exte
private static final Logger LOG =
LoggerFactory.getLogger(ConcurrentConflictTest.class);
private static final int NUM_WRITERS = 3;
private static final int NUM_NODES = 10;
- private static final int NUM_TRANSFERS_PER_THREAD = 10;
+ private static final int NUM_TRANSFERS_PER_THREAD = 100;
private DocumentStore store;
private List<DocumentMK> kernels = new ArrayList<DocumentMK>();
private final StringBuilder logBuffer = new StringBuilder();
@@ -77,7 +78,6 @@ public class ConcurrentConflictTest exte
concurrentUpdates(true);
}
- @Ignore
@Test
public void concurrentUpdates() throws Exception {
concurrentUpdates(false);
@@ -114,7 +114,7 @@ public class ConcurrentConflictTest exte
} catch (Exception e) {
exceptions.add(e);
}
- log("conflicts: " + conflictSet);
+ log("conflicts (" + conflictSet.cardinality() + "): " +
conflictSet);
}
private boolean transfer() throws Exception {