This warning is not related to "--from-beginning". It means there is no new
data for the current partition in the current batch duration, which is
acceptable. If you push data into Kafka again, this warning will disappear.
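
For reference, the log comes from a check inside KafkaRDD's compute(): when a
partition's starting and ending offsets are equal, there is nothing to read
for that batch, so an empty iterator is returned. Roughly (a sketch, not
necessarily the exact source of your Spark version):

    if (part.fromOffset == part.untilOffset) {
      // The literal "${part.fromOffset}" in your log suggests the `s`
      // interpolator prefix is missing on the first string in this version.
      log.warn("Beginning offset ${part.fromOffset} is the same as ending offset " +
        s"skipping ${part.topic} ${part.partition}")
      Iterator.empty // no new messages for this partition in this batch
    } else {
      new KafkaRDDIterator(part, context)
    }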

Thanks
Saisai

2015-03-30 16:58 GMT+08:00 <luohui20...@sina.com>:

> BTW, what is the warning below about? I'm not quite clear about KafkaRDD.
>
>
>
> WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending
> offset skipping topic1 0.
>
> Does this warning occur because I started my consumer without the
> "--from-beginning" param?
>
>
> --------------------------------
>
> Thanks & Best regards!
> 罗辉 San.Luo
>
>  ----- Original Message -----
> From: <luohui20...@sina.com>
> To: "Saisai Shao" <sai.sai.s...@gmail.com>
> Cc: "user" <user@spark.apache.org>
> Subject: Re: Re: Re: How SparkStreaming output messages to Kafka?
> Date: 2015-03-30 16:46
>
> Hi Saisai,
>
>           Following your advice, I modified my code as below, using a
> singleton:
>
>
> import java.util.Properties
>
> import kafka.producer._
> import kafka.serializer.StringDecoder
> import org.apache.log4j.{Level, Logger}
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> // A singleton object is initialized once per JVM (once on the driver and
> // once on each executor), so the Producer inside it is reused across
> // batches and never serialized into a task closure.
> object MyKafkaProducerSingleton {
>   val Array(brokers2, topic2) = Array("localhost:9092", "topic2")
>   val props = new Properties()
>   props.put("metadata.broker.list", brokers2)
>   props.put("serializer.class", "kafka.serializer.StringEncoder")
>
>   val config = new ProducerConfig(props)
>   val myproducer = new Producer[String, String](config)
>
>   def send(message: KeyedMessage[String, String]): Unit = {
>     myproducer.send(message)
>   }
> }
>
> object SparkStreamingSampleDirectApproach2 {
>   def main(args: Array[String]): Unit = {
>     Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
>     Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
>
>     val Array(brokers, topics) = Array("localhost:9092", "topic1")
>     val conf = new SparkConf()
>       .setMaster("local[2]")
>       .setAppName("SparkStreamingSampleDirectApproach")
>       .set("log4j.rootCategory", "WARN, console")
>     val ssc = new StreamingContext(conf, Seconds(1))
>
>     val topicsSet = topics.split(",").toSet
>     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>     val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>       ssc, kafkaParams, topicsSet)
>     //    messages.saveAsTextFiles("hdfs://localhost:8020/spark/data", "test")
>     //    val lines = messages.map(_._2)
>     //    val words = lines.flatMap(_.split(" "))
>     //    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
>     //    wordCounts.print()
>
>     //    val Array(brokers2, topic2) = Array("localhost:9092", "topic2")
>     //    val props = new Properties()
>     //    props.put("metadata.broker.list", brokers2)
>     //    props.put("serializer.class", "kafka.serializer.StringEncoder")
>
>     /*  Previous version: created a new Producer for every partition in every batch.
>     val messages2 = messages.foreachRDD { rdd =>
>       rdd.foreachPartition { record =>
>         val config = new ProducerConfig(props)
>         val producer = new Producer[String, String](config)
>         record.foreach { piece =>
>           val msg = new KeyedMessage[String, String](topic2, piece._2)
>           producer.send(msg)
>         }
>       }
>     }
>     */
>
>     // foreachRDD is an output operation and returns Unit, so there is
>     // nothing useful to assign it to.
>     messages.foreachRDD { rdd =>
>       rdd.foreachPartition { record =>
>         record.foreach { piece =>
>           val msg = new KeyedMessage[String, String]("topic2", piece._2)
>           MyKafkaProducerSingleton.send(msg)
>         }
>       }
>     }
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
>
> --------------------------------
>
> Thanks & Best regards!
> 罗辉 San.Luo
>
>  ----- Original Message -----
> From: Saisai Shao <sai.sai.s...@gmail.com>
> To: 罗辉 <luohui20...@sina.com>
> Cc: user <user@spark.apache.org>
> Subject: Re: Re: How SparkStreaming output messages to Kafka?
> Date: 2015-03-30 15:17
>
> Also, you could use a Producer singleton to improve performance: right now
> you have to create a Producer for each partition in each batch duration,
> whereas with a singleton object you can create it once and reuse it (the
> Producer is thread safe, as far as I know).
>
> -Jerry
>
> 2015-03-30 15:13 GMT+08:00 Saisai Shao <sai.sai.s...@gmail.com>:
>
> Yeah, after reviewing your code again: the reason you could not receive any
> data is that your previous code lacked an output operation (ACTION) on the
> DStream, so the code never actually executed. After you changed to the style
> I mentioned, `foreachRDD` triggers and runs the jobs as you wrote them.
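>
> A minimal illustration (reusing the `messages` DStream from your code):
>
>     // Transformations alone only build the lineage; no job ever runs:
>     val words = messages.map(_._2).flatMap(_.split(" "))
>
>     // An output operation such as print(), saveAsTextFiles() or foreachRDD
>     // registers a job that actually executes every batch interval:
>     words.foreachRDD { rdd => rdd.foreach(println) }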
>
> Yes, your understanding is correct.
>
> Thanks
> Jerry
>
>
>
> 2015-03-30 14:58 GMT+08:00 <luohui20...@sina.com>:
>
> To Saisai:
>
>         It works after I corrected it following your advice, as below.
>
>         Furthermore, I am not quite clear about which code runs on the
> driver and which runs on the executors, so I wrote my understanding in the
> comments. Would you help check? Thank you.
>
>
>
> To Akhil:
>
>       Yes, Kafka has enough messages. I tested it with a Kafka producer
> sending Scala Random Ints, and it also works. Thanks.
>
>
>
> object SparkStreamingSampleDirectApproach2 {
>   def main(args: Array[String]): Unit = {
>     Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
>     Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
>
>     val Array(brokers, topics) = Array("localhost:9092", "topic1")
>     val conf = new SparkConf()
>       .setMaster("local[2]")
>       .setAppName("SparkStreamingSampleDirectApproach")
>       .set("log4j.rootCategory", "WARN, console")
>     val ssc = new StreamingContext(conf, Seconds(1))
>
>     val topicsSet = topics.split(",").toSet
>     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>     val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>       ssc, kafkaParams, topicsSet)
>     //    messages.saveAsTextFiles("hdfs://localhost:8020/spark/data", "test")
>     //    val lines = messages.map(_._2)
>     //    val words = lines.flatMap(_.split(" "))
>     //    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
>     //    wordCounts.print()
>
>     val Array(brokers2, topic2) = Array("localhost:9092", "topic2")
>     val props = new Properties()
>     props.put("metadata.broker.list", brokers2)
>     props.put("serializer.class", "kafka.serializer.StringEncoder")
>
>     messages.foreachRDD { rdd =>           // above code running on driver
>       rdd.foreachPartition { record =>     // code from here running on executor
>         val config = new ProducerConfig(props)
>         val producer = new Producer[String, String](config)
>         record.foreach { piece =>
>           val msg = new KeyedMessage[String, String](topic2, piece._2)
>           producer.send(msg)
>         }
>       }
>     }
>
>     //    val messages2 = new KeyedMessage[String, String](topic2, messages.toString())
>     //    println(messages2)
>
>     ssc.start()                            // again, running on driver
>     ssc.awaitTermination()
>   }
> }
>
> --------------------------------
>
> Thanks & Best regards!
> 罗辉 San.Luo
>
>  ----- Original Message -----
> From: Saisai Shao <sai.sai.s...@gmail.com>
> To: luohui20...@sina.com
> Cc: user <user@spark.apache.org>
> Subject: Re: How SparkStreaming output messages to Kafka?
> Date: 2015-03-30 14:03
>
> Hi Hui,
>
> Did you try the direct Kafka stream example under Spark Streaming's
> examples? Does it still fail to receive messages? Also, would you please
> check the whole setup, including Kafka itself: test with the Kafka console
> consumer to see whether Kafka is OK.
>
> Besides, looking at your code, there are some problems here:
>
>     val messages2 = new KeyedMessage[String, String](topic2, messages.toString())
>     println(messages2)
>
>     producer.send(messages2)
>
> This code snippet is not lazily evaluated; it executes ONLY ONCE, when the
> driver reaches this point, so you may never actually write the data into
> Kafka. You need to write it like this:
>
> messages.foreachRDD { r =>
>   r.foreachPartition { iter =>
>     // create the Producer here
>     // turn this partition of data (iter) into KeyedMessages and write
>     // them into Kafka
>   }
> }
>
> This is the basic style; sorry for any missing parts or typos. Also pay
> attention to serialization issues, since this closure is shipped to remote
> executors. Please take a try again.
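>
> To make the serialization point concrete, here is a sketch of the pitfall
> (illustrative only; it reuses the config/topic2 names from your code, and
> relies on the old kafka.producer.Producer not being Serializable):
>
>     // WRONG: the Producer is created on the driver, so Spark would have
>     // to serialize it into the task closure, and the job fails with a
>     // "Task not serializable" / NotSerializableException error.
>     val producer = new Producer[String, String](config)
>     messages.foreachRDD { rdd =>
>       rdd.foreachPartition { iter =>
>         iter.foreach(p => producer.send(new KeyedMessage[String, String](topic2, p._2)))
>       }
>     }
>
> Creating the Producer inside foreachPartition, as in the skeleton above,
> avoids this.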
>
> Thanks
> Jerry
>
>
>
> 2015-03-30 13:34 GMT+08:00 <luohui20...@sina.com>:
>
> Hi guys,
>
>           I am using Spark Streaming to receive messages from Kafka, process
> them, and then send them back to Kafka. However, the Kafka consumer cannot
> receive any messages. Would anyone share ideas?
>
>
>
> Here is my code:
>
>
>
> object SparkStreamingSampleDirectApproach {
>   def main(args: Array[String]): Unit = {
>     Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
>     Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
>
>     val Array(brokers, topics) = Array("localhost:9092", "topic1")
>     val conf = new SparkConf()
>       .setMaster("local[2]")
>       .setAppName("SparkStreamingSampleDirectApproach")
>       .set("log4j.rootCategory", "WARN, console")
>     val ssc = new StreamingContext(conf, Seconds(1))
>
>     val topicsSet = topics.split(",").toSet
>     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>     val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>       ssc, kafkaParams, topicsSet)
>     //    messages.saveAsTextFiles("hdfs://localhost:8020/spark/data", "test")
>     val lines = messages.map(_._2)
>     val words = lines.flatMap(_.split(" "))
>     val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
>     wordCounts.print()
>
>     val Array(brokers2, topic2) = Array("localhost:9092", "topic2")
>     val props = new Properties()
>     props.put("metadata.broker.list", brokers)
>     props.put("serializer.class", "kafka.serializer.StringEncoder")
>
>     val config = new ProducerConfig(props)
>     val producer = new Producer[String, String](config)
>
>     //    val messages2 = messages.map { line =>
>     //      new KeyedMessage[String, String](topic2, wordCounts.toString())
>     //    }.toArray
>
>     val messages2 = new KeyedMessage[String, String](topic2, messages.toString())
>     println(messages2)
>
>     producer.send(messages2)
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
>
> --------------------------------
>
> Thanks & Best regards!
> 罗辉 San.Luo
>
