[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource

2020-01-14 Thread GitBox
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] 
Support filters pushdown in CSV datasource
URL: https://github.com/apache/spark/pull/26973#discussion_r366200543
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVFilters.scala
 ##
 @@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.csv
+
+import scala.util.Try
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources
+import org.apache.spark.sql.types.{BooleanType, StructType}
+
+/**
+ * An instance of the class compiles filters to predicates and allows to
+ * apply the predicates to an internal row with partially initialized values
+ * converted from parsed CSV fields.
+ *
+ * @param filters The filters pushed down to CSV datasource.
+ * @param dataSchema The full schema with all fields in CSV files.
+ * @param requiredSchema The schema with only fields requested by the upper 
layer.
+ * @param columnPruning true if CSV parser can read sub-set of columns 
otherwise false.
+ */
+class CSVFilters(
+filters: Seq[sources.Filter],
+dataSchema: StructType,
+requiredSchema: StructType,
+columnPruning: Boolean) {
+  require(checkFilters(), "All filters must be applicable to the data schema.")
 
 Review comment:
   done





[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource

2020-01-14 Thread GitBox
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] 
Support filters pushdown in CSV datasource
URL: https://github.com/apache/spark/pull/26973#discussion_r366200437
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
 ##
 @@ -39,27 +40,49 @@ import org.apache.spark.unsafe.types.UTF8String
  * @param requiredSchema The schema of the data that should be output for each 
row. This should be a
  *   subset of the columns in dataSchema.
  * @param options Configuration options for a CSV parser.
+ * @param filters The pushdown filters that should be applied to converted 
values.
  */
 class UnivocityParser(
 dataSchema: StructType,
 requiredSchema: StructType,
-val options: CSVOptions) extends Logging {
+val options: CSVOptions,
+filters: Seq[Filter]) extends Logging {
   require(requiredSchema.toSet.subsetOf(dataSchema.toSet),
 s"requiredSchema (${requiredSchema.catalogString}) should be the subset of 
" +
   s"dataSchema (${dataSchema.catalogString}).")
 
+  def this(dataSchema: StructType, requiredSchema: StructType, options: 
CSVOptions) = {
+this(dataSchema, requiredSchema, options, Seq.empty)
+  }
   def this(schema: StructType, options: CSVOptions) = this(schema, schema, 
options)
 
   // A `ValueConverter` is responsible for converting the given value to a 
desired type.
   private type ValueConverter = String => Any
 
+  private val csvFilters = new CSVFilters(
+filters,
+dataSchema,
+requiredSchema,
+options.columnPruning)
+
+  private[sql] val parsedSchema = csvFilters.readSchema
 
 Review comment:
   done





[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource

2020-01-14 Thread GitBox
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] 
Support filters pushdown in CSV datasource
URL: https://github.com/apache/spark/pull/26973#discussion_r366197607
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVFilters.scala
 ##
 @@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.csv
+
+import scala.util.Try
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources
+import org.apache.spark.sql.types.{BooleanType, StructType}
+
+/**
+ * An instance of the class compiles filters to predicates and allows to
+ * apply the predicates to an internal row with partially initialized values
+ * converted from parsed CSV fields.
+ *
+ * @param filters The filters pushed down to CSV datasource.
+ * @param dataSchema The full schema with all fields in CSV files.
+ * @param requiredSchema The schema with only fields requested by the upper 
layer.
+ * @param columnPruning true if CSV parser can read sub-set of columns 
otherwise false.
+ */
+class CSVFilters(
+filters: Seq[sources.Filter],
+dataSchema: StructType,
+requiredSchema: StructType,
+columnPruning: Boolean) {
+  require(checkFilters(), "All filters must be applicable to the data schema.")
+
+  /**
+   * The schema to read from the underlying CSV parser.
+   * It combines the required schema and the fields referenced by filters.
+   */
+  val readSchema: StructType = {
+if (columnPruning) {
+  val refs = filters.flatMap(_.references).toSet
+  val readFields = dataSchema.filter { field =>
+requiredSchema.contains(field) || refs.contains(field.name)
+  }
+  StructType(readFields)
+} else {
+  dataSchema
+}
+  }
+
+  /**
+   * Converted filters to predicates and grouped by maximum field index
+   * in the read schema. For example, if an filter refers to 2 attributes
+   * attrA with field index 5 and attrB with field index 10 in the read schema:
+   *   0 === $"attrA" or $"attrB" < 100
+   * the filter is compiled to a predicate, and placed to the `predicates`
+   * array at the position 10. In this way, if there is a row with initialized
+   * fields from the 0 to 10 index, the predicate can be applied to the row
+   * to check that the row should be skipped or not.
+   * Multiple predicates with the same maximum reference index are combined
+   * by the `And` expression.
+   */
+  private val predicates: Array[BasePredicate] = {
+val len = readSchema.fields.length
+val groupedPredicates = Array.fill[BasePredicate](len)(null)
+if (SQLConf.get.csvFilterPushDown) {
+  val groupedExprs = Array.fill(len)(Seq.empty[Expression])
+  for (filter <- filters) {
+val expr = CSVFilters.filterToExpression(filter, toRef)
+val refs = filter.references
+if (refs.isEmpty) {
+  // For example, AlwaysTrue and AlwaysFalse doesn't have any 
references
+  for (i <- 0 until len) {
+groupedExprs(i) ++= expr
 
 Review comment:
   I have implemented the first approach in the commit: 
https://github.com/apache/spark/pull/26973/commits/c03ae069d738c6aa526cc1a1216d079bc8b5ec3e
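For illustration, here is a minimal, self-contained sketch of the grouping idea described in the scaladoc above. It is not the PR's code; `RowFilter`, `group`, and the concrete indexes are made up for the example. Each predicate is stored at the position of the largest field index it references, so it can be evaluated as soon as the row has been initialized up to that index.

```scala
object GroupByMaxRefIndexSketch {
  // A "filter" here is just a predicate over an array of converted values,
  // plus the set of field indexes it reads.
  final case class RowFilter(refs: Set[Int], pred: Array[Any] => Boolean)

  // Place each filter at its maximum referenced index and AND together
  // filters that land on the same position.
  def group(filters: Seq[RowFilter], width: Int): Array[Array[Any] => Boolean] = {
    val grouped = Array.fill[Array[Any] => Boolean](width)(null)
    for (f <- filters) {
      val pos = if (f.refs.isEmpty) 0 else f.refs.max
      val prev = grouped(pos)
      grouped(pos) = if (prev == null) f.pred else (row => prev(row) && f.pred(row))
    }
    grouped
  }

  def main(args: Array[String]): Unit = {
    // attrA has index 5 and attrB has index 10; the filter
    // "attrA === 0 or attrB < 100" lands at position 10 because it can only
    // be checked once field 10 has been converted.
    val filter = RowFilter(Set(5, 10),
      row => row(5) == 0 || row(10).asInstanceOf[Int] < 100)
    val grouped = group(Seq(filter), width = 11)

    val row = Array.fill[Any](11)(null)
    row(5) = 1
    row(10) = 50
    println(grouped(10)(row)) // true, because attrB < 100
  }
}
```

The payoff of this layout is that a row can be rejected as early as possible, before the remaining fields are converted at all.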





[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource

2020-01-13 Thread GitBox
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] 
Support filters pushdown in CSV datasource
URL: https://github.com/apache/spark/pull/26973#discussion_r366189514
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
 ##
 @@ -39,27 +40,49 @@ import org.apache.spark.unsafe.types.UTF8String
  * @param requiredSchema The schema of the data that should be output for each 
row. This should be a
  *   subset of the columns in dataSchema.
  * @param options Configuration options for a CSV parser.
+ * @param filters The pushdown filters that should be applied to converted 
values.
  */
 class UnivocityParser(
 dataSchema: StructType,
 requiredSchema: StructType,
-val options: CSVOptions) extends Logging {
+val options: CSVOptions,
+filters: Seq[Filter]) extends Logging {
   require(requiredSchema.toSet.subsetOf(dataSchema.toSet),
 s"requiredSchema (${requiredSchema.catalogString}) should be the subset of 
" +
   s"dataSchema (${dataSchema.catalogString}).")
 
+  def this(dataSchema: StructType, requiredSchema: StructType, options: 
CSVOptions) = {
+this(dataSchema, requiredSchema, options, Seq.empty)
+  }
   def this(schema: StructType, options: CSVOptions) = this(schema, schema, 
options)
 
   // A `ValueConverter` is responsible for converting the given value to a 
desired type.
   private type ValueConverter = String => Any
 
+  private val csvFilters = new CSVFilters(
+filters,
+dataSchema,
+requiredSchema,
+options.columnPruning)
+
+  private[sql] val parsedSchema = csvFilters.readSchema
+
+  // Mapping of field indexes of `parsedSchema` to indexes of `requiredSchema`.
+  // It returns -1 if `requiredSchema` doesn't contain a field from 
`parsedSchema`.
 
 Review comment:
  If `requiredSchema` always contains the filter references, that is a significant assumption, and it can simplify this implementation slightly. Is that just a property of the current implementation, or could `requiredSchema` contain only **really required** columns in the future? Filter columns are not actually required if the filter is applied only once in the datasource.





[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource

2020-01-13 Thread GitBox
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] 
Support filters pushdown in CSV datasource
URL: https://github.com/apache/spark/pull/26973#discussion_r366177956
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
 ##
 @@ -39,27 +40,49 @@ import org.apache.spark.unsafe.types.UTF8String
  * @param requiredSchema The schema of the data that should be output for each 
row. This should be a
  *   subset of the columns in dataSchema.
  * @param options Configuration options for a CSV parser.
+ * @param filters The pushdown filters that should be applied to converted 
values.
  */
 class UnivocityParser(
 dataSchema: StructType,
 requiredSchema: StructType,
-val options: CSVOptions) extends Logging {
+val options: CSVOptions,
+filters: Seq[Filter]) extends Logging {
   require(requiredSchema.toSet.subsetOf(dataSchema.toSet),
 s"requiredSchema (${requiredSchema.catalogString}) should be the subset of 
" +
   s"dataSchema (${dataSchema.catalogString}).")
 
+  def this(dataSchema: StructType, requiredSchema: StructType, options: 
CSVOptions) = {
+this(dataSchema, requiredSchema, options, Seq.empty)
+  }
   def this(schema: StructType, options: CSVOptions) = this(schema, schema, 
options)
 
   // A `ValueConverter` is responsible for converting the given value to a 
desired type.
   private type ValueConverter = String => Any
 
+  private val csvFilters = new CSVFilters(
+filters,
+dataSchema,
+requiredSchema,
+options.columnPruning)
+
+  private[sql] val parsedSchema = csvFilters.readSchema
 
 Review comment:
  Just for context: originally I made it private, but I had to make it more open because it is used in other places in `sql`.






[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource

2020-01-13 Thread GitBox
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] 
Support filters pushdown in CSV datasource
URL: https://github.com/apache/spark/pull/26973#discussion_r366177325
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
 ##
 @@ -39,27 +40,49 @@ import org.apache.spark.unsafe.types.UTF8String
  * @param requiredSchema The schema of the data that should be output for each 
row. This should be a
  *   subset of the columns in dataSchema.
  * @param options Configuration options for a CSV parser.
+ * @param filters The pushdown filters that should be applied to converted 
values.
  */
 class UnivocityParser(
 dataSchema: StructType,
 requiredSchema: StructType,
-val options: CSVOptions) extends Logging {
+val options: CSVOptions,
+filters: Seq[Filter]) extends Logging {
   require(requiredSchema.toSet.subsetOf(dataSchema.toSet),
 s"requiredSchema (${requiredSchema.catalogString}) should be the subset of 
" +
   s"dataSchema (${dataSchema.catalogString}).")
 
+  def this(dataSchema: StructType, requiredSchema: StructType, options: 
CSVOptions) = {
+this(dataSchema, requiredSchema, options, Seq.empty)
+  }
   def this(schema: StructType, options: CSVOptions) = this(schema, schema, 
options)
 
   // A `ValueConverter` is responsible for converting the given value to a 
desired type.
   private type ValueConverter = String => Any
 
+  private val csvFilters = new CSVFilters(
+filters,
+dataSchema,
+requiredSchema,
+options.columnPruning)
+
+  private[sql] val parsedSchema = csvFilters.readSchema
 
 Review comment:
  Do you propose just `private`, or public?





[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource

2020-01-13 Thread GitBox
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] 
Support filters pushdown in CSV datasource
URL: https://github.com/apache/spark/pull/26973#discussion_r366176306
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVFilters.scala
 ##
 @@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.csv
+
+import scala.util.Try
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources
+import org.apache.spark.sql.types.{BooleanType, StructType}
+
+/**
+ * An instance of the class compiles filters to predicates and allows to
+ * apply the predicates to an internal row with partially initialized values
+ * converted from parsed CSV fields.
+ *
+ * @param filters The filters pushed down to CSV datasource.
+ * @param dataSchema The full schema with all fields in CSV files.
+ * @param requiredSchema The schema with only fields requested by the upper 
layer.
+ * @param columnPruning true if CSV parser can read sub-set of columns 
otherwise false.
+ */
+class CSVFilters(
+filters: Seq[sources.Filter],
+dataSchema: StructType,
+requiredSchema: StructType,
+columnPruning: Boolean) {
+  require(checkFilters(), "All filters must be applicable to the data schema.")
 
 Review comment:
  Initially the function was slightly more complex, see 
https://github.com/apache/spark/pull/26973/commits/f0aa0a88bfa0c87007f8781ba7fac8f9cd3057ba#diff-44a98c4a53980cb04e57f0489b257a37L126. That's why I extracted it into a separate method.
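
A rough sketch of what such an applicability check could look like, assuming the simple rule that every column referenced by a filter must exist in `dataSchema`. This is not the extracted method from the commit, just an illustration, and `filtersAreApplicable` is a hypothetical name:

```scala
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Hypothetical helper: a filter is applicable if every column it references
// exists in the data schema.
def filtersAreApplicable(filters: Seq[Filter], dataSchema: StructType): Boolean = {
  val fieldNames = dataSchema.fieldNames.toSet
  filters.forall(_.references.forall(fieldNames.contains))
}
```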





[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource

2020-01-13 Thread GitBox
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] 
Support filters pushdown in CSV datasource
URL: https://github.com/apache/spark/pull/26973#discussion_r366175302
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
 ##
 @@ -204,15 +234,15 @@ class UnivocityParser(
* Parses a single CSV string and turns it into either one resulting row or 
no row (if the
* the record is malformed).
*/
-  def parse(input: String): InternalRow = doParse(input)
+  def parse(input: String): Seq[InternalRow] = doParse(input)
 
   private val getToken = if (options.columnPruning) {
 (tokens: Array[String], index: Int) => tokens(index)
   } else {
 (tokens: Array[String], index: Int) => tokens(tokenIndexArr(index))
   }
 
-  private def convert(tokens: Array[String]): InternalRow = {
+  private def convert(tokens: Array[String]): Seq[InternalRow] = {
 
 Review comment:
  We can do that, but modifying `FailureSafeParser` is slightly orthogonal to the purpose of this PR, and it is not necessary for these changes. Can we do that separately?
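
A tiny self-contained illustration (not the PR's code) of why a `Seq`-returning `parse` composes well with callers: a row that is filtered out becomes an empty sequence and simply disappears under `flatMap`. The `parse` stand-in below returns `Seq[Int]` instead of `Seq[InternalRow]`:

```scala
// Stand-in for `parse(input: String): Seq[InternalRow]`: returns an empty
// Seq when the "row" is filtered out, and a single element otherwise.
def parse(line: String): Seq[Int] =
  line.split(",").headOption.map(_.trim.toInt).filter(_ > 1).toSeq

val parsed = Seq("1,a", "2,b", "3,c").flatMap(parse)
println(parsed) // List(2, 3): the first line was "filtered out"
```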





[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource

2020-01-13 Thread GitBox
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] 
Support filters pushdown in CSV datasource
URL: https://github.com/apache/spark/pull/26973#discussion_r366174594
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
 ##
 @@ -39,27 +40,49 @@ import org.apache.spark.unsafe.types.UTF8String
  * @param requiredSchema The schema of the data that should be output for each 
row. This should be a
  *   subset of the columns in dataSchema.
  * @param options Configuration options for a CSV parser.
+ * @param filters The pushdown filters that should be applied to converted 
values.
  */
 class UnivocityParser(
 dataSchema: StructType,
 requiredSchema: StructType,
-val options: CSVOptions) extends Logging {
+val options: CSVOptions,
+filters: Seq[Filter]) extends Logging {
   require(requiredSchema.toSet.subsetOf(dataSchema.toSet),
 s"requiredSchema (${requiredSchema.catalogString}) should be the subset of 
" +
   s"dataSchema (${dataSchema.catalogString}).")
 
+  def this(dataSchema: StructType, requiredSchema: StructType, options: 
CSVOptions) = {
+this(dataSchema, requiredSchema, options, Seq.empty)
+  }
   def this(schema: StructType, options: CSVOptions) = this(schema, schema, 
options)
 
   // A `ValueConverter` is responsible for converting the given value to a 
desired type.
   private type ValueConverter = String => Any
 
+  private val csvFilters = new CSVFilters(
+filters,
+dataSchema,
+requiredSchema,
+options.columnPruning)
+
+  private[sql] val parsedSchema = csvFilters.readSchema
+
+  // Mapping of field indexes of `parsedSchema` to indexes of `requiredSchema`.
+  // It returns -1 if `requiredSchema` doesn't contain a field from 
`parsedSchema`.
 
 Review comment:
  What should the upper layer do with the column if the datasource has already applied the filters? As far as I know, filters are applied only once in DSv2. @cloud-fan, am I wrong?





[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource

2020-01-13 Thread GitBox
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] 
Support filters pushdown in CSV datasource
URL: https://github.com/apache/spark/pull/26973#discussion_r365789628
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
 ##
 @@ -2204,4 +2204,37 @@ class CSVSuite extends QueryTest with 
SharedSparkSession with TestCsvData {
   checkAnswer(resultDF, Row("a", 2, "e", "c"))
 }
   }
+
+  test("filters push down") {
+Seq(true, false).foreach { multiLine =>
 
 Review comment:
  And since this is a kind of end-to-end test, I would test as many options and modes as possible.
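
A sketch of what a broader sweep could look like. This is not the PR's test: `spark`, `path`, `expectedRows`, `checkAnswer`, and `withSQLConf` are assumed to come from the enclosing suite, and `SQLConf.CSV_PARSER_COLUMN_PRUNING` is assumed to be the flag that controls CSV column pruning:

```scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.SQLConf

// Sweep multiLine, column pruning, and parse mode in one end-to-end test.
Seq(true, false).foreach { multiLine =>
  Seq(true, false).foreach { pruning =>
    Seq("PERMISSIVE", "DROPMALFORMED", "FAILFAST").foreach { mode =>
      withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> pruning.toString) {
        val df = spark.read
          .option("multiLine", multiLine)
          .option("mode", mode)
          .option("header", true)
          .csv(path)
          .where(col("i") > 1)
        checkAnswer(df, expectedRows)
      }
    }
  }
}
```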





[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource

2020-01-13 Thread GitBox
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] 
Support filters pushdown in CSV datasource
URL: https://github.com/apache/spark/pull/26973#discussion_r365786665
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
 ##
 @@ -2204,4 +2204,37 @@ class CSVSuite extends QueryTest with 
SharedSparkSession with TestCsvData {
   checkAnswer(resultDF, Row("a", 2, "e", "c"))
 }
   }
+
+  test("filters push down") {
+Seq(true, false).foreach { multiLine =>
 
 Review comment:
  Lines in CSV cannot be split, so the input should be the same. The difference is how we read the file: as a whole or line by line. But you are right, there should be no difference for these changes, since I only touched value conversions.





[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource

2020-01-13 Thread GitBox
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] 
Support filters pushdown in CSV datasource
URL: https://github.com/apache/spark/pull/26973#discussion_r365704789
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
 ##
 @@ -242,24 +272,48 @@ class UnivocityParser(
 new RuntimeException("Malformed CSV record"))
 } else {
   // When the length of the returned tokens is identical to the length of 
the parsed schema,
-  // we just need to convert the tokens that correspond to the required 
columns.
-  var badRecordException: Option[Throwable] = None
+  // we just need to:
+  //  1. Convert the tokens that correspond to the parsed schema.
+  //  2. Apply the pushdown filters to `parsedRow`.
+  //  3. Convert `parsedRow` to `requiredRow` by stripping non-required 
fields.
   var i = 0
+  val requiredSingleRow = requiredRow.head
   while (i < requiredSchema.length) {
+requiredSingleRow.setNullAt(i)
+i += 1
+  }
+
+  var skipValueConversion = false
+  var badRecordException: Option[Throwable] = None
+  i = 0
+  while (!skipValueConversion && i < parsedSchema.length) {
 try {
-  row(i) = valueConverters(i).apply(getToken(tokens, i))
+  val convertedValue = valueConverters(i).apply(getToken(tokens, i))
+  parsedRow(i) = convertedValue
+  if (csvFilters.skipRow(parsedRow, i)) {
+skipValueConversion = true
+  } else {
+val requiredIndex = parsedToRequiredIndex(i)
+if (requiredIndex != -1) {
+  requiredSingleRow(requiredIndex) = convertedValue
+}
+  }
 } catch {
   case NonFatal(e) =>
-badRecordException = badRecordException.orElse(Some(e))
-row.setNullAt(i)
+badRecordException = Some(e)
+skipValueConversion = true
 }
 i += 1
   }
-
-  if (badRecordException.isEmpty) {
-row
+  if (skipValueConversion) {
+if (badRecordException.isDefined) {
 
 Review comment:
  Look at 
https://github.com/apache/spark/pull/26973/files#diff-c82e4b74d2a51fed29069745ce4f9e96R303-R304 (lines 303 and 304 in that diff).
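
A simplified, self-contained sketch of the loop above: convert token `i`, ask whether any predicate anchored at index `i` rejects the partially filled row, and stop converting as soon as a predicate fails or a token is malformed. Names and types are simplified stand-ins, not Spark's, and the error handling is collapsed into a single `None` result:

```scala
import scala.util.control.NonFatal

// `converters(i)` turns token i into a typed value; `skipRow(parsed, i)`
// returns true if a predicate anchored at index i rejects the partial row.
def convertSketch(
    tokens: Array[String],
    converters: Array[String => Any],
    skipRow: (Array[Any], Int) => Boolean): Option[Array[Any]] = {
  val parsed = new Array[Any](converters.length)
  var i = 0
  var skip = false
  while (!skip && i < converters.length) {
    try {
      parsed(i) = converters(i)(tokens(i))
      if (skipRow(parsed, i)) skip = true
    } catch {
      case NonFatal(_) => skip = true // malformed token: bail out early
    }
    i += 1
  }
  if (skip) None else Some(parsed)
}
```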






[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource

2020-01-13 Thread GitBox
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] 
Support filters pushdown in CSV datasource
URL: https://github.com/apache/spark/pull/26973#discussion_r365703327
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
 ##
 @@ -39,27 +40,49 @@ import org.apache.spark.unsafe.types.UTF8String
  * @param requiredSchema The schema of the data that should be output for each 
row. This should be a
  *   subset of the columns in dataSchema.
  * @param options Configuration options for a CSV parser.
+ * @param filters The pushdown filters that should be applied to converted 
values.
  */
 class UnivocityParser(
 dataSchema: StructType,
 requiredSchema: StructType,
-val options: CSVOptions) extends Logging {
+val options: CSVOptions,
+filters: Seq[Filter]) extends Logging {
   require(requiredSchema.toSet.subsetOf(dataSchema.toSet),
 s"requiredSchema (${requiredSchema.catalogString}) should be the subset of 
" +
   s"dataSchema (${dataSchema.catalogString}).")
 
+  def this(dataSchema: StructType, requiredSchema: StructType, options: 
CSVOptions) = {
+this(dataSchema, requiredSchema, options, Seq.empty)
+  }
   def this(schema: StructType, options: CSVOptions) = this(schema, schema, 
options)
 
   // A `ValueConverter` is responsible for converting the given value to a 
desired type.
   private type ValueConverter = String => Any
 
+  private val csvFilters = new CSVFilters(
+filters,
+dataSchema,
+requiredSchema,
+options.columnPruning)
+
+  private[sql] val parsedSchema = csvFilters.readSchema
+
+  // Mapping of field indexes of `parsedSchema` to indexes of `requiredSchema`.
+  // It returns -1 if `requiredSchema` doesn't contain a field from 
`parsedSchema`.
 
 Review comment:
  For example:
  - dataSchema: `i INTEGER, d DOUBLE, s STRING`
  - requiredSchema: `i INTEGER`
  - filters: `Seq(LessThan("d", 10))`
  - parsedSchema: `i INTEGER, d DOUBLE`

  In that case, `requiredSchema` doesn't contain the field `d` from `parsedSchema`.
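
The same example, made concrete. The way `parsedSchema` and the index mapping are computed below is a sketch of the described behaviour, not a call into the PR's classes:

```scala
import org.apache.spark.sql.sources.LessThan
import org.apache.spark.sql.types._

val dataSchema = StructType(Seq(
  StructField("i", IntegerType),
  StructField("d", DoubleType),
  StructField("s", StringType)))
val requiredSchema = StructType(Seq(StructField("i", IntegerType)))
val filters = Seq(LessThan("d", 10))

// parsedSchema keeps required fields plus fields referenced by filters.
val filterRefs = filters.flatMap(_.references).toSet
val parsedSchema = StructType(dataSchema.filter { f =>
  requiredSchema.fieldNames.contains(f.name) || filterRefs.contains(f.name)
})
// parsedSchema: i INTEGER, d DOUBLE

// Index of each parsed field in requiredSchema, or -1 for filter-only fields.
val parsedToRequiredIndex =
  parsedSchema.fieldNames.map(name => requiredSchema.fieldNames.indexOf(name))
// Array(0, -1): `d` is parsed for the filter but never returned upstream.
```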





[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource

2020-01-12 Thread GitBox
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] 
Support filters pushdown in CSV datasource
URL: https://github.com/apache/spark/pull/26973#discussion_r365667390
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVFilters.scala
 ##
 @@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.csv
+
+import scala.util.Try
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources
+import org.apache.spark.sql.types.{BooleanType, StructType}
+
+/**
+ * An instance of the class compiles filters to predicates and allows to
+ * apply the predicates to an internal row with partially initialized values
+ * converted from parsed CSV fields.
+ *
+ * @param filters The filters pushed down to CSV datasource.
+ * @param dataSchema The full schema with all fields in CSV files.
+ * @param requiredSchema The schema with only fields requested by the upper 
layer.
+ * @param columnPruning true if CSV parser can read sub-set of columns 
otherwise false.
+ */
+class CSVFilters(
+filters: Seq[sources.Filter],
+dataSchema: StructType,
+requiredSchema: StructType,
+columnPruning: Boolean) {
+  require(checkFilters(), "All filters must be applicable to the data schema.")
+
+  /**
+   * The schema to read from the underlying CSV parser.
+   * It combines the required schema and the fields referenced by filters.
+   */
+  val readSchema: StructType = {
+if (columnPruning) {
+  val refs = filters.flatMap(_.references).toSet
+  val readFields = dataSchema.filter { field =>
+requiredSchema.contains(field) || refs.contains(field.name)
+  }
+  StructType(readFields)
+} else {
+  dataSchema
+}
+  }
+
+  /**
+   * Converted filters to predicates and grouped by maximum field index
+   * in the read schema. For example, if an filter refers to 2 attributes
+   * attrA with field index 5 and attrB with field index 10 in the read schema:
+   *   0 === $"attrA" or $"attrB" < 100
+   * the filter is compiled to a predicate, and placed to the `predicates`
+   * array at the position 10. In this way, if there is a row with initialized
+   * fields from the 0 to 10 index, the predicate can be applied to the row
+   * to check that the row should be skipped or not.
+   * Multiple predicates with the same maximum reference index are combined
+   * by the `And` expression.
+   */
+  private val predicates: Array[BasePredicate] = {
+val len = readSchema.fields.length
+val groupedPredicates = Array.fill[BasePredicate](len)(null)
+if (SQLConf.get.csvFilterPushDown) {
+  val groupedExprs = Array.fill(len)(Seq.empty[Expression])
+  for (filter <- filters) {
+val expr = CSVFilters.filterToExpression(filter, toRef)
+val refs = filter.references
+if (refs.isEmpty) {
+  // For example, AlwaysTrue and AlwaysFalse doesn't have any 
references
+  for (i <- 0 until len) {
+groupedExprs(i) ++= expr
 
 Review comment:
  Even more, `AlwaysTrue` could be removed because it does not impact the result, and `AlwaysFalse` could be put at index 0 so that all other filters can be ignored.
  
  But this is a kind of ad-hoc optimization. The optimization above can work for other literal filters.
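
A sketch of that ad-hoc simplification (not code from the PR): drop `AlwaysTrue` because it never changes the conjunction, and collapse everything to `AlwaysFalse` when it is present, since every other filter becomes irrelevant:

```scala
import org.apache.spark.sql.sources.{AlwaysFalse, AlwaysTrue, Filter}

// Hypothetical pre-processing step applied before grouping the filters.
def simplify(filters: Seq[Filter]): Seq[Filter] = {
  val withoutTrue = filters.filterNot(_.isInstanceOf[AlwaysTrue])
  if (withoutTrue.exists(_.isInstanceOf[AlwaysFalse])) Seq(AlwaysFalse())
  else withoutTrue
}
```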






[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource

2020-01-12 Thread GitBox
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] 
Support filters pushdown in CSV datasource
URL: https://github.com/apache/spark/pull/26973#discussion_r365666768
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVFilters.scala
 ##
 @@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.csv
+
+import scala.util.Try
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources
+import org.apache.spark.sql.types.{BooleanType, StructType}
+
+/**
+ * An instance of the class compiles filters to predicates and allows to
+ * apply the predicates to an internal row with partially initialized values
+ * converted from parsed CSV fields.
+ *
+ * @param filters The filters pushed down to CSV datasource.
+ * @param dataSchema The full schema with all fields in CSV files.
+ * @param requiredSchema The schema with only fields requested by the upper 
layer.
+ * @param columnPruning true if CSV parser can read sub-set of columns 
otherwise false.
+ */
+class CSVFilters(
+filters: Seq[sources.Filter],
+dataSchema: StructType,
+requiredSchema: StructType,
+columnPruning: Boolean) {
+  require(checkFilters(), "All filters must be applicable to the data schema.")
+
+  /**
+   * The schema to read from the underlying CSV parser.
+   * It combines the required schema and the fields referenced by filters.
+   */
+  val readSchema: StructType = {
+if (columnPruning) {
+  val refs = filters.flatMap(_.references).toSet
+  val readFields = dataSchema.filter { field =>
+requiredSchema.contains(field) || refs.contains(field.name)
+  }
+  StructType(readFields)
+} else {
+  dataSchema
+}
+  }
+
+  /**
+   * Converted filters to predicates and grouped by maximum field index
+   * in the read schema. For example, if an filter refers to 2 attributes
+   * attrA with field index 5 and attrB with field index 10 in the read schema:
+   *   0 === $"attrA" or $"attrB" < 100
+   * the filter is compiled to a predicate, and placed to the `predicates`
+   * array at the position 10. In this way, if there is a row with initialized
+   * fields from the 0 to 10 index, the predicate can be applied to the row
+   * to check that the row should be skipped or not.
+   * Multiple predicates with the same maximum reference index are combined
+   * by the `And` expression.
+   */
+  private val predicates: Array[BasePredicate] = {
+val len = readSchema.fields.length
+val groupedPredicates = Array.fill[BasePredicate](len)(null)
+if (SQLConf.get.csvFilterPushDown) {
+  val groupedExprs = Array.fill(len)(Seq.empty[Expression])
+  for (filter <- filters) {
+val expr = CSVFilters.filterToExpression(filter, toRef)
+val refs = filter.references
+if (refs.isEmpty) {
+  // For example, AlwaysTrue and AlwaysFalse doesn't have any 
references
+  for (i <- 0 until len) {
+groupedExprs(i) ++= expr
 
 Review comment:
  You are right, since we combine all pushed filters via `And`. Also, I think all filters without references (literals) could be put at the beginning of the group before reducing here: 
https://github.com/apache/spark/pull/26973/files#diff-44a98c4a53980cb04e57f0489b257a37R95
  So, if the pushed filters are `Seq(AlwaysFalse, StringContains(ref0, "abc"))`, they are reduced to `And(AlwaysFalse, StringContains(ref0, "abc"))`, and the second filter `StringContains(ref0, "abc")` will not be evaluated at all.

