Wail Alkowaileet has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2076

Change subject: [WIP] Jackson parser for JSON format
......................................................................

[WIP] Jackson parser for JSON format

Currently it gives around 4X speedup compared with ADMDataParser.
TODO:
- Support streams.
- Add tests.
- Check the ability to extend Jackson parser to include ADM types.

Change-Id: Iacf9e496dbe2146f5eeeb1506b945991c300a7de
---
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
M asterixdb/asterix-external-data/pom.xml
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractNestedDataParser.java
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java
M 
asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
M 
asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IAsterixListBuilder.java
9 files changed, 642 insertions(+), 8 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/76/2076/1

diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index f960ce5..17e9dbe 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -238,6 +238,7 @@
     public static final int METADATA_DROP_FUCTION_IN_USE = 3109;
     public static final int FEED_FAILED_WHILE_GETTING_A_NEW_RECORD = 3110;
     public static final int FEED_START_FEED_WITHOUT_CONNECTION = 3111;
+    public static final int PARSER_COLLECTION_ITEM_CANNOT_BE_NULL = 3112;
 
     // Lifecycle management errors
     public static final int DUPLICATE_PARTITION_ID = 4000;
diff --git 
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties 
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 7362181..23d852d 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -227,6 +227,7 @@
 3109 = Function %1$s is being used. It cannot be dropped
 3110 = Feed failed while reading a new record
 3111 = Feed %1$s is not connected to any dataset
+3112 = Array/Multiset item cannot be null
 
 # Lifecycle management errors
 4000 = Partition id %1$d for node %2$s already in use by node %3$s
