RE: Node afinity for Kafka-Direct Stream

2015-10-14 Thread prajod.vettiyattil
Hi,

Another point is that in the receiver-based approach, all the data from Kafka 
first goes to the worker where the receiver runs:
https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md

Also, if you create one stream (which is the normal case) and you have many 
worker instances, only one worker does all the reading. Once that worker has 
read the data, it can be “repartitioned” to distribute the load. This 
repartitioning is a data movement overhead in the receiver-based approach.
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
{
In Receiver approach:
Multiple Kafka input DStreams can be created with different groups and topics 
for parallel receiving of data using multiple receivers.


In Direct approach:
Simplified Parallelism: No need to create multiple input Kafka streams and 
union them.
}

Prajod


Re: Node afinity for Kafka-Direct Stream

2015-10-14 Thread Cody Koeninger
Assumptions about locality in spark are not very reliable, regardless of
what consumer you use.  Even if you have locality preferences, and locality
wait turned up really high, you still have to account for losing executors.



Re: Node afinity for Kafka-Direct Stream

2015-10-14 Thread Gerard Maas
Hi Cody,

I think that I misused the term 'data locality'. It would be better to
call it "node affinity" instead, as this is what I would like to have:
for as long as an executor is available, I would like to have the same
Kafka partition processed by the same node in order to take advantage of
local in-memory structures.

In the receiver-based mode this was a given. Any ideas how to achieve that
with the direct stream approach?

-greetz, Gerard.




Re: Node afinity for Kafka-Direct Stream

2015-10-14 Thread Cody Koeninger
What I'm saying is that it's not a given with spark, even in receiver-based
mode, because as soon as you lose an executor you'll have a rebalance.

Spark's model in general isn't a good fit for pinning work to specific
nodes.

If you really want to try and fake this, you can override
getPreferredLocations and set spark.locality.wait to a high value.
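
As a rough sketch of the first half of that suggestion (not code from this thread): an override of getPreferredLocations needs some stable rule that maps each Kafka partition to one executor host. The object name, the helper names, and the host list below are hypothetical; only the return shape, Seq[String], follows RDD.getPreferredLocations. The result is still just a scheduling hint, and it changes whenever the set of hosts changes.

```scala
object NodeAffinity {
  // Hypothetical helper: choose one host per Kafka partition, deterministically.
  // Hosts are sorted first so the same set of hosts always yields the same mapping,
  // regardless of the order in which they were discovered.
  def preferredHost(partition: Int, hosts: Seq[String]): Option[String] = {
    val sorted = hosts.sorted.toIndexedSeq
    if (sorted.isEmpty) None
    else Some(sorted(Math.floorMod(partition, sorted.size)))
  }

  // Same shape as the value returned by RDD.getPreferredLocations(split): Seq[String].
  // An empty Seq means "no preference".
  def preferredLocations(partition: Int, hosts: Seq[String]): Seq[String] =
    preferredHost(partition, hosts).toSeq
}
```

A custom RDD could return NodeAffinity.preferredLocations(partitionId, executorHosts) from its override. How long the scheduler waits for a slot on that host before falling back is governed by spark.locality.wait, which this sketch does not set.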





Re: Node afinity for Kafka-Direct Stream

2015-10-14 Thread Saisai Shao
You could check the code of KafkaRDD: the locality (host) is obtained from
Kafka's partition and set in KafkaRDD. This will be a hint for Spark to
schedule the task on the preferred location.

override def getPreferredLocations(thePart: Partition): Seq[String] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  // TODO is additional hostname resolution necessary here
  Seq(part.host)
}


On Wed, Oct 14, 2015 at 5:38 PM, Gerard Maas  wrote:

> In the receiver-based kafka streaming model, given that each receiver
> starts as a long-running task, one can rely on a certain degree of data
> locality based on the kafka partitioning:  Data published on a given
> topic/partition will land on the same spark streaming receiving node until
> the receiver dies and needs to be restarted somewhere else.
>
> As I understand, the direct-kafka streaming model just computes offsets
> and relays the work to a KafkaRDD. How is the execution locality compared
> to the receiver-based approach?
>
> thanks, Gerard.
>


Re: Node afinity for Kafka-Direct Stream

2015-10-14 Thread Rishitesh Mishra
Hi Gerard,
I am also trying to understand the same issue. From the code I have seen, it
looks like once the Kafka RDD is constructed, the execution of that RDD is up
to the task scheduler, and it can schedule the partitions based on the load on
the nodes. There is a preferred node specified in the Kafka RDD, but AFAIK it
maps to the Kafka partition's host. So if Kafka and Spark are co-hosted, this
will probably work. If not, I am not sure how to get data locality for a
partition.
Others,
correct me if there is a way.




-- 

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra


Re: Node afinity for Kafka-Direct Stream

2015-10-14 Thread Saisai Shao
This preferred locality is a hint to Spark to schedule Kafka tasks on the
preferred nodes. If Kafka and Spark are two separate clusters, this locality
hint obviously has no effect, and Spark will schedule tasks following the
node-local -> rack-local -> any pattern, like any other Spark tasks.
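
To illustrate the fallback order, here is a simplified model, not Spark's scheduler code. It classifies how "local" a candidate executor host is for a task with given preferred hosts. The names and the rack lookup function are hypothetical, and Spark's other locality levels (process-local, no-preference) are left out.

```scala
object LocalityModel {
  sealed trait Level
  case object NodeLocal extends Level
  case object RackLocal extends Level
  case object AnyHost extends Level

  // rackOf is a hypothetical host -> rack lookup; None means the rack is unknown.
  def classify(executorHost: String,
               preferred: Seq[String],
               rackOf: String => Option[String]): Level = {
    if (preferred.contains(executorHost)) NodeLocal
    else {
      val execRack = rackOf(executorHost)
      // Rack-local only if the executor's rack is known and matches a preferred host's rack.
      val sameRack = execRack.isDefined && preferred.exists(h => rackOf(h) == execRack)
      if (sameRack) RackLocal else AnyHost
    }
  }
}
```

When the Kafka brokers and the Spark executors share no hosts, no executor can ever classify as NodeLocal for a Kafka partition, so the hint can at best influence rack-level placement.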



Re: Node afinity for Kafka-Direct Stream

2015-10-14 Thread Gerard Maas
Thanks Saisai, Mishra,

Indeed, that hint will only work in the case where the Spark executor is
co-located with the Kafka broker.
I think the answer to my question as stated is that there's no guarantee of
where the task will execute, as it will depend on the scheduler and the
cluster resources available (Mesos in our case).
Therefore, any assumptions made about data locality using the
consumer-based approach need to be reconsidered when migrating to the
direct stream.

((In our case, we were using local caches to decide when a given secondary
index for a record should be produced and written.))

-kr, Gerard.



