RE: Node affinity for Kafka-Direct Stream

2015-10-14 Thread prajod.vettiyattil
[Quoting Gerard Maas] Thanks Saisai, Mishra. Indeed, that hint will only work in a case where

Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Cody Koeninger
Assumptions about locality in Spark are not very reliable, regardless of which consumer you use. Even if you have locality preferences, and the locality wait turned up really high, you still have to account for losing executors.
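
For context, the "locality wait" Cody mentions is controlled by Spark's spark.locality.wait setting (with per-level variants). A minimal sketch of turning it up, assuming a hypothetical app name; the keys are real configuration properties, the values purely illustrative:

    import org.apache.spark.SparkConf

    // Raising the locality wait makes the scheduler hold a task longer in
    // the hope that a node-local slot frees up, at the cost of extra delay.
    val conf = new SparkConf()
      .setAppName("kafka-direct-locality")    // hypothetical app name
      .set("spark.locality.wait", "30s")      // default is 3s
      .set("spark.locality.wait.node", "30s") // per-level overrides
      .set("spark.locality.wait.rack", "10s")

Even with the wait turned up, a lost executor still forces the rebalance Cody describes.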

Node affinity for Kafka-Direct Stream

2015-10-14 Thread Gerard Maas
In the receiver-based Kafka streaming model, given that each receiver starts as a long-running task, one can rely on a certain degree of data locality based on the Kafka partitioning: data published on a given topic/partition will land on the same Spark Streaming receiving node until the receiver
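
To make the receiver-based model concrete, here is a minimal sketch using the Spark 1.x receiver-based API; the ZooKeeper address, consumer group, and topic map are placeholders:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Each receiver runs as a long-running task on one executor, so data
    // from the topic/partitions it consumes keeps landing on that node.
    val ssc = new StreamingContext(
      new SparkConf().setAppName("receiver-demo"), Seconds(10))
    val stream = KafkaUtils.createStream(
      ssc,
      "zk-host:2181",       // placeholder ZooKeeper quorum
      "my-consumer-group",  // placeholder consumer group id
      Map("my-topic" -> 2)) // placeholder topic -> receiver thread count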

Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Gerard Maas
Hi Cody, I think that I misused the term 'data locality'. I should rather call it "node affinity" instead, as this is what I would like to have: for as long as an executor is available, I would like to have the same Kafka partition processed by the same node in order to take advantage of

Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Cody Koeninger
What I'm saying is that it's not a given with Spark, even in receiver-based mode, because as soon as you lose an executor you'll have a rebalance. Spark's model in general isn't a good fit for pinning work to specific nodes. If you really want to try and fake this, you can override
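
Cody's suggestion presumably refers to overriding getPreferredLocations. A hypothetical sketch of faking node pinning that way; PinnedRDD and hostFor are invented names, and the preferred location remains only a hint:

    import scala.reflect.ClassTag
    import org.apache.spark.{Partition, TaskContext}
    import org.apache.spark.rdd.RDD

    // Hypothetical wrapper: every partition advertises a fixed host as its
    // preferred location. hostFor maps a partition index to a hostname.
    class PinnedRDD[T: ClassTag](prev: RDD[T], hostFor: Int => String)
      extends RDD[T](prev) {

      override def getPartitions: Array[Partition] = prev.partitions

      override def compute(split: Partition, context: TaskContext): Iterator[T] =
        prev.iterator(split, context)

      // Only a scheduling hint: if the executor on that host is lost, the
      // task runs elsewhere once the locality wait expires.
      override def getPreferredLocations(split: Partition): Seq[String] =
        Seq(hostFor(split.index))
    }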

Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Saisai Shao
You could check the code of KafkaRDD: the locality (host) is obtained from the Kafka partition and set in KafkaRDD, and this will be a hint for Spark to schedule the task on the preferred location. override def getPreferredLocations(thePart: Partition): Seq[String] = { val part =
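
Completed from Spark 1.x's spark-streaming-kafka sources, the method Saisai quotes reads roughly as follows:

    // Each KafkaRDDPartition carries the host of its Kafka leader, captured
    // when the RDD was defined; that host becomes the scheduling hint.
    override def getPreferredLocations(thePart: Partition): Seq[String] = {
      val part = thePart.asInstanceOf[KafkaRDDPartition]
      Seq(part.host)
    }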

Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Rishitesh Mishra
Hi Gerard, I am also trying to understand the same issue. From whatever code I have seen, it looks like once the Kafka RDD is constructed, the execution of that RDD is up to the task scheduler, and it can schedule the partitions based on the load on the nodes. There is a preferred node specified in the Kafka RDD. But

Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Saisai Shao
This preferred locality is a hint for Spark to schedule Kafka tasks on the preferred nodes. If Kafka and Spark are two separate clusters, this locality hint obviously takes no effect, and Spark will schedule tasks following the node-local -> rack-local -> any pattern, like any other Spark tasks.
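
For completeness, a minimal sketch of creating the direct stream with the Spark 1.x API; the broker list and topic are placeholders. Each resulting Spark partition maps 1:1 to a Kafka topic/partition and prefers its leader's host, which only helps when executors actually run on the broker machines:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val ssc = new StreamingContext(
      new SparkConf().setAppName("direct-demo"), Seconds(10))
    // One Spark partition per Kafka topic/partition; the preferred location
    // is the partition leader's host, honored only if an executor runs there.
    val stream = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder](
      ssc,
      Map("metadata.broker.list" -> "broker1:9092,broker2:9092"), // placeholders
      Set("my-topic"))                                            // placeholder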

Re: Node affinity for Kafka-Direct Stream

2015-10-14 Thread Gerard Maas
Thanks Saisai, Mishra. Indeed, that hint will only work in a case where the Spark executor is co-located with the Kafka broker. I think the answer to my question as stated is that there's no guarantee of where the task will execute, as it will depend on the scheduler and cluster resources