beliefer commented on a change in pull request #31402:
URL: https://github.com/apache/spark/pull/31402#discussion_r568296396
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
##########
@@ -511,14 +511,15 @@ case class Lag(
abstract class AggregateWindowFunction extends DeclarativeAggregate with
WindowFunction {
self: Product =>
- override val frame: WindowFrame = SpecifiedWindowFrame(RowFrame,
UnboundedPreceding, CurrentRow)
Review comment:
Yes. `AggregateWindowFunction` supports all frames.
##########
File path: sql/core/src/test/resources/sql-functions/sql-expression-schema.md
##########
@@ -205,7 +205,7 @@
| org.apache.spark.sql.catalyst.expressions.ParseToDate | to_date | SELECT
to_date('2009-07-30 04:17:52') | struct<to_date(2009-07-30 04:17:52):date> |
| org.apache.spark.sql.catalyst.expressions.ParseToTimestamp | to_timestamp |
SELECT to_timestamp('2016-12-31 00:12:00') | struct<to_timestamp(2016-12-31
00:12:00):timestamp> |
| org.apache.spark.sql.catalyst.expressions.ParseUrl | parse_url | SELECT
parse_url('http://spark.apache.org/path?query=1', 'HOST') |
struct<parse_url(http://spark.apache.org/path?query=1, HOST):string> |
-| org.apache.spark.sql.catalyst.expressions.PercentRank | percent_rank |
SELECT a, b, percent_rank(b) OVER (PARTITION BY a ORDER BY b) FROM VALUES
('A1', 2), ('A1', 1), ('A2', 3), ('A1', 1) tab(a, b) |
struct<a:string,b:int,PERCENT_RANK() OVER (PARTITION BY a ORDER BY b ASC NULLS
FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):double> |
+| org.apache.spark.sql.catalyst.expressions.PercentRank | percent_rank |
SELECT a, b, percent_rank(b) OVER (PARTITION BY a ORDER BY b) FROM VALUES
('A1', 2), ('A1', 1), ('A2', 3), ('A1', 1) tab(a, b) |
struct<a:string,b:int,PERCENT_RANK() OVER (PARTITION BY a ORDER BY b ASC NULLS
FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):double> |
Review comment:
Why do we change `ROWS` to `RANGE`?
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
##########
@@ -511,14 +511,15 @@ case class Lag(
abstract class AggregateWindowFunction extends DeclarativeAggregate with
WindowFunction {
self: Product =>
- override val frame: WindowFrame = SpecifiedWindowFrame(RowFrame,
UnboundedPreceding, CurrentRow)
Review comment:
It seems there is no need to remove the default window frame.
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
##########
@@ -528,6 +529,10 @@ abstract class RowNumberLike extends
AggregateWindowFunction {
override val updateExpressions: Seq[Expression] = rowNumber + one :: Nil
}
+trait SpecifiedFrameAggregateWindowFunction extends AggregateWindowFunction {
+ override val frame: WindowFrame = SpecifiedWindowFrame(RowFrame,
UnboundedPreceding, CurrentRow)
+}
Review comment:
It seems we don't need this trait. If we want a window function to support a
user-specified window frame, we can simply do this:
`override val frame: WindowFrame = UnspecifiedFrame`
##########
File path: sql/core/src/test/resources/sql-functions/sql-expression-schema.md
##########
@@ -205,7 +205,7 @@
| org.apache.spark.sql.catalyst.expressions.ParseToDate | to_date | SELECT
to_date('2009-07-30 04:17:52') | struct<to_date(2009-07-30 04:17:52):date> |
| org.apache.spark.sql.catalyst.expressions.ParseToTimestamp | to_timestamp |
SELECT to_timestamp('2016-12-31 00:12:00') | struct<to_timestamp(2016-12-31
00:12:00):timestamp> |
| org.apache.spark.sql.catalyst.expressions.ParseUrl | parse_url | SELECT
parse_url('http://spark.apache.org/path?query=1', 'HOST') |
struct<parse_url(http://spark.apache.org/path?query=1, HOST):string> |
-| org.apache.spark.sql.catalyst.expressions.PercentRank | percent_rank |
SELECT a, b, percent_rank(b) OVER (PARTITION BY a ORDER BY b) FROM VALUES
('A1', 2), ('A1', 1), ('A2', 3), ('A1', 1) tab(a, b) |
struct<a:string,b:int,PERCENT_RANK() OVER (PARTITION BY a ORDER BY b ASC NULLS
FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):double> |
+| org.apache.spark.sql.catalyst.expressions.PercentRank | percent_rank |
SELECT a, b, percent_rank(b) OVER (PARTITION BY a ORDER BY b) FROM VALUES
('A1', 2), ('A1', 1), ('A2', 3), ('A1', 1) tab(a, b) |
struct<a:string,b:int,PERCENT_RANK() OVER (PARTITION BY a ORDER BY b ASC NULLS
FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):double> |
Review comment:
I think we should not change the original behavior unless there is a clear
reason to.
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
##########
@@ -511,14 +511,15 @@ case class Lag(
abstract class AggregateWindowFunction extends DeclarativeAggregate with
WindowFunction {
self: Product =>
- override val frame: WindowFrame = SpecifiedWindowFrame(RowFrame,
UnboundedPreceding, CurrentRow)
Review comment:
It seems there is no need to remove the default window frame. If a subclass of
`AggregateWindowFunction` needs to support user-specified window frames, that
subclass should `override val frame: WindowFrame = UnspecifiedFrame`
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -3162,6 +3162,9 @@ class Analyzer(override val catalogManager:
CatalogManager)
case WindowExpression(wf: FrameLessOffsetWindowFunction,
WindowSpecDefinition(_, _, f: SpecifiedWindowFrame)) if wf.frame != f
=>
throw
QueryCompilationErrors.cannotSpecifyWindowFrameError(wf.prettyName)
+ case WindowExpression(wf: AggregateWindowFunction, s:
WindowSpecDefinition)
+ if wf.frame != UnspecifiedFrame =>
+ WindowExpression(wf, s.copy(frameSpecification = wf.frame))
Review comment:
This seems to duplicate
https://github.com/apache/spark/blob/a5af58bde544985d152bd974aa18cf0bdbfad2d5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L3171
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -3162,6 +3162,9 @@ class Analyzer(override val catalogManager:
CatalogManager)
case WindowExpression(wf: FrameLessOffsetWindowFunction,
WindowSpecDefinition(_, _, f: SpecifiedWindowFrame)) if wf.frame != f
=>
throw
QueryCompilationErrors.cannotSpecifyWindowFrameError(wf.prettyName)
+ case WindowExpression(wf: AggregateWindowFunction, s:
WindowSpecDefinition)
+ if wf.frame != UnspecifiedFrame =>
+ WindowExpression(wf, s.copy(frameSpecification = wf.frame))
Review comment:
Is this block meant to use the default frame regardless of the specified
frame?
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -3170,7 +3170,9 @@ class Analyzer(override val catalogManager:
CatalogManager)
WindowExpression(wf, s.copy(frameSpecification = wf.frame))
case we @ WindowExpression(e, s @ WindowSpecDefinition(_, o,
UnspecifiedFrame))
if e.resolved =>
- val frame = if (o.nonEmpty) {
+ val frame = if (e.isInstanceOf[RankLike]) {
+ SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)
Review comment:
It seems we should investigate the behavior of other databases.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]