On the other hand, I totally agree that memory usage in Spark is rather
opaque, and it is one area where we could do a lot better at communicating
issues, through both docs and instrumentation. At least with serialization
and such, you can get meaningful exceptions (hopefully), but OOMs are just
a blanket "something wasn't right somewhere." Debugging them empirically
would require a deep dive into Spark's heap allocations, which demands far
more knowledge of Spark internals than should be needed for general usage.


On Tue, Oct 22, 2013 at 9:22 AM, Mark Hamstra <m...@clearstorydata.com> wrote:

> Yes, but that also illustrates the problem faced by anyone trying to write
> a "little white paper or guide lines" to make newbies' experience painless.
>  Distributed computing clusters are necessarily complex things, and
> problems can crop up in multiple locations, layers or subsystems.  It's
> just not feasible to quickly bring up to speed someone with no experience
> in distributed programming and cluster systems.  It takes a lot of
> knowledge, both broad and deep.  Very few people have the complete scope of
> knowledge and experience required, so creating, debugging and maintaining a
> cluster computing application almost always has to be a team effort.
>
> Support organizations and communities can replace some of the need for a
> knowledgeable and well-functioning team, but not all of it; and at some
> point you have to expect that debugging is going to take a considerable
> amount of painstaking, systematic effort -- including a close reading of
> the available docs.
>
> Several people are working on making more and better reference and
> training material available, and some of that will include trouble-shooting
> guidance, but that doesn't mean that there can ever be "one little paper"
> to solve newbies' (or more experienced developers') problems or provide
> adequate guidance.  There's just too much to cover and too many different
> kinds or levels of initial-user knowledge to make that completely feasible.
>
>
>
> On Tue, Oct 22, 2013 at 8:50 AM, Shay Seng <s...@1618labs.com> wrote:
>
>> Hey Mark, I didn't mean to say that the information isn't out there --
>> just that when something goes wrong with Spark, the scope of what could be
>> wrong is so large: a bad JVM setting, the serializer, Akka, badly written
>> Scala code, a wrong algorithm, check the worker logs, check the executor
>> stderrs, ....
>>
>> When I looked at this post this morning, my initial thought wasn't that
>> "countByValue" would be at fault -- probably since I've only been using
>> Scala/Spark for a month or so.
>>
>> It was just a suggestion to help newbies come up to speed more quickly
>> and gain insights into how to debug issues.
>>
>>
>> On Tue, Oct 22, 2013 at 8:14 AM, Mark Hamstra <m...@clearstorydata.com> wrote:
>>
>>> There's no need to guess at that.  The docs tell you directly:
>>>
>>> def countByValue(): Map[T, Long]
>>>
>>> Return the count of each unique value in this RDD as a map of (value,
>>> count) pairs. The final combine step happens locally on the master,
>>> equivalent to running a single reduce task.
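>>>
>>> In code, that means something like this (just a sketch of the semantics,
>>> not the actual implementation, using the "mapped" RDD from the original
>>> post):
>>>
>>> // countByValue hands you back a local Map, built on the master:
>>> val localCounts: Map[String, Long] = mapped.countByValue()
>>>
>>> // keeping the counts as an RDD avoids pulling every distinct value
>>> // onto one machine:
>>> val distributedCounts = mapped.map(v => (v, 1L)).reduceByKey(_ + _)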
>>>
>>>
>>>
>>> On Tue, Oct 22, 2013 at 7:22 AM, Shay Seng <s...@1618labs.com> wrote:
>>>
>>>> Hi Matei,
>>>>
>>>> I've seen several memory-tuning queries on this mailing list, and also
>>>> heard the same kinds of questions at the Spark meetup. In fact, the last
>>>> bullet point in the slides from Josh Carver(?), the guy from Bizo, was
>>>> "memory tuning is still a mystery".
>>>>
>>>> I certainly had lots of issues when I first started, from memory issues
>>>> to GC issues; things seem to run fine until you try something with 500GB
>>>> of data, etc.
>>>>
>>>> I was wondering if you could write up a little white paper or some
>>>> guidelines on how to set memory values, and what to look at when something
>>>> goes wrong? E.g., I would never have guessed that countByValue happens on
>>>> a single machine, etc.
>>>>  On Oct 21, 2013 6:18 PM, "Matei Zaharia" <matei.zaha...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi there,
>>>>>
>>>>> The problem is that countByValue happens in only a single reduce task --
>>>>> this is probably something we should fix, but it's basically not designed
>>>>> for lots of values. Instead, do the count in parallel as follows:
>>>>>
>>>>> val counts = mapped.map(str => (str, 1)).reduceByKey((a, b) => a + b)
>>>>>
>>>>> If this still has trouble, you can also increase the level of
>>>>> parallelism of reduceByKey by passing it a second parameter for the number
>>>>> of tasks (e.g. 100).
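>>>>>
>>>>> For example, something like this (just a sketch; 100 is an arbitrary
>>>>> number of reduce tasks, not a tuned value):
>>>>>
>>>>> val counts = mapped.map(str => (str, 1)).reduceByKey((a, b) => a + b, 100)
>>>>>
>>>>> // then the most frequent ngram, without collecting all counts to the driver:
>>>>> val topNgram = counts.reduce((a, b) => if (a._2 > b._2) a else b)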
>>>>>
>>>>> BTW, one other small thing with your code: flatMap should actually work
>>>>> fine if your function returns an Iterator or Traversable, so there's no
>>>>> need to call toList and return a Seq in ngrams; you can just return an
>>>>> Iterator[String].
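>>>>>
>>>>> In other words, something like this (just a sketch of the same ngrams
>>>>> logic, minus the toList at the end):
>>>>>
>>>>> def ngrams(s: String, n: Int = 3): Iterator[String] =
>>>>>   s.split("\\s+").sliding(n).filter(_.length == n).map(_.mkString(" ").trim)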
>>>>>
>>>>> Matei
>>>>>
>>>>> On Oct 21, 2013, at 1:05 PM, Timothy Perrigo <tperr...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> > Hi everyone,
>>>>> > I am very new to Spark, so as a learning exercise I've set up a
>>>>> small cluster consisting of 4 EC2 m1.large instances (1 master, 3 slaves),
>>>>> which I'm hoping to use to calculate ngram frequencies from text files of
>>>>> various sizes (I'm not doing anything with them; I just thought this would
>>>>> be slightly more interesting than the usual 'word count' example).
>>>>>  Currently, I'm trying to work with a 1GB text file, but running into
>>>>> memory issues.  I'm wondering what parameters I should be setting (in
>>>>> spark-env.sh) in order to properly utilize the cluster.  Right now, I'd be
>>>>> happy just to have the process complete successfully with the 1 gig file,
>>>>> so I'd really appreciate any suggestions you all might have.
>>>>> >
>>>>> > Here's a summary of the code I'm running through the spark shell on
>>>>> the master:
>>>>> >
>>>>> > def ngrams(s: String, n: Int = 3): Seq[String] = {
>>>>> >   (s.split("\\s+").sliding(n)).filter(_.length == n)
>>>>> >     .map(_.mkString(" ")).map(_.trim).toList
>>>>> > }
>>>>> >
>>>>> > val text = sc.textFile("s3n://my-bucket/my-1gb-text-file")
>>>>> >
>>>>> > val mapped = text.filter(_.trim.length > 0).flatMap(ngrams(_, 3))
>>>>> >
>>>>> > So far so good; the problems come during the reduce phase.  With
>>>>> small files, I was able to issue the following to calculate the most
>>>>> frequently occurring trigram:
>>>>> >
>>>>> > val topNgram = (mapped countByValue) reduce((a: (String, Long),
>>>>> >   b: (String, Long)) => if (a._2 > b._2) a else b)
>>>>> >
>>>>> > With the 1 gig file, though, I've been running into OutOfMemory
>>>>> > errors, so I decided to split the reduction into several steps, starting
>>>>> > with simply issuing countByValue on my "mapped" RDD, but I have yet to
>>>>> > get it to complete successfully.
>>>>> >
>>>>> > SPARK_MEM is currently set to 6154m.  I also bumped up the
>>>>> > spark.akka.framesize setting to 500 (though at this point, I was grasping
>>>>> > at straws; I'm not sure what a "proper" value would be).  What properties
>>>>> > should I be setting for a job of this size on a cluster of 3 m1.large
>>>>> > slaves? (The cluster was initially configured using the spark-ec2
>>>>> > scripts).  Also, programmatically, what should I be doing differently?
>>>>> > (For example, should I be setting the minimum number of splits when
>>>>> > reading the text file?  If so, what would be a good default?).
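>>>>> >
>>>>> > (To make that last question concrete, I imagine it would be something
>>>>> > like the line below, with 100 as a completely arbitrary guess -- I'm not
>>>>> > sure whether that second argument is even the right knob to turn.)
>>>>> >
>>>>> > val text = sc.textFile("s3n://my-bucket/my-1gb-text-file", 100)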
>>>>> >
>>>>> > I apologize for what I'm sure are very naive questions.  I think
>>>>> Spark is a fantastic project and have enjoyed working with it, but I'm
>>>>> still very much a newbie and would appreciate any help you all can provide
>>>>> (as well as any 'rules-of-thumb' or best practices I should be following).
>>>>> >
>>>>> > Thanks,
>>>>> > Tim Perrigo
>>>>>
>>>>>
>>>
>>
>
