Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21018#discussion_r183614108
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 ---
    @@ -55,56 +42,39 @@ object InMemoryRelation {
     private[columnar]
     case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: 
InternalRow)
     
    -case class InMemoryRelation(
    -    output: Seq[Attribute],
    +case class CachedRDDBuilder(
         useCompression: Boolean,
         batchSize: Int,
         storageLevel: StorageLevel,
         @transient child: SparkPlan,
         tableName: Option[String])(
    -    @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
    -    val sizeInBytesStats: LongAccumulator = 
child.sqlContext.sparkContext.longAccumulator,
    -    statsOfPlanToCache: Statistics,
    -    override val outputOrdering: Seq[SortOrder])
    -  extends logical.LeafNode with MultiInstanceRelation {
    -
    -  override protected def innerChildren: Seq[SparkPlan] = Seq(child)
    -
    -  override def doCanonicalize(): logical.LogicalPlan =
    -    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
    -      storageLevel = StorageLevel.NONE,
    -      child = child.canonicalized,
    -      tableName = None)(
    -      _cachedColumnBuffers,
    -      sizeInBytesStats,
    -      statsOfPlanToCache,
    -      outputOrdering)
    -
    -  override def producedAttributes: AttributeSet = outputSet
    +    @transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = 
null) {
     
    -  @transient val partitionStatistics = new PartitionStatistics(output)
    +  val sizeInBytesStats: LongAccumulator = 
child.sqlContext.sparkContext.longAccumulator
     
    -  override def computeStats(): Statistics = {
    -    if (sizeInBytesStats.value == 0L) {
    -      // Underlying columnar RDD hasn't been materialized, use the stats 
from the plan to cache.
    -      // Note that we should drop the hint info here. We may cache a plan 
whose root node is a hint
    -      // node. When we lookup the cache with a semantically same plan 
without hint info, the plan
    -      // returned by cache lookup should not have hint info. If we lookup 
the cache with a
    -      // semantically same plan with a different hint info, 
`CacheManager.useCachedData` will take
    -      // care of it and retain the hint info in the lookup input plan.
    -      statsOfPlanToCache.copy(hints = HintInfo())
    -    } else {
    -      Statistics(sizeInBytes = sizeInBytesStats.value.longValue)
    +  def cachedColumnBuffers: RDD[CachedBatch] = {
    +    if (_cachedColumnBuffers == null) {
    +      synchronized {
    --- End diff --
    
    In this PR, without `synchronized`, I found that multi-threaded queries 
wrongly built four RDDs for a single cache:
    ```
    val cachedDf = spark.range(1000000).selectExpr("id AS k", "id AS v").cache
    for (i <- 0 to 3) {
      val thread = new Thread {
        override def run {
          // Start a job in each thread
          val df = cachedDf.filter('k > 5).groupBy().sum("v")
          df.collect
        }
      }
      thread.start
    }
    ```
    Either way, I think we should make `_cachedColumnBuffers` private, so I 
fixed that.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to