From Wail Alkowaileet <[email protected]>: Wail Alkowaileet has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18201 )
Change subject: [ASTERIXDB-3367][EXT] Avro Parser for Union types for Bytes fix, Change in Map implementation. ...................................................................... [ASTERIXDB-3367][EXT] Avro Parser for Union types for Bytes fix, Change in Map implementation. Change-Id: I584873a47bf409351d6b63979117616bce415c8f Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18201 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Wail Alkowaileet <[email protected]> --- M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-types/avro-map/avro-map.03.query.sqlpp M asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.03.adm M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java M asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.02.adm M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileExampleGeneratorUtil.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java M asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-union/avro-union.02.adm 9 files changed, 57 insertions(+), 27 deletions(-) Approvals: Wail Alkowaileet: Looks good to me, approved Jenkins: Verified; Verified Anon. E. 
Moose #1000171: diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileExampleGeneratorUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileExampleGeneratorUtil.java index d62d2d1..60f4b83 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileExampleGeneratorUtil.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileExampleGeneratorUtil.java @@ -31,11 +31,12 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; import org.apache.avro.specific.SpecificDatumWriter; +import org.junit.Test; public class AvroFileExampleGeneratorUtil { private static final String SCHEMA_STRING = "{\n" + " \"type\": \"record\",\n" + " \"name\": \"SimpleRecord\",\n" + " \"namespace\": \"com.example\",\n" + " \"fields\": [\n" + " {\n" - + " \"name\": \"unionField\",\n" + " \"type\": [\"int\", \"string\"],\n" + + " \"name\": \"unionField\",\n" + " \"type\": [\"int\", \"string\", \"bytes\"],\n" + " \"doc\": \"This field can be either an int or a string.\"\n" + " },\n" + " {\n" + " \"name\": \"mapField\",\n" + " \"type\": {\n" + " \"type\": \"map\",\n" + " \"values\": \"int\",\n" + " \"doc\": \"This is a map of string keys to int values.\"\n" @@ -96,7 +97,7 @@ //second record to be added GenericRecord record2 = new GenericData.Record(schema); - record2.put("unionField", "Example string"); + record2.put("unionField", ByteBuffer.wrap(new byte[] { 0x01, 0x05 })); Map<String, Integer> map2 = new HashMap<>(); map2.put("key3", 3); map2.put("key4", 4); @@ -115,4 +116,9 @@ e.printStackTrace(); } } + + @Test + public void main() throws IOException { + writeExample(); + } } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-types/avro-map/avro-map.03.query.sqlpp 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-types/avro-map/avro-map.03.query.sqlpp index b97d9f8..c2ca9ee 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-types/avro-map/avro-map.03.query.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-types/avro-map/avro-map.03.query.sqlpp @@ -24,6 +24,6 @@ USE test; -SELECT RAW a.mapField.key1 +SELECT RAW a.mapField[0] FROM AvroDataset a ORDER BY a.id; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.02.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.02.adm index 5560bf9..92d5ea1 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.02.adm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.02.adm @@ -1,2 +1,2 @@ -{ "key1": 1, "key2": 2 } -{ "key3": 3, "key4": 4 } \ No newline at end of file +[ { "key": "key1", "value": 1 }, { "key": "key2", "value": 2 } ] +[ { "key": "key3", "value": 3 }, { "key": "key4", "value": 4 } ] diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.03.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.03.adm index fe0b81f..73b283a 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.03.adm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-map/avro-map.03.adm @@ -1,2 +1,2 @@ -1 -null \ No newline at end of file +{ "key": 
"key1", "value": 1 } +{ "key": "key3", "value": 3 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-union/avro-union.02.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-union/avro-union.02.adm index 15f8776..8fd0212 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-union/avro-union.02.adm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-types/avro-union/avro-union.02.adm @@ -1,2 +1,2 @@ 42 -"Example string" \ No newline at end of file +hex("0105") diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml index 57474b3..1e90f98 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml @@ -365,7 +365,7 @@ <placeholder name="adapter" value="S3" /> <output-dir compare="Text">none</output-dir> <source-location>false</source-location> - <expected-error>Not an Avro data file.</expected-error> + <expected-error>Malformed input stream</expected-error> </compilation-unit> </test-case> <test-case FilePath="external-dataset"> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java index e048890..3f92f00 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java +++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroRecordReader.java @@ -28,6 +28,8 @@ import java.util.Map; import java.util.function.Supplier; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.api.IRawRecord; import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; @@ -35,6 +37,7 @@ import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.asterix.external.util.IFeedLogManager; +import org.apache.avro.InvalidAvroMagicException; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; @@ -133,12 +136,15 @@ } private boolean advance() throws IOException { - if (inputStream.advance()) { - DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(); - dataFileStream = new DataFileStream<>(inputStream, datumReader); - return true; + try { + if (inputStream.advance()) { + DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(); + dataFileStream = new DataFileStream<>(inputStream, datumReader); + return true; + } + } catch (InvalidAvroMagicException e) { + throw new RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM, e); } - return false; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java index 985e2b5..1ebe982 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java +++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java @@ -132,11 +132,9 @@ streamRecordReader.configure(context.getTaskContext(), streamFactory.createInputStream(context), configuration); return streamRecordReader; - } catch (InstantiationException | IllegalAccessException | InvocationTargetException - | NoSuchMethodException e) { + } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException + | IOException e) { throw HyracksDataException.create(e); - } catch (IOException e) { - throw new RuntimeException(e); } } } \ No newline at end of file diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java index d760c1f..6ee74d7 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java @@ -91,19 +91,26 @@ } private void parseMap(Schema mapSchema, Map<String, ?> map, DataOutput out) throws IOException { - Schema valueSchema = mapSchema.getValueType(); - final IMutableValueStorage valueBuffer = parserContext.enterCollection(); - final IMutableValueStorage keyBuffer = parserContext.enterCollection(); + final IMutableValueStorage item = parserContext.enterCollection(); + final IMutableValueStorage valueBuffer = parserContext.enterObject(); IARecordBuilder objectBuilder = parserContext.getObjectBuilder(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE); + IAsterixListBuilder listBuilder = + parserContext.getCollectionBuilder(DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE); for (Map.Entry<String, ?> entry : map.entrySet()) { - keyBuffer.reset(); + objectBuilder.reset(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE); valueBuffer.reset(); - 
serializeString(entry.getKey(), Schema.Type.STRING, keyBuffer.getDataOutput()); - parseValue(valueSchema, entry.getValue(), valueBuffer.getDataOutput()); - objectBuilder.addField(keyBuffer, valueBuffer); + serializeString(entry.getKey(), Schema.Type.STRING, valueBuffer.getDataOutput()); + objectBuilder.addField(parserContext.getSerializedFieldName("key"), valueBuffer); + valueBuffer.reset(); + parseValue(mapSchema.getValueType(), entry.getValue(), valueBuffer.getDataOutput()); + objectBuilder.addField(parserContext.getSerializedFieldName("value"), valueBuffer); + item.reset(); + objectBuilder.write(item.getDataOutput(), true); + listBuilder.addItem(item); } - objectBuilder.write(out, true); + listBuilder.write(out, true); parserContext.exitObject(valueBuffer, null, objectBuilder); + parserContext.exitCollection(item, listBuilder); } private final void parseUnion(Schema unionSchema, Object value, DataOutput out) throws IOException { @@ -134,7 +141,7 @@ case BOOLEAN: return value instanceof Boolean; case BYTES: - return value instanceof Byte; + return value instanceof ByteBuffer; case RECORD: return value instanceof GenericData.Record; default: -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18201 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: I584873a47bf409351d6b63979117616bce415c8f Gerrit-Change-Number: 18201 Gerrit-PatchSet: 6 Gerrit-Owner: Ayush Tripathi <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Makki Al Hubail <[email protected]> Gerrit-Reviewer: Peeyush Gupta <[email protected]> Gerrit-Reviewer: Wail Alkowaileet Gerrit-Reviewer: Wail Alkowaileet <[email protected]> Gerrit-MessageType: merged
