Funny enough, I observe different behaviour on EC2 vs EMR (Spark on EMR installed with https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark <https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark>). Both with Spark 1.3.1/Hadoop 2.
Reading a folder with 12 Parquet gives me the following: On EC2: scala> val logs = sqlContext.parquetFile(“s3n://mylogs/”) ... scala> logs.rdd.partitions.length 15/05/07 11:45:50 INFO ParquetRelation2: Reading 100.0% of partitions 15/05/07 11:45:51 INFO MemoryStore: ensureFreeSpace(125716) called with curMem=0, maxMem=278302556 15/05/07 11:45:51 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 122.8 KB, free 265.3 MB) 15/05/07 11:45:51 INFO MemoryStore: ensureFreeSpace(19128) called with curMem=125716, maxMem=278302556 15/05/07 11:45:51 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 18.7 KB, free 265.3 MB) 15/05/07 11:45:51 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-10-31-82-233.ec2.internal:39894 (size: 18.7 KB, free: 265.4 MB) 15/05/07 11:45:51 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 15/05/07 11:45:51 INFO SparkContext: Created broadcast 0 from NewHadoopRDD at newParquet.scala:478 15/05/07 11:45:51 INFO ParquetRelation2$$anon$1$$anon$2: Using Task Side Metadata Split Strategy res0: Int = 12 On EMR: scala> val logs = sqlContext.parquetFile(“s3n://mylogs/”) ... scala> logs.rdd.partitions.length 15/05/07 11:46:53 INFO parquet.ParquetRelation2: Reading 100.0% of partitions 15/05/07 11:46:53 INFO storage.MemoryStore: ensureFreeSpace(266059) called with curMem=287247, maxMem=6667936727 15/05/07 11:46:53 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 259.8 KB, free 6.2 GB) 15/05/07 11:46:53 INFO storage.MemoryStore: ensureFreeSpace(21188) called with curMem=553306, maxMem=6667936727 15/05/07 11:46:53 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 20.7 KB, free 6.2 GB) 15/05/07 11:46:53 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on ip-10-203-174-61.ec2.internal:52570 (size: 20.7 KB, free: 6.2 GB) 15/05/07 11:46:53 INFO storage.BlockManagerMaster: Updated info of block broadcast_1_piece0 15/05/07 11:46:53 INFO spark.SparkContext: Created broadcast 1 from NewHadoopRDD at newParquet.scala:478 15/05/07 11:46:54 INFO parquet.ParquetRelation2$$anon$1$$anon$2: Using Task Side Metadata Split Strategy res3: Int = 138 138 (!) partitions on EMR and 12 partitions on EC2 (same as number of files). I’m reading from the exact same folder on S3. This leads me to believe that there might be some configuration settings which control how partitioning happens. Could that be the case? Insights would be greatly appreciated. Best, Eric > On 07 May 2015, at 09:31, Archit Thakur <archit279tha...@gmail.com> wrote: > > Hi. > No. of partitions are determined by the RDD it uses in the plan it creates. > It uses NewHadoopRDD which gives partitions by getSplits of input format it > is using. It uses FilteringParquetRowInputFormat subclass of > ParquetInputFormat. To change the no of partitions write a new input format > and make the NewHadoopRDD use your plan. or if u r ready to shuffle u can use > repartition api without change of code. > > Thanks & Regards. > > On Tue, May 5, 2015 at 7:56 PM, Masf <masfwo...@gmail.com > <mailto:masfwo...@gmail.com>> wrote: > Hi Eric. > > Q1: > When I read parquet files, I've tested that Spark generates so many > partitions as parquet files exist in the path. > > Q2: > To reduce the number of partitions you can use rdd.repartition(x), x=> number > of partitions. Depend on your case, repartition could be a heavy task > > > Regards. > Miguel. > > On Tue, May 5, 2015 at 3:56 PM, Eric Eijkelenboom > <eric.eijkelenb...@gmail.com <mailto:eric.eijkelenb...@gmail.com>> wrote: > Hello guys > > Q1: How does Spark determine the number of partitions when reading a Parquet > file? > > val df = sqlContext.parquetFile(path) > > Is it some way related to the number of Parquet row groups in my input? > > Q2: How can I reduce this number of partitions? Doing this: > > df.rdd.coalesce(200).count > > from the spark-shell causes job execution to hang… > > Any ideas? Thank you in advance. > > Eric > --------------------------------------------------------------------- > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > <mailto:user-unsubscr...@spark.apache.org> > For additional commands, e-mail: user-h...@spark.apache.org > <mailto:user-h...@spark.apache.org> > > > > > -- > > > Saludos. > Miguel Ángel >