Re: Spark Streaming: Custom Receiver OOM consistently

2017-05-23 Thread Manish Malhotra
Thanks!

On Mon, May 22, 2017 at 5:58 PM kant kodali <kanth...@gmail.com> wrote:

> Well there are few things here.
>
> 1. What is the Spark Version?
>
CDH (Spark 1.6).

> 2. You said there is an OOM error, but what is the cause that appears in the
> log message or stack trace? OOM can happen for various reasons, and the JVM
> usually specifies the cause in the error message.
>
The GC heap limit was reached. I will send some logs as well.

>
> 3. What is the driver and executor memory?
>
Driver: 4 GB
Executor: 40 GB

> 4. What is the receive throughput per second and what is the data size of
> an average message?
>
Message size: 2 KB.
About 1 message/sec per receiver, running 2 receivers.

> 5. What OS are you using?
>

Red Hat Linux.

StorageLevel.MEMORY_AND_DISK_SER_2 means that after the receiver
> receives the data, it is replicated across worker nodes.
>
Yes, but after a batch finishes (or after a few batches), shouldn't the receiver and worker
nodes discard the old data?
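
For reference, a minimal sketch (Spark 1.x-era settings, illustrative values only) of the knobs that govern how fast a receiver ingests and whether Spark Streaming automatically unpersists old received blocks once their batches complete; as far as I know the automatic cleanup defaults to on:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: values are illustrative, not tuned recommendations.
val conf = new SparkConf()
  .setAppName("custom-receiver-job")
  // Drop streaming-generated blocks/RDDs automatically once no batch needs them.
  .set("spark.streaming.unpersist", "true")
  // Cap how many records/sec each receiver ingests.
  .set("spark.streaming.receiver.maxRate", "100")
  // Adapt the ingestion rate to the observed processing rate (Spark 1.5+).
  .set("spark.streaming.backpressure.enabled", "true")

// Batch interval is illustrative; use whatever the job actually runs with.
val ssc = new StreamingContext(conf, Seconds(60))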


>
>
>
> On Mon, May 22, 2017 at 5:20 PM, Manish Malhotra <
> manish.malhotra.w...@gmail.com> wrote:
>
>> Thanks Alonso.
>>
>> Sorry, but there are some security reservations, so I can't share the code.
>>
>> But you can assume the receiver is equivalent to a JMS-based custom
>> receiver: we register a message listener, and each message delivered by
>> JMS is stored by calling the receiver's store() method.
>>
>>
>> Something like :
>> https://github.com/tbfenet/spark-jms-receiver/blob/master/src/main/scala/org/apache/spark/streaming/jms/JmsReceiver.scala
>>
>> The difference is that this JMS receiver uses a block generator and then
>> calls store, whereas I call store directly when I receive a message and
>> don't use a block generator.
>> Not sure whether that is something that would make the memory balloon up.
>>
>> Btw, I have also run the same message consumer code as a standalone app and
>> have never seen this memory issue.
>>
>> On Sun, May 21, 2017 at 10:20 AM, Alonso Isidoro Roman <
>> alons...@gmail.com> wrote:
>>
>>> could you share the code?
>>>
>>> Alonso Isidoro Roman
>>> https://about.me/alonso.isidoro.roman
>>>
>>>
>>> 2017-05-20 7:54 GMT+02:00 Manish Malhotra <
>>> manish.malhotra.w...@gmail.com>:
>>>
>>>> Hello,
>>>>
>>>> I have implemented a Java-based custom receiver, which consumes from a
>>>> messaging system, say JMS.
>>>> Once a message is received, I call store(object) ... I'm storing a Spark
>>>> Row object.
>>>>
>>>> It runs for around 8 hrs and then goes OOM, and the OOM is happening on
>>>> the receiver nodes.
>>>> I also tried running multiple receivers to distribute the load, but
>>>> faced the same issue.
>>>>
>>>> We must be doing something fundamentally wrong in how we tell the custom
>>>> receiver/Spark to release the memory,
>>>> but I'm not able to crack it, at least till now.
>>>>
>>>> Any help is appreciated!
>>>>
>>>> Regards,
>>>> Manish
>>>>
>>>>
>>>
>>
>


Re: Spark Streaming: Custom Receiver OOM consistently

2017-05-22 Thread Manish Malhotra
Thanks Alonso.

Sorry, but there are some security reservations, so I can't share the code.

But you can assume the receiver is equivalent to a JMS-based custom
receiver: we register a message listener, and each message delivered by
JMS is stored by calling the receiver's store() method.


Something like:
https://github.com/tbfenet/spark-jms-receiver/blob/master/src/main/scala/org/apache/spark/streaming/jms/JmsReceiver.scala

The difference is that this JMS receiver uses a block generator and then
calls store, whereas I call store directly when I receive a message and
don't use a block generator.
Not sure whether that is something that would make the memory balloon up.

Btw, I have also run the same message consumer code as a standalone app and
have never seen this memory issue.
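
To make the shape of the receiver concrete, here is a minimal sketch of the pattern described above: store() is called once per delivered message and no block generator is used. MessagingClient and MessageListener are placeholder stand-ins for our internal JMS-like client, not a real library API:

import org.apache.spark.sql.Row
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical stand-ins for the internal JMS-like client (not a real library API).
trait MessageListener { def onMessage(payload: Array[Byte]): Unit }
trait MessagingClient {
  def setListener(listener: MessageListener): Unit
  def start(): Unit
  def close(): Unit
}

// One store() call per delivered message, no BlockGenerator,
// replicated and spillable storage level.
class PerMessageReceiver(newClient: () => MessagingClient)
  extends Receiver[Row](StorageLevel.MEMORY_AND_DISK_SER_2) {

  @transient private var client: MessagingClient = _

  override def onStart(): Unit = {
    client = newClient()
    client.setListener(new MessageListener {
      override def onMessage(payload: Array[Byte]): Unit =
        store(Row(new String(payload, "UTF-8"))) // each message stored as a Spark Row
    })
    client.start()
  }

  override def onStop(): Unit = if (client != null) client.close()
}

The stream itself would then be created with ssc.receiverStream(new PerMessageReceiver(...)).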

On Sun, May 21, 2017 at 10:20 AM, Alonso Isidoro Roman <alons...@gmail.com>
wrote:

> could you share the code?
>
> Alonso Isidoro Roman
> https://about.me/alonso.isidoro.roman
>
>
> 2017-05-20 7:54 GMT+02:00 Manish Malhotra <manish.malhotra.w...@gmail.com>
> :
>
>> Hello,
>>
>> I have implemented a Java-based custom receiver, which consumes from a
>> messaging system, say JMS.
>> Once a message is received, I call store(object) ... I'm storing a Spark
>> Row object.
>>
>> It runs for around 8 hrs and then goes OOM, and the OOM is happening on
>> the receiver nodes.
>> I also tried running multiple receivers to distribute the load, but faced
>> the same issue.
>>
>> We must be doing something fundamentally wrong in how we tell the custom
>> receiver/Spark to release the memory,
>> but I'm not able to crack it, at least till now.
>>
>> Any help is appreciated!
>>
>> Regards,
>> Manish
>>
>>
>


Spark Streaming: Custom Receiver OOM consistently

2017-05-19 Thread Manish Malhotra
Hello,

I have implemented a Java-based custom receiver, which consumes from a messaging
system, say JMS.
Once a message is received, I call store(object) ... I'm storing a Spark Row object.

It runs for around 8 hrs and then goes OOM, and the OOM is happening on the
receiver nodes.
I also tried running multiple receivers to distribute the load, but faced
the same issue.

We must be doing something fundamentally wrong in how we tell the custom
receiver/Spark to release the memory,
but I'm not able to crack it, at least till now.

Any help is appreciated!

Regards,
Manish


Re: [Spark Streaming] Streaming job failing consistently after 1h

2017-05-19 Thread Manish Malhotra
I'm also facing the same problem.

I have implemented a Java-based custom receiver, which consumes from a
messaging system, say JMS.
Once a message is received, I call store(object) ... I'm storing a Spark Row object.

It runs for around 8 hrs and then goes OOM, and the OOM is happening on the
receiver nodes.
I also tried running multiple receivers to distribute the load, but faced
the same issue.

We must be doing something fundamentally wrong in how we tell the custom
receiver/Spark to release the memory,
but I'm not able to crack it, at least till now.

Any help is appreciated, Spark group!

Regards,
Manish



On Sun, Mar 5, 2017 at 6:37 PM, Charles O. Bajomo <
charles.baj...@pretechconsulting.co.uk> wrote:

> Hello all,
>
> I have a strange behaviour I can't understand. I have a streaming job
> using a custom Java receiver that pulls data from a JMS queue, which I process
> and then write to HDFS as Parquet and Avro files. For some reason my job
> keeps failing after 1 hour and 30 minutes. When it fails I get an error saying
> the "container is running beyond physical memory limits. Current Usage
> 4.5GB of 4.5GB physical memory used. 6.4GB of 9.4GB virtual memory used.".
> To be honest I don't understand the error. What are the memory limits
> shown in the error referring to? I allocated 10 executors with 6 cores each
> and 4G of executor and driver memory. I set the overhead memory to 2.8G, so
> the values don't add up.
>
> Does anyone have any idea what the error is referring to? I have increased the
> memory and it didn't help; it appears it just bought me more time.
>
> Thanks.
>
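
Not an answer, but a rough sketch of how that YARN container limit is typically composed for Spark 1.x on YARN (property names from that era; whether the overhead value was actually applied would need to be checked against the container size YARN reports):

import org.apache.spark.SparkConf

// Sketch of the per-executor memory that YARN enforces (Spark 1.x on YARN):
//   container limit ~= spark.executor.memory + spark.yarn.executor.memoryOverhead
// The default overhead is max(384 MB, 10% of executor memory), i.e. about 410 MB
// for a 4 GB executor, which is close to the 4.5 GB limit in the error above;
// so the 2.8 GB overhead setting may not have been picked up by this job.
val conf = new SparkConf()
  .set("spark.executor.memory", "4g")
  .set("spark.yarn.executor.memoryOverhead", "2867") // in MB, roughly 2.8 GB
  .set("spark.driver.memory", "4g")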


Re: RDD getPartitions() size and HashPartitioner numPartitions

2016-12-04 Thread Manish Malhotra
It's a pretty nice question!
I'll try to understand the problem and see if I can help further.

When you say CustomRDD, I believe you will be using it in the transformation
stage, once the data is read from a source like HDFS, Cassandra, or Kafka.

Now, RDD.getPartitions() should return the partitions the RDD has, and in the
case of HashPartitioner (which is used in functions like reduceByKey), the
partition for a key is identified roughly as

partition_num = key.hashCode % numPartitions

So when the RDD's partitions reach the nodes (reducer phase), say after
reduceByKey or groupByKey, each node can hold a few partitions based on the
keys it contains, which are mapped to those partitions.

So, I believe it is not strictly required that the numPartitions of the
HashPartitioner match the number of partitions returned by the RDD's getPartitions().
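
For illustration, a minimal sketch of a custom RDD paired with a HashPartitioner. For what it's worth, Spark's own shuffled RDDs create exactly partitioner.numPartitions partitions, so deriving getPartitions() from the partitioner is the safest way to wire the two together (the key type and the empty compute() here are placeholders):

import org.apache.spark.{HashPartitioner, Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Custom keyed RDD whose partition count is driven by its HashPartitioner.
class KeyedCustomRDD(sc: SparkContext, numParts: Int)
  extends RDD[(String, String)](sc, Nil) {

  // Keys are routed roughly as key.hashCode % numParts (made non-negative).
  override val partitioner = Some(new HashPartitioner(numParts))

  // One Partition object per partitioner slot, so the two counts match.
  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](numParts)(i => new Partition { override def index: Int = i })

  override def compute(split: Partition, context: TaskContext): Iterator[(String, String)] =
    Iterator.empty // placeholder: produce this partition's (key, value) records here
}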

Thanks,
Manish



On Fri, Dec 2, 2016 at 1:53 PM, Amit Sela  wrote:

> This might be a silly question, but I wanted to make sure: when
> implementing my own RDD, if I use a HashPartitioner as the RDD's
> partitioner, the number of partitions returned by the implementation of
> getPartitions() has to match the number of partitions set in the
> HashPartitioner, correct?
>


Re: What benefits do we really get out of colocation?

2016-12-03 Thread Manish Malhotra
Thanks for sharing the numbers as well!

Nowadays even the network can have very high throughput and might outperform
the disk, but as Sean mentioned, data over the network has other dependencies
such as network hops; for example, if traffic goes across racks, there can be
a switch in between.

But yes, people are discussing Mesos plus high-performance networking and are
not worried about colocation for various use cases.

AWS ephemeral storage is not good for a reliable storage file system; EBS is the
expensive alternative :)

On Sat, Dec 3, 2016 at 1:12 AM, kant kodali  wrote:

> Thanks Sean! Just for the record, I am currently seeing 95 MB/s RX (receive
> throughput) on my Spark worker machine when I do `sudo iftop -B`.
>
> The problem with instance store on AWS is that it is all ephemeral, so
> placing Cassandra on top doesn't make a lot of sense. So in short, AWS
> doesn't seem to be the right place for colocating, in theory. I would still
> give you the benefit of the doubt and colocate :) but the numbers just are
> not reflecting significant margins in terms of performance gains on AWS.
>
>
> On Sat, Dec 3, 2016 at 12:56 AM, Sean Owen  wrote:
>
>> I'm sure he meant that this is a downside to not colocating.
>> You are asking the right question. While networking is traditionally much
>> slower than disk, that changes a bit in the cloud, where attached storage
>> is remote too.
>> The disk throughput here is mostly achievable in normal workloads.
>> However, I think you'll find it's going to be much harder to get 1 Gbps out
>> of network transfers. That's just the speed of the local interface, and of
>> course the transfer speed depends on hops across the network beyond that.
>> Network latency is going to be higher than disk too, though that's not as
>> much of an issue in this context.
>>
>> On Sat, Dec 3, 2016 at 8:42 AM kant kodali  wrote:
>>
>>> Wait, how is that a benefit? Isn't that a bad thing, if you are saying
>>> colocating leads to more latency and overall execution time is longer?
>>>
>>> On Sat, Dec 3, 2016 at 12:34 AM, vincent gromakowski <
>>> vincent.gromakow...@gmail.com> wrote:
>>>
>>> You get more latency on reads, so overall execution time is longer.
>>>
>>> On Dec 3, 2016 at 7:39 AM, "kant kodali" wrote:
>>>
>>>
>>> I wonder what benefits I really get if I colocate my Spark worker
>>> process and Cassandra server process on each node?
>>>
>>> I understand the concept of moving compute towards the data instead of
>>> moving data towards the computation, but it sounds more like one is
>>> trying to optimize for network latency.
>>>
>>> The majority of my nodes (m4.xlarge) have 1 Gbps = 125 MB/s (megabytes per
>>> second) of network throughput,
>>>
>>> and the disk throughput for m4.xlarge is 93.75 MB/s (link below):
>>>
>>> http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/EBSOptimized.html
>>>
>>> So in this case I don't see how colocation can help, even if there is a
>>> one-to-one mapping from a Spark worker node to a colocated Cassandra node
>>> where, say, we are doing a table scan of a billion rows?
>>>
>>> Thanks!
>>>
>>>
>>>
>


Re: Spark Streaming: question on sticky session across batches ?

2016-11-15 Thread Manish Malhotra
Thanks!
On Tue, Nov 15, 2016 at 1:07 AM Takeshi Yamamuro <linguin@gmail.com>
wrote:

> - dev
>
> Hi,
>
> AFAIK, if you use RDDs only, you can control the partition mapping to some
> extent by keying the RDD as RDD[(key, data)].
> A defined partitioner then distributes data into partitions depending on
> the key.
> For a good example of controlling partitions, see the GraphX code:
>
> https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala#L291
>
> GraphX holds a `PartitionId` in its edge RDDs to control which partition
> edge data lives in.
>
> // maropu
>
>
> On Tue, Nov 15, 2016 at 5:19 AM, Manish Malhotra <
> manish.malhotra.w...@gmail.com> wrote:
>
> Sending again.
> Any help is appreciated!
>
> Thanks in advance.
>
> On Thu, Nov 10, 2016 at 8:42 AM, Manish Malhotra <
> manish.malhotra.w...@gmail.com> wrote:
>
> Hello Spark Devs/Users,
>
> I'm trying to solve a use case with Spark Streaming 1.6.2 where, for every
> batch (say 2 mins), data needs to go to the same reducer node after
> grouping by key.
> The underlying storage is Cassandra, not HDFS.
>
> This is a map-reduce job, where we are also trying to use the partitions of
> the Cassandra table to batch the data for the same partition.
>
> The requirement for a sticky session/partition across batches is that the
> operations we need to do must read the data for every key and then merge it
> with the current batch's aggregate values. Currently, since there is no
> stickiness across batches, we have to read for every key, merge, and then
> write back, and the reads are very expensive. With a sticky session, we
> could avoid the read in every batch and keep a cache of the aggregates up
> to the last batch.
>
> So, there are a few options I can think of:
>
> 1. Change the TaskSchedulerImpl, as it uses Random to identify the
> node for a mapper/reducer before starting the batch/phase.
> Not sure if there is a custom-scheduler way of achieving this?
>
> 2. A custom RDD could help map key --> node;
> there is a getPreferredLocations() method.
> But I'm not sure whether this will be stable or can vary in some edge
> cases?
>
> Thanks in advance for your help and time!
>
> Regards,
> Manish
>
>
>
>
>
> --
> ---
> Takeshi Yamamuro
>
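
To illustrate the point above about controlling the key-to-partition mapping on an RDD[(key, data)], here is a small sketch with a user-defined partitioner; the key type and routing table are arbitrary examples:

import org.apache.spark.{Partitioner, SparkContext}

// Route keys to partitions explicitly instead of relying on hashCode.
class RoutingPartitioner(routes: Map[String, Int], override val numPartitions: Int)
  extends Partitioner {
  override def getPartition(key: Any): Int =
    routes.getOrElse(key.asInstanceOf[String], 0) // unknown keys fall back to partition 0
}

// Usage sketch: pin "sensor-a" records to partition 0 and "sensor-b" to partition 1.
def example(sc: SparkContext): Unit = {
  val data = sc.parallelize(Seq("sensor-a" -> 1, "sensor-b" -> 2, "sensor-a" -> 3))
  val routed = data.partitionBy(new RoutingPartitioner(Map("sensor-a" -> 0, "sensor-b" -> 1), 2))
  routed.foreachPartition(part => println(part.toList)) // each key lands in its assigned partition
}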


Re: Spark Streaming: question on sticky session across batches ?

2016-11-14 Thread Manish Malhotra
Sending again.
Any help is appreciated!

Thanks in advance.

On Thu, Nov 10, 2016 at 8:42 AM, Manish Malhotra <
manish.malhotra.w...@gmail.com> wrote:

> Hello Spark Devs/Users,
>
> I'm trying to solve a use case with Spark Streaming 1.6.2 where, for every
> batch (say 2 mins), data needs to go to the same reducer node after
> grouping by key.
> The underlying storage is Cassandra, not HDFS.
>
> This is a map-reduce job, where we are also trying to use the partitions of
> the Cassandra table to batch the data for the same partition.
>
> The requirement for a sticky session/partition across batches is that the
> operations we need to do must read the data for every key and then merge it
> with the current batch's aggregate values. Currently, since there is no
> stickiness across batches, we have to read for every key, merge, and then
> write back, and the reads are very expensive. With a sticky session, we
> could avoid the read in every batch and keep a cache of the aggregates up
> to the last batch.
>
> So, there are a few options I can think of:
>
> 1. Change the TaskSchedulerImpl, as it uses Random to identify the
> node for a mapper/reducer before starting the batch/phase.
> Not sure if there is a custom-scheduler way of achieving this?
>
> 2. A custom RDD could help map key --> node;
> there is a getPreferredLocations() method.
> But I'm not sure whether this will be stable or can vary in some edge
> cases?
>
> Thanks in advance for your help and time!
>
> Regards,
> Manish
>


Spark Streaming: question on sticky session across batches ?

2016-11-10 Thread Manish Malhotra
Hello Spark Devs/Users,

I'm trying to solve a use case with Spark Streaming 1.6.2 where, for every
batch (say 2 mins), data needs to go to the same reducer node after
grouping by key.
The underlying storage is Cassandra, not HDFS.

This is a map-reduce job, where we are also trying to use the partitions of the
Cassandra table to batch the data for the same partition.

The requirement for a sticky session/partition across batches is that the
operations we need to do must read the data for every key and then merge it
with the current batch's aggregate values. Currently, since there is no
stickiness across batches, we have to read for every key, merge, and then
write back, and the reads are very expensive. With a sticky session, we
could avoid the read in every batch and keep a cache of the aggregates up
to the last batch.

So, there are a few options I can think of:

1. Change the TaskSchedulerImpl, as it uses Random to identify the
node for a mapper/reducer before starting the batch/phase.
Not sure if there is a custom-scheduler way of achieving this?

2. A custom RDD could help map key --> node;
there is a getPreferredLocations() method (see the sketch right after this list).
But I'm not sure whether this will be stable or can vary in some edge
cases?
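
A sketch of option 2 follows: a custom RDD can pin each partition to a host via getPreferredLocations(). The host names are illustrative, and preferred locations are scheduling hints rather than guarantees, which is exactly the stability question above:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Each partition remembers the host it would prefer to run on.
case class PinnedPartition(index: Int, host: String) extends Partition

// Sketch of option 2: pin partitions (and therefore key ranges) to hosts.
class PinnedRDD(sc: SparkContext, hosts: Seq[String])
  extends RDD[(String, String)](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](hosts.length)(i => PinnedPartition(i, hosts(i)))

  // Scheduling hint only: with locality wait Spark usually honors it,
  // but a task can still run elsewhere if the host is busy or lost.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[PinnedPartition].host)

  override def compute(split: Partition, context: TaskContext): Iterator[(String, String)] =
    Iterator.empty // placeholder: read/merge the per-key state owned by this partition
}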

Thanks in advance for your help and time!

Regards,
Manish