dbtsai commented on code in PR #55017:
URL: https://github.com/apache/spark/pull/55017#discussion_r3321347259
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala:
##########
@@ -703,6 +703,12 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}
+ /**
+ * Fallback strategy for cached in-memory tables when the DSv2 cache path is
disabled
+ * (spark.sql.inMemoryColumnarStorage.useDataSourceV2 = false).
Review Comment:
Fixed in da6ae80f23b — added
`spark.sql.inMemoryColumnarStorage.enableDatasourceV2` (default `true`) to
`SQLConf`. The `InMemoryScans` ScalaDoc now explicitly states that the strategy
is only reached when the config is `false`. It also documents that both paths
produce `InMemoryTableScanExec` with the same column-pruning, filter-pushdown,
and sort-order semantics; the DSv2 path additionally enables Dynamic Partition
Pruning (DPP) and per-partition LIMIT pushdown.
---
_This comment was generated with [GitHub MCP](http://go/mcps)._
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala:
##########
@@ -53,7 +53,9 @@ trait InMemoryTableScanLike extends LeafExecNode {
case class InMemoryTableScanExec(
attributes: Seq[Attribute],
predicates: Seq[Expression],
- @transient relation: InMemoryRelation)
+ @transient relation: InMemoryRelation,
+ limit: Option[Int] = None,
+ runtimeFilters: Seq[Expression] = Nil)
Review Comment:
Fixed in da6ae80f23b — updated the pattern to `InMemoryTableScanExec(_, _,
relation, _, _)` to match the new 5-field case class.
---
_This comment was generated with [GitHub MCP](http://go/mcps)._
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryCacheTable.scala:
##########
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar
+
+import java.util
+import java.util.OptionalLong
+
+import org.apache.spark.sql.catalyst.expressions.{
+ Ascending, Attribute, AttributeReference, Descending, NullsFirst, NullsLast,
+ SortOrder => CatalystSortOrder
+}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
+import org.apache.spark.sql.connector.catalog.{SupportsRead, Table,
TableCapability}
+import org.apache.spark.sql.connector.expressions.{
+ Expression => V2Expression, FieldReference, NamedReference,
+ NullOrdering => V2NullOrdering, SortDirection => V2SortDirection,
+ SortOrder => V2SortOrder, SortValue
+}
+import org.apache.spark.sql.connector.expressions.filter.{Predicate =>
V2Predicate}
+import org.apache.spark.sql.connector.read.{
+ Scan, ScanBuilder, Statistics => V2Statistics, SupportsPushDownLimit,
+ SupportsPushDownRequiredColumns, SupportsPushDownV2Filters,
SupportsReportOrdering,
+ SupportsReportPartitioning, SupportsReportStatistics,
SupportsRuntimeV2Filtering
+}
+import org.apache.spark.sql.connector.read.colstats.ColumnStatistics
+import org.apache.spark.sql.connector.read.partitioning.{
+ KeyGroupedPartitioning, Partitioning, UnknownPartitioning
+}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation,
DataSourceV2ScanRelation}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/**
+ * A DSv2 [[Table]] wrapper around [[InMemoryRelation]], enabling
[[V2ScanRelationPushDown]]
+ * optimizer rules to apply column pruning, filter pushdown, and
ordering/statistics reporting
+ * to cached DataFrames.
+ */
+private[sql] class InMemoryCacheTable(val relation: InMemoryRelation)
+ extends Table with SupportsRead {
+
+ // Two InMemoryCacheTable instances wrapping the same CachedRDDBuilder are
equal.
+ // All InMemoryRelation copies from the same CachedData share the same
cacheBuilder by reference.
+ override def equals(other: Any): Boolean = other match {
+ case t: InMemoryCacheTable => relation.cacheBuilder eq
t.relation.cacheBuilder
+ case _ => false
+ }
+ override def hashCode(): Int = System.identityHashCode(relation.cacheBuilder)
+
+ override def name(): String = relation.cacheBuilder.cachedName
+
+ override def schema(): StructType =
DataTypeUtils.fromAttributes(relation.output)
+
+ override def capabilities(): util.Set[TableCapability] =
+ util.EnumSet.of(TableCapability.BATCH_READ)
+
+ override def newScanBuilder(options: CaseInsensitiveStringMap):
InMemoryScanBuilder =
+ new InMemoryScanBuilder(relation)
+}
+
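+/**
+ * DSv2 [[ScanBuilder]] for [[InMemoryRelation]].
+ *
+ * - Column pruning via [[SupportsPushDownRequiredColumns]]: only requested columns are
+ * passed to [[InMemoryTableScanExec]], reducing deserialization work.
+ * - Filter pushdown via [[SupportsPushDownV2Filters]]: predicates are recorded, but all of
+ * them are returned (category-2), so Spark still re-evaluates them row by row after the
+ * scan. Batch-level min/max pruning does not use these V2 predicates; it uses the Catalyst
+ * filters that [[DataSourceV2Strategy]] passes to [[InMemoryTableScanExec]].
+ * - Limit pushdown via [[SupportsPushDownLimit]]: the limit is recorded and always reported
+ * as partially pushed.
+ */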
+private[sql] class InMemoryScanBuilder(relation: InMemoryRelation)
+ extends ScanBuilder
+ with SupportsPushDownRequiredColumns
+ with SupportsPushDownV2Filters
+ with SupportsPushDownLimit {
+
+ private var requiredSchema: StructType =
DataTypeUtils.fromAttributes(relation.output)
+ private var _pushedPredicates: Array[V2Predicate] = Array.empty
+ private var _pushedLimit: Option[Int] = None
+
+ override def pruneColumns(required: StructType): Unit = {
+ requiredSchema = required
+ }
+
+ /**
+ * Records predicates so Spark adds a post-scan [[FilterExec]] for row-level
evaluation.
+ * Batch-level min/max pruning is handled at physical planning:
[[DataSourceV2Strategy]]
+ * passes the Catalyst [[FilterExec]] expressions extracted by
[[PhysicalOperation]] directly
+ * to [[InMemoryTableScanExec]], which forwards them to
[[CachedBatchSerializer.buildFilter]].
+ * The V2 [[Predicate]]s stored here are not used for batch pruning.
+ */
+ override def pushPredicates(predicates: Array[V2Predicate]):
Array[V2Predicate] = {
+ _pushedPredicates = predicates
+ predicates
+ }
+
+ override def pushedPredicates(): Array[V2Predicate] = _pushedPredicates
+
+ /**
+ * Pushes a LIMIT down into the scan. Returns true to indicate the limit was accepted.
+ * Because caching may interleave data across partitions, this is always a partial push:
+ * Spark still applies the LIMIT again on top of the scan to enforce the exact count.
+ */
+ override def pushLimit(limit: Int): Boolean = {
+ _pushedLimit = Some(limit)
+ true
+ }
+
+ /** Always partially pushed: Spark applies the LIMIT again on top of the scan. */
+ override def isPartiallyPushed(): Boolean = true
+
+ override def build(): InMemoryCacheScan = {
+ val requiredFieldNames = requiredSchema.fieldNames.toSet
+ val prunedAttrs =
+ if (requiredFieldNames == relation.output.map(_.name).toSet)
relation.output
+ else relation.output.filter(a => requiredFieldNames.contains(a.name))
+ new InMemoryCacheScan(relation, prunedAttrs, _pushedPredicates,
_pushedLimit)
+ }
+}
+
+/**
+ * DSv2 [[Scan]] for [[InMemoryRelation]].
+ *
+ * Physical execution is handled by [[InMemoryTableScanExec]] via
[[DataSourceV2Strategy]]
+ * rather than [[Batch]]/[[InputPartition]] to preserve the existing efficient
columnar path.
+ *
+ * Reports:
+ * - Ordering ([[SupportsReportOrdering]]): propagates the ordering of the
original cached plan
+ * so the optimizer can eliminate redundant sorts on top of the cache.
+ * - Statistics ([[SupportsReportStatistics]]): exposes accurate row count
and size from
+ * accumulated scan metrics once the cache is materialized, feeding AQE
decisions.
+ * - Partitioning ([[SupportsReportPartitioning]]): reports
[[KeyGroupedPartitioning]] when
+ * the cached plan was hash-partitioned on explicit columns, allowing the
optimizer to
+ * skip shuffles for downstream joins/aggregates on the same key.
+ * - Runtime filtering ([[SupportsRuntimeV2Filtering]]): enables Dynamic
Partition Pruning
+ * on cached scans; [[DynamicPruning]] expressions are passed via
[[InMemoryTableScanExec]]
+ * for batch-level min/max pruning.
+ */
+private[sql] class InMemoryCacheScan(
+ val relation: InMemoryRelation,
+ val prunedAttrs: Seq[Attribute],
+ val pushedPredicates: Array[V2Predicate],
+ val pushedLimit: Option[Int] = None)
+ extends Scan
+ with SupportsReportOrdering
+ with SupportsReportStatistics
+ with SupportsReportPartitioning
+ with SupportsRuntimeV2Filtering {
+
+ override def readSchema(): StructType =
DataTypeUtils.fromAttributes(prunedAttrs)
+
+ /**
+ * Converts the Catalyst sort ordering of the cached plan to V2
[[SortOrder]]s.
+ * Only attribute-reference based orderings whose column is present in
[[prunedAttrs]] are
+ * emitted; sort keys that were pruned away are dropped so that
[[V2ScanPartitioningAndOrdering]]
+ * does not attempt to resolve a column that is no longer in the scan output.
+ */
+ override def outputOrdering(): Array[V2SortOrder] = {
+ val prunedNames = prunedAttrs.map(_.name).toSet
+ relation.outputOrdering.flatMap {
+ case CatalystSortOrder(attr: AttributeReference, direction,
nullOrdering, _)
+ if prunedNames.contains(attr.name) =>
+ val v2Dir = direction match {
+ case Ascending => V2SortDirection.ASCENDING
+ case Descending => V2SortDirection.DESCENDING
+ }
+ val v2Nulls = nullOrdering match {
+ case NullsFirst => V2NullOrdering.NULLS_FIRST
+ case NullsLast => V2NullOrdering.NULLS_LAST
+ }
+ Some(SortValue(FieldReference.column(attr.name), v2Dir, v2Nulls))
+ case _ => None
+ }.toArray
+ }
+
+ /**
+ * Reports the output partitioning of the cached plan so the optimizer can
skip
+ * shuffles for downstream operations on the same partitioning key.
+ */
+ override def outputPartitioning(): Partitioning = {
+ relation.cachedPlan.outputPartitioning match {
+ case HashPartitioning(expressions, numPartitions) =>
+ val keys = expressions.collect { case a: AttributeReference =>
+ FieldReference.column(a.name).asInstanceOf[V2Expression]
+ }
+ if (keys.size == expressions.size) {
+ new KeyGroupedPartitioning(keys.toArray, numPartitions)
+ } else {
+ new UnknownPartitioning(numPartitions)
+ }
+ case other => new UnknownPartitioning(other.numPartitions)
+ }
+ }
+
+ /**
+ * Exposes hash-partitioning key columns for Dynamic Partition Pruning.
+ * Spark will inject runtime IN-list filters on these attributes when it can
+ * derive them from a broadcast side of a join.
+ */
+ override def filterAttributes(): Array[NamedReference] = {
+ relation.cachedPlan.outputPartitioning match {
+ case HashPartitioning(exprs, _) =>
+ exprs.collect { case a: AttributeReference =>
+ FieldReference.column(a.name).asInstanceOf[NamedReference]
+ }.toArray
+ case _ => Array.empty
+ }
+ }
+
+ /**
+ * No-op: runtime predicates for cached scans are handled entirely through
+ * [[InMemoryTableScanExec.runtimeFilters]], not through this interface
method.
+ * The DPP pipeline injects [[DynamicPruning]] expressions into the plan,
which
+ * [[DataSourceV2Strategy]] separates and passes as runtimeFilters to the
exec node.
+ */
+ override def filter(predicates: Array[V2Predicate]): Unit = {}
Review Comment:
Updated the ScalaDoc in da6ae80f23b to make the deviation explicit:
`filter()` is an intentional no-op because `InMemoryCacheScan` never goes
through `BatchScanExec` — `DataSourceV2Strategy` special-cases it and creates
`InMemoryTableScanExec` directly, passing `DynamicPruning` expressions via
`runtimeFilters`. We implement `SupportsRuntimeV2Filtering` solely to expose
`filterAttributes()`, which is what lets the optimizer inject DPP filters into
the plan.
---
_This comment was generated with [GitHub MCP](http://go/mcps)._
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala:
##########
@@ -151,6 +152,24 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
DataSourceV2Strategy.withProjectAndFilter(
project, filters, localScanExec, needsUnsafeConversion = false) :: Nil
+ case PhysicalOperation(project, filters,
Review Comment:
Added a comment in da6ae80f23b explaining why scalar subquery filters are
intentionally not extracted here: `InMemoryCacheScan` never goes through
`BatchScanExec`, so `scan.filter()` is never called for partition pruning.
Scalar subquery filters stay in `compiledFilters` and are applied by
`FilterExec` above the scan — the same post-scan re-evaluation path used by all
other compile-time filters.
---
_This comment was generated with [GitHub MCP](http://go/mcps)._
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]