Help needed: Out of memory with windowing functions
Hi all,

I have an event table with (user_id, timestamp, event) and I'm trying to write a query that returns the first 10 events for each user. My query looks like this:

    SELECT user_id, event
    FROM (
      SELECT user_id, event,
             ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY timestamp) AS rownum
      FROM eventTable
    ) T
    WHERE rownum <= 10

However, the table may contain millions of events for the same user, and I'm getting an OutOfMemoryError in the reduce phase, inside the following method:

    org.apache.hadoop.hive.ql.udf.generic.GenericUDAFRowNumber$RowNumberBuffer.incr(GenericUDAFRowNumber.java:80)

It seems that the windowing functions were designed to store a buffer containing all results for each PARTITION, writing everything out only once all rows of that partition have been read. This makes windowing with Hive not very scalable...

My questions are:
a) Is there a reason why it was implemented this way rather than in a streaming fashion?
b) Do you know how I could rewrite the query to avoid the problem (if possible, without having to write my own UDF)?

Thanks,
Furcy
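[Editor's note: the buffering behaviour described above can be illustrated outside Hive. The sketch below, plain Python with hypothetical function names, contrasts a buffered ROW_NUMBER (memory grows with the partition, as in the reported stack trace) with a streaming one (a single counter per partition). It assumes input rows already sorted by (user_id, timestamp), as the reducer would see them.]

```python
from itertools import groupby
from operator import itemgetter

def row_number_buffered(rows):
    """Mimics the behaviour described above: hold every row of a
    partition in memory, then emit row numbers once the partition is
    complete. Memory use grows with the largest partition."""
    out = []
    for _user_id, group in groupby(rows, key=itemgetter(0)):
        buffered = list(group)  # whole partition materialized in memory
        for i, row in enumerate(buffered, start=1):
            out.append(row + (i,))
    return out

def row_number_streaming(rows):
    """A streaming alternative: ROW_NUMBER only needs a per-partition
    counter, so each row can be emitted as soon as it is read."""
    out = []
    prev_user, counter = None, 0
    for row in rows:
        counter = counter + 1 if row[0] == prev_user else 1
        prev_user = row[0]
        out.append(row + (counter,))
    return out
```

On sorted input, both produce identical numbering; only their memory profiles differ, which is exactly the scalability point raised above.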
Re: Help needed: Out of memory with windowing functions
Hi Furcy,

Sorry that you ran into this; it's a non-trivial problem. The good news is that Harish has done some good work in this area. See the following links. All of this will be available in the upcoming release, whose date is under discussion on the dev list.

1. https://issues.apache.org/jira/browse/HIVE-6999
2. https://issues.apache.org/jira/browse/HIVE-7062
3. https://issues.apache.org/jira/browse/HIVE-7063
4. https://issues.apache.org/jira/browse/HIVE-7143
5. https://issues.apache.org/jira/browse/HIVE-7344

Hope it helps,
Ashutosh

On Wed, Aug 20, 2014 at 5:34 AM, Furcy Pin furcy@flaminem.com wrote:
> [...]
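[Editor's note: until the streaming windowing work linked above lands, one possible workaround for the specific "first N per user" case is to bypass ROW_NUMBER entirely with a streaming script, e.g. via Hive's TRANSFORM ... USING after a DISTRIBUTE BY user_id SORT BY user_id, timestamp. The sketch below is a hypothetical reducer for that setup, not code from the thread; it assumes tab-separated (user_id, timestamp, event) lines arriving already clustered and sorted, and holds only one counter in memory.]

```python
import sys

def first_n_per_user(lines, n=10):
    """Streaming reducer: emit at most n events per user.

    Assumes each line is tab-separated (user_id, timestamp, event) and
    that lines arrive grouped by user_id and sorted by timestamp, as
    DISTRIBUTE BY / SORT BY would arrange. Only a single counter and
    the previous user_id are kept in memory.
    """
    prev_user, count = None, 0
    for line in lines:
        user_id, _ts, event = line.rstrip("\n").split("\t")
        if user_id != prev_user:
            prev_user, count = user_id, 0
        count += 1
        if count <= n:
            yield f"{user_id}\t{event}"

# In an actual Hive TRANSFORM script this would iterate over sys.stdin:
#     for out in first_n_per_user(sys.stdin):
#         print(out)
```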
Re: Help needed: Out of memory with windowing functions
Thank you very much for your answer, Ashutosh. It seems non-trivial indeed!

2014-08-20 17:51 GMT+02:00 Ashutosh Chauhan hashut...@apache.org:
> [...]