HyukjinKwon commented on code in PR #49659:
URL: https://github.com/apache/spark/pull/49659#discussion_r1929905847
##########
core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala:
##########
@@ -308,20 +309,83 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
JavaPairRDD.fromRDD(rdd.zip(other.rdd)(other.classTag))(classTag,
other.classTag)
}
- /**
- * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD
by
- * applying a function to the zipped partitions. Assumes that all the RDDs
have the
- * *same number of partitions*, but does *not* require them to have the same
number
- * of elements in each partition.
+ /*
+ * Zip this RDD's partitions with one other RDD and return a new RDD by
applying a function to
+ * the zipped partitions. Assumes that both the RDDs have the *same number
of partitions*, but
+ * does *not* require them to have the same number of elements in each
partition.
*/
def zipPartitions[U, V](
other: JavaRDDLike[U, _],
f: FlatMapFunction2[JIterator[T], JIterator[U], V]): JavaRDD[V] = {
- def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
- (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).asScala
+ def fn: (Iterator[T], Iterator[U]) => Iterator[V] = { (x: Iterator[T], y:
Iterator[U]) =>
+ f.call(x.asJava, y.asJava).asScala
}
+ JavaRDD
+ .fromRDD(rdd.zipPartitions(other.rdd)(fn)(other.classTag,
fakeClassTag[V]))(fakeClassTag[V])
+ }
+
+ /**
+ * Zip this RDD's partitions with two more RDDs and return a new RDD by
applying a function to
+ * the zipped partitions. Assumes that all the RDDs have the *same number of
partitions*, but
+ * does *not* require them to have the same number of elements in each
partition.
+ */
+ @Since("4.1.0")
+ def zipPartitions[U1, U2, V](
+ other1: JavaRDDLike[U1, _],
+ other2: JavaRDDLike[U2, _],
+ f: FlatMapFunction3[JIterator[T], JIterator[U1], JIterator[U2], V]):
JavaRDD[V] = {
+ def fn: (Iterator[T], Iterator[U1], Iterator[U2]) => Iterator[V] =
+ (t: Iterator[T], u1: Iterator[U1], u2: Iterator[U2]) =>
+ f.call(t.asJava, u1.asJava, u2.asJava).asScala
+ JavaRDD.fromRDD(
Review Comment:
Why not use the Dataset API instead? We're actually promoting it over the RDD API.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]