>From Wail Alkowaileet <[email protected]>:

Wail Alkowaileet has submitted this change. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18201 )

Change subject: [ASTERIXDB-3367][EXT] Avro Parser for Union types for Bytes 
fix, Change in Map implementation.
......................................................................

[ASTERIXDB-3367][EXT] Avro Parser for Union types for Bytes fix, Change in Map 
implementation.

Change-Id: I584873a47bf409351d6b63979117616bce415c8f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18201
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Wail Alkowaileet <[email protected]>
---
M 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-types/avro-map/avro-map.03.query.sqlpp
M 
asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.03.adm
M 
asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
M 
asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.02.adm
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileExampleGeneratorUtil.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java
M 
asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-union/avro-union.02.adm
9 files changed, 57 insertions(+), 27 deletions(-)

Approvals:
  Wail Alkowaileet: Looks good to me, approved
  Jenkins: Verified; Verified
  Anon. E. Moose #1000171:




diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileExampleGeneratorUtil.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileExampleGeneratorUtil.java
index d62d2d1..60f4b83 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileExampleGeneratorUtil.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileExampleGeneratorUtil.java
@@ -31,11 +31,12 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.SpecificDatumWriter;
+import org.junit.Test;

 public class AvroFileExampleGeneratorUtil {
     private static final String SCHEMA_STRING = "{\n" + "  \"type\": 
\"record\",\n" + "  \"name\": \"SimpleRecord\",\n"
             + "  \"namespace\": \"com.example\",\n" + "  \"fields\": [\n" + "  
  {\n"
-            + "      \"name\": \"unionField\",\n" + "      \"type\": [\"int\", 
\"string\"],\n"
+            + "      \"name\": \"unionField\",\n" + "      \"type\": [\"int\", 
\"string\", \"bytes\"],\n"
             + "      \"doc\": \"This field can be either an int or a 
string.\"\n" + "    },\n" + "    {\n"
             + "      \"name\": \"mapField\",\n" + "      \"type\": {\n" + "    
    \"type\": \"map\",\n"
             + "        \"values\": \"int\",\n" + "        \"doc\": \"This is a 
map of string keys to int values.\"\n"
@@ -96,7 +97,7 @@

             //second record to be added
             GenericRecord record2 = new GenericData.Record(schema);
-            record2.put("unionField", "Example string");
+            record2.put("unionField", ByteBuffer.wrap(new byte[] { 0x01, 0x05 
}));
             Map<String, Integer> map2 = new HashMap<>();
             map2.put("key3", 3);
             map2.put("key4", 4);
@@ -115,4 +116,9 @@
             e.printStackTrace();
         }
     }
+
+    @Test
+    public void main() throws IOException {
+        writeExample();
+    }
 }
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-types/avro-map/avro-map.03.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-types/avro-map/avro-map.03.query.sqlpp
index b97d9f8..c2ca9ee 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-types/avro-map/avro-map.03.query.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-types/avro-map/avro-map.03.query.sqlpp
@@ -24,6 +24,6 @@

 USE test;

-SELECT RAW a.mapField.key1
+SELECT RAW a.mapField[0]
 FROM AvroDataset a
 ORDER BY a.id;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.02.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.02.adm
index 5560bf9..92d5ea1 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.02.adm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.02.adm
@@ -1,2 +1,2 @@
-{ "key1": 1, "key2": 2 }
-{ "key3": 3, "key4": 4 }
\ No newline at end of file
+[ { "key": "key1", "value": 1 }, { "key": "key2", "value": 2 } ]
+[ { "key": "key3", "value": 3 }, { "key": "key4", "value": 4 } ]
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.03.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.03.adm
index fe0b81f..73b283a 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.03.adm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.03.adm
@@ -1,2 +1,2 @@
-1
-null
\ No newline at end of file
+{ "key": "key1", "value": 1 }
+{ "key": "key3", "value": 3 }
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-union/avro-union.02.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-union/avro-union.02.adm
index 15f8776..8fd0212 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-union/avro-union.02.adm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-union/avro-union.02.adm
@@ -1,2 +1,2 @@
 42
-"Example string"
\ No newline at end of file
+hex("0105")
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 57474b3..1e90f98 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -365,7 +365,7 @@
         <placeholder name="adapter" value="S3" />
         <output-dir compare="Text">none</output-dir>
         <source-location>false</source-location>
