Try setting the master to local[4]
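
For reference, a minimal sketch of what that could look like in the test setup. It assumes the stream is fed by a receiver (as the Kafka input is), which occupies a thread by itself, so plain "local" may leave nothing free to run the enrich steps. The object name, app name, and 1 second batch interval are placeholders, not taken from your code.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object LocalMasterSketch {
  def main(args: Array[String]): Unit = {
    // "local[4]" runs Spark in-process with 4 worker threads, so the
    // receiver and the batch processing do not compete for one thread.
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("es-enrich-test") // hypothetical name
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    // Hypothetical batch interval; use whatever the test needs.
    val ssc = new StreamingContext(conf, Seconds(1))

    // Build the kafka -> enrich -> foreachRDD pipeline here, then call
    // ssc.start() and wait for the test message to be processed.

    ssc.stop()
  }
}
```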

On Fri, Jun 27, 2014 at 2:17 PM, boci <boci.b...@gmail.com> wrote:

> This is simply a ScalaTest test. I create a SparkConf, set the master to local
> (set the serializer, etc.), bring up the Kafka and ES connections, send a message
> to Kafka, and wait 30 seconds for processing.
>
> It runs in IDEA, no magic tricks.
>
> b0c1
>
>
> ----------------------------------------------------------------------------------------------------------------------------------
> Skype: boci13, Hangout: boci.b...@gmail.com
>
>
> On Fri, Jun 27, 2014 at 11:11 PM, Holden Karau <hol...@pigscanfly.ca>
> wrote:
>
>> So a few quick questions:
>>
>> 1) What cluster are you running this against? Is it just local? Have you
>> tried local[4]?
>> 2) When you say breakpoint, how are you setting this breakpoint? There
>> is a good chance your breakpoint mechanism doesn't work in a distributed
>> environment. Could you instead cause a side effect (like writing to a file)?
>>
>> Cheers,
>>
>> Holden :)
>>
>>
>> On Fri, Jun 27, 2014 at 2:04 PM, boci <boci.b...@gmail.com> wrote:
>>
>>> Ok I found dynamic resources, but I have a frustrating problem. This is
>>> the flow:
>>> kafka -> enrich X -> enrich Y -> enrich Z -> foreachRDD -> save
>>>
>>> My problem is that this doesn't work: the enrich functions are not
>>> called. But if I add a print, they are. For example, if I do this:
>>> kafka -> enrich X -> enrich Y -> print -> enrich Z -> foreachRDD
>>>
>>> enrich X and enrich Y are called, but enrich Z is not. If I put the
>>> print after enrich Z, it is printed. How can I solve this? (What can I
>>> do to get foreachRDD called? I put a breakpoint inside the map function
>>> (where I generate the writable), but it is never hit.)
>>>
>>> Any idea?
>>>
>>> b0c1
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Jun 27, 2014 at 4:53 PM, boci <boci.b...@gmail.com> wrote:
>>>
>>>> Another question. In the foreachRDD I will initialize the JobConf, but
>>>> at that point how can I get information from the items?
>>>> I have an identifier in the data which identifies the required ES index
>>>> (so how can I set a dynamic index in the foreachRDD)?
>>>>
>>>> b0c1
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau <hol...@pigscanfly.ca>
>>>> wrote:
>>>>
>>>>> Just your luck I happened to be working on that very talk today :) Let
>>>>> me know how your experiences with Elasticsearch & Spark go :)
>>>>>
>>>>>
>>>>> On Thu, Jun 26, 2014 at 3:17 PM, boci <boci.b...@gmail.com> wrote:
>>>>>
>>>>>> Wow, thanks for your fast answer, it helps a lot...
>>>>>>
>>>>>> b0c1
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <hol...@pigscanfly.ca>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi b0c1,
>>>>>>>
>>>>>>> I have an example of how to do this in the repo for my talk as well,
>>>>>>> the specific example is at
>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
>>>>>>> Since DStream doesn't have a saveAsHadoopDataset method, we use foreachRDD
>>>>>>> and then call saveAsHadoopDataset on the RDD that gets passed into the
>>>>>>> function we provide to foreachRDD.
>>>>>>>
>>>>>>> e.g.
>>>>>>>
>>>>>>> stream.foreachRDD { (data, time) =>
>>>>>>>   val jobConf = ...
>>>>>>>   data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
>>>>>>> }
>>>>>>>
>>>>>>> Hope that helps :)
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> Holden :)
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jun 26, 2014 at 2:23 PM, boci <boci.b...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks. Without the local option I can connect to the remote ES, so now I
>>>>>>>> only have one problem: how can I use elasticsearch-hadoop with Spark
>>>>>>>> Streaming? I mean, DStream doesn't have a "saveAsHadoopFiles" method. My
>>>>>>>> second problem is that the output index depends on the input data.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <
>>>>>>>> nick.pentre...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> You can just add elasticsearch-hadoop as a dependency to your
>>>>>>>>> project to use the ESInputFormat and ESOutputFormat (
>>>>>>>>> https://github.com/elasticsearch/elasticsearch-hadoop). Some
>>>>>>>>> other basics here:
>>>>>>>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>>>>>>>
>>>>>>>>> For testing, yes, I think you will need to start ES in local mode
>>>>>>>>> (just ./bin/elasticsearch) and use the default config
>>>>>>>>> (host = localhost, port = 9200).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jun 26, 2014 at 9:04 AM, boci <boci.b...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> That's okay, but Hadoop has ES integration. What happens if I
>>>>>>>>>> run saveAsHadoopFile without Hadoop? (Or do I need to bring up Hadoop
>>>>>>>>>> programmatically, if I can?)
>>>>>>>>>>
>>>>>>>>>> b0c1
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <
>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <boci.b...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi guys, thanks for the direction. Now I have some problems/questions:
>>>>>>>>>>>> - In local (test) mode I want to use ElasticClient.local to
>>>>>>>>>>>> create the ES connection, but in production I want to use
>>>>>>>>>>>> ElasticClient.remote. For this, should I pass the ElasticClient to
>>>>>>>>>>>> mapPartitions, or what is the best practice?
>>>>>>>>>>>>
>>>>>>>>>>> In this case you probably want to make the ElasticClient inside
>>>>>>>>>>> of mapPartitions (since it isn't serializable), and if you want to use
>>>>>>>>>>> a different client in local mode, just have a flag that controls what
>>>>>>>>>>> type of client you create.
>>>>>>>>>>>
>>>>>>>>>>>> - My stream output is written into Elasticsearch. How can I
>>>>>>>>>>>> test output.saveAsHadoopFile[ESOutputFormat]("-") in a local
>>>>>>>>>>>> environment?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> - After storing the enriched data into ES, I want to generate
>>>>>>>>>>>> aggregated data (EsInputFormat). How can I test it locally?
>>>>>>>>>>>>
>>>>>>>>>>> I think the simplest thing to do would be to use the same client in
>>>>>>>>>>> both modes and just start a single-node Elasticsearch cluster.
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks guys
>>>>>>>>>>>>
>>>>>>>>>>>> b0c1
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <
>>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> So I'm giving a talk at the Spark Summit on using Spark &
>>>>>>>>>>>>> Elasticsearch, but for now if you want to see a simple demo which
>>>>>>>>>>>>> uses Elasticsearch for geo input you can take a look at my quick &
>>>>>>>>>>>>> dirty implementation with TopTweetsInALocation (
>>>>>>>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>>>>>>>>>>> ). This approach uses the ESInputFormat, which avoids the
>>>>>>>>>>>>> difficulty of having to manually create Elasticsearch clients.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This approach might not work for your data, e.g. if you need
>>>>>>>>>>>>> to create a query for each record in your RDD. If this is the
>>>>>>>>>>>>> case, you could instead look at using mapPartitions and setting up
>>>>>>>>>>>>> your Elasticsearch connection inside of that, so you could then
>>>>>>>>>>>>> re-use the client for all of the queries on each partition. This
>>>>>>>>>>>>> approach will avoid having to serialize the Elasticsearch
>>>>>>>>>>>>> connection because it will be local to your function.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hope this helps!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Holden :)
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <
>>>>>>>>>>>>> mayur.rust...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> It's not used as the default serializer because of some
>>>>>>>>>>>>>> compatibility issues and the requirement to register the classes.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Which part are you getting as non-serializable? You need to make
>>>>>>>>>>>>>> that class serializable if you are sending it to Spark workers
>>>>>>>>>>>>>> inside a map, reduce, mapPartitions, or any of the other operations
>>>>>>>>>>>>>> on an RDD.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Mayur Rustagi
>>>>>>>>>>>>>> Ph: +1 (760) 203 3257
>>>>>>>>>>>>>> http://www.sigmoidanalytics.com
>>>>>>>>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm afraid persisting a connection across two tasks is dangerous,
>>>>>>>>>>>>>>> as they can't be guaranteed to be executed on the same machine.
>>>>>>>>>>>>>>> Your ES server may think it's a man-in-the-middle attack!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think it's possible to invoke a static method that gives you a
>>>>>>>>>>>>>>> connection from a local 'pool', so nothing will sneak into your
>>>>>>>>>>>>>>> closure, but it's too complex and there should be a better option.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I've never used Kryo before; if it's that good, perhaps we should
>>>>>>>>>>>>>>> use it as the default serializer.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> View this message in context:
>>>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>>>>>>>> Nabble.com.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Cell : 425-233-8271
>>>>>>>>>>>>>


-- 
Cell : 425-233-8271
