revans2 commented on a change in pull request #29067:
URL: https://github.com/apache/spark/pull/29067#discussion_r457539751



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
##########
@@ -19,33 +19,165 @@ package org.apache.spark.sql.execution.columnar
 
 import org.apache.commons.lang3.StringUtils
 
+import org.apache.spark.TaskContext
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer, SimpleMetricsCachedBatch, SimpleMetricsCachedBatchSerializer}
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DoubleType, FloatType, IntegerType, LongType, ShortType, StructType, UserDefinedType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.LongAccumulator
-
+import org.apache.spark.util.{LongAccumulator, Utils}
 
 /**
- * CachedBatch is a cached batch of rows.
+ * The default implementation of CachedBatch.
  *
  * @param numRows The total number of rows in this batch
  * @param buffers The buffers for serialized columns
  * @param stats The stat of columns
  */
-private[columnar]
-case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
+case class DefaultCachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
+    extends SimpleMetricsCachedBatch
+
+/**
+ * The default implementation of CachedBatchSerializer.
+ */
+class DefaultCachedBatchSerializer extends SimpleMetricsCachedBatchSerializer {
+  override def convertForCache(cachedPlan: SparkPlan): RDD[CachedBatch] = {
+    val batchSize = cachedPlan.conf.columnBatchSize
+    val useCompression = cachedPlan.conf.useCompression
+    convertForCacheInternal(cachedPlan, batchSize, useCompression)
+  }
+
+  def convertForCacheInternal(cachedPlan: SparkPlan,
+      batchSize: Int,
+      useCompression: Boolean): RDD[CachedBatch] = {
+    val output = cachedPlan.output
+    cachedPlan.execute().mapPartitionsInternal { rowIterator =>

Review comment:
       I added some comments. Please let me know if you want me to remove them after the review.
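
       To make the new extension point concrete, here is a minimal sketch of a downstream serializer built on the API shown in this diff. The class name and the hard-coded values are illustrative only, and the registration mechanism mentioned at the end is my reading of the rest of the PR, not something visible in this hunk:

       ```scala
       import org.apache.spark.rdd.RDD
       import org.apache.spark.sql.columnar.CachedBatch
       import org.apache.spark.sql.execution.SparkPlan
       import org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer

       // Hypothetical serializer: reuses the default row-to-CachedBatch
       // conversion, but pins the batch size and disables compression instead
       // of reading those two values from the session's SQLConf.
       class UncompressedCachedBatchSerializer extends DefaultCachedBatchSerializer {
         override def convertForCache(cachedPlan: SparkPlan): RDD[CachedBatch] = {
           // convertForCacheInternal is the hook this diff exposes; only the
           // two tuning knobs differ from what convertForCache would pass.
           convertForCacheInternal(cachedPlan, batchSize = 10000, useCompression = false)
         }
       }
       ```

       If I am reading the PR correctly, an implementation like this would be selected through the new static SQL conf for the cache serializer (`spark.sql.cache.serializer`), so it has to be set before the SparkSession is created and cannot be swapped at runtime.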




