kyle-winkelman commented on code in PR #49659:
URL: https://github.com/apache/spark/pull/49659#discussion_r1929236406


##########
core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala:
##########
@@ -609,6 +637,76 @@ class RDDSuite extends SparkFunSuite with 
SharedSparkContext with Eventually {
     }
   }
 
+  test("zipPartitions2") {
+    val data1 = sc.parallelize(1 to 8, 4)
+    val data2 = sc.parallelize(9 to 16, 4)
+    val zipped = data1
+      .zipPartitions(data2) { case (i1, i2) =>
+        Iterator(i1.mkString(",") + ";" + i2.mkString(","))
+      }
+      .collect()
+      .toList
+    assert(zipped === List("1,2;9,10", "3,4;11,12", "5,6;13,14", "7,8;15,16"))
+  }
+
+  test("zipPartitions3") {
+    val data1 = sc.parallelize(1 to 8, 4)
+    val data2 = sc.parallelize(9 to 16, 4)
+    val data3 = sc.parallelize(17 to 24, 4)
+    val zipped = data1
+      .zipPartitions(data2, data3) { case (i1, i2, i3) =>
+        Iterator(i1.mkString(",") + ";" + i2.mkString(",") + ";" + 
i3.mkString(","))
+      }
+      .collect()
+      .toList
+    assert(
+      zipped ===
+        List("1,2;9,10;17,18", "3,4;11,12;19,20", "5,6;13,14;21,22", 
"7,8;15,16;23,24"))
+  }
+
+  test("zipPartitions4") {
+    val data1 = sc.parallelize(1 to 8, 4)
+    val data2 = sc.parallelize(9 to 16, 4)
+    val data3 = sc.parallelize(17 to 24, 4)
+    val data4 = sc.parallelize(25 to 32, 4)
+    val zipped = data1
+      .zipPartitions(data2, data3, data4) { case (i1, i2, i3, i4) =>
+        Iterator(
+          i1.mkString(",") + ";" + i2.mkString(",") + ";" + i3.mkString(",") + 
";" +
+            i4.mkString(","))
+      }
+      .collect()
+      .toList
+    assert(
+      zipped ===
+        List(
+          "1,2;9,10;17,18;25,26",
+          "3,4;11,12;19,20;27,28",
+          "5,6;13,14;21,22;29,30",
+          "7,8;15,16;23,24;31,32"))
+  }
+
+  test("zipPartitionsN") {
+    val data1 = sc.parallelize(1 to 8, 4)
+    val data2 = sc.parallelize(9 to 16, 4)
+    val data3 = sc.parallelize(17 to 24, 4)
+    val data4 = sc.parallelize(25 to 32, 4)
+    val data5 = sc.parallelize(33 to 40, 4)
+    val zipped = data1
+      .zipPartitions(data2, data3, data4, data5) { is =>
+        Iterator(is.map(_.mkString(",")).mkString(";"))
+      }
+      .collect()
+      .toList
+    assert(
+      zipped ===
+        List(
+          "1,2;9,10;17,18;25,26;33,34",
+          "3,4;11,12;19,20;27,28;35,36",
+          "5,6;13,14;21,22;29,30;37,38",
+          "7,8;15,16;23,24;31,32;39,40"))
+  }

Review Comment:
   +



