This is an automated email from the ASF dual-hosted git repository.
daim pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new f7273e7228 OAK-12115 : add monitor and monitor.guard test coverage for
SegmentBu… (#2772)
f7273e7228 is described below
commit f7273e7228773a60005c9210a1f91d75a54a586b
Author: Rishabh Kumar <[email protected]>
AuthorDate: Wed Mar 4 17:17:13 2026 +0530
OAK-12115 : add monitor and monitor.guard test coverage for SegmentBu…
(#2772)
* OAK-12115 : add monitor and monitor.guard test coverage for
SegmentBufferWriterPool
* OAK-12115 : fixed sonar issues
---
.../SegmentBufferWriterPoolMonitorTest.java | 439 +++++++++++++++++++++
1 file changed, 439 insertions(+)
diff --git
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolMonitorTest.java
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolMonitorTest.java
new file mode 100644
index 0000000000..6a2184febf
--- /dev/null
+++
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolMonitorTest.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+import org.apache.jackrabbit.oak.segment.WriteOperationHandler.WriteOperation;
+import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
+import org.apache.jackrabbit.oak.segment.spi.persistence.GCGeneration;
+import org.jetbrains.annotations.NotNull;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tests for Monitor and Monitor.Guard usage in SegmentBufferWriterPool.
+ * These tests specifically verify the behavior of:
+ * - Monitor.enter() and Monitor.leave() in borrowWriter/returnWriter
+ * - Monitor.enterWhen(Guard) in flush() waiting for borrowed writers
+ * - Monitor.Guard.isSatisfied() in allReturned()
+ * - safeEnterWhen() interruption handling
+ */
+public class SegmentBufferWriterPoolMonitorTest {
+
+ private MemoryStore store;
+ private SegmentBufferWriterPool pool;
+ private GCGeneration gcGeneration;
+ private ExecutorService executor;
+
+ @Before
+ public void setUp() throws IOException {
+ store = new MemoryStore();
+ gcGeneration = GCGeneration.NULL;
+ pool = SegmentBufferWriterPool.factory(
+ store.getSegmentIdProvider(),
+ "test",
+ () -> gcGeneration
+ ).newPool(SegmentBufferWriterPool.PoolType.GLOBAL);
+ executor = Executors.newFixedThreadPool(10);
+ }
+
+ @After
+ public void tearDown() {
+ executor.shutdownNow();
+ }
+
+ /**
+ * Tests concurrent borrowWriter and returnWriter operations.
+ * This verifies Monitor.enter() and Monitor.leave() work correctly
+ * with multiple threads accessing the pool simultaneously.
+ */
+ @Test
+ public void testConcurrentBorrowAndReturn() throws Exception {
+ int numThreads = 10;
+ int iterationsPerThread = 100;
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch doneLatch = new CountDownLatch(numThreads);
+ AtomicInteger successCount = new AtomicInteger(0);
+
+ for (int i = 0; i < numThreads; i++) {
+ executor.submit(() -> {
+ try {
+ startLatch.await();
+ for (int j = 0; j < iterationsPerThread; j++) {
+ RecordId result = pool.execute(gcGeneration, writer ->
{
+ // Simulate some work
+ return store.getRevisions().getHead();
+ });
+ Assert.assertNotNull(result);
+ successCount.incrementAndGet();
+ }
+ } catch (Exception e) {
+ Assert.fail("Concurrent operation failed: " +
e.getMessage());
+ } finally {
+ doneLatch.countDown();
+ }
+ });
+ }
+
+ startLatch.countDown();
+ Assert.assertTrue("Threads should complete within timeout",
+ doneLatch.await(30, TimeUnit.SECONDS));
+ Assert.assertEquals("All operations should succeed",
+ numThreads * iterationsPerThread, successCount.get());
+ }
+
+ /**
+ * Tests that flush() waits for borrowed writers to be returned.
+ * This specifically tests Monitor.enterWhen(Guard) and
Monitor.Guard.isSatisfied().
+ */
+ @Test
+ public void testFlushWaitsForBorrowedWriters() throws Exception {
+ CyclicBarrier barrier = new CyclicBarrier(2);
+ AtomicBoolean writeOperationStarted = new AtomicBoolean(false);
+ AtomicBoolean writeOperationCompleted = new AtomicBoolean(false);
+ AtomicBoolean flushCompleted = new AtomicBoolean(false);
+
+ // Start a write operation that will hold a writer
+ Future<RecordId> writeFuture = executor.submit(() ->
pool.execute(gcGeneration, writer -> {
+ writeOperationStarted.set(true);
+ try {
+ // Wait for flush to be called
+ barrier.await(5, TimeUnit.SECONDS);
+ // Hold the writer for a bit to ensure flush is waiting
+ Thread.sleep(100);
+ writeOperationCompleted.set(true);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ return store.getRevisions().getHead();
+ }));
+
+ // Wait for write operation to start
+ while (!writeOperationStarted.get()) {
+ Thread.sleep(10);
+ }
+
+ // Start flush operation
+ Future<Void> flushFuture = executor.submit(() -> {
+ try {
+ // Signal that flush is about to be called
+ barrier.await(5, TimeUnit.SECONDS);
+ pool.flush(store);
+ flushCompleted.set(true);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+
+ // Wait for both operations to complete
+ RecordId writeResult = writeFuture.get(10, TimeUnit.SECONDS);
+ flushFuture.get(10, TimeUnit.SECONDS);
+
+ Assert.assertNotNull("Write operation should complete", writeResult);
+ Assert.assertTrue("Write operation should complete before flush",
+ writeOperationCompleted.get());
+ Assert.assertTrue("Flush should complete after write operation returns
writer",
+ flushCompleted.get());
+ }
+
+ /**
+ * Tests that multiple writers are properly waited for during flush.
+ * This tests the Monitor.Guard condition checking all borrowed writers.
+ */
+ @Test
+ public void testFlushWaitsForMultipleBorrowedWriters() throws Exception {
+ int numWriters = 5;
+ CountDownLatch writersStarted = new CountDownLatch(numWriters);
+ CountDownLatch releaseWriters = new CountDownLatch(1);
+ AtomicInteger completedWrites = new AtomicInteger(0);
+
+ List<Future<RecordId>> writeFutures = new ArrayList<>();
+
+ // Start multiple write operations
+ for (int i = 0; i < numWriters; i++) {
+ Future<RecordId> future = executor.submit(() ->
pool.execute(gcGeneration, writer -> {
+ writersStarted.countDown();
+ try {
+ // Wait until all writers are borrowed
+ releaseWriters.await(5, TimeUnit.SECONDS);
+ completedWrites.incrementAndGet();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ return store.getRevisions().getHead();
+ }));
+ writeFutures.add(future);
+ }
+
+ // Wait for all writers to be borrowed
+ Assert.assertTrue("All writers should be borrowed",
+ writersStarted.await(5, TimeUnit.SECONDS));
+
+ // Start flush operation
+ Future<Void> flushFuture = executor.submit(() -> {
+ try {
+ // Small delay to ensure flush starts after all writers
borrowed
+ Thread.sleep(50);
+ pool.flush(store);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+
+ // Give flush a chance to start waiting
+ Thread.sleep(100);
+
+ // Release all writers
+ releaseWriters.countDown();
+
+ // Wait for all writes to complete
+ for (Future<RecordId> future : writeFutures) {
+ Assert.assertNotNull("Write should complete", future.get(5,
TimeUnit.SECONDS));
+ }
+
+ // Wait for flush to complete
+ flushFuture.get(5, TimeUnit.SECONDS);
+
+ Assert.assertEquals("All writes should complete", numWriters,
completedWrites.get());
+ }
+
+ /**
+ * Tests interrupted flush operation.
+ * This verifies safeEnterWhen() properly handles InterruptedException.
+ */
+ @Test
+ public void testFlushWithInterruption() throws Exception {
+ CountDownLatch writerBorrowed = new CountDownLatch(1);
+ CountDownLatch flushStarted = new CountDownLatch(1);
+
+ // Borrow a writer and hold it
+ executor.submit(() -> pool.execute(gcGeneration, writer -> {
+ writerBorrowed.countDown();
+ try {
+ // Hold writer for 5 seconds
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ // Expected if test is torn down
+ }
+ return store.getRevisions().getHead();
+ }));
+
+ // Wait for writer to be borrowed
+ Assert.assertTrue("Writer should be borrowed",
+ writerBorrowed.await(2, TimeUnit.SECONDS));
+
+ // Start flush in separate thread and interrupt it
+ Thread flushThread = new Thread(() -> {
+ try {
+ flushStarted.countDown();
+ pool.flush(store);
+ } catch (IOException e) {
+ // Ignore
+ }
+ });
+
+ flushThread.start();
+
+ // Wait for flush to start
+ Assert.assertTrue("Flush should start", flushStarted.await(2,
TimeUnit.SECONDS));
+
+ // Give flush time to enter waiting state
+ Thread.sleep(100);
+
+ // Interrupt the flush thread
+ flushThread.interrupt();
+
+ // Verify thread was interrupted (should exit quickly)
+ flushThread.join(1000);
+ Assert.assertFalse("Flush thread should terminate after interrupt",
+ flushThread.isAlive());
+ }
+
+ /**
+ * Tests writer disposal during concurrent flush.
+ * This tests the scenario where a writer is returned after flush() starts.
+ */
+ @Test
+ public void testWriterDisposalDuringFlush() throws Exception {
+ CountDownLatch writerBorrowed = new CountDownLatch(1);
+ CountDownLatch flushCanStart = new CountDownLatch(1);
+ CountDownLatch writerReturned = new CountDownLatch(1);
+
+ // Borrow a writer
+ Future<RecordId> writeFuture = executor.submit(() ->
pool.execute(gcGeneration, writer -> {
+ writerBorrowed.countDown();
+ try {
+ // Wait for flush to start
+ flushCanStart.await(5, TimeUnit.SECONDS);
+ // Small delay to ensure flush has collected borrowed writers
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } finally {
+ writerReturned.countDown();
+ }
+ return store.getRevisions().getHead();
+ }));
+
+ // Wait for writer to be borrowed
+ Assert.assertTrue("Writer should be borrowed",
+ writerBorrowed.await(2, TimeUnit.SECONDS));
+
+ // Start flush
+ Future<Void> flushFuture = executor.submit(() -> {
+ try {
+ flushCanStart.countDown();
+ pool.flush(store);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+
+ // Wait for operations to complete
+ RecordId writeResult = writeFuture.get(10, TimeUnit.SECONDS);
+ flushFuture.get(10, TimeUnit.SECONDS);
+
+ Assert.assertNotNull("Write should complete", writeResult);
+ Assert.assertEquals("Writer should be returned", 0,
writerReturned.getCount());
+ }
+
+ /**
+ * Tests multiple concurrent flushes.
+ * This verifies Monitor enter/leave work correctly with contention.
+ */
+ @Test
+ public void testMultipleConcurrentFlushes() throws Exception {
+ int numFlushes = 5;
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch doneLatch = new CountDownLatch(numFlushes);
+ AtomicInteger successfulFlushes = new AtomicInteger(0);
+
+ // Do some writes first
+ for (int i = 0; i < 10; i++) {
+ pool.execute(gcGeneration, (WriteOperation) writer ->
store.getRevisions().getHead());
+ }
+
+ // Start multiple concurrent flushes
+ for (int i = 0; i < numFlushes; i++) {
+ executor.submit(() -> {
+ try {
+ startLatch.await();
+ pool.flush(store);
+ successfulFlushes.incrementAndGet();
+ } catch (Exception e) {
+ Assert.fail("Flush failed: " + e.getMessage());
+ } finally {
+ doneLatch.countDown();
+ }
+ });
+ }
+
+ startLatch.countDown();
+ Assert.assertTrue("All flushes should complete",
+ doneLatch.await(10, TimeUnit.SECONDS));
+ Assert.assertEquals("All flushes should succeed",
+ numFlushes, successfulFlushes.get());
+ }
+
+ /**
+ * Tests that borrowWriter and returnWriter maintain proper writer state.
+ * This verifies Monitor protection of the borrowed/disposed sets.
+ */
+ @Test
+ public void testBorrowReturnStateConsistency() throws Exception {
+ int iterations = 100;
+
+ for (int i = 0; i < iterations; i++) {
+ RecordId result = pool.execute(gcGeneration, writer -> {
+ // Writer should be borrowed here
+ Assert.assertNotNull("Writer should not be null", writer);
+ return store.getRevisions().getHead();
+ });
+ Assert.assertNotNull("Result should not be null", result);
+ }
+
+ // Flush should work without issues
+ pool.flush(store);
+
+ // Should be able to borrow again after flush
+ RecordId result = pool.execute(gcGeneration, writer -> {
+ Assert.assertNotNull("Writer should not be null after flush",
writer);
+ return store.getRevisions().getHead();
+ });
+ Assert.assertNotNull("Result after flush should not be null", result);
+ }
+
+ /**
+ * Tests rapid borrow/return cycles under load.
+ * This stresses the Monitor enter/leave operations.
+ */
+ @Test
+ public void testRapidBorrowReturnCycles() throws Exception {
+ int numThreads = 20;
+ int cyclesPerThread = 50;
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch doneLatch = new CountDownLatch(numThreads);
+ AtomicInteger totalOperations = new AtomicInteger(0);
+
+ for (int i = 0; i < numThreads; i++) {
+ executor.submit(() -> {
+ try {
+ startLatch.await();
+ for (int j = 0; j < cyclesPerThread; j++) {
+ pool.execute(gcGeneration, new WriteOperation() {
+ @NotNull
+ @Override
+ public RecordId execute(@NotNull
SegmentBufferWriter writer) {
+ totalOperations.incrementAndGet();
+ return store.getRevisions().getHead();
+ }
+ });
+ }
+ } catch (Exception e) {
+ Assert.fail("Operation failed: " + e.getMessage());
+ } finally {
+ doneLatch.countDown();
+ }
+ });
+ }
+
+ startLatch.countDown();
+ Assert.assertTrue("All operations should complete",
+ doneLatch.await(30, TimeUnit.SECONDS));
+ Assert.assertEquals("All operations should execute",
+ numThreads * cyclesPerThread, totalOperations.get());
+ }
+}