zhengruifeng commented on code in PR #39622:
URL: https://github.com/apache/spark/pull/39622#discussion_r1071974988
##########
python/pyspark/sql/connect/functions.py:
##########
@@ -799,6 +799,13 @@ def corr(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
def count(col: "ColumnOrName") -> Column:
+ # convert count(*), count(col(*)) and count(expr(*)) to count(1)
Review Comment:
those tests just never passed before; this issue was found by reusing
PySpark's tests. (BTW, there was a similar fix for `groupby.agg({"*": "count"})`
[here](https://github.com/apache/spark/commit/2d028a2ec19f1a9e41e3b2e893c412bd28ab53a4).)
1. For `count(expr("*"))`, the star is resolved in
https://github.com/apache/spark/blob/bf80aa472bcfc3417369aa0787ca7430862d576c/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L1001-L1003
and the `explain("extended")` results are:
in Connect:
```
== Parsed Logical Plan ==
'Project [unresolvedalias('count(*), None), unresolvedalias('count('alphabets), None)]
+- Project [alphabets#3 AS alphabets#5]
   +- LocalRelation [alphabets#3]

== Analyzed Logical Plan ==
count(alphabets): bigint, count(alphabets): bigint
Aggregate [count(alphabets#5) AS count(alphabets)#8L, count(alphabets#5) AS count(alphabets)#10L]
+- Project [alphabets#3 AS alphabets#5]
   +- LocalRelation [alphabets#3]

== Optimized Logical Plan ==
Aggregate [count(alphabets#5) AS count(alphabets)#8L, count(alphabets#5) AS count(alphabets)#10L]
+- LocalRelation [alphabets#5]

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(alphabets#5)], output=[count(alphabets)#8L, count(alphabets)#10L])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=11]
      +- HashAggregate(keys=[], functions=[partial_count(alphabets#5)], output=[count#14L])
         +- LocalTableScan [alphabets#5]
```
in PySpark:
```
== Parsed Logical Plan ==
'Project [unresolvedalias(count(1), Some(org.apache.spark.sql.Column$$Lambda$2108/0x0000000801146840@26e26f5a)), unresolvedalias(count(alphabets#0), Some(org.apache.spark.sql.Column$$Lambda$2108/0x0000000801146840@26e26f5a))]
+- LogicalRDD [alphabets#0], false

== Analyzed Logical Plan ==
count(1): bigint, count(alphabets): bigint
Aggregate [count(1) AS count(1)#17L, count(alphabets#0) AS count(alphabets)#18L]
+- LogicalRDD [alphabets#0], false

== Optimized Logical Plan ==
Aggregate [count(1) AS count(1)#17L, count(alphabets#0) AS count(alphabets)#18L]
+- LogicalRDD [alphabets#0], false

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(1), count(alphabets#0)], output=[count(1)#17L, count(alphabets)#18L])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=24]
      +- HashAggregate(keys=[], functions=[partial_count(1), partial_count(alphabets#0)], output=[count#23L, count#24L])
         +- Scan ExistingRDD[alphabets#0]
```
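The two plans above show the discrepancy: before this fix, Connect resolved `count(*)` to `count(alphabets)` instead of `count(1)`. Below is a minimal pure-Python sketch of the client-side normalization this PR describes; the expression classes and helper names are illustrative, not the actual `pyspark.sql.connect` code.

```python
# Illustrative stand-ins for Connect client expression nodes (hypothetical names).
class Expression:
    pass

class Literal(Expression):
    def __init__(self, value):
        self.value = value

class UnresolvedStar(Expression):
    """A dedicated star node, as opposed to a column literally named '*'."""

class UnresolvedFunction(Expression):
    def __init__(self, name, args):
        self.name = name
        self.args = args

def count(col):
    # Convert count("*"), count(col("*")) and count(expr("*")) to count(1),
    # matching the plan PySpark produces (count(1) AS count(1)).
    if col == "*" or isinstance(col, UnresolvedStar):
        col = Literal(1)
    return UnresolvedFunction("count", [col])

expr = count("*")
assert isinstance(expr.args[0], Literal) and expr.args[0].value == 1
```

With this rewrite, the star never reaches the server-side resolver, so Connect and PySpark produce the same aggregate.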
2. `count(col("*"))` and `count(*)` are resolved in
https://github.com/apache/spark/blob/bf80aa472bcfc3417369aa0787ca7430862d576c/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L699-L701
and the traceback is:
```
======================================================================
ERROR [3.210s]: test_count_star (pyspark.sql.tests.connect.test_connect_function.SparkConnectFunctionTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/tests/connect/test_connect_function.py", line 104, in test_count_star
    cdf.select(CF.count(CF.col("*")), CF.count(cdf.alphabets)).collect(),
  File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/dataframe.py", line 1243, in collect
    table = self._session.client.to_table(query)
  File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/client.py", line 415, in to_table
    table, _ = self._execute_and_fetch(req)
  File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/client.py", line 593, in _execute_and_fetch
    self._handle_error(rpc_error)
  File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/client.py", line 632, in _handle_error
    raise SparkConnectAnalysisException(
pyspark.sql.connect.client.SparkConnectAnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `*` cannot be resolved. Did you mean one of the following? [`alphabets`]
Plan: 'Aggregate [unresolvedalias('count('*), None), count(alphabets#28) AS count(alphabets)#31L]
+- Project [alphabets#26 AS alphabets#28]
   +- LocalRelation [alphabets#26]
```
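The failure mode here is that `col("*")` ships an attribute literally named `*`, which the analyzer then tries to match against real column names. A hedged pure-Python sketch of that lookup (class and function names are hypothetical, not the Spark analyzer's actual API):

```python
class UnresolvedAttribute:
    """An attribute to be resolved by name; '*' gets no special treatment here."""
    def __init__(self, name):
        self.name = name

def resolve(attr, columns):
    # A plain name lookup: succeeds only for real columns, so an attribute
    # named "*" fails exactly like the traceback above.
    if attr.name in columns:
        return attr.name
    raise ValueError(
        f"[UNRESOLVED_COLUMN] `{attr.name}` cannot be resolved. "
        f"Did you mean one of the following? {columns}"
    )

resolve(UnresolvedAttribute("alphabets"), ["alphabets"])  # fine
try:
    resolve(UnresolvedAttribute("*"), ["alphabets"])      # fails, as in the test
except ValueError as e:
    print(e)
```

This is why the fix normalizes the star on the client side instead of relying on server-side column resolution.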
3. Stars in a projection are converted to `unresolved_star` in
https://github.com/apache/spark/blob/4f97111eb07a3a622b6d2691cd713814fd005bca/python/pyspark/sql/connect/plan.py#L889-L892
and resolved in
https://github.com/apache/spark/blob/bf80aa472bcfc3417369aa0787ca7430862d576c/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L1005-L1011
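A minimal sketch of that projection-side rule, using a plain dict as a stand-in for the Connect protobuf message (field names here are illustrative only):

```python
def to_proto(name: str) -> dict:
    # In a projection, a bare "*" becomes a dedicated unresolved_star node;
    # any other name becomes an unresolved_attribute to resolve by name.
    if name == "*":
        return {"unresolved_star": {}}
    return {"unresolved_attribute": {"unparsed_identifier": name}}

assert to_proto("*") == {"unresolved_star": {}}
assert to_proto("alphabets") == {"unresolved_attribute": {"unparsed_identifier": "alphabets"}}
```

So projections already had the special-casing that `count` was missing before this PR.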
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]