Re: No Twitter Input from Kafka to Spark Streaming

2015-08-06 Thread Akhil Das
You just pasted your Twitter credentials; consider changing them. :/
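
One way to keep secrets out of posted code is to read them from the environment instead of hardcoding them. The following is only a minimal sketch: the object name `TwitterCredentials` and the four variable names are assumptions made for illustration and do not come from the original post.

```scala
// Hypothetical helper (not part of the original application): look up each
// OAuth value in the environment and report every variable that is missing.
object TwitterCredentials {
  // Variable names are assumptions chosen for this sketch.
  val RequiredVars: Seq[String] = Seq(
    "TWITTER_CONSUMER_KEY",
    "TWITTER_CONSUMER_SECRET",
    "TWITTER_ACCESS_TOKEN",
    "TWITTER_ACCESS_TOKEN_SECRET"
  )

  // Returns Right(name -> value) when every variable is set and non-empty,
  // or Left(names of the missing variables) otherwise.
  def load(env: Map[String, String] = sys.env): Either[Seq[String], Map[String, String]] = {
    val missing = RequiredVars.filterNot(name => env.get(name).exists(_.nonEmpty))
    if (missing.nonEmpty) Left(missing)
    else Right(RequiredVars.map(name => name -> env(name)).toMap)
  }
}
```

On success, the resulting map could feed the existing builder calls, for example `cb.setOAuthConsumerKey(creds("TWITTER_CONSUMER_KEY"))`, so that the source file itself contains no secrets.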

Thanks
Best Regards

On Wed, Aug 5, 2015 at 10:07 PM, narendra  wrote:

> Thanks Akash for the answer. I added an endpoint to the listener and now it
> is working.


Re: No Twitter Input from Kafka to Spark Streaming

2015-08-05 Thread narendra
Thanks Akash for the answer. I added an endpoint to the listener and now it is
working.
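
For readers hitting the same symptom: the posted code registers a listener with `addListener` but never calls a method that opens a stream, and in Twitter4j registering a listener alone does not start delivery. One plausible reading of "added an endpoint" is a call to a streaming endpoint such as `sample()` or `filter(...)`. The sketch below is an assumption about what that change looks like, not the poster's actual fix; it also assumes Twitter4j is on the classpath and that OAuth credentials are supplied through `twitter4j.properties` or system properties.

```scala
import twitter4j.{Status, StatusAdapter, TwitterStreamFactory}

object StartTwitterStream {
  def main(args: Array[String]): Unit = {
    // Assumption: credentials are read from twitter4j.properties or
    // system properties rather than being set in code.
    val twitterStream = new TwitterStreamFactory().getInstance()

    // StatusAdapter provides no-op implementations of the other callbacks,
    // so only onStatus needs to be overridden here.
    twitterStream.addListener(new StatusAdapter {
      override def onStatus(status: Status): Unit = println(status.getText)
    })

    // Registering the listener does not open a connection. Calling an
    // endpoint method such as sample() connects to the streaming API and
    // starts delivering statuses to the listener.
    twitterStream.sample()
  }
}
```

In the original application the equivalent call would go after `twitterStream.addListener(listener)`, so that `onStatus` fires and messages actually reach the Kafka topic.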



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-Twitter-Input-from-Kafka-to-Spark-Streaming-tp24131p24142.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: No Twitter Input from Kafka to Spark Streaming

2015-08-04 Thread Cody Koeninger
Have you tried using the console consumer to see if anything is actually
getting published to that topic?

On Tue, Aug 4, 2015 at 11:45 AM, narendra  wrote:

> My application takes tweets from Twitter4j and publishes them to a Kafka
> topic. Spark Streaming subscribes to that topic for processing. In practice,
> however, Spark Streaming receives no tweet data from Kafka, so it runs empty
> batch jobs with no input and I see no output from Spark Streaming.

No Twitter Input from Kafka to Spark Streaming

2015-08-04 Thread narendra
My application takes tweets from Twitter4j and publishes them to a Kafka
topic. Spark Streaming subscribes to that topic for processing. In practice,
however, Spark Streaming receives no tweet data from Kafka, so it runs empty
batch jobs with no input and I see no output from Spark Streaming.

The code of the application is:

import java.util.Properties
import twitter4j._
import twitter4j.conf.ConfigurationBuilder
import twitter4j.json.DataObjectFactory
import kafka.serializer.StringDecoder
import kafka.javaapi.producer.Producer
import kafka.producer.{KeyedMessage, ProducerConfig}
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._

object TwitterPopularTags {
  def main(args: Array[String]) {

    /** Information necessary for accessing the Twitter API (values redacted) */
    val consumerKey = "<consumer-key>"
    val consumerSecret = "<consumer-secret>"
    val accessToken = "<access-token>"
    val accessTokenSecret = "<access-token-secret>"
    val cb = new ConfigurationBuilder()
    cb.setOAuthConsumerKey(consumerKey)
    cb.setOAuthConsumerSecret(consumerSecret)
    cb.setOAuthAccessToken(accessToken)
    cb.setOAuthAccessTokenSecret(accessTokenSecret)
    cb.setJSONStoreEnabled(true)
    cb.setIncludeEntitiesEnabled(true)
    val twitterStream = new TwitterStreamFactory(cb.build()).getInstance()

    val KafkaTopic = "LiveTweets"
    /* Kafka producer properties */
    val kafkaProducer = {
      val props = new Properties()
      props.put("metadata.broker.list", "broker2:9092,localhost:9092")
      props.put("serializer.class", "kafka.serializer.StringEncoder")
      props.put("request.required.acks", "1")
      val config = new ProducerConfig(props)
      new Producer[String, String](config)
    }

    /* Invoked when a new tweet arrives */
    val listener = new StatusListener() {

      override def onStatus(status: Status): Unit = {
        val msg = new KeyedMessage[String, String](KafkaTopic, DataObjectFactory.getRawJSON(status))
        kafkaProducer.send(msg)
      }
      override def onException(ex: Exception): Unit = throw ex

      // no-op for the following events
      override def onStallWarning(warning: StallWarning): Unit = {}
      override def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice): Unit = {}
      override def onScrubGeo(userId: Long, upToStatusId: Long): Unit = {}
      override def onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = {}
    }

    twitterStream.addListener(listener)

    // Create Spark Streaming context
    val sparkConf = new SparkConf().setAppName("Twitter-Kafka-Spark Streaming")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))

    // Define the Kafka parameters, broker list must be specified
    val kafkaParams = Map("metadata.broker.list" -> "broker2:9092,localhost:9092")
    val topics = Set(KafkaTopic)

    // Create the direct stream with the Kafka parameters and topics
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    val lines = kafkaStream.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Spark Streaming web UI - 

<http://apache-spark-user-list.1001560.n3.nabble.com/file/n24131/streaming.png> 

<http://apache-spark-user-list.1001560.n3.nabble.com/file/n24131/sparkjobs.png> 


Thank you.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-Twitter-Input-from-Kafka-to-Spark-Streaming-tp24131.html