Re: Parquet files are only 6-20MB in size?
Before doing saveAsParquetFile, you can call repartition and provide a reasonable number, which will determine the total number of output files generated. Thanks Best Regards On Mon, Nov 3, 2014 at 1:12 PM, ag007 agre...@mac.com wrote: Hi there, I have a pySpark job that simply takes a tab-separated CSV and outputs it to a Parquet file. The code is based on the SQL write parquet example (using a different inferred schema, only 35 columns). The input files range from 100 MB to 12 GB. I have tried different block sizes from 10 MB through to 1 GB, and I have tried different parallelism. The part files come out to roughly a 1:5 compression ratio. I am trying to get large Parquet files. Having this many small files will cause problems for my name node. I have over 500,000 of these files. Your assistance would be greatly appreciated. cheers, Ag PS Another solution may be if there is a Parquet concat tool around. I couldn't see one. I understand that such a tool would have to adjust the footer. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-files-are-only-6-20MB-in-size-tp17935.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
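A minimal Scala sketch of the suggestion above (the original job is pySpark; the calls are analogous there). The toRow parser and schema are assumed to exist already, and the partition count is illustrative:

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    // repartition the row RDD before applying the schema, so the partition
    // count directly controls the number of Parquet part files written
    val rows = sc.textFile("hdfs:///input/data.tsv").map(_.split("\t"))
    val rowRdd = rows.map(toRow).repartition(8)   // toRow is a hypothetical Array[String] => Row parser
    sqlContext.applySchema(rowRdd, schema).saveAsParquetFile("hdfs:///output/data.parquet")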
To find distances to reachable source vertices using GraphX
Hi All, I'm trying to understand the example program at the link below. When I run this program, I'm getting a *java.lang.NullPointerException* at the highlighted line. https://gist.github.com/ankurdave/4a17596669b36be06100 val updatedDists = edge.srcAttr.filter { case (source, dist) => *val existingDist = edge.dstAttr.getOrElse(source, Int.MaxValue)* existingDist > dist + 1 }.mapValues(_ + 1).map(identity) Could you please help me resolve this issue. Regards, Rajesh
Re: SQL COUNT DISTINCT
Hi Michael, Thanks for the response. I tested the query that you sent me, and it is really much faster: Old query stats by phases: 3.2 min, 17 s. Your query stats by phases: 0.3 s, 16 s, 20 s. But will this improvement also apply when you want to count distinct on 2 or more fields: SELECT COUNT(f1), COUNT(DISTINCT f2), COUNT(DISTINCT f3), COUNT(DISTINCT f4) FROM parquetFile Should I still create a Jira issue/improvement for this? @Nick That also makes sense. But should I just get the count of my data on the driver node? I just started to learn about Spark (and it is great), so sorry if I ask stupid questions or anything like that. Best regards Bojan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SQL-COUNT-DISTINCT-tp17818p17939.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
GraphX : Vertices details in Triangles
Hi All, I'm new to GraphX. I'm studying Triangle Count use cases. I'm able to get the number of triangles in a graph, but I'm not able to collect the vertex details of each triangle. *For example* : I'm playing with one of the GraphX example graphs. Vertices and edges: val vertexArray = Array( (1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)), (4L, ("David", 42)), (5L, ("Ed", 55)), (6L, ("Fran", 50)) ) val edgeArray = Array( Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(3L, 2L, 4), Edge(3L, 6L, 3), Edge(4L, 1L, 1), Edge(5L, 2L, 2), Edge(5L, 3L, 8), Edge(5L, 6L, 3) ) [image: Toy Social Network] I get a triangle count of 3 for the above graph. I want to know the vertices in each triangle, *for example like* : Triangle 1 : 1, 2, 4 Triangle 2 : 2, 5, 3 Triangle 3 : 5, 3, 6 Could you please help on this. Regards, Rajesh
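A hedged sketch of one way to list the vertices of each triangle in the toy graph above, using the vertexArray and edgeArray defined in the message (this is an illustrative approach, not the TriangleCount implementation): collect each vertex's neighbor ids, then for every edge intersect the two endpoints' neighbor sets; each common neighbor closes a triangle.

    import org.apache.spark.graphx._

    val graph = Graph(sc.parallelize(vertexArray), sc.parallelize(edgeArray))
    // each vertex's neighbor ids, ignoring edge direction
    val neighborIds = graph.collectNeighborIds(EdgeDirection.Either)
    val withSets = graph.outerJoinVertices(neighborIds) { (vid, attr, nbrs) =>
      nbrs.getOrElse(Array.empty[VertexId]).toSet
    }
    // a common neighbor of an edge's two endpoints closes a triangle
    val triangles = withSets.triplets.flatMap { t =>
      (t.srcAttr intersect t.dstAttr).map { third =>
        Seq(t.srcId, t.dstId, third).sorted.mkString(", ")   // canonical ordering
      }
    }.distinct()   // otherwise each triangle is reported once per edge
    triangles.collect().foreach(println)   // e.g. "1, 2, 4", "2, 3, 5", "3, 5, 6"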
Re: Parquet files are only 6-20MB in size?
Thanks Akhil, Am I right in saying that the repartition will spread the data randomly, so I lose chronological order? I really just want the CSV -> Parquet conversion in the same order it came in. If I set repartition to 1, will this not be random? cheers, Ag -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-files-are-only-6-20MB-in-size-tp17935p17941.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: about aggregateByKey and standard deviation
I don't think .aggregateByKey() can be used directly, because we will need the count per key (for the average). Maybe we can use .countByKey(), which returns a map, and .foldByKey(0)(_+_) (or aggregateByKey()), which gives the sum of values per key. I myself am not sure how to proceed. Regards On Fri, Oct 31, 2014 at 1:26 PM, qinwei wei@dewmobile.net wrote: Hi, everyone I have an RDD filled with data like (k1, v11) (k1, v12) (k1, v13) (k2, v21) (k2, v22) (k2, v23) ... I want to calculate the average and standard deviation of (v11, v12, v13) and (v21, v22, v23) grouped by their keys. For the moment, I have done that by using groupByKey and map. I notice that groupByKey is very expensive, but I can not figure out how to do it by using aggregateByKey, so I wonder is there any better way to do this? Thanks! -- qinwei
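One way to keep the count available is to carry it inside the aggregateByKey accumulator together with the sum and the sum of squares; a hedged sketch with illustrative sample data:

    // RDD[(String, Double)] standing in for the (k1, v11) ... pairs above
    val pairs = sc.parallelize(Seq(("k1", 1.0), ("k1", 2.0), ("k1", 3.0), ("k2", 4.0), ("k2", 5.0)))
    // one pass accumulating (count, sum, sum of squares) per key
    val stats = pairs.aggregateByKey((0L, 0.0, 0.0))(
      (acc, v) => (acc._1 + 1, acc._2 + v, acc._3 + v * v),
      (a, b)   => (a._1 + b._1, a._2 + b._2, a._3 + b._3)
    )
    // derive mean and (population) standard deviation per key
    val meanAndStddev = stats.mapValues { case (n, sum, sumSq) =>
      val mean = sum / n
      (mean, math.sqrt(sumSq / n - mean * mean))
    }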
How number of partitions effect the performance?
Hi, I just wonder how the number of partitions affects performance in Spark! Is it just the parallelism (more partitions, more parallel sub-tasks) that improves the performance, or are there other considerations? In my case, I ran a couple of map/reduce jobs on the same dataset twice, with two different partition numbers, 7 and 9. I used a standalone cluster with two workers, where the master resides on the same machine as one of the workers. Surprisingly, the performance of the map/reduce jobs with 9 partitions is almost 4X-5X better than with 7 partitions!?? Does this mean that choosing the right number of partitions is the key factor in Spark performance? best, /Shahab
Re: OOM with groupBy + saveAsTextFile
The result was no different with saveAsHadoopFile. In both cases, I can see that I've misinterpreted the API docs. I'll explore the APIs a bit further for ways to save the iterable as chunks rather than one large text/binary. It might also help to clarify this aspect in the API docs. For those (like me) whose first practical experience with data processing is through Spark, having skipped the Hadoop MR ecosystem, it might help to clarify interactions with HDFS and the like. Thanks for all the help. On Sun, Nov 2, 2014 at 10:22 PM, Sean Owen so...@cloudera.com wrote: saveAsText means save every element of the RDD as one line of text. It works like TextOutputFormat in Hadoop MapReduce since that's what it uses. So you are causing it to create one big string out of each Iterable this way. On Sun, Nov 2, 2014 at 4:48 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Thanks for responding. This is what I initially suspected, and hence asked why the library needed to construct the entire value buffer on a single host before writing it out. The stacktrace appeared to suggest that user code is not constructing the large buffer. I'm simply calling groupBy and saveAsText on the resulting grouped rdd. The value after grouping is an Iterable<Tuple4<String, Double, String, String>>. None of the strings are large. I also do not need a single large string created out of the Iterable for writing to disk. Instead, I expect the iterable to get written out in chunks in response to saveAsText. This shouldn't be the default behaviour of saveAsText perhaps? Hence my original question of the behavior of saveAsText. The tuning / partitioning attempts were aimed at reducing memory pressure so that multiple such buffers aren't constructed at the same time on a host. I'll take a second look at the data and code before updating this thread. Thanks.
Re: OOM with groupBy + saveAsTextFile
I also realized from your description of saveAsText that the API is indeed behaving as expected i.e. it is appropriate (though not optimal) for the API to construct a single string out of the value. If the value turns out to be large, the user of the API needs to reconsider the implementation approach. My bad. On Mon, Nov 3, 2014 at 3:38 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: The result was no different with saveAsHadoopFile. In both cases, I can see that I've misinterpreted the API docs. I'll explore the API's a bit further for ways to save the iterable as chunks rather than one large text/binary. It might also help to clarify this aspect in the API docs. For those (like me) whose first practical experience with data processing is through spark, having skipped the Hadoop MR ecosystem, it might help to clarify interactions with HDFS and the likes. Thanks for all the help. On Sun, Nov 2, 2014 at 10:22 PM, Sean Owen so...@cloudera.com wrote: saveAsText means save every element of the RDD as one line of text. It works like TextOutputFormat in Hadoop MapReduce since that's what it uses. So you are causing it to create one big string out of each Iterable this way. On Sun, Nov 2, 2014 at 4:48 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Thanks for responding. This is what I initially suspected, and hence asked why the library needed to construct the entire value buffer on a single host before writing it out. The stacktrace appeared to suggest that user code is not constructing the large buffer. I'm simply calling groupBy and saveAsText on the resulting grouped rdd. The value after grouping is an IterableTuple4String, Double, String, String. None of the strings are large. I also do not need a single large string created out of the Iterable for writing to disk. Instead, I expect the iterable to get written out in chunks in response to saveAsText. This shouldn't be the default behaviour of saveAsText perhaps? Hence my original question of the behavior of saveAsText. The tuning / partitioning attempts were aimed at reducing memory pressure so that multiple such buffers aren't constructed at the same time on a host. I'll take a second look at the data and code before updating this thread. Thanks.
Re: OOM with groupBy + saveAsTextFile
Yes, that's the same thing really. You're still writing a huge value as part of one single (key,value) record. The value exists in memory in order to be written to storage. Although there aren't hard limits, in general, keys and values aren't intended to be huge, like, hundreds of megabytes. You should probably design this differently, to not try to collect a massive value per key. That is a generally good idea, not just for this reason. Certainly, you don't have to be able to fit many (key,value) in memory at once. One, yes. On Mon, Nov 3, 2014 at 10:08 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: The result was no different with saveAsHadoopFile. In both cases, I can see that I've misinterpreted the API docs. I'll explore the API's a bit further for ways to save the iterable as chunks rather than one large text/binary. It might also help to clarify this aspect in the API docs. For those (like me) whose first practical experience with data processing is through spark, having skipped the Hadoop MR ecosystem, it might help to clarify interactions with HDFS and the likes. Thanks for all the help. On Sun, Nov 2, 2014 at 10:22 PM, Sean Owen so...@cloudera.com wrote: saveAsText means save every element of the RDD as one line of text. It works like TextOutputFormat in Hadoop MapReduce since that's what it uses. So you are causing it to create one big string out of each Iterable this way. On Sun, Nov 2, 2014 at 4:48 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Thanks for responding. This is what I initially suspected, and hence asked why the library needed to construct the entire value buffer on a single host before writing it out. The stacktrace appeared to suggest that user code is not constructing the large buffer. I'm simply calling groupBy and saveAsText on the resulting grouped rdd. The value after grouping is an IterableTuple4String, Double, String, String. None of the strings are large. I also do not need a single large string created out of the Iterable for writing to disk. Instead, I expect the iterable to get written out in chunks in response to saveAsText. This shouldn't be the default behaviour of saveAsText perhaps? Hence my original question of the behavior of saveAsText. The tuning / partitioning attempts were aimed at reducing memory pressure so that multiple such buffers aren't constructed at the same time on a host. I'll take a second look at the data and code before updating this thread. Thanks. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
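A hedged sketch of the kind of redesign suggested above: keep one modest line per element and only partition by key, so no single (key, value) record ever has to hold an entire group in memory. The sample records and the partition count are illustrative:

    import org.apache.spark.HashPartitioner

    // tiny stand-in for the real (key, Tuple4) records
    val records = sc.parallelize(Seq(
      ("k1", ("a", 1.0, "x", "y")),
      ("k1", ("b", 2.0, "x", "y")),
      ("k2", ("c", 3.0, "x", "y"))
    ))
    records
      .partitionBy(new HashPartitioner(64))   // co-locates each key's rows without building one giant value
      .map { case (k, (s1, d, s2, s3)) => Seq(k, s1, d, s2, s3).mkString("\t") }
      .saveAsTextFile("out/grouped-by-key")   // one line per element, not one line per key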
Re: SparkSQL performance
I did some simple experiments with Impala and Spark, and Impala came out ahead. But it’s also less flexible, couldn’t handle irregular schemas, didn't support Json, and so on. On 01.11.2014, at 02:20, Soumya Simanta soumya.sima...@gmail.com wrote: I agree. My personal experience with Spark core is that it performs really well once you tune it properly. As far I understand SparkSQL under the hood performs many of these optimizations (order of Spark operations) and uses a more efficient storage format. Is this assumption correct? Has anyone done any comparison of SparkSQL with Impala ? The fact that many of the queries don't even finish in the benchmark is quite surprising and hard to believe. A few months ago there were a few emails about Spark not being able to handle large volumes (TBs) of data. That myth was busted recently when the folks at Databricks published their sorting record results. Thanks -Soumya On Fri, Oct 31, 2014 at 7:35 PM, Du Li l...@yahoo-inc.com wrote: We have seen all kinds of results published that often contradict each other. My take is that the authors often know more tricks about how to tune their own/familiar products than the others. So the product on focus is tuned for ideal performance while the competitors are not. The authors are not necessarily biased but as a consequence the results are. Ideally it’s critical for the user community to be informed of all the in-depth tuning tricks of all products. However, realistically, there is a big gap in terms of documentation. Hope the Spark folks will make a difference. :-) Du From: Soumya Simanta soumya.sima...@gmail.com Date: Friday, October 31, 2014 at 4:04 PM To: user@spark.apache.org user@spark.apache.org Subject: SparkSQL performance I was really surprised to see the results here, esp. SparkSQL not completing http://www.citusdata.com/blog/86-making-postgresql-scale-hadoop-style I was under the impression that SparkSQL performs really well because it can optimize the RDD operations and load only the columns that are required. This essentially means in most cases SparkSQL should be as fast as Spark is. I would be very interested to hear what others in the group have to say about this. Thanks -Soumya
Hive Context and Mapr
Hi We are currently using Mapr Distribution. To read the files from the file system we specify as follows : test = sc.textFile(mapr/mycluster/user/mapr/test.csv) This works fine from Spark Context. But ... Currently we are trying to create a table in hive using the hiveContext from Spark. So when I specify something like from pyspark.sql import HiveContext hc = HiveContext(sc) hc.sql(Create table deleteme_1 + createString ) hc.sql(LOAD DATA LOCAL INPATH ' mapr/BI2-104/user/mapr/test.csv' INTO TABLE deleteme_1) We get the following error FAILED: RuntimeException java.io.IOException: No FileSystem for scheme: maprfs An error occurred while calling o79.sql. : org.apache.spark.sql.execution.QueryExecutionException: FAILED: RuntimeException java.io.IOException: No FileSystem for scheme: maprfs at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:302) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:272) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:38) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:360) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:360) at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:103) at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:98) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) Best Regards Santosh
Schema RDD and saveAsTable in hive
Hi, I have a schemaRDD created like below : schemaTransactions = sqlContext.applySchema(transactions,schema); When I try to save the schemaRDD as a table using : schemaTransactions.saveAsTable(transactions) I get the error below Py4JJavaError: An error occurred while calling o70.saveAsTable. : java.lang.AssertionError: assertion failed: No plan for InsertIntoCreatedTable None, transactions SparkLogicalPlan (ExistingRdd [GUID#21,INSTANCE#22,TRANSID#23,CONTEXT_ID#24,DIALOG_STEP#25,REPORT#26,ACCOUNT#27,MANDT#28,ACTION#29,TASKTYPE#30,TCODE#31,F12#32,F13#33,STARTDATE#34,STARTTIME#35,F16#36,RESPTIME#37,F18#38,F19#39,F20#40,F21#41], MapPartitionsRDD[11] at mapPartitions at SQLContext.scala:522) Also I have copied my hive-site.xml to the spark conf folder and started the thrift server .So where does the saveAsTable store the table in ..hive ?? Best Regards Santosh
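One possible cause, offered here only as a hedged guess: saveAsTable needs the SchemaRDD to come from a HiveContext, since a plain SQLContext has no Hive catalog to create the table in; with a HiveContext the table lands in the Hive warehouse configured by hive-site.xml. A minimal Scala sketch, with transactions and schema assumed as in the message above:

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)
    val schemaTransactions = hiveContext.applySchema(transactions, schema)
    schemaTransactions.saveAsTable("transactions")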
Re: How number of partitions effect the performance?
Yes partitions matter. Usually you can use the default, which will make a partition per input split, and that's usually good, to let one task process one block of data, which will all be on one machine. Reasons I could imagine why 9 partitions is faster than 7: Probably: Your cluster can execute at least 9 tasks concurrently. It will finish faster since each partition is smaller when split into 9 partitions. This just means you weren't using your cluster's full parallelism at 7. 9 partitions lets tasks execute entirely locally to the data, whereas 7 is too few compared to how the data blocks are distributed on HDFS. That is, maybe 7 is inducing a shuffle whereas 9 is not for some reason in your code. Your executors are running near their memory limit and are thrashing in GC. With less data to process each, you may avoid thrashing and so go a lot faster. (Or there's some other factor that messed up your measurements :)) There can be instances where more partitions is slower too. On Mon, Nov 3, 2014 at 9:57 AM, shahab shahab.mok...@gmail.com wrote: Hi, I just wonder how number of partitions effect the performance in Spark! Is it just the parallelism (more partitions, more parallel sub-tasks) that improves the performance? or there exist other considerations? In my case,I run couple of map/reduce jobs on same dataset two times with two different partition numbers, 7 and 9. I used a stand alone cluster, with two workers on each, where the master resides with the same machine as one of the workers. Surprisingly, the performance of map/reduce jobs in case of 9 partitions is almost 4X-5X better than that of 7 partitions !?? Does it mean that choosing right number of partitions is the key factor in the Spark performance ? best, /Shahab - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
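A small hedged sketch of making the partition count explicit instead of relying on the default (the path and counts are illustrative):

    val data = sc.textFile("hdfs:///data/input", 9)   // request at least 9 input partitions
    println(data.partitions.length)                   // check what was actually created
    val reshaped = data.repartition(9)                // or force exactly 9, at the cost of a shuffle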
Re: How number of partitions effect the performance?
Thanks Sean for very useful comments. I understand now better what could be the reasons that my evaluations are messed up. best, /Shahab On Mon, Nov 3, 2014 at 12:08 PM, Sean Owen so...@cloudera.com wrote: Yes partitions matter. Usually you can use the default, which will make a partition per input split, and that's usually good, to let one task process one block of data, which will all be on one machine. Reasons I could imagine why 9 partitions is faster than 7: Probably: Your cluster can execute at least 9 tasks concurrently. It will finish faster since each partition is smaller when split into 9 partitions. This just means you weren't using your cluster's full parallelism at 7. 9 partitions lets tasks execute entirely locally to the data, whereas 7 is too few compared to how the data blocks are distributed on HDFS. That is, maybe 7 is inducing a shuffle whereas 9 is not for some reason in your code. Your executors are running near their memory limit and are thrashing in GC. With less data to process each, you may avoid thrashing and so go a lot faster. (Or there's some other factor that messed up your measurements :)) There can be instances where more partitions is slower too. On Mon, Nov 3, 2014 at 9:57 AM, shahab shahab.mok...@gmail.com wrote: Hi, I just wonder how number of partitions effect the performance in Spark! Is it just the parallelism (more partitions, more parallel sub-tasks) that improves the performance? or there exist other considerations? In my case,I run couple of map/reduce jobs on same dataset two times with two different partition numbers, 7 and 9. I used a stand alone cluster, with two workers on each, where the master resides with the same machine as one of the workers. Surprisingly, the performance of map/reduce jobs in case of 9 partitions is almost 4X-5X better than that of 7 partitions !?? Does it mean that choosing right number of partitions is the key factor in the Spark performance ? best, /Shahab
unsubscribe
hi - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: unsubscribe
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org Thanks Best Regards On Mon, Nov 3, 2014 at 5:53 PM, Karthikeyan Arcot Kuppusamy karthikeyan...@zanec.com wrote: hi - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Bug in DISK related Storage level?
Hello, I am trying to load a very large graph to run a GraphX algorithm, and the graph does not fit in memory. I found that if I use the DISK_ONLY or MEMORY_AND_DISK_SER storage level, the program hits an OOM, but if I use MEMORY_ONLY_SER, it does not. So I want to know what difference between these levels could put the program at risk of OOM? Alcaid
Dynamically switching Nr of allocated core
Hi all, I can't seem to find a clear answer in the documentation. Does the standalone cluster support dynamic assignment of the number of allocated cores to an application once another app stops? I'm aware that we can have core sharing between active applications if we use Mesos, depending on the number of parallel tasks, but I believe my question is slightly simpler. For example: 1 - There are 12 cores available in the cluster 2 - I start app A with 2 cores - gets 2 3 - I start app B - gets the remaining 10 4 - If I stop app A, app B *does not* get the now available remaining 2 cores. Should I expect Mesos to have this scenario working? Also, the same question applies when we add more cores to a cluster. Let's say ideally I want 12 cores for my app, although there are only 10. As I add more workers, they should get assigned to my app dynamically. I haven't tested this in a while, but I think the app will not even start and will complain about not enough resources... Would very much appreciate any knowledge share on this! tnks, Rod -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Dynamically-switching-Nr-of-allocated-core-tp17955.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark cluster stability
Great! Thanks for the information. I will try it out. - Novice Big Data Programmer -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-cluster-stability-tp17929p17956.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark Kafka Performance
Hi Guys, Anyone could explain me how to work Kafka with Spark, I am using the JavaKafkaWordCount.java like a test and the line command is: ./run-example org.apache.spark.streaming.examples.JavaKafkaWordCount spark://192.168.0.13:7077 computer49:2181 test-consumer-group unibs.it 3 and like a producer I am using this command: rdkafka_cachesender -t unibs.nec -p 1 -b 192.168.0.46:9092 -f output.txt -l 100 -n 10 rdkafka_cachesender is a program that was developed by me which send to kafka the output.txt’s content where -l is the length of each send(upper bound) and -n is the lines to send in a row. Bellow is the throughput calculated by the program: File is 2235755 bytes throughput (b/s) = 699751388 throughput (b/s) = 723542382 throughput (b/s) = 662989745 throughput (b/s) = 505028200 throughput (b/s) = 471263416 throughput (b/s) = 446837266 throughput (b/s) = 409856716 throughput (b/s) = 373994467 throughput (b/s) = 366343097 throughput (b/s) = 373240017 throughput (b/s) = 386139016 throughput (b/s) = 373802209 throughput (b/s) = 369308515 throughput (b/s) = 366935820 throughput (b/s) = 365175388 throughput (b/s) = 362175419 throughput (b/s) = 358356633 throughput (b/s) = 357219124 throughput (b/s) = 352174125 throughput (b/s) = 348313093 throughput (b/s) = 355099099 throughput (b/s) = 348069777 throughput (b/s) = 348478302 throughput (b/s) = 340404276 throughput (b/s) = 339876031 throughput (b/s) = 339175102 throughput (b/s) = 327555252 throughput (b/s) = 324272374 throughput (b/s) = 322479222 throughput (b/s) = 319544906 throughput (b/s) = 317201853 throughput (b/s) = 317351399 throughput (b/s) = 315027978 throughput (b/s) = 313831014 throughput (b/s) = 310050384 throughput (b/s) = 307654601 throughput (b/s) = 305707061 throughput (b/s) = 307961102 throughput (b/s) = 296898200 throughput (b/s) = 296409904 throughput (b/s) = 294609332 throughput (b/s) = 293397843 throughput (b/s) = 293194876 throughput (b/s) = 291724886 throughput (b/s) = 290031314 throughput (b/s) = 289747022 throughput (b/s) = 289299632 The throughput goes down after some seconds and it does not maintain the performance like the initial values: throughput (b/s) = 699751388 throughput (b/s) = 723542382 throughput (b/s) = 662989745 Another question is about spark, after I have started the spark line command after 15 sec spark continue to repeat the words counted, but my program continue to send words to kafka, so I mean that the words counted in spark should grow up. I have attached the log from spark. My Case is: ComputerA(Kafka_cachsesender) - ComputerB(Kakfa-Brokers-Zookeeper) - ComputerC (Spark) If I don’t explain very well send a reply to me. Thanks Guys -- Informativa sulla Privacy: http://www.unibs.it/node/8155
Spark job resource allocation best practices
Hello, I have a Spark 1.1.0 standalone cluster, with several nodes, and several jobs (applications) being scheduled at the same time. By default, each Spark job takes up all available CPUs. This way, when more than one job is scheduled, all but the first are stuck in WAITING. On the other hand, if I tell each job to initially limit itself to a fixed number of CPUs, and that job runs by itself, the cluster is under-utilized and the job runs longer than it could have if it took all the available resources. - How to give the tasks a more fair resource division, which lets many jobs run together, and together lets them use all the available resources? - How do you divide resources between applications on your usecase? P.S. I started reading about Mesos but couldn't figure out if/how it could solve the described issue. Thanks! *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com
Re: Spark job resource allocation best practices
Have a look at scheduling pools https://spark.apache.org/docs/latest/job-scheduling.html. If you want more sophisticated resource allocation, then you are better of to use cluster managers like mesos or yarn Thanks Best Regards On Mon, Nov 3, 2014 at 9:10 PM, Romi Kuntsman r...@totango.com wrote: Hello, I have a Spark 1.1.0 standalone cluster, with several nodes, and several jobs (applications) being scheduled at the same time. By default, each Spark job takes up all available CPUs. This way, when more than one job is scheduled, all but the first are stuck in WAITING. On the other hand, if I tell each job to initially limit itself to a fixed number of CPUs, and that job runs by itself, the cluster is under-utilized and the job runs longer than it could have if it took all the available resources. - How to give the tasks a more fair resource division, which lets many jobs run together, and together lets them use all the available resources? - How do you divide resources between applications on your usecase? P.S. I started reading about Mesos but couldn't figure out if/how it could solve the described issue. Thanks! *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com
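For reference, a hedged sketch of the static per-application cap mentioned in the question, which at least lets several standalone apps run side by side; the numbers are illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("job-a")
      .set("spark.cores.max", "4")   // this application claims at most 4 cores of the cluster
    val sc = new SparkContext(conf)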
Re: Spark job resource allocation best practices
So, as said there, static partitioning is used in Spark’s standalone and YARN modes, as well as the coarse-grained Mesos mode. That leaves us only with Mesos, where there is *dynamic sharing* of CPU cores. It says when the application is not running tasks on a machine, other applications may run tasks on those cores. But my applications are short lived (seconds to minutes), and they read a large dataset, process it, and write the results. They are also IO-bound, meaning most of the time is spent reading input data (from S3) and writing the results back. Is it possible to divide the resources between them, according to how many are trying to run at the same time? So for example if I have 12 cores - if one job is scheduled, it will get 12 cores, but if 3 are scheduled, then each one will get 4 cores and then will all start. Thanks! *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Mon, Nov 3, 2014 at 5:46 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Have a look at scheduling pools https://spark.apache.org/docs/latest/job-scheduling.html. If you want more sophisticated resource allocation, then you are better of to use cluster managers like mesos or yarn Thanks Best Regards On Mon, Nov 3, 2014 at 9:10 PM, Romi Kuntsman r...@totango.com wrote: Hello, I have a Spark 1.1.0 standalone cluster, with several nodes, and several jobs (applications) being scheduled at the same time. By default, each Spark job takes up all available CPUs. This way, when more than one job is scheduled, all but the first are stuck in WAITING. On the other hand, if I tell each job to initially limit itself to a fixed number of CPUs, and that job runs by itself, the cluster is under-utilized and the job runs longer than it could have if it took all the available resources. - How to give the tasks a more fair resource division, which lets many jobs run together, and together lets them use all the available resources? - How do you divide resources between applications on your usecase? P.S. I started reading about Mesos but couldn't figure out if/how it could solve the described issue. Thanks! *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com
Cincinnati, OH Meetup for Apache Spark
Let me know if you are interested in participating in a meet up in Cincinnati, OH to discuss Apache Spark. We currently have 4-5 different companies expressing interest but would like a few more. Darin.
random shuffle streaming RDDs?
Hi, Is there a nice or optimal method to randomly shuffle spark streaming RDDs? Thanks, Josh
Re: Dynamically switching Nr of allocated core
I didn't notice your message and asked about the same question, in the thread with the title Spark job resource allocation best practices. Adding specific case to your example: 1 - There are 12 cores available in the cluster 2 - I start app B with all cores - gets 12 3 - I start app A - it needs just 2 cores (as you said it will get even when there are 12 available), but gets nothing 4 - Until I stop app B, app A is stuck waiting, instead of app B freeing 2 cores and dropping to 10 cores. *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Mon, Nov 3, 2014 at 3:17 PM, RodrigoB rodrigo.boav...@aspect.com wrote: Hi all, I can't seem to find a clear answer on the documentation. Does the standalone cluster support dynamic assigment of nr of allocated cores to an application once another app stops? I'm aware that we can have core sharding if we use Mesos between active applications depending on the nr of parallel tasks I believe my question is slightly simpler. For example: 1 - There are 12 cores available in the cluster 2 - I start app A with 2 cores - gets 2 3 - I start app B - gets remaining 10 4 - If I stop app A, app B *does not* get the now available remaining 2 cores. Should I expect Mesos to have this scenario working? Also, the same question applies to when we add more cores to a cluster. Let's say ideally I want 12 cores for my app, although there are only 10. As I add more workers, they should get assigned to my app dynamically. I haven't tested this in a while but I think the app will not even start and complain about not enough resources... Would very much appreciate any knowledge share on this! tnks, Rod -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Dynamically-switching-Nr-of-allocated-core-tp17955.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Key-Value decomposition
Hi, I'm a newbie in Spark and face the following use case : val data = Array( ("A", "1;2;3") ) val rdd = sc.parallelize(data) // Something here to produce an RDD of (key, value) pairs: // ("A", "1"), ("A", "2"), ("A", "3") Does anybody know how to do this? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Key-Value-decomposition-tp17966.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Key-Value decomposition
Very straightforward: You want to use cartesian. If you have two RDDs - RDD_1("A") and RDD_2(1,2,3) - RDD_1.cartesian(RDD_2) will generate the cross product between the two RDDs and you will have RDD_3(("A",1), ("A",2), ("A",3)) On 11/3/14, 11:38 AM, david david...@free.fr wrote: Hi, I'm a newbie in Spark and face the following use case : val data = Array( ("A", "1;2;3") ) val rdd = sc.parallelize(data) // Something here to produce an RDD of (key, value) pairs: // ("A", "1"), ("A", "2"), ("A", "3") Does anybody know how to do this? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Key-Value-decomposition-tp17966.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
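For the exact input in the question, where the whole value arrives as one ';'-separated string, another sketch is a flatMap that splits the value and emits one pair per part:

    val data = Array(("A", "1;2;3"))
    val rdd = sc.parallelize(data)
    val pairs = rdd.flatMap { case (key, values) =>
      values.split(";").map(v => (key, v))   // ("A","1"), ("A","2"), ("A","3")
    }
    pairs.collect().foreach(println)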
Re: random shuffle streaming RDDs?
I think the answer will be the same in streaming as in the core. You want a random permutation of an RDD? in general RDDs don't have ordering at all -- excepting when you sort for example -- so a permutation doesn't make sense. Do you just want a well-defined but random ordering of the data? Do you just want to (re-)assign elements randomly to partitions? On Mon, Nov 3, 2014 at 4:33 PM, Josh J joshjd...@gmail.com wrote: Hi, Is there a nice or optimal method to randomly shuffle spark streaming RDDs? Thanks, Josh - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: random shuffle streaming RDDs?
When I'm outputting the RDDs to an external source, I would like the RDDs to be outputted in a random shuffle so that even the order is random. So far what I understood is that the RDDs do have a type of order, in that the order for spark streaming RDDs would be the order in which spark streaming read the tuples from source (e.g. ordered by roughly when the producer sent the tuple in addition to any latency) On Mon, Nov 3, 2014 at 8:48 AM, Sean Owen so...@cloudera.com wrote: I think the answer will be the same in streaming as in the core. You want a random permutation of an RDD? in general RDDs don't have ordering at all -- excepting when you sort for example -- so a permutation doesn't make sense. Do you just want a well-defined but random ordering of the data? Do you just want to (re-)assign elements randomly to partitions? On Mon, Nov 3, 2014 at 4:33 PM, Josh J joshjd...@gmail.com wrote: Hi, Is there a nice or optimal method to randomly shuffle spark streaming RDDs? Thanks, Josh
Accumulables in transformation operations
Hi, I'm reading the O'Reilly book Learning Spark and I have a question: is the fault-tolerance guarantee for accumulators still limited to updates performed inside actions? Is the same behaviour expected if we use accumulables? Thanks in advance Jorge López-Malla Matute Big Data Developer Avenida de Europa, 26. Ática 5. 3ª Planta 28224 Pozuelo de Alarcón, Madrid Tel: 91 352 59 42 // @stratiobd
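A small hedged sketch of the distinction being asked about: accumulator updates made inside an action are applied exactly once per task, while updates made inside a transformation can be re-applied if a stage is recomputed (the same caveat applies to accumulables):

    val seen = sc.accumulator(0L)
    val data = sc.parallelize(1 to 100)

    data.foreach(_ => seen += 1L)                  // action: counted exactly once
    println(seen.value)                            // 100

    val mapped = data.map { x => seen += 1L; x }   // transformation: may over-count if the stage is recomputed
    mapped.count()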
Re: random shuffle streaming RDDs?
A use case would be helpful? Batches of RDDs from Streams are going to have temporal ordering in terms of when they are processed in a typical application ... , but maybe you could shuffle the way batch iterations work On Nov 3, 2014, at 11:59 AM, Josh J joshjd...@gmail.com wrote: When I'm outputting the RDDs to an external source, I would like the RDDs to be outputted in a random shuffle so that even the order is random. So far what I understood is that the RDDs do have a type of order, in that the order for spark streaming RDDs would be the order in which spark streaming read the tuples from source (e.g. ordered by roughly when the producer sent the tuple in addition to any latency) On Mon, Nov 3, 2014 at 8:48 AM, Sean Owen so...@cloudera.com wrote: I think the answer will be the same in streaming as in the core. You want a random permutation of an RDD? in general RDDs don't have ordering at all -- excepting when you sort for example -- so a permutation doesn't make sense. Do you just want a well-defined but random ordering of the data? Do you just want to (re-)assign elements randomly to partitions? On Mon, Nov 3, 2014 at 4:33 PM, Josh J joshjd...@gmail.com wrote: Hi, Is there a nice or optimal method to randomly shuffle spark streaming RDDs? Thanks, Josh
Re: akka connection refused bug, fix?
Any one has experience or advice to fix this problem? highly appreciated! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/akka-connection-refused-bug-fix-tp17764p17972.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Parquet files are only 6-20MB in size?
Before saveAsParquetFile(), you can call coalesce(N); then you will have N files, and it will keep the order as before (repartition() will not). On Mon, Nov 3, 2014 at 1:16 AM, ag007 agre...@mac.com wrote: Thanks Akhil, Am I right in saying that the repartition will spread the data randomly, so I lose chronological order? I really just want the CSV -> Parquet conversion in the same order it came in. If I set repartition to 1, will this not be random? cheers, Ag -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-files-are-only-6-20MB-in-size-tp17935p17941.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
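A minimal sketch of the coalesce approach (the original job is pySpark; the idea carries over): coalescing to one partition before applying the schema avoids a shuffle, so the single Parquet part file keeps the input order. As in the earlier sketch, toRow and schema are assumed:

    val orderedRows = sc.textFile("hdfs:///input/data.tsv").map(_.split("\t")).map(toRow)   // toRow is hypothetical
    val onePartition = orderedRows.coalesce(1)   // no shuffle, original order preserved
    sqlContext.applySchema(onePartition, schema).saveAsParquetFile("hdfs:///output/one-file.parquet")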
Re: Spark on Yarn probably trying to load all the data to RAM
On Sun, Nov 2, 2014 at 1:35 AM, jan.zi...@centrum.cz wrote: Hi, I am using Spark on Yarn, particularly Spark in Python. I am trying to run: myrdd = sc.textFile(s3n://mybucket/files/*/*/*.json) How many files do you have? and the average size of each file? myrdd.getNumPartitions() Unfortunately it seems that Spark tries to load everything to RAM, or at least after while of running this everything slows down and then I am getting errors with log below. Everything works fine for datasets smaller than RAM, but I would expect Spark doing this without storing everything to RAM. So I would like to ask if I'm not missing some settings in Spark on Yarn? Thank you in advance for any help. 14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-375] shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: GC overhead limit exceeded 14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-381] shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: GC overhead limit exceeded 11744,575: [Full GC 1194515K-1192839K(1365504K), 2,2367150 secs] 11746,814: [Full GC 1194507K-1193186K(1365504K), 2,1788150 secs] 11748,995: [Full GC 1194507K-1193278K(1365504K), 1,3511480 secs] 11750,347: [Full GC 1194507K-1193263K(1365504K), 2,2735350 secs] 11752,622: [Full GC 1194506K-1193192K(1365504K), 1,2700110 secs] Traceback (most recent call last): File stdin, line 1, in module File /home/hadoop/spark/python/pyspark/rdd.py, line 391, in getNumPartitions return self._jrdd.partitions().size() File /home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 538, in __call__ File /home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError14/11/01 22:07:07 INFO scheduler.DAGScheduler: Failed to run saveAsTextFile at NativeMethodAccessorImpl.java:-2 : An error occurred while calling o112.partitions. : java.lang.OutOfMemoryError: GC overhead limit exceeded 11753,896: [Full GC 1194506K-947839K(1365504K), 2,1483780 secs] 14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon. 14/11/01 22:07:09 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-381] shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: GC overhead limit exceeded 14/11/01 22:07:09 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-309] shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: GC overhead limit exceeded 14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports. 14/11/01 22:07:09 INFO Remoting: Remoting shut down 14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down. 14/11/01 22:07:09 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871) 14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871) 14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871) 14/11/01 22:07:09 INFO network.ConnectionManager: Key not valid ? 
sun.nio.ch.SelectionKeyImpl@5ca1c790 14/11/01 22:07:09 INFO network.ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@5ca1c790 java.nio.channels.CancelledKeyException at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386) at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139) 14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768) 14/11/01 22:07:09 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768) 14/11/01 22:07:09 ERROR network.ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768) not found 14/11/01 22:07:10 ERROR cluster.YarnClientSchedulerBackend: Yarn application already ended: FINISHED 14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/metrics/json,null} 14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/kill,null} 14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/,null} 14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/static,null}
Re: random shuffle streaming RDDs?
If you iterated over an RDD's partitions, I'm not sure that in practice you would find the order matches the order they were received. The receiver is replicating data to another node or node as it goes and I don't know much is guaranteed about that. If you want to permute an RDD, how about a sortBy() on a good hash function of each value plus some salt? (Haven't thought this through much but sounds about right.) On Mon, Nov 3, 2014 at 4:59 PM, Josh J joshjd...@gmail.com wrote: When I'm outputting the RDDs to an external source, I would like the RDDs to be outputted in a random shuffle so that even the order is random. So far what I understood is that the RDDs do have a type of order, in that the order for spark streaming RDDs would be the order in which spark streaming read the tuples from source (e.g. ordered by roughly when the producer sent the tuple in addition to any latency) On Mon, Nov 3, 2014 at 8:48 AM, Sean Owen so...@cloudera.com wrote: I think the answer will be the same in streaming as in the core. You want a random permutation of an RDD? in general RDDs don't have ordering at all -- excepting when you sort for example -- so a permutation doesn't make sense. Do you just want a well-defined but random ordering of the data? Do you just want to (re-)assign elements randomly to partitions? On Mon, Nov 3, 2014 at 4:33 PM, Josh J joshjd...@gmail.com wrote: Hi, Is there a nice or optimal method to randomly shuffle spark streaming RDDs? Thanks, Josh - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
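A hedged sketch of the salted-hash sortBy idea above; in Spark Streaming the same call would run per batch inside dstream.transform. The sample RDD is a stand-in:

    import scala.util.hashing.MurmurHash3

    val rdd = sc.parallelize(Seq("a", "b", "c", "d", "e"))
    val salt = scala.util.Random.nextInt()
    // sorting on a salted hash imposes a well-defined but pseudo-random order
    val shuffled = rdd.sortBy(x => MurmurHash3.stringHash(x.toString, salt))
    shuffled.collect().foreach(println)
    // streaming variant: dstream.transform(rdd => rdd.sortBy(x => MurmurHash3.stringHash(x.toString, salt)))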
NoClassDefFoundError encountered in Spark 1.2-snapshot build with hive-0.13.1 profile
I just built the 1.2 snapshot current as of commit 76386e1a23c using: $ ./make-distribution.sh —tgz —name my-spark —skip-java-test -DskipTests -Phadoop-2.4 -Phive -Phive-0.13.1 -Pyarn I drop in my Hive configuration files into the conf directory, launch spark-shell, and then create my HiveContext, hc. I then issue a “use db” command: scala hc.hql(“use db”) and receive the following class-not-found error: java.lang.NoClassDefFoundError: com/esotericsoftware/shaded/org/objenesis/strategy/InstantiatorStrategy at org.apache.hadoop.hive.ql.exec.Utilities.clinit(Utilities.java:925) at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1224) at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1088) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:911) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:901) at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:315) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:286) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35) at org.apache.spark.sql.execution.Command$class.execute(commands.scala:46) at org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:30) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:424) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:424) at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:103) at org.apache.spark.sql.hive.HiveContext.hiveql(HiveContext.scala:111) at org.apache.spark.sql.hive.HiveContext.hql(HiveContext.scala:115) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:31) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:36) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:38) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:40) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:42) at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:44) at $iwC$$iwC$$iwC$$iwC.init(console:46) at $iwC$$iwC$$iwC.init(console:48) at $iwC$$iwC.init(console:50) at $iwC.init(console:52) at init(console:54) at .init(console:58) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorIva:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125 at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:8 at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILola:968) at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scal at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scal at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoadla:135) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorIva:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:353) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at
MLlib - Naive Bayes Java example bug
Hi, I noticed a bug in the sample Java code on the MLlib - Naive Bayes docs page: http://spark.apache.org/docs/1.1.0/mllib-naive-bayes.html In the filter: double accuracy = 1.0 * predictionAndLabel.filter(new Function<Tuple2<Double, Double>, Boolean>() { @Override public Boolean call(Tuple2<Double, Double> pl) { return pl._1() == pl._2(); } }).count() / test.count(); it compares the Double objects by reference, whereas it should compare their values: double accuracy = 1.0 * predictionAndLabel.filter(new Function<Tuple2<Double, Double>, Boolean>() { @Override public Boolean call(Tuple2<Double, Double> pl) { return pl._1().doubleValue() == pl._2().doubleValue(); } }).count() / test.count(); The Java version's accuracy is always 0.0. The Scala code outputs the correct value, 1.0. Thanks,
ParquetFilters and StringType support for GT, GTE, LT, LTE
Is there any reason why StringType is not a supported type the GT, GTE, LT, LTE operations? I was able to previously have a predicate where my column type was a string and execute a filter with one of the above operators in SparkSQL w/o any problems. However, I synced up to the latest code this morning and now the same query will give me a MatchError for this column of string type. Thanks, -Terry
Re: NoClassDefFoundError encountered in Spark 1.2-snapshot build with hive-0.13.1 profile
Hi Terry I think the issue you mentioned will be resolved by following PR. https://github.com/apache/spark/pull/3072 - Kousuke (2014/11/03 10:42), Terry Siu wrote: I just built the 1.2 snapshot current as of commit 76386e1a23c using: $ ./make-distribution.sh —tgz —name my-spark —skip-java-test -DskipTests -Phadoop-2.4 -Phive -Phive-0.13.1 -Pyarn I drop in my Hive configuration files into the conf directory, launch spark-shell, and then create my HiveContext, hc. I then issue a “use db” command: scala hc.hql(“use db”) and receive the following class-not-found error: java.lang.NoClassDefFoundError: com/esotericsoftware/shaded/org/objenesis/strategy/InstantiatorStrategy at org.apache.hadoop.hive.ql.exec.Utilities.clinit(Utilities.java:925) at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1224) at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1088) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:911) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:901) at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:315) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:286) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35) at org.apache.spark.sql.execution.Command$class.execute(commands.scala:46) at org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:30) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:424) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:424) at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:103) at org.apache.spark.sql.hive.HiveContext.hiveql(HiveContext.scala:111) at org.apache.spark.sql.hive.HiveContext.hql(HiveContext.scala:115) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:31) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:36) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:38) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:40) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:42) at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:44) at $iwC$$iwC$$iwC$$iwC.init(console:46) at $iwC$$iwC$$iwC.init(console:48) at $iwC$$iwC.init(console:50) at $iwC.init(console:52) at init(console:54) at .init(console:58) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorIva:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125 at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:8 at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636) at 
org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILola:968) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scal at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scal at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoadla:135) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorIva:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:353) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at
Re: MLlib - Naive Bayes Java example bug
Yes, good catch. I also think the 1.0 * is suboptimal as a cast to double. I searched for similar issues and didn't see any. Open a PR -- I'm not even sure this is enough to warrant a JIRA, but feel free to open one as well. On Mon, Nov 3, 2014 at 6:46 PM, Dariusz Kobylarz darek.kobyl...@gmail.com wrote: Hi, I noticed a bug in the sample Java code on the MLlib - Naive Bayes docs page: http://spark.apache.org/docs/1.1.0/mllib-naive-bayes.html In the filter: double accuracy = 1.0 * predictionAndLabel.filter(new Function<Tuple2<Double, Double>, Boolean>() { @Override public Boolean call(Tuple2<Double, Double> pl) { return pl._1() == pl._2(); } }).count() / test.count(); it compares the Double objects by reference, whereas it should compare their values: double accuracy = 1.0 * predictionAndLabel.filter(new Function<Tuple2<Double, Double>, Boolean>() { @Override public Boolean call(Tuple2<Double, Double> pl) { return pl._1().doubleValue() == pl._2().doubleValue(); } }).count() / test.count(); With the reference comparison, the Java version's accuracy is always 0.0, while the Scala code outputs the correct value 1.0. Thanks, - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark on Yarn probably trying to load all the data to RAM
I have 3 datasets in all the datasets the average file size is 10-12Kb. I am able to run my code on the dataset with 70K files, but I am not able to run it on datasets with 1.1M and 3.8M files. __ On Sun, Nov 2, 2014 at 1:35 AM, jan.zi...@centrum.cz wrote: Hi, I am using Spark on Yarn, particularly Spark in Python. I am trying to run: myrdd = sc.textFile(s3n://mybucket/files/*/*/*.json) How many files do you have? and the average size of each file? myrdd.getNumPartitions() Unfortunately it seems that Spark tries to load everything to RAM, or at least after while of running this everything slows down and then I am getting errors with log below. Everything works fine for datasets smaller than RAM, but I would expect Spark doing this without storing everything to RAM. So I would like to ask if I'm not missing some settings in Spark on Yarn? Thank you in advance for any help. 14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-375] shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: GC overhead limit exceeded 14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-381] shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: GC overhead limit exceeded 11744,575: [Full GC 1194515K-1192839K(1365504K), 2,2367150 secs] 11746,814: [Full GC 1194507K-1193186K(1365504K), 2,1788150 secs] 11748,995: [Full GC 1194507K-1193278K(1365504K), 1,3511480 secs] 11750,347: [Full GC 1194507K-1193263K(1365504K), 2,2735350 secs] 11752,622: [Full GC 1194506K-1193192K(1365504K), 1,2700110 secs] Traceback (most recent call last): File stdin, line 1, in module File /home/hadoop/spark/python/pyspark/rdd.py, line 391, in getNumPartitions return self._jrdd.partitions().size() File /home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 538, in __call__ File /home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError14/11/01 22:07:07 INFO scheduler.DAGScheduler: Failed to run saveAsTextFile at NativeMethodAccessorImpl.java:-2 : An error occurred while calling o112.partitions. : java.lang.OutOfMemoryError: GC overhead limit exceeded 11753,896: [Full GC 1194506K-947839K(1365504K), 2,1483780 secs] 14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon. 14/11/01 22:07:09 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-381] shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: GC overhead limit exceeded 14/11/01 22:07:09 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-309] shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: GC overhead limit exceeded 14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports. 14/11/01 22:07:09 INFO Remoting: Remoting shut down 14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down. 
14/11/01 22:07:09 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871) 14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871) 14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871) 14/11/01 22:07:09 INFO network.ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@5ca1c790 14/11/01 22:07:09 INFO network.ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@5ca1c790 java.nio.channels.CancelledKeyException at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386) at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139) 14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768) 14/11/01 22:07:09 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768) 14/11/01 22:07:09 ERROR network.ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768) not found 14/11/01 22:07:10 ERROR cluster.YarnClientSchedulerBackend: Yarn application already ended: FINISHED 14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/metrics/json,null} 14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/kill,null} 14/11/01 22:07:10
Re: NoClassDefFoundError encountered in Spark 1.2-snapshot build with hive-0.13.1 profile
Thanks, Kousuke. I’ll wait till this pull request makes it into the master branch. -Terry From: Kousuke Saruta saru...@oss.nttdata.co.jpmailto:saru...@oss.nttdata.co.jp Date: Monday, November 3, 2014 at 11:11 AM To: Terry Siu terry@smartfocus.commailto:terry@smartfocus.com, user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: NoClassDefFoundError encountered in Spark 1.2-snapshot build with hive-0.13.1 profile Hi Terry I think the issue you mentioned will be resolved by following PR. https://github.com/apache/spark/pull/3072 - Kousuke (2014/11/03 10:42), Terry Siu wrote: I just built the 1.2 snapshot current as of commit 76386e1a23c using: $ ./make-distribution.sh —tgz —name my-spark —skip-java-test -DskipTests -Phadoop-2.4 -Phive -Phive-0.13.1 -Pyarn I drop in my Hive configuration files into the conf directory, launch spark-shell, and then create my HiveContext, hc. I then issue a “use db” command: scala hc.hql(“use db”) and receive the following class-not-found error: java.lang.NoClassDefFoundError: com/esotericsoftware/shaded/org/objenesis/strategy/InstantiatorStrategy at org.apache.hadoop.hive.ql.exec.Utilities.clinit(Utilities.java:925) at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1224) at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1088) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:911) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:901) at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:315) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:286) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35) at org.apache.spark.sql.execution.Command$class.execute(commands.scala:46) at org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:30) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:424) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:424) at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:103) at org.apache.spark.sql.hive.HiveContext.hiveql(HiveContext.scala:111) at org.apache.spark.sql.hive.HiveContext.hql(HiveContext.scala:115) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:31) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:36) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:38) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:40) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:42) at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:44) at $iwC$$iwC$$iwC$$iwC.init(console:46) at $iwC$$iwC$$iwC.init(console:48) at $iwC$$iwC.init(console:50) at $iwC.init(console:52) at init(console:54) at .init(console:58) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorIva:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125 at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674) at 
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:8 at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILola:968) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scal at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scal at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoadla:135) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916) at
Re: Cannot instantiate hive context
Thanks Akhil. I realized that earlier, and i thought mvn -Phive should have captured and included all these dependencies. In any case, i proceeded with that, included other such dependencies that were missing, and finally hit the guava version mismatch issue. (Spark with Guava 14 vs Hadoop/Hive with Guava 11). There are 2 parts: 1. Spark includes Guava library within its jars and that may conflict with Hadoop/Hive components depending on older version of the library. It seems this has been solved with SPARK-2848 https://issues.apache.org/jira/browse/SPARK-2848 patch to shade the Guava libraries. 2. Spark actually uses interfaces from newer version of Guava library, that needs to be rewritten to use older version (i.e. downgrade Spark dependency on Guava). I wasn't able to find the related patches (I need them since i am on Spark 1.0.1). Applying patch for #1 above, i still hit the following error: 14/11/03 15:01:32 WARN storage.BlockManager: Putting block broadcast_0 failed java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode; at org.apache.spark.util.collection.OpenHashSet.org $apache$spark$util$collection$OpenHashSet$$hashcode(OpenHashSet.scala:261) at org.apache.spark.util.collection.OpenHashSet$mcI$sp.getPos$mcI$sp(OpenHashSet.scala:165) at org.apache.spark.util.collection.OpenHashSet$mcI$sp.contains$mcI$sp(OpenHashSet.scala:102) stack continues I haven't been able to find the other patches that actually downgrade the dependency. Please point me to those patches, or any other ideas about fixing these dependency issues. Thanks. On Sun, Nov 2, 2014 at 8:41 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Adding the libthrift jar http://mvnrepository.com/artifact/org.apache.thrift/libthrift/0.9.0 in the class path would resolve this issue. Thanks Best Regards On Sat, Nov 1, 2014 at 12:34 AM, Pala M Muthaia mchett...@rocketfuelinc.com wrote: Hi, I am trying to load hive datasets using HiveContext, in spark shell. Spark ver 1.0.1 and Hive ver 0.12. We are trying to get Spark work with hive datasets. I already have existing Spark deployment. Following is what i did on top of that: 1. Build spark using 'mvn -Pyarn,hive -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package' 2. Copy over spark-assembly-1.0.1-hadoop2.4.0.jar into spark deployment directory. 3. Launch spark-shell with the spark hive jar included in the list. When i execute *'* *val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)* i get the following error stack: java.lang.NoClassDefFoundError: org/apache/thrift/TBase at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:792) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: org.apache.thrift.TBase at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 
55 more I thought that building with -Phive option should include all the necessary hive packages into the assembly jar (according to here https://spark.apache.org/docs/1.0.1/sql-programming-guide.html#hive-tables). I tried searching online and in this mailing list archive but haven't found any instructions on how to get this working. I know that there is additional step of updating the assembly jar across the whole cluster, not just client side, but right now, even the client is not working. Would appreciate instructions (or link to them) on how to get this working end-to-end. Thanks, pala
Re: random shuffle streaming RDDs?
If you want to permute an RDD, how about a sortBy() on a good hash function of each value plus some salt? (Haven't thought this through much but sounds about right.) This sounds promising. Where can I read more about the space (memory and network overhead) and time complexity of sortBy? On Mon, Nov 3, 2014 at 10:38 AM, Sean Owen so...@cloudera.com wrote: If you iterated over an RDD's partitions, I'm not sure that in practice you would find the order matches the order they were received. The receiver is replicating data to another node or node as it goes and I don't know much is guaranteed about that. If you want to permute an RDD, how about a sortBy() on a good hash function of each value plus some salt? (Haven't thought this through much but sounds about right.) On Mon, Nov 3, 2014 at 4:59 PM, Josh J joshjd...@gmail.com wrote: When I'm outputting the RDDs to an external source, I would like the RDDs to be outputted in a random shuffle so that even the order is random. So far what I understood is that the RDDs do have a type of order, in that the order for spark streaming RDDs would be the order in which spark streaming read the tuples from source (e.g. ordered by roughly when the producer sent the tuple in addition to any latency) On Mon, Nov 3, 2014 at 8:48 AM, Sean Owen so...@cloudera.com wrote: I think the answer will be the same in streaming as in the core. You want a random permutation of an RDD? in general RDDs don't have ordering at all -- excepting when you sort for example -- so a permutation doesn't make sense. Do you just want a well-defined but random ordering of the data? Do you just want to (re-)assign elements randomly to partitions? On Mon, Nov 3, 2014 at 4:33 PM, Josh J joshjd...@gmail.com wrote: Hi, Is there a nice or optimal method to randomly shuffle spark streaming RDDs? Thanks, Josh
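A minimal sketch of that idea, with illustrative names only (rdd stands for any RDD[T]; the salt and hash choice are not from the thread). sortBy does one full shuffle plus a range-partitioned sort, so the cost and memory behaviour are roughly those of sorting the RDD once:

    import scala.util.hashing.MurmurHash3

    val salt = scala.util.Random.nextInt()
    // order elements by a salted hash of each value so the resulting order is pseudo-random
    val permuted = rdd.sortBy(x => MurmurHash3.productHash((x, salt)))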
Model characterization
Hi All, I have been using the LinearRegression model of MLlib and am very pleased with its scalability and robustness. Right now, we are just calculating the MSE of our model, but we would like to characterize its performance more fully. Is support for computing things such as confidence intervals on the roadmap? Will graphical things such as ROC curves be supported by MLlib or other parts of the ecosystem, or is this something for which other statistical packages are recommended?
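In the meantime, a rough characterization can be assembled by hand with plain RDD statistics; a minimal sketch, assuming model is the trained LinearRegressionModel and test is a held-out RDD[LabeledPoint] (the 1.96 factor below is only the usual normal-approximation interval for the mean residual, not an MLlib feature):

    val residuals = test.map(p => p.label - model.predict(p.features)).cache()
    val mse = residuals.map(r => r * r).mean()
    val n = residuals.count()
    // ~95% normal-approximation confidence interval for the mean residual (bias)
    val halfWidth = 1.96 * residuals.stdev() / math.sqrt(n)
    println(s"MSE = $mse, mean residual = ${residuals.mean()} +/- $halfWidth")

For binary classifiers (as opposed to regression), org.apache.spark.mllib.evaluation.BinaryClassificationMetrics already exposes ROC and precision-recall curves.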
Any Replicated RDD in Spark?
Hi All, I have spent the last two years on Hadoop but am new to Spark. I am planning to move one of my existing systems to Spark to get some enhanced features. My question is: if I want to do a map-side join (something similar to the Replicated keyword in Pig), how can I do it? Is there any way to declare an RDD as replicated (meaning it is distributed to all nodes and each node holds a full copy)? I know I can use an accumulator to get this behaviour, but I am not sure it is the best practice. And if I use an accumulator to broadcast the data set, can I then (after the broadcast) convert it into an RDD and do the join? Regards, Shuai
Re: CANNOT FIND ADDRESS
no luck :(! Still observing the same behavior! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/CANNOT-FIND-ADDRESS-tp17637p17988.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Memory limitation on EMR Node?
Hi, I am planning to run Spark on EMR, and my application might need a lot of memory. On EMR, I know there is a hard limit of 16G physical memory on an individual mapper/reducer (otherwise I get an exception; this is confirmed by the AWS EMR team, at least as the spec at this moment). If I use YARN on EMR and submit the Spark job to YARN, I assume YARN takes responsibility for resource allocation, so will the physical memory limit still be 16G? Is that a reasonable guess, or does anyone have experience using more than 16G of memory per executor on EMR? I also notice that some examples in the docs allocate more than 16G of memory, so if I run a Spark cluster by itself, can I use more memory? Regards, Shuai
with SparkStreeaming spark-submit, don't see output after ssc.start()
I have a Spark Streaming program that works fine if I execute it via sbt runMain com.cray.examples.spark.streaming.cyber.StatefulDhcpServerHisto -f /Users/spr/Documents/.../tmp/ -t 10 but if I start it via $S/bin/spark-submit --master local[12] --class StatefulNewDhcpServers target/scala-2.10/newd*jar -f /Users/spr/Documents/.../tmp/ -t 10 (where $S points to the base of the Spark installation), it prints the output of print statements before the ssc.start() but nothing after that. I might well have screwed up something, but I'm getting no output anywhere AFAICT. I have set spark.eventLog.enabled to True in my spark-defaults.conf file. The Spark History Server at localhost:18080 says no completed applications found. There must be some log output somewhere. Any ideas? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/with-SparkStreeaming-spark-submit-don-t-see-output-after-ssc-start-tp17989.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
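Two things worth checking first, as assumptions about common causes rather than a diagnosis of this particular job: some output operation (print(), saveAsTextFiles(), ...) has to be registered on a DStream before ssc.start(), and the driver has to be kept alive afterwards. A minimal sketch of the usual skeleton, where dstream stands for whatever stream the job builds:

    dstream.print()          // an output operation must be registered before start()
    ssc.start()
    ssc.awaitTermination()   // block main(); without this the driver can exit before any batch runs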
Re: Any Replicated RDD in Spark?
You need to use broadcast followed by flatMap or mapPartitions to do map-side joins (in your map function, you can look at the hash table you broadcast and see what records match it). Spark SQL also does it by default for tables smaller than the spark.sql.autoBroadcastJoinThreshold setting (by default 10 KB, which is really small, but you can bump this up with set spark.sql.autoBroadcastJoinThreshold=100 for example). Matei On Nov 3, 2014, at 1:03 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I have spent last two years on hadoop but new to spark. I am planning to move one of my existing system to spark to get some enhanced features. My question is: If I try to do a map side join (something similar to Replicated key word in Pig), how can I do it? Is it anyway to declare a RDD as replicated (means distribute it to all nodes and each node will have a full copy)? I know I can use accumulator to get this feature, but I am not sure what is the best practice. And if I accumulator to broadcast the data set, can then (after broadcast) convert it into a RDD and do the join? Regards, Shuai - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
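A minimal sketch of that broadcast pattern, with illustrative names only (small is a small RDD[(K, W)], large is a big RDD[(K, V)]):

    val smallMap = small.collectAsMap()      // pull the small side to the driver
    val smallBc = sc.broadcast(smallMap)     // one read-only copy per executor
    val joined = large.mapPartitions { iter =>
      val lookup = smallBc.value
      iter.flatMap { case (k, v) => lookup.get(k).map(w => (k, (v, w))) }
    }

Unlike join(), this never shuffles the large RDD; each partition just probes the broadcast hash map.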
Re: --executor-cores cannot change vcores in yarn?
Hi, Well, I couldn't find the original documentation, but according to http://qnalist.com/questions/2791828/about-the-cpu-cores-and-cpu-usage , vcores do not refer to physical CPU cores but to virtual cores. I also used the top command to monitor CPU utilization during the Spark task: Spark can use all of the CPUs even when I leave --executor-cores at its default (1). Hope that helps. Cheers Gen Gen wrote: Hi, Maybe it is a stupid question, but I am running Spark on YARN. I request the resources with the following command: {code} ./spark-submit --master yarn-client --num-executors #number of workers --executor-cores #number of cores ... {code} However, after launching the task, I use yarn node -status ID to monitor the cluster. It shows that the number of vcores used by each container is always 1, no matter what number I pass via --executor-cores. Any ideas how to solve this problem? Thanks a lot in advance for your help. Cheers Gen -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/executor-cores-cannot-change-vcores-in-yarn-tp17883p17992.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: ParquetFilters and StringType support for GT, GTE, LT, LTE
That sounds like a regression. Could you open a JIRA with steps to reproduce (https://issues.apache.org/jira/browse/SPARK)? We'll want to fix this before the 1.2 release. On Mon, Nov 3, 2014 at 11:04 AM, Terry Siu terry@smartfocus.com wrote: Is there any reason why StringType is not a supported type for the GT, GTE, LT, LTE operations? I was previously able to have a predicate where my column type was a string and execute a filter with one of the above operators in SparkSQL without any problems. However, I synced up to the latest code this morning and now the same query gives me a MatchError for this column of string type. Thanks, -Terry
Re: SQL COUNT DISTINCT
On Mon, Nov 3, 2014 at 12:45 AM, Bojan Kostic blood9ra...@gmail.com wrote: But will this improvement also affect when you want to count distinct on 2 or more fields: SELECT COUNT(f1), COUNT(DISTINCT f2), COUNT(DISTINCT f3), COUNT(DISTINCT f4) FROM parquetFile Unfortunately I think this case may be harder for us to optimize, though could be possible with some work. Should i still create Jira issue/improvement for this? Yes please.
OOM - Requested array size exceeds VM limit
I am running local (client). My vm is 16 cpu/108gb ram. My configuration is as following: spark.executor.extraJavaOptions -XX:+PrintGCDetails -XX:+UseCompressedOops -XX:+UseParallelGC -XX:+UseParallelOldGC -XX:+DisableExplicitGC -XX:MaxPermSize=1024m spark.daemon.memory=20g spark.driver.memory=20g spark.executor.memory=20g export SPARK_DAEMON_JAVA_OPTS=-XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseCompressedOops -XX:+UseParallelGC -XX:+UseParallelOldGC -XX:+DisableExplicitGC -XX:MaxPermSize=1024m /usr/local/spark-1.1.0/bin/spark-submit --class main.java.MyAppMainProcess --master local[32] MyApp.jar myapp.out /11/03 20:45:43 INFO BlockManager: Removing block broadcast_4 14/11/03 20:45:43 INFO MemoryStore: Block broadcast_4 of size 3872 dropped from memory (free 16669590422) 14/11/03 20:45:43 INFO ContextCleaner: Cleaned broadcast 4 14/11/03 20:46:00 WARN BlockManager: Putting block rdd_19_5 failed 14/11/03 20:46:00 ERROR Executor: Exception in task 5.0 in stage 3.0 (TID 70) java.lang.OutOfMemoryError: Requested array size exceeds VM limit at java.util.Arrays.copyOf(Arrays.java:2271) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1047) at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1056) at org.apache.spark.storage.TachyonStore.putIterator(TachyonStore.scala:60) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:743) at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:594) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745 It is hard to see from this output what stage it fails, but the output is saving textFile. Individual record (key, value or key and value is relatively small, but number of records in the collection is large.) There seems to be a bottleneck that I have run into that I can't seem to get pass. 
Any pointers in the right direction will be helpful! Thanks, Ami -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/OOM-Requested-array-size-exceeds-VM-limit-tp17996.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
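One guess worth checking, given that the failure happens while serializing a cached block: if a single partition serializes to more than the JVM's maximum array size (roughly 2 GB), the byte array backing the serialized block cannot be allocated. Raising the partition count keeps each serialized block smaller. A hedged sketch, where bigRdd and the count 512 are only placeholders to tune:

    val smallerBlocks = bigRdd.repartition(512)
    smallerBlocks.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)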
Re: NoClassDefFoundError encountered in Spark 1.2-snapshot build with hive-0.13.1 profile
It is merged! On Mon, Nov 3, 2014 at 12:06 PM, Terry Siu terry@smartfocus.com wrote: Thanks, Kousuke. I’ll wait till this pull request makes it into the master branch. -Terry From: Kousuke Saruta saru...@oss.nttdata.co.jp Date: Monday, November 3, 2014 at 11:11 AM To: Terry Siu terry@smartfocus.com, user@spark.apache.org user@spark.apache.org Subject: Re: NoClassDefFoundError encountered in Spark 1.2-snapshot build with hive-0.13.1 profile Hi Terry I think the issue you mentioned will be resolved by following PR. https://github.com/apache/spark/pull/3072 - Kousuke (2014/11/03 10:42), Terry Siu wrote: I just built the 1.2 snapshot current as of commit 76386e1a23c using: $ ./make-distribution.sh —tgz —name my-spark —skip-java-test -DskipTests -Phadoop-2.4 -Phive -Phive-0.13.1 -Pyarn I drop in my Hive configuration files into the conf directory, launch spark-shell, and then create my HiveContext, hc. I then issue a “use db” command: scala hc.hql(“use db”) and receive the following class-not-found error: java.lang.NoClassDefFoundError: com/esotericsoftware/shaded/org/objenesis/strategy/InstantiatorStrategy at org.apache.hadoop.hive.ql.exec.Utilities.clinit(Utilities.java:925) at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1224) at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1088) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:911) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:901) at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:315) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:286) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35) at org.apache.spark.sql.execution.Command$class.execute(commands.scala:46) at org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:30) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:424) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:424) at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:103) at org.apache.spark.sql.hive.HiveContext.hiveql(HiveContext.scala:111) at org.apache.spark.sql.hive.HiveContext.hql(HiveContext.scala:115) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:31) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:36) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:38) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:40) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:42) at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:44) at $iwC$$iwC$$iwC$$iwC.init(console:46) at $iwC$$iwC$$iwC.init(console:48) at $iwC$$iwC.init(console:50) at $iwC.init(console:52) at init(console:54) at .init(console:58) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorIva:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125 at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705) at 
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:8 at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILola:968) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scal at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scal at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoadla:135) at
Spark streaming job failed due to java.util.concurrent.TimeoutException
Hi all, I have a Spark Streaming job that consumes data from Kafka and performs some simple operations on the data. This job runs in an EMR cluster with 10 nodes. The batch size I use is 1 minute, and it takes around 10 seconds to generate the results that are inserted into a MySQL database. However, after more than 2 days, the job failed with the following error repeated in the log: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] Does anyone know the reason? Thanks! Bill
is spark a good fit for sequential machine learning algorithms?
I'm struggling with implementing a few algorithms with Spark and hope to get help from the community. Most machine learning algorithms today are sequential, while Spark is all about parallelism. It seems to me that using Spark doesn't actually help much, because in most cases you can't really parallelize a sequential algorithm. There must be some strong reasons why MLlib was created and why so many people claim Spark is ideal for machine learning. What are those reasons? What are some specific examples of how to use Spark to implement sequential machine learning algorithms? Any comment/feedback/answer is much appreciated. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/is-spark-a-good-fit-for-sequential-machine-learning-algorithms-tp18000.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: ParquetFilters and StringType support for GT, GTE, LT, LTE
Done. https://issues.apache.org/jira/browse/SPARK-4213 Thanks, -Terry From: Michael Armbrust mich...@databricks.com Date: Monday, November 3, 2014 at 1:37 PM To: Terry Siu terry@smartfocus.com Cc: user@spark.apache.org Subject: Re: ParquetFilters and StringType support for GT, GTE, LT, LTE That sounds like a regression. Could you open a JIRA with steps to reproduce (https://issues.apache.org/jira/browse/SPARK)? We'll want to fix this before the 1.2 release. On Mon, Nov 3, 2014 at 11:04 AM, Terry Siu terry@smartfocus.com wrote: Is there any reason why StringType is not a supported type for the GT, GTE, LT, LTE operations? I was previously able to have a predicate where my column type was a string and execute a filter with one of the above operators in SparkSQL without any problems. However, I synced up to the latest code this morning and now the same query gives me a MatchError for this column of string type. Thanks, -Terry
Re: Parquet files are only 6-20MB in size?
David, that's exactly what I was after :) Awesome, thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-files-are-only-6-20MB-in-size-tp17935p18002.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Deleting temp dir Exception
Hi, I've written a short Scala app to perform word counts on a text file and am getting the following exception as the program completes (after it prints out all of the word counts). Exception in thread "delete Spark temp dir C:\Users\Josh\AppData\Local\Temp\spark-0fdd0b79-7329-4690-a093-0fdb0d21e32c" java.io.IOException: Failed to delete: C:\Users\Josh\AppData\Local\Temp\spark-0fdd0b79-7329-4690-a093-0fdb0d21e32c\word-count_2.10-1.0.jar at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:692) at org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:686) at org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:685) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:685) at org.apache.spark.util.Utils$$anon$4.run(Utils.scala:281) A few details: Spark version 1.1.0 built from source on Windows 8. Scala version 2.11.4, JRE version 7 Update 71. Everything is local on a single machine. I can run the code in the Scala shell just fine; this appears to be an issue with not being able to delete the temporary JAR file after running the application via spark-submit. I've checked the location of the JAR file and it is indeed left behind, so it is not being deleted. I guess my question is: why can't it be deleted, and is there anything I can do differently to fix this? I do not see this exception when I run the SimpleApp example taken from here: http://spark.apache.org/docs/latest/quick-start.html#standalone-applications Here is the code for my WordCount app that is producing this exception: import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object WordCount { def main(args: Array[String]) { val filename = "data/rideshare.txt" val conf = new SparkConf().setAppName("Word Count").setMaster("local") val sc = new SparkContext(conf) val file = sc.textFile(filename, 2).cache() val wordCounts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) wordCounts.collect().foreach(println) } } Thanks, Josh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Deleting-temp-dir-Exception-tp18006.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
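One thing worth trying (an assumption about the cause, not a confirmed fix): stop the SparkContext explicitly before main() returns, so the copied application jar is released before the shutdown hook tries to delete the temp directory; Windows will not delete files that are still open:

    wordCounts.collect().foreach(println)
    sc.stop()   // release resources, including the staged application jar, before JVM shutdown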
Re: To find distances to reachable source vertices using GraphX
The NullPointerException seems to be because edge.dstAttr is null, which might be due to SPARK-3936 https://issues.apache.org/jira/browse/SPARK-3936. Until that's fixed, I edited the Gist with a workaround. Does that fix the problem? Ankur http://www.ankurdave.com/ On Mon, Nov 3, 2014 at 12:23 AM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi All, I'm trying to understand the example program at the link below. When I run this program, I'm getting *java.lang.NullPointerException* at the highlighted line. https://gist.github.com/ankurdave/4a17596669b36be06100 val updatedDists = edge.srcAttr.filter { case (source, dist) => *val existingDist = edge.dstAttr.getOrElse(source, Int.MaxValue)* existingDist > dist + 1 }.mapValues(_ + 1).map(identity) Could you please help me to resolve this issue. Regards, Rajesh
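For reference, a guess at the general shape of such a workaround (the authoritative fix is the one in the edited Gist; edge.srcAttr and edge.dstAttr are assumed to be Map[VertexId, Int] as in the example), guarding against a null destination attribute before the lookup:

    // fall back to an empty distance map when the destination vertex has no attribute yet
    val dstDists = Option(edge.dstAttr).getOrElse(Map.empty[org.apache.spark.graphx.VertexId, Int])
    val updatedDists = edge.srcAttr.filter { case (source, dist) =>
      dstDists.getOrElse(source, Int.MaxValue) > dist + 1
    }.mapValues(_ + 1).map(identity)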
Snappy and spark 1.1
Team, We are running a build of spark 1.1.1 for hadoop 2.2. We can't get the code to read LZO or snappy files in YARN. It fails to find the native libs. I have tried many different ways of defining the lib path - LD_LIBRARY_PATH, --driver-class-path, spark.executor.extraLibraryPath in spark-defaults.conf, --driver-java-options, and SPARK_LIBRARY_PATH. But none of them seem to take effect. What am I missing? Or is this a known issue? The example below (HdfsTest) works with plain text on both cluster and local mode. LZO and snappy files work on local mode, but both fail in the YARN cluster mode LD_LIBRARY_PATH=/opt/hadoop/lib/native/ MASTER=yarn SPARK_EXAMPLES_JAR=./examples/target/spark-examples_2.10-1.1.1.jar ./bin/run-example HdfsTest /user/input/part-r-0.snappy Stack Trace: Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 101-26-03.sc1.verticloud.com): ExecutorLostFailure (executor lost) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Thanks, Aravind
Spark Streaming - Most popular Twitter Hashtags
Hi all, I was just reading this nice documentation here: http://ampcamp.berkeley.edu/3/exercises/realtime-processing-with-spark-streaming.html And got to the end of it, which says: Note that there are more efficient ways to get the top 10 hashtags. For example, instead of sorting the entire of 5-minute-counts (thereby, incurring the cost of a data shuffle), one can get the top 10 hashtags in each partition, collect them together at the driver and then find the top 10 hashtags among them. We leave this as an exercise for the reader to try. I was just wondering if anyone had managed to do this, and was willing to share as an example :) This seems to be the exact use case that will help me! Thanks! Harold
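One way to do what the note suggests, as a sketch rather than the tutorial's own answer (counts is assumed to be the windowed RDD[(String, Int)] of (hashtag, count) pairs for the last 5 minutes):

    val candidates = counts.mapPartitions { iter =>
      // keep only the local top 10 from each partition
      iter.toSeq.sortBy(-_._2).take(10).iterator
    }.collect()
    // at most 10 * numPartitions candidates reach the driver; finish the ranking there
    val top10 = candidates.sortBy(-_._2).take(10)
    top10.foreach { case (tag, count) => println(s"$tag: $count") }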
IllegalStateException: unread block data
Hollo there, Just set up an ec2 cluster with no HDFS, hadoop, hbase whatsoever. Just installed spark to read/process data from a hbase in a different cluster. The spark was built against the hbase/hadoop version in the remote (ec2) hbase cluster, which is 0.98.1 and 2.3.0 respectively. but I got the following error when running a simple test python script. The command line ./spark-submit --master spark://master:7077 --driver-class-path ./spark-examples-1.1.0-hadoop2.3.0.jar ~/workspace/test/sparkhbase.py From the worker log, I can see the worker node got the request from the master. Can anyone help with this problem? Tons of thanks! java.lang.IllegalStateException: unread block data java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2399) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1378) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1969) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1776) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346) java.io.ObjectInputStream.readObject(ObjectInputStream.java:368) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:162) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:679) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/IllegalStateException-unread-block-data-tp18011.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: hadoop_conf_dir when running spark on yarn
Hi, On Mon, Nov 3, 2014 at 1:29 PM, Amey Chaugule ambr...@gmail.com wrote: I thought that only applied when you're trying to run a job using spark-submit or in the shell... And how are you starting your Yarn job, if not via spark-submit? Tobias
Re: different behaviour of the same code
Hi, On Fri, Oct 31, 2014 at 4:31 PM, lieyan lie...@yahoo.com wrote: The code are here: LogReg.scala http://apache-spark-user-list.1001560.n3.nabble.com/file/n17803/LogReg.scala Then I click the Run button of the IDEA, and I get the following error message errlog.txt http://apache-spark-user-list.1001560.n3.nabble.com/file/n17803/errlog.txt . But when I export the jar file, and use *spark-submit --class net.yanl.spark.LogReg log_reg.jar 15*. The program works finely. I have not used the spark built-in cluster manager and I don't know how application jar distribution is done in it. However, it seems to me that when you use spark-submit, then spark-submit takes care of distributing your jar file properly to all the cluster nodes, that's why it works fine. When you run it from your IDE, it seems not to do that, that's why some classes are not there on all cluster nodes and you run into ClassNotFoundExceptions. If you change the master to local[3] instead of spark://master.local:7077 and run it from IDEA, does it work? Tobias
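A sketch of how the application jar can be registered explicitly when running from the IDE (the path below is hypothetical and should point at the jar built from the project):

    val conf = new org.apache.spark.SparkConf()
      .setAppName("LogReg")
      .setMaster("spark://master.local:7077")
      .setJars(Seq("/path/to/log_reg.jar"))   // ship the application jar to the executors
    val sc = new org.apache.spark.SparkContext(conf)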
cannot import name accumulators in python 2.7
I am running python 2.7.3 and 2.1.0 of ipython notebook. I installed spark in my home directory. '/home/felix/spark-1.1.0/python/lib/py4j-0.8.1-src.zip', '/home/felix/spark-1.1.0/python', '', '/opt/bluekai/python/src/bk', '/usr/local/lib/python2.7/dist-packages/setuptools-6.1-py2.7.egg', '/usr/lib/python2.7', '/usr/lib/python2.7/plat-linux2', '/usr/lib/python2.7/lib-tk', '/usr/lib/python2.7/lib-old', '/usr/lib/python2.7/lib-dynload', '/usr/local/lib/python2.7/dist-packages', '/usr/lib/python2.7/dist-packages', '/usr/lib/python2.7/dist-packages/PIL', '/usr/lib/python2.7/dist-packages/gtk-2.0', '/usr/lib/pymodules/python2.7', '/usr/local/lib/python2.7/dist-packages/IPython/extensions', '/home/felix/.ipython' show when I print the sys path --- ImportError Traceback (most recent call last) ipython-input-3-0e976f4d3617 in module() 1 from pyspark import SparkContext as sc 2 3 print sc /home/felix/spark-1.1.0/python/pyspark/__init__.py in module() 61 62 from pyspark.conf import SparkConf --- 63 from pyspark.context import SparkContext 64 from pyspark.sql import SQLContext 65 from pyspark.rdd import RDD /home/felix/spark-1.1.0/python/pyspark/context.py in module() 23 from collections import namedtuple 24 --- 25 from pyspark import accumulators 26 from pyspark.accumulators import Accumulator 27 from pyspark.broadcast import Broadcast ImportError: cannot import name accumulators I followed the instruction on http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/ Thanks, Felix -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/cannot-import-name-accumulators-in-python-2-7-tp18015.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: with SparkStreeaming spark-submit, don't see output after ssc.start()
From: Tobias Pfeiffer t...@preferred.jp Am I right that you are actually executing two different classes here? Yes, I realized after I posted that I was calling 2 different classes, though they are in the same JAR. I went back and tried it again with the same class in both cases, and it failed the same way. I thought perhaps having 2 classes in a JAR was an issue, but commenting out one of the classes did not seem to make a difference.
Re: Spark SQL takes unexpected time
Yes, I am using Spark1.1.0 and have used rdd.registerTempTable(). I tried by adding sqlContext.cacheTable(), but it took 59 seconds (more than earlier). I also tried by changing schema to use Long data type in some fields but seems conversion takes more time. Is there any way to specify index ? Though I checked and didn't found any, just want to confirm. For your reference here is the snippet of code. - case class EventDataTbl(EventUID: Long, ONum: Long, RNum: Long, Timestamp: java.sql.Timestamp, Duration: String, Type: String, Source: String, OName: String, RName: String) val format = new java.text.SimpleDateFormat(-MM-dd hh:mm:ss) val cedFileName = hdfs://hadoophost:8020/demo/poc/JoinCsv/output_2 val cedRdd = sc.textFile(cedFileName).map(_.split(,, -1)).map(p = EventDataTbl(p(0).toLong, p(1).toLong, p(2).toLong, new java.sql.Timestamp(format.parse(p(3)).getTime()), p(4), p(5), p(6), p(7), p(8))) cedRdd.registerTempTable(EventDataTbl) sqlCntxt.cacheTable(EventDataTbl) val t1 = System.nanoTime() println(\n\n10 Most frequent conversations between the Originators and Recipients\n) sql(SELECT COUNT(*) AS Frequency,ONum,OName,RNum,RName FROM EventDataTbl GROUP BY ONum,OName,RNum,RName ORDER BY Frequency DESC LIMIT 10).collect().foreach(println) val t2 = System.nanoTime() println(Time taken + (t2-t1)/10.0 + Seconds) - Thanks, Shailesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-takes-unexpected-time-tp17925p18017.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to make sure a ClassPath is always shipped to workers?
I have a spark application that deserialize an object 'Seq[Page]', save to HDFS/S3, and read by another worker to be used elsewhere. The serialization and deserialization use the same serializer as Spark itself. (Read from SparkEnv.get.serializer.newInstance()) However I sporadically get this error: java.lang.ClassNotFoundException: org.***.***.Page at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:274) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) It seems like Page class wasn't shipped with the Jar and executor and all its information was erased in runtime. The most weird thing: this error doesn't always happen, sometimes the old Seq[Page] was get properly, sometimes it throws the exception, how could this happen and how do I fix it? Yours Peng -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-sure-a-ClassPath-is-always-shipped-to-workers-tp18018.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to make sure a ClassPath is always shipped to workers?
Sorry its a timeout duplicate, please remove it -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-sure-a-ClassPath-is-always-shipped-to-workers-tp18018p18020.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: different behaviour of the same code
You are right. You pointed out the very cause of my problem. Thanks. I have to specify the path to my jar file. The solution can be found in an earlier post. http://apache-spark-user-list.1001560.n3.nabble.com/ClassNotFoundException-with-simple-Spark-job-on-cluster-td932.html -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/different-behaviour-of-the-same-code-tp17803p18021.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
avro + parquet + vectorstring + NullPointerException while reading
Greetings! I'm trying to use Avro and Parquet with the following schema:

    {
      "name": "TestStruct",
      "namespace": "bughunt",
      "type": "record",
      "fields": [
        { "name": "string_array", "type": { "type": "array", "items": "string" } }
      ]
    }

The writing process seems to be OK, but when I try to read it with Spark, I get:

    com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
    Serialization trace: string_array (bughunt.TestStruct)
     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)

When I try to read it with Hive, I get this:

    Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.ArrayWritable

Which would lead me to suspect that this might be related to this one: https://github.com/Parquet/parquet-mr/issues/281 , but that one seems to be Hive specific, and here I am not seeing Spark read the data it claims to have written itself. I'm running on an Amazon EMR cluster using the version 2.4.0 Hadoop code and Spark 1.1.0. Has anyone else observed this sort of behavior? For completeness, here is the code that writes the data:

    package bughunt

    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import parquet.avro.AvroWriteSupport
    import parquet.avro.AvroParquetOutputFormat
    import parquet.hadoop.ParquetOutputFormat
    import java.util.ArrayList

    object GenData {
      val outputPath = "/user/x/testdata"
      val words = List(
        List("apple", "banana", "cherry"),
        List("car", "boat", "plane"),
        List("lion", "tiger", "bear"),
        List("north", "south", "east", "west"),
        List("up", "down", "left", "right"),
        List("red", "green", "blue"))

      def main(args: Array[String]) {
        val conf = new SparkConf(true)
          .setAppName("IngestLoanApplication")
          // .set("spark.kryo.registrator", classOf[CommonRegistrator].getName)
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .set("spark.kryoserializer.buffer.mb", 4.toString)
          .set("spark.kryo.referenceTracking", "false")
        val sc = new SparkContext(conf)

        val rdd = sc.parallelize(words)

        val job = new Job(sc.hadoopConfiguration)
        ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
        AvroParquetOutputFormat.setSchema(job, TestStruct.SCHEMA$)

        rdd.map(p => {
            val xs = new java.util.ArrayList[String]
            for (z <- p) { xs.add(z) }
            val bldr = TestStruct.newBuilder()
            bldr.setStringArray(xs)
            (null, bldr.build())
          })
          .saveAsNewAPIHadoopFile(outputPath,
            classOf[Void],
            classOf[TestStruct],
            classOf[ParquetOutputFormat[TestStruct]],
            job.getConfiguration)
      }
    }

To read the data, I use this sort of code from the spark-shell:

    :paste
    import bughunt.TestStruct

    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.SparkContext
    import parquet.hadoop.ParquetInputFormat
    import parquet.avro.AvroReadSupport

    def openRddSpecific(sc: SparkContext) = {
      val job = new Job(sc.hadoopConfiguration)
      ParquetInputFormat.setReadSupportClass(job, classOf[AvroReadSupport[TestStruct]])
      sc.newAPIHadoopFile("/user/malbert/testdata",
        classOf[ParquetInputFormat[TestStruct]],
        classOf[Void],
        classOf[TestStruct],
        job.getConfiguration)
    }

I start the Spark shell as follows:

    spark-shell \
      --jars ../my-jar-containing-the-class-definitions.jar \
      --conf mapreduce.user.classpath.first=true \
      --conf spark.kryo.referenceTracking=false \
      --conf spark.kryoserializer.buffer.mb=4 \
      --conf spark.serializer=org.apache.spark.serializer.KryoSerializer

I'm stumped. I can read and write records and maps, but arrays/vectors elude me. Am I missing something obvious? Thanks! Sincerely, Mike Albert
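One possible workaround worth trying (a sketch only, not a confirmed fix): pull the values out of the Avro record into plain Scala types in the first map after reading, so the generated TestStruct never has to pass through Kryo when results are shuffled or collected. The getStringArray accessor below is an assumption based on the string_array field in the schema above.

    import scala.collection.JavaConverters._

    // Hedged sketch: convert each TestStruct to a plain List[String] right away,
    // so only built-in types are serialized downstream.
    val strings = openRddSpecific(sc).map { case (_, rec) =>
      rec.getStringArray.asScala.map(_.toString).toList
    }
    strings.take(5).foreach(println)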
Cleaning/transforming JSON before converting to SchemaRDD
I am trying to convert terabytes of JSON log files into Parquet files, but I need to clean the data a little first. I end up doing the following:

    val txt = sc.textFile(inpath).coalesce(800)
    val json = for {
      line <- txt
      JObject(child) = parse(line)
      child2 = for {
        JField(name, value) <- child
        _ <- patt(name) // filter fields with invalid names
      } yield JField(name.toLowerCase, value)
    } yield compact(render(JObject(child2)))
    sqx.jsonRDD(json, 5e-2).saveAsParquetFile(outpath)

A glaring inefficiency is that after parsing and cleaning the data I re-serialize it by calling compact(render(JObject(child2))), only to pass the text to jsonRDD to be parsed again. However, I see no way to turn an RDD of json4s objects directly into a SchemaRDD without turning it back into text first. Is there any way to do this? I am also open to other suggestions for speeding up the above code; it is very slow in its current form. I would also like to make jsonFile drop invalid JSON records rather than failing the entire job. Is that possible? thanks Daniel
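On the last point, a minimal sketch of one way to drop unparseable records before they reach jsonRDD, by wrapping the parse in a Try; names such as inpath, outpath and sqx follow the snippet above, and the patt(name) field filter is left out to keep the sketch short:

    import scala.util.Try
    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    // Hedged sketch: records that fail to parse are silently dropped instead of
    // failing the job; valid ones are lower-cased and re-rendered as before.
    val cleaned = sc.textFile(inpath).coalesce(800).flatMap { line =>
      Try(parse(line)).toOption.collect { case JObject(fields) =>
        compact(render(JObject(fields.map { case JField(n, v) => JField(n.toLowerCase, v) })))
      }.toList
    }
    sqx.jsonRDD(cleaned, 5e-2).saveAsParquetFile(outpath)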
Executor Log Rotation Is Not Working?
Hi, I'm using Spark Streaming 1.1, and I have the following log that keeps growing: /opt/spark-1.1.0-bin-cdh4/work/app-20141029175309-0005/2/stderr I think it is an executor log, so I set up the following options in spark-defaults.conf:

    spark.executor.logs.rolling.strategy time
    spark.executor.logs.rolling.time.interval daily
    spark.executor.logs.rolling.maxRetainedFiles 10

I can see these options on the Web UI, so I suppose they are effective. However, the stderr is still not rotated. Am I doing something wrong? Thanks. -- Jerry - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: is spark a good fit for sequential machine learning algorithms?
Many ML algorithms are sequential because they were not designed to be parallel. However, ML is not driven by algorithms in practice, but by data and applications. As datasets get bigger and bigger, some algorithms were revised to work in parallel, like SGD and matrix factorization. MLlib tries to implement those scalable algorithms that can handle large-scale datasets. That being said, even with sequential ML algorithms, Spark is helpful, because in practice we need to test multiple sets of parameters and select the best one. Though the algorithm is sequential, the training part is embarrassingly parallel. We can broadcast the whole dataset, and then train model 1 on node 1, model 2 on node 2, etc. Cross validation also falls into this category. -Xiangrui On Mon, Nov 3, 2014 at 1:55 PM, ll duy.huynh@gmail.com wrote: i'm struggling with implementing a few algorithms with spark. hope to get help from the community. most of the machine learning algorithms today are sequential, while spark is all about parallelism. it seems to me that using spark doesn't actually help much, because in most cases you can't really parallelize a sequential algorithm. there must be some strong reasons why mllib was created and so many people claim spark is ideal for machine learning. what are those reasons? what are some specific examples of how to use spark to implement sequential machine learning algorithms? any comment/feedback/answer is much appreciated. thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/is-spark-a-good-fit-for-sequential-machine-learning-algorithms-tp18000.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
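A rough illustration of the "broadcast the data, train one model per parameter setting in parallel" idea described above; this is a sketch only, and trainSequentially and evaluate are trivial stand-ins for a real sequential learner and its scoring function:

    // Hedged sketch: broadcast the dataset once, then fit one model per parameter
    // value in parallel; each fit itself runs sequentially inside a single task.
    def trainSequentially(data: Array[(Double, Array[Double])], reg: Double): Array[Double] =
      Array.fill(data.head._2.length)(reg)   // placeholder "model"
    def evaluate(model: Array[Double], data: Array[(Double, Array[Double])]): Double =
      model.sum                               // placeholder score

    val data: Array[(Double, Array[Double])] =
      Array((1.0, Array(0.5, 1.5)), (0.0, Array(2.0, 0.1)))  // tiny toy dataset
    val bcData = sc.broadcast(data)
    val regParams = Seq(0.001, 0.01, 0.1, 1.0)

    val results = sc.parallelize(regParams, regParams.size).map { reg =>
      (reg, evaluate(trainSequentially(bcData.value, reg), bcData.value))
    }.collect()

    val (bestReg, bestScore) = results.minBy(_._2)  // pick the best (here: lowest) score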
Re: To find distances to reachable source vertices using GraphX
Thank you Ankur for your help and support!!! On Tue, Nov 4, 2014 at 5:24 AM, Ankur Dave ankurd...@gmail.com wrote: The NullPointerException seems to be because edge.dstAttr is null, which might be due to SPARK-3936 https://issues.apache.org/jira/browse/SPARK-3936. Until that's fixed, I edited the Gist with a workaround. Does that fix the problem? Ankur http://www.ankurdave.com/ On Mon, Nov 3, 2014 at 12:23 AM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi All, I'm trying to understand the example program in the link below. When I run this program, I'm getting a java.lang.NullPointerException at the line highlighted below. https://gist.github.com/ankurdave/4a17596669b36be06100

    val updatedDists = edge.srcAttr.filter { case (source, dist) =>
      val existingDist = edge.dstAttr.getOrElse(source, Int.MaxValue)
      existingDist > dist + 1
    }.mapValues(_ + 1).map(identity)

Could you please help me to resolve this issue. Regards, Rajesh
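For reference, one common way such an NPE is guarded against (a sketch only, not necessarily what the updated Gist does, and it assumes the attribute maps have type Map[VertexId, Int] as in the snippet above) is to check dstAttr before dereferencing it:

    // Hedged sketch: skip the comparison when the destination attribute has not
    // been materialized yet (SPARK-3936), instead of calling getOrElse on null.
    val updatedDists =
      if (edge.dstAttr == null) Map.empty[VertexId, Int]
      else edge.srcAttr.filter { case (source, dist) =>
        edge.dstAttr.getOrElse(source, Int.MaxValue) > dist + 1
      }.mapValues(_ + 1).map(identity)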
netty on classpath when using spark-submit
Hi, I tried hard to get a version of netty into my jar file (created with sbt assembly) that works with all my libraries. I finally managed that and was really happy, but it seems like spark-submit puts an older version of netty on the classpath when submitting to a cluster, so that my code ends up with a NoSuchMethodError. Code:

    val a = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "http://localhost")
    val f = new File(a.getClass.getProtectionDomain().getCodeSource().getLocation().getPath())
    println(f.getAbsolutePath)
    println("headers: " + a.headers())

When executed with sbt run:

    ~/.ivy2/cache/io.netty/netty/bundles/netty-3.9.4.Final.jar
    headers: org.jboss.netty.handler.codec.http.DefaultHttpHeaders@64934069

When executed with spark-submit:

    ~/spark-1.1.0-bin-hadoop2.4/lib/spark-assembly-1.1.0-hadoop2.4.0.jar
    Exception in thread "main" java.lang.NoSuchMethodError: org.jboss.netty.handler.codec.http.DefaultHttpRequest.headers()Lorg/jboss/netty/handler/codec/http/HttpHeaders; ...

How can I get the old netty version off my classpath? Thanks Tobias
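Two knobs that may help here, sketched below and not guaranteed to fix it: asking Spark 1.1 executors to prefer user-supplied jars (spark.files.userClassPathFirst is experimental and executor-only), and putting the newer netty jar ahead of the Spark assembly on the driver classpath. Paths, class name and jar name are placeholders; a more robust route is usually to shade/relocate netty inside the assembly so the conflict cannot arise at all.

    # Hedged sketch of a spark-submit invocation with both options
    spark-submit \
      --conf spark.files.userClassPathFirst=true \
      --driver-class-path /path/to/netty-3.9.4.Final.jar \
      --class your.Main \
      your-assembly.jar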
Re: --executor-cores cannot change vcores in yarn?
If you are using the capacity scheduler in YARN: by default the YARN capacity scheduler uses DefaultResourceCalculator, which considers only memory while allocating containers. You can use DominantResourceCalculator instead; it considers both memory and CPU. In capacity-scheduler.xml set yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DominantResourceCalculator On 04/11/14 3:03 am, Gen gen.tan...@gmail.com wrote: Hi, Well, I didn't find the original documentation, but according to http://qnalist.com/questions/2791828/about-the-cpu-cores-and-cpu-usage , vcores are not physical CPU cores but virtual cores. And I used the top command to monitor CPU utilization during the Spark task: Spark can use all CPUs even if I leave --executor-cores at its default (1). Hope that it can be a help. Cheers Gen Gen wrote: Hi, Maybe it is a stupid question, but I am running Spark on YARN. I request the resources with the following command: {code} ./spark-submit --master yarn-client --num-executors #number of workers --executor-cores #number of cores ... {code} However, after launching the task, I use "yarn node -status ID" to monitor the situation of the cluster. It shows that the number of vcores used for each container is always 1, no matter what number I pass via --executor-cores. Any ideas how to solve this problem? Thanks a lot in advance for your help. Cheers Gen -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/executor-cores-cannot-change-vcores-in-yarn-tp17883p17992.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
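For reference, the property goes into capacity-scheduler.xml roughly like this (a sketch; the scheduler configuration has to be reloaded for it to take effect):

    <!-- Hedged sketch: make the capacity scheduler account for CPU as well as memory -->
    <property>
      <name>yarn.scheduler.capacity.resource-calculator</name>
      <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
    </property>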
Re: Spark job resource allocation best practices
Yes. i believe Mesos is the right choice for you. http://mesos.apache.org/documentation/latest/mesos-architecture/ Thanks Best Regards On Mon, Nov 3, 2014 at 9:33 PM, Romi Kuntsman r...@totango.com wrote: So, as said there, static partitioning is used in Spark’s standalone and YARN modes, as well as the coarse-grained Mesos mode. That leaves us only with Mesos, where there is *dynamic sharing* of CPU cores. It says when the application is not running tasks on a machine, other applications may run tasks on those cores. But my applications are short lived (seconds to minutes), and they read a large dataset, process it, and write the results. They are also IO-bound, meaning most of the time is spent reading input data (from S3) and writing the results back. Is it possible to divide the resources between them, according to how many are trying to run at the same time? So for example if I have 12 cores - if one job is scheduled, it will get 12 cores, but if 3 are scheduled, then each one will get 4 cores and then will all start. Thanks! *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com On Mon, Nov 3, 2014 at 5:46 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Have a look at scheduling pools https://spark.apache.org/docs/latest/job-scheduling.html. If you want more sophisticated resource allocation, then you are better of to use cluster managers like mesos or yarn Thanks Best Regards On Mon, Nov 3, 2014 at 9:10 PM, Romi Kuntsman r...@totango.com wrote: Hello, I have a Spark 1.1.0 standalone cluster, with several nodes, and several jobs (applications) being scheduled at the same time. By default, each Spark job takes up all available CPUs. This way, when more than one job is scheduled, all but the first are stuck in WAITING. On the other hand, if I tell each job to initially limit itself to a fixed number of CPUs, and that job runs by itself, the cluster is under-utilized and the job runs longer than it could have if it took all the available resources. - How to give the tasks a more fair resource division, which lets many jobs run together, and together lets them use all the available resources? - How do you divide resources between applications on your usecase? P.S. I started reading about Mesos but couldn't figure out if/how it could solve the described issue. Thanks! *Romi Kuntsman*, *Big Data Engineer* http://www.totango.com
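For completeness, the static per-application cap that the thread alludes to is spark.cores.max in standalone mode, e.g. in spark-defaults.conf (a sketch; this is exactly the fixed division described above, not the dynamic sharing being asked for, which is why Mesos fine-grained mode was suggested):

    # Hedged sketch: cap each application at 4 cores so several jobs can run side by side
    spark.cores.max  4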
Got java.lang.SecurityException: class javax.servlet.FilterRegistration's when running job from intellij Idea
Hi all, I have a Spark job that I build with sbt and I can run without any problem with sbt run. But when I run it inside IntelliJ IDEA I got the following error:

    Exception encountered when invoking run on a nested suite - class javax.servlet.FilterRegistration's signer information does not match signer information of other classes in the same package
    java.lang.SecurityException: class javax.servlet.FilterRegistration's signer information does not match signer information of other classes in the same package
     at java.lang.ClassLoader.checkCerts(ClassLoader.java:952)
     at java.lang.ClassLoader.preDefineClass(ClassLoader.java:666)
     at java.lang.ClassLoader.defineClass(ClassLoader.java:794)
     at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
     at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
     at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
     at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
     at java.security.AccessController.doPrivileged(Native Method)
     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
     at org.eclipse.jetty.servlet.ServletContextHandler.init(ServletContextHandler.java:136)
     at org.eclipse.jetty.servlet.ServletContextHandler.init(ServletContextHandler.java:129)
     at org.eclipse.jetty.servlet.ServletContextHandler.init(ServletContextHandler.java:98)
     at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:98)
     at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:89)
     at org.apache.spark.ui.WebUI.attachPage(WebUI.scala:67)
     at org.apache.spark.ui.WebUI$$anonfun$attachTab$1.apply(WebUI.scala:60)
     at org.apache.spark.ui.WebUI$$anonfun$attachTab$1.apply(WebUI.scala:60)
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
     at org.apache.spark.ui.WebUI.attachTab(WebUI.scala:60)
     at org.apache.spark.ui.SparkUI.initialize(SparkUI.scala:66)
     at org.apache.spark.ui.SparkUI.init(SparkUI.scala:60)
     at org.apache.spark.ui.SparkUI.init(SparkUI.scala:42)
     at org.apache.spark.SparkContext.init(SparkContext.scala:223)
     at org.apache.spark.SparkContext.init(SparkContext.scala:98)

How can I solve this? Cheers, Jao
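This particular SecurityException usually means two copies of the servlet API, one signed and one unsigned, ended up on the IDE classpath. A sketch of one common sbt-side fix, assuming the extra copy is pulled in transitively through the Hadoop client dependency (adjust the artifact and version to whatever your build actually uses):

    // Hedged sketch (build.sbt): keep the javax.servlet classes pulled in by Hadoop
    // off the classpath so only one copy of the servlet API remains.
    libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.4.0" excludeAll(
      ExclusionRule(organization = "javax.servlet")
    )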
Re: Cannot instantiate hive context
Not quiet sure, but moving the Guava 11 jar to first position in the classpath may solve this issue. Thanks Best Regards On Tue, Nov 4, 2014 at 1:47 AM, Pala M Muthaia mchett...@rocketfuelinc.com wrote: Thanks Akhil. I realized that earlier, and i thought mvn -Phive should have captured and included all these dependencies. In any case, i proceeded with that, included other such dependencies that were missing, and finally hit the guava version mismatch issue. (Spark with Guava 14 vs Hadoop/Hive with Guava 11). There are 2 parts: 1. Spark includes Guava library within its jars and that may conflict with Hadoop/Hive components depending on older version of the library. It seems this has been solved with SPARK-2848 https://issues.apache.org/jira/browse/SPARK-2848 patch to shade the Guava libraries. 2. Spark actually uses interfaces from newer version of Guava library, that needs to be rewritten to use older version (i.e. downgrade Spark dependency on Guava). I wasn't able to find the related patches (I need them since i am on Spark 1.0.1). Applying patch for #1 above, i still hit the following error: 14/11/03 15:01:32 WARN storage.BlockManager: Putting block broadcast_0 failed java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode; at org.apache.spark.util.collection.OpenHashSet.org $apache$spark$util$collection$OpenHashSet$$hashcode(OpenHashSet.scala:261) at org.apache.spark.util.collection.OpenHashSet$mcI$sp.getPos$mcI$sp(OpenHashSet.scala:165) at org.apache.spark.util.collection.OpenHashSet$mcI$sp.contains$mcI$sp(OpenHashSet.scala:102) stack continues I haven't been able to find the other patches that actually downgrade the dependency. Please point me to those patches, or any other ideas about fixing these dependency issues. Thanks. On Sun, Nov 2, 2014 at 8:41 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Adding the libthrift jar http://mvnrepository.com/artifact/org.apache.thrift/libthrift/0.9.0 in the class path would resolve this issue. Thanks Best Regards On Sat, Nov 1, 2014 at 12:34 AM, Pala M Muthaia mchett...@rocketfuelinc.com wrote: Hi, I am trying to load hive datasets using HiveContext, in spark shell. Spark ver 1.0.1 and Hive ver 0.12. We are trying to get Spark work with hive datasets. I already have existing Spark deployment. Following is what i did on top of that: 1. Build spark using 'mvn -Pyarn,hive -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package' 2. Copy over spark-assembly-1.0.1-hadoop2.4.0.jar into spark deployment directory. 3. Launch spark-shell with the spark hive jar included in the list. 
When i execute *'* *val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)* i get the following error stack: java.lang.NoClassDefFoundError: org/apache/thrift/TBase at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:792) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: org.apache.thrift.TBase at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 55 more I thought that building with -Phive option should include all the necessary hive packages into the assembly jar (according to here https://spark.apache.org/docs/1.0.1/sql-programming-guide.html#hive-tables). I tried searching online and in this mailing list archive but haven't found any instructions on how to get this working. I know that there is additional step of updating the assembly jar across the whole cluster, not just client side, but right now, even the client is not working. Would appreciate instructions (or link to them) on how to get this working end-to-end. Thanks, pala
Re: Key-Value decomposition
Hi, But I've only one RDD. Here is a more complete example: my RDD is something like (A, 1;2;3), (B, 2;5;6), (C, 3;2;1) and I expect the following result: (A,1), (A,2), (A,3), (B,2), (B,5), (B,6), (C,3), (C,2), (C,1) Any idea how I can achieve this? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Key-Value-decomposition-tp17966p18036.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
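A minimal sketch of one way to do this with flatMap, assuming the values are ';'-separated strings as in the example above:

    // Hedged sketch: emit one (key, element) pair per item in the ';'-separated value
    val rdd = sc.parallelize(Seq(("A", "1;2;3"), ("B", "2;5;6"), ("C", "3;2;1")))
    val exploded = rdd.flatMap { case (k, v) => v.split(";").map(x => (k, x.toInt)) }
    // exploded.collect(): (A,1), (A,2), (A,3), (B,2), (B,5), (B,6), (C,3), (C,2), (C,1)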