Funny enough, I observe different behaviour on EC2 vs EMR (Spark on EMR
installed with
https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark
https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark). Both
with Spark 1.3.1/Hadoop 2.
Reading a folder with 12 Parquet gives me the following:
On EC2:
scala val logs = sqlContext.parquetFile(“s3n://mylogs/”)
...
scala logs.rdd.partitions.length
15/05/07 11:45:50 INFO ParquetRelation2: Reading 100.0% of partitions
15/05/07 11:45:51 INFO MemoryStore: ensureFreeSpace(125716) called with
curMem=0, maxMem=278302556
15/05/07 11:45:51 INFO MemoryStore: Block broadcast_0 stored as values in
memory (estimated size 122.8 KB, free 265.3 MB)
15/05/07 11:45:51 INFO MemoryStore: ensureFreeSpace(19128) called with
curMem=125716, maxMem=278302556
15/05/07 11:45:51 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in
memory (estimated size 18.7 KB, free 265.3 MB)
15/05/07 11:45:51 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on
ip-10-31-82-233.ec2.internal:39894 (size: 18.7 KB, free: 265.4 MB)
15/05/07 11:45:51 INFO BlockManagerMaster: Updated info of block
broadcast_0_piece0
15/05/07 11:45:51 INFO SparkContext: Created broadcast 0 from NewHadoopRDD at
newParquet.scala:478
15/05/07 11:45:51 INFO ParquetRelation2$$anon$1$$anon$2: Using Task Side
Metadata Split Strategy
res0: Int = 12
On EMR:
scala val logs = sqlContext.parquetFile(“s3n://mylogs/”)
...
scala logs.rdd.partitions.length
15/05/07 11:46:53 INFO parquet.ParquetRelation2: Reading 100.0% of partitions
15/05/07 11:46:53 INFO storage.MemoryStore: ensureFreeSpace(266059) called with
curMem=287247, maxMem=6667936727
15/05/07 11:46:53 INFO storage.MemoryStore: Block broadcast_1 stored as values
in memory (estimated size 259.8 KB, free 6.2 GB)
15/05/07 11:46:53 INFO storage.MemoryStore: ensureFreeSpace(21188) called with
curMem=553306, maxMem=6667936727
15/05/07 11:46:53 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as
bytes in memory (estimated size 20.7 KB, free 6.2 GB)
15/05/07 11:46:53 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in
memory on ip-10-203-174-61.ec2.internal:52570 (size: 20.7 KB, free: 6.2 GB)
15/05/07 11:46:53 INFO storage.BlockManagerMaster: Updated info of block
broadcast_1_piece0
15/05/07 11:46:53 INFO spark.SparkContext: Created broadcast 1 from
NewHadoopRDD at newParquet.scala:478
15/05/07 11:46:54 INFO parquet.ParquetRelation2$$anon$1$$anon$2: Using Task
Side Metadata Split Strategy
res3: Int = 138
138 (!) partitions on EMR and 12 partitions on EC2 (same as number of files).
I’m reading from the exact same folder on S3.
This leads me to believe that there might be some configuration settings which
control how partitioning happens. Could that be the case?
Insights would be greatly appreciated.
Best, Eric
On 07 May 2015, at 09:31, Archit Thakur archit279tha...@gmail.com wrote:
Hi.
No. of partitions are determined by the RDD it uses in the plan it creates.
It uses NewHadoopRDD which gives partitions by getSplits of input format it
is using. It uses FilteringParquetRowInputFormat subclass of
ParquetInputFormat. To change the no of partitions write a new input format
and make the NewHadoopRDD use your plan. or if u r ready to shuffle u can use
repartition api without change of code.
Thanks Regards.
On Tue, May 5, 2015 at 7:56 PM, Masf masfwo...@gmail.com
mailto:masfwo...@gmail.com wrote:
Hi Eric.
Q1:
When I read parquet files, I've tested that Spark generates so many
partitions as parquet files exist in the path.
Q2:
To reduce the number of partitions you can use rdd.repartition(x), x= number
of partitions. Depend on your case, repartition could be a heavy task
Regards.
Miguel.
On Tue, May 5, 2015 at 3:56 PM, Eric Eijkelenboom
eric.eijkelenb...@gmail.com mailto:eric.eijkelenb...@gmail.com wrote:
Hello guys
Q1: How does Spark determine the number of partitions when reading a Parquet
file?
val df = sqlContext.parquetFile(path)
Is it some way related to the number of Parquet row groups in my input?
Q2: How can I reduce this number of partitions? Doing this:
df.rdd.coalesce(200).count
from the spark-shell causes job execution to hang…
Any ideas? Thank you in advance.
Eric
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
mailto:user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
mailto:user-h...@spark.apache.org
--
Saludos.
Miguel Ángel