cloud-fan commented on code in PR #56398:
URL: https://github.com/apache/spark/pull/56398#discussion_r3399088508


##########
python/pyspark/sql/tests/test_column.py:
##########
@@ -928,6 +928,99 @@ def test_resolve_after_intersect(self):
         rows = df1.intersect(df2).select(df1.c).collect()
         self.assertEqual([r.c for r in rows], [2])
 
+    def test_resolve_through_zip(self):

Review Comment:
   In this family `through_*` has marked unary pass-through operators 
(filter/sort/distinct) while binary combinators use `after_*` 
(`test_resolve_after_union`, `test_resolve_after_intersect`). zip is a binary 
combinator, so `test_resolve_after_zip*` (here and in the parity overrides) 
would match the precedent.



##########
python/pyspark/sql/tests/connect/test_parity_column.py:
##########
@@ -50,6 +50,63 @@ def test_resolve_after_union(self):
         with self.assertRaisesRegex(AnalysisException, "CANNOT_RESOLVE_DATAFRAME_COLUMN"):
             df1.union(df2).select(df1.c).collect()
 
+    # zip merges the two column-projected sides into a single plan, so the
+    # per-DataFrame plan-id tags do not survive ResolveZip. A tagged left/right
+    # reference can no longer be found and raises in both strict and lenient

Review Comment:
   Worth pinning one more boundary case: a base-side reference, e.g. 
`df.zip(right).select(df.a)`. Unlike the projected sides, `ResolveZip` reuses 
the base plan unchanged (ResolveZip.scala:96, 117), so the base's plan-id tag 
survives the merge, and when one side is the bare base its output passes 
through to the final project list (`analyzeChain`, ResolveZip.scala:133-134). 
If I traced that right, this case resolves on Connect rather than raising, i.e. 
the limitation is specific to column-projected sides. Could you confirm by 
running it, and add it as a test on both sides? (`test_zip.py`'s 
`test_zip_one_side_is_base` only selects by name.)
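To make the traced hypothesis concrete, here is a toy model in plain Python. It is not Spark code and does not reproduce `ResolveZip`. `Node`, `toy_zip`, `find_tagged`, and `resolve_by_plan_id` are hypothetical names. The model encodes only the three claims above: projected sides lose their plan-id tags in the merge, the base plan is reused with its tag intact, and a side that is the bare base passes its columns through to the merged output.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Node:
    """Toy plan node: output column names, an optional plan-id tag, and children."""

    output: List[str]
    plan_id: Optional[int] = None
    children: List["Node"] = field(default_factory=list)


def toy_zip(base: Node, left: Node, right: Node) -> Node:
    """Hypothetical stand-in for the merge described above (not ResolveZip itself).

    Both sides are flattened into one untagged node, so a projected side's own
    plan-id tag is lost. The base plan is kept unchanged as the child, so its
    tag survives. A side that is the bare base contributes the base's columns
    to the merged output, i.e. they pass through.
    """
    merged_output = list(left.output) + list(right.output)
    return Node(output=merged_output, plan_id=None, children=[base])


def find_tagged(plan: Node, plan_id: int) -> Optional[Node]:
    """Depth-first search for the node carrying the given plan-id tag."""
    if plan.plan_id == plan_id:
        return plan
    for child in plan.children:
        hit = find_tagged(child, plan_id)
        if hit is not None:
            return hit
    return None


def resolve_by_plan_id(plan: Node, plan_id: int, col: str) -> str:
    """Plan-id lookup in this toy: the tagged node must exist and expose `col`,
    and `col` must also reach the top-level output."""
    tagged = find_tagged(plan, plan_id)
    if tagged is None or col not in tagged.output or col not in plan.output:
        raise LookupError(f"cannot resolve {col!r} via plan id {plan_id}")
    return col


# df has plan id 0; left/right are column projections of df with ids 1 and 2.
df = Node(["a", "b"], plan_id=0)
left = Node(["x"], plan_id=1, children=[df])
right = Node(["y"], plan_id=2, children=[df])

# Projected sides: left's tag is gone after the merge, so left.x fails.
try:
    resolve_by_plan_id(toy_zip(df, left, right), 1, "x")
except LookupError as e:
    print("projected side:", e)

# Bare base on the left: df's tag survives and "a" passes through.
print("base side:", resolve_by_plan_id(toy_zip(df, df, right), 0, "a"))
```

The model deliberately ignores attribute ids, which is what lets Classic resolve the projected-side references; it only captures the plan-id path Connect takes, and makes no claim about which error Spark actually raises.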



##########
python/pyspark/sql/tests/test_column.py:
##########
@@ -928,6 +928,99 @@ def test_resolve_after_intersect(self):
         rows = df1.intersect(df2).select(df1.c).collect()
         self.assertEqual([r.c for r in rows], [2])
 
+    def test_resolve_through_zip(self):
+        # zip merges two column-projected DataFrames side by side. Classic
+        # resolves the tagged left/right reference by attribute id, which
+        # ResolveZip preserves in the merged Project, so it succeeds. Connect
+        # resolves by plan id, but ResolveZip collapses the two sides into one
+        # plan and drops the per-DataFrame plan-id tags, so the tagged
+        # reference is never found and it raises (overridden in the parity suite).
+        df = self.spark.createDataFrame([(1, 10), (2, 20), (3, 30)], ["a", "b"])
+        left = df.select((df.a + 1).alias("x"))
+        right = df.select((df.b * 2).alias("y"))
+        zipped = left.zip(right)
+        self.assertEqual(zipped.columns, ["x", "y"])
+        self.assertEqual(sorted(r.x for r in zipped.select(left.x).collect()), [2, 3, 4])
+        self.assertEqual(sorted(r.y for r in zipped.select(right.y).collect()), [20, 40, 60])
+        self.assertEqual(
+            sorted((r.x, r.y) for r in zipped.select(left.x, right.y).collect()),
+            [(2, 20), (3, 40), (4, 60)],
+        )
+
+    def test_resolve_through_zip_reordered(self):
+        # The originating DataFrame controls which side each column reads, in

Review Comment:
   Missing "from":
   ```suggestion
           # The originating DataFrame controls which side each column reads from, in
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
