Help connecting to the cluster

2014-03-07 Thread Yana Kadiyska
Hi Spark users,

Could someone help me out?

My company has a fully functioning Spark cluster with Shark running on
top of it (as part of the same cluster, on the same LAN). I'm
interested in running raw Spark code against it but am running into
the following issue -- it seems that the machine hosting the driver
program needs to be reachable by the worker nodes (in my case the
workers cannot route to the machine hosting the driver). Below is a
snippet from my worker log:

14/03/03 20:45:28 INFO executor.StandaloneExecutorBackend: Connecting
to driver: akka://spark@driver_ip:49081/user/StandaloneScheduler
14/03/03 20:45:29 ERROR executor.StandaloneExecutorBackend: Driver
terminated or disconnected! Shutting down.

Does this sound right -- it's not clear to me why a worker would try
to establish a connection to the driver -- the driver already
connected successfully, as I see the program listed in the log... why
is this connection not sufficient?
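
A minimal sketch of the driver-side setup I have in mind (host/IP and port
values are placeholders; the property names are the ones documented for
0.9.x -- on 0.8.x the same keys would go through System.setProperty before
the context is created):

import org.apache.spark.{SparkConf, SparkContext}

// Pin the address and port the executors connect back to, so a single
// firewall rule toward the driver machine is enough.
val conf = new SparkConf()
  .setMaster("spark://cluster-master:7077")   // placeholder: your master URL
  .setAppName("driver-reachability-test")
  .set("spark.driver.host", "10.0.0.5")       // placeholder: an IP the workers can route to
  .set("spark.driver.port", "51000")          // placeholder: fixed port to open in the firewall
val sc = new SparkContext(conf)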

If you use Amazon EC2, can you run the driver from your personal
machine, or do you have to install an IDE on one of the Amazon machines
in order to debug code? I am not too excited about the EC2 option as our
data is proprietary... but if that's the shortest path to success, at
least it would get me started on some toy examples. At the moment I'm
not sure what my options are, other than running a VM cluster or EC2.

Any help/insight would be greatly appreciated.


Re: Spark stand alone cluster mode

2014-03-11 Thread Yana Kadiyska
Does "sbt show full-classpath" show spark-core on the classpath? I am
still pretty new to Scala, but it seems like you have val sparkCore
= "org.apache.spark" %% "spark-core" % V.spark % "provided" -- I believe
the "provided" scope means sbt assumes the dependency is already on your
runtime classpath, so it is not bundled for sbt run. The spark-shell
script sets up a lot of that for you, but plain sbt run does not...
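
For reference, a minimal sbt build file sketch without the "provided" scope,
so that plain sbt run can resolve org.apache.spark.SparkContext (the name and
versions below mirror the thread and are otherwise assumptions):

name := "spark-example-project"

version := "0.1"

scalaVersion := "2.10.3"

// keep spark-core on the runtime classpath for sbt run (no "provided")
libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating"

// only needed when reading/writing HDFS with Hadoop 2.2.0 (per the setup below)
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.2.0"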

On Tue, Mar 11, 2014 at 9:02 AM, Gino Mathews gin...@thinkpalm.com wrote:
 Hi,

 I am new to Spark.
 I would like to run jobs in Spark standalone cluster mode.

 No cluster managers other than Spark are used.
 (https://spark.apache.org/docs/0.9.0/spark-standalone.html)
 I have tried wordcount from spark-shell and from a standalone Scala app.

 The code reads input from HDFS and writes the results to HDFS, using 2 worker
 nodes.

 In spark-shell the wordcount is successful; however, my efforts to run
 standalone programs are in vain.



 My environment

 Ubuntu 12.04  - 32 bit

 JAVA 1.7.0_51

 I have installed spark @ $HOME/Downloads/spark-0.9.0-incubating
 installed Hadoop 2.2.0 as a separate hduser and given permissions to other
 users.
 installed scala 2.10.3
 installed sbt 0.13.1

 The Spark master acts as the HDFS master.
 I have one master and 2 worker nodes and HDFS is accessible in all nodes.
 I downloaded an example project and modified it to use my Spark cluster.
 I started the sparkcluster at spark://192.168.0.138:7077
 and hdfs://master:9000/
 When I run the project with SPARK_HADOOP_VERSION=2.2.0 sbt run, I get the
 following error:

 gino@master:~/Test/spark-example-project$ SPARK_HADOOP_VERSION=2.2.0 sbt run
 [info] Loading project definition from
 /home/gino/Test/spark-example-project/project
 [info] Set current project to spark-example-project (in build
 file:/home/gino/Test/spark-example-project/)
 [info] Running com.Thinkpalm.spark.WordCountHDFS
 [error] (run-main-0) java.lang.NoClassDefFoundError:
 org/apache/spark/SparkContext
 java.lang.NoClassDefFoundError: org/apache/spark/SparkContext
 at com.Thinkpalm.spark.WordCountHDFS$.main(WordCountHDFS.scala:12)
 at com.Thinkpalm.spark.WordCountHDFS.main(WordCountHDFS.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 Caused by: java.lang.ClassNotFoundException: org.apache.spark.SparkContext
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at com.Thinkpalm.spark.WordCountHDFS$.main(WordCountHDFS.scala:12)
 at com.Thinkpalm.spark.WordCountHDFS.main(WordCountHDFS.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 [trace] Stack trace suppressed: run last compile:run for the full output.
 java.lang.RuntimeException: Nonzero exit code: 1
 at scala.sys.package$.error(package.scala:27)
 [trace] Stack trace suppressed: run last compile:run for the full output.
 [error] (compile:run) Nonzero exit code: 1
 [error] Total time: 0 s, completed Mar 11, 2014 2:54:54 PM

 Could anyone give some pointers ... I have attached the project for
 reference.



 Thanks and regards

 Gino Mathews


Re: quick start guide: building a standalone scala program

2014-03-24 Thread Yana Kadiyska
I am able to run standalone apps. I think you are making one mistake
that throws you off from there onwards: you don't need to put your app
under SPARK_HOME. I would create it in its own folder somewhere; it
follows the rules of any standalone Scala program (including the
layout). In the guide, $SPARK_HOME is only relevant for finding the README
file which they are parsing/word-counting. Otherwise the compile-time
dependencies on Spark are resolved via the sbt file (or the
pom file if you look at the Java example).

So, for example, I put my app under C:\Source\spark-code and the jar
gets created in C:\Source\spark-code\target\scala-2.9.3 (or 2.10 if
you're running with Scala 2.10 as the example shows). But for that
part of the guide, it's no different from building any Scala app.
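
For completeness, here is roughly what the standalone app from the guide looks
like when kept outside SPARK_HOME (a sketch only -- the file path and the
master string are placeholders you would adjust):

/* mysparktest/src/main/scala/SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/path/to/spark/README.md"  // placeholder: any text file
    val sc = new SparkContext("local", "Simple App")
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}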

On Mon, Mar 24, 2014 at 3:44 PM, Diana Carroll dcarr...@cloudera.com wrote:
 Has anyone successfully followed the instructions on the Quick Start page of
 the Spark home page to run a standalone Scala application?  I can't, and I
 figure I must be missing something obvious!

 I'm trying to follow the instructions here as close to word for word as
 possible:
 http://spark.apache.org/docs/latest/quick-start.html#a-standalone-app-in-scala

 1.  The instructions don't say what directory to create my test application
 in, but later I'm instructed to run sbt/sbt so I conclude that my working
 directory must be $SPARK_HOME.  (Temporarily ignoring that it is a little
 weird to be working directly in the Spark distro.)

 2.  Create $SPARK_HOME/mysparktest/src/main/scala/SimpleApp.scala.
 Copy & paste in the code from the instructions exactly, replacing
 YOUR_SPARK_HOME with my spark home path.

 3.  Create $SPARK_HOME/mysparktest/simple.sbt.  Copy & paste in the sbt file
 from the instructions

 4.  From the $SPARK_HOME I run sbt/sbt package.  It runs through the
 ENTIRE Spark project!  This takes several minutes, and at the end, it says
 Done packaging.  unfortunately, there's nothing in the
 $SPARK_HOME/mysparktest/ folder other than what I already had there.

 (Just for fun, I also did what I thought was more logical, which was to set my
 working directory to $SPARK_HOME/mysparktest, and run $SPARK_HOME/sbt/sbt
 package, but that was even less successful: I got an error:
 awk: cmd. line:1: fatal: cannot open file `./project/build.properties' for
 reading (No such file or directory)
 Attempting to fetch sbt
 /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or
 directory
 /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or
 directory
 Our attempt to download sbt locally to sbt/sbt-launch-.jar failed. Please
 install sbt manually from http://www.scala-sbt.org/


 So, help?  I'm sure these instructions work because people are following
 them every day, but I can't tell what they are supposed to do.

 Thanks!
 Diana



Re: quick start guide: building a standalone scala program

2014-03-24 Thread Yana Kadiyska
Diana, I think you are correct - I just installed
 wget 
http://mirror.symnds.com/software/Apache/incubator/spark/spark-0.9.0-incubating/spark-0.9.0-incubating-bin-cdh4.tgz
and indeed I see the same error that you see

It looks like in previous versions sbt-launch used to just come down
in the package, but now they try to fetch it for you -- and that code
seems to make some assumptions about where it is being invoked from.

On Mon, Mar 24, 2014 at 5:47 PM, Diana Carroll dcarr...@cloudera.com wrote:
 Thanks for your help, everyone.  Several folks have explained that I can
 surely solve the problem by installing sbt.

 But I'm trying to get the instructions working as written on the Spark
 website.  The instructions not only don't have you install sbt
 separately...they actually specifically have you use the sbt that is
 distributed with Spark.

 If it is not possible to build your own Spark programs with
 Spark-distributed sbt, then that's a big hole in the Spark docs that I shall
 file.  And if the sbt that is included with Spark is MEANT to be able to
 compile your own Spark apps, then that's a product bug.

 But before I file the bug, I'm still hoping I'm missing something, and
 someone will point out that I'm missing a small step that will make the
 Spark distribution of sbt work!

 Diana



 On Mon, Mar 24, 2014 at 4:52 PM, Yana Kadiyska yana.kadiy...@gmail.com
 wrote:

 Diana, I just tried it on a clean Ubuntu machine, with Spark 0.8
 (since like other folks I had sbt preinstalled on my usual machine)

 I ran the command exactly as Ognen suggested and see
 Set current project to Simple Project (do you see this -- you should
 at least be seeing this)
 and then a bunch of Resolving ...

 messages. I did get an error there, saying it can't find
 javax.servlet.orbit. I googled the error and found this thread:


 http://mail-archives.apache.org/mod_mbox/spark-user/201309.mbox/%3ccajbo4nexyzqe6zgreqjtzzz5zrcoavfen+wmbyced6n1epf...@mail.gmail.com%3E

 adding the IvyXML fragment they suggested helped in my case (but
 again, the build pretty clearly complained).

 If you're still having no luck, I suggest installing sbt and setting
 SBT_HOME... http://www.scala-sbt.org/

 In either case though, it's not a Spark-specific issue...Hopefully
 some of all this helps.

 On Mon, Mar 24, 2014 at 4:30 PM, Diana Carroll dcarr...@cloudera.com
 wrote:
  Yeah, that's exactly what I did. Unfortunately it doesn't work:
 
  $SPARK_HOME/sbt/sbt package
  awk: cmd. line:1: fatal: cannot open file `./project/build.properties'
  for
  reading (No such file or directory)
  Attempting to fetch sbt
  /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or
  directory
  /usr/lib/spark/sbt/sbt: line 33: sbt/sbt-launch-.jar: No such file or
  directory
  Our attempt to download sbt locally to sbt/sbt-launch-.jar failed.
  Please
  install sbt manually from http://www.scala-sbt.org/
 
 
 
  On Mon, Mar 24, 2014 at 4:25 PM, Ognen Duzlevski
  og...@plainvanillagames.com wrote:
 
  You can use any sbt on your machine, including the one that comes with
  spark. For example, try:
 
  ~/path_to_spark/sbt/sbt compile
  ~/path_to_spark/sbt/sbt run arguments
 
  Or you can just add that to your PATH by:
 
  export PATH=$PATH:~/path_to_spark/sbt
 
  To make it permanent, you can add it to your ~/.bashrc or
  ~/.bash_profile
  or ??? depending on the system you are using. If you are on Windows,
  sorry,
  I can't offer any help there ;)
 
  Ognen
 
 
  On 3/24/14, 3:16 PM, Diana Carroll wrote:
 
  Thanks Ongen.
 
  Unfortunately I'm not able to follow your instructions either.  In
  particular:
 
 
  sbt compile
  sbt run arguments if any
 
 
  This doesn't work for me because there's no program on my path called
  sbt.  The instructions in the Quick Start guide are specific that I
  should
  call $SPARK_HOME/sbt/sbt.  I don't have any other executable on my
  system
  called sbt.
 
  Did you download and install sbt separately?  In following the Quick
  Start
  guide, that was not stated as a requirement, and I'm trying to run
  through
  the guide word for word.
 
  Diana
 
 
  On Mon, Mar 24, 2014 at 4:12 PM, Ognen Duzlevski
  og...@plainvanillagames.com wrote:
 
  Diana,
 
  Anywhere on the filesystem you have read/write access (you need not be
  in
  your spark home directory):
 
  mkdir myproject
  cd myproject
  mkdir project
  mkdir target
  mkdir -p src/main/scala
  cp $mypath/$mymysource.scala src/main/scala/
  cp $mypath/myproject.sbt .
 
  Make sure that myproject.sbt has the following in it:
 
  name := "I NEED A NAME!"

  version := "I NEED A VERSION!"

  scalaVersion := "2.10.3"

  libraryDependencies += "org.apache.spark" % "spark-core_2.10" %
  "0.9.0-incubating"

  If you will be using Hadoop/HDFS functionality you will need the below
  line also

  libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.2.0"
 
  The above assumes you are using Spark 0.9 and Scala 2.10.3. If you are
  using 0.8.1 - adjust appropriately

Re: Writing RDDs to HDFS

2014-03-24 Thread Yana Kadiyska
Ognen, can you comment on whether you were actually able to run two jobs
concurrently by just restricting spark.cores.max? I run Shark on the
same cluster and was not able to see a standalone job get in (since
Shark is a long-running job) until I restricted both spark.cores.max
_and_ spark.executor.memory. Just curious if I did something wrong.
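
For reference, the two knobs are set roughly like this on 0.9 (the property
names are the documented ones; the values below are placeholders):

// set before the long-running (Shark) context is created
System.setProperty("spark.cores.max", "8")         // cap total cores this app may take
System.setProperty("spark.executor.memory", "4g")  // cap memory claimed on each worker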

On Mon, Mar 24, 2014 at 7:48 PM, Ognen Duzlevski
og...@plainvanillagames.com wrote:
 Just so I can close this thread (in case anyone else runs into this stuff) -
 I did sleep through the basics of Spark ;). The answer on why my job is in
 waiting state (hanging) is here:
 http://spark.incubator.apache.org/docs/latest/spark-standalone.html#resource-scheduling


 Ognen

 On 3/24/14, 5:01 PM, Diana Carroll wrote:

 Ongen:

 I don't know why your process is hanging, sorry.  But I do know that the way
 saveAsTextFile works is that you give it a path to a directory, not a file.
 The file is saved in multiple parts, corresponding to the partitions.
 (part-0, part-1 etc.)

 (Presumably it does this because it allows each partition to be saved on the
 local disk, to minimize network traffic.  It's how Hadoop works, too.)




 On Mon, Mar 24, 2014 at 5:00 PM, Ognen Duzlevski og...@nengoiksvelzud.com
 wrote:

 Is someRDD.saveAsTextFile("hdfs://ip:port/path/final_filename.txt")
 supposed to work? Meaning, can I save files to the HDFS fs this way?

 I tried:

 val r = sc.parallelize(List(1,2,3,4,5,6,7,8))
 r.saveAsTextFile("hdfs://ip:port/path/file.txt")

 and it is just hanging. At the same time on my HDFS it created file.txt
 but as a directory which has subdirectories (the final one is empty).

 Thanks!
 Ognen



 --
 A distributed system is one in which the failure of a computer you didn't
 even know existed can render your own computer unusable
 -- Leslie Lamport


Re: Distributed running in Spark Interactive shell

2014-03-26 Thread Yana Kadiyska
Nan (or anyone who feels they understand the cluster architecture well),
can you clarify something for me.

From reading this user group and your explanation above, it appears that the
cluster master is only involved during application startup -- to
allocate executors (from what you wrote, it sounds like the driver itself passes
the job/tasks to the executors). From there onwards all computation is
done on the executors, which communicate results directly to the driver when
certain actions (say collect) are performed. Is that right? The only
description of the cluster I've seen came from here:
https://spark.apache.org/docs/0.9.0/cluster-overview.html but that picture
suggests there is no direct communication between driver and executors,
which I believe is wrong (unless I am misreading the picture -- I believe
Master and Cluster Manager refer to the same thing?).

The very short form of my question is: does the master do anything other
than executor allocation?


On Wed, Mar 26, 2014 at 9:23 AM, Nan Zhu zhunanmcg...@gmail.com wrote:

  All you need to do is ensure your Spark cluster is running well
 (you can check by accessing the Spark UI to see if all workers are displayed).

 then, you have to set correct SPARK_MASTER_IP in the machine where you run
 spark-shell

 The more details are :

 when you run bin/spark-shell, it will start the driver program in that
 machine, interacting with the Master to start the application (in this
 case, it is spark-shell)

 the Master tells Workers to start executors for your application, and the
 executors will try to register with your driver,

 then your driver can distribute tasks to the executors, i.e. run in a
 distributed fashion


 Best,

 --
 Nan Zhu

 On Wednesday, March 26, 2014 at 9:01 AM, Sai Prasanna wrote:

 Nan Zhu, it's the latter; I want to distribute the tasks to the cluster
 [machines available].

 If I set SPARK_MASTER_IP on the other machines and set the slave IPs
 in conf/slaves at the master node, will the interactive shell code run
 at the master get distributed across multiple machines?





 On Wed, Mar 26, 2014 at 6:32 PM, Nan Zhu zhunanmcg...@gmail.com wrote:

  What do you mean by "run across the cluster"?

 Do you want to start the spark-shell across the cluster, or do you want to
 distribute tasks to multiple machines?

 If the former, yes, as long as you indicate the right master URL.

 If the latter, also yes -- you can observe the distributed tasks in the
 Spark UI.

 --
 Nan Zhu

 On Wednesday, March 26, 2014 at 8:54 AM, Sai Prasanna wrote:

 Is it possible to run across the cluster using the Spark interactive shell?

 To be more explicit, is the procedure similar to running standalone
 master-slave Spark?

 I want to execute my code in the interactive shell on the master node,
 and it should run across the cluster [say 5 nodes]. Is the procedure
 similar?





 --
 *Sai Prasanna. AN*
 *II M.Tech (CS), SSSIHL*


 *Entire water in the ocean can never sink a ship, Unless it gets inside.
 All the pressures of life can never hurt you, Unless you let them in.*





 --
 *Sai Prasanna. AN*
 *II M.Tech (CS), SSSIHL*


 *Entire water in the ocean can never sink a ship, Unless it gets inside.
 All the pressures of life can never hurt you, Unless you let them in.*





Re: Calling Spark enthusiasts in NYC

2014-03-31 Thread Yana Kadiyska
Nicholas, I'm in Boston and would be interested in a Spark group. Not
sure if you know this -- there was a meetup that never got off the
ground. Anyway, I'd be +1 for attending. Not sure what is involved in
organizing. Seems a shame that a city like Boston doesn't have one.

On Mon, Mar 31, 2014 at 2:02 PM, Nicholas Chammas
nicholas.cham...@gmail.com wrote:
 As in, I am interested in helping organize a Spark meetup in the Boston
 area.


 On Mon, Mar 31, 2014 at 2:00 PM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:

 Well, since this thread has played out as it has, lemme throw in a
 shout-out for Boston.


 On Mon, Mar 31, 2014 at 1:52 PM, Chris Gore cdg...@cdgore.com wrote:

 We'd love to see a Spark user group in Los Angeles and connect with
 others working with it here.

 Ping me if you're in the LA area and use Spark at your company (
 ch...@retentionscience.com ).

 Chris

 Retention Science
 call: 734.272.3099
 visit: Site | like: Facebook | follow: Twitter

 On Mar 31, 2014, at 10:42 AM, Anurag Dodeja anu...@anuragdodeja.com
 wrote:

 How about Chicago?


 On Mon, Mar 31, 2014 at 12:38 PM, Nan Zhu zhunanmcg...@gmail.com wrote:

 Montreal or Toronto?


 On Mon, Mar 31, 2014 at 1:36 PM, Martin Goodson mar...@skimlinks.com
 wrote:

 How about London?


 --
 Martin Goodson  |  VP Data Science
 (0)20 3397 1240


 On Mon, Mar 31, 2014 at 6:28 PM, Andy Konwinski
 andykonwin...@gmail.com wrote:

 Hi folks,

 We have seen a lot of community growth outside of the Bay Area and we
 are looking to help spur even more!

 For starters, the organizers of the Spark meetups here in the Bay Area
 want to help anybody that is interested in setting up a meetup in a new
 city.

 Some amazing Spark champions have stepped forward in Seattle,
 Vancouver, Boulder/Denver, and a few other areas already.

 Right now, we are looking to connect with you Spark enthusiasts in NYC
 about helping to run an inaugural Spark Meetup in your area.

 You can reply to me directly if you are interested and I can tell you
 about all of the resources we have to offer (speakers from the core
 community, a budget for food, help scheduling, etc.), and let's make this
 happen!

 Andy









Re: Sample Project for using Shark API in Spark programs

2014-04-07 Thread Yana Kadiyska
I might be wrong here, but I don't believe it's discouraged. Maybe part
of the reason there aren't a lot of examples is that sql2rdd returns an
RDD (a TableRDD, that is:
https://github.com/amplab/shark/blob/master/src/main/scala/shark/SharkContext.scala).
I haven't done anything too complicated yet, but my impression is that
almost any Spark example of manipulating RDDs should apply from
that line onwards.

Are you asking for samples of what to do with the RDD once you get it, or
of how to get a SharkContext from a standalone program?

Also, my reading of a recent email on this list is that the Shark API will
be largely superseded by a more general Spark SQL API in 1.0
(http://people.apache.org/~pwendell/catalyst-docs/sql-programming-guide.html).
So if you're just starting out and you don't have short-term needs,
that might be a better place to start...
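
For example, inside shark-shell (where sc is already a SharkContext) the
result of sql2rdd can be treated like any other RDD -- a sketch; the table,
column and extractFeatures names mirror the example quoted below and are
otherwise assumptions:

val youngUsers = sc.sql2rdd("SELECT * FROM users WHERE age < 20")
youngUsers.cache()              // plain RDD method
println(youngUsers.count)       // plain RDD action
// any RDD transformation applies from here, e.g. (row accessor is an assumption):
// val features = youngUsers.map(row => extractFeatures(row))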

On Mon, Apr 7, 2014 at 9:14 AM, Jerry Lam chiling...@gmail.com wrote:
 Hi Shark,

 Should I assume that Shark users should not use the Shark APIs, since there
 is no documentation for them? If there is documentation, can you point it
 out?

 Best Regards,

 Jerry


 On Thu, Apr 3, 2014 at 9:24 PM, Jerry Lam chiling...@gmail.com wrote:

 Hello everyone,

 I have successfully installed Shark 0.9 and Spark 0.9 in standalone mode
 in a cluster of 6 nodes for testing purposes.

 I would like to use Shark API in Spark programs. So far I could only find
 the following:

 $ ./bin/shark-shell
 scala> val youngUsers = sc.sql2rdd("SELECT * FROM users WHERE age < 20")
 scala> println(youngUsers.count)
 ...
 scala> val featureMatrix = youngUsers.map(extractFeatures(_))
 scala> kmeans(featureMatrix)

 Is there a more complete sample code to start a program using Shark API in
 Spark?

 Thanks!

 Jerry




Re: Urgently need help interpreting duration

2014-04-08 Thread Yana Kadiyska
Thank you -- this actually helped a lot. Strangely it appears that the task
detail view is not accurate in 0.8 -- that view shows 425ms duration for
one of the tasks, but in the driver log I do indeed see Finished TID 125 in
10940ms.

On that slow worker I see the following:

14/04/08 18:06:24 INFO broadcast.HttpBroadcast: Started reading broadcast
variable 81
14/04/08 18:06:34 INFO storage.MemoryStore: ensureFreeSpace(503292) called
with curMem=26011999, maxMem=2662874480
14/04/08 18:06:34 INFO storage.MemoryStore: Block broadcast_81 stored as
values to memory (estimated size 491.5 KB, free 2.5 GB)
14/04/08 18:06:34 INFO broadcast.HttpBroadcast: *Reading broadcast variable
81 took 10.051249937 s*
14/04/08 18:06:34 INFO broadcast.HttpBroadcast: Started reading broadcast
variable 82
14/04/08 18:06:34 INFO storage.MemoryStore: ensureFreeSpace(503292) called
with curMem=26515291, maxMem=2662874480
14/04/08 18:06:34 INFO storage.MemoryStore: Block broadcast_82 stored as
values to memory (estimated size 491.5 KB, free 2.5 GB)
14/04/08 18:06:34 INFO broadcast.HttpBroadcast: Reading broadcast variable
82 took 0.027498244 s
14/04/08 18:06:34 INFO broadcast.HttpBroadcast: Started reading broadcast
variable 83
14/04/08 18:06:34 INFO storage.MemoryStore: ensureFreeSpace(503292) called
with curMem=27018583, maxMem=2662874480

and here is the same variable read on a different worker

14/04/08 18:06:24 INFO broadcast.HttpBroadcast: Started reading
broadcast variable 81
14/04/08 18:06:24 INFO storage.MemoryStore: ensureFreeSpace(503292)
called with curMem=35008553, maxMem=2662874480
14/04/08 18:06:24 INFO storage.MemoryStore: Block broadcast_81 stored
as values to memory (estimated size 491.5 KB, free 2.4 GB)
14/04/08 18:06:24 INFO broadcast.HttpBroadcast: Reading broadcast
variable 81 took 0.029252199 s


Any thoughts on why the read of variable 81 is so slow on one machine? The job
is a sum by key across several partitions -- it doesn't seem that variable 81
is any larger than the rest (per the log lines above), so it's puzzling
that it's taking so very long...


On Tue, Apr 8, 2014 at 12:24 PM, Aaron Davidson ilike...@gmail.com wrote:

 Also, take a look at the driver logs -- if there is overhead before the
 first task is launched, the driver logs would likely reveal this.


 On Tue, Apr 8, 2014 at 9:21 AM, Aaron Davidson ilike...@gmail.com wrote:

 Off the top of my head, the most likely cause would be driver GC issues.
 You can diagnose this by enabling GC printing at the driver and you can fix
 this by increasing the amount of memory your driver program has (see
 http://spark.apache.org/docs/0.9.0/tuning.html#garbage-collection-tuning
 ).

 The launch time statistic would also be useful -- if all tasks are
 launched at around the same time and complete within 300ms, yet the total
 time is 10s, that strongly suggests that the overhead is coming before the
 first task is launched. Similarly, it would be useful to know if there was
 a large gap between launch times, or if it appeared that the launch times
 were serial with respect to the durations. If for some reason Spark started
 using only one executor, say, each task would take the same duration but
 would be executed one after another.


 On Tue, Apr 8, 2014 at 8:11 AM, Yana Kadiyska yana.kadiy...@gmail.comwrote:

 Hi Spark users, I'm very much hoping someone can help me out.

 I have a strict performance requirement on a particular query. One of
 the stages shows great variance in duration -- from 300ms to 10sec.

 The stage is mapPartitionsWithIndex at Operator.scala:210 (running Spark
 0.8)

 I have run the job quite a few times -- the details within the stage
 do not account for the overall duration shown for the stage. What
 could be taking up time that's not showing within the stage breakdown
 UI? I'm thinking that reading the data in is already reflected in the
 Duration column, so caching should not be a reason (I'm not caching
 explicitly)?

 The details within the stage always show roughly the following (both
 for the 10second and 600ms query -- very little variation, nothing
 over 500ms, ShuffleWrite size is pretty comparable):

 Index  Status   Locality Level  Executor/Launch Time  Duration  GC Time  Shuffle Write
 1864   SUCCESS  NODE_LOCAL      ###                   301 ms    8 ms     111.0 B
 1863   SUCCESS  NODE_LOCAL      ###                   273 ms             102.0 B
 1862   SUCCESS  NODE_LOCAL      ###                   245 ms             111.0 B
 1861   SUCCESS  NODE_LOCAL      ###                   326 ms    4 ms     102.0 B
 1860   SUCCESS  NODE_LOCAL      ###                   217 ms    6 ms     102.0 B
 1859   SUCCESS  NODE_LOCAL      ###                   277 ms             111.0 B
 1858   SUCCESS  NODE_LOCAL      ###                   262 ms             108.0 B
 1857   SUCCESS  NODE_LOCAL      ###                   217 ms    14 ms    112.0 B
 1856   SUCCESS  NODE_LOCAL      ###                   208 ms             109.0 B
 1855   SUCCESS  NODE_LOCAL      ###                   242 ms             74.0 B
 1854   SUCCESS  NODE_LOCAL      ###                   218 ms    3 ms     58.0 B
 1853   SUCCESS  NODE_LOCAL      ###                   254 ms    12 ms    102.0 B
 1852   SUCCESS  NODE_LOCAL      ###                   274 ms    8 ms     77.0 B






Re: spark-shell driver interacting with Workers in YARN mode - firewall blocking communication

2014-05-02 Thread Yana Kadiyska
I think what you want to do is set spark.driver.port to a fixed port.


On Fri, May 2, 2014 at 1:52 PM, Andrew Lee alee...@hotmail.com wrote:

 Hi All,

 I encountered this problem when the firewall is enabled between the
 spark-shell and the Workers.

 When I launch spark-shell in yarn-client mode, I notice that Workers on
 the YARN containers are trying to talk to the driver (spark-shell);
 however, the firewall is not open, which causes a timeout.

 Each Worker tries to open a listening port in the 54xxx range -- is that
 port random in this case?
 What would be a better way to predict the ports so I can configure the
 firewall correctly between the driver (spark-shell) and the Workers? Is
 there a range of ports we can specify in the firewall/iptables?

 Any ideas?



Shark resilience to unusable slaves

2014-05-22 Thread Yana Kadiyska
Hi, I am running into a pretty concerning issue with Shark (granted I'm
running v. 0.8.1).

I have a Spark slave node that has run out of disk space. When I try to
start Shark, it attempts to deploy the application to a directory on that
node, fails, and eventually gives up (I see a "Master removed our
application" message in the shark server log).

Is Spark supposed to be able to ignore a slave if something goes wrong on
it (I realize that the slave probably appears alive enough)? I restarted
the Spark master in hopes that it would detect that the slave is suffering,
but that doesn't seem to be the case.

Any thoughts appreciated -- we'll monitor disk space but I'm a little
worried that the cluster is not functional on account of a single slave.


Re: Running Jars on Spark, program just hanging there

2014-05-27 Thread Yana Kadiyska
Does the Spark UI show your program running? (http://spark-masterIP:8118).
If the program is listed as running, you should be able to see details via
the UI. In my experience there are 3 sets of logs -- the log where you're
running your program (the driver), the log on the master node, and the logs
on each executor. The master log often has very useful details when one of
your slave executors has an issue; then you can go and read the logs on
that machine. Of course, if you have a small number of workers in your
cluster you can just read all the logs. That's just general debugging
advice... (I also find it useful to check rdd.partitions.size before anything
else, to see how many partitions the RDD is actually partitioned into...)
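
For example, as a first sanity check in your program (a sketch in Scala;
"lines" stands for whatever collection you passed to parallelize):

val rdd = sc.parallelize(lines)
println("partitions: " + rdd.partitions.size)  // how many partitions parallelize created
println("count: " + rdd.count())               // the action that currently hangs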


On Tue, May 27, 2014 at 2:48 PM, Min Li limin...@gmail.com wrote:

  Hi all,

 I've a single machine with 8 cores and 8g mem. I've deployed the
 standalone spark on the machine and successfully run the examples.

 Now I'm trying to write some simple Java code. I just read a local file
 (23M) into a string list and use the JavaRDD<String> rdds =
 sparkContext.parallelize() method to get the corresponding RDD. Then I run
 rdds.count(), but the program just stops at the count(). The last
 log info is:

 14/05/27 14:13:16 INFO SparkContext: Starting job: count at RDDTest.java:40
 14/05/27 14:13:16 INFO DAGScheduler: Got job 0 (count at RDDTest.java:40)
 with 2 output partitions (allowLocal=false)
 14/05/27 14:13:16 INFO DAGScheduler: Final stage: Stage 0 (count at
 RDDTest.java:40)
 14/05/27 14:13:16 INFO DAGScheduler: Parents of final stage: List()
 14/05/27 14:13:16 INFO DAGScheduler: Missing parents: List()
 14/05/27 14:13:16 INFO DAGScheduler: Submitting Stage 0
 (ParallelCollectionRDD[0] at parallelize at RDDTest.java:37), which has no
 missing parents
 14/05/27 14:13:16 INFO SparkDeploySchedulerBackend: Connected to Spark
 cluster with app ID app-20140527141316-0003
 14/05/27 14:13:16 INFO AppClient$ClientActor: Executor added:
 app-20140527141316-0003/0 on worker-20140526221107-spark-35303
 (spark:35303) with 8 cores
 14/05/27 14:13:16 INFO SparkDeploySchedulerBackend: Granted executor ID
 app-20140527141316-0003/0 on hostPort spark:35303 with 8 cores, 1024.0 MB
 RAM
 14/05/27 14:13:16 INFO AppClient$ClientActor: Executor updated:
 app-20140527141316-0003/0 is now RUNNING
 14/05/27 14:13:16 INFO DAGScheduler: Submitting 2 missing tasks from Stage
 0 (ParallelCollectionRDD[0] at parallelize at RDDTest.java:37)
 14/05/27 14:13:16 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
 14/05/27 14:13:17 INFO SparkDeploySchedulerBackend: Registered executor:
 Actor[akka.tcp://sparkExecutor@spark:34279/user/Executor#196489168] with
 ID 0
 14/05/27 14:13:17 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on
 executor 0: spark (PROCESS_LOCAL)
 14/05/27 14:13:17 INFO TaskSetManager: Serialized task 0.0:0 as 12993529
 bytes in 127 ms
 14/05/27 14:13:17 INFO TaskSetManager: Starting task 0.0:1 as TID 1 on
 executor 0: spark (PROCESS_LOCAL)
 14/05/27 14:13:17 INFO TaskSetManager: Serialized task 0.0:1 as 13006417
 bytes in 74 ms
 14/05/27 14:13:17 INFO BlockManagerMasterActor$BlockManagerInfo:
 Registering block manager spark:37617 with 589.2 MB RAM

 I tried to figure out what's going on, but just can't. Could anyone please
 give me some suggestions and point out some possible issues?

 Best Regards,
 Min



Master not seeing recovered nodes(Got heartbeat from unregistered worker ....)

2014-06-13 Thread Yana Kadiyska
Hi, I see this has been asked before but has not gotten any satisfactory
answer so I'll try again:

(here is the original thread I found:
http://mail-archives.apache.org/mod_mbox/spark-user/201403.mbox/%3c1394044078706-2312.p...@n3.nabble.com%3E
)

I have a set of workers dying and coming back again. The master prints the
following warning:

Got heartbeat from unregistered worker 

What is the solution to this -- rolling the master is very undesirable to
me as I have a Shark context sitting on top of it (it's meant to be highly
available).

Insights appreciated -- I don't think an executor going down is very
unexpected but it does seem odd that it won't be able to rejoin the working
set.

I'm running Spark 0.9.1 on CDH


Need some Streaming help

2014-06-16 Thread Yana Kadiyska
Like many people, I'm trying to do hourly counts. The twist is that I don't
want to count per hour of streaming, but per hour of the actual occurrence
of the event (wall clock, say yyyy-mm-dd HH).

My thought is to make the streaming window large enough that a full hour of
streaming data would fit inside it. Since my window slides in small
increments, I want to drop the lowest hour from the stream before
persisting the results (since it would have been reduced during the previous
batch and would be a partial count in the current one). I have gotten this far:

Every line of the input files is parsed into Event(type, hour), and stream
is a DStream[RDD[Event]]

val evtCountsByHour =
  stream.map(evt => (evt, 1))
        .reduceByKeyAndWindow(_ + _, Seconds(secondsInWindow)) // hourly counts per event
        .mapPartitions(iter => iter.map(x => (x._1.hour, x)))

My understanding is that at this point, the event counts are keyed by hour.

1. How do I detect the smallest key? I have seen some examples of
partitionBy + mapPartitionsWithIndex and dropping the lowest index, but I
can't figure out how to do it with a DStream. My gut feeling is that the
first RDD in the stream has to contain the oldest data, but that doesn't
seem to be the case (printed from inside evtCountsByHour.foreachRDD).

2. If someone is further ahead with this type of problem, could you give
some insight on how you approached it? I think Streaming is the
correct approach since I don't really want to worry about data that was
already processed and I want to process it continuously. I opted for
reduceByKeyAndWindow with a large window as opposed to updateStateByKey, as
the hour the event occurred in is part of the key and I don't care to keep
that key around once the next hour's events are coming in (I'm assuming
RDDs outside the window are considered unreferenced). But I'd love to hear
other suggestions if my logic is off.
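
In case it helps frame the question, this is the kind of pruning I have in
mind (a sketch only -- it assumes the pair-RDD implicits are imported and that
the hour keys are strings that sort lexicographically, e.g. "2014-06-16 09"):

val pruned = evtCountsByHour.transform { rdd =>
  val hours = rdd.keys.distinct().collect()
  if (hours.isEmpty) rdd
  else {
    val oldest = hours.min                     // lowest (partial) hour in the window
    rdd.filter { case (hour, _) => hour != oldest }
  }
}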


Help with object access from mapper (simple question)

2014-06-23 Thread Yana Kadiyska
Hi folks, hoping someone can explain to me what's going on:

I have the following code, largely based on RecoverableNetworkWordCount
example (
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
):

I am setting fields on an object that gets accessed within the map
function, but the workers do not see the set values. Could someone help me
understand what is going on? I suspect the serialization of the object
happens not when I think it does.

object Foo {
  var field1 = -1
  var field2 = -2
}

def main(args: Array[String]) {
  // open a yaml file
  Foo.field1 = value_from_yaml
  Foo.field2 = value_from_yaml

  val ssc = StreamingContext.getOrCreate(checkpointDirectory,
    () => { createContext() })
  ssc.start()
}

def createContext() {
  // create streaming context
  println(Foo)      <-- foo's fields are set
  lines.map(line => {
    println(Foo)    <-- foo's fields are not set
  })
}


Re: Help with object access from mapper (simple question)

2014-06-23 Thread Yana Kadiyska
Thank you so much! I was trying for a singleton and opted against a class
but clearly this backfired. Clearly time to revisit Scala lessons. Thanks
again


On Mon, Jun 23, 2014 at 1:16 PM, Marcelo Vanzin van...@cloudera.com wrote:

 object in Scala is similar to a class with only static fields /
 methods in Java. So when you set its fields in the driver, the
 object does not get serialized and sent to the executors; they have
 their own copy of the class and its static fields, which haven't been
 initialized.

 Use a proper class, instantiate it, and then use it in the executors. e.g.

 class Foo extends Serializable {
  ...
 }

  val foo = new Foo()
  foo.field1 = blah

  lines.map(line => { println(foo) }) // now you should see the field
 values you set.



 On Mon, Jun 23, 2014 at 7:44 AM, Yana Kadiyska yana.kadiy...@gmail.com
 wrote:
  Hi folks, hoping someone can explain to me what's going on:
 
  I have the following code, largely based on RecoverableNetworkWordCount
  example
  (
 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
 ):
 
  I am setting fields on an object that gets accessed within the map
 function.
  But the workers do not see the set values. Could someone help me
 understand
  what is going on? I suspect the serialization of the object happens not
 when
  I think it does
 
  object Foo {
    var field1 = -1
    var field2 = -2
  }

  def main(args: Array[String]) {
    // open a yaml file
    Foo.field1 = value_from_yaml
    Foo.field2 = value_from_yaml

    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => { createContext() })
    ssc.start()
  }

  def createContext() {
    // create streaming context
    println(Foo)      <-- foo's fields are set
    lines.map(line => {
      println(Foo)    <-- foo's fields are not set
    })
  }



 --
 Marcelo



Help understanding spark.task.maxFailures

2014-06-30 Thread Yana Kadiyska
Hi community, this one should be an easy one:

I have left spark.task.maxFailures at its default (which should be
4). I see a job that shows the following statistics for Tasks:
Succeeded/Total

7109/819 (1 failed)

So there were 819 tasks to start with. I have 2 executors in that
cluster. The Spark docs say spark.task.maxFailures is the number
of times to try a task before giving up on the job. So I was imagining
that 819*4 (i.e. 3276) would be the maximum number I could ever see in the
succeeded column (accounting for retries of every possible task). Even
3276*2 (6552, if it's per task per executor) does not account for 7109
successful tasks.

Could anyone help explain why I'm seeing such a high number of succeeded tasks?


Re: Changing log level of spark

2014-07-01 Thread Yana Kadiyska
Are you looking at the driver log (e.g. the Shark log)? I see a ton of
information in the INFO category on what query is being started, what
stage is starting, and which executor stuff is sent to. So I'm not sure
if you're saying you see all that and need more, or that you're
not seeing this type of information. I cannot speak to the EC2 setup;
I'm just pointing out that under 0.9.1 I see quite a bit of scheduling
information in the driver log.
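
One other thing you can try from the driver side, without touching
log4j.properties on every node (a sketch using the plain log4j 1.x API; the
package name in the second line is just an example):

import org.apache.log4j.{Level, Logger}

Logger.getRootLogger.setLevel(Level.DEBUG)                            // everything in the driver JVM
Logger.getLogger("org.apache.spark.scheduler").setLevel(Level.DEBUG)  // or narrow it to the scheduler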

On Tue, Jul 1, 2014 at 9:20 AM, Philip Limbeck philiplimb...@gmail.com wrote:
 We changed the log level to DEBUG by replacing every INFO with DEBUG in
 /root/ephemeral-hdfs/conf/log4j.properties and propagating it to the
 cluster. There is some DEBUG output visible in both master and worker, but
 nothing really interesting regarding stages or scheduling. Since we expected
 a little more than that, there could be 2 possibilities:
   a) There is still some other, unknown way to set the log level to debug.
   b) There is not that much log output to be expected in this direction. I
 looked for logDebug (the log wrapper in Spark) on GitHub with 84 results,
 which makes me doubt that (b) is the case.

 We actually just want to have a little more insight into the system behavior
 especially when using Shark since we ran into some serious concurrency
 issues with blocking queries. So much for the background why this is
 important to us.


 On Thu, Jun 26, 2014 at 3:30 AM, Aaron Davidson ilike...@gmail.com wrote:

 If you're using the spark-ec2 scripts, you may have to change
 /root/ephemeral-hdfs/conf/log4j.properties or something like that, as that
 is added to the classpath before Spark's own conf.


 On Wed, Jun 25, 2014 at 6:10 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 I have a log4j.xml in src/main/resources with

 <?xml version="1.0" encoding="UTF-8" ?>
 <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
 <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
 [...]
   <root>
     <priority value="warn" />
     <appender-ref ref="Console" />
   </root>
 </log4j:configuration>

 and that is included in the jar I package with `sbt assembly`. That
 works fine for me, at least on the driver.

 Tobias

 On Wed, Jun 25, 2014 at 2:25 PM, Philip Limbeck philiplimb...@gmail.com
 wrote:
  Hi!
 
  According to
 
  https://spark.apache.org/docs/0.9.0/configuration.html#configuring-logging,
  changing log-level is just a matter of creating a log4j.properties
  (which is
  in the classpath of spark) and changing log level there for the root
  logger.
  I did this steps on every node in the cluster (master and worker
  nodes).
  However, after restart there is still no debug output as desired, but
  only
  the default info log level.





Re: Spark Streaming question batch size

2014-07-01 Thread Yana Kadiyska
Are you saying that both streams come in at the same rate and you have
the same batch interval, but the batch sizes end up different? I.e., two
data points both arriving X seconds after streaming starts end up in
two different batches? How do you define real-time values for both
streams? I am trying to do something similar to you, I think -- but
I'm not clear on what your notion of time is.
My reading of your example above is that the streams just pump data in
at different rates -- the first one got 7462 points in the first batch
interval, whereas stream 2 saw 10493.

On Tue, Jul 1, 2014 at 5:34 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:
 Hi,

 The window size in Spark Streaming is time-based, which means we have a
 different number of elements in each window. For example, if you have two
 streams (might be more) which are related to each other and you want to
 compare them in a specific time interval, I am not clear how it will work.
 Although they start running simultaneously, they might have a different
 number of elements in each time interval.

 The following is the output for two streams which have the same number of
 elements and ran simultaneously. The left-most value is the number of
 elements in each window. If we add up the number of elements, they are the
 same for both streams, but we can't compare the streams as they differ in
 window size and number of windows.

 Can we somehow make windows based on real time values for both streams? Or
 can we make windows based on the number of elements?

 (n, (mean, variance, SD))
 Stream 1
 (7462,(1.0535658165371238,4242.001306434091,65.13064798107025))
 (44826,(0.2546925855084064,5042.890184382894,71.0133099100647))
 (245466,(0.2857731601728941,5014.411691661449,70.81251084138628))
 (154852,(0.21907814309792514,3483.800160602281,59.023725404300606))
 (156345,(0.3075668844414613,7449.528181550462,86.31064929399189))
 (156603,(0.27785151491351234,5917.809892281489,76.9273026452994))
 (156047,(0.18130350363672296,4019.0232843737017,63.39576708561623))

 Stream 2
 (10493,(0.5554953964547791,1254.883548218503,35.42433553672536))
 (180649,(0.21684831234050583,1095.9634245399352,33.1053383087975))
 (179994,(0.22048869512317407,1443.0566458182718,37.98758541705792))
 (179455,(0.20473330254938552,1623.9538730448216,40.29831104456888))
 (269817,(0.16987953223480945,3270.663944782799,57.18971887308766))
 (101193,(0.21469292497504766,1263.0879032808723,35.53994799209577))

 Regards,
 Laeeq




Re: Lost TID: Loss was due to fetch failure from BlockManagerId

2014-07-01 Thread Yana Kadiyska
A lot of things can get funny when you run distributed as opposed to
local -- e.g. some jar not making it over. Do you see anything of
interest in the logs on the executor machines? I'm guessing
192.168.222.152/192.168.222.164. From here:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
it seems like the warning message is logged after the task fails -- but I
wonder if you might see something more useful as to why it failed to
begin with. As an example, we've had cases in HDFS where a small
example would work but on a larger example we'd hit a bad file. But
the executor log is usually pretty explicit as to what happened...

On Tue, Jul 1, 2014 at 8:57 PM, Mohammed Guller moham...@glassbeam.com wrote:
 I am running Spark 1.0 on a 4-node standalone spark cluster (1 master + 3
 worker). Our app is fetching data from Cassandra and doing a basic filter,
 map, and countByKey on that data. I have run into a strange problem. Even if
 the number of rows in Cassandra is just 1M, the Spark job seems to go
 into an infinite loop and runs for hours. With a small amount of data (less
 than 100 rows), the job does finish, but takes almost 30-40 seconds and we
 frequently see the messages shown below. If we run the same application on a
 single node Spark (--master local[4]), then we don’t see these warnings and
 the task finishes in less than 6-7 seconds. Any idea what could be the cause
 for these problems when we run our application on a standalone 4-node spark
 cluster?



 14/06/30 19:30:16 WARN TaskSetManager: Lost TID 25036 (task 6.0:90)

 14/06/30 19:30:16 WARN TaskSetManager: Loss was due to fetch failure from
 BlockManagerId(2, 192.168.222.164, 57185, 0)

 14/06/30 19:30:18 WARN TaskSetManager: Lost TID 25310 (task 6.1:0)

 14/06/30 19:30:18 WARN TaskSetManager: Loss was due to fetch failure from
 BlockManagerId(2, 192.168.222.164, 57185, 0)

 14/06/30 19:30:19 WARN TaskSetManager: Lost TID 25582 (task 6.2:0)

 14/06/30 19:30:19 WARN TaskSetManager: Loss was due to fetch failure from
 BlockManagerId(2, 192.168.222.164, 57185, 0)

 14/06/30 19:30:21 WARN TaskSetManager: Lost TID 25882 (task 6.3:34)

 14/06/30 19:30:21 WARN TaskSetManager: Loss was due to fetch failure from
 BlockManagerId(0, 192.168.222.142, 39342, 0)

 14/06/30 19:30:22 WARN TaskSetManager: Lost TID 26152 (task 6.4:0)

 14/06/30 19:30:22 WARN TaskSetManager: Loss was due to fetch failure from
 BlockManagerId(0, 192.168.222.142, 39342, 0)

 14/06/30 19:30:23 WARN TaskSetManager: Lost TID 26427 (task 6.5:4)

 14/06/30 19:30:23 WARN TaskSetManager: Loss was due to fetch failure from
 BlockManagerId(2, 192.168.222.164, 57185, 0)

 14/06/30 19:30:25 WARN TaskSetManager: Lost TID 26690 (task 6.6:0)

 14/06/30 19:30:25 WARN TaskSetManager: Loss was due to fetch failure from
 BlockManagerId(2, 192.168.222.164, 57185, 0)

 14/06/30 19:30:26 WARN TaskSetManager: Lost TID 26959 (task 6.7:0)

 14/06/30 19:30:26 WARN TaskSetManager: Loss was due to fetch failure from
 BlockManagerId(2, 192.168.222.164, 57185, 0)

 14/06/30 19:30:28 WARN TaskSetManager: Lost TID 27449 (task 6.8:218)

 14/06/30 19:30:28 WARN TaskSetManager: Loss was due to fetch failure from
 BlockManagerId(2, 192.168.222.164, 57185, 0)

 14/06/30 19:30:30 WARN TaskSetManager: Lost TID 27718 (task 6.9:0)

 14/06/30 19:30:30 WARN TaskSetManager: Loss was due to fetch failure from
 BlockManagerId(2, 192.168.222.164, 57185, 0)

 14/06/30 19:30:30 WARN TaskSetManager: Loss was due to fetch failure from
 BlockManagerId(2, 192.168.222.164, 57185, 0)

 14/06/30 19:30:31 WARN TaskSetManager: Lost TID 27991 (task 6.10:1)

 14/06/30 19:30:31 WARN TaskSetManager: Loss was due to fetch failure from
 BlockManagerId(2, 192.168.222.164, 57185, 0)

 14/06/30 19:30:33 WARN TaskSetManager: Lost TID 28265 (task 6.11:0)

 14/06/30 19:30:33 WARN TaskSetManager: Loss was due to fetch failure from
 BlockManagerId(2, 192.168.222.164, 57185, 0)

 14/06/30 19:30:34 WARN TaskSetManager: Lost TID 28550 (task 6.12:0)

 14/06/30 19:30:34 WARN TaskSetManager: Loss was due to fetch failure from
 BlockManagerId(2, 192.168.222.164, 57185, 0)

 14/06/30 19:30:36 WARN TaskSetManager: Lost TID 28822 (task 6.13:0)

 14/06/30 19:30:36 WARN TaskSetManager: Loss was due to fetch failure from
 BlockManagerId(2, 192.168.222.164, 57185, 0)

 14/06/30 19:30:37 WARN TaskSetManager: Lost TID 29093 (task 6.14:0)

 14/06/30 19:30:37 WARN TaskSetManager: Loss was due to fetch failure from
 BlockManagerId(2, 192.168.222.164, 57185, 0)

 14/06/30 19:30:39 WARN TaskSetManager: Lost TID 29366 (task 6.15:0)

 14/06/30 19:30:39 WARN TaskSetManager: Loss was due to fetch failure from
 BlockManagerId(2, 192.168.222.164, 57185, 0)

 14/06/30 19:30:40 WARN TaskSetManager: Lost TID 29648 (task 6.16:9)

 14/06/30 19:30:40 WARN TaskSetManager: Loss was due to fetch failure from
 BlockManagerId(2, 192.168.222.164, 57185, 0)

 14/06/30 19:30:42 WARN TaskSetManager: Lost TID 29924 (task 

Re: Help alleviating OOM errors

2014-07-02 Thread Yana Kadiyska
Can you elaborate on why "You need to configure the spark.shuffle.spill
true again in the config" -- the default for spark.shuffle.spill is
already true according to the
doc (https://spark.apache.org/docs/0.9.1/configuration.html)?

On OOM the tasks were process_local, which I understand is as good as
it gets, but the job is still going 32+ hours later.
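
For reference, the knobs in question (0.9.1 property names; the fractions
below are placeholders, not a recommendation):

// set before the SparkContext is created
System.setProperty("spark.shuffle.spill", "true")           // already the default
System.setProperty("spark.storage.memoryFraction", "0.5")   // shrink the cache share (default 0.6)
System.setProperty("spark.shuffle.memoryFraction", "0.2")   // shuffle aggregation buffers (default 0.3)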

On Wed, Jul 2, 2014 at 2:40 AM, Mayur Rustagi mayur.rust...@gmail.com wrote:


 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi



 On Mon, Jun 30, 2014 at 8:09 PM, Yana Kadiyska yana.kadiy...@gmail.com
 wrote:

 Hi,

 our cluster seems to have a really hard time with OOM errors on the
 executor. Periodically we'd see a task that gets sent to a few
 executors, one would OOM, and then the job just stays active for hours
 (sometimes 30+ whereas normally it completes sub-minute).

 So I have a few questions:

 1. Why am I seeing OOMs to begin with?

 I'm running with defaults for
 spark.storage.memoryFraction
 spark.shuffle.memoryFraction

 so my understanding is that if Spark exceeds 60% of available memory,
 data will be spilled to disk? Am I misunderstanding this? In the
 attached screenshot, I see a single stage with 2 tasks on the same
 executor -- no disk spills but OOM.

 You need to configure the spark.shuffle.spill true again in the config.
 What is causing you to OOM? It could be that you are simply trying to
 sortByKey and the keys are bigger than the executor memory, causing the OOM.
 Can you post the stack?


 2. How can I reduce the likelyhood of seeing OOMs -- I am a bit
 concerned that I don't see a spill at all so not sure if decreasing
 spark.storage.memoryFraction is what needs to be done




 3. Why does an OOM seem to break the executor so hopelessly? I am
 seeing times upwards of 30hrs once an OOM occurs. Why is that -- the
 task *should* take under a minute, so even if the whole RDD was
 recomputed from scratch, 30hrs is very mysterious to me. Hadoop can
 process this in about 10-15 minutes, so I imagine even if the whole
 job went to disk it should still not take more than an hour

 When an OOM occurs it could cause the RDD to spill to disk; the repeated task
 may be forced to read data from disk and cause the overall slowdown, not to
 mention the RDD may be sent to a different executor to be processed. Are you
 seeing the slow tasks as process_local, or at least node_local?


 Any insight into this would be much appreciated.
 Running Spark 0.9.1




Re: SparkKMeans.scala from examples will show: NoClassDefFoundError: breeze/linalg/Vector

2014-07-02 Thread Yana Kadiyska
The scripts that Xiangrui mentions set up the classpath... Can you run
./run-example for the provided example successfully?

What you can try is to set SPARK_PRINT_LAUNCH_COMMAND=1 and then call
run-example -- that will show you the exact java command used to run
the example at the start of execution. Assuming you can run the examples
successfully, you should be able to just copy that and add your jar to
the front of the classpath. If that works, you can start removing extra
jars (run-example puts all the example jars on the classpath, which you
won't need).

As you said, the error you see is indicative of the class not being
available/seen at runtime, but it's hard to tell why.
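
For example, something along the lines of the following (the assembly path is
the one quoted below; you may still need whatever extra entries run-example
prints) should make breeze/linalg visible at run time:

java -cp myjar.jar:/home/wanda/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar SparkKMeans ...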

On Wed, Jul 2, 2014 at 2:13 AM, Wanda Hawk wanda_haw...@yahoo.com wrote:
 I want to make some minor modifications in SparkKMeans.scala, so running
 the basic example won't do.
 I have also packaged my code into a jar file with sbt. It completes
 successfully, but when I try to run it with java -jar myjar.jar I get the
 same error:
 Exception in thread main java.lang.NoClassDefFoundError:
 breeze/linalg/Vector
 at java.lang.Class.getDeclaredMethods0(Native Method)
 at java.lang.Class.privateGetDeclaredMethods(Class.java:2531)
 at java.lang.Class.getMethod0(Class.java:2774)
 at java.lang.Class.getMethod(Class.java:1663)
 at
 sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
 at
 sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
 

 If scalac -d classes/ SparkKMeans.scala can't see my classpath, why does
 it succeed in compiling and not give the same error?
 The error itself, NoClassDefFoundError, means that the classes were available
 at compile time, but for some reason I cannot figure out they are not
 available at run time. Does anyone know why?

 Thank you


 On Tuesday, July 1, 2014 7:03 PM, Xiangrui Meng men...@gmail.com wrote:


 You can use either bin/run-example or bin/spark-submit to run example
 code. scalac -d classes/ SparkKMeans.scala doesn't recognize the Spark
 classpath. There are examples in the official doc:
 http://spark.apache.org/docs/latest/quick-start.html#where-to-go-from-here
 -Xiangrui

 On Tue, Jul 1, 2014 at 4:39 AM, Wanda Hawk wanda_haw...@yahoo.com wrote:
 Hello,

 I have installed spark-1.0.0 with scala2.10.3. I have built spark with
 sbt/sbt assembly and added

 /home/wanda/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar
 to my CLASSPATH variable.
 Then I went to
 ../spark-1.0.0/examples/src/main/scala/org/apache/spark/examples, created a
 new directory classes, and compiled SparkKMeans.scala with scalac -d
 classes/ SparkKMeans.scala.
 Then I navigated to classes (I commented out this line in the scala file:
 package org.apache.spark.examples) and tried to run it with java -cp .
 SparkKMeans, and I get the following error:
 Exception in thread main java.lang.NoClassDefFoundError:
 breeze/linalg/Vector
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2531)
at java.lang.Class.getMethod0(Class.java:2774)
at java.lang.Class.getMethod(Class.java:1663)
at
 sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
at
 sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
 Caused by: java.lang.ClassNotFoundException: breeze.linalg.Vector
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 6 more
 
 The jar under

 /home/wanda/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar
 contains the breeze/linalg/Vector* path. I even tried to unpack it and put
 it in CLASSPATH, but it does not seem to get picked up.


 I am currently running java 1.8
 java version 1.8.0_05
 Java(TM) SE Runtime Environment (build 1.8.0_05-b13)
 Java HotSpot(TM) 64-Bit Server VM (build 25.5-b02, mixed mode)

 What am I doing wrong?





Re: Cannot submit to a Spark Application to a remote cluster Spark 1.0

2014-07-09 Thread Yana Kadiyska
 class java.io.IOException: Cannot run program
/Users/aris.vlasakakis/Documents/spark-1.0.0/bin/compute-classpath.sh
(in directory .): error=2, No such file or directory

By any chance, are your SPARK_HOME directories different on the machine
you're submitting from and on the cluster? I'm on an older drop so I'm not
sure about the finer points of spark-submit, but I do remember a very similar
issue when trying to run a Spark driver on a Windows machine against a Spark
Master on an Ubuntu cluster (the SPARK_HOME directories were obviously
different)
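
If the paths really do differ, one thing worth trying (a sketch under that
assumption, not a verified fix; the cluster-side path is hypothetical) is to tell
the driver explicitly where Spark lives on the cluster, so the workers are not
handed the submitter's local path:

import org.apache.spark.{SparkConf, SparkContext}

// spark.home should point at the Spark installation on the *worker* machines.
val conf = new SparkConf()
  .setMaster("spark://10.20.10.152:7077")
  .setAppName("SparkHomeCheck")
  .setSparkHome("/opt/spark-1.0.0")
val sc = new SparkContext(conf)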

On Wed, Jul 9, 2014 at 7:18 PM, Aris Vlasakakis a...@vlasakakis.com wrote:
 Hello everybody,

 I am trying to figure out how to submit a Spark application from one
 separate physical machine to a Spark stand alone cluster. I have an
 application that I wrote in Python that works if I am on the 1-Node Spark
 server itself, and from that spark installation I run bin/spark-submit with
 1)  MASTER=local[*] or if 2) MASTER=spark://localhost:7077.

 However, I want to be on a separate machine that submits a job to Spark. Am
 I doing something wrong here? I think something is wrong because I am
 working from two different spark installations -- as in, on the big server
 I have one spark installation and I am running sbin/start-all.sh to run the
 standalone server (and that works), and then on a separate laptop I have a
 different installation of spark-1.0.0, but I am using the laptop's
 bin/spark-submit script to submit to the remote Spark server (using
 MASTER=spark://remote-spark-master:7077).

 This submit-to-remote cluster does not work, even for the Scala examples
 like SparkPi.

 Concrete Example: I want to do submit the example SparkPi to the cluster,
 from my laptop.

 Server is 10.20.10.152, running master and slave, I can look at the Master
 web UI at http://10.20.10.152:8080. Great.

 From laptop (10.20.10.154), I try the following, using bin/run-example from
 a locally built version of spark 1.0.0 (so that I have the script
 spark-submit!):

 bin/spark-submit --verbose --class org.apache.spark.examples.SparkPi
 --master spark://10.20.10.152:7077
 examples/target/scala-2.10/spark-examples-1.0.0-hadoop1.0.4.jar


 This fails, with the errors at the bottom of this email.

 Am I doing something wrong? How can I submit to  a remote cluster? I get the
 same problem with bin/spark-submit.


  bin/spark-submit --verbose --class org.apache.spark.examples.SparkPi
 --master spark://10.20.10.152:7077
 examples/target/scala-2.10/spark-examples-1.0.0-hadoop1.0.4.jar
 Using properties file: null
 Using properties file: null
 Parsed arguments:
   master  spark://10.20.10.152:7077
   deployMode  null
   executorMemory  null
   executorCores   null
   totalExecutorCores  null
   propertiesFile  null
   driverMemorynull
   driverCores null
   driverExtraClassPathnull
   driverExtraLibraryPath  null
   driverExtraJavaOptions  null
   supervise   false
   queue   null
   numExecutorsnull
   files   null
   pyFiles null
   archivesnull
   mainClass   org.apache.spark.examples.SparkPi
   primaryResource
 file:/Users/aris.vlasakakis/Documents/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop1.0.4.jar
   nameorg.apache.spark.examples.SparkPi
   childArgs   []
   jarsnull
   verbose true

 Default properties from null:



 Using properties file: null
 Main class:
 org.apache.spark.examples.SparkPi
 Arguments:

 System properties:
 SPARK_SUBMIT - true
 spark.app.name - org.apache.spark.examples.SparkPi
 spark.jars -
 file:/Users/aris.vlasakakis/Documents/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop1.0.4.jar
 spark.master - spark://10.20.10.152:7077
 Classpath elements:
 file:/Users/aris.vlasakakis/Documents/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop1.0.4.jar


 14/07/09 16:16:08 INFO SecurityManager: Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties
 14/07/09 16:16:08 INFO SecurityManager: Changing view acls to:
 aris.vlasakakis
 14/07/09 16:16:08 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions:
 Set(aris.vlasakakis)
 14/07/09 16:16:08 INFO Slf4jLogger: Slf4jLogger started
 14/07/09 16:16:08 INFO Remoting: Starting remoting
 14/07/09 16:16:08 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://spark@10.20.10.154:50478]
 14/07/09 16:16:08 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://spark@10.20.10.154:50478]
 14/07/09 16:16:08 INFO SparkEnv: Registering MapOutputTracker
 14/07/09 16:16:08 INFO SparkEnv: Registering BlockManagerMaster
 14/07/09 16:16:08 INFO DiskBlockManager: Created local directory at
 

Re: Map Function does not seem to be executing over RDD

2014-07-09 Thread Yana Kadiyska
Does the line println("Retuning " + string) from the hash function print
what you expect? If you're not seeing that output in the executor log, I'd
also put some debug statements in the other case, since your match in the
interesting case is conditioned on if (fieldsList.contains(index)) -- maybe
that doesn't catch what you think it should. If that's the case, you can dump
out the contents of fieldsList within the other case (i.e. inside the map)
and see what's there...
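
Something along these lines (a sketch that reuses hash, delimiter and fieldsList
from the DSV class in the original post) would make the non-matching branch
visible in the executor logs:

// Debug variant of anonymizeFields: log what the "other" branch actually sees.
def anonymizeFieldsDebug(l: String): String =
  l.split(delimiter, -1).zipWithIndex.map {
    case (str, index) if fieldsList.contains(index) => hash(str)
    case (str, index) =>
      println(s"index $index not in $fieldsList, keeping '$str'")
      str
  }.mkString(delimiter)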

On Wed, Jul 9, 2014 at 9:46 PM, Raza Rehman razaurreh...@gmail.com wrote:
 Hello every one

 I am having some problem with a simple Scala/ Spark Code in which I am
 trying to replaces certain fields in a csv with their hashes

 class DSV(var line: String = "", fieldsList: Seq[Int], var delimiter: String = ",")
 extends Serializable {

 def hash(s: String): String = {
 var md = MessageDigest.getInstance("SHA")
 md.update(s.getBytes("UTF-8"))

 var digest = md.digest()

 val string: Option[String] = Option(digest).map(Hex.valueOf)

 println("Retuning " + string)
 string.getOrElse("")
 }

 def anonymizeFields(l: String): String = {
 l.split(delimiter, -1).zipWithIndex
 .map {
 case (str, index) if (fieldsList.contains(index))
 => hash(str)
 case other => other._1
 }.mkString(delimiter)
 }
 }

 I am calling the anonymize function like this but the anondata seems to be
 the same as the original dsvData

 var dsvData = sc.textFile(inputPath + inputFileName).map(
 line => new DSV(line, List(1,2), "\\|")
 )

 println("Lines Processed=" + dsvData.count())
 var anonData = dsvData.map(l => l.anonymizeFields(l.line))

 println("DSVs Processed=" + anonData.count())
 anonData.saveAsTextFile(outputPath + outputFileName)

 I have tried the execution through shell as well but the problem persists.
 The job does finish but the worker log shows the following error message

 14/07/09 11:30:20 ERROR EndpointWriter: AssociationError
 [akka.tcp://sparkWorker@host:60593] -
 [akka.tcp://sparkExecutor@host:51397]: Error [Association failed with
 [akka.tcp://sparkExecutor@host:51397]] [

 Regards
 MRK


Re: incorrect labels being read by MLUtils.loadLabeledData()

2014-07-10 Thread Yana Kadiyska
I do not believe the order of points in a distributed RDD is in any
way guaranteed. For a simple test, you can always add a last column
which is an id (make it double and throw it in the feature vector).
Printing the rdd back will not give you the points in file order. If
you don't want to go that far you can always examine the full feature
vector carefully -- points 12 and 14 should differ from your input csv
in the feature vector as well as the label.
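
For example, something like this (a sketch that assumes an existing SparkContext
sc and uses the same loader as in the question; the path is a placeholder) makes
it easy to compare a handful of parsed points against the file by eye:

import org.apache.spark.mllib.util.MLUtils

// Print the first few points with their position in the returned sample,
// so a suspect label can be checked against its full feature vector too.
val points = MLUtils.loadLabeledData(sc, "data.csv")
points.take(15).zipWithIndex.foreach { case (p, i) =>
  println(s"point $i: label=${p.label} features=${p.features}")
}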

On Thu, Jul 10, 2014 at 6:28 PM, SK skrishna...@gmail.com wrote:
 Hi,

 I have a csv data file, which I have organized  in the following format to
 be read as a LabeledPoint(following the example in
 mllib/data/sample_tree_data.csv):

 1,5.1,3.5,1.4,0.2
 1,4.9,3,1.4,0.2
 1,4.7,3.2,1.3,0.2
 1,4.6,3.1,1.5,0.2

 The first column is the binary label (1 or 0) and the remaining columns are
 features. I am using the Logistic Regression Classifier in MLLib to create a
 model based on the training data and predict the (binary) class of the test
 data.   I use MLUtils.loadLabeledData to read  the data file. My prediction
 accuracy is quite low (compared to the results I got for the same data from
 R), So I tried to debug, by first verifying that the LabeledData is being
 read correctly.
 I find that some of the labels are not read correctly. For example, the
 first 40 points of the training data have a class of 1, whereas the training
 data read by loadLabeledData has label 0 for point 12 and point 14. I would
 like to know if this is because of the distributed algorithm that MLLib uses
 or if there is something wrong with the format I have above.

 thanks





 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/incorrect-labels-being-read-by-MLUtils-loadLabeledData-tp9356.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Help using streaming from Spark Shell

2014-07-26 Thread Yana Kadiyska
Hi,

I'm starting spark-shell like this:

SPARK_MEM=1g SPARK_JAVA_OPTS="-Dspark.cleaner.ttl=3600"
/spark/bin/spark-shell -c 3

but when I try to create a streaming context
val scc = new StreamingContext(sc, Seconds(10))

 I get:

org.apache.spark.SparkException: Spark Streaming cannot be used
without setting spark.cleaner.ttl; set this property before creating a
SparkContext (use SPARK_JAVA_OPTS for the shell)

at 
org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:121)


I also tried export SPARK_JAVA_OPTS="-Dspark.cleaner.ttl=3600"
before calling spark-shell but with no luck...

What am I doing wrong? This is spark 0.9.1 -- I cannot upgrade


[Streaming] updateStateByKey trouble

2014-08-06 Thread Yana Kadiyska
Hi folks, hoping someone who works with Streaming can help me out.

I have the following snippet:

val stateDstream =
  data.map(x => (x, 1))
  .updateStateByKey[State](updateFunc)

stateDstream.saveAsTextFiles(checkpointDirectory, "partitions_test")

where data is a RDD of

case class StateKey(host:String,hour:String,customer:String)

when I dump out the stream, I see duplicate values in the same partition
(I've bolded the keys that are identical):

(StateKey(foo.com.br,2014-07-22-18,16),State(43,2014-08-06T14:05:29.831Z))
(*StateKey*(www.abcd.com,2014-07-22-22,25),State(2564,2014-08-06T14:05:29.831Z))
(StateKey(bar.com,2014-07-04-20,29),State(77,2014-08-06T14:05:29.831Z))
(*StateKey*(www.abcd.com,2014-07-22-22,25),State(1117,2014-08-06T14:05:29.831Z))


I was under the impression that on each batch, the stream will contain a
single RDD with Key-Value pairs, reflecting the latest state of each key.
Am I misunderstanding this? Or is the key equality somehow failing?

Any tips on this appreciated...

PS. For completeness State is
case class State(val count:Integer,val update_date:DateTime)


Spark working directories

2014-08-14 Thread Yana Kadiyska
Hi all, trying to change defaults of where stuff gets written.

I've set -Dspark.local.dir=/spark/tmp and I can see that the setting is
used when the executor is started.

I do indeed see directories like spark-local-20140815004454-bb3f in this
desired location but I also see undesired stuff under /tmp

usr@executor:~# ls /tmp/spark-93f4d44c-ff4d-477d-8930-5884b10b065f/
files  jars

usr@driver: ls /tmp/spark-7e456342-a58c-4439-ab69-ff8e6d6b56a5/
files jars

Is there a way to move these directories off of /tmp? I am running
0.9.1 (SPARK_WORKER_DIR
is also exported on all nodes though all that I see there are executor logs)

Thanks


Re: a noob question for how to implement setup and cleanup in Spark map

2014-08-19 Thread Yana Kadiyska
Sean, would this work --

rdd.mapPartitions { partition => Iterator(partition) }.foreach(

   // Some setup code here
   // save partition to DB
   // Some cleanup code here
)


I tried a pretty simple example ... I can see that the setup and
cleanup are executed on the executor node, once per partition (I used
mapPartitionsWithIndex instead of mapPartitions to track this a little
better). Seems like an easier solution than Tobias's but I'm wondering
if it's perhaps incorrect
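
A sketch of that per-partition setup/cleanup pattern (DbClient and its
connect/save/close methods are hypothetical stand-ins, not from this thread):

rdd.foreachPartition { partition =>
  val db = DbClient.connect("jdbc:...")        // setup, once per partition
  try {
    partition.foreach(record => db.save(record))
  } finally {
    db.close()                                 // cleanup, once per partition
  }
}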




On Mon, Aug 18, 2014 at 3:29 AM, Henry Hung ythu...@winbond.com wrote:

 I slightly modified the code to use while(partitions.hasNext) { } instead of
 partitions.map(func).
 I suppose this can eliminate the uncertainty that comes from lazy execution.

 -Original Message-
 From: Sean Owen [mailto:so...@cloudera.com]
 Sent: Monday, August 18, 2014 3:10 PM
 To: MA33 YTHung1
 Cc: user@spark.apache.org
 Subject: Re: a noob question for how to implement setup and cleanup in
 Spark map

 I think this was a more comprehensive answer recently. Tobias is right
 that it is not quite that simple:

 http://mail-archives.apache.org/mod_mbox/spark-user/201407.mbox/%3CCAPH-c_O9kQO6yJ4khXUVdO=+D4vj=JfG2tP9eqn5RPko=dr...@mail.gmail.com%3E

 On Mon, Aug 18, 2014 at 8:04 AM, Henry Hung ythu...@winbond.com wrote:
  Hi All,
 
 
 
  Please ignore my question, I found a way to implement it via old
  archive
  mails:
 
 
 
  http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAF
  _KkPzpU4qZWzDWUpS5r9bbh=-hwnze2qqg56e25p--1wv...@mail.gmail.com%3E
 
 
 
  Best regards,
 
  Henry
 
 
 
  From: MA33 YTHung1
  Sent: Monday, August 18, 2014 2:42 PM
  To: user@spark.apache.org
  Subject: a noob question for how to implement setup and cleanup in
  Spark map
 
 
 
  Hi All,
 
 
 
  I’m new to Spark and Scala; I only recently started using the language and I
  love it, but there is a small coding problem when I want to convert my
  existing map-reduce code from Java to Spark…
 
 
 
  In Java, I create a class by extending
  org.apache.hadoop.mapreduce.Mapper
  and override the setup(), map() and cleanup() methods.
 
  But in Spark there is no method called setup(), so I write the
  setup() code into map(), but it performs badly.

  The reason is that I create the database connection once in setup(),
  run() executes the SQL query, and then cleanup() closes the connection.
 
  Could someone tell me how to do it in Spark?
 
 
 
  Best regards,
 
  Henry Hung
 
 
 
  
 



[Streaming] Cannot get executors to stay alive

2014-08-27 Thread Yana Kadiyska
Hi, I tried a similar question before and didn't get any answers,so I'll
try again:

I am using updateStateByKey, pretty much exactly as shown in the examples
shipping with Spark:

 def createContext(master: String, dropDir: String, checkpointDirectory: String) = {
   val updateFunc = (values: Seq[Int], state: Option[Int]) => {
     val currentCount = values.sum
     val previousCount = state.getOrElse(0)
     Some(currentCount + previousCount)
   }

   val sparkConf = new SparkConf()
     .setMaster(master)
     .setAppName("StatefulNetworkWordCountNoMem")
     .setJars(StreamingContext.jarOfClass(this.getClass))

   val ssc = new StreamingContext(sparkConf, Seconds(10))
   ssc.checkpoint(checkpointDirectory)

   val lines = ssc.textFileStream(dropDir)

   val wordDstream = lines.map(line => {
     val x = line.split("\t")
     ((x(0), x(1), x(2)), 1)
   })
   wordDstream.print()
   val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
   println("Printing stream")
   stateDstream.print()
   ssc
 }



I am running this over a _static_ drop directory (i.e. files are there at
the beginning of time, nothing new is coming).

The only difference is that I have a pretty wide key space -- about 440K,
each key is of length under 20 chars.

The code exactly as shown above runs for a while (about an hour) and then
the executors start dying with OOM exceptions. I tried executor memory from
512M to 2g (4 executors)-- the only difference is how long it takes for the
OOM. The only task that keeps executing is take at DStream(line
586)...which makes sense. What doesn't make sense is why the memory isn't
getting reclaimed

What am I doing wrong? I'd like to use streaming but it seems that I can't
get a process with 0 incoming traffic to stay up. Any advice much
appreciated -- I'm sure someone on this list has managed to run a streaming
program for longer than an hour!


Re: Spark Streaming checkpoint recovery causes IO re-execution

2014-08-28 Thread Yana Kadiyska
Can you clarify the scenario:

 val ssc = new StreamingContext(sparkConf, Seconds(10))
 ssc.checkpoint(checkpointDirectory)

 val lines = KafkaUtils.createStream(...)
 val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1L))

 val wordDstream = wordCounts.updateStateByKey[Int](updateFunc)

 wordDstream.foreachRDD(
   rdd => rdd.foreachPartition( /* sync state to external store */ )
 )


My impression is that during recovery from checkpoint, your wordDstream
would be in the state that it was before the crash +1 batch interval
forward when you get to the foreachRDD part -- even if re-creating the
pre-crash RDD is really slow. So if your driver goes down at 10:20 and you
restart at 10:30, I thought at the time of the DB write wordDstream would
have exactly the state of 10:20 plus 10 seconds' worth of aggregated stream data?


I don't really understand what you mean by "upon metadata checkpoint
recovery (before the data checkpoint occurs)", but it sounds like you're
observing the same DB write happening twice?

I don't have any advice for you but I am interested in understanding better
what happens in the recovery scenario so just trying to clarify what you
observe.


On Thu, Aug 28, 2014 at 6:42 AM, GADV giulio_devec...@yahoo.com wrote:

 Not sure if this make sense, but maybe would be nice to have a kind of
 flag
 available within the code that tells me if I'm running in a normal
 situation or during a recovery.
 To better explain this, let's consider the following scenario:
 I am processing data, let's say from a Kafka streaming, and I am updating a
 database based on the computations. During the recovery I don't want to
 update again the database (for many reasons, let's just assume that) but I
 want my system to be in the same status as before, thus I would like to
 know
 if my code is running for the first time or during a recovery so I can
 avoid
 to update the database again.
 More generally I want to know this in case I'm interacting with external
 entities.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-tp12568p13009.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark Streaming checkpoint recovery causes IO re-execution

2014-08-29 Thread Yana Kadiyska
I understand that the DB writes are happening from the workers unless you
collect. My confusion is that you believe workers recompute on recovery
("nodes computations which get redone upon recovery"). My understanding is
that checkpointing dumps the RDD to disk and then cuts the RDD lineage. So I
thought on driver restart you'd get a set of new executor processes, but
they would read the last known state of the RDD from the HDFS checkpoint. Am I
off here?

So the only situation I can imagine where you end up recomputing is if you're
checkpointing at a larger interval than your batch size (i.e. the RDD on
disk does not reflect its last pre-crash state)?


On Thu, Aug 28, 2014 at 1:32 PM, RodrigoB rodrigo.boav...@aspect.com
wrote:

 Hi Yana,

 The fact is that the DB writing is happening on the node level and not on
 Spark level. One of the benefits of distributed computing nature of Spark
 is
 enabling IO distribution as well. For example, is much faster to have the
 nodes to write to Cassandra instead of having them all collected at the
 driver level and sending the writes from there.

 The problem is that nodes computations which get redone upon recovery. If
 these lambda functions send events to other systems these events would get
 resent upon re-computation causing overall system instability.

 Hope this helps you understand the problematic.

 tnks,
 Rod



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-tp12568p13043.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark Java Configuration.

2014-09-02 Thread Yana Kadiyska
JavaSparkContext java_SC = new JavaSparkContext(conf); is the spark
context. An application has a single spark context -- you won't be able to
keep calling this -- you'll see an error if you try to create a second
such object from the same application.

Additionally, depending on your configuration, if you create a few
different apps that each create a spark context, you'll see them all
connected to the master in the UI. But they'll have to share executors on
the worker machines you have available. You'll often see messages like No
resources available if you are trying to run more than 1 app concurrently
and the first app you start is resource greedy

Hope this helps.
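
A common way to avoid re-creating contexts (a sketch, not a Spark API -- the
holder object is just application code) is to keep a single lazily-initialised
context per JVM and hand it out to whoever needs it:

import org.apache.spark.{SparkConf, SparkContext}

object SparkContextHolder {
  // Initialised once, on first access, and reused by every caller afterwards.
  lazy val sc: SparkContext = new SparkContext(
    new SparkConf().setAppName("Simple application").setMaster("local[4]"))
}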


On Tue, Sep 2, 2014 at 10:02 AM, pcsenthil pcsent...@gmail.com wrote:

 Team,

 I am new to Apache Spark and I didn't have much knowledge on hadoop or big
 data. I need clarifications on the below,

 How does Spark Configuration works, from a tutorial i got the below

 SparkConf conf = new SparkConf().setAppName("Simple application")
 .setMaster("local[4]");
 JavaSparkContext java_SC = new JavaSparkContext(conf);

 from this, i understood that we are providing the config through java
 program to Spark.
 Let us assume i have written this in a separate java method.

 My question are

 What happens if I keep on calling this?
 If this keeps on creating new Spark objects on each call, then how are we
 going to handle the JVM memory, since under each object I am trying to run 4
 concurrent threads?
 Is there any option to find an existing one in the JVM, so that instead of
 creating a new Spark object I can reuse it?

 Please help me on this.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Java-Configuration-tp13269.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




[MLib] How do you normalize features?

2014-09-03 Thread Yana Kadiyska
It seems like the next release will add a nice org.apache.spark.mllib.feature
package but what is the recommended way to normalize features in the
current release (1.0.2) -- I'm hoping for a general pointer here.

At the moment I have a RDD[LabeledPoint] and I can get
a MultivariateStatisticalSummary for mean/variance. Is that about the right
way to proceed? I'm also not seeing an easy way to subtract vectors -- do I
need to do this element-wise?

thanks


Re: Object serialisation inside closures

2014-09-04 Thread Yana Kadiyska
In the third case the object does not get shipped around. Each executor
will create its own instance. I got bitten by this here:

http://apache-spark-user-list.1001560.n3.nabble.com/Help-with-object-access-from-mapper-simple-question-tt8125.html
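
An alternative that sidesteps serialization entirely (a sketch, assuming the
opencsv parser and the heyRDD from the question below) is to build the parser
inside mapPartitions, so one instance is created per partition on the executor
and nothing non-serializable is captured by the closure:

val heyMap = heyRDD.mapPartitions { lines =>
  val parser = new au.com.bytecode.opencsv.CSVParser(';')  // created on the executor
  lines.map { line =>
    val temp = parser.parseLine(line)
    (temp(1), temp(4))
  }
}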


On Thu, Sep 4, 2014 at 9:29 AM, Andrianasolo Fanilo 
fanilo.andrianas...@worldline.com wrote:

  Hello Spark fellows J



 I’m a new user of Spark and Scala and have been using both for 6 months
 without too many problems.

 Here I’m looking for best practices for using non-serializable classes
 inside closure. I’m using Spark-0.9.0-incubating here with Hadoop 2.2.



 Suppose I am using OpenCSV parser to parse an input file. So inside my
 main :



 val sc = new SparkContext("local[2]", "App")

 val heyRDD = sc.textFile(…)



 val csvparser = new CSVParser(';')

 val heyMap = heyRDD.map { line =>

   val temp = csvparser.parseLine(line)

   (temp(1), temp(4))

 }





 This gives me a java.io.NotSerializableException:
 au.com.bytecode.opencsv.CSVParser, which seems reasonable.



 From here I could see 3 solutions :

 1/ Extending CSVParser with Serializable properties, which adds a lot of
 boilerplate code if you ask me

 2/ Using Kryo Serialization (still need to define a serializer)

 3/ Creating an object with an instance of the class I want to use,
 typically :



 object CSVParserPlus {



   val csvParser = new CSVParser(';')



   def parse(line: String) = {

 csvParser.parseLine(line)

   }

 }





 val heyMap = heyRDD.map { line =>

   val temp = CSVParserPlus.parse(line)

   (temp(1), temp(4))

 }



 The third solution works and I don’t get how, so I was wondering how the
 closure system inside Spark manages to serialize an object with a
 non-serializable instance. How does that work? Does it hinder performance?
 Is it a good solution? How do you manage this problem?



 Any input would be greatly appreciated



 Best regards,

 Fanilo



Re: Multiple spark shell sessions

2014-09-04 Thread Yana Kadiyska
These are just warnings from the web server. Normally your application will
have a UI page on port 4040. In your case, a little after the warning it
should bind just fine to another port (mine picked 4041). I'm running on
0.9.1. Do you actually see the application failing? The main thing when
running more than one app against the master is to make sure you have
enough resources for each (i.e. neither is grabbing too many resources or
cores). You can see if your app is successfully connected by looking at the
master UI page.


On Thu, Sep 4, 2014 at 5:58 AM, Dhimant dhimant84.jays...@gmail.com wrote:

 Hi,
 I am receiving the following error when connecting to the Spark server via
 the shell if one shell is already open.
 How can I open multiple sessions?

 Does anyone know about a workflow engine/job server like Apache Oozie for
 Spark?

 /
 Welcome to
     __
  / __/__  ___ _/ /__
 _\ \/ _ \/ _ `/ __/  '_/
/___/ .__/\_,_/_/ /_/\_\   version 1.0.2
   /_/

 Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java
 1.7.0_60)
 Type in expressions to have them evaluated.
 Type :help for more information.
 14/09/04 15:07:46 INFO spark.SecurityManager: Changing view acls to: root
 14/09/04 15:07:46 INFO spark.SecurityManager: SecurityManager:
 authentication disabled; ui acls disabled; users with view permissions:
 Set(root)
 14/09/04 15:07:46 INFO slf4j.Slf4jLogger: Slf4jLogger started
 14/09/04 15:07:47 INFO Remoting: Starting remoting
 14/09/04 15:07:47 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sp...@sparkmaster.guavus.com:42236]
 14/09/04 15:07:47 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://sp...@sparkmaster.guavus.com:42236]
 14/09/04 15:07:47 INFO spark.SparkEnv: Registering MapOutputTracker
 14/09/04 15:07:47 INFO spark.SparkEnv: Registering BlockManagerMaster
 14/09/04 15:07:47 INFO storage.DiskBlockManager: Created local directory at
 /tmp/spark-local-20140904150747-4dcd
 14/09/04 15:07:47 INFO storage.MemoryStore: MemoryStore started with
 capacity 294.9 MB.
 14/09/04 15:07:47 INFO network.ConnectionManager: Bound socket to port
 54453
 with id = ConnectionManagerId(sparkmaster.guavus.com,54453)
 14/09/04 15:07:47 INFO storage.BlockManagerMaster: Trying to register
 BlockManager
 14/09/04 15:07:47 INFO storage.BlockManagerInfo: Registering block manager
 sparkmaster.guavus.com:54453 with 294.9 MB RAM
 14/09/04 15:07:47 INFO storage.BlockManagerMaster: Registered BlockManager
 14/09/04 15:07:47 INFO spark.HttpServer: Starting HTTP Server
 14/09/04 15:07:47 INFO server.Server: jetty-8.y.z-SNAPSHOT
 14/09/04 15:07:47 INFO server.AbstractConnector: Started
 SocketConnector@0.0.0.0:48977
 14/09/04 15:07:47 INFO broadcast.HttpBroadcast: Broadcast server started at
 http://192.168.1.21:48977
 14/09/04 15:07:47 INFO spark.HttpFileServer: HTTP File server directory is
 /tmp/spark-0e45759a-2c58-439a-8e96-95b0bc1d6136
 14/09/04 15:07:47 INFO spark.HttpServer: Starting HTTP Server
 14/09/04 15:07:47 INFO server.Server: jetty-8.y.z-SNAPSHOT
 14/09/04 15:07:47 INFO server.AbstractConnector: Started
 SocketConnector@0.0.0.0:39962
 14/09/04 15:07:48 INFO server.Server: jetty-8.y.z-SNAPSHOT
 14/09/04 15:07:48 WARN component.AbstractLifeCycle: FAILED
 SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address
 already
 in use
 java.net.BindException: Address already in use
 at sun.nio.ch.Net.bind0(Native Method)
 at sun.nio.ch.Net.bind(Net.java:444)
 at sun.nio.ch.Net.bind(Net.java:436)
 at
 sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
 at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
 at

 org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187)
 at

 org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316)
 at

 org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265)
 at

 org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
 at org.eclipse.jetty.server.Server.doStart(Server.java:293)
 at

 org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
 at

 org.apache.spark.ui.JettyUtils$$anonfun$1.apply$mcV$sp(JettyUtils.scala:192)
 at
 org.apache.spark.ui.JettyUtils$$anonfun$1.apply(JettyUtils.scala:192)
 at
 org.apache.spark.ui.JettyUtils$$anonfun$1.apply(JettyUtils.scala:192)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.ui.JettyUtils$.connect$1(JettyUtils.scala:191)
 at
 org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:205)
 at org.apache.spark.ui.WebUI.bind(WebUI.scala:99)
 at org.apache.spark.SparkContext.init(SparkContext.scala:223)
 at
 org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:957)
 at 

Re: error: type mismatch while Union

2014-09-05 Thread Yana Kadiyska
Which version are you using -- I can reproduce your issue w/ 0.9.2 but not
with 1.0.1...so my guess is that it's a bug and the fix hasn't been
backported... No idea on a workaround though..


On Fri, Sep 5, 2014 at 7:58 AM, Dhimant dhimant84.jays...@gmail.com wrote:

 Hi,
 I am getting a type mismatch error during a union operation.
 Can someone suggest a solution?

   case class MyNumber(no: Int, secondVal: String) extends Serializable
 with Ordered[MyNumber] {
   override def toString(): String = this.no.toString + " " + this.secondVal
   override def compare(that: MyNumber): Int = this.no compare that.no
   override def compareTo(that: MyNumber): Int = this.no compare that.no
   def Equals(that: MyNumber): Boolean = {
 (this.no == that.no) && (that match {
   case MyNumber(n1, n2) => n1 == no && n2 == secondVal
   case _ => false
 })
   }
 }
 val numbers = sc.parallelize(1 to 20, 10)
 val firstRdd = numbers.map(new MyNumber(_, "A"))
 val secondRDD = numbers.map(new MyNumber(_, "B"))
 val numberRdd = firstRdd.union(secondRDD)
 <console>:24: error: type mismatch;
  found   : org.apache.spark.rdd.RDD[MyNumber]
  required: org.apache.spark.rdd.RDD[MyNumber]
    val numberRdd = onenumberRdd.union(anotherRDD)



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/error-type-mismatch-while-Union-tp13547.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Cannot run SimpleApp as regular Java app

2014-09-09 Thread Yana Kadiyska
spark-submit is a script which calls the spark-class script. Can you output
the command that spark-class runs (say, by putting set -x before its very last
line)? You should see the java command that is being run. The scripts do
some parameter setting, so it's possible you're missing something. It seems
to me you think your worker memory is 2G, but the executor is clearly
launched with -Xms512M -Xmx512M... so that's all you'd get.

On Mon, Sep 8, 2014 at 10:16 AM, ericacm eric...@gmail.com wrote:

 Dear all:

 I am a brand new Spark user trying out the SimpleApp from the Quick Start
 page.

 Here is the code:

 object SimpleApp {
   def main(args: Array[String]) {
     val logFile = "/dev/spark-1.0.2-bin-hadoop2/README.md" // Should be some file on your system
     val conf = new SparkConf()
       .setAppName("Simple Application")
       .set("spark.executor.memory", "512m")
       .setMaster("spark://myhost.local:7077")
       .setJars(Seq("/spark-experiments/target/spark-experiments-1.0-SNAPSHOT.jar"))
     val sc = new SparkContext(conf)
     try {
       val logData = sc.textFile(logFile, 2).cache()
       val numAs = logData.filter(line => line.contains("a")).count()
       val numBs = logData.filter(line => line.contains("b")).count()
       println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
     } finally {
       sc.stop()
     }
   }
 }

 I am using Spark 1.0.2 and Scala 2.10.4.  In spark-env.sh I have
 SPARK_WORKER_MEMORY=2g.

 I am trying to run this as a standalone Java app in my IDE.

 Note that this code *does* work when I either
 - Change the master to local (works running from IDE)
 - Run it using spark-submit

 The application/driver log is:

 14/09/08 10:03:55 INFO spark.SecurityManager: Changing view acls to: eric
 14/09/08 10:03:55 INFO spark.SecurityManager: SecurityManager:
 authentication disabled; ui acls disabled; users with view permissions:
 Set(eric)
 14/09/08 10:03:56 INFO slf4j.Slf4jLogger: Slf4jLogger started
 14/09/08 10:03:56 INFO Remoting: Starting remoting
 14/09/08 10:03:56 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://spark@10.0.1.5:61645]
 14/09/08 10:03:56 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://spark@10.0.1.5:61645]
 14/09/08 10:03:56 INFO spark.SparkEnv: Registering MapOutputTracker
 14/09/08 10:03:56 INFO spark.SparkEnv: Registering BlockManagerMaster
 14/09/08 10:03:56 INFO storage.DiskBlockManager: Created local directory at

 /var/folders/j1/5rzyf1x97q9_7gj3mdc79t3cgn/T/spark-local-20140908100356-2496
 14/09/08 10:03:56 INFO storage.MemoryStore: MemoryStore started with
 capacity 279.5 MB.
 14/09/08 10:03:56 INFO network.ConnectionManager: Bound socket to port
 61646
 with id = ConnectionManagerId(10.0.1.5,61646)
 14/09/08 10:03:56 INFO storage.BlockManagerMaster: Trying to register
 BlockManager
 14/09/08 10:03:56 INFO storage.BlockManagerInfo: Registering block manager
 10.0.1.5:61646 with 279.5 MB RAM
 14/09/08 10:03:56 INFO storage.BlockManagerMaster: Registered BlockManager
 14/09/08 10:03:56 INFO spark.HttpServer: Starting HTTP Server
 14/09/08 10:03:57 INFO server.Server: jetty-8.1.14.v20131031
 14/09/08 10:03:57 INFO server.AbstractConnector: Started
 SocketConnector@0.0.0.0:61647
 14/09/08 10:03:57 INFO broadcast.HttpBroadcast: Broadcast server started at
 http://10.0.1.5:61647
 14/09/08 10:03:57 INFO spark.HttpFileServer: HTTP File server directory is

 /var/folders/j1/5rzyf1x97q9_7gj3mdc79t3cgn/T/spark-d5637279-5caa-4c14-a00f-650f1dd915bc
 14/09/08 10:03:57 INFO spark.HttpServer: Starting HTTP Server
 14/09/08 10:03:57 INFO server.Server: jetty-8.1.14.v20131031
 14/09/08 10:03:57 INFO server.AbstractConnector: Started
 SocketConnector@0.0.0.0:61648
 14/09/08 10:03:57 INFO server.Server: jetty-8.1.14.v20131031
 14/09/08 10:03:57 INFO server.AbstractConnector: Started
 SelectChannelConnector@0.0.0.0:4040
 14/09/08 10:03:57 INFO ui.SparkUI: Started SparkUI at http://10.0.1.5:4040
 2014-09-08 10:03:57.567 java[58736:1703] Unable to load realm info from
 SCDynamicStore
 14/09/08 10:03:57 INFO spark.SparkContext: Added JAR
 /spark-experiments/target/spark-experiments-1.0-SNAPSHOT.jar at
 http://10.0.1.5:61648/jars/spark-experiments-1.0-SNAPSHOT.jar with
 timestamp
 1410185037723
 14/09/08 10:03:57 INFO client.AppClient$ClientActor: Connecting to master
 spark://myhost.local:7077...
 14/09/08 10:03:57 INFO storage.MemoryStore: ensureFreeSpace(32960) called
 with curMem=0, maxMem=293063884
 14/09/08 10:03:57 INFO storage.MemoryStore: Block broadcast_0 stored as
 values to memory (estimated size 32.2 KB, free 279.5 MB)
 14/09/08 10:03:58 INFO cluster.SparkDeploySchedulerBackend: Connected to
 Spark cluster with app ID app-20140908100358-0002
 14/09/08 10:03:58 INFO client.AppClient$ClientActor: Executor added:
 app-20140908100358-0002/0 on worker-20140908100129-10.0.1.5-61526
 (10.0.1.5:61526) with 8 cores
 14/09/08 10:03:58 INFO cluster.SparkDeploySchedulerBackend: Granted
 executor
 ID 

Re: Problem in running mosek in spark cluster - java.lang.UnsatisfiedLinkError: no mosekjava7_0 in java.library.path at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1738)

2014-09-09 Thread Yana Kadiyska
If that library has native dependencies you'd need to make sure that the
native code is present on every box and on the library path, e.g. by
exporting SPARK_LIBRARY_PATH=... on each machine.

On Tue, Sep 9, 2014 at 10:17 AM, ayandas84 ayanda...@gmail.com wrote:

 We have a small Apache Spark cluster of 6 computers. We are trying to solve
 a distributed problem which requires solving an optimization problem at each
 machine during a Spark map operation.

 We decided to use mosek as the solver and I collected an academic license
 to
 this end. We observed that mosek works fine in a single system. However,
 when we prepare a jar file, include the mosek.jar into the library and try
 to run the jar in the cluster as a spark job it gives errors.

 java.lang.UnsatisfiedLinkError: no mosekjava7_0 in java.library.path

 Does this problem have anything to do with the license? We have set the
 necessary path variables in the profile of the user on the master machine,
 but we are not sure what changes need to be made to the other
 machines in the cluster.

 We shall be greatly obliged if you please suggest the necessary solution
 and
 help us out.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-running-mosek-in-spark-cluster-java-lang-UnsatisfiedLinkError-no-mosekjava7-0-in-java-lib-tp13799.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-10 Thread Yana Kadiyska
Tim, I asked a similar question twice:
here
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-executors-to-stay-alive-tt12940.html
and here
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Executor-OOM-tt12383.html

and have not yet received any responses. I noticed that the heap dump only
contains a very large byte array consuming about 66% (the second link
contains a picture of my heap -- I ran with a small heap to be able to get
the failure quickly).

I don't have solutions but wanted to affirm that I've observed a similar
situation...

On Wed, Sep 10, 2014 at 2:24 PM, Tim Smith secs...@gmail.com wrote:

 I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case,
 the receivers die within an hour because Yarn kills the containers for high
 memory usage. I set ttl.cleaner to 30 seconds but that didn't help. So I
 don't think stale RDDs are an issue here. I did a jmap -histo on a couple
 of running receiver processes and in a heap of 30G, roughly ~16G is taken
 by [B which is byte arrays.

 Still investigating more and would appreciate pointers for
 troubleshooting. I have dumped the heap of a receiver and will try to go
 over it.




 On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez 
 langel.gro...@gmail.com wrote:

 I somehow missed that parameter when I was reviewing the documentation,
 that should do the trick! Thank you!

 2014-09-10 2:10 GMT+01:00 Shao, Saisai saisai.s...@intel.com:

  Hi Luis,



 The parameter “spark.cleaner.ttl” and “spark.streaming.unpersist” can be
 used to remove useless timeout streaming data, the difference is that
 “spark.cleaner.ttl” is time-based cleaner, it does not only clean streaming
 input data, but also Spark’s useless metadata; while
 “spark.streaming.unpersist” is reference-based cleaning mechanism,
 streaming data will be removed when out of slide duration.



 Both these two parameter can alleviate the memory occupation of Spark
 Streaming. But if the data floods into Spark Streaming at start-up, as in
 your situation using Kafka, these two parameters cannot mitigate the
 problem well. You actually need to control the input data rate so it does
 not inject so fast; you can try “spark.streaming.receiver.maxRate” to
 control the inject rate.



 Thanks

 Jerry



 *From:* Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
 *Sent:* Wednesday, September 10, 2014 5:21 AM
 *To:* user@spark.apache.org
 *Subject:* spark.cleaner.ttl and spark.streaming.unpersist



 The executors of my spark streaming application are being killed due to
 memory issues. The memory consumption is quite high on startup because is
 the first run and there are quite a few events on the kafka queues that are
 consumed at a rate of 100K events per sec.

 I wonder if it's recommended to use spark.cleaner.ttl and
 spark.streaming.unpersist together to mitigate that problem. And I also
 wonder if new RDD are being batched while a RDD is being processed.

 Regards,

 Luis






Need help with ThriftServer/Spark1.1.0

2014-09-15 Thread Yana Kadiyska
Hi ladies and gents,

trying to get Thrift server up and running in an effort to replace Shark.

My first attempt to run sbin/start-thriftserver.sh resulted in:

14/09/15 17:09:05 ERROR TThreadPoolServer: Error occurred during processing
of message.
java.lang.RuntimeException:
org.apache.thrift.transport.TTransportException: java.net.SocketException:
Connection reset
at
org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:219)
at
org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:189)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.thrift.transport.TTransportException:
java.net.SocketException: Connection reset
at
org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:129)
at
org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
at
org.apache.thrift.transport.TSaslTransport.receiveSaslMessage(TSaslTransport.java:178)
at
org.apache.thrift.transport.TSaslServerTransport.handleSaslStartMessage(TSaslServerTransport.java:125)
at
org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253)
at
org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:41)
at
org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:216)
... 4 more
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:196)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
at
org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)

After turning logging levels up it seemed like this error is related to SASL,
and the SO advice was to turn it off
via:
<property><name>hive.server2.authentication</name><value>NOSASL</value></property>

But I still have no luck:
(this is the full command that gets run)

java -cp
/spark-1.1.0-bin-cdh4/conf:/spark-1.1.0-bin-cdh4/lib/spark-assembly-1.1.0-hadoop2.0.0-mr1-cdh4.2.0.jar:/spark-1.1.0-bin-cdh4/lib/datanucleus-core-3.2.2.jar:/spark-1.1.0-bin-cdh4/lib/datanucleus-rdbms-3.2.1.jar:/a/shark/spark-1.1.0-bin-cdh4/lib/datanucleus-api-jdo-3.2.1.jar:/hadoop/share/hadoop/mapreduce1//conf
-XX:MaxPermSize=128m -Xms4012m -Xmx4012m org.apache.spark.deploy.SparkSubmit
--class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 --master
spark://master-ip:7077 spark-internal --hiveconf
hive.server.thrift.bind.host ip-to-bind

14/09/15 17:05:05 ERROR TThreadPoolServer:
Error occurred during processing of message.
java.lang.ClassCastException: org.apache.thrift.transport.TSocket cannot be
cast to org.apache.thrift.transport.TSaslServerTransport
at
org.apache.hive.service.auth.TUGIContainingProcessor.process(TUGIContainingProcessor.java:53)
at
org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:206)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

Any idea what might be going on? I compiled with -Phive against 1.1.0.
hive-site.conf is the conf file we used previously. SparkSQL does work for
me but does not have a lot of functionality I need.

Any help appreciated -- I do acknowledge this is likely more of a Hive
question than Spark... If there is a precompiled version for CDH4 that
includes the Thrift server I'd be happy to try that too...

thanks again.


Re: Serving data

2014-09-16 Thread Yana Kadiyska
If your dashboard is doing AJAX/pull requests against, say, a REST API, you
can always create a Spark context in your REST service and use SparkSQL to
query over the Parquet files. The Parquet files are already on disk, so it
seems silly to write both to Parquet and to a DB... unless I'm missing
something in your setup.
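
As a sketch of that idea (Spark 1.1 API; the path, table and column names are
hypothetical), the REST layer would hold one long-lived context and answer
dashboard calls with SQL over the files already on disk:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("dashboard-api"))
val sqlContext = new SQLContext(sc)
sqlContext.parquetFile("/data/metrics.parquet").registerTempTable("metrics")

// Called from the REST handler on each dashboard request.
def hostCounts(): Array[org.apache.spark.sql.Row] =
  sqlContext.sql("SELECT host, COUNT(*) AS cnt FROM metrics GROUP BY host").collect()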

On Tue, Sep 16, 2014 at 4:18 AM, Marius Soutier mps@gmail.com wrote:

 Writing to Parquet and querying the result via SparkSQL works great
 (except for some strange SQL parser errors). However the problem remains:
 how do I get that data back to a dashboard? So I guess I’ll have to use a
 database after all.


 You can batch up data & store it into Parquet partitions as well, and query
 it using another SparkSQL shell; the JDBC driver in SparkSQL is part of 1.1,
 I believe.




Re: persistent state for spark streaming

2014-10-01 Thread Yana Kadiyska
I don't think persist is meant for end-user usage. You might want to call
saveAsTextFiles, for example, if you're saving to the file system as
strings. You can also dump the DStream to a DB -- there are samples on this
list (you'd have to do a combo of foreachRDD and mapPartitions, likely)
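
A sketch of that combination (QuotaStore is a hypothetical client standing in
for the real database, and quotaDStream for the updateStateByKey result):

quotaDStream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val store = QuotaStore.connect()                      // once per partition
    partition.foreach { case (user, quota) => store.upsert(user, quota) }
    store.close()
  }
}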

On Wed, Oct 1, 2014 at 3:49 AM, Chia-Chun Shih chiachun.s...@gmail.com
wrote:

 Hi,

 My application is to digest user logs and deduct user quotas. I need to
 maintain latest states of user quotas persistently, so that latest user
 quotas will not be lost.

 I have tried *updateStateByKey* to generate a DStream for user quotas
 and called *persist(StorageLevel.MEMORY_AND_DISK())*, but it didn't work.

 Are there better approaches to persist states for spark streaming?

 Thanks.









Re: persistent state for spark streaming

2014-10-02 Thread Yana Kadiyska
Yes -- persist is more akin to caching -- it's telling Spark to materialize
that RDD for fast reuse but it's not meant for the end user to query/use
across processes, etc. (at least that's my understanding).

On Thu, Oct 2, 2014 at 4:04 AM, Chia-Chun Shih chiachun.s...@gmail.com
wrote:

 Hi Yana,

 So, user quotas need another data store, which can guarantee persistence
 and afford frequent data updates/access. Is it correct?

 Thanks,
 Chia-Chun

 2014-10-01 21:48 GMT+08:00 Yana Kadiyska yana.kadiy...@gmail.com:

 I don't think persist is meant for end-user usage. You might want to call
 saveAsTextFiles, for example, if you're saving to the file system as
 strings. You can also dump the DStream to a DB -- there are samples on this
 list (you'd have to do a combo of foreachRDD and mapPartitions, likely)

 On Wed, Oct 1, 2014 at 3:49 AM, Chia-Chun Shih chiachun.s...@gmail.com
 wrote:

 Hi,

 My application is to digest user logs and deduct user quotas. I need to
 maintain latest states of user quotas persistently, so that latest user
 quotas will not be lost.

 I have tried *updateStateByKey* to generate and a DStream for user
 quotas and called *persist(StorageLevel.MEMORY_AND_DISK())*, but it
 didn't work.

 Are there better approaches to persist states for spark streaming?

 Thanks.












[SparkSQL] Function parity with Shark?

2014-10-02 Thread Yana Kadiyska
Hi, in an effort to migrate off of Shark I recently tried the Thrift JDBC
server that comes with Spark 1.1.0.

However, I observed that conditional functions do not work (I tried 'case'
and 'coalesce').

Some string functions like 'concat' also did not work.

Is there a list of what's missing or a roadmap of when it will be added? (I
know percentiles are pending, for example but do not see JIRAs for the
others in this email).


Re: [SparkSQL] Function parity with Shark?

2014-10-03 Thread Yana Kadiyska
)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at scala.collection.immutable.$colon$colon.writeObject(List.scala:379)
at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)



Let me know if any of these warrant a JIRA

thanks




On Thu, Oct 2, 2014 at 2:00 PM, Michael Armbrust mich...@databricks.com
wrote:

 What are the errors you are seeing.  All of those functions should work.

 On Thu, Oct 2, 2014 at 6:56 AM, Yana Kadiyska yana.kadiy...@gmail.com
 wrote:

 Hi, in an effort to migrate off of Shark I recently tried the Thrift JDBC
 server that comes with Spark 1.1.0.

 However I observed that conditional functions do not work (I tried 'case'
 and 'coalesce')

 some string functions like 'concat' also did not work.

 Is there a list of what's missing or a roadmap of when it will be added?
 (I know percentiles are pending, for example but do not see JIRAs for the
 others in this email).





Re: Akka connection refused when running standalone Scala app on Spark 0.9.2

2014-10-03 Thread Yana Kadiyska
when you're running spark-shell and the example, are you actually
specifying --master spark://master:7077 as shown here:
http://spark.apache.org/docs/latest/programming-guide.html#initializing-spark

because if you're not, your spark-shell is running in local mode and not
actually connecting to the cluster. Also, if you run spark-shell against
the cluster, you'll see it listed under Running applications in the
master UI. It would be pretty odd for spark-shell to connect successfully
to the cluster but for your app not to connect... (which is why I suspect
you're running spark-shell in local mode)

Another thing to check: the executors need to connect back to your driver,
so it could be that you have to set the driver host or driver port... In
fact, looking at your executor log, this seems fairly likely: is
host1/xxx.xx.xx.xx:45542
the machine where your driver is running? Is that host/port reachable from
the worker machines?
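
A sketch of what the driver-side configuration could look like for a 0.9.x
standalone app (host names, port and paths below are placeholders; only set
spark.driver.host/port if the defaults are not reachable from the workers):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://master:7077")
  .setAppName("MovieLensALS")
  .setSparkHome("/opt/spark-0.9.2")                          // cluster-side path
  .setJars(Seq("target/scala-2.10/movielens-als_2.10-0.0.jar"))
  .set("spark.driver.host", "driver-box.example.com")        // reachable from workers
  .set("spark.driver.port", "51000")
val sc = new SparkContext(conf)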

On Fri, Oct 3, 2014 at 5:32 AM, Irina Fedulova fedul...@gmail.com wrote:

 Hi,

 I have set up Spark 0.9.2 standalone cluster using CDH5 and pre-built
 spark distribution archive for Hadoop 2. I was not using spark-ec2 scripts
 because I am not on EC2 cloud.

 Spark-shell seems to be working properly -- I am able to perform simple
 RDD operations, as well as e.g. SparkPi standalone example works well when
 run via `run-example`. Web UI shows all workers connected.

 However, my standalone Scala application gets connection refused messages.
 I think this has something to do with configuration, because spark-shell
 and SparkPi work well. I verified that .setMaster and .setSparkHome are
 properly assigned within the Scala app.

 Is there anything else in configuration of standalone scala app on spark
 that I am missing?
 I would very much appreciate any clues.

 Namely, I am trying to run MovieLensALS.scala example from AMPCamp big
 data mini course (http://ampcamp.berkeley.edu/big-data-mini-course/movie-
 recommendation-with-mllib.html).

 Here is error which I get when try to run compiled jar:
 ---
 root@master:~/machine-learning/scala# sbt/sbt package run
 /movielens/medium
 Launching sbt from sbt/sbt-launch-0.12.4.jar
 [info] Loading project definition from /root/training/machine-
 learning/scala/project
 [info] Set current project to movielens-als (in build
 file:/root/training/machine-learning/scala/)
 [info] Compiling 1 Scala source to /root/training/machine-
 learning/scala/target/scala-2.10/classes...
 [warn] there were 2 deprecation warning(s); re-run with -deprecation for
 details
 [warn] one warning found
 [info] Packaging 
 /root/training/machine-learning/scala/target/scala-2.10/movielens-als_2.10-0.0.jar
 ...
 [info] Done packaging.
 [success] Total time: 6 s, completed Oct 2, 2014 1:19:00 PM
 [info] Running MovieLensALS /movielens/medium
 master = spark://master:7077
 log4j:WARN No appenders could be found for logger
 (akka.event.slf4j.Slf4jLogger).
 log4j:WARN Please initialize the log4j system properly.
 log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
 more info.
 14/10/02 13:19:01 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 HERE
 THERE
 14/10/02 13:19:02 INFO FileInputFormat: Total input paths to process : 1
 14/10/02 13:19:03 ERROR TaskSchedulerImpl: Lost executor 0 on host2:
 remote Akka client disassociated
 14/10/02 13:19:03 WARN TaskSetManager: Lost TID 1 (task 0.0:1)
 14/10/02 13:19:03 WARN TaskSetManager: Lost TID 0 (task 0.0:0)
 14/10/02 13:19:03 ERROR TaskSchedulerImpl: Lost executor 4 on host5:
 remote Akka client disassociated
 14/10/02 13:19:03 WARN TaskSetManager: Lost TID 3 (task 0.0:1)
 14/10/02 13:19:03 ERROR TaskSchedulerImpl: Lost executor 1 on host4:
 remote Akka client disassociated
 14/10/02 13:19:03 WARN TaskSetManager: Lost TID 2 (task 0.0:0)
 14/10/02 13:19:03 WARN TaskSetManager: Lost TID 4 (task 0.0:1)
 14/10/02 13:19:03 ERROR TaskSchedulerImpl: Lost executor 3 on host3:
 remote Akka client disassociated
 14/10/02 13:19:03 WARN TaskSetManager: Lost TID 6 (task 0.0:0)
 14/10/02 13:19:03 ERROR TaskSchedulerImpl: Lost executor 2 on host1:
 remote Akka client disassociated
 14/10/02 13:19:03 WARN TaskSetManager: Lost TID 5 (task 0.0:1)
 14/10/02 13:19:03 WARN TaskSetManager: Lost TID 7 (task 0.0:0)
 14/10/02 13:19:04 ERROR TaskSchedulerImpl: Lost executor 6 on host4:
 remote Akka client disassociated
 14/10/02 13:19:04 WARN TaskSetManager: Lost TID 8 (task 0.0:0)
 14/10/02 13:19:04 WARN TaskSetManager: Lost TID 9 (task 0.0:1)
 14/10/02 13:19:04 ERROR TaskSchedulerImpl: Lost executor 5 on host2:
 remote Akka client disassociated
 14/10/02 13:19:04 WARN TaskSetManager: Lost TID 10 (task 0.0:1)
 14/10/02 13:19:04 ERROR TaskSchedulerImpl: Lost executor 7 on host5:
 remote Akka client disassociated
 14/10/02 13:19:04 WARN TaskSetManager: Lost TID 11 (task 0.0:0)
 14/10/02 13:19:04 WARN TaskSetManager: Lost TID 12 (task 0.0:1)
 14/10/02 

Re: Akka connection refused when running standalone Scala app on Spark 0.9.2

2014-10-03 Thread Yana Kadiyska
I don't think it's a red herring... (btw. spark.driver.host needs to be set
to the IP or  FQDN of the machine where you're running the program).
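
For reference, a minimal sketch of what setting those explicitly can look
like -- spark.driver.host and spark.driver.port are the real config keys,
but the app name, hostname and port below are placeholders you'd replace
with your own:

import org.apache.spark.{SparkConf, SparkContext}

// "driver-host.example.com" / 51000 are assumptions: use an address on the
// driver machine that the workers can actually route back to.
val conf = new SparkConf()
  .setMaster("spark://master:7077")
  .setAppName("MovieLensALS")
  .set("spark.driver.host", "driver-host.example.com")
  .set("spark.driver.port", "51000")   // fixing the port also makes firewalling easier
val sc = new SparkContext(conf)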

I am running 0.9.2 on CDH4 and the beginning of my executor log looks like
below (I've obfuscated the IP -- this is the log from executor
a100-2-200-245). My driver is running on a100-2-200-238. I am not
specifically setting spark.driver.host or the port but depending on how
your machine is setup you might need to:

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/10/03 18:14:48 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/10/03 18:14:48 INFO Remoting: Starting remoting
14/10/03 18:14:48 INFO Remoting: Remoting started; listening on
addresses :[akka.tcp://sparkExecutor@a100-2-200-245:56760]
14/10/03 18:14:48 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkExecutor@a100-2-200-245:56760]
**14/10/03 18:14:48 INFO executor.CoarseGrainedExecutorBackend:
Connecting to driver:
akka.tcp://spark@a100-2-200-238:61505/user/CoarseGrainedScheduler**
14/10/03 18:14:48 INFO worker.WorkerWatcher: Connecting to worker
akka.tcp://sparkWorker@a100-2-200-245:48067/user/Worker
14/10/03 18:14:48 INFO worker.WorkerWatcher: Successfully connected to
akka.tcp://sparkWorker@a100-2-200-245:48067/user/Worker
**14/10/03 18:14:49 INFO executor.CoarseGrainedExecutorBackend:
Successfully registered with driver**
14/10/03 18:14:49 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/10/03 18:14:49 INFO Remoting: Starting remoting

If you look at the lines with ** this is where the driver successfully
connects and at this point you should see your app show up in the UI under
"Running applications"... The worker log you're posting -- is that the log
that's stored under work/app-id/executor-id/stderr? The first line you
show in that log is

 INFO worker.Worker: Executor
app-20141002131901-0002/9 finished with state FAILED

but I imagine something prior to that would say why the executor failed?

On Fri, Oct 3, 2014 at 2:56 PM, Irina Fedulova fedul...@gmail.com wrote:

 Yana, many thanks for looking into this!

 I am not running spark-shell in local mode, I am really starting
 spark-shell with --master spark://master:7077 and running in cluster mode.

 Second thing is I tried to set spark.driver.host to master both in
 scala app when creating context, and in conf/spark-defaults.conf file, but
 this did not make any difference. Worker logs still have same messages:
 14/10/03 13:37:30 ERROR remote.EndpointWriter: AssociationError
 [akka.tcp://sparkWorker@host2:51414] - 
 [akka.tcp://sparkExecutor@host2:53851]:
 Error [Association failed with [akka.tcp://sparkExecutor@host2:53851]] [
 akka.remote.EndpointAssociationException: Association failed with
 [akka.tcp://sparkExecutor@host2:53851]
 Caused by: 
 akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
 Connection refused: host2/xxx.xx.xx.xx:53851
 ]

 note that host1, host2 etc. are slave hostnames, and each slave has an error
 message about itself: host1:some random port cannot connect to
 host1:some random port.

 However I noticed that after successfully running SparkPi, the app log is also
 populated with similar connection refused messages, but this does not
 lead to application death... So these worker logs are probably a false clue.



 On 03.10.14 19:37, Yana Kadiyska wrote:

 when you're running spark-shell and the example, are you actually
 specifying --master spark://master:7077 as shown here:
 http://spark.apache.org/docs/latest/programming-guide.html#
 initializing-spark

 because if you're not, your spark-shell is running in local mode and not
 actually connecting to the cluster. Also, if you run spark-shell against
 the cluster, you'll see it listed under the Running applications in the
 master UI. It would be pretty odd for spark shell to connect
 successfully to the cluster but for your app to not connect...(which is
 why I suspect that you're running spark-shell local)

 Another thing to check, the executors need to connect back to your
 driver, so it could be that you have to set the driver host or driver
 port...in fact looking at your executor log, this seems fairly likely:
 is host1/xxx.xx.xx.xx:45542 the machine where your driver is running? is
 that host/port reachable from the worker machines?

 On Fri, Oct 3, 2014 at 5:32 AM, Irina Fedulova fedul...@gmail.com
 mailto:fedul...@gmail.com wrote:

 Hi,

 I have set up Spark 0.9.2 standalone cluster using CDH5 and
 pre-built spark distribution archive for Hadoop 2. I was not using
 spark-ec2 scripts because I am not on EC2 cloud.

 Spark-shell seems to be working properly -- I am able to perform
 simple RDD operations, as well as e.g. SparkPi standalone example
 works well when run via `run-example`. Web UI shows all workers
 connected.

 However, standalone Scala application gets connection refused
 messages. I think this has something to do with configuration,
 because spark-shell

Re: [SparkSQL] Function parity with Shark?

2014-10-06 Thread Yana Kadiyska
I have created

https://issues.apache.org/jira/browse/SPARK-3814
https://issues.apache.org/jira/browse/SPARK-3815

Will probably try my hand at 3814, seems like a good place to get started...

On Fri, Oct 3, 2014 at 3:06 PM, Michael Armbrust mich...@databricks.com
wrote:

 Thanks for digging in!  These both look like they should have JIRAs.

 On Fri, Oct 3, 2014 at 8:14 AM, Yana Kadiyska yana.kadiy...@gmail.com
 wrote:

 Thanks -- it does appear that I misdiagnosed a bit: case works generally
 but it doesn't seem to like the bit operation, which fails
 (type of bit_field in Hive is bigint):

 Error: java.lang.RuntimeException:
 Unsupported language features in query: select (case when bit_field & 1=1 
 then r_end - r_start else NULL end) from mytable where pkey='0178-2014-07' 
 LIMIT 2
 TOK_QUERY
   TOK_FROM
 TOK_TABREF
   TOK_TABNAME
mytable
   TOK_INSERT
 TOK_DESTINATION
   TOK_DIR
 TOK_TMP_FILE
 TOK_SELECT
   TOK_SELEXPR
 TOK_FUNCTION
   when
   =
 
   TOK_TABLE_OR_COL
 bit_field
   1
 1
   -
 TOK_TABLE_OR_COL
   r_end
 TOK_TABLE_OR_COL
   r_start
   TOK_NULL
 TOK_WHERE
   =
 TOK_TABLE_OR_COL
   pkey
 '0178-2014-07'
 TOK_LIMIT
   2


 SQLState:  null
 ErrorCode: 0


 similarly, concat seems to work but I get a failure in this query (due to
 LPAD I believe) :

 select customer_id from mytable where
 pkey=concat_ws('-',LPAD('077',4,'0'),'2014-07') LIMIT 2

 (there is something going on with the fact that the function is in the
 where clause...the following work fine:

 select concat_ws('-', LPAD(cast(112717 % 1024 AS
 STRING),4,'0'),'2014-07') from mytable where pkey='0077-2014-07' LIMIT 2
 select customer_id from mytable where pkey=concat_ws('-','0077','2014-07')
 LIMIT 2
 )

 14/10/03 14:51:35 ERROR server.SparkSQLOperationManager: Error executing 
 query:
 org.apache.spark.SparkException: Task not serializable
 at 
 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at 
 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
 at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
 at 
 org.apache.spark.sql.execution.Limit.execute(basicOperators.scala:146)
 at 
 org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:360)
 at 
 org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:360)
 at 
 org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager$$anon$1.run(SparkSQLOperationManager.scala:185)
 at 
 org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:193)
 at 
 org.apache.hive.service.cli.session.HiveSessionImpl.executeStatement(HiveSessionImpl.java:175)
 at 
 org.apache.hive.service.cli.CLIService.executeStatement(CLIService.java:150)
 at 
 org.apache.hive.service.cli.thrift.ThriftCLIService.ExecuteStatement(ThriftCLIService.java:207)
 at 
 org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1133)
 at 
 org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1118)
 at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
 at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
 at 
 org.apache.hive.service.auth.TUGIContainingProcessor$1.run(TUGIContainingProcessor.java:58)
 at 
 org.apache.hive.service.auth.TUGIContainingProcessor$1.run(TUGIContainingProcessor.java:55)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:415)
 at 
 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
 at 
 org.apache.hadoop.hive.shims.HadoopShimsSecure.doAs(HadoopShimsSecure.java:526)
 at 
 org.apache.hive.service.auth.TUGIContainingProcessor.process(TUGIContainingProcessor.java:55)
 at 
 org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:206)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)
 Caused by: java.io.NotSerializableException: java.lang.reflect.Constructor
 at 
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at 
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at 
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508

Re: Help with using combineByKey

2014-10-09 Thread Yana Kadiyska
If you just want the ratio of positive to all values per key (if I'm
reading right) this works

val reduced = input.groupByKey().map(grp =>
  grp._2.filter(v => v > 0).size.toFloat / grp._2.size)
reduced.foreach(println)

I don't think you need reduceByKey or combineByKey as you're not doing
anything where the values depend on each other -- you're just counting...
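
That said, if combineByKey is preferred (e.g. to avoid materializing the
grouped values), here is a rough sketch of the counting version. Note the
createCombiner seeds the accumulator with a 0/1 flag rather than the value
itself -- my guess is that seeding it with the value is where the garbage
counts in the code below came from:

// (positives, total) per key, then the ratio
val ratios = input.combineByKey(
  (v: Int) => (if (v > 0) 1 else 0, 1),
  (acc: (Int, Int), v: Int) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)
).mapValues { case (pos, total) => pos.toFloat / total }

ratios.collectAsMap().foreach(println)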

On Thu, Oct 9, 2014 at 11:47 AM, HARIPRIYA AYYALASOMAYAJULA 
aharipriy...@gmail.com wrote:


 I am a beginner to Spark and finding it difficult to implement a very
 simple reduce operation. I read that is ideal to use combineByKey for
 complex reduce operations.

 My input:

 val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7),
 ("SFO",0), ("SFO",1),
 ("SFO",9),("PHX",65),("PHX",88),("KX",7),("KX",6),("KX",1), ("KX",9),
 ("HOU",56),("HOU",5),("HOU",59),("HOU",0),("MA",563),("MA",545),("MA",5),("MA",0),("MA",0)))


  val opPart1 = input.combineByKey(
    (v) => (v, 1),
    (var acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
    (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
  )

  val opPart2 = opPart1.map{ case (key, value) => (key, (value._1,value._2)) }

  opPart2.collectAsMap().map(println(_))

 If the value is greater than 0, the first accumulator should be
 incremented by 1, else it remains the same. The second accumulator is a
 simple counter for each value.  I am getting an incorrect output (garbage
 values )for the first accumulator. Please help.

  The equivalent reduce operation in Hadoop MapReduce is :


 public static class PercentageCalcReducer extends
 Reducer<Text,IntWritable,Text,FloatWritable> {

     private FloatWritable pdelay = new FloatWritable();

     public void reduce(Text key, Iterable<IntWritable> values, Context context)
         throws IOException, InterruptedException {

         int acc2 = 0;
         float frac_delay, percentage_delay;
         int acc1 = 0;

         for (IntWritable val : values) {
             if (val.get() > 0) {
                 acc1++;
             }
             acc2++;
         }

         frac_delay = (float) acc1 / acc2;
         percentage_delay = frac_delay * 100;
         pdelay.set(percentage_delay);
         context.write(key, pdelay);
     }
 }


 Please help. Thank you for your time.

 --

 Regards,
 Haripriya Ayyalasomayajula
 contact : 650-796-7112



Re: Problem executing Spark via JBoss application

2014-10-15 Thread Yana Kadiyska
From this line: "Removing executor app-20141015142644-0125/0 because it is
EXITED" I would guess that you need to examine the executor log to see why
the executor actually exited. My guess would be that the executor cannot
connect back to your driver. But check the log from the executor. It should
be in SPARK_HOME/work/app-id/executor_id/stderr on the worker box, I
believe.

On Wed, Oct 15, 2014 at 8:56 AM, Mehdi Singer mehdi.sin...@lampiris.be
wrote:

  Hi,



 I have a Spark standalone example application which is working fine.

 I'm now trying to integrate this application into a J2EE application,
 deployed on JBoss 7.1.1 and accessed via a web service. The JBoss server is
 installed on my local machine (Windows 7) and the master Spark is remote
 (Linux).

 The example simply executes a count on my RDD.

 When I call the webservice I'm getting the following error at JBoss side
 when executing the count:



 11:48:10,232 ERROR
 [org.apache.catalina.core.ContainerBase.[jboss.web].[default-host].[/el2-etrm-spark].[ws]]
 (http--127.0.0.1-8082-3) Servlet.service() pour la servlet ws a généré
 une exception: java.lang.RuntimeException:
 org.apache.cxf.interceptor.Fault: Job cancelled because SparkContext was
 shut down

 at
 org.apache.cxf.interceptor.AbstractFaultChainInitiatorObserver.onMessage(AbstractFaultChainInitiatorObserver.java:116)
 [cxf-api-2.6.9.jar:2.6.9]

 at
 org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:322)
 [cxf-api-2.6.9.jar:2.4.3]

 at
 org.apache.cxf.transport.ChainInitiationObserver.onMessage(ChainInitiationObserver.java:121)
 [cxf-api-2.6.9.jar:2.6.9]

 at
 org.apache.cxf.transport.http.AbstractHTTPDestination.invoke(AbstractHTTPDestination.java:211)
 [cxf-bundle-2.6.2.jar:2.6.2]

 at
 org.apache.cxf.transport.servlet.ServletController.invokeDestination(ServletController.java:213)
 [cxf-bundle-2.6.2.jar:2.6.2]

 at
 org.apache.cxf.transport.servlet.ServletController.invoke(ServletController.java:154)
 [cxf-bundle-2.6.2.jar:2.6.2]

 at
 org.apache.cxf.transport.servlet.CXFNonSpringServlet.invoke(CXFNonSpringServlet.java:130)
 [cxf-bundle-2.6.2.jar:2.6.2]

 at
 org.apache.cxf.transport.servlet.AbstractHTTPServlet.handleRequest(AbstractHTTPServlet.java:221)
 [cxf-bundle-2.6.2.jar:2.6.2]

 at
 org.apache.cxf.transport.servlet.AbstractHTTPServlet.doGet(AbstractHTTPServlet.java:146)
 [cxf-bundle-2.6.2.jar:2.6.2]

 at
 javax.servlet.http.HttpServlet.service(HttpServlet.java:734)
 [jboss-servlet-api_3.0_spec-1.0.0.Final.jar:1.0.0.Final]

 at
 org.apache.cxf.transport.servlet.AbstractHTTPServlet.service(AbstractHTTPServlet.java:197)
 [cxf-bundle-2.6.2.jar:2.6.2]

 at
 org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:329)
 [jbossweb-7.0.13.Final.jar:]

 at
 org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:248)
 [jbossweb-7.0.13.Final.jar:]

 at
 org.springframework.orm.jpa.support.OpenEntityManagerInViewFilter.doFilterInternal(OpenEntityManagerInViewFilter.java:180)
 [spring-orm-3.2.3.RELEASE.jar:3.2.3.RELEASE]

 at
 org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
 [spring-web-3.2.3.RELEASE.jar:3.2.3.RELEASE]

 at
 org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:280)
 [jbossweb-7.0.13.Final.jar:]

 at
 org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:248)
 [jbossweb-7.0.13.Final.jar:]

 at
 org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:186)
 [spring-security-web-3.1.3.RELEASE.jar:3.1.3.RELEASE]

 at
 org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:160)
 [spring-security-web-3.1.3.RELEASE.jar:3.1.3.RELEASE]

 at
 org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:346)
 [spring-web-3.2.3.RELEASE.jar:3.2.3.RELEASE]

 at
 org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:259)
 [spring-web-3.2.3.RELEASE.jar:3.2.3.RELEASE]

 at
 org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:280)
 [jbossweb-7.0.13.Final.jar:]

 at
 org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:248)
 [jbossweb-7.0.13.Final.jar:]

 at
 org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:275)
 [jbossweb-7.0.13.Final.jar:]

 at
 org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:161)
 [jbossweb-7.0.13.Final.jar:]


Re: Exception Logging

2014-10-16 Thread Yana Kadiyska
you can put a try/catch block in the map function and log the exception.
The only tricky part is that the exception log will be located on the
executor machine. Even if you don't do any trapping you should see the
exception stacktrace in the executors' stderr log which is visible through
the UI (if your app crashes the executor you can still see it as the last
executor that ran on a given worker). But things like println and logging
work inside map, you just have to remember everything happens on the remote
machine
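
A rough sketch of the pattern (the input path and the parsing done inside
the try are hypothetical -- substitute whatever your job actually does):

val lines = sc.textFile("hdfs:///some/input.txt")   // placeholder path
val parsed = lines.flatMap { line =>
  try {
    Some(line.split("\t")(1).toInt)                 // your real parsing here
  } catch {
    case e: Exception =>
      // this ends up in the executor's stderr, not the driver console
      System.err.println("Failed to parse line [" + line + "]: " + e)
      None
  }
}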

On Thu, Oct 16, 2014 at 8:11 PM, Ge, Yao (Y.) y...@ford.com wrote:

  I need help to better trap Exception in the map functions. What is the
 best way to catch the exception and provide some helpful diagnostic
 information such as source of the input such as file name (and ideally line
 number if I am processing a text file)?



 -Yao



Re: create a Row Matrix

2014-10-22 Thread Yana Kadiyska
This works for me

import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val v1=Vectors.dense(Array(1d,2d))

val v2=Vectors.dense(Array(3d,4d))

val rows=sc.parallelize(List(v1,v2))

val mat=new RowMatrix(rows)

val svd: SingularValueDecomposition[RowMatrix, Matrix] =
mat.computeSVD(2, computeU = true)
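
If all that's needed is a small local matrix like the 2x2 [1 2; 3 4] in the
message below (rather than a distributed RowMatrix), a short sketch --
note that Matrices.dense takes the values in column-major order:

import org.apache.spark.mllib.linalg.{Matrices, Matrix}

// [1 2; 3 4] as a local 2x2 matrix; values listed column by column
val m: Matrix = Matrices.dense(2, 2, Array(1.0, 3.0, 2.0, 4.0))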


On Wed, Oct 22, 2014 at 1:55 AM, viola viola.wiersc...@siemens.com wrote:

 Thanks for the quick response. However, I still only get error messages. I
 am
 able to load a .txt file with entries in it and use it in sparks, but I am
 not able to create a simple matrix, for instance a 2x2 row matrix
 [1 2
 3 4]
 I tried variations such as
 val RowMatrix = Matrix(2,2,array(1,3,2,4))
 but it doesn't work..





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/create-a-Row-Matrix-tp16913p16993.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Problem packing spark-assembly jar

2014-10-23 Thread Yana Kadiyska
Hi folks,

I'm trying to deploy the latest from master branch and having some trouble
with the assembly jar.

In the spark-1.1 official distribution(I use cdh version), I see the
following jars, where spark-assembly-1.1.0-hadoop2.0.0-mr1-cdh4.2.0.jar
contains a ton of stuff:
datanucleus-api-jdo-3.2.1.jar
datanucleus-core-3.2.2.jar
datanucleus-rdbms-3.2.1.jar
spark-assembly-1.1.0-hadoop2.0.0-mr1-cdh4.2.0.jar
spark-examples-1.1.0-hadoop2.0.0-mr1-cdh4.2.0.jar
spark-hive-thriftserver_2.10-1.1.0.jar
spark-hive_2.10-1.1.0.jar
spark-sql_2.10-1.1.0.jar


I tried to create a similar distribution off of master running
mvn -Phive -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests -Pbigtop-dist
package
and
./make-distribution.sh -Pbigtop-dist -Phive
-Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests


but in either case all I get in spark-assembly is near empty:

spark_official/dist/lib$ jar -tvf
spark-assembly-1.2.0-SNAPSHOT-hadoop2.0.0-mr1-cdh4.2.0.jar

META-INF/
META-INF/MANIFEST.MF
org/
org/apache/
org/apache/spark/
org/apache/spark/unused/
org/apache/spark/unused/UnusedStubClass.class
META-INF/maven/
META-INF/maven/org.spark-project.spark/
META-INF/maven/org.spark-project.spark/unused/
META-INF/maven/org.spark-project.spark/unused/pom.xml
META-INF/maven/org.spark-project.spark/unused/pom.properties
META-INF/NOTICE

Any advice on how to get spark-core and the rest packaged into the assembly
jar -- I'd like to have fewer things to copy around.


Re: Problem packing spark-assembly jar

2014-10-24 Thread Yana Kadiyska
thanks -- that was it. I could swear this had worked for me before and
indeed it's fixed this morning.

On Fri, Oct 24, 2014 at 6:34 AM, Sean Owen so...@cloudera.com wrote:

 I imagine this is a side effect of the change that was just reverted,
 related to publishing the effective pom? sounds related but I don't
 know.

 On Fri, Oct 24, 2014 at 2:22 AM, Yana Kadiyska yana.kadiy...@gmail.com
 wrote:
  Hi folks,
 
  I'm trying to deploy the latest from master branch and having some
 trouble
  with the assembly jar.
 
  In the spark-1.1 official distribution(I use cdh version), I see the
  following jars, where spark-assembly-1.1.0-hadoop2.0.0-mr1-cdh4.2.0.jar
  contains a ton of stuff:
  datanucleus-api-jdo-3.2.1.jar
  datanucleus-core-3.2.2.jar
  datanucleus-rdbms-3.2.1.jar
  spark-assembly-1.1.0-hadoop2.0.0-mr1-cdh4.2.0.jar
  spark-examples-1.1.0-hadoop2.0.0-mr1-cdh4.2.0.jar
  spark-hive-thriftserver_2.10-1.1.0.jar
  spark-hive_2.10-1.1.0.jar
  spark-sql_2.10-1.1.0.jar
 
 
  I tried to create a similar distribution off of master running
  mvn -Phive -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests -Pbigtop-dist
  package
  and
  ./make-distribution.sh -Pbigtop-dist -Phive
  -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests
 
  but in either case all I get in spark-assembly is near empty:
 
  spark_official/dist/lib$ jar -tvf
  spark-assembly-1.2.0-SNAPSHOT-hadoop2.0.0-mr1-cdh4.2.0.jar
 
  META-INF/
  META-INF/MANIFEST.MF
  org/
  org/apache/
  org/apache/spark/
  org/apache/spark/unused/
  org/apache/spark/unused/UnusedStubClass.class
  META-INF/maven/
  META-INF/maven/org.spark-project.spark/
  META-INF/maven/org.spark-project.spark/unused/
  META-INF/maven/org.spark-project.spark/unused/pom.xml
  META-INF/maven/org.spark-project.spark/unused/pom.properties
  META-INF/NOTICE
 
  Any advice on how to get spark-core and the rest packaged into the
 assembly
  jar -- I'd like to have fewer things to copy around.



[Spark SQL] Setting variables

2014-10-24 Thread Yana Kadiyska
Hi all,

I'm trying to set a pool for a JDBC session. I'm connecting to the thrift
server via JDBC client.

My installation appears to be good(queries run fine), I can see the pools
in the UI, but any attempt to set a variable (I tried
spark.sql.shuffle.partitions and spark.sql.thriftserver.scheduler.pool)
results in the exception below (trace is from the Thriftserver log)


Any thoughts on what I'm doing wrong? (I am on master, built today)

SET spark.sql.thriftserver.scheduler.pool=mypool;select count(*) from mytable;


==

14/10/24 18:17:10 ERROR server.SparkSQLOperationManager: Error executing
query:
java.lang.NullPointerException
at
org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:309)
at
org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:272)
at
org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:244)
at
org.apache.spark.sql.execution.SetCommand.sideEffectResult$lzycompute(commands.scala:64)
at
org.apache.spark.sql.execution.SetCommand.sideEffectResult(commands.scala:55)
at
org.apache.spark.sql.execution.Command$class.execute(commands.scala:44)
at
org.apache.spark.sql.execution.SetCommand.execute(commands.scala:51)
at
org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:357)
at
org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:357)
at
org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:104)
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:99)
at
org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager$$anon$1.run(SparkSQLOperationManager.scala:172)
at
org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:193)
at
org.apache.hive.service.cli.session.HiveSessionImpl.executeStatement(HiveSessionImpl.java:175)
at
org.apache.hive.service.cli.CLIService.executeStatement(CLIService.java:150)
at
org.apache.hive.service.cli.thrift.ThriftCLIService.ExecuteStatement(ThriftCLIService.java:207)
at
org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1133)
at
org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1118)
at
org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at
org.apache.hive.service.auth.TUGIContainingProcessor$1.run(TUGIContainingProcessor.java:58)
at
org.apache.hive.service.auth.TUGIContainingProcessor$1.run(TUGIContainingProcessor.java:55)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
at
org.apache.hadoop.hive.shims.HadoopShimsSecure.doAs(HadoopShimsSecure.java:526)
at
org.apache.hive.service.auth.TUGIContainingProcessor.process(TUGIContainingProcessor.java:55)
at
org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:206)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)


Re: scalac crash when compiling DataTypeConversions.scala

2014-10-27 Thread Yana Kadiyska
guoxu1231, I struggled with the Idea problem for a full week. Same thing --
clean builds under MVN/Sbt, but no luck with IDEA. What worked for me was
the solution posted higher up in this thread -- it's a SO post that
basically says to delete all iml files anywhere under the project
directory.

Let me know if you can't see this mail and I'll locate the exact SO post

On Mon, Oct 27, 2014 at 5:15 AM, guoxu1231 guoxu1...@gmail.com wrote:

 Hi Stephen,
 I tried it again,
 To avoid the profile impact,  I execute mvn -DskipTests clean package
 with
 Hadoop 1.0.4 by default and open the IDEA and import it as a maven project,
 and I didn't choose any profile in the import wizard.
 Then Make project or re-build project in IDEA,  unfortunately the
 DataTypeConversions.scala compile failed again.


 Any updated guide for using IntelliJ IDEA? I'm following the "Building
 Spark with Maven" guide on the website.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/scalac-crash-when-compiling-DataTypeConversions-scala-tp17083p17333.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: problem with start-slaves.sh

2014-10-29 Thread Yana Kadiyska
I see this when I start a worker and then try to start it again forgetting
it's already running (I don't use start-slaves, I start the slaves
individually with start-slave.sh). All this is telling you is that there is
already a running process on that machine. You can see it if you do a ps
-aef|grep worker

you can look on the spark UI and see if your master shows this machine as
connected to it already. If it doesn't, you might want to kill the worker
process and restart it.

On Tue, Oct 28, 2014 at 4:32 PM, Pagliari, Roberto rpagli...@appcomsci.com
wrote:

 I ran sbin/start-master.sh followed by sbin/start-slaves.sh (I build with
 PHive option to be able to interface with hive)



 I’m getting this



 ip_address: org.apache.spark.deploy.worker.Worker running as process .
 Stop it first.



 Am I doing something wrong? In my specific case, shark+hive is running on
 the nodes. Does that interfere with spark?



 Thank you,



Re: CANNOT FIND ADDRESS

2014-10-29 Thread Yana Kadiyska
CANNOT FIND ADDRESS occurs when your executor has crashed. I'd look
further down where it shows each task and see if any tasks failed.
Then you can examine the error log of that executor and see why it died.

On Wed, Oct 29, 2014 at 9:35 AM, akhandeshi ami.khande...@gmail.com wrote:

 Spark application UI shows that one of the executors Cannot Find Address.
 Aggregated Metrics by Executor:

 Executor ID | Address                                | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Input | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
 0           | mddworker1.c.fi-mdd-poc.internal:42197 | 0 ms      | 0           | 0            | 0               | 0.0 B | 136.1 MB     | 184.9 MB      | 146.8 GB               | 135.4 MB
 1           | CANNOT FIND ADDRESS                    | 0 ms      | 0           | 0            | 0               | 0.0 B | 87.4 MB      | 142.0 MB      | 61.4 GB                | 81.4 MB

 I also see following in one of the executor logs for which the driver may
 have lost communication.

 14/10/29 13:18:33 WARN : Master_Client Heartbeat last execution took 90859
 ms. Longer than  the FIXED_EXECUTION_INTERVAL_MS 5000
 14/10/29 13:18:33 WARN : WorkerClientToWorkerHeartbeat last execution took
 90859 ms. Longer than  the FIXED_EXECUTION_INTERVAL_MS 1000
 14/10/29 13:18:33 WARN AkkaUtils: Error sending message in 1 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
 at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at
 scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at

 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at
 org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:176)
 at
 org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:362)

 I have also seen other variation of timeouts

 14/10/29 06:21:05 WARN SendingConnection: Error finishing connection to
 mddworker1.c.fi-mdd-poc.internal/10.240.179.241:40442
 java.net.ConnectException: Connection refused
 14/10/29 06:21:05 ERROR BlockManager: Failed to report broadcast_6_piece0
 to
 master; giving up.

 or

 14/10/29 07:23:40 WARN AkkaUtils: Error sending message in 1 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
 at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at
 scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at

 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at
 org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:176)
 at

 org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:218)
 at

 org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:58)
 at
 org.apache.spark.storage.BlockManager.org
 $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:310)
 at

 org.apache.spark.storage.BlockManager$$anonfun$reportAllBlocks$3.apply(BlockManager.scala:190)
 at

 org.apache.spark.storage.BlockManager$$anonfun$reportAllBlocks$3.apply(BlockManager.scala:188)
 at

 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at

 org.apache.spark.util.TimeStampedHashMap.foreach(TimeStampedHashMap.scala:107)
 at

 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at

 org.apache.spark.storage.BlockManager.reportAllBlocks(BlockManager.scala:188)
 at
 org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:207)
 at
 org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:366)

 How do I track down what is causing this problem?  Any suggestion on
 solution, debugging or workaround will be helpful!







 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/CANNOT-FIND-ADDRESS-tp17637.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: problem with start-slaves.sh

2014-10-30 Thread Yana Kadiyska
Roberto, I don't think shark is an issue -- I have shark server running on
a node that also acts as a worker. What you can do is turn off shark
server, just run start-all to start your spark cluster. then you can try
bin/spark-shell --master yourmasterip and see if you can successfully run
some hello world stuff. This will verify you have a working Spark
cluster. Shark is just an application on top of it, so I can't imagine
that's what's causing interference. But stopping it is the simplest way to
check.
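
For example, once spark-shell is attached to the master (rather than running
local), something this small is enough to confirm that tasks really run on
the workers -- you should see a completed stage in the master UI:

// inside bin/spark-shell --master spark://yourmasterip:7077
val n = sc.parallelize(1 to 100000).map(_ * 2).count()
println(n)   // 100000 if the executors did the work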

On Wed, Oct 29, 2014 at 10:54 PM, Pagliari, Roberto rpagli...@appcomsci.com
 wrote:

  hi Yana,
 in my case I did not start any spark worker. However, shark was definitely
 running. Do you think that might be a problem?

  I will take a look

  Thank you,

  --
 *From:* Yana Kadiyska [yana.kadiy...@gmail.com]
 *Sent:* Wednesday, October 29, 2014 9:45 AM
 *To:* Pagliari, Roberto
 *Cc:* user@spark.apache.org
 *Subject:* Re: problem with start-slaves.sh

   I see this when I start a worker and then try to start it again
 forgetting it's already running (I don't use start-slaves, I start the
 slaves individually with start-slave.sh). All this is telling you is that
 there is already a running process on that machine. You can see it if you
 do a ps -aef|grep worker

  you can look on the spark UI and see if your master shows this machine
 as connected to it already. If it doesn't, you might want to kill the
 worker process and restart it.

 On Tue, Oct 28, 2014 at 4:32 PM, Pagliari, Roberto 
 rpagli...@appcomsci.com wrote:

  I ran sbin/start-master.sh followed by sbin/start-slaves.sh (I build
 with PHive option to be able to interface with hive)



 I’m getting this



 ip_address: org.apache.spark.deploy.worker.Worker running as process
 . Stop it first.



 Am I doing something wrong? In my specific case, shark+hive is running on
 the nodes. Does that interfere with spark?



 Thank you,





Re: overloaded method value updateStateByKey ... cannot be applied to ... when Key is a Tuple2

2014-11-12 Thread Yana Kadiyska
Adrian, do you know if this is documented somewhere? I was also under the
impression that setting a key's value to None would cause the key to be
discarded (without any explicit filtering on the user's part) but cannot
find any official documentation to that effect
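
For reference, a rough sketch of the pattern being discussed, assuming (as
described below) that returning None does discard the key -- the DStream
`pairs` of (String, Int) and the 10-batch idle cutoff are hypothetical:

// updateStateByKey requires checkpointing, e.g. ssc.checkpoint("/some/dir")
val updateFunc = (values: Seq[Int], state: Option[(Int, Int)]) => {
  val (count, idleBatches) = state.getOrElse((0, 0))
  if (values.isEmpty) {
    if (idleBatches >= 10) None              // idle too long: drop the key from the state
    else Some((count, idleBatches + 1))
  } else Some((count + values.sum, 0))
}
val counts = pairs.updateStateByKey[(Int, Int)](updateFunc)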

On Wed, Nov 12, 2014 at 2:43 PM, Adrian Mocanu amoc...@verticalscope.com
wrote:

 My understanding is that the reason you have an Option is so you could
 filter out tuples when None is returned. This way your state data won't
 grow forever.

 -Original Message-
 From: spr [mailto:s...@yarcdata.com]
 Sent: November-12-14 2:25 PM
 To: u...@spark.incubator.apache.org
 Subject: Re: overloaded method value updateStateByKey ... cannot be
 applied to ... when Key is a Tuple2

 After comparing with previous code, I got it to work by making the return a
 Some instead of Tuple2.  Perhaps some day I will understand this.


 spr wrote
  --code
 
   val updateDnsCount = (values: Seq[(Int, Time)], state: Option[(Int, Time)]) => {
     val currentCount = if (values.isEmpty) 0 else values.map( x => x._1).sum
     val newMinTime = if (values.isEmpty) Time(Long.MaxValue) else values.map( x => x._2).min

     val (previousCount, minTime) = state.getOrElse((0, Time(System.currentTimeMillis)))

     //  (currentCount + previousCount, Seq(minTime, newMinTime).min)  == old
     Some(currentCount + previousCount, Seq(minTime, newMinTime).min)  // == new
   }





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/overloaded-method-value-updateStateByKey-cannot-be-applied-to-when-Key-is-a-Tuple2-tp18644p18750.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
 commands, e-mail: user-h...@spark.apache.org


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




[SQL] HiveThriftServer2 failure detection

2014-11-19 Thread Yana Kadiyska
Hi all, I am running HiveThriftServer2 and noticed that the process stays
up even though there is no driver connected to the Spark master.

I started the server via sbin/start-thriftserver and my namenodes are
currently not operational. I can see from the log that there was an error
on startup:

14/11/19 16:32:58 ERROR HiveThriftServer2: Error starting HiveThriftServer2

and that the driver shut down as expected:

14/11/19 16:32:59 INFO SparkUI: Stopped Spark web UI at http://myip:4040
14/11/19 16:32:59 INFO DAGScheduler: Stopping DAGScheduler
14/11/19 16:32:59 INFO SparkDeploySchedulerBackend: Shutting down all executors
14/11/19 16:32:59 INFO SparkDeploySchedulerBackend: Asking each
executor to shut down
14/11/19 16:33:00 INFO MapOutputTrackerMasterActor:
MapOutputTrackerActor stopped!
14/11/19 16:33:00 INFO MemoryStore: MemoryStore cleared
14/11/19 16:33:00 INFO BlockManager: BlockManager stopped
14/11/19 16:33:00 INFO BlockManagerMaster: BlockManagerMaster stopped
14/11/19 16:33:00 INFO SparkContext: Successfully stopped SparkContext

However, when I try to run start-thriftserver.sh again I see an error
message that the process is already running and indeed there is a process
with that PID:

root 32334 1  0 16:32 ?00:00:00 /usr/local/bin/java
org.apache.spark.deploy.SparkSubmitDriverBootstrapper --class
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 --master
spark://myip:7077 --conf -spark.executor.extraJavaOptions=-verbose:gc
-XX:-PrintGCDetails -XX:+PrintGCTimeStamps spark-internal --hiveconf
hive.root.logger=INFO,console

 Is this a bug or design decision -- I am upgrading from Shark and we had
scripts that monitor the driver and restart on failure. Here it seems that
we would not be able to restart even though the driver died?


Re: [SQL] HiveThriftServer2 failure detection

2014-11-19 Thread Yana Kadiyska
https://issues.apache.org/jira/browse/SPARK-4497

On Wed, Nov 19, 2014 at 1:48 PM, Michael Armbrust mich...@databricks.com
wrote:

 This is not by design.  Can you please file a JIRA?

 On Wed, Nov 19, 2014 at 9:19 AM, Yana Kadiyska yana.kadiy...@gmail.com
 wrote:

 Hi all, I am running HiveThriftServer2 and noticed that the process stays
 up even though there is no driver connected to the Spark master.

 I started the server via sbin/start-thriftserver and my namenodes are
 currently not operational. I can see from the log that there was an error
 on startup:

 14/11/19 16:32:58 ERROR HiveThriftServer2: Error starting
 HiveThriftServer2

 and that the driver shut down as expected:

 14/11/19 16:32:59 INFO SparkUI: Stopped Spark web UI at http://myip:4040
 14/11/19 16:32:59 INFO DAGScheduler: Stopping DAGScheduler
 14/11/19 16:32:59 INFO SparkDeploySchedulerBackend: Shutting down all 
 executors
 14/11/19 16:32:59 INFO SparkDeploySchedulerBackend: Asking each executor to 
 shut down
 14/11/19 16:33:00 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor 
 stopped!
 14/11/19 16:33:00 INFO MemoryStore: MemoryStore cleared
 14/11/19 16:33:00 INFO BlockManager: BlockManager stopped
 14/11/19 16:33:00 INFO BlockManagerMaster: BlockManagerMaster stopped
 14/11/19 16:33:00 INFO SparkContext: Successfully stopped SparkContext

 However, when I try to run start-thriftserver.sh again I see an error
 message that the process is already running and indeed there is a process
 with that PID:

 root 32334 1  0 16:32 ?00:00:00 /usr/local/bin/java 
 org.apache.spark.deploy.SparkSubmitDriverBootstrapper --class 
 org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 --master 
 spark://myip:7077 --conf -spark.executor.extraJavaOptions=-verbose:gc 
 -XX:-PrintGCDetails -XX:+PrintGCTimeStamps spark-internal --hiveconf 
 hive.root.logger=INFO,console

  Is this a bug or design decision -- I am upgrading from Shark and we had
 scripts that monitor the driver and restart on failure. Here it seems that
 we would not be able to restart even though the driver died?





[SQL]Proper use of spark.sql.thriftserver.scheduler.pool

2014-11-19 Thread Yana Kadiyska
Hi sparkers,

I'm trying to use  spark.sql.thriftserver.scheduler.pool for the first time
(earlier I was stuck because of
https://issues.apache.org/jira/browse/SPARK-4037)

I have two pools set up:
[image: Inline image 1]
and would like to issue a query against the low priority pool.

I am doing this (tried both from beeline and a different JDBC client,
output below is from beeline):

 SET spark.sql.thriftserver.scheduler.pool=CRON;
+-+
| |
+-+
| spark.sql.thriftserver.scheduler.pool=CRON  |
+-+
1 row selected (0.09 seconds)
1: jdbc:hive2://myip:10001> select count(*) from mytable;

The query executes OK but does not execute against the CRON pool...Am I
misusing this setting (my goal is to be able to allocate a large set of
cores to Thriftserver but separate out to a low-priority pool some
housekeeping tasks)

Thanks for any tips.


[SQL] Wildcards in SQLContext.parquetFile?

2014-12-03 Thread Yana Kadiyska
Hi folks,

I'm wondering if someone has successfully used wildcards with a parquetFile
call?

I saw this thread and it makes me think no?
http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3CCACA1tWLjcF-NtXj=pqpqm3xk4aj0jitxjhmdqbojj_ojybo...@mail.gmail.com%3E

I have a set of parquet files that are partitioned by key. I'd like to
issue a query to read in a subset of the files, based on a directory
wildcard (the wildcard will be a little more specific than * but this is to
show the issue):

This call works fine:

sc.textFile("hdfs:///warehouse/hive/*/*/*.parquet").first
res4: String = PAR1? L??? ?\??? ,
,a??aL0?xU???e??


but this doesn't

scala> val parquetFile =
sqlContext.parquetFile("hdfs:///warehouse/hive/*/*/*.parquet").first
java.io.FileNotFoundException: File
hdfs://cdh4-14822-nn/warehouse/hive/*/*/*.parquet does not exist



Re: Cluster getting a null pointer error

2014-12-10 Thread Yana Kadiyska
does spark-submit with SparkPi and spark-examples.jar work?

e.g.

./spark/bin/spark-submit  --class org.apache.spark.examples.SparkPi
--master spark://xx.xx.xx.xx:7077  /path/to/examples.jar


On Tue, Dec 9, 2014 at 6:58 PM, Eric Tanner eric.tan...@justenough.com
wrote:

 I have set up a cluster on AWS and am trying a really simple hello world
 program as a test.  The cluster was built using the ec2 scripts that come
 with Spark.  Anyway, I have output the error message (using --verbose)
 below.  The source code is further below that.

 Any help would be greatly appreciated.

 Thanks,

 Eric

 *Error code:*

 r...@ip-xx.xx.xx.xx ~]$ ./spark/bin/spark-submit  --verbose  --class
 com.je.test.Hello --master spark://xx.xx.xx.xx:7077
  Hello-assembly-1.0.jar
 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath
 Using properties file: /root/spark/conf/spark-defaults.conf
 Adding default property: spark.executor.memory=5929m
 Adding default property:
 spark.executor.extraClassPath=/root/ephemeral-hdfs/conf
 Adding default property:
 spark.executor.extraLibraryPath=/root/ephemeral-hdfs/lib/native/
 Using properties file: /root/spark/conf/spark-defaults.conf
 Adding default property: spark.executor.memory=5929m
 Adding default property:
 spark.executor.extraClassPath=/root/ephemeral-hdfs/conf
 Adding default property:
 spark.executor.extraLibraryPath=/root/ephemeral-hdfs/lib/native/
 Parsed arguments:
   master  spark://xx.xx.xx.xx:7077
   deployMode  null
   executorMemory  5929m
   executorCores   null
   totalExecutorCores  null
   propertiesFile  /root/spark/conf/spark-defaults.conf
   extraSparkPropertiesMap()
   driverMemorynull
   driverCores null
   driverExtraClassPathnull
   driverExtraLibraryPath  null
   driverExtraJavaOptions  null
   supervise   false
   queue   null
   numExecutorsnull
   files   null
   pyFiles null
   archivesnull
   mainClass   com.je.test.Hello
   primaryResource file:/root/Hello-assembly-1.0.jar
   namecom.je.test.Hello
   childArgs   []
   jarsnull
   verbose true

 Default properties from /root/spark/conf/spark-defaults.conf:
   spark.executor.extraLibraryPath - /root/ephemeral-hdfs/lib/native/
   spark.executor.memory - 5929m
   spark.executor.extraClassPath - /root/ephemeral-hdfs/conf


 Using properties file: /root/spark/conf/spark-defaults.conf
 Adding default property: spark.executor.memory=5929m
 Adding default property:
 spark.executor.extraClassPath=/root/ephemeral-hdfs/conf
 Adding default property:
 spark.executor.extraLibraryPath=/root/ephemeral-hdfs/lib/native/
 Main class:
 com.je.test.Hello
 Arguments:

 System properties:
 spark.executor.extraLibraryPath - /root/ephemeral-hdfs/lib/native/
 spark.executor.memory - 5929m
 SPARK_SUBMIT - true
 spark.app.name - com.je.test.Hello
 spark.jars - file:/root/Hello-assembly-1.0.jar
 spark.executor.extraClassPath - /root/ephemeral-hdfs/conf
 spark.master - spark://xxx.xx.xx.xxx:7077
 Classpath elements:
 file:/root/Hello-assembly-1.0.jar

 *Actual Error:*
 Exception in thread main java.lang.NullPointerException
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


 *Source Code:*
 package com.je.test


 import org.apache.spark.{SparkConf, SparkContext}

 class Hello {

   def main(args: Array[String]): Unit = {

 val conf = new SparkConf(true)//.set("spark.cassandra.connection.host",
 "xxx.xx.xx.xxx")
 val sc = new SparkContext("spark://xxx.xx.xx.xxx:7077", "Season", conf)

 println("Hello World")

   }
 }







Trouble with cache() and parquet

2014-12-10 Thread Yana Kadiyska
Hi folks, wondering if anyone has thoughts. Trying to create something akin
to a materialized view (sqlContext is a HiveContext connected to external
metastore):


val last2HourRdd = sqlContext.sql(s"select * from mytable")
//last2HourRdd.first prints out a  org.apache.spark.sql.Row = [...] with
valid data

 last2HourRdd.cache()
//last2HourRdd.first now fails in an executor with the following:

In the driver:

14/12/10 20:24:01 INFO TaskSetManager: Starting task 0.1 in stage 25.0
(TID 35, iphere, NODE_LOCAL, 2170 bytes)
14/12/10 20:24:01 INFO TaskSetManager: Lost task 0.1 in stage 25.0
(TID 35) on executor iphere: java.lang.ClassCastException (null)
[duplicate 1]



And in executor:

14/12/10 19:56:57 ERROR Executor: Exception in task 0.1 in stage 20.0 (TID 27)
java.lang.ClassCastException: java.lang.String cannot be cast to
java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
at 
org.apache.spark.sql.catalyst.expressions.MutableInt.update(SpecificMutableRow.scala:73)
at 
org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.update(SpecificMutableRow.scala:231)
at 
org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.setString(SpecificMutableRow.scala:236)
at org.apache.spark.sql.columnar.STRING$.setField(ColumnType.scala:328)
at org.apache.spark.sql.columnar.STRING$.setField(ColumnType.scala:310)
at 
org.apache.spark.sql.columnar.compression.RunLengthEncoding$Decoder.next(compressionSchemes.scala:168)
at 
org.apache.spark.sql.columnar.compression.CompressibleColumnAccessor$class.extractSingle(CompressibleColumnAccessor.scala:37)
at 
org.apache.spark.sql.columnar.NativeColumnAccessor.extractSingle(ColumnAccessor.scala:64)
at 
org.apache.spark.sql.columnar.BasicColumnAccessor.extractTo(ColumnAccessor.scala:54)
at 
org.apache.spark.sql.columnar.NativeColumnAccessor.org$apache$spark$sql$columnar$NullableColumnAccessor$$super$extractTo(ColumnAccessor.scala:64)
at 
org.apache.spark.sql.columnar.NullableColumnAccessor$class.extractTo(NullableColumnAccessor.scala:52)
at 
org.apache.spark.sql.columnar.NativeColumnAccessor.extractTo(ColumnAccessor.scala:64)
at 
org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:279)
at 
org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:275)
at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at 
org.apache.spark.sql.execution.Limit$$anonfun$4.apply(basicOperators.scala:141)
at 
org.apache.spark.sql.execution.Limit$$anonfun$4.apply(basicOperators.scala:141)
at 
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
at 
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)


Any thoughts on this? Not sure if using the external metastore for the
inital pull is a problem or if I'm just hitting a bug.


Re: Trouble with cache() and parquet

2014-12-11 Thread Yana Kadiyska
I see -- they are the same in design but  the difference comes from
partitioned Hive tables: when the RDD is generated by querying an external
Hive metastore, the partition is appended as part of the row, and shows up
as part of the schema. Can you shed some light on why this is a problem:

last2HourRdd.first -- works ok
last2HourRdd.cache()

last2HourRdd.first -- does not work


The first call shows K+1 columns (and so does print schema, where K columns
are from the backing parquet files and the K+1st is the partition inlined).
My impression is that the second call to .first would just force the
cache() call and dump out that RDD to disk (with all of it's K+1 columns
and store the schema info, again with K+1 columns), and then just return a
single entry. I am not sure why the fact that Hive metastore exposes an
extra column over the raw parquet file is a problem since it does so both
on the schema and in the data: last2HourRdd.schema.fields.length reports
K+1, and so does  last2HourRdd.first.length.

I also tried
calling sqlContext.applySchema(last2HourRdd,parquetFile.schema) before
caching but it does not fix the issue. The only workaround I've come up
with so far is to replace select * with a select list_of_columns. But I'd
love to understand a little better why the cache call trips this scenario



On Wed, Dec 10, 2014 at 3:50 PM, Michael Armbrust mich...@databricks.com
wrote:

 Have you checked to make sure the schema in the metastore matches the
 schema in the parquet file?  One way to test would be to just use
 sqlContext.parquetFile(...) which infers the schema from the file instead
 of using the metastore.

 On Wed, Dec 10, 2014 at 12:46 PM, Yana Kadiyska yana.kadiy...@gmail.com
 wrote:


 Hi folks, wondering if anyone has thoughts. Trying to create something
 akin to a materialized view (sqlContext is a HiveContext connected to
 external metastore):


 val last2HourRdd = sqlContext.sql(s"select * from mytable")
 //last2HourRdd.first prints out a  org.apache.spark.sql.Row = [...] with
 valid data

  last2HourRdd.cache()
 //last2HourRdd.first now fails in an executor with the following:

 In the driver:

 14/12/10 20:24:01 INFO TaskSetManager: Starting task 0.1 in stage 25.0 (TID 
 35, iphere, NODE_LOCAL, 2170 bytes)
 14/12/10 20:24:01 INFO TaskSetManager: Lost task 0.1 in stage 25.0 (TID 35) 
 on executor iphere: java.lang.ClassCastException (null) [duplicate 1]



 And in executor:

 14/12/10 19:56:57 ERROR Executor: Exception in task 0.1 in stage 20.0 (TID 
 27)
 java.lang.ClassCastException: java.lang.String cannot be cast to 
 java.lang.Integer
  at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
  at 
 org.apache.spark.sql.catalyst.expressions.MutableInt.update(SpecificMutableRow.scala:73)
  at 
 org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.update(SpecificMutableRow.scala:231)
  at 
 org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.setString(SpecificMutableRow.scala:236)
  at org.apache.spark.sql.columnar.STRING$.setField(ColumnType.scala:328)
  at org.apache.spark.sql.columnar.STRING$.setField(ColumnType.scala:310)
  at 
 org.apache.spark.sql.columnar.compression.RunLengthEncoding$Decoder.next(compressionSchemes.scala:168)
  at 
 org.apache.spark.sql.columnar.compression.CompressibleColumnAccessor$class.extractSingle(CompressibleColumnAccessor.scala:37)
  at 
 org.apache.spark.sql.columnar.NativeColumnAccessor.extractSingle(ColumnAccessor.scala:64)
  at 
 org.apache.spark.sql.columnar.BasicColumnAccessor.extractTo(ColumnAccessor.scala:54)
  at 
 org.apache.spark.sql.columnar.NativeColumnAccessor.org$apache$spark$sql$columnar$NullableColumnAccessor$$super$extractTo(ColumnAccessor.scala:64)
  at 
 org.apache.spark.sql.columnar.NullableColumnAccessor$class.extractTo(NullableColumnAccessor.scala:52)
  at 
 org.apache.spark.sql.columnar.NativeColumnAccessor.extractTo(ColumnAccessor.scala:64)
  at 
 org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:279)
  at 
 org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:275)
  at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at 
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
  at 
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
  at 
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
  at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
  at scala.collection.AbstractIterator.to(Iterator.scala:1157

Re: Nabble mailing list mirror errors: This post has NOT been accepted by the mailing list yet

2014-12-13 Thread Yana Kadiyska
Since you mentioned this, I had a related quandary recently -- it also says
that the forum archives u...@spark.incubator.apache.org and
d...@spark.incubator.apache.org respectively, yet the Community page
clearly says to email the @spark.apache.org list (but the nabble archive is
linked right there too). IMO even putting a clear explanation at the top --
"Posting here requires that you create an account via the UI. Your message
will be sent to both spark.incubator.apache.org and spark.apache.org" (if
that is the case, I'm not sure which alias nabble posts get sent to) -- would
make things a lot more clear.

On Sat, Dec 13, 2014 at 5:05 PM, Josh Rosen rosenvi...@gmail.com wrote:

 I've noticed that several users are attempting to post messages to Spark's
 user / dev mailing lists using the Nabble web UI (
 http://apache-spark-user-list.1001560.n3.nabble.com/).  However, there
 are many posts in Nabble that are not posted to the Apache lists and are
 flagged with This post has NOT been accepted by the mailing list yet.
 errors.

 I suspect that the issue is that users are not completing the sign-up
 confirmation process (
 http://apache-spark-user-list.1001560.n3.nabble.com/mailing_list/MailingListOptions.jtp?forum=1),
 which is preventing their emails from being accepted by the mailing list.

 I wanted to mention this issue to the Spark community to see whether there
 are any good solutions to address this.  I have spoken to users who think
 that our mailing list is unresponsive / inactive because their un-posted
 messages haven't received any replies.

 - Josh



Re: Limit the # of columns in Spark Scala

2014-12-14 Thread Yana Kadiyska
Denny, I am not sure what exception you're observing but I've had luck with
2 things:

val table = sc.textFile("hdfs://...")

You can try calling table.first here and you'll see the first line of the
file.
You can also do val debug = table.first.split("\t"), which would give you an
array, and you can verify that the array contains what you want in
positions 167, 119 and 200. In the case of large files with a random bad
line I find wrapping the call within the map in try/catch very valuable --
you can dump out the whole line in the catch block.

Lastly, I would guess that you're getting a compile error and not a runtime
error -- c is an array of values, so I think you want
tabs.map(c => (c(167), c(110), c(200))) instead of tabs.map(c => (c._(167),
c._(110), c._(200))).
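
For reference, a minimal Scala sketch of the approach described above, assuming
a spark-shell style sc and a tab-delimited file (the path and the column
indices are placeholders, not taken from the original thread):

val tabs = sc.textFile("hdfs:///path/to/input").map(_.split("\t"))

// Keep only the three columns of interest; the try/catch lets a single
// malformed line be logged instead of failing the whole job.
val projected = tabs.flatMap { c =>
  try {
    Some((c(167), c(119), c(200)))
  } catch {
    case e: ArrayIndexOutOfBoundsException =>
      System.err.println("Bad line: " + c.mkString("\t"))
      None
  }
}
projected.take(5).foreach(println)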



On Sun, Dec 14, 2014 at 3:12 PM, Denny Lee denny.g@gmail.com wrote:

 Yes - that works great! Sorry for implying I couldn't. Was just more
 flummoxed that I couldn't make the Scala call work on its own. Will
 continue to debug ;-)

 On Sun, Dec 14, 2014 at 11:39 Michael Armbrust mich...@databricks.com
 wrote:

 BTW, I cannot use SparkSQL / case right now because my table has 200
 columns (and I'm on Scala 2.10.3)


 You can still apply the schema programmatically:
 http://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema




Re: Exception: NoSuchMethodError: org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions

2015-01-23 Thread Yana Kadiyska
If you're running the test via sbt you can examine the classpath that sbt
uses for the test (show runtime:full-classpath, or last run) -- I find this
helps once too many includes and excludes interact.
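
For illustration, a hedged build.sbt-style sketch of keeping the Spark
artifacts on a single version so that the compile-time and runtime APIs match
(the version string is a placeholder; the original poster, quoted below, uses
a 1.1.0-SNAPSHOT build):

// build.sbt -- keep all Spark modules on the same version; "provided" keeps
// them out of the assembly so the cluster's own jars are used at runtime.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "1.1.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.1.0" % "provided"
)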

On Thu, Jan 22, 2015 at 3:50 PM, Adrian Mocanu amoc...@verticalscope.com
wrote:


 I use spark 1.1.0-SNAPSHOT and the test I'm running is in local mode. My
 test case uses org.apache.spark.streaming.TestSuiteBase

 val spark=org.apache.spark %% spark-core % 1.1.0-SNAPSHOT %
 provided excludeAll(
 val sparkStreaming= org.apache.spark % spark-streaming_2.10 %
 1.1.0-SNAPSHOT % provided excludeAll(
 val sparkCassandra= com.tuplejump % calliope_2.10 % 0.9.0-C2-EA
 exclude(org.apache.cassandra, cassandra-all)
 exclude(org.apache.cassandra, cassandra-thrift)
 val casAll = org.apache.cassandra % cassandra-all % 2.0.3
 intransitive()
 val casThrift = org.apache.cassandra % cassandra-thrift % 2.0.3
 intransitive()
 val sparkStreamingFromKafka = org.apache.spark %
 spark-streaming-kafka_2.10 % 0.9.1 excludeAll(


 -Original Message-
 From: Sean Owen [mailto:so...@cloudera.com]
 Sent: January-22-15 11:39 AM
 To: Adrian Mocanu
 Cc: u...@spark.incubator.apache.org
 Subject: Re: Exception: NoSuchMethodError:
 org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions

 NoSuchMethodError almost always means that you have compiled some code
 against one version of a library but are running against another. I wonder
 if you are including different versions of Spark in your project, or
 running against a cluster on an older version?

 On Thu, Jan 22, 2015 at 3:57 PM, Adrian Mocanu amoc...@verticalscope.com
 wrote:
  Hi
 
  I get this exception when I run a Spark test case on my local machine:
 
 
 
  An exception or error caused a run to abort:
  org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions(Lo
  rg/apache/spark/streaming/dstream/DStream;Lscala/reflect/ClassTag;Lsca
  la/reflect/ClassTag;Lscala/math/Ordering;)Lorg/apache/spark/streaming/
  dstream/PairDStreamFunctions;
 
  java.lang.NoSuchMethodError:
  org.apache.spark.streaming.StreamingContext$.toPairDStreamFunctions(Lo
  rg/apache/spark/streaming/dstream/DStream;Lscala/reflect/ClassTag;Lsca
  la/reflect/ClassTag;Lscala/math/Ordering;)Lorg/apache/spark/streaming/
  dstream/PairDStreamFunctions;
 
 
 
  In my test case I have these Spark related imports imports:
 
  import org.apache.spark.streaming.StreamingContext._
 
  import org.apache.spark.streaming.TestSuiteBase
 
  import org.apache.spark.streaming.dstream.DStream
 
  import
  org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
 
 
 
  -Adrian
 
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Results never return to driver | Spark Custom Reader

2015-01-23 Thread Yana Kadiyska
It looks to me like your executor actually crashed and didn't just finish
properly.

Can you check the executor log?

It is available in the UI, or on the worker machine under
$SPARK_HOME/work/app-20150123155114-/6/stderr (unless you manually changed
the work directory location, but in that case I'd assume you know where to
find the log).

On Thu, Jan 22, 2015 at 10:54 PM, Harihar Nahak hna...@wynyardgroup.com
wrote:

 Hi All,

 I wrote a custom reader to read a DB, and it is able to return key and
 value
 as expected but after it finished it never returned to driver

 here is output of worker log :
 15/01/23 15:51:38 INFO worker.ExecutorRunner: Launch command: java -cp

 ::/usr/local/spark-1.2.0-bin-hadoop2.4/sbin/../conf:/usr/local/spark-1.2.0-bin-hadoop2.4/lib/spark-assembly-1.2.0-hadoop2.4.0.jar:/usr/local/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar:/usr/local/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/usr/local/hadoop/etc/hadoop
 -XX:MaxPermSize=128m -Dspark.driver.port=53484 -Xms1024M -Xmx1024M
 org.apache.spark.executor.CoarseGrainedExecutorBackend
 akka.tcp://sparkDriver@VM90:53484/user/CoarseGrainedScheduler 6 VM99
 4 app-20150123155114-
 akka.tcp://sparkWorker@VM99:44826/user/Worker
 15/01/23 15:51:47 INFO worker.Worker: Executor app-20150123155114-/6
 finished with state EXITED message Command exited with code 1 exitStatus 1
 15/01/23 15:51:47 WARN remote.ReliableDeliverySupervisor: Association with
 remote system [akka.tcp://sparkExecutor@VM99:57695] has failed, address is
 now gated for [5000] ms. Reason is: [Disassociated].
 15/01/23 15:51:47 INFO actor.LocalActorRef: Message
 [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
 Actor[akka://sparkWorker/deadLetters] to

 Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%40143.96.25.29%3A35065-4#-915179653]
 was not delivered. [3] dead letters encountered. This logging can be turned
 off or adjusted with configuration settings 'akka.log-dead-letters' and
 'akka.log-dead-letters-during-shutdown'.
 15/01/23 15:51:49 INFO worker.Worker: Asked to kill unknown executor
 app-20150123155114-/6

 If someone noticed any clue to fixed that will really appreciate.



 -
 --Harihar
 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Results-never-return-to-driver-Spark-Custom-Reader-tp21328.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: spark-shell has syntax error on windows.

2015-01-23 Thread Yana Kadiyska
https://issues.apache.org/jira/browse/SPARK-5389

I marked it as minor since I also just discovered that I can run it under
PowerShell just fine. Vladimir, feel free to change the bug if you're
getting a different message or a more serious issue.

On Fri, Jan 23, 2015 at 4:44 PM, Josh Rosen rosenvi...@gmail.com wrote:

 Do you mind filing a JIRA issue for this which includes the actual error
 message string that you saw?  https://issues.apache.org/jira/browse/SPARK

 On Thu, Jan 22, 2015 at 8:31 AM, Yana Kadiyska yana.kadiy...@gmail.com
 wrote:

 I am not sure if you get the same exception as I do -- spark-shell2.cmd
 works fine for me. Windows 7 as well. I've never bothered looking to fix it
 as it seems spark-shell just calls spark-shell2 anyway...

 On Thu, Jan 22, 2015 at 3:16 AM, Vladimir Protsenko 
 protsenk...@gmail.com wrote:

 I have a problem with running spark shell in windows 7. I made the
 following
 steps:

 1. downloaded and installed Scala 2.11.5
 2. downloaded spark 1.2.0 by git clone git://github.com/apache/spark.git
 3. run dev/change-version-to-2.11.sh and mvn -Dscala-2.11 -DskipTests
 clean
 package (in git bash)

 After installation tried to run spark-shell.cmd in cmd shell and it says
 there is a syntax error in file. What could I do to fix problem?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-shell-has-syntax-error-on-windows-tp21313.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Why custom parquet format hive table execute ParquetTableScan physical plan, not HiveTableScan?

2015-01-19 Thread Yana Kadiyska
If you're talking about filter pushdown for parquet files, this also has to
be turned on explicitly. Try spark.sql.parquet.filterPushdown=true -- it's
off by default.
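
For reference, a minimal sketch of turning the flag on before running the
scan (the table and predicate are placeholders; whether pushdown then reaches
a custom input format is the open question in this thread):

// Assuming a HiveContext named sqlContext, as in the quoted message below.
sqlContext.sql("SET spark.sql.parquet.filterPushdown=true")
val plan = sqlContext.sql("select * from test where id = '42'")
// Printing the SchemaRDD shows the physical plan, which is where you can see
// whether a ParquetTableScan or a HiveTableScan is being used.
println(plan)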

On Mon, Jan 19, 2015 at 3:46 AM, Xiaoyu Wang wangxy...@gmail.com wrote:

 Yes it works!
 But the filter can't pushdown!!!

 If custom parquetinputformat only implement the datasource API?


 https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

 2015-01-16 21:51 GMT+08:00 Xiaoyu Wang wangxy...@gmail.com:

 Thanks yana!
 I will try it!

 在 2015年1月16日,20:51,yana yana.kadiy...@gmail.com 写道:

 I think you might need to set
 spark.sql.hive.convertMetastoreParquet to false if I understand that flag
 correctly

 Sent on the new Sprint Network from my Samsung Galaxy S®4.


  Original message 
 From: Xiaoyu Wang
 Date:01/16/2015 5:09 AM (GMT-05:00)
 To: user@spark.apache.org
 Subject: Why custom parquet format hive table execute ParquetTableScan
 physical plan, not HiveTableScan?

 Hi all!

 In the Spark SQL1.2.0.
 I create a hive table with custom parquet inputformat and outputformat.
 like this :
 CREATE TABLE test(
   id string,
   msg string)
 CLUSTERED BY (
   id)
 SORTED BY (
   id ASC)
 INTO 10 BUCKETS
 ROW FORMAT SERDE
   '*com.a.MyParquetHiveSerDe*'
 STORED AS INPUTFORMAT
   '*com.a.MyParquetInputFormat*'
 OUTPUTFORMAT
   '*com.a.MyParquetOutputFormat*';

 And the spark shell see the plan of select * from test is :

 [== Physical Plan ==]
 [!OutputFaker [id#5,msg#6]]
 [ *ParquetTableScan* [id#12,msg#13], (ParquetRelation
 hdfs://hadoop/user/hive/warehouse/test.db/test, Some(Configuration:
 core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml,
 yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml),
 org.apache.spark.sql.hive.HiveContext@6d15a113, []), []]

 *Not HiveTableScan*!!!
 *So it dosn't execute my custom inputformat!*
 Why? How can it execute my custom inputformat?

 Thanks!






Re: Why Parquet Predicate Pushdown doesn't work?

2015-01-17 Thread Yana Kadiyska
Just wondering if you've made any progress on this -- I'm having the same
issue. My attempts to help myself are documented here
http://mail-archives.apache.org/mod_mbox/spark-user/201501.mbox/%3CCAJ4HpHFVKvdNgKes41DvuFY=+f_nTJ2_RT41+tadhNZx=bc...@mail.gmail.com%3E
.

I don't believe I have the "value scattered through all blocks" issue either,
as running with
sc.hadoopConfiguration.set("parquet.task.side.metadata", "false") shows a
much smaller input size for me, and it is the exact same parquet files being
scanned.

On Thu, Jan 8, 2015 at 1:40 AM, Xuelin Cao xuelincao2...@gmail.com wrote:


 Yes, the problem is, I've turned the flag on.

 One possible reason for this is, the parquet file supports predicate
 pushdown by setting statistical min/max value of each column on parquet
 blocks. If in my test, the groupID=10113000 is scattered in all parquet
 blocks, then the predicate pushdown fails.

 But, I'm not quite sure about that. I don't know whether there is any
 other reason that can lead to this.


 On Wed, Jan 7, 2015 at 10:14 PM, Cody Koeninger c...@koeninger.org
 wrote:

 But Xuelin already posted in the original message that the code was using

 SET spark.sql.parquet.filterPushdown=true

 On Wed, Jan 7, 2015 at 12:42 AM, Daniel Haviv danielru...@gmail.com
 wrote:

 Quoting Michael:
 Predicate push down into the input format is turned off by default
 because there is a bug in the current parquet library that null pointers
 when there are full row groups that are null.

 https://issues.apache.org/jira/browse/SPARK-4258

 You can turn it on if you want:
 http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration

 Daniel

 On 7 בינו׳ 2015, at 08:18, Xuelin Cao xuelin...@yahoo.com.INVALID
 wrote:


 Hi,

I'm testing parquet file format, and the predicate pushdown is a
 very useful feature for us.

However, it looks like the predicate push down doesn't work after
 I set
sqlContext.sql(SET spark.sql.parquet.filterPushdown=true)

Here is my sql:
*sqlContext.sql(select adId, adTitle  from ad where
 groupId=10113000).collect*

Then, I checked the amount of input data on the WEB UI. But the
 amount of input data is ALWAYS 80.2M regardless whether I turn the 
 spark.sql.parquet.filterPushdown
 flag on or off.

I'm not sure, if there is anything that I must do when *generating
 *the parquet file in order to make the predicate pushdown available.
 (Like ORC file, when creating the ORC file, I need to explicitly sort the
 field that will be used for predicate pushdown)

Anyone have any idea?

And, anyone knows the internal mechanism for parquet predicate
 pushdown?

Thanks








Re: Issues with constants in Spark HiveQL queries

2015-01-14 Thread Yana Kadiyska
Yeah, that makes sense. Pala, are you on a prebuilt version of Spark? I
just tried the CDH4 prebuilt... Here is what I get for the = token:

[image: Inline image 1]

The literal type shows as 290, not 291, and 290 is numeric. According to
this
http://grepcode.com/file/repo1.maven.org/maven2/org.apache.hive/hive-exec/0.13.1/org/apache/hadoop/hive/ql/parse/HiveParser.java#HiveParser
291 is token PLUS which is really weird...


On Wed, Jan 14, 2015 at 7:47 PM, Cheng, Hao hao.ch...@intel.com wrote:

  The log showed it failed in parsing, so the typo stuff shouldn’t be the
 root cause. BUT I couldn’t reproduce that with master branch.



 I did the test as follow:



 sbt/sbt –Phadoop-2.3.0 –Phadoop-2.3 –Phive –Phive-0.13.1 hive/console

 scala sql(“SELECT user_id FROM actions where
 conversion_aciton_id=20141210”)



 sbt/sbt –Phadoop-2.3.0 –Phadoop-2.3 –Phive –Phive-0.12.0 hive/console

 scala sql(“SELECT user_id FROM actions where
 conversion_aciton_id=20141210”)





 *From:* Yana Kadiyska [mailto:yana.kadiy...@gmail.com]
 *Sent:* Wednesday, January 14, 2015 11:12 PM
 *To:* Pala M Muthaia
 *Cc:* user@spark.apache.org
 *Subject:* Re: Issues with constants in Spark HiveQL queries



 Just a guess, but what is the type of conversion_aciton_id? I do queries
 over an epoch all the time with no issues (where epoch's type is bigint).
 You can see the source here
 https://github.com/apache/spark/blob/v1.2.0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
 -- not sure what ASTNode type 291 is, but it sounds like it's not considered
 numeric? If it's a string it should be conversion_aciton_id='20141210'
 (single quotes around the string)



 On Tue, Jan 13, 2015 at 5:25 PM, Pala M Muthaia 
 mchett...@rocketfuelinc.com wrote:

  Hi,



 We are testing Spark SQL-Hive QL, on Spark 1.2.0. We have run some simple
 queries successfully, but we hit the following issue whenever we attempt to
 use a constant in the query predicate.



 It seems like an issue with parsing constant.



 Query: SELECT user_id FROM actions where conversion_aciton_id=20141210



 Error:

 scala.NotImplementedError: No parse rules for ASTNode type: 291, text:
 20141210 :

 20141210



 Any ideas? This seems very basic, so we may be missing something basic,
 but i haven't figured out what it is.



 ---



 Full shell output below:



 scala sqlContext.sql(SELECT user_id FROM actions where
 conversion_aciton_id=20141210)

 15/01/13 16:55:54 INFO ParseDriver: Parsing command: SELECT user_id FROM
 actions where conversion_aciton_id=20141210

 15/01/13 16:55:54 INFO ParseDriver: Parse Completed

 15/01/13 16:55:54 INFO ParseDriver: Parsing command: SELECT user_id FROM
 actions where conversion_aciton_id=20141210

 15/01/13 16:55:54 INFO ParseDriver: Parse Completed

 java.lang.RuntimeException:

 Unsupported language features in query: SELECT user_id FROM actions where
 conversion_aciton_id=20141210

 TOK_QUERY

   TOK_FROM

 TOK_TABREF

   TOK_TABNAME

 actions

   TOK_INSERT

 TOK_DESTINATION

   TOK_DIR

 TOK_TMP_FILE

 TOK_SELECT

   TOK_SELEXPR

 TOK_TABLE_OR_COL

   user_id

 TOK_WHERE

   =

 TOK_TABLE_OR_COL

   conversion_aciton_id

 20141210



 scala.NotImplementedError: No parse rules for ASTNode type: 291, text:
 20141210 :

 20141210

  +



 org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1110)



 at scala.sys.package$.error(package.scala:27)

 at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:251)

 at
 org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50)

 at
 org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49)

 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)

 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)

 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)

 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)

 at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)

 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)

 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)

 at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)

 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)

 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)

 at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)

 at
 scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)

 at
 scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)

 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57

Re: Issues with constants in Spark HiveQL queries

2015-01-14 Thread Yana Kadiyska
Just a guess, but what is the type of conversion_aciton_id? I do queries
over an epoch all the time with no issues (where epoch's type is bigint).
You can see the source here
https://github.com/apache/spark/blob/v1.2.0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
-- not sure what ASTNode type 291 is, but it sounds like it's not considered
numeric? If it's a string it should be conversion_aciton_id='20141210'
(single quotes around the string)
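
For reference, a sketch of the quoting suggestion above (whether this
resolves the original parse error is not confirmed in the thread; the table
and column names come from the quoted query):

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
// If conversion_aciton_id is a string column, the literal needs single quotes:
sqlContext.sql("SELECT user_id FROM actions WHERE conversion_aciton_id='20141210'").collect()
// If it is numeric (e.g. bigint), the bare literal is what the original query used:
sqlContext.sql("SELECT user_id FROM actions WHERE conversion_aciton_id=20141210").collect()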

On Tue, Jan 13, 2015 at 5:25 PM, Pala M Muthaia mchett...@rocketfuelinc.com
 wrote:

 Hi,

 We are testing Spark SQL-Hive QL, on Spark 1.2.0. We have run some simple
 queries successfully, but we hit the following issue whenever we attempt to
 use a constant in the query predicate.

 It seems like an issue with parsing constant.

 Query: SELECT user_id FROM actions where conversion_aciton_id=20141210

 Error:
 scala.NotImplementedError: No parse rules for ASTNode type: 291, text:
 20141210 :
 20141210

 Any ideas? This seems very basic, so we may be missing something basic,
 but i haven't figured out what it is.

 ---

 Full shell output below:

 scala sqlContext.sql(SELECT user_id FROM actions where
 conversion_aciton_id=20141210)
 15/01/13 16:55:54 INFO ParseDriver: Parsing command: SELECT user_id FROM
 actions where conversion_aciton_id=20141210
 15/01/13 16:55:54 INFO ParseDriver: Parse Completed
 15/01/13 16:55:54 INFO ParseDriver: Parsing command: SELECT user_id FROM
 actions where conversion_aciton_id=20141210
 15/01/13 16:55:54 INFO ParseDriver: Parse Completed
 java.lang.RuntimeException:
 Unsupported language features in query: SELECT user_id FROM actions where
 conversion_aciton_id=20141210
 TOK_QUERY
   TOK_FROM
 TOK_TABREF
   TOK_TABNAME
 actions
   TOK_INSERT
 TOK_DESTINATION
   TOK_DIR
 TOK_TMP_FILE
 TOK_SELECT
   TOK_SELEXPR
 TOK_TABLE_OR_COL
   user_id
 TOK_WHERE
   =
 TOK_TABLE_OR_COL
   conversion_aciton_id
 20141210

 scala.NotImplementedError: No parse rules for ASTNode type: 291, text:
 20141210 :
 20141210
  +

 org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1110)

 at scala.sys.package$.error(package.scala:27)
 at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:251)
 at
 org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50)
 at
 org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
 at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
 at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
 at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
 at
 scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
 at
 scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
 at
 scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
 at
 org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:31)
 at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:133)
 at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:133)
 at
 org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174)
 at
 org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
 at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
 at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
 at
 

Re: Using Spark SQL with multiple (avro) files

2015-01-14 Thread Yana Kadiyska
If the wildcard path you have doesn't work, you should probably open a bug
-- I had a similar problem with Parquet and it was a bug which recently got
closed. Not sure if sqlContext.avroFile shares a codepath with
.parquetFile... you can try running with bits that have the fix for
.parquetFile, or look at the source.
Here was my question for reference:
http://mail-archives.apache.org/mod_mbox/spark-user/201412.mbox/%3ccaaswr-5rfmu-y-7htluj2eqqaecwjs8jh+irrzhm7g1ex7v...@mail.gmail.com%3E
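
For reference, a sketch of the wildcard attempt being discussed, using the
avroFile call from the quoted message below (the bucket layout mirrors that
message; whether the glob is honoured is exactly the part that may need newer
bits or a bug report):

// Assumes whatever avro integration provides sqlContext.avroFile in the
// original setup; the glob pattern is the thing to verify.
val records = sqlContext.avroFile("s3://my-bucket/avros/*/*/*.avro")
records.registerTempTable("data")
sqlContext.sql("select count(*) from data").collect().foreach(println)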

On Wed, Jan 14, 2015 at 4:34 AM, David Jones letsnumsperi...@gmail.com
wrote:

 Hi,

 I have a program that loads a single avro file using spark SQL, queries
 it, transforms it and then outputs the data. The file is loaded with:

 val records = sqlContext.avroFile(filePath)
 val data = records.registerTempTable(data)
 ...


 Now I want to run it over tens of thousands of Avro files (all with
 schemas that contain the fields I'm interested in).

 Is it possible to load multiple avro files recursively from a top-level
 directory using wildcards? All my avro files are stored under
 s3://my-bucket/avros/*/DATE/*.avro, and I want to run my task across all of
 these on EMR.

 If that's not possible, is there some way to load multiple avro files into
 the same table/RDD so the whole dataset can be processed (and in that case
 I'd supply paths to each file concretely, but I *really* don't want to have
 to do that).

 Thanks
 David



Re: spark-shell has syntax error on windows.

2015-01-22 Thread Yana Kadiyska
I am not sure if you get the same exception as I do -- spark-shell2.cmd
works fine for me. Windows 7 as well. I've never bothered looking to fix it
as it seems spark-shell just calls spark-shell2 anyway...

On Thu, Jan 22, 2015 at 3:16 AM, Vladimir Protsenko protsenk...@gmail.com
wrote:

 I have a problem with running spark shell in windows 7. I made the
 following
 steps:

 1. downloaded and installed Scala 2.11.5
 2. downloaded spark 1.2.0 by git clone git://github.com/apache/spark.git
 3. run dev/change-version-to-2.11.sh and mvn -Dscala-2.11 -DskipTests
 clean
 package (in git bash)

 After installation tried to run spark-shell.cmd in cmd shell and it says
 there is a syntax error in file. What could I do to fix problem?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-shell-has-syntax-error-on-windows-tp21313.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: [SparkSQL] Try2: Parquet predicate pushdown troubles

2015-01-21 Thread Yana Kadiyska
Thanks for looking, Cheng. Just to clarify in case other people need this
sooner: setting sc.hadoopConfiguration.set("parquet.task.side.metadata",
"false") did work well in terms of dropping row groups/showing a small input
size. What was odd is that the overall time wasn't much better... but maybe
that was overhead from sending the metadata client-side.

Thanks again and looking forward to your fix
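
For reference, a sketch combining the two settings discussed in this thread
(the table and column names are placeholders; the note about the spark.hadoop
prefix comes from the reply quoted below):

// Hadoop-level setting: controls task-side vs client-side Parquet metadata.
// Per the quoted reply, values in spark-defaults.conf only reach the Hadoop
// Configuration when the key is prefixed with "spark.hadoop.", so setting it
// on the SparkContext directly (or in core-site.xml) is the reliable route.
sc.hadoopConfiguration.set("parquet.task.side.metadata", "false")

// Spark SQL-level setting: enables Parquet filter pushdown itself.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("SET spark.sql.parquet.filterPushdown=true")
sqlContext.sql("select count(*) from mytable where epoch=1415561604")
  .collect().foreach(println)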

On Tue, Jan 20, 2015 at 9:07 PM, Cheng Lian lian.cs@gmail.com wrote:

  Hey Yana,

 Sorry for the late reply, missed this important thread somehow. And many
 thanks for reporting this. It turned out to be a bug — filter pushdown is
 only enabled when using client side metadata, which is not expected,
 because task side metadata code path is more performant. And I guess that
 the reason why setting parquet.task.side.metadata to false didn’t reduce
 input size for you is because you set the configuration with Spark API, or
 put it into spark-defaults.conf. This configuration goes to Hadoop
 Configuration, and Spark only merge properties whose names start with
 spark.hadoop into Hadoop Configuration instances. You may try to put
 parquet.task.side.metadata config into Hadoop core-site.xml, and then
 re-run the query. I can see significant differences by doing so.

 I’ll open a JIRA and deliver a fix for this ASAP. Thanks again for
 reporting all the details!

 Cheng

 On 1/13/15 12:56 PM, Yana Kadiyska wrote:

   Attempting to bump this up in case someone can help out after all. I
 spent a few good hours stepping through the code today, so I'll summarize
 my observations both in hope I get some help and to help others that might
 be looking into this:

 1. I am setting spark.sql.parquet.filterPushdown=true
 2. I can see by stepping through the driver debugger that
 PaquetTableOperations.execute sets the filters via
 ParquetInputFormat.setFilterPredicate (I checked the conf object, things
 appear OK there)
 3. In FilteringParquetRowInputFormat, I get through the codepath for
 getTaskSideSplits. It seems that the codepath for getClientSideSplits would
 try to drop rowGroups but I don't see similar in getTaskSideSplit.

  Does anyone have pointers on where to look after this? Where is rowgroup
 filtering happening in the case of getTaskSideSplits? I can attach to the
 executor but am not quite sure what code related to Parquet gets called
 executor side...also don't see any messages in the executor logs related to
 Filtering predicates.

 For comparison, I went through the getClientSideSplits and can see that
 predicate pushdown works OK:


 sc.hadoopConfiguration.set("parquet.task.side.metadata", "false")

 15/01/13 20:04:49 INFO FilteringParquetRowInputFormat: Using Client Side 
 Metadata Split Strategy
 15/01/13 20:05:13 INFO FilterCompat: Filtering using predicate: eq(epoch, 
 1417384800)
 15/01/13 20:06:45 INFO FilteringParquetRowInputFormat: Dropping 572 row 
 groups that do not pass filter predicate (28 %) !

 ​

  Is it possible that this is just a UI bug? I can see Input=4G when using
 (parquet.task.side.metadata,false) and Input=140G when using
 (parquet.task.side.metadata,true) but the runtimes are very comparable?

  [image: Inline image 1]


  JobId 4 is the ClientSide split, JobId 5 is the TaskSide split.



  On Fri, Jan 9, 2015 at 2:56 PM, Yana Kadiyska yana.kadiy...@gmail.com
 wrote:

 I am running the following (connecting to an external Hive Metastore)

   /a/shark/spark/bin/spark-shell --master spark://ip:7077 --conf
  spark.sql.parquet.filterPushdown=true

  val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

  and then ran two queries:

 sqlContext.sql("select count(*) from table where partition='blah'")
 and sqlContext.sql("select count(*) from table where partition='blah' and
 epoch=1415561604")

 ​

  According to the Input tab in the UI both scan about 140G of data which
 is the size of my whole partition. So I have two questions --

  1. is there a way to tell from the plan if a predicate pushdown is
 supposed to happen?
 I see this for the second query

 res0: org.apache.spark.sql.SchemaRDD =
 SchemaRDD[0] at RDD at SchemaRDD.scala:108
 == Query Plan ==
 == Physical Plan ==
 Aggregate false, [], [Coalesce(SUM(PartialCount#49L),0) AS _c0#0L]
  Exchange SinglePartition
   Aggregate true, [], [COUNT(1) AS PartialCount#49L]
OutputFaker []
 Project []
  ParquetTableScan [epoch#139L], (ParquetRelation list of hdfs files

 ​
  2. am I doing something obviously wrong that this is not working? (Im
 guessing it's not woring because the input size for the second query shows
 unchanged and the execution time is almost 2x as long)

  thanks in advance for any insights


​



Re: Installing Spark Standalone to a Cluster

2015-01-22 Thread Yana Kadiyska
You can do ./sbin/start-slave.sh --master spark://IP:PORT -- I believe you're
missing --master. In addition, it's a good idea to pass exactly the spark
master's endpoint as shown on your UI under http://localhost:8080. That
should do it. If it's not working, you can look at the Worker log and see
where it's trying to connect to and whether it's getting any errors.

On Thu, Jan 22, 2015 at 12:06 PM, riginos samarasrigi...@gmail.com wrote:

 I have downloaded spark-1.2.0.tgz on each of my node and execute ./sbt/sbt
 assembly on each of them.  So I execute. /sbin/start-master.sh on my master
 and ./bin/spark-class org.apache.spark.deploy.worker.Worker
 spark://IP:PORT.
 Althought when I got to http://localhost:8080 I cannot see any worker. Why
 is that? Do I do something wrong with the installation deploy of the spark?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Installing-Spark-Standalone-to-a-Cluster-tp21319.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Parquet predicate pushdown troubles

2015-01-09 Thread Yana Kadiyska
I am running the following (connecting to an external Hive Metastore)

 /a/shark/spark/bin/spark-shell --master spark://ip:7077 --conf
spark.sql.parquet.filterPushdown=true

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

and then ran two queries:

sqlContext.sql("select count(*) from table where partition='blah'")
and
sqlContext.sql("select count(*) from table where partition='blah' and epoch=1415561604")

​

According to the Input tab in the UI both scan about 140G of data which is
the size of my whole partition. So I have two questions --

1. is there a way to tell from the plan if a predicate pushdown is supposed
to happen?
I see this for the second query

res0: org.apache.spark.sql.SchemaRDD =
SchemaRDD[0] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
Aggregate false, [], [Coalesce(SUM(PartialCount#49L),0) AS _c0#0L]
 Exchange SinglePartition
  Aggregate true, [], [COUNT(1) AS PartialCount#49L]
   OutputFaker []
Project []
 ParquetTableScan [epoch#139L], (ParquetRelation list of hdfs files

​
2. am I doing something obviously wrong that this is not working? (I'm
guessing it's not working because the input size for the second query shows
unchanged and the execution time is almost 2x as long)

thanks in advance for any insights


Re: Getting Output From a Cluster

2015-01-08 Thread Yana Kadiyska
Are you calling saveAsTextFiles on the DStream -- looks like it? Look at
the section called "Design Patterns for using foreachRDD" in the link you
sent -- you want to do dstream.foreachRDD(rdd => rdd.saveAs...).
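
For reference, a Scala sketch of that pattern (the original code below uses
the Java API, but the idea is the same; the output path is a placeholder):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

// wordCounts: DStream[(String, Int)] produced elsewhere in the streaming job
def saveCounts(wordCounts: DStream[(String, Int)]): Unit = {
  wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
    // saveAsTextFile is a plain RDD method, so the output types are known and
    // the scala.runtime.Nothing$ problem from the quoted message does not come up.
    rdd.saveAsTextFile(s"hdfs:///user/test/wordcount-${time.milliseconds}")
  }
}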

On Thu, Jan 8, 2015 at 5:20 PM, Su She suhsheka...@gmail.com wrote:

 Hello Everyone,

 Thanks in advance for the help!

 I successfully got my Kafka/Spark WordCount app to print locally. However,
 I want to run it on a cluster, which means that I will have to save it to
 HDFS if I want to be able to read the output.

 I am running Spark 1.1.0, which means according to this document:
 https://spark.apache.org/docs/1.1.0/streaming-programming-guide.html

 I should be able to use commands such as saveAsText/HadoopFiles.

 1) When I try saveAsTextFiles it says:
 cannot find symbol
 [ERROR] symbol  : method saveAsTextFiles(java.lang.String,java.lang.String)
 [ERROR] location: class
 org.apache.spark.streaming.api.java.JavaPairDStreamjava.lang.String,java.lang.Integer

 This makes some sense as saveAsTextFiles is not included here:

 http://people.apache.org/~tdas/spark-1.1.0-temp-docs/api/java/org/apache/spark/streaming/api/java/JavaPairDStream.html

 2) When I try
 saveAsHadoopFiles(hdfs://ipus-west-1.compute.internal:8020/user/testwordcount,
 txt) it builds, but when I try running it it throws this exception:

 Exception in thread main java.lang.RuntimeException:
 java.lang.RuntimeException: class scala.runtime.Nothing$ not
 org.apache.hadoop.mapred.OutputFormat
 at
 org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2079)
 at
 org.apache.hadoop.mapred.JobConf.getOutputFormat(JobConf.java:712)
 at
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1021)
 at
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940)
 at
 org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:632)
 at
 org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:630)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
 at
 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:171)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:724)
 Caused by: java.lang.RuntimeException: class scala.runtime.Nothing$ not
 org.apache.hadoop.mapred.OutputFormat
 at
 org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2073)
 ... 14 more


 Any help is really appreciated! Thanks.




textFile partitions

2015-02-09 Thread Yana Kadiyska
Hi folks, puzzled by something pretty simple:

I have a standalone cluster with default parallelism of 2, spark-shell
running with 2 cores

sc.textFile("README.md").partitions.size returns 2 (this makes sense)
sc.textFile("README.md").coalesce(100, true).partitions.size returns 100,
which also makes sense

but

sc.textFile("README.md", 100).partitions.size
gives 102 -- I was expecting this to be equivalent to the last statement
(i.e. result in 100 partitions)

I'd appreciate it if someone could enlighten me as to why I end up with 102.
This is on Spark 1.2

thanks


Re: Saving partial (top 10) DStream windows to hdfs

2015-01-08 Thread Yana Kadiyska
I'm glad you solved this issue, but I have a follow-up question for you.
Wouldn't Akhil's solution be better for you after all? I run a similar
computation where a large set of data gets reduced to a much smaller
aggregate in an interval. If you do saveAsTextFiles without coalescing, I
believe you'd get the same number of output files as you have partitions. So
in a worst-case scenario you'd end up with 10 partitions (if each item in
your top 10 was in a different partition) and thus 10 output files. That
seems horrible for subsequent processing (this is the HDFS small-files
problem at its worst). But even with coalesce(1) you'd get one pretty small
file on every interval.

I'm curious what you think, because every time I come to a situation like
this I end up using a store that supports appends (some sort of DB), but
that's more code/work. So I'm curious about your experience of saving to
files (unless you have a separate process to merge these chunks, of course).
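
For reference, a sketch of the coalesce variant discussed above (the DStream
construction and the output prefix are placeholders; the top-10 selection
mirrors the zipWithIndex approach from the quoted thread):

import org.apache.spark.streaming.dstream.DStream

// topCounts: DStream[(Long, Long)] of (value, count) pairs, already sorted by count
def saveTop10(topCounts: DStream[(Long, Long)], outputPrefix: String): Unit = {
  topCounts
    .transform(rdd => rdd.zipWithIndex().filter(_._2 < 10).map(_._1)) // keep the first 10 rows overall
    .map { case (value, count) => s"$value,$count" }
    .transform(_.coalesce(1)) // one small output file per interval instead of one per partition
    .saveAsTextFiles(outputPrefix)
}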

On Wed, Jan 7, 2015 at 11:56 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:

 Hi,

 It worked out as this.

 val topCounts = sortedCounts.transform(rdd =
 rdd.zipWithIndex().filter(x=x._2 =10))

 Regards,
 Laeeq


   On Wednesday, January 7, 2015 5:25 PM, Laeeq Ahmed
 laeeqsp...@yahoo.com.INVALID wrote:


 Hi Yana,

 I also think that
 *val top10 = your_stream.mapPartitions(rdd = rdd.take(10))*


 will give top 10 from each partition. I will try your code.

 Regards,
 Laeeq


   On Wednesday, January 7, 2015 5:19 PM, Yana Kadiyska 
 yana.kadiy...@gmail.com wrote:


  My understanding is that

  val top10 = your_stream.mapPartitions(rdd => rdd.take(10))

  would result in an RDD containing the top 10 entries per partition -- am I
  wrong?

  I am not sure if there is a more efficient way but I think this would work:

  sortedCounts.zipWithIndex().filter(x => x._2 <= 10).saveAsText

 On Wed, Jan 7, 2015 at 10:38 AM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid
  wrote:

 Hi,

 I applied it as fallows:

eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group,
 Map(args(a) - 1),StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
 val counts = eegStreams(a).map(x =
 math.round(x.toDouble)).countByValueAndWindow(Seconds(4), Seconds(4))
 val sortedCounts = counts.map(_.swap).transform(rdd =
 rdd.sortByKey(false)).map(_.swap)
 val topCounts = sortedCounts.mapPartitions(rdd=rdd.take(10))
 *//val topCounts = sortedCounts.transform(rdd =
 ssc.sparkContext.makeRDD(rdd.take(10)))*
 topCounts.map(tuple = %s,%s.format(tuple._1,
 tuple._2)).saveAsTextFiles(hdfs://
 ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/ +
 (a+1))
 topCounts.print()

 It gives the output with 10 extra values. I think it works on partition of
 each rdd rather than just rdd. I also tried the commented code. It gives
 correct result but in the start it gives serialisation error

 ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
 java.io.NotSerializableException:
 org.apache.spark.streaming.StreamingContext

 Output for code in red: The values in green looks extra to me.

 0,578
 -3,576
 4,559
 -1,556
 3,553
 -6,540
 6,538
 -4,535
 1,526
 10,483
 *94,8*
 *-113,8*
 *-137,8*
 *-85,8*
 *-91,8*
 *-121,8*
 *114,8*
 *108,8*
 *93,8*
 *101,8*
 1,128
 -8,118
 3,112
 -4,110
 -13,108
 4,108
 -3,107
 -10,107
 -6,106
 8,105
 *76,6*
 *74,6*
 *60,6*
 *52,6*
 *70,6*
 *71,6*
 *-60,6*
 *55,6*
 *78,5*
 *64,5*

 and so on.

 Regards,
 Laeeq



   On Tuesday, January 6, 2015 9:06 AM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:


 You can try something like:

 *val top10 = your_stream.mapPartitions(rdd = rdd.take(10))*


 Thanks
 Best Regards

 On Mon, Jan 5, 2015 at 11:08 PM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid
  wrote:

 Hi,

 I am counting values in each window and find the top values and want to
 save only the top 10 frequent values of each window to hdfs rather than all
 the values.

 *eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a)
 - 1),StorageLevel.MEMORY_AND_DISK_SER).map(_._2)*
 *val counts = eegStreams(a).map(x = (math.round(x.toDouble),
 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(4))*
 *val sortedCounts = counts.map(_.swap).transform(rdd =
 rdd.sortByKey(false)).map(_.swap)*
 *//sortedCounts.foreachRDD(rdd =println(\nTop 10 amplitudes:\n +
 rdd.take(10).mkString(\n)))*
 *sortedCounts.map(tuple = %s,%s.format(tuple._1,
 tuple._2)).saveAsTextFiles(hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/
 http://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/
 + (a+1))*

 I can print top 10 as above in red.

 I have also tried

 *sortedCounts.foreachRDD{ rdd =
 ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile(hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/
 http://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/
 + (a+1))} *

 but I get the following error.

 *15/01/05 17:12:23 ERROR actor.OneForOneStrategy

Re: Is it possible to use windows service to start and stop spark standalone cluster

2015-03-11 Thread Yana Kadiyska
You might also want to see if the Windows Task Scheduler helps with that. I
have not used it on Windows 2008 R2, but it generally does allow you to
schedule a .bat file to run on startup.

On Wed, Mar 11, 2015 at 10:16 AM, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.com wrote:

 Thanks for the suggestion. I will try that.



 Ningjun





 From: Silvio Fiorito [mailto:silvio.fior...@granturing.com]
 Sent: Wednesday, March 11, 2015 12:40 AM
 To: Wang, Ningjun (LNG-NPV); user@spark.apache.org
 Subject: Re: Is it possible to use windows service to start and stop
spark standalone cluster



 Have you tried Apache Daemon?
http://commons.apache.org/proper/commons-daemon/procrun.html



 From: Wang, Ningjun (LNG-NPV)
 Date: Tuesday, March 10, 2015 at 11:47 PM
 To: user@spark.apache.org
 Subject: Is it possible to use windows service to start and stop spark
standalone cluster



 We are using spark stand alone cluster on Windows 2008 R2. I can start
spark clusters by open an command prompt and run the following



 bin\spark-class.cmd org.apache.spark.deploy.master.Master



 bin\spark-class.cmd org.apache.spark.deploy.worker.Worker spark://
mywin.mydomain.com:7077



 I can stop spark cluster by pressing Ctril-C.



 The problem is that if the machine is reboot, I have to manually start
the spark cluster again as above. Is it possible to use a windows service
to start cluster? This way when the machine is reboot, the windows service
will automatically restart spark cluster. How to stop spark cluster using
windows service is also a challenge.



 Please advise.



 Thanks



 Ningjun


Re: Errors in spark

2015-02-27 Thread Yana Kadiyska
I was actually just able to reproduce the issue. I do wonder if this is a
bug -- the docs say "When not configured by the hive-site.xml, the context
automatically creates metastore_db and warehouse in the current directory."
But as you can see from the message, the warehouse is not in the current
directory; it is under /user/hive. In my case this directory was owned by
'root' and no one else had write permissions. Changing the permissions works
if you need to get unblocked quickly... but it does seem like a bug to me.


On Fri, Feb 27, 2015 at 11:21 AM, sandeep vura sandeepv...@gmail.com
wrote:

 Hi yana,

 I have removed hive-site.xml from spark/conf directory but still getting
 the same errors. Anyother way to work around.

 Regards,
 Sandeep

 On Fri, Feb 27, 2015 at 9:38 PM, Yana Kadiyska yana.kadiy...@gmail.com
 wrote:

  I think you're mixing two things: the docs say "When not configured by
  the hive-site.xml, the context automatically creates metastore_db and
  warehouse in the current directory." AFAIK if you want a local
  metastore, you don't put hive-site.xml anywhere. You only need the file if
  you're going to point to an external metastore. If you're pointing to an
  external metastore, in my experience I've also had to copy core-site.xml
  into conf in order to specify this property: <name>fs.defaultFS</name>

 On Fri, Feb 27, 2015 at 10:39 AM, sandeep vura sandeepv...@gmail.com
 wrote:

 Hi Sparkers,

 I am using hive version - hive 0.13 and copied hive-site.xml in
 spark/conf and using default derby local metastore .

 While creating a table in spark shell getting the following error ..Can
 any one please look and give solution asap..

 sqlContext.sql(CREATE TABLE IF NOT EXISTS sandeep (key INT, value
 STRING))
 15/02/27 23:06:13 ERROR RetryingHMSHandler:
 MetaException(message:file:/user/hive/warehouse_1/sandeep is not a
 directory or unable to create one)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:622)
 at
 org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105)
 at
 com.sun.proxy.$Proxy12.create_table_with_environment_context(Unknown Source)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:558)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:547)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:622)
 at
 org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
 at com.sun.proxy.$Proxy13.createTable(Unknown Source)
 at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:613)
 at
 org.apache.hadoop.hive.ql.exec.DDLTask.createTable(DDLTask.java:4189)
 at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:281)
 at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:153)
 at
 org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:85)
 at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1503)
 at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1270)
 at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1088)
 at org.apache.hadoop.hive.ql.Driver.run(Driver.java:911)
 at org.apache.hadoop.hive.ql.Driver.run(Driver.java:901)
 at
 org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:305)
 at
 org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276)
 at
 org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35)
 at
 org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35)
 at
 org.apache.spark.sql.execution.Command$class.execute(commands.scala:46)
 at
 org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:30)
 at
 org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
 at
 org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
 at
 org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
 at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:108)
 at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94)
 at $line9.$read$$iwC$$iwC$$iwC

Re: Errors in spark

2015-02-27 Thread Yana Kadiyska
I think you're mixing two things: the docs say "When not configured by
the hive-site.xml, the context automatically creates metastore_db and
warehouse in the current directory." AFAIK if you want a local metastore,
you don't put hive-site.xml anywhere. You only need the file if you're
going to point to an external metastore. If you're pointing to an external
metastore, in my experience I've also had to copy core-site.xml into conf
in order to specify this property: <name>fs.defaultFS</name>

On Fri, Feb 27, 2015 at 10:39 AM, sandeep vura sandeepv...@gmail.com
wrote:

 Hi Sparkers,

 I am using hive version - hive 0.13 and copied hive-site.xml in spark/conf
 and using default derby local metastore .

 While creating a table in spark shell getting the following error ..Can
 any one please look and give solution asap..

 sqlContext.sql(CREATE TABLE IF NOT EXISTS sandeep (key INT, value
 STRING))
 15/02/27 23:06:13 ERROR RetryingHMSHandler:
 MetaException(message:file:/user/hive/warehouse_1/sandeep is not a
 directory or unable to create one)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:622)
 at
 org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:105)
 at
 com.sun.proxy.$Proxy12.create_table_with_environment_context(Unknown Source)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:558)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:547)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:622)
 at
 org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
 at com.sun.proxy.$Proxy13.createTable(Unknown Source)
 at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:613)
 at
 org.apache.hadoop.hive.ql.exec.DDLTask.createTable(DDLTask.java:4189)
 at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:281)
 at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:153)
 at
 org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:85)
 at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1503)
 at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1270)
 at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1088)
 at org.apache.hadoop.hive.ql.Driver.run(Driver.java:911)
 at org.apache.hadoop.hive.ql.Driver.run(Driver.java:901)
 at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:305)
 at
 org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276)
 at
 org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35)
 at
 org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35)
 at
 org.apache.spark.sql.execution.Command$class.execute(commands.scala:46)
 at
 org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:30)
 at
 org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
 at
 org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
 at
 org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
 at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:108)
 at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94)
 at $line9.$read$$iwC$$iwC$$iwC$$iwC.init(console:15)
 at $line9.$read$$iwC$$iwC$$iwC.init(console:20)
 at $line9.$read$$iwC$$iwC.init(console:22)
 at $line9.$read$$iwC.init(console:24)
 at $line9.$read.init(console:26)
 at $line9.$read$.init(console:30)
 at $line9.$read$.clinit(console)
 at $line9.$eval$.init(console:7)
 at $line9.$eval$.clinit(console)
 at $line9.$eval.$print(console)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:622)
 at
 org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
 at
 

Re: Running multiple threads with same Spark Context

2015-02-25 Thread Yana Kadiyska
I am not sure if your issue is setting the FAIR mode correctly or something
else, so let's start with the FAIR mode.

Do you see the scheduler mode actually being set to FAIR?

I have this line in spark-defaults.conf:
spark.scheduler.allocation.file=/spark/conf/fairscheduler.xml

Then, when I start my application, I can see that it is using that
scheduler in the UI -- go to the master UI and click on your application,
and the application's scheduling mode should be shown as FAIR.
On Wed, Feb 25, 2015 at 4:06 AM, Harika Matha matha.har...@gmail.com
wrote:

 Hi Yana,

 I tried running the program after setting the property
 spark.scheduler.mode to FAIR. But the result is same as previous. Are
 there any other properties that have to be set?


 On Tue, Feb 24, 2015 at 10:26 PM, Yana Kadiyska yana.kadiy...@gmail.com
 wrote:

 It's hard to tell. I have not run this on EC2 but this worked for me:

 The only thing that I can think of is that the scheduling mode is set to

- Scheduling Mode: FAIR


  val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)
  while_loop to get curr_job
   pool.execute(new ReportJob(sqlContext, curr_job, i))

  class ReportJob(sqlContext: org.apache.spark.sql.hive.HiveContext, query: String, id: Int) extends Runnable with Logging {
    def threadId = (Thread.currentThread.getName() + "\t")

    def run() {
      logInfo(s"* Running ${threadId} ${id}")
      val startTime = Platform.currentTime
      val hiveQuery = query
      val result_set = sqlContext.sql(hiveQuery)
      result_set.repartition(1)
      result_set.saveAsParquetFile(s"hdfs:///tmp/${id}")
      logInfo(s"* DONE ${threadId} ${id} time: " + (Platform.currentTime - startTime))
    }
  }

 ​

 On Tue, Feb 24, 2015 at 4:04 AM, Harika matha.har...@gmail.com wrote:

 Hi all,

 I have been running a simple SQL program on Spark. To test the
 concurrency,
 I have created 10 threads inside the program, all threads using same
 SQLContext object. When I ran the program on my EC2 cluster using
 spark-submit, only 3 threads were running in parallel. I have repeated
 the
 test on different EC2 clusters (containing different number of cores) and
 found out that only 3 threads are running in parallel on every cluster.

 Why is this behaviour seen? What does this number 3 specify?
 Is there any configuration parameter that I have to set if I want to run
 more threads concurrently?

 Thanks
 Harika



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Running-multiple-threads-with-same-Spark-Context-tp21784.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.







Executor size and checkpoints

2015-02-21 Thread Yana Kadiyska
Hi all,

I had a streaming application and midway through things decided to up the
executor memory. I spent a long time launching like this:

~/spark-1.2.0-bin-cdh4/bin/spark-submit --class StreamingTest
--executor-memory 2G --master...

and observing that the executor memory was still at the old 512 MB setting.

I was about to ask if this is a bug when I decided to delete the
checkpoints. Sure enough the setting took after that.

So my question is -- why is it required to remove checkpoints to increase
memory allowed on an executor? This seems pretty un-intuitive to me.

Thanks for any insights.


Re: Executor size and checkpoints

2015-02-24 Thread Yana Kadiyska
Tathagata, yes, I was using StreamingContext.getOrCreate. My question is
about the design decision here. I was expecting that if I have a streaming
application that say crashed, and I wanted to give the executors more
memory, I would be able to restart, using the checkpointed RDD but with
more memory.

I thought deleting the checkpoints in a checkpointed application is the
last thing that you want to do (as you lose all state). Seems a bit harsh
to have to do this just to increase the amount of memory?
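
For context, the recovery pattern in question looks roughly like this (a
sketch only -- the checkpoint path and batch interval are made up):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/streaming-checkpoint"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("StreamingTest")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // ... DStream definitions go here ...
  ssc
}

// If checkpointDir already holds data, the context (and its SparkConf) is
// rebuilt from the checkpoint and createContext() is never called -- which is
// why a new --executor-memory value does not take effect on restart.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()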

On Mon, Feb 23, 2015 at 11:12 PM, Tathagata Das t...@databricks.com wrote:

 Hey Yana,

 I think you posted screenshots, but they are not visible in the email.
 Probably better to upload them and post links.

 Are you using StreamingContext.getOrCreate? If that is being used, then it
 will recreate the SparkContext with SparkConf having whatever configuration
 is present in the existing checkpoint files. It may so happen that the
 existing checkpoint files were from an old run which had 512 configured. So
 the SparkConf in the restarted SparkContext/StremingContext is accidentally
 picking up the old configuration. Deleting the checkpoint files avoided that,
 and the new config took effect. Maybe. :)

 TD

 On Sat, Feb 21, 2015 at 7:30 PM, Yana Kadiyska yana.kadiy...@gmail.com
 wrote:

 Hi all,

 I had a streaming application and midway through things decided to up the
 executor memory. I spent a long time launching like this:

 ~/spark-1.2.0-bin-cdh4/bin/spark-submit --class StreamingTest
 --executor-memory 2G --master...

 and observing that the executor memory was still at the old 512 MB setting.

 I was about to ask if this is a bug when I decided to delete the
 checkpoints. Sure enough the setting took after that.

 So my question is -- why is it required to remove checkpoints to increase
 memory allowed on an executor? This seems pretty un-intuitive to me.

 Thanks for any insights.





[SparkSQL] Number of map tasks in SparkSQL

2015-02-24 Thread Yana Kadiyska
Shark used to have shark.map.tasks variable. Is there an equivalent for
Spark SQL?

We are trying a scenario with heavily partitioned Hive tables. We end up
with a UnionRDD with a lot of partitions underneath and hence too many
tasks:
https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala#L202

is there a good way to tell SQL to coalesce these?

thanks for any pointers
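
One after-the-fact option is to coalesce the result itself -- a sketch,
assuming a Spark 1.2-era SchemaRDD; the table name and partition count are
placeholders:

val rows = sqlContext.sql("select * from heavily_partitioned_table where dt = '2015-02-24'")
val fewer = rows.coalesce(64)  // merges the many underlying partitions without a shuffle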


Re: Running multiple threads with same Spark Context

2015-02-24 Thread Yana Kadiyska
It's hard to tell. I have not run this on EC2 but this worked for me:

The only thing that I can think of is that the scheduling mode is set to

   - *Scheduling Mode:* FAIR


import java.util.concurrent.{ExecutorService, Executors}
import scala.compat.Platform
import org.apache.spark.Logging

val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)
// while loop to get curr_job
pool.execute(new ReportJob(sqlContext, curr_job, i))

class ReportJob(sqlContext: org.apache.spark.sql.hive.HiveContext, query: String, id: Int) extends Runnable with Logging {
  def threadId = Thread.currentThread.getName() + "\t"

  def run() {
    logInfo(s"* Running ${threadId} ${id}")
    val startTime = Platform.currentTime
    val hiveQuery = query
    val result_set = sqlContext.sql(hiveQuery)
    // repartition returns a new RDD, so chain it into the write for it to take effect
    result_set.repartition(1).saveAsParquetFile(s"hdfs:///tmp/${id}")
    logInfo(s"* DONE ${threadId} ${id} time: " + (Platform.currentTime - startTime))
  }
}

​

On Tue, Feb 24, 2015 at 4:04 AM, Harika matha.har...@gmail.com wrote:

 Hi all,

 I have been running a simple SQL program on Spark. To test the concurrency,
 I have created 10 threads inside the program, all threads using same
 SQLContext object. When I ran the program on my EC2 cluster using
 spark-submit, only 3 threads were running in parallel. I have repeated the
 test on different EC2 clusters (containing different number of cores) and
 found out that only 3 threads are running in parallel on every cluster.

 Why is this behaviour seen? What does this number 3 specify?
 Is there any configuration parameter that I have to set if I want to run
 more threads concurrently?

 Thanks
 Harika



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Running-multiple-threads-with-same-Spark-Context-tp21784.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





[SparkSQL, Spark 1.2] UDFs in group by broken?

2015-02-26 Thread Yana Kadiyska
Can someone confirm if they can run UDFs in group by in spark1.2?

I have two builds running -- one from a custom build from early December
(commit 4259ca8dd12) which works fine, and Spark1.2-RC2.

On the latter I get:

 jdbc:hive2://XXX.208:10001> select
from_unixtime(epoch,'yyyy-MM-dd-HH'),count(*) count
. . . . . . . . . . . . . . . . . . from tbl
. . . . . . . . . . . . . . . . . . group by
from_unixtime(epoch,'yyyy-MM-dd-HH');
Error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
Expression not in GROUP BY:
HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(epoch#1049L,yyyy-MM-dd-HH) AS _c0#1004, tree:
Aggregate [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(epoch#1049L,yyyy-MM-dd-HH)],
[HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFFromUnixTime(epoch#1049L,yyyy-MM-dd-HH) AS _c0#1004,COUNT(1) AS count#1003L]
 MetastoreRelation default, tbl, None (state=,code=0)

​

This worked fine on my older build. I don't see a JIRA on this but maybe
I'm not looking right. Can someone please advise?


Re: Help me understand the partition, parallelism in Spark

2015-02-26 Thread Yana Kadiyska
Imran, I have also observed the phenomenon of reducing the cores helping
with OOM. I wanted to ask this (hopefully without straying off topic): we
can specify the number of cores and the executor memory. But we don't get
to specify _how_ the cores are spread among executors.

Is it possible that with 24G memory and 4 cores we get a spread of 1 core
per executor thus ending up with 24G for the task, but with 24G memory and
10 cores some executor ends up with 3 cores on the same machine and thus we
have only 8G per task?

On Thu, Feb 26, 2015 at 4:42 PM, Imran Rashid iras...@cloudera.com wrote:

 Hi Yong,

 mostly correct except for:


- Since we are doing reduceByKey, shuffling will happen. Data will be
shuffled into 1000 partitions, as we have 1000 unique keys.

 no, you will not get 1000 partitions.  Spark has to decide how many
 partitions to use before it even knows how many unique keys there are.  If
 you have 200 as the default parallelism (or you just explicitly make it the
 second parameter to reduceByKey()), then you will get 200 partitions.  The
 1000 unique keys will be distributed across the 200 partitions.  ideally
 they will be distributed pretty equally, but how they get distributed
 depends on the partitioner (by default you will have a HashPartitioner, so
 it depends on the hash of your keys).
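
 A quick sketch of that point (the input path and key extraction below are
 made up):

 val lines  = sc.textFile("hdfs:///data/10g-input")    // ~100 HDFS blocks -> ~100 partitions
 val pairs  = lines.map(line => (line.split(",")(0), 1))
 val counts = pairs.reduceByKey(_ + _, 200)            // 200 output partitions, fixed up front
 println(counts.partitions.length)                     // prints 200 even with 1000 unique keys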

 Note that this is more or less the same as in Hadoop MapReduce.

 the amount of parallelism matters b/c there are various places in spark
 where there is some overhead proportional to the size of a partition.  So
 in your example, if you have 1000 unique keys in 200 partitions, you expect
 about 5 unique keys per partitions -- if instead you had 10 partitions,
 you'd expect 100 unique keys per partitions, and thus more data and you'd
 be more likely to hit an OOM.  But there are many other possible sources of
 OOM, so this is definitely not the *only* solution.

 Sorry I can't comment in particular about Spark SQL -- hopefully somebody
 more knowledgeable can comment on that.



 On Wed, Feb 25, 2015 at 8:58 PM, java8964 java8...@hotmail.com wrote:

 Hi, Sparkers:

 I come from the Hadoop MapReduce world, and I am trying to understand some of
 Spark's internals. From the web and this list, I keep seeing
 people talk about increasing the parallelism if you get an OOM error. I
 have read the documentation as much as possible to understand RDD partitioning
 and parallelism usage in Spark.

 I understand that for an RDD from HDFS, by default, one partition will be
 one HDFS block, pretty straightforward. I also saw that lots of RDD operations
 support a 2nd parallelism parameter. This is the part that confuses me. From my
 understanding, the parallelism is totally controlled by how many cores you
 give to your job; adjusting that parameter, or spark.default.parallelism,
 shouldn't have any impact.

 For example, if I have 10G of data in HDFS, and assume the block size is
 128M, we get 100 blocks/partitions in the RDD. Now suppose I transform that RDD
 into a pair RDD with 1000 unique keys, and do a reduceByKey
 action, using 200 as the default parallelism. Here is what I assume:


- We have 100 partitions, as the data comes from 100 blocks. Most
likely Spark will generate 100 tasks to read and shuffle them?
- The 1000 unique keys mean 1000 reducer groups, like in MR.
- If I set the max cores to 50, there will be up to 50 tasks that can
run concurrently. The rest of the tasks just have to wait for a core while
50 tasks are running.
- Since we are doing reduceByKey, shuffling will happen. Data will be
shuffled into 1000 partitions, as we have 1000 unique keys.
- I don't know how many tasks these 1000 partitions will be processed by;
maybe this is where the parallelism parameter comes in?
- No matter what the parallelism is, there are ONLY 50 tasks that can
run concurrently. So if we set more cores, more partitions' data will be
processed in the executor (which runs more threads in this case), so more
memory is needed. I don't see how increasing parallelism could help the OOM in
this case.
- In my test case with Spark SQL, I gave 24G as the executor heap, and my
join between 2 big datasets kept getting OOM. I kept increasing
spark.default.parallelism, from 200 to 400, to 2000, even to 4000, with no
help. What really made the query finally finish without OOM was changing
--total-executor-cores from 10 to 4.


 So my questions are:
 1) What does the parallelism really mean in Spark? In the simple
 example above, for reduceByKey, what difference does it make if the
 parallelism changes from 10 to 20?
 2) When we talk about partitions in Spark, for the data coming from
 HDFS I can understand the partitioning clearly. For the intermediate data,
 will the partitioning be by key? For group, reduce, and join actions, will the
 unique keys determine the partitions? Is that correct?
 3) Why could increasing parallelism help with OOM? 

Re: Help me understand the partition, parallelism in Spark

2015-02-26 Thread Yana Kadiyska
Yong, for the 200 tasks in stage 2 and 3 -- this actually comes from the
shuffle setting: spark.sql.shuffle.partitions
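
A minimal sketch of adjusting it (the value and query below are just examples):

sqlContext.setConf("spark.sql.shuffle.partitions", "50")
val result = sqlContext.sql("SELECT key, count(*) FROM some_table GROUP BY key")
// the aggregation stage of this query now runs with 50 tasks instead of the default 200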

On Thu, Feb 26, 2015 at 5:51 PM, java8964 java8...@hotmail.com wrote:

 Imran, thanks for explaining the parallelism. That is very helpful.

 In my test case, I am only using a one-box cluster, with one executor. So if I
 give it 10 cores, then 10 concurrent tasks will run within this one executor,
 which will handle more data than in the 4-core case, and that led to the OOM.

 I haven't set up Spark on our production cluster yet, but assume that we
 have a 100-node cluster; if I guess right, setting up to 1000 cores means that
 on average each box's executor will run 10 threads to process data. So
 lowering the cores will reduce the speed of Spark, but can help to avoid the
 OOM, as less data is processed in memory at once.

 My other guess is that each partition will eventually be processed by one core.
 So making the partition count bigger can decrease the partition size,
 which should help the memory footprint. In my case, I guess that Spark SQL
 in fact doesn't use the spark.default.parallelism setting, or at least in
 my query it is not used. So no matter what I changed, it didn't matter.
 The reason I say that is that there are always 200 tasks in stages 2 and 3
 of my query job, no matter what I set spark.default.parallelism to.

 I think lowering the cores trades speed for lower memory usage. Hope
 my understanding is correct.

 Thanks

 Yong

 --
 Date: Thu, 26 Feb 2015 17:03:20 -0500
 Subject: Re: Help me understand the partition, parallelism in Spark
 From: yana.kadiy...@gmail.com
 To: iras...@cloudera.com
 CC: java8...@hotmail.com; user@spark.apache.org


 Imran, I have also observed the phenomenon of reducing the cores helping
 with OOM. I wanted to ask this (hopefully without straying off topic): we
 can specify the number of cores and the executor memory. But we don't get
 to specify _how_ the cores are spread among executors.

 Is it possible that with 24G memory and 4 cores we get a spread of 1 core
 per executor thus ending up with 24G for the task, but with 24G memory and
 10 cores some executor ends up with 3 cores on the same machine and thus we
 have only 8G per task?

 On Thu, Feb 26, 2015 at 4:42 PM, Imran Rashid iras...@cloudera.com
 wrote:

 Hi Yong,

 mostly correct except for:


- Since we are doing reduceByKey, shuffling will happen. Data will be
shuffled into 1000 partitions, as we have 1000 unique keys.

 no, you will not get 1000 partitions.  Spark has to decide how many
 partitions to use before it even knows how many unique keys there are.  If
 you have 200 as the default parallelism (or you just explicitly make it the
 second parameter to reduceByKey()), then you will get 200 partitions.  The
 1000 unique keys will be distributed across the 200 partitions.  ideally
 they will be distributed pretty equally, but how they get distributed
 depends on the partitioner (by default you will have a HashPartitioner, so
 it depends on the hash of your keys).

 Note that this is more or less the same as in Hadoop MapReduce.

 the amount of parallelism matters b/c there are various places in spark
 where there is some overhead proportional to the size of a partition.  So
 in your example, if you have 1000 unique keys in 200 partitions, you expect
 about 5 unique keys per partitions -- if instead you had 10 partitions,
 you'd expect 100 unique keys per partitions, and thus more data and you'd
 be more likely to hit an OOM.  But there are many other possible sources of
 OOM, so this is definitely not the *only* solution.

 Sorry I can't comment in particular about Spark SQL -- hopefully somebody
 more knowledgeable can comment on that.



 On Wed, Feb 25, 2015 at 8:58 PM, java8964 java8...@hotmail.com wrote:

 Hi, Sparkers:

 I come from the Hadoop MapReduce world, and I am trying to understand some of
 Spark's internals. From the web and this list, I keep seeing
 people talk about increasing the parallelism if you get an OOM error. I
 have read the documentation as much as possible to understand RDD partitioning
 and parallelism usage in Spark.

 I understand that for an RDD from HDFS, by default, one partition will be
 one HDFS block, pretty straightforward. I also saw that lots of RDD operations
 support a 2nd parallelism parameter. This is the part that confuses me. From my
 understanding, the parallelism is totally controlled by how many cores you
 give to your job; adjusting that parameter, or spark.default.parallelism,
 shouldn't have any impact.

 For example, if I have 10G of data in HDFS, and assume the block size is
 128M, we get 100 blocks/partitions in the RDD. Now suppose I transform that RDD
 into a pair RDD with 1000 unique keys, and do a reduceByKey
 action, using 200 as the default parallelism. Here is what I assume:


- We have 100 partitions, as the data comes from 100 blocks. Most
likely Spark will generate 100 tasks to 

[SparkSQL] Try2: Parquet predicate pushdown troubles

2015-01-13 Thread Yana Kadiyska
Attempting to bump this up in case someone can help out after all. I spent
a few good hours stepping through the code today, so I'll summarize my
observations both in hope I get some help and to help others that might be
looking into this:

1. I am setting *spark.sql.parquet.**filterPushdown=true*
2. I can see by stepping through the driver debugger that
PaquetTableOperations.execute sets the filters via
ParquetInputFormat.setFilterPredicate (I checked the conf object, things
appear OK there)
3. In FilteringParquetRowInputFormat, I get through the codepath for
getTaskSideSplits. It seems that the codepath for getClientSideSplits would
try to drop row groups, but I don't see anything similar in getTaskSideSplits.

Does anyone have pointers on where to look after this? Where does row-group
filtering happen in the case of getTaskSideSplits? I can attach to the
executor but am not quite sure what code related to Parquet gets called
executor-side... I also don't see any messages in the executor logs related to
filtering predicates.

For comparison, I went through the getClientSideSplits and can see that
predicate pushdown works OK:


sc.hadoopConfiguration.set("parquet.task.side.metadata", "false")

15/01/13 20:04:49 INFO FilteringParquetRowInputFormat: Using Client
Side Metadata Split Strategy
15/01/13 20:05:13 INFO FilterCompat: Filtering using predicate:
eq(epoch, 1417384800)
15/01/13 20:06:45 INFO FilteringParquetRowInputFormat: Dropping 572
row groups that do not pass filter predicate (28 %) !

​

Is it possible that this is just a UI bug? I can see Input=4G when using
(parquet.task.side.metadata,false) and Input=140G when using
(parquet.task.side.metadata,true) but the runtimes are very comparable?

[image: Inline image 1]


JobId 4 is the ClientSide split, JobId 5 is the TaskSide split.
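
For completeness, the only difference between the two runs is this toggle (a
sketch of the two settings):

// Client-side split strategy (JobId 4): row groups are dropped on the driver.
sc.hadoopConfiguration.set("parquet.task.side.metadata", "false")

// Task-side split strategy (JobId 5, the default): metadata is read in the tasks.
sc.hadoopConfiguration.set("parquet.task.side.metadata", "true")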



On Fri, Jan 9, 2015 at 2:56 PM, Yana Kadiyska yana.kadiy...@gmail.com
wrote:

 I am running the following (connecting to an external Hive Metastore)

  /a/shark/spark/bin/spark-shell --master spark://ip:7077  --conf
 *spark.sql.parquet.filterPushdown=true*

 val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

 and then ran two queries:

 sqlContext.sql("select count(*) from table where partition='blah'")
 and
 sqlContext.sql("select count(*) from table where partition='blah' and epoch=1415561604")

 ​

 According to the Input tab in the UI both scan about 140G of data which is
 the size of my whole partition. So I have two questions --

 1. is there a way to tell from the plan if a predicate pushdown is
 supposed to happen?
 I see this for the second query

 res0: org.apache.spark.sql.SchemaRDD =
 SchemaRDD[0] at RDD at SchemaRDD.scala:108
 == Query Plan ==
 == Physical Plan ==
 Aggregate false, [], [Coalesce(SUM(PartialCount#49L),0) AS _c0#0L]
  Exchange SinglePartition
   Aggregate true, [], [COUNT(1) AS PartialCount#49L]
OutputFaker []
 Project []
  ParquetTableScan [epoch#139L], (ParquetRelation list of hdfs files

 ​
 2. am I doing something obviously wrong that this is not working? (I'm
 guessing it's not working because the input size for the second query shows
 unchanged and the execution time is almost 2x as long)

 thanks in advance for any insights




[SQL] Simple DataFrame questions

2015-04-02 Thread Yana Kadiyska
Hi folks, having some seemingly noob issues with the dataframe API.

I have a DF which came from the csv package.

1. What would be an easy way to cast a column to a given type -- my DF
columns are all typed as strings coming from a csv. I see a schema getter
but not setter on DF

2. I am trying to use the syntax used in various blog posts but can't
figure out how to reference a column by name:

scala> df.filter("customer_id" != "")
<console>:23: error: overloaded method value filter with alternatives:
  (conditionExpr: String)org.apache.spark.sql.DataFrame <and>
  (condition: org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame
 cannot be applied to (Boolean)
  df.filter("customer_id" != "")

​
3. What would be the recommended way to drop a row containing a null value
-- is it possible to do this:
scala> df.filter("customer_id IS NOT NULL")
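
For reference, this is roughly what I am trying to express -- a sketch assuming
the Spark 1.3 DataFrame API; only customer_id comes from my actual data:

val typed    = df.select(df("customer_id").cast("int").as("customer_id"))  // 1. cast a column to a type
val nonEmpty = df.filter(df("customer_id") !== "")                         // 2. refer to a column by name
val noNulls  = df.filter(df("customer_id").isNotNull)                      // 3. keep rows where the column is not null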


Escaping user input for Hive queries

2015-05-05 Thread Yana Kadiyska
Hi folks, we have been using the a JDBC connection to Spark's Thrift Server
so far and using JDBC prepared statements to escape potentially malicious
user input.

I am trying to port our code directly to HiveContext now (i.e. eliminate
the use of Thrift Server) and I am not quite sure how to generate a
properly escaped sql statement...

Wondering if someone has ideas on proper way to do this?

To be concrete, I would love to issue this statement

 val df = myHiveCtxt.sql(sqlText)

​
but I would like to defend against potential SQL injection.
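
One direction I am considering (a sketch only -- the table and column names are
made up): keep the user input out of the SQL text entirely and pass it through
the Column API, where it is treated as a literal value rather than parsed as SQL:

import org.apache.spark.sql.functions.lit

val userInput = "O'Brien"                      // potentially malicious input
val customers = myHiveCtxt.table("customers")  // illustrative table name
val safe = customers.filter(customers("last_name") === lit(userInput))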


Re: Spark 1.3.1 and Parquet Partitions

2015-05-07 Thread Yana Kadiyska
Here is the JIRA:  https://issues.apache.org/jira/browse/SPARK-3928
Looks like for now you'd have to list the full paths...I don't see a
comment from an official spark committer so still not sure if this is a bug
or design, but it seems to be the current state of affairs.
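
In the meantime, the workaround of listing full paths looks roughly like this
(a sketch using the example layout from this thread; host and port are
placeholders -- parquetFile accepts multiple paths):

val df = sqlContext.parquetFile(
  "hdfs://namenode:8029/dataset/city=London/data.parquet",
  "hdfs://namenode:8029/dataset/city=NewYork/data.parquet",
  "hdfs://namenode:8029/dataset/city=Paris/data.parquet")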

On Thu, May 7, 2015 at 8:43 AM, yana yana.kadiy...@gmail.com wrote:

 I believe this is a regression. Does not work for me either. There is a
 Jira on parquet wildcards which is resolved, I'll see about getting it
 reopened


 Sent on the new Sprint Network from my Samsung Galaxy S®4.


  Original message 
 From: Vaxuki
 Date:05/07/2015 7:38 AM (GMT-05:00)
 To: Olivier Girardot
 Cc: user@spark.apache.org
 Subject: Re: Spark 1.3.1 and Parquet Partitions

 Olivier
 Nope. Wildcard extensions don't work. I am debugging the code to figure out
 what's wrong. I know I am using 1.3.1 for sure.

 Pardon typos...

 On May 7, 2015, at 7:06 AM, Olivier Girardot ssab...@gmail.com wrote:

 hdfs://some ip:8029/dataset/*/*.parquet doesn't work for you ?

 On Thu, May 7, 2015 at 03:32, vasuki vax...@gmail.com wrote:

 Spark 1.3.1 -
 i have a parquet file on hdfs partitioned by some string looking like this
 /dataset/city=London/data.parquet
 /dataset/city=NewYork/data.parquet
 /dataset/city=Paris/data.paruqet
 ….

 I am trying to load it using sqlContext, via sqlcontext.parquetFile(
 "hdfs://some ip:8029/dataset/<what do i put here>")

 No leads so far. Is there a way I can load the partitions? I am running on a
 cluster and not locally.
 -V



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-1-and-Parquet-Partitions-tp22792.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





  1   2   >