We have a similar issue with massive Parquet files. Cheng Lian, could you
have a look?

2015-04-08 15:47 GMT+08:00 Zheng, Xudong <dong...@gmail.com>:

> Hi Cheng,
>
> I tried both of these patches, but they still do not seem to resolve my
> issue. I found that most of the time is spent on this line in
> newParquet.scala:
>
> ParquetFileReader.readAllFootersInParallel(
>   sparkContext.hadoopConfiguration, seqAsJavaList(leaves), taskSideMetaData)
>
> This call reads all the files under the Parquet folder. Our Parquet folder
> contains a lot of Parquet files (nearly 2000), and reading one file takes
> about 2 seconds, so it becomes very slow ... And PR 5231 does not skip this
> step, so it does not resolve my issue.
>
> As our Parquet files are generated by a Spark job, the number of .parquet
> files is the same as the number of tasks, which is why we have so many
> files. But these files actually all have the same schema. Is there any way
> to merge these files into one, or to avoid scanning each of them?
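>
> One idea I had is to rewrite the data into fewer partitions before saving;
> a minimal sketch of what I mean, in spark-shell (the paths and partition
> count are made up, and I am not sure this is the recommended approach):
>
> // Read the ~2000 small part-files, then rewrite them as 16 larger ones.
> val df = sqlContext.parquetFile("hdfs:///data/parquet_folder")
> df.repartition(16).saveAsParquetFile("hdfs:///data/parquet_folder_merged")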
>
> On Sat, Apr 4, 2015 at 9:47 PM, Cheng Lian <lian.cs....@gmail.com> wrote:
>
>>  Hey Xudong,
>>
>> We have been digging into this issue for a while, and we believe PR 5339
>> <http://github.com/apache/spark/pull/5339> and PR 5334
>> <http://github.com/apache/spark/pull/5334> should fix it.
>>
>> There are two problems:
>>
>> 1. Normally we cache Parquet table metadata for better performance, but
>> when converting Hive metastore Parquet tables, the cache is not used. Thus
>> heavy operations like schema discovery are done every time a metastore
>> Parquet table is converted.
>> 2. With Parquet task side metadata reading (which is turned on by
>> default), we can actually skip reading the row group information in the
>> footer. However, we accidentally called a Parquet function which doesn't
>> skip row group information.
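>>
>> (In case you want to verify the task side metadata setting yourself, it
>> is controlled by the parquet-hadoop key below; just a sketch for checking,
>> you should not normally need to change it:)
>>
>> // "parquet.task.side.metadata" is ParquetInputFormat.TASK_SIDE_METADATA
>> // in parquet-hadoop; "true" is the default.
>> sc.hadoopConfiguration.set("parquet.task.side.metadata", "true")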
>>
>> For your question about schema merging: Parquet allows different
>> part-files to have different but compatible schemas. For example,
>> part-00001.parquet may have columns a and b, while part-00002.parquet may
>> have columns a and c. In some cases, the summary files (_metadata and
>> _common_metadata) contain the merged schema (a, b, and c), but this is not
>> guaranteed. For example, when the user-defined metadata stored in
>> different part-files contains different values for the same key, Parquet
>> simply gives up writing summary files. That's why all part-files must be
>> touched to get a precise merged schema.
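>>
>> To make this concrete, here is a minimal sketch you can run in
>> spark-shell (the paths and column names are made up for illustration):
>>
>> import sqlContext.implicits._
>>
>> // Two sets of part-files with different but compatible schemas:
>> // the first has columns (a, b), the second has columns (a, c).
>> sc.parallelize(1 to 5).map(i => (i, i * 2)).toDF("a", "b")
>>   .saveAsParquetFile("data/test_table/part1")
>> sc.parallelize(6 to 10).map(i => (i, i * 3)).toDF("a", "c")
>>   .saveAsParquetFile("data/test_table/part2")
>>
>> // Reading both together merges the two schemas into (a, b, c).
>> sqlContext
>>   .parquetFile("data/test_table/part1", "data/test_table/part2")
>>   .printSchema()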
>>
>> However, in scenarios where a centralized, authoritative schema is
>> available (e.g. the Hive metastore schema, or a schema provided by the
>> user via data source DDL), we don't need to do schema merging on the
>> driver side; we can defer it to the executor side, where each task only
>> needs to reconcile the part-files it actually touches. This is also what
>> the Parquet developers did recently for parquet-hadoop
>> <https://github.com/apache/incubator-parquet-mr/pull/45>.
>>
>> Cheng
>>
>>
>> On 3/31/15 11:49 PM, Zheng, Xudong wrote:
>>
>> Thanks Cheng!
>>
>>  Setting 'spark.sql.parquet.useDataSourceApi' to false resolves my issue,
>> but PR 5231 does not seem to. Not sure whether I did something else wrong ...
>>
>>  BTW, we are actually very interested in the schema merging feature in
>> Spark 1.3, and both of these solutions would disable it, right? It seems
>> that Parquet metadata is stored in a file named _metadata in the Parquet
>> folder (each folder is a partition, as we use a partitioned table), so why
>> do we need to scan all the Parquet part-files? Is there another solution
>> that could keep the schema merging feature at the same time? We really
>> like this feature :)
>>
>> On Tue, Mar 31, 2015 at 3:19 PM, Cheng Lian <lian.cs....@gmail.com>
>> wrote:
>>
>>>  Hi Xudong,
>>>
>>> This is probably because Parquet schema merging is turned on by
>>> default. It is generally useful for Parquet files with different but
>>> compatible schemas, but it needs to read metadata from all Parquet
>>> part-files. This can be problematic when reading Parquet tables with lots
>>> of part-files, especially when the user doesn't need schema merging.
>>>
>>> This issue is tracked by SPARK-6575, and here is a PR for it:
>>> https://github.com/apache/spark/pull/5231. This PR adds a configuration
>>> to disable schema merging by default when doing Hive metastore Parquet
>>> table conversion.
>>>
>>> Another workaround is to fall back to the old Parquet code path by
>>> setting spark.sql.parquet.useDataSourceApi to false.
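>>>
>>> For example, a minimal way to set this from spark-shell:
>>>
>>> // Fall back to the old (non data source API) Parquet code path.
>>> sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")
>>> // Or equivalently through SQL:
>>> sqlContext.sql("SET spark.sql.parquet.useDataSourceApi=false")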
>>>
>>> Cheng
>>>
>>>
>>> On 3/31/15 2:47 PM, Zheng, Xudong wrote:
>>>
>>> Hi all,
>>>
>>>  We are using Parquet Hive tables, and we are upgrading to Spark 1.3.
>>> But we find that even a simple COUNT(*) query is much slower (100x) than
>>> in Spark 1.2.
>>>
>>>  I found that most of the time is spent on the driver getting HDFS
>>> blocks, and I see a large number of logs like the following printed:
>>>
>>>  15/03/30 23:03:43 DEBUG ProtobufRpcEngine: Call: getBlockLocations took 2097ms
>>> 15/03/30 23:03:43 DEBUG DFSClient: newInfo = LocatedBlocks{
>>>   fileLength=77153436
>>>   underConstruction=false
>>>   blocks=[LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275;
>>>     getBlockSize()=77153436; corrupt=false; offset=0;
>>>     locs=[10.152.116.172:50010, 10.152.116.169:50010, 10.153.125.184:50010]}]
>>>   lastLocatedBlock=LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275;
>>>     getBlockSize()=77153436; corrupt=false; offset=0;
>>>     locs=[10.152.116.169:50010, 10.153.125.184:50010, 10.152.116.172:50010]}
>>>   isLastBlockComplete=true}
>>> 15/03/30 23:03:43 DEBUG DFSClient: Connecting to datanode 10.152.116.172:50010
>>>
>>>
>>>  Comparing the printed logs with Spark 1.2: although the number of
>>> getBlockLocations calls is similar, each such operation only took 20~30
>>> ms in Spark 1.2 (versus 2000~3000 ms now), and it didn't print the
>>> detailed LocatedBlocks info.
>>>
>>>  Another finding: if I read the Parquet file via Scala code from
>>> spark-shell as below, it looks fine, and the computation returns the
>>> result as quickly as before.
>>>
>>>  sqlContext.parquetFile("data/myparquettable")
>>>
>>>
>>>  Any ideas about this? Thank you!
>>>
>>>
>>>  --
>>>   郑旭东
>>> Zheng, Xudong
>>>
>>>
>>>
>>
>>
>>  --
>>   郑旭东
>> Zheng, Xudong
>>
>>
>>
>
>
> --
> 郑旭东
> Zheng, Xudong
>
>
