btw: Shuffle Write (11 GB) means 11 GB per executor; for each task, it's ~40 MB.
2015-01-21 17:53 GMT+08:00 Fengyun RAO <raofeng...@gmail.com>:

> I don't know how to debug a distributed application — any tools or suggestions?
>
> From the Spark web UI, the GC time (~0.1 s) and Shuffle Write (11 GB) are
> similar for Spark 1.1 and 1.2, and there are no Shuffle Read and no Spill.
> The only difference is Duration:
>
> Duration     Min    25th percentile    Median    75th percentile    Max
> spark 1.2    4 s    37 s               45 s      53 s               1.9 min
> spark 1.1    2 s    17 s               18 s      18 s               34 s
>
> 2015-01-21 16:56 GMT+08:00 Sean Owen <so...@cloudera.com>:
>
>> I mean that if you had tasks running on 10 machines now instead of 3 for
>> some reason, you would have more than 3 times the read load on your source
>> of data all at once. Same if you made more executors per machine. But from
>> your additional info it does not sound like this is the case. I think you
>> need more debugging to pinpoint what is slower.
>>
>> On Jan 21, 2015 9:30 AM, "Fengyun RAO" <raofeng...@gmail.com> wrote:
>>
>>> Thanks, Sean.
>>>
>>> I don't quite understand "you have *more* partitions across *more*
>>> workers". It's the same cluster and the same data, thus I think the same
>>> partitions and the same workers.
>>>
>>> We switched from Spark 1.1 to 1.2, and it became 3x slower.
>>>
>>> (We upgraded from CDH 5.2.1 to CDH 5.3, hence Spark 1.1 to 1.2, and found
>>> the problem. Then we installed a standalone Spark 1.1, stopped 1.2, ran
>>> the same script, and it was 3x faster. Stopped 1.1, started 1.2, and it
>>> was 3x slower again.)
>>>
>>> 2015-01-21 15:45 GMT+08:00 Sean Owen <so...@cloudera.com>:
>>>
>>>> I don't know of any reason to think the singleton pattern doesn't work
>>>> or works differently. I wonder if, for example, task scheduling is
>>>> different in 1.2 and you have more partitions across more workers, and so
>>>> are loading more copies more slowly into your singletons.
>>>>
>>>> On Jan 21, 2015 7:13 AM, "Fengyun RAO" <raofeng...@gmail.com> wrote:
>>>>
>>>>> The LogParser instance is not serializable, and thus cannot be a
>>>>> broadcast.
>>>>>
>>>>> What's worse, it contains an LRU cache, which is essential to the
>>>>> performance, and which we would like to share among all the tasks on the
>>>>> same node.
>>>>>
>>>>> If that is the case, what's the recommended way to share a variable
>>>>> among all the tasks within the same executor?
>>>>>
>>>>> 2015-01-21 15:04 GMT+08:00 Davies Liu <dav...@databricks.com>:
>>>>>
>>>>>> Maybe some change related to serializing the closure caused LogParser
>>>>>> to no longer be a singleton, so it is initialized for every task.
>>>>>>
>>>>>> Could you change it to a Broadcast?
>>>>>>
>>>>>> On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO <raofeng...@gmail.com>
>>>>>> wrote:
>>>>>> > Currently we are migrating from Spark 1.1 to Spark 1.2, but found the
>>>>>> > program 3x slower, with nothing else changed.
>>>>>> > Note: our program on Spark 1.1 has successfully processed a whole
>>>>>> > year of data, quite stable.
>>>>>> >
>>>>>> > The main script is as below:
>>>>>> >
>>>>>> >   sc.textFile(inputPath)
>>>>>> >     .flatMap(line => LogParser.parseLine(line))
>>>>>> >     .groupByKey(new HashPartitioner(numPartitions))
>>>>>> >     .mapPartitionsWithIndex(...)
>>>>>> >     .foreach(_ => {})
>>>>>> >
>>>>>> > where LogParser is a singleton which may take some time to initialize
>>>>>> > and is shared across the executor.
>>>>>> >
>>>>>> > The flatMap stage is 3x slower.
>>>>>> >
>>>>>> > We tried to change spark.shuffle.manager back to hash, and
>>>>>> > spark.shuffle.blockTransferService back to nio, but it didn't help.
>>>>>> >
>>>>>> > Could somebody explain possible causes, or what we should test or
>>>>>> > change to find it out?
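
For reference, a minimal sketch of the per-executor singleton pattern discussed in the thread. The stand-in LogParser below (its fields, parse logic, and the LogParserHolder/ParseJob names) is illustrative only, not the original code, and the original mapPartitionsWithIndex step is omitted since its body isn't shown above.

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits needed on Spark 1.x

// Hypothetical stand-in for the real LogParser: expensive to construct,
// not serializable, holding an internal LRU-style cache.
class LogParser {
  private val cache =
    new java.util.LinkedHashMap[String, Seq[(String, String)]](10000, 0.75f, true)

  def parseLine(line: String): Seq[(String, String)] = {
    // Real parsing and cache lookups would go here; this stand-in just
    // splits the line on the first tab into a (key, value) pair.
    line.split("\t", 2) match {
      case Array(k, v) => Seq((k, v))
      case _           => Seq.empty
    }
  }
}

// One instance per JVM, i.e. per executor. Only the object's name is captured
// by the task closure; the instance is created lazily on each executor the
// first time a task touches it, and is then shared by all later tasks there.
object LogParserHolder {
  lazy val parser: LogParser = new LogParser
}

object ParseJob {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("log-parsing"))
    val inputPath = args(0)         // e.g. a directory of raw log files
    val numPartitions = args(1).toInt

    sc.textFile(inputPath)
      .flatMap(line => LogParserHolder.parser.parseLine(line)) // resolved on the executor
      .groupByKey(new HashPartitioner(numPartitions))
      .foreach(_ => {})

    sc.stop()
  }
}

Because only the name LogParserHolder appears in the closure, nothing non-serializable is shipped with the tasks; each executor builds the parser (and its cache) once on first use and all tasks on that executor reuse it, which is also why a Broadcast isn't required for this to work.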