##########
core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala:
##########
@@ -279,49 +287,120 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] 
extends Serializable {
   /**
    * Return an RDD created by piping elements to a forked external process.
    */
-  def pipe(command: JList[String],
-           env: JMap[String, String],
-           separateWorkingDir: Boolean,
-           bufferSize: Int): JavaRDD[String] = {
+  def pipe(
+      command: JList[String],
+      env: JMap[String, String],
+      separateWorkingDir: Boolean,
+      bufferSize: Int): JavaRDD[String] = {
     rdd.pipe(command.asScala.toSeq, env.asScala, null, null, 
separateWorkingDir, bufferSize)
   }
 
   /**
    * Return an RDD created by piping elements to a forked external process.
    */
-  def pipe(command: JList[String],
-           env: JMap[String, String],
-           separateWorkingDir: Boolean,
-           bufferSize: Int,
-           encoding: String): JavaRDD[String] = {
-    rdd.pipe(command.asScala.toSeq, env.asScala, null, null, 
separateWorkingDir, bufferSize,
+  def pipe(
+      command: JList[String],
+      env: JMap[String, String],
+      separateWorkingDir: Boolean,
+      bufferSize: Int,
+      encoding: String): JavaRDD[String] = {
+    rdd.pipe(
+      command.asScala.toSeq,
+      env.asScala,
+      null,
+      null,
+      separateWorkingDir,
+      bufferSize,
       encoding)
   }
 
   /**
    * Zips this RDD with another one, returning key-value pairs with the first 
element in each RDD,
    * second element in each RDD, etc. Assumes that the two RDDs have the *same 
number of
-   * partitions* and the *same number of elements in each partition* (e.g. one 
was made through
-   * a map on the other).
+   * partitions* and the *same number of elements in each partition* (e.g. one 
was made through a
+   * map on the other).
    */
   def zip[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U] = {
     JavaPairRDD.fromRDD(rdd.zip(other.rdd)(other.classTag))(classTag, 
other.classTag)
   }
 
   /**
-   * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD 
by
-   * applying a function to the zipped partitions. Assumes that all the RDDs 
have the
-   * *same number of partitions*, but does *not* require them to have the same 
number
-   * of elements in each partition.
+   * Zip this RDD's partitions with one other RDD and return a new RDD by 
applying a function to
+   * the zipped partitions. Assumes that both the RDDs have the *same number 
of partitions*, but
+   * does *not* require them to have the same number of elements in each 
partition.
    */
   def zipPartitions[U, V](
       other: JavaRDDLike[U, _],
       f: FlatMapFunction2[JIterator[T], JIterator[U], V]): JavaRDD[V] = {
-    def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
-      (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).asScala
+    def fn: (Iterator[T], Iterator[U]) => Iterator[V] = { (x: Iterator[T], y: 
Iterator[U]) =>
+      f.call(x.asJava, y.asJava).asScala
     }
+    JavaRDD
+      .fromRDD(rdd.zipPartitions(other.rdd)(fn)(other.classTag, 
fakeClassTag[V]))(fakeClassTag[V])
+  }
+
+  /**
+   * Zip this RDD's partitions with two more RDDs and return a new RDD by 
applying a function to
+   * the zipped partitions. Assumes that all the RDDs have the *same number of 
partitions*, but
+   * does *not* require them to have the same number of elements in each 
partition.
+   */
+  @Since("4.1.0")
+  def zipPartitions[U1, U2, V](
+      other1: JavaRDDLike[U1, _],
+      other2: JavaRDDLike[U2, _],
+      f: FlatMapFunction3[JIterator[T], JIterator[U1], JIterator[U2], V]): 
JavaRDD[V] = {
+    def fn: (Iterator[T], Iterator[U1], Iterator[U2]) => Iterator[V] =
+      (t: Iterator[T], u1: Iterator[U1], u2: Iterator[U2]) =>
+        f.call(t.asJava, u1.asJava, u2.asJava).asScala
+    JavaRDD.fromRDD(
+      rdd.zipPartitions(other1.rdd, other2.rdd)(fn)(
+        other1.classTag,
+        other2.classTag,
+        fakeClassTag[V]))(fakeClassTag[V])
+  }
+
+  /**
+   * Zip this RDD's partitions with three more RDDs and return a new RDD by 
applying a function to
+   * the zipped partitions. Assumes that all the RDDs have the *same number of 
partitions*, but
+   * does *not* require them to have the same number of elements in each 
partition.
+   */
+  @Since("4.1.0")
+  def zipPartitions[U1, U2, U3, V](
+      other1: JavaRDDLike[U1, _],
+      other2: JavaRDDLike[U2, _],
+      other3: JavaRDDLike[U3, _],
+      f: FlatMapFunction4[JIterator[T], JIterator[U1], JIterator[U2], 
JIterator[U3], V])
+      : JavaRDD[V] = {
+    def fn: (Iterator[T], Iterator[U1], Iterator[U2], Iterator[U3]) => 
Iterator[V] =
+      (t: Iterator[T], u1: Iterator[U1], u2: Iterator[U2], u3: Iterator[U3]) =>
+        f.call(t.asJava, u1.asJava, u2.asJava, u3.asJava).asScala
     JavaRDD.fromRDD(
-      rdd.zipPartitions(other.rdd)(fn)(other.classTag, 
fakeClassTag[V]))(fakeClassTag[V])
+      rdd.zipPartitions(other1.rdd, other2.rdd, other3.rdd)(fn)(
+        other1.classTag,
+        other2.classTag,
+        other3.classTag,
+        fakeClassTag[V]))(fakeClassTag[V])
+  }
+
+  /**
+   * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD 
by applying a
+   * function to the zipped partitions. Assumes that all the RDDs have the 
*same number of
+   * partitions*, but does *not* require them to have the same number of 
elements in each
+   * partition.
+   *
+   * @note
+   *   A generic version of `zipPartitions` for an arbitrary number of RDDs. 
It may be type unsafe
+   *   and other `zipPartitions` methods should be preferred.
+   */
+  @Since("4.1.0")
+  @varargs
+  def zipPartitions[U, V](
+      f: FlatMapFunction[JList[JIterator[U]], V],
+      others: JavaRDDLike[_, _]*): JavaRDD[V] = {
+    def fn: Seq[Iterator[_]] => Iterator[V] =
+      (i: Seq[Iterator[_]]) => 
f.call(i.map(_.asInstanceOf[Iterator[U]].asJava).asJava).asScala
+    JavaRDD
+      .fromRDD(rdd.zipPartitions(others.map(_.rdd): 
_*)(fn)(fakeClassTag[V]))(fakeClassTag[V])
   }

Review Comment:
   +



##########
core/src/main/scala/org/apache/spark/rdd/RDD.scala:
##########
@@ -987,47 +1041,72 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
-   * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD 
by
-   * applying a function to the zipped partitions. Assumes that all the RDDs 
have the
-   * *same number of partitions*, but does *not* require them to have the same 
number
-   * of elements in each partition.
+   * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD 
by applying a
+   * function to the zipped partitions. Assumes that all the RDDs have the 
*same number of
+   * partitions*, but does *not* require them to have the same number of 
elements in each
+   * partition.
    */
-  def zipPartitions[B: ClassTag, V: ClassTag]
-      (rdd2: RDD[B], preservesPartitioning: Boolean)
-      (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
+  def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B], 
preservesPartitioning: Boolean)(
+      f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
     new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, 
preservesPartitioning)
   }
 
-  def zipPartitions[B: ClassTag, V: ClassTag]
-      (rdd2: RDD[B])
-      (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
+  def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B])(
+      f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
     zipPartitions(rdd2, preservesPartitioning = false)(f)
   }
 
