HyukjinKwon commented on a change in pull request #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource
URL: https://github.com/apache/spark/pull/26973#discussion_r366198528
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
##########
@@ -39,27 +40,49 @@ import org.apache.spark.unsafe.types.UTF8String
  * @param requiredSchema The schema of the data that should be output for each row. This should be a
  *                       subset of the columns in dataSchema.
  * @param options Configuration options for a CSV parser.
+ * @param filters The pushdown filters that should be applied to converted values.
  */
 class UnivocityParser(
     dataSchema: StructType,
     requiredSchema: StructType,
-    val options: CSVOptions) extends Logging {
+    val options: CSVOptions,
+    filters: Seq[Filter]) extends Logging {
   require(requiredSchema.toSet.subsetOf(dataSchema.toSet),
     s"requiredSchema (${requiredSchema.catalogString}) should be the subset of " +
     s"dataSchema (${dataSchema.catalogString}).")
 
+  def this(dataSchema: StructType, requiredSchema: StructType, options: CSVOptions) = {
+    this(dataSchema, requiredSchema, options, Seq.empty)
+  }
+
   def this(schema: StructType, options: CSVOptions) = this(schema, schema, options)
 
   // A `ValueConverter` is responsible for converting the given value to a desired type.
   private type ValueConverter = String => Any
 
+  private val csvFilters = new CSVFilters(
+    filters,
+    dataSchema,
+    requiredSchema,
+    options.columnPruning)
+
+  private[sql] val parsedSchema = csvFilters.readSchema
+
+  // Mapping of field indexes of `parsedSchema` to indexes of `requiredSchema`.
+  // It returns -1 if `requiredSchema` doesn't contain a field from `parsedSchema`.
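For illustration, a minimal, self-contained sketch of the index mapping that the comment in the diff describes. The object and variable names here are hypothetical, not the PR's actual code: each field of the parsed schema is mapped to its position in the required (output) schema, with -1 for fields that are parsed only so pushed filters can be evaluated.

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Hypothetical sketch: for each field of the parsed schema, find its position
// in the required (output) schema, or -1 if the field is parsed only to
// evaluate pushed filters and is not part of the output row.
object IndexMappingSketch {
  def main(args: Array[String]): Unit = {
    val parsedSchema = new StructType()
      .add("id", IntegerType)
      .add("name", StringType)
      .add("age", IntegerType)     // referenced only by a pushed filter
    val requiredSchema = new StructType()
      .add("id", IntegerType)
      .add("name", StringType)

    val mapping: Array[Int] =
      parsedSchema.fieldNames.map(requiredSchema.fieldNames.indexOf(_))
    println(mapping.mkString(", ")) // prints: 0, 1, -1
  }
}
```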
Review comment:
> Is it just specific to the current implementation, or could `requiredSchema` contain a **really required** column in the future?
We will still have to do the filtering on the Spark side for some other datasources where record-level filtering isn't supported, for example the ORC datasource. In fact, Parquet does not do record-level filtering by default because filtering on the Spark side was found to be faster. So I can quite clearly foresee that `requiredSchema` will contain the columns referenced by pushed filters, or at the very least that we will keep a way to do it.
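To make the read-schema point concrete, here is a minimal, hypothetical sketch (not the PR's `CSVFilters` implementation; `readSchema` and `ReadSchemaSketch` are illustrative names) of how the schema handed to the parser could be widened to the required output columns plus any columns referenced by pushed filters, so those filters can still be evaluated even when their columns are not in the query output.

```scala
import org.apache.spark.sql.sources.{Filter, GreaterThan}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Hypothetical sketch: the schema that actually gets parsed is the required
// output columns plus any data-schema columns referenced by pushed filters.
object ReadSchemaSketch {
  def readSchema(
      dataSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter]): StructType = {
    val filterRefs = filters.flatMap(_.references).toSet
    val extra = dataSchema.filter { f =>
      filterRefs.contains(f.name) && !requiredSchema.fieldNames.contains(f.name)
    }
    StructType(requiredSchema.fields ++ extra)
  }

  def main(args: Array[String]): Unit = {
    val dataSchema = new StructType()
      .add("id", IntegerType)
      .add("name", StringType)
      .add("age", IntegerType)
    val requiredSchema = new StructType().add("name", StringType)

    // "age" is kept in the parsed schema only because a pushed filter needs it.
    val schema = readSchema(dataSchema, requiredSchema, Seq(GreaterThan("age", 21)))
    println(schema.fieldNames.mkString(", ")) // prints: name, age
  }
}
```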
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]