sunchao commented on code in PR #46691:
URL: https://github.com/apache/spark/pull/46691#discussion_r1608975585
##########
core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala:
##########
@@ -106,8 +106,15 @@ private[spark] object SerDeUtil extends Logging {
* Convert an RDD of Java objects to an RDD of serialized Python objects,
that is usable by
* PySpark.
*/
- def javaToPython(jRDD: JavaRDD[_]): JavaRDD[Array[Byte]] = {
- jRDD.rdd.mapPartitions { iter => new AutoBatchedPickler(iter) }
+ def javaToPython(jRDD: JavaRDD[_], batchSize: Integer): JavaRDD[Array[Byte]]
= {
+ // Similar logic in pairRDDToPython(rdd, batchSize)
+ if (batchSize == 0) {
+ new AutoBatchedPickler(jRDD)
+ } else {
+ val pickle = new Pickler(/* useMemo = */ true,
+ /* valueCompare = */ false)
+ jRDD.grouped(batchSize).map(batched => pickle.dumps(batched.asJava))
Review Comment:
It looks like `grouped` is not a valid method on `JavaRDD` — the compiler error below confirms it is not a member of `org.apache.spark.api.java.JavaRDD`:
```
[error]
/home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala:112:30:
type mismatch;
[error] found : org.apache.spark.api.java.JavaRDD[_$3] where type _$3
[error] required: Iterator[Any]
[error] new AutoBatchedPickler(jRDD)
[error] ^
[info] org.apache.spark.api.java.JavaRDD[_$3] <: Iterator[Any]?
[info] false
[error]
/home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala:112:7:
type mismatch;
[error] found : org.apache.spark.api.python.SerDeUtil.AutoBatchedPickler
[error] required: org.apache.spark.api.java.JavaRDD[Array[Byte]]
[error] new AutoBatchedPickler(jRDD)
[error] ^
[info] org.apache.spark.api.python.SerDeUtil.AutoBatchedPickler <:
org.apache.spark.api.java.JavaRDD[Array[Byte]]?
[info] false
[error]
/home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala:116:12:
value grouped is not a member of org.apache.spark.api.java.JavaRDD[_$3]
[error] jRDD.grouped(batchSize).map(batched =>
pickle.dumps(batched.asJava))
[error] ^
[info] done compiling
[info] compiling 30 Scala sources and 259 Java sources to
/home/runner/work/spark/spark/connector/connect/common/target/scala-2.12/classes
...
[error] three errors found
```
##########
core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala:
##########
@@ -106,8 +106,15 @@ private[spark] object SerDeUtil extends Logging {
* Convert an RDD of Java objects to an RDD of serialized Python objects,
that is usable by
* PySpark.
*/
- def javaToPython(jRDD: JavaRDD[_]): JavaRDD[Array[Byte]] = {
- jRDD.rdd.mapPartitions { iter => new AutoBatchedPickler(iter) }
+ def javaToPython(jRDD: JavaRDD[_], batchSize: Integer): JavaRDD[Array[Byte]]
= {
Review Comment:
We can give `batchSize` a default value so that it becomes an optional parameter and existing call sites don't need to
change.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]