Hi Davies,

The log shows that LogParser initializes and loads its data once per executor,
so I think the singleton still works.
I changed the code to

sc.textFile(inputPath)
  .flatMap(line => LogParser.parseLine(line))
  .foreach(_ => {})

to avoid shuffle I/O, but it's still slower.
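
To double-check the once-per-executor behavior, one way is to instrument the load with a counter; here is a minimal sketch (the `LogParserProbe`, `initCount`, and `loadData` names are illustrative, not from our real code):

```scala
import java.util.concurrent.atomic.AtomicLong

// Illustrative probe, not the real LogParser: counts how often the
// singleton's data load actually runs inside one JVM.
object LogParserProbe {
  val initCount = new AtomicLong(0)

  // `lazy val` runs loadData() at most once per JVM (i.e. once per
  // executor), no matter how many tasks call parseLine.
  lazy val data: Map[String, String] = loadData()

  private def loadData(): Map[String, String] = {
    initCount.incrementAndGet()
    // placeholder for the real (expensive) initialization
    Map("GET" -> "read")
  }

  def parseLine(line: String): Seq[String] =
    line.split(" ").toSeq.filter(data.contains)
}
```

If `initCount` stays at 1 after many `parseLine` calls on one executor, the singleton is intact; a per-task count would point at the closure-serialization change.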

I thought the slowdown might be caused by the LRU cache in LogParser, so I
mocked a class without the cache,
but unfortunately it's still slower.
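
On sharing the LRU cache among all tasks on the same node: a JVM-level singleton (a Scala `object`) is shared by all tasks running in the same executor. A minimal sketch, assuming a String-keyed cache and an illustrative `maxEntries` limit (neither is from the real LogParser):

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Hypothetical per-executor LRU cache: one instance per executor JVM,
// visible to every task thread in that executor.
object SharedLruCache {
  private val maxEntries = 1000 // illustrative capacity

  // accessOrder = true makes iteration order follow least-recent use;
  // removeEldestEntry evicts once the size exceeds maxEntries.
  private val cache = new JLinkedHashMap[String, String](16, 0.75f, true) {
    override protected def removeEldestEntry(
        eldest: JMap.Entry[String, String]): Boolean =
      size() > maxEntries
  }

  // Tasks run on multiple threads inside one executor, so guard access.
  def getOrElseUpdate(key: String)(compute: => String): String =
    cache.synchronized {
      val hit = cache.get(key)
      if (hit != null) hit
      else { val v = compute; cache.put(key, v); v }
    }
}
```

Since LinkedHashMap is not thread-safe, the `synchronized` block is essential; a `java.util.concurrent` map with a separate eviction policy would be an alternative if lock contention shows up.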

2015-01-22 4:33 GMT+08:00 Davies Liu <dav...@databricks.com>:

On Tue, Jan 20, 2015 at 11:13 PM, Fengyun RAO <raofeng...@gmail.com> wrote:
> > the LogParser instance is not serializable, and thus cannot be a
> broadcast,
>
> You could create an empty LogParser object (it's serializable), then
> load the data in the executor lazily.
>
> Could you add some logging to LogParser to check the behavior between
> Spark 1.1 and 1.2 (the number of times to load data)?
>
> > what’s worse, it contains an LRU cache, which is essential to the
> > performance, and we would like to share among all the tasks on the same
> > node.
> >
> > If it is the case, what’s the recommended way to share a variable among
> all
> > the tasks within the same executor.
> >
> >
> > 2015-01-21 15:04 GMT+08:00 Davies Liu <dav...@databricks.com>:
> >>
> >> Maybe some change related to closure serialization caused LogParser to
> >> no longer be a singleton, so it is initialized for every task.
> >>
> >> Could you change it to a Broadcast?
> >>
> >> On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO <raofeng...@gmail.com>
> >> wrote:
> >> > We are currently migrating from Spark 1.1 to Spark 1.2, but found the
> >> > program 3x slower, with nothing else changed.
> >> > Note: our program on Spark 1.1 has successfully and stably processed a
> >> > whole year of data.
> >> >
> >> > the main script is as below
> >> >
> >> > sc.textFile(inputPath)
> >> >   .flatMap(line => LogParser.parseLine(line))
> >> >   .groupByKey(new HashPartitioner(numPartitions))
> >> >   .mapPartitionsWithIndex(...)
> >> >   .foreach(_ => {})
> >> >
> >> > where LogParser is a singleton which may take some time to initialize
> >> > and is shared across the executor.
> >> >
> >> > the flatMap stage is 3x slower.
> >> >
> >> > We tried to change spark.shuffle.manager back to hash, and
> >> > spark.shuffle.blockTransferService back to nio, but didn’t help.
> >> >
> >> > Could somebody explain the possible causes, or suggest what we should
> >> > test or change to find out?
> >
> >
>
