Author: mduerig
Date: Mon Sep 2 12:55:55 2013
New Revision: 1519422
URL: http://svn.apache.org/r1519422
Log:
OAK-960 Enable session refresh state coordination between multiple session in
single thread
Slice RefreshManager into a composite of RefreshStrategy
Added:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RefreshStrategy.java
(with props)
Removed:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RefreshManager.java
Modified:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RepositoryImpl.java
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
Added:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RefreshStrategy.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RefreshStrategy.java?rev=1519422&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RefreshStrategy.java
(added)
+++
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RefreshStrategy.java
Mon Sep 2 12:55:55 2013
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.jcr;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import org.apache.jackrabbit.oak.jcr.operation.SessionOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Instances of this class determine whether a session needs to be refreshed
base on
+ * the current {@link SessionOperation operation} to be performed and past
+ * {@link #refreshed() refreshes} and {@link #saved() saves}.
+ * <p>
+ * Before an operation is performed a session calls {@link
#needsRefresh(SessionOperation)},
+ * to determine whether the session needs to be refreshed first. To maintain a
session strategy's
+ * state sessions call {@link #refreshed()} right after each refresh operation
and
+ * {@link #saved()} right after each save operation.
+ * <p>
+ * {@code RefreshStrategy} is a composite of zero or more {@code
RefreshStrategy} instances,
+ * each of which covers a certain strategy.
+ * @see Default
+ * @see Once
+ * @see Timed
+ * @see LogOnce
+ * @see ThreadSynchronising
+ */
+public class RefreshStrategy {
+ private static final Logger log =
LoggerFactory.getLogger(RefreshStrategy.class);
+
+ private final RefreshStrategy[] refreshStrategies;
+
+ /**
+ * Create a new instance consisting of the composite of the passed {@code
RefreshStrategy}
+ * instances.
+ * @param refreshStrategies individual refresh strategies
+ */
+ public RefreshStrategy(RefreshStrategy... refreshStrategies) {
+ this.refreshStrategies = refreshStrategies;
+ }
+
+ /**
+ * Determine whether the session needs to refresh before {@code
sessionOperation} is performed.
+ * <p>
+ * This implementation return {@code false} if either {@code
sessionsOperation} is an refresh
+ * operation or a save operation. Otherwise it returns {@code true} if and
only if any of the
+ * individual refresh strategies passed to the constructor returns {@code
true}.
+ * @param sessionOperation operation about to be performed
+ * @return {@code true} if and only if the session needs to refresh.
+ */
+ public boolean needsRefresh(SessionOperation<?> sessionOperation) {
+ // Don't refresh if this operation is a refresh operation itself or
+ // a save operation, which does an implicit refresh
+ if (sessionOperation.isRefresh() || sessionOperation.isSave()) {
+ return false;
+ }
+
+ boolean refresh = false;
+ // Don't shortcut here since the individual strategies rely on side
effects of this call
+ for (RefreshStrategy r : refreshStrategies) {
+ refresh |= r.needsRefresh(sessionOperation);
+ }
+ return refresh;
+ }
+
+ /**
+ * Called whenever a session has been refreshed.
+ * <p>
+ * This implementation forwards to the {@code refresh} method of the
individual refresh
+ * strategies passed to the constructor.
+ */
+ public void refreshed() {
+ for (RefreshStrategy r : refreshStrategies) {
+ r.refreshed();
+ }
+ }
+
+ /**
+ * Called whenever a session has been saved.
+ * <p>
+ * This implementation forwards to the {@code save} method of the
individual refresh
+ * strategies passed to the constructor.
+ */
+ public void saved() {
+ for (RefreshStrategy r : refreshStrategies) {
+ r.saved();
+ }
+ }
+
+ /**
+ * Accept the passed visitor.
+ * <p>
+ * This implementation forwards to the {@code accept} method of the
individual refresh
+ * strategies passed to the constructor.
+ */
+ public void accept(Visitor visitor) {
+ for (RefreshStrategy r: refreshStrategies) {
+ r.accept(visitor);
+ }
+ }
+
+ /**
+ * Visitor for traversing the composite.
+ */
+ public static class Visitor {
+ public void visit(Default strategy) {}
+ public void visit(Once strategy) {}
+ public void visit(Timed strategy) {}
+ public void visit(LogOnce strategy) {}
+ public void visit(ThreadSynchronising strategy) {}
+ }
+
+ /**
+ * This refresh strategy does wither always or never refresh depending of
the value of the
+ * {@code refresh} argument passed to its constructor.
+ * <p>
+ */
+ public static class Default extends RefreshStrategy {
+
+ /** A refresh strategy that always refreshed */
+ public static RefreshStrategy ALWAYS = new Default(true);
+
+ /** A refresh strategy that never refreshed */
+ public static RefreshStrategy NEVER = new Default(false);
+
+ /** Value returned from {@code needsRefresh} */
+ protected boolean refresh;
+
+ /**
+ * @param refresh value returned from {@code needsRefresh}
+ */
+ public Default(boolean refresh) {
+ this.refresh = refresh;
+ }
+
+ /**
+ * @return {@link #refresh}
+ */
+ @Override
+ public boolean needsRefresh(SessionOperation<?> sessionOperation) {
+ return refresh;
+ }
+
+ @Override
+ public void refreshed() {
+ }
+
+ @Override
+ public void saved() {
+ }
+
+ @Override
+ public void accept(Visitor visitor) {
+ visitor.visit(this);
+ }
+ }
+
+ /**
+ * This refresh strategy refreshed exactly once when enabled. Calling
+ * {@link #reset()} enables the strategy.
+ */
+ public static class Once extends Default {
+
+ /** Visitor for resetting this refresh strategy */
+ public static final Visitor RESETTING_VISITOR = new Visitor() {
+ @Override
+ public void visit(Once strategy) {
+ strategy.reset();
+ }
+ };
+
+ /**
+ * @param enabled whether this refresh strategy is initially enabled
+ */
+ public Once(boolean enabled) {
+ super(enabled);
+ }
+
+ /**
+ * Enable this refresh strategy
+ */
+ public void reset() {
+ refresh = true;
+ }
+
+ @Override
+ public void refreshed() {
+ refresh = false;
+ }
+
+ @Override
+ public void saved() {
+ refresh = false;
+ }
+
+ @Override
+ public void accept(Visitor visitor) {
+ visitor.visit(this);
+ }
+ }
+
+ /**
+ * This refresh strategy refreshes after a given timeout of inactivity.
+ */
+ public static class Timed extends RefreshStrategy {
+ private final long interval;
+ private long lastAccessed = System.currentTimeMillis();
+
+ /**
+ * @param interval Interval in seconds after which a session should
refresh if there was no
+ * activity.
+ */
+ public Timed(long interval) {
+ this.interval = MILLISECONDS.convert(interval, SECONDS);
+ }
+
+ /**
+ * Called whenever {@code needsRefresh} determines that the time out
interval was exceeded.
+ * This default implementation always returns {@code true}.
Descendants may override this
+ * method to provide more refined behaviour.
+ * @param timeElapsed the time that elapsed since the session was
last accessed.
+ * @return {@code true}
+ */
+ protected boolean timeOut(long timeElapsed) {
+ return true;
+ }
+
+ @Override
+ public boolean needsRefresh(SessionOperation<?> sessionOperation) {
+ long now = System.currentTimeMillis();
+ long timeElapsed = now - lastAccessed;
+ lastAccessed = now;
+ return timeElapsed > interval && timeOut(timeElapsed);
+ }
+
+ @Override
+ public void refreshed() {
+ lastAccessed = System.currentTimeMillis();
+ }
+
+ @Override
+ public void saved() {
+ lastAccessed = System.currentTimeMillis();
+ }
+
+ @Override
+ public void accept(Visitor visitor) {
+ visitor.visit(this);
+ }
+ }
+
+ /**
+ * This refresh strategy never refreshed the session but logs a warning if
a session has been
+ * idle for more than a given time.
+ *
+ * TODO replace logging with JMX monitoring. See OAK-941
+ */
+ public static class LogOnce extends Timed {
+ private final Exception initStackTrace = new Exception("The session
was created here:");
+
+ private boolean warnIfIdle = true;
+
+ /**
+ * @param interval Interval in seconds after which a warning is
logged if there was no
+ * activity.
+ */
+ public LogOnce(long interval) {
+ super(interval);
+ }
+
+ /**
+ * Log once
+ * @param timeElapsed the time that elapsed since the session was
last accessed.
+ * @return {@code false}
+ */
+ @Override
+ protected boolean timeOut(long timeElapsed) {
+ if (warnIfIdle) {
+ log.warn("This session has been idle for " + MINUTES.convert(
+ timeElapsed, MILLISECONDS) + " minutes and might be
out of date. " +
+ "Consider using a fresh session or explicitly refresh
the session.",
+ initStackTrace);
+ warnIfIdle = false;
+ }
+ return false;
+ }
+
+ @Override
+ public void accept(Visitor visitor) {
+ visitor.visit(this);
+ }
+ }
+
+ /**
+ * This refresh strategy synchronises session states across accesses
within the same thread.
+ */
+ public static class ThreadSynchronising extends RefreshStrategy {
+ /**
+ * ThreadLocal instance to keep track of the save operations performed
in the thread so far
+ * This is is then used to determine if the current session needs to
be refreshed to see the
+ * changes done by another session in current thread.
+ * <p>
+ * <b>Note</b> - This thread local is never cleared. However, we only
store
+ * java.lang.Integer and do not derive from ThreadLocal such that
(class loader)
+ * leaks typically associated with thread locals do not occur.
+ */
+ private final ThreadLocal<Long> threadSaveCount;
+
+ private long sessionSaveCount;
+
+ /**
+ * @param threadSaveCount thread local for tracking thread local
state.
+ */
+ public ThreadSynchronising(ThreadLocal<Long> threadSaveCount) {
+ this.threadSaveCount = threadSaveCount;
+ sessionSaveCount = getThreadSaveCount();
+ }
+
+ @Override
+ public boolean needsRefresh(SessionOperation<?> sessionOperation) {
+ // If the threadLocal counter differs from our seen
sessionSaveCount so far then
+ // some other session would have done a commit. If that is the
case a refresh would
+ // be required
+ return getThreadSaveCount() != sessionSaveCount;
+ }
+
+ @Override
+ public void refreshed() {
+ // Avoid further refreshing if refreshed already
+ sessionSaveCount = getThreadSaveCount();
+ }
+
+ @Override
+ public void saved() {
+ // Force refreshing on access through other sessions on the same
thread
+ threadSaveCount.set(sessionSaveCount = (getThreadSaveCount() + 1));
+ }
+
+ private long getThreadSaveCount() {
+ Long c = threadSaveCount.get();
+ return c == null ? 0 : c;
+ }
+
+ @Override
+ public void accept(Visitor visitor) {
+ visitor.visit(this);
+ }
+ }
+}
Propchange:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RefreshStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RefreshStrategy.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision Rev URL
Modified:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RepositoryImpl.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RepositoryImpl.java?rev=1519422&r1=1519421&r2=1519422&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RepositoryImpl.java
(original)
+++
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/RepositoryImpl.java
Mon Sep 2 12:55:55 2013
@@ -17,8 +17,6 @@
package org.apache.jackrabbit.oak.jcr;
import static com.google.common.base.Preconditions.checkNotNull;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.Collections;
import java.util.Map;
@@ -39,6 +37,10 @@ import org.apache.jackrabbit.api.securit
import org.apache.jackrabbit.commons.SimpleValueFactory;
import org.apache.jackrabbit.oak.api.ContentRepository;
import org.apache.jackrabbit.oak.api.ContentSession;
+import org.apache.jackrabbit.oak.jcr.RefreshStrategy.LogOnce;
+import org.apache.jackrabbit.oak.jcr.RefreshStrategy.Once;
+import org.apache.jackrabbit.oak.jcr.RefreshStrategy.ThreadSynchronising;
+import org.apache.jackrabbit.oak.jcr.RefreshStrategy.Timed;
import org.apache.jackrabbit.oak.jcr.delegate.SessionDelegate;
import org.apache.jackrabbit.oak.spi.security.SecurityProvider;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
@@ -59,21 +61,15 @@ public class RepositoryImpl implements J
* Name of the session attribute value determining the session refresh
* interval in seconds.
*
- * @see RefreshManager
+ * @see RefreshStrategy
*/
public static final String REFRESH_INTERVAL = "oak.refresh-interval";
- /**
- * Default value for {@link #REFRESH_INTERVAL}.
- */
- private static final long DEFAULT_REFRESH_INTERVAL = Long.getLong(
- "default-refresh-interval", Long.MAX_VALUE);
-
private final Descriptors descriptors = new Descriptors(new
SimpleValueFactory());
private final ContentRepository contentRepository;
protected final Whiteboard whiteboard;
private final SecurityProvider securityProvider;
- private final ThreadLocal<Integer> threadSafeCount;
+ private final ThreadLocal<Long> threadSaveCount;
public RepositoryImpl(@Nonnull ContentRepository contentRepository,
@Nonnull Whiteboard whiteboard,
@@ -81,7 +77,7 @@ public class RepositoryImpl implements J
this.contentRepository = checkNotNull(contentRepository);
this.whiteboard = checkNotNull(whiteboard);
this.securityProvider = checkNotNull(securityProvider);
- this.threadSafeCount = new ThreadLocal<Integer>();
+ this.threadSaveCount = new ThreadLocal<Long>();
}
//---------------------------------------------------------< Repository
>---
@@ -203,18 +199,13 @@ public class RepositoryImpl implements J
} else if (attributes.containsKey(REFRESH_INTERVAL)) {
throw new RepositoryException("Duplicate attribute '" +
REFRESH_INTERVAL + "'.");
}
- if (refreshInterval == null) {
- refreshInterval = DEFAULT_REFRESH_INTERVAL;
- }
+ RefreshStrategy refreshStrategy =
createRefreshStrategy(refreshInterval);
ContentSession contentSession =
contentRepository.login(credentials, workspaceName);
- RefreshManager refreshManager = new RefreshManager(
- MILLISECONDS.convert(refreshInterval, SECONDS),
threadSafeCount);
SessionDelegate sessionDelegate = new SessionDelegate(
- contentSession, refreshManager, securityProvider);
+ contentSession, refreshStrategy, securityProvider);
SessionContext context = createSessionContext(
- Collections.<String, Object>singletonMap(REFRESH_INTERVAL,
refreshInterval),
- sessionDelegate);
+ createAttributes(refreshInterval), sessionDelegate);
return context.getSession();
} catch (LoginException e) {
throw new javax.jcr.LoginException(e.getMessage(), e);
@@ -289,4 +280,39 @@ public class RepositoryImpl implements J
}
}
+ private static Map<String, Object> createAttributes(Long refreshInterval) {
+ return refreshInterval == null
+ ? Collections.<String, Object>emptyMap()
+ : Collections.<String, Object>singletonMap(REFRESH_INTERVAL,
refreshInterval);
+ }
+
+ /**
+ * Auto refresh logic for sessions, which is done to enhance backwards
compatibility with
+ * Jackrabbit 2.
+ * <p>
+ * A sessions is automatically refreshed when
+ * <ul>
+ * <li>it has not been accessed for the number of seconds specified by
the
+ * {@code refreshInterval} parameter,</li>
+ * <li>an observation event has been delivered to a listener
registered from within this
+ * session,</li>
+ * <li>an updated occurred through a different session from <em>within
the same
+ * thread.</em></li>
+ * </ul>
+ * In addition a warning is logged once per session if the session is
accessed after one
+ * minute of inactivity.
+ */
+ private RefreshStrategy createRefreshStrategy(Long refreshInterval) {
+ return new RefreshStrategy(refreshInterval == null
+ ? new RefreshStrategy[] {
+ new Once(false),
+ new LogOnce(60),
+ new ThreadSynchronising(threadSaveCount)}
+ : new RefreshStrategy[] {
+ new Once(false),
+ new Timed(refreshInterval),
+ new LogOnce(60),
+ new ThreadSynchronising(threadSaveCount)});
+ }
+
}
\ No newline at end of file
Modified:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java?rev=1519422&r1=1519421&r2=1519422&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
(original)
+++
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
Mon Sep 2 12:55:55 2013
@@ -35,7 +35,7 @@ import org.apache.jackrabbit.oak.api.Roo
import org.apache.jackrabbit.oak.api.Tree;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.core.IdentifierManager;
-import org.apache.jackrabbit.oak.jcr.RefreshManager;
+import org.apache.jackrabbit.oak.jcr.RefreshStrategy;
import org.apache.jackrabbit.oak.jcr.operation.SessionOperation;
import org.apache.jackrabbit.oak.jcr.security.AccessManager;
import org.apache.jackrabbit.oak.spi.security.SecurityProvider;
@@ -52,7 +52,7 @@ public class SessionDelegate {
static final Logger log = LoggerFactory.getLogger(SessionDelegate.class);
private final ContentSession contentSession;
- private final RefreshManager refreshManager;
+ private final RefreshStrategy refreshStrategy;
private final Root root;
private final IdentifierManager idManager;
@@ -71,13 +71,13 @@ public class SessionDelegate {
* dispatcher in order.
*
* @param contentSession the content session
- * @param refreshManager the refresh manager used to handle auto
refreshing this session
+ * @param refreshStrategy the refresh strategy used for auto refreshing
this session
* @param securityProvider the security provider
*/
- public SessionDelegate(@Nonnull ContentSession contentSession,
RefreshManager refreshManager,
+ public SessionDelegate(@Nonnull ContentSession contentSession,
RefreshStrategy refreshStrategy,
SecurityProvider securityProvider) {
this.contentSession = checkNotNull(contentSession);
- this.refreshManager = checkNotNull(refreshManager);
+ this.refreshStrategy = checkNotNull(refreshStrategy);
this.root = contentSession.getLatestRoot();
this.idManager = new IdentifierManager(root);
this.permissionProvider = checkNotNull(securityProvider)
@@ -86,7 +86,7 @@ public class SessionDelegate {
}
public synchronized void refreshAtNextAccess() {
- refreshManager.refreshAtNextAccess();
+ refreshStrategy.accept(RefreshStrategy.Once.RESETTING_VISITOR);
}
/**
@@ -106,8 +106,9 @@ public class SessionDelegate {
// Synchronize to avoid conflicting refreshes from concurrent JCR API
calls
if (sessionOpCount == 0) {
// Refresh and precondition checks only for non re-entrant session
operations
- if (refreshManager.needsRefresh(sessionOperation)) {
+ if (refreshStrategy.needsRefresh(sessionOperation)) {
refresh(true);
+ refreshStrategy.refreshed();
updateCount++;
}
sessionOperation.checkPreconditions();
@@ -120,6 +121,11 @@ public class SessionDelegate {
if (sessionOperation.isUpdate()) {
updateCount++;
}
+ if (sessionOperation.isSave()) {
+ refreshStrategy.saved();
+ } else if (sessionOperation.isRefresh()) {
+ refreshStrategy.refreshed();
+ }
}
}