[jira] [Commented] (SPARK-17450) spark sql rownumber OOM

2016-11-17, cen yuhai (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-17450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15675639#comment-15675639 ]

cen yuhai commented on SPARK-17450:
---

I will upgrade to 2.x; please close this issue.

> spark sql rownumber OOM
> ---
>
> Key: SPARK-17450
> URL: https://issues.apache.org/jira/browse/SPARK-17450
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.2
> Reporter: cen yuhai
>
> Spark SQL OOMs when using row_number() over too many sorted records.
> There is only one task to handle all the records.
> This SQL groups by passenger_id; we have 100 million passengers.
> {code}
> SELECT
>   passenger_id,
>   total_order,
>   (CASE WHEN row_number() OVER (ORDER BY total_order DESC) BETWEEN 0 AND 670800 THEN 'V3' END) AS order_rank
> FROM
> (
>   SELECT
>     passenger_id,
>     1 AS total_order
>   FROM table
>   GROUP BY passenger_id
> ) dd1
> {code}
> {code}
> java.lang.OutOfMemoryError: Java heap space
> at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:536)
> at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:93)
> at org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.fetchNextPartition(Window.scala:278)
> at org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:304)
> at org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:246)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:512)
> at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686)
> at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
> at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:74)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:104)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:247)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> Physical plan:
> {code}
> == Physical Plan ==
> Project [passenger_id#7L,total_order#0,CASE WHEN ((_we0#20 >= 0) && (_we1#21 <= 670800)) THEN V3 AS order_rank#1]
> +- Window [passenger_id#7L,total_order#0], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFRowNumber() windowspecdefinition(total_order#0 DESC,ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we0#20,HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFRowNumber() windowspecdefinition(total_order#0 DESC,ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS _we1#21], [total_order#0 DESC]
>    +- Sort [total_order#0 DESC], false, 0
>       +- TungstenExchange SinglePartition, None
>          +- Project [passenger_id#7L,total_order#0]
>             +- TungstenAggregate(key=[passenger_id#7L], functions=[], output=[passenger_id#7L,total_order#0])
>                +- TungstenExchange hashpartitioning(passenger_id#7L,1000), None
>                   +- TungstenAggregate(key=[passenger_id#7L], functions=[], output=[passenger_id#7L])
>                      +- Project [passenger_id#7L]
>                         +- Filter product#9 IN (kuai,gulf)
>                            +- HiveTableScan [passenger_id#7L,product#9], MetastoreRelation pbs_dw, dwv_order_whole_day, None
> {code}
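
The CASE in the reported query only tests whether a row's global rank falls below a fixed cutoff (670800), so the global window can often be replaced by a precomputed threshold. A minimal sketch, not from the thread, assuming ties at the cutoff value may land on either side (row_number() breaks such ties arbitrarily anyway); dd1 stands for the grouped subquery from the report:

{code}
-- Sketch: compute the cutoff value once with ORDER BY ... LIMIT, then
-- compare per row, instead of numbering every row through a
-- single-partition window sort.
SELECT
  d.passenger_id,
  d.total_order,
  (CASE WHEN d.total_order >= t.cutoff THEN 'V3' END) AS order_rank
FROM dd1 d
CROSS JOIN (
  -- Smallest value among the top 670800 rows; serves as the rank threshold.
  SELECT min(total_order) AS cutoff
  FROM (
    SELECT total_order
    FROM dd1
    ORDER BY total_order DESC
    LIMIT 670800
  ) topn
) t
{code}

Spark can plan ORDER BY ... LIMIT as a per-partition top-K followed by a merge, so no single task has to buffer all 100 million rows.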




[jira] [Commented] (SPARK-17450) spark sql rownumber OOM

2016-11-16, Herman van Hovell (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-17450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15671411#comment-15671411 ]

Herman van Hovell commented on SPARK-17450:
---

[~cenyuhai], did you have any luck with merging these? Can we close this issue?






[jira] [Commented] (SPARK-17450) spark sql rownumber OOM

2016-09-13, Herman van Hovell (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-17450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15488005#comment-15488005 ]

Herman van Hovell commented on SPARK-17450:
---

https://github.com/apache/spark/pull/10605





[jira] [Commented] (SPARK-17450) spark sql rownumber OOM

2016-09-09, cen yuhai (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-17450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15479097#comment-15479097 ]

cen yuhai commented on SPARK-17450:
---

Can you point me to davies's PR?





[jira] [Commented] (SPARK-17450) spark sql rownumber OOM

2016-09-08, Herman van Hovell (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-17450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474517#comment-15474517 ]

Herman van Hovell commented on SPARK-17450:
---

You could try. You would also have to add the follow-up by davies (that adds the spilling logic).








[jira] [Commented] (SPARK-17450) spark sql rownumber OOM

2016-09-08, cen yuhai (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-17450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474430#comment-15474430 ]

cen yuhai commented on SPARK-17450:
---

Hi Herman, can I merge your PR for the native Spark window function? https://github.com/apache/spark/pull/9819






[jira] [Commented] (SPARK-17450) spark sql rownumber OOM

2016-09-08, Herman van Hovell (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-17450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474363#comment-15474363 ]

Herman van Hovell commented on SPARK-17450:
---

This is generally a bad idea: all your data is moved to a single executor, sorted on that executor, and finally processed on that executor. So I am curious to know what the use case is.

What makes things worse is that 1.6 keeps all records of a single partition in memory, causing the OOM in your case. In 2.0 we spill records to disk (above 4000 records) in order to prevent OOMs. Could you try this in 2.0?
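
To make the single-partition behavior concrete, here is a sketch (not from the thread): a window with ORDER BY but no PARTITION BY plans the TungstenExchange SinglePartition seen in the plan above, while adding a PARTITION BY clause distributes the work. The column city_id below is hypothetical, purely for illustration:

{code}
-- Global ordering: all rows are shuffled into one partition, and a single
-- task buffers and sorts everything (the OOM scenario reported here).
SELECT passenger_id,
       row_number() OVER (ORDER BY total_order DESC) AS rn
FROM dd1;

-- Partitioned ordering: rows are hash-partitioned by city_id (hypothetical),
-- so each task only holds one partition's rows in memory.
SELECT passenger_id,
       row_number() OVER (PARTITION BY city_id ORDER BY total_order DESC) AS rn
FROM dd1;
{code}

Note that row numbers then restart within each partition, so this only helps when a per-partition ranking is acceptable.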

