Although nobody has answered the two questions, in my experience the answer
to both seems to be yes.
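
For anyone finding this thread later, here is a minimal sketch of why I think
it works. It runs without Spark: plain threads stand in for tasks running in
the same executor JVM. The LogParser class below is a hypothetical stand-in
for the real (non-serializable) parser, and initCount, cacheSize and Demo are
names I made up for illustration. The assumption is that a Scala object is
initialized once per JVM, so every task in the same executor sees the same
instance; if a worker runs several executors, each would get its own copy.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the real, non-serializable LogParser.
class LogParser {
  // Cache shared by every thread (task) that uses this instance.
  private val cache = new ConcurrentHashMap[String, Integer]()

  // Illustrative "parse": returns the number of space-separated tokens.
  def parse(line: String): Int = {
    val cached = cache.get(line)
    if (cached != null) cached.intValue()
    else {
      val result = Integer.valueOf(line.split(' ').length)
      // putIfAbsent is atomic; a concurrent duplicate computation is harmless here.
      cache.putIfAbsent(line, result)
      result.intValue()
    }
  }

  def cacheSize: Int = cache.size()
}

object LogParserWrapper {
  // Counts how many times the parser is constructed in this JVM (for the demo only).
  val initCount = new AtomicInteger(0)

  // Runs once, when the object is first accessed in this JVM.
  private val logParser = {
    initCount.incrementAndGet()
    new LogParser
  }

  def getParser: LogParser = logParser
}

object Demo {
  def main(args: Array[String]): Unit = {
    // Four threads play the role of four tasks in one executor.
    val threads = (1 to 4).map { _ =>
      new Thread(new Runnable {
        override def run(): Unit =
          Seq("a b c", "d e").foreach(l => LogParserWrapper.getParser.parse(l))
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    println(s"parser constructions: ${LogParserWrapper.initCount.get}")
    println(s"cached lines: ${LogParserWrapper.getParser.cacheSize}")
  }
}
```

In the real job the closure only refers to LogParserWrapper by name, as in
lines.map(line => LogParserWrapper.getParser.parse(line)), so no parser
instance is captured and nothing needs to be serialized.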


2014-08-04 19:50 GMT+08:00 Fengyun RAO <raofeng...@gmail.com>:

> object LogParserWrapper {
>     private val logParser = {
>         val settings = new ...
>         val builders = new ....
>         new LogParser(builders, settings)
>     }
>     def getParser = logParser
> }
>
> object MySparkJob {
>     def main(args: Array[String]) {
>         val sc = new SparkContext()
>         val lines = sc.textFile(args(0))
>
>         val parsed = lines.map(line => LogParserWrapper.getParser.parse(line))
>         ...
>     }
> }
>
> Q1: Is this the right way to share LogParser instance among all tasks on
> the same worker, if LogParser is not serializable?
>
> Q2: LogParser is read-only, but can LogParser hold a cache field such as a
> ConcurrentHashMap where all tasks on the same worker try to get() and put()
> items?
>
>
> 2014-08-04 19:29 GMT+08:00 Sean Owen <so...@cloudera.com>:
>
>> The issue is that it's not clear what "parser" is. It's not shown in
>> your code. The snippet you show does not appear to contain a parser
>> object.
>>
>> On Mon, Aug 4, 2014 at 10:01 AM, Fengyun RAO <raofeng...@gmail.com>
>> wrote:
>> > Thanks, Sean!
>> >
>> > It works, but the link in 2 - "Why Is My Spark Job so Slow and Only Using
>> > a Single Thread?" says "parser instance is now a singleton created in the
>> > scope of our driver program", whereas I thought it was in the scope of the
>> > executor. Am I wrong, and if so, why?
>> >
>> > I didn't want the equivalent of the "setup()" method, since I want to share
>> > the "parser" among tasks on the same worker node. It takes tens of seconds
>> > to initialize a "parser". What's more, I want to know whether the "parser"
>> > could have a field such as a ConcurrentHashMap on which all tasks on the
>> > node may get() or put() items.
>> >
>> >
>> >
>> >
>> > 2014-08-04 16:35 GMT+08:00 Sean Owen <so...@cloudera.com>:
>> >
>> >> The parser does not need to be serializable. In the line:
>> >>
>> >> lines.map(line => JSONParser.parse(line))
>> >>
>> >> ... the parser is called, but there is no parser object with state
>> >> that can be serialized. Are you sure it does not work?
>> >>
>> >> The error message alluded to originally refers to an object not shown
>> >> in the code, so I'm not 100% sure this was the original issue.
>> >>
>> >> If you want, the equivalent of "setup()" is really "writing some code
>> >> at the start of a call to mapPartitions()"
>> >>
>> >> On Mon, Aug 4, 2014 at 8:40 AM, Fengyun RAO <raofeng...@gmail.com> wrote:
>> >> > Thanks, Ron.
>> >> >
>> >> > The problem is that the "parser" is written in another package and is
>> >> > not serializable.
>> >> >
>> >> > In MapReduce, I could create the "parser" in the map setup() method.
>> >> >
>> >> > Now in Spark, I want to create it once per worker and share it among all
>> >> > the tasks on the same worker node.
>> >> >
>> >> > I know different workers run on different machines, but the workers don't
>> >> > have to communicate with each other.
>> >
>> >
>>
>
>
