kyle-winkelman commented on code in PR #49659:
URL: https://github.com/apache/spark/pull/49659#discussion_r1929508474


##########
core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala:
##########
@@ -308,20 +309,83 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     JavaPairRDD.fromRDD(rdd.zip(other.rdd)(other.classTag))(classTag, other.classTag)
   }
 
-  /**
-   * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by
-   * applying a function to the zipped partitions. Assumes that all the RDDs have the
-   * *same number of partitions*, but does *not* require them to have the same number
-   * of elements in each partition.
+  /*
+   * Zip this RDD's partitions with one other RDD and return a new RDD by applying a function to
+   * the zipped partitions. Assumes that both the RDDs have the *same number of partitions*, but
+   * does *not* require them to have the same number of elements in each partition.
    */
   def zipPartitions[U, V](
       other: JavaRDDLike[U, _],
       f: FlatMapFunction2[JIterator[T], JIterator[U], V]): JavaRDD[V] = {
-    def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
-      (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).asScala
+    def fn: (Iterator[T], Iterator[U]) => Iterator[V] = { (x: Iterator[T], y: Iterator[U]) =>
+      f.call(x.asJava, y.asJava).asScala
     }
+    JavaRDD
+      .fromRDD(rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
+  }
+
+  /**
+   * Zip this RDD's partitions with two more RDDs and return a new RDD by applying a function to
+   * the zipped partitions. Assumes that all the RDDs have the *same number of partitions*, but
+   * does *not* require them to have the same number of elements in each partition.
+   */
+  @Since("4.1.0")
+  def zipPartitions[U1, U2, V](
+      other1: JavaRDDLike[U1, _],
+      other2: JavaRDDLike[U2, _],
+      f: FlatMapFunction3[JIterator[T], JIterator[U1], JIterator[U2], V]): JavaRDD[V] = {
+    def fn: (Iterator[T], Iterator[U1], Iterator[U2]) => Iterator[V] =
+      (t: Iterator[T], u1: Iterator[U1], u2: Iterator[U2]) =>
+        f.call(t.asJava, u1.asJava, u2.asJava).asScala
+    JavaRDD.fromRDD(

Review Comment:
   Is this comment about the entire PR or only the changes in JavaRDDLike? My long-term goal was to add additional 
[cogroup](https://github.com/apache/spark/blob/v3.5.0/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala#L808)
 methods for 3, 4, and N KeyValueGroupedDatasets. I do not need all the logic from this PR for that goal, but I thought it was a good small step in that direction.
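
For readers following the diff, here is a minimal sketch of what the three-way `zipPartitions` overload is meant to do, written in plain Java with no Spark dependency. Everything in it is an assumption made for illustration: the nested `FlatMapFunction3` interface is a hypothetical stand-in that only mirrors the shape of the type named in the diff, partitions are modeled as plain lists, and `ZipPartitions3Sketch`, `count`, and `demo` are made-up names. It is not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

final class ZipPartitions3Sketch {

  // Hypothetical stand-in for the FlatMapFunction3 type named in the diff;
  // the real Spark interface may differ in detail.
  @FunctionalInterface
  interface FlatMapFunction3<T1, T2, T3, R> {
    Iterator<R> call(T1 t1, T2 t2, T3 t3) throws Exception;
  }

  // Models each "RDD" as a list of partitions and calls f once per partition
  // index, concatenating the results in partition order.
  static <T, U1, U2, V> List<V> zipPartitions(
      List<List<T>> first,
      List<List<U1>> second,
      List<List<U2>> third,
      FlatMapFunction3<Iterator<T>, Iterator<U1>, Iterator<U2>, V> f) {
    // The partition counts must match; the element counts need not.
    if (first.size() != second.size() || first.size() != third.size()) {
      throw new IllegalArgumentException("inputs must have the same number of partitions");
    }
    List<V> out = new ArrayList<>();
    for (int i = 0; i < first.size(); i++) {
      try {
        Iterator<V> results =
            f.call(first.get(i).iterator(), second.get(i).iterator(), third.get(i).iterator());
        while (results.hasNext()) {
          out.add(results.next());
        }
      } catch (Exception e) {
        // Wrap checked exceptions thrown by the user function.
        throw new RuntimeException(e);
      }
    }
    return out;
  }

  // Counts the elements remaining in an iterator.
  static int count(Iterator<?> it) {
    int n = 0;
    while (it.hasNext()) {
      it.next();
      n++;
    }
    return n;
  }

  // Two partitions per input, with a different number of elements in each.
  static List<Integer> demo() {
    List<List<String>> names = Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));
    List<List<Integer>> ids = Arrays.asList(Arrays.asList(1), Arrays.asList(2, 3, 4));
    List<List<Double>> scores =
        Arrays.asList(Collections.<Double>emptyList(), Arrays.asList(0.5));
    // Emit one value per partition: the total element count across all three inputs.
    return zipPartitions(names, ids, scores,
        (x, y, z) -> Collections.singletonList(count(x) + count(y) + count(z)).iterator());
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [3, 5]
  }
}
```

The sketch only shows the contract stated in the Scaladoc: the function sees one iterator per input for each partition index, and only the partition counts have to agree.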



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

