Github user paulata commented on the issue:
https://github.com/apache/spark/pull/14649
Hi @andreweduffy, this looks very interesting. @guyGerson and I have been
working on this area for a while, and we agree that this approach can
significantly reduce the number of tasks and files/objects touched, which
matters for querying object storage systems such as S3 and OpenStack Swift.
We also like your idea of introducing a new interface to handle all formats,
not just Parquet, although at the moment the benefit seems limited to Parquet
data with _metadata summaries.
With a small extension to your pull request, the same idea could also work
for Parquet data without _metadata summaries, as well as for all other file
formats supported by Spark SQL.
@HyukjinKwon @liancheng Please take a look and let us know what you think -
we believe this justifies adding a new interface rather than limiting the
feature to Parquet only.
We have found this to be very useful in real-world use cases, for example:
**Madrid Transportation Scenario**: we presented this at Spark Summit
Europe 2015. This use case uses Elasticsearch to index object storage
metadata and to find the objects relevant to a query, for example a query
looking for rush-hour traffic over the past five years. See
https://spark-summit.org/eu-2015/events/how-spark-enables-the-internet-of-things-efficient-integration-of-multiple-spark-components-for-smart-city-use-cases/
(slides 9-10).
**IoT data archive scenario**: we have an external DB with the following
information about sensor state: `sensorID -> (type, manufacturer, version, isActive)`.
We want to compare the behaviour of all active sensors with their behaviour as
recorded in an object storage archive from 2014, using a single query. The
archive is laid out as follows:
Archives/dt=01-01-2014
Archives/dt=01-01-2014/sensor1.json (500MB)
Archives/dt=01-01-2014/sensor2.json (500MB)
Archives/dt=01-01-2014/sensor3.json (500MB)
Archives/dt=02-01-2014
Archives/dt=02-01-2014/sensor1.json (500MB)
Archives/dt=02-01-2014/sensor2.json (500MB)
Archives/dt=02-01-2014/sensor3.json (500MB)
more...
Note that for each use case we plug in different code that does the filtering.
The filtering is application-specific and not necessarily related to the file
format (Parquet/CSV/JSON etc.); a sketch of what such a filter could look like
for the IoT scenario follows the trait definition below.
Here is a code snippet from our patch that shows how to keep the filter
interface more generic. @andreweduffy, please let us know what you think about
extending your pull request in this way.
fileSourceInterfaces.scala:
```
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.sources.Filter

// Interface for applications to implement their own custom file filter.
// This will be run during query execution.
trait CustomFileFilter {
  def isRequired(dataFilters: Seq[Filter], f: FileStatus): Boolean
}
```
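To make the proposal concrete, here is a rough sketch of what a filter for the IoT archive scenario above could look like. The class name `ActiveSensorFilter`, the `lookupActiveSensors` helper, and the file-naming convention are illustrative assumptions on our part, not part of the patch:
```
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.sources.Filter

// Hypothetical filter for the IoT archive scenario: keep only the files that
// belong to sensors marked as active in the external DB.
class ActiveSensorFilter extends CustomFileFilter {

  // Assumed helper: fetches the IDs of active sensors from the external DB.
  // A real implementation would query it over JDBC, a REST API, etc.
  private def lookupActiveSensors(): Set[String] = Set("sensor1", "sensor3")

  private lazy val activeSensors: Set[String] = lookupActiveSensors()

  override def isRequired(dataFilters: Seq[Filter], f: FileStatus): Boolean = {
    // In this scenario files are named <sensorID>.json, e.g. sensor1.json.
    val sensorId = f.getPath.getName.stripSuffix(".json")
    activeSensors.contains(sensorId)
  }
}
```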
DataSourceScanExec.scala:
```
// Code snippet example for including the custom filter in the execution
// process (line 159).
val customFileFilterClazzName = hadoopConf.get("spark.sql.customFileFilter")
val filteredPartitions = if (customFileFilterClazzName == null) {
  logInfo("No custom file filter detected")
  selectedPartitions
} else {
  logInfo("Custom file filter detected")
  // Instantiate the application-provided filter named in the configuration.
  val fileFilter = hadoopConf
    .getInstances(customFileFilterClazzName, classOf[CustomFileFilter])
    .get(0)
  // Drop the files (and then the partitions) that the filter rejects.
  selectedPartitions.map { part =>
    Partition(part.values, part.files.filter { f =>
      fileFilter.isRequired(dataFilters, f)
    })
  }.filter(_.files.nonEmpty)
}
```
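For completeness, this is roughly how an application might register such a filter before running a query (assuming `spark` is the active SparkSession). It also assumes the implementation class is on the classpath and that the `spark.sql.customFileFilter` key ends up in the Hadoop configuration used by the scan; the exact propagation mechanism is something we would need to pin down. `com.example.ActiveSensorFilter` is a placeholder name:
```
// Hypothetical registration of a custom file filter. The configuration key
// matches the one read in the DataSourceScanExec snippet above; the class
// name is a placeholder for an application-provided CustomFileFilter.
spark.sparkContext.hadoopConfiguration
  .set("spark.sql.customFileFilter", "com.example.ActiveSensorFilter")

// Subsequent file-based scans would then skip the files the filter rejects.
val archive = spark.read.json("Archives/")
```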