[jira] [Created] (SPARK-31872) NotNullSafe to get complementary set
Xiaoju Wu created SPARK-31872:

Summary: NotNullSafe to get complementary set
Key: SPARK-31872
URL: https://issues.apache.org/jira/browse/SPARK-31872
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 2.4.0, 2.3.0, 3.0.0
Reporter: Xiaoju Wu

If we have a filter expression that selects a subset of rows and we then want the complementary set, Not(expression) does not work. Not is NullIntolerant: if expression.eval(row) is null, Not(expression) is null as well, and the filter treats a null predicate as false. As a result, "row" appears in neither the subset nor the complementary set.

So we may need a NotNullSafe implementation for obtaining the complementary set, one that evaluates to true when expression.eval(row) is null.

--
This message was sent by Atlassian Jira (v8.3.4#803005)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
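The null behaviour described in this ticket can be illustrated without Spark. The following is a minimal sketch in plain Python that models SQL three-valued logic with None; the helper names (sql_not, not_null_safe, sql_filter, greater_than_one) are hypothetical and are not Spark APIs.

```python
from typing import Callable, Dict, List, Optional

Row = Dict[str, Optional[int]]


def sql_not(value: Optional[bool]) -> Optional[bool]:
    """Null-intolerant NOT: a null input yields a null output."""
    return None if value is None else not value


def not_null_safe(value: Optional[bool]) -> bool:
    """Proposed null-safe NOT: a null input counts as true."""
    return True if value is None else not value


def sql_filter(rows: List[Row], predicate: Callable[[Row], Optional[bool]]) -> List[Row]:
    """Keep a row only when the predicate is exactly True (null is dropped)."""
    return [row for row in rows if predicate(row) is True]


def greater_than_one(row: Row) -> Optional[bool]:
    """Toy predicate x > 1 that returns null for a null x."""
    x = row["x"]
    return None if x is None else x > 1


rows: List[Row] = [{"x": 0}, {"x": 2}, {"x": None}]

subset = sql_filter(rows, greater_than_one)
naive_complement = sql_filter(rows, lambda r: sql_not(greater_than_one(r)))
safe_complement = sql_filter(rows, lambda r: not_null_safe(greater_than_one(r)))

# The row with a null x is missing from both subset and naive_complement,
# but it is present in safe_complement.
print(subset, naive_complement, safe_complement)
```

With the null-safe variant, the subset and its complement together cover every input row, which is the property the ticket asks for.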
[jira] [Comment Edited] (SPARK-30443) "Managed memory leak detected" even with no calls to take() or limit()
[ https://issues.apache.org/jira/browse/SPARK-30443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17068290#comment-17068290 ]

Xiaoju Wu edited comment on SPARK-30443 at 3/27/20, 5:50 AM:

I also see this kind of warning in the logs. SPARK-21492 may be related to this warning. Does your code base contain it?

I'm also afraid there could be other consumers that do not release memory themselves but instead let the task release all memory associated with the taskId at the end of the task.

was (Author: xiaojuwu):
Also see this kind of warning logs. SPARK-21492 may relate to this warning. Does your code base contain it?

> "Managed memory leak detected" even with no calls to take() or limit()
>
> Key: SPARK-30443
> URL: https://issues.apache.org/jira/browse/SPARK-30443
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.3.2, 2.4.4, 3.0.0
> Reporter: Luke Richter
> Priority: Major
> Attachments: a.csv.zip, b.csv.zip, c.csv.zip
>
> Our Spark code is causing a "Managed memory leak detected" warning to appear, even though we are not calling take() or limit().
> According to SPARK-14168 https://issues.apache.org/jira/browse/SPARK-14168 managed memory leaks should only be caused by not reading an iterator to completion, i.e. take() or limit()
> Our exact warning text is: "2020-01-06 14:54:59 WARN Executor:66 - Managed memory leak detected; size = 2097152 bytes, TID = 118"
> The size of the managed memory leak is always 2MB.
> I have created a minimal test program that reproduces the warning:
> {code:python}
> import pyspark.sql
> import pyspark.sql.functions as fx
>
>
> def main():
>     builder = pyspark.sql.SparkSession.builder
>     builder = builder.appName("spark-jira")
>     spark = builder.getOrCreate()
>
>     # Read the three attached CSV files with header and schema inference.
>     reader = spark.read
>     reader = reader.format("csv")
>     reader = reader.option("inferSchema", "true")
>     reader = reader.option("header", "true")
>     table_c = reader.load("c.csv")
>     table_a = reader.load("a.csv")
>     table_b = reader.load("b.csv")
>
>     # Filter, select and join; nothing calls take() or limit().
>     primary_filter = fx.col("some_code").isNull()
>     new_primary_data = table_a.filter(primary_filter)
>     new_ids = new_primary_data.select("some_id")
>     new_data = table_b.join(new_ids, "some_id")
>     new_data = new_data.select("some_id")
>     result = table_c.join(new_data, "some_id", "left")
>     result.repartition(1).write.json("results.json", mode="overwrite")
>     spark.stop()
>
>
> if __name__ == "__main__":
>     main()
> {code}
> Our code isn't anything out of the ordinary, just some filters, selects and joins.
> The input data is made up of 3 CSV files. The input data files are quite large, roughly 2.6GB in total uncompressed. I attempted to reduce the number of rows in the CSV input files but this caused the warning to no longer appear. After compressing the files I was able to attach them below.
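The concern in the comment above (a consumer that never frees its memory and relies on cleanup at the end of the task) can be modelled with a toy accountant. This is only a sketch in plain Python under that assumption; ToyTaskMemory and its methods are hypothetical names and do not mirror Spark's TaskMemoryManager.

```python
class ToyTaskMemory:
    """Tracks how many bytes each consumer of one task currently holds."""

    def __init__(self, task_id: int) -> None:
        self.task_id = task_id
        self.held = {}  # consumer name -> bytes still held

    def acquire(self, consumer: str, size: int) -> None:
        self.held[consumer] = self.held.get(consumer, 0) + size

    def release(self, consumer: str, size: int) -> None:
        remaining = self.held.get(consumer, 0) - size
        if remaining < 0:
            raise ValueError("released more than was acquired")
        if remaining == 0:
            self.held.pop(consumer, None)
        else:
            self.held[consumer] = remaining

    def clean_up_at_task_end(self) -> int:
        """Free whatever is still held and return the number of leaked bytes."""
        leaked = sum(self.held.values())
        self.held.clear()
        return leaked


PAGE = 2 * 1024 * 1024  # 2097152 bytes, the size quoted in the warning

mem = ToyTaskMemory(task_id=118)
mem.acquire("sorter", PAGE)      # this consumer never releases its page
mem.acquire("hash_map", PAGE)
mem.release("hash_map", PAGE)    # this consumer cleans up after itself

leaked = mem.clean_up_at_task_end()
warning = f"Managed memory leak detected; size = {leaked} bytes, TID = {mem.task_id}"
print(warning)
```

In this model any consumer that skips release() produces the warning at task end, regardless of whether take() or limit() was called.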
[jira] [Commented] (SPARK-30443) "Managed memory leak detected" even with no calls to take() or limit()
[ https://issues.apache.org/jira/browse/SPARK-30443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17068290#comment-17068290 ]

Xiaoju Wu commented on SPARK-30443:

I also see this kind of warning in the logs. SPARK-21492 may be related to this warning. Does your code base contain it?

> "Managed memory leak detected" even with no calls to take() or limit()
>
> Key: SPARK-30443
> URL: https://issues.apache.org/jira/browse/SPARK-30443
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.3.2, 2.4.4, 3.0.0
> Reporter: Luke Richter
> Priority: Major
> Attachments: a.csv.zip, b.csv.zip, c.csv.zip
[jira] [Updated] (SPARK-31069) high cpu caused by chunksBeingTransferred in external shuffle service
[ https://issues.apache.org/jira/browse/SPARK-31069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xiaoju Wu updated SPARK-31069:

Description:
"shuffle-chunk-fetch-handler-2-40" #250 daemon prio=5 os_prio=0 tid=0x02ac nid=0xb9b3 runnable [0x7ff20a1af000]
   java.lang.Thread.State: RUNNABLE
    at java.util.concurrent.ConcurrentHashMap$Traverser.advance(ConcurrentHashMap.java:3339)
    at java.util.concurrent.ConcurrentHashMap$ValueIterator.next(ConcurrentHashMap.java:3439)
    at org.apache.spark.network.server.OneForOneStreamManager.chunksBeingTransferred(OneForOneStreamManager.java:184)
    at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:85)
    at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:51)
    at org.spark_project.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:38)
    at org.spark_project.io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:353)
    at org.spark_project.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
    at org.spark_project.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
    at org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at org.spark_project.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)

"shuffle-chunk-fetch-handler-2-48" #235 daemon prio=5 os_prio=0 tid=0x7ff2302ec800 nid=0xb9ad runnable [0x7ff20a7b4000]
   java.lang.Thread.State: RUNNABLE
    at org.apache.spark.network.server.OneForOneStreamManager.chunksBeingTransferred(OneForOneStreamManager.java:186)
    at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:85)
    at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:51)
    at org.spark_project.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:38)
    at org.spark_project.io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:353)
    at org.spark_project.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
    at org.spark_project.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
    at org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at org.spark_project.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)

> high cpu caused by chunksBeingTransferred in external shuffle service
>
> Key: SPARK-31069
> URL: https://issues.apache.org/jira/browse/SPARK-31069
> Project: Spark
> Issue Type: Improvement
> Components: Shuffle
> Affects Versions: 3.0.0
> Reporter: Xiaoju Wu
> Priority: Major
>
> "shuffle-chunk-fetch-handler-2-40" #250 daemon prio=5 os_prio=0 tid=0x02ac nid=0xb9b3 runnable [0x7ff20a1af000]
>    java.lang.Thread.State: RUNNABLE
>     at java.util.concurrent.ConcurrentHashMap$Traverser.advance(ConcurrentHashMap.java:3339)
>     at java.util.concurrent.ConcurrentHashMap$ValueIterator.next(ConcurrentHashMap.java:3439)
>     at org.apache.spark.network.server.OneForOneStreamManager.chunksBeingTransferred(OneForOneStreamManager.java:184)
>     at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:85)
>     at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:51)
>     at
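Both stack traces above sit in chunksBeingTransferred while it iterates the values of a ConcurrentHashMap, which suggests the count is recomputed by scanning every stream on each fetch request. The sketch below contrasts that scan with a running counter in plain Python. It is an illustrative model only: the class names are hypothetical, and the counter is one possible alternative, not necessarily the change Spark adopted.

```python
class ScanningStreamManager:
    """Recomputes the in-flight chunk count by visiting every stream."""

    def __init__(self) -> None:
        self.streams = {}  # stream id -> chunks currently being transferred

    def chunk_started(self, stream_id: int) -> None:
        self.streams[stream_id] = self.streams.get(stream_id, 0) + 1

    def chunk_finished(self, stream_id: int) -> None:
        self.streams[stream_id] -= 1

    def chunks_being_transferred(self) -> int:
        # O(number of streams) work on every call.
        return sum(self.streams.values())


class CountingStreamManager(ScanningStreamManager):
    """Keeps a running total so the query is O(1)."""

    def __init__(self) -> None:
        super().__init__()
        self.total = 0

    def chunk_started(self, stream_id: int) -> None:
        super().chunk_started(stream_id)
        self.total += 1

    def chunk_finished(self, stream_id: int) -> None:
        super().chunk_finished(stream_id)
        self.total -= 1

    def chunks_being_transferred(self) -> int:
        return self.total


scanning = ScanningStreamManager()
counting = CountingStreamManager()

# Replay the same events against both managers.
for manager in (scanning, counting):
    for stream_id in range(1000):
        manager.chunk_started(stream_id)
        manager.chunk_started(stream_id)
        if stream_id % 2 == 0:
            manager.chunk_finished(stream_id)

print(scanning.chunks_being_transferred(), counting.chunks_being_transferred())
```

Both managers report the same number; the difference is that the scanning version pays for a full traversal per request, which grows with the number of registered streams.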
[jira] [Created] (SPARK-31069) high cpu caused by chunksBeingTransferred in external shuffle service
Xiaoju Wu created SPARK-31069:

Summary: high cpu caused by chunksBeingTransferred in external shuffle service
Key: SPARK-31069
URL: https://issues.apache.org/jira/browse/SPARK-31069
Project: Spark
Issue Type: Improvement
Components: Shuffle
Affects Versions: 3.0.0
Reporter: Xiaoju Wu
[jira] [Commented] (SPARK-23811) FetchFailed comes before Success of same task will cause child stage never succeed
[ https://issues.apache.org/jira/browse/SPARK-23811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006539#comment-17006539 ]

Xiaoju Wu commented on SPARK-23811:

[~XuanYuan] The issue still seems to exist after patch #17955. Any update?

> FetchFailed comes before Success of same task will cause child stage never succeed
>
> Key: SPARK-23811
> URL: https://issues.apache.org/jira/browse/SPARK-23811
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.2.0, 2.3.0
> Reporter: Yuanjian Li
> Priority: Major
> Attachments: 1.png, 2.png
>
> This is a bug caused by the abnormal scenario described below:
> # ShuffleMapTask 1.0 is running; this task will fetch data from ExecutorA.
> # ExecutorA is lost, which triggers `mapOutputTracker.removeOutputsOnExecutor(execId)`, so shuffleStatus changes.
> # Speculative ShuffleMapTask 1.1 starts and gets a FetchFailed immediately.
> # ShuffleMapTask 1.0 finally succeeds, but because of 1.1's FetchFailed, the stage is still marked as a failed stage.
> # ShuffleMapTask 1 is the last task of its stage, so this stage will never succeed because there is no missing task for the DAGScheduler to get.
[jira] [Created] (SPARK-30298) bucket join cannot work for self-join with views
Xiaoju Wu created SPARK-30298:

Summary: bucket join cannot work for self-join with views
Key: SPARK-30298
URL: https://issues.apache.org/jira/browse/SPARK-30298
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.0.0
Reporter: Xiaoju Wu

This unit test may fail at the last assertion:
{code:scala}
test("bucket join cannot work for self-join with views") {
  // Keep the broadcast threshold tiny so the join is not turned into a broadcast join.
  withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
    withTable("t1") {
      val df = (0 until 20).map(i => (i, i)).toDF("i", "j").as("df")
      df.write
        .format("parquet")
        .bucketBy(8, "i")
        .saveAsTable("t1")
      sql("create view v1 as select * from t1").collect()

      // Self-join on the bucketed table: no shuffle is expected.
      val plan1 = sql("SELECT * FROM t1 a JOIN t1 b ON a.i = b.i").queryExecution.executedPlan
      assert(plan1.collect { case exchange: ShuffleExchangeExec => exchange }.isEmpty)

      // Join of the table with a view over itself: this assertion may fail.
      val plan2 = sql("SELECT * FROM t1 a JOIN v1 b ON a.i = b.i").queryExecution.executedPlan
      assert(plan2.collect { case exchange: ShuffleExchangeExec => exchange }.isEmpty)
    }
  }
}
{code}
This happens because the View adds a Project with Alias expressions. The Join's required child distribution is then expressed in terms of the aliases, but ProjectExec passes its child's outputPartitioning up without applying the aliases. As a result, the satisfies check in EnsureRequirements fails and a shuffle is inserted.
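The mismatch described in this ticket can be modelled in a few lines. The sketch below is plain Python, not Spark code: HashPartitioning here is a toy dataclass, the attribute ids ("i#1", "i#7") are made up, and project_partitioning with an alias_aware flag is a hypothetical helper that shows what an alias-aware projection would report.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class HashPartitioning:
    columns: Tuple[str, ...]
    num_partitions: int


def satisfies(output: HashPartitioning, required_columns: List[str]) -> bool:
    """Toy check: the data must already be hashed on exactly the required columns."""
    return output.columns == tuple(required_columns)


def project_partitioning(
    child: HashPartitioning, aliases: Dict[str, str], alias_aware: bool
) -> HashPartitioning:
    """Partitioning reported by a projection that renames child columns."""
    if not alias_aware:
        # Behaviour described in the ticket: pass the child's partitioning up unchanged.
        return child
    renamed = tuple(aliases.get(column, column) for column in child.columns)
    return HashPartitioning(renamed, child.num_partitions)


scan = HashPartitioning(("i#1",), 8)   # bucketed table scan, hashed on i
aliases = {"i#1": "i#7"}               # the view's Project renames i#1 to i#7
required = ["i#7"]                     # the join asks for the aliased column

needs_shuffle_without_aliases = not satisfies(
    project_partitioning(scan, aliases, alias_aware=False), required
)
needs_shuffle_with_aliases = not satisfies(
    project_partitioning(scan, aliases, alias_aware=True), required
)
print(needs_shuffle_without_aliases, needs_shuffle_with_aliases)
```

In this model the shuffle disappears once the projection rewrites the partitioning columns through its aliases, which is the behaviour the test expects.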
[jira] [Commented] (SPARK-30072) Create dedicated planner for subqueries
[ https://issues.apache.org/jira/browse/SPARK-30072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16995627#comment-16995627 ]

Xiaoju Wu commented on SPARK-30072:

[~cloud_fan] If the SQL looks like:

SELECT * FROM df2 WHERE df2.k = (SELECT max(df2.k) FROM df1 JOIN df2 ON df1.k = df2.k AND df2.id < 2)

then the nested subquery "SELECT max(df2.k) FROM df1 JOIN df2 ON df1.k = df2.k AND df2.id < 2" will be run in another QueryExecution, and there is no way to pass the "isSubquery" information to InsertAdaptiveSparkPlan in the nested QueryExecution.

> Create dedicated planner for subqueries
>
> Key: SPARK-30072
> URL: https://issues.apache.org/jira/browse/SPARK-30072
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Ali Afroozeh
> Assignee: Ali Afroozeh
> Priority: Minor
> Fix For: 3.0.0
>
> This PR changes subquery planning by calling the planner and plan preparation rules on the subquery plan directly. Before, we were creating a QueryExecution instance for subqueries to get the executedPlan. This would re-run analysis and optimization on the subquery's plan. Running the analysis again on an optimized query plan can have unwanted consequences, as some rules, for example DecimalPrecision, are not idempotent.
> As an example, consider the expression 1.7 * avg(a), which after applying the DecimalPrecision rule becomes:
> promote_precision(1.7) * promote_precision(avg(a))
> After the optimization, more specifically the constant folding rule, this expression becomes:
> 1.7 * promote_precision(avg(a))
> Now if we run the analyzer on this optimized query again, we will get:
> promote_precision(1.7) * promote_precision(promote_precision(avg(a)))
> Which will later be optimized as:
> 1.7 * promote_precision(promote_precision(avg(a)))
> As can be seen, re-running the analysis and optimization on this expression results in an expression with extra nested promote_precision nodes. Adding unneeded nodes to the plan is problematic because it can eliminate situations where we can reuse the plan.
> We opted to introduce dedicated planners for subqueries, instead of making the DecimalPrecision rule idempotent, because this eliminates this entire category of problems. Another benefit is that planning time for subqueries is reduced.
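The non-idempotence walked through in the quoted description can be reproduced with a toy expression tree. This is a sketch in plain Python: analyze and optimize are simplified stand-ins for the DecimalPrecision rule and constant folding, not the real Catalyst rules, and expressions are modelled as nested tuples.

```python
def analyze(expr):
    """Toy DecimalPrecision: wrap both operands of a multiplication."""
    op, left, right = expr
    return (op, ("promote_precision", left), ("promote_precision", right))


def optimize(expr):
    """Toy constant folding: drop promote_precision around a literal."""
    op, left, right = expr

    def fold(operand):
        is_wrapped = isinstance(operand, tuple) and operand[0] == "promote_precision"
        if is_wrapped and isinstance(operand[1], float):
            return operand[1]
        return operand

    return (op, fold(left), fold(right))


def show(expr) -> str:
    """Render a toy expression in the notation used in the description."""
    if isinstance(expr, tuple):
        if expr[0] == "*":
            return f"{show(expr[1])} * {show(expr[2])}"
        return f"{expr[0]}({show(expr[1])})"
    return str(expr)


expr = ("*", 1.7, "avg(a)")
once = optimize(analyze(expr))    # one pass of analysis and optimization
twice = optimize(analyze(once))   # re-running both on the optimized plan

print(show(once))   # 1.7 * promote_precision(avg(a))
print(show(twice))  # 1.7 * promote_precision(promote_precision(avg(a)))
```

The second pass adds another wrapper around avg(a), so two plans that should be equal no longer compare as equal, which is how plan reuse gets lost.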
[jira] [Commented] (SPARK-30072) Create dedicated planner for subqueries
[ https://issues.apache.org/jira/browse/SPARK-30072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16991554#comment-16991554 ]

Xiaoju Wu commented on SPARK-30072:

[~afroozeh] I think the change from checking whether the queryExecution is equal to checking the isSubquery boolean is not correct in some scenarios. If a subquery is executed under SubqueryExec.executionContext, there is no way to tell the InsertAdaptiveSparkPlan rule that it is a subquery.

> Create dedicated planner for subqueries
>
> Key: SPARK-30072
> URL: https://issues.apache.org/jira/browse/SPARK-30072
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Ali Afroozeh
> Assignee: Ali Afroozeh
> Priority: Minor
> Fix For: 3.0.0
[jira] [Created] (SPARK-30186) support Dynamic Partition Pruning in Adaptive Execution
Xiaoju Wu created SPARK-30186:

Summary: support Dynamic Partition Pruning in Adaptive Execution
Key: SPARK-30186
URL: https://issues.apache.org/jira/browse/SPARK-30186
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.0.0
Reporter: Xiaoju Wu
Fix For: 3.0.0

Currently Adaptive Execution (AE) cannot work if Dynamic Partition Pruning (DPP) is applied:

    private def supportAdaptive(plan: SparkPlan): Boolean = {
      // TODO migrate dynamic-partition-pruning onto adaptive execution.
      sanityCheck(plan) &&
        !plan.logicalLink.exists(_.isStreaming) &&
        // This condition rejects any plan that contains a DPP subquery.
        !plan.expressions.exists(_.find(_.isInstanceOf[DynamicPruningSubquery]).isDefined) &&
        plan.children.forall(supportAdaptive)
    }

This means we cannot get the performance benefits of AE and DPP at the same time. This ticket aims to make DPP and AE work together.
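The effect of the recursive check can be seen on a toy plan tree. The sketch below is plain Python with hypothetical names (PlanNode, supports_adaptive); it models expressions as strings and leaves out sanityCheck, so it only illustrates how one DPP expression anywhere in the tree disables adaptive execution for the whole plan.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PlanNode:
    name: str
    expressions: List[str] = field(default_factory=list)
    children: List["PlanNode"] = field(default_factory=list)
    is_streaming: bool = False


def supports_adaptive(plan: PlanNode) -> bool:
    """Toy version of the check: no streaming source and no DPP expression, recursively."""
    return (
        not plan.is_streaming
        and "DynamicPruningSubquery" not in plan.expressions
        and all(supports_adaptive(child) for child in plan.children)
    )


plain_join = PlanNode(
    "Join",
    children=[PlanNode("Scan fact"), PlanNode("Scan dim")],
)

dpp_join = PlanNode(
    "Join",
    children=[
        PlanNode("Scan fact", expressions=["DynamicPruningSubquery"]),
        PlanNode("Scan dim"),
    ],
)

print(supports_adaptive(plain_join), supports_adaptive(dpp_join))
```

Because the check is applied to every child, a single pruning filter on one scan is enough to turn adaptive execution off for the entire query in this model.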
[jira] [Commented] (SPARK-27290) remove unneed sort under Aggregate
[ https://issues.apache.org/jira/browse/SPARK-27290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862698#comment-16862698 ]

Xiaoju Wu commented on SPARK-27290:

[~joshrosen] Got it. I think we should identify the patterns in which the sort is really needed and fix the UT to be more meaningful.

> remove unneed sort under Aggregate
>
> Key: SPARK-27290
> URL: https://issues.apache.org/jira/browse/SPARK-27290
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Xiaoju Wu
> Priority: Minor
>
> I saw some tickets about removing unneeded sorts from the plan, and I think there is another case in which a sort is redundant:
> A Sort directly under a non-orderPreserving node is redundant, for example:
> {code}
> select count(*) from (select a1 from A order by a2);
> +- Aggregate
>    +- Sort
>       +- FileScan parquet
> {code}
> But one of the existing test cases conflicts with this example:
> {code}
> test("sort should not be removed when there is a node which doesn't guarantee any order") {
>   val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc)
>   val groupedAndResorted = orderedPlan.groupBy('a)(sum('a)).orderBy('a.asc)
>   val optimized = Optimize.execute(groupedAndResorted.analyze)
>   val correctAnswer = groupedAndResorted.analyze
>   comparePlans(optimized, correctAnswer)
> }
> {code}
> Why is it designed like this? In my opinion, since Aggregate won't pass up the ordering, the Sort below it is useless.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
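Whether a sort under an aggregate matters depends on the aggregate function. The sketch below uses plain Python lists rather than Spark: count and first are toy helpers written for this example, and the rows are made-up data. It shows one order-insensitive aggregate and one whose result follows the input order.

```python
rows = [
    {"a1": "x", "a2": 3},
    {"a1": "y", "a2": 1},
    {"a1": "z", "a2": 2},
]

# The inner "order by a2" from the example query.
sorted_rows = sorted(rows, key=lambda row: row["a2"])


def count(input_rows) -> int:
    """Order-insensitive aggregate: the result ignores row order."""
    return len(input_rows)


def first(input_rows, column: str):
    """Order-sensitive aggregate: the result is whichever row arrives first."""
    return input_rows[0][column]


count_unsorted, count_sorted = count(rows), count(sorted_rows)
first_unsorted, first_sorted = first(rows, "a1"), first(sorted_rows, "a1")

print(count_unsorted, count_sorted)   # same value with or without the sort
print(first_unsorted, first_sorted)   # differs once the input is sorted
```

For count(*) the inner sort cannot change the answer, so removing it is safe; for an aggregate that looks at arrival order, removing the sort can change what the user sees, which is one kind of pattern a refined rule and test would need to separate.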
[jira] [Created] (SPARK-27431) move HashedRelation to global UnifiedMemoryManager and enable offheap
Xiaoju Wu created SPARK-27431:

Summary: move HashedRelation to global UnifiedMemoryManager and enable offheap
Key: SPARK-27431
URL: https://issues.apache.org/jira/browse/SPARK-27431
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 2.3.0
Reporter: Xiaoju Wu

Why is HashedRelation currently managed by a newly created memory manager, with off-heap memory disabled, when no TaskMemoryManager is passed in? Can we improve this part?

    /**
     * Create a HashedRelation from an Iterator of InternalRow.
     */
    def apply(
        input: Iterator[InternalRow],
        key: Seq[Expression],
        sizeEstimate: Int = 64,
        taskMemoryManager: TaskMemoryManager = null): HashedRelation = {
      // Fall back to a private, on-heap-only memory manager when none is given.
      val mm = Option(taskMemoryManager).getOrElse {
        new TaskMemoryManager(
          new UnifiedMemoryManager(
            new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
            Long.MaxValue,
            Long.MaxValue / 2,
            1),
          0)
      }

      if (key.length == 1 && key.head.dataType == LongType) {
        LongHashedRelation(input, key, sizeEstimate, mm)
      } else {
        UnsafeHashedRelation(input, key, sizeEstimate, mm)
      }
    }
[jira] [Commented] (SPARK-27290) remove unneed sort under Aggregate
[ https://issues.apache.org/jira/browse/SPARK-27290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16803737#comment-16803737 ]

Xiaoju Wu commented on SPARK-27290:

[~ekoifman] HashAggregate cannot benefit from sorted input, but SortAggregate can. However, SortAggregate requires its input to be sorted by the group-by columns, and EnsureRequirements will insert a SortExec to enforce that. So the user-written sort is useless.

> remove unneed sort under Aggregate
>
> Key: SPARK-27290
> URL: https://issues.apache.org/jira/browse/SPARK-27290
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Xiaoju Wu
> Priority: Minor
>
> I saw some tickets about removing unneeded sorts from the plan, and I think there is another case in which a sort is redundant:
> A Sort directly under a non-orderPreserving node is redundant, for example:
>     select count(*) from (select a1 from A order by a2);
>     +- Aggregate
>        +- Sort
>           +- FileScan parquet
> But one of the existing test cases conflicts with this example:
>     test("sort should not be removed when there is a node which doesn't guarantee any order") {
>       val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc)
>       val groupedAndResorted = orderedPlan.groupBy('a)(sum('a)).orderBy('a.asc)
>       val optimized = Optimize.execute(groupedAndResorted.analyze)
>       val correctAnswer = groupedAndResorted.analyze
>       comparePlans(optimized, correctAnswer)
>     }
> Why is it designed like this? In my opinion, since Aggregate won't pass up the ordering, the Sort below it is useless.
[jira] [Created] (SPARK-27290) remove unneed sort under Aggregate
Xiaoju Wu created SPARK-27290:

Summary: remove unneed sort under Aggregate
Key: SPARK-27290
URL: https://issues.apache.org/jira/browse/SPARK-27290
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 2.4.0
Reporter: Xiaoju Wu

I saw some tickets about removing unneeded sorts from the plan, and I think there is another case in which a sort is redundant:

A Sort directly under a non-orderPreserving node is redundant, for example:

    select count(*) from (select a1 from A order by a2);
    +- Aggregate
       +- Sort
          +- FileScan parquet

But one of the existing test cases conflicts with this example:

    test("sort should not be removed when there is a node which doesn't guarantee any order") {
      val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc)
      val groupedAndResorted = orderedPlan.groupBy('a)(sum('a)).orderBy('a.asc)
      val optimized = Optimize.execute(groupedAndResorted.analyze)
      val correctAnswer = groupedAndResorted.analyze
      comparePlans(optimized, correctAnswer)
    }

Why is it designed like this? In my opinion, since Aggregate won't pass up the ordering, the Sort below it is useless.
[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin
[ https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16794962#comment-16794962 ]

Xiaoju Wu commented on SPARK-21492:

Any updates? Has there been any discussion of a general fix instead of a hack in SMJ?

> Memory leak in SortMergeJoin
>
> Key: SPARK-21492
> URL: https://issues.apache.org/jira/browse/SPARK-21492
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.2.0, 2.3.0, 2.3.1, 3.0.0
> Reporter: Zhan Zhang
> Priority: Major
>
> In SortMergeJoin, if the iterator is not exhausted, there will be a memory leak caused by the Sort. The memory is not released until the task ends and cannot be used by other operators, causing a performance drop or OOM.
[jira] [Commented] (SPARK-25837) Web UI does not respect spark.ui.retainedJobs in some instances
[ https://issues.apache.org/jira/browse/SPARK-25837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16794932#comment-16794932 ]

Xiaoju Wu commented on SPARK-25837:

Did you verify this fix with the reproduction case above? I tried it and found the issue is still there: the cleanup was still backed up, although it was better than in the version without this fix.

> Web UI does not respect spark.ui.retainedJobs in some instances
>
> Key: SPARK-25837
> URL: https://issues.apache.org/jira/browse/SPARK-25837
> Project: Spark
> Issue Type: Bug
> Components: Web UI
> Affects Versions: 2.3.1
> Environment: Reproduction Environment:
> Spark 2.3.1
> Dataproc 1.3-deb9
> 1x master 4 vCPUs, 15 GB
> 2x workers 4 vCPUs, 15 GB
> Reporter: Patrick Brown
> Assignee: Patrick Brown
> Priority: Minor
> Fix For: 2.3.3, 2.4.1, 3.0.0
> Attachments: Screen Shot 2018-10-23 at 4.40.51 PM (1).png
>
> Expected Behavior: Web UI only displays 1 completed job and remains responsive.
> Actual Behavior: Both during job execution and for a noticeable amount of time after all jobs complete, the UI retains many completed jobs, causing limited responsiveness.
>
> To reproduce:
>
> spark-shell --conf spark.ui.retainedJobs=1
>
> scala> import scala.concurrent._
> scala> import scala.concurrent.ExecutionContext.Implicits.global
> scala> for (i <- 0 until 5) { Future { println(sc.parallelize(0 until i).collect.length) } }
>
> The attached screenshot shows the state of the web UI after running the repro code. You can see the UI is displaying some 43k completed jobs (it takes a long time to load). After a few minutes of inactivity this will clear out; however, in an application which continues to submit jobs every once in a while, the issue persists.
>
> The issue seems to appear when running multiple jobs at once as well as in sequence for a while, and may also have something to do with high master CPU usage (thus the collect in the repro code). My rough guess would be that whatever is managing clearing out completed jobs gets overwhelmed (on the master during the repro, htop reported almost full CPU usage across all 4 cores).
[jira] [Commented] (SPARK-23375) Optimizer should remove unneeded Sort
[ https://issues.apache.org/jira/browse/SPARK-23375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16794805#comment-16794805 ]

Xiaoju Wu commented on SPARK-23375:

But one of your test cases conflicts with what I described above:

    test("sort should not be removed when there is a node which doesn't guarantee any order") {
      val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc)
      val groupedAndResorted = orderedPlan.groupBy('a)(sum('a)).orderBy('a.asc)
      val optimized = Optimize.execute(groupedAndResorted.analyze)
      val correctAnswer = groupedAndResorted.analyze
      comparePlans(optimized, correctAnswer)
    }

Why did you design it like this? In my opinion, since Aggregate won't pass up the ordering, the Sort below it is useless.

> Optimizer should remove unneeded Sort
>
> Key: SPARK-23375
> URL: https://issues.apache.org/jira/browse/SPARK-23375
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.3.0
> Reporter: Marco Gaido
> Assignee: Marco Gaido
> Priority: Minor
> Fix For: 2.4.0
>
> As pointed out in SPARK-23368, as of now there is no rule to remove the Sort operator on an already sorted plan, i.e. if we have a query like:
> {code}
> SELECT b
> FROM (
>   SELECT a, b
>   FROM table1
>   ORDER BY a
> ) t
> ORDER BY a
> {code}
> The sort is actually executed twice, even though it is not needed.
[jira] [Commented] (SPARK-23375) Optimizer should remove unneeded Sort
[ https://issues.apache.org/jira/browse/SPARK-23375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16794791#comment-16794791 ]

Xiaoju Wu commented on SPARK-23375:
-----------------------------------

I think there is another case in which the Sort is redundant: a Sort directly under a non-order-preserving node. For example:

  select count(*) from (select a1 from A order by a2);

  +- Aggregate
     +- Sort
        +- FileScan parquet

> Optimizer should remove unneeded Sort
> -------------------------------------
>
> Key: SPARK-23375
> URL: https://issues.apache.org/jira/browse/SPARK-23375
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.3.0
> Reporter: Marco Gaido
> Assignee: Marco Gaido
> Priority: Minor
> Fix For: 2.4.0
>
> As pointed out in SPARK-23368, as of now there is no rule to remove the Sort operator on an already sorted plan, i.e. if we have a query like:
> {code}
> SELECT b
> FROM (
>     SELECT a, b
>     FROM table1
>     ORDER BY a
> ) t
> ORDER BY a
> {code}
> The sort is actually executed twice, even though it is not needed.
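The redundant-Sort case described in this comment can be sketched on a toy plan tree. This is only an illustration and not Catalyst code: the `Plan` classes and the `RemoveSortUnderAggregate` object are hypothetical names, and the sketch assumes every aggregate function is order-insensitive (like `count(*)`), which would not hold for functions such as `first` or `collect_list`.

```scala
// Toy logical plan, NOT Spark's Catalyst classes: every name here is hypothetical.
sealed trait Plan
case class Scan(table: String) extends Plan
case class Sort(order: Seq[String], child: Plan) extends Plan
case class Project(columns: Seq[String], child: Plan) extends Plan
case class Aggregate(groupBy: Seq[String], child: Plan) extends Plan

object RemoveSortUnderAggregate {
  // Rewrites the whole tree. A Sort is dropped only when it feeds an
  // Aggregate, directly or through order-preserving Projects.
  // Assumption: the aggregate functions do not depend on input order.
  def apply(plan: Plan): Plan = plan match {
    case Aggregate(g, child) => Aggregate(g, dropSorts(apply(child)))
    case Sort(o, child)      => Sort(o, apply(child)) // a Sort above an Aggregate stays
    case Project(c, child)   => Project(c, apply(child))
    case s: Scan             => s
  }

  // Strips Sort nodes from the top of a subtree, looking through Projects.
  private def dropSorts(plan: Plan): Plan = plan match {
    case Sort(_, child)    => dropSorts(child)
    case Project(c, child) => Project(c, dropSorts(child))
    case other             => other
  }
}
```

Applied to the plan from the comment, `Aggregate(Nil, Sort(Seq("a2"), Scan("A")))`, the sketch yields `Aggregate(Nil, Scan("A"))`, while a Sort placed on top of an Aggregate is left in place because the Aggregate gives no ordering guarantee.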
[jira] [Created] (SPARK-26779) NullPointerException when disable wholestage codegen
Xiaoju Wu created SPARK-26779:
---------------------------------

  Summary: NullPointerException when disable wholestage codegen
  Key: SPARK-26779
  URL: https://issues.apache.org/jira/browse/SPARK-26779
  Project: Spark
  Issue Type: Bug
  Components: SQL
  Affects Versions: 2.3.0
  Reporter: Xiaoju Wu

When running TPCDSSuite with whole-stage codegen disabled, an NPE is thrown in q9:

java.lang.NullPointerException
    at org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:170)
    at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:613)
    at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:160)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
[jira] [Commented] (SPARK-23839) consider bucket join in cost-based JoinReorder rule
[ https://issues.apache.org/jira/browse/SPARK-23839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628184#comment-16628184 ] Xiaoju Wu commented on SPARK-23839: --- [~smilegator] Is there any plan on the cost-based optimizer? > consider bucket join in cost-based JoinReorder rule > --- > > Key: SPARK-23839 > URL: https://issues.apache.org/jira/browse/SPARK-23839 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Xiaoju Wu >Priority: Minor > > Since spark 2.2, the cost-based JoinReorder rule is implemented and in Spark > 2.3 released, it is improved with histogram. While it doesn't take the cost > of the different join implementations. For example: > TableA JOIN TableB JOIN TableC > TableA will output 10,000 rows after filter and projection. > TableB will output 10,000 rows after filter and projection. > TableC will output 8,000 rows after filter and projection. > The current JoinReorder rule will possibly optimize the plan to join TableC > with TableA firstly and then TableB. But if the TableA and TableB are bucket > tables and can be applied with BucketJoin, it could be a different story. > > Also, to support bucket join of more than 2 tables when table bucket number > is multiple of another (SPARK-17570), whether bucket join can take effect > depends on the result of JoinReorder. For example of "A join B join C" which > has bucket number like 8, 4, 12, JoinReorder rule should optimize the order > to "A join B join C“ to make the bucket join take effect instead of "C join A > join B". > > Based on current CBO JoinReorder, there are possibly 2 part to be changed: > # CostBasedJoinReorder rule is applied in optimizer phase while we do Join > selection in planner phase and bucket join optimization in EnsureRequirements > which is in preparation phase. Both are after optimizer. 
> # Current statistics and join cost formula are based data selectivity and > cardinality, we need to add statistics for present the join method cost like > shuffle, sort, hash and etc. Also we need to add the statistics into the > formula to estimate the join cost. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24088) only HadoopRDD leverage HDFS Cache as preferred location
Xiaoju Wu created SPARK-24088:
---------------------------------

  Summary: only HadoopRDD leverage HDFS Cache as preferred location
  Key: SPARK-24088
  URL: https://issues.apache.org/jira/browse/SPARK-24088
  Project: Spark
  Issue Type: Improvement
  Components: Input/Output
  Affects Versions: 2.3.0
  Reporter: Xiaoju Wu

Only HadoopRDD implements convertSplitLocationInfo, which converts a location to HDFSCacheTaskLocation when the block is cached in DataNode memory. FileScanRDD does not: in FileScanRDD, all split location information is dropped.

  private[spark] def convertSplitLocationInfo(
      infos: Array[SplitLocationInfo]): Option[Seq[String]] = {
    Option(infos).map(_.flatMap { loc =>
      val locationStr = loc.getLocation
      if (locationStr != "localhost") {
        if (loc.isInMemory) {
          logDebug(s"Partition $locationStr is cached by Hadoop.")
          Some(HDFSCacheTaskLocation(locationStr).toString)
        } else {
          Some(HostTaskLocation(locationStr).toString)
        }
      } else {
        None
      }
    })
  }
[jira] [Comment Edited] (SPARK-23839) consider bucket join in cost-based JoinReorder rule
[ https://issues.apache.org/jira/browse/SPARK-23839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1648#comment-1648 ] Xiaoju Wu edited comment on SPARK-23839 at 4/2/18 3:05 PM: --- Yes, bucketing is one of the cases to say that the cost of shuffling and sorting should be taken into consideration of the cost formula. I agree the simple the better. Seems we need to think about a simple solution, which will not lead the join order to worse performance than that optimized after current CostBasedJoinReorder rule. Any suggestion? was (Author: xiaojuwu): Yes, bucketing is one of the cases to say that the cost of shuffling and sorting should be taken into consideration of the cost formula. I agree the simple the better. Seems we need to think about a simple solution, which will not lead the join order to worse performance than that optimized after current CostBasedJoinReorder rule. > consider bucket join in cost-based JoinReorder rule > --- > > Key: SPARK-23839 > URL: https://issues.apache.org/jira/browse/SPARK-23839 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Xiaoju Wu >Priority: Minor > > Since spark 2.2, the cost-based JoinReorder rule is implemented and in Spark > 2.3 released, it is improved with histogram. While it doesn't take the cost > of the different join implementations. For example: > TableA JOIN TableB JOIN TableC > TableA will output 10,000 rows after filter and projection. > TableB will output 10,000 rows after filter and projection. > TableC will output 8,000 rows after filter and projection. > The current JoinReorder rule will possibly optimize the plan to join TableC > with TableA firstly and then TableB. But if the TableA and TableB are bucket > tables and can be applied with BucketJoin, it could be a different story. 
> > Also, to support bucket join of more than 2 tables when table bucket number > is multiple of another (SPARK-17570), whether bucket join can take effect > depends on the result of JoinReorder. For example of "A join B join C" which > has bucket number like 8, 4, 12, JoinReorder rule should optimize the order > to "A join B join C“ to make the bucket join take effect instead of "C join A > join B". > > Based on current CBO JoinReorder, there are possibly 2 part to be changed: > # CostBasedJoinReorder rule is applied in optimizer phase while we do Join > selection in planner phase and bucket join optimization in EnsureRequirements > which is in preparation phase. Both are after optimizer. > # Current statistics and join cost formula are based data selectivity and > cardinality, we need to add statistics for present the join method cost like > shuffle, sort, hash and etc. Also we need to add the statistics into the > formula to estimate the join cost. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23839) consider bucket join in cost-based JoinReorder rule
[ https://issues.apache.org/jira/browse/SPARK-23839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1648#comment-1648 ] Xiaoju Wu commented on SPARK-23839: --- Yes, bucketing is one of the cases to say that the cost of shuffling and sorting should be taken into consideration of the cost formula. I agree the simple the better. Seems we need to think about a simple solution, which will not lead the join order to worse performance than that optimized after current CostBasedJoinReorder rule. > consider bucket join in cost-based JoinReorder rule > --- > > Key: SPARK-23839 > URL: https://issues.apache.org/jira/browse/SPARK-23839 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Xiaoju Wu >Priority: Minor > > Since spark 2.2, the cost-based JoinReorder rule is implemented and in Spark > 2.3 released, it is improved with histogram. While it doesn't take the cost > of the different join implementations. For example: > TableA JOIN TableB JOIN TableC > TableA will output 10,000 rows after filter and projection. > TableB will output 10,000 rows after filter and projection. > TableC will output 8,000 rows after filter and projection. > The current JoinReorder rule will possibly optimize the plan to join TableC > with TableA firstly and then TableB. But if the TableA and TableB are bucket > tables and can be applied with BucketJoin, it could be a different story. > > Also, to support bucket join of more than 2 tables when table bucket number > is multiple of another (SPARK-17570), whether bucket join can take effect > depends on the result of JoinReorder. For example of "A join B join C" which > has bucket number like 8, 4, 12, JoinReorder rule should optimize the order > to "A join B join C“ to make the bucket join take effect instead of "C join A > join B". 
> > Based on current CBO JoinReorder, there are possibly 2 part to be changed: > # CostBasedJoinReorder rule is applied in optimizer phase while we do Join > selection in planner phase and bucket join optimization in EnsureRequirements > which is in preparation phase. Both are after optimizer. > # Current statistics and join cost formula are based data selectivity and > cardinality, we need to add statistics for present the join method cost like > shuffle, sort, hash and etc. Also we need to add the statistics into the > formula to estimate the join cost. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23839) consider bucket join in cost-based JoinReorder rule
[ https://issues.apache.org/jira/browse/SPARK-23839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16422116#comment-16422116 ]

Xiaoju Wu commented on SPARK-23839:
-----------------------------------

[~maropu] My concern is that "always do the bucket join first" doesn't give the best performance every time. In my example above, if C join A produces a very small output that benefits the following join a lot, why not join them first, even though a bucket join can be applied to A and B?

> consider bucket join in cost-based JoinReorder rule
> ---------------------------------------------------
>
> Key: SPARK-23839
> URL: https://issues.apache.org/jira/browse/SPARK-23839
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.3.0
> Reporter: Xiaoju Wu
> Priority: Minor
>
> Since Spark 2.2, the cost-based JoinReorder rule has been implemented, and in Spark 2.3 it was improved with histograms. However, it doesn't take into account the cost of the different join implementations. For example:
> TableA JOIN TableB JOIN TableC
> TableA will output 10,000 rows after filter and projection.
> TableB will output 10,000 rows after filter and projection.
> TableC will output 8,000 rows after filter and projection.
> The current JoinReorder rule will possibly optimize the plan to join TableC with TableA first and then TableB. But if TableA and TableB are bucketed tables and BucketJoin can be applied, it could be a different story.
>
> Also, to support bucket join of more than 2 tables when one table's bucket number is a multiple of another's (SPARK-17570), whether bucket join can take effect depends on the result of JoinReorder. For example, for "A join B join C" with bucket numbers 8, 4, 12, the JoinReorder rule should optimize the order to "A join B join C" to make the bucket join take effect, instead of "C join A join B".
>
> Based on the current CBO JoinReorder, there are possibly 2 parts to be changed:
> # The CostBasedJoinReorder rule is applied in the optimizer phase, while join selection happens in the planner phase and bucket join optimization in EnsureRequirements, which is in the preparation phase. Both are after the optimizer.
> # Current statistics and the join cost formula are based on data selectivity and cardinality; we need to add statistics that represent the cost of the join method, such as shuffle, sort, hash, etc. We also need to add those statistics into the formula to estimate the join cost.
[jira] [Updated] (SPARK-23839) consider bucket join in cost-based JoinReorder rule
[ https://issues.apache.org/jira/browse/SPARK-23839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaoju Wu updated SPARK-23839: -- Description: Since spark 2.2, the cost-based JoinReorder rule is implemented and in Spark 2.3 released, it is improved with histogram. While it doesn't take the cost of the different join implementations. For example: TableA JOIN TableB JOIN TableC TableA will output 10,000 rows after filter and projection. TableB will output 10,000 rows after filter and projection. TableC will output 8,000 rows after filter and projection. The current JoinReorder rule will possibly optimize the plan to join TableC with TableA firstly and then TableB. But if the TableA and TableB are bucket tables and can be applied with BucketJoin, it could be a different story. Also, to support bucket join of more than 2 tables when table bucket number is multiple of another (SPARK-17570), whether bucket join can take effect depends on the result of JoinReorder. For example of "A join B join C" which has bucket number like 8, 4, 12, JoinReorder rule should optimize the order to "A join B join C“ to make the bucket join take effect instead of "C join A join B". Based on current CBO JoinReorder, there are possibly 2 part to be changed: # CostBasedJoinReorder rule is applied in optimizer phase while we do Join selection in planner phase and bucket join optimization in EnsureRequirements which is in preparation phase. Both are after optimizer. # Current statistics and join cost formula are based data selectivity and cardinality, we need to add statistics for present the join method cost like shuffle, sort, hash and etc. Also we need to add the statistics into the formula to estimate the join cost. was: Since spark 2.2, the cost-based JoinReorder rule is implemented and in Spark 2.3 released, it is improved with histogram. While it doesn't take the cost of the different join implementations. 
For example: TableA JOIN TableB JOIN TableC TableA will output 10,000 rows after filter and projection. TableB will output 10,000 rows after filter and projection. TableC will output 8,000 rows after filter and projection. The current JoinReorder rule will possibly optimize the plan to join TableC with TableA firstly and then TableB. But if the TableA and TableB are bucket tables and can be applied with BucketJoin, it could be a different story. Also, to support bucket join of more than 2 tables when table bucket number is multiple of another (SPARK-17570), whether bucket join can take effect depends on the result of JoinReorder. For example of "a join b join c" which has bucket number like 8, 12, 4, JoinReorder rule should optimize the order to "c join a join b“ to make the bucket join take effect. Based on current CBO JoinReorder, there are possibly 2 part to be changed: # CostBasedJoinReorder rule is applied in optimizer phase while we do Join selection in planner phase and bucket join optimization in EnsureRequirements which is in preparation phase. Both are after optimizer. # Current statistics and join cost formula are based data selectivity and cardinality, we need to add statistics for present the join method cost like shuffle, sort, hash and etc. Also we need to add the statistics into the formula to estimate the join cost. > consider bucket join in cost-based JoinReorder rule > --- > > Key: SPARK-23839 > URL: https://issues.apache.org/jira/browse/SPARK-23839 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Xiaoju Wu >Priority: Minor > > Since spark 2.2, the cost-based JoinReorder rule is implemented and in Spark > 2.3 released, it is improved with histogram. While it doesn't take the cost > of the different join implementations. For example: > TableA JOIN TableB JOIN TableC > TableA will output 10,000 rows after filter and projection. > TableB will output 10,000 rows after filter and projection. 
> TableC will output 8,000 rows after filter and projection. > The current JoinReorder rule will possibly optimize the plan to join TableC > with TableA firstly and then TableB. But if the TableA and TableB are bucket > tables and can be applied with BucketJoin, it could be a different story. > > Also, to support bucket join of more than 2 tables when table bucket number > is multiple of another (SPARK-17570), whether bucket join can take effect > depends on the result of JoinReorder. For example of "A join B join C" which > has bucket number like 8, 4, 12, JoinReorder rule should optimize the order > to "A join B join C“ to make the bucket join take effect instead of "C join A > join B". > > Based on current CBO JoinReorder, there are possibly 2 part to be changed: > #
[jira] [Commented] (SPARK-23839) consider bucket join in cost-based JoinReorder rule
[ https://issues.apache.org/jira/browse/SPARK-23839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16421613#comment-16421613 ] Xiaoju Wu commented on SPARK-23839: --- Any discussion or ticket already related to this topic please let me know. > consider bucket join in cost-based JoinReorder rule > --- > > Key: SPARK-23839 > URL: https://issues.apache.org/jira/browse/SPARK-23839 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Xiaoju Wu >Priority: Minor > > Since spark 2.2, the cost-based JoinReorder rule is implemented and in Spark > 2.3 released, it is improved with histogram. While it doesn't take the cost > of the different join implementations. For example: > TableA JOIN TableB JOIN TableC > TableA will output 10,000 rows after filter and projection. > TableB will output 10,000 rows after filter and projection. > TableC will output 8,000 rows after filter and projection. > The current JoinReorder rule will possibly optimize the plan to join TableC > with TableA firstly and then TableB. But if the TableA and TableB are bucket > tables and can be applied with BucketJoin, it could be a different story. > > Also, to support bucket join of more than 2 tables when table bucket number > is multiple of another (SPARK-17570), whether bucket join can take effect > depends on the result of JoinReorder. For example of "a join b join c" which > has bucket number like 8, 12, 4, JoinReorder rule should optimize the order > to "c join a join b“ to make the bucket join take effect. > > Based on current CBO JoinReorder, there are possibly 2 part to be changed: > # CostBasedJoinReorder rule is applied in optimizer phase while we do Join > selection in planner phase and bucket join optimization in EnsureRequirements > which is in preparation phase. Both are after optimizer. 
> # Current statistics and join cost formula are based data selectivity and > cardinality, we need to add statistics for present the join method cost like > shuffle, sort, hash and etc. Also we need to add the statistics into the > formula to estimate the join cost. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23839) consider bucket join in cost-based JoinReorder rule
Xiaoju Wu created SPARK-23839:
---------------------------------

  Summary: consider bucket join in cost-based JoinReorder rule
  Key: SPARK-23839
  URL: https://issues.apache.org/jira/browse/SPARK-23839
  Project: Spark
  Issue Type: Improvement
  Components: SQL
  Affects Versions: 2.3.0
  Reporter: Xiaoju Wu

Since Spark 2.2, the cost-based JoinReorder rule has been implemented, and in Spark 2.3 it was improved with histograms. However, it doesn't take into account the cost of the different join implementations. For example:

TableA JOIN TableB JOIN TableC
TableA will output 10,000 rows after filter and projection.
TableB will output 10,000 rows after filter and projection.
TableC will output 8,000 rows after filter and projection.

The current JoinReorder rule will possibly optimize the plan to join TableC with TableA first and then TableB. But if TableA and TableB are bucketed tables and BucketJoin can be applied, it could be a different story.

Also, to support bucket join of more than 2 tables when one table's bucket number is a multiple of another's (SPARK-17570), whether bucket join can take effect depends on the result of JoinReorder. For example, for "a join b join c" with bucket numbers 8, 12, 4, the JoinReorder rule should optimize the order to "c join a join b" to make the bucket join take effect.

Based on the current CBO JoinReorder, there are possibly 2 parts to be changed:
# The CostBasedJoinReorder rule is applied in the optimizer phase, while join selection happens in the planner phase and bucket join optimization in EnsureRequirements, which is in the preparation phase. Both are after the optimizer.
# Current statistics and the join cost formula are based on data selectivity and cardinality; we need to add statistics that represent the cost of the join method, such as shuffle, sort, hash, etc. We also need to add those statistics into the formula to estimate the join cost.
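To make the TableA/TableB/TableC example concrete, here is a minimal sketch of the kind of extra term a join-method-aware cost could carry. It is not Spark's cost formula: `Table`, `firstJoinShuffledRows` and the "rows shuffled" metric are assumptions made for illustration, and only the first join of each order is modelled because the issue gives no intermediate result sizes.

```scala
object JoinShuffleSketch {
  // Hypothetical stand-in for a relation with its estimated row count.
  case class Table(name: String, rows: Long)

  // Rows that must be shuffled for the first join of an order.
  // Assumption: a pair listed in bucketCompatible joins without any exchange,
  // otherwise both sides are fully shuffled.
  def firstJoinShuffledRows(
      left: Table,
      right: Table,
      bucketCompatible: Set[Set[String]]): Long =
    if (bucketCompatible.contains(Set(left.name, right.name))) 0L
    else left.rows + right.rows

  // Row counts taken from the example in the issue description.
  val a: Table = Table("TableA", 10000L)
  val b: Table = Table("TableB", 10000L)
  val c: Table = Table("TableC", 8000L)

  // Assumption: only TableA and TableB are bucketed compatibly.
  val compatible: Set[Set[String]] = Set(Set("TableA", "TableB"))

  val cFirst: Long = firstJoinShuffledRows(c, a, compatible)  // order starting with C join A
  val abFirst: Long = firstJoinShuffledRows(a, b, compatible) // order starting with A join B
}
```

Under these assumptions the order that starts with C join A shuffles 18,000 rows for its first join, while the order that starts with A join B shuffles none. A cardinality-only formula cannot see that difference, which is the gap the second bullet above points at.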
[jira] [Commented] (SPARK-17570) Avoid Hash and Exchange in Sort Merge join if bucketing factor is multiple for tables
[ https://issues.apache.org/jira/browse/SPARK-17570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410752#comment-16410752 ]

Xiaoju Wu commented on SPARK-17570:
-----------------------------------

[~tejasp] When you join 3 tables with bucket numbers 4, 8 and 12, whether the bucket join applies depends on the join order. Does that mean changes to the join reordering rule are needed?

> Avoid Hash and Exchange in Sort Merge join if bucketing factor is multiple for tables
> -------------------------------------------------------------------------------------
>
> Key: SPARK-17570
> URL: https://issues.apache.org/jira/browse/SPARK-17570
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Affects Versions: 2.0.0
> Reporter: Tejas Patil
> Priority: Minor
>
> In case of bucketed tables, Spark will avoid doing `Sort` and `Exchange` if the input tables and output table has same number of buckets. However, unequal bucketing will always lead to `Sort` and `Exchange`. If the number of buckets in the output table is a factor of the buckets in the input table, we should be able to avoid `Sort` and `Exchange` and directly join those.
> eg.
> Assume Input1, Input2 and Output be bucketed + sorted tables over the same columns but with different number of buckets. Input1 has 8 buckets, Input1 has 4 buckets and Output has 4 buckets. Since hash-partitioning is done using Modulus, if we JOIN buckets (0, 4) of Input1 and buckets (0, 4, 8) of Input2 in the same task, it would give the bucket 0 of output table.
> {noformat}
> Input1   (0, 4)      (1, 3)      (2, 5)       (3, 7)
> Input2   (0, 4, 8)   (1, 3, 9)   (2, 5, 10)   (3, 7, 11)
> Output   (0)         (1)         (2)          (3)
> {noformat}
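The modulus argument in the quoted description can be checked with a few lines of arithmetic. The sketch below is an illustration only: `BucketCoalesceSketch` and `groupBuckets` are hypothetical names, and it assumes a row with non-negative hash h lands in bucket h % n, so that whenever m divides n, (h % n) % m equals h % m.

```scala
object BucketCoalesceSketch {
  // Groups the bucket ids of an input table by the output bucket they map to.
  // Assumption: bucket id = hash % bucketCount with a non-negative hash, so for
  // outputBuckets dividing inputBuckets, input bucket i feeds output bucket
  // i % outputBuckets.
  def groupBuckets(inputBuckets: Int, outputBuckets: Int): Map[Int, Seq[Int]] = {
    require(outputBuckets > 0 && inputBuckets % outputBuckets == 0,
      s"$outputBuckets must divide $inputBuckets")
    (0 until inputBuckets).toList.groupBy(_ % outputBuckets)
  }
}
```

With these assumptions, `groupBuckets(8, 4)` maps output bucket 0 to input buckets (0, 4) and output bucket 1 to (1, 5), and `groupBuckets(12, 4)` maps output bucket 0 to (0, 4, 8). The first group matches the noformat table quoted above; the remaining groups follow plain modulus and differ slightly from the illustrative numbers in that table.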
[jira] [Issue Comment Deleted] (SPARK-17495) Hive hash implementation
[ https://issues.apache.org/jira/browse/SPARK-17495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xiaoju Wu updated SPARK-17495:
------------------------------

Comment: was deleted

(was: [~tejasp] I can see HiveHash merged but never used. Seems the using of spark/hive hash is still under discussion, is there any update on this topic?)

> Hive hash implementation
> ------------------------
>
> Key: SPARK-17495
> URL: https://issues.apache.org/jira/browse/SPARK-17495
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Reporter: Tejas Patil
> Assignee: Tejas Patil
> Priority: Minor
>
> Spark internally uses Murmur3Hash for partitioning. This is different from the one used by Hive. For queries which use bucketing this leads to different results if one tries the same query on both engines. For us, we want users to have backward compatibility so that one can switch parts of applications across the engines without observing regressions.
[jira] [Commented] (SPARK-17495) Hive hash implementation
[ https://issues.apache.org/jira/browse/SPARK-17495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387873#comment-16387873 ]

Xiaoju Wu commented on SPARK-17495:
-----------------------------------

[~tejasp] I can see that HiveHash was merged but is never used. It seems the choice between the Spark and Hive hash is still under discussion; is there any update on this topic?

> Hive hash implementation
> ------------------------
>
> Key: SPARK-17495
> URL: https://issues.apache.org/jira/browse/SPARK-17495
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Reporter: Tejas Patil
> Assignee: Tejas Patil
> Priority: Minor
>
> Spark internally uses Murmur3Hash for partitioning. This is different from the one used by Hive. For queries which use bucketing this leads to different results if one tries the same query on both engines. For us, we want users to have backward compatibility so that one can switch parts of applications across the engines without observing regressions.
[jira] [Commented] (SPARK-22469) Accuracy problem in comparison with string and numeric
[ https://issues.apache.org/jira/browse/SPARK-22469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16385156#comment-16385156 ]

Xiaoju Wu commented on SPARK-22469:
-----------------------------------

[~liutang123] Casting Decimal to Double can lose precision, so why did you say "There is no proper decimal type we can pick, using double type is the best we can do."?

> Accuracy problem in comparison with string and numeric
> ------------------------------------------------------
>
> Key: SPARK-22469
> URL: https://issues.apache.org/jira/browse/SPARK-22469
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.2.0
> Reporter: Lijia Liu
> Assignee: Lijia Liu
> Priority: Major
> Fix For: 2.2.1, 2.3.0
>
> {code:sql}
> select '1.5' > 0.5; // Result is NULL in Spark but is true in Hive.
> {code}
> IIUC, we can cast string as double like Hive.
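The precision concern raised in this comment can be seen without Spark at all. The following sketch uses plain Scala `BigDecimal` and `Double`, not Spark's Decimal type, and the two literal values are chosen only for illustration: they differ as decimals but round to the same 64-bit double.

```scala
object DecimalVsDoubleSketch {
  // Illustrative values: 1.5 is exactly representable as a double, and the
  // second value is larger by 1e-20, far below double resolution near 1.5.
  val a: BigDecimal = BigDecimal("1.5")
  val b: BigDecimal = BigDecimal("1.50000000000000000001")

  // Compared as exact decimals, b is strictly greater than a.
  val greaterAsDecimal: Boolean = b > a

  // After converting both sides to Double the difference disappears,
  // so the same comparison no longer holds.
  val greaterAsDouble: Boolean = b.toDouble > a.toDouble
}
```

Here `greaterAsDecimal` is true while `greaterAsDouble` is false, which is the kind of accuracy loss a comparison can suffer once both sides are cast to double.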
[jira] [Resolved] (SPARK-23493) insert-into depends on columns order, otherwise incorrect data inserted
[ https://issues.apache.org/jira/browse/SPARK-23493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xiaoju Wu resolved SPARK-23493.
------------------------------
Resolution: Not A Bug

> insert-into depends on columns order, otherwise incorrect data inserted
> -----------------------------------------------------------------------
>
> Key: SPARK-23493
> URL: https://issues.apache.org/jira/browse/SPARK-23493
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.2.1
> Reporter: Xiaoju Wu
> Priority: Minor
>
> insert-into only works when the partitionBy key columns come last:
>
> val data = Seq(
>   (7, "test1", 1.0),
>   (8, "test#test", 0.0),
>   (9, "test3", 0.0)
> )
> import spark.implicits._
> val table = "default.tbl"
> spark
>   .createDataset(data)
>   .toDF("col1", "col2", "col3")
>   .write
>   .partitionBy("col1")
>   .saveAsTable(table)
> val data2 = Seq(
>   (7, "test2", 1.0),
>   (8, "test#test", 0.0),
>   (9, "test3", 0.0)
> )
> spark
>   .createDataset(data2)
>   .toDF("col1", "col2", "col3")
>   .write
>   .insertInto(table)
> sql("select * from " + table).show()
>
> +---------+----+----+
> |     col2|col3|col1|
> +---------+----+----+
> |test#test| 0.0|   8|
> |    test1| 1.0|   7|
> |    test3| 0.0|   9|
> |        8|null|   0|
> |        9|null|   0|
> |        7|null|   1|
> +---------+----+----+
>
> If you try inserting with SQL, the issue is the same.
>
> val data = Seq(
>   (7, "test1", 1.0),
>   (8, "test#test", 0.0),
>   (9, "test3", 0.0)
> )
> import spark.implicits._
> val table = "default.tbl"
> spark
>   .createDataset(data)
>   .toDF("col1", "col2", "col3")
>   .write
>   .partitionBy("col1")
>   .saveAsTable(table)
> val data2 = Seq(
>   (7, "test2", 1.0),
>   (8, "test#test", 0.0),
>   (9, "test3", 0.0)
> )
> sql("insert into " + table + " values(7,'test2',1.0)")
> sql("select * from " + table).show()
>
> +---------+----+----+
> |     col2|col3|col1|
> +---------+----+----+
> |test#test| 0.0|   8|
> |    test1| 1.0|   7|
> |    test3| 0.0|   9|
> |        7|null|   1|
> +---------+----+----+
>
> No exception was thrown, since I only ran insertInto, not together with
> partitionBy. The data are inserted incorrectly. The issue is related to
> column order. If I change to partitionBy col3, which is the last column, it
> works.
>
> val data = Seq(
>   (7, "test1", 1.0),
>   (8, "test#test", 0.0),
>   (9, "test3", 0.0)
> )
> import spark.implicits._
> val table = "default.tbl"
> spark
>   .createDataset(data)
>   .toDF("col1", "col2", "col3")
>   .write
>   .partitionBy("col3")
>   .saveAsTable(table)
> val data2 = Seq(
>   (7, "test2", 1.0),
>   (8, "test#test", 0.0),
>   (9, "test3", 0.0)
> )
> spark
>   .createDataset(data2)
>   .toDF("col1", "col2", "col3")
>   .write
>   .insertInto(table)
> sql("select * from " + table).show()
>
> +----+---------+----+
> |col1|     col2|col3|
> +----+---------+----+
> |   8|test#test| 0.0|
> |   9|    test3| 0.0|
> |   8|test#test| 0.0|
> |   9|    test3| 0.0|
> |   7|    test1| 1.0|
> |   7|    test2| 1.0|
> +----+---------+----+
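Since the report above shows the partition column moving to the end of the table schema (col2, col3, col1) while insertInto matches columns by position, one possible workaround is to reorder the DataFrame to the table's column order before inserting. The sketch below is an illustration only: `insertIntoByName` is a hypothetical helper, not part of Spark, and it assumes every table column also exists by name in the DataFrame.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical helper (not a Spark API): select the DataFrame's columns in
// the order the target table declares them, then insert by position.
def insertIntoByName(spark: SparkSession, df: DataFrame, table: String): Unit = {
  // For the table in the report this is Array("col2", "col3", "col1"),
  // because the partition column is stored last.
  val tableColumns: Array[String] = spark.table(table).columns

  df.select(tableColumns.map(col): _*) // reorder by name
    .write
    .insertInto(table)                 // positions now line up with the table
}
```

With the data from the report, the call would be `insertIntoByName(spark, spark.createDataset(data2).toDF("col1", "col2", "col3"), table)`. Column names containing dots or other special characters would need extra quoting, which this sketch does not handle.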
[jira] [Commented] (SPARK-23493) insert-into depends on columns order, otherwise incorrect data inserted
[ https://issues.apache.org/jira/browse/SPARK-23493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374266#comment-16374266 ]

Xiaoju Wu commented on SPARK-23493:
-----------------------------------

If that's the case, it should throw an exception that tells users of the API "please set the partition column last", instead of swallowing the issue and silently inserting the data incorrectly, right?
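The fail-fast behavior asked for in the comment above could be approximated on the caller's side. The following is a minimal sketch in plain Scala; `requireSameColumnOrder` is a hypothetical helper and does not reflect what Spark itself does. It compares the column names of the data being written with the table's column names and throws before any insert happens.

```scala
// Hypothetical check (not Spark's behavior): reject an insert when the
// column order of the incoming data differs from the table's column order.
def requireSameColumnOrder(dfColumns: Seq[String], tableColumns: Seq[String]): Unit =
  require(
    dfColumns == tableColumns,
    s"Column order mismatch: data has [${dfColumns.mkString(", ")}] " +
      s"but table expects [${tableColumns.mkString(", ")}]"
  )

// Matching order: passes silently.
requireSameColumnOrder(Seq("col2", "col3", "col1"), Seq("col2", "col3", "col1"))

// Order from the report: throws IllegalArgumentException.
val mismatch = scala.util.Try(
  requireSameColumnOrder(Seq("col1", "col2", "col3"), Seq("col2", "col3", "col1"))
)
println(mismatch.isFailure)
```

In a Spark job the two arguments would typically come from `df.columns.toSeq` and `spark.table(table).columns.toSeq`, called just before `insertInto`.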
[jira] [Commented] (SPARK-23493) insert-into depends on columns order, otherwise incorrect data inserted
[ https://issues.apache.org/jira/browse/SPARK-23493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374172#comment-16374172 ]

Xiaoju Wu commented on SPARK-23493:
-----------------------------------

[~mgaido] "Columns are matched in order while inserting": this is acceptable, but if you look into my case, you will find that it forces the partition key to be placed last among the columns, which seems confusing.
[jira] [Commented] (SPARK-23493) insert-into depends on columns order, otherwise incorrect data inserted
[ https://issues.apache.org/jira/browse/SPARK-23493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374076#comment-16374076 ]

Xiaoju Wu commented on SPARK-23493:
-----------------------------------

This issue is similar to the one described in SPARK-9278. However, it seems the fix in that ticket only disallowed running insertInto together with partitionBy. If I try inserting into a table that already has a partition key, the data are inserted incorrectly.
[jira] [Commented] (SPARK-9278) DataFrameWriter.insertInto inserts incorrect data
[ https://issues.apache.org/jira/browse/SPARK-9278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374073#comment-16374073 ]

Xiaoju Wu commented on SPARK-9278:
----------------------------------

I created a new ticket, SPARK-23493, to track this issue.

> DataFrameWriter.insertInto inserts incorrect data
> -------------------------------------------------
>
> Key: SPARK-9278
> URL: https://issues.apache.org/jira/browse/SPARK-9278
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.4.0
> Environment: Linux, S3, Hive Metastore
> Reporter: Steve Lindemann
> Assignee: Cheng Lian
> Priority: Critical
>
> After creating a partitioned Hive table (stored as Parquet) via the
> DataFrameWriter.createTable command, subsequent attempts to insert additional
> data into new partitions of this table result in inserting incorrect data
> rows. Reordering the columns in the data to be written seems to avoid this
> issue.
[jira] [Updated] (SPARK-23493) insert-into depends on columns order, otherwise incorrect data inserted
[ https://issues.apache.org/jira/browse/SPARK-23493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xiaoju Wu updated SPARK-23493:
------------------------------
Description:

insert-into only works when the partitionBy key columns come last:

val data = Seq(
  (7, "test1", 1.0),
  (8, "test#test", 0.0),
  (9, "test3", 0.0)
)
import spark.implicits._
val table = "default.tbl"
spark
  .createDataset(data)
  .toDF("col1", "col2", "col3")
  .write
  .partitionBy("col1")
  .saveAsTable(table)
val data2 = Seq(
  (7, "test2", 1.0),
  (8, "test#test", 0.0),
  (9, "test3", 0.0)
)
spark
  .createDataset(data2)
  .toDF("col1", "col2", "col3")
  .write
  .insertInto(table)
sql("select * from " + table).show()

+---------+----+----+
|     col2|col3|col1|
+---------+----+----+
|test#test| 0.0|   8|
|    test1| 1.0|   7|
|    test3| 0.0|   9|
|        8|null|   0|
|        9|null|   0|
|        7|null|   1|
+---------+----+----+

If you try inserting with SQL, the issue is the same.

val data = Seq(
  (7, "test1", 1.0),
  (8, "test#test", 0.0),
  (9, "test3", 0.0)
)
import spark.implicits._
val table = "default.tbl"
spark
  .createDataset(data)
  .toDF("col1", "col2", "col3")
  .write
  .partitionBy("col1")
  .saveAsTable(table)
val data2 = Seq(
  (7, "test2", 1.0),
  (8, "test#test", 0.0),
  (9, "test3", 0.0)
)
sql("insert into " + table + " values(7,'test2',1.0)")
sql("select * from " + table).show()

+---------+----+----+
|     col2|col3|col1|
+---------+----+----+
|test#test| 0.0|   8|
|    test1| 1.0|   7|
|    test3| 0.0|   9|
|        7|null|   1|
+---------+----+----+

No exception was thrown, since I only ran insertInto, not together with partitionBy. The data are inserted incorrectly. The issue is related to column order. If I change to partitionBy col3, which is the last column, it works.

val data = Seq(
  (7, "test1", 1.0),
  (8, "test#test", 0.0),
  (9, "test3", 0.0)
)
import spark.implicits._
val table = "default.tbl"
spark
  .createDataset(data)
  .toDF("col1", "col2", "col3")
  .write
  .partitionBy("col3")
  .saveAsTable(table)
val data2 = Seq(
  (7, "test2", 1.0),
  (8, "test#test", 0.0),
  (9, "test3", 0.0)
)
spark
  .createDataset(data2)
  .toDF("col1", "col2", "col3")
  .write
  .insertInto(table)
sql("select * from " + table).show()

+----+---------+----+
|col1|     col2|col3|
+----+---------+----+
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   7|    test1| 1.0|
|   7|    test2| 1.0|
+----+---------+----+
[jira] [Created] (SPARK-23493) insert-into depends on columns order, otherwise incorrect data inserted
Xiaoju Wu created SPARK-23493:
---------------------------------

Summary: insert-into depends on columns order, otherwise incorrect data inserted
Key: SPARK-23493
URL: https://issues.apache.org/jira/browse/SPARK-23493
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.2.1
Reporter: Xiaoju Wu

It seems the issue still exists. Here is the test:

val data = Seq(
  (7, "test1", 1.0),
  (8, "test#test", 0.0),
  (9, "test3", 0.0)
)
import spark.implicits._
val table = "default.tbl"
spark
  .createDataset(data)
  .toDF("col1", "col2", "col3")
  .write
  .partitionBy("col1")
  .saveAsTable(table)
val data2 = Seq(
  (7, "test2", 1.0),
  (8, "test#test", 0.0),
  (9, "test3", 0.0)
)
spark
  .createDataset(data2)
  .toDF("col1", "col2", "col3")
  .write
  .insertInto(table)
sql("select * from " + table).show()

+---------+----+----+
|     col2|col3|col1|
+---------+----+----+
|test#test| 0.0|   8|
|    test1| 1.0|   7|
|    test3| 0.0|   9|
|        8|null|   0|
|        9|null|   0|
|        7|null|   1|
+---------+----+----+

If you try inserting with SQL, the issue is the same.

val data = Seq(
  (7, "test1", 1.0),
  (8, "test#test", 0.0),
  (9, "test3", 0.0)
)
import spark.implicits._
val table = "default.tbl"
spark
  .createDataset(data)
  .toDF("col1", "col2", "col3")
  .write
  .partitionBy("col1")
  .saveAsTable(table)
val data2 = Seq(
  (7, "test2", 1.0),
  (8, "test#test", 0.0),
  (9, "test3", 0.0)
)
sql("insert into " + table + " values(7,'test2',1.0)")
sql("select * from " + table).show()

+---------+----+----+
|     col2|col3|col1|
+---------+----+----+
|test#test| 0.0|   8|
|    test1| 1.0|   7|
|    test3| 0.0|   9|
|        7|null|   1|
+---------+----+----+

No exception was thrown, since I only ran insertInto, not together with partitionBy. The data are inserted incorrectly. The issue is related to column order. If I change to partitionBy col3, which is the last column, it works.

val data = Seq(
  (7, "test1", 1.0),
  (8, "test#test", 0.0),
  (9, "test3", 0.0)
)
import spark.implicits._
val table = "default.tbl"
spark
  .createDataset(data)
  .toDF("col1", "col2", "col3")
  .write
  .partitionBy("col3")
  .saveAsTable(table)
val data2 = Seq(
  (7, "test2", 1.0),
  (8, "test#test", 0.0),
  (9, "test3", 0.0)
)
spark
  .createDataset(data2)
  .toDF("col1", "col2", "col3")
  .write
  .insertInto(table)
sql("select * from " + table).show()

+----+---------+----+
|col1|     col2|col3|
+----+---------+----+
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   7|    test1| 1.0|
|   7|    test2| 1.0|
+----+---------+----+
[jira] [Comment Edited] (SPARK-9278) DataFrameWriter.insertInto inserts incorrect data
[ https://issues.apache.org/jira/browse/SPARK-9278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374040#comment-16374040 ]

Xiaoju Wu edited comment on SPARK-9278 at 2/23/18 7:48 AM:
-----------------------------------------------------------

It seems the issue still exists. Here is the test:

val data = Seq(
  (7, "test1", 1.0),
  (8, "test#test", 0.0),
  (9, "test3", 0.0)
)
import spark.implicits._
val table = "default.tbl"
spark
  .createDataset(data)
  .toDF("col1", "col2", "col3")
  .write
  .partitionBy("col1")
  .saveAsTable(table)
val data2 = Seq(
  (7, "test2", 1.0),
  (8, "test#test", 0.0),
  (9, "test3", 0.0)
)
spark
  .createDataset(data2)
  .toDF("col1", "col2", "col3")
  .write
  .insertInto(table)
sql("select * from " + table).show()

+---------+----+----+
|     col2|col3|col1|
+---------+----+----+
|test#test| 0.0|   8|
|    test1| 1.0|   7|
|    test3| 0.0|   9|
|        8|null|   0|
|        9|null|   0|
|        7|null|   1|
+---------+----+----+

No exception was thrown, since I only ran insertInto, not together with partitionBy. The data are inserted incorrectly. The issue is related to column order. If I change to partitionBy col3, which is the last column in order, it works.

val data = Seq(
  (7, "test1", 1.0),
  (8, "test#test", 0.0),
  (9, "test3", 0.0)
)
import spark.implicits._
val table = "default.tbl"
spark
  .createDataset(data)
  .toDF("col1", "col2", "col3")
  .write
  .partitionBy("col3")
  .saveAsTable(table)
val data2 = Seq(
  (7, "test2", 1.0),
  (8, "test#test", 0.0),
  (9, "test3", 0.0)
)
spark
  .createDataset(data2)
  .toDF("col1", "col2", "col3")
  .write
  .insertInto(table)
sql("select * from " + table).show()

+----+---------+----+
|col1|     col2|col3|
+----+---------+----+
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   7|    test1| 1.0|
|   7|    test2| 1.0|
+----+---------+----+

[~hyukjin.kwon] It is still a problem, though not in the way you reproduced it: the problem occurs when you insert into a partitioned table that already exists.
[jira] [Commented] (SPARK-9278) DataFrameWriter.insertInto inserts incorrect data
[ https://issues.apache.org/jira/browse/SPARK-9278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374040#comment-16374040 ]

Xiaoju Wu commented on SPARK-9278:
----------------------------------

It seems the issue still exists. Here is the test:

val data = Seq(
  (7, "test1", 1.0),
  (8, "test#test", 0.0),
  (9, "test3", 0.0)
)
import spark.implicits._
val table = "default.tbl"
spark
  .createDataset(data)
  .toDF("col1", "col2", "col3")
  .write
  .partitionBy("col1")
  .saveAsTable(table)
val data2 = Seq(
  (7, "test2", 1.0),
  (8, "test#test", 0.0),
  (9, "test3", 0.0)
)
spark
  .createDataset(data2)
  .toDF("col1", "col2", "col3")
  .write
  .insertInto(table)
sql("select * from " + table).show()

+---------+----+----+
|     col2|col3|col1|
+---------+----+----+
|test#test| 0.0|   8|
|    test1| 1.0|   7|
|    test3| 0.0|   9|
|        8|null|   0|
|        9|null|   0|
|        7|null|   1|
+---------+----+----+

No exception was thrown, since I only ran insertInto, not together with partitionBy. The data are inserted incorrectly. The issue is related to column order. If I change to partitionBy col3, which is the last column in order, it works.

val data = Seq(
  (7, "test1", 1.0),
  (8, "test#test", 0.0),
  (9, "test3", 0.0)
)
import spark.implicits._
val table = "default.tbl"
spark
  .createDataset(data)
  .toDF("col1", "col2", "col3")
  .write
  .partitionBy("col3")
  .saveAsTable(table)
val data2 = Seq(
  (7, "test2", 1.0),
  (8, "test#test", 0.0),
  (9, "test3", 0.0)
)
spark
  .createDataset(data2)
  .toDF("col1", "col2", "col3")
  .write
  .insertInto(table)
sql("select * from " + table).show()

+----+---------+----+
|col1|     col2|col3|
+----+---------+----+
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   8|test#test| 0.0|
|   9|    test3| 0.0|
|   7|    test1| 1.0|
|   7|    test2| 1.0|
+----+---------+----+