I have a related question.  With Hadoop, I would do the same thing for
non-serializable objects in setup().  I also had a use case where the
non-serializable object was so expensive to initialize that I would
make it a static member of the mapper and turn on JVM reuse across
tasks, which avoided reinitializing it for every task on the same
node.  Is that easy to do with Spark?  Assuming Spark reuses the JVM
across tasks by default, then taking raofengyun's factory method and
having it return a singleton should work, right?  Does Spark reuse
JVMs across tasks?
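
Concretely, I mean something like this (a sketch: ExpensiveParser is a
hypothetical stand-in for the expensive non-serializable object):

object ExpensiveParserHolder {
  // A lazy val in a Scala object is initialized at most once per JVM:
  // the first task to touch it on an executor pays the cost, and
  // later tasks on the same executor reuse it.
  lazy val parser: ExpensiveParser = new ExpensiveParser()
}

// in the job:
// lines.map(line => ExpensiveParserHolder.parser.parse(line))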

On Sat, Aug 9, 2014 at 7:48 AM, Fengyun RAO <raofeng...@gmail.com> wrote:
> Although nobody has answered the two questions, in my practice it seems
> the answer to both is yes.
>
>
> 2014-08-04 19:50 GMT+08:00 Fengyun RAO <raofeng...@gmail.com>:
>>
>> object LogParserWrapper {
>>     private val logParser = {
>>         val settings = new ...
>>         val builders = new ....
>>         new LogParser(builders, settings)
>>     }
>>     def getParser = logParser
>> }
>>
>> object MySparkJob {
>>     def main(args: Array[String]) {
>>         val sc = new SparkContext()
>>         val lines = sc.textFile(args(0))
>>
>>         val parsed = lines.map(line => LogParserWrapper.getParser.parse(line))
>>         ...
>>     }
>> }
>>
>> Q1: Is this the right way to share a LogParser instance among all tasks
>> on the same worker, if LogParser is not serializable?
>>
>> Q2: LogParser is read-only, but can it hold a cache field, such as a
>> ConcurrentHashMap, that all tasks on the same worker get() and put()
>> items into?
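>>
>> For Q2, what I have in mind is roughly this (a sketch: the value type
>> and doParse are placeholders, following the elisions above):
>>
>> import java.util.concurrent.ConcurrentHashMap
>>
>> class LogParser(/* builders, settings as above */) {
>>     // One cache per executor JVM, shared by every task that reaches
>>     // the parser through the LogParserWrapper singleton.
>>     private val cache = new ConcurrentHashMap[String, String]()
>>
>>     def parse(line: String): String = {
>>         val cached = cache.get(line)
>>         if (cached != null) cached
>>         else {
>>             val parsed = doParse(line)
>>             // putIfAbsent keeps the first value if two tasks race
>>             val prev = cache.putIfAbsent(line, parsed)
>>             if (prev != null) prev else parsed
>>         }
>>     }
>>
>>     private def doParse(line: String): String = ...
>> }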
>>
>>
>> 2014-08-04 19:29 GMT+08:00 Sean Owen <so...@cloudera.com>:
>>
>>> The issue is that it's not clear what "parser" is: the snippet you
>>> show does not contain a parser object.
>>>
>>> On Mon, Aug 4, 2014 at 10:01 AM, Fengyun RAO <raofeng...@gmail.com>
>>> wrote:
>>> > Thanks, Sean!
>>> >
>>> > It works, but the link in 2 - Why Is My Spark Job so Slow and Only
>>> > Using a Single Thread? - says the "parser instance is now a singleton
>>> > created in the scope of our driver program", whereas I thought it was
>>> > created in the scope of the executor. Am I wrong, and if so, why?
>>> >
>>> > I didn't want the equivalent of the "setup()" method, since I want to
>>> > share the "parser" among tasks on the same worker node. It takes tens
>>> > of seconds to initialize a "parser". What's more, I want to know
>>> > whether the "parser" could have a field, such as a ConcurrentHashMap,
>>> > that all tasks on the node may get() or put() items into.
>>> >
>>> >
>>> >
>>> >
>>> > 2014-08-04 16:35 GMT+08:00 Sean Owen <so...@cloudera.com>:
>>> >
>>> >> The parser does not need to be serializable. In the line:
>>> >>
>>> >> lines.map(line => JSONParser.parse(line))
>>> >>
>>> >> ... the parser is called, but there is no parser object with state
>>> >> that needs to be serialized. Are you sure it does not work?
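>>> >>
>>> >> For contrast, if JSONParser were a class and you captured an instance
>>> >> in the closure (sketch), serialization would come into play:
>>> >>
>>> >> val parser = new JSONParser()          // created on the driver
>>> >> lines.map(line => parser.parse(line))  // the closure captures "parser",
>>> >>                                        // so Spark must serialize it, and
>>> >>                                        // this fails if JSONParser is not
>>> >>                                        // Serializable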
>>> >>
>>> >> The error message alluded to originally refers to an object not shown
>>> >> in the code, so I'm not 100% sure this was the original issue.
>>> >>
>>> >> If you want, the equivalent of "setup()" is really "writing some code
>>> >> at the start of a call to mapPartitions()"
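>>> >>
>>> >> For example (a sketch; the parser construction is elided as in your
>>> >> code):
>>> >>
>>> >> lines.mapPartitions { iter =>
>>> >>   // "setup()": runs once per partition, on the executor
>>> >>   val parser = new LogParser(...)
>>> >>   iter.map(line => parser.parse(line))
>>> >> }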
>>> >>
>>> >> On Mon, Aug 4, 2014 at 8:40 AM, Fengyun RAO <raofeng...@gmail.com>
>>> >> wrote:
>>> >> > Thanks, Ron.
>>> >> >
>>> >> > The problem is that the "parser" is written in another package,
>>> >> > which is not serializable.
>>> >> >
>>> >> > In MapReduce, I could create the "parser" in the map setup() method.
>>> >> >
>>> >> > Now in Spark, I want to create it once per worker and share it among
>>> >> > all the tasks on the same worker node.
>>> >> >
>>> >> > I know different workers run on different machines, but there is no
>>> >> > need to communicate between workers.
>>> >
>>> >
>>
>>
>
