Re: ElasticSearch enrich

2014-06-27 Thread boci
Another question: I will initialize the JobConf in foreachRDD, but at that
point how can I get information from the items? I have an identifier in the
data which identifies the required ES index, so how can I set a dynamic
index in foreachRDD?

b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.com
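One way to handle the per-record index, as a sketch: elasticsearch-hadoop's
multi-resource ("dynamic resource") writes let the target index be taken from
a field of each document, so one JobConf serves the whole batch. This assumes
an elasticsearch-hadoop version that supports the {field} pattern; the field
name esIndex and the function prepareDataForIndexing are placeholders:

import org.apache.hadoop.mapred.JobConf
import org.elasticsearch.hadoop.mr.EsOutputFormat

stream.foreachRDD { (rdd, time) =>
  val jobConf = new JobConf()
  jobConf.setSpeculativeExecution(false)
  jobConf.set("es.nodes", "localhost:9200")
  // {esIndex} is filled in per document from its own "esIndex" field,
  // so each record is routed to the index named in the data itself
  jobConf.set("es.resource.write", "{esIndex}/doc")
  jobConf.setOutputFormat(classOf[EsOutputFormat])
  // prepareDataForIndexing is assumed to emit (NullWritable, MapWritable)
  // pairs whose MapWritable carries the "esIndex" field
  rdd.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
}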


Re: ElasticSearch enrich

2014-06-27 Thread boci
Ok, I found dynamic resources, but I have a frustrating problem. This is the
flow:
kafka -> enrich X -> enrich Y -> enrich Z -> foreachRDD -> save

My problem is that this doesn't work: the enrich functions are not called,
but if I put a print in, they are. For example, if I do this:
kafka -> enrich X -> enrich Y -> print -> enrich Z -> foreachRDD

then enrich X and enrich Y are called but enrich Z is not, and if I put the
print after enrich Z it does get printed. How can I solve this? (What can I
do to get foreachRDD called? I put a breakpoint inside the map function,
where I generate the Writable, but it is never hit.)

Any idea?

b0c1



--
Skype: boci13, Hangout: boci.b...@gmail.com


Re: ElasticSearch enrich

2014-06-27 Thread Holden Karau
So a few quick questions:

1) What cluster are you running this against? Is it just local? Have you
tried local[4]?
2) When you say breakpoint, how are you setting that breakpoint? There is
a good chance your breakpoint mechanism doesn't work in a distributed
environment; could you instead cause a side effect (like writing to a file)?
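A sketch of the side-effect idea using an accumulator (the stage names come
from the mail; enrichX stands in for the real enrich function). One caveat:
like the enrich steps themselves, accumulator updates only happen once an
output operation actually forces the batch to run:

val enrichXSeen = ssc.sparkContext.accumulator(0L)

val enriched = stream.map { record =>
  enrichXSeen += 1          // runs on the workers, readable on the driver
  enrichX(record)
}

// on the driver, after a batch interval has completed:
println("records through enrich X: " + enrichXSeen.value)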

Cheers,

Holden :)


RE: ElasticSearch enrich

2014-06-27 Thread Adrian Mocanu
b0c1, could you post your code? I am interested in your solution.

Thanks
Adrian


Re: ElasticSearch enrich

2014-06-27 Thread boci
This is simply a ScalaTest. I start a SparkConf, set the master to local
(set the serializer etc), pull up Kafka and ES connections, send a message to
Kafka and wait 30 sec for processing.

It runs in IDEA, no magic tricks.

b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.com



Re: ElasticSearch enrich

2014-06-27 Thread Holden Karau
Try setting the master to local[4]
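For context: with master = local the application has a single task slot, and
a streaming receiver (the Kafka receiver here) occupies one slot permanently,
so nothing is left to process the batches. That would explain the downstream
stages never running. A minimal sketch:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setMaster("local[4]")    // more than one core: receiver + processing
  .setAppName("es-enrich-test")
val ssc = new StreamingContext(conf, Seconds(5))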



Re: ElasticSearch enrich

2014-06-26 Thread boci
That's okay, but Hadoop has the ES integration. What happens if I run
saveAsHadoopFile without Hadoop? (Or do I need to pull up Hadoop
programmatically, if I even can?)

b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.com



Re: ElasticSearch enrich

2014-06-26 Thread Nick Pentreath
You can just add elasticsearch-hadoop as a dependency to your project to
use the ESInputFormat and ESOutputFormat (
https://github.com/elasticsearch/elasticsearch-hadoop). Some other basics
here:
http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html

For testing, yes I think you will need to start ES in local mode (just
./bin/elasticsearch) and use the default config (host = localhost, port =
9200).
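A sketch of both halves, using the old-API Map/Reduce classes from
elasticsearch-hadoop (the artifact version, the index name twitter/tweet,
and the SparkContext named sc are illustrative):

// build.sbt:
// libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "2.0.0"

import org.apache.hadoop.io.{MapWritable, Text}
import org.apache.hadoop.mapred.JobConf
import org.elasticsearch.hadoop.mr.EsInputFormat

val jobConf = new JobConf()
jobConf.set("es.nodes", "localhost:9200")    // the default local ES
jobConf.set("es.resource", "twitter/tweet")  // index/type to read from

// each record comes back as (document id, document fields)
val esRdd = sc.hadoopRDD(jobConf,
  classOf[EsInputFormat[Text, MapWritable]],
  classOf[Text], classOf[MapWritable])
println("documents read: " + esRdd.count())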



Re: ElasticSearch enrich

2014-06-26 Thread boci
Thanks. Without the local option I can connect to ES remotely; now I only
have one problem: how can I use elasticsearch-hadoop with Spark Streaming?
I mean DStream doesn't have a saveAsHadoopFiles method. My second problem is
that the output index depends on the input data.

Thanks

--
Skype: boci13, Hangout: boci.b...@gmail.com



Re: ElasticSearch enrich

2014-06-26 Thread Holden Karau
Hi b0c1,

I have an example of how to do this in the repo for my talk as well, the
specific example is at
https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
. Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and
then call saveAsHadoopDataset on the RDD that gets passed into the
function we provide to foreachRDD.

e.g.

stream.foreachRDD{(data, time) =>
 val jobConf = ...
 data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
}
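Since EsOutputFormat expects Writable pairs, a sketch of what a
prepareDataForIndexing-style function might look like (the Tweet case class
and its fields are hypothetical):

import org.apache.hadoop.io.{MapWritable, NullWritable, Text}

case class Tweet(user: String, text: String)

def prepareDataForIndexing(t: Tweet): (NullWritable, MapWritable) = {
  val doc = new MapWritable()
  doc.put(new Text("user"), new Text(t.user))
  doc.put(new Text("text"), new Text(t.text))
  (NullWritable.get, doc)   // EsOutputFormat ignores the key
}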

Hope that helps :)

Cheers,

Holden :)



Re: ElasticSearch enrich

2014-06-26 Thread boci
Wow, thanks for your fast answer, it helps a lot...

b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.com



Re: ElasticSearch enrich

2014-06-26 Thread Holden Karau
Just your luck I happened to be working on that very talk today :) Let me
know how your experiences with Elasticsearch & Spark go :)



Re: ElasticSearch enrich

2014-06-25 Thread boci
Hi guys, thanks for the direction. Now I have some problems/questions:
- In local (test) mode I want to use ElasticClient.local to create the ES
connection, but in production I want to use ElasticClient.remote; for this I
want to pass the ElasticClient to mapPartitions, or what is the best practice?
- My stream output is written into Elasticsearch. How can I
test output.saveAsHadoopFile[ESOutputFormat](-) in a local environment?
- After storing the enriched data into ES, I want to generate aggregated data
(EsInputFormat); how can I test it locally?

Thanks guys

b0c1



--
Skype: boci13, Hangout: boci.b...@gmail.com


On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau hol...@pigscanfly.ca wrote:

 So I'm giving a talk at the Spark summit on using Spark & ElasticSearch,
 but for now if you want to see a simple demo which uses elasticsearch for
 geo input you can take a look at my quick & dirty implementation with
 TopTweetsInALocation (
 https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
 ). This approach uses the ESInputFormat which avoids the difficulty of
 having to manually create ElasticSearch clients.

 This approach might not work for your data, e.g. if you need to create a
 query for each record in your RDD. If this is the case, you could instead
 look at using mapPartitions and setting up your Elasticsearch connection
 inside of that, so you could then re-use the client for all of the queries
 on each partition. This approach will avoid having to serialize the
 Elasticsearch connection because it will be local to your function.

 Hope this helps!

 Cheers,

 Holden :)



Re: ElasticSearch enrich

2014-06-25 Thread Holden Karau
On Wed, Jun 25, 2014 at 4:16 PM, boci boci.b...@gmail.com wrote:

 Hi guys, thanks for the direction. Now I have some problems/questions:
 - In local (test) mode I want to use ElasticClient.local to create the ES
 connection, but in production I want to use ElasticClient.remote; for this I
 want to pass the ElasticClient to mapPartitions, or what is the best
 practice?

In this case you probably want to make the ElasticClient inside of
mapPartitions (since it isn't serializable), and if you want to use a
different client in local mode just have a flag that controls what type of
client you create.
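A sketch of that flag approach (ElasticClient is elastic4s's, as used earlier
in the thread; the remote host/port, the Doc type and enrichWithGeo are
placeholders, and the exact remote(...) signature may vary by elastic4s
version):

import com.sksamuel.elastic4s.ElasticClient
import org.apache.spark.rdd.RDD

type Doc = Map[String, Any]                                    // placeholder
def enrichWithGeo(client: ElasticClient, doc: Doc): Doc = doc  // stub

def enrich(rdd: RDD[Doc], useLocalEs: Boolean): RDD[Doc] =
  rdd.mapPartitions { docs =>
    // the client is created on the worker, once per partition,
    // so it never has to be serialized with the closure
    val client =
      if (useLocalEs) ElasticClient.local
      else ElasticClient.remote("es-host", 9300)
    docs.map(doc => enrichWithGeo(client, doc))
  }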

 - My stream output is written into Elasticsearch. How can I
 test output.saveAsHadoopFile[ESOutputFormat](-) in a local environment?

 - After storing the enriched data into ES, I want to generate aggregated
 data (EsInputFormat); how can I test it locally?

I think the simplest thing to do would be to use the same client in both
modes and just start a single-node Elasticsearch cluster.




-- 
Cell : 425-233-8271


Re: ElasticSearch enrich

2014-06-24 Thread Peng Cheng
Make sure all queries are called through class methods, and wrap your query
info with a class having only simple properties (strings, collections etc).
If you can't find such a wrapper you can also use the SerializableWritable
wrapper out of the box, but it's not recommended (it's a developer API and
makes fat closures that run slowly).





Re: ElasticSearch enrich

2014-06-24 Thread boci
Ok, but in this case where can I store the ES connection? Or does every
document create a new ES connection inside the worker?

--
Skype: boci13, Hangout: boci.b...@gmail.com



Re: ElasticSearch enrich

2014-06-24 Thread Mayur Rustagi
Mostly the ES client is not serializable for you. You can do 3 workarounds:
1. Switch to Kryo serialization and register the client in Kryo; this might
solve your serialization issue (a sketch follows this list).
2. Use mapPartitions for all your data and initialize your client in the
mapPartitions code; this creates a client per partition, which reduces some
parallelism and adds some client-creation overhead, but prevents
serialization of the ES client and its transfer to the workers.
3. Use a serializable wrapper to serialize your ES client manually, send it
across, and deserialize it manually; this may or may not work depending on
whether your class is safely serializable.
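A sketch of workaround 1 (whether it actually works depends on the client
class being Kryo-friendly; MyEsWorker stands in for whatever class failed to
serialize):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

class MyEsWorker { /* hypothetical wrapper around the ES client */ }

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyEsWorker])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "MyRegistrator")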

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Wed, Jun 25, 2014 at 4:12 AM, boci boci.b...@gmail.com wrote:

 Hi guys,

 I have a small question. I want to create a Worker class which uses an
 ElasticClient to make queries to Elasticsearch (I want to enrich my data
 with geo search results).

 How can I do that? I tried to create a worker instance with ES host/port
 parameters but Spark threw an exception (my class is not serializable).

 Any idea?

 Thanks
 b0c1




Re: ElasticSearch enrich

2014-06-24 Thread Peng Cheng
I'm afraid persisting a connection across two tasks is a dangerous act, as
they can't be guaranteed to be executed on the same machine. Your ES server
may think it's a man-in-the-middle attack!

I think it's possible to invoke a static method that gives you a connection
from a local 'pool', so nothing will sneak into your closure, but it's too
complex and there should be a better option.
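For what it's worth, a sketch of that static-method idea: a Scala object is a
per-JVM singleton, so each executor lazily creates one client on first use
and the closure captures nothing that needs serializing (elastic4s names
assumed; the host is a placeholder):

import com.sksamuel.elastic4s.ElasticClient

object EsClients {
  // initialized once per executor JVM, on first access
  lazy val client = ElasticClient.remote("es-host", 9300)
}

rdd.map { doc =>
  val c = EsClients.client   // only a static reference, nothing serialized
  // ... run the geo query for doc with c here ...
  doc
}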

I've never used Kryo before; if it's that good perhaps we should use it as
the default serializer.





Re: ElasticSearch enrich

2014-06-24 Thread Mayur Rustagi
It's not used as the default serializer because of some compatibility
issues and the requirement to register the classes.

Which part are you getting as non-serializable? You need to serialize that
class if you are sending it to Spark workers inside a map, reduce,
mapPartitions or any of the other operations on an RDD.


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi


