>From Hussain Towaileb <[email protected]>:

Hussain Towaileb has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21263?usp=email )


Change subject: [NO ISSUE][EXT]: misc iceberg fixes
......................................................................

[NO ISSUE][EXT]: misc iceberg fixes

Details:
- Strip a leading "Bearer " prefix from user-supplied bearer tokens (Nessie and REST catalogs), since the clients add it themselves.
- Register a serialization replacement for NessieServiceException on the CC and NCs.
- Ensure column projection is applied to the scan: TableScan.project() returns a new scan, so its result must be kept.
- Fix binary serialization to respect the ByteBuffer's array offset, position and remaining bytes, and to handle buffers without a backing array.
- Ensure record fields are accessed by name instead of by position.

Ext-ref: MB-71912
Change-Id: If41bc97040b0ef6dfd45ca085c61e796d0e63c5a
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/nessie/NessieUtils.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestUtils.java
7 files changed, 52 insertions(+), 17 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/63/21263/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 6d17fef..822ebc1 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -139,6 +139,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.projectnessie.client.http.HttpClientException;
+import org.projectnessie.client.rest.NessieServiceException;
 import org.projectnessie.error.BaseNessieClientServerException;

 import com.azure.storage.blob.models.BlobStorageException;
@@ -168,6 +169,7 @@
         registerReplacement(BlobStorageException.class, 
SerializableExceptionProxy::new);
         registerReplacement(DataLakeStorageException.class, 
SerializableExceptionProxy::new);
         registerReplacement(BaseNessieClientServerException.class, 
SerializableExceptionProxy::new);
+        registerReplacement(NessieServiceException.class, 
SerializableExceptionProxy::new);
         registerReplacement(HttpClientException.class, 
SerializableExceptionProxy::new);
     }

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 7566206..be43cc0 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -117,6 +117,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.projectnessie.client.http.HttpClientException;
+import org.projectnessie.client.rest.NessieServiceException;
 import org.projectnessie.error.BaseNessieClientServerException;

 import com.azure.storage.blob.models.BlobStorageException;
@@ -155,6 +156,7 @@
         registerReplacement(BlobStorageException.class, 
SerializableExceptionProxy::new);
         registerReplacement(DataLakeStorageException.class, 
SerializableExceptionProxy::new);
         registerReplacement(BaseNessieClientServerException.class, 
SerializableExceptionProxy::new);
+        registerReplacement(NessieServiceException.class, 
SerializableExceptionProxy::new);
         registerReplacement(HttpClientException.class, 
SerializableExceptionProxy::new);
     }

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
index 6949ef8..06ff6ef 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
@@ -33,6 +33,8 @@
 import org.apache.asterix.external.util.iceberg.IcebergConstants;
 import org.apache.asterix.external.util.iceberg.IcebergUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
+import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
@@ -76,7 +78,8 @@
         try {
             initializeTable();
         } catch (CompilationException e) {
-            throw HyracksDataException.create(e);
+            Throwable throwable = closeResources(e);
+            throw HyracksDataException.create(throwable);
         }
     }

@@ -158,17 +161,17 @@

     @Override
     public void close() throws IOException {
-        if (iterable != null) {
-            iterable.close();
-        }
-        if (tableFileIo != null) {
-            tableFileIo.close();
-        }
-
+        Throwable throwable = CleanupUtils.closeSilently(iterable, null); // NOTE(review): no null guard here, unlike closeResources(); assumes closeSilently tolerates a null resource — confirm
+        throwable = CleanupUtils.closeSilently(tableFileIo, throwable);
         try {
-            IcebergUtils.closeAndCleanup(catalog, catalogProperties);
-        } catch (CompilationException e) {
-            throw HyracksDataException.create(e);
+            if (catalog != null) {
+                IcebergUtils.closeAndCleanup(catalog, catalogProperties);
+            }
+        } catch (Exception ex) {
+            throwable = ExceptionUtils.suppress(throwable, ex); // record the catalog cleanup failure instead of throwing before earlier failures are reported
+        }
+        if (throwable != null) {
+            throw HyracksDataException.create(throwable); // throw once, only after every resource above has been attempted
         }
     }

@@ -219,4 +222,18 @@
         }
         throw new IllegalStateException("Snapshot must've been pinned during 
compilation phase");
     }
