Ah, my bad, it works without the serializable exception now. There isn't much of a performance difference, though.
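For reference, the combination that now runs without the serialization error looks roughly like the following. This is only a minimal sketch: it assumes jackson-module-scala 2.4/2.5 (where ScalaObjectMapper still lives in the experimental package) is on the classpath, and that myDStream is the existing DStream[String] from the job.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

// One mapper per executor JVM: the object itself serializes as a stub, and the
// @transient lazy val is re-created on the worker the first time it is touched.
object Holder extends Serializable {
  @transient lazy val mapper: ObjectMapper with ScalaObjectMapper = {
    val m = new ObjectMapper() with ScalaObjectMapper
    m.registerModule(DefaultScalaModule)
    m
  }
}

// myDStream: DStream[String] is defined elsewhere in the streaming job.
val jsonStream = myDStream.map { x =>
  Holder.mapper.readValue[Map[String, Any]](x)
}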
Thanks
Best Regards

On Sat, Feb 14, 2015 at 7:45 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

> Thanks for the suggestion, but doing that gives me this exception:
>
> http://pastebin.com/ni80NqKn
>
> Over this piece of code:
>
> object Holder extends Serializable {
>   @transient lazy val mapper = new ObjectMapper() with ScalaObjectMapper
>   mapper.registerModule(DefaultScalaModule)
> }
>
> val jsonStream = myDStream.map(x => {
>   Holder.mapper.readValue[Map[String, Any]](x)
> })
>
> Thanks
> Best Regards
>
> On Sat, Feb 14, 2015 at 7:32 PM, Enno Shioji <eshi...@gmail.com> wrote:
>
>> (adding back user)
>>
>> Fair enough. Regarding the serialization exception, the hack I use is to
>> have an object with a transient lazy field, like so:
>>
>> object Holder extends Serializable {
>>   @transient lazy val mapper = new ObjectMapper()
>> }
>>
>> This way, the ObjectMapper will be instantiated at the destination and
>> you can share the instance.
>>
>> On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>
>>> Thanks for the reply Enno. In my case the rate from the stream is not
>>> the bottleneck, as I'm able to consume all those records at a time (I
>>> have tested it). And regarding the ObjectMapper, if I take it outside of
>>> my map operation, it throws a serialization exception (Caused by:
>>> java.io.NotSerializableException:
>>> com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier).
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji <eshi...@gmail.com> wrote:
>>>
>>>> If I were you, I'd first parse some test JSONs in isolation (outside
>>>> Spark) to determine whether the bottleneck is really the parsing. There
>>>> are plenty of other places that could be affecting your performance,
>>>> such as the rate at which you are able to read from your stream source.
>>>>
>>>> Apart from that, I notice that you are instantiating the ObjectMapper
>>>> every time. This is quite expensive, and Jackson recommends sharing the
>>>> instance. However, if you tried other parsers / mapPartitions without
>>>> success, this probably won't fix your problem either.
>>>>
>>>> On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>
>>>>> I'm getting low performance while parsing JSON data. My cluster setup
>>>>> is Spark 1.2.0 with 10 nodes, each having 15 GB of memory and 4 cores.
>>>>>
>>>>> I tried both scala.util.parsing.json.JSON and fasterxml's Jackson
>>>>> parser.
>>>>>
>>>>> This is what I basically do:
>>>>>
>>>>> // Approach 1:
>>>>> val jsonStream = myDStream.map(x => {
>>>>>   val mapper = new ObjectMapper() with ScalaObjectMapper
>>>>>   mapper.registerModule(DefaultScalaModule)
>>>>>   mapper.readValue[Map[String, Any]](x)
>>>>> })
>>>>>
>>>>> jsonStream.count().print()
>>>>>
>>>>> // Approach 2:
>>>>> val jsonStream2 =
>>>>>   myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
>>>>>
>>>>> jsonStream2.count().print()
>>>>>
>>>>> It takes around 15-20 seconds to process/parse the 35k JSON documents
>>>>> (containing nested documents and arrays) that I put in the stream.
>>>>>
>>>>> Is there any better approach/parser to process it faster? I also tried
>>>>> it with mapPartitions, but it did not make any difference.
>>>>>
>>>>> Thanks
>>>>> Best Regards
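On Enno's suggestion of parsing some test JSONs in isolation first: a small standalone harness along these lines gives a baseline for how fast Jackson alone gets through the same 35k documents, which tells you whether the parsing or the rest of the pipeline is the bottleneck. This is only a sketch; the one-JSON-document-per-line input file and the class and argument names are made up for illustration.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import scala.io.Source

object ParseBench {
  def main(args: Array[String]): Unit = {
    // Load the same documents you feed into the stream, one JSON document per line.
    val docs = Source.fromFile(args(0)).getLines().toArray

    // Single shared mapper, as Jackson recommends.
    val mapper = new ObjectMapper() with ScalaObjectMapper
    mapper.registerModule(DefaultScalaModule)

    val start = System.nanoTime()
    docs.foreach { d =>
      mapper.readValue[Map[String, Any]](d)
    }
    val elapsedMs = (System.nanoTime() - start) / 1000000
    println(s"Parsed ${docs.length} documents in $elapsedMs ms")
  }
}

If this finishes in a small fraction of the 15-20 seconds seen in the streaming job, the time is going somewhere other than the JSON parsing itself.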