[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource URL: https://github.com/apache/spark/pull/26973#discussion_r366200543 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVFilters.scala ## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.csv + +import scala.util.Try + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources +import org.apache.spark.sql.types.{BooleanType, StructType} + +/** + * An instance of the class compiles filters to predicates and allows to + * apply the predicates to an internal row with partially initialized values + * converted from parsed CSV fields. + * + * @param filters The filters pushed down to CSV datasource. + * @param dataSchema The full schema with all fields in CSV files. + * @param requiredSchema The schema with only fields requested by the upper layer. + * @param columnPruning true if CSV parser can read sub-set of columns otherwise false. 
+ */ +class CSVFilters( +filters: Seq[sources.Filter], +dataSchema: StructType, +requiredSchema: StructType, +columnPruning: Boolean) { + require(checkFilters(), "All filters must be applicable to the data schema.") Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource URL: https://github.com/apache/spark/pull/26973#discussion_r366200437 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala ## @@ -39,27 +40,49 @@ import org.apache.spark.unsafe.types.UTF8String * @param requiredSchema The schema of the data that should be output for each row. This should be a * subset of the columns in dataSchema. * @param options Configuration options for a CSV parser. + * @param filters The pushdown filters that should be applied to converted values. */ class UnivocityParser( dataSchema: StructType, requiredSchema: StructType, -val options: CSVOptions) extends Logging { +val options: CSVOptions, +filters: Seq[Filter]) extends Logging { require(requiredSchema.toSet.subsetOf(dataSchema.toSet), s"requiredSchema (${requiredSchema.catalogString}) should be the subset of " + s"dataSchema (${dataSchema.catalogString}).") + def this(dataSchema: StructType, requiredSchema: StructType, options: CSVOptions) = { +this(dataSchema, requiredSchema, options, Seq.empty) + } def this(schema: StructType, options: CSVOptions) = this(schema, schema, options) // A `ValueConverter` is responsible for converting the given value to a desired type. private type ValueConverter = String => Any + private val csvFilters = new CSVFilters( +filters, +dataSchema, +requiredSchema, +options.columnPruning) + + private[sql] val parsedSchema = csvFilters.readSchema Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource URL: https://github.com/apache/spark/pull/26973#discussion_r366197607 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVFilters.scala ## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.csv + +import scala.util.Try + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources +import org.apache.spark.sql.types.{BooleanType, StructType} + +/** + * An instance of the class compiles filters to predicates and allows to + * apply the predicates to an internal row with partially initialized values + * converted from parsed CSV fields. + * + * @param filters The filters pushed down to CSV datasource. + * @param dataSchema The full schema with all fields in CSV files. + * @param requiredSchema The schema with only fields requested by the upper layer. + * @param columnPruning true if CSV parser can read sub-set of columns otherwise false. 
+ */ +class CSVFilters( +filters: Seq[sources.Filter], +dataSchema: StructType, +requiredSchema: StructType, +columnPruning: Boolean) { + require(checkFilters(), "All filters must be applicable to the data schema.") + + /** + * The schema to read from the underlying CSV parser. + * It combines the required schema and the fields referenced by filters. + */ + val readSchema: StructType = { +if (columnPruning) { + val refs = filters.flatMap(_.references).toSet + val readFields = dataSchema.filter { field => +requiredSchema.contains(field) || refs.contains(field.name) + } + StructType(readFields) +} else { + dataSchema +} + } + + /** + * Converted filters to predicates and grouped by maximum field index + * in the read schema. For example, if an filter refers to 2 attributes + * attrA with field index 5 and attrB with field index 10 in the read schema: + * 0 === $"attrA" or $"attrB" < 100 + * the filter is compiled to a predicate, and placed to the `predicates` + * array at the position 10. In this way, if there is a row with initialized + * fields from the 0 to 10 index, the predicate can be applied to the row + * to check that the row should be skipped or not. + * Multiple predicates with the same maximum reference index are combined + * by the `And` expression. 
+ */ + private val predicates: Array[BasePredicate] = { +val len = readSchema.fields.length +val groupedPredicates = Array.fill[BasePredicate](len)(null) +if (SQLConf.get.csvFilterPushDown) { + val groupedExprs = Array.fill(len)(Seq.empty[Expression]) + for (filter <- filters) { +val expr = CSVFilters.filterToExpression(filter, toRef) +val refs = filter.references +if (refs.isEmpty) { + // For example, AlwaysTrue and AlwaysFalse doesn't have any references + for (i <- 0 until len) { +groupedExprs(i) ++= expr Review comment: I have implemented the first approach in the commit: https://github.com/apache/spark/pull/26973/commits/c03ae069d738c6aa526cc1a1216d079bc8b5ec3e This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
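To make the grouping described in the quoted doc comment concrete, here is a minimal sketch that does not depend on Spark. It assumes a row is a plain `Array[Any]` and a filter is a function over that array; `FieldFilter`, `group` and `skipRow` are hypothetical names for illustration, not the PR's API. Filters without references are added to every position, as in the quoted diff.

```scala
object PredicateGrouping {
  type Row = Array[Any]
  type Predicate = Row => Boolean

  // Hypothetical stand-in for a pushed-down filter: the field names it
  // references plus a function that evaluates it on a (partially filled) row.
  final case class FieldFilter(references: Seq[String], eval: Predicate)

  /** Places each filter at the largest read-schema index it references. */
  def group(readSchema: Seq[String], filters: Seq[FieldFilter]): Array[Option[Predicate]] = {
    val len = readSchema.length
    val grouped = Array.fill(len)(Seq.empty[Predicate])
    for (f <- filters) {
      if (f.references.isEmpty) {
        // No references (e.g. always-true/always-false): applicable at every position.
        for (i <- 0 until len) grouped(i) = grouped(i) :+ f.eval
      } else {
        val indexes = f.references.map(name => readSchema.indexOf(name))
        require(indexes.forall(_ >= 0), "All filters must be applicable to the read schema.")
        val maxIdx = indexes.max
        grouped(maxIdx) = grouped(maxIdx) :+ f.eval
      }
    }
    // Predicates sharing the same maximum index are combined with a logical AND.
    grouped.map { ps =>
      val combined: Option[Predicate] =
        if (ps.isEmpty) None else Some((row: Row) => ps.forall(p => p(row)))
      combined
    }
  }

  /** True if the row can be dropped once fields 0..index have been converted. */
  def skipRow(predicates: Array[Option[Predicate]], row: Row, index: Int): Boolean =
    predicates(index).exists(p => !p(row))
}
```

With a read schema `a, b, c` and a filter referencing `a` and `c`, the predicate lands at position 2, so it is only evaluated once all three fields have been converted.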
[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource
URL: https://github.com/apache/spark/pull/26973#discussion_r366189514

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala ##
@@ -39,27 +40,49 @@ import org.apache.spark.unsafe.types.UTF8String
  * @param requiredSchema The schema of the data that should be output for each row. This should be a
  *                       subset of the columns in dataSchema.
  * @param options Configuration options for a CSV parser.
+ * @param filters The pushdown filters that should be applied to converted values.
  */
 class UnivocityParser(
     dataSchema: StructType,
     requiredSchema: StructType,
-    val options: CSVOptions) extends Logging {
+    val options: CSVOptions,
+    filters: Seq[Filter]) extends Logging {
   require(requiredSchema.toSet.subsetOf(dataSchema.toSet),
     s"requiredSchema (${requiredSchema.catalogString}) should be the subset of " +
       s"dataSchema (${dataSchema.catalogString}).")
+  def this(dataSchema: StructType, requiredSchema: StructType, options: CSVOptions) = {
+    this(dataSchema, requiredSchema, options, Seq.empty)
+  }
   def this(schema: StructType, options: CSVOptions) = this(schema, schema, options)
   // A `ValueConverter` is responsible for converting the given value to a desired type.
   private type ValueConverter = String => Any
+  private val csvFilters = new CSVFilters(
+    filters,
+    dataSchema,
+    requiredSchema,
+    options.columnPruning)
+
+  private[sql] val parsedSchema = csvFilters.readSchema
+
+  // Mapping of field indexes of `parsedSchema` to indexes of `requiredSchema`.
+  // It returns -1 if `requiredSchema` doesn't contain a field from `parsedSchema`.

Review comment:
If `requiredSchema` always contains filter references, that is a significant assumption, and it can simplify this implementation slightly. Is it just specific to the current implementation, or could `requiredSchema` contain only **really required** columns in the future? I ask because filter columns are not actually required if the filter is applied only once in a datasource.
[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource URL: https://github.com/apache/spark/pull/26973#discussion_r366177956 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala ## @@ -39,27 +40,49 @@ import org.apache.spark.unsafe.types.UTF8String * @param requiredSchema The schema of the data that should be output for each row. This should be a * subset of the columns in dataSchema. * @param options Configuration options for a CSV parser. + * @param filters The pushdown filters that should be applied to converted values. */ class UnivocityParser( dataSchema: StructType, requiredSchema: StructType, -val options: CSVOptions) extends Logging { +val options: CSVOptions, +filters: Seq[Filter]) extends Logging { require(requiredSchema.toSet.subsetOf(dataSchema.toSet), s"requiredSchema (${requiredSchema.catalogString}) should be the subset of " + s"dataSchema (${dataSchema.catalogString}).") + def this(dataSchema: StructType, requiredSchema: StructType, options: CSVOptions) = { +this(dataSchema, requiredSchema, options, Seq.empty) + } def this(schema: StructType, options: CSVOptions) = this(schema, schema, options) // A `ValueConverter` is responsible for converting the given value to a desired type. private type ValueConverter = String => Any + private val csvFilters = new CSVFilters( +filters, +dataSchema, +requiredSchema, +options.columnPruning) + + private[sql] val parsedSchema = csvFilters.readSchema Review comment: Just for the context, originally I made it private but had to make it more open because it is used in other places in `sql`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource URL: https://github.com/apache/spark/pull/26973#discussion_r366177325 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala ## @@ -39,27 +40,49 @@ import org.apache.spark.unsafe.types.UTF8String * @param requiredSchema The schema of the data that should be output for each row. This should be a * subset of the columns in dataSchema. * @param options Configuration options for a CSV parser. + * @param filters The pushdown filters that should be applied to converted values. */ class UnivocityParser( dataSchema: StructType, requiredSchema: StructType, -val options: CSVOptions) extends Logging { +val options: CSVOptions, +filters: Seq[Filter]) extends Logging { require(requiredSchema.toSet.subsetOf(dataSchema.toSet), s"requiredSchema (${requiredSchema.catalogString}) should be the subset of " + s"dataSchema (${dataSchema.catalogString}).") + def this(dataSchema: StructType, requiredSchema: StructType, options: CSVOptions) = { +this(dataSchema, requiredSchema, options, Seq.empty) + } def this(schema: StructType, options: CSVOptions) = this(schema, schema, options) // A `ValueConverter` is responsible for converting the given value to a desired type. private type ValueConverter = String => Any + private val csvFilters = new CSVFilters( +filters, +dataSchema, +requiredSchema, +options.columnPruning) + + private[sql] val parsedSchema = csvFilters.readSchema Review comment: Do you propose just `private`? or public? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource URL: https://github.com/apache/spark/pull/26973#discussion_r366176306 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVFilters.scala ## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.csv + +import scala.util.Try + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources +import org.apache.spark.sql.types.{BooleanType, StructType} + +/** + * An instance of the class compiles filters to predicates and allows to + * apply the predicates to an internal row with partially initialized values + * converted from parsed CSV fields. + * + * @param filters The filters pushed down to CSV datasource. + * @param dataSchema The full schema with all fields in CSV files. + * @param requiredSchema The schema with only fields requested by the upper layer. + * @param columnPruning true if CSV parser can read sub-set of columns otherwise false. 
+ */ +class CSVFilters( +filters: Seq[sources.Filter], +dataSchema: StructType, +requiredSchema: StructType, +columnPruning: Boolean) { + require(checkFilters(), "All filters must be applicable to the data schema.") Review comment: Initially the function was slightly complex, see https://github.com/apache/spark/pull/26973/commits/f0aa0a88bfa0c87007f8781ba7fac8f9cd3057ba#diff-44a98c4a53980cb04e57f0489b257a37L126 . That's why I extracted it to a separate method. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource
URL: https://github.com/apache/spark/pull/26973#discussion_r366175302

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala ##
@@ -204,15 +234,15 @@ class UnivocityParser(
    * Parses a single CSV string and turns it into either one resulting row or no row (if the
    * the record is malformed).
    */
-  def parse(input: String): InternalRow = doParse(input)
+  def parse(input: String): Seq[InternalRow] = doParse(input)
   private val getToken = if (options.columnPruning) {
     (tokens: Array[String], index: Int) => tokens(index)
   } else {
     (tokens: Array[String], index: Int) => tokens(tokenIndexArr(index))
   }
-  private def convert(tokens: Array[String]): InternalRow = {
+  private def convert(tokens: Array[String]): Seq[InternalRow] = {

Review comment:
We can do that, but modifying `FailureSafeParser` is slightly orthogonal to the purpose of this PR, and it is not necessary for these changes. Can we do that separately?
[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource
URL: https://github.com/apache/spark/pull/26973#discussion_r366174594

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala ##
@@ -39,27 +40,49 @@ import org.apache.spark.unsafe.types.UTF8String
  * @param requiredSchema The schema of the data that should be output for each row. This should be a
  *                       subset of the columns in dataSchema.
  * @param options Configuration options for a CSV parser.
+ * @param filters The pushdown filters that should be applied to converted values.
  */
 class UnivocityParser(
     dataSchema: StructType,
     requiredSchema: StructType,
-    val options: CSVOptions) extends Logging {
+    val options: CSVOptions,
+    filters: Seq[Filter]) extends Logging {
   require(requiredSchema.toSet.subsetOf(dataSchema.toSet),
     s"requiredSchema (${requiredSchema.catalogString}) should be the subset of " +
       s"dataSchema (${dataSchema.catalogString}).")
+  def this(dataSchema: StructType, requiredSchema: StructType, options: CSVOptions) = {
+    this(dataSchema, requiredSchema, options, Seq.empty)
+  }
   def this(schema: StructType, options: CSVOptions) = this(schema, schema, options)
   // A `ValueConverter` is responsible for converting the given value to a desired type.
   private type ValueConverter = String => Any
+  private val csvFilters = new CSVFilters(
+    filters,
+    dataSchema,
+    requiredSchema,
+    options.columnPruning)
+
+  private[sql] val parsedSchema = csvFilters.readSchema
+
+  // Mapping of field indexes of `parsedSchema` to indexes of `requiredSchema`.
+  // It returns -1 if `requiredSchema` doesn't contain a field from `parsedSchema`.

Review comment:
What should the upper layer do with the column if the datasource has already applied the filters? As far as I know, filters are applied only once in DSv2. @cloud-fan, or am I wrong?
[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource
URL: https://github.com/apache/spark/pull/26973#discussion_r365789628

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala ##
@@ -2204,4 +2204,37 @@ class CSVSuite extends QueryTest with SharedSparkSession with TestCsvData {
       checkAnswer(resultDF, Row("a", 2, "e", "c"))
     }
   }
+
+  test("filters push down") {
+    Seq(true, false).foreach { multiLine =>

Review comment:
And since this is a kind of end-to-end test, I would test as many options and modes as possible.
[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource
URL: https://github.com/apache/spark/pull/26973#discussion_r365786665

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala ##
@@ -2204,4 +2204,37 @@ class CSVSuite extends QueryTest with SharedSparkSession with TestCsvData {
       checkAnswer(resultDF, Row("a", 2, "e", "c"))
     }
   }
+
+  test("filters push down") {
+    Seq(true, false).foreach { multiLine =>

Review comment:
Lines in CSV cannot be split, so the input should be the same. The difference is in how we read the file: as a whole or line by line. But you are right, there should be no difference for these changes since I touched only value conversions.
[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource URL: https://github.com/apache/spark/pull/26973#discussion_r365704789 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala ## @@ -242,24 +272,48 @@ class UnivocityParser( new RuntimeException("Malformed CSV record")) } else { // When the length of the returned tokens is identical to the length of the parsed schema, - // we just need to convert the tokens that correspond to the required columns. - var badRecordException: Option[Throwable] = None + // we just need to: + // 1. Convert the tokens that correspond to the parsed schema. + // 2. Apply the pushdown filters to `parsedRow`. + // 3. Convert `parsedRow` to `requiredRow` by stripping non-required fields. var i = 0 + val requiredSingleRow = requiredRow.head while (i < requiredSchema.length) { +requiredSingleRow.setNullAt(i) +i += 1 + } + + var skipValueConversion = false + var badRecordException: Option[Throwable] = None + i = 0 + while (!skipValueConversion && i < parsedSchema.length) { try { - row(i) = valueConverters(i).apply(getToken(tokens, i)) + val convertedValue = valueConverters(i).apply(getToken(tokens, i)) + parsedRow(i) = convertedValue + if (csvFilters.skipRow(parsedRow, i)) { +skipValueConversion = true + } else { +val requiredIndex = parsedToRequiredIndex(i) +if (requiredIndex != -1) { + requiredSingleRow(requiredIndex) = convertedValue +} + } } catch { case NonFatal(e) => -badRecordException = badRecordException.orElse(Some(e)) -row.setNullAt(i) +badRecordException = Some(e) +skipValueConversion = true } i += 1 } - - if (badRecordException.isEmpty) { -row + if (skipValueConversion) { +if (badRecordException.isDefined) { Review comment: Look at https://github.com/apache/spark/pull/26973/files#diff-c82e4b74d2a51fed29069745ce4f9e96R303-R304 . The lines 303 and 304 above. This is an automated message from the Apache Git Service. 
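The control flow of the quoted `convert` hunk (convert a value, check the filters for that position, stop early on a filtered or malformed record) can be sketched without Spark as follows. This is only an illustration under simplifying assumptions: rows are plain `Array[Any]`, `skipRow` is passed in as a function, and `Outcome`, `Skipped`, `Bad` and `Converted` are hypothetical names that do not appear in the PR.

```scala
import scala.util.control.NonFatal

object ConvertSketch {
  sealed trait Outcome
  case object Skipped extends Outcome                         // a pushed-down filter rejected the row
  final case class Bad(cause: Throwable) extends Outcome      // a value conversion failed
  final case class Converted(required: Array[Any]) extends Outcome

  def convert(
      tokens: Array[String],
      converters: Array[String => Any],
      parsedToRequired: Array[Int],   // -1 marks a field that is read only for filtering
      requiredLen: Int,
      skipRow: (Array[Any], Int) => Boolean): Outcome = {
    val parsedRow = new Array[Any](tokens.length)
    val requiredRow = new Array[Any](requiredLen)   // every position starts as null

    var skipValueConversion = false
    var badRecordException: Option[Throwable] = None
    var i = 0
    while (!skipValueConversion && i < tokens.length) {
      try {
        val convertedValue = converters(i)(tokens(i))
        parsedRow(i) = convertedValue
        if (skipRow(parsedRow, i)) {
          // Stop converting the remaining fields: the row is filtered out anyway.
          skipValueConversion = true
        } else {
          val requiredIndex = parsedToRequired(i)
          if (requiredIndex != -1) requiredRow(requiredIndex) = convertedValue
        }
      } catch {
        case NonFatal(e) =>
          badRecordException = Some(e)
          skipValueConversion = true
      }
      i += 1
    }

    badRecordException match {
      case Some(e) => Bad(e)
      case None => if (skipValueConversion) Skipped else Converted(requiredRow)
    }
  }
}
```

The point of checking the filter inside the loop is that fields after the rejecting position are never converted.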
[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource URL: https://github.com/apache/spark/pull/26973#discussion_r365703327 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala ## @@ -39,27 +40,49 @@ import org.apache.spark.unsafe.types.UTF8String * @param requiredSchema The schema of the data that should be output for each row. This should be a * subset of the columns in dataSchema. * @param options Configuration options for a CSV parser. + * @param filters The pushdown filters that should be applied to converted values. */ class UnivocityParser( dataSchema: StructType, requiredSchema: StructType, -val options: CSVOptions) extends Logging { +val options: CSVOptions, +filters: Seq[Filter]) extends Logging { require(requiredSchema.toSet.subsetOf(dataSchema.toSet), s"requiredSchema (${requiredSchema.catalogString}) should be the subset of " + s"dataSchema (${dataSchema.catalogString}).") + def this(dataSchema: StructType, requiredSchema: StructType, options: CSVOptions) = { +this(dataSchema, requiredSchema, options, Seq.empty) + } def this(schema: StructType, options: CSVOptions) = this(schema, schema, options) // A `ValueConverter` is responsible for converting the given value to a desired type. private type ValueConverter = String => Any + private val csvFilters = new CSVFilters( +filters, +dataSchema, +requiredSchema, +options.columnPruning) + + private[sql] val parsedSchema = csvFilters.readSchema + + // Mapping of field indexes of `parsedSchema` to indexes of `requiredSchema`. + // It returns -1 if `requiredSchema` doesn't contain a field from `parsedSchema`.
Review comment:
For example:
dataSchema: `i INTEGER, d DOUBLE, s STRING`
requiredSchema: `i INTEGER`
filters: `Seq(LessThan("d", 10))`
parsedSchema: `i INTEGER, d DOUBLE`
In that case, `requiredSchema` doesn't contain the field `d` from `parsedSchema`, so the mapping returns -1 for it.
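The example in the comment above can be sketched in a few lines. This is only an illustration under simplifying assumptions: schemas are modelled as ordered lists of field names rather than `StructType`, and the object `IndexMappingSketch` and both of its methods are hypothetical names, not code from the PR.

```scala
// Hypothetical, simplified model: a schema is an ordered list of field names.
object IndexMappingSketch {
  // Fields that must be parsed: those requested by the upper layer plus those
  // referenced by pushed-down filters, kept in dataSchema order.
  def readSchema(
      dataSchema: Seq[String],
      requiredSchema: Seq[String],
      filterRefs: Set[String]): Seq[String] =
    dataSchema.filter(f => requiredSchema.contains(f) || filterRefs.contains(f))

  // For each parsed field, its position in requiredSchema, or -1 if the field
  // is parsed only for filtering and is not part of the output.
  def parsedToRequiredIndex(
      parsedSchema: Seq[String],
      requiredSchema: Seq[String]): Array[Int] =
    parsedSchema.map(name => requiredSchema.indexOf(name)).toArray
}

val parsed = IndexMappingSketch.readSchema(Seq("i", "d", "s"), Seq("i"), Set("d"))
// parsed is Seq("i", "d"): `d` is read only because the filter references it.
val mapping = IndexMappingSketch.parsedToRequiredIndex(parsed, Seq("i"))
// mapping is Array(0, -1): `i` goes to output position 0, `d` is dropped.
```

The -1 entry is what lets the parser convert `d` for filter evaluation without copying it into the output row.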
[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource URL: https://github.com/apache/spark/pull/26973#discussion_r365667390 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVFilters.scala ## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.csv + +import scala.util.Try + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources +import org.apache.spark.sql.types.{BooleanType, StructType} + +/** + * An instance of the class compiles filters to predicates and allows to + * apply the predicates to an internal row with partially initialized values + * converted from parsed CSV fields. + * + * @param filters The filters pushed down to CSV datasource. + * @param dataSchema The full schema with all fields in CSV files. + * @param requiredSchema The schema with only fields requested by the upper layer. + * @param columnPruning true if CSV parser can read sub-set of columns otherwise false. 
+ */ +class CSVFilters( +filters: Seq[sources.Filter], +dataSchema: StructType, +requiredSchema: StructType, +columnPruning: Boolean) { + require(checkFilters(), "All filters must be applicable to the data schema.") + + /** + * The schema to read from the underlying CSV parser. + * It combines the required schema and the fields referenced by filters. + */ + val readSchema: StructType = { +if (columnPruning) { + val refs = filters.flatMap(_.references).toSet + val readFields = dataSchema.filter { field => +requiredSchema.contains(field) || refs.contains(field.name) + } + StructType(readFields) +} else { + dataSchema +} + } + + /** + * Converted filters to predicates and grouped by maximum field index + * in the read schema. For example, if an filter refers to 2 attributes + * attrA with field index 5 and attrB with field index 10 in the read schema: + * 0 === $"attrA" or $"attrB" < 100 + * the filter is compiled to a predicate, and placed to the `predicates` + * array at the position 10. In this way, if there is a row with initialized + * fields from the 0 to 10 index, the predicate can be applied to the row + * to check that the row should be skipped or not. + * Multiple predicates with the same maximum reference index are combined + * by the `And` expression. + */ + private val predicates: Array[BasePredicate] = { +val len = readSchema.fields.length +val groupedPredicates = Array.fill[BasePredicate](len)(null) +if (SQLConf.get.csvFilterPushDown) { + val groupedExprs = Array.fill(len)(Seq.empty[Expression]) + for (filter <- filters) { +val expr = CSVFilters.filterToExpression(filter, toRef) +val refs = filter.references +if (refs.isEmpty) { + // For example, AlwaysTrue and AlwaysFalse doesn't have any references + for (i <- 0 until len) { +groupedExprs(i) ++= expr Review comment: Even more, `AlwaysTrue` could be removed because it does not impact on the result. `AlwaysFalse` could be put at index 0, and other filters can be ignored. 
However, this is a somewhat ad-hoc optimization, whereas the optimization above also works for other literal filters.
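A rough sketch of the ad-hoc simplification described in this comment, assuming all pushed filters are combined with `And`. The filter types below are local stand-ins that only borrow names from `org.apache.spark.sql.sources`, and `simplify` is a hypothetical helper, not part of the PR.

```scala
// Hypothetical stand-ins for pushed-down filters; not Spark's own classes.
object ConstantFilterSketch {
  sealed trait Filter
  case object AlwaysTrue extends Filter
  case object AlwaysFalse extends Filter
  final case class LessThan(attribute: String, value: Double) extends Filter

  // Simplify a conjunction of filters:
  //  - if any filter is AlwaysFalse, the whole conjunction is false, so only
  //    AlwaysFalse needs to be kept (it can be checked before any field is parsed);
  //  - otherwise AlwaysTrue cannot change the result and is dropped.
  def simplify(filters: Seq[Filter]): Seq[Filter] =
    if (filters.contains(AlwaysFalse)) Seq(AlwaysFalse)
    else filters.filterNot(_ == AlwaysTrue)
}
```

For instance, `simplify(Seq(AlwaysTrue, LessThan("d", 10.0)))` keeps only the `LessThan` filter, and any list containing `AlwaysFalse` collapses to `Seq(AlwaysFalse)`.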
[GitHub] [spark] MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource
MaxGekk commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource URL: https://github.com/apache/spark/pull/26973#discussion_r365666768 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVFilters.scala ## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.csv + +import scala.util.Try + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources +import org.apache.spark.sql.types.{BooleanType, StructType} + +/** + * An instance of the class compiles filters to predicates and allows to + * apply the predicates to an internal row with partially initialized values + * converted from parsed CSV fields. + * + * @param filters The filters pushed down to CSV datasource. + * @param dataSchema The full schema with all fields in CSV files. + * @param requiredSchema The schema with only fields requested by the upper layer. + * @param columnPruning true if CSV parser can read sub-set of columns otherwise false. 
+ */ +class CSVFilters( +filters: Seq[sources.Filter], +dataSchema: StructType, +requiredSchema: StructType, +columnPruning: Boolean) { + require(checkFilters(), "All filters must be applicable to the data schema.") + + /** + * The schema to read from the underlying CSV parser. + * It combines the required schema and the fields referenced by filters. + */ + val readSchema: StructType = { +if (columnPruning) { + val refs = filters.flatMap(_.references).toSet + val readFields = dataSchema.filter { field => +requiredSchema.contains(field) || refs.contains(field.name) + } + StructType(readFields) +} else { + dataSchema +} + } + + /** + * Converted filters to predicates and grouped by maximum field index + * in the read schema. For example, if an filter refers to 2 attributes + * attrA with field index 5 and attrB with field index 10 in the read schema: + * 0 === $"attrA" or $"attrB" < 100 + * the filter is compiled to a predicate, and placed to the `predicates` + * array at the position 10. In this way, if there is a row with initialized + * fields from the 0 to 10 index, the predicate can be applied to the row + * to check that the row should be skipped or not. + * Multiple predicates with the same maximum reference index are combined + * by the `And` expression. + */ + private val predicates: Array[BasePredicate] = { +val len = readSchema.fields.length +val groupedPredicates = Array.fill[BasePredicate](len)(null) +if (SQLConf.get.csvFilterPushDown) { + val groupedExprs = Array.fill(len)(Seq.empty[Expression]) + for (filter <- filters) { +val expr = CSVFilters.filterToExpression(filter, toRef) +val refs = filter.references +if (refs.isEmpty) { + // For example, AlwaysTrue and AlwaysFalse doesn't have any references + for (i <- 0 until len) { +groupedExprs(i) ++= expr Review comment: You are right since we combine all pushed filters via `And`. 
Also, I think all filters without references (literals) could be put at the beginning of the group before reducing here: https://github.com/apache/spark/pull/26973/files#diff-44a98c4a53980cb04e57f0489b257a37R95 For example, if the pushed filters are `Seq(AlwaysFalse, StringContains(ref0, "abc"))`, they are reduced to `And(AlwaysFalse, StringContains(ref0, "abc"))`, and the second filter `StringContains(ref0, "abc")` will not be evaluated at all. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: