This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new e2f65b5316e [SPARK-42527][CONNECT] Scala Client add Window functions e2f65b5316e is described below commit e2f65b5316ed1473518e2d79e89c9bed756029e9 Author: yangjie01 <yangji...@baidu.com> AuthorDate: Wed Feb 22 10:53:15 2023 -0400 [SPARK-42527][CONNECT] Scala Client add Window functions ### What changes were proposed in this pull request? This PR aims to add the window functions to the Scala spark connect client. ### Why are the changes needed? Provide the same APIs in the Scala spark connect client as in the original Dataset API. ### Does this PR introduce _any_ user-facing change? Yes, it adds new window functions to the Spark Connect Scala client. ### How was this patch tested? - Add new test - Manually checked connect-client-jvm and connect with Scala-2.13 Closes #40120 from LuciferYang/window-functions. Authored-by: yangjie01 <yangji...@baidu.com> Signed-off-by: Herman van Hovell <her...@databricks.com> --- .../scala/org/apache/spark/sql/functions.scala | 256 ++++++++++++++++++++- .../org/apache/spark/sql/FunctionTestSuite.scala | 14 ++ .../apache/spark/sql/PlanGenerationTestSuite.scala | 39 ++++ .../explain-results/function_cume_dist.explain | 5 + .../explain-results/function_dense_rank.explain | 5 + .../explain-results/function_lag.explain | 5 + .../explain-results/function_lead.explain | 5 + .../explain-results/function_nth_value.explain | 5 + .../explain-results/function_ntile.explain | 5 + .../explain-results/function_percent_rank.explain | 5 + .../explain-results/function_rank.explain | 5 + .../explain-results/function_row_number.explain | 5 + .../query-tests/queries/function_cume_dist.json | 32 +++ .../queries/function_cume_dist.proto.bin | 7 + .../query-tests/queries/function_dense_rank.json | 32 +++ .../queries/function_dense_rank.proto.bin | 8 + .../query-tests/queries/function_lag.json | 52 +++++ .../query-tests/queries/function_lag.proto.bin | Bin 0 -> 209 bytes 
.../query-tests/queries/function_lead.json | 49 ++++ .../query-tests/queries/function_lead.proto.bin | 11 + .../query-tests/queries/function_nth_value.json | 45 ++++ .../queries/function_nth_value.proto.bin | 10 + .../query-tests/queries/function_ntile.json | 37 +++ .../query-tests/queries/function_ntile.proto.bin | 8 + .../query-tests/queries/function_percent_rank.json | 32 +++ .../queries/function_percent_rank.proto.bin | 7 + .../query-tests/queries/function_rank.json | 32 +++ .../query-tests/queries/function_rank.proto.bin | 7 + .../query-tests/queries/function_row_number.json | 32 +++ .../queries/function_row_number.proto.bin | 8 + .../sql/connect/planner/SparkConnectPlanner.scala | 20 ++ 31 files changed, 782 insertions(+), 1 deletion(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala index 4996b5033e3..0fd35b570f8 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala @@ -100,6 +100,9 @@ object functions { .setValue(value) } + private val nullType = + proto.DataType.newBuilder().setNull(proto.DataType.NULL.getDefaultInstance).build() + /** * Creates a [[Column]] of literal value. 
* @@ -129,7 +132,7 @@ object functions { case v: Array[Byte] => createLiteral(_.setBinary(ByteString.copyFrom(v))) case v: collection.mutable.WrappedArray[_] => lit(v.array) case v: LocalDate => createLiteral(_.setDate(v.toEpochDay.toInt)) - case null => unsupported("Null literals not supported yet.") + case null => createLiteral(_.setNull(nullType)) case _ => unsupported(s"literal $literal not supported (yet).") } } @@ -895,6 +898,257 @@ object functions { */ def var_pop(columnName: String): Column = var_pop(Column(columnName)) + ////////////////////////////////////////////////////////////////////////////////////////////// + // Window functions + ////////////////////////////////////////////////////////////////////////////////////////////// + + /** + * Window function: returns the cumulative distribution of values within a window partition, + * i.e. the fraction of rows that are below the current row. + * + * {{{ + * N = total number of rows in the partition + * cumeDist(x) = number of values before (and including) x / N + * }}} + * + * @group window_funcs + * @since 3.4.0 + */ + def cume_dist(): Column = Column.fn("cume_dist") + + /** + * Window function: returns the rank of rows within a window partition, without any gaps. + * + * The difference between rank and dense_rank is that denseRank leaves no gaps in ranking + * sequence when there are ties. That is, if you were ranking a competition using dense_rank and + * had three people tie for second place, you would say that all three were in second place and + * that the next person came in third. Rank would give me sequential numbers, making the person + * that came in third place (after the ties) would register as coming in fifth. + * + * This is equivalent to the DENSE_RANK function in SQL. 
+ * + * @group window_funcs + * @since 3.4.0 + */ + def dense_rank(): Column = Column.fn("dense_rank") + + /** + * Window function: returns the value that is `offset` rows before the current row, and `null` + * if there is less than `offset` rows before the current row. For example, an `offset` of one + * will return the previous row at any given point in the window partition. + * + * This is equivalent to the LAG function in SQL. + * + * @group window_funcs + * @since 3.4.0 + */ + def lag(e: Column, offset: Int): Column = lag(e, offset, null) + + /** + * Window function: returns the value that is `offset` rows before the current row, and `null` + * if there is less than `offset` rows before the current row. For example, an `offset` of one + * will return the previous row at any given point in the window partition. + * + * This is equivalent to the LAG function in SQL. + * + * @group window_funcs + * @since 3.4.0 + */ + def lag(columnName: String, offset: Int): Column = lag(columnName, offset, null) + + /** + * Window function: returns the value that is `offset` rows before the current row, and + * `defaultValue` if there is less than `offset` rows before the current row. For example, an + * `offset` of one will return the previous row at any given point in the window partition. + * + * This is equivalent to the LAG function in SQL. + * + * @group window_funcs + * @since 3.4.0 + */ + def lag(columnName: String, offset: Int, defaultValue: Any): Column = { + lag(Column(columnName), offset, defaultValue) + } + + /** + * Window function: returns the value that is `offset` rows before the current row, and + * `defaultValue` if there is less than `offset` rows before the current row. For example, an + * `offset` of one will return the previous row at any given point in the window partition. + * + * This is equivalent to the LAG function in SQL. 
+ * + * @group window_funcs + * @since 3.4.0 + */ + def lag(e: Column, offset: Int, defaultValue: Any): Column = { + lag(e, offset, defaultValue, false) + } + + /** + * Window function: returns the value that is `offset` rows before the current row, and + * `defaultValue` if there is less than `offset` rows before the current row. `ignoreNulls` + * determines whether null values of row are included in or eliminated from the calculation. For + * example, an `offset` of one will return the previous row at any given point in the window + * partition. + * + * This is equivalent to the LAG function in SQL. + * + * @group window_funcs + * @since 3.4.0 + */ + def lag(e: Column, offset: Int, defaultValue: Any, ignoreNulls: Boolean): Column = + Column.fn("lag", e, lit(offset), lit(defaultValue), lit(ignoreNulls)) + + /** + * Window function: returns the value that is `offset` rows after the current row, and `null` if + * there is less than `offset` rows after the current row. For example, an `offset` of one will + * return the next row at any given point in the window partition. + * + * This is equivalent to the LEAD function in SQL. + * + * @group window_funcs + * @since 3.4.0 + */ + def lead(columnName: String, offset: Int): Column = { + lead(columnName, offset, null) + } + + /** + * Window function: returns the value that is `offset` rows after the current row, and `null` if + * there is less than `offset` rows after the current row. For example, an `offset` of one will + * return the next row at any given point in the window partition. + * + * This is equivalent to the LEAD function in SQL. + * + * @group window_funcs + * @since 3.4.0 + */ + def lead(e: Column, offset: Int): Column = { + lead(e, offset, null) + } + + /** + * Window function: returns the value that is `offset` rows after the current row, and + * `defaultValue` if there is less than `offset` rows after the current row. 
For example, an + * `offset` of one will return the next row at any given point in the window partition. + * + * This is equivalent to the LEAD function in SQL. + * + * @group window_funcs + * @since 3.4.0 + */ + def lead(columnName: String, offset: Int, defaultValue: Any): Column = { + lead(Column(columnName), offset, defaultValue) + } + + /** + * Window function: returns the value that is `offset` rows after the current row, and + * `defaultValue` if there is less than `offset` rows after the current row. For example, an + * `offset` of one will return the next row at any given point in the window partition. + * + * This is equivalent to the LEAD function in SQL. + * + * @group window_funcs + * @since 3.4.0 + */ + def lead(e: Column, offset: Int, defaultValue: Any): Column = { + lead(e, offset, defaultValue, false) + } + + /** + * Window function: returns the value that is `offset` rows after the current row, and + * `defaultValue` if there is less than `offset` rows after the current row. `ignoreNulls` + * determines whether null values of row are included in or eliminated from the calculation. The + * default value of `ignoreNulls` is false. For example, an `offset` of one will return the next + * row at any given point in the window partition. + * + * This is equivalent to the LEAD function in SQL. + * + * @group window_funcs + * @since 3.4.0 + */ + def lead(e: Column, offset: Int, defaultValue: Any, ignoreNulls: Boolean): Column = + Column.fn("lead", e, lit(offset), lit(defaultValue), lit(ignoreNulls)) + + /** + * Window function: returns the value that is the `offset`th row of the window frame (counting + * from 1), and `null` if the size of window frame is less than `offset` rows. + * + * It will return the `offset`th non-null value it sees when ignoreNulls is set to true. If all + * values are null, then null is returned. + * + * This is equivalent to the nth_value function in SQL. 
+ * + * @group window_funcs + * @since 3.4.0 + */ + def nth_value(e: Column, offset: Int, ignoreNulls: Boolean): Column = + Column.fn("nth_value", e, lit(offset), lit(ignoreNulls)) + + /** + * Window function: returns the value that is the `offset`th row of the window frame (counting + * from 1), and `null` if the size of window frame is less than `offset` rows. + * + * This is equivalent to the nth_value function in SQL. + * + * @group window_funcs + * @since 3.4.0 + */ + def nth_value(e: Column, offset: Int): Column = + Column.fn("nth_value", e, lit(offset)) + + /** + * Window function: returns the ntile group id (from 1 to `n` inclusive) in an ordered window + * partition. For example, if `n` is 4, the first quarter of the rows will get value 1, the + * second quarter will get 2, the third quarter will get 3, and the last quarter will get 4. + * + * This is equivalent to the NTILE function in SQL. + * + * @group window_funcs + * @since 3.4.0 + */ + def ntile(n: Int): Column = Column.fn("ntile", lit(n)) + + /** + * Window function: returns the relative rank (i.e. percentile) of rows within a window + * partition. + * + * This is computed by: + * {{{ + * (rank of row in its partition - 1) / (number of rows in the partition - 1) + * }}} + * + * This is equivalent to the PERCENT_RANK function in SQL. + * + * @group window_funcs + * @since 3.4.0 + */ + def percent_rank(): Column = Column.fn("percent_rank") + + /** + * Window function: returns the rank of rows within a window partition. + * + * The difference between rank and dense_rank is that dense_rank leaves no gaps in ranking + * sequence when there are ties. That is, if you were ranking a competition using dense_rank and + * had three people tie for second place, you would say that all three were in second place and + * that the next person came in third. Rank would give me sequential numbers, making the person + * that came in third place (after the ties) would register as coming in fifth. 
+ * + * This is equivalent to the RANK function in SQL. + * + * @group window_funcs + * @since 3.4.0 + */ + def rank(): Column = Column.fn("rank") + + /** + * Window function: returns a sequential number starting at 1 within a window partition. + * + * @group window_funcs + * @since 3.4.0 + */ + def row_number(): Column = Column.fn("row_number") + ////////////////////////////////////////////////////////////////////////////////////////////// // Non-aggregate functions ////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala index d600ac432a2..f9118b93ec5 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala @@ -171,6 +171,20 @@ class FunctionTestSuite extends ConnectFunSuite { window(a, "10 seconds")) testEquals("session_window", session_window(a, "1 second"), session_window(a, lit("1 second"))) testEquals("bucket", bucket(lit(3), a), bucket(3, a)) + testEquals( + "lag", + lag(a, 1), + lag("a", 1), + lag(a, 1, null), + lag("a", 1, null), + lag(a, 1, null, false)) + testEquals( + "lead", + lead(a, 2), + lead("a", 2), + lead(a, 2, null), + lead("a", 2, null), + lead(a, 2, null, false)) test("assert_true no message") { val e = assert_true(a).expr diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala index 4cd7bfa0887..42572f8427e 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala @@ -1407,6 +1407,45 
@@ class PlanGenerationTestSuite extends ConnectFunSuite with BeforeAndAfterAll wit fn.bucket(3, Column("a")) } + functionTest("cume_dist") { + fn.cume_dist().over(Window.partitionBy(Column("a")).orderBy(Column("id"))) + } + + functionTest("dense_rank") { + fn.dense_rank().over(Window.partitionBy(Column("a")).orderBy(Column("id"))) + } + + functionTest("lag") { + fn.lag(Column("g"), 1, null, ignoreNulls = true) + .over(Window.partitionBy(Column("a")).orderBy(Column("id"))) + } + + functionTest("lead") { + fn.lead(Column("g"), 2, "dv", ignoreNulls = true) + .over(Window.partitionBy(Column("a")).orderBy(Column("id"))) + } + + functionTest("nth_value") { + fn.nth_value(Column("g"), 3, ignoreNulls = true) + .over(Window.partitionBy(Column("a")).orderBy(Column("id"))) + } + + functionTest("ntile") { + fn.ntile(4).over(Window.partitionBy(Column("a")).orderBy(Column("id"))) + } + + functionTest("percent_rank") { + fn.percent_rank().over(Window.partitionBy(Column("a")).orderBy(Column("id"))) + } + + functionTest("rank") { + fn.rank().over(Window.partitionBy(Column("a")).orderBy(Column("id"))) + } + + functionTest("row_number") { + fn.row_number().over(Window.partitionBy(Column("a")).orderBy(Column("id"))) + } + private def temporalFunctionTest(name: String)(f: => Column): Unit = { test("function " + name) { temporals.select(f) diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_cume_dist.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_cume_dist.explain new file mode 100644 index 00000000000..4f15f83bb9f --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_cume_dist.explain @@ -0,0 +1,5 @@ +Project [cume_dist() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0] ++- Project [a#0, id#0L, cume_dist() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW)#0, cume_dist() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0] + +- Window [cume_dist() windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS cume_dist() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0], [a#0], [id#0L ASC NULLS FIRST] + +- Project [a#0, id#0L] + +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_dense_rank.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_dense_rank.explain new file mode 100644 index 00000000000..0cce71ad1d8 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_dense_rank.explain @@ -0,0 +1,5 @@ +Project [DENSE_RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0] ++- Project [id#0L, a#0, DENSE_RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0, DENSE_RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0] + +- Window [dense_rank(id#0L) windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS DENSE_RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0], [a#0], [id#0L ASC NULLS FIRST] + +- Project [id#0L, a#0] + +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_lag.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_lag.explain new file mode 100644 index 00000000000..6d9d4e706ec --- /dev/null +++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_lag.explain @@ -0,0 +1,5 @@ +Project [lag(g, 1, NULL) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN -1 FOLLOWING AND -1 FOLLOWING)#0] ++- Project [g#0, a#0, id#0L, lag(g, 1, NULL) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN -1 FOLLOWING AND -1 FOLLOWING)#0, lag(g, 1, NULL) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN -1 FOLLOWING AND -1 FOLLOWING)#0] + +- Window [lag(g#0, -1, null) windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS lag(g, 1, NULL) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN -1 FOLLOWING AND -1 FOLLOWING)#0], [a#0], [id#0L ASC NULLS FIRST] + +- Project [g#0, a#0, id#0L] + +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_lead.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_lead.explain new file mode 100644 index 00000000000..6c8ce180b79 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_lead.explain @@ -0,0 +1,5 @@ +Project [lead(g, 2, dv) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN 2 FOLLOWING AND 2 FOLLOWING)#0] ++- Project [g#0, a#0, id#0L, lead(g, 2, dv) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN 2 FOLLOWING AND 2 FOLLOWING)#0, lead(g, 2, dv) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN 2 FOLLOWING AND 2 FOLLOWING)#0] + +- Window [lead(g#0, 2, dv) windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, 2, 2)) AS lead(g, 2, dv) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN 2 FOLLOWING AND 2 FOLLOWING)#0], [a#0], [id#0L ASC NULLS FIRST] + +- Project [g#0, a#0, id#0L] + +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_nth_value.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_nth_value.explain new file mode 100644 index 00000000000..69eb7872d52 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_nth_value.explain @@ -0,0 +1,5 @@ +Project [nth_value(g, 3) ignore nulls OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0] ++- Project [g#0, a#0, id#0L, nth_value(g, 3) ignore nulls OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0, nth_value(g, 3) ignore nulls OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0] + +- Window [nth_value(g#0, 3, true) windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS nth_value(g, 3) ignore nulls OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0], [a#0], [id#0L ASC NULLS FIRST] + +- Project [g#0, a#0, id#0L] + +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_ntile.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_ntile.explain new file mode 100644 index 00000000000..349ac7bbe8b --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_ntile.explain @@ -0,0 +1,5 @@ +Project [ntile(4) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0] ++- Project [a#0, id#0L, ntile(4) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0, ntile(4) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW)#0] + +- Window [ntile(4) windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS ntile(4) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0], [a#0], [id#0L ASC NULLS FIRST] + +- Project [a#0, id#0L] + +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_percent_rank.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_percent_rank.explain new file mode 100644 index 00000000000..012931bd2aa --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_percent_rank.explain @@ -0,0 +1,5 @@ +Project [PERCENT_RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0] ++- Project [id#0L, a#0, PERCENT_RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0, PERCENT_RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0] + +- Window [percent_rank(id#0L) windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS PERCENT_RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0], [a#0], [id#0L ASC NULLS FIRST] + +- Project [id#0L, a#0] + +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_rank.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_rank.explain new file mode 100644 index 00000000000..b8d4b5ee756 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_rank.explain @@ -0,0 +1,5 @@ +Project [RANK() OVER (PARTITION BY 
a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0] ++- Project [id#0L, a#0, RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0, RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0] + +- Window [rank(id#0L) windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0], [a#0], [id#0L ASC NULLS FIRST] + +- Project [id#0L, a#0] + +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_row_number.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_row_number.explain new file mode 100644 index 00000000000..d0c817f8894 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_row_number.explain @@ -0,0 +1,5 @@ +Project [row_number() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0] ++- Project [a#0, id#0L, row_number() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0, row_number() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0] + +- Window [row_number() windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_number() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0], [a#0], [id#0L ASC NULLS FIRST] + +- Project [a#0, id#0L] + +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_cume_dist.json 
b/connector/connect/common/src/test/resources/query-tests/queries/function_cume_dist.json new file mode 100644 index 00000000000..4e22d94aa30 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_cume_dist.json @@ -0,0 +1,32 @@ +{ + "project": { + "input": { + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "window": { + "windowFunction": { + "unresolvedFunction": { + "functionName": "cume_dist" + } + }, + "partitionSpec": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }], + "orderSpec": [{ + "child": { + "unresolvedAttribute": { + "unparsedIdentifier": "id" + } + }, + "direction": "SORT_DIRECTION_ASCENDING", + "nullOrdering": "SORT_NULLS_FIRST" + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_cume_dist.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_cume_dist.proto.bin new file mode 100644 index 00000000000..fe9c87a0a03 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_cume_dist.proto.bin @@ -0,0 +1,7 @@ +� +�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>&Z$ + + cume_dist +a + +id \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_dense_rank.json b/connector/connect/common/src/test/resources/query-tests/queries/function_dense_rank.json new file mode 100644 index 00000000000..3cc81b32613 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_dense_rank.json @@ -0,0 +1,32 @@ +{ + "project": { + "input": { + "localRelation": { + "schema": 
"struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "window": { + "windowFunction": { + "unresolvedFunction": { + "functionName": "dense_rank" + } + }, + "partitionSpec": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }], + "orderSpec": [{ + "child": { + "unresolvedAttribute": { + "unparsedIdentifier": "id" + } + }, + "direction": "SORT_DIRECTION_ASCENDING", + "nullOrdering": "SORT_NULLS_FIRST" + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_dense_rank.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_dense_rank.proto.bin new file mode 100644 index 00000000000..2df47f4ce65 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_dense_rank.proto.bin @@ -0,0 +1,8 @@ +� +�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>'Z% + + +dense_rank +a + +id \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_lag.json b/connector/connect/common/src/test/resources/query-tests/queries/function_lag.json new file mode 100644 index 00000000000..ee529f00dc5 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_lag.json @@ -0,0 +1,52 @@ +{ + "project": { + "input": { + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "window": { + "windowFunction": { + "unresolvedFunction": { + "functionName": "lag", + "arguments": [{ + "unresolvedAttribute": { + 
"unparsedIdentifier": "g" + } + }, { + "literal": { + "integer": 1 + } + }, { + "literal": { + "null": { + "null": { + } + } + } + }, { + "literal": { + "boolean": true + } + }] + } + }, + "partitionSpec": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }], + "orderSpec": [{ + "child": { + "unresolvedAttribute": { + "unparsedIdentifier": "id" + } + }, + "direction": "SORT_DIRECTION_ASCENDING", + "nullOrdering": "SORT_NULLS_FIRST" + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_lag.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_lag.proto.bin new file mode 100644 index 00000000000..908b872ec53 Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_lag.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_lead.json b/connector/connect/common/src/test/resources/query-tests/queries/function_lead.json new file mode 100644 index 00000000000..8c38eec6daf --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_lead.json @@ -0,0 +1,49 @@ +{ + "project": { + "input": { + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "window": { + "windowFunction": { + "unresolvedFunction": { + "functionName": "lead", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "g" + } + }, { + "literal": { + "integer": 2 + } + }, { + "literal": { + "string": "dv" + } + }, { + "literal": { + "boolean": true + } + }] + } + }, + "partitionSpec": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }], + "orderSpec": [{ + "child": { + "unresolvedAttribute": { + "unparsedIdentifier": "id" + } + }, + 
"direction": "SORT_DIRECTION_ASCENDING", + "nullOrdering": "SORT_NULLS_FIRST" + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_lead.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_lead.proto.bin new file mode 100644 index 00000000000..45f9e784037 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_lead.proto.bin @@ -0,0 +1,11 @@ +� +�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string><Z: +#! +lead +g +0 +jdv + +a + +id \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_nth_value.json b/connector/connect/common/src/test/resources/query-tests/queries/function_nth_value.json new file mode 100644 index 00000000000..4d14c28dfcf --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_nth_value.json @@ -0,0 +1,45 @@ +{ + "project": { + "input": { + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "window": { + "windowFunction": { + "unresolvedFunction": { + "functionName": "nth_value", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "g" + } + }, { + "literal": { + "integer": 3 + } + }, { + "literal": { + "boolean": true + } + }] + } + }, + "partitionSpec": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }], + "orderSpec": [{ + "child": { + "unresolvedAttribute": { + "unparsedIdentifier": "id" + } + }, + "direction": "SORT_DIRECTION_ASCENDING", + "nullOrdering": "SORT_NULLS_FIRST" + }] + } + }] + } +} \ No newline at end of file diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/function_nth_value.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_nth_value.proto.bin new file mode 100644 index 00000000000..51d26e4c70b --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_nth_value.proto.bin @@ -0,0 +1,10 @@ +� +�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>9Z7 + + nth_value +g +0 + +a + +id \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_ntile.json b/connector/connect/common/src/test/resources/query-tests/queries/function_ntile.json new file mode 100644 index 00000000000..1cd06b27791 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_ntile.json @@ -0,0 +1,37 @@ +{ + "project": { + "input": { + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "window": { + "windowFunction": { + "unresolvedFunction": { + "functionName": "ntile", + "arguments": [{ + "literal": { + "integer": 4 + } + }] + } + }, + "partitionSpec": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }], + "orderSpec": [{ + "child": { + "unresolvedAttribute": { + "unparsedIdentifier": "id" + } + }, + "direction": "SORT_DIRECTION_ASCENDING", + "nullOrdering": "SORT_NULLS_FIRST" + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_ntile.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_ntile.proto.bin new file mode 100644 index 00000000000..6fec68ebb19 --- /dev/null +++ 
b/connector/connect/common/src/test/resources/query-tests/queries/function_ntile.proto.bin @@ -0,0 +1,8 @@ +� +�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>(Z& + +ntile +0 +a + +id \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_percent_rank.json b/connector/connect/common/src/test/resources/query-tests/queries/function_percent_rank.json new file mode 100644 index 00000000000..3119e2af8b7 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_percent_rank.json @@ -0,0 +1,32 @@ +{ + "project": { + "input": { + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "window": { + "windowFunction": { + "unresolvedFunction": { + "functionName": "percent_rank" + } + }, + "partitionSpec": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }], + "orderSpec": [{ + "child": { + "unresolvedAttribute": { + "unparsedIdentifier": "id" + } + }, + "direction": "SORT_DIRECTION_ASCENDING", + "nullOrdering": "SORT_NULLS_FIRST" + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_percent_rank.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_percent_rank.proto.bin new file mode 100644 index 00000000000..594de0f59b7 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_percent_rank.proto.bin @@ -0,0 +1,7 @@ +� +�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>)Z' + +percent_rank +a + +id \ No newline at end of file diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/function_rank.json b/connector/connect/common/src/test/resources/query-tests/queries/function_rank.json new file mode 100644 index 00000000000..ab199e7c6d2 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_rank.json @@ -0,0 +1,32 @@ +{ + "project": { + "input": { + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "window": { + "windowFunction": { + "unresolvedFunction": { + "functionName": "rank" + } + }, + "partitionSpec": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }], + "orderSpec": [{ + "child": { + "unresolvedAttribute": { + "unparsedIdentifier": "id" + } + }, + "direction": "SORT_DIRECTION_ASCENDING", + "nullOrdering": "SORT_NULLS_FIRST" + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_rank.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_rank.proto.bin new file mode 100644 index 00000000000..d15a9c6d0d6 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_rank.proto.bin @@ -0,0 +1,7 @@ +� +�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>!Z + +rank +a + +id \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_row_number.json b/connector/connect/common/src/test/resources/query-tests/queries/function_row_number.json new file mode 100644 index 00000000000..2185a41e2fb --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_row_number.json @@ -0,0 +1,32 @@ +{ + "project": { + "input": { + 
"localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "window": { + "windowFunction": { + "unresolvedFunction": { + "functionName": "row_number" + } + }, + "partitionSpec": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }], + "orderSpec": [{ + "child": { + "unresolvedAttribute": { + "unparsedIdentifier": "id" + } + }, + "direction": "SORT_DIRECTION_ASCENDING", + "nullOrdering": "SORT_NULLS_FIRST" + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_row_number.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_row_number.proto.bin new file mode 100644 index 00000000000..080d1fc35b8 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_row_number.proto.bin @@ -0,0 +1,8 @@ +� +�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>'Z% + + +row_number +a + +id \ No newline at end of file diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 4a02ab66ea8..a14d3632d28 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -974,6 +974,26 @@ class SparkConnectPlanner(val session: SparkSession) { } Some(NthValue(children(0), children(1), ignoreNulls)) + case "lag" if fun.getArgumentsCount == 4 => + // Lag does not have a constructor which accepts Expression typed 'ignoreNulls' + val 
children = fun.getArgumentsList.asScala.toSeq.map(transformExpression) + val ignoreNulls = children.last match { + case Literal(bool: Boolean, BooleanType) => bool + case other => + throw InvalidPlanInput(s"ignoreNulls should be a literal boolean, but got $other") + } + Some(Lag(children.head, children(1), children(2), ignoreNulls)) + + case "lead" if fun.getArgumentsCount == 4 => + // Lead does not have a constructor which accepts Expression typed 'ignoreNulls' + val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression) + val ignoreNulls = children.last match { + case Literal(bool: Boolean, BooleanType) => bool + case other => + throw InvalidPlanInput(s"ignoreNulls should be a literal boolean, but got $other") + } + Some(Lead(children.head, children(1), children(2), ignoreNulls)) + case "window" if 2 <= fun.getArgumentsCount && fun.getArgumentsCount <= 4 => val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression) val timeCol = children.head --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org