Re: Phoenix as a source for Spark processing

2018-03-15 Thread Josh Elser
Cool, that's a good find. Re-stating what you're seeing: the region split 
points of your HBase table don't match an even distribution of the data 
within it. Some regions have more data than other regions.


Typically, applications reading from HBase will launch workers based on 
the Region split points, possibly capped at some maximum number of "work 
items" (tasks, in your case, I'd guess).


I'd take a look at the amount of data in HDFS for each region in your 
table, and see if you can find any skew. If there are large region(s), 
you can try to split them. Or, you can change the split threshold from 
the default of 10G (iirc) to a smaller number and let the system do it 
for you.
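
If you want to script that, here's a hedged sketch using the plain HBase 
client API (the table name is a placeholder):

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.ConnectionFactory

    val conn  = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val admin = conn.getAdmin
    val table = TableName.valueOf("MY_TABLE")

    // Option 1: explicitly ask HBase to split the table's regions now
    admin.split(table)

    // Option 2: lower the split threshold for just this table from the
    // cluster-wide default (hbase.hregion.max.filesize, ~10G) and let
    // HBase split oversized regions on its own
    val desc = admin.getTableDescriptor(table)
    desc.setMaxFileSize(2L * 1024 * 1024 * 1024) // 2G
    admin.modifyTable(table, desc)

    conn.close()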


On 3/15/18 5:49 AM, Stepan Migunov wrote:

The table is about 300GB in HBase.
I've done some more research, and now my test is very simple - I'm trying
to count the records in the table. No "distinct"s etc., just
phoenixTableAsDataFrame(...).count().

And now I see the issue - Spark creates about 400 tasks (14 executors) and
starts the calculation; speed is pretty good. HBase shows about 1000
requests per second. But then Spark starts marking tasks as completed. I
can see that Spark has read only 20% of the records but has completed 50%
of the tasks, and HBase shows only 100 requests per second. When Spark
"thinks" it is 99% complete (only 5 tasks left), it has actually read only
70% of the records. The rest of the work is done by those 5 tasks at 1-2
requests per second...

Is there any way to force Spark to distribute the workload evenly? I have
tried to pre-split my Phoenix table (it now has about 1200 regions), but
it didn't help.

-Original Message-
From: Josh Elser [mailto:els...@apache.org]
Sent: Friday, March 9, 2018 2:17 AM
To: user@phoenix.apache.org
Subject: Re: Phoenix as a source for Spark processing

How large is each row in this case? Or, better yet, how large is the table
in HBase?

You're spreading out approximately 7 "clients" to each Regionserver fetching
results (100/14). So, you should have pretty decent saturation from Spark
into HBase.

I'd be taking a look at the EXPLAIN plan for your SELECT DISTINCT to really
understand what Phoenix is doing. For example, are you getting ample
saturation of the resources your servers have available (32 cores/128GB
memory is pretty good)? Validating how busy Spark is actually keeping
HBase, and how much time is spent transforming the data, would be good. Or,
another point: are you excessively scanning data in the system which you
could otherwise preclude with a different rowkey structure, via logic such
as a skip-scan (which would be shown in the EXPLAIN plan)?

You may actually find that the built-in UPSERT SELECT logic out-performs
the Spark integration, since you aren't actually doing any transformation
logic inside of Spark.
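
A rough sketch of both of those through the Phoenix JDBC driver (the
connection string, table, and column names are placeholders):

    import java.sql.DriverManager

    val conn = DriverManager.getConnection("jdbc:phoenix:zkhost:2181")
    val stmt = conn.createStatement()

    // 1. See the plan Phoenix picks for the DISTINCT
    val rs = stmt.executeQuery(
      "EXPLAIN SELECT DISTINCT C1, C2, C3, C4, C5, C6 FROM SOURCE_TABLE")
    while (rs.next()) println(rs.getString(1))

    // 2. Run the whole job server-side, skipping Spark entirely
    stmt.executeUpdate(
      "UPSERT INTO TARGET_TABLE SELECT DISTINCT C1, C2, C3, C4, C5, C6 " +
      "FROM SOURCE_TABLE")
    conn.commit()
    conn.close()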


On 3/5/18 3:14 PM, Stepan Migunov wrote:

Hi Josh, thank you for the response!

Our cluster has 14 nodes (32 cores each/128 GB memory). The source
Phoenix table contains about 1 billion records (100 columns). We start
a Spark job with about 100 executors. Spark executes a SELECT from the
source table (selecting 6 columns with DISTINCT) and writes the output
to another Phoenix table. We expect the target table to contain about
100 million records.
HBase has 14 region servers; both tables are salted with SALT_BUCKETS=42.
The Spark job runs via YARN.


-Original Message-
From: Josh Elser [mailto:els...@apache.org]
Sent: Monday, March 5, 2018 9:14 PM
To: user@phoenix.apache.org
Subject: Re: Phoenix as a source for Spark processing

Hi Stepan,

Can you better ballpark the Phoenix-Spark performance you've seen (e.g.
how much hardware do you have, how many Spark executors did you use,
how many region servers)? Also, what versions of the software are you
using?

I don't think there are any firm guidelines on how you can solve this
problem, but you've found the tools available to you:

* You can try Phoenix+Spark to run over the Phoenix tables in place
* You can use Phoenix+Hive to offload the data into Hive for queries

If Phoenix-Spark wasn't fast enough, I'd imagine using the
Phoenix-Hive integration to query the data would be similarly not fast
enough.

It's possible that the bottleneck is something we could fix in the
integration, or in the configuration of Spark and/or Phoenix. We'd need
you to help quantify this better :)

On 3/4/18 6:08 AM, Stepan Migunov wrote:

In our software we need to combine fast interactive access to the
data with quite complex data processing. I know that Phoenix is intended
for fast access, but I hoped that I could also use Phoenix as a source
for complex processing with Spark. Unfortunately, Phoenix + Spark shows
very poor performance. E.g., querying a big (about a billion records)
table with DISTINCT takes about 2 hours. At the same time this task with
a Hive source takes a few minutes. Is that expected? Does it mean that
Phoenix is absolutely not suitable for batch processing with Spark and I
should duplicate data to Hive and process it with Hive?
Re: Direct HBase vs. Phoenix query performance

2018-03-15 Thread James Taylor
Hi Marcell,
Yes, that's correct - the cache we build for the RHS is only kept around
while the join query is being executed. It'd be interesting to explore
keeping the cache around longer for cases like yours (and probably not too
difficult). We'd need to keep a map from each RHS query to its hash join
cache identifier and, on a hit, skip re-running the query. Would you mind
filing a JIRA so we can explore this further?
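To sketch the idea (purely hypothetical -- this is not actual Phoenix
internals, and all names are made up):

    import java.util.concurrent.ConcurrentHashMap

    // map the RHS query text to the id of its server-side hash cache
    val rhsCacheIds = new ConcurrentHashMap[String, Array[Byte]]()

    def cacheIdFor(rhsSql: String)(buildCache: => Array[Byte]): Array[Byte] = {
      val cached = rhsCacheIds.get(rhsSql)
      if (cached != null) cached // hit: reuse the cache, skip the RHS query
      else {
        val id = buildCache      // miss: run the RHS query and cache it
        rhsCacheIds.put(rhsSql, id)
        id
      }
    }

A real version would also need invalidation/expiry when the underlying
tables change.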
Thanks,
James

On Wed, Mar 14, 2018 at 3:40 PM, Marcell Ortutay wrote:

> A quick update--I did some inspection of the Phoenix codebase, and it
> looks like my understanding of the coprocessor cache was incorrect. I
> thought it was meant to be used across queries, e.g. that the RHS of the
> join would be saved for subsequent queries. In fact this is not the case;
> the coprocessor cache is meant to live only for the duration of the query.
> This explains the performance difference--Phoenix re-runs a long
> subquery for each join, whereas my direct-to-HBase script saves those
> results across queries.
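>
> In other words, the script amortizes the subquery roughly like this
> (sketched in Scala over Phoenix JDBC rather than my Go-over-HBase code;
> the names are made up):
>
>     import java.sql.DriverManager
>
>     val conn = DriverManager.getConnection("jdbc:phoenix:zkhost:2181")
>
>     // computed once per process, then shared by every incoming query
>     lazy val subqueryKeys: Set[String] = {
>       // stands in for the long (~1 sec, few-million-row) subquery
>       val rs = conn.createStatement().executeQuery("SELECT ID FROM BIG_TABLE")
>       val b = Set.newBuilder[String]
>       while (rs.next()) b += rs.getString(1)
>       b.result()
>     }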
>
> On Tue, Mar 13, 2018 at 4:56 PM, Marcell Ortutay wrote:
>
>> Hi James,
>>
>> Thanks for the tips. Our row keys are (I think) reasonably optimized.
>> I've made a gist which is an anonymized version of the query, and it
>> indicates which conditions are / are not part of the PK. It is here:
>> https://gist.github.com/ortutay23andme/12f03767db13343ee797c328a4d78c9c
>>
>> I don't (yet) have an anonymized version of the raw HBase Go script
>> available, but after comparing the performance of the two, I've figured
>> out the root cause. The query runs a subquery to produce the LHS of one
>> of the hash joins. This can be seen on L5 of the gist above. This
>> subquery takes quite a long time (~1 sec) to execute and scans a few
>> million rows. It is shared across all queries, so in the raw HBase script
>> I cached / re-used its result for all queries. This has a (very large)
>> performance benefit, in particular under high load.
>>
>> My understanding of Phoenix is that it is supposed to do the same thing.
>> It seems like the hash join code has a mechanism for caching data for
>> hash joins using the HBase coprocessor system. I would expect this cache
>> to kick in and only execute the large subquery once. Looking at the
>> performance of the query (30 sec timeouts after ~2 qps), this doesn't
>> seem to be happening.
>>
>> I'm wondering if my understanding of the Phoenix join cache is right. Is
>> it correct to expect that it would cache the results of a subquery used in
>> a join? If so, what are possible reasons why it would *not* do so? Any
>> guidance on metrics / optimizations to look at would be appreciated.
>>
>> Thanks,
>> Marcell
>>
>> On Thu, Mar 8, 2018 at 2:59 PM, James Taylor wrote:
>>
>>> Hi Marcell,
>>> It'd be helpful to see the table DDL and the query too along with an
>>> idea of how many regions might be involved in the query. If a query is a
>>> commonly run query, usually you'll design the row key around optimizing it.
>>> If you have other, simpler queries that have determined your row key, then
>>> another alternative is to add one or more secondary indexes. Another common
>>> technique is to denormalize your data in ways that precompute the join to
>>> avoid having to do it at run time.
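>>>
>>> For example, a secondary index covering the join/filter columns can be
>>> added like this (a hedged sketch; the table and column names are made
>>> up):
>>>
>>>     import java.sql.DriverManager
>>>
>>>     val conn = DriverManager.getConnection("jdbc:phoenix:zkhost:2181")
>>>     // index FOO/BAR, carrying BAZ so the query is served by the index
>>>     conn.createStatement().executeUpdate(
>>>       "CREATE INDEX IDX_MY_TABLE_FOO ON MY_TABLE (FOO, BAR) INCLUDE (BAZ)")
>>>     conn.close()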
>>>
>>> With joins, make sure to order your tables from post-filtered largest
>>> (on the LHS) to smallest (on the RHS). Also, if you're joining on the PK
>>> of both tables, you should use the USE_SORT_MERGE_JOIN hint. Another
>>> common tuning exercise is around determining the best parallelization to
>>> use (i.e. guidepost width), or even disabling parallelization for more
>>> than an entire region's worth of data.
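>>>
>>> For instance (a sketch; the tables and columns are placeholders, and the
>>> guidepost setting is the cluster-wide knob mentioned above):
>>>
>>>     import java.sql.DriverManager
>>>
>>>     val conn = DriverManager.getConnection("jdbc:phoenix:zkhost:2181")
>>>     // the hint goes right after SELECT; both tables joined on their PK
>>>     val rs = conn.createStatement().executeQuery(
>>>       "SELECT /*+ USE_SORT_MERGE_JOIN */ a.K, b.V " +
>>>       "FROM BIG_A a JOIN BIG_B b ON a.K = b.K")
>>>
>>>     // parallelization granularity is controlled by the guidepost width
>>>     // (in bytes), set in hbase-site.xml: phoenix.stats.guidepost.width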
>>>
>>> It'd also be interesting to see the raw HBase code for a query of this
>>> complexity.
>>>
>>> Thanks,
>>> James
>>>
>>> On Thu, Mar 8, 2018 at 1:03 PM, Marcell Ortutay wrote:
>>>
 Hi,

 I am using Phoenix at my company for a large query that is meant to be
 run in real time as part of our application. The query involves several
 aggregations, anti-joins, and an inner query. Here is the (anonymized)
 query plan: https://gist.github.com/ortutay23andme/1da620472cc469ed2d8a6fdd0cc7eb01

 The query performance on this is not great; it takes about 5 sec to
 execute the query, and moreover it performs badly under load. If we run
 ~4 qps of this query, Phoenix starts to time out and slow down a lot
 (queries take >30 sec).

 For comparison, I wrote a simple Go script that runs a similar query
 talking directly to HBase. The performance on it is substantially better.
 It executes in ~1.5sec, and can handle loads of ~50-100qps on the same
 cluster.

 I'm wondering if anyone has ideas on what might be causing this
 difference in performance? Are there configs / optimizations we can do in
 Phoenix to bring its performance closer to the direct HBase numbers?
