Hello,

I am not sure why open method is not called, and how to resolve this.

This is my code, thank you in advance for your help,

package org.apache.storm.starter;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.HashMap;
import java.util.Map;

import javax.swing.JLabel;
import javax.swing.JTextPane;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;



import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import twitter4j.FilterQuery;
import twitter4j.Query;
import twitter4j.QueryResult;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.Twitter;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;


/**

This is a basic example of a Storm topology.
*/

class Tweet implements Serializable{
    /**
     *
     */
    private static final long serialVersionUID = -2156582873277297616L;
    String UserName;
    String Lang;
    String Location;
    int FollowersCount;
    int FriendsCount;
    String tweetText;
    Date t_date;

}
public class TwitterStreamTopology {
    static ArrayList<String> Users = new ArrayList<String>();
    static ArrayList<String> Tweets = new ArrayList<String>();
    static int usersCount = 0;
    static int tweetsCount = 0;
    JTextPane tpReport;

    public TwitterStreamTopology (JTextPane tpReport) {
        this.tpReport = tpReport;

    }
    public static class TwitterStreamBolt extends BaseBasicBolt {

        /**

        */

        JLabel lblTweetsCount;
        JLabel lblUsersCount;

        public TwitterStreamBolt (JLabel lblTweetsCount, JLabel
lblUsersCount) {
        this.lblTweetsCount = lblTweetsCount;
        this.lblUsersCount = lblUsersCount;
        }

        private static final long serialVersionUID = 4099225039441092308L;


        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            boolean change = false;
            if (!Users.contains(tuple.getString(0))) {
                Users.add(tuple.getString(0));
                usersCount ++;
                change = true;
            }
            if (!Tweets.contains(tuple.getString(1))) {
                Tweets.add(tuple.getString(1));
                tweetsCount ++;
                change = true;
            }

