This is an automated email from the ASF dual-hosted git repository. daim pushed a commit to branch OAK-12072 in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
commit 3e53b55b2765dbf4ca65c83204db150a3788a34e Author: rishabhdaim <[email protected]> AuthorDate: Mon Feb 2 19:41:37 2026 +0530 OAK-12072 : removed Guava's Monitor and Guard --- .../oak/jcr/observation/ChangeProcessor.java | 150 +++++++++++++-------- 1 file changed, 97 insertions(+), 53 deletions(-) diff --git a/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java b/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java index 3163afdbaf..136b172588 100644 --- a/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java +++ b/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java @@ -29,8 +29,10 @@ import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.scheduleW import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import javax.jcr.observation.Event; import javax.jcr.observation.EventIterator; @@ -73,9 +75,6 @@ import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.jackrabbit.guava.common.util.concurrent.Monitor; -import org.apache.jackrabbit.guava.common.util.concurrent.Monitor.Guard; - /** * A {@code ChangeProcessor} generates observation {@link javax.jcr.observation.Event}s * based on a {@link FilterProvider filter} and delivers them to an {@link EventListener}. @@ -417,8 +416,8 @@ class ChangeProcessor implements FilteringAwareObserver { }); } - private final Monitor runningMonitor = new Monitor(); - private final RunningGuard running = new RunningGuard(runningMonitor); + private final AtomicBoolean stopped = new AtomicBoolean(false); + private final ReentrantLock runningLock = new ReentrantLock(false); /** * Try to stop this change processor if running. This method will wait @@ -437,18 +436,23 @@ class ChangeProcessor implements FilteringAwareObserver { */ public synchronized boolean stopAndWait(int timeOut, TimeUnit unit) { Validate.checkState(registration != null, "Change processor not started"); - if (running.stop()) { - if (runningMonitor.enter(timeOut, unit)) { - registration.unregister(); - runningMonitor.leave(); - return true; - } else { - // Timed out - return false; - } - } else { - // Stopped already + + if (!stopFlagSet()) { + // already stopped, return from here + return true; + } + + if (!enterWithTimeout(timeOut, unit)) { + // timed out acquiring the running lock, return early + return false; + } + + // lock has been acquired + try { + registration.unregister(); return true; + } finally { + runningLock.unlock(); } } @@ -460,9 +464,15 @@ class ChangeProcessor implements FilteringAwareObserver { */ public synchronized void stop() { Validate.checkState(registration != null, "Change processor not started"); - if (running.stop()) { - registration.unregister(); - runningMonitor.leave(); + + if (stopFlagSet()) { + // Wait until no contentChanged is in the critical section + runningLock.lock(); + try { + registration.unregister(); + } finally { + runningLock.unlock(); + } } } @@ -503,19 +513,24 @@ class ChangeProcessor implements FilteringAwareObserver { long time = System.nanoTime(); boolean hasEvents = events.hasNext(); tracker.recordProducerTime(System.nanoTime() - time, TimeUnit.NANOSECONDS); - if (hasEvents && runningMonitor.enterIf(running)) { - if (commitRateLimiter != null) { - commitRateLimiter.beforeNonBlocking(); - } + if (hasEvents && enterIfRunning()) { + // lock has been acquired try { - CountingIterator countingEvents = new CountingIterator(events); - eventListener.onEvent(countingEvents); - countingEvents.updateCounters(eventCount, eventDuration); - } finally { if (commitRateLimiter != null) { - commitRateLimiter.afterNonBlocking(); + commitRateLimiter.beforeNonBlocking(); } - runningMonitor.leave(); + try { + CountingIterator countingEvents = new CountingIterator(events); + eventListener.onEvent(countingEvents); + countingEvents.updateCounters(eventCount, eventDuration); + } finally { + if (commitRateLimiter != null) { + commitRateLimiter.afterNonBlocking(); + } + } + } finally { + // unlock now + runningLock.unlock(); } } } @@ -602,29 +617,6 @@ class ChangeProcessor implements FilteringAwareObserver { } } - private static class RunningGuard extends Guard { - private boolean stopped; - - public RunningGuard(Monitor monitor) { - super(monitor); - } - - @Override - public boolean isSatisfied() { - return !stopped; - } - - /** - * @return {@code true} if this call set this guard to stopped, - * {@code false} if another call set this guard to stopped before. - */ - public boolean stop() { - boolean wasStopped = stopped; - stopped = true; - return !wasStopped; - } - } - @Override public String toString() { return "ChangeProcessor [" @@ -634,7 +626,7 @@ class ChangeProcessor implements FilteringAwareObserver { + ", eventCount=" + eventCount + ", eventDuration=" + eventDuration + ", commitRateLimiter=" + commitRateLimiter - + ", running=" + running.isSatisfied() + "]"; + + ", running=" + stopped.get() + "]"; } /** for logging only **/ @@ -689,4 +681,56 @@ class ChangeProcessor implements FilteringAwareObserver { return FilterResult.INCLUDE; } } + + // helper methods for lock/unlocking + + private boolean stopFlagSet() { + // true only for the first caller that changes false -> true + return stopped.compareAndSet(false, true); + } + + private boolean enterIfRunning() { + runningLock.lock(); + boolean ok = false; + try { + ok = !stopped.get(); // guard: same as RunningGuard.isSatisfied() + return ok; + } finally { + if (!ok) { + runningLock.unlock(); + } + } + } + + private boolean enterWithTimeout(long timeout, TimeUnit unit) { + + // non-fair fast path + if (runningLock.tryLock()) { + return true; + } + + long timeoutNanos = unit.toNanos(timeout); + + boolean interrupted = false; + try { + long start = System.nanoTime(); + long remaining = timeoutNanos; + + while (remaining > 0L) { + try { + // timed out + return runningLock.tryLock(remaining, TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + interrupted = true; + long elapsed = System.nanoTime() - start; + remaining = timeoutNanos - elapsed; + } + } + return false; // timeout + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } }
