[jira] [Commented] (SPARK-16700) StructType doesn't accept Python dicts anymore
[ https://issues.apache.org/jira/browse/SPARK-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15406754#comment-15406754 ]

Sylvain Zimmer commented on SPARK-16700:

The verifySchema flag works great, and the {{dict}} issue seems to be fixed for me. Thanks a lot!!

> StructType doesn't accept Python dicts anymore
> ----------------------------------------------
>
>                 Key: SPARK-16700
>                 URL: https://issues.apache.org/jira/browse/SPARK-16700
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.0.0
>            Reporter: Sylvain Zimmer
>            Assignee: Davies Liu
>
> Hello,
> I found this issue while testing my codebase with 2.0.0-rc5.
> StructType in Spark 1.6.2 accepts the Python {{dict}} type, which is very handy. 2.0.0-rc5 does not and throws an error.
> I don't know if this was intended, but I'd advocate for this behaviour to remain the same. MapType is probably wasteful when your key names never change, and switching to Python tuples would be cumbersome.
> Here is a minimal script to reproduce the issue:
> {code}
> from pyspark import SparkContext
> from pyspark.sql import types as SparkTypes
> from pyspark.sql import SQLContext
> sc = SparkContext()
> sqlc = SQLContext(sc)
> struct_schema = SparkTypes.StructType([
>     SparkTypes.StructField("id", SparkTypes.LongType())
> ])
> rdd = sc.parallelize([{"id": 0}, {"id": 1}])
> df = sqlc.createDataFrame(rdd, struct_schema)
> print df.collect()
> # 1.6.2 prints [Row(id=0), Row(id=1)]
> # 2.0.0-rc5 raises TypeError: StructType can not accept object {'id': 0} in type <type 'dict'>
> {code}
> Thanks!

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-16826) java.util.Hashtable limits the throughput of PARSE_URL()
[ https://issues.apache.org/jira/browse/SPARK-16826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15403262#comment-15403262 ]

Sylvain Zimmer commented on SPARK-16826:

[~srowen] what about this? https://github.com/sylvinus/spark/commit/98119a08368b1cd1faf3f25a32910ad6717c5c02
The tests seem to pass, and I don't think it uses the problematic code paths in java.net.URL (except for getFile, but that could probably be fixed easily).

> java.util.Hashtable limits the throughput of PARSE_URL()
> --------------------------------------------------------
>
>                 Key: SPARK-16826
>                 URL: https://issues.apache.org/jira/browse/SPARK-16826
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0
>            Reporter: Sylvain Zimmer
>
> Hello!
> I'm using {{c4.8xlarge}} instances on EC2 with 36 cores and doing lots of {{parse_url(url, "host")}} in Spark SQL.
> Unfortunately it seems that there is an internal thread-safe cache in there, and the instances end up being 90% idle.
> When I view the thread dump for my executors, most of the executor threads are "BLOCKED", in that state:
> {code}
> java.util.Hashtable.get(Hashtable.java:362)
> java.net.URL.getURLStreamHandler(URL.java:1135)
> java.net.URL.<init>(URL.java:599)
> java.net.URL.<init>(URL.java:490)
> java.net.URL.<init>(URL.java:439)
> org.apache.spark.sql.catalyst.expressions.ParseUrl.getUrl(stringExpressions.scala:731)
> org.apache.spark.sql.catalyst.expressions.ParseUrl.parseUrlWithoutKey(stringExpressions.scala:772)
> org.apache.spark.sql.catalyst.expressions.ParseUrl.eval(stringExpressions.scala:785)
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
> org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69)
> org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69)
> org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:203)
> org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:202)
> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> org.apache.spark.scheduler.Task.run(Task.scala:85)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> {code}
> However, when I switch from 1 executor with 36 cores to 9 executors with 4 cores, throughput is almost 10x higher and the CPUs are back at ~100% use.
> Thanks!
[jira] [Commented] (SPARK-16826) java.util.Hashtable limits the throughput of PARSE_URL()
[ https://issues.apache.org/jira/browse/SPARK-16826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15403236#comment-15403236 ]

Sylvain Zimmer commented on SPARK-16826:

Sorry I can't be more helpful on the Java side... But I think there must be some high-quality URL parsing code somewhere in the Apache foundation already :-)

> (full issue description and thread dump identical to the version quoted above)
[jira] [Comment Edited] (SPARK-16826) java.util.Hashtable limits the throughput of PARSE_URL()
[ https://issues.apache.org/jira/browse/SPARK-16826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15403165#comment-15403165 ]

Sylvain Zimmer edited comment on SPARK-16826 at 8/2/16 1:15 AM:

[~srowen] thanks for the pointers! I'm parsing every hyperlink found in Common Crawl, so there are billions of unique ones, no way around it.
Wouldn't it be possible to switch to another implementation with an API similar to java.net.URL? As I understand it, we never need the URLStreamHandler in the first place anyway?
I'm not a Java expert, but what about {{java.net.URI}} or {{org.apache.catalina.util.URL}} for instance?

> (full issue description and thread dump identical to the version quoted above)
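The java.net.URI suggestion above can be sketched outside Spark. The helper below is a hypothetical illustration (class and method names are made up, and this is not Spark's actual ParseUrl implementation): URI's constructor only parses the string, so it never reaches URL.getURLStreamHandler(), whose protocol-handler cache is the synchronized java.util.Hashtable seen at the top of the thread dump.

```java
import java.net.URI;
import java.net.URISyntaxException;

public class HostParse {
    // Illustrative sketch: extract the host the way parse_url(url, "host")
    // does, but via java.net.URI, which does pure string parsing and takes
    // no shared lock.
    public static String getHost(String url) {
        try {
            return new URI(url).getHost();
        } catch (URISyntaxException e) {
            // parse_url() returns NULL for input it cannot parse
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(getHost("http://spark.apache.org/docs/latest/"));
    }
}
```

One caveat with such a swap: java.net.URI enforces RFC 2396 syntax more strictly than java.net.URL, so some inputs that URL tolerates would throw URISyntaxException (and here become NULL) instead.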
[jira] [Commented] (SPARK-16826) java.util.Hashtable limits the throughput of PARSE_URL()
[ https://issues.apache.org/jira/browse/SPARK-16826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15403165#comment-15403165 ]

Sylvain Zimmer commented on SPARK-16826:

[~srowen] thanks for the pointers! I'm parsing every hyperlink found in Common Crawl, so there are billions of unique ones, no way around it.
Wouldn't it be possible to switch to another implementation with an API similar to java.net.URL? As I understand it, we never need the URLStreamHandler in the first place anyway?
I'm not a Java expert, but what about {java.net.URI} or {org.apache.catalina.util.URL} for instance?

> (full issue description and thread dump identical to the version quoted above)
[jira] [Updated] (SPARK-16826) java.util.Hashtable limits the throughput of PARSE_URL()
[ https://issues.apache.org/jira/browse/SPARK-16826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sylvain Zimmer updated SPARK-16826:
-----------------------------------
    Description: (full description and thread dump as quoted above; this update changes "most of the 36 cores are in status "BLOCKED"" to "most of the executor threads are "BLOCKED"")
[jira] [Updated] (SPARK-16826) java.util.Hashtable limits the throughput of PARSE_URL()
[ https://issues.apache.org/jira/browse/SPARK-16826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sylvain Zimmer updated SPARK-16826:
-----------------------------------
    Description: (full description and thread dump as quoted above; this update changes "in that stage" to "in that state")
[jira] [Updated] (SPARK-16826) java.util.Hashtable limits the throughput of PARSE_URL()
[ https://issues.apache.org/jira/browse/SPARK-16826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sylvain Zimmer updated SPARK-16826:
-----------------------------------
    Description: (full description and thread dump as quoted above; this update changes {c4.8xlarge} and {parse_url(url, "host")} to the {{monospaced}} double-brace formatting)
[jira] [Created] (SPARK-16826) java.util.Hashtable limits the throughput of PARSE_URL()
Sylvain Zimmer created SPARK-16826: -- Summary: java.util.Hashtable limits the throughput of PARSE_URL() Key: SPARK-16826 URL: https://issues.apache.org/jira/browse/SPARK-16826 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.0.0 Reporter: Sylvain Zimmer Hello! I'm using {c4.8xlarge} instances on EC2 with 36 cores and doing lots of {parse_url(url, "host")} in Spark SQL. Unfortunately it seems that there is an internal thread-safe cache in there, and the instances end up being 90% idle. When I view the thread dump for my executors, most of the 36 cores are in status "BLOCKED", in that stage: {code} java.util.Hashtable.get(Hashtable.java:362) java.net.URL.getURLStreamHandler(URL.java:1135) java.net.URL.(URL.java:599) java.net.URL.(URL.java:490) java.net.URL.(URL.java:439) org.apache.spark.sql.catalyst.expressions.ParseUrl.getUrl(stringExpressions.scala:731) org.apache.spark.sql.catalyst.expressions.ParseUrl.parseUrlWithoutKey(stringExpressions.scala:772) org.apache.spark.sql.catalyst.expressions.ParseUrl.eval(stringExpressions.scala:785) org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source) org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69) org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:69) org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:203) org.apache.spark.sql.execution.FilterExec$$anonfun$17$$anonfun$apply$2.apply(basicPhysicalOperators.scala:202) scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463) org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) org.apache.spark.scheduler.Task.run(Task.scala:85) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745) {code} However, when I switch from 1 executor with 36 cores to 9 executors with 4 cores, throughput is almost 10x higher and the CPUs are back at ~100% use. Thanks!
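As a stopgap while the JVM-side lock contention exists, host extraction can be moved out of {{parse_url}} entirely. The sketch below is illustrative only and not part of the reporter's code: it uses the Python 3 standard library's {{urllib.parse.urlparse}}, which performs no globally synchronized stream-handler lookup the way {{java.net.URL}} does; such a function could be wrapped in a PySpark UDF.

```python
# Illustrative workaround sketch (not the Spark-side fix): extract the host
# in plain Python, avoiding java.net.URL's synchronized Hashtable lookup.
from urllib.parse import urlparse

def host_of(url):
    """Return the host part of a URL, or None if it has none."""
    return urlparse(url).hostname

print(host_of("https://spark.apache.org/docs/latest/api.html"))  # spark.apache.org
```

Note that a Python UDF carries serialization overhead of its own, so this trades per-row cost for the removal of cross-thread lock contention; whether that is a net win depends on the workload.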
[jira] [Commented] (SPARK-16802) joins.LongToUnsafeRowMap crashes with ArrayIndexOutOfBoundsException
[ https://issues.apache.org/jira/browse/SPARK-16802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15400260#comment-15400260 ] Sylvain Zimmer commented on SPARK-16802: Maybe useful for others: an ugly workaround to avoid this code path is to cast the join keys to strings and do something like this instead: {code} SELECT df1.id1 FROM df1 LEFT OUTER JOIN df2 ON cast(df1.id1 as string) = cast(df2.id2 as string) {code} > joins.LongToUnsafeRowMap crashes with ArrayIndexOutOfBoundsException > > > Key: SPARK-16802 > URL: https://issues.apache.org/jira/browse/SPARK-16802 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0, 2.0.1 >Reporter: Sylvain Zimmer > > Hello! > This is a little similar to > [SPARK-16740|https://issues.apache.org/jira/browse/SPARK-16740] (should I > have reopened it?). > I would recommend to give another full review to {{HashedRelation.scala}}, > particularly the new {{LongToUnsafeRowMap}} code. I've had a few other errors > that I haven't managed to reproduce so far, as well as what I suspect could > be memory leaks (I have a query in a loop OOMing after a few iterations > despite not caching its results). 
> Here is the script to reproduce the ArrayIndexOutOfBoundsException on the > current 2.0 branch: > {code} > import os > import random > from pyspark import SparkContext > from pyspark.sql import types as SparkTypes > from pyspark.sql import SQLContext > sc = SparkContext() > sqlc = SQLContext(sc) > schema1 = SparkTypes.StructType([ > SparkTypes.StructField("id1", SparkTypes.LongType(), nullable=True) > ]) > schema2 = SparkTypes.StructType([ > SparkTypes.StructField("id2", SparkTypes.LongType(), nullable=True) > ]) > def randlong(): > return random.randint(-9223372036854775808, 9223372036854775807) > while True: > l1, l2 = randlong(), randlong() > # Sample values that crash: > # l1, l2 = 4661454128115150227, -5543241376386463808 > print "Testing with %s, %s" % (l1, l2) > data1 = [(l1, ), (l2, )] > data2 = [(l1, )] > df1 = sqlc.createDataFrame(sc.parallelize(data1), schema1) > df2 = sqlc.createDataFrame(sc.parallelize(data2), schema2) > crash = True > if crash: > os.system("rm -rf /tmp/sparkbug") > df1.write.parquet("/tmp/sparkbug/vertex") > df2.write.parquet("/tmp/sparkbug/edge") > df1 = sqlc.read.load("/tmp/sparkbug/vertex") > df2 = sqlc.read.load("/tmp/sparkbug/edge") > sqlc.registerDataFrameAsTable(df1, "df1") > sqlc.registerDataFrameAsTable(df2, "df2") > result_df = sqlc.sql(""" > SELECT > df1.id1 > FROM df1 > LEFT OUTER JOIN df2 ON df1.id1 = df2.id2 > """) > print result_df.collect() > {code} > {code} > java.lang.ArrayIndexOutOfBoundsException: 1728150825 > at > org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.getValue(HashedRelation.scala:463) > at > org.apache.spark.sql.execution.joins.LongHashedRelation.getValue(HashedRelation.scala:762) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117) > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > at > org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112) > at > scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) > at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) > at > org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:112) > at > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) > at > org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:112) > at > scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) > at > org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:112) > at > org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:899) > at > org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:899) > at >
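For reference, the bounds used by the {{randlong()}} helper in the repro above are exactly the signed 64-bit integer range backing Spark SQL's {{LongType}}; a quick plain-Python check of the literals (illustrative only, not part of the reporter's script):

```python
# The randlong() bounds in the repro are the signed 64-bit range.
INT64_MIN = -(2 ** 63)
INT64_MAX = 2 ** 63 - 1
assert INT64_MIN == -9223372036854775808
assert INT64_MAX == 9223372036854775807

# The sample crashing keys quoted in the repro fall inside that range:
for key in (4661454128115150227, -5543241376386463808):
    assert INT64_MIN <= key <= INT64_MAX
print("bounds check ok")
```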
[jira] [Updated] (SPARK-16807) Optimize some ABS() statements
[ https://issues.apache.org/jira/browse/SPARK-16807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sylvain Zimmer updated SPARK-16807: --- Priority: Minor (was: Major) > Optimize some ABS() statements > -- > > Key: SPARK-16807 > URL: https://issues.apache.org/jira/browse/SPARK-16807 > Project: Spark > Issue Type: Improvement > Components: SQL >Reporter: Sylvain Zimmer >Priority: Minor > > I'm not a Catalyst expert, but I think some use cases for the ABS() function > could generate simpler code. > This is the code generated when doing something like {{ABS(x - y) > 0}} or > {{ABS(x - y) = 0}} in Spark SQL: > {code} > /* 267 */ float filter_value6 = -1.0f; > /* 268 */ filter_value6 = agg_value27 - agg_value32; > /* 269 */ float filter_value5 = -1.0f; > /* 270 */ filter_value5 = (float)(java.lang.Math.abs(filter_value6)); > /* 271 */ > /* 272 */ boolean filter_value4 = false; > /* 273 */ filter_value4 = > org.apache.spark.util.Utils.nanSafeCompareFloats(filter_value5, 0.0f) > 0; > /* 274 */ if (!filter_value4) continue; > {code} > Maybe it could all be simplified to something like this? > {code} > filter_value4 = (agg_value27 != agg_value32) > {code} > (Of course you could write {{x != y}} directly in the SQL query, but the > {{0}} in my example could be a configurable threshold, not something you can > hardcode)
[jira] [Updated] (SPARK-16807) Optimize some ABS() statements
[ https://issues.apache.org/jira/browse/SPARK-16807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sylvain Zimmer updated SPARK-16807: --- Description: I'm not a Catalyst expert, but I think some use cases for the ABS() function could generate simpler code. This is the code generated when doing something like {{ABS(x - y) > 0}} or {{ABS(x - y) = 0}} in Spark SQL: {code} /* 267 */ float filter_value6 = -1.0f; /* 268 */ filter_value6 = agg_value27 - agg_value32; /* 269 */ float filter_value5 = -1.0f; /* 270 */ filter_value5 = (float)(java.lang.Math.abs(filter_value6)); /* 271 */ /* 272 */ boolean filter_value4 = false; /* 273 */ filter_value4 = org.apache.spark.util.Utils.nanSafeCompareFloats(filter_value5, 0.0f) > 0; /* 274 */ if (!filter_value4) continue; {code} Maybe it could all be simplified to something like this? {code} filter_value4 = (agg_value27 != agg_value32) {code} (Of course you could write {{x != y}} directly in the SQL query, but the {{0}} in my example could be a configurable threshold, not something you can hardcode) was: I'm not a Catalyst expert, but I think some use cases for the ABS() function could generate simpler code. This is the code generated when doing something like {{ABS(x - y) > 0}} or {{ABS(x - y) = 0}} in Spark SQL: {code} /* 267 */ float filter_value6 = -1.0f; /* 268 */ filter_value6 = agg_value27 - agg_value32; /* 269 */ float filter_value5 = -1.0f; /* 270 */ filter_value5 = (float)(java.lang.Math.abs(filter_value6)); /* 271 */ /* 272 */ boolean filter_value4 = false; /* 273 */ filter_value4 = org.apache.spark.util.Utils.nanSafeCompareFloats(filter_value5, 0.0f) > 0; /* 274 */ if (!filter_value4) continue; {code} Maybe it could all be simplified to something like this? 
{code} filter_value4 = (agg_value27 != agg_value32) {code} > Optimize some ABS() statements > -- > > Key: SPARK-16807 > URL: https://issues.apache.org/jira/browse/SPARK-16807 > Project: Spark > Issue Type: Improvement > Components: SQL >Reporter: Sylvain Zimmer > > I'm not a Catalyst expert, but I think some use cases for the ABS() function > could generate simpler code. > This is the code generated when doing something like {{ABS(x - y) > 0}} or > {{ABS(x - y) = 0}} in Spark SQL: > {code} > /* 267 */ float filter_value6 = -1.0f; > /* 268 */ filter_value6 = agg_value27 - agg_value32; > /* 269 */ float filter_value5 = -1.0f; > /* 270 */ filter_value5 = (float)(java.lang.Math.abs(filter_value6)); > /* 271 */ > /* 272 */ boolean filter_value4 = false; > /* 273 */ filter_value4 = > org.apache.spark.util.Utils.nanSafeCompareFloats(filter_value5, 0.0f) > 0; > /* 274 */ if (!filter_value4) continue; > {code} > Maybe it could all be simplified to something like this? > {code} > filter_value4 = (agg_value27 != agg_value32) > {code} > (Of course you could write {{x != y}} directly in the SQL query, but the > {{0}} in my example could be a configurable threshold, not something you can > hardcode)
[jira] [Created] (SPARK-16807) Optimize some ABS() statements
Sylvain Zimmer created SPARK-16807: -- Summary: Optimize some ABS() statements Key: SPARK-16807 URL: https://issues.apache.org/jira/browse/SPARK-16807 Project: Spark Issue Type: Improvement Components: SQL Reporter: Sylvain Zimmer I'm not a Catalyst expert, but I think some use cases for the ABS() function could generate simpler code. This is the code generated when doing something like {{ABS(x - y) > 0}} or {{ABS(x - y) = 0}} in Spark SQL: {code} /* 267 */ float filter_value6 = -1.0f; /* 268 */ filter_value6 = agg_value27 - agg_value32; /* 269 */ float filter_value5 = -1.0f; /* 270 */ filter_value5 = (float)(java.lang.Math.abs(filter_value6)); /* 271 */ /* 272 */ boolean filter_value4 = false; /* 273 */ filter_value4 = org.apache.spark.util.Utils.nanSafeCompareFloats(filter_value5, 0.0f) > 0; /* 274 */ if (!filter_value4) continue; {code} Maybe it could all be simplified to something like this? {code} filter_value4 = (agg_value27 != agg_value32) {code}
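One caveat to the proposed simplification, sketched in plain Python (illustrative only, not Spark-generated code): under IEEE semantics the two predicates agree for ordinary floats but diverge once NaN is involved, which is presumably why the generated code routes through {{nanSafeCompareFloats}} in the first place.

```python
# Compare the two candidate predicates from the issue:
#   ABS(x - y) > 0   versus   x != y
def abs_gt_zero(x, y):
    return abs(x - y) > 0

def neq(x, y):
    return x != y

# For ordinary (non-NaN) floats the two predicates always agree:
for x, y in [(1.5, 2.5), (0.0, 0.0), (-3.0, -3.0), (1e-9, 0.0), (-0.0, 0.0)]:
    assert abs_gt_zero(x, y) == neq(x, y)

# With a NaN operand, plain IEEE comparisons diverge:
nan = float("nan")
assert not abs_gt_zero(nan, 1.0)  # any ordered comparison with NaN is False
assert neq(nan, 1.0)              # but != involving NaN is True
```

So the rewrite to {{x != y}} would only be safe when the optimizer can prove the operands are NaN-free, or when it preserves whatever NaN ordering {{nanSafeCompareFloats}} implements.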
[jira] [Updated] (SPARK-16802) joins.LongToUnsafeRowMap crashes with ArrayIndexOutOfBoundsException
[ https://issues.apache.org/jira/browse/SPARK-16802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sylvain Zimmer updated SPARK-16802: --- Description: Hello! This is a little similar to [SPARK-16740|https://issues.apache.org/jira/browse/SPARK-16740] (should I have reopened it?). I would recommend to give another full review to {{HashedRelation.scala}}, particularly the new {{LongToUnsafeRowMap}} code. I've had a few other errors that I haven't managed to reproduce so far, as well as what I suspect could be memory leaks (I have a query in a loop OOMing after a few iterations despite not caching its results). Here is the script to reproduce the ArrayIndexOutOfBoundsException on the current 2.0 branch: {code} import os import random from pyspark import SparkContext from pyspark.sql import types as SparkTypes from pyspark.sql import SQLContext sc = SparkContext() sqlc = SQLContext(sc) schema1 = SparkTypes.StructType([ SparkTypes.StructField("id1", SparkTypes.LongType(), nullable=True) ]) schema2 = SparkTypes.StructType([ SparkTypes.StructField("id2", SparkTypes.LongType(), nullable=True) ]) def randlong(): return random.randint(-9223372036854775808, 9223372036854775807) while True: l1, l2 = randlong(), randlong() # Sample values that crash: # l1, l2 = 4661454128115150227, -5543241376386463808 print "Testing with %s, %s" % (l1, l2) data1 = [(l1, ), (l2, )] data2 = [(l1, )] df1 = sqlc.createDataFrame(sc.parallelize(data1), schema1) df2 = sqlc.createDataFrame(sc.parallelize(data2), schema2) crash = True if crash: os.system("rm -rf /tmp/sparkbug") df1.write.parquet("/tmp/sparkbug/vertex") df2.write.parquet("/tmp/sparkbug/edge") df1 = sqlc.read.load("/tmp/sparkbug/vertex") df2 = sqlc.read.load("/tmp/sparkbug/edge") sqlc.registerDataFrameAsTable(df1, "df1") sqlc.registerDataFrameAsTable(df2, "df2") result_df = sqlc.sql(""" SELECT df1.id1 FROM df1 LEFT OUTER JOIN df2 ON df1.id1 = df2.id2 """) print result_df.collect() {code} {code} 
java.lang.ArrayIndexOutOfBoundsException: 1728150825 at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.getValue(HashedRelation.scala:463) at org.apache.spark.sql.execution.joins.LongHashedRelation.getValue(HashedRelation.scala:762) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:112) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:112) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:112) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:899) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:899) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1898) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1898) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at 
org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 16/07/29 20:19:00 WARN TaskSetManager: Lost task 0.0 in stage 17.0 (TID 50, localhost): java.lang.ArrayIndexOutOfBoundsException: 1728150825 at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.getValue(HashedRelation.scala:463)
[jira] [Updated] (SPARK-16802) joins.LongToUnsafeRowMap crashes with ArrayIndexOutOfBoundsException
[ https://issues.apache.org/jira/browse/SPARK-16802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sylvain Zimmer updated SPARK-16802: --- Description: Hello! This is a little similar to [SPARK-16740|https://issues.apache.org/jira/browse/SPARK-16740] (should I have reopened it?). I would recommend to give another full review to {{HashedRelation.scala}}, particularly the new {{LongToUnsafeRowMap}} code. I've had a few other errors that I haven't managed to reproduce so far, as well as what I suspect could be memory leaks (I have a query in a loop OOMing after a few iterations despite not caching its results). Here is the script to reproduce the ArrayIndexOutOfBoundsException on the current 2.0 branch: {code} import os import random from pyspark import SparkContext from pyspark.sql import types as SparkTypes from pyspark.sql import SQLContext sc = SparkContext() sqlc = SQLContext(sc) schema1 = SparkTypes.StructType([ SparkTypes.StructField("id1", SparkTypes.LongType(), nullable=True), # SparkTypes.StructField("weight1", SparkTypes.FloatType(), nullable=True) ]) schema2 = SparkTypes.StructType([ SparkTypes.StructField("id2", SparkTypes.LongType(), nullable=True), # SparkTypes.StructField("weight2", SparkTypes.LongType(), nullable=True) ]) def randlong(): return random.randint(-9223372036854775808, 9223372036854775807) while True: l1, l2 = randlong(), randlong() # Sample values that crash: # l1, l2 = 4661454128115150227, -5543241376386463808 print "Testing with %s, %s" % (l1, l2) data1 = [(l1, ), (l2, )] data2 = [(l1, )] df1 = sqlc.createDataFrame(sc.parallelize(data1), schema1) df2 = sqlc.createDataFrame(sc.parallelize(data2), schema2) crash = True if crash: os.system("rm -rf /tmp/sparkbug") df1.write.parquet("/tmp/sparkbug/vertex") df2.write.parquet("/tmp/sparkbug/edge") df1 = sqlc.read.load("/tmp/sparkbug/vertex") df2 = sqlc.read.load("/tmp/sparkbug/edge") sqlc.registerDataFrameAsTable(df1, "df1") sqlc.registerDataFrameAsTable(df2, "df2") 
result_df = sqlc.sql(""" SELECT df1.id1 FROM df1 LEFT OUTER JOIN df2 ON df1.id1 = df2.id2 """) print result_df.collect() {code} {code} java.lang.ArrayIndexOutOfBoundsException: 1728150825 at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.getValue(HashedRelation.scala:463) at org.apache.spark.sql.execution.joins.LongHashedRelation.getValue(HashedRelation.scala:762) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:112) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:112) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:112) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:899) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:899) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1898) at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1898) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 16/07/29 20:19:00 WARN TaskSetManager: Lost task 0.0 in stage 17.0 (TID 50, localhost):
[jira] [Created] (SPARK-16802) joins.LongToUnsafeRowMap crashes with ArrayIndexOutOfBoundsException
Sylvain Zimmer created SPARK-16802: -- Summary: joins.LongToUnsafeRowMap crashes with ArrayIndexOutOfBoundsException Key: SPARK-16802 URL: https://issues.apache.org/jira/browse/SPARK-16802 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.0.0, 2.0.1 Reporter: Sylvain Zimmer Hello! This is a little similar to [SPARK-16740|https://issues.apache.org/jira/browse/SPARK-16740] (should I have reopened it?). I would recommend to give another full review to {{HashedRelation.scala}}, particularly the new {{LongToUnsafeRowMap}} code. I've had a few other errors that I haven't managed to reproduce so far, as well as what I suspect could be memory leaks (I have a query in a loop OOMing after a few iterations despite not caching its results). Here is the script to reproduce the ArrayIndexOutOfBoundsException on the current 2.0 branch: {code} import os import random from pyspark import SparkContext from pyspark.sql import types as SparkTypes from pyspark.sql import SQLContext sc = SparkContext() sqlc = SQLContext(sc) schema1 = SparkTypes.StructType([ SparkTypes.StructField("id1", SparkTypes.LongType(), nullable=True), # SparkTypes.StructField("weight1", SparkTypes.FloatType(), nullable=True) ]) schema2 = SparkTypes.StructType([ SparkTypes.StructField("id2", SparkTypes.LongType(), nullable=True), # SparkTypes.StructField("weight2", SparkTypes.LongType(), nullable=True) ]) def randlong(): return random.randint(-9223372036854775808, 9223372036854775807) while True: l1, l2 = randlong(), randlong() # Sample values that crash: # l1, l2 = 4661454128115150227, -5543241376386463808 print "Testing with %s, %s" % (l1, l2) data1 = [(l1, ), (l2, )] data2 = [(l1, )] df1 = sqlc.createDataFrame(sc.parallelize(data1), schema1) df2 = sqlc.createDataFrame(sc.parallelize(data2), schema2) crash = True if crash: os.system("rm -rf /tmp/sparkbug") df1.write.parquet("/tmp/sparkbug/vertex") df2.write.parquet("/tmp/sparkbug/edge") df1 = sqlc.read.load("/tmp/sparkbug/vertex") df2 = 
sqlc.read.load("/tmp/sparkbug/edge") sqlc.registerDataFrameAsTable(df1, "df1") sqlc.registerDataFrameAsTable(df2, "df2") result_df = sqlc.sql(""" SELECT df1.id1 FROM df1 LEFT OUTER JOIN df2 ON df1.id1 = df2.id2 """) print result_df.collect() {code} {code} java.lang.ArrayIndexOutOfBoundsException: 1728150825 at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.getValue(HashedRelation.scala:463) at org.apache.spark.sql.execution.joins.LongHashedRelation.getValue(HashedRelation.scala:762) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:112) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:112) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:112) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:899) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:899) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1898) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1898) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at
[jira] [Commented] (SPARK-16740) joins.LongToUnsafeRowMap crashes with NegativeArraySizeException
[ https://issues.apache.org/jira/browse/SPARK-16740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15394644#comment-15394644 ] Sylvain Zimmer commented on SPARK-16740: OK! Looks like that would be [~davies] :-) > joins.LongToUnsafeRowMap crashes with NegativeArraySizeException > > > Key: SPARK-16740 > URL: https://issues.apache.org/jira/browse/SPARK-16740 > Project: Spark > Issue Type: Bug > Components: PySpark, Spark Core, SQL >Affects Versions: 2.0.0 >Reporter: Sylvain Zimmer > > Hello, > Here is a crash in Spark SQL joins, with a minimal reproducible test case. > Interestingly, it only seems to happen when reading Parquet data (I added a > {{crash = True}} variable to show it) > This is an {{left_outer}} example, but it also crashes with a regular > {{inner}} join. > {code} > import os > from pyspark import SparkContext > from pyspark.sql import types as SparkTypes > from pyspark.sql import SQLContext > sc = SparkContext() > sqlc = SQLContext(sc) > schema1 = SparkTypes.StructType([ > SparkTypes.StructField("id1", SparkTypes.LongType(), nullable=True) > ]) > schema2 = SparkTypes.StructType([ > SparkTypes.StructField("id2", SparkTypes.LongType(), nullable=True) > ]) > # Valid Long values (-9223372036854775808 < -5543241376386463808 , > 4661454128115150227 < 9223372036854775807) > data1 = [(4661454128115150227,), (-5543241376386463808,)] > data2 = [(650460285, )] > df1 = sqlc.createDataFrame(sc.parallelize(data1), schema1) > df2 = sqlc.createDataFrame(sc.parallelize(data2), schema2) > crash = True > if crash: > os.system("rm -rf /tmp/sparkbug") > df1.write.parquet("/tmp/sparkbug/vertex") > df2.write.parquet("/tmp/sparkbug/edge") > df1 = sqlc.read.load("/tmp/sparkbug/vertex") > df2 = sqlc.read.load("/tmp/sparkbug/edge") > result_df = df2.join(df1, on=(df1.id1 == df2.id2), how="left_outer") > # Should print [Row(id2=650460285, id1=None)] > print result_df.collect() > {code} > When run with {{spark-submit}}, the final {{collect()}} call crashes 
with > this: > {code} > py4j.protocol.Py4JJavaError: An error occurred while calling > o61.collectToPython. > : org.apache.spark.SparkException: Exception thrown in awaitResult: > at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194) > at > org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:120) > at > org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) > at > org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124) > at > org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98) > at > org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:242) > at > org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:83) > at > org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153) > at > org.apache.spark.sql.execution.BatchedDataSourceScanExec.consume(ExistingRDD.scala:225) > at > org.apache.spark.sql.execution.BatchedDataSourceScanExec.doProduce(ExistingRDD.scala:328) > at > org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83) > at > org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136) > at > 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) > at > org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78) > at > org.apache.spark.sql.execution.BatchedDataSourceScanExec.produce(ExistingRDD.scala:225) > at > org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77) > at > org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83) > at >
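For clarity, the result the repro expects when it does not crash can be sketched as a plain-Python left outer join (illustrative only, mirroring the {{left_outer}} semantics of the PySpark script above):

```python
# Plain-Python sketch of what the left_outer join in the repro should
# produce: every id2 from data2, paired with a matching id1 or None.
data1 = [(4661454128115150227,), (-5543241376386463808,)]
data2 = [(650460285,)]

index = {id1: id1 for (id1,) in data1}          # build side (df1)
result = [(id2, index.get(id2)) for (id2,) in data2]  # probe side (df2)
print(result)  # [(650460285, None)] -- matching Row(id2=650460285, id1=None)
```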
[jira] [Commented] (SPARK-16740) joins.LongToUnsafeRowMap crashes with NegativeArraySizeException
[ https://issues.apache.org/jira/browse/SPARK-16740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15394628#comment-15394628 ] Sylvain Zimmer commented on SPARK-16740: Thanks! I just did. Let me know if that's okay. > joins.LongToUnsafeRowMap crashes with NegativeArraySizeException > > > Key: SPARK-16740 > URL: https://issues.apache.org/jira/browse/SPARK-16740 > Project: Spark > Issue Type: Bug > Components: PySpark, Spark Core, SQL >Affects Versions: 2.0.0 >Reporter: Sylvain Zimmer > > Hello, > Here is a crash in Spark SQL joins, with a minimal reproducible test case. > Interestingly, it only seems to happen when reading Parquet data (I added a > {{crash = True}} variable to show it) > This is an {{left_outer}} example, but it also crashes with a regular > {{inner}} join. > {code} > import os > from pyspark import SparkContext > from pyspark.sql import types as SparkTypes > from pyspark.sql import SQLContext > sc = SparkContext() > sqlc = SQLContext(sc) > schema1 = SparkTypes.StructType([ > SparkTypes.StructField("id1", SparkTypes.LongType(), nullable=True) > ]) > schema2 = SparkTypes.StructType([ > SparkTypes.StructField("id2", SparkTypes.LongType(), nullable=True) > ]) > # Valid Long values (-9223372036854775808 < -5543241376386463808 , > 4661454128115150227 < 9223372036854775807) > data1 = [(4661454128115150227,), (-5543241376386463808,)] > data2 = [(650460285, )] > df1 = sqlc.createDataFrame(sc.parallelize(data1), schema1) > df2 = sqlc.createDataFrame(sc.parallelize(data2), schema2) > crash = True > if crash: > os.system("rm -rf /tmp/sparkbug") > df1.write.parquet("/tmp/sparkbug/vertex") > df2.write.parquet("/tmp/sparkbug/edge") > df1 = sqlc.read.load("/tmp/sparkbug/vertex") > df2 = sqlc.read.load("/tmp/sparkbug/edge") > result_df = df2.join(df1, on=(df1.id1 == df2.id2), how="left_outer") > # Should print [Row(id2=650460285, id1=None)] > print result_df.collect() > {code} > When run with {{spark-submit}}, the final {{collect()}} call 
crashes with > this: > {code} > py4j.protocol.Py4JJavaError: An error occurred while calling > o61.collectToPython. > : org.apache.spark.SparkException: Exception thrown in awaitResult: > at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194) > at > org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:120) > at > org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) > at > org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124) > at > org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98) > at > org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:242) > at > org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:83) > at > org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153) > at > org.apache.spark.sql.execution.BatchedDataSourceScanExec.consume(ExistingRDD.scala:225) > at > org.apache.spark.sql.execution.BatchedDataSourceScanExec.doProduce(ExistingRDD.scala:328) > at > org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83) > at > org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136) > at > 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) > at > org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78) > at > org.apache.spark.sql.execution.BatchedDataSourceScanExec.produce(ExistingRDD.scala:225) > at > org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77) > at > org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83) > at >
[jira] [Comment Edited] (SPARK-16740) joins.LongToUnsafeRowMap crashes with NegativeArraySizeException
[ https://issues.apache.org/jira/browse/SPARK-16740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15394437#comment-15394437 ] Sylvain Zimmer edited comment on SPARK-16740 at 7/26/16 7:54 PM: - I'm not a Scala expert but from a quick review of the code, it appears that it's easy to overflow the {{range}} variable in {{optimize()}}: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L608 In my example, that would yield {code} scala> val range = 4661454128115150227L - (-5543241376386463808L) range: Long = -8242048569207937581 {code} Maybe we should add {{range >= 0}} as a condition of doing that optimization? was (Author: sylvinus): I'm not a Scala expert but from a quick review of the code, it appears that it's easy to overflow the {{range}} variable in {{optimize()}}: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L608 In my example, that would yield {{ scala> val range = 4661454128115150227L - (-5543241376386463808L) range: Long = -8242048569207937581 }} Maybe we should add {{range >= 0}} as a condition of doing that optimization? > joins.LongToUnsafeRowMap crashes with NegativeArraySizeException > > > Key: SPARK-16740 > URL: https://issues.apache.org/jira/browse/SPARK-16740 > Project: Spark > Issue Type: Bug > Components: PySpark, Spark Core, SQL >Affects Versions: 2.0.0 >Reporter: Sylvain Zimmer > > Hello, > Here is a crash in Spark SQL joins, with a minimal reproducible test case. > Interestingly, it only seems to happen when reading Parquet data (I added a > {{crash = True}} variable to show it) > This is an {{left_outer}} example, but it also crashes with a regular > {{inner}} join. 
> {code} > import os > from pyspark import SparkContext > from pyspark.sql import types as SparkTypes > from pyspark.sql import SQLContext > sc = SparkContext() > sqlc = SQLContext(sc) > schema1 = SparkTypes.StructType([ > SparkTypes.StructField("id1", SparkTypes.LongType(), nullable=True) > ]) > schema2 = SparkTypes.StructType([ > SparkTypes.StructField("id2", SparkTypes.LongType(), nullable=True) > ]) > # Valid Long values (-9223372036854775808 < -5543241376386463808 , > 4661454128115150227 < 9223372036854775807) > data1 = [(4661454128115150227,), (-5543241376386463808,)] > data2 = [(650460285, )] > df1 = sqlc.createDataFrame(sc.parallelize(data1), schema1) > df2 = sqlc.createDataFrame(sc.parallelize(data2), schema2) > crash = True > if crash: > os.system("rm -rf /tmp/sparkbug") > df1.write.parquet("/tmp/sparkbug/vertex") > df2.write.parquet("/tmp/sparkbug/edge") > df1 = sqlc.read.load("/tmp/sparkbug/vertex") > df2 = sqlc.read.load("/tmp/sparkbug/edge") > result_df = df2.join(df1, on=(df1.id1 == df2.id2), how="left_outer") > # Should print [Row(id2=650460285, id1=None)] > print result_df.collect() > {code} > When ran with {{spark-submit}}, the final {{collect()}} call crashes with > this: > {code} > py4j.protocol.Py4JJavaError: An error occurred while calling > o61.collectToPython. 
> : org.apache.spark.SparkException: Exception thrown in awaitResult: > at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194) > at > org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:120) > at > org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) > at > org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124) > at > org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98) > at > org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:242) > at > org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:83) > at > org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153) > at > org.apache.spark.sql.execution.BatchedDataSourceScanExec.consume(ExistingRDD.scala:225) > at >
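The overflow described in the comment above can be reproduced outside Spark. The following is a standalone Python sketch (not Spark code; {{java_long_sub}} is a hypothetical helper) that emulates Java's wrapping 64-bit signed subtraction to show how {{max - min}} in {{LongToUnsafeRowMap.optimize()}} goes negative for these join keys:

```python
def java_long_sub(a, b):
    """Subtract b from a with Java's wrapping 64-bit signed Long semantics."""
    r = (a - b) & 0xFFFFFFFFFFFFFFFF          # truncate to 64 bits
    return r - (1 << 64) if r >= (1 << 63) else r

max_key = 4661454128115150227                 # largest join key in the repro script
min_key = -5543241376386463808                # smallest join key in the repro script

range_ = java_long_sub(max_key, min_key)
print(range_)  # -8242048569207937581, matching the scala> session quoted above
```

A negative {{range}} then flows into the array allocation, which is consistent with the reported NegativeArraySizeException.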
[jira] [Commented] (SPARK-16740) joins.LongToUnsafeRowMap crashes with NegativeArraySizeException
[ https://issues.apache.org/jira/browse/SPARK-16740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15394437#comment-15394437 ] Sylvain Zimmer commented on SPARK-16740: I'm not a Scala expert but from a quick review of the code, it appears that it's easy to overflow the {{range}} variable in {{optimize()}}: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L608 In my example, that would yield {{ scala> val range = 4661454128115150227L - (-5543241376386463808L) range: Long = -8242048569207937581 }} Maybe we should add {{range >= 0}} as a condition of doing that optimization? > joins.LongToUnsafeRowMap crashes with NegativeArraySizeException > > > Key: SPARK-16740 > URL: https://issues.apache.org/jira/browse/SPARK-16740 > Project: Spark > Issue Type: Bug > Components: PySpark, Spark Core, SQL >Affects Versions: 2.0.0 >Reporter: Sylvain Zimmer > > Hello, > Here is a crash in Spark SQL joins, with a minimal reproducible test case. > Interestingly, it only seems to happen when reading Parquet data (I added a > {{crash = True}} variable to show it) > This is an {{left_outer}} example, but it also crashes with a regular > {{inner}} join. 
> {code} > import os > from pyspark import SparkContext > from pyspark.sql import types as SparkTypes > from pyspark.sql import SQLContext > sc = SparkContext() > sqlc = SQLContext(sc) > schema1 = SparkTypes.StructType([ > SparkTypes.StructField("id1", SparkTypes.LongType(), nullable=True) > ]) > schema2 = SparkTypes.StructType([ > SparkTypes.StructField("id2", SparkTypes.LongType(), nullable=True) > ]) > # Valid Long values (-9223372036854775808 < -5543241376386463808 , > 4661454128115150227 < 9223372036854775807) > data1 = [(4661454128115150227,), (-5543241376386463808,)] > data2 = [(650460285, )] > df1 = sqlc.createDataFrame(sc.parallelize(data1), schema1) > df2 = sqlc.createDataFrame(sc.parallelize(data2), schema2) > crash = True > if crash: > os.system("rm -rf /tmp/sparkbug") > df1.write.parquet("/tmp/sparkbug/vertex") > df2.write.parquet("/tmp/sparkbug/edge") > df1 = sqlc.read.load("/tmp/sparkbug/vertex") > df2 = sqlc.read.load("/tmp/sparkbug/edge") > result_df = df2.join(df1, on=(df1.id1 == df2.id2), how="left_outer") > # Should print [Row(id2=650460285, id1=None)] > print result_df.collect() > {code} > When ran with {{spark-submit}}, the final {{collect()}} call crashes with > this: > {code} > py4j.protocol.Py4JJavaError: An error occurred while calling > o61.collectToPython. 
> : org.apache.spark.SparkException: Exception thrown in awaitResult: > at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194) > at > org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:120) > at > org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) > at > org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124) > at > org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98) > at > org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:242) > at > org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:83) > at > org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153) > at > org.apache.spark.sql.execution.BatchedDataSourceScanExec.consume(ExistingRDD.scala:225) > at > org.apache.spark.sql.execution.BatchedDataSourceScanExec.doProduce(ExistingRDD.scala:328) > at > org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83) > at > org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) > at >
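The {{range >= 0}} condition suggested in the comment above can be sketched in plain Python (an approximation of the idea, not the actual Scala patch; {{dense_range_or_none}} is a hypothetical name): compute the key range with exact arithmetic and skip the dense-array optimization whenever the result would overflow a signed 64-bit Long.

```python
INT64_MAX = (1 << 63) - 1

def dense_range_or_none(min_key, max_key):
    """Return max_key - min_key if it fits in a signed 64-bit Long,
    else None, signalling that the dense-array optimization must be skipped."""
    r = max_key - min_key          # exact in Python; would wrap in Java
    return r if 0 <= r <= INT64_MAX else None

# Keys from the bug report: the range overflows, so skip the optimization.
print(dense_range_or_none(-5543241376386463808, 4661454128115150227))  # None
# A well-behaved key range keeps the optimization.
print(dense_range_or_none(0, 650460285))  # 650460285
```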
[jira] [Created] (SPARK-16740) joins.LongToUnsafeRowMap crashes with NegativeArraySizeException
Sylvain Zimmer created SPARK-16740: -- Summary: joins.LongToUnsafeRowMap crashes with NegativeArraySizeException Key: SPARK-16740 URL: https://issues.apache.org/jira/browse/SPARK-16740 Project: Spark Issue Type: Bug Components: PySpark, Spark Core, SQL Affects Versions: 2.0.0 Reporter: Sylvain Zimmer Hello, Here is a crash in Spark SQL joins, with a minimal reproducible test case. Interestingly, it only seems to happen when reading Parquet data (I added a {{crash = True}} variable to show it). This is a {{left_outer}} example, but it also crashes with a regular {{inner}} join. {code} import os from pyspark import SparkContext from pyspark.sql import types as SparkTypes from pyspark.sql import SQLContext sc = SparkContext() sqlc = SQLContext(sc) schema1 = SparkTypes.StructType([ SparkTypes.StructField("id1", SparkTypes.LongType(), nullable=True) ]) schema2 = SparkTypes.StructType([ SparkTypes.StructField("id2", SparkTypes.LongType(), nullable=True) ]) # Valid Long values (-9223372036854775808 < -5543241376386463808 , 4661454128115150227 < 9223372036854775807) data1 = [(4661454128115150227,), (-5543241376386463808,)] data2 = [(650460285, )] df1 = sqlc.createDataFrame(sc.parallelize(data1), schema1) df2 = sqlc.createDataFrame(sc.parallelize(data2), schema2) crash = True if crash: os.system("rm -rf /tmp/sparkbug") df1.write.parquet("/tmp/sparkbug/vertex") df2.write.parquet("/tmp/sparkbug/edge") df1 = sqlc.read.load("/tmp/sparkbug/vertex") df2 = sqlc.read.load("/tmp/sparkbug/edge") result_df = df2.join(df1, on=(df1.id1 == df2.id2), how="left_outer") # Should print [Row(id2=650460285, id1=None)] print result_df.collect() {code} When run with {{spark-submit}}, the final {{collect()}} call crashes with this: {code} py4j.protocol.Py4JJavaError: An error occurred while calling o61.collectToPython. 
: org.apache.spark.SparkException: Exception thrown in awaitResult: at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194) at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:120) at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124) at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98) at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:242) at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:83) at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153) at org.apache.spark.sql.execution.BatchedDataSourceScanExec.consume(ExistingRDD.scala:225) at org.apache.spark.sql.execution.BatchedDataSourceScanExec.doProduce(ExistingRDD.scala:328) at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83) at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) at 
org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78) at org.apache.spark.sql.execution.BatchedDataSourceScanExec.produce(ExistingRDD.scala:225) at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77) at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83) at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) at
[jira] [Commented] (SPARK-16700) StructType doesn't accept Python dicts anymore
[ https://issues.apache.org/jira/browse/SPARK-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15392200#comment-15392200 ] Sylvain Zimmer commented on SPARK-16700: I dug into this a bit more: {{_verify_type({}, struct_schema)}} was already raising a similar exception in Spark 1.6.2; however, schema validation wasn't being enforced at all by {{createDataFrame}}: https://github.com/apache/spark/blob/branch-1.6/python/pyspark/sql/context.py#L418 In 2.0.0, it seems that it is done over each row of the data: https://github.com/apache/spark/blob/master/python/pyspark/sql/session.py#L504 I think there are two issues that should be fixed here: - {{_verify_type({}, struct_schema)}} shouldn't raise, because as far as I can tell dicts behave as expected and have their items correctly mapped as struct fields. - There should be a way to go back to 1.6.x-like behaviour and disable schema verification in {{createDataFrame}}. The {{prepare()}} function is being map()'d over all the data coming from Python, which I think will definitely hurt performance for large datasets and complex schemas. Leaving it on by default but adding a flag to disable it would be a good solution. Without this, users will probably have to implement their own {{createDataFrame}} function like I did. > StructType doesn't accept Python dicts anymore > -- > > Key: SPARK-16700 > URL: https://issues.apache.org/jira/browse/SPARK-16700 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.0.0 >Reporter: Sylvain Zimmer > > Hello, > I found this issue while testing my codebase with 2.0.0-rc5 > StructType in Spark 1.6.2 accepts the Python {{dict}} type, which is very > handy. 2.0.0-rc5 does not and throws an error. > I don't know if this was intended but I'd advocate for this behaviour to > remain the same. MapType is probably wasteful when your key names never > change and switching to Python tuples would be cumbersome. 
> Here is a minimal script to reproduce the issue: > {code} > from pyspark import SparkContext > from pyspark.sql import types as SparkTypes > from pyspark.sql import SQLContext > sc = SparkContext() > sqlc = SQLContext(sc) > struct_schema = SparkTypes.StructType([ > SparkTypes.StructField("id", SparkTypes.LongType()) > ]) > rdd = sc.parallelize([{"id": 0}, {"id": 1}]) > df = sqlc.createDataFrame(rdd, struct_schema) > print df.collect() > # 1.6.2 prints [Row(id=0), Row(id=1)] > # 2.0.0-rc5 raises TypeError: StructType can not accept object {'id': 0} in > type <type 'dict'> > {code} > Thanks!
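As a rough illustration of the per-row verification cost discussed in the comment above, here is a simplified stand-in for what struct verification with an opt-out flag could look like. This is not PySpark's actual {{_verify_type}} implementation; {{verify_row}} and its arguments are hypothetical names:

```python
def verify_row(obj, field_names, verify=True):
    """Coerce one input row to a tuple matching the struct fields.
    With verify=False, trust the caller (a 1.6.x-style fast path)."""
    if not verify:
        return tuple(obj[n] for n in field_names) if isinstance(obj, dict) else tuple(obj)
    if isinstance(obj, dict):
        # dict keys map onto struct field names, as Spark 1.6.2 allowed
        missing = [n for n in field_names if n not in obj]
        if missing:
            raise TypeError("missing struct fields: %s" % missing)
        return tuple(obj[n] for n in field_names)
    if isinstance(obj, (tuple, list)):
        if len(obj) != len(field_names):
            raise TypeError("expected %d fields, got %d" % (len(field_names), len(obj)))
        return tuple(obj)
    raise TypeError("StructType can not accept object %r" % (obj,))

rows = [{"id": 0}, {"id": 1}]
print([verify_row(r, ["id"]) for r in rows])  # [(0,), (1,)]
```

Because a check like this runs once per row, skipping it for already-trusted data is where the proposed flag would save time on large datasets.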
[jira] [Updated] (SPARK-16700) StructType doesn't accept Python dicts anymore
[ https://issues.apache.org/jira/browse/SPARK-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sylvain Zimmer updated SPARK-16700: --- Component/s: (was: Spark Core) > StructType doesn't accept Python dicts anymore > -- > > Key: SPARK-16700 > URL: https://issues.apache.org/jira/browse/SPARK-16700 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.0.0 >Reporter: Sylvain Zimmer > > Hello, > I found this issue while testing my codebase with 2.0.0-rc5 > StructType in Spark 1.6.2 accepts the Python {{dict}} type, which is very > handy. 2.0.0-rc5 does not and throws an error. > I don't know if this was intended but I'd advocate for this behaviour to > remain the same. MapType is probably wasteful when your key names never > change and switching to Python tuples would be cumbersome. > Here is a minimal script to reproduce the issue: > {code} > from pyspark import SparkContext > from pyspark.sql import types as SparkTypes > from pyspark.sql import SQLContext > sc = SparkContext() > sqlc = SQLContext(sc) > struct_schema = SparkTypes.StructType([ > SparkTypes.StructField("id", SparkTypes.LongType()) > ]) > rdd = sc.parallelize([{"id": 0}, {"id": 1}]) > df = sqlc.createDataFrame(rdd, struct_schema) > print df.collect() > # 1.6.2 prints [Row(id=0), Row(id=1)] > # 2.0.0-rc5 raises TypeError: StructType can not accept object {'id': 0} in > type <type 'dict'> > {code} > Thanks!
[jira] [Updated] (SPARK-16700) StructType doesn't accept Python dicts anymore
[ https://issues.apache.org/jira/browse/SPARK-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sylvain Zimmer updated SPARK-16700: --- Description: Hello, I found this issue while testing my codebase with 2.0.0-rc5 StructType in Spark 1.6.2 accepts the Python {{dict}} type, which is very handy. 2.0.0-rc5 does not and throws an error. I don't know if this was intended but I'd advocate for this behaviour to remain the same. MapType is probably wasteful when your key names never change and switching to Python tuples would be cumbersome. Here is a minimal script to reproduce the issue: {code} from pyspark import SparkContext from pyspark.sql import types as SparkTypes from pyspark.sql import SQLContext sc = SparkContext() sqlc = SQLContext(sc) struct_schema = SparkTypes.StructType([ SparkTypes.StructField("id", SparkTypes.LongType()) ]) rdd = sc.parallelize([{"id": 0}, {"id": 1}]) df = sqlc.createDataFrame(rdd, struct_schema) print df.collect() # 1.6.2 prints [Row(id=0), Row(id=1)] # 2.0.0-rc5 raises TypeError: StructType can not accept object {'id': 0} in type <type 'dict'> {code} Thanks! was: Hello, I found this issue while testing my codebase with 2.0.0-rc5 StructType in Spark 1.6.2 accepts the Python {{dict}} type, which is very handy. 2.0.0-rc5 does not and throws an error. I don't know if this was intended but I'd advocate for this behaviour to remain the same. MapType is probably wasteful when your key names never change and switching to Python tuples would be cumbersome. 
Here is a minimal script to reproduce the issue: {code:python} from pyspark import SparkContext from pyspark.sql import types as SparkTypes from pyspark.sql import SQLContext sc = SparkContext() sqlc = SQLContext(sc) struct_schema = SparkTypes.StructType([ SparkTypes.StructField("id", SparkTypes.LongType()) ]) rdd = sc.parallelize([{"id": 0}, {"id": 1}]) df = sqlc.createDataFrame(rdd, struct_schema) print df.collect() # 1.6.2 prints [Row(id=0), Row(id=1)] # 2.0.0-rc5 raises TypeError: StructType can not accept object {'id': 0} in type <type 'dict'> {code} Thanks! > StructType doesn't accept Python dicts anymore > -- > > Key: SPARK-16700 > URL: https://issues.apache.org/jira/browse/SPARK-16700 > Project: Spark > Issue Type: Bug > Components: PySpark, Spark Core >Affects Versions: 2.0.0 >Reporter: Sylvain Zimmer > > Hello, > I found this issue while testing my codebase with 2.0.0-rc5 > StructType in Spark 1.6.2 accepts the Python {{dict}} type, which is very > handy. 2.0.0-rc5 does not and throws an error. > I don't know if this was intended but I'd advocate for this behaviour to > remain the same. MapType is probably wasteful when your key names never > change and switching to Python tuples would be cumbersome. > Here is a minimal script to reproduce the issue: > {code} > from pyspark import SparkContext > from pyspark.sql import types as SparkTypes > from pyspark.sql import SQLContext > sc = SparkContext() > sqlc = SQLContext(sc) > struct_schema = SparkTypes.StructType([ > SparkTypes.StructField("id", SparkTypes.LongType()) > ]) > rdd = sc.parallelize([{"id": 0}, {"id": 1}]) > df = sqlc.createDataFrame(rdd, struct_schema) > print df.collect() > # 1.6.2 prints [Row(id=0), Row(id=1)] > # 2.0.0-rc5 raises TypeError: StructType can not accept object {'id': 0} in > type <type 'dict'> > {code} > Thanks!
[jira] [Updated] (SPARK-16700) StructType doesn't accept Python dicts anymore
[ https://issues.apache.org/jira/browse/SPARK-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sylvain Zimmer updated SPARK-16700: --- Component/s: PySpark > StructType doesn't accept Python dicts anymore > -- > > Key: SPARK-16700 > URL: https://issues.apache.org/jira/browse/SPARK-16700 > Project: Spark > Issue Type: Bug > Components: PySpark, Spark Core >Affects Versions: 2.0.0 >Reporter: Sylvain Zimmer > > Hello, > I found this issue while testing my codebase with 2.0.0-rc5 > StructType in Spark 1.6.2 accepts the Python {{dict}} type, which is very > handy. 2.0.0-rc5 does not and throws an error. > I don't know if this was intended but I'd advocate for this behaviour to > remain the same. MapType is probably wasteful when your key names never > change and switching to Python tuples would be cumbersome. > Here is a minimal script to reproduce the issue: > {code} > from pyspark import SparkContext > from pyspark.sql import types as SparkTypes > from pyspark.sql import SQLContext > sc = SparkContext() > sqlc = SQLContext(sc) > struct_schema = SparkTypes.StructType([ > SparkTypes.StructField("id", SparkTypes.LongType()) > ]) > rdd = sc.parallelize([{"id": 0}, {"id": 1}]) > df = sqlc.createDataFrame(rdd, struct_schema) > print df.collect() > # 1.6.2 prints [Row(id=0), Row(id=1)] > # 2.0.0-rc5 raises TypeError: StructType can not accept object {'id': 0} in > type <type 'dict'> > {code} > Thanks!
[jira] [Created] (SPARK-16700) StructType doesn't accept Python dicts anymore
Sylvain Zimmer created SPARK-16700: -- Summary: StructType doesn't accept Python dicts anymore Key: SPARK-16700 URL: https://issues.apache.org/jira/browse/SPARK-16700 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.0.0 Reporter: Sylvain Zimmer Hello, I found this issue while testing my codebase with 2.0.0-rc5 StructType in Spark 1.6.2 accepts the Python {{dict}} type, which is very handy. 2.0.0-rc5 does not and throws an error. I don't know if this was intended but I'd advocate for this behaviour to remain the same. MapType is probably wasteful when your key names never change and switching to Python tuples would be cumbersome. Here is a minimal script to reproduce the issue: {code:python} from pyspark import SparkContext from pyspark.sql import types as SparkTypes from pyspark.sql import SQLContext sc = SparkContext() sqlc = SQLContext(sc) struct_schema = SparkTypes.StructType([ SparkTypes.StructField("id", SparkTypes.LongType()) ]) rdd = sc.parallelize([{"id": 0}, {"id": 1}]) df = sqlc.createDataFrame(rdd, struct_schema) print df.collect() # 1.6.2 prints [Row(id=0), Row(id=1)] # 2.0.0-rc5 raises TypeError: StructType can not accept object {'id': 0} in type <type 'dict'> {code} Thanks!