Why doesn't the --conf parameter work in yarn-cluster mode (but works in yarn-client and local)?
Hello,

According to the Spark documentation at https://spark.apache.org/docs/1.2.1/submitting-applications.html:

  --conf: Arbitrary Spark configuration property in key=value format. For values that contain spaces wrap "key=value" in quotes (as shown).

And indeed, when I use that parameter, I can retrieve the value of the key in my Spark program with:

  System.getProperty(key);

This works when I test my program locally, and also in yarn-client mode: I can log the value of the key and see that it matches what I wrote on the command line. But it returns *null* when I submit the very same program in *yarn-cluster* mode.

Why can't I retrieve the value of a key given as --conf key=value when I submit my Spark application in *yarn-cluster* mode? Any ideas and/or workarounds?

--
Emre Sevinç
http://www.bigindustries.be/
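One workaround worth trying (a sketch, not verified against this setup): in yarn-cluster mode the driver runs inside the YARN application master rather than in the submitting JVM, so system properties set by spark-submit on the client machine never reach it. Reading the value through SparkConf may work instead, assuming the key uses a spark. prefix (spark-submit only forwards spark.* properties):

  import org.apache.spark.{SparkConf, SparkContext}

  // Hypothetical key; submitted as: spark-submit --conf spark.myapp.key=value ...
  val sc = new SparkContext(new SparkConf())
  val value = sc.getConf.get("spark.myapp.key")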
Re: Saving Dstream into a single file
You can use the coalesce method to reduce the number of partitions. You can reduce to one if the data is not too big. Then write the output.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Mar 16, 2015 at 2:42 PM, Zhan Zhang zzh...@hortonworks.com wrote:
Each RDD has multiple partitions, and each of them will produce one HDFS file when saving output. I don't think you are allowed to have multiple file handles writing to the same HDFS file. You can still load multiple files into Hive tables, right?
Thanks.
Zhan Zhang

On Mar 15, 2015, at 7:31 AM, tarek_abouzeid tarek.abouzei...@yahoo.com wrote:
I am doing the word count example on a Flume stream and trying to save the output as text files in HDFS, but in the save directory I get multiple subdirectories, each having files of small size. I wonder if there is a way to append to one large file instead of saving to multiple small files, as I intend to save the output in a Hive HDFS directory so I can query the result using Hive. Hope anyone has a workaround for this issue. Thanks in advance.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Saving-Dstream-into-a-single-file-tp22058.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
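A sketch of what that looks like for a DStream (wordCounts and the output path are hypothetical):

  // Collapse each micro-batch to a single partition before writing,
  // so each interval produces one file instead of many small ones.
  wordCounts.foreachRDD { (rdd, time) =>
    rdd.coalesce(1).saveAsTextFile(s"/user/hive/warehouse/wordcounts/batch-${time.milliseconds}")
  }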
[no subject]
Hi,

I am querying HBase via Spark SQL with the Java APIs.

Step 1: create a JavaPairRDD, then a JavaRDD, then a SchemaRDD via JavaSchemaRDD/applySchema.
Step 2: run sqlContext.sql(<sql query>).

If I update my HBase table between these two steps (via the hbase shell in another console), the query in step 2 does not pick up the updated data from the table; it shows the old data. Can somebody tell me how to let Spark know I have updated my database after Spark has created the RDDs?

Thanks,
Udbhav Agarwal
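RDDs (and the SchemaRDDs built from them) are immutable snapshots of the data as of the time they were computed, so a sketch of one option, in Scala and assuming the standard HBase TableInputFormat, is to rebuild the RDD from a fresh scan whenever the table is known to have changed:

  import org.apache.hadoop.hbase.HBaseConfiguration
  import org.apache.hadoop.hbase.client.Result
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable
  import org.apache.hadoop.hbase.mapreduce.TableInputFormat

  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.set(TableInputFormat.INPUT_TABLE, "myTable") // hypothetical table name

  // Re-running this produces a new RDD that reflects the current table contents.
  val freshRdd = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
    classOf[ImmutableBytesWritable], classOf[Result])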
Re: registerTempTable is not a member of RDD on spark 1.2?
In 1.2 it's a member of SchemaRDD and it becomes available on RDD (through the type class mechanism) when you add a SQLContext, like so:

  val sqlContext = new SQLContext(sc)
  import sqlContext._

In 1.3, the method has moved to the new DataFrame type.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Mar 23, 2015 at 5:25 AM, IT CTO goi@gmail.com wrote:
Hi,
I am running Spark, and when I use sc.version I get 1.2, but when I call registerTempTable("MyTable") I get an error saying registerTempTable is not a member of RDD. Why?
--
Eran | CTO
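A minimal end-to-end sketch for 1.2 (the case class and data are made up):

  import org.apache.spark.sql.SQLContext

  case class Person(name: String, age: Int)

  val sqlContext = new SQLContext(sc)
  import sqlContext._ // brings the implicit RDD-to-SchemaRDD conversion into scope

  val people = sc.parallelize(Seq(Person("Ann", 30), Person("Bob", 25)))
  people.registerTempTable("MyTable")
  sqlContext.sql("SELECT name FROM MyTable WHERE age > 26").collect().foreach(println)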
registerTempTable is not a member of RDD on spark 1.2?
Hi,
I am running Spark, and when I use sc.version I get 1.2, but when I call registerTempTable("MyTable") I get an error saying registerTempTable is not a member of RDD. Why?
--
Eran | CTO
Spark error NoClassDefFoundError: org/apache/hadoop/mapred/InputSplit
Hi,

I am using CDH 5.3.2 packages installed through Cloudera Manager 5.3.2. I am trying to run one Spark job with the following command:

  PYTHONPATH=~/code/utils/ spark-submit --master yarn --executor-memory 3G --num-executors 30 --driver-memory 2G --executor-cores 2 --name=analytics /home/abc/code/updb/spark/UPDB3analytics.py -date 2015-03-01

but I am getting the following error:

15/03/23 11:06:49 WARN TaskSetManager: Lost task 9.0 in stage 0.0 (TID 7, hdp003.dev.xyz.com): java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/InputSplit
    at java.lang.Class.getDeclaredConstructors0(Native Method)
    at java.lang.Class.privateGetDeclaredConstructors(Class.java:2532)
    at java.lang.Class.getDeclaredConstructors(Class.java:1901)
    at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1749)
    at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:72)
    at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:250)
    at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:248)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:247)
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:613)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapred.InputSplit
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 25 more

here is the full trace https://gist.github.com/anonymous/3492f0ec63d7a23c47cf
Re: JAVA_HOME problem with upgrade to 1.3.0
From: Williams, Ken ken.willi...@windlogics.com
Date: Thursday, March 19, 2015 at 10:59 AM
To: Spark list user@spark.apache.org
Subject: JAVA_HOME problem with upgrade to 1.3.0

[…] Finally, I go and check the YARN app master's web interface (since the job is shown, I know it at least made it that far), and the only logs it shows are these:

  Log Type: stderr
  Log Length: 61
  /bin/bash: {{JAVA_HOME}}/bin/java: No such file or directory

  Log Type: stdout
  Log Length: 0

I'm still interested in a solution to this issue if anyone has comments. I also posted to SO if that's more convenient: http://stackoverflow.com/questions/29170280/java-home-error-with-upgrade-to-spark-1-3-0

Thanks,
-Ken

CONFIDENTIALITY NOTICE: This e-mail message is for the sole use of the intended recipient(s) and may contain confidential and privileged information. Any unauthorized review, use, disclosure or distribution of any kind is strictly prohibited. If you are not the intended recipient, please contact the sender via reply e-mail and destroy all copies of the original message. Thank you.
Re: join two DataFrames, same column name
You can include * and a column alias in the same select clause:

  var df1 = sqlContext.sql("select *, column_id AS table1_id from table1")

FYI, this does not ultimately work, as the * still includes column_id and you cannot have two columns of that name in the joined DataFrame. So I ended up aliasing both sides of the join.

On Sun, Mar 22, 2015 at 1:25 PM, Michael Armbrust mich...@databricks.com wrote:
You can include * and a column alias in the same select clause:

  var df1 = sqlContext.sql("select *, column_id AS table1_id from table1")

I'm also hoping to resolve SPARK-6376 https://issues.apache.org/jira/browse/SPARK-6376 before Spark 1.3.1, which will let you do something like:

  var df1 = sqlContext.sql("select * from table1").as("t1")
  var df2 = sqlContext.sql("select * from table2").as("t2")
  df1.join(df2, df1("column_id") === df2("column_id")).select("t1.column_id")

Finally, there is SPARK-6380 https://issues.apache.org/jira/browse/SPARK-6380 that hopes to simplify this particular case.

Michael

On Sat, Mar 21, 2015 at 3:02 PM, Eric Friedman eric.d.fried...@gmail.com wrote:
I have a couple of data frames that I pulled from SparkSQL, and the primary key of one is a foreign key of the same name in the other. I'd rather not have to specify each column in the SELECT statement just so that I can rename this single column.

When I try to join the data frames, I get an exception because it finds the two columns of the same name to be ambiguous. Is there a way to specify which side of the join comes from data frame A and which comes from B?

  var df1 = sqlContext.sql("select * from table1")
  var df2 = sqlContext.sql("select * from table2")
  df1.join(df2, df1("column_id") === df2("column_id"))
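For anyone searching later, a sketch of the "alias both sides" workaround mentioned above (table and column names follow the example):

  // Alias the key on each side so no two columns share a name in the join condition.
  val df1 = sqlContext.sql("SELECT *, column_id AS table1_id FROM table1")
  val df2 = sqlContext.sql("SELECT *, column_id AS table2_id FROM table2")
  // The plain column_id columns are still present on both sides, so select
  // output columns explicitly rather than referring to the ambiguous name.
  val joined = df1.join(df2, df1("table1_id") === df2("table2_id"))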
Re: Spark 1.2. loses often all executors
Hi,

I have received three replies to my question on my personal e-mail. Why don't they also show up on the mailing list? I would like to reply to the 3 users through a thread.

Thanks,
Maria

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-loses-often-all-executors-tp22162p22187.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: EC2 cluster created by spark using old HDFS 1.0
That's a Hadoop version incompatibility issue; you need to make sure everything runs on the same version.

Thanks
Best Regards

On Sat, Mar 21, 2015 at 1:24 AM, morfious902002 anubha...@gmail.com wrote:
Hi,
I created a cluster using the spark-ec2 script, but it installs HDFS version 1.0. I would like to use this cluster to connect to HIVE installed on a Cloudera CDH 5.3 cluster, but I am getting the following error:

org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4
    at org.apache.hadoop.ipc.Client.call(Client.java:1070)
    at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
    at com.sun.proxy.$Proxy10.getProtocolVersion(Unknown Source)
    at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
    at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
    at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:238)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:203)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
    at org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:40)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1511)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:83)
    at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:815)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:23)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:28)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
    at $iwC$$iwC$$iwC.<init>(<console>:36)
    at $iwC$$iwC.<init>(<console>:38)
    at $iwC.<init>(<console>:40)
    at <init>(<console>:42)
    at
Spark UI tunneling
Is there a way to tunnel to the Spark UI? I tried to tunnel client-node:4040, but my browser was redirected from localhost to some domain name only visible locally on the cluster. Maybe there is some startup option to make the Spark UI fully accessible through a single endpoint (address:port)?

Serg.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-UI-tunneling-tp22184.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
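For reference, a plain SSH tunnel is one approach (host names below are placeholders). Because the UI links out to cluster-internal hostnames, a SOCKS proxy may work better than a single port forward:

  # Forward local port 4040 to the driver node's UI:
  ssh -L 4040:client-node:4040 user@gateway-host
  # then browse http://localhost:4040/

  # Or a SOCKS proxy (point the browser's proxy settings at localhost:8157):
  ssh -D 8157 user@gateway-host

The UI port itself can be pinned with the spark.ui.port configuration property at submit time.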
Re: Spark Sql with python udf fail
I caught exceptions in the Python UDF code, flushed exceptions into a single file, and made sure the column count of the output lines is the same as the SQL schema. Something interesting: the output lines of my UDF code have just 10 columns, and the exception above is java.lang.ArrayIndexOutOfBoundsException: 9. Is there any inspiration there?

2015-03-23 16:24 GMT+08:00 Cheng Lian lian.cs@gmail.com:
Could you elaborate on the UDF code?

On 3/23/15 3:43 PM, lonely Feb wrote:
Hi all, I tried to transfer some Hive jobs into Spark SQL. When I ran a SQL job with a Python UDF I got an exception:

java.lang.ArrayIndexOutOfBoundsException: 9
    at org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142)
    at org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37)
    at org.apache.spark.sql.catalyst.expressions.EqualTo.eval(predicates.scala:166)
    at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30)
    at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:156)
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:151)
    at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
    at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

I suspected there was an odd line in the input file, but the input file is so large that I could not find any abnormal lines, even with several jobs to check. How can I get the abnormal line here?
RDD storage in spark streaming
Hi,
I have a simple question about creating RDDs. When an RDD is created in Spark Streaming for a particular time window, where does the RDD get stored?
1. Does it get stored on the driver machine, or on all the machines in the cluster?
2. Does the data get stored in memory by default? Can it be stored on both memory and disk? How can that be configured?
Thanks,
Abhi
Re: RDD storage in spark streaming
Hey Abhi,

many of StreamingContext's methods to create input streams take a StorageLevel parameter to configure this behavior. RDD partitions are generally stored in the in-memory cache of worker nodes, I think. You can also configure replication and spilling to disk if needed.

Regards,
Jeff

2015-03-23 15:26 GMT+01:00 abhi abhishek...@gmail.com:
Hi,
I have a simple question about creating RDDs. When an RDD is created in Spark Streaming for a particular time window, where does the RDD get stored?
1. Does it get stored on the driver machine, or on all the machines in the cluster?
2. Does the data get stored in memory by default? Can it be stored on both memory and disk? How can that be configured?
Thanks,
Abhi
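A small sketch of passing the storage level explicitly (host and port are placeholders):

  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val ssc = new StreamingContext(sc, Seconds(10))
  // Received blocks are kept serialized in memory, spill to disk when memory
  // is short, and are replicated to a second node:
  val lines = ssc.socketTextStream("host", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)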
Re: join two DataFrames, same column name
Michael, thank you for the workaround and for letting me know of the upcoming enhancements, both of which sound appealing.

On Sun, Mar 22, 2015 at 1:25 PM, Michael Armbrust mich...@databricks.com wrote:
You can include * and a column alias in the same select clause:

  var df1 = sqlContext.sql("select *, column_id AS table1_id from table1")

I'm also hoping to resolve SPARK-6376 https://issues.apache.org/jira/browse/SPARK-6376 before Spark 1.3.1, which will let you do something like:

  var df1 = sqlContext.sql("select * from table1").as("t1")
  var df2 = sqlContext.sql("select * from table2").as("t2")
  df1.join(df2, df1("column_id") === df2("column_id")).select("t1.column_id")

Finally, there is SPARK-6380 https://issues.apache.org/jira/browse/SPARK-6380 that hopes to simplify this particular case.

Michael

On Sat, Mar 21, 2015 at 3:02 PM, Eric Friedman eric.d.fried...@gmail.com wrote:
I have a couple of data frames that I pulled from SparkSQL, and the primary key of one is a foreign key of the same name in the other. I'd rather not have to specify each column in the SELECT statement just so that I can rename this single column.

When I try to join the data frames, I get an exception because it finds the two columns of the same name to be ambiguous. Is there a way to specify which side of the join comes from data frame A and which comes from B?

  var df1 = sqlContext.sql("select * from table1")
  var df2 = sqlContext.sql("select * from table2")
  df1.join(df2, df1("column_id") === df2("column_id"))
Re: registerTempTable is not a member of RDD on spark 1.2?
Thanks. I am new to the environment and running Cloudera CDH 5.3 with Spark in it.
Apparently, when running this command in spark-shell:

  val sqlContext = new SQLContext(sc)

I am failing with "not found: type SQLContext". Any idea why?

On Mon, Mar 23, 2015 at 3:05 PM, Dean Wampler deanwamp...@gmail.com wrote:
In 1.2 it's a member of SchemaRDD and it becomes available on RDD (through the type class mechanism) when you add a SQLContext, like so:

  val sqlContext = new SQLContext(sc)
  import sqlContext._

In 1.3, the method has moved to the new DataFrame type.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Mar 23, 2015 at 5:25 AM, IT CTO goi@gmail.com wrote:
Hi,
I am running Spark, and when I use sc.version I get 1.2, but when I call registerTempTable("MyTable") I get an error saying registerTempTable is not a member of RDD. Why?
--
Eran | CTO

--
Eran | CTO
Re: registerTempTable is not a member of RDD on spark 1.2?
Have you tried adding the following?

  import org.apache.spark.sql.SQLContext

Cheers

On Mon, Mar 23, 2015 at 6:45 AM, IT CTO goi@gmail.com wrote:
Thanks. I am new to the environment and running Cloudera CDH 5.3 with Spark in it.
Apparently, when running this command in spark-shell:

  val sqlContext = new SQLContext(sc)

I am failing with "not found: type SQLContext". Any idea why?

On Mon, Mar 23, 2015 at 3:05 PM, Dean Wampler deanwamp...@gmail.com wrote:
In 1.2 it's a member of SchemaRDD and it becomes available on RDD (through the type class mechanism) when you add a SQLContext, like so:

  val sqlContext = new SQLContext(sc)
  import sqlContext._

In 1.3, the method has moved to the new DataFrame type.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Mar 23, 2015 at 5:25 AM, IT CTO goi@gmail.com wrote:
Hi,
I am running Spark, and when I use sc.version I get 1.2, but when I call registerTempTable("MyTable") I get an error saying registerTempTable is not a member of RDD. Why?
--
Eran | CTO

--
Eran | CTO
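Putting the two suggestions together, the full spark-shell sequence would be:

  import org.apache.spark.sql.SQLContext

  val sqlContext = new SQLContext(sc)
  import sqlContext._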
Spark RDD mapped to Hbase to be updateable
Hi,

We have a JavaRDD mapped to an HBase table, and when we query the HBase table using the Spark SQL API we can access the data. However, when we update the HBase table while the Spark SQL SparkConf is initialised, we cannot see the updated data. Is there any way we can have the RDD mapped to HBase updated as and when the HBase table reflects any change?

Thanks,
Siddharth Ubale,
Synchronized Communications
#43, Velankani Tech Park, Block No. II, 3rd Floor, Electronic City Phase I, Bangalore – 560 100
Tel : +91 80 3202 4060
Web: www.syncoms.com
London|Bangalore|Orlando
we innovate, plan, execute, and transform the business
RE: Spark SQL udf(ScalaUdf) is very slow
This is a very interesting issue. The root reason for the lower performance probably is that, in a Scala UDF, Spark SQL converts the data type from the internal representation to the Scala representation via Scala reflection, recursively. Can you create a JIRA issue for tracking this? I can start to work on the improvement soon.

From: zzcclp [mailto:441586...@qq.com]
Sent: Monday, March 23, 2015 5:10 PM
To: user@spark.apache.org
Subject: Spark SQL udf(ScalaUdf) is very slow

My test env:
1. Spark version is 1.3.0
2. 3 nodes, each with 80G/20C
3. read 250G parquet files from hdfs

Test case:

1. Register a floor function with the command sqlContext.udf.register("floor", (ts: Int) => ts - ts % 300), then run with SQL "select chan, floor(ts) as tt, sum(size) from qlogbase3 group by chan, floor(ts)". It takes 17 minutes.

== Physical Plan ==
Aggregate false, [chan#23015,PartialGroup#23500], [chan#23015,PartialGroup#23500 AS tt#23494,CombineSum(PartialSum#23499L) AS c2#23495L]
 Exchange (HashPartitioning [chan#23015,PartialGroup#23500], 54)
  Aggregate true, [chan#23015,scalaUDF(ts#23016)], [chan#23015,scalaUDF(ts#23016) AS PartialGroup#23500,SUM(size#23023L) AS PartialSum#23499L]
   PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[115] at map at newParquet.scala:562

2. Run with SQL "select chan, (ts - ts % 300) as tt, sum(size) from qlogbase3 group by chan, (ts - ts % 300)". It takes only 5 minutes.

== Physical Plan ==
Aggregate false, [chan#23015,PartialGroup#23349], [chan#23015,PartialGroup#23349 AS tt#23343,CombineSum(PartialSum#23348L) AS c2#23344L]
 Exchange (HashPartitioning [chan#23015,PartialGroup#23349], 54)
  Aggregate true, [chan#23015,(ts#23016 - (ts#23016 % 300))], [chan#23015,(ts#23016 - (ts#23016 % 300)) AS PartialGroup#23349,SUM(size#23023L) AS PartialSum#23348L]
   PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[83] at map at newParquet.scala:562

3. Use HiveContext with SQL "select chan, floor((ts - ts % 300)) as tt, sum(size) from qlogbase3 group by chan, floor((ts - ts % 300))". It takes only 5 minutes too.

== Physical Plan ==
Aggregate false, [chan#23015,PartialGroup#23108L], [chan#23015,PartialGroup#23108L AS tt#23102L,CombineSum(PartialSum#23107L) AS _c2#23103L]
 Exchange (HashPartitioning [chan#23015,PartialGroup#23108L], 54)
  Aggregate true, [chan#23015,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor((ts#23016 - (ts#23016 % 300)))], [chan#23015,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor((ts#23016 - (ts#23016 % 300))) AS PartialGroup#23108L,SUM(size#23023L) AS PartialSum#23107L]
   PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[28] at map at newParquet.scala:562

Why is ScalaUdf so slow? How can it be improved?

View this message in context: Spark SQL udf(ScalaUdf) is very slow http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-udf-ScalaUdf-is-very-slow-tp22185.html
Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Re: Spark 1.2. loses often all executors
In this thread: http://search-hadoop.com/m/JW1q5DM69G I only saw two replies. Maybe some people forgot to use 'Reply to All' ? Cheers On Mon, Mar 23, 2015 at 8:19 AM, mrm ma...@skimlinks.com wrote: Hi, I have received three replies to my question on my personal e-mail, why don't they also show up on the mailing list? I would like to reply to the 3 users through a thread. Thanks, Maria -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-loses-often-all-executors-tp22162p22187.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Is yarn-standalone mode deprecated?
Is yarn-standalone mode deprecated in Spark now? The reason I am asking is that while I can find it in the 0.9.0 documentation (https://spark.apache.org/docs/0.9.0/running-on-yarn.html), I am not able to find it in 1.2.0. I am using this mode to run Spark jobs from Oozie as a java action. Removing this mode would prevent me from doing that. Are there any other ways of running a Spark job from Oozie other than a Shell action?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-yarn-standalone-mode-deprecated-tp22188.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: How to check that a dataset is sorted after it has been written out?
Data is not (necessarily) sorted when read from disk, no. A file might even have many blocks, and while a block yields a partition in general, the order in which those partitions appear in the RDD is not defined. This is why you'd sort if you need the data sorted.

I think you could conceivably make some custom RDD or InputFormat that reads blocks in a well-defined order and, assuming the data is sorted in some knowable way on disk, would then have them sorted. I think that's even been brought up.

Deciding whether the data is sorted is quite different. You'd have to decide what ordering you expect (is part 0 before part 1? should it be sorted within a part file?) and then just verify that externally.

On Fri, Mar 20, 2015 at 10:41 PM, Michael Albert m_albert...@yahoo.com.invalid wrote:
Greetings!

I sorted a dataset in Spark and then wrote it out in avro/parquet. Then I wanted to check that it was sorted. It looks like each partition has been sorted, but when reading the data back in, the first partition (i.e., as seen in the partition index of mapPartitionsWithIndex) is not the same as implied by the names of the parquet files (even when the number of partitions is the same in the RDD which was read as on disk).

If I take() a few hundred values, they are sorted, but they are *not* the same as if I explicitly open part-r-0.parquet and take values from that. It seems that when opening the RDD, the partitions of the RDD are not in the same order as implied by the data on disk (i.e., part-r-0.parquet, part-r-1.parquet, etc).

So, how might one read the data so that one maintains the sort order? And while on the subject, after the terasort, how did they check that the data was actually sorted correctly? (or did they :-) ?). Is there any way to read the data back in so as to preserve the sort, or do I need to zipWithIndex before writing it out, and write the index at that time? (I haven't tried the latter yet.)

Thanks!
-Mike

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
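A sketch of that external check: collect a small summary per partition (index, first key, last key, whether the partition is internally sorted) and then verify the boundaries between consecutive partitions. This assumes an RDD of key/value pairs with an Ordering on the key:

  import org.apache.spark.rdd.RDD

  def checkSorted[K: Ordering, V](rdd: RDD[(K, V)]): Boolean = {
    val ord = implicitly[Ordering[K]]
    val summaries = rdd.mapPartitionsWithIndex { (idx, iter) =>
      val keys = iter.map(_._1).toArray
      if (keys.isEmpty) Iterator.empty
      else {
        val sortedWithin = keys.sliding(2).forall {
          case Array(a, b) => ord.lteq(a, b)
          case _ => true // single-element partition
        }
        Iterator((idx, keys.head, keys.last, sortedWithin))
      }
    }.collect().sortBy(_._1) // only one small tuple per partition is collected

    // every partition internally sorted, and each partition's last key <= next's first
    summaries.forall(_._4) && summaries.sliding(2).forall {
      case Array(a, b) => ord.lteq(a._3, b._2)
      case _ => true
    }
  }

Note this verifies the partition order as the RDD presents it, which is exactly the property in question; it says nothing about the file-name order on disk.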
Re: Spark sql thrift server slower than hive
A basic change needed by Spark is setting the executor memory, which defaults to 512MB.

On Mon, Mar 23, 2015 at 10:16 AM, Denny Lee denny.g@gmail.com wrote:
How are you running your Spark instance, out of curiosity? Via YARN or standalone mode? When connecting the Spark thriftserver to the Spark service, have you allocated enough memory and CPU when executing with Spark?

On Sun, Mar 22, 2015 at 3:39 AM fanooos dev.fano...@gmail.com wrote:
We have Cloudera CDH 5.3 installed on one machine. We are trying to use the Spark SQL thrift server to execute some analysis queries against a Hive table. Without any changes in the configuration, we run the following query on both Hive and the Spark SQL thrift server:

  select * from tableName;

The time taken by Spark is larger than the time taken by Hive, which is not supposed to be like that. The Hive table is mapped to JSON files stored in an HDFS directory, and we are using org.openx.data.jsonserde.JsonSerDe for serialization/deserialization.

Why does Spark take much more time to execute the query than Hive?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-sql-thrift-server-slower-than-hive-tp22177.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

--
Sigmoid Analytics http://htmlsig.com/www.sigmoidanalytics.com
Arush Kharbanda || Technical Teamlead
ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
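For example, the executor memory can be set on the thriftserver launch command line, which accepts the usual spark-submit options (values are illustrative):

  ./sbin/start-thriftserver.sh --master spark://host:7077 --executor-memory 4g

  # or persistently, in conf/spark-defaults.conf:
  # spark.executor.memory  4g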
Re: How to use DataFrame with MySQL
OK, I found what the problem is: it couldn't work with mysql-connector 5.0.8. I updated the connector version to 5.1.34 and it worked.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-DataFrame-with-MySQL-tp22178p22182.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
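For anyone landing on this thread, a sketch of how the 1.3 JDBC data source is typically invoked (connection details are placeholders), with the 5.1.x connector jar visible to both driver and executors:

  val df = sqlContext.load("jdbc", Map(
    "url" -> "jdbc:mysql://host:3306/db?user=user&password=password",
    "driver" -> "com.mysql.jdbc.Driver", // driver class for the 5.1.x connector
    "dbtable" -> "mytable"))             // hypothetical table name
  df.count()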
Re: Is it possible to use json4s 3.2.11 with Spark 1.3.0?
Thanks Ted, I'll try; hope there are no transitive dependencies on 3.2.10.

On Tue, Mar 24, 2015 at 4:21 AM, Ted Yu yuzhih...@gmail.com wrote:
Looking at core/pom.xml:

  <dependency>
    <groupId>org.json4s</groupId>
    <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
    <version>3.2.10</version>
  </dependency>

The version is hard coded. You can rebuild Spark 1.3.0 with json4s 3.2.11.

Cheers

On Mon, Mar 23, 2015 at 2:12 PM, Alexey Zinoviev alexey.zinov...@gmail.com wrote:
Spark has a dependency on json4s 3.2.10, but this version has several bugs and I need to use 3.2.11. I added the json4s-native 3.2.11 dependency to build.sbt and everything compiled fine. But when I spark-submit my JAR it provides me with 3.2.10.

build.sbt:

  import sbt.Keys._
  name := "sparkapp"
  version := "1.0"
  scalaVersion := "2.10.4"
  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
  libraryDependencies += "org.json4s" %% "json4s-native" % "3.2.11"

plugins.sbt:

  logLevel := Level.Warn
  resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)
  addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")

App1.scala:

  import org.apache.spark.SparkConf
  import org.apache.spark.rdd.RDD
  import org.apache.spark.{Logging, SparkConf, SparkContext}
  import org.apache.spark.SparkContext._

  object App1 extends Logging {
    def main(args: Array[String]) = {
      val conf = new SparkConf().setAppName("App1")
      val sc = new SparkContext(conf)
      println(s"json4s version: ${org.json4s.BuildInfo.version.toString}")
    }
  }

sbt 0.13.7, sbt-assembly 0.13.0, Scala 2.10.4

Is it possible to force 3.2.11 version usage?

Thanks,
Alexey
RE: Use pig load function in spark
Hi, Yin,

But our data is in a customized sequence file format which can be read by our customized load function in Pig, and I want to use Spark to reuse those load functions to read the data and transform it into RDDs.

Best Regards,
Kevin.

From: Yin Huai [mailto:yh...@databricks.com]
Sent: March 24, 2015 11:53
To: Dai, Kevin
Cc: Paul Brown; user@spark.apache.org
Subject: Re: Use pig load function in spark

Hello Kevin,

You can take a look at our generic load function https://spark.apache.org/docs/1.3.0/sql-programming-guide.html#generic-loadsave-functions. For example, you can use

  val df = sqlContext.load("/myData", "parquet")

to load a parquet dataset stored in /myData as a DataFrame https://spark.apache.org/docs/1.3.0/sql-programming-guide.html#dataframes. You can use it to load data stored in various formats, like json (Spark built-in), parquet (Spark built-in), avro https://github.com/databricks/spark-avro, and csv https://github.com/databricks/spark-csv.

Thanks,
Yin

On Mon, Mar 23, 2015 at 7:14 PM, Dai, Kevin yun...@ebay.com wrote:
Hi, Paul,

You are right. The story is that we have a lot of Pig load functions to load our different data. And now we want to use Spark to read and process these data. So we want to figure out a way to reuse our existing load functions in Spark to read the data.

Any idea?

Best Regards,
Kevin.

From: Paul Brown [mailto:p...@mult.ifario.us]
Sent: March 24, 2015 4:11
To: Dai, Kevin
Subject: Re: Use pig load function in spark

The answer is "Maybe, but you probably don't want to do that."

A typical Pig load function is devoted to bridging external data into Pig's type system, but you don't really need to do that in Spark because it is (thankfully) not encumbered by Pig's type system. What you probably want to do is to figure out a way to use native Spark facilities (e.g., textFile) coupled with some of the logic out of your Pig load function necessary to turn your external data into an RDD.

—
p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/

On Mon, Mar 23, 2015 at 2:29 AM, Dai, Kevin yun...@ebay.com wrote:
Hi, all

Can Spark use Pig's load function to load data?

Best Regards,
Kevin.
Re: Is it possible to use json4s 3.2.11 with Spark 1.3.0?
Thanks Marcelo, these options solved the problem (I'm using 1.3.0), but it works only if I remove "extends Logging" from the object. With "extends Logging" it returns:

Exception in thread "main" java.lang.LinkageError: loader constraint violation in interface itable initialization: when resolving method "App1$.logInfo(Lscala/Function0;Ljava/lang/Throwable;)V" the class loader (instance of org/apache/spark/util/ChildFirstURLClassLoader) of the current class, App1$, and the class loader (instance of sun/misc/Launcher$AppClassLoader) for interface org/apache/spark/Logging have different Class objects for the type scala/Function0 used in the signature
    at App1.main(App1.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Do you have any idea what's wrong with Logging?

PS: I'm running it with

  spark-1.3.0/bin/spark-submit --class App1 \
    --conf spark.driver.userClassPathFirst=true \
    --conf spark.executor.userClassPathFirst=true \
    $HOME/projects/sparkapp/target/scala-2.10/sparkapp-assembly-1.0.jar

Thanks,
Alexey

On Tue, Mar 24, 2015 at 5:03 AM, Marcelo Vanzin van...@cloudera.com wrote:
You could build a fat jar for your application containing both your code and the json4s library, and then run Spark with these two options:

  spark.driver.userClassPathFirst=true
  spark.executor.userClassPathFirst=true

Both only work in 1.3. (1.2 has spark.files.userClassPathFirst, but that only works for executors.)

On Mon, Mar 23, 2015 at 2:12 PM, Alexey Zinoviev alexey.zinov...@gmail.com wrote:
Spark has a dependency on json4s 3.2.10, but this version has several bugs and I need to use 3.2.11. I added the json4s-native 3.2.11 dependency to build.sbt and everything compiled fine. But when I spark-submit my JAR it provides me with 3.2.10.

build.sbt:

  import sbt.Keys._
  name := "sparkapp"
  version := "1.0"
  scalaVersion := "2.10.4"
  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
  libraryDependencies += "org.json4s" %% "json4s-native" % "3.2.11"

plugins.sbt:

  logLevel := Level.Warn
  resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)
  addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")

App1.scala:

  import org.apache.spark.SparkConf
  import org.apache.spark.rdd.RDD
  import org.apache.spark.{Logging, SparkConf, SparkContext}
  import org.apache.spark.SparkContext._

  object App1 extends Logging {
    def main(args: Array[String]) = {
      val conf = new SparkConf().setAppName("App1")
      val sc = new SparkContext(conf)
      println(s"json4s version: ${org.json4s.BuildInfo.version.toString}")
    }
  }

sbt 0.13.7, sbt-assembly 0.13.0, Scala 2.10.4

Is it possible to force 3.2.11 version usage?

Thanks,
Alexey

--
Marcelo
Re: Is yarn-standalone mode deprecated?
The mode is not deprecated, but the name yarn-standalone is now deprecated. It's now referred to as yarn-cluster. -Sandy On Mon, Mar 23, 2015 at 11:49 AM, nitinkak001 nitinkak...@gmail.com wrote: Is yarn-standalone mode deprecated in Spark now. The reason I am asking is because while I can find it in 0.9.0 documentation(https://spark.apache.org/docs/0.9.0/running-on-yarn.html). I am not able to find it in 1.2.0. I am using this mode to run the Spark jobs from Oozie as a java action. Removing this mode will prevent me from doing that. Are there any other ways of running a Spark job from Oozie other than Shell action? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-yarn-standalone-mode-deprecated-tp22188.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: DataFrame operation on parquet: GC overhead limit exceeded
Hi Yin,

Yes, I have set spark.executor.memory to 8g and the worker memory to 16g without any success. I cannot figure out how to increase the number of mapPartitions tasks.

Thanks a lot

On 20 March 2015 at 18:44, Yin Huai yh...@databricks.com wrote:
spark.sql.shuffle.partitions only controls the number of tasks in the second stage (the number of reducers). For your case, I'd say that the number of tasks in the first stage (the number of mappers) will be the number of files you have.

Actually, have you changed spark.executor.memory (it controls the memory for an executor of your application)? I did not see it in your original email. The difference between worker memory and executor memory can be found at http://spark.apache.org/docs/1.3.0/spark-standalone.html:

SPARK_WORKER_MEMORY: Total amount of memory to allow Spark applications to use on the machine, e.g. 1000m, 2g (default: total memory minus 1 GB); note that each application's individual memory is configured using its spark.executor.memory property.

On Fri, Mar 20, 2015 at 9:25 AM, Yiannis Gkoufas johngou...@gmail.com wrote:
Actually I realized that the correct way is:

  sqlContext.sql("set spark.sql.shuffle.partitions=1000")

but I am still experiencing the same behavior/error.

On 20 March 2015 at 16:04, Yiannis Gkoufas johngou...@gmail.com wrote:
Hi Yin,

The way I set the configuration is:

  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  sqlContext.setConf("spark.sql.shuffle.partitions", "1000")

It is the correct way, right? In the mapPartitions task (the first task which is launched), I get again the same number of tasks and again the same error. :(

Thanks a lot!

On 19 March 2015 at 17:40, Yiannis Gkoufas johngou...@gmail.com wrote:
Hi Yin, thanks a lot for that! Will give it a shot and let you know.

On 19 March 2015 at 16:30, Yin Huai yh...@databricks.com wrote:
Was the OOM thrown during the execution of the first stage (map) or the second stage (reduce)? If it was the second stage, can you increase the value of spark.sql.shuffle.partitions and see if the OOM disappears?

This setting controls the number of reducers Spark SQL will use, and the default is 200. Maybe there are too many distinct values and the memory pressure on every task (of those 200 reducers) is pretty high. You can start with 400 and increase it until the OOM disappears. Hopefully this will help.

Thanks,
Yin

On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas johngou...@gmail.com wrote:
Hi Yin,

Thanks for your feedback. I have 1700 parquet files, sized 100MB each. The number of tasks launched is equal to the number of parquet files. Do you have any idea on how to deal with this situation?

Thanks a lot

On 18 Mar 2015 17:35, Yin Huai yh...@databricks.com wrote:
Seems there are too many distinct groups processed in a task, which triggers the problem. How many files does your dataset have and how large is a file? Seems your query will be executed with two stages: table scan and map-side aggregation in the first stage, and the final round of reduce-side aggregation in the second stage. Can you take a look at the numbers of tasks launched in these two stages?

Thanks,
Yin

On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas johngou...@gmail.com wrote:
Hi there, I set the executor memory to 8g but it didn't help.

On 18 March 2015 at 13:59, Cheng Lian lian.cs@gmail.com wrote:
You should probably increase executor memory by setting spark.executor.memory. The full list of available configurations can be found here: http://spark.apache.org/docs/latest/configuration.html

Cheng

On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:
Hi there, I was trying the new DataFrame API with some basic operations on a parquet dataset. I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in standalone cluster mode. The code is the following:

  val people = sqlContext.parquetFile("/data.parquet")
  val res = people.groupBy("name", "date").agg(sum("power"), sum("supply")).take(10)
  System.out.println(res)

The dataset consists of 16 billion entries. The error I get is java.lang.OutOfMemoryError: GC overhead limit exceeded.

My configuration is:

  spark.serializer org.apache.spark.serializer.KryoSerializer
  spark.driver.memory 6g
  spark.executor.extraJavaOptions -XX:+UseCompressedOops
  spark.shuffle.manager sort

Any idea how I can work around this? Thanks a lot
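A concrete way to set the application-level executor memory Yin distinguishes from SPARK_WORKER_MEMORY, assuming submission through spark-submit (the class name and jar are placeholders):

  ./bin/spark-submit \
    --class com.example.MyApp \
    --master spark://master:7077 \
    --conf spark.executor.memory=8g \
    myapp.jar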
Re: Spark error NoClassDefFoundError: org/apache/hadoop/mapred/InputSplit
InputSplit is in the hadoop-mapreduce-client-core jar. Please check that the jar is in your classpath.

Cheers

On Mon, Mar 23, 2015 at 8:10 AM, Roy rp...@njit.edu wrote:
Hi,

I am using CDH 5.3.2 packages installed through Cloudera Manager 5.3.2. I am trying to run one Spark job with the following command:

  PYTHONPATH=~/code/utils/ spark-submit --master yarn --executor-memory 3G --num-executors 30 --driver-memory 2G --executor-cores 2 --name=analytics /home/abc/code/updb/spark/UPDB3analytics.py -date 2015-03-01

but I am getting the following error:

15/03/23 11:06:49 WARN TaskSetManager: Lost task 9.0 in stage 0.0 (TID 7, hdp003.dev.xyz.com): java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/InputSplit
    at java.lang.Class.getDeclaredConstructors0(Native Method)
    at java.lang.Class.privateGetDeclaredConstructors(Class.java:2532)
    at java.lang.Class.getDeclaredConstructors(Class.java:1901)
    at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1749)
    at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:72)
    at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:250)
    at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:248)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:247)
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:613)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapred.InputSplit
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 25 more

here is the full trace https://gist.github.com/anonymous/3492f0ec63d7a23c47cf
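One way to make the jar visible at submit time is the --jars flag; the path below is only illustrative for a CDH parcel layout and should be checked on your cluster:

  PYTHONPATH=~/code/utils/ spark-submit --master yarn \
    --jars /opt/cloudera/parcels/CDH/jars/hadoop-mapreduce-client-core.jar \
    --executor-memory 3G --num-executors 30 --driver-memory 2G --executor-cores 2 \
    --name=analytics /home/abc/code/updb/spark/UPDB3analytics.py -date 2015-03-01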
Re: PySpark, ResultIterable and taking a list and saving it into different parquet files
In case anyone wants to learn about my solution for this:

groupByKey is highly inefficient due to the shuffling of elements between partitions, as well as requiring enough memory in each worker to hold all the elements for each group. So instead of using groupByKey, I ended up taking the flatMap result and using subtractByKey in such a way that I ended up with multiple RDDs, each including only the key I wanted. Now I can iterate over each RDD independently and end up with multiple parquet files.

Thinking of submitting a splitByKeys() pull request that would take an array of keys and an RDD, and return an array of RDDs, each with only one of the keys. Any thoughts around this?

Thanks

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-ResultIterable-and-taking-a-list-and-saving-it-into-different-parquet-files-tp22152p22189.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
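A sketch of what such a helper could look like in Scala (note it scans the source RDD once per key, so caching the input first is advisable):

  import org.apache.spark.rdd.RDD

  def splitByKeys[K, V](rdd: RDD[(K, V)], keys: Seq[K]): Seq[RDD[(K, V)]] = {
    rdd.cache() // each filter below triggers a separate pass over the input
    keys.map(k => rdd.filter { case (key, _) => key == k })
  }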
Re: Is yarn-standalone mode deprecated?
The former is deprecated. However, the latter is functionally equivalent to it. Both launch an app in what is now called yarn-cluster mode. Oozie now also has a native Spark action, though I'm not familiar on the specifics. -Sandy On Mon, Mar 23, 2015 at 1:01 PM, Nitin kak nitinkak...@gmail.com wrote: To be more clear, I am talking about SPARK_JAR=SPARK_ASSEMBLY_JAR_FILE ./bin/spark-class org.apache.spark.deploy.yarn.Client \ --jar YOUR_APP_JAR_FILE \ --class APP_MAIN_CLASS \ --args APP_MAIN_ARGUMENTS \ --num-workers NUMBER_OF_WORKER_MACHINES \ --master-class ApplicationMaster_CLASS --master-memory MEMORY_FOR_MASTER \ --worker-memory MEMORY_PER_WORKER \ --worker-cores CORES_PER_WORKER \ --name application_name \ --queue queue_name \ --addJars any_local_files_used_in_SparkContext.addJar \ --files files_for_distributed_cache \ --archives archives_for_distributed_cache which I thought was the yarn-standalone mode vs spark-submit ./bin/spark-submit --class org.apache.spark.examples.SparkPi \ --master yarn-cluster \ --num-executors 3 \ --driver-memory 4g \ --executor-memory 2g \ --executor-cores 1 \ --queue thequeue \ lib/spark-examples*.jar I didnt see example of ./bin/spark-class in 1.2.0 documentation, so am wondering if that is deprecated. On Mon, Mar 23, 2015 at 12:11 PM, Sandy Ryza sandy.r...@cloudera.com wrote: The mode is not deprecated, but the name yarn-standalone is now deprecated. It's now referred to as yarn-cluster. -Sandy On Mon, Mar 23, 2015 at 11:49 AM, nitinkak001 nitinkak...@gmail.com wrote: Is yarn-standalone mode deprecated in Spark now. The reason I am asking is because while I can find it in 0.9.0 documentation(https://spark.apache.org/docs/0.9.0/running-on-yarn.html). I am not able to find it in 1.2.0. I am using this mode to run the Spark jobs from Oozie as a java action. Removing this mode will prevent me from doing that. Are there any other ways of running a Spark job from Oozie other than Shell action? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-yarn-standalone-mode-deprecated-tp22188.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Getting around Serializability issues for types not in my control
Hey all,

I'd like to use the Scalaz library in some of my Spark jobs, but am running into issues where some stuff I use from Scalaz is not serializable. For instance, in Scalaz there is a trait:

  /** In Scalaz */
  trait Applicative[F[_]] {
    def apply2[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C]
    def point[A](a: => A): F[A]
  }

But when I try to use it in, say, an `RDD#aggregate` call I get:

Caused by: java.io.NotSerializableException: scalaz.std.OptionInstances$$anon$1
Serialization stack:
- object not serializable (class: scalaz.std.OptionInstances$$anon$1, value: scalaz.std.OptionInstances$$anon$1@4516ee8c)
- field (class: dielectric.syntax.RDDOps$$anonfun$1, name: G$1, type: interface scalaz.Applicative)
- object (class dielectric.syntax.RDDOps$$anonfun$1, function2)
- field (class: dielectric.syntax.RDDOps$$anonfun$traverse$extension$1, name: apConcat$1, type: interface scala.Function2)
- object (class dielectric.syntax.RDDOps$$anonfun$traverse$extension$1, function2)

Outside of submitting a PR to Scalaz to make things Serializable, what can I do to make things Serializable? I considered something like:

  implicit def applicativeSerializable[F[_]](implicit F: Applicative[F]): SomeSerializableType[F] =
    new SomeSerializableType { ... }

?? Not sure how to go about doing it - I looked at java.io.Externalizable but given `scalaz.Applicative` has no value members I'm not sure how to implement the interface.

Any guidance would be much appreciated - thanks!

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Getting-around-Serializability-issues-for-types-not-in-my-control-tp22193.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
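One pattern that may help, since the problem is the captured instance rather than the type class itself: ship a serializable factory and materialize the instance lazily on each JVM. This is only a sketch; it assumes the instance can be re-summoned from statically reachable implicits:

  import scalaz.Applicative
  import scalaz.std.option._

  // Wraps a way to *recreate* the instance instead of shipping it. The
  // Function0 closure is serializable; the Applicative is rebuilt after
  // deserialization, so the non-serializable anon class is never shipped.
  class SerializableApplicative[F[_]](make: () => Applicative[F]) extends Serializable {
    @transient lazy val value: Applicative[F] = make()
  }

  val optionApp = new SerializableApplicative(() => Applicative[Option])
  // Inside RDD operations, refer to optionApp.value instead of a captured instance.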
Re: JDBC DF using DB2
bq. is to modify compute_classpath.sh on all worker nodes to include your driver JARs.

Please follow the above advice.

Cheers

On Mon, Mar 23, 2015 at 12:34 PM, Jack Arenas j...@ckarenas.com wrote:
Hi Team,

I'm trying to create a DF using jdbc as detailed here https://spark.apache.org/docs/1.3.0/sql-programming-guide.html#jdbc-to-other-databases - I'm currently using DB2 v9.7.0.6 and I've tried to use the db2jcc.jar and db2jcc_license_cu.jar combo. While it works in --master local using the command below, I get some strange behavior in --master yarn-client.

Here is the command:

  val df = sql.load("jdbc", Map(
    "url" -> "jdbc:db2://host:port/db:currentSchema=schema;user=user;password=password;",
    "driver" -> "com.ibm.db2.jcc.DB2Driver",
    "dbtable" -> "table"))

It seems to also be working on yarn-client, because once executed I get the following log:

  df: org.apache.spark.sql.DataFrame = [DATE_FIELD: date, INT_FIELD: int, DOUBLE_FIELD: double]

which indicates to me that Spark was able to connect to the DB. But once I run df.count() or df.take(5).foreach(println) in order to operate on the data and get a result, I get back a 'No suitable driver found' exception, which makes me think the driver wasn't shipped with the Spark job.

I've tried using --driver-class-path, --jars and SPARK_CLASSPATH to add the jars to the Spark job. I also have the jars in my $CLASSPATH and $HADOOP_CLASSPATH.

I also saw this in the troubleshooting section, but quite frankly I'm not sure what primordial class loader it's talking about:

The JDBC driver class must be visible to the primordial class loader on the client session and on all executors. This is because Java's DriverManager class does a security check that results in it ignoring all drivers not visible to the primordial class loader when one goes to open a connection. One convenient way to do this is to modify compute_classpath.sh on all worker nodes to include your driver JARs.

Any advice is welcome!

Thanks,
Jack
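Concretely, that advice amounts to appending the DB2 jars to the classpath that compute_classpath.sh builds on every worker; the jar locations below are illustrative and should be adjusted to where the driver jars actually live:

  # In compute_classpath.sh on each worker node:
  CLASSPATH="$CLASSPATH:/opt/db2/java/db2jcc.jar:/opt/db2/java/db2jcc_license_cu.jar"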
Re: Spark per app logging
Yes each application can use its own log4j.properties but I am not sure how to configure log4j so that the driver and executor write to file. This is because if we set the spark.executor.extraJavaOptions it will read from a file and that is not what I need. How do I configure log4j from the app so that the driver and the executors use these configs? Thanks, Udit On Sat, Mar 21, 2015 at 3:13 AM, Jeffrey Jedele jeffrey.jed...@gmail.com wrote: Hi, I'm not completely sure about this either, but this is what we are doing currently: Configure your logging to write to STDOUT, not to a file explicitely. Spark will capture stdour and stderr and separate the messages into a app/driver folder structure in the configured worker directory. We then use logstash to collect the logs and index them to a elasticsearch cluster (Spark seems to produce a lot of logging data). With some simple regex processing, you also get the application id as searchable field. Regards, Jeff 2015-03-20 22:37 GMT+01:00 Ted Yu yuzhih...@gmail.com: Are these jobs the same jobs, just run by different users or, different jobs ? If the latter, can each application use its own log4j.properties ? Cheers On Fri, Mar 20, 2015 at 1:43 PM, Udit Mehta ume...@groupon.com wrote: Hi, We have spark setup such that there are various users running multiple jobs at the same time. Currently all the logs go to 1 file specified in the log4j.properties. Is it possible to configure log4j in spark for per app/user logging instead of sending all logs to 1 file mentioned in the log4j.properties? Thanks Udit
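If the goal is per-application log files chosen at runtime, configuring log4j programmatically is one option. A sketch only: it affects just the JVM it runs in (the driver), executors would need the same snippet executed on their side, and the path and application name are made up:

  import org.apache.log4j.{FileAppender, Logger, PatternLayout}

  val appName = "my-spark-app" // hypothetical per-app identifier
  val appender = new FileAppender(
    new PatternLayout("%d{ISO8601} %p %c: %m%n"),
    s"/var/log/spark-apps/$appName.log") // illustrative path
  Logger.getRootLogger.addAppender(appender)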
Re: Spark streaming alerting
I think I didn't explain myself properly :) What I meant to say was that generally Spark workers run either on HDFS data nodes or on Cassandra nodes, which typically sit in a private (protected) network. When a condition is matched it's difficult to send out the alerts directly from the worker nodes because of the security concerns. I was wondering if there is a way to listen to the events as they occur on the sliding window scale, or is the best way to accomplish this to post back to a queue? On Mon, Mar 23, 2015 at 2:22 AM, Khanderao Kand Gmail khanderao.k...@gmail.com wrote: Akhil You are right in your answer to what Mohit wrote. However what Mohit seems to be alluding to, but did not write properly, might be different. Mohit You are wrong in saying that streaming generally works on HDFS and Cassandra. Streaming typically works with a streaming or queuing source like Kafka, Kinesis, Twitter, Flume, ZeroMQ, etc. (but it can also read from HDFS and S3). However, the streaming context (a receiver within the streaming context) gets events/messages/records and forms a time-window-based batch (RDD). So there is a maximum gap of one window interval between when an alert message becomes available to Spark and when the processing happens. I think you meant this. As per the Spark programming model, the RDD is the right way to deal with data. If you are fine with a minimum delay of, say, a second (based on the minimum time window that DStreams can support) then what Akhil gave is the right model. Khanderao On Mar 22, 2015, at 11:39 PM, Akhil Das ak...@sigmoidanalytics.com wrote: What do you mean you can't send it directly from spark workers? Here's a simple approach which you could do: val data = ssc.textFileStream("sigmoid/") val dist = data.filter(_.contains("ERROR")).foreachRDD(rdd => alert("Errors : " + rdd.count())) And the alert() function could be anything triggering an email or sending an SMS alert. Thanks Best Regards On Sun, Mar 22, 2015 at 1:52 AM, Mohit Anchlia mohitanch...@gmail.com wrote: Is there a module in spark streaming that lets you listen to the alerts/conditions as they happen in the streaming module? Generally spark streaming components will execute on large set of clusters like hdfs or Cassandra, however when it comes to alerting you generally can't send it directly from the spark workers, which means you need a way to listen to the alerts.
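If posting back to a queue turns out to be the way to go, a minimal sketch of the worker side (Kafka is an assumed example, the broker address and topic name are placeholders, and errors stands for the filtered DStream[String] from Akhil's snippet):

errors.foreachRDD { rdd =>
  rdd.foreachPartition { events =>
    // Create the producer inside the closure so it is instantiated on the
    // worker and never serialized from the driver.
    val props = new java.util.Properties()
    props.put("metadata.broker.list", "broker1:9092") // placeholder
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    val producer = new kafka.producer.Producer[String, String](new kafka.producer.ProducerConfig(props))
    events.foreach(e => producer.send(new kafka.producer.KeyedMessage[String, String]("alerts", e)))
    producer.close()
  }
}

A consumer outside the private network can then pick the alerts off the queue and do the actual emailing/paging.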
Spark-thriftserver Issue
Hi, I am having an issue starting spark-thriftserver. I'm running Spark 1.3 with Hadoop 2.4.0. I would also like to be able to change its port, so I can have the Hive thriftserver as well as the Spark thriftserver running at the same time. Starting the Spark thrift server: sudo ./start-thriftserver.sh --master spark://ip-172-31-10-124:7077 --executor-memory 2G Error: I created the folder manually but am still getting the following error: Exception in thread main java.lang.IllegalArgumentException: Log directory /tmp/spark-events does not exist. I am also getting the following error: 15/03/23 15:07:02 ERROR thrift.ThriftCLIService: Error: org.apache.thrift.transport.TTransportException: Could not create ServerSocket on address 0.0.0.0/0.0.0.0:1. at org.apache.thrift.transport.TServerSocket.<init>(TServerSocket.java:93) at org.apache.thrift.transport.TServerSocket.<init>(TServerSocket.java:79) at org.apache.hive.service.auth.HiveAuthFactory.getServerSocket(HiveAuthFactory.java:236) at org.apache.hive.service.cli.thrift.ThriftBinaryCLIService.run(ThriftBinaryCLIService.java:69) at java.lang.Thread.run(Thread.java:745) Thanks Neil
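If the goal is just to move the thrift server off the default port, the startup script accepts HiveServer2 properties; a sketch (the port value is arbitrary):

./start-thriftserver.sh --master spark://ip-172-31-10-124:7077 --hiveconf hive.server2.thrift.port=10001

The /tmp/spark-events complaint appears to come from event logging (spark.eventLog.dir), which must point at a directory that exists on the configured filesystem.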
Re: newbie question - spark with mesos
That's a very old page, try this instead: http://spark.apache.org/docs/latest/running-on-mesos.html When you run your Spark job on Mesos, tasks will be started on the slave nodes as needed, since fine-grained mode is the default. For a job like your example, very few tasks will be needed. Actually only one would be enough, but the default number of partitions will be used. I believe 8 is the default for Mesos. For local mode (local[*]), it's the number of cores. You can also set the property spark.default.parallelism. HTH, Dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Mar 23, 2015 at 11:46 AM, Anirudha Jadhav aniru...@nyu.edu wrote: i have a mesos cluster, which i deploy spark to by using instructions on http://spark.apache.org/docs/0.7.2/running-on-mesos.html after that the spark shell starts up fine. then i try the following on the shell: val data = 1 to 1 val distData = sc.parallelize(data) distData.filter(_ < 10).collect() open spark web ui at host:4040 and see an active job. NOW, how do i start workers or spark workers on mesos ? who completes my job? thanks, -- Ani
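A minimal sketch of setting that property when building the context (the value 8 is illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("mesos-example").set("spark.default.parallelism", "8")
val sc = new SparkContext(conf)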
Re: Getting around Serializability issues for types not in my control
Have you tried instantiating the instance inside the closure, rather than outside of it? If that works, you may need to switch to mapPartitions / foreachPartition for efficiency reasons. On Mon, Mar 23, 2015 at 3:03 PM, Adelbert Chang adelbe...@gmail.com wrote: Is there no way to pull out the bits of the instance I want before I send it through the closure for aggregate? I did try pulling things out, along the lines of def foo[G[_], B](blah: Blah)(implicit G: Applicative[G]) = { val lift: B => G[RDD[B]] = b => G.point(sparkContext.parallelize(List(b))) rdd.aggregate(/* use lift in here */) } But that doesn't seem to work either, still seems to be trying to serialize the Applicative... :( On Mon, Mar 23, 2015 at 12:27 PM, Dean Wampler deanwamp...@gmail.com wrote: Well, it's complaining about trait OptionInstances which is defined in Option.scala in the std package. Use scalap or javap on the scalaz library to find out which member of the trait is the problem, but since it says $$anon$1, I suspect it's the first value member, implicit val optionInstance, which has a long list of mixin traits, one of which is probably at fault. OptionInstances is huge, so there might be other offenders. Scalaz wasn't designed for distributed systems like this, so you'll probably find many examples of nonserializability. An alternative is to avoid using Scalaz in any closures passed to Spark methods, but that's probably not what you want. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Mar 23, 2015 at 12:03 PM, adelbertc adelbe...@gmail.com wrote: Hey all, I'd like to use the Scalaz library in some of my Spark jobs, but am running into issues where some stuff I use from Scalaz is not serializable. For instance, in Scalaz there is a trait /** In Scalaz */ trait Applicative[F[_]] { def apply2[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C] def point[A](a: => A): F[A] } But when I try to use it in, say, an `RDD#aggregate` call I get: Caused by: java.io.NotSerializableException: scalaz.std.OptionInstances$$anon$1 Serialization stack: - object not serializable (class: scalaz.std.OptionInstances$$anon$1, value: scalaz.std.OptionInstances$$anon$1@4516ee8c) - field (class: dielectric.syntax.RDDOps$$anonfun$1, name: G$1, type: interface scalaz.Applicative) - object (class dielectric.syntax.RDDOps$$anonfun$1, function2) - field (class: dielectric.syntax.RDDOps$$anonfun$traverse$extension$1, name: apConcat$1, type: interface scala.Function2) - object (class dielectric.syntax.RDDOps$$anonfun$traverse$extension$1, function2) Outside of submitting a PR to Scalaz to make things Serializable, what can I do to make things Serializable? I considered something like implicit def applicativeSerializable[F[_]](implicit F: Applicative[F]): SomeSerializableType[F] = new SomeSerializableType { ... } ?? Not sure how to go about doing it - I looked at java.io.Externalizable but given `scalaz.Applicative` has no value members I'm not sure how to implement the interface. Any guidance would be much appreciated - thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Getting-around-Serializability-issues-for-types-not-in-my-control-tp22193.html Sent from the Apache Spark User List mailing list archive at Nabble.com. 
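To make the first suggestion concrete, a hedged sketch (assuming scalaz 7, where the Option instance is reachable as scalaz.std.option.optionInstance; the map body is illustrative):

val result = rdd.mapPartitions { iter =>
  // The Applicative is created here, on the executor, so nothing
  // nonserializable is captured from the driver's closure.
  val G = scalaz.std.option.optionInstance
  iter.map(x => G.point(x))
}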
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Adelbert (Allen) Chang
Re: Strange behavior with PySpark when using Join() and zip()
I think this is a bad example since testData is not deterministic at all. I thought we had fixed this or similar examples in the past? As in https://github.com/apache/spark/pull/1250/files Hm, anyone see a reason that shouldn't be changed too? On Mon, Mar 23, 2015 at 7:00 PM, Ofer Mendelevitch omendelevi...@hortonworks.com wrote: Thanks Sean, Sorting definitely solves it, but I was hoping it could be avoided :) In the documentation for Classification in ML-Lib for example, zip() is used to create labelsAndPredictions: from pyspark.mllib.tree import RandomForest from pyspark.mllib.util import MLUtils # Load and parse the data file into an RDD of LabeledPoint. data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt') # Split the data into training and test sets (30% held out for testing) (trainingData, testData) = data.randomSplit([0.7, 0.3]) # Train a RandomForest model. # Empty categoricalFeaturesInfo indicates all features are continuous. # Note: Use larger numTrees in practice. # Setting featureSubsetStrategy=auto lets the algorithm choose. model = RandomForest.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={}, numTrees=3, featureSubsetStrategy=auto, impurity='gini', maxDepth=4, maxBins=32) # Evaluate model on test instances and compute test error predictions = model.predict(testData.map(lambda x: x.features)) labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions) testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count()) print('Test Error = ' + str(testErr)) print('Learned classification forest model:') print(model.toDebugString()) The reason the zip() works here is because the RDD is loaded from a file. If it was generated with something that includes a JOIN() it won’t work due to this same issue. Maybe worth mentioning in the docs then? Ofer On Mar 23, 2015, at 11:40 AM, Sean Owen so...@cloudera.com wrote: I think the explanation is that the join does not guarantee any order, since it causes a shuffle in general, and it is computed twice in the first example, resulting in a difference for d1 and d2. You can persist() the result of the join and in practice I believe you'd find it behaves as expected, although that is even not 100% guaranteed since a block could be lost and recomputed (differently). If order matters, and it does for zip(), then the reliable way to guarantee a well defined ordering for zipping is to sort the RDDs. On Mon, Mar 23, 2015 at 6:27 PM, Ofer Mendelevitch omendelevi...@hortonworks.com wrote: Hi, I am running into a strange issue when doing a JOIN of two RDDs followed by ZIP from PySpark. It’s part of a more complex application, but was able to narrow it down to a simplified example that’s easy to replicate and causes the same problem to appear: raw = sc.parallelize([('k'+str(x),'v'+str(x)) for x in range(100)]) data = raw.join(raw).mapValues(lambda x: [x[0]]+[x[1]]).map(lambda pair: ','.join([x for x in pair[1]])) d1 = data.map(lambda s: s.split(',')[0]) d2 = data.map(lambda s: s.split(',')[1]) x = d1.zip(d2) print x.take(10) The output is: [('v44', 'v80'), ('v79', 'v44'), ('v80', 'v79'), ('v45', 'v78'), ('v81', 'v81'), ('v78', 'v45'), ('v99', 'v99'), ('v82', 'v82'), ('v46', 'v46'), ('v83', 'v83')] As you can see, the ordering of items is not preserved anymore in all cases. (e.g., ‘v81’ is preserved, and ‘v45’ is not) Is it not supposed to be preserved? 
If I do the same thing without the JOIN: data = sc.parallelize('v'+str(x)+',v'+str(x) for x in range(100)) d1 = data.map(lambda s: s.split(',')[0]) d2 = data.map(lambda s: s.split(',')[1]) x = d1.zip(d2) print x.take(10) The output is: [('v0', 'v0'), ('v1', 'v1'), ('v2', 'v2'), ('v3', 'v3'), ('v4', 'v4'), ('v5', 'v5'), ('v6', 'v6'), ('v7', 'v7'), ('v8', 'v8'), ('v9', 'v9')] As expected. Anyone run into this or a similar issue? Ofer - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
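To make the sorting fix concrete, a hedged sketch in Scala mirroring the PySpark example above (sorting the joined RDD once gives both derived RDDs a deterministic, consistent order even across recomputations):

// Impose a well-defined order on the joined result before deriving
// the two sides that will be zipped together.
val stable = data.sortBy(identity)
val d1 = stable.map(_.split(",")(0))
val d2 = stable.map(_.split(",")(1))
val x = d1.zip(d2)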
Re: Strange behavior with PySpark when using Join() and zip()
Thanks Sean, Sorting definitely solves it, but I was hoping it could be avoided :) In the documentation for Classification in ML-Lib for example, zip() is used to create labelsAndPredictions: from pyspark.mllib.tree import RandomForest from pyspark.mllib.util import MLUtils # Load and parse the data file into an RDD of LabeledPoint. data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt') # Split the data into training and test sets (30% held out for testing) (trainingData, testData) = data.randomSplit([0.7, 0.3]) # Train a RandomForest model. # Empty categoricalFeaturesInfo indicates all features are continuous. # Note: Use larger numTrees in practice. # Setting featureSubsetStrategy=auto lets the algorithm choose. model = RandomForest.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={}, numTrees=3, featureSubsetStrategy="auto", impurity='gini', maxDepth=4, maxBins=32) # Evaluate model on test instances and compute test error predictions = model.predict(testData.map(lambda x: x.features)) labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions) testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count()) print('Test Error = ' + str(testErr)) print('Learned classification forest model:') print(model.toDebugString()) The reason the zip() works here is because the RDD is loaded from a file. If it was generated with something that includes a JOIN() it won’t work due to this same issue. Maybe worth mentioning in the docs then? Ofer On Mar 23, 2015, at 11:40 AM, Sean Owen so...@cloudera.com wrote: I think the explanation is that the join does not guarantee any order, since it causes a shuffle in general, and it is computed twice in the first example, resulting in a difference for d1 and d2. You can persist() the result of the join and in practice I believe you'd find it behaves as expected, although that is even not 100% guaranteed since a block could be lost and recomputed (differently). If order matters, and it does for zip(), then the reliable way to guarantee a well defined ordering for zipping is to sort the RDDs. On Mon, Mar 23, 2015 at 6:27 PM, Ofer Mendelevitch omendelevi...@hortonworks.com wrote: Hi, I am running into a strange issue when doing a JOIN of two RDDs followed by ZIP from PySpark. It’s part of a more complex application, but was able to narrow it down to a simplified example that’s easy to replicate and causes the same problem to appear: raw = sc.parallelize([('k'+str(x),'v'+str(x)) for x in range(100)]) data = raw.join(raw).mapValues(lambda x: [x[0]]+[x[1]]).map(lambda pair: ','.join([x for x in pair[1]])) d1 = data.map(lambda s: s.split(',')[0]) d2 = data.map(lambda s: s.split(',')[1]) x = d1.zip(d2) print x.take(10) The output is: [('v44', 'v80'), ('v79', 'v44'), ('v80', 'v79'), ('v45', 'v78'), ('v81', 'v81'), ('v78', 'v45'), ('v99', 'v99'), ('v82', 'v82'), ('v46', 'v46'), ('v83', 'v83')] As you can see, the ordering of items is not preserved anymore in all cases. (e.g., ‘v81’ is preserved, and ‘v45’ is not) Is it not supposed to be preserved? If I do the same thing without the JOIN: data = sc.parallelize('v'+str(x)+',v'+str(x) for x in range(100)) d1 = data.map(lambda s: s.split(',')[0]) d2 = data.map(lambda s: s.split(',')[1]) x = d1.zip(d2) print x.take(10) The output is: [('v0', 'v0'), ('v1', 'v1'), ('v2', 'v2'), ('v3', 'v3'), ('v4', 'v4'), ('v5', 'v5'), ('v6', 'v6'), ('v7', 'v7'), ('v8', 'v8'), ('v9', 'v9')] As expected. Anyone run into this or a similar issue? Ofer
SparkEnv
Is it safe to access SparkEnv.get inside, say, mapPartitions? I need to get a Serializer (so SparkEnv.get.serializer). Thanks
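For reference, a hedged sketch of what that access could look like, with the caveat that SparkEnv is marked as a developer API (assuming rdd is an RDD[String]; the use of the serialized size is purely illustrative):

val sizes = rdd.mapPartitions { iter =>
  // On an executor, SparkEnv.get returns that executor's environment,
  // so this picks up whatever spark.serializer is configured.
  val ser = org.apache.spark.SparkEnv.get.serializer.newInstance()
  iter.map(x => ser.serialize(x).limit()) // serialized size in bytes
}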
Re: Getting around Serializability issues for types not in my control
Well, it's complaining about trait OptionInstances which is defined in Option.scala in the std package. Use scalap or javap on the scalaz library to find out which member of the trait is the problem, but since it says $$anon$1, I suspect it's the first value member, implicit val optionInstance, which has a long list of mixin traits, one of which is probably at fault. OptionInstances is huge, so there might be other offenders. Scalaz wasn't designed for distributed systems like this, so you'll probably find many examples of nonserializability. An alternative is to avoid using Scalaz in any closures passed to Spark methods, but that's probably not what you want. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Mar 23, 2015 at 12:03 PM, adelbertc adelbe...@gmail.com wrote: Hey all, I'd like to use the Scalaz library in some of my Spark jobs, but am running into issues where some stuff I use from Scalaz is not serializable. For instance, in Scalaz there is a trait /** In Scalaz */ trait Applicative[F[_]] { def apply2[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C] def point[A](a: => A): F[A] } But when I try to use it in, say, an `RDD#aggregate` call I get: Caused by: java.io.NotSerializableException: scalaz.std.OptionInstances$$anon$1 Serialization stack: - object not serializable (class: scalaz.std.OptionInstances$$anon$1, value: scalaz.std.OptionInstances$$anon$1@4516ee8c) - field (class: dielectric.syntax.RDDOps$$anonfun$1, name: G$1, type: interface scalaz.Applicative) - object (class dielectric.syntax.RDDOps$$anonfun$1, function2) - field (class: dielectric.syntax.RDDOps$$anonfun$traverse$extension$1, name: apConcat$1, type: interface scala.Function2) - object (class dielectric.syntax.RDDOps$$anonfun$traverse$extension$1, function2) Outside of submitting a PR to Scalaz to make things Serializable, what can I do to make things Serializable? I considered something like implicit def applicativeSerializable[F[_]](implicit F: Applicative[F]): SomeSerializableType[F] = new SomeSerializableType { ... } ?? Not sure how to go about doing it - I looked at java.io.Externalizable but given `scalaz.Applicative` has no value members I'm not sure how to implement the interface. Any guidance would be much appreciated - thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Getting-around-Serializability-issues-for-types-not-in-my-control-tp22193.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
JDBC DF using DB2
Hi Team, I’m trying to create a DF using jdbc as detailed here – I’m currently using DB2 v9.7.0.6 and I’ve tried to use the db2jcc.jar and db2jcc_license_cu.jar combo, and while it works in --master local using the command below, I get some strange behavior in --master yarn-client. Here is the command: val df = sql.load("jdbc", Map("url" -> "jdbc:db2://host:port/db:currentSchema=schema;user=user;password=password;", "driver" -> "com.ibm.db2.jcc.DB2Driver", "dbtable" -> "table")) It seems to also be working on yarn-client because once executed I get the following log: df: org.apache.spark.sql.DataFrame = [DATE_FIELD: date, INT_FIELD: int, DOUBLE_FIELD: double] Which suggests to me that Spark was able to connect to the DB. But once I run df.count() or df.take(5).foreach(println) in order to operate on the data and get a result, I get back a ‘No suitable driver found’ exception, which makes me think the driver wasn’t shipped with the Spark job. I’ve tried using --driver-class-path, --jars, SPARK_CLASSPATH to add the jars to the Spark job. I also have the jars in my $CLASSPATH and $HADOOP_CLASSPATH. I also saw this in the troubleshooting section, but quite frankly I’m not sure what primordial class loader it’s talking about: The JDBC driver class must be visible to the primordial class loader on the client session and on all executors. This is because Java’s DriverManager class does a security check that results in it ignoring all drivers not visible to the primordial class loader when one goes to open a connection. One convenient way to do this is to modify compute_classpath.sh on all worker nodes to include your driver JARs. Any advice is welcome! Thanks, Jack
Re: Use pig load function in spark
You may be able to utilize Spork (Pig on Apache Spark) as a mechanism to do this: https://github.com/sigmoidanalytics/spork On Mon, Mar 23, 2015 at 2:29 AM Dai, Kevin yun...@ebay.com wrote: Hi, all Can spark use pig’s load function to load data? Best Regards, Kevin.
Re: Getting around Serializability issues for types not in my control
Is there no way to pull out the bits of the instance I want before I send it through the closure for aggregate? I did try pulling things out, along the lines of def foo[G[_], B](blah: Blah)(implicit G: Applicative[G]) = { val lift: B => G[RDD[B]] = b => G.point(sparkContext.parallelize(List(b))) rdd.aggregate(/* use lift in here */) } But that doesn't seem to work either, still seems to be trying to serialize the Applicative... :( On Mon, Mar 23, 2015 at 12:27 PM, Dean Wampler deanwamp...@gmail.com wrote: Well, it's complaining about trait OptionInstances which is defined in Option.scala in the std package. Use scalap or javap on the scalaz library to find out which member of the trait is the problem, but since it says $$anon$1, I suspect it's the first value member, implicit val optionInstance, which has a long list of mixin traits, one of which is probably at fault. OptionInstances is huge, so there might be other offenders. Scalaz wasn't designed for distributed systems like this, so you'll probably find many examples of nonserializability. An alternative is to avoid using Scalaz in any closures passed to Spark methods, but that's probably not what you want. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Mar 23, 2015 at 12:03 PM, adelbertc adelbe...@gmail.com wrote: Hey all, I'd like to use the Scalaz library in some of my Spark jobs, but am running into issues where some stuff I use from Scalaz is not serializable. For instance, in Scalaz there is a trait /** In Scalaz */ trait Applicative[F[_]] { def apply2[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C] def point[A](a: => A): F[A] } But when I try to use it in, say, an `RDD#aggregate` call I get: Caused by: java.io.NotSerializableException: scalaz.std.OptionInstances$$anon$1 Serialization stack: - object not serializable (class: scalaz.std.OptionInstances$$anon$1, value: scalaz.std.OptionInstances$$anon$1@4516ee8c) - field (class: dielectric.syntax.RDDOps$$anonfun$1, name: G$1, type: interface scalaz.Applicative) - object (class dielectric.syntax.RDDOps$$anonfun$1, function2) - field (class: dielectric.syntax.RDDOps$$anonfun$traverse$extension$1, name: apConcat$1, type: interface scala.Function2) - object (class dielectric.syntax.RDDOps$$anonfun$traverse$extension$1, function2) Outside of submitting a PR to Scalaz to make things Serializable, what can I do to make things Serializable? I considered something like implicit def applicativeSerializable[F[_]](implicit F: Applicative[F]): SomeSerializableType[F] = new SomeSerializableType { ... } ?? Not sure how to go about doing it - I looked at java.io.Externalizable but given `scalaz.Applicative` has no value members I'm not sure how to implement the interface. Any guidance would be much appreciated - thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Getting-around-Serializability-issues-for-types-not-in-my-control-tp22193.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Adelbert (Allen) Chang
objectFile uses only java serializer?
In the comments on SparkContext.objectFile it says: It will also be pretty slow if you use the default serializer (Java serialization) This suggests the spark.serializer is used, which means I can switch to the much faster Kryo serializer. However, when I look at the code, it uses Utils.deserialize, which always uses Java serialization. Did I get that right? And is this desired? It seems straightforward to switch objectFile to use the serializer specified by spark.serializer (although it might bring in new classloader issues).
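For reference, pointing the config at Kryo is a one-liner; whether objectFile honors it is exactly the question raised above:

val conf = new org.apache.spark.SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")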
Re: Converting SparkSQL query to Scala query
There isn't any automated way. Note that as the DataFrame implementation improves, it will probably do a better job with query optimization than hand-rolled Scala code. I don't know if that's true yet, though. For now, there are a few examples at the beginning of the DataFrame scaladocs http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame page showing typical scenarios. Also, the DataFrameSuite of tests in the Spark source code has many examples. If you scan through the list of methods, you'll see that many of them have the same name as the corresponding SQL keyword. HTH, Dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Mar 23, 2015 at 11:42 AM, nishitd nishitde...@yahoo.com wrote: I have a complex SparkSQL query of the nature select a.a, b.b, c.c from a,b,c where a.x = b.x and b.y = c.y How do I convert this efficiently into scala query of a.join(b,..,..) and so on. Can anyone help me with this? If my question needs more clarification, please let me know. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Converting-SparkSQL-query-to-Scala-query-tp22192.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
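For the specific query in the question, a hedged sketch against the Spark 1.3 DataFrame API (assuming a, b and c are DataFrames with those column names):

// select a.a, b.b, c.c from a, b, c where a.x = b.x and b.y = c.y
val result = a.join(b, a("x") === b("x"))
              .join(c, b("y") === c("y"))
              .select(a("a"), b("b"), c("c"))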
Re: SchemaRDD/DataFrame result partitioned according to the underlying datasource partitions
There is not an interface to this at this time, and in general I'm hesitant to open up interfaces where the user could make a mistake, thinking something is going to improve performance when it will actually impact correctness. Since, as you say, we are picking the partitioner automatically in the query planner, it's much harder to know if you are actually going to preserve the expected partitioning. Additionally, it's also a little more complicated when reading in data, as even if you have files that are partitioned correctly, the InputFormat is free to split those files, violating our assumptions about partitioning. On Mon, Mar 23, 2015 at 10:22 AM, Stephen Boesch java...@gmail.com wrote: Is there a way to take advantage of the underlying datasource partitions when generating a DataFrame/SchemaRDD via catalyst? It seems from the sql module that the only options are RangePartitioner and HashPartitioner, and further that those are selected automatically by the code. It was not apparent either that the underlying partitioning was translated to the partitions presented in the RDD, or that a custom partitioner could be provided. The motivation would be to subsequently use df.map (with preservesPartitioning=true) and/or df.mapPartitions (likewise) to perform operations that work within the original datasource partitions, thus avoiding a shuffle.
Re: How to check that a dataset is sorted after it has been written out?
One approach would be to repartition the whole data into 1 partition (a costly operation, though, but it will give you a single file). Also, you could try using zipWithIndex before writing it out. Thanks Best Regards On Sat, Mar 21, 2015 at 4:11 AM, Michael Albert m_albert...@yahoo.com.invalid wrote: Greetings! I sorted a dataset in Spark and then wrote it out in avro/parquet. Then I wanted to check that it was sorted. It looks like each partition has been sorted, but when reading in, the first partition (i.e., as seen in the partition index of mapPartitionsWithIndex) is not the same as implied by the names of the parquet files (even when the number of partitions in the RDD that was read matches what is on disk). If I take() a few hundred values, they are sorted, but they are *not* the same as if I explicitly open part-r-0.parquet and take values from that. It seems that when opening the rdd, the partitions of the rdd are not in the same order as implied by the data on disk (i.e., part-r-0.parquet, part-r-1.parquet, etc). So, how might one read the data so that one maintains the sort order? And while on the subject, after the terasort, how did they check that the data was actually sorted correctly? (or did they :-) ? ). Is there any way to read the data back in so as to preserve the sort, or do I need to zipWithIndex before writing it out, and write the index at that time? (I haven't tried the latter yet). Thanks! -Mike
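A hedged sketch of the zipWithIndex idea (sorted stands for the sorted RDD before writing, readBack for whatever RDD comes out of loading the files again):

// Attach each record's position before writing, as suggested above.
val withIndex = sorted.zipWithIndex().map { case (rec, i) => (i, rec) }
// ... write withIndex out; after reading it back, re-impose the order:
val restored = readBack.sortBy(_._1).map(_._2)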
Spark Sql with python udf fail
Hi all, I tried to transfer some hive jobs into spark-sql. When I ran a sql job with a python udf I got an exception: java.lang.ArrayIndexOutOfBoundsException: 9 at org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142) at org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37) at org.apache.spark.sql.catalyst.expressions.EqualTo.eval(predicates.scala:166) at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30) at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:156) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:151) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) I suspected there was an odd line in the input file. But the input file is so large that I could not find any abnormal lines, even with several jobs to check. How can I get the abnormal line here?
Re: Spark UI tunneling
Akhil, that's what I did. The problem is that the web server probably tried to forward my request to another address that is accessible locally only. On 23 March 2015, at 11:12, Akhil Das ak...@sigmoidanalytics.com wrote: Did you try ssh -L 4040:127.0.0.1:4040 user@host Thanks Best Regards On Mon, Mar 23, 2015 at 1:12 PM, sergunok ser...@gmail.com wrote: Is there a way to tunnel the Spark UI? I tried to tunnel client-node:4040 but my browser was redirected from localhost to some cluster locally visible domain name.. Maybe there is some startup option to encourage the Spark UI to be fully accessible through a single endpoint (address:port)? Serg. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-UI-tunneling-tp22184.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
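One workaround for that redirect problem is a SOCKS proxy over SSH instead of a single-port tunnel, so cluster-internal hostnames also resolve through the gateway (the port number is arbitrary):

ssh -D 1080 user@client-node

Then point the browser's SOCKS proxy at localhost:1080 and open http://client-node:4040 directly.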
Re: Data/File structure Validation
Dear Taotao, Yes, I tried sparkCSV. Thanks, Nawwar On Mon, Mar 23, 2015 at 12:20 PM, Taotao.Li taotao...@datayes.com wrote: can it load successfully if the format is invalid? -- *From: *Ahmed Nawar ahmed.na...@gmail.com *To: *user@spark.apache.org *Sent: *Monday, March 23, 2015, 4:48:54 PM *Subject: *Data/File structure Validation Dears, Is there any way to validate the CSV, Json ... Files while loading to DataFrame. I need to ignore corrupted rows (rows not matching the schema). Thanks, Ahmed Nawwar -- *Thanks Best regards* 李涛涛 Taotao · Li | Fixed Income@Datayes | Software Engineer Address: Wanxiang Towen 8F, Lujiazui West Rd. No.99, Pudong New District, Shanghai, 200120 Phone: 021-60216502 Mobile: +86-18202171279
Re: Data/File structure Validation
Dear Raunak, The source system provided logs with some errors. I need to make sure each row is in the correct format (the number of columns/attributes and the data types are correct) and move incorrect rows to a separate list. Of course I can implement my own logic, but I need to make sure there is no direct way. Thanks, Nawwar On Mon, Mar 23, 2015 at 1:14 PM, Raunak Jhawar raunak.jha...@gmail.com wrote: CSV is a structured format and JSON is not (it is semi-structured). It is obvious for different JSON documents to have differing schemas? What are you trying to do here? -- Thanks, Raunak Jhawar m: 09820890034 On Mon, Mar 23, 2015 at 2:18 PM, Ahmed Nawar ahmed.na...@gmail.com wrote: Dears, Is there any way to validate the CSV, Json ... Files while loading to DataFrame. I need to ignore corrupted rows (rows not matching the schema). Thanks, Ahmed Nawwar
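Lacking a built-in validator, a minimal hand-rolled sketch in Scala (the expected arity of 3 and the numeric third column are illustrative assumptions, as is the input path):

val lines = sc.textFile("/data/source.log")
val checked = lines.map { line =>
  val cols = line.split(",", -1)
  // Valid if the arity matches and the typed field parses.
  val ok = cols.length == 3 && scala.util.Try(cols(2).toDouble).isSuccess
  (ok, line)
}
val goodRows = checked.filter(_._1).map(_._2)
val badRows  = checked.filter(x => !x._1).map(_._2) // keep for inspection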
Re: Buffering for Socket streams
You can try playing with spark.streaming.blockInterval so that it won't consume a lot of data; the default value is 200ms. Thanks Best Regards On Fri, Mar 20, 2015 at 8:49 PM, jamborta jambo...@gmail.com wrote: Hi all, We are designing a workflow where we try to stream local files to a Socket streamer, that would clean and process the files and write them to hdfs. We have an issue with bigger files when the streamer cannot keep up with the data, and runs out of memory. What would be the best way to implement an approach where the Socket stream receiver would notify the stream not to send more data (stop reading from disk too?), just before it might run out of memory? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Buffering-for-Socket-streams-tp22164.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Cassandra time series + Spark
Hi, I'm working on a system which has to deal with time series data. I've been happy using Cassandra for time series, and Spark looks promising as a computational platform. I consider chunking time series in Cassandra necessary, e.g. by 3 weeks as kairosdb does it. This allows an 8 byte chunk start timestamp with 4 byte offsets for the individual measurements. And it keeps the data below 2x10^9 per partition even at 1000 Hz. This schema works quite okay when dealing with one time series at a time. Because the data is partitioned by time series id and chunk of time (e.g. the three weeks mentioned above), it requires a little client side logic to retrieve the partitions and glue them together, but this is quite okay. However, when working with many / all of the time series in a table at once, e.g. in Spark, the story changes dramatically. Say I want to compute something as simple as a moving average; I have to deal with data all over the place. I can't currently think of anything other than performing aggregateByKey, causing a shuffle every time. Anyone have experience with combining time series chunking and computation on all / many time series at once? Any advice? Cheers, Frens Jan
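For concreteness, the shuffle-based baseline described above might look like this for a per-series mean (points is assumed to be an RDD[(String, Double)] of (seriesId, value) pairs assembled from the chunked partitions; a true moving average would additionally need ordering within each series):

val means = points
  .aggregateByKey((0.0, 0L))(
    (acc, v) => (acc._1 + v, acc._2 + 1L),  // fold values within a partition
    (a, b) => (a._1 + b._1, a._2 + b._2))   // merge partial sums across partitions
  .mapValues { case (sum, n) => sum / n }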
log files of failed task
Hi, I executed a task on Spark in YARN and it failed. I see just an 'executor lost' message from YARNClientScheduler, no further details. (I read this error can be connected to the spark.yarn.executor.memoryOverhead setting and have already played with this param.) How do I dig deeper into the log files and find the exact reason? How can the log of a failed task be examined? Unfortunately I don't have access to the Spark UI, I can only use the command line. Thanks! Serg. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/log-files-of-failed-task-tp22183.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
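If YARN log aggregation is enabled on the cluster, the full container logs (including those of lost executors) can be fetched from the command line once the application finishes:

yarn logs -applicationId <application_id>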
Re: Data/File structure Validation
can it load successfully if the format is invalid? - Original Message - From: Ahmed Nawar ahmed.na...@gmail.com To: user@spark.apache.org Sent: Monday, March 23, 2015, 4:48:54 PM Subject: Data/File structure Validation Dears, Is there any way to validate the CSV, Json ... Files while loading to DataFrame. I need to ignore corrupted rows (rows not matching the schema). Thanks, Ahmed Nawwar -- Thanks Best regards 李涛涛 Taotao · Li | Fixed Income@Datayes | Software Engineer Address: Wanxiang Towen 8F, Lujiazui West Rd. No.99, Pudong New District, Shanghai, 200120 Phone: 021-60216502 Mobile: +86-18202171279
Re: Spark Sql with python udf fail
Could you elaborate on the UDF code? On 3/23/15 3:43 PM, lonely Feb wrote: Hi all, I tried to transfer some hive jobs into spark-sql. When i ran a sql job with python udf i got a exception: java.lang.ArrayIndexOutOfBoundsException: 9 at org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142) at org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37) at org.apache.spark.sql.catalyst.expressions.EqualTo.eval(predicates.scala:166) at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30) at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:156) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:151) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601) at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) I suspected there was an odd line in the input file. But the input file is so large and i could not found any abnormal lines with several jobs to check. How can i get the abnormal line here ? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark streaming alerting
What exactly do you mean by alerts? Something specific to your data, or general events of the Spark cluster? For the first, something like Akhil suggested should work. For the latter, I would suggest having a log consolidation system like logstash in place and using it to generate alerts. Regards, Jeff 2015-03-23 7:39 GMT+01:00 Akhil Das ak...@sigmoidanalytics.com: What do you mean you can't send it directly from spark workers? Here's a simple approach which you could do: val data = ssc.textFileStream("sigmoid/") val dist = data.filter(_.contains("ERROR")).foreachRDD(rdd => alert("Errors : " + rdd.count())) And the alert() function could be anything triggering an email or sending an SMS alert. Thanks Best Regards On Sun, Mar 22, 2015 at 1:52 AM, Mohit Anchlia mohitanch...@gmail.com wrote: Is there a module in spark streaming that lets you listen to the alerts/conditions as they happen in the streaming module? Generally spark streaming components will execute on large set of clusters like hdfs or Cassandra, however when it comes to alerting you generally can't send it directly from the spark workers, which means you need a way to listen to the alerts.
Data/File structure Validation
Dears, Is there any way to validate CSV, JSON, etc. files while loading them into a DataFrame? I need to ignore corrupted rows (rows not matching the schema). Thanks, Ahmed Nawwar
Re: Spark streaming alerting
What do you mean you can't send it directly from spark workers? Here's a simple approach which you could do: val data = ssc.textFileStream("sigmoid/") val dist = data.filter(_.contains("ERROR")).foreachRDD(rdd => alert("Errors : " + rdd.count())) And the alert() function could be anything triggering an email or sending an SMS alert. Thanks Best Regards On Sun, Mar 22, 2015 at 1:52 AM, Mohit Anchlia mohitanch...@gmail.com wrote: Is there a module in spark streaming that lets you listen to the alerts/conditions as they happen in the streaming module? Generally spark streaming components will execute on large set of clusters like hdfs or Cassandra, however when it comes to alerting you generally can't send it directly from the spark workers, which means you need a way to listen to the alerts.
Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class
Was a solution ever found for this? Trying to run some test cases with sbt test which use spark sql, in the Spark 1.3.0 release with Scala 2.11.6, I get this error. Setting fork := true in sbt seems to work, but it's a less than ideal workaround. On Tue, Mar 17, 2015 at 9:37 PM, Eric Charles e...@apache.org wrote: Launching from Eclipse (Scala IDE) as a Scala process gives this error, but launching as a Java process (a Java main class) works fine. Launching as a Scala process from IntelliJ works fine. There is something wrong on the Eclipse side, not in Spark. On 03/13/2015 11:47 AM, Jianshi Huang wrote: Liancheng also found out that the Spark jars are not included in the classpath of URLClassLoader. Hmm... we're very close to the truth now. Jianshi On Fri, Mar 13, 2015 at 6:03 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I'm almost certain the problem is the ClassLoader. So adding fork := true solves problems for test and run. The problem is how can I fork a JVM for sbt console? fork in console := true seems not to work... Jianshi On Fri, Mar 13, 2015 at 4:35 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I guess it's a ClassLoader issue. But I have no idea how to debug it. Any hints? Jianshi On Fri, Mar 13, 2015 at 3:00 PM, Eric Charles e...@apache.org wrote: I have the same issue running spark sql code from the Eclipse workspace. If you run your code from the command line (with a packaged jar) or from IntelliJ, I bet it should work. IMHO this is somehow related to the Eclipse env, but I would love to know how to fix it (whether via Eclipse conf, or via a patch in Spark). On 03/01/2015 02:32 AM, Michael Armbrust wrote: I think it's possible that the problem is that the scala compiler is not being loaded by the primordial classloader (but instead by some child classloader) and thus the scala reflection mirror is failing to initialize when it can't find it. Unfortunately, the only solution that I know of is to load all required jars when the JVM starts. On Sat, Feb 28, 2015 at 5:26 PM, Ashish Nigam ashnigamt...@gmail.com wrote: Also, can the scala version play any role here? I am using scala 2.11.5 but all spark packages have a dependency on scala 2.11.2 Just wanted to make sure that the scala version is not an issue here. On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com wrote: Hi, I wrote a very simple program in scala to convert an existing RDD to SchemaRDD. But the createSchemaRDD function is throwing an exception Exception in thread main scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial classloader with boot classpath [.] not found Here's more info on the versions I am using - <scala.binary.version>2.11</scala.binary.version> <spark.version>1.2.1</spark.version> <scala.version>2.11.5</scala.version> Please let me know how I can resolve this problem. Thanks Ashish -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
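For reference, the workaround expressed in build.sbt syntax, scoped so it only forks the test JVM:

fork in Test := true

As far as I know, sbt never forks the console task's JVM, which would explain why fork in console := true has no effect below.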
Re: Spark Sql with python udf fail
ok i'll try asap 2015-03-23 17:00 GMT+08:00 Cheng Lian lian.cs@gmail.com: I suspect there is a malformed row in your input dataset. Could you try something like this to confirm: sql(SELECT * FROM your-table).foreach(println) If there does exist a malformed line, you should see similar exception. And you can catch it with the help of the output. Notice that the messages are printed to stdout on executor side. On 3/23/15 4:36 PM, lonely Feb wrote: I caught exceptions in the python UDF code, flush exceptions into a single file, and made sure the the column number of the output lines as same as sql schema. Sth. interesting is that my output line of the UDF code is just 10 columns, and the exception above is java.lang. ArrayIndexOutOfBoundsException: 9, is there any inspirations? 2015-03-23 16:24 GMT+08:00 Cheng Lian lian.cs@gmail.com: Could you elaborate on the UDF code? On 3/23/15 3:43 PM, lonely Feb wrote: Hi all, I tried to transfer some hive jobs into spark-sql. When i ran a sql job with python udf i got a exception: java.lang.ArrayIndexOutOfBoundsException: 9 at org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142) at org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37) at org.apache.spark.sql.catalyst.expressions.EqualTo.eval(predicates.scala:166) at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$anonfun$apply$1.apply(predicates.scala:30) at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$anonfun$apply$1.apply(predicates.scala:30) at scala.collection.Iterator$anon$14.hasNext(Iterator.scala:390) at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.sql.execution.Aggregate$anonfun$execute$1$anonfun$7.apply(Aggregate.scala:156) at org.apache.spark.sql.execution.Aggregate$anonfun$execute$1$anonfun$7.apply(Aggregate.scala:151) at org.apache.spark.rdd.RDD$anonfun$13.apply(RDD.scala:601) at org.apache.spark.rdd.RDD$anonfun$13.apply(RDD.scala:601) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) I suspected there was an odd line in the input file. But the input file is so large and i could not found any abnormal lines with several jobs to check. How can i get the abnormal line here ?
Re: Spark Sql with python udf fail
sql("SELECT * FROM your-table").foreach(println) can be executed successfully. So the problem may still be in the UDF code. How can I print the line with the ArrayIndexOutOfBoundsException in Catalyst? 2015-03-23 17:04 GMT+08:00 lonely Feb lonely8...@gmail.com: ok i'll try asap 2015-03-23 17:00 GMT+08:00 Cheng Lian lian.cs@gmail.com: I suspect there is a malformed row in your input dataset. Could you try something like this to confirm: sql(SELECT * FROM your-table).foreach(println) If there does exist a malformed line, you should see similar exception. And you can catch it with the help of the output. Notice that the messages are printed to stdout on executor side. On 3/23/15 4:36 PM, lonely Feb wrote: I caught exceptions in the python UDF code, flush exceptions into a single file, and made sure the column number of the output lines is the same as the sql schema. Sth. interesting is that my output line of the UDF code is just 10 columns, and the exception above is java.lang. ArrayIndexOutOfBoundsException: 9, is there any inspirations? 2015-03-23 16:24 GMT+08:00 Cheng Lian lian.cs@gmail.com: Could you elaborate on the UDF code? On 3/23/15 3:43 PM, lonely Feb wrote: Hi all, I tried to transfer some hive jobs into spark-sql. When i ran a sql job with python udf i got a exception: java.lang.ArrayIndexOutOfBoundsException: 9 at org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142) at org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37) at org.apache.spark.sql.catalyst.expressions.EqualTo.eval(predicates.scala:166) at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$anonfun$apply$1.apply(predicates.scala:30) at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$anonfun$apply$1.apply(predicates.scala:30) at scala.collection.Iterator$anon$14.hasNext(Iterator.scala:390) at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.sql.execution.Aggregate$anonfun$execute$1$anonfun$7.apply(Aggregate.scala:156) at org.apache.spark.sql.execution.Aggregate$anonfun$execute$1$anonfun$7.apply(Aggregate.scala:151) at org.apache.spark.rdd.RDD$anonfun$13.apply(RDD.scala:601) at org.apache.spark.rdd.RDD$anonfun$13.apply(RDD.scala:601) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) I suspected there was an odd line in the input file. But the input file is so large and i could not found any abnormal lines with several jobs to check. How can i get the abnormal line here ?
Re: SocketTimeout only when launching lots of executors
It seems your driver is getting flooded by that many executors and hence it times out. There are some configuration options like spark.akka.timeout etc. that you could try playing with. More information is available here: http://spark.apache.org/docs/latest/configuration.html Thanks Best Regards On Mon, Mar 23, 2015 at 9:46 AM, Tianshuo Deng td...@twitter.com.invalid wrote: Hi, spark users. When running a spark application with lots of executors (300+), I see the following failures: java.net.SocketTimeoutException: Read timed out at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.read(SocketInputStream.java:152) at java.net.SocketInputStream.read(SocketInputStream.java:122) at java.io.BufferedInputStream.fill(BufferedInputStream.java:235) at java.io.BufferedInputStream.read1(BufferedInputStream.java:275) at java.io.BufferedInputStream.read(BufferedInputStream.java:334) at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:690) at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:633) at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1324) at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:583) at org.apache.spark.util.Utils$.fetchFile(Utils.scala:421) at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:356) at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:353) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:353) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) When I reduce the number of executors, the spark app runs fine. From the stack trace, it looks like multiple executors requesting to download dependencies at the same time are causing the driver to time out. Has anyone experienced similar issues or got any suggestions? Thanks - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: netlib-java cannot load native lib in Windows when using spark-submit
I did not build my own Spark. I got the binary version online. If it can load the native libs from the IDE, it should also be able to load the native libs when running with --master local. On Mon, 23 Mar 2015 07:15 Burak Yavuz brk...@gmail.com wrote: Did you build Spark with: -Pnetlib-lgpl? Ref: https://spark.apache.org/docs/latest/mllib-guide.html Burak On Sun, Mar 22, 2015 at 7:37 AM, Ted Yu yuzhih...@gmail.com wrote: How about pointing LD_LIBRARY_PATH to the native lib folder? You need Spark 1.2.0 or higher for the above to work. See SPARK-1719 Cheers On Sun, Mar 22, 2015 at 4:02 AM, Xi Shen davidshe...@gmail.com wrote: Hi Ted, I have tried to invoke the command from both the cygwin environment and the powershell environment. I still get the messages: 15/03/22 21:56:00 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 15/03/22 21:56:00 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS From the Spark UI, I can see: spark.driver.extraLibrary c:\openblas Thanks, David On Sun, Mar 22, 2015 at 11:45 AM Ted Yu yuzhih...@gmail.com wrote: Can you try the --driver-library-path option ? spark-submit --driver-library-path /opt/hadoop/lib/native ... Cheers On Sat, Mar 21, 2015 at 4:58 PM, Xi Shen davidshe...@gmail.com wrote: Hi, I use the *OpenBLAS* DLL, and have configured my application to work in the IDE. When I start my Spark application from the IntelliJ IDE, I can see in the log that the native lib is loaded successfully. But if I use *spark-submit* to start my application, the native lib still cannot be loaded. I saw the WARN message that it failed to load both the native and native-ref library. I checked the *Environment* tab in the Spark UI, and *java.library.path* is set correctly. Thanks, David
Re: Spark Sql with python udf fail
I suspect there is a malformed row in your input dataset. Could you try something like this to confirm: sql("SELECT * FROM your-table").foreach(println) If there does exist a malformed line, you should see a similar exception. And you can catch it with the help of the output. Notice that the messages are printed to stdout on the executor side. On 3/23/15 4:36 PM, lonely Feb wrote: I caught exceptions in the python UDF code, flushed the exceptions into a single file, and made sure the column number of the output lines is the same as the sql schema. Something interesting is that the output lines of my UDF code have just 10 columns, and the exception above is java.lang.ArrayIndexOutOfBoundsException: 9 - is there any inspiration in that? 2015-03-23 16:24 GMT+08:00 Cheng Lian lian.cs@gmail.com mailto:lian.cs@gmail.com: Could you elaborate on the UDF code? On 3/23/15 3:43 PM, lonely Feb wrote: Hi all, I tried to transfer some hive jobs into spark-sql. When I ran a sql job with a python udf I got an exception:

java.lang.ArrayIndexOutOfBoundsException: 9
at org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142)
at org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37)
at org.apache.spark.sql.catalyst.expressions.EqualTo.eval(predicates.scala:166)
at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$anonfun$apply$1.apply(predicates.scala:30)
at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$anonfun$apply$1.apply(predicates.scala:30)
at scala.collection.Iterator$anon$14.hasNext(Iterator.scala:390)
at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.sql.execution.Aggregate$anonfun$execute$1$anonfun$7.apply(Aggregate.scala:156)
at org.apache.spark.sql.execution.Aggregate$anonfun$execute$1$anonfun$7.apply(Aggregate.scala:151)
at org.apache.spark.rdd.RDD$anonfun$13.apply(RDD.scala:601)
at org.apache.spark.rdd.RDD$anonfun$13.apply(RDD.scala:601)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

I suspected there was an odd line in the input file. But the input file is so large that I could not find any abnormal lines even with several checking jobs. How can I find the abnormal line here?
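For illustration, a hedged sketch of hunting the malformed line(s) directly on the raw input, assuming a delimited text input (the path, the tab delimiter, and the expected column count are all assumptions):

// Count the columns of each raw input line and surface the ones
// whose width does not match the expected schema.
val expectedCols = 10
val badLines = sc.textFile("hdfs:///path/to/input")
  .filter(_.split("\t", -1).length != expectedCols) // -1 keeps trailing empty fields
badLines.take(20).foreach(println) // inspect a few offending lines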
Spark SQL udf(ScalaUdf) is very slow
My test env:
1. Spark version is 1.3.0
2. 3 nodes, 80G/20C per node
3. read 250G parquet files from HDFS

Test case:
1. register the floor func with the command *sqlContext.udf.register("floor", (ts: Int) => ts - ts % 300)*, then run with sql "select chan, floor(ts) as tt, sum(size) from qlogbase3 group by chan, floor(ts)", *it takes 17 minutes.*
== Physical Plan ==
Aggregate false, [chan#23015,PartialGroup#23500], [chan#23015,PartialGroup#23500 AS tt#23494,CombineSum(PartialSum#23499L) AS c2#23495L]
 Exchange (HashPartitioning [chan#23015,PartialGroup#23500], 54)
  Aggregate true, [chan#23015,*scalaUDF*(ts#23016)], [chan#23015,scalaUDF(ts#23016) AS PartialGroup#23500,SUM(size#23023L) AS PartialSum#23499L]
   PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[115] at map at newParquet.scala:562
2. run with sql "select chan, (ts - ts % 300) as tt, sum(size) from qlogbase3 group by chan, (ts - ts % 300)", *it takes only 5 minutes.*
== Physical Plan ==
Aggregate false, [chan#23015,PartialGroup#23349], [chan#23015,PartialGroup#23349 AS tt#23343,CombineSum(PartialSum#23348L) AS c2#23344L]
 Exchange (HashPartitioning [chan#23015,PartialGroup#23349], 54)
  Aggregate true, [chan#23015,(ts#23016 - (ts#23016 % 300))], [chan#23015,(ts#23016 - (ts#23016 % 300)) AS PartialGroup#23349,SUM(size#23023L) AS PartialSum#23348L]
   PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[83] at map at newParquet.scala:562
3. use *HiveContext* with sql "select chan, floor((ts - ts % 300)) as tt, sum(size) from qlogbase3 group by chan, floor((ts - ts % 300))", it takes only 5 minutes too.
== Physical Plan ==
Aggregate false, [chan#23015,PartialGroup#23108L], [chan#23015,PartialGroup#23108L AS tt#23102L,CombineSum(PartialSum#23107L) AS _c2#23103L]
 Exchange (HashPartitioning [chan#23015,PartialGroup#23108L], 54)
  Aggregate true, [chan#23015,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor((ts#23016 - (ts#23016 % 300)))], [chan#23015,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor((ts#23016 - (ts#23016 % 300))) AS PartialGroup#23108L,SUM(size#23023L) AS PartialSum#23107L]
   PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[28] at map at newParquet.scala:562

*Why is ScalaUdf so slow?? How can it be improved?* -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-udf-ScalaUdf-is-very-slow-tp22185.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark streaming alerting
Akhil, You are right in your answer to what Mohit wrote. However, what Mohit seems to be alluding to, but did not write properly, might be different. Mohit, You are wrong in saying that generally streaming works on HDFS and Cassandra. Streaming typically works with a streaming or queuing source like Kafka, Kinesis, Twitter, Flume, ZeroMQ, etc. (but can also work from HDFS and S3). However, the streaming context (the receiver within the streaming context) gets events/messages/records and forms a time-window-based batch (RDD). So there is a maximum gap of the window time between when an alert message becomes available to Spark and when the processing happens. I think you meant this. As per the Spark programming model, the RDD is the right way to deal with data. If you are fine with a minimum delay of, say, a second (based on the minimum time window that dstreaming can support), then what Rohit gave is a right model. Khanderao On Mar 22, 2015, at 11:39 PM, Akhil Das ak...@sigmoidanalytics.com wrote: What do you mean you can't send it directly from spark workers? Here's a simple approach which you could do:

val data = ssc.textFileStream("sigmoid/")
val dist = data.filter(_.contains("ERROR")).foreachRDD(rdd => alert("Errors : " + rdd.count()))

And the alert() function could be anything triggering an email or sending an SMS alert. Thanks Best Regards On Sun, Mar 22, 2015 at 1:52 AM, Mohit Anchlia mohitanch...@gmail.com wrote: Is there a module in spark streaming that lets you listen to the alerts/conditions as they happen in the streaming module? Generally spark streaming components will execute on a large set of clusters like hdfs or Cassandra, however when it comes to alerting you generally can't send it directly from the spark workers, which means you need a way to listen to the alerts.
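For illustration, a hedged sketch of such an alert() function (the webhook URL is hypothetical; an email or SMS gateway call would slot in the same way). Note that the body of foreachRDD, outside of RDD actions like count(), runs on the driver, so nothing here needs to be serializable:

import java.net.{HttpURLConnection, URL}

// Minimal sketch: POST the alert text to a hypothetical webhook endpoint.
def alert(message: String): Unit = {
  val conn = new URL("https://alerts.example.com/notify") // assumption
    .openConnection().asInstanceOf[HttpURLConnection]
  conn.setRequestMethod("POST")
  conn.setDoOutput(true)
  val out = conn.getOutputStream
  out.write(message.getBytes("UTF-8"))
  out.close()
  conn.getResponseCode // force the request; response body ignored here
  conn.disconnect()
}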
Re: Spark UI tunneling
Oh, in that case you could try adding the hostname to your /etc/hosts under your localhost. Also check whether a request is going to another host by inspecting the network calls in your browser's developer tools. Thanks Best Regards On Mon, Mar 23, 2015 at 1:55 PM, Sergey Gerasimov ser...@gmail.com wrote: Akhil, that's what I did. The problem is that probably the web server tried to forward my request to another address accessible locally only. On 23 March 2015, at 11:12, Akhil Das ak...@sigmoidanalytics.com wrote: Did you try ssh -L 4040:127.0.0.1:4040 user@host Thanks Best Regards On Mon, Mar 23, 2015 at 1:12 PM, sergunok ser...@gmail.com wrote: Is there a way to tunnel the Spark UI? I tried to tunnel client-node:4040 but my browser was redirected from localhost to some cluster locally visible domain name. Maybe there is some startup option to make the Spark UI fully accessible through a single endpoint (address:port)? Serg. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-UI-tunneling-tp22184.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: log files of failed task
Hello Sergun, Generally you can use yarn application -list to see the application IDs of applications, and then you can see the logs of finished applications using: yarn logs -applicationId applicationID Hope this helps. -- Emre Sevinç http://www.bigindustries.be/ On Mon, Mar 23, 2015 at 8:23 AM, sergunok ser...@gmail.com wrote: Hi, I executed a task on Spark in YARN and it failed. I see just an executor lost message from YARNClientScheduler, no further details. (I read this error can be connected to the spark.yarn.executor.memoryOverhead setting and have already played with this param.) How can I dig deeper into the log files and find the exact reason? How can the log of a failed task be examined? Unfortunately I don't have access to the Spark UI; I can only use the command line. Thanks! Serg. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/log-files-of-failed-task-tp22183.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Emre Sevinc
Re: How Does aggregate work
It is actually the number of cores. If your processor has hyperthreading then it will be more (the number of processors your OS sees). On Sun, 22 Mar 2015 at 4:51 PM, Ted Yu yuzhih...@gmail.com wrote: I assume spark.default.parallelism is 4 in the VM Ashish was using. Cheers
Re: How to handle under-performing nodes in the cluster
It seems that node is not being allocated enough tasks. Try increasing your level of parallelism, or do a manual repartition so that every node gets an even share of tasks to operate on. Thanks Best Regards On Fri, Mar 20, 2015 at 8:05 PM, Yiannis Gkoufas johngou...@gmail.com wrote: Hi all, I have 6 nodes in the cluster and one of the nodes is clearly under-performing: I was wondering what the impact of having such issues is? Also, what is the recommended way to work around it? Thanks a lot, Yiannis
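For illustration, a minimal sketch of both options (the app name, input path, and partition count are assumptions; ~2-4 partitions per total core is a common rule of thumb):

import org.apache.spark.{SparkConf, SparkContext}

// Option 1: raise the default parallelism for shuffles and parallelize().
val conf = new SparkConf().setAppName("balanced-job").set("spark.default.parallelism", "48")
val sc = new SparkContext(conf)

// Option 2: explicitly repartition the hot RDD before the heavy stage.
val data = sc.textFile("hdfs:///some/input") // hypothetical input
val balanced = data.repartition(48)          // forces an even redistribution (a shuffle)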
Re: updateStateByKey performance API
Hi Nikos, We experienced something similar in our setting, where the Spark app was supposed to write the final state changes to a Redis instance. Over time the delay caused by re-writing the entire dataset in each iteration exceeded the Spark streaming batch size. In our case the solution was to avoid updateStateByKey and persist the state directly to Redis. That of course means the join of the new keys to the old state needs to be done explicitly. I think the solution to this problem would be to just extend the Spark streaming API by having an alternative state update that, instead of a cogroup, does something like an inner join. Should be quite straightforward. I went ahead and added an issue: https://issues.apache.org/jira/browse/SPARK-6462 Note that this does not solve the problem when your state grows so large that merging in keys becomes a bottleneck (since that would require something like an IndexedRDD). But in your case you mention serialization overhead to be the bottleneck, so maybe you could try filtering out unchanged keys before persisting the data? Just an idea. Andre On 22/03/15 10:43, Andre Schumacher andre.sc...@gmail.com wrote: Forwarded Message Subject: Re: updateStateByKey performance API Date: Wed, 18 Mar 2015 13:06:15 +0200 From: Nikos Viorres nvior...@gmail.com To: Akhil Das ak...@sigmoidanalytics.com CC: user@spark.apache.org user@spark.apache.org Hi Akhil, Yes, that's what we are planning on doing at the end of the day. At the moment I am doing performance testing before the job hits production, testing on 4 cores to get baseline figures, and deduced that in order to grow to 10-15 million keys we'll need a batch interval of ~20 secs if we don't want to allocate more than 8 cores to this job. The thing is that since we have a big silent window on the user interactions, where the stream will have very little data, we would like to be able to use these cores for batch processing during that window, but we can't the way it currently works. best regards n On Wed, Mar 18, 2015 at 12:40 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You can always throw more machines at this and see if the performance is increasing, since you haven't mentioned anything regarding your # of cores etc. Thanks Best Regards On Wed, Mar 18, 2015 at 11:42 AM, nvrs nvior...@gmail.com wrote: Hi all, We are having a few issues with the performance of the updateStateByKey operation in Spark Streaming (1.2.1 at the moment) and any advice would be greatly appreciated. Specifically, on each tick of the system (which is set at 10 secs) we need to update a state tuple where the key is the user_id and the value an object with some state about the user. The problem is that using Kryo serialization for 5M users, this gets really slow, to the point that we have to increase the period to more than 10 seconds so as not to fall behind. The input for the streaming job is a Kafka stream which consists of key-value pairs of user_ids with some sort of action codes; we join this to our checkpointed state key and update the state. I understand that the reason for iterating over the whole state set is for evicting items or updating state for everyone for time-dependent computations, but this does not apply to our situation and it hurts performance really badly. Is there a possibility of implementing in the future an extra call in the API for updating only a specific subset of keys? p.s.
I will try ASAP setting the DStream as non-serialized, but then I am worried about GC and checkpointing performance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/updateStateByKey-performance-API-tp22113.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
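For illustration, a hedged sketch of the "persist only what changed" idea suggested above (the socket source, the 10s batch interval, the comma format, and persistToStore are all assumptions; start()/awaitTermination() omitted). The state carries a changed flag so downstream writes can skip untouched keys:

import org.apache.spark.streaming.{Seconds, StreamingContext}

case class UserState(count: Long, changed: Boolean)

// Runs once per batch per key; marks the entry unchanged when no events arrived.
def update(events: Seq[Int], old: Option[UserState]): Option[UserState] = {
  val prev = old.map(_.count).getOrElse(0L)
  if (events.isEmpty) Some(UserState(prev, changed = false))
  else Some(UserState(prev + events.sum, changed = true))
}

// Stand-in for writing the changed entries to Redis etc., one connection per partition.
def persistToStore(part: Iterator[(String, UserState)]): Unit = { /* write out */ }

val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint("hdfs:///tmp/ckpt") // required by updateStateByKey
val events = ssc.socketTextStream("localhost", 9999) // stand-in for the Kafka source
  .map { line => val Array(user, code) = line.split(","); (user, code.toInt) }

val state = events.updateStateByKey(update _)
state.filter { case (_, s) => s.changed }
     .foreachRDD(rdd => rdd.foreachPartition(persistToStore _))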
Use pig load function in spark
Hi all, Can Spark use Pig's load function to load data? Best Regards, Kevin.
Re: spark disk-to-disk
Maybe implement a very simple function that uses the Hadoop API to read in based on file names (i.e. parts)? On Mon, Mar 23, 2015 at 10:55 AM, Koert Kuipers ko...@tresata.com wrote: there is a way to reinstate the partitioner, but that requires sc.objectFile to read exactly what I wrote, which means sc.objectFile should never split files on reading (a feature of the Hadoop FileInputFormat that gets in the way here). On Mon, Mar 23, 2015 at 1:39 PM, Koert Kuipers ko...@tresata.com wrote: I just realized the major limitation is that I lose partitioning info... On Mon, Mar 23, 2015 at 1:34 AM, Reynold Xin r...@databricks.com wrote: On Sun, Mar 22, 2015 at 6:03 PM, Koert Kuipers ko...@tresata.com wrote: so finally I can resort to: rdd.saveAsObjectFile(...) sc.objectFile(...) but that seems like a rather broken abstraction. This seems like a fine solution to me.
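For illustration, a hedged sketch of the "don't split part files" half of this: forcing a huge minimum split size so each part file comes back as a single partition (the path is an assumption; the property name is the classic mapred one, and newer Hadoop also accepts mapreduce.input.fileinputformat.split.minsize):

// With minSize at Long.MaxValue, FileInputFormat will not split a file,
// so each part file of the object file yields exactly one partition.
sc.hadoopConfiguration.setLong("mapred.min.split.size", Long.MaxValue)

val rdd = sc.parallelize(Seq(("a", 1), ("b", 2)))
rdd.saveAsObjectFile("hdfs:///tmp/checkpointed")                     // write
val back = sc.objectFile[(String, Int)]("hdfs:///tmp/checkpointed") // read, one partition per part file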
Re: Getting around Serializability issues for types not in my control
Instantiating the instance? The actual instance it's complaining about is: https://github.com/scalaz/scalaz/blob/16838556c9309225013f917e577072476f46dc14/core/src/main/scala/scalaz/std/Option.scala#L10-11 The specific import where it's picking up the instance is: https://github.com/scalaz/scalaz/blob/16838556c9309225013f917e577072476f46dc14/core/src/main/scala/scalaz/std/Option.scala#L227 Note the object extends OptionInstances, which contains that instance. Is the suggestion to pass in something like new OptionInstances { } into the RDD#aggregate call? On Mon, Mar 23, 2015 at 1:09 PM, Cody Koeninger c...@koeninger.org wrote: Have you tried instantiating the instance inside the closure, rather than outside of it? If that works, you may need to switch to using mapPartitions / foreachPartition for efficiency reasons. On Mon, Mar 23, 2015 at 3:03 PM, Adelbert Chang adelbe...@gmail.com wrote: Is there no way to pull out the bits of the instance I want before I send it through the closure for aggregate? I did try pulling things out, along the lines of

def foo[G[_], B](blah: Blah)(implicit G: Applicative[G]) = {
  val lift: B => G[RDD[B]] = b => G.point(sparkContext.parallelize(List(b)))
  rdd.aggregate(/* use lift in here */)
}

But that doesn't seem to work either; it still seems to be trying to serialize the Applicative... :( On Mon, Mar 23, 2015 at 12:27 PM, Dean Wampler deanwamp...@gmail.com wrote: Well, it's complaining about trait OptionInstances, which is defined in Option.scala in the std package. Use scalap or javap on the scalaz library to find out which member of the trait is the problem, but since it says $$anon$1, I suspect it's the first value member, implicit val optionInstance, which has a long list of mixin traits, one of which is probably at fault. OptionInstances is huge, so there might be other offenders. Scalaz wasn't designed for distributed systems like this, so you'll probably find many examples of nonserializability. An alternative is to avoid using Scalaz in any closures passed to Spark methods, but that's probably not what you want. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Mar 23, 2015 at 12:03 PM, adelbertc adelbe...@gmail.com wrote: Hey all, I'd like to use the Scalaz library in some of my Spark jobs, but am running into issues where some stuff I use from Scalaz is not serializable. For instance, in Scalaz there is a trait

/** In Scalaz */
trait Applicative[F[_]] {
  def apply2[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C]
  def point[A](a: => A): F[A]
}

But when I try to use it in, say, an `RDD#aggregate` call I get:

Caused by: java.io.NotSerializableException: scalaz.std.OptionInstances$$anon$1
Serialization stack:
- object not serializable (class: scalaz.std.OptionInstances$$anon$1, value: scalaz.std.OptionInstances$$anon$1@4516ee8c)
- field (class: dielectric.syntax.RDDOps$$anonfun$1, name: G$1, type: interface scalaz.Applicative)
- object (class dielectric.syntax.RDDOps$$anonfun$1, function2)
- field (class: dielectric.syntax.RDDOps$$anonfun$traverse$extension$1, name: apConcat$1, type: interface scala.Function2)
- object (class dielectric.syntax.RDDOps$$anonfun$traverse$extension$1, function2)

Outside of submitting a PR to Scalaz to make things Serializable, what can I do to make things Serializable?
I considered something like implicit def applicativeSerializable[F[_]](implicit F: Applicative[F]): SomeSerializableType[F] = new SomeSerializableType { ... } ?? Not sure how to go about doing it - I looked at java.io.Externalizable but given `scalaz.Applicative` has no value members I'm not sure how to implement the interface. Any guidance would be much appreciated - thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Getting-around-Serializability-issues-for-types-not-in-my-control-tp22193.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Adelbert (Allen) Chang -- Adelbert (Allen) Chang
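For illustration, a generic sketch of one common workaround (not Scalaz-specific): reference the non-serializable instance through a top-level object. Members of an object are resolved at class-load time on each executor JVM, so the closure captures only a static reference and nothing non-serializable is shipped. The Combiner class below is a stand-in for the type-class instance:

// Stand-in for a non-serializable instance; deliberately NOT Serializable.
class Combiner { def combine(a: Int, b: Int): Int = a + b }

// Each executor creates its own copy when the object's class loads.
object Instances { val combiner = new Combiner }

val rdd = sc.parallelize(1 to 100)
val total = rdd.aggregate(0)(
  (acc, x) => Instances.combiner.combine(acc, x), // looked up on the executor
  (a, b) => Instances.combiner.combine(a, b)
)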
Is it possible to use json4s 3.2.11 with Spark 1.3.0?
Spark has a dependency on json4s 3.2.10, but this version has several bugs and I need to use 3.2.11. I added the json4s-native 3.2.11 dependency to build.sbt and everything compiled fine. But when I spark-submit my JAR it provides me with 3.2.10.

build.sbt:

import sbt.Keys._
name := "sparkapp"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
libraryDependencies += "org.json4s" %% "json4s-native" % "3.2.11"

plugins.sbt:

logLevel := Level.Warn
resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")

App1.scala:

import org.apache.spark.rdd.RDD
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object App1 extends Logging {
  def main(args: Array[String]) = {
    val conf = new SparkConf().setAppName("App1")
    val sc = new SparkContext(conf)
    println(s"json4s version: ${org.json4s.BuildInfo.version.toString}")
  }
}

sbt 0.13.7, sbt-assembly 0.13.0, Scala 2.10.4. Is it possible to force 3.2.11 version usage? Thanks, Alexey
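One hedged workaround: since Spark itself puts json4s 3.2.10 on the cluster classpath, bumping your own dependency doesn't help by itself; shading your copy avoids the clash. Newer sbt-assembly releases (0.14.x and later; an upgrade from the 0.13.0 plugin above is an assumption) support shade rules, for example:

// In build.sbt, with sbt-assembly 0.14.x+: relocate your json4s classes
// so they can't collide with Spark's copy.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("org.json4s.**" -> "shaded.org.json4s.@1").inAll
)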
Re: Spark 1.3 Dynamic Allocation - Requesting 0 new executor(s) because tasks are backlogged
On Mon, Mar 23, 2015 at 2:15 PM, Manoj Samel manojsamelt...@gmail.com wrote: Found the issue with the above error - the setting for spark_shuffle was incomplete. Now it is able to ask for and get additional executors. The issue is that once they are released, it is not able to proceed with the next query. That looks like SPARK-6325, which unfortunately was not fixed in time for 1.3.0... -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to use DataFrame with MySQL
For me, it's only working if I set --driver-class-path to the MySQL connector library. On Sun, Mar 22, 2015 at 11:29 PM, gavin zhang gavin@gmail.com wrote: OK, I found what the problem is: it couldn't work with mysql-connector-5.0.8. I updated the connector version to 5.1.34 and it worked. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-DataFrame-with-MySQL-tp22178p22182.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
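For illustration, a hedged sketch of such a submit line (the jar path, version, and class name are assumptions; --driver-class-path puts the connector on the driver's classpath, while --jars ships it to the executors):

spark-submit \
  --driver-class-path /path/to/mysql-connector-java-5.1.34.jar \
  --jars /path/to/mysql-connector-java-5.1.34.jar \
  --class com.example.MyApp my-app.jar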
Shuffle Spill Memory and Shuffle Spill Disk
Hello, I am running TeraSort https://github.com/ehiggs/spark-terasort on 100GB of data. The final metrics I am getting on Shuffle Spill are: Shuffle Spill (Memory): 122.5 GB Shuffle Spill (Disk): 3.4 GB What's the difference and relation between these two metrics? Do they mean that 122.5 GB was spilled from memory during the shuffle? thank you, bijay
hadoop input/output format advanced control
Currently it's pretty hard to control the Hadoop Input/Output formats used in Spark. The convention seems to be to add extra parameters to all methods, and then somewhere deep inside the code (for example in PairRDDFunctions.saveAsHadoopFile) all these parameters get translated into settings on the Hadoop Configuration object. For example, for compression I see codec: Option[Class[_ <: CompressionCodec]] = None added to a bunch of methods. How scalable is this solution, really? For example, I need to read from a Hadoop dataset and I don't want the input (part) files to get split up. The way to do this is to set mapred.min.split.size. Now I don't want to set this at the level of the SparkContext (which can be done), since I don't want it to apply to input formats in general; I want it to apply to just this one specific input dataset I need to read. Which leaves me with no options currently. I could go add yet another input parameter to all the methods (SparkContext.textFile, SparkContext.hadoopFile, SparkContext.objectFile, etc.), but that seems ineffective. Why can we not expose a Map[String, String] or some other generic way to manipulate settings for Hadoop input/output formats? It would require adding one more parameter to all methods that deal with Hadoop input/output formats, but after that it's done. One parameter to rule them all. Then I could do:

val x = sc.textFile("/some/path", formatSettings = Map("mapred.min.split.size" -> "12345"))

or

rdd.saveAsTextFile("/some/path", formatSettings = Map("mapred.output.compress" -> "true", "mapred.output.compression.codec" -> "somecodec"))
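(The formatSettings parameter above is the author's proposal, not an existing API.) For comparison, a hedged sketch of what works today: pass a per-dataset Configuration to the lower-level read methods instead of mutating the SparkContext-wide one (the path is an assumption):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// A private copy of the Hadoop config, scoped to this one read.
val conf = new Configuration(sc.hadoopConfiguration)
conf.setLong("mapred.min.split.size", Long.MaxValue) // don't split part files

val lines = sc.newAPIHadoopFile(
    "/some/path",
    classOf[TextInputFormat],
    classOf[LongWritable],
    classOf[Text],
    conf)
  .map { case (_, text) => text.toString } // copy out of the reused Text object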
Re: Spark 1.3 Dynamic Allocation - Requesting 0 new executor(s) because tasks are backlogged
Log shows stack traces that seem to match the assert in JIRA so it seems I am hitting the issue. Thanks for the heads up ... 15/03/23 20:29:50 ERROR actor.OneForOneStrategy: assertion failed: Allocator killed more executors than are allocated! java.lang.AssertionError: assertion failed: Allocator killed more executors than are allocated! at scala.Predef$.assert(Predef.scala:179) at org.apache.spark.deploy.yarn.YarnAllocator.killExecutor(YarnAllocator.scala:152) at org.apache.spark.deploy.yarn.ApplicationMaster$AMActor$$anonfun$receive$1$$anonfun$applyOrElse$6.apply(ApplicationMaster.scala:547) at org.apache.spark.deploy.yarn.ApplicationMaster$AMActor$$anonfun$receive$1$$anonfun$applyOrElse$6.apply(ApplicationMaster.scala:547) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.deploy.yarn.ApplicationMaster$AMActor$$anonfun$receive$1.applyOrElse(ApplicationMaster.scala:547) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.deploy.yarn.ApplicationMaster$AMActor.aroundReceive(ApplicationMaster.scala:506) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) On Mon, Mar 23, 2015 at 2:25 PM, Marcelo Vanzin van...@cloudera.com wrote: On Mon, Mar 23, 2015 at 2:15 PM, Manoj Samel manojsamelt...@gmail.com wrote: Found the issue above error - the setting for spark_shuffle was incomplete. Now it is able to ask and get additional executors. The issue is once they are released, it is not able to proceed with next query. That looks like SPARK-6325, which unfortunately was not fixed in time for 1.3.0... -- Marcelo
Re: Using a different spark jars than the one on the cluster
+1 - I am currently doing what Marcelo is suggesting, as I have a CDH 5.2 cluster (with Spark 1.1) and I'm also running Spark 1.3.0+ side-by-side in my cluster. On Wed, Mar 18, 2015 at 1:23 PM Marcelo Vanzin van...@cloudera.com wrote: Since you're using YARN, you should be able to download a Spark 1.3.0 tarball from Spark's website and use spark-submit from that installation to launch your app against the YARN cluster. So effectively you would have 1.2.0 and 1.3.0 side-by-side in your cluster. On Wed, Mar 18, 2015 at 11:09 AM, jaykatukuri jkatuk...@apple.com wrote: Hi all, I am trying to run my job which needs spark-sql_2.11-1.3.0.jar. The cluster that I am running on is still on spark-1.2.0. I tried the following: spark-submit --class class-name --num-executors 100 --master yarn application_jar --jars hdfs:///path/spark-sql_2.11-1.3.0.jar hdfs:///input_data But this did not work; I get an error that it is not able to find a class/method that is in spark-sql_2.11-1.3.0.jar: org.apache.spark.sql.SQLContext.implicits()Lorg/apache/spark/sql/SQLContext$implicits$ The question in general is: how do we use a different version of spark jars (spark-core, spark-sql, spark-ml etc.) than the ones running on a cluster? Thanks, Jay -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-a-different-spark-jars-than-the-one-on-the-cluster-tp22125.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark 1.3 Dynamic Allocation - Requesting 0 new executor(s) because tasks are backlogged
Found the issue with the above error - the setting for spark_shuffle was incomplete. Now it is able to ask for and get additional executors. The issue is that once they are released, it is not able to proceed with the next query. The environment is CDH 5.3.2 (Hadoop 2.5) with Kerberos, Spark 1.3. After idle time, the release of executors gives a stack trace under WARN and returns to the prompt (in spark-shell): 15/03/23 20:55:50 INFO YarnClientSchedulerBackend: Requesting to kill executor(s) 2 15/03/23 20:55:50 INFO ExecutorAllocationManager: Removing executor 2 because it has been idle for 60 seconds (new desired total will be 6) 15/03/23 20:55:50 INFO YarnClientSchedulerBackend: Requesting to kill executor(s) 5 15/03/23 20:55:50 INFO ExecutorAllocationManager: Removing executor 5 because it has been idle for 60 seconds (new desired total will be 5) 15/03/23 20:55:50 INFO YarnClientSchedulerBackend: Requesting to kill executor(s) 1 15/03/23 20:55:51 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@xxxl:47358] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. 15/03/23 20:55:51 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@yyy:51807] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. 15/03/23 20:55:52 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@zzz:54623] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. 15/03/23 20:56:20 WARN AkkaUtils: Error sending message [message = KillExecutors(ArrayBuffer(1))] in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:171) at org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerActor$$anonfun$receive$1$$anonfun$applyOrElse$4.apply$mcV$sp(YarnSchedulerBackend.scala:136) at org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerActor$$anonfun$receive$1$$anonfun$applyOrElse$4.apply(YarnSchedulerBackend.scala:136) at org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerActor$$anonfun$receive$1$$anonfun$applyOrElse$4.apply(YarnSchedulerBackend.scala:136) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 15/03/23 20:56:20 WARN AkkaUtils: Error sending message [message = KillExecutors(ArrayBuffer(1))] in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at
scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:171) at org.apache.spark.scheduler.cluster.YarnSchedulerBackend.doKillExecutors(YarnSchedulerBackend.scala:68) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors(CoarseGrainedSchedulerBackend.scala:375) at org.apache.spark.SparkContext.killExecutors(SparkContext.scala:1173) at org.apache.spark.ExecutorAllocationClient$class.killExecutor(ExecutorAllocationClient.scala:49) at org.apache.spark.SparkContext.killExecutor(SparkContext.scala:1186) at org.apache.spark.ExecutorAllocationManager.org $apache$spark$ExecutorAllocationManager$$removeExecutor(ExecutorAllocationManager.scala:353) at org.apache.spark.ExecutorAllocationManager$$anonfun$org$apache$spark$ExecutorAllocationManager$$schedule$1.apply(ExecutorAllocationManager.scala:237) at org.apache.spark.ExecutorAllocationManager$$anonfun$org$apache$spark$ExecutorAllocationManager$$schedule$1.apply(ExecutorAllocationManager.scala:234) at scala.collection.mutable.MapLike$$anonfun$retain$2.apply(MapLike.scala:213) at scala.collection.mutable.MapLike$$anonfun$retain$2.apply(MapLike.scala:212) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.immutable.List.foreach(List.scala:318) at
SchemaRDD/DataFrame result partitioned according to the underlying datasource partitions
Is there a way to take advantage of the underlying datasource partitions when generating a DataFrame/SchemaRDD via Catalyst? It seems from the sql module that the only options are RangePartitioner and HashPartitioner, and further that those are selected automatically by the code. It was not apparent either that the underlying partitioning is translated to the partitions presented in the RDD, or that a custom partitioner can be provided. The motivation would be to subsequently use df.map (with preservesPartitioning=true) and/or df.mapPartitions (likewise) to perform operations that work within the original datasource partitions, thus avoiding a shuffle.
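For illustration, a sketch of the intended pattern on the DataFrame's underlying RDD (df, the key column position, and transform are assumptions). preservesPartitioning promises Spark that the keys were not changed, so a later co-partitioned operation can skip the shuffle - though, as noted above, this only pays off once a partitioner actually exists on the RDD:

import org.apache.spark.sql.Row

def transform(r: Row): Row = r // stand-in for the real per-row work

val byKey = df.rdd.map(row => (row.getString(0), row)) // key column assumed at position 0
val processed = byKey.mapPartitions(
  iter => iter.map { case (k, row) => (k, transform(row)) },
  preservesPartitioning = true
)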
Re: Why doesn't the --conf parameter work in yarn-cluster mode (but works in yarn-client and local)?
Hi Emre, The --conf property is meant to work in yarn-cluster mode. System.getProperty(key) isn't guaranteed to work, but new SparkConf().get(key) should. Does it not? -Sandy On Mon, Mar 23, 2015 at 8:39 AM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, According to Spark Documentation at https://spark.apache.org/docs/1.2.1/submitting-applications.html : --conf: Arbitrary Spark configuration property in key=value format. For values that contain spaces wrap “key=value” in quotes (as shown). And indeed, when I use that parameter, in my Spark program I can retrieve the value of the key by using: System.getProperty(key); This works when I test my program locally, and also in yarn-client mode, I can log the value of the key and see that it matches what I wrote in the command line, but it returns *null* when I submit the very same program in *yarn-cluster* mode. Why can't I retrieve the value of key given as --conf key=value when I submit my Spark application in *yarn-cluster* mode? Any ideas and/or workarounds? -- Emre Sevinç http://www.bigindustries.be/
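For illustration, a minimal sketch of reading a --conf property through SparkConf (the key name is an assumption). Note that in these versions only keys prefixed with "spark." are generally forwarded to a yarn-cluster driver, so name your key accordingly:

// submit with: spark-submit --conf spark.myapp.greeting=hello ...
import org.apache.spark.SparkConf

val greeting = new SparkConf().get("spark.myapp.greeting") // "hello"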
Parquet file + increase read parallelism
Hi All, Suppose I have a parquet file of 100 MB in HDFS and my HDFS block size is 64 MB, so I have 2 blocks of data. When I do *sqlContext.parquetFile(path)* followed by an action, two tasks are started on two partitions. My intent is to read these 2 blocks into more partitions to fully utilize my cluster resources and increase parallelism. Is there a way to do so, like in the case of sc.textFile(path, *numberOfPartitions*)? Please note, I don't want to do a *repartition* as that would result in a lot of shuffle. Thanks in advance. Regards, Sam -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-file-increase-read-parallelism-tp22190.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
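One hedged knob that sometimes helps here: shrinking the maximum input split size can yield more read partitions for splittable inputs. Whether a Parquet file actually splits this finely depends on its row-group layout, so treat this as a sketch to experiment with (the path and the 16 MB figure are assumptions):

// Ask for smaller splits before reading; Hadoop 2 also accepts the
// newer key mapreduce.input.fileinputformat.split.maxsize.
sc.hadoopConfiguration.setLong("mapred.max.split.size", 16L * 1024 * 1024)

val df = sqlContext.parquetFile("/path/to/file.parquet")
println(df.rdd.partitions.length) // ideally more than the 2 block-sized partitions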
Re: DataFrame operation on parquet: GC overhead limit exceeded
Have you tried to repartition() your original data to make more partitions before you aggregate? -- Martin Goodson | VP Data Science (0)20 3397 1240 On Mon, Mar 23, 2015 at 4:12 PM, Yiannis Gkoufas johngou...@gmail.com wrote: Hi Yin, Yes, I have set spark.executor.memory to 8g and the worker memory to 16g without any success. I cannot figure out how to increase the number of mapPartitions tasks. Thanks a lot On 20 March 2015 at 18:44, Yin Huai yh...@databricks.com wrote: spark.sql.shuffle.partitions only controls the number of tasks in the second stage (the number of reducers). For your case, I'd say that the number of tasks in the first stage (the number of mappers) will be the number of files you have. Actually, have you changed spark.executor.memory (it controls the memory for an executor of your application)? I did not see it in your original email. The difference between worker memory and executor memory can be found at (http://spark.apache.org/docs/1.3.0/spark-standalone.html ), SPARK_WORKER_MEMORY Total amount of memory to allow Spark applications to use on the machine, e.g. 1000m, 2g (default: total memory minus 1 GB); note that each application's individual memory is configured using its spark.executor.memory property. On Fri, Mar 20, 2015 at 9:25 AM, Yiannis Gkoufas johngou...@gmail.com wrote: Actually I realized that the correct way is: sqlContext.sql("set spark.sql.shuffle.partitions=1000") but I am still experiencing the same behavior/error. On 20 March 2015 at 16:04, Yiannis Gkoufas johngou...@gmail.com wrote: Hi Yin, the way I set the configuration is: val sqlContext = new org.apache.spark.sql.SQLContext(sc) sqlContext.setConf("spark.sql.shuffle.partitions", "1000"); it is the correct way, right? In the mapPartitions task (the first task which is launched), I get again the same number of tasks and again the same error. :( Thanks a lot! On 19 March 2015 at 17:40, Yiannis Gkoufas johngou...@gmail.com wrote: Hi Yin, thanks a lot for that! Will give it a shot and let you know. On 19 March 2015 at 16:30, Yin Huai yh...@databricks.com wrote: Was the OOM thrown during the execution of the first stage (map) or the second stage (reduce)? If it was the second stage, can you increase the value of spark.sql.shuffle.partitions and see if the OOM disappears? This setting controls the number of reducers Spark SQL will use and the default is 200. Maybe there are too many distinct values and the memory pressure on every task (of those 200 reducers) is pretty high. You can start with 400 and increase it until the OOM disappears. Hopefully this will help. Thanks, Yin On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas johngou...@gmail.com wrote: Hi Yin, Thanks for your feedback. I have 1700 parquet files, sized 100MB each. The number of tasks launched is equal to the number of parquet files. Do you have any idea on how to deal with this situation? Thanks a lot On 18 Mar 2015 17:35, Yin Huai yh...@databricks.com wrote: Seems there are too many distinct groups processed in a task, which triggers the problem. How many files does your dataset have and how large is a file? Seems your query will be executed with two stages, table scan and map-side aggregation in the first stage and the final round of reduce-side aggregation in the second stage. Can you take a look at the numbers of tasks launched in these two stages?
Thanks, Yin On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas johngou...@gmail.com wrote: Hi there, I set the executor memory to 8g but it didn't help On 18 March 2015 at 13:59, Cheng Lian lian.cs@gmail.com wrote: You should probably increase executor memory by setting spark.executor.memory. The full list of available configurations can be found here: http://spark.apache.org/docs/latest/configuration.html Cheng On 3/18/15 9:15 PM, Yiannis Gkoufas wrote: Hi there, I was trying the new DataFrame API with some basic operations on a parquet dataset. I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in a standalone cluster mode. The code is the following:

val people = sqlContext.parquetFile("/data.parquet");
val res = people.groupBy("name", "date").agg(sum("power"), sum("supply")).take(10);
System.out.println(res);

The dataset consists of 16 billion entries. The error I get is java.lang.OutOfMemoryError: GC overhead limit exceeded. My configuration is:

spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 6g
spark.executor.extraJavaOptions -XX:+UseCompressedOops
spark.shuffle.manager sort

Any idea how I can work around this? Thanks a lot
Re: Write to Parquet File in Python
Hey Akriti23, pyspark gives you a saveAsParquetFile() API to save your RDD as parquet. You will, however, need to infer the schema or describe it manually before you can do so. Here are some docs about that (v1.2.1; you can search for the others, they're relatively similar from 1.1 and up): http://spark.apache.org/docs/1.2.1/sql-programming-guide.html#inferring-the-schema-using-reflection http://spark.apache.org/docs/1.2.1/sql-programming-guide.html#parquet-files As for whether it is the most efficient way to do a range query, that's a more difficult question, and it would be helpful if you could give some more information. Another thing to think about is that you could just use a temp table, and not store the parquet at all - same docs, just read through them. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Write-to-Parquet-File-in-Python-tp22186p22191.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
newbie question - spark with mesos
I have a mesos cluster, which I deploy spark to by using the instructions at http://spark.apache.org/docs/0.7.2/running-on-mesos.html After that the spark shell starts up fine. Then I try the following in the shell:

val data = 1 to 1
val distData = sc.parallelize(data)
distData.filter(_ < 10).collect()

I open the spark web UI at host:4040 and see an active job. NOW, how do I start workers, or spark workers, on mesos? Who completes my job? thanks, -- Ani
Re: version conflict common-net
Hi Sean, Thanks a ton for your reply. The particular situation I have is case (3) that you have mentioned. The class that I am using from commons-net is FTPClient(). This class is present in both the 2.2 version and the 3.3 version. However, in the 3.3 version there are two additional methods (among several others), setAutodetectUTF8() and setControlKeepAliveTimeout(), that we need to use. The FTPClient() class and the two methods are part of a custom receiver (ZipStream) that we wrote. Our spark app is deployed on YARN. Looking online and doing more research I found this link - SPARK-939 https://issues.apache.org/jira/browse/SPARK-939 In the comments section, I found a mention that there is already a flag spark.yarn.user.classpath.first to make this happen. I have not yet tried it out. I guess this should do the trick, right? I also see some more JIRA items that indicate combining spark.files.userClassPathFirst and spark.yarn.user.classpath.first. That has been marked as resolved. However I am a bit confused about the state of the JIRA as far as the Cloudera version of Spark goes. We are using the Spark that comes with CDH 5.3.2 (Spark 1.2.0). I think this combined flag is marked for Spark 1.3. If all else fails, shade 3.3... :) Thanks again, -Jacob On Fri, Mar 20, 2015 at 10:07 AM, Sean Owen so...@cloudera.com wrote: It's not a crazy question, no. I'm having a bit of trouble figuring out what's happening. Commons Net 2.2 is what's used by Spark. The error appears to come from Spark. But the error is about not finding a method that did not exist in 2.2. I am not sure what ZipStream is, for example. This could be a bizarre situation where classloader rules mean that part of 2.2 and part of 3.3 are being used. For example, let's say: - your receiver uses 3.3 classes that are only in 3.3, so they are found in your user classloader - 3.3 classes call some class that also existed in 2.2, but those are found in the Spark classloader. - the 2.2 class doesn't have methods that 3.3 expects userClassPathFirst is often a remedy. There are several versions of this flag though. For example you need a different one if on YARN to have it take effect. It's worth ruling that out first. If all else fails you can shade 3.3. On Fri, Mar 20, 2015 at 11:44 AM, Jacob Abraham abe.jac...@gmail.com wrote: Anyone? Or is this question nonsensical... and I am doing something fundamentally wrong? On Mon, Mar 16, 2015 at 5:33 PM, Jacob Abraham abe.jac...@gmail.com wrote: Hi Folks, I have a situation where I am getting a version conflict between java libraries that are used by my application and ones used by spark. Following are the details - I use the spark provided by Cloudera, running on the CDH 5.3.2 cluster (Spark 1.2.0-cdh5.3.2). The library that is causing the conflict is commons-net. In our spark application we use commons-net version 3.3. However I found out that spark uses commons-net version 2.2.
Hence when we try to submit our application using spark-submit, I end up getting a NoSuchMethodError:

Error starting receiver 5 - java.lang.NoSuchMethodError: org.apache.commons.net.ftp.FTPClient.setAutodetectUTF8(Z)V
at ZipStream.onStart(ZipStream.java:55)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269)
.

Now, if I change the commons-net version to 2.2, the job runs fine (except for the fact that some of the features we use from commons-net 3.3 are not there). How does one resolve such an issue where Spark uses one set of libraries and our application requires the same set of libraries, but a different version of them (in my case commons-net 2.2 vs 3.3)? I see that there is a setting I can supply - spark.files.userClassPathFirst - but the documentation says that it is experimental, and for us this did not work at all. Thanks in advance. Regards, -Jacob
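For illustration, a hedged sketch of trying the YARN flag mentioned above (the flag name is the one cited in SPARK-939; the jar path and class name are assumptions):

spark-submit \
  --master yarn-cluster \
  --conf spark.yarn.user.classpath.first=true \
  --jars /path/to/commons-net-3.3.jar \
  --class com.example.StreamingApp app.jar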
Re: Strange behavior with PySpark when using Join() and zip()
I think the explanation is that the join does not guarantee any order, since it causes a shuffle in general, and it is computed twice in the first example, resulting in a difference between d1 and d2. You can persist() the result of the join, and in practice I believe you'd find it behaves as expected, although even that is not 100% guaranteed, since a block could be lost and recomputed (differently). If order matters, and it does for zip(), then the reliable way to guarantee a well-defined ordering for zipping is to sort the RDDs. On Mon, Mar 23, 2015 at 6:27 PM, Ofer Mendelevitch omendelevi...@hortonworks.com wrote: Hi, I am running into a strange issue when doing a JOIN of two RDDs followed by ZIP from PySpark. It's part of a more complex application, but I was able to narrow it down to a simplified example that's easy to replicate and causes the same problem to appear:

raw = sc.parallelize([('k'+str(x),'v'+str(x)) for x in range(100)])
data = raw.join(raw).mapValues(lambda x: [x[0]]+[x[1]]).map(lambda pair: ','.join([x for x in pair[1]]))
d1 = data.map(lambda s: s.split(',')[0])
d2 = data.map(lambda s: s.split(',')[1])
x = d1.zip(d2)
print x.take(10)

The output is: [('v44', 'v80'), ('v79', 'v44'), ('v80', 'v79'), ('v45', 'v78'), ('v81', 'v81'), ('v78', 'v45'), ('v99', 'v99'), ('v82', 'v82'), ('v46', 'v46'), ('v83', 'v83')] As you can see, the ordering of items is not preserved anymore in all cases. (e.g., 'v81' is preserved, and 'v45' is not.) Is it not supposed to be preserved? If I do the same thing without the JOIN:

data = sc.parallelize('v'+str(x)+',v'+str(x) for x in range(100))
d1 = data.map(lambda s: s.split(',')[0])
d2 = data.map(lambda s: s.split(',')[1])
x = d1.zip(d2)
print x.take(10)

The output is: [('v0', 'v0'), ('v1', 'v1'), ('v2', 'v2'), ('v3', 'v3'), ('v4', 'v4'), ('v5', 'v5'), ('v6', 'v6'), ('v7', 'v7'), ('v8', 'v8'), ('v9', 'v9')] As expected. Anyone run into this or a similar issue? Ofer - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: version conflict common-net
I think it's spark.yarn.user.classpath.first in 1.2, and spark.{driver,executor}.extraClassPath in 1.3. Obviously that's for if you are using YARN, in the first instance. On Mon, Mar 23, 2015 at 5:41 PM, Jacob Abraham abe.jac...@gmail.com wrote: Hi Sean, Thanks a ton for you reply. The particular situation I have is case (3) that you have mentioned. The class that I am using from commons-net is FTPClient(). This class is present in both the 2.2 version and the 3.3 version. However, in the 3.3 version there are two additional methods (among several others) setAutoDetectUTF8() and setControlKeepAliveTimeout() that we require to use. The class FTPClient() and the two methods are a part of a custom receiver (ZipStream), that we wrote. Our spark app is deployed on YARN. Looking online and doing more research I found this link - SPARK-939 In the comments section, I found a mention that there is already a flag spark.yarn.user.classpath.first to make this happen. I have not yet tried it out. I guess, this should do the trick right? Also, there are some more JIRA items I see that, indicate combining the spark.files.userClassPathFirst and spark.yarn.user.classpath.first. It has been marked as resolved. However I am a bit confused about the state of the JIRA as far as Cloudera version of spark. We are using spark that comes with CDH 5.3.2 (Spark 1.2.0). I think this combined flag is marked for spark 1.3. If all else fails shade 3.3... :) Thanks again, -Jacob On Fri, Mar 20, 2015 at 10:07 AM, Sean Owen so...@cloudera.com wrote: It's not a crazy question, no. I'm having a bit of trouble figuring out what's happening. Commons Net 2.2 is what's used by Spark. The error appears to come from Spark. But the error is not finding a method that did not exist in 2.2. I am not sure what ZipStream is, for example. This could be a bizarre situation where classloader rules mean that part of 2.2 and part of 3.3 are being used. For example, let's say: - your receiver uses 3.3 classes that are only in 3.3, so they are found in your user classloader - 3.3 classes call some class that also existed in 2.2, but those are found in the Spark classloader. - 2.2 class doesn't have methods that 3.3 expects userClassPathFirst is often a remedy. There are several versions of this flag though. For example you need a different one if on YARN to have it take effect. It's worth ruling that out first. If all else fails you can shade 3.3. On Fri, Mar 20, 2015 at 11:44 AM, Jacob Abraham abe.jac...@gmail.com wrote: Anyone? or is this question nonsensical... and I am doing something fundamentally wrong? On Mon, Mar 16, 2015 at 5:33 PM, Jacob Abraham abe.jac...@gmail.com wrote: Hi Folks, I have a situation where I am getting a version conflict between java libraries that is used by my application and ones used by spark. Following are the details - I use spark provided by Cloudera running on the CDH5.3.2 cluster (Spark 1.2.0-cdh5.3.2). The library that is causing the conflict is commons-net. In our spark application we use commons-net with version 3.3. However I found out that spark uses commons-net version 2.2. 
Hence when we try to submit our application using spark-submit, I end up getting, a NoSuchMethodError() Error starting receiver 5 - java.lang.NoSuchMethodError: org.apache.commons.net.ftp.FTPClient.setAutodetectUTF8(Z)V at ZipStream.onStart(ZipStream.java:55) at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121) at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269) . Now, if I change the commons-net version to 2.2, the job runs fine (expect for the fact that some of the features we use from the commons-net 3.3 are not there). How does one resolve such an issue where sparks uses one set of libraries and our user application requires the same set of libraries, but just a different version of it (In my case commons-net 2.2 vs 3.3). I see that there is a setting that I can supply - spark.files.userClassPathFirst, but the documentation says that it is experimental and for us this did not work at all. Thanks in advance. Regards, -Jacob - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to check that a dataset is sorted after it has been written out?
Thanks for the information! (to all who responded) The code below *seems* to work. Any hidden gotchas that anyone sees? And still, in terasort, how did they check that the data was actually sorted? :-) -Mike

class MyInputFormat[T] extends parquet.hadoop.ParquetInputFormat[T] {
  override def getSplits(jobContext: org.apache.hadoop.mapreduce.JobContext): java.util.List[org.apache.hadoop.mapreduce.InputSplit] = {
    val splits = super.getSplits(jobContext)
    import scala.collection.JavaConversions._
    splits.sortBy { split =>
      split match {
        case fileSplit: org.apache.hadoop.mapreduce.lib.input.FileSplit =>
          (fileSplit.getPath.getName, fileSplit.getStart)
        case _ => ("", -1L)
      }
    }
  }
}

From: Sean Owen so...@cloudera.com To: Michael Albert m_albert...@yahoo.com Cc: User user@spark.apache.org Sent: Monday, March 23, 2015 7:31 AM Subject: Re: How to check that a dataset is sorted after it has been written out? Data is not (necessarily) sorted when read from disk, no. A file might have many blocks even, and while a block yields a partition in general, the order in which those partitions appear in the RDD is not defined. This is why you'd sort if you need the data sorted. I think you could conceivably make some custom RDD or InputFormat that reads blocks in a well-defined order and, assuming the data is sorted in some knowable way on disk, then must have them sorted. I think that's even been brought up. Deciding whether the data is sorted is quite different. You'd have to decide what ordering you expect (is part 0 before part 1? should it be sorted in a part file?) and then just verify that externally. On Fri, Mar 20, 2015 at 10:41 PM, Michael Albert m_albert...@yahoo.com.invalid wrote: Greetings! I sorted a dataset in Spark and then wrote it out in avro/parquet. Then I wanted to check that it was sorted. It looks like each partition has been sorted, but when reading in, the first partition (i.e., as seen in the partition index of mapPartitionsWithIndex) is not the same as implied by the names of the parquet files (even when the number of partitions is the same in the rdd which was read as on disk). If I take() a few hundred values, they are sorted, but they are *not* the same as if I explicitly open part-r-0.parquet and take values from that. It seems that when opening the rdd, the partitions of the rdd are not in the same order as implied by the data on disk (i.e., part-r-0.parquet, part-r-1.parquet, etc). So, how might one read the data so that one maintains the sort order? And while on the subject, after the terasort, how did they check that the data was actually sorted correctly? (or did they :-) ?) Is there any way to read the data back in so as to preserve the sort, or do I need to zipWithIndex before writing it out, and write the index at that time? (I haven't tried the latter yet.) Thanks! -Mike - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
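On the "how do you check it's sorted" question, a hedged sketch of one way to verify global sortedness without collecting everything: record (first, last, sortedWithin) per partition in read order, then check that the partition boundaries are themselves in order. Whether read order matches file order is exactly the open question above, so this checks the order as the RDD presents it:

import org.apache.spark.rdd.RDD

def isSorted[T: Ordering](rdd: RDD[T]): Boolean = {
  val ord = implicitly[Ordering[T]]
  val stats = rdd.mapPartitionsWithIndex { (idx, iter) =>
    if (!iter.hasNext) Iterator.empty
    else {
      // One streaming pass: track first/last and whether the partition is sorted.
      var first = iter.next(); var last = first; var ok = true
      while (iter.hasNext) {
        val cur = iter.next()
        if (ord.gt(last, cur)) ok = false
        last = cur
      }
      Iterator((idx, first, last, ok))
    }
  }.collect().sortBy(_._1)
  // Every partition sorted internally, and each partition's last <= next's first.
  stats.forall(_._4) && stats.sliding(2).forall {
    case Array(p, q) => ord.lteq(p._3, q._2)
    case _ => true
  }
}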
Converting SparkSQL query to Scala query
I have a complex SparkSQL query of the nature: select a.a, b.b, c.c from a, b, c where a.x = b.x and b.y = c.y How do I convert this efficiently into a Scala query of the form a.join(b, .., ..) and so on? Can anyone help me with this? If my question needs more clarification, please let me know. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Converting-SparkSQL-query-to-Scala-query-tp22192.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
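For illustration, a hedged sketch in the Spark 1.3 DataFrame API (the table and column names are taken from the query above; a, b, and c are assumed to already be DataFrames):

// Chain the equi-joins in the same order as the WHERE clause,
// then project the three output columns.
val joined = a
  .join(b, a("x") === b("x"))
  .join(c, b("y") === c("y"))
  .select(a("a"), b("b"), c("c"))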