Re: [DISCUSS] Support subdirectories when accessing partitioned Parquet Hive table

2020-01-06 Thread Wenchen Fan
Isn't your directory structure malformed? The directory name under the
table path should be in the form of "partitionCol=value". And AFAIK this is
the Hive standard.
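
(For illustration, an example not taken from the original mail: the standard
layout keeps data files directly under the "partitionCol=value" directory,
e.g.

s3://bucket/table/date=2019-11-25/data1.parquet

whereas the structure described below has an extra subdirectory level between
the partition directory and the files.)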



On Mon, Jan 6, 2020 at 6:59 PM Lotkowski, Michael wrote:

> Hi all,
>
> Reviving this thread, we still have this issue and we have been using our
> updated jar, which seems to work. It would be great to get some feedback on
> whether this is the correct approach.
>
> Kind regards,
> Michael


[DISCUSS] Support subdirectories when accessing partitioned Parquet Hive table

2020-01-06 Thread Lotkowski, Michael
Hi all,

Reviving this thread, we still have this issue and we have been using our
updated jar, which seems to work. It would be great to get some feedback on
whether this is the correct approach.

Kind regards,
Michael

From: "Lotkowski, Michael" 
Date: Tuesday, December 3, 2019 at 10:28 AM
To: "dev@spark.apache.org" 
Subject: Support subdirectories when accessing partitioned Parquet Hive table

Originally https://issues.apache.org/jira/browse/SPARK-30024


Hi all,

We have ran in to issues when trying to read parquet partitioned table created 
by Hive. I think I have narrowed down the cause to how 
InMemoryFileIndex<https://issues.apache.org/jira/browse/SPARK-30024#L95%5D> 
created a parent -> file mapping.

The folder structure created by Hive is as follows:

s3://bucket/table/date=2019-11-25/subdir1/data1.parquet

s3://bucket/table/date=2019-11-25/subdir2/data2.parquet

Looking through the code it seems that InMemoryFileIndex is creating a mapping 
of leaf files to their parents yielding the following mapping:

 val leafDirToChildrenFiles = Map(

s3://bucket/table/date=2019-11-25/subdir1 -> 
s3://bucket/table/date=2019-11-25/subdir1/data1.parquet,

s3://bucket/table/date=2019-11-25/subdir2 -> 
s3://bucket/table/date=2019-11-25/subdir2/data2.parquet

)

Which then in turn is used in 
PartitioningAwareFileIndex<https://issues.apache.org/jira/browse/SPARK-30024#L83%5D>

to prune the partitions. From my understanding pruning works by looking up the 
partition path in leafDirToChildrenFiles which in this case is 
s3://bucket/table/date=2019-11-25/ and therefore it fails to find any files for 
this partition.

My suggested fix is to update how the InMemoryFileIndex builds the mapping, 
instead of having a map between parent dir to file, is to have a map of 
rootPath to file. More concretely 
https://gist.github.com/lotkowskim/76e8ff265493efd0b2b7175446805a82

I have tested this by updating the jar running on EMR and we correctly can now 
read the data from these partitioned tables. It's also worth noting that we can 
read the data, without any modifications to the code, if we use the following 
settings:

"spark.sql.hive.convertMetastoreParquet" to "false",
"spark.hive.mapred.supports.subdirectories" to "true",
"spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive" to "true"

However with these settings we lose the ability to prune partitions causing us 
to read the entire table every time as we aren't using a Spark relation.

I want to start discussion on whether this is a correct change, or if we are 
missing something more obvious. In either case I would be happy to fully 
implement the change.

Thanks,

Michael




Amazon Development Centre (Scotland) Limited registered office: Waverley Gate, 
2-4 Waterloo Place, Edinburgh EH1 3EG, Scotland. Registered in Scotland 
Registration number SC26867






Amazon Development Centre (Scotland) Limited registered office: Waverley Gate, 
2-4 Waterloo Place, Edinburgh EH1 3EG, Scotland. Registered in Scotland 
Registration number SC26867





Support subdirectories when accessing partitioned Parquet Hive table

2019-12-03 Thread Lotkowski, Michael
Originally https://issues.apache.org/jira/browse/SPARK-30024


Hi all,

We have run into issues when trying to read a partitioned Parquet table created
by Hive. I think I have narrowed the cause down to how InMemoryFileIndex
creates the parent -> file mapping.

The folder structure created by Hive is as follows:

s3://bucket/table/date=2019-11-25/subdir1/data1.parquet

s3://bucket/table/date=2019-11-25/subdir2/data2.parquet

Looking through the code, it seems that InMemoryFileIndex creates a mapping
from each leaf directory to the files it directly contains, yielding the
following:

 val leafDirToChildrenFiles = Map(
   "s3://bucket/table/date=2019-11-25/subdir1" ->
     "s3://bucket/table/date=2019-11-25/subdir1/data1.parquet",
   "s3://bucket/table/date=2019-11-25/subdir2" ->
     "s3://bucket/table/date=2019-11-25/subdir2/data2.parquet"
 )

This map is then used in PartitioningAwareFileIndex to prune the partitions.
From my understanding, pruning works by looking up the partition path, in this
case s3://bucket/table/date=2019-11-25/, in leafDirToChildrenFiles; because the
files are keyed under their subdirectories rather than the partition directory,
the lookup fails to find any files for this partition.
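
To make the failure concrete, here is an illustrative lookup against the
simplified map above (the real structure maps Path -> Array[FileStatus],
shown here as plain strings):

 leafDirToChildrenFiles.get("s3://bucket/table/date=2019-11-25")
 // => None: only the subdirectories appear as keys, so the partition
 // directory itself seems to contain no files.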

My suggested fix is to change how InMemoryFileIndex builds the mapping: instead
of mapping each immediate parent directory to its files, map the rootPath to
the files. More concretely:
https://gist.github.com/lotkowskim/76e8ff265493efd0b2b7175446805a82
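
As a rough sketch of the proposed shape, in the same simplified string form
(illustration only; the gist above contains the actual change):

 val rootPathToChildrenFiles = Map(
   "s3://bucket/table/date=2019-11-25" -> Seq(
     "s3://bucket/table/date=2019-11-25/subdir1/data1.parquet",
     "s3://bucket/table/date=2019-11-25/subdir2/data2.parquet"))

 // The partition-path lookup now succeeds despite the subdirectories:
 rootPathToChildrenFiles.get("s3://bucket/table/date=2019-11-25")
 // => Some(Seq(.../subdir1/data1.parquet, .../subdir2/data2.parquet))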

I have tested this by updating the jar running on EMR, and we can now correctly
read the data from these partitioned tables. It is also worth noting that we
can read the data, without any code changes, if we use the following settings:

"spark.sql.hive.convertMetastoreParquet" to "false",
"spark.hive.mapred.supports.subdirectories" to "true",
"spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive" to "true"

However, with these settings we lose the ability to prune partitions, causing
us to read the entire table every time, since we are no longer using a Spark
relation.

I want to start a discussion on whether this is the correct change, or whether
we are missing something more obvious. In either case, I would be happy to
fully implement the change.

Thanks,

Michael




Amazon Development Centre (Scotland) Limited registered office: Waverley Gate, 
2-4 Waterloo Place, Edinburgh EH1 3EG, Scotland. Registered in Scotland 
Registration number SC26867