Re: How to RDD.take(middle 10 elements)

2014-07-10 Thread Xiangrui Meng
This is expensive but doable:

rdd.zipWithIndex().filter { case (_, idx) => idx >= 10 && idx < 20 }.collect()
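
If you need this pattern more than once, the same idea can be wrapped in a
small helper (an untested sketch; the name takeRange is just mine):

  import scala.reflect.ClassTag
  import org.apache.spark.rdd.RDD

  def takeRange[T: ClassTag](rdd: RDD[T], from: Long, until: Long): Array[T] =
    rdd.zipWithIndex()
      .filter { case (_, idx) => idx >= from && idx < until }
      .map(_._1)
      .collect()

  // e.g. takeRange(rdd, 10, 20) returns the 11th through 20th elements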

-Xiangrui

On Thu, Jul 10, 2014 at 12:53 PM, Nick Chammas
 wrote:
> Interesting question on Stack Overflow:
> http://stackoverflow.com/q/24677180/877069
>
> Basically, is there a way to take() elements of an RDD at an arbitrary
> index?
>
> Nick
>
>


Re: Terminal freeze during SVM

2014-07-10 Thread Xiangrui Meng
news20.binary's feature dimension is 1.35M, so the serialized task
size is above the default limit of 10M. You need to set
spark.akka.frameSize to, e.g., 20. Due to a bug (SPARK-1112), this
parameter is not passed to executors automatically, which causes Spark
to freeze. This was fixed in the latest master and v1.0.1-rc2. If you
rebuild Spark, remember to sync the assembly jar to the workers. -Xiangrui
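
For reference, one way to set the frame size from the driver side is via
SparkConf before creating the context (a sketch; the app name is a
placeholder, the value is in MB, and because of SPARK-1112 you still need a
fixed build for the setting to reach the executors):

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("news20-svm")          // placeholder name
    .set("spark.akka.frameSize", "20") // MB; must exceed the serialized task size
  val sc = new SparkContext(conf)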

On Thu, Jul 10, 2014 at 7:56 AM, AlexanderRiggers
 wrote:
> Tried the newest branch, but still get stuck on the same task: (kill) runJob
> at SlidingRDD.scala:74


Re: KMeans code is rubbish

2014-07-10 Thread Xiangrui Meng
SparkKMeans is a naive implementation. Please use
mllib.clustering.KMeans in practice. I created a JIRA for this:
https://issues.apache.org/jira/browse/SPARK-2434 -Xiangrui
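
For reference, a minimal sketch of calling the MLlib version on
whitespace-separated input like the dataset below (the file path, k = 2, and
20 iterations are placeholders):

  import org.apache.spark.SparkContext
  import org.apache.spark.mllib.clustering.KMeans
  import org.apache.spark.mllib.linalg.Vectors

  val sc = new SparkContext("local", "KMeansExample")
  val data = sc.textFile("2dim2.txt")
    .map(line => Vectors.dense(line.trim.split("\\s+").map(_.toDouble)))
    .cache()
  val model = KMeans.train(data, 2, 20)
  model.clusterCenters.foreach(println)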

On Thu, Jul 10, 2014 at 2:45 AM, Tathagata Das
 wrote:
> I ran the SparkKMeans example (not the mllib KMeans that Sean ran) with your
> dataset as well, and I got the expected answer. And I believe that even though
> initialization is done using sampling, the example actually sets the seed to
> a constant 42, so the result should always be the same no matter how many
> times you run it. So I am not really sure what's going on here.
>
> Can you tell us more about which version of Spark you are running? Which
> Java version?
>
>
> ==
>
> [tdas @ Xion spark2] cat input
> 2 1
> 1 2
> 3 2
> 2 3
> 4 1
> 5 1
> 6 1
> 4 2
> 6 2
> 4 3
> 5 3
> 6 3
> [tdas @ Xion spark2] ./bin/run-example SparkKMeans input 2 0.001
> 2014-07-10 02:45:06.764 java[45244:d17] Unable to load realm info from
> SCDynamicStore
> 14/07/10 02:45:07 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 14/07/10 02:45:07 WARN LoadSnappy: Snappy native library not loaded
> 14/07/10 02:45:08 WARN BLAS: Failed to load implementation from:
> com.github.fommil.netlib.NativeSystemBLAS
> 14/07/10 02:45:08 WARN BLAS: Failed to load implementation from:
> com.github.fommil.netlib.NativeRefBLAS
> Finished iteration (delta = 3.0)
> Finished iteration (delta = 0.0)
> Final centers:
> DenseVector(5.0, 2.0)
> DenseVector(2.0, 2.0)
>
>
>
> On Thu, Jul 10, 2014 at 2:17 AM, Wanda Hawk  wrote:
>>
>> so this is what I am running:
>> "./bin/run-example SparkKMeans ~/Documents/2dim2.txt 2 0.001"
>>
>> And this is the input file:"
>> ┌───[spark2013@SparkOne]──[~/spark-1.0.0].$
>> └───#!cat ~/Documents/2dim2.txt
>> 2 1
>> 1 2
>> 3 2
>> 2 3
>> 4 1
>> 5 1
>> 6 1
>> 4 2
>> 6 2
>> 4 3
>> 5 3
>> 6 3
>> "
>>
>> This is the final output from spark:
>> "14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
>> Getting 2 non-empty blocks out of 2 blocks
>> 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
>> Started 0 remote fetches in 0 ms
>> 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
>> maxBytesInFlight: 50331648, targetRequestSize: 10066329
>> 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
>> Getting 2 non-empty blocks out of 2 blocks
>> 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
>> Started 0 remote fetches in 0 ms
>> 14/07/10 20:05:12 INFO Executor: Serialized size of result for 14 is 1433
>> 14/07/10 20:05:12 INFO Executor: Sending result for 14 directly to driver
>> 14/07/10 20:05:12 INFO Executor: Finished task ID 14
>> 14/07/10 20:05:12 INFO DAGScheduler: Completed ResultTask(6, 0)
>> 14/07/10 20:05:12 INFO TaskSetManager: Finished TID 14 in 5 ms on
>> localhost (progress: 1/2)
>> 14/07/10 20:05:12 INFO Executor: Serialized size of result for 15 is 1433
>> 14/07/10 20:05:12 INFO Executor: Sending result for 15 directly to driver
>> 14/07/10 20:05:12 INFO Executor: Finished task ID 15
>> 14/07/10 20:05:12 INFO DAGScheduler: Completed ResultTask(6, 1)
>> 14/07/10 20:05:12 INFO TaskSetManager: Finished TID 15 in 7 ms on
>> localhost (progress: 2/2)
>> 14/07/10 20:05:12 INFO DAGScheduler: Stage 6 (collectAsMap at
>> SparkKMeans.scala:75) finished in 0.008 s
>> 14/07/10 20:05:12 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks
>> have all completed, from pool
>> 14/07/10 20:05:12 INFO SparkContext: Job finished: collectAsMap at
>> SparkKMeans.scala:75, took 0.02472681 s
>> Finished iteration (delta = 0.0)
>> Final centers:
>> DenseVector(2.8571428571428568, 2.0)
>> DenseVector(5.6005, 2.0)
>> "
>>
>>
>>
>>
>> On Thursday, July 10, 2014 12:02 PM, Bertrand Dechoux 
>> wrote:
>>
>>
>> A picture is worth a thousand... Well, a picture with this dataset, what
>> you are expecting, and what you get, would help in answering your initial
>> question.
>>
>> Bertrand
>>
>>
>> On Thu, Jul 10, 2014 at 10:44 AM, Wanda Hawk 
>> wrote:
>>
>> Can someone please run the standard kMeans code on this input with 2
>> centers ?:
>> 2 1
>> 1 2
>> 3 2
>> 2 3
>> 4 1
>> 5 1
>> 6 1
>> 4 2
>> 6 2
>> 4 3
>> 5 3
>> 6 3
>>
>> The obvious result should be (2,2) and (5,2) ... (you can draw them if you
>> don't believe me ...)
>>
>> Thanks,
>> Wanda
>>
>>
>>
>>
>


Re: Execution stalls in LogisticRegressionWithSGD

2014-07-09 Thread Xiangrui Meng
We have the maven-enforcer-plugin defined in the pom. I don't know why it
didn't work for you. Could you try rebuilding with maven2 and confirm
that there is no error message? If that is the case, please create a
JIRA for it. Thanks! -Xiangrui

On Wed, Jul 9, 2014 at 3:53 AM, Bharath Ravi Kumar  wrote:
> Xiangrui,
>
> Thanks for all the help in resolving this issue. The cause turned out to
> be the build environment rather than the runtime configuration. The build process
> had picked up maven2 while building Spark. Using binaries that were rebuilt
> using m3, the entire processing went through fine. While I'm aware that the
> build instruction page specifies m3 as the minimum requirement, declaratively
> preventing accidental m2 usage (e.g. through something like the maven
> enforcer plugin?) might help other developers avoid such issues.
>
> -Bharath
>
>
>
> On Mon, Jul 7, 2014 at 9:43 PM, Xiangrui Meng  wrote:
>>
>> It seems to me a setup issue. I just tested news20.binary (1355191
>> features) on a 2-node EC2 cluster and it worked well. I added one line
>> to conf/spark-env.sh:
>>
>> export SPARK_JAVA_OPTS=" -Dspark.akka.frameSize=20 "
>>
>> and launched spark-shell with "--driver-memory 20g". Could you re-try
>> with an EC2 setup? If it still doesn't work, please attach all your
>> code and logs.
>>
>> Best,
>> Xiangrui
>>
>> On Sun, Jul 6, 2014 at 1:35 AM, Bharath Ravi Kumar 
>> wrote:
>> > Hi Xiangrui,
>> >
>> > 1) Yes, I used the same build (compiled locally from source) to the host
>> > that has (master, slave1) and the second host with slave2.
>> >
>> > 2) The execution was successful when run in local mode with reduced
>> > number
>> > of partitions. Does this imply issues communicating/coordinating across
>> > processes (i.e. driver, master and workers)?
>> >
>> > Thanks,
>> > Bharath
>> >
>> >
>> >
>> > On Sun, Jul 6, 2014 at 11:37 AM, Xiangrui Meng  wrote:
>> >>
>> >> Hi Bharath,
>> >>
>> >> 1) Did you sync the spark jar and conf to the worker nodes after the build?
>> >> 2) Since the dataset is not large, could you try local mode first
>> >> using `spark-submit --driver-memory 12g --master local[*]`?
>> >> 3) Try to use a smaller number of partitions, say 5.
>> >>
>> >> If the problem is still there, please attach the full master/worker log
>> >> files.
>> >>
>> >> Best,
>> >> Xiangrui
>> >>
>> >> On Fri, Jul 4, 2014 at 12:16 AM, Bharath Ravi Kumar
>> >> 
>> >> wrote:
>> >> > Xiangrui,
>> >> >
>> >> > Leaving the frameSize unspecified led to an error message (and
>> >> > failure)
>> >> > stating that the task size (~11M) was larger. I hence set it to an
>> >> > arbitrarily large value ( I realize 500 was unrealistic & unnecessary
>> >> > in
>> >> > this case). I've now set the size to 20M and repeated the runs. The
>> >> > earlier
>> >> > runs were on an uncached RDD. Caching the RDD (and setting
>> >> > spark.storage.memoryFraction=0.5) resulted in marginal speed up of
>> >> > execution, but the end result remained the same. The cached RDD size
>> >> > is
>> >> > as
>> >> > follows:
>> >> >
> >> > RDD Name: 1084
> >> > Storage Level: Memory Deserialized 1x Replicated
> >> > Cached Partitions: 80
> >> > Fraction Cached: 100%
> >> > Size in Memory: 165.9 MB
> >> > Size in Tachyon: 0.0 B
> >> > Size on Disk: 0.0 B
>> >> >
>> >> >
>> >> >
>> >> > The corresponding master logs were:
>> >> >
>> >> > 14/07/04 06:29:34 INFO Master: Removing executor
>> >> > app-20140704062238-0033/1
>> >> > because it is EXITED
>> >> > 14/07/04 06:29:34 INFO Master: Launching executor
>> >> > app-20140704062238-0033/2
>> >> > on worker worker-20140630124441-slave1-40182
>> >> > 14/07/04 06:29:34 INFO Master: Removing executor
>> >> > app-20140704062238-0033/0
>> >> > because it is EXITED
>> >> > 14/07/04 06:29:34 INFO Master: Launching executor
>> >> > app-20140704062238-0033/3
>> >>

Re: Help for the large number of the input data files

2014-07-08 Thread Xiangrui Meng
You can either use sc.wholeTextFiles and then a flatMap to reduce the
number of partitions, or give more memory to the driver process by
using --driver-memory 20g and then call RDD.repartition(small number)
after you load the data in. -Xiangrui
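
A rough sketch of both options (the paths and the partition counts are
placeholders):

  // Option 1: wholeTextFiles yields (path, content) pairs in far fewer
  // partitions than one-per-file; split the contents back into records.
  val records = sc.wholeTextFiles("hdfs:///logs/post-processed/*")
    .flatMap { case (_, content) => content.split("\n") }

  // Option 2: load as usual, then shrink the number of partitions.
  val repartitioned = sc.textFile("hdfs:///logs/post-processed/*")
    .repartition(200)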

On Mon, Jul 7, 2014 at 7:38 PM, innowireless TaeYun Kim
 wrote:
> Hi,
>
>
>
> A help for the implementation best practice is needed.
>
> The operating environment is as follows:
>
>
>
> - Log data file arrives irregularly.
>
> - The size of a log data file is from 3.9KB to 8.5MB. The average is about
> 1MB.
>
> - The number of records of a data file is from 13 lines to 22000 lines. The
> average is about 2700 lines.
>
> - Data file must be post-processed before aggregation.
>
> - Post-processing algorithm can be changed.
>
> - Post-processed files are managed separately from the original data files,
> since the post-processing algorithm might be changed.
>
> - Daily aggregation is performed. All post-processed data files must be
> filtered record-by-record, and aggregations (average, max, min, …) are calculated.
>
> - Since aggregation is fine-grained, the number of records after the
> aggregation is not so small. It can be about half of the number of the
> original records.
>
> - At some point, the number of post-processed files can be about 200,000.
>
> - A data file should be able to be deleted individually.
>
>
>
> In a test, I tried to process 160,000 post-processed files with Spark, starting
> with sc.textFile() and a glob path; it failed with an OutOfMemory exception in
> the driver process.
>
>
>
> What is the best practice to handle this kind of data?
>
> Should I use HBase instead of plain files to save post-processed data?
>
>
>
> Thank you.
>
>


Re: Error and doubts in using Mllib Naive bayes for text clasification

2014-07-08 Thread Xiangrui Meng
1) The feature dimension should be a fixed number before you run
NaiveBayes. If you use bag of words, you need to handle the
word-to-index dictionary yourself. You can either ignore the words
that never appear in training (because they have no effect on
prediction), or use hashing to randomly project words to a fixed-size
feature space (collisions may happen); a small sketch of the hashing
approach follows after 3) below.

3) Yes, we saved the log conditional probabilities. So to compute the
likelihood, we only need summation.
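
For 1), a minimal Scala sketch of the hashing approach (the helper name and
the feature-space size are arbitrary; the same idea carries over to PySpark):

  import org.apache.spark.mllib.linalg.{Vector, Vectors}

  def hashedFeatures(tokens: Seq[String], numFeatures: Int = 1 << 16): Vector = {
    val counts = scala.collection.mutable.Map.empty[Int, Double].withDefaultValue(0.0)
    tokens.foreach { t =>
      // map every token, seen or unseen, into a fixed-size index space
      val idx = ((t.hashCode % numFeatures) + numFeatures) % numFeatures
      counts(idx) += 1.0
    }
    Vectors.sparse(numFeatures, counts.toSeq)
  }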

Best,
Xiangrui

On Tue, Jul 8, 2014 at 12:01 AM, Rahul Bhojwani
 wrote:
> I am really sorry. It's actually my mistake. My problem 2 is wrong because
> using a single feature makes no sense. Sorry for the inconvenience.
> But I will still be waiting for the solutions to problems 1 and 3.
>
> Thanks,
>
>
> On Tue, Jul 8, 2014 at 12:14 PM, Rahul Bhojwani
>  wrote:
>>
>> Hello,
>>
>> I am a novice. I want to classify text into two classes. For this
>> purpose I want to use the Naive Bayes model. I am using Python for it.
>>
>> Here are the problems I am facing:
>>
>> Problem 1: I wanted to use all words as features for the bag-of-words
>> model, which means my features will be counts of individual words. In this
>> case, whenever a new word appears in the test data (one that was never present in
>> the training data), I need to increase the size of the feature vector to
>> incorporate that word as well. Correct me if I am wrong. Can I do that in
>> the present MLlib NaiveBayes? Or what is the way in which I can incorporate
>> this?
>>
>> Problem 2: As I was not able to proceed with all words, I did some
>> pre-processing and figured out a few features from the text. But using this
>> also gives errors.
>> Right now I was testing with only one feature from the text, the count
>> of positive words. I am submitting the code below, along with the error:
>>
>>
>> #Code
>>
>> import tokenizer
>> import gettingWordLists as gl
>> from pyspark.mllib.classification import NaiveBayes
>> from numpy import array
>> from pyspark import SparkContext, SparkConf
>>
>> conf = (SparkConf().setMaster("local[6]").setAppName("My
>> app").set("spark.executor.memory", "1g"))
>>
>> sc=SparkContext(conf = conf)
>>
>> # Getting the positive dict:
>> pos_list = []
>> pos_list = gl.getPositiveList()
>> tok = tokenizer.Tokenizer(preserve_case=False)
>>
>>
>> train_data  = []
>>
>> with open("training_file.csv","r") as train_file:
>>     for line in train_file:
>>         tokens = line.split(",")
>>         msg = tokens[0]
>>         sentiment = tokens[1]
>>         count = 0
>>         tokens = set(tok.tokenize(msg))
>>         for i in tokens:
>>             if i.encode('utf-8') in pos_list:
>>                 count+=1
>>         if sentiment.__contains__('NEG'):
>>             label = 0.0
>>         else:
>>             label = 1.0
>>         feature = []
>>         feature.append(label)
>>         feature.append(float(count))
>>         train_data.append(feature)
>>
>>
>> model = NaiveBayes.train(sc.parallelize(array(train_data)))
>> print model.pi
>> print model.theta
>> print "\n\n\n\n\n" , model.predict(array([5.0]))
>>
>> ##
>> This is the output:
>>
>> [-2.24512292 -0.11195389]
>> [[ 0.]
>>  [ 0.]]
>>
>>
>>
>>
>>
>> Traceback (most recent call last):
>>   File "naive_bayes_analyser.py", line 77, in 
>> print "\n\n\n\n\n" , model.predict(array([5.0]))
>>   File
>> "F:\spark-0.9.1\spark-0.9.1\python\pyspark\mllib\classification.py", line
>>  101, in predict
>> return numpy.argmax(self.pi + dot(x, self.theta))
>> ValueError: matrices are not aligned
>>
>> ##
>>
>> Problem 3: As you can see, the output for model.pi is negative, i.e. the prior
>> probabilities are negative. Can someone explain that as well? Is it the log of
>> the probability?
>>
>>
>>
>> Thanks,
>> --
>> Rahul K Bhojwani
>> 3rd Year B.Tech
>> Computer Science and Engineering
>> National Institute of Technology, Karnataka
>
>
>
>
> --
> Rahul K Bhojwani
> 3rd Year B.Tech
> Computer Science and Engineering
> National Institute of Technology, Karnataka


Re: got java.lang.AssertionError when run sbt/sbt compile

2014-07-08 Thread Xiangrui Meng
try sbt/sbt clean first

On Tue, Jul 8, 2014 at 8:25 AM, bai阿蒙  wrote:
> Hi guys,
> when I try to compile the latest source with sbt/sbt compile, I get an error.
> Can anyone help me?
>
> The following is the detail; it may be caused by TestSQLContext.scala:
> [error]
> [error]  while compiling:
> /disk3/spark/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
> [error] during phase: jvm
> [error]  library version: version 2.10.4
> [error] compiler version: version 2.10.4
> [error]   reconstructed args: -Xmax-classfile-name 120 -classpath
> /disk3/spark/sql/core/target/scala-2.10/classes:/disk3/spark/core/target/scala-2.10/classes:/disk3/spark/sql/catalyst/target/scala-2.10/classes:/disk3/spark/lib_managed/jars/netty-all-4.0.17.Final.jar:/disk3/spark/lib_managed/jars/jetty-server-8.1.14.v20131031.jar:/disk3/spark/lib_managed/orbits/javax.servlet-3.0.0.v201112011016.jar:/disk3/spark/lib_managed/jars/jetty-continuation-8.1.14.v20131031.jar:/disk3/spark/lib_managed/jars/jetty-http-8.1.14.v20131031.jar:/disk3/spark/lib_managed/jars/jetty-io-8.1.14.v20131031.jar:/disk3/spark/lib_managed/jars/jetty-util-8.1.14.v20131031.jar:/disk3/spark/lib_managed/jars/jetty-plus-8.1.14.v20131031.jar:/disk3/spark/lib_managed/orbits/javax.transaction-1.1.1.v201105210645.jar:/disk3/spark/lib_managed/jars/jetty-webapp-8.1.14.v20131031.jar:/disk3/spark/lib_managed/jars/jetty-xml-8.1.14.v20131031.jar:/disk3/spark/lib_managed/jars/jetty-servlet-8.1.14.v20131031.jar:/disk3/spark/lib_managed/jars/jetty-security-8.1.14.v20131031.jar:/disk3/spark/lib_managed/jars/jetty-jndi-8.1.14.v20131031.jar:/disk3/spark/lib_managed/orbits/javax.mail.glassfish-1.4.1.v201005082020.jar:/disk3/spark/lib_managed/orbits/javax.activation-1.1.0.v201105071233.jar:/disk3/spark/lib_managed/bundles/guava-14.0.1.jar:/disk3/spark/lib_managed/jars/commons-lang3-3.3.2.jar:/disk3/spark/lib_managed/jars/jsr305-1.3.9.jar:/disk3/spark/lib_managed/bundles/log4j-1.2.17.jar:/disk3/spark/lib_managed/jars/slf4j-api-1.7.5.jar:/disk3/spark/lib_managed/jars/slf4j-log4j12-1.7.5.jar:/disk3/spark/lib_managed/jars/jul-to-slf4j-1.7.5.jar:/disk3/spark/lib_managed/jars/jcl-over-slf4j-1.7.5.jar:/disk3/spark/lib_managed/jars/commons-daemon-1.0.10.jar:/disk3/spark/lib_managed/bundles/compress-lzf-1.0.0.jar:/disk3/spark/lib_managed/bundles/snappy-java-1.0.5.jar:/disk3/spark/lib_managed/bundles/akka-remote_2.10-2.2.3-shaded-protobuf.jar:/disk3/spark/lib_managed/jars/akka-actor_2.10-2.2.3-shaded-protobuf.jar:/disk3/spark/lib_managed/bundles/config-1.0.2.jar:/disk3/spark/lib_managed/bundles/netty-3.6.6.Final.jar:/disk3/spark/lib_managed/jars/protobuf-java-2.4.1-shaded.jar:/disk3/spark/lib_managed/jars/uncommons-maths-1.2.2a.jar:/disk3/spark/lib_managed/bundles/akka-slf4j_2.10-2.2.3-shaded-protobuf.jar:/disk3/spark/lib_managed/jars/json4s-jackson_2.10-3.2.6.jar:/disk3/spark/lib_managed/jars/json4s-core_2.10-3.2.6.jar:/disk3/spark/lib_managed/jars/json4s-ast_2.10-3.2.6.jar:/disk3/spark/lib_managed/jars/paranamer-2.6.jar:/disk3/spark/lib_managed/jars/scalap-2.10.0.jar:/disk3/spark/lib_managed/jars/scala-compiler-2.10.0.jar:/disk3/spark/lib_managed/jars/scala-reflect-2.10.0.jar:/disk3/spark/lib_managed/bundles/jackson-databind-2.3.0.jar:/disk3/spark/lib_managed/bundles/jackson-annotations-2.3.0.jar:/disk3/spark/lib_managed/bundles/jackson-core-2.3.0.jar:/disk3/spark/lib_managed/jars/colt-1.2.0.jar:/disk3/spark/lib_managed/jars/concurrent-1.3.4.jar:/disk3/spark/lib_managed/jars/mesos-0.18.1-shaded-protobuf.jar:/disk3/spark/lib_managed/jars/commons-net-2.2.jar:/disk3/spark/lib_managed/jars/jets3t-0.7.1.jar:/disk3/spark/lib_managed/jars/commons-codec-1.5.jar:/disk3/spark/lib_managed/jars/commons-httpclient-3.1.jar:/disk3/spark/lib_managed/jars/hadoop-client-1.0.4.jar:/disk3/spark/lib_managed/jars/hadoop-core-1.0.4.jar:/disk3/spark/lib_managed/jars/xmlenc-0.52.jar:/disk3/spark/lib_managed/jars/commons-math-2.1.jar:/disk3/spark/lib_managed/jars/commons-configuration-1.6.jar:/disk3/spark/lib_managed/jars/commons-collections-3.2.1.jar:/disk3/spark/lib_managed/jars/commons-lang-2.4.jar:/disk3/spark/lib_managed/jars/common
s-digester-1.8.jar:/disk3/spark/lib_managed/jars/commons-beanutils-1.7.0.jar:/disk3/spark/lib_managed/jars/commons-beanutils-core-1.8.0.jar:/disk3/spark/lib_managed/jars/commons-el-1.0.jar:/disk3/spark/lib_managed/jars/hsqldb-1.8.0.10.jar:/disk3/spark/lib_managed/jars/oro-2.0.8.jar:/disk3/spark/lib_managed/bundles/curator-recipes-2.4.0.jar:/disk3/spark/lib_managed/bundles/curator-framework-2.4.0.jar:/disk3/spark/lib_managed/bundles/curator-client-2.4.0.jar:/disk3/spark/lib_managed/jars/zookeeper-3.4.5.jar:/disk3/spark/lib_managed/jars/jline-0.9.94.jar:/disk3/spark/lib_managed/bundles/metrics-core-3.0.0.jar:/disk3/spark/lib_managed/bundles/metrics-jvm-3.0.0.jar:/disk3/spark/lib_managed/bundles/metrics-json-3.0.0.jar:/disk3/spark/lib_managed/bundles/metrics-graphite-3.0.0.jar:/disk3/spark/lib_managed/jars/chill_2.10-0.3.6.jar:/disk3/spark/lib_managed/jars

Re: Is MLlib NaiveBayes implementation for Spark 0.9.1 correct?

2014-07-08 Thread Xiangrui Meng
Well, I believe this is a correct implementation, but please let us
know if you run into problems. The NaiveBayes implementation in MLlib
v1.0 supports sparse data, which is usually the case for text
classification. I would recommend upgrading to v1.0. -Xiangrui

On Tue, Jul 8, 2014 at 7:20 AM, Rahul Bhojwani
 wrote:
> Hi,
>
> I wanted to use Naive Bayes for a text classification problem. I am using
> Spark 0.9.1.
> I was just curious to ask: is the Naive Bayes implementation in Spark
> 0.9.1 correct? Or are there any bugs in the Spark 0.9.1 implementation which
> are taken care of in Spark 1.0? My question is specifically about the MLlib
> Naive Bayes implementation. Also, I am using Python (if that makes the
> answer any easier).
>
>
> Thanks
> --
> Rahul K Bhojwani
> 3rd Year B.Tech
> Computer Science and Engineering
> National Institute of Technology, Karnataka


Re: How to incorporate the new data in the MLlib-NaiveBayes model along with predicting?

2014-07-08 Thread Xiangrui Meng
Hi Rahul,

We plan to add online model updates with Spark Streaming, perhaps in
v1.1, starting with linear methods. Please open a JIRA for Naive
Bayes. For Naive Bayes, we need to update the priors and conditional
probabilities, which means we should also remember the number of
observations for the updates.
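
To make the bookkeeping concrete, a rough sketch of the sufficient statistics
such an update would carry (purely illustrative, not an MLlib API):

  // Per-class observation counts and per-class feature sums; a new batch of
  // data simply adds to them, and priors / conditional probabilities are
  // recomputed from the totals afterwards.
  case class NBStats(classCounts: Map[Double, Long],
                     featureSums: Map[Double, Array[Double]])

  def update(stats: NBStats, label: Double, features: Array[Double]): NBStats = {
    val counts = stats.classCounts.updated(label, stats.classCounts.getOrElse(label, 0L) + 1L)
    val sums = stats.featureSums.getOrElse(label, new Array[Double](features.length)).clone()
    for (i <- features.indices) sums(i) += features(i)
    NBStats(counts, stats.featureSums.updated(label, sums))
  }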

Best,
Xiangrui

On Tue, Jul 8, 2014 at 7:35 AM, Rahul Bhojwani
 wrote:
> Hi,
>
> I am using the MLlib Naive Bayes for a text classification problem. I have
> a very small amount of training data. Then the data will be coming in
> continuously, and I need to classify it as either A or B. I am training the
> MLlib Naive Bayes model using the training data, but the next time data
> comes in, I want to predict its class and then also incorporate it into the
> model for the next prediction of new data (I think that is obvious).
>
> I am not able to figure out the way to do that using MLlib Naive
> Bayes. Do I have to train the model on the whole data every time new
> data comes in?
>
> Thanks in Advance!
> --
> Rahul K Bhojwani
> 3rd Year B.Tech
> Computer Science and Engineering
> National Institute of Technology, Karnataka


Re: Dense to sparse vector converter

2014-07-07 Thread Xiangrui Meng
No, but it should be easy to add one. -Xiangrui
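
In the meantime, a small helper is easy to write in your own code (a sketch
that keeps only the non-zero entries):

  import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vectors}

  def toSparse(dv: DenseVector): SparseVector = {
    val nonZeros = dv.toArray.zipWithIndex.filter(_._1 != 0.0)
    Vectors.sparse(dv.size, nonZeros.map(_._2), nonZeros.map(_._1))
      .asInstanceOf[SparseVector]
  }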

On Mon, Jul 7, 2014 at 12:37 AM, Ulanov, Alexander
 wrote:
> Hi,
>
>
>
> Is there a method in Spark/MLlib to convert DenseVector to SparseVector?
>
>
>
> Best regards, Alexander


Re: Execution stalls in LogisticRegressionWithSGD

2014-07-07 Thread Xiangrui Meng
It seems to me a setup issue. I just tested news20.binary (1355191
features) on a 2-node EC2 cluster and it worked well. I added one line
to conf/spark-env.sh:

export SPARK_JAVA_OPTS=" -Dspark.akka.frameSize=20 "

and launched spark-shell with "--driver-memory 20g". Could you re-try
with an EC2 setup? If it still doesn't work, please attach all your
code and logs.

Best,
Xiangrui

On Sun, Jul 6, 2014 at 1:35 AM, Bharath Ravi Kumar  wrote:
> Hi Xiangrui,
>
> 1) Yes, I used the same build (compiled locally from source) to the host
> that has (master, slave1) and the second host with slave2.
>
> 2) The execution was successful when run in local mode with reduced number
> of partitions. Does this imply issues communicating/coordinating across
> processes (i.e. driver, master and workers)?
>
> Thanks,
> Bharath
>
>
>
> On Sun, Jul 6, 2014 at 11:37 AM, Xiangrui Meng  wrote:
>>
>> Hi Bharath,
>>
>> 1) Did you sync the spark jar and conf to the worker nodes after the build?
>> 2) Since the dataset is not large, could you try local mode first
>> using `spark-submit --driver-memory 12g --master local[*]`?
>> 3) Try to use a smaller number of partitions, say 5.
>>
>> If the problem is still there, please attach the full master/worker log
>> files.
>>
>> Best,
>> Xiangrui
>>
>> On Fri, Jul 4, 2014 at 12:16 AM, Bharath Ravi Kumar 
>> wrote:
>> > Xiangrui,
>> >
>> > Leaving the frameSize unspecified led to an error message (and failure)
>> > stating that the task size (~11M) was larger. I hence set it to an
>> > arbitrarily large value ( I realize 500 was unrealistic & unnecessary in
>> > this case). I've now set the size to 20M and repeated the runs. The
>> > earlier
>> > runs were on an uncached RDD. Caching the RDD (and setting
>> > spark.storage.memoryFraction=0.5) resulted in marginal speed up of
>> > execution, but the end result remained the same. The cached RDD size is
>> > as
>> > follows:
>> >
>> > RDD Name: 1084
>> > Storage Level: Memory Deserialized 1x Replicated
>> > Cached Partitions: 80
>> > Fraction Cached: 100%
>> > Size in Memory: 165.9 MB
>> > Size in Tachyon: 0.0 B
>> > Size on Disk: 0.0 B
>> >
>> >
>> >
>> > The corresponding master logs were:
>> >
>> > 14/07/04 06:29:34 INFO Master: Removing executor
>> > app-20140704062238-0033/1
>> > because it is EXITED
>> > 14/07/04 06:29:34 INFO Master: Launching executor
>> > app-20140704062238-0033/2
>> > on worker worker-20140630124441-slave1-40182
>> > 14/07/04 06:29:34 INFO Master: Removing executor
>> > app-20140704062238-0033/0
>> > because it is EXITED
>> > 14/07/04 06:29:34 INFO Master: Launching executor
>> > app-20140704062238-0033/3
>> > on worker worker-20140630102913-slave2-44735
>> > 14/07/04 06:29:37 INFO Master: Removing executor
>> > app-20140704062238-0033/2
>> > because it is EXITED
>> > 14/07/04 06:29:37 INFO Master: Launching executor
>> > app-20140704062238-0033/4
>> > on worker worker-20140630124441-slave1-40182
>> > 14/07/04 06:29:37 INFO Master: Removing executor
>> > app-20140704062238-0033/3
>> > because it is EXITED
>> > 14/07/04 06:29:37 INFO Master: Launching executor
>> > app-20140704062238-0033/5
>> > on worker worker-20140630102913-slave2-44735
>> > 14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2:45172 got
>> > disassociated, removing it.
>> > 14/07/04 06:29:39 INFO Master: Removing app app-20140704062238-0033
>> > 14/07/04 06:29:39 INFO LocalActorRef: Message
>> > [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying]
>> > from
>> > Actor[akka://sparkMaster/deadLetters] to
>> >
>> > Actor[akka://sparkMaster/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkMaster%4010.3.1.135%3A33061-123#1986674260]
>> > was not delivered. [39] dead letters encountered. This logging can be
>> > turned
>> > off or adjusted with configuration settings 'akka.log-dead-letters' and
>> > 'akka.log-dead-letters-during-shutdown'.
>> > 14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2:45172 got
>> > disassociated, removing it.
>> > 14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2:45172 got
>> > disassociated, removing it.
>> > 14/07/04 06:29:39 ERROR EndpointWriter: AssociationE

Re: Execution stalls in LogisticRegressionWithSGD

2014-07-05 Thread Xiangrui Meng
NFO SecurityManager: Changing view acls to: user1
> 14/07/04 06:22:40 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(user1)
> 14/07/04 06:22:40 INFO Slf4jLogger: Slf4jLogger started
> 14/07/04 06:22:40 INFO Remoting: Starting remoting
> 14/07/04 06:22:40 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://spark@slave1:38558]
> 14/07/04 06:22:40 INFO SparkEnv: Connecting to MapOutputTracker:
> akka.tcp://spark@master:45172/user/MapOutputTracker
> 14/07/04 06:22:40 INFO SparkEnv: Connecting to BlockManagerMaster:
> akka.tcp://spark@master:45172/user/BlockManagerMaster
> 14/07/04 06:22:40 INFO DiskBlockManager: Created local directory at
> /tmp/spark-local-20140704062240-6a65
> 14/07/04 06:22:40 INFO MemoryStore: MemoryStore started with capacity 6.7
> GB.
> 14/07/04 06:22:40 INFO ConnectionManager: Bound socket to port 46901 with id
> = ConnectionManagerId(slave1,46901)
> 14/07/04 06:22:40 INFO BlockManagerMaster: Trying to register BlockManager
> 14/07/04 06:22:40 INFO BlockManagerMaster: Registered BlockManager
> 14/07/04 06:22:40 INFO HttpFileServer: HTTP File server directory is
> /tmp/spark-9eba78f9-8ae9-477c-9338-7222ae6fe306
> 14/07/04 06:22:40 INFO HttpServer: Starting HTTP Server
> 14/07/04 06:22:42 INFO CoarseGrainedExecutorBackend: Got assigned task 0
> 14/07/04 06:22:42 INFO Executor: Running task ID 0
> 14/07/04 06:22:42 INFO CoarseGrainedExecutorBackend: Got assigned task 2
> 14/07/04 06:22:42 INFO Executor: Running task ID 2
> ...
>
>
>
> On Fri, Jul 4, 2014 at 5:52 AM, Xiangrui Meng  wrote:
>>
>> The feature dimension is small. You don't need a big akka.frameSize.
>> The default one (10M) should be sufficient. Did you cache the data
>> before calling LRWithSGD? -Xiangrui
>>
>> On Thu, Jul 3, 2014 at 10:02 AM, Bharath Ravi Kumar 
>> wrote:
>> > I tried another run after setting the driver memory to 8G (and
>> > spark.akka.frameSize = 500 on the executors and the driver). In
>> > addition, I
>> > also tried to reduce the amount of data that a single task processes, by
>> > increasing the number of partitions (of the labeled points) to 120
>> > (instead
>> > of 2 used earlier), and then setting max cores to 2. That made no
>> > difference
>> > since, at the end of 120 tasks, the familiar error message appeared on a
>> > slave:
>> >
>> > 
>> > 14/07/03 16:18:48 INFO CoarseGrainedExecutorBackend: Got assigned task
>> > 1436
>> > 14/07/03 16:18:48 INFO Executor: Running task ID 1436
>> > 14/07/03 16:18:53 INFO HadoopRDD: Input split:
>> > file:~//2014-05-24-02/part-r-00014:0+2215337
>> > 14/07/03 16:18:54 INFO HadoopRDD: Input split:
>> > file:~//2014-05-24-02/part-r-00014:2215337+2215338
>> > 14/07/03 16:18:54 INFO HadoopRDD: Input split:
>> > file:~//2014-05-24-02/part-r-3:0+2196429
>> > 14/07/03 16:18:54 INFO HadoopRDD: Input split:
>> > file:~//2014-05-24-02/part-r-3:2196429+2196430
>> > 14/07/03 16:18:54 INFO HadoopRDD: Input split:
>> > file:~//2014-05-24-02/part-r-00010:0+2186751
>> > 14/07/03 16:18:54 INFO HadoopRDD: Input split:
>> > file:~//2014-05-24-02/part-r-00010:2186751+2186751
>> > 14/07/03 16:18:54 INFO Executor: Serialized size of result for 1436 is
>> > 5958822
>> > 14/07/03 16:18:54 INFO Executor: Sending result for 1436 directly to
>> > driver
>> > 14/07/03 16:18:54 INFO Executor: Finished task ID 1436
>> > 14/07/03 16:18:54 INFO CoarseGrainedExecutorBackend: Got assigned task
>> > 1438
>> > 14/07/03 16:18:54 INFO Executor: Running task ID 1438
>> > 14/07/03 16:19:00 INFO HadoopRDD: Input split:
>> > file:~//2014-05-24-02/part-r-4:0+2209615
>> > 14/07/03 16:19:00 INFO HadoopRDD: Input split:
>> > file:~//2014-05-24-02/part-r-4:2209615+2209616
>> > 14/07/03 16:19:00 INFO HadoopRDD: Input split:
>> > file:~//2014-05-24-02/part-r-00011:0+2202240
>> > 14/07/03 16:19:00 INFO HadoopRDD: Input split:
>> > file:~//2014-05-24-02/part-r-00011:2202240+2202240
>> > 14/07/03 16:19:00 INFO HadoopRDD: Input split:
>> > file:~//2014-05-24-02/part-r-9:0+2194423
>> > 14/07/03 16:19:00 INFO HadoopRDD: Input split:
>> > file:~//2014-05-24-02/part-r-9:2194423+2194424
>> > 14/07/03 16:19:00 INFO Executor: Serialized size of result for 1438 is
>> > 5958822
>> > 14/07/03 16:19:00 INFO Executor: Sending result for 1438 directly to
>> > driver
>> > 14/07/03 16:19:00 INFO Executor: Finished task ID

Re: Execution stalls in LogisticRegressionWithSGD

2014-07-03 Thread Xiangrui Meng
couple of
> times and appeared on the executors list again before they too died and the
> job failed.
> So it appears that no matter what the task input-result size, the execution
> fails at the end of the stage corresponding to GradientDescent.aggregate
> (and the preceding count() in GradientDescent goes through fine). Let me
> know if you need any additional information.
>
>
> On Thu, Jul 3, 2014 at 12:27 PM, Xiangrui Meng  wrote:
>>
>> Could you check the driver memory in the executor tab of the Spark UI
>> when the job is running? If it is too small, please set
>> --driver-memory with spark-submit, e.g. 10g. Could you also attach the
>> master log under spark/logs as well? -Xiangrui
>>
>> On Wed, Jul 2, 2014 at 9:34 AM, Bharath Ravi Kumar 
>> wrote:
>> > Hi Xiangrui,
>> >
> >> > The issue with aggregating/counting over large feature vectors (as part
>> > of
>> > LogisticRegressionWithSGD) continues to exist, but now in another form:
>> > while the execution doesn't freeze (due to SPARK-1112), it now fails at
>> > the
>> > second or third gradient descent iteration consistently with an error
>> > level
>> > log message, but no stacktrace. I'm running against 1.0.1-rc1, and have
>> > tried setting spark.akka.frameSize as high as 500. When the execution
>> > fails,
>> > each of the two executors log the following message (corresponding to
>> > aggregate at GradientDescent.scala:178) :
>> >
>> > 14/07/02 14:09:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
>> > maxBytesInFlight: 50331648, targetRequestSize: 10066329
>> > 14/07/02 14:09:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
>> > Getting 2 non-empty blocks out of 2 blocks
>> > 14/07/02 14:09:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
>> > Started 1 remote fetches in 0 ms
>> > 14/07/02 14:09:11 INFO Executor: Serialized size of result for 737 is
>> > 5959086
>> > 14/07/02 14:09:11 INFO Executor: Sending result for 737 directly to
>> > driver
>> > 14/07/02 14:09:11 INFO Executor: Finished task ID 737
>> > 14/07/02 14:09:18 ERROR CoarseGrainedExecutorBackend: Driver
>> > Disassociated
>> > [akka.tcp://sparkExecutor@(slave1,slave2):51941] ->
>> > [akka.tcp://spark@master:59487] disassociated! Shutting down.
>> >
>> >
>> > There is no separate stacktrace on the driver side.
>> >
>> > Each input record is of the form p1, p2, (p1,p2) where p1, p2 & (p1,p2)
>> > are
>> > categorical features with large cardinality, and X is the double label
>> > with
>> > a continuous value. The categorical variables are converted to binary
>> > variables which results in a feature vector of size 741092 (composed of
>> > all
>> > unique categories across p1, p2 and (p1,p2)). Thus, the labeled point
>> > for
>> > input record is a sparse vector of size 741092 with only 3 variables set
>> > in
>> > the record. The total number of records is 683233 after aggregating the
>> > input data on (p1, p2). When attempting to train on the unaggregated
>> > records
>> > (1337907 in number spread across 455 files), the execution fails at
>> > count,
>> > GradientDescent.scala:161 with the following log
>> >
>> >
>> > (Snipped lines corresponding to other input files)
>> > 14/07/02 16:02:03 INFO HadoopRDD: Input split:
>> > file:~/part-r-00012:2834590+2834590
>> > 14/07/02 16:02:03 INFO HadoopRDD: Input split:
>> > file:~/part-r-5:0+2845559
>> > 14/07/02 16:02:03 INFO HadoopRDD: Input split:
>> > file:~/part-r-5:2845559+2845560
>> > 14/07/02 16:02:03 INFO Executor: Serialized size of result for 726 is
>> > 615
>> > 14/07/02 16:02:03 INFO Executor: Sending result for 726 directly to
>> > driver
>> > 14/07/02 16:02:03 INFO Executor: Finished task ID 726
>> > 14/07/02 16:02:12 ERROR CoarseGrainedExecutorBackend: Driver
>> > Disassociated
>> > [akka.tcp://sparkExecutor@slave1:48423] ->
>> > [akka.tcp://spark@master:55792]
>> > disassociated! Shutting down.
>> >
>> > A count() attempted on the input RDD before beginning training has the
>> > following metrics:
>> >
>> >
> >> > Metric                     Min    25th   Median  75th   Max
> >> > Result serialization time  0 ms   0 ms   0 ms    0 ms   0 ms
> >> >

Re: Execution stalls in LogisticRegressionWithSGD

2014-07-02 Thread Xiangrui Meng
Could you check the driver memory in the executor tab of the Spark UI
when the job is running? If it is too small, please set
--driver-memory with spark-submit, e.g. 10g. Could you also attach the
master log under spark/logs as well? -Xiangrui

On Wed, Jul 2, 2014 at 9:34 AM, Bharath Ravi Kumar  wrote:
> Hi Xiangrui,
>
> The issue with aggregating/counting over large feature vectors (as part of
> LogisticRegressionWithSGD) continues to exist, but now in another form:
> while the execution doesn't freeze (due to SPARK-1112), it now fails at the
> second or third gradient descent iteration consistently with an error level
> log message, but no stacktrace. I'm running against 1.0.1-rc1, and have
> tried setting spark.akka.frameSize as high as 500. When the execution fails,
> each of the two executors log the following message (corresponding to
> aggregate at GradientDescent.scala:178) :
>
> 14/07/02 14:09:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
> maxBytesInFlight: 50331648, targetRequestSize: 10066329
> 14/07/02 14:09:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
> Getting 2 non-empty blocks out of 2 blocks
> 14/07/02 14:09:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
> Started 1 remote fetches in 0 ms
> 14/07/02 14:09:11 INFO Executor: Serialized size of result for 737 is
> 5959086
> 14/07/02 14:09:11 INFO Executor: Sending result for 737 directly to driver
> 14/07/02 14:09:11 INFO Executor: Finished task ID 737
> 14/07/02 14:09:18 ERROR CoarseGrainedExecutorBackend: Driver Disassociated
> [akka.tcp://sparkExecutor@(slave1,slave2):51941] ->
> [akka.tcp://spark@master:59487] disassociated! Shutting down.
>
>
> There is no separate stacktrace on the driver side.
>
> Each input record is of the form p1, p2, (p1,p2) where p1, p2 & (p1,p2) are
> categorical features with large cardinality, and X is the double label with
> a continuous value. The categorical variables are converted to binary
> variables which results in a feature vector of size 741092 (composed of all
> unique categories across p1, p2 and (p1,p2)). Thus, the labeled point for
> input record is a sparse vector of size 741092 with only 3 variables set in
> the record. The total number of records is 683233 after aggregating the
> input data on (p1, p2). When attempting to train on the unaggregated records
> (1337907 in number spread across 455 files), the execution fails at count,
> GradientDescent.scala:161 with the following log
>
>
> (Snipped lines corresponding to other input files)
> 14/07/02 16:02:03 INFO HadoopRDD: Input split:
> file:~/part-r-00012:2834590+2834590
> 14/07/02 16:02:03 INFO HadoopRDD: Input split: file:~/part-r-5:0+2845559
> 14/07/02 16:02:03 INFO HadoopRDD: Input split:
> file:~/part-r-5:2845559+2845560
> 14/07/02 16:02:03 INFO Executor: Serialized size of result for 726 is 615
> 14/07/02 16:02:03 INFO Executor: Sending result for 726 directly to driver
> 14/07/02 16:02:03 INFO Executor: Finished task ID 726
> 14/07/02 16:02:12 ERROR CoarseGrainedExecutorBackend: Driver Disassociated
> [akka.tcp://sparkExecutor@slave1:48423] -> [akka.tcp://spark@master:55792]
> disassociated! Shutting down.
>
> A count() attempted on the input RDD before beginning training has the
> following metrics:
>
>
> Metric                             Min     25th    Median  75th    Max
> Result serialization time          0 ms    0 ms    0 ms    0 ms    0 ms
> Duration                           33 s    33 s    35 s    35 s    35 s
> Time spent fetching task results   0 ms    0 ms    0 ms    0 ms    0 ms
> Scheduler delay                    0.1 s   0.1 s   0.3 s   0.3 s   0.3 s
>
> Aggregated Metrics by Executor
>
> ID  Address              Task Time  Total  Failed  Succeeded  Shuffle Read  Shuffle Write  Shuf Spill (Mem)  Shuf Spill (Disk)
> 0   CANNOT FIND ADDRESS  34 s       1      0       1          0.0 B         0.0 B          0.0 B             0.0 B
> 1   CANNOT FIND ADDRESS  36 s       1      0       1          0.0 B         0.0 B          0.0 B             0.0 B
>
> Tasks
>
> Task Index  Task ID  Status   Locality Level  Executor  Launch Time          Duration  GC Time  Result Ser Time  Errors
> 0           726      SUCCESS  PROCESS_LOCAL   slave1    2014/07/02 16:01:28  35 s      0.1 s
> 1           727      SUCCESS  PROCESS_LOCAL   slave2    2014/07/02 16:01:28  33 s      99 ms
>
> Any pointers / diagnosis please?
>
>
>
>
> On Thu, Jun 19, 2014 at 10:03 AM, Bharath Ravi Kumar 
> wrote:
>>
>> Thanks. I'll await the fix to re-run my test.
>>
>>
>> On Thu, Jun 19, 2014 at 8:28 AM, Xiangrui Meng  wrote:
>>>

Re: SparkKMeans.scala from examples will show: NoClassDefFoundError: breeze/linalg/Vector

2014-07-02 Thread Xiangrui Meng
SparkKMeans is just example code showing a bare-bones
implementation of k-means. To run k-means on big datasets, please use
the KMeans implemented in MLlib directly:
http://spark.apache.org/docs/latest/mllib-clustering.html

-Xiangrui

On Wed, Jul 2, 2014 at 9:50 AM, Wanda Hawk  wrote:
> I can run it now with the suggested method. However, I have encountered a
> new problem that I have not faced before (sent another email with that one
> but here it goes again ...)
>
> I ran SparkKMeans with a big file (~7 GB of data) for one iteration with
> spark-0.8.0, with this line in .bashrc: " export _JAVA_OPTIONS="-Xmx15g
> -Xms15g -verbose:gc -XX:+PrintGCTimeStamps -XX:+PrintGCDetails" ". It
> finished in a decent time, ~50 seconds, and I had only a few "Full GC"
> messages from Java (a max of 4-5).
>
> Now, using the same export in .bashrc but with spark-1.0.0 (and running it
> with spark-submit), the first loop never finishes and I get a lot of:
> "18.537: [GC (Allocation Failure) --[PSYoungGen:
> 11796992K->11796992K(13762560K)] 11797442K->11797450K(13763072K), 2.8420311
> secs] [Times: user=5.81 sys=2.12, real=2.85 secs]
> "
> or
>
>  "31.867: [Full GC (Ergonomics) [PSYoungGen: 11796992K->3177967K(13762560K)]
> [ParOldGen: 505K->505K(512K)] 11797497K->3178473K(13763072K), [Metaspace:
> 37646K->37646K(1081344K)], 2.3053283 secs] [Times: user=37.74 sys=0.11,
> real=2.31 secs]"
>
> I tried passing different parameters for the JVM through spark-submit, but
> the results are the same
> This happens with java 1.7 and also with java 1.8.
> I do not know what "Ergonomics" stands for ...
>
> How can I get decent performance from spark-1.0.0, considering that
> spark-0.8.0 did not need any fine-tuning of the garbage collection method
> (the default worked well)?
>
> Thank you
>
>
> On Wednesday, July 2, 2014 4:45 PM, Yana Kadiyska 
> wrote:
>
>
> The scripts that Xiangrui mentions set up the classpath... Can you run
> ./run-example for the provided example successfully?
>
> What you can try is to set SPARK_PRINT_LAUNCH_COMMAND=1 and then call
> run-example -- that will show you the exact java command used to run
> the example at the start of execution. Assuming you can run examples
> successfully, you should be able to just copy that and add your jar to
> the front of the classpath. If that works you can start removing extra
> jars (run-example puts all the example jars on the classpath, which you won't
> need).
>
> As you said the error you see is indicative of the class not being
> available/seen at runtime but it's hard to tell why.
>
> On Wed, Jul 2, 2014 at 2:13 AM, Wanda Hawk  wrote:
>> I want to make some minor modifications in SparkKMeans.scala, so running
>> the basic example won't do.
>> I have also packed my code into a jar file with sbt. It completes
>> successfully, but when I try to run it with "java -jar myjar.jar" I get the
>> same error:
>> "Exception in thread "main" java.lang.NoClassDefFoundError:
>> breeze/linalg/Vector
>>at java.lang.Class.getDeclaredMethods0(Native Method)
>>at java.lang.Class.privateGetDeclaredMethods(Class.java:2531)
>>at java.lang.Class.getMethod0(Class.java:2774)
>>at java.lang.Class.getMethod(Class.java:1663)
>>at
>> sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
>>at
>> sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
>> "
>>
>> If "scalac -d classes/ SparkKMeans.scala" can't see my classpath, why does
>> it succeeds in compiling and does not give the same error ?
>> The error itself "NoClassDefFoundError" means that the files are available
>> at compile time, but for some reason I cannot figure out they are not
>> available at run time. Does anyone know why ?
>>
>> Thank you
>>
>>
>> On Tuesday, July 1, 2014 7:03 PM, Xiangrui Meng  wrote:
>>
>>
>> You can use either bin/run-example or bin/spark-submit to run example
>> code. "scalac -d classes/ SparkKMeans.scala" doesn't recognize the Spark
>> classpath. There are examples in the official doc:
>> http://spark.apache.org/docs/latest/quick-start.html#where-to-go-from-here
>> -Xiangrui
>>
>> On Tue, Jul 1, 2014 at 4:39 AM, Wanda Hawk  wrote:
>>> Hello,
>>>
>>> I have installed spark-1.0.0 with scala2.10.3. I have built spark with
>>> "sbt/sbt assembly" and added
>>>
>>>
>>> "/home/wanda/spark-1.0.0/assembly/target/s

Re: MLLib : Math on Vector and Matrix

2014-07-02 Thread Xiangrui Meng
Hi Dmitriy,

It is sweet to have the bindings, but it is very easy to downgrade the
performance with them. The BLAS/LAPACK APIs have been there for more
than 20 years and they are still the top choice for high-performance
linear algebra. I'm thinking about whether it is possible to make the
evaluation lazy in bindings. For example,

y += a * x

can be translated to an AXPY call instead of creating a temporary
vector for a*x. There was some work on this in C++, but none of it achieved good
performance. I'm not sure whether this is a good direction to explore.
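
For what it is worth, the non-lazy workaround today is to call BLAS directly
instead of writing y += a * x (a sketch using the netlib-java interface that
is already on MLlib's classpath):

  import com.github.fommil.netlib.BLAS.{getInstance => blas}

  val a = 2.0
  val x = Array(1.0, 2.0, 3.0)
  val y = Array(0.5, 0.5, 0.5)

  // y := a * x + y, updated in place, with no temporary vector for a * x
  blas.daxpy(x.length, a, x, 1, y, 1)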

Best,
Xiangrui


Re: MLLib : Math on Vector and Matrix

2014-07-02 Thread Xiangrui Meng
Hi Thunder,

Please understand that both MLlib and breeze are in active
development. Before v1.0, we used jblas but in the public APIs we only
exposed Array[Double]. In v1.0, we introduced Vector that supports
both dense and sparse data and switched the backend to
breeze/netlib-java (except ALS). We only used few breeze methods in
our implementation and we benchmarked them one by one. It was hard to
foresee problems caused by including breeze at that time, for example,
https://issues.apache.org/jira/browse/SPARK-1520. Being conservative
in v1.0 was not a bad choice. We should benchmark breeze v0.8.1 for
v1.1 and consider make toBreeze a developer API. For now, if you are
migrating code from v0.9, you can use `Vector.toArray` to get the
value array. Sorry for the inconvenience!
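
For example, a migration along these lines keeps the old array-based math
unchanged (a sketch; the dot helper is just a stand-in for whatever you did
with Array[Double] in v0.9):

  import org.apache.spark.mllib.linalg.{Vector, Vectors}

  val v: Vector = Vectors.dense(1.0, 2.0, 3.0)
  val arr: Array[Double] = v.toArray   // works for dense and sparse vectors

  def dot(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (p, q) => p * q }.sum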

Best,
Xiangrui

On Wed, Jul 2, 2014 at 2:42 PM, Dmitriy Lyubimov  wrote:
> in my humble opinion Spark should've supported linalg a-la [1] before it
> even started dumping methodologies into mllib.
>
> [1] http://mahout.apache.org/users/sparkbindings/home.html
>
>
> On Wed, Jul 2, 2014 at 2:16 PM, Thunder Stumpges
>  wrote:
>>
>> Thanks. I always hate having to do stuff like this. It seems like they
>> went a bit overboard with all the "private[mllib]" declarations... possibly
>> all in the name of "thou shalt not change your public API". If you don't
>> make your public API usable, we end up having to work around it anyway...
>>
>> Oh well.
>>
>> Thunder
>>
>>
>>
>> On Wed, Jul 2, 2014 at 2:05 PM, Koert Kuipers  wrote:
>>>
>>> i did the second option: re-implemented .toBreeze as .breeze using pimp
>>> classes
>>>
>>>
>>> On Wed, Jul 2, 2014 at 5:00 PM, Thunder Stumpges
>>>  wrote:

 I am upgrading from Spark 0.9.0 to 1.0 and I had a pretty good amount of
 code working with internals of MLLib. One of the big changes was the move
 from the old jblas.Matrix to the Vector/Matrix classes included in MLLib.

 However I don't see how we're supposed to use them for ANYTHING other
 than a container for passing data to the included APIs... how do we do any
 math on them? Looking at the internal code, there are quite a number of
 private[mllib] declarations including access to the Breeze representations
 of the classes.

 Was there a good reason this was not exposed? I could see maybe not
 wanting to expose the 'toBreeze' function which would tie it to the breeze
 implementation, however it would be nice to have the various mathematics
 wrapped at least.

 Right now I see no way to code any vector/matrix math without moving my
 code namespaces into org.apache.spark.mllib or duplicating the code in
 'toBreeze' in my own util functions. Not very appealing.

 What are others doing?
 thanks,
 Thunder

>>>
>>
>


Re: One question about RDD.zip function when trying Naive Bayes

2014-07-02 Thread Xiangrui Meng
This is due to a bug in sampling, which was fixed in 1.0.1 and latest
master. See https://github.com/apache/spark/pull/1234 . -Xiangrui

On Wed, Jul 2, 2014 at 8:23 PM, x  wrote:
> Hello,
>
> I am a newbie to Spark MLlib and ran into a curious case when following the
> instructions at the page below.
>
> http://spark.apache.org/docs/latest/mllib-naive-bayes.html
>
> I ran a test program on my local machine using some data.
>
> val spConfig = (new
> SparkConf).setMaster("local").setAppName("SparkNaiveBayes")
> val sc = new SparkContext(spConfig)
>
> The test data was as follows, and there were three labeled categories I
> wanted to predict.
>
>  1  LabeledPoint(0.0, [4.9,3.0,1.4,0.2])
>  2  LabeledPoint(0.0, [4.6,3.4,1.4,0.3])
>  3  LabeledPoint(0.0, [5.7,4.4,1.5,0.4])
>  4  LabeledPoint(0.0, [5.2,3.4,1.4,0.2])
>  5  LabeledPoint(0.0, [4.7,3.2,1.6,0.2])
>  6  LabeledPoint(0.0, [4.8,3.1,1.6,0.2])
>  7  LabeledPoint(0.0, [5.1,3.8,1.9,0.4])
>  8  LabeledPoint(0.0, [4.8,3.0,1.4,0.3])
>  9  LabeledPoint(0.0, [5.0,3.3,1.4,0.2])
> 10  LabeledPoint(1.0, [6.6,2.9,4.6,1.3])
> 11  LabeledPoint(1.0, [5.2,2.7,3.9,1.4])
> 12  LabeledPoint(1.0, [5.6,2.5,3.9,1.1])
> 13  LabeledPoint(1.0, [6.4,2.9,4.3,1.3])
> 14  LabeledPoint(1.0, [6.6,3.0,4.4,1.4])
> 15  LabeledPoint(1.0, [6.0,2.7,5.1,1.6])
> 16  LabeledPoint(1.0, [5.5,2.6,4.4,1.2])
> 17  LabeledPoint(1.0, [5.8,2.6,4.0,1.2])
> 18  LabeledPoint(1.0, [5.7,2.9,4.2,1.3])
> 19  LabeledPoint(1.0, [5.7,2.8,4.1,1.3])
> 20  LabeledPoint(2.0, [6.3,2.9,5.6,1.8])
> 21  LabeledPoint(2.0, [6.5,3.0,5.8,2.2])
> 22  LabeledPoint(2.0, [6.5,3.0,5.5,1.8])
> 23  LabeledPoint(2.0, [6.7,3.3,5.7,2.1])
> 24  LabeledPoint(2.0, [7.4,2.8,6.1,1.9])
> 25  LabeledPoint(2.0, [6.3,3.4,5.6,2.4])
> 26  LabeledPoint(2.0, [6.0,3.0,4.8,1.8])
> 27  LabeledPoint(2.0, [6.8,3.2,5.9,2.3])
>
> The predicted result via NaiveBayes is below. Compared to the test data, only
> two predicted results (#11 and #15) were different.
>
>  1  0.0
>  2  0.0
>  3  0.0
>  4  0.0
>  5  0.0
>  6  0.0
>  7  0.0
>  8  0.0
>  9  0.0
> 10  1.0
> 11  2.0
> 12  1.0
> 13  1.0
> 14  1.0
> 15  2.0
> 16  1.0
> 17  1.0
> 18  1.0
> 19  1.0
> 20  2.0
> 21  2.0
> 22  2.0
> 23  2.0
> 24  2.0
> 25  2.0
> 26  2.0
> 27  2.0
>
> After pairing the test RDD and the predicted RDD via zip, I got this.
>
>  1  (0.0,0.0)
>  2  (0.0,0.0)
>  3  (0.0,0.0)
>  4  (0.0,0.0)
>  5  (0.0,0.0)
>  6  (0.0,0.0)
>  7  (0.0,0.0)
>  8  (0.0,0.0)
>  9  (0.0,1.0)
> 10  (0.0,1.0)
> 11  (0.0,1.0)
> 12  (1.0,1.0)
> 13  (1.0,1.0)
> 14  (2.0,1.0)
> 15  (1.0,1.0)
> 16  (1.0,2.0)
> 17  (1.0,2.0)
> 18  (1.0,2.0)
> 19  (1.0,2.0)
> 20  (2.0,2.0)
> 21  (2.0,2.0)
> 22  (2.0,2.0)
> 23  (2.0,2.0)
> 24  (2.0,2.0)
> 25  (2.0,2.0)
>
> I expected there to be 27 pairs, but I saw that two results were lost.
> Could someone please point out what I am missing here?
>
> Regards,
> xj


Re: why is toBreeze private everywhere in mllib?

2014-07-01 Thread Xiangrui Meng
We were not ready to expose it as a public API in v1.0. Both breeze
and MLlib are in rapid development. It would be possible to expose it
as a developer API in v1.1. For now, it should be easy to define a
toBreeze method in your own project. -Xiangrui
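
A minimal version of such a project-local conversion (a sketch, assuming
breeze is on your classpath; it roughly mirrors what the private toBreeze in
mllib does):

  import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
  import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector}

  implicit class VectorOps(v: Vector) {
    def toBreeze: BV[Double] = v match {
      case d: DenseVector  => new BDV[Double](d.values)
      case s: SparseVector => new BSV[Double](s.indices, s.values, s.size)
    }
  }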

On Tue, Jul 1, 2014 at 12:17 PM, Koert Kuipers  wrote:
> it's kind of handy to be able to convert stuff to breeze... is there some
> other way I am supposed to access that functionality?


Re: Questions about disk IOs

2014-07-01 Thread Xiangrui Meng
Try to reduce number of partitions to match the number of cores. We
will add treeAggregate to reduce the communication cost.
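
For example (a sketch; `points` stands for your cached input RDD, and
13 machines x 16 cores = 208 partitions is just the arithmetic for this cluster):

  import org.apache.spark.storage.StorageLevel

  // roughly one partition per core
  val compact = points.coalesce(13 * 16)
  compact.persist(StorageLevel.MEMORY_AND_DISK_SER)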

PR: https://github.com/apache/spark/pull/1110

-Xiangrui

On Tue, Jul 1, 2014 at 12:55 AM, Charles Li  wrote:
> Hi Spark,
>
> I am running LBFGS on our user data. The data size with Kryo serialisation is 
> about 210G. The weight size is around 1,300,000. I am quite confused that the 
> performance is very close whether the data is cached or not.
>
> The program is simple:
> points = sc.hadoopFile(int, SequenceFileInputFormat.class …..)
> points.persist(StorageLevel.MEMORY_AND_DISK_SER()) // comment it out if not cached
> gradient = new LogisticGradient();
> updater = new SquaredL2Updater();
> initWeight = Vectors.sparse(size, new int[]{}, new double[]{})
> result = LBFGS.runLBFGS(points.rdd(), gradient, updater, numCorrections,
> convergeTol, maxIter, regParam, initWeight);
>
> I have 13 machines with 16 CPUs and 48G RAM each. Spark is running in its
> cluster mode. Below are some arguments I am using:
> --executor-memory 10G
> --num-executors 50
> --executor-cores 2
>
> Storage Using:
> When caching:
> Cached Partitions 951
> Fraction Cached 100%
> Size in Memory 215.7GB
> Size in Tachyon 0.0B
> Size on Disk 1029.7MB
>
> The time cost of every aggregate is around 5 minutes with caching enabled. Lots
> of disk I/O can be seen on the hadoop nodes. I get the same result with caching
> disabled.
>
> Should caching the data points improve the performance? Should caching decrease
> the disk I/O?
>
> Thanks in advance.


Re: SparkKMeans.scala from examples will show: NoClassDefFoundError: breeze/linalg/Vector

2014-07-01 Thread Xiangrui Meng
You can use either bin/run-example or bin/spark-submit to run example
code. "scalac -d classes/ SparkKMeans.scala" doesn't recognize the Spark
classpath. There are examples in the official doc:
http://spark.apache.org/docs/latest/quick-start.html#where-to-go-from-here
-Xiangrui

On Tue, Jul 1, 2014 at 4:39 AM, Wanda Hawk  wrote:
> Hello,
>
> I have installed spark-1.0.0 with scala2.10.3. I have built spark with
> "sbt/sbt assembly" and added
> "/home/wanda/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar"
> to my CLASSPATH variable.
> Then I went to
> "../spark-1.0.0/examples/src/main/scala/org/apache/spark/examples", created a
> new directory "classes" and compiled SparkKMeans.scala with "scalac -d
> classes/ SparkKMeans.scala"
> Then I navigated to "classes" (I commented this line in the scala file :
> package org.apache.spark.examples ) and tried to run it with "java -cp .
> SparkKMeans" and I get the following error:
> "Exception in thread "main" java.lang.NoClassDefFoundError:
> breeze/linalg/Vector
> at java.lang.Class.getDeclaredMethods0(Native Method)
> at java.lang.Class.privateGetDeclaredMethods(Class.java:2531)
> at java.lang.Class.getMethod0(Class.java:2774)
> at java.lang.Class.getMethod(Class.java:1663)
> at
> sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
> at
> sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
> Caused by: java.lang.ClassNotFoundException: breeze.linalg.Vector
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> ... 6 more
> "
> The jar under
> "/home/wanda/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar"
> contains the breeze/linalg/Vector* path; I even tried to unpack it and put
> it on the CLASSPATH, but it does not seem to be picked up.
>
>
> I am currently running java 1.8
> "java version "1.8.0_05"
> Java(TM) SE Runtime Environment (build 1.8.0_05-b13)
> Java HotSpot(TM) 64-Bit Server VM (build 25.5-b02, mixed mode)"
>
> What am I doing wrong?
>


Re: Spark 1.0 and Logistic Regression Python Example

2014-06-30 Thread Xiangrui Meng
You were using an old version of numpy, 1.4? I think this is fixed in
the latest master. Try to replace vec.dot(target) by numpy.dot(vec,
target), or use the latest master. -Xiangrui

On Mon, Jun 30, 2014 at 2:04 PM, Sam Jacobs  wrote:
> Hi,
>
>
> I modified the example code for logistic regression to compute the error in
> classification. Please see below. However the code is failing when it makes
> a call to:
>
>
> labelsAndPreds.filter(lambda (v, p): v != p).count()
>
>
> with the error message (something related to numpy or dot product):
>
>
> File "/opt/spark-1.0.0-bin-hadoop2/python/pyspark/mllib/classification.py",
> line 65, in predict
>
> margin = _dot(x, self._coeff) + self._intercept
>
>   File "/opt/spark-1.0.0-bin-hadoop2/python/pyspark/mllib/_common.py", line
> 443, in _dot
>
> return vec.dot(target)
>
> AttributeError: 'numpy.ndarray' object has no attribute 'dot'
>
>
> FYI, I am running the code using spark-submit i.e.
>
>
> ./bin/spark-submit examples/src/main/python/mllib/logistic_regression2.py
>
>
>
> The code is posted below if it will be useful in any way:
>
>
> from math import exp
>
> import sys
> import time
>
> from pyspark import SparkContext
>
> from pyspark.mllib.classification import LogisticRegressionWithSGD
> from pyspark.mllib.regression import LabeledPoint
> from numpy import array
>
>
> # Load and parse the data
> def parsePoint(line):
> values = [float(x) for x in line.split(',')]
> if values[0] == -1:   # Convert -1 labels to 0 for MLlib
> values[0] = 0
> return LabeledPoint(values[0], values[1:])
>
> sc = SparkContext(appName="PythonLR")
> # start timing
> start = time.time()
> #start = time.clock()
>
> data = sc.textFile("sWAMSpark_train.csv")
> parsedData = data.map(parsePoint)
>
> # Build the model
> model = LogisticRegressionWithSGD.train(parsedData)
>
> #load test data
>
> testdata = sc.textFile("sWSpark_test.csv")
> parsedTestData = testdata.map(parsePoint)
>
> # Evaluating the model on test data
> labelsAndPreds = parsedTestData.map(lambda p: (p.label,
> model.predict(p.features)))
> trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() /
> float(parsedData.count())
> print("Training Error = " + str(trainErr))
> end = time.time()
> print("Time is = " + str(end - start))
>
>
>
>
>
>
>


Re: TaskNotSerializable when invoking KMeans.run

2014-06-30 Thread Xiangrui Meng
Could you post the code snippet and the error stack trace? -Xiangrui

On Mon, Jun 30, 2014 at 7:03 AM, Daniel Micol  wrote:
> Hello,
>
> I’m trying to use KMeans with MLLib but am getting a TaskNotSerializable
> error. I’m using Spark 0.9.1 and invoking the KMeans.run method with k = 2
> and numPartitions = 200. Has anyone seen this error before and know what
> could be the reason for this?
>
> Thanks,
>
> Daniel


Re: Improving Spark multithreaded performance?

2014-06-27 Thread Xiangrui Meng
The RDD is cached in only one or two workers. All other executors need
to fetch its content via network. Since the dataset is not huge, could
you try this?

val features: Array[Vector] = ...
val featuresBc = sc.broadcast(features)
 // parallel loops
 val labels: Array[Double] =
 val rdd = sc.parallelize(0 until 1, 1).flatMap(i =>
featuresBc.value.view.zip(labels))
 val model = SVMWithSGD.train(rdd)
 models(i) = model

Using BT broadcast factory would improve the performance of broadcasting.

Best,
Xiangrui

On Fri, Jun 27, 2014 at 3:06 PM, Kyle Ellrott  wrote:
> 1) I'm using the static SVMWithSGD.train, with no options.
> 2) I have about 20,000 features (~5000 samples) that are being attached and
> trained against 14,000 different sets of labels (ie I'll be doing 14,000
> different training runs against the same sets of features trying to figure
> out which labels can be learned), and I would also like to do cross fold
> validation.
>
> The driver doesn't seem to be using too much memory. I left it as -Xmx8g and
> it never complained.
>
> Kyle
>
>
>
> On Fri, Jun 27, 2014 at 1:18 PM, Xiangrui Meng  wrote:
>>
>> Hi Kyle,
>>
>> A few questions:
>>
>> 1) Did you use `setIntercept(true)`?
>> 2) How many features?
>>
>> I'm a little worried about driver's load because the final aggregation
>> and weights update happen on the driver. Did you check driver's memory
>> usage as well?
>>
>> Best,
>> Xiangrui
>>
>> On Fri, Jun 27, 2014 at 8:10 AM, Kyle Ellrott 
>> wrote:
>> > As far as I can tell there are is no data to broadcast (unless there is
>> > something internal to mllib that needs to be broadcast) I've coalesced
>> > the
>> > input RDDs to keep the number of partitions limited. When running, I've
>> > tried to get up to 500 concurrent stages, and I've coalesced the RDDs
>> > down
>> > to 2 partitions, so about 1000 tasks.
>> > Despite having over 500 threads in the threadpool working on mllib
>> > tasks,
>> > the total CPU usage never really goes above 150%.
>> > I've tried increasing 'spark.akka.threads' but that doesn't seem to do
>> > anything.
>> >
>> > My one thought would be that maybe because I'm using MLUtils.kFold to
>> > generate the RDDs is that because I have so many tasks working off RDDs
>> > that
>> > are permutations of original RDDs that maybe that is creating some sort
>> > of
>> > dependency bottleneck.
>> >
>> > Kyle
>> >
>> >
>> > On Thu, Jun 26, 2014 at 6:35 PM, Aaron Davidson 
>> > wrote:
>> >>
>> >> I don't have specific solutions for you, but the general things to try
>> >> are:
>> >>
>> >> - Decrease task size by broadcasting any non-trivial objects.
>> >> - Increase duration of tasks by making them less fine-grained.
>> >>
>> >> How many tasks are you sending? I've seen in the past something like 25
>> >> seconds for ~10k total medium-sized tasks.
>> >>
>> >>
>> >> On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott 
>> >> wrote:
>> >>>
>> >>> I'm working to set up a calculation that involves calling mllib's
>> >>> SVMWithSGD.train several thousand times on different permutations of
>> >>> the
>> >>> data. I'm trying to run the separate jobs using a threadpool to
>> >>> dispatch the
>> >>> different requests to a spark context connected a Mesos's cluster,
>> >>> using
>> >>> course scheduling, and a max of 2000 cores on Spark 1.0.
>> >>> Total utilization of the system is terrible. Most of the 'aggregate at
>> >>> GradientDescent.scala:178' stages(where mllib spends most of its time)
>> >>> take
>> >>> about 3 seconds, but have ~25 seconds of scheduler delay time.
>> >>> What kind of things can I do to improve this?
>> >>>
>> >>> Kyle
>> >>
>> >>
>> >
>
>


Re: TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

2014-06-27 Thread Xiangrui Meng
Try to use --executor-memory 12g with spark-summit. Or you can set it
in conf/spark-defaults.properties and rsync it to all workers and then
restart. -Xiangrui

On Fri, Jun 27, 2014 at 1:05 PM, Peng Cheng  wrote:
> I give up, communication must be blocked by the complex EC2 network topology
> (though the error information indeed need some improvement). It doesn't make
> sense to run a client thousands miles away to communicate frequently with
> workers. I have moved everything to EC2 now.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/TaskSchedulerImpl-Initial-job-has-not-accepted-any-resources-check-your-cluster-UI-to-ensure-that-woy-tp8247p8444.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Improving Spark multithreaded performance?

2014-06-27 Thread Xiangrui Meng
Hi Kyle,

A few questions:

1) Did you use `setIntercept(true)`?
2) How many features?

I'm a little worried about driver's load because the final aggregation
and weights update happen on the driver. Did you check driver's memory
usage as well?

Best,
Xiangrui

On Fri, Jun 27, 2014 at 8:10 AM, Kyle Ellrott  wrote:
> As far as I can tell there are is no data to broadcast (unless there is
> something internal to mllib that needs to be broadcast) I've coalesced the
> input RDDs to keep the number of partitions limited. When running, I've
> tried to get up to 500 concurrent stages, and I've coalesced the RDDs down
> to 2 partitions, so about 1000 tasks.
> Despite having over 500 threads in the threadpool working on mllib tasks,
> the total CPU usage never really goes above 150%.
> I've tried increasing 'spark.akka.threads' but that doesn't seem to do
> anything.
>
> My one thought would be that maybe because I'm using MLUtils.kFold to
> generate the RDDs is that because I have so many tasks working off RDDs that
> are permutations of original RDDs that maybe that is creating some sort of
> dependency bottleneck.
>
> Kyle
>
>
> On Thu, Jun 26, 2014 at 6:35 PM, Aaron Davidson  wrote:
>>
>> I don't have specific solutions for you, but the general things to try
>> are:
>>
>> - Decrease task size by broadcasting any non-trivial objects.
>> - Increase duration of tasks by making them less fine-grained.
>>
>> How many tasks are you sending? I've seen in the past something like 25
>> seconds for ~10k total medium-sized tasks.
>>
>>
>> On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott 
>> wrote:
>>>
>>> I'm working to set up a calculation that involves calling mllib's
>>> SVMWithSGD.train several thousand times on different permutations of the
>>> data. I'm trying to run the separate jobs using a threadpool to dispatch the
>>> different requests to a spark context connected a Mesos's cluster, using
>>> course scheduling, and a max of 2000 cores on Spark 1.0.
>>> Total utilization of the system is terrible. Most of the 'aggregate at
>>> GradientDescent.scala:178' stages(where mllib spends most of its time) take
>>> about 3 seconds, but have ~25 seconds of scheduler delay time.
>>> What kind of things can I do to improve this?
>>>
>>> Kyle
>>
>>
>


Re: Performance problems on SQL JOIN

2014-06-20 Thread Xiangrui Meng
Your data source is S3 and data is used twice. m1.large does not have very good 
network performance. Please try file.count() and see how fast it goes. -Xiangrui

> On Jun 20, 2014, at 8:16 AM, mathias  wrote:
> 
> Hi there,
> 
> We're trying out Spark and are experiencing some performance issues using
> Spark SQL.
> Anyone who can tell us if our results are normal?
> 
> We are using the Amazon EC2 scripts to create a cluster with 3
> workers/executors (m1.large).
> Tried both spark 1.0.0 as well as the git master; the Scala as well as the
> Python shells.
> 
> Running the following code takes about 5 minutes, which seems a long time
> for this query.
> 
> val file = sc.textFile("s3n:// ...  .csv");
> val data = file.map(x => x.split('|')); // 300k rows
> 
> case class BookingInfo(num_rooms: String, hotelId: String, toDate: String,
> ...);
> val rooms2 = data.filter(x => x(0) == "2").map(x => BookingInfo(x(0), x(1),
> ... , x(9))); // 50k rows
> val rooms3 = data.filter(x => x(0) == "3").map(x => BookingInfo(x(0), x(1),
> ... , x(9))); // 30k rows
> 
> rooms2.registerAsTable("rooms2");
> cacheTable("rooms2");
> rooms3.registerAsTable("rooms3");
> cacheTable("rooms3");
> 
> sql("SELECT * FROM rooms2 LEFT JOIN rooms3 ON rooms2.hotelId =
> rooms3.hotelId AND rooms2.toDate = rooms3.toDate").count();
> 
> 
> Are we doing something wrong here?
> Thanks!
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Performance-problems-on-SQL-JOIN-tp8001.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Anything like grid search available for mlbase?

2014-06-20 Thread Xiangrui Meng
This is a planned feature for v1.1. I'm going to work on it after v1.0.1 
release. -Xiangrui

> On Jun 20, 2014, at 6:46 AM, Charles Earl  wrote:
> 
> Looking for something like scikit's grid search module.
> C


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-19 Thread Xiangrui Meng
It is because the frame size is not set correctly in executor backend. see 
spark-1112 . We are going to fix it in v1.0.1 . Did you try the treeAggregate?

> On Jun 19, 2014, at 2:01 AM, Makoto Yui  wrote:
> 
> Xiangrui and Debasish,
> 
> (2014/06/18 6:33), Debasish Das wrote:
>> I did run pretty big sparse dataset (20M rows, 3M sparse features) and I
>> got 100 iterations of SGD running in 200 seconds...10 executors each
>> with 16 GB memory...
> 
> I could figure out what the problem is. "spark.akka.frameSize" was too large. 
> By setting spark.akka.frameSize=10, it worked for the news20 dataset.
> 
> The execution was slow for more large KDD cup 2012, Track 2 dataset (235M+ 
> records of 16.7M+ (2^24) sparse features in about 33.6GB) due to the 
> sequential aggregation of dense vectors on a single driver node.
> 
> It took about 7.6m for aggregation for an iteration.
> 
> Thanks,
> Makoto


Re: Contribution to Spark MLLib

2014-06-18 Thread Xiangrui Meng
Denis, I think it is fine to have PLSA in MLlib. But I'm not familiar
with the modification you mentioned since the paper is new. We may
need to spend more time to learn the trade-offs. Feel free to create a
JIRA for PLSA and we can move our discussion there. It would be great
if you can share your current implementation. So it is easy for
developers to join the discussion.

Jayati, it is certainly NOT mandatory. But if you want to contribute
something new, please create a JIRA first.

Best,
Xiangrui


Re: Execution stalls in LogisticRegressionWithSGD

2014-06-18 Thread Xiangrui Meng
Hi Bharath,

This is related to SPARK-1112, which we already found the root cause.
I will let you know when this is fixed.

Best,
Xiangrui

On Tue, Jun 17, 2014 at 7:37 PM, Bharath Ravi Kumar  wrote:
> Couple more points:
> 1)The inexplicable stalling of execution with large feature sets appears
> similar to that reported with the news-20 dataset:
> http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3c53a03542.1010...@gmail.com%3E
>
> 2) The NPE trying to call mapToPair convert an RDD Integer> into a JavaPairRDD, Tuple2> is
> unrelated to mllib.
>
> Thanks,
> Bharath
>
>
>
> On Wed, Jun 18, 2014 at 7:14 AM, Bharath Ravi Kumar 
> wrote:
>>
>> Hi  Xiangrui ,
>>
>> I'm using 1.0.0.
>>
>> Thanks,
>> Bharath
>>
>> On 18-Jun-2014 1:43 am, "Xiangrui Meng"  wrote:
>>>
>>> Hi Bharath,
>>>
>>> Thanks for posting the details! Which Spark version are you using?
>>>
>>> Best,
>>> Xiangrui
>>>
>>> On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar 
>>> wrote:
>>> > Hi,
>>> >
>>> > (Apologies for the long mail, but it's necessary to provide sufficient
>>> > details considering the number of issues faced.)
>>> >
>>> > I'm running into issues testing LogisticRegressionWithSGD a two node
>>> > cluster
>>> > (each node with 24 cores and 16G available to slaves out of 24G on the
>>> > system). Here's a description of the application:
>>> >
>>> > The model is being trained based on categorical features x, y, and
>>> > (x,y).
>>> > The categorical features are mapped to binary features by converting
>>> > each
>>> > distinct value in the category enum into a binary feature by itself
>>> > (i.e
>>> > presence of that value in a record implies corresponding feature = 1,
>>> > else
>>> > feature = 0. So, there'd be as many distinct features as enum values) .
>>> > The
>>> > training vector is laid out as
>>> > [x1,x2...xn,y1,y2yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the
>>> > training data has only one combination (Xk,Yk) and a label appearing in
>>> > the
>>> > record. Thus, the corresponding labeledpoint sparse vector would only
>>> > have 3
>>> > values Xk, Yk, (Xk,Yk) set for a record. The total length of the vector
>>> > (though parse) would be nearly 614000.  The number of records is about
>>> > 1.33
>>> > million. The records have been coalesced into 20 partitions across two
>>> > nodes. The input data has not been cached.
>>> > (NOTE: I do realize the records & features may seem large for a two
>>> > node
>>> > setup, but given the memory & cpu, and the fact that I'm willing to
>>> > give up
>>> > some turnaround time, I don't see why tasks should inexplicably fail)
>>> >
>>> > Additional parameters include:
>>> >
>>> > spark.executor.memory = 14G
>>> > spark.default.parallelism = 1
>>> > spark.cores.max=20
>>> > spark.storage.memoryFraction=0.8 //No cache space required
>>> > (Trying to set spark.akka.frameSize to a larger number, say, 20 didn't
>>> > help
>>> > either)
>>> >
>>> > The model training was initialized as : new
>>> > LogisticRegressionWithSGD(1,
>>> > maxIterations, 0.0, 0.05)
>>> >
>>> > However, after 4 iterations of gradient descent, the entire execution
>>> > appeared to stall inexplicably. The corresponding executor details and
>>> > details of the stalled stage (number 14) are as follows:
>>> >
>>> > MetricMin25th Median75th
>>> > Max
>>> > Result serialization time12 ms13 ms14 ms16 ms18 ms
>>> > Duration4 s4 s5 s5 s
>>> > 5 s
>>> > Time spent fetching task 0 ms0 ms0 ms0 ms0 ms
>>> > results
>>> > Scheduler delay6 s6 s6 s6 s
>>> > 12 s
>>> >
>>> >
>>> > Stage Id
>>> > 14 aggregate at GradientDescent.scala:178
>>> >
>>> > Task IndexTask IDStatusLocality Level Executor
>>> > Launch TimeDurationGC  

Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Xiangrui Meng
Makoto, please use --driver-memory 8G when you launch spark-shell. -Xiangrui

On Tue, Jun 17, 2014 at 4:49 PM, Xiangrui Meng  wrote:
> DB, Yes, reduce and aggregate are linear.
>
> Makoto, dense vectors are used to in aggregation. If you have 32
> partitions and each one sending a dense vector of size 1,354,731 to
> master. Then the driver needs 300M+. That may be the problem. Which
> deploy mode are you using, standalone or local?
>
> Debasish, there is an old PR for butterfly allreduce. However, it
> doesn't seem to be the right way to go for Spark. I just sent out the
> PR: https://github.com/apache/spark/pull/1110 . This is a WIP and it
> needs more testing before we are confident to merge it. It would be
> great if you can help test it.
>
> Best,
> Xiangrui
>
> On Tue, Jun 17, 2014 at 2:33 PM, Debasish Das  
> wrote:
>> Xiangrui,
>>
>> Could you point to the JIRA related to tree aggregate ? ...sounds like the
>> allreduce idea...
>>
>> I would definitely like to try it on our dataset...
>>
>> Makoto,
>>
>> I did run pretty big sparse dataset (20M rows, 3M sparse features) and I got
>> 100 iterations of SGD running in 200 seconds...10 executors each with 16 GB
>> memory...
>>
>> Although the best result on the same dataset came out of liblinear and
>> BFGS-L1 out of box...so I did not tune the SGD further on learning rate and
>> other heuristics...it was arnd 5% off...
>>
>> Thanks.
>> Deb
>>
>>
>>
>> On Tue, Jun 17, 2014 at 2:09 PM, DB Tsai  wrote:
>>>
>>> Hi Xiangrui,
>>>
>>> Does it mean that mapPartition and then reduce shares the same
>>> behavior as aggregate operation which is O(n)?
>>>
>>> Sincerely,
>>>
>>> DB Tsai
>>> ---
>>> My Blog: https://www.dbtsai.com
>>> LinkedIn: https://www.linkedin.com/in/dbtsai
>>>
>>>
>>> On Tue, Jun 17, 2014 at 2:00 PM, Xiangrui Meng  wrote:
>>> > Hi DB,
>>> >
>>> > treeReduce (treeAggregate) is a feature I'm testing now. It is a
>>> > compromise between current reduce and butterfly allReduce. The former
>>> > runs in linear time on the number of partitions, the latter introduces
>>> > too many dependencies. treeAggregate with depth = 2 should run in
>>> > O(sqrt(n)) time, where n is the number of partitions. It would be
>>> > great if someone can help test its scalability.
>>> >
>>> > Best,
>>> > Xiangrui
>>> >
>>> > On Tue, Jun 17, 2014 at 1:32 PM, Makoto Yui  wrote:
>>> >> Hi Xiangrui,
>>> >>
>>> >>
>>> >> (2014/06/18 4:58), Xiangrui Meng wrote:
>>> >>>
>>> >>> How many partitions did you set? If there are too many partitions,
>>> >>> please do a coalesce before calling ML algorithms.
>>> >>
>>> >>
>>> >> The training data "news20.random.1000" is small and thus only 2
>>> >> partitions
>>> >> are used by the default.
>>> >>
>>> >> val training = MLUtils.loadLibSVMFile(sc,
>>> >> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>>> >> multiclass=false).
>>> >>
>>> >> We also tried 32 partitions as follows but the aggregate never
>>> >> finishes.
>>> >>
>>> >> val training = MLUtils.loadLibSVMFile(sc,
>>> >> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>>> >> multiclass=false, numFeatures = 1354731 , minPartitions = 32)
>>> >>
>>> >>
>>> >>> Btw, could you try the tree branch in my repo?
>>> >>> https://github.com/mengxr/spark/tree/tree
>>> >>>
>>> >>> I used tree aggregate in this branch. It should help with the
>>> >>> scalability.
>>> >>
>>> >>
>>> >> Is treeAggregate itself available on Spark 1.0?
>>> >>
>>> >> I wonder.. Could I test your modification just by running the following
>>> >> code
>>> >> on REPL?
>>> >>
>>> >> ---
>>> >> val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 +
>>> >> i)
>>> >> .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
>>> >>   seqOp = (c, v) => (c, v) match { case ((grad, loss), (label,
>>> >> features)) =>
>>> >> val l = gradient.compute(features, label, weights,
>>> >> Vectors.fromBreeze(grad))
>>> >> (grad, loss + l)
>>> >>   },
>>> >>   combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1),
>>> >> (grad2, loss2)) =>
>>> >> (grad1 += grad2, loss1 + loss2)
>>> >>   }, 2)
>>> >> -
>>> >>
>>> >> Rebuilding Spark is quite something to do evaluation.
>>> >>
>>> >> Thanks,
>>> >> Makoto
>>
>>


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Xiangrui Meng
DB, Yes, reduce and aggregate are linear.

Makoto, dense vectors are used to in aggregation. If you have 32
partitions and each one sending a dense vector of size 1,354,731 to
master. Then the driver needs 300M+. That may be the problem. Which
deploy mode are you using, standalone or local?

Debasish, there is an old PR for butterfly allreduce. However, it
doesn't seem to be the right way to go for Spark. I just sent out the
PR: https://github.com/apache/spark/pull/1110 . This is a WIP and it
needs more testing before we are confident to merge it. It would be
great if you can help test it.

Best,
Xiangrui

On Tue, Jun 17, 2014 at 2:33 PM, Debasish Das  wrote:
> Xiangrui,
>
> Could you point to the JIRA related to tree aggregate ? ...sounds like the
> allreduce idea...
>
> I would definitely like to try it on our dataset...
>
> Makoto,
>
> I did run pretty big sparse dataset (20M rows, 3M sparse features) and I got
> 100 iterations of SGD running in 200 seconds...10 executors each with 16 GB
> memory...
>
> Although the best result on the same dataset came out of liblinear and
> BFGS-L1 out of box...so I did not tune the SGD further on learning rate and
> other heuristics...it was arnd 5% off...
>
> Thanks.
> Deb
>
>
>
> On Tue, Jun 17, 2014 at 2:09 PM, DB Tsai  wrote:
>>
>> Hi Xiangrui,
>>
>> Does it mean that mapPartition and then reduce shares the same
>> behavior as aggregate operation which is O(n)?
>>
>> Sincerely,
>>
>> DB Tsai
>> ---
>> My Blog: https://www.dbtsai.com
>> LinkedIn: https://www.linkedin.com/in/dbtsai
>>
>>
>> On Tue, Jun 17, 2014 at 2:00 PM, Xiangrui Meng  wrote:
>> > Hi DB,
>> >
>> > treeReduce (treeAggregate) is a feature I'm testing now. It is a
>> > compromise between current reduce and butterfly allReduce. The former
>> > runs in linear time on the number of partitions, the latter introduces
>> > too many dependencies. treeAggregate with depth = 2 should run in
>> > O(sqrt(n)) time, where n is the number of partitions. It would be
>> > great if someone can help test its scalability.
>> >
>> > Best,
>> > Xiangrui
>> >
>> > On Tue, Jun 17, 2014 at 1:32 PM, Makoto Yui  wrote:
>> >> Hi Xiangrui,
>> >>
>> >>
>> >> (2014/06/18 4:58), Xiangrui Meng wrote:
>> >>>
>> >>> How many partitions did you set? If there are too many partitions,
>> >>> please do a coalesce before calling ML algorithms.
>> >>
>> >>
>> >> The training data "news20.random.1000" is small and thus only 2
>> >> partitions
>> >> are used by the default.
>> >>
>> >> val training = MLUtils.loadLibSVMFile(sc,
>> >> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>> >> multiclass=false).
>> >>
>> >> We also tried 32 partitions as follows but the aggregate never
>> >> finishes.
>> >>
>> >> val training = MLUtils.loadLibSVMFile(sc,
>> >> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>> >> multiclass=false, numFeatures = 1354731 , minPartitions = 32)
>> >>
>> >>
>> >>> Btw, could you try the tree branch in my repo?
>> >>> https://github.com/mengxr/spark/tree/tree
>> >>>
>> >>> I used tree aggregate in this branch. It should help with the
>> >>> scalability.
>> >>
>> >>
>> >> Is treeAggregate itself available on Spark 1.0?
>> >>
>> >> I wonder.. Could I test your modification just by running the following
>> >> code
>> >> on REPL?
>> >>
>> >> ---
>> >> val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 +
>> >> i)
>> >> .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
>> >>   seqOp = (c, v) => (c, v) match { case ((grad, loss), (label,
>> >> features)) =>
>> >> val l = gradient.compute(features, label, weights,
>> >> Vectors.fromBreeze(grad))
>> >> (grad, loss + l)
>> >>   },
>> >>   combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1),
>> >> (grad2, loss2)) =>
>> >> (grad1 += grad2, loss1 + loss2)
>> >>   }, 2)
>> >> -
>> >>
>> >> Rebuilding Spark is quite something to do evaluation.
>> >>
>> >> Thanks,
>> >> Makoto
>
>


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Xiangrui Meng
Hi Makoto,

Are you using Spark 1.0 or 0.9? Could you go to the executor tab of
the web UI and check the driver's memory?

treeAggregate is not part of 1.0.

Best,
Xiangrui

On Tue, Jun 17, 2014 at 2:00 PM, Xiangrui Meng  wrote:
> Hi DB,
>
> treeReduce (treeAggregate) is a feature I'm testing now. It is a
> compromise between current reduce and butterfly allReduce. The former
> runs in linear time on the number of partitions, the latter introduces
> too many dependencies. treeAggregate with depth = 2 should run in
> O(sqrt(n)) time, where n is the number of partitions. It would be
> great if someone can help test its scalability.
>
> Best,
> Xiangrui
>
> On Tue, Jun 17, 2014 at 1:32 PM, Makoto Yui  wrote:
>> Hi Xiangrui,
>>
>>
>> (2014/06/18 4:58), Xiangrui Meng wrote:
>>>
>>> How many partitions did you set? If there are too many partitions,
>>> please do a coalesce before calling ML algorithms.
>>
>>
>> The training data "news20.random.1000" is small and thus only 2 partitions
>> are used by the default.
>>
>> val training = MLUtils.loadLibSVMFile(sc,
>> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>> multiclass=false).
>>
>> We also tried 32 partitions as follows but the aggregate never finishes.
>>
>> val training = MLUtils.loadLibSVMFile(sc,
>> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>> multiclass=false, numFeatures = 1354731 , minPartitions = 32)
>>
>>
>>> Btw, could you try the tree branch in my repo?
>>> https://github.com/mengxr/spark/tree/tree
>>>
>>> I used tree aggregate in this branch. It should help with the scalability.
>>
>>
>> Is treeAggregate itself available on Spark 1.0?
>>
>> I wonder.. Could I test your modification just by running the following code
>> on REPL?
>>
>> ---
>> val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
>> .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
>>   seqOp = (c, v) => (c, v) match { case ((grad, loss), (label,
>> features)) =>
>> val l = gradient.compute(features, label, weights,
>> Vectors.fromBreeze(grad))
>> (grad, loss + l)
>>   },
>>   combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1),
>> (grad2, loss2)) =>
>> (grad1 += grad2, loss1 + loss2)
>>   }, 2)
>> -
>>
>> Rebuilding Spark is quite something to do evaluation.
>>
>> Thanks,
>> Makoto


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Xiangrui Meng
Hi DB,

treeReduce (treeAggregate) is a feature I'm testing now. It is a
compromise between current reduce and butterfly allReduce. The former
runs in linear time on the number of partitions, the latter introduces
too many dependencies. treeAggregate with depth = 2 should run in
O(sqrt(n)) time, where n is the number of partitions. It would be
great if someone can help test its scalability.

Best,
Xiangrui

On Tue, Jun 17, 2014 at 1:32 PM, Makoto Yui  wrote:
> Hi Xiangrui,
>
>
> (2014/06/18 4:58), Xiangrui Meng wrote:
>>
>> How many partitions did you set? If there are too many partitions,
>> please do a coalesce before calling ML algorithms.
>
>
> The training data "news20.random.1000" is small and thus only 2 partitions
> are used by the default.
>
> val training = MLUtils.loadLibSVMFile(sc,
> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
> multiclass=false).
>
> We also tried 32 partitions as follows but the aggregate never finishes.
>
> val training = MLUtils.loadLibSVMFile(sc,
> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
> multiclass=false, numFeatures = 1354731 , minPartitions = 32)
>
>
>> Btw, could you try the tree branch in my repo?
>> https://github.com/mengxr/spark/tree/tree
>>
>> I used tree aggregate in this branch. It should help with the scalability.
>
>
> Is treeAggregate itself available on Spark 1.0?
>
> I wonder.. Could I test your modification just by running the following code
> on REPL?
>
> ---
> val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
> .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
>   seqOp = (c, v) => (c, v) match { case ((grad, loss), (label,
> features)) =>
> val l = gradient.compute(features, label, weights,
> Vectors.fromBreeze(grad))
> (grad, loss + l)
>   },
>   combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1),
> (grad2, loss2)) =>
> (grad1 += grad2, loss1 + loss2)
>   }, 2)
> -
>
> Rebuilding Spark is quite something to do evaluation.
>
> Thanks,
> Makoto


Re: Execution stalls in LogisticRegressionWithSGD

2014-06-17 Thread Xiangrui Meng
Hi Bharath,

Thanks for posting the details! Which Spark version are you using?

Best,
Xiangrui

On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar  wrote:
> Hi,
>
> (Apologies for the long mail, but it's necessary to provide sufficient
> details considering the number of issues faced.)
>
> I'm running into issues testing LogisticRegressionWithSGD a two node cluster
> (each node with 24 cores and 16G available to slaves out of 24G on the
> system). Here's a description of the application:
>
> The model is being trained based on categorical features x, y, and (x,y).
> The categorical features are mapped to binary features by converting each
> distinct value in the category enum into a binary feature by itself (i.e
> presence of that value in a record implies corresponding feature = 1, else
> feature = 0. So, there'd be as many distinct features as enum values) . The
> training vector is laid out as
> [x1,x2...xn,y1,y2yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the
> training data has only one combination (Xk,Yk) and a label appearing in the
> record. Thus, the corresponding labeledpoint sparse vector would only have 3
> values Xk, Yk, (Xk,Yk) set for a record. The total length of the vector
> (though parse) would be nearly 614000.  The number of records is about 1.33
> million. The records have been coalesced into 20 partitions across two
> nodes. The input data has not been cached.
> (NOTE: I do realize the records & features may seem large for a two node
> setup, but given the memory & cpu, and the fact that I'm willing to give up
> some turnaround time, I don't see why tasks should inexplicably fail)
>
> Additional parameters include:
>
> spark.executor.memory = 14G
> spark.default.parallelism = 1
> spark.cores.max=20
> spark.storage.memoryFraction=0.8 //No cache space required
> (Trying to set spark.akka.frameSize to a larger number, say, 20 didn't help
> either)
>
> The model training was initialized as : new LogisticRegressionWithSGD(1,
> maxIterations, 0.0, 0.05)
>
> However, after 4 iterations of gradient descent, the entire execution
> appeared to stall inexplicably. The corresponding executor details and
> details of the stalled stage (number 14) are as follows:
>
> MetricMin25th Median75th Max
> Result serialization time12 ms13 ms14 ms16 ms18 ms
> Duration4 s4 s5 s5 s5 s
> Time spent fetching task 0 ms0 ms0 ms0 ms0 ms
> results
> Scheduler delay6 s6 s6 s6 s
> 12 s
>
>
> Stage Id
> 14 aggregate at GradientDescent.scala:178
>
> Task IndexTask IDStatusLocality Level Executor
> Launch TimeDurationGC Result Ser TimeErrors
>
> Time
>
> 0 600 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
> 2014/06/17 10:32:27 1.1 h
> 1 601 RUNNING PROCESS_LOCAL casual.dataone.foo.bar.com
> 2014/06/17 10:32:27 1.1 h
> 2 602 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
> 2014/06/17 10:32:27 1.1 h
> 3 603 RUNNING PROCESS_LOCAL casual.dataone.foo.bar.com
> 2014/06/17 10:32:27 1.1 h
> 4 604 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
> 2014/06/17 10:32:27 1.1 h
> 5 605 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
> 2014/06/17 10:32:27 4 s 2 s 12 ms
> 6 606 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
> 2014/06/17 10:32:27 4 s 1 s 14 ms
> 7 607 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
> 2014/06/17 10:32:27 4 s 2 s 12 ms
> 8 608 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
> 2014/06/17 10:32:27 5 s 1 s 15 ms
> 9 609 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
> 2014/06/17 10:32:27 5 s 1 s 14 ms
> 10 610 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
> 2014/06/17 10:32:27 5 s 1 s 15 ms
> 11 611 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
> 2014/06/17 10:32:27 4 s 1 s 13 ms
> 12 612 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
> 2014/06/17 10:32:27 5 s 1 s 18 ms
> 13 613 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
> 2014/06/17 10:32:27 5 s 1 s 13 ms
> 14 614 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
> 2014/06/17 10:32:27 4 s 1 s 14 ms
> 15 615 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
> 2014/06/17 10:32:27 4 s 1 s 12 ms
> 16 616 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
> 2014/06/17 10:32:27 5 s 1 s 15 ms
> 17 617 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
> 2014/06/17 10:32:27 5 s 1 s 18 ms
> 18 618 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
> 2014/06/17 10:32

Re: Contribution to Spark MLLib

2014-06-17 Thread Xiangrui Meng
Hi Jayati,

Thanks for asking! MLlib algorithms are all implemented in Scala. It
makes us easier to maintain if we have the implementations in one
place. For the roadmap, please visit
http://www.slideshare.net/xrmeng/m-llib-hadoopsummit to see features
planned for v1.1. Before contributing new algorithms, it would be
great if you can start with working on an existing JIRA.

Best,
Xiangrui

On Tue, Jun 17, 2014 at 12:22 AM, Jayati  wrote:
> Hello,
>
> I wish to contribute some algorithms to the MLLib of Spark but at the same
> time wanted to make sure that I don't try something redundant.
>
> Will it be okay with you to let me know the set of algorithms which aren't
> there in your road map in the near future ?
>
> Also, can I use Java to write machine learning algorithms for Spark MLLib
> instead of Scala ?
>
> Regards,
> Jayati
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Contribution-to-Spark-MLLib-tp7716.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Xiangrui Meng
Hi Makoto,

How many partitions did you set? If there are too many partitions,
please do a coalesce before calling ML algorithms.

Btw, could you try the tree branch in my repo?
https://github.com/mengxr/spark/tree/tree

I used tree aggregate in this branch. It should help with the scalability.

Best,
Xiangrui

On Tue, Jun 17, 2014 at 12:22 PM, Makoto Yui  wrote:
> Here is follow-up to the previous evaluation.
>
> "aggregate at GradientDescent.scala:178" never finishes at
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala#L178
>
> We confirmed, by -verbose:gc, that GC is not happening during the aggregate
> and the cumulative CPU time for the task is increasing little by little.
>
> LBFGS also does not work for large # of features (news20.random.1000)
> though it works fine for small # of features (news20.binary.1000).
>
> "aggregate at LBFGS.scala:201" also never finishes at
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala#L201
>
> ---
> [Evaluated code for LBFGS]
>
> import org.apache.spark.SparkContext
> import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
> import org.apache.spark.mllib.linalg.Vectors
> import org.apache.spark.mllib.util.MLUtils
> import org.apache.spark.mllib.classification.LogisticRegressionModel
> import org.apache.spark.mllib.optimization._
>
> val data = MLUtils.loadLibSVMFile(sc,
> "hdfs://dm01:8020/dataset/news20-binary/news20.random.1000",
> multiclass=false)
> val numFeatures = data.take(1)(0).features.size
>
> val training = data.map(x => (x.label, 
> MLUtils.appendBias(x.features))).cache()
>
> // Run training algorithm to build the model
> val numCorrections = 10
> val convergenceTol = 1e-4
> val maxNumIterations = 20
> val regParam = 0.1
> val initialWeightsWithIntercept = Vectors.dense(new
> Array[Double](numFeatures + 1))
>
> val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
>   training,
>   new LogisticGradient(),
>   new SquaredL2Updater(),
>   numCorrections,
>   convergenceTol,
>   maxNumIterations,
>   regParam,
>   initialWeightsWithIntercept)
> ---
>
>
> Thanks,
> Makoto
>
> 2014-06-17 21:32 GMT+09:00 Makoto Yui :
>> Hello,
>>
>> I have been evaluating LogisticRegressionWithSGD of Spark 1.0 MLlib on
>> Hadoop 0.20.2-cdh3u6 but it does not work for a sparse dataset though
>> the number of training examples used in the evaluation is just 1,000.
>>
>> It works fine for the dataset *news20.binary.1000* that has 178,560
>> features. However, it does not work for *news20.random.1000* where # of
>> features is large  (1,354,731 features) though we used a sparse vector
>> through MLUtils.loadLibSVMFile().
>>
>> The execution seems not progressing while no error is reported in the
>> spark-shell as well as in the stdout/stderr of executors.
>>
>> We used 32 executors with each allocating 7GB (2GB is for RDD) for
>> working memory.
>>
>> Any suggesions? Your help is really appreciated.
>>
>> ==
>> Executed code
>> ==
>> import org.apache.spark.mllib.util.MLUtils
>> import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
>>
>> //val training = MLUtils.loadLibSVMFile(sc,
>> "hdfs://host:8020/dataset/news20-binary/news20.binary.1000",
>> multiclass=false)
>> val training = MLUtils.loadLibSVMFile(sc,
>> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>> multiclass=false)
>>
>> val numFeatures = training .take(1)(0).features.size
>> //numFeatures: Int = 178560 for news20.binary.1000
>> //numFeatures: Int = 1354731 for news20.random.1000
>> val model = LogisticRegressionWithSGD.train(training, numIterations=1)
>>
>> ==
>> The dataset used in the evaluation
>> ==
>>
>> http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#news20.binary
>>
>> $ head -1000 news20.binary | sed 's/+1/1/g' | sed 's/-1/0/g' >
>> news20.binary.1000
>> $ sort -R news20.binary > news20.random
>> $ head -1000 news20.random | sed 's/+1/1/g' | sed 's/-1/0/g' >
>> news20.random.1000
>>
>> You can find the dataset in
>> https://dl.dropboxusercontent.com/u/13123103/news20.random.1000
>> https://dl.dropboxusercontent.com/u/13123103/news20.binary.1000
>>
>>
>> Thanks,
>> Makoto


Re: MLlib-Missing Regularization Parameter and Intercept for Logistic Regression

2014-06-16 Thread Xiangrui Meng
Someone is working on weighted regularization. Stay tuned. -Xiangrui

On Mon, Jun 16, 2014 at 9:36 AM, FIXED-TERM Yi Congrui (CR/RTC1.3-NA)
 wrote:
> Hi Xiangrui,
>
> Thank you for the reply! I have tried customizing 
> LogisticRegressionSGD.optimizer as in the example you mentioned, but the 
> source code reveals that the intercept is also penalized if one is included, 
> which is usually inappropriate. The developer should fix this problem.
>
> Best,
>
> Congrui
>
> -Original Message-
> From: Xiangrui Meng [mailto:men...@gmail.com]
> Sent: Friday, June 13, 2014 11:50 PM
> To: user@spark.apache.org
> Cc: user
> Subject: Re: MLlib-Missing Regularization Parameter and Intercept for 
> Logistic Regression
>
> 1. 
> "examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala"
> contains example code that shows how to set regParam.
>
> 2. A static method with more than 3 parameters becomes hard to
> remember and hard to maintain. Please use LogistricRegressionWithSGD's
> default constructor and setters.
>
> -Xiangrui


Re: MLlib-Missing Regularization Parameter and Intercept for Logistic Regression

2014-06-13 Thread Xiangrui Meng
1. 
"examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala"
contains example code that shows how to set regParam.

2. A static method with more than 3 parameters becomes hard to
remember and hard to maintain. Please use LogistricRegressionWithSGD's
default constructor and setters.

-Xiangrui


Re: Convert text into tfidf vectors for Classification

2014-06-13 Thread Xiangrui Meng
You can create tf vectors and then use
RowMatrix.computeColumnSummaryStatistics to get df (numNonzeros). For
tokenizer and stemmer, you can use scalanlp/chalk. Yes, it is worth
having a simple interface for it. -Xiangrui

On Fri, Jun 13, 2014 at 1:21 AM, Stuti Awasthi  wrote:
> Hi all,
>
>
>
> I wanted to perform Text Classification using Spark1.0 Naïve Bayes. I was
> looking for the way to convert text into sparse vector with TFIDF weighting
> scheme.
>
> I found that MLI library supports that but it is compatible with Spark 0.8.
>
>
>
> What are all the options available to achieve text vectorization. Is there
> any pre-built api’s which can be used or other way in which we can achieve
> this
>
> Please suggest
>
>
>
> Thanks
>
> Stuti Awasthi
>
>
>
> ::DISCLAIMER::
> 
>
> The contents of this e-mail and any attachment(s) are confidential and
> intended for the named recipient(s) only.
> E-mail transmission is not guaranteed to be secure or error-free as
> information could be intercepted, corrupted,
> lost, destroyed, arrive late or incomplete, or may contain viruses in
> transmission. The e mail and its contents
> (with or without referred errors) shall therefore not attach any liability
> on the originator or HCL or its affiliates.
> Views or opinions, if any, presented in this email are solely those of the
> author and may not necessarily reflect the
> views or opinions of HCL or its affiliates. Any form of reproduction,
> dissemination, copying, disclosure, modification,
> distribution and / or publication of this message without the prior written
> consent of authorized representative of
> HCL is strictly prohibited. If you have received this email in error please
> delete it and notify the sender immediately.
> Before opening any email and/or attachments, please check them for viruses
> and other defects.
>
> 


Re: Not fully cached when there is enough memory

2014-06-11 Thread Xiangrui Meng
Could you try to click one that RDD and see the storage info per
partition? I tried continuously caching RDDs, so new ones kick old
ones out when there is not enough memory. I saw similar glitches but
the storage info per partition is correct. If you find a way to
reproduce this error, please create a JIRA. Thanks! -Xiangrui


Re: How to process multiple classification with SVM in MLlib

2014-06-09 Thread Xiangrui Meng
For broadcast data, please read
http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
.
For one-vs-all, please read
https://en.wikipedia.org/wiki/Multiclass_classification .

-Xiangrui

On Mon, Jun 9, 2014 at 7:24 AM, littlebird  wrote:
> Thank you for your reply, I don't quite understand how to do one-vs-all
> manually for multiclass
> training. And for the second question, My algorithm is implemented in Java
> and designed for single machine, How can I broadcast the dataset to each
> worker, train models on workers? Thank you very much.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-process-multiple-classification-with-SVM-in-MLlib-tp7174p7251.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Classpath errors with Breeze

2014-06-08 Thread Xiangrui Meng
Hi Tobias,

Which file system and which encryption are you using?

Best,
Xiangrui

On Sun, Jun 8, 2014 at 10:16 PM, Xiangrui Meng  wrote:
> Hi dlaw,
>
> You are using breeze-0.8.1, but the spark assembly jar depends on
> breeze-0.7. If the spark assembly jar comes the first on the classpath
> but the method from DenseMatrix is only available in breeze-0.8.1, you
> get NoSuchMethod. So,
>
> a) If you don't need the features in breeze-0.8.1, do not include it
> as a dependency.
>
> or
>
> b) Try an experimental features by turning on
> spark.files.userClassPathFirst in your Spark configuration.
>
> Best,
> Xiangrui
>
> On Sun, Jun 8, 2014 at 10:08 PM, dlaw  wrote:
>> Thanks for the quick response. No, I actually build my jar via 'sbt package'
>> on EC2 on the master itself.
>>
>>
>>
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Classpath-errors-with-Breeze-tp7220p7225.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Classpath errors with Breeze

2014-06-08 Thread Xiangrui Meng
Hi dlaw,

You are using breeze-0.8.1, but the spark assembly jar depends on
breeze-0.7. If the spark assembly jar comes the first on the classpath
but the method from DenseMatrix is only available in breeze-0.8.1, you
get NoSuchMethod. So,

a) If you don't need the features in breeze-0.8.1, do not include it
as a dependency.

or

b) Try an experimental features by turning on
spark.files.userClassPathFirst in your Spark configuration.

Best,
Xiangrui

On Sun, Jun 8, 2014 at 10:08 PM, dlaw  wrote:
> Thanks for the quick response. No, I actually build my jar via 'sbt package'
> on EC2 on the master itself.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Classpath-errors-with-Breeze-tp7220p7225.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to process multiple classification with SVM in MLlib

2014-06-07 Thread Xiangrui Meng
At this time, you need to do one-vs-all manually for multiclass
training. For your second question, if the algorithm is implemented in
Java/Scala/Python and designed for single machine, you can broadcast
the dataset to each worker, train models on workers. If the algorithm
is implemented in a different language, maybe you need pipe to train
the models outside JVM (similar to Hadoop Streaming). If the algorithm
is designed for a different parallel platform, then it may be hard to
use it in Spark. -Xiangrui

On Sat, Jun 7, 2014 at 7:15 AM, littlebird  wrote:
> Hi All,
>   As we know, In MLlib the SVM is used for binary classification. I wonder
> how to train SVM model for mutiple classification in MLlib. In addition, how
> to apply the machine learning algorithm in Spark if the algorithm isn't
> included in MLlib. Thank you.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-process-multiple-classification-with-SVM-in-MLlib-tp7174.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Identify #iterations KMeans executing

2014-06-06 Thread Xiangrui Meng
Yes. If k-means reached the max number of iterations, you should see
the following in the log:

KMeans reached the max number of iterations:

Best,
Xiangrui

On Fri, Jun 6, 2014 at 2:08 AM, Stuti Awasthi  wrote:
> Hi all,
>
>
>
> I have a very basic question. I  tried running KMeans with 10 iterations but
> with only 1 run. Sometimes I receive “0 run completed in 5 iteration” which
> indicates that my run is completed in 5 iterations.
>
> But some time I did not get any such line in logs.
>
>
>
> Does it mean that my run is taking whole 10 iterations to complete the
> clustering. I checked the code and found that the log info is present in if
> statement hence may be if its completing all iterations, its not printing it
> in the console.
>
>
>
> Just wanted to confirm my understanding is correct or not
>
>
>
> Thanks
>
> Stuti Awasthi
>
>
>
> ::DISCLAIMER::
> 
>
> The contents of this e-mail and any attachment(s) are confidential and
> intended for the named recipient(s) only.
> E-mail transmission is not guaranteed to be secure or error-free as
> information could be intercepted, corrupted,
> lost, destroyed, arrive late or incomplete, or may contain viruses in
> transmission. The e mail and its contents
> (with or without referred errors) shall therefore not attach any liability
> on the originator or HCL or its affiliates.
> Views or opinions, if any, presented in this email are solely those of the
> author and may not necessarily reflect the
> views or opinions of HCL or its affiliates. Any form of reproduction,
> dissemination, copying, disclosure, modification,
> distribution and / or publication of this message without the prior written
> consent of authorized representative of
> HCL is strictly prohibited. If you have received this email in error please
> delete it and notify the sender immediately.
> Before opening any email and/or attachments, please check them for viruses
> and other defects.
>
> 


Re: Native library can not be loaded when using Mllib PCA

2014-06-05 Thread Xiangrui Meng
For standalone and yarn mode, you need to install native libraries on all 
nodes. The best solution is installing them to /usr/lib/libblas.so.3 and 
/usr/lib/liblapack.so.3 . If your matrix is sparse, the native libraries cannot 
help because they are for dense linear algebra. You can create RDD of sparse 
rows and try k-means directly, it supports sparse input. -Xiangrui

Sent from my iPad

> On Jun 5, 2014, at 2:36 AM, yangliuyu  wrote:
> 
> Hi,
> 
> We're using Mllib (1.0.0 release version) on a k-means clustering problem.
> We want to reduce the matrix column size before send the points to k-means
> solver.
> 
> It works on my mac with the local mode: spark-test-run-assembly-1.0.jar
> contains my application code, com.github.fommil, netlib code and
> netlib-native*.so files (include jnilib and dll files) 
> 
> spark-submit --class test.TestMllibPCA --master local[4] --executor-memory
> 3g --driver-memory 3g --driver-class-path
> /data/user/dump/spark-test-run-assembly-1.0.jar
> /data/user/dump/spark-test-run-assembly-1.0.jar
> /data/user/dump/user_fav_2014_04_09.csv.head1w 
> 
> But if  --driver-class-path removed, the warn message appears:
> 14/06/05 16:36:20 WARN LAPACK: Failed to load implementation from:
> com.github.fommil.netlib.NativeSystemLAPACK
> 14/06/05 16:36:20 WARN LAPACK: Failed to load implementation from:
> com.github.fommil.netlib.NativeRefLAPACK
> 
> or set SPARK_CLASSPATH=/data/user/dump/spark-test-run-assembly-1.0.jar can
> also solve the problem.
> 
> The matrix contain sparse data with rows: 6778, columns: 2487 and the time
> consume of calculating PCA is 10s and 47s respectively which infers the
> native library works well.
> 
> Then I want to test it on a spark standalone cluster(on CentOS), but it
> failed again.
> After change JDK logging level to FINEST, got the message:
> 14/06/05 16:19:15 INFO JniLoader: JNI LIB =
> netlib-native_system-linux-x86_64.so
> 14/06/05 16:19:15 INFO JniLoader: extracting
> jar:file:/data/user/dump/spark-test-run-assembly-1.0.jar!/netlib-native_system-linux-x86_64.so
> to /tmp/jniloader6648403281987654682netlib-native_system-linux-x86_64.so
> 14/06/05 16:19:15 WARN LAPACK: Failed to load implementation from:
> com.github.fommil.netlib.NativeSystemLAPACK
> 14/06/05 16:19:15 INFO JniLoader: JNI LIB =
> netlib-native_ref-linux-x86_64.so
> 14/06/05 16:19:15 INFO JniLoader: extracting
> jar:file:/data/user/dump/spark-test-run-assembly-1.0.jar!/netlib-native_ref-linux-x86_64.so
> to /tmp/jniloader2298588627398263902netlib-native_ref-linux-x86_64.so
> 14/06/05 16:19:16 WARN LAPACK: Failed to load implementation from:
> com.github.fommil.netlib.NativeRefLAPACK
> 14/06/05 16:19:16 INFO LAPACK: Implementation provided by class
> com.github.fommil.netlib.F2jLAPACK
> 
> The libgfortran ,atlas, blas, lapack and arpack are all installed and all of
> the .so files are located under /usr/lib64, spark.executor.extraLibraryPath
> is set to /usr/lib64 in conf/spark-defaults.conf but none of them works. I
> tried add --jars /data/user/dump/spark-test-run-assembly-1.0.jar but no good
> news.
> 
> What should I try next?
> 
> Is the native library need to be visible for driver and executor both? In
> local mode the problem seems to be a classpath problem, but for standalone
> and yarn mode it get more complex. A detail document is really helpful.
> 
> Thanks.
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Native-library-can-not-be-loaded-when-using-Mllib-PCA-tp7042.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Logistic Regression MLLib Slow

2014-06-04 Thread Xiangrui Meng
Hi Krishna,

Specifying executor memory in local mode has no effect, because all of
the threads run inside the same JVM. You can either try
--driver-memory 60g or start a standalone server.

Best,
Xiangrui

On Wed, Jun 4, 2014 at 7:28 PM, Xiangrui Meng  wrote:
> 80M by 4 should be about 2.5GB uncompressed. 10 iterations shouldn't
> take that long, even on a single executor. Besides what Matei
> suggested, could you also verify the executor memory in
> http://localhost:4040 in the Executors tab. It is very likely the
> executors do not have enough memory. In that case, caching may be
> slower than reading directly from disk. -Xiangrui
>
> On Wed, Jun 4, 2014 at 7:06 PM, Matei Zaharia  wrote:
>> Ah, is the file gzipped by any chance? We can’t decompress gzipped files in
>> parallel so they get processed by a single task.
>>
>> It may also be worth looking at the application UI (http://localhost:4040)
>> to see 1) whether all the data fits in memory in the Storage tab (maybe it
>> somehow becomes larger, though it seems unlikely that it would exceed 20 GB)
>> and 2) how many parallel tasks run in each iteration.
>>
>> Matei
>>
>> On Jun 4, 2014, at 6:56 PM, Srikrishna S  wrote:
>>
>> I am using the MLLib one (LogisticRegressionWithSGD)  with PySpark. I am
>> running to only 10 iterations.
>>
>> The MLLib version of logistic regression doesn't seem to use all the cores
>> on my machine.
>>
>> Regards,
>> Krishna
>>
>>
>>
>> On Wed, Jun 4, 2014 at 6:47 PM, Matei Zaharia 
>> wrote:
>>>
>>> Are you using the logistic_regression.py in examples/src/main/python or
>>> examples/src/main/python/mllib? The first one is an example of writing
>>> logistic regression by hand and won’t be as efficient as the MLlib one. I
>>> suggest trying the MLlib one.
>>>
>>> You may also want to check how many iterations it runs — by default I
>>> think it runs 100, which may be more than you need.
>>>
>>> Matei
>>>
>>> On Jun 4, 2014, at 5:47 PM, Srikrishna S  wrote:
>>>
>>> > Hi All.,
>>> >
>>> > I am new to Spark and I am trying to run LogisticRegression (with SGD)
>>> > using MLLib on a beefy single machine with about 128GB RAM. The dataset 
>>> > has
>>> > about 80M rows with only 4 features so it barely occupies 2Gb on disk.
>>> >
>>> > I am running the code using all 8 cores with 20G memory using
>>> > spark-submit --executor-memory 20G --master local[8]
>>> > logistic_regression.py
>>> >
>>> > It seems to take about 3.5 hours without caching and over 5 hours with
>>> > caching.
>>> >
>>> > What is the recommended use for Spark on a beefy single machine?
>>> >
>>> > Any suggestions will help!
>>> >
>>> > Regards,
>>> > Krishna
>>> >
>>> >
>>> > Code sample:
>>> >
>>> > -
>>> > # Dataset
>>> > d = sys.argv[1]
>>> > data = sc.textFile(d)
>>> >
>>> > # Load and parse the data
>>> > #
>>> > --
>>> > def parsePoint(line):
>>> > values = [float(x) for x in line.split(',')]
>>> > return LabeledPoint(values[0], values[1:])
>>> > _parsedData = data.map(parsePoint)
>>> > parsedData = _parsedData.cache()
>>> > results = {}
>>> >
>>> > # Spark
>>> > #
>>> > --
>>> > start_time = time.time()
>>> > # Build the gl_model
>>> > niters = 10
>>> > spark_model = LogisticRegressionWithSGD.train(parsedData,
>>> > iterations=niters)
>>> >
>>> > # Evaluate the gl_model on training data
>>> > labelsAndPreds = parsedData.map(lambda p: (p.label,
>>> > spark_model.predict(p.features)))
>>> > trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() /
>>> > float(parsedData.count())
>>> >
>>>
>>
>>


Re: Logistic Regression MLLib Slow

2014-06-04 Thread Xiangrui Meng
80M by 4 should be about 2.5GB uncompressed. 10 iterations shouldn't
take that long, even on a single executor. Besides what Matei
suggested, could you also verify the executor memory in
http://localhost:4040 in the Executors tab. It is very likely the
executors do not have enough memory. In that case, caching may be
slower than reading directly from disk. -Xiangrui

On Wed, Jun 4, 2014 at 7:06 PM, Matei Zaharia  wrote:
> Ah, is the file gzipped by any chance? We can’t decompress gzipped files in
> parallel so they get processed by a single task.
>
> It may also be worth looking at the application UI (http://localhost:4040)
> to see 1) whether all the data fits in memory in the Storage tab (maybe it
> somehow becomes larger, though it seems unlikely that it would exceed 20 GB)
> and 2) how many parallel tasks run in each iteration.
>
> Matei
>
> On Jun 4, 2014, at 6:56 PM, Srikrishna S  wrote:
>
> I am using the MLLib one (LogisticRegressionWithSGD)  with PySpark. I am
> running only 10 iterations.
>
> The MLLib version of logistic regression doesn't seem to use all the cores
> on my machine.
>
> Regards,
> Krishna
>
>
>
> On Wed, Jun 4, 2014 at 6:47 PM, Matei Zaharia 
> wrote:
>>
>> Are you using the logistic_regression.py in examples/src/main/python or
>> examples/src/main/python/mllib? The first one is an example of writing
>> logistic regression by hand and won’t be as efficient as the MLlib one. I
>> suggest trying the MLlib one.
>>
>> You may also want to check how many iterations it runs — by default I
>> think it runs 100, which may be more than you need.
>>
>> Matei
>>
>> On Jun 4, 2014, at 5:47 PM, Srikrishna S  wrote:
>>
>> > Hi All.,
>> >
>> > I am new to Spark and I am trying to run LogisticRegression (with SGD)
>> > using MLLib on a beefy single machine with about 128GB RAM. The dataset has
>> > about 80M rows with only 4 features so it barely occupies 2Gb on disk.
>> >
>> > I am running the code using all 8 cores with 20G memory using
>> > spark-submit --executor-memory 20G --master local[8]
>> > logistic_regression.py
>> >
>> > It seems to take about 3.5 hours without caching and over 5 hours with
>> > caching.
>> >
>> > What is the recommended use for Spark on a beefy single machine?
>> >
>> > Any suggestions will help!
>> >
>> > Regards,
>> > Krishna
>> >
>> >
>> > Code sample:
>> >
>> > -
>> > # Dataset
>> > d = sys.argv[1]
>> > data = sc.textFile(d)
>> >
>> > # Load and parse the data
>> > #
>> > --
>> > def parsePoint(line):
>> > values = [float(x) for x in line.split(',')]
>> > return LabeledPoint(values[0], values[1:])
>> > _parsedData = data.map(parsePoint)
>> > parsedData = _parsedData.cache()
>> > results = {}
>> >
>> > # Spark
>> > #
>> > --
>> > start_time = time.time()
>> > # Build the gl_model
>> > niters = 10
>> > spark_model = LogisticRegressionWithSGD.train(parsedData,
>> > iterations=niters)
>> >
>> > # Evaluate the gl_model on training data
>> > labelsAndPreds = parsedData.map(lambda p: (p.label,
>> > spark_model.predict(p.features)))
>> > trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() /
>> > float(parsedData.count())
>> >
>>
>
>


Re: IllegalArgumentException on calling KMeans.train()

2014-06-04 Thread Xiangrui Meng
Could you check whether the vectors have the same size? -Xiangrui
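
A quick way to check, sketched in Scala and assuming docVectors is the RDD of
mllib vectors from the quoted code (there it is a JavaRDD, so call .rdd() on
it first):

val dims = docVectors.map(_.size).distinct().collect()
// KMeans requires every vector to have the same dimension; seeing more than
// one value here is what triggers the "requirement failed" error above.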

On Wed, Jun 4, 2014 at 1:43 AM, bluejoe2008  wrote:
> what does this exception mean?
>
> 14/06/04 16:35:15 ERROR executor.Executor: Exception in task ID 6
> java.lang.IllegalArgumentException: requirement failed
> at scala.Predef$.require(Predef.scala:221)
> at
> org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:271)
> at
> org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:398)
> at
> org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:372)
> at
> org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:366)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.mllib.clustering.KMeans$.findClosest(KMeans.scala:366)
> at org.apache.spark.mllib.clustering.KMeans$.pointCost(KMeans.scala:389)
> at
> org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:269)
> at
> org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:268)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.immutable.Range.foreach(Range.scala:141)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at
> org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:268)
> at
> org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:267)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
> at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
> at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at org.apache.spark.scheduler.Task.run(Task.scala:51)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:619)
>
> my spark version: 1.0.0
> Java: 1.7
> my codes:
>
> JavaRDD docVectors = generateDocVector(...);
> int numClusters = 20;
> int numIterations = 20;
> KMeansModel clusters = KMeans.train(docVectors.rdd(), numClusters,
> numIterations);
>
> another strange thing is that the mapPartitionsWithIndex() method call in
> generateDocVector() are invoked for 3 times...
>
> 2014-06-04
> 
> bluejoe2008


Re: How to stop a running SparkContext in the proper way?

2014-06-03 Thread Xiangrui Meng
Did you try sc.stop()?
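
A minimal sketch of the usual pattern, so the context is always stopped and
port 4040 freed even if the job throws (the app name is arbitrary):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("MyApp"))
try {
  // ... run the job ...
} finally {
  sc.stop()  // releases the executors and the web UI port for the next context
}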

On Tue, Jun 3, 2014 at 9:54 PM, MEETHU MATHEW  wrote:
> Hi,
>
> I want to know how I can stop a running SparkContext in a proper way so that
> next time when I start a new SparkContext, the web UI can be launched on the
> same port 4040.Now when i quit the job using ctrl+z the new sc are launched
> in new ports.
>
> I have the same problem with ipython notebook.It is launched on a different
> port when I start the notebook second time after closing the first one.I am
> starting ipython using the command
>
> IPYTHON_OPTS="notebook --ip  --pylab inline" ./bin/pyspark
>
> Thanks & Regards,
> Meethu M


Re: Using MLLib in Scala

2014-06-03 Thread Xiangrui Meng
Hi Suela,

(Please subscribe to our user mailing list and send your questions there
in the future.) For your case, each file contains a column of numbers.
So you can use `sc.textFile` to read them first, zip them together,
and then create labeled points:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
val xx = sc.textFile("/path/to/ex2x.dat").map(x => Vectors.dense(x.toDouble))
val yy = sc.textFile("/path/to/ex2y.dat").map(_.toDouble)
val examples = yy.zip(xx).map { case (y, x) => LabeledPoint(y, x) }
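
From there, a minimal sketch of fitting the linear-regression exercise
described in the quoted message (the iteration count is arbitrary):

import org.apache.spark.mllib.regression.LinearRegressionWithSGD

val model = LinearRegressionWithSGD.train(examples, 100)  // 100 iterations, picked arbitrarily
val firstPrediction = model.predict(xx.first())           // predicted height for the first age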

Best,
Xiangrui

On Thu, May 29, 2014 at 2:35 AM, Suela Haxhi  wrote:
>
> Hello Xiangrui ,
> my name is Suela Haxhi. Let me ask you for a little help. I am having some
> difficulty loading files in MLlib, namely:
> Binary Classification ;
> Linear Regression ;
> 
>
> E.g., the file "mllib/data/sample_svm_data.txt" contains the
> following data:
> 1 0 2.52078447201548 0 0 0 2.004684436494304 2.000347299268466 0
> 2.228387042742021 2.228387042742023 0 0 0 0 0 0
> 0 2.857738033247042 0 0 2.619965104088255 0 2.004684436494304
> 2.000347299268466 0 2.228387042742021 2.228387042742023 0 0 0 0 0 0
>
> etc  ..
>
> I don't understand what are the input / output.
> The problem comes when I want to load another type of dataset. E.g. , I want
> to make a Binary Classification on the presence of a disease.
>
> For example, the esteemed professor Andrew Ng explains in his machine
> learning course:
>
> Download ex2Data.zip, and extract the files from the zip file. The files
> contain some example measurements of heights for boys between the ages of
> two and eight. The y-values are the heights measured in meters, and the
> x-values are the ages of the boys corresponding to the heights. Each height
> and age tuple constitutes one training example $(x^{(i)}, y^{(i)})$ in our
> dataset. There are $m = 50$ training examples, and you will use them to
> develop a linear regression model.
> In this problem, you'll implement linear regression using gradient descent.
> In Matlab/Octave, you can load the training set using the commands
> x = load('ex2x.dat');
> y = load('ex2y.dat');
>
>
>
> But,  in Mllib,  I can't figure out what these data mean (mllib / data /
> sample_svm_data.txt).
> And I don't know how to load another type of data set using the following
> code:
>
> Binary Classification
> import org.apache.spark.SparkContext
> import org.apache.spark.mllib.classification.SVMWithSGD
> import org.apache.spark.mllib.regression.LabeledPoint
>
> // Load and parse the data file
>
> // Run training algorithm to build the model
>
> // Evaluate model on training examples and compute the training error
>
>
>
> Can you help me please? Thank you in advance.
>
> Best Regards
> Suela Haxhi


Re: Using String Dataset for Logistic Regression

2014-06-02 Thread Xiangrui Meng
Yes. MLlib 1.0 supports sparse input data for linear methods. -Xiangrui
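
For example, a sketch of building one sparse training point (the dimension,
indices, and values are made up):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// label 1.0, 10-dimensional feature vector with nonzeros at indices 1 and 3
val p = LabeledPoint(1.0, Vectors.sparse(10, Array(1, 3), Array(2.5, 0.7)))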

On Mon, Jun 2, 2014 at 11:36 PM, praveshjain1991
 wrote:
> I am not sure. I have just been using some numerical datasets.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Using-String-Dataset-for-Logistic-Regression-tp5523p6784.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Create/shutdown objects before/after RDD use (or: Non-serializable classes)

2014-05-31 Thread Xiangrui Meng
Hi Tobias,

One hack you can try is:

rdd.mapPartitions(iter => {
  val x = new X()
  // The argument of ++ is evaluated lazily, so x.shutdown() runs only after
  // the mapped iterator has been fully consumed.
  iter.map(row => x.doSomethingWith(row)) ++ { x.shutdown(); Iterator.empty }
})

Best,
Xiangrui

On Thu, May 29, 2014 at 11:38 PM, Tobias Pfeiffer  wrote:
> Hi,
>
> I want to use an object x in my RDD processing as follows:
>
> val x = new X()
> rdd.map(row => x.doSomethingWith(row))
> println(rdd.count())
> x.shutdown()
>
> Now the problem is that X is non-serializable, so while this works
> locally, it does not work in cluster setup. I thought I could do
>
> rdd.mapPartitions(iter => {
>   val x = new X()
>   val result = iter.map(row => x.doSomethingWith(row))
>   x.shutdown()
>   result
> })
>
> to create an instance of X locally, but obviously x.shutdown() is
> called before the first row is processed.
>
> How can I specify these node-local setup/teardown functions or how do
> I deal in general with non-serializable classes?
>
> Thanks
> Tobias


Re: pyspark MLlib examples don't work with Spark 1.0.0

2014-05-31 Thread Xiangrui Meng
The documentation you looked at is not official, though it is from
@pwendell's website. It was for the Spark SQL release. Please find the
official documentation here:

http://spark.apache.org/docs/latest/mllib-linear-methods.html#linear-support-vector-machine-svm

It contains a working example showing how to construct LabeledPoint
and use it for training.

Best,
Xiangrui

On Fri, May 30, 2014 at 5:10 AM, jamborta  wrote:
> thanks for the reply. I am definitely running 1.0.0, I set it up manually.
>
> To answer my question, I found out from the examples that it would need a
> new data type called LabeledPoint instead of numpy array.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-MLlib-examples-don-t-work-with-Spark-1-0-0-tp6546p6579.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: pyspark MLlib examples don't work with Spark 1.0.0

2014-05-29 Thread Xiangrui Meng
You are using ec2. Did you specify the Spark version when you ran the
spark-ec2 script, or did you update /root/spark after the cluster was created?
It is very likely that you are running 0.9 on ec2. -Xiangrui

On Thu, May 29, 2014 at 5:22 PM, jamborta  wrote:
> Hi all,
>
> I wanted to try spark 1.0.0, because of the new SQL component. I have cloned
> and built the latest from git. But the examples described here do not work
> anymore:
>
> http://people.apache.org/~pwendell/catalyst-docs/mllib-classification-regression.html#binary-classification-2
>
> I get the following error:
>
> /home/ec2-user/spark/python/pyspark/mllib/_common.pyc in
> _get_initial_weights(initial_weights, data)
> 313 def _get_initial_weights(initial_weights, data):
> 314 if initial_weights is None:
> --> 315 initial_weights = _convert_vector(data.first().features)
> 316 if type(initial_weights) == ndarray:
> 317 if initial_weights.ndim != 1:
>
> AttributeError: 'numpy.ndarray' object has no attribute 'features'
>
> not sure what type is intended as an input, any help would be appreciated.
>
> thanks,
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-MLlib-examples-don-t-work-with-Spark-1-0-0-tp6546.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Error while launching ec2 spark cluster with HVM (r3.large)

2014-05-22 Thread Xiangrui Meng
Was the error message the same as you posted when you used `root` as
the user id? Could you try this:

1) Do not specify user id. (Default would be `root`.)
2) If it fails in the middle, try `spark-ec2  --resume launch
` to continue launching the cluster.

Best,
Xiangrui

On Thu, May 22, 2014 at 12:44 PM, adparker  wrote:
> I had this problem too and fixed it by setting the wait time-out to a larger
> value: --wait
>
> For example, in "stand alone" mode with default values, a time out of 480
> seconds worked for me:
>
> $ cd spark-0.9.1/ec2
> $ ./spark-ec2 --key-pair= --identity-file= --instance-type=r3.large
> --wait=480  launch 
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Error-while-launching-ec2-spark-cluster-with-HVM-r3-large-tp5862p6276.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Inconsistent RDD Sample size

2014-05-21 Thread Xiangrui Meng
It doesn't guarantee the exact sample size. If you fix the random
seed, it would return the same result every time. -Xiangrui
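
A sketch of both options against the quoted code (verts is the vertex RDD
from the message below; the seed and sample size are arbitrary):

val fraction = 1.0 / verts.count().toDouble
val s1 = verts.sample(false, fraction, 42L).count()  // same seed => same, approximately sized, sample
val s2 = verts.takeSample(false, 100, 42L)           // exactly 100 elements, returned as a local Array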

On Wed, May 21, 2014 at 2:05 PM, glxc  wrote:
> I have a graph and am trying to take a random sample of vertices without
> replacement, using the RDD.sample() method
>
> verts are the vertices in the graph
>
>>  val verts = graph.vertices
>
> and executing this multiple times in a row
>
>>  verts.sample(false, 1.toDouble/v1.count.toDouble,
>> System.currentTimeMillis).count
>
> yields different results roughly each time (albeit +/- a small % of the
> target)
>
> why does this happen? Looked at PartionwiseSampledRDD but can't figure it
> out
>
> Also, is there another method/technique to yield the same result each time?
> My understanding is that grabbing random indices may not be the best use of
> the RDD model
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Inconsistent-RDD-Sample-size-tp6197.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Job Processing Large Data Set Got Stuck

2014-05-21 Thread Xiangrui Meng
If the RDD is cached, you can check its storage information in the
Storage tab of the Web UI.

On Wed, May 21, 2014 at 12:31 PM, yxzhao  wrote:
> Thanks Xiangrui, How to check and make sure the data is distributed
> evenly? Thanks again.
> On Wed, May 21, 2014 at 2:17 PM, Xiangrui Meng [via Apache Spark User
> List] <[hidden email]> wrote:
>
>> Many OutOfMemoryErrors in the log. Is your data distributed evenly?
>> -Xiangrui
>>
>> On Wed, May 21, 2014 at 11:23 AM, yxzhao <[hidden email]> wrote:
>>
>>> I run the pagerank example processing a large data set, 5GB in size,
>>> using
>>> 48
>>> machines. The job got stuck at the time point: 14/05/20 21:32:17, as the
>>> attached log shows. It was stuck there for more than 10 hours and then I
>>> killed it at last. But I did not find any information explaining why it
>>> was
>>> stuck. Any suggestions? Thanks.
>>>
>>>
>>> Spark_OK_48_pagerank.log
>>>
>>>
>>> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n6185/Spark_OK_48_pagerank.log>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Job-Processing-Large-Data-Set-Got-Stuck-tp6185.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>
> 
> View this message in context: Re: Job Processing Large Data Set Got Stuck
>
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Job Processing Large Data Set Got Stuck

2014-05-21 Thread Xiangrui Meng
Many OutOfMemoryErrors in the log. Is your data distributed evenly? -Xiangrui
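
One way to check the distribution, sketched in Scala (the RDD name is a
placeholder for whichever RDD the PageRank job caches):

val perPartition = links.mapPartitionsWithIndex { (i, iter) => Iterator((i, iter.size)) }.collect()
// Heavily skewed counts mean a few tasks hold most of the 5 GB and are the
// likely OutOfMemory victims.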

On Wed, May 21, 2014 at 11:23 AM, yxzhao  wrote:
> I run the pagerank example processing a large data set, 5GB in size, using 48
> machines. The job got stuck at the time point: 14/05/20 21:32:17, as the
> attached log shows. It was stuck there for more than 10 hours and then I
> killed it at last. But I did not find any information explaining why it was
> stuck. Any suggestions? Thanks.
>
>
> Spark_OK_48_pagerank.log
> 
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Job-Processing-Large-Data-Set-Got-Stuck-tp6185.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: reading large XML files

2014-05-20 Thread Xiangrui Meng
You can search for XMLInputFormat on Google. There are some
implementations that allow you to specify the tag to split on, e.g.:
https://github.com/lintool/Cloud9/blob/master/src/dist/edu/umd/cloud9/collection/XMLInputFormat.java

On Tue, May 20, 2014 at 10:31 AM, Nathan Kronenfeld
 wrote:
> Unfortunately, I don't have a bunch of moderately big xml files; I have one,
> really big file - big enough that reading it into memory as a single string
> is not feasible.
>
>
> On Tue, May 20, 2014 at 1:24 PM, Xiangrui Meng  wrote:
>>
>> Try sc.wholeTextFiles(). It reads the entire file into a string
>> record. -Xiangrui
>>
>> On Tue, May 20, 2014 at 8:25 AM, Nathan Kronenfeld
>>  wrote:
>> > We are trying to read some large GraphML files to use in spark.
>> >
>> > Is there an easy way to read XML-based files like this that accounts for
>> > partition boundaries and the like?
>> >
>> >  Thanks,
>> >  Nathan
>> >
>> >
>> > --
>> > Nathan Kronenfeld
>> > Senior Visualization Developer
>> > Oculus Info Inc
>> > 2 Berkeley Street, Suite 600,
>> > Toronto, Ontario M5A 4J5
>> > Phone:  +1-416-203-3003 x 238
>> > Email:  nkronenf...@oculusinfo.com
>
>
>
>
> --
> Nathan Kronenfeld
> Senior Visualization Developer
> Oculus Info Inc
> 2 Berkeley Street, Suite 600,
> Toronto, Ontario M5A 4J5
> Phone:  +1-416-203-3003 x 238
> Email:  nkronenf...@oculusinfo.com


Re: reading large XML files

2014-05-20 Thread Xiangrui Meng
Try sc.wholeTextFiles(). It reads the entire file into a string
record. -Xiangrui
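
A minimal sketch of that approach (the path is a placeholder and parseGraphML
stands for your own parser):

val files = sc.wholeTextFiles("hdfs:///path/to/graphml/")         // (path, fileContent) pairs
val graphs = files.map { case (path, xml) => parseGraphML(xml) }  // one whole document per record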

On Tue, May 20, 2014 at 8:25 AM, Nathan Kronenfeld
 wrote:
> We are trying to read some large GraphML files to use in spark.
>
> Is there an easy way to read XML-based files like this that accounts for
> partition boundaries and the like?
>
>  Thanks,
>  Nathan
>
>
> --
> Nathan Kronenfeld
> Senior Visualization Developer
> Oculus Info Inc
> 2 Berkeley Street, Suite 600,
> Toronto, Ontario M5A 4J5
> Phone:  +1-416-203-3003 x 238
> Email:  nkronenf...@oculusinfo.com


Re: filling missing values in a sequence

2014-05-19 Thread Xiangrui Meng
Actually there is a sliding method implemented in
mllib.rdd.RDDFunctions. Since this is not for general use cases, we
didn't include it in spark-core. You can take a look at the
implementation there and see whether it fits. -Xiangrui

On Mon, May 19, 2014 at 10:06 PM, Mohit Jaggi  wrote:
> Thanks Sean. Yes, your solution works :-) I did oversimplify my real
> problem, which has other parameters that go along with the sequence.
>
>
> On Fri, May 16, 2014 at 3:03 AM, Sean Owen  wrote:
>>
>> Not sure if this is feasible, but this literally does what I think you
>> are describing:
>>
>> sc.parallelize(rdd1.first to rdd1.last)
>>
>> On Tue, May 13, 2014 at 4:56 PM, Mohit Jaggi  wrote:
>> > Hi,
>> > I am trying to find a way to fill in missing values in an RDD. The RDD
>> > is a
>> > sorted sequence.
>> > For example, (1, 2, 3, 5, 8, 11, ...)
>> > I need to fill in the missing numbers and get (1,2,3,4,5,6,7,8,9,10,11)
>> >
>> > One way to do this is to "slide and zip"
>> > rdd1 =  sc.parallelize(List(1, 2, 3, 5, 8, 11, ...))
>> > x = rdd1.first
>> > rdd2 = rdd1 filter (_ != x)
>> > rdd3 = rdd2 zip rdd1
>> > rdd4 = rdd3 flatmap { (x, y) => generate missing elements between x and
>> > y }
>> >
>> > Another method which I think is more efficient is to use
>> > mapParititions() on
>> > rdd1 to be able to iterate on elements of rdd1 in each partition.
>> > However,
>> > that leaves the boundaries of the partitions to be "unfilled". Is there
>> > a
>> > way within the function passed to mapPartitions, to read the first
>> > element
>> > in the next partition?
>> >
>> > The latter approach also appears to work for a general "sliding window"
>> > calculation on the RDD. The former technique requires a lot of "sliding
>> > and
>> > zipping" and I believe it is not efficient. If only I could read the
>> > next
>> > partition...I have tried passing a pointer to rdd1 to the function
>> > passed to
>> > mapPartitions but the rdd1 pointer turns out to be NULL, I guess because
>> > Spark cannot deal with a mapper calling another mapper (since it happens
>> > on
>> > a worker not the driver)
>> >
>> > Mohit.
>
>


Re: How to run the SVM and LogisticRegression

2014-05-19 Thread Xiangrui Meng
Check out the master or branch-1.0. Then the examples should be there. -Xiangrui

On Mon, May 19, 2014 at 11:36 AM, yxzhao  wrote:
> Thanks Xiangrui,
>
> But I did not find the directory:
> examples/src/main/scala/org/apache/spark/examples/mllib.
> Could you give me more detail or show me one example? Thanks a lot.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-run-the-SVM-and-LogisticRegression-tp5720p6049.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: breeze DGEMM slow in spark

2014-05-18 Thread Xiangrui Meng
The classpath seems to be correct. Where did you link libopenblas*.so
to? The safest approach is to rename it to /usr/lib/libblas.so.3 and
/usr/lib/liblapack.so.3. This is how I made it work. -Xiangrui

On Sun, May 18, 2014 at 4:49 PM, wxhsdp  wrote:
> ok
>
> Spark Executor Command: "java" "-cp"
> ":/root/ephemeral-hdfs/conf:/root/.ivy2/cache/org.scala-lang/scala-library/jars/scala-library-2.10.4.jar:/root/.ivy2/cache/org.scalanlp/breeze_2.10/jars/breeze_2.10-0.7.jar:/root/.ivy2/cache/org.scalanlp/breeze-macros_2.10/jars/breeze-macros_2.10-0.3.jar:/root/.sbt/boot/scala-2.10.3/lib/scala-reflect.jar:/root/.ivy2/cache/com.thoughtworks.paranamer/paranamer/jars/paranamer-2.2.jar:/root/.ivy2/cache/com.github.fommil.netlib/core/jars/core-1.1.2.jar:/root/.ivy2/cache/net.sourceforge.f2j/arpack_combined_all/jars/arpack_combined_all-0.1.jar:/root/.ivy2/cache/net.sourceforge.f2j/arpack_combined_all/jars/arpack_combined_all-0.1-javadoc.jar:/root/.ivy2/cache/net.sf.opencsv/opencsv/jars/opencsv-2.3.jar:/root/.ivy2/cache/com.github.rwl/jtransforms/jars/jtransforms-2.4.0.jar:/root/.ivy2/cache/junit/junit/jars/junit-4.8.2.jar:/root/.ivy2/cache/org.apache.commons/commons-math3/jars/commons-math3-3.2.jar:/root/.ivy2/cache/org.spire-math/spire_2.10/jars/spire_2.10-0.7.1.jar:/root/.ivy2/cache/org.spire-math/spire-macros_2.10/jars/spire-macros_2.10-0.7.1.jar:/root/.ivy2/cache/com.typesafe/scalalogging-slf4j_2.10/jars/scalalogging-slf4j_2.10-1.0.1.jar:/root/.ivy2/cache/org.slf4j/slf4j-api/jars/slf4j-api-1.7.2.jar:/root/.ivy2/cache/org.scalanlp/breeze-natives_2.10/jars/breeze-natives_2.10-0.7.jar:/root/.ivy2/cache/com.github.fommil.netlib/netlib-native_ref-osx-x86_64/jars/netlib-native_ref-osx-x86_64-1.1-natives.jar:/root/.ivy2/cache/com.github.fommil.netlib/native_ref-java/jars/native_ref-java-1.1.jar:/root/.ivy2/cache/com.github.fommil/jniloader/jars/jniloader-1.1.jar:/root/.ivy2/cache/com.github.fommil.netlib/netlib-native_ref-linux-x86_64/jars/netlib-native_ref-linux-x86_64-1.1-natives.jar:/root/.ivy2/cache/com.github.fommil.netlib/netlib-native_ref-linux-i686/jars/netlib-native_ref-linux-i686-1.1-natives.jar:/root/.ivy2/cache/com.github.fommil.netlib/netlib-native_ref-win-x86_64/jars/netlib-native_ref-win-x86_64-1.1-natives.jar:/root/.ivy2/cache/com.github.fommil.netlib/netlib-native_ref-win-i686/jars/netlib-native_ref-win-i686-1.1-natives.jar:/root/.ivy2/cache/com.github.fommil.netlib/netlib-native_ref-linux-armhf/jars/netlib-native_ref-linux-armhf-1.1-natives.jar:/root/.ivy2/cache/com.github.fommil.netlib/netlib-native_system-osx-x86_64/jars/netlib-native_system-osx-x86_64-1.1-natives.jar:/root/.ivy2/cache/com.github.fommil.netlib/native_system-java/jars/native_system-java-1.1.jar:/root/.ivy2/cache/com.github.fommil.netlib/netlib-native_system-linux-x86_64/jars/netlib-native_system-linux-x86_64-1.1-natives.jar:/root/.ivy2/cache/com.github.fommil.netlib/netlib-native_system-linux-i686/jars/netlib-native_system-linux-i686-1.1-natives.jar:/root/.ivy2/cache/com.github.fommil.netlib/netlib-native_system-linux-armhf/jars/netlib-native_system-linux-armhf-1.1-natives.jar:/root/.ivy2/cache/com.github.fommil.netlib/netlib-native_system-win-x86_64/jars/netlib-native_system-win-x86_64-1.1-natives.jar:/root/.ivy2/cache/com.github.fommil.netlib/netlib-native_system-win-i686/jars/netlib-native_system-win-i686-1.1-natives.jar
> ::/root/spark/conf:/root/spark/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar"
> "-Xms4096M" "-Xmx4096M"
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/breeze-DGEMM-slow-in-spark-tp5950p5994.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: breeze DGEMM slow in spark

2014-05-18 Thread Xiangrui Meng
Can you attach the slave classpath? -Xiangrui

On Sun, May 18, 2014 at 2:02 AM, wxhsdp  wrote:
> Hi, xiangrui
>
>   you said "It doesn't work if you put the netlib-native jar inside an
> assembly
>   jar. Try to mark it "provided" in the dependencies, and use --jars to
>   include them with spark-submit. -Xiangrui"
>
>   i'am not use an assembly jar which contains every thing, i also mark
> breeze dependencies
>   provided, and manually download the jars and add them to slave classpath.
> but doesn't work:(
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/breeze-DGEMM-slow-in-spark-tp5950p5979.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: breeze DGEMM slow in spark

2014-05-17 Thread Xiangrui Meng
You need to include breeze-natives or netlib:all to load the native
libraries. Check the log messages to ensure native libraries are used,
especially on the worker nodes. The easiest way to use OpenBLAS is
to copy the shared library to /usr/lib/libblas.so.3 and
/usr/lib/liblapack.so.3. -Xiangrui
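
In sbt that amounts to something like the following (artifact names and the
0.7 version are the ones that appear elsewhere in this thread; adjust to your
build):

// build.sbt
libraryDependencies ++= Seq(
  "org.scalanlp" % "breeze_2.10"         % "0.7",
  "org.scalanlp" % "breeze-natives_2.10" % "0.7"
)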

On Sat, May 17, 2014 at 8:02 PM, wxhsdp  wrote:
> i think maybe it's related to m1.large, because i also tested on my laptop,
> the two case cost nearly
> the same amount of time.
>
> my laptop:
> model name  : Intel(R) Core(TM) i5-3380M CPU @ 2.90GHz
> cpu MHz : 2893.549
>
> os:
> Linux ubuntu 3.11.0-12-generic #19-Ubuntu SMP Wed Oct 9 16:20:46 UTC 2013
> x86_64 x86_64 x86_64 GNU/Linux
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/breeze-DGEMM-slow-in-spark-tp5950p5971.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Reading from .bz2 files with Spark

2014-05-16 Thread Xiangrui Meng
Hi Andrew,

I submitted a patch and verified it solves the problem. You can
download the patch from
https://issues.apache.org/jira/browse/HADOOP-10614 .

Best,
Xiangrui

On Fri, May 16, 2014 at 6:48 PM, Xiangrui Meng  wrote:
> Hi Andrew,
>
> This is the JIRA I created:
> https://issues.apache.org/jira/browse/MAPREDUCE-5893 . Hopefully
> someone wants to work on it.
>
> Best,
> Xiangrui
>
> On Fri, May 16, 2014 at 6:47 PM, Xiangrui Meng  wrote:
>> Hi Andre,
>>
>> I could reproduce the bug with Hadoop 2.2.0. Some older version of
>> Hadoop do not support splittable compression, so you ended up with
>> sequential reads. It is easy to reproduce the bug with the following
>> setup:
>>
>> 1) Workers are configured with multiple cores.
>> 2) BZip2 files are big enough or minPartitions is large enough when
>> you load the file via sc.textFile(), so that one worker has more than
>> one tasks.
>>
>> Best,
>> Xiangrui
>>
>> On Fri, May 16, 2014 at 4:06 PM, Andrew Ash  wrote:
>>> Hi Xiangrui,
>>>
>>> // FYI I'm getting your emails late due to the Apache mailing list outage
>>>
>>> I'm using CDH4.4.0, which I think uses the MapReduce v2 API.  The .jars are
>>> named like this: hadoop-hdfs-2.0.0-cdh4.4.0.jar
>>>
>>> I'm also glad you were able to reproduce!  Please paste a link to the Hadoop
>>> bug you file so I can follow along.
>>>
>>> Thanks!
>>> Andrew
>>>
>>>
>>> On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng  wrote:
>>>>
>>>> Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes
>>>> the problem you described, but it does contain several fixes to bzip2
>>>> format. -Xiangrui
>>>>
>>>> On Wed, May 7, 2014 at 9:19 PM, Andrew Ash  wrote:
>>>> > Hi all,
>>>> >
>>>> > Is anyone reading and writing to .bz2 files stored in HDFS from Spark
>>>> > with
>>>> > success?
>>>> >
>>>> >
>>>> > I'm finding the following results on a recent commit (756c96 from 24hr
>>>> > ago)
>>>> > and CDH 4.4.0:
>>>> >
>>>> > Works: val r = sc.textFile("/user/aa/myfile.bz2").count
>>>> > Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s:String)
>>>> > =>
>>>> > s+"| " ).count
>>>> >
>>>> > Specifically, I'm getting an exception coming out of the bzip2 libraries
>>>> > (see below stacktraces), which is unusual because I'm able to read from
>>>> > that
>>>> > file without an issue using the same libraries via Pig.  It was
>>>> > originally
>>>> > created from Pig as well.
>>>> >
>>>> > Digging a little deeper I found this line in the .bz2 decompressor's
>>>> > javadoc
>>>> > for CBZip2InputStream:
>>>> >
>>>> > "Instances of this class are not threadsafe." [source]
>>>> >
>>>> >
>>>> > My current working theory is that Spark has a much higher level of
>>>> > parallelism than Pig/Hadoop does and thus I get these wild
>>>> > IndexOutOfBounds
>>>> > exceptions much more frequently (as in can't finish a run over a little
>>>> > 2M
>>>> > row file) vs hardly at all in other libraries.
>>>> >
>>>> > The only other reference I could find to the issue was in presto-users,
>>>> > but
>>>> > the recommendation to leave .bz2 for .lzo doesn't help if I actually do
>>>> > want
>>>> > the higher compression levels of .bz2.
>>>> >
>>>> >
>>>> > Would love to hear if I have some kind of configuration issue or if
>>>> > there's
>>>> > a bug in .bz2 that's fixed in later versions of CDH, or generally any
>>>> > other
>>>> > thoughts on the issue.
>>>> >
>>>> >
>>>> > Thanks!
>>>> > Andrew
>>>> >
>>>> >
>>>> >
>>>> > Below are examples of some exceptions I'm getting:
>>>> >
>>>> > 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to
>>>> > java.lang.ArrayIndexOutOfBoundsException
>>>>

Re: Reading from .bz2 files with Spark

2014-05-16 Thread Xiangrui Meng
Hi Andre,

I could reproduce the bug with Hadoop 2.2.0. Some older versions of
Hadoop do not support splittable compression, so you ended up with
sequential reads. It is easy to reproduce the bug with the following
setup:

1) Workers are configured with multiple cores.
2) BZip2 files are big enough or minPartitions is large enough when
you load the file via sc.textFile(), so that one worker has more than
one task.

Best,
Xiangrui

On Fri, May 16, 2014 at 4:06 PM, Andrew Ash  wrote:
> Hi Xiangrui,
>
> // FYI I'm getting your emails late due to the Apache mailing list outage
>
> I'm using CDH4.4.0, which I think uses the MapReduce v2 API.  The .jars are
> named like this: hadoop-hdfs-2.0.0-cdh4.4.0.jar
>
> I'm also glad you were able to reproduce!  Please paste a link to the Hadoop
> bug you file so I can follow along.
>
> Thanks!
> Andrew
>
>
> On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng  wrote:
>>
>> Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes
>> the problem you described, but it does contain several fixes to bzip2
>> format. -Xiangrui
>>
>> On Wed, May 7, 2014 at 9:19 PM, Andrew Ash  wrote:
>> > Hi all,
>> >
>> > Is anyone reading and writing to .bz2 files stored in HDFS from Spark
>> > with
>> > success?
>> >
>> >
>> > I'm finding the following results on a recent commit (756c96 from 24hr
>> > ago)
>> > and CDH 4.4.0:
>> >
>> > Works: val r = sc.textFile("/user/aa/myfile.bz2").count
>> > Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s:String)
>> > =>
>> > s+"| " ).count
>> >
>> > Specifically, I'm getting an exception coming out of the bzip2 libraries
>> > (see below stacktraces), which is unusual because I'm able to read from
>> > that
>> > file without an issue using the same libraries via Pig.  It was
>> > originally
>> > created from Pig as well.
>> >
>> > Digging a little deeper I found this line in the .bz2 decompressor's
>> > javadoc
>> > for CBZip2InputStream:
>> >
>> > "Instances of this class are not threadsafe." [source]
>> >
>> >
>> > My current working theory is that Spark has a much higher level of
>> > parallelism than Pig/Hadoop does and thus I get these wild
>> > IndexOutOfBounds
>> > exceptions much more frequently (as in can't finish a run over a little
>> > 2M
>> > row file) vs hardly at all in other libraries.
>> >
>> > The only other reference I could find to the issue was in presto-users,
>> > but
>> > the recommendation to leave .bz2 for .lzo doesn't help if I actually do
>> > want
>> > the higher compression levels of .bz2.
>> >
>> >
>> > Would love to hear if I have some kind of configuration issue or if
>> > there's
>> > a bug in .bz2 that's fixed in later versions of CDH, or generally any
>> > other
>> > thoughts on the issue.
>> >
>> >
>> > Thanks!
>> > Andrew
>> >
>> >
>> >
>> > Below are examples of some exceptions I'm getting:
>> >
>> > 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to
>> > java.lang.ArrayIndexOutOfBoundsException
>> > java.lang.ArrayIndexOutOfBoundsException: 65535
>> > at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
>> > at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
>> > at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
>> > at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
>> > at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>> > at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>> > at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>> > at
>> >
>> > org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:

Re: NoSuchMethodError: breeze.linalg.DenseMatrix

2014-05-16 Thread Xiangrui Meng
It doesn't work if you put the netlib-native jar inside an assembly
jar. Try to mark it "provided" in the dependencies, and use --jars to
include them with spark-submit. -Xiangrui
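
A sketch of what that looks like in build.sbt (artifact name and version as
used elsewhere in the thread); the separately downloaded native jars are then
passed to spark-submit via --jars:

// build.sbt: compile against breeze-natives, but keep it out of the assembly jar
libraryDependencies += "org.scalanlp" % "breeze-natives_2.10" % "0.7" % "provided"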

On Wed, May 14, 2014 at 6:12 PM, wxhsdp  wrote:
> Hi, DB
>
>   i tried including breeze library by using spark 1.0, it works. but how can
> i call
>   the native library in standalone cluster mode.
>
>   in local mode
>   1. i include "org.scalanlp" % "breeze-natives_2.10" % "0.7" dependency in
> sbt build file
>   2. i install openblas
>   it works
>
>   in standalone mode
>   1. i include "org.scalanlp" % "breeze-natives_2.10" % "0.7" dependency in
> sbt build file
>   2. install openblas in workers
>   3. add sepatate jars using sc.addJar(). jars: breeze-natives_2.10-0.7.jar,
> netlib-native_ref-linux-
>   x86_64-1.1-natives.jar,
> netlib-native_system-linux-x86_64-1.1-natives.jar
>   4. i also include classpath of the above jars
>   but does not work:(
>
>
>
> DB Tsai-2 wrote
>> Hi Wxhsdp,
>>
>> I also have some difficulties witth "sc.addJar()". Since we include the
>> breeze library by using Spark 1.0, we don't have the problem you ran into.
>> However, when we add external jars via sc.addJar(), I found that the
>> executors actually fetch the jars but the classloader still doesn't honor
>> it. I'm trying to figure out the problem now.
>>
>>
>> Sincerely,
>>
>> DB Tsai
>> ---
>> My Blog: https://www.dbtsai.com
>> LinkedIn: https://www.linkedin.com/in/dbtsai
>>
>>
>> On Wed, May 14, 2014 at 5:46 AM, wxhsdp <
>
>> wxhsdp@
>
>> > wrote:
>>
>>> Hi, DB
>>>   i've add breeze jars to workers using sc.addJar()
>>>   breeze jars include :
>>>   breeze-natives_2.10-0.7.jar
>>>   breeze-macros_2.10-0.3.jar
>>>   breeze-macros_2.10-0.3.1.jar
>>>   breeze_2.10-0.8-SNAPSHOT.jar
>>>   breeze_2.10-0.7.jar
>>>
>>>   almost all the jars about breeze i can find, but still
>>> NoSuchMethodError:
>>> breeze.linalg.DenseMatrix
>>>
>>>   from the executor stderr, you can see the executor successsully fetches
>>> these jars, what's wrong
>>>   about my method? thank you!
>>>
>>> 14/05/14 20:36:02 INFO Executor: Fetching
>>> http://192.168.0.106:42883/jars/breeze-natives_2.10-0.7.jar with
>>> timestamp
>>> 1400070957376
>>> 14/05/14 20:36:02 INFO Utils: Fetching
>>> http://192.168.0.106:42883/jars/breeze-natives_2.10-0.7.jar to
>>> /tmp/fetchFileTemp7468892065227766972.tmp
>>> 14/05/14 20:36:02 INFO Executor: Adding
>>>
>>> file:/home/wxhsdp/spark/spark/tags/v1.0.0-rc3/work/app-20140514203557-/0/./breeze-natives_2.10-0.7.jar
>>> to class loader
>>> 14/05/14 20:36:02 INFO Executor: Fetching
>>> http://192.168.0.106:42883/jars/breeze-macros_2.10-0.3.jar with timestamp
>>> 1400070957441
>>> 14/05/14 20:36:02 INFO Utils: Fetching
>>> http://192.168.0.106:42883/jars/breeze-macros_2.10-0.3.jar to
>>> /tmp/fetchFileTemp2324565598765584917.tmp
>>> 14/05/14 20:36:02 INFO Executor: Adding
>>>
>>> file:/home/wxhsdp/spark/spark/tags/v1.0.0-rc3/work/app-20140514203557-/0/./breeze-macros_2.10-0.3.jar
>>> to class loader
>>> 14/05/14 20:36:02 INFO Executor: Fetching
>>> http://192.168.0.106:42883/jars/breeze_2.10-0.8-SNAPSHOT.jar with
>>> timestamp
>>> 1400070957358
>>> 14/05/14 20:36:02 INFO Utils: Fetching
>>> http://192.168.0.106:42883/jars/breeze_2.10-0.8-SNAPSHOT.jar to
>>> /tmp/fetchFileTemp8730123100104850193.tmp
>>> 14/05/14 20:36:02 INFO Executor: Adding
>>>
>>> file:/home/wxhsdp/spark/spark/tags/v1.0.0-rc3/work/app-20140514203557-/0/./breeze_2.10-0.8-SNAPSHOT.jar
>>> to class loader
>>> 14/05/14 20:36:02 INFO Executor: Fetching
>>> http://192.168.0.106:42883/jars/breeze-macros_2.10-0.3.1.jar with
>>> timestamp
>>> 1400070957414
>>> 14/05/14 20:36:02 INFO Utils: Fetching
>>> http://192.168.0.106:42883/jars/breeze-macros_2.10-0.3.1.jar to
>>> /tmp/fetchFileTemp3473404556989515218.tmp
>>> 14/05/14 20:36:02 INFO Executor: Adding
>>>
>>> file:/home/wxhsdp/spark/spark/tags/v1.0.0-rc3/work/app-20140514203557-/0/./breeze-macros_2.10-0.3.1.jar
>>> to class loader
>>> 14/05/14 20:36:02 INFO Executor: Fetching
>>> http://192.168.0.106:42883/jars/build-project_2.10-1.0.jar with timestamp
>>> 1400070956753
>>> 14/05/14 20:36:02 INFO Utils: Fetching
>>> http://192.168.0.106:42883/jars/build-project_2.10-1.0.jar to
>>> /tmp/fetchFileTemp1289055585501269156.tmp
>>> 14/05/14 20:36:02 INFO Executor: Adding
>>>
>>> file:/home/wxhsdp/spark/spark/tags/v1.0.0-rc3/work/app-20140514203557-/0/./build-project_2.10-1.0.jar
>>> to class loader
>>> 14/05/14 20:36:02 INFO Executor: Fetching
>>> http://192.168.0.106:42883/jars/breeze_2.10-0.7.jar with timestamp
>>> 1400070957228
>>> 14/05/14 20:36:02 INFO Utils: Fetching
>>> http://192.168.0.106:42883/jars/breeze_2.10-0.7.jar to
>>> /tmp/fetchFileTemp1287317286108432726.tmp
>>> 14/05/14 20:36:02 INFO Executor: Adding
>>>
>>> file:/home/wxhsdp/spark/spark/tags/v1.0.0-rc3/work/app-20140514203557-/0/./breeze_2.10-0.7.jar
>>> to class loader
>>>
>>>
>>> DB Ts

Re: Reading from .bz2 files with Spark

2014-05-16 Thread Xiangrui Meng
Hi Andrew,

This is the JIRA I created:
https://issues.apache.org/jira/browse/MAPREDUCE-5893 . Hopefully
someone wants to work on it.

Best,
Xiangrui

On Fri, May 16, 2014 at 6:47 PM, Xiangrui Meng  wrote:
> Hi Andre,
>
> I could reproduce the bug with Hadoop 2.2.0. Some older version of
> Hadoop do not support splittable compression, so you ended up with
> sequential reads. It is easy to reproduce the bug with the following
> setup:
>
> 1) Workers are configured with multiple cores.
> 2) BZip2 files are big enough or minPartitions is large enough when
> you load the file via sc.textFile(), so that one worker has more than
> one tasks.
>
> Best,
> Xiangrui
>
> On Fri, May 16, 2014 at 4:06 PM, Andrew Ash  wrote:
>> Hi Xiangrui,
>>
>> // FYI I'm getting your emails late due to the Apache mailing list outage
>>
>> I'm using CDH4.4.0, which I think uses the MapReduce v2 API.  The .jars are
>> named like this: hadoop-hdfs-2.0.0-cdh4.4.0.jar
>>
>> I'm also glad you were able to reproduce!  Please paste a link to the Hadoop
>> bug you file so I can follow along.
>>
>> Thanks!
>> Andrew
>>
>>
>> On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng  wrote:
>>>
>>> Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes
>>> the problem you described, but it does contain several fixes to bzip2
>>> format. -Xiangrui
>>>
>>> On Wed, May 7, 2014 at 9:19 PM, Andrew Ash  wrote:
>>> > Hi all,
>>> >
>>> > Is anyone reading and writing to .bz2 files stored in HDFS from Spark
>>> > with
>>> > success?
>>> >
>>> >
>>> > I'm finding the following results on a recent commit (756c96 from 24hr
>>> > ago)
>>> > and CDH 4.4.0:
>>> >
>>> > Works: val r = sc.textFile("/user/aa/myfile.bz2").count
>>> > Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s:String)
>>> > =>
>>> > s+"| " ).count
>>> >
>>> > Specifically, I'm getting an exception coming out of the bzip2 libraries
>>> > (see below stacktraces), which is unusual because I'm able to read from
>>> > that
>>> > file without an issue using the same libraries via Pig.  It was
>>> > originally
>>> > created from Pig as well.
>>> >
>>> > Digging a little deeper I found this line in the .bz2 decompressor's
>>> > javadoc
>>> > for CBZip2InputStream:
>>> >
>>> > "Instances of this class are not threadsafe." [source]
>>> >
>>> >
>>> > My current working theory is that Spark has a much higher level of
>>> > parallelism than Pig/Hadoop does and thus I get these wild
>>> > IndexOutOfBounds
>>> > exceptions much more frequently (as in can't finish a run over a little
>>> > 2M
>>> > row file) vs hardly at all in other libraries.
>>> >
>>> > The only other reference I could find to the issue was in presto-users,
>>> > but
>>> > the recommendation to leave .bz2 for .lzo doesn't help if I actually do
>>> > want
>>> > the higher compression levels of .bz2.
>>> >
>>> >
>>> > Would love to hear if I have some kind of configuration issue or if
>>> > there's
>>> > a bug in .bz2 that's fixed in later versions of CDH, or generally any
>>> > other
>>> > thoughts on the issue.
>>> >
>>> >
>>> > Thanks!
>>> > Andrew
>>> >
>>> >
>>> >
>>> > Below are examples of some exceptions I'm getting:
>>> >
>>> > 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to
>>> > java.lang.ArrayIndexOutOfBoundsException
>>> > java.lang.ArrayIndexOutOfBoundsException: 65535
>>> > at
>>> >
>>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
>>> > at
>>> >
>>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
>>> > at
>>> >
>>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
>>> > at
>>> >
>>> > org.apache.

Re: Reading from .bz2 files with Spark

2014-05-16 Thread Xiangrui Meng
Hi Andrew,

I verified that this is due to thread safety. I changed
SPARK_WORKER_CORES to 1 in spark-env.sh, so there is only 1 thread per
worker. Then I can load the file without any problem with different
values of minPartitions. I will submit a JIRA to both Spark and
Hadoop.

Best,
Xiangrui

On Thu, May 15, 2014 at 3:48 PM, Xiangrui Meng  wrote:
> Hi Andrew,
>
> Could you try varying the minPartitions parameter? For example:
>
> val r = sc.textFile("/user/aa/myfile.bz2", 4).count
> val r = sc.textFile("/user/aa/myfile.bz2", 8).count
>
> Best,
> Xiangrui
>
> On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng  wrote:
>> Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes
>> the problem you described, but it does contain several fixes to bzip2
>> format. -Xiangrui
>>
>> On Wed, May 7, 2014 at 9:19 PM, Andrew Ash  wrote:
>>> Hi all,
>>>
>>> Is anyone reading and writing to .bz2 files stored in HDFS from Spark with
>>> success?
>>>
>>>
>>> I'm finding the following results on a recent commit (756c96 from 24hr ago)
>>> and CDH 4.4.0:
>>>
>>> Works: val r = sc.textFile("/user/aa/myfile.bz2").count
>>> Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s:String) =>
>>> s+"| " ).count
>>>
>>> Specifically, I'm getting an exception coming out of the bzip2 libraries
>>> (see below stacktraces), which is unusual because I'm able to read from that
>>> file without an issue using the same libraries via Pig.  It was originally
>>> created from Pig as well.
>>>
>>> Digging a little deeper I found this line in the .bz2 decompressor's javadoc
>>> for CBZip2InputStream:
>>>
>>> "Instances of this class are not threadsafe." [source]
>>>
>>>
>>> My current working theory is that Spark has a much higher level of
>>> parallelism than Pig/Hadoop does and thus I get these wild IndexOutOfBounds
>>> exceptions much more frequently (as in can't finish a run over a little 2M
>>> row file) vs hardly at all in other libraries.
>>>
>>> The only other reference I could find to the issue was in presto-users, but
>>> the recommendation to leave .bz2 for .lzo doesn't help if I actually do want
>>> the higher compression levels of .bz2.
>>>
>>>
>>> Would love to hear if I have some kind of configuration issue or if there's
>>> a bug in .bz2 that's fixed in later versions of CDH, or generally any other
>>> thoughts on the issue.
>>>
>>>
>>> Thanks!
>>> Andrew
>>>
>>>
>>>
>>> Below are examples of some exceptions I'm getting:
>>>
>>> 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to
>>> java.lang.ArrayIndexOutOfBoundsException
>>> java.lang.ArrayIndexOutOfBoundsException: 65535
>>> at
>>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
>>> at
>>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
>>> at
>>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
>>> at
>>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
>>> at
>>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>>> at
>>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>>> at
>>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>>> at
>>> org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>>> at java.io.InputStream.read(InputStream.java:101)
>>> at
>>> org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>>> at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>>> at
>>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>>> at
>>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>>
>>>
>>>
>>>
>>> java.lang.ArrayIndexOutOfBoundsException: 90
>>> at
>>> or

Re: Understanding epsilon in KMeans

2014-05-16 Thread Xiangrui Meng
In Spark's KMeans, if no cluster center moves more than epsilon in
Euclidean distance from the previous iteration, the algorithm finishes. No
further iterations are performed. For Mahout, you need to check the
documentation or the code to see what epsilon means there. -Xiangrui
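
On the third query in the quoted message: KMeans.train() does not expose
epsilon, but the builder API does. A minimal sketch (k, the iteration count,
and the threshold are arbitrary; data is an RDD of mllib vectors):

import org.apache.spark.mllib.clustering.KMeans

val model = new KMeans()
  .setK(10)
  .setMaxIterations(10)
  .setEpsilon(0.1)
  .run(data)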

On Wed, May 14, 2014 at 8:35 PM, Stuti Awasthi  wrote:
> Hi All,
>
>
>
> Any ideas on this ??
>
>
>
> Thanks
>
> Stuti Awasthi
>
>
>
> From: Stuti Awasthi
> Sent: Wednesday, May 14, 2014 6:20 PM
> To: user@spark.apache.org
> Subject: Understanding epsilon in KMeans
>
>
>
> Hi All,
>
>
>
> I wanted to understand the functionality of epsilon in KMeans in Spark
> MLlib.
>
>
>
> As per documentation :
>
> distance threshold within which we've consider centers to have converged.If
> all centers move less than this Euclidean distance, we stop iterating one
> run.
>
>
>
> Now I have assumed that if centers are moving less than epsilon value then
> Clustering Stops but then what does it mean by “we stop iterating one run”..
>
> Now suppose I have given maxIterations=10  and epsilon = 0.1 and assume that
> centers are afteronly 2 iteration, the epsilon condition is met i.e. now
> centers are moving only less than 0.1..
>
>
>
> Now what happens ?? The whole 10 iterations are completed OR the Clustering
> stops ??
>
>
>
> My 2nd query is in Mahout, there is a configuration param : “Convergence
> Threshold (cd)”   which states : “in an iteration, the centroids don’t move
> more than this distance, no further iterations are done and clustering
> stops.”
>
>
>
> So is epsilon and cd similar ??
>
>
>
> 3rd query :
>
> How to pass epsilon as a configurable param. KMeans.train() does not provide
> the way but in code I can see “setEpsilon” as method. SO if I want to pass
> the parameter as epsilon=0.1 , how may I do that..
>
>
>
> Pardon my ignorance
>
>
>
> Thanks
>
> Stuti Awasthi
>
>
>
>
>
>
>


Re: Reading from .bz2 files with Spark

2014-05-16 Thread Xiangrui Meng
Hi Andrew,

Could you try varying the minPartitions parameter? For example:

val r = sc.textFile("/user/aa/myfile.bz2", 4).count
val r = sc.textFile("/user/aa/myfile.bz2", 8).count

Best,
Xiangrui

On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng  wrote:
> Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes
> the problem you described, but it does contain several fixes to bzip2
> format. -Xiangrui
>
> On Wed, May 7, 2014 at 9:19 PM, Andrew Ash  wrote:
>> Hi all,
>>
>> Is anyone reading and writing to .bz2 files stored in HDFS from Spark with
>> success?
>>
>>
>> I'm finding the following results on a recent commit (756c96 from 24hr ago)
>> and CDH 4.4.0:
>>
>> Works: val r = sc.textFile("/user/aa/myfile.bz2").count
>> Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s:String) =>
>> s+"| " ).count
>>
>> Specifically, I'm getting an exception coming out of the bzip2 libraries
>> (see below stacktraces), which is unusual because I'm able to read from that
>> file without an issue using the same libraries via Pig.  It was originally
>> created from Pig as well.
>>
>> Digging a little deeper I found this line in the .bz2 decompressor's javadoc
>> for CBZip2InputStream:
>>
>> "Instances of this class are not threadsafe." [source]
>>
>>
>> My current working theory is that Spark has a much higher level of
>> parallelism than Pig/Hadoop does and thus I get these wild IndexOutOfBounds
>> exceptions much more frequently (as in can't finish a run over a little 2M
>> row file) vs hardly at all in other libraries.
>>
>> The only other reference I could find to the issue was in presto-users, but
>> the recommendation to leave .bz2 for .lzo doesn't help if I actually do want
>> the higher compression levels of .bz2.
>>
>>
>> Would love to hear if I have some kind of configuration issue or if there's
>> a bug in .bz2 that's fixed in later versions of CDH, or generally any other
>> thoughts on the issue.
>>
>>
>> Thanks!
>> Andrew
>>
>>
>>
>> Below are examples of some exceptions I'm getting:
>>
>> 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to
>> java.lang.ArrayIndexOutOfBoundsException
>> java.lang.ArrayIndexOutOfBoundsException: 65535
>> at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
>> at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
>> at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
>> at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
>> at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>> at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>> at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>> at
>> org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>> at java.io.InputStream.read(InputStream.java:101)
>> at
>> org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>> at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>> at
>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>> at
>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>
>>
>>
>>
>> java.lang.ArrayIndexOutOfBoundsException: 90
>> at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
>> at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>> at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>> at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>> at
>> org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>> at java.io.InputStream.read(InputStream.java:101)
>> at
>> org.apache.hadoop.util.LineReader.readDefault

Re: Turn BLAS on MacOSX

2014-05-16 Thread Xiangrui Meng
Are you running Spark or just Breeze? First try breeze-natives locally
with the reference blas library and see whether it works or not. Also,
do not enable multi-threading when you compile OpenBLAS
(USE_THREADS=0). -Xiangrui

On Tue, May 13, 2014 at 2:17 AM, wxhsdp  wrote:
> Hi, Xiangrui
>
>   i compile openblas on ec2 m1.large, when breeze calls the native lib,
> error occurs:
>
> INFO: successfully loaded
> /mnt2/wxhsdp/libopenblas/lib/libopenblas_nehalemp-r0.2.9.rc2.so
> [error] (run-main-0) java.lang.UnsatisfiedLinkError:
> com.github.fommil.netlib.NativeSystemBLAS.dgemm_offsets(Ljava/lang/String;Ljava/lang/String;IIID[DII[DIID[DII)V
> java.lang.UnsatisfiedLinkError:
> com.github.fommil.netlib.NativeSystemBLAS.dgemm_offsets(Ljava/lang/String;Ljava/lang/String;IIID[DII[DIID[DII)V
> at com.github.fommil.netlib.NativeSystemBLAS.dgemm_offsets(Native 
> Method)
> at
> com.github.fommil.netlib.NativeSystemBLAS.dgemm(NativeSystemBLAS.java:100)
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Re-Turn-BLAS-on-MacOSX-tpp5648.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to run the SVM and LogisticRegression

2014-05-16 Thread Xiangrui Meng
If you check out the master branch, there are some examples that can
be used as templates under

examples/src/main/scala/org/apache/spark/examples/mllib
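
For instance, a minimal sketch that can be pasted into spark-shell (assuming
the current Vectors/LabeledPoint API; "data.txt" and its whitespace-separated
"label f1 f2 ..." layout are placeholders):

import org.apache.spark.mllib.classification.{LogisticRegressionWithSGD, SVMWithSGD}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val points = sc.textFile("data.txt").map { line =>
  val values = line.split(' ').map(_.toDouble)
  LabeledPoint(values.head, Vectors.dense(values.tail))
}.cache()

val numIterations = 100
val svmModel = SVMWithSGD.train(points, numIterations)
val lrModel = LogisticRegressionWithSGD.train(points, numIterations)

// Training error of the SVM model, as a quick sanity check.
val trainErr = points.filter(p => svmModel.predict(p.features) != p.label).count().toDouble /
  points.count()
println(s"SVM training error: $trainErr")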

Best,
Xiangrui

On Wed, May 14, 2014 at 1:36 PM, yxzhao  wrote:
>
> Hello,
> I found the classfication algorithms SVM and LogisticRegression implemented
> in the following directory. And how to run them? What is the commnad line
> should be? Thanks.
> spark-0.9.0-incubating/mllib/src/main/scala/org/apache/spark/mllib/classification
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-run-the-SVM-and-LogisticRegression-tp5720.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: spark on yarn-standalone, throws StackOverflowError and fails somtimes and succeed for the rest

2014-05-16 Thread Xiangrui Meng
Could you try `println(result.toDebugString())` right after `val
result = ...` and attach the result? -Xiangrui

On Fri, May 9, 2014 at 8:20 AM, phoenix bai  wrote:
> after a couple of tests, I find that, if I use:
>
> val result = model.predict(prdctpairs)
> result.map(x =>
> x.user+","+x.product+","+x.rating).saveAsTextFile(output)
>
> it always fails with above error and the exception seems iterative.
>
> but if I do:
>
> val result = model.predict(prdctpairs)
> result.cache()
> result.map(x =>
> x.user+","+x.product+","+x.rating).saveAsTextFile(output)
>
> it succeeds.
>
> could anyone help explain why the cache() is necessary?
>
> thanks
>
>
>
> On Fri, May 9, 2014 at 6:45 PM, phoenix bai  wrote:
>>
>> Hi all,
>>
>> My spark code is running on yarn-standalone.
>>
>> the last three lines of the code as below,
>>
>> val result = model.predict(prdctpairs)
>> result.map(x =>
>> x.user+","+x.product+","+x.rating).saveAsTextFile(output)
>> sc.stop()
>>
>> the same code, sometimes be able to run successfully and could give out
>> the right result, while from time to time, it throws StackOverflowError and
>> fail.
>>
>> and I don't have a clue how I should debug.
>>
>> below is the error, (the start and end portion to be exact):
>>
>>
>> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-17]
>> MapOutputTrackerMasterActor: Asked to send map output locations for shuffle
>> 44 to sp...@rxx43.mc10.site.net:43885
>> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-17]
>> MapOutputTrackerMaster: Size of output statuses for shuffle 44 is 148 bytes
>> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-35]
>> MapOutputTrackerMasterActor: Asked to send map output locations for shuffle
>> 45 to sp...@rxx43.mc10.site.net:43885
>> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-35]
>> MapOutputTrackerMaster: Size of output statuses for shuffle 45 is 453 bytes
>> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-20]
>> MapOutputTrackerMasterActor: Asked to send map output locations for shuffle
>> 44 to sp...@rxx43.mc10.site.net:56767
>> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-29]
>> MapOutputTrackerMasterActor: Asked to send map output locations for shuffle
>> 45 to sp...@rxx43.mc10.site.net:56767
>> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-29]
>> MapOutputTrackerMasterActor: Asked to send map output locations for shuffle
>> 44 to sp...@rxx43.mc10.site.net:49879
>> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-29]
>> MapOutputTrackerMasterActor: Asked to send map output locations for shuffle
>> 45 to sp...@rxx43.mc10.site.net:49879
>> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-17]
>> TaskSetManager: Starting task 946.0:17 as TID 146 on executor 6:
>> rx15.mc10.site.net (PROCESS_LOCAL)
>> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-17]
>> TaskSetManager: Serialized task 946.0:17 as 6414 bytes in 0 ms
>> 14-05-09 17:55:51 WARN [Result resolver thread-0] TaskSetManager: Lost TID
>> 133 (task 946.0:4)
>> 14-05-09 17:55:51 WARN [Result resolver thread-0] TaskSetManager: Loss was
>> due to java.lang.StackOverflowError
>> java.lang.StackOverflowError
>> at java.lang.ClassLoader.defineClass1(Native Method)
>> at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
>> at java.lang.ClassLoader.defineClass(ClassLoader.java:615)
>> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
>> at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)
>> at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
>> at java.lang.ClassLoader.defineClass1(Native Method)
>> at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
>> at java.lang.ClassLoader.defineClass(ClassLoader.java:615)
>>
>> 
>>
>> at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> at java.lang.reflect.Method.invoke(Method.java:597)
>> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
>> at
>> java.io.ObjectInputStream.re

Re: How to use Mahout VectorWritable in Spark.

2014-05-15 Thread Xiangrui Meng
You need

> val raw = sc.sequenceFile(path, classOf[Text], classOf[VectorWritable])

to load the data. After that, you can do

> val data = raw.values.map(_.get)

to get an RDD of Mahout's Vector. You can use `--jar mahout-math.jar`
when you launch spark-shell to include mahout-math.
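
Putting it together, a rough sketch that converts the Mahout vectors into
MLlib vectors and runs MLlib's KMeans (the HDFS path, k, and the number of
iterations are placeholders, and a 1.0-style MLlib API is assumed):

import org.apache.hadoop.io.Text
import org.apache.mahout.math.VectorWritable
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val raw = sc.sequenceFile("hdfs:///path/to/kmeans_dataset_seq",
  classOf[Text], classOf[VectorWritable])

// Copy each (possibly reused) Mahout vector into a fresh MLlib dense vector.
val vectors = raw.values.map { vw =>
  val v = vw.get()
  Vectors.dense(Array.tabulate(v.size)(i => v.get(i)))
}.cache()

val model = KMeans.train(vectors, 10, 20) // k = 10, maxIterations = 20
model.clusterCenters.foreach(println)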

Best,
Xiangrui

On Tue, May 13, 2014 at 10:37 PM, Stuti Awasthi  wrote:
> Hi All,
>
> I am very new to Spark and trying to play around with Mllib hence apologies
> for the basic question.
>
>
>
> I am trying to run the KMeans algorithm using Mahout and Spark MLlib to compare
> performance. The initial data size was 10 GB. Mahout converts the data into a
> Sequence File, which is used for KMeans clustering.
> The Sequence File created was ~6 GB in size.
>
>
>
> Now I wanted to know if I can use the Mahout Sequence File as input to Spark
> MLlib's KMeans. I have read that SparkContext.sequenceFile may be used
> here. Hence I tried to read my sequence file as below, but I am getting the error:
>
>
>
> Command on Spark Shell :
>
> scala> val data = sc.sequenceFile[String,VectorWritable]("/
> KMeans_dataset_seq/part-r-0",String,VectorWritable)
>
> :12: error: not found: type VectorWritable
>
>val data = sc.sequenceFile[String,VectorWritable]("
> /KMeans_dataset_seq/part-r-0",String,VectorWritable)
>
>
>
> Here I have 2 questions:
>
> 1. Mahout has “Text” as the key, but Spark prints “not found: type: Text”,
> so I changed it to String. Is this correct?
>
> 2. How will VectorWritable be found in Spark? Do I need to include the Mahout
> jar in the classpath, or is there another option?
>
>
>
> Please Suggest
>
>
>
> Regards
>
> Stuti Awasthi
>


Re: Using String Dataset for Logistic Regression

2014-05-15 Thread Xiangrui Meng
It depends on how you want to use the string features. For the day of
the week, you can replace it with 6 binary features indicating
Mon/Tue/Wed/Th/Fri/Sat. -Xiangrui
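
As an illustration, a small sketch of that encoding with Sunday as the
baseline (the "label,day,value" CSV layout is made up):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val days = Seq("Mon", "Tue", "Wed", "Thu", "Fri", "Sat") // "Sun" maps to all zeros

def encodeDay(day: String): Array[Double] =
  days.map(d => if (d == day) 1.0 else 0.0).toArray

// e.g. "1.0,Tue,3.5" -> label 1.0, features [0,1,0,0,0,0,3.5]
def parsePoint(line: String): LabeledPoint = {
  val Array(label, day, x) = line.split(',')
  LabeledPoint(label.toDouble, Vectors.dense(encodeDay(day) :+ x.toDouble))
}

println(parsePoint("1.0,Tue,3.5"))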

On Fri, May 9, 2014 at 5:31 AM, praveshjain1991
 wrote:
> I have been trying to use LR in Spark's Java API. I used the dataset given
> along with Spark for the training and testing purposes.
>
> Now i want to use it on another dataset that contains string values along
> with numbers. Is there any way to do this?
>
> I am attaching the Dataset that i want to use.
>
> Thanks and Regards, Test.data
> 
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Using-String-Dataset-for-Logistic-Regression-tp5523.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: spark on yarn-standalone, throws StackOverflowError and fails somtimes and succeed for the rest

2014-05-15 Thread Xiangrui Meng
This is a known issue. Please try to reduce the number of iterations
(e.g., <35). -Xiangrui

On Fri, May 9, 2014 at 3:45 AM, phoenix bai  wrote:
> Hi all,
>
> My spark code is running on yarn-standalone.
>
> the last three lines of the code as below,
>
> val result = model.predict(prdctpairs)
> result.map(x =>
> x.user+","+x.product+","+x.rating).saveAsTextFile(output)
> sc.stop()
>
> the same code, sometimes be able to run successfully and could give out the
> right result, while from time to time, it throws StackOverflowError and
> fail.
>
> and I don't have a clue how I should debug.
>
> below is the error, (the start and end portion to be exact):
>
>
> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-17]
> MapOutputTrackerMasterActor: Asked to send map output locations for shuffle
> 44 to sp...@rxx43.mc10.site.net:43885
> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-17]
> MapOutputTrackerMaster: Size of output statuses for shuffle 44 is 148 bytes
> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-35]
> MapOutputTrackerMasterActor: Asked to send map output locations for shuffle
> 45 to sp...@rxx43.mc10.site.net:43885
> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-35]
> MapOutputTrackerMaster: Size of output statuses for shuffle 45 is 453 bytes
> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-20]
> MapOutputTrackerMasterActor: Asked to send map output locations for shuffle
> 44 to sp...@rxx43.mc10.site.net:56767
> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-29]
> MapOutputTrackerMasterActor: Asked to send map output locations for shuffle
> 45 to sp...@rxx43.mc10.site.net:56767
> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-29]
> MapOutputTrackerMasterActor: Asked to send map output locations for shuffle
> 44 to sp...@rxx43.mc10.site.net:49879
> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-29]
> MapOutputTrackerMasterActor: Asked to send map output locations for shuffle
> 45 to sp...@rxx43.mc10.site.net:49879
> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-17]
> TaskSetManager: Starting task 946.0:17 as TID 146 on executor 6:
> rx15.mc10.site.net (PROCESS_LOCAL)
> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-17]
> TaskSetManager: Serialized task 946.0:17 as 6414 bytes in 0 ms
> 14-05-09 17:55:51 WARN [Result resolver thread-0] TaskSetManager: Lost TID
> 133 (task 946.0:4)
> 14-05-09 17:55:51 WARN [Result resolver thread-0] TaskSetManager: Loss was
> due to java.lang.StackOverflowError
> java.lang.StackOverflowError
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:615)
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)
> at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:615)
>
> 
>
> at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-5]
> TaskSetManager: Starting task 946.0:4 as TID 147 on executor 6:
> r15.mc10.site.net (PROCESS_LOCAL)
> 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-5]
> TaskSetManager: Serialized task 946.0:4 as 6414 bytes in 0 ms
> 14-05-09 17:55:51 WARN [Result resolver thread-1] TaskSetManager: Lost TID
> 139 (task 946.0

Re: Distribute jar dependencies via sc.AddJar(fileName)

2014-05-15 Thread Xiangrui Meng
In SparkContext#addJar, for yarn-standalone mode, the workers should
get the jars from local distributed cache instead of fetching them
from the http server. Could you send the command you used to submit
the job? -Xiangrui
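
For comparison, the driver-side call looks roughly like this (a sketch; the
jar path is a placeholder, and a cluster-visible URI such as HDFS avoids
depending on the driver's embedded HTTP server):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("addJarExample"))

// Register an extra jar; executors fetch it before running tasks that need it.
sc.addJar("hdfs:///user/me/libs/analytics-deps.jar")

// Classes from that jar are then resolvable inside closures, e.g.
// rdd.map(x => com.example.SomeHelper.transform(x))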

On Wed, May 14, 2014 at 1:26 AM, DB Tsai  wrote:
> Hi Xiangrui,
>
> I actually used `yarn-standalone`, sorry for misleading. I did debugging in
> the last couple days, and everything up to updateDependency in
> executor.scala works. I also checked the file size and md5sum in the
> executors, and they are the same as the one in driver. Gonna do more testing
> tomorrow.
>
> Thanks.
>
>
> Sincerely,
>
> DB Tsai
> ---
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
> On Tue, May 13, 2014 at 11:41 PM, Xiangrui Meng  wrote:
>>
>> I don't know whether this would fix the problem. In v0.9, you need
>> `yarn-standalone` instead of `yarn-cluster`.
>>
>> See
>> https://github.com/apache/spark/commit/328c73d037c17440c2a91a6c88b4258fbefa0c08
>>
>> On Tue, May 13, 2014 at 11:36 PM, Xiangrui Meng  wrote:
>> > Does v0.9 support yarn-cluster mode? I checked SparkContext.scala in
>> > v0.9.1 and didn't see special handling of `yarn-cluster`. -Xiangrui
>> >
>> > On Mon, May 12, 2014 at 11:14 AM, DB Tsai  wrote:
>> >> We're deploying Spark in yarn-cluster mode (Spark 0.9), and we add jar
>> >> dependencies in command line with "--addJars" option. However, those
>> >> external jars are only available in the driver (application running in
>> >> hadoop), and not available in the executors (workers).
>> >>
>> >> After doing some research, we realize that we've to push those jars to
>> >> executors in driver via sc.AddJar(fileName). Although in the driver's
>> >> log
>> >> (see the following), the jar is successfully added in the http server
>> >> in the
>> >> driver, and I confirm that it's downloadable from any machine in the
>> >> network, I still get `java.lang.NoClassDefFoundError` in the executors.
>> >>
>> >> 14/05/09 14:51:41 INFO spark.SparkContext: Added JAR
>> >> analyticshadoop-eba5cdce1.jar at
>> >> http://10.0.0.56:42522/jars/analyticshadoop-eba5cdce1.jar with
>> >> timestamp
>> >> 1399672301568
>> >>
>> >> Then I check the log in the executors, and I don't find anything
>> >> `Fetching
>> >>  with timestamp `, which implies something is wrong;
>> >> the
>> >> executors are not downloading the external jars.
>> >>
>> >> Any suggestion what we can look at?
>> >>
>> >> After digging into how spark distributes external jars, I wonder the
>> >> scalability of this approach. What if there are thousands of nodes
>> >> downloading the jar from single http server in the driver? Why don't we
>> >> push
>> >> the jars into HDFS distributed cache by default instead of distributing
>> >> them
>> >> via http server?
>> >>
>> >> Thanks.
>> >>
>> >> Sincerely,
>> >>
>> >> DB Tsai
>> >> ---
>> >> My Blog: https://www.dbtsai.com
>> >> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>


Re: Distribute jar dependencies via sc.AddJar(fileName)

2014-05-14 Thread Xiangrui Meng
I don't know whether this would fix the problem. In v0.9, you need
`yarn-standalone` instead of `yarn-cluster`.

See 
https://github.com/apache/spark/commit/328c73d037c17440c2a91a6c88b4258fbefa0c08

On Tue, May 13, 2014 at 11:36 PM, Xiangrui Meng  wrote:
> Does v0.9 support yarn-cluster mode? I checked SparkContext.scala in
> v0.9.1 and didn't see special handling of `yarn-cluster`. -Xiangrui
>
> On Mon, May 12, 2014 at 11:14 AM, DB Tsai  wrote:
>> We're deploying Spark in yarn-cluster mode (Spark 0.9), and we add jar
>> dependencies in command line with "--addJars" option. However, those
>> external jars are only available in the driver (application running in
>> hadoop), and not available in the executors (workers).
>>
>> After doing some research, we realize that we've to push those jars to
>> executors in driver via sc.AddJar(fileName). Although in the driver's log
>> (see the following), the jar is successfully added in the http server in the
>> driver, and I confirm that it's downloadable from any machine in the
>> network, I still get `java.lang.NoClassDefFoundError` in the executors.
>>
>> 14/05/09 14:51:41 INFO spark.SparkContext: Added JAR
>> analyticshadoop-eba5cdce1.jar at
>> http://10.0.0.56:42522/jars/analyticshadoop-eba5cdce1.jar with timestamp
>> 1399672301568
>>
>> Then I check the log in the executors, and I don't find anything `Fetching
>>  with timestamp `, which implies something is wrong; the
>> executors are not downloading the external jars.
>>
>> Any suggestion what we can look at?
>>
>> After digging into how spark distributes external jars, I wonder the
>> scalability of this approach. What if there are thousands of nodes
>> downloading the jar from single http server in the driver? Why don't we push
>> the jars into HDFS distributed cache by default instead of distributing them
>> via http server?
>>
>> Thanks.
>>
>> Sincerely,
>>
>> DB Tsai
>> ---
>> My Blog: https://www.dbtsai.com
>> LinkedIn: https://www.linkedin.com/in/dbtsai


Re: Distribute jar dependencies via sc.AddJar(fileName)

2014-05-14 Thread Xiangrui Meng
Does v0.9 support yarn-cluster mode? I checked SparkContext.scala in
v0.9.1 and didn't see special handling of `yarn-cluster`. -Xiangrui

On Mon, May 12, 2014 at 11:14 AM, DB Tsai  wrote:
> We're deploying Spark in yarn-cluster mode (Spark 0.9), and we add jar
> dependencies in command line with "--addJars" option. However, those
> external jars are only available in the driver (application running in
> hadoop), and not available in the executors (workers).
>
> After doing some research, we realize that we've to push those jars to
> executors in driver via sc.AddJar(fileName). Although in the driver's log
> (see the following), the jar is successfully added in the http server in the
> driver, and I confirm that it's downloadable from any machine in the
> network, I still get `java.lang.NoClassDefFoundError` in the executors.
>
> 14/05/09 14:51:41 INFO spark.SparkContext: Added JAR
> analyticshadoop-eba5cdce1.jar at
> http://10.0.0.56:42522/jars/analyticshadoop-eba5cdce1.jar with timestamp
> 1399672301568
>
> Then I check the log in the executors, and I don't find anything `Fetching
>  with timestamp `, which implies something is wrong; the
> executors are not downloading the external jars.
>
> Any suggestion what we can look at?
>
> After digging into how spark distributes external jars, I wonder the
> scalability of this approach. What if there are thousands of nodes
> downloading the jar from single http server in the driver? Why don't we push
> the jars into HDFS distributed cache by default instead of distributing them
> via http server?
>
> Thanks.
>
> Sincerely,
>
> DB Tsai
> ---
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai


Re: Reading from .bz2 files with Spark

2014-05-13 Thread Xiangrui Meng
Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes
the problem you described, but it does contain several fixes to bzip2
format. -Xiangrui

On Wed, May 7, 2014 at 9:19 PM, Andrew Ash  wrote:
> Hi all,
>
> Is anyone reading and writing to .bz2 files stored in HDFS from Spark with
> success?
>
>
> I'm finding the following results on a recent commit (756c96 from 24hr ago)
> and CDH 4.4.0:
>
> Works: val r = sc.textFile("/user/aa/myfile.bz2").count
> Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s:String) =>
> s+"| " ).count
>
> Specifically, I'm getting an exception coming out of the bzip2 libraries
> (see below stacktraces), which is unusual because I'm able to read from that
> file without an issue using the same libraries via Pig.  It was originally
> created from Pig as well.
>
> Digging a little deeper I found this line in the .bz2 decompressor's javadoc
> for CBZip2InputStream:
>
> "Instances of this class are not threadsafe." [source]
>
>
> My current working theory is that Spark has a much higher level of
> parallelism than Pig/Hadoop does and thus I get these wild IndexOutOfBounds
> exceptions much more frequently (as in can't finish a run over a little 2M
> row file) vs hardly at all in other libraries.
>
> The only other reference I could find to the issue was in presto-users, but
> the recommendation to leave .bz2 for .lzo doesn't help if I actually do want
> the higher compression levels of .bz2.
>
>
> Would love to hear if I have some kind of configuration issue or if there's
> a bug in .bz2 that's fixed in later versions of CDH, or generally any other
> thoughts on the issue.
>
>
> Thanks!
> Andrew
>
>
>
> Below are examples of some exceptions I'm getting:
>
> 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to
> java.lang.ArrayIndexOutOfBoundsException
> java.lang.ArrayIndexOutOfBoundsException: 65535
> at
> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
> at
> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
> at
> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
> at
> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
> at
> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
> at
> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
> at
> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
> at
> org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
> at java.io.InputStream.read(InputStream.java:101)
> at
> org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
> at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
> at
> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
> at
> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>
>
>
>
> java.lang.ArrayIndexOutOfBoundsException: 90
> at
> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
> at
> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
> at
> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
> at
> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
> at
> org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
> at java.io.InputStream.read(InputStream.java:101)
> at
> org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
> at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
> at
> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
> at
> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
> at
> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
> at
> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> org.apache.spark.rdd.RDD.o

Re: java.lang.StackOverflowError when calling count()

2014-05-13 Thread Xiangrui Meng
After calling checkpoint(), call count() directly to materialize it. -Xiangrui

On Tue, May 13, 2014 at 4:20 AM, Mayur Rustagi  wrote:
> We are running into same issue. After 700 or so files the stack overflows,
> cache, persist & checkpointing dont help.
> Basically checkpointing only saves the RDD when it is materialized & it only
> materializes in the end, then it runs out of stack.
>
> Regards
> Mayur
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi
>
>
>
> On Tue, May 13, 2014 at 11:40 AM, Xiangrui Meng  wrote:
>>
>> You have a long lineage that causes the StackOverflowError. Try
>> rdd.checkpoint() and rdd.count() every 20~30 iterations.
>> checkpoint() can cut the lineage. -Xiangrui
>>
>> On Mon, May 12, 2014 at 3:42 PM, Guanhua Yan  wrote:
>> > Dear Sparkers:
>> >
>> > I am using Python spark of version 0.9.0 to implement some iterative
>> > algorithm. I got some errors shown at the end of this email. It seems
>> > that
>> > it's due to the Java Stack Overflow error. The same error has been
>> > duplicated on a mac desktop and a linux workstation, both running the
>> > same
>> > version of Spark.
>> >
>> > The same line of code works correctly after quite some iterations. At
>> > the
>> > line of error, rdd__new.count() could be 0. (In some previous rounds,
>> > this
>> > was also 0 without any problem).
>> >
>> > Any thoughts on this?
>> >
>> > Thank you very much,
>> > - Guanhua
>> >
>> >
>> > 
>> > CODE:print "round", round, rdd__new.count()
>> > 
>> >   File
>> >
>> > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py",
>> > line 542, in count
>> > 14/05/12 16:20:28 INFO TaskSetManager: Loss was due to
>> > java.lang.StackOverflowError [duplicate 1]
>> > return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>> > 14/05/12 16:20:28 ERROR TaskSetManager: Task 8419.0:0 failed 1 times;
>> > aborting job
>> >   File
>> >
>> > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py",
>> > line 533, in sum
>> > 14/05/12 16:20:28 INFO TaskSchedulerImpl: Ignoring update with state
>> > FAILED
>> > from TID 1774 because its task set is gone
>> > return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
>> >   File
>> >
>> > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py",
>> > line 499, in reduce
>> > vals = self.mapPartitions(func).collect()
>> >   File
>> >
>> > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py",
>> > line 463, in collect
>> > bytesInJava = self._jrdd.collect().iterator()
>> >   File
>> >
>> > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
>> > line 537, in __call__
>> >   File
>> >
>> > "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py",
>> > line 300, in get_return_value
>> > py4j.protocol.Py4JJavaError: An error occurred while calling
>> > o4317.collect.
>> > : org.apache.spark.SparkException: Job aborted: Task 8419.0:1 failed 1
>> > times
>> > (most recent failure: Exception failure: java.lang.StackOverflowError)
>> > at
>> >
>> > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>> > at
>> >
>> > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>> > at
>> >
>> > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> > at
>> >
>> > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>> > at
>> >
>> > org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
>> > at
>> >
>> > org.apache.spark.scheduler.DAGScheduler$$

Re: java.lang.StackOverflowError when calling count()

2014-05-13 Thread Xiangrui Meng
You have a long lineage that causes the StackOverflowError. Try
rdd.checkpoint() and rdd.count() every 20~30 iterations.
checkpoint() can cut the lineage. -Xiangrui
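
A minimal sketch of that pattern (the checkpoint directory and the update
step are placeholders, and `sc` is an existing SparkContext):

import org.apache.spark.rdd.RDD

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints") // placeholder path

// Stand-in for whatever the iterative algorithm does each round.
def step(rdd: RDD[Double]): RDD[Double] = rdd.map(_ * 0.9 + 1.0)

var data: RDD[Double] = sc.parallelize(1 to 1000000).map(_.toDouble)
for (i <- 1 to 100) {
  data = step(data)
  if (i % 25 == 0) {
    data.cache()      // avoids recomputing the RDD when it is checkpointed
    data.checkpoint() // marks it; the lineage is cut once it is materialized
    data.count()      // materializes it, which triggers the checkpoint
  }
}
println(data.count())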

On Mon, May 12, 2014 at 3:42 PM, Guanhua Yan  wrote:
> Dear Sparkers:
>
> I am using Python spark of version 0.9.0 to implement some iterative
> algorithm. I got some errors shown at the end of this email. It seems that
> it's due to the Java Stack Overflow error. The same error has been
> duplicated on a mac desktop and a linux workstation, both running the same
> version of Spark.
>
> The same line of code works correctly after quite some iterations. At the
> line of error, rdd__new.count() could be 0. (In some previous rounds, this
> was also 0 without any problem).
>
> Any thoughts on this?
>
> Thank you very much,
> - Guanhua
>
>
> 
> CODE:print "round", round, rdd__new.count()
> 
>   File
> "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py",
> line 542, in count
> 14/05/12 16:20:28 INFO TaskSetManager: Loss was due to
> java.lang.StackOverflowError [duplicate 1]
> return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
> 14/05/12 16:20:28 ERROR TaskSetManager: Task 8419.0:0 failed 1 times;
> aborting job
>   File
> "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py",
> line 533, in sum
> 14/05/12 16:20:28 INFO TaskSchedulerImpl: Ignoring update with state FAILED
> from TID 1774 because its task set is gone
> return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
>   File
> "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py",
> line 499, in reduce
> vals = self.mapPartitions(func).collect()
>   File
> "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/pyspark/rdd.py",
> line 463, in collect
> bytesInJava = self._jrdd.collect().iterator()
>   File
> "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
> line 537, in __call__
>   File
> "/home1/ghyan/Software/spark-0.9.0-incubating-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py",
> line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o4317.collect.
> : org.apache.spark.SparkException: Job aborted: Task 8419.0:1 failed 1 times
> (most recent failure: Exception failure: java.lang.StackOverflowError)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> ==
> The stack overflow error is shown as follows:
> ==
>
> 14/05/12 16:20:28 ERROR Executor: Exception in task ID 1774
> java.lang.StackOverflowError
> at java.util.zip.Inflater.inflate(Inflater.java:259)
> at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
> at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:116)
> at
> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
> at
> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323)
> at
> java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:2818)
> at java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1452)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1511)
> at

Re: Accuracy in mllib BinaryClassificationMetrics

2014-05-12 Thread Xiangrui Meng
Hi Deb, feel free to add accuracy along with precision and recall. -Xiangrui
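
In the meantime, a one-liner sketch for computing it directly (`model` and
`test` are placeholder names for a trained classifier and an RDD[LabeledPoint]):

val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))
val accuracy = predictionAndLabel.filter { case (pred, label) => pred == label }.count().toDouble /
  test.count()
println(s"accuracy = $accuracy")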

On Mon, May 12, 2014 at 1:26 PM, Debasish Das  wrote:
> Hi,
>
> I see precision and recall but no accuracy in mllib.evaluation.binary.
>
> Is it already under development or it needs to be added ?
>
> Thanks.
> Deb
>


Re: Turn BLAS on MacOSX

2014-05-12 Thread Xiangrui Meng
Those are warning messages instead of errors. You need to add
netlib-java:all to use native BLAS/LAPACK. But it won't work if you
include netlib-java:all in an assembly jar. It has to be a separate
jar when you submit your job. For SGD, we only use level-1 BLAS, so I
don't think native code is called. -Xiangrui
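
For example, the sbt side of it might look like this (a sketch; the version
is illustrative, and the "all" artifact is POM-only, hence pomOnly()). Keep it
out of the assembly jar and ship it separately, e.g. via --jars:

// build.sbt fragment: pulls in netlib-java plus its native loaders.
libraryDependencies += "com.github.fommil.netlib" % "all" % "1.1.2" pomOnly()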

On Sun, May 11, 2014 at 9:32 AM, DB Tsai  wrote:
> Hi Debasish,
>
> In https://github.com/apache/spark/blob/master/docs/mllib-guide.md
> Dependencies section, the document talks about the native blas dependencies
> issue.
>
> For netlib which breeze uses internally, if the native library isn't found,
> the jblas implementation will be used.
>
> Here is more detail about how to install native library in different
> platform.
> https://github.com/fommil/netlib-java/blob/master/README.md#machine-optimised-system-libraries
>
>
> Sincerely,
>
> DB Tsai
> ---
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
> On Wed, May 7, 2014 at 10:52 AM, Debasish Das 
> wrote:
>>
>> Hi,
>>
>> How do I load native BLAS libraries on Mac ?
>>
>> I am getting the following errors while running LR and SVM with SGD:
>>
>> 14/05/07 10:48:13 WARN BLAS: Failed to load implementation from:
>> com.github.fommil.netlib.NativeSystemBLAS
>>
>> 14/05/07 10:48:13 WARN BLAS: Failed to load implementation from:
>> com.github.fommil.netlib.NativeRefBLAS
>>
>> centos it was fine...but on mac I am getting these warnings..
>>
>> Also when it fails to run native blas does it use java code for BLAS
>> operations ?
>>
>> May be after addition of breeze, we should add these details on a page as
>> well so that users are aware of it before they report any performance
>> results..
>>
>> Thanks.
>>
>> Deb
>
>


Re: Spark LIBLINEAR

2014-05-12 Thread Xiangrui Meng
Hi Chieh-Yen,

Great to see the Spark implementation of LIBLINEAR! We will definitely
consider adding a wrapper in MLlib to support it. Is the source code
on github?

Deb, Spark LIBLINEAR uses BSD license, which is compatible with Apache.

Best,
Xiangrui

On Sun, May 11, 2014 at 10:29 AM, Debasish Das  wrote:
> Hello Prof. Lin,
>
> Awesome news ! I am curious if you have any benchmarks comparing C++ MPI
> with Scala Spark liblinear implementations...
>
> Is Spark Liblinear apache licensed or there are any specific restrictions on
> using it ?
>
> Except using native blas libraries (which each user has to manage by pulling
> in their best proprietary BLAS package), all Spark code is Apache licensed.
>
> Thanks.
> Deb
>
>
> On Sun, May 11, 2014 at 3:01 AM, DB Tsai  wrote:
>>
>> Dear Prof. Lin,
>>
>> Interesting! We had an implementation of L-BFGS in Spark and already
>> merged in the upstream now.
>>
>> We read your paper comparing TRON and OWL-QN for logistic regression with
>> L1 (http://www.csie.ntu.edu.tw/~cjlin/papers/l1.pdf), but it seems that it's
>> not in the distributed setup.
>>
>> Will be very interesting to know the L2 logistic regression benchmark
>> result in Spark with your TRON optimizer and the L-BFGS optimizer against
>> different datasets (sparse, dense, and wide, etc).
>>
>> I'll try your TRON out soon.
>>
>>
>> Sincerely,
>>
>> DB Tsai
>> ---
>> My Blog: https://www.dbtsai.com
>> LinkedIn: https://www.linkedin.com/in/dbtsai
>>
>>
>> On Sun, May 11, 2014 at 1:49 AM, Chieh-Yen 
>> wrote:
>>>
>>> Dear all,
>>>
>>> Recently we released a distributed extension of LIBLINEAR at
>>>
>>> http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/distributed-liblinear/
>>>
>>> Currently, TRON for logistic regression and L2-loss SVM is supported.
>>> We provided both MPI and Spark implementations.
>>> This is very preliminary so your comments are very welcome.
>>>
>>> Thanks,
>>> Chieh-Yen
>>
>>
>


Re: running SparkALS

2014-04-28 Thread Xiangrui Meng
Hi Diana,

SparkALS is an example implementation of ALS. It doesn't call the ALS
algorithm implemented in MLlib. M, U, and F are used to generate
synthetic data.

I'm updating the examples. In the meantime, you can take a look at the
updated MLlib guide:
http://50.17.120.186:4000/mllib-collaborative-filtering.html and try
the example code there.

Thanks,
Xiangrui
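
For reference, the MLlib ALS call itself looks roughly like this (a sketch;
the ratings file and its "user,product,rating" layout are placeholders):

import org.apache.spark.mllib.recommendation.{ALS, Rating}

val ratings = sc.textFile("ratings.csv").map { line =>
  val Array(user, product, rating) = line.split(',')
  Rating(user.toInt, product.toInt, rating.toDouble)
}

val rank = 10          // number of latent factors
val numIterations = 20
val lambda = 0.01      // regularization
val model = ALS.train(ratings, rank, numIterations, lambda)

println(model.predict(1, 42)) // predicted rating for user 1, product 42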

On Mon, Apr 28, 2014 at 10:30 AM, Diana Carroll  wrote:
> Hi everyone.  I'm trying to run some of the Spark example code, and most of
> it appears to be undocumented (unless I'm missing something).  Can someone
> help me out?
>
> I'm particularly interested in running SparkALS, which wants parameters:
> M U F iter slices
>
> What are these variables?  They appear to be integers and the default values
> are 100, 500 and 10 respectively but beyond that...huh?
>
> Thanks!
>
> Diana


Re: Running out of memory Naive Bayes

2014-04-27 Thread Xiangrui Meng
How big is your problem and how many labels? -Xiangrui

On Sun, Apr 27, 2014 at 10:28 PM, DB Tsai  wrote:
> Hi Xiangrui,
>
> We also run into this issue at Alpine Data Labs. We ended up using LRU cache
> to store the counts, and splitting those least used counts to distributed
> cache in HDFS.
>
>
> Sincerely,
>
> DB Tsai
> ---
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
> On Sun, Apr 27, 2014 at 7:34 PM, Xiangrui Meng  wrote:
>>
>> Even the features are sparse, the conditional probabilities are stored
>> in a dense matrix. With 200 labels and 2 million features, you need to
>> store at least 4e8 doubles on the driver node. With multiple
>> partitions, you may need more memory on the driver. Could you try
>> reducing the number of partitions and giving driver more ram and see
>> whether it can help? -Xiangrui
>>
>> On Sun, Apr 27, 2014 at 3:33 PM, John King 
>> wrote:
>> > I'm already using the SparseVector class.
>> >
>> > ~200 labels
>> >
>> >
>> > On Sun, Apr 27, 2014 at 12:26 AM, Xiangrui Meng 
>> > wrote:
>> >>
>> >> How many labels does your dataset have? -Xiangrui
>> >>
>> >> On Sat, Apr 26, 2014 at 6:03 PM, DB Tsai  wrote:
>> >> > Which version of mllib are you using? For Spark 1.0, mllib will
>> >> > support sparse feature vector which will improve performance a lot
>> >> > when computing the distance between points and centroid.
>> >> >
>> >> > Sincerely,
>> >> >
>> >> > DB Tsai
>> >> > ---
>> >> > My Blog: https://www.dbtsai.com
>> >> > LinkedIn: https://www.linkedin.com/in/dbtsai
>> >> >
>> >> >
>> >> > On Sat, Apr 26, 2014 at 5:49 AM, John King
>> >> >  wrote:
>> >> >> I'm just wondering are the SparkVector calculations really taking
>> >> >> into
>> >> >> account the sparsity or just converting to dense?
>> >> >>
>> >> >>
>> >> >> On Fri, Apr 25, 2014 at 10:06 PM, John King
>> >> >> 
>> >> >> wrote:
>> >> >>>
>> >> >>> I've been trying to use the Naive Bayes classifier. Each example in
>> >> >>> the
>> >> >>> dataset is about 2 million features, only about 20-50 of which are
>> >> >>> non-zero,
>> >> >>> so the vectors are very sparse. I keep running out of memory
>> >> >>> though,
>> >> >>> even
>> >> >>> for about 1000 examples on 30gb RAM while the entire dataset is 4
>> >> >>> million
>> >> >>> examples. And I would also like to note that I'm using the sparse
>> >> >>> vector
>> >> >>> class.
>> >> >>
>> >> >>
>> >
>> >
>
>


Re: Running out of memory Naive Bayes

2014-04-27 Thread Xiangrui Meng
Even if the features are sparse, the conditional probabilities are stored
in a dense matrix. With 200 labels and 2 million features, you need to
store at least 4e8 doubles on the driver node. With multiple
partitions, you may need more memory on the driver. Could you try
reducing the number of partitions and giving the driver more RAM to see
whether it helps? -Xiangrui
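
A quick back-of-the-envelope for the numbers above (sizes taken from this
thread):

val labels = 200L
val features = 2000000L
val entries = labels * features            // 4e8 doubles in the dense matrix
val gb = entries * 8 / math.pow(1024, 3)   // 8 bytes per double
println(f"$entries%d doubles ~ $gb%.1f GB before any JVM overhead")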

On Sun, Apr 27, 2014 at 3:33 PM, John King  wrote:
> I'm already using the SparseVector class.
>
> ~200 labels
>
>
> On Sun, Apr 27, 2014 at 12:26 AM, Xiangrui Meng  wrote:
>>
>> How many labels does your dataset have? -Xiangrui
>>
>> On Sat, Apr 26, 2014 at 6:03 PM, DB Tsai  wrote:
>> > Which version of mllib are you using? For Spark 1.0, mllib will
>> > support sparse feature vector which will improve performance a lot
>> > when computing the distance between points and centroid.
>> >
>> > Sincerely,
>> >
>> > DB Tsai
>> > ---
>> > My Blog: https://www.dbtsai.com
>> > LinkedIn: https://www.linkedin.com/in/dbtsai
>> >
>> >
>> > On Sat, Apr 26, 2014 at 5:49 AM, John King
>> >  wrote:
>> >> I'm just wondering are the SparkVector calculations really taking into
>> >> account the sparsity or just converting to dense?
>> >>
>> >>
>> >> On Fri, Apr 25, 2014 at 10:06 PM, John King
>> >> 
>> >> wrote:
>> >>>
>> >>> I've been trying to use the Naive Bayes classifier. Each example in
>> >>> the
>> >>> dataset is about 2 million features, only about 20-50 of which are
>> >>> non-zero,
>> >>> so the vectors are very sparse. I keep running out of memory though,
>> >>> even
>> >>> for about 1000 examples on 30gb RAM while the entire dataset is 4
>> >>> million
>> >>> examples. And I would also like to note that I'm using the sparse
>> >>> vector
>> >>> class.
>> >>
>> >>
>
>


Re: Running out of memory Naive Bayes

2014-04-26 Thread Xiangrui Meng
How many labels does your dataset have? -Xiangrui

On Sat, Apr 26, 2014 at 6:03 PM, DB Tsai  wrote:
> Which version of mllib are you using? For Spark 1.0, mllib will
> support sparse feature vector which will improve performance a lot
> when computing the distance between points and centroid.
>
> Sincerely,
>
> DB Tsai
> ---
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
> On Sat, Apr 26, 2014 at 5:49 AM, John King  
> wrote:
>> I'm just wondering are the SparkVector calculations really taking into
>> account the sparsity or just converting to dense?
>>
>>
>> On Fri, Apr 25, 2014 at 10:06 PM, John King 
>> wrote:
>>>
>>> I've been trying to use the Naive Bayes classifier. Each example in the
>>> dataset is about 2 million features, only about 20-50 of which are non-zero,
>>> so the vectors are very sparse. I keep running out of memory though, even
>>> for about 1000 examples on 30gb RAM while the entire dataset is 4 million
>>> examples. And I would also like to note that I'm using the sparse vector
>>> class.
>>
>>


Re: Spark mllib throwing error

2014-04-24 Thread Xiangrui Meng
I only see one risk: if your feature indices are not sorted, it might
have undefined behavior. Other than that, I don't see anything
suspicious. -Xiangrui
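
If the indices can arrive out of order, a small fix along these lines sorts
them first (a sketch; 2357815 is the feature dimension from the snippet below,
and Vectors.sparse is the public 1.0-style factory):

import org.apache.spark.mllib.linalg.Vectors

// (index, value) pairs collected while parsing, possibly unsorted.
val pairs = Array((7, 0.5), (2, 1.0), (11, 3.0))
val sorted = pairs.sortBy(_._1)
val vector = Vectors.sparse(2357815, sorted.map(_._1), sorted.map(_._2))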

On Thu, Apr 24, 2014 at 4:56 PM, John King  wrote:
> It just displayed this error and stopped on its own. Do the lines of code
> mentioned in the error have anything to do with it?
>
>
> On Thu, Apr 24, 2014 at 7:54 PM, Xiangrui Meng  wrote:
>>
>> I don't see anything wrong with your code. Could you do points.count()
>> to see how many training examples you have? Also, make sure you don't
>> have negative feature values. The error message you sent did not say
>> NaiveBayes went wrong, but the Spark shell was killed. -Xiangrui
>>
>> On Thu, Apr 24, 2014 at 4:05 PM, John King 
>> wrote:
>> > In the other thread I had an issue with Python. In this issue, I tried
>> > switching to Scala. The code is:
>> >
>> > import org.apache.spark.mllib.regression.LabeledPoint;
>> >
>> > import org.apache.spark.mllib.linalg.SparseVector;
>> >
>> > import org.apache.spark.mllib.classification.NaiveBayes;
>> >
>> > import scala.collection.mutable.ArrayBuffer
>> >
>> >
>> >
>> > def isEmpty(a: String): Boolean = a != null &&
>> > !a.replaceAll("""(?m)\s+$""",
>> > "").isEmpty()
>> >
>> > def parsePoint(a: String): LabeledPoint = {
>> >
>> >val values = a.split('\t')
>> >
>> >val feat = values(1).split(' ')
>> >
>> >val indices = ArrayBuffer.empty[Int]
>> >
>> >val featValues = ArrayBuffer.empty[Double]
>> >
>> >for (f <- feat) {
>> >
>> >val q = f.split(':')
>> >
>> >if (q.length == 2) {
>> >
>> >   indices += (q(0).toInt)
>> >
>> >   featValues += (q(1).toDouble)
>> >
>> >}
>> >
>> >}
>> >
>> >val vector = new SparseVector(2357815, indices.toArray,
>> > featValues.toArray)
>> >
>> >return LabeledPoint(values(0).toDouble, vector)
>> >
>> >}
>> >
>> >
>> > val data = sc.textFile("data.txt")
>> >
>> > val empty = data.filter(isEmpty)
>> >
>> > val points = empty.map(parsePoint)
>> >
>> > points.cache()
>> >
>> > val model = new NaiveBayes().run(points)
>> >
>> >
>> >
>> > On Thu, Apr 24, 2014 at 6:57 PM, Xiangrui Meng  wrote:
>> >>
>> >> Do you mind sharing more code and error messages? The information you
>> >> provided is too little to identify the problem. -Xiangrui
>> >>
>> >> On Thu, Apr 24, 2014 at 1:55 PM, John King
>> >> 
>> >> wrote:
>> >> > Last command was:
>> >> >
>> >> > val model = new NaiveBayes().run(points)
>> >> >
>> >> >
>> >> >
>> >> > On Thu, Apr 24, 2014 at 4:27 PM, Xiangrui Meng 
>> >> > wrote:
>> >> >>
>> >> >> Could you share the command you used and more of the error message?
>> >> >> Also, is it an MLlib specific problem? -Xiangrui
>> >> >>
>> >> >> On Thu, Apr 24, 2014 at 11:49 AM, John King
>> >> >>  wrote:
>> >> >> > ./spark-shell: line 153: 17654 Killed
>> >> >> > $FWDIR/bin/spark-class org.apache.spark.repl.Main "$@"
>> >> >> >
>> >> >> >
>> >> >> > Any ideas?
>> >> >
>> >> >
>> >
>> >
>
>


Re: Spark mllib throwing error

2014-04-24 Thread Xiangrui Meng
I don't see anything wrong with your code. Could you do points.count()
to see how many training examples you have? Also, make sure you don't
have negative feature values. The error message you sent did not say
NaiveBayes went wrong, but the Spark shell was killed. -Xiangrui
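
Concretely, the two checks (assuming the `points` RDD from the code below):

println("training examples: " + points.count())

// NaiveBayes requires non-negative feature values.
val withNegatives = points.filter(_.features.toArray.exists(_ < 0.0)).count()
println("examples with negative features: " + withNegatives)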

On Thu, Apr 24, 2014 at 4:05 PM, John King  wrote:
> In the other thread I had an issue with Python. In this issue, I tried
> switching to Scala. The code is:
>
> import org.apache.spark.mllib.regression.LabeledPoint;
>
> import org.apache.spark.mllib.linalg.SparseVector;
>
> import org.apache.spark.mllib.classification.NaiveBayes;
>
> import scala.collection.mutable.ArrayBuffer
>
>
>
> def isEmpty(a: String): Boolean = a != null && !a.replaceAll("""(?m)\s+$""",
> "").isEmpty()
>
> def parsePoint(a: String): LabeledPoint = {
>
>val values = a.split('\t')
>
>val feat = values(1).split(' ')
>
>val indices = ArrayBuffer.empty[Int]
>
>val featValues = ArrayBuffer.empty[Double]
>
>for (f <- feat) {
>
>val q = f.split(':')
>
>if (q.length == 2) {
>
>   indices += (q(0).toInt)
>
>   featValues += (q(1).toDouble)
>
>}
>
>}
>
>val vector = new SparseVector(2357815, indices.toArray,
> featValues.toArray)
>
>return LabeledPoint(values(0).toDouble, vector)
>
>    }
>
>
> val data = sc.textFile("data.txt")
>
> val empty = data.filter(isEmpty)
>
> val points = empty.map(parsePoint)
>
> points.cache()
>
> val model = new NaiveBayes().run(points)
>
>
>
> On Thu, Apr 24, 2014 at 6:57 PM, Xiangrui Meng  wrote:
>>
>> Do you mind sharing more code and error messages? The information you
>> provided is too little to identify the problem. -Xiangrui
>>
>> On Thu, Apr 24, 2014 at 1:55 PM, John King 
>> wrote:
>> > Last command was:
>> >
>> > val model = new NaiveBayes().run(points)
>> >
>> >
>> >
>> > On Thu, Apr 24, 2014 at 4:27 PM, Xiangrui Meng  wrote:
>> >>
>> >> Could you share the command you used and more of the error message?
>> >> Also, is it an MLlib specific problem? -Xiangrui
>> >>
>> >> On Thu, Apr 24, 2014 at 11:49 AM, John King
>> >>  wrote:
>> >> > ./spark-shell: line 153: 17654 Killed
>> >> > $FWDIR/bin/spark-class org.apache.spark.repl.Main "$@"
>> >> >
>> >> >
>> >> > Any ideas?
>> >
>> >
>
>


Re: Spark mllib throwing error

2014-04-24 Thread Xiangrui Meng
Do you mind sharing more code and error messages? The information you
provided is too little to identify the problem. -Xiangrui

On Thu, Apr 24, 2014 at 1:55 PM, John King  wrote:
> Last command was:
>
> val model = new NaiveBayes().run(points)
>
>
>
> On Thu, Apr 24, 2014 at 4:27 PM, Xiangrui Meng  wrote:
>>
>> Could you share the command you used and more of the error message?
>> Also, is it an MLlib specific problem? -Xiangrui
>>
>> On Thu, Apr 24, 2014 at 11:49 AM, John King
>>  wrote:
>> > ./spark-shell: line 153: 17654 Killed
>> > $FWDIR/bin/spark-class org.apache.spark.repl.Main "$@"
>> >
>> >
>> > Any ideas?
>
>


Re: Trying to use pyspark mllib NaiveBayes

2014-04-24 Thread Xiangrui Meng
I tried locally with the example described in the latest guide:
http://54.82.157.211:4000/mllib-naive-bayes.html , and it worked fine.
Do you mind sharing the code you used? -Xiangrui

On Thu, Apr 24, 2014 at 1:57 PM, John King  wrote:
> Yes, I got it running for large RDD (~7 million lines) and mapping. Just
> received this error when trying to classify.
>
>
> On Thu, Apr 24, 2014 at 4:32 PM, Xiangrui Meng  wrote:
>>
>> Is your Spark cluster running? Try to start with generating simple
>> RDDs and counting. -Xiangrui
>>
>> On Thu, Apr 24, 2014 at 11:38 AM, John King
>>  wrote:
>> > I receive this error:
>> >
>> > Traceback (most recent call last):
>> >
>> >   File "", line 1, in 
>> >
>> >   File
>> > "/home/ubuntu/spark-1.0.0-rc2/python/pyspark/mllib/classification.py",
>> > line
>> > 178, in train
>> >
>> > ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd,
>> > lambda_)
>> >
>> >   File
>> >
>> > "/home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
>> > line 535, in __call__
>> >
>> >   File
>> >
>> > "/home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
>> > line 368, in send_command
>> >
>> >   File
>> >
>> > "/home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
>> > line 361, in send_command
>> >
>> >   File
>> >
>> > "/home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
>> > line 317, in _get_connection
>> >
>> >   File
>> >
>> > "/home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
>> > line 324, in _create_connection
>> >
>> >   File
>> >
>> > "/home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
>> > line 431, in start
>> >
>> > py4j.protocol.Py4JNetworkError: An error occurred while trying to
>> > connect to
>> > the Java server
>
>


Re: spark mllib to jblas calls..and comparison with VW

2014-04-24 Thread Xiangrui Meng
The data array in the RDD is passed by reference to jblas, so there is no
data copying in this stage. However, if jblas uses the native interface,
there is a copying overhead. I think jblas uses a Java implementation for at
least Level 1 BLAS, and calls the native interface for Level 2 & 3. -Xiangrui
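
As a small illustration of the jblas calls involved (a sketch; it assumes
jblas is on the classpath, and the Level-1 vs Level-2/3 split is as described
above):

import org.jblas.DoubleMatrix

val x = new DoubleMatrix(Array(1.0, 2.0, 3.0)) // column vector built from the array
val y = new DoubleMatrix(Array(4.0, 5.0, 6.0))
println(x.dot(y))  // Level-1 dot product: 32.0

val a = DoubleMatrix.rand(3, 3)
println(a.mmul(a)) // Level-3 matrix-matrix multiply (dgemm-style)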

On Thu, Apr 24, 2014 at 3:14 PM, Mohit Jaggi  wrote:
> Folks,
> I am wondering how mllib interacts with jblas and lapack. Does it make
> copies of data from my RDD format to jblas's format? Does jblas copy it
> again before passing to lapack native code?
>
> I also saw some comparisons with VW and it seems mllib is slower on a single
> node but scales better and outperforms VW on 16 nodes. Any idea why? Are
> improvements in the pipeline?
>
> Mohit.


Re: Trying to use pyspark mllib NaiveBayes

2014-04-24 Thread Xiangrui Meng
Is your Spark cluster running? Try to start with generating simple
RDDs and counting. -Xiangrui

On Thu, Apr 24, 2014 at 11:38 AM, John King
 wrote:
> I receive this error:
>
> Traceback (most recent call last):
>
>   File "", line 1, in 
>
>   File
> "/home/ubuntu/spark-1.0.0-rc2/python/pyspark/mllib/classification.py", line
> 178, in train
>
> ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_)
>
>   File
> "/home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
> line 535, in __call__
>
>   File
> "/home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
> line 368, in send_command
>
>   File
> "/home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
> line 361, in send_command
>
>   File
> "/home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
> line 317, in _get_connection
>
>   File
> "/home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
> line 324, in _create_connection
>
>   File
> "/home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
> line 431, in start
>
> py4j.protocol.Py4JNetworkError: An error occurred while trying to connect to
> the Java server

