Author: mduerig
Date: Thu Jan 30 15:40:50 2014
New Revision: 1562859
URL: http://svn.apache.org/r1562859
Log:
OAK-1370: Guard against concurrent read access through the same session
Wrap iterators returned from Node into synchronized wrappers
Modified:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/NodeImpl.java
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentReadIT.java
Modified:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java?rev=1562859&r1=1562858&r2=1562859&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
(original)
+++
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
Thu Jan 30 15:40:50 2014
@@ -20,6 +20,7 @@ import static com.google.common.base.Pre
import static org.apache.jackrabbit.oak.commons.PathUtils.denotesRoot;
import java.io.IOException;
+import java.util.Iterator;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
@@ -93,6 +94,17 @@ public class SessionDelegate {
}
/**
+ * Wrap the passed {@code iterator} in an iterator that synchronizes
+ * all access to the underlying session.
+ * @param iterator iterator to synchronize
+ * @param <T>
+ * @return synchronized iterator
+ */
+ public <T> Iterator<T> sync(Iterator<T> iterator) {
+ return new SynchronizedIterator<T>(iterator);
+ }
+
+ /**
* Performs the passed {@code SessionOperation} in a safe execution context. This
* context ensures that the session is refreshed if necessary and that refreshing
* occurs before the session operation is performed and the refreshing is done only
@@ -463,4 +475,43 @@ public class SessionDelegate {
private static RepositoryException
newRepositoryException(CommitFailedException exception) {
return exception.asRepositoryException();
}
+
+ //------------------------------------------------------------< SynchronizedIterator >---
+
+ /**
+ * This iterator delegates to a backing iterator and synchronises
+ * all calls to the backing iterator on this {@code SessionDelegate}
+ * instance.
+ *
+ * @param <T>
+ */
+ private final class SynchronizedIterator<T> implements Iterator<T> {
+ private final Iterator<T> iterator;
+
+ SynchronizedIterator(Iterator<T> iterator) {
+ this.iterator = iterator;
+ }
+
+ @Override
+ public boolean hasNext() {
+ synchronized (SessionDelegate.this) {
+ return iterator.hasNext();
+ }
+ }
+
+ @Override
+ public T next() {
+ synchronized (SessionDelegate.this) {
+ return iterator.next();
+ }
+ }
+
+ @Override
+ public void remove() {
+ synchronized (SessionDelegate.this) {
+ iterator.remove();
+ }
+ }
+ }
+
}
Modified:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/NodeImpl.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/NodeImpl.java?rev=1562859&r1=1562858&r2=1562859&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/NodeImpl.java
(original)
+++
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/NodeImpl.java
Thu Jan 30 15:40:50 2014
@@ -17,6 +17,7 @@
package org.apache.jackrabbit.oak.jcr.session;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Iterators.transform;
import static com.google.common.collect.Sets.newLinkedHashSet;
import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
@@ -760,7 +761,7 @@ public class NodeImpl<T extends NodeDele
}
);
- return new PropertyIteratorAdapter(properties.iterator());
+ return new
PropertyIteratorAdapter(sessionDelegate.sync(properties.iterator()));
}
});
}
@@ -1279,25 +1280,25 @@ public class NodeImpl<T extends NodeDele
}
private Iterator<Node> nodeIterator(Iterator<NodeDelegate> childNodes) {
- return Iterators.transform(
+ return sessionDelegate.sync(transform(
childNodes,
new Function<NodeDelegate, Node>() {
@Override
public Node apply(NodeDelegate nodeDelegate) {
return new NodeImpl<NodeDelegate>(nodeDelegate,
sessionContext);
}
- });
+ }));
}
private Iterator<Property> propertyIterator(Iterator<PropertyDelegate>
properties) {
- return Iterators.transform(
+ return sessionDelegate.sync(transform(
properties,
new Function<PropertyDelegate, Property>() {
@Override
public Property apply(PropertyDelegate propertyDelegate) {
return new PropertyImpl(propertyDelegate,
sessionContext);
}
- });
+ }));
}
private void checkValidWorkspace(String workspaceName)
Modified:
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentReadIT.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentReadIT.java?rev=1562859&r1=1562858&r2=1562859&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentReadIT.java
(original)
+++
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentReadIT.java
Thu Jan 30 15:40:50 2014
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import javax.jcr.Node;
import javax.jcr.NodeIterator;
+import javax.jcr.PropertyIterator;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
@@ -35,7 +36,6 @@ import com.google.common.util.concurrent
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-import org.junit.Ignore;
import org.junit.Test;
/**
@@ -47,7 +47,6 @@ public class ConcurrentReadIT extends Ab
super(fixture);
}
- @Ignore("OAK-1370") // FIXME OAK-1370
@Test
public void concurrentNodeIteration()
throws RepositoryException, InterruptedException,
ExecutionException {
@@ -86,4 +85,42 @@ public class ConcurrentReadIT extends Ab
}
}
+ @Test
+ public void concurrentPropertyIteration()
+ throws RepositoryException, InterruptedException,
ExecutionException {
+ final Session session = createAdminSession();
+ try {
+ final Node testRoot = session.getRootNode().addNode("test-root");
+ for (int k = 0; k < 50; k++) {
+ testRoot.setProperty("p" + k, k);
+ }
+ session.save();
+
+ ListeningExecutorService executorService =
MoreExecutors.listeningDecorator(
+ Executors.newCachedThreadPool());
+
+ List<ListenableFuture<?>> futures = Lists.newArrayList();
+ for (int k = 0; k < 20; k ++) {
+ futures.add(executorService.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ for (int k = 0; k < 100000; k++) {
+ session.refresh(false);
+ PropertyIterator properties =
testRoot.getProperties();
+ properties.hasNext();
+ }
+ return null;
+ }
+ }));
+ }
+
+ // Throws ExecutionException if any of the submitted tasks failed
+ Futures.allAsList(futures).get();
+ executorService.shutdown();
+ executorService.awaitTermination(1, TimeUnit.DAYS);
+ } finally {
+ session.logout();
+ }
+ }
+
}