This is an automated email from the ASF dual-hosted git repository.

daim pushed a commit to branch 1.22
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/1.22 by this push:
     new 0e54141ef6 OAK-12009 : provided support for GCP for new AWS sdk 2.x 
(#2621) (#2673)
0e54141ef6 is described below

commit 0e54141ef69e26c635fa4f52c30c457a5a924ffe
Author: Rishabh Kumar <[email protected]>
AuthorDate: Mon Jan 5 13:22:38 2026 +0530

    OAK-12009 : provided support for GCP for new AWS sdk 2.x (#2621) (#2673)
    
    * OAK-12009 : provided support for GCP for new AWS sdk 2.x
    
    * OAK-12009 : incorporated review comments
    
    * OAK-12009 : avoid reading files into memory
    
    * OAK-12009 : extracted out getting length from InputStream to a separate 
method
---
 .../jackrabbit/oak/blob/cloud/s3/S3Backend.java    |  58 ++++++++-
 .../apache/jackrabbit/oak/blob/cloud/s3/Utils.java | 130 ++++++++++++++++++---
 .../jackrabbit/oak/blob/cloud/s3/UtilsTest.java    |  17 ++-
 3 files changed, 182 insertions(+), 23 deletions(-)

diff --git 
a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java
 
b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java
index 8d60832d43..75ef22df76 100644
--- 
a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java
+++ 
b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java
@@ -19,8 +19,11 @@ package org.apache.jackrabbit.oak.blob.cloud.s3;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
@@ -369,7 +372,7 @@ public class S3Backend extends AbstractSharedBackend {
                             uploadReq.source(file).
                                     putObjectRequest(
                                             s3ReqDecorator.decorate(
-                                                    
PutObjectRequest.builder().bucket(bucket).key(key)
+                                                    
PutObjectRequest.builder().bucket(bucket).key(key).contentLength(file.length())
                                                             .build()))
                                     .build());
 
@@ -520,12 +523,17 @@ public class S3Backend extends AbstractSharedBackend {
         final ExecutorService executor = Executors.newSingleThreadExecutor();
         try {
             
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+            final PutObjectRequest.Builder builder = PutObjectRequest.builder()
+                    .bucket(bucket)
+                    .contentType("application/octet-stream")
+                    .key(addMetaKeyPrefix(name));
+
             // Specify `null` for the content length when you don't know the 
content length.
-            final AsyncRequestBody body = 
AsyncRequestBody.fromInputStream(input, null, executor);
+            final AsyncRequestBody body = getRequestBody(input, executor, 
builder);
             final Upload upload = tmx.upload(uploadReq ->
                     uploadReq.requestBody(body).
                             putObjectRequest(
-                                    
s3ReqDecorator.decorate(PutObjectRequest.builder().bucket(bucket).key(addMetaKeyPrefix(name)).build()))
+                                    s3ReqDecorator.decorate(builder.build()))
                             .build());
             upload.completionFuture().join();
         } catch (Exception e) {
@@ -560,7 +568,7 @@ public class S3Backend extends AbstractSharedBackend {
                     uploadReq.source(input).
                             putObjectRequest(
                                     s3ReqDecorator.decorate(
-                                            
PutObjectRequest.builder().bucket(bucket).key(addMetaKeyPrefix(name)).build()))
+                                            
PutObjectRequest.builder().bucket(bucket).contentLength(input.length()).key(addMetaKeyPrefix(name)).build()))
                             .build());
 
             upload.completionFuture().join();
@@ -1330,6 +1338,48 @@ public class S3Backend extends AbstractSharedBackend {
         return key.substring(0, 4) + key.substring(5);
     }
 
+    @NotNull
+    private AsyncRequestBody getRequestBody(final InputStream input, final 
ExecutorService executor,
+                                            final PutObjectRequest.Builder 
builder) throws IOException {
+        final AsyncRequestBody body;
+        if (Objects.equals(RemoteStorageMode.S3, 
properties.get(S3Constants.MODE))) {
+            body = AsyncRequestBody.fromInputStream(input, null, executor);
+        } else {
+            // for GCP we need to know the length in advance, else it won't 
work.
+            final long length;
+            if (input instanceof FileInputStream) {
+                final FileInputStream fis = (FileInputStream) input;
+                // if the file is modified after opening, the size may not 
reflect the latest changes
+                length = fis.getChannel().size();
+                body = AsyncRequestBody.fromInputStream(input, length, 
executor);
+            } else if (input instanceof ByteArrayInputStream) {
+                length = input.available();
+                body = AsyncRequestBody.fromInputStream(input, length, 
executor);
+            } else if (input.markSupported()) {
+                // in case the inputStream supports mark & reset
+                input.mark(Integer.MAX_VALUE);
+                length = IOUtils.consume(input);
+                input.reset();
+                body = AsyncRequestBody.fromInputStream(input, length, 
executor);
+            } else {
+                // we have to read all the stream to get the actual length
+                // last else block: store to temp file and re-read
+                final File tempFile = File.createTempFile("inputstream-", 
".tmp");
+                tempFile.deleteOnExit(); // Clean up after JVM exits
+
+                try (OutputStream out = new FileOutputStream(tempFile)) {
+                    IOUtils.copy(input, out); // Copy all bytes to file
+                }
+                // Get length from file
+                length = tempFile.length();
+                // Re-create InputStream from temp file
+                body = AsyncRequestBody.fromInputStream(new 
FileInputStream(tempFile), length, executor);
+            }
+            builder.contentLength(length);
+        }
+        return body;
+    }
+
     /**
      * The class renames object key in S3 in a thread.
      */
diff --git 
a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/Utils.java
 
b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/Utils.java
index 9f5a51a15d..6a60793340 100644
--- 
a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/Utils.java
+++ 
b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/Utils.java
@@ -31,6 +31,7 @@ import java.util.regex.Pattern;
 
 import org.apache.jackrabbit.oak.blob.cloud.s3.S3Backend.RemoteStorageMode;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +41,8 @@ import 
software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 import software.amazon.awssdk.auth.signer.AwsS3V4Signer;
+import software.amazon.awssdk.core.checksums.RequestChecksumCalculation;
+import software.amazon.awssdk.core.checksums.ResponseChecksumValidation;
 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
 import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
 import software.amazon.awssdk.core.exception.SdkClientException;
@@ -48,6 +51,7 @@ import software.amazon.awssdk.http.apache.ApacheHttpClient;
 import software.amazon.awssdk.http.apache.ProxyConfiguration;
 import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
 import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
@@ -55,6 +59,7 @@ import 
software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
 import software.amazon.awssdk.services.s3.S3BaseClientBuilder;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.S3Configuration;
 import software.amazon.awssdk.services.s3.presigner.S3Presigner;
 import software.amazon.awssdk.utils.BinaryUtils;
 import software.amazon.awssdk.utils.Md5Utils;
@@ -130,11 +135,21 @@ public final class Utils {
      */
     public static S3AsyncClient openAsyncService(final Properties prop) {
         S3AsyncClientBuilder builder = S3AsyncClient.builder();
+        boolean isS3 = Objects.equals(RemoteStorageMode.S3, 
prop.get(S3Constants.MODE));
 
         configureBuilder(builder, prop, false);
         // async http client
         builder.httpClient(getSdkAsyncHttpClient(prop));
-        builder.multipartEnabled(true);
+
+        // AWS-specific optimizations
+        if (isS3) {
+            builder.multipartEnabled(true);
+            builder.multipartConfiguration(c -> c
+                    .minimumPartSizeInBytes(5L * 1024 * 1024)  // 5MB minimum
+                    .thresholdInBytes(10L * 1024 * 1024));      // 10MB 
threshold
+        } else {
+            builder.multipartEnabled(false);  // GCP doesn't support S3 
multipart
+        }
 
         return builder.build();
     }
@@ -151,9 +166,14 @@ public final class Utils {
      * @return a configured {@link S3Presigner} instance
      */
     public static S3Presigner createPresigner(final S3Client s3Client, final 
Properties props) {
-        return S3Presigner.builder().s3Client(s3Client).
-                credentialsProvider(Utils.getAwsCredentials(props))
+        final boolean isGCP = Objects.equals(RemoteStorageMode.GCP, 
props.get(S3Constants.MODE));
+        return S3Presigner.builder().s3Client(s3Client)
+                .credentialsProvider(Utils.getAwsCredentials(props))
                 .region(Region.of(Utils.getRegion(props)))
+                .serviceConfiguration(S3Configuration.builder()
+                        .pathStyleAccessEnabled(isGCP)
+                        .chunkedEncodingEnabled(!isGCP)
+                        .build())
                 .build();
     }
 
@@ -257,7 +277,7 @@ public final class Utils {
         String region = null;
 
         if (Objects.nonNull(prop.getProperty(S3Constants.S3_END_POINT))) {
-            region = 
getRegionFromEndpoint(prop.getProperty(S3Constants.S3_END_POINT));
+            region = 
getRegionFromEndpoint(prop.getProperty(S3Constants.S3_END_POINT), 
prop.getProperty(S3Constants.S3_CONN_PROTOCOL));
         }
 
         if (Objects.nonNull(region)) {
@@ -283,6 +303,7 @@ public final class Utils {
      * <ul>
      *   <li>https://s3.eu-west-1.amazonaws.com</li>
      *   <li>https://bucket.s3.eu-west-1.amazonaws.com</li>
+     *   <li>s3.eu-west-1.amazonaws.com</li>
      *   <li>https://s3.amazonaws.com (returns us-east-1)</li>
      * </ul>
      * If the region cannot be determined, returns null.
@@ -290,9 +311,9 @@ public final class Utils {
      * @param endpoint the S3 endpoint URL as a string
      * @return the AWS region string, or null if not found
      */
-    static String getRegionFromEndpoint(final String endpoint) {
+    static String getRegionFromEndpoint(final String endpoint, String 
protocol) {
         try {
-            URI uri = URI.create(endpoint);
+            URI uri = getEndPointUri(endpoint, protocol);
             String host = uri.getHost();
 
             // Pattern for standard S3 endpoints: s3.region.amazonaws.com or 
bucket.s3.region.amazonaws.com
@@ -353,12 +374,27 @@ public final class Utils {
          */
 
         // Check if endpoint already contains protocol
+        return getEndPointUri(endPoint, 
prop.getProperty(S3Constants.S3_CONN_PROTOCOL));
+    }
+
+    /**
+     * Constructs a URI for the S3 endpoint using the provided endpoint string 
and protocol.
+     * <p>
+     * If the endpoint string already contains a protocol (`http://` or 
`https://`), it is used directly.
+     * Otherwise, the specified protocol (or "https" if null/empty) is 
prepended to the endpoint.
+     * </p>
+     *
+     * @param endPoint the S3 endpoint string (may or may not include protocol)
+     * @param protocol the protocol to use ("http" or "https"); defaults to 
"https" if null or empty
+     * @return the constructed {@link URI} for the S3 endpoint
+     */
+    @NotNull
+    private static URI getEndPointUri(final String endPoint, @Nullable String 
protocol) {
        if (endPoint.startsWith("http://") || endPoint.startsWith("https://")) 
{
             LOG.info("S3 service endpoint [{}] ", endPoint);
             return URI.create(endPoint);
         }
 
-        String protocol = prop.getProperty(S3Constants.S3_CONN_PROTOCOL);
         if (protocol == null || protocol.isEmpty()) {
             protocol = HTTPS; // default protocol
         }
@@ -373,16 +409,24 @@ public final class Utils {
     }
 
     private static ClientOverrideConfiguration 
getClientConfiguration(Properties prop) {
+        final boolean isS3 = Objects.equals(RemoteStorageMode.S3, 
prop.get(S3Constants.MODE));
+
         int maxErrorRetry = 
Integer.parseInt(prop.getProperty(S3Constants.S3_MAX_ERR_RETRY));
         int connectionTimeOut = 
Integer.parseInt(prop.getProperty(S3Constants.S3_CONN_TIMEOUT));
         String encryptionType = prop.getProperty(S3Constants.S3_ENCRYPTION);
 
+        // API timeout should be much longer than connection timeout for large 
file uploads
+        // Use at least 5 minutes, or 10x connection timeout, whichever is 
larger
+        int apiTimeout = Math.max(connectionTimeOut * 10, 300000); // At least 
5 minutes
+
         ClientOverrideConfiguration.Builder builder = 
ClientOverrideConfiguration.builder();
 
         builder.retryStrategy(b -> b.maxAttempts(maxErrorRetry));
-        builder.apiCallTimeout(Duration.ofMillis(connectionTimeOut));
+        builder.apiCallTimeout(Duration.ofMillis(apiTimeout)); // Long timeout 
for large uploads
+        builder.apiCallAttemptTimeout(Duration.ofMillis(connectionTimeOut)); 
// Per-attempt timeout
 
-        if (S3Constants.S3_ENCRYPTION_SSE_KMS.equals(encryptionType)) {
+        // Only use KMS signer for AWS S3, not for GCP
+        if (isS3 && S3Constants.S3_ENCRYPTION_SSE_KMS.equals(encryptionType)) {
             builder.putAdvancedOption(SdkAdvancedClientOption.SIGNER, 
AwsS3V4Signer.create());
         }
         return builder.build();
@@ -391,10 +435,28 @@ public final class Utils {
     private static SdkHttpClient getSdkHttpClient(Properties prop) {
         HttpClientConfig config = new HttpClientConfig(prop);
         final ApacheHttpClient.Builder builder = ApacheHttpClient.builder();
+        final boolean isGCP = Objects.equals(RemoteStorageMode.GCP, 
prop.get(S3Constants.MODE));
+
+        // Calculate connection lifecycle based on socket timeout (all in 
SECONDS)
+        long socketTimeoutSeconds = config.socketTimeout / 1000;
 
-        builder.connectionTimeout(Duration.ofMillis(config.connectionTimeout))
+        // Idle time: 2x socket timeout (min 30s, max 120s)
+        long idleTimeSeconds = Math.min(120, Math.max(30, socketTimeoutSeconds 
* 2));
+
+        // TTL: 5x socket timeout (min 60s, max 600s = 10min)
+        long ttlSeconds = Math.min(600, Math.max(60, socketTimeoutSeconds * 
5));
+
+        // GCP needs higher max connections (no HTTP/2)
+        int maxConnections = isGCP ? Math.max(100, config.maxConnections) : 
Math.max(50, config.maxConnections);
+
+        builder.connectionTimeout(Duration.ofMillis(config.connectionTimeout)) 
// Connection timeouts
                 .socketTimeout(Duration.ofMillis(config.socketTimeout))
-                .maxConnections(config.maxConnections);
+                .maxConnections(maxConnections) // Connection pool
+                .connectionMaxIdleTime(Duration.ofSeconds(idleTimeSeconds))
+                .connectionTimeToLive(Duration.ofSeconds(ttlSeconds))
+                .useIdleConnectionReaper(true)
+                .tcpKeepAlive(true) // TCP keepalive
+                .expectContinueEnabled(true); // Expect-continue handshake 
(reduces overhead for large uploads)
 
         if (config.proxyHost != null && !config.proxyHost.isEmpty() && 
config.proxyPort != null && !config.proxyPort.isEmpty()) {
             String protocol = "http".equalsIgnoreCase(config.protocol) ? 
"http" : config.protocol;
@@ -411,12 +473,38 @@ public final class Utils {
 
     private static SdkAsyncHttpClient getSdkAsyncHttpClient(Properties prop) {
         HttpClientConfig config = new HttpClientConfig(prop);
+        final boolean isGCP = Objects.equals(RemoteStorageMode.GCP, 
prop.get(S3Constants.MODE));
         final NettyNioAsyncHttpClient.Builder builder = 
NettyNioAsyncHttpClient.builder();
 
-        builder.connectionTimeout(Duration.ofMillis(config.connectionTimeout))
+        // Calculate connection lifecycle based on socket timeout (all in 
SECONDS)
+        long socketTimeoutSeconds = config.socketTimeout / 1000;
+
+        // Idle time: 2x socket timeout (min 30s, max 120s)
+        long idleTimeSeconds = Math.min(120, Math.max(30, socketTimeoutSeconds 
* 2));
+
+        // TTL: 5x socket timeout (min 60s, max 600s = 10min)
+        long ttlSeconds = Math.min(600, Math.max(60, socketTimeoutSeconds * 
5));
+
+        // GCP needs higher concurrency (no HTTP/2, so more connections needed)
+        final int concurrency = isGCP ? Math.max(100, config.maxConnections) : 
Math.max(50, config.maxConnections);
+
+        // More threads for GCP
+        final int threads = isGCP ? Math.max(16, 
Runtime.getRuntime().availableProcessors() * 2)
+                : Math.max(4, Runtime.getRuntime().availableProcessors());
+
+
+        builder.connectionTimeout(Duration.ofMillis(config.connectionTimeout)) 
// Connection timeouts
                 .readTimeout(Duration.ofMillis(config.socketTimeout))
                 .writeTimeout(Duration.ofMillis(config.socketTimeout))
-                .maxConcurrency(config.maxConnections);
+                .maxConcurrency(concurrency)  // Connection pool - increased 
for better concurrency
+                .connectionMaxIdleTime(Duration.ofSeconds(idleTimeSeconds))
+                .connectionTimeToLive(Duration.ofSeconds(ttlSeconds))
+                .useIdleConnectionReaper(true)
+                .tcpKeepAlive(true) // TCP optimizations
+                .eventLoopGroup(
+                        SdkEventLoopGroup.builder()
+                                .numberOfThreads(threads) // Thread pool for 
Netty
+                                .build());
 
         if (config.proxyHost != null && !config.proxyHost.isEmpty() && 
config.proxyPort != null && !config.proxyPort.isEmpty()) {
             String protocol = HTTPS.equalsIgnoreCase(config.protocol) ? HTTPS 
: config.protocol;
@@ -474,6 +562,8 @@ public final class Utils {
     }
 
     private static void configureBuilder(final S3BaseClientBuilder builder, 
final Properties prop, final boolean accReq) {
+        final boolean isGCP = Objects.equals(RemoteStorageMode.GCP, 
prop.get(S3Constants.MODE));
+
         builder.credentialsProvider(getAwsCredentials(prop));
         builder.overrideConfiguration(getClientConfiguration(prop));
 
@@ -483,6 +573,20 @@ public final class Utils {
 
         builder.endpointOverride(getEndPointUri(prop, accReq, region));
         
builder.crossRegionAccessEnabled(Boolean.parseBoolean(prop.getProperty(S3Constants.S3_CROSS_REGION_ACCESS)));
+
+        // Disable checksums (replaces deprecated checksumValidationEnabled)
+        if (isGCP) {
+            // disable checksum for GCP, not working with AWS sdk
+            
builder.requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED);
+            
builder.responseChecksumValidation(ResponseChecksumValidation.WHEN_REQUIRED);
+        }
+
+        builder.serviceConfiguration(
+                S3Configuration.builder()
+                        .pathStyleAccessEnabled(isGCP) // enable for GCP
+                        .chunkedEncodingEnabled(!isGCP) // Disable for GCP
+                        .useArnRegionEnabled(!isGCP)  // Disable for GCP
+                        .build());
     }
 
     // Helper class to hold common Http config
diff --git 
a/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/UtilsTest.java
 
b/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/UtilsTest.java
index e47fe2af52..580c111e3c 100644
--- 
a/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/UtilsTest.java
+++ 
b/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/UtilsTest.java
@@ -111,32 +111,37 @@ public class UtilsTest {
 
     @Test
     public void testGetRegionFromStandardEndpoint() {
-        Assert.assertEquals("eu-west-1", 
Utils.getRegionFromEndpoint("https://s3.eu-west-1.amazonaws.com"));
+        Assert.assertEquals("eu-west-1", 
Utils.getRegionFromEndpoint("https://s3.eu-west-1.amazonaws.com", null));
     }
 
     @Test
     public void testGetRegionFromVirtualHostedEndpoint() {
-        Assert.assertEquals("ap-south-1", 
Utils.getRegionFromEndpoint("https://bucket.s3.ap-south-1.amazonaws.com"));
+        Assert.assertEquals("ap-south-1", 
Utils.getRegionFromEndpoint("https://bucket.s3.ap-south-1.amazonaws.com", 
null));
     }
 
     @Test
     public void testGetRegionFromUsEast1Endpoint() {
-        Assert.assertEquals("us-east-1", 
Utils.getRegionFromEndpoint("https://s3.amazonaws.com"));
+        Assert.assertEquals("us-east-1", 
Utils.getRegionFromEndpoint("https://s3.amazonaws.com", null));
     }
 
     @Test
     public void testGetRegionFromVirtualHostedUsEast1() {
-        Assert.assertEquals("us-east-1", 
Utils.getRegionFromEndpoint("https://bucket.s3.amazonaws.com"));
+        Assert.assertEquals("us-east-1", 
Utils.getRegionFromEndpoint("https://bucket.s3.amazonaws.com", null));
     }
 
     @Test
     public void testGetRegionFromInvalidEndpoint() {
-        Assert.assertNull(Utils.getRegionFromEndpoint("https://example.com"));
+        Assert.assertNull(Utils.getRegionFromEndpoint("https://example.com", 
null));
     }
 
     @Test
     public void testGetRegionFromMalformedEndpoint() {
-        Assert.assertNull(Utils.getRegionFromEndpoint("not-a-valid-uri"));
+        Assert.assertNull(Utils.getRegionFromEndpoint("not-a-valid-uri", 
"https"));
+    }
+
+    @Test
+    public void testGetRegionFromEndpointWithoutProtocol() {
+        Assert.assertEquals("us-east-1", 
Utils.getRegionFromEndpoint("s3.us-east-1.amazonaws.com", "https"));
     }
 
     @Test

Reply via email to