Re: Spark 1.6.1. How to prevent serialization of KafkaProducer
Thanks Bryan. This is basically what I have as well; I posted pretty much the same gist in the first email:

    .foreachRDD(rdd => {
      rdd.foreachPartition(part => {
        val producer: Producer[String, String] = KafkaWriter.createProducer(brokers)
        part.foreach(item => producer.send(item))
        producer.close()
      })
    })

I had a bug with implicits from Spark. Not really sure why, but I had built a Spark context from a different configuration file and I'm not sure what was passing the wrong Spark context. Effectively, everything worked once I merged my functions into one file.

Thanks again.

On Thu, Apr 21, 2016 at 2:58 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
> Here is what we're doing:
>
> import java.util.Properties
>
> import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
> import net.liftweb.json.Extraction._
> import net.liftweb.json._
> import org.apache.spark.streaming.dstream.DStream
>
> class KafkaWriter(brokers: Array[String], topic: String, numPartitions: Int) {
>   def write[T](data: DStream[T]): Unit = {
>     KafkaWriter.write(data, topic, brokers, numPartitions)
>   }
> }
>
> object KafkaWriter {
>   def write[T](data: DStream[T], topic: String, brokers: Array[String], numPartitions: Int): Unit = {
>     val dataToWrite =
>       if (numPartitions > 0) {
>         data.repartition(numPartitions)
>       } else {
>         data
>       }
>
>     dataToWrite
>       .map(x => new KeyedMessage[String, String](topic, KafkaWriter.toJson(x)))
>       .foreachRDD(rdd => {
>         rdd.foreachPartition(part => {
>           val producer: Producer[String, String] = KafkaWriter.createProducer(brokers)
>           part.foreach(item => producer.send(item))
>           producer.close()
>         })
>       })
>   }
>
>   def apply(brokers: Option[Array[String]], topic: String, numPartitions: Int): KafkaWriter = {
>     val brokersToUse =
>       brokers match {
>         case Some(x) => x
>         case None => throw new IllegalArgumentException("Must specify brokers!")
>       }
>
>     new KafkaWriter(brokersToUse, topic, numPartitions)
>   }
>
>   def toJson[T](data: T): String = {
>     implicit val formats = DefaultFormats ++ net.liftweb.json.ext.JodaTimeSerializers.all
>     compactRender(decompose(data))
>   }
>
>   def createProducer(brokers: Array[String]): Producer[String, String] = {
>     val properties = new Properties()
>     properties.put("metadata.broker.list", brokers.mkString(","))
>     properties.put("serializer.class", "kafka.serializer.StringEncoder")
>
>     val kafkaConfig = new ProducerConfig(properties)
>     new Producer[String, String](kafkaConfig)
>   }
> }
>
> Then just call:
>
> val kafkaWriter: KafkaWriter = KafkaWriter(KafkaStreamFactory.getBrokersFromConfig(config),
>   config.getString(Parameters.topicName), numPartitions = kafkaWritePartitions)
> kafkaWriter.write(dataToWriteToKafka)
>
> Hope that helps!
>
> Bryan Jeffrey
>
> On Thu, Apr 21, 2016 at 2:08 PM, Alexander Gallego <agall...@concord.io> wrote:
>> Thanks Ted.
>>
>> KafkaWordCount (producer) does not operate on a DStream[T]:
>>
>> ```scala
>> object KafkaWordCountProducer {
>>
>>   def main(args: Array[String]) {
>>     if (args.length < 4) {
>>       System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
>>         "<messagesPerSec> <wordsPerMessage>")
>>       System.exit(1)
>>     }
>>
>>     val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
>>
>>     // Producer connection properties
>>     val props = new HashMap[String, Object]()
>>     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
>>     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>>       "org.apache.kafka.common.serialization.StringSerializer")
>>     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>>       "org.apache.kafka.common.serialization.StringSerializer")
>>
>>     val producer = new KafkaProducer[String, String](props)
>>
>>     // Send some messages
>>     while (true) {
>>       (1 to messagesPerSec.toInt).foreach { messageNum =>
>>         val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
>>           .mkString(" ")
>>
>>         val message = new ProducerRecord[String, String](topic, null, str)
>>         producer.send(message)
>>       }
>> ```
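The "wrong Spark context" bug described above is a typical closure-capture problem: if the function passed to foreachPartition references a member of an enclosing class (for example one that also holds the SparkContext or a configuration object), Spark has to serialize that whole enclosing object, and the job fails even though the producer itself is created inside the partition. A minimal sketch of the shape that avoids this, assuming hypothetical `brokers` and `outputTopic` values and the new-client KafkaProducer API used elsewhere in this thread:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.dstream.DStream

object SafeClosureSketch {
  def write(stream: DStream[(String, String)], brokers: String, outputTopic: String): Unit = {
    // Copy everything the closure needs into local vals of serializable types,
    // so the closure captures only these strings, never `this` or a SparkContext.
    val brokersLocal = brokers
    val topicLocal = outputTopic

    stream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // Runs on the executor: the producer is built here and never serialized.
        val props = new Properties()
        props.put("bootstrap.servers", brokersLocal)
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)
        partition.foreach { case (key, value) =>
          producer.send(new ProducerRecord[String, String](topicLocal, key, value))
        }
        producer.close()
      }
    }
  }
}
```

Only the two strings cross the wire with the task; the producer is created and closed on the executor, which is presumably why merging everything into one file (and thereby dropping the reference to the outer object) made the original code work.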
Re: Spark 1.6.1. How to prevent serialization of KafkaProducer
Thanks Ted.

KafkaWordCount (producer) does not operate on a DStream[T]:

```scala
object KafkaWordCountProducer {

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Producer connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send some messages
    while (true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")

        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }

      Thread.sleep(1000)
    }
  }
}
```

Also, doing:

```scala
object KafkaSink {
  def send(brokers: String, sc: SparkContext, topic: String, key: String, value: String) =
    getInstance(brokers, sc).value.send(new ProducerRecord(topic, key, value))
}

KafkaSink.send(brokers, sparkContext, outputTopic, record._1, record._2)
```

doesn't work either; the result is:

    Exception in thread "main" org.apache.spark.SparkException: Task not serializable

Thanks!

On Thu, Apr 21, 2016 at 1:08 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> In KafkaWordCount, the String is sent back and producer.send() is called.
>
> I guess if you don't find a viable solution in your current design, you can consider the above.
>
> On Thu, Apr 21, 2016 at 10:04 AM, Alexander Gallego <agall...@concord.io> wrote:
>>
>> Hello,
>>
>> I understand that you cannot serialize a KafkaProducer.
>>
>> So I've tried (as suggested here:
>> https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html):
>>
>> - Make the class Serializable - not possible.
>>
>> - Declare the instance only within the lambda function passed in map, via:
>>
>> ```scala
>> // as suggested by the docs
>> kafkaOut.foreachRDD(rdd => {
>>   rdd.foreachPartition(partition => {
>>     val producer = new KafkaProducer(..)
>>     partition.foreach { record =>
>>       producer.send(new ProducerRecord(outputTopic, record._1, record._2))
>>     }
>>     producer.close()
>>   })
>> }) // foreachRDD
>> ```
>>
>> - Make the NotSerializable object a static and create it once per machine, via:
>>
>> ```scala
>> object KafkaSink {
>>   @volatile private var instance: Broadcast[KafkaProducer[String, String]] = null
>>
>>   def getInstance(brokers: String, sc: SparkContext): Broadcast[KafkaProducer[String, String]] = {
>>     if (instance == null) {
>>       synchronized {
>>         println("Creating new kafka producer")
>>         val props = new java.util.Properties()
>>         ...
>>         instance = sc.broadcast(new KafkaProducer[String, String](props))
>>         sys.addShutdownHook {
>>           instance.value.close()
>>         }
>>       }
>>     }
>>     instance
>>   }
>> }
>> ```
>>
>> - Call rdd.forEachPartition and create the NotSerializable object in there like this:
>>
>> Same as above.
>>
>> - Mark the instance @transient
>>
>> Same thing, just make it a class variable, via:
>>
>> ```scala
>> @transient var producer: KafkaProducer[String, String] = null
>>
>> def getInstance() = {
>>   if (producer == null) {
>>     producer = new KafkaProducer()
>>   }
>>   producer
>> }
>> ```
>>
>> However, I get serialization problems with all of these options.
>>
>> Thanks for your help.
>>
>> - Alex

--
Alexander Gallego
Co-Founder & CTO
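The broadcast-based KafkaSink quoted above fails because sc.broadcast must serialize its value, and KafkaProducer is not Serializable. A common workaround (not from this thread; the names here are illustrative) is to keep a lazily created producer inside an object, so each executor JVM builds its own instance on first use and only the brokers string ever travels with the closure:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Hypothetical helper: one producer per executor JVM, created lazily on first use.
// Scala objects are loaded from the classpath on each executor rather than shipped
// with the closure, so nothing in here needs to be serializable.
object LazyKafkaSink {
  @volatile private var producer: KafkaProducer[String, String] = null

  def get(brokers: String): KafkaProducer[String, String] = {
    if (producer == null) {
      synchronized {
        if (producer == null) {
          val props = new Properties()
          props.put("bootstrap.servers", brokers)
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          producer = new KafkaProducer[String, String](props)
          sys.addShutdownHook(producer.close()) // flush and close when the executor JVM exits
        }
      }
    }
    producer
  }
}
```

Inside foreachPartition the call becomes `LazyKafkaSink.get(brokers).send(new ProducerRecord(topic, key, value))`, and the producer is reused across batches instead of being rebuilt and closed for every partition.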
Spark 1.6.1. How to prevent serialization of KafkaProducer
Hello,

I understand that you cannot serialize a KafkaProducer.

So I've tried (as suggested here:
https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html):

- Make the class Serializable - not possible.

- Declare the instance only within the lambda function passed in map, via:

```scala
// as suggested by the docs
kafkaOut.foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    val producer = new KafkaProducer(..)
    partition.foreach { record =>
      producer.send(new ProducerRecord(outputTopic, record._1, record._2))
    }
    producer.close()
  })
}) // foreachRDD
```

- Make the NotSerializable object a static and create it once per machine, via:

```scala
object KafkaSink {
  @volatile private var instance: Broadcast[KafkaProducer[String, String]] = null

  def getInstance(brokers: String, sc: SparkContext): Broadcast[KafkaProducer[String, String]] = {
    if (instance == null) {
      synchronized {
        println("Creating new kafka producer")
        val props = new java.util.Properties()
        ...
        instance = sc.broadcast(new KafkaProducer[String, String](props))
        sys.addShutdownHook {
          instance.value.close()
        }
      }
    }
    instance
  }
}
```

- Call rdd.forEachPartition and create the NotSerializable object in there like this:

Same as above.

- Mark the instance @transient

Same thing, just make it a class variable, via:

```scala
@transient var producer: KafkaProducer[String, String] = null

def getInstance() = {
  if (producer == null) {
    producer = new KafkaProducer()
  }
  producer
}
```

However, I get serialization problems with all of these options.

Thanks for your help.

- Alex
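As a footnote to the @transient attempt above: marking the field @transient keeps it out of the serialized closure, but the enclosing instance still has to be Serializable, and the field has to be rebuilt lazily on the executor after deserialization, otherwise it is simply null there. A sketch of that idea (illustrative names; assuming the new-client producer API) is a small serializable wrapper around a @transient lazy val:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Hypothetical wrapper: serializable because it only carries the brokers string.
// The producer is @transient (never serialized) and lazy, so each executor
// rebuilds it the first time send() is called after deserialization.
class KafkaProducerHolder(brokers: String) extends Serializable {
  @transient private lazy val producer: KafkaProducer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
  }

  def send(topic: String, key: String, value: String): Unit =
    producer.send(new ProducerRecord[String, String](topic, key, value))
}
```

A holder like this can be captured in foreachPartition, or even broadcast, because only the brokers string is ever serialized; each executor builds its own producer on first use.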