Re: saveAsTextFiles file not found exception
Hi Chen, Please see the bug I filed at https://issues.apache.org/jira/browse/SPARK-2984 with the FileNotFoundException on _temporary directory issue. Andrew

On Mon, Aug 11, 2014 at 10:50 PM, Andrew Ash and...@andrewash.com wrote: Not sure which stalled HDFS client issue you're referring to, but there was one fixed in Spark 1.0.2 that could help you out -- https://github.com/apache/spark/pull/1409. I've still seen one related to Configuration objects not being threadsafe though, so you'd still need to keep speculation on to fix that (SPARK-2546). As it stands now, I can: A) have speculation off, in which case I get random hangs for a variety of reasons (your HDFS stall, my Configuration safety issue), or B) have speculation on, in which case I get random failures related to LeaseExpiredExceptions and .../_temporary/... file doesn't exist exceptions. Kind of a catch-22 -- there's no reliable way to run large jobs on Spark right now! I'm going to file a bug for the _temporary and LeaseExpiredExceptions as I think these are widespread enough that we need a place to track a resolution.

On Mon, Aug 11, 2014 at 9:08 AM, Chen Song chen.song...@gmail.com wrote: Andrew, that is a good finding. Yes, I have speculative execution turned on, because I saw tasks stalled on HDFS client. If I turn off speculative execution, is there a way to circumvent the hanging task issue?

On Mon, Aug 11, 2014 at 11:13 AM, Andrew Ash and...@andrewash.com wrote: I've also been seeing similar stacktraces on Spark core (not streaming) and have a theory it's related to spark.speculation being turned on. Do you have that enabled by chance?

On Mon, Aug 11, 2014 at 8:10 AM, Chen Song chen.song...@gmail.com wrote: Bill, did you get this resolved somehow? Does anyone have any insight into this problem? Chen

On Mon, Aug 11, 2014 at 10:30 AM, Chen Song chen.song...@gmail.com wrote: The exception was thrown out in the application master (Spark streaming driver) and the job shut down after this exception.

On Mon, Aug 11, 2014 at 10:29 AM, Chen Song chen.song...@gmail.com wrote: I got the same exception after the streaming job runs for a while. The ERROR message was complaining about a temp file not being found in the output folder. 14/08/11 08:05:08 ERROR JobScheduler: Error running job streaming job 140774430 ms.0 java.io.FileNotFoundException: File hdfs://hadoopc/user/csong/output/human_bot/-140774430.out/_temporary/0/task_201408110805__m_07 does not exist. 
at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:654) at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:102) at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:712) at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:708) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:708) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310) at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136) at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:841) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:724) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:643) at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1068) at org.apache.spark.streaming.dstream.DStream$$anonfun$8.apply(DStream.scala:773) at org.apache.spark.streaming.dstream.DStream$$anonfun$8.apply(DStream.scala:771) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) On Fri, Jul 25, 2014 at 7:04 PM, Bill Jay bill.jaypeter...@gmail.com wrote: I just saw another error
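For readers following along: spark.speculation is off by default, and the catch-22 above is about whether to flip it. A minimal sketch of the relevant settings when building a context (property names as in the Spark 1.0 docs; the interval and quantile shown are the documented defaults):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("example")
      .set("spark.speculation", "true")           // re-launch straggler tasks speculatively
      .set("spark.speculation.interval", "100")   // ms between checks for speculatable tasks
      .set("spark.speculation.quantile", "0.75")  // fraction of tasks that must finish first
    val sc = new SparkContext(conf)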
Re: ClassNotFound exception on class in uber.jar
This is how I used to do it:

// Create a list of jars
List<String> jars = Lists.newArrayList(
    "/home/akhld/mobi/localcluster/x/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly-0.9.1-hadoop2.2.0.jar",
    "ADD-All-The-Jars-Here");

// Create a SparkConf
SparkConf spconf = new SparkConf();
spconf.setMaster("local");
spconf.setAppName("YourApp");
spconf.setSparkHome("/home/akhld/mobi/localcluster/x/spark-0.9.1-bin-hadoop2");
spconf.setJars(jars.toArray(new String[jars.size()]));
spconf.set("spark.executor.memory", "1g");

// Now create the context.
JavaStreamingContext jsc = new JavaStreamingContext(spconf, new Duration(1));

Thanks Best Regards

On Mon, Aug 11, 2014 at 9:36 PM, lbustelo g...@bustelos.com wrote: Not sure if this problem reached the Spark guys, because it shows in Nabble that "This post has NOT been accepted by the mailing list yet." http://apache-spark-user-list.1001560.n3.nabble.com/ClassNotFound-for-user-class-in-uber-jar-td10613.html#a11902 I'm resubmitting. Greetings, I'm currently building a fat or uber jar with dependencies using Maven that a docker-ized Spark cluster (1 master, 3 workers, version 1.0.0, Scala 2.10.4) points to locally on the same VM. It seems that sometimes a particular class is found, and things are fine, and other times it is not. Doing a find on the jar affirms that it is actually there. I `setJars` with JavaStreamingContext.jarOfClass(the main class). I cannot say I know much about the classpath mechanisms of Spark, so I appreciate any and all suggestions to find out what exactly is happening. The exception is as follows: 14/07/24 18:48:52 INFO Executor: Sending result for 139 directly to driver 14/07/24 18:48:52 INFO Executor: Finished task ID 139 14/07/24 18:48:56 WARN BlockManager: Putting block input-0-1406227713800 failed 14/07/24 18:48:56 ERROR BlockManagerWorker: Exception handling buffer message java.lang.ClassNotFoundException: com.cjm5325.MyProject.MyClass at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:60) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at org.apache.spark.storage.MemoryStore.putBytes(MemoryStore.scala:59) at 
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:666) at org.apache.spark.storage.BlockManager.putBytes(BlockManager.scala:587) at org.apache.spark.storage.BlockManagerWorker.putBlock(BlockManagerWorker.scala:82) at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:63) at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at
Re: KMeans - java.lang.IllegalArgumentException: requirement failed
It sounds like your data does not all have the same dimension? That's a decent guess. Have a look at the assertions in this method.

On Tue, Aug 12, 2014 at 4:44 AM, Ge, Yao (Y.) y...@ford.com wrote: I am trying to train a KMeans model with sparse vectors with Spark 1.0.1. When I run the training I get the following exception: java.lang.IllegalArgumentException: requirement failed at scala.Predef$.require(Predef.scala:221) at org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:271) at org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:398) at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:372) at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:366) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.mllib.clustering.KMeans$.findClosest(KMeans.scala:366) at org.apache.spark.mllib.clustering.KMeans$.pointCost(KMeans.scala:389) at org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:269) at org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:268) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.Range.foreach(Range.scala:141) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:268) at org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:267) What does this mean? How do I troubleshoot this problem? Thanks. -Yao
Re: set SPARK_LOCAL_DIRS issue
Hi Baoqiang, In my experience (assuming Spark 1.0), for the standalone cluster you need to set SPARK_WORKER_DIR, not SPARK_LOCAL_DIRS, to control where shuffle files are written. I think this is a documentation issue that could be improved, as http://spark.apache.org/docs/latest/spark-standalone.html suggests using SPARK_LOCAL_DIRS for scratch space, and I'm not sure that it actually does anything. Did you see anything in /mnt/data/tmp when you used SPARK_LOCAL_DIRS? Cheers! Andrew

On Sat, Aug 9, 2014 at 7:21 AM, Baoqiang Cao bqcaom...@gmail.com wrote: Hi, I'm trying to use a specific dir for the Spark working directory since I have limited space at /tmp. I tried: 1) export SPARK_LOCAL_DIRS="/mnt/data/tmp" or 2) SPARK_LOCAL_DIRS="/mnt/data/tmp" in spark-env.sh. But neither worked, since the output of Spark still says: ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /tmp/spark-local-20140809134509-0502/34/shuffle_0_436_1 java.io.FileNotFoundException: /tmp/spark-local-20140809134509-0502/34/shuffle_0_436_1 (No space left on device) Can anybody help with correctly setting up the "tmp" directory? Best, Baoqiang Cao Blog: http://baoqiang.org Email: bqcaom...@gmail.com
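Concretely, the spark-env.sh fragment for standalone mode would be something like the following (the path is just an example; it needs to be set on every worker, which must then be restarted):

    export SPARK_WORKER_DIR=/mnt/data/tmp   # per-worker scratch, including shuffle files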
Re: Parallelizing a task makes it freeze
Actually the program hangs just by calling dataAllRDD.count(). I suspect creating the RDD is not successful when its elements are too big. When nY = 3000, dataAllRDD.count() works (each element of dataAll = 3000*400*64 bits = 9.6 MB), but when nY = 4000, it hangs (4000*400*64 bits = 12.8 MB). What are the limiting factors to the size of the elements of an RDD?

sparkuser2345 wrote: I have an array 'dataAll' of key-value pairs where each value is an array of arrays. I would like to parallelize a task over the elements of 'dataAll' to the workers. In the dummy example below, the number of elements in 'dataAll' is 3, but in the real application it would be tens to hundreds. Without parallelizing dataAll, 'result' is calculated in less than a second:

import org.jblas.DoubleMatrix

val nY = 5000
val nX = 400
val dataAll = Array((1, Array.fill(nY)(Array.fill(nX)(1.0))),
                    (2, Array.fill(nY)(Array.fill(nX)(1.0))),
                    (3, Array.fill(nY)(Array.fill(nX)(1.0))))

val w1 = DoubleMatrix.ones(400)

// This finishes in less than a second:
val result = dataAll.map { dat =>
  val c = dat._1
  val dataArr = dat._2
  // Map over the Arrays within dataArr:
  val test = dataArr.map { arr =>
    val test2 = new DoubleMatrix(arr.length, 1, arr:_*)
    val out = test2.dot(w1)
    out
  }
  (c, test)
}

However, when I parallelize dataAll, the same task freezes:

val dataAllRDD = sc.parallelize(dataAll, 3)

// This doesn't finish in several minutes:
val result = dataAllRDD.map { dat =>
  val c = dat._1
  val dataArr = dat._2
  // Map over the Arrays within dataArr:
  val test = dataArr.map { arr =>
    val test2 = new DoubleMatrix(arr.length, 1, arr:_*)
    val out = test2.dot(w1)
    out
  }
  (c, test)
}.collect

After sending the above task, nothing is written to the worker logs (as viewed through the web UI), but the following output is printed in the Spark shell where I'm running the task: 14/08/11 18:17:31 INFO SparkContext: Starting job: collect at <console>:33 14/08/11 18:17:31 INFO DAGScheduler: Got job 0 (collect at <console>:33) with 3 output partitions (allowLocal=false) 14/08/11 18:17:31 INFO DAGScheduler: Final stage: Stage 0 (collect at <console>:33) 14/08/11 18:17:31 INFO DAGScheduler: Parents of final stage: List() 14/08/11 18:17:31 INFO DAGScheduler: Missing parents: List() 14/08/11 18:17:31 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[1] at map at <console>:23), which has no missing parents 14/08/11 18:17:32 INFO DAGScheduler: Submitting 3 missing tasks from Stage 0 (MappedRDD[1] at map at <console>:23) 14/08/11 18:17:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks 14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor 2: executor_2_IP (PROCESS_LOCAL) 14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:0 as 16154060 bytes in 69 ms 14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:1 as TID 1 on executor 1: executor_1_IP (PROCESS_LOCAL) 14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:1 as 16154060 bytes in 81 ms 14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:2 as TID 2 on executor 0: executor_0_IP (PROCESS_LOCAL) 14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:2 as 16154060 bytes in 66 ms

dataAllRDD.map does work with a smaller array though (e.g. nY = 100; finishes in less than a second). Why is dataAllRDD.map so much slower than dataAll.map, or even not executing at all? The Spark version I'm using is 0.9.0. 
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Parallelizing-a-task-makes-it-freeze-tp11900p11967.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
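One detail worth checking, not raised in the thread: the driver log shows each task serialized as 16154060 bytes, while Spark 0.9 ships tasks through Akka with a default frame limit of roughly 10 MB (spark.akka.frameSize). That would also explain why nY = 3000 (~9.6 MB) works and nY = 4000 (~12.8 MB) hangs. A sketch of raising the limit when building the context (this is an assumption about the cause; the value 64 is an arbitrary example):

    import org.apache.spark.{SparkConf, SparkContext}

    // Assumption: tasks larger than spark.akka.frameSize (in MB) stall in 0.9.
    val conf = new SparkConf()
      .setAppName("large-elements")
      .set("spark.akka.frameSize", "64")   // MB; the default is 10
    val sc = new SparkContext(conf)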
Re: Transform RDD[List]
-incubator, +user So, are you trying to transpose your data?

val rdd = sc.parallelize(List(List(1,2,3,4,5), List(6,7,8,9,10))).repartition(2)

First you could pair each value with its position in its list:

val withIndex = rdd.flatMap(_.zipWithIndex)

then group by that position, and discard the position:

withIndex.groupBy(_._2).values.map(_.map(_._1))

Printing the RDD gives what you want: List(5, 10) List(1, 6) List(3, 8) List(2, 7) List(4, 9)

On Tue, Aug 12, 2014 at 5:42 AM, Kevin Jung itsjb.j...@samsung.com wrote: Hi, it may be a simple question, but I cannot figure out the most efficient way. There is an RDD containing lists: RDD ( List(1,2,3,4,5) List(6,7,8,9,10) ) I want to transform this to: RDD ( List(1,6) List(2,7) List(3,8) List(4,9) List(5,10) ) And I want to achieve this without using the collect method, because a real-world RDD can have a lot of elements and it may cause an out of memory. Any ideas will be welcome. Best regards Kevin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Transform-RDD-List-tp11948.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Killing spark app problem
Hi, when I run a Spark application on my local machine using spark-submit: $SPARK_HOME/bin/spark-submit --driver-memory 1g <class> <jar> and then want to interrupt the computation with Ctrl-C, it interrupts the current stage, but afterwards it waits and exits only after around 5 minutes, and sometimes doesn't exit at all. The only way I was able to kill it was kill -9 pid, but after this my system doesn't want to boot correctly after a reboot. Is there a better way to properly kill a Spark application? Thanks, Grzegorz
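If you control the driver code, one pattern that can make Ctrl-C behave better is stopping the context from a JVM shutdown hook; a sketch, with no guarantee it fixes the hang described above:

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("example"))
    // Stop the context on JVM shutdown so an interrupt tears down
    // executors instead of leaving the application half-alive.
    scala.sys.addShutdownHook {
      sc.stop()
    }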
Re: java.lang.StackOverflowError when calling count()
The long lineage causes a long/deep Java object tree (DAG of RDD objects), which needs to be serialized as part of the task creation. When serializing, the whole object DAG needs to be traversed, leading to the StackOverflowError. TD

On Mon, Aug 11, 2014 at 7:14 PM, randylu randyl...@gmail.com wrote: hi, TD. I also fell into the trap of long lineage, and your suggestions do work well. But I don't understand why the long lineage causes the stack overflow, and where it takes effect? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-StackOverflowError-when-calling-count-tp5649p11941.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
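To make that concrete, the usual remedy is periodic checkpointing so the lineage (and hence the serialized object graph) stays shallow; a minimal sketch, where the every-10-iterations interval is arbitrary:

    sc.setCheckpointDir("hdfs:///tmp/checkpoints")   // any reliable path works

    var rdd = sc.parallelize(1 to 1000000)
    for (i <- 1 to 100) {
      rdd = rdd.map(_ + 1)
      if (i % 10 == 0) {
        rdd.cache()
        rdd.checkpoint()   // marks this RDD for checkpointing, truncating the lineage
        rdd.count()        // forces a job so the checkpoint actually happens
      }
    }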
Re: Is there any way to control the parallelism in LogisticRegression
Assuming that your data is very sparse, I would recommend RDD.repartition. But if that is not the case and you don't want to shuffle the data, you can try a CombineInputFormat and then parse the lines into labeled points. Coalesce may cause locality problems if you didn't use the right number of partitions. -Xiangrui

On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong dong...@gmail.com wrote: I think this has the same effect and issue as #1, right?

On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen chenjiush...@gmail.com wrote: How about increasing the HDFS file extent size? Like if the current value is 128M, we make it 512M or bigger.

On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong dong...@gmail.com wrote: Hi all, We are trying to use Spark MLlib to train super large data (100M features and 5B rows). The input data in HDFS has ~26K partitions. By default, MLlib will create a task for every partition at each iteration. But because our dimensions are also very high, such a large number of tasks will incur large network overhead to transfer the weight vector. So we want to reduce the number of tasks; we tried the ways below:

1. Coalesce partitions without shuffling, then cache: data.coalesce(numPartitions).cache() This works fine for relatively small data, but when data is increasing and numPartitions is fixed, the size of one partition will be large. This introduces two issues: the first is that the larger partition will need a larger object and more memory at runtime, and trigger GC more frequently; the second is that we hit the 'Size exceeds Integer.MAX_VALUE' error, which seems to be caused by the size of one partition being larger than 2G (https://issues.apache.org/jira/browse/SPARK-1391).

2. Coalesce partitions with shuffling, then cache: data.coalesce(numPartitions, true).cache() It could mitigate the second issue in #1 to some degree, but the first issue is still there, and it also introduces a large amount of shuffling.

3. Cache data first, and coalesce partitions: data.cache().coalesce(numPartitions) In this way, the number of cached partitions does not change, but each task reads the data from multiple partitions. However, I find the tasks lose locality this way. I see a lot of 'ANY' tasks, which means those tasks read data from other nodes and become slower than tasks that read data from local memory.

I think the best way should be like #3, but leveraging locality as much as possible. Is there any way to do that? Any suggestions? Thanks! -- 郑旭东 ZHENG, Xu-dong
Re: How to save mllib model to hdfs and reload it
For linear models, the constructors are now public. You can save the weights to HDFS, then load the weights back and use the constructor to create the model. -Xiangrui

On Mon, Aug 11, 2014 at 10:27 PM, XiaoQinyu xiaoqinyu_sp...@outlook.com wrote: Hello: I want to know, if I use historical data to train a model and I want to use this model in another app, what should I do? Should I save this model to disk, and then load it from disk when I use it? But I don't know how to save the MLlib model and reload it. I would be very pleased if anyone could give some tips. Thanks XiaoQinyu -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-save-mllib-model-to-hdfs-and-reload-it-tp11953.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
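A sketch of what that can look like for, say, logistic regression (assuming a build where the constructor is public, per the note above; model is a previously trained LogisticRegressionModel, and the path is an example):

    import org.apache.spark.mllib.classification.LogisticRegressionModel
    import org.apache.spark.mllib.linalg.Vector

    // Save: persist (weights, intercept) as a one-element object file on HDFS.
    sc.parallelize(Seq((model.weights, model.intercept)), 1)
      .saveAsObjectFile("hdfs:///models/lr")

    // Load: read the pair back and rebuild the model via the constructor.
    val (weights, intercept) =
      sc.objectFile[(Vector, Double)]("hdfs:///models/lr").first()
    val restored = new LogisticRegressionModel(weights, intercept)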
Re: Transform RDD[List]
Thanks for your answer. Yes, I want to transpose the data. At this point, I have one more question. I tested it with RDD1: List(1, 2, 3, 4, 5) List(6, 7, 8, 9, 10) List(11, 12, 13, 14, 15) List(16, 17, 18, 19, 20) And the result is... ArrayBuffer(11, 1, 16, 6) ArrayBuffer(2, 12, 7, 17) ArrayBuffer(3, 13, 18, 8) ArrayBuffer(9, 19, 4, 14) ArrayBuffer(15, 20, 10, 5) It collects well, but the order is shuffled. Can I maintain the order? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Transform-RDD-List-tp11948p11974.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: [spark-streaming] kafka source and flow control
I was hoping I could make the system behave as a blocking queue: if the output is too slow, buffers (storing space for RDDs) fill up, then block instead of dropping existing RDDs, until the input itself blocks (slows down its consumption). On a side note I was wondering: is there the same issue with file (HDFS) inputs? How can I be sure the input won't "overflow" the process chain?

From: Tobias Pfeiffer [mailto:t...@preferred.jp] Sent: mardi 12 août 2014 02:58 To: Gwenhael Pasquiers Cc: u...@spark.incubator.apache.org Subject: Re: [spark-streaming] kafka source and flow control

Hi, On Mon, Aug 11, 2014 at 9:41 PM, Gwenhael Pasquiers gwenhael.pasqui...@ericsson.com wrote: We intend to apply other operations on the data later in the same Spark context, but our first step is to archive it. Our goal is something like this: Step 1: consume Kafka. Step 2: archive to HDFS AND send to step 3. Step 3: transform data. Step 4: save transformed data to HDFS as input for M/R. I see. Well, I think Spark Streaming may be well suited for that purpose.

To us it looks like a great flaw if, in streaming mode, Spark Streaming cannot slow down its consumption depending on the available resources. On Mon, Aug 11, 2014 at 10:10 PM, Gwenhael Pasquiers gwenhael.pasqui...@ericsson.com wrote: I think the kind of self-regulating system you describe would be too difficult to implement and probably unreliable (even more so given that we have multiple slaves). Isn't "slow down its consumption depending on the available resources" a self-regulating system? I don't see how you can adapt to available resources without measuring your execution time and then changing how much you consume. Did you have any particular form of adaptation in mind? Tobias
Re: Transform RDD[List]
Sure, just add .toList.sorted in there. Putting it together in one big expression: val rdd = sc.parallelize(List(List(1,2,3,4,5),List(6,7,8,9,10))) val result = rdd.flatMap(_.zipWithIndex).groupBy(_._2).values.map(_.map(_._1).toList.sorted) List(2, 7) List(1, 6) List(4, 9) List(3, 8) List(5, 10)

On Tue, Aug 12, 2014 at 8:58 AM, Kevin Jung itsjb.j...@samsung.com wrote: Thanks for your answer. Yes, I want to transpose the data. At this point, I have one more question. I tested it with RDD1: List(1, 2, 3, 4, 5) List(6, 7, 8, 9, 10) List(11, 12, 13, 14, 15) List(16, 17, 18, 19, 20) And the result is... ArrayBuffer(11, 1, 16, 6) ArrayBuffer(2, 12, 7, 17) ArrayBuffer(3, 13, 18, 8) ArrayBuffer(9, 19, 4, 14) ArrayBuffer(15, 20, 10, 5) It collects well, but the order is shuffled. Can I maintain the order?
Re: share/reuse off-heap persisted (tachyon) RDD in SparkContext or saveAsParquetFile on tachyon in SQLContext
spark.speculation was not set; is there any speculative execution on the Tachyon side? tachyon-env.sh only changed the following:

export TACHYON_MASTER_ADDRESS=test01.zala
#export TACHYON_UNDERFS_ADDRESS=$TACHYON_HOME/underfs
export TACHYON_UNDERFS_ADDRESS=hdfs://test01.zala:8020
export TACHYON_WORKER_MEMORY_SIZE=16GB

test01.zala is the master node for HDFS, Tachyon, Spark, etc. Worker nodes are test02.zala, test03.zala, test04.zala. spark-shell runs on test02. After parquetFile.saveAsParquetFile("tachyon://test01.zala:19998/parquet_1") I got FailedToCheckpointException with "Failed to rename", but per tfs lsr there are some temporary files and a metadata file:

0.00 B    08-11-2014 16:19:28:054             /parquet_1
881.00 B  08-11-2014 16:19:28:054  In Memory  /parquet_1/_metadata
0.00 B    08-11-2014 16:19:28:314             /parquet_1/_temporary
0.00 B    08-11-2014 16:19:28:314             /parquet_1/_temporary/0
0.00 B    08-11-2014 16:19:28:931             /parquet_1/_temporary/0/_temporary
0.00 B    08-11-2014 16:19:28:931             /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_00_33
0.00 B    08-11-2014 16:19:28:931  In Memory  /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_00_33/part-r-1.parquet
0.00 B    08-11-2014 16:19:28:940             /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_01_35
0.00 B    08-11-2014 16:19:28:940  In Memory  /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_01_35/part-r-2.parquet
0.00 B    08-11-2014 16:19:28:962             /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_03_36
0.00 B    08-11-2014 16:19:28:962  In Memory  /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_03_36/part-r-4.parquet
0.00 B    08-11-2014 16:19:28:971             /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_02_34
0.00 B    08-11-2014 16:19:28:971  In Memory  /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_02_34/part-r-3.parquet
0.00 B    08-11-2014 16:20:06:349             /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_01_37
0.00 B    08-11-2014 16:20:06:349  In Memory  /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_01_37/part-r-2.parquet
0.00 B    08-11-2014 16:20:09:519             /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_03_38
0.00 B    08-11-2014 16:20:09:519  In Memory  /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_03_38/part-r-4.parquet
0.00 B    08-11-2014 16:20:18:777             /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_02_39
0.00 B    08-11-2014 16:20:18:777  In Memory  /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_02_39/part-r-3.parquet
0.00 B    08-11-2014 16:20:28:315             /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_00_40
0.00 B    08-11-2014 16:20:28:315  In Memory  /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_00_40/part-r-1.parquet
0.00 B    08-11-2014 16:20:38:382             /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_01_41
0.00 B    08-11-2014 16:20:38:382  In Memory  /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_01_41/part-r-2.parquet
0.00 B    08-11-2014 16:20:40:681             /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_03_42
0.00 B    08-11-2014 16:20:40:681  In Memory  /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_03_42/part-r-4.parquet
0.00 B    08-11-2014 16:20:50:376             /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_02_43
0.00 B    08-11-2014 16:20:50:376  In Memory  /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_02_43/part-r-3.parquet
0.00 B    08-11-2014 16:21:00:932             /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_00_44
0.00 B    08-11-2014 16:21:00:932  In Memory  /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_00_44/part-r-1.parquet
0.00 B    08-11-2014 16:21:10:355             /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_01_45
0.00 B    08-11-2014 16:21:10:355  In Memory  /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_01_45/part-r-2.parquet
0.00 B    08-11-2014 16:21:11:468             /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_03_46
0.00 B    08-11-2014 16:21:11:468  In Memory  /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_03_46/part-r-4.parquet
0.00 B    08-11-2014 16:21:21:681             /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_02_47
0.00 B    08-11-2014 16:21:21:681  In Memory  /parquet_1/_temporary/0/_temporary/attempt_201408111619_0017_r_02_47/part-r-3.parquet
0.00 B    08-11-2014 16:21:32:583
Re: share/reuse off-heap persisted (tachyon) RDD in SparkContext or saveAsParquetFile on tachyon in SQLContext
more interesting is if spark-shell started on master node (test01) then parquetFile.saveAsParquetFile(tachyon://test01.zala:19998/parquet_tablex) 14/08/12 11:42:06 INFO : initialize(tachyon://... ... ... 14/08/12 11:42:06 INFO : File does not exist: tachyon://test01.zala:19998/parquet_tablex/_metadata 14/08/12 11:42:06 INFO : getWorkingDirectory: / 14/08/12 11:42:06 INFO : create(tachyon://test01.zala:19998/parquet_tablex/_metadata, rw-r--r--, true, 65536, 1, 33554432, null) 14/08/12 11:42:06 WARN : tachyon.home is not set. Using /mnt/tachyon_default_home as the default value. 14/08/12 11:42:06 INFO : Trying to get local worker host : test01.zala 14/08/12 11:42:06 ERROR : No local worker on test01.zala NoWorkerException(message:No local worker on test01.zala) at tachyon.thrift.MasterService$user_getWorker_result$user_getWorker_resultStandardScheme.read(MasterService.java:25675) at tachyon.thrift.MasterService$user_getWorker_result$user_getWorker_resultStandardScheme.read(MasterService.java:25652) at tachyon.thrift.MasterService$user_getWorker_result.read(MasterService.java:25591) at tachyon.org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78) at tachyon.thrift.MasterService$Client.recv_user_getWorker(MasterService.java:832) at tachyon.thrift.MasterService$Client.user_getWorker(MasterService.java:818) at tachyon.master.MasterClient.user_getWorker(MasterClient.java:648) at tachyon.worker.WorkerClient.connect(WorkerClient.java:199) at tachyon.worker.WorkerClient.mustConnect(WorkerClient.java:360) at tachyon.worker.WorkerClient.getUserUfsTempFolder(WorkerClient.java:298) at tachyon.client.TachyonFS.createAndGetUserUfsTempFolder(TachyonFS.java:270) at tachyon.client.FileOutStream.init(FileOutStream.java:72) at tachyon.client.TachyonFile.getOutStream(TachyonFile.java:207) at tachyon.hadoop.AbstractTFS.create(AbstractTFS.java:102) at tachyon.hadoop.TFS.create(TFS.java:24) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:773) at parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:344) at org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetTypes.scala:345) at org.apache.spark.sql.parquet.ParquetRelation$.createEmpty(ParquetRelation.scala:142) at org.apache.spark.sql.parquet.ParquetRelation$.create(ParquetRelation.scala:120) at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:197) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:399) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:397) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:403) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:403) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:406) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:406) at 
org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(SchemaRDDLike.scala:77) at org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:103) at $line12.$read$$iwC$$iwC$$iwC$$iwC.init(console:17) at $line12.$read$$iwC$$iwC$$iwC.init(console:22) at $line12.$read$$iwC$$iwC.init(console:24) at $line12.$read$$iwC.init(console:26) at $line12.$read.init(console:28) at $line12.$read$.init(console:32) at $line12.$read$.clinit(console) at $line12.$eval$.init(console:7) at $line12.$eval$.clinit(console) at $line12.$eval.$print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615) at
RE: KMeans - java.lang.IllegalArgumentException: requirement failed
I figured it out. My indices parameter for the sparse vector was messed up. It was a good lesson for me: when using Vectors.sparse(int size, int[] indices, double[] values) to generate a vector, size is the size of the whole vector, not just the number of elements with a value. The indices array needs to be in ascending order. In many cases, it is probably easier to use the other two forms of Vectors.sparse if the indices and value positions are not naturally sorted. -Yao

From: Ge, Yao (Y.) Sent: Monday, August 11, 2014 11:44 PM To: 'u...@spark.incubator.apache.org' Subject: KMeans - java.lang.IllegalArgumentException: requirement failed

I am trying to train a KMeans model with sparse vectors with Spark 1.0.1. When I run the training I get the following exception: java.lang.IllegalArgumentException: requirement failed at scala.Predef$.require(Predef.scala:221) at org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:271) at org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:398) at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:372) at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:366) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.mllib.clustering.KMeans$.findClosest(KMeans.scala:366) at org.apache.spark.mllib.clustering.KMeans$.pointCost(KMeans.scala:389) at org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:269) at org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:268) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.Range.foreach(Range.scala:141) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:268) at org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:267) What does this mean? How do I troubleshoot this problem? Thanks. -Yao
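For reference, a small sketch of the overloads being discussed (the sizes and values are made up):

    import org.apache.spark.mllib.linalg.Vectors

    // size is the full vector length; indices must be ascending:
    val v1 = Vectors.sparse(10, Array(1, 4, 7), Array(0.5, 1.0, 2.0))

    // (index, value) pairs -- handy when positions aren't already sorted:
    val v2 = Vectors.sparse(10, Seq((4, 1.0), (1, 0.5), (7, 2.0)))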
Re: java.lang.StackOverflowError when calling count()
hi, TD. Thanks very much! I got it. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-StackOverflowError-when-calling-count-tp5649p11980.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
LDA in MLBase
Hi All, I have a question regarding LDA topic modeling. I need to do topic modeling on ad data. Does MLBase support LDA topic modeling? Or is there any stable, tested LDA implementation on Spark? BR, Aslan
Re: Is there any way to control the parallelism in LogisticRegression
Hi Xiangrui, Thanks for your reply! Yes, our data is very sparse, but RDD.repartition invokes RDD.coalesce(numPartitions, shuffle = true) internally, so I think it has the same effect as #2, right? For CombineInputFormat, although I haven't tried it, it sounds like it will combine multiple partitions into one large partition if I cache it, so the same issues as #1? For coalesce, could you share some best practices on how to set the right number of partitions to avoid locality problems? Thanks!

On Tue, Aug 12, 2014 at 3:51 PM, Xiangrui Meng men...@gmail.com wrote: Assuming that your data is very sparse, I would recommend RDD.repartition. But if that is not the case and you don't want to shuffle the data, you can try a CombineInputFormat and then parse the lines into labeled points. Coalesce may cause locality problems if you didn't use the right number of partitions. -Xiangrui

On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong dong...@gmail.com wrote: I think this has the same effect and issue as #1, right?

On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen chenjiush...@gmail.com wrote: How about increasing the HDFS file extent size? Like if the current value is 128M, we make it 512M or bigger.

On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong dong...@gmail.com wrote: Hi all, We are trying to use Spark MLlib to train super large data (100M features and 5B rows). The input data in HDFS has ~26K partitions. By default, MLlib will create a task for every partition at each iteration. But because our dimensions are also very high, such a large number of tasks will incur large network overhead to transfer the weight vector. So we want to reduce the number of tasks; we tried the ways below: 1. Coalesce partitions without shuffling, then cache: data.coalesce(numPartitions).cache() This works fine for relatively small data, but when data is increasing and numPartitions is fixed, the size of one partition will be large. This introduces two issues: the first is that the larger partition will need a larger object and more memory at runtime, and trigger GC more frequently; the second is that we hit the 'Size exceeds Integer.MAX_VALUE' error, which seems to be caused by the size of one partition being larger than 2G (https://issues.apache.org/jira/browse/SPARK-1391). 2. Coalesce partitions with shuffling, then cache: data.coalesce(numPartitions, true).cache() It could mitigate the second issue in #1 to some degree, but the first issue is still there, and it also introduces a large amount of shuffling. 3. Cache data first, and coalesce partitions: data.cache().coalesce(numPartitions) In this way, the number of cached partitions does not change, but each task reads the data from multiple partitions. However, I find the tasks lose locality this way. I see a lot of 'ANY' tasks, which means those tasks read data from other nodes and become slower than tasks that read data from local memory. I think the best way should be like #3, but leveraging locality as much as possible. Is there any way to do that? Any suggestions? Thanks! -- 郑旭东 ZHENG, Xu-dong
Fwd: how to split RDD by key and save to different path
1. Be careful: HDFS is better for large files, not bunches of small files. 2. If that's really what you want, roll your own:

import java.io.{BufferedWriter, OutputStreamWriter}
import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.Partitioner

def writeLines(iterator: Iterator[(String, String)]) = {
  val writers = new mutable.HashMap[String, BufferedWriter] // (key, writer) map
  try {
    while (iterator.hasNext) {
      val item = iterator.next()
      val key = item._1
      val line = item._2
      val writer = writers.get(key) match {
        case Some(w) => w
        case None =>
          val path = args(1) + key
          val outputStream = FileSystem.get(new Configuration()).create(new Path(path))
          val w = new BufferedWriter(new OutputStreamWriter(outputStream))
          writers.put(key, w)
          w
      }
      writer.write(line)
      writer.newLine()
    }
  } finally {
    writers.values.foreach(_.close())
  }
}

val inputData = sc.textFile(...) // input path elided
val keyValue = inputData.map(line => (key, line)) // derive the key from the line here
val partitions = keyValue.partitionBy(new MyPartitioner(10))
partitions.foreachPartition(writeLines)

class MyPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = {
    // make sure lines with the same key end up in the same partition
    (key.toString.hashCode & Integer.MAX_VALUE) % numPartitions
  }
}

2014-08-11 20:42 GMT+08:00 诺铁 noty...@gmail.com: hi, I have googled and found a similar question without a good answer: http://stackoverflow.com/questions/24520225/writing-to-hadoop-distributed-file-system-multiple-times-with-spark In short, I would like to separate the raw data and divide it by some key, for example create date, and put it in a directory named by that date, so that I can easily access a portion of the data later. For now I have to extract all keys, then filter by key and save to a file repeatedly. Is there a good way to do this? Or maybe I shouldn't do such a thing?
Re: Spark SQL JDBC
Yin helped me with that, and I appreciate the on-list follow-up. A few questions: Why is this the case? Does building it with the Thrift server add much more time/size to the final build? It seems that unless this is documented well, people will miss it and this situation will occur; why would we not just build the Thrift server in? (I am not a programming expert and am not trying to judge the decision to have it in a separate profile; I would just like to understand why it's done that way.)

On Mon, Aug 11, 2014 at 11:47 AM, Cheng Lian lian.cs@gmail.com wrote: Hi John, the JDBC Thrift server resides in its own build profile and needs to be enabled explicitly by ./sbt/sbt -Phive-thriftserver assembly.

On Tue, Aug 5, 2014 at 4:54 AM, John Omernik j...@omernik.com wrote: I am using spark-1.1.0-SNAPSHOT right now and trying to get familiar with the JDBC Thrift server. I have everything compiled correctly, and I can access data in spark-shell on YARN from my Hive installation. Cached tables, etc. all work. When I execute ./sbin/start-thriftserver.sh I get the error below. Shouldn't it just read my spark-env? I guess I am lost on how to make this work. Thanks! $ ./start-thriftserver.sh Spark assembly has been built with Hive, including Datanucleus jars on classpath Exception in thread main java.lang.ClassNotFoundException: org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:311) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:73) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Re: saveAsTextFiles file not found exception
Thanks for putting this together, Andrew.

On Tue, Aug 12, 2014 at 2:11 AM, Andrew Ash and...@andrewash.com wrote: Hi Chen, Please see the bug I filed at https://issues.apache.org/jira/browse/SPARK-2984 with the FileNotFoundException on _temporary directory issue. Andrew

On Mon, Aug 11, 2014 at 10:50 PM, Andrew Ash and...@andrewash.com wrote: Not sure which stalled HDFS client issue you're referring to, but there was one fixed in Spark 1.0.2 that could help you out -- https://github.com/apache/spark/pull/1409. I've still seen one related to Configuration objects not being threadsafe though, so you'd still need to keep speculation on to fix that (SPARK-2546). As it stands now, I can: A) have speculation off, in which case I get random hangs for a variety of reasons (your HDFS stall, my Configuration safety issue), or B) have speculation on, in which case I get random failures related to LeaseExpiredExceptions and .../_temporary/... file doesn't exist exceptions. Kind of a catch-22 -- there's no reliable way to run large jobs on Spark right now! I'm going to file a bug for the _temporary and LeaseExpiredExceptions as I think these are widespread enough that we need a place to track a resolution.

On Mon, Aug 11, 2014 at 9:08 AM, Chen Song chen.song...@gmail.com wrote: Andrew, that is a good finding. Yes, I have speculative execution turned on, because I saw tasks stalled on HDFS client. If I turn off speculative execution, is there a way to circumvent the hanging task issue?

On Mon, Aug 11, 2014 at 11:13 AM, Andrew Ash and...@andrewash.com wrote: I've also been seeing similar stacktraces on Spark core (not streaming) and have a theory it's related to spark.speculation being turned on. Do you have that enabled by chance?

On Mon, Aug 11, 2014 at 8:10 AM, Chen Song chen.song...@gmail.com wrote: Bill, did you get this resolved somehow? Does anyone have any insight into this problem? Chen

On Mon, Aug 11, 2014 at 10:30 AM, Chen Song chen.song...@gmail.com wrote: The exception was thrown out in the application master (Spark streaming driver) and the job shut down after this exception.

On Mon, Aug 11, 2014 at 10:29 AM, Chen Song chen.song...@gmail.com wrote: I got the same exception after the streaming job runs for a while. The ERROR message was complaining about a temp file not being found in the output folder. 14/08/11 08:05:08 ERROR JobScheduler: Error running job streaming job 140774430 ms.0 java.io.FileNotFoundException: File hdfs://hadoopc/user/csong/output/human_bot/-140774430.out/_temporary/0/task_201408110805__m_07 does not exist. 
at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:654) at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:102) at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:712) at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:708) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:708) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310) at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136) at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:841) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:724) at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:643) at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1068) at org.apache.spark.streaming.dstream.DStream$$anonfun$8.apply(DStream.scala:773) at org.apache.spark.streaming.dstream.DStream$$anonfun$8.apply(DStream.scala:771) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at
Re: anaconda and spark integration
Hello. Is there an integration of Spark (PySpark) with Anaconda? I googled a lot and didn't find relevant information. Could you please point me to a tutorial or a simple example? Thanks in advance, Oleg.
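One common approach, rather than a packaged integration: point PySpark at the Anaconda interpreter via PYSPARK_PYTHON before launching (the path below is an example, and the same path must exist on every worker as well):

    export PYSPARK_PYTHON=/opt/anaconda/bin/python
    $SPARK_HOME/bin/pyspark

Anaconda's packages (numpy, scipy, pandas, ...) should then be importable from within PySpark jobs.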
RE: Benchmark on physical Spark cluster
An on-list follow up: http://prof.ict.ac.cn/BigDataBench/#Benchmarks looks promising as it has spark as one of the platforms used. Bests, -Monir From: Mozumder, Monir Sent: Monday, August 11, 2014 7:18 PM To: user@spark.apache.org Subject: Benchmark on physical Spark cluster I am trying to get some workloads or benchmarks for running on a physical spark cluster and find relative speedups on different physical clusters. The instructions at https://databricks.com/blog/2014/02/12/big-data-benchmark.html uses Amazon EC2. I was wondering if anyone got other benchmarks for spark on physical clusters. Hoping to get a CloudSuite like suite for Spark. Bests, -Monir
Re: Is there any way to control the parallelism in LogisticRegression
Sorry, I missed #2. My suggestion is the same as #2. You need to set a bigger numPartitions to avoid hitting the integer bound or the 2G limitation, at the cost of increased shuffle size per iteration. If you use a CombineInputFormat and then cache, it will try to give you roughly the same size per partition. There will be some remote fetches from HDFS, but that is still cheaper than calling RDD.repartition(). For coalesce without shuffle, I don't know how to set the right number of partitions either ... -Xiangrui

On Tue, Aug 12, 2014 at 6:16 AM, ZHENG, Xu-dong dong...@gmail.com wrote: Hi Xiangrui, Thanks for your reply! Yes, our data is very sparse, but RDD.repartition invokes RDD.coalesce(numPartitions, shuffle = true) internally, so I think it has the same effect as #2, right? For CombineInputFormat, although I haven't tried it, it sounds like it will combine multiple partitions into one large partition if I cache it, so the same issues as #1? For coalesce, could you share some best practices on how to set the right number of partitions to avoid locality problems? Thanks!

On Tue, Aug 12, 2014 at 3:51 PM, Xiangrui Meng men...@gmail.com wrote: Assuming that your data is very sparse, I would recommend RDD.repartition. But if that is not the case and you don't want to shuffle the data, you can try a CombineInputFormat and then parse the lines into labeled points. Coalesce may cause locality problems if you didn't use the right number of partitions. -Xiangrui

On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong dong...@gmail.com wrote: I think this has the same effect and issue as #1, right?

On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen chenjiush...@gmail.com wrote: How about increasing the HDFS file extent size? Like if the current value is 128M, we make it 512M or bigger.

On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong dong...@gmail.com wrote: Hi all, We are trying to use Spark MLlib to train super large data (100M features and 5B rows). The input data in HDFS has ~26K partitions. By default, MLlib will create a task for every partition at each iteration. But because our dimensions are also very high, such a large number of tasks will incur large network overhead to transfer the weight vector. So we want to reduce the number of tasks; we tried the ways below: 1. Coalesce partitions without shuffling, then cache: data.coalesce(numPartitions).cache() This works fine for relatively small data, but when data is increasing and numPartitions is fixed, the size of one partition will be large. This introduces two issues: the first is that the larger partition will need a larger object and more memory at runtime, and trigger GC more frequently; the second is that we hit the 'Size exceeds Integer.MAX_VALUE' error, which seems to be caused by the size of one partition being larger than 2G (https://issues.apache.org/jira/browse/SPARK-1391). 2. Coalesce partitions with shuffling, then cache: data.coalesce(numPartitions, true).cache() It could mitigate the second issue in #1 to some degree, but the first issue is still there, and it also introduces a large amount of shuffling. 3. Cache data first, and coalesce partitions: data.cache().coalesce(numPartitions) In this way, the number of cached partitions does not change, but each task reads the data from multiple partitions. However, I find the tasks lose locality this way. I see a lot of 'ANY' tasks, which means those tasks read data from other nodes and become slower than tasks that read data from local memory. I think the best way should be like #3, but leveraging locality as much as possible. Is there any way to do that? Any suggestions? Thanks! -- 郑旭东 ZHENG, Xu-dong
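To put the three options from this thread side by side (a sketch; the input path and partition count are placeholders):

    val data = sc.textFile("hdfs:///path/to/input")   // hypothetical input

    // #1: coalesce without shuffle -- fewer, larger partitions
    val opt1 = data.coalesce(1000).cache()

    // #2: coalesce with shuffle (this is what RDD.repartition does internally)
    val opt2 = data.coalesce(1000, shuffle = true).cache()

    // #3: cache at full parallelism, then narrow for the iterations (loses locality)
    val opt3 = data.cache().coalesce(1000)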
Re: Spark SQL JDBC
Hive pulls in a ton of dependencies that we were afraid would break existing Spark applications. For this reason all Hive submodules are optional.

On Tue, Aug 12, 2014 at 7:43 AM, John Omernik j...@omernik.com wrote: Yin helped me with that, and I appreciate the on-list follow-up. A few questions: Why is this the case? Does building it with the Thrift server add much more time/size to the final build? It seems that unless this is documented well, people will miss it and this situation will occur; why would we not just build the Thrift server in? (I am not a programming expert and am not trying to judge the decision to have it in a separate profile; I would just like to understand why it's done that way.)

On Mon, Aug 11, 2014 at 11:47 AM, Cheng Lian lian.cs@gmail.com wrote: Hi John, the JDBC Thrift server resides in its own build profile and needs to be enabled explicitly by ./sbt/sbt -Phive-thriftserver assembly.

On Tue, Aug 5, 2014 at 4:54 AM, John Omernik j...@omernik.com wrote: I am using spark-1.1.0-SNAPSHOT right now and trying to get familiar with the JDBC Thrift server. I have everything compiled correctly, and I can access data in spark-shell on YARN from my Hive installation. Cached tables, etc. all work. When I execute ./sbin/start-thriftserver.sh I get the error below. Shouldn't it just read my spark-env? I guess I am lost on how to make this work. Thanks! $ ./start-thriftserver.sh Spark assembly has been built with Hive, including Datanucleus jars on classpath Exception in thread main java.lang.ClassNotFoundException: org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:311) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:73) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Reference External Variables in Map Function (Inner class)
I have a class defining an inner static class (map function). The inner class tries to refer to a variable instantiated in the outer class, which results in a NullPointerException. Sample code as follows:

class SampleOuterClass {
  private static ArrayList<String> someVariable;
  SampleOuterClass() {
    // initialize someVariable
  }
  public static class Parse implements Function<...> {
    public TypeReturn call (...) {
      // Try using someVariable: *Raises NullPointerException*
    }
  }
  public void run() {
    RDD rdd = data.map(new Parse()).rdd();
  }
}

Am I missing something about how closures work with Spark, or is something else wrong? Thanks Sunny
Re: Reference External Variables in Map Function (Inner class)
Are there any other workarounds that could be used to pass in the values from someVariable to the transformation function? On Tue, Aug 12, 2014 at 10:48 AM, Sean Owen so...@cloudera.com wrote: I don't think static members are going to be serialized in the closure? The instance of Parse will be looking at its local SampleOuterClass, which is maybe not initialized on the remote JVM. On Tue, Aug 12, 2014 at 6:02 PM, Sunny Khatri sunny.k...@gmail.com wrote: I have a class defining an inner static class (map function). The inner class tries to refer to a variable instantiated in the outer class, which results in a NullPointerException. Sample code as follows: class SampleOuterClass { private static ArrayList<String> someVariable; SampleOuterClass() { // initialize someVariable } public static class Parse implements Function<...> { public TypeReturn call (...) { // Try using someVariable: Raises NullPointerException } } public void run() { RDD rdd = data.map(new Parse()).rdd(); } } Am I missing something about how closures work with Spark, or is something else wrong? Thanks Sunny
Re: Reference External Variables in Map Function (Inner class)
You could create a copy of the variable inside your Parse class; that way it would be serialized with the instance you create when calling map() below. On Tue, Aug 12, 2014 at 10:56 AM, Sunny Khatri sunny.k...@gmail.com wrote: Are there any other workarounds that could be used to pass in the values from someVariable to the transformation function? On Tue, Aug 12, 2014 at 10:48 AM, Sean Owen so...@cloudera.com wrote: I don't think static members are going to be serialized in the closure? The instance of Parse will be looking at its local SampleOuterClass, which is maybe not initialized on the remote JVM. On Tue, Aug 12, 2014 at 6:02 PM, Sunny Khatri sunny.k...@gmail.com wrote: I have a class defining an inner static class (map function). The inner class tries to refer to a variable instantiated in the outer class, which results in a NullPointerException. Sample code as follows: class SampleOuterClass { private static ArrayList<String> someVariable; SampleOuterClass() { // initialize someVariable } public static class Parse implements Function<...> { public TypeReturn call (...) { // Try using someVariable: Raises NullPointerException } } public void run() { RDD rdd = data.map(new Parse()).rdd(); } } Am I missing something about how closures work with Spark, or is something else wrong? Thanks Sunny -- Marcelo
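A minimal sketch of Marcelo's suggestion, written in Scala for consistency with the other examples in this archive (in Java the equivalent is to give Parse a constructor argument and assign it to a non-static instance field). The names come from the example above; the method body is made up:

import org.apache.spark.rdd.RDD

// The value is captured as a constructor field of the function object,
// so it is serialized together with the instance passed to map(),
// instead of being looked up in a static field that is null on executors.
class Parse(someVariable: Seq[String]) extends Serializable {
  def call(record: String): Boolean = someVariable.contains(record)
}

def run(data: RDD[String], localCopy: Seq[String]): RDD[Boolean] =
  data.map(new Parse(localCopy).call _)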
DistCP - Spark-based
We are probably still the minority, but our analytics platform based on Spark + HDFS does not have map/reduce installed. I'm wondering if there is a distcp equivalent that leverages Spark to do the work. Our team is trying to find the best way to do cross-datacenter replication of our HDFS data to minimize the impact of outages/dc failure.
Re: Reference External Variables in Map Function (Inner class)
That should work. Gonna give it a try. Thanks! On Tue, Aug 12, 2014 at 11:01 AM, Marcelo Vanzin van...@cloudera.com wrote: You could create a copy of the variable inside your Parse class; that way it would be serialized with the instance you create when calling map() below. On Tue, Aug 12, 2014 at 10:56 AM, Sunny Khatri sunny.k...@gmail.com wrote: Are there any other workarounds that could be used to pass in the values from someVariable to the transformation function? On Tue, Aug 12, 2014 at 10:48 AM, Sean Owen so...@cloudera.com wrote: I don't think static members are going to be serialized in the closure? The instance of Parse will be looking at its local SampleOuterClass, which is maybe not initialized on the remote JVM. On Tue, Aug 12, 2014 at 6:02 PM, Sunny Khatri sunny.k...@gmail.com wrote: I have a class defining an inner static class (map function). The inner class tries to refer to a variable instantiated in the outer class, which results in a NullPointerException. Sample code as follows: class SampleOuterClass { private static ArrayList<String> someVariable; SampleOuterClass() { // initialize someVariable } public static class Parse implements Function<...> { public TypeReturn call (...) { // Try using someVariable: Raises NullPointerException } } public void run() { RDD rdd = data.map(new Parse()).rdd(); } } Am I missing something about how closures work with Spark, or is something else wrong? Thanks Sunny -- Marcelo
Jobs get stuck at reduceByKey stage with spark 1.0.1
Hello Spark aficionados, We upgraded from Spark 1.0.0 to 1.0.1 when the new release came out and started noticing some weird errors. Even a simple operation like reduceByKey or count on an RDD gets stuck in cluster mode. This issue does not occur with Spark 1.0.0 (in cluster or local mode), with Spark 1.0.2 (in cluster or local mode), or with Spark 1.0.1 in local mode. I looked at the Spark release notes, and it did not seem like the gigantic-task-size issue was the problem, because we have tons of resources on this cluster. Has anyone else encountered this issue? Thanks in advance for your help, Shivani -- Software Engineer Analytics Engineering Team @ Box Mountain View, CA
Re: Spark Hbase job taking long time
Hi, Today I created a table with 3 regions and 2 jobtrackers, but the Spark job is still taking a lot of time. I also noticed that the client's memory was increasing linearly; is the Spark job first bringing the complete data into memory? On Thu, Aug 7, 2014 at 7:31 PM, Ted Yu [via Apache Spark User List] ml-node+s1001560n11651...@n3.nabble.com wrote: Forgot to include user@ Another email from Amit indicated that there is 1 region in his table. This wouldn't give you the benefit TableInputFormat is expected to deliver. Please split your table into multiple regions. See http://hbase.apache.org/book.html#d3593e6847 and related links. Cheers On Wed, Aug 6, 2014 at 6:41 AM, Ted Yu wrote: Can you try specifying some value (100, e.g.) for hbase.mapreduce.scan.cachedrows in your conf? bq. table contains 10 lakh rows How many rows are there in the table? nit: the example uses classOf[TableInputFormat] instead of TableInputFormat.class. Cheers On Wed, Aug 6, 2014 at 5:54 AM, Amit Singh Hora wrote: Hi All, I am trying to run a SQL query on HBase using a Spark job. So far I am able to get the desired results, but as the data set size increases, the Spark job takes a long time. I believe I am doing something wrong, because after going through documentation and videos discussing Spark performance, it should not take more than a couple of seconds. PFB code snippet. The HBase table contains 10 lakh (1,000,000) rows.

JavaPairRDD<ImmutableBytesWritable, Result> pairRdd = ctx
    .newAPIHadoopRDD(conf, TableInputFormat.class,
        ImmutableBytesWritable.class,
        org.apache.hadoop.hbase.client.Result.class).cache();

JavaRDD<Person> people = pairRdd
    .map(new Function<Tuple2<ImmutableBytesWritable, Result>, Person>() {
      public Person call(Tuple2<ImmutableBytesWritable, Result> v1) throws Exception {
        System.out.println("comming");
        Person person = new Person();
        String key = Bytes.toString(v1._2.getRow());
        key = key.substring(0, key.lastIndexOf("_"));
        person.setCalling(Long.parseLong(key));
        person.setCalled(Bytes.toLong(v1._2.getValue(Bytes.toBytes("si"), Bytes.toBytes("called"))));
        person.setTime(Bytes.toLong(v1._2.getValue(Bytes.toBytes("si"), Bytes.toBytes("at"))));
        return person;
      }
    });

JavaSchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class);
schemaPeople.registerAsTable("people");
// SQL can be run over RDDs that have been registered as tables.
JavaSchemaRDD teenagers = sqlCtx.sql("SELECT count(*) from people group by calling");
teenagers.printSchema();

I am running Spark using the start-all.sh script with 2 workers. Any pointers will be of great help. Regards,
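For reference, a sketch of Ted's scanner-caching suggestion in Scala (the table name and caching value are placeholders; the property key is the one he names, and a SparkContext sc is assumed):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "mytable")
// Fetch 100 rows per scanner RPC (the default was as low as 1 in older
// HBase versions), cutting client/server round trips during the scan:
conf.set("hbase.mapreduce.scan.cachedrows", "100")

val pairRdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])

Splitting the table into more regions also matters, as suggested above: TableInputFormat creates one Spark partition per region, so a single-region table yields no scan parallelism.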
Spark Streaming example on your mesos cluster
Hi, I'm trying to run the streaming.NetworkWordCount example on my Mesos cluster (0.19.1). I am able to run the SparkPi example on the cluster, and I can run the streaming example in local[n] mode, but no luck so far with the Spark streaming examples running on the Mesos cluster. I don't see any particular errors in my logs, either on the console or on the slaves. I see the slave downloads the spark-1.0.2-bin-hadoop1.tgz file and unpacks it as well. The Mesos master shows quite a lot of tasks created and finished, but I don't see any word-count output on my console, like I get in local mode. Any suggestions/ideas on how I can make it work? Thanks, Zia
Re: Spark sql failed in yarn-cluster mode when connecting to non-default hive database
Hi Jenny, Have you copied hive-site.xml to the spark/conf directory? If not, can you put it in conf/ and try again? Thanks, Yin On Mon, Aug 11, 2014 at 8:57 PM, Jenny Zhao linlin200...@gmail.com wrote: Thanks Yin! Here is my hive-site.xml, which I copied from $HIVE_HOME/conf; I didn't experience problems connecting to the metastore through hive, which uses DB2 as the metastore database.

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at
  http://www.apache.org/licenses/LICENSE-2.0
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<configuration>
  <property><name>hive.hwi.listen.port</name><value></value></property>
  <property><name>hive.querylog.location</name><value>/var/ibm/biginsights/hive/query/${user.name}</value></property>
  <property><name>hive.metastore.warehouse.dir</name><value>/biginsights/hive/warehouse</value></property>
  <property><name>hive.hwi.war.file</name><value>lib/hive-hwi-0.12.0.war</value></property>
  <property><name>hive.metastore.metrics.enabled</name><value>true</value></property>
  <property><name>javax.jdo.option.ConnectionURL</name><value>jdbc:db2://hdtest022.svl.ibm.com:50001/BIDB</value></property>
  <property><name>javax.jdo.option.ConnectionDriverName</name><value>com.ibm.db2.jcc.DB2Driver</value></property>
  <property><name>hive.stats.autogather</name><value>false</value></property>
  <property><name>javax.jdo.mapping.Schema</name><value>HIVE</value></property>
  <property><name>javax.jdo.option.ConnectionUserName</name><value>catalog</value></property>
  <property><name>javax.jdo.option.ConnectionPassword</name><value>V2pJNWMxbFlVbWhaZHowOQ==</value></property>
  <property><name>hive.metastore.password.encrypt</name><value>true</value></property>
  <property><name>org.jpox.autoCreateSchema</name><value>true</value></property>
  <property><name>hive.server2.thrift.min.worker.threads</name><value>5</value></property>
  <property><name>hive.server2.thrift.max.worker.threads</name><value>100</value></property>
  <property><name>hive.server2.thrift.port</name><value>1</value></property>
  <property><name>hive.server2.thrift.bind.host</name><value>hdtest022.svl.ibm.com</value></property>
  <property><name>hive.server2.authentication</name><value>CUSTOM</value></property>
  <property><name>hive.server2.custom.authentication.class</name><value>org.apache.hive.service.auth.WebConsoleAuthenticationProviderImpl</value></property>
  <property><name>hive.server2.enable.impersonation</name><value>true</value></property>
  <property><name>hive.security.webconsole.url</name><value>http://hdtest022.svl.ibm.com:8080</value></property>
  <property><name>hive.security.authorization.enabled</name><value>true</value></property>
  <property><name>hive.security.authorization.createtable.owner.grants</name><value>ALL</value></property>
</configuration>

On Mon, Aug 11, 2014 at 4:29 PM, Yin Huai huaiyin@gmail.com wrote: Hi Jenny, How's your metastore configured for both Hive and Spark SQL? Which metastore mode are you using (based on https://cwiki.apache.org/confluence/display/Hive/AdminManual+MetastoreAdmin)?
Thanks, Yin On Mon, Aug 11, 2014 at 6:15 PM, Jenny Zhao linlin200...@gmail.com wrote: you can reproduce this issue with the following steps (assuming you have a Yarn cluster + Hive 12): 1) using the hive shell, create a database, e.g.: create database ttt 2) write a simple Spark SQL program:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext

object HiveSpark {
  case class Record(key: Int, value: String)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HiveSpark")
    val sc = new SparkContext(sparkConf)
    // A hive context creates an instance of the Hive Metastore in process
    val hiveContext = new HiveContext(sc)
    import hiveContext._

    hql("use ttt")
    hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
    hql("LOAD DATA INPATH '/user/biadmin/kv1.txt' INTO TABLE src")

    // Queries are expressed in HiveQL
    println("Result of 'SELECT *': ")
    hql("SELECT * FROM src").collect.foreach(println)
    sc.stop()
  }
}

3) run it in yarn-cluster mode. On Mon, Aug 11, 2014 at 9:44 AM, Cheng Lian
Re: DistCP - Spark-based
Good question; I don't know of one but I believe people at Cloudera had some thoughts of porting Sqoop to Spark in the future, and maybe they'd consider DistCP as part of this effort. I agree it's missing right now. Matei On August 12, 2014 at 11:04:28 AM, Gary Malouf (malouf.g...@gmail.com) wrote: We are probably still the minority, but our analytics platform based on Spark + HDFS does not have map/reduce installed. I'm wondering if there is a distcp equivalent that leverages Spark to do the work. Our team is trying to find the best way to do cross-datacenter replication of our HDFS data to minimize the impact of outages/dc failure.
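Nothing like this ships with Spark as of this thread, but as a sketch of the idea: parallelize the file list and call the Hadoop FileSystem API from tasks. The cluster URLs and paths below are made up, and there is no retry, atomicity, or permission handling; purely illustrative, with a SparkContext sc assumed:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}

val paths = Seq("/data/part-00000", "/data/part-00001") // files to mirror (placeholder)
sc.parallelize(paths, paths.size).foreach { p =>
  val conf = new Configuration()
  val src = new Path("hdfs://dc1-nn:8020" + p)
  val dst = new Path("hdfs://dc2-nn:8020" + p)
  // Copy src to dst across clusters without deleting the source.
  FileUtil.copy(src.getFileSystem(conf), src,
                dst.getFileSystem(conf), dst,
                false, conf)
}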
how to access workers from spark context
I tried to access worker info from the Spark context, but it seems the Spark context does not expose such an API. The reason for doing that is: it seems the Spark context itself does not have logic to detect whether its workers are dead, so I'd like to add such logic myself. BTW, it seems the Spark web UI has some logic for detecting dead workers, but all the relevant classes are declared private to the spark package scope. Please let me know how to solve this issue (or if there is an alternative way to achieve the same purpose). Thanks
Re: how to split RDD by key and save to different path
Understood, thank you. Small files are a problem; I am considering processing the data before putting it into HDFS. On Tue, Aug 12, 2014 at 9:37 PM, Fengyun RAO raofeng...@gmail.com wrote: 1. be careful, HDFS is better for large files, not bunches of small files. 2. if that's really what you want, roll your own:

import java.io.{BufferedWriter, OutputStreamWriter}
import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.Partitioner

def writeLines(iterator: Iterator[(String, String)]) = {
  val writers = new mutable.HashMap[String, BufferedWriter] // (key, writer) map
  try {
    while (iterator.hasNext) {
      val item = iterator.next()
      val key = item._1
      val line = item._2
      // reuse the writer for this key, or open one the first time we see it
      val writer = writers.getOrElseUpdate(key, {
        val path = args(1) + key // args(1): output directory prefix passed to the app
        val outputStream = FileSystem.get(new Configuration()).create(new Path(path))
        new BufferedWriter(new OutputStreamWriter(outputStream))
      })
      writer.write(line)
      writer.newLine()
    }
  } finally {
    writers.values.foreach(_.close())
  }
}

val inputData = sc.textFile(args(0))
val keyValue = inputData.map(line => (key(line), line)) // key(...) extracts the grouping key, e.g. the create date
val partitions = keyValue.partitionBy(new MyPartitioner(10))
partitions.foreachPartition(writeLines)

class MyPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = {
    // make sure lines with the same key land in the same partition
    (key.toString.hashCode & Integer.MAX_VALUE) % numPartitions
  }
}

2014-08-11 20:42 GMT+08:00 诺铁 noty...@gmail.com: hi, I have googled and found a similar question without a good answer: http://stackoverflow.com/questions/24520225/writing-to-hadoop-distributed-file-system-multiple-times-with-spark In short, I would like to separate raw data and divide it by some key, for example create date, and put it in a directory named by date, so that I can easily access a portion of the data later. For now I have to extract all keys, then filter by key and save to file repeatedly. Is there any good way to do this? Or maybe I shouldn't do such a thing?
Re: Spark sql failed in yarn-cluster mode when connecting to non-default hive database
Hi Yin, hive-site.xml was copied to spark/conf and is the same as the one under $HIVE_HOME/conf. Through the hive cli I don't see any problem, but for Spark in yarn-cluster mode I am not able to switch to a database other than the default one; in yarn-client mode it works fine. Thanks! Jenny On Tue, Aug 12, 2014 at 12:53 PM, Yin Huai huaiyin@gmail.com wrote: Hi Jenny, Have you copied hive-site.xml to the spark/conf directory? If not, can you put it in conf/ and try again? Thanks, Yin On Mon, Aug 11, 2014 at 8:57 PM, Jenny Zhao linlin200...@gmail.com wrote: Thanks Yin! Here is my hive-site.xml, which I copied from $HIVE_HOME/conf; I didn't experience problems connecting to the metastore through hive, which uses DB2 as the metastore database.

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at
  http://www.apache.org/licenses/LICENSE-2.0
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<configuration>
  <property><name>hive.hwi.listen.port</name><value></value></property>
  <property><name>hive.querylog.location</name><value>/var/ibm/biginsights/hive/query/${user.name}</value></property>
  <property><name>hive.metastore.warehouse.dir</name><value>/biginsights/hive/warehouse</value></property>
  <property><name>hive.hwi.war.file</name><value>lib/hive-hwi-0.12.0.war</value></property>
  <property><name>hive.metastore.metrics.enabled</name><value>true</value></property>
  <property><name>javax.jdo.option.ConnectionURL</name><value>jdbc:db2://hdtest022.svl.ibm.com:50001/BIDB</value></property>
  <property><name>javax.jdo.option.ConnectionDriverName</name><value>com.ibm.db2.jcc.DB2Driver</value></property>
  <property><name>hive.stats.autogather</name><value>false</value></property>
  <property><name>javax.jdo.mapping.Schema</name><value>HIVE</value></property>
  <property><name>javax.jdo.option.ConnectionUserName</name><value>catalog</value></property>
  <property><name>javax.jdo.option.ConnectionPassword</name><value>V2pJNWMxbFlVbWhaZHowOQ==</value></property>
  <property><name>hive.metastore.password.encrypt</name><value>true</value></property>
  <property><name>org.jpox.autoCreateSchema</name><value>true</value></property>
  <property><name>hive.server2.thrift.min.worker.threads</name><value>5</value></property>
  <property><name>hive.server2.thrift.max.worker.threads</name><value>100</value></property>
  <property><name>hive.server2.thrift.port</name><value>1</value></property>
  <property><name>hive.server2.thrift.bind.host</name><value>hdtest022.svl.ibm.com</value></property>
  <property><name>hive.server2.authentication</name><value>CUSTOM</value></property>
  <property><name>hive.server2.custom.authentication.class</name><value>org.apache.hive.service.auth.WebConsoleAuthenticationProviderImpl</value></property>
  <property><name>hive.server2.enable.impersonation</name><value>true</value></property>
  <property><name>hive.security.webconsole.url</name><value>http://hdtest022.svl.ibm.com:8080</value></property>
  <property><name>hive.security.authorization.enabled</name><value>true</value></property>
  <property><name>hive.security.authorization.createtable.owner.grants</name><value>ALL</value></property>
</configuration>

On Mon, Aug 11, 2014 at 4:29 PM, Yin Huai huaiyin@gmail.com wrote: Hi Jenny, How's your metastore configured for both Hive and Spark SQL? Which metastore mode are you using (based on https://cwiki.apache.org/confluence/display/Hive/AdminManual+MetastoreAdmin)? Thanks, Yin On Mon, Aug 11, 2014 at 6:15 PM, Jenny Zhao linlin200...@gmail.com wrote: you can reproduce this issue with the following steps (assuming you have a Yarn cluster + Hive 12): 1) using the hive shell, create a database, e.g.: create database ttt 2) write a simple Spark SQL program:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext

object HiveSpark {
  case class Record(key: Int, value: String)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HiveSpark")
    val sc = new SparkContext(sparkConf)
    // A hive context creates an instance of the Hive Metastore in process
    val hiveContext = new HiveContext(sc)
    import hiveContext._

    hql("use ttt")
Re: spark.files.userClassPathFirst=true Not Working Correctly
Hi, sorry for the delay. Would you have YARN available to test? Given the discussion in SPARK-2878, this might be a different incarnation of the same underlying issue. The option on YARN is spark.yarn.user.classpath.first. On Mon, Aug 11, 2014 at 1:33 PM, DNoteboom dan...@wibidata.com wrote: I'm currently running on my local machine in standalone mode. The error shows up in my code when I am closing resources using TaskContext.addOnCompleteCallback. However, the cause of this error is a faulty classloader, which must occur in the Executor in the createClassLoader function. -- Marcelo
Fwd: Task closures and synchronization
Uh, for some reason I don't seem to automatically reply to the list any more. Here is again my message to Tom. -- Forwarded message -- Tom, On Wed, Aug 13, 2014 at 5:35 AM, Tom Vacek minnesota...@gmail.com wrote: This is a back-to-basics question: how do we know when Spark will clone an object and distribute it with task closures, versus synchronize access to it? For example, the old rookie mistake of random number generation: import scala.util.Random val randRDD = sc.parallelize(0 until 1000).map(ii => Random.nextGaussian) One can check that each partition contains a different set of random numbers, so the RNG obviously was not cloned, but access was synchronized. In this case, Random is a singleton object; Random.nextGaussian is like a static method of a Java class. The access is not synchronized (unless I misunderstand synchronized), but each Spark worker will use a JVM-local instance of the Random object. You don't actually close over the Random object in this case. In fact, this is one way to have node-local state (e.g., for DB connection pooling). However: val myMap = collection.mutable.Map.empty[Int,Int] sc.parallelize(0 until 100).mapPartitions(it => { it.foreach(ii => myMap(ii) = ii); Array(myMap).iterator }).collect This shows that each partition got a copy of the empty map and filled it in with its portion of the RDD. In this case, myMap is an instance of the Map class, so it will be serialized and shipped around. In fact, if you did `val Random = new scala.util.Random()` in your code above, then this object would also be serialized and treated just like myMap. (NB. No, it is not. Spark hangs for me when I do this and doesn't return anything...) Tobias
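To make the node-local-state point concrete, here is a small sketch (a SparkContext named sc is assumed; the Random stands in for something like a DB connection pool):

import scala.util.Random

object NodeLocal {
  // Fields of a singleton object live once per executor JVM and are
  // initialized lazily on first use there; they are not shipped
  // with the task closure.
  lazy val rng = new Random()
}

val perPartitionSums = sc.parallelize(0 until 4, 4).mapPartitions { it =>
  // NodeLocal.rng is resolved on the worker, not serialized from the driver.
  Iterator(it.map(_ => NodeLocal.rng.nextGaussian()).sum)
}.collect()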
Re: Spark Streaming example on your mesos cluster
Hi, On Wed, Aug 13, 2014 at 4:24 AM, Zia Syed xia.s...@gmail.com wrote: I dont particularly see any errors on my logs, either on console, or on slaves. I see slave downloads the spark-1.0.2-bin-hadoop1.tgz file and unpacks them as well. Mesos Master shows quiet alot of Tasks created and Finished. I dont see any output on my console of the Word Counts, like in get in the Spark version. Any suggestions/ideas how i can make it work? You have to check the logs on the Mesos slaves in /tmp/mesos/slaves/***/frameworks/ -- I guess that you are missing the jar that your application is packed in. Tobias
Re: Mllib : Save SVM model to disk
You should try watching this video: https://www.youtube.com/watch?v=sPhyePwo7FA. For more details, please search the archives; I had the same kind of question and other folks helped me solve the problem. On Tue, Aug 12, 2014 at 12:36 PM, XiaoQinyu xiaoqinyu_sp...@outlook.com wrote: Have you solved this problem? And could you share how to save a model to hdfs and reload it? Thanks XiaoQinyu -- Thu.
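A common workaround at the time was to serialize the model object yourself. A sketch, assuming a trained SVMModel named model; the HDFS path is a placeholder, and note that plain Java serialization ties the saved file to the Spark/Scala versions used to write it:

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.mllib.classification.SVMModel

val fs = FileSystem.get(new Configuration())

// Save: MLlib's linear models are java-serializable objects.
val out = new ObjectOutputStream(fs.create(new Path("/models/svm.bin")))
try out.writeObject(model) finally out.close()

// Reload:
val in = new ObjectInputStream(fs.open(new Path("/models/svm.bin")))
val restored = try in.readObject().asInstanceOf[SVMModel] finally in.close()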
training recsys model
In MLLib, I found the method to train matrix factorization model to predict the taste of user. In this function, there are some parameters such as lambda, and rank, I can not find the best value to set these parameters and how to optimize this value. Could you please give me some recommends? -- Thu.
Re: how to access workers from spark context
Sometimes workers are dead but the Spark context does not know it and still sends jobs. On Tuesday, August 12, 2014 7:14 PM, Stanley Shi s...@pivotal.io wrote: Why do you need to detect the worker status in the application? Your application generally doesn't need to know where it is executed. On Wed, Aug 13, 2014 at 7:39 AM, S. Zhou myx...@yahoo.com.invalid wrote: I tried to access worker info from the Spark context, but it seems the Spark context does not expose such an API. The reason for doing that is: it seems the Spark context itself does not have logic to detect whether its workers are dead, so I'd like to add such logic myself. BTW, it seems the Spark web UI has some logic for detecting dead workers, but all the relevant classes are declared private to the spark package scope. Please let me know how to solve this issue (or if there is an alternative way to achieve the same purpose). Thanks -- Regards, Stanley Shi,
Re: how to access workers from spark context
Actually, if you search the Spark mail archives you will find many similar topics. For now, I just want to manage it myself. On Tuesday, August 12, 2014 8:46 PM, Stanley Shi s...@pivotal.io wrote: This seems like a bug, right? It's not the user's responsibility to manage the workers. On Wed, Aug 13, 2014 at 11:28 AM, S. Zhou myx...@yahoo.com wrote: Sometimes workers are dead but the Spark context does not know it and still sends jobs. On Tuesday, August 12, 2014 7:14 PM, Stanley Shi s...@pivotal.io wrote: Why do you need to detect the worker status in the application? Your application generally doesn't need to know where it is executed. On Wed, Aug 13, 2014 at 7:39 AM, S. Zhou myx...@yahoo.com.invalid wrote: I tried to access worker info from the Spark context, but it seems the Spark context does not expose such an API. The reason for doing that is: it seems the Spark context itself does not have logic to detect whether its workers are dead, so I'd like to add such logic myself. BTW, it seems the Spark web UI has some logic for detecting dead workers, but all the relevant classes are declared private to the spark package scope. Please let me know how to solve this issue (or if there is an alternative way to achieve the same purpose). Thanks -- Regards, Stanley Shi,
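One pragmatic workaround, sketched below: the standalone Master web UI also serves its status as JSON, including each worker and its state. The URL and the field layout are assumptions about the 1.x standalone UI; verify against your deployment before relying on it:

import scala.io.Source

// Crude liveness check; a real monitor would parse the JSON properly
// and alert on specific workers.
val summary = Source.fromURL("http://spark-master:8080/json").mkString
val hasDeadWorker = summary.contains("\"DEAD\"")
if (hasDeadWorker) println("at least one worker is reported DEAD")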
Re: Spark SQL JDBC
Hi Cheng, I also met some issues when I tried to start the ThriftServer based on a build from the master branch (I could successfully run it from the branch-1.0-jdbc branch). Below is my build command: ./make-distribution.sh --skip-java-test -Phadoop-2.4 -Phive -Pyarn -Dyarn.version=2.4.0 -Dhadoop.version=2.4.0 -Phive-thriftserver And below are the printed errors:

ERROR CompositeService: Error starting services HiveServer2
org.apache.hive.service.ServiceException: Unable to connect to MetaStore!
at org.apache.hive.service.cli.CLIService.start(CLIService.java:85)
at org.apache.hive.service.CompositeService.start(CompositeService.java:70)
at org.apache.hive.service.server.HiveServer2.start(HiveServer2.java:73)
at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:71)
at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:314)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:73)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: javax.jdo.JDOFatalUserException: Class org.datanucleus.api.jdo.JDOPersistenceManagerFactory was not found.
NestedThrowables: java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory
at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1175)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:275)
at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:304)
at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:234)
at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:209)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
at org.apache.hadoop.hive.metastore.RetryingRawStore.<init>(RetryingRawStore.java:64)
at org.apache.hadoop.hive.metastore.RetryingRawStore.getProxy(RetryingRawStore.java:73)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(HiveMetaStore.java:415)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:402)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:441)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.<init>(HiveMetaStore.java:326)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.<init>(HiveMetaStore.java:286)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:54)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:59)
at org.apache.hadoop.hive.metastore.HiveMetaStore.newHMSHandler(HiveMetaStore.java:4060)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:121)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:104)
at org.apache.hive.service.cli.CLIService.start(CLIService.java:82)
... 11 more
Caused by: java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at javax.jdo.JDOHelper$18.run(JDOHelper.java:2018)
at javax.jdo.JDOHelper$18.run(JDOHelper.java:2016)
at java.security.AccessController.doPrivileged(Native Method)
at javax.jdo.JDOHelper.forName(JDOHelper.java:2015)
at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1162)
... 32 more
14/08/13 13:08:48 INFO AbstractService: Service:OperationManager is stopped.
14/08/13 13:08:48 INFO AbstractService: Service:SessionManager is stopped.
14/08/13 13:08:48 INFO AbstractService: Service:CLIService is stopped.
14/08/13 13:08:48 ERROR
Re: Spark SQL JDBC
Just found that this is because the lines below in make-distribution.sh don't take effect:

if [ "$SPARK_HIVE" == "true" ]; then
  cp "$FWDIR"/lib_managed/jars/datanucleus*.jar "$DISTDIR/lib/"
fi

There is no definition of $SPARK_HIVE in make-distribution.sh; I have to set it explicitly (e.g., export SPARK_HIVE=true before running the script). On Wed, Aug 13, 2014 at 1:10 PM, ZHENG, Xu-dong dong...@gmail.com wrote: Hi Cheng, I also met some issues when I tried to start the ThriftServer based on a build from the master branch (I could successfully run it from the branch-1.0-jdbc branch). Below is my build command: ./make-distribution.sh --skip-java-test -Phadoop-2.4 -Phive -Pyarn -Dyarn.version=2.4.0 -Dhadoop.version=2.4.0 -Phive-thriftserver And below are the printed errors: ERROR CompositeService: Error starting services HiveServer2 org.apache.hive.service.ServiceException: Unable to connect to MetaStore! Caused by: javax.jdo.JDOFatalUserException: Class org.datanucleus.api.jdo.JDOPersistenceManagerFactory was not found. [snip: same stack trace as quoted above]