ueshin commented on code in PR #41606:
URL: https://github.com/apache/spark/pull/41606#discussion_r1256248740
##########
python/pyspark/testing/utils.py:
##########
@@ -209,3 +219,176 @@ def check_error(
self.assertEqual(
             expected, actual, f"Expected message parameters was '{expected}', got '{actual}'"
)
+
+
+def assertDataFrameEqual(
+    df: DataFrame, expected: Union[DataFrame, List[Row]], check_row_order: bool = False
+):
+ """
+    A util function to assert equality between DataFrames `df` and `expected`, with
+ optional parameter `check_row_order`.
+
+ .. versionadded:: 3.5.0
+
+ For float values, assert approximate equality (1e-5 by default).
+
+ Parameters
+ ----------
+ df : DataFrame
+ The DataFrame that is being compared or tested.
+
+ expected : DataFrame or list of Row
+        The expected result of the operation, for comparison with the actual result.
+
+ check_row_order : bool, optional
+        A flag indicating whether the order of rows should be considered in the comparison.
+ If set to `False` (default), the row order is not taken into account.
+        If set to `True`, the order of rows is important and will be checked during comparison.
+
+ Examples
+ --------
+    >>> df1 = spark.createDataFrame(data=[("1", 1000), ("2", 3000)], schema=["id", "amount"])
+    >>> df2 = spark.createDataFrame(data=[("1", 1000), ("2", 3000)], schema=["id", "amount"])
+ >>> assertDataFrameEqual(df1, df2) # pass
+
+
+    >>> df1 = spark.createDataFrame(data=[("1", 1000.00), ("2", 3000.00), ("3", 2000.00)], \
+ schema=["id", "amount"])
+    >>> df2 = spark.createDataFrame(data=[("1", 1001.00), ("2", 3000.00), ("3", 2003.00)], \
+ schema=["id", "amount"])
+ >>> assertDataFrameEqual(df1, df2) # fail
+
+ Results do not match: ( 0.66667 % )
+ [df]
+ Row(id='1', amount=1000.0)
+
+ [expected]
+ Row(id='1', amount=1001.0)
+
+ ********************
+
+ [df]
+ Row(id='3', amount=2000.0)
+
+ [expected]
+ Row(id='3', amount=2003.0)
+
+ ********************
+ """
+ if df is None and expected is None:
+ return True
+ elif df is None or expected is None:
+ return False
+
+ try:
+        # If Spark Connect dependencies are available, allow Spark Connect DataFrame
+ from pyspark.sql.connect.dataframe import DataFrame as ConnectDataFrame
+
+        if not isinstance(df, DataFrame) and not isinstance(df, ConnectDataFrame):
+ raise PySparkAssertionError(
+ error_class="UNSUPPORTED_DATA_TYPE",
+ message_parameters={"data_type": type(df)},
+ )
+        elif not isinstance(expected, DataFrame) and not isinstance(expected, ConnectDataFrame):
+ raise PySparkAssertionError(
+ error_class="UNSUPPORTED_DATA_TYPE",
+ message_parameters={"data_type": type(expected)},
+ )
+ except Exception:
+ if not isinstance(df, DataFrame):
+ raise PySparkAssertionError(
+ error_class="UNSUPPORTED_DATA_TYPE",
+ message_parameters={"data_type": type(df)},
+ )
+ elif not isinstance(expected, DataFrame):
+ raise PySparkAssertionError(
+ error_class="UNSUPPORTED_DATA_TYPE",
+ message_parameters={"data_type": type(expected)},
+ )
+
+ # special cases: empty datasets, datasets with 0 columns
+ if (df.first() is None and expected.first() is None) or (
+ len(df.columns) == 0 and len(expected.columns) == 0
+ ):
+ return True
+
+ def compare_rows(r1: Row, r2: Row):
+ def compare_vals(val1, val2):
+ if isinstance(val1, list) and isinstance(val2, list):
+ return len(val1) == len(val2) and all(
+ compare_vals(x, y) for x, y in zip(val1, val2)
+ )
+ elif isinstance(val1, Row) and isinstance(val2, Row):
+ return all(compare_vals(x, y) for x, y in zip(val1, val2))
+ elif isinstance(val1, dict) and isinstance(val2, dict):
+ return (
+ len(val1.keys()) == len(val2.keys())
+ and val1.keys() == val2.keys()
+                    and all(compare_vals(val1[k], val2[k]) for k in val1.keys())
+ )
+ elif isinstance(val1, float) and isinstance(val2, float):
+ if abs(val1 - val2) > 1e-5:
+ return False
+ else:
+ if val1 != val2:
+ return False
+ return True
+
+ if r1 is None and r2 is None:
+ return True
+ elif r1 is None or r2 is None:
+ return False
+
+ return compare_vals(r1, r2)
+
+ def assert_schema_equal(
+ df_schema: StructType,
+ expected_schema: StructType,
+ ):
+ if df_schema != expected_schema:
+ raise PySparkAssertionError(
+ error_class="DIFFERENT_SCHEMA",
+                message_parameters={"df_schema": df_schema, "expected_schema": expected_schema},
+ )
+
+ def assert_rows_equal(rows1: Row, rows2: Row):
+ zipped = list(zip_longest(rows1, rows2))
+ rows_equal = True
+ error_msg = "Results do not match: "
+ diff_msg = ""
+ diff_rows_cnt = 0
+
+ for r1, r2 in zipped:
+ if not compare_rows(r1, r2):
+ rows_equal = False
+ diff_rows_cnt += 1
+ diff_msg += (
+                    "[df]" + "\n" + str(r1) + "\n\n" + "[expected]" + "\n" + str(r2) + "\n\n"
+ )
+ diff_msg += "********************" + "\n\n"
+
+ if not rows_equal:
+ percent_diff = diff_rows_cnt / len(zipped)
+ error_msg += "( %.5f %% )" % percent_diff
+ error_msg += "\n" + diff_msg
+ raise PySparkAssertionError(
+ error_class="DIFFERENT_ROWS",
+ message_parameters={"error_msg": error_msg},
+ )
+
+ if not check_row_order:
+ try:
+ # rename duplicate columns for sorting
+ renamed_df = df.toDF(*[f"_{i}" for i in range(len(df.columns))])
+            renamed_expected = expected.toDF(*[f"_{i}" for i in range(len(df.columns))])
Review Comment:
`len(df.columns))` should be `len(expected.columns))`?
The number of columns could be different.
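
   A minimal sketch of what the suggested change would look like, shown only to illustrate the reviewer's point (this is not code from the PR):

       # rename the expected DataFrame's columns based on its own column count,
       # since df and expected may have a different number of columns
       renamed_expected = expected.toDF(*[f"_{i}" for i in range(len(expected.columns))])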
##########
python/pyspark/testing/utils.py:
##########
+ if not check_row_order:
+ try:
+ # rename duplicate columns for sorting
+ renamed_df = df.toDF(*[f"_{i}" for i in range(len(df.columns))])
+            renamed_expected = expected.toDF(*[f"_{i}" for i in range(len(df.columns))])
+
+ df = renamed_df.sort(renamed_df.columns).toDF(*df.columns)
+            expected = renamed_expected.sort(renamed_expected.columns).toDF(*df.columns)
Review Comment:
ditto for `df.columns`. should be `expected.columns`.
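
   As above, a minimal sketch of the suggested substitution (illustrative only; it assumes the original `expected` binding is still in scope on the right-hand side):

       # restore the expected DataFrame's own column names after sorting
       expected = renamed_expected.sort(renamed_expected.columns).toDF(*expected.columns)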