Xikui Wang has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/3032
Change subject: [NO ISSUE][ING] Tweets ingestion related refactoring
......................................................................
[NO ISSUE][ING] Tweets ingestion related refactoring
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
As incoming tweets are plain JSON records, the Twitter adapter
should be able to use the JSON parser. For that purpose, we changed the
record type produced by the Twitter record readers (and consumed by
TweetParser) from String to char[], to be consistent with JSONDataParser,
which operates on char[]. In addition, "JSON" (upper case) is now accepted,
alongside "json", as a format name for selecting the JSON parser.
Change-Id: Id6d656a4af974499ef0df2e389152ca205c2d078
---
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
M
asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/TweetParserTest.java
9 files changed, 42 insertions(+), 27 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/32/3032/1
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index cc4b7f9..674dd52 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -41,7 +41,8 @@
private int recordNumber = 0;
private static final List<String> recordReaderFormats =
Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.FORMAT_ADM,
- ExternalDataConstants.FORMAT_JSON,
ExternalDataConstants.FORMAT_SEMISTRUCTURED));
+ ExternalDataConstants.FORMAT_JSON,
ExternalDataConstants.FORMAT_JSON_CAP,
+ ExternalDataConstants.FORMAT_SEMISTRUCTURED));
private static final String REQUIRED_CONFIGS = "";
@Override
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
index 2cf5396..f5404f6 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
@@ -24,6 +24,7 @@
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.input.record.CharArrayRecord;
import org.apache.asterix.external.input.record.GenericRecord;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -35,7 +36,7 @@
import twitter4j.TwitterException;
import twitter4j.TwitterObjectFactory;
-public class TwitterPullRecordReader implements IRecordReader<String> {
+public class TwitterPullRecordReader implements IRecordReader<char[]> {
private Query query;
private Twitter twitter;
@@ -43,7 +44,7 @@
private QueryResult result;
private int nextTweetIndex = 0;
private long lastTweetIdReceived = 0;
- private GenericRecord<String> record;
+ private CharArrayRecord record;
private boolean stopped = false;
public TwitterPullRecordReader(Twitter twitter, String keywords, int
requestInterval) {
@@ -51,7 +52,7 @@
this.requestInterval = requestInterval;
this.query = new Query(keywords);
this.query.setCount(100);
- this.record = new GenericRecord<>();
+ this.record = new CharArrayRecord(0);
}
@Override
@@ -65,7 +66,7 @@
}
@Override
- public IRawRecord<String> next() throws IOException, InterruptedException {
+ public IRawRecord<char[]> next() throws IOException, InterruptedException {
if (result == null || nextTweetIndex >= result.getTweets().size()) {
Thread.sleep(1000 * requestInterval);
query.setSinceId(lastTweetIdReceived);
@@ -83,7 +84,7 @@
lastTweetIdReceived = tweet.getId();
}
String jsonTweet = TwitterObjectFactory.getRawJSON(tweet); //
transform tweet obj to json
- record.set(jsonTweet);
+ record.set(jsonTweet.toCharArray());
return record;
} else {
return null;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
index 3c63281..072ff22 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
@@ -24,6 +24,7 @@
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.input.record.CharArrayRecord;
import org.apache.asterix.external.input.record.GenericRecord;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.asterix.external.util.TwitterUtil;
@@ -31,10 +32,10 @@
import twitter4j.FilterQuery;
import twitter4j.TwitterStream;
-public class TwitterPushRecordReader implements IRecordReader<String> {
+public class TwitterPushRecordReader implements IRecordReader<char[]> {
private LinkedBlockingQueue<String> inputQ;
private TwitterStream twitterStream;
- private GenericRecord<String> record;
+ private CharArrayRecord record;
private boolean closed = false;
public TwitterPushRecordReader(TwitterStream twitterStream,
TwitterUtil.TweetListener tweetListener,
@@ -60,7 +61,7 @@
}
private void init(TwitterStream twitterStream) {
- record = new GenericRecord<>();
+ record = new CharArrayRecord(0);
inputQ = new LinkedBlockingQueue<>();
this.twitterStream = twitterStream;
}
@@ -81,12 +82,12 @@
}
@Override
- public IRawRecord<String> next() throws IOException, InterruptedException {
+ public IRawRecord<char[]> next() throws IOException, InterruptedException {
String tweet = inputQ.poll();
if (tweet == null) {
return null;
}
- record.set(tweet);
+ record.set(tweet.toCharArray());
return record;
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index a31e0da..43a3816 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -43,7 +43,7 @@
import twitter4j.FilterQuery;
-public class TwitterRecordReaderFactory implements
IRecordReaderFactory<String> {
+public class TwitterRecordReaderFactory implements
IRecordReaderFactory<char[]> {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = LogManager.getLogger();
@@ -128,9 +128,9 @@
}
@Override
- public IRecordReader<? extends String>
createRecordReader(IHyracksTaskContext ctx, int partition)
+ public IRecordReader<? extends char[]>
createRecordReader(IHyracksTaskContext ctx, int partition)
throws HyracksDataException {
- IRecordReader<? extends String> recordReader;
+ IRecordReader<? extends char[]> recordReader;
switch (configuration.get(ExternalDataConstants.KEY_READER)) {
case ExternalDataConstants.KEY_ADAPTER_NAME_PULL_TWITTER:
recordReader = new
TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
@@ -161,8 +161,8 @@
}
@Override
- public Class<? extends String> getRecordClass() {
- return String.class;
+ public Class<? extends char[]> getRecordClass() {
+ return char[].class;
}
private boolean validateConfiguration(Map<String, String> configuration) {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
index 0183196..bf8d134 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
@@ -47,7 +47,13 @@
import java.io.IOException;
import java.util.Iterator;
-public class TweetParser extends AbstractDataParser implements
IRecordDataParser<String> {
+/**
+ * This class was introduced to parse Twitter data. As the Tweets are JSON
formatted record, this class could be
+ * deprecated. For future use cases, we could add special processes for Tweets
in this parser, like parsing timestamps
+ * in Tweets to datetime instead of strings.
+ */
+
+public class TweetParser extends AbstractDataParser implements
IRecordDataParser<char[]> {
private final IObjectPool<IARecordBuilder, ATypeTag> recordBuilderPool =
new ListObjectPool<>(new RecordBuilderFactory());
private final IObjectPool<IAsterixListBuilder, ATypeTag> listBuilderPool =
@@ -252,12 +258,12 @@
}
@Override
- public void parse(IRawRecord<? extends String> record, DataOutput out)
throws HyracksDataException {
+ public void parse(IRawRecord<? extends char[]> record, DataOutput out)
throws HyracksDataException {
try {
//TODO get rid of this temporary json
resetPools();
ObjectMapper om = new ObjectMapper();
- writeRecord(om.readTree(record.get()), out, recordType);
+ writeRecord(om.readTree(record.getBytes()), out, recordType);
} catch (IOException e) {
throw HyracksDataException.create(e);
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java
index 501aea0..81abae5 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/JSONDataParserFactory.java
@@ -27,6 +27,7 @@
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IStreamDataParser;
import org.apache.asterix.external.parser.JSONDataParser;
+import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.om.types.AOrderedListType;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
@@ -41,7 +42,8 @@
public class JSONDataParserFactory extends
AbstractRecordStreamParserFactory<char[]> {
private static final long serialVersionUID = 1L;
- private static final List<String> PARSER_FORMAT =
Collections.unmodifiableList(Arrays.asList("json"));
+ private static final List<String> PARSER_FORMAT = Collections
+ .unmodifiableList(Arrays.asList(ExternalDataConstants.FORMAT_JSON,
ExternalDataConstants.FORMAT_JSON_CAP));
private static final List<ATypeTag> UNSUPPORTED_TYPES = Collections
.unmodifiableList(Arrays.asList(ATypeTag.MULTISET,
ATypeTag.POINT3D, ATypeTag.CIRCLE, ATypeTag.RECTANGLE,
ATypeTag.INTERVAL, ATypeTag.DAYTIMEDURATION,
ATypeTag.DURATION, ATypeTag.BINARY));
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
index 34f0434..7844924 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/TweetParserFactory.java
@@ -26,13 +26,15 @@
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IRecordDataParserFactory;
import org.apache.asterix.external.parser.TweetParser;
+import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.api.context.IHyracksTaskContext;
-public class TweetParserFactory implements IRecordDataParserFactory<String> {
+public class TweetParserFactory implements IRecordDataParserFactory<char[]> {
private static final long serialVersionUID = 1L;
- private static final List<String> parserFormats =
Collections.unmodifiableList(Arrays.asList("twitter-status"));
+ private static final List<String> parserFormats =
+
Collections.unmodifiableList(Arrays.asList(ExternalDataConstants.FORMAT_TWEET));
private ARecordType recordType;
@Override
@@ -46,7 +48,7 @@
}
@Override
- public IRecordDataParser<String> createRecordParser(IHyracksTaskContext
ctx) {
+ public IRecordDataParser<char[]> createRecordParser(IHyracksTaskContext
ctx) {
TweetParser dataParser = new TweetParser(recordType);
return dataParser;
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 5a04e33..bd9ef50 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -154,6 +154,7 @@
public static final String FORMAT_BINARY = "binary";
public static final String FORMAT_ADM = "adm";
public static final String FORMAT_JSON = "json";
+ public static final String FORMAT_JSON_CAP = "JSON";
public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
public static final String FORMAT_TWEET = "twitter-status";
public static final String FORMAT_RSS = "rss";
diff --git
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/TweetParserTest.java
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/TweetParserTest.java
index 9916fa5..6888648 100644
---
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/TweetParserTest.java
+++
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/TweetParserTest.java
@@ -34,6 +34,7 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.asterix.external.input.record.CharArrayRecord;
import org.apache.asterix.external.input.record.GenericRecord;
import org.apache.asterix.external.parser.TweetParser;
import org.apache.asterix.om.types.AOrderedListType;
@@ -69,8 +70,8 @@
ByteArrayOutputStream is = new ByteArrayOutputStream();
DataOutput output = new DataOutputStream(is);
for (int iter1 = 0; iter1 < lines.size(); iter1++) {
- GenericRecord<String> record = new GenericRecord<>();
- record.set(lines.get(iter1));
+ CharArrayRecord record = new CharArrayRecord(0);
+ record.set(lines.get(iter1).toCharArray());
try {
parser.parse(record, output);
} catch (HyracksDataException e) {
@@ -95,8 +96,8 @@
DataOutput output = new DataOutputStream(is);
int regularCount = 0;
for (int iter1 = 0; iter1 < lines.size(); iter1++) {
- GenericRecord<String> record = new GenericRecord<>();
- record.set(lines.get(iter1));
+ CharArrayRecord record = new CharArrayRecord(0);
+ record.set(lines.get(iter1).toCharArray());
try {
parser.parse(record, output);
regularCount++;
--
To view, visit https://asterix-gerrit.ics.uci.edu/3032
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Id6d656a4af974499ef0df2e389152ca205c2d078
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <[email protected]>