beliefer opened a new pull request, #39483:
URL: https://github.com/apache/spark/pull/39483
### What changes were proposed in this pull request?
The doctest for `DataFrame.intersect` fails when run against Spark Connect:

File "/Users/ruifeng.zheng/Dev/spark/python/pyspark/sql/connect/dataframe.py", line 609, in pyspark.sql.connect.dataframe.DataFrame.intersect
Failed example:
`df1.intersect(df2).show()`
Expected:
```
+---+---+
| C1| C2|
+---+---+
| b| 3|
| a| 1|
+---+---+
```
Got:
```
+---+---+
| C1| C2|
+---+---+
| a| 1|
| b| 3|
+---+---+
<BLANKLINE>
```
After investigation, the root cause is that plain PySpark and Spark Connect produce different physical plans for the same query.
The plan produced by PySpark is shown below.
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[C1#2603, C2#2604L], functions=[])
+- Exchange hashpartitioning(C1#2603, C2#2604L, 200),
ENSURE_REQUIREMENTS, [plan_id=11257]
+- HashAggregate(keys=[C1#2603, C2#2604L], functions=[])
+- SortMergeJoin [coalesce(C1#2603, ), isnull(C1#2603),
coalesce(C2#2604L, 0), isnull(C2#2604L)], [coalesce(C1#2607, ),
isnull(C1#2607), coalesce(C2#2608L, 0), isnull(C2#2608L)], LeftSemi
:- Sort [coalesce(C1#2603, ) ASC NULLS FIRST, isnull(C1#2603)
ASC NULLS FIRST, coalesce(C2#2604L, 0) ASC NULLS FIRST, isnull(C2#2604L) ASC
NULLS FIRST], false, 0
: +- Exchange hashpartitioning(coalesce(C1#2603, ),
isnull(C1#2603), coalesce(C2#2604L, 0), isnull(C2#2604L), 200),
ENSURE_REQUIREMENTS, [plan_id=11250]
: +- Scan ExistingRDD[C1#2603,C2#2604L]
+- Sort [coalesce(C1#2607, ) ASC NULLS FIRST, isnull(C1#2607)
ASC NULLS FIRST, coalesce(C2#2608L, 0) ASC NULLS FIRST, isnull(C2#2608L) ASC
NULLS FIRST], false, 0
+- Exchange hashpartitioning(coalesce(C1#2607, ),
isnull(C1#2607), coalesce(C2#2608L, 0), isnull(C2#2608L), 200),
ENSURE_REQUIREMENTS, [plan_id=11251]
+- Scan ExistingRDD[C1#2607,C2#2608L]
```
The plan produced by Spark Connect is shown below.
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[C1#1988, C2#1989L], functions=[])
+- Exchange hashpartitioning(C1#1988, C2#1989L, 200),
ENSURE_REQUIREMENTS, [plan_id=1550]
+- HashAggregate(keys=[C1#1988, C2#1989L], functions=[])
+- BroadcastHashJoin [coalesce(C1#1988, ), isnull(C1#1988),
coalesce(C2#1989L, 0), isnull(C2#1989L)], [coalesce(C1#1996, ),
isnull(C1#1996), coalesce(C2#1997L, 0), isnull(C2#1997L)], LeftSemi,
BuildRight, false
:- LocalTableScan [C1#1988, C2#1989L]
+- BroadcastExchange
HashedRelationBroadcastMode(List(coalesce(input[0, string, true], ),
isnull(input[0, string, true]), coalesce(input[1, bigint, true], 0),
isnull(input[1, bigint, true])),false), [plan_id=1546]
+- LocalTableScan [C1#1996, C2#1997L]
```
So the shuffle affects the order of the output rows: `intersect` gives no ordering guarantee, and the two join strategies happen to emit rows in different orders.
### Why are the changes needed?
This PR adds a `sort` operator to the `DataFrame.intersect` example so that the output has a deterministic order.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual tests and doc tests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]