weightCol doesn't seem to be handled properly in PySpark

2016-09-07 Thread evanzamir
When I try to use LinearRegression, it raises a py4j error unless a weights
column is specified. This seems odd because the default is supposedly
weightCol=None, yet when I explicitly pass weightCol=None to LinearRegression,
I get this error.
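
A minimal sketch of what is being described (the column names are placeholders,
not taken from the original report):

from pyspark.ml.regression import LinearRegression

# Works: leave weightCol unset so the default handling applies
lr = LinearRegression(featuresCol="features", labelCol="label")

# Reported to raise a py4j error in the reporter's setup: passing the
# documented default explicitly
lr_weighted = LinearRegression(featuresCol="features", labelCol="label",
                               weightCol=None)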



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/weightCol-doesn-t-seem-to-be-handled-properly-in-PySpark-tp27677.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to write data into CouchBase using Spark & Scala?

2016-09-07 Thread Devi P.V
Thanks. Now it is working.

On Thu, Sep 8, 2016 at 12:57 AM, aka.fe2s  wrote:

> Most likely you are missing an import statement that enables some Scala
> implicits. I haven't used this connector, but it looks like you need "import
> com.couchbase.spark._"
>
> --
> Oleksiy Dyagilev
>
> On Wed, Sep 7, 2016 at 9:42 AM, Devi P.V  wrote:
>
>> I am a newbie in CouchBase. I am trying to write data into CouchBase. My
>> sample code is the following:
>>
>> val cfg = new SparkConf()
>>   .setAppName("couchbaseQuickstart")
>>   .setMaster("local[*]")
>>   .set("com.couchbase.bucket.MyBucket","pwd")
>>
>> val sc = new SparkContext(cfg)
>> val doc1 = JsonDocument.create("doc1", JsonObject.create().put("some", 
>> "content"))
>> val doc2 = JsonArrayDocument.create("doc2", JsonArray.from("more", 
>> "content", "in", "here"))
>>
>> val data = sc
>>   .parallelize(Seq(doc1, doc2))
>>
>> But I can't access data.saveToCouchbase().
>>
>> I am using Spark 1.6.1 & Scala 2.11.8
>>
>> I gave the following dependencies in build.sbt:
>>
>> libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.6.1"
>> libraryDependencies += "com.couchbase.client" % "spark-connector_2.11" % 
>> "1.2.1"
>>
>>
>> How can I write data into CouchBase using Spark & Scala?
>>
>>
>>
>>
>>
>
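
For reference, a minimal sketch of the working combination, assuming the
connector's RDD support is exposed exactly as suggested above (doc1, doc2 and
sc are the ones from the original question):

import com.couchbase.spark._   // brings the saveToCouchbase implicit into scope

val data = sc.parallelize(Seq(doc1, doc2))
data.saveToCouchbase()         // writes both JSON documents to the configured bucket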


Forecasting algorithms in spark ML

2016-09-07 Thread Madabhattula Rajesh Kumar
Hi,

Please let me know supported Forecasting algorithms in spark ML

Regards,
Rajesh


Re: MLib : Non Linear Optimization

2016-09-07 Thread nsareen
Any answer to this question group ?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/MLib-Non-Linear-Optimization-tp27645p27676.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: distribute work (files)

2016-09-07 Thread Peter Figliozzi
It works!  Hmm, smells like some kind of linux permissions issue. Checking
this, the owner & group are the same all around, and there is global read
permission as well.  So I have no clue why it would not work with an sshfs
mounted volume.

Back to the OP's question: use Spark's CSV data source instead of calling
textFile like I originally suggested.  See this StackOverflow post.
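
A sketch of that approach, assuming Spark 2.0's built-in CSV reader and the
spark session provided by the shell (the path and option are illustrative, not
from the original thread):

val df = spark.read
  .option("header", "true")
  .csv("file:///home/peter/datashare/*.csv")
df.count()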


Good to know this is an option.  I use Cassandra for my data source and am
not running Hadoop (no reason to thus far).

Can anyone get this to work with an sshfs mounted share?

On Wed, Sep 7, 2016 at 8:48 PM, ayan guha  wrote:

> So, can you try to simulate the same without sshfs? ie, create a folder on
> /tmp/datashare and copy your files on all the machines and point
> sc.textFiles to that folder?
>
>
> On Thu, Sep 8, 2016 at 11:26 AM, Peter Figliozzi  > wrote:
>
>> All (three) of them.  It's kind of cool-- when I re-run collect() a different
>> executor will show up as first to encounter the error.
>>
>> On Wed, Sep 7, 2016 at 8:20 PM, ayan guha  wrote:
>>
>>> Hi
>>>
>>> Is it happening on all executors or one?
>>>
>>> On Thu, Sep 8, 2016 at 10:46 AM, Peter Figliozzi <
>>> pete.figlio...@gmail.com> wrote:
>>>

 Yes indeed (see below).  Just to reiterate, I am not running Hadoop.
 The "curly" node name mentioned in the stacktrace is the name of one of the
 worker nodes.  I've mounted the same directory "datashare" with two text
 files to all worker nodes with sshfs.  The Spark documentation suggests
 that this should work:

 *If using a path on the local filesystem, the file must also be
 accessible at the same path on worker nodes. Either copy the file to all
 workers or use a network-mounted shared file system.*

 I was hoping someone else could try this and see if it works.

 Here's what I did to generate the error:

 val data = sc.textFile("file:///home/peter/datashare/*.txt")
 data.collect()

 It's working to some extent because if I put a bogus path in, I'll get
 a different (correct) error (InvalidInputException: Input Pattern
 file:/home/peter/ddatashare/*.txt matches 0 files).

 Here's the stack trace when I use a valid path:

 org.apache.spark.SparkException: Job aborted due to stage failure:
 Task 1 in stage 18.0 failed 4 times, most recent failure: Lost task 1.3 in
 stage 18.0 (TID 792, curly): java.io.FileNotFoundException: File
 file:/home/peter/datashare/f1.txt does not exist
 at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileSta
 tus(RawLocalFileSystem.java:609)
 at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInt
 ernal(RawLocalFileSystem.java:822)
 at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLoc
 alFileSystem.java:599)
 at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFi
 leSystem.java:421)
 at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputCheck
 er.(ChecksumFileSystem.java:140)
 at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSys
 tem.java:341)
 at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:767)
 at org.apache.hadoop.mapred.LineRecordReader.(LineRecordR
 eader.java:109)
 at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(Tex
 tInputFormat.java:67)
 at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:246)
 at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:209)
 at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR
 DD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
 at org.apache.spark.scheduler.Task.run(Task.scala:85)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.s
 cala:274)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
 Executor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
 lExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)


 On Wed, Sep 7, 2016 at 9:50 AM, Yong Zhang 
 wrote:

> What error do you get? FileNotFoundException?
>
>
> Please paste the stacktrace here.
>
>
> Yong
>
>
> --
> *From:* Peter Figliozzi 
> *Sent:* Wednesday, September 7, 2016 10:18 AM
> *To:* ayan guha
> *Cc:* Lydia Ickler; user.spark
> *Subject:* Re: 

Re: distribute work (files)

2016-09-07 Thread ayan guha
So, can you try to simulate the same without sshfs? ie, create a folder on
/tmp/datashare and copy your files on all the machines and point
sc.textFiles to that folder?


On Thu, Sep 8, 2016 at 11:26 AM, Peter Figliozzi 
wrote:

> All (three) of them.  It's kind of cool-- when I re-run collect() a different
> executor will show up as first to encounter the error.
>
> On Wed, Sep 7, 2016 at 8:20 PM, ayan guha  wrote:
>
>> Hi
>>
>> Is it happening on all executors or one?
>>
>> On Thu, Sep 8, 2016 at 10:46 AM, Peter Figliozzi <
>> pete.figlio...@gmail.com> wrote:
>>
>>>
>>> Yes indeed (see below).  Just to reiterate, I am not running Hadoop.
>>> The "curly" node name mentioned in the stacktrace is the name of one of the
>>> worker nodes.  I've mounted the same directory "datashare" with two text
>>> files to all worker nodes with sshfs.  The Spark documentation suggests
>>> that this should work:
>>>
>>> *If using a path on the local filesystem, the file must also be
>>> accessible at the same path on worker nodes. Either copy the file to all
>>> workers or use a network-mounted shared file system.*
>>>
>>> I was hoping someone else could try this and see if it works.
>>>
>>> Here's what I did to generate the error:
>>>
>>> val data = sc.textFile("file:///home/peter/datashare/*.txt")
>>> data.collect()
>>>
>>> It's working to some extent because if I put a bogus path in, I'll get a
>>> different (correct) error (InvalidInputException: Input Pattern
>>> file:/home/peter/ddatashare/*.txt matches 0 files).
>>>
>>> Here's the stack trace when I use a valid path:
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 1 in stage 18.0 failed 4 times, most recent failure: Lost task 1.3 in stage
>>> 18.0 (TID 792, curly): java.io.FileNotFoundException: File
>>> file:/home/peter/datashare/f1.txt does not exist
>>> at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileSta
>>> tus(RawLocalFileSystem.java:609)
>>> at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInt
>>> ernal(RawLocalFileSystem.java:822)
>>> at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLoc
>>> alFileSystem.java:599)
>>> at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFi
>>> leSystem.java:421)
>>> at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputCheck
>>> er.(ChecksumFileSystem.java:140)
>>> at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSys
>>> tem.java:341)
>>> at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:767)
>>> at org.apache.hadoop.mapred.LineRecordReader.(LineRecordR
>>> eader.java:109)
>>> at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(Tex
>>> tInputFormat.java:67)
>>> at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:246)
>>> at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:209)
>>> at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR
>>> DD.scala:38)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:85)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>>> Executor.java:1142)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>>> lExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>> On Wed, Sep 7, 2016 at 9:50 AM, Yong Zhang  wrote:
>>>
 What error do you get? FileNotFoundException?


 Please paste the stacktrace here.


 Yong


 --
 *From:* Peter Figliozzi 
 *Sent:* Wednesday, September 7, 2016 10:18 AM
 *To:* ayan guha
 *Cc:* Lydia Ickler; user.spark
 *Subject:* Re: distribute work (files)

 That's failing for me.  Can someone please try this-- is this even
 supposed to work:

- create a directory somewhere and add two text files to it
- mount that directory on the Spark worker machines with sshfs
- read the text files into one data structure using a file URL with
a wildcard

 Thanks,

 Pete

 On Tue, Sep 6, 2016 at 11:20 PM, ayan guha  wrote:

> To access local file, try with file:// URI.
>
> On Wed, Sep 7, 2016 at 8:52 AM, Peter Figliozzi <
> pete.figlio...@gmail.com> wrote:
>
>> This is a great question.  Basically you don't have to worry about
>> the details-- just give a wildcard in your call to textFile.  See
>> the Programming Guide
>> 

Re: No SparkR on Mesos?

2016-09-07 Thread Rodrick Brown
We've been using SparkR on Mesos for quite sometime with no issues.


[fedora@prod-rstudio-1 ~]$ /opt/spark-1.6.1/bin/sparkR

R version 3.3.0 (2016-05-03) -- "Supposedly Educational"
Copyright (C) 2016 The R Foundation for Statistical Computing
Platform: x86_64-redhat-linux-gnu (64-bit)

R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.

  Natural language support but running in an English locale

R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.

Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.

Launching java with spark-submit command /opt/spark-1.6.1/bin/spark-submit
  "sparkr-shell" /tmp/Rtmphk5zxe/backend_port11f8414240b65
16/09/08 01:44:04 INFO SparkContext: Running Spark version 1.6.1
16/09/08 01:44:04 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
16/09/08 01:44:05 INFO SecurityManager: Changing view acls to: fedora
16/09/08 01:44:05 INFO SecurityManager: Changing modify acls to: fedora
16/09/08 01:44:05 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(fedora); users
with modify permissions: Set(fedora)
16/09/08 01:44:05 INFO Utils: Successfully started service 'sparkDriver' on
port 39193.
16/09/08 01:44:05 INFO Slf4jLogger: Slf4jLogger started
16/09/08 01:44:05 INFO Remoting: Starting remoting
16/09/08 01:44:05 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriverActorSystem@172.1.34.13:44212]
16/09/08 01:44:05 INFO Utils: Successfully started service
'sparkDriverActorSystem' on port 44212.
16/09/08 01:44:05 INFO SparkEnv: Registering MapOutputTracker
16/09/08 01:44:05 INFO SparkEnv: Registering BlockManagerMaster
16/09/08 01:44:05 INFO DiskBlockManager: Created local directory at
/home/fedora/spark-tmp-73604/blockmgr-2928edf7-635e-45ca-83ed-8dc1de50b141
16/09/08 01:44:05 INFO MemoryStore: MemoryStore started with capacity 3.4 GB
16/09/08 01:44:05 INFO SparkEnv: Registering OutputCommitCoordinator
16/09/08 01:44:05 INFO Utils: Successfully started service 'SparkUI' on
port 4040.
16/09/08 01:44:05 INFO SparkUI: Started SparkUI at http://172.1.34.13:4040
16/09/08 01:44:06 INFO Executor: Starting executor ID driver on host
localhost
16/09/08 01:44:06 INFO Utils: Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port 45678.
16/09/08 01:44:06 INFO NettyBlockTransferService: Server created on 45678
16/09/08 01:44:06 INFO BlockManager: external shuffle service port = 31338
16/09/08 01:44:06 INFO BlockManagerMaster: Trying to register BlockManager
16/09/08 01:44:06 INFO BlockManagerMasterEndpoint: Registering block
manager localhost:45678 with 3.4 GB RAM, BlockManagerId(driver, localhost,
45678)
16/09/08 01:44:06 INFO BlockManagerMaster: Registered BlockManager

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version  1.6.1
      /_/


 Spark context is available as sc, SQL context is available as sqlContext
>



On Wed, Sep 7, 2016 at 8:02 AM, Peter Griessl  wrote:

> Hello,
>
>
>
> does SparkR really not work (yet?) on Mesos (Spark 2.0 on Mesos 1.0)?
>
>
>
> $ /opt/spark/bin/sparkR
>
>
>
> R version 3.3.1 (2016-06-21) -- "Bug in Your Hair"
>
> Copyright (C) 2016 The R Foundation for Statistical Computing
>
> Platform: x86_64-pc-linux-gnu (64-bit)
>
> Launching java with spark-submit command /opt/spark/bin/spark-submit
> "sparkr-shell" /tmp/RtmpPYVJxF/backend_port338581f434
>
> Error: *SparkR is not supported for Mesos cluster*.
>
> Error in sparkR.sparkContext(master, appName, sparkHome, sparkConfigMap,  :
>
>   JVM is not ready after 10 seconds
>
>
>
>
>
> I couldn’t find any information on this subject in the docs – am I missing
> something?
>
>
>
> Thanks for any hints,
>
> Peter
>



-- 

[image: Orchard Platform] 

*Rodrick Brown */ *DevOPs*

9174456839 / rodr...@orchardplatform.com

Orchard Platform
101 5th Avenue, 4th Floor, New York, NY

-- 
*NOTICE TO RECIPIENTS*: This communication is confidential and intended for 
the use of the addressee only. If you are not an intended recipient of this 
communication, please delete it immediately and notify the sender by return 
email. Unauthorized reading, dissemination, distribution or copying of this 
communication is prohibited. This communication does not constitute an 
offer to sell or a solicitation of an indication of interest to purchase 
any loan, security or any other financial product or instrument, nor is it 
an offer to sell or a solicitation of an indication of interest to purchase 
any products 

Re: distribute work (files)

2016-09-07 Thread Peter Figliozzi
All (three) of them.  It's kind of cool-- when I re-run collect() a different
executor will show up as first to encounter the error.

On Wed, Sep 7, 2016 at 8:20 PM, ayan guha  wrote:

> Hi
>
> Is it happening on all executors or one?
>
> On Thu, Sep 8, 2016 at 10:46 AM, Peter Figliozzi  > wrote:
>
>>
>> Yes indeed (see below).  Just to reiterate, I am not running Hadoop.  The
>> "curly" node name mentioned in the stacktrace is the name of one of the
>> worker nodes.  I've mounted the same directory "datashare" with two text
>> files to all worker nodes with sshfs.  The Spark documentation suggests
>> that this should work:
>>
>> *If using a path on the local filesystem, the file must also be
>> accessible at the same path on worker nodes. Either copy the file to all
>> workers or use a network-mounted shared file system.*
>>
>> I was hoping someone else could try this and see if it works.
>>
>> Here's what I did to generate the error:
>>
>> val data = sc.textFile("file:///home/peter/datashare/*.txt")
>> data.collect()
>>
>> It's working to some extent because if I put a bogus path in, I'll get a
>> different (correct) error (InvalidInputException: Input Pattern
>> file:/home/peter/ddatashare/*.txt matches 0 files).
>>
>> Here's the stack trace when I use a valid path:
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 1 in stage 18.0 failed 4 times, most recent failure: Lost task 1.3 in stage
>> 18.0 (TID 792, curly): java.io.FileNotFoundException: File
>> file:/home/peter/datashare/f1.txt does not exist
>> at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileSta
>> tus(RawLocalFileSystem.java:609)
>> at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInt
>> ernal(RawLocalFileSystem.java:822)
>> at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLoc
>> alFileSystem.java:599)
>> at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFi
>> leSystem.java:421)
>> at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputCheck
>> er.(ChecksumFileSystem.java:140)
>> at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSys
>> tem.java:341)
>> at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:767)
>> at org.apache.hadoop.mapred.LineRecordReader.(LineRecordR
>> eader.java:109)
>> at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(Tex
>> tInputFormat.java:67)
>> at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:246)
>> at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:209)
>> at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR
>> DD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>> at org.apache.spark.scheduler.Task.run(Task.scala:85)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>> Executor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>> lExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>> On Wed, Sep 7, 2016 at 9:50 AM, Yong Zhang  wrote:
>>
>>> What error do you get? FileNotFoundException?
>>>
>>>
>>> Please paste the stacktrace here.
>>>
>>>
>>> Yong
>>>
>>>
>>> --
>>> *From:* Peter Figliozzi 
>>> *Sent:* Wednesday, September 7, 2016 10:18 AM
>>> *To:* ayan guha
>>> *Cc:* Lydia Ickler; user.spark
>>> *Subject:* Re: distribute work (files)
>>>
>>> That's failing for me.  Can someone please try this-- is this even
>>> supposed to work:
>>>
>>>- create a directory somewhere and add two text files to it
>>>- mount that directory on the Spark worker machines with sshfs
>>>- read the text files into one data structure using a file URL with
>>>a wildcard
>>>
>>> Thanks,
>>>
>>> Pete
>>>
>>> On Tue, Sep 6, 2016 at 11:20 PM, ayan guha  wrote:
>>>
 To access local file, try with file:// URI.

 On Wed, Sep 7, 2016 at 8:52 AM, Peter Figliozzi <
 pete.figlio...@gmail.com> wrote:

> This is a great question.  Basically you don't have to worry about the
> details-- just give a wildcard in your call to textFile.  See the 
> Programming
> Guide  section
> entitled "External Datasets".  The Spark framework will distribute your
> data across the workers.  Note that:
>
> *If using a path on the local filesystem, the file must also be
>> accessible at the same path on worker nodes. Either copy the file to all
>> workers or use a network-mounted shared 

Re: distribute work (files)

2016-09-07 Thread ayan guha
Hi

Is it happening on all executors or one?

On Thu, Sep 8, 2016 at 10:46 AM, Peter Figliozzi 
wrote:

>
> Yes indeed (see below).  Just to reiterate, I am not running Hadoop.  The
> "curly" node name mentioned in the stacktrace is the name of one of the
> worker nodes.  I've mounted the same directory "datashare" with two text
> files to all worker nodes with sshfs.  The Spark documentation suggests
> that this should work:
>
> *If using a path on the local filesystem, the file must also be accessible
> at the same path on worker nodes. Either copy the file to all workers or
> use a network-mounted shared file system.*
>
> I was hoping someone else could try this and see if it works.
>
> Here's what I did to generate the error:
>
> val data = sc.textFile("file:///home/peter/datashare/*.txt")
> data.collect()
>
> It's working to some extent because if I put a bogus path in, I'll get a
> different (correct) error (InvalidInputException: Input Pattern
> file:/home/peter/ddatashare/*.txt matches 0 files).
>
> Here's the stack trace when I use a valid path:
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
> in stage 18.0 failed 4 times, most recent failure: Lost task 1.3 in stage
> 18.0 (TID 792, curly): java.io.FileNotFoundException: File
> file:/home/peter/datashare/f1.txt does not exist
> at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileSta
> tus(RawLocalFileSystem.java:609)
> at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInt
> ernal(RawLocalFileSystem.java:822)
> at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLoc
> alFileSystem.java:599)
> at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFi
> leSystem.java:421)
> at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputCheck
> er.(ChecksumFileSystem.java:140)
> at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSys
> tem.java:341)
> at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:767)
> at org.apache.hadoop.mapred.LineRecordReader.(LineRecordR
> eader.java:109)
> at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(Tex
> tInputFormat.java:67)
> at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:246)
> at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:209)
> at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR
> DD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
> Executor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
> lExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> On Wed, Sep 7, 2016 at 9:50 AM, Yong Zhang  wrote:
>
>> What error do you get? FileNotFoundException?
>>
>>
>> Please paste the stacktrace here.
>>
>>
>> Yong
>>
>>
>> --
>> *From:* Peter Figliozzi 
>> *Sent:* Wednesday, September 7, 2016 10:18 AM
>> *To:* ayan guha
>> *Cc:* Lydia Ickler; user.spark
>> *Subject:* Re: distribute work (files)
>>
>> That's failing for me.  Can someone please try this-- is this even
>> supposed to work:
>>
>>- create a directory somewhere and add two text files to it
>>- mount that directory on the Spark worker machines with sshfs
>>- read the text files into one data structure using a file URL with a
>>wildcard
>>
>> Thanks,
>>
>> Pete
>>
>> On Tue, Sep 6, 2016 at 11:20 PM, ayan guha  wrote:
>>
>>> To access local file, try with file:// URI.
>>>
>>> On Wed, Sep 7, 2016 at 8:52 AM, Peter Figliozzi <
>>> pete.figlio...@gmail.com> wrote:
>>>
 This is a great question.  Basically you don't have to worry about the
 details-- just give a wildcard in your call to textFile.  See the 
 Programming
 Guide  section
 entitled "External Datasets".  The Spark framework will distribute your
 data across the workers.  Note that:

 *If using a path on the local filesystem, the file must also be
> accessible at the same path on worker nodes. Either copy the file to all
> workers or use a network-mounted shared file system.*


 In your case this would mean the directory of files.

 Curiously, I cannot get this to work when I mount a directory with
 sshfs on all of my worker nodes.  It says "file not found" even though
 the file clearly exists in the specified path on all workers.   Anyone care
 to try 

Fwd: distribute work (files)

2016-09-07 Thread Peter Figliozzi
Yes indeed (see below).  Just to reiterate, I am not running Hadoop.  The
"curly" node name mentioned in the stacktrace is the name of one of the
worker nodes.  I've mounted the same directory "datashare" with two text
files to all worker nodes with sshfs.  The Spark documentation suggests
that this should work:

*If using a path on the local filesystem, the file must also be accessible
at the same path on worker nodes. Either copy the file to all workers or
use a network-mounted shared file system.*

I was hoping someone else could try this and see if it works.

Here's what I did to generate the error:

val data = sc.textFile("file:///home/peter/datashare/*.txt")
data.collect()

It's working to some extent because if I put a bogus path in, I'll get a
different (correct) error (InvalidInputException: Input Pattern
file:/home/peter/ddatashare/*.txt matches 0 files).

Here's the stack trace when I use a valid path:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
in stage 18.0 failed 4 times, most recent failure: Lost task 1.3 in stage
18.0 (TID 792, curly): java.io.FileNotFoundException: File
file:/home/peter/datashare/f1.txt does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(
RawLocalFileSystem.java:609)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(
RawLocalFileSystem.java:822)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(
RawLocalFileSystem.java:599)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(
FilterFileSystem.java:421)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:140)
at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:341)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:767)
at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:109)
at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:246)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:209)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(
ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(
ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


On Wed, Sep 7, 2016 at 9:50 AM, Yong Zhang  wrote:

> What error do you get? FileNotFoundException?
>
>
> Please paste the stacktrace here.
>
>
> Yong
>
>
> --
> *From:* Peter Figliozzi 
> *Sent:* Wednesday, September 7, 2016 10:18 AM
> *To:* ayan guha
> *Cc:* Lydia Ickler; user.spark
> *Subject:* Re: distribute work (files)
>
> That's failing for me.  Can someone please try this-- is this even
> supposed to work:
>
>- create a directory somewhere and add two text files to it
>- mount that directory on the Spark worker machines with sshfs
>- read the text files into one data structure using a file URL with a
>wildcard
>
> Thanks,
>
> Pete
>
> On Tue, Sep 6, 2016 at 11:20 PM, ayan guha  wrote:
>
>> To access local file, try with file:// URI.
>>
>> On Wed, Sep 7, 2016 at 8:52 AM, Peter Figliozzi > > wrote:
>>
>>> This is a great question.  Basically you don't have to worry about the
>>> details-- just give a wildcard in your call to textFile.  See the 
>>> Programming
>>> Guide  section
>>> entitled "External Datasets".  The Spark framework will distribute your
>>> data across the workers.  Note that:
>>>
>>> *If using a path on the local filesystem, the file must also be
 accessible at the same path on worker nodes. Either copy the file to all
 workers or use a network-mounted shared file system.*
>>>
>>>
>>> In your case this would mean the directory of files.
>>>
>>> Curiously, I cannot get this to work when I mount a directory with sshfs
>>> on all of my worker nodes.  It says "file not found" even though the file
>>> clearly exists in the specified path on all workers.   Anyone care to try
>>> and comment on this?
>>>
>>> Thanks,
>>>
>>> Pete
>>>
>>> On Tue, Sep 6, 2016 at 9:51 AM, Lydia Ickler 
>>> wrote:
>>>
 Hi,

 maybe this is a stupid question:

 I have a list of files. Each file I want to take as an input for a
 ML-algorithm. All files are 

year out of range

2016-09-07 Thread Daniel Lopes
Hi,

I'm importing a few CSVs with the spark-csv package.
Each one looks OK when I run a select against it on its own,
but when I join them with sqlContext.sql I get this error.

All the tables have timestamp fields.

The joins are not on these date columns.


*Py4JJavaError: An error occurred while calling o643.showString.*
: org.apache.spark.SparkException: Job aborted due to stage failure: Task
54 in stage 92.0 failed 10 times, most recent failure: Lost task 54.9 in
stage 92.0 (TID 6356, yp-spark-dal09-env5-0036):
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
  File
"/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/worker.py",
line 111, in main
process()
  File
"/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/worker.py",
line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File
"/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/serializers.py",
line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
  File
"/usr/local/src/spark160master/spark/python/pyspark/sql/functions.py", line
1563, in <lambda>
func = lambda _, it: map(lambda x: returnType.toInternal(f(*x)), it)
  File
"/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/sql/types.py",
line 191, in toInternal
else time.mktime(dt.timetuple()))
*ValueError: year out of range  *

Does anyone know about this problem?

Best,

*Daniel Lopes*
Chief Data and Analytics Officer | OneMatch
c: +55 (18) 99764-2733 | https://www.linkedin.com/in/dslopes

www.onematch.com.br



collect_set without nulls (1.6 vs 2.0)

2016-09-07 Thread Lee Becker
Hello everyone,

Consider this toy example:

import org.apache.spark.sql.functions.collect_set
import sparkSession.implicits._

case class Foo(x: String, y: String)
val df = sparkSession.createDataFrame(Seq(Foo(null, "1"), Foo("a", "2"), Foo("b", "3")))
df.select(collect_set($"x")).show

In Spark 2.0.0 I get the following results:

+--------------+
|collect_set(x)|
+--------------+
|  [null, b, a]|
+--------------+

In 1.6.* the same collect_set produces:

+--------------+
|collect_set(x)|
+--------------+
|        [b, a]|
+--------------+

Is there any way to get this aggregation to ignore nulls?  I understand the
trivial way would be to filter on x beforehand, but in my actual use case
I'm calling the collect_set in withColumn over a window specification, so I
want empty arrays on rows with nulls.

For now I'm using this hack of a workaround:

val removenulls = udf((l: scala.collection.mutable.WrappedArray[String]) =>
  l.filter(x => x != null))
df.select(removenulls(collect_set($"x"))).show
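
A sketch of how that workaround might compose with a window specification (the
partitioning and ordering columns are assumptions, not from the original
message):

import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"group_id").orderBy($"ts")
val result = df.withColumn("xs", removenulls(collect_set($"x").over(w)))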


Any suggestions are appreciated.

Thanks,
Lee


Re: dstream.foreachRDD iteration

2016-09-07 Thread Mich Talebzadeh
I am sure a few Spark gurus can explain this much better than me, but here we
go.

A DStream is an abstraction that breaks a continuous stream of data into
small chunks. This is called "micro-batching", and Spark Streaming is all
about micro-batching.

You have a batch interval, a window length and a sliding interval. The data
received during each batch interval is passed to Spark for further processing
in the form of an RDD, so there is only one RDD produced per DStream at each
batch interval. That is exactly what the doc says.

An RDD is a set of pointers to where the actual data is in a cluster; an RDD
is not the data itself but a construct that describes it.
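
To make the batch-interval point concrete, a minimal sketch (the two-second
interval is illustrative):

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(2))  // each 2-second batch of received data becomes one RDD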

Now DStream.foreachRDD is basically an output operator. It allows you to
access the underlying RDDs of the DStream to execute actions that do
something practical with the data. For example, work out the incoming
pricing, choose a security and recommend that as a good buy etc. Then of
course you can push that particular data to HDFS if you wished.

This RDD may have many rows of pricing for many securities (IBM, Microsoft,
Oracle, JPM etc.), so at this stage it is all in one RDD with many elements.

In our example the DStream.foreachRDD gives an RDD [Security, Prices], not
a single Security and its price. To access single elements of the
collection, we need to further operate on the RDD. So this becomes:



 dstream.foreachRDD { pricesRDD =>             // Loop over each RDD
   val x = pricesRDD.count
   if (x > 0)                                   // RDD has data
   {
     for (line <- pricesRDD.collect.toArray)    // Look at each record in the RDD
     {
       var index     = line._2.split(',').view(0).toInt     // The index
       var timestamp = line._2.split(',').view(1).toString  // The timestamp from source
       var security  = line._2.split(',').view(2).toString  // The name of the security
       var price     = line._2.split(',').view(3).toFloat   // The price of the security
       if (price > 90.0)
       {
         // Do something here
         // Send a notification, write to HDFS etc.
       }
     }
   }
 }


I trust that this makes it clearer.

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 7 September 2016 at 18:13, Ashok Kumar 
wrote:

> I have checked that doc sir.
>
> My understanding is that every batch interval of data always generates one RDD, so
> why do we need to use foreachRDD when there is only one?
>
> Sorry for this question but bit confusing me.
>
> Thanks
>
>
>
>
> On Wednesday, 7 September 2016, 18:05, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>
> Hi,
>
> What is so confusing about RDD. Have you checked this doc?
>
> http://spark.apache.org/docs/latest/streaming-programming-
> guide.html#design-patterns-for-using-foreachrdd
>
> HTH
>
> Dr Mich Talebzadeh
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
> http://talebzadehmich.wordpress.com
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
> On 7 September 2016 at 11:39, Ashok Kumar 
> wrote:
>
> Hi,
>
> A bit confusing to me
>
> How many layers are involved in DStream.foreachRDD?
>
> Do I need to loop over it more than once? I mean  DStream.foreachRDD { rdd
> => ... }
>
> I am trying to get individual lines in the RDD.
>
> Thanks
>
>
>
>
>


Re: No SparkR on Mesos?

2016-09-07 Thread Timothy Chen
Python should be supported, as I tested it; the patches should already be merged
in 1.6.2.

Tim

> On Sep 8, 2016, at 1:20 AM, Michael Gummelt  wrote:
> 
> Quite possibly.  I've never used it.  I know Python was "unsupported" for a 
> while, which turned out to mean there was a silly conditional that would fail 
> the submission, even though all the support was there.  Could be the same for 
> R.  Can you submit a JIRA?
> 
>> On Wed, Sep 7, 2016 at 5:02 AM, Peter Griessl  wrote:
>> Hello,
>> 
>>  
>> 
>> does SparkR really not work (yet?) on Mesos (Spark 2.0 on Mesos 1.0)?
>> 
>>  
>> 
>> $ /opt/spark/bin/sparkR
>> 
>>  
>> 
>> R version 3.3.1 (2016-06-21) -- "Bug in Your Hair"
>> 
>> Copyright (C) 2016 The R Foundation for Statistical Computing
>> 
>> Platform: x86_64-pc-linux-gnu (64-bit)
>> 
>> Launching java with spark-submit command /opt/spark/bin/spark-submit   
>> "sparkr-shell" /tmp/RtmpPYVJxF/backend_port338581f434
>> 
>> Error: SparkR is not supported for Mesos cluster.
>> 
>> Error in sparkR.sparkContext(master, appName, sparkHome, sparkConfigMap,  :
>> 
>>   JVM is not ready after 10 seconds
>> 
>>  
>> 
>>  
>> 
>> I couldn’t find any information on this subject in the docs – am I missing 
>> something?
>> 
>>  
>> 
>> Thanks for any hints,
>> 
>> Peter
>> 
> 
> 
> 
> -- 
> Michael Gummelt
> Software Engineer
> Mesosphere


Re: Spark 2.0 with Kafka 0.10 exception

2016-09-07 Thread Cody Koeninger
It's a really noticeable overhead; without the cache you're basically
pulling every message twice due to prefetching.

On Wed, Sep 7, 2016 at 3:23 PM, Srikanth  wrote:
> Yea, disabling cache was not going to be my permanent solution either.
> I was going to ask how big an overhead is that?
>
> It happens intermittently and each time it happens retry is successful.
>
> Srikanth
>
> On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger  wrote:
>>
>> That's not what I would have expected to happen with a lower cache
>> setting, but in general disabling the cache isn't something you want
>> to do with the new kafka consumer.
>>
>>
>> As far as the original issue, are you seeing those polling errors
>> intermittently, or consistently?  From your description, it sounds
>> like retry is working correctly.
>>
>>
>> On Wed, Sep 7, 2016 at 2:37 PM, Srikanth  wrote:
>> > Setting those two results in below exception.
>> > No.of executors < no.of partitions. Could that be triggering this?
>> >
>> > 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0
>> > (TID 9)
>> > java.util.ConcurrentModificationException: KafkaConsumer is not safe for
>> > multi-threaded access
>> > at
>> >
>> > org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1430)
>> > at
>> >
>> > org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360)
>> > at
>> >
>> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
>> > at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
>> > at java.util.HashMap.putVal(Unknown Source)
>> > at java.util.HashMap.put(Unknown Source)
>> > at
>> >
>> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:158)
>> > at
>> >
>> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.(KafkaRDD.scala:210)
>> > at
>> > org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
>> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> > at
>> > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> > at
>> > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> > at
>> > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> > at
>> >
>> > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>> > at
>> >
>> > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>> > at org.apache.spark.scheduler.Task.run(Task.scala:85)
>> > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>> > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>> > at java.lang.Thread.run(Unknown Source)
>> >
>> >
>> > On Wed, Sep 7, 2016 at 3:07 PM, Cody Koeninger 
>> > wrote:
>> >>
>> >> you could try setting
>> >>
>> >> spark.streaming.kafka.consumer.cache.initialCapacity
>> >>
>> >> spark.streaming.kafka.consumer.cache.maxCapacity
>> >>
>> >> to 1
>> >>
>> >> On Wed, Sep 7, 2016 at 2:02 PM, Srikanth  wrote:
>> >> > I had a look at the executor logs and noticed that this exception
>> >> > happens
>> >> > only when using the cached consumer.
>> >> > Every retry is successful. This is consistent.
>> >> > One possibility is that the cached consumer is causing the failure as
>> >> > retry
>> >> > clears it.
>> >> > Is there a way to disable cache and test this?
>> >> > Again, kafkacat is running fine on the same node.
>> >> >
>> >> > 16/09/07 16:00:00 INFO Executor: Running task 1.0 in stage 138.0 (TID
>> >> > 7849)
>> >> > 16/09/07 16:00:00 INFO Executor: Running task 3.0 in stage 138.0 (TID
>> >> > 7851
>> >> >
>> >> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition
>> >> > 2
>> >> > offsets 57079162 -> 57090330
>> >> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition
>> >> > 0
>> >> > offsets 57098866 -> 57109957
>> >> > 16/09/07 16:00:00 INFO Executor: Finished task 3.0 in stage 138.0
>> >> > (TID
>> >> > 7851). 1030 bytes result sent to driver
>> >> > 16/09/07 16:00:02 ERROR Executor: Exception in task 1.0 in stage
>> >> > 138.0
>> >> > (TID
>> >> > 7849)
>> >> > java.lang.AssertionError: assertion failed: Failed to get records for
>> >> > spark-executor-StreamingPixelCount1 mt_event 0 57100069 after polling
>> >> > for
>> >> > 2048
>> >> >   at 

Re: Spark 2.0 with Kafka 0.10 exception

2016-09-07 Thread Srikanth
Yea, disabling cache was not going to be my permanent solution either.
I was going to ask how big an overhead is that?

It happens intermittently and each time it happens retry is successful.

Srikanth

On Wed, Sep 7, 2016 at 3:55 PM, Cody Koeninger  wrote:

> That's not what I would have expected to happen with a lower cache
> setting, but in general disabling the cache isn't something you want
> to do with the new kafka consumer.
>
>
> As far as the original issue, are you seeing those polling errors
> intermittently, or consistently?  From your description, it sounds
> like retry is working correctly.
>
>
> On Wed, Sep 7, 2016 at 2:37 PM, Srikanth  wrote:
> > Setting those two results in below exception.
> > No.of executors < no.of partitions. Could that be triggering this?
> >
> > 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0
> (TID 9)
> > java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> > multi-threaded access
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.
> acquire(KafkaConsumer.java:1430)
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.close(
> KafkaConsumer.java:1360)
> > at
> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$
> anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
> > at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
> > at java.util.HashMap.putVal(Unknown Source)
> > at java.util.HashMap.put(Unknown Source)
> > at
> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.
> get(CachedKafkaConsumer.scala:158)
> > at
> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.(
> KafkaRDD.scala:210)
> > at org.apache.spark.streaming.kafka010.KafkaRDD.compute(
> KafkaRDD.scala:185)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> > at
> > org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:79)
> > at
> > org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:47)
> > at org.apache.spark.scheduler.Task.run(Task.scala:85)
> > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> > at java.lang.Thread.run(Unknown Source)
> >
> >
> > On Wed, Sep 7, 2016 at 3:07 PM, Cody Koeninger 
> wrote:
> >>
> >> you could try setting
> >>
> >> spark.streaming.kafka.consumer.cache.initialCapacity
> >>
> >> spark.streaming.kafka.consumer.cache.maxCapacity
> >>
> >> to 1
> >>
> >> On Wed, Sep 7, 2016 at 2:02 PM, Srikanth  wrote:
> >> > I had a look at the executor logs and noticed that this exception
> >> > happens
> >> > only when using the cached consumer.
> >> > Every retry is successful. This is consistent.
> >> > One possibility is that the cached consumer is causing the failure as
> >> > retry
> >> > clears it.
> >> > Is there a way to disable cache and test this?
> >> > Again, kafkacat is running fine on the same node.
> >> >
> >> > 16/09/07 16:00:00 INFO Executor: Running task 1.0 in stage 138.0 (TID
> >> > 7849)
> >> > 16/09/07 16:00:00 INFO Executor: Running task 3.0 in stage 138.0 (TID
> >> > 7851
> >> >
> >> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 2
> >> > offsets 57079162 -> 57090330
> >> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 0
> >> > offsets 57098866 -> 57109957
> >> > 16/09/07 16:00:00 INFO Executor: Finished task 3.0 in stage 138.0 (TID
> >> > 7851). 1030 bytes result sent to driver
> >> > 16/09/07 16:00:02 ERROR Executor: Exception in task 1.0 in stage 138.0
> >> > (TID
> >> > 7849)
> >> > java.lang.AssertionError: assertion failed: Failed to get records for
> >> > spark-executor-StreamingPixelCount1 mt_event 0 57100069 after polling
> >> > for
> >> > 2048
> >> >   at scala.Predef$.assert(Predef.scala:170)
> >> >   at
> >> >
> >> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.
> get(CachedKafkaConsumer.scala:74)
> >> >   at
> >> >
> >> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(
> KafkaRDD.scala:227)
> >> >   at
> >> >
> >> > 

Re: Spark 2.0 with Kafka 0.10 exception

2016-09-07 Thread Cody Koeninger
That's not what I would have expected to happen with a lower cache
setting, but in general disabling the cache isn't something you want
to do with the new kafka consumer.


As far as the original issue, are you seeing those polling errors
intermittently, or consistently?  From your description, it sounds
like retry is working correctly.


On Wed, Sep 7, 2016 at 2:37 PM, Srikanth  wrote:
> Setting those two results in below exception.
> No.of executors < no.of partitions. Could that be triggering this?
>
> 16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 9)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> multi-threaded access
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1430)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360)
> at
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
> at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
> at java.util.HashMap.putVal(Unknown Source)
> at java.util.HashMap.put(Unknown Source)
> at
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:158)
> at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.(KafkaRDD.scala:210)
> at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> at java.lang.Thread.run(Unknown Source)
>
>
> On Wed, Sep 7, 2016 at 3:07 PM, Cody Koeninger  wrote:
>>
>> you could try setting
>>
>> spark.streaming.kafka.consumer.cache.initialCapacity
>>
>> spark.streaming.kafka.consumer.cache.maxCapacity
>>
>> to 1
>>
>> On Wed, Sep 7, 2016 at 2:02 PM, Srikanth  wrote:
>> > I had a look at the executor logs and noticed that this exception
>> > happens
>> > only when using the cached consumer.
>> > Every retry is successful. This is consistent.
>> > One possibility is that the cached consumer is causing the failure as
>> > retry
>> > clears it.
>> > Is there a way to disable cache and test this?
>> > Again, kafkacat is running fine on the same node.
>> >
>> > 16/09/07 16:00:00 INFO Executor: Running task 1.0 in stage 138.0 (TID
>> > 7849)
>> > 16/09/07 16:00:00 INFO Executor: Running task 3.0 in stage 138.0 (TID
>> > 7851
>> >
>> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 2
>> > offsets 57079162 -> 57090330
>> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 0
>> > offsets 57098866 -> 57109957
>> > 16/09/07 16:00:00 INFO Executor: Finished task 3.0 in stage 138.0 (TID
>> > 7851). 1030 bytes result sent to driver
>> > 16/09/07 16:00:02 ERROR Executor: Exception in task 1.0 in stage 138.0
>> > (TID
>> > 7849)
>> > java.lang.AssertionError: assertion failed: Failed to get records for
>> > spark-executor-StreamingPixelCount1 mt_event 0 57100069 after polling
>> > for
>> > 2048
>> >   at scala.Predef$.assert(Predef.scala:170)
>> >   at
>> >
>> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>> >   at
>> >
>> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>> >   at
>> >
>> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>> >
>> > 16/09/07 16:00:02 INFO CoarseGrainedExecutorBackend: Got assigned task
>> > 7854
>> > 16/09/07 16:00:02 INFO Executor: Running task 1.1 in stage 138.0 (TID
>> > 7854)
>> > 16/09/07 16:00:02 INFO KafkaRDD: Computing topic mt_event, partition 0
>> > offsets 57098866 -> 57109957
>> > 16/09/07 16:00:02 INFO CachedKafkaConsumer: Initial fetch for
>> > spark-executor-StreamingPixelCount1 mt_event 0 57098866
>> >
>> > 16/09/07 16:00:03 INFO Executor: Finished task 1.1 in stage 138.0 

Re: Spark 2.0 with Kafka 0.10 exception

2016-09-07 Thread Srikanth
Setting those two results in the exception below.
No. of executors < no. of partitions. Could that be triggering this?

16/09/07 15:33:13 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 9)
java.util.ConcurrentModificationException: KafkaConsumer is not safe for
multi-threaded access
at
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1430)
at
org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1360)
at
org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
at java.util.LinkedHashMap.afterNodeInsertion(Unknown Source)
at java.util.HashMap.putVal(Unknown Source)
at java.util.HashMap.put(Unknown Source)
at
org.apache.spark.streaming.kafka010.CachedKafkaConsumer$.get(CachedKafkaConsumer.scala:158)
at
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:210)
at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:185)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)


On Wed, Sep 7, 2016 at 3:07 PM, Cody Koeninger  wrote:

> you could try setting
>
> spark.streaming.kafka.consumer.cache.initialCapacity
>
> spark.streaming.kafka.consumer.cache.maxCapacity
>
> to 1
>
> On Wed, Sep 7, 2016 at 2:02 PM, Srikanth  wrote:
> > I had a look at the executor logs and noticed that this exception happens
> > only when using the cached consumer.
> > Every retry is successful. This is consistent.
> > One possibility is that the cached consumer is causing the failure as
> retry
> > clears it.
> > Is there a way to disable cache and test this?
> > Again, kafkacat is running fine on the same node.
> >
> > 16/09/07 16:00:00 INFO Executor: Running task 1.0 in stage 138.0 (TID
> 7849)
> > 16/09/07 16:00:00 INFO Executor: Running task 3.0 in stage 138.0 (TID
> 7851
> >
> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 2
> > offsets 57079162 -> 57090330
> > 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 0
> > offsets 57098866 -> 57109957
> > 16/09/07 16:00:00 INFO Executor: Finished task 3.0 in stage 138.0 (TID
> > 7851). 1030 bytes result sent to driver
> > 16/09/07 16:00:02 ERROR Executor: Exception in task 1.0 in stage 138.0
> (TID
> > 7849)
> > java.lang.AssertionError: assertion failed: Failed to get records for
> > spark-executor-StreamingPixelCount1 mt_event 0 57100069 after polling
> for
> > 2048
> >   at scala.Predef$.assert(Predef.scala:170)
> >   at
> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.
> get(CachedKafkaConsumer.scala:74)
> >   at
> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(
> KafkaRDD.scala:227)
> >   at
> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(
> KafkaRDD.scala:193)
> >
> > 16/09/07 16:00:02 INFO CoarseGrainedExecutorBackend: Got assigned task
> 7854
> > 16/09/07 16:00:02 INFO Executor: Running task 1.1 in stage 138.0 (TID
> 7854)
> > 16/09/07 16:00:02 INFO KafkaRDD: Computing topic mt_event, partition 0
> > offsets 57098866 -> 57109957
> > 16/09/07 16:00:02 INFO CachedKafkaConsumer: Initial fetch for
> > spark-executor-StreamingPixelCount1 mt_event 0 57098866
> >
> > 16/09/07 16:00:03 INFO Executor: Finished task 1.1 in stage 138.0 (TID
> > 7854). 1103 bytes result sent to driver
> >
> >
> >
> > On Wed, Aug 24, 2016 at 2:13 PM, Srikanth  wrote:
> >>
> >> Thanks Cody. Setting poll timeout helped.
> >> Our network is fine but brokers are not fully provisioned in test
> cluster.
> >> But there isn't enough load to max out on broker capacity.
> >> Curious that kafkacat running on the same node doesn't have any issues.
> >>
> >> Srikanth
> >>
> >> On Tue, Aug 23, 2016 at 9:52 PM, Cody Koeninger 
> >> wrote:
> >>>
> >>> You can set that poll timeout higher with
> 

Re: How to write data into CouchBase using Spark & Scala?

2016-09-07 Thread aka.fe2s
Most likely you are missing an import statement that enables some Scala
implicits. I haven't used this connector, but looks like you need "import
com.couchbase.spark._"
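
For what it's worth, an untested end-to-end sketch based on your snippet plus
that import (connector 1.2.x; double-check the import path against the version
you actually pull in):

import com.couchbase.client.java.document.{JsonArrayDocument, JsonDocument}
import com.couchbase.client.java.document.json.{JsonArray, JsonObject}
import com.couchbase.spark._            // brings saveToCouchbase() into scope on RDDs
import org.apache.spark.{SparkConf, SparkContext}

val cfg = new SparkConf()
  .setAppName("couchbaseQuickstart")
  .setMaster("local[*]")
  .set("com.couchbase.bucket.MyBucket", "pwd")

val sc = new SparkContext(cfg)

val doc1 = JsonDocument.create("doc1", JsonObject.create().put("some", "content"))
val doc2 = JsonArrayDocument.create("doc2", JsonArray.from("more", "content", "in", "here"))

sc.parallelize(Seq(doc1, doc2)).saveToCouchbase()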

--
Oleksiy Dyagilev

On Wed, Sep 7, 2016 at 9:42 AM, Devi P.V  wrote:

> I am newbie in CouchBase.I am trying to write data into CouchBase.My
> sample code is following,
>
> val cfg = new SparkConf()
>   .setAppName("couchbaseQuickstart")
>   .setMaster("local[*]")
>   .set("com.couchbase.bucket.MyBucket","pwd")
>
> val sc = new SparkContext(cfg)
> val doc1 = JsonDocument.create("doc1", JsonObject.create().put("some", 
> "content"))
> val doc2 = JsonArrayDocument.create("doc2", JsonArray.from("more", "content", 
> "in", "here"))
>
> val data = sc
>   .parallelize(Seq(doc1, doc2))
>
> But I can't access data.saveToCouchbase().
>
> I am using Spark 1.6.1 & Scala 2.11.8
>
> I gave following dependencies in built.sbt
>
> libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.6.1"
> libraryDependencies += "com.couchbase.client" % "spark-connector_2.11" % 
> "1.2.1"
>
>
> How can I write data into CouchBase using Spark & Scala?
>
>
>
>
>


Re: Spark 2.0 with Kafka 0.10 exception

2016-09-07 Thread Cody Koeninger
you could try setting

spark.streaming.kafka.consumer.cache.initialCapacity

spark.streaming.kafka.consumer.cache.maxCapacity

to 1
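
e.g. something like this on the driver's SparkConf before the StreamingContext
is created (property names exactly as above; only meant as a debugging aid to
minimise consumer caching, not a production setting):

val sparkConf = new SparkConf()
  .set("spark.streaming.kafka.consumer.cache.initialCapacity", "1")
  .set("spark.streaming.kafka.consumer.cache.maxCapacity", "1")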

On Wed, Sep 7, 2016 at 2:02 PM, Srikanth  wrote:
> I had a look at the executor logs and noticed that this exception happens
> only when using the cached consumer.
> Every retry is successful. This is consistent.
> One possibility is that the cached consumer is causing the failure as retry
> clears it.
> Is there a way to disable cache and test this?
> Again, kafkacat is running fine on the same node.
>
> 16/09/07 16:00:00 INFO Executor: Running task 1.0 in stage 138.0 (TID 7849)
> 16/09/07 16:00:00 INFO Executor: Running task 3.0 in stage 138.0 (TID 7851
>
> 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 2
> offsets 57079162 -> 57090330
> 16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 0
> offsets 57098866 -> 57109957
> 16/09/07 16:00:00 INFO Executor: Finished task 3.0 in stage 138.0 (TID
> 7851). 1030 bytes result sent to driver
> 16/09/07 16:00:02 ERROR Executor: Exception in task 1.0 in stage 138.0 (TID
> 7849)
> java.lang.AssertionError: assertion failed: Failed to get records for
> spark-executor-StreamingPixelCount1 mt_event 0 57100069 after polling for
> 2048
>   at scala.Predef$.assert(Predef.scala:170)
>   at
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>   at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>   at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>
> 16/09/07 16:00:02 INFO CoarseGrainedExecutorBackend: Got assigned task 7854
> 16/09/07 16:00:02 INFO Executor: Running task 1.1 in stage 138.0 (TID 7854)
> 16/09/07 16:00:02 INFO KafkaRDD: Computing topic mt_event, partition 0
> offsets 57098866 -> 57109957
> 16/09/07 16:00:02 INFO CachedKafkaConsumer: Initial fetch for
> spark-executor-StreamingPixelCount1 mt_event 0 57098866
>
> 16/09/07 16:00:03 INFO Executor: Finished task 1.1 in stage 138.0 (TID
> 7854). 1103 bytes result sent to driver
>
>
>
> On Wed, Aug 24, 2016 at 2:13 PM, Srikanth  wrote:
>>
>> Thanks Cody. Setting poll timeout helped.
>> Our network is fine but brokers are not fully provisioned in test cluster.
>> But there isn't enough load to max out on broker capacity.
>> Curious that kafkacat running on the same node doesn't have any issues.
>>
>> Srikanth
>>
>> On Tue, Aug 23, 2016 at 9:52 PM, Cody Koeninger 
>> wrote:
>>>
>>> You can set that poll timeout higher with
>>>
>>> spark.streaming.kafka.consumer.poll.ms
>>>
>>> but half a second is fairly generous.  I'd try to take a look at
>>> what's going on with your network or kafka broker during that time.
>>>
>>> On Tue, Aug 23, 2016 at 4:44 PM, Srikanth  wrote:
>>> > Hello,
>>> >
>>> > I'm getting the below exception when testing Spark 2.0 with Kafka 0.10.
>>> >
>>> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka version : 0.10.0.0
>>> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka commitId :
>>> >> b8642491e78c5a13
>>> >> 16/08/23 16:31:01 INFO CachedKafkaConsumer: Initial fetch for
>>> >> spark-executor-example mt_event 0 15782114
>>> >> 16/08/23 16:31:01 INFO AbstractCoordinator: Discovered coordinator
>>> >> 10.150.254.161:9233 (id: 2147483646 rack: null) for group
>>> >> spark-executor-example.
>>> >> 16/08/23 16:31:02 ERROR Executor: Exception in task 0.0 in stage 1.0
>>> >> (TID
>>> >> 6)
>>> >> java.lang.AssertionError: assertion failed: Failed to get records for
>>> >> spark-executor-example mt_event 0 15782114 after polling for 512
>>> >> at scala.Predef$.assert(Predef.scala:170)
>>> >> at
>>> >>
>>> >> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
>>> >> at
>>> >>
>>> >> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>>> >> at
>>> >>
>>> >> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>>> >> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>>> >
>>> >
>>> > I get this error intermittently. Sometimes a few batches are scheduled
>>> > and
>>> > run fine. Then I get this error.
>>> > kafkacat is able to fetch from this topic continuously.
>>> >
>>> > Full exception is here --
>>> > https://gist.github.com/SrikanthTati/c2e95c4ac689cd49aab817e24ec42767
>>> >
>>> > Srikanth
>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: LabeledPoint creation

2016-09-07 Thread aka.fe2s
It has 4 categories, but OneHotEncoder drops the last indexed category by
default, so each one is encoded in only 3 dimensions:
a = 1 0 0
b = 0 0 0
c = 0 1 0
d = 0 0 1
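
If you actually want four features (one per category), a minimal sketch is to
turn dropLast off on the encoder from your snippet:

// dropLast defaults to true, which is why one category maps to all zeros
val encoder = new OneHotEncoder()
  .setInputCol("categoryIndex")
  .setOutputCol("categoryVec")
  .setDropLast(false)   // 4 categories -> 4-dimensional vectors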

--
Oleksiy Dyagilev

On Wed, Sep 7, 2016 at 10:42 AM, Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi,
>
> Any help on above mail use case ?
>
> Regards,
> Rajesh
>
> On Tue, Sep 6, 2016 at 5:40 PM, Madabhattula Rajesh Kumar <
> mrajaf...@gmail.com> wrote:
>
>> Hi,
>>
>> I am new to Spark ML, trying to create a LabeledPoint from categorical
>> dataset(example code from spark). For this, I am using One-hot encoding
>>  feature. Below is my code
>>
>> val df = sparkSession.createDataFrame(Seq(
>>   (0, "a"),
>>   (1, "b"),
>>   (2, "c"),
>>   (3, "a"),
>>   (4, "a"),
>>   (5, "c"),
>>   (6, "d"))).toDF("id", "category")
>>
>> val indexer = new StringIndexer()
>>   .setInputCol("category")
>>   .setOutputCol("categoryIndex")
>>   .fit(df)
>>
>> val indexed = indexer.transform(df)
>>
>> indexed.select("category", "categoryIndex").show()
>>
>> val encoder = new OneHotEncoder()
>>   .setInputCol("categoryIndex")
>>   .setOutputCol("categoryVec")
>> val encoded = encoder.transform(indexed)
>>
>>  encoded.select("id", "category", "categoryVec").show()
>>
>> *Output :- *
>> +---+--------+-------------+
>> | id|category|  categoryVec|
>> +---+--------+-------------+
>> |  0|       a|(3,[0],[1.0])|
>> |  1|       b|    (3,[],[])|
>> |  2|       c|(3,[1],[1.0])|
>> |  3|       a|(3,[0],[1.0])|
>> |  4|       a|(3,[0],[1.0])|
>> |  5|       c|(3,[1],[1.0])|
>> |  6|       d|(3,[2],[1.0])|
>> +---+--------+-------------+
>>
>> *Creating LablePoint from encoded dataframe:-*
>>
>> val data = encoded.rdd.map { x =>
>>   {
>> val featureVector = Vectors.dense(x.getAs[org.apac
>> he.spark.ml.linalg.SparseVector]("categoryVec").toArray)
>> val label = x.getAs[java.lang.Integer]("id").toDouble
>> LabeledPoint(label, featureVector)
>>   }
>> }
>>
>> data.foreach { x => println(x) }
>>
>> *Output :-*
>>
>> (0.0,[1.0,0.0,0.0])
>> (1.0,[0.0,0.0,0.0])
>> (2.0,[0.0,1.0,0.0])
>> (3.0,[1.0,0.0,0.0])
>> (4.0,[1.0,0.0,0.0])
>> (5.0,[0.0,1.0,0.0])
>> (6.0,[0.0,0.0,1.0])
>>
>> I have a four categorical values like a, b, c, d. I am expecting 4
>> features in the above LablePoint but it has only 3 features.
>>
>> Please help me to creation of LablePoint from categorical features.
>>
>> Regards,
>> Rajesh
>>
>>
>>
>


Re: Spark 2.0 with Kafka 0.10 exception

2016-09-07 Thread Srikanth
I had a look at the executor logs and noticed that this exception happens
only when using the cached consumer.
Every retry is successful. This is consistent.
One possibility is that the cached consumer is causing the failure as retry
clears it.
Is there a way to disable cache and test this?
Again, kafkacat is running fine on the same node.

16/09/07 16:00:00 INFO Executor: Running task 1.0 in stage 138.0 (TID 7849)
16/09/07 16:00:00 INFO Executor: Running task 3.0 in stage 138.0 (TID 7851

16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 2
offsets 57079162 -> 57090330
16/09/07 16:00:00 INFO KafkaRDD: Computing topic mt_event, partition 0
offsets 57098866 -> 57109957
16/09/07 16:00:00 INFO Executor: Finished task 3.0 in stage 138.0 (TID
7851). 1030 bytes result sent to driver
16/09/07 16:00:02 ERROR Executor: Exception in task 1.0 in stage 138.0
(TID 7849)
java.lang.AssertionError: assertion failed: Failed to get records for
spark-executor-StreamingPixelCount1 mt_event 0 57100069 after polling
for 2048
at scala.Predef$.assert(Predef.scala:170)
at 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)

16/09/07 16:00:02 INFO CoarseGrainedExecutorBackend: Got assigned task 7854
16/09/07 16:00:02 INFO Executor: Running task 1.1 in stage 138.0 (TID 7854)
16/09/07 16:00:02 INFO KafkaRDD: Computing topic mt_event, partition 0
offsets 57098866 -> 57109957
16/09/07 16:00:02 INFO CachedKafkaConsumer: Initial fetch for
spark-executor-StreamingPixelCount1 mt_event 0 57098866

16/09/07 16:00:03 INFO Executor: Finished task 1.1 in stage 138.0 (TID
7854). 1103 bytes result sent to driver



On Wed, Aug 24, 2016 at 2:13 PM, Srikanth  wrote:

> Thanks Cody. Setting poll timeout helped.
> Our network is fine but brokers are not fully provisioned in test cluster.
> But there isn't enough load to max out on broker capacity.
> Curious that kafkacat running on the same node doesn't have any issues.
>
> Srikanth
>
> On Tue, Aug 23, 2016 at 9:52 PM, Cody Koeninger 
> wrote:
>
>> You can set that poll timeout higher with
>>
>> spark.streaming.kafka.consumer.poll.ms
>>
>> but half a second is fairly generous.  I'd try to take a look at
>> what's going on with your network or kafka broker during that time.
>>
>> On Tue, Aug 23, 2016 at 4:44 PM, Srikanth  wrote:
>> > Hello,
>> >
>> > I'm getting the below exception when testing Spark 2.0 with Kafka 0.10.
>> >
>> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka version : 0.10.0.0
>> >> 16/08/23 16:31:01 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
>> >> 16/08/23 16:31:01 INFO CachedKafkaConsumer: Initial fetch for
>> >> spark-executor-example mt_event 0 15782114
>> >> 16/08/23 16:31:01 INFO AbstractCoordinator: Discovered coordinator
>> >> 10.150.254.161:9233 (id: 2147483646 rack: null) for group
>> >> spark-executor-example.
>> >> 16/08/23 16:31:02 ERROR Executor: Exception in task 0.0 in stage 1.0
>> (TID
>> >> 6)
>> >> java.lang.AssertionError: assertion failed: Failed to get records for
>> >> spark-executor-example mt_event 0 15782114 after polling for 512
>> >> at scala.Predef$.assert(Predef.scala:170)
>> >> at
>> >> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(
>> CachedKafkaConsumer.scala:74)
>> >> at
>> >> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterato
>> r.next(KafkaRDD.scala:227)
>> >> at
>> >> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterato
>> r.next(KafkaRDD.scala:193)
>> >> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>> >
>> >
>> > I get this error intermittently. Sometimes a few batches are scheduled
>> and
>> > run fine. Then I get this error.
>> > kafkacat is able to fetch from this topic continuously.
>> >
>> > Full exception is here --
>> > https://gist.github.com/SrikanthTati/c2e95c4ac689cd49aab817e24ec42767
>> >
>> > Srikanth
>>
>
>


Re: Split RDD by key and save to different files

2016-09-07 Thread Dhaval Patel
In order to do that, you first need to key the RDD by customerId, and then
use saveAsHadoopFile in this way:

saveAsHadoopFile(location, classOf[KeyClass], classOf[ValueClass], classOf[PartitionOutputFormat])

where PartitionOutputFormat extends MultipleTextOutputFormat.

Sample for that is below:

// Requires: import org.apache.hadoop.io.NullWritable
//           import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
class PartitionOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // Drop the key from the written records; return `key` here instead if
  // you want it repeated in each output line.
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()

  // Route each record to a path derived from its key (relative to the
  // output directory passed to saveAsHadoopFile), e.g. <key>/part-00000.
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString + "/" + name
}
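
And a hedged usage sketch for the customerId example from the question (input
and output paths, and the assumption that customerId is the first CSV field,
are placeholders):

val records = sc.textFile("/input/customers.csv")             // e.g. "111,abc,34,M"
val byCustomer = records.map(line => (line.split(",")(0).trim, line))

byCustomer.saveAsHadoopFile(
  "/output/by-customer",                // one sub-path per customerId under here
  classOf[String],
  classOf[String],
  classOf[PartitionOutputFormat])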


On Wed, Sep 7, 2016 at 10:58 AM, Vikash Kumar  wrote:

> I need to spilt RDD [keys, Iterable[Value]]  to save each key into
> different file.
>
> e.g I have records like: customerId, name, age, sex
>
> 111,abc,34,M
> 122, xyz,32,F
> 111,def,31,F
> 122.trp,30,F
> 133,jkl,35,M
>
> I need to write 3 different files based on customerId
> file1:
> 111,abc,34,M
> 111,def,31,F
>
> file2:
> 122, xyz,32,F
> 122.trp,30,F
>
> file3:
> 133,jkl,35,M
>
> How I can achieve this in spark scala code?
>


Error while storing datetime read from MySQL back to MySQL

2016-09-07 Thread Dhaval Patel
I am facing an error while trying to save a DataFrame containing a datetime
field into a MySQL table.
What I am doing is:
1. Reading data from a MySQL table which has fields of type datetime.
2. Processing the DataFrame.
3. Storing/saving the DataFrame back into another MySQL table.

While creating the table, Spark creates it in MySQL with type timestamp for
fields which were datetime in MySQL.

For reference, I am adding dataframe contents:
scala> data.select("date_from","date_to").show(5, false)
+---------------------+---------------------+
|date_from            |date_to              |
+---------------------+---------------------+
|1899-12-31 18:00:00.0|2199-12-31 23:59:59.0|
|1899-12-31 18:00:00.0|2199-12-31 23:59:59.0|
|1899-12-31 18:00:00.0|2199-12-31 23:59:59.0|
|1899-12-31 18:00:00.0|2199-12-31 23:59:59.0|
|1899-12-31 18:00:00.0|2199-12-31 23:59:59.0|
+---------------------+---------------------+


And while trying to insert data into the new table, I am getting the
following error:

scala> data.write.mode("overwrite").jdbc(url,"test_service_table", prop)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 11.0 failed 1 times, most recent failure: Lost task 0.0 in stage
11.0 (TID 11, localhost): java.sql.BatchUpdateException: Data truncation:
Incorrect datetime value: '1899-12-31 15:00:00' for column 'date_from' at
row 1
at
com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:2028)
at
com.mysql.jdbc.PreparedStatement.executeBatch(PreparedStatement.java:1451)
at
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:215)
at
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:277)
at
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:276)
at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
Source)
at java.lang.Thread.run(Unknown Source)
Caused by: com.mysql.jdbc.MysqlDataTruncation: Data truncation: Incorrect
datetime value: '1899-12-31 15:00:00' for column 'date_from' at row 1
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3607)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3541)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2002)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2163)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2624)
at
com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2127)
at
com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2427)
at
com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1980)
... 14 more

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at
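
The '1899-12-31' value is the likely culprit: MySQL's TIMESTAMP type (which
the JDBC writer uses for Spark TimestampType columns, as seen above) only
accepts values from 1970 onwards, whereas DATETIME does not have that limit.
One hedged workaround sketch, assuming you can pre-create the target table:

// Once, in MySQL (column list abbreviated, shown here only as a comment):
//   CREATE TABLE test_service_table (..., date_from DATETIME, date_to DATETIME, ...);
// Then append from Spark so the table is not recreated with TIMESTAMP columns:
data.write.mode("append").jdbc(url, "test_service_table", prop)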

Re: No SparkR on Mesos?

2016-09-07 Thread Felix Cheung
This is correct - SparkR is not quite working completely on Mesos. JIRAs and 
contributions welcome!





On Wed, Sep 7, 2016 at 10:21 AM -0700, "Michael Gummelt" 
> wrote:

Quite possibly.  I've never used it.  I know Python was "unsupported" for a 
while, which turned out to mean there was a silly conditional that would fail 
the submission, even though all the support was there.  Could be the same for 
R.  Can you submit a JIRA?

On Wed, Sep 7, 2016 at 5:02 AM, Peter Griessl 
> wrote:
Hello,

does SparkR really not work (yet?) on Mesos (Spark 2.0 on Mesos 1.0)?

$ /opt/spark/bin/sparkR

R version 3.3.1 (2016-06-21) -- "Bug in Your Hair"
Copyright (C) 2016 The R Foundation for Statistical Computing
Platform: x86_64-pc-linux-gnu (64-bit)
Launching java with spark-submit command /opt/spark/bin/spark-submit   
"sparkr-shell" /tmp/RtmpPYVJxF/backend_port338581f434
Error: SparkR is not supported for Mesos cluster.
Error in sparkR.sparkContext(master, appName, sparkHome, sparkConfigMap,  :
  JVM is not ready after 10 seconds


I couldn't find any information on this subject in the docs - am I missing 
something?

Thanks for any hints,
Peter



--
Michael Gummelt
Software Engineer
Mesosphere


Split RDD by key and save to different files

2016-09-07 Thread Vikash Kumar
I need to spilt RDD [keys, Iterable[Value]]  to save each key into
different file.

e.g I have records like: customerId, name, age, sex

111,abc,34,M
122, xyz,32,F
111,def,31,F
122.trp,30,F
133,jkl,35,M

I need to write 3 different files based on customerId
file1:
111,abc,34,M
111,def,31,F

file2:
122, xyz,32,F
122.trp,30,F

file3:
133,jkl,35,M

How I can achieve this in spark scala code?


Re: Spark Java Heap Error

2016-09-07 Thread neil90
If your in local mode just allocate all your memory you want to use to your
Driver(that acts as the executor in local mode) don't even bother changing
the executor memory. So your new settings should look like this...

spark.driver.memory  16g 
spark.driver.maxResultSize   2g 
spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value 

You might need to change your spark.driver.maxResultSize settings if you
plan on doing a collect on the entire rdd/dataframe.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Java-Heap-Error-tp27669p27673.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: No SparkR on Mesos?

2016-09-07 Thread Michael Gummelt
Quite possibly.  I've never used it.  I know Python was "unsupported" for a
while, which turned out to mean there was a silly conditional that would
fail the submission, even though all the support was there.  Could be the
same for R.  Can you submit a JIRA?

On Wed, Sep 7, 2016 at 5:02 AM, Peter Griessl  wrote:

> Hello,
>
>
>
> does SparkR really not work (yet?) on Mesos (Spark 2.0 on Mesos 1.0)?
>
>
>
> $ /opt/spark/bin/sparkR
>
>
>
> R version 3.3.1 (2016-06-21) -- "Bug in Your Hair"
>
> Copyright (C) 2016 The R Foundation for Statistical Computing
>
> Platform: x86_64-pc-linux-gnu (64-bit)
>
> Launching java with spark-submit command /opt/spark/bin/spark-submit
> "sparkr-shell" /tmp/RtmpPYVJxF/backend_port338581f434
>
> Error: *SparkR is not supported for Mesos cluster*.
>
> Error in sparkR.sparkContext(master, appName, sparkHome, sparkConfigMap,  :
>
>   JVM is not ready after 10 seconds
>
>
>
>
>
> I couldn’t find any information on this subject in the docs – am I missing
> something?
>
>
>
> Thanks for any hints,
>
> Peter
>



-- 
Michael Gummelt
Software Engineer
Mesosphere


Re: Spark Metrics: custom source/sink configurations not getting recognized

2016-09-07 Thread map reduced
Thanks for the reply; I wish it did. We have an internal metrics system that
we need to submit to. I am sure that the ways I've tried work with a YARN
deployment, but not with standalone.

Thanks,
KP
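
For reference, the direct-to-socket approach described in the quoted reply
below boils down to something like this (Graphite plaintext protocol; host,
port and metric path are placeholders):

import java.io.PrintWriter
import java.net.Socket

// Writes one data point as "<path> <value> <epoch-seconds>\n" over TCP.
def sendMetric(host: String, port: Int, path: String, value: Double): Unit = {
  val socket = new Socket(host, port)
  try {
    val out = new PrintWriter(socket.getOutputStream, true)
    out.println(s"$path $value ${System.currentTimeMillis() / 1000}")
  } finally {
    socket.close()
  }
}

// e.g. sendMetric("graphite.example.com", 2003, "myapp.batch.records", 42.0)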

On Tue, Sep 6, 2016 at 11:36 PM, Benjamin Kim  wrote:

> We use Graphite/Grafana for custom metrics. We found Spark’s metrics not
> to be customizable. So, we write directly using Graphite’s API, which was
> very easy to do using Java’s socket library in Scala. It works great for
> us, and we are going one step further using Sensu to alert us if there is
> an anomaly in the metrics beyond the norm.
>
> Hope this helps.
>
> Cheers,
> Ben
>
>
> On Sep 6, 2016, at 9:52 PM, map reduced  wrote:
>
> Hi, anyone has any ideas please?
>
> On Mon, Sep 5, 2016 at 8:30 PM, map reduced  wrote:
>
>> Hi,
>>
>> I've written my custom metrics source/sink for my Spark streaming app and
>> I am trying to initialize it from metrics.properties - but that doesn't
>> work from executors. I don't have control on the machines in Spark cluster,
>> so I can't copy properties file in $SPARK_HOME/conf/ in the cluster. I have
>> it in the fat jar where my app lives, but by the time my fat jar is
>> downloaded on worker nodes in cluster, executors are already started and
>> their Metrics system is already initialized - thus not picking my file with
>> custom source configuration in it.
>>
>> Following this post
>> ,
>> I've specified 'spark.files
>>  =
>> metrics.properties' and 'spark.metrics.conf=metrics.properties' but by
>> the time 'metrics.properties' is shipped to executors, their metric system
>> is already initialized.
>>
>> If I initialize my own metrics system, it's picking up my file but then
>> I'm missing master/executor level metrics/properties (eg.
>> executor.sink.mySink.propName=myProp - can't read 'propName' from
>> 'mySink') since they are initialized
>> 
>>  by
>> Spark's metric system.
>>
>> Is there a (programmatic) way to have 'metrics.properties' shipped before
>> executors initialize
>> 
>>  ?
>>
>> Here's my SO question
>> 
>> .
>>
>> Thanks,
>>
>> KP
>>
>
>
>


Re: Mesos coarse-grained problem with spark.shuffle.service.enabled

2016-09-07 Thread Michael Gummelt
The shuffle service is run out of band from any specific Spark job, and you
only run one on any given node.  You need to get the Spark distribution on
each node somehow, then run the shuffle service out of that distribution.
The most common way I see people doing this is via Marathon (using the
"uris" field in the marathon app to download the Spark distribution).

On Wed, Sep 7, 2016 at 2:16 AM, Tamas Szuromi <
tamas.szur...@odigeo.com.invalid> wrote:

> Hello,
>
> For a while, we're using Spark on Mesos with fine-grained mode in
> production.
> Since Spark 2.0 the fine-grained mode is deprecated so we'd shift to
> dynamic allocation.
>
> When I tried to setup the dynamic allocation I run into the following
> problem:
> So I set spark.shuffle.service.enabled = true and 
> spark.dynamicAllocation.enabled
> = true as the documentation said. We're using Spark on Mesos
> with spark.executor.uri where we download the pipeline's
> corresponding Spark version from HDFS. The documentation also says In Mesos
> coarse-grained mode, run $SPARK_HOME/sbin/start-mesos-shuffle-service.sh
> on all slave nodes. But how is it possible to launch it before start the
> application, if the given Spark will be downloaded to the Mesos executor
> after executor launch but it's looking for the started external shuffle
> service in advance?
>
> Is it possible I can't use spark.executor.uri and spark.dynamicAllocation.
> enabled together?
>
> Thanks in advance!
>
> Tamas
>
>
>


-- 
Michael Gummelt
Software Engineer
Mesosphere


Re: dstream.foreachRDD iteration

2016-09-07 Thread Ashok Kumar
I have checked that doc, sir.
My understanding is that every batch interval of data always generates one RDD,
so why do we need to use foreachRDD when there is only one?

Sorry for this question, but it is a bit confusing to me.
Thanks

 

On Wednesday, 7 September 2016, 18:05, Mich Talebzadeh 
 wrote:
 

 Hi,
What is so confusing about RDD. Have you checked this doc?
http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
HTH
Dr Mich Talebzadeh LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 http://talebzadehmich.wordpress.com
Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destructionof data or any other property which may arise from relying 
on this email's technical content is explicitly disclaimed.The author will in 
no case be liable for any monetary damages arising from suchloss, damage or 
destruction.  
On 7 September 2016 at 11:39, Ashok Kumar  wrote:

Hi,
A bit confusing to me
How many layers involved in DStream.foreachRDD.
Do I need to loop over it more than once? I mean  DStream.foreachRDD{ rdd => }
I am trying to get individual lines in RDD.
Thanks



   

Re: dstream.foreachRDD iteration

2016-09-07 Thread Mich Talebzadeh
Hi,

What is so confusing about RDD. Have you checked this doc?

http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

HTH
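
To make it concrete, a minimal sketch (here `lines` stands for whatever
DStream[String] you have created, e.g. from socketTextStream or Kafka):

lines.foreachRDD { rdd =>
  // one RDD per batch interval; iterate its elements like any other RDD
  rdd.foreach { line =>
    println(line)   // runs on the executors, one line at a time
  }
}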

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 7 September 2016 at 11:39, Ashok Kumar 
wrote:

> Hi,
>
> A bit confusing to me
>
> How many layers involved in DStream.foreachRDD.
>
> Do I need to loop over it more than once? I mean  DStream.foreachRDD{ rdd => }
>
> I am trying to get individual lines in RDD.
>
> Thanks
>


Re: Reset auto.offset.reset in Kafka 0.10 integ

2016-09-07 Thread Cody Koeninger
Kafka as yet doesn't have a good way to distinguish between "it's ok
to reset at the beginning of a job" and "it's ok to reset any time
offsets are out of range"

https://issues.apache.org/jira/browse/KAFKA-3370

Because of that, it's better IMHO to err on the side of obvious
failures, as opposed to silent ones.

Like I said, the quickest way to deal with this from a user
perspective is just use a new consumer group.
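
Concretely, a sketch of the consumer params with a fresh group (broker list
and group name are placeholders; ByteArrayDeserializer comes from
org.apache.kafka.common.serialization):

val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "broker1:9092,broker2:9092",
  "key.deserializer"   -> classOf[ByteArrayDeserializer],
  "value.deserializer" -> classOf[ByteArrayDeserializer],
  "group.id"           -> "StreamingPixelCount2", // new group => no stale committed offsets
  "auto.offset.reset"  -> "earliest"              // or "latest", accepting the gap
)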



On Wed, Sep 7, 2016 at 11:44 AM, Srikanth  wrote:
> Yes that's right.
>
> I understand this is a data loss. The restart doesn't have to be all that
> silent. It requires us to set a flag. I thought auto.offset.reset is that
> flag.
> But there isn't much I can do at this point given that retention has cleaned
> things up.
> The app has to start. Let admins address the data loss on the side.
>
> On Wed, Sep 7, 2016 at 12:15 PM, Cody Koeninger  wrote:
>>
>> Just so I'm clear on what's happening...
>>
>> - you're running a job that auto-commits offsets to kafka.
>> - you stop that job for longer than your retention
>> - you start that job back up, and it errors because the last committed
>> offset is no longer available
>> - you think that instead of erroring, the job should silently restart
>> based on the value of auto.offset.reset
>>
>> Is that accurate?
>>
>>
>> On Wed, Sep 7, 2016 at 10:44 AM, Srikanth  wrote:
>> > My retention is 1d which isn't terribly low. The problem is every time I
>> > restart after retention expiry, I get this exception instead of honoring
>> > auto.offset.reset.
>> > It isn't a corner case where retention expired after driver created a
>> > batch.
>> > Its easily reproducible and consistent.
>> >
>> > On Tue, Sep 6, 2016 at 3:34 PM, Cody Koeninger 
>> > wrote:
>> >>
>> >> You don't want auto.offset.reset on executors, you want executors to
>> >> do what the driver told them to do.  Otherwise you're going to get
>> >> really horrible data inconsistency issues if the executors silently
>> >> reset.
>> >>
>> >> If your retention is so low that retention gets expired in between
>> >> when the driver created a batch with a given starting offset, and when
>> >> an executor starts to process that batch, you're going to have
>> >> problems.
>> >>
>> >> On Tue, Sep 6, 2016 at 2:30 PM, Srikanth  wrote:
>> >> > This isn't a production setup. We kept retention low intentionally.
>> >> > My original question was why I got the exception instead of it using
>> >> > auto.offset.reset on restart?
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > On Tue, Sep 6, 2016 at 10:48 AM, Cody Koeninger 
>> >> > wrote:
>> >> >>
>> >> >> If you leave enable.auto.commit set to true, it will commit offsets
>> >> >> to
>> >> >> kafka, but you will get undefined delivery semantics.
>> >> >>
>> >> >> If you just want to restart from a fresh state, the easiest thing to
>> >> >> do is use a new consumer group name.
>> >> >>
>> >> >> But if that keeps happening, you should look into why your retention
>> >> >> is not sufficient.
>> >> >>
>> >> >> On Tue, Sep 6, 2016 at 9:39 AM, Srikanth 
>> >> >> wrote:
>> >> >> > You are right. I got confused as its all part of same log when
>> >> >> > running
>> >> >> > from
>> >> >> > IDE.
>> >> >> > I was looking for a good guide to read to understand the this
>> >> >> > integ.
>> >> >> >
>> >> >> > I'm not managing offset on my own. I've not enabled checkpoint for
>> >> >> > my
>> >> >> > tests.
>> >> >> > I assumed offsets will be stored in kafka by default.
>> >> >> >
>> >> >> > KafkaUtils.createDirectStream[Array[Byte], Array[Byte]] (
>> >> >> > ssc, PreferConsistent, SubscribePattern[Array[Byte],
>> >> >> > Array[Byte]](pattern, kafkaParams) )
>> >> >> >
>> >> >> >* @param offsets: offsets to begin at on initial startup.  If
>> >> >> > no
>> >> >> > offset
>> >> >> > is given for a
>> >> >> >* TopicPartition, the committed offset (if applicable) or kafka
>> >> >> > param
>> >> >> >* auto.offset.reset will be used.
>> >> >> >
>> >> >> > 16/09/02 13:11:11 INFO ConsumerConfig: ConsumerConfig values:
>> >> >> > enable.auto.commit = true
>> >> >> > auto.offset.reset = latest
>> >> >> >
>> >> >> > Srikanth
>> >> >> >
>> >> >> > On Sat, Sep 3, 2016 at 8:59 AM, Cody Koeninger
>> >> >> > 
>> >> >> > wrote:
>> >> >> >>
>> >> >> >> Seems like you're confused about the purpose of that line of
>> >> >> >> code,
>> >> >> >> it
>> >> >> >> applies to executors, not the driver. The driver is responsible
>> >> >> >> for
>> >> >> >> determining offsets.
>> >> >> >>
>> >> >> >> Where are you storing offsets, in Kafka, checkpoints, or your own
>> >> >> >> store?
>> >> >> >> Auto offset reset won't be used if there are stored offsets.
>> >> >> >>
>> >> >> >>
>> >> >> >> On Sep 2, 2016 14:58, "Srikanth"  wrote:
>> >> >> >>>
>> >> >> >>> Hi,
>> >> >> >>>
>> >> >> >>> Upon 

Re: Reset auto.offset.reset in Kafka 0.10 integ

2016-09-07 Thread Srikanth
Yes that's right.

I understand this is a data loss. The restart doesn't have to be all that
silent. It requires us to set a flag. I thought auto.offset.reset is that
flag.
But there isn't much I can do at this point given that retention has
cleaned things up.
The app has to start. Let admins address the data loss on the side.

On Wed, Sep 7, 2016 at 12:15 PM, Cody Koeninger  wrote:

> Just so I'm clear on what's happening...
>
> - you're running a job that auto-commits offsets to kafka.
> - you stop that job for longer than your retention
> - you start that job back up, and it errors because the last committed
> offset is no longer available
> - you think that instead of erroring, the job should silently restart
> based on the value of auto.offset.reset
>
> Is that accurate?
>
>
> On Wed, Sep 7, 2016 at 10:44 AM, Srikanth  wrote:
> > My retention is 1d which isn't terribly low. The problem is every time I
> > restart after retention expiry, I get this exception instead of honoring
> > auto.offset.reset.
> > It isn't a corner case where retention expired after driver created a
> batch.
> > Its easily reproducible and consistent.
> >
> > On Tue, Sep 6, 2016 at 3:34 PM, Cody Koeninger 
> wrote:
> >>
> >> You don't want auto.offset.reset on executors, you want executors to
> >> do what the driver told them to do.  Otherwise you're going to get
> >> really horrible data inconsistency issues if the executors silently
> >> reset.
> >>
> >> If your retention is so low that retention gets expired in between
> >> when the driver created a batch with a given starting offset, and when
> >> an executor starts to process that batch, you're going to have
> >> problems.
> >>
> >> On Tue, Sep 6, 2016 at 2:30 PM, Srikanth  wrote:
> >> > This isn't a production setup. We kept retention low intentionally.
> >> > My original question was why I got the exception instead of it using
> >> > auto.offset.reset on restart?
> >> >
> >> >
> >> >
> >> >
> >> > On Tue, Sep 6, 2016 at 10:48 AM, Cody Koeninger 
> >> > wrote:
> >> >>
> >> >> If you leave enable.auto.commit set to true, it will commit offsets
> to
> >> >> kafka, but you will get undefined delivery semantics.
> >> >>
> >> >> If you just want to restart from a fresh state, the easiest thing to
> >> >> do is use a new consumer group name.
> >> >>
> >> >> But if that keeps happening, you should look into why your retention
> >> >> is not sufficient.
> >> >>
> >> >> On Tue, Sep 6, 2016 at 9:39 AM, Srikanth 
> wrote:
> >> >> > You are right. I got confused as its all part of same log when
> >> >> > running
> >> >> > from
> >> >> > IDE.
> >> >> > I was looking for a good guide to read to understand the this
> integ.
> >> >> >
> >> >> > I'm not managing offset on my own. I've not enabled checkpoint for
> my
> >> >> > tests.
> >> >> > I assumed offsets will be stored in kafka by default.
> >> >> >
> >> >> > KafkaUtils.createDirectStream[Array[Byte], Array[Byte]] (
> >> >> > ssc, PreferConsistent, SubscribePattern[Array[Byte],
> >> >> > Array[Byte]](pattern, kafkaParams) )
> >> >> >
> >> >> >* @param offsets: offsets to begin at on initial startup.  If no
> >> >> > offset
> >> >> > is given for a
> >> >> >* TopicPartition, the committed offset (if applicable) or kafka
> >> >> > param
> >> >> >* auto.offset.reset will be used.
> >> >> >
> >> >> > 16/09/02 13:11:11 INFO ConsumerConfig: ConsumerConfig values:
> >> >> > enable.auto.commit = true
> >> >> > auto.offset.reset = latest
> >> >> >
> >> >> > Srikanth
> >> >> >
> >> >> > On Sat, Sep 3, 2016 at 8:59 AM, Cody Koeninger  >
> >> >> > wrote:
> >> >> >>
> >> >> >> Seems like you're confused about the purpose of that line of code,
> >> >> >> it
> >> >> >> applies to executors, not the driver. The driver is responsible
> for
> >> >> >> determining offsets.
> >> >> >>
> >> >> >> Where are you storing offsets, in Kafka, checkpoints, or your own
> >> >> >> store?
> >> >> >> Auto offset reset won't be used if there are stored offsets.
> >> >> >>
> >> >> >>
> >> >> >> On Sep 2, 2016 14:58, "Srikanth"  wrote:
> >> >> >>>
> >> >> >>> Hi,
> >> >> >>>
> >> >> >>> Upon restarting my Spark Streaming app it is failing with error
> >> >> >>>
> >> >> >>> Exception in thread "main" org.apache.spark.SparkException: Job
> >> >> >>> aborted
> >> >> >>> due to stage failure: Task 0 in stage 1.0 failed 1 times, most
> >> >> >>> recent
> >> >> >>> failure: Lost task 0.0 in stage 1.0 (TID 6, localhost):
> >> >> >>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
> >> >> >>> Offsets
> >> >> >>> out of
> >> >> >>> range with no configured reset policy for partitions:
> >> >> >>> {mt-event-2=1710706}
> >> >> >>>
> >> >> >>> It is correct that the last read offset was deleted by kafka due
> to
> >> >> >>> retention period expiry.
> >> >> >>> I've set 

Re: spark 1.6.0 web console shows running application in a "waiting" status, but it's acutally running

2016-09-07 Thread arlindo santos
Assuming you configured Spark to use ZooKeeper for HA, when the master fails
over to another node, the workers will automatically attach themselves to the
newly elected master and this works fine. Once I go over to the new master web
GUI, I see all the workers attached just fine, which means the failover worked.
My issue is that the web GUI now thinks the Spark Streaming app running on the
cluster is in "waiting" state, and this is not the case because the app is
actually running and processing events.


1.6.0  Spark Master at spark://10.142.191.154:7077

  *   URL: spark://10.142.191.154:7077
  *   REST URL: spark://10.142.191.154:6066 (cluster mode)
  *   Alive Workers: 3
  *   Cores in use: 12 Total, 12 Used
  *   Memory in use: 8.2 GB Total, 4.0 GB Used
  *   Applications: 1 Running, 0 Completed
  *   Drivers: 1 Running, 0 Completed
  *   Status: ALIVE

Workers
Worker Id                                    Address              State  Cores       Memory
worker-20160907122724-10.142.191.154-7078    10.142.191.154:7078  ALIVE  4 (4 Used)  2.7 GB (2.0 GB Used)
worker-20160907122724-10.142.191.159-7078    10.142.191.159:7078  ALIVE  4 (4 Used)  2.7 GB (1024.0 MB Used)
worker-20160907122724-10.142.191.162-7078    10.142.191.162:7078  ALIVE  4 (4 Used)  2.7 GB (1024.0 MB Used)

Running Applications
Application ID        Name     Cores  Memory per Node  Submitted Time       User   State    Duration
app-20160907122851-   Ex1Feed  12     1024.0 MB        2016/09/07 12:28:51  spark  WAITING  10 min





From: Mich Talebzadeh 
Sent: September 7, 2016 3:42 PM
To: arlindo santos
Cc: user @spark
Subject: Re: spark 1.6.0 web console shows running application in a "waiting" 
status, but it's acutally running

I just tested it.

If you start master on the original host, the workers on that host they won't 
respond. They will stay stale. So there is no heartbeat between workers and 
master except the initial handshake

The only way is to stop workers (if they are still running), restart the master 
and restart workers.


HTH



Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.



On 7 September 2016 at 16:15, Mich Talebzadeh 
> wrote:
Ok but look at the worker why is it still saying port 7077. That port on that 
host as far as I know is the port that local master is running which is no 
longer there?


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.



On 7 September 2016 at 16:05, arlindo santos 
> wrote:

Port 7077 is for "client" mode connections to the master. In "cluster" mode 
it's 6066 and this means the "driver" runs on the spark cluster on a node spark 
chooses. The command I use to deploy my spark app (including the driver) is 
below:


spark-submit --deploy-mode cluster --master 
spark://tiplxapp-spk01:6066,tiplxapp-spk02:6066,tiplxapp-spk03:6066 
/app/tmx/ngxspark/lib/EX1AppSpark-1.0.13.jar



Yes, your right I believe when the master dies, zookeeper detects that and 
elects a new master node and spark-submit should carry on. Not sure how this 
leads into the UI believing the app is in "waiting" state?


Also, I noticed when these fail overs happen the "worker" web GUI goes a bit 
strange and starts reporting over allocated resources? Look at the cores and 
memory used?


1.6.0  Spark Worker at 142.201.185.134:7078

  *   ID: worker-20160622152457-142.201.185.134-7078
  *   Master URL: spark://142.201.185.132:7077
  *   Cores: 4 (5 Used)
  *   Memory: 2.7 GB (3.0 GB 

Re: Reset auto.offset.reset in Kafka 0.10 integ

2016-09-07 Thread Cody Koeninger
Just so I'm clear on what's happening...

- you're running a job that auto-commits offsets to kafka.
- you stop that job for longer than your retention
- you start that job back up, and it errors because the last committed
offset is no longer available
- you think that instead of erroring, the job should silently restart
based on the value of auto.offset.reset

Is that accurate?


On Wed, Sep 7, 2016 at 10:44 AM, Srikanth  wrote:
> My retention is 1d which isn't terribly low. The problem is every time I
> restart after retention expiry, I get this exception instead of honoring
> auto.offset.reset.
> It isn't a corner case where retention expired after driver created a batch.
> Its easily reproducible and consistent.
>
> On Tue, Sep 6, 2016 at 3:34 PM, Cody Koeninger  wrote:
>>
>> You don't want auto.offset.reset on executors, you want executors to
>> do what the driver told them to do.  Otherwise you're going to get
>> really horrible data inconsistency issues if the executors silently
>> reset.
>>
>> If your retention is so low that retention gets expired in between
>> when the driver created a batch with a given starting offset, and when
>> an executor starts to process that batch, you're going to have
>> problems.
>>
>> On Tue, Sep 6, 2016 at 2:30 PM, Srikanth  wrote:
>> > This isn't a production setup. We kept retention low intentionally.
>> > My original question was why I got the exception instead of it using
>> > auto.offset.reset on restart?
>> >
>> >
>> >
>> >
>> > On Tue, Sep 6, 2016 at 10:48 AM, Cody Koeninger 
>> > wrote:
>> >>
>> >> If you leave enable.auto.commit set to true, it will commit offsets to
>> >> kafka, but you will get undefined delivery semantics.
>> >>
>> >> If you just want to restart from a fresh state, the easiest thing to
>> >> do is use a new consumer group name.
>> >>
>> >> But if that keeps happening, you should look into why your retention
>> >> is not sufficient.
>> >>
>> >> On Tue, Sep 6, 2016 at 9:39 AM, Srikanth  wrote:
>> >> > You are right. I got confused as its all part of same log when
>> >> > running
>> >> > from
>> >> > IDE.
>> >> > I was looking for a good guide to read to understand the this integ.
>> >> >
>> >> > I'm not managing offset on my own. I've not enabled checkpoint for my
>> >> > tests.
>> >> > I assumed offsets will be stored in kafka by default.
>> >> >
>> >> > KafkaUtils.createDirectStream[Array[Byte], Array[Byte]] (
>> >> > ssc, PreferConsistent, SubscribePattern[Array[Byte],
>> >> > Array[Byte]](pattern, kafkaParams) )
>> >> >
>> >> >* @param offsets: offsets to begin at on initial startup.  If no
>> >> > offset
>> >> > is given for a
>> >> >* TopicPartition, the committed offset (if applicable) or kafka
>> >> > param
>> >> >* auto.offset.reset will be used.
>> >> >
>> >> > 16/09/02 13:11:11 INFO ConsumerConfig: ConsumerConfig values:
>> >> > enable.auto.commit = true
>> >> > auto.offset.reset = latest
>> >> >
>> >> > Srikanth
>> >> >
>> >> > On Sat, Sep 3, 2016 at 8:59 AM, Cody Koeninger 
>> >> > wrote:
>> >> >>
>> >> >> Seems like you're confused about the purpose of that line of code,
>> >> >> it
>> >> >> applies to executors, not the driver. The driver is responsible for
>> >> >> determining offsets.
>> >> >>
>> >> >> Where are you storing offsets, in Kafka, checkpoints, or your own
>> >> >> store?
>> >> >> Auto offset reset won't be used if there are stored offsets.
>> >> >>
>> >> >>
>> >> >> On Sep 2, 2016 14:58, "Srikanth"  wrote:
>> >> >>>
>> >> >>> Hi,
>> >> >>>
>> >> >>> Upon restarting my Spark Streaming app it is failing with error
>> >> >>>
>> >> >>> Exception in thread "main" org.apache.spark.SparkException: Job
>> >> >>> aborted
>> >> >>> due to stage failure: Task 0 in stage 1.0 failed 1 times, most
>> >> >>> recent
>> >> >>> failure: Lost task 0.0 in stage 1.0 (TID 6, localhost):
>> >> >>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
>> >> >>> Offsets
>> >> >>> out of
>> >> >>> range with no configured reset policy for partitions:
>> >> >>> {mt-event-2=1710706}
>> >> >>>
>> >> >>> It is correct that the last read offset was deleted by kafka due to
>> >> >>> retention period expiry.
>> >> >>> I've set auto.offset.reset in my app but it is getting reset here
>> >> >>>
>> >> >>>
>> >> >>>
>> >> >>> https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala#L160
>> >> >>>
>> >> >>> How to force it to restart in this case (fully aware of potential
>> >> >>> data
>> >> >>> loss)?
>> >> >>>
>> >> >>> Srikanth
>> >> >
>> >> >
>> >
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Reset auto.offset.reset in Kafka 0.10 integ

2016-09-07 Thread Srikanth
My retention is 1d which isn't terribly low. The problem is every time I
restart after retention expiry, I get this exception instead of honoring
auto.offset.reset.
It isn't a corner case where retention expired after driver created a
batch. Its easily reproducible and consistent.

On Tue, Sep 6, 2016 at 3:34 PM, Cody Koeninger  wrote:

> You don't want auto.offset.reset on executors, you want executors to
> do what the driver told them to do.  Otherwise you're going to get
> really horrible data inconsistency issues if the executors silently
> reset.
>
> If your retention is so low that retention gets expired in between
> when the driver created a batch with a given starting offset, and when
> an executor starts to process that batch, you're going to have
> problems.
>
> On Tue, Sep 6, 2016 at 2:30 PM, Srikanth  wrote:
> > This isn't a production setup. We kept retention low intentionally.
> > My original question was why I got the exception instead of it using
> > auto.offset.reset on restart?
> >
> >
> >
> >
> > On Tue, Sep 6, 2016 at 10:48 AM, Cody Koeninger 
> wrote:
> >>
> >> If you leave enable.auto.commit set to true, it will commit offsets to
> >> kafka, but you will get undefined delivery semantics.
> >>
> >> If you just want to restart from a fresh state, the easiest thing to
> >> do is use a new consumer group name.
> >>
> >> But if that keeps happening, you should look into why your retention
> >> is not sufficient.
> >>
> >> On Tue, Sep 6, 2016 at 9:39 AM, Srikanth  wrote:
> >> > You are right. I got confused as its all part of same log when running
> >> > from
> >> > IDE.
> >> > I was looking for a good guide to read to understand the this integ.
> >> >
> >> > I'm not managing offset on my own. I've not enabled checkpoint for my
> >> > tests.
> >> > I assumed offsets will be stored in kafka by default.
> >> >
> >> > KafkaUtils.createDirectStream[Array[Byte], Array[Byte]] (
> >> > ssc, PreferConsistent, SubscribePattern[Array[Byte],
> >> > Array[Byte]](pattern, kafkaParams) )
> >> >
> >> >* @param offsets: offsets to begin at on initial startup.  If no
> >> > offset
> >> > is given for a
> >> >* TopicPartition, the committed offset (if applicable) or kafka
> param
> >> >* auto.offset.reset will be used.
> >> >
> >> > 16/09/02 13:11:11 INFO ConsumerConfig: ConsumerConfig values:
> >> > enable.auto.commit = true
> >> > auto.offset.reset = latest
> >> >
> >> > Srikanth
> >> >
> >> > On Sat, Sep 3, 2016 at 8:59 AM, Cody Koeninger 
> >> > wrote:
> >> >>
> >> >> Seems like you're confused about the purpose of that line of code, it
> >> >> applies to executors, not the driver. The driver is responsible for
> >> >> determining offsets.
> >> >>
> >> >> Where are you storing offsets, in Kafka, checkpoints, or your own
> >> >> store?
> >> >> Auto offset reset won't be used if there are stored offsets.
> >> >>
> >> >>
> >> >> On Sep 2, 2016 14:58, "Srikanth"  wrote:
> >> >>>
> >> >>> Hi,
> >> >>>
> >> >>> Upon restarting my Spark Streaming app it is failing with error
> >> >>>
> >> >>> Exception in thread "main" org.apache.spark.SparkException: Job
> >> >>> aborted
> >> >>> due to stage failure: Task 0 in stage 1.0 failed 1 times, most
> recent
> >> >>> failure: Lost task 0.0 in stage 1.0 (TID 6, localhost):
> >> >>> org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
> Offsets
> >> >>> out of
> >> >>> range with no configured reset policy for partitions:
> >> >>> {mt-event-2=1710706}
> >> >>>
> >> >>> It is correct that the last read offset was deleted by kafka due to
> >> >>> retention period expiry.
> >> >>> I've set auto.offset.reset in my app but it is getting reset here
> >> >>>
> >> >>>
> >> >>> https://github.com/apache/spark/blob/master/external/
> kafka-0-10/src/main/scala/org/apache/spark/streaming/
> kafka010/KafkaUtils.scala#L160
> >> >>>
> >> >>> How to force it to restart in this case (fully aware of potential
> data
> >> >>> loss)?
> >> >>>
> >> >>> Srikanth
> >> >
> >> >
> >
> >
>


Re: spark 1.6.0 web console shows running application in a "waiting" status, but it's acutally running

2016-09-07 Thread Mich Talebzadeh
I just tested it.

If you start the master on the original host again, the workers on that host
won't respond. They will stay stale, since there is no heartbeat between
workers and master beyond the initial handshake.

The only way is to stop workers (if they are still running), restart the
master and restart workers.


HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 7 September 2016 at 16:15, Mich Talebzadeh 
wrote:

> Ok but look at the worker why is it still saying port 7077. That port on
> that host as far as I know is the port that local master is running which
> is no longer there?
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 7 September 2016 at 16:05, arlindo santos  wrote:
>
>> Port 7077 is for "client" mode connections to the master. In "cluster"
>> mode it's 6066 and this means the "driver" runs on the spark cluster on a
>> node spark chooses. The command I use to deploy my spark app (including the
>> driver) is below:
>>
>>
>> spark-submit --deploy-mode cluster --master spark://tiplxapp-spk01:6066,ti
>> plxapp-spk02:6066,tiplxapp-spk03:6066 /app/tmx/ngxspark/lib/EX1AppSp
>> ark-1.0.13.jar
>>
>>
>>
>> Yes, your right I believe when the master dies, zookeeper detects that
>> and elects a new master node and spark-submit should carry on. Not sure how
>> this leads into the UI believing the app is in "waiting" state?
>>
>>
>> Also, I noticed when these fail overs happen the "worker" web GUI goes a
>> bit strange and starts reporting over allocated resources? Look at the
>> cores and memory used?
>>
>>
>>  1.6.0  Spark Worker at
>> 142.201.185.134:7078
>>
>>- *ID:* worker-20160622152457-142.201.185.134-7078
>>- *Master URL:* spark://142.201.185.132:7077
>>- *Cores:* 4 (5 Used)
>>- *Memory:* 2.7 GB (3.0 GB Used)
>>
>> Back to Master 
>>
>>
>>
>>
>>
>> --
>> *From:* Mich Talebzadeh 
>> *Sent:* September 7, 2016 2:52 PM
>> *To:* arlindo santos
>>
>> *Cc:* user @spark
>> *Subject:* Re: spark 1.6.0 web console shows running application in a
>> "waiting" status, but it's acutally running
>>
>> This is my take.
>>
>> When you issue spark-submit on any node it start GUI on port 4040 by
>> default. Otherwise you can specify port yourself with --conf
>> "spark.ui.port="
>>
>> As I understand in standalone mode executors run on workers.
>>
>> $SPARK_HOME/sbin/start-slave.sh spark://::7077
>>
>> That port 7077 is the master port. If master dies, then those workers
>> lose connection to port 7077 so I believe they go stale. So the
>> spark-submit carries on using the remaining executors on other workers.
>>
>> So in summary one expects the job to run.  You start your UI on
>> :port.
>>
>> One test you can do is to exit from UI and start UI on the host that
>> zookeeper selects the master on the same port. That should work.
>>
>> HTH
>>
>>
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 7 September 2016 at 15:27, arlindo santos 
>> wrote:
>>
>>> Yes refreshed a few times. Running in cluster mode.
>>>
>>> Fyi.. I can duplicate this easily now. Our setup consists of 3 nodes
>>> running standalone spark, master and worker on each, zookeeper doing master
>>> 

Re: spark 1.6.0 web console shows running application in a "waiting" status, but it's actually running

2016-09-07 Thread arlindo santos
Port 7077 is for "client" mode connections to the master. In "cluster" mode 
it's 6066 and this means the "driver" runs on the spark cluster on a node spark 
chooses. The command I use to deploy my spark app (including the driver) is 
below:


spark-submit --deploy-mode cluster --master 
spark://tiplxapp-spk01:6066,tiplxapp-spk02:6066,tiplxapp-spk03:6066 
/app/tmx/ngxspark/lib/EX1AppSpark-1.0.13.jar



Yes, you're right. I believe when the master dies, zookeeper detects that and 
elects a new master node and spark-submit should carry on. Not sure how this 
leads to the UI believing the app is in a "waiting" state?


Also, I noticed that when these failovers happen the "worker" web GUI goes a bit 
strange and starts reporting over-allocated resources. Look at the cores and 
memory used:


Spark Worker at 142.201.185.134:7078 (Spark 1.6.0)

  *   ID: worker-20160622152457-142.201.185.134-7078
  *   Master URL: spark://142.201.185.132:7077
  *   Cores: 4 (5 Used)
  *   Memory: 2.7 GB (3.0 GB Used)







From: Mich Talebzadeh 
Sent: September 7, 2016 2:52 PM
To: arlindo santos
Cc: user @spark
Subject: Re: spark 1.6.0 web console shows running application in a "waiting" 
status, but it's acutally running

This is my take.

When you issue spark-submit on any node it starts the GUI on port 4040 by default. 
Otherwise you can specify the port yourself with --conf  "spark.ui.port="

As I understand in standalone mode executors run on workers.

$SPARK_HOME/sbin/start-slave.sh spark://::7077

That port 7077 is the master port. If master dies, then those workers lose 
connection to port 7077 so I believe they go stale. So the spark-submit carries 
on using the remaining executors on other workers.

So in summary one expects the job to run.  You start your UI on :port.

One test you can do is to exit from UI and start UI on the host that zookeeper 
selects the master on the same port. That should work.

HTH







Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.



On 7 September 2016 at 15:27, arlindo santos 
> wrote:
Yes refreshed a few times. Running in cluster mode.

Fyi.. I can duplicate this easily now. Our setup consists of 3 nodes running 
standalone spark, master and worker on each, zookeeper doing master leader 
election. If I kill a master on any node, the master shifts to another node and 
that is when the app state changes to waiting and never changes back to running 
on the gui, but really it's in a running mode.

Sent from my BlackBerry 10 smartphone on the Rogers network.
From: Mich Talebzadeh
Sent: Wednesday, September 7, 2016 9:50 AM
To: sarlindo
Cc: user @spark
Subject: Re: spark 1.6.0 web console shows running application in a "waiting" 
status, but it's acutally running


Have you refreshed the Spark UI page?

What Mode are you running your Spark app?

HTH


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.



On 6 September 2016 at 16:15, sarlindo 
> wrote:
I have 2 questions/issues.

1. We had the spark-master shut down (reason unknown) we looked at the
spark-master logs and it simply shows this, is there some other log I should
be looking at to find out why the master went down?

16/09/05 21:10:00 INFO ClientCnxn: Opening socket connection to server
tiplxapp-spk02.prd.tse.com/142.201.219.76:2181.
 Will not attempt to
authenticate using SASL (unknown error)
16/09/05 21:10:00 ERROR Master: Leadership has been revoked -- master
shutting down.
16/09/05 21:10:00 INFO ClientCnxn: Socket connection established, initiating
session, client: /142.201.219.75:56361, server:
tiplxapp-spk02.prd.tse.com/142.201.219.76:2181


2. Spark 1.6.0 web console shows a running application in a "waiting"
status, but it's actually running. Is this an existing bug?

Re: distribute work (files)

2016-09-07 Thread Yong Zhang
What error do you get? FileNotFoundException?


Please paste the stacktrace here.


Yong



From: Peter Figliozzi 
Sent: Wednesday, September 7, 2016 10:18 AM
To: ayan guha
Cc: Lydia Ickler; user.spark
Subject: Re: distribute work (files)

That's failing for me.  Can someone please try this-- is this even supposed to 
work:

  *   create a directory somewhere and add two text files to it
  *   mount that directory on the Spark worker machines with sshfs
  *   read the text files into one data structure using a file URL with a 
wildcard

Thanks,

Pete

On Tue, Sep 6, 2016 at 11:20 PM, ayan guha 
> wrote:
To access local file, try with file:// URI.

On Wed, Sep 7, 2016 at 8:52 AM, Peter Figliozzi 
> wrote:
This is a great question.  Basically you don't have to worry about the 
details-- just give a wildcard in your call to textFile.  See the Programming 
Guide section 
entitled "External Datasets".  The Spark framework will distribute your data 
across the workers.  Note that:

If using a path on the local filesystem, the file must also be accessible at 
the same path on worker nodes. Either copy the file to all workers or use a 
network-mounted shared file system.

In your case this would mean the directory of files.

Curiously, I cannot get this to work when I mount a directory with sshfs on all 
of my worker nodes.  It says "file not found" even though the file clearly 
exists in the specified path on all workers.   Anyone care to try and comment 
on this?

Thanks,

Pete

On Tue, Sep 6, 2016 at 9:51 AM, Lydia Ickler 
> wrote:
Hi,

maybe this is a stupid question:

I have a list of files. Each file I want to take as an input for a 
ML-algorithm. All files are independent from another.
My question now is how do I distribute the work so that each worker takes a 
block of files and just runs the algorithm on them one by one.
I hope somebody can point me in the right direction! :)

Best regards,
Lydia
-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org





--
Best Regards,
Ayan Guha



Managing Dataset API Partitions - Spark 2.0

2016-09-07 Thread ANDREA SPINA
Hi everyone,
I'd test some algorithms with the Dataset API offered by Spark 2.0.0.

So I was wondering, *which is the best way for managing Dataset partitions?*

E.g. in the data reading phase, what I usually do is the following:
*// RDD*
*// if I want to set a custom minimum number of partitions*
*val data = sc.textFile(inputPath, numPartitions)*

*// If I want to coalesce with a new shape my RDD at some point*
*sc.repartition(newNumPartitions)*

*// Dataset API*
*// Now with the Dataset API I'm calling directly the repartition method on
the dataset*
*spark.read.text(inputPath).repartition(newNumberOfPartition)*

So I'll be glad to know: *is there anything new worth knowing about custom
partitioning of Datasets, either in the reading phase or at some later point?*

Thank you so much.
Andrea
-- 
*Andrea Spina*
N.Tessera: *74598*
MAT: *89369*
*Ingegneria Informatica* *[LM] *(D.M. 270)
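
For reference, a minimal sketch of the Dataset-side options discussed above (Spark 2.0 API; the input path and the partition counts 64 and 8 are placeholders):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("partitionsSketch").getOrCreate()
    val inputPath = "hdfs:///path/to/input"          // placeholder

    // Reading: the initial partition count follows the input splits.
    val ds = spark.read.text(inputPath)

    // Fixed number of partitions (full shuffle).
    val byNumber = ds.repartition(64)

    // Hash partitioning by one or more columns.
    val byColumn = ds.repartition(ds("value"))

    // Shrink the number of partitions without a full shuffle.
    val narrowed = byNumber.coalesce(8)

    println(byColumn.rdd.getNumPartitions)

repartition(col) shuffles by a hash of the given expression (the output partition count follows spark.sql.shuffle.partitions unless a count is also given), while coalesce only merges existing partitions.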


Re: spark 1.6.0 web console shows running application in a "waiting" status, but it's actually running

2016-09-07 Thread Mich Talebzadeh
This is my take.

When you issue spark-submit on any node it starts the GUI on port 4040 by
default. Otherwise you can specify the port yourself with --conf
"spark.ui.port="

As I understand in standalone mode executors run on workers.

$SPARK_HOME/sbin/start-slave.sh spark://::7077

That port 7077 is the master port. If master dies, then those workers lose
connection to port 7077 so I believe they go stale. So the spark-submit
carries on using the remaining executors on other workers.

So in summary one expects the job to run.  You start your UI on :port.

One test you can do is to exit from UI and start UI on the host that
zookeeper selects the master on the same port. That should work.

HTH






Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 7 September 2016 at 15:27, arlindo santos  wrote:

> Yes refreshed a few times. Running in cluster mode.
>
> Fyi.. I can duplicate this easily now. Our setup consists of 3 nodes
> running standalone spark, master and worker on each, zookeeper doing master
> leader election. If I kill a master on any node, the master shifts to
> another node and that is when the app state changes to waiting and never
> changes back to running on the gui, but really it's in a running mode.
>
> Sent from my BlackBerry 10 smartphone on the Rogers network.
> *From: *Mich Talebzadeh
> *Sent: *Wednesday, September 7, 2016 9:50 AM
> *To: *sarlindo
> *Cc: *user @spark
> *Subject: *Re: spark 1.6.0 web console shows running application in a
> "waiting" status, but it's acutally running
>
> Have you refreshed the Spark UI page?
>
> What Mode are you running your Spark app?
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 6 September 2016 at 16:15, sarlindo  wrote:
>
>> I have 2 questions/issues.
>>
>> 1. We had the spark-master shut down (reason unknown) we looked at the
>> spark-master logs and it simply shows this, is there some other log I
>> should
>> be looking at to find out why the master went down?
>>
>> 16/09/05 21:10:00 INFO ClientCnxn: Opening socket connection to server
>> tiplxapp-spk02.prd.tse.com/142.201.219.76:2181. Will not attempt to
>> authenticate using SASL (unknown error)
>> 16/09/05 21:10:00 ERROR Master: Leadership has been revoked -- master
>> shutting down.
>> 16/09/05 21:10:00 INFO ClientCnxn: Socket connection established,
>> initiating
>> session, client: /142.201.219.75:56361, server:
>> tiplxapp-spk02.prd.tse.com/142.201.219.76:2181
>>
>>
>> 2. Spark 1.6.0 web console shows a running application in a "waiting"
>> status, but it's actually running. Is this an existing bug?
>>
>> 
>>
>>
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/spark-1-6-0-web-console-shows-running-
>> application-in-a-waiting-status-but-it-s-acutally-running-tp27665.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: spark 1.6.0 web console shows running application in a "waiting" status, but it's actually running

2016-09-07 Thread arlindo santos
Yes refreshed a few times. Running in cluster mode.

Fyi.. I can duplicate this easily now. Our setup consists of 3 nodes running 
standalone spark, master and worker on each, zookeeper doing master leader 
election. If I kill a master on any node, the master shifts to another node and 
that is when the app state changes to waiting and never changes back to running 
on the gui, but really it's in a running mode.

Sent from my BlackBerry 10 smartphone on the Rogers network.
From: Mich Talebzadeh
Sent: Wednesday, September 7, 2016 9:50 AM
To: sarlindo
Cc: user @spark
Subject: Re: spark 1.6.0 web console shows running application in a "waiting" 
status, but it's acutally running


Have you refreshed the Spark UI page?

What Mode are you running your Spark app?

HTH


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.



On 6 September 2016 at 16:15, sarlindo 
> wrote:
I have 2 questions/issues.

1. We had the spark-master shut down (reason unknown) we looked at the
spark-master logs and it simply shows this, is there some other log I should
be looking at to find out why the master went down?

16/09/05 21:10:00 INFO ClientCnxn: Opening socket connection to server
tiplxapp-spk02.prd.tse.com/142.201.219.76:2181.
 Will not attempt to
authenticate using SASL (unknown error)
16/09/05 21:10:00 ERROR Master: Leadership has been revoked -- master
shutting down.
16/09/05 21:10:00 INFO ClientCnxn: Socket connection established, initiating
session, client: /142.201.219.75:56361, server:
tiplxapp-spk02.prd.tse.com/142.201.219.76:2181


2. Spark 1.6.0 web console shows a running application in a "waiting"
status, but it's actually running. Is this an existing bug?







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-6-0-web-console-shows-running-application-in-a-waiting-status-but-it-s-acutally-running-tp27665.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org




Re: distribute work (files)

2016-09-07 Thread Peter Figliozzi
That's failing for me.  Can someone please try this-- is this even supposed
to work:

   - create a directory somewhere and add two text files to it
   - mount that directory on the Spark worker machines with sshfs
   - read the text files into one data structure using a file URL with a
   wildcard

Thanks,

Pete

On Tue, Sep 6, 2016 at 11:20 PM, ayan guha  wrote:

> To access local file, try with file:// URI.
>
> On Wed, Sep 7, 2016 at 8:52 AM, Peter Figliozzi 
> wrote:
>
>> This is a great question.  Basically you don't have to worry about the
>> details-- just give a wildcard in your call to textFile.  See the Programming
>> Guide  section
>> entitled "External Datasets".  The Spark framework will distribute your
>> data across the workers.  Note that:
>>
>> *If using a path on the local filesystem, the file must also be
>>> accessible at the same path on worker nodes. Either copy the file to all
>>> workers or use a network-mounted shared file system.*
>>
>>
>> In your case this would mean the directory of files.
>>
>> Curiously, I cannot get this to work when I mount a directory with sshfs
>> on all of my worker nodes.  It says "file not found" even though the file
>> clearly exists in the specified path on all workers.   Anyone care to try
>> and comment on this?
>>
>> Thanks,
>>
>> Pete
>>
>> On Tue, Sep 6, 2016 at 9:51 AM, Lydia Ickler 
>> wrote:
>>
>>> Hi,
>>>
>>> maybe this is a stupid question:
>>>
>>> I have a list of files. Each file I want to take as an input for a
>>> ML-algorithm. All files are independent from another.
>>> My question now is how do I distribute the work so that each worker
>>> takes a block of files and just runs the algorithm on them one by one.
>>> I hope somebody can point me in the right direction! :)
>>>
>>> Best regards,
>>> Lydia
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Dataframe, Java: How to convert String to Vector ?

2016-09-07 Thread Peter Figliozzi
Here's a decent GitHub book: Mastering Apache Spark

.

I'm new at Scala too.  I found it very helpful to study the Scala language
without Spark.  The documentation found here
 is excellent.

Pete

On Wed, Sep 7, 2016 at 1:39 AM, 颜发才(Yan Facai)  wrote:

> Hi Peter,
> I'm familiar with Pandas / Numpy in python,  while spark / scala is
> totally new for me.
> Pandas provides detailed documentation, e.g. how to slice data, parse files,
> and use apply and filter functions.
>
> Does Spark have similarly detailed documentation?
>
>
>
> On Tue, Sep 6, 2016 at 9:58 PM, Peter Figliozzi 
> wrote:
>
>> Hi Yan, I think you'll have to map the features column to a new numerical
>> features column.
>>
>> Here's one way to do the individual transform:
>>
>> scala> val x = "[1, 2, 3, 4, 5]"
>> x: String = [1, 2, 3, 4, 5]
>>
>> scala> val y:Array[Int] = x slice(1, x.length - 1) replace(",", "")
>> split(" ") map(_.toInt)
>> y: Array[Int] = Array(1, 2, 3, 4, 5)
>>
>> If you don't know about the Scala command line, just type "scala" in a
>> terminal window.  It's a good place to try things out.
>>
>> You can make a function out of this transformation and apply it to your
>> features column to make a new column.  Then add this with
>> Dataset.withColumn.
>>
>> See here
>> 
>> on how to apply a function to a Column to make a new column.
>>
>> On Tue, Sep 6, 2016 at 1:56 AM, 颜发才(Yan Facai)  wrote:
>>
>>> Hi,
>>> I have a csv file like:
>>> uid  mid  features   label
>>> 1235231[0, 1, 3, ...]True
>>>
>>> Both  "features" and "label" columns are used for GBTClassifier.
>>>
>>> However, when I read the file:
>>> Dataset samples = sparkSession.read().csv(file);
>>> The type of samples.select("features") is String.
>>>
>>> My question is:
>>> How to map samples.select("features") to Vector or any appropriate type,
>>> so I can use it to train like:
>>> GBTClassifier gbdt = new GBTClassifier()
>>> .setLabelCol("label")
>>> .setFeaturesCol("features")
>>> .setMaxIter(2)
>>> .setMaxDepth(7);
>>>
>>> Thanks.
>>>
>>
>>
>
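
A minimal Scala sketch of the withColumn approach described above, assuming the features strings look like "[0, 1, 3]" and using placeholder file and column names:

    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, udf}

    val spark = SparkSession.builder.getOrCreate()

    // Turn a string such as "[0, 1, 3]" into an ml Vector.
    val parseFeatures = udf { s: String =>
      Vectors.dense(s.stripPrefix("[").stripSuffix("]").split(",").map(_.trim.toDouble))
    }

    val raw = spark.read.option("header", "true").csv("samples.csv")        // placeholder path
    val prepared = raw.withColumn("featuresVec", parseFeatures(col("features")))

The UDF returns an org.apache.spark.ml.linalg.Vector, which Spark SQL stores through its vector UDT, so the new column can then be passed to setFeaturesCol.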


Failed to open native connection to Cassandra at

2016-09-07 Thread muhammet pakyürek
How can I solve the problem below?

py4j.protocol.Py4JJavaError: An error occurred while calling o33.load.
: java.io.IOException: Failed to open native connection to Cassandra at 
{127.0.1.1}:9042
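
One common cause (an assumption here, not a confirmed diagnosis) is that spark.cassandra.connection.host was never set, so the connector falls back to the machine's own hostname entry (often 127.0.1.1 in /etc/hosts on Debian/Ubuntu) instead of a reachable Cassandra node. A minimal Scala sketch with the host set explicitly; the address, keyspace and table names are placeholders, the spark-cassandra-connector package is assumed to be on the classpath, and the same option name can be passed from PySpark with --conf:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder
      .appName("cassandraSketch")
      .config("spark.cassandra.connection.host", "10.0.0.5")   // a reachable Cassandra node (placeholder)
      .getOrCreate()

    val df = spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "my_keyspace", "table" -> "my_table"))   // placeholders
      .load()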




Re: spark 1.6.0 web console shows running application in a "waiting" status, but it's actually running

2016-09-07 Thread Mich Talebzadeh
Have you refreshed the Spark UI page?

What Mode are you running your Spark app?

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 6 September 2016 at 16:15, sarlindo  wrote:

> I have 2 questions/issues.
>
> 1. We had the spark-master shut down (reason unknown) we looked at the
> spark-master logs and it simply shows this, is there some other log I
> should
> be looking at to find out why the master went down?
>
> 16/09/05 21:10:00 INFO ClientCnxn: Opening socket connection to server
> tiplxapp-spk02.prd.tse.com/142.201.219.76:2181. Will not attempt to
> authenticate using SASL (unknown error)
> 16/09/05 21:10:00 ERROR Master: Leadership has been revoked -- master
> shutting down.
> 16/09/05 21:10:00 INFO ClientCnxn: Socket connection established,
> initiating
> session, client: /142.201.219.75:56361, server:
> tiplxapp-spk02.prd.tse.com/142.201.219.76:2181
>
>
> 2. Spark 1.6.0 web console shows a running application in a "waiting"
> status, but it's actually running. Is this an existing bug?
>
> 
>
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/spark-1-6-0-web-console-shows-running-
> application-in-a-waiting-status-but-it-s-acutally-running-tp27665.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: I noticed LinearRegression sometimes produces negative R^2 values

2016-09-07 Thread Evan Zamir
Yes, it's on a hold out segment from the data set being fitted.
On Wed, Sep 7, 2016 at 1:02 AM Sean Owen  wrote:

> Yes, should be.
> It's also not necessarily nonnegative if you evaluate R^2 on a
> different data set than you fit it to. Is that the case?
>
> On Tue, Sep 6, 2016 at 11:15 PM, Evan Zamir  wrote:
> > I am using the default setting for setting fitIntercept, which *should*
> be
> > TRUE right?
> >
> > On Tue, Sep 6, 2016 at 1:38 PM Sean Owen  wrote:
> >>
> >> Are you not fitting an intercept / regressing through the origin? with
> >> that constraint it's no longer true that R^2 is necessarily
> >> nonnegative. It basically means that the errors are even bigger than
> >> what you'd get by predicting the data's mean value as a constant
> >> model.
> >>
> >> On Tue, Sep 6, 2016 at 8:49 PM, evanzamir  wrote:
> >> > Am I misinterpreting what r2() in the LinearRegression Model summary
> >> > means?
> >> > By definition, R^2 should never be a negative number!
> >> >
> >> >
> >> >
> >> > --
> >> > View this message in context:
> >> >
> http://apache-spark-user-list.1001560.n3.nabble.com/I-noticed-LinearRegression-sometimes-produces-negative-R-2-values-tp27667.html
> >> > Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
> >> >
> >> > -
> >> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >> >
>
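
For reference, the definition behind this discussion:

    R^2 = 1 - \frac{\mathrm{SS}_{\mathrm{res}}}{\mathrm{SS}_{\mathrm{tot}}}
        = 1 - \frac{\sum_i (y_i - \hat{y}_i)^2}{\sum_i (y_i - \bar{y})^2}

On the training data, least squares with an intercept guarantees SS_res <= SS_tot, so R^2 >= 0. On a held-out set (or when regressing through the origin) nothing enforces that bound: R^2 goes negative exactly when the model's squared errors exceed those of simply predicting that evaluation set's mean.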


How to find the partitioner for a Dataset

2016-09-07 Thread Darin McBeath
I have a Dataset (om) which I created and repartitioned (and cached) using
one of the fields (docId).  Reading the Spark documentation, I would assume
the om Dataset should be hash partitioned.  But, how can I verify this?

When I do om.rdd.partitioner I get 

Option[org.apache.spark.Partitioner] = None

I thought I would have seen HashPartitioner.  But, perhaps this is not
equivalent.

The reason I ask is that when I use this cached Dataset in a join with
another Dataset (partitioned on the same column and cached) I see things
like the following in my explain which makes me think the Dataset might have
lost the partitioner.  I also see a couple of stages for the job where it
seems like each Dataset in my join is being read in and shuffled out again
(I'm assuming for the hash partitioning required by the join)

Exchange hashpartitioning(_1#6062.docId, 8)

Any thoughts/ideas would be appreciated.

Thanks.

Darin.
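
One way to read this (a sketch, not a definitive answer): Dataset.rdd rebuilds an RDD of your element type from Spark SQL's internal rows, and that conversion does not carry a Partitioner, so rdd.partitioner being None does not by itself mean the hash partitioning was lost; the SQL engine tracks output partitioning in the physical plan instead. A small way to inspect that, with file and column names as placeholders:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder.getOrCreate()

    val om    = spark.read.parquet("om.parquet").repartition(col("docId")).cache()
    val other = spark.read.parquet("other.parquet").repartition(col("docId")).cache()

    om.explain()                         // look for "Exchange hashpartitioning(docId, ...)"
    om.join(other, "docId").explain()    // an Exchange above each join input means a re-shuffle is planned

    println(om.rdd.getNumPartitions)     // the partition count survives the .rdd conversion; a Partitioner does not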



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-find-the-partitioner-for-a-Dataset-tp27672.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



No SparkR on Mesos?

2016-09-07 Thread Peter Griessl
Hello,

does SparkR really not work (yet?) on Mesos (Spark 2.0 on Mesos 1.0)?

$ /opt/spark/bin/sparkR

R version 3.3.1 (2016-06-21) -- "Bug in Your Hair"
Copyright (C) 2016 The R Foundation for Statistical Computing
Platform: x86_64-pc-linux-gnu (64-bit)
Launching java with spark-submit command /opt/spark/bin/spark-submit   
"sparkr-shell" /tmp/RtmpPYVJxF/backend_port338581f434
Error: SparkR is not supported for Mesos cluster.
Error in sparkR.sparkContext(master, appName, sparkHome, sparkConfigMap,  :
  JVM is not ready after 10 seconds


I couldn't find any information on this subject in the docs - am I missing 
something?

Thanks for any hints,
Peter


dstream.foreachRDD iteration

2016-09-07 Thread Ashok Kumar
Hi,
This is a bit confusing to me.
How many layers are involved in DStream.foreachRDD?
Do I need to loop over it more than once? I mean DStream.foreachRDD { rdd => ... }
I am trying to get the individual lines in the RDD.
Thanks
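
There is only one level to loop over: foreachRDD hands you one RDD per batch interval, and inside it ordinary RDD operations reach the individual lines. A minimal sketch, where dstream stands for your DStream[String] and processLine is a placeholder for the real per-record work:

    def processLine(line: String): Unit = println(line)   // placeholder for the real per-record work

    dstream.foreachRDD { rdd =>
      // Called once per batch; rdd holds that batch's records.
      // Small test batches: bring everything back to the driver.
      rdd.collect().foreach(processLine)
      // Larger batches: keep the per-line work on the executors instead.
      // rdd.foreachPartition(_.foreach(processLine))
    }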

Re: call() function being called 3 times

2016-09-07 Thread Kevin Tran
It turns out that the call() function runs in different stages:

...
2016-09-07 20:37:21,086 [Executor task launch worker-0] INFO
 org.apache.spark.executor.Executor - Running task 0.0 in stage 11.0 (TID
11)
2016-09-07 20:37:21,087 [Executor task launch worker-0] DEBUG
org.apache.spark.executor.Executor - Task 11's epoch is 0
...
2016-09-07 20:37:21,096 [Executor task launch worker-0] INFO
 org.apache.spark.executor.Executor - Finished task 0.0 in stage 11.0 (TID
11). 2412 bytes result sent to driver
...
<=== call() called here !!

2016-09-07 20:37:22,341 [Executor task launch worker-0] INFO
 org.apache.spark.executor.Executor - Running task 0.0 in stage 12.0 (TID
12)
2016-09-07 20:37:22,343 [Executor task launch worker-0] DEBUG
org.apache.spark.executor.Executor - Task 12's epoch is 0

<=== call() called here !!

2016-09-07 20:37:22,362 [Executor task launch worker-0] INFO
 org.apache.spark.executor.Executor - Finished task 0.0 in stage 12.0 (TID
12). 2518 bytes result sent to driver


Does anyone have any ideas?




On Wed, Sep 7, 2016 at 7:30 PM, Kevin Tran  wrote:

> Hi Everyone,
> Does anyone know why call() function being called *3 times* for each
> message arrive
>
> JavaDStream message = messagesDStream.map(new
>>> Function, String>() {
>>
>> @Override
>>
>> public String call(Tuple2 tuple2) {
>>
>> return tuple2._2();
>>
>> }
>>
>> });
>>
>>
>>>
>>
>> message.foreachRDD(rdd -> {
>>
>> logger.debug("---> New RDD with " + rdd.partitions().size() + "
>>> partitions and " + rdd.count() + " records");   *<== 1*
>>
>> SQLContext sqlContext = new SQLContext(rdd.context());
>>
>>
>>> JavaRDD rowRDD = rdd.map(new Function() {
>>
>> public JavaBean call(String record) {
>>>   *<== being called 3 times*
>>
>>
>
> What I tried:
>  * *cache()*
>  * cleaning up *checkpoint dir*
>
> Thanks,
> Kevin.
>
>
>
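
One thing worth ruling out (an assumption, not a confirmed diagnosis): if the RDD produced by that map feeds more than one action inside foreachRDD, for example the count() in the log line plus whatever the DataFrame build and write trigger, then each action re-evaluates the lineage and re-runs call(). Persisting the mapped RDD itself, rather than the upstream DStream, would confirm or rule that out. A Scala-style sketch, with parseRecord standing in for the JavaBean mapping:

    messages.foreachRDD { rdd =>
      val beans = rdd.map(parseRecord)   // parseRecord: String => JavaBean (placeholder)
      beans.cache()                      // evaluated once, reused by every action below
      val n = beans.count()              // first action
      // ... build the DataFrame from `beans` and write it out (further actions) ...
      beans.unpersist()
    }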


Re[10]: Spark 2.0: SQL runs 5x times slower when adding 29th field to aggregation.

2016-09-07 Thread Сергей Романов

Thank you, Yong, it looks great.

I added the following lines to spark-defaults.conf and now my original SQL 
query runs much faster:
spark.executor.extraJavaOptions -XX:-DontCompileHugeMethods
spark.driver.extraJavaOptions -XX:-DontCompileHugeMethods
Can you recommend these configuration settings for production mode? Will it 
have any side-effects? Will it supersede  SPARK-17115?
SQL:
SELECT `publisher_id` AS `publisher_id`, SUM(`conversions`) AS `conversions`, 
SUM(`dmp_rapleaf_margin`) AS `dmp_rapleaf_margin`, SUM(`pvc`) AS `pvc`, 
SUM(`dmp_nielsen_payout`) AS `dmp_nielsen_payout`, SUM(`fraud_clicks`) AS 
`fraud_clicks`, SUM(`impressions`) AS `impressions`, SUM(`conv_prob`) AS 
`conv_prob`, SUM(`dmp_liveramp_payout`) AS `dmp_liveramp_payout`, 
SUM(`decisions`) AS `decisions`, SUM(`fraud_impressions`) AS 
`fraud_impressions`, SUM(`advertiser_spent`) AS `advertiser_spent`, 
SUM(`actual_ssp_fee`) AS `actual_ssp_fee`, SUM(`dmp_nielsen_margin`) AS 
`dmp_nielsen_margin`, SUM(`first_impressions`) AS `first_impressions`, 
SUM(`clicks`) AS `clicks`, SUM(`second_price`) AS `second_price`, 
SUM(`click_prob`) AS `click_prob`, SUM(`clicks_static`) AS `clicks_static`, 
SUM(`expected_payout`) AS `expected_payout`, SUM(`bid_price`) AS `bid_price`, 
SUM(`noads`) AS `noads`, SUM(`e`) AS `e`, SUM(`e`) as `e2`, 
SUM(`publisher_revenue`) AS `publisher_revenue`, SUM(`dmp_liveramp_margin`) AS 
`dmp_liveramp_margin`, SUM(`actual_pgm_fee`) AS `actual_pgm_fee`, 
SUM(`dmp_rapleaf_payout`) AS `dmp_rapleaf_payout`, SUM(`dd_convs`) AS 
`dd_convs`, SUM(`actual_dsp_fee`) AS `actual_dsp_fee` FROM 
`slicer`.`573_slicer_rnd_13` WHERE dt = '2016-07-28' GROUP BY `publisher_id` 
LIMIT 30;
Original:
30 rows selected (10.047 seconds)
30 rows selected (10.612 seconds)
30 rows selected (9.935 seconds)
With -XX:-DontCompileHugeMethods:
30 rows selected (1.086 seconds)
30 rows selected (1.051 seconds)
30 rows selected (1.073 seconds)

>Среда,  7 сентября 2016, 0:35 +03:00 от Yong Zhang :
>
>This is an interesting point.
>
>I tested with originally data with Spark 2.0 release, I can get the same 
>statistic output in the originally email like following:
>
>50 1.77695393562
>51 0.695149898529
>52 0.638142108917
>53 0.647341966629
>54 0.663456916809
>55 0.629166126251
>56 0.644149065018
>57 0.661190986633
>58 2.6616499424
>59 2.6137509346
>60 2.71165704727
>61 2.63473916054
>
>Then I tested with your suggestion:
>
>spark/bin/pyspark --driver-java-options '-XX:-DontCompileHugeMethods'
>
>Run the same test code, and here is the output:
>
>50 1.77180695534
>51 0.679394006729
>52 0.629493951797
>53 0.62108206749
>54 0.637018918991
>55 0.640591144562
>56 0.649922132492
>57 0.652480125427
>58 0.636356830597
>59 0.667215824127
>60 0.643863916397
>61 0.669810056686
>62 0.664624929428
>63 0.682888031006
>64 0.691393136978
>65 0.690823078156
>66 0.70525097847
>67 0.724694013596
>68 0.737638950348
>69 0.749594926834
>
>
>Yong
>
>--
>From: Davies Liu < dav...@databricks.com >
>Sent: Tuesday, September 6, 2016 2:27 PM
>To: Сергей Романов
>Cc: Gavin Yue; Mich Talebzadeh; user
>Subject: Re: Re[8]: Spark 2.0: SQL runs 5x times slower when adding 29th field 
>to aggregation.
> 
>I think the slowness is caused by generated aggregate method has more
>than 8K bytecodes, than it's not JIT compiled, became much slower.
>
>Could you try to disable the DontCompileHugeMethods by:
>
>-XX:-DontCompileHugeMethods
>
>On Mon, Sep 5, 2016 at 4:21 AM, Сергей Романов
>< romano...@inbox.ru.invalid > wrote:
>> Hi, Gavin,
>>
>> Shuffling is exactly the same in both requests and is minimal. Both requests
>> produces one shuffle task. Running time is the only difference I can see in
>> metrics:
>>
>> timeit.timeit(spark.read.csv('file:///data/dump/test_csv',
>> schema=schema).groupBy().sum(*(['dd_convs'] * 57) ).collect, number=1)
>> 0.713730096817
>>  {
>> "id" : 368,
>> "name" : "duration total (min, med, max)",
>> "value" : "524"
>>   }, {
>> "id" : 375,
>> "name" : "internal.metrics.executorRunTime",
>> "value" : "527"
>>   }, {
>> "id" : 391,
>> "name" : "internal.metrics.shuffle.write.writeTime",
>> "value" : "244495"
>>   }
>>
>> timeit.timeit(spark.read.csv('file:///data/dump/test_csv',
>> schema=schema).groupBy().sum(*(['dd_convs'] * 58) ).collect, number=1)
>> 2.97951102257
>>
>>   }, {
>> "id" : 469,
>> "name" : "duration total (min, med, max)",
>> "value" : "2654"
>>   }, {
>> "id" : 476,
>> "name" : "internal.metrics.executorRunTime",
>> "value" : "2661"
>>   }, {
>> "id" : 492,
>> "name" : "internal.metrics.shuffle.write.writeTime",
>> "value" : "371883"
>>   }, {
>>
>> Full metrics in attachment.
>>
>> Суббота, 3 сентября 2016, 19:53 +03:00 от Gavin Yue
>> < yue.yuany...@gmail.com >:
>>
>>
>> Any shuffling?
>>
>>
>> On Sep 3, 2016, at 5:50 AM, Сергей Романов < romano...@inbox.ru.INVALID >
>> wrote:
>>
>> Same 

Mesos coarse-grained problem with spark.shuffle.service.enabled

2016-09-07 Thread Tamas Szuromi
Hello,

For a while we've been using Spark on Mesos with fine-grained mode in
production.
Since Spark 2.0 the fine-grained mode is deprecated, so we'd like to shift to
dynamic allocation.

When I tried to setup the dynamic allocation I run into the following
problem:
So I set spark.shuffle.service.enabled = true
and spark.dynamicAllocation.enabled = true as the documentation said. We're
using Spark on Mesos with spark.executor.uri where we download the
pipeline's corresponding Spark version from HDFS. The documentation also
says In Mesos coarse-grained mode, run
$SPARK_HOME/sbin/start-mesos-shuffle-service.sh on all slave nodes. But how
is it possible to launch it before start the application, if the given
Spark will be downloaded to the Mesos executor after executor launch but
it's looking for the started external shuffle service in advance?

Is it possible I can't use spark.executor.uri and
spark.dynamicAllocation.enabled together?

Thanks in advance!

Tamas


Re: I noticed LinearRegression sometimes produces negative R^2 values

2016-09-07 Thread Sean Owen
Yes, should be.
It's also not necessarily nonnegative if you evaluate R^2 on a
different data set than you fit it to. Is that the case?

On Tue, Sep 6, 2016 at 11:15 PM, Evan Zamir  wrote:
> I am using the default setting for setting fitIntercept, which *should* be
> TRUE right?
>
> On Tue, Sep 6, 2016 at 1:38 PM Sean Owen  wrote:
>>
>> Are you not fitting an intercept / regressing through the origin? with
>> that constraint it's no longer true that R^2 is necessarily
>> nonnegative. It basically means that the errors are even bigger than
>> what you'd get by predicting the data's mean value as a constant
>> model.
>>
>> On Tue, Sep 6, 2016 at 8:49 PM, evanzamir  wrote:
>> > Am I misinterpreting what r2() in the LinearRegression Model summary
>> > means?
>> > By definition, R^2 should never be a negative number!
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> > http://apache-spark-user-list.1001560.n3.nabble.com/I-noticed-LinearRegression-sometimes-produces-negative-R-2-values-tp27667.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: LabeledPoint creation

2016-09-07 Thread Madabhattula Rajesh Kumar
Hi,

Any help on the above use case?

Regards,
Rajesh

On Tue, Sep 6, 2016 at 5:40 PM, Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi,
>
> I am new to Spark ML, trying to create a LabeledPoint from categorical
> dataset(example code from spark). For this, I am using One-hot encoding
>  feature. Below is my code
>
> val df = sparkSession.createDataFrame(Seq(
>   (0, "a"),
>   (1, "b"),
>   (2, "c"),
>   (3, "a"),
>   (4, "a"),
>   (5, "c"),
>   (6, "d"))).toDF("id", "category")
>
> val indexer = new StringIndexer()
>   .setInputCol("category")
>   .setOutputCol("categoryIndex")
>   .fit(df)
>
> val indexed = indexer.transform(df)
>
> indexed.select("category", "categoryIndex").show()
>
> val encoder = new OneHotEncoder()
>   .setInputCol("categoryIndex")
>   .setOutputCol("categoryVec")
> val encoded = encoder.transform(indexed)
>
>  encoded.select("id", "category", "categoryVec").show()
>
> *Output :- *
> +---+--------+-------------+
> | id|category|  categoryVec|
> +---+--------+-------------+
> |  0|       a|(3,[0],[1.0])|
> |  1|       b|    (3,[],[])|
> |  2|       c|(3,[1],[1.0])|
> |  3|       a|(3,[0],[1.0])|
> |  4|       a|(3,[0],[1.0])|
> |  5|       c|(3,[1],[1.0])|
> |  6|       d|(3,[2],[1.0])|
> +---+--------+-------------+
>
> *Creating LabeledPoint from encoded dataframe:-*
>
> val data = encoded.rdd.map { x =>
>   {
> val featureVector = Vectors.dense(x.getAs[org.
> apache.spark.ml.linalg.SparseVector]("categoryVec").toArray)
> val label = x.getAs[java.lang.Integer]("id").toDouble
> LabeledPoint(label, featureVector)
>   }
> }
>
> data.foreach { x => println(x) }
>
> *Output :-*
>
> (0.0,[1.0,0.0,0.0])
> (1.0,[0.0,0.0,0.0])
> (2.0,[0.0,1.0,0.0])
> (3.0,[1.0,0.0,0.0])
> (4.0,[1.0,0.0,0.0])
> (5.0,[0.0,1.0,0.0])
> (6.0,[0.0,0.0,1.0])
>
> I have four categorical values: a, b, c, d. I am expecting 4
> features in the above LabeledPoint but it has only 3 features.
>
> Please help me with the creation of a LabeledPoint from categorical features.
>
> Regards,
> Rajesh
>
>
>
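
The 3-wide vectors come from OneHotEncoder's dropLast parameter, which defaults to true and omits the highest category index (that is why one of the four categories, "b" above, encodes as an all-zero vector). Setting it to false keeps one slot per category; a sketch continuing the snippet above:

    import org.apache.spark.ml.feature.OneHotEncoder

    val encoder = new OneHotEncoder()
      .setInputCol("categoryIndex")
      .setOutputCol("categoryVec")
      .setDropLast(false)         // keep a slot for every category: a, b, c, d -> 4-dimensional vectors

    val encoded = encoder.transform(indexed)   // `indexed` is the StringIndexer output from the snippet above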


Re: Getting figures from spark streaming

2016-09-07 Thread Ashok Kumar
Any help on this warmly appreciated. 

On Tuesday, 6 September 2016, 21:31, Ashok Kumar 
 wrote:
 

 Hello Gurus,
I am creating some figures and feed them into Kafka and then spark streaming.
It works OK but I have the following issue.
For now as a test I sent 5 prices in each batch interval. In the loop code this
is what is happening:

      dstream.foreachRDD { rdd =>
        val x = rdd.count
        i += 1
        println(s"> rdd loop i is ${i}, number of lines is ${x} <==")
        if (x > 0) {
          println(s"processing ${x} records=")
          var words1 = rdd.map(_._2).map(_.split(',').view(0)).map(_.toInt).collect.apply(0)
          println(words1)
          var words2 = rdd.map(_._2).map(_.split(',').view(1)).map(_.toString).collect.apply(0)
          println(words2)
          var price = rdd.map(_._2).map(_.split(',').view(2)).map(_.toFloat).collect.apply(0)
          println(price)
          rdd.collect.foreach(println)
        }
      }

My tuple looks like this:

// (null, "ID       TIMESTAMP                           PRICE")
// (null, "40,20160426-080924,                  67.55738301621814598514")

And this is the sample output from the run:

processing 5 records=
3
20160906-212509
80.224686
(null,3,20160906-212509,80.22468448052631637099)
(null,1,20160906-212509,60.40695324215582386153)
(null,4,20160906-212509,61.95159400693415572125)
(null,2,20160906-212509,93.05912099305473237788)
(null,5,20160906-212509,81.08637370113427387121)

Now it does process the first values 3, 20160906-212509, 80.224686 for the record
(null,3,20160906-212509,80.22468448052631637099) but ignores the rest of the 4
records. How can I make it go through all records here? I want the third column
from all records!
Greetings
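
The collect.apply(0) calls only ever read the first element of each collected array, which is why only one record's columns appear. A sketch of pulling the third column from every record in the batch (field positions assumed from the sample tuple above):

    dstream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // _._2 is the "id,timestamp,price" payload; take the third field from every record.
        val prices = rdd.map(_._2.split(',')(2).trim.toFloat).collect()
        prices.foreach(println)
      }
    }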




   

call() function being called 3 times

2016-09-07 Thread Kevin Tran
Hi Everyone,
Does anyone know why call() function being called *3 times* for each
message arrive

JavaDStream message = messagesDStream.map(new
>> Function, String>() {
>
> @Override
>
> public String call(Tuple2 tuple2) {
>
> return tuple2._2();
>
> }
>
> });
>
>
>>
>
> message.foreachRDD(rdd -> {
>
> logger.debug("---> New RDD with " + rdd.partitions().size() + " partitions
>> and " + rdd.count() + " records");   *<== 1*
>
> SQLContext sqlContext = new SQLContext(rdd.context());
>
>
>> JavaRDD rowRDD = rdd.map(new Function() {
>
> public JavaBean call(String record) {
>>   *<== being called 3 times*
>
>

What I tried:
 * *cache()*
 * cleaning up *checkpoint dir*

Thanks,
Kevin.


SparkStreaming is not working with SparkLauncher

2016-09-07 Thread aditya barve
Hello Team,

I am new to spark. I tried to create a sample application and submitted to
spark.

D:\spark-1.6.2-bin-hadoop2.6\bin\spark-submit --class "main.java.SimpleApp"
--master local[4] target/simple-project-1.0-jar-with-dependencies.jar
server1 1234

It worked fine. Here server1 is my netcat server and 1234 is the port. I am
able to read data from the netcat server and process it further.

Now I want to check the deployment part of Spark. I can't submit the jar from
the command line every time. After a little research I found the SparkLauncher
class. I tried the following code.


  public class AppLauncher {
public static void main(String[] args) throws Exception {
Process spark = new SparkLauncher()

.setAppResource("D:\\aditya_barve\\SimpleApp\\target\\simple-project-1.0-jar-with-dependencies.jar")
.setSparkHome("D:\\spark-1.6.2-bin-hadoop2.6")
.setMainClass("main.java.SimpleApp")
.setMaster("local")
.setConf(SparkLauncher.DRIVER_MEMORY, "2g")
.addAppArgs("server1","1234")
.launch();

  System.out.println("Waiting for finish...");
  int exitCode = spark.waitFor();
  System.out.println("Finished! Exit code:" + exitCode);
  // Use handle API to monitor / control application.
}


}

I am able to start my application successfully but my application is not
listening to streaming data from the netcat server. It works perfectly when I
manually submit the jar file from the command line.

It looks like I am missing something.

 Any help is highly appreciated.

Thanks,
--
Aditya
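
One difference worth checking (an assumption, not a confirmed diagnosis): the working spark-submit used --master local[4], while the launcher sets local, i.e. a single thread. A receiver-based source such as a socket stream occupies one thread permanently, so with a single local thread nothing is left to process the batches. A sketch of the same launch with that changed (shown in Scala; the SparkLauncher calls are the same ones used in the Java snippet above, and the paths are taken from it):

    import org.apache.spark.launcher.SparkLauncher

    val spark = new SparkLauncher()
      .setAppResource("D:\\aditya_barve\\SimpleApp\\target\\simple-project-1.0-jar-with-dependencies.jar")
      .setSparkHome("D:\\spark-1.6.2-bin-hadoop2.6")
      .setMainClass("main.java.SimpleApp")
      .setMaster("local[4]")            // at least two threads: one for the socket receiver, one for processing
      .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
      .addAppArgs("server1", "1234")
      .launch()

    println("Waiting for finish...")
    val exitCode = spark.waitFor()
    println(s"Finished! Exit code: $exitCode")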


How to write data into CouchBase using Spark & Scala?

2016-09-07 Thread Devi P.V
I am newbie in CouchBase.I am trying to write data into CouchBase.My sample
code is following,

val cfg = new SparkConf()
  .setAppName("couchbaseQuickstart")
  .setMaster("local[*]")
  .set("com.couchbase.bucket.MyBucket","pwd")

val sc = new SparkContext(cfg)
val doc1 = JsonDocument.create("doc1", JsonObject.create().put("some",
"content"))
val doc2 = JsonArrayDocument.create("doc2", JsonArray.from("more",
"content", "in", "here"))

val data = sc
  .parallelize(Seq(doc1, doc2))

But I can't access data.saveToCouchbase().

I am using Spark 1.6.1 & Scala 2.11.8

I gave following dependencies in built.sbt

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.6.1"
libraryDependencies += "com.couchbase.client" % "spark-connector_2.11" % "1.2.1"


How can I write data into CouchBase using Spark & Scala?


Re: Dataframe, Java: How to convert String to Vector ?

2016-09-07 Thread Yan Facai
Hi Peter,
I'm familiar with Pandas / Numpy in python,  while spark / scala is totally
new for me.
Pandas provides detailed documentation, e.g. how to slice data, parse files,
and use apply and filter functions.

Does Spark have similarly detailed documentation?



On Tue, Sep 6, 2016 at 9:58 PM, Peter Figliozzi 
wrote:

> Hi Yan, I think you'll have to map the features column to a new numerical
> features column.
>
> Here's one way to do the individual transform:
>
> scala> val x = "[1, 2, 3, 4, 5]"
> x: String = [1, 2, 3, 4, 5]
>
> scala> val y:Array[Int] = x slice(1, x.length - 1) replace(",", "")
> split(" ") map(_.toInt)
> y: Array[Int] = Array(1, 2, 3, 4, 5)
>
> If you don't know about the Scala command line, just type "scala" in a
> terminal window.  It's a good place to try things out.
>
> You can make a function out of this transformation and apply it to your
> features column to make a new column.  Then add this with
> Dataset.withColumn.
>
> See here
> 
> on how to apply a function to a Column to make a new column.
>
> On Tue, Sep 6, 2016 at 1:56 AM, 颜发才(Yan Facai)  wrote:
>
>> Hi,
>> I have a csv file like:
>> uid  mid  features   label
>> 1235231[0, 1, 3, ...]True
>>
>> Both  "features" and "label" columns are used for GBTClassifier.
>>
>> However, when I read the file:
>> Dataset samples = sparkSession.read().csv(file);
>> The type of samples.select("features") is String.
>>
>> My question is:
>> How to map samples.select("features") to Vector or any appropriate type,
>> so I can use it to train like:
>> GBTClassifier gbdt = new GBTClassifier()
>> .setLabelCol("label")
>> .setFeaturesCol("features")
>> .setMaxIter(2)
>> .setMaxDepth(7);
>>
>> Thanks.
>>
>
>


Re: Spark Metrics: custom source/sink configurations not getting recognized

2016-09-07 Thread Benjamin Kim
We use Graphite/Grafana for custom metrics. We found Spark’s metrics not to be 
customizable. So, we write directly using Graphite’s API, which was very easy 
to do using Java’s socket library in Scala. It works great for us, and we are 
going one step further using Sensu to alert us if there is an anomaly in the 
metrics beyond the norm.

Hope this helps.

Cheers,
Ben


> On Sep 6, 2016, at 9:52 PM, map reduced  wrote:
> 
> Hi, anyone has any ideas please?
> 
> On Mon, Sep 5, 2016 at 8:30 PM, map reduced  > wrote:
> Hi,
> 
> I've written my custom metrics source/sink for my Spark streaming app and I 
> am trying to initialize it from metrics.properties - but that doesn't work 
> from executors. I don't have control on the machines in Spark cluster, so I 
> can't copy properties file in $SPARK_HOME/conf/ in the cluster. I have it in 
> the fat jar where my app lives, but by the time my fat jar is downloaded on 
> worker nodes in cluster, executors are already started and their Metrics 
> system is already initialized - thus not picking my file with custom source 
> configuration in it.
> 
> Following this post 
> ,
>  I've specified 'spark.files 
>  = 
> metrics.properties' and 'spark.metrics.conf=metrics.properties' but by the 
> time 'metrics.properties' is shipped to executors, their metric system is 
> already initialized.
> 
> If I initialize my own metrics system, it's picking up my file but then I'm 
> missing master/executor level metrics/properties (eg. 
> executor.sink.mySink.propName=myProp - can't read 'propName' from 'mySink') 
> since they are initialized 
> 
>  by Spark's metric system.
> 
> Is there a (programmatic) way to have 'metrics.properties' shipped before 
> executors initialize 
> 
>  ?
> 
> Here's my SO question 
> .
> 
> Thanks,
> 
> KP
> 
>
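
For reference, a minimal sketch of the direct-socket approach described above, using Graphite's plaintext protocol (one line per sample: "metric.path value unix_timestamp"); the host, port and metric names are placeholders:

    import java.io.PrintWriter
    import java.net.Socket

    object GraphiteSink {
      // Writes one sample in Graphite's plaintext line format.
      def send(host: String, port: Int, metric: String, value: Double): Unit = {
        val socket = new Socket(host, port)
        try {
          val out = new PrintWriter(socket.getOutputStream)
          out.print(s"$metric $value ${System.currentTimeMillis() / 1000}\n")
          out.flush()
        } finally {
          socket.close()
        }
      }
    }

    // e.g. GraphiteSink.send("graphite.example.internal", 2003, "myapp.streaming.recordsPerBatch", 1234.0)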