kyle-winkelman commented on code in PR #49659:
URL: https://github.com/apache/spark/pull/49659#discussion_r1929236406
##########
core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala:
##########
@@ -609,6 +637,76 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
}
}
+ test("zipPartitions2") {
+ val data1 = sc.parallelize(1 to 8, 4)
+ val data2 = sc.parallelize(9 to 16, 4)
+ val zipped = data1
+ .zipPartitions(data2) { case (i1, i2) =>
+ Iterator(i1.mkString(",") + ";" + i2.mkString(","))
+ }
+ .collect()
+ .toList
+ assert(zipped === List("1,2;9,10", "3,4;11,12", "5,6;13,14", "7,8;15,16"))
+ }
+
+ test("zipPartitions3") {
+ val data1 = sc.parallelize(1 to 8, 4)
+ val data2 = sc.parallelize(9 to 16, 4)
+ val data3 = sc.parallelize(17 to 24, 4)
+ val zipped = data1
+ .zipPartitions(data2, data3) { case (i1, i2, i3) =>
+ Iterator(i1.mkString(",") + ";" + i2.mkString(",") + ";" +
i3.mkString(","))
+ }
+ .collect()
+ .toList
+ assert(
+ zipped ===
+ List("1,2;9,10;17,18", "3,4;11,12;19,20", "5,6;13,14;21,22",
"7,8;15,16;23,24"))
+ }
+
+ test("zipPartitions4") {
+ val data1 = sc.parallelize(1 to 8, 4)
+ val data2 = sc.parallelize(9 to 16, 4)
+ val data3 = sc.parallelize(17 to 24, 4)
+ val data4 = sc.parallelize(25 to 32, 4)
+ val zipped = data1
+ .zipPartitions(data2, data3, data4) { case (i1, i2, i3, i4) =>
+ Iterator(
+ i1.mkString(",") + ";" + i2.mkString(",") + ";" + i3.mkString(",") +
";" +
+ i4.mkString(","))
+ }
+ .collect()
+ .toList
+ assert(
+ zipped ===
+ List(
+ "1,2;9,10;17,18;25,26",
+ "3,4;11,12;19,20;27,28",
+ "5,6;13,14;21,22;29,30",
+ "7,8;15,16;23,24;31,32"))
+ }
+
+ test("zipPartitionsN") {
+ val data1 = sc.parallelize(1 to 8, 4)
+ val data2 = sc.parallelize(9 to 16, 4)
+ val data3 = sc.parallelize(17 to 24, 4)
+ val data4 = sc.parallelize(25 to 32, 4)
+ val data5 = sc.parallelize(33 to 40, 4)
+ val zipped = data1
+ .zipPartitions(data2, data3, data4, data5) { is =>
+ Iterator(is.map(_.mkString(",")).mkString(";"))
+ }
+ .collect()
+ .toList
+ assert(
+ zipped ===
+ List(
+ "1,2;9,10;17,18;25,26;33,34",
+ "3,4;11,12;19,20;27,28;35,36",
+ "5,6;13,14;21,22;29,30;37,38",
+ "7,8;15,16;23,24;31,32;39,40"))
+ }
Review Comment:
+
##########
core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala:
##########
@@ -279,49 +287,120 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return an RDD created by piping elements to a forked external process.
*/
- def pipe(command: JList[String],
- env: JMap[String, String],
- separateWorkingDir: Boolean,
- bufferSize: Int): JavaRDD[String] = {
+ def pipe(
+ command: JList[String],
+ env: JMap[String, String],
+ separateWorkingDir: Boolean,
+ bufferSize: Int): JavaRDD[String] = {
rdd.pipe(command.asScala.toSeq, env.asScala, null, null,
separateWorkingDir, bufferSize)
}
/**
* Return an RDD created by piping elements to a forked external process.
*/
- def pipe(command: JList[String],
- env: JMap[String, String],
- separateWorkingDir: Boolean,
- bufferSize: Int,
- encoding: String): JavaRDD[String] = {
- rdd.pipe(command.asScala.toSeq, env.asScala, null, null,
separateWorkingDir, bufferSize,
+ def pipe(
+ command: JList[String],
+ env: JMap[String, String],
+ separateWorkingDir: Boolean,
+ bufferSize: Int,
+ encoding: String): JavaRDD[String] = {
+ rdd.pipe(
+ command.asScala.toSeq,
+ env.asScala,
+ null,
+ null,
+ separateWorkingDir,
+ bufferSize,
encoding)
}
/**
* Zips this RDD with another one, returning key-value pairs with the first
element in each RDD,
* second element in each RDD, etc. Assumes that the two RDDs have the *same
number of
- * partitions* and the *same number of elements in each partition* (e.g. one
was made through
- * a map on the other).
+ * partitions* and the *same number of elements in each partition* (e.g. one
was made through a
+ * map on the other).
*/
def zip[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U] = {
JavaPairRDD.fromRDD(rdd.zip(other.rdd)(other.classTag))(classTag,
other.classTag)
}
/**
- * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD
by
- * applying a function to the zipped partitions. Assumes that all the RDDs
have the
- * *same number of partitions*, but does *not* require them to have the same
number
- * of elements in each partition.
+ * Zip this RDD's partitions with one other RDD and return a new RDD by
applying a function to
+ * the zipped partitions. Assumes that both the RDDs have the *same number
of partitions*, but
+ * does *not* require them to have the same number of elements in each
partition.
*/
def zipPartitions[U, V](
other: JavaRDDLike[U, _],
f: FlatMapFunction2[JIterator[T], JIterator[U], V]): JavaRDD[V] = {
- def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
- (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).asScala
+ def fn: (Iterator[T], Iterator[U]) => Iterator[V] = { (x: Iterator[T], y:
Iterator[U]) =>
+ f.call(x.asJava, y.asJava).asScala
}
+ JavaRDD
+ .fromRDD(rdd.zipPartitions(other.rdd)(fn)(other.classTag,
fakeClassTag[V]))(fakeClassTag[V])
+ }
+
+ /**
+ * Zip this RDD's partitions with two more RDDs and return a new RDD by
applying a function to
+ * the zipped partitions. Assumes that all the RDDs have the *same number of
partitions*, but
+ * does *not* require them to have the same number of elements in each
partition.
+ */
+ @Since("4.1.0")
+ def zipPartitions[U1, U2, V](
+ other1: JavaRDDLike[U1, _],
+ other2: JavaRDDLike[U2, _],
+ f: FlatMapFunction3[JIterator[T], JIterator[U1], JIterator[U2], V]):
JavaRDD[V] = {
+ def fn: (Iterator[T], Iterator[U1], Iterator[U2]) => Iterator[V] =
+ (t: Iterator[T], u1: Iterator[U1], u2: Iterator[U2]) =>
+ f.call(t.asJava, u1.asJava, u2.asJava).asScala
+ JavaRDD.fromRDD(
+ rdd.zipPartitions(other1.rdd, other2.rdd)(fn)(
+ other1.classTag,
+ other2.classTag,
+ fakeClassTag[V]))(fakeClassTag[V])
+ }
+
+ /**
+ * Zip this RDD's partitions with three more RDDs and return a new RDD by
applying a function to
+ * the zipped partitions. Assumes that all the RDDs have the *same number of
partitions*, but
+ * does *not* require them to have the same number of elements in each
partition.
+ */
+ @Since("4.1.0")
+ def zipPartitions[U1, U2, U3, V](
+ other1: JavaRDDLike[U1, _],
+ other2: JavaRDDLike[U2, _],
+ other3: JavaRDDLike[U3, _],
+ f: FlatMapFunction4[JIterator[T], JIterator[U1], JIterator[U2],
JIterator[U3], V])
+ : JavaRDD[V] = {
+ def fn: (Iterator[T], Iterator[U1], Iterator[U2], Iterator[U3]) =>
Iterator[V] =
+ (t: Iterator[T], u1: Iterator[U1], u2: Iterator[U2], u3: Iterator[U3]) =>
+ f.call(t.asJava, u1.asJava, u2.asJava, u3.asJava).asScala
JavaRDD.fromRDD(
- rdd.zipPartitions(other.rdd)(fn)(other.classTag,
fakeClassTag[V]))(fakeClassTag[V])
+ rdd.zipPartitions(other1.rdd, other2.rdd, other3.rdd)(fn)(
+ other1.classTag,
+ other2.classTag,
+ other3.classTag,
+ fakeClassTag[V]))(fakeClassTag[V])
+ }
+
+ /**
+ * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD
by applying a
+ * function to the zipped partitions. Assumes that all the RDDs have the
*same number of
+ * partitions*, but does *not* require them to have the same number of
elements in each
+ * partition.
+ *
+ * @note
+ * A generic version of `zipPartitions` for an arbitrary number of RDDs.
It may be type unsafe
+ * and other `zipPartitions` methods should be preferred.
+ */
+ @Since("4.1.0")
+ @varargs
+ def zipPartitions[U, V](
+ f: FlatMapFunction[JList[JIterator[U]], V],
+ others: JavaRDDLike[_, _]*): JavaRDD[V] = {
+ def fn: Seq[Iterator[_]] => Iterator[V] =
+ (i: Seq[Iterator[_]]) =>
f.call(i.map(_.asInstanceOf[Iterator[U]].asJava).asJava).asScala
+ JavaRDD
+ .fromRDD(rdd.zipPartitions(others.map(_.rdd):
_*)(fn)(fakeClassTag[V]))(fakeClassTag[V])
}
Review Comment:
+
##########
core/src/main/scala/org/apache/spark/rdd/RDD.scala:
##########
@@ -987,47 +1041,72 @@ abstract class RDD[T: ClassTag](
}
/**
- * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD
by
- * applying a function to the zipped partitions. Assumes that all the RDDs
have the
- * *same number of partitions*, but does *not* require them to have the same
number
- * of elements in each partition.
+ * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD
by applying a
+ * function to the zipped partitions. Assumes that all the RDDs have the
*same number of
+ * partitions*, but does *not* require them to have the same number of
elements in each
+ * partition.
*/
- def zipPartitions[B: ClassTag, V: ClassTag]
- (rdd2: RDD[B], preservesPartitioning: Boolean)
- (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
+ def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B],
preservesPartitioning: Boolean)(
+ f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2,
preservesPartitioning)
}
- def zipPartitions[B: ClassTag, V: ClassTag]
- (rdd2: RDD[B])
- (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
+ def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B])(
+ f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
zipPartitions(rdd2, preservesPartitioning = false)(f)
}
- def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
- (rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)
- (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
withScope {
+ def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](
+ rdd2: RDD[B],
+ rdd3: RDD[C],
+ preservesPartitioning: Boolean)(
+ f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
withScope {
new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3,
preservesPartitioning)
}
- def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
- (rdd2: RDD[B], rdd3: RDD[C])
- (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
withScope {
+ def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3:
RDD[C])(
+ f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
withScope {
zipPartitions(rdd2, rdd3, preservesPartitioning = false)(f)
}
- def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
- (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning:
Boolean)
- (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) =>
Iterator[V]): RDD[V] = withScope {
- new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4,
preservesPartitioning)
- }
+ def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag](
+ rdd2: RDD[B],
+ rdd3: RDD[C],
+ rdd4: RDD[D],
+ preservesPartitioning: Boolean)(
+ f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]):
RDD[V] =
+ withScope {
+ new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4,
preservesPartitioning)
+ }
- def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
- (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
- (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) =>
Iterator[V]): RDD[V] = withScope {
- zipPartitions(rdd2, rdd3, rdd4, preservesPartitioning = false)(f)
+ def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag](
+ rdd2: RDD[B],
+ rdd3: RDD[C],
+ rdd4: RDD[D])(
+ f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]):
RDD[V] =
+ withScope {
+ zipPartitions(rdd2, rdd3, rdd4, preservesPartitioning = false)(f)
+ }
+
+ /**
+ * A generic version of `zipPartitions` for an arbitrary number of RDDs. It
may be type unsafe
+ * and other `zipPartitions` methods should be preferred.
+ */
+ @Since("4.1.0")
+ def zipPartitions[V: ClassTag](preservesPartitioning: Boolean, rdds:
RDD[_]*)(
+ f: Seq[Iterator[_]] => Iterator[V]): RDD[V] = withScope {
+ new ZippedPartitionsRDDN(sc, sc.clean(f), this +: rdds,
preservesPartitioning)
}
+ /**
+ * A generic version of `zipPartitions` for an arbitrary number of RDDs. It
may be type unsafe
+ * and other `zipPartitions` methods should be preferred.
+ */
+ @Since("4.1.0")
+ def zipPartitions[V: ClassTag](rdds: RDD[_]*)(f: Seq[Iterator[_]] =>
Iterator[V]): RDD[V] =
+ withScope {
+ zipPartitions(preservesPartitioning = false, rdds: _*)(f)
+ }
Review Comment:
+
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]