Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/17936

@srowen Sorry for the late reply. I updated the code. To reduce the number of remote fetches, the second partition should be cached locally. There are two ways to do this. First, it can be cached by the `TaskConsumer`, which is controlled by `Execution Memory` (this approach appears similar to #9969). Second, it can be cached by the `BlockManager`, which is controlled by `Storage Memory`. In my experiments, the first approach caused very serious GC problems.

Cartesian is only used in `ALS` and `UnsafeCartesianRDD`. However, the latter provides its own cartesian implementation, as you can see in the following:

```
class UnsafeCartesianRDD(
    left : RDD[UnsafeRow],
    right : RDD[UnsafeRow],
    numFieldsOfRight: Int,
    spillThreshold: Int)
  extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) {

  override def compute(split: Partition, context: TaskContext): Iterator[(UnsafeRow, UnsafeRow)] = {
    val rowArray = new ExternalAppendOnlyUnsafeRowArray(spillThreshold)

    val partition = split.asInstanceOf[CartesianPartition]
    rdd2.iterator(partition.s2, context).foreach(rowArray.add)

    // Create an iterator from rowArray
    def createIter(): Iterator[UnsafeRow] = rowArray.generateIterator()

    val resultIter =
      for (x <- rdd1.iterator(partition.s1, context);
           y <- createIter()) yield (x, y)
    CompletionIterator[(UnsafeRow, UnsafeRow), Iterator[(UnsafeRow, UnsafeRow)]](
      resultIter, rowArray.clear())
  }
}
```

So I think there should be no other impact.
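To illustrate why caching the second partition locally reduces the number of fetches, here is a minimal sketch in plain Scala with no Spark dependency. It is not the code in this PR: `CartesianSketch`, `CountingSource`, `naive`, and `buffered` are hypothetical names, and the in-memory `Vector` only stands in for whichever cache (`TaskConsumer` or `BlockManager`) is actually used.

```scala
object CartesianSketch {
  // Hypothetical stand-in for a remote partition: counts how often it is fetched.
  final class CountingSource[A](data: Seq[A]) {
    var fetches: Int = 0
    def iterator: Iterator[A] = { fetches += 1; data.iterator }
  }

  // Naive cartesian: asks the source for a fresh iterator for every left element.
  def naive[A, B](left: Iterator[A], right: CountingSource[B]): Iterator[(A, B)] =
    for (x <- left; y <- right.iterator) yield (x, y)

  // Buffered cartesian: fetches the right side once, then replays the local copy.
  def buffered[A, B](left: Iterator[A], right: CountingSource[B]): Iterator[(A, B)] = {
    val local = right.iterator.toVector // assumption: the partition fits in memory
    for (x <- left; y <- local.iterator) yield (x, y)
  }

  def main(args: Array[String]): Unit = {
    val r1 = new CountingSource(Seq("a", "b"))
    val out1 = naive(Iterator(1, 2, 3), r1).toList

    val r2 = new CountingSource(Seq("a", "b"))
    val out2 = buffered(Iterator(1, 2, 3), r2).toList

    // Both variants produce the same pairs; only the fetch count differs.
    assert(out1 == out2)
    println(s"naive fetches: ${r1.fetches}, buffered fetches: ${r2.fetches}")
  }
}
```

With three left elements, the naive variant requests the right side three times, while the buffered variant requests it once. The sketch says nothing about memory pressure or GC, which is the trade-off between the two caching approaches discussed above.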