Author: mduerig
Date: Wed Mar 12 11:38:56 2014
New Revision: 1576702

URL: http://svn.apache.org/r1576702
Log:
OAK-1497: JackrabbitEvent#isExternal() returns true for internal changes for slow listeners
Block commits once revision queue fills up

Added:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiter.java
Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
    
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
    
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
    
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
    
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/osgi/OsgiRepository.java
    
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/osgi/RepositoryManager.java
    
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/repository/RepositoryImpl.java
    
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/SessionContext.java

Added: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiter.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiter.java?rev=1576702&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiter.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiter.java
 Wed Mar 12 11:38:56 2014
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.observation;
+
+import static org.apache.jackrabbit.oak.api.CommitFailedException.OAK;
+
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.spi.commit.CommitHook;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+
+/**
+ * This {@code CommitHook} can be used to block commits for any length of time.
+ * As long as commits are blocked this hook throws a {@code CommitFailedException}.
+ */
+public class CommitRateLimiter implements CommitHook {
+    private volatile boolean blockCommits;
+
+    /**
+     * Block any further commits until {@link #unblockCommits()} is called.
+     */
+    public void blockCommits() {
+        blockCommits = true;
+    }
+
+    /**
+     * Unblock blocked commits.
+     */
+    public void unblockCommits() {
+        blockCommits = false;
+    }
+
+    @Nonnull
+    @Override
+    public NodeState processCommit(NodeState before, NodeState after, CommitInfo info)
+            throws CommitFailedException {
+        if (blockCommits) {
+            throw new CommitFailedException(OAK, 1, "System busy. Try again later.");
+        } else {
+            return after;
+        }
+    }
+}

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java?rev=1576702&r1=1576701&r2=1576702&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
 Wed Mar 12 11:38:56 2014
@@ -78,6 +78,11 @@ public class BackgroundObserver implemen
      */
     private final BlockingQueue<ContentChange> queue;
 
+    /**
+     * Maximal number of elements before queue will start to block
+     */
+    private final int queueLength;
+
     private static class ContentChange {
         private final NodeState root;
         private final CommitInfo info;
@@ -118,6 +123,7 @@ public class BackgroundObserver implemen
                         observer.contentChanged(change.root, change.info);
                         change = queue.poll();
                     }
+                    queueEmpty();
                 } catch (Throwable t) {
                     exceptionHandler.uncaughtException(Thread.currentThread(), 
t);
                 }
@@ -146,6 +152,7 @@ public class BackgroundObserver implemen
         this.executor = checkNotNull(executor);
         this.exceptionHandler = checkNotNull(exceptionHandler);
         this.queue = newArrayBlockingQueue(queueLength);
+        this.queueLength = queueLength;
     }
 
     public BackgroundObserver(
@@ -166,6 +173,9 @@ public class BackgroundObserver implemen
         this(observer, executor, 1000);
     }
 
