cloud-fan commented on a change in pull request #29067:
URL: https://github.com/apache/spark/pull/29067#discussion_r457530661



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
##########
@@ -19,33 +19,165 @@ package org.apache.spark.sql.execution.columnar
 
 import org.apache.commons.lang3.StringUtils
 
+import org.apache.spark.TaskContext
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, 
Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer, 
SimpleMetricsCachedBatch, SimpleMetricsCachedBatchSerializer}
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, 
OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DoubleType, 
FloatType, IntegerType, LongType, ShortType, StructType, UserDefinedType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.LongAccumulator
-
+import org.apache.spark.util.{LongAccumulator, Utils}
 
 /**
- * CachedBatch is a cached batch of rows.
+ * The default implementation of CachedBatch.
  *
  * @param numRows The total number of rows in this batch
  * @param buffers The buffers for serialized columns
  * @param stats The stat of columns
  */
-private[columnar]
-case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: 
InternalRow)
+case class DefaultCachedBatch(numRows: Int, buffers: Array[Array[Byte]], 
stats: InternalRow)
+    extends SimpleMetricsCachedBatch
+
+/**
+ * The default implementation of CachedBatchSerializer.
+ */
+class DefaultCachedBatchSerializer extends SimpleMetricsCachedBatchSerializer {
+  override def convertForCache(cachedPlan: SparkPlan): RDD[CachedBatch] = {
+    val batchSize = cachedPlan.conf.columnBatchSize
+    val useCompression = cachedPlan.conf.useCompression
+    convertForCacheInternal(cachedPlan, batchSize, useCompression)
+  }
+
+  def convertForCacheInternal(cachedPlan: SparkPlan,
+      batchSize: Int,
+      useCompression: Boolean): RDD[CachedBatch] = {
+    val output = cachedPlan.output
+    cachedPlan.execute().mapPartitionsInternal { rowIterator =>

Review comment:
       It will be very helpful if you can leave some comments on code that is 
just moved around, to speed up the review.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to