Are there enough messages in Kafka to consume? Can you make sure your
Kafka setup is working with the console consumer? Also try this example:
<https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala>

Thanks
Best Regards

On Mon, Mar 30, 2015 at 11:04 AM, <luohui20...@sina.com> wrote:

> Hi guys,
>
>           I am using Spark Streaming to receive messages from Kafka, process
> them, and then send the results back to Kafka. However, the Kafka consumer
> does not receive any messages. Can anyone share ideas?
>
>
>
> here is my code:
>
>
>
> /**
>  * Word-count streaming job: reads lines from Kafka `topic1` through the direct
>  * (receiver-less) API, counts the words of every 1-second batch, prints the
>  * counts, and publishes each (word, count) pair to Kafka `topic2`.
>  */
> object SparkStreamingSampleDirectApproach {
>   /**
>    * Builds the streaming pipeline and blocks until the context terminates.
>    *
>    * @param args command-line arguments (unused; brokers and topics are hard-coded)
>    */
>   def main(args: Array[String]): Unit = {
>     Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
>     Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
>
>     val Array(brokers, topics) = Array("localhost:9092", "topic1")
>     // NOTE(review): "log4j.rootCategory" looks like a log4j property rather than
>     // a Spark setting -- confirm it has any effect when set on SparkConf.
>     val conf = new SparkConf()
>       .setMaster("local[2]")
>       .setAppName("SparkStreamingSampleDirectApproach")
>       .set("log4j.rootCategory", "WARN, console")
>     val ssc = new StreamingContext(conf, Seconds(1))
>
>     val topicsSet = topics.split(",").toSet
>     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>     val messages =
>       KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>         ssc, kafkaParams, topicsSet)
> //    messages.saveAsTextFiles("hdfs://localhost:8020/spark/data", "test")
>     val lines = messages.map(_._2)
>     val words = lines.flatMap(_.split(" "))
>     val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
>     wordCounts.print()
>
>     val Array(brokers2, topic2) = Array("localhost:9092", "topic2")
>     val props = new Properties()
>     // Use the output broker list declared just above (the original passed
>     // `brokers` here and left `brokers2` unused).
>     props.put("metadata.broker.list", brokers2)
>     props.put("serializer.class", "kafka.serializer.StringEncoder")
>
>     // The original called producer.send(...) once, on the driver, before the
>     // context was started, with `messages.toString()` as the payload -- i.e. the
>     // string form of the DStream object, not any data from the stream. Output
>     // has to be registered as a DStream output operation so it runs per batch.
>     wordCounts.foreachRDD { rdd =>
>       rdd.foreachPartition { partition =>
>         // Create the producer inside the partition closure so it is built where
>         // the records are processed instead of being captured from the driver.
>         val producer = new Producer[String, String](new ProducerConfig(props))
>         try {
>           partition.foreach { case (word, count) =>
>             producer.send(new KeyedMessage[String, String](topic2, s"$word $count"))
>           }
>         } finally {
>           producer.close()
>         }
>       }
>     }
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> --------------------------------
>
> Thanks & Best regards!
> 罗辉 San.Luo
>

Reply via email to