Author: mduerig
Date: Thu Jan 30 15:40:50 2014
New Revision: 1562859

URL: http://svn.apache.org/r1562859
Log:
OAK-1370: Guard against concurrent read access through the same session
Wrap iterators returned from Node into synchronized wrappers

Modified:
    
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
    
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/NodeImpl.java
    
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentReadIT.java

Modified: 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java?rev=1562859&r1=1562858&r2=1562859&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
 Thu Jan 30 15:40:50 2014
@@ -20,6 +20,7 @@ import static com.google.common.base.Pre
 import static org.apache.jackrabbit.oak.commons.PathUtils.denotesRoot;
 
 import java.io.IOException;
+import java.util.Iterator;
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
@@ -93,6 +94,17 @@ public class SessionDelegate {
     }
 
     /**
+     * Wrap the passed {@code iterator} in an iterator that synchronizes
+     * all access to the underlying session.
+     * @param iterator  iterator to synchronized
+     * @param <T>
+     * @return  synchronized iterator
+     */
+    public <T> Iterator<T> sync(Iterator<T> iterator) {
+        return new SynchronizedIterator<T>(iterator);
+    }
+
+    /**
      * Performs the passed {@code SessionOperation} in a safe execution 
context. This
      * context ensures that the session is refreshed if necessary and that 
refreshing
      * occurs before the session operation is performed and the refreshing is 
done only
@@ -463,4 +475,43 @@ public class SessionDelegate {
     private static RepositoryException 
newRepositoryException(CommitFailedException exception) {
         return exception.asRepositoryException();
     }
+
+    //------------------------------------------------------------< 
SynchronizedIterator >---
+
+    /**
+     * This iterator delegates to a backing iterator and synchronises
+     * all calls to the backing iterator on this {@code SessionDelegate}
+     * instance.
+     *
+     * @param <T>
+     */
+    private final class SynchronizedIterator<T> implements Iterator<T> {
+        private final Iterator<T> iterator;
+
+        SynchronizedIterator(Iterator<T> iterator) {
+            this.iterator = iterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            synchronized (SessionDelegate.this) {
+                return iterator.hasNext();
+            }
+        }
+
+        @Override
+        public T next() {
+            synchronized (SessionDelegate.this) {
+                return iterator.next();
+            }
+        }
+
+        @Override
+        public void remove() {
+            synchronized (SessionDelegate.this) {
+                iterator.remove();
+            }
+        }
+    }
+
 }

Modified: 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/NodeImpl.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/NodeImpl.java?rev=1562859&r1=1562858&r2=1562859&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/NodeImpl.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/NodeImpl.java
 Thu Jan 30 15:40:50 2014
@@ -17,6 +17,7 @@
 package org.apache.jackrabbit.oak.jcr.session;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Iterators.transform;
 import static com.google.common.collect.Sets.newLinkedHashSet;
 import static java.util.Arrays.asList;
 import static java.util.Collections.singleton;
@@ -760,7 +761,7 @@ public class NodeImpl<T extends NodeDele
                         }
                 );
 
-                return new PropertyIteratorAdapter(properties.iterator());
+                return new 
PropertyIteratorAdapter(sessionDelegate.sync(properties.iterator()));
             }
         });
     }
@@ -1279,25 +1280,25 @@ public class NodeImpl<T extends NodeDele
     }
 
     private Iterator<Node> nodeIterator(Iterator<NodeDelegate> childNodes) {
-        return Iterators.transform(
+        return sessionDelegate.sync(transform(
                 childNodes,
                 new Function<NodeDelegate, Node>() {
                     @Override
                     public Node apply(NodeDelegate nodeDelegate) {
                         return new NodeImpl<NodeDelegate>(nodeDelegate, 
sessionContext);
                     }
-                });
+                }));
     }
 
     private Iterator<Property> propertyIterator(Iterator<PropertyDelegate> 
properties) {
-        return Iterators.transform(
+        return sessionDelegate.sync(transform(
                 properties,
                 new Function<PropertyDelegate, Property>() {
                     @Override
                     public Property apply(PropertyDelegate propertyDelegate) {
                         return new PropertyImpl(propertyDelegate, 
sessionContext);
                     }
-                });
+                }));
     }
 
     private void checkValidWorkspace(String workspaceName)

Modified: 
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentReadIT.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentReadIT.java?rev=1562859&r1=1562858&r2=1562859&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentReadIT.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentReadIT.java
 Thu Jan 30 15:40:50 2014
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 
 import javax.jcr.Node;
 import javax.jcr.NodeIterator;
+import javax.jcr.PropertyIterator;
 import javax.jcr.RepositoryException;
 import javax.jcr.Session;
 
@@ -35,7 +36,6 @@ import com.google.common.util.concurrent
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import org.junit.Ignore;
 import org.junit.Test;
 
 /**
@@ -47,7 +47,6 @@ public class ConcurrentReadIT extends Ab
         super(fixture);
     }
 
-    @Ignore("OAK-1370")  // FIXME OAK-1370
     @Test
     public void concurrentNodeIteration()
             throws RepositoryException, InterruptedException, 
ExecutionException {
@@ -86,4 +85,42 @@ public class ConcurrentReadIT extends Ab
         }
     }
 
+    @Test
+    public void concurrentPropertyIteration()
+            throws RepositoryException, InterruptedException, 
ExecutionException {
+        final Session session = createAdminSession();
+        try {
+            final Node testRoot = session.getRootNode().addNode("test-root");
+            for (int k = 0; k < 50; k++) {
+                testRoot.setProperty("p" + k, k);
+            }
+            session.save();
+
+            ListeningExecutorService executorService = 
MoreExecutors.listeningDecorator(
+                    Executors.newCachedThreadPool());
+
+            List<ListenableFuture<?>> futures = Lists.newArrayList();
+            for (int k = 0; k < 20; k ++) {
+                futures.add(executorService.submit(new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        for (int k = 0; k < 100000; k++) {
+                            session.refresh(false);
+                            PropertyIterator properties = 
testRoot.getProperties();
+                            properties.hasNext();
+                        }
+                        return null;
+                    }
+                }));
+            }
+
+            // Throws ExecutionException if any of the submitted task failed
+            Futures.allAsList(futures).get();
+            executorService.shutdown();
+            executorService.awaitTermination(1, TimeUnit.DAYS);
+        } finally {
+            session.logout();
+        }
+    }
+
 }


Reply via email to