Re: Upgrade to parquet 1.6.0
Great, thanks for the extra info!

> On 12 Jun 2015, at 12:41, Cheng Lian wrote:
>
> At the time 1.3.x was released, 1.6.0 hadn't been released yet. And we didn't
> have enough time to upgrade and test Parquet 1.6.0 for Spark 1.4.0. But we've
> already upgraded Parquet to 1.7.0 (which is exactly the same as 1.6.0 with
> package name renamed from com.twitter to org.apache.parquet) on master branch
> recently.
>
> Cheng
>
> On 6/12/15 6:16 PM, Eric Eijkelenboom wrote:
>> Hi
>>
>> What is the reason that Spark still comes with Parquet 1.6.0rc3? It seems
>> like newer Parquet versions are available (e.g. 1.6.0). This would fix
>> problems with ‘spark.sql.parquet.filterPushdown’, which currently is
>> disabled by default, because of a bug in Parquet 1.6.0rc3.
>>
>> Thanks!
>>
>> Eric
Upgrade to parquet 1.6.0
Hi

What is the reason that Spark still comes with Parquet 1.6.0rc3? It seems like newer Parquet versions are available (e.g. 1.6.0). This would fix problems with ‘spark.sql.parquet.filterPushdown’, which currently is disabled by default, because of a bug in Parquet 1.6.0rc3.

Thanks!

Eric
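For reference, the setting named above can be toggled per session. This is a minimal spark-shell sketch, assuming the `sqlContext` that the shell provides; given the Parquet 1.6.0rc3 bug described in this thread, enabling it on that version may give wrong results, so treat it as something to test rather than a recommendation.

```scala
// spark-shell fragment: `sqlContext` is assumed to be the SQLContext created by the shell.
// Turn Parquet filter pushdown on (the thread says it is disabled by default).
sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")

// Read the value back to confirm what is in effect; "false" is only the
// fallback returned when the key has not been set.
println(sqlContext.getConf("spark.sql.parquet.filterPushdown", "false"))
```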
Re: Parquet number of partitions
Funny enough, I observe different behaviour on EC2 vs EMR (Spark on EMR installed with https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark). Both with Spark 1.3.1/Hadoop 2.

Reading a folder with 12 Parquet files gives me the following:

On EC2:

scala> val logs = sqlContext.parquetFile("s3n://mylogs/")
...
scala> logs.rdd.partitions.length
15/05/07 11:45:50 INFO ParquetRelation2: Reading 100.0% of partitions
15/05/07 11:45:51 INFO MemoryStore: ensureFreeSpace(125716) called with curMem=0, maxMem=278302556
15/05/07 11:45:51 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 122.8 KB, free 265.3 MB)
15/05/07 11:45:51 INFO MemoryStore: ensureFreeSpace(19128) called with curMem=125716, maxMem=278302556
15/05/07 11:45:51 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 18.7 KB, free 265.3 MB)
15/05/07 11:45:51 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-10-31-82-233.ec2.internal:39894 (size: 18.7 KB, free: 265.4 MB)
15/05/07 11:45:51 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/05/07 11:45:51 INFO SparkContext: Created broadcast 0 from NewHadoopRDD at newParquet.scala:478
15/05/07 11:45:51 INFO ParquetRelation2$$anon$1$$anon$2: Using Task Side Metadata Split Strategy
res0: Int = 12

On EMR:

scala> val logs = sqlContext.parquetFile("s3n://mylogs/")
...
scala> logs.rdd.partitions.length
15/05/07 11:46:53 INFO parquet.ParquetRelation2: Reading 100.0% of partitions
15/05/07 11:46:53 INFO storage.MemoryStore: ensureFreeSpace(266059) called with curMem=287247, maxMem=6667936727
15/05/07 11:46:53 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 259.8 KB, free 6.2 GB)
15/05/07 11:46:53 INFO storage.MemoryStore: ensureFreeSpace(21188) called with curMem=553306, maxMem=6667936727
15/05/07 11:46:53 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 20.7 KB, free 6.2 GB)
15/05/07 11:46:53 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on ip-10-203-174-61.ec2.internal:52570 (size: 20.7 KB, free: 6.2 GB)
15/05/07 11:46:53 INFO storage.BlockManagerMaster: Updated info of block broadcast_1_piece0
15/05/07 11:46:53 INFO spark.SparkContext: Created broadcast 1 from NewHadoopRDD at newParquet.scala:478
15/05/07 11:46:54 INFO parquet.ParquetRelation2$$anon$1$$anon$2: Using Task Side Metadata Split Strategy
res3: Int = 138

138 (!) partitions on EMR and 12 partitions on EC2 (same as number of files). I’m reading from the exact same folder on S3. This leads me to believe that there might be some configuration settings which control how partitioning happens. Could that be the case?

Insights would be greatly appreciated.

Best,
Eric

> On 07 May 2015, at 09:31, Archit Thakur wrote:
>
> Hi.
> No. of partitions are determined by the RDD it uses in the plan it creates.
> It uses NewHadoopRDD which gives partitions by getSplits of input format it
> is using. It uses FilteringParquetRowInputFormat subclass of
> ParquetInputFormat. To change the no of partitions write a new input format
> and make the NewHadoopRDD use your plan. or if u r ready to shuffle u can use
> repartition api without change of code.
>
> Thanks & Regards.
>
> On Tue, May 5, 2015 at 7:56 PM, Masf <masfwo...@gmail.com> wrote:
> Hi Eric.
>
> Q1:
> When I read parquet files, I've tested that Spark generates as many
> partitions as parquet files exist in the path.
>
> Q2:
> To reduce the number of partitions you can use rdd.repartition(x), x => number
> of partitions. Depending on your case, repartition could be a heavy task.
>
> Regards.
> Miguel.
>
> On Tue, May 5, 2015 at 3:56 PM, Eric Eijkelenboom
> <eric.eijkelenb...@gmail.com> wrote:
> Hello guys
>
> Q1: How does Spark determine the number of partitions when reading a Parquet
> file?
>
> val df = sqlContext.parquetFile(path)
>
> Is it some way related to the number of Parquet row groups in my input?
>
> Q2: How can I reduce this number of partitions? Doing this:
>
> df.rdd.coalesce(200).count
>
> from the spark-shell causes job execution to hang…
>
> Any ideas? Thank you in advance.
>
> Eric
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
> --
> Regards.
> Miguel Ángel
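One possible explanation for 12 vs 138 partitions is the split size, which Hadoop-style input formats usually derive from the block size the filesystem reports. The sketch below is a simplified model of the split rule in Hadoop's FileInputFormat, not the actual code path of the Parquet input format named in this thread. The file sizes and block sizes in `main` are hypothetical, since the thread does not state them.

```scala
object SplitEstimate {
  // Hadoop's FileInputFormat lets the last split be up to 10% larger than the split size.
  private val SplitSlop = 1.1

  /** Split size: the block size clamped to [minSize, maxSize]. */
  def splitSize(blockSize: Long, minSize: Long = 1L, maxSize: Long = Long.MaxValue): Long =
    math.max(minSize, math.min(maxSize, blockSize))

  /** Number of splits for one non-empty, splittable file. */
  def splitsForFile(fileLength: Long, size: Long): Int = {
    require(fileLength > 0 && size > 0, "lengths must be positive")
    var remaining = fileLength
    var count = 0
    // Cut full-size splits while more than 1.1 split sizes remain.
    while (remaining.toDouble / size > SplitSlop) {
      remaining -= size
      count += 1
    }
    // Whatever is left becomes the final split.
    if (remaining > 0) count += 1
    count
  }

  /** Total splits (and so RDD partitions, under this model) for a set of files. */
  def totalSplits(fileLengths: Seq[Long], size: Long): Int =
    fileLengths.map(splitsForFile(_, size)).sum

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    val files = Seq.fill(12)(256 * mb) // hypothetical: 12 files of 256 MB each
    println(totalSplits(files, splitSize(256 * mb))) // one split per file
    println(totalSplits(files, splitSize(32 * mb)))  // many splits per file
  }
}
```

Under this model, the same 12 files give a different partition count whenever the two clusters report a different block size for s3n (for example through `fs.s3n.block.size` in the Hadoop configuration). Whether that is what differs between this EC2 and EMR setup is an assumption worth checking, not something the logs above confirm.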
Parquet number of partitions
Hello guys

Q1: How does Spark determine the number of partitions when reading a Parquet file?

val df = sqlContext.parquetFile(path)

Is it some way related to the number of Parquet row groups in my input?

Q2: How can I reduce this number of partitions? Doing this:

df.rdd.coalesce(200).count

from the spark-shell causes job execution to hang…

Any ideas? Thank you in advance.

Eric
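The two ways of lowering the partition count that come up in this thread can be compared side by side in the spark-shell. This is a sketch, assuming Spark 1.3.x, the `sqlContext` provided by the shell, and the `s3n://mylogs/` placeholder path that appears in a reply to this thread; 200 is just the number from the question.

```scala
// spark-shell fragment: `sqlContext` is assumed to come from the shell.
val df = sqlContext.parquetFile("s3n://mylogs/")

// Partition count as chosen by the input format's splits.
println(df.rdd.partitions.length)

// Option 1: repartition redistributes all rows with a full shuffle into 200 partitions.
val shuffled = df.repartition(200)
println(shuffled.rdd.partitions.length)

// Option 2: coalesce merges existing partitions without a shuffle.
// It can only lower the count, so it has no effect if there are already fewer than 200.
val merged = df.rdd.coalesce(200)
println(merged.partitions.length)
```

Neither call reads any data by itself; the cost shows up when an action such as count() runs.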
Re: Opening many Parquet files = slow
Hi guys

Does anyone know how to stop Spark from opening all Parquet files before starting a job? This is quite a show stopper for me, since I have 5000 Parquet files on S3.

Recap of what I tried:

1. Disable schema merging with:

sqlContext.load("parquet", Map("mergeSchema" -> "false", "path" -> "s3://path/to/folder"))

This opens most files in the folder (17 out of 21 in my small example). For 5000 files on S3, sqlContext.load() takes 30 minutes to complete.

2. Use the old api with:

sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")

Now sqlContext.parquetFile() only opens a few files and prints the schema: so far so good! However, as soon as I run e.g. a count() on the dataframe, Spark still opens all files _before_ starting a job/stage. Effectively this moves the delay from load() to count() (or any other action I presume).

3. Run Spark 1.3.1-rc2.

sqlContext.load() took about 30 minutes for 5000 Parquet files on S3, the same as 1.3.0.

Any help would be greatly appreciated! Thanks a lot.
Eric

> On 10 Apr 2015, at 16:46, Eric Eijkelenboom wrote:
>
> Hi Ted
>
> Ah, I guess the term ‘source’ confused me :)
>
> Doing:
>
> sqlContext.load("parquet", Map("mergeSchema" -> "false", "path" -> "path to a single day of logs"))
>
> for 1 directory with 21 files, Spark opens 17 files:
>
> 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-72' for reading
> 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-72' for reading at position '261573524'
> 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-74' for reading
> 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-77' for reading
> 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening 's3n://mylogs/logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-62' for reading
> 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-74' for reading at position '259256807'
> 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-77' for reading at position '260002042'
> 15/04/10 14:31:42 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2015/mm=2/dd=1/bab8c575-29e7-4456-a1a1-23f8f746e46a-62' for reading at position '260875275'
> etc.
>
> I can’t seem to pass a comma-separated list of directories to load(), so in
> order to load multiple days of logs, I have to point to the root folder and
> depend on auto-partition discovery (unless there’s a smarter way).
>
> Doing:
>
> sqlContext.load("parquet", Map("mergeSchema" -> "false", "path" -> "path to root log dir"))
>
> starts opening what seems like all files (I killed the process after a couple
> of minutes).
>
> Thanks for helping out.
> Eric
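If relying on partition discovery from the root folder is the issue, one thing to try is building the list of per-day directories explicitly and handing them to parquetFile(), as the original post does. The helper below is only a sketch: the `root` value and the `yearKey` parameter are hypothetical (the year key is cut off in the log lines above), and only the `mm=`/`dd=` layout is taken from this thread. It does not by itself avoid the file-opening delay; it only avoids pointing at the root.

```scala
import java.time.LocalDate

object DayPaths {
  /**
   * Builds one directory path per day, starting at `start` and covering `days` days.
   * `yearKey` is a hypothetical name for the year partition column.
   */
  def dayPaths(root: String, yearKey: String, start: LocalDate, days: Int): Seq[String] =
    (0 until days).map { offset =>
      val day = start.plusDays(offset.toLong)
      // Month and day are not zero-padded, matching the mm=8/dd=30 style in the logs.
      s"$root/$yearKey=${day.getYear}/mm=${day.getMonthValue}/dd=${day.getDayOfMonth}"
    }

  def main(args: Array[String]): Unit =
    dayPaths("s3n://mylogs/logs", "year", LocalDate.of(2014, 8, 30), 3).foreach(println)
}
```

The resulting sequence can then be expanded into the varargs of sqlContext.parquetFile(). The exact varargs signature differs between Spark releases, so check the one for your version before relying on this.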
Opening many Parquet files = slow
Hi guys

I’ve got:

180 days of log data in Parquet.
Each day is stored in a separate folder in S3.
Each day consists of 20-30 Parquet files of 256 MB each.
Spark 1.3 on Amazon EMR

This makes approximately 5000 Parquet files with a total size of 1.5 TB.

My code:

val in = sqlContext.parquetFile("day1", "day2", …, "day180")

Problem:

Before the very first stage is started, Spark spends about 25 minutes printing the following:

...
15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=8/dd=30/1b1d213d-101d-45a7-85ff-f1c573bf5ffd-59' for reading at position '258305902'
15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=11/dd=22/2dbc904b-b341-4e9f-a9bb-5b5933c2e101-72' for reading at position '260897108'
15/04/08 13:00:26 INFO s3native.NativeS3FileSystem: Opening 's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=11/dd=2/b0752022-3cd0-47e5-9e67-6ad84543e84b-000124' for reading
15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening key 'logs/=2014/mm=11/dd=2/b0752022-3cd0-47e5-9e67-6ad84543e84b-000124' for reading at position '261259189'
15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening 's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=10/dd=15/bc9c8fdf-dc67-441a-8eda-9a06f032158f-000102' for reading
15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening 's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=8/dd=30/1b1d213d-101d-45a7-85ff-f1c573bf5ffd-60' for reading
15/04/08 13:00:27 INFO s3native.NativeS3FileSystem: Opening 's3n://adt-timelord-daily-logs-pure/logs/=2014/mm=11/dd=22/2dbc904b-b341-4e9f-a9bb-5b5933c2e101-73' for reading
… etc

It looks like Spark is opening each file, before it actually does any work. This means a delay of 25 minutes when working with Parquet files. Previously, we used LZO files and did not experience this problem.

Bonus info:

This also happens when I use auto partition discovery (i.e. sqlContext.parquetFile("/path/to/logsroot/")).

What can I do to avoid this?

Thanks in advance!

Eric Eijkelenboom
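To put the numbers above in perspective, here is a small back-of-the-envelope sketch. It uses only figures from this post (180 days, 20-30 files per day, about 5000 files, about 25 minutes); the assumption that the delay is spread evenly over the files is mine, and the result is a wall-clock average rather than a measured per-file latency.

```scala
object OpenCostEstimate {
  /** Total number of files for `days` folders holding `filesPerDay` files each. */
  def fileCount(days: Int, filesPerDay: Int): Int = days * filesPerDay

  /** Average wall-clock seconds per file when `minutes` elapse while `files` files are opened. */
  def secondsPerFile(minutes: Double, files: Int): Double = {
    require(files > 0, "files must be positive")
    minutes * 60.0 / files
  }

  def main(args: Array[String]): Unit = {
    // Range of file counts implied by 20-30 files per day over 180 days.
    println(s"files: ${fileCount(180, 20)} to ${fileCount(180, 30)}")
    // Average time per file for the reported 25 minutes and roughly 5000 files.
    println(f"seconds per file: ${secondsPerFile(25.0, 5000)}%.2f")
  }
}
```

An average of a fraction of a second per file is small on its own; the delay comes from paying it several thousand times before the first stage starts.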