From Hussain Towaileb <[email protected]>:

Hussain Towaileb has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21192?usp=email )


Change subject: [NO ISSUE][EXT]: add knobs to s3 parquet
......................................................................

[NO ISSUE][EXT]: add knobs to s3 parquet

Ext-ref: MB-71701
Change-Id: I404dd7dc013944409482b8e4d77938e843ed2ba8
---
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
A 
asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/util/aws/s3/S3UtilsTest.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
5 files changed, 255 insertions(+), 7 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/92/21192/1

diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
index 9a183bd..1dc8a2c 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
@@ -24,6 +24,7 @@
 import static 
org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER;
 import static 
org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER_BYTE_UNIT;
 import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
+import static 
org.apache.hyracks.control.common.config.OptionTypes.getAcceptedStringValuesOptionType;
 import static 
org.apache.hyracks.control.common.config.OptionTypes.getRangedIntegerType;

 import org.apache.hyracks.api.config.IOption;
@@ -81,7 +82,18 @@
                 getRangedIntegerType(60, 3600),
                 900,
                 "GCS impersonating service account duration in seconds. "
-                        + "Range from 60 seconds (1 min) to 3600 seconds (1 
hour)");
+                        + "Range from 60 seconds (1 min) to 3600 seconds (1 
hour)"),
+        AWS_S3_STREAM_TYPE(
+                getAcceptedStringValuesOptionType("auto", "analytics", 
"classic"),
+                "auto",
+                "The stream type used to read from S3. Valid values are: auto, 
analytics, classic. "
+                        + "auto (default): uses analytics for AWS S3, classic 
for custom endpoints."),
+        AWS_S3_CHANGE_DETECTION_MODE(
+                getAcceptedStringValuesOptionType("auto", "server", "none", 
"client"),
+                "auto",
+                "The change detection mode used for S3. Valid values are: 
auto, server, client, none. "
+                        + "auto (default): uses none for classic streams with 
custom endpoints; otherwise "
+                        + "Hadoop's default is used.");

         private final IOptionType type;
         private final Object defaultValue;
@@ -114,6 +126,8 @@
                 case AWS_ASSUME_ROLE_PREFETCH_TIME:
                 case AWS_ASSUME_ROLE_ASYNC_REFRESH_ENABLED:
                 case GCP_IMPERSONATE_SERVICE_ACCOUNT_DURATION:
+                case AWS_S3_STREAM_TYPE:
+                case AWS_S3_CHANGE_DETECTION_MODE:
                     return Section.COMMON;
                 case CC_JAVA_OPTS:
                 case NC_JAVA_OPTS:
@@ -214,4 +228,12 @@
     public int getGcpImpersonateServiceAccountDuration() {
         return 
accessor.getInt(Option.GCP_IMPERSONATE_SERVICE_ACCOUNT_DURATION);
     }
+
+    /**
+     * Returns the value of {@link Option#AWS_S3_STREAM_TYPE}, read as a string through the configuration accessor.
+     * The option accepts "auto", "analytics" or "classic" and defaults to "auto".
+     */
+    public String getAwsS3StreamType() {
+        return accessor.getString(Option.AWS_S3_STREAM_TYPE);
+    }
+
+    /**
+     * Returns the value of {@link Option#AWS_S3_CHANGE_DETECTION_MODE}, read as a string through the
+     * configuration accessor. The option accepts "auto", "server", "none" or "client" and defaults to "auto".
+     */
+    public String getAwsS3ChangeDetectionMode() {
+        return accessor.getString(Option.AWS_S3_CHANGE_DETECTION_MODE);
+    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
index 2e5d213..fb4627e 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
@@ -52,6 +52,8 @@
     public static final String HADOOP_INPUT_STREAM_TYPE = 
"fs.s3a.input.stream.type";
     public static final String HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC = 
"classic";
     public static final String HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS = 
"analytics";
+    public static final String HADOOP_CHANGE_DETECTION_MODE = 
"fs.s3a.change.detection.mode";
+    public static final String HADOOP_CHANGE_DETECTION_MODE_VAL_NONE = "none";

     /*
      * Internal configurations
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index 07424b1..c0ebb21 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -54,6 +54,7 @@
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_REGION;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_DURATION;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_NAME;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE_VAL_NONE;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIAL_PROVIDER_KEY;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE;
@@ -134,6 +135,7 @@

 public class S3Utils {
     private static final Logger LOGGER = LogManager.getLogger();
+    private static final String AUTO_SETTING_VALUE = "auto";

     private static final class StaticTrustManagersProvider implements 
TlsTrustManagersProvider {
         private final TrustManager[] trustManagers;
@@ -285,13 +287,37 @@
         if (serviceEndpoint != null) {
             // Validation of the URL should be done at hadoop-aws level
             jobConf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint);
+        }

-            // The analytics-accelerator stream factory (default in Hadoop 
3.4+) performs a HeadObject call during
-            // stream initialization to fetch the ETag. Non-AWS S3-compatible 
endpoints (e.g. mock servers) may not
-            // return an ETag on HeadObject, which causes a 
NullPointerException. Fall back to the classic stream
-            // implementation when a custom service endpoint is in use.
-            // TODO: make configurable
-            jobConf.set(HADOOP_INPUT_STREAM_TYPE, 
HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC);
+        String configuredInputStreamType = 
normalizeConfigValue(appCtx.getExternalProperties().getAwsS3StreamType());
+        String resolvedInputStreamType;
+        if (isAuto(configuredInputStreamType)) {
+            // Auto mode: decide based on endpoint
+            if (serviceEndpoint != null) {
+                // The analytics-accelerator stream factory (default in Hadoop 
3.4+) performs a HeadObject call during
+                // stream initialization to fetch the ETag. Non-AWS 
S3-compatible endpoints may not return an ETag on
+                // HeadObject, which causes a NullPointerException. Fall back 
to the classic stream
+                // implementation when a custom service endpoint is in use.
+                resolvedInputStreamType = HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC;
+            } else {
+                resolvedInputStreamType = 
S3Constants.HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS;
+            }
+        } else {
+            // Explicit override: use the user-specified stream type
+            resolvedInputStreamType = configuredInputStreamType;
+        }
+        jobConf.set(HADOOP_INPUT_STREAM_TYPE, resolvedInputStreamType);
+
+        String configuredChangeDetectionMode =
+                
normalizeConfigValue(appCtx.getExternalProperties().getAwsS3ChangeDetectionMode());
+        if (isAuto(configuredChangeDetectionMode)) {
+            // If using a custom endpoint with classic streams, default to no 
change detection as
+            // S3-compatible implementations may not support ETag/version 
metadata consistently.
+            if (serviceEndpoint != null && 
HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC.equals(resolvedInputStreamType)) {
+                jobConf.set(S3Constants.HADOOP_CHANGE_DETECTION_MODE, 
HADOOP_CHANGE_DETECTION_MODE_VAL_NONE);
+            }
+        } else {
+            jobConf.set(S3Constants.HADOOP_CHANGE_DETECTION_MODE, 
configuredChangeDetectionMode);
         }

         boolean pathStyleAddressing =
@@ -743,4 +769,12 @@
                     "true, false");
         }
     }
+
+    /**
+     * Tells whether a configuration value should be handled as the "auto" setting.
+     *
+     * @param value the configuration value to inspect, may be {@code null}
+     * @return {@code true} when {@code value} is {@code null}, blank, or equal to "auto" ignoring case
+     */
+    private static boolean isAuto(String value) {
+        if (value == null || value.isBlank()) {
+            // An unset or blank value is treated the same as an explicit "auto"
+            return true;
+        }
+        return AUTO_SETTING_VALUE.equalsIgnoreCase(value);
+    }
+
+    /**
+     * Normalizes a configuration value by trimming surrounding whitespace and lower-casing it.
+     * Lower-casing uses {@link java.util.Locale#ROOT} so the result does not depend on the JVM default locale
+     * (under a Turkish locale, for example, "CLASSIC" would otherwise become "classıc" and stop matching "classic").
+     *
+     * @param value the raw configuration value, may be {@code null}
+     * @return the trimmed, lower-cased value, or {@code null} if {@code value} is {@code null}
+     */
+    private static String normalizeConfigValue(String value) {
+        return value == null ? null : value.trim().toLowerCase(java.util.Locale.ROOT);
+    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/util/aws/s3/S3UtilsTest.java
 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/util/aws/s3/S3UtilsTest.java
new file mode 100644
index 0000000..ea98c26
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/util/aws/s3/S3UtilsTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.aws.s3;
+
+import static 
org.apache.asterix.external.util.aws.AwsConstants.CROSS_REGION_FIELD_NAME;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.REGION_FIELD_NAME;
+import static 
org.apache.asterix.external.util.aws.AwsConstants.SERVICE_END_POINT_FIELD_NAME;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE_VAL_NONE;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS;
+import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.config.ExternalProperties;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class S3UtilsTest {
+
+    @Test
+    public void 
autoModeWithCustomEndpointDefaultsToClassicAndNoneChangeDetection() throws 
Exception {
+        IApplicationContext appCtx = mockAppContext("auto", "auto");
+        JobConf jobConf = new JobConf();
+
+        S3Utils.configureAwsS3HdfsJobConf(appCtx, jobConf, 
getConfiguration("http://localhost:9000"));
+
+        Assert.assertEquals(HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC, 
jobConf.get(HADOOP_INPUT_STREAM_TYPE));
+        Assert.assertEquals(HADOOP_CHANGE_DETECTION_MODE_VAL_NONE, 
jobConf.get(HADOOP_CHANGE_DETECTION_MODE));
+    }
+
+    @Test
+    public void 
autoModeWithoutCustomEndpointUsesAnalyticsAndHadoopDefaultChangeDetection() 
throws Exception {
+        IApplicationContext appCtx = mockAppContext("auto", "auto");
+        JobConf jobConf = new JobConf();
+
+        S3Utils.configureAwsS3HdfsJobConf(appCtx, jobConf, 
getConfiguration(null));
+
+        Assert.assertEquals(HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS, 
jobConf.get(HADOOP_INPUT_STREAM_TYPE));
+        Assert.assertEquals("server", 
jobConf.get(HADOOP_CHANGE_DETECTION_MODE));
+    }
+
+    @Test
+    public void explicitChangeDetectionModeOverridesAutoBehavior() throws 
Exception {
+        IApplicationContext appCtx = mockAppContext("auto", "server");
+        JobConf jobConf = new JobConf();
+
+        S3Utils.configureAwsS3HdfsJobConf(appCtx, jobConf, 
getConfiguration("http://localhost:9000"));
+
+        Assert.assertEquals(HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC, 
jobConf.get(HADOOP_INPUT_STREAM_TYPE));
+        Assert.assertEquals("server", 
jobConf.get(HADOOP_CHANGE_DETECTION_MODE));
+    }
+
+    @Test
+    public void autoChangeDetectionWithClassicOnAwsKeepsHadoopDefault() throws 
Exception {
+        IApplicationContext appCtx = mockAppContext("classic", "auto");
+        JobConf jobConf = new JobConf();
+
+        S3Utils.configureAwsS3HdfsJobConf(appCtx, jobConf, 
getConfiguration(null));
+
+        Assert.assertEquals(HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC, 
jobConf.get(HADOOP_INPUT_STREAM_TYPE));
+        Assert.assertEquals("server", 
jobConf.get(HADOOP_CHANGE_DETECTION_MODE));
+    }
+
+    @Test
+    public void autoChangeDetectionWithClassicOnCustomEndpointDefaultsToNone() 
throws Exception {
+        IApplicationContext appCtx = mockAppContext("classic", "auto");
+        JobConf jobConf = new JobConf();
+
+        S3Utils.configureAwsS3HdfsJobConf(appCtx, jobConf, 
getConfiguration("http://localhost:9000"));
+
+        Assert.assertEquals(HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC, 
jobConf.get(HADOOP_INPUT_STREAM_TYPE));
+        Assert.assertEquals(HADOOP_CHANGE_DETECTION_MODE_VAL_NONE, 
jobConf.get(HADOOP_CHANGE_DETECTION_MODE));
+    }
+
+    @Test
+    public void 
autoChangeDetectionWithAnalyticsOnCustomEndpointKeepsHadoopDefault() throws 
Exception {
+        IApplicationContext appCtx = mockAppContext("analytics", "auto");
+        JobConf jobConf = new JobConf();
+
+        S3Utils.configureAwsS3HdfsJobConf(appCtx, jobConf, 
getConfiguration("http://localhost:9000"));
+
+        Assert.assertEquals(HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS, 
jobConf.get(HADOOP_INPUT_STREAM_TYPE));
+        Assert.assertEquals("server", 
jobConf.get(HADOOP_CHANGE_DETECTION_MODE));
+    }
+
+    @Test
+    public void explicitNoneChangeDetectionIsAlwaysRespected() throws 
Exception {
+        IApplicationContext appCtx = mockAppContext("analytics", "none");
+        JobConf jobConf = new JobConf();
+
+        S3Utils.configureAwsS3HdfsJobConf(appCtx, jobConf, 
getConfiguration(null));
+
+        Assert.assertEquals(HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS, 
jobConf.get(HADOOP_INPUT_STREAM_TYPE));
+        Assert.assertEquals(HADOOP_CHANGE_DETECTION_MODE_VAL_NONE, 
jobConf.get(HADOOP_CHANGE_DETECTION_MODE));
+    }
+
+    @Test
+    public void 
normalizedClassicAndAutoValuesStillDefaultToNoneOnCustomEndpoint() throws 
Exception {
+        IApplicationContext appCtx = mockAppContext("  CLASSIC  ", "  AUTO  ");
+        JobConf jobConf = new JobConf();
+
+        S3Utils.configureAwsS3HdfsJobConf(appCtx, jobConf, 
getConfiguration("http://localhost:9000"));
+
+        Assert.assertEquals(HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC, 
jobConf.get(HADOOP_INPUT_STREAM_TYPE));
+        Assert.assertEquals(HADOOP_CHANGE_DETECTION_MODE_VAL_NONE, 
jobConf.get(HADOOP_CHANGE_DETECTION_MODE));
+    }
+
+    private static IApplicationContext mockAppContext(String streamType, 
String changeDetectionMode) {
+        IApplicationContext appCtx = Mockito.mock(IApplicationContext.class);
+        ExternalProperties externalProperties = 
Mockito.mock(ExternalProperties.class);
+        
Mockito.when(appCtx.getExternalProperties()).thenReturn(externalProperties);
+        
Mockito.when(externalProperties.getAwsS3StreamType()).thenReturn(streamType);
+        
Mockito.when(externalProperties.getAwsS3ChangeDetectionMode()).thenReturn(changeDetectionMode);
+        return appCtx;
+    }
+
+    private static Map<String, String> getConfiguration(String 
serviceEndpoint) {
+        Map<String, String> configuration = new HashMap<>();
+        configuration.put(REGION_FIELD_NAME, "us-east-1");
+        configuration.put(CROSS_REGION_FIELD_NAME, "false");
+        if (serviceEndpoint != null) {
+            configuration.put(SERVICE_END_POINT_FIELD_NAME, serviceEndpoint);
+        }
+        return configuration;
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
index df2d291..f626851 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
@@ -20,9 +20,12 @@

 import java.net.MalformedURLException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Stream;

 import org.apache.hyracks.api.config.IOptionType;
@@ -487,4 +490,43 @@
     public static final IOptionType<Long> getRangedLongByteUnit(long min, long 
max) {
         return new LongByteUnit(min, max);
     }
+
+    /**
+     * Creates a string option type that only accepts the given values.
+     * Parsed input is trimmed and lower-cased before it is compared against {@code acceptedValues}.
+     *
+     * @param acceptedValues the values the returned option type accepts
+     * @return an option type that rejects any value outside {@code acceptedValues}
+     */
+    public static IOptionType<String> 
getAcceptedStringValuesOptionType(String... acceptedValues) {
+        return new AcceptedStringValuesOptionType(acceptedValues);
+    }
+
+    /**
+     * Option type for string options restricted to a fixed set of accepted values.
+     * Input is trimmed and lower-cased before it is looked up in the accepted set, and the normalized
+     * form is what parsing returns. The accepted values are stored exactly as given, so they need to be
+     * supplied in lower case to ever match.
+     */
+    private static class AcceptedStringValuesOptionType implements IOptionType<String> {
+        private final Set<String> accepted;
+        private final String acceptedAsString;
+
+        private AcceptedStringValuesOptionType(String... acceptedValues) {
+            accepted = new HashSet<>(Arrays.asList(acceptedValues));
+            // Pre-rendered once; only used to build the error message for rejected values
+            acceptedAsString = String.join(", ", acceptedValues);
+        }
+
+        /**
+         * Parses a textual option value.
+         *
+         * @param value the raw value
+         * @return the trimmed, lower-cased value
+         * @throws IllegalArgumentException if {@code value} is {@code null} or not one of the accepted values
+         */
+        @Override
+        public String parse(String value) {
+            if (value == null) {
+                // Report a missing value the same way as any other rejected value instead of a bare NPE
+                throw new IllegalArgumentException("Invalid value 'null'. Accepted values: " + acceptedAsString);
+            }
+            // Locale.ROOT keeps case folding independent of the JVM default locale (e.g. Turkish dotless i)
+            String normalized = value.trim().toLowerCase(java.util.Locale.ROOT);
+            if (!accepted.contains(normalized)) {
+                throw new IllegalArgumentException(
+                        "Invalid value '" + value + "'. Accepted values: " + acceptedAsString);
+            }
+            return normalized;
+        }
+
+        /**
+         * Parses a JSON node; a JSON null yields {@code null}, anything else is parsed from its text form.
+         */
+        @Override
+        public String parse(JsonNode node) {
+            return node.isNull() ? null : parse(node.asText());
+        }
+
+        @Override
+        public Class<String> targetType() {
+            return String.class;
+        }
+
+        @Override
+        public void serializeJSONFieldUnsafe(String fieldName, Object value, ObjectNode node) {
+            node.put(fieldName, (String) value);
+        }
+    }
 }

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21192?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: I404dd7dc013944409482b8e4d77938e843ed2ba8
Gerrit-Change-Number: 21192
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>

Reply via email to