allisonwang-db commented on code in PR #48664:
URL: https://github.com/apache/spark/pull/48664#discussion_r1830254768


##########
sql/api/src/main/scala/org/apache/spark/sql/internal/columnNodes.scala:
##########
@@ -167,6 +167,16 @@ private[sql] object UnresolvedAttribute {
     apply(unparsedIdentifier, None, false, CurrentOrigin.get)
 }
 
+private[sql] case class LazyOuterReference(

Review Comment:
   Let's add a javadoc here explaining this node?



##########
python/pyspark/sql/dataframe.py:
##########
@@ -6476,6 +6476,153 @@ def transpose(self, indexColumn: 
Optional["ColumnOrName"] = None) -> "DataFrame"
         """
         ...
 
+    def scalar(self) -> Column:
+        """
+        Return a `Column` object for a SCALAR Subquery containing exactly one 
row and one column.
+
+        The `scalar()` method is useful for extracting a `Column` object that 
represents a scalar
+        value from a DataFrame, especially when the DataFrame results from an 
aggregation or
+        single-value computation. This returned `Column` can then be used 
directly in `select`
+        clauses or as predicates in filters on the outer DataFrame, enabling 
dynamic data filtering
+        and calculations based on scalar values.
+
+        .. versionadded:: 4.0.0
+
+        Returns
+        -------
+        :class:`Column`
+            A `Column` object representing a SCALAR subquery.
+
+        Examples
+        --------
+        Setup a sample DataFrame.
+
+        >>> data = [
+        ...     (1, "Alice", 45000, 101), (2, "Bob", 54000, 101), (3, 
"Charlie", 29000, 102),
+        ...     (4, "David", 61000, 102), (5, "Eve", 48000, 101),
+        ... ]
+        >>> employees = spark.createDataFrame(data, ["id", "name", "salary", 
"department_id"])
+
+        Example 1: Filter for employees with salary greater than the average 
salary.

Review Comment:
   Can we also mention here this is a non-correlated example?



##########
python/pyspark/sql/dataframe.py:
##########
@@ -6476,6 +6476,153 @@ def transpose(self, indexColumn: 
Optional["ColumnOrName"] = None) -> "DataFrame"
         """
         ...
 
+    def scalar(self) -> Column:
+        """
+        Return a `Column` object for a SCALAR Subquery containing exactly one 
row and one column.
+
+        The `scalar()` method is useful for extracting a `Column` object that 
represents a scalar
+        value from a DataFrame, especially when the DataFrame results from an 
aggregation or
+        single-value computation. This returned `Column` can then be used 
directly in `select`
+        clauses or as predicates in filters on the outer DataFrame, enabling 
dynamic data filtering
+        and calculations based on scalar values.
+
+        .. versionadded:: 4.0.0
+
+        Returns
+        -------
+        :class:`Column`
+            A `Column` object representing a SCALAR subquery.
+
+        Examples
+        --------
+        Setup a sample DataFrame.
+
+        >>> data = [
+        ...     (1, "Alice", 45000, 101), (2, "Bob", 54000, 101), (3, 
"Charlie", 29000, 102),
+        ...     (4, "David", 61000, 102), (5, "Eve", 48000, 101),
+        ... ]
+        >>> employees = spark.createDataFrame(data, ["id", "name", "salary", 
"department_id"])
+
+        Example 1: Filter for employees with salary greater than the average 
salary.
+
+        >>> from pyspark.sql import functions as sf
+        >>> employees.where(
+        ...     sf.col("salary") > employees.select(sf.avg("salary")).scalar()
+        ... ).select("name", "salary", "department_id").show()
+        +-----+------+-------------+
+        | name|salary|department_id|
+        +-----+------+-------------+
+        |  Bob| 54000|          101|
+        |David| 61000|          102|
+        |  Eve| 48000|          101|
+        +-----+------+-------------+
+
+        Example 2: Filter for employees with salary greater than the average 
salary in their
+        department.
+
+        >>> from pyspark.sql import functions as sf
+        >>> employees.where(
+        ...     sf.col("salary")
+        ...     > employees.where(sf.col("department_id") == 
sf.col("department_id").outer())
+        ...         .select(sf.avg("salary")).scalar()
+        ... ).select("name", "salary", "department_id").show()
+        +-----+------+-------------+
+        | name|salary|department_id|
+        +-----+------+-------------+
+        |  Bob| 54000|          101|
+        |David| 61000|          102|
+        +-----+------+-------------+
+
+        Example 3: Select the name, salary, and the proportion of the salary 
in the department.
+
+        >>> from pyspark.sql import functions as sf
+        >>> employees.select(
+        ...     "name", "salary", "department_id",
+        ...     sf.format_number(
+        ...         sf.lit(100) * sf.col("salary") /
+        ...             employees.where(sf.col("department_id") == 
sf.col("department_id").outer())
+        ...             .select(sf.sum("salary")).scalar().alias("avg_salary"),
+        ...         1
+        ...     ).alias("salary_proportion_in_department")
+        ... ).show()
+        +-------+------+-------------+-------------------------------+
+        |   name|salary|department_id|salary_proportion_in_department|
+        +-------+------+-------------+-------------------------------+
+        |  Alice| 45000|          101|                           30.6|
+        |    Bob| 54000|          101|                           36.7|
+        |Charlie| 29000|          102|                           32.2|
+        |    Eve| 48000|          101|                           32.7|
+        |  David| 61000|          102|                           67.8|
+        +-------+------+-------------+-------------------------------+
+        """
+        ...
+
+    def exists(self) -> Column:
+        """
+        Return a `Column` object for an EXISTS Subquery.
+
+        The `exists` method provides a way to create a boolean column that 
checks for the presence
+        of related records in a subquery. When applied within a `DataFrame`, 
this method allows you
+        to filter rows based on whether matching records exist in the related 
dataset. The resulting
+        `Column` object can be used directly in filtering conditions or as a 
computed column.
+
+        .. versionadded:: 4.0.0
+
+        Returns
+        -------
+        :class:`Column`
+            A `Column` object representing an EXISTS subquery
+
+        Examples
+        --------
+        Setup sample data for customers and orders.
+
+        >>> data_customers = [
+        ...     (101, "Alice", "USA"), (102, "Bob", "Canada"), (103, 
"Charlie", "USA"),
+        ...     (104, "David", "Australia")
+        ... ]
+        >>> data_orders = [
+        ...     (1, 101, "2023-01-15", 250), (2, 102, "2023-01-20", 300),
+        ...     (3, 103, "2023-01-25", 400), (4, 101, "2023-02-05", 150)
+        ... ]
+        >>> customers = spark.createDataFrame(
+        ...     data_customers, ["customer_id", "customer_name", "country"])
+        >>> orders = spark.createDataFrame(
+        ...     data_orders, ["order_id", "customer_id", "order_date", 
"total_amount"])
+
+        Example 1: Filter for customers who have placed at least one order.
+
+        >>> from pyspark.sql import functions as sf
+        >>> customers.where(
+        ...     orders.where(sf.col("customer_id") == 
sf.col("customer_id").outer()).exists()

Review Comment:
   Does it work with NOT EXISTS?



##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -106,8 +111,13 @@ private[sql] object Dataset {
     sparkSession.withActive {
       val qe = new QueryExecution(
         sparkSession, logicalPlan, shuffleCleanupMode = shuffleCleanupMode)
-      qe.assertAnalyzed()
-      new Dataset[Row](qe, RowEncoder.encoderFor(qe.analyzed.schema))
+      val encoder = if (qe.isLazyAnalysis) {
+        RowEncoder.encoderFor(new StructType())

Review Comment:
   I wonder if we can add a test to trigger this case



##########
python/pyspark/sql/dataframe.py:
##########
@@ -6476,6 +6476,153 @@ def transpose(self, indexColumn: 
Optional["ColumnOrName"] = None) -> "DataFrame"
         """
         ...
 
+    def scalar(self) -> Column:
+        """
+        Return a `Column` object for a SCALAR Subquery containing exactly one 
row and one column.
+
+        The `scalar()` method is useful for extracting a `Column` object that 
represents a scalar
+        value from a DataFrame, especially when the DataFrame results from an 
aggregation or
+        single-value computation. This returned `Column` can then be used 
directly in `select`
+        clauses or as predicates in filters on the outer DataFrame, enabling 
dynamic data filtering
+        and calculations based on scalar values.
+
+        .. versionadded:: 4.0.0
+
+        Returns
+        -------
+        :class:`Column`
+            A `Column` object representing a SCALAR subquery.
+
+        Examples
+        --------
+        Setup a sample DataFrame.
+
+        >>> data = [
+        ...     (1, "Alice", 45000, 101), (2, "Bob", 54000, 101), (3, 
"Charlie", 29000, 102),
+        ...     (4, "David", 61000, 102), (5, "Eve", 48000, 101),
+        ... ]
+        >>> employees = spark.createDataFrame(data, ["id", "name", "salary", 
"department_id"])
+
+        Example 1: Filter for employees with salary greater than the average 
salary.
+
+        >>> from pyspark.sql import functions as sf
+        >>> employees.where(
+        ...     sf.col("salary") > employees.select(sf.avg("salary")).scalar()
+        ... ).select("name", "salary", "department_id").show()
+        +-----+------+-------------+
+        | name|salary|department_id|
+        +-----+------+-------------+
+        |  Bob| 54000|          101|
+        |David| 61000|          102|
+        |  Eve| 48000|          101|
+        +-----+------+-------------+
+
+        Example 2: Filter for employees with salary greater than the average 
salary in their

Review Comment:
   ditto, let's mention that this is a correlated subquery example



##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -245,13 +262,17 @@ class Dataset[T] private[sql](
   }
 
   @transient private[sql] val logicalPlan: LogicalPlan = {
-    val plan = queryExecution.commandExecuted
-    if 
(sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED))
 {
-      val dsIds = plan.getTagValue(Dataset.DATASET_ID_TAG).getOrElse(new 
HashSet[Long])
-      dsIds.add(id)
-      plan.setTagValue(Dataset.DATASET_ID_TAG, dsIds)
+    if (queryExecution.isLazyAnalysis) {
+      queryExecution.logical

Review Comment:
   cc @cloud-fan 



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2482,9 +2482,16 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
      */
     private def resolveSubQuery(
         e: SubqueryExpression,
-        outer: LogicalPlan)(
+        outer: LogicalPlan,
+        hasExplicitOuterRefs: Boolean = false)(

Review Comment:
   nit: let's add this new parameter to the doc above?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to