Re: Cassandra examples don't work for me

2014-06-05 Thread Nick Pentreath
You need Cassandra 1.2.6 for the Spark examples —
Sent from Mailbox

On Thu, Jun 5, 2014 at 12:02 AM, Tim Kellogg t...@2lemetry.com wrote:

 Hi,
 I’m following the directions to run the cassandra example 
 “org.apache.spark.examples.CassandraTest” and I get this error
 Exception in thread main java.lang.IncompatibleClassChangeError: Found 
 interface org.apache.hadoop.mapreduce.JobContext, but class was expected
 at 
 org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:113)
 at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:90)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at 
 org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
 at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:59)
 at 
 org.apache.spark.rdd.PairRDDFunctions.reduceByKey(PairRDDFunctions.scala:370)
 at org.apache.spark.examples.CassandraTest$.main(CassandraTest.scala:100)
 at org.apache.spark.examples.CassandraTest.main(CassandraTest.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 I’m running Cassandra version 2.0.6, and this comes from the 
 spark-1.0.0-bin-hadoop2 distribution package. I am running the example with 
 this commandline:
 bin/run-example org.apache.spark.examples.CassandraTest localhost localhost 
 9160
 I suspect it’s because I’m running the wrong version of Cassandra, but I 
 can’t find the correct version listed anywhere. I hope this is an easy issue 
 to address.
 Much thanks, Tim

Re: Logistic Regression MLLib Slow

2014-06-05 Thread DB Tsai
Hi Krishna,

Also, the default optimizer with SGD converges really slowly. If you are
willing to write Scala code, there is a full working example for
training logistic regression with L-BFGS (a quasi-Newton method) in
Scala. It converges way faster than SGD.

See
http://spark.apache.org/docs/latest/mllib-optimization.html
for detail.
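
For reference, here is a condensed sketch of the L-BFGS training example from
that page (assuming Spark 1.0.0; data is an RDD[LabeledPoint] with labels in
{0, 1}):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}
    import org.apache.spark.mllib.util.MLUtils

    // Append a bias (intercept) term to each feature vector, as the guide does.
    val training = data.map(p => (p.label, MLUtils.appendBias(p.features))).cache()
    val numFeatures = data.first().features.size
    val initialWeights = Vectors.dense(new Array[Double](numFeatures + 1))

    val (weightsWithIntercept, lossHistory) = LBFGS.runLBFGS(
      training,
      new LogisticGradient(),
      new SquaredL2Updater(),
      10,      // numCorrections
      1e-4,    // convergenceTol
      20,      // maxNumIterations
      0.1,     // regParam
      initialWeights)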

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Wed, Jun 4, 2014 at 7:56 PM, Xiangrui Meng men...@gmail.com wrote:
 Hi Krishna,

 Specifying executor memory in local mode has no effect, because all of
 the threads run inside the same JVM. You can either try
 --driver-memory 60g or start a standalone server.

 Best,
 Xiangrui

 On Wed, Jun 4, 2014 at 7:28 PM, Xiangrui Meng men...@gmail.com wrote:
 80M by 4 should be about 2.5GB uncompressed. 10 iterations shouldn't
 take that long, even on a single executor. Besides what Matei
 suggested, could you also verify the executor memory in
 http://localhost:4040 in the Executors tab. It is very likely the
 executors do not have enough memory. In that case, caching may be
 slower than reading directly from disk. -Xiangrui

 On Wed, Jun 4, 2014 at 7:06 PM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 Ah, is the file gzipped by any chance? We can’t decompress gzipped files in
 parallel so they get processed by a single task.

 It may also be worth looking at the application UI (http://localhost:4040)
 to see 1) whether all the data fits in memory in the Storage tab (maybe it
 somehow becomes larger, though it seems unlikely that it would exceed 20 GB)
 and 2) how many parallel tasks run in each iteration.

 Matei

 On Jun 4, 2014, at 6:56 PM, Srikrishna S srikrishna...@gmail.com wrote:

 I am using the MLlib one (LogisticRegressionWithSGD) with PySpark. I am
 running only 10 iterations.

 The MLLib version of logistic regression doesn't seem to use all the cores
 on my machine.

 Regards,
 Krishna



 On Wed, Jun 4, 2014 at 6:47 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 Are you using the logistic_regression.py in examples/src/main/python or
 examples/src/main/python/mllib? The first one is an example of writing
 logistic regression by hand and won’t be as efficient as the MLlib one. I
 suggest trying the MLlib one.

 You may also want to check how many iterations it runs — by default I
 think it runs 100, which may be more than you need.

 Matei

 On Jun 4, 2014, at 5:47 PM, Srikrishna S srikrishna...@gmail.com wrote:

  Hi All.,
 
  I am new to Spark and I am trying to run LogisticRegression (with SGD)
  using MLLib on a beefy single machine with about 128GB RAM. The dataset 
  has
  about 80M rows with only 4 features so it barely occupies 2Gb on disk.
 
  I am running the code using all 8 cores with 20G memory using
  spark-submit --executor-memory 20G --master local[8]
  logistic_regression.py
 
  It seems to take about 3.5 hours without caching and over 5 hours with
  caching.
 
  What is the recommended use for Spark on a beefy single machine?
 
  Any suggestions will help!
 
  Regards,
  Krishna
 
 
   Code sample:
  
   -
   import sys
   import time
  
   from pyspark.mllib.classification import LogisticRegressionWithSGD
   from pyspark.mllib.regression import LabeledPoint
  
   # Dataset
   d = sys.argv[1]
   data = sc.textFile(d)
  
   # Load and parse the data
   # --
   def parsePoint(line):
       values = [float(x) for x in line.split(',')]
       return LabeledPoint(values[0], values[1:])
  
   _parsedData = data.map(parsePoint)
   parsedData = _parsedData.cache()
   results = {}
  
   # Spark
   # --
   start_time = time.time()
  
   # Build the gl_model
   niters = 10
   spark_model = LogisticRegressionWithSGD.train(parsedData, iterations=niters)
  
   # Evaluate the gl_model on training data
   labelsAndPreds = parsedData.map(lambda p: (p.label, spark_model.predict(p.features)))
   trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count())
 





Re: Logistic Regression MLLib Slow

2014-06-05 Thread DB Tsai
Hi Krishna,

It should work, and we use it in production with great success.
However, the constructor of LogisticRegressionModel is private[mllib],
so you have to put your code in a file whose package is under
org.apache.spark.mllib and compile it, instead of typing it into the Scala console.
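
A minimal sketch of that workaround (assuming Spark 1.0.0, compiled as part of
a project rather than typed into the console):

    package org.apache.spark.mllib.classification

    import org.apache.spark.mllib.linalg.{Vector, Vectors}

    // The private[mllib] constructor is visible here because this file lives
    // under the org.apache.spark.mllib package. weightsWithIntercept is the
    // vector returned by LBFGS.runLBFGS, with the intercept as its last element.
    object LBFGSModelBuilder {
      def fromWeights(weightsWithIntercept: Vector): LogisticRegressionModel = {
        val weights = Vectors.dense(
          weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1))
        val intercept = weightsWithIntercept(weightsWithIntercept.size - 1)
        new LogisticRegressionModel(weights, intercept)
      }
    }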

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Wed, Jun 4, 2014 at 11:47 PM, Srikrishna S srikrishna...@gmail.com wrote:
 Does L-BFGS work with Spark 1.0? (See code sample below.)

 Eventually, I would like to have L-BFGS working but I was facing an issue
 where 10 passes over the data was taking forever. I ran spark in standalone
 mode and the performance is much better!

 Regards,
 Krishna

 

 I am using http://spark.apache.org/docs/latest/mllib-optimization.html

 scala> val model = new LogisticRegressionModel(
      |   Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1)),
      |   weightsWithIntercept(weightsWithIntercept.size - 1))

 <console>:20: error: constructor LogisticRegressionModel in class
 LogisticRegressionModel cannot be accessed in class $iwC
        val model = new LogisticRegressionModel(

 Based on the documentation, it would seem like LogisticRegressionModel
 doesn't have a constructor:
 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.classification.LogisticRegressionModel

 LogisticRegression *does* have a constructor:
 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.classification.LogisticRegressionWithSGD



 On Wed, Jun 4, 2014 at 11:33 PM, DB Tsai dbt...@stanford.edu wrote:

 Hi Krishna,

 Also, the default optimizer with SGD converges really slow. If you are
 willing to write scala code, there is a full working example for
 training Logistic Regression with L-BFGS (a quasi-Newton method) in
 scala. It converges a way faster than SGD.

 See
 http://spark.apache.org/docs/latest/mllib-optimization.html
 for detail.

 Sincerely,

 DB Tsai
 ---
 My Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai


 On Wed, Jun 4, 2014 at 7:56 PM, Xiangrui Meng men...@gmail.com wrote:
  Hi Krishna,
 
  Specifying executor memory in local mode has no effect, because all of
  the threads run inside the same JVM. You can either try
  --driver-memory 60g or start a standalone server.
 
  Best,
  Xiangrui
 
  On Wed, Jun 4, 2014 at 7:28 PM, Xiangrui Meng men...@gmail.com wrote:
  80M by 4 should be about 2.5GB uncompressed. 10 iterations shouldn't
  take that long, even on a single executor. Besides what Matei
  suggested, could you also verify the executor memory in
  http://localhost:4040 in the Executors tab. It is very likely the
  executors do not have enough memory. In that case, caching may be
  slower than reading directly from disk. -Xiangrui
 
  On Wed, Jun 4, 2014 at 7:06 PM, Matei Zaharia matei.zaha...@gmail.com
  wrote:
  Ah, is the file gzipped by any chance? We can’t decompress gzipped
  files in
  parallel so they get processed by a single task.
 
  It may also be worth looking at the application UI
  (http://localhost:4040)
  to see 1) whether all the data fits in memory in the Storage tab
  (maybe it
  somehow becomes larger, though it seems unlikely that it would exceed
  20 GB)
  and 2) how many parallel tasks run in each iteration.
 
  Matei
 
  On Jun 4, 2014, at 6:56 PM, Srikrishna S srikrishna...@gmail.com
  wrote:
 
  I am using the MLLib one (LogisticRegressionWithSGD)  with PySpark. I
  am
  running to only 10 iterations.
 
  The MLLib version of logistic regression doesn't seem to use all the
  cores
  on my machine.
 
  Regards,
  Krishna
 
 
 
  On Wed, Jun 4, 2014 at 6:47 PM, Matei Zaharia
  matei.zaha...@gmail.com
  wrote:
 
  Are you using the logistic_regression.py in examples/src/main/python
  or
  examples/src/main/python/mllib? The first one is an example of
  writing
  logistic regression by hand and won’t be as efficient as the MLlib
  one. I
  suggest trying the MLlib one.
 
  You may also want to check how many iterations it runs — by default I
  think it runs 100, which may be more than you need.
 
  Matei
 
  On Jun 4, 2014, at 5:47 PM, Srikrishna S srikrishna...@gmail.com
  wrote:
 
   Hi All.,
  
   I am new to Spark and I am trying to run LogisticRegression (with
   SGD)
   using MLLib on a beefy single machine with about 128GB RAM. The
   dataset has
   about 80M rows with only 4 features so it barely occupies 2Gb on
   disk.
  
   I am running the 

Re: Join : Giving incorrect result

2014-06-05 Thread Ajay Srivastava
Sorry for replying late. It was night here.

Lian/Matei,
Here is the code snippet -
    sparkConf.set("spark.executor.memory", "10g")
    sparkConf.set("spark.cores.max", "5")

    val sc = new SparkContext(sparkConf)

    val accId2LocRDD =
      sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2location")
        .map(getKeyValueFromString(_, 0, ',', true))

    val accId2DemoRDD =
      sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2demographic_planType")
        .map(getKeyValueFromString(_, 0, ',', true))

    val joinedRDD = accId2LocRDD.join(accId2DemoRDD)

  def getKeyValueFromString(line: String, keyIndex: Int, delimit: Char,
      retFullLine: Boolean): Tuple2[String, String] = {
    val splits = line.split(delimit)
    if (splits.length <= 1) {
      (null, null)
    } else if (retFullLine) {
      (splits(keyIndex), line)
    } else {
      (splits(keyIndex), splits(splits.length - keyIndex - 1))
    }
  }

    

Both of these files have 10 M records with the same unique keys. The size of each file
is nearly 280 MB and the block size in HDFS is 256 MB. The output of the join should
contain 10 M records.


We have done some more experiments -
1) Running cogroup instead of join - it also gives incorrect count.
2) Running union followed by groupbykey and then filtering records with two 
entries in sequence - It also gives incorrect count.
3) Increase spark.executor.memory to 50 g and everything works fine. Count 
comes 10 M for join,cogroup and union/groupbykey/filter transformations.


I thought that 10g is enough memory for executors but even if the memory is 
less it should not result in incorrect computation. Probably there is a problem 
in reconstructing RDDs when memory is not enough. 


Thanks Chen for your observation. I get this problem on single worker so there 
will not be any mismatch of jars. On two workers, since executor memory gets 
doubled the code works fine.


Regards,
Ajay



On Thursday, June 5, 2014 1:35 AM, Matei Zaharia matei.zaha...@gmail.com 
wrote:
 


If this isn’t the problem, it would be great if you can post the code for the 
program.

Matei



On Jun 4, 2014, at 12:58 PM, Xu (Simon) Chen xche...@gmail.com wrote:

Maybe your two workers have different assembly jar files?
I just ran into a similar problem that my spark-shell is using a different jar 
file than my workers - got really confusing results.
On Jun 4, 2014 8:33 AM, Ajay Srivastava a_k_srivast...@yahoo.com wrote:

Hi,


I am doing a join of two RDDs which gives different results (counting the number
of records) each time I run this code on the same input.


The input files are large enough to be divided into two splits. When the
program runs on two workers with a single core assigned to each, the output is
consistent and looks correct. But when a single worker is used with two or more
cores, the result seems to be random. Every time, the count of joined
records is different.


Does this sound like a defect, or do I need to take care of something while using
join? I am using spark-0.9.1.



Regards
Ajay

Re: Can this be done in map-reduce technique (in parallel)

2014-06-05 Thread lmk
Hi Cheng,
Sorry Again.

In this method, I see that the values for
  a <- positions.iterator
  b <- positions.iterator

always remain the same. I tried to do a & b <- positions.iterator.next, but it
throws an error: value filter is not a member of (Double, Double)

Is there something I am missing out here?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-this-be-handled-in-map-reduce-using-RDDs-tp6905p7033.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Re: mismatched hdfs protocol

2014-06-05 Thread bluejoe2008
OK, I see.
I imported the wrong jar files, which only work with the default Hadoop version.

2014-06-05 


bluejoe2008

From: prabeesh k
Date: 2014-06-05 16:14
To: user
Subject: Re: Re: mismatched hdfs protocol
If you do not set the Hadoop version, Spark is built against the default
Hadoop version (1.0.4).



Before importing the Spark 1.0.0 libraries,
build Spark using the command SPARK_HADOOP_VERSION=2.4.0 sbt/sbt assembly.





On Thu, Jun 5, 2014 at 12:28 PM, bluejoe2008 bluejoe2...@gmail.com wrote:

thank you!

I am developing a Java project in the Eclipse IDE on Windows
in which the Spark 1.0.0 libraries are imported,
and now I want to open HDFS files as input.
The Hadoop version of HDFS is 2.4.0.

2014-06-05 


bluejoe2008

From: prabeesh k
Date: 2014-06-05 13:23
To: user
Subject: Re: mismatched hdfs protocol
For building Spark for particular version of Hadoop 
Refer http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html 



On Thu, Jun 5, 2014 at 8:14 AM, Koert Kuipers ko...@tresata.com wrote:

you have to build Spark against the version of Hadoop you are using




On Wed, Jun 4, 2014 at 10:25 PM, bluejoe2008 bluejoe2...@gmail.com wrote:

hi, all
when my spark program accessed hdfs files
an error happened:

Exception in thread main org.apache.hadoop.ipc.RemoteException: Server IPC 
version 9 cannot communicate with client version 4 

it seems the client was trying to connect to hadoop2 via an old Hadoop protocol

so my question is:
how to specify the version of hadoop on connection?

thank you!

bluejoe

2014-06-05 

Re: Can this be done in map-reduce technique (in parallel)

2014-06-05 Thread Christopher Nguyen
Lakshmi, this is orthogonal to your question, but in case it's useful.

It sounds like you're trying to determine the home location of a user, or
something similar.

If that's the problem statement, the data pattern may suggest a far more
computationally efficient approach. For example, first map all (lat,long)
pairs into geocells of a desired resolution (e.g., 10m or 100m), then count
occurrences of geocells instead. There are simple libraries to map any
(lat,long) pairs into a geocell (string) ID very efficiently.
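
To make the idea concrete, here is a rough sketch (not Christopher's exact
libraries; a crude rounding-based cell ID stands in for a real geocell
function, and points is assumed to be an RDD of (ipAddress, (lat, lon)) pairs):

    // Bucket coordinates by rounding to 3 decimal places (roughly 100 m).
    def cellId(lat: Double, lon: Double): String =
      f"${math.round(lat * 1000) / 1000.0}%.3f:${math.round(lon * 1000) / 1000.0}%.3f"

    // In a compiled app, import org.apache.spark.SparkContext._ for reduceByKey.
    val cellCounts = points
      .map { case (ip, (lat, lon)) => ((ip, cellId(lat, lon)), 1) }
      .reduceByKey(_ + _)

    // Most frequent cell per IP, a stand-in for the "home" location.
    val topCellPerIp = cellCounts
      .map { case ((ip, cell), count) => (ip, (cell, count)) }
      .reduceByKey((a, b) => if (a._2 >= b._2) a else b)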

--
Christopher T. Nguyen
Co-founder & CEO, Adatao http://adatao.com
linkedin.com/in/ctnguyen



On Wed, Jun 4, 2014 at 3:49 AM, lmk lakshmi.muralikrish...@gmail.com
wrote:

 Hi,
 I am a new spark user. Pls let me know how to handle the following
 scenario:

 I have a data set with the following fields:
 1. DeviceId
 2. latitude
 3. longitude
 4. ip address
 5. Datetime
 6. Mobile application name

 With the above data, I would like to perform the following steps:
 1. Collect all lat and lon for each ipaddress
 (ip1,(lat1,lon1),(lat2,lon2))
 (ip2,(lat3,lon3),(lat4,lat5))
 2. For each IP,
 1.Find the distance between each lat and lon coordinate pair and
 all
 the other pairs under the same IP
 2.Select those coordinates whose distances fall under a specific
 threshold (say 100m)
 3.Find the coordinate pair with the maximum occurrences

 In this case, how can I iterate and compare each coordinate pair with all
 the other pairs?
 Can this be done in a distributed manner, as this data set is going to have
 a few million records?
 Can we do this in map/reduce commands?

 Thanks.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Can-this-be-done-in-map-reduce-technique-in-parallel-tp6905.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Can't seem to link external/twitter classes from my own app

2014-06-05 Thread Jeremy Lee
I shan't be far. I'm committed now. Spark and I are going to have a very
interesting future together, but hopefully future messages will be about
the algorithms and modules, and less how do I run make?.

I suspect doing this at the exact moment of the 0.9 -> 1.0.0 transition
hasn't helped me. (I literally had the documentation changing on me between
page reloads last thursday, after days of studying the old version. I
thought I was going crazy until the new version number appeared in the
corner and the release email went out.)

The last time I entered into a serious relationship with a piece of
software like this was with a little company called Cognos. :-) And then
Microsoft asked us for some advice about a thing called OLAP Server they
were making. (But I don't think they listened as hard as they should have.)

Oh, the things I'm going to do with Spark! If it hadn't existed, I would
have had to make it.

(My honors thesis was in distributed computing. I once created an
incrementally compiled language that could pause execution, decompile, move
to another machine, recompile, restore state and continue while preserving
all active network connections. discuss.)




On Thu, Jun 5, 2014 at 5:46 PM, Nick Pentreath nick.pentre...@gmail.com
wrote:

 Great - well we do hope we hear from you, since the user list is for
 interesting success stories and anecdotes, as well as blog posts etc too :)


 On Thu, Jun 5, 2014 at 9:40 AM, Jeremy Lee unorthodox.engine...@gmail.com
  wrote:

 Oh. Yes of course. *facepalm*

 I'm sure I typed that at first, but at some point my fingers decided to
 grammar-check me. Stupid fingers. I wonder what sbt assemble does? (apart
 from error) It certainly takes a while to do it.

 Thanks for the maven offer, but I'm not scheduled to learn that until
 after Scala, streaming, graphx, mllib, HDFS, sbt, Python, and yarn. I'll
 probably need to know it for yarn, but I'm really hoping to put it off
 until then. (fortunately I already knew about linux, AWS, eclipse, git,
 java, distributed programming and ssh keyfiles, or I would have been in
 real trouble)

 Ha! OK, that worked for the Kafka project... fails on the other old 0.9
 Twitter project, but who cares... now for mine

 HAHA! YES!! Oh thank you! I have the equivalent of hello world that
 uses one external library! Now the compiler and I can have a _proper_
 conversation.

 Hopefully you won't be hearing from me for a while.



 On Thu, Jun 5, 2014 at 3:06 PM, Nick Pentreath nick.pentre...@gmail.com
 wrote:

 The magic incantation is sbt assembly (not assemble).

 Actually I find maven with their assembly plugins to be very easy (mvn
 package). I can send a Pom.xml for a skeleton project if you need
 —
 Sent from Mailbox https://www.dropbox.com/mailbox


 On Thu, Jun 5, 2014 at 6:59 AM, Jeremy Lee 
 unorthodox.engine...@gmail.com wrote:

 Hmm.. That's not working so well for me. First, I needed to add a
 project/plugin.sbt file with the contents:

 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.4")

 Before 'sbt/sbt assemble' worked at all. And I'm not sure about that
 version number, but 0.9.1 isn't working much better and 11.4 is the
 latest one recommended by the sbt project site. Where did you get your
 version from?

 Second, even when I do get it to build a .jar, spark-submit is still
 telling me the external.twitter library is missing.

 I tried using your github project as-is, but it also complained about
 the missing plugin.. I'm trying it with various versions now to see if I
 can get that working, even though I don't know anything about kafka. Hmm,
 and no. Here's what I get:

  [info] Set current project to Simple Project (in build
 file:/home/ubuntu/spark-1.0.0/SparkKafka/)
 [error] Not a valid command: assemble
 [error] Not a valid project ID: assemble
 [error] Expected ':' (if selecting a configuration)
 [error] Not a valid key: assemble (similar: assembly, assemblyJarName,
 assemblyDirectory)
 [error] assemble
 [error]

 I also found this project which seemed to be exactly what I was after:
  https://github.com/prabeesh/SparkTwitterAnalysis

 ...but it was for Spark 0.9, and though I updated all the version
 references to 1.0.0, that one doesn't work either. I can't even get it to
 build.

 *sigh*

 Is it going to be easier to just copy the external/ source code into my
 own project? Because I will... especially if creating Uberjars takes this
 long every... single... time...
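
  For reference, instead of copying the external/ source, the usual route is to
  declare the Twitter module as a dependency and let sbt-assembly bundle it. A
  rough, untested build.sbt sketch (assuming Spark 1.0.0 and sbt-assembly 0.11.4):

      import AssemblyKeys._

      assemblySettings

      name := "TwitterApp"

      version := "1.0"

      scalaVersion := "2.10.4"

      libraryDependencies ++= Seq(
        "org.apache.spark" %% "spark-core"              % "1.0.0" % "provided",
        "org.apache.spark" %% "spark-streaming"         % "1.0.0" % "provided",
        "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0")

  The provided scope keeps the Spark core jars out of the uberjar (spark-submit
  supplies them at runtime), while spark-streaming-twitter gets bundled.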



 On Thu, Jun 5, 2014 at 8:52 AM, Jeremy Lee 
 unorthodox.engine...@gmail.com wrote:

 Thanks Patrick!

 Uberjars. Cool. I'd actually heard of them. And thanks for the link to
 the example! I shall work through that today.

 I'm still learning sbt and it's many options... the last new framework
 I learned was node.js, and I think I've been rather spoiled by npm.

 At least it's not maven. Please, oh please don't make me learn maven
 too. (The only people who seem to like it have Software Stockholm 
 Syndrome:
 I know maven kidnapped me and 

Re: Unable to run a Standalone job

2014-06-05 Thread prabeesh k
Try the sbt clean command before building the app.

Or delete the .ivy2 and .sbt folders (not a good method). Then try to rebuild
the project.


On Thu, Jun 5, 2014 at 11:45 AM, Sean Owen so...@cloudera.com wrote:

 I think this is SPARK-1949 again: https://github.com/apache/spark/pull/906
 I think this change fixed this issue for a few people using the SBT
 build, worth committing?

 On Thu, Jun 5, 2014 at 6:40 AM, Shrikar archak shrika...@gmail.com
 wrote:
  Hi All,
  Now that the Spark Version 1.0.0 is release there should not be any
 problem
  with the local jars.
  Shrikars-MacBook-Pro:SimpleJob shrikar$ cat simple.sbt
  name := "Simple Project"
 
  version := "1.0"
 
  scalaVersion := "2.10.4"
 
  libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "1.0.0",
                              "org.apache.spark" %% "spark-streaming" % "1.0.0")
 
  resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
 
  I am still having this issue
  [error] (run-main) java.lang.NoClassDefFoundError:
  javax/servlet/http/HttpServletResponse
  java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse
  at org.apache.spark.HttpServer.start(HttpServer.scala:54)
  at
 
 org.apache.spark.broadcast.HttpBroadcast$.createServer(HttpBroadcast.scala:156)
  at
 
 org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:127)
  at
 
 org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31)
  at
 
 org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48)
  at
 
 org.apache.spark.broadcast.BroadcastManager.<init>(BroadcastManager.scala:35)
  at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:202)
 
  Any help would be greatly appreciated.
 
  Thanks,
  Shrikar
 
 
  On Fri, May 23, 2014 at 3:58 PM, Shrikar archak shrika...@gmail.com
 wrote:
 
  Still the same error no change
 
  Thanks,
  Shrikar
 
 
  On Fri, May 23, 2014 at 2:38 PM, Jacek Laskowski ja...@japila.pl
 wrote:
 
  Hi Shrikar,
 
  How did you build Spark 1.0.0-SNAPSHOT on your machine? My
  understanding is that `sbt publishLocal` is not enough and you really
  need `sbt assembly` instead. Give it a try and report back.
 
  As to your build.sbt, upgrade Scala to 2.10.4 and use "org.apache.spark"
  %% "spark-streaming" % "1.0.0-SNAPSHOT" only; that will pull down
  spark-core as a transitive dep. The resolver for the Akka Repository is
  not needed. Your build.sbt should really look as follows:
 
  name := "Simple Project"

  version := "1.0"

  scalaVersion := "2.10.4"

  libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0-SNAPSHOT"
 
  Jacek
 
  On Thu, May 22, 2014 at 11:27 PM, Shrikar archak shrika...@gmail.com
  wrote:
   Hi All,
  
   I am trying to run the network count example as a seperate standalone
   job
   and running into some issues.
  
   Environment:
   1) Mac Mavericks
   2) Latest spark repo from Github.
  
  
   I have a structure like this
  
   Shrikars-MacBook-Pro:SimpleJob shrikar$ find .
   .
   ./simple.sbt
   ./src
   ./src/main
   ./src/main/scala
   ./src/main/scala/NetworkWordCount.scala
   ./src/main/scala/SimpleApp.scala.bk
  
  
   simple.sbt
   name := "Simple Project"
  
   version := "1.0"
  
   scalaVersion := "2.10.3"
  
   libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "1.0.0-SNAPSHOT",
                               "org.apache.spark" %% "spark-streaming" % "1.0.0-SNAPSHOT")
  
   resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
  
  
   I am able to run the SimpleApp which is mentioned in the doc but
 when I
   try
   to run the NetworkWordCount app I get error like this am I missing
   something?
  
   [info] Running com.shrikar.sparkapps.NetworkWordCount
   14/05/22 14:26:47 INFO spark.SecurityManager: Changing view acls to:
   shrikar
   14/05/22 14:26:47 INFO spark.SecurityManager: SecurityManager:
   authentication disabled; ui acls disabled; users with view
 permissions:
   Set(shrikar)
   14/05/22 14:26:48 INFO slf4j.Slf4jLogger: Slf4jLogger started
   14/05/22 14:26:48 INFO Remoting: Starting remoting
   14/05/22 14:26:48 INFO Remoting: Remoting started; listening on
   addresses
   :[akka.tcp://spark@192.168.10.88:49963]
   14/05/22 14:26:48 INFO Remoting: Remoting now listens on addresses:
   [akka.tcp://spark@192.168.10.88:49963]
   14/05/22 14:26:48 INFO spark.SparkEnv: Registering MapOutputTracker
   14/05/22 14:26:48 INFO spark.SparkEnv: Registering BlockManagerMaster
   14/05/22 14:26:48 INFO storage.DiskBlockManager: Created local
   directory at
  
  
 /var/folders/r2/mbj08pb55n5d_9p8588xk5b0gn/T/spark-local-20140522142648-0a14
   14/05/22 14:26:48 INFO storage.MemoryStore: MemoryStore started with
   capacity 911.6 MB.
   14/05/22 14:26:48 INFO network.ConnectionManager: Bound socket to
 port
   49964
   with id = ConnectionManagerId(192.168.10.88,49964)
   14/05/22 14:26:48 INFO storage.BlockManagerMaster: Trying to register
   BlockManager
   

Re: Error related to serialisation in spark streaming

2014-06-05 Thread nilmish
Thanx a lot for your reply. I can see kryo serialiser in the UI. 

I have one other query:

I wanted to know the meaning of the following log message when running a
spark streaming job : 

[spark-akka.actor.default-dispatcher-18] INFO 
org.apache.spark.streaming.scheduler.JobScheduler - Total delay: 5.432 s for
time 1401870454500 ms (execution: 0.593 s) 

According to my understanding, total delay here means the total end-to-end delay,
which here is 5.432 sec.

What is the meaning of execution: 0.593?

Is it the time taken for executing this particular query?

PS: I am running a streaming job over a window of 5 mins and querying every
1.5 sec.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-related-to-serialisation-in-spark-streaming-tp6801p7039.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Native library can not be loaded when using Mllib PCA

2014-06-05 Thread yangliuyu
Hi,

We're using Mllib (1.0.0 release version) on a k-means clustering problem.
We want to reduce the matrix column size before send the points to k-means
solver.

It works on my mac with the local mode: spark-test-run-assembly-1.0.jar
contains my application code, com.github.fommil, netlib code and
netlib-native*.so files (include jnilib and dll files) 

spark-submit --class test.TestMllibPCA --master local[4] --executor-memory
3g --driver-memory 3g --driver-class-path
/data/user/dump/spark-test-run-assembly-1.0.jar
/data/user/dump/spark-test-run-assembly-1.0.jar
/data/user/dump/user_fav_2014_04_09.csv.head1w 

But if --driver-class-path is removed, the warning message appears:
14/06/05 16:36:20 WARN LAPACK: Failed to load implementation from:
com.github.fommil.netlib.NativeSystemLAPACK
14/06/05 16:36:20 WARN LAPACK: Failed to load implementation from:
com.github.fommil.netlib.NativeRefLAPACK

Setting SPARK_CLASSPATH=/data/user/dump/spark-test-run-assembly-1.0.jar can
also solve the problem.

The matrix contains sparse data with 6778 rows and 2487 columns, and the time
consumed calculating PCA is 10s and 47s respectively, which implies the
native library works well.
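
For reference, a minimal sketch of the MLlib 1.0.0 PCA call being timed above,
assuming rows is an RDD[Vector] with 2487 columns and 50 as an arbitrary
reduced dimension:

    import org.apache.spark.mllib.linalg.distributed.RowMatrix

    val mat = new RowMatrix(rows)
    val pc = mat.computePrincipalComponents(50)  // 2487 x 50 principal components
    val projected = mat.multiply(pc)             // rows projected into 50 dimensions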

Then I want to test it on a spark standalone cluster(on CentOS), but it
failed again.
After change JDK logging level to FINEST, got the message:
14/06/05 16:19:15 INFO JniLoader: JNI LIB =
netlib-native_system-linux-x86_64.so
14/06/05 16:19:15 INFO JniLoader: extracting
jar:file:/data/user/dump/spark-test-run-assembly-1.0.jar!/netlib-native_system-linux-x86_64.so
to /tmp/jniloader6648403281987654682netlib-native_system-linux-x86_64.so
14/06/05 16:19:15 WARN LAPACK: Failed to load implementation from:
com.github.fommil.netlib.NativeSystemLAPACK
14/06/05 16:19:15 INFO JniLoader: JNI LIB =
netlib-native_ref-linux-x86_64.so
14/06/05 16:19:15 INFO JniLoader: extracting
jar:file:/data/user/dump/spark-test-run-assembly-1.0.jar!/netlib-native_ref-linux-x86_64.so
to /tmp/jniloader2298588627398263902netlib-native_ref-linux-x86_64.so
14/06/05 16:19:16 WARN LAPACK: Failed to load implementation from:
com.github.fommil.netlib.NativeRefLAPACK
14/06/05 16:19:16 INFO LAPACK: Implementation provided by class
com.github.fommil.netlib.F2jLAPACK

The libgfortran ,atlas, blas, lapack and arpack are all installed and all of
the .so files are located under /usr/lib64, spark.executor.extraLibraryPath
is set to /usr/lib64 in conf/spark-defaults.conf but none of them works. I
tried adding --jars /data/user/dump/spark-test-run-assembly-1.0.jar but no good
news.

What should I try next?

Does the native library need to be visible to both the driver and the executors? In
local mode the problem seems to be a classpath problem, but for standalone
and yarn mode it gets more complex. A detailed document would be really helpful.

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Native-library-can-not-be-loaded-when-using-Mllib-PCA-tp7042.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


How to shut down Spark Streaming with Kafka properly?

2014-06-05 Thread Tobias Pfeiffer
Hi,

I am trying to use Spark Streaming with Kafka, which works like a
charm -- except for shutdown. When I run my program with sbt
run-main, sbt will never exit, because there are two non-daemon
threads left that don't die.

I created a minimal example at
https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-kafkadoesntshutdown-scala.
It starts a StreamingContext and does nothing more than connecting to
a Kafka server and printing what it receives. Using the `future { ...
}` construct, I shut down the StreamingContext after some seconds and
then print the difference between the threads at start time and at end
time. The output can be found at
https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output1.
There are a number of threads remaining that will prevent sbt from
exiting.

When I replace `KafkaUtils.createStream(...)` with a call that does
exactly the same, except that it calls `consumerConnector.shutdown()`
in `KafkaReceiver.onStop()` (which it should, IMO), the output is as
shown at https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output2.

Does anyone have *any* idea what is going on here and why the program
doesn't shut down properly? The behavior is the same with both kafka
0.8.0 and 0.8.1.1, by the way.

Thanks
Tobias


Spark Kafka streaming - ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver

2014-06-05 Thread Gaurav Dasgupta
Hi,

I have written my own custom Spark Streaming code which connects to a Kafka
server and fetches data. I have tested the code in local mode and it is
working fine. But when I am executing the same code on YARN mode, I am
getting KafkaReceiver class not found exception. I am providing the Spark
Kafka jar in the classpath and ensured that the path is correct for all the
nodes in my cluster.

I am using Spark 0.9.1 hadoop pre-built and is deployed on all the nodes
(10 node cluster) in the YARN cluster.
I am using the following command to run my code on YARN mode:

SPARK_YARN_MODE=true
SPARK_JAR=assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar
SPARK_YARN_APP_JAR=/usr/local/SparkStreamExample.jar java -cp
/usr/local/SparkStreamExample.jar:assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar:external/kafka/target/spark-streaming-kafka_2.10-0.9.1.jar:/usr/local/kafka/kafka_2.10-0.8.1.1/libs/*:/usr/lib/hbase/lib/*:/etc/hadoop/conf/:/etc/hbase/conf/
SparkStreamExample yarn-client 10.10.5.32 myFirstGroup testTopic
NewTestTable 1

Below is the error message I am getting:

14/06/05 04:29:12 INFO cluster.YarnClientClusterScheduler: Adding task set 2.0 with 1 tasks
14/06/05 04:29:12 INFO scheduler.TaskSetManager: Starting task 2.0:0 as TID 70 on executor 2: manny6.musigma.com (PROCESS_LOCAL)
14/06/05 04:29:12 INFO scheduler.TaskSetManager: Serialized task 2.0:0 as 2971 bytes in 2 ms
14/06/05 04:29:12 WARN scheduler.TaskSetManager: Lost TID 70 (task 2.0:0)
14/06/05 04:29:12 WARN scheduler.TaskSetManager: Loss was due to java.lang.ClassNotFoundException
java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaReceiver
    at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:247)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1574)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1495)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1731)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1666)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1322)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:479)
    at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:72)
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
    at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:145)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1791)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
    at

Re: Can this be done in map-reduce technique (in parallel)

2014-06-05 Thread lmk
Hi Cheng,
Thanks a lot. That solved my problem.

Thanks again for the quick response and solution.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-this-be-handled-in-map-reduce-using-RDDs-tp6905p7047.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark not working with mesos

2014-06-05 Thread praveshjain1991
Hi Ajatix. 

Yes, HADOOP_HOME is set on the nodes and I did update the bash.

As I said, adding MESOS_HADOOP_HOME did not work.

But what is causing the original error: java.lang.Error:
java.io.IOException: failure to login?

--

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-not-working-with-mesos-tp6806p7048.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Serialization problem in Spark

2014-06-05 Thread Vibhor Banga
Hi,

I am trying to do something like following in Spark:

JavaPairRDD<byte[], MyObject> eventRDD = hBaseRDD.map(
        new PairFunction<Tuple2<ImmutableBytesWritable, Result>, byte[], MyObject>() {
            @Override
            public Tuple2<byte[], MyObject> call(
                    Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2)
                    throws Exception {
                return new Tuple2<byte[], MyObject>(
                        immutableBytesWritableResultTuple2._1.get(),
                        MyClass.get(immutableBytesWritableResultTuple2._2));
            }
        });

eventRDD.foreach(new VoidFunction<Tuple2<byte[], Event>>() {
    @Override
    public void call(Tuple2<byte[], Event> eventTuple2) throws Exception {
        processForEvent(eventTuple2._2);
    }
});


The processForEvent() function flow contains some processing and ultimately
writes to an HBase table. But I am getting serialisation issues with Hadoop
and HBase built-in classes. How do I solve this? Does using Kryo
serialisation help in this case?

Thanks,
-Vibhor
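
A sketch of the usual workaround, in Scala for brevity and with placeholder
table and type names: build the non-serializable HBase client inside
foreachPartition, on the executor, instead of capturing it in the driver-side
closure.

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{HTable, Put}

    eventRDD.foreachPartition { events =>
      // One HTable per partition, created on the executor, never serialized.
      val table = new HTable(HBaseConfiguration.create(), "my_table")
      events.foreach { case (rowKey, event) =>
        val put = new Put(rowKey)
        // fill in column family / qualifier / value from the event here
        table.put(put)
      }
      table.close()
    }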


Problem with serialization and deserialization

2014-06-05 Thread ANEESH .V.V
hi,

   I have a JTree. I want to serialize it using
sc.saveAsObjectFile(path). I could save it to some location. The real
problem is that when I deserialize it back using sc.objectFile(), I am not
getting the JTree back. Can anyone please help me with this?

Thanks


Re: Problem with serialization and deserialization

2014-06-05 Thread Stefan van Wouw
Dear Aneesh,

Your particular use case of using Swing GUI components with Spark is a bit 
unclear to me. 

Assuming that you want Spark to operate on a tree object, you could use an 
implementation of the TreeModel ( 
http://docs.oracle.com/javase/8/docs/api/javax/swing/tree/DefaultTreeModel.html 
) used internally by the JTree. This class is serialisable, whereas JTree is 
not. This might be one of the causes of your problems when trying to serialise 
JTree (educated guess).

---
Kind regards,

Stefan van Wouw

On 05 Jun 2014, at 13:47, ANEESH .V.V aneeshnair.ku...@gmail.com wrote:

 
 hi,
 
I have a JTree. I want to serialize it using sc.saveAsObjectFile(path). 
 I could save it in some location. The real problem is that when I deserialize 
 it back using sc.objectFile(), I am not getting the jtree. Can anyone please 
 help me on this..
 
 Thanks



Re: Better line number hints for logging?

2014-06-05 Thread Daniel Darabos
On Wed, Jun 4, 2014 at 10:39 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 That’s a good idea too, maybe we can change CallSiteInfo to do that.


I've filed an issue: https://issues.apache.org/jira/browse/SPARK-2035

Matei

 On Jun 4, 2014, at 8:44 AM, Daniel Darabos 
 daniel.dara...@lynxanalytics.com wrote:

 Oh, this would be super useful for us too!

 Actually wouldn't it be best if you could see the whole call stack on the
 UI, rather than just one line? (Of course you would have to click to expand
 it.)


 On Wed, Jun 4, 2014 at 2:38 AM, John Salvatier jsalvat...@gmail.com
 wrote:

 Ok, I will probably open a Jira.


 On Tue, Jun 3, 2014 at 5:29 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 You can use RDD.setName to give it a name. There’s also a creationSite
 field that is private[spark] — we may want to add a public setter for that
 later. If the name isn’t enough and you’d like this, please open a JIRA
 issue for it.
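
  A tiny illustration of the setName hint (Spark 1.0 API; the RDD and method
  names here are made up):

      val enriched = records.map(enrich)
      enriched.setName("enrichedRecords")  // shows up in the UI and in log lines
      enriched.cache()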

 Matei

 On Jun 3, 2014, at 5:22 PM, John Salvatier jsalvat...@gmail.com wrote:

 I have created some extension methods for RDDs in RichRecordRDD and
 these are working exceptionally well for me.

 However, when looking at the logs, its impossible to tell what's going
 on because all the line number hints point to RichRecordRDD.scala rather
 than the code that uses it. For example:

 INFO scheduler.DAGScheduler: Submitting Stage 122 (MappedRDD[1223] at
 map at RichRecordRDD.scala:633), which is now runnable

 Is there any way set up my extension methods class so that the logs will
 print a more useful line number?








Re: Serialization problem in Spark

2014-06-05 Thread Vibhor Banga
Any inputs on this will be helpful.

Thanks,
-Vibhor


On Thu, Jun 5, 2014 at 3:41 PM, Vibhor Banga vibhorba...@gmail.com wrote:

 Hi,

 I am trying to do something like following in Spark:

  JavaPairRDD<byte[], MyObject> eventRDD = hBaseRDD.map(
          new PairFunction<Tuple2<ImmutableBytesWritable, Result>, byte[], MyObject>() {
              @Override
              public Tuple2<byte[], MyObject> call(
                      Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2)
                      throws Exception {
                  return new Tuple2<byte[], MyObject>(
                          immutableBytesWritableResultTuple2._1.get(),
                          MyClass.get(immutableBytesWritableResultTuple2._2));
              }
          });

  eventRDD.foreach(new VoidFunction<Tuple2<byte[], Event>>() {
      @Override
      public void call(Tuple2<byte[], Event> eventTuple2) throws Exception {
          processForEvent(eventTuple2._2);
      }
  });


  The processForEvent() function flow contains some processing and ultimately
  writes to an HBase table. But I am getting serialisation issues with Hadoop
  and HBase built-in classes. How do I solve this? Does using Kryo
  serialisation help in this case?

 Thanks,
 -Vibhor




-- 
Vibhor Banga
Software Development Engineer
Flipkart Internet Pvt. Ltd., Bangalore


Re: Spark Streaming not processing file with particular number of entries

2014-06-05 Thread praveshjain1991
The same issue persists in spark-1.0.0 as well (was using 0.9.1 earlier). Any
suggestions are welcomed.

--

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-not-processing-file-with-particular-number-of-entries-tp6694p7056.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


spark worker and yarn memory

2014-06-05 Thread Xu (Simon) Chen
I am slightly confused about the --executor-memory setting. My yarn
cluster has a maximum container memory of 8192MB.

When I specify --executor-memory 8G in my spark-shell, no container can
be started at all. It only works when I lower the executor memory to 7G.
But then, on YARN, I see 2 containers per node, using 16G of memory.

Then on the spark UI, it shows that each worker has 4GB of memory, rather
than 7.

Can someone explain the relationship among the numbers I see here?

Thanks.


Loading Python libraries into Spark

2014-06-05 Thread mrm
Hi,

I am new to Spark (and almost-new in python!). How can I download and
install a Python library in my cluster so I can just import it later?

Any help would be much appreciated.

Thanks!





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Loading-Python-libraries-into-Spark-tp7059.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


compress in-memory cache?

2014-06-05 Thread Xu (Simon) Chen
I have a working set larger than available memory, thus I am hoping to turn
on rdd compression so that I can store more in-memory. Strangely it made no
difference. The number of cached partitions, fraction cached, and size in
memory remain the same. Any ideas?

I confirmed that rdd compression wasn't on before and it was on for the
second test.

scala sc.getConf.getAll foreach println
...
(spark.rdd.compress,true)
...

I haven't tried lzo vs snappy, but my guess is that either one should
provide at least some benefit..

Thanks.
-Simon


Re: compress in-memory cache?

2014-06-05 Thread Nick Pentreath
Have you set the persistence level of the RDD to MEMORY_ONLY_SER (
http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence)?
If you're calling cache, the default persistence level is MEMORY_ONLY so
that setting will have no impact.
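
A minimal sketch of that suggestion (assuming a parse function and an input
path of your own):

    import org.apache.spark.storage.StorageLevel

    val data = sc.textFile("hdfs:///path/to/input").map(parse)
    data.persist(StorageLevel.MEMORY_ONLY_SER)  // serialized, so spark.rdd.compress applies
    data.count()                                // materialize the cache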


On Thu, Jun 5, 2014 at 4:41 PM, Xu (Simon) Chen xche...@gmail.com wrote:

 I have a working set larger than available memory, thus I am hoping to
 turn on rdd compression so that I can store more in-memory. Strangely it
 made no difference. The number of cached partitions, fraction cached, and
 size in memory remain the same. Any ideas?

 I confirmed that rdd compression wasn't on before and it was on for the
 second test.

 scala sc.getConf.getAll foreach println
 ...
 (spark.rdd.compress,true)
 ...

 I haven't tried lzo vs snappy, but my guess is that either one should
 provide at least some benefit..

 Thanks.
 -Simon




Scala By the Bay Developer Conference and Training Registration

2014-06-05 Thread Alexy Khrabrov
Scala by the Bay registration and training is now open!

We are assembling a great two-day program for Scala By the Bay

www.scalabythebay.org

-- the yearly SF Scala developer conference.  This year the conference
itself is on August 8-9 in Fort Mason, near the Golden Gate bridge,
with the Scala training on August 6-7 and Apache Spark training August
11-12.  We have key companies using Scala as sponsors and partners,
and many great talk submissions already in the pipeline, including
Play and Akka.

We provide pre-conference training, Scala Foundations, and
post-conference training, Fast Track to Spark.  Each training is a two
day intensive, hands-on course.  The training and registration tickets
have early bird discounts of about $200 each, so e.g. on the all three
package the savings would be $600.  Early bird pricing ends on July
2nd.  Training capacity is limited to 25 seats in each training
session.

The keynotes are by Marius Eriksen, Principal Engineer at Twitter, the
lead on Finagle and other Twitter Stack components, and Matei Zaharia,
CTO of Databricks, the lead on Apache Spark and other Berkeley Stack
components.

There will be great lunch and dinner presentations and we plan to
bring the Off the Grid trucks for awesome food, as well as create
other opportunities afforded By the Bay to enjoy our spectacular
location and community.

We are still looking for more great talk submissions, both full-length
and lightning.  The CFP is open through Friday the 13th, and the
registration fee for the authors of accepted full-length talks will be
waived.  We will open up voting for registered attendees once we have
the majority on hand.  The program will be composed based on the votes
from the attendees, the reviewers, and the organizers.

We hope to see you at Fort Mason in August!

Alexy Khrabrov, Jason Swartz, and the Organizing Committee of Scala By the Bay


Re: Unable to run a Standalone job([NOT FOUND ] org.eclipse.jetty.orbit#javax.mail.glassfish;1.4.1.v201005082020)

2014-06-05 Thread Shrikar archak
Hi Prabeesh/ Sean,

I tried both of the steps you guys mentioned; it looks like it's not able to
resolve it.

[warn] [NOT FOUND  ]
org.eclipse.jetty.orbit#javax.transaction;1.1.1.v201105210645!javax.transaction.orbit
(131ms)
[warn]  public: tried
[warn]
http://repo1.maven.org/maven2/org/eclipse/jetty/orbit/javax.transaction/1.1.1.v201105210645/javax.transaction-1.1.1.v201105210645.orbit
[warn] [NOT FOUND  ]
org.eclipse.jetty.orbit#javax.servlet;3.0.0.v201112011016!javax.servlet.orbit
(225ms)
[warn]  public: tried
[warn]
http://repo1.maven.org/maven2/org/eclipse/jetty/orbit/javax.servlet/3.0.0.v201112011016/javax.servlet-3.0.0.v201112011016.orbit
[warn] [NOT FOUND  ]
org.eclipse.jetty.orbit#javax.mail.glassfish;1.4.1.v201005082020!javax.mail.glassfish.orbit
(214ms)
[warn]  public: tried
[warn]
http://repo1.maven.org/maven2/org/eclipse/jetty/orbit/javax.mail.glassfish/1.4.1.v201005082020/javax.mail.glassfish-1.4.1.v201005082020.orbit
[warn] [NOT FOUND  ]
org.eclipse.jetty.orbit#javax.activation;1.1.0.v201105071233!javax.activation.orbit
(112ms)
[warn]  public: tried

Thanks,
Shrikar


On Thu, Jun 5, 2014 at 1:27 AM, prabeesh k prabsma...@gmail.com wrote:

  Try the sbt clean command before building the app.

  Or delete the .ivy2 and .sbt folders (not a good method). Then try to rebuild
  the project.


 On Thu, Jun 5, 2014 at 11:45 AM, Sean Owen so...@cloudera.com wrote:

 I think this is SPARK-1949 again:
 https://github.com/apache/spark/pull/906
 I think this change fixed this issue for a few people using the SBT
 build, worth committing?

 On Thu, Jun 5, 2014 at 6:40 AM, Shrikar archak shrika...@gmail.com
 wrote:
  Hi All,
  Now that the Spark Version 1.0.0 is release there should not be any
 problem
  with the local jars.
  Shrikars-MacBook-Pro:SimpleJob shrikar$ cat simple.sbt
   name := "Simple Project"
  
   version := "1.0"
  
   scalaVersion := "2.10.4"
  
   libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "1.0.0",
                               "org.apache.spark" %% "spark-streaming" % "1.0.0")
  
   resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
 
  I am still having this issue
  [error] (run-main) java.lang.NoClassDefFoundError:
  javax/servlet/http/HttpServletResponse
  java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse
  at org.apache.spark.HttpServer.start(HttpServer.scala:54)
  at
 
 org.apache.spark.broadcast.HttpBroadcast$.createServer(HttpBroadcast.scala:156)
  at
 
 org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:127)
  at
 
 org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31)
  at
 
 org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48)
  at
 
  org.apache.spark.broadcast.BroadcastManager.<init>(BroadcastManager.scala:35)
  at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218)
   at org.apache.spark.SparkContext.<init>(SparkContext.scala:202)
 
  Any help would be greatly appreciated.
 
  Thanks,
  Shrikar
 
 
  On Fri, May 23, 2014 at 3:58 PM, Shrikar archak shrika...@gmail.com
 wrote:
 
  Still the same error no change
 
  Thanks,
  Shrikar
 
 
  On Fri, May 23, 2014 at 2:38 PM, Jacek Laskowski ja...@japila.pl
 wrote:
 
  Hi Shrikar,
 
  How did you build Spark 1.0.0-SNAPSHOT on your machine? My
  understanding is that `sbt publishLocal` is not enough and you really
  need `sbt assembly` instead. Give it a try and report back.
 
   As to your build.sbt, upgrade Scala to 2.10.4 and use "org.apache.spark"
   %% "spark-streaming" % "1.0.0-SNAPSHOT" only; that will pull down
   spark-core as a transitive dep. The resolver for the Akka Repository is
   not needed. Your build.sbt should really look as follows:
 
   name := "Simple Project"
  
   version := "1.0"
  
   scalaVersion := "2.10.4"
  
   libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0-SNAPSHOT"
 
  Jacek
 
  On Thu, May 22, 2014 at 11:27 PM, Shrikar archak shrika...@gmail.com
 
  wrote:
   Hi All,
  
   I am trying to run the network count example as a seperate
 standalone
   job
   and running into some issues.
  
   Environment:
   1) Mac Mavericks
   2) Latest spark repo from Github.
  
  
   I have a structure like this
  
   Shrikars-MacBook-Pro:SimpleJob shrikar$ find .
   .
   ./simple.sbt
   ./src
   ./src/main
   ./src/main/scala
   ./src/main/scala/NetworkWordCount.scala
   ./src/main/scala/SimpleApp.scala.bk
  
  
   simple.sbt
    name := "Simple Project"
   
    version := "1.0"
   
    scalaVersion := "2.10.3"
   
    libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "1.0.0-SNAPSHOT",
                                "org.apache.spark" %% "spark-streaming" % "1.0.0-SNAPSHOT")
   
    resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
  
  
   I am able to run the SimpleApp which is mentioned in the doc but
 when I
   try
   to run the NetworkWordCount app I get error like this am I missing
   something?
  
   [info] Running com.shrikar.sparkapps.NetworkWordCount
   14/05/22 14:26:47 INFO spark.SecurityManager: 

Re: reuse hadoop code in Spark

2014-06-05 Thread Wei Tan
Thanks Matei.

Using your pointers I can import data from HDFS. What I want to do now is 
something like this in Spark:

---
import myown.mapper

rdd.map (mapper.map)
---

The reason why I want this: myown.mapper is a Java class I already 
developed. I used to run it in Hadoop. It is fairly complex and relies on 
a lot of utility Java classes I wrote. Can I reuse the map() function in 
Java and port it into Spark?
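
A rough sketch of one way this can look (hypothetical names; it assumes the
Hadoop Mapper's core logic is exposed as an ordinary method such as
myown.Mapper#mapRecord(String), rather than driven through Mapper.Context):

    val mapped = lines.mapPartitions { iter =>
      val mapper = new myown.Mapper()           // one instance per partition
      iter.map(line => mapper.mapRecord(line))  // plain call into the existing Java code
    }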

Best regards,
Wei


-
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan



From:   Matei Zaharia matei.zaha...@gmail.com
To: user@spark.apache.org, 
Date:   06/04/2014 04:28 PM
Subject:Re: reuse hadoop code in Spark



Yes, you can write some glue in Spark to call these. Some functions to 
look at:

- SparkContext.hadoopRDD lets you create an input RDD from an existing 
JobConf configured by Hadoop (including InputFormat, paths, etc)
- RDD.mapPartitions lets you operate in all the values on one partition 
(block) at a time, similar to how Mappers in MapReduce work
- PairRDDFunctions.reduceByKey and groupByKey can be used for aggregation.
- RDD.pipe() can be used to call out to a script or binary, like Hadoop 
Streaming.

A fair number of people have been running both Java and Hadoop Streaming 
apps like this.

Matei

On Jun 4, 2014, at 1:08 PM, Wei Tan w...@us.ibm.com wrote:

Hello, 

  I am trying to use spark in such a scenario: 

  I have code written in Hadoop and now I try to migrate to Spark. The 
mappers and reducers are fairly complex. So I wonder if I can reuse the 
map() functions I already wrote in Hadoop (Java), and use Spark to chain 
them, mixing the Java map() functions with Spark operators? 

  Another related question, can I use binary as operators, like Hadoop 
streaming? 

  Thanks! 
Wei 

 



Re: Native library can not be loaded when using Mllib PCA

2014-06-05 Thread Xiangrui Meng
For standalone and yarn mode, you need to install the native libraries on all 
nodes. The best solution is installing them as /usr/lib/libblas.so.3 and 
/usr/lib/liblapack.so.3. If your matrix is sparse, the native libraries cannot 
help because they are for dense linear algebra. You can create an RDD of sparse 
rows and try k-means directly; it supports sparse input. -Xiangrui
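
For reference, a rough sketch of the sparse k-means suggestion above, assuming sc is the SparkContext; the input path and line format are made up, and k and the iteration count are arbitrary:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Hypothetical input: one row per line, space-separated "index:value" pairs,
// with indices already in increasing order.
val dim = 2487  // number of columns in the matrix
val rows = sc.textFile("hdfs:///path/to/sparse_rows").map { line =>
  val parts = line.split(' ').map(_.split(':'))
  Vectors.sparse(dim, parts.map(_(0).toInt), parts.map(_(1).toDouble))
}.cache()

val model = KMeans.train(rows, 10, 20)  // k = 10 clusters, 20 iterations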

Sent from my iPad

 On Jun 5, 2014, at 2:36 AM, yangliuyu yangli...@163.com wrote:
 
 Hi,
 
 We're using Mllib (1.0.0 release version) on a k-means clustering problem.
 We want to reduce the matrix column size before sending the points to the
 k-means solver.
 
 It works on my mac with the local mode: spark-test-run-assembly-1.0.jar
 contains my application code, com.github.fommil, netlib code and
 netlib-native*.so files (include jnilib and dll files) 
 
 spark-submit --class test.TestMllibPCA --master local[4] --executor-memory
 3g --driver-memory 3g --driver-class-path
 /data/user/dump/spark-test-run-assembly-1.0.jar
 /data/user/dump/spark-test-run-assembly-1.0.jar
 /data/user/dump/user_fav_2014_04_09.csv.head1w 
 
 But if --driver-class-path is removed, the warning message appears:
 14/06/05 16:36:20 WARN LAPACK: Failed to load implementation from:
 com.github.fommil.netlib.NativeSystemLAPACK
 14/06/05 16:36:20 WARN LAPACK: Failed to load implementation from:
 com.github.fommil.netlib.NativeRefLAPACK
 
 Setting SPARK_CLASSPATH=/data/user/dump/spark-test-run-assembly-1.0.jar can
 also solve the problem.
 
 The matrix contains sparse data with 6778 rows and 2487 columns, and the time
 to compute the PCA is 10s and 47s respectively, which suggests the native
 library works well.
 
 Then I wanted to test it on a Spark standalone cluster (on CentOS), but it
 failed again.
 After changing the JDK logging level to FINEST, I got this message:
 14/06/05 16:19:15 INFO JniLoader: JNI LIB =
 netlib-native_system-linux-x86_64.so
 14/06/05 16:19:15 INFO JniLoader: extracting
 jar:file:/data/user/dump/spark-test-run-assembly-1.0.jar!/netlib-native_system-linux-x86_64.so
 to /tmp/jniloader6648403281987654682netlib-native_system-linux-x86_64.so
 14/06/05 16:19:15 WARN LAPACK: Failed to load implementation from:
 com.github.fommil.netlib.NativeSystemLAPACK
 14/06/05 16:19:15 INFO JniLoader: JNI LIB =
 netlib-native_ref-linux-x86_64.so
 14/06/05 16:19:15 INFO JniLoader: extracting
 jar:file:/data/user/dump/spark-test-run-assembly-1.0.jar!/netlib-native_ref-linux-x86_64.so
 to /tmp/jniloader2298588627398263902netlib-native_ref-linux-x86_64.so
 14/06/05 16:19:16 WARN LAPACK: Failed to load implementation from:
 com.github.fommil.netlib.NativeRefLAPACK
 14/06/05 16:19:16 INFO LAPACK: Implementation provided by class
 com.github.fommil.netlib.F2jLAPACK
 
 libgfortran, atlas, blas, lapack and arpack are all installed and all of
 the .so files are located under /usr/lib64; spark.executor.extraLibraryPath
 is set to /usr/lib64 in conf/spark-defaults.conf, but none of it works. I
 also tried adding --jars /data/user/dump/spark-test-run-assembly-1.0.jar, with
 no luck.
 
 What should I try next?
 
 Does the native library need to be visible to both the driver and the executors? In
 local mode the problem seems to be a classpath issue, but for standalone
 and yarn mode it gets more complex. A detailed document would be really helpful.
 
 Thanks.
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Native-library-can-not-be-loaded-when-using-Mllib-PCA-tp7042.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: compress in-memory cache?

2014-06-05 Thread Xu (Simon) Chen
Thanks.. it works now.

-Simon


On Thu, Jun 5, 2014 at 10:47 AM, Nick Pentreath nick.pentre...@gmail.com
wrote:

 Have you set the persistence level of the RDD to MEMORY_ONLY_SER (
 http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence)?
 If you're calling cache, the default persistence level is MEMORY_ONLY so
 that setting will have no impact.
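 For example, a minimal sketch (the input path is hypothetical, and sc is the SparkContext):

 import org.apache.spark.storage.StorageLevel

 // spark.rdd.compress only applies to serialized blocks, so persist the RDD
 // with a serialized storage level instead of the plain cache() default:
 val data = sc.textFile("hdfs:///some/input")
 data.persist(StorageLevel.MEMORY_ONLY_SER)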


 On Thu, Jun 5, 2014 at 4:41 PM, Xu (Simon) Chen xche...@gmail.com wrote:

 I have a working set larger than available memory, thus I am hoping to
 turn on rdd compression so that I can store more in-memory. Strangely it
 made no difference. The number of cached partitions, fraction cached, and
 size in memory remain the same. Any ideas?

 I confirmed that rdd compression wasn't on before and it was on for the
 second test.

 scala> sc.getConf.getAll foreach println
 ...
 (spark.rdd.compress,true)
 ...

 I haven't tried lzo vs snappy, but my guess is that either one should
 provide at least some benefit..

 Thanks.
 -Simon





Re: Loading Python libraries into Spark

2014-06-05 Thread mrm
Hi Andrei,

Thank you for your help! Just to make sure I understand, when I run this
command sc.addPyFile("/path/to/yourmodule.py"), I need to be already logged
into the master node and have my python files somewhere, is that correct?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Loading-Python-libraries-into-Spark-tp7059p7073.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: reuse hadoop code in Spark

2014-06-05 Thread Matei Zaharia
Use RDD.mapPartitions to go over all the items in a partition with one Mapper 
object. It will look something like this:

rdd.mapPartitions { iterator =>
  val mapper = new myown.Mapper()
  mapper.configure(conf)
  val output = // {{create an OutputCollector that stores stuff in an ArrayBuffer}}
  for ((key, value) <- iterator) {
    mapper.map(key, value, output, Reporter.NULL)
  }
  output
}
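
To build the input RDD that feeds this, a rough sketch along the lines of the SparkContext.hadoopRDD pointer from the earlier mail (the path and the input format here are placeholders, and sc is the SparkContext):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}

// Build the JobConf the same way the existing Hadoop job does, then turn it
// into an RDD of (key, value) pairs.
val jobConf = new JobConf()
FileInputFormat.setInputPaths(jobConf, "hdfs:///path/to/input")
val rdd = sc.hadoopRDD(jobConf, classOf[TextInputFormat],
  classOf[LongWritable], classOf[Text])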

On Jun 5, 2014, at 8:12 AM, Wei Tan w...@us.ibm.com wrote:

 Thanks Matei. 
 
 Using your pointers I can import data from HDFS; what I want to do now is 
 something like this in Spark: 
 
 --- 
 import myown.mapper 
 
 rdd.map (mapper.map) 
 --- 
 
 The reason why I want this: myown.mapper is a java class I already developed. 
 I used to run it in Hadoop. It is fairly complex and relies on a lot of 
 utility java classes I wrote. Can I reuse the map function in java and port 
 it into Spark? 
 
 Best regards, 
 Wei 
 
 
 - 
 Wei Tan, PhD 
 Research Staff Member 
 IBM T. J. Watson Research Center 
 http://researcher.ibm.com/person/us-wtan 
 
 
 
 From:Matei Zaharia matei.zaha...@gmail.com 
 To:user@spark.apache.org, 
 Date:06/04/2014 04:28 PM 
 Subject:Re: reuse hadoop code in Spark 
 
 
 
 Yes, you can write some glue in Spark to call these. Some functions to look 
 at: 
 
 - SparkContext.hadoopRDD lets you create an input RDD from an existing 
 JobConf configured by Hadoop (including InputFormat, paths, etc) 
 - RDD.mapPartitions lets you operate in all the values on one partition 
 (block) at a time, similar to how Mappers in MapReduce work 
 - PairRDDFunctions.reduceByKey and groupByKey can be used for aggregation. 
 - RDD.pipe() can be used to call out to a script or binary, like Hadoop 
 Streaming. 
 
 A fair number of people have been running both Java and Hadoop Streaming apps 
 like this. 
 
 Matei 
 
 On Jun 4, 2014, at 1:08 PM, Wei Tan w...@us.ibm.com wrote: 
 
 Hello, 
 
  I am trying to use spark in such a scenario: 
 
  I have code written in Hadoop and now I try to migrate to Spark. The mappers 
 and reducers are fairly complex. So I wonder if I can reuse the map() 
 functions I already wrote in Hadoop (Java), and use Spark to chain them, 
 mixing the Java map() functions with Spark operators? 
 
  Another related question, can I use binary as operators, like Hadoop 
 streaming? 
 
  Thanks! 
 Wei 
 
 
 



Re: Loading Python libraries into Spark

2014-06-05 Thread Andrei
In my answer I assumed you run your program with pyspark command (e.g.
pyspark mymainscript.py, pyspark should be on your path). In this case
workflow is as follows:

1. You create SparkConf object that simply contains your app's options.
2. You create SparkContext, which initializes your application. At this
point application connects to master and asks for resources.
3. You modify SparkContext object to include everything you want to make
available for mappers on other hosts, e.g. other *.py files.
4. You create RDD (e.g. with sc.textFile) and run actual commands (e.g.
map, filter, etc.). SparkContext knows about your additional files, so
these commands are aware of your library code.

So, yes, in these settings you need to create sc (SparkContext object)
beforehand and make *.py files available on application's host.

With pyspark shell you already do have sc object initialized for you (try
running pyspark and typing sc + Enter - shell will print spark context
details). You can also use spark-submit [1], which will initialize
SparkContext from command line options. But essentially the idea is always the
same: there's a driver application running on one host that creates the
SparkContext, collects dependencies, controls program flow, etc., and there
are workers - applications on slave hosts - that use the created SparkContext
and all serialized data to perform the driver's commands. The driver should know
about everything and let the workers know what they need to know (e.g.
your library code).


[1]: http://spark.apache.org/docs/latest/submitting-applications.html





On Thu, Jun 5, 2014 at 8:10 PM, mrm ma...@skimlinks.com wrote:

 Hi Andrei,

 Thank you for your help! Just to make sure I understand, when I run this
 command sc.addPyFile(/path/to/yourmodule.py), I need to be already logged
 into the master node and have my python files somewhere, is that correct?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Loading-Python-libraries-into-Spark-tp7059p7073.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



creating new ami image for spark ec2 commands

2014-06-05 Thread Matt Work Coarr
How would I go about creating a new AMI image that I can use with the spark
ec2 commands? I can't seem to find any documentation.  I'm looking for a
list of steps that I'd need to perform to make an Amazon Linux image ready
to be used by the spark ec2 tools.

I've been reading through the spark 1.0.0 documentation, looking at the
script itself (spark_ec2.py), and looking at the github project
mesos/spark-ec2.

From what I can tell, the spark_ec2.py script looks up the id of the AMI
based on the region and machine type (hvm or pvm) using static content
derived from the github repo mesos/spark-ec2.

The spark ec2 script loads the AMI id from this base url:
https://raw.github.com/mesos/spark-ec2/v2/ami-list
(Which presumably comes from https://github.com/mesos/spark-ec2 )

For instance, I'm working with us-east-1 and pvm, I'd end up with AMI id:
ami-5bb18832

Is there a list of instructions for how this AMI was created?  Assuming I'm
starting with my own Amazon Linux image, what would I need to do to make it
usable where I could pass that AMI id to spark_ec2.py rather than using the
default spark-provided AMI?

Thanks,
Matt


Examples

2014-06-05 Thread Tim Kellogg
Hi,

I’m still having trouble running the CassandraTest example from the Spark-1.0.0 
binary package. I’ve made a Stackoverflow question for it so you can get some 
street cred for helping me :)

http://stackoverflow.com/q/24069039/503826

Thanks!

Tim Kellogg
Sr. Software Engineer, Protocols
2lemetry
605-593-7099
@kellogh

Setting executor memory when using spark-shell

2014-06-05 Thread Oleg Proudnikov
Hi All,

Please help me set Executor JVM memory size. I am using Spark shell and it
appears that the executors are started with a predefined JVM heap of 512m
as soon as Spark shell starts. How can I change this setting? I tried
setting SPARK_EXECUTOR_MEMORY before launching Spark shell:

export SPARK_EXECUTOR_MEMORY=1g

I also tried several other approaches:

1) setting SPARK_WORKER_MEMORY in conf/spark-env.sh on the worker
2)  passing it as -m argument and running bin/start-slave.sh 1 -m 1g on the
worker

Thank you,
Oleg


Re: Setting executor memory when using spark-shell

2014-06-05 Thread Andrew Ash
Hi Oleg,

I set the size of my executors on a standalone cluster when using the shell
like this:

./bin/spark-shell --master $MASTER --total-executor-cores
$CORES_ACROSS_CLUSTER --driver-java-options
-Dspark.executor.memory=$MEMORY_PER_EXECUTOR

It doesn't seem particularly clean, but it works.

Andrew


On Thu, Jun 5, 2014 at 2:15 PM, Oleg Proudnikov oleg.proudni...@gmail.com
wrote:

 Hi All,

 Please help me set Executor JVM memory size. I am using Spark shell and it
 appears that the executors are started with a predefined JVM heap of 512m
 as soon as Spark shell starts. How can I change this setting? I tried
 setting SPARK_EXECUTOR_MEMORY before launching Spark shell:

 export SPARK_EXECUTOR_MEMORY=1g

 I also tried several other approaches:

 1) setting SPARK_WORKER_MEMORY in conf/spark-env.sh on the worker
 2)  passing it as -m argument and running bin/start-slave.sh 1 -m 1g on
 the worker

 Thank you,
 Oleg




Spark Streaming, download a s3 file to run a script shell on it

2014-06-05 Thread Gianluca Privitera

Hi,
I've got a weird question but maybe someone has already dealt with it.
My Spark Streaming application needs to
- download a file from an S3 bucket,
- run a script with the file as input,
- create a DStream from this script's output.
I've already got the second part done with the rdd.pipe() API that 
really fits my request, but I have no idea how to manage the first part.
How can I download a file and run a script on it inside a 
Spark Streaming application?

Should I use process() from Scala, or will that not work?

Thanks
Gianluca



Re: Setting executor memory when using spark-shell

2014-06-05 Thread Oleg Proudnikov
Thank you, Andrew,

I am using Spark 0.9.1 and tried your approach like this:

bin/spark-shell --driver-java-options
-Dspark.executor.memory=$MEMORY_PER_EXECUTOR

I get

bad option: '--driver-java-options'

There must be something different in my setup. Any ideas?

Thank you again,
Oleg





On 5 June 2014 22:28, Andrew Ash and...@andrewash.com wrote:

 Hi Oleg,

 I set the size of my executors on a standalone cluster when using the
 shell like this:

 ./bin/spark-shell --master $MASTER --total-executor-cores
 $CORES_ACROSS_CLUSTER --driver-java-options
 -Dspark.executor.memory=$MEMORY_PER_EXECUTOR

 It doesn't seem particularly clean, but it works.

 Andrew


 On Thu, Jun 5, 2014 at 2:15 PM, Oleg Proudnikov oleg.proudni...@gmail.com
  wrote:

 Hi All,

 Please help me set Executor JVM memory size. I am using Spark shell and
 it appears that the executors are started with a predefined JVM heap of
 512m as soon as Spark shell starts. How can I change this setting? I tried
 setting SPARK_EXECUTOR_MEMORY before launching Spark shell:

 export SPARK_EXECUTOR_MEMORY=1g

 I also tried several other approaches:

 1) setting SPARK_WORKER_MEMORY in conf/spark-env.sh on the worker
 2)  passing it as -m argument and running bin/start-slave.sh 1 -m 1g on
 the worker

 Thank you,
 Oleg





-- 
Kind regards,

Oleg


Re: implicit ALS dataSet

2014-06-05 Thread redocpot
Thank you for your quick reply.

As far as I know, the update does not require negative observations, because
the update rule

x_u = (Y^T C^u Y + λI)^-1 Y^T C^u p(u)

can be simplified by taking advantage of its algebraic structure, so
negative observations are not needed. This is what I thought the first time
I read the paper.

What makes me confused is, after that, the paper (in Discussion section)
says 

Unlike explicit datasets, here *the model should take all user-item
preferences as an input, including those which are not related to any input
observation (thus hinting to a zero preference).* This is crucial, as the
given observations are inherently biased towards a positive preference, and
thus do not reflect well the user profile. 
However, taking all user-item values as an input to the model raises serious
scalability issues – the number of all those pairs tends to significantly
exceed the input size since a typical user would provide feedback only on a
small fraction of the available items. We address this by exploiting the
algebraic structure of the model, leading to an algorithm that scales
linearly with the input size *while addressing the full scope of user-item
pairs* without resorting to any sub-sampling.

If my understanding is right, it seems that we need the negative observations as
input, but we don't use them during the update. That seems strange to me, because
it would generate far too many user-item pairs, which is not feasible.

Thx for the confirmation. I will read the ALS implementation for more
details.

Hao



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/implicit-ALS-dataSet-tp7067p7086.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: implicit ALS dataSet

2014-06-05 Thread Sean Owen
On Thu, Jun 5, 2014 at 10:38 PM, redocpot julien19890...@gmail.com wrote:
 can be simplified by taking advantage of its algebraic structure, so
 negative observations are not needed. This is what I think at the first time
 I read the paper.

Correct, a big part of the reason that is efficient is because of
sparsity of the input.

 What makes me confused is, after that, the paper (in Discussion section)
 says

 Unlike explicit datasets, here *the model should take all user-item
 preferences as an input, including those which are not related to any input

It is not saying that these non-observations (I would not call them
negative) should explicitly appear in the input. But their implicit
existence can and should be used in the math.

In particular, the loss function that is being minimized also penalizes
error in the implicit 0 cells of the input, just with much less
weight.
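
For concreteness, the objective being minimized, as I read the Hu/Koren/Volinsky implicit-feedback paper, is roughly:

min over x_*, y_* of  sum_{u,i} c_ui * (p_ui - x_u^T y_i)^2
                      + lambda * (sum_u ||x_u||^2 + sum_i ||y_i||^2),
with c_ui = 1 + alpha * r_ui

so every (u, i) pair contributes a term, but unobserved pairs enter with
p_ui = 0 and only the small baseline weight c_ui = 1.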


Re: spark worker and yarn memory

2014-06-05 Thread Sandy Ryza
Hi Xu,

As crazy as it might sound, this all makes sense.

There are a few different quantities at play here:
* the heap size of the executor (controlled by --executor-memory)
* the amount of memory spark requests from yarn (the heap size plus
384 mb to account for fixed memory costs outside of the heap)
* the amount of memory yarn grants to the container (yarn rounds up to
the nearest multiple of yarn.scheduler.minimum-allocation-mb or
yarn.scheduler.fair.increment-allocation-mb, depending on the
scheduler used)
* the amount of memory spark uses for caching on each executor, which
is spark.storage.memoryFraction (default 0.6) of the executor heap
size

So, with --executor-memory 8g, spark requests 8g + 384m from yarn,
which doesn't fit into its container max.  With --executor-memory 7g,
Spark requests 7g + 384m from yarn, which fits into its container max.
 This gets rounded up to 8g by the yarn scheduler.  7g is still used
as the executor heap size, and .6 of this is about 4g, shown as the
cache space in the Spark UI.
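
Plugging the numbers from this thread into those rules (my arithmetic, assuming the default 384 MB overhead and spark.storage.memoryFraction = 0.6):

requested from yarn = 7g + 384m = 7552m
granted by yarn     = rounded up to the 8192m increment = 8g   (hence 2 x 8g = 16g per node)
cache per executor  = 0.6 x 7g  ~= 4.2g                        (the ~4g shown in the Spark UI)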

-Sandy

 On Jun 5, 2014, at 9:44 AM, Xu (Simon) Chen xche...@gmail.com wrote:

 I am slightly confused about the --executor-memory setting. My yarn cluster 
 has a maximum container memory of 8192MB.

 When I specify --executor-memory 8G in my spark-shell, no container can be 
 started at all. It only works when I lower the executor memory to 7G. But 
 then, on yarn, I see 2 container per node, using 16G of memory.

 Then on the spark UI, it shows that each worker has 4GB of memory, rather 
 than 7.

 Can someone explain the relationship among the numbers I see here?

 Thanks.


Re: Join : Giving incorrect result

2014-06-05 Thread Matei Zaharia
Hey Ajay, thanks for reporting this. There was indeed a bug, specifically in 
the way join tasks spill to disk (which happened when you had more concurrent 
tasks competing for memory). I’ve posted a patch for it here: 
https://github.com/apache/spark/pull/986. Feel free to try that if you’d like; 
it will also be in 0.9.2 and 1.0.1.

Matei

On Jun 5, 2014, at 12:19 AM, Ajay Srivastava a_k_srivast...@yahoo.com wrote:

 Sorry for replying late. It was night here.
 
 Lian/Matei,
 Here is the code snippet -
 sparkConf.set("spark.executor.memory", "10g")
 sparkConf.set("spark.cores.max", "5")
 
 val sc = new SparkContext(sparkConf)
 
 val accId2LocRDD = 
 sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2location").map(getKeyValueFromString(_,
  0, ',', true))
   
 val accId2DemoRDD = 
 sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2demographic_planType").map(getKeyValueFromString(_,
  0, ',', true))
 
 val joinedRDD = accId2LocRDD.join(accId2DemoRDD)
 
   def getKeyValueFromString(line: String, keyIndex: Int, delimit: Char, 
 retFullLine: Boolean): Tuple2[String, String] = {
 val splits = line.split(delimit)
 if (splits.length <= 1) {
   (null, null)
 } else if (retFullLine) {
   (splits(keyIndex), line)
 } else {
   (splits(keyIndex), splits(splits.length - keyIndex - 1))
 }
   }
 
 Both of these files have 10 M records with same unique keys. Size of the file 
 is nearly 280 MB and block size in hdfs is 256 MB. The output of join should 
 contain 10 M records.
 
 We have done some more experiments -
 1) Running cogroup instead of join - it also gives incorrect count.
 2) Running union followed by groupbykey and then filtering records with two 
 entries in sequence - It also gives incorrect count.
 3) Increase spark.executor.memory to 50 g and everything works fine. Count 
 comes 10 M for join,cogroup and union/groupbykey/filter transformations.
 
 I thought that 10g is enough memory for executors but even if the memory is 
 less it should not result in incorrect computation. Probably there is a 
 problem in reconstructing RDDs when memory is not enough. 
 
 Thanks Chen for your observation. I get this problem on single worker so 
 there will not be any mismatch of jars. On two workers, since executor memory 
 gets doubled the code works fine.
 
 Regards,
 Ajay
 
 
 On Thursday, June 5, 2014 1:35 AM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 
 
 If this isn’t the problem, it would be great if you can post the code for the 
 program.
 
 Matei
 
 On Jun 4, 2014, at 12:58 PM, Xu (Simon) Chen xche...@gmail.com wrote:
 
 Maybe your two workers have different assembly jar files?
 I just ran into a similar problem that my spark-shell is using a different 
 jar file than my workers - got really confusing results.
 On Jun 4, 2014 8:33 AM, Ajay Srivastava a_k_srivast...@yahoo.com wrote:
 Hi,
 
 I am doing join of two RDDs which giving different results ( counting number 
 of records ) each time I run this code on same input.
 
 The input files are large enough to be divided in two splits. When the 
 program runs on two workers with single core assigned to these, output is 
 consistent and looks correct. But when single worker is used with two or 
 more than two cores, the result seems to be random. Every time, count of 
 joined record is different.
 
 Does this sound like a defect or I need to take care of something while 
 using join ? I am using spark-0.9.1.
 
 Regards
 Ajay
 
 
 



When does Spark switch from PROCESS_LOCAL to NODE_LOCAL or RACK_LOCAL?

2014-06-05 Thread Sung Hwan Chung
I noticed that sometimes tasks would switch from PROCESS_LOCAL (I'd assume
that this means fully cached) to NODE_LOCAL or even RACK_LOCAL.

When these happen things get extremely slow.

Does this mean that the executor got terminated and restarted?

Is there a way to prevent this from happening (barring the machine actually
going down, I'd rather stick with the same process)?


Re: Setting executor memory when using spark-shell

2014-06-05 Thread Andrew Ash
Oh my apologies that was for 1.0

For Spark 0.9 I did it like this:

MASTER=spark://mymaster:7077 SPARK_MEM=8g ./bin/spark-shell -c
$CORES_ACROSS_CLUSTER

The downside of this though is that SPARK_MEM also sets the driver's JVM to
be 8g, rather than just the executors.  I think this is the reason for why
SPARK_MEM was deprecated.  See https://github.com/apache/spark/pull/99


On Thu, Jun 5, 2014 at 2:37 PM, Oleg Proudnikov oleg.proudni...@gmail.com
wrote:

 Thank you, Andrew,

 I am using Spark 0.9.1 and tried your approach like this:

 bin/spark-shell --driver-java-options
 -Dspark.executor.memory=$MEMORY_PER_EXECUTOR

 I get

 bad option: '--driver-java-options'

 There must be something different in my setup. Any ideas?

 Thank you again,
 Oleg





 On 5 June 2014 22:28, Andrew Ash and...@andrewash.com wrote:

 Hi Oleg,

 I set the size of my executors on a standalone cluster when using the
 shell like this:

 ./bin/spark-shell --master $MASTER --total-executor-cores
 $CORES_ACROSS_CLUSTER --driver-java-options
 -Dspark.executor.memory=$MEMORY_PER_EXECUTOR

 It doesn't seem particularly clean, but it works.

 Andrew


 On Thu, Jun 5, 2014 at 2:15 PM, Oleg Proudnikov 
 oleg.proudni...@gmail.com wrote:

 Hi All,

 Please help me set Executor JVM memory size. I am using Spark shell and
 it appears that the executors are started with a predefined JVM heap of
 512m as soon as Spark shell starts. How can I change this setting? I tried
 setting SPARK_EXECUTOR_MEMORY before launching Spark shell:

 export SPARK_EXECUTOR_MEMORY=1g

 I also tried several other approaches:

 1) setting SPARK_WORKER_MEMORY in conf/spark-env.sh on the worker
 2)  passing it as -m argument and running bin/start-slave.sh 1 -m 1g on
 the worker

 Thank you,
 Oleg





 --
 Kind regards,

 Oleg




Re: Join : Giving incorrect result

2014-06-05 Thread Andrew Ash
Hi Ajay,

Can you please try running the same code with spark.shuffle.spill=false and
see if the numbers turn out correctly?  That parameter controls whether or
not the buggy code that Matei fixed in ExternalAppendOnlyMap is used.
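
In the snippet quoted below, that would just be one more line on the SparkConf, e.g.:

sparkConf.set("spark.shuffle.spill", "false")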

FWIW I saw similar issues in 0.9.0 but no longer in 0.9.1 after I think
some fixes in spilling landed.

Andrew


On Thu, Jun 5, 2014 at 3:05 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 Hey Ajay, thanks for reporting this. There was indeed a bug, specifically
 in the way join tasks spill to disk (which happened when you had more
 concurrent tasks competing for memory). I’ve posted a patch for it here:
 https://github.com/apache/spark/pull/986. Feel free to try that if you’d
 like; it will also be in 0.9.2 and 1.0.1.

 Matei

 On Jun 5, 2014, at 12:19 AM, Ajay Srivastava a_k_srivast...@yahoo.com
 wrote:

 Sorry for replying late. It was night here.

 Lian/Matei,
 Here is the code snippet -
 sparkConf.set("spark.executor.memory", "10g")
 sparkConf.set("spark.cores.max", "5")

 val sc = new SparkContext(sparkConf)

 val accId2LocRDD = sc.textFile(
 "hdfs://bbr-dev178:9000/data/subDbSpark/account2location").map(getKeyValueFromString(_,
 0, ',', true))

 val accId2DemoRDD = sc.textFile(
 "hdfs://bbr-dev178:9000/data/subDbSpark/account2demographic_planType").map(getKeyValueFromString(_,
 0, ',', true))

 val joinedRDD = accId2LocRDD.join(accId2DemoRDD)

   def getKeyValueFromString(line: String, keyIndex: Int, delimit: Char,
 retFullLine: Boolean): Tuple2[String, String] = {
 val splits = line.split(delimit)
 if (splits.length <= 1) {
   (null, null)
 } else if (retFullLine) {
   (splits(keyIndex), line)
 } else {
   (splits(keyIndex), splits(splits.length - keyIndex - 1))
 }
   }

 Both of these files have 10 M records with same unique keys. Size of the
 file is nearly 280 MB and block size in hdfs is 256 MB. The output of join
 should contain 10 M records.

 We have done some more experiments -
 1) Running cogroup instead of join - it also gives incorrect count.
 2) Running union followed by groupbykey and then filtering records with
 two entries in sequence - It also gives incorrect count.
 3) Increase spark.executor.memory to 50 g and everything works fine. Count
 comes 10 M for join,cogroup and union/groupbykey/filter transformations.

 I thought that 10g is enough memory for executors but even if the memory
 is less it should not result in incorrect computation. Probably there is a
 problem in reconstructing RDDs when memory is not enough.

 Thanks Chen for your observation. I get this problem on single worker so
 there will not be any mismatch of jars. On two workers, since executor
 memory gets doubled the code works fine.

 Regards,
 Ajay


   On Thursday, June 5, 2014 1:35 AM, Matei Zaharia 
 matei.zaha...@gmail.com wrote:


  If this isn’t the problem, it would be great if you can post the code
 for the program.

 Matei

 On Jun 4, 2014, at 12:58 PM, Xu (Simon) Chen xche...@gmail.com wrote:

 Maybe your two workers have different assembly jar files?
 I just ran into a similar problem that my spark-shell is using a different
 jar file than my workers - got really confusing results.
 On Jun 4, 2014 8:33 AM, Ajay Srivastava a_k_srivast...@yahoo.com
 wrote:

 Hi,

 I am doing join of two RDDs which giving different results ( counting
 number of records ) each time I run this code on same input.

 The input files are large enough to be divided in two splits. When the
 program runs on two workers with single core assigned to these, output is
 consistent and looks correct. But when single worker is used with two or
 more than two cores, the result seems to be random. Every time, count of
 joined record is different.

 Does this sound like a defect or I need to take care of something while
 using join ? I am using spark-0.9.1.

 Regards
 Ajay








Re: When does Spark switch from PROCESS_LOCAL to NODE_LOCAL or RACK_LOCAL?

2014-06-05 Thread Sung Hwan Chung
On a related note, I'd also minimize any kind of executor movement. I.e.,
once an executor is spawned and data cached in the executor, I want that
executor to live all the way till the job is finished, or the machine fails
in a fatal manner.

What would be the best way to ensure that this is the case?


On Thu, Jun 5, 2014 at 3:13 PM, Sung Hwan Chung coded...@cs.stanford.edu
wrote:

 I noticed that sometimes tasks would switch from PROCESS_LOCAL (I'd assume
 that this means fully cached) to NODE_LOCAL or even RACK_LOCAL.

 When these happen things get extremely slow.

 Does this mean that the executor got terminated and restarted?

 Is there a way to prevent this from happening (barring the machine
 actually going down, I'd rather stick with the same process)?



Re: Using Spark on Data size larger than Memory size

2014-06-05 Thread Roger Hoover
Hi Aaron,

When you say that sorting is being worked on, can you elaborate a little
more please?

In particular, I want to sort the items within each partition (not
globally) without necessarily bringing them all into memory at once.

Thanks,

Roger


On Sat, May 31, 2014 at 11:10 PM, Aaron Davidson ilike...@gmail.com wrote:

 There is no fundamental issue if you're running on data that is larger
 than cluster memory size. Many operations can stream data through, and thus
 memory usage is independent of input data size. Certain operations require
 an entire *partition* (not dataset) to fit in memory, but there are not
 many instances of this left (sorting comes to mind, and this is being
 worked on).

 In general, one problem with Spark today is that you *can* OOM under
 certain configurations, and it's possible you'll need to change from the
 default configuration if you're using doing very memory-intensive jobs.
 However, there are very few cases where Spark would simply fail as a matter
 of course *-- *for instance, you can always increase the number of
 partitions to decrease the size of any given one. or repartition data to
 eliminate skew.

 Regarding impact on performance, as Mayur said, there may absolutely be an
 impact depending on your jobs. If you're doing a join on a very large
 amount of data with few partitions, then we'll have to spill to disk. If
 you can't cache your working set of data in memory, you will also see a
 performance degradation. Spark enables the use of memory to make things
 fast, but if you just don't have enough memory, it won't be terribly fast.


 On Sat, May 31, 2014 at 12:14 AM, Mayur Rustagi mayur.rust...@gmail.com
 wrote:

 Clearly thr will be impact on performance but frankly depends on what you
 are trying to achieve with the dataset.

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Sat, May 31, 2014 at 11:45 AM, Vibhor Banga vibhorba...@gmail.com
 wrote:

 Some inputs will be really helpful.

 Thanks,
 -Vibhor


 On Fri, May 30, 2014 at 7:51 PM, Vibhor Banga vibhorba...@gmail.com
 wrote:

 Hi all,

 I am planning to use spark with HBase, where I generate RDD by reading
 data from HBase Table.

 I want to know that in the case when the size of HBase Table grows
 larger than the size of RAM available in the cluster, will the application
 fail, or will there be an impact in performance ?

 Any thoughts in this direction will be helpful and are welcome.

 Thanks,
 -Vibhor




 --
 Vibhor Banga
 Software Development Engineer
 Flipkart Internet Pvt. Ltd., Bangalore






Re: Using Spark on Data size larger than Memory size

2014-06-05 Thread Roger Hoover
I think it would be very handy to be able to specify that you want sorting
during a partitioning stage.


On Thu, Jun 5, 2014 at 4:42 PM, Roger Hoover roger.hoo...@gmail.com wrote:

 Hi Aaron,

 When you say that sorting is being worked on, can you elaborate a little
 more please?

 If particular, I want to sort the items within each partition (not
 globally) without necessarily bringing them all into memory at once.

 Thanks,

 Roger


 On Sat, May 31, 2014 at 11:10 PM, Aaron Davidson ilike...@gmail.com
 wrote:

 There is no fundamental issue if you're running on data that is larger
 than cluster memory size. Many operations can stream data through, and thus
 memory usage is independent of input data size. Certain operations require
 an entire *partition* (not dataset) to fit in memory, but there are not
 many instances of this left (sorting comes to mind, and this is being
 worked on).

 In general, one problem with Spark today is that you *can* OOM under
 certain configurations, and it's possible you'll need to change from the
 default configuration if you're using doing very memory-intensive jobs.
 However, there are very few cases where Spark would simply fail as a matter
 of course *-- *for instance, you can always increase the number of
 partitions to decrease the size of any given one. or repartition data to
 eliminate skew.

 Regarding impact on performance, as Mayur said, there may absolutely be
 an impact depending on your jobs. If you're doing a join on a very large
 amount of data with few partitions, then we'll have to spill to disk. If
 you can't cache your working set of data in memory, you will also see a
 performance degradation. Spark enables the use of memory to make things
 fast, but if you just don't have enough memory, it won't be terribly fast.


 On Sat, May 31, 2014 at 12:14 AM, Mayur Rustagi mayur.rust...@gmail.com
 wrote:

 Clearly thr will be impact on performance but frankly depends on what
 you are trying to achieve with the dataset.

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Sat, May 31, 2014 at 11:45 AM, Vibhor Banga vibhorba...@gmail.com
 wrote:

 Some inputs will be really helpful.

 Thanks,
 -Vibhor


 On Fri, May 30, 2014 at 7:51 PM, Vibhor Banga vibhorba...@gmail.com
 wrote:

 Hi all,

 I am planning to use spark with HBase, where I generate RDD by reading
 data from HBase Table.

 I want to know that in the case when the size of HBase Table grows
 larger than the size of RAM available in the cluster, will the application
 fail, or will there be an impact in performance ?

 Any thoughts in this direction will be helpful and are welcome.

 Thanks,
 -Vibhor




 --
 Vibhor Banga
 Software Development Engineer
 Flipkart Internet Pvt. Ltd., Bangalore







Re: Using Spark on Data size larger than Memory size

2014-06-05 Thread Andrew Ash
Hi Roger,

You should be able to sort within partitions using the rdd.mapPartitions()
method, and that shouldn't require holding all data in memory at once.  It
does require holding the entire partition in memory though.  Do you need
the partition to never be held in memory all at once?
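
For example, a rough sketch for a hypothetical RDD[(String, Int)] called pairs, sorting by key within each partition (each partition is materialized in memory to sort it):

val sortedWithinPartitions = pairs.mapPartitions { iter =>
  iter.toArray.sortBy(_._1).iterator
}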

As far as the work that Aaron mentioned is happening, I think he might be
referring to the discussion and code surrounding
https://issues.apache.org/jira/browse/SPARK-983

Cheers!
Andrew


On Thu, Jun 5, 2014 at 5:16 PM, Roger Hoover roger.hoo...@gmail.com wrote:

 I think it would very handy to be able to specify that you want sorting
 during a partitioning stage.


 On Thu, Jun 5, 2014 at 4:42 PM, Roger Hoover roger.hoo...@gmail.com
 wrote:

 Hi Aaron,

 When you say that sorting is being worked on, can you elaborate a little
 more please?

 If particular, I want to sort the items within each partition (not
 globally) without necessarily bringing them all into memory at once.

 Thanks,

 Roger


 On Sat, May 31, 2014 at 11:10 PM, Aaron Davidson ilike...@gmail.com
 wrote:

 There is no fundamental issue if you're running on data that is larger
 than cluster memory size. Many operations can stream data through, and thus
 memory usage is independent of input data size. Certain operations require
 an entire *partition* (not dataset) to fit in memory, but there are not
 many instances of this left (sorting comes to mind, and this is being
 worked on).

 In general, one problem with Spark today is that you *can* OOM under
 certain configurations, and it's possible you'll need to change from the
 default configuration if you're using doing very memory-intensive jobs.
 However, there are very few cases where Spark would simply fail as a matter
 of course *-- *for instance, you can always increase the number of
 partitions to decrease the size of any given one. or repartition data to
 eliminate skew.

 Regarding impact on performance, as Mayur said, there may absolutely be
 an impact depending on your jobs. If you're doing a join on a very large
 amount of data with few partitions, then we'll have to spill to disk. If
 you can't cache your working set of data in memory, you will also see a
 performance degradation. Spark enables the use of memory to make things
 fast, but if you just don't have enough memory, it won't be terribly fast.


 On Sat, May 31, 2014 at 12:14 AM, Mayur Rustagi mayur.rust...@gmail.com
  wrote:

 Clearly thr will be impact on performance but frankly depends on what
 you are trying to achieve with the dataset.

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Sat, May 31, 2014 at 11:45 AM, Vibhor Banga vibhorba...@gmail.com
 wrote:

 Some inputs will be really helpful.

 Thanks,
 -Vibhor


 On Fri, May 30, 2014 at 7:51 PM, Vibhor Banga vibhorba...@gmail.com
 wrote:

 Hi all,

 I am planning to use spark with HBase, where I generate RDD by
 reading data from HBase Table.

 I want to know that in the case when the size of HBase Table grows
 larger than the size of RAM available in the cluster, will the 
 application
 fail, or will there be an impact in performance ?

 Any thoughts in this direction will be helpful and are welcome.

 Thanks,
 -Vibhor




 --
 Vibhor Banga
 Software Development Engineer
 Flipkart Internet Pvt. Ltd., Bangalore








Re: How to shut down Spark Streaming with Kafka properly?

2014-06-05 Thread Tobias Pfeiffer
Sean,

your patch fixes the issue, thank you so much! (This is the second
time within one week I run into network libraries not shutting down
threads properly, I'm really glad your code fixes the issue.)

I saw your pull request is closed, but not merged yet. Can I do
anything to get your fix into Spark? Open an issue, send a pull
request myself etc.?

Thanks
Tobias


Re: Setting executor memory when using spark-shell

2014-06-05 Thread hassan
just use -Dspark.executor.memory=



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Setting-executor-memory-when-using-spark-shell-tp7082p7103.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Twitter feed options?

2014-06-05 Thread Jeremy Lee
Me again,

Things have been going well, actually. I've got my build chain sorted,
1.0.0 and streaming is working reliably. I managed to turn off the INFO
messages by messing with every log4j properties file on the system. :-)

One thing I would like to try now is some natural language processing on
some selected twitter streams (i.e. my own), but the streaming example
seems to be 'sipping from the firehose'. I'm combing through the twitter4j
documentation now, but does anyone know a simple way of restricting the
'flood' to just my own timeline?

Otherwise, yes, this is now the fun part!

-- 
Jeremy Lee  BCompSci(Hons)
  The Unorthodox Engineers


RE: When does Spark switch from PROCESS_LOCAL to NODE_LOCAL or RACK_LOCAL?

2014-06-05 Thread Liu, Raymond
If some tasks have no locality preference, they will also show up as 
PROCESS_LOCAL; I think we probably need to name that NO_PREFER to make it 
clearer. Not sure if this is your case.

Best Regards,
Raymond Liu

From: coded...@gmail.com [mailto:coded...@gmail.com] On Behalf Of Sung Hwan 
Chung
Sent: Friday, June 06, 2014 6:53 AM
To: user@spark.apache.org
Subject: Re: When does Spark switch from PROCESS_LOCAL to NODE_LOCAL or 
RACK_LOCAL?

Additionally, I've encountered a confusing situation where the locality 
level for a task showed up as 'PROCESS_LOCAL' even though I didn't cache the 
data. I wonder if some implicit caching happens even without the user 
specifying it.

On Thu, Jun 5, 2014 at 3:50 PM, Sung Hwan Chung 
coded...@cs.stanford.edumailto:coded...@cs.stanford.edu wrote:
Thanks Andrew,

Is there a chance that even with full-caching, that modes other than 
PROCESS_LOCAL will be used? E.g., let's say, an executor will try to perform 
tasks although the data are cached on a different executor.

What I'd like to do is to prevent such a scenario entirely.

I'd like to know if setting 'spark.locality.wait' to a very high value would 
guarantee that the mode will always be 'PROCESS_LOCAL'.

On Thu, Jun 5, 2014 at 3:36 PM, Andrew Ash 
and...@andrewash.commailto:and...@andrewash.com wrote:
The locality is how close the data is to the code that's processing it.  
PROCESS_LOCAL means data is in the same JVM as the code that's running, so it's 
really fast.  NODE_LOCAL might mean that the data is in HDFS on the same node, 
or in another executor on the same node, so is a little slower because the data 
has to travel across an IPC connection.  RACK_LOCAL is even slower -- data is 
on a different server so needs to be sent over the network.

Spark switches to lower locality levels when there's no unprocessed data on a 
node that has idle CPUs.  In that situation you have two options: wait until 
the busy CPUs free up so you can start another task that uses data on that 
server, or start a new task on a farther away server that needs to bring data 
from that remote place.  What Spark typically does is wait a bit in the hopes 
that a busy CPU frees up.  Once that timeout expires, it starts moving the data 
from far away to the free CPU.

The main tunable option is how long the scheduler waits before starting to 
move data rather than code.  Those are the spark.locality.* settings here: 
http://spark.apache.org/docs/latest/configuration.html

If you want to prevent this from happening entirely, you can set the values to 
ridiculously high numbers.  The documentation also mentions that 0 has 
special meaning, so you can try that as well.
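
For example (my sketch; in this Spark version the value is a plain number of milliseconds, and the size here is arbitrary):

conf.set("spark.locality.wait", "360000000")  // effectively: never fall back to a lower locality level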

Good luck!
Andrew

On Thu, Jun 5, 2014 at 3:13 PM, Sung Hwan Chung 
coded...@cs.stanford.edumailto:coded...@cs.stanford.edu wrote:
I noticed that sometimes tasks would switch from PROCESS_LOCAL (I'd assume that 
this means fully cached) to NODE_LOCAL or even RACK_LOCAL.

When these happen things get extremely slow.

Does this mean that the executor got terminated and restarted?

Is there a way to prevent this from happening (barring the machine actually 
going down, I'd rather stick with the same process)?





Re: spark worker and yarn memory

2014-06-05 Thread Xu (Simon) Chen
Nice explanation... Thanks!


On Thu, Jun 5, 2014 at 5:50 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Hi Xu,

 As crazy as it might sound, this all makes sense.

 There are a few different quantities at play here:
 * the heap size of the executor (controlled by --executor-memory)
 * the amount of memory spark requests from yarn (the heap size plus
 384 mb to account for fixed memory costs outside if the heap)
 * the amount of memory yarn grants to the container (yarn rounds up to
 the nearest multiple of yarn.scheduler.minimum-allocation-mb or
 yarn.scheduler.fair.increment-allocation-mb, depending on the
 scheduler used)
 * the amount of memory spark uses for caching on each executor, which
 is spark.storage.memoryFraction (default 0.6) of the executor heap
 size

 So, with --executor-memory 8g, spark requests 8g + 384m from yarn,
 which doesn't fit into it's container max.  With --executor-memory 7g,
 Spark requests 7g + 384m from yarn, which fits into its container max.
  This gets rounded up to 8g by the yarn scheduler.  7g is still used
 as the executor heap size, and .6 of this is about 4g, shown as the
 cache space in the spark.

 -Sandy

  On Jun 5, 2014, at 9:44 AM, Xu (Simon) Chen xche...@gmail.com wrote:
 
  I am slightly confused about the --executor-memory setting. My yarn
 cluster has a maximum container memory of 8192MB.
 
  When I specify --executor-memory 8G in my spark-shell, no container
 can be started at all. It only works when I lower the executor memory to
 7G. But then, on yarn, I see 2 container per node, using 16G of memory.
 
  Then on the spark UI, it shows that each worker has 4GB of memory,
 rather than 7.
 
  Can someone explain the relationship among the numbers I see here?
 
  Thanks.



Re: Twitter feed options?

2014-06-05 Thread Jeremy Lee
Nope, sorry, nevermind!

I looked at the source, and it was pretty obvious that it didn't implement
that yet, so I've ripped the classes out and am mutating them into new
receivers right now...

... starting to get the hang of this.


On Fri, Jun 6, 2014 at 1:07 PM, Jeremy Lee unorthodox.engine...@gmail.com
wrote:


 Me again,

 Things have been going well, actually. I've got my build chain sorted,
 1.0.0 and streaming is working reliably. I managed to turn off the INFO
 messages by messing with every log4j properties file on the system. :-)

 On thing I would like to try now is some natural language processing on
 some selected twitter streams. (ie: my own.) but the streaming example
 seems to be 'sipping from the firehose'. I'm combing through the twitter4j
 documentation now, but does anyone know a simple way of restricting the
 'flood' to just my own timeline?

 Otherwise, yes, this is now the fun part!

 --
 Jeremy Lee  BCompSci(Hons)
   The Unorthodox Engineers




-- 
Jeremy Lee  BCompSci(Hons)
  The Unorthodox Engineers


KyroException: Unable to find class

2014-06-05 Thread Justin Yip
Hello,

I have been using Externalizer from Chill as a serialization wrapper. It
appears to me that Spark has some classloader conflict with
Chill. I have the following (simplified) program:

import java.io._
import com.twitter.chill.Externalizer

class X(val i: Int) { override def toString() = s"X(i=$i)" }

object SimpleApp {
  def main(args: Array[String]) {
val bos = new ByteArrayOutputStream(10)
val oos = new ObjectOutputStream(bos)
oos.writeObject(Externalizer(new X(10)))
oos.close()

val ois = new ObjectInputStream(new
ByteArrayInputStream(bos.toByteArray))
val y = ois.readObject.asInstanceOf[Externalizer[X]]
println(y.get)
  }
}

When I run it as a normal program (i.e. sbt run), the program runs fine.

But when I run it with spark-submit (i.e. spark-submit --verbose --class
SimpleApp --master local[4]
target/scala-2.10/simple-project_2.10-1.0.jar), the program fails at the
ois.readObject call. I got an error that Kryo fails to find the class X.

Exception in thread main com.esotericsoftware.kryo.KryoException: Unable
to find class: X
at
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)

I guess the issue is that Spark has a magic classloader, and Kryo fails to
see the same classpaths.

Is there any way to remedy this issue?

Thanks.

Justin


Spark Streaming NeteorkReceiver problems

2014-06-05 Thread zzzzzqf12345
Hi,
Here is the problem description: I wrote a custom NetworkReceiver to receive
image data from a camera. I have confirmed that all the data is received correctly.

1) When data is received, only the NetworkReceiver node runs at full speed, while
the other nodes stay idle; my Spark cluster has 6 nodes.

2) Every image is processed many times, although I expected it to be processed once.

3) How do I distribute tasks to the whole cluster?

I tried dstream.repartition(...).map(), but the problem in 2) still exists, and
sometimes the job fails.

Here is the Spark configuration:
conf.set("spark.executor.memory", "3g")
conf.set("spark.shuffle.netty.connect.timeout", "30")
conf.set("spark.storage.blockManagerSlaveTimeoutMs", "30")
conf.set("spark.akka.timeout", "600")
conf.set("spark.akka.threads", "8")

Node: 8 cores, 6 GB memory.
Network: 1 Gbps

Any suggestions will be appreciated. 

thanks,
QingFeng



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-NeteorkReceiver-problems-tp7109.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.