diff --git a/asterixdb/asterix-external-data/pom.xml 
b/asterixdb/asterix-external-data/pom.xml
index 37f91ce..eda44cf 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -412,5 +412,9 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractNestedDataParser.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractNestedDataParser.java
new file mode 100644
index 0000000..2080a96
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractNestedDataParser.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.asterix.builders.AbvsBuilderFactory;
+import org.apache.asterix.builders.IARecordBuilder;
+import org.apache.asterix.builders.IAsterixListBuilder;
+import org.apache.asterix.builders.ListBuilderFactory;
+import org.apache.asterix.builders.RecordBuilderFactory;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.om.base.AUnorderedList;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AUnorderedListType;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import org.apache.asterix.om.util.container.IObjectPool;
+import org.apache.asterix.om.util.container.ListObjectPool;
+import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IMutableValueStorage;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+/**
+ * Common methods for JSON and ADM parsers
+ * TODO (wyk): find a way to use Jakson parser for ADM extensions.
+ */
+public abstract class AbstractNestedDataParser extends AbstractDataParser {
+
+    private final IObjectPool<IARecordBuilder, ATypeTag> objectBuilderPool =
+            new ListObjectPool<>(new RecordBuilderFactory());
+    private final IObjectPool<IAsterixListBuilder, ATypeTag> arrayBuilderPool =
+            new ListObjectPool<>(new ListBuilderFactory());
+    private final IObjectPool<IMutableValueStorage, ATypeTag> abvsBuilderPool =
+            new ListObjectPool<>(new AbvsBuilderFactory());
+    private final TreeMap<Integer, BitSet> nullBitmapPool = new TreeMap<>();
+    private final Map<String, IMutableValueStorage> serializedFieldNames = new 
HashMap<>();
+
+    /**
+     * Parse object using the defined recordType.
+     *
+     * @param recordType
+     *            {@value null} if the parsing open object
+     * @param out
+     * @throws HyracksDataException
+     */
+    protected abstract void parseObject(ARecordType recordType, DataOutput 
out) throws IOException;
+
+    /**
+     * Parse array using the defined listType.
+     *
+     * NOTE: currently AsterixDB only supports null values for open collection 
types.
+     *
+     * @param recordType
+     *            {@value null} if the parsing open array
+     * @param out
+     * @throws HyracksDataException
+     */
+    protected abstract void parseArray(AOrderedListType listType, DataOutput 
out) throws IOException;
+
+    /**
+     * Parse multiset using the defined listType.
+     *
+     * NOTE: currently AsterixDB only supports null values for open collection 
types.
+     *
+     * @param recordType
+     *            {@value null} if the parsing open multiset
+     * @param out
+     * @throws HyracksDataException
+     */
+    protected abstract void parseMultiset(AUnorderedList listType, DataOutput 
out) throws HyracksDataException;
+
+    protected IARecordBuilder getObjectBuilder(ARecordType recordType) {
+        final IARecordBuilder objectBuilder = 
objectBuilderPool.allocate(ATypeTag.OBJECT);
+        objectBuilder.reset(recordType);
+        return objectBuilder;
+    }
+
+    protected IAsterixListBuilder getCollectionBuilder(AbstractCollectionType 
collectionType) {
+        final ATypeTag collectionTypeTag = collectionType.getTypeTag();
+        final IAsterixListBuilder collectionBuilder = 
arrayBuilderPool.allocate(collectionTypeTag);
+        collectionBuilder.reset(collectionType);
+        return collectionBuilder;
+    }
+
+    protected IMutableValueStorage getTempBuffer() {
+        IMutableValueStorage tempBuffer = 
abvsBuilderPool.allocate(ATypeTag.BINARY);
+        tempBuffer.reset();
+        return tempBuffer;
+    }
+
+    /**
+     * Experimental idea to see if serializing cost is high.
+     *
+     * @param fieldName
+     * @return
+     * @throws HyracksDataException
+     */
+    protected IMutableValueStorage getSerializedFieldName(String fieldName) 
throws HyracksDataException {
+        IMutableValueStorage serializedFieldName = 
serializedFieldNames.get(fieldName);
+        if (serializedFieldName == null) {
+            aStringFieldName.setValue(fieldName);
+            serializedFieldName = new ArrayBackedValueStorage();
+            stringSerde.serialize(aStringFieldName, 
serializedFieldName.getDataOutput());
+            serializedFieldNames.put(fieldName, serializedFieldName);
+        }
+        return serializedFieldName;
+    }
+
+    protected BitSet getNullBitMap(int size) {
+        final Map.Entry<Integer, BitSet> entry = 
nullBitmapPool.ceilingEntry(size);
+        BitSet nullBitMap = entry != null ? entry.getValue() : null;
+        if (nullBitMap == null) {
+            nullBitMap = new BitSet(size);
+            nullBitmapPool.put(size, nullBitMap);
+        }
+        return nullBitMap;
+    }
+
+    protected void resetPools() {
+        objectBuilderPool.reset();
+        arrayBuilderPool.reset();
+        abvsBuilderPool.reset();
+    }
+
+    protected boolean isNullableType(IAType definedType) {
+        if (definedType == null || definedType.getTypeTag() != ATypeTag.UNION) 
{
+            return false;
+        }
+
+        return ((AUnionType) definedType).isNullableType();
+    }
+
+    protected boolean isMissableType(IAType definedType) {
+        if (definedType == null || definedType.getTypeTag() != ATypeTag.UNION) 
{
+            return false;
+        }
+
+        return ((AUnionType) definedType).isMissableType();
+    }
+
+    protected void checkOptionalConstraints(ARecordType recordType, BitSet 
nullBitmap) throws RuntimeDataException {
+        for (int i = 0; i < nullBitmap.length(); i++) {
+            if (!nullBitmap.get(i) && 
!isMissableType(recordType.getFieldTypes()[i])) {
+                throw new 
RuntimeDataException(ErrorCode.PARSER_TWEET_PARSER_CLOSED_FIELD_NULL,
+                        recordType.getFieldNames()[i]);
+            }
+        }
+    }
+
+    /**
+     * Parser is not expecting definedType to be null.
+     *
+     * @param definedType
+     *            type defined by the user.
+     * @param parsedTypeTag
+     *            parsed type.
+     * @return
+     *         definedType == null => fully_open_complex_type | ANY for flat 
values
+     *         defiendType == parsedTypeTag OR canBeConverted => return 
definedType
+     * 
+     * @throws RuntimeDataException
+     *             type mismatch
+     */
+    protected IAType checkAndGetActualType(IAType definedType, ATypeTag 
parsedTypeTag) throws RuntimeDataException {
+        if (definedType == null || definedType.getTypeTag() == ATypeTag.ANY) {
+            switch (parsedTypeTag) {
+                case OBJECT:
+                    return RecordUtil.FULLY_OPEN_RECORD_TYPE;
+                case ARRAY:
+                    return AOrderedListType.FULL_OPEN_ORDEREDLIST_TYPE;
+                case MULTISET:
+                    return AUnorderedListType.FULLY_OPEN_UNORDEREDLIST_TYPE;
+                default:
+                    return BuiltinType.ANY;
+            }
+        } else if (definedType.getTypeTag() == parsedTypeTag
+                || isConvertable(parsedTypeTag, definedType.getTypeTag())) {
+            return definedType;
+        }
+
+        throw new 
RuntimeDataException(ErrorCode.PARSER_ADM_DATA_PARSER_TYPE_MISMATCH, 
definedType.getTypeName());
+    }
+
+    /**
+     * Check promote/demote rules for mismatched types.
+     * String type is a special case as it can be parsed as 
date/time/datetime/UUID
+     *
+     * @param parsedTypeTag
+     * @param definedTypeTag
+     * @return
+     *         true if it can be converted
+     *         false otherwise
+     */
+    protected boolean isConvertable(ATypeTag parsedTypeTag, ATypeTag 
definedTypeTag) {
+        boolean convertable = parsedTypeTag == ATypeTag.STRING;
+
+        convertable &= definedTypeTag == ATypeTag.UUID || definedTypeTag == 
ATypeTag.DATE
+                || definedTypeTag == ATypeTag.TIME || definedTypeTag == 
ATypeTag.DATETIME;
+
+        return convertable || ATypeHierarchy.canPromote(parsedTypeTag, 
definedTypeTag)
+                || ATypeHierarchy.canDemote(parsedTypeTag, definedTypeTag);
+    }
+
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java
new file mode 100644
index 0000000..40c287d
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.BitSet;
+
+import org.apache.asterix.builders.IARecordBuilder;
+import org.apache.asterix.builders.IAsterixListBuilder;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IStreamDataParser;
+import org.apache.asterix.om.base.ABoolean;
+import org.apache.asterix.om.base.ANull;
+import org.apache.asterix.om.base.AUnorderedList;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.asterix.runtime.exceptions.UnsupportedTypeException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IMutableValueStorage;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+
+public class JSONDataParser extends AbstractNestedDataParser implements 
IStreamDataParser, IRecordDataParser<char[]> {
+
+    private final JsonFactory jsonFactory;
+    private final ARecordType rootType;
+
+    private JsonParser jsonParser;
+
+    public JSONDataParser(ARecordType recordType, JsonFactory jsonFactory, 
boolean isStream) {
+        this.rootType = recordType != null ? recordType : 
RecordUtil.FULLY_OPEN_RECORD_TYPE;
+        this.jsonFactory = jsonFactory;
+    }
+
+    /*
+     ****************************************************
+     * Public methods
+     ****************************************************
+     */
+
+    @Override
+    public void parse(IRawRecord<? extends char[]> record, DataOutput out) 
throws HyracksDataException {
+        try {
+            resetPools();
+            jsonParser = jsonFactory.createParser(record.get());
+            jsonParser.nextToken();
+            parseObject(rootType, out);
+        } catch (IOException e) {
+            throw new 
RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM, e);
+        }
+    }
+
+    @Override
+    public void setInputStream(InputStream in) throws IOException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public boolean parse(DataOutput out) throws HyracksDataException {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public boolean reset(InputStream in) throws IOException {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    /*
+     ****************************************************
+     * Complex types parsers
+     ****************************************************
+     */
+
+    @Override
+    protected void parseObject(ARecordType recordType, DataOutput out) throws 
IOException {
+        final IARecordBuilder objectBuilder = getObjectBuilder(recordType);
+        final IMutableValueStorage valueBuffer = getTempBuffer();
+        final BitSet nullBitMap = 
getNullBitMap(recordType.getFieldTypes().length);
+        while (jsonParser.nextToken() != JsonToken.END_OBJECT) {
+            final String fieldName = jsonParser.getCurrentName();
+            final int fieldIndex = recordType.getFieldIndex(fieldName);
+
+            if (!recordType.isOpen() && fieldIndex < 0) {
+                throw new 
RuntimeDataException(ErrorCode.PARSER_ADM_DATA_PARSER_EXTRA_FIELD_IN_CLOSED_RECORD,
+                        fieldName);
+            }
+            valueBuffer.reset();
+            jsonParser.nextToken();
+
+            if (fieldIndex < 0) {
+                //field is not defined and the type is open
+                parseValue(BuiltinType.ANY, valueBuffer.getDataOutput());
+                objectBuilder.addField(getSerializedFieldName(fieldName), 
valueBuffer);
+            } else {
+                //field is defined
+                final IAType fieldType = recordType.getFieldType(fieldName);
+
+                //fail fast if the current field is not nullable
+                if (jsonParser.currentToken() == JsonToken.VALUE_NULL && 
!isNullableType(fieldType)) {
+                    throw new 
RuntimeDataException(ErrorCode.PARSER_TWEET_PARSER_CLOSED_FIELD_NULL, 
fieldName);
+                }
+
+                nullBitMap.set(fieldIndex);
+                parseValue(fieldType, valueBuffer.getDataOutput());
+                objectBuilder.addField(fieldIndex, valueBuffer);
+            }
+        }
+
+        //Throws exception if there is a violation
+        checkOptionalConstraints(recordType, nullBitMap);
+
+        objectBuilder.write(out, true);
+    }
+
+    @Override
+    protected void parseArray(AOrderedListType listType, DataOutput out) 
throws IOException {
+        final IAsterixListBuilder arrayBuilder = 
getCollectionBuilder(listType);
+        final IMutableValueStorage valueBuffer = getTempBuffer();
+        final boolean isOpen = listType.getItemType().getTypeTag() == 
ATypeTag.ANY;
+        while (jsonParser.nextToken() != JsonToken.END_ARRAY) {
+            valueBuffer.reset();
+
+            if (isOpen) {
+                parseValue(BuiltinType.ANY, valueBuffer.getDataOutput());
+            } else {
+                //fail fast if current value is null
+                if (jsonParser.currentToken() == JsonToken.VALUE_NULL) {
+                    throw new 
RuntimeDataException(ErrorCode.PARSER_COLLECTION_ITEM_CANNOT_BE_NULL);
+                }
+                parseValue(listType.getItemType(), 
valueBuffer.getDataOutput());
+            }
+            arrayBuilder.addItem(valueBuffer);
+        }
+
+        arrayBuilder.write(out, true);
+    }
+
+    @Override
+    protected void parseMultiset(AUnorderedList listType, DataOutput out) 
throws HyracksDataException {
+        throw new UnsupportedTypeException("JSON parser", 
ATypeTag.SERIALIZED_UNORDEREDLIST_TYPE_TAG);
+
+    }
+
+    /*
+     ****************************************************
+     * Value parsers and serializers
+     ****************************************************
+     */
+    private void parseValue(IAType definedType, DataOutput out) throws 
IOException {
+        final ATypeTag currentTypeTag = mapCurrentToken();
+        final IAType actualType = checkAndGetActualType(definedType, 
currentTypeTag);
+        switch (jsonParser.currentToken()) {
+            case VALUE_NULL:
+                nullSerde.serialize(ANull.NULL, out);
+                break;
+            case VALUE_FALSE:
+                booleanSerde.serialize(ABoolean.FALSE, out);
+                break;
+            case VALUE_TRUE:
+                booleanSerde.serialize(ABoolean.TRUE, out);
+                break;
+            case VALUE_NUMBER_INT:
+            case VALUE_NUMBER_FLOAT:
+                serailizeNumeric(actualType.getTypeTag(), out);
+                break;
+            case VALUE_STRING:
+                serializeString(actualType.getTypeTag(), out);
+                break;
+            case START_OBJECT:
+                parseObject((ARecordType) actualType, out);
+                break;
+            case START_ARRAY:
+                parseArray((AOrderedListType) actualType, out);
+                break;
+            default:
+                throw new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, 
jsonParser.currentToken().toString());
+        }
+    }
+
+    private void serailizeNumeric(ATypeTag numericType, DataOutput out) throws 
IOException {
+        final ATypeTag typeToUse = numericType == ATypeTag.ANY ? 
mapCurrentToken() : numericType;
+
+        switch (typeToUse) {
+            case BIGINT:
+                aInt64.setValue(jsonParser.getLongValue());
+                int64Serde.serialize(aInt64, out);
+                break;
+            case INTEGER:
+                aInt32.setValue(jsonParser.getIntValue());
+                int32Serde.serialize(aInt32, out);
+                break;
+            case SMALLINT:
+                aInt16.setValue(jsonParser.getShortValue());
+                int16Serde.serialize(aInt16, out);
+                break;
+            case TINYINT:
+                aInt8.setValue(jsonParser.getByteValue());
+                int8Serde.serialize(aInt8, out);
+                break;
+            case DOUBLE:
+                aDouble.setValue(jsonParser.getDoubleValue());
+                doubleSerde.serialize(aDouble, out);
+                break;
+            case FLOAT:
+                aFloat.setValue(jsonParser.getFloatValue());
+                floatSerde.serialize(aFloat, out);
+                break;
+            default:
+                throw new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, 
jsonParser.currentToken().toString());
+        }
+    }
+
+    private void serializeString(ATypeTag stringVariantType, DataOutput out) 
throws IOException {
+        final String stringValue = jsonParser.getValueAsString();
+        switch (stringVariantType) {
+            case ANY:
+            case STRING:
+                aString.setValue(stringValue);
+                stringSerde.serialize(aString, out);
+                break;
+            case DATE:
+                parseDate(stringValue, out);
+                break;
+            case DATETIME:
+                parseDateTime(stringValue, out);
+                break;
+            case TIME:
+                parseTime(stringValue, out);
+                break;
+            default:
+                throw new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, 
jsonParser.currentToken().toString());
+
+        }
+    }
+
+    private ATypeTag mapCurrentToken() {
+        switch (jsonParser.currentToken()) {
+            case VALUE_FALSE:
+            case VALUE_TRUE:
+                return ATypeTag.BOOLEAN;
+            case VALUE_STRING:
+                return ATypeTag.STRING;
+            case VALUE_NULL:
+                return ATypeTag.NULL;
+            case VALUE_NUMBER_FLOAT:
+                return ATypeTag.DOUBLE;
+            case VALUE_NUMBER_INT:
+                return ATypeTag.BIGINT;
+            case START_OBJECT:
+                return ATypeTag.OBJECT;
+            case START_ARRAY:
+                return ATypeTag.ARRAY;
+            default:
+                return ATypeTag.ANY;
+        }
+    }
+
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java
index 489cf77..394fcb3 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/ADMDataParserFactory.java
@@ -18,24 +18,24 @@
  */
 package org.apache.asterix.external.parser.factory;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import 
