[GitHub] [spark] gengliangwang commented on a diff in pull request #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
gengliangwang commented on code in PR #36562: URL: https://github.com/apache/spark/pull/36562#discussion_r873559626 ## sql/core/benchmarks/CSVBenchmark-results.txt: ## @@ -2,66 +2,66 @@ Benchmark to measure CSV read/write performance -OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure -Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 12.2.1 +Apple M1 Pro Parsing quoted values:Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -One quoted string 41610 42902 1598 0.0 832194.2 1.0X +One quoted string 16964 16981 15 0.0 339281.1 1.0X Review Comment: Not directly related to this PR, but the M1 Macbook is so fast! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] gengliangwang commented on a diff in pull request #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
gengliangwang commented on code in PR #36562: URL: https://github.com/apache/spark/pull/36562#discussion_r873709851 ## sql/core/benchmarks/CSVBenchmark-results.txt: ## @@ -2,66 +2,66 @@ Benchmark to measure CSV read/write performance -OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure -Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 12.2.1 +Apple M1 Pro Parsing quoted values:Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -One quoted string 41610 42902 1598 0.0 832194.2 1.0X +One quoted string 16964 16981 15 0.0 339281.1 1.0X Review Comment: OK I will try it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] gengliangwang commented on pull request #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
gengliangwang commented on PR #36562: URL: https://github.com/apache/spark/pull/36562#issuecomment-1127654867 @MaxGekk So I will revert the benchmark results in this one. There will be regenerated results in the new benchmark PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] MaxGekk commented on pull request #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
MaxGekk commented on PR #36562: URL: https://github.com/apache/spark/pull/36562#issuecomment-1127610949 > Actually the benchmark doesn't show a significant improvement for the timestamp inference OK. Could you open a JIRA to add new benchmarks for CSV/JSON? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] MaxGekk commented on a diff in pull request #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
MaxGekk commented on code in PR #36562: URL: https://github.com/apache/spark/pull/36562#discussion_r873663460 ## sql/core/benchmarks/CSVBenchmark-results.txt: ## @@ -2,66 +2,66 @@ Benchmark to measure CSV read/write performance -OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure -Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 12.2.1 +Apple M1 Pro Parsing quoted values:Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -One quoted string 41610 42902 1598 0.0 832194.2 1.0X +One quoted string 16964 16981 15 0.0 339281.1 1.0X Review Comment: > the M1 Macbook is so fast! You should use a GA for the benchmark. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] gengliangwang commented on pull request #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
gengliangwang commented on PR #36562: URL: https://github.com/apache/spark/pull/36562#issuecomment-1127600354 @MaxGekk Actually the benchmark doesn't show a significant improvement for the timestamp inference, since the input set consists entirely of valid timestamp strings. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] gengliangwang commented on a diff in pull request #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
gengliangwang commented on code in PR #36562: URL: https://github.com/apache/spark/pull/36562#discussion_r873559626 ## sql/core/benchmarks/CSVBenchmark-results.txt: ## @@ -2,66 +2,66 @@ Benchmark to measure CSV read/write performance -OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure -Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 12.2.1 +Apple M1 Pro Parsing quoted values:Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -One quoted string 41610 42902 1598 0.0 832194.2 1.0X +One quoted string 16964 16981 15 0.0 339281.1 1.0X Review Comment: Not directly related to this PR, but the M1 Macbook is so fast! cc @dbtsai @dongjoon-hyun @viirya @sunchao @huaxingao -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AnywalkerGiser commented on pull request #36566: [SPARK-39176][PYSPARK] Fixed a problem with pyspark serializing pre-1970 datetime in windows
AnywalkerGiser commented on PR #36566: URL: https://github.com/apache/spark/pull/36566#issuecomment-1127516999 @HyukjinKwon I closed the PR in the 3.0 branch (https://github.com/apache/spark/pull/36537) and raised a new PR in the master branch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ulysses-you commented on a diff in pull request #36530: [SPARK-39172][SQL] Remove outer join if all output come from streamed side and buffered side keys exist unique key
ulysses-you commented on code in PR #36530: URL: https://github.com/apache/spark/pull/36530#discussion_r873584492 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala: ## @@ -462,6 +462,7 @@ package object dsl { Window(windowExpressions, partitionSpec, orderSpec, logicalPlan) def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan) Review Comment: sure, will do a cleanup later -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ulysses-you commented on a diff in pull request #36530: [SPARK-39172][SQL] Remove outer join if all output come from streamed side and buffered side keys exist unique key
ulysses-you commented on code in PR #36530: URL: https://github.com/apache/spark/pull/36530#discussion_r873584312 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala: ## @@ -462,6 +462,7 @@ package object dsl { Window(windowExpressions, partitionSpec, orderSpec, logicalPlan) def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan) Review Comment: I looked up the history; it was added at the very beginning of Spark SQL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36530: [SPARK-39172][SQL] Remove outer join if all output come from streamed side and buffered side keys exist unique key
cloud-fan commented on code in PR #36530: URL: https://github.com/apache/spark/pull/36530#discussion_r873577805 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala: ## @@ -268,4 +268,59 @@ class OuterJoinEliminationSuite extends PlanTest { comparePlans(optimized, originalQuery.analyze) } + + test("SPARK-39172: Remove left outer join if only left-side columns being selected and " + Review Comment: Remove left/right outer join if only left/right side columns are selected and the join keys on the other side are unique. ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala: ## @@ -268,4 +268,59 @@ class OuterJoinEliminationSuite extends PlanTest { comparePlans(optimized, originalQuery.analyze) } + + test("SPARK-39172: Remove left outer join if only left-side columns being selected and " + Review Comment: The same to PR title -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36530: [SPARK-39172][SQL] Remove outer join if all output come from streamed side and buffered side keys exist unique key
cloud-fan commented on code in PR #36530: URL: https://github.com/apache/spark/pull/36530#discussion_r873578814 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala: ## @@ -268,4 +268,59 @@ class OuterJoinEliminationSuite extends PlanTest { comparePlans(optimized, originalQuery.analyze) } + + test("SPARK-39172: Remove left outer join if only left-side columns being selected and " + Review Comment: The PR title can be `Remove left/right outer join if only left/right side columns are selected and the join keys on the other side are unique` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36530: [SPARK-39172][SQL] Remove outer join if all output come from streamed side and buffered side keys exist unique key
cloud-fan commented on code in PR #36530: URL: https://github.com/apache/spark/pull/36530#discussion_r873577805 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala: ## @@ -268,4 +268,59 @@ class OuterJoinEliminationSuite extends PlanTest { comparePlans(optimized, originalQuery.analyze) } + + test("SPARK-39172: Remove left outer join if only left-side columns being selected and " + Review Comment: Remove left outer join if only left-side columns are selected and the join keys on the other side are unique. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36530: [SPARK-39172][SQL] Remove outer join if all output come from streamed side and buffered side keys exist unique key
cloud-fan commented on code in PR #36530: URL: https://github.com/apache/spark/pull/36530#discussion_r873576622 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala: ## @@ -462,6 +462,7 @@ package object dsl { Window(windowExpressions, partitionSpec, orderSpec, logicalPlan) def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan) Review Comment: I don't know why we need to accept a `Symbol` here. We can probably do a cleanup later and remove this method. The same goes for `def as(alias: Symbol): NamedExpression = Alias(expr, alias.name)()` in this file. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] EnricoMi commented on pull request #36150: [SPARK-38864][SQL] Add melt / unpivot to Dataset
EnricoMi commented on PR #36150: URL: https://github.com/apache/spark/pull/36150#issuecomment-1127495090 @HyukjinKwon @awdavidson @aray I have addressed all comments, rebased and removed the `[WIP]`. Please let me know what you think. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] EnricoMi commented on pull request #35965: [SPARK-38647][SQL] Add SupportsReportOrdering mix in interface for Scan (DataSourceV2)
EnricoMi commented on PR #35965: URL: https://github.com/apache/spark/pull/35965#issuecomment-1127488590 @sunchao @HyukjinKwon @aokolnychyi @cloud-fan I have addressed comments and rebased. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] gengliangwang commented on a diff in pull request #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
gengliangwang commented on code in PR #36562: URL: https://github.com/apache/spark/pull/36562#discussion_r873559626 ## sql/core/benchmarks/CSVBenchmark-results.txt: ## @@ -2,66 +2,66 @@ Benchmark to measure CSV read/write performance -OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure -Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 12.2.1 +Apple M1 Pro Parsing quoted values:Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -One quoted string 41610 42902 1598 0.0 832194.2 1.0X +One quoted string 16964 16981 15 0.0 339281.1 1.0X Review Comment: Not directly related to this PR, but m1 chips are so fast! cc @dbtsai @dongjoon-hyun @viirya @sunchao @huaxingao -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] gengliangwang commented on pull request #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
gengliangwang commented on PR #36562: URL: https://github.com/apache/spark/pull/36562#issuecomment-1127484622 I will upload the regenerated benchmark results for JSON later, which takes more than 1 hour. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] beliefer commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
beliefer commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r873554764 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ## @@ -419,6 +420,30 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit } } + private def pushDownOffset(plan: LogicalPlan, offset: Int): (LogicalPlan, Boolean) = plan match { +case operation @ ScanOperation(_, filter, sHolder: ScanBuilderHolder) if filter.isEmpty => + val isPushed = PushDownUtils.pushOffset(sHolder.builder, offset) + if (isPushed) { +sHolder.pushedOffset = Some(offset) + } + (operation, isPushed) +case p: Project => + val (newChild, isPushed) = pushDownOffset(p.child, offset) + (p.withNewChildren(Seq(newChild)), isPushed) +case other => (other, false) + } + + def pushDownOffsets(plan: LogicalPlan): LogicalPlan = plan.transform { +// TODO supports push down Limit append Offset or Offset append Limit +case offset @ Offset(IntegerLiteral(n), child) => Review Comment: OK -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] beliefer commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
beliefer commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r873551016 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/ScanBuilder.java: ## @@ -23,7 +23,7 @@ * An interface for building the {@link Scan}. Implementations can mixin SupportsPushDownXYZ * interfaces to do operator push down, and keep the operator push down result in the returned * {@link Scan}. When pushing down operators, the push down order is: - * sample - filter - aggregate - limit - column pruning. + * sample - filter - aggregate - offset - limit or top N - column pruning. Review Comment: OK -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] gengliangwang closed pull request #36557: [SPARK-39190][SQL] Provide query context for decimal precision overflow error when WSCG is off
gengliangwang closed pull request #36557: [SPARK-39190][SQL] Provide query context for decimal precision overflow error when WSCG is off URL: https://github.com/apache/spark/pull/36557 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] gengliangwang commented on pull request #36557: [SPARK-39190][SQL] Provide query context for decimal precision overflow error when WSCG is off
gengliangwang commented on PR #36557: URL: https://github.com/apache/spark/pull/36557#issuecomment-1127456145 Merging to master/3.3 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] gengliangwang closed pull request #36445: [SPARK-39096][SQL] Support MERGE commands with DEFAULT values
gengliangwang closed pull request #36445: [SPARK-39096][SQL] Support MERGE commands with DEFAULT values URL: https://github.com/apache/spark/pull/36445 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] gengliangwang commented on pull request #36445: [SPARK-39096][SQL] Support MERGE commands with DEFAULT values
gengliangwang commented on PR #36445: URL: https://github.com/apache/spark/pull/36445#issuecomment-1127454108 Thanks, merging to master! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AnywalkerGiser opened a new pull request, #36566: [SPARK-39176][PYSPARK] Fixed a problem with pyspark serializing pre-1970 datetime in windows
AnywalkerGiser opened a new pull request, #36566: URL: https://github.com/apache/spark/pull/36566 ### What changes were proposed in this pull request? Fix problems with pyspark in Windows: 1. Fixed datetime conversion to timestamp before 1970; 2. Fixed datetime conversion when timestamp is negative; 3. Adding a test script. ### Why are the changes needed? Pyspark has problems serializing pre-1970 times in Windows. An exception occurs when executing the following code under Windows: ```python rdd = sc.parallelize([('a', datetime(1957, 1, 9, 0, 0)), ('b', datetime(2014, 1, 27, 0, 0))]) df = spark.createDataFrame(rdd, ["id", "date"]) df.show() df.printSchema() print(df.collect()) ``` ```bash File "...\spark\python\lib\pyspark.zip\pyspark\sql\types.py", line 195, in toInternal else time.mktime(dt.timetuple())) OverflowError: mktime argument out of range at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503) at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638) at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) at org.apache.spark.rdd.RDD.iterator(RDD.scala:313) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ... 1 more ``` and ```bash File ...\spark\python\lib\pyspark.zip\pyspark\sql\types.py, in fromInternal: Line 207: return datetime.datetime.fromtimestamp(ts // 100).replace(microsecond=ts % 100) OSError: [Errno 22] Invalid argument ``` After updating the code, the above code was run successfully! ```bash +---+---+ | id| date| +---+---+ | a|1957-01-08 16:00:00| | b|2014-01-26 16:00:00| +---+---+ root |-- id: string (nullable = true) |-- date: timestamp (nullable = true) [Row(id='a', date=datetime.datetime(1957, 1, 9, 0, 0)), Row(id='b', date=datetime.datetime(2014, 1, 27, 0, 0))] ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New and existing test suites -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36530: [SPARK-39172][SQL] Remove outer join if all output come from streamed side and buffered side keys exist unique key
cloud-fan commented on code in PR #36530: URL: https://github.com/apache/spark/pull/36530#discussion_r873520469 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala: ## @@ -268,4 +268,54 @@ class OuterJoinEliminationSuite extends PlanTest { comparePlans(optimized, originalQuery.analyze) } + + test("SPARK-39172: Remove outer join if all output come from streamed side and buffered side " + +"keys exist unique key") { +val x = testRelation.subquery(Symbol("x")) +val y = testRelation1.subquery(Symbol("y")) + +// left outer +comparePlans(Optimize.execute( + x.join(y.groupBy($"d")($"d"), LeftOuter, Some($"a" === $"d")) +.select($"a", $"b", $"c").analyze), + x.select($"a", $"b", $"c").analyze +) + +comparePlans(Optimize.execute( + x.join(y.groupBy($"d")($"d", count($"d").as("x")), LeftOuter, +Some($"a" === $"d" && $"b" === $"x")) +.select($"a", $"b", $"c").analyze), + x.select($"a", $"b", $"c").analyze +) + +// right outer +comparePlans(Optimize.execute( + x.groupBy($"a")($"a").join(y, RightOuter, Some($"a" === $"d")) +.select($"d", $"e", $"f").analyze), + y.select($"d", $"e", $"f").analyze +) + +comparePlans(Optimize.execute( + x.groupBy($"a")($"a", count($"a").as("x")).join(y, RightOuter, +Some($"a" === $"d" && $"x" === $"e")) +.select($"d", $"e", $"f").analyze), + y.select($"d", $"e", $"f").analyze +) + +// negative case +// not a equi-join +val p1 = x.join(y.groupBy($"d")($"d"), LeftOuter, Some($"a" > $"d")) + .select($"a").analyze +comparePlans(Optimize.execute(p1), p1) + +// do not exist unique key +val p2 = x.join(y.groupBy($"d", $"e")($"d", $"e"), LeftOuter, Some($"a" === $"d")) + .select($"a").analyze +comparePlans(Optimize.execute(p2), p2) + +// output comes from buffered side Review Comment: ```suggestion // output comes from the right side of a left outer join ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36530: [SPARK-39172][SQL] Remove outer join if all output come from streamed side and buffered side keys exist unique key
cloud-fan commented on code in PR #36530: URL: https://github.com/apache/spark/pull/36530#discussion_r873517906 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala: ## @@ -268,4 +268,54 @@ class OuterJoinEliminationSuite extends PlanTest { comparePlans(optimized, originalQuery.analyze) } + + test("SPARK-39172: Remove outer join if all output come from streamed side and buffered side " + Review Comment: please update the test name as well ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala: ## @@ -268,4 +268,54 @@ class OuterJoinEliminationSuite extends PlanTest { comparePlans(optimized, originalQuery.analyze) } + + test("SPARK-39172: Remove outer join if all output come from streamed side and buffered side " + +"keys exist unique key") { +val x = testRelation.subquery(Symbol("x")) Review Comment: ```suggestion val x = testRelation.subquery("x") ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36530: [SPARK-39172][SQL] Remove outer join if all output come from streamed side and buffered side keys exist unique key
cloud-fan commented on code in PR #36530: URL: https://github.com/apache/spark/pull/36530#discussion_r873517334 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala: ## @@ -139,6 +139,17 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { * SELECT t1.c1, max(t1.c2) FROM t1 GROUP BY t1.c1 * }}} * + * 3. Remove outer join if: + * - For a left outer join with only left-side columns being selected and the right side join + * keys are unique. + * - For a right outer join with only right-side columns being selected and the left side join + * keys are unique. + * + * {{{ + * SELECT t1.* FROM t1 LEFT JOIN (SELECT DISTINCT c1 as c1 FROM t)t2 ON t1.c1 = t2.c1 ==> Review Comment: ```suggestion * SELECT t1.* FROM t1 LEFT JOIN (SELECT DISTINCT c1 as c1 FROM t) t2 ON t1.c1 = t2.c1 ==> ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
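To make the rewrite discussed in this thread concrete, here is a minimal, self-contained sketch (the table names `t1`/`t` and the data are made up; the join elimination itself is the behavior this PR proposes, so stock Spark will still show a Join node in the optimized plan):

```scala
import org.apache.spark.sql.SparkSession

object OuterJoinEliminationExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("oje-example").getOrCreate()
    import spark.implicits._

    Seq((1, "a"), (2, "b"), (2, "c")).toDF("c1", "c2").createOrReplaceTempView("t1")
    Seq((1, 10), (2, 20), (2, 30)).toDF("c1", "c3").createOrReplaceTempView("t")

    // Only t1's columns are selected and the right-side join key is made unique by DISTINCT,
    // so the left outer join can neither drop nor duplicate rows of t1 and can be removed.
    val q = spark.sql(
      "SELECT t1.* FROM t1 LEFT JOIN (SELECT DISTINCT c1 AS c1 FROM t) t2 ON t1.c1 = t2.c1")
    q.explain(true) // with the proposed rule, the optimized plan is just a scan of t1

    spark.stop()
  }
}
```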
[GitHub] [spark] gengliangwang commented on pull request #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
gengliangwang commented on PR #36562: URL: https://github.com/apache/spark/pull/36562#issuecomment-1127443975 @MaxGekk I tried generating the benchmark files for CSV. There is no significant improvement since the timestamp inputs are all valid timestamp strings. Do I need to continue with it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r873515998 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ## @@ -419,6 +420,30 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit } } + private def pushDownOffset(plan: LogicalPlan, offset: Int): (LogicalPlan, Boolean) = plan match { +case operation @ ScanOperation(_, filter, sHolder: ScanBuilderHolder) if filter.isEmpty => + val isPushed = PushDownUtils.pushOffset(sHolder.builder, offset) + if (isPushed) { +sHolder.pushedOffset = Some(offset) + } + (operation, isPushed) +case p: Project => + val (newChild, isPushed) = pushDownOffset(p.child, offset) + (p.withNewChildren(Seq(newChild)), isPushed) +case other => (other, false) + } + + def pushDownOffsets(plan: LogicalPlan): LogicalPlan = plan.transform { +// TODO supports push down Limit append Offset or Offset append Limit +case offset @ Offset(IntegerLiteral(n), child) => Review Comment: We can match offset, limit + offset and offset + limit, similar to the planner rule (after https://github.com/apache/spark/pull/36541) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
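A sketch of matching those three shapes, reusing the `Offset`, `Limit`, and `IntegerLiteral` extractors that already appear in the diff; the helper name and return shape below are invented for illustration and are not the PR's code:

```scala
import org.apache.spark.sql.catalyst.expressions.IntegerLiteral
import org.apache.spark.sql.catalyst.plans.logical.{Limit, LogicalPlan, Offset}

object LimitOffsetExtraction {
  // Returns (optional limit, offset, child) such that "skip `offset` rows, then keep
  // `limit` rows" is equivalent to the original plan in all three cases.
  def extract(plan: LogicalPlan): Option[(Option[Int], Int, LogicalPlan)] = plan match {
    case Offset(IntegerLiteral(o), child) =>
      Some((None, o, child)) // OFFSET o
    case Limit(IntegerLiteral(l), Offset(IntegerLiteral(o), child)) =>
      Some((Some(l), o, child)) // LIMIT l on top of OFFSET o
    case Offset(IntegerLiteral(o), Limit(IntegerLiteral(l), child)) if l > o =>
      Some((Some(l - o), o, child)) // OFFSET o on top of LIMIT l: keep l - o rows after skipping o
    case _ => None
  }
}
```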
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r873511627 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/ScanBuilder.java: ## @@ -23,7 +23,7 @@ * An interface for building the {@link Scan}. Implementations can mixin SupportsPushDownXYZ * interfaces to do operator push down, and keep the operator push down result in the returned * {@link Scan}. When pushing down operators, the push down order is: - * sample - filter - aggregate - limit - column pruning. + * sample - filter - aggregate - offset - limit or top N - column pruning. Review Comment: top n is a bit tricky as it's sort + limit. how about `aggregate -> limit/top-N(sort + limit) -> offset`? the order of limit and offset doesn't matter as we can always switch the order and adjust the value. And this order matches the physical plan more. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
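The "switch the order and adjust the value" equivalence can be checked with plain collections (illustration only, not Spark code), treating `drop` as OFFSET and `take` as LIMIT:

```scala
// Skipping o rows and then keeping l rows equals keeping l + o rows and then skipping o,
// so either operator order can be pushed down once the values are adjusted.
val rows = (1 to 10).toList
val (o, l) = (3, 4)
assert(rows.drop(o).take(l) == rows.take(l + o).drop(o)) // both yield List(4, 5, 6, 7)
```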
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r873511627 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/ScanBuilder.java: ## @@ -23,7 +23,7 @@ * An interface for building the {@link Scan}. Implementations can mixin SupportsPushDownXYZ * interfaces to do operator push down, and keep the operator push down result in the returned * {@link Scan}. When pushing down operators, the push down order is: - * sample - filter - aggregate - limit - column pruning. + * sample - filter - aggregate - offset - limit or top N - column pruning. Review Comment: top n is a bit tricky as it's sort + limit. how about `aggregate -> limit/top-N(sort + limit) -> offset`? the order of limit and offset doesn't matter as we can always switch the order and adjust the value. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r873511627 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/ScanBuilder.java: ## @@ -23,7 +23,7 @@ * An interface for building the {@link Scan}. Implementations can mixin SupportsPushDownXYZ * interfaces to do operator push down, and keep the operator push down result in the returned * {@link Scan}. When pushing down operators, the push down order is: - * sample - filter - aggregate - limit - column pruning. + * sample - filter - aggregate - offset - limit or top N - column pruning. Review Comment: top n is a bit tricky as it's sort + limit. how about `aggregate -> limit/top-n(sort + limit) -> offset`? the order of limit and offset doesn't matter as we can always switch the order and adjust the value. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36531: [SPARK-39171][SQL] Unify the Cast expression
cloud-fan commented on code in PR #36531: URL: https://github.com/apache/spark/pull/36531#discussion_r873508975 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala: ## @@ -275,6 +376,55 @@ object Cast { case _ => null } } + + // Show suggestion on how to complete the disallowed explicit casting with built-in type + // conversion functions. + private def suggestionOnConversionFunctions ( + from: DataType, + to: DataType, + functionNames: String): String = { +// scalastyle:off line.size.limit +s"""cannot cast ${from.catalogString} to ${to.catalogString}. + |To convert values from ${from.catalogString} to ${to.catalogString}, you can use $functionNames instead. + |""".stripMargin +// scalastyle:on line.size.limit + } + + def typeCheckFailureMessage( + from: DataType, + to: DataType, + fallbackConfKey: Option[String], + fallbackConfValue: Option[String]): String = +(from, to) match { + case (_: NumericType, TimestampType) => +suggestionOnConversionFunctions(from, to, + "functions TIMESTAMP_SECONDS/TIMESTAMP_MILLIS/TIMESTAMP_MICROS") + + case (TimestampType, _: NumericType) => +suggestionOnConversionFunctions(from, to, "functions UNIX_SECONDS/UNIX_MILLIS/UNIX_MICROS") + + case (_: NumericType, DateType) => +suggestionOnConversionFunctions(from, to, "function DATE_FROM_UNIX_DATE") + + case (DateType, _: NumericType) => +suggestionOnConversionFunctions(from, to, "function UNIX_DATE") + + // scalastyle:off line.size.limit + case _ if fallbackConfKey.isDefined && fallbackConfValue.isDefined && Cast.canCast(from, to) => +s""" + | cannot cast ${from.catalogString} to ${to.catalogString} with ANSI mode on. + | If you have to cast ${from.catalogString} to ${to.catalogString}, you can set ${fallbackConfKey.get} as ${fallbackConfValue.get}. Review Comment: Now I see the value of `AnsiCast`: it identifies the cast added by the table insertion resolver so that we can provide a different error message here. I think it's a bit overkill to have a class `AnsiCast` for this purpose. We can have a bool `TreeNodeTag` for `Cast` to indicate if it's added by table insertion resolver. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
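A minimal sketch of the `TreeNodeTag` idea suggested above (the tag name and helper object are hypothetical, not code from this PR):

```scala
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.catalyst.trees.TreeNodeTag

object TableInsertionCastTag {
  // Presence of the tag marks a Cast added by the table-insertion resolver, so error
  // reporting can point at a different fallback configuration than for a user-written cast.
  private val BY_TABLE_INSERTION = TreeNodeTag[Unit]("by_table_insertion")

  def mark(c: Cast): Cast = { c.setTagValue(BY_TABLE_INSERTION, ()); c }
  def isMarked(c: Cast): Boolean = c.getTagValue(BY_TABLE_INSERTION).isDefined
}
```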
[GitHub] [spark] AnywalkerGiser closed pull request #36565: [SPARK-39176][PYSPARK] Fixed a problem with pyspark serializing pre-1970 datetime in windows
AnywalkerGiser closed pull request #36565: [SPARK-39176][PYSPARK] Fixed a problem with pyspark serializing pre-1970 datetime in windows URL: https://github.com/apache/spark/pull/36565 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AnywalkerGiser opened a new pull request, #36565: [SPARK-39176][PYSPARK] Fixed a problem with pyspark serializing pre-1970 datetime in windows
AnywalkerGiser opened a new pull request, #36565: URL: https://github.com/apache/spark/pull/36565 ### What changes were proposed in this pull request? Fix problems with pyspark in Windows: 1. Fixed datetime conversion to timestamp before 1970; 2. Fixed datetime conversion when timestamp is negative; 3. Adding a test script. ### Why are the changes needed? Pyspark has problems serializing pre-1970 times in Windows. An exception occurs when executing the following code under Windows: ```python rdd = sc.parallelize([('a', datetime(1957, 1, 9, 0, 0)), ('b', datetime(2014, 1, 27, 0, 0))]) df = spark.createDataFrame(rdd, ["id", "date"]) df.show() df.printSchema() print(df.collect()) ``` ```bash File "...\spark\python\lib\pyspark.zip\pyspark\sql\types.py", line 195, in toInternal else time.mktime(dt.timetuple())) OverflowError: mktime argument out of range at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503) at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638) at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) at org.apache.spark.rdd.RDD.iterator(RDD.scala:313) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ... 1 more ``` and ```bash File ...\spark\python\lib\pyspark.zip\pyspark\sql\types.py, in fromInternal: Line 207: return datetime.datetime.fromtimestamp(ts // 100).replace(microsecond=ts % 100) OSError: [Errno 22] Invalid argument ``` After updating the code, the above code was run successfully! ```bash +---+---+ | id| date| +---+---+ | a|1957-01-08 16:00:00| | b|2014-01-26 16:00:00| +---+---+ root |-- id: string (nullable = true) |-- date: timestamp (nullable = true) [Row(id='a', date=datetime.datetime(1957, 1, 9, 0, 0)), Row(id='b', date=datetime.datetime(2014, 1, 27, 0, 0))] ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New and existing test suites -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AnywalkerGiser closed pull request #36559: [SPARK-39176][PYSPARK] Fixed a problem with pyspark serializing pre-1970 datetime in windows
AnywalkerGiser closed pull request #36559: [SPARK-39176][PYSPARK] Fixed a problem with pyspark serializing pre-1970 datetime in windows URL: https://github.com/apache/spark/pull/36559 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ulysses-you commented on a diff in pull request #36530: [SPARK-39172][SQL] Remove outer join if all output come from streamed side and buffered side keys exist unique key
ulysses-you commented on code in PR #36530: URL: https://github.com/apache/spark/pull/36530#discussion_r873491017 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala: ## @@ -139,6 +139,14 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { * SELECT t1.c1, max(t1.c2) FROM t1 GROUP BY t1.c1 * }}} * + * 3. Remove outer join if all output comes from streamed side and the join keys from buffered side + * exist unique key. Review Comment: changed it to ``` * 3. Remove outer join if: * - For a left outer join with only left-side columns being selected and the right side join * keys are unique. * - For a right outer join with only right-side columns being selected and the left side join * keys are unique. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AngersZhuuuu opened a new pull request, #36564: [WIP][SPARK-39195][SQL] Spark should use two step update of outputCommitCoordinator
AngersZhuuuu opened a new pull request, #36564: URL: https://github.com/apache/spark/pull/36564 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] gengliangwang commented on pull request #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
gengliangwang commented on PR #36562: URL: https://github.com/apache/spark/pull/36562#issuecomment-1127404489 @MaxGekk Sure. BTW I suggest including this one in the RC2. @sadikovi found that the perf is 30% slower with https://github.com/apache/spark/pull/36362. So this one is actually fixing a perf regression in 3.3. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] yaooqinn commented on pull request #36563: [SPARK-39194][SQL] Add a pre resolution builder for spark session extensions
yaooqinn commented on PR #36563: URL: https://github.com/apache/spark/pull/36563#issuecomment-1127402365 cc @cloud-fan @HyukjinKwon thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] MaxGekk closed pull request #36558: [SPARK-39187][SQL][3.3] Remove `SparkIllegalStateException`
MaxGekk closed pull request #36558: [SPARK-39187][SQL][3.3] Remove `SparkIllegalStateException` URL: https://github.com/apache/spark/pull/36558 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] MaxGekk commented on pull request #36558: [SPARK-39187][SQL][3.3] Remove `SparkIllegalStateException`
MaxGekk commented on PR #36558: URL: https://github.com/apache/spark/pull/36558#issuecomment-1127392276 Merging to 3.3. Thank you, @HyukjinKwon for review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] yaooqinn opened a new pull request, #36563: [SPARK-39194][SQL] Add a pre resolution builder for spark session extensions
yaooqinn opened a new pull request, #36563: URL: https://github.com/apache/spark/pull/36563 ### What changes were proposed in this pull request? This PR aims to introduce an extension point for pre-resolution. ### Why are the changes needed? A pre-resolution phase enables developers or users to do some preparation before the actual resolution phase. For example, the current catalog v2 implementations require setting SQL configurations ahead of time to activate, which is not flexible to use. The current relation resolution always falls back to the current/default catalog, which makes extra resolution rules unable to handle this issue. But with this feature we have the opportunity to inject catalogs early. ### Does this PR introduce _any_ user-facing change? It is a developer-oriented change. ### How was this patch tested? New tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
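For context, a sketch of how such an extension is wired up. Only `injectResolutionRule` is an existing `SparkSessionExtensions` hook; the pre-resolution builder in the commented-out line is the hypothetical API this PR proposes:

```scala
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// A do-nothing rule, standing in for e.g. a rule that registers catalogs early.
case class NoopRule(spark: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectResolutionRule(NoopRule) // existing hook: runs inside the resolution batch
    // extensions.injectPreResolutionRule(NoopRule) // hypothetical hook this PR would add
  }
}

// Enabled with: --conf spark.sql.extensions=<fully qualified name of MyExtensions>
```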
[GitHub] [spark] gengliangwang commented on pull request #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
gengliangwang commented on PR #36562: URL: https://github.com/apache/spark/pull/36562#issuecomment-1127384475 cc @sadikovi who reported this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] zhengruifeng commented on pull request #36560: [SPARK-39192][PYTHON] make pandas-on-spark's kurt consistent with pandas
zhengruifeng commented on PR #36560: URL: https://github.com/apache/spark/pull/36560#issuecomment-1127383653 cc @HyukjinKwon -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] gengliangwang opened a new pull request, #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
gengliangwang opened a new pull request, #36562: URL: https://github.com/apache/spark/pull/36562 ### What changes were proposed in this pull request? When reading JSON/CSV files with inferring timestamp types (`.option("inferTimestamp", true)`), the Timestamp conversion will throw and catch exceptions. As we are putting decent error messages in the exception: ``` def cannotCastToDateTimeError( value: Any, from: DataType, to: DataType, errorContext: String): Throwable = { val valueString = toSQLValue(value, from) new SparkDateTimeException("INVALID_SYNTAX_FOR_CAST", Array(toSQLType(to), valueString, SQLConf.ANSI_ENABLED.key, errorContext)) } ``` the creation of the exceptions is actually not cheap. It consumes more than 90% of the type inference time. We can use the parsing methods which return optional results to avoid creating the exceptions. With this PR, the schema inference time is reduced by 90% in a local benchmark. ### Why are the changes needed? Performance improvement ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing UT. Also manually tested the runtime of inferring the schema of a 624MB JSON file with timestamp inference enabled: ``` spark.read.option("inferTimestamp", true).json(file) ``` - Before the change, it takes 166 seconds - After the change, it takes only 16 seconds. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
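To illustrate the shape of the change in isolation (a minimal Python sketch, not Spark's actual code; the function names are made up): during schema inference most conversion attempts fail, so an API that builds a richly formatted exception on every failure pays that cost for almost every value, while an API that simply returns an empty result does not.
```python
from datetime import datetime
from typing import Optional

def to_timestamp_or_none(value: str) -> Optional[datetime]:
    # Failure is reported as None; no user-facing error message is formatted.
    try:
        return datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        return None

def to_timestamp_or_raise(value: str) -> datetime:
    # Failure builds a detailed exception (analogous to cannotCastToDateTimeError above),
    # which is the expensive path when almost every inference attempt fails.
    parsed = to_timestamp_or_none(value)
    if parsed is None:
        raise ValueError(f"Cannot cast {value!r} to TIMESTAMP")
    return parsed

# Schema inference should call to_timestamp_or_none directly instead of calling
# to_timestamp_or_raise and catching the exception.
```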
[GitHub] [spark] panbingkun opened a new pull request, #36561: [SPARK-37939][SQL] Use error classes in the parsing errors of properties
panbingkun opened a new pull request, #36561: URL: https://github.com/apache/spark/pull/36561 ### What changes were proposed in this pull request? Migrate the following errors in QueryParsingErrors onto the use of error classes: - cannotCleanReservedNamespacePropertyError => UNSUPPORTED_FEATURE.CLEAN_RESERVED_NAMESPACE_PROPERTY - cannotCleanReservedTablePropertyError => UNSUPPORTED_FEATURE.CLEAN_RESERVED_TABLE_PROPERTY - invalidPropertyKeyForSetQuotedConfigurationError => INVALID_PROPERTY_KEY - invalidPropertyValueForSetQuotedConfigurationError => INVALID_PROPERTY_VALUE - propertiesAndDbPropertiesBothSpecifiedError => UNSUPPORTED_FEATURE.PROPERTIES_AND_DBPROPERTIES_BOTH_SPECIFIED_CONFLICT ### Why are the changes needed? Porting the parsing errors of properties to the new error framework improves test coverage and documents expected error messages in tests. ### Does this PR introduce any user-facing change? No ### How was this patch tested? By running the new test: ``` $ build/sbt "sql/testOnly *QueryParsingErrorsSuite*" ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] zhengruifeng commented on pull request #36560: [SPARK-39192][PYTHON] make pandas-on-spark's kurt consistent with pandas
zhengruifeng commented on PR #36560: URL: https://github.com/apache/spark/pull/36560#issuecomment-1127378942 Before this PR: ``` In [2]: df = ps.DataFrame({'a': [1, 2, 3, np.nan], 'b': [0.1, 0.2, 0.3, np.nan]}, columns=['a', 'b']) In [3]: df.kurtosis() Out[3]: a -1.5 b -1.5 dtype: float64 In [4]: df.to_pandas().kurtosis() /d0/Dev/Opensource/spark/python/pyspark/pandas/utils.py:976: PandasAPIOnSparkAdviceWarning: `to_pandas` loads all data into the driver's memory. It should only be used if the resulting pandas DataFrame is expected to be small. warnings.warn(message, PandasAPIOnSparkAdviceWarning) Out[4]: a NaN b NaN dtype: float64 In [5]: df = ps.DataFrame({'a': [1, 2, 3, np.nan, 6], 'b': [0.1, 0.2, 0.3, np.nan, 0.8]}, columns=['a', 'b']) In [6]: df.kurtosis() Out[6]: a -1.00 b -0.839477 dtype: float64 In [7]: df.to_pandas().kurtosis() /d0/Dev/Opensource/spark/python/pyspark/pandas/utils.py:976: PandasAPIOnSparkAdviceWarning: `to_pandas` loads all data into the driver's memory. It should only be used if the resulting pandas DataFrame is expected to be small. warnings.warn(message, PandasAPIOnSparkAdviceWarning) Out[7]: a 1.50 b 2.703924 dtype: float64 ``` After this PR, pandas and pandas-on-spark return the same results. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] zhengruifeng opened a new pull request, #36560: [SPARK-39192][PYTHON] make pandas-on-spark's kurt consistent with pandas
zhengruifeng opened a new pull request, #36560: URL: https://github.com/apache/spark/pull/36560 ### What changes were proposed in this pull request? Make pandas-on-spark's kurt consistent with pandas. ### Why are the changes needed? 1. The formulas for kurtosis differ between Spark SQL and pandas; 2. pandas zeros out a small `numerator` and `denominator` for better numerical stability. ### Does this PR introduce _any_ user-facing change? Yes, the logic of kurt changed. ### How was this patch tested? Added UT. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
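For reference, a small sketch of the two estimators involved, assuming pandas' `kurt` uses the standard bias-adjusted excess kurtosis; the expected values below match the numbers quoted in the comment further up.
```python
import numpy as np

def excess_kurtosis_biased(x):
    # m4 / m2**2 - 3, computed from population-style central moments.
    x = np.asarray(x, dtype=float)
    d = x - x.mean()
    m2 = np.mean(d ** 2)
    m4 = np.mean(d ** 4)
    return m4 / m2 ** 2 - 3.0

def excess_kurtosis_unbiased(x):
    # Bias-adjusted estimator, which is what pandas' kurt/kurtosis reports.
    n = len(x)
    g2 = excess_kurtosis_biased(x)
    return ((n + 1) * g2 + 6) * (n - 1) / ((n - 2) * (n - 3))

vals = [1.0, 2.0, 3.0, 6.0]  # the non-null values of column 'a' in the comment above
print(excess_kurtosis_biased(vals))    # -1.0, what pandas-on-spark returned before this PR
print(excess_kurtosis_unbiased(vals))  # 1.5, what pandas returns
```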
[GitHub] [spark] AmplabJenkins commented on pull request #36559: [SPARK-39176][PYSPARK] Fixed a problem with pyspark serializing pre-1970 datetime in windows
AmplabJenkins commented on PR #36559: URL: https://github.com/apache/spark/pull/36559#issuecomment-1127364481 Can one of the admins verify this patch? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] Yikun commented on a diff in pull request #36464: [SPARK-38947][PYTHON] Supports groupby positional indexing
Yikun commented on code in PR #36464: URL: https://github.com/apache/spark/pull/36464#discussion_r873437306 ## python/pyspark/pandas/groupby.py: ## @@ -2110,22 +2110,60 @@ def _limit(self, n: int, asc: bool) -> FrameLike: groupkey_scols = [psdf._internal.spark_column_for(label) for label in groupkey_labels] sdf = psdf._internal.spark_frame -tmp_col = verify_temp_column_name(sdf, "__row_number__") +window = Window.partitionBy(*groupkey_scols) # This part is handled differently depending on whether it is a tail or a head. -window = ( - Window.partitionBy(*groupkey_scols).orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).asc()) +ordered_window = ( +window.orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).asc()) if asc -else Window.partitionBy(*groupkey_scols).orderBy( -F.col(NATURAL_ORDER_COLUMN_NAME).desc() -) +else window.orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).desc()) ) -sdf = ( -sdf.withColumn(tmp_col, F.row_number().over(window)) -.filter(F.col(tmp_col) <= n) -.drop(tmp_col) -) +if n >= 0 or LooseVersion(pd.__version__) < LooseVersion("1.4.0"): +tmp_row_num_col = verify_temp_column_name(sdf, "__row_number__") +sdf = ( +sdf.withColumn(tmp_row_num_col, F.row_number().over(ordered_window)) +.filter(F.col(tmp_row_num_col) <= n) +.drop(tmp_row_num_col) +) Review Comment: BTW, we could also consider to unify here to use `lag` way: ```python sdf = ( sdf.withColumn(tmp_lag_col, F.lag(F.lit(0), n).over(window)) # for positive case .where(F.isnull(F.col(tmp_lag_col))) .drop(tmp_lag_col) ) ``` If you guys think it's necessary, I can submit a separate PR to address it. Theoretically, `lag` has better performance than `row_number` especially when rows number is very huge. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] Yikun commented on a diff in pull request #36464: [SPARK-38947][PYTHON] Supports groupby positional indexing
Yikun commented on code in PR #36464: URL: https://github.com/apache/spark/pull/36464#discussion_r873437306 ## python/pyspark/pandas/groupby.py: ## @@ -2110,22 +2110,60 @@ def _limit(self, n: int, asc: bool) -> FrameLike: groupkey_scols = [psdf._internal.spark_column_for(label) for label in groupkey_labels] sdf = psdf._internal.spark_frame -tmp_col = verify_temp_column_name(sdf, "__row_number__") +window = Window.partitionBy(*groupkey_scols) # This part is handled differently depending on whether it is a tail or a head. -window = ( - Window.partitionBy(*groupkey_scols).orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).asc()) +ordered_window = ( +window.orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).asc()) if asc -else Window.partitionBy(*groupkey_scols).orderBy( -F.col(NATURAL_ORDER_COLUMN_NAME).desc() -) +else window.orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).desc()) ) -sdf = ( -sdf.withColumn(tmp_col, F.row_number().over(window)) -.filter(F.col(tmp_col) <= n) -.drop(tmp_col) -) +if n >= 0 or LooseVersion(pd.__version__) < LooseVersion("1.4.0"): +tmp_row_num_col = verify_temp_column_name(sdf, "__row_number__") +sdf = ( +sdf.withColumn(tmp_row_num_col, F.row_number().over(ordered_window)) +.filter(F.col(tmp_row_num_col) <= n) +.drop(tmp_row_num_col) +) Review Comment: BTW, we could also consider to unify here to use `lag` way: ```python sdf = ( sdf.withColumn(tmp_lag_col, F.lag(F.lit(0), n).over(window)) # for positive case .where(F.isnull(F.col(tmp_lag_col))) .drop(tmp_lag_col) ) ``` I can submit a separate PR to address it, if you guys think it's necessary. Theoretically, `lag` has better performance than `row_number` especially when rows number is very huge. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
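A self-contained PySpark sketch of the `lag`-based trick for the positive (`head(n)`) case discussed above; the data, column names, and temporary column name are illustrative only.
```python
from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("g", "g0"), ("g", "g1"), ("g", "g2"), ("g", "g3"), ("h", "h0"), ("h", "h1")],
    ["A", "B"])

n = 2  # keep the first n rows of each group
w = Window.partitionBy("A").orderBy("B")

# A row within the first n of its group has no row n positions before it,
# so lag(..., n) is null exactly for the rows we want to keep.
head_n = (
    df.withColumn("__tmp_lag__", F.lag(F.lit(0), n).over(w))
    .where(F.isnull(F.col("__tmp_lag__")))
    .drop("__tmp_lag__")
)
head_n.show()  # keeps g0, g1, h0, h1
```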
[GitHub] [spark] Yikun commented on a diff in pull request #36464: [SPARK-38947][PYTHON] Supports groupby positional indexing
Yikun commented on code in PR #36464: URL: https://github.com/apache/spark/pull/36464#discussion_r873426625 ## python/pyspark/pandas/groupby.py: ## @@ -2110,22 +2110,79 @@ def _limit(self, n: int, asc: bool) -> FrameLike: groupkey_scols = [psdf._internal.spark_column_for(label) for label in groupkey_labels] sdf = psdf._internal.spark_frame -tmp_col = verify_temp_column_name(sdf, "__row_number__") +tmp_row_num_col = verify_temp_column_name(sdf, "__row_number__") +window = Window.partitionBy(*groupkey_scols) # This part is handled differently depending on whether it is a tail or a head. -window = ( - Window.partitionBy(*groupkey_scols).orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).asc()) +ordered_window = ( +window.orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).asc()) if asc -else Window.partitionBy(*groupkey_scols).orderBy( -F.col(NATURAL_ORDER_COLUMN_NAME).desc() -) +else window.orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).desc()) ) -sdf = ( -sdf.withColumn(tmp_col, F.row_number().over(window)) -.filter(F.col(tmp_col) <= n) -.drop(tmp_col) -) +if n >= 0 or LooseVersion(pd.__version__) < LooseVersion("1.4.0"): + +sdf = ( +sdf.withColumn(tmp_row_num_col, F.row_number().over(ordered_window)) +.filter(F.col(tmp_row_num_col) <= n) +.drop(tmp_row_num_col) +) +else: +# Pandas supports Groupby positional indexing since v1.4.0 +# https://pandas.pydata.org/docs/whatsnew/v1.4.0.html#groupby-positional-indexing +# +# To support groupby positional indexing, we need add two columns to help we filter +# target rows: +# - Add `__row_number__` and `__group_count__` columns. +# - Use `F.col(tmp_row_num_col) - F.col(tmp_cnt_col) <= positional_index_number` to +# filter target rows. +# - Then drop `__row_number__` and `__group_count__` columns. +# +# For example for the dataframe: +# >>> df = ps.DataFrame([["g", "g0"], +# ... ["g", "g1"], +# ... ["g", "g2"], +# ... ["g", "g3"], +# ... ["h", "h0"], +# ... ["h", "h1"]], columns=["A", "B"]) +# >>> df.groupby("A").head(-1) +# +# Below is an example to show the `__row_number__` column and `__group_count__` column +# for above df: +# >>> sdf.withColumn(tmp_row_num_col, F.row_number().over(window)) +#.withColumn(tmp_cnt_col, F.count("*").over(window)).show() +# +---++---+---++--+---+ +# |__index_level..|__groupkey..| A| B|__natural_..|__row_number__|__group_count__| +# +---++---+---++--+---+ +# | 0| g| g| g0| 17179869184| 1| 4| +# | 1| g| g| g1| 42949672960| 2| 4| +# | 2| g| g| g2| 60129542144| 3| 4| +# | 3| g| g| g3| 85899345920| 4| 4| +# | 4| h| h| h0|111669149696| 1| 2| +# | 5| h| h| h1|128849018880| 2| 2| +# +---++---+---++--+---+ +# +# The limit n is `-1`, we need to filter rows[:-1] in each group: +# +# >>> sdf.withColumn(tmp_row_num_col, F.row_number().over(window)) +#.withColumn(tmp_cnt_col, F.count("*").over(window)) +#.filter(F.col(tmp_row_num_col) - F.col(tmp_cnt_col) <= -1).show() Review Comment: @zhengruifeng I think `lag` is better in here: 1 `WindowsExec` + 2 `sort` + 1 `shuffle`, same cost with orignal implments == Physical Plan == ``` == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- Project [__index_level_0__#0, __groupkey_0__#19L, a#1L, b#2L, c#3L, __natural_order__#8L] +- Filter isnull(__tmp_lag__#447) +- Window [lag(0, -2, null) windowspecdefinition(__groupkey_0__#19L, __natural_order__#8L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -2, -2)) AS __tmp_lag__#447], [__groupkey_0__#19L], [__natural_order__#8L ASC NULLS FIRST] +- Sort [__groupkey_0__#19L ASC NULLS FIRST, __natural_order__#8L ASC NULLS FIRST], false, 0
[GitHub] [spark] beliefer commented on a diff in pull request #36541: [SPARK-39180][SQL] Simplify the planning of limit and offset
beliefer commented on code in PR #36541: URL: https://github.com/apache/spark/pull/36541#discussion_r873425906 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala: ## @@ -1303,12 +1303,25 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr copy(child = newChild) } +object OffsetAndLimit { + def unapply(p: GlobalLimit): Option[(Int, Int, LogicalPlan)] = { +p match { + // Optimizer pushes local limit through offset, so we need to match the plan this way. + case GlobalLimit(IntegerLiteral(globalLimit), + Offset(IntegerLiteral(offset), + LocalLimit(IntegerLiteral(localLimit), child))) + if globalLimit + offset == localLimit => +Some((offset, globalLimit, child)) Review Comment: OK -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36541: [SPARK-39180][SQL] Simplify the planning of limit and offset
cloud-fan commented on code in PR #36541: URL: https://github.com/apache/spark/pull/36541#discussion_r873425084 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala: ## @@ -1303,12 +1303,25 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr copy(child = newChild) } +object OffsetAndLimit { + def unapply(p: GlobalLimit): Option[(Int, Int, LogicalPlan)] = { +p match { + // Optimizer pushes local limit through offset, so we need to match the plan this way. + case GlobalLimit(IntegerLiteral(globalLimit), + Offset(IntegerLiteral(offset), + LocalLimit(IntegerLiteral(localLimit), child))) + if globalLimit + offset == localLimit => +Some((offset, globalLimit, child)) Review Comment: It's better to go for better readability, instead of saving a bit typing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36541: [SPARK-39180][SQL] Simplify the planning of limit and offset
cloud-fan commented on code in PR #36541: URL: https://github.com/apache/spark/pull/36541#discussion_r873423506 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala: ## @@ -1303,12 +1303,25 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr copy(child = newChild) } +object OffsetAndLimit { + def unapply(p: GlobalLimit): Option[(Int, Int, LogicalPlan)] = { +p match { + // Optimizer pushes local limit through offset, so we need to match the plan this way. + case GlobalLimit(IntegerLiteral(globalLimit), + Offset(IntegerLiteral(offset), + LocalLimit(IntegerLiteral(localLimit), child))) + if globalLimit + offset == localLimit => +Some((offset, globalLimit, child)) Review Comment: This pattern match is to match a logical offset + limit, but we care more about semantics here. Returning `localLimit` is super confusing. ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala: ## @@ -1303,12 +1303,25 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr copy(child = newChild) } +object OffsetAndLimit { + def unapply(p: GlobalLimit): Option[(Int, Int, LogicalPlan)] = { +p match { + // Optimizer pushes local limit through offset, so we need to match the plan this way. + case GlobalLimit(IntegerLiteral(globalLimit), + Offset(IntegerLiteral(offset), + LocalLimit(IntegerLiteral(localLimit), child))) + if globalLimit + offset == localLimit => +Some((offset, globalLimit, child)) Review Comment: This pattern match is to match a logical offset + limit, and we care more about semantics here. Returning `localLimit` is super confusing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] beliefer commented on a diff in pull request #36541: [SPARK-39180][SQL] Simplify the planning of limit and offset
beliefer commented on code in PR #36541: URL: https://github.com/apache/spark/pull/36541#discussion_r873410416 ## sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala: ## @@ -278,9 +265,9 @@ case class GlobalLimitAndOffsetExec( } /** - * Take the first limit elements as defined by the sortOrder, and do projection if needed. - * This is logically equivalent to having a Limit operator after a [[SortExec]] operator, - * or having a [[ProjectExec]] operator between them. + * Take the first `limit`` elements as defined by the sortOrder, then drop the first `offset` Review Comment: ```suggestion * Take the first `limit` elements as defined by the sortOrder, then drop the first `offset` ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] beliefer commented on a diff in pull request #36541: [SPARK-39180][SQL] Simplify the planning of limit and offset
beliefer commented on code in PR #36541: URL: https://github.com/apache/spark/pull/36541#discussion_r873409826 ## sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala: ## @@ -215,61 +211,52 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { } /** - * Take the first `limit` elements of the child's single output partition. - */ -case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { - - override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil - - override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = -copy(child = newChild) -} - -/** - * Skip the first `offset` elements then take the first `limit` of the following elements in - * the child's single output partition. + * Take the first `limit` elements and then drop the first `offset` elements in the child's single + * output partition. */ -case class GlobalLimitAndOffsetExec( -limit: Int = -1, -offset: Int, -child: SparkPlan) extends BaseLimitExec { - assert(offset > 0) +case class GlobalLimitExec(limit: Int = -1, child: SparkPlan, offset: Int = 0) + extends BaseLimitExec { + assert(limit >= 0 || (limit == -1 && offset > 0)) override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil - override def doExecute(): RDD[InternalRow] = if (limit >= 0) { -child.execute().mapPartitionsInternal(iter => iter.slice(offset, limit + offset)) - } else { -child.execute().mapPartitionsInternal(iter => iter.drop(offset)) + override def doExecute(): RDD[InternalRow] = { +if (offset > 0) { + if (limit >= 0) { +child.execute().mapPartitionsInternal(iter => iter.slice(offset, limit)) + } else { +child.execute().mapPartitionsInternal(iter => iter.drop(offset)) + } +} else { + super.doExecute() +} } - private lazy val skipTerm = BaseLimitExec.newLimitCountTerm() - override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { -ctx.addMutableState( - CodeGenerator.JAVA_INT, skipTerm, forceInline = true, useFreshName = false) -if (limit >= 0) { - // The counter name is already obtained by the upstream operators via `limitNotReachedChecks`. - // Here we have to inline it to not change its name. This is fine as we won't have many limit - // operators in one query. - ctx.addMutableState( -CodeGenerator.JAVA_INT, countTerm, forceInline = true, useFreshName = false) - s""" - | if ($skipTerm < $offset) { - | $skipTerm += 1; - | } else if ($countTerm < $limit) { - | $countTerm += 1; - | ${consume(ctx, input)} - | } +if (offset > 0) { + val skipTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "rowsSkipped", forceInline = true) + if (limit > 0) { +// In codegen, we skip the first `offset` rows, then take the first `limit - offset` rows. +val finalLimit = limit - offset +s""" + | if ($skipTerm < $offset) { + | $skipTerm += 1; + | } else if ($countTerm < $finalLimit) { + | $countTerm += 1; + | ${consume(ctx, input)} + | } """.stripMargin -} else { - s""" - | if ($skipTerm < $offset) { - | $skipTerm += 1; - | } else { - | ${consume(ctx, input)} - | } + } else { +s""" + | if ($skipTerm < $offset) { + | $skipTerm += 1; + | } else { + | ${consume(ctx, input)} + | } """.stripMargin Review Comment: ```suggestion """.stripMargin ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] beliefer commented on a diff in pull request #36541: [SPARK-39180][SQL] Simplify the planning of limit and offset
beliefer commented on code in PR #36541: URL: https://github.com/apache/spark/pull/36541#discussion_r873406676 ## sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala: ## @@ -81,55 +81,56 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { */ object SpecialLimits extends Strategy { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case ReturnAnswer(rootPlan) => rootPlan match { -case Limit(IntegerLiteral(limit), Sort(order, true, child)) -if limit < conf.topKSortFallbackThreshold => - TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil -case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child))) -if limit < conf.topKSortFallbackThreshold => - TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil + // Call `planTakeOrdered` first which matches a larger plan. + case ReturnAnswer(rootPlan) => planTakeOrdered(rootPlan).getOrElse(rootPlan match { +// We should match the combination of limit and offset first, to get the optimal physical +// plan, instead of planning limit and offset separately. +case LimitAndOffset(limit, offset, child) => + CollectLimitExec(limit = limit, child = planLater(child), offset = offset) +case OffsetAndLimit(offset, limit, child) => + // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'. + CollectLimitExec(limit = offset + limit, child = planLater(child), offset = offset) Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
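The rewrite quoted in this hunk ('Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a') can be sanity-checked with plain Python slices; this is only an illustration of the semantics, not Spark code.
```python
rows = list(range(10))
a, b = 3, 4  # OFFSET a, LIMIT b

offset_then_limit = rows[a:][:b]      # drop a rows, then take b rows
limit_then_offset = rows[:a + b][a:]  # take a + b rows, then drop a rows

assert offset_then_limit == limit_then_offset == [3, 4, 5, 6]
```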
[GitHub] [spark] beliefer commented on a diff in pull request #36541: [SPARK-39180][SQL] Simplify the planning of limit and offset
beliefer commented on code in PR #36541: URL: https://github.com/apache/spark/pull/36541#discussion_r873403382 ## sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala: ## @@ -814,12 +815,19 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.LocalRelation(output, data, _) => LocalTableScanExec(output, data) :: Nil case CommandResult(output, _, plan, data) => CommandResultExec(output, plan, data) :: Nil + // We should match the combination of limit and offset first, to get the optimal physical + // plan, instead of planning limit and offset separately. + case LimitAndOffset(limit, offset, child) => +GlobalLimitExec(limit, planLater(child), offset) :: Nil + case OffsetAndLimit(offset, limit, child) => +// 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'. +GlobalLimitExec(limit = offset + limit, child = planLater(child), offset = offset) :: Nil Review Comment: ditto ## sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala: ## @@ -81,55 +81,56 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { */ object SpecialLimits extends Strategy { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case ReturnAnswer(rootPlan) => rootPlan match { -case Limit(IntegerLiteral(limit), Sort(order, true, child)) -if limit < conf.topKSortFallbackThreshold => - TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil -case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child))) -if limit < conf.topKSortFallbackThreshold => - TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil + // Call `planTakeOrdered` first which matches a larger plan. + case ReturnAnswer(rootPlan) => planTakeOrdered(rootPlan).getOrElse(rootPlan match { +// We should match the combination of limit and offset first, to get the optimal physical +// plan, instead of planning limit and offset separately. +case LimitAndOffset(limit, offset, child) => + CollectLimitExec(limit = limit, child = planLater(child), offset = offset) +case OffsetAndLimit(offset, limit, child) => + // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'. 
+ CollectLimitExec(limit = offset + limit, child = planLater(child), offset = offset) case Limit(IntegerLiteral(limit), child) => - CollectLimitExec(limit, planLater(child)) :: Nil -case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), - Sort(order, true, child)) if limit + offset < conf.topKSortFallbackThreshold => - TakeOrderedAndProjectExec( -limit, order, child.output, planLater(child), offset) :: Nil -case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), - Project(projectList, Sort(order, true, child))) -if limit + offset < conf.topKSortFallbackThreshold => - TakeOrderedAndProjectExec( -limit, order, projectList, planLater(child), offset) :: Nil -case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) => - CollectLimitExec(limit, planLater(child), offset) :: Nil + CollectLimitExec(limit = limit, child = planLater(child)) case logical.Offset(IntegerLiteral(offset), child) => - CollectLimitExec(child = planLater(child), offset = offset) :: Nil + CollectLimitExec(child = planLater(child), offset = offset) case Tail(IntegerLiteral(limit), child) => - CollectTailExec(limit, planLater(child)) :: Nil -case other => planLater(other) :: Nil - } + CollectTailExec(limit, planLater(child)) +case other => planLater(other) + }) :: Nil + + case other => planTakeOrdered(other).toSeq +} + +private def planTakeOrdered(plan: LogicalPlan): Option[SparkPlan] = plan match { + // We should match the combination of limit and offset first, to get the optimal physical + // plan, instead of planning limit and offset separately. + case LimitAndOffset(limit, offset, Sort(order, true, child)) + if limit < conf.topKSortFallbackThreshold => +Some(TakeOrderedAndProjectExec( + limit, order, child.output, planLater(child), offset)) + case LimitAndOffset(limit, offset, Project(projectList, Sort(order, true, child))) + if limit < conf.topKSortFallbackThreshold => +Some(TakeOrderedAndProjectExec( + limit, order, projectList, planLater(child), offset)) + // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'. + case OffsetAndLimit(offset, limit, Sort(order, true, child)) + if offset + limit <
[GitHub] [spark] beliefer commented on a diff in pull request #36541: [SPARK-39180][SQL] Simplify the planning of limit and offset
beliefer commented on code in PR #36541: URL: https://github.com/apache/spark/pull/36541#discussion_r873402602 ## sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala: ## @@ -81,55 +81,56 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { */ object SpecialLimits extends Strategy { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case ReturnAnswer(rootPlan) => rootPlan match { -case Limit(IntegerLiteral(limit), Sort(order, true, child)) -if limit < conf.topKSortFallbackThreshold => - TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil -case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child))) -if limit < conf.topKSortFallbackThreshold => - TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil + // Call `planTakeOrdered` first which matches a larger plan. + case ReturnAnswer(rootPlan) => planTakeOrdered(rootPlan).getOrElse(rootPlan match { +// We should match the combination of limit and offset first, to get the optimal physical +// plan, instead of planning limit and offset separately. +case LimitAndOffset(limit, offset, child) => + CollectLimitExec(limit = limit, child = planLater(child), offset = offset) +case OffsetAndLimit(offset, limit, child) => + // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'. + CollectLimitExec(limit = offset + limit, child = planLater(child), offset = offset) case Limit(IntegerLiteral(limit), child) => - CollectLimitExec(limit, planLater(child)) :: Nil -case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), - Sort(order, true, child)) if limit + offset < conf.topKSortFallbackThreshold => - TakeOrderedAndProjectExec( -limit, order, child.output, planLater(child), offset) :: Nil -case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), - Project(projectList, Sort(order, true, child))) -if limit + offset < conf.topKSortFallbackThreshold => - TakeOrderedAndProjectExec( -limit, order, projectList, planLater(child), offset) :: Nil -case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) => - CollectLimitExec(limit, planLater(child), offset) :: Nil + CollectLimitExec(limit = limit, child = planLater(child)) case logical.Offset(IntegerLiteral(offset), child) => - CollectLimitExec(child = planLater(child), offset = offset) :: Nil + CollectLimitExec(child = planLater(child), offset = offset) case Tail(IntegerLiteral(limit), child) => - CollectTailExec(limit, planLater(child)) :: Nil -case other => planLater(other) :: Nil - } + CollectTailExec(limit, planLater(child)) +case other => planLater(other) + }) :: Nil + + case other => planTakeOrdered(other).toSeq +} + +private def planTakeOrdered(plan: LogicalPlan): Option[SparkPlan] = plan match { + // We should match the combination of limit and offset first, to get the optimal physical + // plan, instead of planning limit and offset separately. + case LimitAndOffset(limit, offset, Sort(order, true, child)) + if limit < conf.topKSortFallbackThreshold => +Some(TakeOrderedAndProjectExec( + limit, order, child.output, planLater(child), offset)) + case LimitAndOffset(limit, offset, Project(projectList, Sort(order, true, child))) + if limit < conf.topKSortFallbackThreshold => +Some(TakeOrderedAndProjectExec( + limit, order, projectList, planLater(child), offset)) + // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'. 
+ case OffsetAndLimit(offset, limit, Sort(order, true, child)) + if offset + limit < conf.topKSortFallbackThreshold => Review Comment: If we use `localLimit` directly, we can avoid `offset + limit` here. ## sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala: ## @@ -81,55 +81,56 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { */ object SpecialLimits extends Strategy { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case ReturnAnswer(rootPlan) => rootPlan match { -case Limit(IntegerLiteral(limit), Sort(order, true, child)) -if limit < conf.topKSortFallbackThreshold => - TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil -case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child))) -if limit < conf.topKSortFallbackThreshold => - TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil + //
[GitHub] [spark] beliefer commented on a diff in pull request #36541: [SPARK-39180][SQL] Simplify the planning of limit and offset
beliefer commented on code in PR #36541: URL: https://github.com/apache/spark/pull/36541#discussion_r873401681 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala: ## @@ -1303,12 +1303,25 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr copy(child = newChild) } +object OffsetAndLimit { + def unapply(p: GlobalLimit): Option[(Int, Int, LogicalPlan)] = { +p match { + // Optimizer pushes local limit through offset, so we need to match the plan this way. + case GlobalLimit(IntegerLiteral(globalLimit), + Offset(IntegerLiteral(offset), + LocalLimit(IntegerLiteral(localLimit), child))) + if globalLimit + offset == localLimit => +Some((offset, globalLimit, child)) Review Comment: In fact, we not use globalLimit in physical plan. It seems we can return localLimit here. Then we can avoid `+` in physical plan. ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala: ## @@ -1303,12 +1303,25 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr copy(child = newChild) } +object OffsetAndLimit { + def unapply(p: GlobalLimit): Option[(Int, Int, LogicalPlan)] = { +p match { + // Optimizer pushes local limit through offset, so we need to match the plan this way. + case GlobalLimit(IntegerLiteral(globalLimit), + Offset(IntegerLiteral(offset), + LocalLimit(IntegerLiteral(localLimit), child))) + if globalLimit + offset == localLimit => +Some((offset, globalLimit, child)) Review Comment: In fact, we not use `globalLimit` in physical plan. It seems we can return `localLimit` here. Then we can avoid `+` in physical plan. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AnywalkerGiser closed pull request #36559: [SPARK-39176][PYSPARK] Fixed a problem with pyspark serializing pre-1970 datetime in windows
AnywalkerGiser closed pull request #36559: [SPARK-39176][PYSPARK] Fixed a problem with pyspark serializing pre-1970 datetime in windows URL: https://github.com/apache/spark/pull/36559 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36541: [SPARK-39180][SQL] Simplify the planning of limit and offset
cloud-fan commented on code in PR #36541: URL: https://github.com/apache/spark/pull/36541#discussion_r873386778 ## sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala: ## @@ -82,52 +82,45 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object SpecialLimits extends Strategy { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case ReturnAnswer(rootPlan) => rootPlan match { -case Limit(IntegerLiteral(limit), Sort(order, true, child)) Review Comment: handled by `planTakeOrdered` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AnywalkerGiser opened a new pull request, #36559: [SPARK-39176][PYSPARK] Fixed a problem with pyspark serializing pre-1970 datetime in windows
AnywalkerGiser opened a new pull request, #36559: URL: https://github.com/apache/spark/pull/36559 ### What changes were proposed in this pull request? Fix problems with pyspark in Windows: 1. Fixed datetime conversion to timestamp before 1970; 2. Fixed datetime conversion when timestamp is negative; 3. Adding a test script. ### Why are the changes needed? Pyspark has problems serializing pre-1970 times in Windows. An exception occurs when executing the following code under Windows: ```python rdd = sc.parallelize([('a', datetime(1957, 1, 9, 0, 0)), ('b', datetime(2014, 1, 27, 0, 0))]) df = spark.createDataFrame(rdd, ["id", "date"]) df.show() df.printSchema() print(df.collect()) ``` ```bash File "...\spark\python\lib\pyspark.zip\pyspark\sql\types.py", line 195, in toInternal else time.mktime(dt.timetuple())) OverflowError: mktime argument out of range at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503) at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638) at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) at org.apache.spark.rdd.RDD.iterator(RDD.scala:313) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ... 1 more ``` and ```bash File ...\spark\python\lib\pyspark.zip\pyspark\sql\types.py, in fromInternal: Line 207: return datetime.datetime.fromtimestamp(ts // 100).replace(microsecond=ts % 100) OSError: [Errno 22] Invalid argument ``` After updating the code, the above code was run successfully! ```bash +---+---+ | id| date| +---+---+ | a|1957-01-08 16:00:00| | b|2014-01-26 16:00:00| +---+---+ root |-- id: string (nullable = true) |-- date: timestamp (nullable = true) [Row(id='a', date=datetime.datetime(1957, 1, 9, 0, 0)), Row(id='b', date=datetime.datetime(2014, 1, 27, 0, 0))] ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New and existing test suites -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
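For reference, one portable way to convert a naive datetime to and from epoch microseconds without `time.mktime` or `datetime.fromtimestamp` is sketched below; it illustrates the idea and is not necessarily the exact change made in this PR (in particular, real code must account for the session/local time zone the way PySpark's TimestampType does).
```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def to_micros(dt: datetime) -> int:
    # timedelta arithmetic works for pre-1970 datetimes on Windows, unlike
    # time.mktime(dt.timetuple()), which raises OverflowError there.
    delta = dt - EPOCH
    return (delta.days * 86400 + delta.seconds) * 1_000_000 + delta.microseconds

def from_micros(us: int) -> datetime:
    # Avoids datetime.fromtimestamp(), which rejects negative timestamps on Windows.
    return EPOCH + timedelta(microseconds=us)

ts = to_micros(datetime(1957, 1, 9))
print(ts)               # negative: before the epoch
print(from_micros(ts))  # 1957-01-09 00:00:00
```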
[GitHub] [spark] AnywalkerGiser commented on a diff in pull request #36537: [SPARK-39176][PYSPARK] Fixed a problem with pyspark serializing pre-1970 datetime in windows
AnywalkerGiser commented on code in PR #36537: URL: https://github.com/apache/spark/pull/36537#discussion_r873359793 ## python/pyspark/tests/test_rdd.py: ## @@ -669,6 +670,12 @@ def test_sample(self): wr_s21 = rdd.sample(True, 0.4, 21).collect() self.assertNotEqual(set(wr_s11), set(wr_s21)) +def test_datetime(self): Review Comment: It has been added and modified; please approve it again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] MaxGekk opened a new pull request, #36558: [SPARK-39187][SQL][3.3] Remove `SparkIllegalStateException`
MaxGekk opened a new pull request, #36558: URL: https://github.com/apache/spark/pull/36558 ### What changes were proposed in this pull request? Remove `SparkIllegalStateException` and replace it with `IllegalStateException` where it was used. This is a backport of https://github.com/apache/spark/pull/36550. ### Why are the changes needed? To improve code maintenance and be consistent with other places where `IllegalStateException` is used in illegal states (for instance, see https://github.com/apache/spark/pull/36524). After the PR https://github.com/apache/spark/pull/36500, the exception is substituted by `SparkException` with the `INTERNAL_ERROR` error class. ### Does this PR introduce _any_ user-facing change? No. Users shouldn't face the exception in regular cases. ### How was this patch tested? By running the affected test suites: ``` $ build/sbt "sql/test:testOnly *QueryExecutionErrorsSuite*" $ build/sbt "test:testOnly *ArrowUtilsSuite" ``` Authored-by: Max Gekk Signed-off-by: Max Gekk (cherry picked from commit 1a90512f605c490255f7b38215c207e64621475b) Signed-off-by: Max Gekk -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AnywalkerGiser closed pull request #36537: [SPARK-39176][PYSPARK][WINDOWS] Fixed a problem with pyspark serializing pre-1970 datetime in windows
AnywalkerGiser closed pull request #36537: [SPARK-39176][PYSPARK][WINDOWS] Fixed a problem with pyspark serializing pre-1970 datetime in windows URL: https://github.com/apache/spark/pull/36537 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36530: [SPARK-39172][SQL] Remove outer join if all output come from streamed side and buffered side keys exist unique key
cloud-fan commented on code in PR #36530: URL: https://github.com/apache/spark/pull/36530#discussion_r873346931 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala: ## @@ -211,6 +219,15 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { if projectList.forall(_.deterministic) && p.references.subsetOf(right.outputSet) && allDuplicateAgnostic(aggExprs) => a.copy(child = p.copy(child = right)) + +case p @ Project(_, ExtractEquiJoinKeys(LeftOuter, _, rightKeys, _, _, left, right, _)) +if right.distinctKeys.exists(_.subsetOf(ExpressionSet(rightKeys))) && + p.references.subsetOf(left.outputSet) => + p.copy(child = left) Review Comment: For a left outer join with only left-side columns being selected, the join can only change the result if we can find more than one matched row on the right side. If the right side join keys are unique, apparently we can't find more than one match. So this optimization LGTM. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
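The argument can be checked with a small PySpark example; this is illustrative only, and the optimizer rule in the PR is what would actually drop the join from the plan.
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

left = spark.createDataFrame([(1, "a"), (2, "b"), (2, "c")], ["k", "v"])
right = spark.createDataFrame([(1, "x"), (3, "y")], ["k", "w"])  # 'k' is unique here

# Only left-side columns are selected and the right-side join key is unique,
# so every left row appears exactly once: the join cannot change the result.
joined = left.join(right, "k", "left_outer").select("v")
joined.show()           # same rows as below
left.select("v").show()
```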
[GitHub] [spark] cloud-fan commented on a diff in pull request #36530: [SPARK-39172][SQL] Remove outer join if all output come from streamed side and buffered side keys exist unique key
cloud-fan commented on code in PR #36530: URL: https://github.com/apache/spark/pull/36530#discussion_r873344595 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala: ## @@ -139,6 +139,14 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { * SELECT t1.c1, max(t1.c2) FROM t1 GROUP BY t1.c1 * }}} * + * 3. Remove outer join if all output comes from streamed side and the join keys from buffered side + * exist unique key. Review Comment: it looks a bit weird to talk about stream side and buffer side in the logical plan phase. Can we explain this optimization in a different way? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r873341127 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownOffset.java: ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.read; + +import org.apache.spark.annotation.Evolving; + +/** + * A mix-in interface for {@link ScanBuilder}. Data sources can implement this interface to + * push down OFFSET. Please note that the combination of OFFSET with other operations + * such as AGGREGATE, GROUP BY, SORT BY, CLUSTER BY, DISTRIBUTE BY, etc. is NOT pushed down. Review Comment: BTW we need to update `ScanBuilder`'s classdoc for the new pushdown support. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r873340929 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownOffset.java: ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.read; + +import org.apache.spark.annotation.Evolving; + +/** + * A mix-in interface for {@link ScanBuilder}. Data sources can implement this interface to + * push down OFFSET. Please note that the combination of OFFSET with other operations + * such as AGGREGATE, GROUP BY, SORT BY, CLUSTER BY, DISTRIBUTE BY, etc. is NOT pushed down. Review Comment: I understand that this is copied from other pushdown interfaces, but I find it really hard to follow. We can push down OFFSET with many other operators if they follow the operator order we defined in `ScanBuilder`'s class doc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] MaxGekk commented on pull request #36479: [SPARK-38688][SQL][TESTS] Use error classes in the compilation errors of deserializer
MaxGekk commented on PR #36479: URL: https://github.com/apache/spark/pull/36479#issuecomment-1127239102 @panbingkun Since this PR modified error classes, could you backport it to branch-3.3, please? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] MaxGekk closed pull request #36479: [SPARK-38688][SQL][TESTS] Use error classes in the compilation errors of deserializer
MaxGekk closed pull request #36479: [SPARK-38688][SQL][TESTS] Use error classes in the compilation errors of deserializer URL: https://github.com/apache/spark/pull/36479 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan closed pull request #36412: [SPARK-39073][SQL] Keep rowCount after hive table partition pruning if table only have hive statistics
cloud-fan closed pull request #36412: [SPARK-39073][SQL] Keep rowCount after hive table partition pruning if table only have hive statistics URL: https://github.com/apache/spark/pull/36412 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on pull request #36412: [SPARK-39073][SQL] Keep rowCount after hive table partition pruning if table only have hive statistics
cloud-fan commented on PR #36412: URL: https://github.com/apache/spark/pull/36412#issuecomment-1127235625 thanks, merging to master! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36412: [SPARK-39073][SQL] Keep rowCount after hive table partition pruning if table only have hive statistics
cloud-fan commented on code in PR #36412: URL: https://github.com/apache/spark/pull/36412#discussion_r87309 ## sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala: ## @@ -80,10 +80,15 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession) val colStats = filteredStats.map(_.attributeStats.map { case (attr, colStat) => (attr.name, colStat.toCatalogColumnStat(attr.name, attr.dataType)) }) + val rowCount = if (prunedPartitions.forall(_.stats.flatMap(_.rowCount).exists(_ > 0))) { Review Comment: you are right, I misread the code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] MaxGekk closed pull request #36550: [SPARK-39187][SQL] Remove `SparkIllegalStateException`
MaxGekk closed pull request #36550: [SPARK-39187][SQL] Remove `SparkIllegalStateException` URL: https://github.com/apache/spark/pull/36550 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan closed pull request #36121: [SPARK-38836][SQL] Improve the performance of ExpressionSet
cloud-fan closed pull request #36121: [SPARK-38836][SQL] Improve the performance of ExpressionSet URL: https://github.com/apache/spark/pull/36121 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] MaxGekk commented on pull request #36550: [SPARK-39187][SQL] Remove `SparkIllegalStateException`
MaxGekk commented on PR #36550: URL: https://github.com/apache/spark/pull/36550#issuecomment-1127234215 Merging to master. Thank you, @HyukjinKwon and @cloud-fan for review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on pull request #36121: [SPARK-38836][SQL] Improve the performance of ExpressionSet
cloud-fan commented on PR #36121: URL: https://github.com/apache/spark/pull/36121#issuecomment-1127234077 thanks, merging to master! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AnywalkerGiser commented on pull request #36537: [SPARK-39176][PYSPARK][WINDOWS] Fixed a problem with pyspark serializing pre-1970 datetime in windows
AnywalkerGiser commented on PR #36537: URL: https://github.com/apache/spark/pull/36537#issuecomment-1127233836 @HyukjinKwon It hasn't been tested on master yet; I found the problem in 3.0.1, and I can test it on master later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #36541: [SPARK-39180][SQL] Simplify the planning of limit and offset
cloud-fan commented on code in PR #36541: URL: https://github.com/apache/spark/pull/36541#discussion_r873317698 ## sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala: ## @@ -82,52 +82,45 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object SpecialLimits extends Strategy { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case ReturnAnswer(rootPlan) => rootPlan match { -case Limit(IntegerLiteral(limit), Sort(order, true, child)) Review Comment: As I mentioned in the PR description, we don't need to plan `TakeOrderedAndProjectExec` under `ReturnAnswer`, as we don't have special logic for it. It will still be planned in the normal code path, which is `case other => planLater(other) :: Nil` and we do have planner rule to match `Limit(IntegerLiteral(limit), Sort(order, true, child))` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
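For readers not familiar with the planner, the "normal code path" referred to above matches the limit-over-sort pattern roughly as in the sketch below. This is a simplified illustration rather than the actual Spark strategy: the real rule also handles a `Project` on top of the `Sort`, other limit shapes, and (after this PR) an offset, so the exact `TakeOrderedAndProjectExec` arguments are only indicative.

```scala
// Simplified sketch of the top-K planning pattern; not the exact Spark source.
// (In Spark this logic lives inside SparkStrategies, which provides Strategy and planLater.)
object TakeOrderedAndProjectSketch extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case Limit(IntegerLiteral(limit), Sort(order, true, child)) =>
      // A global sort followed by a small LIMIT can be answered with a single
      // top-K operator instead of a full sort plus a separate limit.
      TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
    case _ => Nil
  }
}
```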
[GitHub] [spark] cloud-fan commented on a diff in pull request #36531: [SPARK-39171][SQL] Unify the Cast expression
cloud-fan commented on code in PR #36531: URL: https://github.com/apache/spark/pull/36531#discussion_r873314783 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala: ## @@ -2117,7 +2265,9 @@ case class Cast( child: Expression, dataType: DataType, timeZoneId: Option[String] = None, -override val ansiEnabled: Boolean = SQLConf.get.ansiEnabled) +override val ansiEnabled: Boolean = SQLConf.get.ansiEnabled, +fallbackConfKey: String = SQLConf.ANSI_ENABLED.key, +fallbackConfValue: String = "false") Review Comment: Can we make it an abstract class so that implementations can override? I'm really worried about changing the class constructor as many spark plugins use `Cast.apply/unapply`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
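To make the shape of that suggestion concrete, a self-contained toy version (not Spark code; all class names below are illustrative) keeps the case-class constructor as it is today and moves the proposed fallback conf into overridable members of an abstract parent, so existing `apply`/`unapply` call sites keep compiling.

```scala
// Toy model of the suggestion: the extra knobs become overridable members rather than
// new constructor parameters.
abstract class CastLike {
  def ansiEnabled: Boolean

  // These replace the proposed fallbackConfKey/fallbackConfValue constructor parameters.
  protected def fallbackConfKey: String = "spark.sql.ansi.enabled"
  protected def fallbackConfValue: String = "false"

  protected def overflowHint: String =
    s"""If necessary set $fallbackConfKey to "$fallbackConfValue" to bypass this error."""
}

// Constructor shape unchanged relative to today's Cast-like signature.
final case class SimpleCast(ansiEnabled: Boolean = false) extends CastLike

// A variant (e.g. a try_cast-style expression) overrides the hooks instead of
// passing different constructor arguments.
final case class LenientCast(ansiEnabled: Boolean = true) extends CastLike {
  override protected def fallbackConfValue: String = "true"
}
```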
[GitHub] [spark] gengliangwang commented on a diff in pull request #36557: [SPARK-39190][SQL] Provide query context for decimal precision overflow error when WSCG is off
gengliangwang commented on code in PR #36557: URL: https://github.com/apache/spark/pull/36557#discussion_r873307369 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala: ## @@ -128,7 +128,7 @@ case class PromotePrecision(child: Expression) extends UnaryExpression { case class CheckOverflow( Review Comment: Note: we need to change CheckOverflowInSum as well. However, its error context is actually empty even when WSCG is available, and I need more time for that. I am making this one to catch up with the Spark 3.3 RC2, which is happening soon. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] gengliangwang opened a new pull request, #36557: [SPARK-39190][SQL] Provide query context for decimal precision overflow error when WSCG is off
gengliangwang opened a new pull request, #36557: URL: https://github.com/apache/spark/pull/36557 ### What changes were proposed in this pull request? Similar to https://github.com/apache/spark/pull/36525, this PR provides query context for decimal precision overflow error when WSCG is off ### Why are the changes needed? Enhance the runtime error query context of checking decimal overflow. After changes, it works when the whole stage codegen is not available. ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? UT -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
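As a rough way to exercise the code path this PR is about (not taken from the PR's test suite; the exact message and context format vary by Spark version), one can turn whole-stage codegen off, enable ANSI mode, and force a decimal precision overflow:

```scala
// Sketch only: drive the interpreted (non-WSCG) decimal overflow check under ANSI mode.
spark.conf.set("spark.sql.codegen.wholeStage", "false")
spark.conf.set("spark.sql.ansi.enabled", "true")

// Multiplying two large DECIMAL(38, 2) values overflows the 38-digit precision limit.
spark.sql("SELECT CAST(9.99e20 AS DECIMAL(38, 2)) * CAST(9.99e20 AS DECIMAL(38, 2))").collect()
// Expected: an ArithmeticException saying the result cannot be represented as a decimal;
// with this change the error should also carry the failing query fragment as context.
```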
[GitHub] [spark] AnywalkerGiser commented on a diff in pull request #36537: [SPARK-39176][PYSPARK][WINDOWS] Fixed a problem with pyspark serializing pre-1970 datetime in windows
AnywalkerGiser commented on code in PR #36537: URL: https://github.com/apache/spark/pull/36537#discussion_r873305033 ## python/pyspark/sql/types.py: ## @@ -191,14 +191,25 @@ def needConversion(self): def toInternal(self, dt): if dt is not None: -seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo - else time.mktime(dt.timetuple())) +seconds = 0.0 +try: +seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo + else time.mktime(dt.timetuple())) +except: Review Comment: Sure, I'll change the test again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AngersZhuuuu commented on pull request #36056: [SPARK-36571][SQL] Add an SQLOverwriteHadoopMapReduceCommitProtocol to support all SQL overwrite write data to staging dir
AngersZh commented on PR #36056: URL: https://github.com/apache/spark/pull/36056#issuecomment-1127179471 Gentle ping @cloud-fan Could you take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AngersZhuuuu commented on pull request #35799: [SPARK-38498][STREAM] Support customized StreamingListener by configuration
AngersZh commented on PR #35799: URL: https://github.com/apache/spark/pull/35799#issuecomment-1127178691 Any more suggestions? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HyukjinKwon commented on pull request #36537: [SPARK-39176][PYSPARK][WINDOWS] Fixed a problem with pyspark serializing pre-1970 datetime in windows
HyukjinKwon commented on PR #36537: URL: https://github.com/apache/spark/pull/36537#issuecomment-1127177497 @AnywalkerGiser mind creating a PR against `master` branch? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #36537: [SPARK-39176][PYSPARK][WINDOWS] Fixed a problem with pyspark serializing pre-1970 datetime in windows
HyukjinKwon commented on code in PR #36537: URL: https://github.com/apache/spark/pull/36537#discussion_r873298180 ## python/pyspark/sql/types.py: ## @@ -191,14 +191,25 @@ def needConversion(self): def toInternal(self, dt): if dt is not None: -seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo - else time.mktime(dt.timetuple())) +seconds = 0.0 +try: +seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo + else time.mktime(dt.timetuple())) +except: Review Comment: Can we do this with an if-else with OS and negative value check? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #36537: [SPARK-39176][PYSPARK][WINDOWS] Fixed a problem with pyspark serializing pre-1970 datetime in windows
HyukjinKwon commented on code in PR #36537: URL: https://github.com/apache/spark/pull/36537#discussion_r873297988 ## python/pyspark/tests/test_rdd.py: ## @@ -669,6 +670,12 @@ def test_sample(self): wr_s21 = rdd.sample(True, 0.4, 21).collect() self.assertNotEqual(set(wr_s11), set(wr_s21)) +def test_datetime(self): Review Comment: Should probably add a comment like: ``` SPARK-39176: ... ``` See also https://spark.apache.org/contributing.html -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #36537: [SPARK-39176][PYSPARK][WINDOWS] Fixed a problem with pyspark serializing pre-1970 datetime in windows
HyukjinKwon commented on code in PR #36537: URL: https://github.com/apache/spark/pull/36537#discussion_r873297660 ## python/pyspark/sql/types.py: ## @@ -191,14 +191,25 @@ def needConversion(self): def toInternal(self, dt): if dt is not None: -seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo - else time.mktime(dt.timetuple())) +seconds = 0.0 +try: +seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo + else time.mktime(dt.timetuple())) +except: Review Comment: I think we'd better not rely on exception handling for the regular data parsing path. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #36537: [SPARK-39176][PYSPARK][WINDOWS] Fixed a problem with pyspark serializing pre-1970 datetime in windows
HyukjinKwon commented on code in PR #36537: URL: https://github.com/apache/spark/pull/36537#discussion_r873297554 ## python/pyspark/sql/types.py: ## @@ -191,14 +191,25 @@ def needConversion(self): def toInternal(self, dt): if dt is not None: -seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo - else time.mktime(dt.timetuple())) +seconds = 0.0 +try: +seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo + else time.mktime(dt.timetuple())) +except: +# On Windows, the current value is converted to a timestamp when the current value is less than 1970 +seconds = (dt - datetime.datetime.fromtimestamp(int(time.localtime(0).tm_sec) / 1000)).total_seconds() Review Comment: IIRC the pre-1970 handling issue is not an OS-specific problem. It would be great if you could link some reported issues related to that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AngersZhuuuu commented on a diff in pull request #36550: [SPARK-39187][SQL] Remove `SparkIllegalStateException`
AngersZh commented on code in PR #36550: URL: https://github.com/apache/spark/pull/36550#discussion_r873294811 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala: ## @@ -582,8 +582,8 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { |in operator ${operator.simpleString(SQLConf.get.maxToStringFields)} """.stripMargin) - case _: UnresolvedHint => -throw QueryExecutionErrors.logicalHintOperatorNotRemovedDuringAnalysisError + case _: UnresolvedHint => throw new IllegalStateException( +"Logical hint operator should be removed during analysis.") Review Comment: How about ``` case _: UnresolvedHint => throw new IllegalStateException("Logical hint operator should be removed during analysis.") ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] beliefer opened a new pull request, #36556: [SPARK-39162][SQL][3.3] Jdbc dialect should decide which function could be pushed down
beliefer opened a new pull request, #36556: URL: https://github.com/apache/spark/pull/36556 ### What changes were proposed in this pull request? This PR backports https://github.com/apache/spark/pull/36521 to branch-3.3. ### Why are the changes needed? Make function push-down more flexible. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? Existing tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AnywalkerGiser commented on pull request #36537: [SPARK-39176][PYSPARK][WINDOWS] Fixed a problem with pyspark serializing pre-1970 datetime in windows
AnywalkerGiser commented on PR #36537: URL: https://github.com/apache/spark/pull/36537#issuecomment-1127149821 Is there a supervisor for approval? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] beliefer commented on pull request #36521: [SPARK-39162][SQL] Jdbc dialect should decide which function could be pushed down
beliefer commented on PR #36521: URL: https://github.com/apache/spark/pull/36521#issuecomment-1127146479 @cloud-fan @huaxingao Thanks a lot! I will create a backport to 3.3. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org