From Hussain Towaileb <[email protected]>: Hussain Towaileb has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21326?usp=email )
Change subject: [NO ISSUE][EXT]: set inputStreamType and changeDetectionMode if missing ...................................................................... [NO ISSUE][EXT]: set inputStreamType and changeDetectionMode if missing Ext-ref: MB-72281 Change-Id: I3fed787edf8414d4654caeceebc9f447ed93800e Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21326 Tested-by: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Integration-Tests: Jenkins <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java 3 files changed, 28 insertions(+), 51 deletions(-) Approvals: Michael Blow: Looks good to me, approved Jenkins: Verified; Verified diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index b744a80..e94466c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -116,6 +116,7 @@ import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.asterix.external.util.WriterValidationUtil; +import org.apache.asterix.external.util.aws.s3.S3Utils; import org.apache.asterix.external.util.iceberg.IcebergConstants; import org.apache.asterix.external.util.iceberg.IcebergUtils; import org.apache.asterix.external.writer.printer.parquet.SchemaConverterVisitor; @@ -1118,7 +1119,7 @@ ExternalDataUtils.normalize(properties); ExternalDataUtils.validate(properties); ExternalDataUtils.validateType(properties, (ARecordType) itemType); 
- beforeExternalCollectionValidation(properties); + beforeExternalCollectionValidation(externalDetails, properties); Map<String, String> propertiesCopy = new HashMap<>(properties); validateIfIcebergTable(metadataProvider, requestParameters, propertiesCopy, mdTxnCtx, sourceLoc, @@ -1241,10 +1242,15 @@ * * @param properties external collection properties */ - protected void beforeExternalCollectionValidation(Map<String, String> properties) { + protected void beforeExternalCollectionValidation(ExternalDetailsDecl externalDetailsDecl, + Map<String, String> properties) throws CompilationException { if (IcebergUtils.isIcebergTable(properties)) { IcebergUtils.setDefaults(properties); } + if (ExternalDataUtils.isParquetFormat(properties) || ExternalDataUtils.isDeltaTable(properties)) { + S3Utils.validateAndNormalizeStreamInputType(properties); + S3Utils.validateAndNormalizeChangeDetectionMode(properties); + } } protected void validateIfIcebergTable(MetadataProvider metadataProvider, IRequestParameters requestParameters, diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java index 8390576..c458dc3 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java @@ -49,17 +49,18 @@ public static final String HADOOP_SERVICE_END_POINT = "fs.s3a.endpoint"; public static final String HADOOP_REGION = "fs.s3a.endpoint.region"; + // value to indicate we should use whatever the SDK defaults to + public static final String SDK_DEFAULT = "sdk_default"; + // input stream type public static final String INPUT_STREAM_TYPE_FIELD_NAME = "inputStreamType"; public static final String HADOOP_INPUT_STREAM_TYPE = "fs.s3a.input.stream.type"; - public static final String 
HADOOP_INPUT_STREAM_TYPE_VAL_AUTO = "auto"; public static final String HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC = "classic"; public static final String HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS = "analytics"; // change detection mode public static final String CHANGE_DETECTION_MODE_FIELD_NAME = "changeDetectionMode"; public static final String HADOOP_CHANGE_DETECTION_MODE = "fs.s3a.change.detection.mode"; - public static final String HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO = "auto"; public static final String HADOOP_CHANGE_DETECTION_MODE_VAL_SERVER = "server"; public static final String HADOOP_CHANGE_DETECTION_MODE_VAL_CLIENT = "client"; public static final String HADOOP_CHANGE_DETECTION_MODE_VAL_NONE = "none"; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java index 0b00b08..7eb227c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java @@ -56,15 +56,12 @@ import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_REGION; import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_DURATION; import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_NAME; -import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO; import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE_VAL_CLIENT; import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE_VAL_NONE; import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE_VAL_SERVER; import static 
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY; import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIAL_PROVIDER_KEY; -import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE; import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS; -import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE_VAL_AUTO; import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC; import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INSTANCE_PROFILE; import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_PATH_STYLE_ACCESS; @@ -77,6 +74,7 @@ import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_TEMPORARY; import static org.apache.asterix.external.util.aws.s3.S3Constants.INPUT_STREAM_TYPE_FIELD_NAME; import static org.apache.asterix.external.util.aws.s3.S3Constants.PATH_STYLE_ADDRESSING_FIELD_NAME; +import static org.apache.asterix.external.util.aws.s3.S3Constants.SDK_DEFAULT; import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; import static org.apache.hyracks.util.annotations.AiProvenance.Agent.CLAUDE_SONNET_4_6; import static org.apache.hyracks.util.annotations.AiProvenance.Tool.GITHUB_COPILOT; @@ -307,8 +305,8 @@ jobConf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint); } - setInputStreamType(configuration, jobConf, serviceEndpoint); - setChangeDetectionMode(configuration, jobConf, serviceEndpoint); + setInputStreamType(configuration, jobConf); + setChangeDetectionMode(configuration, jobConf); boolean pathStyleAddressing = validateAndGetPathStyleAddressing(configuration.get(PATH_STYLE_ADDRESSING_FIELD_NAME), serviceEndpoint); @@ -326,44 +324,16 @@ } } - private static void setInputStreamType(Map<String, String> configuration, JobConf jobConf, String serviceEndpoint) { + private 
static void setInputStreamType(Map<String, String> configuration, JobConf jobConf) { String configuredInputStreamType = configuration.get(INPUT_STREAM_TYPE_FIELD_NAME); - if (configuredInputStreamType == null) { - configuredInputStreamType = HADOOP_INPUT_STREAM_TYPE_VAL_AUTO; - } - - if (HADOOP_INPUT_STREAM_TYPE_VAL_AUTO.equals(configuredInputStreamType)) { - // Auto mode: decide based on endpoint - if (serviceEndpoint != null) { - // The analytics-accelerator stream factory (default in Hadoop 3.4+) performs a HeadObject call during - // stream initialization to fetch the ETag. Non-AWS S3-compatible endpoints may not return an ETag on - // HeadObject, which causes a NullPointerException. Fall back to the classic stream - // implementation when a custom service endpoint is in use. - jobConf.set(HADOOP_INPUT_STREAM_TYPE, HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC); - } else { - jobConf.set(HADOOP_INPUT_STREAM_TYPE, HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS); - } - } else { - // Explicit override: use the user-specified stream type - jobConf.set(HADOOP_INPUT_STREAM_TYPE, configuredInputStreamType); + if (!Objects.equals(configuredInputStreamType, SDK_DEFAULT)) { + jobConf.set(S3Constants.HADOOP_INPUT_STREAM_TYPE, configuredInputStreamType); } } - private static void setChangeDetectionMode(Map<String, String> configuration, JobConf jobConf, - String serviceEndpoint) { + private static void setChangeDetectionMode(Map<String, String> configuration, JobConf jobConf) { String configuredChangeDetectionMode = configuration.get(CHANGE_DETECTION_MODE_FIELD_NAME); - if (configuredChangeDetectionMode == null) { - configuredChangeDetectionMode = HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO; - } - - String inputStreamType = jobConf.get(HADOOP_INPUT_STREAM_TYPE); - if (HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO.equals(configuredChangeDetectionMode)) { - // If using a custom endpoint with classic streams, default to no change detection as - // S3-compatible implementations may not support ETag/version 
metadata consistently. - if (serviceEndpoint != null && HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC.equals(inputStreamType)) { - jobConf.set(S3Constants.HADOOP_CHANGE_DETECTION_MODE, HADOOP_CHANGE_DETECTION_MODE_VAL_NONE); - } - } else { + if (!Objects.equals(configuredChangeDetectionMode, SDK_DEFAULT)) { jobConf.set(S3Constants.HADOOP_CHANGE_DETECTION_MODE, configuredChangeDetectionMode); } } @@ -851,16 +821,15 @@ throws CompilationException { String streamInputType = configuration.get(INPUT_STREAM_TYPE_FIELD_NAME); if (streamInputType == null || streamInputType.isBlank()) { - configuration.put(INPUT_STREAM_TYPE_FIELD_NAME, HADOOP_INPUT_STREAM_TYPE_VAL_AUTO); - return; + streamInputType = SDK_DEFAULT; + configuration.put(INPUT_STREAM_TYPE_FIELD_NAME, streamInputType); } - if (!HADOOP_INPUT_STREAM_TYPE_VAL_AUTO.equalsIgnoreCase(streamInputType) + if (!SDK_DEFAULT.equalsIgnoreCase(streamInputType) && !HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS.equalsIgnoreCase(streamInputType) && !HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC.equalsIgnoreCase(streamInputType)) { - throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, INPUT_STREAM_TYPE_FIELD_NAME, - HADOOP_INPUT_STREAM_TYPE_VAL_AUTO + ", " + HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS + ", " - + HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC); + throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, INPUT_STREAM_TYPE_FIELD_NAME, SDK_DEFAULT + + ", " + HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS + ", " + HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC); } configuration.put(INPUT_STREAM_TYPE_FIELD_NAME, streamInputType.toLowerCase()); } @@ -869,16 +838,17 @@ throws CompilationException { String changeDetectionMode = configuration.get(CHANGE_DETECTION_MODE_FIELD_NAME); if (changeDetectionMode == null || changeDetectionMode.isBlank()) { - configuration.put(CHANGE_DETECTION_MODE_FIELD_NAME, HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO); + changeDetectionMode = SDK_DEFAULT; + configuration.put(CHANGE_DETECTION_MODE_FIELD_NAME, changeDetectionMode); return; 
} - if (!HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO.equalsIgnoreCase(changeDetectionMode) + if (!SDK_DEFAULT.equalsIgnoreCase(changeDetectionMode) && !HADOOP_CHANGE_DETECTION_MODE_VAL_NONE.equalsIgnoreCase(changeDetectionMode) && !HADOOP_CHANGE_DETECTION_MODE_VAL_CLIENT.equalsIgnoreCase(changeDetectionMode) && !HADOOP_CHANGE_DETECTION_MODE_VAL_SERVER.equalsIgnoreCase(changeDetectionMode)) { throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, CHANGE_DETECTION_MODE_FIELD_NAME, - HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO + ", " + HADOOP_CHANGE_DETECTION_MODE_VAL_NONE + ", " + SDK_DEFAULT + ", " + HADOOP_CHANGE_DETECTION_MODE_VAL_NONE + ", " + HADOOP_CHANGE_DETECTION_MODE_VAL_CLIENT + ", " + HADOOP_CHANGE_DETECTION_MODE_VAL_SERVER); } configuration.put(CHANGE_DETECTION_MODE_FIELD_NAME, changeDetectionMode.toLowerCase()); -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21326?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: lumina Gerrit-Change-Id: I3fed787edf8414d4654caeceebc9f447ed93800e Gerrit-Change-Number: 21326 Gerrit-PatchSet: 4 Gerrit-Owner: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]>
