What I'm saying is that it's not a given with Spark, even in receiver-based mode, because as soon as you lose an executor you'll have a rebalance.
Spark's model in general isn't a good fit for pinning work to specific nodes. If you really want to try and fake this, you can override getPreferredLocations and set spark.locality.wait to a high value (rough sketches follow after the quoted thread below).

On Wed, Oct 14, 2015 at 2:45 PM, Gerard Maas <gerard.m...@gmail.com> wrote:

> Hi Cody,
>
> I think that I misused the term 'data locality'. I should rather call it
> "node affinity", as that is what I would like to have: for as long as an
> executor is available, I would like the same Kafka partition to be
> processed by the same node, in order to take advantage of local in-memory
> structures.
>
> In the receiver-based mode this was a given. Any ideas how to achieve that
> with the direct stream approach?
>
> -greetz, Gerard.
>
> On Wed, Oct 14, 2015 at 4:31 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Assumptions about locality in Spark are not very reliable, regardless of
>> which consumer you use. Even if you have locality preferences, and the
>> locality wait turned up really high, you still have to account for losing
>> executors.
>>
>> On Wed, Oct 14, 2015 at 8:23 AM, Gerard Maas <gerard.m...@gmail.com> wrote:
>>
>>> Thanks Saisai, Mishra,
>>>
>>> Indeed, that hint will only work in the case where the Spark executor is
>>> co-located with the Kafka broker.
>>> I think the answer to my question as stated is that there's no guarantee
>>> of where the task will execute, as it will depend on the scheduler and
>>> the cluster resources available (Mesos in our case).
>>> Therefore, any assumptions made about data locality using the
>>> receiver-based approach need to be reconsidered when migrating to the
>>> direct stream.
>>>
>>> ((In our case, we were using local caches to decide when a given
>>> secondary index for a record should be produced and written.))
>>>
>>> -kr, Gerard.
>>>
>>> On Wed, Oct 14, 2015 at 2:58 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>>>
>>>> This preferred locality is a hint to Spark to schedule Kafka tasks on
>>>> the preferred nodes. If Kafka and Spark are two separate clusters, this
>>>> locality hint obviously takes no effect, and Spark will schedule the
>>>> tasks following the node-local -> rack-local -> any pattern, like any
>>>> other Spark tasks.
>>>>
>>>> On Wed, Oct 14, 2015 at 8:10 PM, Rishitesh Mishra <rmis...@snappydata.io> wrote:
>>>>
>>>>> Hi Gerard,
>>>>> I am also trying to understand the same issue. From the code I have
>>>>> seen, once the Kafka RDD is constructed, the execution of that RDD is
>>>>> up to the task scheduler, which can schedule the partitions based on
>>>>> the load on the nodes. There is a preferred node specified in the
>>>>> Kafka RDD, but AFAIK it maps to the host of the Kafka partition. So if
>>>>> Kafka and Spark are co-hosted, this will probably work. If not, I am
>>>>> not sure how to get data locality for a partition.
>>>>> Others, correct me if there is a way.
>>>>>
>>>>> On Wed, Oct 14, 2015 at 3:08 PM, Gerard Maas <gerard.m...@gmail.com> wrote:
>>>>>
>>>>>> In the receiver-based Kafka streaming model, given that each receiver
>>>>>> starts as a long-running task, one can rely on a certain degree of
>>>>>> data locality based on the Kafka partitioning: data published on a
>>>>>> given topic/partition will land on the same Spark Streaming receiving
>>>>>> node until the receiver dies and needs to be restarted somewhere else.
>>>>>>
>>>>>> As I understand it, the direct-kafka streaming model just computes
>>>>>> offsets and relays the work to a KafkaRDD. How is the execution
>>>>>> locality compared to the receiver-based approach?
>>>>>>
>>>>>> thanks, Gerard.
>>>>>
>>>>> --
>>>>> Regards,
>>>>> Rishitesh Mishra,
>>>>> SnappyData (http://www.snappydata.io/)
>>>>> https://in.linkedin.com/in/rishiteshmishra
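
For reference, a minimal sketch of how the direct stream discussed above is created with the Spark 1.x spark-streaming-kafka API. Broker addresses, topic name, app name and batch interval are placeholders, not taken from the thread:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setAppName("direct-kafka-locality-example")
    val ssc  = new StreamingContext(conf, Seconds(10))

    // No receivers: each batch interval the driver computes an offset range per
    // Kafka partition and hands it to a KafkaRDD, one RDD partition per Kafka partition.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")  // placeholder brokers
    val topics      = Set("events")                                               // placeholder topic

    val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)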
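
And a rough, untested sketch of the workaround Cody describes: wrap each batch's RDD in a subclass that overrides getPreferredLocations, and raise spark.locality.wait so the scheduler waits for the preferred executor before running the task elsewhere. NodeAffinityRDD, executorHosts and the worker hostnames are made-up names for illustration; the host strings must match what the scheduler reports for your executors, and as noted above, none of this pins work once an executor is lost:

    import scala.reflect.ClassTag
    import org.apache.spark.{Partition, TaskContext}
    import org.apache.spark.rdd.RDD

    // Hypothetical wrapper: delegates partitions and compute() to the parent RDD,
    // but reports a fixed host per partition index as the preferred location.
    class NodeAffinityRDD[T: ClassTag](
        parent: RDD[T],
        hostsFor: Int => Seq[String]) extends RDD[T](parent) {

      override protected def getPartitions: Array[Partition] = parent.partitions

      override def compute(split: Partition, context: TaskContext): Iterator[T] =
        parent.iterator(split, context)

      override protected def getPreferredLocations(split: Partition): Seq[String] =
        hostsFor(split.index)
    }

    // Assumed list of executor hostnames (must match what the scheduler sees).
    val executorHosts = Seq("worker-1", "worker-2", "worker-3")

    // Pin partition i of every batch's KafkaRDD to the same host. directStream is
    // the stream from the previous sketch. For the preference to matter,
    // spark.locality.wait must be raised well above its default (e.g. "30s"),
    // set on the SparkConf before the context is created or via spark-submit --conf.
    val pinned = directStream.transform { rdd =>
      new NodeAffinityRDD(rdd, i => Seq(executorHosts(i % executorHosts.size)))
    }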