Although nobody has answered the two questions directly, in my experience the answer to both appears to be yes.
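For anyone landing on this thread later, here is a minimal, self-contained sketch of the pattern under discussion: a per-JVM singleton wrapping an expensive, non-serializable parser (Q1), where the parser also holds a ConcurrentHashMap cache shared by all tasks on the same executor (Q2). ExpensiveParser and its toUpperCase "parsing" are placeholders, not the real LogParser, which is not shown in the thread.

import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical stand-in for the real LogParser: expensive to build,
// not serializable, but safe to call from many threads at once.
class ExpensiveParser {
  // Q2: a cache shared by every task running in this executor's JVM.
  private val cache = new ConcurrentHashMap[String, String]()

  def parse(line: String): String = {
    val cached = cache.get(line)
    if (cached != null) cached
    else {
      val result = line.toUpperCase // placeholder for the real parsing work
      cache.put(line, result)
      result
    }
  }
}

// Q1: one instance per executor JVM, built lazily the first time a task
// on that executor calls parse(). The object is only referenced by name
// inside the closure, so the non-serializable parser is never shipped
// from the driver.
object ParserWrapper {
  lazy val parser = new ExpensiveParser
}

object MySparkJob {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("shared-parser"))
    val parsed = sc.textFile(args(0)).map(line => ParserWrapper.parser.parse(line))
    parsed.take(10).foreach(println)
    sc.stop()
  }
}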
2014-08-04 19:50 GMT+08:00 Fengyun RAO <raofeng...@gmail.com>:

> object LogParserWrapper {
>     private val logParser = {
>         val settings = new ...
>         val builders = new ....
>         new LogParser(builders, settings)
>     }
>     def getParser = logParser
> }
>
> object MySparkJob {
>     def main(args: Array[String]) {
>         val sc = new SparkContext()
>         val lines = sc.textFile(args(0))
>
>         val parsed = lines.map(line => LogParserWrapper.getParser.parse(line))
>         ...
>     }
>
> Q1: Is this the right way to share a LogParser instance among all tasks on
> the same worker, if LogParser is not serializable?
>
> Q2: LogParser is read-only, but can LogParser hold a cache field such as a
> ConcurrentHashMap where all tasks on the same worker try to get() and put()
> items?
>
>
> 2014-08-04 19:29 GMT+08:00 Sean Owen <so...@cloudera.com>:
>
>> The issue is that it's not clear what "parser" is. It's not shown in
>> your code. The snippet you show does not appear to contain a parser
>> object.
>>
>> On Mon, Aug 4, 2014 at 10:01 AM, Fengyun RAO <raofeng...@gmail.com> wrote:
>> > Thanks, Sean!
>> >
>> > It works, but the link in 2 - Why Is My Spark Job so Slow and Only Using
>> > a Single Thread? - says the "parser instance is now a singleton created
>> > in the scope of our driver program", which I thought was in the scope of
>> > the executor. Am I wrong, and if so, why?
>> >
>> > I didn't want the equivalent of a "setup()" method, since I want to
>> > share the "parser" among tasks on the same worker node. It takes tens of
>> > seconds to initialize a "parser". What's more, I want to know whether
>> > the "parser" could have a field such as a ConcurrentHashMap on which all
>> > tasks in the node may get() or put() items.
>> >
>> > 2014-08-04 16:35 GMT+08:00 Sean Owen <so...@cloudera.com>:
>> >
>> >> The parser does not need to be serializable. In the line:
>> >>
>> >> lines.map(line => JSONParser.parse(line))
>> >>
>> >> ... the parser is called, but there is no parser object with state
>> >> that can be serialized. Are you sure it does not work?
>> >>
>> >> The error message alluded to originally refers to an object not shown
>> >> in the code, so I'm not 100% sure this was the original issue.
>> >>
>> >> If you want, the equivalent of "setup()" is really "writing some code
>> >> at the start of a call to mapPartitions()".
>> >>
>> >> On Mon, Aug 4, 2014 at 8:40 AM, Fengyun RAO <raofeng...@gmail.com> wrote:
>> >> > Thanks, Ron.
>> >> >
>> >> > The problem is that the "parser" is written in another package and
>> >> > is not serializable.
>> >> >
>> >> > In MapReduce, I could create the "parser" in the map setup() method.
>> >> >
>> >> > Now in Spark, I want to create it once for each worker, and share it
>> >> > among all the tasks on the same worker node.
>> >> >
>> >> > I know different workers run on different machines, but they don't
>> >> > have to communicate with each other.
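As a footnote on Sean's mapPartitions() suggestion above, this is roughly what the "setup()-equivalent" looks like, again with the hypothetical ExpensiveParser standing in for the real parser:

val parsed = sc.textFile(args(0)).mapPartitions { lines =>
  // Created once per partition, the rough analogue of MapReduce's setup().
  val parser = new ExpensiveParser
  lines.map(line => parser.parse(line))
}

Since this builds one parser per partition rather than one per JVM, a parser that takes tens of seconds to initialize still makes the singleton wrapper the cheaper option.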