Hi b0c1, I have an example of how to do this in the repo for my talk as well; the specific example is at https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala . Since DStream doesn't have a saveAsHadoopDataset method, we use foreachRDD and then call saveAsHadoopDataset on the RDD that gets passed into the function we provide to foreachRDD.
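A slightly fuller sketch of that pattern, for anyone following along. The `es.*` keys follow elasticsearch-hadoop's configuration convention, `prepareDataForIndexing` is an illustrative stand-in for whatever per-record conversion you need, and the output-format class name capitalization (EsOutputFormat vs ESOutputFormat) varies across elasticsearch-hadoop versions, so check yours:

```scala
import org.apache.hadoop.io.{MapWritable, NullWritable, Text}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext._ // pair-RDD implicits on older Spark
import org.apache.spark.streaming.dstream.DStream

// Illustrative per-record conversion into the (key, MapWritable) shape
// that Hadoop OutputFormats expect; adapt the fields to your data.
def prepareDataForIndexing(tweet: (String, String)): (NullWritable, MapWritable) = {
  val doc = new MapWritable()
  doc.put(new Text("user"), new Text(tweet._1))
  doc.put(new Text("message"), new Text(tweet._2))
  (NullWritable.get, doc)
}

def indexStream(stream: DStream[(String, String)]): Unit =
  stream.foreachRDD { (data, time) =>
    // Build a fresh JobConf per batch, then write through the Hadoop API.
    val jobConf = new JobConf()
    jobConf.setOutputFormat(classOf[org.elasticsearch.hadoop.mr.EsOutputFormat])
    jobConf.set("es.resource", "tweets/tweet") // target index/type (illustrative)
    jobConf.set("es.nodes", "localhost:9200")
    data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
  }
```

This also addresses the "output index depends on the input data" question below: since the JobConf is built inside foreachRDD, you can compute `es.resource` per batch.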
e.g.

  stream.foreachRDD { (data, time) =>
    val jobConf = ...
    data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
  }

Hope that helps :)

Cheers,

Holden :)

On Thu, Jun 26, 2014 at 2:23 PM, boci <boci.b...@gmail.com> wrote:

> Thanks. Without the local option I can connect to ES remotely; now I only
> have one problem: how can I use elasticsearch-hadoop with Spark Streaming?
> I mean DStream doesn't have a "saveAsHadoopFiles" method, and my second
> problem is that the output index depends on the input data.
>
> Thanks
>
> ----------------------------------------
> Skype: boci13, Hangout: boci.b...@gmail.com
>
> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>
>> You can just add elasticsearch-hadoop as a dependency to your project to
>> use the ESInputFormat and ESOutputFormat
>> (https://github.com/elasticsearch/elasticsearch-hadoop). Some other
>> basics here:
>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>
>> For testing, yes, I think you will need to start ES in local mode (just
>> ./bin/elasticsearch) and use the default config (host = localhost, port = 9200).
>>
>> On Thu, Jun 26, 2014 at 9:04 AM, boci <boci.b...@gmail.com> wrote:
>>
>>> That's okay, but Hadoop has ES integration. What happens if I run
>>> saveAsHadoopFile without Hadoop? Or must I pull up Hadoop
>>> programmatically (if I can)?
>>>
>>> b0c1
>>>
>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>
>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>
>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <boci.b...@gmail.com> wrote:
>>>>
>>>>> Hi guys, thanks for the direction. Now I have some problems/questions:
>>>>> - In local (test) mode I want to use ElasticClient.local to create the ES
>>>>> connection, but in production I want to use ElasticClient.remote. For this I
>>>>> want to pass the ElasticClient to mapPartitions, or what is the best
>>>>> practice?
>>>>
>>>> In this case you probably want to make the ElasticClient inside of
>>>> mapPartitions (since it isn't serializable), and if you want to use a
>>>> different client in local mode just have a flag that controls what type of
>>>> client you create.
>>>>
>>>>> - My stream output is written into Elasticsearch. How can I
>>>>> test output.saveAsHadoopFile[ESOutputFormat]("-") in a local environment?
>>>>>
>>>>> - After storing the enriched data into ES, I want to generate aggregated
>>>>> data (EsInputFormat). How can I test it locally?
>>>>
>>>> I think the simplest thing to do would be to use the same client in both
>>>> modes and just start a single-node Elasticsearch cluster.
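The flag idea above might look like this sketch. ElasticClient.local and ElasticClient.remote follow the elastic4s-style API that b0c1 mentions; the esLocalMode flag, host/port, and the per-record work are all illustrative, not from the repo:

```scala
import com.sksamuel.elastic4s.ElasticClient // assumed client library, per the thread
import org.apache.spark.rdd.RDD

// Build the (non-serializable) client once per partition. The boolean flag
// *is* serializable, so it is safe to close over and ship to the workers.
def enrich(rdd: RDD[String], esLocalMode: Boolean): RDD[String] =
  rdd.mapPartitions { records =>
    val client =
      if (esLocalMode) ElasticClient.local
      else ElasticClient.remote("es-host", 9300) // illustrative host/port
    // Use the same client for every record in this partition. If the client
    // needs closing, do it only after the iterator is fully consumed, since
    // mapPartitions returns a lazy iterator.
    records.map { record => /* query or index with `client` */ record }
  }
```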
>>>>
>>>>> Thanks guys
>>>>>
>>>>> b0c1
>>>>>
>>>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>>>
>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>
>>>>>> So I'm giving a talk at the Spark Summit on using Spark &
>>>>>> Elasticsearch, but for now, if you want to see a simple demo which uses
>>>>>> Elasticsearch for geo input, you can take a look at my quick & dirty
>>>>>> implementation with TopTweetsInALocation
>>>>>> (https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala).
>>>>>> This approach uses the ESInputFormat, which avoids the difficulty of
>>>>>> having to manually create Elasticsearch clients.
>>>>>>
>>>>>> This approach might not work for your data, e.g. if you need to
>>>>>> create a query for each record in your RDD. If this is the case, you could
>>>>>> instead look at using mapPartitions and setting up your Elasticsearch
>>>>>> connection inside of that, so you could then re-use the client for all of
>>>>>> the queries on each partition. This approach will avoid having to serialize
>>>>>> the Elasticsearch connection because it will be local to your function.
>>>>>>
>>>>>> Hope this helps!
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Holden :)
>>>>>>
>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <mayur.rust...@gmail.com> wrote:
>>>>>>
>>>>>>> It's not used as the default serializer due to some issues with
>>>>>>> compatibility & the requirement to register the classes.
>>>>>>>
>>>>>>> Which part are you getting as non-serializable? You need to
>>>>>>> serialize that class if you are sending it to Spark workers inside a map,
>>>>>>> reduce, mapPartitions, or any of the other operations on an RDD.
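For the read side, the ESInputFormat route mentioned above looks roughly like this. A sketch only: the class names come from elasticsearch-hadoop's org.elasticsearch.hadoop.mr package, and the resource, nodes, and query values are illustrative:

```scala
import org.apache.hadoop.io.{MapWritable, Text}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext

def readTweets(sc: SparkContext) = {
  val jobConf = new JobConf()
  jobConf.set("es.resource", "tweets/tweet")  // index/type to read (illustrative)
  jobConf.set("es.nodes", "localhost:9200")
  jobConf.set("es.query", """{"query":{"match_all":{}}}""") // your geo query here
  // Each element is (document id, document fields as a MapWritable);
  // the input format handles the ES connection, so no client to serialize.
  sc.hadoopRDD(jobConf,
    classOf[org.elasticsearch.hadoop.mr.EsInputFormat[Text, MapWritable]],
    classOf[Text], classOf[MapWritable])
}
```

For local testing, as noted above, point es.nodes at a single-node cluster started with ./bin/elasticsearch on the default host and port.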
>>>>>>>
>>>>>>> Mayur Rustagi
>>>>>>> Ph: +1 (760) 203 3257
>>>>>>> http://www.sigmoidanalytics.com
>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>
>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au> wrote:
>>>>>>>
>>>>>>>> I'm afraid persisting a connection across two tasks is a dangerous
>>>>>>>> act, as they can't be guaranteed to be executed on the same machine.
>>>>>>>> Your ES server may think it's a man-in-the-middle attack!
>>>>>>>>
>>>>>>>> I think it's possible to invoke a static method that gives you a
>>>>>>>> connection from a local 'pool', so nothing will sneak into your
>>>>>>>> closure, but it's too complex and there should be a better option.
>>>>>>>>
>>>>>>>> Never used Kryo before; if it's that good, perhaps we should use it
>>>>>>>> as the default serializer.
>>>>>>>>
>>>>>>>> --
>>>>>>>> View this message in context:
>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>>>
>>>>>> --
>>>>>> Cell : 425-233-8271

--
Cell : 425-233-8271
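Peng's "static method / local pool" idea can be sketched as a per-JVM lazy singleton, so a closure captures only the object reference and never a live connection. The ElasticClient call and host details are illustrative, as in the earlier sketches:

```scala
import org.apache.spark.rdd.RDD

// One connection per executor JVM: `lazy val` defers creation until first
// use on the worker, and the object itself is never shipped in the closure.
object ESClientPool {
  lazy val client =
    com.sksamuel.elastic4s.ElasticClient.remote("es-host", 9300) // illustrative
}

def enriched(rdd: RDD[String]): RDD[String] =
  rdd.map { record =>
    val c = ESClientPool.client // resolved locally on each worker
    /* use `c` to look up enrichment data for `record` */
    record
  }
```

Compared to the mapPartitions approach, this reuses one client across all tasks in the same executor rather than one per partition, which is why per-task persistence worries do not apply: the connection never crosses machines.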