cloud-fan commented on a change in pull request #29067:
URL: https://github.com/apache/spark/pull/29067#discussion_r462965356



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
##########
@@ -19,33 +19,186 @@ package org.apache.spark.sql.execution.columnar
 
 import org.apache.commons.lang3.StringUtils
 
+import org.apache.spark.TaskContext
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, 
Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer, 
SimpleMetricsCachedBatch, SimpleMetricsCachedBatchSerializer}
+import org.apache.spark.sql.execution.{ColumnarToRowTransition, InputAdapter, 
QueryExecution, SparkPlan, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, 
OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DoubleType, 
FloatType, IntegerType, LongType, ShortType, StructType, UserDefinedType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.LongAccumulator
-
+import org.apache.spark.util.{LongAccumulator, Utils}
 
 /**
- * CachedBatch is a cached batch of rows.
+ * The default implementation of CachedBatch.
  *
  * @param numRows The total number of rows in this batch
  * @param buffers The buffers for serialized columns
  * @param stats The stat of columns
  */
-private[columnar]
-case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: 
InternalRow)
+case class DefaultCachedBatch(numRows: Int, buffers: Array[Array[Byte]], 
stats: InternalRow)
+  extends SimpleMetricsCachedBatch
+
+/**
+ * The default implementation of CachedBatchSerializer.
+ */
+class DefaultCachedBatchSerializer extends SimpleMetricsCachedBatchSerializer {
+  override def supportsColumnarInput(schema: Seq[Attribute]): Boolean = false
+
+  override def convertColumnarBatchToCachedBatch(
+      input: RDD[ColumnarBatch],
+      schema: Seq[Attribute],
+      storageLevel: StorageLevel,
+      conf: SQLConf): RDD[CachedBatch] =
+    throw new IllegalStateException("Columnar input is not supported")
+
+  override def convertInternalRowToCachedBatch(input: RDD[InternalRow],

Review comment:
       nit:
   ```
   def f(
       para1: T,
       para2: T,
       ...
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to