There's no need to guess at that. The docs tell you directly:

    def countByValue(): Map[T, Long]
    Return the count of each unique value in this RDD as a map of
    (value, count) pairs. The final combine step happens locally on the
    master, equivalent to running a single reduce task.

On Tue, Oct 22, 2013 at 7:22 AM, Shay Seng <[email protected]> wrote:

> Hi Matei,
>
> I've seen several memory-tuning queries on this mailing list, and have
> also heard the same kinds of queries at the Spark meetup. In fact, the
> last bullet point in Josh Carver's(?) slides, the guy from Bizo, was
> "memory tuning is still a mystery".
>
> I certainly had lots of issues when I first started. From memory issues
> to GC issues, things seem to run fine until you try something with 500GB
> of data, etc.
>
> I was wondering if you could write up a little white paper or some
> guidelines on how to set memory values, and what to look at when
> something goes wrong? E.g., I would never have guessed that countByValue
> happens on a single machine.
>
> On Oct 21, 2013 6:18 PM, "Matei Zaharia" <[email protected]> wrote:
>
>> Hi there,
>>
>> The problem is that countByValue happens in only a single reduce task --
>> this is probably something we should fix, but it's basically not
>> designed for lots of values. Instead, do the count in parallel as
>> follows:
>>
>> val counts = mapped.map(str => (str, 1)).reduceByKey((a, b) => a + b)
>>
>> If this still has trouble, you can also increase the level of
>> parallelism of reduceByKey by passing it a second parameter for the
>> number of tasks (e.g. 100).
>>
>> BTW, one other small thing with your code: flatMap should actually work
>> fine if your function returns an Iterator or Traversable, so there's no
>> need to call toList and return a Seq in ngrams; you can just return an
>> Iterator[String].
>>
>> Matei
>>
>> On Oct 21, 2013, at 1:05 PM, Timothy Perrigo <[email protected]> wrote:
>>
>> > Hi everyone,
>> > I am very new to Spark, so as a learning exercise I've set up a small
>> > cluster consisting of 4 EC2 m1.large instances (1 master, 3 slaves),
>> > which I'm hoping to use to calculate ngram frequencies from text
>> > files of various sizes (I'm not doing anything with them; I just
>> > thought this would be slightly more interesting than the usual 'word
>> > count' example). Currently, I'm trying to work with a 1GB text file,
>> > but running into memory issues. I'm wondering what parameters I
>> > should be setting (in spark-env.sh) in order to properly utilize the
>> > cluster. Right now, I'd be happy just to have the process complete
>> > successfully with the 1 gig file, so I'd really appreciate any
>> > suggestions you all might have.
>> >
>> > Here's a summary of the code I'm running through the spark shell on
>> > the master:
>> >
>> > def ngrams(s: String, n: Int = 3): Seq[String] = {
>> >   (s.split("\\s+").sliding(n)).filter(_.length == n).map(_.mkString(" ")).map(_.trim).toList
>> > }
>> >
>> > val text = sc.textFile("s3n://my-bucket/my-1gb-text-file")
>> >
>> > val mapped = text.filter(_.trim.length > 0).flatMap(ngrams(_, 3))
>> >
>> > So far so good; the problems come during the reduce phase. With
>> > small files, I was able to issue the following to calculate the most
>> > frequently occurring trigram:
>> >
>> > val topNgram = (mapped countByValue) reduce((a:(String, Long),
>> > b:(String, Long)) => if (a._2 > b._2) a else b)
>> >
>> > With the 1 gig file, though, I've been running into OutOfMemory
>> > errors, so I decided to split the reduction into several steps,
>> > starting with simply issuing countByValue on my "mapped" RDD, but I
>> > have yet to get it to complete successfully.
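To make that concrete, here is a minimal sketch combining the two
suggestions from Matei's reply above: count in parallel with reduceByKey,
then take the maximum with a small final reduce. The 100 reduce tasks are
purely illustrative, not a tuned value.

    // Count each trigram in parallel across the cluster; the second
    // argument to reduceByKey sets the number of reduce tasks.
    val counts = mapped.map(str => (str, 1)).reduceByKey((a, b) => a + b, 100)

    // The (ngram, count) pairs are small, so a single distributed reduce
    // is cheap; keep whichever pair has the larger count.
    val topNgram = counts.reduce((a, b) => if (a._2 > b._2) a else b)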
>> >
>> > SPARK_MEM is currently set to 6154m. I also bumped up the
>> > spark.akka.frameSize setting to 500 (though at this point I was
>> > grasping at straws; I'm not sure what a "proper" value would be).
>> > What properties should I be setting for a job of this size on a
>> > cluster of 3 m1.large slaves? (The cluster was initially configured
>> > using the spark-ec2 scripts.) Also, programmatically, what should I
>> > be doing differently? (For example, should I be setting the minimum
>> > number of splits when reading the text file? If so, what would be a
>> > good default?)
>> >
>> > I apologize for what I'm sure are very naive questions. I think
>> > Spark is a fantastic project and have enjoyed working with it, but
>> > I'm still very much a newbie and would appreciate any help you all
>> > can provide (as well as any 'rules of thumb' or best practices I
>> > should be following).
>> >
>> > Thanks,
>> > Tim Perrigo
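Here is a sketch of the ngrams helper rewritten the way Matei suggests:
sliding() already produces an Iterator, so dropping the toList call lets
flatMap consume the windows lazily instead of materializing a List for
every line.

    // Same logic as the original helper, minus toList; returns
    // Iterator[String] directly so flatMap can stream through it.
    def ngrams(s: String, n: Int = 3): Iterator[String] =
      s.split("\\s+").sliding(n).filter(_.length == n).map(_.mkString(" ").trim)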

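On the minimum-splits question at the end of the thread, a hedged sketch:
textFile takes an optional second argument for the minimum number of input
splits, which bounds the parallelism of the map side. The value 64 below
is a placeholder, not a recommendation; the usual rule of thumb is a small
multiple (2-3x) of the total number of cores in the cluster.

    // Ask for at least 64 input splits instead of the default (roughly
    // one per file block), so the 1 GB file is processed in smaller tasks.
    val text = sc.textFile("s3n://my-bucket/my-1gb-text-file", 64)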