>From Hussain Towaileb <[email protected]>:

Hussain Towaileb has submitted this change. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21263?usp=email )

Change subject: [NO ISSUE][EXT]: misc iceberg fixes
......................................................................

[NO ISSUE][EXT]: misc iceberg fixes

Details:
- Ensure "Bearer " is not part of bearer token when passed.
- Add serialization replacement for Nessie exceptions.
- Ensure projected columns filtration is applied.
- Fix handling of serializing binary with respect to arrays.
- Ensure record fields are accessed by name instead of position.

Ext-ref: MB-71912
Change-Id: If41bc97040b0ef6dfd45ca085c61e796d0e63c5a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21263
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Hussain Towaileb <[email protected]>
Tested-by: Hussain Towaileb <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M 
asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.020.adm
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/iceberg/auth/GoogleAuthSession.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/nessie/NessieUtils.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestConstants.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestUtils.java
10 files changed, 71 insertions(+), 35 deletions(-)

Approvals:
  Murtadha Hubail: Looks good to me, approved
  Jenkins: Verified
  Anon. E. Moose #1000171:
  Hussain Towaileb: Looks good to me, but someone else must approve; Verified




diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 6d17fef..822ebc1 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -139,6 +139,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.projectnessie.client.http.HttpClientException;
+import org.projectnessie.client.rest.NessieServiceException;
 import org.projectnessie.error.BaseNessieClientServerException;

 import com.azure.storage.blob.models.BlobStorageException;
@@ -168,6 +169,7 @@
         registerReplacement(BlobStorageException.class, 
SerializableExceptionProxy::new);
         registerReplacement(DataLakeStorageException.class, 
SerializableExceptionProxy::new);
         registerReplacement(BaseNessieClientServerException.class, 
SerializableExceptionProxy::new);
+        registerReplacement(NessieServiceException.class, 
SerializableExceptionProxy::new);
         registerReplacement(HttpClientException.class, 
SerializableExceptionProxy::new);
     }

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 7566206..be43cc0 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -117,6 +117,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.projectnessie.client.http.HttpClientException;
+import org.projectnessie.client.rest.NessieServiceException;
 import org.projectnessie.error.BaseNessieClientServerException;

 import com.azure.storage.blob.models.BlobStorageException;
@@ -155,6 +156,7 @@
         registerReplacement(BlobStorageException.class, 
SerializableExceptionProxy::new);
         registerReplacement(DataLakeStorageException.class, 
SerializableExceptionProxy::new);
         registerReplacement(BaseNessieClientServerException.class, 
SerializableExceptionProxy::new);
+        registerReplacement(NessieServiceException.class, 
SerializableExceptionProxy::new);
         registerReplacement(HttpClientException.class, 
SerializableExceptionProxy::new);
     }

diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.020.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.020.adm
index 2835651..d5f405e 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.020.adm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.020.adm
@@ -1 +1 @@
-{ "binary_field": "AQIDBAUGBwg=", "fixed_field": "SGVsbG8gV29ybGQ=", 
"geometry_field": "AQIDBAUGBwgJCg==", "geography_field": "AQIDBAUGBwgJCg0=", 
"bool_field": true, "byte_field": 42, "short_field": 1000, "int_field": 42, 
"long_field": 9223372036854775807, "float_field": 3.14, "double_field": 
2.718281828459045, "decimal_field": 12345.6789, "string_field": "hello world", 
"varchar_field": "varchar value one", "char_field": "Hi", "uuid_field": 
uuid("550e8400-e29b-41d4-a716-446655440000"), "date_field": 19723, 
"time_field": 37230000, "timestamp_field": 1707000000000, 
"timestamp_ntz_field": 1707048000000, "timestamp_nano_field": 1707000000000, 
"interval_ym_field": 14, "interval_dt_field": 37230000000, "struct_field": { 
"name": "Alice", "age": 30, "active": true }, "list_field": [ "a", "b", "c" ], 
"map_field": { "key1": "value1", "key2": "100" }, "variant_field": "string 
value", "unknown_field": null }
\ No newline at end of file
+{ "binary_field": "AQIDBAUGBwg=", "fixed_field": "SGVsbG8gV29ybGQ=", 
"geometry_field": "AQIDBAUGBwgJCg==", "geography_field": "AQIDBAUGBwgJCg0=", 
"bool_field": true, "byte_field": 42, "short_field": 1000, "int_field": 42, 
"long_field": 9223372036854775807, "float_field": 3.14, "double_field": 
2.718281828459045, "decimal_field": 12345.6789, "string_field": "hello world", 
"varchar_field": "varchar value one", "char_field": "Hi", "uuid_field": 
uuid("550e8400-e29b-41d4-a716-446655440000"), "date_field": 19723, 
"time_field": 37230000, "timestamp_field": 1707000000000000, 
"timestamp_ntz_field": 1707048000000000, "timestamp_nano_field": 
1707000000000000000, "interval_ym_field": 14, "interval_dt_field": 37230000000, 
"struct_field": { "name": "Alice", "age": 30, "active": true }, "list_field": [ 
"a", "b", "c" ], "map_field": { "key1": "value1", "key2": "100" }, 
"variant_field": "string value", "unknown_field": null }
\ No newline at end of file
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
index 6949ef8..06ff6ef 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
@@ -33,6 +33,8 @@
 import org.apache.asterix.external.util.iceberg.IcebergConstants;
 import org.apache.asterix.external.util.iceberg.IcebergUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
+import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
@@ -76,7 +78,8 @@
         try {
             initializeTable();
         } catch (CompilationException e) {
-            throw HyracksDataException.create(e);
+            Throwable throwable = closeResources(e);
+            throw HyracksDataException.create(throwable);
         }
     }

