EnricoMi commented on code in PR #38223:
URL: https://github.com/apache/spark/pull/38223#discussion_r1031150953
##########
python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py:
##########
@@ -165,100 +148,191 @@ def merge_pandas(lft, _):
)
def test_apply_in_pandas_not_returning_pandas_dataframe(self):
- left = self.data1
- right = self.data2
+ self._test_merge_error(
+ fn=lambda lft, rgt: lft.size + rgt.size,
+ error_class=PythonException,
+ error_message_regex="Return type of the user-defined function "
+ "should be pandas.DataFrame, but is <class 'numpy.int64'>",
+ )
+
+ def test_apply_in_pandas_returning_column_names(self):
+ self._test_merge(fn=lambda lft, rgt: pd.merge(lft, rgt, on=["id",
"k"]))
+ def test_apply_in_pandas_returning_no_column_names(self):
def merge_pandas(lft, rgt):
- return lft.size + rgt.size
+ res = pd.merge(lft, rgt, on=["id", "k"])
+ res.columns = range(res.columns.size)
+ return res
- with QuietTest(self.sc):
- with self.assertRaisesRegex(
- PythonException,
- "Return type of the user-defined function should be
pandas.DataFrame, "
- "but is <class 'numpy.int64'>",
- ):
- (
- left.groupby("id")
- .cogroup(right.groupby("id"))
- .applyInPandas(merge_pandas, "id long, k int, v int, v2
int")
- .collect()
- )
+ self._test_merge(fn=merge_pandas)
- def test_apply_in_pandas_returning_wrong_number_of_columns(self):
- left = self.data1
- right = self.data2
+ def test_apply_in_pandas_returning_column_names_sometimes(self):
+ def merge_pandas(lft, rgt):
+ res = pd.merge(lft, rgt, on=["id", "k"])
+ if 0 in lft["id"] and lft["id"][0] % 2 == 0:
+ return res
+ res.columns = range(res.columns.size)
+ return res
+
+ self._test_merge(fn=merge_pandas)
+ def test_apply_in_pandas_returning_wrong_column_names(self):
def merge_pandas(lft, rgt):
if 0 in lft["id"] and lft["id"][0] % 2 == 0:
lft["add"] = 0
if 0 in rgt["id"] and rgt["id"][0] % 3 == 0:
rgt["more"] = 1
return pd.merge(lft, rgt, on=["id", "k"])
- with QuietTest(self.sc):
- with self.assertRaisesRegex(
- PythonException,
- "Number of columns of the returned pandas.DataFrame "
- "doesn't match specified schema. Expected: 4 Actual: 6",
- ):
- (
- # merge_pandas returns two columns for even keys while we
set schema to four
- left.groupby("id")
- .cogroup(right.groupby("id"))
- .applyInPandas(merge_pandas, "id long, k int, v int, v2
int")
- .collect()
- )
+ self._test_merge_error(
+ fn=merge_pandas,
+ error_class=PythonException,
+ error_message_regex="Column names of the returned pandas.DataFrame
"
+ "do not match specified schema. Unexpected: add, more Schema:
id, k, v, v2\n",
+ )
- def test_apply_in_pandas_returning_empty_dataframe(self):
- left = self.data1
- right = self.data2
+ # with very large schema, missing and unexpected is limited to 5
+ # and the schema is abbreviated in the error message
+ schema = "id long, k long, mean double, " + ", ".join(
+ f"column_with_long_column_name_{no} integer" for no in range(35)
+ )
+ self._test_merge_error(
+ fn=lambda lft, rgt: pd.DataFrame(
+ [
+ (
+ lft.id,
+ lft.k,
+ lft.v.mean(),
+ )
+ + tuple(lft.v.mean() for _ in range(7))
+ ],
+ columns=["id", "k", "mean"] + [f"extra_column_{no} integer"
for no in range(7)],
+ ),
+ output_schema=schema,
+ error_class=PythonException,
+ error_message_regex="Column names of the returned
pandas\\.DataFrame "
+ "do not match specified schema\\. "
+ "Missing \\(first 5 of 35\\): column_with_long_column_name_0,"
+ " column_with_long_column_name_1, column_with_long_column_name_10,"
+ " column_with_long_column_name_11, column_with_long_column_name_12
"
+ "Unexpected \\(first 5 of 7\\): extra_column_0 integer,
extra_column_1 integer,"
+ " extra_column_2 integer, extra_column_3 integer, extra_column_4
integer "
+ "Schema: id, k, mean, column_with_long_column_name_0,
column_with_long_column_name_1,"
+ " column_with_long_column_name_2, column_with_long_column_name_3,"
+ " column_with_long_column_name_4, column_with_long_column_name_5,"
+ " column_with_long_column_name_6, column_with_long_column_name_7,"
+ " column_with_long_column_name_8, column_with_long_column_name_9,"
+ " column_with_long_column_name_10,
column_with_long_column_name_11,"
+ " column_with_long_column_name_12,
column_with_long_column_name_13,"
+ " column_with_long_column_name_14,
column_with_\\.\\.\\.g_column_name_19,"
+ " column_with_long_column_name_20,
column_with_long_column_name_21,"
+ " column_with_long_column_name_22,
column_with_long_column_name_23,"
+ " column_with_long_column_name_24,
column_with_long_column_name_25,"
+ " column_with_long_column_name_26,
column_with_long_column_name_27,"
+ " column_with_long_column_name_28,
column_with_long_column_name_29,"
+ " column_with_long_column_name_30,
column_with_long_column_name_31,"
+ " column_with_long_column_name_32,
column_with_long_column_name_33,"
+ " column_with_long_column_name_34\n",
+ )
+ def test_apply_in_pandas_returning_no_column_names_and_wrong_amount(self):
def merge_pandas(lft, rgt):
if 0 in lft["id"] and lft["id"][0] % 2 == 0:
- return pd.DataFrame([])
+ lft[3] = 0
if 0 in rgt["id"] and rgt["id"][0] % 3 == 0:
- return pd.DataFrame([])
- return pd.merge(lft, rgt, on=["id", "k"])
-
- result = (
- left.groupby("id")
- .cogroup(right.groupby("id"))
- .applyInPandas(merge_pandas, "id long, k int, v int, v2 int")
- .sort(["id", "k"])
- .toPandas()
+ rgt[3] = 1
+ res = pd.merge(lft, rgt, on=["id", "k"])
+ res.columns = range(res.columns.size)
+ return res
+
+ self._test_merge_error(
+ fn=merge_pandas,
+ error_class=PythonException,
+ error_message_regex="Number of columns of the returned
pandas.DataFrame "
+ "doesn't match specified schema. Expected: 4 Actual: 6 Schema:
id, k, v, v2\n",
)
- left = left.toPandas()
- right = right.toPandas()
-
- expected = pd.merge(
- left[left["id"] % 2 != 0], right[right["id"] % 3 != 0], on=["id",
"k"]
- ).sort_values(by=["id", "k"])
+ # with very large schema the schema is abbreviated in the error message
+ schema = "id long, k long, mean double, " + ", ".join(
+ f"column_with_long_column_name_{no} integer" for no in range(35)
+ )
- assert_frame_equal(expected, result)
+ def fn(lft, _):
+ # remove column names from lft DataFrame
+ lft.columns = range(lft.columns.size)
+ return lft
- def
test_apply_in_pandas_returning_empty_dataframe_and_wrong_number_of_columns(self):
- left = self.data1
- right = self.data2
+ self._test_merge_error(
+ fn=fn,
+ output_schema=schema,
+ error_class=PythonException,
+ error_message_regex="Number of columns of the returned
pandas\\.DataFrame "
+ "doesn't match specified schema\\. Expected: 38 Actual: 3 "
+ "Schema: id, k, mean, column_with_long_column_name_0,
column_with_long_column_name_1,"
+ " column_with_long_column_name_2, column_with_long_column_name_3,"
Review Comment:
this is gone
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]