Re: Parquet number of partitions

2015-05-07 Thread Archit Thakur
Hi.
The number of partitions is determined by the RDD used in the plan that
Spark creates. It uses NewHadoopRDD, which derives its partitions from
getSplits of the input format in use, here FilteringParquetRowInputFormat,
a subclass of ParquetInputFormat. To change the number of partitions, write
a new input format and make the NewHadoopRDD in your plan use it. Or, if
you are ready to accept a shuffle, you can use the repartition API without
changing any code.

Thanks & Regards.
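
The two options above (inspecting the partitions that getSplits produced,
then merging or reshuffling them) can be sketched as follows. This is a
minimal sketch meant for the spark-shell of Spark 1.3.x, where sc and
sqlContext are predefined; the path and the target of 200 partitions are
placeholders, not values confirmed anywhere in this thread.

```scala
// Run inside spark-shell (Spark 1.3.x); sc and sqlContext already exist there.
val path = "/path/to/parquet"   // hypothetical location, replace with your own
val df = sqlContext.parquetFile(path)

// Partition count as produced by the input format's getSplits.
println(df.rdd.partitions.length)

// coalesce merges existing partitions without a shuffle (narrow dependency).
// Without shuffle it can only lower the partition count, never raise it.
val merged = df.rdd.coalesce(200)

// repartition always shuffles; it is coalesce(n, shuffle = true) underneath.
val reshuffled = df.rdd.repartition(200)

println(merged.partitions.length)
println(reshuffled.partitions.length)
```

coalesce is usually the cheaper choice when the goal is only to reduce the
number of partitions; repartition costs a full shuffle but spreads the rows
evenly.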



Re: Parquet number of partitions

2015-05-07 Thread Eric Eijkelenboom
Funnily enough, I observe different behaviour on EC2 vs EMR (Spark on EMR
installed with
https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark). Both
with Spark 1.3.1/Hadoop 2.

Reading a folder with 12 Parquet files gives me the following:

On EC2:
scala> val logs = sqlContext.parquetFile("s3n://mylogs/")
...
scala> logs.rdd.partitions.length
15/05/07 11:45:50 INFO ParquetRelation2: Reading 100.0% of partitions
15/05/07 11:45:51 INFO MemoryStore: ensureFreeSpace(125716) called with curMem=0, maxMem=278302556
15/05/07 11:45:51 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 122.8 KB, free 265.3 MB)
15/05/07 11:45:51 INFO MemoryStore: ensureFreeSpace(19128) called with curMem=125716, maxMem=278302556
15/05/07 11:45:51 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 18.7 KB, free 265.3 MB)
15/05/07 11:45:51 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-10-31-82-233.ec2.internal:39894 (size: 18.7 KB, free: 265.4 MB)
15/05/07 11:45:51 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/05/07 11:45:51 INFO SparkContext: Created broadcast 0 from NewHadoopRDD at newParquet.scala:478
15/05/07 11:45:51 INFO ParquetRelation2$$anon$1$$anon$2: Using Task Side Metadata Split Strategy
res0: Int = 12

On EMR:
scala> val logs = sqlContext.parquetFile("s3n://mylogs/")
...
scala> logs.rdd.partitions.length
15/05/07 11:46:53 INFO parquet.ParquetRelation2: Reading 100.0% of partitions
15/05/07 11:46:53 INFO storage.MemoryStore: ensureFreeSpace(266059) called with curMem=287247, maxMem=6667936727
15/05/07 11:46:53 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 259.8 KB, free 6.2 GB)
15/05/07 11:46:53 INFO storage.MemoryStore: ensureFreeSpace(21188) called with curMem=553306, maxMem=6667936727
15/05/07 11:46:53 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 20.7 KB, free 6.2 GB)
15/05/07 11:46:53 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on ip-10-203-174-61.ec2.internal:52570 (size: 20.7 KB, free: 6.2 GB)
15/05/07 11:46:53 INFO storage.BlockManagerMaster: Updated info of block broadcast_1_piece0
15/05/07 11:46:53 INFO spark.SparkContext: Created broadcast 1 from NewHadoopRDD at newParquet.scala:478
15/05/07 11:46:54 INFO parquet.ParquetRelation2$$anon$1$$anon$2: Using Task Side Metadata Split Strategy
res3: Int = 138

138 (!) partitions on EMR and 12 partitions on EC2 (same as number of files). 
I’m reading from the exact same folder on S3.

This leads me to believe that there might be some configuration settings which 
control how partitioning happens. Could that be the case?

Insights would be greatly appreciated. 

Best, Eric
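
One possible explanation, not confirmed anywhere in this thread: the log line
"Using Task Side Metadata Split Strategy" suggests that the splits come from
Hadoop's generic FileInputFormat logic, where the split size is
max(minSize, min(maxSize, blockSize)). If the two clusters report different
block sizes for s3n (for example through fs.s3n.block.size) or set different
values for mapreduce.input.fileinputformat.split.minsize / split.maxsize, the
same 12 files would be cut into different numbers of splits. The sketch below
reproduces only that arithmetic in plain Scala; the file sizes and block sizes
are made-up values for illustration, not measurements from the clusters above.

```scala
// Sketch of the split arithmetic in Hadoop's FileInputFormat ("mapreduce" API).
// All sizes used in main are hypothetical and chosen only for illustration.
object SplitEstimate {
  // Hadoop lets the last split be up to 10% larger than splitSize.
  val SplitSlop = 1.1

  // Mirrors FileInputFormat.computeSplitSize.
  def computeSplitSize(blockSize: Long, minSize: Long, maxSize: Long): Long =
    math.max(minSize, math.min(maxSize, blockSize))

  // Number of splits for one splittable file of the given length in bytes.
  def splitsForFile(length: Long, splitSize: Long): Int = {
    var remaining = length
    var count = 0
    while (remaining.toDouble / splitSize > SplitSlop) {
      count += 1
      remaining -= splitSize
    }
    if (remaining > 0) count += 1 // the tail becomes the last split
    count
  }

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    // Hypothetical input: 12 files of 700 MB each.
    val files = Seq.fill(12)(700 * mb)
    for (blockSize <- Seq(64 * mb, 1024 * mb)) {
      // Hadoop defaults: minSize = 1, maxSize = Long.MaxValue.
      val splitSize = computeSplitSize(blockSize, minSize = 1L, maxSize = Long.MaxValue)
      val total = files.map(splitsForFile(_, splitSize)).sum
      println(s"blockSize=${blockSize / mb} MB -> $total splits")
    }
  }
}
```

Comparing sc.hadoopConfiguration.get("fs.s3n.block.size") and the two
split-size properties on both clusters would be a quick way to check whether
this is what happens here.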






Re: Parquet number of partitions

2015-05-05 Thread Masf
Hi Eric.

Q1:
When I read Parquet files, I have observed that Spark generates as many
partitions as there are Parquet files in the path.

Q2:
To reduce the number of partitions you can use rdd.repartition(x), where x
is the number of partitions. Depending on your case, repartition can be a
heavy task because it shuffles the data.


Regards.
Miguel.

-- 


Saludos.
Miguel Ángel


Parquet number of partitions

2015-05-05 Thread Eric Eijkelenboom
Hello guys

Q1: How does Spark determine the number of partitions when reading a Parquet 
file?

val df = sqlContext.parquetFile(path)

Is it in some way related to the number of Parquet row groups in my input?

Q2: How can I reduce this number of partitions? Doing this:

df.rdd.coalesce(200).count

from the spark-shell causes job execution to hang… 

Any ideas? Thank you in advance. 

Eric
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org