Github user ConeyLiu commented on the issue:
https://github.com/apache/spark/pull/17936
@srowen Sorry for the late reply. I have updated the code. Because we want to reduce the number of remote fetches, the second partition should be cached locally. There are two ways to do this: first, cache it in a `TaskConsumer`, which is accounted against execution memory (this is roughly the approach of #9969); second, cache it through the `BlockManager`, which is accounted against storage memory. Experiments showed that the first way has a very serious GC problem.
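To make the second way concrete, here is a minimal sketch of caching the right partition through the `BlockManager` (it assumes code living inside Spark's own `org.apache.spark` packages, since `BlockManager` is `private[spark]`; the helper name, the reuse of `RDDBlockId`, and `StorageLevel.MEMORY_AND_DISK` are just for illustration, not necessarily what this patch does):
```
import scala.reflect.ClassTag

import org.apache.spark.{Partition, SparkEnv, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{RDDBlockId, StorageLevel}

// Illustrative helper (hypothetical name): materialize the right partition once
// through the BlockManager, so repeated scans read it from local storage memory
// instead of fetching it remotely for every left-side partition element.
def cachedRightIterator[U: ClassTag](
    rdd2: RDD[U],
    rightSplit: Partition,
    context: TaskContext): Iterator[U] = {
  val blockManager = SparkEnv.get.blockManager
  // Reuse the standard RDD block id scheme for the cached partition (assumption).
  val blockId = RDDBlockId(rdd2.id, rightSplit.index)
  blockManager.getOrElseUpdate(
    blockId,
    StorageLevel.MEMORY_AND_DISK, // accounted against storage memory, spills to disk
    implicitly[ClassTag[U]],
    () => rdd2.iterator(rightSplit, context)) match {
    case Left(blockResult) => blockResult.data.asInstanceOf[Iterator[U]]
    case Right(iter) => iter
  }
}
```
The point of going through the `BlockManager` is that the cached rows are tracked by storage memory and can be evicted or spilled by Spark's memory manager, instead of sitting on the task's heap where they aggravate GC.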
`CartesianRDD` is only used in `ALS` and `UnsafeCartesianRDD`. However, the latter implements its own `compute`, as you can see below:
```
class UnsafeCartesianRDD(
    left: RDD[UnsafeRow],
    right: RDD[UnsafeRow],
    numFieldsOfRight: Int,
    spillThreshold: Int)
  extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) {

  override def compute(split: Partition, context: TaskContext): Iterator[(UnsafeRow, UnsafeRow)] = {
    val rowArray = new ExternalAppendOnlyUnsafeRowArray(spillThreshold)

    val partition = split.asInstanceOf[CartesianPartition]
    rdd2.iterator(partition.s2, context).foreach(rowArray.add)

    // Create an iterator from rowArray
    def createIter(): Iterator[UnsafeRow] = rowArray.generateIterator()

    val resultIter =
      for (x <- rdd1.iterator(partition.s1, context);
           y <- createIter()) yield (x, y)
    CompletionIterator[(UnsafeRow, UnsafeRow), Iterator[(UnsafeRow, UnsafeRow)]](
      resultIter, rowArray.clear())
  }
}
```
So I think there should be no other impact.