Ok, I found dynamic resources, but I have a frustrating problem. This is the flow:

kafka -> enrich X -> enrich Y -> enrich Z -> foreachRDD -> save

My problem is: if I run it like this, it doesn't work: the enrich functions are never called. But if I put a print in, they are. For example, with

kafka -> enrich X -> enrich Y -> print -> enrich Z -> foreachRDD

enrich X and enrich Y are called but enrich Z is not; if I put the print after enrich Z, it gets printed as well. How can I solve this? (What can I do to make the foreachRDD run? I put a breakpoint inside the map function, where I generate the writable, but it is never hit.) Any idea?

b0c1
----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.b...@gmail.com

On Fri, Jun 27, 2014 at 4:53 PM, boci <boci.b...@gmail.com> wrote:

> Another question: in the foreachRDD I will initialize the JobConf, but at
> that point how can I get information from the items? I have an identifier
> in the data which identifies the required ES index, so how can I set a
> dynamic index in the foreachRDD?
>
> b0c1
>
> On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau <hol...@pigscanfly.ca>
> wrote:
>
>> Just your luck I happened to be working on that very talk today :) Let me
>> know how your experiences with Elasticsearch & Spark go :)
>>
>> On Thu, Jun 26, 2014 at 3:17 PM, boci <boci.b...@gmail.com> wrote:
>>
>>> Wow, thanks for your fast answer, it helps a lot...
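For context, the symptom at the top of this thread (a print forces the enrich steps to run; removing it makes them silent) is the classic signature of Spark's lazy evaluation: DStream transformations only execute when an output operation, or an RDD action inside foreachRDD, pulls on them. A minimal sketch of the shape that does work, where enrichX/enrichY/enrichZ, createJobConf() and prepareDataForIndexing are placeholders for the thread's own functions:

```scala
// Transformations on a DStream are lazy: the enrich maps below only run
// because the foreachRDD body ends in an RDD *action* (saveAsHadoopDataset).
// print() is itself an output operation, which is why adding it "unstuck"
// the earlier stages.
stream
  .map(enrichX)
  .map(enrichY)
  .map(enrichZ)
  .foreachRDD { rdd =>
    val jobConf = createJobConf()   // hypothetical helper building the ES JobConf
    // This action pulls the whole chain; a foreachRDD body that only sets up
    // transformations and never calls an action would leave the maps unexecuted.
    rdd.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
  }
```

If the function passed to foreachRDD never calls an action (saveAsHadoopDataset, count, foreach, ...), nothing upstream runs, which would match the breakpoint never being hit.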
>>>
>>> b0c1
>>>
>>> On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <hol...@pigscanfly.ca>
>>> wrote:
>>>
>>>> Hi b0c1,
>>>>
>>>> I have an example of how to do this in the repo for my talk as well;
>>>> the specific example is at
>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
>>>> . Since DStream doesn't have a saveAsHadoopDataset, we use foreachRDD and
>>>> then call saveAsHadoopDataset on the RDD that gets passed into the
>>>> function we provide to foreachRDD.
>>>>
>>>> e.g.
>>>>
>>>> stream.foreachRDD{(data, time) =>
>>>>   val jobConf = ...
>>>>   data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
>>>> }
>>>>
>>>> Hope that helps :)
>>>>
>>>> Cheers,
>>>>
>>>> Holden :)
>>>>
>>>> On Thu, Jun 26, 2014 at 2:23 PM, boci <boci.b...@gmail.com> wrote:
>>>>
>>>>> Thanks. Without the local option I can connect to ES remotely; now I
>>>>> only have one problem: how can I use elasticsearch-hadoop with Spark
>>>>> Streaming? I mean, DStream doesn't have a "saveAsHadoopFiles" method, and
>>>>> my second problem is that the output index depends on the input data.
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <
>>>>> nick.pentre...@gmail.com> wrote:
>>>>>
>>>>>> You can just add elasticsearch-hadoop as a dependency to your project
>>>>>> to use the ESInputFormat and ESOutputFormat (
>>>>>> https://github.com/elasticsearch/elasticsearch-hadoop).
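On the dynamic-index question quoted above: elasticsearch-hadoop also supports multi-resource writes, where es.resource.write contains a {field} pattern that is resolved per document, so a single static JobConf can still route each record to a different index. A sketch, assuming an indexName field present in every document and a local ES node; the exact feature set depends on your elasticsearch-hadoop version:

```scala
// Route each document to an index derived from one of its own fields,
// using the {field} pattern in es.resource.write.
import org.apache.hadoop.mapred.JobConf
import org.elasticsearch.hadoop.mr.EsOutputFormat

val jobConf = new JobConf()
jobConf.setOutputFormat(classOf[EsOutputFormat])
jobConf.set("es.nodes", "localhost:9200")          // assumed ES location
// "indexName" is assumed to be a field carried by every written document;
// ES resolves it per record, so no per-record JobConf is needed.
jobConf.set("es.resource.write", "events-{indexName}/doc")
```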
>>>>>> Some other basics here:
>>>>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>>>>
>>>>>> For testing, yes, I think you will need to start ES in local mode
>>>>>> (just ./bin/elasticsearch) and use the default config (host = localhost,
>>>>>> port = 9200).
>>>>>>
>>>>>> On Thu, Jun 26, 2014 at 9:04 AM, boci <boci.b...@gmail.com> wrote:
>>>>>>
>>>>>>> That's okay, but hadoop has ES integration. What happens if I run
>>>>>>> saveAsHadoopFile without hadoop? (Or do I need to pull up hadoop
>>>>>>> programmatically, if I can?)
>>>>>>>
>>>>>>> b0c1
>>>>>>>
>>>>>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <hol...@pigscanfly.ca>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <boci.b...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi guys, thanks for the direction. Now I have some problems/questions:
>>>>>>>>> - In local (test) mode I want to use ElasticClient.local to create
>>>>>>>>> the ES connection, but in production I want to use ElasticClient.remote.
>>>>>>>>> For this I want to pass the ElasticClient to mapPartitions, or what is
>>>>>>>>> the best practice?
>>>>>>>>>
>>>>>>>> In this case you probably want to make the ElasticClient inside of
>>>>>>>> mapPartitions (since it isn't serializable), and if you want to use a
>>>>>>>> different client in local mode just have a flag that controls what type
>>>>>>>> of client you create.
>>>>>>>>
>>>>>>>>> - My stream output is written to elasticsearch. How can I
>>>>>>>>> test output.saveAsHadoopFile[ESOutputFormat]("-") in a local
>>>>>>>>> environment?
>>>>>>>>>
>>>>>>>>> - After storing the enriched data in ES, I want to generate
>>>>>>>>> aggregated data (EsInputFormat). How can I test this locally?
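Holden's suggestion above (build the client inside mapPartitions and switch on a flag for local vs. remote) could look roughly like this. ElasticClient.local and ElasticClient.remote are the elastic4s calls named in the thread, but their exact signatures are version-dependent; the flag, host, port, and index() helper are placeholders:

```scala
// Create the (non-serializable) ES client on the worker, once per partition,
// so it never has to be shipped inside a closure.
val isLocal = sys.props.contains("es.test.local")   // assumed flag

rdd.mapPartitions { records =>
  val client =
    if (isLocal) ElasticClient.local                // embedded node for tests
    else ElasticClient.remote("es-host", 9300)      // hypothetical host/port
  // Materialize before closing the client: the mapped iterator is lazy, and
  // closing first would hand Spark an iterator over a dead connection.
  val out = records.map(r => index(client, r)).toList
  client.close()
  out.iterator
}
```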
>>>>>>>>>
>>>>>>>> I think the simplest thing to do would be to use the same client in
>>>>>>>> both modes and just start a single-node Elasticsearch cluster.
>>>>>>>>
>>>>>>>>> Thanks guys
>>>>>>>>>
>>>>>>>>> b0c1
>>>>>>>>>
>>>>>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <
>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>
>>>>>>>>>> So I'm giving a talk at the Spark Summit on using Spark &
>>>>>>>>>> ElasticSearch, but for now, if you want to see a simple demo which uses
>>>>>>>>>> elasticsearch for geo input, you can take a look at my quick & dirty
>>>>>>>>>> implementation of TopTweetsInALocation (
>>>>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>>>>>>>> ). This approach uses the ESInputFormat, which avoids the difficulty
>>>>>>>>>> of having to manually create ElasticSearch clients.
>>>>>>>>>>
>>>>>>>>>> This approach might not work for your data, e.g. if you need to
>>>>>>>>>> create a query for each record in your RDD. If this is the case, you could
>>>>>>>>>> instead look at using mapPartitions and setting up your Elasticsearch
>>>>>>>>>> connection inside of that, so you could then re-use the client for all of
>>>>>>>>>> the queries on each partition. This approach will avoid having to serialize
>>>>>>>>>> the Elasticsearch connection because it will be local to your
>>>>>>>>>> function.
>>>>>>>>>>
>>>>>>>>>> Hope this helps!
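The ESInputFormat approach described above can be sketched as follows. Class and package names are as in elasticsearch-hadoop 2.x (where the class is spelled EsInputFormat), and the resource and query values are placeholders; sc is your SparkContext:

```scala
// Read from Elasticsearch through the Hadoop InputFormat, so no ES client
// object ever needs to be created or serialized by your code.
import org.apache.hadoop.io.{MapWritable, Text}
import org.apache.hadoop.mapred.JobConf
import org.elasticsearch.hadoop.mr.EsInputFormat

val conf = new JobConf()
conf.set("es.resource", "tweets/tweet")        // index/type to read (assumed)
conf.set("es.query", "?q=location:*")          // hypothetical query
val esRdd = sc.hadoopRDD(conf,
  classOf[EsInputFormat[Text, MapWritable]],
  classOf[Text],                               // document id
  classOf[MapWritable])                        // document fields
```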
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>>
>>>>>>>>>> Holden :)
>>>>>>>>>>
>>>>>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <
>>>>>>>>>> mayur.rust...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> It's not used as the default serializer because of some
>>>>>>>>>>> compatibility issues & the requirement to register the classes.
>>>>>>>>>>>
>>>>>>>>>>> Which part are you getting as non-serializable? You need to
>>>>>>>>>>> serialize a class if you are sending it to Spark workers inside a map,
>>>>>>>>>>> reduce, mapPartitions or any of the other operations on an RDD.
>>>>>>>>>>>
>>>>>>>>>>> Mayur Rustagi
>>>>>>>>>>> Ph: +1 (760) 203 3257
>>>>>>>>>>> http://www.sigmoidanalytics.com
>>>>>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I'm afraid persisting a connection across two tasks is a
>>>>>>>>>>>> dangerous act, as they can't be guaranteed to be executed on the
>>>>>>>>>>>> same machine. Your ES server may think it's a man-in-the-middle
>>>>>>>>>>>> attack!
>>>>>>>>>>>>
>>>>>>>>>>>> I think it's possible to invoke a static method that gives you a
>>>>>>>>>>>> connection from a local 'pool', so nothing will sneak into your
>>>>>>>>>>>> closure, but it's too complex and there should be a better option.
>>>>>>>>>>>>
>>>>>>>>>>>> I've never used kryo before; if it's that good, perhaps we should
>>>>>>>>>>>> use it as the default serializer.
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> View this message in context:
>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>>>>> Nabble.com.
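Peng's "static method + local pool" idea is usually expressed in Spark as a lazy singleton object: it is initialized once per executor JVM on first use and is never captured in a closure, so nothing non-serializable is shipped. makeClient() is a placeholder for whatever client construction you use:

```scala
// A JVM-wide connection holder. Scala objects are not serialized with the
// task; each executor materializes its own instance lazily on first access.
object ESClientPool {
  lazy val client = makeClient()     // built once per executor JVM (placeholder)
}

rdd.foreachPartition { records =>
  val c = ESClientPool.client        // local lookup, no serialization involved
  records.foreach(r => indexDoc(c, r))  // indexDoc is a hypothetical helper
}
```

Since the client lives for the life of the executor, it is reused across tasks on the same machine, which also sidesteps the repeated connect/disconnect that worried Peng.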
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Cell : 425-233-8271