From Hussain Towaileb <[email protected]>:
Hussain Towaileb has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21326?usp=email )
Change subject: [NO ISSUE][EXT]: set inputStreamType and changeDetectionMode if missing
......................................................................
[NO ISSUE][EXT]: set inputStreamType and changeDetectionMode if missing
Ext-ref: MB-72281
Change-Id: I3fed787edf8414d4654caeceebc9f447ed93800e
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
3 files changed, 28 insertions(+), 51 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/26/21326/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index b744a80..e58b8e8 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -116,6 +116,7 @@
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.WriterValidationUtil;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
import org.apache.asterix.external.util.iceberg.IcebergConstants;
import org.apache.asterix.external.util.iceberg.IcebergUtils;
import
org.apache.asterix.external.writer.printer.parquet.SchemaConverterVisitor;
@@ -1118,7 +1119,7 @@
ExternalDataUtils.normalize(properties);
ExternalDataUtils.validate(properties);
ExternalDataUtils.validateType(properties, (ARecordType)
itemType);
- beforeExternalCollectionValidation(properties);
+ beforeExternalCollectionValidation(externalDetails,
properties);
Map<String, String> propertiesCopy = new
HashMap<>(properties);
validateIfIcebergTable(metadataProvider,
requestParameters, propertiesCopy, mdTxnCtx, sourceLoc,
@@ -1241,10 +1242,20 @@
*
- * @param properties external collection properties
+ * @param externalDetailsDecl the external collection declaration; not read by this default implementation
+ * (presumably provided for overriding implementations — confirm)
+ * @param properties external collection properties; may be modified in place (defaults / normalized values)
+ * @throws CompilationException if a Parquet collection specifies an invalid inputStreamType or changeDetectionMode
*/
- protected void beforeExternalCollectionValidation(Map<String, String>
properties) {
+ protected void beforeExternalCollectionValidation(ExternalDetailsDecl
externalDetailsDecl,
+ Map<String, String> properties) throws CompilationException {
if (IcebergUtils.isIcebergTable(properties)) {
IcebergUtils.setDefaults(properties);
}
+ if (ExternalDataUtils.isParquetFormat(properties)) {
+ // NOTE(review): S3-specific normalization runs for every Parquet collection; the adapter is not checked
+ // here, so non-S3 adapters also receive inputStreamType/changeDetectionMode — confirm this is intended.
+ S3Utils.validateAndNormalizeStreamInputType(properties);
+ S3Utils.validateAndNormalizeChangeDetectionMode(properties);
+ }
}
protected void validateIfIcebergTable(MetadataProvider metadataProvider,
IRequestParameters requestParameters,
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
index 8390576..c458dc3 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
@@ -49,17 +49,18 @@
public static final String HADOOP_SERVICE_END_POINT = "fs.s3a.endpoint";
public static final String HADOOP_REGION = "fs.s3a.endpoint.region";
+ // accepted inputStreamType/changeDetectionMode value (also the default when missing): leave the fs.s3a.* property unset
+ public static final String SDK_DEFAULT = "sdk_default";
+
// input stream type
public static final String INPUT_STREAM_TYPE_FIELD_NAME =
"inputStreamType";
public static final String HADOOP_INPUT_STREAM_TYPE =
"fs.s3a.input.stream.type";
- public static final String HADOOP_INPUT_STREAM_TYPE_VAL_AUTO = "auto";
public static final String HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC =
"classic";
public static final String HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS =
"analytics";
// change detection mode
public static final String CHANGE_DETECTION_MODE_FIELD_NAME =
"changeDetectionMode";
public static final String HADOOP_CHANGE_DETECTION_MODE =
"fs.s3a.change.detection.mode";
- public static final String HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO = "auto";
public static final String HADOOP_CHANGE_DETECTION_MODE_VAL_SERVER =
"server";
public static final String HADOOP_CHANGE_DETECTION_MODE_VAL_CLIENT =
"client";
public static final String HADOOP_CHANGE_DETECTION_MODE_VAL_NONE = "none";
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index db91300..fe2d6a6 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -56,15 +56,12 @@
import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_REGION;
import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_DURATION;
import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_NAME;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO;
import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE_VAL_CLIENT;
import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE_VAL_NONE;
import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CHANGE_DETECTION_MODE_VAL_SERVER;
import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY;
import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIAL_PROVIDER_KEY;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE;
import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS;
-import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE_VAL_AUTO;
import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC;
import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INSTANCE_PROFILE;
import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_PATH_STYLE_ACCESS;
@@ -77,6 +74,7 @@
import static
org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_TEMPORARY;
import static
org.apache.asterix.external.util.aws.s3.S3Constants.INPUT_STREAM_TYPE_FIELD_NAME;
import static
org.apache.asterix.external.util.aws.s3.S3Constants.PATH_STYLE_ADDRESSING_FIELD_NAME;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.SDK_DEFAULT;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
import static
org.apache.hyracks.util.annotations.AiProvenance.Agent.CLAUDE_SONNET_4_6;
import static
org.apache.hyracks.util.annotations.AiProvenance.Tool.GITHUB_COPILOT;
@@ -305,8 +303,8 @@
jobConf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint);
}
- setInputStreamType(configuration, jobConf, serviceEndpoint);
- setChangeDetectionMode(configuration, jobConf, serviceEndpoint);
+ setInputStreamType(configuration, jobConf);
+ setChangeDetectionMode(configuration, jobConf);
boolean pathStyleAddressing =
validateAndGetPathStyleAddressing(configuration.get(PATH_STYLE_ADDRESSING_FIELD_NAME),
serviceEndpoint);
@@ -324,44 +322,25 @@
}
}
- private static void setInputStreamType(Map<String, String> configuration,
JobConf jobConf, String serviceEndpoint) {
+ private static void setInputStreamType(Map<String, String> configuration,
JobConf jobConf) {
String configuredInputStreamType =
configuration.get(INPUT_STREAM_TYPE_FIELD_NAME);
- if (configuredInputStreamType == null) {
- configuredInputStreamType = HADOOP_INPUT_STREAM_TYPE_VAL_AUTO;
- }
-
- if
(HADOOP_INPUT_STREAM_TYPE_VAL_AUTO.equals(configuredInputStreamType)) {
- // Auto mode: decide based on endpoint
- if (serviceEndpoint != null) {
- // The analytics-accelerator stream factory (default in Hadoop
3.4+) performs a HeadObject call during
- // stream initialization to fetch the ETag. Non-AWS
S3-compatible endpoints may not return an ETag on
- // HeadObject, which causes a NullPointerException. Fall back
to the classic stream
- // implementation when a custom service endpoint is in use.
- jobConf.set(HADOOP_INPUT_STREAM_TYPE,
HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC);
- } else {
- jobConf.set(HADOOP_INPUT_STREAM_TYPE,
HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS);
- }
- } else {
- // Explicit override: use the user-specified stream type
- jobConf.set(HADOOP_INPUT_STREAM_TYPE, configuredInputStreamType);
+ // NOTE(review): the removed "auto" mode forced the classic stream (and change detection "none") for custom
+ // service endpoints because some S3-compatible stores omit the ETag on HeadObject; sdk_default no longer
+ // applies that fallback — confirm this is intended.
+ // Leave the S3A property unset (so the Hadoop/S3A default applies) when the value is missing, blank or
+ // sdk_default; Hadoop's Configuration.set rejects a null value.
+ if (configuredInputStreamType != null && !configuredInputStreamType.isBlank()
+ && !SDK_DEFAULT.equalsIgnoreCase(configuredInputStreamType)) {
+ jobConf.set(S3Constants.HADOOP_INPUT_STREAM_TYPE,
configuredInputStreamType);
}
}
- private static void setChangeDetectionMode(Map<String, String>
configuration, JobConf jobConf,
- String serviceEndpoint) {
+ private static void setChangeDetectionMode(Map<String, String>
configuration, JobConf jobConf) {
String configuredChangeDetectionMode =
configuration.get(CHANGE_DETECTION_MODE_FIELD_NAME);
- if (configuredChangeDetectionMode == null) {
- configuredChangeDetectionMode =
HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO;
- }
-
- String inputStreamType = jobConf.get(HADOOP_INPUT_STREAM_TYPE);
- if
(HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO.equals(configuredChangeDetectionMode)) {
- // If using a custom endpoint with classic streams, default to no
change detection as
- // S3-compatible implementations may not support ETag/version
metadata consistently.
- if (serviceEndpoint != null &&
HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC.equals(inputStreamType)) {
- jobConf.set(S3Constants.HADOOP_CHANGE_DETECTION_MODE,
HADOOP_CHANGE_DETECTION_MODE_VAL_NONE);
- }
- } else {
+ // Leave the S3A property unset when the value is missing, blank or sdk_default;
+ // Hadoop's Configuration.set rejects a null value.
+ if (configuredChangeDetectionMode != null && !configuredChangeDetectionMode.isBlank()
+ && !SDK_DEFAULT.equalsIgnoreCase(configuredChangeDetectionMode)) {
jobConf.set(S3Constants.HADOOP_CHANGE_DETECTION_MODE,
configuredChangeDetectionMode);
}
}
@@ -849,16 +819,15 @@
throws CompilationException {
String streamInputType =
configuration.get(INPUT_STREAM_TYPE_FIELD_NAME);
if (streamInputType == null || streamInputType.isBlank()) {
- configuration.put(INPUT_STREAM_TYPE_FIELD_NAME,
HADOOP_INPUT_STREAM_TYPE_VAL_AUTO);
- return;
+ streamInputType = SDK_DEFAULT;
+ configuration.put(INPUT_STREAM_TYPE_FIELD_NAME, streamInputType);
}
- if
(!HADOOP_INPUT_STREAM_TYPE_VAL_AUTO.equalsIgnoreCase(streamInputType)
+ if (!SDK_DEFAULT.equalsIgnoreCase(streamInputType)
&&
!HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS.equalsIgnoreCase(streamInputType)
&&
!HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC.equalsIgnoreCase(streamInputType)) {
- throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE,
INPUT_STREAM_TYPE_FIELD_NAME,
- HADOOP_INPUT_STREAM_TYPE_VAL_AUTO + ", " +
HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS + ", "
- + HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC);
+ throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE,
INPUT_STREAM_TYPE_FIELD_NAME, SDK_DEFAULT
+ + ", " + HADOOP_INPUT_STREAM_TYPE_VAL_ANALYTICS + ", " +
HADOOP_INPUT_STREAM_TYPE_VAL_CLASSIC);
}
configuration.put(INPUT_STREAM_TYPE_FIELD_NAME,
streamInputType.toLowerCase());
}
@@ -867,16 +836,17 @@
throws CompilationException {
String changeDetectionMode =
configuration.get(CHANGE_DETECTION_MODE_FIELD_NAME);
if (changeDetectionMode == null || changeDetectionMode.isBlank()) {
- configuration.put(CHANGE_DETECTION_MODE_FIELD_NAME,
HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO);
+ changeDetectionMode = SDK_DEFAULT;
+ configuration.put(CHANGE_DETECTION_MODE_FIELD_NAME,
changeDetectionMode);
return;
}
- if
(!HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO.equalsIgnoreCase(changeDetectionMode)
+ if (!SDK_DEFAULT.equalsIgnoreCase(changeDetectionMode)
&&
!HADOOP_CHANGE_DETECTION_MODE_VAL_NONE.equalsIgnoreCase(changeDetectionMode)
&&
!HADOOP_CHANGE_DETECTION_MODE_VAL_CLIENT.equalsIgnoreCase(changeDetectionMode)
&&
!HADOOP_CHANGE_DETECTION_MODE_VAL_SERVER.equalsIgnoreCase(changeDetectionMode))
{
throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE,
CHANGE_DETECTION_MODE_FIELD_NAME,
- HADOOP_CHANGE_DETECTION_MODE_VAL_AUTO + ", " +
HADOOP_CHANGE_DETECTION_MODE_VAL_NONE + ", "
+ SDK_DEFAULT + ", " + HADOOP_CHANGE_DETECTION_MODE_VAL_NONE
+ ", "
+ HADOOP_CHANGE_DETECTION_MODE_VAL_CLIENT + ", " +
HADOOP_CHANGE_DETECTION_MODE_VAL_SERVER);
}
configuration.put(CHANGE_DETECTION_MODE_FIELD_NAME,
changeDetectionMode.toLowerCase());
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21326?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: I3fed787edf8414d4654caeceebc9f447ed93800e
Gerrit-Change-Number: 21326
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>