-        <expected-error>Not an Avro data file.</expected-error>
+        <expected-error>Malformed input stream</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="external-dataset">
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java
index e048890..3f92f00 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java
@@ -28,6 +28,8 @@
 import java.util.Map;
 import java.util.function.Supplier;

+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
@@ -35,6 +37,7 @@
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.IFeedLogManager;
+import org.apache.avro.InvalidAvroMagicException;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
@@ -133,12 +136,15 @@
     }

     private boolean advance() throws IOException {
-        if (inputStream.advance()) {
-            DatumReader<GenericRecord> datumReader = new 
GenericDatumReader<>();
-            dataFileStream = new DataFileStream<>(inputStream, datumReader);
-            return true;
+        try {
+            if (inputStream.advance()) {
+                DatumReader<GenericRecord> datumReader = new 
GenericDatumReader<>();
+                dataFileStream = new DataFileStream<>(inputStream, 
datumReader);
+                return true;
+            }
+        } catch (InvalidAvroMagicException e) {
+            throw new 
RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM, e);
         }
-
         return false;
     }

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
index 985e2b5..1ebe982 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -132,11 +132,9 @@
             streamRecordReader.configure(context.getTaskContext(), 
streamFactory.createInputStream(context),
                     configuration);
             return streamRecordReader;
-        } catch (InstantiationException | IllegalAccessException | 
InvocationTargetException
-                | NoSuchMethodException e) {
+        } catch (InstantiationException | IllegalAccessException | 
InvocationTargetException | NoSuchMethodException
+                | IOException e) {
             throw HyracksDataException.create(e);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
         }
     }
 }
\ No newline at end of file
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
index d760c1f..6ee74d7 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
@@ -91,19 +91,26 @@
     }

     private void parseMap(Schema mapSchema, Map<String, ?> map, DataOutput 
out) throws IOException {
-        Schema valueSchema = mapSchema.getValueType();
-        final IMutableValueStorage valueBuffer = 
parserContext.enterCollection();
-        final IMutableValueStorage keyBuffer = parserContext.enterCollection();
+        final IMutableValueStorage item = parserContext.enterCollection();
+        final IMutableValueStorage valueBuffer = parserContext.enterObject();
         IARecordBuilder objectBuilder = 
parserContext.getObjectBuilder(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+        IAsterixListBuilder listBuilder =
+                
parserContext.getCollectionBuilder(DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE);
         for (Map.Entry<String, ?> entry : map.entrySet()) {
-            keyBuffer.reset();
+            objectBuilder.reset(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
             valueBuffer.reset();
-            serializeString(entry.getKey(), Schema.Type.STRING, 
keyBuffer.getDataOutput());
-            parseValue(valueSchema, entry.getValue(), 
valueBuffer.getDataOutput());
-            objectBuilder.addField(keyBuffer, valueBuffer);
+            serializeString(entry.getKey(), Schema.Type.STRING, 
valueBuffer.getDataOutput());
+            
objectBuilder.addField(parserContext.getSerializedFieldName("key"), 
valueBuffer);
+            valueBuffer.reset();
+            parseValue(mapSchema.getValueType(), entry.getValue(), 
valueBuffer.getDataOutput());
+            
objectBuilder.addField(parserContext.getSerializedFieldName("value"), 
valueBuffer);
+            item.reset();
+            objectBuilder.write(item.getDataOutput(), true);
+            listBuilder.addItem(item);
         }
-        objectBuilder.write(out, true);
+        listBuilder.write(out, true);
         parserContext.exitObject(valueBuffer, null, objectBuilder);
+        parserContext.exitCollection(item, listBuilder);
     }

     private final void parseUnion(Schema unionSchema, Object value, DataOutput 
out) throws IOException {
@@ -134,7 +141,7 @@
             case BOOLEAN:
                 return value instanceof Boolean;
             case BYTES:
-                return value instanceof Byte;
+                return value instanceof ByteBuffer;
             case RECORD:
                 return value instanceof GenericData.Record;
             default:

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18201
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I584873a47bf409351d6b63979117616bce415c8f
Gerrit-Change-Number: 18201
Gerrit-PatchSet: 6
Gerrit-Owner: Ayush Tripathi <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Hussain Towaileb <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Murtadha Makki Al Hubail <[email protected]>
Gerrit-Reviewer: Peeyush Gupta <[email protected]>
Gerrit-Reviewer: Wail Alkowaileet
Gerrit-Reviewer: Wail Alkowaileet <[email protected]>
Gerrit-MessageType: merged

Reply via email to