I am using elastic4s inside my ESWorker class. ESWorker now contains only two fields, host: String and port: Int. Inside the "findNearestCity" method I create the ElasticClient (elastic4s) connection. What's wrong with my class? Do I need to serialize ElasticClient? mapPartitions sounds good, but I still get a NotSerializableException. Or must I mark it as transient? And where would the host and port come from in that case?
My worker:

    class ESWorker(val host: String, val port: Int) {
      def findNearestCity(geo: Position): Option[City] = {
        // Here I create the ElasticClient connection and execute queries
      }

      def enrichGeoInternal(data: Data): Data = {
        data.location = findNearestCity(data.position)
        data
      }

      def enrichGeo(ds: DStream[Data]): DStream[Data] = {
        ds.map(enrichGeoInternal)
      }
    }

----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.b...@gmail.com

On Wed, Jun 25, 2014 at 1:03 AM, Mayur Rustagi <mayur.rust...@gmail.com> wrote:

> Most likely the ES client is not serializable. There are 3 workarounds:
> 1. Switch to Kryo serialization and register the client with Kryo; this might
> solve your serialization issue.
> 2. Use mapPartitions for all your data and initialize your client inside the
> mapPartitions code. This creates a client for each partition, which reduces
> some parallelism and adds some client-creation overhead, but it avoids
> serializing the ES client and transferring it to the workers.
> 3. Use a serializable wrapper to serialize your ES client manually, send it
> across, and deserialize it manually. This may or may not work, depending on
> whether your class is safely serializable.
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>
> On Wed, Jun 25, 2014 at 4:12 AM, boci <boci.b...@gmail.com> wrote:
>
>> Hi guys,
>>
>> I have a small question. I want to create a "Worker" class which uses
>> ElasticClient to query Elasticsearch. (I want to enrich my data
>> with geo search results.)
>>
>> How can I do that? I tried to create a worker instance with the ES host/port
>> parameters, but Spark throws an exception (my class is not serializable).
>>
>> Any idea?
>>
>> Thanks
>> b0c1
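
For reference, here is a minimal sketch of workaround 2 from the quoted reply (create the client inside mapPartitions). It is an illustration under assumptions, not a tested fix for this exact setup: Position, City and Data are hypothetical stand-ins for the types in the thread, GeoClient is a hypothetical stub in place of the real elastic4s client, and enrichPartition is an invented helper name. The per-partition logic is a plain function over an Iterator, so only host and port (both serializable) need to reach the executors.

```scala
// Hypothetical stand-ins for the types mentioned in the thread.
case class Position(lat: Double, lon: Double)
case class City(name: String)
case class Data(position: Position, location: Option[City] = None)

// Hypothetical stub standing in for the real, non-serializable elastic4s client.
class GeoClient(host: String, port: Int) {
  // The real geo query would run here; the stub never finds a city.
  def findNearestCity(geo: Position): Option[City] = None
  def close(): Unit = ()
}

object GeoEnricher {
  // Per-partition function: the client is created where the data is processed,
  // so it is never serialized. Only host and port travel with the closure.
  def enrichPartition(host: String, port: Int)(iter: Iterator[Data]): Iterator[Data] = {
    val client = new GeoClient(host, port) // one client per partition
    // Materialize the results before closing the client, because iterators are lazy.
    val out = iter.map(d => d.copy(location = client.findNearestCity(d.position))).toList
    client.close()
    out.iterator
  }
}

// Assumed usage with Spark Streaming, where ds is a DStream[Data]:
//   val (h, p) = (host, port) // local copies, so the closure does not capture `this`
//   ds.mapPartitions(iter => GeoEnricher.enrichPartition(h, p)(iter))
```

The original ds.map(enrichGeoInternal) references a method of ESWorker, so the closure captures the whole ESWorker instance, and Spark then tries to serialize it. Copying host and port into local vals (or making ESWorker extend Serializable while it holds nothing but those two fields) avoids that. This also answers where host and port come from: they are captured as plain values on the driver.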