Hey Hyukjin,
Sorry that I missed the JIRA ticket. Thanks for bringing this issue up
here, and for your detailed investigation.
From my side, I think this is a bug of Parquet. Parquet was designed to
support schema evolution. When scanning a Parquet file, if a column exists in
the requested schema but missing in the file schema, that column is
filled with null. This should also hold for pushed-down predicate
filters. For example, if filter "a = 1" is pushed down but column "a"
doesn't exist in the Parquet file being scanned, it's safe to assume "a"
is null in all records and drop all of them. On the contrary, if "a IS
NULL" is pushed down, all records should be preserved.
Apparently, until this issue is properly fixed on the Parquet side, we need
to work around it on the Spark side. Please see my comments on all
3 of your solutions inlined below. In short, I'd like to have approach 1
for branch-1.5 and approach 2 for master.
Cheng
On 10/28/15 10:11 AM, Hyukjin Kwon wrote:
When enabling mergeSchema and predicate filter push-down, this fails
since Parquet filters are pushed down regardless of the schema of each
split (or rather, each file).
Dominic Ricard reported this
issue (https://issues.apache.org/jira/browse/SPARK-11103)
Even though this works okay when spark.sql.parquet.filterPushdown is set
to false, the default value of this option is true. So this looks like an
issue.
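For reference, a rough reproduction along these lines (paths and column
names are made up; part=1 has columns a and b, part=2 only has b):

    sqlContext.range(0, 10).selectExpr("id AS a", "id AS b")
      .write.parquet("/tmp/table/part=1")
    sqlContext.range(0, 10).selectExpr("id AS b")
      .write.parquet("/tmp/table/part=2")

    // With mergeSchema and filterPushdown both enabled, "a = 1" is pushed
    // down to every file, including part=2 which has no column a, and the
    // scan fails instead of treating a as null.
    sqlContext.read.option("mergeSchema", "true").parquet("/tmp/table")
      .filter("a = 1")
      .count()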
My questions are:
is this clearly an issue?
And if so, which way should this be handled?
I thought this was an issue, so I made three rough patches for it and
tested them; they all look fine.
The first approach looks simpler and more appropriate, judging from
previous fixes such as
https://issues.apache.org/jira/browse/SPARK-11153
However, in terms of safety and performance, I also want to confirm
which one would be the proper approach before trying to open a PR.
1. Simply set spark.sql.parquet.filterPushdown to false when using
mergeSchema
This one is pretty simple and safe. I'd like to have this for 1.5.2, or
1.5.3 if we can't make it for 1.5.2.
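For completeness, the user-side equivalent of this workaround (what the fix
would effectively do automatically) looks roughly like this:

    // Disable Parquet filter push-down before reading merged-schema data.
    sqlContext.setConf("spark.sql.parquet.filterPushdown", "false")

    val df = sqlContext.read
      .option("mergeSchema", "true")
      .parquet("/tmp/table")   // illustrative path

    // Filters are now evaluated by Spark instead of being pushed to Parquet.
    df.filter("a = 1").count()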
2. If spark.sql.parquet.filterPushdown is true, retrieve the schemas of
every part-file (and also the merged one), check whether each of them can
accept the given filter, and then apply the filter only when they all can
accept it. I think this is a bit over-engineered, though.
Actually we only need to calculate the intersection of all file
schemata. We can make ParquetRelation.mergeSchemaInParallel return two
StructTypes: the first is the original merged schema, and the other is
the intersection of all file schemata, which only contains fields that
exist in all file schemata. Then we decide which filters to push down
according to the second StructType.
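Roughly something like the following sketch (the wiring into
ParquetRelation is omitted, referencedColumns only covers a few Filter
cases, and fields are matched by name only for brevity):

    import org.apache.spark.sql.sources._
    import org.apache.spark.sql.types.{StructField, StructType}

    // Intersection of all file schemata: keep only fields present in every file.
    def intersectSchemas(fileSchemas: Seq[StructType]): StructType =
      fileSchemas.reduce { (a, b) =>
        StructType(a.filter(f => b.fieldNames.contains(f.name)))
      }

    // Columns referenced by a pushed-down data source filter.
    def referencedColumns(filter: Filter): Seq[String] = filter match {
      case EqualTo(attr, _)     => Seq(attr)
      case GreaterThan(attr, _) => Seq(attr)
      case LessThan(attr, _)    => Seq(attr)
      case IsNull(attr)         => Seq(attr)
      case IsNotNull(attr)      => Seq(attr)
      case And(l, r)            => referencedColumns(l) ++ referencedColumns(r)
      case Or(l, r)             => referencedColumns(l) ++ referencedColumns(r)
      case Not(child)           => referencedColumns(child)
      case _                    => Seq.empty
    }

    // Push a filter down only if every column it touches exists in all files.
    def safeToPushDown(filter: Filter, intersected: StructType): Boolean =
      referencedColumns(filter).forall(c => intersected.fieldNames.contains(c))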
3. If spark.sql.parquet.filterPushdown is true, retrieve the schemas of
every part-file (and also the merged one) and apply the filter only to each
split (or rather, file) that can accept it, which (I think this is hacky)
ends up with different configurations for each task in a job.
The idea I came up with at first was similar to this one. Instead of
pulling all file schemata to the driver side, we can move filter push-down
to the executor side. Namely, pass the candidate filters to the executors
and compute the Parquet predicate filter according to each file schema.
I haven't looked into this direction in depth, but we can probably put
this part into CatalystReadSupport, which is now initialized on executor
side.
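A rough sketch of that executor-side step (names here are illustrative
rather than existing Spark APIs; referencedColumns is the helper from the
sketch above):

    import scala.collection.JavaConverters._
    import org.apache.parquet.schema.MessageType
    import org.apache.spark.sql.sources.Filter

    // Given the footer schema of the file this task is about to read and the
    // candidate filters shipped from the driver, keep only the filters whose
    // columns actually exist in this file; only the survivors would then be
    // converted into a Parquet FilterPredicate for this split.
    def filtersForThisFile(fileSchema: MessageType, candidates: Seq[Filter]): Seq[Filter] = {
      val columnsInFile = fileSchema.getFields.asScala.map(_.getName).toSet
      candidates.filter(f => referencedColumns(f).forall(columnsInFile))
    }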
However, the correctness of this approach can only be guaranteed by the
defensive filtering we do in Spark SQL (i.e., applying all the filters no
matter whether they are pushed down or not), but we are considering removing
it because it imposes unnecessary performance cost. This makes me hesitant
to go along this way.