Wail Alkowaileet has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2076
Change subject: [WIP] Jackson parser for JSON format
......................................................................
[WIP] Jackson parser for JSON format
Currently it gives around a 4X speedup compared with ADMDataParser.
TODO:
- Support streams.
- Add tests.
- Check the ability to extend Jackson parser to include ADM types.
Change-Id: Iacf9e496dbe2146f5eeeb1506b945991c300a7de
---
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
M asterixdb/asterix-external-data/pom.xml
A
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractNestedDataParser.java
A
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java
A
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java
M
asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
M
asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IAsterixListBuilder.java
9 files changed, 642 insertions(+), 8 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/76/2076/1
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index f960ce5..17e9dbe 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -238,6 +238,7 @@
public static final int METADATA_DROP_FUCTION_IN_USE = 3109;
public static final int FEED_FAILED_WHILE_GETTING_A_NEW_RECORD = 3110;
public static final int FEED_START_FEED_WITHOUT_CONNECTION = 3111;
+ public static final int PARSER_COLLECTION_ITEM_CANNOT_BE_NULL = 3112;
// Lifecycle management errors
public static final int DUPLICATE_PARTITION_ID = 4000;
diff --git
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 7362181..23d852d 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -227,6 +227,7 @@
3109 = Function %1$s is being used. It cannot be dropped
3110 = Feed failed while reading a new record
3111 = Feed %1$s is not connected to any dataset
+3112 = Array/Multiset item cannot be null
# Lifecycle management errors
4000 = Partition id %1$d for node %2$s already in use by node %3$s
diff --git a/asterixdb/asterix-external-data/pom.xml
b/asterixdb/asterix-external-data/pom.xml
index 37f91ce..eda44cf 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -412,5 +412,9 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractNestedDataParser.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractNestedDataParser.java
new file mode 100644
index 0000000..2080a96
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractNestedDataParser.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.asterix.builders.AbvsBuilderFactory;
+import org.apache.asterix.builders.IARecordBuilder;
+import org.apache.asterix.builders.IAsterixListBuilder;
+import org.apache.asterix.builders.ListBuilderFactory;
+import org.apache.asterix.builders.RecordBuilderFactory;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.om.base.AUnorderedList;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AUnorderedListType;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import org.apache.asterix.om.util.container.IObjectPool;
+import org.apache.asterix.om.util.container.ListObjectPool;
+import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IMutableValueStorage;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+/**
+ * Common methods for JSON and ADM parsers
+ * TODO (wyk): find a way to use Jackson parser for ADM extensions.
+ */
+public abstract class AbstractNestedDataParser extends AbstractDataParser {
+
+ private final IObjectPool<IARecordBuilder, ATypeTag> objectBuilderPool =
+ new ListObjectPool<>(new RecordBuilderFactory());
+ private final IObjectPool<IAsterixListBuilder, ATypeTag> arrayBuilderPool =
+ new ListObjectPool<>(new ListBuilderFactory());
+ private final IObjectPool<IMutableValueStorage, ATypeTag> abvsBuilderPool =
+ new ListObjectPool<>(new AbvsBuilderFactory());
+ private final TreeMap<Integer, BitSet> nullBitmapPool = new TreeMap<>();
+ private final Map<String, IMutableValueStorage> serializedFieldNames = new
HashMap<>();
+
+ /**
+ * Parses one object (record) and writes its serialized form to {@code out}.
+ *
+ * @param recordType
+ * declared type of the object being parsed.
+ * NOTE(review): the earlier comment allowed {@code null} for open objects, but the
+ * code in this change passes RecordUtil.FULLY_OPEN_RECORD_TYPE for that case and
+ * JSONDataParser dereferences this argument -- confirm null is not expected.
+ * @param out
+ * destination of the serialized object
+ * @throws IOException
+ * as declared by the signature; the concrete failures depend on the implementation
+ */
+ protected abstract void parseObject(ARecordType recordType, DataOutput
out) throws IOException;
+
+ /**
+ * Parses one array and writes its serialized form to {@code out}.
+ *
+ * NOTE: currently AsterixDB only supports null values for open collection types.
+ *
+ * @param listType
+ * declared type of the array being parsed.
+ * NOTE(review): the earlier comment allowed {@code null} for open arrays, but
+ * JSONDataParser dereferences this argument -- confirm null is not expected.
+ * @param out
+ * destination of the serialized array
+ * @throws IOException
+ * as declared by the signature; the concrete failures depend on the implementation
+ */
+ protected abstract void parseArray(AOrderedListType listType, DataOutput
out) throws IOException;
+
+ /**
+ * Parses one multiset and writes its serialized form to {@code out}.
+ *
+ * NOTE: currently AsterixDB only supports null values for open collection types.
+ *
+ * @param listType
+ * declared type of the multiset being parsed.
+ * NOTE(review): the parameter is declared as AUnorderedList (imported from
+ * om.base) while parseArray takes AOrderedListType (from om.types); this looks
+ * like it was meant to be AUnorderedListType -- confirm before relying on it.
+ * @param out
+ * destination of the serialized multiset
+ * @throws HyracksDataException
+ * as declared by the signature; the concrete failures depend on the implementation
+ */
+ protected abstract void parseMultiset(AUnorderedList listType, DataOutput
out) throws HyracksDataException;
+
+    /**
+     * Allocates a record builder from the object-builder pool and resets it for
+     * {@code recordType}.
+     *
+     * @param recordType
+     *            type handed to the builder's reset
+     * @return the pooled builder, reset and ready to receive fields
+     */
+    protected IARecordBuilder getObjectBuilder(ARecordType recordType) {
+        final IARecordBuilder builder = objectBuilderPool.allocate(ATypeTag.OBJECT);
+        builder.reset(recordType);
+        return builder;
+    }
+
+    /**
+     * Allocates a list builder from the pool, keyed by the collection's type tag,
+     * and resets it for {@code collectionType}.
+     *
+     * @param collectionType
+     *            type of the collection to build
+     * @return the pooled builder, reset and ready to receive items
+     */
+    protected IAsterixListBuilder getCollectionBuilder(AbstractCollectionType collectionType) {
+        final IAsterixListBuilder builder = arrayBuilderPool.allocate(collectionType.getTypeTag());
+        builder.reset(collectionType);
+        return builder;
+    }
+
+    /**
+     * Allocates a scratch buffer from the buffer pool and resets it.
+     *
+     * @return a reset buffer taken from the pool
+     */
+    protected IMutableValueStorage getTempBuffer() {
+        final IMutableValueStorage scratch = abvsBuilderPool.allocate(ATypeTag.BINARY);
+        scratch.reset();
+        return scratch;
+    }
+
+    /**
+     * Returns the serialized form of {@code fieldName}, serializing it on first use
+     * and caching the result in {@code serializedFieldNames} for later calls.
+     * Experimental: meant to find out whether field-name serialization cost is high.
+     *
+     * @param fieldName
+     *            name of the (open) field
+     * @return buffer holding the serialized field name
+     * @throws HyracksDataException
+     *             if serializing the name fails
+     */
+    protected IMutableValueStorage getSerializedFieldName(String fieldName) throws HyracksDataException {
+        final IMutableValueStorage cached = serializedFieldNames.get(fieldName);
+        if (cached != null) {
+            return cached;
+        }
+
+        // First time this name is seen: serialize once and remember the bytes.
+        final IMutableValueStorage serialized = new ArrayBackedValueStorage();
+        aStringFieldName.setValue(fieldName);
+        stringSerde.serialize(aStringFieldName, serialized.getDataOutput());
+        serializedFieldNames.put(fieldName, serialized);
+        return serialized;
+    }
+
+    // Number of bitmaps handed out since the last resetPools() call; it is also
+    // the key of the next bitmap to hand out from nullBitmapPool.
+    private int nullBitmapsInUse = 0;
+
+    /**
+     * Returns an empty bitmap for tracking which declared fields of one object
+     * have been seen.
+     *
+     * Every call between two {@link #resetPools()} calls returns a distinct
+     * instance, so a nested object never shares (and never overwrites) the bitmap
+     * of its enclosing object. Instances are reused after {@link #resetPools()} and
+     * are always cleared before being returned, so bits never leak from one record
+     * to the next.
+     *
+     * @param size
+     *            number of declared fields; used only as the initial capacity
+     * @return a bitmap with no bit set
+     */
+    protected BitSet getNullBitMap(int size) {
+        BitSet nullBitMap = nullBitmapPool.get(nullBitmapsInUse);
+        if (nullBitMap == null) {
+            nullBitMap = new BitSet(size);
+            nullBitmapPool.put(nullBitmapsInUse, nullBitMap);
+        }
+        nullBitmapsInUse++;
+        // BitSet grows on demand, so a pooled instance created for a smaller
+        // object is still usable; it only has to start out empty.
+        nullBitMap.clear();
+        return nullBitMap;
+    }
+
+    /**
+     * Releases every pooled builder, buffer and bitmap for reuse. Must be called
+     * once before each top-level record is parsed.
+     */
+    protected void resetPools() {
+        objectBuilderPool.reset();
+        arrayBuilderPool.reset();
+        abvsBuilderPool.reset();
+        nullBitmapsInUse = 0;
+    }
+
+    /**
+     * Tells whether {@code definedType} is a union type that reports itself as nullable.
+     *
+     * @param definedType
+     *            declared type, may be {@code null}
+     * @return {@code true} only for a UNION-tagged type whose isNullableType() is true
+     */
+    protected boolean isNullableType(IAType definedType) {
+        final boolean isUnion = definedType != null && definedType.getTypeTag() == ATypeTag.UNION;
+        return isUnion && ((AUnionType) definedType).isNullableType();
+    }
+
+    /**
+     * Tells whether {@code definedType} is a union type that reports itself as missable.
+     *
+     * @param definedType
+     *            declared type, may be {@code null}
+     * @return {@code true} only for a UNION-tagged type whose isMissableType() is true
+     */
+    protected boolean isMissableType(IAType definedType) {
+        final boolean isUnion = definedType != null && definedType.getTypeTag() == ATypeTag.UNION;
+        return isUnion && ((AUnionType) definedType).isMissableType();
+    }
+
+    /**
+     * Verifies that every declared field that did not appear in the parsed object
+     * is allowed to be missing.
+     *
+     * The scan covers all declared fields. It must not be bounded by
+     * {@code BitSet.length()}, which is the index of the highest set bit plus one
+     * and would therefore skip every absent field that follows the last present one.
+     *
+     * @param recordType
+     *            declared type of the parsed object
+     * @param nullBitmap
+     *            bit {@code i} is set when declared field {@code i} was present
+     * @throws RuntimeDataException
+     *             if an absent field is not missable
+     */
+    protected void checkOptionalConstraints(ARecordType recordType, BitSet nullBitmap)
+            throws RuntimeDataException {
+        final IAType[] fieldTypes = recordType.getFieldTypes();
+        // Visit only the clear bits, i.e. the declared fields that were not parsed.
+        for (int i = nullBitmap.nextClearBit(0); i < fieldTypes.length; i = nullBitmap.nextClearBit(i + 1)) {
+            if (!isMissableType(fieldTypes[i])) {
+                throw new RuntimeDataException(ErrorCode.PARSER_TWEET_PARSER_CLOSED_FIELD_NULL,
+                        recordType.getFieldNames()[i]);
+            }
+        }
+    }
+
+    /**
+     * Reconciles the declared type of a value with the type that was actually parsed.
+     *
+     * An optional (UNION-tagged) declared type is unwrapped first; comparing the
+     * UNION tag itself against the parsed tag can never match and would reject
+     * every optional field.
+     *
+     * @param definedType
+     *            type defined by the user; {@code null} is treated like ANY
+     * @param parsedTypeTag
+     *            tag of the value that was parsed
+     * @return
+     *         the NULL type when a null was parsed for a nullable declared type;
+     *         the fully open record/array/multiset type, or ANY for flat values,
+     *         when the declared type is {@code null} or ANY;
+     *         otherwise the (unwrapped) declared type when it equals the parsed
+     *         tag or the parsed value can be converted to it
+     * @throws RuntimeDataException
+     *             on a type mismatch
+     */
+    protected IAType checkAndGetActualType(IAType definedType, ATypeTag parsedTypeTag)
+            throws RuntimeDataException {
+        IAType expectedType = definedType;
+        if (expectedType != null && expectedType.getTypeTag() == ATypeTag.UNION) {
+            final AUnionType unionType = (AUnionType) expectedType;
+            if (parsedTypeTag == ATypeTag.NULL && unionType.isNullableType()) {
+                return BuiltinType.ANULL;
+            }
+            expectedType = unionType.getActualType();
+        }
+
+        if (expectedType == null || expectedType.getTypeTag() == ATypeTag.ANY) {
+            switch (parsedTypeTag) {
+                case OBJECT:
+                    return RecordUtil.FULLY_OPEN_RECORD_TYPE;
+                case ARRAY:
+                    return AOrderedListType.FULL_OPEN_ORDEREDLIST_TYPE;
+                case MULTISET:
+                    return AUnorderedListType.FULLY_OPEN_UNORDEREDLIST_TYPE;
+                default:
+                    return BuiltinType.ANY;
+            }
+        }
+
+        final ATypeTag expectedTag = expectedType.getTypeTag();
+        if (expectedTag == parsedTypeTag || isConvertable(parsedTypeTag, expectedTag)) {
+            return expectedType;
+        }
+
+        // definedType cannot be null here: a null declared type returned above.
+        throw new RuntimeDataException(ErrorCode.PARSER_ADM_DATA_PARSER_TYPE_MISMATCH,
+                definedType.getTypeName());
+    }
+
+    /**
+     * Decides whether a value parsed as {@code parsedTypeTag} may be stored as
+     * {@code definedTypeTag}.
+     * Strings are a special case: they may stand for UUID, date, time and datetime
+     * values. Everything else follows the promote/demote rules of ATypeHierarchy.
+     *
+     * @param parsedTypeTag
+     *            tag of the parsed value
+     * @param definedTypeTag
+     *            tag of the declared type
+     * @return
+     *         true if it can be converted
+     *         false otherwise
+     */
+    protected boolean isConvertable(ATypeTag parsedTypeTag, ATypeTag definedTypeTag) {
+        final boolean stringBackedTarget = definedTypeTag == ATypeTag.UUID || definedTypeTag == ATypeTag.DATE
+                || definedTypeTag == ATypeTag.TIME || definedTypeTag == ATypeTag.DATETIME;
+        if (parsedTypeTag == ATypeTag.STRING && stringBackedTarget) {
+            return true;
+        }
+        return ATypeHierarchy.canPromote(parsedTypeTag, definedTypeTag)
+                || ATypeHierarchy.canDemote(parsedTypeTag, definedTypeTag);
+    }
+
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java
new file mode 100644
index 0000000..40c287d
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.BitSet;
+
+import org.apache.asterix.builders.IARecordBuilder;
+import org.apache.asterix.builders.IAsterixListBuilder;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IStreamDataParser;
+import org.apache.asterix.om.base.ABoolean;
+import org.apache.asterix.om.base.ANull;
+import org.apache.asterix.om.base.AUnorderedList;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.asterix.runtime.exceptions.UnsupportedTypeException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IMutableValueStorage;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+
+public class JSONDataParser extends AbstractNestedDataParser implements
IStreamDataParser, IRecordDataParser<char[]> {
+
+ private final JsonFactory jsonFactory;
+ private final ARecordType rootType;
+
+ private JsonParser jsonParser;
+
+ /**
+ * Creates a parser for records of the given type.
+ *
+ * @param recordType
+ * declared type of the root record; when {@code null} the fully open record
+ * type is used instead
+ * @param jsonFactory
+ * factory used to create a Jackson parser for each record
+ * @param isStream
+ * currently unused by this constructor
+ */
+ public JSONDataParser(ARecordType recordType, JsonFactory jsonFactory,
boolean isStream) {
+ this.rootType = recordType != null ? recordType :
RecordUtil.FULLY_OPEN_RECORD_TYPE;
+ this.jsonFactory = jsonFactory;
+ }
+
+ /*
+ ****************************************************
+ * Public methods
+ ****************************************************
+ */
+
+    /**
+     * Parses one raw JSON record and writes the serialized object to {@code out}.
+     *
+     * @param record
+     *            raw record; only its first {@code record.size()} characters are
+     *            parsed, since the backing array may be larger than the record
+     * @param out
+     *            destination of the serialized record
+     * @throws HyracksDataException
+     *             with the original error code for parser-level violations (type
+     *             mismatch, extra field, null in a non-nullable field, ...), or with
+     *             RECORD_READER_MALFORMED_INPUT_STREAM when the input is not a
+     *             well-formed JSON object
+     */
+    @Override
+    public void parse(IRawRecord<? extends char[]> record, DataOutput out) throws HyracksDataException {
+        try {
+            resetPools();
+            jsonParser = jsonFactory.createParser(record.get(), 0, record.size());
+            // parseObject assumes the parser sits on START_OBJECT; reject empty
+            // input and non-object roots here instead of failing obscurely later.
+            if (jsonParser.nextToken() != JsonToken.START_OBJECT) {
+                throw new RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM);
+            }
+            parseObject(rootType, out);
+        } catch (HyracksDataException e) {
+            // Already carries a specific error code; do not mask it as malformed input.
+            throw e;
+        } catch (IOException e) {
+            throw new RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM, e);
+        }
+    }
+
+ /**
+ * Stream input is not implemented yet (see the TODO list of this change):
+ * the given stream is ignored.
+ */
+ @Override
+ public void setInputStream(InputStream in) throws IOException {
+ // TODO: implement stream support; currently a no-op
+
+ }
+
+ /**
+ * Stream parsing is not implemented yet: nothing is written to {@code out}
+ * and {@code false} is always returned.
+ */
+ @Override
+ public boolean parse(DataOutput out) throws HyracksDataException {
+ // TODO: implement stream support; currently never produces a record
+ return false;
+ }
+
+ /**
+ * Stream input is not implemented yet: the given stream is ignored and
+ * {@code false} is always returned.
+ */
+ @Override
+ public boolean reset(InputStream in) throws IOException {
+ // TODO: implement stream support; currently a no-op
+ return false;
+ }
+
+ /*
+ ****************************************************
+ * Complex types parsers
+ ****************************************************
+ */
+
+ /**
+ * Parses the fields of the current JSON object, up to its END_OBJECT token, and
+ * writes the resulting record to {@code out}.
+ * Callers in this class invoke it while the parser is positioned on the object's
+ * opening token.
+ *
+ * @param recordType
+ * declared type of the object; dereferenced, so it must not be null
+ * @param out
+ * destination of the serialized record
+ * @throws IOException
+ * on a tokenizer failure, an undeclared field in a closed type, a null in a
+ * non-nullable declared field, or a violation reported by
+ * checkOptionalConstraints
+ */
+ @Override
+ protected void parseObject(ARecordType recordType, DataOutput out) throws
IOException {
+ final IARecordBuilder objectBuilder = getObjectBuilder(recordType);
+ final IMutableValueStorage valueBuffer = getTempBuffer();
+ // one bit per declared field; a bit is set once that field has been parsed
+ final BitSet nullBitMap =
getNullBitMap(recordType.getFieldTypes().length);
+ // each iteration consumes a field-name token followed by that field's value
+ while (jsonParser.nextToken() != JsonToken.END_OBJECT) {
+ final String fieldName = jsonParser.getCurrentName();
+ final int fieldIndex = recordType.getFieldIndex(fieldName);
+
+ // a negative index means the field is not declared in recordType
+ if (!recordType.isOpen() && fieldIndex < 0) {
+ throw new
RuntimeDataException(ErrorCode.PARSER_ADM_DATA_PARSER_EXTRA_FIELD_IN_CLOSED_RECORD,
+ fieldName);
+ }
+ valueBuffer.reset();
+ // advance from the field name to the first token of its value
+ jsonParser.nextToken();
+
+ if (fieldIndex < 0) {
+ // undeclared field of an open type: parse as ANY and add it by name
+ parseValue(BuiltinType.ANY, valueBuffer.getDataOutput());
+ objectBuilder.addField(getSerializedFieldName(fieldName),
valueBuffer);
+ } else {
+ // declared field: parse against its declared type and add it by index
+ final IAType fieldType = recordType.getFieldType(fieldName);
+
+ // fail fast if a null is given for a field that is not nullable
+ if (jsonParser.currentToken() == JsonToken.VALUE_NULL &&
!isNullableType(fieldType)) {
+ throw new
RuntimeDataException(ErrorCode.PARSER_TWEET_PARSER_CLOSED_FIELD_NULL,
fieldName);
+ }
+
+ nullBitMap.set(fieldIndex);
+ parseValue(fieldType, valueBuffer.getDataOutput());
+ objectBuilder.addField(fieldIndex, valueBuffer);
+ }
+ }
+
+ // throws if a declared field that was not parsed is not allowed to be missing
+ checkOptionalConstraints(recordType, nullBitMap);
+
+ objectBuilder.write(out, true);
+ }
+
+    /**
+     * Parses the items of the current JSON array, up to its END_ARRAY token, and
+     * writes the resulting list to {@code out}.
+     * Null items are accepted only when the declared item type is ANY.
+     *
+     * @param listType
+     *            declared type of the array
+     * @param out
+     *            destination of the serialized list
+     * @throws IOException
+     *             on a tokenizer failure, a null item in a typed array, or an item
+     *             that cannot be parsed as the declared item type
+     */
+    @Override
+    protected void parseArray(AOrderedListType listType, DataOutput out) throws IOException {
+        final IAsterixListBuilder listBuilder = getCollectionBuilder(listType);
+        final IMutableValueStorage itemBuffer = getTempBuffer();
+        final boolean openItems = listType.getItemType().getTypeTag() == ATypeTag.ANY;
+
+        for (JsonToken token = jsonParser.nextToken(); token != JsonToken.END_ARRAY; token =
+                jsonParser.nextToken()) {
+            itemBuffer.reset();
+
+            // typed arrays cannot hold nulls: fail before attempting to parse the item
+            if (!openItems && token == JsonToken.VALUE_NULL) {
+                throw new RuntimeDataException(ErrorCode.PARSER_COLLECTION_ITEM_CANNOT_BE_NULL);
+            }
+
+            final IAType expectedItemType = openItems ? BuiltinType.ANY : listType.getItemType();
+            parseValue(expectedItemType, itemBuffer.getDataOutput());
+            listBuilder.addItem(itemBuffer);
+        }
+
+        listBuilder.write(out, true);
+    }
+
+ /**
+ * Multisets are not supported by the JSON parser: this method always throws.
+ *
+ * @throws HyracksDataException
+ * always, as an UnsupportedTypeException for the unordered-list type tag
+ */
+ @Override
+ protected void parseMultiset(AUnorderedList listType, DataOutput out)
throws HyracksDataException {
+ throw new UnsupportedTypeException("JSON parser",
ATypeTag.SERIALIZED_UNORDEREDLIST_TYPE_TAG);
+
+ }
+
+ /*
+ ****************************************************
+ * Value parsers and serializers
+ ****************************************************
+ */
+ /**
+ * Serializes the value at the parser's current token to {@code out}.
+ * The token is first mapped to a type tag and reconciled with {@code definedType}
+ * by checkAndGetActualType, which throws on a mismatch; the token kind then
+ * selects how the value is written. Objects and arrays recurse into
+ * parseObject / parseArray.
+ *
+ * @param definedType
+ * declared type of the value (ANY for undeclared fields and open items)
+ * @param out
+ * destination of the serialized value
+ * @throws IOException
+ * on a type mismatch, an unsupported token, or a tokenizer failure
+ */
+ private void parseValue(IAType definedType, DataOutput out) throws
IOException {
+ final ATypeTag currentTypeTag = mapCurrentToken();
+ final IAType actualType = checkAndGetActualType(definedType,
currentTypeTag);
+ switch (jsonParser.currentToken()) {
+ case VALUE_NULL:
+ nullSerde.serialize(ANull.NULL, out);
+ break;
+ case VALUE_FALSE:
+ booleanSerde.serialize(ABoolean.FALSE, out);
+ break;
+ case VALUE_TRUE:
+ booleanSerde.serialize(ABoolean.TRUE, out);
+ break;
+ case VALUE_NUMBER_INT:
+ case VALUE_NUMBER_FLOAT:
+ // the numeric width is chosen from the reconciled type, not from the token
+ serailizeNumeric(actualType.getTypeTag(), out);
+ break;
+ case VALUE_STRING:
+ // strings may also stand for date/time/datetime values of the declared type
+ serializeString(actualType.getTypeTag(), out);
+ break;
+ case START_OBJECT:
+ parseObject((ARecordType) actualType, out);
+ break;
+ case START_ARRAY:
+ parseArray((AOrderedListType) actualType, out);
+ break;
+ default:
+ throw new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED,
jsonParser.currentToken().toString());
+ }
+ }
+
+ /**
+ * Writes the current numeric token to {@code out} as a value of {@code numericType}.
+ * When {@code numericType} is ANY, the tag returned by mapCurrentToken is used
+ * instead (BIGINT for integer tokens, DOUBLE for floating-point tokens).
+ *
+ * NOTE(review): the method name is misspelt ("serailize"); renaming it also
+ * requires updating the call in parseValue.
+ * NOTE(review): the value is read with the getter matching the target type
+ * whatever the token kind was -- confirm that Jackson's coercion and range
+ * checks give the intended behavior (e.g. a fractional number for an INTEGER field).
+ *
+ * @param numericType
+ * target numeric type tag, or ANY
+ * @param out
+ * destination of the serialized number
+ * @throws IOException
+ * if the target type is not one of the handled numeric types, or on a
+ * tokenizer failure
+ */
+ private void serailizeNumeric(ATypeTag numericType, DataOutput out) throws
IOException {
+ final ATypeTag typeToUse = numericType == ATypeTag.ANY ?
mapCurrentToken() : numericType;
+
+ switch (typeToUse) {
+ case BIGINT:
+ aInt64.setValue(jsonParser.getLongValue());
+ int64Serde.serialize(aInt64, out);
+ break;
+ case INTEGER:
+ aInt32.setValue(jsonParser.getIntValue());
+ int32Serde.serialize(aInt32, out);
+ break;
+ case SMALLINT:
+ aInt16.setValue(jsonParser.getShortValue());
+ int16Serde.serialize(aInt16, out);
+ break;
+ case TINYINT:
+ aInt8.setValue(jsonParser.getByteValue());
+ int8Serde.serialize(aInt8, out);
+ break;
+ case DOUBLE:
+ aDouble.setValue(jsonParser.getDoubleValue());
+ doubleSerde.serialize(aDouble, out);
+ break;
+ case FLOAT:
+ aFloat.setValue(jsonParser.getFloatValue());
+ floatSerde.serialize(aFloat, out);
+ break;
+ default:
+ // the reported argument is the JSON token, not the unsupported target type
+ throw new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED,
jsonParser.currentToken().toString());
+ }
+ }
+
+ /**
+ * Writes the current string token to {@code out}, either as a plain string
+ * (ANY / STRING) or parsed into a date, datetime or time value.
+ *
+ * NOTE(review): isConvertable in AbstractNestedDataParser also accepts
+ * STRING -> UUID, but there is no UUID case here, so such a value ends in the
+ * default branch -- confirm whether UUID should be handled.
+ *
+ * @param stringVariantType
+ * target type tag of the string value
+ * @param out
+ * destination of the serialized value
+ * @throws IOException
+ * if the target type is not one of the handled types, or on a tokenizer or
+ * parsing failure
+ */
+ private void serializeString(ATypeTag stringVariantType, DataOutput out)
throws IOException {
+ final String stringValue = jsonParser.getValueAsString();
+ switch (stringVariantType) {
+ case ANY:
+ case STRING:
+ aString.setValue(stringValue);
+ stringSerde.serialize(aString, out);
+ break;
+ case DATE:
+ parseDate(stringValue, out);
+ break;
+ case DATETIME:
+ parseDateTime(stringValue, out);
+ break;
+ case TIME:
+ parseTime(stringValue, out);
+ break;
+ default:
+ // the reported argument is the JSON token, not the unsupported target type
+ throw new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED,
jsonParser.currentToken().toString());
+
+ }
+ }
+
+ /**
+ * Maps the parser's current JSON token to the type tag used for type checking.
+ * Integer tokens map to BIGINT and floating-point tokens to DOUBLE; tokens
+ * without a value mapping (e.g. field names or end markers) map to ANY.
+ *
+ * @return the type tag corresponding to the current token
+ */
+ private ATypeTag mapCurrentToken() {
+ switch (jsonParser.currentToken()) {
+ case VALUE_FALSE:
+ case VALUE_TRUE:
+ return ATypeTag.BOOLEAN;
+ case VALUE_STRING:
+ return ATypeTag.STRING;
+ case VALUE_NULL:
+ return ATypeTag.NULL;
+ case VALUE_NUMBER_FLOAT:
+ return ATypeTag.DOUBLE;
+ case VALUE_NUMBER_INT:
+ return ATypeTag.BIGINT;
+ case START_OBJECT:
+ return ATypeTag.OBJECT;
+ case START_ARRAY:
+ return ATypeTag.ARRAY;
+ default:
+ return ATypeTag.ANY;
+ }
+ }
+
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java
index 489cf77..394fcb3 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java
@@ -18,24 +18,24 @@
*/
package org.apache.asterix.external.parser.factory;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import
org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IStreamDataParser;
-import
org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
import org.apache.asterix.external.parser.ADMDataParser;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
public class ADMDataParserFactory extends
AbstractRecordStreamParserFactory<char[]> {
private static final long serialVersionUID = 1L;
- private static final List<String> parserFormats = Collections
- .unmodifiableList(Arrays.asList("adm", "json", "semi-structured"));
+ private static final List<String> parserFormats =
+ Collections.unmodifiableList(Arrays.asList("adm",
"semi-structured"));
@Override
public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext
ctx) {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java
new file mode 100644
index 0000000..0a2255e
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.factory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import
org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IStreamDataParser;
+import org.apache.asterix.external.parser.JSONDataParser;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.exceptions.UnsupportedTypeException;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import com.fasterxml.jackson.core.JsonFactory;
+
+/**
+ * Factory of {@link JSONDataParser} instances for the "json" format, for both
+ * record-based and stream-based sources. Declared record types that contain
+ * types the JSON parser cannot produce are rejected when a parser is created.
+ */
+public class JSONDataParserFactory extends AbstractRecordStreamParserFactory<char[]> {
+
+    private static final long serialVersionUID = 1L;
+    private static final List<String> parserFormats = Collections.unmodifiableList(Arrays.asList("json"));
+    // Type tags rejected by checkTypeCompatibility.
+    private static final List<ATypeTag> uncompatibleTypes = Collections.unmodifiableList(
+            Arrays.asList(ATypeTag.MULTISET, ATypeTag.CIRCLE, ATypeTag.POINT, ATypeTag.POINT3D, ATypeTag.POLYGON,
+                    ATypeTag.RECTANGLE, ATypeTag.LINE, ATypeTag.INTERVAL, ATypeTag.DAYTIMEDURATION,
+                    ATypeTag.DURATION));
+    private final JsonFactory jsonFactory = new JsonFactory();
+
+    @Override
+    public IStreamDataParser createInputStreamParser(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        return createParser();
+    }
+
+    @Override
+    public void setMetaType(ARecordType metaType) {
+        // no MetaType to set.
+    }
+
+    @Override
+    public List<String> getParserFormats() {
+        return parserFormats;
+    }
+
+    @Override
+    public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext ctx) throws HyracksDataException {
+        return createParser();
+    }
+
+    @Override
+    public Class<?> getRecordClass() {
+        return char[].class;
+    }
+
+    /**
+     * Validates the configured record type and creates a parser for it.
+     *
+     * @return a new parser sharing this factory's JsonFactory
+     * @throws HyracksDataException
+     *             if the record type contains a type the JSON parser does not support
+     */
+    private JSONDataParser createParser() throws HyracksDataException {
+        checkRecordTypeCompatibility(recordType);
+        return new JSONDataParser(recordType, jsonFactory,
+                ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.STREAM));
+    }
+
+    /**
+     * Check if the defined type contains ADM special types.
+     * If it contains unsupported types: throw an exception,
+     * proceed otherwise.
+     *
+     * @param recordType
+     *            record type to validate; {@code null} is accepted and means there
+     *            is nothing to validate (JSONDataParser substitutes the fully open
+     *            record type for a null record type)
+     * @throws HyracksDataException
+     *             if any field (at any nesting level) has an unsupported type
+     */
+    private void checkRecordTypeCompatibility(ARecordType recordType) throws HyracksDataException {
+        if (recordType == null) {
+            return;
+        }
+        for (IAType type : recordType.getFieldTypes()) {
+            checkTypeCompatibility(type);
+        }
+    }
+
+    /**
+     * Rejects {@code type} if it is unsupported, recursing into array item types
+     * and nested record fields.
+     *
+     * NOTE(review): a UNION-tagged (optional) type matches none of the branches
+     * below, so the type it wraps is not checked -- confirm whether optional
+     * fields should be unwrapped here.
+     *
+     * @param type
+     *            type to validate
+     * @throws HyracksDataException
+     *             if the type, or a type nested in it, is unsupported
+     */
+    private void checkTypeCompatibility(IAType type) throws HyracksDataException {
+        if (uncompatibleTypes.contains(type.getTypeTag())) {
+            throw new UnsupportedTypeException("JSON parser", type.getTypeTag().serialize());
+        } else if (type.getTypeTag() == ATypeTag.ARRAY) {
+            checkTypeCompatibility(((AOrderedListType) type).getItemType());
+        } else if (type.getTypeTag() == ATypeTag.OBJECT) {
+            checkRecordTypeCompatibility((ARecordType) type);
+        }
+    }
+
+}
diff --git
a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
index 79289b0..7ce2048 100644
---
a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
+++
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
@@ -16,6 +16,7 @@
# under the License.
#
org.apache.asterix.external.parser.factory.ADMDataParserFactory
+org.apache.asterix.external.parser.factory.JSONDataParserFactory
org.apache.asterix.external.parser.factory.DelimitedDataParserFactory
org.apache.asterix.external.parser.factory.HiveDataParserFactory
org.apache.asterix.external.parser.factory.RecordWithMetadataParserFactory
diff --git
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IAsterixListBuilder.java
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IAsterixListBuilder.java
index 5aa95ef..9a7d0c4ee 100644
---
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IAsterixListBuilder.java
+++
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IAsterixListBuilder.java
@@ -29,7 +29,7 @@
* @param listType
* Type of the list: AUnorderedListType or AOrderedListType.
*/
- public void reset(AbstractCollectionType listType) throws
HyracksDataException;
+ public void reset(AbstractCollectionType listType);
/**
* @param item
--
To view, visit https://asterix-gerrit.ics.uci.edu/2076
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Iacf9e496dbe2146f5eeeb1506b945991c300a7de
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Wail Alkowaileet <[email protected]>