Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19569#discussion_r146749535
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
---
@@ -201,35 +193,50 @@ case class InMemoryTableScanExec(
schema)
partitionFilter.initialize(index)
+ // Do partition batch pruning if enabled
+ if (inMemoryPartitionPruningEnabled) {
+ cachedBatchIterator.filter { cachedBatch =>
+ if (!partitionFilter.eval(cachedBatch.stats)) {
+ logDebug {
+ val statsString = schemaIndex.map { case (a, i) =>
+ val value = cachedBatch.stats.get(i, a.dataType)
+ s"${a.name}: $value"
+ }.mkString(", ")
+ s"Skipping partition based on stats $statsString"
+ }
+ false
+ } else {
+ true
+ }
+ }
+ } else {
+ cachedBatchIterator
+ }
+ }
+ }
+
+ protected override def doExecute(): RDD[InternalRow] = {
+ val numOutputRows = longMetric("numOutputRows")
+
+ if (enableAccumulators) {
+ readPartitions.setValue(0)
+ readBatches.setValue(0)
+ }
+
+ // Using these variables here to avoid serialization of entire objects (if referenced directly)
+ // within the map Partitions closure.
+ val relOutput: AttributeSeq = relation.output
+ val buffers = filteredCachedBatches()
--- End diff --
Removed. Thanks.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]