Using Spark for high concurrent load tasks
Hello Spark community,

We have a project where we want to use Spark as a computation engine to perform calculations and return the results via REST services. While working with Spark we learned how to make it run faster, and we eventually optimized our code to produce results in an acceptable time (1-2 seconds). But when we tested it under concurrent load, we realized that response time grows significantly as the number of concurrent requests increases. This was expected, and we are trying to find a way to scale our solution to get acceptable times under concurrent load. Here we ran into the fact that adding more slave servers does not improve the average time when there are several concurrent requests.

Currently I observe the following behavior. When hitting our test REST service with 100 threads, with 1 master and 1 slave node, the average time for those 100 requests is 30.6 seconds/request. With 2 slave nodes the average time becomes 29.8 seconds/request, which is about the same as with one slave node. During these tests we monitored server load using htop, and the weird thing is that in the first case the slave node’s CPUs were 90-95% loaded, while in the second case, with 2 slaves, the CPUs were 45-50% loaded.

We are trying to find the bottleneck in our solution but have not succeeded yet. We have checked all the hardware for possible bottlenecks (network IO, disk IO, RAM, CPU), but none of them seems to be even close to its limit.

Our major suspect at the moment is the Spark configuration. Our application is a self-contained Spark application, and Spark runs in standalone mode (without an external resource manager). We do not submit a jar to Spark using a shell script; instead we create the Spark context in our Spring Boot application, and it connects to the Spark master and slaves by itself. All requests to Spark go through an internal thread pool. We have experimented with thread pool sizes and found that the best performance is achieved with 16 threads in the pool, where each thread performs one or several manipulations with RDDs and can itself be considered a Spark job.

For now I assume that either we misconfigured Spark due to lack of experience with it, or our use case is not really a use case for Spark and it is simply not designed for highly parallel load. I came to this conclusion because getting no performance gain after adding new nodes doesn’t make any sense to me. So any help and ideas would be greatly appreciated.

Hardware details: Azure D5v2 instances for the master and 3 slaves. D5v2 features a 2.4 GHz E5-2673 v3 (16 cores, 56 GB RAM, SSD). Using iperf we tested the network speed, and it is around 1 Gb/s.

--
CONFIDENTIALITY NOTICE: This email and files attached to it are confidential. If you are not the intended recipient you are hereby notified that using, copying, distributing or taking any action in reliance on the contents of this information is strictly prohibited. If you have received this email in error please notify the sender and delete this email.

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
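The numbers in this message can be sanity-checked with a rough back-of-the-envelope model. This is only a sketch, not a diagnosis: it assumes that all 100 requests arrive at once, that the 16-thread pool is the only limit on concurrency, and that every job takes the same time. `QueueModel`, `averageLatency` and `totalCpu` are hypothetical names, not part of Spark.

```java
// Rough model only; all names here are hypothetical and not part of Spark.
final class QueueModel {

    /**
     * Average completion time when `requests` jobs arrive at once and are
     * served by a pool of `poolSize` threads, each job taking `jobSeconds`.
     * Jobs finish in waves: the first poolSize jobs after one job time,
     * the next poolSize jobs after two job times, and so on.
     */
    static double averageLatency(int requests, int poolSize, double jobSeconds) {
        double total = 0.0;
        for (int i = 0; i < requests; i++) {
            int wave = i / poolSize + 1; // 1-based wave in which job i finishes
            total += wave * jobSeconds;
        }
        return total / requests;
    }

    /** Total CPU in "fully busy node" units: node count times per-node utilization. */
    static double totalCpu(int nodes, double utilization) {
        return nodes * utilization;
    }

    public static void main(String[] args) {
        // Average latency as a multiple of the per-job time (100 requests, 16 threads).
        System.out.println(averageLatency(100, 16, 1.0));
        // Midpoints of the reported CPU ranges: one slave vs. two slaves.
        System.out.println(totalCpu(1, 0.925));
        System.out.println(totalCpu(2, 0.475));
    }
}
```

Under these assumptions the average latency is 3.64 times the per-job time, so a 30.6 s average would correspond to roughly 8.4 s per job under load. The two CPU totals come out nearly equal, which is consistent with the same amount of work being spread over more nodes while the number of jobs in flight stays capped by the pool.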
DataFrame.toJavaRDD cause fetching data to driver, is it expected ?
Hello folks,

Recently I noticed unexpectedly heavy network traffic between the driver program and a worker node. While debugging I figured out that it is caused by the following block of code:

—— Java ——— —
DataFrame etpvRecords = context.sql(" SOME SQL query here");
Mapper m = new Mapper(localValue, ProgramId::toProgId);
return etpvRecords
        .toJavaRDD()
        .map(m::mapHutPutViewingRow)
        .reduce(Reducer::reduce);
—— Java

I’m using a debugger breakpoint and OS X nettop to monitor traffic between the processes. Before reaching the toJavaRDD() line I see 500 Kb of traffic, and after executing this line I see 2.2 Mb of traffic. But when I check the size of the result of the reduce function, it is 10 Kb. So .toJavaRDD() seems to cause the worker process to return the dataset to the driver process, and the subsequent map/reduce seems to run on the driver.

This is definitely not what I expected, so I have 2 questions.

1. Is it really expected behavior that DataFrame.toJavaRDD causes the whole dataset to be returned to the driver, or am I doing something wrong?
2. What is the expected way to transform a DataFrame using custom Java map/reduce functions when the standard SQL features do not cover all my needs?

Env: Spark 1.5.1 in standalone mode (1 master, 1 worker sharing the same machine). Java 1.8.0_60.
Re: DataFrame.toJavaRDD cause fetching data to driver, is it expected ?
Hello Romi,

Do you mean that in my particular case I’m causing a computation on the DataFrame, or is this the regular behavior of DataFrame.toJavaRDD? If it’s the regular behavior, do you know which approach could be used to perform map/reduce on a DataFrame without causing it to load all the data into the driver program?

> On Nov 4, 2015, at 12:34 PM, Romi Kuntsman <r...@totango.com> wrote:
>
> I noticed that toJavaRDD causes a computation on the DataFrame, so is it
> considered an action, even though logically it's a transformation?
>
> On Nov 4, 2015 6:51 PM, "Aliaksei Tsyvunchyk" <atsyvunc...@exadel.com> wrote:
> [...]
Re: DataFrame.toJavaRDD cause fetching data to driver, is it expected ?
Hi Romi,

Thanks for pointing this out. I am quite new to Spark and not sure how checking the number of partitions in the DF and RDD can help, so if you can give me some explanation it would be really helpful. A link to documentation would also help.

> On Nov 4, 2015, at 1:05 PM, Romi Kuntsman <r...@totango.com> wrote:
>
> In my program I move between RDD and DataFrame several times.
> I know that the entire data of the DF doesn't go into the driver because it
> wouldn't fit there.
> But calling toJavaRDD does cause computation.
>
> Check the number of partitions you have on the DF and RDD...
>
> On Nov 4, 2015 7:54 PM, "Aliaksei Tsyvunchyk" <atsyvunc...@exadel.com> wrote:
> [...]
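On the question of why the number of partitions matters: an RDD `reduce` combines the elements of each partition where that partition lives and sends only one partial result per partition to the driver, which then merges them. The plain-Java sketch below models that behavior with lists standing in for partitions. It is not Spark code, and `PartitionedReduce`, `reducePartition`, `partialResults` and `reduceAll` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java model of a partition-wise reduce; hypothetical names, not Spark API.
final class PartitionedReduce {

    /** "Worker side": fold one non-empty partition down to a single value. */
    static <T> T reducePartition(List<T> partition, BinaryOperator<T> op) {
        T acc = partition.get(0);
        for (int i = 1; i < partition.size(); i++) {
            acc = op.apply(acc, partition.get(i));
        }
        return acc;
    }

    /** One partial result per non-empty partition: all that the "driver" receives. */
    static <T> List<T> partialResults(List<List<T>> partitions, BinaryOperator<T> op) {
        List<T> partials = new ArrayList<>();
        for (List<T> partition : partitions) {
            if (!partition.isEmpty()) {
                partials.add(reducePartition(partition, op));
            }
        }
        return partials;
    }

    /** "Driver side": merge the partials (assumes at least one non-empty partition). */
    static <T> T reduceAll(List<List<T>> partitions, BinaryOperator<T> op) {
        return reducePartition(partialResults(partitions, op), op);
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(1, 2, 3),
                Arrays.asList(4, 5),
                Arrays.asList(6, 7, 8, 9));
        System.out.println(partialResults(partitions, Integer::sum)); // prints [6, 9, 30]
        System.out.println(reduceAll(partitions, Integer::sum));      // prints 45
    }
}
```

In this model nine elements are reduced, but only three values reach the driver, one per partition. With a single partition all the work happens in one task; with very many partitions the driver has more partial results to merge.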
Whether Spark is appropriate for our use case.
Hello all community members,

I need the opinion of people who have used Spark before and can share their experience to help me select a technical approach. I have a project in the proof-of-concept phase, where we are evaluating the possibility of using Spark for our use case. Here is a brief description of the task.

We need to process a large amount of raw data to calculate ratings. We have different types of textual source data. These are just text lines that represent different types of input data (we call them type 20, type 24, type 26, type 33, etc.). To perform the calculations we need to join different types of raw data: event records (which represent an actual user action), user description records (which represent the person who performs the action), and sometimes userGroup records (we group all users by some criteria). All ratings are calculated on a daily basis, and our dataset could be partitioned by date (except probably the reference data).

So we tried to implement it in what is possibly the most obvious way: we parse the text files, store the data in Parquet format, and use SparkSQL to query the data and perform the calculations. Experimenting with SparkSQL, I noticed that SQL query time grows proportionally with data size. Based on this I assume that SparkSQL performs a full scan of the records while serving my SQL queries.

So here are the questions I’m trying to find answers to:

1. Is the Parquet format appropriate for storing data in our case (to query data efficiently)? Would it be more suitable to use some DB as storage, which could filter the data efficiently before it gets to the Spark processing engine?
2. For now we assume that the joins we do for the calculations are slowing down execution. As an alternative we are considering denormalizing the data and joining it in the parsing phase, but this increases the data volume Spark has to handle (because of the duplicates we will get). Is this a valid approach? Would it be better to create 2 RDDs from the Parquet files, filter them, and then join them without involving SparkSQL? Or are joins in SparkSQL fine, and should we look for performance bottlenecks elsewhere?
3. Should we look more closely at Cloudera Impala? As far as I know it works over the same Parquet files, and I’m wondering whether it gives better performance for querying data.
4. About 90% of the results we need could be pre-calculated, since they do not change once a day of data has been loaded. So I think it makes sense to keep this pre-calculated data in some DB storage that gives the best performance when querying by key. I am currently considering Cassandra for this purpose because of its scalability and performance. Could somebody suggest any other options we could consider?

Thanks in advance. Any opinion will be helpful and greatly appreciated.
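Since the ratings are computed per day and the dataset can be partitioned by date, one option is to lay the Parquet files out by date so that a query for one day does not have to read every file. Spark SQL can discover partitions from directory names of the form `key=value`. The sketch below is plain Java that only models the pruning step on a list of paths. The `events/date=...` layout and the `DatePruning` class are assumptions made for illustration, not details of the project.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Model of date-based partition pruning; the path layout is a hypothetical example.
final class DatePruning {

    // Matches a directory component such as "date=2015-11-04".
    private static final Pattern DATE_DIR =
            Pattern.compile("date=(\\d{4}-\\d{2}-\\d{2})");

    /** Keep only the paths whose date=YYYY-MM-DD component lies in [from, to]. */
    static List<String> prune(List<String> paths, LocalDate from, LocalDate to) {
        List<String> kept = new ArrayList<>();
        for (String path : paths) {
            Matcher m = DATE_DIR.matcher(path);
            if (!m.find()) {
                continue; // no date component: not part of the dated data set
            }
            LocalDate date = LocalDate.parse(m.group(1));
            if (!date.isBefore(from) && !date.isAfter(to)) {
                kept.add(path);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> paths = Arrays.asList(
                "events/date=2015-11-03/part-0.parquet",
                "events/date=2015-11-04/part-0.parquet",
                "events/date=2015-11-05/part-0.parquet");
        LocalDate day = LocalDate.of(2015, 11, 4);
        // Only the file for the requested day remains to be scanned.
        System.out.println(prune(paths, day, day));
    }
}
```

With such a layout the amount of data read depends on the number of days requested rather than on the total history, which is one way to check whether the observed slowdown really comes from full scans.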