Hello,

Has anybody tried to union two streams of Twitter Statues? I am
instantiating two twitter streams through two different set of credentials
and passing them through a union function, but the console does not show
any activity neither there are any errors.


--static function that returns JavaReceiverInputDStream<Status>--



public static JavaReceiverInputDStream<Status>
getTwitterStream(JavaStreamingContext spark, String consumerKey, String
consumerSecret,String accessToken, String accessTokenSecret,String[]
filter) {
  // Enable Oauth
  ConfigurationBuilder cb = new ConfigurationBuilder();
  cb.setDebugEnabled(false)
    .setOAuthConsumerKey(consumerKey).setOAuthConsumerSecret(consumerSecret)

.setOAuthAccessToken(accessToken).setOAuthAccessTokenSecret(accessTokenSecret)
    .setJSONStoreEnabled(true);
  TwitterFactory tf = new TwitterFactory(cb.build());
  Twitter twitter = tf.getInstance();

  // Create stream
  return TwitterUtils.createStream(spark,
twitter.getAuthorization(),filter);
 }
---trying to union two twitter streams---

JavaStreamingContext jssc = new JavaStreamingContext(conf,
Durations.minutes(5));

jssc.sparkContext().setLogLevel("ERROR");


JavaReceiverInputDStream<Status> twitterStreamByHashtag =
TwitterUtil.getTwitterStream(jssc, consumerKey1, consumerSecret1,
accessToken1, accessTokenSecret1,new String[]{"#Twitter"});
      // JavaReceiverInputDStream<Status> twitterStreamByUser =
TwitterUtil.getTwitterStream(jssc, consumerKey2, consumerSecret2,
accessToken2, accessTokenSecret2,new String[]{"@Twitter"});


JavaDStream<String> statuses = twitterStreamByHashtag
                .union(twitterStreamByUser)
                .map(s->{return s.getText();});


regards,
Imran

-- 
I.R

Reply via email to