Whoops, you are right. Sorry for the misinformation. Indeed, reduceByKey just calls combineByKey:
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
  combineByKey[V]((v: V) => v, func, func, partitioner)
}

(I think I confused reduceByKey with groupByKey.)

On Wed, Apr 30, 2014 at 2:55 AM, Liu, Raymond <raymond....@intel.com> wrote:
> Hi Daniel
>
> Thanks for your reply. I think reduceByKey will also do a map-side
> combine, so the result should be the same: for each partition, one entry
> per distinct word. In my case, with JavaSerializer, the 240MB dataset
> yields around 70MB of shuffle data. Only the Shuffle Spill (Memory)
> metric is abnormal, and it seems to me the spill should not trigger at
> all. And, by the way, this behavior only occurs on the map side; on the
> reduce / shuffle fetch side, this strange behavior does not happen.
>
> Best Regards,
> Raymond Liu
>
> From: Daniel Darabos [mailto:daniel.dara...@lynxanalytics.com]
>
> I have no idea why the shuffle spill is so large. But this might make it
> smaller:
>
> val addition = (a: Int, b: Int) => a + b
> val wordsCount = wordsPair.combineByKey(identity, addition, addition)
>
> This way only one entry per distinct word will end up in the shuffle for
> each partition, instead of one entry per word occurrence.
>
> On Tue, Apr 29, 2014 at 7:48 AM, Liu, Raymond <raymond....@intel.com> wrote:
> Hi Patrick
>
> I am just doing a simple word count; the data is generated by the Hadoop
> random text writer.
>
> This does not seem to be related to compression. If I turn off
> compression on shuffle, the metrics look like the following for the
> smaller 240MB dataset.
> Executor ID  Address      Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Shuffle Read  Shuffle Write  Shuffle Spill (Memory)  Shuffle Spill (Disk)
> 10           sr437:48527  35 s       8            0             8                0.0 B         2.5 MB         2.2 GB                  1291.2 KB
> 12           sr437:46077  34 s       8            0             8                0.0 B         2.5 MB         1822.6 MB               1073.3 KB
> 13           sr434:37896  31 s       8            0             8                0.0 B         2.4 MB         1099.2 MB               621.2 KB
> 15           sr438:52819  31 s       8            0             8                0.0 B         2.5 MB         1898.8 MB               1072.6 KB
> 16           sr434:37103  32 s       8            0             8                0.0 B         2.4 MB         1638.0 MB               1044.6 KB
>
> And the program is pretty simple:
>
> val files = sc.textFile(args(1))
> val words = files.flatMap(_.split(" "))
> val wordsPair = words.map(x => (x, 1))
>
> val wordsCount = wordsPair.reduceByKey(_ + _)
> val count = wordsCount.count()
>
> println("Number of words = " + count)
>
> Best Regards,
> Raymond Liu
>
> From: Patrick Wendell [mailto:pwend...@gmail.com]
>
> Could you explain more about what your job is doing and what data types
> you are using? These numbers alone don't necessarily indicate that
> something is wrong. The relationship between the in-memory and on-disk
> shuffle amounts is definitely a bit strange; the data gets compressed
> when written to disk, but unless you have a weird dataset (e.g. all
> zeros) I wouldn't expect it to compress _that_ much.
>
> On Mon, Apr 28, 2014 at 1:18 AM, Liu, Raymond <raymond....@intel.com> wrote:
> Hi
>
> I am running a simple word count program on a Spark standalone cluster.
> The cluster is made up of 6 nodes, each running 4 workers; each worker
> owns 10GB of memory and each node has 16 cores, for 96 cores and 240GB
> of memory in total. (It was also previously configured as 1 worker with
> 40GB of memory on each node.)
>
> I ran a very small data set (2.4GB on HDFS in total) to confirm the
> problem, as below.
>
> As you can read from the task metrics below, the shuffle spill part of
> the metrics indicates that something is wrong.
> Executor ID  Address      Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Shuffle Read  Shuffle Write  Shuffle Spill (Memory)  Shuffle Spill (Disk)
> 0            sr437:42139  29 s       4            0             4                0.0 B         4.3 MB         23.6 GB                 4.3 MB
> 1            sr433:46935  1.1 min    4            0             4                0.0 B         4.2 MB         19.0 GB                 3.4 MB
> 10           sr436:53277  26 s       4            0             4                0.0 B         4.3 MB         25.6 GB                 4.6 MB
> 11           sr437:58872  32 s       4            0             4                0.0 B         4.3 MB         25.0 GB                 4.4 MB
> 12           sr435:48358  27 s       4            0             4                0.0 B         4.3 MB         25.1 GB                 4.4 MB
>
> You can see that the Shuffle Spill (Memory) is pretty high, almost 5000x
> the actual shuffle data and the Shuffle Spill (Disk), and it also seems
> to me that the spill should by no means trigger, since the memory is not
> used up at all.
>
> To verify this, I further reduced the data size to 240MB in total, and
> here is the result:
>
> Executor ID  Address      Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Shuffle Read  Shuffle Write  Shuffle Spill (Memory)  Shuffle Spill (Disk)
> 0            sr437:50895  15 s       4            0             4                0.0 B         703.0 KB       80.0 MB                 43.2 KB
> 1            sr433:50207  17 s       4            0             4                0.0 B         704.7 KB       389.5 MB                90.2 KB
> 10           sr436:56352  16 s       4            0             4                0.0 B         700.9 KB       814.9 MB                181.6 KB
> 11           sr437:53099  15 s       4            0             4                0.0 B         689.7 KB       0.0 B                   0.0 B
> 12           sr435:48318  15 s       4            0             4                0.0 B         702.1 KB       427.4 MB                90.7 KB
> 13           sr433:59294  17 s       4            0             4                0.0 B         704.8 KB       779.9 MB                180.3 KB
>
> Nothing prevents the spill from happening.
>
> So it seems to me that there must be something wrong with the spill
> trigger code.
>
> Has anyone else encountered this issue? By the way, I am using the
> latest trunk code.
>
> Best Regards,
> Raymond Liu
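For readers following the thread, the equivalence discussed above can be sketched as a minimal word-count program. This is a sketch, not code from the thread: it assumes an existing SparkContext `sc` and a hypothetical input path `"input.txt"`, and simply puts Raymond's reduceByKey version next to Daniel's combineByKey version.

```scala
// Because reduceByKey delegates to combineByKey with the identity
// function as createCombiner, both formulations below produce the same
// (word, count) pairs and the same map-side combine behavior:
// one entry per distinct word per partition goes into the shuffle.
val words = sc.textFile("input.txt").flatMap(_.split(" "))
val wordsPair = words.map(x => (x, 1))

// 1) Via reduceByKey, as in Raymond's program:
val counts1 = wordsPair.reduceByKey(_ + _)

// 2) Via combineByKey, as in Daniel's suggestion:
val addition = (a: Int, b: Int) => a + b
val counts2 = wordsPair.combineByKey((v: Int) => v, addition, addition)
```

Since the two paths share the same combine logic, swapping one for the other is not expected to change the spill metrics, which is consistent with Daniel's correction at the top of the thread.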
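To rule the spill machinery in or out while debugging behavior like the above, the spill and shuffle-compression settings can be toggled when building the context. A minimal sketch, assuming a 1.0-era build where `spark.shuffle.spill`, `spark.shuffle.compress`, and `spark.shuffle.memoryFraction` are the relevant properties; check the docs for your exact build, since names and defaults have changed across releases.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Configuration sketch for isolating the spill behavior.
// Property names/defaults are an assumption for 1.0-era trunk.
val conf = new SparkConf()
  .setAppName("WordCountSpillDebug")
  .set("spark.shuffle.spill", "false")        // disable map-side spilling entirely
  .set("spark.shuffle.compress", "false")     // write shuffle output uncompressed
  .set("spark.shuffle.memoryFraction", "0.3") // fraction of heap usable before spilling
val sc = new SparkContext(conf)
```

With `spark.shuffle.spill` set to false, a nonzero Shuffle Spill (Memory) metric would point even more clearly at a bug in the spill-trigger accounting rather than at genuine memory pressure.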