Yingyi Bu has submitted this change and it was merged. Change subject: TweetParser Extension ......................................................................
TweetParser Extension This patch includes following changes: 1. ExtendedTweetParser to parse more than fix attributes. 2. Changed the twitter feeds message unit from Status to String. Change-Id: I7021e7b779de05b9ec999a8d5f8464fb0ab413c0 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1002 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Yingyi Bu <[email protected]> --- A asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/revised-tweet-parser/revised-tweet-parser.1.ddl.aql A asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/revised-tweet-parser/revised-tweet-parser.2.update.aql M asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java 15 files changed, 482 insertions(+), 154 deletions(-) Approvals: Yingyi Bu: Looks good to me, approved Jenkins: Verified; No violations found; Verified diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/revised-tweet-parser/revised-tweet-parser.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/revised-tweet-parser/revised-tweet-parser.1.ddl.aql new file mode 100644 index 0000000..ddffc43 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/revised-tweet-parser/revised-tweet-parser.1.ddl.aql @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Description : This test query will check the revised + * parser from two perspective: open Tweet + * will automatically includes all fields + * from tweet status; closed TwitterUser + * type will only includes specified fields. + * consumer.secret is missing here to create + * exception. + * Expected Res : Failure + */ + +drop dataverse feeds if exists; +create dataverse feeds; +use dataverse feeds; + +create type TwitterUser as closed{ + screen_name: string, + lang: string, + friends_count: int32, + statuses_count: int32 + }; + +create type Tweet as open +{ + id: int64, + user: TwitterUser +} + +create dataset Tweets (Tweet) +primary key id; + +create feed TwitterFeed using push_twitter( +("type-name"="Tweet"), +("format"="twitter-status"),// +("consumer.key"="************"), +("access.token"="************"), +("access.token.secret"="************")); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/revised-tweet-parser/revised-tweet-parser.2.update.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/revised-tweet-parser/revised-tweet-parser.2.update.aql new file mode 100644 index 0000000..66ede3f --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/revised-tweet-parser/revised-tweet-parser.2.update.aql @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +use dataverse feeds; +connect feed TwitterFeed to dataset Tweets; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml index 6b63ff0..e5710bc 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml @@ -150,6 +150,13 @@ </compilation-unit> </test-case> <test-case FilePath="feeds"> + <compilation-unit name="revised-tweet-parser"> + <output-dir compare="Text">revised-tweet-parser</output-dir> + <expected-error>One or more parameters are missing from adapter configuration</expected-error> + <expected-error>Unknown source feed</expected-error> + </compilation-unit> + </test-case> + <test-case FilePath="feeds"> <compilation-unit name="feed-with-external-parser"> <output-dir compare="Text">feed-with-external-parser</output-dir> </compilation-unit> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java index e31325a..5a7b4b9 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java @@ -27,14 +27,14 @@ import org.apache.asterix.external.input.record.GenericRecord; import org.apache.asterix.external.util.FeedLogManager; import org.apache.hyracks.api.exceptions.HyracksDataException; - import twitter4j.Query; import twitter4j.QueryResult; import twitter4j.Status; import twitter4j.Twitter; import twitter4j.TwitterException; +import twitter4j.TwitterObjectFactory; -public class TwitterPullRecordReader implements IRecordReader<Status> { +public class TwitterPullRecordReader implements IRecordReader<String> { private Query query; private Twitter twitter; @@ -42,18 +42,19 @@ private QueryResult result; private int nextTweetIndex = 0; private long lastTweetIdReceived = 0; - private GenericRecord<Status> record; + private GenericRecord<String> record; public TwitterPullRecordReader(Twitter twitter, String keywords, int requestInterval) { this.twitter = twitter; this.requestInterval = requestInterval; this.query = new Query(keywords); this.query.setCount(100); - this.record = new GenericRecord<Status>(); + this.record = new GenericRecord<>(); } @Override public void close() throws IOException { + // do nothing } @Override @@ -62,7 +63,7 @@ } @Override - public IRawRecord<Status> next() throws IOException, InterruptedException { + public IRawRecord<String> next() throws IOException, InterruptedException { if (result == null || nextTweetIndex >= result.getTweets().size()) { Thread.sleep(1000 * requestInterval); query.setSinceId(lastTweetIdReceived); @@ -79,7 +80,8 @@ if (lastTweetIdReceived < tweet.getId()) { lastTweetIdReceived = tweet.getId(); } - record.set(tweet); + String jsonTweet = TwitterObjectFactory.getRawJSON(tweet); // transform tweet obj to json + record.set(jsonTweet); return record; } else { return null; @@ -93,10 +95,12 @@ @Override public void setFeedLogManager(FeedLogManager feedLogManager) { + // do nothing } @Override public void setController(AbstractFeedDataFlowController controller) { + // do nothing } @Override diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java index f04cdb9..9ead8a9 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java @@ -26,31 +26,31 @@ import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; import org.apache.asterix.external.input.record.GenericRecord; import org.apache.asterix.external.util.FeedLogManager; - import twitter4j.FilterQuery; import twitter4j.StallWarning; import twitter4j.Status; import twitter4j.StatusDeletionNotice; import twitter4j.StatusListener; +import twitter4j.TwitterObjectFactory; import twitter4j.TwitterStream; -public class TwitterPushRecordReader implements IRecordReader<Status> { - private LinkedBlockingQueue<Status> inputQ; +public class TwitterPushRecordReader implements IRecordReader<String> { + private LinkedBlockingQueue<String> inputQ; private TwitterStream twitterStream; - private GenericRecord<Status> record; + private GenericRecord<String> record; private boolean closed = false; public TwitterPushRecordReader(TwitterStream twitterStream, FilterQuery query) { - record = new GenericRecord<Status>(); - inputQ = new LinkedBlockingQueue<Status>(); + record = new GenericRecord<>(); + inputQ = new LinkedBlockingQueue<>(); this.twitterStream = twitterStream;//TwitterUtil.getTwitterStream(configuration); this.twitterStream.addListener(new TweetListener(inputQ)); this.twitterStream.filter(query); } public TwitterPushRecordReader(TwitterStream twitterStream) { - record = new GenericRecord<Status>(); - inputQ = new LinkedBlockingQueue<Status>(); + record = new GenericRecord<>(); + inputQ = new LinkedBlockingQueue<>(); this.twitterStream = twitterStream;// this.twitterStream.addListener(new TweetListener(inputQ)); twitterStream.sample(); @@ -72,8 +72,8 @@ } @Override - public IRawRecord<Status> next() throws IOException, InterruptedException { - Status tweet = inputQ.poll(); + public IRawRecord<String> next() throws IOException, InterruptedException { + String tweet = inputQ.poll(); if (tweet == null) { return null; } @@ -93,45 +93,52 @@ private class TweetListener implements StatusListener { - private LinkedBlockingQueue<Status> inputQ; + private LinkedBlockingQueue<String> inputQ; - public TweetListener(LinkedBlockingQueue<Status> inputQ) { + public TweetListener(LinkedBlockingQueue<String> inputQ) { this.inputQ = inputQ; } @Override public void onStatus(Status tweet) { - inputQ.add(tweet); + String jsonTweet = TwitterObjectFactory.getRawJSON(tweet); + inputQ.add(jsonTweet); } @Override public void onException(Exception arg0) { - + // do nothing } @Override public void onDeletionNotice(StatusDeletionNotice arg0) { + // do nothing } @Override public void onScrubGeo(long arg0, long arg1) { + // do nothing } @Override public void onStallWarning(StallWarning arg0) { + // do nothing } @Override public void onTrackLimitationNotice(int arg0) { + // do nothing } } @Override public void setFeedLogManager(FeedLogManager feedLogManager) { + // do nothing } @Override public void setController(AbstractFeedDataFlowController controller) { + // do nothing } @Override diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java index 541737a..172b22b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java @@ -37,7 +37,7 @@ import twitter4j.FilterQuery; import twitter4j.Status; -public class TwitterRecordReaderFactory implements IRecordReaderFactory<Status> { +public class TwitterRecordReaderFactory implements IRecordReaderFactory<String> { private static final long serialVersionUID = 1L; private static final Logger LOGGER = Logger.getLogger(TwitterRecordReaderFactory.class.getName()); @@ -114,7 +114,7 @@ } @Override - public IRecordReader<? extends Status> createRecordReader(IHyracksTaskContext ctx, int partition) + public IRecordReader<? extends String> createRecordReader(IHyracksTaskContext ctx, int partition) throws HyracksDataException { if (pull) { return new TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration), @@ -133,8 +133,8 @@ } @Override - public Class<? extends Status> getRecordClass() { - return Status.class; + public Class<? extends String> getRecordClass() { + return String.class; } private boolean validateConfiguration(Map<String, String> configuration) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java index 5923354..ab908bf 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java @@ -18,12 +18,6 @@ */ package org.apache.asterix.external.library.java; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.IOException; -import java.util.LinkedHashMap; -import java.util.List; - import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer; import org.apache.asterix.dataflow.data.nontagged.serde.ACircleSerializerDeserializer; @@ -83,13 +77,19 @@ import org.apache.asterix.om.pointables.base.IVisitablePointable; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.ATypeTag; -import org.apache.asterix.om.types.AbstractCollectionType; import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.om.types.EnumDeserializer; import org.apache.asterix.om.types.IAType; +import org.apache.asterix.om.types.TypeTagUtil; import org.apache.asterix.om.util.container.IObjectPool; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.util.string.UTF8StringReader; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.List; public class JObjectAccessors { @@ -465,15 +465,18 @@ List<IVisitablePointable> fieldTypeTags = recordPointable.getFieldTypeTags(); List<IVisitablePointable> fieldNames = recordPointable.getFieldNames(); int index = 0; - boolean closedPart = true; + boolean closedPart; try { IJObject fieldObject = null; for (IVisitablePointable fieldPointable : fieldPointables) { closedPart = index < recordType.getFieldTypes().length; IVisitablePointable tt = fieldTypeTags.get(index); - IAType fieldType = closedPart ? recordType.getFieldTypes()[index] : null; ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER .deserialize(tt.getByteArray()[tt.getStartOffset()]); + IAType fieldType; + fieldType = closedPart ? + recordType.getFieldTypes()[index] : + TypeTagUtil.getBuiltinTypeByTag(typeTag); IVisitablePointable fieldName = fieldNames.get(index); typeInfo.reset(fieldType, typeTag); switch (typeTag) { @@ -486,8 +489,8 @@ // value is null fieldObject = null; } else { - fieldObject = pointableVisitor.visit((AListVisitablePointable) fieldPointable, - typeInfo); + fieldObject = pointableVisitor + .visit((AListVisitablePointable) fieldPointable, typeInfo); } break; case ANY: @@ -536,15 +539,16 @@ List<IVisitablePointable> items = pointable.getItems(); List<IVisitablePointable> itemTags = pointable.getItemTags(); JList list = pointable.ordered() ? new JOrderedList(listType) : new JUnorderedList(listType); - IJObject listItem = null; + IJObject listItem; int index = 0; try { - for (IVisitablePointable itemPointable : items) { IVisitablePointable itemTagPointable = itemTags.get(index); ATypeTag itemTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER .deserialize(itemTagPointable.getByteArray()[itemTagPointable.getStartOffset()]); - typeInfo.reset(listType.getType(), listType.getTypeTag()); + IAType fieldType; + fieldType = TypeTagUtil.getBuiltinTypeByTag(itemTypeTag); + typeInfo.reset(fieldType, itemTypeTag); switch (itemTypeTag) { case RECORD: listItem = pointableVisitor.visit((ARecordVisitablePointable) itemPointable, typeInfo); @@ -557,10 +561,7 @@ throw new IllegalArgumentException( "Cannot parse list item of type " + listType.getTypeTag()); default: - IAType itemType = ((AbstractCollectionType) listType).getItemType(); - typeInfo.reset(itemType, itemType.getTypeTag()); listItem = pointableVisitor.visit((AFlatValuePointable) itemPointable, typeInfo); - } list.add(listItem); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java index 522da06..8d483dc 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java @@ -18,111 +18,236 @@ */ package org.apache.asterix.external.parser; -import java.io.DataOutput; -import java.util.HashMap; -import java.util.Map; - -import org.apache.asterix.builders.RecordBuilder; -import org.apache.asterix.external.api.IDataParser; +import org.apache.asterix.builders.AbvsBuilderFactory; +import org.apache.asterix.builders.IARecordBuilder; +import org.apache.asterix.builders.IAsterixListBuilder; +import org.apache.asterix.builders.ListBuilderFactory; +import org.apache.asterix.builders.RecordBuilderFactory; +import org.apache.asterix.builders.UnorderedListBuilder; import org.apache.asterix.external.api.IRawRecord; import org.apache.asterix.external.api.IRecordDataParser; -import org.apache.asterix.external.library.java.JObjectUtil; -import org.apache.asterix.external.util.Datatypes.Tweet; -import org.apache.asterix.om.base.AMutableDouble; -import org.apache.asterix.om.base.AMutableInt32; -import org.apache.asterix.om.base.AMutableRecord; -import org.apache.asterix.om.base.AMutableString; -import org.apache.asterix.om.base.IAObject; +import org.apache.asterix.om.base.AMutablePoint; +import org.apache.asterix.om.base.ANull; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.om.types.IAType; +import org.apache.asterix.om.util.container.IObjectPool; +import org.apache.asterix.om.util.container.ListObjectPool; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IMutableValueStorage; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.util.string.UTF8StringWriter; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; -import twitter4j.Status; -import twitter4j.User; +import java.io.DataOutput; +import java.io.IOException; -public class TweetParser implements IRecordDataParser<Status> { - - private IAObject[] mutableTweetFields; - private IAObject[] mutableUserFields; - private AMutableRecord mutableRecord; - private AMutableRecord mutableUser; - private final Map<String, Integer> userFieldNameMap = new HashMap<>(); - private final Map<String, Integer> tweetFieldNameMap = new HashMap<>(); - private RecordBuilder recordBuilder = new RecordBuilder(); +public class TweetParser extends AbstractDataParser implements IRecordDataParser<String> { + private final IObjectPool<IARecordBuilder, ATypeTag> recordBuilderPool = new ListObjectPool<>( + new RecordBuilderFactory()); + private final IObjectPool<IAsterixListBuilder, ATypeTag> listBuilderPool = new ListObjectPool<>( + new ListBuilderFactory()); + private final IObjectPool<IMutableValueStorage, ATypeTag> abvsBuilderPool = new ListObjectPool<>( + new AbvsBuilderFactory()); + private ARecordType recordType; + private UTF8StringWriter utf8Writer = new UTF8StringWriter(); public TweetParser(ARecordType recordType) { - initFieldNames(recordType); - mutableUserFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableInt32(0), - new AMutableInt32(0), new AMutableString(null), new AMutableInt32(0) }; - mutableUser = new AMutableRecord((ARecordType) recordType.getFieldTypes()[tweetFieldNameMap.get(Tweet.USER)], - mutableUserFields); - - mutableTweetFields = new IAObject[] { new AMutableString(null), mutableUser, new AMutableDouble(0), - new AMutableDouble(0), new AMutableString(null), new AMutableString(null) }; - mutableRecord = new AMutableRecord(recordType, mutableTweetFields); + this.recordType = recordType; + aPoint = new AMutablePoint(0, 0); } - // Initialize the hashmap values for the field names and positions - private void initFieldNames(ARecordType recordType) { - String tweetFields[] = recordType.getFieldNames(); - for (int i = 0; i < tweetFields.length; i++) { - tweetFieldNameMap.put(tweetFields[i], i); - if (tweetFields[i].equals(Tweet.USER)) { - IAType fieldType = recordType.getFieldTypes()[i]; - if (fieldType.getTypeTag() == ATypeTag.RECORD) { - String userFields[] = ((ARecordType) fieldType).getFieldNames(); - for (int j = 0; j < userFields.length; j++) { - userFieldNameMap.put(userFields[j], j); - } - } + private void parseUnorderedList(JSONArray jArray, DataOutput output) throws IOException, JSONException { + ArrayBackedValueStorage itemBuffer = getTempBuffer(); + UnorderedListBuilder unorderedListBuilder = (UnorderedListBuilder) getUnorderedListBuilder(); + unorderedListBuilder.reset(null); + for (int iter1 = 0; iter1 < jArray.length(); iter1++) { + itemBuffer.reset(); + if (writeField(jArray.get(iter1), null, itemBuffer.getDataOutput())) { + unorderedListBuilder.addItem(itemBuffer); } } + unorderedListBuilder.write(output, true); + } + + private boolean writeField(Object fieldObj, IAType fieldType, DataOutput out) throws IOException, JSONException { + boolean writeResult = true; + if (fieldType != null) { + switch (fieldType.getTypeTag()) { + case STRING: + out.write(BuiltinType.ASTRING.getTypeTag().serialize()); + utf8Writer.writeUTF8(fieldObj.toString(), out); + break; + case INT64: + aInt64.setValue((long) fieldObj); + int64Serde.serialize(aInt64, out); + break; + case INT32: + out.write(BuiltinType.AINT32.getTypeTag().serialize()); + out.writeInt((Integer) fieldObj); + break; + case DOUBLE: + out.write(BuiltinType.ADOUBLE.getTypeTag().serialize()); + out.writeDouble((Double) fieldObj); + break; + case BOOLEAN: + out.write(BuiltinType.ABOOLEAN.getTypeTag().serialize()); + out.writeBoolean((Boolean) fieldObj); + break; + case RECORD: + writeRecord((JSONObject) fieldObj, out, (ARecordType) fieldType); + break; + default: + writeResult = false; + } + } else { + if (fieldObj == JSONObject.NULL) { + nullSerde.serialize(ANull.NULL, out); + } else if (fieldObj instanceof Integer) { + out.write(BuiltinType.AINT32.getTypeTag().serialize()); + out.writeInt((Integer) fieldObj); + } else if (fieldObj instanceof Boolean) { + out.write(BuiltinType.ABOOLEAN.getTypeTag().serialize()); + out.writeBoolean((Boolean) fieldObj); + } else if (fieldObj instanceof Double) { + out.write(BuiltinType.ADOUBLE.getTypeTag().serialize()); + out.writeDouble((Double) fieldObj); + } else if (fieldObj instanceof Long) { + out.write(BuiltinType.AINT64.getTypeTag().serialize()); + out.writeLong((Long) fieldObj); + } else if (fieldObj instanceof String) { + out.write(BuiltinType.ASTRING.getTypeTag().serialize()); + utf8Writer.writeUTF8((String) fieldObj, out); + } else if (fieldObj instanceof JSONArray) { + if (((JSONArray) fieldObj).length() != 0) { + parseUnorderedList((JSONArray) fieldObj, out); + } else { + writeResult = false; + } + } else if (fieldObj instanceof JSONObject) { + if (((JSONObject) fieldObj).length() != 0) { + writeRecord((JSONObject) fieldObj, out, null); + } else { + writeResult = false; + } + } + } + return writeResult; + } + + private int checkAttrNameIdx(String[] nameList, String name) { + int idx = 0; + if (nameList != null) { + for (String nln : nameList) { + if (name.equals(nln)) { + return idx; + } + idx++; + } + } + return -1; + } + + public void writeRecord(JSONObject obj, DataOutput out, ARecordType curRecType) throws IOException, JSONException { + IAType[] curTypes = null; + String[] curFNames = null; + int fieldN; + int attrIdx; + + ArrayBackedValueStorage fieldValueBuffer = getTempBuffer(); + ArrayBackedValueStorage fieldNameBuffer = getTempBuffer(); + IARecordBuilder recBuilder = getRecordBuilder(); + + if (curRecType != null) { + curTypes = curRecType.getFieldTypes(); + curFNames = curRecType.getFieldNames(); + } + + recBuilder.reset(curRecType); + recBuilder.init(); + + if (curRecType != null && !curRecType.isOpen()) { + // closed record type + fieldN = curFNames.length; + for (int iter1 = 0; iter1 < fieldN; iter1++) { + fieldValueBuffer.reset(); + DataOutput fieldOutput = fieldValueBuffer.getDataOutput(); + if (obj.isNull(curFNames[iter1])) { + if (curRecType.isClosedField(curFNames[iter1])) { + throw new HyracksDataException("Closed field " + curFNames[iter1] + " has null value."); + } else { + continue; + } + } else { + if (writeField(obj.get(curFNames[iter1]), curTypes[iter1], fieldOutput)) { + recBuilder.addField(iter1, fieldValueBuffer); + } + } + } + } else { + //open record type + int closedFieldCount = 0; + IAType curFieldType = null; + for (String attrName : JSONObject.getNames(obj)) { + if (obj.isNull(attrName) || obj.length() == 0) { + continue; + } + attrIdx = checkAttrNameIdx(curFNames, attrName); + if (curRecType != null) { + curFieldType = curRecType.getFieldType(attrName); + } + fieldValueBuffer.reset(); + fieldNameBuffer.reset(); + DataOutput fieldOutput = fieldValueBuffer.getDataOutput(); + if (writeField(obj.get(attrName), curFieldType, fieldOutput)) { + if (attrIdx == -1) { + aString.setValue(attrName); + stringSerde.serialize(aString, fieldNameBuffer.getDataOutput()); + recBuilder.addField(fieldNameBuffer, fieldValueBuffer); + } else { + recBuilder.addField(attrIdx, fieldValueBuffer); + closedFieldCount++; + } + } + } + if (curRecType != null && closedFieldCount < curFNames.length) { + throw new HyracksDataException("Non-null field is null"); + } + } + recBuilder.write(out, true); + } + + private IARecordBuilder getRecordBuilder() { + return recordBuilderPool.allocate(ATypeTag.RECORD); + } + + private IAsterixListBuilder getUnorderedListBuilder() { + return listBuilderPool.allocate(ATypeTag.UNORDEREDLIST); + } + + private ArrayBackedValueStorage getTempBuffer() { + return (ArrayBackedValueStorage) abvsBuilderPool.allocate(ATypeTag.BINARY); } @Override - public void parse(IRawRecord<? extends Status> record, DataOutput out) throws HyracksDataException { - Status tweet = record.get(); - User user = tweet.getUser(); - // Tweet user data - ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.SCREEN_NAME)]) - .setValue(JObjectUtil.getNormalizedString(user.getScreenName())); - ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.LANGUAGE)]) - .setValue(JObjectUtil.getNormalizedString(user.getLang())); - ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.FRIENDS_COUNT)]).setValue(user.getFriendsCount()); - ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.STATUS_COUNT)]).setValue(user.getStatusesCount()); - ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.NAME)]) - .setValue(JObjectUtil.getNormalizedString(user.getName())); - ((AMutableInt32) mutableUserFields[userFieldNameMap.get(Tweet.FOLLOWERS_COUNT)]) - .setValue(user.getFollowersCount()); - - // Tweet data - ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.ID)]).setValue(String.valueOf(tweet.getId())); - - int userPos = tweetFieldNameMap.get(Tweet.USER); - for (int i = 0; i < mutableUserFields.length; i++) { - ((AMutableRecord) mutableTweetFields[userPos]).setValueAtPos(i, mutableUserFields[i]); + public void parse(IRawRecord<? extends String> record, DataOutput out) throws HyracksDataException { + try { + //TODO get rid of this temporary json + resetPools(); + JSONObject jsObj = new JSONObject(record.get()); + writeRecord(jsObj, out, recordType); + } catch (JSONException | IOException e) { + throw new HyracksDataException(e); } - if (tweet.getGeoLocation() != null) { - ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LATITUDE)]) - .setValue(tweet.getGeoLocation().getLatitude()); - ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LONGITUDE)]) - .setValue(tweet.getGeoLocation().getLongitude()); - } else { - ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LATITUDE)]).setValue(0); - ((AMutableDouble) mutableTweetFields[tweetFieldNameMap.get(Tweet.LONGITUDE)]).setValue(0); - } - ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.CREATED_AT)]) - .setValue(JObjectUtil.getNormalizedString(tweet.getCreatedAt().toString())); - ((AMutableString) mutableTweetFields[tweetFieldNameMap.get(Tweet.MESSAGE)]) - .setValue(JObjectUtil.getNormalizedString(tweet.getText())); + } - for (int i = 0; i < mutableTweetFields.length; i++) { - mutableRecord.setValueAtPos(i, mutableTweetFields[i]); - } - recordBuilder.reset(mutableRecord.getType()); - recordBuilder.init(); - IDataParser.writeRecord(mutableRecord, out, recordBuilder); + private void resetPools() { + listBuilderPool.reset(); + recordBuilderPool.reset(); + abvsBuilderPool.reset(); } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java index d6e536d..3539f6e 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java @@ -28,7 +28,7 @@ import twitter4j.Status; -public class TweetParserFactory implements IRecordDataParserFactory<Status> { +public class TweetParserFactory implements IRecordDataParserFactory<String> { private static final long serialVersionUID = 1L; private ARecordType recordType; @@ -44,18 +44,19 @@ } @Override - public IRecordDataParser<Status> createRecordParser(IHyracksTaskContext ctx) { + public IRecordDataParser<String> createRecordParser(IHyracksTaskContext ctx) { TweetParser dataParser = new TweetParser(recordType); return dataParser; } @Override - public Class<? extends Status> getRecordClass() { - return Status.class; + public Class<? extends String> getRecordClass() { + return String.class; } @Override public void setMetaType(ARecordType metaType) { + // do nothing } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java index e1a7911..94d7b53 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.external.util; +import org.apache.asterix.external.parser.TweetParser; + public class Datatypes { /* @@ -42,21 +44,109 @@ public static class Tweet { public static final String ID = "id"; public static final String USER = "user"; + public static final String GEOLOCATION = "geo"; + public static final String CREATED_AT = "created_at"; + public static final String TEXT = "text"; + public static final String COUNTRY = "country"; + public static final String PLACE = "place"; + public static final String SOURCE = "source"; + public static final String TRUNCATED = "truncated"; + public static final String IN_REPLY_TO_STATUS_ID = "in_reply_to_status_id"; + public static final String IN_REPLY_TO_USER_ID = "in_reply_to_user_id"; + public static final String IN_REPLY_TO_SCREENNAME = "in_reply_to_screen_name"; + public static final String FAVORITED = "favorited"; + public static final String RETWEETED = "retweeted"; + public static final String FAVORITE_COUNT = "favorite_count"; + public static final String RETWEET_COUNT = "retweet_count"; + public static final String CONTRIBUTORS = "contributors"; + public static final String LANGUAGE = "lang"; + public static final String FILTER_LEVEL = "filter_level"; + public static final String TIMESTAMP_MS = "timestamp_ms"; + public static final String IS_QUOTE_STATUS = "is_quote_status"; + // in API but not int JSON + public static final String SENSITIVE = "sensitive"; + public static final String RETWEETED_BY_ME = "retweeted_by_me"; + public static final String CURRENT_USER_RETWEET_ID = "current_user_retweet_id"; + + // consistency consider + public static final String MESSAGE = "message_text"; public static final String LATITUDE = "latitude"; public static final String LONGITUDE = "longitude"; - public static final String CREATED_AT = "created_at"; - public static final String MESSAGE = "message_text"; - - public static final String COUNTRY = "country"; - // User fields (for the sub record "user") public static final String SCREEN_NAME = "screen_name"; - public static final String LANGUAGE = "language"; + public static final String USER_PREFERRED_LANGUAGE = "user_preferred_language"; public static final String FRIENDS_COUNT = "friends_count"; public static final String STATUS_COUNT = "status_count"; public static final String NAME = "name"; public static final String FOLLOWERS_COUNT = "followers_count"; + } + + public static final class Tweet_Place { + public static final String ID = "id"; + public static final String URL = "url"; + public static final String PLACE_TYPE = "place_type"; + public static final String NAME = "name"; + public static final String FULL_NAME = "full_name"; + public static final String COUNTRY_CODE = "country_code"; + public static final String COUNTRY = "country"; + public static final String BOUNDING_BOX = "bounding_box"; + public static final String ATTRIBUTES = "attributes"; + + private Tweet_Place() { + } + } + + public static final class Tweet_User { + public static final String ID = "id"; + public static final String NAME = "name"; + public static final String SCREEN_NAME = "screen_name"; + public static final String LOCATION = "location"; + public static final String DESCRIPTION = "description"; + public static final String CONTRIBUTORS_ENABLED = "contributors_enabled"; + public static final String PROFILE_IMAGE_URL = "profile_image_url"; + public static final String PROFILE_IMAGE_URL_HTTPS = "profile_image_url_https"; + public static final String URL = "url"; + public static final String PROTECTED = "protected"; + public static final String FOLLOWERS_COUNT = "followers_count"; + public static final String PROFILE_BACKGROUND_COLOR = "profile_background_color"; + public static final String PROFILE_TEXT_COLOR = "profile_text_color"; + public static final String PROFILE_LINK_COLOR = "profile_link_color"; + public static final String PROFILE_SIDEBAR_FILL_COLOR = "profile_sidebar_fill_color"; + public static final String PROFILE_SIDEBAR_BORDER_COLOR = "profile_sidebar_border_color"; + public static final String PROFILE_USE_BACKGROUND_IMAGE = "profile_use_background_image"; + public static final String DEFAULT_PROFILE = "default_profile"; + public static final String DEFAULT_PROFILE_IMAGE = "default_profile_image"; + public static final String FRIENDS_COUNT = "friends_count"; + public static final String CREATED_AT = "CREATED_AT"; + public static final String FAVOURITES_COUNT = "favourites_count"; + public static final String UTC_OFFSET = "utc_offset"; + public static final String TIME_ZONE = "time_zone"; + public static final String PROFILE_BACKGROUND_IMAGE_URL = "profile_background_image_url"; + public static final String PROFILE_BACKGROUND_IMAGE_URL_HTTPS = "profile_background_image_url_https"; + public static final String PROFILE_BANNER_URL = "profile_banner_url"; + public static final String LANG = "lang"; + public static final String STATUSES_COUNT = "statuses_count"; + public static final String GEO_ENABLED = "geo_enabled"; + public static final String VERIFIED = "verified"; + public static final String IS_TRANSLATOR = "is_translator"; + public static final String LISTED_COUNT = "listed_count"; + public static final String FOLLOW_REQUEST_SENT = "follow_request_sent"; + // skip Entities, attrs in API but not in JSON is as below + public static final String WITHHELD_IN_COUNTRIES = "withheld_in_countries"; + public static final String BIGGER_PROFILE_IMAGE_URL = "bigger_profile_image_url"; + public static final String MINI_PROFILE_IMAGE_URL = "mini_profile_image_url"; + public static final String ORIGINAL_PROFILE_IMAGE_URL = "original_profile_image_url"; + public static final String SHOW_ALL_INLINE_MEDIA = "show_all_inline_media"; + public static final String PROFILE_BANNER_RETINA_URL = "profile_banner_retina_url"; + public static final String PROFILE_BANNER_IPAD_URL = "profile_banner_ipad_url"; + public static final String PROFILE_BANNER_IPAD_RETINA_URL = "profile_banner_ipad_retina_url"; + public static final String PROFILE_BANNER_MOBILE_URL = "profile_banner_mobile_url"; + public static final String PROFILE_BANNER_MOBILE_RETINA_URL = "profile_banner_mobile_retina_url"; + public static final String PROFILE_BACKGROUND_TILED = "profile_background_tiled"; + + private Tweet_User() { + } } /* @@ -76,5 +166,4 @@ public static final String LOCATION = "location"; public static final String TOPICS = "topics"; } - } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java index e251f32..b666487 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java @@ -170,7 +170,7 @@ public static final String ALIAS_SOCKET_ADAPTER = "socket_adapter"; public static final String ALIAS_TWITTER_FIREHOSE_ADAPTER = "twitter_firehose"; public static final String ALIAS_SOCKET_CLIENT_ADAPTER = "socket_client"; - public static final String ALIAS_RSS_ADAPTER = "rss_feed"; + public static final String ALIAS_RSS_ADAPTER = "rss"; public static final String ALIAS_FILE_FEED_ADAPTER = "file_feed"; public static final String ALIAS_TWITTER_PUSH_ADAPTER = "push_twitter"; public static final String ALIAS_TWITTER_PULL_ADAPTER = "pull_twitter"; @@ -223,7 +223,7 @@ public static final String KEY_STREAM_SOURCE = "stream-source"; public static final String EXTERNAL = "external"; public static final String KEY_READER_FACTORY = "reader-factory"; - public static final String READER_RSS = "rss"; + public static final String READER_RSS = "rss_feed"; public static final String FORMAT_CSV = "csv"; public static final String ERROR_PARSE_RECORD = "Parser failed to parse record"; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java index 4fb602b..70d31c0 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java @@ -46,12 +46,12 @@ public static final String LOCATION_EU = "Europe"; public static final String LANGUAGES = "languages"; // languages to track public static final String TRACK = "keywords"; // terms to track - public static final String FILTER_LEVEL = "filter-level"; + public static final String FILTER_LEVEL = "filter-level"; } public static class GeoConstants { - public static final double[][] US = new double[][] { { -124.848974, 24.396308 }, { -66.885444, 49.384358 } }; - public static final double[][] EU = new double[][]{{-29.7,36.7},{79.2,72.0}}; + private static final double[][] US = new double[][] { { -124.848974, 24.396308 }, { -66.885444, 49.384358 } }; + private static final double[][] EU = new double[][] { { -29.7, 36.7 }, { 79.2, 72.0 } }; public static Map<String, double[][]> boundingBoxes = initializeBoundingBoxes(); } @@ -83,24 +83,25 @@ if (m.matches()) { String[] locationStrings = locationValue.trim().split(";\\s*"); - locations = new double[locationStrings.length*2][2]; + locations = new double[locationStrings.length * 2][2]; - for(int i=0; i<locationStrings.length; i++) { + for (int i = 0; i < locationStrings.length; i++) { if (locationStrings[i].contains(",")) { String[] coordinatesString = locationStrings[i].split(","); - for(int k=0; k < 2; k++) { + for (int k = 0; k < 2; k++) { for (int l = 0; l < 2; l++) { try { locations[2 * i + k][l] = Double.parseDouble(coordinatesString[2 * k + l]); } catch (NumberFormatException ne) { - throw new AsterixException("Incorrect coordinate value " + coordinatesString[2 * k + l]); + throw new AsterixException( + "Incorrect coordinate value " + coordinatesString[2 * k + l]); } } } } else if (GeoConstants.boundingBoxes.containsKey(locationStrings[i])) { // Only add known locations double loc[][] = GeoConstants.boundingBoxes.get(locationStrings[i]); - for(int k=0; k < 2; k++) { + for (int k = 0; k < 2; k++) { for (int l = 0; l < 2; l++) { locations[2 * i + k][l] = loc[k][l]; } @@ -132,7 +133,7 @@ if (trackValue.contains(",")) { keywords = trackValue.trim().split(",\\s*"); } else { - keywords = new String[] {trackValue}; + keywords = new String[] { trackValue }; } filterQuery = filterQuery.track(keywords); } @@ -146,7 +147,7 @@ if (langValue.contains(",")) { languages = langValue.trim().split(",\\s*"); } else { - languages = new String[] {langValue}; + languages = new String[] { langValue }; } filterQuery = filterQuery.language(languages); } @@ -163,9 +164,9 @@ } // Filtering level: none, low or medium (defaul=none) - if(filterQuery != null) { + if (filterQuery != null) { String filterValue = configuration.get(ConfigurationConstants.FILTER_LEVEL); - if (filterValue!=null) { + if (filterValue != null) { filterQuery = filterQuery.filterLevel(filterValue); } } @@ -188,8 +189,11 @@ builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN + "\n"); builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET + "\n"); LOGGER.warning(builder.toString()); - LOGGER.warning("Unable to configure Twitter adapter due to incomplete/incorrect authentication credentials"); - LOGGER.warning("For details on how to obtain OAuth authentication token, visit https://dev.twitter.com/oauth/overview/application-owner-access-tokens"); + LOGGER.warning( + "Unable to configure Twitter adapter due to incomplete/incorrect authentication credentials"); + LOGGER.warning( + "For details on how to obtain OAuth authentication token, visit https://dev.twitter.com/oauth" + + "/overview/application-owner-access-tokens"); } } Twitter twitter = tf.getInstance(); @@ -205,6 +209,7 @@ private static ConfigurationBuilder getAuthConfiguration(Map<String, String> configuration) { ConfigurationBuilder cb = new ConfigurationBuilder(); cb.setDebugEnabled(true); + cb.setJSONStoreEnabled(true); String oAuthConsumerKey = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_KEY); String oAuthConsumerSecret = configuration.get(AuthenticationConstants.OAUTH_CONSUMER_SECRET); String oAuthAccessToken = configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN); diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java index e16633e..a5af127 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.om.types; +import org.apache.asterix.om.base.AOrderedList; import org.apache.asterix.om.base.IAObject; import org.json.JSONException; import org.json.JSONObject; @@ -26,6 +27,8 @@ private static final long serialVersionUID = 1L; + public static final AOrderedListType FULL_OPEN_ORDEREDLIST_TYPE = new AOrderedListType(null,""); + /** * @param itemType * if null, the list is untyped diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java index 80b13b5..febc6ad 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.om.types; +import org.apache.asterix.om.base.AUnorderedList; import org.apache.asterix.om.base.IAObject; import org.json.JSONException; import org.json.JSONObject; @@ -26,6 +27,8 @@ private static final long serialVersionUID = 1L; + public static final AUnorderedListType FULLY_OPEN_UNORDEREDLIST_TYPE = new AUnorderedListType(null,""); + /** * @param itemType * if null, the collection is untyped diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java index 2a50506..b6c5712 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java @@ -80,6 +80,12 @@ return BuiltinType.ADAYTIMEDURATION; case UUID: return BuiltinType.AUUID; + case RECORD: + return ARecordType.FULLY_OPEN_RECORD_TYPE; + case UNORDEREDLIST: + return AUnorderedListType.FULLY_OPEN_UNORDEREDLIST_TYPE; + case ORDEREDLIST: + return AOrderedListType.FULL_OPEN_ORDEREDLIST_TYPE; default: throw new AsterixException("Typetag " + typeTag + " is not a built-in type"); } -- To view, visit https://asterix-gerrit.ics.uci.edu/1002 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I7021e7b779de05b9ec999a8d5f8464fb0ab413c0 Gerrit-PatchSet: 15 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Xikui Wang <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: Xikui Wang <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
