Re: ElasticSearch enrich

2014-06-27 Thread boci


 Thanks guys

 b0c1




 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau 
 hol...@pigscanfly.ca wrote:

 So I'm giving a talk at the Spark summit on using Spark &
 ElasticSearch, but for now if you want to see a simple demo which uses
 elasticsearch for geo input you can take a look at my quick & dirty
 implementation with TopTweetsInALocation (
 https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
 ). This approach uses the ESInputFormat which avoids the difficulty of
 having to manually create ElasticSearch clients.

 This approach might not work for your data, e.g. if you need to
 create a query for each record in your RDD. If this is the case, you 
 could
 instead look at using mapPartitions and setting up your Elasticsearch
 connection inside of that, so you could then re-use the client for 
 all of
 the queries on each partition. This approach will avoid having to 
 serialize
 the Elasticsearch connection because it will be local to your 
 function.

 Hope this helps!

 Cheers,

 Holden :)
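The mapPartitions pattern Holden describes, sketched in plain Scala: one client per partition, reused for every record, created on the worker so it never has to be serialized. FakeEsClient is a stand-in for illustration, not a real Elasticsearch client.

```scala
// Stand-in client: hypothetical, used only to show the pattern's shape.
class FakeEsClient {
  def enrich(record: String): String = record + ":enriched"
  def close(): Unit = ()
}

// The function you would pass to rdd.mapPartitions: it receives all records
// of one partition as an Iterator and can share one client across them.
def enrichPartition(records: Iterator[String]): Iterator[String] = {
  val client = new FakeEsClient()              // built inside the task, on the worker
  val out = records.map(client.enrich).toList  // force evaluation before closing
  client.close()
  out.iterator
}
```

With a real client the call site is just `rdd.mapPartitions(enrichPartition)`; the client never enters the closure, so it never needs to be serializable.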


 On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi 
 mayur.rust...@gmail.com wrote:

 It's not used as the default serializer because of some compatibility
 issues & the requirement to register the classes.

 Which part are you getting as non-serializable? You need to
 serialize that class if you are sending it to Spark workers inside a map,
 reduce, mapPartitions, or any of the operations on RDDs.


 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi
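The class registration Mayur mentions went through a custom KryoRegistrator in the Spark of that era (1.x); a configuration sketch, with `Tweet` and `MyRegistrator` as illustrative names:

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical record type that will cross the wire.
case class Tweet(user: String, text: String)

// Every class Kryo will serialize gets registered here.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Tweet])
  }
}

object KryoSetup {
  def conf: SparkConf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", classOf[MyRegistrator].getName)
}
```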



 On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng pc...@uow.edu.au
 wrote:

 I'm afraid persisting a connection across two tasks is a dangerous act,
 as they can't be guaranteed to be executed on the same machine. Your ES
 server may think it's a man-in-the-middle attack!

 I think it's possible to invoke a static method that gives you a
 connection from a local 'pool', so nothing will sneak into your closure,
 but it's too complex and there should be a better option.

 Never used Kryo before; if it's that good perhaps we should use it as the
 default serializer.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.





 --
 Cell : 425-233-8271


RE: ElasticSearch enrich

2014-06-27 Thread Adrian Mocanu
b0c1, could you post your code? I am interested in your solution.

Thanks
Adrian

From: boci [mailto:boci.b...@gmail.com]
Sent: June-26-14 6:17 PM
To: user@spark.apache.org
Subject: Re: ElasticSearch enrich

Wow, thanks for your fast answer, it helps a lot...

b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.com

On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau 
hol...@pigscanfly.ca wrote:
Hi b0c1,

I have an example of how to do this in the repo for my talk as well, the 
specific example is at 
https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
 . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and then 
call  saveAsHadoopDataset on the RDD that gets passed into the function we 
provide to foreachRDD.

e.g.

stream.foreachRDD{(data, time) =>
 val jobConf = ...
 data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
}

Hope that helps :)

Cheers,

Holden :)
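A slightly fuller sketch of this foreachRDD pattern, assuming the elasticsearch-hadoop EsOutputFormat; es.resource and es.nodes are real es-hadoop settings, while the index name and address below are placeholders:

```scala
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext._  // pair-RDD implicits (needed on Spark 1.x)
import org.apache.spark.streaming.dstream.DStream
import org.elasticsearch.hadoop.mr.EsOutputFormat

// Write each micro-batch to Elasticsearch. The (key, value) pairs are
// whatever your prepareDataForIndexing-style step produces.
def saveStream(stream: DStream[(Object, Object)]): Unit = {
  stream.foreachRDD { (rdd, time) =>
    val jobConf = new JobConf()
    jobConf.set("es.resource", "twitter/tweet")  // placeholder index/type
    jobConf.set("es.nodes", "localhost:9200")    // placeholder ES address
    jobConf.setOutputFormat(classOf[EsOutputFormat])
    rdd.saveAsHadoopDataset(jobConf)
  }
}
```

This is a configuration sketch; running it needs a Spark streaming context and a reachable Elasticsearch node.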

On Thu, Jun 26, 2014 at 2:23 PM, boci 
boci.b...@gmail.com wrote:
Thanks. Without the local option I can connect to ES remotely; now I only have 
one problem: how can I use elasticsearch-hadoop with Spark Streaming? I mean 
DStream doesn't have a saveAsHadoopFiles method, and my second problem is that 
the output index depends on the input data.

Thanks

--
Skype: boci13, Hangout: boci.b...@gmail.com

On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath 
nick.pentre...@gmail.com wrote:
You can just add elasticsearch-hadoop as a dependency to your project to use 
the ESInputFormat and ESOutputFormat 
(https://github.com/elasticsearch/elasticsearch-hadoop). Some other basics 
here: 
http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html

For testing, yes I think you will need to start ES in local mode (just 
./bin/elasticsearch) and use the default config (host = localhost, port = 9200).
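In sbt, the dependency Nick mentions is one line; the version shown is an assumption for the mid-2014 timeframe, so check the project's releases:

```scala
// build.sbt fragment — version number is illustrative.
libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "2.0.0"
```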

On Thu, Jun 26, 2014 at 9:04 AM, boci 
boci.b...@gmail.com wrote:
That's okay, but Hadoop has ES integration. What happens if I run 
saveAsHadoopFile without Hadoop (or must I pull up Hadoop programmatically, 
if I can)?

b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.com

On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau 
hol...@pigscanfly.ca wrote:

On Wed, Jun 25, 2014 at 4:16 PM, boci 
boci.b...@gmail.com wrote:
Hi guys, thanks for the direction; now I have some problems/questions:
- in local (test) mode I want to use ElasticClient.local to create the ES 
connection, but in production I want to use ElasticClient.remote; to do this I 
want to pass the ElasticClient to mapPartitions, or what is the best practice?
In this case you probably want to make the ElasticClient inside of 
mapPartitions (since it isn't serializable), and if you want to use a 
different client in local mode just have a flag that controls what type of 
client you create.
- my stream output is written into Elasticsearch. How can I test 
output.saveAsHadoopFile[ESOutputFormat](-) in a local environment?
- After storing the enriched data in ES, I want to generate aggregated data 
(EsInputFormat); how can I test it locally?
I think the simplest thing to do would be to use the same client in both 
modes and just start a single-node Elasticsearch cluster.
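Holden's flag idea above might look like the following, assuming the elastic4s API of that era (ElasticClient.local for an embedded test node, ElasticClient.remote for production); the host and port are placeholders:

```scala
import com.sksamuel.elastic4s.ElasticClient

// useLocal = true in tests (embedded node), false in production.
def makeClient(useLocal: Boolean): ElasticClient =
  if (useLocal) ElasticClient.local
  else ElasticClient.remote("es-host", 9300)  // placeholder host/port
```

This is a hedged sketch; it requires the elastic4s library on the classpath and, in remote mode, a reachable Elasticsearch node.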

Thanks guys

b0c1



--
Skype: boci13, Hangout: boci.b...@gmail.com


Re: ElasticSearch enrich

2014-06-26 Thread boci
That's okay, but Hadoop has ES integration. What happens if I run
saveAsHadoopFile without Hadoop (or must I pull up Hadoop programmatically,
if I can)?

b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.com





Re: ElasticSearch enrich

2014-06-26 Thread Nick Pentreath
You can just add elasticsearch-hadoop as a dependency to your project to
use the ESInputFormat and ESOutputFormat (
https://github.com/elasticsearch/elasticsearch-hadoop). Some other basics
here:
http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html

For testing, yes I think you will need to start ES in local mode (just
./bin/elasticsearch) and use the default config (host = localhost, port =
9200).







Re: ElasticSearch enrich

2014-06-26 Thread boci
Thanks. Without the local option I can connect to ES remotely; now I only
have one problem: how can I use elasticsearch-hadoop with Spark Streaming?
I mean DStream doesn't have a saveAsHadoopFiles method, and my second
problem is that the output index depends on the input data.

Thanks

--
Skype: boci13, Hangout: boci.b...@gmail.com



Re: ElasticSearch enrich

2014-06-26 Thread Holden Karau

Re: ElasticSearch enrich

2014-06-26 Thread boci

Re: ElasticSearch enrich

2014-06-26 Thread Holden Karau

Re: ElasticSearch enrich

2014-06-25 Thread boci
Hi guys, thanks for the direction. Now I have some problems/questions:
- In local (test) mode I want to use ElasticClient.local to create the ES
connection, but in production I want to use ElasticClient.remote. To do this,
should I pass the ElasticClient to mapPartitions, or what is the best practice?
- My stream output is written into Elasticsearch. How can I
test output.saveAsHadoopFile[ESOutputFormat](-) in a local environment?
- After storing the enriched data in ES, I want to generate aggregated data
(EsInputFormat). How can I test that locally?

Thanks guys

b0c1



--
Skype: boci13, Hangout: boci.b...@gmail.com


On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau hol...@pigscanfly.ca wrote:

 So I'm giving a talk at the Spark summit on using Spark & ElasticSearch,
 but for now if you want to see a simple demo which uses Elasticsearch for
 geo input you can take a look at my quick & dirty implementation with
 TopTweetsInALocation (
 https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
 ). This approach uses the ESInputFormat which avoids the difficulty of
 having to manually create ElasticSearch clients.

 This approach might not work for your data, e.g. if you need to create a
 query for each record in your RDD. If this is the case, you could instead
 look at using mapPartitions and setting up your Elasticsearch connection
 inside of that, so you could then re-use the client for all of the queries
 on each partition. This approach will avoid having to serialize the
 Elasticsearch connection because it will be local to your function.

 Hope this helps!

 Cheers,

 Holden :)





Re: ElasticSearch enrich

2014-06-25 Thread Holden Karau
On Wed, Jun 25, 2014 at 4:16 PM, boci boci.b...@gmail.com wrote:

 Hi guys, thanks for the direction. Now I have some problems/questions:
 - In local (test) mode I want to use ElasticClient.local to create the ES
 connection, but in production I want to use ElasticClient.remote. To do this,
 should I pass the ElasticClient to mapPartitions, or what is the best
 practice?

In this case you probably want to make the ElasticClient inside of
mapPartitions (since it isn't serializable), and if you want to use a
different client in local mode just have a flag that controls what type of
client you create.
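Holden's pattern (build the client inside the mapPartitions function, with a flag choosing local vs. remote) can be sketched as follows. This is a minimal sketch only: `GeoClient`, `makeClient`, and `enrichPartition` are hypothetical stand-ins so the closure-safety idea is visible without Spark or Elasticsearch on the classpath; a real job would construct something like elastic4s's ElasticClient here instead.

```scala
// Hypothetical stand-in for a real Elasticsearch client (e.g. elastic4s's
// ElasticClient); nothing here touches a real cluster.
trait GeoClient {
  def lookup(doc: String): String // enrich one document
  def close(): Unit
}

// Flag-controlled factory, mirroring the local-vs-remote choice:
// a local test client in one branch, a remote production client in the other.
def makeClient(local: Boolean,
               host: String = "localhost",
               port: Int = 9300): GeoClient = new GeoClient {
  private val target = if (local) "local" else s"$host:$port"
  def lookup(doc: String): String = s"$doc@$target"
  def close(): Unit = ()
}

// The function passed to mapPartitions: the client is created *inside* it,
// so the closure never captures a non-serializable client instance.
// With Spark this would be: rdd.mapPartitions(it => enrichPartition(it, local = false))
def enrichPartition(docs: Iterator[String], local: Boolean): Iterator[String] = {
  val client = makeClient(local)
  try docs.map(client.lookup).toList.iterator // materialize before closing
  finally client.close()
}
```

The same client instance then serves every record in a partition, which is the re-use Holden describes.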

 - My stream output is written into Elasticsearch. How can I
 test output.saveAsHadoopFile[ESOutputFormat](-) in a local environment?

 - After storing the enriched data in ES, I want to generate aggregated data
 (EsInputFormat). How can I test it locally?

I think the simplest thing to do would be to use the same client in both
modes and just start a single-node Elasticsearch cluster locally.


 Thanks guys

 b0c1




 --
 Skype: boci13, Hangout: boci.b...@gmail.com







-- 
Cell : 425-233-8271


ElasticSearch enrich

2014-06-24 Thread boci
Hi guys,

I have a small question. I want to create a Worker class which uses
ElasticClient to make queries to Elasticsearch (I want to enrich my data
with geo search results).

How can I do that? I tried to create a worker instance with ES host/port
parameters, but Spark throws an exception (my class is not serializable).

Any idea?

Thanks
b0c1


Re: ElasticSearch enrich

2014-06-24 Thread Peng Cheng
Make sure all queries are called through class methods, and wrap your query
info in a class that has only simple properties (strings, collections, etc.).
If you can't find such a wrapper you can also use the SerializableWritable
wrapper out of the box, but that's not recommended (it's a developer API and
makes fat closures that run slowly).
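Peng's wrapper idea (ship only simple, serializable fields and keep the client behind a method) can be sketched like this. `EsQueryInfo` and `roundTrip` are illustrative names, and `runQuery` just formats a string where a real implementation would create or borrow a client:

```scala
import java.io._

// Query-info wrapper with only simple properties; case classes are
// Serializable, so this travels safely inside a Spark closure.
case class EsQueryInfo(host: String, port: Int, index: String) {
  // A real implementation would build/borrow an ES client here;
  // the formatted string stands in for the actual query call.
  def runQuery(q: String): String = s"$host:$port/$index?q=$q"
}

// Java-serialization round trip (what Spark does to closures by default),
// demonstrating that the wrapper itself serializes cleanly.
def roundTrip[T <: Serializable](t: T): T = {
  val buf = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buf)
  out.writeObject(t)
  out.close()
  new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    .readObject()
    .asInstanceOf[T]
}
```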



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8214.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: ElasticSearch enrich

2014-06-24 Thread boci
Ok, but in this case where can I store the ES connection? Or should every
document create a new ES connection inside the worker?

--
Skype: boci13, Hangout: boci.b...@gmail.com




Re: ElasticSearch enrich

2014-06-24 Thread Mayur Rustagi
Most likely the ES client is not serializable for you. There are three
workarounds:
1. Switch to Kryo serialization and register the client with Kryo; this
might solve your serialization issue.
2. Use mapPartitions for all your data and initialize your client inside the
mapPartitions code. This creates a client for each partition, which reduces
some parallelism and adds some client-creation overhead, but it prevents
serialization of the ES client and its transfer to the workers.
3. Use SerializableWrapper to serialize your ES client manually, send it
across, and deserialize it manually. This may or may not work depending on
whether your class is safely serializable.
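Workaround 1 would look roughly like the sketch below. It assumes Spark (and its bundled Kryo) on the classpath, and `GeoResult` / `EsKryoRegistrator` are illustrative names standing in for whatever classes your closures actually capture; this is a configuration sketch, not a runnable job.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
import com.esotericsoftware.kryo.Kryo

// Illustrative payload class; replace with the classes your closures ship.
case class GeoResult(lat: Double, lon: Double)

// Registers the classes Kryo should know about up front.
class EsKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[GeoResult])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", classOf[KryoSerializer].getName)
  .set("spark.kryo.registrator", classOf[EsKryoRegistrator].getName)
```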

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Wed, Jun 25, 2014 at 4:12 AM, boci boci.b...@gmail.com wrote:

 Hi guys,

 I have a small question. I want to create a Worker class which using
 ElasticClient to make query to elasticsearch. (I want to enrich my data
 with geo search result).

 How can I do that? I try to create a worker instance with ES host/port
 parameter but spark throw an exceptino (my class not serializable).

 Any idea?

 Thanks
 b0c1




Re: ElasticSearch enrich

2014-06-24 Thread Peng Cheng
I'm afraid persisting a connection across two tasks is a dangerous act, as
they can't be guaranteed to be executed on the same machine. Your ES server
may think it's a man-in-the-middle attack!

I think it's possible to invoke a static method that gives you a connection
from a local 'pool', so nothing will sneak into your closure, but it's too
complex and there should be a better option.

I've never used Kryo before; if it's that good, perhaps we should use it as
the default serializer.
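Peng's "static method plus local pool" idea maps naturally to a Scala singleton object: since each Spark executor is a single JVM, a lazily initialized field in an object yields one shared connection per executor, and the closure only ships the call to `ClientPool.get`, never a client instance. `GeoClient` and `connect` are hypothetical stand-ins for a real ES client and its constructor.

```scala
object ClientPool {
  // Stand-in for a real, non-serializable ES client.
  final case class GeoClient(target: String)
  private def connect(target: String): GeoClient = GeoClient(target)

  // One cached client per JVM, i.e. one per Spark executor.
  @volatile private var cached: GeoClient = null

  // Double-checked locking: cheap read on the fast path,
  // a single synchronized connect the first time through.
  def get(target: String): GeoClient = {
    if (cached == null) synchronized {
      if (cached == null) cached = connect(target)
    }
    cached
  }
}
```

Inside a map or mapPartitions you would call `ClientPool.get(host)` and receive the same instance for every task running on that executor.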



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: ElasticSearch enrich

2014-06-24 Thread Mayur Rustagi
It's not used as the default serializer because of some compatibility issues
and the requirement to register classes.

Which part are you getting as non-serializable? You need to serialize a
class if you are sending it to the Spark workers inside a map, reduce,
mapPartitions, or any of the other operations on an RDD.


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


