This is an automated email from the ASF dual-hosted git repository.

daim pushed a commit to branch OAK-12009
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
commit 106a1c4326c360c1ddb7f7ed53c731768ce3cc14
Author: Rishabh Kumar <[email protected]>
AuthorDate: Mon Dec 1 10:43:27 2025 +0530

    OAK-12009 : provided support for GCP for new AWS sdk 2.x (#2621)

    * OAK-12009 : provided support for GCP for new AWS sdk 2.x

    * OAK-12009 : incorporated review comments

    * OAK-12009 : avoid reading files into memory

    * OAK-12009 : extracted out getting length from InputStream to a separate method
---
 .../jackrabbit/oak/blob/cloud/s3/S3Backend.java    |  58 ++++++++-
 .../apache/jackrabbit/oak/blob/cloud/s3/Utils.java | 130 ++++++++++++++++++---
 .../jackrabbit/oak/blob/cloud/s3/UtilsTest.java    |  17 ++-
 3 files changed, 182 insertions(+), 23 deletions(-)

diff --git a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java
index 8d60832d43..75ef22df76 100644
--- a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java
+++ b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java
@@ -19,8 +19,11 @@ package org.apache.jackrabbit.oak.blob.cloud.s3;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
@@ -369,7 +372,7 @@ public class S3Backend extends AbstractSharedBackend {
                     uploadReq.source(file).
                             putObjectRequest(
                                     s3ReqDecorator.decorate(
-                                            PutObjectRequest.builder().bucket(bucket).key(key)
+                                            PutObjectRequest.builder().bucket(bucket).key(key).contentLength(file.length())
                                                     .build()))
                             .build());
 
@@ -520,12 +523,17 @@ public class S3Backend extends AbstractSharedBackend {
         final ExecutorService executor = Executors.newSingleThreadExecutor();
         try {
             Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+            final PutObjectRequest.Builder builder = PutObjectRequest.builder()
+                    .bucket(bucket)
+                    .contentType("application/octet-stream")
+                    .key(addMetaKeyPrefix(name));
+
             // Specify `null` for the content length when you don't know the content length.
-            final AsyncRequestBody body = AsyncRequestBody.fromInputStream(input, null, executor);
+            final AsyncRequestBody body = getRequestBody(input, executor, builder);
             final Upload upload = tmx.upload(uploadReq ->
                     uploadReq.requestBody(body).
                             putObjectRequest(
-                                    s3ReqDecorator.decorate(PutObjectRequest.builder().bucket(bucket).key(addMetaKeyPrefix(name)).build()))
+                                    s3ReqDecorator.decorate(builder.build()))
                             .build());
             upload.completionFuture().join();
         } catch (Exception e) {
@@ -560,7 +568,7 @@ public class S3Backend extends AbstractSharedBackend {
                     uploadReq.source(input).
                             putObjectRequest(
                                     s3ReqDecorator.decorate(
-                                            PutObjectRequest.builder().bucket(bucket).key(addMetaKeyPrefix(name)).build()))
+                                            PutObjectRequest.builder().bucket(bucket).contentLength(input.length()).key(addMetaKeyPrefix(name)).build()))
                             .build());
 
             upload.completionFuture().join();
@@ -1330,6 +1338,48 @@ public class S3Backend extends AbstractSharedBackend {
         return key.substring(0, 4) + key.substring(5);
     }
 
+    @NotNull
+    private AsyncRequestBody getRequestBody(final InputStream input, final ExecutorService executor,
+                                            final PutObjectRequest.Builder builder) throws IOException {
+        final AsyncRequestBody body;
+        if (Objects.equals(RemoteStorageMode.S3, properties.get(S3Constants.MODE))) {
+            body = AsyncRequestBody.fromInputStream(input, null, executor);
+        } else {
+            // for GCP we need to know the length in advance, else it won't work.
+            final long length;
+            if (input instanceof FileInputStream) {
+                final FileInputStream fis = (FileInputStream) input;
+                // if the file is modified after opening, the size may not reflect the latest changes
+                length = fis.getChannel().size();
+                body = AsyncRequestBody.fromInputStream(input, length, executor);
+            } else if (input instanceof ByteArrayInputStream) {
+                length = input.available();
+                body = AsyncRequestBody.fromInputStream(input, length, executor);
+            } else if (input.markSupported()) {
+                // in case the inputStream supports mark & reset
+                input.mark(Integer.MAX_VALUE);
+                length = IOUtils.consume(input);
+                input.reset();
+                body = AsyncRequestBody.fromInputStream(input, length, executor);
+            } else {
+                // we have to read all the stream to get the actual length
+                // last else block: store to temp file and re-read
+                final File tempFile = File.createTempFile("inputstream-", ".tmp");
+                tempFile.deleteOnExit(); // Clean up after JVM exits
+
+                try (OutputStream out = new FileOutputStream(tempFile)) {
+                    IOUtils.copy(input, out); // Copy all bytes to file
+                }
+                // Get length from file
+                length = tempFile.length();
+                // Re-create InputStream from temp file
+                body = AsyncRequestBody.fromInputStream(new FileInputStream(tempFile), length, executor);
+            }
+            builder.contentLength(length);
+        }
+        return body;
+    }
+
     /**
      * The class renames object key in S3 in a thread.
      */
diff --git a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/Utils.java b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/Utils.java
index 9f5a51a15d..6a60793340 100644
--- a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/Utils.java
+++ b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/Utils.java
@@ -31,6 +31,7 @@ import java.util.regex.Pattern;
 
 import org.apache.jackrabbit.oak.blob.cloud.s3.S3Backend.RemoteStorageMode;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +41,8 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 import software.amazon.awssdk.auth.signer.AwsS3V4Signer;
+import software.amazon.awssdk.core.checksums.RequestChecksumCalculation;
+import software.amazon.awssdk.core.checksums.ResponseChecksumValidation;
 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
 import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
 import software.amazon.awssdk.core.exception.SdkClientException;
@@ -48,6 +51,7 @@ import software.amazon.awssdk.http.apache.ApacheHttpClient;
 import software.amazon.awssdk.http.apache.ProxyConfiguration;
 import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
 import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
@@ -55,6 +59,7 @@ import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
 import software.amazon.awssdk.services.s3.S3BaseClientBuilder;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.S3Configuration;
 import software.amazon.awssdk.services.s3.presigner.S3Presigner;
 import software.amazon.awssdk.utils.BinaryUtils;
 import software.amazon.awssdk.utils.Md5Utils;
@@ -130,11 +135,21 @@ public final class Utils {
      */
    public static S3AsyncClient openAsyncService(final Properties prop) {
         S3AsyncClientBuilder builder = S3AsyncClient.builder();
+        boolean isS3 = Objects.equals(RemoteStorageMode.S3, prop.get(S3Constants.MODE));
         configureBuilder(builder, prop, false);
 
         // async http client
         builder.httpClient(getSdkAsyncHttpClient(prop));
-        builder.multipartEnabled(true);
+
+        // AWS-specific optimizations
+        if (isS3) {
+            builder.multipartEnabled(true);
+            builder.multipartConfiguration(c -> c
+                    .minimumPartSizeInBytes(5L * 1024 * 1024)  // 5MB minimum
+                    .thresholdInBytes(10L * 1024 * 1024));     // 10MB threshold
+        } else {
+            builder.multipartEnabled(false); // GCP doesn't support S3 multipart
+        }
 
         return builder.build();
     }
@@ -151,9 +166,14 @@ public final class Utils {
      * @return a configured {@link S3Presigner} instance
      */
     public static S3Presigner createPresigner(final S3Client s3Client, final Properties props) {
-        return S3Presigner.builder().s3Client(s3Client).
-                credentialsProvider(Utils.getAwsCredentials(props))
+        final boolean isGCP = Objects.equals(RemoteStorageMode.GCP, props.get(S3Constants.MODE));
+        return S3Presigner.builder().s3Client(s3Client)
+                .credentialsProvider(Utils.getAwsCredentials(props))
                 .region(Region.of(Utils.getRegion(props)))
+                .serviceConfiguration(S3Configuration.builder()
+                        .pathStyleAccessEnabled(isGCP)
+                        .chunkedEncodingEnabled(!isGCP)
+                        .build())
                 .build();
     }
 
@@ -257,7 +277,7 @@ public final class Utils {
         String region = null;
 
         if (Objects.nonNull(prop.getProperty(S3Constants.S3_END_POINT))) {
-            region = getRegionFromEndpoint(prop.getProperty(S3Constants.S3_END_POINT));
+            region = getRegionFromEndpoint(prop.getProperty(S3Constants.S3_END_POINT), prop.getProperty(S3Constants.S3_CONN_PROTOCOL));
         }
 
         if (Objects.nonNull(region)) {
@@ -283,6 +303,7 @@ public final class Utils {
      * <ul>
      * <li>https://s3.eu-west-1.amazonaws.com</li>
      * <li>https://bucket.s3.eu-west-1.amazonaws.com</li>
+     * <li>s3.eu-west-1.amazonaws.com</li>
      * <li>https://s3.amazonaws.com (returns us-east-1)</li>
      * </ul>
      * If the region cannot be determined, returns null.
@@ -290,9 +311,9 @@ public final class Utils {
      * @param endpoint the S3 endpoint URL as a string
      * @return the AWS region string, or null if not found
      */
-    static String getRegionFromEndpoint(final String endpoint) {
+    static String getRegionFromEndpoint(final String endpoint, String protocol) {
         try {
-            URI uri = URI.create(endpoint);
+            URI uri = getEndPointUri(endpoint, protocol);
             String host = uri.getHost();
 
             // Pattern for standard S3 endpoints: s3.region.amazonaws.com or bucket.s3.region.amazonaws.com
@@ -353,12 +374,27 @@ public final class Utils {
          */
 
         // Check if endpoint already contains protocol
+        return getEndPointUri(endPoint, prop.getProperty(S3Constants.S3_CONN_PROTOCOL));
+    }
+
+    /**
+     * Constructs a URI for the S3 endpoint using the provided endpoint string and protocol.
+     * <p>
+     * If the endpoint string already contains a protocol (`http://` or `https://`), it is used directly.
+     * Otherwise, the specified protocol (or "https" if null/empty) is prepended to the endpoint.
+     * </p>
+     *
+     * @param endPoint the S3 endpoint string (may or may not include protocol)
+     * @param protocol the protocol to use ("http" or "https"); defaults to "https" if null or empty
+     * @return the constructed {@link URI} for the S3 endpoint
+     */
+    @NotNull
+    private static URI getEndPointUri(final String endPoint, @Nullable String protocol) {
         if (endPoint.startsWith("http://") || endPoint.startsWith("https://")) {
             LOG.info("S3 service endpoint [{}] ", endPoint);
             return URI.create(endPoint);
         }
-        String protocol = prop.getProperty(S3Constants.S3_CONN_PROTOCOL);
         if (protocol == null || protocol.isEmpty()) {
             protocol = HTTPS; // default protocol
         }
 
@@ -373,16 +409,24 @@ public final class Utils {
     }
 
     private static ClientOverrideConfiguration getClientConfiguration(Properties prop) {
+        final boolean isS3 = Objects.equals(RemoteStorageMode.S3, prop.get(S3Constants.MODE));
+
         int maxErrorRetry = Integer.parseInt(prop.getProperty(S3Constants.S3_MAX_ERR_RETRY));
         int connectionTimeOut = Integer.parseInt(prop.getProperty(S3Constants.S3_CONN_TIMEOUT));
         String encryptionType = prop.getProperty(S3Constants.S3_ENCRYPTION);
 
+        // API timeout should be much longer than connection timeout for large file uploads
+        // Use at least 5 minutes, or 10x connection timeout, whichever is larger
+        int apiTimeout = Math.max(connectionTimeOut * 10, 300000); // At least 5 minutes
+
         ClientOverrideConfiguration.Builder builder = ClientOverrideConfiguration.builder();
         builder.retryStrategy(b -> b.maxAttempts(maxErrorRetry));
-        builder.apiCallTimeout(Duration.ofMillis(connectionTimeOut));
+        builder.apiCallTimeout(Duration.ofMillis(apiTimeout));               // Long timeout for large uploads
+        builder.apiCallAttemptTimeout(Duration.ofMillis(connectionTimeOut)); // Per-attempt timeout
 
-        if (S3Constants.S3_ENCRYPTION_SSE_KMS.equals(encryptionType)) {
+        // Only use KMS signer for AWS S3, not for GCP
+        if (isS3 && S3Constants.S3_ENCRYPTION_SSE_KMS.equals(encryptionType)) {
             builder.putAdvancedOption(SdkAdvancedClientOption.SIGNER, AwsS3V4Signer.create());
         }
         return builder.build();
@@ -391,10 +435,28 @@ public final class Utils {
     private static SdkHttpClient getSdkHttpClient(Properties prop) {
         HttpClientConfig config = new HttpClientConfig(prop);
         final ApacheHttpClient.Builder builder = ApacheHttpClient.builder();
+        final boolean isGCP = Objects.equals(RemoteStorageMode.GCP, prop.get(S3Constants.MODE));
+
+        // Calculate connection lifecycle based on socket timeout (all in SECONDS)
+        long socketTimeoutSeconds = config.socketTimeout / 1000;
 
-        builder.connectionTimeout(Duration.ofMillis(config.connectionTimeout))
+        // Idle time: 2x socket timeout (min 30s, max 120s)
+        long idleTimeSeconds = Math.min(120, Math.max(30, socketTimeoutSeconds * 2));
+
+        // TTL: 5x socket timeout (min 60s, max 600s = 10min)
+        long ttlSeconds = Math.min(600, Math.max(60, socketTimeoutSeconds * 5));
+
+        // GCP needs higher max connections (no HTTP/2)
+        int maxConnections = isGCP ? Math.max(100, config.maxConnections) : Math.max(50, config.maxConnections);
+
+        builder.connectionTimeout(Duration.ofMillis(config.connectionTimeout))  // Connection timeouts
                 .socketTimeout(Duration.ofMillis(config.socketTimeout))
-                .maxConnections(config.maxConnections);
+                .maxConnections(maxConnections)                                  // Connection pool
+                .connectionMaxIdleTime(Duration.ofSeconds(idleTimeSeconds))
+                .connectionTimeToLive(Duration.ofSeconds(ttlSeconds))
+                .useIdleConnectionReaper(true)
+                .tcpKeepAlive(true)                                              // TCP keepalive
+                .expectContinueEnabled(true);  // Expect-continue handshake (reduces overhead for large uploads)
 
         if (config.proxyHost != null && !config.proxyHost.isEmpty() && config.proxyPort != null && !config.proxyPort.isEmpty()) {
             String protocol = "http".equalsIgnoreCase(config.protocol) ? "http" : config.protocol;
@@ -411,12 +473,38 @@ public final class Utils {
 
     private static SdkAsyncHttpClient getSdkAsyncHttpClient(Properties prop) {
         HttpClientConfig config = new HttpClientConfig(prop);
+        final boolean isGCP = Objects.equals(RemoteStorageMode.GCP, prop.get(S3Constants.MODE));
         final NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
 
-        builder.connectionTimeout(Duration.ofMillis(config.connectionTimeout))
+        // Calculate connection lifecycle based on socket timeout (all in SECONDS)
+        long socketTimeoutSeconds = config.socketTimeout / 1000;
+
+        // Idle time: 2x socket timeout (min 30s, max 120s)
+        long idleTimeSeconds = Math.min(120, Math.max(30, socketTimeoutSeconds * 2));
+
+        // TTL: 5x socket timeout (min 60s, max 600s = 10min)
+        long ttlSeconds = Math.min(600, Math.max(60, socketTimeoutSeconds * 5));
+
+        // GCP needs higher concurrency (no HTTP/2, so more connections needed)
+        final int concurrency = isGCP ? Math.max(100, config.maxConnections) : Math.max(50, config.maxConnections);
+
+        // More threads for GCP
+        final int threads = isGCP ? Math.max(16, Runtime.getRuntime().availableProcessors() * 2)
+                : Math.max(4, Runtime.getRuntime().availableProcessors());
+
+
+        builder.connectionTimeout(Duration.ofMillis(config.connectionTimeout))  // Connection timeouts
                 .readTimeout(Duration.ofMillis(config.socketTimeout))
                 .writeTimeout(Duration.ofMillis(config.socketTimeout))
-                .maxConcurrency(config.maxConnections);
+                .maxConcurrency(concurrency)                  // Connection pool - increased for better concurrency
+                .connectionMaxIdleTime(Duration.ofSeconds(idleTimeSeconds))
+                .connectionTimeToLive(Duration.ofSeconds(ttlSeconds))
+                .useIdleConnectionReaper(true)
+                .tcpKeepAlive(true)                           // TCP optimizations
+                .eventLoopGroup(
+                        SdkEventLoopGroup.builder()
+                                .numberOfThreads(threads)     // Thread pool for Netty
+                                .build());
 
         if (config.proxyHost != null && !config.proxyHost.isEmpty() && config.proxyPort != null && !config.proxyPort.isEmpty()) {
             String protocol = HTTPS.equalsIgnoreCase(config.protocol) ? HTTPS : config.protocol;
@@ -474,6 +562,8 @@ public final class Utils {
     }
 
     private static void configureBuilder(final S3BaseClientBuilder builder, final Properties prop, final boolean accReq) {
+        final boolean isGCP = Objects.equals(RemoteStorageMode.GCP, prop.get(S3Constants.MODE));
+
         builder.credentialsProvider(getAwsCredentials(prop));
         builder.overrideConfiguration(getClientConfiguration(prop));
 
@@ -483,6 +573,20 @@
 
         builder.endpointOverride(getEndPointUri(prop, accReq, region));
         builder.crossRegionAccessEnabled(Boolean.parseBoolean(prop.getProperty(S3Constants.S3_CROSS_REGION_ACCESS)));
+
+        // Disable checksums (replaces deprecated checksumValidationEnabled)
+        if (isGCP) {
+            // disable checksum for GCP, not working with AWS sdk
+            builder.requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED);
+            builder.responseChecksumValidation(ResponseChecksumValidation.WHEN_REQUIRED);
+        }
+
+        builder.serviceConfiguration(
+                S3Configuration.builder()
+                        .pathStyleAccessEnabled(isGCP)   // enable for GCP
+                        .chunkedEncodingEnabled(!isGCP)  // Disable for GCP
+                        .useArnRegionEnabled(!isGCP)     // Disable for GCP
+                        .build());
     }
 
     // Helper class to hold common Http config
diff --git a/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/UtilsTest.java b/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/UtilsTest.java
index e47fe2af52..580c111e3c 100644
--- a/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/UtilsTest.java
+++ b/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/UtilsTest.java
@@ -111,32 +111,37 @@ public class UtilsTest {
 
     @Test
     public void testGetRegionFromStandardEndpoint() {
-        Assert.assertEquals("eu-west-1", Utils.getRegionFromEndpoint("https://s3.eu-west-1.amazonaws.com"));
+        Assert.assertEquals("eu-west-1", Utils.getRegionFromEndpoint("https://s3.eu-west-1.amazonaws.com", null));
     }
 
     @Test
     public void testGetRegionFromVirtualHostedEndpoint() {
-        Assert.assertEquals("ap-south-1", Utils.getRegionFromEndpoint("https://bucket.s3.ap-south-1.amazonaws.com"));
+        Assert.assertEquals("ap-south-1", Utils.getRegionFromEndpoint("https://bucket.s3.ap-south-1.amazonaws.com", null));
     }
 
     @Test
     public void testGetRegionFromUsEast1Endpoint() {
-        Assert.assertEquals("us-east-1", Utils.getRegionFromEndpoint("https://s3.amazonaws.com"));
+        Assert.assertEquals("us-east-1", Utils.getRegionFromEndpoint("https://s3.amazonaws.com", null));
     }
 
     @Test
     public void testGetRegionFromVirtualHostedUsEast1() {
-        Assert.assertEquals("us-east-1", Utils.getRegionFromEndpoint("https://bucket.s3.amazonaws.com"));
+        Assert.assertEquals("us-east-1", Utils.getRegionFromEndpoint("https://bucket.s3.amazonaws.com", null));
     }
 
     @Test
     public void testGetRegionFromInvalidEndpoint() {
-        Assert.assertNull(Utils.getRegionFromEndpoint("https://example.com"));
+        Assert.assertNull(Utils.getRegionFromEndpoint("https://example.com", null));
     }
 
     @Test
     public void testGetRegionFromMalformedEndpoint() {
-        Assert.assertNull(Utils.getRegionFromEndpoint("not-a-valid-uri"));
+        Assert.assertNull(Utils.getRegionFromEndpoint("not-a-valid-uri", "https"));
+    }
+
+    @Test
+    public void testGetRegionFromEndpointWithoutProtocol() {
+        Assert.assertEquals("us-east-1", Utils.getRegionFromEndpoint("s3.us-east-1.amazonaws.com", "https"));
     }
 
     @Test
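
The new S3Backend#getRequestBody above exists because, unlike AWS S3, the GCP endpoint needs the content length up front. A minimal standalone sketch of the same fallback order (hypothetical class and method names, plain java.io in place of Oak's IOUtils, not the committed code) could look like this:

    import java.io.ByteArrayInputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    // Sketch only: mirrors the fallback order used above to discover a content length
    // before handing a stream to a store that rejects uploads of unknown size.
    public final class ContentLengthProbe {

        /** A stream to upload plus the length that was resolved for it. */
        public static final class SizedStream {
            public final InputStream stream;
            public final long length;
            SizedStream(InputStream stream, long length) {
                this.stream = stream;
                this.length = length;
            }
        }

        public static SizedStream resolve(InputStream input) throws IOException {
            if (input instanceof FileInputStream) {
                // Cheap: ask the channel; the value goes stale if the file changes afterwards.
                return new SizedStream(input, ((FileInputStream) input).getChannel().size());
            }
            if (input instanceof ByteArrayInputStream) {
                // available() is exact for purely in-memory streams.
                return new SizedStream(input, input.available());
            }
            if (input.markSupported()) {
                // Count the bytes, then rewind; works only while the mark buffer can hold what was read.
                input.mark(Integer.MAX_VALUE);
                long length = 0;
                byte[] buf = new byte[8192];
                for (int n; (n = input.read(buf)) != -1; ) {
                    length += n;
                }
                input.reset();
                return new SizedStream(input, length);
            }
            // Last resort: spill to a temp file instead of buffering everything in memory.
            File tempFile = File.createTempFile("inputstream-", ".tmp");
            tempFile.deleteOnExit();
            try (OutputStream out = new FileOutputStream(tempFile)) {
                byte[] buf = new byte[8192];
                for (int n; (n = input.read(buf)) != -1; ) {
                    out.write(buf, 0, n);
                }
            }
            return new SizedStream(new FileInputStream(tempFile), tempFile.length());
        }

        private ContentLengthProbe() {
        }
    }

A caller would pass SizedStream.length as the request's content length and upload SizedStream.stream, which is what the patch does via PutObjectRequest.Builder#contentLength and AsyncRequestBody.fromInputStream.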

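The timeout tuning in Utils reduces to three clamp formulas: API call timeout = max(10x connection timeout, 5 minutes), idle time = 2x socket timeout clamped to 30..120 s, and TTL = 5x socket timeout clamped to 60..600 s. The sketch below (hypothetical class name, same constants as the patch) spells out the arithmetic and what it yields for a 120-second base timeout:

    import java.time.Duration;

    // Sketch only: the timeout heuristics from the patch, factored out so the clamping is easy to read.
    public final class TimeoutHeuristics {

        /** API call timeout: at least 5 minutes, or 10x the connection timeout. */
        public static Duration apiCallTimeout(int connectionTimeoutMillis) {
            return Duration.ofMillis(Math.max(connectionTimeoutMillis * 10L, 300_000L));
        }

        /** Connection idle time: 2x socket timeout, clamped to [30 s, 120 s]. */
        public static Duration connectionMaxIdleTime(int socketTimeoutMillis) {
            long seconds = socketTimeoutMillis / 1000L;
            return Duration.ofSeconds(Math.min(120, Math.max(30, seconds * 2)));
        }

        /** Connection time-to-live: 5x socket timeout, clamped to [60 s, 600 s]. */
        public static Duration connectionTimeToLive(int socketTimeoutMillis) {
            long seconds = socketTimeoutMillis / 1000L;
            return Duration.ofSeconds(Math.min(600, Math.max(60, seconds * 5)));
        }

        public static void main(String[] args) {
            // With 120 s connection and socket timeouts the patch ends up with a 20-minute
            // API call timeout, a 120 s idle limit and a 600 s connection TTL.
            System.out.println(apiCallTimeout(120_000));        // PT20M
            System.out.println(connectionMaxIdleTime(120_000)); // PT2M
            System.out.println(connectionTimeToLive(120_000));  // PT10M
        }

        private TimeoutHeuristics() {
        }
    }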