This is an automated email from the ASF dual-hosted git repository.

daim pushed a commit to branch OAK-12072
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git

commit 3e53b55b2765dbf4ca65c83204db150a3788a34e
Author: rishabhdaim <[email protected]>
AuthorDate: Mon Feb 2 19:41:37 2026 +0530

    OAK-12072 : removed Guava's Monitor and Guard
---
 .../oak/jcr/observation/ChangeProcessor.java       | 150 +++++++++++++--------
 1 file changed, 97 insertions(+), 53 deletions(-)

diff --git 
a/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
 
b/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
index 3163afdbaf..136b172588 100644
--- 
a/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
+++ 
b/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
@@ -29,8 +29,10 @@ import static 
org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.scheduleW
 
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.jcr.observation.Event;
 import javax.jcr.observation.EventIterator;
@@ -73,9 +75,6 @@ import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.jackrabbit.guava.common.util.concurrent.Monitor;
-import org.apache.jackrabbit.guava.common.util.concurrent.Monitor.Guard;
-
 /**
  * A {@code ChangeProcessor} generates observation {@link 
javax.jcr.observation.Event}s
  * based on a {@link FilterProvider filter} and delivers them to an {@link 
EventListener}.
@@ -417,8 +416,8 @@ class ChangeProcessor implements FilteringAwareObserver {
         });
     }
 
-    private final Monitor runningMonitor = new Monitor();
-    private final RunningGuard running = new RunningGuard(runningMonitor);
+    private final AtomicBoolean stopped = new AtomicBoolean(false);
+    private final ReentrantLock runningLock = new ReentrantLock(false);
 
     /**
      * Try to stop this change processor if running. This method will wait
@@ -437,18 +436,23 @@ class ChangeProcessor implements FilteringAwareObserver {
      */
     public synchronized boolean stopAndWait(int timeOut, TimeUnit unit) {
         Validate.checkState(registration != null, "Change processor not 
started");
-        if (running.stop()) {
-            if (runningMonitor.enter(timeOut, unit)) {
-                registration.unregister();
-                runningMonitor.leave();
-                return true;
-            } else {
-                // Timed out
-                return false;
-            }
-        } else {
-            // Stopped already
+
+        if (!stopFlagSet()) {
+            // already stopped, return from here
+            return true;
+        }
+
+        if (!enterWithTimeout(timeOut, unit)) {
+            // timed out acquiring the running lock, return early
+            return false;
+        }
+
+        // lock has been acquired
+        try {
+            registration.unregister();
             return true;
+        } finally {
+            runningLock.unlock();
         }
     }
 
@@ -460,9 +464,15 @@ class ChangeProcessor implements FilteringAwareObserver {
      */
     public synchronized void stop() {
         Validate.checkState(registration != null, "Change processor not 
started");
-        if (running.stop()) {
-            registration.unregister();
-            runningMonitor.leave();
+
+        if (stopFlagSet()) {
+            // Wait until no contentChanged is in the critical section
+            runningLock.lock();
+            try {
+                registration.unregister();
+            } finally {
+                runningLock.unlock();
+            }
         }
     }
 
@@ -503,19 +513,24 @@ class ChangeProcessor implements FilteringAwareObserver {
                 long time = System.nanoTime();
                 boolean hasEvents = events.hasNext();
                 tracker.recordProducerTime(System.nanoTime() - time, 
TimeUnit.NANOSECONDS);
-                if (hasEvents && runningMonitor.enterIf(running)) {
-                    if (commitRateLimiter != null) {
-                        commitRateLimiter.beforeNonBlocking();
-                    }
+                if (hasEvents && enterIfRunning()) {
+                    // lock has been acquired
                     try {
-                        CountingIterator countingEvents = new 
CountingIterator(events);
-                        eventListener.onEvent(countingEvents);
-                        countingEvents.updateCounters(eventCount, 
eventDuration);
-                    } finally {
                         if (commitRateLimiter != null) {
-                            commitRateLimiter.afterNonBlocking();
+                            commitRateLimiter.beforeNonBlocking();
                         }
-                        runningMonitor.leave();
+                        try {
+                            CountingIterator countingEvents = new 
CountingIterator(events);
+                            eventListener.onEvent(countingEvents);
+                            countingEvents.updateCounters(eventCount, 
eventDuration);
+                        } finally {
+                            if (commitRateLimiter != null) {
+                                commitRateLimiter.afterNonBlocking();
+                            }
+                        }
+                    } finally {
+                        // unlock now
+                        runningLock.unlock();
                     }
                 }
             }
@@ -602,29 +617,6 @@ class ChangeProcessor implements FilteringAwareObserver {
         }
     }
 
-    private static class RunningGuard extends Guard {
-        private boolean stopped;
-
-        public RunningGuard(Monitor monitor) {
-            super(monitor);
-        }
-
-        @Override
-        public boolean isSatisfied() {
-            return !stopped;
-        }
-
-        /**
-         * @return  {@code true} if this call set this guard to stopped,
-         *          {@code false} if another call set this guard to stopped 
before.
-         */
-        public boolean stop() {
-            boolean wasStopped = stopped;
-            stopped = true;
-            return !wasStopped;
-        }
-    }
-
     @Override
     public String toString() {
         return "ChangeProcessor ["
@@ -634,7 +626,7 @@ class ChangeProcessor implements FilteringAwareObserver {
                 + ", eventCount=" + eventCount 
                 + ", eventDuration=" + eventDuration 
                 + ", commitRateLimiter=" + commitRateLimiter
-                + ", running=" + running.isSatisfied() + "]";
+                + ", running=" + stopped.get() + "]";
     }
     
     /** for logging only **/
@@ -689,4 +681,56 @@ class ChangeProcessor implements FilteringAwareObserver {
             return FilterResult.INCLUDE;
         }
     }
+
+    // helper methods for lock/unlocking
+
+    private boolean stopFlagSet() {
+        // true only for the first caller that changes false -> true
+        return stopped.compareAndSet(false, true);
+    }
+
+    private boolean enterIfRunning() {
+        runningLock.lock();
+        boolean ok = false;
+        try {
+            ok = !stopped.get();   // guard: same as RunningGuard.isSatisfied()
+            return ok;
+        } finally {
+            if (!ok) {
+                runningLock.unlock();
+            }
+        }
+    }
+
+    private boolean enterWithTimeout(long timeout, TimeUnit unit) {
+
+        // non-fair fast path
+        if (runningLock.tryLock()) {
+            return true;
+        }
+
+        long timeoutNanos = unit.toNanos(timeout);
+
+        boolean interrupted = false;
+        try {
+            long start = System.nanoTime();
+            long remaining = timeoutNanos;
+
+            while (remaining > 0L) {
+                try {
+                    // timed out
+                    return runningLock.tryLock(remaining, 
TimeUnit.NANOSECONDS);
+                } catch (InterruptedException e) {
+                    interrupted = true;
+                    long elapsed = System.nanoTime() - start;
+                    remaining = timeoutNanos - elapsed;
+                }
+            }
+            return false; // timeout
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
 }

Reply via email to