Author: tomekr
Date: Wed May 2 10:17:40 2018
New Revision: 1830740
URL: http://svn.apache.org/viewvc?rev=1830740&view=rev
Log:
OAK-7465: It should be possible for an Azure Segment Store to wait until the
lease if released
Added:
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLockTest.java
Modified:
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java
Modified:
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java?rev=1830740&r1=1830739&r2=1830740&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java
(original)
+++
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java
Wed May 2 10:17:40 2018
@@ -89,12 +89,9 @@ public class AzurePersistence implements
@Override
public RepositoryLock lockRepository() throws IOException {
- return new AzureRepositoryLock(getBlockBlob("repo.lock"), new
Runnable() {
- @Override
- public void run() {
- log.warn("Lost connection to the Azure. The client will be
closed.");
- // TODO close the connection
- }
+ return new AzureRepositoryLock(getBlockBlob("repo.lock"), () -> {
+ log.warn("Lost connection to the Azure. The client will be
closed.");
+ // TODO close the connection
}).lock();
}
Modified:
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java?rev=1830740&r1=1830739&r2=1830740&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java
(original)
+++
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java
Wed May 2 10:17:40 2018
@@ -32,6 +32,8 @@ public class AzureRepositoryLock impleme
private static final Logger log =
LoggerFactory.getLogger(AzureRepositoryLock.class);
+ private static final int TIMEOUT_SEC =
Integer.getInteger("oak.segment.azure.lock.timeout", 0);
+
private static int INTERVAL = 60;
private final Runnable shutdownHook;
@@ -40,26 +42,54 @@ public class AzureRepositoryLock impleme
private final ExecutorService executor;
+ private final int timeoutSec;
+
private String leaseId;
private volatile boolean doUpdate;
public AzureRepositoryLock(CloudBlockBlob blob, Runnable shutdownHook) {
+ this(blob, shutdownHook, TIMEOUT_SEC);
+ }
+
+ public AzureRepositoryLock(CloudBlockBlob blob, Runnable shutdownHook, int
timeoutSec) {
this.shutdownHook = shutdownHook;
this.blob = blob;
this.executor = Executors.newSingleThreadExecutor();
+ this.timeoutSec = timeoutSec;
}
public AzureRepositoryLock lock() throws IOException {
- try {
- blob.openOutputStream().close();
- leaseId = blob.acquireLease(INTERVAL, null);
- log.info("Acquired lease {}", leaseId);
- } catch (StorageException e) {
- throw new IOException(e);
+ long start = System.currentTimeMillis();
+ Exception ex = null;
+ do {
+ try {
+ blob.openOutputStream().close();
+ leaseId = blob.acquireLease(INTERVAL, null);
+ log.info("Acquired lease {}", leaseId);
+ } catch (StorageException | IOException e) {
+ if (ex == null) {
+ log.info("Can't acquire the lease. Retrying every 1s.
Timeout is set to {}s.", timeoutSec);
+ }
+ ex = e;
+ if ((System.currentTimeMillis() - start) / 1000 < timeoutSec) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e1) {
+ throw new IOException(e1);
+ }
+ } else {
+ break;
+ }
+ }
+ } while (leaseId == null);
+ if (leaseId == null) {
+ log.error("Can't acquire the lease in {}s.", timeoutSec);
+ throw new IOException(ex);
+ } else {
+ executor.submit(this::refreshLease);
+ return this;
}
- executor.submit(this::refreshLease);
- return this;
}
private void refreshLease() {
@@ -104,6 +134,7 @@ public class AzureRepositoryLock impleme
blob.releaseLease(AccessCondition.generateLeaseCondition(leaseId));
blob.delete();
log.info("Released lease {}", leaseId);
+ leaseId = null;
} catch (StorageException e) {
throw new IOException(e);
}
Added:
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLockTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLockTest.java?rev=1830740&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLockTest.java
(added)
+++
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLockTest.java
Wed May 2 10:17:40 2018
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.rmi.server.ExportException;
+import java.security.InvalidKeyException;
+import java.util.concurrent.Semaphore;
+
+import static org.junit.Assert.fail;
+
+public class AzureRepositoryLockTest {
+
+ private static final Logger log =
LoggerFactory.getLogger(AzureRepositoryLockTest.class);
+
+ @ClassRule
+ public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+ private CloudBlobContainer container;
+
+ @Before
+ public void setup() throws StorageException, InvalidKeyException,
URISyntaxException {
+ container = azurite.getContainer("oak-test");
+ }
+
+ @Test
+ public void testFailingLock() throws URISyntaxException, IOException,
StorageException {
+ CloudBlockBlob blob = container.getBlockBlobReference("oak/repo.lock");
+ new AzureRepositoryLock(blob, () -> {}, 0).lock();
+ try {
+ new AzureRepositoryLock(blob, () -> {}, 0).lock();
+ fail("The second lock should fail.");
+ } catch (IOException e) {
+ // it's fine
+ }
+ }
+
+ @Test
+ public void testWaitingLock() throws URISyntaxException, IOException,
StorageException, InterruptedException {
+ CloudBlockBlob blob = container.getBlockBlobReference("oak/repo.lock");
+ Semaphore s = new Semaphore(0);
+ new Thread(() -> {
+ try {
+ RepositoryLock lock = new AzureRepositoryLock(blob, () -> {},
0).lock();
+ s.release();
+ Thread.sleep(1000);
+ lock.unlock();
+ } catch (Exception e) {
+ log.error("Can't lock or unlock the repo", e);
+ }
+ }).start();
+
+ s.acquire();
+ new AzureRepositoryLock(blob, () -> {}, 10).lock();
+ }
+
+}