This is a simple ScalaTest. I create a SparkConf, set the master to local (and set the serializer, etc.), bring up the Kafka and ES connections, send a message to Kafka, and wait 30 seconds for it to be processed.
It runs in IDEA; no magic tricks.

b0c1
----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.b...@gmail.com


On Fri, Jun 27, 2014 at 11:11 PM, Holden Karau <hol...@pigscanfly.ca> wrote:

> So a few quick questions:
>
> 1) What cluster are you running this against? Is it just local? Have you
> tried local[4]?
> 2) When you say breakpoint, how are you setting this breakpoint? There is
> a good chance your breakpoint mechanism doesn't work in a distributed
> environment; could you instead cause a side effect (like writing to a
> file)?
>
> Cheers,
>
> Holden :)
>
>
> On Fri, Jun 27, 2014 at 2:04 PM, boci <boci.b...@gmail.com> wrote:
>
>> Ok, I found the dynamic resources, but I have a frustrating problem.
>> This is the flow:
>>
>> kafka -> enrich X -> enrich Y -> enrich Z -> foreachRDD -> save
>>
>> My problem is: when I run this, it doesn't work (the enrich functions
>> are not called), but if I put a print in, they are. For example, if I
>> do this:
>>
>> kafka -> enrich X -> enrich Y -> print -> enrich Z -> foreachRDD
>>
>> then enrich X and enrich Y are called, but enrich Z is not. If I put
>> the print after enrich Z, it is printed. How can I solve this? (What
>> can I do to get the foreachRDD called? I put a breakpoint inside the
>> map function, where I generate the Writable, but it is never hit.)
>>
>> Any idea?
>>
>> b0c1
>>
>>
>> On Fri, Jun 27, 2014 at 4:53 PM, boci <boci.b...@gmail.com> wrote:
>>
>>> Another question: in the foreachRDD I will initialize the JobConf, but
>>> at that point how can I get information from the items? I have an
>>> identifier in the data which identifies the required ES index, so how
>>> can I set a dynamic index in the foreachRDD?
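On the dynamic-index question above: newer versions of elasticsearch-hadoop support multi-resource writes, where the `es.resource` setting may contain a `{fieldName}` pattern that the library resolves against each document, so different records can land in different indices. A toy re-implementation of that resolution, just to show the idea (`resolveResource` and the `media-{kind}/doc` pattern are illustrative stand-ins, not library API):

```scala
// Toy version of elasticsearch-hadoop's "{field}" resource resolution:
// the write pattern is resolved against each document, so documents can
// land in different indices. resolveResource is a stand-in helper, not
// part of the library's API.
def resolveResource(pattern: String, doc: Map[String, String]): String =
  """\{(\w+)\}""".r.replaceAllIn(pattern, m => doc(m.group(1)))

val docs = Seq(
  Map("kind" -> "tweet",   "body" -> "hello"),
  Map("kind" -> "retweet", "body" -> "hi")
)
val targets = docs.map(d => resolveResource("media-{kind}/doc", d))
println(targets)   // List(media-tweet/doc, media-retweet/doc)
```

With the real library you would set the pattern once on the JobConf and let elasticsearch-hadoop do this per document, rather than resolving it yourself.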
>>>
>>> b0c1
>>>
>>>
>>> On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau <hol...@pigscanfly.ca>
>>> wrote:
>>>
>>>> Just your luck, I happened to be working on that very talk today :)
>>>> Let me know how your experiences with Elasticsearch & Spark go :)
>>>>
>>>>
>>>> On Thu, Jun 26, 2014 at 3:17 PM, boci <boci.b...@gmail.com> wrote:
>>>>
>>>>> Wow, thanks for your fast answer, it helps a lot...
>>>>>
>>>>> b0c1
>>>>>
>>>>>
>>>>> On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <hol...@pigscanfly.ca>
>>>>> wrote:
>>>>>
>>>>>> Hi b0c1,
>>>>>>
>>>>>> I have an example of how to do this in the repo for my talk as well;
>>>>>> the specific example is at
>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
>>>>>> . Since DStream doesn't have a saveAsHadoopDataset, we use foreachRDD
>>>>>> and then call saveAsHadoopDataset on the RDD that gets passed into
>>>>>> the function we provide to foreachRDD.
>>>>>>
>>>>>> e.g.
>>>>>>
>>>>>> stream.foreachRDD { (data, time) =>
>>>>>>   val jobConf = ...
>>>>>>   data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
>>>>>> }
>>>>>>
>>>>>> Hope that helps :)
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Holden :)
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 26, 2014 at 2:23 PM, boci <boci.b...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks. Without the local option I can connect to ES remotely; now
>>>>>>> I only have one problem: how can I use elasticsearch-hadoop with
>>>>>>> Spark Streaming?
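A note on why the enrich stages earlier in the thread only ran once a print was added: RDD and DStream transformations are lazy, and only an output operation (print, or an action such as saveAsHadoopDataset inside foreachRDD) forces the whole chain to execute. The same behaviour can be shown without Spark using a plain Scala Iterator, whose map is also lazy (enrichX/enrichY are stand-ins for the enrich stages):

```scala
// Spark-free analogy: like RDD/DStream transformations, Iterator.map is
// lazy, so the "enrich" stages only run when something forces the pipeline.
var called = List.empty[String]

def enrichX(s: String): String = { called = called :+ "X"; s + "-x" }
def enrichY(s: String): String = { called = called :+ "Y"; s + "-y" }

val pipeline = Iterator("msg").map(enrichX).map(enrichY)
println(called)   // List(): nothing has run yet

// Forcing the pipeline (the analogue of an output operation) runs every stage:
val out = pipeline.toList
println(called)   // List(X, Y)
println(out)      // List(msg-x-y)
```

This is also why a breakpoint inside a map function is never hit when no action follows: the function is simply never invoked.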
>>>>>>> I mean, DStream doesn't have a "saveAsHadoopFiles" method, and my
>>>>>>> second problem is that the output index depends on the input data.
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <
>>>>>>> nick.pentre...@gmail.com> wrote:
>>>>>>>
>>>>>>>> You can just add elasticsearch-hadoop as a dependency to your
>>>>>>>> project to use the ESInputFormat and ESOutputFormat (
>>>>>>>> https://github.com/elasticsearch/elasticsearch-hadoop). Some other
>>>>>>>> basics here:
>>>>>>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>>>>>>
>>>>>>>> For testing, yes, I think you will need to start ES in local mode
>>>>>>>> (just ./bin/elasticsearch) and use the default config (host =
>>>>>>>> localhost, port = 9200).
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jun 26, 2014 at 9:04 AM, boci <boci.b...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> That's okay, but Hadoop has the ES integration. What happens if I
>>>>>>>>> run saveAsHadoopFile without Hadoop? (Or must I pull up Hadoop
>>>>>>>>> programmatically,
>>>>>>>>> if I can?)
>>>>>>>>>
>>>>>>>>> b0c1
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <
>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>
>>>>>>>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <boci.b...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi guys, thanks for the directions. Now I have some
>>>>>>>>>>> problems/questions:
>>>>>>>>>>> - In local (test) mode I want to use ElasticClient.local to
>>>>>>>>>>> create the ES connection, but in production I want to use
>>>>>>>>>>> ElasticClient.remote. To do this I want to pass the
>>>>>>>>>>> ElasticClient to mapPartitions; or what is the best practice?
>>>>>>>>>>>
>>>>>>>>>> In this case you probably want to create the ElasticClient inside
>>>>>>>>>> of mapPartitions (since it isn't serializable), and if you want
>>>>>>>>>> to use a different client in local mode, just have a flag that
>>>>>>>>>> controls what type of client you create.
>>>>>>>>>>
>>>>>>>>>>> - My stream output is written to Elasticsearch. How can I test
>>>>>>>>>>> output.saveAsHadoopFile[ESOutputFormat]("-") in a local
>>>>>>>>>>> environment?
>>>>>>>>>>> - After storing the enriched data in ES, I want to generate
>>>>>>>>>>> aggregated data (EsInputFormat). How can I test that locally?
>>>>>>>>>>>
>>>>>>>>>> I think the simplest thing to do would be to use the same client
>>>>>>>>>> in both modes and just start a single-node Elasticsearch cluster.
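Holden's suggestion (build the non-serializable client inside mapPartitions, with a flag for local vs. remote) looks roughly like this. Client, LocalClient, RemoteClient and indexPartition are all stand-ins so the sketch runs without Spark or an ES client on the classpath:

```scala
// Sketch of "create the client inside mapPartitions": one client per
// partition, chosen by a flag. All names here are stand-ins, not a real
// Elasticsearch API.
trait Client { def index(doc: String): String }
class LocalClient  extends Client { def index(doc: String) = s"local:$doc" }
class RemoteClient extends Client { def index(doc: String) = s"remote:$doc" }

def indexPartition(useLocal: Boolean)(docs: Iterator[String]): Iterator[String] = {
  // Built here, inside the function Spark would run once per partition,
  // so the client is never captured and serialized into the closure.
  val client: Client = if (useLocal) new LocalClient else new RemoteClient
  docs.map(d => client.index(d))
}

// With Spark this would be: rdd.mapPartitions(indexPartition(useLocal = true))
val out = indexPartition(useLocal = true)(Iterator("a", "b")).toList
println(out)   // List(local:a, local:b)
```

The key property is that the client is a local variable of the per-partition function, so each worker constructs its own and nothing non-serializable crosses the wire.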
>>>>>>>>>>
>>>>>>>>>>> Thanks guys
>>>>>>>>>>>
>>>>>>>>>>> b0c1
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <
>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> So I'm giving a talk at the Spark Summit on using Spark &
>>>>>>>>>>>> Elasticsearch, but for now, if you want to see a simple demo
>>>>>>>>>>>> which uses Elasticsearch for geo input, you can take a look at
>>>>>>>>>>>> my quick & dirty implementation with TopTweetsInALocation (
>>>>>>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>>>>>>>>>> ). This approach uses the ESInputFormat, which avoids the
>>>>>>>>>>>> difficulty of having to manually create Elasticsearch clients.
>>>>>>>>>>>>
>>>>>>>>>>>> This approach might not work for your data, e.g. if you need to
>>>>>>>>>>>> create a query for each record in your RDD. If this is the
>>>>>>>>>>>> case, you could instead look at using mapPartitions and setting
>>>>>>>>>>>> up your Elasticsearch connection inside of that, so you could
>>>>>>>>>>>> then re-use the client for all of the queries on each
>>>>>>>>>>>> partition. This approach avoids having to serialize the
>>>>>>>>>>>> Elasticsearch connection because it will be local to your
>>>>>>>>>>>> function.
>>>>>>>>>>>>
>>>>>>>>>>>> Hope this helps!
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>
>>>>>>>>>>>> Holden :)
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <
>>>>>>>>>>>> mayur.rust...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> It's not used as the default serializer because of some issues
>>>>>>>>>>>>> with compatibility & the requirement to register the classes.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Which part are you getting as non-serializable? You need to
>>>>>>>>>>>>> serialize a class if you are sending it to Spark workers
>>>>>>>>>>>>> inside a map, reduce, mapPartitions or any of the other
>>>>>>>>>>>>> operations on an RDD.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Mayur Rustagi
>>>>>>>>>>>>> Ph: +1 (760) 203 3257
>>>>>>>>>>>>> http://www.sigmoidanalytics.com
>>>>>>>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm afraid persisting a connection across two tasks is a
>>>>>>>>>>>>>> dangerous act, as they can't be guaranteed to be executed on
>>>>>>>>>>>>>> the same machine. Your ES server may think it's a
>>>>>>>>>>>>>> man-in-the-middle attack!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think it's possible to invoke a static method that gives
>>>>>>>>>>>>>> you a connection from a local 'pool', so nothing will sneak
>>>>>>>>>>>>>> into your closure, but that's too complex and there should
>>>>>>>>>>>>>> be a better option.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I've never used Kryo before; if it's that good, perhaps we
>>>>>>>>>>>>>> should use it as the default serializer.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> View this message in context:
>>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>>>>>>> Nabble.com.
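The Kryo settings Mayur is referring to are plain SparkConf entries. Shown here as key/value pairs so the snippet runs without Spark on the classpath; com.example.MyRegistrator is a made-up name for the class that would register your own classes with Kryo:

```scala
// SparkConf keys for switching to Kryo, as plain strings so this runs
// without Spark. "com.example.MyRegistrator" is a made-up class name
// standing in for your own KryoRegistrator implementation.
val kryoConf = Map(
  "spark.serializer"       -> "org.apache.spark.serializer.KryoSerializer",
  "spark.kryo.registrator" -> "com.example.MyRegistrator"
)
kryoConf.foreach { case (k, v) => println(s"$k=$v") }
```

In a real job these would be set via `conf.set(key, value)` on the SparkConf before creating the context.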
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Cell : 425-233-8271