Ah, my bad, it works without the serializable exception now. There isn't much
of a performance difference, though.

Thanks
Best Regards

On Sat, Feb 14, 2015 at 7:45 PM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> Thanks for the suggestion, but doing that gives me this exception:
>
> http://pastebin.com/ni80NqKn
>
> It's thrown from this piece of code:
>
>    object Holder extends Serializable {
>      @transient lazy val mapper = new ObjectMapper() with ScalaObjectMapper
>      mapper.registerModule(DefaultScalaModule)
>    }
>
>    val jsonStream = myDStream.map(x => {
>      Holder.mapper.readValue[Map[String, Any]](x)
>    })
>
> Thanks
> Best Regards
>
> On Sat, Feb 14, 2015 at 7:32 PM, Enno Shioji <eshi...@gmail.com> wrote:
>
>> (adding back user)
>>
>> Fair enough. Regarding the serialization exception, the hack I use is to
>> have an object with a transient lazy field, like so:
>>
>>
>> object Holder extends Serializable {
>>   // @transient: the mapper is never serialized; lazy: each JVM builds
>>   // its own instance on first use.
>>   @transient lazy val mapper = new ObjectMapper()
>> }
>>
>> This way, the ObjectMapper will be instantiated lazily at the destination
>> (each executor JVM), and you can share that instance across records.
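>>
>> Usage is then just something like this (a sketch; readTree here stands in
>> for whatever read method you need):
>>
>> val parsed = myDStream.map { x =>
>>   // First use on each executor forces the lazy val, so the mapper is
>>   // built locally there rather than shipped over the wire.
>>   Holder.mapper.readTree(x)
>> }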
>>
>> On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das <ak...@sigmoidanalytics.com>
>> wrote:
>>
>>> Thanks for the reply, Enno. In my case the rate from the stream is not the
>>> bottleneck, as I'm able to consume all those records at a time (I have
>>> tested it). And regarding the ObjectMapper: if I take it outside of my map
>>> operation, it throws a serialization exception (Caused by:
>>> java.io.NotSerializableException:
>>> com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier).
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji <eshi...@gmail.com> wrote:
>>>
>>>> If I were you, I'd first parse some test JSONs in isolation (outside
>>>> Spark) to determine whether the bottleneck really is the parsing. There
>>>> are plenty of other places that could be affecting your performance, like
>>>> the rate at which you are able to read from your stream source, etc.
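>>>>
>>>> For instance, something along these lines (a rough sketch; how you load
>>>> the sample documents is up to you):
>>>>
>>>> import com.fasterxml.jackson.databind.ObjectMapper
>>>> import com.fasterxml.jackson.module.scala.DefaultScalaModule
>>>> import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
>>>>
>>>> // Time pure Jackson parsing on the driver with one shared mapper.
>>>> val mapper = new ObjectMapper() with ScalaObjectMapper
>>>> mapper.registerModule(DefaultScalaModule)
>>>>
>>>> val samples: Seq[String] = ??? // load representative JSON documents here
>>>> val t0 = System.nanoTime()
>>>> samples.foreach(s => mapper.readValue[Map[String, Any]](s))
>>>> println(s"Parsed ${samples.size} docs in ${(System.nanoTime() - t0) / 1e9} s")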
>>>>
>>>> Apart from that, I notice that you are instantiating the ObjectMapper for
>>>> every record. This is quite expensive, and Jackson recommends sharing the
>>>> instance. However, if you tried other parsers / mapPartitions without
>>>> success, this probably won't fix your problem either.
>>>>
>>>> On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das <ak...@sigmoidanalytics.com>
>>>> wrote:
>>>>
>>>>> I'm getting low performance while parsing JSON data. My cluster setup is
>>>>> Spark 1.2.0 with 10 nodes, each having 15 GB of memory and 4 cores.
>>>>>
>>>>> I tried both scala.util.parsing.json.JSON and FasterXML's Jackson parser.
>>>>>
>>>>> This is basically what I do:
>>>>>
>>>>> *//Approach 1:*
>>>>> val jsonStream = myDStream.map(x => {
>>>>>   val mapper = new ObjectMapper() with ScalaObjectMapper
>>>>>   mapper.registerModule(DefaultScalaModule)
>>>>>   mapper.readValue[Map[String, Any]](x)
>>>>> })
>>>>>
>>>>> jsonStream.count().print()
>>>>>
>>>>>
>>>>> *//Approach 2:*
>>>>> val jsonStream2 = myDStream.map(
>>>>>   JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
>>>>>
>>>>> jsonStream2.count().print()
>>>>>
>>>>>
>>>>>
>>>>> It takes around 15-20 seconds to process/parse the 35k JSON documents
>>>>> (containing nested documents and arrays) that I put in the stream.
>>>>>
>>>>> Is there any better approach/parser to process it faster? I also tried it
>>>>> with mapPartitions, along the lines of the sketch below, but it did not
>>>>> make any difference.
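>>>>>
>>>>> Something along these lines:
>>>>>
>>>>> val jsonStream3 = myDStream.mapPartitions { iter =>
>>>>>   // One mapper per partition instead of one per record.
>>>>>   val mapper = new ObjectMapper() with ScalaObjectMapper
>>>>>   mapper.registerModule(DefaultScalaModule)
>>>>>   iter.map(x => mapper.readValue[Map[String, Any]](x))
>>>>> }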
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>
>>>>
>>>
>>
>
