I see. I'd really benchmark how the parsing performs outside Spark (in a
tight loop or something). If *that* is slow, you know it's the parsing. If
not, it's not the parsing.

Another thing you want to look at is CPU usage. If the actual parsing
really is the bottleneck, you should see very high CPU utilization. If not,
it's not the parsing per se but rather the ability to feed the messages to
the parsing library.


On Sat, Feb 14, 2015 at 2:30 PM, Akhil Das <ak...@sigmoidanalytics.com>

> Ah my bad, it works without serializable exception. But not much
> performance difference is there though.
> Thanks
> Best Regards
> On Sat, Feb 14, 2015 at 7:45 PM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>> Thanks for the suggestion, but doing that gives me this exception:
>> http://pastebin.com/ni80NqKn
>> Over this piece of code:
>>    object Holder extends Serializable {
>>       @transient lazy val mapper = new ObjectMapper() with
>> ScalaObjectMapper
>>       mapper.registerModule(DefaultScalaModule)
>>     }
>>     val jsonStream = myDStream.map(x=> {
>>        Holder.mapper.readValue[Map[String,Any]](x)
>>     })
>> Thanks
>> Best Regards
>> On Sat, Feb 14, 2015 at 7:32 PM, Enno Shioji <eshi...@gmail.com> wrote:
>>> (adding back user)
>>> Fair enough. Regarding serialization exception, the hack I use is to
>>> have a object with a transient lazy field, like so:
>>> object Holder extends Serializable {
>>>   @transient lazy val mapper = new ObjectMapper()
>>> }
>>> This way, the ObjectMapper will be instantiated at the destination and
>>> you can share the instance.
>>> ᐧ
>>> On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das <ak...@sigmoidanalytics.com>
>>> wrote:
>>>> Thanks for the reply Enno, in my case rate from the stream is not the
>>>> bottleneck as i'm able to consume all those records at a time (have tested
>>>> it). And regarding the ObjectMapper, if i take it outside of my map
>>>> operation then it throws Serializable Exceptions (Caused by:
>>>> java.io.NotSerializableException:
>>>> com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier).
>>>> Thanks
>>>> Best Regards
>>>> On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji <eshi...@gmail.com> wrote:
>>>>> If I were you I'd first parse some test jsons in isolation (outside
>>>>> Spark) to determine if the bottleneck is really the parsing. There are
>>>>> plenty other places that could be affecting your performance, like the 
>>>>> rate
>>>>> you are able to read from your stream source etc.
>>>>> Apart from that, I notice that you are instantiating the ObjectMapper
>>>>> every time. This is quite expensive and jackson recommends you to share 
>>>>> the
>>>>> instance. However, if you tried other parsers / mapPartitions without
>>>>> success, this probably won't fix your problem either.
>>>>> On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das <ak...@sigmoidanalytics.com
>>>>> > wrote:
>>>>>> I'm getting a low performance while parsing json data. My cluster
>>>>>> setup is 1.2.0 version of spark with 10 Nodes each having 15Gb of memory
>>>>>> and 4 cores.
>>>>>> I tried both scala.util.parsing.json.JSON and and fasterxml's Jackson
>>>>>> parser.
>>>>>> This is what i basically do:
>>>>>> *//Approach 1:*
>>>>>> val jsonStream = myDStream.map(x=> {
>>>>>>       val mapper = new ObjectMapper() with ScalaObjectMapper
>>>>>>       mapper.registerModule(DefaultScalaModule)
>>>>>>       mapper.readValue[Map[String,Any]](x)
>>>>>>     })
>>>>>> jsonStream.count().print()
>>>>>> *//Approach 2:*
>>>>>> val jsonStream2 =
>>>>>> myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String,
>>>>>> Any]])
>>>>>> jsonStream2.count().print()
>>>>>> It takes around 15-20 Seconds to process/parse 35k json documents
>>>>>> (contains nested documents and arrays) which i put in the stream.
>>>>>> Is there any better approach/parser to process it faster? i also
>>>>>> tried it with mapPartitions but it did not make any difference.
>>>>>> Thanks
>>>>>> Best Regards

Reply via email to