Re: Spark 1.6.1. How to prevent serialization of KafkaProducer

2016-04-22 Thread Alexander Gallego
Thanks Bryan.

This is basically what I have as well; I posted pretty much the same gist in
the first email:

  .foreachRDD(rdd => {
    rdd.foreachPartition(part => {
      val producer: Producer[String, String] = KafkaWriter.createProducer(brokers)
      part.foreach(item => producer.send(item))
      producer.close()
    })
  })


I had a bug with implicits from Spark. I'm not really sure why, but I had built a
SparkContext from a different configuration file and something was passing the
wrong SparkContext around.

Effectively, everything worked once I merged my functions into one file.
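
For anyone hitting the same thing, here is a minimal sketch of that failure mode
and the fix. The class, field, and payload names below are made up for
illustration (this is not the actual job from the thread): referencing a field of
the enclosing class inside the closure pulls the whole instance, including any
SparkContext it holds, into task serialization, while copying the needed values
into local vals and building the producer inside foreachPartition keeps the
closure serializable.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.SparkContext
import org.apache.spark.streaming.dstream.DStream

// Sketch only: StreamJob and its fields are hypothetical, not code from this thread.
class StreamJob(sc: SparkContext, brokers: String, topic: String) {
  def run(stream: DStream[(String, String)]): Unit = {
    // Copy fields into local vals; using `brokers`/`topic` directly would
    // capture `this` (and the non-serializable SparkContext along with it).
    val localBrokers = brokers
    val localTopic = topic
    stream.foreachRDD { rdd =>
      rdd.foreachPartition { part =>
        // The producer is created on the executor, so it is never serialized.
        val props = new Properties()
        props.put("bootstrap.servers", localBrokers)
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)
        part.foreach { case (k, v) =>
          producer.send(new ProducerRecord[String, String](localTopic, k, v))
        }
        producer.close()
      }
    }
  }
}
```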

Thanks again.

On Thu, Apr 21, 2016 at 2:58 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
wrote:

> Here is what we're doing:
>
>
> import java.util.Properties
>
> import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
> import net.liftweb.json.Extraction._
> import net.liftweb.json._
> import org.apache.spark.streaming.dstream.DStream
>
> class KafkaWriter(brokers: Array[String], topic: String, numPartitions: Int) {
>   def write[T](data: DStream[T]): Unit = {
>     KafkaWriter.write(data, topic, brokers, numPartitions)
>   }
> }
>
> object KafkaWriter {
>   def write[T](data: DStream[T], topic: String, brokers: Array[String], numPartitions: Int): Unit = {
>     val dataToWrite =
>       if (numPartitions > 0) {
>         data.repartition(numPartitions)
>       } else {
>         data
>       }
>
>     dataToWrite
>       .map(x => new KeyedMessage[String, String](topic, KafkaWriter.toJson(x)))
>       .foreachRDD(rdd => {
>         rdd.foreachPartition(part => {
>           val producer: Producer[String, String] = KafkaWriter.createProducer(brokers)
>           part.foreach(item => producer.send(item))
>           producer.close()
>         })
>       })
>   }
>
>   def apply(brokers: Option[Array[String]], topic: String, numPartitions: Int): KafkaWriter = {
>     val brokersToUse =
>       brokers match {
>         case Some(x) => x
>         case None => throw new IllegalArgumentException("Must specify brokers!")
>       }
>
>     new KafkaWriter(brokersToUse, topic, numPartitions)
>   }
>
>   def toJson[T](data: T): String = {
>     implicit val formats = DefaultFormats ++ net.liftweb.json.ext.JodaTimeSerializers.all
>     compactRender(decompose(data))
>   }
>
>   def createProducer(brokers: Array[String]): Producer[String, String] = {
>     val properties = new Properties()
>     properties.put("metadata.broker.list", brokers.mkString(","))
>     properties.put("serializer.class", "kafka.serializer.StringEncoder")
>
>     val kafkaConfig = new ProducerConfig(properties)
>     new Producer[String, String](kafkaConfig)
>   }
> }
>
>
> Then just call:
>
> val kafkaWriter: KafkaWriter = KafkaWriter(
>   KafkaStreamFactory.getBrokersFromConfig(config),
>   config.getString(Parameters.topicName),
>   numPartitions = kafkaWritePartitions)
> kafkaWriter.write(dataToWriteToKafka)
>
>
> Hope that helps!
>
> Bryan Jeffrey
>
> On Thu, Apr 21, 2016 at 2:08 PM, Alexander Gallego <agall...@concord.io>
> wrote:
>
>> Thanks Ted.
>>
>>  KafkaWordCount (producer) does not operate on a DStream[T]
>>
>> ```scala
>>
>>
>> object KafkaWordCountProducer {
>>
>>   def main(args: Array[String]) {
>> if (args.length < 4) {
>>   System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
>>     "<messagesPerSec> <wordsPerMessage>")
>>   System.exit(1)
>> }
>>
>> val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
>>
>> // Zookeeper connection properties
>> val props = new HashMap[String, Object]()
>> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
>> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>>   "org.apache.kafka.common.serialization.StringSerializer")
>> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>>   "org.apache.kafka.common.serialization.StringSerializer")
>>
>> val producer = new KafkaProducer[String, String](props)
>>
>> // Send some messages
>> while(true) {
>>   (1 to messagesPerSec.toInt).foreach { messageNum =>
>> val str = (1 to wordsPerMessage.toInt)
>>   .map(x => scala.util.Random.nextInt(10).toString)
>>   .mkString(" ")
>>
>> val message = new ProducerRecord[String, String](topic, null, str)
>> producer.send(message)
>>   }
>

Re: Spark 1.6.1. How to prevent serialization of KafkaProducer

2016-04-21 Thread Alexander Gallego
Thanks Ted.

 KafkaWordCount (producer) does not operate on a DStream[T]

```scala


import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object KafkaWordCountProducer {

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Kafka producer connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send some messages
    while (true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt)
          .map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")

        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }

      Thread.sleep(1000)
    }
  }

}

```
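
To spell out the difference: KafkaWordCountProducer's producer lives only in
main() on the driver, so it is never captured by a Spark closure. The shape that
does fail is sketched below; this is an illustration added for contrast, not code
from the thread, and the names and settings are assumed:

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.dstream.DStream

object DriverSideProducerSketch {
  // Hypothetical example of the failing shape: the producer is built on the
  // driver and then referenced inside the partition closure, so Spark has to
  // serialize it with the task and throws "Task not serializable".
  def wrongWay(stream: DStream[String], brokers: String, topic: String): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props) // created on the driver

    stream.foreachRDD { rdd =>
      rdd.foreachPartition { part =>
        // `producer` is captured by this closure -> Task not serializable
        part.foreach(s => producer.send(new ProducerRecord[String, String](topic, s)))
      }
    }
  }
}
```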


Also, doing:


```
object KafkaSink {
  def send(brokers: String, sc: SparkContext)(topic: String, key: String, value: String) =
    getInstance(brokers, sc).value.send(new ProducerRecord(topic, key, value))
}

KafkaSink.send(brokers, sparkContext)(outputTopic, record._1, record._2)

```


This doesn't work either; the result is:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
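
A commonly cited variant of this approach broadcasts a serializable wrapper
around a producer factory instead of the producer itself, so the KafkaProducer is
only ever created lazily on the executors. A rough sketch follows; the class name
and the broker/serializer settings are assumptions, not taken from this thread:

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Sketch: the () => KafkaProducer factory closure is serializable (it only
// captures the brokers string); the producer itself is built on first use,
// on whichever executor deserializes the sink.
class LazyKafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  @transient private lazy val producer = createProducer()

  def send(topic: String, key: String, value: String): Unit =
    producer.send(new ProducerRecord[String, String](topic, key, value))
}

object LazyKafkaSink {
  def apply(brokers: String): LazyKafkaSink = {
    val createProducer = () => {
      val props = new Properties()
      props.put("bootstrap.servers", brokers)
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      sys.addShutdownHook { producer.close() }
      producer
    }
    new LazyKafkaSink(createProducer)
  }
}
```

Usage would then be something like `val sink = ssc.sparkContext.broadcast(LazyKafkaSink(brokers))`
on the driver, and `sink.value.send(outputTopic, record._1, record._2)` inside
foreachPartition.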


Thanks!



On Thu, Apr 21, 2016 at 1:08 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
> In KafkaWordCount, the String is sent back and producer.send() is called.
>
> I guess if you don't find a viable solution in your current design, you can
> consider the above.
>
> On Thu, Apr 21, 2016 at 10:04 AM, Alexander Gallego <agall...@concord.io>
wrote:
>>
>> Hello,
>>
>> I understand that you cannot serialize Kafka Producer.
>>
>> So I've tried:
>>
>> (as suggested here
https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html)

>>
>>  - Make the class Serializable - not possible
>>
>>  - Declare the instance only within the lambda function passed in map.
>>
>> via:
>>
>> // as suggested by the docs
>>
>>
>> ```scala
>>
>>    kafkaOut.foreachRDD(rdd => {
>>      rdd.foreachPartition(partition => {
>>        val producer = new KafkaProducer(..)
>>        partition.foreach { record =>
>>          producer.send(new ProducerRecord(outputTopic, record._1, record._2))
>>        }
>>        producer.close()
>>      })
>>    }) // foreachRDD
>>
>>
>> ```
>>
>> - Make the NotSerializable object static and create it once per machine.
>>
>> via:
>>
>>
>> ```scala
>>
>>
>> object KafkaSink {
>>   @volatile private var instance: Broadcast[KafkaProducer[String, String]] = null
>>
>>   def getInstance(brokers: String, sc: SparkContext): Broadcast[KafkaProducer[String, String]] = {
>>     if (instance == null) {
>>       synchronized {
>>         println("Creating new kafka producer")
>>         val props = new java.util.Properties()
>>         ...
>>         instance = sc.broadcast(new KafkaProducer[String, String](props))
>>         sys.addShutdownHook {
>>           instance.value.close()
>>         }
>>       }
>>     }
>>     instance
>>   }
>> }
>>
>>
>> ```
>>
>>
>>
>>  - Call rdd.foreachPartition and create the NotSerializable object in there, like this:
>>
>> Same as above.
>>
>>
>> - Mark the instance @transient
>>
>> Same thing, just make it a class variable via:
>>
>>
>> ```
>> @transient var producer: KafkaProducer[String, String] = null
>> def getInstance() = {
>>   if (producer == null) {
>>     producer = new KafkaProducer()
>>   }
>>   producer
>> }
>>
>> ```
>>
>>
>> However, I get serialization problems with all of these options.
>>
>>
>> Thanks for your help.
>>
>> - Alex
>>
>



--
Alexander Gallego
Co-Founder & CTO


Spark 1.6.1. How to prevent serialization of KafkaProducer

2016-04-21 Thread Alexander Gallego
Hello,

I understand that you cannot serialize Kafka Producer.

So I've tried:

(as suggested here:
https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html)

 - Make the class Serializable - not possible

 - Declare the instance only within the lambda function passed in map.

via:

// as suggested by the docs


```scala
kafkaOut.foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    val producer = new KafkaProducer(..)
    partition.foreach { record =>
      producer.send(new ProducerRecord(outputTopic, record._1, record._2))
    }
    producer.close()
  })
}) // foreachRDD
```

- Make the NotSerializable object static and create it once per machine (see the
sketch at the end of this message).

via:


```scala


object KafkaSink {
  @volatile private var instance: Broadcast[KafkaProducer[String, String]] = null

  def getInstance(brokers: String, sc: SparkContext): Broadcast[KafkaProducer[String, String]] = {
    if (instance == null) {
      synchronized {
        println("Creating new kafka producer")
        val props = new java.util.Properties()
        ...
        instance = sc.broadcast(new KafkaProducer[String, String](props))
        sys.addShutdownHook {
          instance.value.close()
        }
      }
    }
    instance
  }
}


```



 - Call rdd.foreachPartition and create the NotSerializable object in there, like this:

Same as above.


- Mark the instance @transient

Same thing, just make it a class variable via:


```
@transient var producer: KafkaProducer[String, String] = null
def getInstance() = {
  if (producer == null) {
    producer = new KafkaProducer()
  }
  producer
}

```


However, I get serialization problems with all of these options.
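
For what it's worth, the "static, once per machine" option is usually read as a
plain Scala object with no SparkContext or broadcast involved: an object is a
per-JVM static, so referencing it inside foreachPartition never serializes the
producer, and each executor builds its own instance on first use. A sketch with
illustrative names and assumed settings, not code from this thread:

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Illustrative per-JVM singleton (names and settings are assumed): a Scala
// object is a static, so it is never shipped with the closure; each executor
// JVM lazily builds its own producer the first time send() runs there.
object ExecutorLocalProducer {
  @volatile private var producer: KafkaProducer[String, String] = null

  private def get(brokers: String): KafkaProducer[String, String] = {
    if (producer == null) {
      synchronized {
        if (producer == null) {
          val props = new Properties()
          props.put("bootstrap.servers", brokers)
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          producer = new KafkaProducer[String, String](props)
          sys.addShutdownHook { producer.close() }
        }
      }
    }
    producer
  }

  def send(brokers: String, topic: String, key: String, value: String): Unit =
    get(brokers).send(new ProducerRecord[String, String](topic, key, value))
}
```

Inside foreachPartition this becomes a plain
`ExecutorLocalProducer.send(brokers, outputTopic, record._1, record._2)` call,
with only the strings crossing the driver/executor boundary.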


Thanks for your help.

- Alex