allisonwang-db commented on code in PR #41606:
URL: https://github.com/apache/spark/pull/41606#discussion_r1251258342
##########
python/pyspark/testing/utils.py:
##########
@@ -209,3 +216,50 @@ def check_error(
self.assertEqual(
expected, actual, f"Expected message parameters was '{expected}',
got '{actual}'"
)
+
+
+def assertSparkSchemaEquality(
+ s1: Optional[Union[AtomicType, StructType, str, List[str], Tuple[str,
...]]],
+ s2: Optional[Union[AtomicType, StructType, str, List[str], Tuple[str,
...]]],
+):
+ if s1 != s2:
+ msg = "Schemas are different"
+ raise AssertionError(msg)
+
+
+def assertSparkDFEqual(left: PySparkDataFrame, right: PySparkDataFrame):
+ def assert_rows_equality(rows1, rows2):
+ if rows1 != rows2:
Review Comment:
Let's add some tests for:
1. None/null values
2. Complex types (array, map, struct)
3. Floating-point values
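
To illustrate why these three cases deserve tests, here is a minimal sketch in plain Python, not the PR's implementation. The helper name `values_equal` and its tolerance defaults are hypothetical; lists, tuples and dicts stand in for Spark arrays, structs and maps.

```python
import math


def values_equal(a, b, rel_tol=1e-9, abs_tol=1e-9):
    """Hypothetical helper: compare two cell values, tolerating float rounding."""
    # None only equals None.
    if a is None or b is None:
        return a is None and b is None
    # Floats are compared approximately; two NaNs are treated as equal here.
    if isinstance(a, float) and isinstance(b, float):
        if math.isnan(a) and math.isnan(b):
            return True
        return math.isclose(a, b, rel_tol=rel_tol, abs_tol=abs_tol)
    # Map-like values: same keys, recursively equal values.
    if isinstance(a, dict) and isinstance(b, dict):
        return a.keys() == b.keys() and all(
            values_equal(a[k], b[k], rel_tol, abs_tol) for k in a
        )
    # Array-like and struct-like values: same length, element-wise equal.
    if isinstance(a, (list, tuple)) and isinstance(b, (list, tuple)):
        return len(a) == len(b) and all(
            values_equal(x, y, rel_tol, abs_tol) for x, y in zip(a, b)
        )
    return a == b
```

A plain `rows1 != rows2` check would report `0.1 + 0.2` and `0.3` as different, which is the kind of case the floating-point tests should pin down.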
##########
python/pyspark/testing/utils.py:
##########
@@ -209,3 +216,50 @@ def check_error(
self.assertEqual(
expected, actual, f"Expected message parameters was '{expected}',
got '{actual}'"
)
+
+
+def assertSparkSchemaEquality(
+ s1: Optional[Union[AtomicType, StructType, str, List[str], Tuple[str,
...]]],
+ s2: Optional[Union[AtomicType, StructType, str, List[str], Tuple[str,
...]]],
+):
+ if s1 != s2:
+ msg = "Schemas are different"
+ raise AssertionError(msg)
+
+
+def assertSparkDFEqual(left: PySparkDataFrame, right: PySparkDataFrame):
+ def assert_rows_equality(rows1, rows2):
+ if rows1 != rows2:
+ raise PySparkAssertionError(
+ error_class="DIFFERENT_DATAFRAME",
+ message_parameters={},
+ )
+
+ left = left.sort(left.columns)
Review Comment:
We don't always want the rows to be sorted, e.g. when a query has an "ORDER
BY" clause. We can check the logical plan of the DataFrame to see if it
contains any `Sort` nodes:
```
val isSorted = df.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty
```
I am not sure if this works for Spark Connect tests.
##########
python/pyspark/testing/utils.py:
##########
@@ -209,3 +216,50 @@ def check_error(
self.assertEqual(
expected, actual, f"Expected message parameters was '{expected}',
got '{actual}'"
)
+
+
+def assertSparkSchemaEquality(
Review Comment:
Let's rename this to `assertSchemaEqual`.
##########
python/pyspark/testing/utils.py:
##########
@@ -209,3 +216,50 @@ def check_error(
self.assertEqual(
expected, actual, f"Expected message parameters was '{expected}',
got '{actual}'"
)
+
+
+def assertSparkSchemaEquality(
+ s1: Optional[Union[AtomicType, StructType, str, List[str], Tuple[str,
...]]],
+ s2: Optional[Union[AtomicType, StructType, str, List[str], Tuple[str,
...]]],
+):
+ if s1 != s2:
+ msg = "Schemas are different"
+ raise AssertionError(msg)
+
+
+def assertSparkDFEqual(left: PySparkDataFrame, right: PySparkDataFrame):
Review Comment:
Maybe we can have this:
`assertDataFrameEqual(df: DataFrame, expected: Union[DataFrame, List[Row]])`
We can use `checkAnswer` from the Scala side for reference.
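
A rough sketch of how such a signature could accept either input, under stated assumptions: the names `to_rows` and `assert_data_frame_equal` are hypothetical, and anything exposing a `collect()` method is treated as a DataFrame so the example runs without Spark. Row ordering is deliberately left out here.

```python
def to_rows(data):
    """Hypothetical helper: normalise a DataFrame-like object or a list to a list of rows."""
    # Assumption: DataFrame-like objects expose collect(); anything else is
    # treated as an already-collected sequence of rows.
    if hasattr(data, "collect"):
        return list(data.collect())
    return list(data)


def assert_data_frame_equal(df, expected):
    """Sketch: compare collected rows, where `expected` may be a DataFrame or a list."""
    actual_rows = to_rows(df)
    expected_rows = to_rows(expected)
    if actual_rows != expected_rows:
        raise AssertionError(
            f"DataFrames differ: expected {expected_rows!r}, got {actual_rows!r}"
        )
```

Normalising both sides up front keeps the comparison logic identical regardless of which form the caller passes.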
##########
python/pyspark/testing/utils.py:
##########
@@ -209,3 +216,50 @@ def check_error(
self.assertEqual(
expected, actual, f"Expected message parameters was '{expected}',
got '{actual}'"
)
+
+
+def assertSparkSchemaEquality(
+ s1: Optional[Union[AtomicType, StructType, str, List[str], Tuple[str,
...]]],
+ s2: Optional[Union[AtomicType, StructType, str, List[str], Tuple[str,
...]]],
+):
+ if s1 != s2:
+ msg = "Schemas are different"
+ raise AssertionError(msg)
+
+
+def assertSparkDFEqual(left: PySparkDataFrame, right: PySparkDataFrame):
+ def assert_rows_equality(rows1, rows2):
+ if rows1 != rows2:
+ raise PySparkAssertionError(
+ error_class="DIFFERENT_DATAFRAME",
+ message_parameters={},
+ )
+
+ left = left.sort(left.columns)
Review Comment:
If checking the logical plan is not doable, we can probably add a param
`ignore_row_order=True` so users can specify whether the rows should be sorted
before they are compared.
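
One possible shape for that parameter, as a sketch only: the function name `assert_rows_equal` is hypothetical, and rows are plain tuples standing in for collected `Row` objects. The sort key is an assumption chosen so that `None` and mixed-type cells do not raise `TypeError` during sorting.

```python
def assert_rows_equal(actual, expected, ignore_row_order=True):
    """Hypothetical sketch: compare two lists of rows, optionally ignoring order."""
    if ignore_row_order:
        # Sort on (is-None flag, string form) per cell so that None and
        # mixed types are still orderable.
        def sort_key(row):
            return tuple((value is None, str(value)) for value in row)

        actual = sorted(actual, key=sort_key)
        expected = sorted(expected, key=sort_key)
    if list(actual) != list(expected):
        raise AssertionError(
            f"Rows differ: expected {list(expected)!r}, got {list(actual)!r}"
        )
```

With `ignore_row_order=False` the rows are compared positionally, which is what a test of an "ORDER BY" query would want.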
##########
python/pyspark/testing/utils.py:
##########
@@ -209,3 +216,50 @@ def check_error(
self.assertEqual(
expected, actual, f"Expected message parameters was '{expected}',
got '{actual}'"
)
+
+
+def assertSparkSchemaEquality(
+ s1: Optional[Union[AtomicType, StructType, str, List[str], Tuple[str,
...]]],
+ s2: Optional[Union[AtomicType, StructType, str, List[str], Tuple[str,
...]]],
+):
+ if s1 != s2:
+ msg = "Schemas are different"
+ raise AssertionError(msg)
+
+
+def assertSparkDFEqual(left: PySparkDataFrame, right: PySparkDataFrame):
+ def assert_rows_equality(rows1, rows2):
+ if rows1 != rows2:
+ raise PySparkAssertionError(
Review Comment:
Do we need to use error classes here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]