Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21018#discussion_r183362225
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 ---
    @@ -155,31 +125,76 @@ case class InMemoryRelation(
         cached.setName(
           tableName.map(n => s"In-memory table $n")
             .getOrElse(StringUtils.abbreviate(child.toString, 1024)))
    -    _cachedColumnBuffers = cached
    +    cached
    +  }
    +}
    +
    +object InMemoryRelation {
    +
    +  def apply(
    +      useCompression: Boolean,
    +      batchSize: Int,
    +      storageLevel: StorageLevel,
    +      child: SparkPlan,
    +      tableName: Option[String],
    +      logicalPlan: LogicalPlan): InMemoryRelation = {
    +    val cacheBuilder = CachedRDDBuilder(useCompression, batchSize, 
storageLevel, child, tableName)()
    +    new InMemoryRelation(child.output, cacheBuilder)(
    +      statsOfPlanToCache = logicalPlan.stats, outputOrdering = 
logicalPlan.outputOrdering)
    +  }
    +
    +  def apply(cacheBuilder: CachedRDDBuilder, logicalPlan: LogicalPlan): 
InMemoryRelation = {
    +    new InMemoryRelation(cacheBuilder.child.output, cacheBuilder)(
    +      statsOfPlanToCache = logicalPlan.stats, outputOrdering = 
logicalPlan.outputOrdering)
    +  }
    +}
    +
    +case class InMemoryRelation(
    +    output: Seq[Attribute],
    +    @transient cacheBuilder: CachedRDDBuilder)(
    +    statsOfPlanToCache: Statistics,
    +    override val outputOrdering: Seq[SortOrder])
    +  extends logical.LeafNode with MultiInstanceRelation {
    +
    +  override protected def innerChildren: Seq[SparkPlan] = Seq(child)
    +
    +  override def doCanonicalize(): logical.LogicalPlan =
    +    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
    +      cacheBuilder)(
    +      statsOfPlanToCache,
    +      outputOrdering)
    +
    +  override def producedAttributes: AttributeSet = outputSet
    +
    +  @transient val partitionStatistics = new PartitionStatistics(output)
    +
    +  val child: SparkPlan = cacheBuilder.child
    --- End diff --
    
    Since `InMemoryTableScanExec` and other places reference this variable, 
I kept it public. But, yeah, I agree the name is a little odd, so I renamed 
`child` to `cachedPlan`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to