Xikui Wang has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1272
Change subject: Add user-stream for Twitter Adaptor
......................................................................
Add user-stream for Twitter Adaptor
1. Add a user-stream option to the Twitter adaptor
2. Refactor TwitterRecordReaderFactory to dispatch on the reader type, and move the stream listeners into TwitterUtil
3. To create a user-stream feed, use the following DDL:
create feed TwitterFeed using twitter_user_stream(
("format"="twitter-status"),
("type-name"="Tweet"),
...
// the remaining parameters are the same as for a push feed
Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
---
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
5 files changed, 278 insertions(+), 93 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/72/1272/1
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
index 9ead8a9..c296bc6 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
@@ -26,6 +26,7 @@
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.input.record.GenericRecord;
import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.TwitterUtil;
import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
@@ -33,27 +34,43 @@
import twitter4j.StatusListener;
import twitter4j.TwitterObjectFactory;
import twitter4j.TwitterStream;
public class TwitterPushRecordReader implements IRecordReader<String> {
private LinkedBlockingQueue<String> inputQ;
private TwitterStream twitterStream;
private GenericRecord<String> record;
private boolean closed = false;
- public TwitterPushRecordReader(TwitterStream twitterStream, FilterQuery
query) {
- record = new GenericRecord<>();
- inputQ = new LinkedBlockingQueue<>();
- this.twitterStream =
twitterStream;//TwitterUtil.getTwitterStream(configuration);
- this.twitterStream.addListener(new TweetListener(inputQ));
+ /** Reads tweets matching {@code query} by starting a filtered stream. */
+ public TwitterPushRecordReader(TwitterStream twitterStream,
TwitterUtil.TweetListener tweetListener,
+ FilterQuery query) {
+ init(twitterStream);
+ tweetListener.setInputQ(inputQ);
+ this.twitterStream.addListener(tweetListener);
this.twitterStream.filter(query);
}
- public TwitterPushRecordReader(TwitterStream twitterStream) {
+ /** Reads tweets by starting the sample stream. */
+ public TwitterPushRecordReader(TwitterStream twitterStream,
TwitterUtil.TweetListener tweetListener) {
+ init(twitterStream);
+ tweetListener.setInputQ(inputQ);
+ this.twitterStream.addListener(tweetListener);
+ this.twitterStream.sample();
+ }
+
+ /** Reads tweets by starting the authenticated user's stream. */
+ public TwitterPushRecordReader(TwitterStream twitterStream,
TwitterUtil.UserTweetsListener tweetListener) {
+ init(twitterStream);
+ tweetListener.setInputQ(inputQ);
+ this.twitterStream.addListener(tweetListener);
+ this.twitterStream.user();
+ }
+
+ /** Creates the record holder and the input queue the listener fills, and stores the stream. */
+ private void init(TwitterStream twitterStream) {
record = new GenericRecord<>();
inputQ = new LinkedBlockingQueue<>();
- this.twitterStream = twitterStream;//
- this.twitterStream.addListener(new TweetListener(inputQ));
- twitterStream.sample();
+ this.twitterStream = twitterStream;
}
@Override
@@ -89,46 +106,6 @@
return false;
}
return true;
- }
-
- private class TweetListener implements StatusListener {
-
- private LinkedBlockingQueue<String> inputQ;
-
- public TweetListener(LinkedBlockingQueue<String> inputQ) {
- this.inputQ = inputQ;
- }
-
- @Override
- public void onStatus(Status tweet) {
- String jsonTweet = TwitterObjectFactory.getRawJSON(tweet);
- inputQ.add(jsonTweet);
- }
-
- @Override
- public void onException(Exception arg0) {
- // do nothing
- }
-
- @Override
- public void onDeletionNotice(StatusDeletionNotice arg0) {
- // do nothing
- }
-
- @Override
- public void onScrubGeo(long arg0, long arg1) {
- // do nothing
- }
-
- @Override
- public void onStallWarning(StallWarning arg0) {
- // do nothing
- }
-
- @Override
- public void onTrackLimitationNotice(int arg0) {
- // do nothing
- }
}
@Override
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 73d1b39..c2af93f 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -73,39 +73,38 @@
builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET);
throw new AsterixException(builder.toString());
}
- if (TwitterRecordReaderFactory.isTwitterPull(configuration)) {
- pull = true;
- if (configuration.get(SearchAPIConstants.QUERY) == null) {
- throw new AsterixException(
- "parameter " + SearchAPIConstants.QUERY + " not
specified as part of adaptor configuration");
- }
- String interval = configuration.get(SearchAPIConstants.INTERVAL);
- if (interval != null) {
- try {
- Integer.parseInt(interval);
- } catch (NumberFormatException nfe) {
- throw new IllegalArgumentException(
- "parameter " + SearchAPIConstants.INTERVAL + " is
defined incorrectly, expecting a number");
- }
- } else {
- configuration.put(SearchAPIConstants.INTERVAL,
DEFAULT_INTERVAL);
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning(" Parameter " + SearchAPIConstants.INTERVAL
+ " not defined, using default ("
- + DEFAULT_INTERVAL + ")");
- }
- }
- } else {
- pull = false;
- }
- }
- public static boolean isTwitterPull(Map<String, String> configuration) {
- String reader = configuration.get(ExternalDataConstants.KEY_READER);
- if (reader.equals(ExternalDataConstants.READER_TWITTER_PULL)
- || reader.equals(ExternalDataConstants.READER_PULL_TWITTER)) {
- return true;
+ switch (configuration.get(ExternalDataConstants.KEY_READER)) {
+ case ExternalDataConstants.READER_TWITTER_PULL:
+ case ExternalDataConstants.READER_PULL_TWITTER:
+ pull = true;
+ if (configuration.get(SearchAPIConstants.QUERY) == null) {
+ throw new AsterixException("parameter " +
SearchAPIConstants.QUERY
+ + " not specified as part of adaptor
configuration");
+ }
+ String interval =
configuration.get(SearchAPIConstants.INTERVAL);
+ if (interval != null) {
+ try {
+ Integer.parseInt(interval);
+ } catch (NumberFormatException nfe) {
+ throw new IllegalArgumentException("parameter " +
SearchAPIConstants.INTERVAL
+ + " is defined incorrectly, expecting a
number");
+ }
+ } else {
+ configuration.put(SearchAPIConstants.INTERVAL,
DEFAULT_INTERVAL);
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning(" Parameter " +
SearchAPIConstants.INTERVAL + " not defined, using default ("
+ + DEFAULT_INTERVAL + ")");
+ }
+ }
+ break;
+ case ExternalDataConstants.READER_TWITTER_PUSH:
+ case ExternalDataConstants.READER_PUSH_TWITTER:
+ case ExternalDataConstants.READER_USER_STREAM_TWITTER:
+ // push-based readers (sample, filter and user streams) need no extra parameters
+ pull = false;
+ break;
}
- return false;
}
@Override
@@ -116,20 +115,36 @@
@Override
public IRecordReader<? extends String>
createRecordReader(IHyracksTaskContext ctx, int partition)
throws HyracksDataException {
- if (pull) {
- return new
TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
- configuration.get(SearchAPIConstants.QUERY),
-
Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL)));
- } else {
- FilterQuery query;
- try {
- query = TwitterUtil.getFilterQuery(configuration);
- return (query == null) ? new
TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration))
- : new
TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration), query);
- } catch (AsterixException e) {
- throw new HyracksDataException(e);
- }
+ IRecordReader<? extends String> recordReader;
+ switch (configuration.get(ExternalDataConstants.KEY_READER)) {
+ case ExternalDataConstants.READER_TWITTER_PULL:
+ case ExternalDataConstants.READER_PULL_TWITTER:
+ recordReader = new
TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
+ configuration.get(SearchAPIConstants.QUERY),
+
Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL)));
+ break;
+ case ExternalDataConstants.READER_TWITTER_PUSH:
+ case ExternalDataConstants.READER_PUSH_TWITTER:
+ FilterQuery query;
+ try {
+ query = TwitterUtil.getFilterQuery(configuration);
+ recordReader = (query == null)
+ ? new
TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration),
+ TwitterUtil.getTweetListener())
+ : new
TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration),
+ TwitterUtil.getTweetListener(), query);
+ } catch (AsterixException e) {
+ throw new HyracksDataException(e);
+ }
+ break;
+ case ExternalDataConstants.READER_USER_STREAM_TWITTER:
+ recordReader = new
TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration),
+ TwitterUtil.getUserTweetsListener());
+ break;
+ default:
+ throw new HyracksDataException("No Record reader found!");
}
+ return recordReader;
}
@Override
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index ad11171..d3faf84 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -101,6 +101,7 @@
case ExternalDataConstants.READER_TWITTER_PUSH:
case ExternalDataConstants.READER_PUSH_TWITTER:
case ExternalDataConstants.READER_PULL_TWITTER:
+ case ExternalDataConstants.READER_USER_STREAM_TWITTER: // user-stream feeds share the Twitter record reader factory
return new TwitterRecordReaderFactory();
case ExternalDataConstants.ALIAS_TWITTER_FIREHOSE_ADAPTER:
return new StreamRecordReaderFactory(new
TwitterFirehoseStreamFactory());
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index c5167c1..fa337fa 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -128,6 +128,7 @@
public static final String READER_PUSH_TWITTER = "push_twitter";
public static final String READER_TWITTER_PULL = "twitter_pull";
public static final String READER_PULL_TWITTER = "pull_twitter";
+ public static final String READER_USER_STREAM_TWITTER =
"twitter_user_stream"; // adaptor name used in "create feed ... using twitter_user_stream(...)"
public static final String CLUSTER_LOCATIONS = "cluster-locations";
public static final String SCHEDULER = "hdfs-scheduler";
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
index 70d31c0..dedba40 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
@@ -19,17 +19,22 @@
package org.apache.asterix.external.util;
import org.apache.asterix.common.exceptions.AsterixException;
import twitter4j.FilterQuery;
+import twitter4j.Status;
+import twitter4j.StatusAdapter;
import twitter4j.Twitter;
import twitter4j.TwitterFactory;
+import twitter4j.TwitterObjectFactory;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
+import twitter4j.UserStreamAdapter;
import twitter4j.conf.ConfigurationBuilder;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
@@ -275,4 +280,64 @@
public static final String INTERVAL = "interval";
}
+ /**
+ * Creates a listener for a user stream; call {@link UserTweetsListener#setInputQ} before
+ * registering it on a stream.
+ */
+ public static UserTweetsListener getUserTweetsListener() {
+ return new UserTweetsListener();
+ }
+
+ /**
+ * Creates a listener for a sample or filter stream; call {@link TweetListener#setInputQ}
+ * before registering it on a stream.
+ */
+ public static TweetListener getTweetListener() {
+ return new TweetListener();
+ }
+
+ /**
+ * Puts the raw JSON of {@code status} on {@code inputQ}. NOTE(review): relies on the stream's
+ * configuration having the JSON store enabled so getRawJSON has JSON to return; confirm.
+ */
+ private static void enqueueRawJson(LinkedBlockingQueue<String> inputQ, Status status) {
+ inputQ.add(TwitterObjectFactory.getRawJSON(status));
+ }
+
+ /**
+ * User-stream listener that forwards each status to the reader's queue; every other
+ * user-stream event falls through to the no-op callbacks of {@link UserStreamAdapter}.
+ */
+ public static class UserTweetsListener extends UserStreamAdapter {
+
+ private LinkedBlockingQueue<String> inputQ;
+
+ public void setInputQ(LinkedBlockingQueue<String> inputQ) {
+ this.inputQ = inputQ;
+ }
+
+ @Override
+ public void onStatus(Status status) {
+ enqueueRawJson(inputQ, status);
+ }
+ }
+
+ /**
+ * Sample/filter-stream listener that forwards each status to the reader's queue; every other
+ * event falls through to the no-op callbacks of {@link StatusAdapter}.
+ */
+ public static class TweetListener extends StatusAdapter {
+
+ private LinkedBlockingQueue<String> inputQ;
+
+ public void setInputQ(LinkedBlockingQueue<String> inputQ) {
+ this.inputQ = inputQ;
+ }
+
+ @Override
+ public void onStatus(Status tweet) {
+ enqueueRawJson(inputQ, tweet);
+ }
+ }
+
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1272
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <[email protected]>