Re: Spark 1.6.1. How to prevent serialization of KafkaProducer
Thanks, Bryan. This is basically what I have as well; I posted pretty much the same gist in the first email:

    .foreachRDD(rdd => {
      rdd.foreachPartition(part => {
        val producer: Producer[String, String] = KafkaWriter.createProducer(brokers)
        part.foreach(item => producer.send(item))
        producer.close()
      })
    })

I had a bug with implicits from Spark. I'm not really sure why, but I had built a SparkContext from a different configuration file and I'm not sure which code was picking up the wrong SparkContext. Effectively, everything worked once I merged my functions into one file and tested again.

Thanks again.

On Thu, Apr 21, 2016 at 2:58 PM, Bryan Jeffrey wrote:
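The "implicits / wrong SparkContext" symptom Alexander describes is usually a closure-capture issue rather than anything Kafka-specific: referring to a field of a non-serializable enclosing class inside the partition closure silently drags the whole object into the task. A minimal sketch of the pitfall and the usual fix, with illustrative names not taken from this thread:

```scala
// Illustrative sketch only. Referencing a constructor parameter inside a method
// turns it into a field access (this.brokers), so the task closure would have to
// serialize the whole StreamJob instance.
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.dstream.DStream

class StreamJob(brokers: String, topic: String) { // note: not Serializable

  def run(stream: DStream[String]): Unit = {
    // Copy what the closure needs into local vals on the driver, so the task
    // closure captures two Strings instead of the enclosing StreamJob instance.
    val localBrokers = brokers
    val localTopic = topic

    stream.foreachRDD { rdd =>
      rdd.foreachPartition { part =>
        val props = new java.util.Properties()
        props.put("bootstrap.servers", localBrokers)
        props.put("key.serializer",
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer",
          "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)
        part.foreach(v => producer.send(new ProducerRecord[String, String](localTopic, v)))
        producer.close()
      }
    }
  }
}
```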
Re: Spark 1.6.1. How to prevent serialization of KafkaProducer
Here is what we're doing:

    import java.util.Properties

    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
    import net.liftweb.json.Extraction._
    import net.liftweb.json._
    import org.apache.spark.streaming.dstream.DStream

    class KafkaWriter(brokers: Array[String], topic: String, numPartitions: Int) {
      def write[T](data: DStream[T]): Unit = {
        KafkaWriter.write(data, topic, brokers, numPartitions)
      }
    }

    object KafkaWriter {
      def write[T](data: DStream[T], topic: String, brokers: Array[String], numPartitions: Int): Unit = {
        val dataToWrite =
          if (numPartitions > 0) {
            data.repartition(numPartitions)
          } else {
            data
          }

        dataToWrite
          .map(x => new KeyedMessage[String, String](topic, KafkaWriter.toJson(x)))
          .foreachRDD(rdd => {
            rdd.foreachPartition(part => {
              val producer: Producer[String, String] = KafkaWriter.createProducer(brokers)
              part.foreach(item => producer.send(item))
              producer.close()
            })
          })
      }

      def apply(brokers: Option[Array[String]], topic: String, numPartitions: Int): KafkaWriter = {
        val brokersToUse =
          brokers match {
            case Some(x) => x
            case None => throw new IllegalArgumentException("Must specify brokers!")
          }

        new KafkaWriter(brokersToUse, topic, numPartitions)
      }

      def toJson[T](data: T): String = {
        implicit val formats = DefaultFormats ++ net.liftweb.json.ext.JodaTimeSerializers.all
        compactRender(decompose(data))
      }

      def createProducer(brokers: Array[String]): Producer[String, String] = {
        val properties = new Properties()
        properties.put("metadata.broker.list", brokers.mkString(","))
        properties.put("serializer.class", "kafka.serializer.StringEncoder")

        val kafkaConfig = new ProducerConfig(properties)
        new Producer[String, String](kafkaConfig)
      }
    }

Then just call:

    val kafkaWriter: KafkaWriter = KafkaWriter(KafkaStreamFactory.getBrokersFromConfig(config),
      config.getString(Parameters.topicName), numPartitions = kafkaWritePartitions)
    detectionWriter.write(dataToWriteToKafka)

Hope that helps!

Bryan Jeffrey

On Thu, Apr 21, 2016 at 2:08 PM, Alexander Gallego wrote:
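A side note: the KafkaWriter above uses the older kafka.producer (Scala) API. A hedged sketch of the same createProducer against the newer org.apache.kafka.clients.producer API, assuming the kafka-clients jar is available; this adaptation is not part of Bryan's code, and the rest of KafkaWriter would then send ProducerRecords instead of KeyedMessages:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}

// Illustrative adaptation of KafkaWriter.createProducer to the new-style client.
def createProducer(brokers: Array[String]): KafkaProducer[String, String] = {
  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers.mkString(","))
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  new KafkaProducer[String, String](props)
}
```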
Re: Spark 1.6.1. How to prevent serialization of KafkaProducer
Have you looked at these:

http://allegro.tech/2015/08/spark-kafka-integration.html
http://mkuthan.github.io/blog/2016/01/29/spark-kafka-integration2/

Full example here: https://github.com/mkuthan/example-spark-kafka

HTH.

-Todd

On Thu, Apr 21, 2016 at 2:08 PM, Alexander Gallego wrote:
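For context, the pattern those posts and the example repo describe is roughly this: wrap producer creation in a small serializable sink whose producer is built lazily on the executor, and broadcast the sink rather than the producer. A minimal sketch with illustrative names; see the linked posts for the full version:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Only the factory function is shipped to executors; the producer itself is
// created on first use on each executor, never serialized.
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  lazy val producer = createProducer()

  def send(topic: String, value: String): Unit =
    producer.send(new ProducerRecord[String, String](topic, value))
}

object KafkaSink {
  def apply(brokers: String): KafkaSink = {
    val f = () => {
      val props = new Properties()
      props.put("bootstrap.servers", brokers)
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      sys.addShutdownHook { producer.close() }
      producer
    }
    new KafkaSink(f)
  }
}

// Usage (sketch): broadcast the sink once on the driver, then send from executors.
// val kafkaSink = ssc.sparkContext.broadcast(KafkaSink(brokers))
// stream.foreachRDD { rdd => rdd.foreach { s => kafkaSink.value.send(outputTopic, s) } }
```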
Re: Spark 1.6.1. How to prevent serialization of KafkaProducer
Thanks Ted.

KafkaWordCount (producer) does not operate on a DStream[T]:

```scala
import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object KafkaWordCountProducer {

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Zookeeper connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send some messages
    while (true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x =>
          scala.util.Random.nextInt(10).toString)
          .mkString(" ")

        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }

      Thread.sleep(1000)
    }
  }

}
```

Also, doing:

```
object KafkaSink {
  def send(brokers: String, sc: SparkContext, topic: String, key: String, value: String) =
    getInstance(brokers, sc).value.send(new ProducerRecord(topic, key, value))
}

KafkaSink.send(brokers, sparkContext, outputTopic, record._1, record._2)
```

doesn't work either; the result is:

    Exception in thread "main" org.apache.spark.SparkException: Task not serializable

Thanks!

On Thu, Apr 21, 2016 at 1:08 PM, Ted Yu wrote:

--
Alexander Gallego
Co-Founder & CTO
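Two things in that KafkaSink.send snippet tend to trigger the exception independently of Kafka: the SparkContext argument ends up captured by the task closure (and SparkContext is not serializable), and broadcasting a raw KafkaProducer cannot work because broadcast values must themselves be serializable. A sketch of one way around both, broadcasting only the broker string and building the producer per partition; names here are illustrative, not from the thread:

```scala
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Sketch only: broadcast the (serializable) connection string, not the producer,
// and create/close the producer inside each partition on the executor.
def writeToKafka(ssc: StreamingContext,
                 stream: DStream[(String, String)],
                 brokers: String,
                 topic: String): Unit = {
  val brokersBc = ssc.sparkContext.broadcast(brokers) // driver side

  stream.foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
      val props = new java.util.Properties()
      props.put("bootstrap.servers", brokersBc.value)
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      partition.foreach { case (k, v) =>
        producer.send(new ProducerRecord[String, String](topic, k, v))
      }
      producer.close()
    }
  }
}
```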
Re: Spark 1.6.1. How to prevent serialization of KafkaProducer
In KafkaWordCount, the String is sent back and producer.send() is called.

I guess if you don't find a solution within your current design, you can consider the above.

On Thu, Apr 21, 2016 at 10:04 AM, Alexander Gallego wrote:
Spark 1.6.1. How to prevent serialization of KafkaProducer
Hello,

I understand that you cannot serialize a KafkaProducer.

So I've tried (as suggested here:
https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html):

- Make the class Serializable - not possible

- Declare the instance only within the lambda function passed in map.

via:

// as suggested by the docs

```scala
kafkaOut.foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    val producer = new KafkaProducer(..)
    partition.foreach { record =>
      producer.send(new ProducerRecord(outputTopic, record._1, record._2))
    }
    producer.close()
  })
}) // foreachRDD
```

- Make the NotSerializable object static and create it once per machine.

via:

```scala
object KafkaSink {
  @volatile private var instance: Broadcast[KafkaProducer[String, String]] = null

  def getInstance(brokers: String, sc: SparkContext): Broadcast[KafkaProducer[String, String]] = {
    if (instance == null) {
      synchronized {
        println("Creating new kafka producer")
        val props = new java.util.Properties()
        ...
        instance = sc.broadcast(new KafkaProducer[String, String](props))
        sys.addShutdownHook {
          instance.value.close()
        }
      }
    }
    instance
  }
}
```

- Call rdd.forEachPartition and create the NotSerializable object in there like this:

Same as above.

- Mark the instance @transient

Same thing, just make it a class variable via:

```
@transient var producer: KafkaProducer[String, String] = null

def getInstance() = {
  if (producer == null) {
    producer = new KafkaProducer()
  }
  producer
}
```

However, I get serialization problems with all of these options.

Thanks for your help.

- Alex