@@ -158,17 +161,17 @@

     @Override
     public void close() throws IOException {
-        if (iterable != null) {
-            iterable.close();
-        }
-        if (tableFileIo != null) {
-            tableFileIo.close();
-        }
-
+        Throwable throwable = CleanupUtils.closeSilently(iterable, null);
+        throwable = CleanupUtils.closeSilently(tableFileIo, throwable);
         try {
-            IcebergUtils.closeAndCleanup(catalog, catalogProperties);
-        } catch (CompilationException e) {
-            throw HyracksDataException.create(e);
+            if (catalog != null) {
+                IcebergUtils.closeAndCleanup(catalog, catalogProperties);
+            }
+        } catch (Exception ex) {
+            throwable = ExceptionUtils.suppress(throwable, ex);
+        }
+        if (throwable != null) {
+            throw HyracksDataException.create(throwable);
         }
     }

@@ -219,4 +222,18 @@
         }
         throw new IllegalStateException("Snapshot must've been pinned during 
compilation phase");
     }
+
+    private Throwable closeResources(Throwable throwable) {
+        if (tableFileIo != null) {
+            throwable = CleanupUtils.closeSilently(tableFileIo, throwable);
+        }
+        if (catalog != null) {
+            try {
+                IcebergUtils.closeAndCleanup(catalog, catalogProperties);
+            } catch (Exception ex) {
+                throwable = ExceptionUtils.suppress(throwable, ex);
+            }
+        }
+        return throwable;
+    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
index e8f7d91..17d24ed 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
@@ -152,7 +152,7 @@
             if (projectedFields != null && projectedFields.length > 0) {
                 projectedSchema = projectedSchema.select(projectedFields);
             }
-            scan.project(projectedSchema);
+            scan = scan.project(projectedSchema);
             Expression filterExpression =
                     ((IcebergTableFilterEvaluatorFactory) 
filterEvaluatorFactory).getFilterExpression();
             if (filterExpression != null) {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
index a01be90..e88a9cc 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
@@ -25,11 +25,13 @@
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.OffsetDateTime;
 import java.time.ZoneId;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -95,13 +97,14 @@
             NestedField field = projectedSchema.columns().get(i);
             String fieldName = field.name();
             Type fieldType = field.type();
-            ATypeTag typeTag = getTypeTag(fieldType, record.get(i) == null, 
parserContext);
+            Object fieldValue = record.getField(fieldName);
+            ATypeTag typeTag = getTypeTag(fieldType, fieldValue == null, 
parserContext);
             IValueReference value;
             if (valueEmbedder.shouldEmbed(fieldName, typeTag)) {
                 value = valueEmbedder.getEmbeddedValue();
             } else {
                 valueBuffer.reset();
-                parseValue(fieldType, record.get(i), 
valueBuffer.getDataOutput());
+                parseValue(fieldType, fieldValue, valueBuffer.getDataOutput());
                 value = valueBuffer;
             }

@@ -308,7 +311,14 @@

     private void serializeBinary(Object value, DataOutput out) throws 
HyracksDataException {
         ByteBuffer byteBuffer = (ByteBuffer) value;
-        aBinary.setValue(byteBuffer.array(), 0, byteBuffer.array().length);
+        if (byteBuffer.hasArray()) {
+            aBinary.setValue(byteBuffer.array(), byteBuffer.arrayOffset() + 
byteBuffer.position(),
+                    byteBuffer.remaining());
+        } else {
+            byte[] bytes = new byte[byteBuffer.remaining()];
+            byteBuffer.duplicate().get(bytes);
+            aBinary.setValue(bytes, 0, bytes.length);
+        }
         binarySerde.serialize(aBinary, out);
     }

@@ -340,29 +350,25 @@
     }

     public void serializeTimestamp(Type type, Object value, DataOutput output) 
throws HyracksDataException {
-        long timestampInMillis;
+        Instant instant;
         switch (value) {
-            case OffsetDateTime offsetDateTime ->
-                    timestampInMillis = 
offsetDateTime.toInstant().toEpochMilli();
-
+            case OffsetDateTime offsetDateTime -> instant = 
offsetDateTime.toInstant();
             case LocalDateTime localDateTime -> {
                 ZoneId zoneId = parserContext.getTimeZoneId();
-                timestampInMillis = 
localDateTime.atZone(zoneId).toInstant().toEpochMilli();
+                instant = localDateTime.atZone(zoneId).toInstant();
             }
-
-            case null, default -> {
-                throw RuntimeDataException.create(
-                        ErrorCode.EXTERNAL_SOURCE_ERROR,
-                        value == null
-                                ? "unexpected null value for field type (" + 
type + ")"
-                                : "unexpected value type (" + value.getClass() 
+ ") for field type (" + type + ")");
-            }
+            case null, default -> throw 
RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR,
+                    value == null ? "unexpected null value for field type (" + 
type + ")"
+                            : "unexpected value type (" + value.getClass() + 
") for field type (" + type + ")");
         }

         if (parserContext.isTimestampAsLong()) {
-            serializeLong(timestampInMillis, output);
+            long timestampAsLong = type.typeId() == Type.TypeID.TIMESTAMP_NANO
+                    ? ChronoUnit.NANOS.between(Instant.EPOCH, instant)
+                    : ChronoUnit.MICROS.between(Instant.EPOCH, instant);
+            serializeLong(timestampAsLong, output);
         } else {
-            aDateTime.setValue(timestampInMillis);
+            aDateTime.setValue(instant.toEpochMilli());
             datetimeSerde.serialize(aDateTime, output);
         }
     }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/iceberg/auth/GoogleAuthSession.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/iceberg/auth/GoogleAuthSession.java
