Xikui Wang has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1002
Change subject: This patch includes the following changes:
......................................................................
This patch includes the following changes:
1. Extended TweetParser to parse arbitrary tweet attributes instead of a fixed set of six.
2. Changed the Twitter feed record type from twitter4j Status to String (the tweet's raw JSON).
3. Fixed bug ASTERIXDB-1471.
4. Fixed bug ASTERIXDB-1352.
Change-Id: I7021e7b779de05b9ec999a8d5f8464fb0ab413c0
---
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
M
asterixdb/asterix-maven-plugins/lexer-generator-maven-plugin/src/main/resources/Lexer.java
M
asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java
M
asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java
12 files changed, 408 insertions(+), 149 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/02/1002/1
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
index e31325a..80e716d 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
@@ -28,13 +28,9 @@
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import twitter4j.Query;
-import twitter4j.QueryResult;
-import twitter4j.Status;
-import twitter4j.Twitter;
-import twitter4j.TwitterException;
+import twitter4j.*;
-public class TwitterPullRecordReader implements IRecordReader<Status> {
+public class TwitterPullRecordReader implements IRecordReader<String> {
private Query query;
private Twitter twitter;
@@ -42,14 +38,14 @@
private QueryResult result;
private int nextTweetIndex = 0;
private long lastTweetIdReceived = 0;
- private GenericRecord<Status> record;
+ private GenericRecord<String> record;
+    /**
+     * Creates a reader that polls Twitter with a {@link Query} built from {@code keywords}, asking for up
+     * to 100 tweets per query and sleeping {@code requestInterval} seconds before each new query.
+     */
public TwitterPullRecordReader(Twitter twitter, String keywords, int
requestInterval) {
this.twitter = twitter;
this.requestInterval = requestInterval;
this.query = new Query(keywords);
this.query.setCount(100);
- this.record = new GenericRecord<Status>();
+ this.record = new GenericRecord<>();
}
@Override
@@ -62,7 +58,7 @@
}
@Override
- public IRawRecord<Status> next() throws IOException, InterruptedException {
+ public IRawRecord<String> next() throws IOException, InterruptedException {
if (result == null || nextTweetIndex >= result.getTweets().size()) {
Thread.sleep(1000 * requestInterval);
query.setSinceId(lastTweetIdReceived);
@@ -79,7 +75,8 @@
if (lastTweetIdReceived < tweet.getId()) {
lastTweetIdReceived = tweet.getId();
}
- record.set(tweet);
+ String jsonTweet = TwitterObjectFactory.getRawJSON(tweet); //
transform tweet obj to json
+ record.set(jsonTweet);
return record;
} else {
return null;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
index f04cdb9..02c3963 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
@@ -27,30 +27,25 @@
import org.apache.asterix.external.input.record.GenericRecord;
import org.apache.asterix.external.util.FeedLogManager;
-import twitter4j.FilterQuery;
-import twitter4j.StallWarning;
-import twitter4j.Status;
-import twitter4j.StatusDeletionNotice;
-import twitter4j.StatusListener;
-import twitter4j.TwitterStream;
+import twitter4j.*;
-public class TwitterPushRecordReader implements IRecordReader<Status> {
- private LinkedBlockingQueue<Status> inputQ;
+public class TwitterPushRecordReader implements IRecordReader<String> {
+ private LinkedBlockingQueue<String> inputQ;
private TwitterStream twitterStream;
- private GenericRecord<Status> record;
+ private GenericRecord<String> record;
private boolean closed = false;
+    /**
+     * Registers a listener that enqueues each received status as its raw JSON string, then starts a
+     * filtered stream for {@code query}.
+     */
public TwitterPushRecordReader(TwitterStream twitterStream, FilterQuery
query) {
- record = new GenericRecord<Status>();
- inputQ = new LinkedBlockingQueue<Status>();
+ record = new GenericRecord<>();
+ inputQ = new LinkedBlockingQueue<>();
this.twitterStream =
twitterStream;//TwitterUtil.getTwitterStream(configuration);
this.twitterStream.addListener(new TweetListener(inputQ));
this.twitterStream.filter(query);
}
public TwitterPushRecordReader(TwitterStream twitterStream) {
- record = new GenericRecord<Status>();
- inputQ = new LinkedBlockingQueue<Status>();
+ record = new GenericRecord<>();
+ inputQ = new LinkedBlockingQueue<>();
this.twitterStream = twitterStream;//
this.twitterStream.addListener(new TweetListener(inputQ));
twitterStream.sample();
@@ -72,8 +67,8 @@
}
@Override
- public IRawRecord<Status> next() throws IOException, InterruptedException {
- Status tweet = inputQ.poll();
+ public IRawRecord<String> next() throws IOException, InterruptedException {
+ String tweet = inputQ.poll();
if (tweet == null) {
return null;
}
@@ -93,15 +88,16 @@
private class TweetListener implements StatusListener {
- private LinkedBlockingQueue<Status> inputQ;
+ private LinkedBlockingQueue<String> inputQ;
- public TweetListener(LinkedBlockingQueue<Status> inputQ) {
+        /** @param inputQ queue shared with the enclosing reader; receives one raw-JSON string per status */
+ public TweetListener(LinkedBlockingQueue<String> inputQ) {
this.inputQ = inputQ;
}
+        /** Enqueues the raw JSON text of each received status for the reader's next() to hand out. */
@Override
public void onStatus(Status tweet) {
- inputQ.add(tweet);
+            // getRawJSON only finds the payload when JSON store is enabled and it is called on the thread
+            // that received the status; otherwise it can return null, and LinkedBlockingQueue rejects null
+            // elements with a NullPointerException, so such statuses are skipped instead.
+            String jsonTweet = TwitterObjectFactory.getRawJSON(tweet);
+            if (jsonTweet != null) {
+                inputQ.add(jsonTweet);
+            }
}
@Override
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 541737a..172b22b 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -37,7 +37,7 @@
import twitter4j.FilterQuery;
import twitter4j.Status;
-public class TwitterRecordReaderFactory implements
IRecordReaderFactory<Status> {
+public class TwitterRecordReaderFactory implements
IRecordReaderFactory<String> {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER =
Logger.getLogger(TwitterRecordReaderFactory.class.getName());
@@ -114,7 +114,7 @@
}
@Override
- public IRecordReader<? extends Status>
createRecordReader(IHyracksTaskContext ctx, int partition)
+ public IRecordReader<? extends String>
createRecordReader(IHyracksTaskContext ctx, int partition)
throws HyracksDataException {
if (pull) {
return new
TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
@@ -133,8 +133,8 @@
}
+    // The pull and push readers created by this factory emit each tweet as its raw JSON text.
@Override
- public Class<? extends Status> getRecordClass() {
- return Status.class;
+ public Class<? extends String> getRecordClass() {
+ return String.class;
}
private boolean validateConfiguration(Map<String, String> configuration) {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
index 5923354..e686b97 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
@@ -70,23 +70,12 @@
import org.apache.asterix.external.library.java.JObjects.JString;
import org.apache.asterix.external.library.java.JObjects.JTime;
import org.apache.asterix.external.library.java.JObjects.JUnorderedList;
-import org.apache.asterix.om.base.ACircle;
-import org.apache.asterix.om.base.ADuration;
-import org.apache.asterix.om.base.ALine;
-import org.apache.asterix.om.base.APoint;
-import org.apache.asterix.om.base.APoint3D;
-import org.apache.asterix.om.base.APolygon;
-import org.apache.asterix.om.base.ARectangle;
+import org.apache.asterix.om.base.*;
import org.apache.asterix.om.pointables.AFlatValuePointable;
import org.apache.asterix.om.pointables.AListVisitablePointable;
import org.apache.asterix.om.pointables.ARecordVisitablePointable;
import org.apache.asterix.om.pointables.base.IVisitablePointable;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AbstractCollectionType;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.types.EnumDeserializer;
-import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.*;
import org.apache.asterix.om.util.container.IObjectPool;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.util.string.UTF8StringReader;
@@ -471,9 +460,14 @@
for (IVisitablePointable fieldPointable : fieldPointables) {
closedPart = index < recordType.getFieldTypes().length;
IVisitablePointable tt = fieldTypeTags.get(index);
- IAType fieldType = closedPart ?
recordType.getFieldTypes()[index] : null;
ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
.deserialize(tt.getByteArray()[tt.getStartOffset()]);
+ IAType fieldType = null;
+ if(closedPart){
+ fieldType = recordType.getFieldTypes()[index];
+ }
+ else
+ fieldType = ATypeMachine(typeTag);
IVisitablePointable fieldName = fieldNames.get(index);
typeInfo.reset(fieldType, typeTag);
switch (typeTag) {
@@ -539,12 +533,13 @@
IJObject listItem = null;
int index = 0;
try {
-
for (IVisitablePointable itemPointable : items) {
IVisitablePointable itemTagPointable = itemTags.get(index);
ATypeTag itemTypeTag =
EnumDeserializer.ATYPETAGDESERIALIZER
.deserialize(itemTagPointable.getByteArray()[itemTagPointable.getStartOffset()]);
- typeInfo.reset(listType.getType(), listType.getTypeTag());
+ IAType fieldType = null;
+ fieldType = ATypeMachine(itemTypeTag);
+ typeInfo.reset(fieldType, itemTypeTag);
switch (itemTypeTag) {
case RECORD:
listItem =
pointableVisitor.visit((ARecordVisitablePointable) itemPointable, typeInfo);
@@ -557,10 +552,7 @@
throw new IllegalArgumentException(
"Cannot parse list item of type " +
listType.getTypeTag());
default:
- IAType itemType = ((AbstractCollectionType)
listType).getItemType();
- typeInfo.reset(itemType, itemType.getTypeTag());
listItem =
pointableVisitor.visit((AFlatValuePointable) itemPointable, typeInfo);
-
}
list.add(listItem);
}
@@ -580,4 +572,65 @@
}
}
+
+    /**
+     * Maps a runtime type tag to the built-in {@link IAType} used to describe a value of that tag when no
+     * declared type is available (open record fields and list items).
+     *
+     * @param typeTag tag deserialized from the first byte of a serialized value
+     * @return the matching built-in type, the fully open record/list type for nested values, or
+     *         {@link BuiltinType#ANY} for tags without a specific mapping
+     */
+    public static IAType ATypeMachine(ATypeTag typeTag) {
+        switch (typeTag) {
+            case BOOLEAN:
+                return BuiltinType.ABOOLEAN;
+            case INT8:
+                return BuiltinType.AINT8;
+            case INT16:
+                return BuiltinType.AINT16;
+            case INT32:
+                return BuiltinType.AINT32;
+            case INT64:
+                return BuiltinType.AINT64;
+            case FLOAT:
+                return BuiltinType.AFLOAT;
+            case DOUBLE:
+                return BuiltinType.ADOUBLE;
+            case STRING:
+                return BuiltinType.ASTRING;
+            case POINT:
+                return BuiltinType.APOINT;
+            case POINT3D:
+                return BuiltinType.APOINT3D;
+            case LINE:
+                return BuiltinType.ALINE;
+            case POLYGON:
+                return BuiltinType.APOLYGON;
+            case CIRCLE:
+                return BuiltinType.ACIRCLE;
+            case RECTANGLE:
+                return BuiltinType.ARECTANGLE;
+            case DATE:
+                return BuiltinType.ADATE;
+            case TIME:
+                return BuiltinType.ATIME;
+            case DATETIME:
+                return BuiltinType.ADATETIME;
+            case DURATION:
+                return BuiltinType.ADURATION;
+            case INTERVAL:
+                return BuiltinType.AINTERVAL;
+            case RECORD:
+                return ARecordType.FULLY_OPEN_RECORD_TYPE;
+            case UNORDEREDLIST:
+                return AUnorderedListType.FULLY_OPEN_UNORDEREDLIST_TYPE;
+            case ORDEREDLIST:
+                return AOrderedListType.FULL_OPEN_ORDEREDLIST_TYPE;
+            default:
+                // No specific built-in type for this tag; callers still pass the tag alongside.
+                return BuiltinType.ANY;
+        }
+    }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
index 522da06..fe3ec65 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
@@ -18,111 +18,234 @@
*/
package org.apache.asterix.external.parser;
-import java.io.DataOutput;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.asterix.builders.RecordBuilder;
-import org.apache.asterix.external.api.IDataParser;
+import org.apache.asterix.builders.*;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordDataParser;
-import org.apache.asterix.external.library.java.JObjectUtil;
-import org.apache.asterix.external.util.Datatypes.Tweet;
-import org.apache.asterix.om.base.AMutableDouble;
-import org.apache.asterix.om.base.AMutableInt32;
-import org.apache.asterix.om.base.AMutableRecord;
+import org.apache.asterix.om.base.AMutablePoint;
import org.apache.asterix.om.base.AMutableString;
-import org.apache.asterix.om.base.IAObject;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.base.ANull;
+import org.apache.asterix.om.types.*;
+import org.apache.asterix.om.util.container.IObjectPool;
+import org.apache.asterix.om.util.container.ListObjectPool;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IMutableValueStorage;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.storage.am.common.ophelpers.LongArrayList;
+import org.apache.hyracks.util.string.UTF8StringWriter;
+import org.eclipse.jetty.util.ajax.JSON;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
-import twitter4j.Status;
-import twitter4j.User;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
-public class TweetParser implements IRecordDataParser<Status> {
+import static org.apache.asterix.om.types.ATypeTag.*;
- private IAObject[] mutableTweetFields;
- private IAObject[] mutableUserFields;
- private AMutableRecord mutableRecord;
- private AMutableRecord mutableUser;
- private final Map<String, Integer> userFieldNameMap = new HashMap<>();
- private final Map<String, Integer> tweetFieldNameMap = new HashMap<>();
- private RecordBuilder recordBuilder = new RecordBuilder();
+public class TweetParser extends AbstractDataParser implements
IRecordDataParser<String> {
+ private final IObjectPool<IARecordBuilder, ATypeTag> recordBuilderPool =
new ListObjectPool<IARecordBuilder, ATypeTag>(
+ new RecordBuilderFactory());
+ private final IObjectPool<IAsterixListBuilder, ATypeTag> listBuilderPool =
new ListObjectPool<IAsterixListBuilder, ATypeTag>(
+ new ListBuilderFactory());
+ private final IObjectPool<IMutableValueStorage, ATypeTag> abvsBuilderPool
= new ListObjectPool<IMutableValueStorage, ATypeTag>(
+ new AbvsBuilderFactory());
+ private ARecordType recordType;
+ private UTF8StringWriter utf8Writer = new UTF8StringWriter();
+    /**
+     * @param recordType type each parsed tweet is written as; declared fields go to their declared
+     *                   positions and, when the type is open, remaining JSON attributes become open fields
+     */
public TweetParser(ARecordType recordType) {
- initFieldNames(recordType);
- mutableUserFields = new IAObject[] { new AMutableString(null), new
AMutableString(null), new AMutableInt32(0),
- new AMutableInt32(0), new AMutableString(null), new
AMutableInt32(0) };
- mutableUser = new AMutableRecord((ARecordType)
recordType.getFieldTypes()[tweetFieldNameMap.get(Tweet.USER)],
- mutableUserFields);
-
- mutableTweetFields = new IAObject[] { new AMutableString(null),
mutableUser, new AMutableDouble(0),
- new AMutableDouble(0), new AMutableString(null), new
AMutableString(null) };
- mutableRecord = new AMutableRecord(recordType, mutableTweetFields);
+ this.recordType = recordType;
}
- // Initialize the hashmap values for the field names and positions
- private void initFieldNames(ARecordType recordType) {
- String tweetFields[] = recordType.getFieldNames();
- for (int i = 0; i < tweetFields.length; i++) {
- tweetFieldNameMap.put(tweetFields[i], i);
- if (tweetFields[i].equals(Tweet.USER)) {
- IAType fieldType = recordType.getFieldTypes()[i];
- if (fieldType.getTypeTag() == ATypeTag.RECORD) {
- String userFields[] = ((ARecordType)
fieldType).getFieldNames();
- for (int j = 0; j < userFields.length; j++) {
- userFieldNameMap.put(userFields[j], j);
- }
- }
+    /**
+     * Writes {@code jArray} to {@code output} as an unordered list built with a null (untyped) item type.
+     * Items for which writeField reports that nothing was written (e.g. empty nested arrays or objects)
+     * are left out of the list.
+     */
+    private void parseUnorderedList(JSONArray jArray, DataOutput output) throws IOException, JSONException {
+        UnorderedListBuilder listBuilder = (UnorderedListBuilder) getUnorderedListBuilder();
+        ArrayBackedValueStorage itemStorage = getTempBuffer();
+        listBuilder.reset(null);
+        int itemCount = jArray.length();
+        for (int pos = 0; pos < itemCount; pos++) {
+            itemStorage.reset();
+            boolean written = writeField(jArray.get(pos), null, itemStorage.getDataOutput());
+            if (written) {
+                listBuilder.addItem(itemStorage);
+            }
+        }
+        listBuilder.write(output, true);
+    }
+
+    /**
+     * Serializes one JSON value into {@code out} as a type-tagged value.
+     *
+     * When {@code fieldType} is non-null the value is written as that declared type (STRING, INT64, INT32,
+     * DOUBLE, BOOLEAN or RECORD). org.json boxes JSON numbers as Integer, Long or Double depending on the
+     * literal, so numeric values are accepted from any compatible boxing rather than cast blindly: an
+     * integral coordinate such as {@code 0} arrives as Integer even for a DOUBLE field.
+     * When {@code fieldType} is null the type is inferred from the Java type of the parsed value.
+     *
+     * @param fieldObj  value taken from an org.json object or array
+     * @param fieldType declared type of the value, or null for an open/untyped value
+     * @param out       destination of the serialized value
+     * @return true if a value was written; false if the value was skipped (unsupported or mismatched type,
+     *         empty array/object, JSON error), in which case the caller must discard {@code out}
+     */
+    private boolean writeField(Object fieldObj, IAType fieldType, DataOutput out)
+            throws IOException, JSONException {
+        boolean writeResult = true;
+        if (fieldType != null) {
+            switch (fieldType.getTypeTag()) {
+                case STRING:
+                    out.write(BuiltinType.ASTRING.getTypeTag().serialize());
+                    utf8Writer.writeUTF8(fieldObj.toString(), out);
+                    break;
+                case INT64:
+                    // Values that fit in an int are boxed as Integer by org.json; both are valid here.
+                    if (fieldObj instanceof Integer || fieldObj instanceof Long) {
+                        aInt64.setValue(((Number) fieldObj).longValue());
+                        int64Serde.serialize(aInt64, out);
+                    } else {
+                        writeResult = false;
+                    }
+                    break;
+                case INT32:
+                    if (fieldObj instanceof Integer) {
+                        out.write(BuiltinType.AINT32.getTypeTag().serialize());
+                        out.writeInt((Integer) fieldObj);
+                    } else {
+                        writeResult = false;
+                    }
+                    break;
+                case DOUBLE:
+                    // Integral JSON numbers arrive as Integer/Long, not Double.
+                    if (fieldObj instanceof Number) {
+                        out.write(BuiltinType.ADOUBLE.getTypeTag().serialize());
+                        out.writeDouble(((Number) fieldObj).doubleValue());
+                    } else {
+                        writeResult = false;
+                    }
+                    break;
+                case BOOLEAN:
+                    if (fieldObj instanceof Boolean) {
+                        out.write(BuiltinType.ABOOLEAN.getTypeTag().serialize());
+                        out.writeBoolean((Boolean) fieldObj);
+                    } else {
+                        writeResult = false;
+                    }
+                    break;
+                case RECORD:
+                    if (fieldObj instanceof JSONObject) {
+                        writeRecord((JSONObject) fieldObj, out, (ARecordType) fieldType);
+                    } else {
+                        writeResult = false;
+                    }
+                    break;
+                default:
+                    writeResult = false;
+                    break;
+            }
+        } else {
+            try {
+                if (fieldObj == JSONObject.NULL) {
+                    nullSerde.serialize(ANull.NULL, out);
+                } else if (fieldObj instanceof Integer) {
+                    out.write(BuiltinType.AINT32.getTypeTag().serialize());
+                    out.writeInt((Integer) fieldObj);
+                } else if (fieldObj instanceof Boolean) {
+                    out.write(BuiltinType.ABOOLEAN.getTypeTag().serialize());
+                    out.writeBoolean((Boolean) fieldObj);
+                } else if (fieldObj instanceof Double) {
+                    out.write(BuiltinType.ADOUBLE.getTypeTag().serialize());
+                    out.writeDouble((Double) fieldObj);
+                } else if (fieldObj instanceof Long) {
+                    out.write(BuiltinType.AINT64.getTypeTag().serialize());
+                    out.writeLong((Long) fieldObj);
+                } else if (fieldObj instanceof String) {
+                    out.write(BuiltinType.ASTRING.getTypeTag().serialize());
+                    utf8Writer.writeUTF8((String) fieldObj, out);
+                } else if (fieldObj instanceof JSONArray) {
+                    if (((JSONArray) fieldObj).length() != 0) {
+                        parseUnorderedList((JSONArray) fieldObj, out);
+                    } else {
+                        writeResult = false;
+                    }
+                } else if (fieldObj instanceof JSONObject) {
+                    if (((JSONObject) fieldObj).length() != 0) {
+                        writeRecord((JSONObject) fieldObj, out, null);
+                    } else {
+                        writeResult = false;
+                    }
+                } else {
+                    // No mapping for this value type: report it as skipped instead of claiming success
+                    // with nothing written (which would add an empty value to the record or list).
+                    writeResult = false;
+                }
+            } catch (JSONException e) {
+                writeResult = false;
}
}
+        return writeResult;
}
+    /**
+     * Returns the position of {@code name} in {@code nameList}, or -1 when the list is null or does not
+     * contain it.
+     */
+    private int checkAttrNameIdx(String[] nameList, String name) {
+        if (nameList == null) {
+            return -1;
+        }
+        for (int pos = 0; pos < nameList.length; pos++) {
+            if (name.equals(nameList[pos])) {
+                return pos;
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * Writes {@code obj} into {@code out} as a record.
+     *
+     * For a closed {@code curRecType}, exactly the declared fields are written in declaration order; a
+     * declared field that is missing, null or cannot be written as its declared type is an error.
+     * Otherwise (open type, or no type) every non-null attribute of {@code obj} is written. Declared
+     * attributes go to their declared position and the rest become open fields. Every declared field must
+     * end up written.
+     *
+     * @param obj        the JSON object to serialize
+     * @param out        destination of the serialized record
+     * @param curRecType declared record type, or null to write an untyped (fully open) record
+     * @throws HyracksDataException if a declared field is missing, null or cannot be written
+     */
+    public void writeRecord(JSONObject obj, DataOutput out, ARecordType curRecType)
+            throws IOException, JSONException {
+        IAType[] curTypes = null;
+        String[] curFNames = null;
+
+        ArrayBackedValueStorage fieldValueBuffer = getTempBuffer();
+        ArrayBackedValueStorage fieldNameBuffer = getTempBuffer();
+        IARecordBuilder recBuilder = getRecordBuilder();
+
+        if (curRecType != null) {
+            curTypes = curRecType.getFieldTypes();
+            curFNames = curRecType.getFieldNames();
+        }
+
+        recBuilder.reset(curRecType);
+        recBuilder.init();
+
+        if (curRecType != null && !curRecType.isOpen()) {
+            // Closed record type: emit exactly the declared fields, in declaration order.
+            for (int iter1 = 0; iter1 < curFNames.length; iter1++) {
+                String fieldName = curFNames[iter1];
+                if (obj.isNull(fieldName)) {
+                    if (curRecType.isClosedField(fieldName)) {
+                        throw new HyracksDataException("Closed field " + fieldName + " has null value.");
+                    }
+                    continue;
+                }
+                fieldValueBuffer.reset();
+                if (!writeField(obj.get(fieldName), curTypes[iter1], fieldValueBuffer.getDataOutput())) {
+                    // Silently omitting a declared field would leave the closed record incomplete.
+                    throw new HyracksDataException("Closed field " + fieldName + " could not be written as "
+                            + curTypes[iter1].getTypeTag() + ".");
+                }
+                recBuilder.addField(iter1, fieldValueBuffer);
+            }
+        } else {
+            // Open (or untyped) record: write every non-null attribute present in obj.
+            int closedFieldCount = 0;
+            // JSONObject.getNames returns null, not an empty array, for an empty object.
+            String[] attrNames = JSONObject.getNames(obj);
+            if (attrNames != null) {
+                for (String attrName : attrNames) {
+                    if (obj.isNull(attrName)) {
+                        continue;
+                    }
+                    int attrIdx = checkAttrNameIdx(curFNames, attrName);
+                    IAType curFieldType = curRecType != null ? curRecType.getFieldType(attrName) : null;
+                    fieldValueBuffer.reset();
+                    fieldNameBuffer.reset();
+                    if (!writeField(obj.get(attrName), curFieldType, fieldValueBuffer.getDataOutput())) {
+                        continue;
+                    }
+                    if (attrIdx == -1) {
+                        // Not a declared field: store it as an open field under its JSON name.
+                        aString.setValue(attrName);
+                        stringSerde.serialize(aString, fieldNameBuffer.getDataOutput());
+                        recBuilder.addField(fieldNameBuffer, fieldValueBuffer);
+                    } else {
+                        recBuilder.addField(attrIdx, fieldValueBuffer);
+                        closedFieldCount++;
+                    }
+                }
+            }
+            if (curRecType != null && closedFieldCount < curFNames.length) {
+                throw new HyracksDataException("Non-null field is null");
+            }
+        }
+        recBuilder.write(out, true);
+    }
+
+
+    /** Borrows a record builder from recordBuilderPool; resetPools() resets that pool. */
+    private IARecordBuilder getRecordBuilder() {
+        final IARecordBuilder builder = recordBuilderPool.allocate(RECORD);
+        return builder;
+    }
+
+    /** Borrows a list builder from listBuilderPool, keyed by the UNORDEREDLIST tag. */
+    private IAsterixListBuilder getUnorderedListBuilder() {
+        final IAsterixListBuilder builder = listBuilderPool.allocate(UNORDEREDLIST);
+        return builder;
+    }
+
+    /** Borrows a scratch value buffer from abvsBuilderPool, keyed by the BINARY tag. */
+    private ArrayBackedValueStorage getTempBuffer() {
+        final IMutableValueStorage storage = abvsBuilderPool.allocate(BINARY);
+        return (ArrayBackedValueStorage) storage;
+    }
+
+
+    /**
+     * Parses one tweet, given as raw JSON text, and writes it to {@code out} as a record of the
+     * configured record type. Pooled builders and buffers are reset first so each record starts clean.
+     *
+     * @throws HyracksDataException if the JSON is malformed or does not fit the record type
+     */
@Override
- public void parse(IRawRecord<? extends Status> record, DataOutput out)
throws HyracksDataException {
- Status tweet = record.get();
- User user = tweet.getUser();
- // Tweet user data
- ((AMutableString)
mutableUserFields[userFieldNameMap.get(Tweet.SCREEN_NAME)])
-
.setValue(JObjectUtil.getNormalizedString(user.getScreenName()));
- ((AMutableString)
mutableUserFields[userFieldNameMap.get(Tweet.LANGUAGE)])
- .setValue(JObjectUtil.getNormalizedString(user.getLang()));
- ((AMutableInt32)
mutableUserFields[userFieldNameMap.get(Tweet.FRIENDS_COUNT)]).setValue(user.getFriendsCount());
- ((AMutableInt32)
mutableUserFields[userFieldNameMap.get(Tweet.STATUS_COUNT)]).setValue(user.getStatusesCount());
- ((AMutableString) mutableUserFields[userFieldNameMap.get(Tweet.NAME)])
- .setValue(JObjectUtil.getNormalizedString(user.getName()));
- ((AMutableInt32)
mutableUserFields[userFieldNameMap.get(Tweet.FOLLOWERS_COUNT)])
- .setValue(user.getFollowersCount());
-
- // Tweet data
- ((AMutableString)
mutableTweetFields[tweetFieldNameMap.get(Tweet.ID)]).setValue(String.valueOf(tweet.getId()));
-
- int userPos = tweetFieldNameMap.get(Tweet.USER);
- for (int i = 0; i < mutableUserFields.length; i++) {
- ((AMutableRecord) mutableTweetFields[userPos]).setValueAtPos(i,
mutableUserFields[i]);
+    public void parse(IRawRecord<? extends String> record, DataOutput out) throws HyracksDataException {
+        try {
+            //TODO get rid of this temporary json
+            resetPools();
+            JSONObject jsObj = new JSONObject(record.get());
+            writeRecord(jsObj, out, recordType);
+        } catch (HyracksDataException e) {
+            // Already descriptive (e.g. a closed-field violation); wrapping again would bury the message.
+            throw e;
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
}
- if (tweet.getGeoLocation() != null) {
- ((AMutableDouble)
mutableTweetFields[tweetFieldNameMap.get(Tweet.LATITUDE)])
- .setValue(tweet.getGeoLocation().getLatitude());
- ((AMutableDouble)
mutableTweetFields[tweetFieldNameMap.get(Tweet.LONGITUDE)])
- .setValue(tweet.getGeoLocation().getLongitude());
- } else {
- ((AMutableDouble)
mutableTweetFields[tweetFieldNameMap.get(Tweet.LATITUDE)]).setValue(0);
- ((AMutableDouble)
mutableTweetFields[tweetFieldNameMap.get(Tweet.LONGITUDE)]).setValue(0);
- }
- ((AMutableString)
mutableTweetFields[tweetFieldNameMap.get(Tweet.CREATED_AT)])
-
.setValue(JObjectUtil.getNormalizedString(tweet.getCreatedAt().toString()));
- ((AMutableString)
mutableTweetFields[tweetFieldNameMap.get(Tweet.MESSAGE)])
- .setValue(JObjectUtil.getNormalizedString(tweet.getText()));
+    }
- for (int i = 0; i < mutableTweetFields.length; i++) {
- mutableRecord.setValueAtPos(i, mutableTweetFields[i]);
- }
- recordBuilder.reset(mutableRecord.getType());
- recordBuilder.init();
- IDataParser.writeRecord(mutableRecord, out, recordBuilder);
+    /** Resets the buffer, record-builder and list-builder pools; parse() calls this before each record. */
+    private void resetPools() {
+        abvsBuilderPool.reset();
+        recordBuilderPool.reset();
+        listBuilderPool.reset();
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
index d6e536d..7e0bee5 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
@@ -28,7 +28,7 @@
import twitter4j.Status;
-public class TweetParserFactory implements IRecordDataParserFactory<Status> {
+public class TweetParserFactory implements IRecordDataParserFactory<String> {
private static final long serialVersionUID = 1L;
private ARecordType recordType;
@@ -44,14 +44,14 @@
}
+    // Each call returns a new TweetParser for the configured record type; it consumes raw tweet JSON.
@Override
- public IRecordDataParser<Status> createRecordParser(IHyracksTaskContext
ctx) {
+ public IRecordDataParser<String> createRecordParser(IHyracksTaskContext
ctx) {
TweetParser dataParser = new TweetParser(recordType);
return dataParser;
}
+    // Must agree with the readers' record class: tweets now arrive as raw JSON strings.
@Override
- public Class<? extends Status> getRecordClass() {
- return Status.class;
+ public Class<? extends String> getRecordClass() {
+ return String.class;
}
@Override
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
index e1a7911..42214c8 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/Datatypes.java
@@ -42,21 +42,103 @@
+    /** Field names used for tweet records, mostly mirroring the keys of Twitter's tweet JSON. */
public static class Tweet {
public static final String ID = "id";
public static final String USER = "user";
public static final String LATITUDE = "latitude";
public static final String LONGITUDE = "longitude";
+        public static final String GEOLOCATION = "geo";
public static final String CREATED_AT = "created_at";
public static final String MESSAGE = "message_text";
-
+        public static final String TEXT = "text";
public static final String COUNTRY = "country";
+        public static final String PLACE = "place";
+        public static final String SOURCE = "source";
+        public static final String TRUNCATED = "truncated";
+        public static final String IN_REPLY_TO_STATUS_ID = "in_reply_to_status_id";
+        public static final String IN_REPLY_TO_USER_ID = "in_reply_to_user_id";
+        public static final String IN_REPLY_TO_SCREENNAME = "in_reply_to_screen_name";
+        public static final String FAVORITED = "favorited";
+        public static final String RETWEETED = "retweeted";
+        public static final String FAVORITE_COUNT = "favorite_count";
+        public static final String RETWEET_COUNT = "retweet_count";
+        public static final String CONTRIBUTORS = "contributors";
+        public static final String LANGUAGE = "lang";
+        public static final String FILTER_LEVEL = "filter_level";
+        public static final String TIMESTAMP_MS = "timestamp_ms";
+        public static final String IS_QUOTE_STATUS = "is_quote_status";
+        // LATITUDE, LONGITUDE and MESSAGE above are the names of the original six-field tweet type; their
+        // values are intentionally left unchanged so anything keyed on those names keeps working.
+        // Present in the twitter4j API but not in the raw JSON:
+        // public static final String SENSITIVE = "sensitive";
+        // public static final String RETWEETED_BY_ME = "retweeted_by_me";
+        // public static final String CURRENT_USER_RETWEET_ID = "current_user_retweet_id";
// User fields (for the sub record "user")
public static final String SCREEN_NAME = "screen_name";
- public static final String LANGUAGE = "language";
+        public static final String USER_PREFERRED_LANGUAGE = "user_preferred_language";
public static final String FRIENDS_COUNT = "friends_count";
public static final String STATUS_COUNT = "status_count";
public static final String NAME = "name";
public static final String FOLLOWERS_COUNT = "followers_count";
+    }
+
+    /** Attribute names of the "place" object attached to a tweet. */
+    public static final class Tweet_Place {
+        // identity
+        public static final String ID = "id";
+        public static final String NAME = "name";
+        public static final String FULL_NAME = "full_name";
+        public static final String PLACE_TYPE = "place_type";
+        public static final String URL = "url";
+        // location
+        public static final String COUNTRY = "country";
+        public static final String COUNTRY_CODE = "country_code";
+        public static final String BOUNDING_BOX = "bounding_box";
+        public static final String ATTRIBUTES = "attributes";
+    }
+
+    /**
+     * Attribute names of the "user" object embedded in a tweet. Every value is the lowercase JSON key,
+     * including CREATED_AT (previously the upper-case "CREATED_AT", which never matches the JSON).
+     */
+    public static final class Tweet_User {
+        public static final String ID = "id";
+        public static final String NAME = "name";
+        public static final String SCREEN_NAME = "screen_name";
+        public static final String LOCATION = "location";
+        public static final String DESCRIPTION = "description";
+        public static final String CONTRIBUTORS_ENABLED = "contributors_enabled";
+        public static final String PROFILE_IMAGE_URL = "profile_image_url";
+        public static final String PROFILE_IMAGE_URL_HTTPS = "profile_image_url_https";
+        public static final String URL = "url";
+        public static final String PROTECTED = "protected";
+        public static final String FOLLOWERS_COUNT = "followers_count";
+        public static final String PROFILE_BACKGROUND_COLOR = "profile_background_color";
+        public static final String PROFILE_TEXT_COLOR = "profile_text_color";
+        public static final String PROFILE_LINK_COLOR = "profile_link_color";
+        public static final String PROFILE_SIDEBAR_FILL_COLOR = "profile_sidebar_fill_color";
+        public static final String PROFILE_SIDEBAR_BORDER_COLOR = "profile_sidebar_border_color";
+        public static final String PROFILE_USE_BACKGROUND_IMAGE = "profile_use_background_image";
+        public static final String DEFAULT_PROFILE = "default_profile";
+        public static final String DEFAULT_PROFILE_IMAGE = "default_profile_image";
+        public static final String FRIENDS_COUNT = "friends_count";
+        public static final String CREATED_AT = "created_at";
+        public static final String FAVOURITES_COUNT = "favourites_count";
+        public static final String UTC_OFFSET = "utc_offset";
+        public static final String TIME_ZONE = "time_zone";
+        public static final String PROFILE_BACKGROUND_IMAGE_URL = "profile_background_image_url";
+        public static final String PROFILE_BACKGROUND_IMAGE_URL_HTTPS = "profile_background_image_url_https";
+        public static final String PROFILE_BANNER_URL = "profile_banner_url";
+        public static final String LANG = "lang";
+        public static final String STATUSES_COUNT = "statuses_count";
+        public static final String GEO_ENABLED = "geo_enabled";
+        public static final String VERIFIED = "verified";
+        public static final String IS_TRANSLATOR = "is_translator";
+        public static final String LISTED_COUNT = "listed_count";
+        public static final String FOLLOW_REQUEST_SENT = "follow_request_sent";
+        // Entities are skipped. Attributes present in the twitter4j API but not in the raw JSON:
+        // public static final String WITHHELD_IN_COUNTRIES = "withheld_in_countries";
+        // public static final String BIGGER_PROFILE_IMAGE_URL = "bigger_profile_image_url";
+        // public static final String MINI_PROFILE_IMAGE_URL = "mini_profile_image_url";
+        // public static final String ORIGINAL_PROFILE_IMAGE_URL = "original_profile_image_url";
+        // public static final String SHOW_ALL_INLINE_MEDIA = "show_all_inline_media";
+        // public static final String PROFILE_BANNER_RETINA_URL = "profile_banner_retina_url";
+        // public static final String PROFILE_BANNER_IPAD_URL = "profile_banner_ipad_url";
+        // public static final String PROFILE_BANNER_IPAD_RETINA_URL = "profile_banner_ipad_retina_url";
+        // public static final String PROFILE_BANNER_MOBILE_URL = "profile_banner_mobile_url";
+        // public static final String PROFILE_BANNER_MOBILE_RETINA_URL = "profile_banner_mobile_retina_url";
+        // public static final String PROFILE_BACKGROUND_TILED = "profile_background_tiled";
}
/*
@@ -77,4 +159,5 @@
public static final String TOPICS = "topics";
}
+
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 55dee04..0bc38df 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -221,7 +221,7 @@
public static final String KEY_STREAM_SOURCE = "stream-source";
public static final String EXTERNAL = "external";
public static final String KEY_READER_FACTORY = "reader-factory";
- public static final String READER_RSS = "rss";
+ public static final String READER_RSS = "rss_feed";
public static final String FORMAT_CSV = "csv";
public static final String ERROR_PARSE_RECORD = "Parser failed to parse
record";
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
index 4fb602b..6aac9f1 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
@@ -205,6 +205,7 @@
private static ConfigurationBuilder getAuthConfiguration(Map<String,
String> configuration) {
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true);
+ cb.setJSONStoreEnabled(true);
String oAuthConsumerKey =
configuration.get(AuthenticationConstants.OAUTH_CONSUMER_KEY);
String oAuthConsumerSecret =
configuration.get(AuthenticationConstants.OAUTH_CONSUMER_SECRET);
String oAuthAccessToken =
configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN);
diff --git
a/asterixdb/asterix-maven-plugins/lexer-generator-maven-plugin/src/main/resources/Lexer.java
b/asterixdb/asterix-maven-plugins/lexer-generator-maven-plugin/src/main/resources/Lexer.java
index 60e6e89..8dcac13 100644
---
a/asterixdb/asterix-maven-plugins/lexer-generator-maven-plugin/src/main/resources/Lexer.java
+++
b/asterixdb/asterix-maven-plugins/lexer-generator-maven-plugin/src/main/resources/Lexer.java
@@ -85,7 +85,7 @@
this.buffer = buffer;
tokenBegin = bufpos = 0;
containsEscapes = false;
- line++;
+// line++;
tokenBegin = -1;
}
diff --git
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java
index e16633e..a5af127 100644
---
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java
+++
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AOrderedListType.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.om.types;
+import org.apache.asterix.om.base.AOrderedList;
import org.apache.asterix.om.base.IAObject;
import org.json.JSONException;
import org.json.JSONObject;
@@ -26,6 +27,8 @@
private static final long serialVersionUID = 1L;
+ public static final AOrderedListType FULL_OPEN_ORDEREDLIST_TYPE = new
AOrderedListType(null,"");
+
/**
* @param itemType
* if null, the list is untyped
diff --git
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java
index 80b13b5..febc6ad 100644
---
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java
+++
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/AUnorderedListType.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.om.types;
+import org.apache.asterix.om.base.AUnorderedList;
import org.apache.asterix.om.base.IAObject;
import org.json.JSONException;
import org.json.JSONObject;
@@ -26,6 +27,8 @@
private static final long serialVersionUID = 1L;
+ public static final AUnorderedListType FULLY_OPEN_UNORDEREDLIST_TYPE = new
AUnorderedListType(null,"");
+
/**
* @param itemType
* if null, the collection is untyped
--
To view, visit https://asterix-gerrit.ics.uci.edu/1002
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I7021e7b779de05b9ec999a8d5f8464fb0ab413c0
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <[email protected]>