On Friday, June 27, 2014, boci <boci.b...@gmail.com> wrote:

> Thanks, more local threads solved the problem, it works like a charm. How
> many threads are required?
>
Just more than one, so that it can schedule the other tasks :)
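For anyone else hitting this: with Spark Streaming, the receiver occupies one worker thread, so a plain `local` master leaves nothing free to process the batches. A minimal sketch of the fix (app name, batch interval, and the comment placeholder are illustrative):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object LocalStreamingSketch {
  def main(args: Array[String]): Unit = {
    // "local[4]" gives the scheduler 4 threads: one for the Kafka
    // receiver, the rest for actually processing each batch.
    // Plain "local" (a single thread) starves the processing side.
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("StreamingTest")
    val ssc = new StreamingContext(conf, Seconds(1))
    // ... create the Kafka stream, enrich, save ...
    ssc.start()
    ssc.awaitTermination()
  }
}
```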

>
> Adrian: it's not a public project, but ask and I will answer (if I can)...
> maybe later I will create a demo project based on my solution.
>
> b0c1
>

>
> ----------------------------------------------------------------------------------------------------------------------------------
> Skype: boci13, Hangout: boci.b...@gmail.com
>
>
> On Fri, Jun 27, 2014 at 11:31 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
>
>> Try setting the master to local[4]
>>
>>
>> On Fri, Jun 27, 2014 at 2:17 PM, boci <boci.b...@gmail.com> wrote:
>>
>>> This is simply a ScalaTest. I create a SparkConf, set the master to local
>>> (set the serializer etc.), pull up the Kafka and ES connections, send a
>>> message to Kafka, and wait 30 seconds for the processing.
>>>
>>> It runs in IDEA, no magic tricks.
>>>
>>> b0c1
>>>
>>>
>>>
>>>
>>> On Fri, Jun 27, 2014 at 11:11 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>
>>>> So a few quick questions:
>>>>
>>>> 1) What cluster are you running this against? Is it just local? Have
>>>> you tried local[4]?
>>>> 2) When you say breakpoint, how are you setting this breakpoint? There
>>>> is a good chance your breakpoint mechanism doesn't work in a distributed
>>>> environment; could you instead cause a side effect (like writing to a
>>>> file)?
>>>>
>>>> Cheers,
>>>>
>>>> Holden :)
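To make the side-effect suggestion above concrete, here is a minimal sketch of replacing a breakpoint with a driver-visible side effect (the app name and numbers are arbitrary; `sc.accumulator` is the Spark 1.x API, newer versions use `sc.longAccumulator`):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SideEffectCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("side-effect-check")
    val sc = new SparkContext(conf)
    // An accumulator is a side effect that flows back to the driver,
    // unlike an IDE breakpoint, which may never fire on an executor thread.
    val calls = sc.accumulator(0)
    sc.parallelize(1 to 100).map { x => calls += 1; x * 2 }.count()
    // If the map function really ran, the counter reflects it here.
    println(s"map function ran ${calls.value} times")
    sc.stop()
  }
}
```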
>>>>
>>>>
>>>> On Fri, Jun 27, 2014 at 2:04 PM, boci <boci.b...@gmail.com> wrote:
>>>>
>>>>> Ok, I found the dynamic resources, but I have a frustrating problem. This
>>>>> is the flow:
>>>>> kafka -> enrich X -> enrich Y -> enrich Z -> foreachRDD -> save
>>>>>
>>>>> My problem is: this doesn't work, the enrich functions are not
>>>>> called, but if I put a print in, it does. For example, if I do this:
>>>>> kafka -> enrich X -> enrich Y -> print -> enrich Z -> foreachRDD
>>>>>
>>>>> then enrich X and enrich Y are called but enrich Z is not;
>>>>> if I put the print after enrich Z, it will be printed. How can I
>>>>> solve this? (What can I do to get the foreachRDD called? I put a
>>>>> breakpoint inside the map function, where I generate the writable,
>>>>> but it's not called.)
>>>>>
>>>>> Any idea?
>>>>>
>>>>> b0c1
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jun 27, 2014 at 4:53 PM, boci <boci.b...@gmail.com> wrote:
>>>>>
>>>>>> Another question: in the foreachRDD I will initialize the JobConf,
>>>>>> but at this point how can I get information from the items?
>>>>>> I have an identifier in the data which identifies the required ES index
>>>>>> (so how can I set a dynamic index in the foreachRDD)?
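One workaround for the dynamic-index question, sketched under assumptions: `Doc`, the field names, the host, and the `doc` type are illustrative; the output-format class name may differ across elasticsearch-hadoop versions, and newer releases may let you put a `{field}` pattern straight into `es.resource` instead. The idea is to split each batch by the index identifier and issue one save per index, each with its own JobConf:

```scala
import org.apache.hadoop.io.{MapWritable, NullWritable, Text}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.elasticsearch.hadoop.mr.EsOutputFormat

// Hypothetical record type; `index` carries the target ES index name.
case class Doc(index: String, body: String)

def toWritable(doc: Doc): MapWritable = {
  val m = new MapWritable()
  m.put(new Text("body"), new Text(doc.body))
  m
}

// Call this from inside foreachRDD: one pass to discover which indices
// the batch touches, then one save per index.
def saveByIndex(rdd: RDD[Doc]): Unit = {
  val indices = rdd.map(_.index).distinct().collect()
  for (idx <- indices) {
    val jobConf = new JobConf()
    jobConf.setOutputFormat(classOf[EsOutputFormat])
    jobConf.set("es.nodes", "localhost:9200")
    jobConf.set("es.resource", s"$idx/doc") // index/type for this slice
    rdd.filter(_.index == idx)
       .map(doc => (NullWritable.get(), toWritable(doc)))
       .saveAsHadoopDataset(jobConf)
  }
}
```

This costs one extra pass over the RDD to collect the distinct indices, which is usually acceptable for streaming batch sizes.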
>>>>>>
>>>>>> b0c1
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>
>>>>>>> Just your luck I happened to be working on that very talk today :)
>>>>>>> Let me know how your experiences with Elasticsearch & Spark go :)
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jun 26, 2014 at 3:17 PM, boci <boci.b...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Wow, thanks for your fast answer, it helps a lot...
>>>>>>>>
>>>>>>>> b0c1
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>
>>>>>>>>> Hi b0c1,
>>>>>>>>>
>>>>>>>>> I have an example of how to do this in the repo for my talk as
>>>>>>>>> well; the specific example is at
>>>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
>>>>>>>>> . Since DStream doesn't have a saveAsHadoopDataset, we use foreachRDD
>>>>>>>>> and then call saveAsHadoopDataset on the RDD that gets passed into the
>>>>>>>>> function we provide to foreachRDD.
>>>>>>>>>
>>>>>>>>> e.g.
>>>>>>>>>
>>>>>>>>> stream.foreachRDD { (data, time) =>
>>>>>>>>>   // build the JobConf once per batch, then index the prepared RDD
>>>>>>>>>   val jobConf = ...
>>>>>>>>>   data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> Hope that helps :)
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>>
>>>>>>>>> Holden :)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jun 26, 2014 at 2:23 PM, boci <boci.b...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks. Without the local option I can connect to the remote ES; now
>>>>>>>>>> I only have one problem: how can I use elasticsearch-hadoop with Spark
>>>>>>>>>> Streaming? I mean, DStream doesn't have a "saveAsHadoopFiles" method. My
>>>>>>>>>> second problem: the output index depends on the input data.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> You can just add elasticsearch-hadoop as a dependency to your
>>>>>>>>>>> project to use the ESInputFormat and ESOutputFormat (
>>>>>>>>>>> https://github.com/elasticsearch/elasticsearch-hadoop). Some
>>>>>>>>>>> other basics here:
>>>>>>>>>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>>>>>>>>>
>>>>>>>>>>> For testing, yes I think you will need to start ES in local mode
>>>>>>>>>>> (just ./bin/elasticsearch) and use the default config (host = 
>>>>>>>>>>> localhost,
>>>>>>>>>>> port = 9200).
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jun 26, 2014 at 9:04 AM, boci <boci.b...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> That's okay, but Hadoop has the ES integration. What happens if I
>>>>>>>>>>>> run saveAsHadoopFile without Hadoop? (Or do I need to pull up Hadoop
>>>>>>>>>>>> programmatically, if I can?)
>>>>>>>>>>>>
>>>>>>>>>>>> b0c1
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <boci.b...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi guys, thanks for the direction. Now I have some
>>>>>>>>>>>>>> problems/questions:
>>>>>>>>>>>>>> - In local (test) mode I want to use ElasticClient.local to
>>>>>>>>>>>>>> create the ES connection, but in production I want to use
>>>>>>>>>>>>>> ElasticClient.remote. For this, should I pass the ElasticClient to
>>>>>>>>>>>>>> mapPartitions, or what is the best practice?
>>>>>>>>>>>>>>
>>>>>>>>>>>>> In this case you probably want to make the ElasticClient
>>>>>>>>>>>>> inside of mapPartitions (since it isn't serializable), and if you want to
>>>>>>>>>>>>> use a different client in local mode just have a flag that controls what
>>>>>>>>>>>>> type of client you create.
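A sketch of that pattern, assuming elastic4s (as in the original question); the `useLocal` flag, the host/port, and the exact close method are illustrative and may differ by elastic4s version:

```scala
import com.sksamuel.elastic4s.ElasticClient
import org.apache.spark.rdd.RDD

// `useLocal` is the flag that controls which client gets built.
def enrich(rdd: RDD[String], useLocal: Boolean): RDD[String] =
  rdd.mapPartitions { records =>
    // Created on the executor, inside the partition, so the
    // non-serializable client never has to enter the closure.
    val client =
      if (useLocal) ElasticClient.local
      else ElasticClient.remote("es-host", 9300)
    // Materialize before closing: mapPartitions hands back a lazy
    // iterator, and the lookups must run while the client is open.
    val out = records.map { r =>
      // ... enrich r using `client` ...
      r
    }.toList
    client.close() // method name depends on the elastic4s version
    out.iterator
  }
```

One client per partition amortizes the connection cost over all records in that partition, instead of opening one per record.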
>>>>>>>>>>>>>
>>>>>>>>>>>>>> - My stream output is written into Elasticsearch. How can I
>>>>>>>>>>>>>> test output.saveAsHadoopFile[ESOutputFormat]("-") in a local environment?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - After storing the enriched data into ES, I want to generate
>>>>>>>>>>>>>> aggregated data (EsInputFormat). How can I test it locally?
>>>>>>>>>>>>>>
>>>>>>>>>>>>> I think the simplest thing to do would be to use the same client
>>>>>>>>>>>>> in both modes and just start a single-node Elasticsearch cluster.
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks guys
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> b0c1
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> So I'm giving a talk at the Spark summit on using Spark &
>>>>>>>>>>>>>>> ElasticSearch, but for now if you want to see a simple demo 
>>>>>>>>>>>>>>> which uses
>>>>>>>>>>>>>>> elasticsearch for geo input you can take a look at my quick & 
>>>>>>>>>>>>>>> dirty
>>>>>>>>>>>>>>> implementation with TopTweetsInALocation (
>>>>>>>>>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>>>>>>>>>>>>> ). This approach uses the ESInputFormat which avoids the 
>>>>>>>>>>>>>>> difficulty of
>>>>>>>>>>>>>>> having to manually create ElasticSearch clients.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This approach might not work for your data, e.g. if you need
>>>>>>>>>>>>>>> to create a query for each record in your RDD. If this is the 
>>>>>>>>>>>>>>> case, you
>>>>>>>>>>>>>>> could instead look at using mapPartitions and setting up your 
>>>>>>>>>>>>>>> Elasticsearch
>>>>>>>>>>>>>>> connection inside of that, so you could then re-use the client 
>>>>>>>>>>>>>>> for all of
>>>>>>>>>>>>>>> the queries on each partition. This approach will avoid having 
>>>>>>>>>>>>>>> to serialize
>>>>>>>>>>>>>>> the Elasticsearch connection because it will be local to your 
>>>>>>>>>>>>>>> function.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hope this helps!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Holden :)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <mayur.rust...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> It's not used as the default serializer due to some issues with
>>>>>>>>>>>>>>>> compatibility & the requirement to register the classes.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Which part are you getting as non-serializable? You need
>>>>>>>>>>>>>>>> to serialize that class if you are sending it to Spark workers inside a
>>>>>>>>>>>>>>>> map, reduce, mapPartitions, or any of the other operations on an RDD.
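For reference, enabling Kryo and registering classes looks roughly like this (a sketch for the Spark 1.x configuration keys; `Enriched` and the registrator name are made up):

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical record type standing in for whatever class Spark
// reports as non-serializable.
case class Enriched(id: String, score: Double)

// Registering the class up front is the requirement mentioned above.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Enriched])
  }
}

object KryoSetup {
  val conf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "MyRegistrator")
}
```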
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Mayur Rustagi
>>>>>>>>>>>>>>>> Ph: +1 (760) 203 3257
>>>>>>>>>>>>>>>> http://www.sigmoidanalytics.com
>>>>>>>>>>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm afraid persisting a connection across two tasks is a
>>>>>>>>>>>>>>>>> dangerous act, as they
>>>>>>>>>>>>>>>>> can't be guaranteed to be executed on the same machine.
>>>>>>>>>>>>>>>>> Your ES server may
>>>>>>>>>>>>>>>>> think it's a man-in-the-middle attack!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I think it's possible to invoke a static method that gives
>>>>>>>>>>>>>>>>> you a connection from
>>>>>>>>>>>>>>>>> a local 'pool', so nothing will sneak into your closure,
>>>>>>>>>>>>>>>>> but it's too complex
>>>>>>>>>>>>>>>>> and there should be a better option.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Never used Kryo before; if it's that good perhaps we should
>>>>>>>>>>>>>>>>> use it as the
>>>>>>>>>>>>>>>>> default serializer.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> View this message in context:
>>>>>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive
>>>>>>>>>>>>>>>>> at Nabble.com.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Cell : 425-233-8271
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>>
>
>

