kyle-winkelman commented on code in PR #49659:
URL: https://github.com/apache/spark/pull/49659#discussion_r1929508474
##########
core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala:
##########
@@ -308,20 +309,83 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     JavaPairRDD.fromRDD(rdd.zip(other.rdd)(other.classTag))(classTag, other.classTag)
   }
-  /**
-   * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by
-   * applying a function to the zipped partitions. Assumes that all the RDDs have the
-   * *same number of partitions*, but does *not* require them to have the same number
-   * of elements in each partition.
+  /**
+   * Zip this RDD's partitions with one other RDD and return a new RDD by applying a function to
+   * the zipped partitions. Assumes that both the RDDs have the *same number of partitions*, but
+   * does *not* require them to have the same number of elements in each partition.
    */
   def zipPartitions[U, V](
       other: JavaRDDLike[U, _],
       f: FlatMapFunction2[JIterator[T], JIterator[U], V]): JavaRDD[V] = {
-    def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
-      (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).asScala
+    def fn: (Iterator[T], Iterator[U]) => Iterator[V] = { (x: Iterator[T], y: Iterator[U]) =>
+      f.call(x.asJava, y.asJava).asScala
     }
+    JavaRDD
+      .fromRDD(rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
+  }
+
+  /**
+   * Zip this RDD's partitions with two more RDDs and return a new RDD by applying a function to
+   * the zipped partitions. Assumes that all the RDDs have the *same number of partitions*, but
+   * does *not* require them to have the same number of elements in each partition.
+   */
+  @Since("4.1.0")
+  def zipPartitions[U1, U2, V](
+      other1: JavaRDDLike[U1, _],
+      other2: JavaRDDLike[U2, _],
+      f: FlatMapFunction3[JIterator[T], JIterator[U1], JIterator[U2], V]): JavaRDD[V] = {
+    def fn: (Iterator[T], Iterator[U1], Iterator[U2]) => Iterator[V] =
+      (t: Iterator[T], u1: Iterator[U1], u2: Iterator[U2]) =>
+        f.call(t.asJava, u1.asJava, u2.asJava).asScala
+    JavaRDD.fromRDD(
Review Comment:
   Is this comment about the entire PR or just the changes in JavaRDDLike? My long-term goal is to add additional [cogroup](https://github.com/apache/spark/blob/v3.5.0/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala#L808) methods for 3, 4, and N KeyValueGroupedDatasets. I do not need all the logic from this PR for that goal, but thought it was a good small step in that direction.
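   To illustrate the per-partition semantics the diff above implements, here is a hedged, plain-Java sketch (no Spark dependency): `FlatMapFunction3` below is a hypothetical stand-in for Spark's interface of the same name, and `zipOnePartition` mimics what `rdd.zipPartitions(other1, other2)(f)` does to a single partition from each RDD. Names and the zipping strategy inside the lambda are illustrative assumptions, not Spark code.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Iterator;
   import java.util.List;

   public class ZipPartitionsSketch {
       // Hypothetical stand-in for Spark's FlatMapFunction3: takes three
       // partition iterators and returns an iterator of results.
       interface FlatMapFunction3<A, B, C, R> {
           Iterator<R> call(Iterator<A> a, Iterator<B> b, Iterator<C> c);
       }

       // Applies f to one "partition" from each of three inputs, mirroring
       // the per-partition behavior of the new three-RDD zipPartitions.
       static <A, B, C, R> List<R> zipOnePartition(
               List<A> p1, List<B> p2, List<C> p3,
               FlatMapFunction3<A, B, C, R> f) {
           List<R> out = new ArrayList<>();
           f.call(p1.iterator(), p2.iterator(), p3.iterator())
            .forEachRemaining(out::add);
           return out;
       }

       public static void main(String[] args) {
           // Partitions may hold different element counts; only the number
           // of partitions must match, as the Scaladoc in the diff states.
           List<Integer> a = Arrays.asList(1, 2, 3);
           List<String>  b = Arrays.asList("x", "y");
           List<Double>  c = Arrays.asList(0.5);

           List<String> zipped = zipOnePartition(a, b, c, (ia, ib, ic) -> {
               List<String> rows = new ArrayList<>();
               // Zip pairwise while all three iterators still have elements.
               while (ia.hasNext() && ib.hasNext() && ic.hasNext()) {
                   rows.add(ia.next() + ib.next() + ic.next());
               }
               return rows.iterator();
           });
           System.out.println(zipped); // [1x0.5]
       }
   }
   ```

   The user-supplied function receives the raw iterators, so it decides how to combine partitions of unequal length; here the shortest input (one element) bounds the output.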
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]