This is an automated email from the ASF dual-hosted git repository. miroslav pushed a commit to branch issue/OAK-10006 in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
commit 5d322e710c06cc4eacf9c19e5f508e1d2235d638 Author: smiroslav <[email protected]> AuthorDate: Mon Nov 6 16:46:11 2023 +0100 OAK-10006 writes not possible during lease renewal --- .../oak/segment/azure/AzureArchiveManager.java | 9 ++- .../oak/segment/azure/AzureJournalFile.java | 14 +++- .../oak/segment/azure/AzurePersistence.java | 9 ++- .../oak/segment/azure/AzureRepositoryLock.java | 15 +++- .../segment/azure/AzureSegmentArchiveWriter.java | 13 ++++ .../oak/segment/azure/AzureArchiveManagerTest.java | 80 +++++++++++++++++++++- .../oak/segment/azure/AzureJournalFileTest.java | 3 +- .../oak/segment/azure/AzureRepositoryLockTest.java | 15 ++-- .../azure/journal/AzureJournalReaderTest.java | 3 +- .../remote/AbstractRemoteSegmentArchiveWriter.java | 2 + .../oak/segment/remote/WriteAccessController.java | 61 +++++++++++++++++ 11 files changed, 201 insertions(+), 23 deletions(-) diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java index daa042dbe2..ea943fb1b5 100644 --- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java +++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java @@ -22,6 +22,7 @@ import com.microsoft.azure.storage.blob.CloudBlob; import com.microsoft.azure.storage.blob.CloudBlobDirectory; import com.microsoft.azure.storage.blob.CloudBlockBlob; import com.microsoft.azure.storage.blob.CopyStatus; +import org.apache.jackrabbit.oak.segment.remote.WriteAccessController; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager; import org.apache.jackrabbit.oak.segment.remote.RemoteUtilities; import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor; @@ -61,6 +62,7 @@ public class AzureArchiveManager implements SegmentArchiveManager { protected final IOMonitor ioMonitor; protected final FileStoreMonitor monitor; + private WriteAccessController writeAccessController = null; public AzureArchiveManager(CloudBlobDirectory cloudBlobDirectory, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor) { this.cloudBlobDirectory = cloudBlobDirectory; @@ -68,6 +70,11 @@ public class AzureArchiveManager implements SegmentArchiveManager { this.monitor = fileStoreMonitor; } + public AzureArchiveManager(CloudBlobDirectory segmentstoreDirectory, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor, WriteAccessController writeAccessController) { + this(segmentstoreDirectory, ioMonitor, fileStoreMonitor); + this.writeAccessController = writeAccessController; + } + @Override public List<String> listArchives() throws IOException { try { @@ -127,7 +134,7 @@ public class AzureArchiveManager implements SegmentArchiveManager { @Override public SegmentArchiveWriter create(String archiveName) throws IOException { - return new AzureSegmentArchiveWriter(getDirectory(archiveName), ioMonitor, monitor); + return new AzureSegmentArchiveWriter(getDirectory(archiveName), ioMonitor, monitor, writeAccessController); } @Override diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java index 9650334df0..5898836bbf 100644 --- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java +++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java @@ -23,6 +23,7 @@ import com.microsoft.azure.storage.blob.CloudBlob; import com.microsoft.azure.storage.blob.CloudBlobDirectory; import com.microsoft.azure.storage.blob.ListBlobItem; import org.apache.jackrabbit.oak.segment.azure.util.CaseInsensitiveKeysMapAccess; +import org.apache.jackrabbit.oak.segment.remote.WriteAccessController; import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile; import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader; import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter; @@ -53,14 +54,17 @@ public class AzureJournalFile implements JournalFile { private final int lineLimit; - AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix, int lineLimit) { + private final WriteAccessController writeAccessController; + + AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix, int lineLimit, WriteAccessController writeAccessController) { this.directory = directory; this.journalNamePrefix = journalNamePrefix; this.lineLimit = lineLimit; + this.writeAccessController = writeAccessController; } - public AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix) { - this(directory, journalNamePrefix, JOURNAL_LINE_LIMIT); + public AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix, WriteAccessController writeAccessController) { + this(directory, journalNamePrefix, JOURNAL_LINE_LIMIT, writeAccessController); } @Override @@ -183,6 +187,8 @@ public class AzureJournalFile implements JournalFile { @Override public void truncate() throws IOException { try { + writeAccessController.checkWritingAllowed(); + for (CloudAppendBlob cloudAppendBlob : getJournalBlobs()) { cloudAppendBlob.delete(); } @@ -200,6 +206,8 @@ public class AzureJournalFile implements JournalFile { @Override public void batchWriteLines(List<String> lines) throws IOException { + writeAccessController.checkWritingAllowed(); + if (lines.isEmpty()) { return; } diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java index b5434e4175..ac0f0d5f29 100644 --- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java +++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java @@ -34,6 +34,7 @@ import com.microsoft.azure.storage.blob.CloudAppendBlob; import com.microsoft.azure.storage.blob.CloudBlobDirectory; import com.microsoft.azure.storage.blob.CloudBlockBlob; import com.microsoft.azure.storage.blob.ListBlobItem; +import org.apache.jackrabbit.oak.segment.remote.WriteAccessController; import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor; import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor; import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitor; @@ -64,6 +65,8 @@ public class AzurePersistence implements SegmentNodeStorePersistence { protected final CloudBlobDirectory segmentstoreDirectory; + private WriteAccessController writeAccessController = new WriteAccessController(); + public AzurePersistence(CloudBlobDirectory segmentStoreDirectory) { this.segmentstoreDirectory = segmentStoreDirectory; @@ -92,7 +95,7 @@ public class AzurePersistence implements SegmentNodeStorePersistence { @Override public SegmentArchiveManager createArchiveManager(boolean mmap, boolean offHeapAccess, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor, RemoteStoreMonitor remoteStoreMonitor) { attachRemoteStoreMonitor(remoteStoreMonitor); - return new AzureArchiveManager(segmentstoreDirectory, ioMonitor, fileStoreMonitor); + return new AzureArchiveManager(segmentstoreDirectory, ioMonitor, fileStoreMonitor, writeAccessController); } @Override @@ -116,7 +119,7 @@ public class AzurePersistence implements SegmentNodeStorePersistence { @Override public JournalFile getJournalFile() { - return new AzureJournalFile(segmentstoreDirectory, "journal.log"); + return new AzureJournalFile(segmentstoreDirectory, "journal.log", writeAccessController); } @Override @@ -134,7 +137,7 @@ public class AzurePersistence implements SegmentNodeStorePersistence { return new AzureRepositoryLock(getBlockBlob("repo.lock"), () -> { log.warn("Lost connection to the Azure. The client will be closed."); // TODO close the connection - }).lock(); + }, writeAccessController).lock(); } private CloudBlockBlob getBlockBlob(String path) throws IOException { diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java index 60d7d0f3f3..6de3756d81 100644 --- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java +++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java @@ -20,6 +20,7 @@ import com.microsoft.azure.storage.AccessCondition; import com.microsoft.azure.storage.StorageErrorCodeStrings; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.CloudBlockBlob; +import org.apache.jackrabbit.oak.segment.remote.WriteAccessController; import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,19 +46,22 @@ public class AzureRepositoryLock implements RepositoryLock { private final int timeoutSec; + private WriteAccessController writeAccessController; + private String leaseId; private volatile boolean doUpdate; - public AzureRepositoryLock(CloudBlockBlob blob, Runnable shutdownHook) { - this(blob, shutdownHook, TIMEOUT_SEC); + public AzureRepositoryLock(CloudBlockBlob blob, Runnable shutdownHook, WriteAccessController writeAccessController) { + this(blob, shutdownHook, TIMEOUT_SEC, writeAccessController); } - public AzureRepositoryLock(CloudBlockBlob blob, Runnable shutdownHook, int timeoutSec) { + public AzureRepositoryLock(CloudBlockBlob blob, Runnable shutdownHook, int timeoutSec, WriteAccessController writeAccessController) { this.shutdownHook = shutdownHook; this.blob = blob; this.executor = Executors.newSingleThreadExecutor(); this.timeoutSec = timeoutSec; + this.writeAccessController = writeAccessController; } public AzureRepositoryLock lock() throws IOException { @@ -67,6 +71,7 @@ public class AzureRepositoryLock implements RepositoryLock { try { blob.openOutputStream().close(); leaseId = blob.acquireLease(INTERVAL, null); + writeAccessController.enableWriting(); log.info("Acquired lease {}", leaseId); } catch (StorageException | IOException e) { if (ex == null) { @@ -100,7 +105,11 @@ public class AzureRepositoryLock implements RepositoryLock { try { long timeSinceLastUpdate = (System.currentTimeMillis() - lastUpdate) / 1000; if (timeSinceLastUpdate > INTERVAL / 2) { + writeAccessController.disableWriting(); + blob.renewLease(AccessCondition.generateLeaseCondition(leaseId)); + + writeAccessController.enableWriting(); lastUpdate = System.currentTimeMillis(); } } catch (StorageException e) { diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java index f7181bb810..f20ad4f67b 100644 --- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java +++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java @@ -31,6 +31,7 @@ import com.microsoft.azure.storage.blob.CloudBlobDirectory; import com.microsoft.azure.storage.blob.CloudBlockBlob; import org.apache.jackrabbit.oak.commons.Buffer; +import org.apache.jackrabbit.oak.segment.remote.WriteAccessController; import org.apache.jackrabbit.oak.segment.azure.util.Retrier; import org.apache.jackrabbit.oak.segment.remote.AbstractRemoteSegmentArchiveWriter; import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry; @@ -51,6 +52,11 @@ public class AzureSegmentArchiveWriter extends AbstractRemoteSegmentArchiveWrite this.archiveDirectory = archiveDirectory; } + public AzureSegmentArchiveWriter(CloudBlobDirectory directory, IOMonitor ioMonitor, FileStoreMonitor monitor, WriteAccessController writeAccessController) { + this(directory, ioMonitor, monitor); + this.writeAccessController = writeAccessController; + } + @Override public String getName() { return AzureUtilities.getName(archiveDirectory); @@ -58,6 +64,9 @@ public class AzureSegmentArchiveWriter extends AbstractRemoteSegmentArchiveWrite @Override protected void doWriteArchiveEntry(RemoteSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException { + + writeAccessController.checkWritingAllowed(); + long msb = indexEntry.getMsb(); long lsb = indexEntry.getLsb(); String segmentName = getSegmentFileName(indexEntry); @@ -90,6 +99,8 @@ public class AzureSegmentArchiveWriter extends AbstractRemoteSegmentArchiveWrite protected void doWriteDataFile(byte[] data, String extension) throws IOException { retrier.execute(() -> { try { + writeAccessController.checkWritingAllowed(); + getBlob(getName() + extension).uploadFromByteArray(data, 0, data.length); } catch (StorageException e) { throw new IOException(e); @@ -101,6 +112,8 @@ public class AzureSegmentArchiveWriter extends AbstractRemoteSegmentArchiveWrite protected void afterQueueClosed() throws IOException { retrier.execute(() -> { try { + writeAccessController.checkWritingAllowed(); + getBlob("closed").uploadFromByteArray(new byte[0], 0, 0); } catch (StorageException e) { throw new IOException(e); diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java index cfcf1337bd..86b649cccf 100644 --- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java +++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java @@ -16,10 +16,9 @@ */ package org.apache.jackrabbit.oak.segment.azure; +import com.microsoft.azure.storage.StorageErrorCodeStrings; import com.microsoft.azure.storage.StorageException; -import com.microsoft.azure.storage.blob.CloudBlob; -import com.microsoft.azure.storage.blob.CloudBlobContainer; -import com.microsoft.azure.storage.blob.ListBlobItem; +import com.microsoft.azure.storage.blob.*; import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.api.PropertyState; import org.apache.jackrabbit.oak.api.Type; @@ -34,6 +33,7 @@ import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder; import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException; import org.apache.jackrabbit.oak.segment.file.ReadOnlyFileStore; import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence; +import org.apache.jackrabbit.oak.segment.remote.WriteAccessController; import org.apache.jackrabbit.oak.segment.spi.RepositoryNotReachableException; import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter; import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter; @@ -55,6 +55,7 @@ import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; import java.io.File; import java.io.FileNotFoundException; @@ -66,6 +67,7 @@ import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.UUID; +import java.util.concurrent.TimeoutException; import static org.apache.jackrabbit.guava.common.collect.Lists.newArrayList; import static org.hamcrest.CoreMatchers.equalTo; @@ -465,6 +467,78 @@ public class AzureArchiveManagerTest { } } + @Test + public void testWriteAfterLoosingRepoLock() throws URISyntaxException, InvalidFileStoreVersionException, IOException, CommitFailedException, StorageException, InterruptedException { + CloudBlobDirectory oakDirectory = container.getDirectoryReference("oak"); + AzurePersistence rwPersistence = new AzurePersistence(oakDirectory); + + CloudBlockBlob blob = container.getBlockBlobReference("oak/repo.lock"); + + CloudBlockBlob blobMocked = Mockito.spy(blob); + + Mockito + .doCallRealMethod() + .when(blobMocked).renewLease(Mockito.any()); + + AzurePersistence mockedRwPersistence = Mockito.spy(rwPersistence); + WriteAccessController writeAccessController = new WriteAccessController(); + AzureRepositoryLock azureRepositoryLock = new AzureRepositoryLock(blobMocked, () -> {}, 0, writeAccessController); + AzureArchiveManager azureArchiveManager = new AzureArchiveManager(oakDirectory, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), writeAccessController); + + + Mockito + .doAnswer(invocation -> azureRepositoryLock.lock()) + .when(mockedRwPersistence).lockRepository(); + + Mockito + .doReturn(azureArchiveManager) + .when(mockedRwPersistence).createArchiveManager(Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.any()); + Mockito + .doReturn(new AzureJournalFile(oakDirectory, "journal.log", writeAccessController)) + .when(mockedRwPersistence).getJournalFile(); + + FileStore rwFileStore = FileStoreBuilder.fileStoreBuilder(folder.newFolder()).withCustomPersistence(mockedRwPersistence).build(); + SegmentNodeStore segmentNodeStore = SegmentNodeStoreBuilders.builder(rwFileStore).build(); + NodeBuilder builder = segmentNodeStore.getRoot().builder(); + + + // simulate operation timeout when trying to renew lease + Mockito.reset(blobMocked); + + StorageException storageException = + new StorageException(StorageErrorCodeStrings.OPERATION_TIMED_OUT, "operation timeout", new TimeoutException()); + + Mockito.doThrow(storageException).when(blobMocked).renewLease(Mockito.any()); + + + // wait till lease expires + Thread.sleep(70000); + + // try updating repository + Thread thread = new Thread(() -> { + try { + builder.setProperty("foo", "bar"); + segmentNodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + rwFileStore.flush(); + } catch (Exception e) { + fail("No Exception expected, but got: " + e.getMessage()); + } + }); + thread.start(); + + Thread.sleep(2000); + + // It should be possible to start another RW file store. + FileStore rwFileStore2 = FileStoreBuilder.fileStoreBuilder(folder.newFolder()).withCustomPersistence(new AzurePersistence(oakDirectory)).build(); + SegmentNodeStore segmentNodeStore2 = SegmentNodeStoreBuilders.builder(rwFileStore2).build(); + NodeBuilder builder2 = segmentNodeStore2.getRoot().builder(); + + //repository hasn't been updated + assertNull(builder2.getProperty("foo")); + + rwFileStore2.close(); + } + private PersistentCache createPersistenceCache() { return new AbstractPersistentCache() { @Override diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java index 14433250ba..78c1da1eae 100644 --- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java +++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java @@ -23,6 +23,7 @@ import com.microsoft.azure.storage.blob.ListBlobItem; import java.util.stream.IntStream; import org.apache.commons.lang3.time.StopWatch; import org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage.AzuriteDockerRule; +import org.apache.jackrabbit.oak.segment.remote.WriteAccessController; import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader; import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter; import org.jetbrains.annotations.NotNull; @@ -54,7 +55,7 @@ public class AzureJournalFileTest { @Before public void setup() throws StorageException, InvalidKeyException, URISyntaxException { container = azurite.getContainer("oak-test"); - journal = new AzureJournalFile(container.getDirectoryReference("journal"), "journal.log", 50); + journal = new AzureJournalFile(container.getDirectoryReference("journal"), "journal.log", 50, new WriteAccessController()); } @Test diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLockTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLockTest.java index 4a5676b6b5..03119ab839 100644 --- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLockTest.java +++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLockTest.java @@ -24,10 +24,10 @@ import com.microsoft.azure.storage.blob.CloudBlobContainer; import com.microsoft.azure.storage.blob.CloudBlockBlob; import org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage.AzuriteDockerRule; +import org.apache.jackrabbit.oak.segment.remote.WriteAccessController; import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock; import org.junit.Before; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.mockito.Mockito; import org.slf4j.Logger; @@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URISyntaxException; -import java.rmi.server.ExportException; import java.security.InvalidKeyException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeoutException; @@ -59,9 +58,9 @@ public class AzureRepositoryLockTest { @Test public void testFailingLock() throws URISyntaxException, IOException, StorageException { CloudBlockBlob blob = container.getBlockBlobReference("oak/repo.lock"); - new AzureRepositoryLock(blob, () -> {}, 0).lock(); + new AzureRepositoryLock(blob, () -> {}, 0, new WriteAccessController()).lock(); try { - new AzureRepositoryLock(blob, () -> {}, 0).lock(); + new AzureRepositoryLock(blob, () -> {}, 0, new WriteAccessController()).lock(); fail("The second lock should fail."); } catch (IOException e) { // it's fine @@ -74,7 +73,7 @@ public class AzureRepositoryLockTest { Semaphore s = new Semaphore(0); new Thread(() -> { try { - RepositoryLock lock = new AzureRepositoryLock(blob, () -> {}, 0).lock(); + RepositoryLock lock = new AzureRepositoryLock(blob, () -> {}, 0, new WriteAccessController()).lock(); s.release(); Thread.sleep(1000); lock.unlock(); @@ -84,7 +83,7 @@ public class AzureRepositoryLockTest { }).start(); s.acquire(); - new AzureRepositoryLock(blob, () -> {}, 10).lock(); + new AzureRepositoryLock(blob, () -> {}, 10, new WriteAccessController()).lock(); } @Test @@ -101,7 +100,7 @@ public class AzureRepositoryLockTest { .doCallRealMethod() .when(blobMocked).renewLease(Mockito.any()); - new AzureRepositoryLock(blobMocked, () -> {}, 0).lock(); + new AzureRepositoryLock(blobMocked, () -> {}, 0, new WriteAccessController()).lock(); // wait till lease expires Thread.sleep(70000); @@ -110,7 +109,7 @@ public class AzureRepositoryLockTest { Mockito.doCallRealMethod().when(blobMocked).renewLease(Mockito.any()); try { - new AzureRepositoryLock(blobMocked, () -> {}, 0).lock(); + new AzureRepositoryLock(blobMocked, () -> {}, 0, new WriteAccessController()).lock(); fail("The second lock should fail."); } catch (IOException e) { // it's fine diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java index 544d168ff4..dcd59155ba 100644 --- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java +++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java @@ -24,6 +24,7 @@ import org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage.AzuriteDockerRule; import org.apache.jackrabbit.oak.segment.file.JournalReader; import org.apache.jackrabbit.oak.segment.file.JournalReaderTest; import org.apache.jackrabbit.oak.segment.azure.AzureJournalFile; +import org.apache.jackrabbit.oak.segment.remote.WriteAccessController; import org.junit.Before; import org.junit.ClassRule; @@ -48,7 +49,7 @@ public class AzureJournalReaderTest extends JournalReaderTest { CloudAppendBlob blob = container.getAppendBlobReference("journal/journal.log.001"); blob.createOrReplace(); blob.appendText(s); - return new JournalReader(new AzureJournalFile(container.getDirectoryReference("journal"), "journal.log")); + return new JournalReader(new AzureJournalFile(container.getDirectoryReference("journal"), "journal.log", new WriteAccessController())); } catch (StorageException | URISyntaxException e) { throw new IOException(e); } diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveWriter.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveWriter.java index a681a49b12..a0b158e5be 100644 --- a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveWriter.java +++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveWriter.java @@ -46,6 +46,8 @@ public abstract class AbstractRemoteSegmentArchiveWriter implements SegmentArchi protected volatile boolean created = false; + protected WriteAccessController writeAccessController = null; + public AbstractRemoteSegmentArchiveWriter(IOMonitor ioMonitor, FileStoreMonitor monitor) { this.ioMonitor = ioMonitor; this.monitor = monitor; diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/WriteAccessController.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/WriteAccessController.java new file mode 100644 index 0000000000..c3ae0c3349 --- /dev/null +++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/WriteAccessController.java @@ -0,0 +1,61 @@ +package org.apache.jackrabbit.oak.segment.remote; + +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class WriteAccessController { + private boolean isWritingAllowed = false; + private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + public void disableWriting() { + try { + + + lock.writeLock().lock(); + try { + this.isWritingAllowed = false; + } finally { + lock.writeLock().unlock(); + } + + } catch (Throwable e) { + e.printStackTrace(); + } + } + + public void enableWriting() { + try { + lock.writeLock().lock(); + try { + this.isWritingAllowed = true; + synchronized (this) { + this.notifyAll(); + } + } finally { + lock.writeLock().unlock(); + } + + } catch (Throwable e) { + e.printStackTrace(); + } + } + + public void checkWritingAllowed() { + lock.readLock().lock(); + try { + while (!isWritingAllowed) { + synchronized (this) { + try { + this.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting for writing to be allowed", e); + } + } + } + } finally { + lock.readLock().unlock(); + } + } +}
