[jira] [Created] (SPARK-31872) NotNullSafe to get complementary set

2020-05-30 Thread Xiaoju Wu (Jira)
Xiaoju Wu created SPARK-31872:
-

 Summary: NotNullSafe to get complementary set
 Key: SPARK-31872
 URL: https://issues.apache.org/jira/browse/SPARK-31872
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0, 2.3.0, 3.0.0
Reporter: Xiaoju Wu


If we have a filter expression that selects a subset of rows and we then want 
the complementary set, Not(expression) cannot do it: Not is NullIntolerant, so 
when expression.eval(row) is null the filter predicate is false for 
Not(expression) as well, and "row" appears in neither the subset nor the 
complementary set.

So we may need a NotNullSafe implementation for computing the complementary 
set, one that evaluates to true when expression.eval(row) is null.
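
As a stop-gap at the DataFrame level (a minimal sketch, not the proposed 
NotNullSafe expression), the complement can already be emulated by treating a 
null predicate result as false before negating; the column name c and the data 
are assumptions for illustration:

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object NullSafeNotSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("null-safe-not").getOrCreate()
  import spark.implicits._

  val df = Seq(Some(1), Some(5), None).toDF("c")
  val p  = col("c") > 3                      // evaluates to null when c is null

  df.filter(p).show()                        // subset: keeps only c = 5
  df.filter(!p).show()                       // plain Not: keeps only c = 1, the null row is in neither set
  df.filter(!coalesce(p, lit(false))).show() // null-safe complement: keeps c = 1 and the null row

  spark.stop()
}
{code}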






[jira] [Comment Edited] (SPARK-30443) "Managed memory leak detected" even with no calls to take() or limit()

2020-03-26 Thread Xiaoju Wu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17068290#comment-17068290
 ] 

Xiaoju Wu edited comment on SPARK-30443 at 3/27/20, 5:50 AM:
-

I also see this kind of warning log. SPARK-21492 may be related to this 
warning; does your code base contain that fix?
And I'm afraid there could be other consumers that do not release memory by 
themselves but instead let the task release all memory related to the taskId 
at the end of the task.


was (Author: xiaojuwu):
I also see this kind of warning log. SPARK-21492 may be related to this 
warning; does your code base contain that fix?
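
For what it's worth, a toy model of that failure mode (invented names, not 
Spark's actual TaskMemoryManager/MemoryConsumer API): a consumer that relies 
on end-of-task cleanup instead of freeing its own memory leaves a non-zero 
balance behind, which is exactly what the "Managed memory leak detected" 
message reports.

{code:scala}
object ManagedMemoryLeakSketch extends App {
  // Toy per-task bookkeeping: tracks how much execution memory each consumer holds.
  final class ToyTaskMemoryManager(taskId: Long) {
    private var acquired = Map.empty[String, Long]

    def acquire(consumer: String, bytes: Long): Unit =
      acquired = acquired.updated(consumer, acquired.getOrElse(consumer, 0L) + bytes)

    def release(consumer: String, bytes: Long): Unit =
      acquired = acquired.updated(consumer, acquired.getOrElse(consumer, 0L) - bytes)

    // Called at the end of the task: anything a consumer did not free itself is
    // reclaimed here and reported, mirroring the warning quoted in this ticket.
    def cleanUpAtTaskEnd(): Unit = {
      val leaked = acquired.values.sum
      if (leaked > 0) {
        println(s"Managed memory leak detected; size = $leaked bytes, TID = $taskId")
      }
    }
  }

  val tmm = new ToyTaskMemoryManager(taskId = 118)
  tmm.acquire("sorter", 2097152L) // a consumer that never frees its 2 MB itself
  tmm.cleanUpAtTaskEnd()          // prints the leak warning at task end
}
{code}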

> "Managed memory leak detected" even with no calls to take() or limit()
> --
>
> Key: SPARK-30443
> URL: https://issues.apache.org/jira/browse/SPARK-30443
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.2, 2.4.4, 3.0.0
>Reporter: Luke Richter
>Priority: Major
> Attachments: a.csv.zip, b.csv.zip, c.csv.zip
>
>
> Our Spark code is causing a "Managed memory leak detected" warning to appear, 
> even though we are not calling take() or limit().
> According to SPARK-14168 https://issues.apache.org/jira/browse/SPARK-14168 
> managed memory leaks should only be caused by not reading an iterator to 
> completion, i.e. take() or limit()
> Our exact warning text is: "2020-01-06 14:54:59 WARN Executor:66 - Managed 
> memory leak detected; size = 2097152 bytes, TID = 118"
>  The size of the managed memory leak is always 2MB.
> I have created a minimal test program that reproduces the warning: 
> {code:python}
> import pyspark.sql
> import pyspark.sql.functions as fx
> def main():
>     builder = pyspark.sql.SparkSession.builder
>     builder = builder.appName("spark-jira")
>     spark = builder.getOrCreate()
>     reader = spark.read
>     reader = reader.format("csv")
>     reader = reader.option("inferSchema", "true")
>     reader = reader.option("header", "true")
>     table_c = reader.load("c.csv")
>     table_a = reader.load("a.csv")
>     table_b = reader.load("b.csv")
>     primary_filter = fx.col("some_code").isNull()
>     new_primary_data = table_a.filter(primary_filter)
>     new_ids = new_primary_data.select("some_id")
>     new_data = table_b.join(new_ids, "some_id")
>     new_data = new_data.select("some_id")
>     result = table_c.join(new_data, "some_id", "left")
>     result.repartition(1).write.json("results.json", mode="overwrite")
>     spark.stop()
> if __name__ == "__main__":
>     main()
> {code}
> Our code isn't anything out of the ordinary, just some filters, selects and 
> joins.
> The input data is made up of 3 CSV files. The input data files are quite 
> large, roughly 2.6GB in total uncompressed. I attempted to reduce the number 
> of rows in the CSV input files but this caused the warning to no longer 
> appear. After compressing the files I was able to attach them below.






[jira] [Commented] (SPARK-30443) "Managed memory leak detected" even with no calls to take() or limit()

2020-03-26 Thread Xiaoju Wu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17068290#comment-17068290
 ] 

Xiaoju Wu commented on SPARK-30443:
---

I also see this kind of warning log. SPARK-21492 may be related to this 
warning; does your code base contain that fix?

> "Managed memory leak detected" even with no calls to take() or limit()
> --
>
> Key: SPARK-30443
> URL: https://issues.apache.org/jira/browse/SPARK-30443
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.2, 2.4.4, 3.0.0
>Reporter: Luke Richter
>Priority: Major
> Attachments: a.csv.zip, b.csv.zip, c.csv.zip
>
>
> Our Spark code is causing a "Managed memory leak detected" warning to appear, 
> even though we are not calling take() or limit().
> According to SPARK-14168 https://issues.apache.org/jira/browse/SPARK-14168 
> managed memory leaks should only be caused by not reading an iterator to 
> completion, i.e. take() or limit()
> Our exact warning text is: "2020-01-06 14:54:59 WARN Executor:66 - Managed 
> memory leak detected; size = 2097152 bytes, TID = 118"
>  The size of the managed memory leak is always 2MB.
> I have created a minimal test program that reproduces the warning: 
> {code:python}
> import pyspark.sql
> import pyspark.sql.functions as fx
> def main():
>     builder = pyspark.sql.SparkSession.builder
>     builder = builder.appName("spark-jira")
>     spark = builder.getOrCreate()
>     reader = spark.read
>     reader = reader.format("csv")
>     reader = reader.option("inferSchema", "true")
>     reader = reader.option("header", "true")
>     table_c = reader.load("c.csv")
>     table_a = reader.load("a.csv")
>     table_b = reader.load("b.csv")
>     primary_filter = fx.col("some_code").isNull()
>     new_primary_data = table_a.filter(primary_filter)
>     new_ids = new_primary_data.select("some_id")
>     new_data = table_b.join(new_ids, "some_id")
>     new_data = new_data.select("some_id")
>     result = table_c.join(new_data, "some_id", "left")
>     result.repartition(1).write.json("results.json", mode="overwrite")
>     spark.stop()
> if __name__ == "__main__":
>     main()
> {code}
> Our code isn't anything out of the ordinary, just some filters, selects and 
> joins.
> The input data is made up of 3 CSV files. The input data files are quite 
> large, roughly 2.6GB in total uncompressed. I attempted to reduce the number 
> of rows in the CSV input files but this caused the warning to no longer 
> appear. After compressing the files I was able to attach them below.






[jira] [Updated] (SPARK-31069) high cpu caused by chunksBeingTransferred in external shuffle service

2020-03-07 Thread Xiaoju Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaoju Wu updated SPARK-31069:
--
Description: 
"shuffle-chunk-fetch-handler-2-40" #250 daemon prio=5 os_prio=0 
tid=0x02ac nid=0xb9b3 runnable [0x7ff20a1af000]
   java.lang.Thread.State: RUNNABLE
at 
java.util.concurrent.ConcurrentHashMap$Traverser.advance(ConcurrentHashMap.java:3339)
at 
java.util.concurrent.ConcurrentHashMap$ValueIterator.next(ConcurrentHashMap.java:3439)
at 
org.apache.spark.network.server.OneForOneStreamManager.chunksBeingTransferred(OneForOneStreamManager.java:184)
at 
org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:85)
at 
org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:51)
at 
org.spark_project.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at 
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:38)
at 
org.spark_project.io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:353)
at 
org.spark_project.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at 
org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at 
org.spark_project.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at 
org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at 
org.spark_project.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
 
 
 
"shuffle-chunk-fetch-handler-2-48" #235 daemon prio=5 os_prio=0 
tid=0x7ff2302ec800 nid=0xb9ad runnable [0x7ff20a7b4000]
   java.lang.Thread.State: RUNNABLE
at 
org.apache.spark.network.server.OneForOneStreamManager.chunksBeingTransferred(OneForOneStreamManager.java:186)
at 
org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:85)
at 
org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:51)
at 
org.spark_project.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at 
org.spark_project.io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:38)
at 
org.spark_project.io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:353)
at 
org.spark_project.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at 
org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at 
org.spark_project.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at 
org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at 
org.spark_project.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
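
For context, a rough sketch of the pattern these stack traces point at (toy 
code with invented names, not the actual OneForOneStreamManager source): the 
total number of chunks being transferred is recomputed by iterating every 
registered stream on every chunk-fetch request, which becomes a hot loop on a 
busy external shuffle service.

{code:scala}
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

object ChunksBeingTransferredSketch extends App {
  // One entry per registered stream; the counter is bumped as chunks are sent.
  final class StreamState {
    @volatile var chunksBeingTransferred: Long = 0L
  }

  final class ToyStreamManager {
    private val streams = new ConcurrentHashMap[Long, StreamState]()

    def register(streamId: Long): Unit = streams.put(streamId, new StreamState)

    // O(number of registered streams) on every call; the shuffle service calls
    // this once per incoming chunk-fetch request to enforce a transfer limit.
    def chunksBeingTransferred: Long =
      streams.values().asScala.iterator.map(_.chunksBeingTransferred).sum
  }

  val manager = new ToyStreamManager
  (1L to 100000L).foreach(manager.register)
  // Every fetch request pays this full scan, even when no limit is configured.
  println(manager.chunksBeingTransferred)
}
{code}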

> high cpu caused by chunksBeingTransferred in external shuffle service
> -
>
> Key: SPARK-31069
> URL: https://issues.apache.org/jira/browse/SPARK-31069
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 3.0.0
>Reporter: Xiaoju Wu
>Priority: Major
>
> "shuffle-chunk-fetch-handler-2-40" #250 daemon prio=5 os_prio=0 
> tid=0x02ac nid=0xb9b3 runnable [0x7ff20a1af000]
>java.lang.Thread.State: RUNNABLE
> at 
> java.util.concurrent.ConcurrentHashMap$Traverser.advance(ConcurrentHashMap.java:3339)
> at 
> java.util.concurrent.ConcurrentHashMap$ValueIterator.next(ConcurrentHashMap.java:3439)
> at 
> org.apache.spark.network.server.OneForOneStreamManager.chunksBeingTransferred(OneForOneStreamManager.java:184)
> at 
> org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:85)
> at 
> org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:51)
> at 
> 

[jira] [Created] (SPARK-31069) high cpu caused by chunksBeingTransferred in external shuffle service

2020-03-06 Thread Xiaoju Wu (Jira)
Xiaoju Wu created SPARK-31069:
-

 Summary: high cpu caused by chunksBeingTransferred in external 
shuffle service
 Key: SPARK-31069
 URL: https://issues.apache.org/jira/browse/SPARK-31069
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle
Affects Versions: 3.0.0
Reporter: Xiaoju Wu









[jira] [Commented] (SPARK-23811) FetchFailed comes before Success of same task will cause child stage never succeed

2020-01-01 Thread Xiaoju Wu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-23811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006539#comment-17006539
 ] 

Xiaoju Wu commented on SPARK-23811:
---

[~XuanYuan] The issue still seems to exist after patch #17955; any update?

> FetchFailed comes before Success of same task will cause child stage never 
> succeed
> --
>
> Key: SPARK-23811
> URL: https://issues.apache.org/jira/browse/SPARK-23811
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Yuanjian Li
>Priority: Major
> Attachments: 1.png, 2.png
>
>
> This is a bug caused by the abnormal scenario described below:
>  # ShuffleMapTask 1.0 is running; this task will fetch data from ExecutorA.
>  # ExecutorA is lost, triggering `mapOutputTracker.removeOutputsOnExecutor(execId)`, 
> and the shuffleStatus is changed.
>  # Speculative ShuffleMapTask 1.1 starts and gets a FetchFailed immediately.
>  # ShuffleMapTask 1.0 finally succeeds, but because of 1.1's FetchFailed, the 
> stage is still marked as a failed stage.
>  # ShuffleMapTask 1 is the last task of its stage, so this stage will never 
> succeed because there is no missing task that the DAGScheduler can find.






[jira] [Created] (SPARK-30298) bucket join cannot work for self-join with views

2019-12-18 Thread Xiaoju Wu (Jira)
Xiaoju Wu created SPARK-30298:
-

 Summary: bucket join cannot work for self-join with views
 Key: SPARK-30298
 URL: https://issues.apache.org/jira/browse/SPARK-30298
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.0
Reporter: Xiaoju Wu


This UT may fail at the last line:
{code:scala}
test("bucket join cannot work for self-join with views") {
  withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
    withTable("t1") {
      val df = (0 until 20).map(i => (i, i)).toDF("i", "j").as("df")
      df.write
        .format("parquet")
        .bucketBy(8, "i")
        .saveAsTable("t1")

      sql(s"create view v1 as select * from t1").collect()

      val plan1 = sql("SELECT * FROM t1 a JOIN t1 b ON a.i = b.i").queryExecution.executedPlan
      assert(plan1.collect { case exchange: ShuffleExchangeExec => exchange }.isEmpty)

      val plan2 = sql("SELECT * FROM t1 a JOIN v1 b ON a.i = b.i").queryExecution.executedPlan
      assert(plan2.collect { case exchange: ShuffleExchangeExec => exchange }.isEmpty)
    }
  }
}
{code}

This is because the View adds a Project with an Alias, so the Join's required 
distribution is expressed in terms of the Alias, while ProjectExec passes its 
child's outputPartitioning up without the Alias. The satisfies check in 
EnsureRequirements therefore never succeeds.
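
To make the mismatch concrete, a self-contained sketch with toy types (not 
Spark's Partitioning/Distribution classes), assuming the view aliases column i 
as b.i:

{code:scala}
object AliasAwarePartitioningSketch extends App {
  // Toy model: a plan node reports the column its output is hash-partitioned on.
  final case class HashPartitioned(column: String)
  // A projection's alias map from input column name to output column name.
  final case class Projection(aliases: Map[String, String])

  // Behaviour described above: the child's partitioning is passed up unchanged,
  // so it still names "i" while the join's required distribution names "b.i".
  def passThrough(p: HashPartitioned, proj: Projection): HashPartitioned = p

  // Alias-aware behaviour: rewrite the partitioning through the alias map so the
  // join's requirement can be recognised as satisfied and the shuffle avoided.
  def aliasAware(p: HashPartitioned, proj: Projection): HashPartitioned =
    HashPartitioned(proj.aliases.getOrElse(p.column, p.column))

  val childPartitioning = HashPartitioned("i")
  val viewProject = Projection(Map("i" -> "b.i")) // the view projects t1.i as b.i
  println(passThrough(childPartitioning, viewProject)) // HashPartitioned(i)   -> mismatch, extra exchange
  println(aliasAware(childPartitioning, viewProject))  // HashPartitioned(b.i) -> match, no exchange
}
{code}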








[jira] [Commented] (SPARK-30072) Create dedicated planner for subqueries

2019-12-13 Thread Xiaoju Wu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16995627#comment-16995627
 ] 

Xiaoju Wu commented on SPARK-30072:
---

[~cloud_fan] If the SQL looks like:

SELECT * FROM df2 WHERE df2.k = (SELECT max(df2.k) FROM df1 JOIN df2 ON df1.k = 
df2.k AND df2.id < 2)

then the nested subquery "SELECT max(df2.k) FROM df1 JOIN df2 ON df1.k = df2.k 
AND df2.id < 2" will be run in another QueryExecution, and there is no way to 
pass the "isSubquery" information to InsertAdaptiveSparkPlan in that nested 
QueryExecution.

> Create dedicated planner for subqueries
> ---
>
> Key: SPARK-30072
> URL: https://issues.apache.org/jira/browse/SPARK-30072
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Ali Afroozeh
>Assignee: Ali Afroozeh
>Priority: Minor
> Fix For: 3.0.0
>
>
> This PR changes subquery planning by calling the planner and plan preparation 
> rules on the subquery plan directly. Before we were creating a QueryExecution 
> instance for subqueries to get the executedPlan. This would re-run analysis 
> and optimization on the subqueries plan. Running the analysis again on an 
> optimized query plan can have unwanted consequences, as some rules, for 
> example DecimalPrecision, are not idempotent.
> As an example, consider the expression 1.7 * avg(a) which after applying the 
> DecimalPrecision rule becomes:
> promote_precision(1.7) * promote_precision(avg(a))
> After the optimization, more specifically the constant folding rule, this 
> expression becomes:
> 1.7 * promote_precision(avg(a))
> Now if we run the analyzer on this optimized query again, we will get:
> promote_precision(1.7) * promote_precision(promote_precision(avg(a)))
> Which will later be optimized as:
> 1.7 * promote_precision(promote_precision(avg(a)))
> As can be seen, re-running the analysis and optimization on this expression 
> results in an expression with extra nested promote_precision nodes. Adding 
> unneeded nodes to the plan is problematic because it can eliminate situations 
> where we can reuse the plan.
> We opted to introduce dedicated planners for subqueries, instead of making 
> the DecimalPrecision rule idempotent, because this eliminates this entire 
> category of problems. Another benefit is that planning time for subqueries is 
> reduced.






[jira] [Commented] (SPARK-30072) Create dedicated planner for subqueries

2019-12-09 Thread Xiaoju Wu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16991554#comment-16991554
 ] 

Xiaoju Wu commented on SPARK-30072:
---

[~afroozeh] I think the change from checking queryExecution equality to 
checking an isSubquery boolean is not correct in some scenarios. If a subquery 
is executed under SubqueryExec.executionContext, there is no way to pass the 
information telling the InsertAdaptiveSparkPlan rule that it is a subquery.

> Create dedicated planner for subqueries
> ---
>
> Key: SPARK-30072
> URL: https://issues.apache.org/jira/browse/SPARK-30072
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Ali Afroozeh
>Assignee: Ali Afroozeh
>Priority: Minor
> Fix For: 3.0.0
>
>
> This PR changes subquery planning by calling the planner and plan preparation 
> rules on the subquery plan directly. Before we were creating a QueryExecution 
> instance for subqueries to get the executedPlan. This would re-run analysis 
> and optimization on the subqueries plan. Running the analysis again on an 
> optimized query plan can have unwanted consequences, as some rules, for 
> example DecimalPrecision, are not idempotent.
> As an example, consider the expression 1.7 * avg(a) which after applying the 
> DecimalPrecision rule becomes:
> promote_precision(1.7) * promote_precision(avg(a))
> After the optimization, more specifically the constant folding rule, this 
> expression becomes:
> 1.7 * promote_precision(avg(a))
> Now if we run the analyzer on this optimized query again, we will get:
> promote_precision(1.7) * promote_precision(promote_precision(avg(a)))
> Which will later be optimized as:
> 1.7 * promote_precision(promote_precision(avg(a)))
> As can be seen, re-running the analysis and optimization on this expression 
> results in an expression with extra nested promote_precision nodes. Adding 
> unneeded nodes to the plan is problematic because it can eliminate situations 
> where we can reuse the plan.
> We opted to introduce dedicated planners for subqueries, instead of making 
> the DecimalPrecision rule idempotent, because this eliminates this entire 
> category of problems. Another benefit is that planning time for subqueries is 
> reduced.






[jira] [Created] (SPARK-30186) support Dynamic Partition Pruning in Adaptive Execution

2019-12-09 Thread Xiaoju Wu (Jira)
Xiaoju Wu created SPARK-30186:
-

 Summary: support Dynamic Partition Pruning in Adaptive Execution
 Key: SPARK-30186
 URL: https://issues.apache.org/jira/browse/SPARK-30186
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Xiaoju Wu
 Fix For: 3.0.0


Currently Adaptive Execution cannot work if Dynamic Partition Pruning is 
applied, because supportAdaptive explicitly rejects any plan that contains a 
DynamicPruningSubquery:

private def supportAdaptive(plan: SparkPlan): Boolean = {
  // TODO migrate dynamic-partition-pruning onto adaptive execution.
  sanityCheck(plan) &&
    !plan.logicalLink.exists(_.isStreaming) &&
    !plan.expressions.exists(_.find(_.isInstanceOf[DynamicPruningSubquery]).isDefined) &&
    plan.children.forall(supportAdaptive)
}

This means we cannot get the performance benefit of both AE and DPP at the 
same time.

This ticket targets making DPP and AE work together.






[jira] [Commented] (SPARK-27290) remove unneed sort under Aggregate

2019-06-12 Thread Xiaoju Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862698#comment-16862698
 ] 

Xiaoju Wu commented on SPARK-27290:
---

[~joshrosen] Got it. I think we should identify in which patterns sort is 
really needed and fix the UT to be more meaningful.

> remove unneed sort under Aggregate
> --
>
> Key: SPARK-27290
> URL: https://issues.apache.org/jira/browse/SPARK-27290
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Xiaoju Wu
>Priority: Minor
>
> I saw some tickets about removing unneeded sorts from the plan, but I think 
> there's another case in which a sort is redundant:
> A Sort just under a non-order-preserving node is redundant, for example:
> {code}
> select count(*) from (select a1 from A order by a2);
> +- Aggregate
>   +- Sort
>      +- FileScan parquet
> {code}
> But one of the existing test cases conflicts with this example:
> {code}
> test("sort should not be removed when there is a node which doesn't guarantee any order") {
>   val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc)
>   val groupedAndResorted = orderedPlan.groupBy('a)(sum('a)).orderBy('a.asc)
>   val optimized = Optimize.execute(groupedAndResorted.analyze)
>   val correctAnswer = groupedAndResorted.analyze
>   comparePlans(optimized, correctAnswer)
> }
> {code}
> Why is it designed like this? In my opinion, since Aggregate won't pass up 
> the ordering, the Sort below it is useless.






[jira] [Created] (SPARK-27431) move HashedRelation to global UnifiedMemoryManager and enable offheap

2019-04-10 Thread Xiaoju Wu (JIRA)
Xiaoju Wu created SPARK-27431:
-

 Summary: move HashedRelation to global UnifiedMemoryManager and 
enable offheap
 Key: SPARK-27431
 URL: https://issues.apache.org/jira/browse/SPARK-27431
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0
Reporter: Xiaoju Wu


Why is HashedRelation currently managed by a newly created MemoryManager with 
off-heap disabled? Can we improve this part?

/**
 * Create a HashedRelation from an Iterator of InternalRow.
 */
def apply(
    input: Iterator[InternalRow],
    key: Seq[Expression],
    sizeEstimate: Int = 64,
    taskMemoryManager: TaskMemoryManager = null): HashedRelation = {
  val mm = Option(taskMemoryManager).getOrElse {
    new TaskMemoryManager(
      new UnifiedMemoryManager(
        new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
        Long.MaxValue,
        Long.MaxValue / 2,
        1),
      0)
  }

  if (key.length == 1 && key.head.dataType == LongType) {
    LongHashedRelation(input, key, sizeEstimate, mm)
  } else {
    UnsafeHashedRelation(input, key, sizeEstimate, mm)
  }
}






[jira] [Commented] (SPARK-27290) remove unneed sort under Aggregate

2019-03-28 Thread Xiaoju Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16803737#comment-16803737
 ] 

Xiaoju Wu commented on SPARK-27290:
---

[~ekoifman] HashAggregate cannot benefit from sorted input, but SortAggregate 
can. However, SortAggregate requires its input to be sorted by the group-by 
columns, and EnsureRequirements will enforce that with a SortExec anyway. So 
the user-written sort is useless.

> remove unneed sort under Aggregate
> --
>
> Key: SPARK-27290
> URL: https://issues.apache.org/jira/browse/SPARK-27290
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Xiaoju Wu
>Priority: Minor
>
> I saw some tickets about removing unneeded sorts from the plan, but I think 
> there's another case in which a sort is redundant:
> A Sort just under a non-order-preserving node is redundant, for example:
> select count(*) from (select a1 from A order by a2);
> +- Aggregate
>   +- Sort
>      +- FileScan parquet
> But one of the existing test cases conflicts with this example:
> test("sort should not be removed when there is a node which doesn't guarantee 
> any order") {
>   val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc)
>   val groupedAndResorted = orderedPlan.groupBy('a)(sum('a)).orderBy('a.asc)
>   val optimized = Optimize.execute(groupedAndResorted.analyze)
>   val correctAnswer = groupedAndResorted.analyze
>   comparePlans(optimized, correctAnswer)
> }
> Why is it designed like this? In my opinion, since Aggregate won't pass up 
> the ordering, the Sort below it is useless.






[jira] [Created] (SPARK-27290) remove unneed sort under Aggregate

2019-03-27 Thread Xiaoju Wu (JIRA)
Xiaoju Wu created SPARK-27290:
-

 Summary: remove unneed sort under Aggregate
 Key: SPARK-27290
 URL: https://issues.apache.org/jira/browse/SPARK-27290
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Xiaoju Wu


I saw some tickets about removing unneeded sorts from the plan, but I think 
there's another case in which a sort is redundant:

A Sort just under a non-order-preserving node is redundant, for example:

select count(*) from (select a1 from A order by a2);
+- Aggregate
  +- Sort
     +- FileScan parquet

But one of the existing test cases conflicts with this example:

test("sort should not be removed when there is a node which doesn't guarantee 
any order") {
  val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc)
  val groupedAndResorted = orderedPlan.groupBy('a)(sum('a)).orderBy('a.asc)
  val optimized = Optimize.execute(groupedAndResorted.analyze)
  val correctAnswer = groupedAndResorted.analyze
  comparePlans(optimized, correctAnswer)
}

Why is it designed like this? In my opinion, since Aggregate won't pass up the 
ordering, the Sort below it is useless.
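
To make the proposal concrete, a minimal sketch of such an optimizer rule 
against the 2.4/3.0-era Catalyst API; it deliberately ignores the 
order-sensitive cases the existing test guards against, which is exactly the 
open question here:

{code:scala}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Sort}
import org.apache.spark.sql.catalyst.rules.Rule

// Sketch only: drop a Sort that sits directly under an Aggregate, on the
// assumption that the Aggregate neither preserves nor relies on its child's order.
object RemoveSortBelowAggregate extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    case agg @ Aggregate(_, _, Sort(_, _, child)) => agg.copy(child = child)
  }
}
{code}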






[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2019-03-18 Thread Xiaoju Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16794962#comment-16794962
 ] 

Xiaoju Wu commented on SPARK-21492:
---

Any updates? Is there any discussion of a general fix, rather than a hack in 
SortMergeJoin?

> Memory leak in SortMergeJoin
> 
>
> Key: SPARK-21492
> URL: https://issues.apache.org/jira/browse/SPARK-21492
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0, 2.3.1, 3.0.0
>Reporter: Zhan Zhang
>Priority: Major
>
> In SortMergeJoin, if the iterator is not exhausted, there will be a memory leak 
> caused by the Sort. The memory is not released until the task ends, and cannot 
> be used by other operators, causing a performance drop or OOM.






[jira] [Commented] (SPARK-25837) Web UI does not respect spark.ui.retainedJobs in some instances

2019-03-18 Thread Xiaoju Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16794932#comment-16794932
 ] 

Xiaoju Wu commented on SPARK-25837:
---

Did you verify this fix with the reproduction case above? I tried it and found 
the issue is still there: the cleanup still lagged behind, although it was 
better than in the version without this fix.

> Web UI does not respect spark.ui.retainedJobs in some instances
> ---
>
> Key: SPARK-25837
> URL: https://issues.apache.org/jira/browse/SPARK-25837
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.3.1
> Environment: Reproduction Environment:
> Spark 2.3.1
> Dataproc 1.3-deb9
> 1x master 4 vCPUs, 15 GB
> 2x workers 4 vCPUs, 15 GB
>  
>Reporter: Patrick Brown
>Assignee: Patrick Brown
>Priority: Minor
> Fix For: 2.3.3, 2.4.1, 3.0.0
>
> Attachments: Screen Shot 2018-10-23 at 4.40.51 PM (1).png
>
>
> Expected Behavior: Web UI only displays 1 completed job and remains 
> responsive.
> Actual Behavior: Both during job execution and for a non-trivial amount of time 
> after all jobs have completed, the UI retains many completed jobs, causing 
> limited responsiveness.
>  
> To reproduce:
>  
>  > spark-shell --conf spark.ui.retainedJobs=1
>   
>  scala> import scala.concurrent._
>  scala> import scala.concurrent.ExecutionContext.Implicits.global
>  scala> for (i <- 0 until 5) { Future { println(sc.parallelize(0 until i).collect.length) } }
>   
>  
>  
> The attached screenshot shows the state of the web UI after running the repro 
> code: you can see the UI is displaying some 43k completed jobs (which takes a 
> long time to load). After a few minutes of inactivity this will clear out; 
> however, in an application which continues to submit jobs every once in a 
> while, the issue persists.
>  
> The issue seems to appear when running multiple jobs at once as well as in 
> sequence for a while and may as well have something to do with high master 
> CPU usage (thus the collect in the repro code). My rough guess would be 
> whatever is managing clearing out completed jobs gets overwhelmed (on the 
> master during repro htop reported almost full CPU usage across all 4 cores).






[jira] [Commented] (SPARK-23375) Optimizer should remove unneeded Sort

2019-03-18 Thread Xiaoju Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16794805#comment-16794805
 ] 

Xiaoju Wu commented on SPARK-23375:
---

But one of your test cases conflicts with what I talked about above:

test("sort should not be removed when there is a node which doesn't guarantee 
any order") {
  val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc)
  val groupedAndResorted = orderedPlan.groupBy('a)(sum('a)).orderBy('a.asc)
  val optimized = Optimize.execute(groupedAndResorted.analyze)
  val correctAnswer = groupedAndResorted.analyze
  comparePlans(optimized, correctAnswer)
}

Why is it designed like this? In my opinion, since Aggregate won't pass up the 
ordering, the Sort below it is useless.

 

> Optimizer should remove unneeded Sort
> -
>
> Key: SPARK-23375
> URL: https://issues.apache.org/jira/browse/SPARK-23375
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Marco Gaido
>Assignee: Marco Gaido
>Priority: Minor
> Fix For: 2.4.0
>
>
> As pointed out in SPARK-23368, as of now there is no rule to remove the Sort 
> operator on an already sorted plan, ie. if we have a query like:
> {code}
> SELECT b
> FROM (
> SELECT a, b
> FROM table1
> ORDER BY a
> ) t
> ORDER BY a
> {code}
> The sort is actually executed twice, even though it is not needed.






[jira] [Commented] (SPARK-23375) Optimizer should remove unneeded Sort

2019-03-18 Thread Xiaoju Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16794791#comment-16794791
 ] 

Xiaoju Wu commented on SPARK-23375:
---

I think there's another case in which sort is redundant:

A Sort just under a non-order-preserving node is redundant, for example:

select count(*) from (select a1 from A order by a2);
+- Aggregate
  +- Sort
     +- FileScan parquet

> Optimizer should remove unneeded Sort
> -
>
> Key: SPARK-23375
> URL: https://issues.apache.org/jira/browse/SPARK-23375
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Marco Gaido
>Assignee: Marco Gaido
>Priority: Minor
> Fix For: 2.4.0
>
>
> As pointed out in SPARK-23368, as of now there is no rule to remove the Sort 
> operator on an already sorted plan, ie. if we have a query like:
> {code}
> SELECT b
> FROM (
> SELECT a, b
> FROM table1
> ORDER BY a
> ) t
> ORDER BY a
> {code}
> The sort is actually executed twice, even though it is not needed.






[jira] [Created] (SPARK-26779) NullPointerException when disable wholestage codegen

2019-01-29 Thread Xiaoju Wu (JIRA)
Xiaoju Wu created SPARK-26779:
-

 Summary: NullPointerException when disable wholestage codegen
 Key: SPARK-26779
 URL: https://issues.apache.org/jira/browse/SPARK-26779
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.0
Reporter: Xiaoju Wu


When running TPCDSSuite with whole-stage codegen disabled, an NPE is thrown in q9:

java.lang.NullPointerException at

org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:170)
 at 
org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:613)
 at 
org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:160)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.immutable.List.foreach(List.scala:381) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.immutable.List.map(List.scala:285) at 
org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.immutable.List.foreach(List.scala:381) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.immutable.List.map(List.scala:285) at 
org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.immutable.List.foreach(List.scala:381) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.immutable.List.map(List.scala:285) at 
org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.immutable.List.foreach(List.scala:381) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.immutable.List.map(List.scala:285) at 
org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.immutable.List.foreach(List.scala:381) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.immutable.List.map(List.scala:285) at 
org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
 at 

[jira] [Commented] (SPARK-23839) consider bucket join in cost-based JoinReorder rule

2018-09-25 Thread Xiaoju Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628184#comment-16628184
 ] 

Xiaoju Wu commented on SPARK-23839:
---

[~smilegator] Is there any plan on the cost-based optimizer?

> consider bucket join in cost-based JoinReorder rule
> ---
>
> Key: SPARK-23839
> URL: https://issues.apache.org/jira/browse/SPARK-23839
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiaoju Wu
>Priority: Minor
>
> Since spark 2.2, the cost-based JoinReorder rule is implemented and in Spark 
> 2.3 released, it is improved with histogram. While it doesn't take the cost 
> of the different join implementations. For example:
> TableA JOIN TableB JOIN TableC
> TableA  will output 10,000 rows after filter and projection. 
> TableB  will output 10,000 rows after filter and projection. 
> TableC  will output 8,000 rows after filter and projection. 
> The current JoinReorder rule will possibly optimize the plan to join TableC 
> with TableA firstly and then TableB. But if the TableA and TableB are bucket 
> tables and can be applied with BucketJoin, it could be a different story. 
>  
> Also, to support bucket join of more than 2 tables when table bucket number 
> is multiple of another (SPARK-17570), whether bucket join can take effect 
> depends on the result of JoinReorder. For example of "A join B join C" which 
> has bucket number like 8, 4, 12, JoinReorder rule should optimize the order 
> to "A join B join C“ to make the bucket join take effect instead of "C join A 
> join B". 
>  
> Based on current CBO JoinReorder, there are possibly 2 part to be changed:
>  # CostBasedJoinReorder rule is applied in optimizer phase while we do Join 
> selection in planner phase and bucket join optimization in EnsureRequirements 
> which is in preparation phase. Both are after optimizer. 
>  # Current statistics and join cost formula are based data selectivity and 
> cardinality, we need to add statistics for present the join method cost like 
> shuffle, sort, hash and etc. Also we need to add the statistics into the 
> formula to estimate the join cost. 






[jira] [Created] (SPARK-24088) only HadoopRDD leverage HDFS Cache as preferred location

2018-04-25 Thread Xiaoju Wu (JIRA)
Xiaoju Wu created SPARK-24088:
-

 Summary: only HadoopRDD leverage HDFS Cache as preferred location
 Key: SPARK-24088
 URL: https://issues.apache.org/jira/browse/SPARK-24088
 Project: Spark
  Issue Type: Improvement
  Components: Input/Output
Affects Versions: 2.3.0
Reporter: Xiaoju Wu


Only HadoopRDD implements convertSplitLocationInfo, which converts a location 
to an HDFSCacheTaskLocation based on whether the block is cached in DataNode 
memory, while FileScanRDD does not. In FileScanRDD, all split location 
information is dropped.

private[spark] def convertSplitLocationInfo(
    infos: Array[SplitLocationInfo]): Option[Seq[String]] = {
  Option(infos).map(_.flatMap { loc =>
    val locationStr = loc.getLocation
    if (locationStr != "localhost") {
      if (loc.isInMemory) {
        logDebug(s"Partition $locationStr is cached by Hadoop.")
        Some(HDFSCacheTaskLocation(locationStr).toString)
      } else {
        Some(HostTaskLocation(locationStr).toString)
      }
    } else {
      None
    }
  })
}
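
A self-contained sketch of what honouring that flag could mean for a 
file-based partition; the types and the "hdfs_cache_" marker are illustrative 
assumptions, not the FileScanRDD/TaskLocation APIs:

{code:scala}
object CacheAwarePreferredLocationsSketch extends App {
  // Toy stand-in for Hadoop's SplitLocationInfo: a host plus an "in DataNode memory" flag.
  final case class BlockLocation(host: String, inMemory: Boolean)

  // Preferred locations for a partition: hosts holding the block in HDFS cache
  // first (tagged so the scheduler can tell them apart), then plain hosts.
  def preferredLocations(blocks: Seq[BlockLocation]): Seq[String] = {
    val (cached, plain) = blocks.filter(_.host != "localhost").partition(_.inMemory)
    cached.map(b => s"hdfs_cache_${b.host}").distinct ++ plain.map(_.host).distinct
  }

  println(preferredLocations(Seq(
    BlockLocation("host1", inMemory = true),
    BlockLocation("host2", inMemory = false),
    BlockLocation("localhost", inMemory = false))))
  // -> List(hdfs_cache_host1, host2)
}
{code}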






[jira] [Comment Edited] (SPARK-23839) consider bucket join in cost-based JoinReorder rule

2018-04-02 Thread Xiaoju Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1648#comment-1648
 ] 

Xiaoju Wu edited comment on SPARK-23839 at 4/2/18 3:05 PM:
---

Yes, bucketing is one of the cases showing that the cost of shuffling and 
sorting should be taken into consideration in the cost formula. I agree that 
the simpler, the better. It seems we need to think about a simple solution 
that will not lead the join order to worse performance than the one produced 
by the current CostBasedJoinReorder rule. Any suggestions?


was (Author: xiaojuwu):
Yes, bucketing is one of the cases showing that the cost of shuffling and 
sorting should be taken into consideration in the cost formula. I agree that 
the simpler, the better. It seems we need to think about a simple solution 
that will not lead the join order to worse performance than the one produced 
by the current CostBasedJoinReorder rule.

> consider bucket join in cost-based JoinReorder rule
> ---
>
> Key: SPARK-23839
> URL: https://issues.apache.org/jira/browse/SPARK-23839
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiaoju Wu
>Priority: Minor
>
> Since spark 2.2, the cost-based JoinReorder rule is implemented and in Spark 
> 2.3 released, it is improved with histogram. While it doesn't take the cost 
> of the different join implementations. For example:
> TableA JOIN TableB JOIN TableC
> TableA  will output 10,000 rows after filter and projection. 
> TableB  will output 10,000 rows after filter and projection. 
> TableC  will output 8,000 rows after filter and projection. 
> The current JoinReorder rule will possibly optimize the plan to join TableC 
> with TableA firstly and then TableB. But if the TableA and TableB are bucket 
> tables and can be applied with BucketJoin, it could be a different story. 
>  
> Also, to support bucket join of more than 2 tables when table bucket number 
> is multiple of another (SPARK-17570), whether bucket join can take effect 
> depends on the result of JoinReorder. For example of "A join B join C" which 
> has bucket number like 8, 4, 12, JoinReorder rule should optimize the order 
> to "A join B join C“ to make the bucket join take effect instead of "C join A 
> join B". 
>  
> Based on current CBO JoinReorder, there are possibly 2 part to be changed:
>  # CostBasedJoinReorder rule is applied in optimizer phase while we do Join 
> selection in planner phase and bucket join optimization in EnsureRequirements 
> which is in preparation phase. Both are after optimizer. 
>  # Current statistics and join cost formula are based data selectivity and 
> cardinality, we need to add statistics for present the join method cost like 
> shuffle, sort, hash and etc. Also we need to add the statistics into the 
> formula to estimate the join cost. 






[jira] [Commented] (SPARK-23839) consider bucket join in cost-based JoinReorder rule

2018-04-02 Thread Xiaoju Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1648#comment-1648
 ] 

Xiaoju Wu commented on SPARK-23839:
---

Yes, bucketing is one of the cases showing that the cost of shuffling and 
sorting should be taken into consideration in the cost formula. I agree that 
the simpler, the better. It seems we need to think about a simple solution 
that will not lead the join order to worse performance than the one produced 
by the current CostBasedJoinReorder rule.

> consider bucket join in cost-based JoinReorder rule
> ---
>
> Key: SPARK-23839
> URL: https://issues.apache.org/jira/browse/SPARK-23839
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiaoju Wu
>Priority: Minor
>
> Since spark 2.2, the cost-based JoinReorder rule is implemented and in Spark 
> 2.3 released, it is improved with histogram. While it doesn't take the cost 
> of the different join implementations. For example:
> TableA JOIN TableB JOIN TableC
> TableA  will output 10,000 rows after filter and projection. 
> TableB  will output 10,000 rows after filter and projection. 
> TableC  will output 8,000 rows after filter and projection. 
> The current JoinReorder rule will possibly optimize the plan to join TableC 
> with TableA firstly and then TableB. But if the TableA and TableB are bucket 
> tables and can be applied with BucketJoin, it could be a different story. 
>  
> Also, to support bucket join of more than 2 tables when table bucket number 
> is multiple of another (SPARK-17570), whether bucket join can take effect 
> depends on the result of JoinReorder. For example of "A join B join C" which 
> has bucket number like 8, 4, 12, JoinReorder rule should optimize the order 
> to "A join B join C“ to make the bucket join take effect instead of "C join A 
> join B". 
>  
> Based on current CBO JoinReorder, there are possibly 2 part to be changed:
>  # CostBasedJoinReorder rule is applied in optimizer phase while we do Join 
> selection in planner phase and bucket join optimization in EnsureRequirements 
> which is in preparation phase. Both are after optimizer. 
>  # Current statistics and join cost formula are based data selectivity and 
> cardinality, we need to add statistics for present the join method cost like 
> shuffle, sort, hash and etc. Also we need to add the statistics into the 
> formula to estimate the join cost. 






[jira] [Commented] (SPARK-23839) consider bucket join in cost-based JoinReorder rule

2018-04-02 Thread Xiaoju Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16422116#comment-16422116
 ] 

Xiaoju Wu commented on SPARK-23839:
---

[~maropu] My concern is that "bucket join always first" does not mean the best 
performance every time. In my example above, if C join A produces very little 
output, which can benefit the following join a lot, why not join them first 
even though A and B could use a bucket join?

> consider bucket join in cost-based JoinReorder rule
> ---
>
> Key: SPARK-23839
> URL: https://issues.apache.org/jira/browse/SPARK-23839
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiaoju Wu
>Priority: Minor
>
> Since spark 2.2, the cost-based JoinReorder rule is implemented and in Spark 
> 2.3 released, it is improved with histogram. While it doesn't take the cost 
> of the different join implementations. For example:
> TableA JOIN TableB JOIN TableC
> TableA  will output 10,000 rows after filter and projection. 
> TableB  will output 10,000 rows after filter and projection. 
> TableC  will output 8,000 rows after filter and projection. 
> The current JoinReorder rule will possibly optimize the plan to join TableC 
> with TableA firstly and then TableB. But if the TableA and TableB are bucket 
> tables and can be applied with BucketJoin, it could be a different story. 
>  
> Also, to support bucket join of more than 2 tables when table bucket number 
> is multiple of another (SPARK-17570), whether bucket join can take effect 
> depends on the result of JoinReorder. For example of "A join B join C" which 
> has bucket number like 8, 4, 12, JoinReorder rule should optimize the order 
> to "A join B join C“ to make the bucket join take effect instead of "C join A 
> join B". 
>  
> Based on current CBO JoinReorder, there are possibly 2 part to be changed:
>  # CostBasedJoinReorder rule is applied in optimizer phase while we do Join 
> selection in planner phase and bucket join optimization in EnsureRequirements 
> which is in preparation phase. Both are after optimizer. 
>  # Current statistics and join cost formula are based data selectivity and 
> cardinality, we need to add statistics for present the join method cost like 
> shuffle, sort, hash and etc. Also we need to add the statistics into the 
> formula to estimate the join cost. 






[jira] [Updated] (SPARK-23839) consider bucket join in cost-based JoinReorder rule

2018-04-02 Thread Xiaoju Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaoju Wu updated SPARK-23839:
--
Description: 
Since spark 2.2, the cost-based JoinReorder rule is implemented and in Spark 
2.3 released, it is improved with histogram. While it doesn't take the cost of 
the different join implementations. For example:

TableA JOIN TableB JOIN TableC

TableA  will output 10,000 rows after filter and projection. 

TableB  will output 10,000 rows after filter and projection. 

TableC  will output 8,000 rows after filter and projection. 

The current JoinReorder rule will possibly optimize the plan to join TableC 
with TableA firstly and then TableB. But if the TableA and TableB are bucket 
tables and can be applied with BucketJoin, it could be a different story. 

 

Also, to support bucket join of more than 2 tables when table bucket number is 
multiple of another (SPARK-17570), whether bucket join can take effect depends 
on the result of JoinReorder. For example of "A join B join C" which has bucket 
number like 8, 4, 12, JoinReorder rule should optimize the order to "A join B 
join C“ to make the bucket join take effect instead of "C join A join B". 

 

Based on current CBO JoinReorder, there are possibly 2 part to be changed:
 # CostBasedJoinReorder rule is applied in optimizer phase while we do Join 
selection in planner phase and bucket join optimization in EnsureRequirements 
which is in preparation phase. Both are after optimizer. 
 # Current statistics and join cost formula are based data selectivity and 
cardinality, we need to add statistics for present the join method cost like 
shuffle, sort, hash and etc. Also we need to add the statistics into the 
formula to estimate the join cost. 

  was:
Since Spark 2.2, the cost-based JoinReorder rule has been implemented, and in 
the Spark 2.3 release it was improved with histograms. However, it does not 
take into account the cost of the different join implementations. For example:

TableA JOIN TableB JOIN TableC

TableA will output 10,000 rows after filter and projection. 

TableB will output 10,000 rows after filter and projection. 

TableC will output 8,000 rows after filter and projection. 

The current JoinReorder rule will possibly optimize the plan to join TableC 
with TableA first and then TableB. But if TableA and TableB are bucketed 
tables to which a bucket join can be applied, it could be a different story. 

Also, to support bucket joins of more than 2 tables when one table's bucket 
number is a multiple of another's (SPARK-17570), whether the bucket join can 
take effect depends on the result of JoinReorder. For example, for "a join b 
join c" with bucket numbers 8, 12, 4, the JoinReorder rule should optimize the 
order to "c join a join b" to make the bucket join take effect. 

Based on the current CBO JoinReorder, there are possibly 2 parts to be changed:
 # The CostBasedJoinReorder rule is applied in the optimizer phase, while we do 
join selection in the planner phase and bucket join optimization in 
EnsureRequirements, which is in the preparation phase. Both are after the 
optimizer. 
 # The current statistics and join cost formula are based on data selectivity 
and cardinality; we need to add statistics to represent the cost of the join 
method (shuffle, sort, hash, etc.) and add them to the formula used to 
estimate the join cost. 


> consider bucket join in cost-based JoinReorder rule
> ---
>
> Key: SPARK-23839
> URL: https://issues.apache.org/jira/browse/SPARK-23839
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiaoju Wu
>Priority: Minor
>
> Since spark 2.2, the cost-based JoinReorder rule is implemented and in Spark 
> 2.3 released, it is improved with histogram. While it doesn't take the cost 
> of the different join implementations. For example:
> TableA JOIN TableB JOIN TableC
> TableA  will output 10,000 rows after filter and projection. 
> TableB  will output 10,000 rows after filter and projection. 
> TableC  will output 8,000 rows after filter and projection. 
> The current JoinReorder rule will possibly optimize the plan to join TableC 
> with TableA firstly and then TableB. But if the TableA and TableB are bucket 
> tables and can be applied with BucketJoin, it could be a different story. 
>  
> Also, to support bucket join of more than 2 tables when table bucket number 
> is multiple of another (SPARK-17570), whether bucket join can take effect 
> depends on the result of JoinReorder. For example of "A join B join C" which 
> has bucket number like 8, 4, 12, JoinReorder rule should optimize the order 
> to "A join B join C“ to make the bucket join take effect instead of "C join A 
> join B". 
>  
> Based on the current CBO JoinReorder, there are possibly 2 parts to be changed:
>  # 

[jira] [Commented] (SPARK-23839) consider bucket join in cost-based JoinReorder rule

2018-04-01 Thread Xiaoju Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16421613#comment-16421613
 ] 

Xiaoju Wu commented on SPARK-23839:
---

If there is any existing discussion or ticket related to this topic, please let me know. 

> consider bucket join in cost-based JoinReorder rule
> ---
>
> Key: SPARK-23839
> URL: https://issues.apache.org/jira/browse/SPARK-23839
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiaoju Wu
>Priority: Minor
>
> Since Spark 2.2, the cost-based JoinReorder rule has been implemented, and in 
> Spark 2.3 it was improved with histograms. However, it does not take into account 
> the cost of the different join implementations. For example:
> TableA JOIN TableB JOIN TableC
> TableA  will output 10,000 rows after filter and projection. 
> TableB  will output 10,000 rows after filter and projection. 
> TableC  will output 8,000 rows after filter and projection. 
> The current JoinReorder rule will possibly optimize the plan to join TableC 
> with TableA firstly and then TableB. But if the TableA and TableB are bucket 
> tables and can be applied with BucketJoin, it could be a different story. 
>  
> Also, to support bucket join of more than 2 tables when one table's bucket number 
> is a multiple of another's (SPARK-17570), whether bucket join can take effect 
> depends on the result of JoinReorder. For example, for "a join b join c" with 
> bucket numbers 8, 12 and 4, the JoinReorder rule should optimize the order 
> to "c join a join b" to make the bucket join take effect. 
>  
> Based on the current CBO JoinReorder, there are possibly 2 parts to be changed:
>  # The CostBasedJoinReorder rule is applied in the optimizer phase, while join 
> selection happens in the planner phase and bucket join optimization in 
> EnsureRequirements, which is in the preparation phase. Both run after the optimizer. 
>  # The current statistics and join cost formula are based on data selectivity and 
> cardinality; we need to add statistics that represent the cost of each join 
> method (shuffle, sort, hash, etc.) and feed them into the formula that estimates 
> the join cost. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23839) consider bucket join in cost-based JoinReorder rule

2018-04-01 Thread Xiaoju Wu (JIRA)
Xiaoju Wu created SPARK-23839:
-

 Summary: consider bucket join in cost-based JoinReorder rule
 Key: SPARK-23839
 URL: https://issues.apache.org/jira/browse/SPARK-23839
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0
Reporter: Xiaoju Wu


Since Spark 2.2, the cost-based JoinReorder rule has been implemented, and in 
Spark 2.3 it was improved with histograms. However, it does not take into account 
the cost of the different join implementations. For example:

TableA JOIN TableB JOIN TableC

TableA  will output 10,000 rows after filter and projection. 

TableB  will output 10,000 rows after filter and projection. 

TableC  will output 8,000 rows after filter and projection. 

The current JoinReorder rule will possibly optimize the plan to join TableC 
with TableA firstly and then TableB. But if the TableA and TableB are bucket 
tables and can be applied with BucketJoin, it could be a different story. 

 

Also, to support bucket join of more than 2 tables when one table's bucket number 
is a multiple of another's (SPARK-17570), whether bucket join can take effect 
depends on the result of JoinReorder. For example, for "a join b join c" with 
bucket numbers 8, 12 and 4, the JoinReorder rule should optimize the order to 
"c join a join b" to make the bucket join take effect. 

 

Based on the current CBO JoinReorder, there are possibly 2 parts to be changed:
 # The CostBasedJoinReorder rule is applied in the optimizer phase, while join 
selection happens in the planner phase and bucket join optimization in 
EnsureRequirements, which is in the preparation phase. Both run after the optimizer. 
 # The current statistics and join cost formula are based on data selectivity and 
cardinality; we need to add statistics that represent the cost of each join 
method (shuffle, sort, hash, etc.) and feed them into the formula that estimates 
the join cost. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17570) Avoid Hash and Exchange in Sort Merge join if bucketing factor is multiple for tables

2018-03-22 Thread Xiaoju Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410752#comment-16410752
 ] 

Xiaoju Wu commented on SPARK-17570:
---

[~tejasp] When you join 3 tables with bucket numbers 4, 8 and 12, whether bucket 
join can be applied depends on the join ordering. Does that imply changes to the 
JoinReorder rule?

> Avoid Hash and Exchange in Sort Merge join if bucketing factor is multiple 
> for tables
> -
>
> Key: SPARK-17570
> URL: https://issues.apache.org/jira/browse/SPARK-17570
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Tejas Patil
>Priority: Minor
>
> In case of bucketed tables, Spark will avoid doing `Sort` and `Exchange` if 
> the input tables and output table have the same number of buckets. However, 
> unequal bucketing will always lead to `Sort` and `Exchange`. If the number of 
> buckets in the output table is a factor of the buckets in the input table, we 
> should be able to avoid `Sort` and `Exchange` and directly join those.
> eg.
> Assume Input1, Input2 and Output are bucketed + sorted tables over the same 
> columns but with different numbers of buckets. Input1 has 8 buckets, Input2 
> has 12 buckets and Output has 4 buckets. Since hash-partitioning is done using 
> Modulus, if we JOIN buckets (0, 4) of Input1 and buckets (0, 4, 8) of Input2 
> in the same task, it would give the bucket 0 of the output table.
> {noformat}
> Input1   (0, 4)      (1, 5)      (2, 6)       (3, 7)
> Input2   (0, 4, 8)   (1, 5, 9)   (2, 6, 10)   (3, 7, 11)
> Output   (0)         (1)         (2)          (3)
> {noformat}
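To make the modulus mapping above concrete, here is a small plain-Scala sketch (no 
Spark APIs; the bucket counts 8, 12 and 4 mirror the quoted example) that groups 
input buckets by the output bucket they feed:

{code:scala}
// Input bucket i of a table feeds output bucket (i % numOutputBuckets), so each
// output bucket corresponds to a fixed group of input buckets on every join side.
def feedingBuckets(numInputBuckets: Int, numOutputBuckets: Int): Map[Int, Seq[Int]] =
  (0 until numInputBuckets).groupBy(_ % numOutputBuckets).map { case (k, v) => k -> v.toSeq }

val input1 = feedingBuckets(8, 4)   // 0 -> (0, 4), 1 -> (1, 5), 2 -> (2, 6), 3 -> (3, 7)
val input2 = feedingBuckets(12, 4)  // 0 -> (0, 4, 8), 1 -> (1, 5, 9), ...

(0 until 4).foreach { out =>
  println(s"output bucket $out <- Input1 ${input1(out).mkString("(", ", ", ")")}" +
          s" join Input2 ${input2(out).mkString("(", ", ", ")")}")
}
{code}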



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Issue Comment Deleted] (SPARK-17495) Hive hash implementation

2018-03-07 Thread Xiaoju Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaoju Wu updated SPARK-17495:
--
Comment: was deleted

(was: [~tejasp] I can see HiveHash was merged but is never used. It seems the choice 
between the Spark and Hive hash is still under discussion; is there any update on this topic?)

> Hive hash implementation
> 
>
> Key: SPARK-17495
> URL: https://issues.apache.org/jira/browse/SPARK-17495
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Tejas Patil
>Assignee: Tejas Patil
>Priority: Minor
>
> Spark internally uses Murmur3Hash for partitioning. This is different from 
> the one used by Hive. For queries which use bucketing, this leads to different 
> results if one tries the same query on both engines. For us, we want users to 
> have backward compatibility so that one can switch parts of applications 
> across the engines without observing regressions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17495) Hive hash implementation

2018-03-06 Thread Xiaoju Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387873#comment-16387873
 ] 

Xiaoju Wu commented on SPARK-17495:
---

[~tejasp] I can see HiveHash was merged but is never used. It seems the choice 
between the Spark and Hive hash is still under discussion; is there any update on this topic?

> Hive hash implementation
> 
>
> Key: SPARK-17495
> URL: https://issues.apache.org/jira/browse/SPARK-17495
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Tejas Patil
>Assignee: Tejas Patil
>Priority: Minor
>
> Spark internally uses Murmur3Hash for partitioning. This is different from 
> the one used by Hive. For queries which use bucketing, this leads to different 
> results if one tries the same query on both engines. For us, we want users to 
> have backward compatibility so that one can switch parts of applications 
> across the engines without observing regressions.
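As a hedged illustration only: Spark's built-in hash() column function is 
Murmur3-based, and bucket ids are derived from it roughly as 
pmod(hash(key), numBuckets), which is why the same key can land in a different 
bucket than Hive's own hash would place it.

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, hash, lit, pmod}

val spark = SparkSession.builder().appName("murmur3-bucket-sketch").getOrCreate()

// Sketch of Spark-side bucket assignment: Murmur3 hash of the key, modulo the
// bucket count. Hive computes a different hash, so the same key generally maps
// to a different bucket id there.
val numBuckets = 8
spark.range(10)
  .withColumn("spark_bucket", pmod(hash(col("id")), lit(numBuckets)))
  .show()
{code}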



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22469) Accuracy problem in comparison with string and numeric

2018-03-04 Thread Xiaoju Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16385156#comment-16385156
 ] 

Xiaoju Wu commented on SPARK-22469:
---

[~liutang123] casting Decimal to Double can lose precision, so why did you 
say "There is no proper decimal type we can pick, using double type is the best 
we can do."? 
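For what it's worth, a tiny plain-Scala illustration of that precision concern 
(the value is arbitrary):

{code:scala}
// A decimal with more significant digits than a double can represent:
val d = BigDecimal("0.12345678901234567890123456789")
println(d)           // 0.12345678901234567890123456789
println(d.toDouble)  // ~0.12345678901234568 -- digits beyond double's ~15-17 significant digits are lost
{code}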

> Accuracy problem in comparison with string and numeric 
> ---
>
> Key: SPARK-22469
> URL: https://issues.apache.org/jira/browse/SPARK-22469
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Lijia Liu
>Assignee: Lijia Liu
>Priority: Major
> Fix For: 2.2.1, 2.3.0
>
>
> {code:sql}
> select '1.5' > 0.5; // Result is NULL in Spark but is true in Hive.
> {code}
> IIUC, we can cast string as double like Hive.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-23493) insert-into depends on columns order, otherwise incorrect data inserted

2018-02-28 Thread Xiaoju Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaoju Wu resolved SPARK-23493.
---
Resolution: Not A Bug

> insert-into depends on columns order, otherwise incorrect data inserted
> ---
>
> Key: SPARK-23493
> URL: https://issues.apache.org/jira/browse/SPARK-23493
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Xiaoju Wu
>Priority: Minor
>
> insert-into only works correctly when the partitionBy key columns are placed last:
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col1")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  spark
>  .createDataset(data2)
>  .toDF("col1", "col2", "col3")
>   .write
>  .insertInto(table)
> sql("select * from " + table).show()
> +---------+----+----+
> |     col2|col3|col1|
> +---------+----+----+
> |test#test| 0.0|   8|
> |    test1| 1.0|   7|
> |    test3| 0.0|   9|
> |        8|null|   0|
> |        9|null|   0|
> |        7|null|   1|
> +---------+----+----+
>  
> If you try inserting with sql, the issue is the same.
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col1")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
> sql("insert into " + table + " values(7,'test2',1.0)")
>  sql("select * from " + table).show()
> +---------+----+----+
> |     col2|col3|col1|
> +---------+----+----+
> |test#test| 0.0|   8|
> |    test1| 1.0|   7|
> |    test3| 0.0|   9|
> |        7|null|   1|
> +---------+----+----+
> No exception was thrown since I only run insertInto, not together with 
> partitionBy. The data are inserted incorrectly. The issue is related to 
> column order. If I change to partitionBy col3, which is the last column, it 
> works.
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col3")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
> spark
>  .createDataset(data2)
>  .toDF("col1", "col2", "col3")
>  .write
>  .insertInto(table)
> sql("select * from " + table).show()
> +----+---------+----+
> |col1|     col2|col3|
> +----+---------+----+
> |   8|test#test| 0.0|
> |   9|    test3| 0.0|
> |   8|test#test| 0.0|
> |   9|    test3| 0.0|
> |   7|    test1| 1.0|
> |   7|    test2| 1.0|
> +----+---------+----+



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23493) insert-into depends on columns order, otherwise incorrect data inserted

2018-02-23 Thread Xiaoju Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374266#comment-16374266
 ] 

Xiaoju Wu commented on SPARK-23493:
---

If that's the case, it should throw an exception telling the users of the API 
"please place the partition column last" instead of swallowing the issue and just 
inserting the data incorrectly, right?

> insert-into depends on columns order, otherwise incorrect data inserted
> ---
>
> Key: SPARK-23493
> URL: https://issues.apache.org/jira/browse/SPARK-23493
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Xiaoju Wu
>Priority: Minor
>
> insert-into only works correctly when the partitionBy key columns are placed last:
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col1")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  spark
>  .createDataset(data2)
>  .toDF("col1", "col2", "col3")
>   .write
>  .insertInto(table)
> sql("select * from " + table).show()
> +---------+----+----+
> |     col2|col3|col1|
> +---------+----+----+
> |test#test| 0.0|   8|
> |    test1| 1.0|   7|
> |    test3| 0.0|   9|
> |        8|null|   0|
> |        9|null|   0|
> |        7|null|   1|
> +---------+----+----+
>  
> If you try inserting with sql, the issue is the same.
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col1")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
> sql("insert into " + table + " values(7,'test2',1.0)")
>  sql("select * from " + table).show()
> +---------+----+----+
> |     col2|col3|col1|
> +---------+----+----+
> |test#test| 0.0|   8|
> |    test1| 1.0|   7|
> |    test3| 0.0|   9|
> |        7|null|   1|
> +---------+----+----+
> No exception was thrown since I only run insertInto, not together with 
> partitionBy. The data are inserted incorrectly. The issue is related to 
> column order. If I change to partitionBy col3, which is the last column, it 
> works.
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col3")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
> spark
>  .createDataset(data2)
>  .toDF("col1", "col2", "col3")
>  .write
>  .insertInto(table)
> sql("select * from " + table).show()
> +----+---------+----+
> |col1|     col2|col3|
> +----+---------+----+
> |   8|test#test| 0.0|
> |   9|    test3| 0.0|
> |   8|test#test| 0.0|
> |   9|    test3| 0.0|
> |   7|    test1| 1.0|
> |   7|    test2| 1.0|
> +----+---------+----+



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23493) insert-into depends on columns order, otherwise incorrect data inserted

2018-02-23 Thread Xiaoju Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374172#comment-16374172
 ] 

Xiaoju Wu commented on SPARK-23493:
---

[~mgaido] "Columns are matched in order while inserting" This is acceptable, 
but if you looking into my case, you will find it forces the partition key to 
be placed at the last of the columns, that seems confusing. 
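One possible workaround, sketched here under the assumption of the same spark 
session, data2 and table values used in the example quoted below: explicitly 
reorder the DataFrame columns to the target table's schema (partition columns 
last) before calling insertInto, since insertInto matches columns by position 
rather than by name.

{code:scala}
import org.apache.spark.sql.functions.col
import spark.implicits._

// The table's column order has the partition column last (e.g. col2, col3, col1);
// align the DataFrame to that order before the positional insert.
val targetColumns = spark.table(table).columns

spark.createDataset(data2)
  .toDF("col1", "col2", "col3")
  .select(targetColumns.map(col): _*)
  .write
  .insertInto(table)
{code}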

> insert-into depends on columns order, otherwise incorrect data inserted
> ---
>
> Key: SPARK-23493
> URL: https://issues.apache.org/jira/browse/SPARK-23493
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Xiaoju Wu
>Priority: Minor
>
> insert-into only works correctly when the partitionBy key columns are placed last:
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col1")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  spark
>  .createDataset(data2)
>  .toDF("col1", "col2", "col3")
>   .write
>  .insertInto(table)
> sql("select * from " + table).show()
> +---------+----+----+
> |     col2|col3|col1|
> +---------+----+----+
> |test#test| 0.0|   8|
> |    test1| 1.0|   7|
> |    test3| 0.0|   9|
> |        8|null|   0|
> |        9|null|   0|
> |        7|null|   1|
> +---------+----+----+
>  
> If you try inserting with sql, the issue is the same.
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col1")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
> sql("insert into " + table + " values(7,'test2',1.0)")
>  sql("select * from " + table).show()
> +---------+----+----+
> |     col2|col3|col1|
> +---------+----+----+
> |test#test| 0.0|   8|
> |    test1| 1.0|   7|
> |    test3| 0.0|   9|
> |        7|null|   1|
> +---------+----+----+
> No exception was thrown since I only run insertInto, not together with 
> partitionBy. The data are inserted incorrectly. The issue is related to 
> column order. If I change to partitionBy col3, which is the last column, it 
> works.
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col3")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
> spark
>  .createDataset(data2)
>  .toDF("col1", "col2", "col3")
>  .write
>  .insertInto(table)
> sql("select * from " + table).show()
> +----+---------+----+
> |col1|     col2|col3|
> +----+---------+----+
> |   8|test#test| 0.0|
> |   9|    test3| 0.0|
> |   8|test#test| 0.0|
> |   9|    test3| 0.0|
> |   7|    test1| 1.0|
> |   7|    test2| 1.0|
> +----+---------+----+



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23493) insert-into depends on columns order, otherwise incorrect data inserted

2018-02-23 Thread Xiaoju Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374076#comment-16374076
 ] 

Xiaoju Wu commented on SPARK-23493:
---

This issue is similar to the one described in ticket SPARK-9278. However, it 
seems that ticket only fixed the case where insertInto cannot be run together with 
partitionBy. If I try inserting into a table that already has a partition key, 
the data are inserted incorrectly. 

> insert-into depends on columns order, otherwise incorrect data inserted
> ---
>
> Key: SPARK-23493
> URL: https://issues.apache.org/jira/browse/SPARK-23493
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Xiaoju Wu
>Priority: Minor
>
> insert-into only works correctly when the partitionBy key columns are placed last:
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col1")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  spark
>  .createDataset(data2)
>  .toDF("col1", "col2", "col3")
>   .write
>  .insertInto(table)
> sql("select * from " + table).show()
> +---------+----+----+
> |     col2|col3|col1|
> +---------+----+----+
> |test#test| 0.0|   8|
> |    test1| 1.0|   7|
> |    test3| 0.0|   9|
> |        8|null|   0|
> |        9|null|   0|
> |        7|null|   1|
> +---------+----+----+
>  
> If you try inserting with sql, the issue is the same.
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col1")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
> sql("insert into " + table + " values(7,'test2',1.0)")
>  sql("select * from " + table).show()
> +---------+----+----+
> |     col2|col3|col1|
> +---------+----+----+
> |test#test| 0.0|   8|
> |    test1| 1.0|   7|
> |    test3| 0.0|   9|
> |        7|null|   1|
> +---------+----+----+
> No exception was thrown since I only run insertInto, not together with 
> partitionBy. The data are inserted incorrectly. The issue is related to 
> column order. If I change to partitionBy col3, which is the last column, it 
> works.
> val data = Seq(
>  (7, "test1", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
>  import spark.implicits._
> val table = "default.tbl"
>  spark
>  .createDataset(data)
>  .toDF("col1", "col2", "col3")
>  .write
>  .partitionBy("col3")
>  .saveAsTable(table)
> val data2 = Seq(
>  (7, "test2", 1.0),
>  (8, "test#test", 0.0),
>  (9, "test3", 0.0)
>  )
> spark
>  .createDataset(data2)
>  .toDF("col1", "col2", "col3")
>  .write
>  .insertInto(table)
> sql("select * from " + table).show()
> +----+---------+----+
> |col1|     col2|col3|
> +----+---------+----+
> |   8|test#test| 0.0|
> |   9|    test3| 0.0|
> |   8|test#test| 0.0|
> |   9|    test3| 0.0|
> |   7|    test1| 1.0|
> |   7|    test2| 1.0|
> +----+---------+----+



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-9278) DataFrameWriter.insertInto inserts incorrect data

2018-02-23 Thread Xiaoju Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374073#comment-16374073
 ] 

Xiaoju Wu commented on SPARK-9278:
--

Created a new ticket to track this issue: SPARK-23493

> DataFrameWriter.insertInto inserts incorrect data
> -
>
> Key: SPARK-9278
> URL: https://issues.apache.org/jira/browse/SPARK-9278
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.4.0
> Environment: Linux, S3, Hive Metastore
>Reporter: Steve Lindemann
>Assignee: Cheng Lian
>Priority: Critical
>
> After creating a partitioned Hive table (stored as Parquet) via the 
> DataFrameWriter.createTable command, subsequent attempts to insert additional 
> data into new partitions of this table result in inserting incorrect data 
> rows. Reordering the columns in the data to be written seems to avoid this 
> issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23493) insert-into depends on columns order, otherwise incorrect data inserted

2018-02-23 Thread Xiaoju Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaoju Wu updated SPARK-23493:
--
Description: 
insert-into only works correctly when the partitionBy key columns are placed last:

val data = Seq(
 (7, "test1", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )
 import spark.implicits._

val table = "default.tbl"
 spark
 .createDataset(data)
 .toDF("col1", "col2", "col3")
 .write
 .partitionBy("col1")
 .saveAsTable(table)

val data2 = Seq(
 (7, "test2", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )

 spark
 .createDataset(data2)
 .toDF("col1", "col2", "col3")
  .write
 .insertInto(table)

sql("select * from " + table).show()

+---------+----+----+
|     col2|col3|col1|
+---------+----+----+
|test#test| 0.0|   8|
|    test1| 1.0|   7|
|    test3| 0.0|   9|
|        8|null|   0|
|        9|null|   0|
|        7|null|   1|
+---------+----+----+

 

If you try inserting with sql, the issue is the same.

val data = Seq(
 (7, "test1", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )
 import spark.implicits._

val table = "default.tbl"
 spark
 .createDataset(data)
 .toDF("col1", "col2", "col3")
 .write
 .partitionBy("col1")
 .saveAsTable(table)

val data2 = Seq(
 (7, "test2", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )

sql("insert into " + table + " values(7,'test2',1.0)")
 sql("select * from " + table).show()

+---------+----+----+
|     col2|col3|col1|
+---------+----+----+
|test#test| 0.0|   8|
|    test1| 1.0|   7|
|    test3| 0.0|   9|
|        7|null|   1|
+---------+----+----+

No exception was thrown since I only run insertInto, not together with 
partitionBy. The data are inserted incorrectly. The issue is related to column 
order. If I change to partitionBy col3, which is the last column, it works.

val data = Seq(
 (7, "test1", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )
 import spark.implicits._

val table = "default.tbl"
 spark
 .createDataset(data)
 .toDF("col1", "col2", "col3")
 .write
 .partitionBy("col3")
 .saveAsTable(table)

val data2 = Seq(
 (7, "test2", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )

spark
 .createDataset(data2)
 .toDF("col1", "col2", "col3")
 .write
 .insertInto(table)

sql("select * from " + table).show()

+----+---------+----+
|col1|     col2|col3|
+----+---------+----+
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   7|    test1| 1.0|
|   7|    test2| 1.0|
+----+---------+----+
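A quick way to see the root cause (a sketch assuming the session and table created 
above): saveAsTable with partitionBy moves the partition column to the end of the 
table schema, while insertInto matches columns purely by position.

{code:scala}
// The table created with .partitionBy("col1") stores its schema as (col2, col3, col1):
spark.table("default.tbl").printSchema()
// expected shape (the column order is what matters here):
// root
//  |-- col2: string (nullable = true)
//  |-- col3: double (nullable = true)
//  |-- col1: integer (nullable = true)
{code}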

  was:
Seems the issue still exists, here's the test:

val data = Seq(
(7, "test1", 1.0),
(8, "test#test", 0.0),
(9, "test3", 0.0)
)
import spark.implicits._

val table = "default.tbl"
spark
.createDataset(data)
.toDF("col1", "col2", "col3")
.write
.partitionBy("col1")
.saveAsTable(table)

val data2 = Seq(
(7, "test2", 1.0),
(8, "test#test", 0.0),
(9, "test3", 0.0)
)

 spark
.createDataset(data2)
.toDF("col1", "col2", "col3")
 .write
.insertInto(table)

sql("select * from " + table).show()

+---------+----+----+
|     col2|col3|col1|
+---------+----+----+
|test#test| 0.0|   8|
|    test1| 1.0|   7|
|    test3| 0.0|   9|
|        8|null|   0|
|        9|null|   0|
|        7|null|   1|
+---------+----+----+

 

If you try inserting with sql, the issue is the same.

val data = Seq(
 (7, "test1", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )
 import spark.implicits._

 val table = "default.tbl"
 spark
 .createDataset(data)
 .toDF("col1", "col2", "col3")
 .write
 .partitionBy("col1")
 .saveAsTable(table)

 val data2 = Seq(
 (7, "test2", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )

 sql("insert into " + table + " values(7,'test2',1.0)")
 sql("select * from " + table).show()

+---------+----+----+
|     col2|col3|col1|
+---------+----+----+
|test#test| 0.0|   8|
|    test1| 1.0|   7|
|    test3| 0.0|   9|
|        7|null|   1|
+---------+----+----+

No exception was thrown since I only run insertInto, not together with 
partitionBy. The data are inserted incorrectly. The issue is related to column 
order. If I change to partitionBy col3, which is the last column, it works.

val data = Seq(
(7, "test1", 1.0),
(8, "test#test", 0.0),
(9, "test3", 0.0)
)
import spark.implicits._

val table = "default.tbl"
spark
.createDataset(data)
.toDF("col1", "col2", "col3")
.write
.partitionBy("col3")
.saveAsTable(table)

val data2 = Seq(
(7, "test2", 1.0),
(8, "test#test", 0.0),
(9, "test3", 0.0)
)

spark
.createDataset(data2)
.toDF("col1", "col2", "col3")
.write
.insertInto(table)

sql("select * from " + table).show()

+----+---------+----+
|col1|     col2|col3|
+----+---------+----+
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   7|    test1| 1.0|
|   7|    test2| 1.0|
+----+---------+----+


> insert-into depends on columns order, otherwise incorrect data inserted
> ---
>
> Key: SPARK-23493
> URL: https://issues.apache.org/jira/browse/SPARK-23493
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Xiaoju Wu
>Priority: Minor
>
> insert-into only 

[jira] [Created] (SPARK-23493) insert-into depends on columns order, otherwise incorrect data inserted

2018-02-23 Thread Xiaoju Wu (JIRA)
Xiaoju Wu created SPARK-23493:
-

 Summary: insert-into depends on columns order, otherwise incorrect 
data inserted
 Key: SPARK-23493
 URL: https://issues.apache.org/jira/browse/SPARK-23493
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.2.1
Reporter: Xiaoju Wu


Seems the issue still exists, here's the test:

val data = Seq(
(7, "test1", 1.0),
(8, "test#test", 0.0),
(9, "test3", 0.0)
)
import spark.implicits._

val table = "default.tbl"
spark
.createDataset(data)
.toDF("col1", "col2", "col3")
.write
.partitionBy("col1")
.saveAsTable(table)

val data2 = Seq(
(7, "test2", 1.0),
(8, "test#test", 0.0),
(9, "test3", 0.0)
)

 spark
.createDataset(data2)
.toDF("col1", "col2", "col3")
 .write
.insertInto(table)

sql("select * from " + table).show()

+---------+----+----+
|     col2|col3|col1|
+---------+----+----+
|test#test| 0.0|   8|
|    test1| 1.0|   7|
|    test3| 0.0|   9|
|        8|null|   0|
|        9|null|   0|
|        7|null|   1|
+---------+----+----+

 

If you try inserting with sql, the issue is the same.

val data = Seq(
 (7, "test1", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )
 import spark.implicits._

 val table = "default.tbl"
 spark
 .createDataset(data)
 .toDF("col1", "col2", "col3")
 .write
 .partitionBy("col1")
 .saveAsTable(table)

 val data2 = Seq(
 (7, "test2", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )

 sql("insert into " + table + " values(7,'test2',1.0)")
 sql("select * from " + table).show()

+---------+----+----+
|     col2|col3|col1|
+---------+----+----+
|test#test| 0.0|   8|
|    test1| 1.0|   7|
|    test3| 0.0|   9|
|        7|null|   1|
+---------+----+----+

No exception was thrown since I only run insertInto, not together with 
partitionBy. The data are inserted incorrectly. The issue is related to column 
order. If I change to partitionBy col3, which is the last column, it works.

val data = Seq(
(7, "test1", 1.0),
(8, "test#test", 0.0),
(9, "test3", 0.0)
)
import spark.implicits._

val table = "default.tbl"
spark
.createDataset(data)
.toDF("col1", "col2", "col3")
.write
.partitionBy("col3")
.saveAsTable(table)

val data2 = Seq(
(7, "test2", 1.0),
(8, "test#test", 0.0),
(9, "test3", 0.0)
)

spark
.createDataset(data2)
.toDF("col1", "col2", "col3")
.write
.insertInto(table)

sql("select * from " + table).show()

+----+---------+----+
|col1|     col2|col3|
+----+---------+----+
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   7|    test1| 1.0|
|   7|    test2| 1.0|
+----+---------+----+



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-9278) DataFrameWriter.insertInto inserts incorrect data

2018-02-22 Thread Xiaoju Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374040#comment-16374040
 ] 

Xiaoju Wu edited comment on SPARK-9278 at 2/23/18 7:48 AM:
---

Seems the issue still exists, here's the test:

val data = Seq(
 (7, "test1", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )
 import spark.implicits._

val table = "default.tbl"
 spark
 .createDataset(data)
 .toDF("col1", "col2", "col3")
 .write
 .partitionBy("col1")
 .saveAsTable(table)

val data2 = Seq(
 (7, "test2", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )

 spark
 .createDataset(data2)
 .toDF("col1", "col2", "col3")
  .write
 .insertInto(table)

sql("select * from " + table).show()

+---------+----+----+
|     col2|col3|col1|
+---------+----+----+
|test#test| 0.0|   8|
|    test1| 1.0|   7|
|    test3| 0.0|   9|
|        8|null|   0|
|        9|null|   0|
|        7|null|   1|
+---------+----+----+

No exception was thrown since I only run insertInto not together with 
partitionBy. The data are inserted incorrectly. The issue is related to column 
order. If I change to partitionBy col3, which is the last column in order, it 
works.

val data = Seq(
 (7, "test1", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )
 import spark.implicits._

val table = "default.tbl"
 spark
 .createDataset(data)
 .toDF("col1", "col2", "col3")
 .write
 .partitionBy("col3")
 .saveAsTable(table)

val data2 = Seq(
 (7, "test2", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )

spark
 .createDataset(data2)
 .toDF("col1", "col2", "col3")
 .write
 .insertInto(table)

sql("select * from " + table).show()

+----+---------+----+
|col1|     col2|col3|
+----+---------+----+
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   7|    test1| 1.0|
|   7|    test2| 1.0|
+----+---------+----+

 

[~hyukjin.kwon] It's still a problem; it just doesn't show up the way you reproduced it. 
If you insert into a partitioned table which already exists, the problem occurs. 


was (Author: xiaojuwu):
Seems the issue still exists, here's the test:

val data = Seq(
 (7, "test1", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )
 import spark.implicits._

 val table = "default.tbl"
 spark
 .createDataset(data)
 .toDF("col1", "col2", "col3")
 .write
 .partitionBy("col1")
 .saveAsTable(table)

 val data2 = Seq(
 (7, "test2", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )

 spark
.createDataset(data2)
.toDF("col1", "col2", "col3")
 .write
.insertInto(table)

 sql("select * from " + table).show()

+---------+----+----+
|     col2|col3|col1|
+---------+----+----+
|test#test| 0.0|   8|
|    test1| 1.0|   7|
|    test3| 0.0|   9|
|        8|null|   0|
|        9|null|   0|
|        7|null|   1|
+---------+----+----+

No exception was thrown since I only run insertInto not together with 
partitionBy. The data are inserted incorrectly. The issue is related to column 
order. If I change to partitionBy col3, which is the last column in order, it 
works.

val data = Seq(
 (7, "test1", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
)
import spark.implicits._

val table = "default.tbl"
spark
 .createDataset(data)
 .toDF("col1", "col2", "col3")
 .write
 .partitionBy("col3")
 .saveAsTable(table)

val data2 = Seq(
 (7, "test2", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
)

spark
 .createDataset(data2)
 .toDF("col1", "col2", "col3")
 .write
 .insertInto(table)

sql("select * from " + table).show()

+----+---------+----+
|col1|     col2|col3|
+----+---------+----+
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   7|    test1| 1.0|
|   7|    test2| 1.0|
+----+---------+----+

> DataFrameWriter.insertInto inserts incorrect data
> -
>
> Key: SPARK-9278
> URL: https://issues.apache.org/jira/browse/SPARK-9278
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.4.0
> Environment: Linux, S3, Hive Metastore
>Reporter: Steve Lindemann
>Assignee: Cheng Lian
>Priority: Critical
>
> After creating a partitioned Hive table (stored as Parquet) via the 
> DataFrameWriter.createTable command, subsequent attempts to insert additional 
> data into new partitions of this table result in inserting incorrect data 
> rows. Reordering the columns in the data to be written seems to avoid this 
> issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-9278) DataFrameWriter.insertInto inserts incorrect data

2018-02-22 Thread Xiaoju Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374040#comment-16374040
 ] 

Xiaoju Wu commented on SPARK-9278:
--

Seems the issue still exists, here's the test:

val data = Seq(
 (7, "test1", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )
 import spark.implicits._

 val table = "default.tbl"
 spark
 .createDataset(data)
 .toDF("col1", "col2", "col3")
 .write
 .partitionBy("col1")
 .saveAsTable(table)

 val data2 = Seq(
 (7, "test2", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
 )

 spark
.createDataset(data2)
.toDF("col1", "col2", "col3")
 .write
.insertInto(table)

 sql("select * from " + table).show()

+---------+----+----+
|     col2|col3|col1|
+---------+----+----+
|test#test| 0.0|   8|
|    test1| 1.0|   7|
|    test3| 0.0|   9|
|        8|null|   0|
|        9|null|   0|
|        7|null|   1|
+---------+----+----+

No exception was thrown since I only run insertInto not together with 
partitionBy. The data are inserted incorrectly. The issue is related to column 
order. If I change to partitionBy col3, which is the last column in order, it 
works.

val data = Seq(
 (7, "test1", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
)
import spark.implicits._

val table = "default.tbl"
spark
 .createDataset(data)
 .toDF("col1", "col2", "col3")
 .write
 .partitionBy("col3")
 .saveAsTable(table)

val data2 = Seq(
 (7, "test2", 1.0),
 (8, "test#test", 0.0),
 (9, "test3", 0.0)
)

spark
 .createDataset(data2)
 .toDF("col1", "col2", "col3")
 .write
 .insertInto(table)

sql("select * from " + table).show()

+----+---------+----+
|col1|     col2|col3|
+----+---------+----+
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   7|    test1| 1.0|
|   7|    test2| 1.0|
+----+---------+----+

> DataFrameWriter.insertInto inserts incorrect data
> -
>
> Key: SPARK-9278
> URL: https://issues.apache.org/jira/browse/SPARK-9278
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.4.0
> Environment: Linux, S3, Hive Metastore
>Reporter: Steve Lindemann
>Assignee: Cheng Lian
>Priority: Critical
>
> After creating a partitioned Hive table (stored as Parquet) via the 
> DataFrameWriter.createTable command, subsequent attempts to insert additional 
> data into new partitions of this table result in inserting incorrect data 
> rows. Reordering the columns in the data to be written seems to avoid this 
> issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org