Re: OutOfMemory error with Spark ML 1.5 logreg example

2015-09-07 Thread boci
Hi,

Can you try using the save method instead of write?

ex: out_df.save("path","parquet")
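
If the OutOfMemoryError persists, it might also be worth giving the driver and
executors more memory when submitting (these are standard spark-submit flags,
the values below are just a guess):

spark-submit --master yarn --deploy-mode cluster \
  --driver-memory 2g --executor-memory 2g spark-ml.py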

b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.com

On Mon, Sep 7, 2015 at 3:35 PM, Zoltán Tóth  wrote:

> Aaand, the error! :)
>
> Exception in thread "org.apache.hadoop.hdfs.PeerCache@4e000abf"
> Exception: java.lang.OutOfMemoryError thrown from the 
> UncaughtExceptionHandler in thread "org.apache.hadoop.hdfs.PeerCache@4e000abf"
> Exception in thread "Thread-7"
> Exception: java.lang.OutOfMemoryError thrown from the 
> UncaughtExceptionHandler in thread "Thread-7"
> Exception in thread "LeaseRenewer:r...@docker.rapidminer.com:8020"
> Exception: java.lang.OutOfMemoryError thrown from the 
> UncaughtExceptionHandler in thread 
> "LeaseRenewer:r...@docker.rapidminer.com:8020"
> Exception in thread "Reporter"
> Exception: java.lang.OutOfMemoryError thrown from the 
> UncaughtExceptionHandler in thread "Reporter"
> Exception in thread "qtp2115718813-47"
> Exception: java.lang.OutOfMemoryError thrown from the 
> UncaughtExceptionHandler in thread "qtp2115718813-47"
>
> Exception: java.lang.OutOfMemoryError thrown from the 
> UncaughtExceptionHandler in thread "sparkDriver-scheduler-1"
>
> Log Type: stdout
>
> Log Upload Time: Mon Sep 07 09:03:01 -0400 2015
>
> Log Length: 986
>
> Traceback (most recent call last):
>   File "spark-ml.py", line 33, in <module>
> out_df.write.parquet("/tmp/logparquet")
>   File 
> "/var/lib/hadoop-yarn/cache/yarn/nm-local-dir/usercache/root/appcache/application_1441224592929_0022/container_1441224592929_0022_01_01/pyspark.zip/pyspark/sql/readwriter.py",
>  line 422, in parquet
>   File 
> "/var/lib/hadoop-yarn/cache/yarn/nm-local-dir/usercache/root/appcache/application_1441224592929_0022/container_1441224592929_0022_01_01/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
>  line 538, in __call__
>   File 
> "/var/lib/hadoop-yarn/cache/yarn/nm-local-dir/usercache/root/appcache/application_1441224592929_0022/container_1441224592929_0022_01_01/pyspark.zip/pyspark/sql/utils.py",
>  line 36, in deco
>   File 
> "/var/lib/hadoop-yarn/cache/yarn/nm-local-dir/usercache/root/appcache/application_1441224592929_0022/container_1441224592929_0022_01_01/py4j-0.8.2.1-src.zip/py4j/protocol.py",
>  line 300, in get_return_value
> py4j.protocol.Py4JJavaError
>
>
>
> On Mon, Sep 7, 2015 at 3:27 PM, Zoltán Tóth  wrote:
>
>> Hi,
>>
>> When I execute the Spark ML Logistic Regression example in pyspark I run
>> into an OutOfMemory exception. I'm wondering if any of you experienced the
>> same or have a hint about how to fix this.
>>
>> The interesting bit is that I only get the exception when I try to write
>> the result DataFrame into a file. If I only "print" any of the results, it
>> all works fine.
>>
>> My Setup:
>> Spark 1.5.0-SNAPSHOT built for Hadoop 2.6.0 (I'm working with the latest
>> nightly build)
>> Build flags: -Psparkr -Phadoop-2.6 -Phive -Phive-thriftserver -Pyarn
>> -DzincPort=3034
>>
>> I'm using the default resource setup
>> 15/09/07 08:49:04 INFO yarn.YarnAllocator: Will request 2 executor
>> containers, each with 1 cores and 1408 MB memory including 384 MB overhead
>> 15/09/07 08:49:04 INFO yarn.YarnAllocator: Container request (host: Any,
>> capability: )
>> 15/09/07 08:49:04 INFO yarn.YarnAllocator: Container request (host: Any,
>> capability: )
>>
>> The script I'm executing:
>> from pyspark import SparkContext, SparkConf
>> from pyspark.sql import SQLContext
>>
>> conf = SparkConf().setAppName("pysparktest")
>> sc = SparkContext(conf=conf)
>> sqlContext = SQLContext(sc)
>>
>> from pyspark.mllib.regression import LabeledPoint
>> from pyspark.mllib.linalg import Vector, Vectors
>>
>> training = sc.parallelize((
>>   LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
>>   LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
>>   LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
>>   LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))))
>>
>> training_df = training.toDF()
>>
>> from pyspark.ml.classification import LogisticRegression
>>
>> reg = LogisticRegression()
>>
>> reg.setMaxIter(10).setRegParam(0.01)
>> model = reg.fit(training.toDF())
>>
>> test = sc.parallelize((
>>   LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
>>   LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
>>   LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5))))
>>
>> out_df = model.transform(test.toDF())
>>
>> out_df.write.parquet("/tmp/logparquet")
>>
>> And the command:
>> spark-submit --master yarn --deploy-mode cluster spark-ml.py
>>
>> Thanks,
>> z
>>
>
>


Re: Mesos + Spark

2015-07-24 Thread boci
Thanks,
Mesos will show that the Spark driver is running, but what happens when my batch
job finishes? How can I reschedule it without Chronos? Can I submit a job
without starting it?

Thanks

b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.com

On Fri, Jul 24, 2015 at 11:52 PM, Dean Wampler 
wrote:

> When running Spark in Mesos cluster mode, the driver program runs in one
> of the cluster nodes, like the other Spark processes that are spawned. You
> won't need a special node for this purpose. I'm not very familiar with
> Chronos, but its UI or the regular Mesos UI should show you where the
> driver is running, then you can use the Spark web UI on that machine to see
> what the Spark job is doing.
>
> dean
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition
> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
> Typesafe <http://typesafe.com>
> @deanwampler <http://twitter.com/deanwampler>
> http://polyglotprogramming.com
>
> On Fri, Jul 24, 2015 at 4:47 PM, boci  wrote:
>
>> Thanks, but something is not clear...
>> I have the mesos cluster.
>> - I want to submit my application and scheduled with chronos.
>> - For cluster mode I need a dispatcher, this is another container
>> (machine in the real world)? What will this do? It's needed when I using
>> chronos?
>> - How can I access to my spark job from chronos?
>>
>> I think submit in client mode is not fit to my condition, that's right?
>>
>> Thanks
>> b0c1
>>
>>
>> --
>> Skype: boci13, Hangout: boci.b...@gmail.com
>>
>> On Wed, Jul 22, 2015 at 4:51 PM, Dean Wampler 
>> wrote:
>>
>>> This page, http://spark.apache.org/docs/latest/running-on-mesos.html,
>>> covers many of these questions. If you submit a job with the option
>>> "--supervise", it will be restarted if it fails.
>>>
>>> You can use Chronos for scheduling. You can create a single streaming
>>> job with a 10 minute batch interval, if that works for your every 10-min.
>>> need.
>>>
>>> Dean Wampler, Ph.D.
>>> Author: Programming Scala, 2nd Edition
>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>> Typesafe <http://typesafe.com>
>>> @deanwampler <http://twitter.com/deanwampler>
>>> http://polyglotprogramming.com
>>>
>>> On Wed, Jul 22, 2015 at 3:53 AM, boci  wrote:
>>>
>>>> Hi guys!
>>>>
>>>> I'm a new in mesos. I have two spark application (one streaming and one
>>>> batch). I want to run both app in mesos cluster. Now for testing I want to
>>>> run in docker container so I started a simple redjack/mesos-master, but I
>>>> think a lot of think unclear for me (both mesos and spark-mesos).
>>>>
>>>> If I have a mesos cluster (for testing it will be some docker
>>>> container) i need a separate machine (container) to run my spark job? Or
>>>> can I submit the cluster and schedule (chronos or I dunno)?
>>>> How can I run the streaming job? What happened if the "controller"
>>>> died? Or if I call spark-submit with master=mesos my application started
>>>> and I can forget? How can I run in every 10 min without submit in every 10
>>>> min? How can I run my streaming app in HA mode?
>>>>
>>>> Thanks
>>>>
>>>> b0c1
>>>>
>>>>
>>>> --
>>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>>
>>>
>>>
>>
>


Re: Mesos + Spark

2015-07-24 Thread boci
Thanks, but something is still not clear...
I have the Mesos cluster.
- I want to submit my application and have it scheduled with Chronos.
- For cluster mode I need a dispatcher; is this another container (a machine
in the real world)? What will it do? Is it needed when I use Chronos?
- How can I access my Spark job from Chronos?

I think submitting in client mode does not fit my use case, is that right?
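
If I understand the docs right, a cluster-mode submission through the dispatcher
would look roughly like this (the dispatcher host, class and jar names are only
placeholders; --supervise is the restart option mentioned in the replies):

spark-submit \
  --master mesos://dispatcher-host:7077 \
  --deploy-mode cluster \
  --supervise \
  --class com.example.MyBatchJob \
  my-batch-job.jar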

Thanks
b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.com

On Wed, Jul 22, 2015 at 4:51 PM, Dean Wampler  wrote:

> This page, http://spark.apache.org/docs/latest/running-on-mesos.html,
> covers many of these questions. If you submit a job with the option
> "--supervise", it will be restarted if it fails.
>
> You can use Chronos for scheduling. You can create a single streaming job
> with a 10 minute batch interval, if that works for your every 10-min. need.
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition
> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
> Typesafe <http://typesafe.com>
> @deanwampler <http://twitter.com/deanwampler>
> http://polyglotprogramming.com
>
> On Wed, Jul 22, 2015 at 3:53 AM, boci  wrote:
>
>> Hi guys!
>>
>> I'm a new in mesos. I have two spark application (one streaming and one
>> batch). I want to run both app in mesos cluster. Now for testing I want to
>> run in docker container so I started a simple redjack/mesos-master, but I
>> think a lot of think unclear for me (both mesos and spark-mesos).
>>
>> If I have a mesos cluster (for testing it will be some docker container)
>> i need a separate machine (container) to run my spark job? Or can I submit
>> the cluster and schedule (chronos or I dunno)?
>> How can I run the streaming job? What happened if the "controller" died?
>> Or if I call spark-submit with master=mesos my application started and I
>> can forget? How can I run in every 10 min without submit in every 10 min?
>> How can I run my streaming app in HA mode?
>>
>> Thanks
>>
>> b0c1
>>
>>
>> --
>> Skype: boci13, Hangout: boci.b...@gmail.com
>>
>
>


Mesos + Spark

2015-07-22 Thread boci
Hi guys!

I'm new to Mesos. I have two Spark applications (one streaming and one
batch). I want to run both apps on a Mesos cluster. For testing I want to
run them in Docker containers, so I started a simple redjack/mesos-master, but a
lot of things are still unclear to me (both Mesos itself and Spark-on-Mesos).

If I have a Mesos cluster (for testing it will be a few Docker containers), do I
need a separate machine (container) to run my Spark job? Or can I submit to
the cluster and have it scheduled there (with Chronos or something similar)?
How do I run the streaming job? What happens if the "controller" dies? Or,
if I call spark-submit with master=mesos, is my application started and can I
forget about it? How can I run the batch job every 10 minutes without submitting it
every 10 minutes? How can I run my streaming app in HA mode?
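
For the every-10-minutes case, the replies suggest a single long-running streaming
job with a 10-minute batch interval instead of resubmitting; a minimal sketch of
what that would look like (the actual pipeline is left out):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

val conf = new SparkConf().setAppName("every-10-min-job")
val ssc = new StreamingContext(conf, Minutes(10))   // one batch every 10 minutes
// ... build the DStream pipeline here ...
ssc.start()
ssc.awaitTermination()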

Thanks

b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.com


spark submit debugging

2015-05-28 Thread boci
Hi!

I have a little problem... If I start my Spark application as a plain Java app
(locally) it works like a charm, but if I start it on the Hadoop cluster (I tried
spark-submit with --master local[5] and --master yarn-client) it's not
working. No error, no exception; the job runs periodically but nothing
happens.

(I'm using a Docker container.)

Thanks

b0c1
--
Skype: boci13, Hangout: boci.b...@gmail.com


Spark streaming with kafka

2015-05-28 Thread boci
Hi guys,

I'm using Spark Streaming with Kafka... On my local machine (started as a Java
application without spark-submit) it works: it connects to Kafka and does
the job (*). I tried to put it into a Spark Docker container (Hadoop 2.6, Spark
1.3.1, tried spark-submit with local[5] and yarn-client too) but without
success...

No error on the console (the application starts), I see something received
from Kafka, but the result is never written out to Elasticsearch...

Where should I start debugging? In the Spark console I see two jobs, both at 0/1...

Thanks
--
Skype: boci13, Hangout: boci.b...@gmail.com


Re: Strange ClassNotFound exception

2015-05-24 Thread boci
Yeah, I have the same jar with the same result. I run it in a Docker container, and I
use the same Docker container with my other project... the only differences
are the PostgreSQL JDBC driver and the custom RDD... no additional
dependencies (both are single jars generated with the same assembly configuration
and the same dependencies), and the second one works like a charm.
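
(To double-check that the class really isn't in my assembly, the same kind of check
as below should work against my own jar, e.g. jar tvf my-assembly.jar | grep
SelectionPath, where my-assembly.jar is whatever sbt assembly produces.)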

Another idea?
On May 24, 2015 at 2:41, "Ted Yu"  wrote:

> In my local maven repo, I found:
>
> $ jar tvf
> /Users/tyu/.m2/repository//org/spark-project/akka/akka-actor_2.10/2.3.4-spark/akka-actor_2.10-2.3.4-spark.jar
> | grep SelectionPath
>521 Mon Sep 29 12:05:36 PDT 2014 akka/actor/SelectionPathElement.class
>
> Is the above jar in your classpath ?
>
> On Sat, May 23, 2015 at 5:05 PM, boci  wrote:
>
>> Hi guys!
>>
>> I have a small spark application. It's query some data from postgres,
>> enrich it and write to elasticsearch. When I deployed into spark container
>> I got a very fustrating error:
>> https://gist.github.com/b0c1/66527e00bada1e4c0dc3
>>
>> Spark version: 1.3.1
>> Hadoop version: 2.6.0
>> Additional info:
>>   serialization: kryo
>>   rdd: custom rdd to query
>>
>> I not understand
>> 1. akka.actor.SelectionPath doesn't exists in 1.3.1
>> 2. I checked all dependencies in my project, I only have
>> "org.spark-project.akka:akka-*_2.10:2.3.4-spark:jar" doesn't have
>> 3. I not found any reference for this...
>> 4. I created own RDD, it's work, but I need to register to kryo? (mapRow
>> using ResultSet, I need to create
>> 5. I used some months ago and it's already worked with spark 1.2... I
>> recompiled with 1.3.1 but I got this strange error
>>
>> Any idea?
>>
>>
>> --
>> Skype: boci13, Hangout: boci.b...@gmail.com
>>
>
>


Strange ClassNotFound exception

2015-05-23 Thread boci
Hi guys!

I have a small Spark application. It queries some data from Postgres,
enriches it and writes it to Elasticsearch. When I deployed it into the Spark container
I got a very frustrating error:
https://gist.github.com/b0c1/66527e00bada1e4c0dc3

Spark version: 1.3.1
Hadoop version: 2.6.0
Additional info:
  serialization: kryo
  rdd: custom rdd to query

What I don't understand:
1. akka.actor.SelectionPath doesn't exist in 1.3.1
2. I checked all the dependencies in my project; I only have
"org.spark-project.akka:akka-*_2.10:2.3.4-spark:jar", and it doesn't have it
3. I couldn't find any reference to this class...
4. I created my own RDD and it works, but do I need to register it with Kryo? (mapRow
uses ResultSet, which I need to create
5. I used this some months ago and it already worked with Spark 1.2... I
recompiled with 1.3.1 and got this strange error

Any idea?

--
Skype: boci13, Hangout: boci.b...@gmail.com


Re: Standalone spark

2015-02-25 Thread boci
Thanks dude... I think I will pull up a docker container for integration
test

--
Skype: boci13, Hangout: boci.b...@gmail.com

On Thu, Feb 26, 2015 at 12:22 AM, Sean Owen  wrote:

> Yes, been on the books for a while ...
> https://issues.apache.org/jira/browse/SPARK-2356
> That one just may always be a known 'gotcha' in Windows; it's kind of
> a Hadoop gotcha. I don't know that Spark 100% works on Windows and it
> isn't tested on Windows.
>
> On Wed, Feb 25, 2015 at 11:05 PM, boci  wrote:
> > Thanks your fast answer...
> > in windows it's not working, because hadoop (surprise suprise) need
> > winutils.exe. Without this it's not working, but if you not set the
> hadoop
> > directory You simply get
> >
> > 15/02/26 00:03:16 ERROR Shell: Failed to locate the winutils binary in
> the
> > hadoop binary path
> > java.io.IOException: Could not locate executable null\bin\winutils.exe in
> > the Hadoop binaries.
> >
> > b0c1
> >
> >
> >
> --
> > Skype: boci13, Hangout: boci.b...@gmail.com
> >
> > On Wed, Feb 25, 2015 at 11:50 PM, Sean Owen  wrote:
> >>
> >> Spark and Hadoop should be listed as 'provided' dependency in your
> >> Maven or SBT build. But that should make it available at compile time.
> >>
> >> On Wed, Feb 25, 2015 at 10:42 PM, boci  wrote:
> >> > Hi,
> >> >
> >> > I have a little question. I want to develop a spark based application,
> >> > but
> >> > spark depend to hadoop-client library. I think it's not necessary
> (spark
> >> > standalone) so I excluded from sbt file.. the result is interesting.
> My
> >> > trait where I create the spark context not compiled.
> >> >
> >> > The error:
> >> > ...
> >> >  scala.reflect.internal.Types$TypeError: bad symbolic reference. A
> >> > signature
> >> > in SparkContext.class refers to term mapred
> >> > [error] in package org.apache.hadoop which is not available.
> >> > [error] It may be completely missing from the current classpath, or
> the
> >> > version on
> >> > [error] the classpath might be incompatible with the version used when
> >> > compiling SparkContext.class.
> >> > ...
> >> >
> >> > I used this class for integration test. I'm using windows and I don't
> >> > want
> >> > to using hadoop for integration test. How can I solve this?
> >> >
> >> > Thanks
> >> > Janos
> >> >
> >
> >
>


Re: Standalone spark

2015-02-25 Thread boci
Thanks for your fast answer...
On Windows it's not working, because Hadoop (surprise, surprise) needs
winutils.exe. Without it, it doesn't work, and if you don't set the hadoop
directory you simply get:

15/02/26 00:03:16 ERROR Shell: Failed to locate the winutils binary in the
hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in
the Hadoop binaries.

b0c1


--
Skype: boci13, Hangout: boci.b...@gmail.com

On Wed, Feb 25, 2015 at 11:50 PM, Sean Owen  wrote:

> Spark and Hadoop should be listed as 'provided' dependency in your
> Maven or SBT build. But that should make it available at compile time.
>
> On Wed, Feb 25, 2015 at 10:42 PM, boci  wrote:
> > Hi,
> >
> > I have a little question. I want to develop a spark based application,
> but
> > spark depend to hadoop-client library. I think it's not necessary (spark
> > standalone) so I excluded from sbt file.. the result is interesting. My
> > trait where I create the spark context not compiled.
> >
> > The error:
> > ...
> >  scala.reflect.internal.Types$TypeError: bad symbolic reference. A
> signature
> > in SparkContext.class refers to term mapred
> > [error] in package org.apache.hadoop which is not available.
> > [error] It may be completely missing from the current classpath, or the
> > version on
> > [error] the classpath might be incompatible with the version used when
> > compiling SparkContext.class.
> > ...
> >
> > I used this class for integration test. I'm using windows and I don't
> want
> > to using hadoop for integration test. How can I solve this?
> >
> > Thanks
> > Janos
> >
>


Standalone spark

2015-02-25 Thread boci
Hi,

I have a little question. I want to develop a Spark-based application, but
Spark depends on the hadoop-client library. I thought it's not necessary (Spark
standalone), so I excluded it from the sbt file... the result is interesting: the
trait where I create the SparkContext no longer compiles.

The error:
...
 scala.reflect.internal.Types$TypeError: bad symbolic reference. A
signature in SparkContext.class refers to term mapred
[error] in package org.apache.hadoop which is not available.
[error] It may be completely missing from the current classpath, or the
version on
[error] the classpath might be incompatible with the version used when
compiling SparkContext.class.
...

I use this class for integration tests. I'm on Windows and I don't want
to use Hadoop for the integration tests. How can I solve this?
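
For reference, the fix suggested in the replies is to keep Spark and Hadoop on the
build path but mark them as "provided" instead of excluding them; a rough sbt
sketch (version numbers are only placeholders):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.1" % "provided",
  "org.apache.hadoop" % "hadoop-client" % "2.4.0" % "provided"
)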

Thanks
Janos


Re: MLLib beginner question

2014-12-23 Thread boci
Xiangrui: Hi, I want to use this with streaming and with a batch job too. I'm using
kafka (streaming) and elasticsearch (batch job) as sources and want to calculate a
sentiment value from the input text.
Simon: great, do you have any doc on how I can embed it into my application without
using the HTTP interface? (How can I call the service directly?)

b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.com

On Tue, Dec 23, 2014 at 1:35 AM, Xiangrui Meng  wrote:

> How big is the dataset you want to use in prediction? -Xiangrui
>
> On Mon, Dec 22, 2014 at 1:47 PM, boci  wrote:
> > Hi!
> >
> > I want to try out spark mllib in my spark project, but I got a little
> > problem. I have training data (external file), but the real data com from
> > another rdd. How can I do that?
> > I try to simple using same SparkContext to boot rdd (first I create rdd
> > using sc.textFile() and after NaiveBayes.train... After that I want to
> fetch
> > the real data using same context and internal the map using the predict.
> But
> > My application never exit (I think stucked or something). Why not work
> this
> > solution?
> >
> > Thanks
> >
> > b0c1
> >
> >
> >
> --
> > Skype: boci13, Hangout: boci.b...@gmail.com
>


MLLib beginner question

2014-12-22 Thread boci
Hi!

I want to try out Spark MLlib in my Spark project, but I have a little
problem. I have training data (an external file), but the real data comes from
another RDD. How can I do that?
I simply tried to use the same SparkContext for both (first I create an RDD
using sc.textFile() and then NaiveBayes.train... after that I want to
fetch the real data using the same context and call predict inside a map).
But my application never exits (I think it's stuck or something). Why
doesn't this solution work?
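
Roughly the flow I mean, as a sketch (parsePoint, toFeatures and realData are
placeholders for my own code):

import org.apache.spark.mllib.classification.NaiveBayes

// training data from the external file, parsed into LabeledPoints
val training = sc.textFile("training.txt").map(parsePoint)
val model = NaiveBayes.train(training)
// score the real data from the other RDD inside a map
val scored = realData.map(record => (record, model.predict(toFeatures(record))))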

Thanks

b0c1


--
Skype: boci13, Hangout: boci.b...@gmail.com


Re: Out of any idea

2014-07-20 Thread boci
Hi, I created a demo input:
https://gist.github.com/b0c1/e3721af839feec433b56#file-gistfile1-txt-L10

As you can see at line 10, the JSON is received (JSON/string, never mind).
After that everything is OK, except that the processing never starts...

Any idea?
Please help, guys... I have no idea what I'm missing... a dependency?
The wrong Spark version?
There's no error in the log, the processing just never starts...



--
Skype: boci13, Hangout: boci.b...@gmail.com


On Sun, Jul 20, 2014 at 4:58 AM, Krishna Sankar  wrote:

> Probably you have - if not, try a very simple app in the docker container
> and make sure it works. Sometimes resource contention/allocation can get in
> the way. This happened to me in the YARN container.
> Also try single worker thread.
> Cheers
> 
>
>
> On Sat, Jul 19, 2014 at 2:39 PM, boci  wrote:
>
>> Hi guys!
>>
>> I run out of ideas... I created a spark streaming job (kafka -> spark ->
>> ES).
>> If I start my app local machine (inside the editor, but connect to the
>> real kafka and ES) the application work correctly.
>> If I start it in my docker container (same kafka and ES, local mode
>> (local[4]) like inside my editor) the application connect to kafka, receive
>> the message but after that nothing happened (I put config/log4j.properties
>> to debug mode and I see BlockGenerator receive the data bu after that
>> nothing happened with that.
>> (first step I simply run a map to print the received data with log4j)
>>
>> I hope somebody can help... :(
>>
>> b0c1
>>
>> --
>> Skype: boci13, Hangout: boci.b...@gmail.com
>>
>
>


Out of any idea

2014-07-19 Thread boci
Hi guys!

I've run out of ideas... I created a Spark Streaming job (kafka -> spark ->
ES).
If I start my app on the local machine (inside the editor, but connected to the real
Kafka and ES) the application works correctly.
If I start it in my Docker container (same Kafka and ES, local mode
(local[4]) just like inside my editor) the application connects to Kafka and receives
the message, but after that nothing happens (I set config/log4j.properties
to debug mode and I see BlockGenerator receive the data, but after that
nothing happens with it).
(As a first step I simply run a map to print the received data with log4j.)

I hope somebody can help... :(

b0c1
--
Skype: boci13, Hangout: boci.b...@gmail.com


Re: Uber jar with SBT

2014-07-19 Thread boci
Hi!

I'm using Java 7, and I found the problem: I didn't call start and awaitTermination
on the streaming context. Now it works, BUT
spark-submit never returns (it runs in the foreground and receives the Kafka
streams)... what am I missing?
(I want to send the job to a standalone cluster worker process.)
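
The two calls in question, for reference (and if I understand it right, the blocking
awaitTermination() is also why spark-submit now stays in the foreground in client
mode):

ssc.start()              // actually starts the receivers and the processing
ssc.awaitTermination()   // blocks the driver until the streaming job is stopped

Presumably submitting with --deploy-mode cluster against the standalone master would
run the driver on a worker instead, so the submitting process could return.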

b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.com


On Sat, Jul 19, 2014 at 3:32 PM, Sean Owen  wrote:

> Are you building / running with Java 6? I imagine your .jar files has
> more than 65536 files, and Java 6 has various issues with jars this
> large. If possible, use Java 7 everywhere.
>
> https://issues.apache.org/jira/browse/SPARK-1520
>
> On Sat, Jul 19, 2014 at 2:30 PM, boci  wrote:
> > Hi Guys,
> >
> > I try to create spark uber jar with sbt but I have a lot of problem... I
> > want to use the following:
> > - Spark streaming
> > - Kafka
> > - Elsaticsearch
> > - HBase
> >
> > the current jar size is cca 60M and it's not working.
> > - When I deploy with spark-submit: It's running and exit without any
> error
> > - When I try to start with local[*]  mode, it's say:
> >  Exception in thread "main" java.lang.NoClassDefFoundError:
> > org/apache/spark/Logging
> > => but I start with java -cp /.../spark-assembly-1.0.1-hadoop2.2.0.jar
> -jar
> > my.jar
> >
> > Any idea how can solve this? (which lib required to set provided wich
> > required for run... later I want to run this jar in yarn cluster)
> >
> > b0c1
> >
> --
> > Skype: boci13, Hangout: boci.b...@gmail.com
>


Uber jar with SBT

2014-07-19 Thread boci
Hi Guys,

I'm trying to create a Spark uber jar with sbt, but I have a lot of problems... I
want to use the following:
- Spark streaming
- Kafka
- Elsaticsearch
- HBase

The current jar size is about 60 MB and it's not working.
- When I deploy with spark-submit: it runs and exits without any error.
- When I try to start it in local[*] mode, it says:
 Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/spark/Logging
=> even though I start it with java -cp /.../spark-assembly-1.0.1-hadoop2.2.0.jar -jar
my.jar
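
One note on that command: as far as I know, when java is started with -jar it ignores
the -cp option entirely, so the Spark assembly is probably not on the classpath at
all, which would explain the missing org/apache/spark/Logging. Putting both jars on
the classpath and naming the main class explicitly (com.example.Main is a
placeholder) would look roughly like:

java -cp /.../spark-assembly-1.0.1-hadoop2.2.0.jar:my.jar com.example.Main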

Any idea how to solve this? (Which libs need to be set to provided and which are
required at runtime... later I want to run this jar on a YARN cluster.)

b0c1
--
Skype: boci13, Hangout: boci.b...@gmail.com


sbt + idea + test

2014-07-14 Thread boci
Hi guys,


I want to use Elasticsearch and HBase in my Spark project, and I want to create
a test. I pulled up ES and ZooKeeper, but if I put "val htest = new
HBaseTestingUtility()" into my app I get a strange exception (at compile
time, not runtime):

https://gist.github.com/b0c1/4a4b3f6350816090c3b5

Any idea?

--
Skype: boci13, Hangout: boci.b...@gmail.com


Kafka/ES question

2014-06-29 Thread boci
Hi!

I'm trying to use Spark with Kafka. Everything works, but I found a little
problem. I created a small test application which connects to a real Kafka
cluster, sends a message and reads it back. It works, but when I run my test a
second time (send/read) it reads both the first and the second stream (maybe
the offset is not stored in ZK, or I don't know).

At the end I store the processed messages in ES, but I want to avoid
duplicating the data. Any idea how I can solve this?

b0c1
--
Skype: boci13, Hangout: boci.b...@gmail.com


Re: ElasticSearch enrich

2014-06-27 Thread boci
Thanks, more local threads solved the problem, it works like a charm. How
many threads are required?
Adrian: it's not a public project, but ask and I will answer (if I can)...
maybe later I will create a demo project based on my solution.
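
If I understand it correctly, with the receiver-based Kafka stream one core is
permanently occupied by the receiver, so at least two local threads are needed to
leave room for the processing itself, e.g. something like:

val conf = new SparkConf().setAppName("kafka-es-test").setMaster("local[4]")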

b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.com


On Fri, Jun 27, 2014 at 11:31 PM, Holden Karau  wrote:

> Try setting the master to local[4]
>
>
> On Fri, Jun 27, 2014 at 2:17 PM, boci  wrote:
>
>> This is a simply scalatest. I start a SparkConf, set the master to local
>> (set the serializer etc), pull up kafka and es connection send a message to
>> kafka and wait 30sec to processing.
>>
>> It's run in IDEA no magick trick.
>>
>> b0c1
>>
>>
>> --
>> Skype: boci13, Hangout: boci.b...@gmail.com
>>
>>
>> On Fri, Jun 27, 2014 at 11:11 PM, Holden Karau 
>> wrote:
>>
>>> So a few quick questions:
>>>
>>> 1) What cluster are you running this against? Is it just local? Have you
>>> tried local[4]?
>>> 2) When you say breakpoint, how are you setting this break point? There
>>> is a good chance your breakpoint mechanism doesn't work in a distributed
>>> environment, could you instead cause a side effect (like writing to a file)?
>>>
>>> Cheers,
>>>
>>> Holden :)
>>>
>>>
>>> On Fri, Jun 27, 2014 at 2:04 PM, boci  wrote:
>>>
>>>> Ok I found dynamic resources, but I have a frustrating problem. This is
>>>> the flow:
>>>> kafka -> enrich X -> enrich Y -> enrich Z -> foreachRDD -> save
>>>>
>>>> My problem is: if I do this it's not work, the enrich functions not
>>>> called, but if I put a print it's does. for example if I do this:
>>>> kafka -> enrich X -> enrich Y -> print -> enrich Z -> foreachRDD
>>>>
>>>> The enrich X and enrich Y called but enrich Z not
>>>> if I put the print after the enrich Z it's will be printed. How can I
>>>> solve this? (what can I do to call the foreachRDD I put breakpoint inside
>>>> the map function (where I'm generate the writable) but it's not called)
>>>>
>>>> Any idea?
>>>>
>>>> b0c1
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>>
>>>>
>>>> On Fri, Jun 27, 2014 at 4:53 PM, boci  wrote:
>>>>
>>>>> Another question. In the foreachRDD I will initialize the JobConf, but
>>>>> in this place how can I get information from the items?
>>>>> I have an identifier in the data which identify the required ES index
>>>>> (so how can I set dynamic index in the foreachRDD) ?
>>>>>
>>>>> b0c1
>>>>>
>>>>>
>>>>> --
>>>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>>>
>>>>>
>>>>> On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau 
>>>>> wrote:
>>>>>
>>>>>> Just your luck I happened to be working on that very talk today :)
>>>>>> Let me know how your experiences with Elasticsearch & Spark go :)
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 26, 2014 at 3:17 PM, boci  wrote:
>>>>>>
>>>>>>> Wow, thanks your fast answer, it's help a lot...
>>>>>>>
>>>>>>> b0c1
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau >>>>>&

Re: ElasticSearch enrich

2014-06-27 Thread boci
This is a simple scalatest. I create a SparkConf, set the master to local
(set the serializer etc.), pull up the Kafka and ES connections, send a message to
Kafka and wait 30 seconds for processing.

It runs in IDEA, no magic tricks.

b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.com


On Fri, Jun 27, 2014 at 11:11 PM, Holden Karau  wrote:

> So a few quick questions:
>
> 1) What cluster are you running this against? Is it just local? Have you
> tried local[4]?
> 2) When you say breakpoint, how are you setting this break point? There is
> a good chance your breakpoint mechanism doesn't work in a distributed
> environment, could you instead cause a side effect (like writing to a file)?
>
> Cheers,
>
> Holden :)
>
>
> On Fri, Jun 27, 2014 at 2:04 PM, boci  wrote:
>
>> Ok I found dynamic resources, but I have a frustrating problem. This is
>> the flow:
>> kafka -> enrich X -> enrich Y -> enrich Z -> foreachRDD -> save
>>
>> My problem is: if I do this it's not work, the enrich functions not
>> called, but if I put a print it's does. for example if I do this:
>> kafka -> enrich X -> enrich Y -> print -> enrich Z -> foreachRDD
>>
>> The enrich X and enrich Y called but enrich Z not
>> if I put the print after the enrich Z it's will be printed. How can I
>> solve this? (what can I do to call the foreachRDD I put breakpoint inside
>> the map function (where I'm generate the writable) but it's not called)
>>
>> Any idea?
>>
>> b0c1
>>
>>
>>
>>
>> --
>> Skype: boci13, Hangout: boci.b...@gmail.com
>>
>>
>> On Fri, Jun 27, 2014 at 4:53 PM, boci  wrote:
>>
>>> Another question. In the foreachRDD I will initialize the JobConf, but
>>> in this place how can I get information from the items?
>>> I have an identifier in the data which identify the required ES index
>>> (so how can I set dynamic index in the foreachRDD) ?
>>>
>>> b0c1
>>>
>>>
>>> --
>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>
>>>
>>> On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau 
>>> wrote:
>>>
>>>> Just your luck I happened to be working on that very talk today :) Let
>>>> me know how your experiences with Elasticsearch & Spark go :)
>>>>
>>>>
>>>> On Thu, Jun 26, 2014 at 3:17 PM, boci  wrote:
>>>>
>>>>> Wow, thanks your fast answer, it's help a lot...
>>>>>
>>>>> b0c1
>>>>>
>>>>>
>>>>> --
>>>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>>>
>>>>>
>>>>> On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau 
>>>>> wrote:
>>>>>
>>>>>> Hi b0c1,
>>>>>>
>>>>>> I have an example of how to do this in the repo for my talk as well,
>>>>>> the specific example is at
>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
>>>>>> . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and
>>>>>> then call  saveAsHadoopDataset on the RDD that gets passed into the
>>>>>> function we provide to foreachRDD.
>>>>>>
>>>>>> e.g.
>>>>>>
>>>>>> stream.foreachRDD{(data, time) =>
>>>>>>  val jobconf = ...
>>>>>>  data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
>>>>>> }
>>>>>>
>>>>>> Hope that helps :)
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Holden :)
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 26, 2014 at 2:23 PM, boci  wrote:
>>>>>>
>>>>>>> Thanks. I without local option I can connect with es remote, now I
>>>>>>> only have one problem. How can I u

Re: ElasticSearch enrich

2014-06-27 Thread boci
OK, I figured out the dynamic resources, but I have a frustrating problem. This is the
flow:
kafka -> enrich X -> enrich Y -> enrich Z -> foreachRDD -> save

My problem is: if I do this it doesn't work, the enrich functions are not called,
but if I put a print in it, they are. For example, if I do this:
kafka -> enrich X -> enrich Y -> print -> enrich Z -> foreachRDD

then enrich X and enrich Y are called, but enrich Z is not;
if I put the print after enrich Z, it gets printed. How can I solve
this? (What can I do to get the foreachRDD called? I put a breakpoint inside the map
function (where I generate the writable) but it's not called.)

Any idea?

b0c1



--
Skype: boci13, Hangout: boci.b...@gmail.com


On Fri, Jun 27, 2014 at 4:53 PM, boci  wrote:

> Another question. In the foreachRDD I will initialize the JobConf, but in
> this place how can I get information from the items?
> I have an identifier in the data which identify the required ES index (so
> how can I set dynamic index in the foreachRDD) ?
>
> b0c1
>
>
> --
> Skype: boci13, Hangout: boci.b...@gmail.com
>
>
> On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau 
> wrote:
>
>> Just your luck I happened to be working on that very talk today :) Let me
>> know how your experiences with Elasticsearch & Spark go :)
>>
>>
>> On Thu, Jun 26, 2014 at 3:17 PM, boci  wrote:
>>
>>> Wow, thanks your fast answer, it's help a lot...
>>>
>>> b0c1
>>>
>>>
>>> --
>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>
>>>
>>> On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau 
>>> wrote:
>>>
>>>> Hi b0c1,
>>>>
>>>> I have an example of how to do this in the repo for my talk as well,
>>>> the specific example is at
>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
>>>> . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and
>>>> then call  saveAsHadoopDataset on the RDD that gets passed into the
>>>> function we provide to foreachRDD.
>>>>
>>>> e.g.
>>>>
>>>> stream.foreachRDD{(data, time) =>
>>>>  val jobconf = ...
>>>>  data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
>>>> }
>>>>
>>>> Hope that helps :)
>>>>
>>>> Cheers,
>>>>
>>>> Holden :)
>>>>
>>>>
>>>> On Thu, Jun 26, 2014 at 2:23 PM, boci  wrote:
>>>>
>>>>> Thanks. I without local option I can connect with es remote, now I
>>>>> only have one problem. How can I use elasticsearch-hadoop with spark
>>>>> streaming? I mean DStream doesn't have "saveAsHadoopFiles" method, my
>>>>> second problem the output index is depend by the input data.
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>> --
>>>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>>>
>>>>>
>>>>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <
>>>>> nick.pentre...@gmail.com> wrote:
>>>>>
>>>>>> You can just add elasticsearch-hadoop as a dependency to your project
>>>>>> to user the ESInputFormat and ESOutputFormat (
>>>>>> https://github.com/elasticsearch/elasticsearch-hadoop). Some other
>>>>>> basics here:
>>>>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>>>>
>>>>>> For testing, yes I think you will need to start ES in local mode
>>>>>> (just ./bin/elasticsearch) and use the default config (host = localhost,
>>>>>> port = 9200).
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 26, 2014 at 9:04 AM, boci  wrote:
>>>>>>
>>>>>>> That's okay, but hadoop has ES integration. what happened if I run
>>>>>>>

Re: ElasticSearch enrich

2014-06-27 Thread boci
Another question. In the foreachRDD I will initialize the JobConf, but at
that point how can I get information from the items?
I have an identifier in the data which identifies the required ES index (so
how can I set a dynamic index in the foreachRDD)?

b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.com


On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau  wrote:

> Just your luck I happened to be working on that very talk today :) Let me
> know how your experiences with Elasticsearch & Spark go :)
>
>
> On Thu, Jun 26, 2014 at 3:17 PM, boci  wrote:
>
>> Wow, thanks your fast answer, it's help a lot...
>>
>> b0c1
>>
>>
>> --
>> Skype: boci13, Hangout: boci.b...@gmail.com
>>
>>
>> On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau 
>> wrote:
>>
>>> Hi b0c1,
>>>
>>> I have an example of how to do this in the repo for my talk as well, the
>>> specific example is at
>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
>>> . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and
>>> then call  saveAsHadoopDataset on the RDD that gets passed into the
>>> function we provide to foreachRDD.
>>>
>>> e.g.
>>>
>>> stream.foreachRDD{(data, time) =>
>>>  val jobconf = ...
>>>  data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
>>> }
>>>
>>> Hope that helps :)
>>>
>>> Cheers,
>>>
>>> Holden :)
>>>
>>>
>>> On Thu, Jun 26, 2014 at 2:23 PM, boci  wrote:
>>>
>>>> Thanks. I without local option I can connect with es remote, now I only
>>>> have one problem. How can I use elasticsearch-hadoop with spark streaming?
>>>> I mean DStream doesn't have "saveAsHadoopFiles" method, my second problem
>>>> the output index is depend by the input data.
>>>>
>>>> Thanks
>>>>
>>>>
>>>> --
>>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>>
>>>>
>>>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <
>>>> nick.pentre...@gmail.com> wrote:
>>>>
>>>>> You can just add elasticsearch-hadoop as a dependency to your project
>>>>> to user the ESInputFormat and ESOutputFormat (
>>>>> https://github.com/elasticsearch/elasticsearch-hadoop). Some other
>>>>> basics here:
>>>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>>>
>>>>> For testing, yes I think you will need to start ES in local mode (just
>>>>> ./bin/elasticsearch) and use the default config (host = localhost, port =
>>>>> 9200).
>>>>>
>>>>>
>>>>> On Thu, Jun 26, 2014 at 9:04 AM, boci  wrote:
>>>>>
>>>>>> That's okay, but hadoop has ES integration. what happened if I run
>>>>>> saveAsHadoopFile without hadoop (or I must need to pull up hadoop
>>>>>> programatically? (if I can))
>>>>>>
>>>>>> b0c1
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau 
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci  wrote:
>>>>>>>
>>>>>>>> Hi guys, thanks the direction now I have some problem/question:
>>>>>>>> - in local (test) mode I want to use ElasticClient.local to create
>>>>>>>> es connection, but in prodution I want to use ElasticClient.remote, to 
>>>>>>>> this
>>>>>>>> I want to pass ElasticClient to mapPartitions, or what is the best
>>>>>&

Re: ElasticSearch enrich

2014-06-26 Thread boci
Wow, thanks for your fast answer, it helps a lot...

b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.com


On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau  wrote:

> Hi b0c1,
>
> I have an example of how to do this in the repo for my talk as well, the
> specific example is at
> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
> . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and
> then call  saveAsHadoopDataset on the RDD that gets passed into the
> function we provide to foreachRDD.
>
> e.g.
>
> stream.foreachRDD{(data, time) =>
>  val jobconf = ...
>  data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
> }
>
> Hope that helps :)
>
> Cheers,
>
> Holden :)
>
>
> On Thu, Jun 26, 2014 at 2:23 PM, boci  wrote:
>
>> Thanks. I without local option I can connect with es remote, now I only
>> have one problem. How can I use elasticsearch-hadoop with spark streaming?
>> I mean DStream doesn't have "saveAsHadoopFiles" method, my second problem
>> the output index is depend by the input data.
>>
>> Thanks
>>
>>
>> --
>> Skype: boci13, Hangout: boci.b...@gmail.com
>>
>>
>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <
>> nick.pentre...@gmail.com> wrote:
>>
>>> You can just add elasticsearch-hadoop as a dependency to your project to
>>> user the ESInputFormat and ESOutputFormat (
>>> https://github.com/elasticsearch/elasticsearch-hadoop). Some other
>>> basics here:
>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>
>>> For testing, yes I think you will need to start ES in local mode (just
>>> ./bin/elasticsearch) and use the default config (host = localhost, port =
>>> 9200).
>>>
>>>
>>> On Thu, Jun 26, 2014 at 9:04 AM, boci  wrote:
>>>
>>>> That's okay, but hadoop has ES integration. what happened if I run
>>>> saveAsHadoopFile without hadoop (or I must need to pull up hadoop
>>>> programatically? (if I can))
>>>>
>>>> b0c1
>>>>
>>>>
>>>> --
>>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>>
>>>>
>>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau 
>>>> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci  wrote:
>>>>>
>>>>>> Hi guys, thanks the direction now I have some problem/question:
>>>>>> - in local (test) mode I want to use ElasticClient.local to create es
>>>>>> connection, but in prodution I want to use ElasticClient.remote, to this 
>>>>>> I
>>>>>> want to pass ElasticClient to mapPartitions, or what is the best
>>>>>> practices?
>>>>>>
>>>>> In this case you probably want to make the ElasticClient inside of
>>>>> mapPartitions (since it isn't serializable) and if you want to use a
>>>>> different client in local mode just have a flag that control what type of
>>>>> client you create.
>>>>>
>>>>>> - my stream output is write into elasticsearch. How can I
>>>>>> test output.saveAsHadoopFile[ESOutputFormat]("-") in local environment?
>>>>>>
>>>>>>
>>>>> - After store the enriched data into ES, I want to generate aggregated
>>>>>> data (EsInputFormat) how can I test it in local?
>>>>>>
>>>>> I think the simplest thing to do would be use the same client in mode
>>>>> and just start single node elastic search cluster.
>>>>>
>>>>>>
>>>>>> Thanks guys
>>>>>>
>>>>>> b0c1
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Skype: boci13, Hangout: boci.b...@gmail.c

Re: ElasticSearch enrich

2014-06-26 Thread boci
Thanks. Without the local option I can connect to the remote ES; now I only
have one problem: how can I use elasticsearch-hadoop with Spark Streaming?
I mean DStream doesn't have a "saveAsHadoopFiles" method, and my second problem is that
the output index depends on the input data.

Thanks

--
Skype: boci13, Hangout: boci.b...@gmail.com


On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath 
wrote:

> You can just add elasticsearch-hadoop as a dependency to your project to
> user the ESInputFormat and ESOutputFormat (
> https://github.com/elasticsearch/elasticsearch-hadoop). Some other basics
> here:
> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>
> For testing, yes I think you will need to start ES in local mode (just
> ./bin/elasticsearch) and use the default config (host = localhost, port =
> 9200).
>
>
> On Thu, Jun 26, 2014 at 9:04 AM, boci  wrote:
>
>> That's okay, but hadoop has ES integration. what happened if I run
>> saveAsHadoopFile without hadoop (or I must need to pull up hadoop
>> programatically? (if I can))
>>
>> b0c1
>>
>>
>> --
>> Skype: boci13, Hangout: boci.b...@gmail.com
>>
>>
>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau 
>> wrote:
>>
>>>
>>>
>>> On Wed, Jun 25, 2014 at 4:16 PM, boci  wrote:
>>>
>>>> Hi guys, thanks the direction now I have some problem/question:
>>>> - in local (test) mode I want to use ElasticClient.local to create es
>>>> connection, but in prodution I want to use ElasticClient.remote, to this I
>>>> want to pass ElasticClient to mapPartitions, or what is the best
>>>> practices?
>>>>
>>> In this case you probably want to make the ElasticClient inside of
>>> mapPartitions (since it isn't serializable) and if you want to use a
>>> different client in local mode just have a flag that control what type of
>>> client you create.
>>>
>>>> - my stream output is write into elasticsearch. How can I
>>>> test output.saveAsHadoopFile[ESOutputFormat]("-") in local environment?
>>>>
>>>>
>>> - After store the enriched data into ES, I want to generate aggregated
>>>> data (EsInputFormat) how can I test it in local?
>>>>
>>> I think the simplest thing to do would be use the same client in mode
>>> and just start single node elastic search cluster.
>>>
>>>>
>>>> Thanks guys
>>>>
>>>> b0c1
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>>
>>>>
>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau 
>>>> wrote:
>>>>
>>>>> So I'm giving a talk at the Spark summit on using Spark &
>>>>> ElasticSearch, but for now if you want to see a simple demo which uses
>>>>> elasticsearch for geo input you can take a look at my quick & dirty
>>>>> implementation with TopTweetsInALocation (
>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>>> ). This approach uses the ESInputFormat which avoids the difficulty of
>>>>> having to manually create ElasticSearch clients.
>>>>>
>>>>> This approach might not work for your data, e.g. if you need to create
>>>>> a query for each record in your RDD. If this is the case, you could 
>>>>> instead
>>>>> look at using mapPartitions and setting up your Elasticsearch connection
>>>>> inside of that, so you could then re-use the client for all of the queries
>>>>> on each partition. This approach will avoid having to serialize the
>>>>> Elasticsearch connection because it will be local to your function.
>>>>>
>>>>> Hope this helps!
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Holden :)
>>>>>
>>>>>
>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <
>>>>> mayur.rust...@gmail.com> wrote:
>>&g

Re: ElasticSearch enrich

2014-06-26 Thread boci
That's okay, but Hadoop has the ES integration. What happens if I run
saveAsHadoopFile without Hadoop (or do I need to pull up Hadoop
programmatically? (if I even can))

b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.com


On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau  wrote:

>
>
> On Wed, Jun 25, 2014 at 4:16 PM, boci  wrote:
>
>> Hi guys, thanks the direction now I have some problem/question:
>> - in local (test) mode I want to use ElasticClient.local to create es
>> connection, but in prodution I want to use ElasticClient.remote, to this I
>> want to pass ElasticClient to mapPartitions, or what is the best
>> practices?
>>
> In this case you probably want to make the ElasticClient inside of
> mapPartitions (since it isn't serializable) and if you want to use a
> different client in local mode just have a flag that control what type of
> client you create.
>
>> - my stream output is write into elasticsearch. How can I
>> test output.saveAsHadoopFile[ESOutputFormat]("-") in local environment?
>>
> - After store the enriched data into ES, I want to generate aggregated
>> data (EsInputFormat) how can I test it in local?
>>
> I think the simplest thing to do would be use the same client in mode and
> just start single node elastic search cluster.
>
>>
>> Thanks guys
>>
>> b0c1
>>
>>
>>
>>
>> --
>> Skype: boci13, Hangout: boci.b...@gmail.com
>>
>>
>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau 
>> wrote:
>>
>>> So I'm giving a talk at the Spark summit on using Spark & ElasticSearch,
>>> but for now if you want to see a simple demo which uses elasticsearch for
>>> geo input you can take a look at my quick & dirty implementation with
>>> TopTweetsInALocation (
>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>> ). This approach uses the ESInputFormat which avoids the difficulty of
>>> having to manually create ElasticSearch clients.
>>>
>>> This approach might not work for your data, e.g. if you need to create a
>>> query for each record in your RDD. If this is the case, you could instead
>>> look at using mapPartitions and setting up your Elasticsearch connection
>>> inside of that, so you could then re-use the client for all of the queries
>>> on each partition. This approach will avoid having to serialize the
>>> Elasticsearch connection because it will be local to your function.
>>>
>>> Hope this helps!
>>>
>>> Cheers,
>>>
>>> Holden :)
>>>
>>>
>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi 
>>> wrote:
>>>
>>>> Its not used as default serializer for some issues with compatibility &
>>>> requirement to register the classes..
>>>>
>>>> Which part are you getting as nonserializable... you need to serialize
>>>> that class if you are sending it to spark workers inside a map, reduce ,
>>>> mappartition or any of the operations on RDD.
>>>>
>>>>
>>>> Mayur Rustagi
>>>> Ph: +1 (760) 203 3257
>>>> http://www.sigmoidanalytics.com
>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>
>>>>
>>>>
>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng  wrote:
>>>>
>>>>> I'm afraid persisting connection across two tasks is a dangerous act
>>>>> as they
>>>>> can't be guaranteed to be executed on the same machine. Your ES server
>>>>> may
>>>>> think its a man-in-the-middle attack!
>>>>>
>>>>> I think its possible to invoke a static method that give you a
>>>>> connection in
>>>>> a local 'pool', so nothing will sneak into your closure, but its too
>>>>> complex
>>>>> and there should be a better option.
>>>>>
>>>>> Never use kryo before, if its that good perhaps we should use it as the
>>>>> default serializer
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>> Nabble.com.
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Cell : 425-233-8271
>>>
>>
>>
>
>
> --
> Cell : 425-233-8271
>


Re: ElasticSearch enrich

2014-06-25 Thread boci
Hi guys, thanks for the direction. Now I have some problems/questions:
- In local (test) mode I want to use ElasticClient.local to create the ES
connection, but in production I want to use ElasticClient.remote. For this I
want to pass the ElasticClient to mapPartitions, or what is the best practice?
- My stream output is written into Elasticsearch. How can I
test output.saveAsHadoopFile[ESOutputFormat]("-") in a local environment?
- After storing the enriched data in ES, I want to generate aggregated data
(EsInputFormat); how can I test that locally?

Thanks guys

b0c1



--
Skype: boci13, Hangout: boci.b...@gmail.com


On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau  wrote:

> So I'm giving a talk at the Spark summit on using Spark & ElasticSearch,
> but for now if you want to see a simple demo which uses elasticsearch for
> geo input you can take a look at my quick & dirty implementation with
> TopTweetsInALocation (
> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
> ). This approach uses the ESInputFormat which avoids the difficulty of
> having to manually create ElasticSearch clients.
>
> This approach might not work for your data, e.g. if you need to create a
> query for each record in your RDD. If this is the case, you could instead
> look at using mapPartitions and setting up your Elasticsearch connection
> inside of that, so you could then re-use the client for all of the queries
> on each partition. This approach will avoid having to serialize the
> Elasticsearch connection because it will be local to your function.
>
> Hope this helps!
>
> Cheers,
>
> Holden :)
>
>
> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi 
> wrote:
>
>> Its not used as default serializer for some issues with compatibility &
>> requirement to register the classes..
>>
>> Which part are you getting as nonserializable... you need to serialize
>> that class if you are sending it to spark workers inside a map, reduce ,
>> mappartition or any of the operations on RDD.
>>
>>
>> Mayur Rustagi
>> Ph: +1 (760) 203 3257
>> http://www.sigmoidanalytics.com
>> @mayur_rustagi 
>>
>>
>>
>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng  wrote:
>>
>>> I'm afraid persisting connection across two tasks is a dangerous act as
>>> they
>>> can't be guaranteed to be executed on the same machine. Your ES server
>>> may
>>> think its a man-in-the-middle attack!
>>>
>>> I think its possible to invoke a static method that give you a
>>> connection in
>>> a local 'pool', so nothing will sneak into your closure, but its too
>>> complex
>>> and there should be a better option.
>>>
>>> Never use kryo before, if its that good perhaps we should use it as the
>>> default serializer
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>
>>
>
>
> --
> Cell : 425-233-8271
>


Re: ElasticSearch enrich

2014-06-24 Thread boci
I'm using elastic4s inside my ESWorker class. ESWorker now only contains two
fields, host:String and port:Int. Inside the "findNearestCity" method I
create the ElasticClient (elastic4s) connection. What's wrong with my class? Do I
need to serialize the ElasticClient? mapPartitions sounds good, but I still
get a NotSerializableException; or must I mark it as transient? And where do
the host and port come from in that case?

my worker:

class ESWorker(val host: String, val port: Int) {
  def findNearestCity(geo: Position): Option[City] = {
    // Here I create the ElasticClient connection and execute the queries
    ???
  }
  def enrichGeoInternal(data: Data): Data = {
    data.location = findNearestCity(data.position)
    data
  }
  def enrichGeo(ds: DStream[Data]): DStream[Data] = {
    ds.map(enrichGeoInternal)
  }
}
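
Following Mayur's second suggestion below, the enrichment could be restructured
roughly like this, so the client is created on the workers and never serialized
(the actual elastic4s query code is left out, and I'm assuming the remote factory
just takes host and port):

def enrichGeo(ds: DStream[Data]): DStream[Data] = {
  // copy the fields into local vals so the closure doesn't capture `this`
  val (esHost, esPort) = (host, port)
  ds.mapPartitions { partition =>
    // one client per partition, created on the worker itself
    val client = ElasticClient.remote(esHost, esPort)
    partition.map { data =>
      // ... run the geo query with `client` and fill in data.location ...
      data
    }
  }
}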



--
Skype: boci13, Hangout: boci.b...@gmail.com


On Wed, Jun 25, 2014 at 1:03 AM, Mayur Rustagi 
wrote:

> Mostly ES client is not serializable for you. You can do 3 workarounds,
> 1. Switch to kryo serialization, register the client in kryo , might solve
> your serialization issue
> 2. Use mappartition for all your data & initialize your client in the
> mappartition code, this will create client for each partition, reduce some
> parallelism & add some overhead of creation of client but prevent
> serialization of esclient & transfer to workers
> 3. Use serializablewrapper to serialize your ESclient manually & send it
> across & deserialize it manually, this may or may not work depending on
> whether your class is safely serializable.
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>
>
>
> On Wed, Jun 25, 2014 at 4:12 AM, boci  wrote:
>
>> Hi guys,
>>
>> I have a small question. I want to create a "Worker" class which using
>> ElasticClient to make query to elasticsearch. (I want to enrich my data
>> with geo search result).
>>
>> How can I do that? I try to create a worker instance with ES host/port
>> parameter but spark throw an exceptino (my class not serializable).
>>
>> Any idea?
>>
>> Thanks
>> b0c1
>>
>>
>


Re: ElasticSearch enrich

2014-06-24 Thread boci
OK, but in this case where can I store the ES connection? Or does every document
create a new ES connection inside the worker?

--
Skype: boci13, Hangout: boci.b...@gmail.com


On Wed, Jun 25, 2014 at 1:01 AM, Peng Cheng  wrote:

> make sure all queries are called through class methods and wrap your query
> info with a class having only simple properties (strings, collections etc).
> If you can't find such wrapper you can also use SerializableWritable
> wrapper
> out-of-the-box, but its not recommended. (developer-api and make fat
> closures that run slowly)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8214.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


ElasticSearch enrich

2014-06-24 Thread boci
Hi guys,

I have a small question. I want to create a "Worker" class which uses
an ElasticClient to query Elasticsearch. (I want to enrich my data
with geo search results.)

How can I do that? I tried to create a worker instance with ES host/port
parameters, but Spark throws an exception (my class is not serializable).

Any idea?

Thanks
b0c1