revans2 commented on a change in pull request #29067:
URL: https://github.com/apache/spark/pull/29067#discussion_r455803871
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
##########
@@ -19,84 +19,301 @@ package org.apache.spark.sql.execution.columnar
import org.apache.commons.lang3.StringUtils
+import org.apache.spark.TaskContext
+import org.apache.spark.annotation.{DeveloperApi, Since}
+import org.apache.spark.internal.Logging
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan,
Statistics}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector,
OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.types.{AtomicType, BinaryType, StructType,
UserDefinedType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.{LongAccumulator, Utils}
+/**
+ * Basic interface that all cached batches of data must support. This is
primarily to allow
+ * for metrics to be handled outside of the encoding and decoding steps in a
standard way.
+ */
+@DeveloperApi
+@Since("3.1.0")
+trait CachedBatch {
+ def numRows: Int
+ def sizeInBytes: Long
+}
/**
- * CachedBatch is a cached batch of rows.
- *
- * @param numRows The total number of rows in this batch
- * @param buffers The buffers for serialized columns
- * @param stats The stat of columns
+ * Provides APIs for compressing, filtering, and decompressing SQL data that
will be
+ * persisted/cached.
*/
-private[columnar]
-case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats:
InternalRow)
+@DeveloperApi
+@Since("3.1.0")
+trait CachedBatchSerializer extends Serializable {
+ /**
+ * Run the given plan and convert its output to an implementation of
[[CachedBatch]].
+ * @param cachedPlan the plan to run.
+ * @return the RDD containing the batches of data to cache.
+ */
+ def convertForCache(cachedPlan: SparkPlan): RDD[CachedBatch]
+
+ /**
+ * Builds a function that can be used to filter which batches are loaded.
+ * In most cases extending [[SimpleMetricsCachedBatchSerializer]] will
provide what
+ * you need with the added expense of calculating the min and max value for
some
+ * data columns, depending on their data type. Note that this is intended to
skip batches
+ * that are not needed, and the actual filtering of individual rows is
handled later.
+ * @param predicates the set of expressions to use for filtering.
+ * @param cachedAttributes the schema/attributes of the data that is cached.
This can be helpful
+ * if you don't store it with the data.
+ * @return a function that takes the partition id and the iterator of
batches in the partition.
+ * It returns an iterator of batches that should be loaded.
+ */
+ def buildFilter(predicates: Seq[Expression],
+ cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) =>
Iterator[CachedBatch]
+
+ /**
+ * Decompress the cached data into a ColumnarBatch. This currently is only
used for basic types
+ * BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType |
DoubleType
+ * That may change in the future.
+ * @param input the cached batches that should be decompressed.
+ * @param cacheAttributes the attributes of the data in the batch.
+ * @param selectedAttributes the field that should be loaded from the data,
and the order they
+ * should appear in the output batch.
+ * @param conf the configuration for the job.
+ * @return an RDD of the input cached batches transformed into the
ColumnarBatch format.
+ */
+ def decompressColumnar(
+ input: RDD[CachedBatch],
+ cacheAttributes: Seq[Attribute],
+ selectedAttributes: Seq[Attribute],
+ conf: SQLConf): RDD[ColumnarBatch]
+
+ /**
+ * Decompress the cached batch into [[InternalRow]]. If you want this to be
performant, code
+ * generation is advised.
+ * @param input the cached batches that should be decompressed.
+ * @param cacheAttributes the attributes of the data in the batch.
+ * @param selectedAttributes the field that should be loaded from the data,
and the order they
+ * should appear in the output batch.
+ * @param conf the configuration for the job.
+ * @return RDD of the rows that were stored in the cached batches.
+ */
+ def decompressToRows(
+ input: RDD[CachedBatch],
+ cacheAttributes: Seq[Attribute],
+ selectedAttributes: Seq[Attribute],
+ conf: SQLConf): RDD[InternalRow]
+}
-case class CachedRDDBuilder(
- useCompression: Boolean,
- batchSize: Int,
- storageLevel: StorageLevel,
- @transient cachedPlan: SparkPlan,
- tableName: Option[String]) {
+/**
+ * A [[CachedBatch]] that stores some simple metrics that can be used for
filtering of batches with
+ * the [[SimpleMetricsCachedBatchSerializer]].
+ */
+@DeveloperApi
+@Since("3.1.0")
+trait SimpleMetricsCachedBatch extends CachedBatch {
Review comment:
`DefaultCachedBatch` is a `case class`, and I didn't think it was
acceptable to have a `case class` inherit from a regular `class`. If I am
wrong on that just let me know and I'll make it an abstract class.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]