Re: No Twitter Input from Kafka to Spark Streaming
You just pasted your Twitter credentials; consider changing them. :/

Thanks
Best Regards

On Wed, Aug 5, 2015 at 10:07 PM, narendra wrote:
> Thanks Akash for the answer. I added endpoint to the listener and now it is
> working.
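One way to avoid pasting keys into source code (and into list posts) is to read them from the environment at startup. A minimal sketch, assuming hypothetical variable names such as `TWITTER_CONSUMER_KEY`; it only builds a plain case class, whose fields you would then pass to `cb.setOAuthConsumerKey(...)` and the other `ConfigurationBuilder` setters from the original code:

```scala
// Hypothetical sketch: load Twitter OAuth credentials from environment variables
// instead of hard-coding them. The variable names below are assumptions.
final case class TwitterCredentials(
    consumerKey: String,
    consumerSecret: String,
    accessToken: String,
    accessTokenSecret: String)

object CredentialLoader {
  // Hypothetical names; use whatever your deployment defines.
  val RequiredVars: Seq[String] = Seq(
    "TWITTER_CONSUMER_KEY",
    "TWITTER_CONSUMER_SECRET",
    "TWITTER_ACCESS_TOKEN",
    "TWITTER_ACCESS_TOKEN_SECRET")

  // Returns the credentials, or the names of variables that are unset or blank.
  def load(env: Map[String, String]): Either[Seq[String], TwitterCredentials] = {
    val missing = RequiredVars.filter(name => env.get(name).forall(_.trim.isEmpty))
    if (missing.nonEmpty) Left(missing)
    else Right(TwitterCredentials(
      env("TWITTER_CONSUMER_KEY"),
      env("TWITTER_CONSUMER_SECRET"),
      env("TWITTER_ACCESS_TOKEN"),
      env("TWITTER_ACCESS_TOKEN_SECRET")))
  }
}
```

For example, `CredentialLoader.load(sys.env)` returns a `Left` listing any unset variables, so the job can fail fast instead of connecting with empty keys.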
Re: No Twitter Input from Kafka to Spark Streaming
Thanks, Akash, for the answer. I added an endpoint to the listener and now it is working.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/No-Twitter-Input-from-Kafka-to-Spark-Streaming-tp24131p24142.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
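For readers hitting the same problem: in Twitter4j, `twitterStream.addListener(listener)` only registers the callback. No statuses arrive until the stream is started with `twitterStream.sample()` or `twitterStream.filter(query)` (taking a `FilterQuery`). The posted code never makes either call, so `onStatus` never fired, nothing was sent to Kafka, and every Spark batch was empty. The thread does not say which call was added. The toy model below (a hypothetical `ToyStatusStream`, not the Twitter4j API) only illustrates that ordering:

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical toy model (not the Twitter4j API): registering a listener does
// not open a connection; only an explicit start call delivers any events.
final class ToyStatusStream(source: () => Seq[String]) {
  private val listeners = ListBuffer.empty[String => Unit]
  private var started = false

  // Analogous to twitterStream.addListener(listener): just stores the callback.
  def addListener(l: String => Unit): Unit = listeners += l

  // Analogous to twitterStream.sample() / filter(query): "connects" and pushes
  // every available status to every registered listener.
  def start(): Unit = {
    started = true
    for (status <- source(); l <- listeners) l(status)
  }

  def isStarted: Boolean = started
}
```

In the real application the equivalent change would be a call such as `twitterStream.sample()` right after `twitterStream.addListener(listener)`.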
Re: No Twitter Input from Kafka to Spark Streaming
Have you tried using the console consumer to see if anything is actually getting published to that topic?

On Tue, Aug 4, 2015 at 11:45 AM, narendra wrote:
> My application takes Twitter4j tweets and publishes those to a topic in
> Kafka. Spark Streaming subscribes to that topic for processing. But in
> actual, Spark Streaming is not able to receive tweet data from Kafka so
> Spark Streaming is running empty batch jobs with out input and I am not
> able to see any output from Spark Streaming.
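The console-consumer check separates the two halves of the pipeline: if no messages show up on `LiveTweets`, the producer side is at fault, not Spark. (With the Kafka 0.8 tools this is roughly `bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic LiveTweets --from-beginning`; the ZooKeeper address is an assumption.) Spark keeps running batches anyway because the direct stream plans each batch as an offset range per partition, from the last consumed offset up to the partition's current end, so an idle topic yields zero-length ranges. A toy model of that planning (hypothetical `ToyOffsetRange` and `planBatch`, not Spark's classes), assuming it starts from the latest offsets as the direct stream does by default:

```scala
// Hypothetical toy model of direct-stream batch planning: for each partition a
// batch covers offsets [fromOffset, untilOffset), where untilOffset is the
// partition's log-end offset at the moment the batch is planned.
final case class ToyOffsetRange(partition: Int, fromOffset: Long, untilOffset: Long) {
  def count: Long = untilOffset - fromOffset
}

object ToyDirectStream {
  // Plans one batch from the positions already consumed and the current log-end
  // offsets. Partitions with no recorded position start at the latest offset,
  // mirroring the direct stream's default (auto.offset.reset = largest).
  def planBatch(
      consumed: Map[Int, Long],
      latest: Map[Int, Long]): (Seq[ToyOffsetRange], Map[Int, Long]) = {
    val ranges = latest.toSeq.sortBy(_._1).map { case (p, end) =>
      ToyOffsetRange(p, consumed.getOrElse(p, end), end)
    }
    // The next batch resumes where this one ends.
    (ranges, ranges.map(r => r.partition -> r.untilOffset).toMap)
  }
}
```

With nothing published, the log-end offset never moves, so every planned range has a count of zero and each batch job runs with no input.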
No Twitter Input from Kafka to Spark Streaming
My application takes tweets via Twitter4j and publishes them to a topic in Kafka. Spark Streaming subscribes to that topic for processing. In practice, however, Spark Streaming does not receive any tweet data from Kafka, so it keeps running empty batch jobs with no input and I see no output from Spark Streaming.

The code of the application is:

import java.util.Properties
import twitter4j.{FilterQuery, StallWarning, Status, StatusDeletionNotice, StatusListener, TwitterStream, TwitterStreamFactory}
import twitter4j.conf.ConfigurationBuilder
import twitter4j.json.DataObjectFactory
import kafka.serializer.StringDecoder
import kafka.javaapi.producer.Producer
import kafka.producer.{KeyedMessage, ProducerConfig}
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._

object TwitterPopularTags {
  def main(args: Array[String]): Unit = {

    /** Information necessary for accessing the Twitter API (real keys redacted) */
    val consumerKey       = "<consumer-key>"
    val consumerSecret    = "<consumer-secret>"
    val accessToken       = "<access-token>"
    val accessTokenSecret = "<access-token-secret>"

    val cb = new ConfigurationBuilder()
    cb.setOAuthConsumerKey(consumerKey)
    cb.setOAuthConsumerSecret(consumerSecret)
    cb.setOAuthAccessToken(accessToken)
    cb.setOAuthAccessTokenSecret(accessTokenSecret)
    cb.setJSONStoreEnabled(true)          // needed for DataObjectFactory.getRawJSON
    cb.setIncludeEntitiesEnabled(true)
    val twitterStream = new TwitterStreamFactory(cb.build()).getInstance()

    val KafkaTopic = "LiveTweets"

    /* Kafka producer properties */
    val kafkaProducer = {
      val props = new Properties()
      props.put("metadata.broker.list", "broker2:9092,localhost:9092")
      props.put("serializer.class", "kafka.serializer.StringEncoder")
      props.put("request.required.acks", "1")
      val config = new ProducerConfig(props)
      new Producer[String, String](config)
    }

    /* Invoked when a new tweet arrives: publish its raw JSON to Kafka */
    val listener = new StatusListener() {
      override def onStatus(status: Status): Unit = {
        val msg = new KeyedMessage[String, String](KafkaTopic, DataObjectFactory.getRawJSON(status))
        kafkaProducer.send(msg)
      }
      override def onException(ex: Exception): Unit = throw ex

      // no-op for the following events
      override def onStallWarning(warning: StallWarning): Unit = {}
      override def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice): Unit = {}
      override def onScrubGeo(userId: Long, upToStatusId: Long): Unit = {}
      override def onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = {}
    }

    twitterStream.addListener(listener)

    // Create Spark Streaming context with 2-second batches
    val sparkConf = new SparkConf().setAppName("Twitter-Kafka-Spark Streaming")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))

    // Define the Kafka parameters; the broker list must be specified
    val kafkaParams = Map("metadata.broker.list" -> "broker2:9092,localhost:9092")
    val topics = Set(KafkaTopic)

    // Create the direct stream with the Kafka parameters and topics
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)
    val lines = kafkaStream.map(_._2)     // message values (raw tweet JSON)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Spark Streaming web UI screenshots:
http://apache-spark-user-list.1001560.n3.nabble.com/file/n24131/streaming.png
http://apache-spark-user-list.1001560.n3.nabble.com/file/n24131/sparkjobs.png

Thank you.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/No-Twitter-Input-from-Kafka-to-Spark-Streaming-tp24131.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
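Once data does flow, each 2-second batch applies `flatMap(_.split(" "))`, `map(x => (x, 1L))` and `reduceByKey(_ + _)`. The same steps on a plain Scala collection (a local sketch, not Spark; `BatchWordCount` is a hypothetical name) show what `wordCounts.print()` should print for one batch, and that an empty batch produces no pairs at all:

```scala
// Local, non-Spark sketch of one micro-batch of the word-count pipeline:
// lines.flatMap(_.split(" ")).map(x => (x, 1L)).reduceByKey(_ + _)
object BatchWordCount {
  def countWords(batch: Seq[String]): Map[String, Long] =
    batch
      .flatMap(_.split(" "))            // same tokenisation as the DStream code
      .map(word => (word, 1L))          // pair every token with a count of 1
      .groupBy(_._1)                    // gather pairs that share a key
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // sum, like reduceByKey
}
```

Note that the Kafka message values here are the raw tweet JSON from `DataObjectFactory.getRawJSON(status)`, so splitting on spaces counts JSON fragments rather than only words from the tweet text.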