-  def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
-      (rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)
-      (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = 
withScope {
+  def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](
+      rdd2: RDD[B],
+      rdd3: RDD[C],
+      preservesPartitioning: Boolean)(
+      f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = 
withScope {
     new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, 
preservesPartitioning)
   }
 
-  def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
-      (rdd2: RDD[B], rdd3: RDD[C])
-      (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = 
withScope {
+  def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: 
RDD[C])(
+      f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = 
withScope {
     zipPartitions(rdd2, rdd3, preservesPartitioning = false)(f)
   }
 
-  def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
-      (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: 
Boolean)
-      (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => 
Iterator[V]): RDD[V] = withScope {
-    new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, 
preservesPartitioning)
-  }
+  def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag](
+      rdd2: RDD[B],
+      rdd3: RDD[C],
+      rdd4: RDD[D],
+      preservesPartitioning: Boolean)(
+      f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): 
RDD[V] =
+    withScope {
+      new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, 
preservesPartitioning)
+    }
 
-  def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
-      (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
-      (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => 
Iterator[V]): RDD[V] = withScope {
-    zipPartitions(rdd2, rdd3, rdd4, preservesPartitioning = false)(f)
+  def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag](
+      rdd2: RDD[B],
+      rdd3: RDD[C],
+      rdd4: RDD[D])(
+      f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): 
RDD[V] =
+    withScope {
+      zipPartitions(rdd2, rdd3, rdd4, preservesPartitioning = false)(f)
+    }
+
+  /**
+   * A generic version of `zipPartitions` for an arbitrary number of RDDs. It 
may be type unsafe
+   * and other `zipPartitions` methods should be preferred.
+   */
+  @Since("4.1.0")
+  def zipPartitions[V: ClassTag](preservesPartitioning: Boolean, rdds: 
RDD[_]*)(
+      f: Seq[Iterator[_]] => Iterator[V]): RDD[V] = withScope {
+    new ZippedPartitionsRDDN(sc, sc.clean(f), this +: rdds, 
preservesPartitioning)
   }
 
+  /**
+   * A generic version of `zipPartitions` for an arbitrary number of RDDs. It 
may be type unsafe
+   * and other `zipPartitions` methods should be preferred.
+   */
+  @Since("4.1.0")
+  def zipPartitions[V: ClassTag](rdds: RDD[_]*)(f: Seq[Iterator[_]] => 
Iterator[V]): RDD[V] =
+    withScope {
+      zipPartitions(preservesPartitioning = false, rdds: _*)(f)
+    }

Review Comment:
   +



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to