revans2 commented on a change in pull request #29067:
URL: https://github.com/apache/spark/pull/29067#discussion_r457539751
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
##########
@@ -19,33 +19,165 @@ package org.apache.spark.sql.execution.columnar
import org.apache.commons.lang3.StringUtils
+import org.apache.spark.TaskContext
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan,
Statistics}
import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer,
SimpleMetricsCachedBatch, SimpleMetricsCachedBatchSerializer}
import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector,
OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DoubleType,
FloatType, IntegerType, LongType, ShortType, StructType, UserDefinedType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.LongAccumulator
-
+import org.apache.spark.util.{LongAccumulator, Utils}
/**
- * CachedBatch is a cached batch of rows.
+ * The default implementation of CachedBatch.
*
* @param numRows The total number of rows in this batch
* @param buffers The buffers for serialized columns
* @param stats The stat of columns
*/
-private[columnar]
-case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats:
InternalRow)
+case class DefaultCachedBatch(numRows: Int, buffers: Array[Array[Byte]],
stats: InternalRow)
+ extends SimpleMetricsCachedBatch
+
+/**
+ * The default implementation of CachedBatchSerializer.
+ */
+class DefaultCachedBatchSerializer extends SimpleMetricsCachedBatchSerializer {
+ override def convertForCache(cachedPlan: SparkPlan): RDD[CachedBatch] = {
+ val batchSize = cachedPlan.conf.columnBatchSize
+ val useCompression = cachedPlan.conf.useCompression
+ convertForCacheInternal(cachedPlan, batchSize, useCompression)
+ }
+
+ def convertForCacheInternal(cachedPlan: SparkPlan,
+ batchSize: Int,
+ useCompression: Boolean): RDD[CachedBatch] = {
+ val output = cachedPlan.output
+ cachedPlan.execute().mapPartitionsInternal { rowIterator =>
Review comment:
I added some comments. Please let me know if you want me to remove them
after the review.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org