+    protected void queueNearlyFull() {}
+    protected void queueEmpty() {}
+
     /**
      * Clears the change queue and signals the background thread to stop
      * without making any further {@link #contentChanged(NodeState, 
CommitInfo)}
@@ -227,6 +237,10 @@ public class BackgroundObserver implemen
             last = change;
         }
 
+        if (10 * queue.remainingCapacity() < queueLength) {
+            queueNearlyFull();
+        }
+
         // Set the completion handler on the currently running task. Multiple 
calls
         // to onComplete are not a problem here since we always pass the same 
value.
         // Thus there is no question as to which of the handlers will 
effectively run.

Modified: 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java?rev=1576702&r1=1576701&r2=1576702&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
 Wed Mar 12 11:38:56 2014
@@ -40,6 +40,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.name.NamespaceEditorProvider;
 import org.apache.jackrabbit.oak.plugins.nodetype.TypeEditorProvider;
 import org.apache.jackrabbit.oak.plugins.nodetype.write.InitialContent;
+import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
 import org.apache.jackrabbit.oak.plugins.version.VersionEditorProvider;
 import org.apache.jackrabbit.oak.security.SecurityProviderImpl;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
@@ -60,6 +61,7 @@ public class Jcr {
 
     private SecurityProvider securityProvider;
     private int observationQueueLength = DEFAULT_OBSERVATION_QUEUE_LENGTH;
+    private CommitRateLimiter commitRateLimiter = null;
 
     public Jcr(Oak oak) {
         this.oak = oak;
@@ -174,12 +176,20 @@ public class Jcr {
         return this;
     }
 
+    @Nonnull
+    public Jcr with(CommitRateLimiter commitRateLimiter) {
+        oak.with(commitRateLimiter);
+        this.commitRateLimiter = commitRateLimiter;
+        return this;
+    }
+
     public Repository createRepository() {
         return new RepositoryImpl(
                 oak.createContentRepository(), 
                 oak.getWhiteboard(),
                 securityProvider,
-                observationQueueLength);
+                observationQueueLength,
+                commitRateLimiter);
     }
 
 }

Modified: 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java?rev=1576702&r1=1576701&r2=1576702&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
 Wed Mar 12 11:38:56 2014
@@ -41,6 +41,7 @@ import org.apache.jackrabbit.commons.ite
 import org.apache.jackrabbit.commons.observation.ListenerTracker;
 import org.apache.jackrabbit.oak.api.ContentSession;
 import org.apache.jackrabbit.oak.namepath.NamePathMapper;
+import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
 import org.apache.jackrabbit.oak.plugins.observation.filter.ACFilter;
 import org.apache.jackrabbit.oak.plugins.observation.filter.EventFilter;
 import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
@@ -78,6 +79,7 @@ class ChangeProcessor implements Observe
     private final AtomicLong eventCount;
     private final AtomicLong eventDuration;
     private final int queueLength;
+    private final CommitRateLimiter commitRateLimiter;
 
     private CompositeRegistration registration;
     private NodeState previousRoot;
@@ -89,7 +91,8 @@ class ChangeProcessor implements Observe
             ListenerTracker tracker,
             FilterProvider filter,
             StatisticManager statisticManager,
-            int queueLength) {
+            int queueLength,
+            CommitRateLimiter commitRateLimiter) {
         this.contentSession = contentSession;
         this.namePathMapper = namePathMapper;
         this.permissionProvider = permissionProvider;
@@ -99,6 +102,7 @@ class ChangeProcessor implements Observe
         this.eventCount = 
statisticManager.getCounter(OBSERVATION_EVENT_COUNTER);
         this.eventDuration = 
statisticManager.getCounter(OBSERVATION_EVENT_DURATION);
         this.queueLength = queueLength;
+        this.commitRateLimiter = commitRateLimiter;
     }
 
     /**
@@ -120,7 +124,7 @@ class ChangeProcessor implements Observe
         final WhiteboardExecutor executor = new WhiteboardExecutor();
         executor.start(whiteboard);
         registration = new CompositeRegistration(
-            registerObserver(whiteboard, new BackgroundObserver(this, 
executor, queueLength)),
+            registerObserver(whiteboard, createObserver(executor)),
             registerMBean(whiteboard, EventListenerMBean.class,
                     tracker.getListenerMBean(), "EventListener", 
tracker.toString()),
             new Registration() {
@@ -131,6 +135,24 @@ class ChangeProcessor implements Observe
     });
     }
 
+    private BackgroundObserver createObserver(final WhiteboardExecutor 
executor) {
+        if (commitRateLimiter == null) {
+            return new BackgroundObserver(this, executor, queueLength);
+        } else {
+            return new BackgroundObserver(this, executor, queueLength) {
+                @Override
+                protected void queueNearlyFull() {
+                    commitRateLimiter.blockCommits();
+                }
+
+                @Override
+                protected void queueEmpty() {
+                    commitRateLimiter.unblockCommits();
+                }
+            };
+        }
+    }
+
     private final Monitor runningMonitor = new Monitor();
     private final RunningGuard running = new RunningGuard(runningMonitor);
 

Modified: 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java?rev=1576702&r1=1576701&r2=1576702&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
 Wed Mar 12 11:38:56 2014
@@ -44,6 +44,7 @@ import org.apache.jackrabbit.oak.jcr.del
 import org.apache.jackrabbit.oak.jcr.session.SessionContext;
 import org.apache.jackrabbit.oak.namepath.NamePathMapper;
 import org.apache.jackrabbit.oak.plugins.nodetype.ReadOnlyNodeTypeManager;
+import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
 import org.apache.jackrabbit.oak.plugins.observation.ExcludeExternal;
 import org.apache.jackrabbit.oak.plugins.observation.filter.FilterBuilder;
 import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
@@ -77,6 +78,7 @@ public class ObservationManagerImpl impl
     private final Whiteboard whiteboard;
     private final StatisticManager statisticManager;
     private final int queueLength;
+    private final CommitRateLimiter commitRateLimiter;
 
     /**
      * Create a new instance based on a {@link ContentSession} that needs to 
implement
@@ -91,7 +93,7 @@ public class ObservationManagerImpl impl
     public ObservationManagerImpl(
             SessionContext sessionContext, ReadOnlyNodeTypeManager 
nodeTypeManager,
             PermissionProvider permissionProvider, Whiteboard whiteboard,
-            int queueLength) {
+            int queueLength, CommitRateLimiter commitRateLimiter) {
 
         this.sessionDelegate = sessionContext.getSessionDelegate();
         this.ntMgr = nodeTypeManager;
@@ -100,6 +102,7 @@ public class ObservationManagerImpl impl
         this.whiteboard = whiteboard;
         this.statisticManager = sessionContext.getStatisticManager();
         this.queueLength = queueLength;
+        this.commitRateLimiter = commitRateLimiter;
     }
 
     public void dispose() {
@@ -122,7 +125,8 @@ public class ObservationManagerImpl impl
             LOG.debug(OBSERVATION,
                     "Registering event listener {} with filter {}", listener, 
filterProvider);
             processor = new 
ChangeProcessor(sessionDelegate.getContentSession(), namePathMapper,
-                    permissionProvider, tracker, filterProvider, 
statisticManager, queueLength);
+                    permissionProvider, tracker, filterProvider, 
statisticManager, queueLength,
+                    commitRateLimiter);
             processors.put(listener, processor);
             processor.start(whiteboard);
         } else {

Modified: 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/osgi/OsgiRepository.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/osgi/OsgiRepository.java?rev=1576702&r1=1576701&r2=1576702&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/osgi/OsgiRepository.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/osgi/OsgiRepository.java
 Wed Mar 12 11:38:56 2014
@@ -23,6 +23,7 @@ import javax.jcr.Session;
 import org.apache.jackrabbit.oak.Oak;
 import org.apache.jackrabbit.oak.api.ContentRepository;
 import org.apache.jackrabbit.oak.jcr.repository.RepositoryImpl;
+import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
 import org.apache.jackrabbit.oak.spi.security.SecurityProvider;
 import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 
@@ -36,8 +37,9 @@ public class OsgiRepository extends Repo
     public OsgiRepository(ContentRepository repository,
                           Whiteboard whiteboard,
                           SecurityProvider securityProvider,
-                          int observationQueueLength) {
-        super(repository, whiteboard, securityProvider, 
observationQueueLength);
+                          int observationQueueLength,
+                          CommitRateLimiter commitRateLimiter) {
+        super(repository, whiteboard, securityProvider, 
observationQueueLength, commitRateLimiter);
     }
 
     @Override

Modified: 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/osgi/RepositoryManager.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/osgi/RepositoryManager.java?rev=1576702&r1=1576701&r2=1576702&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/osgi/RepositoryManager.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/osgi/RepositoryManager.java
 Wed Mar 12 11:38:56 2014
@@ -30,11 +30,11 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.jackrabbit.oak.Oak;
-import org.apache.jackrabbit.oak.api.ContentRepository;
 import org.apache.jackrabbit.oak.commons.PropertiesUtil;
 import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
 import org.apache.jackrabbit.oak.plugins.commit.JcrConflictHandler;
 import org.apache.jackrabbit.oak.plugins.nodetype.write.InitialContent;
+import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
 import org.apache.jackrabbit.oak.spi.security.SecurityProvider;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
@@ -54,6 +54,7 @@ import org.osgi.framework.ServiceRegistr
 @Component(policy = ConfigurationPolicy.REQUIRE)
 public class RepositoryManager {
     private static final int DEFAULT_OBSERVATION_QUEUE_LENGTH = 1000;
+    private static final boolean DEFAULT_COMMIT_RATE_LIMIT = false;
 
     private final WhiteboardEditorProvider editorProvider =
             new WhiteboardEditorProvider();
@@ -72,6 +73,8 @@ public class RepositoryManager {
 
     private int observationQueueLength;
 
+    private CommitRateLimiter commitRateLimiter;
+
     @Reference
     private SecurityProvider securityProvider;
 
@@ -79,16 +82,30 @@ public class RepositoryManager {
     private NodeStore store;
 
     @Property(
-            intValue = DEFAULT_OBSERVATION_QUEUE_LENGTH,
-            name = "Observation queue length",
-            description = "Maximum number of pending revisions in a 
observation listener queue")
-    private static final String OBSERVATION_QUEUE_LENGTH = 
"oak.observation-queue-length";
+        intValue = DEFAULT_OBSERVATION_QUEUE_LENGTH,
+        name = "Observation queue length",
+        description = "Maximum number of pending revisions in a observation 
listener queue")
+    private static final String OBSERVATION_QUEUE_LENGTH = 
"oak.observation.queue-length";
+
+    @Property(
+        boolValue = DEFAULT_COMMIT_RATE_LIMIT,
+        name = "Commit rate limiter",
+        description = "Limit the commit rate once the number of pending 
revisions in the observation " +
+                "queue exceed 90% of its capacity.")
+    private static final String COMMIT_RATE_LIMIT = 
"oak.observation.limit-commit-rate";
 
     @Activate
     public void activate(BundleContext bundleContext, Map<String, ?> config) 
throws Exception {
         observationQueueLength = PropertiesUtil.toInteger(prop(
                 config, bundleContext, OBSERVATION_QUEUE_LENGTH), 
DEFAULT_OBSERVATION_QUEUE_LENGTH);
 
+        if(PropertiesUtil.toBoolean(prop(
+                config, bundleContext, COMMIT_RATE_LIMIT), 
DEFAULT_COMMIT_RATE_LIMIT)) {
+            commitRateLimiter = new CommitRateLimiter();
+        } else {
+            commitRateLimiter = null;
+        }
+
         whiteboard = new OsgiWhiteboard(bundleContext);
         editorProvider.start(whiteboard);
         indexEditorProvider.start(whiteboard);
@@ -122,7 +139,7 @@ public class RepositoryManager {
     }
 
     private ServiceRegistration registerRepository(BundleContext 
bundleContext) {
-        ContentRepository cr = new Oak(store)
+        Oak oak = new Oak(store)
                 .with(new InitialContent())
                 .with(JcrConflictHandler.JCR_CONFLICT_HANDLER)
                 .with(whiteboard)
@@ -131,12 +148,16 @@ public class RepositoryManager {
                 .with(indexEditorProvider)
                 .with(indexProvider)
                 .withAsyncIndexing()
-                .with(executor)
-                .createContentRepository();
+                .with(executor);
+
+        if (commitRateLimiter != null) {
+            oak.with(commitRateLimiter);
+        }
 
         return bundleContext.registerService(
                 Repository.class.getName(),
-                new OsgiRepository(cr, whiteboard, securityProvider, 
observationQueueLength),
+                new OsgiRepository(oak.createContentRepository(), whiteboard, 
securityProvider,
+                        observationQueueLength, commitRateLimiter),
                 new Properties());
     }
 }

Modified: 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/repository/RepositoryImpl.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/repository/RepositoryImpl.java?rev=1576702&r1=1576701&r2=1576702&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/repository/RepositoryImpl.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/repository/RepositoryImpl.java
 Wed Mar 12 11:38:56 2014
@@ -47,6 +47,7 @@ import org.apache.jackrabbit.commons.Sim
 import org.apache.jackrabbit.oak.api.ContentRepository;
 import org.apache.jackrabbit.oak.api.ContentSession;
 import org.apache.jackrabbit.oak.api.jmx.SessionMBean;
+import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
 import org.apache.jackrabbit.oak.stats.Clock;
 import org.apache.jackrabbit.oak.stats.StatisticManager;
 import org.apache.jackrabbit.oak.jcr.delegate.SessionDelegate;
@@ -84,6 +85,7 @@ public class RepositoryImpl implements J
     protected final Whiteboard whiteboard;
     private final SecurityProvider securityProvider;
     private final int observationQueueLength;
+    private final CommitRateLimiter commitRateLimiter;
 
     private final Clock clock;
 
@@ -107,11 +109,13 @@ public class RepositoryImpl implements J
     public RepositoryImpl(@Nonnull ContentRepository contentRepository,
                           @Nonnull Whiteboard whiteboard,
                           @Nonnull SecurityProvider securityProvider,
-                          int observationQueueLength) {
+                          int observationQueueLength,
+                          CommitRateLimiter commitRateLimiter) {
         this.contentRepository = checkNotNull(contentRepository);
         this.whiteboard = checkNotNull(whiteboard);
         this.securityProvider = checkNotNull(securityProvider);
         this.observationQueueLength = observationQueueLength;
+        this.commitRateLimiter = commitRateLimiter;
         this.descriptors = determineDescriptors();
         this.statisticManager = new StatisticManager(whiteboard, 
scheduledExecutor);
         this.clock = new Clock.Fast(scheduledExecutor);
@@ -242,7 +246,7 @@ public class RepositoryImpl implements J
             SessionDelegate sessionDelegate = 
createSessionDelegate(refreshStrategy, contentSession);
             SessionContext context = createSessionContext(
                     statisticManager, securityProvider, 
createAttributes(refreshInterval),
-                    sessionDelegate, observationQueueLength);
+                    sessionDelegate, observationQueueLength, 
commitRateLimiter);
             return context.getSession();
         } catch (LoginException e) {
             throw new javax.jcr.LoginException(e.getMessage(), e);
@@ -298,9 +302,10 @@ public class RepositoryImpl implements J
      */
     protected SessionContext createSessionContext(
             StatisticManager statisticManager, SecurityProvider 
securityProvider,
-            Map<String, Object> attributes, SessionDelegate delegate, int 
observationQueueLength) {
+            Map<String, Object> attributes, SessionDelegate delegate, int 
observationQueueLength,
+            CommitRateLimiter commitRateLimiter) {
         return new SessionContext(this, statisticManager, securityProvider, 
whiteboard, attributes,
-                delegate, observationQueueLength);
+                delegate, observationQueueLength, commitRateLimiter);
     }
 
     /**

Modified: 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/SessionContext.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/SessionContext.java?rev=1576702&r1=1576701&r2=1576702&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/SessionContext.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/SessionContext.java
 Wed Mar 12 11:38:56 2014
@@ -57,6 +57,7 @@ import org.apache.jackrabbit.oak.jcr.ses
 import org.apache.jackrabbit.oak.namepath.NamePathMapper;
 import org.apache.jackrabbit.oak.namepath.NamePathMapperImpl;
 import org.apache.jackrabbit.oak.plugins.nodetype.ReadOnlyNodeTypeManager;
+import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
 import org.apache.jackrabbit.oak.plugins.value.ValueFactoryImpl;
 import org.apache.jackrabbit.oak.spi.security.SecurityConfiguration;
 import org.apache.jackrabbit.oak.spi.security.SecurityProvider;
@@ -88,6 +89,7 @@ public class SessionContext implements N
     private final Map<String, Object> attributes;
     private final SessionDelegate delegate;
     private final int observationQueueLength;
+    private final CommitRateLimiter commitRateLimiter;
 
     private final SessionNamespaces namespaces;
     private final NamePathMapper namePathMapper;
@@ -114,7 +116,7 @@ public class SessionContext implements N
             @Nonnull Repository repository, @Nonnull StatisticManager 
statisticManager,
             @Nonnull SecurityProvider securityProvider, @Nonnull Whiteboard 
whiteboard,
             @Nonnull Map<String, Object> attributes, @Nonnull final 
SessionDelegate delegate,
-            int observationQueueLength) {
+            int observationQueueLength, CommitRateLimiter commitRateLimiter) {
         this.repository = checkNotNull(repository);
         this.statisticManager = statisticManager;
         this.securityProvider = checkNotNull(securityProvider);
@@ -122,6 +124,7 @@ public class SessionContext implements N
         this.attributes = checkNotNull(attributes);
         this.delegate = checkNotNull(delegate);
         this.observationQueueLength = observationQueueLength;
+        this.commitRateLimiter = commitRateLimiter;
         SessionStats sessionStats = delegate.getSessionStats();
         sessionStats.setAttributes(attributes);
 
@@ -267,7 +270,7 @@ public class SessionContext implements N
             observationManager = new ObservationManagerImpl(
                 this,
                 ReadOnlyNodeTypeManager.getInstance(delegate.getRoot(), 
namePathMapper),
-                getPermissionProvider(), whiteboard, observationQueueLength);
+                getPermissionProvider(), whiteboard, observationQueueLength, 
commitRateLimiter);
         }
         return observationManager;
     }


Reply via email to