[jira] [Commented] (SPARK-16700) StructType doesn't accept Python dicts anymore

2016-08-03 Thread Sylvain Zimmer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406754#comment-15406754
 ] 

Sylvain Zimmer commented on SPARK-16700:


The verifySchema flag works great, and the {{dict}} issue seems to be fixed for 
me. Thanks a lot!!
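
For reference, a minimal sketch of the flag in use, assuming the {{verifySchema}} 
keyword argument is available on {{SQLContext.createDataFrame}} in the branch being 
tested; everything else mirrors the repro script quoted below:

{code}
from pyspark import SparkContext
from pyspark.sql import types as SparkTypes
from pyspark.sql import SQLContext

sc = SparkContext()
sqlc = SQLContext(sc)

struct_schema = SparkTypes.StructType([
    SparkTypes.StructField("id", SparkTypes.LongType())
])

# Rows given as Python dicts, exactly as in the original repro
rdd = sc.parallelize([{"id": 0}, {"id": 1}])

# With the fix the dicts are accepted again; passing verifySchema=False
# (assumed keyword) additionally skips the per-row type verification
df = sqlc.createDataFrame(rdd, struct_schema, verifySchema=False)
print df.collect()
{code}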

> StructType doesn't accept Python dicts anymore
> --
>
> Key: SPARK-16700
> URL: https://issues.apache.org/jira/browse/SPARK-16700
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.0.0
>Reporter: Sylvain Zimmer
>Assignee: Davies Liu
>
> Hello,
> I found this issue while testing my codebase with 2.0.0-rc5
> StructType in Spark 1.6.2 accepts the Python {{dict}} type, which is very 
> handy. 2.0.0-rc5 does not and throws an error.
> I don't know if this was intended but I'd advocate for this behaviour to 
> remain the same. MapType is probably wasteful when your key names never 
> change and switching to Python tuples would be cumbersome.
> Here is a minimal script to reproduce the issue: 
> {code}
> from pyspark import SparkContext
> from pyspark.sql import types as SparkTypes
> from pyspark.sql import SQLContext
> sc = SparkContext()
> sqlc = SQLContext(sc)
> struct_schema = SparkTypes.StructType([
> SparkTypes.StructField("id", SparkTypes.LongType())
> ])
> rdd = sc.parallelize([{"id": 0}, {"id": 1}])
> df = sqlc.createDataFrame(rdd, struct_schema)
> print df.collect()
> # 1.6.2 prints [Row(id=0), Row(id=1)]
> # 2.0.0-rc5 raises TypeError: StructType can not accept object {'id': 0} in 
> type <type 'dict'>
> {code}
> Thanks!






[jira] [Commented] (SPARK-16826) java.util.Hashtable limits the throughput of PARSE_URL()

2016-08-01 Thread Sylvain Zimmer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403262#comment-15403262
 ] 

Sylvain Zimmer commented on SPARK-16826:


[~srowen] what about this? 
https://github.com/sylvinus/spark/commit/98119a08368b1cd1faf3f25a32910ad6717c5c02

The tests seem to pass, and I don't think it uses the problematic code paths in 
java.net.URL (except for getFile, but that could probably be fixed easily).

> java.util.Hashtable limits the throughput of PARSE_URL()
> 
>
> Key: SPARK-16826
> URL: https://issues.apache.org/jira/browse/SPARK-16826
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Sylvain Zimmer
>
> Hello!
> I'm using {{c4.8xlarge}} instances on EC2 with 36 cores and doing lots of 
> {{parse_url(url, "host")}} in Spark SQL.
> Unfortunately it seems that there is an internal thread-safe cache in there, 
> and the instances end up being 90% idle.
> When I view the thread dump for my executors, most of the executor threads 
> are "BLOCKED", in that state:
> {code}
> java.util.Hashtable.get(Hashtable.java:362)
> java.net.URL.getURLStreamHandler(URL.java:1135)
> java.net.URL.<init>(URL.java:599)
> java.net.URL.<init>(URL.java:490)
> java.net.URL.<init>(URL.java:439)
> org.apache.spark.sql.catalyst.expressions.ParseUrl.getUrl(stringExpressions.scala:731)
> org.apache.spark.sql.catalyst.expressions.ParseUrl.parseUrlWithoutKey(stringExpressions.scala:772)
> org.apache.spark.sql.catalyst.expressions.ParseUrl.eval(stringExpressions.scala:785)
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown
>  Source)
> org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69)
> org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69)
> org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:203)
> org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:202)
> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> org.apache.spark.scheduler.Task.run(Task.scala:85)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> {code}
> However, when I switch from 1 executor with 36 cores to 9 executors with 4 
> cores, throughput is almost 10x higher and the CPUs are back at ~100% use.
> Thanks!






[jira] [Commented] (SPARK-16826) java.util.Hashtable limits the throughput of PARSE_URL()

2016-08-01 Thread Sylvain Zimmer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403236#comment-15403236
 ] 

Sylvain Zimmer commented on SPARK-16826:


Sorry I can't be more helpful on the Java side... but I think there must already 
be some high-quality URL-parsing code somewhere in the Apache Foundation :-)

> java.util.Hashtable limits the throughput of PARSE_URL()
> 
>
> Key: SPARK-16826
> URL: https://issues.apache.org/jira/browse/SPARK-16826
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Sylvain Zimmer
>
> Hello!
> I'm using {{c4.8xlarge}} instances on EC2 with 36 cores and doing lots of 
> {{parse_url(url, "host")}} in Spark SQL.
> Unfortunately it seems that there is an internal thread-safe cache in there, 
> and the instances end up being 90% idle.
> When I view the thread dump for my executors, most of the executor threads 
> are "BLOCKED", in that state:
> {code}
> java.util.Hashtable.get(Hashtable.java:362)
> java.net.URL.getURLStreamHandler(URL.java:1135)
> java.net.URL.<init>(URL.java:599)
> java.net.URL.<init>(URL.java:490)
> java.net.URL.<init>(URL.java:439)
> org.apache.spark.sql.catalyst.expressions.ParseUrl.getUrl(stringExpressions.scala:731)
> org.apache.spark.sql.catalyst.expressions.ParseUrl.parseUrlWithoutKey(stringExpressions.scala:772)
> org.apache.spark.sql.catalyst.expressions.ParseUrl.eval(stringExpressions.scala:785)
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown
>  Source)
> org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69)
> org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69)
> org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:203)
> org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:202)
> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> org.apache.spark.scheduler.Task.run(Task.scala:85)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> {code}
> However, when I switch from 1 executor with 36 cores to 9 executors with 4 
> cores, throughput is almost 10x higher and the CPUs are back at ~100% use.
> Thanks!






[jira] [Comment Edited] (SPARK-16826) java.util.Hashtable limits the throughput of PARSE_URL()

2016-08-01 Thread Sylvain Zimmer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403165#comment-15403165
 ] 

Sylvain Zimmer edited comment on SPARK-16826 at 8/2/16 1:15 AM:


[~srowen] thanks for the pointers! 

I'm parsing every hyperlink found in Common Crawl, so there are billions of 
unique ones, no way around it.

Wouldn't it be possible to switch to another implementation with an API similar 
to java.net.URL? As I understand it we never need the URLStreamHandler in the 
first place anyway?

I'm not a Java expert but what about {{java.net.URI}} or 
{{org.apache.catalina.util.URL}} for instance?



was (Author: sylvinus):
[~srowen] thanks for the pointers! 

I'm parsing every hyperlink found in Common Crawl, so there are billions of 
unique ones, no way around it.

Wouldn't it be possible to switch to another implementation with an API similar 
to java.net.URL? As I understand it we never need the URLStreamHandler in the 
first place anyway?

I'm not a Java expert but what about {java.net.URI} or 
{org.apache.catalina.util.URL} for instance?


> java.util.Hashtable limits the throughput of PARSE_URL()
> 
>
> Key: SPARK-16826
> URL: https://issues.apache.org/jira/browse/SPARK-16826
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Sylvain Zimmer
>
> Hello!
> I'm using {{c4.8xlarge}} instances on EC2 with 36 cores and doing lots of 
> {{parse_url(url, "host")}} in Spark SQL.
> Unfortunately it seems that there is an internal thread-safe cache in there, 
> and the instances end up being 90% idle.
> When I view the thread dump for my executors, most of the executor threads 
> are "BLOCKED", in that state:
> {code}
> java.util.Hashtable.get(Hashtable.java:362)
> java.net.URL.getURLStreamHandler(URL.java:1135)
> java.net.URL.<init>(URL.java:599)
> java.net.URL.<init>(URL.java:490)
> java.net.URL.<init>(URL.java:439)
> org.apache.spark.sql.catalyst.expressions.ParseUrl.getUrl(stringExpressions.scala:731)
> org.apache.spark.sql.catalyst.expressions.ParseUrl.parseUrlWithoutKey(stringExpressions.scala:772)
> org.apache.spark.sql.catalyst.expressions.ParseUrl.eval(stringExpressions.scala:785)
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown
>  Source)
> org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69)
> org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69)
> org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:203)
> org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:202)
> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> org.apache.spark.scheduler.Task.run(Task.scala:85)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> {code}
> However, when I switch from 1 executor with 36 cores to 9 executors with 4 
> cores, throughput is almost 10x higher and the CPUs are back at ~100% use.
> Thanks!






[jira] [Commented] (SPARK-16826) java.util.Hashtable limits the throughput of PARSE_URL()

2016-08-01 Thread Sylvain Zimmer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15403165#comment-15403165
 ] 

Sylvain Zimmer commented on SPARK-16826:


[~srowen] thanks for the pointers! 

I'm parsing every hyperlink found in Common Crawl, so there are billions of 
unique ones, no way around it.

Wouldn't it be possible to switch to another implementation with an API similar 
to java.net.URL? As I understand it we never need the URLStreamHandler in the 
first place anyway?

I'm not a Java expert but what about {java.net.URI} or 
{org.apache.catalina.util.URL} for instance?


> java.util.Hashtable limits the throughput of PARSE_URL()
> 
>
> Key: SPARK-16826
> URL: https://issues.apache.org/jira/browse/SPARK-16826
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Sylvain Zimmer
>
> Hello!
> I'm using {{c4.8xlarge}} instances on EC2 with 36 cores and doing lots of 
> {{parse_url(url, "host")}} in Spark SQL.
> Unfortunately it seems that there is an internal thread-safe cache in there, 
> and the instances end up being 90% idle.
> When I view the thread dump for my executors, most of the executor threads 
> are "BLOCKED", in that state:
> {code}
> java.util.Hashtable.get(Hashtable.java:362)
> java.net.URL.getURLStreamHandler(URL.java:1135)
> java.net.URL.<init>(URL.java:599)
> java.net.URL.<init>(URL.java:490)
> java.net.URL.<init>(URL.java:439)
> org.apache.spark.sql.catalyst.expressions.ParseUrl.getUrl(stringExpressions.scala:731)
> org.apache.spark.sql.catalyst.expressions.ParseUrl.parseUrlWithoutKey(stringExpressions.scala:772)
> org.apache.spark.sql.catalyst.expressions.ParseUrl.eval(stringExpressions.scala:785)
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown
>  Source)
> org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69)
> org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69)
> org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:203)
> org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:202)
> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> org.apache.spark.scheduler.Task.run(Task.scala:85)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> {code}
> However, when I switch from 1 executor with 36 cores to 9 executors with 4 
> cores, throughput is almost 10x higher and the CPUs are back at ~100% use.
> Thanks!






[jira] [Updated] (SPARK-16826) java.util.Hashtable limits the throughput of PARSE_URL()

2016-07-31 Thread Sylvain Zimmer (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sylvain Zimmer updated SPARK-16826:
---
Description: 
Hello!

I'm using {{c4.8xlarge}} instances on EC2 with 36 cores and doing lots of 
{{parse_url(url, "host")}} in Spark SQL.

Unfortunately it seems that there is an internal thread-safe cache in there, 
and the instances end up being 90% idle.

When I view the thread dump for my executors, most of the executor threads are 
"BLOCKED", in that state:
{code}
java.util.Hashtable.get(Hashtable.java:362)
java.net.URL.getURLStreamHandler(URL.java:1135)
java.net.URL.<init>(URL.java:599)
java.net.URL.<init>(URL.java:490)
java.net.URL.<init>(URL.java:439)
org.apache.spark.sql.catalyst.expressions.ParseUrl.getUrl(stringExpressions.scala:731)
org.apache.spark.sql.catalyst.expressions.ParseUrl.parseUrlWithoutKey(stringExpressions.scala:772)
org.apache.spark.sql.catalyst.expressions.ParseUrl.eval(stringExpressions.scala:785)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown
 Source)
org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69)
org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69)
org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:203)
org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:202)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
org.apache.spark.scheduler.Task.run(Task.scala:85)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
{code}

However, when I switch from 1 executor with 36 cores to 9 executors with 4 
cores, throughput is almost 10x higher and the CPUs are back at ~100% use.

Thanks!

  was:
Hello!

I'm using {{c4.8xlarge}} instances on EC2 with 36 cores and doing lots of 
{{parse_url(url, "host")}} in Spark SQL.

Unfortunately it seems that there is an internal thread-safe cache in there, 
and the instances end up being 90% idle.

When I view the thread dump for my executors, most of the 36 cores are in 
status "BLOCKED", in that state:
{code}
java.util.Hashtable.get(Hashtable.java:362)
java.net.URL.getURLStreamHandler(URL.java:1135)
java.net.URL.<init>(URL.java:599)
java.net.URL.<init>(URL.java:490)
java.net.URL.<init>(URL.java:439)
org.apache.spark.sql.catalyst.expressions.ParseUrl.getUrl(stringExpressions.scala:731)
org.apache.spark.sql.catalyst.expressions.ParseUrl.parseUrlWithoutKey(stringExpressions.scala:772)
org.apache.spark.sql.catalyst.expressions.ParseUrl.eval(stringExpressions.scala:785)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown
 Source)
org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69)
org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69)
org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:203)
org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:202)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
org.apache.spark.scheduler.Task.run(Task.scala:85)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)

[jira] [Updated] (SPARK-16826) java.util.Hashtable limits the throughput of PARSE_URL()

2016-07-31 Thread Sylvain Zimmer (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sylvain Zimmer updated SPARK-16826:
---
Description: 
Hello!

I'm using {{c4.8xlarge}} instances on EC2 with 36 cores and doing lots of 
{{parse_url(url, "host")}} in Spark SQL.

Unfortunately it seems that there is an internal thread-safe cache in there, 
and the instances end up being 90% idle.

When I view the thread dump for my executors, most of the 36 cores are in 
status "BLOCKED", in that state:
{code}
java.util.Hashtable.get(Hashtable.java:362)
java.net.URL.getURLStreamHandler(URL.java:1135)
java.net.URL.<init>(URL.java:599)
java.net.URL.<init>(URL.java:490)
java.net.URL.<init>(URL.java:439)
org.apache.spark.sql.catalyst.expressions.ParseUrl.getUrl(stringExpressions.scala:731)
org.apache.spark.sql.catalyst.expressions.ParseUrl.parseUrlWithoutKey(stringExpressions.scala:772)
org.apache.spark.sql.catalyst.expressions.ParseUrl.eval(stringExpressions.scala:785)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown
 Source)
org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69)
org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69)
org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:203)
org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:202)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
org.apache.spark.scheduler.Task.run(Task.scala:85)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
{code}

However, when I switch from 1 executor with 36 cores to 9 executors with 4 
cores, throughput is almost 10x higher and the CPUs are back at ~100% use.

Thanks!

  was:
Hello!

I'm using {{c4.8xlarge}} instances on EC2 with 36 cores and doing lots of 
{{parse_url(url, "host")}} in Spark SQL.

Unfortunately it seems that there is an internal thread-safe cache in there, 
and the instances end up being 90% idle.

When I view the thread dump for my executors, most of the 36 cores are in 
status "BLOCKED", in that stage:
{code}
java.util.Hashtable.get(Hashtable.java:362)
java.net.URL.getURLStreamHandler(URL.java:1135)
java.net.URL.<init>(URL.java:599)
java.net.URL.<init>(URL.java:490)
java.net.URL.<init>(URL.java:439)
org.apache.spark.sql.catalyst.expressions.ParseUrl.getUrl(stringExpressions.scala:731)
org.apache.spark.sql.catalyst.expressions.ParseUrl.parseUrlWithoutKey(stringExpressions.scala:772)
org.apache.spark.sql.catalyst.expressions.ParseUrl.eval(stringExpressions.scala:785)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown
 Source)
org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69)
org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69)
org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:203)
org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:202)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
org.apache.spark.scheduler.Task.run(Task.scala:85)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)

[jira] [Updated] (SPARK-16826) java.util.Hashtable limits the throughput of PARSE_URL()

2016-07-31 Thread Sylvain Zimmer (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sylvain Zimmer updated SPARK-16826:
---
Description: 
Hello!

I'm using {{c4.8xlarge}} instances on EC2 with 36 cores and doing lots of 
{{parse_url(url, "host")}} in Spark SQL.

Unfortunately it seems that there is an internal thread-safe cache in there, 
and the instances end up being 90% idle.

When I view the thread dump for my executors, most of the 36 cores are in 
status "BLOCKED", in that stage:
{code}
java.util.Hashtable.get(Hashtable.java:362)
java.net.URL.getURLStreamHandler(URL.java:1135)
java.net.URL.<init>(URL.java:599)
java.net.URL.<init>(URL.java:490)
java.net.URL.<init>(URL.java:439)
org.apache.spark.sql.catalyst.expressions.ParseUrl.getUrl(stringExpressions.scala:731)
org.apache.spark.sql.catalyst.expressions.ParseUrl.parseUrlWithoutKey(stringExpressions.scala:772)
org.apache.spark.sql.catalyst.expressions.ParseUrl.eval(stringExpressions.scala:785)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown
 Source)
org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69)
org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69)
org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:203)
org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:202)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
org.apache.spark.scheduler.Task.run(Task.scala:85)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
{code}

However, when I switch from 1 executor with 36 cores to 9 executors with 4 
cores, throughput is almost 10x higher and the CPUs are back at ~100% use.

Thanks!

  was:
Hello!

I'm using {c4.8xlarge} instances on EC2 with 36 cores and doing lots of 
{parse_url(url, "host")} in Spark SQL.

Unfortunately it seems that there is an internal thread-safe cache in there, 
and the instances end up being 90% idle.

When I view the thread dump for my executors, most of the 36 cores are in 
status "BLOCKED", in that stage:
{code}
java.util.Hashtable.get(Hashtable.java:362)
java.net.URL.getURLStreamHandler(URL.java:1135)
java.net.URL.<init>(URL.java:599)
java.net.URL.<init>(URL.java:490)
java.net.URL.<init>(URL.java:439)
org.apache.spark.sql.catalyst.expressions.ParseUrl.getUrl(stringExpressions.scala:731)
org.apache.spark.sql.catalyst.expressions.ParseUrl.parseUrlWithoutKey(stringExpressions.scala:772)
org.apache.spark.sql.catalyst.expressions.ParseUrl.eval(stringExpressions.scala:785)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown
 Source)
org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69)
org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69)
org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:203)
org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:202)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
org.apache.spark.scheduler.Task.run(Task.scala:85)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)

[jira] [Created] (SPARK-16826) java.util.Hashtable limits the throughput of PARSE_URL()

2016-07-31 Thread Sylvain Zimmer (JIRA)
Sylvain Zimmer created SPARK-16826:
--

 Summary: java.util.Hashtable limits the throughput of PARSE_URL()
 Key: SPARK-16826
 URL: https://issues.apache.org/jira/browse/SPARK-16826
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.0.0
Reporter: Sylvain Zimmer


Hello!

I'm using {c4.8xlarge} instances on EC2 with 36 cores and doing lots of 
{parse_url(url, "host")} in Spark SQL.

Unfortunately it seems that there is an internal thread-safe cache in there, 
and the instances end up being 90% idle.

When I view the thread dump for my executors, most of the 36 cores are in 
status "BLOCKED", in that stage:
{code}
java.util.Hashtable.get(Hashtable.java:362)
java.net.URL.getURLStreamHandler(URL.java:1135)
java.net.URL.<init>(URL.java:599)
java.net.URL.<init>(URL.java:490)
java.net.URL.<init>(URL.java:439)
org.apache.spark.sql.catalyst.expressions.ParseUrl.getUrl(stringExpressions.scala:731)
org.apache.spark.sql.catalyst.expressions.ParseUrl.parseUrlWithoutKey(stringExpressions.scala:772)
org.apache.spark.sql.catalyst.expressions.ParseUrl.eval(stringExpressions.scala:785)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown
 Source)
org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69)
org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69)
org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:203)
org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:202)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
org.apache.spark.scheduler.Task.run(Task.scala:85)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
{code}

However, when I switch from 1 executor with 36 cores to 9 executors with 4 
cores, throughput is almost 10x higher and the CPUs are back at ~100% use.

Thanks!
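
For reference, a user-level sketch of one way to sidestep the contended 
{{java.net.URL}} path entirely: do the host extraction in a Python UDF instead of 
{{parse_url}}. This is not from the original report; it assumes Python 2 (the 
{{urlparse}} module), matching the scripts elsewhere in this thread, and it trades 
lock contention for Python UDF serialization overhead, so it is only a stopgap.

{code}
from urlparse import urlparse

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql import types as SparkTypes

sc = SparkContext()
sqlc = SQLContext(sc)

# Sample data for illustration only
df = sqlc.createDataFrame(
    [("http://example.org/a",), ("https://spark.apache.org/docs/latest/",)],
    ["url"])

def extract_host(url):
    # Pure-Python parsing: never constructs a java.net.URL, so the shared
    # URLStreamHandler Hashtable is never touched
    try:
        return urlparse(url).netloc or None
    except Exception:
        return None

host_udf = F.udf(extract_host, SparkTypes.StringType())

print df.withColumn("host", host_udf(df["url"])).collect()
{code}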






[jira] [Commented] (SPARK-16802) joins.LongToUnsafeRowMap crashes with ArrayIndexOutOfBoundsException

2016-07-29 Thread Sylvain Zimmer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15400260#comment-15400260
 ] 

Sylvain Zimmer commented on SPARK-16802:


Maybe useful for others: an ugly workaround to avoid this code path is to cast 
the join keys to strings and do something like this instead:

{code}
SELECT
df1.id1
FROM df1
LEFT OUTER JOIN df2 ON cast(df1.id1 as string) = cast(df2.id2 as string)
{code}
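
Concretely, in the repro script quoted below this just means swapping the join 
condition inside the {{sqlc.sql}} call, e.g.:

{code}
result_df = sqlc.sql("""
    SELECT
        df1.id1
    FROM df1
    LEFT OUTER JOIN df2 ON cast(df1.id1 as string) = cast(df2.id2 as string)
""")

print result_df.collect()
{code}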

> joins.LongToUnsafeRowMap crashes with ArrayIndexOutOfBoundsException
> 
>
> Key: SPARK-16802
> URL: https://issues.apache.org/jira/browse/SPARK-16802
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Sylvain Zimmer
>
> Hello!
> This is a little similar to 
> [SPARK-16740|https://issues.apache.org/jira/browse/SPARK-16740] (should I 
> have reopened it?).
> I would recommend giving another full review to {{HashedRelation.scala}}, 
> particularly the new {{LongToUnsafeRowMap}} code. I've had a few other errors 
> that I haven't managed to reproduce so far, as well as what I suspect could 
> be memory leaks (I have a query in a loop OOMing after a few iterations 
> despite not caching its results).
> Here is the script to reproduce the ArrayIndexOutOfBoundsException on the 
> current 2.0 branch:
> {code}
> import os
> import random
> from pyspark import SparkContext
> from pyspark.sql import types as SparkTypes
> from pyspark.sql import SQLContext
> sc = SparkContext()
> sqlc = SQLContext(sc)
> schema1 = SparkTypes.StructType([
> SparkTypes.StructField("id1", SparkTypes.LongType(), nullable=True)
> ])
> schema2 = SparkTypes.StructType([
> SparkTypes.StructField("id2", SparkTypes.LongType(), nullable=True)
> ])
> def randlong():
> return random.randint(-9223372036854775808, 9223372036854775807)
> while True:
> l1, l2 = randlong(), randlong()
> # Sample values that crash:
> # l1, l2 = 4661454128115150227, -5543241376386463808
> print "Testing with %s, %s" % (l1, l2)
> data1 = [(l1, ), (l2, )]
> data2 = [(l1, )]
> df1 = sqlc.createDataFrame(sc.parallelize(data1), schema1)
> df2 = sqlc.createDataFrame(sc.parallelize(data2), schema2)
> crash = True
> if crash:
> os.system("rm -rf /tmp/sparkbug")
> df1.write.parquet("/tmp/sparkbug/vertex")
> df2.write.parquet("/tmp/sparkbug/edge")
> df1 = sqlc.read.load("/tmp/sparkbug/vertex")
> df2 = sqlc.read.load("/tmp/sparkbug/edge")
> sqlc.registerDataFrameAsTable(df1, "df1")
> sqlc.registerDataFrameAsTable(df2, "df2")
> result_df = sqlc.sql("""
> SELECT
> df1.id1
> FROM df1
> LEFT OUTER JOIN df2 ON df1.id1 = df2.id2
> """)
> print result_df.collect()
> {code}
> {code}
> java.lang.ArrayIndexOutOfBoundsException: 1728150825
>   at 
> org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.getValue(HashedRelation.scala:463)
>   at 
> org.apache.spark.sql.execution.joins.LongHashedRelation.getValue(HashedRelation.scala:762)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at 
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at 
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
>   at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
>   at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
>   at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
>   at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
>   at 
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:112)
>   at 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
>   at 
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:112)
>   at 
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
>   at 
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:112)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:899)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:899)
>   at 
> 

[jira] [Updated] (SPARK-16807) Optimize some ABS() statements

2016-07-29 Thread Sylvain Zimmer (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sylvain Zimmer updated SPARK-16807:
---
Priority: Minor  (was: Major)

> Optimize some ABS() statements
> --
>
> Key: SPARK-16807
> URL: https://issues.apache.org/jira/browse/SPARK-16807
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Sylvain Zimmer
>Priority: Minor
>
> I'm not a Catalyst expert, but I think some use cases for the ABS() function 
> could generate simpler code.
> This is the code generated when doing something like {{ABS(x - y) > 0}} or 
> {{ABS(x - y) = 0}} in Spark SQL:
> {code}
> /* 267 */   float filter_value6 = -1.0f;
> /* 268 */   filter_value6 = agg_value27 - agg_value32;
> /* 269 */   float filter_value5 = -1.0f;
> /* 270 */   filter_value5 = (float)(java.lang.Math.abs(filter_value6));
> /* 271 */
> /* 272 */   boolean filter_value4 = false;
> /* 273 */   filter_value4 = 
> org.apache.spark.util.Utils.nanSafeCompareFloats(filter_value5, 0.0f) > 0;
> /* 274 */   if (!filter_value4) continue;
> {code}
> Maybe it could all be simplified to something like this?
> {code}
> filter_value4 = (agg_value27 != agg_value32)
> {code}
> (Of course you could write {{x != y}} directly in the SQL query, but the 
> {{0}} in my example could be a configurable threshold, not something you can 
> hardcode)






[jira] [Updated] (SPARK-16807) Optimize some ABS() statements

2016-07-29 Thread Sylvain Zimmer (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sylvain Zimmer updated SPARK-16807:
---
Description: 
I'm not a Catalyst expert, but I think some use cases for the ABS() function 
could generate simpler code.

This is the code generated when doing something like {{ABS(x - y) > 0}} or 
{{ABS(x - y) = 0}} in Spark SQL:

{code}
/* 267 */   float filter_value6 = -1.0f;
/* 268 */   filter_value6 = agg_value27 - agg_value32;
/* 269 */   float filter_value5 = -1.0f;
/* 270 */   filter_value5 = (float)(java.lang.Math.abs(filter_value6));
/* 271 */
/* 272 */   boolean filter_value4 = false;
/* 273 */   filter_value4 = 
org.apache.spark.util.Utils.nanSafeCompareFloats(filter_value5, 0.0f) > 0;
/* 274 */   if (!filter_value4) continue;
{code}

Maybe it could all be simplified to something like this?
{code}
filter_value4 = (agg_value27 != agg_value32)
{code}

(Of course you could write {{x != y}} directly in the SQL query, but the {{0}} 
in my example could be a configurable threshold, not something you can hardcode)

  was:
I'm not a Catalyst expert, but I think some use cases for the ABS() function 
could generate simpler code.

This is the code generated when doing something like {{ABS(x - y) > 0}} or 
{{ABS(x - y) = 0}} in Spark SQL:

{code}
/* 267 */   float filter_value6 = -1.0f;
/* 268 */   filter_value6 = agg_value27 - agg_value32;
/* 269 */   float filter_value5 = -1.0f;
/* 270 */   filter_value5 = (float)(java.lang.Math.abs(filter_value6));
/* 271 */
/* 272 */   boolean filter_value4 = false;
/* 273 */   filter_value4 = 
org.apache.spark.util.Utils.nanSafeCompareFloats(filter_value5, 0.0f) > 0;
/* 274 */   if (!filter_value4) continue;
{code}

Maybe it could all be simplified to something like this?
{code}
filter_value4 = (agg_value27 != agg_value32)
{code}


> Optimize some ABS() statements
> --
>
> Key: SPARK-16807
> URL: https://issues.apache.org/jira/browse/SPARK-16807
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Sylvain Zimmer
>
> I'm not a Catalyst expert, but I think some use cases for the ABS() function 
> could generate simpler code.
> This is the code generated when doing something like {{ABS(x - y) > 0}} or 
> {{ABS(x - y) = 0}} in Spark SQL:
> {code}
> /* 267 */   float filter_value6 = -1.0f;
> /* 268 */   filter_value6 = agg_value27 - agg_value32;
> /* 269 */   float filter_value5 = -1.0f;
> /* 270 */   filter_value5 = (float)(java.lang.Math.abs(filter_value6));
> /* 271 */
> /* 272 */   boolean filter_value4 = false;
> /* 273 */   filter_value4 = 
> org.apache.spark.util.Utils.nanSafeCompareFloats(filter_value5, 0.0f) > 0;
> /* 274 */   if (!filter_value4) continue;
> {code}
> Maybe it could all be simplified to something like this?
> {code}
> filter_value4 = (agg_value27 != agg_value32)
> {code}
> (Of course you could write {{x != y}} directly in the SQL query, but the 
> {{0}} in my example could be a configurable threshold, not something you can 
> hardcode)






[jira] [Created] (SPARK-16807) Optimize some ABS() statements

2016-07-29 Thread Sylvain Zimmer (JIRA)
Sylvain Zimmer created SPARK-16807:
--

 Summary: Optimize some ABS() statements
 Key: SPARK-16807
 URL: https://issues.apache.org/jira/browse/SPARK-16807
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Reporter: Sylvain Zimmer


I'm not a Catalyst expert, but I think some use cases for the ABS() function 
could generate simpler code.

This is the code generated when doing something like {{ABS(x - y) > 0}} or 
{{ABS(x - y) = 0}} in Spark SQL:

{code}
/* 267 */   float filter_value6 = -1.0f;
/* 268 */   filter_value6 = agg_value27 - agg_value32;
/* 269 */   float filter_value5 = -1.0f;
/* 270 */   filter_value5 = (float)(java.lang.Math.abs(filter_value6));
/* 271 */
/* 272 */   boolean filter_value4 = false;
/* 273 */   filter_value4 = 
org.apache.spark.util.Utils.nanSafeCompareFloats(filter_value5, 0.0f) > 0;
/* 274 */   if (!filter_value4) continue;
{code}

Maybe it could all be simplified to something like this?
{code}
filter_value4 = (agg_value27 != agg_value32)
{code}
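
A quick PySpark sanity check that the two formulations agree on ordinary (non-NaN) 
float values; a sketch only, reusing the {{SQLContext}} setup style from the other 
scripts in this thread:

{code}
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sqlc = SQLContext(sc)

df = sqlc.createDataFrame([(1.0, 1.0), (1.0, 2.0), (3.5, 3.25)], ["x", "y"])
sqlc.registerDataFrameAsTable(df, "t")

# Formulation from the generated code above: ABS(x - y) > 0
print sqlc.sql("SELECT x, y FROM t WHERE ABS(x - y) > 0").collect()

# Hand-written equivalent when the threshold is exactly 0: x != y
print sqlc.sql("SELECT x, y FROM t WHERE x != y").collect()
{code}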






[jira] [Updated] (SPARK-16802) joins.LongToUnsafeRowMap crashes with ArrayIndexOutOfBoundsException

2016-07-29 Thread Sylvain Zimmer (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sylvain Zimmer updated SPARK-16802:
---
Description: 
Hello!

This is a little similar to 
[SPARK-16740|https://issues.apache.org/jira/browse/SPARK-16740] (should I have 
reopened it?).

I would recommend giving another full review to {{HashedRelation.scala}}, 
particularly the new {{LongToUnsafeRowMap}} code. I've had a few other errors 
that I haven't managed to reproduce so far, as well as what I suspect could be 
memory leaks (I have a query in a loop OOMing after a few iterations despite 
not caching its results).

Here is the script to reproduce the ArrayIndexOutOfBoundsException on the 
current 2.0 branch:

{code}
import os
import random

from pyspark import SparkContext
from pyspark.sql import types as SparkTypes
from pyspark.sql import SQLContext

sc = SparkContext()
sqlc = SQLContext(sc)

schema1 = SparkTypes.StructType([
SparkTypes.StructField("id1", SparkTypes.LongType(), nullable=True)
])
schema2 = SparkTypes.StructType([
SparkTypes.StructField("id2", SparkTypes.LongType(), nullable=True)
])


def randlong():
return random.randint(-9223372036854775808, 9223372036854775807)

while True:
l1, l2 = randlong(), randlong()

# Sample values that crash:
# l1, l2 = 4661454128115150227, -5543241376386463808

print "Testing with %s, %s" % (l1, l2)
data1 = [(l1, ), (l2, )]
data2 = [(l1, )]

df1 = sqlc.createDataFrame(sc.parallelize(data1), schema1)
df2 = sqlc.createDataFrame(sc.parallelize(data2), schema2)

crash = True
if crash:
os.system("rm -rf /tmp/sparkbug")
df1.write.parquet("/tmp/sparkbug/vertex")
df2.write.parquet("/tmp/sparkbug/edge")

df1 = sqlc.read.load("/tmp/sparkbug/vertex")
df2 = sqlc.read.load("/tmp/sparkbug/edge")

sqlc.registerDataFrameAsTable(df1, "df1")
sqlc.registerDataFrameAsTable(df2, "df2")

result_df = sqlc.sql("""
SELECT
df1.id1
FROM df1
LEFT OUTER JOIN df2 ON df1.id1 = df2.id2
""")

print result_df.collect()
{code}

{code}
java.lang.ArrayIndexOutOfBoundsException: 1728150825
at 
org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.getValue(HashedRelation.scala:463)
at 
org.apache.spark.sql.execution.joins.LongHashedRelation.getValue(HashedRelation.scala:762)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:112)
at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:112)
at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:112)
at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:899)
at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:899)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1898)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1898)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/07/29 20:19:00 WARN TaskSetManager: Lost task 0.0 in stage 17.0 (TID 50, 
localhost): java.lang.ArrayIndexOutOfBoundsException: 1728150825
at 
org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.getValue(HashedRelation.scala:463)
   

[jira] [Updated] (SPARK-16802) joins.LongToUnsafeRowMap crashes with ArrayIndexOutOfBoundsException

2016-07-29 Thread Sylvain Zimmer (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sylvain Zimmer updated SPARK-16802:
---
Description: 
Hello!

This is a little similar to 
[SPARK-16740|https://issues.apache.org/jira/browse/SPARK-16740] (should I have 
reopened it?).

I would recommend giving another full review to {{HashedRelation.scala}}, 
particularly the new {{LongToUnsafeRowMap}} code. I've had a few other errors 
that I haven't managed to reproduce so far, as well as what I suspect could be 
memory leaks (I have a query in a loop OOMing after a few iterations despite 
not caching its results).

Here is the script to reproduce the ArrayIndexOutOfBoundsException on the 
current 2.0 branch:

{code}
import os
import random

from pyspark import SparkContext
from pyspark.sql import types as SparkTypes
from pyspark.sql import SQLContext

sc = SparkContext()
sqlc = SQLContext(sc)

schema1 = SparkTypes.StructType([
SparkTypes.StructField("id1", SparkTypes.LongType(), nullable=True),
# SparkTypes.StructField("weight1", SparkTypes.FloatType(), nullable=True)
])
schema2 = SparkTypes.StructType([
SparkTypes.StructField("id2", SparkTypes.LongType(), nullable=True),
# SparkTypes.StructField("weight2", SparkTypes.LongType(), nullable=True)
])


def randlong():
return random.randint(-9223372036854775808, 9223372036854775807)

while True:
l1, l2 = randlong(), randlong()

# Sample values that crash:
# l1, l2 = 4661454128115150227, -5543241376386463808

print "Testing with %s, %s" % (l1, l2)
data1 = [(l1, ), (l2, )]
data2 = [(l1, )]

df1 = sqlc.createDataFrame(sc.parallelize(data1), schema1)
df2 = sqlc.createDataFrame(sc.parallelize(data2), schema2)

crash = True
if crash:
os.system("rm -rf /tmp/sparkbug")
df1.write.parquet("/tmp/sparkbug/vertex")
df2.write.parquet("/tmp/sparkbug/edge")

df1 = sqlc.read.load("/tmp/sparkbug/vertex")
df2 = sqlc.read.load("/tmp/sparkbug/edge")

sqlc.registerDataFrameAsTable(df1, "df1")
sqlc.registerDataFrameAsTable(df2, "df2")

result_df = sqlc.sql("""
SELECT
df1.id1
FROM df1
LEFT OUTER JOIN df2 ON df1.id1 = df2.id2
""")

print result_df.collect()
{code}

{code}
java.lang.ArrayIndexOutOfBoundsException: 1728150825
at 
org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.getValue(HashedRelation.scala:463)
at 
org.apache.spark.sql.execution.joins.LongHashedRelation.getValue(HashedRelation.scala:762)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:112)
at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:112)
at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:112)
at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:899)
at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:899)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1898)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1898)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/07/29 20:19:00 WARN TaskSetManager: Lost task 0.0 in stage 17.0 (TID 50, 
localhost): 

[jira] [Created] (SPARK-16802) joins.LongToUnsafeRowMap crashes with ArrayIndexOutOfBoundsException

2016-07-29 Thread Sylvain Zimmer (JIRA)
Sylvain Zimmer created SPARK-16802:
--

 Summary: joins.LongToUnsafeRowMap crashes with 
ArrayIndexOutOfBoundsException
 Key: SPARK-16802
 URL: https://issues.apache.org/jira/browse/SPARK-16802
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.0.0, 2.0.1
Reporter: Sylvain Zimmer


Hello!

This is a little similar to 
[SPARK-16740|https://issues.apache.org/jira/browse/SPARK-16740] (should I have 
reopened it?).

I would recommend giving another full review to {{HashedRelation.scala}}, 
particularly the new {{LongToUnsafeRowMap}} code. I've had a few other errors 
that I haven't managed to reproduce so far, as well as what I suspect could be 
memory leaks (I have a query in a loop OOMing after a few iterations despite 
not caching its results).

Here is the script to reproduce the ArrayIndexOutOfBoundsException on the 
current 2.0 branch:

{code}
import os
import random

from pyspark import SparkContext
from pyspark.sql import types as SparkTypes
from pyspark.sql import SQLContext

sc = SparkContext()
sqlc = SQLContext(sc)

schema1 = SparkTypes.StructType([
    SparkTypes.StructField("id1", SparkTypes.LongType(), nullable=True),
    # SparkTypes.StructField("weight1", SparkTypes.FloatType(), nullable=True)
])
schema2 = SparkTypes.StructType([
    SparkTypes.StructField("id2", SparkTypes.LongType(), nullable=True),
    # SparkTypes.StructField("weight2", SparkTypes.LongType(), nullable=True)
])


def randlong():
    return random.randint(-9223372036854775808, 9223372036854775807)

while True:
    l1, l2 = randlong(), randlong()

    # Sample values that crash:
    # l1, l2 = 4661454128115150227, -5543241376386463808

    print "Testing with %s, %s" % (l1, l2)
    data1 = [(l1, ), (l2, )]
    data2 = [(l1, )]

    df1 = sqlc.createDataFrame(sc.parallelize(data1), schema1)
    df2 = sqlc.createDataFrame(sc.parallelize(data2), schema2)

    crash = True
    if crash:
        os.system("rm -rf /tmp/sparkbug")
        df1.write.parquet("/tmp/sparkbug/vertex")
        df2.write.parquet("/tmp/sparkbug/edge")

        df1 = sqlc.read.load("/tmp/sparkbug/vertex")
        df2 = sqlc.read.load("/tmp/sparkbug/edge")

    sqlc.registerDataFrameAsTable(df1, "df1")
    sqlc.registerDataFrameAsTable(df2, "df2")

    result_df = sqlc.sql("""
        SELECT
            df1.id1
        FROM df1
        LEFT OUTER JOIN df2 ON df1.id1 = df2.id2
    """)

    print result_df.collect()
{code}

{code}
java.lang.ArrayIndexOutOfBoundsException: 1728150825
at 
org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.getValue(HashedRelation.scala:463)
at 
org.apache.spark.sql.execution.joins.LongHashedRelation.getValue(HashedRelation.scala:762)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:112)
at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:112)
at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at 
org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:112)
at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:899)
at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:899)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1898)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1898)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 

[jira] [Commented] (SPARK-16740) joins.LongToUnsafeRowMap crashes with NegativeArraySizeException

2016-07-26 Thread Sylvain Zimmer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15394644#comment-15394644
 ] 

Sylvain Zimmer commented on SPARK-16740:


OK! Looks like that would be [~davies] :-)

> joins.LongToUnsafeRowMap crashes with NegativeArraySizeException
> 
>
> Key: SPARK-16740
> URL: https://issues.apache.org/jira/browse/SPARK-16740
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core, SQL
>Affects Versions: 2.0.0
>Reporter: Sylvain Zimmer
>
> Hello,
> Here is a crash in Spark SQL joins, with a minimal reproducible test case. 
> Interestingly, it only seems to happen when reading Parquet data (I added a 
> {{crash = True}} variable to show it)
> This is an {{left_outer}} example, but it also crashes with a regular 
> {{inner}} join.
> {code}
> import os
> from pyspark import SparkContext
> from pyspark.sql import types as SparkTypes
> from pyspark.sql import SQLContext
> sc = SparkContext()
> sqlc = SQLContext(sc)
> schema1 = SparkTypes.StructType([
> SparkTypes.StructField("id1", SparkTypes.LongType(), nullable=True)
> ])
> schema2 = SparkTypes.StructType([
> SparkTypes.StructField("id2", SparkTypes.LongType(), nullable=True)
> ])
> # Valid Long values (-9223372036854775808 < -5543241376386463808 , 
> 4661454128115150227 < 9223372036854775807)
> data1 = [(4661454128115150227,), (-5543241376386463808,)]
> data2 = [(650460285, )]
> df1 = sqlc.createDataFrame(sc.parallelize(data1), schema1)
> df2 = sqlc.createDataFrame(sc.parallelize(data2), schema2)
> crash = True
> if crash:
> os.system("rm -rf /tmp/sparkbug")
> df1.write.parquet("/tmp/sparkbug/vertex")
> df2.write.parquet("/tmp/sparkbug/edge")
> df1 = sqlc.read.load("/tmp/sparkbug/vertex")
> df2 = sqlc.read.load("/tmp/sparkbug/edge")
> result_df = df2.join(df1, on=(df1.id1 == df2.id2), how="left_outer")
> # Should print [Row(id2=650460285, id1=None)]
> print result_df.collect()
> {code}
> When ran with {{spark-submit}}, the final {{collect()}} call crashes with 
> this:
> {code}
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> o61.collectToPython.
> : org.apache.spark.SparkException: Exception thrown in awaitResult:
>   at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
>   at 
> org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:120)
>   at 
> org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124)
>   at 
> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
>   at 
> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:242)
>   at 
> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:83)
>   at 
> org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
>   at 
> org.apache.spark.sql.execution.BatchedDataSourceScanExec.consume(ExistingRDD.scala:225)
>   at 
> org.apache.spark.sql.execution.BatchedDataSourceScanExec.doProduce(ExistingRDD.scala:328)
>   at 
> org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
>   at 
> org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>   at 
> org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
>   at 
> org.apache.spark.sql.execution.BatchedDataSourceScanExec.produce(ExistingRDD.scala:225)
>   at 
> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77)
>   at 
> org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
>   at 
> 

[jira] [Commented] (SPARK-16740) joins.LongToUnsafeRowMap crashes with NegativeArraySizeException

2016-07-26 Thread Sylvain Zimmer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15394628#comment-15394628
 ] 

Sylvain Zimmer commented on SPARK-16740:


Thanks! I just did. Let me know if that's okay.

> joins.LongToUnsafeRowMap crashes with NegativeArraySizeException
> 
>
> Key: SPARK-16740
> URL: https://issues.apache.org/jira/browse/SPARK-16740
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core, SQL
>Affects Versions: 2.0.0
>Reporter: Sylvain Zimmer
>
> Hello,
> Here is a crash in Spark SQL joins, with a minimal reproducible test case. 
> Interestingly, it only seems to happen when reading Parquet data (I added a 
> {{crash = True}} variable to show it)
> This is an {{left_outer}} example, but it also crashes with a regular 
> {{inner}} join.
> {code}
> import os
> from pyspark import SparkContext
> from pyspark.sql import types as SparkTypes
> from pyspark.sql import SQLContext
> sc = SparkContext()
> sqlc = SQLContext(sc)
> schema1 = SparkTypes.StructType([
> SparkTypes.StructField("id1", SparkTypes.LongType(), nullable=True)
> ])
> schema2 = SparkTypes.StructType([
> SparkTypes.StructField("id2", SparkTypes.LongType(), nullable=True)
> ])
> # Valid Long values (-9223372036854775808 < -5543241376386463808 , 
> 4661454128115150227 < 9223372036854775807)
> data1 = [(4661454128115150227,), (-5543241376386463808,)]
> data2 = [(650460285, )]
> df1 = sqlc.createDataFrame(sc.parallelize(data1), schema1)
> df2 = sqlc.createDataFrame(sc.parallelize(data2), schema2)
> crash = True
> if crash:
> os.system("rm -rf /tmp/sparkbug")
> df1.write.parquet("/tmp/sparkbug/vertex")
> df2.write.parquet("/tmp/sparkbug/edge")
> df1 = sqlc.read.load("/tmp/sparkbug/vertex")
> df2 = sqlc.read.load("/tmp/sparkbug/edge")
> result_df = df2.join(df1, on=(df1.id1 == df2.id2), how="left_outer")
> # Should print [Row(id2=650460285, id1=None)]
> print result_df.collect()
> {code}
> When ran with {{spark-submit}}, the final {{collect()}} call crashes with 
> this:
> {code}
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> o61.collectToPython.
> : org.apache.spark.SparkException: Exception thrown in awaitResult:
>   at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
>   at 
> org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:120)
>   at 
> org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124)
>   at 
> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
>   at 
> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:242)
>   at 
> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:83)
>   at 
> org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
>   at 
> org.apache.spark.sql.execution.BatchedDataSourceScanExec.consume(ExistingRDD.scala:225)
>   at 
> org.apache.spark.sql.execution.BatchedDataSourceScanExec.doProduce(ExistingRDD.scala:328)
>   at 
> org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
>   at 
> org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>   at 
> org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
>   at 
> org.apache.spark.sql.execution.BatchedDataSourceScanExec.produce(ExistingRDD.scala:225)
>   at 
> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77)
>   at 
> org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
>   at 
> 

[jira] [Comment Edited] (SPARK-16740) joins.LongToUnsafeRowMap crashes with NegativeArraySizeException

2016-07-26 Thread Sylvain Zimmer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15394437#comment-15394437
 ] 

Sylvain Zimmer edited comment on SPARK-16740 at 7/26/16 7:54 PM:
-

I'm not a Scala expert but from a quick review of the code, it appears that 
it's easy to overflow the {{range}} variable in {{optimize()}}:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L608

In my example, that would yield
{code}
scala> val range = 4661454128115150227L - (-5543241376386463808L)
range: Long = -8242048569207937581
{code}

Maybe we should add {{range >= 0}} as a condition of doing that optimization?
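
To make that concrete, here is a minimal sketch of the kind of guard meant. The 
method and parameter names are placeholders for illustration (not the actual 
{{LongToUnsafeRowMap}} members), and {{denseThreshold}} stands in for whatever 
size check {{optimize()}} already performs:

{code}
// Sketch only: refuse the dense-array optimization when maxKey - minKey overflows.
def canUseDenseMode(minKey: Long, maxKey: Long, denseThreshold: Long): Boolean = {
  val range = maxKey - minKey
  // If the subtraction wraps around, range goes negative and any array sized from
  // it is bogus, so requiring range >= 0 rejects exactly the crashing case.
  range >= 0 && range < denseThreshold
}

canUseDenseMode(-5543241376386463808L, 4661454128115150227L, 1024L * 1024)
// => false, because the subtraction overflows to -8242048569207937581
{code}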


was (Author: sylvinus):
I'm not a Scala expert but from a quick review of the code, it appears that 
it's easy to overflow the {{range}} variable in {{optimize()}}:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L608

In my example, that would yield
{{ scala> val range = 4661454128115150227L - (-5543241376386463808L)
range: Long = -8242048569207937581
}}

Maybe we should add {{range >= 0}} as a condition of doing that optimization?

> joins.LongToUnsafeRowMap crashes with NegativeArraySizeException
> 
>
> Key: SPARK-16740
> URL: https://issues.apache.org/jira/browse/SPARK-16740
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core, SQL
>Affects Versions: 2.0.0
>Reporter: Sylvain Zimmer
>
> Hello,
> Here is a crash in Spark SQL joins, with a minimal reproducible test case. 
> Interestingly, it only seems to happen when reading Parquet data (I added a 
> {{crash = True}} variable to show it)
> This is an {{left_outer}} example, but it also crashes with a regular 
> {{inner}} join.
> {code}
> import os
> from pyspark import SparkContext
> from pyspark.sql import types as SparkTypes
> from pyspark.sql import SQLContext
> sc = SparkContext()
> sqlc = SQLContext(sc)
> schema1 = SparkTypes.StructType([
> SparkTypes.StructField("id1", SparkTypes.LongType(), nullable=True)
> ])
> schema2 = SparkTypes.StructType([
> SparkTypes.StructField("id2", SparkTypes.LongType(), nullable=True)
> ])
> # Valid Long values (-9223372036854775808 < -5543241376386463808 , 
> 4661454128115150227 < 9223372036854775807)
> data1 = [(4661454128115150227,), (-5543241376386463808,)]
> data2 = [(650460285, )]
> df1 = sqlc.createDataFrame(sc.parallelize(data1), schema1)
> df2 = sqlc.createDataFrame(sc.parallelize(data2), schema2)
> crash = True
> if crash:
> os.system("rm -rf /tmp/sparkbug")
> df1.write.parquet("/tmp/sparkbug/vertex")
> df2.write.parquet("/tmp/sparkbug/edge")
> df1 = sqlc.read.load("/tmp/sparkbug/vertex")
> df2 = sqlc.read.load("/tmp/sparkbug/edge")
> result_df = df2.join(df1, on=(df1.id1 == df2.id2), how="left_outer")
> # Should print [Row(id2=650460285, id1=None)]
> print result_df.collect()
> {code}
> When ran with {{spark-submit}}, the final {{collect()}} call crashes with 
> this:
> {code}
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> o61.collectToPython.
> : org.apache.spark.SparkException: Exception thrown in awaitResult:
>   at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
>   at 
> org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:120)
>   at 
> org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124)
>   at 
> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
>   at 
> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:242)
>   at 
> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:83)
>   at 
> org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
>   at 
> org.apache.spark.sql.execution.BatchedDataSourceScanExec.consume(ExistingRDD.scala:225)
>   at 
> 

[jira] [Commented] (SPARK-16740) joins.LongToUnsafeRowMap crashes with NegativeArraySizeException

2016-07-26 Thread Sylvain Zimmer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15394437#comment-15394437
 ] 

Sylvain Zimmer commented on SPARK-16740:


I'm not a Scala expert but from a quick review of the code, it appears that 
it's easy to overflow the {{range}} variable in {{optimize()}}:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L608

In my example, that would yield
{{ scala> val range = 4661454128115150227L - (-5543241376386463808L)
range: Long = -8242048569207937581
}}

Maybe we should add {{range >= 0}} as a condition of doing that optimization?

> joins.LongToUnsafeRowMap crashes with NegativeArraySizeException
> 
>
> Key: SPARK-16740
> URL: https://issues.apache.org/jira/browse/SPARK-16740
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core, SQL
>Affects Versions: 2.0.0
>Reporter: Sylvain Zimmer
>
> Hello,
> Here is a crash in Spark SQL joins, with a minimal reproducible test case. 
> Interestingly, it only seems to happen when reading Parquet data (I added a 
> {{crash = True}} variable to show it)
> This is an {{left_outer}} example, but it also crashes with a regular 
> {{inner}} join.
> {code}
> import os
> from pyspark import SparkContext
> from pyspark.sql import types as SparkTypes
> from pyspark.sql import SQLContext
> sc = SparkContext()
> sqlc = SQLContext(sc)
> schema1 = SparkTypes.StructType([
> SparkTypes.StructField("id1", SparkTypes.LongType(), nullable=True)
> ])
> schema2 = SparkTypes.StructType([
> SparkTypes.StructField("id2", SparkTypes.LongType(), nullable=True)
> ])
> # Valid Long values (-9223372036854775808 < -5543241376386463808 , 
> 4661454128115150227 < 9223372036854775807)
> data1 = [(4661454128115150227,), (-5543241376386463808,)]
> data2 = [(650460285, )]
> df1 = sqlc.createDataFrame(sc.parallelize(data1), schema1)
> df2 = sqlc.createDataFrame(sc.parallelize(data2), schema2)
> crash = True
> if crash:
> os.system("rm -rf /tmp/sparkbug")
> df1.write.parquet("/tmp/sparkbug/vertex")
> df2.write.parquet("/tmp/sparkbug/edge")
> df1 = sqlc.read.load("/tmp/sparkbug/vertex")
> df2 = sqlc.read.load("/tmp/sparkbug/edge")
> result_df = df2.join(df1, on=(df1.id1 == df2.id2), how="left_outer")
> # Should print [Row(id2=650460285, id1=None)]
> print result_df.collect()
> {code}
> When ran with {{spark-submit}}, the final {{collect()}} call crashes with 
> this:
> {code}
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> o61.collectToPython.
> : org.apache.spark.SparkException: Exception thrown in awaitResult:
>   at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
>   at 
> org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:120)
>   at 
> org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124)
>   at 
> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
>   at 
> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:242)
>   at 
> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:83)
>   at 
> org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
>   at 
> org.apache.spark.sql.execution.BatchedDataSourceScanExec.consume(ExistingRDD.scala:225)
>   at 
> org.apache.spark.sql.execution.BatchedDataSourceScanExec.doProduce(ExistingRDD.scala:328)
>   at 
> org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
>   at 
> org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>   at 
> 

[jira] [Created] (SPARK-16740) joins.LongToUnsafeRowMap crashes with NegativeArraySizeException

2016-07-26 Thread Sylvain Zimmer (JIRA)
Sylvain Zimmer created SPARK-16740:
--

 Summary: joins.LongToUnsafeRowMap crashes with 
NegativeArraySizeException
 Key: SPARK-16740
 URL: https://issues.apache.org/jira/browse/SPARK-16740
 Project: Spark
  Issue Type: Bug
  Components: PySpark, Spark Core, SQL
Affects Versions: 2.0.0
Reporter: Sylvain Zimmer


Hello,

Here is a crash in Spark SQL joins, with a minimal reproducible test case. 
Interestingly, it only seems to happen when reading Parquet data (I added a 
{{crash = True}} variable to show it).

This is a {{left_outer}} example, but it also crashes with a regular {{inner}} 
join.

{code}
import os

from pyspark import SparkContext
from pyspark.sql import types as SparkTypes
from pyspark.sql import SQLContext

sc = SparkContext()
sqlc = SQLContext(sc)

schema1 = SparkTypes.StructType([
    SparkTypes.StructField("id1", SparkTypes.LongType(), nullable=True)
])
schema2 = SparkTypes.StructType([
    SparkTypes.StructField("id2", SparkTypes.LongType(), nullable=True)
])

# Valid Long values (-9223372036854775808 < -5543241376386463808, 4661454128115150227 < 9223372036854775807)
data1 = [(4661454128115150227,), (-5543241376386463808,)]
data2 = [(650460285, )]

df1 = sqlc.createDataFrame(sc.parallelize(data1), schema1)
df2 = sqlc.createDataFrame(sc.parallelize(data2), schema2)

crash = True
if crash:
    os.system("rm -rf /tmp/sparkbug")
    df1.write.parquet("/tmp/sparkbug/vertex")
    df2.write.parquet("/tmp/sparkbug/edge")

    df1 = sqlc.read.load("/tmp/sparkbug/vertex")
    df2 = sqlc.read.load("/tmp/sparkbug/edge")

result_df = df2.join(df1, on=(df1.id1 == df2.id2), how="left_outer")

# Should print [Row(id2=650460285, id1=None)]
print result_df.collect()
{code}

When run with {{spark-submit}}, the final {{collect()}} call crashes with this:

{code}
py4j.protocol.Py4JJavaError: An error occurred while calling 
o61.collectToPython.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
at 
org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:120)
at 
org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at 
org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:242)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:83)
at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at 
org.apache.spark.sql.execution.BatchedDataSourceScanExec.consume(ExistingRDD.scala:225)
at 
org.apache.spark.sql.execution.BatchedDataSourceScanExec.doProduce(ExistingRDD.scala:328)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at 
org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
at 
org.apache.spark.sql.execution.BatchedDataSourceScanExec.produce(ExistingRDD.scala:225)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at 

[jira] [Commented] (SPARK-16700) StructType doesn't accept Python dicts anymore

2016-07-25 Thread Sylvain Zimmer (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15392200#comment-15392200
 ] 

Sylvain Zimmer commented on SPARK-16700:


I dug into this a bit more: 

{{_verify_type({}, struct_schema)}} was already raising a similar exception in 
Spark 1.6.2; however, schema validation wasn't being enforced at all by 
{{createDataFrame}}: 
https://github.com/apache/spark/blob/branch-1.6/python/pyspark/sql/context.py#L418

In 2.0.0, it seems that the verification is applied to every row of the data:
https://github.com/apache/spark/blob/master/python/pyspark/sql/session.py#L504

I think there are 2 issues that should be fixed here:
 - {{_verify_type({}, struct_schema)}} shouldn't raise, because as far as I can 
tell dicts behave as expected and have their items correctly mapped as struct 
fields.
 - There should be a way to go back to 1.6.x-like behaviour and disable schema 
verification in {{createDataFrame}}. The {{prepare()}} function is being 
map()'d over all the data coming from Python, which I think will definitely 
hurt performance for large datasets and complex schemas. Leaving it on by 
default but adding a flag to disable it would be a good solution (see the 
sketch below). Without this, users will probably have to implement their own 
{{createDataFrame}} function, as I did.
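
To illustrate the second point, here is a sketch of what that escape hatch would 
look like from the user side. {{verifySchema}} is the flag name that was 
eventually adopted for this, but the exact signature below is an assumption, not 
the final API:

{code}
from pyspark import SparkContext
from pyspark.sql import types as SparkTypes
from pyspark.sql import SQLContext

sc = SparkContext()
sqlc = SQLContext(sc)

struct_schema = SparkTypes.StructType([
    SparkTypes.StructField("id", SparkTypes.LongType())
])
rdd = sc.parallelize([{"id": 0}, {"id": 1}])

# Default: prepare() maps _verify_type() over every row coming from Python,
# which is the per-row cost described above (and what rejects dicts in 2.0.0).
# df = sqlc.createDataFrame(rdd, struct_schema)

# Requested 1.6.x-like behaviour: skip the per-row verification entirely.
df = sqlc.createDataFrame(rdd, struct_schema, verifySchema=False)

print df.collect()  # [Row(id=0), Row(id=1)]
{code}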


> StructType doesn't accept Python dicts anymore
> --
>
> Key: SPARK-16700
> URL: https://issues.apache.org/jira/browse/SPARK-16700
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.0.0
>Reporter: Sylvain Zimmer
>
> Hello,
> I found this issue while testing my codebase with 2.0.0-rc5
> StructType in Spark 1.6.2 accepts the Python {{dict}} type, which is very 
> handy. 2.0.0-rc5 does not and throws an error.
> I don't know if this was intended but I'd advocate for this behaviour to 
> remain the same. MapType is probably wasteful when your key names never 
> change and switching to Python tuples would be cumbersome.
> Here is a minimal script to reproduce the issue: 
> {code}
> from pyspark import SparkContext
> from pyspark.sql import types as SparkTypes
> from pyspark.sql import SQLContext
> sc = SparkContext()
> sqlc = SQLContext(sc)
> struct_schema = SparkTypes.StructType([
> SparkTypes.StructField("id", SparkTypes.LongType())
> ])
> rdd = sc.parallelize([{"id": 0}, {"id": 1}])
> df = sqlc.createDataFrame(rdd, struct_schema)
> print df.collect()
> # 1.6.2 prints [Row(id=0), Row(id=1)]
> # 2.0.0-rc5 raises TypeError: StructType can not accept object {'id': 0} in 
> type <type 'dict'>
> {code}
> Thanks!






[jira] [Updated] (SPARK-16700) StructType doesn't accept Python dicts anymore

2016-07-24 Thread Sylvain Zimmer (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sylvain Zimmer updated SPARK-16700:
---
Component/s: (was: Spark Core)

> StructType doesn't accept Python dicts anymore
> --
>
> Key: SPARK-16700
> URL: https://issues.apache.org/jira/browse/SPARK-16700
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.0.0
>Reporter: Sylvain Zimmer
>
> Hello,
> I found this issue while testing my codebase with 2.0.0-rc5
> StructType in Spark 1.6.2 accepts the Python {{dict}} type, which is very 
> handy. 2.0.0-rc5 does not and throws an error.
> I don't know if this was intended but I'd advocate for this behaviour to 
> remain the same. MapType is probably wasteful when your key names never 
> change and switching to Python tuples would be cumbersome.
> Here is a minimal script to reproduce the issue: 
> {code}
> from pyspark import SparkContext
> from pyspark.sql import types as SparkTypes
> from pyspark.sql import SQLContext
> sc = SparkContext()
> sqlc = SQLContext(sc)
> struct_schema = SparkTypes.StructType([
> SparkTypes.StructField("id", SparkTypes.LongType())
> ])
> rdd = sc.parallelize([{"id": 0}, {"id": 1}])
> df = sqlc.createDataFrame(rdd, struct_schema)
> print df.collect()
> # 1.6.2 prints [Row(id=0), Row(id=1)]
> # 2.0.0-rc5 raises TypeError: StructType can not accept object {'id': 0} in 
> type <type 'dict'>
> {code}
> Thanks!






[jira] [Updated] (SPARK-16700) StructType doesn't accept Python dicts anymore

2016-07-24 Thread Sylvain Zimmer (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sylvain Zimmer updated SPARK-16700:
---
Description: 
Hello,

I found this issue while testing my codebase with 2.0.0-rc5

StructType in Spark 1.6.2 accepts the Python {{dict}} type, which is very handy. 
2.0.0-rc5 does not and throws an error.

I don't know if this was intended but I'd advocate for this behaviour to remain 
the same. MapType is probably wasteful when your key names never change and 
switching to Python tuples would be cumbersome.

Here is a minimal script to reproduce the issue: 

{code}
from pyspark import SparkContext
from pyspark.sql import types as SparkTypes
from pyspark.sql import SQLContext


sc = SparkContext()
sqlc = SQLContext(sc)

struct_schema = SparkTypes.StructType([
SparkTypes.StructField("id", SparkTypes.LongType())
])

rdd = sc.parallelize([{"id": 0}, {"id": 1}])

df = sqlc.createDataFrame(rdd, struct_schema)

print df.collect()

# 1.6.2 prints [Row(id=0), Row(id=1)]

# 2.0.0-rc5 raises TypeError: StructType can not accept object {'id': 0} in 
type <type 'dict'>

{code}

Thanks!

  was:
Hello,

I found this issue while testing my codebase with 2.0.0-rc5

StructType in Spark 1.6.2 accepts the Python {{dict}} type, which is very handy. 
2.0.0-rc5 does not and throws an error.

I don't know if this was intended but I'd advocate for this behaviour to remain 
the same. MapType is probably wasteful when your key names never change and 
switching to Python tuples would be cumbersome.

Here is a minimal script to reproduce the issue: 

{code:python}
from pyspark import SparkContext
from pyspark.sql import types as SparkTypes
from pyspark.sql import SQLContext


sc = SparkContext()
sqlc = SQLContext(sc)

struct_schema = SparkTypes.StructType([
SparkTypes.StructField("id", SparkTypes.LongType())
])

rdd = sc.parallelize([{"id": 0}, {"id": 1}])

df = sqlc.createDataFrame(rdd, struct_schema)

print df.collect()

# 1.6.2 prints [Row(id=0), Row(id=1)]

# 2.0.0-rc5 raises TypeError: StructType can not accept object {'id': 0} in 
type <type 'dict'>

{code}

Thanks!


> StructType doesn't accept Python dicts anymore
> --
>
> Key: SPARK-16700
> URL: https://issues.apache.org/jira/browse/SPARK-16700
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core
>Affects Versions: 2.0.0
>Reporter: Sylvain Zimmer
>
> Hello,
> I found this issue while testing my codebase with 2.0.0-rc5
> StructType in Spark 1.6.2 accepts the Python {{dict}} type, which is very 
> handy. 2.0.0-rc5 does not and throws an error.
> I don't know if this was intended but I'd advocate for this behaviour to 
> remain the same. MapType is probably wasteful when your key names never 
> change and switching to Python tuples would be cumbersome.
> Here is a minimal script to reproduce the issue: 
> {code}
> from pyspark import SparkContext
> from pyspark.sql import types as SparkTypes
> from pyspark.sql import SQLContext
> sc = SparkContext()
> sqlc = SQLContext(sc)
> struct_schema = SparkTypes.StructType([
> SparkTypes.StructField("id", SparkTypes.LongType())
> ])
> rdd = sc.parallelize([{"id": 0}, {"id": 1}])
> df = sqlc.createDataFrame(rdd, struct_schema)
> print df.collect()
> # 1.6.2 prints [Row(id=0), Row(id=1)]
> # 2.0.0-rc5 raises TypeError: StructType can not accept object {'id': 0} in 
> type <type 'dict'>
> {code}
> Thanks!






[jira] [Updated] (SPARK-16700) StructType doesn't accept Python dicts anymore

2016-07-24 Thread Sylvain Zimmer (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sylvain Zimmer updated SPARK-16700:
---
Component/s: PySpark

> StructType doesn't accept Python dicts anymore
> --
>
> Key: SPARK-16700
> URL: https://issues.apache.org/jira/browse/SPARK-16700
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core
>Affects Versions: 2.0.0
>Reporter: Sylvain Zimmer
>
> Hello,
> I found this issue while testing my codebase with 2.0.0-rc5
> StructType in Spark 1.6.2 accepts the Python {{dict}} type, which is very 
> handy. 2.0.0-rc5 does not and throws an error.
> I don't know if this was intended but I'd advocate for this behaviour to 
> remain the same. MapType is probably wasteful when your key names never 
> change and switching to Python tuples would be cumbersome.
> Here is a minimal script to reproduce the issue: 
> {code}
> from pyspark import SparkContext
> from pyspark.sql import types as SparkTypes
> from pyspark.sql import SQLContext
> sc = SparkContext()
> sqlc = SQLContext(sc)
> struct_schema = SparkTypes.StructType([
> SparkTypes.StructField("id", SparkTypes.LongType())
> ])
> rdd = sc.parallelize([{"id": 0}, {"id": 1}])
> df = sqlc.createDataFrame(rdd, struct_schema)
> print df.collect()
> # 1.6.2 prints [Row(id=0), Row(id=1)]
> # 2.0.0-rc5 raises TypeError: StructType can not accept object {'id': 0} in 
> type <type 'dict'>
> {code}
> Thanks!






[jira] [Created] (SPARK-16700) StructType doesn't accept Python dicts anymore

2016-07-24 Thread Sylvain Zimmer (JIRA)
Sylvain Zimmer created SPARK-16700:
--

 Summary: StructType doesn't accept Python dicts anymore
 Key: SPARK-16700
 URL: https://issues.apache.org/jira/browse/SPARK-16700
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.0.0
Reporter: Sylvain Zimmer


Hello,

I found this issue while testing my codebase with 2.0.0-rc5

StructType in Spark 1.6.2 accepts the Python {{dict}} type, which is very handy. 
2.0.0-rc5 does not and throws an error.

I don't know if this was intended but I'd advocate for this behaviour to remain 
the same. MapType is probably wasteful when your key names never change and 
switching to Python tuples would be cumbersome.
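
To make the comparison concrete, here is a rough sketch of the three 
representations being weighed (the schema names are only for illustration):

{code}
from pyspark.sql import types as SparkTypes

# What 1.6.2 accepted: a fixed-field struct, with rows supplied as dicts like {"id": 0}.
struct_schema = SparkTypes.StructType([
    SparkTypes.StructField("id", SparkTypes.LongType())
])

# The wasteful alternative: a map schema repeats the key names inside every row.
map_schema = SparkTypes.MapType(SparkTypes.StringType(), SparkTypes.LongType())

# The cumbersome alternative: keep the struct but feed positional tuples such as
# (0, ) instead of {"id": 0}, which gets error-prone as the number of fields grows.
{code}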

Here is a minimal script to reproduce the issue: 

{code:python}
from pyspark import SparkContext
from pyspark.sql import types as SparkTypes
from pyspark.sql import SQLContext


sc = SparkContext()
sqlc = SQLContext(sc)

struct_schema = SparkTypes.StructType([
SparkTypes.StructField("id", SparkTypes.LongType())
])

rdd = sc.parallelize([{"id": 0}, {"id": 1}])

df = sqlc.createDataFrame(rdd, struct_schema)

print df.collect()

# 1.6.2 prints [Row(id=0), Row(id=1)]

# 2.0.0-rc5 raises TypeError: StructType can not accept object {'id': 0} in 
type <type 'dict'>

{code}

Thanks!


