Re: creating a single kafka producer object for all partitions

2015-01-12 Thread Hafiz Mujadid
Actually, this code is producing a "leader not found" exception, and I am unable to find the reason.

On Mon, Jan 12, 2015 at 4:03 PM, kevinkim [via Apache Spark User List] 
ml-node+s1001560n21098...@n3.nabble.com wrote:

 Well, you can use coalesce() to decrease the number of partitions to one.
 (It will take time and is not very efficient, though.)

 Regards,
 Kevin.
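
 A minimal sketch of that approach (a sketch only; rdd, props, and Topic
 refer to the names in the original question later in this thread):

 // With a single partition, foreachPartition runs once, so only one
 // producer is ever created (at the cost of funneling all rows
 // through one task).
 rdd.coalesce(1).foreachPartition(itr => {
   val producer = new Producer[String, String](new ProducerConfig(props))
   itr.foreach(row =>
     producer.send(new KeyedMessage[String, String](Topic, row.toString)))
   producer.close()
 })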



-- 
Regards: HAFIZ MUJADID





Re: creating a single kafka producer object for all partitions

2015-01-12 Thread Cody Koeninger
You should take a look at

https://issues.apache.org/jira/browse/SPARK-4122

which implements writing to Kafka in a pretty similar way (creating a new
producer inside foreachPartition).
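
A condensed sketch of that per-partition pattern (a sketch only; rdd,
brokersList, and Topic are the names used in the original question below):

import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

rdd.foreachPartition(itr => {
  val props = new Properties()
  props.put("metadata.broker.list", brokersList)
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  // Built on the executor, inside the partition, so nothing needs to be
  // serialized from the driver; each task closes its own producer.
  val producer = new Producer[String, String](new ProducerConfig(props))
  itr.foreach(row =>
    producer.send(new KeyedMessage[String, String](Topic, row.toString)))
  producer.close()
})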

On Mon, Jan 12, 2015 at 5:24 AM, Sean Owen so...@cloudera.com wrote:

 Leader-not-found suggests a problem with your ZooKeeper config. It depends
 a lot on the specifics of your error, but this is really a Kafka
 question, better suited for the Kafka list.

 On Mon, Jan 12, 2015 at 11:10 AM, Hafiz Mujadid
 hafizmujadi...@gmail.com wrote:
  Actually, this code is producing a "leader not found" exception, and I am
  unable to find the reason.
 





creating a single kafka producer object for all partitions

2015-01-12 Thread Hafiz Mujadid
Hi experts!


I have a SchemaRDD of messages to be pushed into Kafka, and I am using the
following piece of code to do that:

import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

rdd.foreachPartition(itr => {
  // Producer configuration, rebuilt for every partition.
  val props = new Properties()
  props.put("metadata.broker.list", brokersList)
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  props.put("compression.codec", codec.toString)
  props.put("producer.type", "sync")
  props.put("batch.num.messages", BatchSize.toString)
  props.put("message.send.max.retries", maxRetries.toString)
  props.put("request.required.acks", "-1")
  // A new producer is created here, once per partition.
  val producer = new Producer[String, String](new ProducerConfig(props))
  itr.foreach(row => {
    // Drop the surrounding brackets from the Row's string form.
    val msg = row.toString.drop(1).dropRight(1)
    this.synchronized {
      producer.send(new KeyedMessage[String, String](Topic, msg))
    }
  })
  producer.close()
})



The problem with this code is that it creates a separate Kafka producer for
each partition, and I want a single producer for all partitions. Is there any
way to achieve this?
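
One pattern that gets close to this, not suggested in this thread but worth
noting (a sketch only, reusing the names from the code above), is to hold one
lazily created producer per executor JVM in a singleton object, so every
partition processed in that JVM reuses it:

object ProducerHolder {
  // Initialized once per executor JVM, on first use, and then shared by
  // all partitions processed in that JVM. Never explicitly closed; it
  // lives for the executor's lifetime. brokersList is assumed to be a
  // constant visible here.
  lazy val producer: Producer[String, String] = {
    val props = new Properties()
    props.put("metadata.broker.list", brokersList)
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    new Producer[String, String](new ProducerConfig(props))
  }
}

rdd.foreachPartition(itr =>
  itr.foreach(row =>
    ProducerHolder.producer.send(
      new KeyedMessage[String, String](Topic, row.toString.drop(1).dropRight(1)))))

Note that partitions are processed on several executor JVMs, so a literal
single producer for the whole job is not possible; one producer per JVM is
the closest equivalent.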








Re: creating a single kafka producer object for all partitions

2015-01-12 Thread Sean Owen
Leader-not-found suggests a problem with your ZooKeeper config. It depends
a lot on the specifics of your error, but this is really a Kafka
question, better suited for the Kafka list.
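
One way to check leader assignment directly (a sketch, assuming a Kafka
0.8-era install, ZooKeeper at zkhost:2181, and a topic named mytopic; adjust
for your setup):

# From the Kafka installation directory: a partition reporting
# "Leader: -1" (or no leader) points to a broker registration problem.
bin/kafka-topics.sh --describe --zookeeper zkhost:2181 --topic mytopic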

On Mon, Jan 12, 2015 at 11:10 AM, Hafiz Mujadid
hafizmujadi...@gmail.com wrote:
 Actually, this code is producing a "leader not found" exception, and I am
 unable to find the reason.

