revans2 commented on a change in pull request #29067:
URL: https://github.com/apache/spark/pull/29067#discussion_r456509822



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
##########
@@ -19,84 +19,301 @@ package org.apache.spark.sql.execution.columnar
 
 import org.apache.commons.lang3.StringUtils
 
+import org.apache.spark.TaskContext
+import org.apache.spark.annotation.{DeveloperApi, Since}
+import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.types.{AtomicType, BinaryType, StructType, UserDefinedType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.{LongAccumulator, Utils}
 
+/**
+ * Basic interface that all cached batches of data must support. This is primarily to allow
+ * for metrics to be handled outside of the encoding and decoding steps in a standard way.
+ */
+@DeveloperApi
+@Since("3.1.0")
+trait CachedBatch {
+  def numRows: Int
+  def sizeInBytes: Long
+}
 
 /**
- * CachedBatch is a cached batch of rows.
- *
- * @param numRows The total number of rows in this batch
- * @param buffers The buffers for serialized columns
- * @param stats The stat of columns
+ * Provides APIs for compressing, filtering, and decompressing SQL data that will be
+ * persisted/cached.
  */
-private[columnar]
-case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
+@DeveloperApi
+@Since("3.1.0")
+trait CachedBatchSerializer extends Serializable {
+  /**
+   * Run the given plan and convert its output to an implementation of [[CachedBatch]].
+   * @param cachedPlan the plan to run.
+   * @return the RDD containing the batches of data to cache.
+   */
+  def convertForCache(cachedPlan: SparkPlan): RDD[CachedBatch]
+
+  /**
+   * Builds a function that can be used to filter which batches are loaded.
+   * In most cases extending [[SimpleMetricsCachedBatchSerializer]] will provide what
+   * you need with the added expense of calculating the min and max value for some
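
For illustration, a minimal implementation of the CachedBatch trait introduced
above could be as simple as the sketch below. The class and field names here
are hypothetical examples, not part of this PR:

    // Hypothetical CachedBatch implementation backed by one serialized
    // byte array per column; names are illustrative only.
    case class ByteArrayCachedBatch(
        numRows: Int,
        buffers: Array[Array[Byte]]) extends CachedBatch {
      // Report the cached size as the sum of the column buffer sizes.
      override def sizeInBytes: Long = buffers.map(_.length.toLong).sum
    }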

Review comment:
       That was by design. Calculating min and max is not that complex, and
       other implementations might have more efficient ways to do it; in
       particular, implementations that also compress the data might be able
       to compute the min and max in the same pass. I will update the comment
       to make it clearer.
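
       To illustrate that point: a serializer that already walks every value
       while compressing can fold the min/max computation into the same pass.
       The following self-contained Scala sketch is illustrative only (the
       names encodeIntColumn and ColumnStats are made up, and raw fixed-width
       encoding stands in for real compression):

           import java.nio.ByteBuffer

           final case class ColumnStats(min: Int, max: Int)

           object SinglePassEncode {
             // Encode an int column to bytes, tracking min/max in the same
             // traversal so no second scan of the data is needed.
             def encodeIntColumn(values: Array[Int]): (Array[Byte], ColumnStats) = {
               val buf = ByteBuffer.allocate(values.length * 4)
               var min = Int.MaxValue
               var max = Int.MinValue
               var i = 0
               while (i < values.length) {
                 val v = values(i)
                 buf.putInt(v)            // stand-in for real compression work
                 if (v < min) min = v     // stats piggyback on the encoding loop
                 if (v > max) max = v
                 i += 1
               }
               (buf.array(), ColumnStats(min, max))
             }

             def main(args: Array[String]): Unit = {
               val (bytes, stats) = encodeIntColumn(Array(3, -1, 7, 4))
               println(s"encoded ${bytes.length} bytes, stats=$stats")
             }
           }

       Because the encoder already touches every value, tracking min and max
       this way costs two comparisons per value instead of a second full scan
       over the cached data.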



