From Hussain Towaileb <[email protected]>:

Hussain Towaileb has submitted this change. (https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21326?usp=email)

Change subject: [NO ISSUE][EXT]: set inputStreamType and changeDetectionMode if missing
......................................................................

[NO ISSUE][EXT]: set inputStreamType and changeDetectionMode if missing

Ext-ref: MB-72281
Change-Id: I3fed787edf8414d4654caeceebc9f447ed93800e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21326
Tested-by: Jenkins <[email protected]>
Reviewed-by: Michael Blow <[email protected]>
Integration-Tests: Jenkins <[email protected]>
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
3 files changed, 28 insertions(+), 51 deletions(-)

Approvals:
  Michael Blow: Looks good to me, approved
  Jenkins: Verified; Verified




diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index b744a80..e94466c 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -116,6 +116,7 @@
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.WriterValidationUtil;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
 import org.apache.asterix.external.util.iceberg.IcebergConstants;
 import org.apache.asterix.external.util.iceberg.IcebergUtils;
 import 
org.apache.asterix.external.writer.printer.parquet.SchemaConverterVisitor;
@@ -1118,7 +1119,7 @@
                     ExternalDataUtils.normalize(properties);
                     ExternalDataUtils.validate(properties);
                     ExternalDataUtils.validateType(properties, (ARecordType) 
itemType);
-                    beforeExternalCollectionValidation(properties);
+                    beforeExternalCollectionValidation(externalDetails, 
properties);

                     Map<String, String> propertiesCopy = new 
HashMap<>(properties);
                     validateIfIcebergTable(metadataProvider, 
requestParameters, propertiesCopy, mdTxnCtx, sourceLoc,
@@ -1241,10 +1242,15 @@
      *
      * @param properties external collection properties
      */
-    protected void beforeExternalCollectionValidation(Map<String, String> 
properties) {
+    protected void beforeExternalCollectionValidation(ExternalDetailsDecl 
externalDetailsDecl,
+            Map<String, String> properties) throws CompilationException {
         if (IcebergUtils.isIcebergTable(properties)) {
             IcebergUtils.setDefaults(properties);
         }
+        if (ExternalDataUtils.isParquetFormat(properties) || 
ExternalDataUtils.isDeltaTable(properties)) {
+            S3Utils.validateAndNormalizeStreamInputType(properties);
+            S3Utils.validateAndNormalizeChangeDetectionMode(properties);
+        }
     }

     protected void validateIfIcebergTable(MetadataProvider metadataProvider, 
IRequestParameters requestParameters,
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
index 8390576..c458dc3 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
@@ -49,17 +49,18 @@
     public static final String HADOOP_SERVICE_END_POINT = "fs.s3a.endpoint";
     public static final String HADOOP_REGION = "fs.s3a.endpoint.region";

+    // value to indicate we should use whatever the SDK defaults to
+    public static final String SDK_DEFAULT = "sdk_default";
+
     // input stream type
     public static final String INPUT_STREAM_TYPE_FIELD_NAME = 
"inputStreamType";
     public static final String HADOOP_INPUT_STREAM_TYPE = 
"fs.s3a.input.stream.type";
-    public static final String HADOOP_INPUT_STREAM_TYPE_VAL_AUTO = "auto";
     public static final String HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC = 
"classic";
     public static final String HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS = 
"analytics";

     // change detection mode
     public static final String CHANGE_DETECTION_MODE_FIELD_NAME = 
"changeDetectionMode";
     public static final String HADOOP_CHANGE_DETECTION_MODE = 
"fs.s3a.change.detection.mode";
-    public static final String HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO = "auto";
     public static final String HADOOP_CHANGE_DETECTION_MODE_VAL_SERVER = 
"server";
     public static final String HADOOP_CHANGE_DETECTION_MODE_VAL_CLIENT = 
"client";
     public static final String HADOOP_CHANGE_DETECTION_MODE_VAL_NONE = "none";
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index 0b00b08..7eb227c 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -56,15 +56,12 @@
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_REGION;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_DURATION;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_NAME;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE_VAL_CLIENT;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE_VAL_NONE;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE_VAL_SERVER;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIAL_PROVIDER_KEY;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS;
-import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE_VAL_AUTO;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INSTANCE_PROFILE;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_PATH_STYLE_ACCESS;
@@ -77,6 +74,7 @@
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_TEMPORARY;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.INPUT_STREAM_TYPE_FIELD_NAME;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.PATH_STYLE_ADDRESSING_FIELD_NAME;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.SDK_DEFAULT;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 import static 
org.apache.hyracks.util.annotations.AiProvenance.Agent.CLAUDE_SONNET_4_6;
 import static 
org.apache.hyracks.util.annotations.AiProvenance.Tool.GITHUB_COPILOT;
@@ -307,8 +305,8 @@
             jobConf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint);
         }

-        setInputStreamType(configuration, jobConf, serviceEndpoint);
-        setChangeDetectionMode(configuration, jobConf, serviceEndpoint);
+        setInputStreamType(configuration, jobConf);
+        setChangeDetectionMode(configuration, jobConf);

         boolean pathStyleAddressing =
                 
validateAndGetPathStyleAddressing(configuration.get(PATH_STYLE_ADDRESSING_FIELD_NAME),
 serviceEndpoint);
@@ -326,44 +324,16 @@
         }
     }

