zhengruifeng commented on code in PR #39622:
URL: https://github.com/apache/spark/pull/39622#discussion_r1071974988


##########
python/pyspark/sql/connect/functions.py:
##########
@@ -799,6 +799,13 @@ def corr(col1: "ColumnOrName", col2: "ColumnOrName") -> Column:
 
 
 def count(col: "ColumnOrName") -> Column:
+    # convert count(*), count(col(*)) and count(expr(*)) to count(1)

Review Comment:
   Those tests never passed before; this issue was found by reusing 
PySpark's tests. (BTW, there was a similar fix for `groupby.agg({"*":"count"})` 
here.)
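
The rewrite described in the diff comment (turning `count(*)`, `count(col("*"))` and `count(expr("*"))` into `count(1)`) can be sketched roughly as below. This is a toy model only: `Literal`, `ColumnRef`, `SqlExpr`, `FunctionCall` and `_is_star` are hypothetical stand-ins, not the real `pyspark.sql.connect` classes, and the actual change in this PR may be structured differently.

```python
from dataclasses import dataclass
from typing import Tuple, Union


# Hypothetical stand-ins for client-side expression nodes; these are NOT
# the real pyspark.sql.connect classes.
@dataclass(frozen=True)
class Literal:
    value: object


@dataclass(frozen=True)
class ColumnRef:
    name: str  # what col("...") would carry in this toy model


@dataclass(frozen=True)
class SqlExpr:
    text: str  # what expr("...") would carry in this toy model


@dataclass(frozen=True)
class FunctionCall:
    name: str
    args: Tuple[object, ...]


Expr = Union[Literal, ColumnRef, SqlExpr]


def _is_star(e: Expr) -> bool:
    """Return True for col("*") or expr("*") in this toy model."""
    if isinstance(e, ColumnRef):
        return e.name == "*"
    if isinstance(e, SqlExpr):
        return e.text.strip() == "*"
    return False


def count(col: Union[str, Expr]) -> FunctionCall:
    # A bare string is treated as a column name, as in count("*").
    e = ColumnRef(col) if isinstance(col, str) else col
    # Rewrite every star form to the literal 1 before building the call,
    # so the server never has to resolve a star inside count().
    if _is_star(e):
        e = Literal(1)
    return FunctionCall("count", (e,))
```

In this sketch all three star spellings produce the same `count(1)` call, while an ordinary column such as `count("alphabets")` is left untouched.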
   
   1. For `count(expr("*"))`, the star is resolved in 
https://github.com/apache/spark/blob/bf80aa472bcfc3417369aa0787ca7430862d576c/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L1001-L1003
   
   
   The `explain("extended")` results are:
   
   In Connect:
   ```
   == Parsed Logical Plan ==
   'Project [unresolvedalias('count(*), None), unresolvedalias('count('alphabets), None)]
   +- Project [alphabets#3 AS alphabets#5]
      +- LocalRelation [alphabets#3]
   
   == Analyzed Logical Plan ==
   count(alphabets): bigint, count(alphabets): bigint
   Aggregate [count(alphabets#5) AS count(alphabets)#8L, count(alphabets#5) AS count(alphabets)#10L]
   +- Project [alphabets#3 AS alphabets#5]
      +- LocalRelation [alphabets#3]
   
   == Optimized Logical Plan ==
   Aggregate [count(alphabets#5) AS count(alphabets)#8L, count(alphabets#5) AS count(alphabets)#10L]
   +- LocalRelation [alphabets#5]
   
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- HashAggregate(keys=[], functions=[count(alphabets#5)], output=[count(alphabets)#8L, count(alphabets)#10L])
      +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=11]
         +- HashAggregate(keys=[], functions=[partial_count(alphabets#5)], output=[count#14L])
            +- LocalTableScan [alphabets#5]
   ```
   
   In PySpark:
   ```
   == Parsed Logical Plan ==
   'Project [unresolvedalias(count(1), Some(org.apache.spark.sql.Column$$Lambda$2108/0x0000000801146840@26e26f5a)), unresolvedalias(count(alphabets#0), Some(org.apache.spark.sql.Column$$Lambda$2108/0x0000000801146840@26e26f5a))]
   +- LogicalRDD [alphabets#0], false
   
   == Analyzed Logical Plan ==
   count(1): bigint, count(alphabets): bigint
   Aggregate [count(1) AS count(1)#17L, count(alphabets#0) AS count(alphabets)#18L]
   +- LogicalRDD [alphabets#0], false
   
   == Optimized Logical Plan ==
   Aggregate [count(1) AS count(1)#17L, count(alphabets#0) AS count(alphabets)#18L]
   +- LogicalRDD [alphabets#0], false
   
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- HashAggregate(keys=[], functions=[count(1), count(alphabets#0)], output=[count(1)#17L, count(alphabets)#18L])
      +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=24]
         +- HashAggregate(keys=[], functions=[partial_count(1), partial_count(alphabets#0)], output=[count#23L, count#24L])
            +- Scan ExistingRDD[alphabets#0]
   ```
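
The two analyzed plans above differ in more than naming: in Connect the star was expanded to the only column, so `count(*)` became `count(alphabets)`, whereas PySpark produced `count(1)`. In SQL, `count(<column>)` skips NULLs while `count(1)` counts every row, so the results can diverge as soon as the column contains a NULL. A minimal pure-Python illustration, assuming hypothetical data (the rows and helper names below are not taken from the test suite):

```python
from typing import List, Optional, Tuple

Row = Tuple[Optional[str]]

# Hypothetical data: a single "alphabets" column that contains one NULL.
rows: List[Row] = [("a",), (None,), ("c",)]


def count_one(data: List[Row]) -> int:
    """Mimics count(1): every row is counted, NULLs included."""
    return len(data)


def count_column(data: List[Row], index: int = 0) -> int:
    """Mimics count(<column>): rows whose value is NULL are skipped."""
    return sum(1 for row in data if row[index] is not None)


all_rows = count_one(rows)
non_null_rows = count_column(rows)
```

Here `all_rows` and `non_null_rows` differ by exactly the number of NULL values in the column.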
   
   
   2. `count(col("*"))` and `count(*)` are resolved in 
https://github.com/apache/spark/blob/bf80aa472bcfc3417369aa0787ca7430862d576c/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L699-L701
   
   The traceback is:
   
   ```
   ======================================================================
   ERROR [3.210s]: test_count_star (pyspark.sql.tests.connect.test_connect_function.SparkConnectFunctionTests)
   ----------------------------------------------------------------------
   Traceback (most recent call last):
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/tests/connect/test_connect_function.py", line 104, in test_count_star
       cdf.select(CF.count(CF.col("*")), CF.count(cdf.alphabets)).collect(),
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/dataframe.py", line 1243, in collect
       table = self._session.client.to_table(query)
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/client.py", line 415, in to_table
       table, _ = self._execute_and_fetch(req)
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/client.py", line 593, in _execute_and_fetch
       self._handle_error(rpc_error)
     File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/client.py", line 632, in _handle_error
       raise SparkConnectAnalysisException(
   pyspark.sql.connect.client.SparkConnectAnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `*` cannot be resolved. Did you mean one of the following? [`alphabets`]
   Plan: 'Aggregate [unresolvedalias('count('*), None), count(alphabets#28) AS count(alphabets)#31L]
   +- Project [alphabets#26 AS alphabets#28]
      +- LocalRelation [alphabets#26]
   ```
   
   
   3. Stars in a projection are converted to `unresolved_star` in 
https://github.com/apache/spark/blob/4f97111eb07a3a622b6d2691cd713814fd005bca/python/pyspark/sql/connect/plan.py#L889-L892
   and resolved in 
https://github.com/apache/spark/blob/bf80aa472bcfc3417369aa0787ca7430862d576c/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L1005-L1011
   
   


