Re: sparkcontext.objectFile return thousands of partitions

2015-01-22 Thread Imran Rashid
I think you should also just be able to provide an input format that never
splits the input data.  This has come up before on the list, but I couldn't
find it.*

I think this should work, but I can't try it out at the moment.  Can you
please try and let us know if it works?

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// a TextInputFormat that never splits its input files, so each file is
// read as a single partition
class TextFormatNoSplits extends TextInputFormat {
  override def isSplitable(fs: FileSystem, file: Path): Boolean = false
}

def textFileNoSplits(sc: SparkContext, path: String): RDD[String] = {
  // note this is just a copy of sc.textFile, with a different InputFormatClass
  sc.hadoopFile(path, classOf[TextFormatNoSplits], classOf[LongWritable], classOf[Text])
    .map(pair => pair._2.toString)
    .setName(path)
}
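
If the above compiles, usage would look something like this (a quick, untested
sketch):

val rdd = textFileNoSplits(sc, "file:///tmp/mydir")
// with splitting disabled, each input file becomes exactly one partition
println(rdd.partitions.length)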


* yes I realize the irony given the recent discussion about mailing list
vs. stackoverflow ...

On Thu, Jan 22, 2015 at 11:01 AM, Sean Owen  wrote:

> Yes, that second argument is what I was referring to, but yes it's a
> *minimum*, oops, right. OK, you will want to coalesce then, indeed.
>
> On Thu, Jan 22, 2015 at 6:51 PM, Wang, Ningjun (LNG-NPV)
>  wrote:
> > If you know that this number is too high you can request a number of
> > partitions when you read it.
> >
> >
> >
> > How do I do that? Can you give a code snippet? I want to read it into 8
> > partitions, so I do
> >
> >
> >
> > val rdd2 = sc.objectFile[LabeledPoint]("file:///tmp/mydir", 8)
> >
> > However, rdd2 contains thousands of partitions instead of 8.
> >
>


Re: sparkcontext.objectFile return thousands of partitions

2015-01-22 Thread Sean Owen
Yes, that second argument is what I was referring to, but yes it's a
*minimum*, oops, right. OK, you will want to coalesce then, indeed.
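
For example (a rough sketch, untested; it assumes Spark 1.x and that the saved
objects are LabeledPoint):

import org.apache.spark.mllib.regression.LabeledPoint

// minPartitions is only a lower bound, so read first, then coalesce down to 8
val rdd2 = sc.objectFile[LabeledPoint]("file:///tmp/mydir").coalesce(8)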

On Thu, Jan 22, 2015 at 6:51 PM, Wang, Ningjun (LNG-NPV)
 wrote:
> If you know that this number is too high you can request a number of
> partitions when you read it.
>
>
>
> How do I do that? Can you give a code snippet? I want to read it into 8
> partitions, so I do
>
>
>
> val rdd2 = sc.objectFile[LabeledPoint]("file:///tmp/mydir", 8)
>
> However, rdd2 contains thousands of partitions instead of 8.
>




RE: sparkcontext.objectFile return thousands of partitions

2015-01-22 Thread Wang, Ningjun (LNG-NPV)
Sean

You said


> If you know that this number is too high you can request a number of
> partitions when you read it.

How do I do that? Can you give a code snippet? I want to read it into 8
partitions, so I do

val rdd2 = sc.objectFile[LabeledPoint]("file:///tmp/mydir", 8)

However, rdd2 contains thousands of partitions instead of 8.


Regards,

Ningjun Wang
Consulting Software Engineer
LexisNexis
121 Chanlon Road
New Providence, NJ 07974-1541

From: Sean Owen [mailto:so...@cloudera.com]
Sent: Wednesday, January 21, 2015 2:32 PM
To: Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.org
Subject: Re: sparkcontext.objectFile return thousands of partitions


You have 8 files, not 8 partitions. It does not follow that they should be read
as 8 partitions, since they are presumably large and you would then be stuck
using at most 8 tasks in parallel to process them. The number of partitions is
determined by Hadoop input splits, which generally yield one partition per
block of data. If you know that this number is too high, you can request a
number of partitions when you read it. Don't coalesce; just read the desired
number from the start.
On Jan 21, 2015 4:32 PM, "Wang, Ningjun (LNG-NPV)"
<ningjun.w...@lexisnexis.com> wrote:
Why does sc.objectFile(…) return an RDD with thousands of partitions?

I save an RDD to the file system using

rdd.saveAsObjectFile("file:///tmp/mydir")

Note that the RDD contains 7 million objects. I checked the directory
/tmp/mydir/; it contains 8 partitions:

part-0  part-2  part-4  part-6  _SUCCESS
part-1  part-3  part-5  part-7

I then load the RDD back using

val rdd2 = sc.objectFile[LabeledPoint]("file:///tmp/mydir", 8)

I expect rdd2 to have 8 partitions. But from the master UI, I see that rdd2 has
over 1000 partitions. This is very inefficient. How can I limit it to 8
partitions, matching what is stored on the file system?

Regards,

Ningjun Wang
Consulting Software Engineer
LexisNexis
121 Chanlon Road
New Providence, NJ 07974-1541



Re: sparkcontext.objectFile return thousands of partitions

2015-01-21 Thread Sean Owen
You have 8 files, not 8 partitions. It does not follow that they should be
read as 8 partitions, since they are presumably large and you would then be
stuck using at most 8 tasks in parallel to process them. The number of
partitions is determined by Hadoop input splits, which generally yield one
partition per block of data. If you know that this number is too high, you can
request a number of partitions when you read it. Don't coalesce; just read the
desired number from the start.
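
For reference, a minimal sketch of the read Sean describes (untested; note,
per his follow-up of Jan 22 in this thread, that the second argument of
objectFile is a minimum partition count, not an exact one):

import org.apache.spark.mllib.regression.LabeledPoint

// the second argument is minPartitions, a lower bound; Hadoop input splits
// can still produce many more partitions than this
val rdd2 = sc.objectFile[LabeledPoint]("file:///tmp/mydir", 8)
println(rdd2.partitions.length)  // may still be much larger than 8
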
On Jan 21, 2015 4:32 PM, "Wang, Ningjun (LNG-NPV)" <
ningjun.w...@lexisnexis.com> wrote:

>  Why does sc.objectFile(…) return an RDD with thousands of partitions?
>
>
>
> I save an RDD to the file system using
>
>
>
> rdd.saveAsObjectFile("file:///tmp/mydir")
>
>
>
> Note that the RDD contains 7 million objects. I checked the directory
> /tmp/mydir/; it contains 8 partitions:
>
>
>
> part-0  part-2  part-4  part-6  _SUCCESS
>
> part-1  part-3  part-5  part-7
>
>
>
> I then load the RDD back using
>
>
>
> val rdd2 = sc.objectFile[LabeledPoint]("file:///tmp/mydir", 8)
>
>
>
> I expect rdd2 to have 8 partitions. But from the master UI, I see that
> rdd2 has over 1000 partitions. This is very inefficient. How can I limit it
> to 8 partitions, matching what is stored on the file system?
>
>
>
> Regards,
>
>
>
> Ningjun Wang
>
> Consulting Software Engineer
>
> LexisNexis
>
> 121 Chanlon Road
>
> New Providence, NJ 07974-1541
>
>
>


Re: sparkcontext.objectFile return thousands of partitions

2015-01-21 Thread Noam Barcay
Maybe each of the file parts has many blocks? Did you try RDD.coalesce to
reduce the number of partitions? It can be done with or without a data shuffle.
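
A quick sketch of both variants (untested; rdd2 being the RDD loaded via
sc.objectFile):

// coalesce without a shuffle merges parent partitions, avoiding data movement
val fewer = rdd2.coalesce(8)
// coalesce with shuffle = true redistributes records for evenly sized partitions
val balanced = rdd2.coalesce(8, shuffle = true)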

Noam Barcay
Developer // Kenshoo
Office +972 3 746-6500 *427 // Mobile +972 54 475-3142
www.Kenshoo.com

On Wed, Jan 21, 2015 at 5:31 PM, Wang, Ningjun (LNG-NPV) <
ningjun.w...@lexisnexis.com> wrote:

>  Why does sc.objectFile(…) return an RDD with thousands of partitions?
>
>
>
> I save an RDD to the file system using
>
>
>
> rdd.saveAsObjectFile("file:///tmp/mydir")
>
>
>
> Note that the RDD contains 7 million objects. I checked the directory
> /tmp/mydir/; it contains 8 partitions:
>
>
>
> part-0  part-2  part-4  part-6  _SUCCESS
>
> part-1  part-3  part-5  part-7
>
>
>
> I then load the RDD back using
>
>
>
> val rdd2 = sc.objectFile[LabeledPoint]("file:///tmp/mydir", 8)
>
>
>
> I expect rdd2 to have 8 partitions. But from the master UI, I see that
> rdd2 has over 1000 partitions. This is very inefficient. How can I limit it
> to 8 partitions, matching what is stored on the file system?
>
>
>
> Regards,
>
>
>
> Ningjun Wang
>
> Consulting Software Engineer
>
> LexisNexis
>
> 121 Chanlon Road
>
> New Providence, NJ 07974-1541
>
>
>