-    private static void setInputStreamType(Map<String, String> configuration, 
JobConf jobConf, String serviceEndpoint) {
+    private static void setInputStreamType(Map<String, String> configuration, 
JobConf jobConf) {
         String configuredInputStreamType = 
configuration.get(INPUT_STREAM_TYPE_FIELD_NAME);
-        if (configuredInputStreamType == null) {
-            configuredInputStreamType = HADOOP_INPUT_STREAM_TYPE_VAL_AUTO;
-        }
-
-        if 
(HADOOP_INPUT_STREAM_TYPE_VAL_AUTO.equals(configuredInputStreamType)) {
-            // Auto mode: decide based on endpoint
-            if (serviceEndpoint != null) {
-                // The analytics-accelerator stream factory (default in Hadoop 
3.4+) performs a HeadObject call during
-                // stream initialization to fetch the ETag. Non-AWS 
S3-compatible endpoints may not return an ETag on
-                // HeadObject, which causes a NullPointerException. Fall back 
to the classic stream
-                // implementation when a custom service endpoint is in use.
-                jobConf.set(HADOOP_INPUT_STREAM_TYPE, 
HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC);
-            } else {
-                jobConf.set(HADOOP_INPUT_STREAM_TYPE, 
HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS);
-            }
-        } else {
-            // Explicit override: use the user-specified stream type
-            jobConf.set(HADOOP_INPUT_STREAM_TYPE, configuredInputStreamType);
+        if (!Objects.equals(configuredInputStreamType, SDK_DEFAULT)) {
+            jobConf.set(S3Constants.HADOOP_INPUT_STREAM_TYPE, 
configuredInputStreamType);
         }
     }

-    private static void setChangeDetectionMode(Map<String, String> 
configuration, JobConf jobConf,
-            String serviceEndpoint) {
+    private static void setChangeDetectionMode(Map<String, String> 
configuration, JobConf jobConf) {
         String configuredChangeDetectionMode = 
configuration.get(CHANGE_DETECTION_MODE_FIELD_NAME);
-        if (configuredChangeDetectionMode == null) {
-            configuredChangeDetectionMode = 
HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO;
-        }
-
-        String inputStreamType = jobConf.get(HADOOP_INPUT_STREAM_TYPE);
-        if 
(HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO.equals(configuredChangeDetectionMode)) {
-            // If using a custom endpoint with classic streams, default to no 
change detection as
-            // S3-compatible implementations may not support ETag/version 
metadata consistently.
-            if (serviceEndpoint != null && 
HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC.equals(inputStreamType)) {
-                jobConf.set(S3Constants.HADOOP_CHANGE_DETECTION_MODE, 
HADOOP_CHANGE_DETECTION_MODE_VAL_NONE);
-            }
-        } else {
+        if (!Objects.equals(configuredChangeDetectionMode, SDK_DEFAULT)) {
             jobConf.set(S3Constants.HADOOP_CHANGE_DETECTION_MODE, 
configuredChangeDetectionMode);
         }
     }
@@ -851,16 +821,15 @@
             throws CompilationException {
         String streamInputType = 
configuration.get(INPUT_STREAM_TYPE_FIELD_NAME);
         if (streamInputType == null || streamInputType.isBlank()) {
-            configuration.put(INPUT_STREAM_TYPE_FIELD_NAME, 
HADOOP_INPUT_STREAM_TYPE_VAL_AUTO);
-            return;
+            streamInputType = SDK_DEFAULT;
+            configuration.put(INPUT_STREAM_TYPE_FIELD_NAME, streamInputType);
         }

-        if 
(!HADOOP_INPUT_STREAM_TYPE_VAL_AUTO.equalsIgnoreCase(streamInputType)
+        if (!SDK_DEFAULT.equalsIgnoreCase(streamInputType)
                 && 
!HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS.equalsIgnoreCase(streamInputType)
                 && 
!HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC.equalsIgnoreCase(streamInputType)) {
-            throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, 
INPUT_STREAM_TYPE_FIELD_NAME,
-                    HADOOP_INPUT_STREAM_TYPE_VAL_AUTO + ", " + 
HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS + ", "
-                            + HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC);
+            throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, 
INPUT_STREAM_TYPE_FIELD_NAME, SDK_DEFAULT
+                    + ", " + HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS + ", " + 
HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC);
         }
         configuration.put(INPUT_STREAM_TYPE_FIELD_NAME, 
streamInputType.toLowerCase());
     }
@@ -869,16 +838,17 @@
             throws CompilationException {
         String changeDetectionMode = 
configuration.get(CHANGE_DETECTION_MODE_FIELD_NAME);
         if (changeDetectionMode == null || changeDetectionMode.isBlank()) {
-            configuration.put(CHANGE_DETECTION_MODE_FIELD_NAME, 
HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO);
+            changeDetectionMode = SDK_DEFAULT;
+            configuration.put(CHANGE_DETECTION_MODE_FIELD_NAME, 
changeDetectionMode);
             return;
         }

-        if 
(!HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO.equalsIgnoreCase(changeDetectionMode)
+        if (!SDK_DEFAULT.equalsIgnoreCase(changeDetectionMode)
                 && 
!HADOOP_CHANGE_DETECTION_MODE_VAL_NONE.equalsIgnoreCase(changeDetectionMode)
                 && 
!HADOOP_CHANGE_DETECTION_MODE_VAL_CLIENT.equalsIgnoreCase(changeDetectionMode)
                 && 
!HADOOP_CHANGE_DETECTION_MODE_VAL_SERVER.equalsIgnoreCase(changeDetectionMode)) 
{
             throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, 
CHANGE_DETECTION_MODE_FIELD_NAME,
-                    HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO + ", " + 
HADOOP_CHANGE_DETECTION_MODE_VAL_NONE + ", "
+                    SDK_DEFAULT + ", " + HADOOP_CHANGE_DETECTION_MODE_VAL_NONE 
+ ", "
                             + HADOOP_CHANGE_DETECTION_MODE_VAL_CLIENT + ", " + 
HADOOP_CHANGE_DETECTION_MODE_VAL_SERVER);
         }
         configuration.put(CHANGE_DETECTION_MODE_FIELD_NAME, 
changeDetectionMode.toLowerCase());

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21326?usp=email
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: merged
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: I3fed787edf8414d4654caeceebc9f447ed93800e
Gerrit-Change-Number: 21326
Gerrit-PatchSet: 4
Gerrit-Owner: Hussain Towaileb <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Hussain Towaileb <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>

Reply via email to