Hello,
I am not sure why open method is not called, and how to resolve this.
This is my code, thank you in advance for your help,
package org.apache.storm.starter;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.HashMap;
import java.util.Map;
import javax.swing.JLabel;
import javax.swing.JTextPane;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.FilterQuery;
import twitter4j.Query;
import twitter4j.QueryResult;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.Twitter;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
/**
This is a basic example of a Storm topology.
*/
class Tweet implements Serializable{
/**
*
*/
private static final long serialVersionUID = -2156582873277297616L;
String UserName;
String Lang;
String Location;
int FollowersCount;
int FriendsCount;
String tweetText;
Date t_date;
}
public class TwitterStreamTopology {
static ArrayList<String> Users = new ArrayList<String>();
static ArrayList<String> Tweets = new ArrayList<String>();
static int usersCount = 0;
static int tweetsCount = 0;
JTextPane tpReport;
public TwitterStreamTopology (JTextPane tpReport) {
this.tpReport = tpReport;
}
public static class TwitterStreamBolt extends BaseBasicBolt {
/**
*/
JLabel lblTweetsCount;
JLabel lblUsersCount;
public TwitterStreamBolt (JLabel lblTweetsCount, JLabel
lblUsersCount) {
this.lblTweetsCount = lblTweetsCount;
this.lblUsersCount = lblUsersCount;
}
private static final long serialVersionUID = 4099225039441092308L;
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
boolean change = false;
if (!Users.contains(tuple.getString(0))) {
Users.add(tuple.getString(0));
usersCount ++;
change = true;
}
if (!Tweets.contains(tuple.getString(1))) {
Tweets.add(tuple.getString(1));
tweetsCount ++;
change = true;
}
if (change) {
lblTweetsCount.setText("Tweets Count : " + tweetsCount);
lblUsersCount.setText("Users Count : " + usersCount);
collector.emit(new Values(tuple.getString(0)));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("tweetsCount", "usersCount"));
}
}
LocalCluster cluster = new LocalCluster();
public void startTopology (String[] Keywords, JLabel lblTweetsCount,
JLabel lblUsersCount) {
TopologyBuilder builder = new TopologyBuilder();
TwitterStreamSpout t_spout = new TwitterStreamSpout(Keywords,
tpReport);
t_spout.activate();
builder.setSpout("twitterStreamSpout", t_spout, 1);
builder.setBolt("twitterStreamBolt", new
TwitterStreamBolt(lblTweetsCount, lblUsersCount),
1).shuffleGrouping("twitterStreamSpout");
Config conf = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("twitterStream", conf,
builder.createTopology());
Utils.sleep(10000);
System.out.println("There are " + usersCount + " users who tweet
AASTMT: " + Users.toString());
System.out.println("There are " + tweetsCount + " tweets containing
AASTMT: " + Tweets.toString());
}
public void stopTopology () {
cluster.killTopology("twitterStream");
cluster.shutdown();
}
/*public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("twitterStreamSpout", (IRichSpout) new
TwitterStreamSpout(), 1);
builder.setBolt("twitterStreamBolt", new TwitterStreamBolt(),
1).shuffleGrouping("twitterStreamSpout");
Config conf = new Config();
//conf.setDebug(true);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf,
builder.createTopology());
}
else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("twitterStream", conf,
builder.createTopology());
Utils.sleep(10000);
System.out.println("There are " + usersCount + " users who
tweet AASTMT: " + Users.toString());
System.out.println("There are " + tweetsCount + " tweets
containing AASTMT: " + Tweets.toString());
cluster.killTopology("twitterStream");
cluster.shutdown();
}
}*/
}
class TwitterStreamSpout implements IRichSpout {
/**
*
*/
private static final long serialVersionUID = -5695079926219121864L;
public static Logger LOG =
LoggerFactory.getLogger(TwitterStreamSpout.class);
boolean _isDistributed;
TopologyContext context;
SpoutOutputCollector collector;
Twitter twitter;
Query query;
QueryResult result;
Tweet n_tweet = new Tweet();
String[] keywords = {"Test"};;
//final JTextPane tpReport;
public TwitterStreamSpout(String[] keywords, final JTextPane tpReport) {
this(true);
this.keywords = keywords;
//this.tpReport = tpReport;
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true);
cb.setOAuthConsumerKey("...");
cb.setOAuthConsumerSecret("...");
cb.setOAuthAccessToken("..-..");
cb.setOAuthAccessTokenSecret("..");
cb.setUserStreamRepliesAllEnabled(true);
// The factory instance is re-useable and thread safe.
//TwitterFactory tf = new TwitterFactory(cb.build());
//twitter = tf.getInstance();
//query = new Query("AASTMT");
StatusListener listener = new StatusListener(){
public void onTrackLimitationNotice(int
numberOfLimitedStatuses) {}
public void onException(Exception ex) {
ex.printStackTrace();
}
@Override
public void onScrubGeo(long arg0, long arg1) {
// TODO Auto-generated method stub
}
@Override
public void onDeletionNotice(StatusDeletionNotice arg0) {
// TODO Auto-generated method stub
}
@Override
public void onStallWarning(StallWarning arg0) {
// TODO Auto-generated method stub
}
@Override
public void onStatus(Status status) {
// TODO Auto-generated method stub
System.out.println(status.getUser().getName() + " : " +
status.getText());
n_tweet.UserName = status.getUser().getName();
n_tweet.Lang = status.getUser().getLang();
n_tweet.Location = status.getUser().getLocation();
n_tweet.FollowersCount =
status.getUser().getFollowersCount();
n_tweet.FriendsCount =
status.getUser().getFriendsCount();
n_tweet.tweetText = status.getText();
n_tweet.t_date = status.getCreatedAt();
tpReport.setText(tpReport.getText() + "/n " +
n_tweet.UserName + ": " + n_tweet.tweetText + "\t " +
n_tweet.t_date + "\t " +
n_tweet.Lang + "\t " + n_tweet.Location +
"\t" + n_tweet.FollowersCount +
"\t" + n_tweet.FriendsCount);
nextTuple();
}
};
//TwitterStream twitterStream = new
TwitterStreamFactory(cb.build()).getInstance();
TwitterStream twitterStream = new
TwitterStreamFactory(cb.build()).getInstance();
//twitterStream.setOAuthConsumer("...", "...");
//AccessToken at = new AccessToken("...", "...");
//twitterStream.setOAuthAccessToken(at);
// sample() method internally creates a thread which manipulates
TwitterStream and calls these adequate listener methods continuously.
//twitterStream.sample();
twitterStream.addListener(listener);
FilterQuery fq = new FilterQuery();
fq.track(keywords);
twitterStream.filter(fq);
//twitterStream.sample();
}
public TwitterStreamSpout(boolean isDistributed) {
_isDistributed = isDistributed;
}
@Override
public void close() {
}
@Override
public void nextTuple() {
collector.emit(new Values(n_tweet.UserName, n_tweet.Lang,
n_tweet.Location, n_tweet.FollowersCount, n_tweet.FriendsCount,
n_tweet.tweetText, n_tweet.t_date));
}
@Override
public void ack(Object msgId) {
}
@Override
public void fail(Object msgId) {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("user", "Lang", "Loc", "Followers",
"Friends", "tweet", "date"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
if(!_isDistributed) {
Map<String, Object> ret = new HashMap<String, Object>();
ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
return ret;
} else {
return null;
}
}
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
// TODO Auto-generated method stub
this.context = context;
this.collector = collector;
}
@Override
public void activate() {
// TODO Auto-generated method stub
}
@Override
public void deactivate() {
// TODO Auto-generated method stub
}
}