Re: high GC in the Kmeans algorithm
Thanks for your answer. Yes, I cached the data; I can observe from the Web UI that all the data is cached in memory. What worries me is the dimension, not the total size. Sean Owen once answered me that broadcast supports a maximum array size of 2GB, so is 10^7 a little large? On Wed, Feb 18, 2015 at 5:43 AM, Xiangrui Meng men...@gmail.com wrote: Did you cache the data? Was it fully cached? The k-means implementation doesn't create many temporary objects. I guess you need more RAM to avoid GC being triggered frequently. Please monitor the memory usage using YourKit or VisualVM. -Xiangrui On Wed, Feb 11, 2015 at 1:35 AM, lihu lihu...@gmail.com wrote: I just want to make the best use of CPU and test the performance of Spark when there are a lot of tasks on a single node. On Wed, Feb 11, 2015 at 5:29 PM, Sean Owen so...@cloudera.com wrote: Good, worth double-checking that's what you got. That's barely 1GB per task though. Why run 48 if you have 24 cores? On Wed, Feb 11, 2015 at 9:03 AM, lihu lihu...@gmail.com wrote: I give 50GB to the executor, so it seems there is no reason the memory is not enough. On Wed, Feb 11, 2015 at 4:50 PM, Sean Owen so...@cloudera.com wrote: Meaning, you have 128GB per machine, but how much memory are you giving the executors? On Wed, Feb 11, 2015 at 8:49 AM, lihu lihu...@gmail.com wrote: What do you mean? Yes, I can see there is some data put in memory from the web UI. On Wed, Feb 11, 2015 at 4:25 PM, Sean Owen so...@cloudera.com wrote: Are you actually using that memory for executors? On Wed, Feb 11, 2015 at 8:17 AM, lihu lihu...@gmail.com wrote: Hi, I run k-means (MLlib) in a cluster with 12 workers. Every worker has 128GB RAM and 24 cores. I run 48 tasks on one machine; the total data is just 40GB. When the dimension of the data set is about 10^7, every task's duration is about 30s, but the cost of GC is about 20s. When I reduce the dimension to 10^4, the GC cost is small. So why is GC so high when the dimension is larger? Or is this caused by MLlib? -- Best Wishes! Li Hu(李浒) | Graduate Student Institute for Interdisciplinary Information Sciences (IIIS) Tsinghua University, China Email: lihu...@gmail.com Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
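For readers hitting the same GC behavior: a minimal sketch of caching the training set in serialized form before calling MLlib's k-means, which typically reduces the number of live Java objects the GC has to trace. The input path and parsing are hypothetical; note also that with ~10^7 dimensions a dense vector is on the order of 80MB (10^7 doubles at 8 bytes each), so Vectors.sparse is worth considering if the data is sparse.

    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.storage.StorageLevel

    // Hypothetical input: one space-separated feature vector per line.
    val points = sc.textFile("hdfs:///data/points")
      .map(line => Vectors.dense(line.split(' ').map(_.toDouble)))
      .persist(StorageLevel.MEMORY_AND_DISK_SER) // serialized storage trades CPU for fewer live objects

    points.count() // materialize the cache before training
    val model = KMeans.train(points, 10, 20) // k = 10, maxIterations = 20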
Re: Lost task - connection closed
Hi, Thanks for the response. I discovered that my problem was that some of the executors got OOM; tracing through the executor logs helped discover the problem. Usually the driver log does not reflect the OOM error, which causes confusion among users. This is just the finding on my side; not sure if the OP was having the same problem though. On Wed, Feb 11, 2015 at 12:03 AM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: Hi Can you share the code you are trying to run. Thanks Arush On Wed, Feb 11, 2015 at 9:12 AM, Tianshuo Deng td...@twitter.com.invalid wrote: I have seen the same problem. It causes some tasks to fail, but not the whole job. Hope someone could shed some light on what could be the cause of this. On Mon, Jan 26, 2015 at 9:49 AM, Aaron Davidson ilike...@gmail.com wrote: It looks like something weird is going on with your object serialization, perhaps a funny form of self-reference which is not detected by ObjectOutputStream's typical loop avoidance. That, or you have some data structure like a linked list with a parent pointer and many thousands of elements. Assuming the stack trace is coming from an executor, it is probably a problem with the objects you're sending back as results, so I would carefully examine these and maybe try serializing some using ObjectOutputStream manually. If your program looks like foo.map { row => doComplexOperation(row) }.take(10), you can also try changing it to foo.map { row => doComplexOperation(row); 1 }.take(10) to avoid serializing the result of that complex operation, which should help narrow down where exactly the problematic objects are coming from. On Mon, Jan 26, 2015 at 8:31 AM, octavian.ganea octavian.ga...@inf.ethz.ch wrote: Here is the first error I get at the executors:

    15/01/26 17:27:04 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[handle-message-executor-16,5,main]
    java.lang.StackOverflowError
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
        at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1840)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1533)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        ... (the same defaultWriteFields / writeSerialData / writeOrdinaryObject / writeObject0 frames repeat until the stack overflows)
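For anyone who wants to follow Aaron's suggestion of serializing suspect result objects manually, a minimal sketch (the helper name is made up):

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    // Serialize an object the same way the task-result path does and report what happens.
    def trySerialize(obj: AnyRef): Unit = {
      val bos = new ByteArrayOutputStream()
      val oos = new ObjectOutputStream(bos)
      try {
        oos.writeObject(obj)
        println(s"serialized OK: ${bos.size} bytes")
      } catch {
        case e: StackOverflowError =>
          println(s"object graph too deep: ${obj.getClass.getName}")
      } finally {
        oos.close()
      }
    }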
Re: How do you get the partitioner for an RDD in Java?
Thanks Imran. That's exactly what I needed to know. Darin. From: Imran Rashid iras...@cloudera.com To: Darin McBeath ddmcbe...@yahoo.com Cc: User user@spark.apache.org Sent: Tuesday, February 17, 2015 8:35 PM Subject: Re: How do you get the partitioner for an RDD in Java? A JavaRDD is just a wrapper around a normal RDD defined in Scala, which is stored in the rdd field. You can access everything that way. The JavaRDD wrappers just provide some interfaces that are a bit easier to work with in Java. If this is at all convincing, here's me demonstrating it inside the spark-shell (yes, it's Scala, but I'm using the Java API):

    scala> val jsc = new JavaSparkContext(sc)
    jsc: org.apache.spark.api.java.JavaSparkContext = org.apache.spark.api.java.JavaSparkContext@7d365529

    scala> val data = jsc.parallelize(java.util.Arrays.asList(Array("a", "b", "c")))
    data: org.apache.spark.api.java.JavaRDD[Array[String]] = ParallelCollectionRDD[0] at parallelize at <console>:15

    scala> data.rdd.partitioner
    res0: Option[org.apache.spark.Partitioner] = None

On Tue, Feb 17, 2015 at 3:44 PM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote: In an 'early release' of the Learning Spark book, there is the following reference: In Scala and Java, you can determine how an RDD is partitioned using its partitioner property (or partitioner() method in Java). However, I don't see the mentioned 'partitioner()' method in Spark 1.2 or a way of getting this information. I'm curious if anyone has any suggestions for how I might go about finding how an RDD is partitioned in a Java program. Thanks. Darin.
RangePartitioner in Spark 1.2.1
Hi, Sparkers: I just happened to be searching Google for something related to Spark's RangePartitioner, and found an old thread on this email list here: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-and-Partition-td991.html I followed the code example mentioned in that email thread as follows:

    scala> import org.apache.spark.RangePartitioner
    import org.apache.spark.RangePartitioner

    scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog", "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long", "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk", "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))
    rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:13

    scala> rdd.keyBy(s => s(0).toUpper)
    res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy at <console>:16

    scala> res0.partitionBy(new RangePartitioner[Char, String](26, res0)).values
    res1: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at <console>:18

    scala> res1.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s))).collect.foreach(println)

The above example is clear for me to understand the meaning of the RangePartitioner, but to my surprise, I got the following result:

    (0,apple)
    (0,Ball)
    (1,cat)
    (2,dog)
    (3,Elephant)
    (4,fox)
    (5,gas)
    (6,horse)
    (7,index)
    (8,jet)
    (9,kitsch)
    (10,long)
    (11,moon)
    (12,Neptune)
    (13,ooze)
    (14,Pen)
    (15,quiet)
    (16,rose)
    (17,sun)
    (18,talk)
    (19,umbrella)
    (20,voice)
    (21,Walrus)
    (22,xeon)
    (23,Yam)
    (24,zebra)

instead of a perfect range index from 0 to 25 as in the old email thread. Why is that? Is this a bug, or some new feature I don't understand? BTW, the above environment I tested is Spark 1.2.1 with the Hadoop 2.4 binary release. Thanks Yong
JsonRDD to parquet -- data loss
Hi, I am running a Spark batch processing job using the spark-submit command, and below is my code snippet, basically converting a JsonRDD to parquet and storing it in an HDFS location. The problem I am facing is that if multiple jobs are triggered in parallel, even though each job executes properly (as I can see in the Spark web UI), there is no parquet file created in the HDFS path; if 5 jobs are executed in parallel, then only 3 parquet files are getting created. Is this a data loss scenario? Or am I missing something here? Please help me with this. Here tableName is unique, with a timestamp appended to it.

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val jsonRdd = sqlContext.jsonRDD(results)
    val parquetTable = sqlContext.parquetFile(parquetFilePath)
    parquetTable.registerTempTable(tableName)
    jsonRdd.insertInto(tableName)

Regards, Vasu C
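One way to sidestep the concurrent-writer question entirely, since tableName is already unique per job, is to have each job write its own parquet directory instead of inserting into a shared one. A sketch under that assumption:

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val jsonRdd = sqlContext.jsonRDD(results)
    // Each job writes its own directory, so parallel jobs never share a writer.
    jsonRdd.saveAsParquetFile(s"$parquetFilePath/$tableName")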
Berlin Apache Spark Meetup
Hi, there is a small Spark Meetup group in Berlin, Germany :-) http://www.meetup.com/Berlin-Apache-Spark-Meetup/ Please add this group to the Meetups list at https://spark.apache.org/community.html Ralph
Re: Berlin Apache Spark Meetup
Thanks! I've added you. Matei On Feb 17, 2015, at 4:06 PM, Ralph Bergmann | the4thFloor.eu ra...@the4thfloor.eu wrote: Hi, there is a small Spark Meetup group in Berlin, Germany :-) http://www.meetup.com/Berlin-Apache-Spark-Meetup/ Please add this group to the Meetups list at https://spark.apache.org/community.html Ralph
Re: MapValues and Shuffle Reads
Hi Darin, You are asking for something near and dear to me: https://issues.apache.org/jira/browse/SPARK-1061 There is a PR attached there as well. Note that you could do everything in that PR in your own user code (you don't need to wait for it to get merged), *except* for the change to HadoopRDD so that it sorts the input partitions. (Though of course, you could always have your own implementation of HadoopRDD as well...) You could also vote for or watch the issue to encourage some progress on it :) On Tue, Feb 17, 2015 at 2:56 PM, Darin McBeath ddmcbe...@yahoo.com wrote: Thanks Imran. I think you are probably correct. I was a bit surprised that there was no shuffle read in the initial hash partition step. I will adjust the code as you suggest to prove that is the case. I have a slightly different question. If I save an RDD to S3 (or some equivalent) and this RDD was hash partitioned at the time, do I still need to hash partition the RDD again when I read it in? Is there a way that I could prevent all of the shuffling (such as providing a hint)? My parts for the RDD will be gzipped, so they would not be splittable. In reality, that's what I would really want to do in the first place. Thanks again for your insights. Darin. -- *From:* Imran Rashid iras...@cloudera.com *To:* Darin McBeath ddmcbe...@yahoo.com *Cc:* User user@spark.apache.org *Sent:* Tuesday, February 17, 2015 3:29 PM *Subject:* Re: MapValues and Shuffle Reads Hi Darin, When you say you see 400GB of shuffle writes from the first code snippet, what do you mean? There is no action in that first set, so it won't do anything. By itself, it won't do any shuffle writing, or anything else for that matter. Most likely, the .count() in your second code snippet is actually causing the execution of some of the first snippet as well. The .partitionBy will result in both shuffle writes and shuffle reads, but they aren't set in motion until the .count further down the line. It's confusing because the stage boundaries don't line up exactly with your RDD variables here: hsfBaselinePairRDD spans 2 stages, and baselinePairRDD actually gets merged into the stage above it. If you do a hsfBaselinePairRDD.count after your first code snippet, and then run the second code snippet afterwards, is it more like what you expect? Imran On Tue, Feb 17, 2015 at 1:52 PM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote: In the following code, I read in a large sequence file from S3 (1TB) spread across 1024 partitions. When I look at the job/stage summary, I see about 400GB of shuffle writes, which seems to make sense as I'm doing a hash partition on this file.

    // Get the baseline input file
    JavaPairRDD<Text, Text> hsfBaselinePairRDDReadable =
        sc.hadoopFile(baselineInputBucketFile, SequenceFileInputFormat.class, Text.class, Text.class);
    JavaPairRDD<String, String> hsfBaselinePairRDD =
        hsfBaselinePairRDDReadable.mapToPair(new ConvertFromWritableTypes())
            .partitionBy(new HashPartitioner(Variables.NUM_OUTPUT_PARTITIONS))
            .persist(StorageLevel.MEMORY_AND_DISK_SER());

I then execute the following code (with a count to force execution), and what I find very strange is that when I look at the job/stage summary, I see more than 340GB of shuffle read. Why would there be any shuffle read in this step? I would expect there to be little (if any) shuffle read in this step.

    // Use 'substring' to extract the epoch value from each record.
    JavaPairRDD<String, Long> baselinePairRDD =
        hsfBaselinePairRDD.mapValues(new ExtractEpoch(accumBadBaselineRecords))
            .persist(StorageLevel.MEMORY_AND_DISK_SER());
    log.info("Number of baseline records: " + baselinePairRDD.count());

Both hsfBaselinePairRDD and baselinePairRDD have 1024 partitions. Any insights would be appreciated. I'm using Spark 1.2.0 in a stand-alone cluster. Darin.
Re: How do you get the partitioner for an RDD in Java?
A JavaRDD is just a wrapper around a normal RDD defined in Scala, which is stored in the rdd field. You can access everything that way. The JavaRDD wrappers just provide some interfaces that are a bit easier to work with in Java. If this is at all convincing, here's me demonstrating it inside the spark-shell (yes, it's Scala, but I'm using the Java API):

    scala> val jsc = new JavaSparkContext(sc)
    jsc: org.apache.spark.api.java.JavaSparkContext = org.apache.spark.api.java.JavaSparkContext@7d365529

    scala> val data = jsc.parallelize(java.util.Arrays.asList(Array("a", "b", "c")))
    data: org.apache.spark.api.java.JavaRDD[Array[String]] = ParallelCollectionRDD[0] at parallelize at <console>:15

    scala> data.rdd.partitioner
    res0: Option[org.apache.spark.Partitioner] = None

On Tue, Feb 17, 2015 at 3:44 PM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote: In an 'early release' of the Learning Spark book, there is the following reference: In Scala and Java, you can determine how an RDD is partitioned using its partitioner property (or partitioner() method in Java). However, I don't see the mentioned 'partitioner()' method in Spark 1.2 or a way of getting this information. I'm curious if anyone has any suggestions for how I might go about finding how an RDD is partitioned in a Java program. Thanks. Darin.
Spark Streaming output cannot be used as input?
Hello folks, Our intended use case is: - Spark Streaming app #1 reads from RabbitMQ and outputs to HDFS - Spark Streaming app #2 reads #1's output and stores the data into Elasticsearch. The idea behind this architecture is that if Elasticsearch is down due to an upgrade or system error, we don't have to stop reading messages from the queue. We could also scale each process separately as needed. After a few hours' research, my understanding is that Spark Streaming outputs files in a *directory* for which you provide the prefix and suffix. This is despite the ScalaDoc for DStream.saveAsObjectFiles suggesting otherwise:

    /**
     * Save each RDD in this DStream as a Sequence file of serialized objects.
     * The file name at each batch interval is generated based on `prefix` and
     * `suffix`: "prefix-TIME_IN_MS.suffix".
     */

Spark Streaming can monitor an HDFS directory for files, but subfolders are not supported. So as far as I can tell, it is not possible to use Spark Streaming output as input for a different Spark Streaming app without somehow performing a separate operation in the middle. Am I missing something obvious? I've read some suggestions like using Hadoop to merge the directories (whose names I don't see how you would know) and reducing the partitions to 1 (which wouldn't help). Any other suggestions? What is the expected pattern a developer would follow that would make Spark Streaming's output format usable?
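A common workaround (not from this thread; the paths here are hypothetical) is to write each batch to a staging directory and then rename the part files into the single flat directory the second app monitors. HDFS renames are atomic, which is what the file stream expects:

    import org.apache.hadoop.fs.{FileSystem, Path}

    // dstream is app #1's output stream; app #2 monitors hdfs:///monitored.
    dstream.foreachRDD { (rdd, time) =>
      val staging = s"hdfs:///staging/batch-${time.milliseconds}"
      rdd.saveAsTextFile(staging)
      val fs = FileSystem.get(rdd.context.hadoopConfiguration)
      // Promote the part files into the flat monitored directory, batch-stamped.
      fs.listStatus(new Path(staging))
        .filter(_.getPath.getName.startsWith("part-"))
        .foreach { st =>
          fs.rename(st.getPath, new Path(s"hdfs:///monitored/${time.milliseconds}-${st.getPath.getName}"))
        }
      fs.delete(new Path(staging), true)
    }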
spark-core in a servlet
Hi, I want to use spark-core inside of an HttpServlet. I use Maven for the build task, but I have a dependency problem :-( I get this error message:

    ClassCastException: com.sun.jersey.server.impl.container.servlet.JerseyServletContainerInitializer cannot be cast to javax.servlet.ServletContainerInitializer

When I add these exclusions it builds, but then there are other classes not found at runtime:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>1.2.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.eclipse.jetty</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

What can I do? Thanks a lot!, Ralph -- Ralph Bergmann iOS and Android app developer www: http://www.the4thFloor.eu mail: ra...@the4thfloor.eu skype: dasralph google+: https://plus.google.com/+RalphBergmann xing: https://www.xing.com/profile/Ralph_Bergmann3 linkedin: https://www.linkedin.com/in/ralphbergmann gulp: https://www.gulp.de/Profil/RalphBergmann.html github: https://github.com/the4thfloor pgp key id: 0x421F9B78 pgp fingerprint: CEE3 7AE9 07BE 98DF CD5A E69C F131 4A8E 421F 9B78

The attached pom.xml:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <name>SparkWordCount</name>
      <version>1.0-SNAPSHOT</version>
      <groupId>eu.the4thfloor</groupId>
      <artifactId>SparkWordCount</artifactId>
      <packaging>war</packaging>
      <url>http://maven.apache.org</url>
      <properties>
        <jdk.version>1.7</jdk.version>
      </properties>
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.11</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>javax.servlet</groupId>
          <artifactId>javax.servlet-api</artifactId>
          <version>3.1.0</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>1.2.1</version>
          <exclusions>
            <exclusion>
              <groupId>org.apache.hadoop</groupId>
              <artifactId>hadoop-client</artifactId>
            </exclusion>
            <exclusion>
              <groupId>org.eclipse.jetty</groupId>
              <artifactId>*</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
      </dependencies>
      <build>
        <finalName>SparkWordCount</finalName>
        <plugins>
          <!-- Eclipse project -->
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-eclipse-plugin</artifactId>
            <version>2.9</version>
            <configuration>
              <!-- Always download and attach dependencies source code -->
              <downloadSources>true</downloadSources>
              <downloadJavadocs>false</downloadJavadocs>
              <!-- Avoid type mvn eclipse:eclipse -Dwtpversion=2.0 -->
              <wtpversion>2.0</wtpversion>
            </configuration>
          </plugin>
          <!-- Set JDK Compiler Level -->
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
              <source>${jdk.version}</source>
              <target>${jdk.version}</target>
            </configuration>
          </plugin>
          <!-- For Maven Tomcat Plugin -->
          <plugin>
            <groupId>org.apache.tomcat.maven</groupId>
            <artifactId>tomcat7-maven-plugin</artifactId>
            <version>2.2</version>
            <configuration>
              <path>/SparkWordCount</path>
            </configuration>
          </plugin>
        </plugins>
      </build>
    </project>
Re: JsonRDD to parquet -- data loss
I am not sure if this is the easiest way to solve your problem, but you can connect to the Hive metastore (through Derby) and find the HDFS path from there. On Wed, Feb 18, 2015 at 9:31 AM, Vasu C vasuc.bigd...@gmail.com wrote: Hi, I am running a Spark batch processing job using the spark-submit command, and below is my code snippet, basically converting a JsonRDD to parquet and storing it in an HDFS location. The problem I am facing is that if multiple jobs are triggered in parallel, even though each job executes properly (as I can see in the Spark web UI), there is no parquet file created in the HDFS path; if 5 jobs are executed in parallel, then only 3 parquet files are getting created. Is this a data loss scenario? Or am I missing something here? Please help me with this. Here tableName is unique, with a timestamp appended to it.

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val jsonRdd = sqlContext.jsonRDD(results)
    val parquetTable = sqlContext.parquetFile(parquetFilePath)
    parquetTable.registerTempTable(tableName)
    jsonRdd.insertInto(tableName)

Regards, Vasu C -- *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Re: OutOfMemory and GC limits (TODO) Error in map after self-join
Thanks for the reply, I'll try your suggestions. Apologies, in my previous post I was mistaken: rdd is actually a PairRDD of (Int, Int). I'm doing the self-join so I can count two things. First, I can count the number of times a value appears in the data set. Second, I can count the number of times values occur with the same key. For example, if I have the following:

    (1,2)
    (1,3)
    (4,3)

Then joining with itself I get:

    (1,(2,2)) -> map -> ((2,2),1) -> reduceByKey -> ((2,2),1)
    (1,(2,3)) -> map -> ((2,3),1) -> reduceByKey -> ((2,3),1)
    (1,(3,2)) -> map -> ((3,2),1) -> reduceByKey -> ((3,2),1)
    (1,(3,3)) -> map -> ((3,3),1) -> reduceByKey -> ((3,3),2)
    (4,(3,3)) -> map -> ((3,3),1), which is combined into the ((3,3),2) above

Note that I want to keep the duplicates (2,2) and reflections. Rgds On 18 February 2015 at 09:00, Akhil Das ak...@sigmoidanalytics.com wrote: Why are you joining the rdd with itself? You can try these things: - Change the StorageLevel of both rdds to MEMORY_AND_DISK_2 or MEMORY_AND_DISK_SER, so that it doesn't need to keep everything in memory. - Set your default serializer to Kryo (.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")) - Enable rdd compression (.set("spark.rdd.compress", "true")) Thanks Best Regards On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn twal...@gmail.com wrote: Hi All, I'm a new Spark (and Hadoop) user and I want to find out if the cluster resources I am using are feasible for my use-case. The following is a snippet of code that is causing an OOM exception in the executor after about 125/1000 tasks during the map stage.

    val rdd2 = rdd.join(rdd, numPartitions = 1000)
      .map(fp => ((fp._2._1, fp._2._2), 1))
      .reduceByKey((x, y) => x + y)
    rdd2.count()

Which errors with a stack trace like:

    15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in stage 2.0 (TID 498)
    java.lang.OutOfMemoryError: GC overhead limit exceeded
        at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
        at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
        at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
        at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
        at scala.collection.immutable.List.foreach(List.scala:318)

rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of co-occurring values by key in the dataset, i.e. 'these two numbers occurred with the same key n times'. I intentionally don't want to filter out duplicates and reflections. rdd is about 3.6 million records, which has a size in memory of about 120MB, and results in a 'joined' RDD (before the reduceByKey stage) of around 460 million records, with a size in memory of about 35GB. My cluster setup is as follows. I have 3 nodes, where each node has 2 cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and executors are allowed 1280m each, and the job has 5 executors and 1 driver. Additionally, I have set spark.storage.memoryFraction to 0.06 and spark.shuffle.memoryFraction to 0.65 in the hope that this would mitigate the issue. I've also tried increasing the number of partitions after the join dramatically (up to 15000). Nothing has been effective. Thus, I'm beginning to suspect I don't have enough resources for the job. Does anyone have a feeling about what the resource requirements would be for a use-case like this? I could scale the cluster up if necessary, but would like to avoid it. I'm willing to accept longer computation times if that is an option. Warm Regards, Thomas
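Since the join key is only used to group values, the same counts can be computed without materializing the 460-million-record self-join as a shuffled RDD. A sketch that keeps the duplicates and reflections described above; note that a single heavily skewed key still has to fit its whole value list in memory:

    // Gather values per key, then emit every ordered pair (including (v,v) pairs).
    val rdd2 = rdd.groupByKey(1000)
      .flatMap { case (_, vs) =>
        for (a <- vs; b <- vs) yield ((a, b), 1)
      }
      .reduceByKey(_ + _)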
Class loading issue, spark.files.userClassPathFirst doesn't seem to be working
I'm getting the below error when running spark-submit on my class. This class has a transitive dependency on HttpClient v4.3.1, since I'm calling SolrJ 4.10.3 from within the class. This is in conflict with the older version, HttpClient 3.1, that's a dependency of Hadoop 2.4 (I'm running Spark 1.2.1 built for Hadoop 2.4). I've tried setting spark.files.userClassPathFirst to true in SparkConf in my program, and also setting it to true in $SPARK_HOME/conf/spark-defaults.conf as

    spark.files.userClassPathFirst true

No go, I'm still getting the error, as below. Is there anything else I can try? Are there any plans in Spark to support multiple class loaders?

    Exception in thread "main" java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry;
        at org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121)
        at org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445)
        at org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206)
        at org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35)
        at org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142)
        at org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118)
        at org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:168)
        at org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:141)
        ...
OutOfMemory and GC limits (TODO) Error in map after self-join
Hi All, I'm a new Spark (and Hadoop) user and I want to find out if the cluster resources I am using are feasible for my use-case. The following is a snippet of code that is causing an OOM exception in the executor after about 125/1000 tasks during the map stage.

    val rdd2 = rdd.join(rdd, numPartitions = 1000)
      .map(fp => ((fp._2._1, fp._2._2), 1))
      .reduceByKey((x, y) => x + y)
    rdd2.count()

Which errors with a stack trace like:

    15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in stage 2.0 (TID 498)
    java.lang.OutOfMemoryError: GC overhead limit exceeded
        at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
        at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
        at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
        at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
        at scala.collection.immutable.List.foreach(List.scala:318)

rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of co-occurring values by key in the dataset, i.e. 'these two numbers occurred with the same key n times'. I intentionally don't want to filter out duplicates and reflections. rdd is about 3.6 million records, which has a size in memory of about 120MB, and results in a 'joined' RDD (before the reduceByKey stage) of around 460 million records, with a size in memory of about 35GB. My cluster setup is as follows. I have 3 nodes, where each node has 2 cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and executors are allowed 1280m each, and the job has 5 executors and 1 driver. Additionally, I have set spark.storage.memoryFraction to 0.06 and spark.shuffle.memoryFraction to 0.65 in the hope that this would mitigate the issue. I've also tried increasing the number of partitions after the join dramatically (up to 15000). Nothing has been effective. Thus, I'm beginning to suspect I don't have enough resources for the job. Does anyone have a feeling about what the resource requirements would be for a use-case like this? I could scale the cluster up if necessary, but would like to avoid it. I'm willing to accept longer computation times if that is an option. Warm Regards, Thomas
RE: Percentile example
Thanks Imran for the very detailed explanations and options. I think for now T-Digest is what I want. From: iras...@cloudera.com Date: Tue, 17 Feb 2015 08:39:48 -0600 Subject: Re: Percentile example To: myl...@hotmail.com CC: user@spark.apache.org (trying to repost to the list w/out URLs -- rejected as spam earlier) Hi, Using take() is not a good idea; as you have noted, it will pull a lot of data down to the driver, so it's not scalable. Here are some more scalable alternatives: 1. Approximate solutions. 1a. Sample the data. Just sample some of the data to the driver, sort that data in memory, and take the 66th percentile of that sample. 1b. Make a histogram with pre-determined buckets. E.g., if you know your data ranges from 0 to 1 and is uniform-ish, you could make buckets every 0.01, then count how many data points go into each bucket. Or if you only care about relative error and you have integers (often the case if your data is counts), then you can span the full range of integers with a relatively small number of buckets; e.g., you only need 200 buckets for 5% error. See the Histogram class in Twitter's Ostrich library. The problem is, if you have no idea what the distribution of your data is, it's very hard to come up with good buckets; you could have an arbitrary amount of data going to one bucket, and thus tons of error. 1c. Use a TDigest, a compact, scalable data structure for approximating distributions that performs reasonably across a wide range of distributions. You would make one TDigest for each partition (with mapPartitions) and then merge all of the TDigests together. I wrote up a little more detail on this earlier; you can search spark-user on Nabble for tdigest. 2. Exact solutions. There are also a few options here, but I'll give one that is a variant of what you suggested. Start out by doing a sortByKey. Then figure out how many records you have in each partition (with mapPartitions). Figure out which partition the 66th percentile would be in. Then just read the one partition you want, and go down to the Nth record in that partition. To read the one partition you want, you can either (a) use mapPartitionsWithIndex and just ignore every partition that isn't the one you want, or (b) use PartitionPruningRDD. PartitionPruningRDD will avoid launching empty tasks on the other partitions, so it will be slightly more efficient, but it's also a developer API, so perhaps not worth going to that level of detail. Note that internally, sortByKey will sample your data to get an approximate distribution, to figure out what data to put in each partition. However, you're still getting an exact answer this way -- the approximation is only important for distributing work among all executors. Even if the approximation is inaccurate, you'll still correct for it; you will just have unequal partitions. Imran On Sun, Feb 15, 2015 at 9:37 AM, SiMaYunRui myl...@hotmail.com wrote: hello, I am a newbie to Spark and trying to figure out how to compute a percentile over a big data set. Actually, I googled this topic but didn't find any very useful code examples or explanations. It seems that I can use the sortByKey transformation to get my data set in order, but I'm not quite sure how I can get the value of, for example, percentile 66. Should I use take() to pick up the value at percentile 66? I don't believe any machine can load my data set in memory; I believe there must be more efficient approaches. Can anyone shed some light on this problem? Regards
Re: OutOfMemory and GC limits (TODO) Error in map after self-join
Why are you joining the rdd with itself? You can try these things: - Change the StorageLevel of both rdds to MEMORY_AND_DISK_2 or MEMORY_AND_DISK_SER, so that it doesn't need to keep everything in memory. - Set your default serializer to Kryo (.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")) - Enable rdd compression (.set("spark.rdd.compress", "true")) Thanks Best Regards On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn twal...@gmail.com wrote: Hi All, I'm a new Spark (and Hadoop) user and I want to find out if the cluster resources I am using are feasible for my use-case. The following is a snippet of code that is causing an OOM exception in the executor after about 125/1000 tasks during the map stage.

    val rdd2 = rdd.join(rdd, numPartitions = 1000)
      .map(fp => ((fp._2._1, fp._2._2), 1))
      .reduceByKey((x, y) => x + y)
    rdd2.count()

Which errors with a stack trace like:

    15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in stage 2.0 (TID 498)
    java.lang.OutOfMemoryError: GC overhead limit exceeded
        at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
        at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
        at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
        at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
        at scala.collection.immutable.List.foreach(List.scala:318)

rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of co-occurring values by key in the dataset, i.e. 'these two numbers occurred with the same key n times'. I intentionally don't want to filter out duplicates and reflections. rdd is about 3.6 million records, which has a size in memory of about 120MB, and results in a 'joined' RDD (before the reduceByKey stage) of around 460 million records, with a size in memory of about 35GB. My cluster setup is as follows. I have 3 nodes, where each node has 2 cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and executors are allowed 1280m each, and the job has 5 executors and 1 driver. Additionally, I have set spark.storage.memoryFraction to 0.06 and spark.shuffle.memoryFraction to 0.65 in the hope that this would mitigate the issue. I've also tried increasing the number of partitions after the join dramatically (up to 15000). Nothing has been effective. Thus, I'm beginning to suspect I don't have enough resources for the job. Does anyone have a feeling about what the resource requirements would be for a use-case like this? I could scale the cluster up if necessary, but would like to avoid it. I'm willing to accept longer computation times if that is an option. Warm Regards, Thomas
Re: spark-core in a servlet
I am not sure if this could be causing the issue, but Spark is compatible with Scala 2.10. Instead of spark-core_2.11 you might want to try spark-core_2.10. On Wed, Feb 18, 2015 at 5:44 AM, Ralph Bergmann | the4thFloor.eu ra...@the4thfloor.eu wrote: Hi, I want to use spark-core inside of an HttpServlet. I use Maven for the build task, but I have a dependency problem :-( I get this error message: ClassCastException: com.sun.jersey.server.impl.container.servlet.JerseyServletContainerInitializer cannot be cast to javax.servlet.ServletContainerInitializer When I add these exclusions it builds, but then there are other classes not found at runtime:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>1.2.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.eclipse.jetty</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

What can I do? Thanks a lot!, Ralph -- Ralph Bergmann iOS and Android app developer www: http://www.the4thFloor.eu mail: ra...@the4thfloor.eu skype: dasralph google+: https://plus.google.com/+RalphBergmann xing: https://www.xing.com/profile/Ralph_Bergmann3 linkedin: https://www.linkedin.com/in/ralphbergmann gulp: https://www.gulp.de/Profil/RalphBergmann.html github: https://github.com/the4thfloor pgp key id: 0x421F9B78 pgp fingerprint: CEE3 7AE9 07BE 98DF CD5A E69C F131 4A8E 421F 9B78 -- *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
RE: Percentile example
Thanks Kohler, that's a very interesting approach. I have never used Spark SQL and I'm not sure whether my cluster is configured well for it, but I will definitely give it a try. From: c.koh...@elsevier.com To: myl...@hotmail.com; user@spark.apache.org Subject: Re: Percentile example Date: Tue, 17 Feb 2015 17:41:53 +0000 The best approach I've found to calculate percentiles in Spark is to leverage Spark SQL. If you use the Hive Query Language support, you can use the UDAFs for percentiles (as of Spark 1.2). Something like this (note: syntax not guaranteed to run, but it should give you the gist of what you need to do):

    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    JavaHiveContext hsc = new JavaHiveContext(sc);
    // Get your data into a SchemaRDD and register the table
    // Query it
    String hql = "SELECT FIELD1, FIELD2, percentile(FIELD3, 0.05) AS ptile5 FROM TABLE-NAME GROUP BY FIELD1, FIELD2";
    JavaSchemaRDD result = hsc.hql(hql);
    List<Row> grp = result.collect();
    for (int z = 2; z < row.length(); z++) {
      // Do something with the results
    }

Curt From: SiMaYunRui myl...@hotmail.com Date: Sunday, February 15, 2015 at 10:37 AM To: user@spark.apache.org user@spark.apache.org Subject: Percentile example hello, I am a newbie to Spark and trying to figure out how to compute a percentile over a big data set. Actually, I googled this topic but didn't find any very useful code examples or explanations. It seems that I can use the sortByKey transformation to get my data set in order, but I'm not quite sure how I can get the value of, for example, percentile 66. Should I use take() to pick up the value at percentile 66? I don't believe any machine can load my data set in memory; I believe there must be more efficient approaches. Can anyone shed some light on this problem? Regards
Re: How to pass parameters to a spark-jobserver Scala class?
Hi Sasi, To pass parameters to spark-jobserver, use curl -d "input.string = a b c a b see", and in the job server class use config.getString("input.string"). You can pass multiple parameters like starttime, endtime, etc. and use config.getString() to get the values. Examples are shown here: https://github.com/spark-jobserver/spark-jobserver Regards, Vasu C
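For completeness, a sketch of a job class that consumes that parameter, modeled on the examples in the spark-jobserver repository; the trait and object names follow the 0.4-era API and may differ in other versions:

    import com.typesafe.config.Config
    import org.apache.spark.SparkContext
    import spark.jobserver.{SparkJob, SparkJobValid, SparkJobValidation}

    object WordCountJob extends SparkJob {
      override def validate(sc: SparkContext, config: Config): SparkJobValidation = SparkJobValid

      override def runJob(sc: SparkContext, config: Config): Any = {
        // "input.string" is the parameter posted with curl -d above.
        val input = config.getString("input.string")
        sc.parallelize(input.split(" ").toSeq).countByValue()
      }
    }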
Re: RangePartitioner in Spark 1.2.1
RangePartitioner does not actually guarantee that all partitions will be equal-sized (that is hard); instead it uses sampling to approximate equal buckets. Thus, it is possible that a bucket is left empty. If you want the specified behavior, you should define your own partitioner. It would look something like this (untested):

    class AlphabetPartitioner extends Partitioner {
      def numPartitions = 26
      def getPartition(key: Any): Int = key match {
        case s: String => s(0).toUpper - 'A'
      }
      override def equals(other: Any): Boolean = other.isInstanceOf[AlphabetPartitioner]
      override def hashCode: Int = 0
    }

On Tue, Feb 17, 2015 at 7:05 PM, java8964 java8...@hotmail.com wrote: Hi, Sparkers: I just happened to be searching Google for something related to Spark's RangePartitioner, and found an old thread on this email list here: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-and-Partition-td991.html I followed the code example mentioned in that email thread as follows:

    scala> import org.apache.spark.RangePartitioner
    import org.apache.spark.RangePartitioner

    scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog", "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long", "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk", "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))
    rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:13

    scala> rdd.keyBy(s => s(0).toUpper)
    res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy at <console>:16

    scala> res0.partitionBy(new RangePartitioner[Char, String](26, res0)).values
    res1: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at <console>:18

    scala> res1.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s))).collect.foreach(println)

The above example is clear for me to understand the meaning of the RangePartitioner, but to my surprise, I got the following result:

    *(0,apple)*
    *(0,Ball)*
    (1,cat)
    (2,dog)
    (3,Elephant)
    (4,fox)
    (5,gas)
    (6,horse)
    (7,index)
    (8,jet)
    (9,kitsch)
    (10,long)
    (11,moon)
    (12,Neptune)
    (13,ooze)
    (14,Pen)
    (15,quiet)
    (16,rose)
    (17,sun)
    (18,talk)
    (19,umbrella)
    (20,voice)
    (21,Walrus)
    (22,xeon)
    (23,Yam)
    (24,zebra)

instead of a perfect range index from 0 to 25 as in the old email thread. Why is that? Is this a bug, or some new feature I don't understand? BTW, the above environment I tested is Spark 1.2.1 with the Hadoop 2.4 binary release. Thanks Yong
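A usage sketch with the word list from the original post; since the partitioner inspects the first character of a String key, the word itself serves as the key:

    val partitioned = rdd.keyBy(s => s)
      .partitionBy(new AlphabetPartitioner)
      .values
    partitioned.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s))).collect.foreach(println)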
Re: Class loading issue, spark.files.userClassPathFirst doesn't seem to be working
Hi Did you try to make Maven pick the latest version? http://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Management That way solrj won't cause any issue. You can try this and check whether the part of your code where you access HDFS works fine. On Wed, Feb 18, 2015 at 10:23 AM, dgoldenberg dgoldenberg...@gmail.com wrote: I'm getting the below error when running spark-submit on my class. This class has a transitive dependency on HttpClient v4.3.1, since I'm calling SolrJ 4.10.3 from within the class. This is in conflict with the older version, HttpClient 3.1, that's a dependency of Hadoop 2.4 (I'm running Spark 1.2.1 built for Hadoop 2.4). I've tried setting spark.files.userClassPathFirst to true in SparkConf in my program, and also setting it to true in $SPARK_HOME/conf/spark-defaults.conf as spark.files.userClassPathFirst true. No go, I'm still getting the error, as below. Is there anything else I can try? Are there any plans in Spark to support multiple class loaders?

    Exception in thread "main" java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry;
        at org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121)
        at org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445)
        at org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206)
        at org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35)
        at org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142)
        at org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118)
        at org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:168)
        at org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:141)
        ...

-- *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
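If dependency management alone can't reconcile the versions (Hadoop genuinely needs the old HttpClient on its classpath), another workaround people use is to relocate the newer HttpClient packages inside the application jar so the two copies can coexist. A sketch with the maven-shade-plugin; the version number and shaded package name here are illustrative, not taken from this thread:

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.3</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals><goal>shade</goal></goals>
          <configuration>
            <relocations>
              <!-- Rewrites org.apache.http.* references in your classes and in SolrJ
                   to a private namespace, so Hadoop's HttpClient 3.1 is untouched. -->
              <relocation>
                <pattern>org.apache.http</pattern>
                <shadedPattern>shaded.org.apache.http</shadedPattern>
              </relocation>
            </relocations>
          </configuration>
        </execution>
      </executions>
    </plugin>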
Re: Identify the performance bottleneck from hardware prospective
It would be good if you can share the piece of code that you are using, so people can suggest how to optimize it further. Also, since you have 20GB of memory and ~30GB of data, you can try doing rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) or .persist(StorageLevel.MEMORY_AND_DISK_2), so it doesn't need to keep everything in memory. ~12GB of memory will be usable by default out of the 20GB; you can increase it by setting spark.storage.memoryFraction. Thanks Best Regards On Tue, Feb 17, 2015 at 4:06 PM, Julaiti Alafate jalaf...@eng.ucsd.edu wrote: Thank you very much for your reply! My task is to count the number of word pairs in a document: if w1 and w2 occur together in one sentence, the number of occurrences of the word pair (w1, w2) increases by 1. So the computational part of this algorithm is simply a two-level for-loop. Since the cluster is monitored by Ganglia, I can easily see that neither CPU nor network IO is under pressure. The only parameter left is memory. In the executor tab of the Spark Web UI, I can see a column named memory used. It shows that only 6GB of 20GB memory is used. I understand this is measuring the size of the RDDs that persist in memory. So can I at least assume the data/objects I use in my program are not exceeding the memory limit? My confusion here is: why can't my program run faster while there is still sufficient memory, CPU time and network bandwidth to utilize? Best regards, Julaiti On Tue, Feb 17, 2015 at 12:53 AM, Akhil Das ak...@sigmoidanalytics.com wrote: What application are you running? Here's a few things: - You will hit a bottleneck on CPU if you are doing some complex computation (like parsing JSON etc.) - You will hit a bottleneck on memory if the data/objects used in your program are large (like playing with HashMaps etc. inside your map* operations); here you can set spark.executor.memory to a higher number and also change spark.storage.memoryFraction, whose default value is 0.6 of your executor memory. - Network will be a bottleneck if data is not available locally on one of the workers and hence it has to collect it from others, which means a lot of serialization and data transfer across your cluster. Thanks Best Regards On Tue, Feb 17, 2015 at 11:20 AM, Julaiti Alafate jalaf...@eng.ucsd.edu wrote: Hi there, I am trying to scale up the data size that my application is handling. This application is running on a cluster with 16 slave nodes. Each slave node has 60GB memory. It is running in standalone mode. The data comes from HDFS, also on the same local network. In order to understand how my program is running, I also have Ganglia installed on the cluster. From a previous run, I know the stage taking the longest time to run is counting word pairs (my RDD consists of sentences from a corpus). My goal is to identify the bottleneck of my application, then modify my program or hardware configuration accordingly. Unfortunately, I didn't find much information on Spark monitoring and optimization topics. Reynold Xin gave a great talk at Spark Summit 2014 on application tuning from a tasks perspective; basically, his focus is on tasks that are oddly slower than the average. However, it didn't solve my problem, because in my case there are no tasks that run way slower than others. So I tried to identify the bottleneck from a hardware perspective. I want to know what the limitation of the cluster is. I think if the executors are running hard, either CPU, memory or network bandwidth (or some combination) should be hitting the roof. But Ganglia reports that the CPU utilization of the cluster is no more than 50%; network utilization is high for several seconds at the beginning, then drops close to 0. From the Spark UI, I can see the node with maximum memory usage is consuming around 6GB, while spark.executor.memory is set to 20GB. I am very confused that the program is not running fast enough while hardware resources are not in shortage. Could you please give me some hints about what decides the performance of a Spark application from a hardware perspective? Thanks! Julaiti
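For reference, the word-pair counting stage Julaiti describes (the two-level for-loop over each sentence) would typically look like the sketch below, assuming sentences is an RDD[Array[String]] of tokenized sentences. The flatMap output is what makes the shuffle so much larger than the input, which is worth keeping in mind when reading the Ganglia numbers:

    // Emit every ordered word pair per sentence, then sum counts across the corpus.
    val pairCounts = sentences.flatMap { words =>
      for (w1 <- words; w2 <- words if w1 != w2) yield ((w1, w2), 1L)
    }.reduceByKey(_ + _)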
Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?
Emre, As you are keeping the properties file external to the JAR you need to make sure to submit the properties file as an additional --files (or whatever the necessary CLI switch is) so all the executors get a copy of the file along with the JAR. If you know you are going to just put the properties file on HDFS then why don't you define a custom system setting like properties.url and pass it along: (this is for Spark shell, the only CLI string I have available at the moment:) spark-shell --jars $JAR_NAME \ --conf 'properties.url=hdfs://config/stuff.properties' \ --conf 'spark.executor.extraJavaOptions=-Dproperties.url=hdfs://config/stuff.properties' ... then load the properties file during initialization by examining the properties.url system setting. I'd still strongly recommend Typesafe Config as it makes this a lot less painful, and I know for certain you can place your *.conf at a URL (using the -Dconfig.url=) though it probably won't work with an HDFS URL. On Tue Feb 17 2015 at 9:53:08 AM Gerard Maas gerard.m...@gmail.com wrote: +1 for TypeSafe config Our practice is to include all spark properties under a 'spark' entry in the config file alongside job-specific configuration: A config file would look like: spark { master = cleaner.ttl = 123456 ... } job { context { src = foo action = barAction } prop1 = val1 } Then, to create our Spark context, we transparently pass the spark section to a SparkConf instance. This idiom will instantiate the context with the spark specific configuration: sparkConfig.setAll(configToStringSeq(config.getConfig(spark).atPath(spark))) And we can make use of the config object everywhere else. We use the override model of the typesafe config: reasonable defaults go in the reference.conf (within the jar). Environment-specific overrides go in the application.conf (alongside the job jar) and hacks are passed with -Dprop=value :-) -kr, Gerard. On Tue, Feb 17, 2015 at 1:45 PM, Emre Sevinc emre.sev...@gmail.com wrote: I've decided to try spark-submit ... --conf spark.driver.extraJavaOptions=-DpropertiesFile=/home/emre/data/myModule.properties But when I try to retrieve the value of propertiesFile via System.err.println(propertiesFile : + System.getProperty(propertiesFile)); I get NULL: propertiesFile : null Interestingly, when I run spark-submit with --verbose, I see that it prints: spark.driver.extraJavaOptions - -DpropertiesFile=/home/emre/data/belga/schemavalidator.properties I couldn't understand why I couldn't get to the value of propertiesFile by using standard System.getProperty method. (I can use new SparkConf().get(spark.driver.extraJavaOptions) and manually parse it, and retrieve the value, but I'd like to know why I cannot retrieve that value using System.getProperty method). Any ideas? If I can achieve what I've described above properly, I plan to pass a properties file that resides on HDFS, so that it will be available to my driver program wherever that program runs. -- Emre On Mon, Feb 16, 2015 at 4:41 PM, Charles Feduke charles.fed...@gmail.com wrote: I haven't actually tried mixing non-Spark settings into the Spark properties. 
Instead I package my properties into the jar and use the Typesafe Config[1] - v1.2.1 - library (along with Ficus[2] - Scala specific) to get at my properties: Properties file: src/main/resources/integration.conf (below $ENV might be set to either integration or prod[3]) ssh -t root@$HOST /root/spark/bin/spark-shell --jars /root/$JAR_NAME \ --conf 'config.resource=$ENV.conf' \ --conf 'spark.executor.extraJavaOptions=-Dconfig.resource=$ENV.conf' Since the properties file is packaged up with the JAR I don't have to worry about sending the file separately to all of the slave nodes. Typesafe Config is written in Java so it will work if you're not using Scala. (The Typesafe Config also has the advantage of being extremely easy to integrate with code that is using Java Properties today.) If you instead want to send the file separately from the JAR and you use the Typesafe Config library, you can specify config.file instead of .resource; though I'd point you to [3] below if you want to make your development life easier. 1. https://github.com/typesafehub/config 2. https://github.com/ceedubs/ficus 3. http://deploymentzone.com/2015/01/27/spark-ec2-and-easy-spark-shell-deployment/ On Mon Feb 16 2015 at 10:27:01 AM Emre Sevinc emre.sev...@gmail.com wrote: Hello, I'm using Spark 1.2.1 and have a module.properties file, and in it I have non-Spark properties, as well as Spark properties, e.g.: job.output.dir=file:///home/emre/data/mymodule/out I'm trying to pass it to spark-submit via: spark-submit --class com.myModule --master local[4] --deploy-mode client --verbose --properties-file /home/emre/data/mymodule.properties mymodule.jar And I
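A rough sketch of Charles's properties.url idea above (Scala; the property name and the HDFS path are just the hypothetical ones from his example, not a Spark convention):

import java.util.Properties
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Read the -Dproperties.url value set via --conf / extraJavaOptions...
val url = System.getProperty("properties.url") // e.g. hdfs://config/stuff.properties
// ...and load the file from HDFS during initialization.
val path = new Path(url)
val fs = FileSystem.get(path.toUri, new Configuration())
val props = new Properties()
val in = fs.open(path)
try props.load(in) finally in.close()
val outputDir = props.getProperty("job.output.dir") // a non-Spark property from the file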
Re: Percentile example
(trying to repost to the list w/out URLs -- rejected as spam earlier) Hi, Using take() is not a good idea; as you have noted, it will pull a lot of data down to the driver, so it's not scalable. Here are some more scalable alternatives: 1. Approximate solutions. 1a. Sample the data. Just sample some of the data to the driver, sort that data in memory, and take the 66th percentile of that sample. 1b. Make a histogram with pre-determined buckets. E.g., if you know your data ranges from 0 to 1 and is uniform-ish, you could make buckets every 0.01. Then count how many data points go into each bucket. Or if you only care about relative error and you have integers (often the case if your data is counts), then you can span the full range of integers with a relatively small number of buckets. E.g., you only need 200 buckets for 5% error. See the Histogram class in Twitter's Ostrich library. The problem is, if you have no idea what the distribution of your data is, it's very hard to come up with good buckets; you could have an arbitrary amount of data going to one bucket, and thus tons of error. 1c. Use a TDigest, a compact scalable data structure for approximating distributions that performs reasonably across a wide range of distributions. You would make one TDigest for each partition (with mapPartitions), and then merge all of the TDigests together. I wrote up a little more detail on this earlier; you can search spark-user on Nabble for tdigest. 2. Exact solutions. There are also a few options here, but I'll give one that is a variant of what you suggested. Start out by doing a sortByKey. Then figure out how many records you have in each partition (with mapPartitions). Figure out which partition the 66th percentile would be in. Then just read the one partition you want, and go down to the Nth record in that partition. To read the one partition you want, you can either (a) use mapPartitionsWithIndex, and just ignore every partition that isn't the one you want, or (b) use PartitionPruningRDD. PartitionPruningRDD will avoid launching empty tasks on the other partitions, so it will be slightly more efficient, but it's also a developer API, so perhaps not worth going to that level of detail. Note that internally, sortByKey will sample your data to get an approximate distribution, to figure out what data to put in each partition. However, you're still getting an exact answer this way -- the approximation is only important for distributing work among all the executors. Even if the approximation is inaccurate, you'll still correct for it; you will just have unequal partitions. Imran On Sun, Feb 15, 2015 at 9:37 AM, SiMaYunRui myl...@hotmail.com wrote: hello, I am a newbie to Spark and trying to figure out how to get a percentile against a big data set. Actually, I googled this topic but did not find any very useful code example or explanation. Seems that I can use the transformation sortByKey to get my data set in order, but I am not sure how I can get the value of, for example, percentile 66. Should I use take() to pick up the value of percentile 66? I don't believe any machine can load my data set in memory. I believe there must be more efficient approaches. Can anyone shed some light on this problem? Regards
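For completeness, a compact sketch of the exact approach in option 2 (Scala; the names are invented, and this assumes the values are already an RDD[Double]):

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

def exactPercentile(data: RDD[Double], p: Double): Double = {
  // 1) global sort
  val sorted = data.map(x => (x, ())).sortByKey().map(_._1)
  sorted.cache()
  // 2) number of records in each partition
  val counts = sorted.mapPartitions(it => Iterator(it.size.toLong), true).collect()
  // 3) global rank of the requested percentile
  val rank = (p * (counts.sum - 1)).toLong
  val cum = counts.scanLeft(0L)(_ + _)      // cum(i) = records before partition i
  val partIdx = cum.indexWhere(_ > rank) - 1 // partition that holds the rank
  val offset = (rank - cum(partIdx)).toInt
  // 4) pull the single record out of the one partition that holds it
  sorted.mapPartitionsWithIndex { (i, it) =>
    if (i == partIdx) it.slice(offset, offset + 1) else Iterator.empty
  }.collect().head
}

// usage: exactPercentile(values, 0.66)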
Processing graphs
Hi, I am working on a Spark application that processes graphs and I am trying to do the following:
- group the vertices (key = vertex, value = set of its outgoing edges)
- distribute each key to separate processes and process them (like a mapper)
- reduce the results back at the main process
Does the groupBy functionality do the distribution by default? Do we have to explicitly use RDDs to enable automatic distribution? It'd be great if you could help me understand these and how to go about with the problem. Thanks.
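A minimal sketch of what that looks like with RDDs (Scala; the edge data and the per-vertex processing are stand-ins):

// (vertex, edge) pairs; parallelize spreads them across partitions automatically
val edges = sc.parallelize(Seq(("a", "a->b"), ("a", "a->c"), ("b", "b->c")))
val grouped = edges.groupByKey()              // RDD[(String, Iterable[String])], one record per vertex
val processed = grouped.mapValues(_.size)     // stand-in for your per-vertex "mapper" logic
val combined = processed.values.reduce(_ + _) // reduces the partial results back at the driver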
Re: Tuning number of partitions per CPU
More tasks means a little *more* total CPU time is required, not less, because of the overhead of handling tasks. However, more tasks can actually mean less wall-clock time. This is because tasks vary in how long they take. If you have 1 task per core, the job takes as long as the slowest task, and at the end all other cores are idling. Splitting up the tasks makes the distribution of time taken across cores more even on average. That is, the variance of task completion times is higher than the variance of the sum of N task completion times, each of which is 1/N the size. Whether this is actually better depends on the per-task overhead and the variance in execution time. With high overhead and low variance, one task per core is probably optimal. On Tue, Feb 17, 2015 at 3:38 PM, Igor Petrov igorpetrov...@gmail.com wrote: Hello, thanks for your replies. The question is actually about the recommendation in the Spark docs: "Typically you want 2-4 partitions for each CPU in your cluster." Why is having several partitions per CPU better than one partition per CPU? How can one CPU handle several tasks faster than one task? Thank You On Fri, Feb 13, 2015 at 2:44 PM, Puneet Kumar Ojha puneet.ku...@pubmatic.com wrote: Use the below configuration if you are using version 1.2: SET spark.shuffle.consolidateFiles=true; SET spark.rdd.compress=true; SET spark.default.parallelism=1000; SET spark.deploy.defaultCores=54; Thanks Puneet. -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Friday, February 13, 2015 4:46 PM To: Igor Petrov Cc: user@spark.apache.org Subject: Re: Tuning number of partitions per CPU 18 cores or 36? It probably doesn't matter. For this case where you have some overhead per partition of setting up the DB connection, it may indeed not help to chop up the data more finely than your total parallelism. Although that would imply quite an overhead. Are you doing any other expensive initialization per partition in your code? You might check some other basic things, like: are you bottlenecked on the DB (probably not), and are there task stragglers drawing out the completion time? On Fri, Feb 13, 2015 at 11:06 AM, Igor Petrov igorpetrov...@gmail.com wrote: Hello, In the Spark programming guide (http://spark.apache.org/docs/1.2.0/programming-guide.html) there is a recommendation: "Typically you want 2-4 partitions for each CPU in your cluster." We have a Spark Master and two Spark Workers, each with 18 cores and 18 GB of RAM. In our application we use JdbcRDD to load data from a DB and then cache it. We load entities from a single table; we now have 76 million entities (entity size in memory is about 160 bytes). We call count() during application startup to force entity loading. Here are our measurements for the count() operation (cores x partitions = time): 36x36 = 6.5 min, 36x72 = 7.7 min, 36x108 = 9.4 min. So despite the recommendation, the most efficient setup is one partition per core. What is the reason for the above recommendation? Java 8, Apache Spark 1.1.0 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Tuning-number-of-partitions-per-CPU-tp21642.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
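To experiment with that ratio, the partition count can be set when the RDD is created or changed afterwards; a sketch (Scala, values hypothetical):

val totalCores = 36
val rdd = sc.textFile("hdfs://host/path", 2 * totalCores) // minPartitions: 2-4x cores per the docs
// or, for an existing RDD (triggers a shuffle):
val finer = someRdd.repartition(2 * totalCores)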
RE: SparkSQL + Tableau Connector
Hi Todd, When I see /data/json appearing in your log, I have a feeling that it is the default hive.metastore.warehouse.dir from hive-site.xml, where the value is /data/. Could you check that property and see if you can point it to the correct Hive table HDFS directory? Another thing to look at is the Hive metastore MySQL database, if you are using MySQL as the DB for the metastore. Date: Wed, 11 Feb 2015 19:53:35 -0500 Subject: Re: SparkSQL + Tableau Connector From: tsind...@gmail.com To: alee...@hotmail.com CC: ar...@sigmoidanalytics.com; user@spark.apache.org First, sorry for the long post. So back to Tableau and Spark SQL; I'm still missing something. TL;DR To get the Spark SQL temp table associated with the metastore, are there additional steps required beyond doing the below? Initial SQL on connection:

create temporary table test using org.apache.spark.sql.json options (path '/data/json/*');
cache table test;

I feel like I'm missing a step of associating the Spark SQL table with the metastore; do I need to actually save it in some fashion? I'm trying to avoid saving to Hive if possible. Details: I configured the hive-site.xml and placed it in $SPARK_HOME/conf. It looks like this; thanks Andrew and Arush for the assistance:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hive.semantic.analyzer.factory.impl</name>
    <value>org.apache.hcatalog.cli.HCatSemanticAnalyzerFactory</value>
  </property>
  <property>
    <name>hive.metastore.sasl.enabled</name>
    <value>false</value>
  </property>
  <property>
    <name>hive.server2.authentication</name>
    <value>NONE</value>
  </property>
  <property>
    <name>hive.server2.enable.doAs</name>
    <value>true</value>
  </property>
  <!--
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://localhost:9083</value>
    <description>IP address (or fully-qualified domain name) and port of the metastore host</description>
  </property>
  -->
  <property>
    <name>hive.warehouse.subdir.inherit.perms</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.metastore.schema.verification</name>
    <value>false</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://localhost:3306/metastore_db?createDatabaseIfNotExist=true</value>
    <description>metadata is stored in a MySQL server</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>MySQL JDBC driver class</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hiveuser</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>hiveuser</value>
  </property>
</configuration>

When I start the server it looks fine:

$ ./sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=10001 --hiveconf hive.server2.thrift.bind.host radtech.io --master spark://radtech.io:7077 --driver-class-path /usr/local/spark/lib/mysql-connector-java-5.1.34-bin.jar
starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to /usr/local/spark-1.2.1-bin-hadoop2.4/logs/spark-tnist-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-radtech.io.out
radtech:spark tnist$ tail -f logs/spark-tnist-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-radtech.io.out
15/02/11 19:15:24 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150211191524-0008/1 on hostPort 192.168.1.2:50851 with 2 cores, 512.0 MB RAM
15/02/11 19:15:24 INFO AppClient$ClientActor: Executor updated: app-20150211191524-0008/0 is now LOADING
15/02/11 19:15:24 INFO AppClient$ClientActor: Executor updated: app-20150211191524-0008/1 is now LOADING
15/02/11 19:15:24 INFO AppClient$ClientActor: Executor updated: app-20150211191524-0008/0 is now RUNNING
15/02/11 19:15:24 INFO AppClient$ClientActor: Executor updated: app-20150211191524-0008/1 is now RUNNING
15/02/11 19:15:24 INFO NettyBlockTransferService: Server created on 50938
15/02/11 19:15:24 INFO BlockManagerMaster: Trying to register BlockManager
15/02/11 19:15:24 INFO BlockManagerMasterActor: Registering block manager 192.168.1.2:50938 with 265.1 MB RAM, BlockManagerId(driver, 192.168.1.2, 50938)
15/02/11 19:15:24 INFO BlockManagerMaster: Registered BlockManager
15/02/11 19:15:25 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
15/02/11 19:15:25 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/02/11 19:15:25 INFO ObjectStore: ObjectStore, initialize called
15/02/11 19:15:26 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
15/02/11 19:15:26 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored
15/02/11 19:15:26 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
15/02/11 19:15:26 WARN Connection: BoneCP
RE: Spark sql failed in yarn-cluster mode when connecting to non-default hive database
Hi All, Just want to give everyone an update on what worked for me. Thanks for Cheng's comment and other people's help. What I had misunderstood was --driver-class-path and how it relates to --files. I put /etc/hive/hive-site.xml in both --files and --driver-class-path when I started in yarn-cluster mode: ./bin/spark-submit --verbose --queue research --driver-java-options -XX:MaxPermSize=8192M --files /etc/hive/hive-site.xml --driver-class-path /etc/hive/hive-site.xml --master yarn --deploy-mode cluster The problem here is that --files only looks for local files to distribute onto HDFS, while --driver-class-path is what gets added to the CLASSPATH at runtime; as you can see, it was trying to look at /etc/hive/hive-site.xml on the container on the remote nodes, which apparently doesn't exist. For some people it may work fine because they deploy the Hive configuration and JARs across the entire cluster, so every node looks the same. But that wasn't my case in a multi-tenant environment or a restricted, secured cluster. So my parameters look like this when I launch it: ./bin/spark-submit --verbose --queue research --driver-java-options -XX:MaxPermSize=8192M --files /etc/hive/hive-site.xml --driver-class-path hive-site.xml --master yarn --deploy-mode cluster Here --driver-class-path will only look at ./hive-site.xml on the remote container, which was pre-deployed already by --files. This worked for me, and I can have the HiveContext API talk to the Hive metastore, and vice versa. Thanks. Date: Thu, 5 Feb 2015 16:59:12 -0800 From: lian.cs@gmail.com To: linlin200...@gmail.com; huaiyin@gmail.com CC: user@spark.apache.org Subject: Re: Spark sql failed in yarn-cluster mode when connecting to non-default hive database Hi Jenny, You may try to use --files $SPARK_HOME/conf/hive-site.xml --driver-class-path hive-site.xml when submitting your application. The problem is that when running in cluster mode, the driver is actually running in a random container directory on a random executor node. By using --files, you upload hive-site.xml to the container directory; by using --driver-class-path hive-site.xml, you add the file to the classpath (the path is relative to the container directory). When running in cluster mode, have you tried to check the tables inside the default database? If my guess is right, this should be an empty default database inside the default Derby metastore created by HiveContext when hive-site.xml is missing. Best, Cheng On 8/12/14 5:38 PM, Jenny Zhao wrote: Hi Yin, hive-site.xml was copied to spark/conf and is the same as the one under $HIVE_HOME/conf. Through the Hive CLI, I don't see any problem. But for Spark in yarn-cluster mode, I am not able to switch to a database other than the default one; in yarn-client mode, it works fine. Thanks! Jenny On Tue, Aug 12, 2014 at 12:53 PM, Yin Huai huaiyin@gmail.com wrote: Hi Jenny, Have you copied hive-site.xml to the spark/conf directory? If not, can you put it in conf/ and try again? Thanks, Yin On Mon, Aug 11, 2014 at 8:57 PM, Jenny Zhao linlin200...@gmail.com wrote: Thanks Yin! Here is my hive-site.xml, which I copied from $HIVE_HOME/conf; I didn't experience a problem connecting to the metastore through Hive, which uses DB2 as the metastore database. <?xml version="1.0"?> <?xml-stylesheet type="text/xsl"
Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?
+1 for TypeSafe config Our practice is to include all spark properties under a 'spark' entry in the config file alongside job-specific configuration: A config file would look like: spark { master = cleaner.ttl = 123456 ... } job { context { src = foo action = barAction } prop1 = val1 } Then, to create our Spark context, we transparently pass the spark section to a SparkConf instance. This idiom will instantiate the context with the spark specific configuration: sparkConfig.setAll(configToStringSeq(config.getConfig(spark).atPath(spark))) And we can make use of the config object everywhere else. We use the override model of the typesafe config: reasonable defaults go in the reference.conf (within the jar). Environment-specific overrides go in the application.conf (alongside the job jar) and hacks are passed with -Dprop=value :-) -kr, Gerard. On Tue, Feb 17, 2015 at 1:45 PM, Emre Sevinc emre.sev...@gmail.com wrote: I've decided to try spark-submit ... --conf spark.driver.extraJavaOptions=-DpropertiesFile=/home/emre/data/myModule.properties But when I try to retrieve the value of propertiesFile via System.err.println(propertiesFile : + System.getProperty(propertiesFile)); I get NULL: propertiesFile : null Interestingly, when I run spark-submit with --verbose, I see that it prints: spark.driver.extraJavaOptions - -DpropertiesFile=/home/emre/data/belga/schemavalidator.properties I couldn't understand why I couldn't get to the value of propertiesFile by using standard System.getProperty method. (I can use new SparkConf().get(spark.driver.extraJavaOptions) and manually parse it, and retrieve the value, but I'd like to know why I cannot retrieve that value using System.getProperty method). Any ideas? If I can achieve what I've described above properly, I plan to pass a properties file that resides on HDFS, so that it will be available to my driver program wherever that program runs. -- Emre On Mon, Feb 16, 2015 at 4:41 PM, Charles Feduke charles.fed...@gmail.com wrote: I haven't actually tried mixing non-Spark settings into the Spark properties. Instead I package my properties into the jar and use the Typesafe Config[1] - v1.2.1 - library (along with Ficus[2] - Scala specific) to get at my properties: Properties file: src/main/resources/integration.conf (below $ENV might be set to either integration or prod[3]) ssh -t root@$HOST /root/spark/bin/spark-shell --jars /root/$JAR_NAME \ --conf 'config.resource=$ENV.conf' \ --conf 'spark.executor.extraJavaOptions=-Dconfig.resource=$ENV.conf' Since the properties file is packaged up with the JAR I don't have to worry about sending the file separately to all of the slave nodes. Typesafe Config is written in Java so it will work if you're not using Scala. (The Typesafe Config also has the advantage of being extremely easy to integrate with code that is using Java Properties today.) If you instead want to send the file separately from the JAR and you use the Typesafe Config library, you can specify config.file instead of .resource; though I'd point you to [3] below if you want to make your development life easier. 1. https://github.com/typesafehub/config 2. https://github.com/ceedubs/ficus 3. 
http://deploymentzone.com/2015/01/27/spark-ec2-and-easy-spark-shell-deployment/ On Mon Feb 16 2015 at 10:27:01 AM Emre Sevinc emre.sev...@gmail.com wrote: Hello, I'm using Spark 1.2.1 and have a module.properties file, and in it I have non-Spark properties, as well as Spark properties, e.g.: job.output.dir=file:///home/emre/data/mymodule/out I'm trying to pass it to spark-submit via: spark-submit --class com.myModule --master local[4] --deploy-mode client --verbose --properties-file /home/emre/data/mymodule.properties mymodule.jar And I thought I could read the value of my non-Spark property, namely, job.output.dir by using: SparkConf sparkConf = new SparkConf(); final String validatedJSONoutputDir = sparkConf.get(job.output.dir); But it gives me an exception: Exception in thread main java.util.NoSuchElementException: job.output.dir Is it not possible to mix Spark and non-Spark properties in a single .properties file, then pass it via --properties-file and then get the values of those non-Spark properties via SparkConf? Or is there another object / method to retrieve the values for those non-Spark properties? -- Emre Sevinç -- Emre Sevinc
Re: Implementing FIRST_VALUE, LEAD, LAG in Spark
I ended up with the following:

def firstValue(items: Iterable[String]) =
  for { i <- items } yield (i, items.head)

data.groupByKey().map { case (a, b) => (a, firstValue(b)) }.collect

More details: http://dmtolpeko.com/2015/02/17/first_value-last_value-lead-and-lag-in-spark/ I would appreciate any feedback. Dmitry On Fri, Feb 13, 2015 at 11:54 AM, Dmitry Tolpeko dmtolp...@gmail.com wrote: Hello, To convert existing MapReduce jobs to Spark, I need to implement window functions such as FIRST_VALUE, LEAD, LAG and so on. For example, the FIRST_VALUE function: Source (1st column is the key):

A, A1
A, A2
A, A3
B, B1
B, B2
C, C1

and the result should be:

A, A1, A1
A, A2, A1
A, A3, A1
B, B1, B1
B, B2, B1
C, C1, C1

You can see that the first value in a group is repeated in each row. My current Spark/Scala code:

def firstValue(b: Iterable[String]): List[(String, String)] = {
  val c = scala.collection.mutable.MutableList[(String, String)]()
  var f = ""
  b.foreach(d => { if (f.isEmpty()) f = d; c += d -> f })
  c.toList
}

val data = sc.parallelize(List(
  ("A", "A1"), ("A", "A2"), ("A", "A3"),
  ("B", "B1"), ("B", "B2"), ("C", "C1")))

data.groupByKey().map { case (a, b) => (a, firstValue(b)) }.collect

So I create a new list after groupByKey. Is this the right approach to do this in Spark? Are there any other options? Please point me to any drawbacks. Thanks, Dmitry
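For what it's worth, one way to avoid materializing whole groups with groupByKey (a sketch, not from the thread; zipWithIndex pins down what "first" means, since groupByKey makes no ordering promise):

// pair each value with its global position
val indexed = data.zipWithIndex().map { case ((k, v), i) => (k, (v, i)) }
// per key, keep the value with the smallest index -- the FIRST_VALUE
val firsts = indexed.reduceByKey((a, b) => if (a._2 < b._2) a else b).mapValues(_._1)
// attach it to every row of the group
val result = indexed.join(firsts).map { case (k, ((v, _), first)) => (k, v, first) }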
Large Similarity Job failing
Hi, I am running brute-force similarity from RowMatrix on a job with a 5M x 1.5M sparse matrix with 800M entries. With 200M entries the job runs fine, but with 800M I am getting exceptions like "too many files open" and "no space left on device"... Seems like I need more nodes, or should I use DIMSUM sampling? I am running on 10 nodes where the ulimit on each node is set at 65K... Memory is not an issue, since I can cache the dataset before the similarity computation starts. I tested the same job on YARN with Spark 1.1 and Spark 1.2 stable. Both jobs failed with FetchFailed msgs. Thanks. Deb
Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?
Hi Emre, there shouldn't be any difference in which files get processed w/ print() vs. foreachRDD(). In fact, if you look at the definition of print(), it is just calling foreachRDD() underneath. So there is something else going on here. We need a little more information to figure out exactly what is going on. (I think Sean was getting at the same thing ...) (a) how do you know that when you use foreachRDD, all 20 files get processed? (b) How do you know that only 16 files get processed when you print()? Do you know the other files are being skipped, or maybe they are just stuck somewhere? eg., suppose you start w/ 20 files, and you see 16 get processed ... what happens after you add a few more files to the directory? Are they processed immediately, or are they never processed either? (c) Can you share any more code of what you are doing to the dstreams *before* the print() / foreachRDD()? That might give us more details about what the difference is. I can't see how .count.println() would be different than just println(), but maybe I am missing something also. Imran On Mon, Feb 16, 2015 at 7:49 AM, Emre Sevinc emre.sev...@gmail.com wrote: Sean, In this case, I've been testing the code on my local machine and using Spark locally, so I all the log output was available on my terminal. And I've used the .print() method to have an output operation, just to force Spark execute. And I was not using foreachRDD, I was only using print() method on a JavaDStream object, and it was working fine for a few files, up to 16 (and without print() it did not do anything because there were no output operations). To sum it up, in my case: - Initially, use .print() and no foreachRDD: processes up to 16 files and does not do anything for the remaining 4. - Remove .print() and use foreachRDD: processes all of the 20 files. Maybe, as in Akhil Das's suggestion, using .count.print() might also have fixed my problem, but I'm satisfied with foreachRDD approach for now. (Though it is still a mystery to me why using .print() had a difference, maybe my mental model of Spark is wrong, I thought no matter what output operation I used, the number of files processed by Spark would be independent of that because the processing is done in a different method, .print() is only used to force Spark execute that processing, am I wrong?). -- Emre On Mon, Feb 16, 2015 at 2:26 PM, Sean Owen so...@cloudera.com wrote: Materialization shouldn't be relevant. The collect by itself doesn't let you detect whether it happened. Print should print some results to the console but on different machines, so may not be a reliable way to see what happened. Yes I understand your real process uses foreachRDD and that's what you should use. It sounds like that works. But you must always have been using that right? What do you mean that you changed to use it? Basically I'm not clear on what the real code does and what about the output of that code tells you only 16 files were processed. On Feb 16, 2015 1:18 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello Sean, I did not understand your question very well, but what I do is checking the output directory (and I have various logger outputs at various stages showing the contents of an input file being processed, the response from the web service, etc.). By the way, I've already solved my problem by using foreachRDD instead of print (see my second message in this thread). Apparently forcing Spark to materialize DAG via print() is not the way to go. 
(My interpretation might be wrong, but this is what I've just seen in my case). -- Emre On Mon, Feb 16, 2015 at 2:11 PM, Sean Owen so...@cloudera.com wrote: How are you deciding whether files are processed or not? It doesn't seem possible from this code. Maybe it just seems so. On Feb 16, 2015 12:51 PM, Emre Sevinc emre.sev...@gmail.com wrote: I've managed to solve this, but I still don't know exactly why my solution works: In my code I was trying to force Spark to output via:

jsonIn.print();

jsonIn being a JavaDStream<String>. When I removed the code above and added the code below to force the output operation, and hence the execution:

jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
  @Override
  public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
    stringJavaRDD.collect();
    return null;
  }
});

it works as I expect, processing all of the 20 files I give to it, instead of stopping at 16. -- Emre On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I have an application in Java that uses Spark Streaming 1.2.1 in the following manner: - Listen to the input directory. - If a new file is copied to that input directory, process it. - Process: contact a RESTful web service (running also locally and responsive), send the contents of the file, receive the response from the web
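As an aside, a cheaper way to force materialization than collect()-ing everything back to the driver (a Scala sketch; the thread's code is Java, but the same foreachRDD exists there):

jsonIn.foreachRDD { rdd =>
  // runs the full DAG but only ships one Long back to the driver
  println("processed " + rdd.count() + " records")
}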
MapValues and Shuffle Reads
In the following code, I read in a large sequence file from S3 (1TB) spread across 1024 partitions. When I look at the job/stage summary, I see about 400GB of shuffle writes, which seems to make sense as I'm doing a hash partition on this file.

// Get the baseline input file
JavaPairRDD<Text, Text> hsfBaselinePairRDDReadable = sc.hadoopFile(baselineInputBucketFile, SequenceFileInputFormat.class, Text.class, Text.class);
JavaPairRDD<String, String> hsfBaselinePairRDD = hsfBaselinePairRDDReadable.mapToPair(new ConvertFromWritableTypes()).partitionBy(new HashPartitioner(Variables.NUM_OUTPUT_PARTITIONS)).persist(StorageLevel.MEMORY_AND_DISK_SER());

I then execute the following code (with a count to force execution) and what I find very strange is that when I look at the job/stage summary, I see more than 340GB of shuffle read. Why would there be any shuffle read in this step? I would expect there to be little (if any) shuffle read in this step.

// Use 'substring' to extract the epoch value from each record.
JavaPairRDD<String, Long> baselinePairRDD = hsfBaselinePairRDD.mapValues(new ExtractEpoch(accumBadBaselineRecords)).persist(StorageLevel.MEMORY_AND_DISK_SER());
log.info("Number of baseline records: " + baselinePairRDD.count());

Both hsfBaselinePairRDD and baselinePairRDD have 1024 partitions. Any insights would be appreciated. I'm using Spark 1.2.0 in a stand-alone cluster. Darin.
Re: OOM error
Thanks for the pointer it led me to http://spark.apache.org/docs/1.2.0/tuning.html increasing parallelism resolved the issue. On Mon, Feb 16, 2015 at 11:57 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Increase your executor memory, Also you can play around with increasing the number of partitions/parallelism etc. Thanks Best Regards On Tue, Feb 17, 2015 at 3:39 AM, Harshvardhan Chauhan ha...@gumgum.com wrote: Hi All, I need some help with Out Of Memory errors in my application. I am using Spark 1.1.0 and my application is using Java API. I am running my app on EC2 25 m3.xlarge (4 Cores 15GB Memory) instances. The app only fails sometimes. Lots of mapToPair tasks a failing. My app is configured to run 120 executors and executor memory is 2G. These are various errors i see the in my logs. 15/02/16 10:53:48 INFO storage.MemoryStore: Block broadcast_1 of size 4680 dropped from memory (free 257277829) 15/02/16 10:53:49 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0x6e0138a3, /10.61.192.194:35196 = /10.164.164.228:49445] EXCEPTION: java.lang.OutOfMemoryError: Java heap space) java.lang.OutOfMemoryError: Java heap space at java.nio.HeapByteBuffer.init(HeapByteBuffer.java:57) at java.nio.ByteBuffer.allocate(ByteBuffer.java:331) at org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649) at org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530) at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77) at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:194) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:152) at org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/02/16 10:53:49 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0x2d0c1db1, /10.169.226.254:55790 = /10.164.164.228:49445] EXCEPTION: java.lang.OutOfMemoryError: Java heap space) java.lang.OutOfMemoryError: Java heap space at java.nio.HeapByteBuffer.init(HeapByteBuffer.java:57) at java.nio.ByteBuffer.allocate(ByteBuffer.java:331) at org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649) at org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530) at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77) at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:194) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:152) at 
org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/02/16 10:53:50 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0xd4211985, /10.181.125.52:60959 = /10.164.164.228:49445] EXCEPTION: java.lang.OutOfMemoryError: Java heap space) java.lang.OutOfMemoryError: Java heap space at java.nio.HeapByteBuffer.init(HeapByteBuffer.java:57) at java.nio.ByteBuffer.allocate(ByteBuffer.java:331) at
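For reference, this is the kind of change the tuning guide points at (a sketch; the number is hypothetical and should track 2-4 partitions per core across the cluster, e.g. 25 nodes x 4 cores x 2 here):

val conf = new SparkConf().set("spark.default.parallelism", "200")
// or raise it for one hot spot only:
val spread = rdd.repartition(200)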
Re: MapValues and Shuffle Reads
Hi Darin, When you say you see 400GB of shuffle writes from the first code snippet, what do you mean? There is no action in that first set, so it won't do anything. By itself, it won't do any shuffle writing, or anything else for that matter. Most likely, the .count() on your second code snippet is actually causing the execution of some of the first snippet as well. The .partitionBy will result in both shuffle writes and shuffle reads, but they aren't set in motion until the .count further down the line. Its confusing b/c the stage boundaries don't line up exactly with your RDD variables here. hsfBaselinePairRDD spans 2 stages, and baselinePairRDD actually gets merged into the stage above it. If you do a hsfBaselinePairRDD.count after your first code snippet, and then run the second code snippet afterwards, is it more like what you expect? Imran On Tue, Feb 17, 2015 at 1:52 PM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote: In the following code, I read in a large sequence file from S3 (1TB) spread across 1024 partitions. When I look at the job/stage summary, I see about 400GB of shuffle writes which seems to make sense as I'm doing a hash partition on this file. // Get the baseline input file JavaPairRDDText,Text hsfBaselinePairRDDReadable = sc.hadoopFile(baselineInputBucketFile, SequenceFileInputFormat.class, Text.class, Text.class); JavaPairRDDString, String hsfBaselinePairRDD = hsfBaselinePairRDDReadable.mapToPair(new ConvertFromWritableTypes()).partitionBy(new HashPartitioner(Variables.NUM_OUTPUT_PARTITIONS)).persist(StorageLevel.MEMORY_AND_DISK_SER()); I then execute the following code (with a count to force execution) and what I find very strange is that when I look at the job/stage summary, I see more than 340GB of shuffle read. Why would there be any shuffle read in this step? I would expect there to be little (if any) shuffle reads in this step. // Use 'substring' to extract the epoch value from each record. JavaPairRDDString, Long baselinePairRDD = hsfBaselinePairRDD.mapValues(new ExtractEpoch(accumBadBaselineRecords)).persist(StorageLevel.MEMORY_AND_DISK_SER()); log.info(Number of baseline records: + baselinePairRDD.count()); Both hsfBaselinePairRDD and baselinePairRDD have 1024 partitions. Any insights would be appreciated. I'm using Spark 1.2.0 in a stand-alone cluster. Darin. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
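Spelled out, Imran's suggestion amounts to the following (a Scala sketch using Darin's variable names; extractEpoch stands in for the ExtractEpoch function):

// Job 1: materializes the partitionBy, so its shuffle read/write shows up here
hsfBaselinePairRDD.count()

// Job 2: with the partitioned RDD already persisted, this stage should now
// show little or no shuffle read of its own
val baselinePairRDD = hsfBaselinePairRDD
  .mapValues(extractEpoch)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)
println("Number of baseline records: " + baselinePairRDD.count())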
Re: Percentile example
The best approach I've found to calculate percentiles in Spark is to leverage SparkSQL. If you use the Hive Query Language support, you can use the UDAFs for percentiles (as of Spark 1.2). Something like this (note: syntax not guaranteed to run, but it should give you the gist of what you need to do):

JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaHiveContext hsc = new JavaHiveContext(sc);
// Get your data into a SchemaRDD and register the table
// Query it
String hql = "SELECT FIELD1, FIELD2, percentile(FIELD3, 0.05) AS ptile5 FROM TABLE_NAME GROUP BY FIELD1, FIELD2";
JavaSchemaRDD result = hsc.hql(hql);
List<Row> grp = result.collect();
for (int z = 2; z < row.length(); z++) {
  // Do something with the results
}

Curt From: SiMaYunRui myl...@hotmail.com Date: Sunday, February 15, 2015 at 10:37 AM To: user@spark.apache.org Subject: Percentile example hello, I am a newbie to Spark and trying to figure out how to get a percentile against a big data set. Actually, I googled this topic but did not find any very useful code example or explanation. Seems that I can use the transformation sortByKey to get my data set in order, but I am not sure how I can get the value of, for example, percentile 66. Should I use take() to pick up the value of percentile 66? I don't believe any machine can load my data set in memory. I believe there must be more efficient approaches. Can anyone shed some light on this problem? Regards
ERROR actor.OneForOneStrategy: org.apache.hadoop.conf.Configuration
Hi, I am running a streaming word count program on a Spark standalone-mode cluster with four machines.

public final class JavaKafkaStreamingWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");
  static transient Configuration conf;

  private JavaKafkaStreamingWordCount() {
  }

  public static void main(String[] args) {
    if (args.length < 4) {
      System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
      System.exit(1);
    }
    StreamingExamples.setStreamingLogLevels();
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1));
    jssc.checkpoint("hdfs://172.17.199.229:8020/spark/wordcountKafkaCheckpoint");
    int numThreads = Integer.parseInt(args[3]);
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    String[] topics = args[2].split(",");
    for (String topic : topics) {
      topicMap.put(topic, numThreads);
    }
    JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });
    JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
      @Override
      public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
      }
    });
    Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
      @Override
      public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
        Integer newSum = 0;
        if (state.isPresent()) {
          if (values.size() != 0) {
            newSum = state.get();
            for (int temp : values) {
              newSum += temp;
            }
          } else {
            newSum = state.get();
          }
        } else {
          if (values.size() != 0) {
            for (int temp : values) {
              newSum += 1;
            }
          }
        }
        return Optional.of(newSum);
      }
    };
    JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);
    conf = new Configuration();
    runningCounts.saveAsNewAPIHadoopFiles("hdfs://172.17.199.229:8020/spark/wordCountOutput/word", "stream", Text.class, Text.class, (Class<? extends org.apache.hadoop.mapreduce.OutputFormat<?, ?>>) TextOutputFormat.class, conf);
    //jssc.sparkContext().hadoopConfiguration();
    jssc.start();
    jssc.awaitTermination();
  }
}

This is working fine in a one-node cluster, but it gives the following error when I try to run the same in the cluster:

15/02/17 12:57:10 ERROR actor.OneForOneStrategy: org.apache.hadoop.conf.Configuration
java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1180)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
    at
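The usual cause of this error is that the driver-side Configuration field gets pulled into the checkpointed DStream graph, and Configuration is not serializable. One common workaround (a Scala sketch, not from the thread) is to keep the Configuration out of the graph by creating it per batch inside foreachRDD and saving each RDD directly:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

runningCounts.foreachRDD { (rdd, time) =>
  val conf = new Configuration() // fresh per batch; never serialized with the graph
  rdd.map { case (word, count) => (new Text(word), new IntWritable(count)) }
     .saveAsNewAPIHadoopFile(
       "hdfs://172.17.199.229:8020/spark/wordCountOutput/word-" + time.milliseconds,
       classOf[Text], classOf[IntWritable],
       classOf[TextOutputFormat[Text, IntWritable]], conf)
}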
Re: Identify the performance bottleneck from hardware prospective
The raw data is ~30 GB. It consists of 250 millions sentences. The total length of the documents (i.e. the sum of the length of all sentences) is 11 billions. I also ran a simple algorithm to roughly count the maximum number of word pairs by summing up d * (d - 1) over all sentences, where d is the length of the sentence. It is about 63 billions. Thanks, Julaiti On Tue, Feb 17, 2015 at 2:44 AM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: Hi How big is your dataset? Thanks Arush On Tue, Feb 17, 2015 at 4:06 PM, Julaiti Alafate jalaf...@eng.ucsd.edu wrote: Thank you very much for your reply! My task is to count the number of word pairs in a document. If w1 and w2 occur together in one sentence, the number of occurrence of word pair (w1, w2) adds 1. So the computational part of this algorithm is simply a two-level for-loop. Since the cluster is monitored by Ganglia, I can easily see that neither CPU or network IO is under pressure. The only parameter left is memory. In the executor tab of Spark Web UI, I can see a column named memory used. It showed that only 6GB of 20GB memory is used. I understand this is measuring the size of RDD that persist in memory. So can I at least assume the data/object I used in my program is not exceeding memory limit? My confusion here is, why can't my program run faster while there is still efficient memory, CPU time and network bandwidth it can utilize? Best regards, Julaiti On Tue, Feb 17, 2015 at 12:53 AM, Akhil Das ak...@sigmoidanalytics.com wrote: What application are you running? Here's a few things: - You will hit bottleneck on CPU if you are doing some complex computation (like parsing a json etc.) - You will hit bottleneck on Memory if your data/objects used in the program is large (like defining playing with HashMaps etc inside your map* operations), Here you can set spark.executor.memory to a higher number and also you can change the spark.storage.memoryFraction whose default value is 0.6 of your executor memory. - Network will be a bottleneck if data is not available locally on one of the worker and hence it has to collect it from others, which is a lot of Serialization and data transfer across your cluster. Thanks Best Regards On Tue, Feb 17, 2015 at 11:20 AM, Julaiti Alafate jalaf...@eng.ucsd.edu wrote: Hi there, I am trying to scale up the data size that my application is handling. This application is running on a cluster with 16 slave nodes. Each slave node has 60GB memory. It is running in standalone mode. The data is coming from HDFS that also in same local network. In order to have an understanding on how my program is running, I also had a Ganglia installed on the cluster. From previous run, I know the stage that taking longest time to run is counting word pairs (my RDD consists of sentences from a corpus). My goal is to identify the bottleneck of my application, then modify my program or hardware configurations according to that. Unfortunately, I didn't find too much information on Spark monitoring and optimization topics. Reynold Xin gave a great talk on Spark Summit 2014 for application tuning from tasks perspective. Basically, his focus is on tasks that oddly slower than the average. However, it didn't solve my problem because there is no such tasks that run way slow than others in my case. So I tried to identify the bottleneck from hardware prospective. I want to know what the limitation of the cluster is. 
I think if the executors are running hard, either CPU, memory, or network bandwidth (or some combination) should be hitting the roof. But Ganglia reports that the CPU utilization of the cluster is no more than 50%, and that network utilization is high for several seconds at the beginning, then drops close to 0. From the Spark UI, I can see the node with maximum memory usage is consuming around 6GB, while spark.executor.memory is set to 20GB. I am very confused that the program is not running fast enough, while hardware resources are not in shortage. Could you please give me some hints about what decides the performance of a Spark application from a hardware perspective? Thanks! Julaiti -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Hive, Spark, Cassandra, Tableau, BI, etc.
Hi, I've seen a few articles where they use CqlStorageHandler to create Hive tables referencing Cassandra data using the thriftserver. Is there a secret to getting this to work? I've basically got Spark built with Hive, and a Cassandra cluster. Is there a way to get the Hive server to talk to Cassandra? I've seen Calliope from Tuplejump, and it looks good, but I was wondering if there's a more direct approach. The version updates of various things mean that if I could go direct, even with a bit of hassle, I'd prefer that. I've also managed to connect the Simba ODBC driver for Spark. Is there a way to get ODBC clients to use that to get to Cassandra data? Would I need to build a Spark application and pull in Cassandra data (or define RDDs) for ODBC clients to make use of it? I can't really use the Simba ODBC connector for Cassandra as it's missing a few things currently (like understanding UDTs + collection columns). What I'm really after is somehow giving Tableau + Excel users access to Cassandra data. I'd love to be able to use DataStax Enterprise, but as of now, the client is firmly in Apache Cassandra + Apache Spark territory. Thanks, Ashic.
Re: MapValues and Shuffle Reads
Thanks Imran. I think you are probably correct. I was a bit surprised that there was no shuffle read in the initial hash partition step. I will adjust the code as you suggest to prove that is the case. I have a slightly different question. If I save an RDD to S3 (or some equivalent) and this RDD was hash partitioned at the time, do I still need to hash partition the RDD again when I read it in? Is there a way that I could prevent all of the shuffling (such as providing a hint)? My parts for the RDD will be gzipped so they would not be splittable). In reality, that's what I would really want to do in the first place. Thanks again for your insights. Darin. From: Imran Rashid iras...@cloudera.com To: Darin McBeath ddmcbe...@yahoo.com Cc: User user@spark.apache.org Sent: Tuesday, February 17, 2015 3:29 PM Subject: Re: MapValues and Shuffle Reads Hi Darin, When you say you see 400GB of shuffle writes from the first code snippet, what do you mean? There is no action in that first set, so it won't do anything. By itself, it won't do any shuffle writing, or anything else for that matter. Most likely, the .count() on your second code snippet is actually causing the execution of some of the first snippet as well. The .partitionBy will result in both shuffle writes and shuffle reads, but they aren't set in motion until the .count further down the line. Its confusing b/c the stage boundaries don't line up exactly with your RDD variables here. hsfBaselinePairRDD spans 2 stages, and baselinePairRDD actually gets merged into the stage above it. If you do a hsfBaselinePairRDD.count after your first code snippet, and then run the second code snippet afterwards, is it more like what you expect? Imran On Tue, Feb 17, 2015 at 1:52 PM, Darin McBeath ddmcbe...@yahoo.com.invalid wrote: In the following code, I read in a large sequence file from S3 (1TB) spread across 1024 partitions. When I look at the job/stage summary, I see about 400GB of shuffle writes which seems to make sense as I'm doing a hash partition on this file. // Get the baseline input file JavaPairRDDText,Text hsfBaselinePairRDDReadable = sc.hadoopFile(baselineInputBucketFile, SequenceFileInputFormat.class, Text.class, Text.class); JavaPairRDDString, String hsfBaselinePairRDD = hsfBaselinePairRDDReadable.mapToPair(new ConvertFromWritableTypes()).partitionBy(new HashPartitioner(Variables.NUM_OUTPUT_PARTITIONS)).persist(StorageLevel.MEMORY_AND_DISK_SER()); I then execute the following code (with a count to force execution) and what I find very strange is that when I look at the job/stage summary, I see more than 340GB of shuffle read. Why would there be any shuffle read in this step? I would expect there to be little (if any) shuffle reads in this step. // Use 'substring' to extract the epoch value from each record. JavaPairRDDString, Long baselinePairRDD = hsfBaselinePairRDD.mapValues(new ExtractEpoch(accumBadBaselineRecords)).persist(StorageLevel.MEMORY_AND_DISK_SER()); log.info(Number of baseline records: + baselinePairRDD.count()); Both hsfBaselinePairRDD and baselinePairRDD have 1024 partitions. Any insights would be appreciated. I'm using Spark 1.2.0 in a stand-alone cluster. Darin. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Stepsize with Linear Regression
The best step size depends on the condition number of the problem. You can try some conditioning heuristics first, e.g., normalizing the columns, and then try a common step size like 0.01. We should implement line search for linear regression in the future, as in LogisticRegressionWithLBFGS. Line search can determine the step size automatically for you. -Xiangrui On Tue, Feb 10, 2015 at 8:56 AM, Rishi Yadav ri...@infoobjects.com wrote: Are there any thumbrules how to set stepsize with gradient descent. I am using it for Linear Regression but I am sure it applies in general to gradient descent. I am at present deriving a number which fits closest to training data set response variable values. I am sure there is a better way to do it. Thanks and Regards, Rishi @meditativesoul - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
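A sketch of the conditioning + step-size flow in MLlib (Scala; data is a hypothetical RDD[LabeledPoint]):

import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}

// normalize the columns to unit variance to improve conditioning
val scaler = new StandardScaler(withMean = false, withStd = true).fit(data.map(_.features))
val scaled = data.map(p => LabeledPoint(p.label, scaler.transform(p.features))).cache()
// then a common step size like 0.01 is a reasonable starting point
val model = LinearRegressionWithSGD.train(scaled, 100, 0.01) // numIterations, stepSize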
Re: feeding DataFrames into predictive algorithms
Hey Sandy, The work should be done by a VectorAssembler, which combines multiple columns (double/int/vector) into a vector column, which becomes the features column for regression. We are going to create JIRAs for each of these standard feature transformers. It would be great if you can help implement some of them. Best, Xiangrui On Wed, Feb 11, 2015 at 7:55 PM, Patrick Wendell pwend...@gmail.com wrote: I think there is a minor error here in that the first example needs a tail after the seq:

df.map { row =>
  (row.getDouble(0), row.toSeq.tail.map(_.asInstanceOf[Double]))
}.toDataFrame("label", "features")

On Wed, Feb 11, 2015 at 7:46 PM, Michael Armbrust mich...@databricks.com wrote: It sounds like you probably want to do a standard Spark map that results in a tuple with the structure you are looking for. You can then just assign names to turn it back into a dataframe. Assuming the first column is your label and the rest are features, you can do something like this:

val df = sc.parallelize(
  (1.0, 2.3, 2.4) ::
  (1.2, 3.4, 1.2) ::
  (1.2, 2.3, 1.2) :: Nil).toDataFrame("a", "b", "c")

df.map { row =>
  (row.getDouble(0), row.toSeq.map(_.asInstanceOf[Double]))
}.toDataFrame("label", "features")
// df: org.apache.spark.sql.DataFrame = [label: double, features: array<double>]

If you'd prefer to stick closer to SQL you can define a UDF:

val createArray = udf((a: Double, b: Double) => Seq(a, b))
df.select('a as 'label, createArray('b, 'c) as 'features)
// df: org.apache.spark.sql.DataFrame = [label: double, features: array<double>]

We'll add createArray as a first-class member of the DSL. Michael On Wed, Feb 11, 2015 at 6:37 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hey All, I've been playing around with the new DataFrame and ML pipelines APIs and am having trouble accomplishing what seems like it should be a fairly basic task. I have a DataFrame where each column is a Double. I'd like to turn this into a DataFrame with a features column and a label column that I can feed into a regression. So far all the paths I've gone down have led me to internal APIs or convoluted casting in and out of RDD[Row] and DataFrame. Is there a simple way of accomplishing this? any assistance (lookin' at you Xiangrui) much appreciated, Sandy
Re: MLib usage on Spark Streaming
JavaDStream.foreachRDD (https://spark.apache.org/docs/1.2.1/api/java/org/apache/spark/streaming/api/java/JavaDStreamLike.html#foreachRDD(org.apache.spark.api.java.function.Function)) and Statistics.corr (https://spark.apache.org/docs/1.2.1/api/java/org/apache/spark/mllib/stat/Statistics.html#corr(org.apache.spark.rdd.RDD)) should be good starting points. -Xiangrui On Mon, Feb 16, 2015 at 6:39 AM, Spico Florin spicoflo...@gmail.com wrote: Hello! I'm newbie to Spark and I have the following case study: 1. Client sending at 100ms the following data: {uniqueId, timestamp, measure1, measure2 } 2. Each 30 seconds I would like to correlate the data collected in the window, with some predefined double vector pattern for each given key. The predefined pattern has 300 records. The data should be also sorted by timestamp. 3. When the correlation is greater than a predefined threshold (e.g 0.9) I would like to emit an new message containing {uniqueId, doubleCorrelationValue} 4. For the correlation I would like to use MLlib 5. As a programming language I would like to muse Java 7. Can you please give me some suggestions on how to create the skeleton for the above scenario? Thanks. Regards, Florin - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
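Roughly, the pieces fit together like this (a Scala sketch even though the question asks for Java -- the same methods exist in the Java API; the names are invented, and Statistics.corr on two RDD[Double]s requires them to line up element for element, so real code must sort and align the series first):

import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.streaming.Seconds

// measures: DStream[Double], one key's time-ordered measure values
// pattern: RDD[Double], the predefined 300-point reference series
measures.window(Seconds(30)).foreachRDD { rdd =>
  if (rdd.count() == pattern.count()) {
    val c = Statistics.corr(rdd, pattern) // Pearson by default
    if (c > 0.9) println("correlation above threshold: " + c)
  }
}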
Re: Large Similarity Job failing
The complexity of DIMSUM is independent of the number of rows but still has a quadratic dependency on the number of columns. 1.5M columns may be too large to use DIMSUM. Try to increase the threshold and see whether it helps. -Xiangrui On Tue, Feb 17, 2015 at 6:28 AM, Debasish Das debasish.da...@gmail.com wrote: Hi, I am running brute-force similarity from RowMatrix on a job with a 5M x 1.5M sparse matrix with 800M entries. With 200M entries the job runs fine, but with 800M I am getting exceptions like "too many files open" and "no space left on device"... Seems like I need more nodes or to use DIMSUM sampling? I am running on 10 nodes where ulimit on each node is set at 65K... Memory is not an issue since I can cache the dataset before the similarity computation starts. I tested the same job on YARN with Spark 1.1 and Spark 1.2 stable. Both jobs failed with FetchFailed msgs. Thanks. Deb
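Both modes hang off RowMatrix; a minimal sketch, assuming rows: RDD[Vector] is already built:

import org.apache.spark.mllib.linalg.distributed.RowMatrix

val mat = new RowMatrix(rows)
val exact = mat.columnSimilarities() // brute force over all column pairs
// DIMSUM sampling: pairs below the threshold may be dropped, but the
// shuffle volume shrinks sharply as the threshold grows.
val approx = mat.columnSimilarities(threshold = 0.5)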
Re: [POWERED BY] Radius Intelligence
Thanks! I added Radius to https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark. -Xiangrui On Tue, Feb 10, 2015 at 12:02 AM, Alexis Roos alexis.r...@gmail.com wrote: Also long overdue given our usage of Spark... Radius Intelligence: URL: radius.com Description: Spark, MLlib. Using Scala, Spark and MLlib for the Radius Marketing and Sales intelligence platform, including data aggregation, data processing, data clustering, data analysis and predictive modeling of all US businesses.
Re: Unknown sample in Naive Baye's
If there exists a sample that doesn't belong to A/B/C, it means that there exists another class D or "unknown" besides A/B/C. You should have some of these samples in the training set in order to let naive Bayes learn the priors. -Xiangrui On Tue, Feb 10, 2015 at 10:44 PM, jatinpreet jatinpr...@gmail.com wrote: Hi, I am using MLlib's Naive Bayes classifier to classify textual data. I am accessing the posterior probabilities through a hack for each class. Once I have trained the model, I want to remove documents whose confidence of classification is low. Say, for a document, if the highest class probability is less than a pre-defined threshold (separate for each class), categorize this document as 'unknown'. Say there are three classes A, B and C with thresholds 0.35, 0.32 and 0.33 respectively, defined after training and testing. If I score a sample that belongs to none of the three categories, I wish to classify it as 'unknown'. But the issue is that I can get a probability higher than these thresholds for a document that doesn't belong to the trained categories. Is there any technique I can apply to segregate documents that belong to untrained classes with a certain degree of confidence? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unknown-sample-in-Naive-Baye-s-tp21594.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
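For reference, a sketch of the thresholding idea for multinomial naive Bayes, where log P(c | x) is proportional to pi(c) + theta(c) . x. It assumes the Spark 1.2 model fields (labels, pi, theta), which is presumably what the "hack" above reads, and hypothetical per-class thresholds; -1.0 stands in for "unknown":

import org.apache.spark.mllib.classification.NaiveBayesModel
import org.apache.spark.mllib.linalg.Vector

def predictWithUnknown(model: NaiveBayesModel, x: Vector,
                       thresholds: Map[Double, Double]): Double = {
  val logPost = model.labels.indices.map { i =>
    val dot = x.toArray.zip(model.theta(i)).map { case (xi, ti) => xi * ti }.sum
    (model.labels(i), model.pi(i) + dot)
  }
  // log-sum-exp normalization to recover posterior probabilities
  val m = logPost.map(_._2).max
  val unnorm = logPost.map { case (l, lp) => (l, math.exp(lp - m)) }
  val z = unnorm.map(_._2).sum
  val (label, prob) = unnorm.map { case (l, p) => (l, p / z) }.maxBy(_._2)
  if (prob >= thresholds.getOrElse(label, 0.0)) label else -1.0
}

Note Xiangrui's caveat still applies: naive Bayes posteriors are often overconfident on out-of-class inputs, so thresholding alone is weaker than training an explicit "unknown" class.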
Re: Naive Bayes model fails after a few predictions
Could you share the error log? What do you mean by 500 instead of 200? If this is the number of files, try to use `repartition` before calling naive Bayes, which works best when the number of partitions matches the number of cores, or is even less. -Xiangrui On Tue, Feb 10, 2015 at 10:34 PM, rkgurram rkgur...@gmail.com wrote: Further, I have tried HttpBroadcast but that too does not work. It is almost like there is a memory leak, because if I increase the input files to 500 instead of 200 the system crashes earlier. The code is as follows:

logger.info("Training the model Fold:[" + fold + "]")
logger.info("Step 1: Split the input into Training and Testing sets")
val splits = labeledPointRDD.randomSplit(Array(0.6, 0.4), seed = 11L)
logger.info("Step 1: splits successful...")
val training = splits(0)
val test = splits(1)
status = ModelStatus.IN_TRAINING
//logger.info("Fold:[" + fold + "] Training count: " + training.count() + " Testing/Verification count: " + test.count())
logger.info("Step 2: Train the NB classifier")
model = NaiveBayes.train(training, lambda = 1.0)
logger.info("Step 2: NB model training complete Fold:[" + fold + "]")
logger.info("Step 3: Testing/Verification of the model")
status = ModelStatus.IN_VERIFICATION
val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))
val arry = predictionAndLabel.filter(x => x._1 == x._2)
val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count()
logger.info("Step 3: Testing complete")
status = ModelStatus.INITIALIZED
logger.info("Fold[" + fold + "] Accuracy:[" + accuracy + "] Model Status:[" + status + "]")

-Ravi -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Naive-Bayes-model-fails-after-a-few-predictions-tp21592p21593.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
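A minimal sketch of the repartition suggestion, assuming sc is the SparkContext and that defaultParallelism reflects the cluster's total core count:

import org.apache.spark.mllib.classification.NaiveBayes

// Match the number of partitions to the total number of cores before training.
val training2 = labeledPointRDD.repartition(sc.defaultParallelism).cache()
val model2 = NaiveBayes.train(training2, lambda = 1.0)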
How do you get the partitioner for an RDD in Java?
In an 'early release' of the Learning Spark book, there is the following reference: In Scala and Java, you can determine how an RDD is partitioned using its partitioner property (or partitioner() method in Java) However, I don't see the mentioned 'partitioner()' method in Spark 1.2 or a way of getting this information. I'm curious if anyone has any suggestions for how I might go about finding how an RDD is partitioned in a Java program. Thanks. Darin.
Re: WARN from Similarity Calculation
It may be caused by GC pause. Did you check the GC time in the Spark UI? -Xiangrui On Sun, Feb 15, 2015 at 8:10 PM, Debasish Das debasish.da...@gmail.com wrote: Hi, I am sometimes getting this WARN while running the similarity calculation: 15/02/15 23:07:55 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(7, abc.com, 48419, 0) with no recent heart beats: 66435ms exceeds 45000ms Do I need to increase the default 45 s to a larger value for cases where we are doing a blocked operation or a long computation in mapPartitions? Thanks. Deb
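If long pauses are expected (heavy mapPartitions work, or GC as Xiangrui suggests), the timeout itself can be raised; a sketch under the assumption that the 1.2-era property below is what backs the 45000 ms default, so verify the name against your release:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.storage.blockManagerSlaveTimeoutMs", "120000") // assumed property name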
Re: Processing graphs
Hi Kannan, I am not sure I have understood what your question is exactly, but maybe the reduceByKey or reduceByKeyLocally functionality is better suited to your need. Best, Yifan LI On 17 Feb 2015, at 17:37, Vijayasarathy Kannan kvi...@vt.edu wrote: Hi, I am working on a Spark application that processes graphs and I am trying to do the following. - group the vertices (key: vertex, value: set of its outgoing edges) - distribute each key to separate processes and process them (like a mapper) - reduce the results back at the main process Does the groupBy functionality do the distribution by default? Do we have to explicitly use RDDs to enable automatic distribution? It'd be great if you could help me understand these and how to go about with the problem. Thanks.
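If the goal is just "combine all outgoing edges per vertex, distributed across workers", a sketch of Yifan's suggestion (the edge-list shape here is hypothetical):

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Hypothetical input: edges as (src, dst) pairs.
def groupEdges(edges: RDD[(Long, Long)]): RDD[(Long, Set[Long])] =
  edges
    .map { case (src, dst) => (src, Set(dst)) }
    // reduceByKey combines values per key on each partition before shuffling,
    // so the distribution across workers is automatic - no explicit mapper needed.
    .reduceByKey(_ ++ _)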
RE: Use of nscala-time within spark-shell
I can use nscala-time with scala, but my issue is that I can't use it within the spark-shell console! It gives me the error below. Thanks From: kevin...@apache.org Date: Tue, 17 Feb 2015 08:50:04 + Subject: Re: Use of nscala-time within spark-shell To: hscha...@hotmail.com; kevin...@apache.org; user@spark.apache.org Great, or you can just use nscala-time with scala 2.10! On Tue Feb 17 2015 at 5:41:53 PM Hammam CHAMSI hscha...@hotmail.com wrote: Thanks Kevin for your reply, I downloaded the pre-built version and as you said the default spark scala version is 2.10. I'm now building spark 1.2.1 with scala 2.11. I'll share the results here. Regards, From: kevin...@apache.org Date: Tue, 17 Feb 2015 01:10:09 + Subject: Re: Use of nscala-time within spark-shell To: hscha...@hotmail.com; user@spark.apache.org What is your scala version used to build Spark? It seems your nscala-time library scala version is 2.11, and default Spark scala version is 2.10. On Tue Feb 17 2015 at 1:51:47 AM Hammam CHAMSI hscha...@hotmail.com wrote: Hi All, Thanks in advance for your help. I have a timestamp which I need to convert to a datetime using scala. A folder contains the three needed jar files: joda-convert-1.5.jar joda-time-2.4.jar nscala-time_2.11-1.8.0.jar Using the scala REPL and adding the jars (scala -classpath *.jar) I can use nscala-time like the following:

scala> import com.github.nscala_time.time.Imports._
import com.github.nscala_time.time.Imports._
scala> import org.joda._
import org.joda._
scala> DateTime.now
res0: org.joda.time.DateTime = 2015-02-12T15:51:46.928+01:00

But when I try to use spark-shell:

ADD_JARS=/home/scala_test_class/nscala-time_2.11-1.8.0.jar,/home/scala_test_class/joda-time-2.4.jar,/home/scala_test_class/joda-convert-1.5.jar /usr/local/spark/bin/spark-shell --master local --driver-memory 2g --executor-memory 2g --executor-cores 1

it successfully imports the jars:

scala> import com.github.nscala_time.time.Imports._
import com.github.nscala_time.time.Imports._
scala> import org.joda._
import org.joda._

but fails using them:

scala> DateTime.now
java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
  at com.github.nscala_time.time.LowPriorityOrderingImplicits$class.ReadableInstantOrdering(Implicits.scala:69)
  at com.github.nscala_time.time.Imports$.ReadableInstantOrdering(Imports.scala:20)
  at com.github.nscala_time.time.OrderingImplicits$class.$init$(Implicits.scala:61)
  at com.github.nscala_time.time.Imports$.<init>(Imports.scala:20)
  at com.github.nscala_time.time.Imports$.<clinit>(Imports.scala)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:17)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:22)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
  at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:26)
  at $iwC$$iwC$$iwC$$iwC.<init>(<console>:28)
  at $iwC$$iwC$$iwC.<init>(<console>:30)
  at $iwC$$iwC.<init>(<console>:32)
  at $iwC.<init>(<console>:34)
  at <init>(<console>:36)
  at .<init>(<console>:40)
  at .<clinit>(<console>)
  at .<init>(<console>:7)
  at .<clinit>(<console>)
  at $print(<console>)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
  at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
  at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
  at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
  at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
  at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
  at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
  at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
  at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)
  at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
  at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
  at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)
  at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
  at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
  at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
  at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
RE: Use of nscala-time within spark-shell
Thanks Kevin for your reply, I downloaded the pre-built version and as you said the default spark scala version is 2.10. I'm now building spark 1.2.1 with scala 2.11. I'll share the results here. Regards, From: kevin...@apache.org Date: Tue, 17 Feb 2015 01:10:09 + Subject: Re: Use of nscala-time within spark-shell To: hscha...@hotmail.com; user@spark.apache.org What is your scala version used to build Spark? It seems your nscala-time library scala version is 2.11, and default Spark scala version is 2.10. On Tue Feb 17 2015 at 1:51:47 AM Hammam CHAMSI hscha...@hotmail.com wrote: [...]
Re: Use of nscala-time within spark-shell
Great, or you can just use nscala-time with scala 2.10! On Tue Feb 17 2015 at 5:41:53 PM Hammam CHAMSI hscha...@hotmail.com wrote: Thanks Kevin for your reply, I downloaded the pre-built version and as you said the default spark scala version is 2.10. I'm now building spark 1.2.1 with scala 2.11. I'll share the results here. Regards, -- From: kevin...@apache.org Date: Tue, 17 Feb 2015 01:10:09 + Subject: Re: Use of nscala-time within spark-shell To: hscha...@hotmail.com; user@spark.apache.org What is your scala version used to build Spark? It seems your nscala-time library scala version is 2.11, and default Spark scala version is 2.10. On Tue Feb 17 2015 at 1:51:47 AM Hammam CHAMSI hscha...@hotmail.com wrote: [...]
Re: Identify the performance bottleneck from hardware perspective
What application are you running? Here are a few things: - You will hit a bottleneck on CPU if you are doing some complex computation (like parsing JSON, etc.) - You will hit a bottleneck on memory if the data/objects used in the program are large (like building and playing with HashMaps etc. inside your map* operations). Here you can set spark.executor.memory to a higher number and also change spark.storage.memoryFraction, whose default value is 0.6 of your executor memory. - Network will be a bottleneck if data is not available locally on one of the workers and hence it has to collect it from others, which means a lot of serialization and data transfer across your cluster. Thanks Best Regards On Tue, Feb 17, 2015 at 11:20 AM, Julaiti Alafate jalaf...@eng.ucsd.edu wrote: Hi there, I am trying to scale up the data size that my application is handling. This application is running on a cluster with 16 slave nodes. Each slave node has 60GB memory. It is running in standalone mode. The data is coming from HDFS that is also in the same local network. In order to have an understanding of how my program is running, I also have Ganglia installed on the cluster. From previous runs, I know the stage taking the longest time to run is counting word pairs (my RDD consists of sentences from a corpus). My goal is to identify the bottleneck of my application, then modify my program or hardware configuration accordingly. Unfortunately, I didn't find much information on Spark monitoring and optimization topics. Reynold Xin gave a great talk at Spark Summit 2014 on application tuning from the tasks perspective. Basically, his focus is on tasks that are oddly slower than the average. However, it didn't solve my problem because there are no such tasks that run way slower than others in my case. So I tried to identify the bottleneck from a hardware perspective. I want to know what the limitation of the cluster is. I think if the executors are running hard, either CPU, memory or network bandwidth (or maybe some combination) is hitting the roof. But Ganglia reports the CPU utilization of the cluster is no more than 50%, and network utilization is high for several seconds at the beginning, then drops close to 0. From the Spark UI, I can see the node with maximum memory usage is consuming around 6GB, while spark.executor.memory is set to 20GB. I am very confused that the program is not running fast enough, while hardware resources are not in shortage. Could you please give me some hints about what decides the performance of a Spark application from the hardware perspective? Thanks! Julaiti
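The two knobs Akhil mentions, as they would look in code (the values are placeholders, not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "20g")        // heap per executor
  .set("spark.storage.memoryFraction", "0.6") // share of that heap reserved for cached RDDs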
Re: Use of nscala-time within spark-shell
Then, why don't you use nscala-time_2.10-1.8.0.jar, not nscala-time_2.11-1.8.0.jar? On Tue Feb 17 2015 at 5:55:50 PM Hammam CHAMSI hscha...@hotmail.com wrote: I can use nscala-time with scala, but my issue is that I can't use it within the spark-shell console! It gives me the error below. Thanks -- From: kevin...@apache.org Date: Tue, 17 Feb 2015 08:50:04 + Subject: Re: Use of nscala-time within spark-shell To: hscha...@hotmail.com; kevin...@apache.org; user@spark.apache.org Great, or you can just use nscala-time with scala 2.10! On Tue Feb 17 2015 at 5:41:53 PM Hammam CHAMSI hscha...@hotmail.com wrote: Thanks Kevin for your reply, I downloaded the pre-built version and as you said the default spark scala version is 2.10. I'm now building spark 1.2.1 with scala 2.11. I'll share the results here. Regards, -- From: kevin...@apache.org Date: Tue, 17 Feb 2015 01:10:09 + Subject: Re: Use of nscala-time within spark-shell To: hscha...@hotmail.com; user@spark.apache.org What is your scala version used to build Spark? It seems your nscala-time library scala version is 2.11, and default Spark scala version is 2.10. On Tue Feb 17 2015 at 1:51:47 AM Hammam CHAMSI hscha...@hotmail.com wrote: [...]
RE: How do you get the partitioner for an RDD in Java?
Where did you look? BTW, it is defined in the RDD class as a val: val partitioner: Option[Partitioner] Mohammed -----Original Message----- From: Darin McBeath [mailto:ddmcbe...@yahoo.com.INVALID] Sent: Tuesday, February 17, 2015 1:45 PM To: User Subject: How do you get the partitioner for an RDD in Java? In an 'early release' of the Learning Spark book, there is the following reference: In Scala and Java, you can determine how an RDD is partitioned using its partitioner property (or partitioner() method in Java) However, I don't see the mentioned 'partitioner()' method in Spark 1.2 or a way of getting this information. I'm curious if anyone has any suggestions for how I might go about finding how an RDD is partitioned in a Java program. Thanks. Darin.
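A short sketch of both access paths. The Java comment reflects Spark 1.2, where JavaRDD itself does not yet expose a partitioner() method (later releases added one), so you go through the underlying Scala RDD; `rdd` here stands for any RDD you already have:

import org.apache.spark.Partitioner

// Scala: partitioner is a val on RDD
val p: Option[Partitioner] = rdd.partitioner

// Java (1.2): reach the underlying Scala RDD
//   scala.Option<Partitioner> p = javaRdd.rdd().partitioner();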
RE: Use of nscala-time within spark-shell
My fault, I didn't notice the 11 in the jar name. It is working now with nscala-time_2.10-1.8.0.jar Thanks Kevin From: kevin...@apache.org Date: Tue, 17 Feb 2015 08:58:13 + Subject: Re: Use of nscala-time within spark-shell To: hscha...@hotmail.com; kevin...@apache.org; user@spark.apache.org Then, why don't you use nscala-time_2.10-1.8.0.jar, not nscala-time_2.11-1.8.0.jar? On Tue Feb 17 2015 at 5:55:50 PM Hammam CHAMSI hscha...@hotmail.com wrote: I can use nscala-time with scala, but my issue is that I can't use it within the spark-shell console! It gives me the error below. Thanks From: kevin...@apache.org Date: Tue, 17 Feb 2015 08:50:04 + Subject: Re: Use of nscala-time within spark-shell To: hscha...@hotmail.com; kevin...@apache.org; user@spark.apache.org Great, or you can just use nscala-time with scala 2.10! On Tue Feb 17 2015 at 5:41:53 PM Hammam CHAMSI hscha...@hotmail.com wrote: [...]
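For anyone hitting the same mismatch from an sbt build, the %% operator picks the artifact matching your Scala binary version automatically (nscala-time_2.10 on a 2.10 build), which avoids the problem entirely; a one-line sketch:

libraryDependencies += "com.github.nscala-time" %% "nscala-time" % "1.8.0"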
Re: Configuration Problem? (need help to get Spark job executed)
Hi, It could be due to a connectivity issue between the master and the slaves. Are the slaves visible in the Spark UI? And how much memory is allocated to the executors? I have seen this issue occur for the following reasons: 1. Syncing of configuration between Spark Master and Slaves. 2. Network connectivity issues between the master and slaves. Thanks Arush On Sat, Feb 14, 2015 at 3:07 PM, NORD SC jan.algermis...@nordsc.com wrote: Hi all, I am new to spark and seem to have hit a common newbie obstacle. I have a pretty simple setup and job but I am unable to get past this error when executing a job: "TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory" I have so far gained a basic understanding of worker/executor/driver memory, but have run out of ideas what to try next - maybe someone has a clue. My setup: Three node standalone cluster with C* and spark on each node and the Datastax C*/Spark connector JAR placed on each node. On the master I have the slaves configured in conf/slaves and I am using sbin/start-all.sh to start the whole cluster. On each node I have this in conf/spark-defaults.conf:

spark.master spark://devpeng-db-cassandra-1:7077
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.executor.extraClassPath /opt/spark-cassandra-connector-assembly-1.2.0-alpha1.jar

and this in conf/spark-env.sh:

SPARK_WORKER_MEMORY=6g

My app looks like this:

object TestApp extends App {
  val conf = new SparkConf(true).set("spark.cassandra.connection.host", "devpeng-db-cassandra-1.")
  val sc = new SparkContext("spark://devpeng-db-cassandra-1:7077", "testApp", conf)
  val rdd = sc.cassandraTable("test", "kv")
  println("Count: " + String.valueOf(rdd.count))
  println(rdd.first)
}

Any kind of idea what to check next would help me at this point, I think. Jan Log of the application start:

[info] Loading project definition from /Users/jan/projects/gkh/jump/workspace/gkh-spark-example/project
[info] Set current project to csconnect (in build file:/Users/jan/projects/gkh/jump/workspace/gkh-spark-example/)
[info] Compiling 1 Scala source to /Users/jan/projects/gkh/jump/workspace/gkh-spark-example/target/scala-2.10/classes...
[info] Running jump.TestApp
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/02/14 10:30:11 INFO SecurityManager: Changing view acls to: jan
15/02/14 10:30:11 INFO SecurityManager: Changing modify acls to: jan
15/02/14 10:30:11 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(jan); users with modify permissions: Set(jan)
15/02/14 10:30:11 INFO Slf4jLogger: Slf4jLogger started
15/02/14 10:30:11 INFO Remoting: Starting remoting
15/02/14 10:30:12 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@xx:58197]
15/02/14 10:30:12 INFO Utils: Successfully started service 'sparkDriver' on port 58197.
15/02/14 10:30:12 INFO SparkEnv: Registering MapOutputTracker
15/02/14 10:30:12 INFO SparkEnv: Registering BlockManagerMaster
15/02/14 10:30:12 INFO DiskBlockManager: Created local directory at /var/folders/vr/w3whx92d0356g5nj1p6s59grgn/T/spark-local-20150214103012-5b53
15/02/14 10:30:12 INFO MemoryStore: MemoryStore started with capacity 530.3 MB
2015-02-14 10:30:12.304 java[24999:3b07] Unable to load realm info from SCDynamicStore
15/02/14 10:30:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/02/14 10:30:12 INFO HttpFileServer: HTTP File server directory is /var/folders/vr/w3whx92d0356g5nj1p6s59grgn/T/spark-48459a22-c1ff-42d5-8b8e-cc89fe84933d
15/02/14 10:30:12 INFO HttpServer: Starting HTTP Server
15/02/14 10:30:12 INFO Utils: Successfully started service 'HTTP file server' on port 58198.
15/02/14 10:30:12 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/02/14 10:30:12 INFO SparkUI: Started SparkUI at http://xx:4040
15/02/14 10:30:12 INFO AppClient$ClientActor: Connecting to master spark://devpeng-db-cassandra-1:7077...
15/02/14 10:30:13 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150214103013-0001
15/02/14 10:30:13 INFO AppClient$ClientActor: Executor added: app-20150214103013-0001/0 on worker-20150214102534-devpeng-db-cassandra-2.devpeng (devpeng-db-cassandra-2.devpeng.x:57563) with 8 cores
15/02/14 10:30:13 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150214103013-0001/0 on hostPort devpeng-db-cassandra-2.devpeng.:57563 with 8 cores, 512.0 MB RAM
15/02/14 10:30:13 INFO AppClient$ClientActor: Executor added: app-20150214103013-0001/1 on worker-20150214102534-devpeng-db-cassandra-3.devpeng.-38773
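A side note on the log above: each executor was granted only "512.0 MB RAM", the default executor size. SPARK_WORKER_MEMORY raises what a worker can hand out, not what an application requests, so spark.executor.memory has to be set separately. Whether that alone explains the scheduling error is not certain, but it is worth fixing; a sketch against the TestApp above:

import org.apache.spark.SparkConf

val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "devpeng-db-cassandra-1.")
  .set("spark.executor.memory", "4g") // executors default to 512 MB when unset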
Re: Identify the performance bottleneck from hardware perspective
Thank you very much for your reply! My task is to count the number of word pairs in a document. If w1 and w2 occur together in one sentence, the count of the word pair (w1, w2) increases by 1. So the computational part of this algorithm is simply a two-level for-loop (a sketch follows below). Since the cluster is monitored by Ganglia, I can easily see that neither CPU nor network IO is under pressure. The only parameter left is memory. In the executor tab of the Spark Web UI, I can see a column named "memory used". It showed that only 6GB of 20GB memory is used. I understand this is measuring the size of the RDDs that persist in memory. So can I at least assume the data/objects used in my program are not exceeding the memory limit? My confusion here is: why can't my program run faster while there is still sufficient memory, CPU time and network bandwidth it can utilize? Best regards, Julaiti On Tue, Feb 17, 2015 at 12:53 AM, Akhil Das ak...@sigmoidanalytics.com wrote: [...]
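The sketch referenced above, assuming sentences: RDD[String]; each co-occurring pair emits one ((w1, w2), 1) record and reduceByKey sums them:

import org.apache.spark.SparkContext._

val pairCounts = sentences.flatMap { s =>
  val words = s.split("\\s+").distinct
  for {
    i <- words.indices
    j <- (i + 1) until words.length
  } yield ((words(i), words(j)), 1L)
}.reduceByKey(_ + _)

One thing worth noting about this shape: the pair explosion is quadratic in sentence length and allocates many short-lived tuples, so GC churn, rather than raw CPU or memory capacity, can plausibly become the limiting factor even when Ganglia shows idle resources.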
Re: ERROR actor.OneForOneStrategy: org.apache.hadoop.conf.Configuration
Tip: to debug where the unserializable reference comes from, run with -Dsun.io.serialization.extendeddebuginfo=true On Tue, Feb 17, 2015 at 10:20 AM, Jadhav Shweta jadhav.shw...@tcs.com wrote: 15/02/17 12:57:10 ERROR actor.OneForOneStrategy: org.apache.hadoop.conf.Configuration java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
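One common cause and fix, sketched here (it may or may not match Shweta's exact code): a Hadoop Configuration captured in a closure or stored in a serializable class drags the whole object graph into Java serialization. Marking the field @transient and rebuilding it lazily keeps it out:

import org.apache.hadoop.conf.Configuration

class MyJob extends Serializable {
  // Not serialized with the instance; recreated on first use in each JVM.
  @transient lazy val hadoopConf: Configuration = new Configuration()
}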
Re: Identify the performance bottleneck from hardware perspective
Hi, How big is your dataset? Thanks Arush On Tue, Feb 17, 2015 at 4:06 PM, Julaiti Alafate jalaf...@eng.ucsd.edu wrote: [...] -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Cleanup Questions
Two questions regarding worker cleanup: 1) Is the best place to enable worker cleanup setting export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.interval=30" in conf/spark-env.sh for each worker? Or is there a better place? 2) I see this has a default TTL of 7 days. I've got some spark streaming jobs that run for more than 7 days. Would the spark cleanup honour currently running applications and not discard their data, or will it obliterate everything that's older than 7 days? If the latter, what's a good approach to cleaning up, considering my streaming app will be running for longer than the TTL period? Thanks, Ashic.
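On (2): the retention window is its own property, spark.worker.cleanup.appDataTtl (default 7 * 24 * 3600 seconds). In at least some releases the cleaner went purely by directory age rather than by whether the owning application was still running (SPARK-1860 tracked this), so for a streaming job expected to outlive the TTL, raising the TTL is the safer route. A sketch extending the spark-env.sh line above (values are examples only):

export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true \
  -Dspark.worker.cleanup.interval=1800 \
  -Dspark.worker.cleanup.appDataTtl=2592000"

(2592000 s = 30 days; note the interval is also in seconds, so the 30 in the original line means every 30 seconds, which is aggressive.)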
Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?
I've decided to try spark-submit ... --conf spark.driver.extraJavaOptions=-DpropertiesFile=/home/emre/data/myModule.properties But when I try to retrieve the value of propertiesFile via System.err.println("propertiesFile : " + System.getProperty("propertiesFile")); I get NULL: propertiesFile : null Interestingly, when I run spark-submit with --verbose, I see that it prints: spark.driver.extraJavaOptions -> -DpropertiesFile=/home/emre/data/belga/schemavalidator.properties I couldn't understand why I couldn't get the value of propertiesFile by using the standard System.getProperty method. (I can use new SparkConf().get("spark.driver.extraJavaOptions") and manually parse it to retrieve the value, but I'd like to know why I cannot retrieve that value using the System.getProperty method). Any ideas? If I can achieve what I've described above properly, I plan to pass a properties file that resides on HDFS, so that it will be available to my driver program wherever that program runs. -- Emre On Mon, Feb 16, 2015 at 4:41 PM, Charles Feduke charles.fed...@gmail.com wrote: I haven't actually tried mixing non-Spark settings into the Spark properties. Instead I package my properties into the jar and use the Typesafe Config[1] - v1.2.1 - library (along with Ficus[2] - Scala specific) to get at my properties: Properties file: src/main/resources/integration.conf (below $ENV might be set to either integration or prod[3]) ssh -t root@$HOST /root/spark/bin/spark-shell --jars /root/$JAR_NAME \ --conf 'config.resource=$ENV.conf' \ --conf 'spark.executor.extraJavaOptions=-Dconfig.resource=$ENV.conf' Since the properties file is packaged up with the JAR I don't have to worry about sending the file separately to all of the slave nodes. Typesafe Config is written in Java so it will work if you're not using Scala. (The Typesafe Config library also has the advantage of being extremely easy to integrate with code that is using Java Properties today.) If you instead want to send the file separately from the JAR and you use the Typesafe Config library, you can specify config.file instead of .resource; though I'd point you to [3] below if you want to make your development life easier. 1. https://github.com/typesafehub/config 2. https://github.com/ceedubs/ficus 3. http://deploymentzone.com/2015/01/27/spark-ec2-and-easy-spark-shell-deployment/ On Mon Feb 16 2015 at 10:27:01 AM Emre Sevinc emre.sev...@gmail.com wrote: Hello, I'm using Spark 1.2.1 and have a module.properties file, and in it I have non-Spark properties, as well as Spark properties, e.g.: job.output.dir=file:///home/emre/data/mymodule/out I'm trying to pass it to spark-submit via: spark-submit --class com.myModule --master local[4] --deploy-mode client --verbose --properties-file /home/emre/data/mymodule.properties mymodule.jar And I thought I could read the value of my non-Spark property, namely, job.output.dir by using: SparkConf sparkConf = new SparkConf(); final String validatedJSONoutputDir = sparkConf.get("job.output.dir"); But it gives me an exception: Exception in thread "main" java.util.NoSuchElementException: job.output.dir Is it not possible to mix Spark and non-Spark properties in a single .properties file, then pass it via --properties-file and then get the values of those non-Spark properties via SparkConf? Or is there another object / method to retrieve the values for those non-Spark properties? -- Emre Sevinç -- Emre Sevinc
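A possible explanation for the null, worth verifying against your Spark version: in client mode the driver JVM is the spark-submit JVM itself, so by the time spark.driver.extraJavaOptions from --conf is read, the driver has already started, and the -D flag is never injected where System.getProperty could see it. SparkConf still shows the value because the config entry itself is recorded either way. The documented route is to pass the JVM option on the command line instead, e.g.:

spark-submit --class com.myModule --master local[4] --deploy-mode client \
  --driver-java-options "-DpropertiesFile=/home/emre/data/myModule.properties" \
  mymodule.jar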