+
+    private Throwable closeResources(Throwable throwable) { // best-effort cleanup used when initializeTable() fails; returns `throwable` with any close failures folded in. NOTE(review): duplicates the tail of close() — consider sharing
+        if (tableFileIo != null) {
+            throwable = CleanupUtils.closeSilently(tableFileIo, throwable);
+        }
+        if (catalog != null) {
+            try {
+                IcebergUtils.closeAndCleanup(catalog, catalogProperties);
+            } catch (Exception ex) {
+                throwable = ExceptionUtils.suppress(throwable, ex); // presumably attaches ex to the triggering failure rather than replacing it — confirm
+            }
+        }
+        return throwable; // iterable is not closed here; presumably it has not been opened yet during initialization — TODO confirm
+    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
index e8f7d91..17d24ed 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
@@ -152,7 +152,7 @@
             if (projectedFields != null && projectedFields.length > 0) {
                 projectedSchema = projectedSchema.select(projectedFields);
             }
-            scan.project(projectedSchema);
+            scan = scan.project(projectedSchema);
             Expression filterExpression =
                     ((IcebergTableFilterEvaluatorFactory) 
filterEvaluatorFactory).getFilterExpression();
             if (filterExpression != null) {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
index a01be90..1f045d6 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
@@ -95,13 +95,14 @@
             NestedField field = projectedSchema.columns().get(i);
             String fieldName = field.name();
             Type fieldType = field.type();
-            ATypeTag typeTag = getTypeTag(fieldType, record.get(i) == null, 
parserContext);
+            Object fieldValue = record.getField(fieldName);
+            ATypeTag typeTag = getTypeTag(fieldType, fieldValue == null, 
parserContext);
             IValueReference value;
             if (valueEmbedder.shouldEmbed(fieldName, typeTag)) {
                 value = valueEmbedder.getEmbeddedValue();
             } else {
                 valueBuffer.reset();
-                parseValue(fieldType, record.get(i), 
valueBuffer.getDataOutput());
+                parseValue(fieldType, fieldValue, valueBuffer.getDataOutput());
                 value = valueBuffer;
             }

@@ -308,7 +309,14 @@

     private void serializeBinary(Object value, DataOutput out) throws 
HyracksDataException {
         ByteBuffer byteBuffer = (ByteBuffer) value;
-        aBinary.setValue(byteBuffer.array(), 0, byteBuffer.array().length);
+        if (byteBuffer.hasArray()) { // array-backed buffer: expose only the [position, limit) window of the backing array, no copy
+            aBinary.setValue(byteBuffer.array(), byteBuffer.arrayOffset() + 
byteBuffer.position(),
+                    byteBuffer.remaining());
+        } else { // no accessible backing array (e.g. direct or read-only buffer): copy the remaining bytes out
+            byte[] bytes = new byte[byteBuffer.remaining()];
+            byteBuffer.duplicate().get(bytes); // read through a duplicate so the source buffer's position is left unchanged
+            aBinary.setValue(bytes, 0, bytes.length);
+        }
         binarySerde.serialize(aBinary, out);
     }

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/nessie/NessieUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/nessie/NessieUtils.java
index 0e36ac6..0874901 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/nessie/NessieUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/nessie/NessieUtils.java
@@ -155,7 +155,10 @@
         if (notAllowed != null) {
             throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed, 
BEARER_TOKEN_FIELD_NAME);
         }
-
+        // strip "Bearer " prefix if present - the Nessie client adds it 
internally
+        if (bearerToken.startsWith("Bearer ")) {
+            bearerToken = bearerToken.substring("Bearer ".length());
+        }
         catalogProperties.put(NESSIE_AUTHENTICATION_TYPE_FIELD_NAME, 
NESSIE_AUTHENTICATION_TYPE_BEARER);
         catalogProperties.put(NESSIE_AUTHENTICATION_BEARER_TOKEN_FIELD_NAME, 
bearerToken);
     }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestUtils.java
index 569955a..4818b94 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/rest/RestUtils.java
@@ -132,7 +132,10 @@
         if (notAllowed != null) {
             throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed, 
BEARER_TOKEN_FIELD_NAME);
         }
-
+        // Strip "Bearer " prefix if present - the Iceberg REST client adds it 
internally
+        if (bearerToken.startsWith("Bearer ")) {
+            bearerToken = bearerToken.substring("Bearer ".length());
+        }
         catalogProperties.put(AuthProperties.AUTH_TYPE, 
AuthProperties.AUTH_TYPE_OAUTH2);
         
catalogProperties.put(RestConstants.ICEBERG_BEARER_TOKEN_PROPERTY_NAME, 
bearerToken);
     }

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21263?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: If41bc97040b0ef6dfd45ca085c61e796d0e63c5a
Gerrit-Change-Number: 21263
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>

Reply via email to