Using Spark for tasks under high concurrent load

2015-12-28 Thread Aliaksei Tsyvunchyk
Hello Spark community,

We have a project where we want to use Spark as the computation engine to perform 
calculations and return results via REST services.
Working with Spark we have learned how to make it run faster and finally optimized 
our code to produce results in acceptable time (1-2 seconds). But when we tested it 
under concurrent load we realized that response time grows significantly as the 
number of concurrent requests increases. This was expected, and we are trying to 
find a way to scale our solution to keep acceptable response times under concurrent 
load. Here we ran into the fact that adding more slave servers does not improve the 
average timing when several requests run concurrently.

For now I am observing the following behavior: hitting our test REST service with 
100 threads against 1 master and 1 slave node, the average timing for those 100 
requests is 30.6 seconds/request; when we add 2 slave nodes the average becomes 
29.8 seconds/request, which is essentially the same as with one slave node. While 
running these tests we monitor server load with htop, and the odd thing is that in 
the first case the slave node’s CPUs were loaded at 90-95%, while in the second 
case, with 2 slaves, CPU load is only 45-50%.
We are trying to find the bottleneck in our solution but have not succeeded yet. 
We have checked all hardware for possible bottlenecks (network IO, disk IO, RAM, 
CPU), but none of them seems to be anywhere near its limit.

Our main suspect at the moment is the Spark configuration. Our application is a 
self-contained Spark application; Spark runs in standalone mode (without an 
external resource manager). We are not submitting a jar to Spark with the shell 
script; instead we create the Spark context in our Spring Boot application and it 
connects to the Spark master and slaves by itself. All requests to Spark go through 
an internal thread pool. We have experimented with thread pool sizes and found that 
the best performance appears with 16 threads in the pool, where each thread 
performs one or several manipulations with RDDs and can itself be considered a 
Spark job.
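
To make the setup concrete, here is a minimal sketch of how the embedded context 
and thread pool are wired together (class name, master URL, input path and the 
submitted work are illustrative assumptions, not our actual code):

—— Java ——
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class EmbeddedSparkService {
    public static void main(String[] args) throws Exception {
        // One long-lived context shared by all request threads; it connects to the
        // standalone master directly, no spark-submit involved (hypothetical URL).
        SparkConf conf = new SparkConf()
                .setAppName("rest-calculation-service")
                .setMaster("spark://spark-master:7077");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Internal pool that all REST requests go through; each submitted task runs
        // one or several RDD manipulations and is effectively one Spark job.
        ExecutorService pool = Executors.newFixedThreadPool(16);

        // Hypothetical request: count non-empty lines of some input.
        Future<Long> result = pool.submit(() ->
                sc.textFile("hdfs:///data/sample.txt")
                  .filter(line -> !line.isEmpty())
                  .count());

        System.out.println("result = " + result.get());
        pool.shutdown();
        sc.stop();
    }
}
—— Java ——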

For now I assume we either misconfigured Spark due to our lack of experience with 
it, or our use case is not really a use case for Spark and it is simply not 
designed for heavy parallel load. I draw this conclusion because getting no 
performance gain after adding new nodes does not make any sense to me. So any help 
and ideas would be really appreciated.


Hardware details:
Azure D5v2 instances for the master and 3 slaves. D5v2 features a 2.4 GHz 
E5-2673 v3 (16 cores), 56 GB RAM and SSD storage. Using iperf we have measured 
network speed at around 1 Gb/s.






DataFrame.toJavaRDD causes data to be fetched to the driver, is it expected?

2015-11-04 Thread Aliaksei Tsyvunchyk
Hello folks,

Recently I have noticed unexpectedly high network traffic between the driver 
program and the worker node.
While debugging I figured out that it is caused by the following block of 
code:

—— Java ——
// DataFrame produced by SparkSQL (lazy until an action runs)
DataFrame etpvRecords = context.sql(" SOME SQL query here");
Mapper m = new Mapper(localValue, ProgramId::toProgId);
// convert to JavaRDD and apply the custom map/reduce; reduce() is an action
return etpvRecords
        .toJavaRDD()
        .map(m::mapHutPutViewingRow)
        .reduce(Reducer::reduce);
—— Java ——

I am using a debug breakpoint and OS X nettop to monitor traffic between the 
processes. Before reaching the toJavaRDD() line I have 500 KB of traffic, and 
after executing this line I have 2.2 MB of traffic, yet the size of the reduce 
result is only 10 KB.
So .toJavaRDD() seems to cause the worker process to return the whole dataset to 
the driver process, and the subsequent map/reduce seems to happen on the driver.

This is definitely not what I expected, so I have 2 questions:
1.  Is it really expected behavior that DataFrame.toJavaRDD causes the whole 
dataset to be returned to the driver, or am I doing something wrong?
2.  What is the expected way to perform transformations on a DataFrame using 
custom Java map/reduce functions when the standard SQL features do not cover all 
my needs?
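
To make question 2 concrete, this is the kind of thing I mean on the map side, 
sketched as a SparkSQL UDF (the function, column and table names and the types are 
just placeholders for illustration):

—— Java ——
// Assumed imports: org.apache.spark.sql.api.java.UDF1, org.apache.spark.sql.types.DataTypes
// Registering a custom Java function as a UDF so the transformation stays inside
// SparkSQL; toProgId's String -> Integer signature is an assumption.
context.udf().register("toProgId",
        (UDF1<String, Integer>) ProgramId::toProgId,
        DataTypes.IntegerType);

etpvRecords.registerTempTable("etpv");
DataFrame mapped = context.sql(
        "SELECT toProgId(programCode) AS progId FROM etpv");
—— Java ——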

Env: Spark 1.5.1 in standalone mode (1 master and 1 worker sharing the same 
machine), Java 1.8.0_60.


Re: DataFrame.toJavaRDD causes data to be fetched to the driver, is it expected?

2015-11-04 Thread Aliaksei Tsyvunchyk
Hello Romi,

Do you mean that in my particular case I am causing a computation on the 
DataFrame, or is it the regular behavior of DataFrame.toJavaRDD?
If it is the regular behavior, do you know which approach could be used to perform 
map/reduce on a DataFrame without causing it to load all data into the driver program?

> On Nov 4, 2015, at 12:34 PM, Romi Kuntsman <r...@totango.com> wrote:
> 
> I noticed that toJavaRDD causes a computation on the DataFrame, so is it 
> considered an action, even though logically it's a transformation?
> 




Re: DataFrame.toJavaRDD causes data to be fetched to the driver, is it expected?

2015-11-04 Thread Aliaksei Tsyvunchyk
Hi Romi,

Thanks for the pointer. I am quite new to Spark and not sure how checking the 
number of partitions in the DF and RDD will help, so if you can give me some 
explanation it would be really helpful. A link to documentation would also help.
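
For reference, this is how I would read the partition counts, reusing the variable 
names from my earlier snippet (a minimal sketch against the Spark 1.5 Java API):

—— Java ——
// Assumed imports: org.apache.spark.sql.DataFrame, org.apache.spark.sql.Row,
// org.apache.spark.api.java.JavaRDD
DataFrame etpvRecords = context.sql(" SOME SQL query here");
JavaRDD<Row> rows = etpvRecords.toJavaRDD();
System.out.println("partitions: " + rows.partitions().size());
—— Java ——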

> On Nov 4, 2015, at 1:05 PM, Romi Kuntsman <r...@totango.com> wrote:
> 
> In my program I move between RDD and DataFrame several times.
> I know that the entire data of the DF doesn't go into the driver because it 
> wouldn't fit there.
> But calling toJavaRDD does cause computation.
> 
> Check the number of partitions you have on the DF and RDD...
> 



Whether Spark is appropriate for our use case.

2015-10-20 Thread Aliaksei Tsyvunchyk
Hello all community members,

I need the opinion of people who have used Spark before and can share their 
experience, to help me select a technical approach.
I have a project in the proof-of-concept phase, where we are evaluating the 
possibility of using Spark for our use case.
Here is a brief task description.
We need to process a large amount of raw data to calculate ratings. We have 
different types of textual source data: plain text lines which represent different 
types of input records (we call them type 20, type 24, type 26, type 33, etc.).
To perform the calculations we need to join different types of raw data: event 
records (which represent actual user actions) and user description records (which 
represent the person who performs the action), and sometimes userGroup records (we 
group all users by some criteria).
All ratings are calculated on a daily basis, and our dataset could be partitioned 
by date (except probably the reference data).


So we have tried to implement it in probably the most obvious way: we parse the 
text files, store the data in Parquet format and use SparkSQL to query the data 
and perform the calculations.
Experimenting with SparkSQL I have noticed that SQL query speed decreases 
proportionally to data size growth. Based on this I assume that SparkSQL performs 
a full record scan while servicing my SQL queries.
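
To make the current approach concrete, here is roughly what the pipeline looks 
like, sketched against the Spark 1.5 Java API (the record class, paths, schema and 
the query are simplified placeholders, and the date partitioning follows the 
partitioning-by-date idea mentioned above):

—— Java ——
// Assumed imports: org.apache.spark.api.java.*, org.apache.spark.sql.*
// EventRecord is a hypothetical JavaBean with a static parse(String) factory.
SQLContext sqlContext = new SQLContext(sc);   // sc is an existing JavaSparkContext

// 1. Parse the raw text lines into typed records.
JavaRDD<EventRecord> events = sc.textFile("hdfs:///raw/type_20/")
        .map(EventRecord::parse);

// 2. Store them as Parquet, partitioned by date.
DataFrame eventsDf = sqlContext.createDataFrame(events, EventRecord.class);
eventsDf.write().partitionBy("date").parquet("hdfs:///warehouse/events");

// 3. Query with SparkSQL; filtering on the partition column should let Spark
//    prune whole date directories instead of scanning everything.
sqlContext.read().parquet("hdfs:///warehouse/events").registerTempTable("events");
DataFrame daily = sqlContext.sql(
        "SELECT userId, COUNT(*) AS actions FROM events WHERE date = '2015-10-01' GROUP BY userId");
—— Java ——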

So here are the questions I am trying to find answers to:
1.  Is the Parquet format appropriate for storing data in our case (to query data 
efficiently)? Would it be more suitable to have some DB as storage which could 
filter the data efficiently before it reaches the Spark processing engine?
2.  For now we assume that the joins we do for the calculations slow down 
execution. As an alternative we are considering denormalizing the data and joining 
it during the parsing phase, but this increases the data volume Spark has to 
handle (due to the duplicates we will get). Is that a valid approach? Would it be 
better to create 2 RDDs from the Parquet files, filter them, and then join them 
without SparkSQL involvement (see the sketch after this list)? Or are joins in 
SparkSQL fine and should we look for performance bottlenecks elsewhere?
3.  Should we look more closely at Cloudera Impala? As far as I know it works over 
the same Parquet files, and I am wondering whether it gives better performance for 
data querying.
4.  90% of the results we need could be pre-calculated, since they do not change 
after one day of data is loaded. So I think it makes sense to keep this 
pre-calculated data in some DB storage which gives the best performance when 
querying by key. I am currently considering Cassandra for this purpose due to its 
excellent scalability and performance. Could somebody suggest any other options we 
should consider?
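
And for question 2, this is the kind of "filter first, then join at the RDD level" 
variant I have in mind (a rough sketch on the Spark 1.5 Java API; paths, column 
names and the filter condition are placeholders):

—— Java ——
// Assumed imports: scala.Tuple2, org.apache.spark.api.java.JavaPairRDD,
// org.apache.spark.sql.DataFrame, org.apache.spark.sql.Row
DataFrame eventsDf = sqlContext.read().parquet("hdfs:///warehouse/events");
DataFrame usersDf  = sqlContext.read().parquet("hdfs:///warehouse/users");

// Drop to the RDD level, filter each side first, then join on userId ourselves.
JavaPairRDD<String, Row> events = eventsDf.javaRDD()
        .filter(r -> r.<Integer>getAs("type") == 20)
        .mapToPair(r -> new Tuple2<>(r.<String>getAs("userId"), r));

JavaPairRDD<String, Row> users = usersDf.javaRDD()
        .mapToPair(r -> new Tuple2<>(r.<String>getAs("userId"), r));

JavaPairRDD<String, Tuple2<Row, Row>> joined = events.join(users);
—— Java ——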

Thanks in advance. Any opinion will be helpful and greatly appreciated.