ueshin commented on code in PR #52467:
URL: https://github.com/apache/spark/pull/52467#discussion_r2395897959


##########
python/pyspark/sql/tests/test_udf.py:
##########
@@ -1446,6 +1447,95 @@ def my_udf():
 
                 self.spark.range(1).select(my_udf().alias("result")).show()
 
+    def test_udf_binary_type(self):
+        def get_binary_type(x):
+            return type(x).__name__
+
+        binary_udf = udf(get_binary_type, returnType="string")
+
+        df = self.spark.createDataFrame(
+            [Row(b=b"hello"), Row(b=b"world")], 
schema=StructType([StructField("b", BinaryType())])
+        )
+
+        with self.sql_conf({"spark.sql.execution.pyspark.binaryAsBytes": 
"true"}):
+            result = 
df.select(binary_udf(col("b")).alias("type_name")).collect()
+            self.assertEqual(result[0]["type_name"], "bytes")
+            self.assertEqual(result[1]["type_name"], "bytes")

Review Comment:
   Shall we use `assertDataFrameEqual`? Same for the following checks?



##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -3063,6 +3078,22 @@ def tearDownClass(cls):
     not have_pandas or not have_pyarrow, pandas_requirement_message or 
pyarrow_requirement_message
 )
 class LegacyUDTFArrowTestsMixin(BaseUDTFTestsMixin):
+    def test_udtf_binary_type(self):
+        @udtf(returnType="type_name: string")
+        class BinaryTypeUDF:

Review Comment:
   ditto.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala:
##########
@@ -49,39 +56,52 @@ object EvaluatePython {
   /**
    * Helper for converting from Catalyst type to java type suitable for Pickle.
    */
-  def toJava(obj: Any, dataType: DataType): Any = (obj, dataType) match {
-    case (null, _) => null
-
-    case (row: InternalRow, struct: StructType) =>
-      val values = new Array[Any](row.numFields)
-      var i = 0
-      while (i < row.numFields) {
-        values(i) = toJava(row.get(i, struct.fields(i).dataType), 
struct.fields(i).dataType)
-        i += 1
-      }
-      new GenericRowWithSchema(values, struct)
+  def toJava(
+      obj: Any,
+      dataType: DataType,
+      binaryAsBytes: Boolean = SQLConf.get.pysparkBinaryAsBytes): Any = {

Review Comment:
   I guess we shouldn't use the default value for `binaryAsBytes` to see if we 
updated all places where it uses `toJava` to avoid unexpected behavior change?



##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -1823,7 +1823,14 @@ def collect(self) -> List[Row]:
 
         assert schema is not None and isinstance(schema, StructType)
 
-        return ArrowTableToRowsConversion.convert(table, schema)
+        return ArrowTableToRowsConversion.convert(
+            table, schema, binary_as_bytes=self._get_binary_as_bytes()
+        )
+
+    def _get_binary_as_bytes(self) -> bool:
+        """Get the binary_as_bytes configuration value from Spark session."""
+        conf_value = 
self._session.conf.get("spark.sql.execution.pyspark.binaryAsBytes", "true")
+        return (conf_value or "true").lower() == "true"

Review Comment:
   Which case is `(conf_value or "true")` for?



##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -3044,6 +3044,21 @@ def eval(self, v1, v2, v3, v4):
         for idx, field in enumerate(result_df.schema.fields):
             self.assertEqual(field.dataType, expected_output_types[idx])
 
+    def test_udtf_binary_type(self):
+        @udtf(returnType="type_name: string")
+        class BinaryTypeUDF:

Review Comment:
   nit:
   ```suggestion
           class BinaryTypeUDTF:
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to