On Friday, June 27, 2014, boci <boci.b...@gmail.com> wrote:

> Thanks, using more local threads solved the problem; it works like a
> charm. How many threads are required?

Just more than one, so that it can schedule the other task :)
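For reference, the local-threads fix discussed above can be sketched like this (a minimal sketch; the app name and batch interval are placeholders, not from the thread). Spark Streaming needs more than one local thread because the receiver permanently occupies one:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// "local" / local[1] starves the job: the single thread is taken by the
// Kafka receiver, so no batches are ever scheduled for processing.
// local[4] leaves threads free for the actual tasks.
val conf = new SparkConf()
  .setMaster("local[4]")
  .setAppName("streaming-test") // placeholder name
val ssc = new StreamingContext(conf, Seconds(5))
```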
> Adrian: it's not a public project, but ask and I will answer (if I
> can)... maybe later I will create a demo project based on my solution.
>
> b0c1
>
> ----------------------------------------------------------------------------------------------------------------------------------
> Skype: boci13, Hangout: boci.b...@gmail.com
>
> On Fri, Jun 27, 2014 at 11:31 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
>
>> Try setting the master to local[4].
>>
>> On Fri, Jun 27, 2014 at 2:17 PM, boci <boci.b...@gmail.com> wrote:
>>
>>> This is a simple ScalaTest. I create a SparkConf, set the master to
>>> local (set the serializer, etc.), pull up the Kafka and ES connections,
>>> send a message to Kafka, and wait 30 seconds for processing.
>>>
>>> It runs in IDEA, no magic tricks.
>>>
>>> b0c1
>>>
>>> On Fri, Jun 27, 2014 at 11:11 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>
>>>> So a few quick questions:
>>>>
>>>> 1) What cluster are you running this against? Is it just local? Have
>>>> you tried local[4]?
>>>> 2) When you say breakpoint, how are you setting this breakpoint? There
>>>> is a good chance your breakpoint mechanism doesn't work in a
>>>> distributed environment; could you instead cause a side effect (like
>>>> writing to a file)?
>>>>
>>>> Cheers,
>>>>
>>>> Holden :)
>>>>
>>>> On Fri, Jun 27, 2014 at 2:04 PM, boci <boci.b...@gmail.com> wrote:
>>>>
>>>>> Ok, I found the dynamic resources, but I have a frustrating problem.
>>>>> This is the flow:
>>>>> kafka -> enrich X -> enrich Y -> enrich Z -> foreachRDD -> save
>>>>>
>>>>> My problem is: if I do this, it doesn't work; the enrich functions are
>>>>> not called. But if I put a print in, they are. For example, if I do this:
>>>>> kafka -> enrich X -> enrich Y -> print -> enrich Z -> foreachRDD
>>>>>
>>>>> then enrich X and enrich Y are called but enrich Z is not; if I put
>>>>> the print after enrich Z, it is printed. How can I solve this? (What
>>>>> can I do to make the foreachRDD run? I put a breakpoint inside the map
>>>>> function, where I generate the Writable, but it is never hit.)
>>>>>
>>>>> Any idea?
>>>>>
>>>>> b0c1
>>>>>
>>>>> On Fri, Jun 27, 2014 at 4:53 PM, boci <boci.b...@gmail.com> wrote:
>>>>>
>>>>>> Another question: in the foreachRDD I will initialize the JobConf,
>>>>>> but at that point how can I get information from the items? I have an
>>>>>> identifier in the data which identifies the required ES index, so how
>>>>>> can I set a dynamic index in the foreachRDD?
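The "enrich not called" behaviour described above is Spark's lazy evaluation: transformations only build a lineage, and nothing runs until an output operation (print, foreachRDD, save) forces it. A pure-Scala analogy, with no Spark involved, illustrative only:

```scala
// A lazy view stands in for the DStream/RDD lineage: map is recorded, not run.
var enrichCalls = 0
val enriched = (1 to 3).view.map { x => enrichCalls += 1; x * 2 }

// Nothing has executed yet -- like an enrich chain with no output operation.
assert(enrichCalls == 0)

// Forcing the view is the analogue of print/foreachRDD triggering the job.
val result = enriched.toList
assert(enrichCalls == 3 && result == List(2, 4, 6))
```

The same reasoning explains why inserting a print "makes it work": print is itself an output operation that forces the stages before it.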
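On the dynamic-index question: elasticsearch-hadoop can route each document by a field of the document itself, using a pattern in the write resource, so a single JobConf can cover all indices. A sketch under assumptions: the field name `esIndex` and the type name `docs` are placeholders, and multi-resource writes require an es-hadoop version that supports the `{field}` pattern (check the docs for your version):

```scala
import org.apache.hadoop.mapred.JobConf
import org.elasticsearch.hadoop.mr.EsOutputFormat

val jobConf = new JobConf()
jobConf.setOutputFormat(classOf[EsOutputFormat])
jobConf.set("es.nodes", "localhost:9200")
// {esIndex} is resolved per document from the document's own field, so
// records carrying different identifiers land in different indices.
jobConf.set("es.resource.write", "{esIndex}/docs")
```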
>>>>>>
>>>>>> b0c1
>>>>>>
>>>>>> On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>
>>>>>>> Just your luck, I happened to be working on that very talk today :)
>>>>>>> Let me know how your experiences with Elasticsearch & Spark go :)
>>>>>>>
>>>>>>> On Thu, Jun 26, 2014 at 3:17 PM, boci <boci.b...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Wow, thanks for your fast answer, it helps a lot...
>>>>>>>>
>>>>>>>> b0c1
>>>>>>>>
>>>>>>>> On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>
>>>>>>>>> Hi b0c1,
>>>>>>>>>
>>>>>>>>> I have an example of how to do this in the repo for my talk as
>>>>>>>>> well; the specific example is at
>>>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
>>>>>>>>> . Since DStream doesn't have a saveAsHadoopDataset, we use
>>>>>>>>> foreachRDD and then call saveAsHadoopDataset on the RDD that gets
>>>>>>>>> passed into the function we provide to foreachRDD.
>>>>>>>>>
>>>>>>>>> e.g.
>>>>>>>>>
>>>>>>>>> stream.foreachRDD { (data, time) =>
>>>>>>>>>   val jobConf = ...
>>>>>>>>>   data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> Hope that helps :)
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>>
>>>>>>>>> Holden :)
>>>>>>>>>
>>>>>>>>> On Thu, Jun 26, 2014 at 2:23 PM, boci <boci.b...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks. Without the local option I can connect to ES remotely; now
>>>>>>>>>> I have only one problem: how can I use elasticsearch-hadoop with
>>>>>>>>>> Spark Streaming? I mean, DStream doesn't have a "saveAsHadoopFiles"
>>>>>>>>>> method, and my second problem is that the output index depends on
>>>>>>>>>> the input data.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> You can just add elasticsearch-hadoop as a dependency to your
>>>>>>>>>>> project to use the ESInputFormat and ESOutputFormat
>>>>>>>>>>> (https://github.com/elasticsearch/elasticsearch-hadoop). Some
>>>>>>>>>>> other basics here:
>>>>>>>>>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>>>>>>>>>
>>>>>>>>>>> For testing, yes, I think you will need to start ES in local mode
>>>>>>>>>>> (just ./bin/elasticsearch) and use the default config (host =
>>>>>>>>>>> localhost, port = 9200).
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jun 26, 2014 at 9:04 AM, boci <boci.b...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> That's okay, but hadoop has ES integration.
>>>>>>>>>>>> What happens if I
>>>>>>>>>>>> run saveAsHadoopFile without Hadoop? (Or must I pull up Hadoop
>>>>>>>>>>>> programmatically, if I even can?)
>>>>>>>>>>>>
>>>>>>>>>>>> b0c1
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <boci.b...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi guys, thanks for the direction; now I have some
>>>>>>>>>>>>>> problems/questions:
>>>>>>>>>>>>>> - In local (test) mode I want to use ElasticClient.local to
>>>>>>>>>>>>>> create the ES connection, but in production I want to use
>>>>>>>>>>>>>> ElasticClient.remote. For this I want to pass the ElasticClient
>>>>>>>>>>>>>> to mapPartitions, or what is the best practice?
>>>>>>>>>>>>>>
>>>>>>>>>>>>> In this case you probably want to create the ElasticClient
>>>>>>>>>>>>> inside of mapPartitions (since it isn't serializable), and if
>>>>>>>>>>>>> you want to use a different client in local mode, just have a
>>>>>>>>>>>>> flag that controls what type of client you create.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> - My stream output is written into Elasticsearch. How can I
>>>>>>>>>>>>>> test output.saveAsHadoopFile[ESOutputFormat]("-") in a local
>>>>>>>>>>>>>> environment?
>>>>>>>>>>>>>> - After storing the enriched data in ES, I want to generate
>>>>>>>>>>>>>> aggregated data (EsInputFormat); how can I test that locally?
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think the simplest thing to do would be to use the same client
>>>>>>>>>>>>> in both modes and just start a single-node Elasticsearch cluster.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks guys
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> b0c1
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> So I'm giving a talk at the Spark Summit on using Spark &
>>>>>>>>>>>>>>> ElasticSearch, but for now, if you want to see a simple demo
>>>>>>>>>>>>>>> which uses Elasticsearch for geo input, you can take a look at
>>>>>>>>>>>>>>> my quick & dirty implementation with TopTweetsInALocation
>>>>>>>>>>>>>>> (https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala).
>>>>>>>>>>>>>>> This approach uses the ESInputFormat, which avoids the
>>>>>>>>>>>>>>> difficulty of having to manually create ElasticSearch clients.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This approach might not work for your data, e.g. if you need
>>>>>>>>>>>>>>> to create a query for each record in your RDD. If this is the
>>>>>>>>>>>>>>> case, you could instead look at using mapPartitions and setting
>>>>>>>>>>>>>>> up your Elasticsearch connection inside of that, so you could
>>>>>>>>>>>>>>> then re-use the client for all of the queries on each partition.
>>>>>>>>>>>>>>> This approach will avoid having to serialize the Elasticsearch
>>>>>>>>>>>>>>> connection, because it will be local to your function.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hope this helps!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Holden :)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <mayur.rust...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> It's not used as the default serializer because of some
>>>>>>>>>>>>>>>> compatibility issues & the requirement to register the classes.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Which part are you getting as non-serializable? You need to
>>>>>>>>>>>>>>>> serialize that class if you are sending it to Spark workers
>>>>>>>>>>>>>>>> inside a map, reduce, mapPartitions, or any of the other
>>>>>>>>>>>>>>>> operations on an RDD.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Mayur Rustagi
>>>>>>>>>>>>>>>> Ph: +1 (760) 203 3257
>>>>>>>>>>>>>>>> http://www.sigmoidanalytics.com
>>>>>>>>>>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm afraid persisting a connection across two tasks is a
>>>>>>>>>>>>>>>>> dangerous act, as they can't be guaranteed to be executed on
>>>>>>>>>>>>>>>>> the same machine. Your ES server may think it's a
>>>>>>>>>>>>>>>>> man-in-the-middle attack!
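The per-partition client pattern described above can be sketched without Spark: treat each partition as an Iterator and construct the non-serializable client inside the function, so it never crosses the closure boundary. `FakeClient` is a hypothetical stand-in for the real ElasticClient:

```scala
// Stand-in for a non-serializable client such as ElasticClient.
class FakeClient { def query(x: Int): Int = x * 10 }

var clientsCreated = 0

// Shape of the function you would pass to RDD.mapPartitions.
def perPartition(part: Iterator[Int]): Iterator[Int] = {
  clientsCreated += 1         // in Spark this runs once per partition, on the worker
  val client = new FakeClient // built inside the closure, so never serialized
  part.map(client.query)      // the one client serves every record in the partition
}

// Two "partitions" of data, as Spark would hand them to the function.
val partitions = Seq(Iterator(1, 2), Iterator(3, 4, 5))
val out = partitions.flatMap(p => perPartition(p)).toList

assert(clientsCreated == 2)   // one client per partition, not per record
assert(out == List(10, 20, 30, 40, 50))
```

The design point is the same one made in the thread: because the client is a local variable of the partition function, Spark only has to ship the function, never the connection.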
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I think it's possible to invoke a static method that gives
>>>>>>>>>>>>>>>>> you a connection from a local 'pool', so nothing will sneak
>>>>>>>>>>>>>>>>> into your closure, but that's too complex; there should be a
>>>>>>>>>>>>>>>>> better option.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I've never used Kryo before; if it's that good, perhaps we
>>>>>>>>>>>>>>>>> should use it as the default serializer.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> View this message in context:
>>>>>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.

--
Cell : 425-233-8271
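For completeness, enabling Kryo and registering classes (the requirement Mayur mentions above) looks roughly like this in the Spark of that era; `YourRecord` and `MyRegistrator` are placeholder names, not from the thread:

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Placeholder class standing in for whatever you ship to the workers.
case class YourRecord(id: String, value: Int)

// Registering a class lets Kryo write a small numeric id instead of the
// full class name, which is the main payoff of registration.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[YourRecord])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "MyRegistrator")
```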