Re: saveAsTextFiles file not found exception

2014-08-12 Thread Andrew Ash
Hi Chen,

Please see the bug I filed at
https://issues.apache.org/jira/browse/SPARK-2984 with the
FileNotFoundException on _temporary directory issue.

Andrew


On Mon, Aug 11, 2014 at 10:50 PM, Andrew Ash and...@andrewash.com wrote:

 Not sure which stalled HDFS client issue you're referring to, but there
 was one fixed in Spark 1.0.2 that could help you out --
 https://github.com/apache/spark/pull/1409.  I've still seen one related
 to Configuration objects not being threadsafe though, so you'd still need to
 keep speculation on to fix that (SPARK-2546).
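 For reference, a minimal sketch of the knob being discussed, using Spark's
 standard configuration keys (the optional settings are shown with their usual
 defaults):

 import org.apache.spark.{SparkConf, SparkContext}

 val conf = new SparkConf()
   .setAppName("speculation-sketch")
   .set("spark.speculation", "true")           // default is false
   .set("spark.speculation.interval", "100")   // ms between checks for stragglers
   .set("spark.speculation.quantile", "0.75")  // fraction of tasks that must finish first
   .set("spark.speculation.multiplier", "1.5") // how much slower than the median counts as stalled
 val sc = new SparkContext(conf)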

 As it stands now, I can:

 A) have speculation off, in which case I get random hangs for a variety of
 reasons (your HDFS stall, my Configuration safety issue)

 or

 B) have speculation on, in which case I get random failures related to
 LeaseExpiredExceptions and .../_temporary/... file doesn't exist exceptions.


 Kind of a catch-22 -- there's no reliable way to run large jobs on Spark
 right now!

 I'm going to file a bug for the _temporary and LeaseExpiredExceptions as I
 think these are widespread enough that we need a place to track a
 resolution.


 On Mon, Aug 11, 2014 at 9:08 AM, Chen Song chen.song...@gmail.com wrote:

 Andrew that is a good finding.

 Yes, I have speculative execution turned on, because I saw tasks stalled
 on the HDFS client.

 If I turned off speculative execution, is there a way to circumvent the
 hanging task issue?



 On Mon, Aug 11, 2014 at 11:13 AM, Andrew Ash and...@andrewash.com
 wrote:

 I've also been seeing similar stacktraces on Spark core (not streaming)
 and have a theory it's related to spark.speculation being turned on.  Do
 you have that enabled by chance?


 On Mon, Aug 11, 2014 at 8:10 AM, Chen Song chen.song...@gmail.com
 wrote:

 Bill

 Did you get this resolved somehow? Anyone has any insight into this
 problem?

 Chen


 On Mon, Aug 11, 2014 at 10:30 AM, Chen Song chen.song...@gmail.com
 wrote:

 The exception was thrown out in application master(spark streaming
 driver) and the job shut down after this exception.


 On Mon, Aug 11, 2014 at 10:29 AM, Chen Song chen.song...@gmail.com
 wrote:

 I got the same exception after the streaming job ran for a while.
 The ERROR message was complaining about a temp file not being found in
 the output folder.

 14/08/11 08:05:08 ERROR JobScheduler: Error running job streaming job
 140774430 ms.0
 java.io.FileNotFoundException: File
 hdfs://hadoopc/user/csong/output/human_bot/-140774430.out/_temporary/0/task_201408110805__m_07
 does not exist.
 at
 org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:654)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:102)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:712)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:708)
 at
 org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:708)
 at
 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
 at
 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
 at
 org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
 at
 org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126)
 at
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:841)
 at
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:724)
 at
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:643)
 at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1068)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$8.apply(DStream.scala:773)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$8.apply(DStream.scala:771)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
 at
 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)


 On Fri, Jul 25, 2014 at 7:04 PM, Bill Jay bill.jaypeter...@gmail.com
  wrote:

 I just saw another error 

Re: ClassNotFound exception on class in uber.jar

2014-08-12 Thread Akhil Das
This is how I used to do it:

 // Create a list of jars
 List<String> jars = Lists.newArrayList(
     "/home/akhld/mobi/localcluster/x/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly-0.9.1-hadoop2.2.0.jar",
     "ADD-All-The-Jars-Here");

 // Create a SparkConf
 SparkConf spconf = new SparkConf();
 spconf.setMaster("local");
 spconf.setAppName("YourApp");
 spconf.setSparkHome("/home/akhld/mobi/localcluster/x/spark-0.9.1-bin-hadoop2");
 spconf.setJars(jars.toArray(new String[jars.size()]));
 spconf.set("spark.executor.memory", "1g");

 // Now create the context.
 JavaStreamingContext jsc = new JavaStreamingContext(spconf, new Duration(1));


Thanks
Best Regards


On Mon, Aug 11, 2014 at 9:36 PM, lbustelo g...@bustelos.com wrote:

 Not sure if this problem reached the Spark guys because it shows in Nabble
 that "This post has NOT been accepted by the mailing list yet."


 http://apache-spark-user-list.1001560.n3.nabble.com/ClassNotFound-for-user-class-in-uber-jar-td10613.html#a11902

 I'm resubmitting.

 Greetings,

 I'm currently building a fat or uber jar with dependencies using maven
 that a docker-ized Spark cluster (1 master 3 workers, version 1.0.0, scala
 2.10.4) points to locally on the same VM. It seems that sometimes a
 particular class is found, and things are fine, and other times it is not.
 Doing a find on the jar affirms that it is actually there. I `setJars` with
 JavaStreamingContext.jarOfClass(the main class).
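 A minimal Scala sketch of the other option, listing the assembled uber jar
 explicitly in SparkConf.setJars rather than relying on jarOfClass; the path
 and app name below are illustrative:

 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Duration, StreamingContext}

 val uberJar = "/path/to/myapp-assembly.jar"   // illustrative path to the fat jar
 val conf = new SparkConf()
   .setAppName("uber-jar-sketch")
   .setJars(Seq(uberJar))                      // ship the fat jar to the executors explicitly
 val ssc = new StreamingContext(conf, new Duration(10000))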

 I cannot say I know much about how the classpath mechanisms of Spark work, so I
 appreciate any and all suggestions to find out what exactly is happening.

 The exception is as follows:

 14/07/24 18:48:52 INFO Executor: Sending result for 139 directly to driver
 14/07/24 18:48:52 INFO Executor: Finished task ID 139
 14/07/24 18:48:56 WARN BlockManager: Putting block input-0-1406227713800
 failed
 14/07/24 18:48:56 ERROR BlockManagerWorker: Exception handling buffer
 message
 java.lang.ClassNotFoundException: com.cjm5325.MyProject.MyClass
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:270)
 at

 org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:60)
 at
 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
 at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at

 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
 at

 org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
 at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
 at
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at org.apache.spark.storage.MemoryStore.putBytes(MemoryStore.scala:59)
 at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:666)
 at
 org.apache.spark.storage.BlockManager.putBytes(BlockManager.scala:587)
 at

 org.apache.spark.storage.BlockManagerWorker.putBlock(BlockManagerWorker.scala:82)
 at

 org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:63)
 at

 org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)
 at

 org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)
 at

 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at

 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at

 org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at
 

Re: KMeans - java.lang.IllegalArgumentException: requirement failed

2014-08-12 Thread Sean Owen
It sounds like your data does not all have the same dimension? That's
a decent guess. Have a look at the assertions in this method.
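A quick sanity check along those lines (a hedged sketch, assuming `data` is the
RDD[Vector] being passed to KMeans.train):

// Every vector must report the same dimension, otherwise the `require` inside
// fastSquaredDistance fails exactly as in the stack trace below.
val dims = data.map(_.size).distinct().collect()
require(dims.length == 1, s"vectors have differing dimensions: ${dims.mkString(", ")}")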

On Tue, Aug 12, 2014 at 4:44 AM, Ge, Yao (Y.) y...@ford.com wrote:
 I am trying to train a KMeans model with sparse vector with Spark 1.0.1.

 When I run the training I got the following exception:

 java.lang.IllegalArgumentException: requirement failed

 at scala.Predef$.require(Predef.scala:221)

 at
 org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:271)

 at
 org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:398)

 at
 org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:372)

 at
 org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:366)

 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 at
 org.apache.spark.mllib.clustering.KMeans$.findClosest(KMeans.scala:366)

 at
 org.apache.spark.mllib.clustering.KMeans$.pointCost(KMeans.scala:389)

 at
 org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:269)

 at
 org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:268)

 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

 at scala.collection.immutable.Range.foreach(Range.scala:141)

 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

 at
 scala.collection.AbstractTraversable.map(Traversable.scala:105)

 at
 org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:268)

 at
 org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:267)



 What does this mean? How do I troubleshoot this problem?

 Thanks.



 -Yao

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: set SPARK_LOCAL_DIRS issue

2014-08-12 Thread Andrew Ash
// assuming Spark 1.0

Hi Baoqiang,

In my experience for the standalone cluster you need to set
SPARK_WORKER_DIR not SPARK_LOCAL_DIRS to control where shuffle files are
written.  I think this is a documentation issue that could be improved, as
http://spark.apache.org/docs/latest/spark-standalone.html suggests using
SPARK_LOCAL_DIRS for scratch, and I'm not sure that it actually does
anything.
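For example, something along these lines in conf/spark-env.sh on each worker
(the path is only an illustration), followed by a worker restart:

export SPARK_WORKER_DIR=/mnt/data/tmp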

Did you see anything in /mnt/data/tmp when you used SPARK_LOCAL_DIRS?

Cheers!
Andrew


On Sat, Aug 9, 2014 at 7:21 AM, Baoqiang Cao bqcaom...@gmail.com wrote:

 Hi

 I'm trying to use a specific dir as the Spark working directory since I
 have limited space at /tmp. I tried:
 1)
 export SPARK_LOCAL_DIRS="/mnt/data/tmp"
 or 2)
 SPARK_LOCAL_DIRS="/mnt/data/tmp" in spark-env.sh

 But neither worked, since the output of Spark still says

 ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial
 writes to file /tmp/spark-local-20140809134509-0502/34/shuffle_0_436_1
 java.io.FileNotFoundException:
 /tmp/spark-local-20140809134509-0502/34/shuffle_0_436_1 (No space left on
 device)

 Can anybody help with correctly setting up the "tmp" directory?

 Best,
 Baoqiang Cao
 Blog: http://baoqiang.org
 Email: bqcaom...@gmail.com







Re: Parallelizing a task makes it freeze

2014-08-12 Thread sparkuser2345
Actually the program hangs just by calling dataAllRDD.count(). I suspect
creating the RDD is not successful when its elements are too big. When nY =
3000, dataAllRDD.count() works (each element of dataAll = 3000*400*64 bits =
9.6 MB), but when nY = 4000, it hangs (4000*400*64 bits = 12.8 MB). 

What are the limiting factors to the size of the elements of an RDD? 


sparkuser2345 wrote
 I have an array 'dataAll' of key-value pairs where each value is an array
 of arrays. I would like to parallelize a task over the elements of
 'dataAll' to the workers. In the dummy example below, the number of
 elements in 'dataAll' is 3 but in real application it would be tens to
 hundreds. 
 
 Without parallelizing dataAll, 'result' is calculated in less than a
 second: 
 
 import org.jblas.DoubleMatrix  
 
 val nY = 5000
 val nX = 400
 
 val dataAll = Array((1, Array.fill(nY)(Array.fill(nX)(1.0))),
 (2, Array.fill(nY)(Array.fill(nX)(1.0))),
 (3, Array.fill(nY)(Array.fill(nX)(1.0))))
 
 val w1 = DoubleMatrix.ones(400)
 
 // This finishes in less than a second: 
 val result = dataAll.map { dat =>
   val c   = dat._1
   val dataArr = dat._2
   // Map over the Arrays within dataArr:
   val test = dataArr.map { arr =>
     val test2 = new DoubleMatrix(arr.length, 1, arr:_*)
     val out = test2.dot(w1)
     out
   }
   (c, test)
 }
 
 However, when I parallelize dataAll, the same task freezes: 
 
 val dataAllRDD = sc.parallelize(dataAll, 3)
 
 // This doesn't finish in several minutes: 
 val result = dataAllRDD.map { dat =>
   val c   = dat._1
   val dataArr = dat._2
   // Map over the Arrays within dataArr:
   val test = dataArr.map { arr =>
     val test2 = new DoubleMatrix(arr.length, 1, arr:_*)
     val out = test2.dot(w1)
     out
   }
   (c, test)
 }.collect
 
 After sending the above task, nothing is written to the worker logs (as
 viewed through the web UI), but the following output is printed in the
 Spark shell where I'm running the task: 
 
 14/08/11 18:17:31 INFO SparkContext: Starting job: collect at <console>:33
 14/08/11 18:17:31 INFO DAGScheduler: Got job 0 (collect at <console>:33) with 3 output partitions (allowLocal=false)
 14/08/11 18:17:31 INFO DAGScheduler: Final stage: Stage 0 (collect at <console>:33)
 14/08/11 18:17:31 INFO DAGScheduler: Parents of final stage: List()
 14/08/11 18:17:31 INFO DAGScheduler: Missing parents: List()
 14/08/11 18:17:31 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[1] at map at <console>:23), which has no missing parents
 14/08/11 18:17:32 INFO DAGScheduler: Submitting 3 missing tasks from Stage 0 (MappedRDD[1] at map at <console>:23)
 14/08/11 18:17:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
 14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor 2: <executor_2_IP> (PROCESS_LOCAL)
 14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:0 as 16154060 bytes in 69 ms
 14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:1 as TID 1 on executor 1: <executor_1_IP> (PROCESS_LOCAL)
 14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:1 as 16154060 bytes in 81 ms
 14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:2 as TID 2 on executor 0: <executor_0_IP> (PROCESS_LOCAL)
 14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:2 as 16154060 bytes in 66 ms
 
 
 dataAllRDD.map does work with a smaller array though (e.g. nY = 100;
 it finishes in less than a second).
 
 Why is dataAllRDD.map so much slower than dataAll.map, or even not
 executing at all? 
 
 The Spark version I'm using is 0.9.0.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Parallelizing-a-task-makes-it-freeze-tp11900p11967.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Transform RDD[List]

2014-08-12 Thread Sean Owen
-incubator, +user

So, are you trying to transpose your data?

val rdd = sc.parallelize(List(List(1,2,3,4,5),List(6,7,8,9,10))).repartition(2)

First you could pair each value with its position in its list:

val withIndex = rdd.flatMap(_.zipWithIndex)

then group by that position, and discard the position:

withIndex.groupBy(_._2).values.map(_.map(_._1))

Printing the RDD gives what you want:

List(5, 10)
List(1, 6)
List(3, 8)
List(2, 7)
List(4, 9)

On Tue, Aug 12, 2014 at 5:42 AM, Kevin Jung itsjb.j...@samsung.com wrote:
 Hi
 It may be a simple question, but I cannot figure out the most efficient way.
 There is an RDD containing lists.

 RDD
 (
  List(1,2,3,4,5)
  List(6,7,8,9,10)
 )

 I want to transform this to

 RDD
 (
 List(1,6)
 List(2,7)
 List(3,8)
 List(4,9)
 List(5,10)
 )

 And I want to achieve this without using the collect method, because a real-world
 RDD can have a lot of elements and it may cause an out-of-memory error.
 Any ideas are welcome.

 Best regards
 Kevin



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Transform-RDD-List-tp11948.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Killing spark app problem

2014-08-12 Thread Grzegorz Białek
Hi,

when I run some Spark application on my local machine using spark-submit:
$SPARK_HOME/bin/spark-submit --driver-memory 1g <class> <jar>
When I want to interrupt the computation with ctrl-c, it interrupts the current
stage, but then it waits and exits after around 5 minutes, and sometimes it
doesn't exit at all. The only way I was able to kill it was kill -9 <pid>, but
after this my system doesn't boot correctly after a reboot.

Is there a better way to properly kill Spark application?

Thanks,
Grzegorz
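One hedged sketch of a cleaner shutdown path is to stop the context explicitly
from a JVM shutdown hook, so ctrl-c (or a plain kill) releases Spark's resources
before the process dies; the names below are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

object GracefulStopSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("graceful-stop-sketch"))
    // Runs when the JVM receives SIGINT/SIGTERM (ctrl-c, kill <pid>).
    sys.addShutdownHook {
      sc.stop()
    }
    // ... job code ...
    sc.stop()
  }
}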


Re: java.lang.StackOverflowError when calling count()

2014-08-12 Thread Tathagata Das
The long lineage causes a long/deep Java object tree (the DAG of RDD objects),
which needs to be serialized as part of task creation. When
serializing, the whole object DAG needs to be traversed, leading to the
stack overflow error.
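As a hedged illustration of the usual remedy (periodically truncating the
lineage with checkpointing; the interval, path and toy computation are
arbitrary):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("lineage-sketch").setMaster("local[2]"))
sc.setCheckpointDir("/tmp/spark-checkpoints")  // illustrative path

var rdd = sc.parallelize(1 to 1000)
for (i <- 1 to 100) {               // many iterative transformations => long lineage
  rdd = rdd.map(_ + 1).cache()
  if (i % 10 == 0) {                // every few iterations, cut the lineage
    rdd.checkpoint()
    rdd.count()                     // force evaluation so the checkpoint is written
  }
}
println(rdd.count())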

TD


On Mon, Aug 11, 2014 at 7:14 PM, randylu randyl...@gmail.com wrote:

 Hi, TD. I also fell into the trap of long lineage, and your suggestions do
 work well. But I don't understand why the long lineage can cause a stack
 overflow, and where it takes effect?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-StackOverflowError-when-calling-count-tp5649p11941.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Is there any way to control the parallelism in LogisticRegression

2014-08-12 Thread Xiangrui Meng
Assuming that your data is very sparse, I would recommend
RDD.repartition. But if it is not the case and you don't want to
shuffle the data, you can try a CombineInputFormat and then parse the
lines into labeled points. Coalesce may cause locality problems if you
didn't use the right number of partitions. -Xiangrui

On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong dong...@gmail.com wrote:
 I think this has the same effect and issue as #1, right?


 On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen chenjiush...@gmail.com
 wrote:

 How about increasing the HDFS file extent size? E.g. the current value is 128M; we
 make it 512M or bigger.


 On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong dong...@gmail.com
 wrote:

 Hi all,

 We are trying to use Spark MLlib to train super large data (100M features
 and 5B rows). The input data in HDFS has ~26K partitions. By default, MLlib
 will create a task for every partition at each iteration. But because our
 dimensionality is also very high, such a large number of tasks adds
 significant network overhead to transfer the weight vector. Since we want to
 reduce the number of tasks, we tried the ways below:

 1. Coalesce partitions without shuffling, then cache.

 data.coalesce(numPartitions).cache()

 This works fine for relatively small data, but when the data grows and
 numPartitions is fixed, each partition becomes large. This
 introduces two issues: first, the larger partitions need larger
 objects and more memory at runtime, and trigger GC more frequently;
 second, we hit the 'Size exceeds Integer.MAX_VALUE' error, which
 seems to be caused by a single partition being larger than 2G
 (https://issues.apache.org/jira/browse/SPARK-1391).

 2. Coalesce partitions with shuffling, then cache.

 data.coalesce(numPartitions, true).cache()

 It can mitigate the second issue in #1 to some degree, but the first issue
 is still there, and it also introduces a large amount of shuffling.

 3. Cache data first, and coalesce partitions.

 data.cache().coalesce(numPartitions)

 In this way, the number of cached partitions does not change, but each task
 reads data from multiple partitions. However, I find the tasks lose
 locality this way. I see a lot of 'ANY' tasks, which means that tasks
 read data from other nodes and become slower than tasks that read data from
 local memory.

 I think the best way should be like #3, but leveraging locality as much as
 possible. Is there any way to do that? Any suggestions?

 Thanks!

 --
 ZHENG, Xu-dong





 --
 郑旭东
 ZHENG, Xu-dong


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to save mllib model to hdfs and reload it

2014-08-12 Thread Xiangrui Meng
For linear models, the constructors are now public. You can save the
weights to HDFS, then load the weights back and use the constructor to
create the model. -Xiangrui
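A hedged sketch of that idea for a logistic regression model (the path and the
object-file format are just one option; `model` stands for an already-trained
LogisticRegressionModel and `sc` for the SparkContext):

import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.linalg.Vector

// Save: persist the weights and intercept, here as an object file on HDFS.
sc.parallelize(Seq((model.weights, model.intercept)), 1)
  .saveAsObjectFile("hdfs:///models/lr-model")

// Load: read them back and rebuild the model with the (now public) constructor.
val (weights, intercept) = sc.objectFile[(Vector, Double)]("hdfs:///models/lr-model").first()
val restored = new LogisticRegressionModel(weights, intercept)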

On Mon, Aug 11, 2014 at 10:27 PM, XiaoQinyu xiaoqinyu_sp...@outlook.com wrote:
 hello:

 I want to know: if I use historical data to train a model and I want to use
 this model in another app, how should I do that?

 Should I save this model to disk, and when I use the model, load it
 from disk? But I don't know how to save the MLlib model and reload it.

 I will be very pleased if anyone can give some tips.

 Thanks

 XiaoQinyu



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-save-mllib-model-to-hdfs-and-reload-it-tp11953.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Transform RDD[List]

2014-08-12 Thread Kevin Jung
Thanks for your answer. 
Yes, I want to transpose data.
At this point, I have one more question.
I tested it with
RDD1
List(1, 2, 3, 4, 5)
List(6, 7, 8, 9, 10)
List(11, 12, 13, 14, 15)
List(16, 17, 18, 19, 20)

And the result is... 
ArrayBuffer(11, 1, 16, 6)
ArrayBuffer(2, 12, 7, 17)
ArrayBuffer(3, 13, 18, 8)
ArrayBuffer(9, 19, 4, 14)
ArrayBuffer(15, 20, 10, 5)

It collects well but the order is shuffled.
Can I maintain the order?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Transform-RDD-List-tp11948p11974.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: [spark-streaming] kafka source and flow control

2014-08-12 Thread Gwenhael Pasquiers
I was hoping I could make the system behave like a blocking queue: if the
output is too slow, the buffers (storage space for RDDs) fill up and then block,
instead of dropping existing RDDs, until the input itself blocks (slows down
its consumption).

On a side note, I was wondering: is there the same issue with file (HDFS)
inputs? How can I be sure the input won't “overflow” the process chain?


From: Tobias Pfeiffer [mailto:t...@preferred.jp]
Sent: mardi 12 août 2014 02:58
To: Gwenhael Pasquiers
Cc: u...@spark.incubator.apache.org
Subject: Re: [spark-streaming] kafka source and flow control

Hi,

On Mon, Aug 11, 2014 at 9:41 PM, Gwenhael Pasquiers 
gwenhael.pasqui...@ericsson.commailto:gwenhael.pasqui...@ericsson.com wrote:
We intend to apply other operations on the data later in the same spark 
context, but our first step is to archive it.

Our goal is somth like this
Step 1 : consume kafka
Step 2 : archive to hdfs AND send to step 3
Step 3 : transform data
Step 4 : save transformed data to HDFS as input for M/R

I see. Well I think Spark Streaming may be well suited for that purpose.

To us it looks like a great flaw if, in streaming mode, spark-streaming cannot
slow down its consumption depending on the available resources.

On Mon, Aug 11, 2014 at 10:10 PM, Gwenhael Pasquiers 
gwenhael.pasqui...@ericsson.commailto:gwenhael.pasqui...@ericsson.com wrote:
I think the kind of self-regulating system you describe would be too difficult 
to implement and probably unreliable (even more with the fact that we have 
multiple slaves).

Isn't "slow down its consumption depending on the available resources" a
self-regulating system? I don't see how you can adapt to available resources
without measuring your execution time and then changing how much you consume. Did
you have any particular form of adaptation in mind?

Tobias


Re: Transform RDD[List]

2014-08-12 Thread Sean Owen
Sure, just add .toList.sorted in there. Putting it together in one big
expression:

val rdd = sc.parallelize(List(List(1,2,3,4,5),List(6,7,8,9,10)))
val result = 
rdd.flatMap(_.zipWithIndex).groupBy(_._2).values.map(_.map(_._1).toList.sorted)

List(2, 7)
List(1, 6)
List(4, 9)
List(3, 8)
List(5, 10)

On Tue, Aug 12, 2014 at 8:58 AM, Kevin Jung itsjb.j...@samsung.com wrote:
 Thanks for your answer.
 Yes, I want to transpose data.
 At this point, I have one more question.
 I tested it with
 RDD1
 List(1, 2, 3, 4, 5)
 List(6, 7, 8, 9, 10)
 List(11, 12, 13, 14, 15)
 List(16, 17, 18, 19, 20)

 And the result is...
 ArrayBuffer(11, 1, 16, 6)
 ArrayBuffer(2, 12, 7, 17)
 ArrayBuffer(3, 13, 18, 8)
 ArrayBuffer(9, 19, 4, 14)
 ArrayBuffer(15, 20, 10, 5)

 It collects well but the order is shuffled.
 Can I maintain the order?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: share/reuse off-heap persisted (tachyon) RDD in SparkContext or saveAsParquetFile on tachyon in SQLContext

2014-08-12 Thread chutium
spark.speculation was not set. Any speculative execution on the Tachyon side?

tachyon-env.sh only changed following

export TACHYON_MASTER_ADDRESS=test01.zala
#export TACHYON_UNDERFS_ADDRESS=$TACHYON_HOME/underfs
export TACHYON_UNDERFS_ADDRESS=hdfs://test01.zala:8020
export TACHYON_WORKER_MEMORY_SIZE=16GB

test01.zala is master node for HDFS, tachyon, Spark, etc.
worker nodes are
test02.zala
test03.zala
test04.zala

spark-shell runs on test02

after parquetFile.saveAsParquetFile("tachyon://test01.zala:19998/parquet_1")

I got a FailedToCheckpointException with "Failed to rename",

but with tfs lsr there are some temporary files and a metadata file:

0.00 B08-11-2014 16:19:28:054 /parquet_1
881.00 B  08-11-2014 16:19:28:054  In Memory  /parquet_1/_metadata
0.00 B08-11-2014 16:19:28:314 /parquet_1/_temporary
0.00 B08-11-2014 16:19:28:314 /parquet_1/_temporary/0
0.00 B08-11-2014 16:19:28:931
/parquet_1/_temporary/0/_temporary
0.00 B08-11-2014 16:19:28:931
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_00_33
0.00 B08-11-2014 16:19:28:931  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_00_33/part-r-1.parquet
0.00 B08-11-2014 16:19:28:940
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_01_35
0.00 B08-11-2014 16:19:28:940  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_01_35/part-r-2.parquet
0.00 B08-11-2014 16:19:28:962
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_03_36
0.00 B08-11-2014 16:19:28:962  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_03_36/part-r-4.parquet
0.00 B08-11-2014 16:19:28:971
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_02_34
0.00 B08-11-2014 16:19:28:971  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_02_34/part-r-3.parquet
0.00 B08-11-2014 16:20:06:349
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_01_37
0.00 B08-11-2014 16:20:06:349  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_01_37/part-r-2.parquet
0.00 B08-11-2014 16:20:09:519
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_03_38
0.00 B08-11-2014 16:20:09:519  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_03_38/part-r-4.parquet
0.00 B08-11-2014 16:20:18:777
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_02_39
0.00 B08-11-2014 16:20:18:777  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_02_39/part-r-3.parquet
0.00 B08-11-2014 16:20:28:315
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_00_40
0.00 B08-11-2014 16:20:28:315  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_00_40/part-r-1.parquet
0.00 B08-11-2014 16:20:38:382
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_01_41
0.00 B08-11-2014 16:20:38:382  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_01_41/part-r-2.parquet
0.00 B08-11-2014 16:20:40:681
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_03_42
0.00 B08-11-2014 16:20:40:681  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_03_42/part-r-4.parquet
0.00 B08-11-2014 16:20:50:376
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_02_43
0.00 B08-11-2014 16:20:50:376  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_02_43/part-r-3.parquet
0.00 B08-11-2014 16:21:00:932
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_00_44
0.00 B08-11-2014 16:21:00:932  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_00_44/part-r-1.parquet
0.00 B08-11-2014 16:21:10:355
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_01_45
0.00 B08-11-2014 16:21:10:355  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_01_45/part-r-2.parquet
0.00 B08-11-2014 16:21:11:468
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_03_46
0.00 B08-11-2014 16:21:11:468  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_03_46/part-r-4.parquet
0.00 B08-11-2014 16:21:21:681
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_02_47
0.00 B08-11-2014 16:21:21:681  In Memory 
/parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_02_47/part-r-3.parquet
0.00 B08-11-2014 16:21:32:583

Re: share/reuse off-heap persisted (tachyon) RDD in SparkContext or saveAsParquetFile on tachyon in SQLContext

2014-08-12 Thread chutium
More interesting is if spark-shell is started on the master node (test01),

then

parquetFile.saveAsParquetFile("tachyon://test01.zala:19998/parquet_tablex")

14/08/12 11:42:06 INFO : initialize(tachyon://...
...
...
14/08/12 11:42:06 INFO : File does not exist:
tachyon://test01.zala:19998/parquet_tablex/_metadata
14/08/12 11:42:06 INFO : getWorkingDirectory: /
14/08/12 11:42:06 INFO :
create(tachyon://test01.zala:19998/parquet_tablex/_metadata, rw-r--r--,
true, 65536, 1, 33554432, null)
14/08/12 11:42:06 WARN : tachyon.home is not set. Using
/mnt/tachyon_default_home as the default value.
14/08/12 11:42:06 INFO : Trying to get local worker host : test01.zala
14/08/12 11:42:06 ERROR : No local worker on test01.zala
NoWorkerException(message:No local worker on test01.zala)
at
tachyon.thrift.MasterService$user_getWorker_result$user_getWorker_resultStandardScheme.read(MasterService.java:25675)
at
tachyon.thrift.MasterService$user_getWorker_result$user_getWorker_resultStandardScheme.read(MasterService.java:25652)
at
tachyon.thrift.MasterService$user_getWorker_result.read(MasterService.java:25591)
at
tachyon.org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
at
tachyon.thrift.MasterService$Client.recv_user_getWorker(MasterService.java:832)
at
tachyon.thrift.MasterService$Client.user_getWorker(MasterService.java:818)
at tachyon.master.MasterClient.user_getWorker(MasterClient.java:648)
at tachyon.worker.WorkerClient.connect(WorkerClient.java:199)
at tachyon.worker.WorkerClient.mustConnect(WorkerClient.java:360)
at
tachyon.worker.WorkerClient.getUserUfsTempFolder(WorkerClient.java:298)
at
tachyon.client.TachyonFS.createAndGetUserUfsTempFolder(TachyonFS.java:270)
at tachyon.client.FileOutStream.init(FileOutStream.java:72)
at tachyon.client.TachyonFile.getOutStream(TachyonFile.java:207)
at tachyon.hadoop.AbstractTFS.create(AbstractTFS.java:102)
at tachyon.hadoop.TFS.create(TFS.java:24)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:773)
at
parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:344)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetTypes.scala:345)
at
org.apache.spark.sql.parquet.ParquetRelation$.createEmpty(ParquetRelation.scala:142)
at
org.apache.spark.sql.parquet.ParquetRelation$.create(ParquetRelation.scala:120)
at
org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:197)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
at
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:399)
at
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:397)
at
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:403)
at
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:403)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:406)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:406)
at
org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(SchemaRDDLike.scala:77)
at
org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:103)
at $line12.$read$$iwC$$iwC$$iwC$$iwC.init(console:17)
at $line12.$read$$iwC$$iwC$$iwC.init(console:22)
at $line12.$read$$iwC$$iwC.init(console:24)
at $line12.$read$$iwC.init(console:26)
at $line12.$read.init(console:28)
at $line12.$read$.init(console:32)
at $line12.$read$.clinit(console)
at $line12.$eval$.init(console:7)
at $line12.$eval$.clinit(console)
at $line12.$eval.$print(console)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at 

RE: KMeans - java.lang.IllegalArgumentException: requirement failed

2014-08-12 Thread Ge, Yao (Y.)
I figured it out. My indices parameters for the sparse vector were messed up. It
was a good lesson for me:
when using Vectors.sparse(int size, int[] indices, double[] values) to
generate a vector, size is the size of the whole vector, not just the number of
elements with values. The indices array needs to be in ascending order.
In many cases, it is probably easier to use the other two forms of the
Vectors.sparse function if the indices and value positions are not naturally sorted.
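A small Scala illustration of the point (the values are chosen arbitrarily):

import org.apache.spark.mllib.linalg.Vectors

// A 10-dimensional vector with non-zeros at positions 1, 3 and 7:
// `size` is the full dimension (10), and the indices must be ascending.
val v1 = Vectors.sparse(10, Array(1, 3, 7), Array(0.5, 2.0, 1.0))

// Equivalent form taking (index, value) pairs, which does not require pre-sorted input.
val v2 = Vectors.sparse(10, Seq((7, 1.0), (1, 0.5), (3, 2.0)))

assert(v1.toArray.sameElements(v2.toArray))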

-Yao


From: Ge, Yao (Y.)
Sent: Monday, August 11, 2014 11:44 PM
To: 'u...@spark.incubator.apache.org'
Subject: KMeans - java.lang.IllegalArgumentException: requirement failed

I am trying to train a KMeans model with sparse vector with Spark 1.0.1.
When I run the training I got the following exception:
java.lang.IllegalArgumentException: requirement failed
at scala.Predef$.require(Predef.scala:221)
at 
org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:271)
at 
org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:398)
at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:372)
at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:366)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.mllib.clustering.KMeans$.findClosest(KMeans.scala:366)
at 
org.apache.spark.mllib.clustering.KMeans$.pointCost(KMeans.scala:389)
at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:269)
at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:268)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at 
scala.collection.AbstractTraversable.map(Traversable.scala:105)
at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:268)
at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:267)

What does this mean? How do I troubleshoot this problem?
Thanks.

-Yao


Re: java.lang.StackOverflowError when calling count()

2014-08-12 Thread randylu
hi, TD. Thanks very much! I got it.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-StackOverflowError-when-calling-count-tp5649p11980.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



LDA in MLBase

2014-08-12 Thread Aslan Bekirov
Hi All,

I have a question regarding LDA topic modeling. I need to do topic modeling
on ad data.

Does MLBase support LDA topic modeling?

Or any stable, tested LDA implementation on Spark?

BR,
Aslan


Re: Is there any way to control the parallelism in LogisticRegression

2014-08-12 Thread ZHENG, Xu-dong
Hi Xiangrui,

Thanks for your reply!

Yes, our data is very sparse, but RDD.repartition invokes
RDD.coalesce(numPartitions, shuffle = true) internally, so I think it has
the same effect as #2, right?

For CombineInputFormat, although I haven't tried it, it sounds like it
will combine multiple partitions into one large partition if I cache it, so
the same issues as #1?

For coalesce, could you share some best practices on how to set the right
number of partitions to avoid the locality problem?

Thanks!



On Tue, Aug 12, 2014 at 3:51 PM, Xiangrui Meng men...@gmail.com wrote:

 Assuming that your data is very sparse, I would recommend
 RDD.repartition. But if it is not the case and you don't want to
 shuffle the data, you can try a CombineInputFormat and then parse the
 lines into labeled points. Coalesce may cause locality problems if you
 didn't use the right number of partitions. -Xiangrui

 On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong dong...@gmail.com
 wrote:
  I think this has the same effect and issue with #1, right?
 
 
  On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen chenjiush...@gmail.com
  wrote:
 
  How about increase HDFS file extent size? like current value is 128M, we
  make it 512M or bigger.
 
 
  On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong dong...@gmail.com
  wrote:
 
  Hi all,
 
  We are trying to use Spark MLlib to train super large data (100M
 features
  and 5B rows). The input data in HDFS has ~26K partitions. By default,
 MLlib
  will create a task for every partition at each iteration. But because
 our
  dimensions are also very high, such large number of tasks will increase
  large network overhead to transfer the weight vector. So we want to
 reduce
  the number of tasks, we tried below ways:
 
  1. Coalesce partitions without shuffling, then cache.
 
  data.coalesce(numPartitions).cache()
 
  This works fine for relative small data, but when data is increasing
 and
  numPartitions is fixed, the size of one partition will be large. This
  introduces two issues: the first is, the larger partition will need
 larger
  object and more memory at runtime, and trigger GC more frequently; the
  second is, we meet the issue 'size exceeds integer.max_value' error,
 which
  seems be caused by the size of one partition larger than 2G
  (https://issues.apache.org/jira/browse/SPARK-1391).
 
  2. Coalesce partitions with shuffling, then cache.
 
  data.coalesce(numPartitions, true).cache()
 
  It could mitigate the second issue in #1 at some degree, but fist issue
  is still there, and it also will introduce large amount of shullfling.
 
  3. Cache data first, and coalesce partitions.
 
  data.cache().coalesce(numPartitions)
 
  In this way, the number of cached partitions is not change, but each
 task
  read the data from multiple partitions. However, I find the task will
 loss
  locality by this way. I find a lot of 'ANY' tasks, that means that
 tasks
  read data from other nodes, and become slower than that read data from
 local
  memory.
 
  I think the best way should like #3, but leverage locality as more as
  possible. Is there any way to do that? Any suggestions?
 
  Thanks!
 
  --
  ZHENG, Xu-dong
 
 
 
 
 
  --
  郑旭东
  ZHENG, Xu-dong
 




-- 
郑旭东
ZHENG, Xu-dong


Fwd: how to split RDD by key and save to different path

2014-08-12 Thread Fengyun RAO
1. Be careful: HDFS is better for large files, not bunches of small files.

2. If that's really what you want, roll your own.

import java.io.{BufferedWriter, OutputStreamWriter}
import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.Partitioner

def writeLines(iterator: Iterator[(String, String)]) = {
  val writers = new mutable.HashMap[String, BufferedWriter] // (key, writer) map
  try {
    while (iterator.hasNext) {
      val item = iterator.next()
      val key = item._1
      val line = item._2
      // Reuse the writer for this key, or open a new file named after the key.
      val writer = writers.getOrElseUpdate(key, {
        val path = args(1) + key // args(1): output directory prefix
        val outputStream = FileSystem.get(new Configuration()).create(new Path(path))
        new BufferedWriter(new OutputStreamWriter(outputStream))
      })
      writer.write(line)
      writer.newLine()
    }
  } finally {
    writers.values.foreach(_.close())
  }
}

// keyOf() is a placeholder for however you derive the key from a line.
def keyOf(line: String): String = line.split("\t")(0)

val inputData = sc.textFile(args(0)) // args(0): input path
val keyValue = inputData.map(line => (keyOf(line), line))
val partitions = keyValue.partitionBy(new MyPartitioner(10))
partitions.foreachPartition(writeLines)

class MyPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = {
    // make sure lines with the same key land in the same partition
    (key.toString.hashCode & Integer.MAX_VALUE) % numPartitions
  }
}



2014-08-12 21:34 GMT+08:00 Fengyun RAO raofeng...@gmail.com:

 1. be careful, HDFS are better for large files, not bunches of small files.

 2. if that's really what you want, roll it your own.

 def writeAvro(iterator: Iterator[(String, String)]) = {
   val writers = new mutalbe.HashMap[String, BufferedWriter] // (key, writer) 
 map
   try {
   while (iterator.hasNext) {
 val item = iterator.next()
 val key = item._1
 val line = item._2
 val writer = writers.get(key) match {
   case Some(writer) = writer
   case None =
 val path = arg(1) + key
 val outputStream = FileSystem.get(new Configuration()).create(new 
 Path(path))
 writer = new BufferedWriter(outputStream)
 }
 writer.writeLine(line)
 } finally {
 writers.values.foreach(._close())
 }
 }

 val inputData = sc.textFile()
 val keyValue = inputData.map(line = (key, line))
 val partitions = keValue.partitionBy(new MyPartition(10))
 partitions.foreachPartition(writeLines)


 class MyPartitioner(partitions: Int) extends Partitioner {
 override def numPartitions: Int = partitions

 override def getPartition(key: Any): Int = {
 (key.toString.hashCode  Integer.MAX_VALUE) % numPartitions // make 
 sure lines with the same key in the same partition

 }
 }



 2014-08-11 20:42 GMT+08:00 诺铁 noty...@gmail.com:

 Hi,

 I have googled and found a similar question without a good answer:
 http://stackoverflow.com/questions/24520225/writing-to-hadoop-distributed-file-system-multiple-times-with-spark

 In short, I would like to separate the raw data and divide it by some key, for
 example creation date, and put it in a directory named by the date, so that I can
 easily access a portion of the data later.

 For now I have to extract all keys and then filter by key and save to
 file repeatedly. Is there any good way to do this? Or maybe I shouldn't do
 such a thing?





Re: Spark SQL JDBC

2014-08-12 Thread John Omernik
Yin helped me with that, and I appreciate the on-list follow-up.  A few
questions: why is this the case?  I guess, does building it with the
thrift server add much more time/size to the final build? It seems that
unless this is documented well, people will miss it and this situation will
occur, so why would we not just build the thrift server in? (I am not a
programming expert, and not trying to judge the decision to have it in a
separate profile; I would just like to understand why it's done that way.)




On Mon, Aug 11, 2014 at 11:47 AM, Cheng Lian lian.cs@gmail.com wrote:

 Hi John, the JDBC Thrift server resides in its own build profile and needs
 to be enabled explicitly by ./sbt/sbt -Phive-thriftserver assembly.


 On Tue, Aug 5, 2014 at 4:54 AM, John Omernik j...@omernik.com wrote:

 I am using spark-1.1.0-SNAPSHOT right now and trying to get familiar with
 the JDBC thrift server.  I have everything compiled correctly, I can access
 data in spark-shell on yarn from my hive installation. Cached tables, etc
 all work.

 When I execute ./sbin/start-thriftserver.sh

 I get the error below. Shouldn't it just read my spark-env? I guess I am
 lost on how to make this work.

 Thanks!

 $ ./start-thriftserver.sh


 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath

 Exception in thread main java.lang.ClassNotFoundException:
 org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)

 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)

 at java.security.AccessController.doPrivileged(Native Method)

 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)

 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)

 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

 at java.lang.Class.forName0(Native Method)

 at java.lang.Class.forName(Class.java:270)

 at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:311)

 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:73)

 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)





Re: saveAsTextFiles file not found exception

2014-08-12 Thread Chen Song
Thanks for putting this together, Andrew.


On Tue, Aug 12, 2014 at 2:11 AM, Andrew Ash and...@andrewash.com wrote:

 Hi Chen,

 Please see the bug I filed at
 https://issues.apache.org/jira/browse/SPARK-2984 with the
 FileNotFoundException on _temporary directory issue.

 Andrew


 On Mon, Aug 11, 2014 at 10:50 PM, Andrew Ash and...@andrewash.com wrote:

 Not sure which stalled HDFS client issue you're referring to, but there
 was one fixed in Spark 1.0.2 that could help you out --
 https://github.com/apache/spark/pull/1409.  I've still seen one related
 to Configuration objects not being threadsafe though so you'd still need to
 keep speculation on to fix that (SPARK-2546)

 As it stands now, I can:

 A) have speculation off, in which case I get random hangs for a variety
 of reasons (your HDFS stall, my Configuration safety issue)

 or

 B) have speculation on, in which case I get random failures related to
 LeaseExpiredExceptions and .../_temporary/... file doesn't exist exceptions.


 Kind of a catch-22 -- there's no reliable way to run large jobs on Spark
 right now!

 I'm going to file a bug for the _temporary and LeaseExpiredExceptions as
 I think these are widespread enough that we need a place to track a
 resolution.


 On Mon, Aug 11, 2014 at 9:08 AM, Chen Song chen.song...@gmail.com
 wrote:

 Andrew that is a good finding.

 Yes, I have speculative execution turned on, because I saw tasks stalled
 on HDFS client.

 If I turned off speculative execution, is there a way to circumvent the
 hanging task issue?



 On Mon, Aug 11, 2014 at 11:13 AM, Andrew Ash and...@andrewash.com
 wrote:

 I've also been seeing similar stacktraces on Spark core (not streaming)
 and have a theory it's related to spark.speculation being turned on.  Do
 you have that enabled by chance?


 On Mon, Aug 11, 2014 at 8:10 AM, Chen Song chen.song...@gmail.com
 wrote:

 Bill

 Did you get this resolved somehow? Anyone has any insight into this
 problem?

 Chen


 On Mon, Aug 11, 2014 at 10:30 AM, Chen Song chen.song...@gmail.com
 wrote:

 The exception was thrown out in application master(spark streaming
 driver) and the job shut down after this exception.


 On Mon, Aug 11, 2014 at 10:29 AM, Chen Song chen.song...@gmail.com
 wrote:

 I got the same exception after the streaming job runs for a while,
 The ERROR message was complaining about a temp file not being found in 
 the
 output folder.

 14/08/11 08:05:08 ERROR JobScheduler: Error running job streaming
 job 140774430 ms.0
 java.io.FileNotFoundException: File
 hdfs://hadoopc/user/csong/output/human_bot/-140774430.out/_temporary/0/task_201408110805__m_07
 does not exist.
 at
 org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:654)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:102)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:712)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:708)
 at
 org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:708)
 at
 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
 at
 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
 at
 org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
 at
 org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126)
 at
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:841)
 at
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:724)
 at
 org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:643)
 at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1068)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$8.apply(DStream.scala:773)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$8.apply(DStream.scala:771)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
 at
 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at 

Re: anaconda and spark integration

2014-08-12 Thread Oleg Ruchovets
Hello.
Is there an integration of Spark (PySpark) with Anaconda?
I googled a lot and didn't find relevant information.
Could you please point me to a tutorial or a simple example?

Thanks in advance
Oleg.
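For what it's worth, PySpark simply launches whatever interpreter PYSPARK_PYTHON
points to, so pointing it at the Anaconda python on the driver and the workers is
the usual hook (the path below is illustrative):

export PYSPARK_PYTHON=/opt/anaconda/bin/python
$SPARK_HOME/bin/pyspark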


RE: Benchmark on physical Spark cluster

2014-08-12 Thread Mozumder, Monir
An on-list follow-up: http://prof.ict.ac.cn/BigDataBench/#Benchmarks looks
promising as it has Spark as one of the platforms used.

Bests,
-Monir


From: Mozumder, Monir
Sent: Monday, August 11, 2014 7:18 PM
To: user@spark.apache.org
Subject: Benchmark on physical Spark cluster

I am trying to get some workloads or benchmarks for running on a physical Spark
cluster and to find relative speedups on different physical clusters.

The instructions at
https://databricks.com/blog/2014/02/12/big-data-benchmark.html use Amazon EC2.
I was wondering if anyone has other benchmarks for Spark on physical clusters.
I'm hoping to get a CloudSuite-like suite for Spark.

Bests,
-Monir


Re: Is there any way to control the parallelism in LogisticRegression

2014-08-12 Thread Xiangrui Meng
Sorry, I missed #2. My suggestion is the same as #2. You need to set a
bigger numPartitions to avoid hitting integer bound or 2G limitation,
at the cost of increased shuffle size per iteration. If you use a
CombineInputFormat and then cache, it will try to give you roughly the
same size per partition. There will be some remote fetches from HDFS
but still cheaper than calling RDD.repartition().
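A hedged sketch of that combination (CombineTextInputFormat and the split-size
key are Hadoop 2.x names; the parsing at the end is only a placeholder for
however the lines become labeled points, with 100M features as in this thread):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Ask for larger combined splits, here roughly 1 GB each (illustrative).
sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.maxsize", 1024L * 1024 * 1024)

val lines = sc
  .newAPIHadoopFile[LongWritable, Text, CombineTextInputFormat]("hdfs:///path/to/input")
  .map(_._2.toString)

// Placeholder parser: "label idx:value idx:value ..." per line.
val points = lines.map { line =>
  val parts = line.split(' ')
  val pairs = parts.tail.map { p =>
    val Array(i, v) = p.split(':')
    (i.toInt, v.toDouble)
  }
  LabeledPoint(parts.head.toDouble,
    Vectors.sparse(100000000, pairs.map(_._1), pairs.map(_._2)))
}.cache()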

For coalesce without shuffle, I don't know how to set the right number
of partitions either ...

-Xiangrui

On Tue, Aug 12, 2014 at 6:16 AM, ZHENG, Xu-dong dong...@gmail.com wrote:
 Hi Xiangrui,

 Thanks for your reply!

 Yes, our data is very sparse, but RDD.repartition invoke
 RDD.coalesce(numPartitions, shuffle = true) internally, so I think it has
 the same effect with #2, right?

 For CombineInputFormat, although I haven't tried it, but it sounds that it
 will combine multiple partitions into a large partition if I cache it, so
 same issues as #1?

 For coalesce, could you share some best practice how to set the right number
 of partitions to avoid locality problem?

 Thanks!



 On Tue, Aug 12, 2014 at 3:51 PM, Xiangrui Meng men...@gmail.com wrote:

 Assuming that your data is very sparse, I would recommend
 RDD.repartition. But if it is not the case and you don't want to
 shuffle the data, you can try a CombineInputFormat and then parse the
 lines into labeled points. Coalesce may cause locality problems if you
 didn't use the right number of partitions. -Xiangrui

 On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong dong...@gmail.com
 wrote:
  I think this has the same effect and issue with #1, right?
 
 
  On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen chenjiush...@gmail.com
  wrote:
 
  How about increase HDFS file extent size? like current value is 128M,
  we
  make it 512M or bigger.
 
 
  On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong dong...@gmail.com
  wrote:
 
  Hi all,
 
  We are trying to use Spark MLlib to train super large data (100M
  features
  and 5B rows). The input data in HDFS has ~26K partitions. By default,
  MLlib
  will create a task for every partition at each iteration. But because
  our
  dimensions are also very high, such large number of tasks will
  increase
  large network overhead to transfer the weight vector. So we want to
  reduce
  the number of tasks, we tried below ways:
 
  1. Coalesce partitions without shuffling, then cache.
 
  data.coalesce(numPartitions).cache()
 
  This works fine for relatively small data, but when the data grows and
  numPartitions is fixed, each partition becomes large. This introduces two
  issues: first, a larger partition needs larger objects and more memory at
  runtime, and triggers GC more frequently; second, we hit the 'Size exceeds
  Integer.MAX_VALUE' error, which seems to be caused by a single partition
  being larger than 2G
  (https://issues.apache.org/jira/browse/SPARK-1391).
 
  2. Coalesce partitions with shuffling, then cache.
 
  data.coalesce(numPartitions, true).cache()
 
  It mitigates the second issue in #1 to some degree, but the first issue
  is still there, and it also introduces a large amount of shuffling.
 
  3. Cache data first, and coalesce partitions.
 
  data.cache().coalesce(numPartitions)
 
  In this way, the number of cached partitions does not change, but each
  task reads data from multiple partitions. However, I find that tasks lose
  locality this way: I see a lot of 'ANY' tasks, which means those tasks
  read data from other nodes and become slower than tasks that read from
  local memory.
 
  I think the best approach would be like #3, but leveraging locality as
  much as possible. Is there any way to do that? Any suggestions?
 
  Thanks!
 
  --
  ZHENG, Xu-dong
 
 
 
 
 
  --
  郑旭东
  ZHENG, Xu-dong
 




 --
 郑旭东
 ZHENG, Xu-dong





Re: Spark SQL JDBC

2014-08-12 Thread Michael Armbrust
Hive pulls in a ton of dependencies that we were afraid would break
existing spark applications.  For this reason all hive submodules are
optional.


On Tue, Aug 12, 2014 at 7:43 AM, John Omernik j...@omernik.com wrote:

 Yin helped me with that, and I appreciate the on-list follow-up.  A few
 questions: Why is this the case?  Does building it with the thrift server
 add much more time/size to the final build? It seems that unless this is
 documented well, people will miss it and this situation will occur, so why
 would we not just build the thrift server in? (I am not a programming
 expert, and not trying to judge the decision to have it in a separate
 profile; I would just like to understand why it's done that way.)




 On Mon, Aug 11, 2014 at 11:47 AM, Cheng Lian lian.cs@gmail.com
 wrote:

 Hi John, the JDBC Thrift server resides in its own build profile and needs
 to be enabled explicitly with ./sbt/sbt -Phive-thriftserver assembly.


 On Tue, Aug 5, 2014 at 4:54 AM, John Omernik j...@omernik.com wrote:

 I am using spark-1.1.0-SNAPSHOT right now and trying to get familiar
 with the JDBC thrift server.  I have everything compiled correctly, I can
 access data in spark-shell on yarn from my hive installation. Cached
 tables, etc all work.

 When I execute ./sbin/start-thriftserver.sh

 I get the error below. Shouldn't it just read my spark-env? I guess I
 am lost on how to make this work.

 Thanks!

 $ ./start-thriftserver.sh


 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath

 Exception in thread main java.lang.ClassNotFoundException:
 org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)

 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)

 at java.security.AccessController.doPrivileged(Native Method)

 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)

 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)

 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

 at java.lang.Class.forName0(Native Method)

 at java.lang.Class.forName(Class.java:270)

 at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:311)

 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:73)

 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)






Reference External Variables in Map Function (Inner class)

2014-08-12 Thread Sunny Khatri
I have a class defining an inner static class (map function). The inner
class tries to refer to the variable instantiated in the outer class, which
results in a NullPointerException. Sample code as follows:

class SampleOuterClass {

  private static ArrayList<String> someVariable;

  SampleOuterClass() {
    // initialize someVariable
  }

  public static class Parse implements Function<...> {
    public TypeReturn call (...) {
      // Try using someVariable: *Raises NullPointerException*
    }
  }

  public void run() {
    RDD rdd = data.map(new Parse()).rdd();
  }
}

Am I missing something with how Closures work with Spark or something else
is wrong ?

Thanks
Sunny


Re: Reference External Variables in Map Function (Inner class)

2014-08-12 Thread Sunny Khatri
Are there any other workarounds that could be used to pass in the values
from *someVariable *to the transformation function ?


On Tue, Aug 12, 2014 at 10:48 AM, Sean Owen so...@cloudera.com wrote:

 I don't think static members are going to be serialized in the
 closure? the instance of Parse will be looking at its local
 SampleOuterClass, which is maybe not initialized on the remote JVM.

 On Tue, Aug 12, 2014 at 6:02 PM, Sunny Khatri sunny.k...@gmail.com
 wrote:
  I have a class defining an inner static class (map function). The inner
  class tries to refer the variable instantiated in the outside class,
 which
  results in a NullPointerException. Sample Code as follows:
 
  class SampleOuterClass {
 
   private static ArrayListString someVariable;
 
   SampleOuterClass() {
 // initialize someVariable
   }
 
  public static class Parse implements Function... {
public TypeReturn call (...) {
// Try using someVariable: Raises NullPointerException
}
  }
 
  public void run() {
  RDD rdd = data.map(new Parse()).rdd()
  }
  }
 
  Am I missing something with how Closures work with Spark or something
 else
  is wrong ?
 
  Thanks
  Sunny
 
 



Re: Reference External Variables in Map Function (Inner class)

2014-08-12 Thread Marcelo Vanzin
You could create a copy of the variable inside your Parse class;
that way it would be serialized with the instance you create when
calling map() below.

On Tue, Aug 12, 2014 at 10:56 AM, Sunny Khatri sunny.k...@gmail.com wrote:
 Are there any other workarounds that could be used to pass in the values
 from someVariable to the transformation function ?


 On Tue, Aug 12, 2014 at 10:48 AM, Sean Owen so...@cloudera.com wrote:

 I don't think static members are going to be serialized in the
 closure? the instance of Parse will be looking at its local
 SampleOuterClass, which is maybe not initialized on the remote JVM.

 On Tue, Aug 12, 2014 at 6:02 PM, Sunny Khatri sunny.k...@gmail.com
 wrote:
  I have a class defining an inner static class (map function). The inner
  class tries to refer the variable instantiated in the outside class,
  which
  results in a NullPointerException. Sample Code as follows:
 
  class SampleOuterClass {
 
   private static ArrayListString someVariable;
 
   SampleOuterClass() {
 // initialize someVariable
   }
 
  public static class Parse implements Function... {
public TypeReturn call (...) {
// Try using someVariable: Raises NullPointerException
}
  }
 
  public void run() {
  RDD rdd = data.map(new Parse()).rdd()
  }
  }
 
  Am I missing something with how Closures work with Spark or something
  else
  is wrong ?
 
  Thanks
  Sunny
 
 





-- 
Marcelo




DistCP - Spark-based

2014-08-12 Thread Gary Malouf
We are probably still the minority, but our analytics platform based on
Spark + HDFS does not have map/reduce installed.  I'm wondering if there is
a distcp equivalent that leverages Spark to do the work.

Our team is trying to find the best way to do cross-datacenter replication
of our HDFS data to minimize the impact of outages/dc failure.
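
For what it's worth, below is a rough Scala sketch of what such a job could look
like. It is not a drop-in DistCp replacement: it assumes a Hadoop 2.x client, a flat
source directory, and command-line arguments for the source and destination URIs,
and it skips retries, checksums and permission handling.

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.{SparkConf, SparkContext}

object SparkCopy {
  def main(args: Array[String]) {
    val Array(srcDir, dstDir) = args   // e.g. hdfs://dc1/data hdfs://dc2/backup
    val sc = new SparkContext(new SparkConf().setAppName("SparkCopy"))

    // List the source files on the driver, then copy them in parallel on the executors.
    val srcFs = FileSystem.get(new URI(srcDir), new Configuration())
    val files = srcFs.listStatus(new Path(srcDir)).filter(_.isFile).map(_.getPath.toString)

    sc.parallelize(files.toSeq, files.length).foreach { file =>
      val conf = new Configuration()
      val src = new Path(file)
      val dst = new Path(dstDir, src.getName)
      FileUtil.copy(src.getFileSystem(conf), src, dst.getFileSystem(conf), dst,
        false /* deleteSource */, conf)
    }
    sc.stop()
  }
}

Each file becomes one task, so parallelism is bounded by the number of files; a real
tool would also need to preserve directory structure and handle failures.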


Re: Reference External Variables in Map Function (Inner class)

2014-08-12 Thread Sunny Khatri
That should work. Gonna give it a try. Thanks !


On Tue, Aug 12, 2014 at 11:01 AM, Marcelo Vanzin van...@cloudera.com
wrote:

 You could create a copy of the variable inside your Parse class;
 that way it would be serialized with the instance you create when
 calling map() below.

 On Tue, Aug 12, 2014 at 10:56 AM, Sunny Khatri sunny.k...@gmail.com
 wrote:
  Are there any other workarounds that could be used to pass in the values
  from someVariable to the transformation function ?
 
 
  On Tue, Aug 12, 2014 at 10:48 AM, Sean Owen so...@cloudera.com wrote:
 
  I don't think static members are going to be serialized in the
  closure? the instance of Parse will be looking at its local
  SampleOuterClass, which is maybe not initialized on the remote JVM.
 
  On Tue, Aug 12, 2014 at 6:02 PM, Sunny Khatri sunny.k...@gmail.com
  wrote:
   I have a class defining an inner static class (map function). The
 inner
   class tries to refer the variable instantiated in the outside class,
   which
   results in a NullPointerException. Sample Code as follows:
  
   class SampleOuterClass {
  
private static ArrayListString someVariable;
  
SampleOuterClass() {
  // initialize someVariable
}
  
   public static class Parse implements Function... {
 public TypeReturn call (...) {
 // Try using someVariable: Raises
 NullPointerException
 }
   }
  
   public void run() {
   RDD rdd = data.map(new Parse()).rdd()
   }
   }
  
   Am I missing something with how Closures work with Spark or something
   else
   is wrong ?
  
   Thanks
   Sunny
  
  
 
 



 --
 Marcelo



Jobs get stuck at reduceByKey stage with spark 1.0.1

2014-08-12 Thread Shivani Rao
Hello spark aficionados,

We upgraded from spark 1.0.0 to 1.0.1 when the new release came out and
started noticing some weird errors. Even a simple operation like
reduceByKey or count on an RDD gets stuck in cluster mode. This issue
does not occur with spark 1.0.0 (in cluster or local mode)  or spark 1.0.2
(in cluster or local mode) or with spark 1.0.1 (in local mode).  I looked
at the spark release notes and it did not seem like the gigantic task
size issue is a problem because we have tons of resources on this cluster.

Has anyone else encountered this issue before?

Thanks in advance for your help,
Shivani


-- 
Software Engineer
Analytics Engineering Team@ Box
Mountain View, CA


Re: Spark Hbase job taking long time

2014-08-12 Thread Amit Singh Hora
Hi,
Today I created a table with 3 regions and 2 jobtrackers, but the Spark
job is still taking a lot of time.
I also noticed that the memory of the client was increasing linearly; is
it that the Spark job first brings the complete data into memory?
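
For reference, a minimal Scala sketch of reading the table with scan-row caching
enabled, along the lines of the suggestion quoted below; the table name and the
cache size of 100 are placeholders.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object HBaseScanExample {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("HBaseScanExample"))
    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, "calls")       // placeholder table name
    conf.set("hbase.mapreduce.scan.cachedrows", "100")    // rows fetched per RPC, as suggested below

    val rows = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    println(rows.count())
    sc.stop()
  }
}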


On Thu, Aug 7, 2014 at 7:31 PM, Ted Yu [via Apache Spark User List] 
ml-node+s1001560n11651...@n3.nabble.com wrote:

 Forgot to include user@

 Another email from Amit indicated that there is 1 region in his table.
 This wouldn't give you the benefit TableInputFormat is expected to deliver.

 Please split your table into multiple regions.

 See http://hbase.apache.org/book.html#d3593e6847 and related links.

 Cheers


 On Wed, Aug 6, 2014 at 6:41 AM, Ted Yu [hidden email]
 http://user/SendEmail.jtp?type=nodenode=11651i=0 wrote:

 Can you try specifying some value (100, e.g.) for
 hbase.mapreduce.scan.cachedrows in your conf ?

 bq.  table contains 10lakh rows

 How many rows are there in the table ?

 nit: Example uses classOf[TableInputFormat] instead of
 TableInputFormat.class.

 Cheers


 On Wed, Aug 6, 2014 at 5:54 AM, Amit Singh Hora [hidden email]
 http://user/SendEmail.jtp?type=nodenode=11651i=1 wrote:

 Hi All,

  I am trying to run a SQL query on HBase using a Spark job. So far I am
  able to get the desired results, but as the data set size increases the
  Spark job takes a long time.
  I believe I am doing something wrong, as after going through the
  documentation and videos discussing Spark performance it should not take
  more than a couple of seconds.

  PFB code snippet
  The HBase table contains 10 lakh (1 million) rows

  JavaPairRDD<ImmutableBytesWritable, Result> pairRdd = ctx
          .newAPIHadoopRDD(conf, TableInputFormat.class,
                  ImmutableBytesWritable.class,
                  org.apache.hadoop.hbase.client.Result.class).cache();

  JavaRDD<Person> people = pairRdd
          .map(new Function<Tuple2<ImmutableBytesWritable, Result>, Person>() {

              public Person call(Tuple2<ImmutableBytesWritable, Result> v1)
                      throws Exception {

                  System.out.println("comming");
                  Person person = new Person();
                  String key = Bytes.toString(v1._2.getRow());
                  key = key.substring(0, key.lastIndexOf("_"));
                  person.setCalling(Long.parseLong(key));
                  person.setCalled(Bytes.toLong(v1._2.getValue(
                          Bytes.toBytes("si"), Bytes.toBytes("called"))));
                  person.setTime(Bytes.toLong(v1._2.getValue(
                          Bytes.toBytes("si"), Bytes.toBytes("at"))));
                  return person;
              }
          });
  JavaSchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class);
  schemaPeople.registerAsTable("people");

  // SQL can be run over RDDs that have been registered as tables.
  JavaSchemaRDD teenagers = sqlCtx
          .sql("SELECT count(*) from people group by calling");
  teenagers.printSchema();


 I am running spark using start-all.sh script with 2 workers

 Any pointers will be of a great help
 Regards,





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Hbase-job-taking-long-time-tp11541.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.











--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Hbase-job-taking-long-time-tp11541p11998.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Spark Streaming example on your mesos cluster

2014-08-12 Thread Zia Syed
Hi,

I'm trying to run the streaming.NetworkWordCount example on a Mesos cluster
(0.19.1). I am able to run the SparkPi example on my Mesos cluster and can
run the streaming example in local[n] mode, but no luck so far with the
Spark streaming examples running on my Mesos cluster.

I don't see any particular errors in my logs, either on the console or on
the slaves. I see the slaves download the spark-1.0.2-bin-hadoop1.tgz file
and unpack it as well. The Mesos master shows quite a lot of tasks created
and finished. But I don't see any output of the word counts on my console,
like I get in the local Spark version.

Any suggestions/ideas how i can make it work?

Thanks,
Zia


Re: Spark sql failed in yarn-cluster mode when connecting to non-default hive database

2014-08-12 Thread Yin Huai
Hi Jenny,

Have you copied hive-site.xml to spark/conf directory? If not, can you put
it in conf/ and try again?

Thanks,

Yin


On Mon, Aug 11, 2014 at 8:57 PM, Jenny Zhao linlin200...@gmail.com wrote:


 Thanks Yin!

 here is my hive-site.xml, which I copied from $HIVE_HOME/conf. I didn't
 experience any problem connecting to the metastore through Hive, which uses
 DB2 as the metastore database.

 <?xml version="1.0"?>
 <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 <!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
 -->
 <configuration>
  <property>
   <name>hive.hwi.listen.port</name>
   <value></value>
  </property>
  <property>
   <name>hive.querylog.location</name>
   <value>/var/ibm/biginsights/hive/query/${user.name}</value>
  </property>
  <property>
   <name>hive.metastore.warehouse.dir</name>
   <value>/biginsights/hive/warehouse</value>
  </property>
  <property>
   <name>hive.hwi.war.file</name>
   <value>lib/hive-hwi-0.12.0.war</value>
  </property>
  <property>
   <name>hive.metastore.metrics.enabled</name>
   <value>true</value>
  </property>
  <property>
   <name>javax.jdo.option.ConnectionURL</name>
   <value>jdbc:db2://hdtest022.svl.ibm.com:50001/BIDB</value>
  </property>
  <property>
   <name>javax.jdo.option.ConnectionDriverName</name>
   <value>com.ibm.db2.jcc.DB2Driver</value>
  </property>
  <property>
   <name>hive.stats.autogather</name>
   <value>false</value>
  </property>
  <property>
   <name>javax.jdo.mapping.Schema</name>
   <value>HIVE</value>
  </property>
  <property>
   <name>javax.jdo.option.ConnectionUserName</name>
   <value>catalog</value>
  </property>
  <property>
   <name>javax.jdo.option.ConnectionPassword</name>
   <value>V2pJNWMxbFlVbWhaZHowOQ==</value>
  </property>
  <property>
   <name>hive.metastore.password.encrypt</name>
   <value>true</value>
  </property>
  <property>
   <name>org.jpox.autoCreateSchema</name>
   <value>true</value>
  </property>
  <property>
   <name>hive.server2.thrift.min.worker.threads</name>
   <value>5</value>
  </property>
  <property>
   <name>hive.server2.thrift.max.worker.threads</name>
   <value>100</value>
  </property>
  <property>
   <name>hive.server2.thrift.port</name>
   <value>1</value>
  </property>
  <property>
   <name>hive.server2.thrift.bind.host</name>
   <value>hdtest022.svl.ibm.com</value>
  </property>
  <property>
   <name>hive.server2.authentication</name>
   <value>CUSTOM</value>
  </property>
  <property>
   <name>hive.server2.custom.authentication.class</name>
   <value>org.apache.hive.service.auth.WebConsoleAuthenticationProviderImpl</value>
  </property>
  <property>
   <name>hive.server2.enable.impersonation</name>
   <value>true</value>
  </property>
  <property>
   <name>hive.security.webconsole.url</name>
   <value>http://hdtest022.svl.ibm.com:8080</value>
  </property>
  <property>
   <name>hive.security.authorization.enabled</name>
   <value>true</value>
  </property>
  <property>
   <name>hive.security.authorization.createtable.owner.grants</name>
   <value>ALL</value>
  </property>
 </configuration>



 On Mon, Aug 11, 2014 at 4:29 PM, Yin Huai huaiyin@gmail.com wrote:

 Hi Jenny,

 How's your metastore configured for both Hive and Spark SQL? Which
 metastore mode are you using (based on
 https://cwiki.apache.org/confluence/display/Hive/AdminManual+MetastoreAdmin
 )?

 Thanks,

 Yin


 On Mon, Aug 11, 2014 at 6:15 PM, Jenny Zhao linlin200...@gmail.com
 wrote:



 you can reproduce this issue with the following steps (assuming you have
 Yarn cluster + Hive 12):

 1) using hive shell, create a database, e.g: create database ttt

 2) write a simple spark sql program

 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql._
 import org.apache.spark.sql.hive.HiveContext

 object HiveSpark {
   case class Record(key: Int, value: String)

   def main(args: Array[String]) {
     val sparkConf = new SparkConf().setAppName("HiveSpark")
     val sc = new SparkContext(sparkConf)

     // A hive context creates an instance of the Hive Metastore in process
     val hiveContext = new HiveContext(sc)
     import hiveContext._

     hql("use ttt")
     hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
     hql("LOAD DATA INPATH '/user/biadmin/kv1.txt' INTO TABLE src")

     // Queries are expressed in HiveQL
     println("Result of 'SELECT *': ")
     hql("SELECT * FROM src").collect.foreach(println)
     sc.stop()
   }
 }
 3) run it in yarn-cluster mode.


 On Mon, Aug 11, 2014 at 9:44 AM, Cheng Lian 

Re: DistCP - Spark-based

2014-08-12 Thread Matei Zaharia
Good question; I don't know of one but I believe people at Cloudera had some 
thoughts of porting Sqoop to Spark in the future, and maybe they'd consider 
DistCP as part of this effort. I agree it's missing right now.

Matei

On August 12, 2014 at 11:04:28 AM, Gary Malouf (malouf.g...@gmail.com) wrote:

We are probably still the minority, but our analytics platform based on Spark + 
HDFS does not have map/reduce installed.  I'm wondering if there is a distcp 
equivalent that leverages Spark to do the work.

Our team is trying to find the best way to do cross-datacenter replication of 
our HDFS data to minimize the impact of outages/dc failure.  

how to access workers from spark context

2014-08-12 Thread S. Zhou
I tried to access worker info from the spark context, but it seems the spark
context does not expose such an API. The reason for doing this is that the
spark context itself does not seem to have logic to detect whether its workers
are dead, so I would like to add such logic myself.

BTW, the spark web UI seems to have some logic for detecting dead workers, but
all the relevant classes are declared private to the spark package scope.

Please let me know how to solve this issue (or if there is an alternative way
to achieve the same purpose).

Thanks
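
One partial workaround, sketched below: the driver can see which executors are
currently registered via SparkContext.getExecutorMemoryStatus and poll it to notice
executors dropping out. The polling interval is arbitrary, and this only observes
block manager registrations rather than doing a real health check.

import org.apache.spark.{SparkConf, SparkContext}

object ExecutorWatcher {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("ExecutorWatcher"))
    var known = Set.empty[String]
    while (true) {
      // Keys are "host:port" of each registered block manager (the driver included).
      val current = sc.getExecutorMemoryStatus.keySet.toSet
      val lost = known -- current
      if (lost.nonEmpty) println("executors gone since last check: " + lost.mkString(", "))
      known = current
      Thread.sleep(10000)
    }
  }
}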


Re: how to split RDD by key and save to different path

2014-08-12 Thread 诺铁
Understood, thank you.
Small files are a problem; I am considering processing the data before
putting it in HDFS.


On Tue, Aug 12, 2014 at 9:37 PM, Fengyun RAO raofeng...@gmail.com wrote:

 1. Be careful: HDFS is better for large files, not bunches of small files.

 2. If that's really what you want, roll your own.

 import java.io.{BufferedWriter, OutputStreamWriter}
 import scala.collection.mutable
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.Partitioner

 def writeLines(iterator: Iterator[(String, String)]) = {
   val writers = new mutable.HashMap[String, BufferedWriter] // (key, writer) map
   try {
     while (iterator.hasNext) {
       val (key, line) = iterator.next()
       // reuse the writer for this key, or open the output file on first use
       val writer = writers.getOrElseUpdate(key, {
         val path = args(1) + key
         val outputStream = FileSystem.get(new Configuration()).create(new Path(path))
         new BufferedWriter(new OutputStreamWriter(outputStream))
       })
       writer.write(line)
       writer.newLine()
     }
   } finally {
     writers.values.foreach(_.close())
   }
 }

 val inputData = sc.textFile(args(0))                           // args(0): input path
 val keyValue = inputData.map(line => (line.split(",")(0), line)) // placeholder: derive the key from each line
 val partitions = keyValue.partitionBy(new MyPartitioner(10))
 partitions.foreachPartition(writeLines)

 class MyPartitioner(partitions: Int) extends Partitioner {
   override def numPartitions: Int = partitions

   override def getPartition(key: Any): Int = {
     (key.toString.hashCode & Integer.MAX_VALUE) % numPartitions // keep lines with the same key in the same partition
   }
 }



 2014-08-12 21:34 GMT+08:00 Fengyun RAO raofeng...@gmail.com:

 1. be careful, HDFS are better for large files, not bunches of small files.

 2. if that's really what you want, roll it your own.

 def writeAvro(iterator: Iterator[(String, String)]) = {
   val writers = new mutalbe.HashMap[String, BufferedWriter] // (key, writer) 
 map
   try {
   while (iterator.hasNext) {
 val item = iterator.next()
 val key = item._1
 val line = item._2
 val writer = writers.get(key) match {
   case Some(writer) = writer
   case None =
 val path = arg(1) + key
 val outputStream = FileSystem.get(new Configuration()).create(new 
 Path(path))
 writer = new BufferedWriter(outputStream)
 }
 writer.writeLine(line)
 } finally {
 writers.values.foreach(._close())
 }
 }

 val inputData = sc.textFile()
 val keyValue = inputData.map(line = (key, line))
 val partitions = keValue.partitionBy(new MyPartition(10))
 partitions.foreachPartition(writeLines)


 class MyPartitioner(partitions: Int) extends Partitioner {
 override def numPartitions: Int = partitions

 override def getPartition(key: Any): Int = {
 (key.toString.hashCode  Integer.MAX_VALUE) % numPartitions // make 
 sure lines with the same key in the same partition

 }
 }



 2014-08-11 20:42 GMT+08:00 诺铁 noty...@gmail.com:

 hi,

 I have googled and find similar question without good answer,
 http://stackoverflow.com/questions/24520225/writing-to-hadoop-distributed-file-system-multiple-times-with-spark

 in short, I would like to separate raw data and divide by some key, for
 example, create date, and put the in directory named by date, so that I can
 easily access portion of data later.

  for now I have to extract all keys and then filter by key and save to
 file repeatly. are there any good way to do this?  or maybe I shouldn't do
 such thing?






Re: Spark sql failed in yarn-cluster mode when connecting to non-default hive database

2014-08-12 Thread Jenny Zhao
Hi Yin,

hive-site.xml was copied to spark/conf and is the same as the one under
$HIVE_HOME/conf.

Through the hive CLI, I don't see any problem, but for Spark in yarn-cluster
mode I am not able to switch to a database other than the default one; in
yarn-client mode, it works fine.

Thanks!

Jenny


On Tue, Aug 12, 2014 at 12:53 PM, Yin Huai huaiyin@gmail.com wrote:

 Hi Jenny,

 Have you copied hive-site.xml to spark/conf directory? If not, can you
 put it in conf/ and try again?

 Thanks,

 Yin


 On Mon, Aug 11, 2014 at 8:57 PM, Jenny Zhao linlin200...@gmail.com
 wrote:


 Thanks Yin!

 here is my hive-site.xml,  which I copied from $HIVE_HOME/conf, didn't
 experience problem connecting to the metastore through hive. which uses DB2
 as metastore database.

 ?xml version=1.0?
 ?xml-stylesheet type=text/xsl href=configuration.xsl?
 !--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements.  See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the License); you may not use this file except in compliance with
the License.  You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an AS IS BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 implied.
See the License for the specific language governing permissions and
limitations under the License.
 --
 configuration
  property
   namehive.hwi.listen.port/name
   value/value
  /property
  property
   namehive.querylog.location/name
   value/var/ibm/biginsights/hive/query/${user.name}/value
  /property
  property
   namehive.metastore.warehouse.dir/name
   value/biginsights/hive/warehouse/value
  /property
  property
   namehive.hwi.war.file/name
   valuelib/hive-hwi-0.12.0.war/value
  /property
  property
   namehive.metastore.metrics.enabled/name
   valuetrue/value
  /property
  property
   namejavax.jdo.option.ConnectionURL/name
   valuejdbc:db2://hdtest022.svl.ibm.com:50001/BIDB/value
  /property
  property
   namejavax.jdo.option.ConnectionDriverName/name
   valuecom.ibm.db2.jcc.DB2Driver/value
  /property
  property
   namehive.stats.autogather/name
   valuefalse/value
  /property
  property
   namejavax.jdo.mapping.Schema/name
   valueHIVE/value
  /property
  property
   namejavax.jdo.option.ConnectionUserName/name
   valuecatalog/value
  /property
  property
   namejavax.jdo.option.ConnectionPassword/name
   valueV2pJNWMxbFlVbWhaZHowOQ==/value
  /property
  property
   namehive.metastore.password.encrypt/name
   valuetrue/value
  /property
  property
   nameorg.jpox.autoCreateSchema/name
   valuetrue/value
  /property
  property
   namehive.server2.thrift.min.worker.threads/name
   value5/value
  /property
  property
   namehive.server2.thrift.max.worker.threads/name
   value100/value
  /property
  property
   namehive.server2.thrift.port/name
   value1/value
  /property
  property
   namehive.server2.thrift.bind.host/name
   valuehdtest022.svl.ibm.com/value
  /property
  property
   namehive.server2.authentication/name
   valueCUSTOM/value
  /property
  property
   namehive.server2.custom.authentication.class/name

 valueorg.apache.hive.service.auth.WebConsoleAuthenticationProviderImpl/value
  /property
  property
   namehive.server2.enable.impersonation/name
   valuetrue/value
  /property
  property
   namehive.security.webconsole.url/name
   valuehttp://hdtest022.svl.ibm.com:8080/value
  /property
  property
   namehive.security.authorization.enabled/name
   valuetrue/value
  /property
  property
   namehive.security.authorization.createtable.owner.grants/name
   valueALL/value
  /property
 /configuration



 On Mon, Aug 11, 2014 at 4:29 PM, Yin Huai huaiyin@gmail.com wrote:

 Hi Jenny,

 How's your metastore configured for both Hive and Spark SQL? Which
 metastore mode are you using (based on
 https://cwiki.apache.org/confluence/display/Hive/AdminManual+MetastoreAdmin
 )?

 Thanks,

 Yin


 On Mon, Aug 11, 2014 at 6:15 PM, Jenny Zhao linlin200...@gmail.com
 wrote:



 you can reproduce this issue with the following steps (assuming you
 have Yarn cluster + Hive 12):

 1) using hive shell, create a database, e.g: create database ttt

 2) write a simple spark sql program

 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql._
 import org.apache.spark.sql.hive.HiveContext

 object HiveSpark {
   case class Record(key: Int, value: String)

   def main(args: Array[String]) {
 val sparkConf = new SparkConf().setAppName(HiveSpark)
 val sc = new SparkContext(sparkConf)

 // A hive context creates an instance of the Hive Metastore in
 process,
 val hiveContext = new HiveContext(sc)
 import hiveContext._

 hql(use ttt)
   

Re: spark.files.userClassPathFirst=true Not Working Correctly

2014-08-12 Thread Marcelo Vanzin
Hi, sorry for the delay. Would you have yarn available to test? Given
the discussion in SPARK-2878, this might be a different incarnation of
the same underlying issue.

The option in Yarn is spark.yarn.user.classpath.first

On Mon, Aug 11, 2014 at 1:33 PM, DNoteboom dan...@wibidata.com wrote:
 I'm currently running on my local machine on standalone. The error shows up
 in my code when I am closing resources using the
 TaskContext.addOnCompleteCallBack. However, the cause of this error is
 because of a faulty classLoader which must occur in the Executor in the
 function createClassLoader.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-files-userClassPathFirst-true-Not-Working-Correctly-tp11917p11921.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





-- 
Marcelo




Fwd: Task closures and synchronization

2014-08-12 Thread Tobias Pfeiffer
Uh, for some reason I don't seem to automatically reply to the list any
more.
Here is again my message to Tom.

-- Forwarded message --

Tom,

On Wed, Aug 13, 2014 at 5:35 AM, Tom Vacek minnesota...@gmail.com wrote:

 This is a back-to-basics question.  How do we know when Spark will clone
 an object and distribute it with task closures versus synchronize access to
 it.

 For example, the old rookie mistake of random number generation:

 import scala.util.Random
 val randRDD = sc.parallelize(0 until 1000).map(ii => Random.nextGaussian)

 One can check to see that each partition contains a different set of
 random numbers, so the RNG obviously was not cloned, but access was
 synchronized.


In this case, Random is a singleton object; Random.nextGaussian is like a
static method of a Java class. The access is not synchronized (unless I
misunderstand synchronized), but each Spark worker will use a JVM-local
instance of the Random object. You don't actually close over the Random
object in this case. In fact, this is one way to have node-local state
(e.g., for DB connection pooling).


 However:

 val myMap = collection.mutable.Map.empty[Int,Int]
 sc.parallelize(0 until 100).mapPartitions(it =>
   { it.foreach(ii => myMap(ii) = ii); Array(myMap).iterator }).collect


 This shows that each partition got a copy of the empty map and filled it
 in with its portion of the rdd.


In this case, myMap is an instance of the Map class, so it will be
serialized and shipped around. In fact, if you did `val Random = new
scala.util.Random()` in your code above, then this object would also be
serialized and treated just as myMap. (NB. No, it is not. Spark hangs for
me when I do this and doesn't return anything...)

Tobias
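
To make the singleton-object point concrete, here is a small sketch of per-JVM
(executor-local) state in the spirit of the connection-pooling remark above; the
UUID stands in for a real resource such as a connection pool.

import org.apache.spark.{SparkConf, SparkContext}

object ExecutorLocal {
  // Initialized lazily, once per executor JVM; never shipped inside a closure.
  lazy val id = java.util.UUID.randomUUID().toString
}

object NodeLocalStateExample {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("NodeLocalStateExample"))
    val ids = sc.parallelize(1 to 1000, 8)
      .map(_ => ExecutorLocal.id)   // the singleton is resolved locally on each executor
      .distinct()
      .collect()
    println("distinct executor-local ids: " + ids.length)
    sc.stop()
  }
}

Running this on a cluster should print roughly one id per executor JVM, because the
lazy val is initialized locally rather than serialized with the task closure.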


Re: Spark Streaming example on your mesos cluster

2014-08-12 Thread Tobias Pfeiffer
Hi,

On Wed, Aug 13, 2014 at 4:24 AM, Zia Syed xia.s...@gmail.com wrote:

 I dont particularly see any errors on my logs, either on console, or on
 slaves. I see slave downloads the  spark-1.0.2-bin-hadoop1.tgz file and
 unpacks them as well. Mesos Master shows quiet alot of Tasks created and
 Finished.  I dont see any output on my console of the Word Counts, like
 in get in the Spark version.

 Any suggestions/ideas how i can make it work?


You have to check the logs on the Mesos slaves in
 /tmp/mesos/slaves/***/frameworks/ -- I guess that you are missing the jar
that your application is packaged in.

Tobias
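
For completeness, a minimal sketch of a streaming word count configured for Mesos;
the master URL, executor URI, application jar path and socket host/port are
placeholders that would need to match your cluster.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object MesosNetworkWordCount {
  def main(args: Array[String]) {
    val Array(host, port) = args                       // socket source to read from
    val conf = new SparkConf()
      .setMaster("mesos://mesos-master:5050")          // placeholder master URL
      .setAppName("MesosNetworkWordCount")
      .set("spark.executor.uri", "hdfs:///spark/spark-1.0.2-bin-hadoop1.tgz")  // placeholder
      .setJars(Seq("target/scala-2.10/your-app.jar"))  // placeholder application jar
    val ssc = new StreamingContext(conf, Seconds(1))

    val counts = ssc.socketTextStream(host, port.toInt)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}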


Re: Mllib : Save SVM model to disk

2014-08-12 Thread Hoai-Thu Vuong
You should try watching this video:
https://www.youtube.com/watch?v=sPhyePwo7FA. For more details, please
search the archives; I had the same kind of question and other people
helped me solve the problem.
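
For the HDFS part of the question quoted below, one common workaround is plain Java
serialization of the model object to HDFS. This is only a sketch, assuming the model
is java-Serializable (MLlib's linear models are) and that, as far as I know, there is
no built-in save/load API for these models in the Spark versions discussed here.

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.mllib.classification.SVMModel

object ModelIO {
  def save(model: SVMModel, path: String): Unit = {
    val fs = FileSystem.get(new Configuration())
    val out = new ObjectOutputStream(fs.create(new Path(path)))
    try out.writeObject(model) finally out.close()
  }

  def load(path: String): SVMModel = {
    val fs = FileSystem.get(new Configuration())
    val in = new ObjectInputStream(fs.open(new Path(path)))
    try in.readObject().asInstanceOf[SVMModel] finally in.close()
  }
}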

On Tue, Aug 12, 2014 at 12:36 PM, XiaoQinyu xiaoqinyu_sp...@outlook.com
wrote:

 Have you solved this problem??

 And could you share how to save model to hdfs and reload it?

 Thanks

 XiaoQinyu



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Mllib-Save-SVM-model-to-disk-tp74p11954.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





-- 
Thu.


training recsys model

2014-08-12 Thread Hoai-Thu Vuong
In MLlib, I found the method to train a matrix factorization model to predict
a user's taste. This function takes parameters such as lambda and rank, and I
cannot find the best values for them or how to optimize them. Could you
please give me some recommendations?

-- 
Thu.
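
A common approach is a small grid search over rank and lambda, scored by RMSE on a
held-out split. The sketch below assumes a hypothetical CSV ratings file and
arbitrary grid values.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.recommendation.{ALS, Rating}

object ALSTuning {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("ALSTuning"))
    val ratings = sc.textFile("hdfs:///user/you/ratings.csv").map { line =>
      val Array(user, product, rating) = line.split(',')
      Rating(user.toInt, product.toInt, rating.toDouble)
    }
    val Array(training, validation) = ratings.randomSplit(Array(0.8, 0.2), 42L)
    training.cache(); validation.cache()

    val results = for (rank <- Seq(10, 20, 50); lambda <- Seq(0.01, 0.1, 1.0)) yield {
      val model = ALS.train(training, rank, 10, lambda)
      val predictions = model
        .predict(validation.map(r => (r.user, r.product)))
        .map(p => ((p.user, p.product), p.rating))
      val rmse = math.sqrt(
        validation.map(r => ((r.user, r.product), r.rating))
          .join(predictions)
          .map { case (_, (actual, predicted)) => (actual - predicted) * (actual - predicted) }
          .mean())
      ((rank, lambda), rmse)
    }
    results.sortBy(_._2).foreach(println)   // best (rank, lambda) first
    sc.stop()
  }
}

A larger rank usually fits better but costs more memory and computation, while lambda
controls overfitting, so the held-out RMSE should drive the final choice.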


Re: how to access workers from spark context

2014-08-12 Thread S. Zhou
Sometimes workers are dead but the spark context does not know it and still sends
jobs.


On Tuesday, August 12, 2014 7:14 PM, Stanley Shi s...@pivotal.io wrote:
 


Why do you need to detect the worker status in the application? Your application
generally doesn't need to know where it is executed.



On Wed, Aug 13, 2014 at 7:39 AM, S. Zhou myx...@yahoo.com.invalid wrote:

I tried to access worker info from spark context but it seems spark context 
does no expose such API. The reason of doing that is: it seems spark context 
itself does not have logic to detect if its workers are in dead status. So I 
like to add such logic by myself. 


BTW, it seems spark web UI has some logic of detecting dead workers. But all 
relevant classes are declared as private for the spark package scope. 


Please let me know how to solve this issue (or if there is an alternative way 
to achieve the same purpose)


Thanks




-- 

Regards,
Stanley Shi,

Re: how to access workers from spark context

2014-08-12 Thread S. Zhou
Actually, if you search the spark mail archives you will find many similar
topics. For now, I just want to manage it myself.


On Tuesday, August 12, 2014 8:46 PM, Stanley Shi s...@pivotal.io wrote:
 


This seems like a bug, right? It's not the user's responsibility to manage the
workers.



On Wed, Aug 13, 2014 at 11:28 AM, S. Zhou myx...@yahoo.com wrote:

Sometimes workers are dead but spark context does not know it and still send 
jobs.



On Tuesday, August 12, 2014 7:14 PM, Stanley Shi s...@pivotal.io wrote:
 


Why do you need to detect the worker status in the application? you 
application generally don't need to know where it is executed.



On Wed, Aug 13, 2014 at 7:39 AM, S. Zhou myx...@yahoo.com.invalid wrote:

I tried to access worker info from spark context but it seems spark context 
does no expose such API. The reason of doing that is: it seems spark context 
itself does not have logic to detect if its workers are in dead status. So I 
like to add such logic by myself. 


BTW, it seems spark web UI has some logic of detecting dead workers. But all 
relevant classes are declared as private for the spark package scope. 


Please let me know how to solve this issue (or if there is an alternative way 
to achieve the same purpose)


Thanks





-- 

Regards,
Stanley Shi,





-- 

Regards,
Stanley Shi,

Re: Spark SQL JDBC

2014-08-12 Thread ZHENG, Xu-dong
Hi Cheng,

I also hit some issues when I try to start the ThriftServer based on a build
from the master branch (I could successfully run it from the branch-1.0-jdbc
branch). Below is my build command:

./make-distribution.sh --skip-java-test -Phadoop-2.4 -Phive -Pyarn
-Dyarn.version=2.4.0 -Dhadoop.version=2.4.0 -Phive-thriftserver

And below are the printed errors:

ERROR CompositeService: Error starting services HiveServer2
org.apache.hive.service.ServiceException: Unable to connect to MetaStore!
at org.apache.hive.service.cli.CLIService.start(CLIService.java:85)
at
org.apache.hive.service.CompositeService.start(CompositeService.java:70)
at
org.apache.hive.service.server.HiveServer2.start(HiveServer2.java:73)
at
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:71)
at
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:314)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:73)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: javax.jdo.JDOFatalUserException: Class
org.datanucleus.api.jdo.JDOPersistenceManagerFactory was not found.
NestedThrowables:
java.lang.ClassNotFoundException:
org.datanucleus.api.jdo.JDOPersistenceManagerFactory
at
javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1175)
at
javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
at
javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
at
org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:275)
at
org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:304)
at
org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:234)
at
org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:209)
at
org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
at
org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
at
org.apache.hadoop.hive.metastore.RetryingRawStore.init(RetryingRawStore.java:64)
at
org.apache.hadoop.hive.metastore.RetryingRawStore.getProxy(RetryingRawStore.java:73)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(HiveMetaStore.java:415)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:402)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:441)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:326)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:286)
at
org.apache.hadoop.hive.metastore.RetryingHMSHandler.init(RetryingHMSHandler.java:54)
at
org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:59)
at
org.apache.hadoop.hive.metastore.HiveMetaStore.newHMSHandler(HiveMetaStore.java:4060)
at
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.init(HiveMetaStoreClient.java:121)
at
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.init(HiveMetaStoreClient.java:104)
at org.apache.hive.service.cli.CLIService.start(CLIService.java:82)
... 11 more
Caused by: java.lang.ClassNotFoundException:
org.datanucleus.api.jdo.JDOPersistenceManagerFactory
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at javax.jdo.JDOHelper$18.run(JDOHelper.java:2018)
at javax.jdo.JDOHelper$18.run(JDOHelper.java:2016)
at java.security.AccessController.doPrivileged(Native Method)
at javax.jdo.JDOHelper.forName(JDOHelper.java:2015)
at
javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1162)
... 32 more
14/08/13 13:08:48 INFO AbstractService: Service:OperationManager is stopped.
14/08/13 13:08:48 INFO AbstractService: Service:SessionManager is stopped.
14/08/13 13:08:48 INFO AbstractService: Service:CLIService is stopped.
14/08/13 13:08:48 ERROR 

Re: Spark SQL JDBC

2014-08-12 Thread ZHENG, Xu-dong
Just found that this is because the lines below in make_distribution.sh don't work:

if [ $SPARK_HIVE == true ]; then
  cp $FWDIR/lib_managed/jars/datanucleus*.jar $DISTDIR/lib/
fi

There is no definition of $SPARK_HIVE in make_distribution.sh. I should set
it explicitly.



On Wed, Aug 13, 2014 at 1:10 PM, ZHENG, Xu-dong dong...@gmail.com wrote:

 Hi Cheng,

 I also meet some issues when I try to start ThriftServer based a build
 from master branch (I could successfully run it from the branch-1.0-jdbc
 branch). Below is my build command:

 ./make-distribution.sh --skip-java-test -Phadoop-2.4 -Phive -Pyarn
 -Dyarn.version=2.4.0 -Dhadoop.version=2.4.0 -Phive-thriftserver

 And below are the printed errors:

 ERROR CompositeService: Error starting services HiveServer2
 org.apache.hive.service.ServiceException: Unable to connect to MetaStore!
 at org.apache.hive.service.cli.CLIService.start(CLIService.java:85)
 at
 org.apache.hive.service.CompositeService.start(CompositeService.java:70)
 at
 org.apache.hive.service.server.HiveServer2.start(HiveServer2.java:73)
 at
 org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:71)
 at
 org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:314)
  at
 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:73)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 Caused by: javax.jdo.JDOFatalUserException: Class
 org.datanucleus.api.jdo.JDOPersistenceManagerFactory was not found.
 NestedThrowables:
 java.lang.ClassNotFoundException:
 org.datanucleus.api.jdo.JDOPersistenceManagerFactory
 at
 javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1175)
 at
 javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
 at
 javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
 at
 org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:275)
 at
 org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:304)
 at
 org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:234)
 at
 org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:209)
 at
 org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
 at
 org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
 at
 org.apache.hadoop.hive.metastore.RetryingRawStore.init(RetryingRawStore.java:64)
 at
 org.apache.hadoop.hive.metastore.RetryingRawStore.getProxy(RetryingRawStore.java:73)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(HiveMetaStore.java:415)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:402)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:441)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:326)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:286)
 at
 org.apache.hadoop.hive.metastore.RetryingHMSHandler.init(RetryingHMSHandler.java:54)
 at
 org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:59)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStore.newHMSHandler(HiveMetaStore.java:4060)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStoreClient.init(HiveMetaStoreClient.java:121)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStoreClient.init(HiveMetaStoreClient.java:104)
 at org.apache.hive.service.cli.CLIService.start(CLIService.java:82)
 ... 11 more
 Caused by: java.lang.ClassNotFoundException:
 org.datanucleus.api.jdo.JDOPersistenceManagerFactory
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:270)
 at javax.jdo.JDOHelper$18.run(JDOHelper.java:2018)
 at javax.jdo.JDOHelper$18.run(JDOHelper.java:2016)
 at java.security.AccessController.doPrivileged(Native