This is an automated email from the ASF dual-hosted git repository.
miroslav pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new e4a2b39160 OAK-12039 Treat transient client-side exceptions as
recoverable in AzureRepositoryLock (#2663)
e4a2b39160 is described below
commit e4a2b3916035a13adb0f63d686f3fd3ebaf47ef9
Author: Miroslav Smiljanic <[email protected]>
AuthorDate: Fri Dec 19 11:52:00 2025 +0100
OAK-12039 Treat transient client-side exceptions as recoverable in
AzureRepositoryLock (#2663)
* OAK-12039 Treat transient client-side exceptions as recoverable in
AzureRepositoryLock
* OAK-12039 update version of reactor-core
* OAK-12039 introduced oak.segment.azure.lock.leaseRenewalTimeoutInMs sys
property
* OAK-12039 change the test to cause timeout exception instead of mocking it
* OAK-12039 retry on IllegalStateException
---------
Co-authored-by: smiroslav <[email protected]>
---
.../oak/segment/azure/AzureRepositoryLock.java | 29 +++-
.../oak/segment/azure/AzureRepositoryLockTest.java | 153 ++++++++++++++++++++-
.../oak/segment/azure/AzuriteDockerRule.java | 37 +++++
3 files changed, 217 insertions(+), 2 deletions(-)
diff --git
a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java
b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java
index a4d38ac3fc..a34193f742 100644
---
a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java
+++
b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java
@@ -36,7 +36,9 @@ public class AzureRepositoryLock implements RepositoryLock {
private static final Logger log =
LoggerFactory.getLogger(AzureRepositoryLock.class);
private static final int TIMEOUT_SEC =
Integer.getInteger("oak.segment.azure.lock.timeout", 0);
- private static final Integer LEASE_RENEWAL_TIMEOUT_MS = 5000;
+
+ public static final String LEASE_RENEWAL_TIMEOUT_PROP =
"oak.segment.azure.lock.leaseRenewalTimeoutInMs";
+ private static final int LEASE_RENEWAL_TIMEOUT_MS =
Integer.getInteger(LEASE_RENEWAL_TIMEOUT_PROP, 5000);
public static final String LEASE_DURATION_PROP =
"oak.segment.azure.lock.leaseDurationInSec";
private final int leaseDuration = Integer.getInteger(LEASE_DURATION_PROP,
60);
@@ -160,6 +162,8 @@ public class AzureRepositoryLock implements RepositoryLock {
} else {
log.warn("Could not renew lease due to storage
exception. Retry in progress ... ", e);
}
+ } else if (isTransientClientSideException(e)) {
+ log.warn("Could not renew the lease due to transient
client-side error. Retry in progress ...", e);
} else {
log.error("Can't renew the lease", e);
shutdownHook.run();
@@ -211,6 +215,29 @@ public class AzureRepositoryLock implements RepositoryLock
{
return inError;
}
+    /**
+     * Checks whether the given exception, or any throwable in its cause chain, is a
+     * transient client-side failure for which the lease renewal should be retried.
+     * This includes timeouts and IO/network errors that can occur when communicating
+     * with Azure.
+     * <p>
+     * Per Azure SDK documentation, the timeout parameter causes a RuntimeException to
+     * be raised. Reactor-core throws IllegalStateException with message
+     * "Timeout on blocking read" when the timeout expires (see
+     * BlockingSingleSubscriber.blockingGet in reactor-core).
+     * <p>
+     * The following types are treated as transient wherever they appear in the chain:
+     * <ul>
+     *   <li>{@link java.util.concurrent.TimeoutException}</li>
+     *   <li>{@link java.io.IOException}</li>
+     *   <li>{@link IllegalStateException} - the message is not inspected, so every
+     *       IllegalStateException is considered transient, not only the reactor
+     *       timeout one</li>
+     * </ul>
+     * Already visited throwables are remembered, so a cyclic cause chain terminates
+     * instead of looping forever.
+     *
+     * @param e the exception to check, may be {@code null}
+     * @return true if this is a transient exception that should be retried,
+     *         false otherwise (including for {@code null})
+     */
+    private static boolean isTransientClientSideException(Exception e) {
+        // Identity-based set: two distinct throwables must never be mistaken for each other,
+        // whatever equals()/hashCode() a custom exception type may define.
+        java.util.Set<Throwable> visited =
+                java.util.Collections.newSetFromMap(new java.util.IdentityHashMap<>());
+        // visited.add() returns false once a throwable is seen a second time, which ends
+        // the walk on a cyclic cause chain.
+        for (Throwable current = e; current != null && visited.add(current); current = current.getCause()) {
+            if (current instanceof java.util.concurrent.TimeoutException
+                    || current instanceof java.io.IOException
+                    || current instanceof IllegalStateException) {
+                return true;
+            }
+        }
+        return false;
+    }
+
private void waitABit(long millis) {
try {
Thread.sleep(millis);
diff --git
a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLockTest.java
b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLockTest.java
index 33716b17a9..752933ee21 100644
---
a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLockTest.java
+++
b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLockTest.java
@@ -20,7 +20,11 @@ package org.apache.jackrabbit.oak.segment.azure;
import com.azure.core.http.HttpHeaderName;
import com.azure.core.http.HttpHeaders;
+import com.azure.core.http.HttpPipelineCallContext;
+import com.azure.core.http.HttpPipelineNextPolicy;
+import com.azure.core.http.HttpResponse;
import com.azure.core.http.RequestConditions;
+import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobStorageException;
@@ -37,12 +41,17 @@ import
org.junit.contrib.java.lang.system.ProvideSystemProperty;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
+import java.time.Duration;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
@@ -53,6 +62,7 @@ public class AzureRepositoryLockTest {
public static final String LEASE_DURATION = "15";
public static final String RENEWAL_INTERVAL = "3";
public static final String TIME_TO_WAIT_BEFORE_BLOCK = "9";
+ public static final String LEASE_RENEWAL_TIMEOUT = "1000"; // 1 second for
faster tests
@ClassRule
public static AzuriteDockerRule azurite = new AzuriteDockerRule();
@@ -69,7 +79,8 @@ public class AzureRepositoryLockTest {
@Rule
public final ProvideSystemProperty systemPropertyRule = new
ProvideSystemProperty(AzureRepositoryLock.LEASE_DURATION_PROP, LEASE_DURATION)
.and(AzureRepositoryLock.RENEWAL_INTERVAL_PROP, RENEWAL_INTERVAL)
- .and(AzureRepositoryLock.TIME_TO_WAIT_BEFORE_WRITE_BLOCK_PROP,
TIME_TO_WAIT_BEFORE_BLOCK);
+ .and(AzureRepositoryLock.TIME_TO_WAIT_BEFORE_WRITE_BLOCK_PROP,
TIME_TO_WAIT_BEFORE_BLOCK)
+ .and(AzureRepositoryLock.LEASE_RENEWAL_TIMEOUT_PROP,
LEASE_RENEWAL_TIMEOUT);
@Test
public void testFailingLock() throws IOException, BlobStorageException {
@@ -204,4 +215,144 @@ public class AzureRepositoryLockTest {
Mockito.doCallRealMethod().when(blobLeaseMocked).renewLeaseWithResponse((RequestConditions)
any(), any(), any());
}
+
+    /**
+     * Verifies that lease renewals which are delayed beyond the client-side timeout are
+     * retried and do not trigger the shutdown hook.
+     * <p>
+     * NOTE(review): the renewal timeout is read into a static constant of
+     * AzureRepositoryLock via Integer.getInteger, so the 1s value presumably only takes
+     * effect if that class is initialized while the system property is set - confirm.
+     */
+    @Test
+    public void testClientSideTimeoutExceptionIsRecoverable() throws Exception {
+        // Create a delay policy that delays lease renewal requests for 2 seconds (> 1s timeout set via system property)
+        // This will cause real IllegalStateException with TimeoutException from Reactor's Mono.block()
+        DelayInjectionPolicy delayPolicy = new DelayInjectionPolicy(
+                context -> {
+                    String url = context.getHttpRequest().getUrl().toString();
+                    String method = context.getHttpRequest().getHttpMethod().toString();
+                    // Only PUT requests to repo.lock with comp=lease (lease operations) match.
+                    // The predicate itself does not skip the initial lease acquisition; that
+                    // request is left alone because the policy is only enabled after lock().
+                    return "PUT".equals(method) && url.contains("repo.lock") && url.contains("comp=lease");
+                },
+                Duration.ofSeconds(2), // Delay 2 seconds (timeout is 1 second via system property)
+                2                      // Delay first 2 renewal requests, then succeed
+        );
+
+        // Create container client with the delay policy injected
+        BlobContainerClient containerWithDelay = azurite.getContainerClientWithPolicies("oak-test", null, delayPolicy);
+        containerWithDelay.createIfNotExists();
+
+        BlockBlobClient blockBlobClient = containerWithDelay.getBlobClient("oak/repo.lock").getBlockBlobClient();
+        BlobLeaseClient blobLeaseClient = Mockito.spy(new BlobLeaseClientBuilder().blobClient(blockBlobClient).buildClient());
+
+        // Track if shutdown hook was called
+        AtomicBoolean shutdownCalled = new AtomicBoolean(false);
+        Runnable shutdownHook = () -> shutdownCalled.set(true);
+
+        WriteAccessController writeAccessController = new WriteAccessController();
+
+        AzureRepositoryLock lock = new AzureRepositoryLock(blockBlobClient, blobLeaseClient, shutdownHook, writeAccessController);
+        lock.lock();
+        try {
+            // Enable delay injection after initial lease acquisition
+            delayPolicy.setEnabled(true);
+
+            // Wait for at least 3 renewal calls (2 timeouts + 1 success) with a timeout
+            Mockito.verify(blobLeaseClient, Mockito.timeout(15000).atLeast(3))
+                    .renewLeaseWithResponse((RequestConditions) any(), any(), any());
+
+            assertTrue("Should have delayed at least 2 requests, but delayed: " + delayPolicy.getDelayedRequestCount(),
+                    delayPolicy.getDelayedRequestCount() >= 2);
+
+            assertFalse("Shutdown hook should not be called for client-side timeout exceptions", shutdownCalled.get());
+        } finally {
+            // Always release the lock, even when a verification above fails, so that
+            // a failure here does not leave the lock held for subsequent tests.
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Verifies that an IO error raised by the lease renewal call (wrapped in an
+     * UncheckedIOException) is retried and does not trigger the shutdown hook.
+     */
+    @Test
+    public void testIOExceptionIsRecoverable() throws Exception {
+        BlockBlobClient blockBlobClient = readBlobContainerClient.getBlobClient("oak/repo.lock").getBlockBlobClient();
+        BlockBlobClient noRetryBlockBlobClient = noRetryBlobContainerClient.getBlobClient("oak/repo.lock").getBlockBlobClient();
+        BlobLeaseClient blobLeaseClient = new BlobLeaseClientBuilder().blobClient(noRetryBlockBlobClient).buildClient();
+
+        BlockBlobClient blobMocked = Mockito.spy(blockBlobClient);
+        BlobLeaseClient blobLeaseMocked = Mockito.spy(blobLeaseClient);
+
+        // Simulate network error wrapped in UncheckedIOException (as reactor does with IOException)
+        java.io.UncheckedIOException networkError = new java.io.UncheckedIOException(
+                "Connection reset",
+                new java.io.IOException("Connection reset by peer"));
+
+        // Track if shutdown hook was called
+        AtomicBoolean shutdownCalled = new AtomicBoolean(false);
+        Runnable shutdownHook = () -> shutdownCalled.set(true);
+
+        // Instrument the mock to throw the IO exception twice, then succeed
+        Mockito.doThrow(networkError)
+                .doThrow(networkError)
+                .doCallRealMethod()
+                .when(blobLeaseMocked).renewLeaseWithResponse((RequestConditions) any(), any(), any());
+
+        WriteAccessController writeAccessController = new WriteAccessController();
+
+        AzureRepositoryLock lock = new AzureRepositoryLock(blobMocked, blobLeaseMocked, shutdownHook, writeAccessController);
+        lock.lock();
+        try {
+            // Wait for at least 3 calls (2 failures + 1 success) with a timeout
+            Mockito.verify(blobLeaseMocked, Mockito.timeout(10000).atLeast(3))
+                    .renewLeaseWithResponse((RequestConditions) any(), any(), any());
+
+            // Verify that shutdown hook was NOT called - the IO exception should be treated as recoverable
+            assertFalse("Shutdown hook should not be called for IO exceptions", shutdownCalled.get());
+        } finally {
+            // Clean up even when a verification above fails, so that a failure here
+            // does not leave the lock held for subsequent tests.
+            lock.unlock();
+        }
+    }
+
+ /**
+ * HTTP pipeline policy that injects delays into specific requests.
+ * Used to cause real client-side timeouts in tests.
+ */
+ private static class DelayInjectionPolicy implements HttpPipelinePolicy {
+
+ private final Predicate<HttpPipelineCallContext> shouldDelay;
+ private final Duration delay;
+ private final AtomicInteger delayCount;
+ private final int maxDelays;
+ private final AtomicBoolean enabled;
+
+ /**
+ * Creates a delay injection policy.
+ *
+ * @param shouldDelay Predicate to determine if this request should be
delayed
+ * @param delay How long to delay (should exceed the client
timeout)
+ * @param maxDelays How many requests to delay before returning to
normal
+ */
+ DelayInjectionPolicy(Predicate<HttpPipelineCallContext> shouldDelay,
+ Duration delay,
+ int maxDelays) {
+ this.shouldDelay = shouldDelay;
+ this.delay = delay;
+ this.maxDelays = maxDelays;
+ this.delayCount = new AtomicInteger(0);
+ this.enabled = new AtomicBoolean(false);
+ }
+
+ @Override
+ public Mono<HttpResponse> process(HttpPipelineCallContext context,
HttpPipelineNextPolicy next) {
+ // Check if delay injection is enabled and this request should be
delayed
+ if (enabled.get() && shouldDelay.test(context) && delayCount.get()
< maxDelays) {
+ delayCount.incrementAndGet();
+ log.info("Injecting {}ms delay for request: {} {}",
+ delay.toMillis(),
+ context.getHttpRequest().getHttpMethod(),
+ context.getHttpRequest().getUrl());
+ // Delay BEFORE calling next.process() - this will cause the
client timeout
+ return Mono.delay(delay).then(next.process());
+ }
+ return next.process();
+ }
+
+ int getDelayedRequestCount() {
+ return delayCount.get();
+ }
+
+ void setEnabled(boolean enabled) {
+ this.enabled.set(enabled);
+ }
+ }
}
diff --git
a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzuriteDockerRule.java
b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzuriteDockerRule.java
index 187e85afb9..409442c82b 100644
---
a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzuriteDockerRule.java
+++
b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzuriteDockerRule.java
@@ -16,6 +16,7 @@
*/
package org.apache.jackrabbit.oak.segment.azure;
+import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
@@ -140,4 +141,40 @@ public class AzuriteDockerRule extends ExternalResource {
public int getMappedPort() {
return azuriteContainer.getMappedPort(10000);
}
+
+    /**
+     * Builds a BlobContainerClient whose HTTP pipeline contains the given custom
+     * policies. Handy for tests that need to inject delays or errors into requests.
+     * <p>
+     * The custom policies are registered first, in the order given, followed by the
+     * request logging policy. Retry options are only applied when they are provided.
+     *
+     * @param containerName the container name
+     * @param retryOptions  retry options (can be null)
+     * @param policies      additional HTTP pipeline policies to add
+     * @return the configured BlobContainerClient
+     */
+    public BlobContainerClient getContainerClientWithPolicies(String containerName,
+                                                              RequestRetryOptions retryOptions,
+                                                              HttpPipelinePolicy... policies) {
+        final String connectionString = String.join(";",
+                "DefaultEndpointsProtocol=http",
+                "AccountName=" + ACCOUNT_NAME,
+                "AccountKey=" + ACCOUNT_KEY,
+                "BlobEndpoint=" + getBlobEndpoint());
+
+        final BlobServiceClientBuilder clientBuilder = new BlobServiceClientBuilder();
+        clientBuilder.endpoint(getBlobEndpoint());
+        clientBuilder.connectionString(connectionString);
+
+        // Custom policies go in ahead of the logging policy
+        for (int i = 0; i < policies.length; i++) {
+            clientBuilder.addPolicy(policies[i]);
+        }
+        clientBuilder.addPolicy(new AzureHttpRequestLoggingTestingPolicy());
+
+        // Retry options are optional; leave the builder untouched when none are given
+        if (retryOptions != null) {
+            clientBuilder.retryOptions(retryOptions);
+        }
+
+        return clientBuilder.buildClient().getBlobContainerClient(containerName);
+    }
}