JoshRosen commented on code in PR #36683:
URL: https://github.com/apache/spark/pull/36683#discussion_r883115780


##########
python/pyspark/sql/pandas/conversion.py:
##########
@@ -613,16 +613,16 @@ def _create_from_pandas_with_arrow(
 
         @no_type_check
         def reader_func(temp_filename):
-            return self._jvm.PythonSQLUtils.readArrowStreamFromFile(jsparkSession, temp_filename)
+            return self._jvm.PythonSQLUtils.readArrowStreamFromFile(temp_filename)
 
         @no_type_check
         def create_RDD_server():
-            return self._jvm.ArrowRDDServer(jsparkSession)
+            return self._jvm.ArrowIteratorServer()
 
        # Create Spark DataFrame from Arrow stream file, using one batch per partition
-        jrdd = self._sc._serialize_to_jvm(arrow_data, ser, reader_func, create_RDD_server)
+        jiter = self._sc._serialize_to_jvm(arrow_data, ser, reader_func, create_RDD_server)

Review Comment:
   Here it looks like we have changed the type of `reader_func`: before it was a function from filename to `JavaRDD[Array[Byte]]`, whereas now it's a function from filename to `Iterator[Array[Byte]]`.
   
   I think this breaks the current interface contract of `_serialize_to_jvm` in a way that will cause problems if IO encryption is enabled. According to [that method's docs](https://github.com/apache/spark/blob/dbde77856d2e51ff502a7fc1dba7f10316c2211b/python/pyspark/context.py#L677), `reader_func` is only used when IO encryption is disabled (which is the default configuration). If IO encryption is enabled then this call will instead go through the `PythonParallelizeServer` codepath, which will still produce a `JavaRDD[Array[Byte]]`.
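   
   For reference, here's a condensed paraphrase of the two branches of `_serialize_to_jvm` (illustrative only, not the verbatim Spark source; the `server.stream()` / `server.get_result()` plumbing here is a hypothetical stand-in for the real socket handshake):
   
   ```python
   from tempfile import NamedTemporaryFile
   
   # Illustrative sketch of _serialize_to_jvm's contract: both branches must
   # hand back the same JVM-side type, because callers use the result uniformly.
   def serialize_to_jvm(data, serializer, reader_func, server_func, encryption_enabled):
       if encryption_enabled:
           # Encrypted path: a JVM-side server ingests the stream and returns
           # the result object (today a JavaRDD[Array[Byte]] from
           # PythonParallelizeServer).
           server = server_func()
           serializer.dump_stream(data, server.stream())  # hypothetical plumbing
           return server.get_result()
       else:
           # Unencrypted path: write to a temp file, then reader_func maps the
           # filename to the JVM-side object (after this PR an
           # Iterator[Array[Byte]] instead of a JavaRDD[Array[Byte]]).
           with NamedTemporaryFile(delete=False) as f:
               serializer.dump_stream(data, f)
           return reader_func(f.name)
   ```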
   
   To fix this, we might need to change the other pieces of this interface so that they all pass around `Iterator[Array[Byte]]` (in other words, modify the regular Python parallelize codepath so that its implementation is closer to what you have here).
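   
   It might also be worth covering this with a regression test so the encrypted codepath can't silently diverge again. A rough sketch (hypothetical fixture names; assumes a pytest-style harness where `spark_with_encryption` is a session with `spark.io.encryption.enabled=true` and `spark.sql.execution.arrow.pyspark.enabled=true`):
   
   ```python
   import pandas as pd
   
   def _arrow_roundtrip(spark):
       # createDataFrame(pdf) exercises _create_from_pandas_with_arrow when
       # Arrow conversion is enabled.
       pdf = pd.DataFrame({"a": [1, 2, 3]})
       df = spark.createDataFrame(pdf)
       assert df.toPandas().equals(pdf)
   
   def test_arrow_roundtrip_with_io_encryption(spark_with_encryption):
       # Hypothetical fixture: same round-trip, but through the encrypted
       # _serialize_to_jvm branch.
       _arrow_roundtrip(spark_with_encryption)
   ```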
   
   Good catch about the performance risks of passing `Array[Array[Byte]]` back to Python via Py4J, BTW. Do you know if `ArrayBuffer` has the same issue? I'm wondering whether we could pass `ArrayBuffer[Array[Byte]]` if we want to ensure that the returned object is still considered `Iterable` from Python.
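   
   One quick way to probe the Py4J side from a PySpark shell (hypothetical snippet, assuming a live session named `spark`): Py4J auto-wraps `java.util.List` implementations in an iterable proxy, but a Scala `ArrayBuffer` is not a `java.util.List`, so it would presumably need an `asJava`-style conversion on the JVM side before Python could iterate it directly.
   
   ```python
   # Hypothetical shell probe, not part of the PR: check that a JVM list of
   # byte[] chunks is iterable from Python via Py4J's collection wrapping.
   jvm = spark.sparkContext._jvm
   jlist = jvm.java.util.ArrayList()
   jlist.add(bytearray(b"\x01\x02"))  # Py4J converts bytearray -> byte[]
   jlist.add(bytearray(b"\x03"))
   for chunk in jlist:  # Py4J's JavaList proxy supports Python iteration
       print(len(chunk))
   ```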



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
