Hi Davies,

The log shows that LogParser initializes and loads data once per executor, so I think the singleton still works. I changed the code to
sc.textFile(inputPath)
  .flatMap(line => LogParser.parseLine(line))
  .foreach(_ => {})

to avoid shuffle IO, but it's slower. I thought it might be caused by the cache in LogParser, so I mocked a class without the cache; unfortunately it's still slower.

2015-01-22 4:33 GMT+08:00 Davies Liu <dav...@databricks.com>:

> On Tue, Jan 20, 2015 at 11:13 PM, Fengyun RAO <raofeng...@gmail.com> wrote:
> >
> > the LogParser instance is not serializable, and thus cannot be a
> > broadcast,
>
> You could create an empty LogParser object (it's serializable), then
> load the data in the executor lazily.
>
> Could you add some logging to LogParser to check the behavior between
> Spark 1.1 and 1.2 (the number of times the data is loaded)?
>
> > what's worse, it contains an LRU cache, which is essential to the
> > performance, and which we would like to share among all the tasks on
> > the same node.
> >
> > If that is the case, what's the recommended way to share a variable
> > among all the tasks within the same executor?
> >
> > 2015-01-21 15:04 GMT+08:00 Davies Liu <dav...@databricks.com>:
> >>
> >> Maybe some change related to serializing the closure causes LogParser
> >> to no longer be a singleton, so it is initialized for every task.
> >>
> >> Could you change it to a Broadcast?
> >>
> >> On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO <raofeng...@gmail.com>
> >> wrote:
> >> > Currently we are migrating from Spark 1.1 to Spark 1.2, but found
> >> > the program 3x slower, with nothing else changed.
> >> > Note: our program on Spark 1.1 has successfully processed a whole
> >> > year of data, quite stably.
> >> >
> >> > The main script is as below:
> >> >
> >> > sc.textFile(inputPath)
> >> >   .flatMap(line => LogParser.parseLine(line))
> >> >   .groupByKey(new HashPartitioner(numPartitions))
> >> >   .mapPartitionsWithIndex(...)
> >> >   .foreach(_ => {})
> >> >
> >> > where LogParser is a singleton which may take some time to
> >> > initialize, and which is shared across the executor.
> >> >
> >> > The flatMap stage is 3x slower.
> >> >
> >> > We tried to change spark.shuffle.manager back to hash, and
> >> > spark.shuffle.blockTransferService back to nio, but it didn't help.
> >> >
> >> > Could somebody explain possible causes, or what we should test or
> >> > change to find it out?
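For reference, the lazy-load-in-executor pattern Davies suggests could look roughly like the sketch below. This is only an illustration: the real LogParser's fields, data type, and load logic are not shown in the thread, so everything inside loadData/parseLine here is a hypothetical stand-in.

```scala
// Sketch of the pattern suggested above: a Scala `object` is never
// shipped to executors; each executor JVM holds exactly one instance,
// shared by all tasks running on that executor.
object LogParser {
  // `lazy val` in Scala is initialized on first access (thread-safe
  // under the hood), so the expensive load happens at most once per
  // executor JVM, not once per task. The Map type and contents here
  // are assumptions for illustration.
  private lazy val data: Map[String, String] = loadData()

  private def loadData(): Map[String, String] = {
    // Expensive one-time load (e.g. reading reference data from disk);
    // runs once per executor JVM on first call to parseLine.
    Map("GET" -> "read", "PUT" -> "write")
  }

  // Hypothetical parse: emit (token, lookup) pairs for each line.
  def parseLine(line: String): Iterator[(String, String)] =
    line.split("\\s+").iterator.map(tok => (tok, data.getOrElse(tok, "")))
}
```

Because tasks call `LogParser.parseLine(line)` through the singleton object, the closure captures no parser state, nothing non-serializable is shipped, and an LRU cache kept as another field of the object would likewise be shared by all tasks on the same executor. The Broadcast alternative mentioned in the thread would instead serialize the loaded data once on the driver (e.g. `sc.broadcast(loadData())`) and read it via `.value` in the tasks, which only works if that data itself is serializable.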