Re: Spark 1.6.1. How to prevent serialization of KafkaProducer

2016-04-22 Thread Alexander Gallego
Thanks, Bryan.

This is basically what I have as well; I posted pretty much the same gist in
the first email:

  .foreachRDD(rdd => {
    rdd.foreachPartition(part => {
      val producer: Producer[String, String] = KafkaWriter.createProducer(brokers)
      part.foreach(item => producer.send(item))
      producer.close()
    })
  })
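
One small thing I'm also adding on my side, in case it's useful (sketch only, same types as above): closing the producer in a finally block so a failed send doesn't leak the connection:

```scala
rdd.foreachPartition(part => {
  val producer: Producer[String, String] = KafkaWriter.createProducer(brokers)
  try {
    part.foreach(item => producer.send(item))
  } finally {
    producer.close() // runs even if a send throws, so the producer isn't leaked
  }
})
```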


I had a bug with implicits from Spark. I'm not entirely sure why, but I had built a
SparkContext from a different configuration file, and something was passing the
wrong SparkContext around.
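
In case anyone else hits the same thing, a minimal sketch (illustrative names only, using the createProducer from Bryan's snippet) of how this bites: the closure must not capture anything that holds the SparkContext, whether directly, through `this`, or through an implicit:

```scala
import kafka.producer.{KeyedMessage, Producer}
import org.apache.spark.SparkContext
import org.apache.spark.streaming.dstream.DStream

class StreamJob(val sc: SparkContext) { // holds a SparkContext
  def run(stream: DStream[KeyedMessage[String, String]], brokers: Array[String]): Unit = {
    stream.foreachRDD { rdd =>
      rdd.foreachPartition { part =>
        // Referencing `sc` here (even indirectly, e.g. via an implicit or `this`)
        // would pull StreamJob, and with it the SparkContext, into the closure and
        // fail with "Task not serializable". Only serializable values are captured.
        val producer: Producer[String, String] = KafkaWriter.createProducer(brokers)
        part.foreach(item => producer.send(item))
        producer.close()
      }
    }
  }
}
```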

In the end, everything worked once I merged my functions into one file and retested.

Thanks again.

On Thu, Apr 21, 2016 at 2:58 PM, Bryan Jeffrey 
wrote:

> Here is what we're doing:
>
>
> import java.util.Properties
>
> import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
> import net.liftweb.json.Extraction._
> import net.liftweb.json._
> import org.apache.spark.streaming.dstream.DStream
>
> class KafkaWriter(brokers: Array[String], topic: String, numPartitions:
> Int) {
>   def write[T](data: DStream[T]): Unit = {
> KafkaWriter.write(data, topic, brokers, numPartitions)
>   }
> }
>
> object KafkaWriter {
>   def write[T](data: DStream[T], topic: String, brokers: Array[String],
> numPartitions: Int): Unit = {
> val dataToWrite =
>   if (numPartitions > 0) {
> data.repartition(numPartitions)
>   } else {
> data
>   }
>
> dataToWrite
>   .map(x => new KeyedMessage[String, String](topic,
> KafkaWriter.toJson(x)))
>   .foreachRDD(rdd => {
>   rdd.foreachPartition(part => {
> val producer: Producer[String, String] =
> KafkaWriter.createProducer(brokers)
> part.foreach(item => producer.send(item))
> producer.close()
>   })
> })
>   }
>
>   def apply(brokers: Option[Array[String]], topic: String, numPartitions:
> Int): KafkaWriter = {
> val brokersToUse =
>   brokers match {
> case Some(x) => x
> case None => throw new IllegalArgumentException("Must specify
> brokers!")
>   }
>
> new KafkaWriter(brokersToUse, topic, numPartitions)
>   }
>
>   def toJson[T](data: T): String = {
> implicit val formats = DefaultFormats ++
> net.liftweb.json.ext.JodaTimeSerializers.all
> compactRender(decompose(data))
>   }
>
>   def createProducer(brokers: Array[String]): Producer[String, String] = {
> val properties = new Properties()
> properties.put("metadata.broker.list", brokers.mkString(","))
> properties.put("serializer.class", "kafka.serializer.StringEncoder")
>
> val kafkaConfig = new ProducerConfig(properties)
> new Producer[String, String](kafkaConfig)
>   }
> }
>
>
> Then just call:
>
> val kafkaWriter: KafkaWriter =
> KafkaWriter(KafkaStreamFactory.getBrokersFromConfig(config),
> config.getString(Parameters.topicName), numPartitions =
> kafkaWritePartitions)
> detectionWriter.write(dataToWriteToKafka)
>
>
> Hope that helps!
>
> Bryan Jeffrey
>
> On Thu, Apr 21, 2016 at 2:08 PM, Alexander Gallego 
> wrote:
>
>> Thanks Ted.
>>
>>  KafkaWordCount (producer) does not operate on a DStream[T]
>>
>> ```scala
>>
>>
>> object KafkaWordCountProducer {
>>
>>   def main(args: Array[String]) {
>> if (args.length < 4) {
>>   System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> " +
>> "<topic> <messagesPerSec> <wordsPerMessage>")
>>   System.exit(1)
>> }
>>
>> val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
>>
>> // Kafka producer connection properties
>> val props = new HashMap[String, Object]()
>> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
>> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>>   "org.apache.kafka.common.serialization.StringSerializer")
>> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>>   "org.apache.kafka.common.serialization.StringSerializer")
>>
>> val producer = new KafkaProducer[String, String](props)
>>
>> // Send some messages
>> while(true) {
>>   (1 to messagesPerSec.toInt).foreach { messageNum =>
>> val str = (1 to wordsPerMessage.toInt).map(x =>
>> scala.util.Random.nextInt(10).toString)
>>   .mkString(" ")
>>
>> val message = new ProducerRecord[String, String](topic, null, str)
>> producer.send(message)
>>   }
>>
>>   Thread.sleep(1000)
>> }
>>   }
>>
>> }
>>
>> ```
>>
>>
>> Also, doing:
>>
>>
>> ```
>> object KafkaSink {
>>  def send(brokers: String, sc: SparkContext, topic: String, key:
>> String, value: String) =
>> getInstance(brokers, sc).value.send(new ProducerRecord(topic,
>> key, value))
>> }
>>
>> KafkaSink.send(brokers, sparkContext)(outputTopic, record._1, record._2)
>>
>> ```
>>
>>
>> Doesn't work either, the result is:
>>
>> Exception in thread "main" org.apache.spark.SparkException: Task not
>> serializable
>>
>>
>> Thanks!
>>
>>
>>
>>
>> On Thu, Apr 21, 2016 at 1:08 PM, Ted Yu  wrote:
>> >
>> > In KafkaWordCount, the String is sent back and producer.send() is called.

Re: Spark 1.6.1. How to prevent serialization of KafkaProducer

2016-04-21 Thread Bryan Jeffrey
Here is what we're doing:


import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import net.liftweb.json.Extraction._
import net.liftweb.json._
import org.apache.spark.streaming.dstream.DStream

class KafkaWriter(brokers: Array[String], topic: String, numPartitions: Int) {
  def write[T](data: DStream[T]): Unit = {
    KafkaWriter.write(data, topic, brokers, numPartitions)
  }
}

object KafkaWriter {
  def write[T](data: DStream[T], topic: String, brokers: Array[String],
               numPartitions: Int): Unit = {
    val dataToWrite =
      if (numPartitions > 0) {
        data.repartition(numPartitions)
      } else {
        data
      }

    dataToWrite
      .map(x => new KeyedMessage[String, String](topic, KafkaWriter.toJson(x)))
      .foreachRDD(rdd => {
        rdd.foreachPartition(part => {
          val producer: Producer[String, String] = KafkaWriter.createProducer(brokers)
          part.foreach(item => producer.send(item))
          producer.close()
        })
      })
  }

  def apply(brokers: Option[Array[String]], topic: String, numPartitions: Int): KafkaWriter = {
    val brokersToUse =
      brokers match {
        case Some(x) => x
        case None => throw new IllegalArgumentException("Must specify brokers!")
      }

    new KafkaWriter(brokersToUse, topic, numPartitions)
  }

  def toJson[T](data: T): String = {
    implicit val formats = DefaultFormats ++ net.liftweb.json.ext.JodaTimeSerializers.all
    compactRender(decompose(data))
  }

  def createProducer(brokers: Array[String]): Producer[String, String] = {
    val properties = new Properties()
    properties.put("metadata.broker.list", brokers.mkString(","))
    properties.put("serializer.class", "kafka.serializer.StringEncoder")

    val kafkaConfig = new ProducerConfig(properties)
    new Producer[String, String](kafkaConfig)
  }
}


Then just call:

val kafkaWriter: KafkaWriter = KafkaWriter(
  KafkaStreamFactory.getBrokersFromConfig(config),
  config.getString(Parameters.topicName),
  numPartitions = kafkaWritePartitions)

kafkaWriter.write(dataToWriteToKafka)
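
If you're on the new producer API (org.apache.kafka.clients.producer.KafkaProducer) instead, the same per-partition pattern applies. A rough, untested sketch of an equivalent createProducer (name is illustrative), adapted from the properties in the KafkaWordCount example below:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}

// Sketch only: build the producer inside foreachPartition, never on the driver.
def createNewApiProducer(brokers: Array[String]): KafkaProducer[String, String] = {
  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers.mkString(","))
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  new KafkaProducer[String, String](props)
}
```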


Hope that helps!

Bryan Jeffrey

On Thu, Apr 21, 2016 at 2:08 PM, Alexander Gallego 
wrote:

> Thanks Ted.
>
>  KafkaWordCount (producer) does not operate on a DStream[T]
>
> ```scala
>
>
> object KafkaWordCountProducer {
>
>   def main(args: Array[String]) {
> if (args.length < 4) {
>   System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> " +
> "<topic> <messagesPerSec> <wordsPerMessage>")
>   System.exit(1)
> }
>
> val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
>
> // Kafka producer connection properties
> val props = new HashMap[String, Object]()
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>   "org.apache.kafka.common.serialization.StringSerializer")
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>   "org.apache.kafka.common.serialization.StringSerializer")
>
> val producer = new KafkaProducer[String, String](props)
>
> // Send some messages
> while(true) {
>   (1 to messagesPerSec.toInt).foreach { messageNum =>
> val str = (1 to wordsPerMessage.toInt).map(x =>
> scala.util.Random.nextInt(10).toString)
>   .mkString(" ")
>
> val message = new ProducerRecord[String, String](topic, null, str)
> producer.send(message)
>   }
>
>   Thread.sleep(1000)
> }
>   }
>
> }
>
> ```
>
>
> Also, doing:
>
>
> ```
> object KafkaSink {
>  def send(brokers: String, sc: SparkContext, topic: String, key:
> String, value: String) =
> getInstance(brokers, sc).value.send(new ProducerRecord(topic,
> key, value))
> }
>
> KafkaSink.send(brokers, sparkContext)(outputTopic, record._1, record._2)
>
> ```
>
>
> Doesn't work either, the result is:
>
> Exception in thread "main" org.apache.spark.SparkException: Task not
> serializable
>
>
> Thanks!
>
>
>
>
> On Thu, Apr 21, 2016 at 1:08 PM, Ted Yu  wrote:
> >
> > In KafkaWordCount, the String is sent back and producer.send() is
> called.
> >
> > I guess if you don't find a viable solution in your current design, you can
> consider the above.
> >
> > On Thu, Apr 21, 2016 at 10:04 AM, Alexander Gallego 
> wrote:
> >>
> >> Hello,
> >>
> >> I understand that you cannot serialize Kafka Producer.
> >>
> >> So I've tried:
> >>
> >> (as suggested here
> https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html)
>
> >>
> >>  - Make the class Serializable - not possible
> >>
> >>  - Declare the instance only within the lambda function passed in map.
> >>
> >> via:
> >>
> >> // as suggested by the docs
> >>
> >>
> >> ```scala
> >>
> >>kafkaOut.foreachRDD(rdd => {
> >>  rdd.foreachPartition(partition => {
> >>   val producer = new KafkaProducer(..)
> >>   partition.foreach { record =>
> >>   producer.send(new ProducerRecord(outputTopic, record._1, record._2))

Re: Spark 1.6.1. How to prevent serialization of KafkaProducer

2016-04-21 Thread Todd Nist
Have you looked at these:

http://allegro.tech/2015/08/spark-kafka-integration.html
http://mkuthan.github.io/blog/2016/01/29/spark-kafka-integration2/

Full example here:

https://github.com/mkuthan/example-spark-kafka
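
The core of the pattern in those posts (sketched from memory here, not the repo's exact code) is a small Serializable wrapper that only ships a factory function and creates the KafkaProducer lazily on each executor:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  // Built on first use, i.e. on the executor; the producer itself is never serialized.
  lazy val producer = createProducer()

  def send(topic: String, key: String, value: String): Unit =
    producer.send(new ProducerRecord(topic, key, value))
}

object KafkaSink {
  def apply(brokers: String): KafkaSink = {
    val createProducerFunc = () => {
      val props = new Properties()
      props.put("bootstrap.servers", brokers)
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      sys.addShutdownHook { producer.close() } // close when the executor JVM shuts down
      producer
    }
    new KafkaSink(createProducerFunc)
  }
}

// usage sketch: broadcast once on the driver, reuse inside the closures
// val kafkaSink = ssc.sparkContext.broadcast(KafkaSink(brokers))
// stream.foreachRDD(_.foreach(r => kafkaSink.value.send(outputTopic, r._1, r._2)))
```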

HTH.

-Todd

On Thu, Apr 21, 2016 at 2:08 PM, Alexander Gallego 
wrote:

> Thanks Ted.
>
>  KafkaWordCount (producer) does not operate on a DStream[T]
>
> ```scala
>
>
> object KafkaWordCountProducer {
>
>   def main(args: Array[String]) {
> if (args.length < 4) {
>   System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> " +
> "<topic> <messagesPerSec> <wordsPerMessage>")
>   System.exit(1)
> }
>
> val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
>
> // Kafka producer connection properties
> val props = new HashMap[String, Object]()
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>   "org.apache.kafka.common.serialization.StringSerializer")
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>   "org.apache.kafka.common.serialization.StringSerializer")
>
> val producer = new KafkaProducer[String, String](props)
>
> // Send some messages
> while(true) {
>   (1 to messagesPerSec.toInt).foreach { messageNum =>
> val str = (1 to wordsPerMessage.toInt).map(x =>
> scala.util.Random.nextInt(10).toString)
>   .mkString(" ")
>
> val message = new ProducerRecord[String, String](topic, null, str)
> producer.send(message)
>   }
>
>   Thread.sleep(1000)
> }
>   }
>
> }
>
> ```
>
>
> Also, doing:
>
>
> ```
> object KafkaSink {
>  def send(brokers: String, sc: SparkContext, topic: String, key:
> String, value: String) =
> getInstance(brokers, sc).value.send(new ProducerRecord(topic,
> key, value))
> }
>
> KafkaSink.send(brokers, sparkContext)(outputTopic, record._1, record._2)
>
> ```
>
>
> Doesn't work either, the result is:
>
> Exception in thread "main" org.apache.spark.SparkException: Task not
> serializable
>
>
> Thanks!
>
>
>
>
> On Thu, Apr 21, 2016 at 1:08 PM, Ted Yu  wrote:
> >
> > In KafkaWordCount, the String is sent back and producer.send() is
> called.
> >
> > I guess if you don't find a viable solution in your current design, you can
> consider the above.
> >
> > On Thu, Apr 21, 2016 at 10:04 AM, Alexander Gallego 
> wrote:
> >>
> >> Hello,
> >>
> >> I understand that you cannot serialize Kafka Producer.
> >>
> >> So I've tried:
> >>
> >> (as suggested here
> https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html)
>
> >>
> >>  - Make the class Serializable - not possible
> >>
> >>  - Declare the instance only within the lambda function passed in map.
> >>
> >> via:
> >>
> >> // as suggested by the docs
> >>
> >>
> >> ```scala
> >>
> >>kafkaOut.foreachRDD(rdd => {
> >>  rdd.foreachPartition(partition => {
> >>   val producer = new KafkaProducer(..)
> >>   partition.foreach { record =>
> >>   producer.send(new ProducerRecord(outputTopic, record._1,
> record._2))
> >>}
> >>   producer.close()
> >>})
> >>  }) // foreachRDD
> >>
> >>
> >> ```
> >>
> >> - Make the NotSerializable object as a static and create it once per
> machine.
> >>
> >> via:
> >>
> >>
> >> ```scala
> >>
> >>
> >> object KafkaSink {
> >>   @volatile private var instance: Broadcast[KafkaProducer[String,
> String]] = null
> >>   def getInstance(brokers: String, sc: SparkContext):
> Broadcast[KafkaProducer[String, String]] = {
> >> if (instance == null) {
> >>   synchronized {
> >> println("Creating new kafka producer")
> >> val props = new java.util.Properties()
> >> ...
> >> instance = sc.broadcast(new KafkaProducer[String,
> String](props))
> >> sys.addShutdownHook {
> >>   instance.value.close()
> >> }
> >>   }
> >> }
> >> instance
> >>   }
> >> }
> >>
> >>
> >> ```
> >>
> >>
> >>
> >>  - Call rdd.forEachPartition and create the NotSerializable object in
> there like this:
> >>
> >> Same as above.
> >>
> >>
> >> - Mark the instance @transient
> >>
> >> Same thing, just make it a class variable via:
> >>
> >>
> >> ```
> >> @transient var producer: KafkaProducer[String,String] = null
> >> def getInstance() = {
> >>if( producer == null ) {
> >>producer = new KafkaProducer()
> >>}
> >>producer
> >> }
> >>
> >> ```
> >>
> >>
> >> However, I get serialization problems with all of these options.
> >>
> >>
> >> Thanks for your help.
> >>
> >> - Alex
> >>
> >
>
>
>
> --
>
>
>
>
>
> Alexander Gallego
> Co-Founder & CTO
>


Re: Spark 1.6.1. How to prevent serialization of KafkaProducer

2016-04-21 Thread Alexander Gallego
Thanks Ted.

 KafkaWordCount (producer) does not operate on a DStream[T]

```scala


object KafkaWordCountProducer {

  def main(args: Array[String]) {
if (args.length < 4) {
  System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
    "<messagesPerSec> <wordsPerMessage>")
  System.exit(1)
}

val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

// Kafka producer connection properties
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)

// Send some messages
while(true) {
  (1 to messagesPerSec.toInt).foreach { messageNum =>
val str = (1 to wordsPerMessage.toInt).map(x =>
scala.util.Random.nextInt(10).toString)
  .mkString(" ")

val message = new ProducerRecord[String, String](topic, null, str)
producer.send(message)
  }

  Thread.sleep(1000)
}
  }

}

```


Also, doing:


```
object KafkaSink {
 def send(brokers: String, sc: SparkContext, topic: String, key:
String, value: String) =
getInstance(brokers, sc).value.send(new ProducerRecord(topic,
key, value))
}

KafkaSink.send(brokers, sparkContext)(outputTopic, record._1, record._2)

```


Doesn't work either, the result is:

Exception in thread "main" org.apache.spark.SparkException: Task not
serializable


Thanks!



On Thu, Apr 21, 2016 at 1:08 PM, Ted Yu  wrote:
>
> In KafkaWordCount, the String is sent back and producer.send() is called.
>
> I guess if you don't find a viable solution in your current design, you can
consider the above.
>
> On Thu, Apr 21, 2016 at 10:04 AM, Alexander Gallego 
wrote:
>>
>> Hello,
>>
>> I understand that you cannot serialize Kafka Producer.
>>
>> So I've tried:
>>
>> (as suggested here
https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html)

>>
>>  - Make the class Serializable - not possible
>>
>>  - Declare the instance only within the lambda function passed in map.
>>
>> via:
>>
>> // as suggested by the docs
>>
>>
>> ```scala
>>
>>kafkaOut.foreachRDD(rdd => {
>>  rdd.foreachPartition(partition => {
>>   val producer = new KafkaProducer(..)
>>   partition.foreach { record =>
>>   producer.send(new ProducerRecord(outputTopic, record._1,
record._2))
>>}
>>   producer.close()
>>})
>>  }) // foreachRDD
>>
>>
>> ```
>>
>> - Make the NotSerializable object as a static and create it once per
machine.
>>
>> via:
>>
>>
>> ```scala
>>
>>
>> object KafkaSink {
>>   @volatile private var instance: Broadcast[KafkaProducer[String,
String]] = null
>>   def getInstance(brokers: String, sc: SparkContext):
Broadcast[KafkaProducer[String, String]] = {
>> if (instance == null) {
>>   synchronized {
>> println("Creating new kafka producer")
>> val props = new java.util.Properties()
>> ...
>> instance = sc.broadcast(new KafkaProducer[String, String](props))
>> sys.addShutdownHook {
>>   instance.value.close()
>> }
>>   }
>> }
>> instance
>>   }
>> }
>>
>>
>> ```
>>
>>
>>
>>  - Call rdd.forEachPartition and create the NotSerializable object in
there like this:
>>
>> Same as above.
>>
>>
>> - Mark the instance @transient
>>
>> Same thing, just make it a class variable via:
>>
>>
>> ```
>> @transient var producer: KafkaProducer[String,String] = null
>> def getInstance() = {
>>if( producer == null ) {
>>producer = new KafkaProducer()
>>}
>>producer
>> }
>>
>> ```
>>
>>
>> However, I get serialization problems with all of these options.
>>
>>
>> Thanks for your help.
>>
>> - Alex
>>
>



--





Alexander Gallego
Co-Founder & CTO


Re: Spark 1.6.1. How to prevent serialization of KafkaProducer

2016-04-21 Thread Ted Yu
In KafkaWordCount, the String is sent back and producer.send() is called.

I guess if you don't find a viable solution in your current design, you can
consider the above.
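
Concretely, something like this (illustrative sketch only; `stream`, `producer`, and `outputTopic` stand in for whatever you already have on the driver, with the RDD assumed to hold (String, String) pairs): keep the producer on the driver and bring the data back to it, so the producer never enters a closure:

```scala
import org.apache.kafka.clients.producer.ProducerRecord

// Only reasonable for small per-batch volumes: collect() pulls each RDD to the driver.
stream.foreachRDD { rdd =>
  val records: Array[(String, String)] = rdd.collect() // data comes back to the driver
  records.foreach { case (k, v) =>
    // foreachRDD's body runs on the driver, so the producer is never shipped to executors.
    producer.send(new ProducerRecord(outputTopic, k, v))
  }
}
```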

On Thu, Apr 21, 2016 at 10:04 AM, Alexander Gallego 
wrote:

> Hello,
>
> I understand that you cannot serialize Kafka Producer.
>
> So I've tried:
>
> (as suggested here
> https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html
> )
>
>  - Make the class Serializable - not possible
>
>  - Declare the instance only within the lambda function passed in map.
>
> via:
>
> // as suggested by the docs
>
>
> ```scala
>
>kafkaOut.foreachRDD(rdd => {
>  rdd.foreachPartition(partition => {
>   val producer = new KafkaProducer(..)
>   partition.foreach { record =>
>   producer.send(new ProducerRecord(outputTopic, record._1,
> record._2))
>}
>   producer.close()
>})
>  }) // foreachRDD
>
>
> ```
>
> - Make the NotSerializable object as a static and create it once per
> machine.
>
> via:
>
>
> ```scala
>
>
> object KafkaSink {
>   @volatile private var instance: Broadcast[KafkaProducer[String, String]]
> = null
>   def getInstance(brokers: String, sc: SparkContext):
> Broadcast[KafkaProducer[String, String]] = {
> if (instance == null) {
>   synchronized {
> println("Creating new kafka producer")
> val props = new java.util.Properties()
> ...
> instance = sc.broadcast(new KafkaProducer[String, String](props))
> sys.addShutdownHook {
>   instance.value.close()
> }
>   }
> }
> instance
>   }
> }
>
>
> ```
>
>
>
>  - Call rdd.forEachPartition and create the NotSerializable object in
> there like this:
>
> Same as above.
>
>
> - Mark the instance @transient
>
> Same thing, just make it a class variable via:
>
>
> ```
> @transient var producer: KafkaProducer[String,String] = null
> def getInstance() = {
>if( producer == null ) {
>producer = new KafkaProducer()
>}
>producer
> }
>
> ```
>
>
> However, I get serialization problems with all of these options.
>
>
> Thanks for your help.
>
> - Alex
>
>


Spark 1.6.1. How to prevent serialization of KafkaProducer

2016-04-21 Thread Alexander Gallego
Hello,

I understand that you cannot serialize Kafka Producer.

So I've tried:

(as suggested here
https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html
)

 - Make the class Serializable - not possible

 - Declare the instance only within the lambda function passed in map.

via:

// as suggested by the docs


```scala

   kafkaOut.foreachRDD(rdd => {
     rdd.foreachPartition(partition => {
       val producer = new KafkaProducer(..)
       partition.foreach { record =>
         producer.send(new ProducerRecord(outputTopic, record._1, record._2))
       }
       producer.close()
     })
   }) // foreachRDD


```

- Make the NotSerializable object as a static and create it once per
machine.

via:


```scala


object KafkaSink {
  @volatile private var instance: Broadcast[KafkaProducer[String, String]] = null

  def getInstance(brokers: String, sc: SparkContext): Broadcast[KafkaProducer[String, String]] = {
    if (instance == null) {
      synchronized {
        println("Creating new kafka producer")
        val props = new java.util.Properties()
        ...
        instance = sc.broadcast(new KafkaProducer[String, String](props))
        sys.addShutdownHook {
          instance.value.close()
        }
      }
    }
    instance
  }
}


```



 - Call rdd.forEachPartition and create the NotSerializable object in there
like this:

Same as above.


- Mark the instance @transient

Same thing, just make it a class variable via:


```
@transient var producer: KafkaProducer[String, String] = null
def getInstance() = {
  if (producer == null) {
    producer = new KafkaProducer()
  }
  producer
}

```


However, I get serialization problems with all of these options.


Thanks for your help.

- Alex