This is an automated email from the ASF dual-hosted git repository.
daim pushed a commit to branch 1.22
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/1.22 by this push:
new 0e54141ef6 OAK-12009 : provided support for GCP for new AWS sdk 2.x
(#2621) (#2673)
0e54141ef6 is described below
commit 0e54141ef69e26c635fa4f52c30c457a5a924ffe
Author: Rishabh Kumar <[email protected]>
AuthorDate: Mon Jan 5 13:22:38 2026 +0530
OAK-12009 : provided support for GCP for new AWS sdk 2.x (#2621) (#2673)
* OAK-12009 : provided support for GCP for new AWS sdk 2.x
* OAK-12009 : incorporated review comments
* OAK-12009 : avoid reading files into memory
* OAK-12009 : extracted out getting length from InputStream to a separate
method
---
.../jackrabbit/oak/blob/cloud/s3/S3Backend.java | 58 ++++++++-
.../apache/jackrabbit/oak/blob/cloud/s3/Utils.java | 130 ++++++++++++++++++---
.../jackrabbit/oak/blob/cloud/s3/UtilsTest.java | 17 ++-
3 files changed, 182 insertions(+), 23 deletions(-)
diff --git
a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java
b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java
index 8d60832d43..75ef22df76 100644
---
a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java
+++
b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/S3Backend.java
@@ -19,8 +19,11 @@ package org.apache.jackrabbit.oak.blob.cloud.s3;
import java.io.ByteArrayInputStream;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
@@ -369,7 +372,7 @@ public class S3Backend extends AbstractSharedBackend {
uploadReq.source(file).
putObjectRequest(
s3ReqDecorator.decorate(
-
PutObjectRequest.builder().bucket(bucket).key(key)
+
PutObjectRequest.builder().bucket(bucket).key(key).contentLength(file.length())
.build()))
.build());
@@ -520,12 +523,17 @@ public class S3Backend extends AbstractSharedBackend {
final ExecutorService executor = Executors.newSingleThreadExecutor();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+ final PutObjectRequest.Builder builder = PutObjectRequest.builder()
+ .bucket(bucket)
+ .contentType("application/octet-stream")
+ .key(addMetaKeyPrefix(name));
+
// Specify `null` for the content length when you don't know the
content length.
- final AsyncRequestBody body =
AsyncRequestBody.fromInputStream(input, null, executor);
+ final AsyncRequestBody body = getRequestBody(input, executor,
builder);
final Upload upload = tmx.upload(uploadReq ->
uploadReq.requestBody(body).
putObjectRequest(
-
s3ReqDecorator.decorate(PutObjectRequest.builder().bucket(bucket).key(addMetaKeyPrefix(name)).build()))
+ s3ReqDecorator.decorate(builder.build()))
.build());
upload.completionFuture().join();
} catch (Exception e) {
@@ -560,7 +568,7 @@ public class S3Backend extends AbstractSharedBackend {
uploadReq.source(input).
putObjectRequest(
s3ReqDecorator.decorate(
-
PutObjectRequest.builder().bucket(bucket).key(addMetaKeyPrefix(name)).build()))
+
PutObjectRequest.builder().bucket(bucket).contentLength(input.length()).key(addMetaKeyPrefix(name)).build()))
.build());
upload.completionFuture().join();
@@ -1330,6 +1338,48 @@ public class S3Backend extends AbstractSharedBackend {
return key.substring(0, 4) + key.substring(5);
}
+ @NotNull
+ private AsyncRequestBody getRequestBody(final InputStream input, final
ExecutorService executor,
+ final PutObjectRequest.Builder
builder) throws IOException {
+ final AsyncRequestBody body;
+ if (Objects.equals(RemoteStorageMode.S3,
properties.get(S3Constants.MODE))) {
+ body = AsyncRequestBody.fromInputStream(input, null, executor);
+ } else {
+ // for GCP we need to know the length in advance, else it won't
work.
+ final long length;
+ if (input instanceof FileInputStream) {
+ final FileInputStream fis = (FileInputStream) input;
+ // if the file is modified after opening, the size may not
reflect the latest changes
+ length = fis.getChannel().size();
+ body = AsyncRequestBody.fromInputStream(input, length,
executor);
+ } else if (input instanceof ByteArrayInputStream) {
+ length = input.available();
+ body = AsyncRequestBody.fromInputStream(input, length,
executor);
+ } else if (input.markSupported()) {
+ // in case the inputStream supports mark & reset
+ input.mark(Integer.MAX_VALUE);
+ length = IOUtils.consume(input);
+ input.reset();
+ body = AsyncRequestBody.fromInputStream(input, length,
executor);
+ } else {
+ // we have to read all the stream to get the actual length
+ // last else block: store to temp file and re-read
+ final File tempFile = File.createTempFile("inputstream-",
".tmp");
+ tempFile.deleteOnExit(); // Clean up after JVM exits
+
+ try (OutputStream out = new FileOutputStream(tempFile)) {
+ IOUtils.copy(input, out); // Copy all bytes to file
+ }
+ // Get length from file
+ length = tempFile.length();
+ // Re-create InputStream from temp file
+ body = AsyncRequestBody.fromInputStream(new
FileInputStream(tempFile), length, executor);
+ }
+ builder.contentLength(length);
+ }
+ return body;
+ }
+
/**
* The class renames object key in S3 in a thread.
*/
diff --git
a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/Utils.java
b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/Utils.java
index 9f5a51a15d..6a60793340 100644
---
a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/Utils.java
+++
b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/s3/Utils.java
@@ -31,6 +31,7 @@ import java.util.regex.Pattern;
import org.apache.jackrabbit.oak.blob.cloud.s3.S3Backend.RemoteStorageMode;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +41,8 @@ import
software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.auth.signer.AwsS3V4Signer;
+import software.amazon.awssdk.core.checksums.RequestChecksumCalculation;
+import software.amazon.awssdk.core.checksums.ResponseChecksumValidation;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.exception.SdkClientException;
@@ -48,6 +51,7 @@ import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.apache.ProxyConfiguration;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;
import software.amazon.awssdk.services.s3.S3AsyncClient;
@@ -55,6 +59,7 @@ import
software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3BaseClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.presigner.S3Presigner;
import software.amazon.awssdk.utils.BinaryUtils;
import software.amazon.awssdk.utils.Md5Utils;
@@ -130,11 +135,21 @@ public final class Utils {
*/
public static S3AsyncClient openAsyncService(final Properties prop) {
S3AsyncClientBuilder builder = S3AsyncClient.builder();
+ boolean isS3 = Objects.equals(RemoteStorageMode.S3,
prop.get(S3Constants.MODE));
configureBuilder(builder, prop, false);
// async http client
builder.httpClient(getSdkAsyncHttpClient(prop));
- builder.multipartEnabled(true);
+
+ // AWS-specific optimizations
+ if (isS3) {
+ builder.multipartEnabled(true);
+ builder.multipartConfiguration(c -> c
+ .minimumPartSizeInBytes(5L * 1024 * 1024) // 5MB minimum
+ .thresholdInBytes(10L * 1024 * 1024)); // 10MB
threshold
+ } else {
+ builder.multipartEnabled(false); // GCP doesn't support S3
multipart
+ }
return builder.build();
}
@@ -151,9 +166,14 @@ public final class Utils {
* @return a configured {@link S3Presigner} instance
*/
public static S3Presigner createPresigner(final S3Client s3Client, final
Properties props) {
- return S3Presigner.builder().s3Client(s3Client).
- credentialsProvider(Utils.getAwsCredentials(props))
+ final boolean isGCP = Objects.equals(RemoteStorageMode.GCP,
props.get(S3Constants.MODE));
+ return S3Presigner.builder().s3Client(s3Client)
+ .credentialsProvider(Utils.getAwsCredentials(props))
.region(Region.of(Utils.getRegion(props)))
+ .serviceConfiguration(S3Configuration.builder()
+ .pathStyleAccessEnabled(isGCP)
+ .chunkedEncodingEnabled(!isGCP)
+ .build())
.build();
}
@@ -257,7 +277,7 @@ public final class Utils {
String region = null;
if (Objects.nonNull(prop.getProperty(S3Constants.S3_END_POINT))) {
- region =
getRegionFromEndpoint(prop.getProperty(S3Constants.S3_END_POINT));
+ region =
getRegionFromEndpoint(prop.getProperty(S3Constants.S3_END_POINT),
prop.getProperty(S3Constants.S3_CONN_PROTOCOL));
}
if (Objects.nonNull(region)) {
@@ -283,6 +303,7 @@ public final class Utils {
* <ul>
* <li>https://s3.eu-west-1.amazonaws.com</li>
* <li>https://bucket.s3.eu-west-1.amazonaws.com</li>
+ * <li>s3.eu-west-1.amazonaws.com</li>
* <li>https://s3.amazonaws.com (returns us-east-1)</li>
* </ul>
* If the region cannot be determined, returns null.
@@ -290,9 +311,9 @@ public final class Utils {
* @param endpoint the S3 endpoint URL as a string
* @return the AWS region string, or null if not found
*/
- static String getRegionFromEndpoint(final String endpoint) {
+ static String getRegionFromEndpoint(final String endpoint, String
protocol) {
try {
- URI uri = URI.create(endpoint);
+ URI uri = getEndPointUri(endpoint, protocol);
String host = uri.getHost();
// Pattern for standard S3 endpoints: s3.region.amazonaws.com or
bucket.s3.region.amazonaws.com
@@ -353,12 +374,27 @@ public final class Utils {
*/
// Check if endpoint already contains protocol
+ return getEndPointUri(endPoint,
prop.getProperty(S3Constants.S3_CONN_PROTOCOL));
+ }
+
+ /**
+ * Constructs a URI for the S3 endpoint using the provided endpoint string
and protocol.
+ * <p>
+ * If the endpoint string already contains a protocol (`http://` or
`https://`), it is used directly.
+ * Otherwise, the specified protocol (or "https" if null/empty) is
prepended to the endpoint.
+ * </p>
+ *
+ * @param endPoint the S3 endpoint string (may or may not include protocol)
+ * @param protocol the protocol to use ("http" or "https"); defaults to
"https" if null or empty
+ * @return the constructed {@link URI} for the S3 endpoint
+ */
+ @NotNull
+ private static URI getEndPointUri(final String endPoint, @Nullable String
protocol) {
if (endPoint.startsWith("http://") || endPoint.startsWith("https://"))
{
LOG.info("S3 service endpoint [{}] ", endPoint);
return URI.create(endPoint);
}
- String protocol = prop.getProperty(S3Constants.S3_CONN_PROTOCOL);
if (protocol == null || protocol.isEmpty()) {
protocol = HTTPS; // default protocol
}
@@ -373,16 +409,24 @@ public final class Utils {
}
private static ClientOverrideConfiguration
getClientConfiguration(Properties prop) {
+ final boolean isS3 = Objects.equals(RemoteStorageMode.S3,
prop.get(S3Constants.MODE));
+
int maxErrorRetry =
Integer.parseInt(prop.getProperty(S3Constants.S3_MAX_ERR_RETRY));
int connectionTimeOut =
Integer.parseInt(prop.getProperty(S3Constants.S3_CONN_TIMEOUT));
String encryptionType = prop.getProperty(S3Constants.S3_ENCRYPTION);
+ // API timeout should be much longer than connection timeout for large
file uploads
+ // Use at least 5 minutes, or 10x connection timeout, whichever is
larger
+ int apiTimeout = Math.max(connectionTimeOut * 10, 300000); // At least
5 minutes
+
ClientOverrideConfiguration.Builder builder =
ClientOverrideConfiguration.builder();
builder.retryStrategy(b -> b.maxAttempts(maxErrorRetry));
- builder.apiCallTimeout(Duration.ofMillis(connectionTimeOut));
+ builder.apiCallTimeout(Duration.ofMillis(apiTimeout)); // Long timeout
for large uploads
+ builder.apiCallAttemptTimeout(Duration.ofMillis(connectionTimeOut));
// Per-attempt timeout
- if (S3Constants.S3_ENCRYPTION_SSE_KMS.equals(encryptionType)) {
+ // Only use KMS signer for AWS S3, not for GCP
+ if (isS3 && S3Constants.S3_ENCRYPTION_SSE_KMS.equals(encryptionType)) {
builder.putAdvancedOption(SdkAdvancedClientOption.SIGNER,
AwsS3V4Signer.create());
}
return builder.build();
@@ -391,10 +435,28 @@ public final class Utils {
private static SdkHttpClient getSdkHttpClient(Properties prop) {
HttpClientConfig config = new HttpClientConfig(prop);
final ApacheHttpClient.Builder builder = ApacheHttpClient.builder();
+ final boolean isGCP = Objects.equals(RemoteStorageMode.GCP,
prop.get(S3Constants.MODE));
+
+ // Calculate connection lifecycle based on socket timeout (all in
SECONDS)
+ long socketTimeoutSeconds = config.socketTimeout / 1000;
- builder.connectionTimeout(Duration.ofMillis(config.connectionTimeout))
+ // Idle time: 2x socket timeout (min 30s, max 120s)
+ long idleTimeSeconds = Math.min(120, Math.max(30, socketTimeoutSeconds
* 2));
+
+ // TTL: 5x socket timeout (min 60s, max 600s = 10min)
+ long ttlSeconds = Math.min(600, Math.max(60, socketTimeoutSeconds *
5));
+
+ // GCP needs higher max connections (no HTTP/2)
+ int maxConnections = isGCP ? Math.max(100, config.maxConnections) :
Math.max(50, config.maxConnections);
+
+ builder.connectionTimeout(Duration.ofMillis(config.connectionTimeout))
// Connection timeouts
.socketTimeout(Duration.ofMillis(config.socketTimeout))
- .maxConnections(config.maxConnections);
+ .maxConnections(maxConnections) // Connection pool
+ .connectionMaxIdleTime(Duration.ofSeconds(idleTimeSeconds))
+ .connectionTimeToLive(Duration.ofSeconds(ttlSeconds))
+ .useIdleConnectionReaper(true)
+ .tcpKeepAlive(true) // TCP keepalive
+ .expectContinueEnabled(true); // Expect-continue handshake
(reduces overhead for large uploads)
if (config.proxyHost != null && !config.proxyHost.isEmpty() &&
config.proxyPort != null && !config.proxyPort.isEmpty()) {
String protocol = "http".equalsIgnoreCase(config.protocol) ?
"http" : config.protocol;
@@ -411,12 +473,38 @@ public final class Utils {
private static SdkAsyncHttpClient getSdkAsyncHttpClient(Properties prop) {
HttpClientConfig config = new HttpClientConfig(prop);
+ final boolean isGCP = Objects.equals(RemoteStorageMode.GCP,
prop.get(S3Constants.MODE));
final NettyNioAsyncHttpClient.Builder builder =
NettyNioAsyncHttpClient.builder();
- builder.connectionTimeout(Duration.ofMillis(config.connectionTimeout))
+ // Calculate connection lifecycle based on socket timeout (all in
SECONDS)
+ long socketTimeoutSeconds = config.socketTimeout / 1000;
+
+ // Idle time: 2x socket timeout (min 30s, max 120s)
+ long idleTimeSeconds = Math.min(120, Math.max(30, socketTimeoutSeconds
* 2));
+
+ // TTL: 5x socket timeout (min 60s, max 600s = 10min)
+ long ttlSeconds = Math.min(600, Math.max(60, socketTimeoutSeconds *
5));
+
+ // GCP needs higher concurrency (no HTTP/2, so more connections needed)
+ final int concurrency = isGCP ? Math.max(100, config.maxConnections) :
Math.max(50, config.maxConnections);
+
+ // More threads for GCP
+ final int threads = isGCP ? Math.max(16,
Runtime.getRuntime().availableProcessors() * 2)
+ : Math.max(4, Runtime.getRuntime().availableProcessors());
+
+
+ builder.connectionTimeout(Duration.ofMillis(config.connectionTimeout))
// Connection timeouts
.readTimeout(Duration.ofMillis(config.socketTimeout))
.writeTimeout(Duration.ofMillis(config.socketTimeout))
- .maxConcurrency(config.maxConnections);
+ .maxConcurrency(concurrency) // Connection pool - increased
for better concurrency
+ .connectionMaxIdleTime(Duration.ofSeconds(idleTimeSeconds))
+ .connectionTimeToLive(Duration.ofSeconds(ttlSeconds))
+ .useIdleConnectionReaper(true)
+ .tcpKeepAlive(true) // TCP optimizations
+ .eventLoopGroup(
+ SdkEventLoopGroup.builder()
+ .numberOfThreads(threads) // Thread pool for
Netty
+ .build());
if (config.proxyHost != null && !config.proxyHost.isEmpty() &&
config.proxyPort != null && !config.proxyPort.isEmpty()) {
String protocol = HTTPS.equalsIgnoreCase(config.protocol) ? HTTPS
: config.protocol;
@@ -474,6 +562,8 @@ public final class Utils {
}
private static void configureBuilder(final S3BaseClientBuilder builder,
final Properties prop, final boolean accReq) {
+ final boolean isGCP = Objects.equals(RemoteStorageMode.GCP,
prop.get(S3Constants.MODE));
+
builder.credentialsProvider(getAwsCredentials(prop));
builder.overrideConfiguration(getClientConfiguration(prop));
@@ -483,6 +573,20 @@ public final class Utils {
builder.endpointOverride(getEndPointUri(prop, accReq, region));
builder.crossRegionAccessEnabled(Boolean.parseBoolean(prop.getProperty(S3Constants.S3_CROSS_REGION_ACCESS)));
+
+ // Disable checksums (replaces deprecated checksumValidationEnabled)
+ if (isGCP) {
+ // disable checksum for GCP, not working with AWS sdk
+
builder.requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED);
+
builder.responseChecksumValidation(ResponseChecksumValidation.WHEN_REQUIRED);
+ }
+
+ builder.serviceConfiguration(
+ S3Configuration.builder()
+ .pathStyleAccessEnabled(isGCP) // enable for GCP
+ .chunkedEncodingEnabled(!isGCP) // Disable for GCP
+ .useArnRegionEnabled(!isGCP) // Disable for GCP
+ .build());
}
// Helper class to hold common Http config
diff --git
a/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/UtilsTest.java
b/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/UtilsTest.java
index e47fe2af52..580c111e3c 100644
---
a/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/UtilsTest.java
+++
b/oak-blob-cloud/src/test/java/org/apache/jackrabbit/oak/blob/cloud/s3/UtilsTest.java
@@ -111,32 +111,37 @@ public class UtilsTest {
@Test
public void testGetRegionFromStandardEndpoint() {
- Assert.assertEquals("eu-west-1",
Utils.getRegionFromEndpoint("https://s3.eu-west-1.amazonaws.com"));
+ Assert.assertEquals("eu-west-1",
Utils.getRegionFromEndpoint("https://s3.eu-west-1.amazonaws.com", null));
}
@Test
public void testGetRegionFromVirtualHostedEndpoint() {
- Assert.assertEquals("ap-south-1",
Utils.getRegionFromEndpoint("https://bucket.s3.ap-south-1.amazonaws.com"));
+ Assert.assertEquals("ap-south-1",
Utils.getRegionFromEndpoint("https://bucket.s3.ap-south-1.amazonaws.com",
null));
}
@Test
public void testGetRegionFromUsEast1Endpoint() {
- Assert.assertEquals("us-east-1",
Utils.getRegionFromEndpoint("https://s3.amazonaws.com"));
+ Assert.assertEquals("us-east-1",
Utils.getRegionFromEndpoint("https://s3.amazonaws.com", null));
}
@Test
public void testGetRegionFromVirtualHostedUsEast1() {
- Assert.assertEquals("us-east-1",
Utils.getRegionFromEndpoint("https://bucket.s3.amazonaws.com"));
+ Assert.assertEquals("us-east-1",
Utils.getRegionFromEndpoint("https://bucket.s3.amazonaws.com", null));
}
@Test
public void testGetRegionFromInvalidEndpoint() {
- Assert.assertNull(Utils.getRegionFromEndpoint("https://example.com"));
+ Assert.assertNull(Utils.getRegionFromEndpoint("https://example.com",
null));
}
@Test
public void testGetRegionFromMalformedEndpoint() {
- Assert.assertNull(Utils.getRegionFromEndpoint("not-a-valid-uri"));
+ Assert.assertNull(Utils.getRegionFromEndpoint("not-a-valid-uri",
"https"));
+ }
+
+ @Test
+ public void testGetRegionFromEndpointWithoutProtocol() {
+ Assert.assertEquals("us-east-1",
Utils.getRegionFromEndpoint("s3.us-east-1.amazonaws.com", "https"));
}
@Test