kyle-winkelman commented on code in PR #49659:
URL: https://github.com/apache/spark/pull/49659#discussion_r1929509254


##########
core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala:
##########
@@ -308,20 +309,83 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     JavaPairRDD.fromRDD(rdd.zip(other.rdd)(other.classTag))(classTag, other.classTag)
   }
 
-  /**
-   * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by
-   * applying a function to the zipped partitions. Assumes that all the RDDs have the
-   * *same number of partitions*, but does *not* require them to have the same number
-   * of elements in each partition.
+  /*
+   * Zip this RDD's partitions with one other RDD and return a new RDD by applying a function to
+   * the zipped partitions. Assumes that both the RDDs have the *same number of partitions*, but
+   * does *not* require them to have the same number of elements in each partition.
    */
   def zipPartitions[U, V](
       other: JavaRDDLike[U, _],
       f: FlatMapFunction2[JIterator[T], JIterator[U], V]): JavaRDD[V] = {
-    def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
-      (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).asScala
+    def fn: (Iterator[T], Iterator[U]) => Iterator[V] = { (x: Iterator[T], y: Iterator[U]) =>
+      f.call(x.asJava, y.asJava).asScala
     }
+    JavaRDD
+      .fromRDD(rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
+  }
+
+  /**
+   * Zip this RDD's partitions with two more RDDs and return a new RDD by applying a function to
+   * the zipped partitions. Assumes that all the RDDs have the *same number of partitions*, but
+   * does *not* require them to have the same number of elements in each partition.
+   */
+  @Since("4.1.0")
+  def zipPartitions[U1, U2, V](
+      other1: JavaRDDLike[U1, _],
+      other2: JavaRDDLike[U2, _],
+      f: FlatMapFunction3[JIterator[T], JIterator[U1], JIterator[U2], V]): JavaRDD[V] = {
+    def fn: (Iterator[T], Iterator[U1], Iterator[U2]) => Iterator[V] =
+      (t: Iterator[T], u1: Iterator[U1], u2: Iterator[U2]) =>
+        f.call(t.asJava, u1.asJava, u2.asJava).asScala
+    JavaRDD.fromRDD(

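The contract spelled out in the doc comments above (both RDDs must have the *same number of partitions*, but partitions may hold different numbers of elements) can be sketched outside Spark in plain Java. This is a hypothetical illustration only: partitions are modeled as `List<List<T>>`, and the helper's name merely mirrors the real `zipPartitions` API rather than implementing it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

public class ZipPartitionsSketch {

    // Hypothetical model of the zipPartitions semantics: each "RDD" is a
    // List of partitions. Both inputs must have the same number of
    // partitions, but partitions may differ in length; the function f is
    // applied once per pair of partition iterators.
    static <T, U, V> List<List<V>> zipPartitions(
            List<List<T>> a,
            List<List<U>> b,
            BiFunction<Iterator<T>, Iterator<U>, Iterator<V>> f) {
        if (a.size() != b.size()) {
            throw new IllegalArgumentException(
                "Can only zip RDDs with the same number of partitions");
        }
        List<List<V>> out = new ArrayList<>();
        for (int i = 0; i < a.size(); i++) {
            Iterator<V> zipped = f.apply(a.get(i).iterator(), b.get(i).iterator());
            List<V> partition = new ArrayList<>();
            zipped.forEachRemaining(partition::add);
            out.add(partition);
        }
        return out;
    }

    public static void main(String[] args) {
        // Two partitions each, with unequal element counts per partition.
        List<List<Integer>> nums = List.of(List.of(1, 2), List.of(3));
        List<List<String>> strs = List.of(List.of("a"), List.of("b", "c"));
        // Per partition: sum the ints and concatenate the strings.
        List<List<String>> result = zipPartitions(nums, strs, (xs, ys) -> {
            int sum = 0;
            while (xs.hasNext()) sum += xs.next();
            StringBuilder sb = new StringBuilder();
            while (ys.hasNext()) sb.append(ys.next());
            return List.of(sum + ":" + sb).iterator();
        });
        System.out.println(result); // prints [[3:a], [3:bc]]
    }
}
```

The three-RDD overload added in the diff follows the same shape, simply passing a third partition iterator to the user function.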
Review Comment:
   Here is what my long-term plan might look like: https://github.com/apache/spark/compare/master...kyle-winkelman:spark:everything (it might have some noise in it, but it adds additional cogroup methods and [SPARK-42349](https://issues.apache.org/jira/browse/SPARK-42349)). If you would prefer that I go straight for the big PR that makes all the changes at once, I can repurpose this PR to target those changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