            if (change) {
                lblTweetsCount.setText("Tweets Count : " + tweetsCount);
                lblUsersCount.setText("Users Count : " + usersCount);
                collector.emit(new Values(tuple.getString(0)));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("tweetsCount", "usersCount"));
        }


    }


    LocalCluster cluster = new LocalCluster();

    public void startTopology (String[] Keywords,  JLabel lblTweetsCount,
JLabel lblUsersCount) {
        TopologyBuilder builder = new TopologyBuilder();

        TwitterStreamSpout t_spout = new TwitterStreamSpout(Keywords,
tpReport);
        t_spout.activate();
        builder.setSpout("twitterStreamSpout",  t_spout, 1);
        builder.setBolt("twitterStreamBolt", new
TwitterStreamBolt(lblTweetsCount, lblUsersCount),
1).shuffleGrouping("twitterStreamSpout");

        Config conf = new Config();

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("twitterStream", conf,
builder.createTopology());
        Utils.sleep(10000);
        System.out.println("There are " + usersCount + " users who tweet
AASTMT: " + Users.toString());
        System.out.println("There are " + tweetsCount + " tweets containing
AASTMT: " + Tweets.toString());


    }

    public void stopTopology () {

        cluster.killTopology("twitterStream");
        cluster.shutdown();

    }
    /*public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("twitterStreamSpout", (IRichSpout) new
TwitterStreamSpout(), 1);
        builder.setBolt("twitterStreamBolt", new TwitterStreamBolt(),
1).shuffleGrouping("twitterStreamSpout");

        Config conf = new Config();
        //conf.setDebug(true);

        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);

            StormSubmitter.submitTopologyWithProgressBar(args[0], conf,
builder.createTopology());
        }
        else {

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("twitterStream", conf,
builder.createTopology());
            Utils.sleep(10000);
            System.out.println("There are " + usersCount + " users who
tweet AASTMT: " + Users.toString());
            System.out.println("There are " + tweetsCount + " tweets
containing AASTMT: " + Tweets.toString());
            cluster.killTopology("twitterStream");
            cluster.shutdown();
        }
    }*/
}

class TwitterStreamSpout implements IRichSpout {

    /**
     *
     */
    private static final long serialVersionUID = -5695079926219121864L;
    public static Logger LOG =
LoggerFactory.getLogger(TwitterStreamSpout.class);
    boolean _isDistributed;
    TopologyContext context;
    SpoutOutputCollector collector;
    Twitter twitter;
    Query query;
    QueryResult result;
    Tweet n_tweet = new Tweet();
    String[] keywords = {"Test"};;
    //final JTextPane tpReport;
    public TwitterStreamSpout(String[] keywords, final JTextPane tpReport) {
        this(true);
        this.keywords = keywords;
        //this.tpReport = tpReport;

        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.setDebugEnabled(true);
        cb.setOAuthConsumerKey("...");
        cb.setOAuthConsumerSecret("...");
        cb.setOAuthAccessToken("..-..");
        cb.setOAuthAccessTokenSecret("..");

        cb.setUserStreamRepliesAllEnabled(true);

     // The factory instance is re-useable and thread safe.
        //TwitterFactory tf = new TwitterFactory(cb.build());

        //twitter = tf.getInstance();
        //query = new Query("AASTMT");
        StatusListener listener = new StatusListener(){

            public void onTrackLimitationNotice(int
numberOfLimitedStatuses) {}
            public void onException(Exception ex) {
                ex.printStackTrace();
            }
            @Override
            public void onScrubGeo(long arg0, long arg1) {
                // TODO Auto-generated method stub

            }

            @Override
            public void onDeletionNotice(StatusDeletionNotice arg0) {
                // TODO Auto-generated method stub

            }
            @Override
            public void onStallWarning(StallWarning arg0) {
                // TODO Auto-generated method stub

            }
            @Override
            public void onStatus(Status status) {
                // TODO Auto-generated method stub

                 System.out.println(status.getUser().getName() + " : " +
status.getText());
                    n_tweet.UserName = status.getUser().getName();
                    n_tweet.Lang = status.getUser().getLang();
                    n_tweet.Location = status.getUser().getLocation();
                    n_tweet.FollowersCount =
status.getUser().getFollowersCount();
                    n_tweet.FriendsCount =
status.getUser().getFriendsCount();
                    n_tweet.tweetText = status.getText();
                    n_tweet.t_date = status.getCreatedAt();
                    tpReport.setText(tpReport.getText() + "/n " +
n_tweet.UserName + ": " + n_tweet.tweetText + "\t " +
                                        n_tweet.t_date + "\t " +
n_tweet.Lang + "\t " + n_tweet.Location +
                                        "\t" + n_tweet.FollowersCount +
"\t" + n_tweet.FriendsCount);
                    nextTuple();
            }
        };


        //TwitterStream twitterStream = new
TwitterStreamFactory(cb.build()).getInstance();
        TwitterStream twitterStream = new
TwitterStreamFactory(cb.build()).getInstance();

        //twitterStream.setOAuthConsumer("...", "...");
        //AccessToken at = new AccessToken("...", "...");
        //twitterStream.setOAuthAccessToken(at);


        // sample() method internally creates a thread which manipulates
TwitterStream and calls these adequate listener methods continuously.
        //twitterStream.sample();

        twitterStream.addListener(listener);

        FilterQuery fq = new FilterQuery();


        fq.track(keywords);
        twitterStream.filter(fq);
        //twitterStream.sample();
    }


    public TwitterStreamSpout(boolean isDistributed) {
        _isDistributed = isDistributed;
    }


    @Override
    public void close() {

    }

    @Override
    public void nextTuple() {
        collector.emit(new Values(n_tweet.UserName, n_tweet.Lang,
n_tweet.Location, n_tweet.FollowersCount, n_tweet.FriendsCount,
n_tweet.tweetText, n_tweet.t_date));
    }

    @Override
    public void ack(Object msgId) {

    }

    @Override
    public void fail(Object msgId) {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("user", "Lang", "Loc", "Followers",
"Friends", "tweet", "date"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        if(!_isDistributed) {
            Map<String, Object> ret = new HashMap<String, Object>();
            ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
            return ret;
        } else {
            return null;
        }
    }

    @Override
    public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
        // TODO Auto-generated method stub

        this.context = context;
        this.collector = collector;
    }

    @Override
    public void activate() {
        // TODO Auto-generated method stub

    }

    @Override
    public void deactivate() {
        // TODO Auto-generated method stub

    }
}

Reply via email to