GitHub user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9969#discussion_r46179499
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala ---
    @@ -17,16 +17,75 @@
     
     package org.apache.spark.sql.execution.joins
     
    -import org.apache.spark.rdd.RDD
    +import org.apache.spark._
    +import org.apache.spark.rdd.{CartesianPartition, CartesianRDD, RDD}
     import org.apache.spark.sql.catalyst.InternalRow
    -import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow}
    -import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
    +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
     import org.apache.spark.sql.execution.metric.SQLMetrics
    +import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
    +import org.apache.spark.util.CompletionIterator
    +import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
    +
    +
    +/**
    +  * An optimized CartesianRDD for UnsafeRow, which caches the rows from the second child
    +  * RDD. This is much faster than rebuilding the right partition for every row in the left
    +  * RDD. It also materializes the right RDD, in case the right RDD is nondeterministic.
    +  */
    +private[spark]
    +class UnsafeCartesianRDD(left: RDD[UnsafeRow], right: RDD[UnsafeRow], numFieldsOfRight: Int)
    +  extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) {
    +
    +  override def compute(split: Partition, context: TaskContext): Iterator[(UnsafeRow, UnsafeRow)] = {
    +    // We will not sort the rows, so prefixComparator and recordComparator are null.
    +    val sorter = UnsafeExternalSorter.create(
    +      context.taskMemoryManager(),
    +      SparkEnv.get.blockManager,
    +      context,
    +      null,
    +      null,
    +      1024,
    +      SparkEnv.get.memoryManager.pageSizeBytes)
    +
    +    val partition = split.asInstanceOf[CartesianPartition]
    +    for (y <- rdd2.iterator(partition.s2, context)) {
    +      sorter.insertRecord(y.getBaseObject, y.getBaseOffset, y.getSizeInBytes, 0)
    +    }
    +
    +    // Create an iterator from the sorter and wrap it as an Iterator[UnsafeRow].
    +    def createIter(): Iterator[UnsafeRow] = {
    --- End diff --
    
    Does it make sense to move this into the sorter? It seems other callers might want this.
    
    Something like UnsafeRowSorter.unsafeRowIterator().
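    
    For illustration, a minimal sketch of what such a helper could look like, in the
    spirit of the suggestion above. The free-standing object, the getIterator() call,
    and the accessor names on the returned iterator are assumptions for the sketch,
    not a confirmed API:
    
        import org.apache.spark.sql.catalyst.expressions.UnsafeRow
        import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
        
        object UnsafeRowIterators {
          // Hypothetical helper; per the suggestion it could live on the sorter
          // itself. Assumes the sorter hands back a raw record iterator with
          // loadNext()/getBaseObject/getBaseOffset/getRecordLength accessors.
          def unsafeRowIterator(sorter: UnsafeExternalSorter, numFields: Int): Iterator[UnsafeRow] = {
            val iter = sorter.getIterator()  // assumption: insertion-order record iterator
            val row = new UnsafeRow()        // reused across next() calls to avoid allocation
            new Iterator[UnsafeRow] {
              override def hasNext: Boolean = iter.hasNext
              override def next(): UnsafeRow = {
                iter.loadNext()
                // Point the reusable row at the record bytes held by the sorter.
                row.pointTo(iter.getBaseObject, iter.getBaseOffset, numFields, iter.getRecordLength)
                row
              }
            }
          }
        }
    
    Having the sorter own this would let CartesianProduct and any other caller that
    spills UnsafeRows share the pointTo bookkeeping instead of re-deriving it.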