org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IStreamDataParser;
-import 
org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
 import org.apache.asterix.external.parser.ADMDataParser;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
 public class ADMDataParserFactory extends 
AbstractRecordStreamParserFactory<char[]> {
 
     private static final long serialVersionUID = 1L;
-    private static final List<String> parserFormats = Collections
-            .unmodifiableList(Arrays.asList("adm", "json", "semi-structured"));
+    private static final List<String> parserFormats =
+            Collections.unmodifiableList(Arrays.asList("adm", 
"semi-structured"));
 
     @Override
     public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext 
ctx) {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java
new file mode 100644
index 0000000..0a2255e
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.factory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import 
org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IStreamDataParser;
+import org.apache.asterix.external.parser.JSONDataParser;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.exceptions.UnsupportedTypeException;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import com.fasterxml.jackson.core.JsonFactory;
+
+public class JSONDataParserFactory extends 
AbstractRecordStreamParserFactory<char[]> {
+
+    private static final long serialVersionUID = 1L;
+    private static final List<String> parserFormats = 
Collections.unmodifiableList(Arrays.asList("json"));
+    private static final List<ATypeTag> uncompatibleTypes = 
Collections.unmodifiableList(
+            Arrays.asList(ATypeTag.MULTISET, ATypeTag.CIRCLE, ATypeTag.POINT, 
ATypeTag.POINT3D, ATypeTag.POLYGON,
+                    ATypeTag.RECTANGLE, ATypeTag.LINE, ATypeTag.INTERVAL, 
ATypeTag.DAYTIMEDURATION, ATypeTag.DURATION));
+    private final JsonFactory jsonFactory = new JsonFactory();
+
+    @Override
+    public IStreamDataParser createInputStreamParser(IHyracksTaskContext ctx, 
int partition)
+            throws HyracksDataException {
+        return createParser();
+    }
+
+    @Override
+    public void setMetaType(ARecordType metaType) {
+        // no MetaType to set.
+    }
+
+    @Override
+    public List<String> getParserFormats() {
+        return parserFormats;
+    }
+
+    @Override
+    public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext 
ctx) throws HyracksDataException {
+        return createParser();
+    }
+
+    @Override
+    public Class<?> getRecordClass() {
+        return char[].class;
+    }
+
+    private JSONDataParser createParser() throws HyracksDataException {
+        checkRecordTypeCompatibility(recordType);
+        return new JSONDataParser(recordType, jsonFactory,
+                
ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.STREAM));
+    }
+
+    /**
+     * Check if the defined type contains ADM special types.
+     * if it contains unsupported types: throw an exception
+     * proceed otherwise.
+     * 
+     * @param recordType
+     * @throws HyracksDataException
+     */
+    private void checkRecordTypeCompatibility(ARecordType recordType) throws 
HyracksDataException {
+        final IAType[] fieldTypes = recordType.getFieldTypes();
+        for (IAType type : fieldTypes) {
+            checkTypeCompatibility(type);
+        }
+    }
+
+    private void checkTypeCompatibility(IAType type) throws 
HyracksDataException {
+        if (uncompatibleTypes.contains(type.getTypeTag())) {
+            throw new UnsupportedTypeException("JSON parser", 
type.getTypeTag().serialize());
+        } else if (type.getTypeTag() == ATypeTag.ARRAY) {
+            checkTypeCompatibility(((AOrderedListType) type).getItemType());
+        } else if (type.getTypeTag() == ATypeTag.OBJECT) {
+            checkRecordTypeCompatibility((ARecordType) type);
+        }
+    }
+
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
 
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
index 79289b0..7ce2048 100644
--- 
a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
+++ 
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
@@ -16,6 +16,7 @@
 # under the License.
 #
 org.apache.asterix.external.parser.factory.ADMDataParserFactory
+org.apache.asterix.external.parser.factory.JSONDataParserFactory
 org.apache.asterix.external.parser.factory.DelimitedDataParserFactory
 org.apache.asterix.external.parser.factory.HiveDataParserFactory
 org.apache.asterix.external.parser.factory.RecordWithMetadataParserFactory
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IAsterixListBuilder.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IAsterixListBuilder.java
index 5aa95ef..9a7d0c4ee 100644
--- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IAsterixListBuilder.java
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IAsterixListBuilder.java
@@ -29,7 +29,7 @@
      * @param listType
      *            Type of the list: AUnorderedListType or AOrderedListType.
      */
-    public void reset(AbstractCollectionType listType) throws 
HyracksDataException;
+    public void reset(AbstractCollectionType listType);
 
     /**
      * @param item

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2076
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Iacf9e496dbe2146f5eeeb1506b945991c300a7de
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Wail Alkowaileet <[email protected]>

Reply via email to