We have a similar issue with massive Parquet files. Cheng Lian, could you have a look?
2015-04-08 15:47 GMT+08:00 Zheng, Xudong <dong...@gmail.com>:

> Hi Cheng,
>
> I tried both of these patches, but they still don't seem to resolve my
> issue. I found that most of the time is spent on this line in
> newParquet.scala:
>
>   ParquetFileReader.readAllFootersInParallel(
>     sparkContext.hadoopConfiguration, seqAsJavaList(leaves), taskSideMetaData)
>
> which needs to read all the files under the Parquet folder. Our Parquet
> folder contains a lot of Parquet files (nearly 2,000), and reading one
> file takes about 2 seconds, so it becomes very slow ... And PR 5231 does
> not skip this step, so it does not resolve my issue.
>
> Our Parquet files are generated by a Spark job, so the number of .parquet
> files equals the number of tasks; that is why we have so many files. But
> these files actually all have the same schema. Is there any way to merge
> these files into one, or to avoid scanning each of them?
>
> On Sat, Apr 4, 2015 at 9:47 PM, Cheng Lian <lian.cs....@gmail.com> wrote:
>
>> Hey Xudong,
>>
>> We have been digging into this issue for a while, and believe PR 5339
>> <https://github.com/apache/spark/pull/5339> and PR 5334
>> <https://github.com/apache/spark/pull/5334> should fix it.
>>
>> There are two problems:
>>
>> 1. Normally we cache Parquet table metadata for better performance, but
>> when converting Hive metastore Parquet tables, the cache is not used.
>> Thus heavy operations like schema discovery are done every time a
>> metastore Parquet table is converted.
>> 2. With Parquet task-side metadata reading (which is turned on by
>> default), we can actually skip reading the row group information in the
>> footer. However, we accidentally called a Parquet function which doesn't
>> skip row group information.
>>
>> As for your question about schema merging: Parquet allows different
>> part-files to have different but compatible schemas. For example,
>> part-00001.parquet has columns a and b, while part-00002.parquet may
>> have columns a and c. In some cases the summary files (_metadata and
>> _common_metadata) contain the merged schema (a, b, and c), but this is
>> not guaranteed. For example, when the user-defined metadata stored in
>> different part-files contains different values for the same key, Parquet
>> simply gives up writing summary files. That's why all part-files must be
>> touched to get a precise merged schema.
>>
>> However, in scenarios where a centralized, authoritative schema is
>> available (e.g. the Hive metastore schema, or a schema provided by the
>> user via data source DDL), we don't need to do schema merging on the
>> driver side; we can defer it to the executor side, where each task only
>> needs to reconcile the part-files it actually touches. This is also what
>> the Parquet developers did recently for parquet-hadoop
>> <https://github.com/apache/incubator-parquet-mr/pull/45>.
>>
>> Cheng
>>
>> On 3/31/15 11:49 PM, Zheng, Xudong wrote:
>>
>> Thanks Cheng!
>>
>> Setting 'spark.sql.parquet.useDataSourceApi' to false resolves my issue,
>> but PR 5231 does not seem to. Not sure what else I did wrong ...
>>
>> BTW, we are actually very interested in the schema merging feature in
>> Spark 1.3, and both of these solutions disable that feature, right? It
>> seems that the Parquet metadata is stored in a file named _metadata in
>> the Parquet folder (each folder is a partition, as we use partitioned
>> tables), so why do we need to scan all the Parquet part-files? Is there
>> any other solution that keeps the schema merging feature at the same
>> time? We really like this feature :)
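To make the schema-merging behavior described above concrete, here is a
minimal, untested spark-shell sketch (assuming the Spark 1.3 DataFrame API;
all paths, column names, and data are hypothetical): write two part-files
with different but compatible schemas, then read them back and observe the
merged schema.

  import sqlContext.implicits._

  // Two datasets with different but compatible schemas: (a, b) and (a, c).
  val df1 = sc.parallelize(Seq((1, "x"))).toDF("a", "b")
  val df2 = sc.parallelize(Seq((2, 3.0))).toDF("a", "c")

  // Write them out as two separate Parquet directories (hypothetical paths).
  df1.saveAsParquetFile("data/table/part1")
  df2.saveAsParquetFile("data/table/part2")

  // Reading both paths back yields the merged schema (a, b, c). Producing
  // it requires touching every part-file's footer, which is exactly the
  // cost being discussed in this thread.
  sqlContext.parquetFile("data/table/part1", "data/table/part2").printSchema()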
>> On Tue, Mar 31, 2015 at 3:19 PM, Cheng Lian <lian.cs....@gmail.com> wrote:
>>
>>> Hi Xudong,
>>>
>>> This is probably because Parquet schema merging is turned on by
>>> default. It is generally useful for Parquet files with different but
>>> compatible schemas, but it needs to read metadata from all Parquet
>>> part-files. This can be problematic when reading Parquet tables with
>>> lots of part-files, especially when the user doesn't need schema
>>> merging.
>>>
>>> This issue is tracked by SPARK-6575, and here is a PR for it:
>>> https://github.com/apache/spark/pull/5231. This PR adds a configuration
>>> to disable schema merging by default when doing Hive metastore Parquet
>>> table conversion.
>>>
>>> Another workaround is to fall back to the old Parquet code path by
>>> setting spark.sql.parquet.useDataSourceApi to false.
>>>
>>> Cheng
>>>
>>> On 3/31/15 2:47 PM, Zheng, Xudong wrote:
>>>
>>> Hi all,
>>>
>>> We are using Parquet Hive tables, and we are upgrading to Spark 1.3.
>>> But we find that even a simple COUNT(*) query runs much slower (100x)
>>> than on Spark 1.2.
>>>
>>> I found that most of the time is spent on the driver fetching HDFS
>>> block locations. A large number of log lines like the following are
>>> printed:
>>>
>>> 15/03/30 23:03:43 DEBUG ProtobufRpcEngine: Call: getBlockLocations took 2097ms
>>> 15/03/30 23:03:43 DEBUG DFSClient: newInfo = LocatedBlocks{
>>>   fileLength=77153436
>>>   underConstruction=false
>>>   blocks=[LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275;
>>>     getBlockSize()=77153436; corrupt=false; offset=0;
>>>     locs=[10.152.116.172:50010, 10.152.116.169:50010, 10.153.125.184:50010]}]
>>>   lastLocatedBlock=LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275;
>>>     getBlockSize()=77153436; corrupt=false; offset=0;
>>>     locs=[10.152.116.169:50010, 10.153.125.184:50010, 10.152.116.172:50010]}
>>>   isLastBlockComplete=true}
>>> 15/03/30 23:03:43 DEBUG DFSClient: Connecting to datanode 10.152.116.172:50010
>>>
>>> Comparing these logs with Spark 1.2: although the number of
>>> getBlockLocations calls is similar, each call took only 20~30 ms there
>>> (versus 2000~3000 ms now), and the detailed LocatedBlocks info was not
>>> printed.
>>>
>>> Another finding: if I read the Parquet files via Scala code from
>>> spark-shell as below, it looks fine, and the computation returns the
>>> result as quickly as before.
>>>
>>>   sqlContext.parquetFile("data/myparquettable")
>>>
>>> Any idea about it? Thank you!
>>>
>>> --
>>> 郑旭东
>>> Zheng, Xudong
>>
>> --
>> 郑旭东
>> Zheng, Xudong
>
> --
> 郑旭东
> Zheng, Xudong
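For anyone reproducing the workarounds from this thread, a short, untested
spark-shell sketch (the configuration key is the one named above; the
partition count and output path are hypothetical):

  // 1. Fall back to the old (pre-data-source) Parquet code path,
  //    as Cheng suggests.
  sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")

  // 2. Address the ~2000-file folder directly: read the table once, then
  //    rewrite it as a small number of larger part-files with one schema.
  val df = sqlContext.parquetFile("data/myparquettable")
  df.repartition(16).saveAsParquetFile("data/myparquettable-merged")

Rewriting the data this way trades one extra batch job for far fewer
footers to read on every subsequent query.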