Whoops, you are right. Sorry for the misinformation. Indeed reduceByKey
just calls combineByKey:

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] =
{
  combineByKey[V]((v: V) => v, func, func, partitioner)
}

(I think I confused reduceByKey with groupByKey.)


On Wed, Apr 30, 2014 at 2:55 AM, Liu, Raymond <raymond....@intel.com> wrote:

> Hi Daniel
>
>         Thanks for your reply, While I think for reduceByKey, it will also
> do map side combine, thus extra the result is the same, say, for each
> partition, one entry per distinct word. In my case with javaserializer,
>  240MB dataset yield to around 70MB shuffle data. Only that shuffle Spill (
> memory ) is abnormal, and sounds to me should not trigger at all. And, by
> the way, this behavior only occurs in map out side, on reduce / shuffle
> fetch side, this strange behavior won't happen.
>
> Best Regards,
> Raymond Liu
>
> From: Daniel Darabos [mailto:daniel.dara...@lynxanalytics.com]
>
> I have no idea why shuffle spill is so large. But this might make it
> smaller:
>
> val addition = (a: Int, b: Int) => a + b
> val wordsCount = wordsPair.combineByKey(identity, addition, addition)
>
> This way only one entry per distinct word will end up in the shuffle for
> each partition, instead of one entry per word occurrence.
>
> On Tue, Apr 29, 2014 at 7:48 AM, Liu, Raymond <raymond....@intel.com>
> wrote:
> Hi  Patrick
>
>         I am just doing simple word count , the data is generated by
> hadoop random text writer.
>
>         This seems to me not quite related to compress , If I turn off
> compress on shuffle, the metrics is something like below for the smaller
> 240MB Dataset.
>
>
> Executor ID     Address Task Time       Total Tasks     Failed Tasks
>  Succeeded Tasks Shuffle Read    Shuffle Write   Shuffle Spill (Memory)
>  Shuffle Spill (Disk)
> 10      sr437:48527     35 s    8       0       8       0.0 B   2.5 MB
>  2.2 GB  1291.2 KB
> 12      sr437:46077     34 s    8       0       8       0.0 B   2.5 MB
>  1822.6 MB       1073.3 KB
> 13      sr434:37896     31 s    8       0       8       0.0 B   2.4 MB
>  1099.2 MB       621.2 KB
> 15      sr438:52819     31 s    8       0       8       0.0 B   2.5 MB
>  1898.8 MB       1072.6 KB
> 16      sr434:37103     32 s    8       0       8       0.0 B   2.4 MB
>  1638.0 MB       1044.6 KB
>
>
>         And the program pretty simple:
>
> val files = sc.textFile(args(1))
> val words = files.flatMap(_.split(" "))
> val wordsPair = words.map(x => (x, 1))
>
> val wordsCount = wordsPair.reduceByKey(_ + _)
> val count = wordsCount.count()
>
> println("Number of words = " + count)
>
>
> Best Regards,
> Raymond Liu
>
> From: Patrick Wendell [mailto:pwend...@gmail.com]
>
> Could you explain more what your job is doing and what data types you are
> using? These numbers alone don't necessarily indicate something is wrong.
> The relationship between the in-memory and on-disk shuffle amount is
> definitely a bit strange, the data gets compressed when written to disk,
> but unless you have a weird dataset (E.g. all zeros) I wouldn't expect it
> to compress _that_ much.
>
> On Mon, Apr 28, 2014 at 1:18 AM, Liu, Raymond <raymond....@intel.com>
> wrote:
> Hi
>
>
>         I am running a simple word count program on spark standalone
> cluster. The cluster is made up of 6 node, each run 4 worker and each
> worker own 10G memory and 16 core thus total 96 core and 240G memory. (
> well, also used to configed as 1 worker with 40G memory on each node )
>
>         I run a very small data set (2.4GB on HDFS on total) to confirm
> the problem here as below:
>
>         As you can read from part of the task metrics as below, I noticed
> that the shuffle spill part of metrics indicate that there are something
> wrong.
>
> Executor ID     Address Task Time       Total Tasks     Failed Tasks
>  Succeeded Tasks Shuffle Read    Shuffle Write   Shuffle Spill (Memory)
>  Shuffle Spill (Disk)
> 0       sr437:42139     29 s    4       0       4       0.0 B   4.3 MB
>  23.6 GB 4.3 MB
> 1       sr433:46935     1.1 min 4       0       4       0.0 B   4.2 MB
>  19.0 GB 3.4 MB
> 10      sr436:53277     26 s    4       0       4       0.0 B   4.3 MB
>  25.6 GB 4.6 MB
> 11      sr437:58872     32 s    4       0       4       0.0 B   4.3 MB
>  25.0 GB 4.4 MB
> 12      sr435:48358     27 s    4       0       4       0.0 B   4.3 MB
>  25.1 GB 4.4 MB
>
>
> You can see that the Shuffle Spill (Memory) is pretty high, almost 5000x
> of the actual shuffle data and Shuffle Spill (Disk), and also it seems to
> me that by no means that the spill should trigger, since the memory is not
> used up at all.
>
> To verify that I further reduce the data size to 240MB on total
>
> And here is the result:
>
>
> Executor ID     Address Task Time       Total Tasks     Failed Tasks
>  Succeeded Tasks Shuffle Read    Shuffle Write   Shuffle Spill (Memory)
>  Shuffle Spill (Disk)
> 0       sr437:50895     15 s    4       0       4       0.0 B   703.0 KB
>      80.0 MB 43.2 KB
> 1       sr433:50207     17 s    4       0       4       0.0 B   704.7 KB
>      389.5 MB        90.2 KB
> 10      sr436:56352     16 s    4       0       4       0.0 B   700.9 KB
>      814.9 MB        181.6 KB
> 11      sr437:53099     15 s    4       0       4       0.0 B   689.7 KB
>      0.0 B   0.0 B
> 12      sr435:48318     15 s    4       0       4       0.0 B   702.1 KB
>      427.4 MB        90.7 KB
> 13      sr433:59294     17 s    4       0       4       0.0 B   704.8 KB
>      779.9 MB        180.3 KB
>
> Nothing prevent spill from happening.
>
> Now, there seems to me that there must be something wrong with the spill
> trigger codes.
>
> So anyone encounter this issue?  By the way, I am using latest trunk code.
>
>
> Best Regards,
> Raymond Liu
>
>

Reply via email to