index 7acaeb3..dbd5789 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/iceberg/auth/GoogleAuthSession.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/iceberg/auth/GoogleAuthSession.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;

+import org.apache.asterix.external.util.iceberg.rest.RestConstants;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.rest.HTTPHeaders;
 import org.apache.iceberg.rest.HTTPRequest;
@@ -65,8 +66,8 @@
             AccessToken token = credentials.getAccessToken();

             if (token != null && token.getTokenValue() != null) {
-                HTTPHeaders newHeaders = request.headers().putIfAbsent(
-                        
HTTPHeaders.of(HTTPHeaders.HTTPHeader.of("Authorization", "Bearer " + 
token.getTokenValue())));
+                HTTPHeaders newHeaders = 
request.headers().putIfAbsent(HTTPHeaders.of(HTTPHeaders.HTTPHeader
+                        .of("Authorization", RestConstants.BEARER_TOKEN_PREFIX 
+ token.getTokenValue())));
                 return newHeaders.equals(request.headers()) ? request
                         : 
ImmutableHTTPRequest.builder().from(request).headers(newHeaders).build();
             } else {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/nessie/NessieUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/nessie/NessieUtils.java
index f0df588..f25d20f 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/nessie/NessieUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/nessie/NessieUtils.java
@@ -49,6 +49,7 @@
 import java.util.Map;

 import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.external.util.iceberg.rest.RestConstants;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.nessie.NessieCatalog;

@@ -158,7 +159,10 @@
         if (notAllowed != null) {
             throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed, 
BEARER_TOKEN_FIELD_NAME);
         }
-
+        // strip "Bearer " prefix if present - the Nessie client adds it 
internally
+        if (bearerToken.startsWith(RestConstants.BEARER_TOKEN_PREFIX)) {
+            bearerToken = 
bearerToken.substring(RestConstants.BEARER_TOKEN_PREFIX.length());
+        }
         catalogProperties.put(NESSIE_AUTHENTICATION_TYPE_FIELD_NAME, 
NESSIE_AUTHENTICATION_TYPE_BEARER);
         catalogProperties.put(NESSIE_AUTHENTICATION_BEARER_TOKEN_FIELD_NAME, 
bearerToken);
     }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestConstants.java
index 3b278c0..fe28ea5 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestConstants.java
@@ -28,4 +28,5 @@
     public static final String ICEBERG_CREDENTIAL_PROPERTY_NAME = "credential";
     public static final String ICEBERG_SCOPE_PROPERTY_NAME = "scope";

+    public static final String BEARER_TOKEN_PREFIX = "Bearer ";
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestUtils.java
index 57121b8..820e44b 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestUtils.java
@@ -134,7 +134,10 @@
         if (notAllowed != null) {
             throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed, 
BEARER_TOKEN_FIELD_NAME);
         }
-
+        // Strip "Bearer " prefix if present - the Iceberg REST client adds it 
internally
+        if (bearerToken.startsWith(RestConstants.BEARER_TOKEN_PREFIX)) {
+            bearerToken = 
bearerToken.substring(RestConstants.BEARER_TOKEN_PREFIX.length());
+        }
         catalogProperties.put(AuthProperties.AUTH_TYPE, 
AuthProperties.AUTH_TYPE_OAUTH2);
         
catalogProperties.put(RestConstants.ICEBERG_BEARER_TOKEN_PROPERTY_NAME, 
bearerToken);
     }

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21263?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: merged
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: If41bc97040b0ef6dfd45ca085c61e796d0e63c5a
Gerrit-Change-Number: 21263
Gerrit-PatchSet: 5
Gerrit-Owner: Hussain Towaileb <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Hussain Towaileb <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>

Reply via email to