Author: mduerig
Date: Mon Mar 24 16:32:17 2014
New Revision: 1580905

URL: http://svn.apache.org/r1580905
Log:
OAK-1601: Log warning on concurrent session access
Use an explicit lock to synchronise concurrent session access and to log a
warning whenever such concurrent access is detected

Modified:
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
    jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/operation/SessionOperation.java

Modified: 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java?rev=1580905&r1=1580904&r2=1580905&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
 Mon Mar 24 16:32:17 2014
@@ -30,6 +30,8 @@ import java.io.IOException;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
@@ -116,6 +118,14 @@ public class SessionDelegate {
     private String userData = null;
 
     /**
+     * The lock used to guarantee synchronized execution of repository
+     * operations. An explicit lock is used instead of normal Java
+     * synchronization in order to be able to log attempts to concurrently
+     * use a session.
+     */
+    private final Lock lock = new ReentrantLock();
+
+    /**
      * Create a new session delegate for a {@code ContentSession}. The refresh behaviour of the
      * session is governed by the value of the {@code refreshInterval} argument: if the session
      * has been idle longer than that value, an implicit refresh will take place.
@@ -208,8 +218,13 @@ public class SessionDelegate {
         return saveCount;
     }
 
-    public synchronized void refreshAtNextAccess() {
-        refreshAtNextAccess = true;
+    public void refreshAtNextAccess() {
+        lock.lock();
+        try {
+            refreshAtNextAccess = true;
+        } finally {
+            lock.unlock();
+        }
     }
 
     /**
@@ -235,58 +250,88 @@ public class SessionDelegate {
      * @throws RepositoryException
      * @see #getRoot()
      */
-    public synchronized <T> T perform(SessionOperation<T> sessionOperation)
+    public <T> T perform(SessionOperation<T> sessionOperation)
             throws RepositoryException {
-        // Synchronize to avoid conflicting refreshes from concurrent JCR API calls
         long t0 = clock.getTime();
-        if (sessionOpCount == 0) {
-            // Refresh and precondition checks only for non re-entrant
-            // session operations. Don't refresh if this operation is a
-            // refresh operation itself or a save operation, which does an
-            // implicit refresh, or logout for obvious reasons.
-            if (!sessionOperation.isRefresh()
-                    && !sessionOperation.isSave()
-                    && !sessionOperation.isLogout()
-                    && (refreshAtNextAccess
-                        || sessionSaveCount != getThreadSaveCount()
-                        || refreshStrategy.needsRefresh(
-                                SECONDS.convert(t0 - accessTime, 
MILLISECONDS)))) {
-                refresh(true);
-                refreshAtNextAccess = false;
-                sessionSaveCount = getThreadSaveCount();
-                updateCount++;
+
+        // Acquire the exclusive lock for accessing session internals.
+        // No other thread should be holding the lock, so we log a
+        // message to let the user know of such cases.
+        if (!lock.tryLock()) {
+            if (sessionOperation.isUpdate()) {
+                Exception trace = new Exception(
+                        "Stack trace of concurrent access to " + 
contentSession);
+                log.warn("Attempt to perform " + sessionOperation.description()
+                        + " while another thread is concurrently writing"
+                        + " to " + contentSession + ". Blocking until the 
other"
+                        + " thread is finished using this session. Please"
+                        + " review your code to avoid concurrent use of"
+                        + " a session.", trace);
+            } else if (log.isDebugEnabled()) {
+                Exception trace = new Exception(
+                        "Stack trace of concurrent access to " + 
contentSession);
+                log.warn("Attempt to perform " + sessionOperation.description()
+                        + " while another thread is concurrently reading from "
+                        + contentSession + ". Blocking until the other thread"
+                        + " is finished using this session. Please"
+                        + " review your code to avoid concurrent use of"
+                        + " a session.", trace);
             }
-            sessionOperation.checkPreconditions();
+            lock.lock();
         }
+
         try {
-            sessionOpCount++;
-            T result =  sessionOperation.perform();
-            logOperationDetails(sessionOperation);
-            return result;
-        } finally {
-            accessTime = t0;
-            long dt = NANOSECONDS.convert(clock.getTime() - t0, MILLISECONDS);
-            sessionOpCount--;
-            if (sessionOperation.isUpdate()) {
-                writeTime = t0;
-                writeCount++;
-                writeCounter.incrementAndGet();
-                writeDuration.addAndGet(dt);
-                updateCount++;
-            } else {
-                readTime = t0;
-                readCount++;
-                readCounter.incrementAndGet();
-                readDuration.addAndGet(dt);
-            }
-            if (sessionOperation.isSave()) {
-                refreshAtNextAccess = false;
-                // Force refreshing on access through other sessions on the same thread
-                threadSaveCount.set(sessionSaveCount = (getThreadSaveCount() + 
1));
-            } else if (sessionOperation.isRefresh()) {
-                refreshAtNextAccess = false;
-                sessionSaveCount = getThreadSaveCount();
+            if (sessionOpCount == 0) {
+                // Refresh and precondition checks only for non re-entrant
+                // session operations. Don't refresh if this operation is a
+                // refresh operation itself or a save operation, which does an
+                // implicit refresh, or logout for obvious reasons.
+                if (!sessionOperation.isRefresh()
+                        && !sessionOperation.isSave()
+                        && !sessionOperation.isLogout()
+                        && (refreshAtNextAccess
+                        || sessionSaveCount != getThreadSaveCount()
+                        || refreshStrategy.needsRefresh(
+                        SECONDS.convert(t0 - accessTime, MILLISECONDS)))) {
+                    refresh(true);
+                    refreshAtNextAccess = false;
+                    sessionSaveCount = getThreadSaveCount();
+                    updateCount++;
+                }
+                sessionOperation.checkPreconditions();
+            }
+            try {
+                sessionOpCount++;
+                T result = sessionOperation.perform();
+                logOperationDetails(contentSession, sessionOperation);
+                return result;
+            } finally {
+                accessTime = t0;
+                long dt = NANOSECONDS.convert(clock.getTime() - t0, 
MILLISECONDS);
+                sessionOpCount--;
+                if (sessionOperation.isUpdate()) {
+                    writeTime = t0;
+                    writeCount++;
+                    writeCounter.incrementAndGet();
+                    writeDuration.addAndGet(dt);
+                    updateCount++;
+                } else {
+                    readTime = t0;
+                    readCount++;
+                    readCounter.incrementAndGet();
+                    readDuration.addAndGet(dt);
+                }
+                if (sessionOperation.isSave()) {
+                    refreshAtNextAccess = false;
+                    // Force refreshing on access through other sessions on the same thread
+                    threadSaveCount.set(sessionSaveCount = 
(getThreadSaveCount() + 1));
+                } else if (sessionOperation.isRefresh()) {
+                    refreshAtNextAccess = false;
+                    sessionSaveCount = getThreadSaveCount();
+                }
             }
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -594,12 +639,14 @@ public class SessionDelegate {
 
     //------------------------------------------------------------< internal >---
 
-    private <T> void logOperationDetails(SessionOperation<T> ops)  throws 
RepositoryException {
-        if(operationLogger.isDebugEnabled()){
+    private static <T> void logOperationDetails(ContentSession session, 
SessionOperation<T> ops)
+            throws RepositoryException {
+        if (operationLogger.isDebugEnabled()){
             String desc = ops.description();
-            if(desc != null){
-                Marker sessionMarker = 
MarkerFactory.getMarker(this.toString());
-                operationLogger.debug(sessionMarker,String.format("[%s] 
%s",toString(),desc));
+            if (desc != null){
+                Marker sessionMarker = 
MarkerFactory.getMarker(session.toString());
+                String sessionId = session.toString();
+                operationLogger.debug(sessionMarker,String.format("[%s] %s", 
sessionId, desc));
             }
         }
     }
@@ -635,22 +682,31 @@ public class SessionDelegate {
 
         @Override
         public boolean hasNext() {
-            synchronized (SessionDelegate.this) {
+            lock.lock();
+            try {
                 return iterator.hasNext();
+            } finally {
+                lock.unlock();
             }
         }
 
         @Override
         public T next() {
-            synchronized (SessionDelegate.this) {
+            lock.lock();
+            try {
                 return iterator.next();
+            } finally {
+                lock.unlock();
             }
         }
 
         @Override
         public void remove() {
-            synchronized (SessionDelegate.this) {
+            lock.lock();
+            try {
                 iterator.remove();
+            } finally {
+                lock.unlock();
             }
         }
     }

Modified: 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/operation/SessionOperation.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/operation/SessionOperation.java?rev=1580905&r1=1580904&r2=1580905&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/operation/SessionOperation.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/operation/SessionOperation.java
 Mon Mar 24 16:32:17 2014
@@ -68,7 +68,7 @@ public abstract class SessionOperation<T
     /**
      * Provide details about the operation being performed
      *
-     * @return operation description. Would return <code>null</code>
+     * @return operation description. Would return {@code null}
      * if no description provided
      */
     public String description() throws RepositoryException{


Reply via email to