James, is the async UPSERT SELECT route through the query server an option now? Or is that something that will have to be developed over time?

Cheers,
--Alex.

On Fri, Sep 4, 2015 at 10:44 PM, James Taylor <[email protected]> wrote:

> Hi Alex,
> You can execute that Pig script in a single UPSERT SELECT call in Phoenix,
> and it'll perform far better. What's the motivation for using Pig? If the
> answer is "I don't want to tie up client threads for that amount of time",
> then one answer is to use our query server [1]. That'd still use one
> client thread, but with an UPSERT SELECT you'd be OK running it
> asynchronously (please file a JIRA for this if you're interested in
> pursuing it).
>
> FYI, our Pig integration does not support any aggregation in the LOAD
> command, as Phoenix expects to do the final merge. It'd be interesting to
> explore being able to invoke the server-side aggregation and have the
> client-side final merge be done by Pig. I believe Pig exposes an API for
> that, but Phoenix doesn't have an API for partial aggregation. We're
> looking into integrating with Apache Drill, and I suspect this kind of
> thing will be possible through our Calcite integration.
>
> Thanks,
> James
>
> [1] https://phoenix.apache.org/server.html
>
> On Thu, Sep 3, 2015 at 11:05 AM, Alex Warshavsky <[email protected]> wrote:
>
>> Hello all,
>>
>> Expanding an earlier email conversation to a wider audience to share the
>> wealth. Please read the email chain below to get the context if needed.
>>
>> Follow-up question. Here's a simple Pig script that loads data from
>> Phoenix, performs an aggregation, and writes the aggregations back to
>> Phoenix. The idea is to roll up counts every hour, hence the WHERE
>> clause on EVENT_TIME.
>>
>> raw = LOAD 'hbase://query/SELECT TENANT_ID, USER_ID FROM LOGINS WHERE
>> EVENT_TIME > 8/1/2015 AND EVENT_TIME < 8/2/2015'
>>
>> by_org_and_app = GROUP raw BY TENANT_ID, USER_ID
>>
>> count_by_user = FOREACH by_org_and_app GENERATE group AS TENANT_ID,
>> USER_ID, COUNT(raw)
>>
>> STORE count_by_user into 'hbase://METRICS'
>>
>> For the script above, since GROUP follows the LOAD in Pig, is the best
>> way to push the GROUP filter into the SELECT WHERE clause? I want to
>> ensure that the subsequent rollups are correct for each
>> TENANT_ID/USER_ID without loading the whole table.
>>
>> raw = LOAD 'hbase://query/SELECT TENANT_ID, USER_ID FROM LOGINS WHERE
>> EVENT_TIME > 8/1/2015 AND EVENT_TIME < 8/2/2015' *AND TENANT_ID =
>> '123456789'*
>>
>> by_org_and_app = GROUP raw BY TENANT_ID, USER_ID
>>
>> The flip side is that the idea was to use Pig to map/reduce the
>> aggregations (rollups) across tenants rather than processing one tenant
>> at a time. If tenants have to be processed one at a time, a loop will
>> have to be created, which I was trying to avoid.
>>
>> Thanks,
>> --Alex.
>>
>> On Thu, Aug 27, 2015 at 7:44 PM, Ravi Kiran wrote:
>>
>>> Hi Alex,
>>> I believe the way Pig works is to apply predicate pushdown to the LOAD
>>> and, once the data is loaded, apply operators like GROUP and COUNT.
>>> Hence, the rollup in your example above would happen as a subsequent
>>> step. Currently, PhoenixHBaseLoader doesn't implement
>>> LoadPredicatePushdown, so any FILTER operators after LOAD don't get
>>> pushed to HBase. In these cases, it's best to go with the SELECT query
>>> approach and use the WHERE clause to get predicate pushdown.
>>>
>>> I would also recommend running the EXPLAIN operator on the script to
>>> see the execution plan.
>>>
>>> Sorry for the delay in my response; I just got back from the office.
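Alex's correctness worry can be illustrated with a small sketch: because the GROUP key is the composite (TENANT_ID, USER_ID), a single pass over the full date-bounded load still yields a correct count per key, with no per-tenant WHERE clause needed. The sample data below is made up, and Python's `Counter` stands in for the map/reduce shuffle-and-count; this is not Phoenix or Pig code:

```python
from collections import Counter

# Made-up stand-in for the rows the date-bounded LOAD would return:
# (TENANT_ID, USER_ID) pairs spanning several tenants.
raw = [
    ("t1", "u1"), ("t1", "u1"), ("t1", "u2"),
    ("t2", "u1"), ("t2", "u1"), ("t2", "u1"),
]

# GROUP raw BY TENANT_ID, USER_ID keys every row by the composite
# (tenant, user) pair, so one pass over all tenants still produces
# a correct count per key -- no per-tenant filtering required.
count_by_user = Counter(raw)

print(count_by_user[("t1", "u1")])  # 2
print(count_by_user[("t2", "u1")])  # 3
```

Rows from different tenants never collide because the tenant is part of the key, which is exactly why the cross-tenant rollup does not need the per-tenant loop.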
>>>
>>> Regards,
>>> Ravi
>>>
>>> On Thu, Aug 27, 2015 at 12:54 PM, Alex Warshavsky wrote:
>>>
>>>> Hi Ravi,
>>>>
>>>> Thanks a lot for your response. I wanted to clarify the relationship
>>>> between splits and Pig operations like GROUP, COUNT, and DISTINCT.
>>>>
>>>> Let's talk through an example.
>>>>
>>>> In the example below, the GROUPing happens to be by TENANT_ID and
>>>> USER_ID. When the LOAD happens, does the split take care of the
>>>> rollup being calculated for all rows for a given TENANT_ID? Is the
>>>> data read in chunks (by the split logic) and then rolled up using
>>>> map/reduce for each of the groups? And what about cases like DISTINCT
>>>> (a count(distinct) equivalent), where *all* of the data for a given
>>>> group needs to be available to perform the final calculation?
>>>>
>>>> Effectively, at the map/reduce level we're talking about TENANT_ID
>>>> and USER_ID being the keys and the reduce (COUNT) happening for each
>>>> key.
>>>>
>>>> # Calculate the number of logins per user for a given day. Roll up by
>>>> TENANT_ID.
>>>>
>>>> raw = LOAD 'hbase://query/SELECT TENANT_ID, USER_ID FROM LOGINS WHERE
>>>> EVENT_TIME > 8/1/2015 AND EVENT_TIME < 8/2/2015'
>>>>
>>>> by_org_and_app = GROUP raw BY TENANT_ID, USER_ID
>>>>
>>>> count_by_user = FOREACH by_org_and_app GENERATE group AS TENANT_ID,
>>>> USER_ID, COUNT(raw)
>>>>
>>>> STORE count_by_user into 'hbase://METRICS'
>>>>
>>>> Cheers,
>>>> --Alex.
>>>>
>>>> On Wed, Aug 26, 2015 at 7:42 PM, Ravi Kiran wrote:
>>>>
>>>>> Hi Soumen,
>>>>>
>>>>> Below is the series of steps that happens for the given LOAD
>>>>> operator:
>>>>>
>>>>> a) Parse the information passed to the LOAD operator and generate a
>>>>> SELECT query with the necessary columns requested. You can also pass
>>>>> a SELECT query directly.
>>>>> b) From the SELECT query, we generate the QueryPlan and thereby get
>>>>> the splits. This is all done in PhoenixInputFormat.
>>>>> c) Each split pulls records from the table. Kindly note that the
>>>>> splits don't map to region boundaries; they just have start and stop
>>>>> keys.
>>>>> d) We then transform the returned columns into Pig datatypes. You
>>>>> can look further into TypeUtil.
>>>>>
>>>>> Coming to your questions:
>>>>>
>>>>> - Could you suggest how the data is being loaded into Pig from
>>>>> Phoenix/HBase? We are trying to evaluate whether there is data
>>>>> movement from HBase to MapReduce HDFS during the LOAD.
>>>>> *RAVI*: No data movement happens from HBase to HDFS. It's a direct
>>>>> SCAN of the HBase table.
>>>>>
>>>>> - Is the data processed in chunks? Is there a concept of batchSize
>>>>> in the LOAD function similar to the STORE batchSize?
>>>>> *RAVI*: Yes, one chunk is one split. I am not sure about the use of
>>>>> batchSize, as Pig cannot execute any downstream operators like GROUP
>>>>> or JOIN till all the data is loaded. Please feel free to revert if I
>>>>> am not on the same page as you on this question.
>>>>>
>>>>> - Can the LOAD be reused for multi-query on Phoenix? We are looking
>>>>> to process the same loaded data using multiple GROUP+STORE steps.
>>>>> *RAVI*: I am hoping you are looking into using the multi-query
>>>>> optimization of Pig: https://issues.apache.org/jira/browse/PIG-627.
>>>>> I would definitely recommend using that if you have multiple STOREs.
>>>>>
>>>>> Please feel free to reach out to me if you need any information. HTH.
>>>>>
>>>>> Regards,
>>>>> Ravi
>>>>>
>>>>> On Wed, Aug 26, 2015 at 4:31 PM, Soumen Bandyopadhyay wrote:
>>>>>
>>>>>> Hello Ravi,
>>>>>>
>>>>>> We are developers at Salesforce looking into the Pig integration
>>>>>> <https://phoenix.apache.org/pig_integration.html> with Phoenix. We
>>>>>> are looking into the LOAD function and have some questions around
>>>>>> its implementation.
>>>>>>
>>>>>> - Could you suggest how the data is being loaded into Pig from
>>>>>> Phoenix/HBase? We are trying to evaluate whether there is data
>>>>>> movement from HBase to MapReduce HDFS during the LOAD.
>>>>>>
>>>>>> - Is the data processed in chunks? Is there a concept of batchSize
>>>>>> in the LOAD function similar to the STORE batchSize?
>>>>>> - Can the LOAD be reused for multi-query on Phoenix? We are looking
>>>>>> to process the same loaded data using multiple GROUP+STORE steps.
>>>>>>
>>>>>> Thank you for your input in this regard.
>>>>>>
>>>>>> --
>>>>>> Thanks,
>>>>>>
>>>>>> Soumen
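Steps (b) and (c) of Ravi's walkthrough above — splits as plain start/stop key ranges that need not match region boundaries, with "1 chunk is 1 split" — can be sketched as follows. The table data, the key ranges, and the `scan` function are toy illustrations, not Phoenix internals:

```python
# Toy table: row key -> value. Keys are bytes, as in HBase.
rows = {b"a1": 1, b"b2": 2, b"c3": 3, b"d4": 4}

# Two splits, each a [start, stop) key range. They are just key ranges
# and do not have to line up with region boundaries.
splits = [(b"a", b"c"), (b"c", b"e")]

def scan(start, stop):
    # Each split pulls only the records whose key falls in its range.
    return [v for k, v in sorted(rows.items()) if start <= k < stop]

# One chunk per split, mirroring "1 chunk is 1 split".
chunks = [scan(start, stop) for start, stop in splits]
print(chunks)  # [[1, 2], [3, 4]]
```

Downstream blocking operators (GROUP, JOIN) only run once every split's chunk has been read, which is why a LOAD-side batchSize would not change when aggregation can start.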
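Finally, James's point at the top of the thread — that the whole LOAD/GROUP/STORE pipeline collapses into one aggregating statement — can be sketched with sqlite3 standing in for Phoenix. `INSERT ... SELECT` here stands in for Phoenix's UPSERT SELECT, and the LOGIN_COUNT column, literal dates, and sample rows are illustrative assumptions, not the actual schema:

```python
import sqlite3

# In-memory stand-in for the Phoenix tables from the script above.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE LOGINS (TENANT_ID TEXT, USER_ID TEXT, EVENT_TIME TEXT)")
conn.execute("CREATE TABLE METRICS (TENANT_ID TEXT, USER_ID TEXT, LOGIN_COUNT INTEGER)")
conn.executemany(
    "INSERT INTO LOGINS VALUES (?, ?, ?)",
    [("t1", "u1", "2015-08-01"), ("t1", "u1", "2015-08-01"),
     ("t2", "u9", "2015-08-01"), ("t2", "u9", "2015-08-02")],
)

# One statement does the date-bounded load, the group-by rollup, and the
# store -- the server does the aggregation, across all tenants at once.
conn.execute("""
    INSERT INTO METRICS
    SELECT TENANT_ID, USER_ID, COUNT(*)
    FROM LOGINS
    WHERE EVENT_TIME >= '2015-08-01' AND EVENT_TIME < '2015-08-02'
    GROUP BY TENANT_ID, USER_ID
""")

print(conn.execute("SELECT * FROM METRICS ORDER BY TENANT_ID").fetchall())
# [('t1', 'u1', 2), ('t2', 'u9', 1)]
```

In Phoenix itself the statement shape would be the UPSERT SELECT equivalent of this, run over LOGINS into METRICS, with the aggregation pushed to the region servers.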
