I am using elastic4s inside my ESWorker class. ESWorker now contains only
two fields, host: String and port: Int, and I create the ElasticClient
(elastic4s) connection inside the "findNearestCity" method. What is wrong
with my class? Do I need to serialize the ElasticClient? mapPartitions
sounds good, but I still get a NotSerializableException. Or must I mark the
client as transient? And where would the host and port come from in that
case?

My worker:

class ESWorker(val host: String, val port: Int) {
  def findNearestCity(geo: Position): Option[City] = {
    // Here I create the ElasticClient connection and execute queries
  }
  def enrichGeoInternal(data: Data): Data = {
    data.location = findNearestCity(data.position)
    data // return the enriched record; the assignment alone evaluates to Unit
  }
  def enrichGeo(ds: DStream[Data]): DStream[Data] = {
    ds.map(enrichGeoInternal)
  }
}
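
On the transient question, here is a minimal sketch of one common pattern, using only the Scala and Java standard libraries. It assumes the worker is made Serializable and the client is held in a @transient lazy val, so only host and port are serialized and the client is rebuilt on first use after deserialization. FakeClient, its nearestCity method, the simplified (lat, lon) signature and the roundTrip helper are hypothetical stand-ins for illustration; they are not elastic4s or Spark APIs.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for the real client: deliberately NOT Serializable.
class FakeClient(val host: String, val port: Int) {
  def nearestCity(lat: Double, lon: Double): Option[String] =
    Some(s"city@$host:$port")
}

// Only host and port are written during serialization. The client field is
// skipped (@transient) and created again lazily in the JVM that uses it.
class ESWorker(val host: String, val port: Int) extends Serializable {
  @transient private lazy val client: FakeClient = new FakeClient(host, port)

  def findNearestCity(lat: Double, lon: Double): Option[String] =
    client.nearestCity(lat, lon)
}

object TransientClientSketch {
  // Java-serialization round trip, similar in spirit to shipping a closure
  // that references the worker to another JVM.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val worker = new ESWorker("localhost", 9300)
    worker.findNearestCity(47.5, 19.0) // force the client to exist before serializing
    val copy = roundTrip(worker)       // succeeds because the client field is transient
    println(copy.findNearestCity(47.5, 19.0))
  }
}
```

With this shape, host and port come from the constructor and travel with the serialized worker, while each JVM that deserializes it opens its own connection the first time findNearestCity is called. Whether this is enough in a real job depends on what else the closure captures.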



----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.b...@gmail.com


On Wed, Jun 25, 2014 at 1:03 AM, Mayur Rustagi <mayur.rust...@gmail.com>
wrote:

> Most likely the ES client is not serializable. There are three workarounds:
> 1. Switch to Kryo serialization and register the client with Kryo; this
> might solve your serialization issue.
> 2. Use mapPartitions for all your data and initialize your client inside
> the mapPartitions code. This creates one client per partition, which
> reduces some parallelism and adds some client-creation overhead, but it
> avoids serializing the ES client and transferring it to the workers.
> 3. Use a serializable wrapper to serialize your ES client manually, send it
> across, and deserialize it manually. This may or may not work, depending
> on whether your class is safely serializable.
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>
>
>
> On Wed, Jun 25, 2014 at 4:12 AM, boci <boci.b...@gmail.com> wrote:
>
>> Hi guys,
>>
>> I have a small question. I want to create a "Worker" class that uses
>> ElasticClient to query Elasticsearch. (I want to enrich my data with
>> geo search results.)
>>
>> How can I do that? I tried to create a worker instance with the ES
>> host/port parameters, but Spark throws an exception (my class is not
>> serializable).
>>
>> Any idea?
>>
>> Thanks
>> b0c1
>>
>>
>
