Hi b0c1,

I have an example of how to do this in the repo for my talk as well. The
specific example is at
https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
Since DStream doesn't have a saveAsHadoopDataset method, we use foreachRDD and
then call saveAsHadoopDataset on the RDD that gets passed into the
function we provide to foreachRDD.

e.g.

stream.foreachRDD{(data, time) =>
     val jobConf = ...
     data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
}
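
As a rough sketch of what a filled-in version could look like (an
illustration only, not a copy of the linked example): the config keys below
are, as far as I know, the usual Hadoop and elasticsearch-hadoop settings,
but indexStream, esResource, esNodes, the "message" field, and the
String record type are all made-up names for this sketch. It needs Spark
Streaming and elasticsearch-hadoop on the classpath.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{MapWritable, NullWritable, Text}
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
import org.apache.spark.SparkContext._ // pair-RDD implicits on older Spark versions
import org.apache.spark.streaming.dstream.DStream

object IndexStreamSketch {
  // Hypothetical helper: wrap one input line in a single-field document.
  // The key is ignored here, so NullWritable is used as a placeholder.
  def prepareDataForIndexing(line: String): (Object, MapWritable) = {
    val doc = new MapWritable()
    doc.put(new Text("message"), new Text(line))
    (NullWritable.get(), doc)
  }

  // esResource is an "index/type" string and esNodes a host name;
  // both are assumptions supplied by the caller.
  def indexStream(stream: DStream[String], esResource: String, esNodes: String): Unit = {
    stream.foreachRDD { (data, time) =>
      // Build the JobConf per batch, inside the function passed to foreachRDD.
      val jobConf = new JobConf(data.context.hadoopConfiguration)
      jobConf.set("mapred.output.format.class",
        "org.elasticsearch.hadoop.mr.EsOutputFormat")
      jobConf.setOutputCommitter(classOf[FileOutputCommitter])
      jobConf.set("es.resource", esResource)
      jobConf.set("es.nodes", esNodes)
      // Placeholder output path, mirroring the saveAsHadoopFile("-") usage
      // earlier in this thread (assumption: the committer setup expects one).
      FileOutputFormat.setOutputPath(jobConf, new Path("-"))
      data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
    }
  }
}
```

You would call something like IndexStreamSketch.indexStream(lines, "tweets/tweet", "localhost")
before starting the StreamingContext, where lines and the index name are
again placeholders.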

Hope that helps :)

Cheers,

Holden :)


On Thu, Jun 26, 2014 at 2:23 PM, boci <boci.b...@gmail.com> wrote:

> Thanks. Without the local option I can connect to the remote ES, so now I only
> have one problem. How can I use elasticsearch-hadoop with Spark Streaming?
> I mean, DStream doesn't have a "saveAsHadoopFiles" method. My second problem:
> the output index depends on the input data.
>
> Thanks
>
>
> ----------------------------------------------------------------------------------------------------------------------------------
> Skype: boci13, Hangout: boci.b...@gmail.com
>
>
> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>
>> You can just add elasticsearch-hadoop as a dependency to your project to
>> use the ESInputFormat and ESOutputFormat (
>> https://github.com/elasticsearch/elasticsearch-hadoop). Some other
>> basics here:
>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>
>> For testing, yes I think you will need to start ES in local mode (just
>> ./bin/elasticsearch) and use the default config (host = localhost, port =
>> 9200).
>>
>>
>> On Thu, Jun 26, 2014 at 9:04 AM, boci <boci.b...@gmail.com> wrote:
>>
>>> That's okay, but Hadoop has ES integration. What happens if I run
>>> saveAsHadoopFile without Hadoop? (Or do I need to start up Hadoop
>>> programmatically, if I can?)
>>>
>>> b0c1
>>>
>>>
>>> ----------------------------------------------------------------------------------------------------------------------------------
>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>
>>>
>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <hol...@pigscanfly.ca>
>>> wrote:
>>>
>>>>
>>>>
>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <boci.b...@gmail.com> wrote:
>>>>
>>>>> Hi guys, thanks for the direction. Now I have some problems/questions:
>>>>> - In local (test) mode I want to use ElasticClient.local to create the ES
>>>>> connection, but in production I want to use ElasticClient.remote. For this I
>>>>> want to pass the ElasticClient to mapPartitions, or what is the best
>>>>> practice?
>>>>>
>>>> In this case you probably want to make the ElasticClient inside of
>>>> mapPartitions (since it isn't serializable), and if you want to use a
>>>> different client in local mode, just have a flag that controls what type of
>>>> client you create.
>>>>
>>>>> - My stream output is written into Elasticsearch. How can I
>>>>> test output.saveAsHadoopFile[ESOutputFormat]("-") in a local environment?
>>>>>
>>>>>
>>>>> - After storing the enriched data into ES, I want to generate aggregated
>>>>> data (EsInputFormat). How can I test it locally?
>>>>>
>>>> I think the simplest thing to do would be to use the same client in both
>>>> modes and just start a single-node Elasticsearch cluster.
>>>>
>>>>>
>>>>> Thanks guys
>>>>>
>>>>> b0c1
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>>>
>>>>>
>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <hol...@pigscanfly.ca>
>>>>> wrote:
>>>>>
>>>>>> So I'm giving a talk at the Spark summit on using Spark &
>>>>>> ElasticSearch, but for now if you want to see a simple demo which uses
>>>>>> elasticsearch for geo input you can take a look at my quick & dirty
>>>>>> implementation with TopTweetsInALocation (
>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>>>> ). This approach uses the ESInputFormat which avoids the difficulty of
>>>>>> having to manually create ElasticSearch clients.
>>>>>>
>>>>>> This approach might not work for your data, e.g. if you need to
>>>>>> create a query for each record in your RDD. If this is the case, you could
>>>>>> instead look at using mapPartitions and setting up your Elasticsearch
>>>>>> connection inside of that, so you could then re-use the client for all of
>>>>>> the queries on each partition. This approach will avoid having to serialize
>>>>>> the Elasticsearch connection because it will be local to your function.
>>>>>>
>>>>>> Hope this helps!
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Holden :)
>>>>>>
>>>>>>
>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <
>>>>>> mayur.rust...@gmail.com> wrote:
>>>>>>
>>>>>>> It's not used as the default serializer because of some compatibility
>>>>>>> issues and the requirement to register the classes.
>>>>>>>
>>>>>>> Which part are you getting as non-serializable? You need to
>>>>>>> serialize that class if you are sending it to Spark workers inside a map,
>>>>>>> reduce, mapPartitions, or any of the other operations on an RDD.
>>>>>>>
>>>>>>>
>>>>>>> Mayur Rustagi
>>>>>>> Ph: +1 (760) 203 3257
>>>>>>> http://www.sigmoidanalytics.com
>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I'm afraid persisting a connection across two tasks is dangerous, as they
>>>>>>>> can't be guaranteed to be executed on the same machine. Your ES server may
>>>>>>>> think it's a man-in-the-middle attack!
>>>>>>>>
>>>>>>>> I think it's possible to invoke a static method that gives you a connection
>>>>>>>> from a local 'pool', so nothing will sneak into your closure, but it's too
>>>>>>>> complex and there should be a better option.
>>>>>>>>
>>>>>>>> I've never used Kryo before. If it's that good, perhaps we should use it as
>>>>>>>> the default serializer.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> View this message in context:
>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>> Nabble.com.
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Cell : 425-233-8271
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Cell : 425-233-8271
>>>>
>>>
>>>
>>
>


-- 
Cell : 425-233-8271
