Author: jukka
Date: Wed Mar 5 19:11:06 2014
New Revision: 1574626
URL: http://svn.apache.org/r1574626
Log:
OAK-1418: Read performance regression
Inline RefreshStrategy.Once and .ThreadSynchronising to optimize the
critical path in ReadPropertyTest
Modified:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/repository/RepositoryImpl.java
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/RefreshStrategy.java
Modified:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java?rev=1574626&r1=1574625&r2=1574626&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
(original)
+++
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/delegate/SessionDelegate.java
Wed Mar 5 19:11:06 2014
@@ -66,6 +66,22 @@ public class SessionDelegate {
private final ContentSession contentSession;
private final RefreshStrategy refreshStrategy;
+ private boolean refreshAtNextAccess = false;
+
+ /**
+ * The repository-wide {@link ThreadLocal} that keeps track of the number
+ * of saves performed in each thread.
+ */
+ private final ThreadLocal<Long> threadSaveCount;
+
+ /**
+ * Local copy of the {@link #threadSaveCount} for the current thread.
+ * If the repository-wide counter differs from our local copy, then
+ * some other session would have done a commit or this session is
+ * being accessed from some other thread. In either case it's best to
+ * refresh this session to avoid unexpected behaviour.
+ */
+ private long sessionSaveCount;
private final Root root;
private final IdentifierManager idManager;
@@ -111,9 +127,12 @@ public class SessionDelegate {
public SessionDelegate(
@Nonnull ContentSession contentSession,
@Nonnull RefreshStrategy refreshStrategy,
+ @Nonnull ThreadLocal<Long> threadSaveCount,
@Nonnull StatisticManager statisticManager) {
this.contentSession = checkNotNull(contentSession);
this.refreshStrategy = checkNotNull(refreshStrategy);
+ this.threadSaveCount = checkNotNull(threadSaveCount);
+ this.sessionSaveCount = getThreadSaveCount();
this.root = contentSession.getLatestRoot();
this.idManager = new IdentifierManager(root);
this.sessionStats = new SessionStats(this);
@@ -129,6 +148,11 @@ public class SessionDelegate {
return sessionStats;
}
+ private long getThreadSaveCount() {
+ Long c = threadSaveCount.get();
+ return c == null ? 0 : c;
+ }
+
public long getNanosecondsSinceLastAccess() {
return nanoTime() - lastAccessTimeNS;
}
@@ -182,8 +206,8 @@ public class SessionDelegate {
return saveCount;
}
- public void refreshAtNextAccess() {
- refreshStrategy.refreshAtNextAccess();
+ public synchronized void refreshAtNextAccess() {
+ refreshAtNextAccess = true;
}
/**
@@ -221,9 +245,12 @@ public class SessionDelegate {
if (!sessionOperation.isRefresh()
&& !sessionOperation.isSave()
&& !sessionOperation.isLogout()
- && refreshStrategy.needsRefresh(t0 - lastAccessTimeNS)) {
+ && (refreshAtNextAccess
+ || sessionSaveCount != getThreadSaveCount()
+ || refreshStrategy.needsRefresh(t0 - lastAccessTimeNS))) {
refresh(true);
- refreshStrategy.refreshed();
+ refreshAtNextAccess = false;
+ sessionSaveCount = getThreadSaveCount();
updateCount++;
}
sessionOperation.checkPreconditions();
@@ -250,9 +277,12 @@ public class SessionDelegate {
readDuration.addAndGet(dt);
}
if (sessionOperation.isSave()) {
- refreshStrategy.saved();
+ refreshAtNextAccess = false;
+ // Force refreshing on access through other sessions on the same thread
+ threadSaveCount.set(sessionSaveCount = (getThreadSaveCount() + 1));
} else if (sessionOperation.isRefresh()) {
- refreshStrategy.refreshed();
+ refreshAtNextAccess = false;
+ sessionSaveCount = getThreadSaveCount();
}
}
}
Modified:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/repository/RepositoryImpl.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/repository/RepositoryImpl.java?rev=1574626&r1=1574625&r2=1574626&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/repository/RepositoryImpl.java
(original)
+++
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/repository/RepositoryImpl.java
Wed Mar 5 19:11:06 2014
@@ -40,6 +40,7 @@ import com.google.common.util.concurrent
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+
import org.apache.jackrabbit.api.JackrabbitRepository;
import
org.apache.jackrabbit.api.security.authentication.token.TokenCredentials;
import org.apache.jackrabbit.commons.SimpleValueFactory;
@@ -49,10 +50,6 @@ import org.apache.jackrabbit.oak.api.jmx
import org.apache.jackrabbit.oak.stats.StatisticManager;
import org.apache.jackrabbit.oak.jcr.delegate.SessionDelegate;
import org.apache.jackrabbit.oak.jcr.session.RefreshStrategy;
-import org.apache.jackrabbit.oak.jcr.session.RefreshStrategy.LogOnce;
-import org.apache.jackrabbit.oak.jcr.session.RefreshStrategy.Once;
-import org.apache.jackrabbit.oak.jcr.session.RefreshStrategy.ThreadSynchronising;
-import org.apache.jackrabbit.oak.jcr.session.RefreshStrategy.Timed;
import org.apache.jackrabbit.oak.jcr.session.SessionContext;
import org.apache.jackrabbit.oak.jcr.session.SessionStats;
import org.apache.jackrabbit.oak.spi.security.SecurityProvider;
@@ -85,7 +82,20 @@ public class RepositoryImpl implements J
private final ContentRepository contentRepository;
protected final Whiteboard whiteboard;
private final SecurityProvider securityProvider;
- private final ThreadLocal<Long> threadSaveCount;
+
+ /**
+ * {@link ThreadLocal} counter that keeps track of the save operations
+ * performed per thread so far. This is then used to determine if
+ * the current session needs to be refreshed to see the changes done by
+ * another session in the same thread.
+ * <p>
+ * <b>Note</b> - This thread local is never cleared. However, we only
+ * store a {@link Long} instance and do not derive from
+ * {@link ThreadLocal} so that (class loader) leaks typically associated
+ * with thread locals do not occur.
+ */
+ private final ThreadLocal<Long> threadSaveCount = new ThreadLocal<Long>();
+
private final ListeningScheduledExecutorService scheduledExecutor =
MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor());
private final StatisticManager statisticManager;
@@ -96,7 +106,6 @@ public class RepositoryImpl implements J
this.contentRepository = checkNotNull(contentRepository);
this.whiteboard = checkNotNull(whiteboard);
this.securityProvider = checkNotNull(securityProvider);
- this.threadSaveCount = new ThreadLocal<Long>();
this.descriptors = determineDescriptors();
this.statisticManager = new StatisticManager(whiteboard,
scheduledExecutor);
}
@@ -235,7 +244,9 @@ public class RepositoryImpl implements J
private SessionDelegate createSessionDelegate(
final RefreshStrategy refreshStrategy,
final ContentSession contentSession) {
- return new SessionDelegate(contentSession, refreshStrategy, statisticManager) {
+ return new SessionDelegate(
+ contentSession, refreshStrategy,
+ threadSaveCount, statisticManager) {
// Defer session MBean registration to avoid cluttering the
// JMX name space with short lived sessions
ListenableScheduledFuture<Registration> registration =
scheduledExecutor.schedule(
@@ -364,15 +375,11 @@ public class RepositoryImpl implements J
* minute of inactivity.
*/
private RefreshStrategy createRefreshStrategy(Long refreshInterval) {
- return new RefreshStrategy(refreshInterval == null
- ? new RefreshStrategy[] {
- new Once(false),
- new LogOnce(60),
- new ThreadSynchronising(threadSaveCount)}
- : new RefreshStrategy[] {
- new Once(false),
- new Timed(refreshInterval),
- new ThreadSynchronising(threadSaveCount)});
+ if (refreshInterval == null) {
+ return new RefreshStrategy.LogOnce(60);
+ } else {
+ return new RefreshStrategy.Timed(refreshInterval);
+ }
}
private static class RegistrationCallable implements
Callable<Registration> {
Modified:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/RefreshStrategy.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/RefreshStrategy.java?rev=1574626&r1=1574625&r2=1574626&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/RefreshStrategy.java
(original)
+++
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/RefreshStrategy.java
Wed Mar 5 19:11:06 2014
@@ -23,41 +23,20 @@ import static java.util.concurrent.TimeU
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
-import org.apache.jackrabbit.oak.jcr.delegate.SessionDelegate;
-import org.apache.jackrabbit.oak.jcr.session.operation.SessionOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Instances of this class determine whether a session needs to be refreshed
- * before the next session operation is performed.
- * <p>
- * Before an operation is performed a session calls {@link
#needsRefresh(SessionDelegate)},
- * to determine whether the session needs to be refreshed first. To maintain a
session strategy's
- * state sessions call {@link #refreshed()} right after each refresh operation
and
- * {@link #saved()} right after each save operation.
- * <p>
- * {@code RefreshStrategy} is a composite of zero or more {@code
RefreshStrategy} instances,
- * each of which covers a certain strategy.
- * @see Default
- * @see Once
+ * Implementations of this interface determine whether a session needs
+ * to be refreshed before the next session operation is performed. This is
+ * done by the session calling {@link #needsRefresh(long)} to determine
+ * whether a refresh is needed.
+ *
+ * @see Composite
* @see Timed
* @see LogOnce
- * @see ThreadSynchronising
*/
-public class RefreshStrategy {
- private static final Logger log =
LoggerFactory.getLogger(RefreshStrategy.class);
-
- private final RefreshStrategy[] refreshStrategies;
-
- /**
- * Create a new instance consisting of the composite of the passed {@code
RefreshStrategy}
- * instances.
- * @param refreshStrategies individual refresh strategies
- */
- public RefreshStrategy(RefreshStrategy... refreshStrategies) {
- this.refreshStrategies = refreshStrategies;
- }
+public interface RefreshStrategy {
/**
* Determine whether the given session needs to refresh before the next
@@ -70,162 +49,51 @@ public class RefreshStrategy {
* @param nanosecondsSinceLastAccess nanoseconds since last access
* @return {@code true} if and only if the session needs to refresh.
*/
- public boolean needsRefresh(long nanosecondsSinceLastAccess) {
- for (RefreshStrategy r : refreshStrategies) {
- if (r.needsRefresh(nanosecondsSinceLastAccess)) {
- return true;
- }
- }
- return false;
- }
+ boolean needsRefresh(long nanosecondsSinceLastAccess);
/**
- * Called whenever a session has been refreshed.
- * <p>
- * This implementation forwards to the {@code refresh} method of the
individual refresh
- * strategies passed to the constructor.
+ * Composite of zero or more {@code RefreshStrategy} instances,
+ * each of which covers a certain strategy.
*/
- public void refreshed() {
- for (RefreshStrategy r : refreshStrategies) {
- r.refreshed();
- }
- }
+ public static class Composite implements RefreshStrategy {
- /**
- * Called whenever a session has been saved.
- * <p>
- * This implementation forwards to the {@code save} method of the
individual refresh
- * strategies passed to the constructor.
- */
- public void saved() {
- for (RefreshStrategy r : refreshStrategies) {
- r.saved();
- }
- }
-
- /**
- * Accept the passed visitor.
- * <p>
- * This implementation forwards to the {@code accept} method of the
individual refresh
- * strategies passed to the constructor.
- */
- public void accept(Visitor visitor) {
- for (RefreshStrategy r: refreshStrategies) {
- r.accept(visitor);
- }
- }
-
- /**
- * Force the next call to {@link #needsRefresh(SessionOperation)} to
return {@code true} for
- * every {@link Once} strategy in this composite.
- * <p>
- * This method is safe for calling concurrently to any other method of
this class.
- */
- public void refreshAtNextAccess() {
- accept(new Visitor() {
- @Override
- public void visit(Once strategy) {
- strategy.reset();
- }
- });
- }
-
- /**
- * Visitor for traversing the composite.
- */
- public static class Visitor {
- public void visit(Default strategy) {}
- public void visit(Once strategy) {}
- public void visit(Timed strategy) {}
- public void visit(LogOnce strategy) {}
- public void visit(ThreadSynchronising strategy) {}
- }
-
- /**
- * This refresh strategy does wither always or never refresh depending of
the value of the
- * {@code refresh} argument passed to its constructor.
- * <p>
- */
- public static class Default extends RefreshStrategy {
-
- /** A refresh strategy that always refreshed */
- public static RefreshStrategy ALWAYS = new Default(true);
-
- /** A refresh strategy that never refreshed */
- public static RefreshStrategy NEVER = new Default(false);
-
- /** Value returned from {@code needsRefresh} */
- protected volatile boolean refresh;
+ private final RefreshStrategy[] refreshStrategies;
/**
- * @param refresh value returned from {@code needsRefresh}
+ * Create a new instance consisting of the composite of the
+ * passed {@code RefreshStrategy} instances.
+ * @param refreshStrategies individual refresh strategies
*/
- public Default(boolean refresh) {
- this.refresh = refresh;
+ public Composite(RefreshStrategy... refreshStrategies) {
+ this.refreshStrategies = refreshStrategies;
}
/**
- * @return {@link #refresh}
+ * Determine whether the given session needs to refresh before the next
+ * session operation is performed.
+ * <p>
+ * This implementation returns {@code true} if and only if any of the
+ * individual refresh strategies passed to the constructor returns
+ * {@code true}.
+ *
+ * @param nanosecondsSinceLastAccess nanoseconds since last access
+ * @return {@code true} if and only if the session needs to refresh.
*/
- @Override
public boolean needsRefresh(long nanosecondsSinceLastAccess) {
- return refresh;
- }
-
- @Override
- public void refreshed() {
- }
-
- @Override
- public void saved() {
- }
-
- @Override
- public void accept(Visitor visitor) {
- visitor.visit(this);
- }
- }
-
- /**
- * This refresh strategy refreshed exactly once when enabled. Calling
- * {@link #reset()} enables the strategy.
- */
- public static class Once extends Default {
-
- /**
- * @param enabled whether this refresh strategy is initially enabled
- */
- public Once(boolean enabled) {
- super(enabled);
- }
-
- /**
- * Enable this refresh strategy
- */
- public void reset() {
- refresh = true;
- }
-
- @Override
- public void refreshed() {
- refresh = false;
- }
-
- @Override
- public void saved() {
- refresh = false;
+ for (RefreshStrategy r : refreshStrategies) {
+ if (r.needsRefresh(nanosecondsSinceLastAccess)) {
+ return true;
+ }
+ }
+ return false;
}
- @Override
- public void accept(Visitor visitor) {
- visitor.visit(this);
- }
}
/**
* This refresh strategy refreshes after a given timeout of inactivity.
*/
- public static class Timed extends RefreshStrategy {
+ public static class Timed implements RefreshStrategy {
private final long interval;
@@ -242,10 +110,6 @@ public class RefreshStrategy {
return nanosecondsSinceLastAccess > interval;
}
- @Override
- public void accept(Visitor visitor) {
- visitor.visit(this);
- }
}
/**
@@ -254,8 +118,13 @@ public class RefreshStrategy {
*
* TODO replace logging with JMX monitoring. See OAK-941
*/
- public static class LogOnce extends RefreshStrategy {
- private final Exception initStackTrace = new Exception("The session was created here:");
+ public static class LogOnce implements RefreshStrategy {
+
+ private static final Logger log =
+ LoggerFactory.getLogger(RefreshStrategy.class);
+
+ private final Exception initStackTrace =
+ new Exception("The session was created here:");
private final long interval;
@@ -287,65 +156,6 @@ public class RefreshStrategy {
return false;
}
- @Override
- public void accept(Visitor visitor) {
- visitor.visit(this);
- }
}
- /**
- * This refresh strategy synchronises session states across accesses
within the same thread.
- */
- public static class ThreadSynchronising extends RefreshStrategy {
- /**
- * ThreadLocal instance to keep track of the save operations performed
in the thread so far
- * This is is then used to determine if the current session needs to
be refreshed to see the
- * changes done by another session in current thread.
- * <p>
- * <b>Note</b> - This thread local is never cleared. However, we only
store
- * java.lang.Integer and do not derive from ThreadLocal such that
(class loader)
- * leaks typically associated with thread locals do not occur.
- */
- private final ThreadLocal<Long> threadSaveCount;
-
- private long sessionSaveCount;
-
- /**
- * @param threadSaveCount thread local for tracking thread local
state.
- */
- public ThreadSynchronising(ThreadLocal<Long> threadSaveCount) {
- this.threadSaveCount = threadSaveCount;
- sessionSaveCount = getThreadSaveCount();
- }
-
- @Override
- public boolean needsRefresh(long nanosecondsSinceLastAccess) {
- // If the threadLocal counter differs from our seen
sessionSaveCount so far then
- // some other session would have done a commit. If that is the
case a refresh would
- // be required
- return getThreadSaveCount() != sessionSaveCount;
- }
-
- @Override
- public void refreshed() {
- // Avoid further refreshing if refreshed already
- sessionSaveCount = getThreadSaveCount();
- }
-
- @Override
- public void saved() {
- // Force refreshing on access through other sessions on the same
thread
- threadSaveCount.set(sessionSaveCount = (getThreadSaveCount() + 1));
- }
-
- private long getThreadSaveCount() {
- Long c = threadSaveCount.get();
- return c == null ? 0 : c;
- }
-
- @Override
- public void accept(Visitor visitor) {
- visitor.visit(this);
- }
- }
}