[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-08-13 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16578763#comment-16578763
 ] 

Liang-Chi Hsieh commented on SPARK-24410:
-

The above code shows that the two tables in the union result are placed in 
logically different partitions, even if you know they might be physically 
co-partitioned. So we can't just get rid of the shuffle and expect correct 
results, because of `SparkContext.union`'s current implementation.

That is why cloud-fan suggested implementing Union with RDD.zip for certain 
cases, to preserve the children's output partitioning.

Although we can make Union smarter about its output partitioning, from the 
discussion you can see that we might also need to consider parallelism and locality.
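
For reference, here is a minimal sketch (not the actual proposal; it assumes both children are already co-partitioned RDDs with the same number of partitions) of how a zip-based union could keep the parents' partitioning:

{code:scala}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Hypothetical helper: concatenate two co-partitioned RDDs partition by partition,
// so the result keeps the parents' partitioning instead of SparkContext.union's
// plain concatenation of partition lists.
def coPartitionedUnion[T: ClassTag](left: RDD[T], right: RDD[T]): RDD[T] = {
  require(left.getNumPartitions == right.getNumPartitions,
    "children must have the same number of partitions")
  left.zipPartitions(right, preservesPartitioning = true) { (l, r) => l ++ r }
}
{code}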

> Missing optimization for Union on bucketed tables
> -
>
> Key: SPARK-24410
> URL: https://issues.apache.org/jira/browse/SPARK-24410
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ohad Raviv
>Priority: Major
>
> A common use-case we have is a partially aggregated table plus daily 
> increments that we need to further aggregate. We do this by unioning the two 
> tables and aggregating again.
> We tried to optimize this process by bucketing the tables, but currently it 
> seems that the union operator doesn't leverage the tables being bucketed, 
> unlike the join operator.
> for example, for two bucketed tables a1,a2:
> {code}
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
>   .repartition(col("key"))
>   .write.mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("t1")
>   .saveAsTable("a1")
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
>   .repartition(col("key"))
>   .write.mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("t1")
>   .saveAsTable("a2")
> {code}
> for the join query we get the "SortMergeJoin"
> {code}
> select * from a1 join a2 on (a1.key=a2.key)
> == Physical Plan ==
> *(3) SortMergeJoin [key#24L], [key#27L], Inner
> :- *(1) Sort [key#24L ASC NULLS FIRST], false, 0
> :  +- *(1) Project [key#24L, t1#25L, t2#26L]
> : +- *(1) Filter isnotnull(key#24L)
> :+- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> +- *(2) Sort [key#27L ASC NULLS FIRST], false, 0
>+- *(2) Project [key#27L, t1#28L, t2#29L]
>   +- *(2) Filter isnotnull(key#27L)
>  +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> {code}
> but for aggregation after union we get a shuffle:
> {code}
> select key,count(*) from (select * from a1 union all select * from a2)z group 
> by key
> == Physical Plan ==
> *(4) HashAggregate(keys=[key#25L], functions=[count(1)], output=[key#25L, 
> count(1)#36L])
> +- Exchange hashpartitioning(key#25L, 1)
>+- *(3) HashAggregate(keys=[key#25L], functions=[partial_count(1)], 
> output=[key#25L, count#38L])
>   +- Union
>  :- *(1) Project [key#25L]
>  :  +- *(1) FileScan parquet default.a1[key#25L] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
> PushedFilters: [], ReadSchema: struct
>  +- *(2) Project [key#28L]
> +- *(2) FileScan parquet default.a2[key#28L] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
> PushedFilters: [], ReadSchema: struct
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22347) UDF is evaluated when 'F.when' condition is false

2018-08-13 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-22347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16578709#comment-16578709
 ] 

Liang-Chi Hsieh commented on SPARK-22347:
-

Agreed. Thanks [~rdblue]


> UDF is evaluated when 'F.when' condition is false
> -
>
> Key: SPARK-22347
> URL: https://issues.apache.org/jira/browse/SPARK-22347
> Project: Spark
>  Issue Type: Documentation
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Nicolas Porter
>Assignee: Liang-Chi Hsieh
>Priority: Minor
>
> Here's a simple example on how to reproduce this:
> {code}
> from pyspark.sql import functions as F, Row, types
> def Divide10():
>     def fn(value): return 10 / int(value)
>     return F.udf(fn, types.IntegerType())
> df = sc.parallelize([Row(x=5), Row(x=0)]).toDF()
> x = F.col('x')
> df2 = df.select(F.when((x > 0), Divide10()(x)))
> df2.show(200)
> {code}
> This raises a division by zero error, even if `F.when` is trying to filter 
> out all cases where `x <= 0`. I believe the correct behavior should be not to 
> evaluate the UDF when the `F.when` condition is false.
> Interestingly enough, when the `F.when` condition is set to `F.lit(False)`, 
> then the error is not raised and all rows resolve to `null`, which is the 
> expected result.
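
A hedged workaround sketch (written in Scala for illustration, and only a sketch under the assumption that the UDF itself can tolerate the guarded input): since Spark does not guarantee that {{when}}/{{otherwise}} short-circuits UDF evaluation, the UDF can be made null-safe instead of relying on the condition.

{code:scala}
import org.apache.spark.sql.functions.{col, udf, when}

// Returns None instead of dividing by zero, so the UDF stays harmless even if
// Spark evaluates it on rows the `when` condition was meant to exclude.
val divide10 = udf((value: Int) => if (value > 0) Some(10 / value) else None)

// Usage, assuming `df` is a DataFrame with an integer column "x":
// df.select(when(col("x") > 0, divide10(col("x")))).show()
{code}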



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-25080) NPE in HiveShim$.toCatalystDecimal(HiveShim.scala:110)

2018-08-11 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577379#comment-16577379
 ] 

Liang-Chi Hsieh commented on SPARK-25080:
-

Did a quick test but couldn't reproduce it. Is it possible that the table schema 
is not correct?

> NPE in HiveShim$.toCatalystDecimal(HiveShim.scala:110)
> --
>
> Key: SPARK-25080
> URL: https://issues.apache.org/jira/browse/SPARK-25080
> Project: Spark
>  Issue Type: Improvement
>  Components: Input/Output
>Affects Versions: 2.3.1
> Environment: AWS EMR
>Reporter: Andrew K Long
>Priority: Minor
>
> NPE while reading a Hive table.
>  
> ```
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 1190 in stage 392.0 failed 4 times, most recent failure: Lost task 
> 1190.3 in stage 392.0 (TID 122055, ip-172-31-32-196.ec2.internal, executor 
> 487): java.lang.NullPointerException
> at org.apache.spark.sql.hive.HiveShim$.toCatalystDecimal(HiveShim.scala:110)
> at 
> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:414)
> at 
> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:413)
> at 
> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:442)
> at 
> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:433)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:217)
> at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$2.apply(ShuffleExchangeExec.scala:294)
> at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$2.apply(ShuffleExchangeExec.scala:265)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:109)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>  
> Driver stacktrace:
> at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1753)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1741)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1740)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1740)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:871)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:871)
> at scala.Option.foreach(Option.scala:257)
> at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:871)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1974)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1923)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1912)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:682)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
> at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
> ... 67 more
> Caused by: java.lang.NullPointerException
> at org.apache.spark.sql.hive.HiveShim$.toCatalystDecimal(HiveShim.scala:110)
> at 
> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$

[jira] [Comment Edited] (SPARK-24152) SparkR CRAN feasibility check server problem

2018-08-11 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577065#comment-16577065
 ] 

Liang-Chi Hsieh edited comment on SPARK-24152 at 8/11/18 9:23 PM:
--

CRAN sysadmin replied they fixed it now. Looks good locally. Will trigger test 
to verify.

Note: verified. It is fixed now.


was (Author: viirya):
CRAN sysadmin replied they fixed it now. Looks good locally. Will trigger test 
to verify.

> SparkR CRAN feasibility check server problem
> 
>
> Key: SPARK-24152
> URL: https://issues.apache.org/jira/browse/SPARK-24152
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 2.4.0
>Reporter: Dongjoon Hyun
>Assignee: Liang-Chi Hsieh
>Priority: Critical
>
> PR builder and master branch tests fail with the following SparkR error for an 
> unknown reason. The error message is shown below.
> {code}
> * this is package 'SparkR' version '2.4.0'
> * checking CRAN incoming feasibility ...Error in 
> .check_package_CRAN_incoming(pkgdir) : 
>   dims [product 24] do not match the length of object [0]
> Execution halted
> {code}
> *PR BUILDER*
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90039/
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89983/
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89998/
> *MASTER BRANCH*
> - 
> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/4458/
>  (Fail with no failures)
> This is critical because we have already started to merge PRs while ignoring this 
> **known unknown** SparkR failure.
> - https://github.com/apache/spark/pull/21175



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24152) SparkR CRAN feasibility check server problem

2018-08-10 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577065#comment-16577065
 ] 

Liang-Chi Hsieh commented on SPARK-24152:
-

CRAN sysadmin replied they fixed it now. Looks good locally. Will trigger test 
to verify.

> SparkR CRAN feasibility check server problem
> 
>
> Key: SPARK-24152
> URL: https://issues.apache.org/jira/browse/SPARK-24152
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 2.4.0
>Reporter: Dongjoon Hyun
>Assignee: Liang-Chi Hsieh
>Priority: Critical
>
> PR builder and master branch tests fail with the following SparkR error for an 
> unknown reason. The error message is shown below.
> {code}
> * this is package 'SparkR' version '2.4.0'
> * checking CRAN incoming feasibility ...Error in 
> .check_package_CRAN_incoming(pkgdir) : 
>   dims [product 24] do not match the length of object [0]
> Execution halted
> {code}
> *PR BUILDER*
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90039/
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89983/
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89998/
> *MASTER BRANCH*
> - 
> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/4458/
>  (Fail with no failures)
> This is critical because we have already started to merge PRs while ignoring this 
> **known unknown** SparkR failure.
> - https://github.com/apache/spark/pull/21175



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24152) SparkR CRAN feasibility check server problem

2018-08-10 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16576862#comment-16576862
 ] 

Liang-Chi Hsieh commented on SPARK-24152:
-

I found that the re-triggered test still failed, figured out the issue, and sent 
an email to CRAN for help.

> SparkR CRAN feasibility check server problem
> 
>
> Key: SPARK-24152
> URL: https://issues.apache.org/jira/browse/SPARK-24152
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 2.4.0
>Reporter: Dongjoon Hyun
>Assignee: Liang-Chi Hsieh
>Priority: Critical
>
> PR builder and master branch tests fail with the following SparkR error for an 
> unknown reason. The error message is shown below.
> {code}
> * this is package 'SparkR' version '2.4.0'
> * checking CRAN incoming feasibility ...Error in 
> .check_package_CRAN_incoming(pkgdir) : 
>   dims [product 24] do not match the length of object [0]
> Execution halted
> {code}
> *PR BUILDER*
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90039/
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89983/
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89998/
> *MASTER BRANCH*
> - 
> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/4458/
>  (Fail with no failures)
> This is critical because we have already started to merge PRs while ignoring this 
> **known unknown** SparkR failure.
> - https://github.com/apache/spark/pull/21175



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24152) SparkR CRAN feasibility check server problem

2018-08-10 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16576559#comment-16576559
 ] 

Liang-Chi Hsieh edited comment on SPARK-24152 at 8/10/18 4:53 PM:
--

I checked locally. Seems fine, I don't see the error now. Wait for re-triggered 
tests to verify.


was (Author: viirya):
I checked locally. Seems fine, I don't see the error now.

> SparkR CRAN feasibility check server problem
> 
>
> Key: SPARK-24152
> URL: https://issues.apache.org/jira/browse/SPARK-24152
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 2.4.0
>Reporter: Dongjoon Hyun
>Assignee: Liang-Chi Hsieh
>Priority: Critical
>
> PR builder and master branch tests fail with the following SparkR error for an 
> unknown reason. The error message is shown below.
> {code}
> * this is package 'SparkR' version '2.4.0'
> * checking CRAN incoming feasibility ...Error in 
> .check_package_CRAN_incoming(pkgdir) : 
>   dims [product 24] do not match the length of object [0]
> Execution halted
> {code}
> *PR BUILDER*
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90039/
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89983/
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89998/
> *MASTER BRANCH*
> - 
> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/4458/
>  (Fail with no failures)
> This is critical because we have already started to merge PRs while ignoring this 
> **known unknown** SparkR failure.
> - https://github.com/apache/spark/pull/21175



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24152) SparkR CRAN feasibility check server problem

2018-08-10 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16576559#comment-16576559
 ] 

Liang-Chi Hsieh commented on SPARK-24152:
-

I checked locally. Seems fine, I don't see the error now.

> SparkR CRAN feasibility check server problem
> 
>
> Key: SPARK-24152
> URL: https://issues.apache.org/jira/browse/SPARK-24152
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 2.4.0
>Reporter: Dongjoon Hyun
>Assignee: Liang-Chi Hsieh
>Priority: Critical
>
> PR builder and master branch tests fail with the following SparkR error for an 
> unknown reason. The error message is shown below.
> {code}
> * this is package 'SparkR' version '2.4.0'
> * checking CRAN incoming feasibility ...Error in 
> .check_package_CRAN_incoming(pkgdir) : 
>   dims [product 24] do not match the length of object [0]
> Execution halted
> {code}
> *PR BUILDER*
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90039/
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89983/
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89998/
> *MASTER BRANCH*
> - 
> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/4458/
>  (Fail with no failures)
> This is critical because we have already started to merge PRs while ignoring this 
> **known unknown** SparkR failure.
> - https://github.com/apache/spark/pull/21175



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24152) SparkR CRAN feasibility check server problem

2018-08-10 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16576491#comment-16576491
 ] 

Liang-Chi Hsieh commented on SPARK-24152:
-

Sorry, just saw this. I will ask the CRAN sysadmin again. 

> SparkR CRAN feasibility check server problem
> 
>
> Key: SPARK-24152
> URL: https://issues.apache.org/jira/browse/SPARK-24152
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 2.4.0
>Reporter: Dongjoon Hyun
>Assignee: Liang-Chi Hsieh
>Priority: Critical
>
> PR builder and master branch tests fail with the following SparkR error for an 
> unknown reason. The error message is shown below.
> {code}
> * this is package 'SparkR' version '2.4.0'
> * checking CRAN incoming feasibility ...Error in 
> .check_package_CRAN_incoming(pkgdir) : 
>   dims [product 24] do not match the length of object [0]
> Execution halted
> {code}
> *PR BUILDER*
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90039/
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89983/
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89998/
> *MASTER BRANCH*
> - 
> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/4458/
>  (Fail with no failures)
> This is critical because we have already started to merge PRs while ignoring this 
> **known unknown** SparkR failure.
> - https://github.com/apache/spark/pull/21175



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-25010) Rand/Randn should produce different values for each execution in streaming query

2018-08-02 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-25010:
---

 Summary: Rand/Randn should produce different values for each 
execution in streaming query
 Key: SPARK-25010
 URL: https://issues.apache.org/jira/browse/SPARK-25010
 Project: Spark
  Issue Type: Bug
  Components: SQL, Structured Streaming
Affects Versions: 2.4.0
Reporter: Liang-Chi Hsieh


Like Uuid in SPARK-24896, the Rand and Randn expressions currently produce the same 
results for each execution of a streaming query. That doesn't make much sense 
for streaming queries. We should make them produce different results, as Uuid now does.
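
A small batch-mode illustration of the underlying behavior (a sketch; it assumes the seed is fixed up front, which is what happens when the seed is assigned during analysis):

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.rand

val spark = SparkSession.builder().master("local[*]").appName("rand-seed-demo").getOrCreate()

// With the seed fixed, re-executing the same plan draws the same numbers.
// A streaming query re-executes its plan for every batch, so every batch
// would see the same "random" values.
val df = spark.range(3).select(rand(42).as("r"))
df.show()  // some values
df.show()  // the same values again
{code}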



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24906) Enlarge split size for columnar file to ensure the task read enough data

2018-07-24 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554716#comment-16554716
 ] 

Liang-Chi Hsieh commented on SPARK-24906:
-

A {{maxPartitionBytes}} value that is adapted improperly might hurt performance, 
and in that case users may not know what happened. Do we have a general rule to 
calculate a proper {{maxPartitionBytes}}?
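
A minimal sketch of the rule implied by the example in the description below (an assumption for illustration, not Spark's built-in behavior): scale the split size by the ratio of the full row width to the width of the columns actually read, assuming fixed-width columns.

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("split-size-demo").getOrCreate()

// 40 columns in the table: 20 int (4 bytes) + 20 long (8 bytes); the query reads one of each.
val fullRowBytes  = 20 * 4 + 20 * 8               // 240
val selectedBytes = 4 + 8                         // 12
val scale         = fullRowBytes / selectedBytes  // 20, matching the description below

// Manually enlarge the split size by that factor (the default is 128 MB).
spark.conf.set("spark.sql.files.maxPartitionBytes", 128L * 1024 * 1024 * scale)
{code}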

> Enlarge split size for columnar file to ensure the task read enough data
> 
>
> Key: SPARK-24906
> URL: https://issues.apache.org/jira/browse/SPARK-24906
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: Jason Guo
>Priority: Critical
> Attachments: image-2018-07-24-20-26-32-441.png, 
> image-2018-07-24-20-28-06-269.png, image-2018-07-24-20-29-24-797.png, 
> image-2018-07-24-20-30-24-552.png
>
>
> For columnar files (such as Parquet), when Spark SQL reads the table, each split 
> will be 128 MB by default, since spark.sql.files.maxPartitionBytes defaults to 
> 128 MB. Even when the user sets it to a large value, such as 512 MB, a task may 
> read only a few MB or even hundreds of KB, because the table (Parquet) may 
> consist of dozens of columns while the SQL only needs a few columns, and Spark 
> will prune the unnecessary columns.
>  
> In this case, Spark's DataSourceScanExec could enlarge maxPartitionBytes 
> adaptively. 
> For example, if there are 40 columns, 20 integer and 20 long, and the query only 
> touches one integer column and one long column, then maxPartitionBytes should be 
> 20 times larger: (20*4 + 20*8) / (4 + 8) = 20. 
>  
> With this optimization, the number of tasks will be smaller and the job will 
> run faster. More importantly, for a very large cluster (more than 10 thousand 
> nodes), it will relieve the RM's scheduling pressure.
>  
> Here is the test
>  
> The table named test2 has more than 40 columns and there are more than 5 TB 
> data each hour.
> When we issue a very simple query 
>  
> {code:java}
> select count(device_id) from test2 where date=20180708 and hour='23'{code}
>  
> There are 72176 tasks and the duration of the job is 4.8 minutes
> !image-2018-07-24-20-26-32-441.png!
>  
> Most tasks last less than 1 second and read less than 1.5 MB data
> !image-2018-07-24-20-28-06-269.png!
>  
> After the optimization, there are only 1615 tasks and the job lasts only 30 
> seconds. It is almost 10 times faster.
> !image-2018-07-24-20-29-24-797.png!
>  
> The median read size is 44.2 MB. 
> !image-2018-07-24-20-30-24-552.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24896) Uuid expression should produce different values in each execution under streaming query

2018-07-23 Thread Liang-Chi Hsieh (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh updated SPARK-24896:

Component/s: Structured Streaming

> Uuid expression should produce different values in each execution under 
> streaming query
> ---
>
> Key: SPARK-24896
> URL: https://issues.apache.org/jira/browse/SPARK-24896
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Liang-Chi Hsieh
>Priority: Major
>
> Uuid's results depend on the random seed given during analysis. Thus, under a 
> streaming query, we will get the same UUIDs in each execution. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24896) Uuid expression should produce different values in each execution under streaming query

2018-07-23 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-24896:
---

 Summary: Uuid expression should produce different values in each 
execution under streaming query
 Key: SPARK-24896
 URL: https://issues.apache.org/jira/browse/SPARK-24896
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.0
Reporter: Liang-Chi Hsieh


Uuid's results depend on the random seed given during analysis. Thus, under a 
streaming query, we will get the same UUIDs in each execution. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24885) Initialize random seeds for Rand and Randn expression during analysis

2018-07-22 Thread Liang-Chi Hsieh (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh resolved SPARK-24885.
-
Resolution: Won't Fix

> Initialize random seeds for Rand and Randn expression during analysis
> -
>
> Key: SPARK-24885
> URL: https://issues.apache.org/jira/browse/SPARK-24885
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Liang-Chi Hsieh
>Priority: Major
>
> Random expressions such as Rand and Randn should have the same behavior as 
> Uuid, in that their random seeds should be initialized during analysis.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24885) Initialize random seeds for Rand and Randn expression during analysis

2018-07-22 Thread Liang-Chi Hsieh (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh updated SPARK-24885:

Description: 
Random expressions such as Rand and Randn should have the same behavior as Uuid, 
in that their random seeds should be initialized during analysis.



  was:
Random expressions such as Rand and Randn should have the same behavior as Uuid 
that 




> Initialize random seeds for Rand and Randn expression during analysis
> -
>
> Key: SPARK-24885
> URL: https://issues.apache.org/jira/browse/SPARK-24885
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Liang-Chi Hsieh
>Priority: Major
>
> Random expressions such as Rand and Randn should have the same behavior as 
> Uuid, in that their random seeds should be initialized during analysis.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24885) Initialize random seeds for Rand and Randn expression during analysis

2018-07-22 Thread Liang-Chi Hsieh (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh updated SPARK-24885:

Summary: Initialize random seeds for Rand and Randn expression during 
analysis  (was: Rand and Randn expression should produce same result at 
DataFrame on retries)

> Initialize random seeds for Rand and Randn expression during analysis
> -
>
> Key: SPARK-24885
> URL: https://issues.apache.org/jira/browse/SPARK-24885
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Liang-Chi Hsieh
>Priority: Major
>
> Random expressions such as Rand and Randn should have the same behavior as 
> Uuid that 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24885) Rand and Randn expression should produce same result at DataFrame on retries

2018-07-22 Thread Liang-Chi Hsieh (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh updated SPARK-24885:

Description: 
Random expressions such as Rand and Randn should have the same behavior as Uuid 
that 



  was:
Random expressions such as Rand and Randn should have the same behavior as Uuid, 
which produces the same results for the same DataFrame on retries.




> Rand and Randn expression should produce same result at DataFrame on retries
> 
>
> Key: SPARK-24885
> URL: https://issues.apache.org/jira/browse/SPARK-24885
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Liang-Chi Hsieh
>Priority: Major
>
> Random expressions such as Rand and Randn should have the same behavior as 
> Uuid that 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24885) Rand and Randn expression should produce same result at DataFrame on retries

2018-07-22 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-24885:
---

 Summary: Rand and Randn expression should produce same result at 
DataFrame on retries
 Key: SPARK-24885
 URL: https://issues.apache.org/jira/browse/SPARK-24885
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Liang-Chi Hsieh


Random expressions such as Rand and Randn should have the same behavior as Uuid, 
which produces the same results for the same DataFrame on retries.





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24875) MulticlassMetrics should offer a more efficient way to compute count by label

2018-07-20 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16551457#comment-16551457
 ] 

Liang-Chi Hsieh edited comment on SPARK-24875 at 7/21/18 12:21 AM:
---

Hmm, I think for the calculation of precision, recall and true/false positive rate, 
we should care about exact numbers, not approximate ones. So is it really 
reasonable to use countByValueApprox here?


was (Author: viirya):
Hmm, I think for the calculation of precision, recall and true/false positive rate, 
we should care about exact calculation, not an approximate one. So is it really 
reasonable to use countByValueApprox here?

> MulticlassMetrics should offer a more efficient way to compute count by label
> -
>
> Key: SPARK-24875
> URL: https://issues.apache.org/jira/browse/SPARK-24875
> Project: Spark
>  Issue Type: Improvement
>  Components: MLlib
>Affects Versions: 2.3.1
>Reporter: Antoine Galataud
>Priority: Minor
>
> Currently _MulticlassMetrics_ calls _countByValue_() to get count by 
> class/label
> {code:java}
> private lazy val labelCountByClass: Map[Double, Long] = 
> predictionAndLabels.values.countByValue()
> {code}
> If input _RDD[(Double, Double)]_ is huge (which can be the case with a large 
> test dataset), it will lead to poor execution performance.
> One option could be to allow using _countByValueApprox_ (could require adding 
> an extra configuration param for MulticlassMetrics).
> Note: since there is no equivalent of _MulticlassMetrics_ in new ML library, 
> I don't know how this could be ported there.
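
A rough sketch of what the approximate alternative could look like (purely illustrative; the helper name and the default timeout/confidence are assumptions, and MulticlassMetrics does not expose anything like this today):

{code:scala}
import org.apache.spark.rdd.RDD

// Hypothetical helper: approximate per-label counts within a time budget,
// instead of the exact countByValue() that MulticlassMetrics uses today.
def approxLabelCounts(predictionAndLabels: RDD[(Double, Double)],
                      timeoutMs: Long = 10000L,
                      confidence: Double = 0.95): Map[Double, Long] =
  predictionAndLabels.values
    .countByValueApprox(timeoutMs, confidence)
    .initialValue                                  // whatever is available once the timeout elapses
    .map { case (label, bounded) => label -> bounded.mean.toLong }
    .toMap
{code}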



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24875) MulticlassMetrics should offer a more efficient way to compute count by label

2018-07-20 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16551457#comment-16551457
 ] 

Liang-Chi Hsieh commented on SPARK-24875:
-

Hmm, I think for the calculation of precision, recall and true/false positive rate, 
we should care about exact calculation, not an approximate one. So is it really 
reasonable to use countByValueApprox here?

> MulticlassMetrics should offer a more efficient way to compute count by label
> -
>
> Key: SPARK-24875
> URL: https://issues.apache.org/jira/browse/SPARK-24875
> Project: Spark
>  Issue Type: Improvement
>  Components: MLlib
>Affects Versions: 2.3.1
>Reporter: Antoine Galataud
>Priority: Minor
>
> Currently _MulticlassMetrics_ calls _countByValue_() to get count by 
> class/label
> {code:java}
> private lazy val labelCountByClass: Map[Double, Long] = 
> predictionAndLabels.values.countByValue()
> {code}
> If input _RDD[(Double, Double)]_ is huge (which can be the case with a large 
> test dataset), it will lead to poor execution performance.
> One option could be to allow using _countByValueApprox_ (could require adding 
> an extra configuration param for MulticlassMetrics).
> Note: since there is no equivalent of _MulticlassMetrics_ in new ML library, 
> I don't know how this could be ported there.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24862) Spark Encoder is not consistent to scala case class semantic for multiple argument lists

2018-07-20 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16551416#comment-16551416
 ] 

Liang-Chi Hsieh commented on SPARK-24862:
-

Isn't there an inconsistency between the schema and the ser/de? For the serializer, 
for example, how can we get {{y}} from {{Multi}} objects?

> Spark Encoder is not consistent to scala case class semantic for multiple 
> argument lists
> 
>
> Key: SPARK-24862
> URL: https://issues.apache.org/jira/browse/SPARK-24862
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Antonio Murgia
>Priority: Major
>
> Spark Encoders are not consistent with Scala case class semantics for multiple 
> argument lists.
> For example, if I create a case class with multiple constructor argument lists:
> {code:java}
> case class Multi(x: String)(y: Int){code}
> Scala creates a product with arity 1, while if I apply 
> {code:java}
> Encoders.product[Multi].schema.printTreeString{code}
> I get
> {code:java}
> root
> |-- x: string (nullable = true)
> |-- y: integer (nullable = false){code}
> That is not consistent and leads to:
> {code:java}
> Error while encoding: java.lang.RuntimeException: Couldn't find y on class 
> it.enel.next.platform.service.events.common.massive.immutable.Multi
> staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
> fromString, assertnotnull(assertnotnull(input[0, 
> it.enel.next.platform.service.events.common.massive.immutable.Multi, 
> true])).x, true) AS x#0
> assertnotnull(assertnotnull(input[0, 
> it.enel.next.platform.service.events.common.massive.immutable.Multi, 
> true])).y AS y#1
> java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: 
> Couldn't find y on class 
> it.enel.next.platform.service.events.common.massive.immutable.Multi
> staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
> fromString, assertnotnull(assertnotnull(input[0, 
> it.enel.next.platform.service.events.common.massive.immutable.Multi, 
> true])).x, true) AS x#0
> assertnotnull(assertnotnull(input[0, 
> it.enel.next.platform.service.events.common.massive.immutable.Multi, 
> true])).y AS y#1
> at 
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)
> at org.apache.spark.sql.SparkSession$$anonfun$2.apply(SparkSession.scala:464)
> at org.apache.spark.sql.SparkSession$$anonfun$2.apply(SparkSession.scala:464)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:296)
> at org.apache.spark.sql.SparkSession.createDataset(SparkSession.scala:464)
> at 
> it.enel.next.platform.service.events.common.massive.immutable.ParquetQueueSuite$$anonfun$1.apply$mcV$sp(ParquetQueueSuite.scala:48)
> at 
> it.enel.next.platform.service.events.common.massive.immutable.ParquetQueueSuite$$anonfun$1.apply(ParquetQueueSuite.scala:46)
> at 
> it.enel.next.platform.service.events.common.massive.immutable.ParquetQueueSuite$$anonfun$1.apply(ParquetQueueSuite.scala:46)
> at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> at org.scalatest.Transformer.apply(Transformer.scala:22)
> at org.scalatest.Transformer.apply(Transformer.scala:20)
> at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1682)
> at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
> at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1685)
> at 
> org.scalatest.FlatSpecLike$class.invokeWithFixture$1(FlatSpecLike.scala:1679)
> at 
> org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1692)
> at 
> org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1692)
> at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
> at org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1692)
> at org.scalatest.FlatSpec.runTest(FlatSpec.scala:1685)
> at 
> org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1750)
> at 
> org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1750)
> at 
> org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
> at 
> org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
> at 
> org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInB

[jira] [Commented] (SPARK-24862) Spark Encoder is not consistent to scala case class semantic for multiple argument lists

2018-07-20 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16551174#comment-16551174
 ] 

Liang-Chi Hsieh commented on SPARK-24862:
-

Even if we only retrieve the first parameter list in {{getConstructorParameters}}, 
when we need to deserialize {{Multi}} we won't have {{y}} in the input columns, 
because we only serialize {{x}}. I think case classes with multiple parameter 
lists are not supported by Encoders.
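
A small illustration of the mismatch in plain Scala (no Spark needed): only the first parameter list contributes to the generated Product, while the schema in the report above lists both fields.

{code:scala}
case class Multi(x: String)(y: Int)

object MultiArityDemo extends App {
  val m = Multi("a")(1)
  // Only the first parameter list participates in the case-class machinery.
  println(m.productArity)       // 1 -- just x
  println(m.productElement(0))  // "a"
  // y is a constructor argument but not a field of the Product, so a serializer
  // that trusts the two-column schema has nothing to read for y.
}
{code}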

> Spark Encoder is not consistent to scala case class semantic for multiple 
> argument lists
> 
>
> Key: SPARK-24862
> URL: https://issues.apache.org/jira/browse/SPARK-24862
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Antonio Murgia
>Priority: Major
>
> Spark Encoders are not consistent with Scala case class semantics for multiple 
> argument lists.
> For example, if I create a case class with multiple constructor argument lists:
> {code:java}
> case class Multi(x: String)(y: Int){code}
> Scala creates a product with arity 1, while if I apply 
> {code:java}
> Encoders.product[Multi].schema.printTreeString{code}
> I get
> {code:java}
> root
> |-- x: string (nullable = true)
> |-- y: integer (nullable = false){code}
> That is not consistent and leads to:
> {code:java}
> Error while encoding: java.lang.RuntimeException: Couldn't find y on class 
> it.enel.next.platform.service.events.common.massive.immutable.Multi
> staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
> fromString, assertnotnull(assertnotnull(input[0, 
> it.enel.next.platform.service.events.common.massive.immutable.Multi, 
> true])).x, true) AS x#0
> assertnotnull(assertnotnull(input[0, 
> it.enel.next.platform.service.events.common.massive.immutable.Multi, 
> true])).y AS y#1
> java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: 
> Couldn't find y on class 
> it.enel.next.platform.service.events.common.massive.immutable.Multi
> staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
> fromString, assertnotnull(assertnotnull(input[0, 
> it.enel.next.platform.service.events.common.massive.immutable.Multi, 
> true])).x, true) AS x#0
> assertnotnull(assertnotnull(input[0, 
> it.enel.next.platform.service.events.common.massive.immutable.Multi, 
> true])).y AS y#1
> at 
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)
> at org.apache.spark.sql.SparkSession$$anonfun$2.apply(SparkSession.scala:464)
> at org.apache.spark.sql.SparkSession$$anonfun$2.apply(SparkSession.scala:464)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:296)
> at org.apache.spark.sql.SparkSession.createDataset(SparkSession.scala:464)
> at 
> it.enel.next.platform.service.events.common.massive.immutable.ParquetQueueSuite$$anonfun$1.apply$mcV$sp(ParquetQueueSuite.scala:48)
> at 
> it.enel.next.platform.service.events.common.massive.immutable.ParquetQueueSuite$$anonfun$1.apply(ParquetQueueSuite.scala:46)
> at 
> it.enel.next.platform.service.events.common.massive.immutable.ParquetQueueSuite$$anonfun$1.apply(ParquetQueueSuite.scala:46)
> at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> at org.scalatest.Transformer.apply(Transformer.scala:22)
> at org.scalatest.Transformer.apply(Transformer.scala:20)
> at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1682)
> at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
> at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1685)
> at 
> org.scalatest.FlatSpecLike$class.invokeWithFixture$1(FlatSpecLike.scala:1679)
> at 
> org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1692)
> at 
> org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1692)
> at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
> at org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1692)
> at org.scalatest.FlatSpec.runTest(FlatSpec.scala:1685)
> at 
> org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1750)
> at 
> org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1750)
> at 
> org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
> at 
> org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at o

[jira] [Commented] (SPARK-24847) ScalaReflection#schemaFor occasionally fails to detect schema for Seq of type alias

2018-07-19 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16550091#comment-16550091
 ] 

Liang-Chi Hsieh commented on SPARK-24847:
-

I can't reproduce this currently.

> ScalaReflection#schemaFor occasionally fails to detect schema for Seq of type 
> alias
> ---
>
> Key: SPARK-24847
> URL: https://issues.apache.org/jira/browse/SPARK-24847
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ahmed Mahran
>Priority: Major
>
> org.apache.spark.sql.catalyst.ScalaReflection#schemaFor occasionally fails to 
> detect schema for Seq of type alias (and it occasionally succeeds).
>  
> {code:java}
> object Types {
>   type Alias1 = Long
>   type Alias2 = Int
>   type Alias3 = Int
> }
> case class B(b1: Alias1, b2: Seq[Alias2], b3: Option[Alias3])
> case class A(a1: B, a2: Int)
> {code}
>  
> {code}
> import sparkSession.implicits._
> val seq = Seq(
>   A(B(2L, Seq(3), Some(1)), 1),
>   A(B(3L, Seq(2), Some(2)), 2)
> )
> val ds = sparkSession.createDataset(seq)
> {code}
>  
> {code:java}
> java.lang.UnsupportedOperationException: Schema for type Seq[Types.Alias2] is 
> not supported at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:780)
>  at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:715)
>  at 
> org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
>  at 
> org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
>  at 
> org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:714)
>  at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1$$anonfun$7.apply(ScalaReflection.scala:381)
>  at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1$$anonfun$7.apply(ScalaReflection.scala:380)
>  at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:380)
>  at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:150)
>  at 
> org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
>  at 
> org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
>  at 
> org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor(ScalaReflection.scala:150)
>  at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1$$anonfun$7.apply(ScalaReflection.scala:391)
>  at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1$$anonfun$7.apply(ScalaReflection.scala:380)
>  at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:380)
>  at 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:150)
>  at 
> org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
>  at 
> org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
>  at 
> org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor(ScalaReflection.scala:150)
>  at 
> org.apache.spark.sql.catalyst.ScalaReflection$.deserializerFor(ScalaReflection.scala:138)
>  at 
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:72)
>  at org.apache.spark.sql.Encoders$.product(Encoders.scala:275) at 
> org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
>  at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
>  {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24835) col function ignores drop

2018-07-17 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16547123#comment-16547123
 ] 

Liang-Chi Hsieh commented on SPARK-24835:
-

`drop` actually adds a projection on top of the original dataset. So the 
following query works:

{code}
df2 = df.drop('c')
df2.where(df['c'] < 6).show()
{code}

It can be seen as a query (in Scala) like:

{code}
val df = Seq(("test1", 0), ("test2", 1)).toDF("name", "id")
val filter1 = df.select(df("name")).filter(df("id") === 0)
{code}

This is a valid query.

Regarding the query that can't be run:

{code}
df = df.drop('c')
df.where(df['c'] < 6).show()
{code}

Because you are trying to resolve column {{c}} on top of the updated {{df}}, which 
now has output {{[a, b]}}, you can't successfully resolve this column.



> col function ignores drop
> -
>
> Key: SPARK-24835
> URL: https://issues.apache.org/jira/browse/SPARK-24835
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.0
> Environment: Spark 2.3.0
> Python 3.5.3
>Reporter: Michael Souder
>Priority: Minor
>
> Not sure if this is a bug or user error, but I've noticed that accessing 
> columns with the col function ignores a previous call to drop.
> {code}
> import pyspark.sql.functions as F
> df = spark.createDataFrame([(1,3,5), (2, None, 7), (0, 3, 2)], ['a', 'b', 
> 'c'])
> df.show()
> +---+----+---+
> |  a|   b|  c|
> +---+----+---+
> |  1|   3|  5|
> |  2|null|  7|
> |  0|   3|  2|
> +---+----+---+
> df = df.drop('c')
> # the col function is able to see the 'c' column even though it has been 
> dropped
> df.where(F.col('c') < 6).show()
> +---+---+
> |  a|  b|
> +---+---+
> |  1|  3|
> |  0|  3|
> +---+---+
> # trying the same with brackets on the data frame fails with the expected 
> error
> df.where(df['c'] < 6).show()
> Py4JJavaError: An error occurred while calling o36909.apply.
> : org.apache.spark.sql.AnalysisException: Cannot resolve column name "c" 
> among (a, b);{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24835) col function ignores drop

2018-07-17 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16547123#comment-16547123
 ] 

Liang-Chi Hsieh edited comment on SPARK-24835 at 7/17/18 9:25 PM:
--

`drop` actually adds a projection on top of the original dataset. So the 
following query works:
{code:java}
df2 = df.drop('c')
df2.where(df['c'] < 6).show()
{code}
It can be seen as a query (in Scala) like:
{code:java}
val df = Seq(("test1", 0), ("test2", 1)).toDF("name", "id")
df.select(df("name")).filter(df("id") === 0).show()
{code}
This is a valid query.

Regarding the query that can't be run:
{code:java}
df = df.drop('c')
df.where(df['c'] < 6).show()
{code}
Because you are trying to resolve column {{c}} on top of the updated {{df}}, which 
now has output {{[a, b]}}, you can't successfully resolve this column.


was (Author: viirya):
`drop` actually adds a projection on top of the original dataset. So the 
following query works:

{code}
df2 = df.drop('c')
df2.where(df['c'] < 6).show()
{code}

It can be seen as a query (in Scala) like:

{code}
val df = Seq(("test1", 0), ("test2", 1)).toDF("name", "id")
val filter1 = df.select(df("name")).filter(df("id") === 0)
{code}

This is a valid query.

Regarding the query that can't be run:

{code}
df = df.drop('c')
df.where(df['c'] < 6).show()
{code}

Because you are trying to resolve column {{c}} on top of the updated {{df}}, which 
now has output {{[a, b]}}, you can't successfully resolve this column.



> col function ignores drop
> -
>
> Key: SPARK-24835
> URL: https://issues.apache.org/jira/browse/SPARK-24835
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.0
> Environment: Spark 2.3.0
> Python 3.5.3
>Reporter: Michael Souder
>Priority: Minor
>
> Not sure if this is a bug or user error, but I've noticed that accessing 
> columns with the col function ignores a previous call to drop.
> {code}
> import pyspark.sql.functions as F
> df = spark.createDataFrame([(1,3,5), (2, None, 7), (0, 3, 2)], ['a', 'b', 
> 'c'])
> df.show()
> +---+----+---+
> |  a|   b|  c|
> +---+----+---+
> |  1|   3|  5|
> |  2|null|  7|
> |  0|   3|  2|
> +---+----+---+
> df = df.drop('c')
> # the col function is able to see the 'c' column even though it has been 
> dropped
> df.where(F.col('c') < 6).show()
> +---+---+
> |  a|  b|
> +---+---+
> |  1|  3|
> |  0|  3|
> +---+---+
> # trying the same with brackets on the data frame fails with the expected 
> error
> df.where(df['c'] < 6).show()
> Py4JJavaError: An error occurred while calling o36909.apply.
> : org.apache.spark.sql.AnalysisException: Cannot resolve column name "c" 
> among (a, b);{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24666) Word2Vec generate infinity vectors when numIterations are large

2018-07-08 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16536538#comment-16536538
 ] 

Liang-Chi Hsieh commented on SPARK-24666:
-

Is it possible you can provide an example dataset and code to reproduce this 
issue?

> Word2Vec generate infinity vectors when numIterations are large
> ---
>
> Key: SPARK-24666
> URL: https://issues.apache.org/jira/browse/SPARK-24666
> Project: Spark
>  Issue Type: Bug
>  Components: ML, MLlib
>Affects Versions: 2.3.1
> Environment:  2.0.X, 2.1.X, 2.2.X, 2.3.X
>Reporter: ZhongYu
>Priority: Major
>
> We found that Word2Vec generates vectors with large absolute values when 
> numIterations is large, and if numIterations is large enough (>20), the 
> vector values may be *infinity (or -infinity)*, resulting in useless 
> vectors.
> In normal situations, vector values are mainly around -1.0~1.0 when 
> numIterations = 1.
> The bug is shown on spark 2.0.X, 2.1.X, 2.2.X, 2.3.X.
> There is already an issue reporting this bug: 
> https://issues.apache.org/jira/browse/SPARK-5261 , but the bug fix seems to 
> be missing.
> Other people's reports:
> [https://stackoverflow.com/questions/49741956/infinity-vectors-in-spark-mllib-word2vec]
> [http://apache-spark-user-list.1001560.n3.nabble.com/word2vec-outputs-Infinity-Infinity-vectors-with-increasing-iterations-td29020.html]
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24762) Aggregator should be able to use Option of Product encoder

2018-07-08 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-24762:
---

 Summary: Aggregator should be able to use Option of Product encoder
 Key: SPARK-24762
 URL: https://issues.apache.org/jira/browse/SPARK-24762
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Liang-Chi Hsieh


Encoders have a limitation that we can't construct encoders for Option of 
Product at the top level, because in Spark SQL an entire row can't be null.

However, for some use cases such as Aggregator, it should be possible to 
construct encoders for Option of Product at non-top level.
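
A minimal sketch of the kind of Aggregator this targets (the names and the use of 
{{Encoders.product}} for the Option-of-Product buffer are illustrative assumptions, 
not committed API behavior):
{code:scala}
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Keeps the pair with the largest first element; the buffer and output are an
// Option of Product, which is what this ticket should make usable here.
object LatestAgg
  extends Aggregator[(Long, String), Option[(Long, String)], Option[(Long, String)]] {
  def zero: Option[(Long, String)] = None
  def reduce(buf: Option[(Long, String)], in: (Long, String)): Option[(Long, String)] =
    merge(buf, Some(in))
  def merge(a: Option[(Long, String)], b: Option[(Long, String)]): Option[(Long, String)] =
    (a.toSeq ++ b.toSeq).reduceOption((x, y) => if (x._1 >= y._1) x else y)
  def finish(r: Option[(Long, String)]): Option[(Long, String)] = r
  // Assumed to become constructible with this improvement:
  def bufferEncoder: Encoder[Option[(Long, String)]] = Encoders.product[Option[(Long, String)]]
  def outputEncoder: Encoder[Option[(Long, String)]] = Encoders.product[Option[(Long, String)]]
}
{code}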



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24756) Incorrect Statistics

2018-07-07 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16535976#comment-16535976
 ] 

Liang-Chi Hsieh commented on SPARK-24756:
-

Because there doesn't seem to be a suitable approach to estimating the size of 
RDDs yet, we use {{defaultSizeInBytes}} as the statistics for data from an RDD 
for now.
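
For context (my understanding, worth double-checking against the code): 
{{defaultSizeInBytes}} falls back to Long.MaxValue, which is where the 8.0 EB 
estimate in the plan comes from. A possible workaround sketch (spark-shell) if 
broadcast decisions are affected:
{code:scala}
import org.apache.spark.sql.functions.broadcast

// Placeholder frames: the RDD-backed one currently gets the fallback size estimate.
val rddBacked = spark.sparkContext.parallelize(Seq(("y", 1))).toDF("y", "n")
val large = spark.range(1000000).selectExpr("id", "id % 2 as n")

// Either hint the broadcast explicitly...
large.join(broadcast(rddBacked), "n").explain()

// ...or lower the fallback estimate used when no better statistics exist.
spark.conf.set("spark.sql.defaultSizeInBytes", (10L * 1024 * 1024).toString)
{code}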

> Incorrect Statistics
> 
>
> Key: SPARK-24756
> URL: https://issues.apache.org/jira/browse/SPARK-24756
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Nick Jordan
>Priority: Major
>
> I'm getting some odd results when looking at the statistics for a simple data 
> frame:
> {code:java}
> val df = spark.sparkContext.parallelize(Seq("y")).toDF("y") 
> df.queryExecution.stringWithStats{code}
> {noformat}
> == Optimized Logical Plan ==
> Project [value#7 AS y#9], Statistics(sizeInBytes=8.0 EB, hints=none)
> +- SerializeFromObject [staticinvoke(class 
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, 
> java.lang.String, true], true, false) AS value#7], Statistics(sizeInBytes=8.0 
> EB, hints=none)
> +- ExternalRDD [obj#6], Statistics(sizeInBytes=8.0 EB, hints=none)
> {noformat}
> 8.0 Exabytes is clearly not right here.  It is worth noting that if I don't 
> parallelize the Seq then I get the expected results.  
> This surfaced when I was running unit tests verifying that a broadcast 
> hint was preserved, which were failing because of the incorrect statistics.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24152) SparkR CRAN feasibility check server problem

2018-07-06 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16535474#comment-16535474
 ] 

Liang-Chi Hsieh commented on SPARK-24152:
-

I've noticed it too and have already asked the CRAN sysadmin to fix it. I think 
it is fixed now.

> SparkR CRAN feasibility check server problem
> 
>
> Key: SPARK-24152
> URL: https://issues.apache.org/jira/browse/SPARK-24152
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 2.4.0
>Reporter: Dongjoon Hyun
>Assignee: Liang-Chi Hsieh
>Priority: Critical
>
> PR builder and master branch tests fail with the following SparkR error for an 
> unknown reason. The following is the error message from them.
> {code}
> * this is package 'SparkR' version '2.4.0'
> * checking CRAN incoming feasibility ...Error in 
> .check_package_CRAN_incoming(pkgdir) : 
>   dims [product 24] do not match the length of object [0]
> Execution halted
> {code}
> *PR BUILDER*
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90039/
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89983/
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89998/
> *MASTER BRANCH*
> - 
> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/4458/
>  (Fail with no failures)
> This is critical because we have already started to merge PRs by ignoring this 
> **known unknown** SparkR failure.
> - https://github.com/apache/spark/pull/21175



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24746) AWS S3 301 Moved Permanently error message even after setting fs.s3a.endpoint for bucket in Mumbai region.

2018-07-05 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16534443#comment-16534443
 ] 

Liang-Chi Hsieh commented on SPARK-24746:
-

Maybe related: 
https://stackoverflow.com/questions/44284451/301-redirect-when-trying-to-access-aws-mumbai-s3-server
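
Based on that answer (an assumption, not a verified fix), it may matter that the 
regional endpoint and V4 signing are configured before the first S3A filesystem 
instance is created, e.g. on the SparkSession builder rather than on an 
already-running context:
{code:scala}
import org.apache.spark.sql.SparkSession

// Sketch: set the endpoint and V4 options up front (values taken from the report).
val spark = SparkSession.builder()
  .config("spark.hadoop.fs.s3a.endpoint", "s3.ap-south-1.amazonaws.com")
  .config("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
  .config("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
  .getOrCreate()
{code}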


> AWS S3 301 Moved Permanently error message even after setting fs.s3a.endpoint 
> for bucket in Mumbai region.
> --
>
> Key: SPARK-24746
> URL: https://issues.apache.org/jira/browse/SPARK-24746
> Project: Spark
>  Issue Type: Question
>  Components: Kubernetes, PySpark
>Affects Versions: 2.3.1
>Reporter: Kushagra Singh
>Priority: Major
>
> I am trying to write parquet data to an S3 bucket in the ap-south-1 (Mumbai) region 
> but keep getting 301 errors even though I have specified the correct region.
> {code}
> sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", 
> "s3.ap-south-1.amazonaws.com")
> log.write.mode("overwrite").parquet("s3a://bucket/logs")
> {code}
> s3a related config in spark-defaults:
> {code:java}
> spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
> spark.hadoop.validateOutputSpecs false
> spark.executor.extraJavaOptions -Dcom.amazonaws.services.s3.enableV4=true
> spark.driver.extraJavaOptions -Dcom.amazonaws.services.s3.enableV4=true
> spark.hadoop.fs.s3a.connection.maximum 100
> {code}
> Using _spark 2.3.1_ and _hadoop 2.7_ with _aws-java-sdk-1.7.4_ and 
> _hadoop-aws-2.7.3_
> Stacktrace:
> {code:java}
> py4j.protocol.Py4JJavaError: An error occurred while calling o71.parquet.
> : org.apache.spark.SparkException: Job aborted.
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
>   at 
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
>   at 
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
>   at 
> org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
>   at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
>   at 
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
>   at 
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>   at 
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
>   at 
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
>   at 
> org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:547)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>   at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>   at py4j.Gateway.invoke(Gateway.java:282)
>   at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>   at py4j.commands.CallCommand.execute(CallCommand.java:79)
>   at py4j.GatewayConnection.run(GatewayConnection.java:238)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 
> 301, AWS Service: Amazon S3, AWS Request ID: 0A48F0A6FD8AC8B5, AWS Error 
> Code: null, AWS Error Message: Moved Permanently, S3 Extended Request ID: 
> lPmrY0rkTFpMASMjvFaDTbCPfTgX+

[jira] [Commented] (SPARK-24464) Unit tests for MLlib's Instrumentation

2018-07-05 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533461#comment-16533461
 ] 

Liang-Chi Hsieh commented on SPARK-24464:
-

Because {{Instrumentation}} is used for logging, I wonder whether the test for 
it should verify the logged console output?
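
If verifying the output is indeed what we want, a test could capture log events 
with an in-memory appender, roughly like this (a sketch assuming log4j 1.x, which 
Spark uses here; the asserted message is a placeholder):
{code:scala}
import org.apache.log4j.{AppenderSkeleton, Logger}
import org.apache.log4j.spi.LoggingEvent
import scala.collection.mutable.ArrayBuffer

// Collects log events so a test can assert on what Instrumentation logged.
class BufferingAppender extends AppenderSkeleton {
  val events = new ArrayBuffer[LoggingEvent]()
  override protected def append(event: LoggingEvent): Unit = events += event
  override def close(): Unit = {}
  override def requiresLayout(): Boolean = false
}

val appender = new BufferingAppender
Logger.getRootLogger.addAppender(appender)
// ... run a small fit() that goes through Instrumentation ...
assert(appender.events.exists(_.getRenderedMessage.contains("training")))  // placeholder check
Logger.getRootLogger.removeAppender(appender)
{code}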

> Unit tests for MLlib's Instrumentation
> --
>
> Key: SPARK-24464
> URL: https://issues.apache.org/jira/browse/SPARK-24464
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: Xiangrui Meng
>Priority: Major
>
> We added Instrumentation to MLlib to log params and metrics during machine 
> learning training and inference. However, the code has zero test coverage, 
> which usually means bugs and regressions in the future. I created this JIRA 
> to discuss how we should test Instrumentation.
> cc: [~thunterdb] [~josephkb] [~lu.DB]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24438) Empty strings and null strings are written to the same partition

2018-07-04 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532392#comment-16532392
 ] 

Liang-Chi Hsieh commented on SPARK-24438:
-

From the code, it looks like we intentionally treat empty string and null the 
same as the default partition name, though the dataframe read back doesn't make 
much sense that way.

cc [~cloud_fan] do you think this is a bug and we should fix it?
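
Until that's decided, a workaround sketch (spark-shell; the sentinel value and 
output path are placeholders): map empty strings to a distinct non-empty value 
before partitioning, so they don't collapse into the default partition.
{code:scala}
import org.apache.spark.sql.functions.{col, length, when}

// Replace empty strings in the partition column with a sentinel before writing;
// genuine nulls still go to the default partition.
val df = Seq((1, ""), (2, ""), (3, ""), (4, "hello"), (5, null: String)).toDF("a", "b")
val cleaned = df.withColumn("b", when(length(col("b")) === 0, "__EMPTY__").otherwise(col("b")))
cleaned.write.mode("overwrite").partitionBy("b").save("/tmp/partition_workaround")
{code}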

> Empty strings and null strings are written to the same partition
> 
>
> Key: SPARK-24438
> URL: https://issues.apache.org/jira/browse/SPARK-24438
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Mukul Murthy
>Priority: Major
>
> When you partition on a string column that has empty strings and nulls, they 
> are both written to the same default partition. When you read the data back, 
> all those values get read back as null.
> {code:java}
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.catalyst.encoders.RowEncoder
> val data = Seq(Row(1, ""), Row(2, ""), Row(3, ""), Row(4, "hello"), Row(5, 
> null))
> val schema = new StructType().add("a", IntegerType).add("b", StringType)
> val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
> display(df) 
> => 
> a b
> 1 
> 2 
> 3 
> 4 hello
> 5 null
> df.write.mode("overwrite").partitionBy("b").save("/home/mukul/weird_test_data4")
> val df2 = spark.read.load("/home/mukul/weird_test_data4")
> display(df2)
> => 
> a b
> 4 hello
> 3 null
> 2 null
> 1 null
> 5 null
> {code}
> Seems to affect multiple types of tables.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24467) VectorAssemblerEstimator

2018-07-03 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532324#comment-16532324
 ] 

Liang-Chi Hsieh commented on SPARK-24467:
-

The approach similar to the one-hot encoder sounds good to me.

> VectorAssemblerEstimator
> 
>
> Key: SPARK-24467
> URL: https://issues.apache.org/jira/browse/SPARK-24467
> Project: Spark
>  Issue Type: New Feature
>  Components: ML
>Affects Versions: 2.4.0
>Reporter: Joseph K. Bradley
>Priority: Major
>
> In [SPARK-22346], I believe I made a wrong API decision: I recommended adding 
> `VectorSizeHint` instead of making `VectorAssembler` into an Estimator since 
> I thought the latter option would break most workflows.  However, I should 
> have proposed:
> * Add a Param to VectorAssembler for specifying the sizes of Vectors in the 
> inputCols.  This Param can be optional.  If not given, then VectorAssembler 
> will behave as it does now.  If given, then VectorAssembler can use that info 
> instead of figuring out the Vector sizes via metadata or examining Rows in 
> the data (though it could do consistency checks).
> * Add a VectorAssemblerEstimator which gets the Vector lengths from data and 
> produces a VectorAssembler with the vector lengths Param specified.
> This will not break existing workflows.  Migrating to 
> VectorAssemblerEstimator will be easier than adding VectorSizeHint since it 
> will not require users to manually input Vector lengths.
> Note: Even with this Estimator, VectorSizeHint might prove useful for other 
> things in the future which require vector length metadata, so we could 
> consider keeping it rather than deprecating it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24528) Missing optimization for Aggregations/Windowing on a bucketed table

2018-07-01 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529361#comment-16529361
 ] 

Liang-Chi Hsieh commented on SPARK-24528:
-

I think we can have a SQL config to control enabling/disabling this behavior 
too.

> Missing optimization for Aggregations/Windowing on a bucketed table
> ---
>
> Key: SPARK-24528
> URL: https://issues.apache.org/jira/browse/SPARK-24528
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0, 2.4.0
>Reporter: Ohad Raviv
>Priority: Major
>
> Closely related to  SPARK-24410, we're trying to optimize a very common use 
> case we have of getting the most updated row by id from a fact table.
> We're saving the table bucketed to skip the shuffle stage, but we still 
> "waste" time on the Sort operator even though the data is already sorted.
> here's a good example:
> {code:java}
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
> .repartition(col("key"))
> .write
>   .mode(SaveMode.Overwrite)
> .bucketBy(3, "key")
> .sortBy("key", "t1")
> .saveAsTable("a1"){code}
> {code:java}
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
> == Physical Plan ==
> SortAggregate(key=[key#24L], functions=[max(named_struct(t1, t1#25L, key, 
> key#24L, t1, t1#25L, t2, t2#26L))])
> +- SortAggregate(key=[key#24L], functions=[partial_max(named_struct(t1, 
> t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))])
> +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, 
> Format: Parquet, Location: ...{code}
>  
> and here's a bad example, but more realistic:
> {code:java}
> sparkSession.sql("set spark.sql.shuffle.partitions=2")
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
> == Physical Plan ==
> SortAggregate(key=[key#32L], functions=[max(named_struct(t1, t1#33L, key, 
> key#32L, t1, t1#33L, t2, t2#34L))])
> +- SortAggregate(key=[key#32L], functions=[partial_max(named_struct(t1, 
> t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))])
> +- *(1) Sort [key#32L ASC NULLS FIRST], false, 0
> +- *(1) FileScan parquet default.a1[key#32L,t1#33L,t2#34L] Batched: true, 
> Format: Parquet, Location: ...
> {code}
>  
> I've traced the problem to DataSourceScanExec#235:
> {code:java}
> val sortOrder = if (sortColumns.nonEmpty) {
>   // In case of bucketing, its possible to have multiple files belonging to 
> the
>   // same bucket in a given relation. Each of these files are locally sorted
>   // but those files combined together are not globally sorted. Given that,
>   // the RDD partition will not be sorted even if the relation has sort 
> columns set
>   // Current solution is to check if all the buckets have a single file in it
>   val files = selectedPartitions.flatMap(partition => partition.files)
>   val bucketToFilesGrouping =
> files.map(_.getPath.getName).groupBy(file => 
> BucketingUtils.getBucketId(file))
>   val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 
> 1){code}
> so obviously the code avoids dealing with this situation for now.
> Could you think of a way to solve this or bypass it?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24689) java.io.NotSerializableException: org.apache.spark.mllib.clustering.DistributedLDAModel

2018-06-29 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16528490#comment-16528490
 ] 

Liang-Chi Hsieh commented on SPARK-24689:
-

I think we shouldn't set the priority to Blocker; it is reserved for committers.
cc [~hyukjin.kwon].

> java.io.NotSerializableException: 
> org.apache.spark.mllib.clustering.DistributedLDAModel
> ---
>
> Key: SPARK-24689
> URL: https://issues.apache.org/jira/browse/SPARK-24689
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Shell
>Affects Versions: 2.3.1
> Environment: !image-2018-06-29-13-42-30-255.png!
>Reporter: konglingbo
>Priority: Blocker
> Attachments: @CLZ98635A644[_edx...@e.png
>
>
> scala> val predictionAndLabels=testing.map{case LabeledPoint(label,features)=>
>  | val prediction = model.predict(features)
>  | (prediction, label)
>  | }



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24667) If folders managed by DiskBlockManager are deleted manually, shell throws FileNotFoundException

2018-06-29 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527347#comment-16527347
 ] 

Liang-Chi Hsieh commented on SPARK-24667:
-

I think it is not a bug...

> If folders managed by DiskBlockManager are deleted manually, shell throws 
> FileNotFoundException
> ---
>
> Key: SPARK-24667
> URL: https://issues.apache.org/jira/browse/SPARK-24667
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 2.3.1
>Reporter: wuyonghua
>Priority: Major
>
> # In spark shell
>  # Run a join query, which writes some data to folders managed by 
> DiskBlockManager.
>  # Delete some of the folders, 
> e.g.tmp\blockmgr-7145fe5a-757c-4640-a032-047c7f1d4080\2d
>  # run the same query again, it throws FileNotFoundException
>  # Suggest to check the folder existence when found a folder in subDirs



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24635) Remove Blocks class

2018-06-23 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-24635:
---

 Summary: Remove Blocks class
 Key: SPARK-24635
 URL: https://issues.apache.org/jira/browse/SPARK-24635
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Liang-Chi Hsieh


The {{Blocks}} class in the {{JavaCode}} class hierarchy is not necessary. Its 
function can be taken over by {{CodeBlock}}. We should remove it to simplify the 
class hierarchy.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24607) Distribute by rand() can lead to data inconsistency

2018-06-21 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519119#comment-16519119
 ] 

Liang-Chi Hsieh commented on SPARK-24607:
-

[~gostop_zlx]

If you don't give a seed to {{rand}}, Spark SQL will assign it a random seed, so 
you get the same results when you re-run the same query (the same plan) multiple 
times.

When you create a new query like the one above, you are in fact assigning it a 
different seed.
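
A small illustration of that point (sketch for spark-shell):
{code:scala}
import org.apache.spark.sql.functions.rand

// The seed for rand() is fixed when the expression is created, so re-running
// the same DataFrame repeats the values; building a new plan picks a new seed.
val q = spark.range(3).select(rand())
q.show()                              // same values as...
q.show()                              // ...this run
spark.range(3).select(rand()).show()  // new plan, new seed, likely different values
{code}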

> Distribute by rand() can lead to data inconsistency
> ---
>
> Key: SPARK-24607
> URL: https://issues.apache.org/jira/browse/SPARK-24607
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.1
>Reporter: zenglinxi
>Priority: Major
>
> Noticed the following queries can give different results:
> {code:java}
> select count(*) from tbl;
> select count(*) from (select * from tbl distribute by rand()) a;{code}
> this issue was first reported by someone using Kylin to build a cube with 
> HiveSQL that includes distribute by rand; data inconsistency may happen 
> during fault-tolerance operations. Since Spark has a similar fault-tolerance 
> mechanism, I think it's also a hidden serious problem in Spark SQL.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24607) Distribute by rand() can lead to data inconsistency

2018-06-20 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518206#comment-16518206
 ] 

Liang-Chi Hsieh edited comment on SPARK-24607 at 6/20/18 2:40 PM:
--

Thanks [~mgaido]!

As I check the {{Rand}} expression, it seems it already uses a seed, which sounds 
like option 1 in the Hive ticket. So if you still use the same query plan, it 
should produce deterministic sequences of numbers.

Unless the input rows have a different order on retries, in which case it can 
produce inconsistent sequences across retries.



was (Author: viirya):
Thanks [~mgaido]!

As I check the {{Rand}} expression, it seems it already uses a seed, which sounds 
like option 1 in the Hive ticket. So if you still use the same query plan, it 
should produce deterministic sequences of numbers.



> Distribute by rand() can lead to data inconsistency
> ---
>
> Key: SPARK-24607
> URL: https://issues.apache.org/jira/browse/SPARK-24607
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.1
>Reporter: zenglinxi
>Priority: Major
>
> Noticed the following queries can give different results:
> {code:java}
> select count(*) from tbl;
> select count(*) from (select * from tbl distribute by rand()) a;{code}
> this issue was first reported by someone using Kylin to build a cube with 
> HiveSQL that includes distribute by rand; I think it's also a hidden 
> serious problem in Spark SQL.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24607) Distribute by rand() can lead to data inconsistency

2018-06-20 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518206#comment-16518206
 ] 

Liang-Chi Hsieh commented on SPARK-24607:
-

Thanks [~mgaido]!

As I check the {{Rand}} expression, it seems it already uses a seed, which sounds 
like option 1 in the Hive ticket. So if you still use the same query plan, it 
should produce deterministic sequences of numbers.



> Distribute by rand() can lead to data inconsistency
> ---
>
> Key: SPARK-24607
> URL: https://issues.apache.org/jira/browse/SPARK-24607
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.1
>Reporter: zenglinxi
>Priority: Major
>
> Noticed the following queries can give different results:
> {code:java}
> select count(*) from tbl;
> select count(*) from (select * from tbl distribute by rand()) a;{code}
> this issue was first reported by someone using Kylin to build a cube with 
> HiveSQL that includes distribute by rand; I think it's also a hidden 
> serious problem in Spark SQL.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24607) Distribute by rand() can lead to data inconsistency

2018-06-20 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518184#comment-16518184
 ] 

Liang-Chi Hsieh commented on SPARK-24607:
-

From the following test, it looks OK.

{code:java}
scala> val df1 = sql("select count(*) from a1")
df1: org.apache.spark.sql.DataFrame = [count(1): bigint]

scala> val df2 = sql("select count(*) from (select * from a1 distribute by 
rand()) a")
df2: org.apache.spark.sql.DataFrame = [count(1): bigint]
{code}
{code:java}
scala> df1.show 
   
++
|count(1)|
++
|1000|
++


scala> df2.show
++
|count(1)|
++
|1000|
++
{code}
{code:java}
scala> df1.explain
== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
  +- *(1) FileScan parquet default.a1[] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/root/repos/spark-1/spark-warehouse/a1], 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

scala> df2.explain
== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *(2) HashAggregate(keys=[], functions=[partial_count(1)])
  +- *(2) Project
 +- Exchange hashpartitioning(_nondeterministic#100, 200)
+- *(1) Project [rand(1677810096404215833) AS _nondeterministic#100]
   +- *(1) FileScan parquet default.a1[key#69L,t1#70L,t2#71L] 
Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:
/root/repos/spark-1/spark-warehouse/a1], PartitionFilters: [], PushedFilters: 
[], ReadSchema: struct<key:bigint,t1:bigint,t2:bigint>
{code}

> Distribute by rand() can lead to data inconsistency
> ---
>
> Key: SPARK-24607
> URL: https://issues.apache.org/jira/browse/SPARK-24607
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.1
>Reporter: zenglinxi
>Priority: Major
>
> Noticed the following queries can give different results:
> {code:java}
> select count(*) from tbl;
> select count(*) from (select * from tbl distribute by rand()) a;{code}
> this issue was first reported by someone using Kylin to build a cube with 
> HiveSQL that includes distribute by rand; I think it's also a hidden 
> serious problem in Spark SQL.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24465) LSHModel should support Structured Streaming for transform

2018-06-15 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513649#comment-16513649
 ] 

Liang-Chi Hsieh edited comment on SPARK-24465 at 6/15/18 10:32 AM:
---

I'm not sure SPARK-12878 is a real issue. It seems to me that the nested UDT 
code just needs to be written in the working way. Please see my comment on 
SPARK-12878.


was (Author: viirya):
I'm not sure SPARK-12878 is a real issue. It seems to me that the nested UDT 
code just needs to be written in the working way.

> LSHModel should support Structured Streaming for transform
> --
>
> Key: SPARK-24465
> URL: https://issues.apache.org/jira/browse/SPARK-24465
> Project: Spark
>  Issue Type: New Feature
>  Components: ML
>Affects Versions: 2.4.0
>Reporter: Joseph K. Bradley
>Priority: Major
>
> Locality Sensitive Hashing (LSH) Models (BucketedRandomProjectionLSHModel, 
> MinHashLSHModel) are not compatible with Structured Streaming (and I believe 
> are the final Transformers which are not compatible).  These do not work 
> because Spark SQL does not support nested types containing UDTs; see 
> [SPARK-12878].
> This task is to add unit tests for streaming (as in [SPARK-22644]) for 
> LSHModels after [SPARK-12878] has been fixed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24465) LSHModel should support Structured Streaming for transform

2018-06-15 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513649#comment-16513649
 ] 

Liang-Chi Hsieh commented on SPARK-24465:
-

I'm not sure SPARK-12878 is a real issue. It seems to me that the nested UDT 
code just needs to be written in the working way.

> LSHModel should support Structured Streaming for transform
> --
>
> Key: SPARK-24465
> URL: https://issues.apache.org/jira/browse/SPARK-24465
> Project: Spark
>  Issue Type: New Feature
>  Components: ML
>Affects Versions: 2.4.0
>Reporter: Joseph K. Bradley
>Priority: Major
>
> Locality Sensitive Hashing (LSH) Models (BucketedRandomProjectionLSHModel, 
> MinHashLSHModel) are not compatible with Structured Streaming (and I believe 
> are the final Transformers which are not compatible).  These do not work 
> because Spark SQL does not support nested types containing UDTs; see 
> [SPARK-12878].
> This task is to add unit tests for streaming (as in [SPARK-22644]) for 
> LSHModels after [SPARK-12878] has been fixed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12878) Dataframe fails with nested User Defined Types

2018-06-15 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-12878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16513647#comment-16513647
 ] 

Liang-Chi Hsieh commented on SPARK-12878:
-


Is this a real issue? It seems to me that you can't write a nested UDT the way 
the example code in the description does.

The nested UDT example should look like the following; you need to serialize the 
nested UDT objects when you serialize the wrapper object:

{code:scala}
// Imports needed to compile this example; sparkContext and toDF assume a Spark
// shell or test session with spark.implicits._ in scope.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.GenericArrayData
import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[WrapperUDT])
case class Wrapper(list: Seq[Element])

class WrapperUDT extends UserDefinedType[Wrapper] {
  override def sqlType: DataType = StructType(Seq(StructField("list",
ArrayType(new ElementUDT(), containsNull = false), nullable = true)))
  override def userClass: Class[Wrapper] = classOf[Wrapper]
  override def serialize(obj: Wrapper): Any = obj match {
case Wrapper(list) =>
  val row = new GenericInternalRow(1)
  val elementUDT = new ElementUDT()
  val serializedElements = list.map((e: Element) => elementUDT.serialize(e))
  row.update(0, new GenericArrayData(serializedElements.toArray))
  row
  }

  override def deserialize(datum: Any): Wrapper = datum match {
case row: InternalRow =>
  val elementUDF = new ElementUDT()
  Wrapper(row.getArray(0).toArray(elementUDF).map((e: Any) => 
elementUDF.deserialize(e)))
  }
}

@SQLUserDefinedType(udt = classOf[ElementUDT])
case class Element(num: Int)

class ElementUDT extends UserDefinedType[Element] {
  override def sqlType: DataType =
StructType(Seq(StructField("num", IntegerType, nullable = false)))
  override def userClass: Class[Element] = classOf[Element]
  override def serialize(obj: Element): Any = obj match {
case Element(num) =>
  val row = new GenericInternalRow(1)
  row.setInt(0, num)
  row
  }

  override def deserialize(datum: Any): Element = datum match {
case row: InternalRow => Element(row.getInt(0))
  }
}

val data = Seq(Wrapper(Seq(Element(1), Element(2))), Wrapper(Seq(Element(3), 
Element(4
val df = sparkContext.parallelize((1 to 2).zip(data)).toDF("id", "b")
df.collect().map(println(_))
{code}

{code}
[1,Wrapper(ArraySeq(Element(1), Element(2)))]
[2,Wrapper(ArraySeq(Element(3), Element(4)))]
{code}

> Dataframe fails with nested User Defined Types
> --
>
> Key: SPARK-12878
> URL: https://issues.apache.org/jira/browse/SPARK-12878
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0
>Reporter: Joao Duarte
>Priority: Major
>
> Spark 1.6.0 crashes when using nested User Defined Types in a Dataframe. 
> In version 1.5.2 the code below worked just fine:
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.sql.catalyst.InternalRow
> import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
> import org.apache.spark.sql.types._
> @SQLUserDefinedType(udt = classOf[AUDT])
> case class A(list:Seq[B])
> class AUDT extends UserDefinedType[A] {
>   override def sqlType: DataType = StructType(Seq(StructField("list", 
> ArrayType(BUDT, containsNull = false), nullable = true)))
>   override def userClass: Class[A] = classOf[A]
>   override def serialize(obj: Any): Any = obj match {
> case A(list) =>
>   val row = new GenericMutableRow(1)
>   row.update(0, new 
> GenericArrayData(list.map(_.asInstanceOf[Any]).toArray))
>   row
>   }
>   override def deserialize(datum: Any): A = {
> datum match {
>   case row: InternalRow => new A(row.getArray(0).toArray(BUDT).toSeq)
> }
>   }
> }
> object AUDT extends AUDT
> @SQLUserDefinedType(udt = classOf[BUDT])
> case class B(text:Int)
> class BUDT extends UserDefinedType[B] {
>   override def sqlType: DataType = StructType(Seq(StructField("num", 
> IntegerType, nullable = false)))
>   override def userClass: Class[B] = classOf[B]
>   override def serialize(obj: Any): Any = obj match {
> case B(text) =>
>   val row = new GenericMutableRow(1)
>   row.setInt(0, text)
>   row
>   }
>   override def deserialize(datum: Any): B = {
> datum match {  case row: InternalRow => new B(row.getInt(0))  }
>   }
> }
> object BUDT extends BUDT
> object Test {
>   def main(args:Array[String]) = {
> val col = Seq(new A(Seq(new B(1), new B(2))),
>   new A(Seq(new B(3), new B(4
> val sc = new SparkContext(new 
> SparkConf().setMaster("local[1]").setAppName("TestSpark"))
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> import sqlContext.implicits._
> val df = sc.parallelize(1 to 2 zip col).toDF("id","b")
> df.select("b").show()
> df.collect().foreach(println)
>   }
> }
> In the new version (1.6.0) I needed to include the following import:
> import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
> However, Spark crashes in runtim

[jira] [Updated] (SPARK-24548) JavaPairRDD to Dataset in SPARK generates ambiguous results

2018-06-15 Thread Liang-Chi Hsieh (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh updated SPARK-24548:

Component/s: (was: Spark Core)

> JavaPairRDD to Dataset in SPARK generates ambiguous results
> 
>
> Key: SPARK-24548
> URL: https://issues.apache.org/jira/browse/SPARK-24548
> Project: Spark
>  Issue Type: Bug
>  Components: Java API, SQL
>Affects Versions: 2.3.0
> Environment: Using Windows 10, on 64bit machine with 16G of ram.
>Reporter: Jackson
>Priority: Major
>
> I have data in below JavaPairRDD :
> {quote}JavaPairRDD<String, Tuple2<String, String>> MY_RDD;
> {quote}
> I tried using below code:
> {quote}Encoder<Tuple2<String, Tuple2<String, String>>> encoder2 =
> Encoders.tuple(Encoders.STRING(), 
> Encoders.tuple(Encoders.STRING(),Encoders.STRING()));
> Dataset<Row> newDataSet = 
> spark.createDataset(JavaPairRDD.toRDD(MY_RDD),encoder2).toDF("value1","value2");
> newDataSet.printSchema();
> {quote}
> {{root}}
> {{ |-- value1: string (nullable = true)}}
> {{ |-- value2: struct (nullable = true)}}
> {{ | |-- value: string (nullable = true)}}
> {{ | |-- value: string (nullable = true)}}
> But after creating a StackOverflow question 
> ("https://stackoverflow.com/questions/50834145/javapairrdd-to-datasetrow-in-spark";),
>  I got to know that the values in a tuple should have distinct field names, 
> whereas in this case it generates the same name for both. Because of this I 
> cannot select a specific column under value2.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24548) JavaPairRDD to Dataset in SPARK generates ambiguous results

2018-06-15 Thread Liang-Chi Hsieh (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh updated SPARK-24548:

Component/s: SQL

> JavaPairRDD to Dataset in SPARK generates ambiguous results
> 
>
> Key: SPARK-24548
> URL: https://issues.apache.org/jira/browse/SPARK-24548
> Project: Spark
>  Issue Type: Bug
>  Components: Java API, SQL
>Affects Versions: 2.3.0
> Environment: Using Windows 10, on 64bit machine with 16G of ram.
>Reporter: Jackson
>Priority: Major
>
> I have data in below JavaPairRDD :
> {quote}JavaPairRDD<String, Tuple2<String, String>> MY_RDD;
> {quote}
> I tried using below code:
> {quote}Encoder<Tuple2<String, Tuple2<String, String>>> encoder2 =
> Encoders.tuple(Encoders.STRING(), 
> Encoders.tuple(Encoders.STRING(),Encoders.STRING()));
> Dataset<Row> newDataSet = 
> spark.createDataset(JavaPairRDD.toRDD(MY_RDD),encoder2).toDF("value1","value2");
> newDataSet.printSchema();
> {quote}
> {{root}}
> {{ |-- value1: string (nullable = true)}}
> {{ |-- value2: struct (nullable = true)}}
> {{ | |-- value: string (nullable = true)}}
> {{ | |-- value: string (nullable = true)}}
> But after creating a StackOverflow question 
> ("https://stackoverflow.com/questions/50834145/javapairrdd-to-datasetrow-in-spark";),
>  I got to know that the values in a tuple should have distinct field names, 
> whereas in this case it generates the same name for both. Because of this I 
> cannot select a specific column under value2.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24528) Missing optimization for Aggregations/Windowing on a bucketed table

2018-06-13 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511924#comment-16511924
 ] 

Liang-Chi Hsieh commented on SPARK-24528:
-

Btw, I think the complete and reproducible examples should be: 

{code}
spark.sql("set spark.sql.shuffle.partitions=3")
val N = 1000
spark.range(N).selectExpr("id as key", "id % 2 as t1", "id % 3 as 
t2").repartition(col("key")).write.mode("overwrite").bucketBy(3, 
"key").sortBy("key", "t1").saveAsTable("a1")
spark.sql("select max(struct(t1, *)) from a1 group by key").explain

== Physical Plan ==
SortAggregate(key=[key#156L], functions=[max(named_struct(t1, t1#157L, key, 
key#156L, t1, t1#157L, t2, t2#158L))])
+- SortAggregate(key=[key#156L], functions=[partial_max(named_struct(t1, 
t1#157L, key, key#156L, t1, t1#157L, t2, t2#158L))])
   +- *(1) FileScan parquet default.a1[key#156L,t1#157L,t2#158L] Batched: true, 
Format: Parquet, Location: 
InMemoryFileIndex[file:/root/repos/spark-1/spark-warehouse/a1], 
PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct<key:bigint,t1:bigint,t2:bigint>, SelectedBucketsCount: 3 out of 3

{code}

{code}
spark.sql("set spark.sql.shuffle.partitions=2")
val N = 1000
spark.range(N).selectExpr("id as key", "id % 2 as t1", "id % 3 as 
t2").repartition(col("key")).write.mode("overwrite").bucketBy(3, 
"key").sortBy("key", "t1").saveAsTable("a1")
spark.sql("select max(struct(t1, *)) from a1 group by key").explain

== Physical Plan ==
SortAggregate(key=[key#126L], functions=[max(named_struct(t1, t1#127L, key, 
key#126L, t1, t1#127L, t2, t2#128L))])
+- SortAggregate(key=[key#126L], functions=[partial_max(named_struct(t1, 
t1#127L, key, key#126L, t1, t1#127L, t2, t2#128L))])
   +- *(1) Sort [key#126L ASC NULLS FIRST], false, 0
  +- *(1) FileScan parquet default.a1[key#126L,t1#127L,t2#128L] Batched: 
true, Format: Parquet, Location: 
InMemoryFileIndex[file:/root/repos/spark-1/spark-warehouse/a1], 
PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct<key:bigint,t1:bigint,t2:bigint>, SelectedBucketsCount: 3 out of 3
{code}

> Missing optimization for Aggregations/Windowing on a bucketed table
> ---
>
> Key: SPARK-24528
> URL: https://issues.apache.org/jira/browse/SPARK-24528
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0, 2.4.0
>Reporter: Ohad Raviv
>Priority: Major
>
> Closely related to  SPARK-24410, we're trying to optimize a very common use 
> case we have of getting the most updated row by id from a fact table.
> We're saving the table bucketed to skip the shuffle stage, but we still 
> "waste" time on the Sort operator even though the data is already sorted.
> here's a good example:
> {code:java}
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
> .repartition(col("key"))
> .write
>   .mode(SaveMode.Overwrite)
> .bucketBy(3, "key")
> .sortBy("key", "t1")
> .saveAsTable("a1"){code}
> {code:java}
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
> == Physical Plan ==
> SortAggregate(key=[key#24L], functions=[max(named_struct(t1, t1#25L, key, 
> key#24L, t1, t1#25L, t2, t2#26L))])
> +- SortAggregate(key=[key#24L], functions=[partial_max(named_struct(t1, 
> t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))])
> +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, 
> Format: Parquet, Location: ...{code}
>  
> and here's a bad example, but more realistic:
> {code:java}
> sparkSession.sql("set spark.sql.shuffle.partitions=2")
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
> == Physical Plan ==
> SortAggregate(key=[key#32L], functions=[max(named_struct(t1, t1#33L, key, 
> key#32L, t1, t1#33L, t2, t2#34L))])
> +- SortAggregate(key=[key#32L], functions=[partial_max(named_struct(t1, 
> t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))])
> +- *(1) Sort [key#32L ASC NULLS FIRST], false, 0
> +- *(1) FileScan parquet default.a1[key#32L,t1#33L,t2#34L] Batched: true, 
> Format: Parquet, Location: ...
> {code}
>  
> I've traced the problem to DataSourceScanExec#235:
> {code:java}
> val sortOrder = if (sortColumns.nonEmpty) {
>   // In case of bucketing, its possible to have multiple files belonging to 
> the
>   // same bucket in a given relation. Each of these files are locally sorted
>   // but those files combined together are not globally sorted. Given that,
>   // the RDD partition will not be sorted even if the relation has sort 
> columns set
>   // Current solution is to check if all the buckets have a single file in it
>   val files = selectedPartitions.flatMap(partition => partition.files)
>   val bucketToFilesGrouping =
> files.map(_.getPath.getName).groupBy(file => 
> BucketingUtils.getBucketId(file))
>   val singleFilePartitions = bucketToFilesGr

[jira] [Updated] (SPARK-24505) Convert strings in codegen to blocks: Cast and BoundAttribute

2018-06-12 Thread Liang-Chi Hsieh (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh updated SPARK-24505:

Description: The CodeBlock interpolator now accepts strings. Based on 
previous discussion, we should forbid string interpolation. We will 
incrementally convert strings in codegen methods to blocks. This is for Cast 
and BoundAttribute.  (was: The CodeBlock interpolator now accepts strings. 
Based on previous discussion, we should forbid string interpolation.)

> Convert strings in codegen to blocks: Cast and BoundAttribute
> -
>
> Key: SPARK-24505
> URL: https://issues.apache.org/jira/browse/SPARK-24505
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Liang-Chi Hsieh
>Priority: Major
>
> The CodeBlock interpolator now accepts strings. Based on previous discussion, 
> we should forbid string interpolation. We will incrementally convert strings 
> in codegen methods to blocks. This is for Cast and BoundAttribute.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24505) Convert strings in codegen to blocks: Cast and BoundAttribute

2018-06-12 Thread Liang-Chi Hsieh (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh updated SPARK-24505:

Summary: Convert strings in codegen to blocks: Cast and BoundAttribute  
(was: Forbidding string interpolation in CodeBlock)

> Convert strings in codegen to blocks: Cast and BoundAttribute
> -
>
> Key: SPARK-24505
> URL: https://issues.apache.org/jira/browse/SPARK-24505
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Liang-Chi Hsieh
>Priority: Major
>
> The CodeBlock interpolator now accepts strings. Based on previous discussion, 
> we should forbid string interpolation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23596) Modify Dataset test harness to include interpreted execution

2018-06-11 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16509137#comment-16509137
 ] 

Liang-Chi Hsieh edited comment on SPARK-23596 at 6/12/18 3:55 AM:
--

One concern I have right now is that by testing the interpreted code paths too, 
we will double the current test time or more. Otherwise, we can only choose to 
test the interpreted code paths for just a few test suites such as DatasetSuite, 
DataFrameSuite...

[~hvanhovell] What do you think?


was (Author: viirya):
One concern I have right now is that by testing the interpreted code paths too, 
we will double the current test time or more. Otherwise, we can only choose to 
test the interpreted code paths for just a few test suites such as DatasetSuite, 
DataFrameSuite...

 

[~hvanhovell] What do you think?

> Modify Dataset test harness to include interpreted execution
> 
>
> Key: SPARK-23596
> URL: https://issues.apache.org/jira/browse/SPARK-23596
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Herman van Hovell
>Priority: Major
>
> We should modify the Dataset test harness to also test the interpreted code 
> paths. This task can be started as soon as a significant subset of the object 
> related Expressions provides an interpreted fallback.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23596) Modify Dataset test harness to include interpreted execution

2018-06-11 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16509137#comment-16509137
 ] 

Liang-Chi Hsieh commented on SPARK-23596:
-

One concern I have right now is that by testing the interpreted code paths too, 
we will double the current test time or more. Otherwise, we can only choose to 
test the interpreted code paths for just a few test suites such as DatasetSuite, 
DataFrameSuite...

 

[~hvanhovell] What do you think?

> Modify Dataset test harness to include interpreted execution
> 
>
> Key: SPARK-23596
> URL: https://issues.apache.org/jira/browse/SPARK-23596
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Herman van Hovell
>Priority: Major
>
> We should modify the Dataset test harness to also test the interpreted code 
> paths. This task can be started as soon as a significant subset of the object 
> related Expressions provides an interpreted fallback.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24517) Bug in loading unstructured data

2018-06-11 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16509117#comment-16509117
 ] 

Liang-Chi Hsieh commented on SPARK-24517:
-

Does it also happen when using the built-in JSON datasource? Or is it just 
related to Azure CosmosDB?
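
For comparison, a quick check with the built-in JSON source could look like this 
(spark-shell sketch; the path is a placeholder). Comparing these counts against 
the number of files lacking the property would tell us whether this is specific 
to the CosmosDB connector:
{code:scala}
import org.apache.spark.sql.functions.col

// Read the same documents with the built-in JSON reader and inspect how a
// property missing from some files is surfaced.
val df = spark.read.json("/path/to/users/*.json")
df.printSchema()
println(df.filter(col("isMale").isNull).count())
println(df.filter(col("isMale") === true).count())
{code}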

> Bug in loading unstructured data
> 
>
> Key: SPARK-24517
> URL: https://issues.apache.org/jira/browse/SPARK-24517
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.3.0
>Reporter: Moshe Israel
>Priority: Major
>
> When loading data using spark.read from unstructured data sets into Spark 
> dataframes there is a bug in the value of nonexistent properties. I found the 
> issue while loading data from Azure CosmosDB, which is based on json files, 
> but the issue might be relevant to other providers too.
> I'll explain more through an example... Let's assume we have a dataset of 
> users with *20* json files with properties \{name, age, isMale} and *40* more 
> json files with the properties \{name, age}. Loading the data to a dataframe 
> will create a dataframe object with *60* rows and three columns of \{name, 
> age, isMale}.
> querying *df.filter(col("isMale").isNull())* returns 0 rows; Expected 20 
> rows. Looks like instead of a null there is no content in the cell when the 
> source row does not have the property.
> Querying *df.where(df.isMale == True)* returns 60 rows (let's assume all are 
> males), meaning it includes the rows which don't have the property too. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24505) Forbidding string interpolation in CodeBlock

2018-06-09 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-24505:
---

 Summary: Forbidding string interpolation in CodeBlock
 Key: SPARK-24505
 URL: https://issues.apache.org/jira/browse/SPARK-24505
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Liang-Chi Hsieh


The CodeBlock interpolator now accepts strings. Based on previous discussion, 
we should forbid string interpolation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24504) Implement SparkSQL authorization plugin in Apache Ranger

2018-06-09 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16506889#comment-16506889
 ] 

Liang-Chi Hsieh commented on SPARK-24504:
-

It seems you created a duplicate ticket; can you close one of them?

> Implement SparkSQL authorization plugin in Apache Ranger
> 
>
> Key: SPARK-24504
> URL: https://issues.apache.org/jira/browse/SPARK-24504
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: t oo
>Priority: Major
>
> Implement a SparkSQL authorization plugin in Apache Ranger. The key element 
> preventing adoption of SparkSQL in my enterprise is the lack of a 
> masking/access control feature.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24447) Pyspark RowMatrix.columnSimilarities() loses spark context

2018-06-06 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504213#comment-16504213
 ] 

Liang-Chi Hsieh edited comment on SPARK-24447 at 6/7/18 4:09 AM:
-

I just built Spark from the current 2.3 branch. The above example code also 
works well on it.


was (Author: viirya):
I just built Spark from the current 2.3 branch. The above example code also 
works well.

> Pyspark RowMatrix.columnSimilarities() loses spark context
> --
>
> Key: SPARK-24447
> URL: https://issues.apache.org/jira/browse/SPARK-24447
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib, PySpark
>Affects Versions: 2.3.0
>Reporter: Perry Chu
>Priority: Major
>
> The RDD behind the CoordinateMatrix returned by 
> RowMatrix.columnSimilarities() appears to be losing track of the spark 
> context. 
> I'm pretty new to spark - not sure if the problem is on the python side or 
> the scala side - would appreciate someone more experienced taking a look.
> This snippet should reproduce the error:
> {code:java}
> from pyspark.mllib.linalg.distributed import RowMatrix
> rows = spark.sparkContext.parallelize([[0,1,2],[1,1,1]])
> matrix = RowMatrix(rows)
> sims = matrix.columnSimilarities()
> ## This works, prints "3 3" as expected (3 columns = 3x3 matrix)
> print(sims.numRows(),sims.numCols())
> ## This throws an error (stack trace below)
> print(sims.entries.first())
> ## Later I tried this
> print(rows.context) #
> print(sims.entries.context) # PySparkShell>, then throws an error{code}
> Error stack trace
> {code:java}
> ---
> AttributeError Traceback (most recent call last)
>  in ()
> > 1 sims.entries.first()
> /usr/lib/spark/python/pyspark/rdd.py in first(self)
> 1374 ValueError: RDD is empty
> 1375 """
> -> 1376 rs = self.take(1)
> 1377 if rs:
> 1378 return rs[0]
> /usr/lib/spark/python/pyspark/rdd.py in take(self, num)
> 1356
> 1357 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
> -> 1358 res = self.context.runJob(self, takeUpToNumLeft, p)
> 1359
> 1360 items += res
> /usr/lib/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, 
> partitions, allowLocal)
> 999 # SparkContext#runJob.
> 1000 mappedRDD = rdd.mapPartitions(partitionFunc)
> -> 1001 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, 
> partitions)
> 1002 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
> 1003
> AttributeError: 'NoneType' object has no attribute 'sc'
> {code}
> PySpark columnSimilarities documentation
> http://spark.apache.org/docs/latest/api/python/_modules/pyspark/mllib/linalg/distributed.html#RowMatrix.columnSimilarities






[jira] [Commented] (SPARK-24447) Pyspark RowMatrix.columnSimilarities() loses spark context

2018-06-06 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504213#comment-16504213
 ] 

Liang-Chi Hsieh commented on SPARK-24447:
-

I just built Spark from the current 2.3 branch. The above example code also works 
well.

> Pyspark RowMatrix.columnSimilarities() loses spark context
> --
>
> Key: SPARK-24447
> URL: https://issues.apache.org/jira/browse/SPARK-24447
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib, PySpark
>Affects Versions: 2.3.0
>Reporter: Perry Chu
>Priority: Major
>
> The RDD behind the CoordinateMatrix returned by 
> RowMatrix.columnSimilarities() appears to be losing track of the spark 
> context. 
> I'm pretty new to spark - not sure if the problem is on the python side or 
> the scala side - would appreciate someone more experienced taking a look.
> This snippet should reproduce the error:
> {code:java}
> from pyspark.mllib.linalg.distributed import RowMatrix
> rows = spark.sparkContext.parallelize([[0,1,2],[1,1,1]])
> matrix = RowMatrix(rows)
> sims = matrix.columnSimilarities()
> ## This works, prints "3 3" as expected (3 columns = 3x3 matrix)
> print(sims.numRows(),sims.numCols())
> ## This throws an error (stack trace below)
> print(sims.entries.first())
> ## Later I tried this
> print(rows.context) #
> print(sims.entries.context) # PySparkShell>, then throws an error{code}
> Error stack trace
> {code:java}
> ---
> AttributeError Traceback (most recent call last)
>  in ()
> > 1 sims.entries.first()
> /usr/lib/spark/python/pyspark/rdd.py in first(self)
> 1374 ValueError: RDD is empty
> 1375 """
> -> 1376 rs = self.take(1)
> 1377 if rs:
> 1378 return rs[0]
> /usr/lib/spark/python/pyspark/rdd.py in take(self, num)
> 1356
> 1357 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
> -> 1358 res = self.context.runJob(self, takeUpToNumLeft, p)
> 1359
> 1360 items += res
> /usr/lib/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, 
> partitions, allowLocal)
> 999 # SparkContext#runJob.
> 1000 mappedRDD = rdd.mapPartitions(partitionFunc)
> -> 1001 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, 
> partitions)
> 1002 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
> 1003
> AttributeError: 'NoneType' object has no attribute 'sc'
> {code}
> PySpark columnSimilarities documentation
> http://spark.apache.org/docs/latest/api/python/_modules/pyspark/mllib/linalg/distributed.html#RowMatrix.columnSimilarities






[jira] [Commented] (SPARK-24447) Pyspark RowMatrix.columnSimilarities() loses spark context

2018-06-06 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504152#comment-16504152
 ] 

Liang-Chi Hsieh commented on SPARK-24447:
-

Yes, I can run the example code on a build from the latest source code.

Because I can't reproduce it with your example on the latest codebase, I'd like 
to know whether it is a problem in the Spark codebase or not. Since you said the 
affected version is 2.3.0, I will try it on the 2.3.0 codebase to see if there is 
a bug.

> Pyspark RowMatrix.columnSimilarities() loses spark context
> --
>
> Key: SPARK-24447
> URL: https://issues.apache.org/jira/browse/SPARK-24447
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib, PySpark
>Affects Versions: 2.3.0
>Reporter: Perry Chu
>Priority: Major
>
> The RDD behind the CoordinateMatrix returned by 
> RowMatrix.columnSimilarities() appears to be losing track of the spark 
> context. 
> I'm pretty new to spark - not sure if the problem is on the python side or 
> the scala side - would appreciate someone more experienced taking a look.
> This snippet should reproduce the error:
> {code:java}
> from pyspark.mllib.linalg.distributed import RowMatrix
> rows = spark.sparkContext.parallelize([[0,1,2],[1,1,1]])
> matrix = RowMatrix(rows)
> sims = matrix.columnSimilarities()
> ## This works, prints "3 3" as expected (3 columns = 3x3 matrix)
> print(sims.numRows(),sims.numCols())
> ## This throws an error (stack trace below)
> print(sims.entries.first())
> ## Later I tried this
> print(rows.context) #
> print(sims.entries.context) # PySparkShell>, then throws an error{code}
> Error stack trace
> {code:java}
> ---
> AttributeError Traceback (most recent call last)
>  in ()
> > 1 sims.entries.first()
> /usr/lib/spark/python/pyspark/rdd.py in first(self)
> 1374 ValueError: RDD is empty
> 1375 """
> -> 1376 rs = self.take(1)
> 1377 if rs:
> 1378 return rs[0]
> /usr/lib/spark/python/pyspark/rdd.py in take(self, num)
> 1356
> 1357 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
> -> 1358 res = self.context.runJob(self, takeUpToNumLeft, p)
> 1359
> 1360 items += res
> /usr/lib/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, 
> partitions, allowLocal)
> 999 # SparkContext#runJob.
> 1000 mappedRDD = rdd.mapPartitions(partitionFunc)
> -> 1001 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, 
> partitions)
> 1002 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
> 1003
> AttributeError: 'NoneType' object has no attribute 'sc'
> {code}
> PySpark columnSimilarities documentation
> http://spark.apache.org/docs/latest/api/python/_modules/pyspark/mllib/linalg/distributed.html#RowMatrix.columnSimilarities






[jira] [Comment Edited] (SPARK-24357) createDataFrame in Python infers large integers as long type and then fails silently when converting them

2018-06-06 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503065#comment-16503065
 ] 

Liang-Chi Hsieh edited comment on SPARK-24357 at 6/6/18 9:50 AM:
-

I think this is because the number {{1 << 65}} (36893488147419103232L) exceeds 
the range of Scala's Long.

Scala:
{code}
scala> Long.MaxValue
res3: Long = 9223372036854775807
{code}

Python:
{code}
>>> TEST_DATA = [Row(data=9223372036854775807L)]
>>> frame = spark.createDataFrame(TEST_DATA)
>>> frame.collect()
[Row(data=9223372036854775807)]
>>> TEST_DATA = [Row(data=9223372036854775808L)]
>>> frame = spark.createDataFrame(TEST_DATA)
>>> frame.collect()
[Row(data=None)]
{code}



was (Author: viirya):
I think this is because this number {{1 << 65}} (36893488147419103232L) is more 
than Scala's long range.

>>> TEST_DATA = [Row(data=9223372036854775807L)]
>>> frame = spark.createDataFrame(TEST_DATA)
>>> frame.collect()
[Row(data=9223372036854775807)] 
>>> TEST_DATA = [Row(data=9223372036854775808L)]
>>> frame = spark.createDataFrame(TEST_DATA)
>>> frame.collect()
[Row(data=None)]


> createDataFrame in Python infers large integers as long type and then fails 
> silently when converting them
> -
>
> Key: SPARK-24357
> URL: https://issues.apache.org/jira/browse/SPARK-24357
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Joel Croteau
>Priority: Major
>
> When inferring the schema type of an RDD passed to createDataFrame, PySpark 
> SQL will infer any integral type as a LongType, which is a 64-bit integer, 
> without actually checking whether the values will fit into a 64-bit slot. If 
> the values are larger than 64 bits, then when pickled and unpickled in Java, 
> Unpickler will convert them to BigIntegers. When applySchemaToPythonRDD is 
> called, it will ignore the BigInteger type and return Null. This results in 
> any large integers in the resulting DataFrame being silently converted to 
> None. This can create some very surprising and difficult to debug behavior, 
> in particular if you are not aware of this limitation. There should either be 
> a runtime error at some point in this conversion chain, or else _infer_type 
> should infer larger integers as DecimalType with appropriate precision, or as 
> BinaryType. The former would be less convenient, but the latter may be 
> problematic to implement in practice. In any case, we should stop silently 
> converting large integers to None.






[jira] [Commented] (SPARK-24357) createDataFrame in Python infers large integers as long type and then fails silently when converting them

2018-06-06 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503065#comment-16503065
 ] 

Liang-Chi Hsieh commented on SPARK-24357:
-

I think this is because the number {{1 << 65}} (36893488147419103232L) exceeds 
the range of Scala's Long.

{code}
>>> TEST_DATA = [Row(data=9223372036854775807L)]
>>> frame = spark.createDataFrame(TEST_DATA)
>>> frame.collect()
[Row(data=9223372036854775807)]
>>> TEST_DATA = [Row(data=9223372036854775808L)]
>>> frame = spark.createDataFrame(TEST_DATA)
>>> frame.collect()
[Row(data=None)]
{code}
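
For reference, a plain Scala sketch (no Spark needed) confirming that the value does 
not fit in a 64-bit Long:
{code}
// 2^65 cannot be represented as a Scala/Java Long, whose max is 2^63 - 1.
val big = BigInt(1) << 65
println(big)               // 36893488147419103232
println(Long.MaxValue)     // 9223372036854775807
println(big.isValidLong)   // false: the value does not fit in 64 bits
{code}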


> createDataFrame in Python infers large integers as long type and then fails 
> silently when converting them
> -
>
> Key: SPARK-24357
> URL: https://issues.apache.org/jira/browse/SPARK-24357
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Joel Croteau
>Priority: Major
>
> When inferring the schema type of an RDD passed to createDataFrame, PySpark 
> SQL will infer any integral type as a LongType, which is a 64-bit integer, 
> without actually checking whether the values will fit into a 64-bit slot. If 
> the values are larger than 64 bits, then when pickled and unpickled in Java, 
> Unpickler will convert them to BigIntegers. When applySchemaToPythonRDD is 
> called, it will ignore the BigInteger type and return Null. This results in 
> any large integers in the resulting DataFrame being silently converted to 
> None. This can create some very surprising and difficult to debug behavior, 
> in particular if you are not aware of this limitation. There should either be 
> a runtime error at some point in this conversion chain, or else _infer_type 
> should infer larger integers as DecimalType with appropriate precision, or as 
> BinaryType. The former would be less convenient, but the latter may be 
> problematic to implement in practice. In any case, we should stop silently 
> converting large integers to None.






[jira] [Commented] (SPARK-24467) VectorAssemblerEstimator

2018-06-06 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503002#comment-16503002
 ] 

Liang-Chi Hsieh commented on SPARK-24467:
-

[~josephkb] Does that mean {{VectorAssembler}} will change from a 
{{Transformer}} to a {{Model}}?

> VectorAssemblerEstimator
> 
>
> Key: SPARK-24467
> URL: https://issues.apache.org/jira/browse/SPARK-24467
> Project: Spark
>  Issue Type: New Feature
>  Components: ML
>Affects Versions: 2.4.0
>Reporter: Joseph K. Bradley
>Priority: Major
>
> In [SPARK-22346], I believe I made a wrong API decision: I recommended added 
> `VectorSizeHint` instead of making `VectorAssembler` into an Estimator since 
> I thought the latter option would break most workflows.  However, I should 
> have proposed:
> * Add a Param to VectorAssembler for specifying the sizes of Vectors in the 
> inputCols.  This Param can be optional.  If not given, then VectorAssembler 
> will behave as it does now.  If given, then VectorAssembler can use that info 
> instead of figuring out the Vector sizes via metadata or examining Rows in 
> the data (though it could do consistency checks).
> * Add a VectorAssemblerEstimator which gets the Vector lengths from data and 
> produces a VectorAssembler with the vector lengths Param specified.
> This will not break existing workflows.  Migrating to 
> VectorAssemblerEstimator will be easier than adding VectorSizeHint since it 
> will not require users to manually input Vector lengths.
> Note: Even with this Estimator, VectorSizeHint might prove useful for other 
> things in the future which require vector length metadata, so we could 
> consider keeping it rather than deprecating it.
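
For context, a short sketch of the existing 2.3-era combination that the proposal 
would improve on, using the real {{VectorSizeHint}} and {{VectorAssembler}} APIs 
(the column names are made up): the size must be declared by hand, whereas the 
proposed estimator would learn it from the data.
{code}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{VectorAssembler, VectorSizeHint}

// Declare the vector size of "userFeatures" explicitly so downstream stages
// don't have to inspect metadata or scan rows to discover it.
val sizeHint = new VectorSizeHint()
  .setInputCol("userFeatures")
  .setSize(3)
  .setHandleInvalid("error")

val assembler = new VectorAssembler()
  .setInputCols(Array("userFeatures", "clicks"))
  .setOutputCol("features")

val pipeline = new Pipeline().setStages(Array(sizeHint, assembler))
{code}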






[jira] [Commented] (SPARK-24447) Pyspark RowMatrix.columnSimilarities() loses spark context

2018-06-05 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16502876#comment-16502876
 ] 

Liang-Chi Hsieh commented on SPARK-24447:
-

I can't reproduce this on the current master branch. Can you try it there too?

> Pyspark RowMatrix.columnSimilarities() loses spark context
> --
>
> Key: SPARK-24447
> URL: https://issues.apache.org/jira/browse/SPARK-24447
> Project: Spark
>  Issue Type: Bug
>  Components: MLlib, PySpark
>Affects Versions: 2.3.0
>Reporter: Perry Chu
>Priority: Major
>
> The RDD behind the CoordinateMatrix returned by 
> RowMatrix.columnSimilarities() appears to be losing track of the spark 
> context. 
> I'm pretty new to spark - not sure if the problem is on the python side or 
> the scala side - would appreciate someone more experienced taking a look.
> This snippet should reproduce the error:
> {code:java}
> from pyspark.mllib.linalg.distributed import RowMatrix
> rows = spark.sparkContext.parallelize([[0,1,2],[1,1,1]])
> matrix = RowMatrix(rows)
> sims = matrix.columnSimilarities()
> ## This works, prints "3 3" as expected (3 columns = 3x3 matrix)
> print(sims.numRows(),sims.numCols())
> ## This throws an error (stack trace below)
> print(sims.entries.first())
> ## Later I tried this
> print(rows.context) #
> print(sims.entries.context) # PySparkShell>, then throws an error{code}
> Error stack trace
> {code:java}
> ---
> AttributeError Traceback (most recent call last)
>  in ()
> > 1 sims.entries.first()
> /usr/lib/spark/python/pyspark/rdd.py in first(self)
> 1374 ValueError: RDD is empty
> 1375 """
> -> 1376 rs = self.take(1)
> 1377 if rs:
> 1378 return rs[0]
> /usr/lib/spark/python/pyspark/rdd.py in take(self, num)
> 1356
> 1357 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
> -> 1358 res = self.context.runJob(self, takeUpToNumLeft, p)
> 1359
> 1360 items += res
> /usr/lib/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, 
> partitions, allowLocal)
> 999 # SparkContext#runJob.
> 1000 mappedRDD = rdd.mapPartitions(partitionFunc)
> -> 1001 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, 
> partitions)
> 1002 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
> 1003
> AttributeError: 'NoneType' object has no attribute 'sc'
> {code}
> PySpark columnSimilarities documentation
> http://spark.apache.org/docs/latest/api/python/_modules/pyspark/mllib/linalg/distributed.html#RowMatrix.columnSimilarities






[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-05-30 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496178#comment-16496178
 ] 

Liang-Chi Hsieh commented on SPARK-24410:
-

Yeah, it depends on how we combine the RDDs from Union's children. Currently 
{{SparkContext.union}} doesn't produce the result this issue wants, so I will 
leave {{UnionExec#outputPartitioning}} untouched for now.
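
For illustration, a small spark-shell sketch (toy data) of what 
{{SparkContext.union}} does today when its inputs expose no partitioner, which is 
the case for the file scan RDDs here: partitions are simply concatenated, so the 
physical co-partitioning of the bucketed tables is invisible downstream.
{code}
// Two 3-partition RDDs without a partitioner union into a 6-partition RDD.
val rdd1 = spark.sparkContext.parallelize(1 to 10, 3)
val rdd2 = spark.sparkContext.parallelize(11 to 20, 3)
val unioned = spark.sparkContext.union(rdd1, rdd2)
println(unioned.getNumPartitions)   // 6
println(unioned.partitioner)        // None
{code}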

> Missing optimization for Union on bucketed tables
> -
>
> Key: SPARK-24410
> URL: https://issues.apache.org/jira/browse/SPARK-24410
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ohad Raviv
>Priority: Major
>
> A common use-case we have is of a partially aggregated table and daily 
> increments that we need to further aggregate. We do this by unioning the two 
> tables and aggregating again.
> we tried to optimize this process by bucketing the tables, but currently it 
> seems that the union operator doesn't leverage the tables being bucketed 
> (like the join operator).
> for example, for two bucketed tables a1,a2:
> {code}
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
> .repartition(col("key"))
> .write
>   .mode(SaveMode.Overwrite)
> .bucketBy(3, "key")
> .sortBy("t1")
> .saveAsTable("a1")
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
>   .repartition(col("key"))
>   .write.mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("t1")
>   .saveAsTable("a2")
> {code}
> for the join query we get the "SortMergeJoin"
> {code}
> select * from a1 join a2 on (a1.key=a2.key)
> == Physical Plan ==
> *(3) SortMergeJoin [key#24L], [key#27L], Inner
> :- *(1) Sort [key#24L ASC NULLS FIRST], false, 0
> :  +- *(1) Project [key#24L, t1#25L, t2#26L]
> : +- *(1) Filter isnotnull(key#24L)
> :+- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> +- *(2) Sort [key#27L ASC NULLS FIRST], false, 0
>+- *(2) Project [key#27L, t1#28L, t2#29L]
>   +- *(2) Filter isnotnull(key#27L)
>  +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> {code}
> but for aggregation after union we get a shuffle:
> {code}
> select key,count(*) from (select * from a1 union all select * from a2)z group 
> by key
> == Physical Plan ==
> *(4) HashAggregate(keys=[key#25L], functions=[count(1)], output=[key#25L, 
> count(1)#36L])
> +- Exchange hashpartitioning(key#25L, 1)
>+- *(3) HashAggregate(keys=[key#25L], functions=[partial_count(1)], 
> output=[key#25L, count#38L])
>   +- Union
>  :- *(1) Project [key#25L]
>  :  +- *(1) FileScan parquet default.a1[key#25L] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
> PushedFilters: [], ReadSchema: struct
>  +- *(2) Project [key#28L]
> +- *(2) FileScan parquet default.a2[key#28L] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
> PushedFilters: [], ReadSchema: struct
> {code}






[jira] [Comment Edited] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-05-30 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496132#comment-16496132
 ] 

Liang-Chi Hsieh edited comment on SPARK-24410 at 5/31/18 5:39 AM:
--

We can verify the partitioning of the union dataframe:
{code}
val df1 = spark.table("a1").select(spark_partition_id(), $"key")
val df2 = spark.table("a2").select(spark_partition_id(), $"key")
df1.union(df2).select(spark_partition_id(), $"key").collect
{code}
{code}
res8: Array[org.apache.spark.sql.Row] = Array([0,6], [0,7], [0,3], [1,0], 
[2,4], [2,8], [2,9], [2,2], [2,1], [2,5], [3,3], [3,7], [3,6], [4,0], [5,2], 
[5,9], [5,5], [5,8], [5,1], [5,4])
{code}

From the above result, we can see that rows with the same {{key}} from {{df1}} and 
{{df2}} end up in different partitions. E.g., key {{6}} is in partition {{0}} and 
partition {{3}}. So we still need a shuffle to get the correct results.





was (Author: viirya):
We can verify the partition of union dataframe:
{code}
val df1 = spark.table("a1").select(spark_partition_id(), $"key")
val df2 = spark.table("a2").select(spark_partition_id(), $"key")
df1.union(df2).select(spark_partition_id(), $"key").collect
{code}
{code}
res8: Array[org.apache.spark.sql.Row] = Array([0,6], [0,7], [0,3], [1,0], 
[2,4], [2,8], [2,9], [2,2], [2,1], [2,5], [3,3], [3,7], [3,6], [4,0], [5,2], 
[5,9], [5,5], [5,8], [5,1], [5,4])
{code}

From above result, we can find that for the same {{key}} from {{df1}} and 
{{df2}} are at different partitions. E.g., key {{6}} are at partition {{0}} 
and partition {{3}}.




> Missing optimization for Union on bucketed tables
> -
>
> Key: SPARK-24410
> URL: https://issues.apache.org/jira/browse/SPARK-24410
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ohad Raviv
>Priority: Major
>
> A common use-case we have is of a partially aggregated table and daily 
> increments that we need to further aggregate. We do this by unioning the two 
> tables and aggregating again.
> we tried to optimize this process by bucketing the tables, but currently it 
> seems that the union operator doesn't leverage the tables being bucketed 
> (like the join operator).
> for example, for two bucketed tables a1,a2:
> {code}
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
> .repartition(col("key"))
> .write
>   .mode(SaveMode.Overwrite)
> .bucketBy(3, "key")
> .sortBy("t1")
> .saveAsTable("a1")
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
>   .repartition(col("key"))
>   .write.mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("t1")
>   .saveAsTable("a2")
> {code}
> for the join query we get the "SortMergeJoin"
> {code}
> select * from a1 join a2 on (a1.key=a2.key)
> == Physical Plan ==
> *(3) SortMergeJoin [key#24L], [key#27L], Inner
> :- *(1) Sort [key#24L ASC NULLS FIRST], false, 0
> :  +- *(1) Project [key#24L, t1#25L, t2#26L]
> : +- *(1) Filter isnotnull(key#24L)
> :+- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> +- *(2) Sort [key#27L ASC NULLS FIRST], false, 0
>+- *(2) Project [key#27L, t1#28L, t2#29L]
>   +- *(2) Filter isnotnull(key#27L)
>  +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> {code}
> but for aggregation after union we get a shuffle:
> {code}
> select key,count(*) from (select * from a1 union all select * from a2)z group 
> by key
> == Physical Plan ==
> *(4) HashAggregate(keys=[key#25L], functions=[count(1)], output=[key#25L, 
> count(1)#36L])
> +- Exchange hashpartitioning(key#25L, 1)
>+- *(3) HashAggregate(keys=[key#25L], functions=[partial_count(1)], 
> output=[key#25L, count#38L])
>   +- Union
>  :- *(1) Project [key#25L]
>  :  +- *(1) FileScan parquet default.a1[key#25L] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
> PushedFilters: [], ReadSchema: struct
>  +- *(2) Project [key#28L]
> +- *(2) FileScan parquet default.a2[key#28L] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
> PushedFilters: [], ReadSchema: struct
> {code}




[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-05-30 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496132#comment-16496132
 ] 

Liang-Chi Hsieh commented on SPARK-24410:
-

We can verify the partitioning of the union dataframe:
{code}
val df1 = spark.table("a1").select(spark_partition_id(), $"key")
val df2 = spark.table("a2").select(spark_partition_id(), $"key")
df1.union(df2).select(spark_partition_id(), $"key").collect
{code}
{code}
res8: Array[org.apache.spark.sql.Row] = Array([0,6], [0,7], [0,3], [1,0], 
[2,4], [2,8], [2,9], [2,2], [2,1], [2,5], [3,3], [3,7], [3,6], [4,0], [5,2], 
[5,9], [5,5], [5,8], [5,1], [5,4])
{code}

From the above result, we can see that rows with the same {{key}} from {{df1}} and 
{{df2}} end up in different partitions. E.g., key {{6}} is in partition {{0}} and 
partition {{3}}.




> Missing optimization for Union on bucketed tables
> -
>
> Key: SPARK-24410
> URL: https://issues.apache.org/jira/browse/SPARK-24410
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ohad Raviv
>Priority: Major
>
> A common use-case we have is of a partially aggregated table and daily 
> increments that we need to further aggregate. We do this by unioning the two 
> tables and aggregating again.
> we tried to optimize this process by bucketing the tables, but currently it 
> seems that the union operator doesn't leverage the tables being bucketed 
> (like the join operator).
> for example, for two bucketed tables a1,a2:
> {code}
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
> .repartition(col("key"))
> .write
>   .mode(SaveMode.Overwrite)
> .bucketBy(3, "key")
> .sortBy("t1")
> .saveAsTable("a1")
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
>   .repartition(col("key"))
>   .write.mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("t1")
>   .saveAsTable("a2")
> {code}
> for the join query we get the "SortMergeJoin"
> {code}
> select * from a1 join a2 on (a1.key=a2.key)
> == Physical Plan ==
> *(3) SortMergeJoin [key#24L], [key#27L], Inner
> :- *(1) Sort [key#24L ASC NULLS FIRST], false, 0
> :  +- *(1) Project [key#24L, t1#25L, t2#26L]
> : +- *(1) Filter isnotnull(key#24L)
> :+- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> +- *(2) Sort [key#27L ASC NULLS FIRST], false, 0
>+- *(2) Project [key#27L, t1#28L, t2#29L]
>   +- *(2) Filter isnotnull(key#27L)
>  +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> {code}
> but for aggregation after union we get a shuffle:
> {code}
> select key,count(*) from (select * from a1 union all select * from a2)z group 
> by key
> == Physical Plan ==
> *(4) HashAggregate(keys=[key#25L], functions=[count(1)], output=[key#25L, 
> count(1)#36L])
> +- Exchange hashpartitioning(key#25L, 1)
>+- *(3) HashAggregate(keys=[key#25L], functions=[partial_count(1)], 
> output=[key#25L, count#38L])
>   +- Union
>  :- *(1) Project [key#25L]
>  :  +- *(1) FileScan parquet default.a1[key#25L] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
> PushedFilters: [], ReadSchema: struct
>  +- *(2) Project [key#28L]
> +- *(2) FileScan parquet default.a2[key#28L] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
> PushedFilters: [], ReadSchema: struct
> {code}






[jira] [Comment Edited] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-05-30 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494885#comment-16494885
 ] 

Liang-Chi Hsieh edited comment on SPARK-24410 at 5/30/18 2:20 PM:
--

I've done some experiments locally, but the results show that in the above case 
we don't seem to guarantee that the data distribution is the same when reading 
the two tables {{a1}} and {{a2}}. By removing the shuffle, you actually don't get 
the correct results.

That is, for the FileScan nodes at (1) and (2):
{code:java}
*(1) FileScan parquet default.a1[key#25L] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct

*(2) FileScan parquet default.a2[key#28L] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct{code}

FileScan (1)'s rows are partitioned by {{key#25}} and FileScan (2)'s rows are 
partitioned by {{key#28}}. But it doesn't guarantee that the same values of 
{{key#25}} and {{key#28}} are located in the same partition.


was (Author: viirya):
I've done some experiments locally. But the results show that in above case 
seems we don't guarantee that the data distribution is the same when reading 
two tables {{a1}} and {{a2}}. By removing the shuffling, you actually don't get 
the correct results.

That said, for FileScan nodes at (1) and (2):
{code:java}
*(1) FileScan parquet default.a1[key#25L] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct

*(2) FileScan parquet default.a2[key#28L] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct{code}

FileScan (1)'s rows are partitioned by {{key#25}} and FileScan (2)'s rows are 
partitioned by {{key#28}}. But it doesn't guarantee that the values of 
{{key#25}} and {{key#28}} are located in the same partition.

> Missing optimization for Union on bucketed tables
> -
>
> Key: SPARK-24410
> URL: https://issues.apache.org/jira/browse/SPARK-24410
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ohad Raviv
>Priority: Major
>
> A common use-case we have is of a partially aggregated table and daily 
> increments that we need to further aggregate. We do this by unioning the two 
> tables and aggregating again.
> we tried to optimize this process by bucketing the tables, but currently it 
> seems that the union operator doesn't leverage the tables being bucketed 
> (like the join operator).
> for example, for two bucketed tables a1,a2:
> {code}
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
> .repartition(col("key"))
> .write
>   .mode(SaveMode.Overwrite)
> .bucketBy(3, "key")
> .sortBy("t1")
> .saveAsTable("a1")
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
>   .repartition(col("key"))
>   .write.mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("t1")
>   .saveAsTable("a2")
> {code}
> for the join query we get the "SortMergeJoin"
> {code}
> select * from a1 join a2 on (a1.key=a2.key)
> == Physical Plan ==
> *(3) SortMergeJoin [key#24L], [key#27L], Inner
> :- *(1) Sort [key#24L ASC NULLS FIRST], false, 0
> :  +- *(1) Project [key#24L, t1#25L, t2#26L]
> : +- *(1) Filter isnotnull(key#24L)
> :+- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> +- *(2) Sort [key#27L ASC NULLS FIRST], false, 0
>+- *(2) Project [key#27L, t1#28L, t2#29L]
>   +- *(2) Filter isnotnull(key#27L)
>  +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> {code}
> but for aggregation after union we get a shuffle:
> {code}
> select key,count(*) from (select * from a1 union all select * from a2)z group 
> by key
> == Physical Plan ==
> *(4) HashAggregate(keys=[key#25L], functions=[count(1)], output=[key#25L, 
> count(1)#36L])
> +- Exchange hashpartitioning(key#25L, 1)
>+- *(3) HashAggregate(keys=[key#25L], functions=[partial_count(1)], 
> output=[key#25L, count#38L])
>   +- Union
> 

[jira] [Comment Edited] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-05-30 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494885#comment-16494885
 ] 

Liang-Chi Hsieh edited comment on SPARK-24410 at 5/30/18 8:41 AM:
--

I've done some experiments locally, but the results show that in the above case 
we don't seem to guarantee that the data distribution is the same when reading 
the two tables {{a1}} and {{a2}}. By removing the shuffle, you actually don't get 
the correct results.

That is, for the FileScan nodes at (1) and (2):
{code:java}
*(1) FileScan parquet default.a1[key#25L] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct

*(2) FileScan parquet default.a2[key#28L] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct{code}

FileScan (1)'s rows are partitioned by {{key#25}} and FileScan (2)'s rows are 
partitioned by {{key#28}}. But it doesn't guarantee that the values of 
{{key#25}} and {{key#28}} are located in the same partition.


was (Author: viirya):
I've done some experiments locally. But the results show that in above case 
seems we don't guarantee that the data distribution is the same when reading 
two tables {{a1}} and {{a2}}. By removing the shuffling, you actually don't get 
the correct results.

That said, for FileScan nodes at (1) and (2):
{code:java}
*(1) FileScan parquet default.a1[key#25L] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct

*(2) FileScan parquet default.a2[key#28L] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct{code}

FileScan (1)'s rows are partitioned by {{key#25}} and FileScan (2)'s rows are 
partitioned by {{key#28}}. But it doesn't guarantee that the values of 
{{key#25}} and {{key#28}} are located in the same partition.

> Missing optimization for Union on bucketed tables
> -
>
> Key: SPARK-24410
> URL: https://issues.apache.org/jira/browse/SPARK-24410
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ohad Raviv
>Priority: Major
>
> A common use-case we have is of a partially aggregated table and daily 
> increments that we need to further aggregate. We do this by unioning the two 
> tables and aggregating again.
> we tried to optimize this process by bucketing the tables, but currently it 
> seems that the union operator doesn't leverage the tables being bucketed 
> (like the join operator).
> for example, for two bucketed tables a1,a2:
> {code}
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
> .repartition(col("key"))
> .write
>   .mode(SaveMode.Overwrite)
> .bucketBy(3, "key")
> .sortBy("t1")
> .saveAsTable("a1")
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
>   .repartition(col("key"))
>   .write.mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("t1")
>   .saveAsTable("a2")
> {code}
> for the join query we get the "SortMergeJoin"
> {code}
> select * from a1 join a2 on (a1.key=a2.key)
> == Physical Plan ==
> *(3) SortMergeJoin [key#24L], [key#27L], Inner
> :- *(1) Sort [key#24L ASC NULLS FIRST], false, 0
> :  +- *(1) Project [key#24L, t1#25L, t2#26L]
> : +- *(1) Filter isnotnull(key#24L)
> :+- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> +- *(2) Sort [key#27L ASC NULLS FIRST], false, 0
>+- *(2) Project [key#27L, t1#28L, t2#29L]
>   +- *(2) Filter isnotnull(key#27L)
>  +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> {code}
> but for aggregation after union we get a shuffle:
> {code}
> select key,count(*) from (select * from a1 union all select * from a2)z group 
> by key
> == Physical Plan ==
> *(4) HashAggregate(keys=[key#25L], functions=[count(1)], output=[key#25L, 
> count(1)#36L])
> +- Exchange hashpartitioning(key#25L, 1)
>+- *(3) HashAggregate(keys=[key#25L], functions=[partial_count(1)], 
> output=[key#25L, count#38L])
>   +- Union
>  

[jira] [Comment Edited] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-05-30 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494885#comment-16494885
 ] 

Liang-Chi Hsieh edited comment on SPARK-24410 at 5/30/18 8:41 AM:
--

I've done some experiments locally, but the results show that in the above case 
we don't seem to guarantee that the data distribution is the same when reading 
the two tables {{a1}} and {{a2}}. By removing the shuffle, you actually don't get 
the correct results.

That is, for the FileScan nodes at (1) and (2):
{code:java}
*(1) FileScan parquet default.a1[key#25L] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct

*(2) FileScan parquet default.a2[key#28L] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct{code}

FileScan (1)'s rows are partitioned by {{key#25}} and FileScan (2)'s rows are 
partitioned by {{key#28}}. But it doesn't guarantee that the values of 
{{key#25}} and {{key#28}} are located in the same partition.


was (Author: viirya):
I've done some experiments locally. But the results show that in above case 
seems we don't guarantee that the data distribution is the same when reading 
two tables {{a1}} and {{a2}}. By removing the shuffling, you actually don't get 
the correct results.

> Missing optimization for Union on bucketed tables
> -
>
> Key: SPARK-24410
> URL: https://issues.apache.org/jira/browse/SPARK-24410
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ohad Raviv
>Priority: Major
>
> A common use-case we have is of a partially aggregated table and daily 
> increments that we need to further aggregate. We do this by unioning the two 
> tables and aggregating again.
> we tried to optimize this process by bucketing the tables, but currently it 
> seems that the union operator doesn't leverage the tables being bucketed 
> (like the join operator).
> for example, for two bucketed tables a1,a2:
> {code}
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
> .repartition(col("key"))
> .write
>   .mode(SaveMode.Overwrite)
> .bucketBy(3, "key")
> .sortBy("t1")
> .saveAsTable("a1")
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
>   .repartition(col("key"))
>   .write.mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("t1")
>   .saveAsTable("a2")
> {code}
> for the join query we get the "SortMergeJoin"
> {code}
> select * from a1 join a2 on (a1.key=a2.key)
> == Physical Plan ==
> *(3) SortMergeJoin [key#24L], [key#27L], Inner
> :- *(1) Sort [key#24L ASC NULLS FIRST], false, 0
> :  +- *(1) Project [key#24L, t1#25L, t2#26L]
> : +- *(1) Filter isnotnull(key#24L)
> :+- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> +- *(2) Sort [key#27L ASC NULLS FIRST], false, 0
>+- *(2) Project [key#27L, t1#28L, t2#29L]
>   +- *(2) Filter isnotnull(key#27L)
>  +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> {code}
> but for aggregation after union we get a shuffle:
> {code}
> select key,count(*) from (select * from a1 union all select * from a2)z group 
> by key
> == Physical Plan ==
> *(4) HashAggregate(keys=[key#25L], functions=[count(1)], output=[key#25L, 
> count(1)#36L])
> +- Exchange hashpartitioning(key#25L, 1)
>+- *(3) HashAggregate(keys=[key#25L], functions=[partial_count(1)], 
> output=[key#25L, count#38L])
>   +- Union
>  :- *(1) Project [key#25L]
>  :  +- *(1) FileScan parquet default.a1[key#25L] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
> PushedFilters: [], ReadSchema: struct
>  +- *(2) Project [key#28L]
> +- *(2) FileScan parquet default.a2[key#28L] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
> PushedFilters: [], ReadSchema: struct
> {code}




[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-05-30 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494885#comment-16494885
 ] 

Liang-Chi Hsieh commented on SPARK-24410:
-

I've done some experiments locally, but the results show that in the above case 
we don't seem to guarantee that the data distribution is the same when reading 
the two tables {{a1}} and {{a2}}. By removing the shuffle, you actually don't get 
the correct results.

> Missing optimization for Union on bucketed tables
> -
>
> Key: SPARK-24410
> URL: https://issues.apache.org/jira/browse/SPARK-24410
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ohad Raviv
>Priority: Major
>
> A common use-case we have is of a partially aggregated table and daily 
> increments that we need to further aggregate. We do this by unioning the two 
> tables and aggregating again.
> we tried to optimize this process by bucketing the tables, but currently it 
> seems that the union operator doesn't leverage the tables being bucketed 
> (like the join operator).
> for example, for two bucketed tables a1,a2:
> {code}
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
> .repartition(col("key"))
> .write
>   .mode(SaveMode.Overwrite)
> .bucketBy(3, "key")
> .sortBy("t1")
> .saveAsTable("a1")
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
>   .repartition(col("key"))
>   .write.mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("t1")
>   .saveAsTable("a2")
> {code}
> for the join query we get the "SortMergeJoin"
> {code}
> select * from a1 join a2 on (a1.key=a2.key)
> == Physical Plan ==
> *(3) SortMergeJoin [key#24L], [key#27L], Inner
> :- *(1) Sort [key#24L ASC NULLS FIRST], false, 0
> :  +- *(1) Project [key#24L, t1#25L, t2#26L]
> : +- *(1) Filter isnotnull(key#24L)
> :+- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> +- *(2) Sort [key#27L ASC NULLS FIRST], false, 0
>+- *(2) Project [key#27L, t1#28L, t2#29L]
>   +- *(2) Filter isnotnull(key#27L)
>  +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> {code}
> but for aggregation after union we get a shuffle:
> {code}
> select key,count(*) from (select * from a1 union all select * from a2)z group 
> by key
> == Physical Plan ==
> *(4) HashAggregate(keys=[key#25L], functions=[count(1)], output=[key#25L, 
> count(1)#36L])
> +- Exchange hashpartitioning(key#25L, 1)
>+- *(3) HashAggregate(keys=[key#25L], functions=[partial_count(1)], 
> output=[key#25L, count#38L])
>   +- Union
>  :- *(1) Project [key#25L]
>  :  +- *(1) FileScan parquet default.a1[key#25L] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
> PushedFilters: [], ReadSchema: struct
>  +- *(2) Project [key#28L]
> +- *(2) FileScan parquet default.a2[key#28L] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
> PushedFilters: [], ReadSchema: struct
> {code}






[jira] [Commented] (SPARK-24409) exception when sending large list in filter(col(x).isin(list))

2018-05-29 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494647#comment-16494647
 ] 

Liang-Chi Hsieh commented on SPARK-24409:
-

It seems you use the AWS Glue Data Catalog as the metastore for Hive, and the 
overly long partition filtering expression causes this exception.
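
For reference, a sketch of the workaround the error message itself suggests (shown 
here at session construction time); it trades metastore partition pruning for 
compatibility, so whether it is acceptable depends on your workload:
{code}
import org.apache.spark.sql.SparkSession

// Workaround mentioned in the error message: stop asking the metastore to
// prune partitions by filter, at the cost of degraded performance.
val spark = SparkSession.builder()
  .config("spark.sql.hive.manageFilesourcePartitions", "false")
  .enableHiveSupport()
  .getOrCreate()
{code}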

> exception when sending large list in filter(col(x).isin(list))
> --
>
> Key: SPARK-24409
> URL: https://issues.apache.org/jira/browse/SPARK-24409
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.1
>Reporter: Janet Levin
>Priority: Major
>
> This is the error we get:
>  
> /mnt/yarn/usercache/hadoop/appcache/application_1526466002571_8701/container_1526466002571_8701_01_01/pyspark.zip/pyspark/sql/dataframe.py",
>  line 88, in rdd
>  File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1526466002571_8701/container_1526466002571_8701_01_01/py4j-0.10.6-src.zip/py4j/java_gateway.py",
>  line 1160, in __call__
>  File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1526466002571_8701/container_1526466002571_8701_01_01/pyspark.zip/pyspark/sql/utils.py",
>  line 63, in deco
>  File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1526466002571_8701/container_1526466002571_8701_01_01/py4j-0.10.6-src.zip/py4j/protocol.py",
>  line 320, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> o605.javaToPython.
> : java.lang.RuntimeException: Caught Hive MetaException attempting to get 
> partition metadata by filter from Hive. You can set the Spark configuration 
> setting spark.sql.hive.manageFilesourcePartitions to false to work around 
> this problem, however this will result in degraded performance. Please report 
> a bug: https://issues.apache.org/jira/browse/SPARK
>  at 
> org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(HiveShim.scala:741)
>  at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitionsByFilter$1.apply(HiveClientImpl.scala:655)
>  at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitionsByFilter$1.apply(HiveClientImpl.scala:653)
>  at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:272)
>  at 
> org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:210)
>  at 
> org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:209)
>  at 
> org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:255)
>  at 
> org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionsByFilter(HiveClientImpl.scala:653)
>  at 
> org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$listPartitionsByFilter$1.apply(HiveExternalCatalog.scala:1218)
>  at 
> org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$listPartitionsByFilter$1.apply(HiveExternalCatalog.scala:1211)
>  at 
> org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
>  at 
> org.apache.spark.sql.hive.HiveExternalCatalog.listPartitionsByFilter(HiveExternalCatalog.scala:1211)
>  at 
> org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitionsByFilter(SessionCatalog.scala:925)
>  at 
> org.apache.spark.sql.execution.datasources.CatalogFileIndex.filterPartitions(CatalogFileIndex.scala:73)
>  at 
> org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions$$anonfun$apply$1.applyOrElse(PruneFileSourcePartitions.scala:61)
>  at 
> org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions$$anonfun$apply$1.applyOrElse(PruneFileSourcePartitions.scala:27)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>  at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>  at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>  at 
> org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions$.apply(PruneFileSourcePartitions.scala:27)
>  at 
> org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions$.apply(PruneFileSourcePartitions.sc

[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-05-29 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494391#comment-16494391
 ] 

Liang-Chi Hsieh commented on SPARK-24410:
-

[~cloud_fan] Thanks for pinging me. I'll look into this.

> Missing optimization for Union on bucketed tables
> -
>
> Key: SPARK-24410
> URL: https://issues.apache.org/jira/browse/SPARK-24410
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ohad Raviv
>Priority: Major
>
> A common use-case we have is of a partially aggregated table and daily 
> increments that we need to further aggregate. We do this by unioning the two 
> tables and aggregating again.
> we tried to optimize this process by bucketing the tables, but currently it 
> seems that the union operator doesn't leverage the tables being bucketed 
> (like the join operator).
> for example, for two bucketed tables a1,a2:
> {code}
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
> .repartition(col("key"))
> .write
>   .mode(SaveMode.Overwrite)
> .bucketBy(3, "key")
> .sortBy("t1")
> .saveAsTable("a1")
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
>   .repartition(col("key"))
>   .write.mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("t1")
>   .saveAsTable("a2")
> {code}
> for the join query we get the "SortMergeJoin"
> {code}
> select * from a1 join a2 on (a1.key=a2.key)
> == Physical Plan ==
> *(3) SortMergeJoin [key#24L], [key#27L], Inner
> :- *(1) Sort [key#24L ASC NULLS FIRST], false, 0
> :  +- *(1) Project [key#24L, t1#25L, t2#26L]
> : +- *(1) Filter isnotnull(key#24L)
> :+- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> +- *(2) Sort [key#27L ASC NULLS FIRST], false, 0
>+- *(2) Project [key#27L, t1#28L, t2#29L]
>   +- *(2) Filter isnotnull(key#27L)
>  +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: 
> true, Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
> PushedFilters: [IsNotNull(key)], ReadSchema: 
> struct
> {code}
> but for aggregation after union we get a shuffle:
> {code}
> select key,count(*) from (select * from a1 union all select * from a2)z group 
> by key
> == Physical Plan ==
> *(4) HashAggregate(keys=[key#25L], functions=[count(1)], output=[key#25L, 
> count(1)#36L])
> +- Exchange hashpartitioning(key#25L, 1)
>+- *(3) HashAggregate(keys=[key#25L], functions=[partial_count(1)], 
> output=[key#25L, count#38L])
>   +- Union
>  :- *(1) Project [key#25L]
>  :  +- *(1) FileScan parquet default.a1[key#25L] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], 
> PushedFilters: [], ReadSchema: struct
>  +- *(2) Project [key#28L]
> +- *(2) FileScan parquet default.a2[key#28L] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], 
> PushedFilters: [], ReadSchema: struct
> {code}






[jira] [Created] (SPARK-24361) Polish code block manipulation API

2018-05-22 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-24361:
---

 Summary: Polish code block manipulation API
 Key: SPARK-24361
 URL: https://issues.apache.org/jira/browse/SPARK-24361
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Liang-Chi Hsieh


The current code block manipulation API is immature and hacky. We should have a 
formal API for manipulating code blocks.






[jira] [Commented] (SPARK-23455) Default Params in ML should be saved separately

2018-05-15 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476753#comment-16476753
 ] 

Liang-Chi Hsieh commented on SPARK-23455:
-

According to [~josephkb]'s comment at 
[https://github.com/apache/spark/pull/20633#issuecomment-383747170], R should 
be ok.

> Default Params in ML should be saved separately
> ---
>
> Key: SPARK-23455
> URL: https://issues.apache.org/jira/browse/SPARK-23455
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 2.4.0
>Reporter: Liang-Chi Hsieh
>Assignee: Liang-Chi Hsieh
>Priority: Major
> Fix For: 2.4.0
>
>
> We save ML's user-supplied params and default params as one entity in JSON. 
> When loading saved models, we set all the loaded params on the created ML 
> model instances as user-supplied params.
> This causes problems, e.g., if we strictly disallow some params to be set 
> at the same time, a default param can fail the param check because it is 
> treated as a user-supplied param after loading.
> The loaded default params should not be set as user-supplied params. We 
> should save ML default params separately in JSON.
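
For illustration, a minimal Python sketch of the kind of metadata split being proposed; the key names {{paramMap}} and {{defaultParamMap}} and the sample values are assumptions for this example, not the exact on-disk format.

{code:python}
# Hypothetical metadata layout after the change: user-set params and defaults
# live in separate maps, so a loader can tell them apart. Names and values are
# illustrative only.
metadata = {
    "class": "org.apache.spark.ml.feature.Binarizer",
    "sparkVersion": "2.4.0",
    "uid": "binarizer_abc123",
    "paramMap": {"threshold": 0.7},                                # set by the user
    "defaultParamMap": {"outputCol": "binarizer_abc123__output"},  # defaults only
}

# A loader can restore defaults without marking them as user-supplied.
user_set_params = metadata["paramMap"]
default_params = metadata["defaultParamMap"]
print(user_set_params, default_params)
{code}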






[jira] [Created] (SPARK-24259) ArrayWriter for Arrow produces wrong output

2018-05-12 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-24259:
---

 Summary: ArrayWriter for Arrow produces wrong output
 Key: SPARK-24259
 URL: https://issues.apache.org/jira/browse/SPARK-24259
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.0
Reporter: Liang-Chi Hsieh


Right now {{ArrayWriter}}, which is used to output Arrow data for array type, doesn't 
clear or reset after each batch. This produces wrong output.






[jira] [Created] (SPARK-24242) RangeExec should have correct outputOrdering

2018-05-10 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-24242:
---

 Summary: RangeExec should have correct outputOrdering
 Key: SPARK-24242
 URL: https://issues.apache.org/jira/browse/SPARK-24242
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Liang-Chi Hsieh


The logical Range node has recently been given an outputOrdering, which is used to 
eliminate redundant Sorts during optimization. However, this outputOrdering info 
doesn't propagate to the physical Range node. We should use the outputOrdering from 
the logical Range node so that parent nodes of Range can correctly know its output 
ordering.
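
A small PySpark demo of what propagating the ordering enables; this is a sketch only, and whether the extra Sort actually disappears from the plan depends on the Spark version and planner.

{code:python}
# Sketch: sorting the output of range() by "id" should not need an extra Sort
# node once Range reports that its output is already ordered by id.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("range-ordering-demo").getOrCreate()

# Inspect the physical plan; ideally no Sort appears above Range here.
spark.range(0, 100).sortWithinPartitions("id").explain()
{code}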






[jira] [Commented] (SPARK-21274) Implement EXCEPT ALL and INTERSECT ALL

2018-05-06 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16465010#comment-16465010
 ] 

Liang-Chi Hsieh commented on SPARK-21274:
-

[~dkbiswal] No problem. The current EXCEPT ALL rewrite also uses just one GROUP BY, 
so I think it is fine to keep it.

> Implement EXCEPT ALL and INTERSECT ALL
> --
>
> Key: SPARK-21274
> URL: https://issues.apache.org/jira/browse/SPARK-21274
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.0.0, 2.1.0, 2.2.0
>Reporter: Ruslan Dautkhanov
>Priority: Major
>
> 1) *EXCEPT ALL* / MINUS ALL :
> {code}
> SELECT a,b,c FROM tab1
>  EXCEPT ALL 
> SELECT a,b,c FROM tab2
> {code}
> can be rewritten as the following outer join:
> {code}
> SELECT a,b,c
> FROM tab1 t1
>   LEFT OUTER JOIN
>     tab2 t2
>   ON (
>     (t1.a, t1.b, t1.c) = (t2.a, t2.b, t2.c)
>   )
> WHERE
>   COALESCE(t2.a, t2.b, t2.c) IS NULL
> {code}
> (register this second query as a temp view under the name "*t1_except_t2_df*"; 
> it can also be used to find INTERSECT ALL below):
> 2) *INTERSECT ALL*:
> {code}
> SELECT a,b,c FROM tab1
>  INTERSECT ALL 
> SELECT a,b,c FROM tab2
> {code}
> can be rewritten as the following anti-join using the t1_except_t2_df we defined 
> above:
> {code}
> SELECT a,b,c
> FROM tab1 t1
> WHERE
>   NOT EXISTS (
>     SELECT 1
>     FROM t1_except_t2_df e
>     WHERE (t1.a, t1.b, t1.c) = (e.a, e.b, e.c)
>   )
> {code}
> So the suggestion is just to use the above query rewrites to implement both 
> EXCEPT ALL and INTERSECT ALL SQL set operations.






[jira] [Comment Edited] (SPARK-21274) Implement EXCEPT ALL and INTERSECT ALL

2018-05-06 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16465000#comment-16465000
 ] 

Liang-Chi Hsieh edited comment on SPARK-21274 at 5/6/18 7:30 AM:
-

I read the design doc. It looks correct to me. I found a rewrite rule in Presto 
for INTERSECT that seems simpler to me: 
[https://github.com/prestodb/presto/issues/4918#issuecomment-207106688].

That rule can be used to do INTERSECT ALL and EXCEPT ALL, if I'm not missing 
anything.

For example, to do INTERSECT ALL like:

{{SELECT a FROM foo INTERSECT ALL SELECT x FROM bar}}

we can rewrite it as:
{code:java}
SELECT a FROM (
  SELECT replicate_row(min_count, a) AS (min_count, a) FROM (
SELECT a, COUNT(foo_marker) AS foo_cnt, COUNT(bar_marker) AS bar_cnt, 
IF(COUNT(foo_marker) > COUNT(bar_marker), COUNT(bar_marker), COUNT(foo_marker)) 
AS min_count
FROM (
  SELECT a, true as foo_marker, null as bar_marker FROM foo
  UNION ALL
  SELECT x, null as foo_marker, true as bar_marker FROM bar
) T1
GROUP BY a) T2
  WHERE foo_cnt >= 1 AND bar_cnt >= 1
)
{code}
 

One advantage of that rewrite rule is that the INTERSECT ALL and EXCEPT ALL 
rewrites become more similar to each other.

Another is that INTERSECT ALL then needs only one GROUP BY (like EXCEPT ALL 
in the current design) instead of three.

WDYT? [~dkbiswal] [~maropu]
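
For reference, a minimal PySpark sketch of the same rewrite on toy data, assuming Spark 2.4+ and using {{explode(array_repeat(...))}} as a stand-in for Presto's {{replicate_row}}; this is only an illustration, not the proposed implementation.

{code:python}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("intersect-all-sketch").getOrCreate()

# foo.a = [1, 1, 2, 3], bar.x = [1, 2, 2] => INTERSECT ALL should give [1, 2]
foo = spark.createDataFrame([(1,), (1,), (2,), (3,)], ["a"])
bar = spark.createDataFrame([(1,), (2,), (2,)], ["x"])

# Tag each side with a marker column, then union the two inputs.
marked = foo.select(
    F.col("a"),
    F.lit(True).alias("foo_marker"),
    F.lit(None).cast("boolean").alias("bar_marker"),
).union(bar.select(
    F.col("x").alias("a"),
    F.lit(None).cast("boolean").alias("foo_marker"),
    F.lit(True).alias("bar_marker"),
))

# One GROUP BY computes per-key counts on both sides and the minimum of the two.
counted = (marked.groupBy("a")
    .agg(F.count("foo_marker").alias("foo_cnt"),
         F.count("bar_marker").alias("bar_cnt"))
    .withColumn("min_count", F.least("foo_cnt", "bar_cnt"))
    .where((F.col("foo_cnt") >= 1) & (F.col("bar_cnt") >= 1)))

# Replicate each key min_count times, like replicate_row would.
result = counted.selectExpr(
    "explode(array_repeat(a, cast(min_count AS int))) AS a")
result.show()
{code}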

 

 

 


was (Author: viirya):
I read the design doc. It looks correct to me. I found a rewrite rule in Presto 
for INTERSECT that seems more simple to me at 
[https://github.com/prestodb/presto/issues/4918#issuecomment-207106688.]

That rule can be used to do INTERSECT ALL and EXCEPT ALL, if I don't miss 
anything.

For example, to do INTERSECT ALL like:

{{SELECT a FROM foo INTERSECT ALL SELECT x FROM bar}}

{{We can rewrite it as:}}
{code:java}
SELECT a FROM (
  SELECT replicate_row(min_count, a) AS (min_count, a) FROM (
SELECT a, COUNT(foo_marker) AS foo_cnt, COUNT(bar_marker) AS bar_cnt, 
IF(COUNT(foo_marker) > COUNT(bar_marker), COUNT(bar_marker), COUNT(foo_marker)) 
AS min_count
FROM (
  SELECT a, true as foo_marker, null as bar_marker FROM foo
  UNION ALL
  SELECT x, null as foo_marker, true as bar_marker FROM bar
) T1
GROUP BY a) T2
  WHERE foo_cnt >= 1 AND bar_cnt >= 1
)
{code}
 

One advantage of that rewrite rule is the rules of INTERSECT ALL and EXCEPT ALL 
are more similar to each other.

Another one is for INTERSECT ALL, it only needs one GROUP BY instead of three 
GROUP BY in current design.

WDYT? [~dkbiswal] [~maropu]

 

 

 

> Implement EXCEPT ALL and INTERSECT ALL
> --
>
> Key: SPARK-21274
> URL: https://issues.apache.org/jira/browse/SPARK-21274
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.0.0, 2.1.0, 2.2.0
>Reporter: Ruslan Dautkhanov
>Priority: Major
>
> 1) *EXCEPT ALL* / MINUS ALL :
> {code}
> SELECT a,b,c FROM tab1
>  EXCEPT ALL 
> SELECT a,b,c FROM tab2
> {code}
> can be rewritten as the following outer join:
> {code}
> SELECT a,b,c
> FROM tab1 t1
>   LEFT OUTER JOIN
>     tab2 t2
>   ON (
>     (t1.a, t1.b, t1.c) = (t2.a, t2.b, t2.c)
>   )
> WHERE
>   COALESCE(t2.a, t2.b, t2.c) IS NULL
> {code}
> (register this second query as a temp view under the name "*t1_except_t2_df*"; 
> it can also be used to find INTERSECT ALL below):
> 2) *INTERSECT ALL*:
> {code}
> SELECT a,b,c FROM tab1
>  INTERSECT ALL 
> SELECT a,b,c FROM tab2
> {code}
> can be rewritten as the following anti-join using the t1_except_t2_df we defined 
> above:
> {code}
> SELECT a,b,c
> FROM tab1 t1
> WHERE
>   NOT EXISTS (
>     SELECT 1
>     FROM t1_except_t2_df e
>     WHERE (t1.a, t1.b, t1.c) = (e.a, e.b, e.c)
>   )
> {code}
> So the suggestion is just to use the above query rewrites to implement both 
> EXCEPT ALL and INTERSECT ALL SQL set operations.






[jira] [Comment Edited] (SPARK-21274) Implement EXCEPT ALL and INTERSECT ALL

2018-05-06 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16465000#comment-16465000
 ] 

Liang-Chi Hsieh edited comment on SPARK-21274 at 5/6/18 7:29 AM:
-

I read the design doc. It looks correct to me. I found a rewrite rule in Presto 
for INTERSECT that seems simpler to me: 
[https://github.com/prestodb/presto/issues/4918#issuecomment-207106688].

That rule can be used to do INTERSECT ALL and EXCEPT ALL, if I'm not missing 
anything.

For example, to do INTERSECT ALL like:

{{SELECT a FROM foo INTERSECT ALL SELECT x FROM bar}}

we can rewrite it as:
{code:java}
SELECT a FROM (
  SELECT replicate_row(min_count, a) AS (min_count, a) FROM (
SELECT a, COUNT(foo_marker) AS foo_cnt, COUNT(bar_marker) AS bar_cnt, 
IF(COUNT(foo_marker) > COUNT(bar_marker), COUNT(bar_marker), COUNT(foo_marker)) 
AS min_count
FROM (
  SELECT a, true as foo_marker, null as bar_marker FROM foo
  UNION ALL
  SELECT x, null as foo_marker, true as bar_marker FROM bar
) T1
GROUP BY a) T2
  WHERE foo_cnt >= 1 AND bar_cnt >= 1
)
{code}
 

One advantage of that rewrite rule is that the INTERSECT ALL and EXCEPT ALL 
rewrites become more similar to each other.

Another is that INTERSECT ALL then needs only one GROUP BY instead of three in 
the current design.

WDYT? [~dkbiswal] [~maropu]

 

 

 


was (Author: viirya):
I read the design doc. It looks correct to me. I found a rewrite rule in Presto 
for INTERSECT that seems more simple to me at 
[https://github.com/prestodb/presto/issues/4918#issuecomment-207106688.]

That rule can be used to do INTERSECT ALL and EXCEPT ALL, if I don't miss 
anything.

For example, to do INTERSECT ALL like:

{{SELECT a FROM foo INTERSECT ALL SELECT x FROM bar}}

{{We can rewrite it as:}}
{code:java}
SELECT a FROM (
  SELECT replicate_row(min_count, a) AS (min_count, a) FROM (
SELECT a, COUNT(foo_marker) AS foo_cnt, COUNT(bar_marker) AS bar_cnt, 
IF(COUNT(foo_marker) > COUNT(bar_marker), COUNT(bar_marker), COUNT(foo_marker)) 
AS min_count
FROM (
  SELECT a, true as foo_marker, null as bar_marker FROM foo
  UNION ALL
  SELECT x, null as foo_marker, true as bar_marker FROM bar
) T1
GROUP BY a) T2
  WHERE foo_cnt >= 1 AND bar_cnt >= 1
)
{code}
 

One advantage of that rewrite rule is the rules of INTERSECT ALL and EXCEPT ALL 
are more similar to each other.

Another one is for INTERSECT ALL, it only needs one GROUP BY instead of three 
GROUP BY in current design.

WDYT?

 

 

 

> Implement EXCEPT ALL and INTERSECT ALL
> --
>
> Key: SPARK-21274
> URL: https://issues.apache.org/jira/browse/SPARK-21274
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.0.0, 2.1.0, 2.2.0
>Reporter: Ruslan Dautkhanov
>Priority: Major
>
> 1) *EXCEPT ALL* / MINUS ALL :
> {code}
> SELECT a,b,c FROM tab1
>  EXCEPT ALL 
> SELECT a,b,c FROM tab2
> {code}
> can be rewritten as the following outer join:
> {code}
> SELECT a,b,c
> FROM tab1 t1
>   LEFT OUTER JOIN
>     tab2 t2
>   ON (
>     (t1.a, t1.b, t1.c) = (t2.a, t2.b, t2.c)
>   )
> WHERE
>   COALESCE(t2.a, t2.b, t2.c) IS NULL
> {code}
> (register this second query as a temp view under the name "*t1_except_t2_df*"; 
> it can also be used to find INTERSECT ALL below):
> 2) *INTERSECT ALL*:
> {code}
> SELECT a,b,c FROM tab1
>  INTERSECT ALL 
> SELECT a,b,c FROM tab2
> {code}
> can be rewritten as the following anti-join using the t1_except_t2_df we defined 
> above:
> {code}
> SELECT a,b,c
> FROM tab1 t1
> WHERE
>   NOT EXISTS (
>     SELECT 1
>     FROM t1_except_t2_df e
>     WHERE (t1.a, t1.b, t1.c) = (e.a, e.b, e.c)
>   )
> {code}
> So the suggestion is just to use the above query rewrites to implement both 
> EXCEPT ALL and INTERSECT ALL SQL set operations.






[jira] [Commented] (SPARK-21274) Implement EXCEPT ALL and INTERSECT ALL

2018-05-06 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16465000#comment-16465000
 ] 

Liang-Chi Hsieh commented on SPARK-21274:
-

I read the design doc. It looks correct to me. I found a rewrite rule in Presto 
for INTERSECT that seems simpler to me: 
[https://github.com/prestodb/presto/issues/4918#issuecomment-207106688].

That rule can be used to do INTERSECT ALL and EXCEPT ALL, if I'm not missing 
anything.

For example, to do INTERSECT ALL like:

{{SELECT a FROM foo INTERSECT ALL SELECT x FROM bar}}

we can rewrite it as:
{code:java}
SELECT a FROM (
  SELECT replicate_row(min_count, a) AS (min_count, a) FROM (
SELECT a, COUNT(foo_marker) AS foo_cnt, COUNT(bar_marker) AS bar_cnt, 
IF(COUNT(foo_marker) > COUNT(bar_marker), COUNT(bar_marker), COUNT(foo_marker)) 
AS min_count
FROM (
  SELECT a, true as foo_marker, null as bar_marker FROM foo
  UNION ALL
  SELECT x, null as foo_marker, true as bar_marker FROM bar
) T1
GROUP BY a) T2
  WHERE foo_cnt >= 1 AND bar_cnt >= 1
)
{code}
 

One advantage of that rewrite rule is that the INTERSECT ALL and EXCEPT ALL 
rewrites become more similar to each other.

Another is that INTERSECT ALL then needs only one GROUP BY instead of three in 
the current design.

WDYT?

 

 

 

> Implement EXCEPT ALL and INTERSECT ALL
> --
>
> Key: SPARK-21274
> URL: https://issues.apache.org/jira/browse/SPARK-21274
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.0.0, 2.1.0, 2.2.0
>Reporter: Ruslan Dautkhanov
>Priority: Major
>
> 1) *EXCEPT ALL* / MINUS ALL :
> {code}
> SELECT a,b,c FROM tab1
>  EXCEPT ALL 
> SELECT a,b,c FROM tab2
> {code}
> can be rewritten as the following outer join:
> {code}
> SELECT a,b,c
> FROM tab1 t1
>   LEFT OUTER JOIN
>     tab2 t2
>   ON (
>     (t1.a, t1.b, t1.c) = (t2.a, t2.b, t2.c)
>   )
> WHERE
>   COALESCE(t2.a, t2.b, t2.c) IS NULL
> {code}
> (register this second query as a temp view under the name "*t1_except_t2_df*"; 
> it can also be used to find INTERSECT ALL below):
> 2) *INTERSECT ALL*:
> {code}
> SELECT a,b,c FROM tab1
>  INTERSECT ALL 
> SELECT a,b,c FROM tab2
> {code}
> can be rewritten as the following anti-join using the t1_except_t2_df we defined 
> above:
> {code}
> SELECT a,b,c
> FROM tab1 t1
> WHERE
>   NOT EXISTS (
>     SELECT 1
>     FROM t1_except_t2_df e
>     WHERE (t1.a, t1.b, t1.c) = (e.a, e.b, e.c)
>   )
> {code}
> So the suggestion is just to use the above query rewrites to implement both 
> EXCEPT ALL and INTERSECT ALL SQL set operations.






[jira] [Commented] (SPARK-24152) SparkR CRAN feasibility check server problem

2018-05-03 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16462280#comment-16462280
 ] 

Liang-Chi Hsieh commented on SPARK-24152:
-

This can be resolved now, as I saw the Jenkins tests passed.

> SparkR CRAN feasibility check server problem
> 
>
> Key: SPARK-24152
> URL: https://issues.apache.org/jira/browse/SPARK-24152
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 2.4.0
>Reporter: Dongjoon Hyun
>Assignee: Liang-Chi Hsieh
>Priority: Critical
>
> PR builder and master branch tests fail with the following SparkR error for an 
> unknown reason. The following is the error message:
> {code}
> * this is package 'SparkR' version '2.4.0'
> * checking CRAN incoming feasibility ...Error in 
> .check_package_CRAN_incoming(pkgdir) : 
>   dims [product 24] do not match the length of object [0]
> Execution halted
> {code}
> *PR BUILDER*
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90039/
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89983/
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89998/
> *MASTER BRANCH*
> - 
> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/4458/
>  (Fail with no failures)
> This is critical because we have already started to merge PRs by ignoring this 
> **known unknown** SparkR failure.
> - https://github.com/apache/spark/pull/21175






[jira] [Commented] (SPARK-24152) SparkR CRAN feasibility check server problem

2018-05-03 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16462030#comment-16462030
 ] 

Liang-Chi Hsieh commented on SPARK-24152:
-

I think it is fixed now. It works locally, but it's better to check the Jenkins test 
results too.

> SparkR CRAN feasibility check server problem
> 
>
> Key: SPARK-24152
> URL: https://issues.apache.org/jira/browse/SPARK-24152
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 2.4.0
>Reporter: Dongjoon Hyun
>Assignee: Liang-Chi Hsieh
>Priority: Critical
>
> PR builder and master branch tests fail with the following SparkR error for an 
> unknown reason. The following is the error message:
> {code}
> * this is package 'SparkR' version '2.4.0'
> * checking CRAN incoming feasibility ...Error in 
> .check_package_CRAN_incoming(pkgdir) : 
>   dims [product 24] do not match the length of object [0]
> Execution halted
> {code}
> *PR BUILDER*
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90039/
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89983/
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89998/
> *MASTER BRANCH*
> - 
> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/4458/
>  (Fail with no failures)
> This is critical because we have already started to merge PRs by ignoring this 
> **known unknown** SparkR failure.
> - https://github.com/apache/spark/pull/21175






[jira] [Commented] (SPARK-24152) Flaky Test: SparkR

2018-05-02 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461962#comment-16461962
 ] 

Liang-Chi Hsieh commented on SPARK-24152:
-

The CRAN sysadmin replied to me that it should be fixed now. I can't access my
laptop, so I can't confirm it. Maybe someone can confirm it by checking whether
the Jenkins R tests pass now.

Thanks.




> Flaky Test: SparkR
> --
>
> Key: SPARK-24152
> URL: https://issues.apache.org/jira/browse/SPARK-24152
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 2.4.0
>Reporter: Dongjoon Hyun
>Priority: Critical
>
> PR builder and master branch tests fail with the following SparkR error for an 
> unknown reason. The following is the error message:
> {code}
> * this is package 'SparkR' version '2.4.0'
> * checking CRAN incoming feasibility ...Error in 
> .check_package_CRAN_incoming(pkgdir) : 
>   dims [product 24] do not match the length of object [0]
> Execution halted
> {code}
> *PR BUILDER*
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90039/
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89983/
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89998/
> *MASTER BRANCH*
> - 
> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/4458/
>  (Fail with no failures)
> This is critical because we have already started to merge PRs by ignoring this 
> **known unknown** SparkR failure.
> - https://github.com/apache/spark/pull/21175






[jira] [Commented] (SPARK-24152) Flaky Test: SparkR

2018-05-02 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461867#comment-16461867
 ] 

Liang-Chi Hsieh commented on SPARK-24152:
-

Thanks [~hyukjin.kwon] for pinging me. I found a problem in the CRAN PACKAGES.in 
file. It seems to cause the R test failure again. I have already emailed the CRAN 
sysadmin for help.

 

 

> Flaky Test: SparkR
> --
>
> Key: SPARK-24152
> URL: https://issues.apache.org/jira/browse/SPARK-24152
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Affects Versions: 2.4.0
>Reporter: Dongjoon Hyun
>Priority: Critical
>
> PR builder and master branch tests fail with the following SparkR error for an 
> unknown reason. The following is the error message:
> {code}
> * this is package 'SparkR' version '2.4.0'
> * checking CRAN incoming feasibility ...Error in 
> .check_package_CRAN_incoming(pkgdir) : 
>   dims [product 24] do not match the length of object [0]
> Execution halted
> {code}
> *PR BUILDER*
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90039/
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89983/
> - https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89998/
> *MASTER BRANCH*
> - 
> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/4458/
>  (Fail with no failures)
> This is critical because we have already started to merge PRs by ignoring this 
> **known unknown** SparkR failure.
> - https://github.com/apache/spark/pull/21175






[jira] [Created] (SPARK-24131) Add majorMinorVersion API to PySpark for determining Spark versions

2018-04-30 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-24131:
---

 Summary: Add majorMinorVersion API to PySpark for determining 
Spark versions
 Key: SPARK-24131
 URL: https://issues.apache.org/jira/browse/SPARK-24131
 Project: Spark
  Issue Type: Improvement
  Components: PySpark
Affects Versions: 2.4.0
Reporter: Liang-Chi Hsieh


We need to determine Spark major and minor versions in PySpark. We can add a 
{{majorMinorVersion}} API to PySpark, similar to {{VersionUtils.majorMinorVersion}} 
in Scala.
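
A minimal sketch of what such a helper could look like; the function name, location, and error handling are assumptions here, and the real API may differ.

{code:python}
import re


def majorMinorVersion(spark_version):
    """Parse a Spark version string like '2.4.0' into (major, minor).

    Only a sketch of the proposed helper, mirroring the behaviour of Scala's
    VersionUtils.majorMinorVersion; the actual PySpark API may differ.
    """
    m = re.match(r"^(\d+)\.(\d+)(\..*)?$", spark_version)
    if m is None:
        raise ValueError("Invalid Spark version string: %s" % spark_version)
    return int(m.group(1)), int(m.group(2))


print(majorMinorVersion("2.4.0"))           # (2, 4)
print(majorMinorVersion("2.3.1-SNAPSHOT"))  # (2, 3)
{code}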






[jira] [Created] (SPARK-24121) The API for handling expression code generation in expression codegen

2018-04-30 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-24121:
---

 Summary: The API for handling expression code generation in 
expression codegen
 Key: SPARK-24121
 URL: https://issues.apache.org/jira/browse/SPARK-24121
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Liang-Chi Hsieh


In order to achieve the replacement of expr values during expression codegen 
(please see the proposal at 
[https://github.com/apache/spark/pull/19813#issuecomment-354045400]), we need 
an API to handle the insertion of temporary symbols for statements generated by 
expressions. This API must allow us to know which statements come from expressions 
during codegen, and to use symbols instead of the actual code when generating Java 
code.






[jira] [Commented] (SPARK-24058) Default Params in ML should be saved separately: Python API

2018-04-23 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16449313#comment-16449313
 ] 

Liang-Chi Hsieh commented on SPARK-24058:
-

OK. I will work on this. Thanks.

> Default Params in ML should be saved separately: Python API
> ---
>
> Key: SPARK-24058
> URL: https://issues.apache.org/jira/browse/SPARK-24058
> Project: Spark
>  Issue Type: Improvement
>  Components: ML, PySpark
>Affects Versions: 2.4.0
>Reporter: Joseph K. Bradley
>Priority: Major
>
> See [SPARK-23455] for reference.  Since DefaultParamsReader has been changed 
> in Scala, we must change it for Python for Spark 2.4.0 as well in order to 
> keep the two in sync.






[jira] [Commented] (SPARK-23711) Add fallback to interpreted execution logic

2018-04-19 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443959#comment-16443959
 ] 

Liang-Chi Hsieh commented on SPARK-23711:
-

Ok.

> Add fallback to interpreted execution logic
> ---
>
> Key: SPARK-23711
> URL: https://issues.apache.org/jira/browse/SPARK-23711
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Herman van Hovell
>Priority: Major
>







[jira] [Commented] (SPARK-23711) Add fallback to interpreted execution logic

2018-04-19 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443924#comment-16443924
 ] 

Liang-Chi Hsieh commented on SPARK-23711:
-

Yeah, I agree that is a good rule. I will prepare a PR for this soon.

> Add fallback to interpreted execution logic
> ---
>
> Key: SPARK-23711
> URL: https://issues.apache.org/jira/browse/SPARK-23711
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Herman van Hovell
>Priority: Major
>







[jira] [Commented] (SPARK-23711) Add fallback to interpreted execution logic

2018-04-19 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443879#comment-16443879
 ] 

Liang-Chi Hsieh commented on SPARK-23711:
-

Object hash aggregate? If you mean {{ObjectHashAggregateExec}}, it seems it 
doesn't support codegen and only runs in interpreted mode?

For encoders, I think it is because they directly use something like 
{{GenerateUnsafeProjection}} and don't fall back to an interpreted version for 
now. Other places using this kind of codegen code path are the same.

I didn't check window functions yet. Thanks for pointing them out.

> Add fallback to interpreted execution logic
> ---
>
> Key: SPARK-23711
> URL: https://issues.apache.org/jira/browse/SPARK-23711
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Herman van Hovell
>Priority: Major
>







[jira] [Commented] (SPARK-23711) Add fallback to interpreted execution logic

2018-04-18 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443573#comment-16443573
 ] 

Liang-Chi Hsieh commented on SPARK-23711:
-

About this, I suppose that in {{WholeStageCodegenExec}} we already have logic 
to fall back to interpreted mode? Are there other places where we need to add this 
kind of fallback?

> Add fallback to interpreted execution logic
> ---
>
> Key: SPARK-23711
> URL: https://issues.apache.org/jira/browse/SPARK-23711
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Herman van Hovell
>Priority: Major
>







[jira] [Created] (SPARK-24014) Add onStreamingStarted method to StreamingListener

2018-04-18 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-24014:
---

 Summary: Add onStreamingStarted method to StreamingListener
 Key: SPARK-24014
 URL: https://issues.apache.org/jira/browse/SPARK-24014
 Project: Spark
  Issue Type: Improvement
  Components: PySpark
Affects Versions: 2.4.0
Reporter: Liang-Chi Hsieh


The {{StreamingListener}} on the PySpark side seems to lack an 
{{onStreamingStarted}} method.
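
A minimal sketch of how the missing callback could look and be used on the Python side, assuming it mirrors the Scala {{StreamingListener.onStreamingStarted}} event; the method signature is an assumption for illustration.

{code:python}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.listener import StreamingListener


class StartupLogger(StreamingListener):
    def onStreamingStarted(self, streamingStarted):
        # Would be called once when the streaming computation starts.
        print("Streaming started")


sc = SparkContext("local[2]", "listener-demo")
ssc = StreamingContext(sc, batchDuration=1)
ssc.addStreamingListener(StartupLogger())
# ... define DStreams here, then ssc.start() / ssc.awaitTermination()
{code}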






[jira] [Commented] (SPARK-23904) Big execution plan cause OOM

2018-04-13 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438189#comment-16438189
 ] 

Liang-Chi Hsieh commented on SPARK-23904:
-

If you don't need the UI, can you try to set {{spark.ui.enabled}} to false?
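
For example, a minimal way to try that when building the session (the same setting can also go into spark-defaults.conf or a --conf flag):

{code:python}
from pyspark.sql import SparkSession

# Disable the web UI so Spark skips the UI-side bookkeeping for each query.
spark = (SparkSession.builder
         .appName("no-ui")
         .config("spark.ui.enabled", "false")
         .getOrCreate())
{code}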

 

> Big execution plan cause OOM
> 
>
> Key: SPARK-23904
> URL: https://issues.apache.org/jira/browse/SPARK-23904
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Izek Greenfield
>Priority: Major
>  Labels: SQL, query
>
> I create a question in 
> [StackOverflow|https://stackoverflow.com/questions/49508683/spark-physicalplandescription-string-is-to-big]
>  
> Spark creates the text representation of the query in any case, even if I don't 
> need it.
> That causes many garbage objects and unneeded GC... 
>  [Gist with code to 
> reproduce|https://gist.github.com/igreenfield/584c3336f03ba7d63e9026774eaf5e23]
>  






[jira] [Commented] (SPARK-23970) pyspark - simple filter/select doesn't use all tasks when coalesce is set

2018-04-13 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438132#comment-16438132
 ] 

Liang-Chi Hsieh commented on SPARK-23970:
-

I think the documentation of {{coalesce}} might answer this; let me quote it:

   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
   * this may result in your computation taking place on fewer nodes than
   * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
   * you can call repartition. This will add a shuffle step, but means the
   * current upstream partitions will be executed in parallel (per whatever
   * the current partitioning is).

When you do a {{coalesce}}, you make your computation take place on that fixed 
number of partitions.
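
A small sketch of the usual workaround, using a toy DataFrame in place of the reporter's query: {{repartition}} adds a shuffle, so the upstream filter keeps its natural parallelism and only the write uses 32 tasks.

{code:python}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("repartition-demo").getOrCreate()

# Toy stand-in for the large filtered query in the report.
df = spark.range(0, 1000000).where(F.col("id") % 100 == 0)

# coalesce(32) would also cap the parallelism of the filter above;
# repartition(32) shuffles instead, so the filter runs with full parallelism.
df.repartition(32).write.mode("overwrite").parquet("/tmp/repartition_demo")
{code}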

 

> pyspark - simple filter/select doesn't use all tasks when coalesce is set
> -
>
> Key: SPARK-23970
> URL: https://issues.apache.org/jira/browse/SPARK-23970
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0, 2.2.1
>Reporter: Matthew Anthony
>Priority: Major
>
> Running in (py)spark 2.2. 
> Marking this as PySpark, but have not confirmed whether this is Spark-wide; 
> I've observed it in pyspark which is my preferred API.
> {code:java}
> df = spark.sql(
> """
> select 
> from 
> where  """
> )
> df.coalesce(32).write.parquet(...){code}
> The above code will only attempt to use 32 tasks to read and process all of 
> the original input data. This compares to 
> {code:java}
> df = spark.sql(
> """
> select 
> from 
> where  """
> ).cache()
> df.count()
> df.coalesce(32).write.parquet(...){code}
> where this will use the full complement of tasks available to the cluster to 
> do the initial filter, with a subsequent shuffle to coalesce and write. The 
> latter execution path is way more efficient, particularly at large volumes 
> where filtering will remove most records and should be the default. Note that 
> in the real setting in which I am running this, I'm operating a 20 node 
> cluster with 16 cores and 56gb RAM per machine, and processing well over a TB 
> of raw data in . The scale of the task I am testing on generates 
> approximately 300,000 read tasks in the latter version of the code when not 
> constrained by the former's execution plan.
>  






[jira] [Created] (SPARK-23979) MultiAlias should not be a CodegenFallback

2018-04-13 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-23979:
---

 Summary: MultiAlias should not be a CodegenFallback
 Key: SPARK-23979
 URL: https://issues.apache.org/jira/browse/SPARK-23979
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Liang-Chi Hsieh


I just found that {{MultiAlias}} is a {{CodegenFallback}}. It should not be, as it 
looks like {{MultiAlias}} won't actually be evaluated.






[jira] [Commented] (SPARK-23928) High-order function: shuffle(x) → array

2018-04-13 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16437102#comment-16437102
 ] 

Liang-Chi Hsieh commented on SPARK-23928:
-

Hi [~hzlu], So will you take this one?

> High-order function: shuffle(x) → array
> ---
>
> Key: SPARK-23928
> URL: https://issues.apache.org/jira/browse/SPARK-23928
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiao Li
>Priority: Major
>
> Ref: https://prestodb.io/docs/current/functions/array.html
> Generate a random permutation of the given array x.
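
For reference, a sketch of the expected behaviour once implemented; this eventually landed as {{shuffle}} in {{pyspark.sql.functions}} in Spark 2.4, so treat the exact API shown here as an assumption.

{code:python}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("shuffle-demo").getOrCreate()

df = spark.createDataFrame([([1, 2, 3, 4, 5],)], ["x"])
df.select(F.shuffle(F.col("x")).alias("shuffled")).show(truncate=False)
# e.g. [[3, 1, 5, 2, 4]], a random permutation of x
{code}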






[jira] [Commented] (SPARK-23928) High-order function: shuffle(x) → array

2018-04-10 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433406#comment-16433406
 ] 

Liang-Chi Hsieh commented on SPARK-23928:
-

If there is no assignee and no one has announced they are working on it, it is no 
problem for you to take a JIRA.

> High-order function: shuffle(x) → array
> ---
>
> Key: SPARK-23928
> URL: https://issues.apache.org/jira/browse/SPARK-23928
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiao Li
>Priority: Major
>
> Ref: https://prestodb.io/docs/current/functions/array.html
> Generate a random permutation of the given array x.






[jira] [Updated] (SPARK-23875) Create IndexedSeq wrapper for ArrayData

2018-04-05 Thread Liang-Chi Hsieh (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh updated SPARK-23875:

Description: We don't have a good way to sequentially access 
{{UnsafeArrayData}} with a common interface such as Seq. An example is 
{{MapObject}} where we need to access several sequence collection types 
together. But {{UnsafeArrayData}} doesn't implement {{ArrayData.array}}. 
Calling {{toArray}} will copy the entire array. We can provide an 
{{IndexedSeq}} wrapper for {{ArrayData}}, so we can avoid copying the entire 
array.  (was: We don't have a good way to sequentially access 
{{UnsafeArrayData}} with a common interface such as Seq. An example is 
{{MapObject}} where we need to access several sequence collection types 
together. But {{UnsafeArrayData}} doesn't implement {{ArrayData.array}}. We can 
provide an {{IndexedSeq}} wrapper for {{ArrayData}}, so we can avoid copying 
the entire array.)

> Create IndexedSeq wrapper for ArrayData
> ---
>
> Key: SPARK-23875
> URL: https://issues.apache.org/jira/browse/SPARK-23875
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Liang-Chi Hsieh
>Priority: Major
>
> We don't have a good way to sequentially access {{UnsafeArrayData}} with a 
> common interface such as Seq. An example is {{MapObject}} where we need to 
> access several sequence collection types together. But {{UnsafeArrayData}} 
> doesn't implement {{ArrayData.array}}. Calling {{toArray}} will copy the 
> entire array. We can provide an {{IndexedSeq}} wrapper for {{ArrayData}}, so 
> we can avoid copying the entire array.






[jira] [Created] (SPARK-23875) Create IndexedSeq wrapper for ArrayData

2018-04-05 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-23875:
---

 Summary: Create IndexedSeq wrapper for ArrayData
 Key: SPARK-23875
 URL: https://issues.apache.org/jira/browse/SPARK-23875
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Liang-Chi Hsieh


We don't have a good way to sequentially access {{UnsafeArrayData}} with a 
common interface such as Seq. An example is {{MapObject}} where we need to 
access several sequence collection types together. But {{UnsafeArrayData}} 
doesn't implement {{ArrayData.array}}. We can provide an {{IndexedSeq}} wrapper 
for {{ArrayData}}, so we can avoid copying the entire array.






[jira] [Created] (SPARK-23873) Use accessors in interpreted LambdaVariable

2018-04-04 Thread Liang-Chi Hsieh (JIRA)
Liang-Chi Hsieh created SPARK-23873:
---

 Summary: Use accessors in interpreted LambdaVariable
 Key: SPARK-23873
 URL: https://issues.apache.org/jira/browse/SPARK-23873
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Liang-Chi Hsieh


Currently, interpreted execution of {{LambdaVariable}} just uses 
{{InternalRow.get}} to access elements. We should use the specific accessors if 
possible.






[jira] [Commented] (SPARK-23661) Implement treeAggregate on Dataset API

2018-03-31 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16421348#comment-16421348
 ] 

Liang-Chi Hsieh commented on SPARK-23661:
-

For the implementation of {{Dataset.treeAggregate}}, I'm wondering whether we need 
to support SQL tree aggregation for all cases. For example, {{RDD.treeAggregate}} 
can be seen as grouping without keys; this is the case where tree aggregation can 
help. For grouping by keys, I'm wondering whether it really performs much better 
than non-tree aggregation.

cc [~cloud_fan]
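
For context, a quick sketch of the existing RDD API for the "grouping without keys" case that benefits from tree aggregation:

{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("tree-agg-demo").getOrCreate()

# Aggregate without keys: partial sums are combined in a multi-level tree
# (depth=2) instead of all partitions reducing straight to the driver.
rdd = spark.sparkContext.parallelize(range(1, 1001), 100)
total = rdd.treeAggregate(0, lambda acc, x: acc + x, lambda a, b: a + b, depth=2)
print(total)  # 500500
{code}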

> Implement treeAggregate on Dataset API
> --
>
> Key: SPARK-23661
> URL: https://issues.apache.org/jira/browse/SPARK-23661
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Liang-Chi Hsieh
>Priority: Major
>
> Many algorithms in MLlib have still not migrated their internal computing 
> workload from {{RDD}} to {{DataFrame}}. {{treeAggregate}} is one of the obstacles 
> we need to address in order to complete the migration.
> This ticket is opened to provide {{treeAggregate}} on Dataset API. For now 
> this should be a private API used by ML component.






[jira] [Commented] (SPARK-23835) When Dataset.as converts column from nullable to non-nullable type, null Doubles are converted silently to -1

2018-03-31 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16421346#comment-16421346
 ] 

Liang-Chi Hsieh commented on SPARK-23835:
-

What would be the better behavior for it to have?

> When Dataset.as converts column from nullable to non-nullable type, null 
> Doubles are converted silently to -1
> -
>
> Key: SPARK-23835
> URL: https://issues.apache.org/jira/browse/SPARK-23835
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Joseph K. Bradley
>Priority: Major
>
> I constructed a DataFrame with a nullable java.lang.Double column (and an 
> extra Double column).  I then converted it to a Dataset using ```as[(Double, 
> Double)]```.  When the Dataset is shown, it has a null.  When it is collected 
> and printed, the null is silently converted to a -1.
> Code snippet to reproduce this:
> {code}
> val localSpark = spark
> import localSpark.implicits._
> val df = Seq[(java.lang.Double, Double)](
>   (1.0, 2.0),
>   (3.0, 4.0),
>   (Double.NaN, 5.0),
>   (null, 6.0)
> ).toDF("a", "b")
> df.show()  // OUTPUT 1: has null
> df.printSchema()
> val data = df.as[(Double, Double)]
> data.show()  // OUTPUT 2: has null
> data.collect().foreach(println)  // OUTPUT 3: has -1
> {code}
> OUTPUT 1 and 2:
> {code}
> +----+---+
> |   a|  b|
> +----+---+
> | 1.0|2.0|
> | 3.0|4.0|
> | NaN|5.0|
> |null|6.0|
> +----+---+
> {code}
> OUTPUT 3:
> {code}
> (1.0,2.0)
> (3.0,4.0)
> (NaN,5.0)
> (-1.0,6.0)
> {code}





