>From Hussain Towaileb <[email protected]>:
Hussain Towaileb has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21263?usp=email )
Change subject: [NO ISSUE][EXT]: misc iceberg fixes
......................................................................
[NO ISSUE][EXT]: misc iceberg fixes
Details:
- Strip the "Bearer " prefix from the bearer token if it is passed with one.
- Add serialization replacement for Nessie exceptions.
- Ensure projected columns filtering is applied.
- Fix binary serialization to honor the ByteBuffer's offset, position and remaining bytes, and to handle buffers without a backing array.
- Ensure record fields are accessed by name instead of position.
Ext-ref: MB-71912
Change-Id: If41bc97040b0ef6dfd45ca085c61e796d0e63c5a
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/nessie/NessieUtils.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestUtils.java
7 files changed, 52 insertions(+), 17 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/63/21263/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 6d17fef..822ebc1 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -139,6 +139,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.projectnessie.client.http.HttpClientException;
+import org.projectnessie.client.rest.NessieServiceException;
import org.projectnessie.error.BaseNessieClientServerException;
import com.azure.storage.blob.models.BlobStorageException;
@@ -168,6 +169,7 @@
registerReplacement(BlobStorageException.class,
SerializableExceptionProxy::new);
registerReplacement(DataLakeStorageException.class,
SerializableExceptionProxy::new);
registerReplacement(BaseNessieClientServerException.class,
SerializableExceptionProxy::new);
+ registerReplacement(NessieServiceException.class,
SerializableExceptionProxy::new);
registerReplacement(HttpClientException.class,
SerializableExceptionProxy::new);
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 7566206..be43cc0 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -117,6 +117,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.projectnessie.client.http.HttpClientException;
+import org.projectnessie.client.rest.NessieServiceException;
import org.projectnessie.error.BaseNessieClientServerException;
import com.azure.storage.blob.models.BlobStorageException;
@@ -155,6 +156,7 @@
registerReplacement(BlobStorageException.class,
SerializableExceptionProxy::new);
registerReplacement(DataLakeStorageException.class,
SerializableExceptionProxy::new);
registerReplacement(BaseNessieClientServerException.class,
SerializableExceptionProxy::new);
+ registerReplacement(NessieServiceException.class,
SerializableExceptionProxy::new);
registerReplacement(HttpClientException.class,
SerializableExceptionProxy::new);
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
index 6949ef8..06ff6ef 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
@@ -33,6 +33,8 @@
import org.apache.asterix.external.util.iceberg.IcebergConstants;
import org.apache.asterix.external.util.iceberg.IcebergUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
+import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
@@ -76,7 +78,8 @@
try {
initializeTable();
} catch (CompilationException e) {
- throw HyracksDataException.create(e);
+ Throwable throwable = closeResources(e);
+ throw HyracksDataException.create(throwable);
}
}
@@ -158,17 +161,17 @@
@Override
public void close() throws IOException {
- if (iterable != null) {
- iterable.close();
- }
- if (tableFileIo != null) {
- tableFileIo.close();
- }
-
+ Throwable throwable = CleanupUtils.closeSilently(iterable, null);
+ throwable = CleanupUtils.closeSilently(tableFileIo, throwable);
try {
- IcebergUtils.closeAndCleanup(catalog, catalogProperties);
- } catch (CompilationException e) {
- throw HyracksDataException.create(e);
+ if (catalog != null) {
+ IcebergUtils.closeAndCleanup(catalog, catalogProperties);
+ }
+ } catch (Exception ex) {
+ throwable = ExceptionUtils.suppress(throwable, ex);
+ }
+ if (throwable != null) {
+ throw HyracksDataException.create(throwable);
}
}
@@ -219,4 +222,18 @@
}
throw new IllegalStateException("Snapshot must've been pinned during
compilation phase");
}
+
+ private Throwable closeResources(Throwable throwable) {
+ if (tableFileIo != null) {
+ throwable = CleanupUtils.closeSilently(tableFileIo, throwable);
+ }
+ if (catalog != null) {
+ try {
+ IcebergUtils.closeAndCleanup(catalog, catalogProperties);
+ } catch (Exception ex) {
+ throwable = ExceptionUtils.suppress(throwable, ex);
+ }
+ }
+ return throwable;
+ }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
index e8f7d91..17d24ed 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
@@ -152,7 +152,7 @@
if (projectedFields != null && projectedFields.length > 0) {
projectedSchema = projectedSchema.select(projectedFields);
}
- scan.project(projectedSchema);
+ scan = scan.project(projectedSchema);
Expression filterExpression =
((IcebergTableFilterEvaluatorFactory)
filterEvaluatorFactory).getFilterExpression();
if (filterExpression != null) {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
index a01be90..1f045d6 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
@@ -95,13 +95,14 @@
NestedField field = projectedSchema.columns().get(i);
String fieldName = field.name();
Type fieldType = field.type();
- ATypeTag typeTag = getTypeTag(fieldType, record.get(i) == null,
parserContext);
+ Object fieldValue = record.getField(fieldName);
+ ATypeTag typeTag = getTypeTag(fieldType, fieldValue == null,
parserContext);
IValueReference value;
if (valueEmbedder.shouldEmbed(fieldName, typeTag)) {
value = valueEmbedder.getEmbeddedValue();
} else {
valueBuffer.reset();
- parseValue(fieldType, record.get(i),
valueBuffer.getDataOutput());
+ parseValue(fieldType, fieldValue, valueBuffer.getDataOutput());
value = valueBuffer;
}
@@ -308,7 +309,14 @@
private void serializeBinary(Object value, DataOutput out) throws
HyracksDataException {
ByteBuffer byteBuffer = (ByteBuffer) value;
- aBinary.setValue(byteBuffer.array(), 0, byteBuffer.array().length);
+ if (byteBuffer.hasArray()) {
+ aBinary.setValue(byteBuffer.array(), byteBuffer.arrayOffset() +
byteBuffer.position(),
+ byteBuffer.remaining());
+ } else {
+ byte[] bytes = new byte[byteBuffer.remaining()];
+ byteBuffer.duplicate().get(bytes);
+ aBinary.setValue(bytes, 0, bytes.length);
+ }
binarySerde.serialize(aBinary, out);
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/nessie/NessieUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/nessie/NessieUtils.java
index 0e36ac6..0874901 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/nessie/NessieUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/nessie/NessieUtils.java
@@ -155,7 +155,10 @@
if (notAllowed != null) {
throw new
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
BEARER_TOKEN_FIELD_NAME);
}
-
+ // strip "Bearer " prefix if present - the Nessie client adds it
internally
+ if (bearerToken.startsWith("Bearer ")) {
+ bearerToken = bearerToken.substring("Bearer ".length());
+ }
catalogProperties.put(NESSIE_AUTHENTICATION_TYPE_FIELD_NAME,
NESSIE_AUTHENTICATION_TYPE_BEARER);
catalogProperties.put(NESSIE_AUTHENTICATION_BEARER_TOKEN_FIELD_NAME,
bearerToken);
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestUtils.java
index 569955a..4818b94 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestUtils.java
@@ -132,7 +132,10 @@
if (notAllowed != null) {
throw new
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
BEARER_TOKEN_FIELD_NAME);
}
-
+ // Strip "Bearer " prefix if present - the Iceberg REST client adds it
internally
+ if (bearerToken.startsWith("Bearer ")) {
+ bearerToken = bearerToken.substring("Bearer ".length());
+ }
catalogProperties.put(AuthProperties.AUTH_TYPE,
AuthProperties.AUTH_TYPE_OAUTH2);
catalogProperties.put(RestConstants.ICEBERG_BEARER_TOKEN_PROPERTY_NAME,
bearerToken);
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21263?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: If41bc97040b0ef6dfd45ca085c61e796d0e63c5a
Gerrit-Change-Number: 21263
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>