Re: Phoenix as a source for Spark processing
Cool, that's a good find. Restating what you're seeing: the distribution of your HBase table (region splits) doesn't match an even distribution of the data in the table; some regions hold more data than others. Typically, applications reading from HBase launch workers based on the region split points, or modulo some maximum number of "work items" (tasks, in your case, I'd guess). I'd take a look at the amount of data in HDFS for each region in your table and see if you can find any skew. If there are large regions, you can try to split them. Or, you can change the split threshold from the default of 10G (iirc) to a smaller number and let the system do it for you.

On 3/15/18 5:49 AM, Stepan Migunov wrote:

The table is about 300GB in HBase. I've done some more research and now my test is very simple - I'm trying to calculate the count of records in the table. No "distincts" etc., just phoenixTableAsDataFrame(...).count(). And now I see the issue - Spark creates about 400 tasks (14 executors) and starts the calculation; speed is pretty good. HBase shows about 1000 requests per second. But then Spark starts marking tasks as completed. I can see that Spark has read only 20% of the records but has completed 50% of the tasks. HBase shows only 100 requests per second. When Spark "thinks" it is 99% complete (only 5 tasks left), it has actually read only 70% of the records. The rest of the work will be done by 5 tasks at 1-2 requests per second... Is there any way to force Spark to distribute the workload evenly? I have tried to pre-split my Phoenix table (it now has about 1200 regions), but it didn't help.

-----Original Message-----
From: Josh Elser [mailto:els...@apache.org]
Sent: Friday, March 9, 2018 2:17 AM
To: user@phoenix.apache.org
Subject: Re: Phoenix as a source for Spark processing

How large is each row in this case? Or, better yet, how large is the table in HBase? You're spreading out approximately 7 "clients" to each RegionServer fetching results (100/14).
So, you should have pretty decent saturation from Spark into HBase. I'd take a look at the EXPLAIN plan for your SELECT DISTINCT to really understand what Phoenix is doing. For example, are you getting ample saturation of the resources your servers have available (32 cores/128GB memory is pretty good)? Validating how busy Spark is actually keeping HBase, and how much time is spent transforming the data, would be good. Or, another point: are you excessively scanning data in the system that you could otherwise preclude with a different rowkey structure, via logic such as a skip-scan (which would be shown in the EXPLAIN plan)? You may actually find that the built-in UPSERT SELECT logic out-performs the Spark integration, since you aren't actually doing any transformation logic inside of Spark.

On 3/5/18 3:14 PM, Stepan Migunov wrote:

Hi Josh, thank you for the response! Our cluster has 14 nodes (32 cores each/128 GB memory). The source Phoenix table contains about 1 billion records (100 columns). We start a Spark job with about 100 executors. Spark executes a SELECT from the source table (selecting 6 columns with DISTINCT) and writes the output to another Phoenix table. We expect the target table to contain about 100 million records. HBase has 14 region servers; both tables are salted with SALT_BUCKETS=42. The Spark job runs via Yarn.

-----Original Message-----
From: Josh Elser [mailto:els...@apache.org]
Sent: Monday, March 5, 2018 9:14 PM
To: user@phoenix.apache.org
Subject: Re: Phoenix as a source for Spark processing

Hi Stepan,

Can you better ballpark the Phoenix-Spark performance you've seen (e.g. how much hardware do you have, how many Spark executors did you use, how many region servers)? Also, what versions of software are you using? I don't think there are any firm guidelines on how to solve this problem, but you've found the tools available to you.
* You can try Phoenix+Spark to run over the Phoenix tables in place
* You can use Phoenix+Hive to offload the data into Hive for queries

If Phoenix-Spark wasn't fast enough, I'd imagine using the Phoenix-Hive integration to query the data would be similarly not fast enough. It's possible that the bottleneck is something we could fix in the integration, or fix in the configuration of Spark and/or Phoenix. We'd need you to help quantify this better :)

On 3/4/18 6:08 AM, Stepan Migunov wrote:

In our software we need to combine fast interactive access to the data with quite complex data processing. I know that Phoenix is intended for fast access, but I hoped that I could also use Phoenix as a source for complex processing with Spark. Unfortunately, Phoenix + Spark shows very poor performance. E.g., querying a big (about a billion records) table with DISTINCT takes about 2 hours. At the same time this task with a Hive source takes a few minutes. Is that expected? Does it mean that Phoenix is absolutely not suitable for batch processing with Spark and I should duplicate data to Hive and process it with Hive?
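The region-skew check suggested at the top of this thread (compare the amount of data in HDFS per region against the ~10G default split threshold) can be sketched quickly. This is an illustrative Python check, not an HBase API; the per-region sizes would come from something like `hdfs dfs -du` over the table's directory, and every name and number below is hypothetical:

```python
def find_skewed_regions(region_sizes_gb, split_threshold_gb=10.0):
    """Return regions larger than the split threshold, plus a simple
    skew ratio of the largest region size to the mean region size."""
    mean = sum(region_sizes_gb.values()) / len(region_sizes_gb)
    oversized = {r: s for r, s in region_sizes_gb.items() if s > split_threshold_gb}
    skew_ratio = max(region_sizes_gb.values()) / mean
    return oversized, skew_ratio

# Hypothetical numbers: 1199 regions of ~0.25GB plus one hot 60GB region.
sizes = {f"region-{i:04d}": 0.25 for i in range(1199)}
sizes["region-1199"] = 60.0
oversized, skew = find_skewed_regions(sizes)
```

A skew ratio far above 1 with one or two oversized regions is exactly the pattern that leaves a handful of straggler tasks at the end of a Spark job.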
Re: Direct HBase vs. Phoenix query performance
Hi Marcell,

Yes, that's correct - the cache we build for the RHS is only kept around while the join query is being executed. It'd be interesting to explore keeping the cache around longer for cases like yours (and probably not too difficult). We'd need to keep a map that maps the RHS query to its hash join cache identifier and, if found, skip the running of the query. Would you mind filing a JIRA so we can explore further?

Thanks,
James

On Wed, Mar 14, 2018 at 3:40 PM, Marcell Ortutay wrote:
> A quick update--I did some inspection of the Phoenix codebase, and it
> looks like my understanding of the coprocessor cache was incorrect. I
> thought it was meant to be used across queries, e.g. that the RHS of the
> join would be saved for subsequent queries. In fact this is not the case;
> the coprocessor cache is meant to live only for the duration of the query.
> This explains the performance difference--Phoenix is re-running a long
> subquery for each join, whereas my direct-to-HBase script saves those
> results across queries.
>
> On Tue, Mar 13, 2018 at 4:56 PM, Marcell Ortutay wrote:
>
>> Hi James,
>>
>> Thanks for the tips. Our row keys are (I think) reasonably optimized.
>> I've made a gist which is an anonymized version of the query, and it
>> indicates which conditions are / are not part of the PK. It is here:
>> https://gist.github.com/ortutay23andme/12f03767db13343ee797c328a4d78c9c
>>
>> I don't (yet) have an anonymized version of the raw HBase Go script
>> available, but after comparing the performance of the two, I've figured out
>> the root cause. The query does a subquery to produce the LHS of one of the
>> hash joins. This can be seen on L5 of the gist above. This subquery is
>> quite long (~1sec) to execute and scans a few million rows. It is shared
>> across all queries, so in the raw HBase script I cached / re-used it for
>> all queries. This has a (very large) performance benefit, in particular
>> under high load.
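For what it's worth, the mechanism James describes (keep a map from the RHS query to its already-built hash join cache and consult it before re-running the subquery) is plain memoization. A minimal illustrative sketch, with made-up names that are not Phoenix internals:

```python
class RhsCache:
    """Memoize the expensive RHS subquery of a hash join, keyed by
    the subquery text. Illustrative only, not Phoenix code."""
    def __init__(self, execute_fn):
        self._execute = execute_fn   # runs the expensive RHS subquery
        self._cache = {}             # RHS query text -> cached hash table
        self.executions = 0          # how many times we really executed

    def rhs_for(self, rhs_query):
        if rhs_query not in self._cache:
            self.executions += 1
            self._cache[rhs_query] = self._execute(rhs_query)
        return self._cache[rhs_query]

# Stand-in for the ~1sec subquery shared across all queries.
cache = RhsCache(execute_fn=lambda q: f"hash-table-for({q})")
rhs_sql = "SELECT id FROM big_table WHERE ..."
first = cache.rhs_for(rhs_sql)
second = cache.rhs_for(rhs_sql)   # served from the cache, no re-execution
```

The real feature would also need invalidation (the cached RHS goes stale when the underlying table changes) and an eviction policy, which is where most of the design work in such a JIRA would go.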
>>
>> My understanding of Phoenix is that it is supposed to do the same thing.
>> It seems like the hash join code has some mechanism for caching data for
>> hash joining using the HBase coprocessor system. I would expect this cache
>> to kick in, and only execute the large subquery once. Looking at the
>> performance of the query (30sec timeouts after ~2qps), this doesn't seem
>> to be happening.
>>
>> I'm wondering if my understanding of the Phoenix join cache is right. Is
>> it correct to expect that it would cache the results of a subquery used in
>> a join? If so, what are possible reasons why it would *not* do so? Any
>> guidance on metrics / optimizations to look at would be appreciated.
>>
>> Thanks,
>> Marcell
>>
>> On Thu, Mar 8, 2018 at 2:59 PM, James Taylor wrote:
>>
>>> Hi Marcell,
>>> It'd be helpful to see the table DDL and the query too, along with an
>>> idea of how many regions might be involved in the query. If a query is a
>>> commonly run query, usually you'll design the row key around optimizing
>>> it. If you have other, simpler queries that have determined your row key,
>>> then another alternative is to add one or more secondary indexes. Another
>>> common technique is to denormalize your data in ways that precompute the
>>> join to avoid having to do it at run time.
>>>
>>> With joins, make sure to order your tables from post-filtered largest
>>> (on LHS) to smallest (on RHS). Also, if you're joining on the PK of both
>>> tables, you should use the USE_SORT_MERGE_JOIN hint. Another common
>>> tuning exercise is around determining the best parallelization to use
>>> (i.e. guidepost width), or even disabling parallelization for more than
>>> an entire region's worth of data.
>>>
>>> It'd also be interesting to see the raw HBase code for a query of this
>>> complexity.
>>>
>>> Thanks,
>>> James
>>>
>>> On Thu, Mar 8, 2018 at 1:03 PM, Marcell Ortutay wrote:

Hi,

I am using Phoenix at my company for a large query that is meant to be run in real time as part of our application. The query involves several aggregations, anti-joins, and an inner query. Here is the (anonymized) query plan: https://gist.github.com/ortutay23andme/1da620472cc469ed2d8a6fdd0cc7eb01

The query performance on this is not great; it takes about 5sec to execute the query, and moreover it performs badly under load. If we run ~4qps of this query, Phoenix starts to time out and slow down a lot (queries take >30sec). For comparison, I wrote a simple Go script that runs a similar query talking directly to HBase. The performance on it is substantially better: it executes in ~1.5sec and can handle loads of ~50-100qps on the same cluster.

I'm wondering if anyone has ideas on what might be causing this difference in performance? Are there configs / optimizations we can do in Phoenix to bring the
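An aside on James's "largest on LHS, smallest on RHS" advice above: in a broadcast-style hash join, the RHS is materialized into an in-memory hash table and the LHS streams past it, so the RHS side bounds the memory cost. A toy sketch of that build/probe shape (not Phoenix's implementation):

```python
def hash_join(lhs_rows, rhs_rows, key):
    """Toy equi-join: build a hash table from the (ideally small) RHS,
    then probe it once per LHS row."""
    # Build phase: memory use is proportional to the RHS.
    table = {}
    for r in rhs_rows:
        table.setdefault(r[key], []).append(r)
    # Probe phase: stream the (large) LHS past the hash table.
    out = []
    for l in lhs_rows:
        for r in table.get(l[key], []):
            out.append({**l, **r})
    return out

# Hypothetical rows: 5 LHS rows, 2 matching RHS rows.
lhs = [{"k": i, "lhs_val": i * 10} for i in range(5)]
rhs = [{"k": 1, "rhs_val": "a"}, {"k": 3, "rhs_val": "b"}]
joined = hash_join(lhs, rhs, "k")
```

This is also why USE_SORT_MERGE_JOIN helps when both inputs are already sorted on the join key: a merge needs no in-memory hash table at all.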
RE: Phoenix as a source for Spark processing
The table is about 300GB in HBase. I've done some more research and now my test is very simple - I'm trying to calculate the count of records in the table. No "distincts" etc., just phoenixTableAsDataFrame(...).count(). And now I see the issue - Spark creates about 400 tasks (14 executors) and starts the calculation; speed is pretty good. HBase shows about 1000 requests per second. But then Spark starts marking tasks as completed. I can see that Spark has read only 20% of the records but has completed 50% of the tasks. HBase shows only 100 requests per second. When Spark "thinks" it is 99% complete (only 5 tasks left), it has actually read only 70% of the records. The rest of the work will be done by 5 tasks at 1-2 requests per second... Is there any way to force Spark to distribute the workload evenly? I have tried to pre-split my Phoenix table (it now has about 1200 regions), but it didn't help.

-----Original Message-----
From: Josh Elser [mailto:els...@apache.org]
Sent: Friday, March 9, 2018 2:17 AM
To: user@phoenix.apache.org
Subject: Re: Phoenix as a source for Spark processing

How large is each row in this case? Or, better yet, how large is the table in HBase? You're spreading out approximately 7 "clients" to each RegionServer fetching results (100/14). So, you should have pretty decent saturation from Spark into HBase.

I'd take a look at the EXPLAIN plan for your SELECT DISTINCT to really understand what Phoenix is doing. For example, are you getting ample saturation of the resources your servers have available (32 cores/128GB memory is pretty good)? Validating how busy Spark is actually keeping HBase, and how much time is spent transforming the data, would be good. Or, another point: are you excessively scanning data in the system that you could otherwise preclude with a different rowkey structure, via logic such as a skip-scan (which would be shown in the EXPLAIN plan)?
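To make the skip-scan point concrete: instead of reading every row, a skip-scan seeks directly to each qualifying key range and reads only those rows. A toy model over sorted string keys, with binary search standing in for a region server seek (nothing here is Phoenix code, and all keys are hypothetical):

```python
import bisect

def full_scan(rows, wanted_prefixes):
    """Examine every row; keep rows matching any wanted key prefix."""
    examined, hits = 0, []
    for r in rows:
        examined += 1
        if any(r.startswith(p) for p in wanted_prefixes):
            hits.append(r)
    return hits, examined

def skip_scan(rows, wanted_prefixes):
    """Seek to each wanted prefix and read only its contiguous range."""
    examined, hits = 0, []
    for p in sorted(wanted_prefixes):
        i = bisect.bisect_left(rows, p)   # "seek" to the key range
        while i < len(rows) and rows[i].startswith(p):
            examined += 1
            hits.append(rows[i])
            i += 1
    return hits, examined

# 1000 users x 3 events = 3000 sorted rowkeys; we want 2 users.
rows = sorted(f"user{u:05d}|evt{e:03d}" for u in range(1000) for e in range(3))
wanted = {"user00042|", "user00777|"}
full_hits, full_cost = full_scan(rows, wanted)
skip_hits, skip_cost = skip_scan(rows, wanted)
```

Same results, 3000 rows examined versus 6; that is the gap a rowkey designed for your predicates can open up, and it shows up directly in the EXPLAIN plan as a SKIP SCAN versus a FULL SCAN.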
You may actually find that the built-in UPSERT SELECT logic out-performs the Spark integration, since you aren't actually doing any transformation logic inside of Spark.

On 3/5/18 3:14 PM, Stepan Migunov wrote:
> Hi Josh, thank you for the response!
>
> Our cluster has 14 nodes (32 cores each/128 GB memory). The source
> Phoenix table contains about 1 billion records (100 columns). We start
> a Spark job with about 100 executors. Spark executes a SELECT from the
> source table (selecting 6 columns with DISTINCT) and writes the output
> to another Phoenix table. We expect the target table to contain about
> 100 million records.
> HBase has 14 region servers; both tables are salted with SALT_BUCKETS=42.
> The Spark job runs via Yarn.
>
> -----Original Message-----
> From: Josh Elser [mailto:els...@apache.org]
> Sent: Monday, March 5, 2018 9:14 PM
> To: user@phoenix.apache.org
> Subject: Re: Phoenix as a source for Spark processing
>
> Hi Stepan,
>
> Can you better ballpark the Phoenix-Spark performance you've seen (e.g.
> how much hardware do you have, how many Spark executors did you use,
> how many region servers)? Also, what versions of software are you using?
>
> I don't think there are any firm guidelines on how you can solve this
> problem, but you've found the tools available to you.
>
> * You can try Phoenix+Spark to run over the Phoenix tables in place
> * You can use Phoenix+Hive to offload the data into Hive for queries
>
> If Phoenix-Spark wasn't fast enough, I'd imagine using the
> Phoenix-Hive integration to query the data would be similarly not fast
> enough.
>
> It's possible that the bottleneck is something we could fix in the
> integration, or fix in the configuration of Spark and/or Phoenix. We'd
> need you to help quantify this better :)
>
> On 3/4/18 6:08 AM, Stepan Migunov wrote:
>> In our software we need to combine fast interactive access to the
>> data with quite complex data processing.
>> I know that Phoenix is intended for fast access, but I hoped that I
>> could also use Phoenix as a source for complex processing with Spark.
>> Unfortunately, Phoenix + Spark shows very poor performance. E.g.,
>> querying a big (about a billion records) table with DISTINCT takes
>> about 2 hours. At the same time this task with a Hive source takes a
>> few minutes. Is that expected? Does it mean that Phoenix is absolutely
>> not suitable for batch processing with Spark and I should duplicate
>> data to Hive and process it with Hive?
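The symptom earlier in this thread (99% of tasks complete but only ~70% of the records read) is the classic straggler signature of skewed partition sizes: task-count progress runs far ahead of row progress, and the few huge partitions dominate the runtime. A toy model with entirely hypothetical numbers:

```python
def rows_read_at_task_completion(partition_sizes, completed_fraction):
    """Fraction of rows read once `completed_fraction` of tasks have
    finished, assuming the smallest tasks finish first."""
    sizes = sorted(partition_sizes)
    n_done = round(len(sizes) * completed_fraction)
    return sum(sizes[:n_done]) / sum(sizes)

# Hypothetical skew: 395 small partitions plus 5 huge ones (~400 tasks,
# matching the shape of the job described above).
parts = [1_000] * 395 + [250_000] * 5
frac = rows_read_at_task_completion(parts, 0.99)
```

With this distribution, 99% of the tasks account for well under half of the rows; the fix is on the data side (splitting the hot regions, or lowering the split threshold), since Spark's task boundaries just mirror the region boundaries.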