EnricoMi opened a new pull request, #38223:
URL: https://github.com/apache/spark/pull/38223

   ### What changes were proposed in this pull request?
   Improve the error messages when a Python method provided to 
`DataFrame.groupby(...).applyInPandas` / 
`DataFrame.groupby(...).cogroup(...).applyInPandas` returns a Pandas DataFrame 
that does not match the expected schema.
   
   With
   ```Python
   gdf = spark.range(2).join(spark.range(3).withColumnRenamed("id", 
"val")).groupby("id")
   ```
   
   **Mismatching column names, matching number of columns:**
   ```Python
   gdf.applyInPandas(lambda pdf: pdf.rename(columns={"val": "v"}), "id long, 
val long").show()
   # was: KeyError: 'val'
   # now: RuntimeError: Column names of the returned pandas.DataFrame do not 
match specified schema.
   #      Missing: val  Unexpected: v  Schema: id, val
   ```
   
   **Mismatching column names, different number of columns:**
   ```Python
   gdf.applyInPandas(lambda pdf: pdf.assign(foo=[3, 3, 
3]).rename(columns={"val": "v"}), "id long, val long").show()
   # was: RuntimeError: Number of columns of the returned pandas.DataFrame 
doesn't match specified schema.
   #      Expected: 2 Actual: 3
   # now: RuntimeError: Column names of the returned pandas.DataFrame do not 
match specified schema.
   #      Missing: val  Unexpected: foo, v  Schema: id, val
   ```
   
   **Expected schema matches but has duplicates (`id`) so that number of 
columns match:**
   ```Python
   gdf.applyInPandas(lambda pdf: pdf.rename(columns={"val": "v"}), "id long, id 
long").show()
   # was: java.lang.IllegalArgumentException: not all nodes and buffers were 
consumed.
   #      nodes: [ArrowFieldNode [length=3, nullCount=0]]
   #      buffers: [ArrowBuf[304], address:139860828549160, length:0, 
ArrowBuf[305], address:139860828549160, length:24]
   # now: RuntimeError: Column names of the returned pandas.DataFrame do not 
match specified schema.
   #      Unexpected: v  Schema: id, id
   ```
   
   **In case the returned Pandas DataFrame contains no column names (none of 
the column labels is a string):**
   ```Python
   gdf.applyInPandas(lambda pdf: pdf.assign(foo=[3, 3, 
3]).rename(columns={"id": 0, "val": 1, "foo": 2}), "id long, val long").show()
   # was: RuntimeError: Number of columns of the returned pandas.DataFrame 
doesn't match specified schema.
   #      Expected: 2 Actual: 3
   # now: RuntimeError: Number of columns of the returned pandas.DataFrame 
doesn't match specified schema.
   #      Expected: 2  Actual: 3  Schema: id, val
   ```
   
   **Mismatching types (ValueError and TypeError):**
   ```Python
   gdf.applyInPandas(lambda pdf: pdf, "id int, val string").show()
   # was: pyarrow.lib.ArrowTypeError: Expected a string or bytes dtype, got 
int64
   # now: pyarrow.lib.ArrowTypeError: Expected a string or bytes dtype, got 
int64
   #      The above exception was the direct cause of the following exception:
   #      TypeError: Exception thrown when converting pandas.Series (int64) 
with name 'val' to Arrow Array (string).
   
   
   gdf.applyInPandas(lambda pdf: pdf.assign(val=pdf["val"].apply(str)), "id 
int, val double").show()
   # was: pyarrow.lib.ArrowInvalid: Could not convert '0' with type str: tried 
to convert to double
   # now: pyarrow.lib.ArrowInvalid: Could not convert '0' with type str: tried 
to convert to double
   #      The above exception was the direct cause of the following exception:
   #      ValueError: Exception thrown when converting pandas.Series (object) 
with name 'val' to Arrow Array (double).
   
   with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": 
safely}):
     gdf.applyInPandas(lambda pdf: pdf.assign(val=pdf["val"].apply(str)), "id 
int, val double").show()
   # was: ValueError: Exception thrown when converting pandas.Series (object) 
to Arrow Array (double).
   #      It can be caused by overflows or other unsafe conversions warned by 
Arrow. Arrow safe type check can be disabled
   #      by using SQL config 
`spark.sql.execution.pandas.convertToArrowArraySafely`.
   # now: ValueError: Exception thrown when converting pandas.Series (object) 
with name 'val' to Arrow Array (double).
   #      It can be caused by overflows or other unsafe conversions warned by 
Arrow. Arrow safe type check can be disabled
   #      by using SQL config 
`spark.sql.execution.pandas.convertToArrowArraySafely`.
   ```
   
   ### Why are the changes needed?
   Existing errors are generic (`KeyError`) or meaningless (`not all nodes and 
buffers were consumed`). The errors should help users in spotting the 
mismatching columns by naming them.
   
   The schema of the returned Pandas DataFrames can only be checked during 
processing the DataFrame, so such errors are very expensive. Therefore, they 
should be expressive.
   
   ### Does this PR introduce _any_ user-facing change?
   This only changes error messages, not behaviour.
   
   ### How was this patch tested?
   Tests all cases of schema mismatch for `GroupedData.applyInPandas` and 
`PandasCogroupedOps.applyInPandas`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to