Why doesn't the --conf parameter work in yarn-cluster mode (but works in yarn-client and local)?

2015-03-23 Thread Emre Sevinc
Hello,

According to Spark Documentation at
https://spark.apache.org/docs/1.2.1/submitting-applications.html :

  --conf: Arbitrary Spark configuration property in key=value format. For
values that contain spaces wrap “key=value” in quotes (as shown).

And indeed, when I use that parameter, in my Spark program I can retrieve
the value of the key by using:

System.getProperty(key);

This works when I test my program locally, and also in yarn-client mode: I
can log the value of the key and see that it matches what I wrote on the
command line. But it returns *null* when I submit the very same program in
*yarn-cluster* mode.

Why can't I retrieve the value of key given as --conf key=value when I
submit my Spark application in *yarn-cluster* mode?

Any ideas and/or workarounds?
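
For what it's worth, a minimal sketch of a more portable way to read the value,
assuming the key is given a spark.-prefixed name (spark.myapp.key here is an
illustrative name) so that spark-submit carries it into the application's SparkConf
in every deploy mode:

  // submitted with: --conf spark.myapp.key=someValue
  val sc = new SparkContext(new SparkConf())
  // Read from the SparkConf rather than System.getProperty: JVM system properties
  // are not necessarily populated on the remote driver in yarn-cluster mode.
  val value = sc.getConf.get("spark.myapp.key", "defaultValue")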


-- 
Emre Sevinç
http://www.bigindustries.be/


Re: Saving Dstream into a single file

2015-03-23 Thread Dean Wampler
You can use the coalesce method to reduce the number of partitions. You can
reduce to one if the data is not too big. Then write the output.
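
For illustration, a rough sketch of that suggestion for the streaming case (the
output path and the per-batch suffix are illustrative):

  dstream.foreachRDD { (rdd, time) =>
    // one partition => one output file per batch; only reasonable if a batch is small
    rdd.coalesce(1).saveAsTextFile(s"hdfs:///output/wordcounts-${time.milliseconds}")
  }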

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Mar 16, 2015 at 2:42 PM, Zhan Zhang zzh...@hortonworks.com wrote:

 Each RDD has multiple partitions, and each of them will produce one HDFS file
 when saving output. I don’t think you are allowed to have multiple file
 handlers writing to the same HDFS file. You can still load multiple files
 into Hive tables, right?

 Thanks.

 Zhan Zhang

 On Mar 15, 2015, at 7:31 AM, tarek_abouzeid tarek.abouzei...@yahoo.com
 wrote:

  I am doing the word count example on a Flume stream and trying to save the
  output as text files in HDFS, but in the save directory I got multiple sub
  directories, each having files of small size. I wonder if there is a way
  to append to one large file instead of saving to multiple files, as I intend
  to save the output in a Hive HDFS directory so I can query the result using
  Hive.

  Hope anyone has a workaround for this issue. Thanks in advance
 
 
 




[no subject]

2015-03-23 Thread Udbhav Agarwal
Hi,
I am querying HBase via Spark SQL with the Java APIs.
Step 1:
creating JavaPairRdd, then JavaRdd, then JavaSchemaRdd.applySchema objects.
Step 2:
sqlContext.sql(sql query).

If I am updating my HBase database between these two steps (by hbase shell in some
other console), the query in step two is not picking up the updated data from the
table. It's showing the old data. Can somebody tell me how to let Spark know I have
updated my database after Spark has created the RDDs?
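
For illustration only, a minimal sketch in Scala (the question uses the Java API) of
rebuilding the RDD before querying, since RDDs are immutable snapshots of the data
they were built from; buildRowRddFromHBase and mySchema are hypothetical placeholders
for the step-1 code:

  def refreshAndQuery(sqlContext: org.apache.spark.sql.SQLContext,
                      query: String) = {
    val rows = buildRowRddFromHBase()                       // hypothetical: re-run step 1 (re-scan HBase)
    val schemaRdd = sqlContext.applySchema(rows, mySchema)  // mySchema: hypothetical StructType
    schemaRdd.registerTempTable("my_table")                 // re-register so the query sees the new snapshot
    sqlContext.sql(query)                                   // step 2 now reflects the latest HBase data
  }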




Thanks,
Udbhav Agarwal



Re: registerTempTable is not a member of RDD on spark 1.2?

2015-03-23 Thread Dean Wampler
In 1.2 it's a member of SchemaRDD and it becomes available on RDD (through
the type class mechanism) when you add a SQLContext, like so.

val sqlContext = new SQLContext(sc)
import sqlContext._


In 1.3, the method has moved to the new DataFrame type.
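
For illustration, a minimal Spark 1.2 sketch of how that import makes
registerTempTable available (the case class and table name are illustrative):

  case class Person(name: String, age: Int)

  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext._   // brings in the implicit RDD-to-SchemaRDD conversion

  val people = sc.parallelize(Seq(Person("Ann", 30), Person("Bob", 17)))
  people.registerTempTable("people")   // compiles because of the implicit conversion
  sqlContext.sql("SELECT name FROM people WHERE age >= 18").collect()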

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Mar 23, 2015 at 5:25 AM, IT CTO goi@gmail.com wrote:

 Hi,

 I am running spark when I use sc.version I get 1.2 but when I call
 registerTempTable(MyTable) I get error saying registedTempTable is not a
 member of RDD

 Why?

 --
 Eran | CTO



registerTempTable is not a member of RDD on spark 1.2?

2015-03-23 Thread IT CTO
Hi,

I am running Spark; when I use sc.version I get 1.2, but when I call
registerTempTable("MyTable") I get an error saying registerTempTable is not a
member of RDD.

Why?

-- 
Eran | CTO


Spark error NoClassDefFoundError: org/apache/hadoop/mapred/InputSplit

2015-03-23 Thread , Roy
Hi,


  I am using a CDH 5.3.2 package installation through Cloudera Manager 5.3.2

I am trying to run one spark job with following command

PYTHONPATH=~/code/utils/ spark-submit --master yarn --executor-memory 3G
--num-executors 30 --driver-memory 2G --executor-cores 2 --name=analytics
/home/abc/code/updb/spark/UPDB3analytics.py -date 2015-03-01

but I am getting following error

15/03/23 11:06:49 WARN TaskSetManager: Lost task 9.0 in stage 0.0 (TID 7,
hdp003.dev.xyz.com): java.lang.NoClassDefFoundError:
org/apache/hadoop/mapred/InputSplit
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2532)
at java.lang.Class.getDeclaredConstructors(Class.java:1901)
at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1749)
at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:72)
at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:250)
at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:248)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:247)
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:613)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.mapred.InputSplit
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 25 more

here is the full trace

https://gist.github.com/anonymous/3492f0ec63d7a23c47cf


Re: JAVA_HOME problem with upgrade to 1.3.0

2015-03-23 Thread Williams, Ken


 From: Williams, Ken ken.willi...@windlogics.com
 Date: Thursday, March 19, 2015 at 10:59 AM
 To: Spark list user@spark.apache.org
 Subject: JAVA_HOME problem with upgrade to 1.3.0

 […]
 Finally, I go and check the YARN app master’s web interface (since the job is 
 shown, I know it at least made it that far), and the
 only logs it shows are these:

 Log Type: stderr
 Log Length: 61
 /bin/bash: {{JAVA_HOME}}/bin/java: No such file or directory

 Log Type: stdout
 Log Length: 0

I’m still interested in a solution to this issue if anyone has comments.  I 
also posted to SO if that’s more convenient:


http://stackoverflow.com/questions/29170280/java-home-error-with-upgrade-to-spark-1-3-0

Thanks,

  -Ken






Re: join two DataFrames, same column name

2015-03-23 Thread Eric Friedman

 You can include * and a column alias in the same select clause
 var df1 = sqlContext.sql("select *, column_id AS table1_id from table1")


FYI, this does not ultimately work as the * still includes column_id and
you cannot have two columns of that name in the joined DataFrame.  So I
ended up aliasing both sides of the join.
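
For illustration, a rough sketch of aliasing both sides before the join, assuming
withColumnRenamed from the 1.3 DataFrame API (the column names are illustrative):

  val df1 = sqlContext.sql("select * from table1").withColumnRenamed("column_id", "t1_column_id")
  val df2 = sqlContext.sql("select * from table2").withColumnRenamed("column_id", "t2_column_id")
  val joined = df1.join(df2, df1("t1_column_id") === df2("t2_column_id"))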

On Sun, Mar 22, 2015 at 1:25 PM, Michael Armbrust mich...@databricks.com
wrote:

 You can include * and a column alias in the same select clause
 var df1 = sqlContext.sql("select *, column_id AS table1_id from table1")


 I'm also hoping to resolve SPARK-6376
 https://issues.apache.org/jira/browse/SPARK-6376 before Spark 1.3.1
 which will let you do something like:
 var df1 = sqlContext.sql("select * from table1").as("t1")
 var df2 = sqlContext.sql("select * from table2").as("t2")
 df1.join(df2, df1("column_id") === df2("column_id")).select("t1.column_id")

 Finally, there is SPARK-6380
 https://issues.apache.org/jira/browse/SPARK-6380 that hopes to simplify
 this particular case.

 Michael

 On Sat, Mar 21, 2015 at 3:02 PM, Eric Friedman eric.d.fried...@gmail.com
 wrote:

 I have a couple of data frames that I pulled from SparkSQL and the
 primary key of one is a foreign key of the same name in the other.  I'd
 rather not have to specify each column in the SELECT statement just so that
 I can rename this single column.

 When I try to join the data frames, I get an exception because it finds
 the two columns of the same name to be ambiguous.  Is there a way to
 specify which side of the join comes from data frame A and which comes from
 B?

 var df1 = sqlContext.sql("select * from table1")
 var df2 = sqlContext.sql("select * from table2")

 df1.join(df2, df1("column_id") === df2("column_id"))





Re: Spark 1.2. loses often all executors

2015-03-23 Thread mrm
Hi, 

I have received three replies to my question on my personal e-mail; why
don't they also show up on the mailing list? I would like to reply to the 3
users through a thread.

Thanks,
Maria






Re: EC2 cluster created by spark using old HDFS 1.0

2015-03-23 Thread Akhil Das
That's a Hadoop version incompatibility issue; you need to make sure
everything runs on the same version.

Thanks
Best Regards

On Sat, Mar 21, 2015 at 1:24 AM, morfious902002 anubha...@gmail.com wrote:

 Hi,
 I created a cluster using spark-ec2 script. But it installs HDFS version
 1.0. I would like to use this cluster to connect to HIVE installed on a
 cloudera CDH 5.3 cluster. But I am getting the following error:-

 org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot
 communicate with client vers
 ion 4
 at org.apache.hadoop.ipc.Client.call(Client.java:1070)
 at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
 at com.sun.proxy.$Proxy10.getProtocolVersion(Unknown Source)
 at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
 at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
 at
 org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
 at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:238)
 at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:203)
 at

 org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:8
 9)
 at
 org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
 at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
 at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
 at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
 at

 org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
 at

 org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.
 java:40)
 at

 org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
 at
 org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
 at

 org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
 at

 org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
 at
 org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
 at
 org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
 at

 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at

 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
 at

 org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:1511)
 at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
 at
 org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:83)
 at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:815)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:23)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:28)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:30)
 at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:32)
 at $iwC$$iwC$$iwC$$iwC.init(console:34)
 at $iwC$$iwC$$iwC.init(console:36)
 at $iwC$$iwC.init(console:38)
 at $iwC.init(console:40)
 at init(console:42)
 at 

Spark UI tunneling

2015-03-23 Thread sergunok
Is there a way to tunnel the Spark UI?

I tried to tunnel client-node:4040, but my browser was redirected from
localhost to some domain name only visible inside the cluster.

Maybe there is some startup option to make the Spark UI fully
accessible through a single endpoint (address:port)?

Serg.






Re: Spark Sql with python udf fail

2015-03-23 Thread lonely Feb
I caught exceptions in the Python UDF code, flushed the exceptions into a single
file, and made sure that the column count of the output lines is the same as in the
SQL schema.

Something interesting is that my output lines from the UDF code have just 10 columns,
and the exception above is java.lang.ArrayIndexOutOfBoundsException: 9; is
there any inspiration?

2015-03-23 16:24 GMT+08:00 Cheng Lian lian.cs@gmail.com:

 Could you elaborate on the UDF code?


 On 3/23/15 3:43 PM, lonely Feb wrote:

  Hi all, I tried to transfer some Hive jobs into Spark SQL. When I ran a
  SQL job with a Python UDF I got an exception:

 java.lang.ArrayIndexOutOfBoundsException: 9
 at org.apache.spark.sql.catalyst.expressions.GenericRow.apply(
 Row.scala:142)
 at org.apache.spark.sql.catalyst.expressions.BoundReference.
 eval(BoundAttribute.scala:37)
 at org.apache.spark.sql.catalyst.expressions.EqualTo.eval(
 predicates.scala:166)
 at org.apache.spark.sql.catalyst.expressions.
 InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30)
 at org.apache.spark.sql.catalyst.expressions.
 InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30)
 at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at org.apache.spark.sql.execution.Aggregate$$anonfun$
 execute$1$$anonfun$7.apply(Aggregate.scala:156)
 at org.apache.spark.sql.execution.Aggregate$$anonfun$
 execute$1$$anonfun$7.apply(Aggregate.scala:151)
 at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
 at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(
 MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.
 scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(
 MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.
 scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(
 ShuffleMapTask.scala:68)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(
 ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at org.apache.spark.executor.Executor$TaskRunner.run(
 Executor.scala:197)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(
 ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(
 ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)

  I suspected there was an odd line in the input file. But the input file
  is so large and I could not find any abnormal lines with several jobs to
  check. How can I get the abnormal line here?





RDD storage in spark streaming

2015-03-23 Thread abhi
Hi,
I have a simple question about creating RDDs. Whenever an RDD is created in
Spark Streaming for a particular time window, where does the RDD get
stored?

1. Does it get stored on the driver machine, or does it get stored on all the
machines in the cluster?
2. Does the data get stored in memory by default? Can it be stored in both
memory and disk? How can this be configured?


Thanks,
Abhi


Re: RDD storage in spark streaming

2015-03-23 Thread Jeffrey Jedele
Hey Abhi,
many of StreamingContext's methods to create input streams take a
StorageLevel parameter to configure this behavior. RDD partitions are
generally stored in the in-memory cache of worker nodes I think. You can
also configure replication and spilling to disk if needed.
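
For illustration, a minimal sketch of passing a StorageLevel to an input stream
(the host, port and storage level are illustrative; `ssc` is an existing
StreamingContext):

  import org.apache.spark.storage.StorageLevel

  // received blocks are serialized, replicated to 2 workers, and spill to disk if memory is short
  val lines = ssc.socketTextStream("host", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)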

Regards,
Jeff

2015-03-23 15:26 GMT+01:00 abhi abhishek...@gmail.com:

 HI,
 i have a simple question about creating RDD . Whenever RDD is created in
 spark streaming for the particular time window .When does the RDD gets
 stored .

 1. Does it get stored at the Driver machine ? or it gets stored on all the
 machines in the cluster ?
 2. Does the data gets stored in memory by default ? Can it store at the
 memory and disk ? How can it configured ?


 Thanks,
 Abhi




Re: join two DataFrames, same column name

2015-03-23 Thread Eric Friedman
Michael, thank you for the workaround and for letting me know of the
upcoming enhancements, both of which sound appealing.

On Sun, Mar 22, 2015 at 1:25 PM, Michael Armbrust mich...@databricks.com
wrote:

 You can include * and a column alias in the same select clause
 var df1 = sqlContext.sql("select *, column_id AS table1_id from table1")


 I'm also hoping to resolve SPARK-6376
 https://issues.apache.org/jira/browse/SPARK-6376 before Spark 1.3.1
 which will let you do something like:
 var df1 = sqlContext.sql("select * from table1").as("t1")
 var df2 = sqlContext.sql("select * from table2").as("t2")
 df1.join(df2, df1("column_id") === df2("column_id")).select("t1.column_id")

 Finally, there is SPARK-6380
 https://issues.apache.org/jira/browse/SPARK-6380 that hopes to simplify
 this particular case.

 Michael

 On Sat, Mar 21, 2015 at 3:02 PM, Eric Friedman eric.d.fried...@gmail.com
 wrote:

 I have a couple of data frames that I pulled from SparkSQL and the
 primary key of one is a foreign key of the same name in the other.  I'd
 rather not have to specify each column in the SELECT statement just so that
 I can rename this single column.

 When I try to join the data frames, I get an exception because it finds
 the two columns of the same name to be ambiguous.  Is there a way to
 specify which side of the join comes from data frame A and which comes from
 B?

 var df1 = sqlContext.sql("select * from table1")
 var df2 = sqlContext.sql("select * from table2")

 df1.join(df2, df1("column_id") === df2("column_id"))





Re: registerTempTable is not a member of RDD on spark 1.2?

2015-03-23 Thread IT CTO
Thanks.
I am new to the environment and running Cloudera CDH 5.3 with Spark in it.

Apparently, when running this command in spark-shell:  val sqlContext = new
SQLContext(sc)
I am failing with "not found: type SQLContext".

Any idea why?

On Mon, Mar 23, 2015 at 3:05 PM, Dean Wampler deanwamp...@gmail.com wrote:

 In 1.2 it's a member of SchemaRDD and it becomes available on RDD (through
 the type class mechanism) when you add a SQLContext, like so.

 val sqlContext = new SQLContext(sc)
 import sqlContext._


 In 1.3, the method has moved to the new DataFrame type.

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Mon, Mar 23, 2015 at 5:25 AM, IT CTO goi@gmail.com wrote:

 Hi,

 I am running spark when I use sc.version I get 1.2 but when I call
 registerTempTable(MyTable) I get error saying registedTempTable is not a
 member of RDD

 Why?

 --
 Eran | CTO





-- 
Eran | CTO


Re: registerTempTable is not a member of RDD on spark 1.2?

2015-03-23 Thread Ted Yu
Have you tried adding the following ?

import org.apache.spark.sql.SQLContext

Cheers

On Mon, Mar 23, 2015 at 6:45 AM, IT CTO goi@gmail.com wrote:

 Thanks.
 I am new to the environment and running cloudera CDH5.3 with spark in it.

 apparently when running in spark-shell this command  val sqlContext = new
 SQLContext(sc)
 I am failing with the not found type SQLContext

 Any idea why?

 On Mon, Mar 23, 2015 at 3:05 PM, Dean Wampler deanwamp...@gmail.com
 wrote:

 In 1.2 it's a member of SchemaRDD and it becomes available on RDD
 (through the type class mechanism) when you add a SQLContext, like so.

 val sqlContext = new SQLContext(sc)
 import sqlContext._


 In 1.3, the method has moved to the new DataFrame type.

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Mon, Mar 23, 2015 at 5:25 AM, IT CTO goi@gmail.com wrote:

 Hi,

 I am running spark when I use sc.version I get 1.2 but when I call
 registerTempTable(MyTable) I get error saying registedTempTable is not a
 member of RDD

 Why?

 --
 Eran | CTO





 --
 Eran | CTO



Spark RDD mapped to Hbase to be updateable

2015-03-23 Thread Siddharth Ubale
Hi,

We have a JavaRDD mapped to an HBase table, and when we query the HBase table
using the Spark SQL API we can access the data. However, when we update the HBase
table while the Spark SQL SparkConf is initialised, we cannot see the updated data.
Is there any way we can have the RDD mapped to HBase updated as and when the HBase
table reflects any change?

Thanks,
Siddharth Ubale,
Synchronized Communications
#43, Velankani Tech Park, Block No. II,
3rd Floor, Electronic City Phase I,
Bangalore – 560 100
Tel : +91 80 3202 4060
Web: www.syncoms.com
London|Bangalore|Orlando

we innovate, plan, execute, and transform the business



RE: Spark SQL udf(ScalaUdf) is very slow

2015-03-23 Thread Cheng, Hao
This is a very interesting issue. The root reason for the lower performance
is probably that, in a Scala UDF, Spark SQL converts the data type from the internal
representation to the Scala representation via Scala reflection, recursively.

Can you create a Jira issue for tracking this? I can start to work on the 
improvement soon.

From: zzcclp [mailto:441586...@qq.com]
Sent: Monday, March 23, 2015 5:10 PM
To: user@spark.apache.org
Subject: Spark SQL udf(ScalaUdf) is very slow

My test env:
1. Spark version is 1.3.0
2. 3 nodes, 80G/20C each
3. read 250G of parquet files from HDFS

Test cases:

1. Register the floor func with the command:
   sqlContext.udf.register("floor", (ts: Int) => ts - ts % 300)
   then run the sql
   select chan, floor(ts) as tt, sum(size) from qlogbase3 group by chan, floor(ts)
   It takes 17 minutes.

   == Physical Plan ==
   Aggregate false, [chan#23015,PartialGroup#23500], [chan#23015,PartialGroup#23500 AS tt#23494,CombineSum(PartialSum#23499L) AS c2#23495L]
    Exchange (HashPartitioning [chan#23015,PartialGroup#23500], 54)
     Aggregate true, [chan#23015,scalaUDF(ts#23016)], [chan#23015,scalaUDF(ts#23016) AS PartialGroup#23500,SUM(size#23023L) AS PartialSum#23499L]
      PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[115] at map at newParquet.scala:562

2. Run the sql
   select chan, (ts - ts % 300) as tt, sum(size) from qlogbase3 group by chan, (ts - ts % 300)
   It takes only 5 minutes.

   == Physical Plan ==
   Aggregate false, [chan#23015,PartialGroup#23349], [chan#23015,PartialGroup#23349 AS tt#23343,CombineSum(PartialSum#23348L) AS c2#23344L]
    Exchange (HashPartitioning [chan#23015,PartialGroup#23349], 54)
     Aggregate true, [chan#23015,(ts#23016 - (ts#23016 % 300))], [chan#23015,(ts#23016 - (ts#23016 % 300)) AS PartialGroup#23349,SUM(size#23023L) AS PartialSum#23348L]
      PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[83] at map at newParquet.scala:562

3. Use HiveContext with the sql
   select chan, floor((ts - ts % 300)) as tt, sum(size) from qlogbase3 group by chan, floor((ts - ts % 300))
   It takes only 5 minutes too.

   == Physical Plan ==
   Aggregate false, [chan#23015,PartialGroup#23108L], [chan#23015,PartialGroup#23108L AS tt#23102L,CombineSum(PartialSum#23107L) AS _c2#23103L]
    Exchange (HashPartitioning [chan#23015,PartialGroup#23108L], 54)
     Aggregate true, [chan#23015,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor((ts#23016 - (ts#23016 % 300)))], [chan#23015,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor((ts#23016 - (ts#23016 % 300))) AS PartialGroup#23108L,SUM(size#23023L) AS PartialSum#23107L]
      PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[28] at map at newParquet.scala:562

Why is ScalaUdf so slow? How can it be improved?



Re: Spark 1.2. loses often all executors

2015-03-23 Thread Ted Yu
In this thread:
http://search-hadoop.com/m/JW1q5DM69G

I only saw two replies. Maybe some people forgot to use 'Reply to All' ?

Cheers

On Mon, Mar 23, 2015 at 8:19 AM, mrm ma...@skimlinks.com wrote:

 Hi,

 I have received three replies to my question on my personal e-mail, why
 don't they also show up on the mailing list? I would like to reply to the 3
 users through a thread.

 Thanks,
 Maria







Is yarn-standalone mode deprecated?

2015-03-23 Thread nitinkak001
Is yarn-standalone mode deprecated in Spark now? The reason I am asking is
that while I can find it in the 0.9.0
documentation (https://spark.apache.org/docs/0.9.0/running-on-yarn.html), I
am not able to find it in 1.2.0.

I am using this mode to run Spark jobs from Oozie as a java action.
Removing this mode would prevent me from doing that. Are there any other ways
of running a Spark job from Oozie other than a Shell action?






Re: How to check that a dataset is sorted after it has been written out?

2015-03-23 Thread Sean Owen
Data is not (necessarily) sorted when read from disk, no. A file might
have many blocks even, and while a block yields a partition in
general, the order in which those partitions appear in the RDD is not
defined. This is why you'd sort if you need the data sorted.

I think you could conceivably make some custom RDD or InputFormat that
reads blocks in a well-defined order and, assuming the data is sorted
in some knowable way on disk, then must have them sorted. I think
that's even been brought up.

Deciding whether the data is sorted is quite different. You'd have to
decide what ordering you expect (is part 0 before part 1? should it be
sorted in a part file?) and then just verify that externally.
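
For illustration, a rough sketch of such an external check, assuming the sort key is
an Int in column 0 and that the expected ordering is "partition 0 before partition 1"
(one possible convention, per the caveats above):

  val keys = sqlContext.parquetFile("/sorted-data").rdd.map(_.getInt(0))
  val perPartition = keys.mapPartitionsWithIndex { (idx, it) =>
    val vs = it.toArray
    val inOrder = vs.sliding(2).forall(w => w.length < 2 || w(0) <= w(1))
    Iterator((idx, inOrder, vs.headOption, vs.lastOption))
  }.collect().sortBy(_._1)

  val sortedWithinPartitions = perPartition.forall(_._2)
  val sortedAcrossPartitions = perPartition.sliding(2).forall { w =>
    w.length < 2 || (for (last <- w(0)._4; first <- w(1)._3) yield last <= first).getOrElse(true)
  }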

On Fri, Mar 20, 2015 at 10:41 PM, Michael Albert
m_albert...@yahoo.com.invalid wrote:
 Greetings!

 I sorted a dataset in Spark and then wrote it out in avro/parquet.

 Then I wanted to check that it was sorted.

 It looks like each partition has been sorted, but when reading in, the first
 partition (i.e., as
 seen in the partition index of mapPartitionsWithIndex) is not the same  as
 implied by
 the names of the parquet files (even when the number of partitions is the
 same in the
 rdd which was read as on disk).

 If I take() a few hundred values, they are sorted, but they are *not* the
 same as if I
 explicitly open part-r-0.parquet and take values from that.

 It seems that when opening the rdd, the partitions of the rdd are not in
 the same
 order as implied by the data on disk (i.e., part-r-0.parquet,
 part-r-1.parquet, etc).

 So, how might one read the data so that one maintains the sort order?

 And while on the subject, after the terasort, how did they check that the
 data was actually sorted correctly? (or did they :-) ? ).

 Is there any way to read the data back in so as to preserve the sort, or do
 I need to
 zipWithIndex before writing it out, and write the index at that time? (I
 haven't tried the
 latter yet).

 Thanks!
 -Mike





Re: Spark sql thrift server slower than hive

2015-03-23 Thread Arush Kharbanda
A basic change needed by Spark is setting the executor memory, which
defaults to 512MB.

On Mon, Mar 23, 2015 at 10:16 AM, Denny Lee denny.g@gmail.com wrote:

 How are you running your spark instance out of curiosity?  Via YARN or
 standalone mode?  When connecting Spark thriftserver to the Spark service,
 have you allocated enough memory and CPU when executing with spark?

 On Sun, Mar 22, 2015 at 3:39 AM fanooos dev.fano...@gmail.com wrote:

 We have cloudera CDH 5.3 installed on one machine.

 We are trying to use spark sql thrift server to execute some analysis
 queries against hive table.

 Without any changes in the configurations, we run the following query on
 both hive and spark sql thrift server

 *select * from tableName;*

  The time taken by Spark is larger than the time taken by Hive, which is not
  supposed to be like that.

  The Hive table is mapped to json files stored in an HDFS directory and we are
  using *org.openx.data.jsonserde.JsonSerDe* for
  serialization/deserialization.

  Why does Spark take much more time to execute the query than Hive?







-- 


*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: How to use DataFrame with MySQL

2015-03-23 Thread gavin zhang
OK, I found what the problem is: it couldn't work with mysql-connector-5.0.8.
I updated the connector version to 5.1.34 and it worked.






Re: Is it possible to use json4s 3.2.11 with Spark 1.3.0?

2015-03-23 Thread Alexey Zinoviev
Thanks Ted, I'll try; hopefully there are no transitive dependencies on 3.2.10.

On Tue, Mar 24, 2015 at 4:21 AM, Ted Yu yuzhih...@gmail.com wrote:

 Looking at core/pom.xml :
  <dependency>
    <groupId>org.json4s</groupId>
    <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
    <version>3.2.10</version>
  </dependency>

 The version is hard coded.

 You can rebuild Spark 1.3.0 with json4s 3.2.11

 Cheers

 On Mon, Mar 23, 2015 at 2:12 PM, Alexey Zinoviev 
 alexey.zinov...@gmail.com wrote:

 Spark has a dependency on json4s 3.2.10, but this version has several
 bugs and I need to use 3.2.11. I added json4s-native 3.2.11 dependency to
 build.sbt and everything compiled fine. But when I spark-submit my JAR it
 provides me with 3.2.10.


 build.sbt

 import sbt.Keys._

  name := "sparkapp"

  version := "1.0"

  scalaVersion := "2.10.4"

  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"

  libraryDependencies += "org.json4s" %% "json4s-native" % "3.2.11"


 plugins.sbt

 logLevel := Level.Warn

  resolvers += Resolver.url("artifactory", url(
  "http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"
  ))(Resolver.ivyStylePatterns)

  addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")


 App1.scala

 import org.apache.spark.SparkConf
 import org.apache.spark.rdd.RDD
 import org.apache.spark.{Logging, SparkConf, SparkContext}
 import org.apache.spark.SparkContext._

 object App1 extends Logging {
   def main(args: Array[String]) = {
  val conf = new SparkConf().setAppName("App1")
  val sc = new SparkContext(conf)
  println(s"json4s version: ${org.json4s.BuildInfo.version.toString}")
   }
 }



 sbt 0.13.7, sbt-assembly 0.13.0, Scala 2.10.4

 Is it possible to force 3.2.11 version usage?

 Thanks,
 Alexey





RE: Use pig load function in spark

2015-03-23 Thread Dai, Kevin
Hi, Yin

But our data is in customized sequence files which can be read by our customized
load function in Pig.

And I want to use Spark to reuse these load functions to read the data and turn
it into RDDs.
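
For illustration, one hedged option: since a Pig LoadFunc typically wraps a Hadoop
InputFormat, that InputFormat can often be reused directly through Spark's Hadoop
APIs. MyCustomInputFormat, the key/value classes, and parseLikePigLoader below are
hypothetical placeholders for whatever your loader actually uses:

  import org.apache.hadoop.io.{LongWritable, Text}

  val rdd = sc.newAPIHadoopFile(
    "hdfs:///path/to/custom/seqfiles",      // illustrative path
    classOf[MyCustomInputFormat],           // the InputFormat behind your Pig LoadFunc (hypothetical)
    classOf[LongWritable],                  // key type it emits (assumption)
    classOf[Text])                          // value type it emits (assumption)

  // then reuse the record-parsing logic from the load function on each (key, value) pair
  val records = rdd.map { case (k, v) => parseLikePigLoader(v) }  // hypothetical helper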

Best Regards,
Kevin.

From: Yin Huai [mailto:yh...@databricks.com]
Sent: March 24, 2015 11:53
To: Dai, Kevin
Cc: Paul Brown; user@spark.apache.org
Subject: Re: Use pig load function in spark

Hello Kevin,

You can take a look at our generic load function
(https://spark.apache.org/docs/1.3.0/sql-programming-guide.html#generic-loadsave-functions).

For example, you can use

val df = sqlContext.load("/myData", "parquet")

to load a parquet dataset stored in /myData as a DataFrame
(https://spark.apache.org/docs/1.3.0/sql-programming-guide.html#dataframes).

You can use it to load data stored in various formats, like json (Spark
built-in), parquet (Spark built-in), avro
(https://github.com/databricks/spark-avro), and csv
(https://github.com/databricks/spark-csv).

Thanks,

Yin

On Mon, Mar 23, 2015 at 7:14 PM, Dai, Kevin yun...@ebay.com wrote:
Hi, Paul

You are right.

The story is that we have a lot of Pig load functions to load our different data.

And now we want to use Spark to read and process these data.

So we want to figure out a way to reuse our existing load functions in Spark to
read these data.

Any idea?

Best Regards,
Kevin.

From: Paul Brown [mailto:p...@mult.ifario.us]
Sent: March 24, 2015 4:11
To: Dai, Kevin
Subject: Re: Use pig load function in spark


The answer is "Maybe, but you probably don't want to do that."

A typical Pig load function is devoted to bridging external data into Pig's 
type system, but you don't really need to do that in Spark because it is 
(thankfully) not encumbered by Pig's type system.  What you probably want to do 
is to figure out a way to use native Spark facilities (e.g., textFile) coupled 
with some of the logic out of your Pig load function necessary to turn your 
external data into an RDD.


—
p...@mult.ifario.us | Multifarious, Inc. |
http://mult.ifario.us/

On Mon, Mar 23, 2015 at 2:29 AM, Dai, Kevin yun...@ebay.com wrote:
Hi, all

Can spark use pig’s load function to load data?

Best Regards,
Kevin.




Re: Is it possible to use json4s 3.2.11 with Spark 1.3.0?

2015-03-23 Thread Alexey Zinoviev
Thanks Marcelo, these options solved the problem (I'm using 1.3.0), but it
works only if I remove extends Logging from the object; with extends
Logging it returns:

Exception in thread main java.lang.LinkageError: loader constraint
violation in interface itable initialization: when resolving method
App1$.logInfo(Lscala/Function0;Ljava/lang/Throwable;)V the class loader
(instance of org/apache/spark/util/ChildFirstURLClassLoader) of the current
class, App1$, and the class loader (instance of
sun/misc/Launcher$AppClassLoader) for interface org/apache/spark/Logging
have different Class objects for the type scala/Function0 used in the
signature
at App1.main(App1.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Do you have any idea what's wrong with Logging?

PS: I'm running it with spark-1.3.0/bin/spark-submit --class App1 --conf
spark.driver.userClassPathFirst=true --conf
spark.executor.userClassPathFirst=true
$HOME/projects/sparkapp/target/scala-2.10/sparkapp-assembly-1.0.jar

Thanks,
Alexey


On Tue, Mar 24, 2015 at 5:03 AM, Marcelo Vanzin van...@cloudera.com wrote:

 You could build a far jar for your application containing both your
 code and the json4s library, and then run Spark with these two
 options:

   spark.driver.userClassPathFirst=true
   spark.executor.userClassPathFirst=true

 Both only work in 1.3. (1.2 has spark.files.userClassPathFirst, but
 that only works for executors.)


 On Mon, Mar 23, 2015 at 2:12 PM, Alexey Zinoviev
 alexey.zinov...@gmail.com wrote:
  Spark has a dependency on json4s 3.2.10, but this version has several
 bugs
  and I need to use 3.2.11. I added json4s-native 3.2.11 dependency to
  build.sbt and everything compiled fine. But when I spark-submit my JAR it
  provides me with 3.2.10.
 
 
  build.sbt
 
  import sbt.Keys._
 
  name := "sparkapp"
 
  version := "1.0"
 
  scalaVersion := "2.10.4"
 
  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
 
  libraryDependencies += "org.json4s" %% "json4s-native" % "3.2.11"
 
 
  plugins.sbt
 
  logLevel := Level.Warn
 
  resolvers += Resolver.url("artifactory",
  url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"
  ))(Resolver.ivyStylePatterns)
 
  addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
 
 
  App1.scala
 
  import org.apache.spark.SparkConf
  import org.apache.spark.rdd.RDD
  import org.apache.spark.{Logging, SparkConf, SparkContext}
  import org.apache.spark.SparkContext._
 
  object App1 extends Logging {
def main(args: Array[String]) = {
  val conf = new SparkConf().setAppName("App1")
  val sc = new SparkContext(conf)
  println(s"json4s version: ${org.json4s.BuildInfo.version.toString}")
}
  }
 
 
 
  sbt 0.13.7, sbt-assembly 0.13.0, Scala 2.10.4
 
  Is it possible to force 3.2.11 version usage?
 
  Thanks,
  Alexey



 --
 Marcelo



Re: Is yarn-standalone mode deprecated?

2015-03-23 Thread Sandy Ryza
The mode is not deprecated, but the name yarn-standalone is now
deprecated.  It's now referred to as yarn-cluster.

-Sandy

On Mon, Mar 23, 2015 at 11:49 AM, nitinkak001 nitinkak...@gmail.com wrote:

 Is yarn-standalone mode deprecated in Spark now. The reason I am asking is
 because while I can find it in 0.9.0
 documentation(https://spark.apache.org/docs/0.9.0/running-on-yarn.html). I
 am not able to find it in 1.2.0.

 I am using this mode to run the Spark jobs from Oozie as a java action.
 Removing this mode will prevent me from doing that. Are there any other
 ways
 of running a Spark job from Oozie other than Shell action?







Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-23 Thread Yiannis Gkoufas
Hi Yin,

Yes, I have set spark.executor.memory to 8g and the worker memory to 16g
without any success.
I cannot figure out how to increase the number of mapPartitions tasks.

Thanks a lot

On 20 March 2015 at 18:44, Yin Huai yh...@databricks.com wrote:

 spark.sql.shuffle.partitions only control the number of tasks in the
 second stage (the number of reducers). For your case, I'd say that the
 number of tasks in the first state (number of mappers) will be the number
 of files you have.

 Actually, have you changed spark.executor.memory (it controls the
 memory for an executor of your application)? I did not see it in your
 original email. The difference between worker memory and executor memory
 can be found at (http://spark.apache.org/docs/1.3.0/spark-standalone.html
 ),

 SPARK_WORKER_MEMORY
 Total amount of memory to allow Spark applications to use on the machine,
 e.g. 1000m, 2g (default: total memory minus 1 GB); note that each
 application's individual memory is configured using its
 spark.executor.memory property.
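
For illustration, a minimal sketch of setting the per-application executor memory
described above (the value is illustrative; it can equally be passed to spark-submit
as --conf spark.executor.memory=8g or --executor-memory 8g):

  val conf = new org.apache.spark.SparkConf()
    .setAppName("parquet-aggregation")      // illustrative app name
    .set("spark.executor.memory", "8g")     // per-executor memory for this application
  val sc = new org.apache.spark.SparkContext(conf)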


 On Fri, Mar 20, 2015 at 9:25 AM, Yiannis Gkoufas johngou...@gmail.com
 wrote:

 Actually I realized that the correct way is:

  sqlContext.sql("set spark.sql.shuffle.partitions=1000")

 but I am still experiencing the same behavior/error.

 On 20 March 2015 at 16:04, Yiannis Gkoufas johngou...@gmail.com wrote:

 Hi Yin,

 the way I set the configuration is:

 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  sqlContext.setConf("spark.sql.shuffle.partitions", "1000");

 it is the correct way right?
 In the mapPartitions task (the first task which is launched), I get
 again the same number of tasks and again the same error. :(

 Thanks a lot!

 On 19 March 2015 at 17:40, Yiannis Gkoufas johngou...@gmail.com wrote:

 Hi Yin,

 thanks a lot for that! Will give it a shot and let you know.

 On 19 March 2015 at 16:30, Yin Huai yh...@databricks.com wrote:

 Was the OOM thrown during the execution of first stage (map) or the
 second stage (reduce)? If it was the second stage, can you increase the
 value of spark.sql.shuffle.partitions and see if the OOM disappears?

 This setting controls the number of reduces Spark SQL will use and the
 default is 200. Maybe there are too many distinct values and the memory
 pressure on every task (of those 200 reducers) is pretty high. You can
 start with 400 and increase it until the OOM disappears. Hopefully this
 will help.

 Thanks,

 Yin


 On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas johngou...@gmail.com
  wrote:

 Hi Yin,

 Thanks for your feedback. I have 1700 parquet files, sized 100MB
 each. The number of tasks launched is equal to the number of parquet 
 files.
 Do you have any idea on how to deal with this situation?

 Thanks a lot
 On 18 Mar 2015 17:35, Yin Huai yh...@databricks.com wrote:

 Seems there are too many distinct groups processed in a task, which
 trigger the problem.

 How many files do your dataset have and how large is a file? Seems
 your query will be executed with two stages, table scan and map-side
 aggregation in the first stage and the final round of reduce-side
 aggregation in the second stage. Can you take a look at the numbers of
 tasks launched in these two stages?

 Thanks,

 Yin

 On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas 
 johngou...@gmail.com wrote:

 Hi there, I set the executor memory to 8g but it didn't help

 On 18 March 2015 at 13:59, Cheng Lian lian.cs@gmail.com
 wrote:

 You should probably increase executor memory by setting
 spark.executor.memory.

 Full list of available configurations can be found here
 http://spark.apache.org/docs/latest/configuration.html

 Cheng


 On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:

 Hi there,

 I was trying the new DataFrame API with some basic operations on
 a parquet dataset.
 I have 7 nodes of 12 cores and 8GB RAM allocated to each worker
 in a standalone cluster mode.
 The code is the following:

  val people = sqlContext.parquetFile("/data.parquet");
  val res = people.groupBy("name", "date").
  agg(sum("power"), sum("supply")).take(10);
 System.out.println(res);

 The dataset consists of 16 billion entries.
 The error I get is java.lang.OutOfMemoryError: GC overhead limit
 exceeded

 My configuration is:

  spark.serializer org.apache.spark.serializer.KryoSerializer
  spark.driver.memory 6g
  spark.executor.extraJavaOptions -XX:+UseCompressedOops
  spark.shuffle.manager sort

 Any idea how can I workaround this?

 Thanks a lot












Re: Spark error NoClassDefFoundError: org/apache/hadoop/mapred/InputSplit

2015-03-23 Thread Ted Yu
InputSplit is in hadoop-mapreduce-client-core jar

Please check that the jar is in your classpath.

Cheers

On Mon, Mar 23, 2015 at 8:10 AM, , Roy rp...@njit.edu wrote:

 Hi,


   I am using CDH 5.3.2 packages installation through Cloudera Manager 5.3.2

 I am trying to run one spark job with following command

 PYTHONPATH=~/code/utils/ spark-submit --master yarn --executor-memory 3G
 --num-executors 30 --driver-memory 2G --executor-cores 2 --name=analytics
 /home/abc/code/updb/spark/UPDB3analytics.py -date 2015-03-01

 but I am getting following error

 15/03/23 11:06:49 WARN TaskSetManager: Lost task 9.0 in stage 0.0 (TID 7,
 hdp003.dev.xyz.com): java.lang.NoClassDefFoundError:
 org/apache/hadoop/mapred/InputSplit
 at java.lang.Class.getDeclaredConstructors0(Native Method)
 at java.lang.Class.privateGetDeclaredConstructors(Class.java:2532)
 at java.lang.Class.getDeclaredConstructors(Class.java:1901)
 at
 java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1749)
 at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:72)
 at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:250)
 at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:248)
 at java.security.AccessController.doPrivileged(Native Method)
 at
 java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:247)
 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:613)
 at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
 at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
 at
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.lang.ClassNotFoundException:
 org.apache.hadoop.mapred.InputSplit
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 ... 25 more

 here is the full trace

 https://gist.github.com/anonymous/3492f0ec63d7a23c47cf




Re: PySpark, ResultIterable and taking a list and saving it into different parquet files

2015-03-23 Thread chuwiey
In case anyone wants to learn about my solution for this:
groupByKey is highly inefficient due to the swapping of elements between the
different partitions, as well as requiring enough memory in each worker to
handle the elements for each group. So instead of using groupByKey, I ended
up taking the flatMap result and using subtractByKey in such a way that I
ended up with multiple RDDs, each only including the key I wanted; now I can
iterate over each RDD independently and end up with multiple parquets.

Thinking of submitting a splitByKeys() pull request, that would take an
array of keys and an rdd, and return an array of rdds each with only one of
the keys. Any thoughts around this?
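
For illustration, a rough Scala sketch of that idea (the thread is PySpark, but the
shape is the same); caching the parent avoids re-reading the source once per key:

  import org.apache.spark.rdd.RDD

  def splitByKeys[K, V](rdd: RDD[(K, V)], keys: Seq[K]): Map[K, RDD[(K, V)]] = {
    rdd.cache()  // each filter below is lazy; cache so the source is scanned only once
    keys.map(k => k -> rdd.filter { case (key, _) => key == k }).toMap
  }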

Thanks






Re: Is yarn-standalone mode deprecated?

2015-03-23 Thread Sandy Ryza
The former is deprecated.  However, the latter is functionally equivalent
to it.  Both launch an app in what is now called yarn-cluster mode.

Oozie now also has a native Spark action, though I'm not familiar on the
specifics.

-Sandy

On Mon, Mar 23, 2015 at 1:01 PM, Nitin kak nitinkak...@gmail.com wrote:

 To be more clear, I am talking about

 SPARK_JAR=SPARK_ASSEMBLY_JAR_FILE ./bin/spark-class 
 org.apache.spark.deploy.yarn.Client \
   --jar YOUR_APP_JAR_FILE \
   --class APP_MAIN_CLASS \
   --args APP_MAIN_ARGUMENTS \
   --num-workers NUMBER_OF_WORKER_MACHINES \
   --master-class ApplicationMaster_CLASS
   --master-memory MEMORY_FOR_MASTER \
   --worker-memory MEMORY_PER_WORKER \
   --worker-cores CORES_PER_WORKER \
   --name application_name \
   --queue queue_name \
   --addJars any_local_files_used_in_SparkContext.addJar \
   --files files_for_distributed_cache \
   --archives archives_for_distributed_cache

 which I thought was the yarn-standalone mode

 vs

 spark-submit

 ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
 --master yarn-cluster \
 --num-executors 3 \
 --driver-memory 4g \
 --executor-memory 2g \
 --executor-cores 1 \
 --queue thequeue \
 lib/spark-examples*.jar


 I didnt see example of ./bin/spark-class in 1.2.0 documentation, so am
 wondering if that is deprecated.





 On Mon, Mar 23, 2015 at 12:11 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 The mode is not deprecated, but the name yarn-standalone is now
 deprecated.  It's now referred to as yarn-cluster.

 -Sandy

 On Mon, Mar 23, 2015 at 11:49 AM, nitinkak001 nitinkak...@gmail.com
 wrote:

 Is yarn-standalone mode deprecated in Spark now. The reason I am asking
 is
 because while I can find it in 0.9.0
 documentation(https://spark.apache.org/docs/0.9.0/running-on-yarn.html).
 I
 am not able to find it in 1.2.0.

 I am using this mode to run the Spark jobs from Oozie as a java action.
 Removing this mode will prevent me from doing that. Are there any other
 ways
 of running a Spark job from Oozie other than Shell action?









Getting around Serializability issues for types not in my control

2015-03-23 Thread adelbertc
Hey all,

I'd like to use the Scalaz library in some of my Spark jobs, but am running
into issues where some stuff I use from Scalaz is not serializable. For
instance, in Scalaz there is a trait

/** In Scalaz */
trait Applicative[F[_]] {
  def apply2[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C]
  def point[A](a: => A): F[A]
}

But when I try to use it in say, in an `RDD#aggregate` call I get:


Caused by: java.io.NotSerializableException:
scalaz.std.OptionInstances$$anon$1
Serialization stack:
- object not serializable (class: scalaz.std.OptionInstances$$anon$1,
value: scalaz.std.OptionInstances$$anon$1@4516ee8c)
- field (class: dielectric.syntax.RDDOps$$anonfun$1, name: G$1, type:
interface scalaz.Applicative)
- object (class dielectric.syntax.RDDOps$$anonfun$1, function2)
- field (class: dielectric.syntax.RDDOps$$anonfun$traverse$extension$1,
name: apConcat$1, type: interface scala.Function2)
- object (class dielectric.syntax.RDDOps$$anonfun$traverse$extension$1,
function2)

Outside of submitting a PR to Scalaz to make things Serializable, what can I
do to make things Serializable? I considered something like

implicit def applicativeSerializable[F[_]](implicit F: Applicative[F]):
SomeSerializableType[F] =
  new SomeSerializableType { ... } ??

Not sure how to go about doing it - I looked at java.io.Externalizable but
given `scalaz.Applicative` has no value members I'm not sure how to
implement the interface.

Any guidance would be much appreciated - thanks!
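
For illustration, one hedged sketch of the wrapper idea above: close over a
serializable factory rather than the instance itself, and rebuild the instance
lazily on each executor. This is not Scalaz API; the names are illustrative:

  import scalaz.Applicative

  class SerializableApplicative[F[_]](make: () => Applicative[F]) extends Serializable {
    // not serialized; re-created after deserialization on each executor
    @transient lazy val F: Applicative[F] = make()
  }

  // usage sketch: pass the wrapper into the RDD operation instead of the raw instance,
  // e.g. new SerializableApplicative[Option](() => scalaz.std.option.optionInstance)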






Re: JDBC DF using DB2

2015-03-23 Thread Ted Yu
bq. is to modify compute_classpath.sh on all worker nodes to include your
driver JARs.

Please follow the above advice.

Cheers

On Mon, Mar 23, 2015 at 12:34 PM, Jack Arenas j...@ckarenas.com wrote:

 Hi Team,



 I’m trying to create a DF using jdbc as detailed here
 https://spark.apache.org/docs/1.3.0/sql-programming-guide.html#jdbc-to-other-databases
  –
 I’m currently using DB2 v9.7.0.6 and I’ve tried to use the db2jcc.jar and
 db2jcc_license_cu.jar combo, and while it works in --master local using the
 command below, I get some strange behavior in --master yarn-client. Here is
 the command:



 *val df = sql.load("jdbc", Map("url" ->
 "jdbc:db2://host:port/db:currentSchema=schema;user=user;password=password;",
 "driver" -> "com.ibm.db2.jcc.DB2Driver", "dbtable" -> "table"))*



 It seems to also be working on yarn-client because once executed I get the
 following log:

 *df: org.apache.spark.sql.DataFrame = [DATE_FIELD: date, INT_FIELD: int,
 DOUBLE_FIELD: double]*



 Which indicates me that Spark was able to connect to the DB. But once I
 run *df.count() *or *df.take(5).foreach(println)* in order to operate on
 the data and get a result, I get back a ‘*No suitable driver found*’
 exception, which makes me think the driver wasn’t shipped with the spark
 job.



 I’ve tried using *--driver-class-path, --jars, SPARK_CLASSPATH* to add
 the jars to the spark job. I also have the jars in my*$CLASSPATH* and
 *$HADOOP_CLASSPATH*.



 I also saw this in the trouble shooting section, but quite frankly I’m not
 sure what primordial class loader it’s talking about:



 The JDBC driver class must be visible to the primordial class loader on
 the client session and on all executors. This is because Java’s
 DriverManager class does a security check that results in it ignoring all
 drivers not visible to the primordial class loader when one goes to open a
 connection. One convenient way to do this is to modify compute_classpath.sh
 on all worker nodes to include your driver JARs.


 Any advice is welcome!



 Thanks,

 Jack









Re: Spark per app logging

2015-03-23 Thread Udit Mehta
Yes, each application can use its own log4j.properties, but I am not sure how
to configure log4j so that the driver and executors write to a file. This is
because if we set spark.executor.extraJavaOptions it will read the configuration
from a file, and that is not what I need.
How do I configure log4j from the app so that the driver and the executors
use these configs?
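
To make it concrete, this is roughly what I have in mind on the driver side (a
sketch only, against log4j 1.x, which Spark bundles; the path and pattern are just
examples), but I don't know how to get the executors to pick up the same setup:

import org.apache.log4j.{FileAppender, Level, Logger, PatternLayout}

// attach a per-application file appender to the root logger at startup
val appName = "my-app"   // placeholder
val layout = new PatternLayout("%d{ISO8601} %p %c: %m%n")
val appender = new FileAppender(layout, s"/var/log/spark-apps/$appName.log", true)
Logger.getRootLogger.addAppender(appender)
Logger.getRootLogger.setLevel(Level.INFO)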

Thanks,
Udit

On Sat, Mar 21, 2015 at 3:13 AM, Jeffrey Jedele jeffrey.jed...@gmail.com
wrote:

 Hi,
 I'm not completely sure about this either, but this is what we are doing
 currently:
 Configure your logging to write to STDOUT, not to a file explicitly.
 Spark will capture stdout and stderr and separate the messages into an
 app/driver folder structure in the configured worker directory.

 We then use logstash to collect the logs and index them into an Elasticsearch
 cluster (Spark seems to produce a lot of logging data). With some simple
 regex processing, you also get the application id as searchable field.

 Regards,
 Jeff

 2015-03-20 22:37 GMT+01:00 Ted Yu yuzhih...@gmail.com:

 Are these jobs the same jobs, just run by different users or, different
 jobs ?
 If the latter, can each application use its own log4j.properties ?

 Cheers

 On Fri, Mar 20, 2015 at 1:43 PM, Udit Mehta ume...@groupon.com wrote:

 Hi,

 We have spark setup such that there are various users running multiple
 jobs at the same time. Currently all the logs go to 1 file specified in the
 log4j.properties.
 Is it possible to configure log4j in spark for per app/user logging
 instead of sending all logs to 1 file mentioned in the log4j.properties?

 Thanks
 Udit






Re: Spark streaming alerting

2015-03-23 Thread Mohit Anchlia
I think I didn't explain myself properly :) What I meant to say was that
generally the Spark workers run either on HDFS's data nodes or on Cassandra
nodes, which are typically in a private (protected) network. When a
condition is matched it's difficult to send out the alerts directly from
the worker nodes because of the security concerns. I was wondering if there
is a way to listen to the events as they occur on the sliding window scale,
or whether the best way to accomplish this is to post back to a queue?
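
To make it concrete, something along these lines is what I am picturing (a rough
Scala sketch; postToQueue() is a placeholder for whatever client our internal
queue would expose, and lines stands in for the input DStream):

lines.filter(_.contains("ERROR")).foreachRDD { rdd =>
  val errors = rdd.count()
  // the body of foreachRDD (outside of the RDD operations) runs on the driver,
  // so the outbound call would not have to originate from the worker nodes
  if (errors > 0) postToQueue(s"errors in last window: $errors")
}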

On Mon, Mar 23, 2015 at 2:22 AM, Khanderao Kand Gmail 
khanderao.k...@gmail.com wrote:

 Akhil

 You are right in your answer to what Mohit wrote. However, what Mohit seems
 to be alluding to, but did not write properly, might be different.

 Mohit,

 You are wrong in saying that streaming generally works on HDFS and Cassandra.
 Streaming typically works with a streaming or queuing source like Kafka,
 Kinesis, Twitter, Flume, ZeroMQ, etc. (but can also read from HDFS and S3).
 However, the streaming context (the receiver within the streaming context)
 gets events/messages/records and forms a time-window-based batch (an RDD).

 So there is a maximum gap of the window time between when the alert message
 was available to Spark and when the processing happens. I think this is what you meant.

 As per the Spark programming model, the RDD is the right way to deal with data.
 If you are fine with the minimum delay of, say, a second (based on the minimum time
 window that DStreams can support) then what Rohit gave is the right model.

 Khanderao

 On Mar 22, 2015, at 11:39 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 What do you mean you can't send it directly from spark workers? Here's a
 simple approach which you could do:

 val data = ssc.textFileStream("sigmoid/")
 val dist = data.filter(_.contains("ERROR")).foreachRDD(rdd =>
 alert("Errors : " + rdd.count()))

 And the alert() function could be anything triggering an email or sending
 an SMS alert.

 Thanks
 Best Regards

 On Sun, Mar 22, 2015 at 1:52 AM, Mohit Anchlia mohitanch...@gmail.com
 wrote:

 Is there a module in spark streaming that lets you listen to
 the alerts/conditions as they happen in the streaming module? Generally
 spark streaming components will execute on large set of clusters like hdfs
 or Cassandra, however when it comes to alerting you generally can't send it
 directly from the spark workers, which means you need a way to listen to
 the alerts.





Spark-thriftserver Issue

2015-03-23 Thread Neil Dev
Hi,

I am having an issue starting spark-thriftserver. I'm running Spark 1.3 with
Hadoop 2.4.0. I would like to be able to change its port too, so I can have the
hive-thriftserver as well as the spark-thriftserver running at the same time.

Starting sparkthrift server:-
sudo ./start-thriftserver.sh --master spark://ip-172-31-10-124:7077
--executor-memory 2G

Error:-
I created the folder manually but am still getting the following error:
Exception in thread "main" java.lang.IllegalArgumentException: Log
directory /tmp/spark-events does not exist.


I am getting the following error
15/03/23 15:07:02 ERROR thrift.ThriftCLIService: Error:
org.apache.thrift.transport.TTransportException: Could not create
ServerSocket on address0.0.0.0/0.0.0.0:1.
at
org.apache.thrift.transport.TServerSocket.<init>(TServerSocket.java:93)
at
org.apache.thrift.transport.TServerSocket.<init>(TServerSocket.java:79)
at
org.apache.hive.service.auth.HiveAuthFactory.getServerSocket(HiveAuthFactory.java:236)
at
org.apache.hive.service.cli.thrift.ThriftBinaryCLIService.run(ThriftBinaryCLIService.java:69)
at java.lang.Thread.run(Thread.java:745)

Thanks
Neil


Re: newbie quesiton - spark with mesos

2015-03-23 Thread Dean Wampler
That's a very old page, try this instead:

http://spark.apache.org/docs/latest/running-on-mesos.html

When you run your Spark job on Mesos, tasks will be started on the slave
nodes as needed, since fine-grained mode is the default.

For a job like your example, very few tasks will be needed. Actually only
one would be enough, but the default number of partitions will be used. I
believe 8 is the default for Mesos. For local mode (local[*]), it's the
number of cores. You can also set the property spark.default.parallelism.
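
For example (values here are arbitrary):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("my-app")
  .set("spark.default.parallelism", "16")   // used when no partition count is given
val sc = new SparkContext(conf)

// or override it per call:
val distData = sc.parallelize(1 to 10000, 4)   // 4 partitions for this RDD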

HTH,

Dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Mar 23, 2015 at 11:46 AM, Anirudha Jadhav aniru...@nyu.edu wrote:

 i have a mesos cluster, which i deploy spark to by using instructions on
 http://spark.apache.org/docs/0.7.2/running-on-mesos.html

 after that the spark shell starts up fine.
 then i try the following on the shell:

 val data = 1 to 1

 val distData = sc.parallelize(data)

 distData.filter(_ 10).collect()

 open spark web ui at host:4040 and see an active job.

 NOW, how do i start workers or spark workers on mesos ? who completes my
 job?
 thanks,

 --
 Ani



Re: Getting around Serializability issues for types not in my control

2015-03-23 Thread Cody Koeninger
Have you tried instantiating the instance inside the closure, rather than
outside of it?

If that works, you may need to switch to use mapPartition /
foreachPartition for efficiency reasons.
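
Roughly what I mean, sketched from memory against scalaz 7 (rdd stands in for
whatever RDD you are working with; adjust the imports to the instance you
actually need):

import scalaz.Applicative
import scalaz.std.option._   // brings the Option instances into implicit scope

val result = rdd.mapPartitions { iter =>
  // resolve the non-serializable type class instance here, on the executor,
  // so it never has to be shipped inside the closure from the driver
  val G: Applicative[Option] = Applicative[Option]
  iter.map(b => G.point(b))
}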


On Mon, Mar 23, 2015 at 3:03 PM, Adelbert Chang adelbe...@gmail.com wrote:

 Is there no way to pull out the bits of the instance I want before I sent
 it through the closure for aggregate? I did try pulling things out, along
 the lines of

 def foo[G[_], B](blah: Blah)(implicit G: Applicative[G]) = {
   val lift: B => G[RDD[B]] = b =>
 G.point(sparkContext.parallelize(List(b)))

   rdd.aggregate(/* use lift in here */)
 }

 But that doesn't seem to work either, still seems to be trying to
 serialize the Applicative... :(

 On Mon, Mar 23, 2015 at 12:27 PM, Dean Wampler deanwamp...@gmail.com
 wrote:

 Well, it's complaining about trait OptionInstances which is defined in
 Option.scala in the std package. Use scalap or javap on the scalaz library
 to find out which member of the trait is the problem, but since it says
 $$anon$1, I suspect it's the first value member, implicit val
 optionInstance, which has a long list of mixin traits, one of which is
 probably at fault. OptionInstances is huge, so there might be other
 offenders.

 Scalaz wasn't designed for distributed systems like this, so you'll
 probably find many examples of nonserializability. An alternative is to
 avoid using Scalaz in any closures passed to Spark methods, but that's
 probably not what you want.

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Mon, Mar 23, 2015 at 12:03 PM, adelbertc adelbe...@gmail.com wrote:

 Hey all,

 I'd like to use the Scalaz library in some of my Spark jobs, but am
 running
 into issues where some stuff I use from Scalaz is not serializable. For
 instance, in Scalaz there is a trait

 /** In Scalaz */
 trait Applicative[F[_]] {
   def apply2[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C]
   def point[A](a: => A): F[A]
 }

 But when I try to use it in say, in an `RDD#aggregate` call I get:


 Caused by: java.io.NotSerializableException:
 scalaz.std.OptionInstances$$anon$1
 Serialization stack:
 - object not serializable (class:
 scalaz.std.OptionInstances$$anon$1,
 value: scalaz.std.OptionInstances$$anon$1@4516ee8c)
 - field (class: dielectric.syntax.RDDOps$$anonfun$1, name: G$1,
 type:
 interface scalaz.Applicative)
 - object (class dielectric.syntax.RDDOps$$anonfun$1, function2)
 - field (class:
 dielectric.syntax.RDDOps$$anonfun$traverse$extension$1,
 name: apConcat$1, type: interface scala.Function2)
 - object (class
 dielectric.syntax.RDDOps$$anonfun$traverse$extension$1,
 function2)

 Outside of submitting a PR to Scalaz to make things Serializable, what
 can I
 do to make things Serializable? I considered something like

 implicit def applicativeSerializable[F[_]](implicit F: Applicative[F]):
 SomeSerializableType[F] =
   new SomeSerializableType { ... } ??

 Not sure how to go about doing it - I looked at java.io.Externalizable
 but
 given `scalaz.Applicative` has no value members I'm not sure how to
 implement the interface.

 Any guidance would be much appreciated - thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Getting-around-Serializability-issues-for-types-not-in-my-control-tp22193.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





 --
 Adelbert (Allen) Chang



Re: Strange behavior with PySpark when using Join() and zip()

2015-03-23 Thread Sean Owen
I think this is a bad example since testData is not deterministic at
all. I thought we had fixed this or similar examples in the past? As
in https://github.com/apache/spark/pull/1250/files

Hm, anyone see a reason that shouldn't be changed too?
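
(If I remember right, the Scala version of that example pairs label and
prediction in a single map, roughly like this, which sidesteps the zip ordering
question entirely:

val labelsAndPredictions = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}

Doing the pairing in one pass over one RDD means nothing depends on two lineages
being evaluated in the same order.)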

On Mon, Mar 23, 2015 at 7:00 PM, Ofer Mendelevitch
omendelevi...@hortonworks.com wrote:
 Thanks Sean,

 Sorting definitely solves it, but I was hoping it could be avoided :)

 In the documentation for Classification in ML-Lib for example, zip() is used
 to create labelsAndPredictions:

 from pyspark.mllib.tree import RandomForest
 from pyspark.mllib.util import MLUtils

 # Load and parse the data file into an RDD of LabeledPoint.
 data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
 # Split the data into training and test sets (30% held out for testing)
 (trainingData, testData) = data.randomSplit([0.7, 0.3])

 # Train a RandomForest model.
 #  Empty categoricalFeaturesInfo indicates all features are continuous.
 #  Note: Use larger numTrees in practice.
 #  Setting featureSubsetStrategy=auto lets the algorithm choose.
 model = RandomForest.trainClassifier(trainingData, numClasses=2,
 categoricalFeaturesInfo={},
  numTrees=3,
 featureSubsetStrategy=auto,
  impurity='gini', maxDepth=4,
 maxBins=32)

 # Evaluate model on test instances and compute test error
 predictions = model.predict(testData.map(lambda x: x.features))
 labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
 testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() /
 float(testData.count())
 print('Test Error = ' + str(testErr))
 print('Learned classification forest model:')
 print(model.toDebugString())


 The reason the zip() works here is because the RDD is loaded from a file.
 If it was generated with something that includes a JOIN() it won’t work due
 to this same issue.

 Maybe worth mentioning in the docs then?

 Ofer



 On Mar 23, 2015, at 11:40 AM, Sean Owen so...@cloudera.com wrote:

 I think the explanation is that the join does not guarantee any order,
 since it causes a shuffle in general, and it is computed twice in the
 first example, resulting in a difference for d1 and d2.

 You can persist() the result of the join and in practice I believe
 you'd find it behaves as expected, although that is even not 100%
 guaranteed since a block could be lost and recomputed (differently).

 If order matters, and it does for zip(), then the reliable way to
 guarantee a well defined ordering for zipping is to sort the RDDs.

 On Mon, Mar 23, 2015 at 6:27 PM, Ofer Mendelevitch
 omendelevi...@hortonworks.com wrote:

 Hi,

 I am running into a strange issue when doing a JOIN of two RDDs followed by
 ZIP from PySpark.
 It’s part of a more complex application, but was able to narrow it down to a
 simplified example that’s easy to replicate and causes the same problem to
 appear:


 raw = sc.parallelize([('k'+str(x),'v'+str(x)) for x in range(100)])
 data = raw.join(raw).mapValues(lambda x: [x[0]]+[x[1]]).map(lambda pair:
 ','.join([x for x in pair[1]]))
 d1 = data.map(lambda s: s.split(',')[0])
 d2 = data.map(lambda s: s.split(',')[1])
 x = d1.zip(d2)

 print x.take(10)


 The output is:


 [('v44', 'v80'), ('v79', 'v44'), ('v80', 'v79'), ('v45', 'v78'), ('v81',
 'v81'), ('v78', 'v45'), ('v99', 'v99'), ('v82', 'v82'), ('v46', 'v46'),
 ('v83', 'v83')]


 As you can see, the ordering of items is not preserved anymore in all cases.
 (e.g., ‘v81’ is preserved, and ‘v45’ is not)
 Is it not supposed to be preserved?

 If I do the same thing without the JOIN:

 data = sc.parallelize('v'+str(x)+',v'+str(x) for x in range(100))
 d1 = data.map(lambda s: s.split(',')[0])
 d2 = data.map(lambda s: s.split(',')[1])
 x = d1.zip(d2)

 print x.take(10)

 The output is:


 [('v0', 'v0'), ('v1', 'v1'), ('v2', 'v2'), ('v3', 'v3'), ('v4', 'v4'),
 ('v5', 'v5'), ('v6', 'v6'), ('v7', 'v7'), ('v8', 'v8'), ('v9', 'v9')]


 As expected.

 Anyone run into this or a similar issue?

 Ofer



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Strange behavior with PySpark when using Join() and zip()

2015-03-23 Thread Ofer Mendelevitch
Thanks Sean,

Sorting definitely solves it, but I was hoping it could be avoided :)

In the documentation for Classification in ML-Lib for example, zip() is used to 
create labelsAndPredictions:

-
from pyspark.mllib.tree import RandomForest
from pyspark.mllib.util import MLUtils

# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
#  Empty categoricalFeaturesInfo indicates all features are continuous.
#  Note: Use larger numTrees in practice.
#  Setting featureSubsetStrategy="auto" lets the algorithm choose.
model = RandomForest.trainClassifier(trainingData, numClasses=2, 
categoricalFeaturesInfo={}, numTrees=3, featureSubsetStrategy="auto", 
impurity='gini', maxDepth=4, maxBins=32)

# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / 
float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification forest model:')
print(model.toDebugString())



The reason the zip() works here is because the RDD is loaded from a file.
If it was generated with something that includes a JOIN() it won’t work due to 
this same issue.

Maybe worth mentioning in the docs then?

Ofer



 On Mar 23, 2015, at 11:40 AM, Sean Owen so...@cloudera.com wrote:
 
 I think the explanation is that the join does not guarantee any order,
 since it causes a shuffle in general, and it is computed twice in the
 first example, resulting in a difference for d1 and d2.
 
 You can persist() the result of the join and in practice I believe
 you'd find it behaves as expected, although that is even not 100%
 guaranteed since a block could be lost and recomputed (differently).
 
 If order matters, and it does for zip(), then the reliable way to
 guarantee a well defined ordering for zipping is to sort the RDDs.
 
 On Mon, Mar 23, 2015 at 6:27 PM, Ofer Mendelevitch
 omendelevi...@hortonworks.com wrote:
 Hi,
 
 I am running into a strange issue when doing a JOIN of two RDDs followed by
 ZIP from PySpark.
 It’s part of a more complex application, but was able to narrow it down to a
 simplified example that’s easy to replicate and causes the same problem to
 appear:
 
 
 raw = sc.parallelize([('k'+str(x),'v'+str(x)) for x in range(100)])
 data = raw.join(raw).mapValues(lambda x: [x[0]]+[x[1]]).map(lambda pair:
 ','.join([x for x in pair[1]]))
 d1 = data.map(lambda s: s.split(',')[0])
 d2 = data.map(lambda s: s.split(',')[1])
 x = d1.zip(d2)
 
 print x.take(10)
 
 
 The output is:
 
 
 [('v44', 'v80'), ('v79', 'v44'), ('v80', 'v79'), ('v45', 'v78'), ('v81',
 'v81'), ('v78', 'v45'), ('v99', 'v99'), ('v82', 'v82'), ('v46', 'v46'),
 ('v83', 'v83')]
 
 
 As you can see, the ordering of items is not preserved anymore in all cases.
 (e.g., ‘v81’ is preserved, and ‘v45’ is not)
 Is it not supposed to be preserved?
 
 If I do the same thing without the JOIN:
 
 data = sc.parallelize('v'+str(x)+',v'+str(x) for x in range(100))
 d1 = data.map(lambda s: s.split(',')[0])
 d2 = data.map(lambda s: s.split(',')[1])
 x = d1.zip(d2)
 
 print x.take(10)
 
 The output is:
 
 
 [('v0', 'v0'), ('v1', 'v1'), ('v2', 'v2'), ('v3', 'v3'), ('v4', 'v4'),
 ('v5', 'v5'), ('v6', 'v6'), ('v7', 'v7'), ('v8', 'v8'), ('v9', 'v9')]
 
 
 As expected.
 
 Anyone run into this or a similar issue?
 
 Ofer



SparkEnv

2015-03-23 Thread Koert Kuipers
is it safe to access SparkEnv.get inside say mapPartitions?
i need to get a Serializer (so SparkEnv.get.serializer)
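
concretely, something like this is what i have in mind (just a sketch; rdd here
is an RDD[String] and the body is made up):

import org.apache.spark.SparkEnv

val sizes = rdd.mapPartitions { iter =>
  // grab the configured serializer on the executor, once per partition
  val ser = SparkEnv.get.serializer.newInstance()
  iter.map { s =>
    val bytes = ser.serialize(s)
    bytes.remaining()   // e.g. just report the serialized size
  }
}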

thanks


Re: Getting around Serializability issues for types not in my control

2015-03-23 Thread Dean Wampler
Well, it's complaining about trait OptionInstances which is defined in
Option.scala in the std package. Use scalap or javap on the scalaz library
to find out which member of the trait is the problem, but since it says
$$anon$1, I suspect it's the first value member, implicit val
optionInstance, which has a long list of mixin traits, one of which is
probably at fault. OptionInstances is huge, so there might be other
offenders.

Scalaz wasn't designed for distributed systems like this, so you'll
probably find many examples of nonserializability. An alternative is to
avoid using Scalaz in any closures passed to Spark methods, but that's
probably not what you want.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Mar 23, 2015 at 12:03 PM, adelbertc adelbe...@gmail.com wrote:

 Hey all,

 I'd like to use the Scalaz library in some of my Spark jobs, but am running
 into issues where some stuff I use from Scalaz is not serializable. For
 instance, in Scalaz there is a trait

 /** In Scalaz */
 trait Applicative[F[_]] {
   def apply2[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C]
   def point[A](a: => A): F[A]
 }

 But when I try to use it in say, in an `RDD#aggregate` call I get:


 Caused by: java.io.NotSerializableException:
 scalaz.std.OptionInstances$$anon$1
 Serialization stack:
 - object not serializable (class:
 scalaz.std.OptionInstances$$anon$1,
 value: scalaz.std.OptionInstances$$anon$1@4516ee8c)
 - field (class: dielectric.syntax.RDDOps$$anonfun$1, name: G$1,
 type:
 interface scalaz.Applicative)
 - object (class dielectric.syntax.RDDOps$$anonfun$1, function2)
 - field (class:
 dielectric.syntax.RDDOps$$anonfun$traverse$extension$1,
 name: apConcat$1, type: interface scala.Function2)
 - object (class
 dielectric.syntax.RDDOps$$anonfun$traverse$extension$1,
 function2)

 Outside of submitting a PR to Scalaz to make things Serializable, what can
 I
 do to make things Serializable? I considered something like

 implicit def applicativeSerializable[F[_]](implicit F: Applicative[F]):
 SomeSerializableType[F] =
   new SomeSerializableType { ... } ??

 Not sure how to go about doing it - I looked at java.io.Externalizable but
 given `scalaz.Applicative` has no value members I'm not sure how to
 implement the interface.

 Any guidance would be much appreciated - thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Getting-around-Serializability-issues-for-types-not-in-my-control-tp22193.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




JDBC DF using DB2

2015-03-23 Thread Jack Arenas
Hi Team, 
 
I’m trying to create a DF using jdbc as detailed here – I’m currently using DB2 
v9.7.0.6 and I’ve tried to use the db2jcc.jar and db2jcc_license_cu.jar combo, 
and while it works in --master local using the command below, I get some 
strange behavior in --master yarn-client. Here is the command:
 
val df = sql.load("jdbc", Map("url" -> 
"jdbc:db2://host:port/db:currentSchema=schema;user=user;password=password;",
 "driver" -> "com.ibm.db2.jcc.DB2Driver", "dbtable" -> "table"))
 
It seems to also be working on yarn-client because once executed I get the 
following log:
df: org.apache.spark.sql.DataFrame = [DATE_FIELD: date, INT_FIELD: int, 
DOUBLE_FIELD: double]
 
Which indicates to me that Spark was able to connect to the DB. But once I run 
df.count() or df.take(5).foreach(println) in order to operate on the data and 
get a result, I get back a ‘No suitable driver found’ exception, which makes me 
think the driver wasn’t shipped with the spark job.
 
I’ve tried using --driver-class-path, --jars, SPARK_CLASSPATH to add the jars 
to the spark job. I also have the jars in my $CLASSPATH and $HADOOP_CLASSPATH.
 
I also saw this in the trouble shooting section, but quite frankly I’m not sure 
what primordial class loader it’s talking about:
 
The JDBC driver class must be visible to the primordial class loader on the 
client session and on all executors. This is because Java’s DriverManager class 
does a security check that results in it ignoring all drivers not visible to 
the primordial class loader when one goes to open a connection. One convenient 
way to do this is to modify compute_classpath.sh on all worker nodes to include 
your driver JARs.

Any advice is welcome!
 
Thanks,
Jack
 
 
 

Re: Use pig load function in spark

2015-03-23 Thread Denny Lee
You may be able to utilize Spork (Pig on Apache Spark) as a mechanism to do
this: https://github.com/sigmoidanalytics/spork


On Mon, Mar 23, 2015 at 2:29 AM Dai, Kevin yun...@ebay.com wrote:

  Hi, all



 Can spark use pig’s load function to load data?



 Best Regards,

 Kevin.



Re: Getting around Serializability issues for types not in my control

2015-03-23 Thread Adelbert Chang
Is there no way to pull out the bits of the instance I want before I send
it through the closure for aggregate? I did try pulling things out, along
the lines of

def foo[G[_], B](blah: Blah)(implicit G: Applicative[G]) = {
  val lift: B => G[RDD[B]] = b => G.point(sparkContext.parallelize(List(b)))

  rdd.aggregate(/* use lift in here */)
}

But that doesn't seem to work either, still seems to be trying to serialize
the Applicative... :(

On Mon, Mar 23, 2015 at 12:27 PM, Dean Wampler deanwamp...@gmail.com
wrote:

 Well, it's complaining about trait OptionInstances which is defined in
 Option.scala in the std package. Use scalap or javap on the scalaz library
 to find out which member of the trait is the problem, but since it says
 $$anon$1, I suspect it's the first value member, implicit val
 optionInstance, which has a long list of mixin traits, one of which is
 probably at fault. OptionInstances is huge, so there might be other
 offenders.

 Scalaz wasn't designed for distributed systems like this, so you'll
 probably find many examples of nonserializability. An alternative is to
 avoid using Scalaz in any closures passed to Spark methods, but that's
 probably not what you want.

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Mon, Mar 23, 2015 at 12:03 PM, adelbertc adelbe...@gmail.com wrote:

 Hey all,

 I'd like to use the Scalaz library in some of my Spark jobs, but am
 running
 into issues where some stuff I use from Scalaz is not serializable. For
 instance, in Scalaz there is a trait

 /** In Scalaz */
 trait Applicative[F[_]] {
   def apply2[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C]
   def point[A](a: => A): F[A]
 }

 But when I try to use it in say, in an `RDD#aggregate` call I get:


 Caused by: java.io.NotSerializableException:
 scalaz.std.OptionInstances$$anon$1
 Serialization stack:
 - object not serializable (class:
 scalaz.std.OptionInstances$$anon$1,
 value: scalaz.std.OptionInstances$$anon$1@4516ee8c)
 - field (class: dielectric.syntax.RDDOps$$anonfun$1, name: G$1,
 type:
 interface scalaz.Applicative)
 - object (class dielectric.syntax.RDDOps$$anonfun$1, function2)
 - field (class:
 dielectric.syntax.RDDOps$$anonfun$traverse$extension$1,
 name: apConcat$1, type: interface scala.Function2)
 - object (class
 dielectric.syntax.RDDOps$$anonfun$traverse$extension$1,
 function2)

 Outside of submitting a PR to Scalaz to make things Serializable, what
 can I
 do to make things Serializable? I considered something like

 implicit def applicativeSerializable[F[_]](implicit F: Applicative[F]):
 SomeSerializableType[F] =
   new SomeSerializableType { ... } ??

 Not sure how to go about doing it - I looked at java.io.Externalizable but
 given `scalaz.Applicative` has no value members I'm not sure how to
 implement the interface.

 Any guidance would be much appreciated - thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Getting-around-Serializability-issues-for-types-not-in-my-control-tp22193.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





-- 
Adelbert (Allen) Chang


objectFile uses only java serializer?

2015-03-23 Thread Koert Kuipers
in the comments on SparkContext.objectFile it says:
It will also be pretty slow if you use the default serializer (Java
serialization)

this suggests the spark.serializer is used, which means i can switch to the
much faster kryo serializer. however when i look at the code it uses
Utils.deserialize, which is always using Java serialization.

did i get that right? and is this desired?
it seems straightforward to switch objectFile to use the serializer as
specified by spark.serializer (although it might bring in new classloader
issues).


Re: Converting SparkSQL query to Scala query

2015-03-23 Thread Dean Wampler
There isn't any automated way. Note that as the DataFrame implementation
improves, it will probably do a better job with query optimization than
hand-rolled Scala code. I don't know if that's true yet, though.

For now, there are a few examples at the beginning of the DataFrame
scaladocs
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame
page
showing typical scenarios. Also, the DataFrameSuite of tests in the Spark
source code has many examples.

If you scan through the list of methods, you'll see that many of them have
the same name as the corresponding SQL keyword.

HTH,

Dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Mar 23, 2015 at 11:42 AM, nishitd nishitde...@yahoo.com wrote:

 I have a complex SparkSQL query of the nature

 select a.a, b.b, c.c from a,b,c where a.x = b.x and b.y = c.y

 How do I convert this efficiently into scala query of

 a.join(b,..,..)

 and so on. Can anyone help me with this? If my question needs more
 clarification, please let me know.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Converting-SparkSQL-query-to-Scala-query-tp22192.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: SchemaRDD/DataFrame result partitioned according to the underlying datasource partitions

2015-03-23 Thread Michael Armbrust
There is not an interface to this at this time, and in general I'm hesitant
to open up interfaces where the user could make a mistake where they think
something is going to improve performance but will actually impact
correctness.  Since, as you say, we are picking the partitioner
automatically in the query planner it's much harder to know if you are
actually going to preserve the expected partitioning.

Additionally, its also a little more complicated when reading in data as
even if you have files that are partitioned correctly, the InputFormat is
free to split those files, violating our assumptions about partitioning.

On Mon, Mar 23, 2015 at 10:22 AM, Stephen Boesch java...@gmail.com wrote:


 Is there a way to take advantage of the underlying datasource partitions
 when generating a DataFrame/SchemaRDD via catalyst?  It seems from the sql
 module that the only options are RangePartitioner and HashPartitioner - and
 further that those are selected automatically by the code .  It was not
 apparent that either the underlying partitioning were translated to the
 partitions presented in the rdd or that a custom partitioner were possible
 to be provided.

 The motivation would be to subsequently use df.map (with
 preservesPartitioning=true) and/or df.mapPartitions (likewise) to perform
 operations that work within the original datasource partitions - thus
 avoiding a shuffle.









Re: How to check that a dataset is sorted after it has been written out?

2015-03-23 Thread Akhil Das
One approach would be to repartition the whole dataset into 1 partition (a costly
operation, but it will give you a single file). Also, you could try
using zipWithIndex before writing it out.
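
Something along these lines (an untested sketch; sortedRdd is your already-sorted
RDD[String] and the output path is just an example), so that the global position
is stored next to each record and can be checked after reading the data back:

val indexed = sortedRdd.zipWithIndex()   // (record, global position)
indexed.map { case (rec, i) => s"$i\t$rec" }
       .saveAsTextFile("hdfs:///tmp/sorted-with-index")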

Thanks
Best Regards

On Sat, Mar 21, 2015 at 4:11 AM, Michael Albert 
m_albert...@yahoo.com.invalid wrote:

 Greetings!

 I sorted a dataset in Spark and then wrote it out in avro/parquet.

 Then I wanted to check that it was sorted.

 It looks like each partition has been sorted, but when reading in, the
 first partition (i.e., as
 seen in the partition index of mapPartitionsWithIndex) is not the same  as
 implied by
 the names of the parquet files (even when the number of partitions is the
 same in the
 rdd which was read as on disk).

 If I take() a few hundred values, they are sorted, but they are *not*
 the same as if I
 explicitly open part-r-0.parquet and take values from that.

 It seems that when opening the rdd, the partitions of the rdd are not in
 the same
 order as implied by the data on disk (i.e., part-r-0.parquet,
 part-r-1.parquet, etc).

 So, how might one read the data so that one maintains the sort order?

 And while on the subject, after the terasort, how did they check that
 the
 data was actually sorted correctly? (or did they :-) ? ).

 Is there any way to read the data back in so as to preserve the sort, or
 do I need to
 zipWithIndex before writing it out, and write the index at that time? (I
 haven't tried the
 latter yet).

 Thanks!
 -Mike




Spark Sql with python udf fail

2015-03-23 Thread lonely Feb
Hi all, I tried to transfer some hive jobs into spark-sql. When i ran a sql
job with a python udf i got an exception:

java.lang.ArrayIndexOutOfBoundsException: 9
at
org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142)
at
org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37)
at
org.apache.spark.sql.catalyst.expressions.EqualTo.eval(predicates.scala:166)
at
org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30)
at
org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:156)
at
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:151)
at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

I suspected there was an odd line in the input file. But the input file is
so large and i could not find any abnormal lines even with several jobs to
check. How can i get the abnormal line here?
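
For now the only thing i can think of is a brute-force scan like the one below
(written in scala, and assuming the input is tab-delimited text with 10 columns,
which may not match your setup), but i hoped there is a better way:

val bad = sc.textFile("hdfs:///path/to/input")   // path is a placeholder
  .filter(_.split("\t", -1).length != 10)        // keep rows with a wrong column count
bad.take(20).foreach(println)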


Re: Spark UI tunneling

2015-03-23 Thread Sergey Gerasimov
Akhil,

that's what I did.

The problem is that the web server probably tried to forward my request to another 
address that is only accessible locally.



 On 23 March 2015, at 11:12, Akhil Das ak...@sigmoidanalytics.com wrote:
 
 Did you try ssh -L 4040:127.0.0.1:4040 user@host
 
 Thanks
 Best Regards
 
 On Mon, Mar 23, 2015 at 1:12 PM, sergunok ser...@gmail.com wrote:
 Is there a way to tunnel the Spark UI?
 
 I tried to tunnel client-node:4040  but my browser was redirected from
 localhost to some cluster locally visible domain name..
 
 Maybe there is some startup option to make the Spark UI fully
 accessible through a single endpoint (address:port)?
 
 Serg.
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-UI-tunneling-tp22184.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


Re: Data/File structure Validation

2015-03-23 Thread Ahmed Nawar
Dear Taotao,

Yes, I tried sparkCSV.


Thanks,
Nawwar


On Mon, Mar 23, 2015 at 12:20 PM, Taotao.Li taotao...@datayes.com wrote:

 can it load successfully if the format is invalid?

 --
 *From: *Ahmed Nawar ahmed.na...@gmail.com
 *To: *user@spark.apache.org
 *Sent: *Monday, March 23, 2015, 4:48:54 PM
 *Subject: *Data/File structure Validation

 Dears,

 Is there any way to validate the CSV, Json ... Files while loading to
 DataFrame.
 I need to ignore corrupted rows.(Rows with not matching with the
 schema).


 Thanks,
 Ahmed Nawwar



 --


 *---*

 *Thanks  Best regards*

 李涛涛 Taotao · Li  |  Fixed Income@Datayes  |  Software Engineer

 地址:上海市浦东新区陆家嘴西路99号万向大厦8楼, 200120
 Address :Wanxiang Towen 8F, Lujiazui West Rd. No.99, Pudong New District,
 Shanghai, 200120

 电话|Phone:021-60216502  手机|Mobile: +86-18202171279




Re: Data/File structure Validation

2015-03-23 Thread Ahmed Nawar
Dear Raunak,

   The source system provided logs with some errors. I need to make sure each
row is in the correct format (the number of columns/attributes and the data types
are correct) and move incorrect rows to a separate list.

Of course i can write my own logic, but i need to make sure there is no direct way.


Thanks,
Nawwar


On Mon, Mar 23, 2015 at 1:14 PM, Raunak Jhawar raunak.jha...@gmail.com
wrote:

 CSV is a structured format and JSON is not (it is semi-structured). It is
 expected for different JSON documents to have differing schemas. What are you
 trying to do here?

 --
 Thanks,
 Raunak Jhawar
 m: 09820890034






 On Mon, Mar 23, 2015 at 2:18 PM, Ahmed Nawar ahmed.na...@gmail.com
 wrote:

 Dears,

 Is there any way to validate the CSV, Json ... Files while loading to
 DataFrame.
 I need to ignore corrupted rows.(Rows with not matching with the
 schema).


 Thanks,
 Ahmed Nawwar





Re: Buffering for Socket streams

2015-03-23 Thread Akhil Das
You can try playing with spark.streaming.blockInterval so that it won't
consume a lot of data; the default value is 200ms.

Thanks
Best Regards

On Fri, Mar 20, 2015 at 8:49 PM, jamborta jambo...@gmail.com wrote:

 Hi all,

 We are designing a workflow where we try to stream local files to a Socket
 streamer, that would clean and process the files and write them to hdfs. We
 have an issue with bigger files when the streamer cannot keep up with the
 data, and runs out of memory.

 What would be the best way to implement an approach where the Socket stream
 receiver would notify the stream not to send more data (stop reading from
 disk too?), just before it might run out of memory?

 thanks,




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Buffering-for-Socket-streams-tp22164.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Cassandra time series + Spark

2015-03-23 Thread Rumph, Frens Jan
Hi,

I'm working on a system which has to deal with time series data. I've been
happy using Cassandra for time series and Spark looks promising as a
computational platform.

I consider chunking time series in Cassandra necessary, e.g. by 3 weeks as
kairosdb does it. This allows an 8 byte chunk start timestamp with 4 byte
offsets for the individual measurements. And it keeps the data below 2x10^9
even at 1000 Hz.

This schema works quite okay when dealing with one time series at a time.
Because the data is partitioned by time series id and chunk of time (e.g.
the three weeks mentioned above), it requires a little client side logic to
retrieve the partitions and glue them together, but this is quite okay.

However, when working with many / all of the time series in a table at
once, e.g. in Spark, the story changes dramatically. Say I want to
compute something as simple as a moving average; I have to deal with data all
over the place. I can't currently think of anything but performing an
aggregateByKey, causing a shuffle every time.

Anyone have experience with combining time series chunking and computation
on all / many time series at once? Any advice?
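
The best I have come up with so far is something like the sketch below: pay for
one shuffle to co-locate and time-order each series, and then compute per-series
aggregates in a plain pass over each partition. The record type and names are
made up, and I am not sure this is the right direction:

import org.apache.spark.Partitioner
import org.apache.spark.SparkContext._   // pair RDD implicits on older Spark versions
import org.apache.spark.rdd.RDD

case class Sample(seriesId: String, ts: Long, value: Double)

// route on the series id only, so a whole series lands in a single partition
class SeriesPartitioner(parts: Int) extends Partitioner {
  def numPartitions: Int = parts
  def getPartition(key: Any): Int = key match {
    case (id: String, _) => (id.hashCode % parts + parts) % parts
    case _ => 0
  }
}

def bySeriesInTimeOrder(samples: RDD[Sample], parts: Int): RDD[((String, Long), Double)] =
  samples
    .map(s => ((s.seriesId, s.ts), s.value))
    .repartitionAndSortWithinPartitions(new SeriesPartitioner(parts))

// downstream, mapPartitions sees each series as one contiguous, time-ordered run,
// so a moving average becomes a single sliding pass without another shuffle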

Cheers,
Frens Jan


log files of failed task

2015-03-23 Thread sergunok
Hi,

I executed a task on Spark in YARN and it failed.
I see just an "executor lost" message from YARNClientScheduler, no further
details.
(I read this error can be connected to the spark.yarn.executor.memoryOverhead
setting and have already played with this param.)

How can I dig deeper into the log files and find the exact reason? How can the
log of a failed task be examined?

Unfortunately I don't have access to the Spark UI; I can only use the command line.

Thanks!

Serg.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/log-files-of-failed-task-tp22183.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Data/File structure Validation

2015-03-23 Thread Taotao.Li
can it load successfully if the format is invalid? 

- Original Message -

From: Ahmed Nawar ahmed.na...@gmail.com 
To: user@spark.apache.org 
Sent: Monday, March 23, 2015, 4:48:54 PM 
Subject: Data/File structure Validation 

Dears, 

Is there any way to validate the CSV, Json ... Files while loading to 
DataFrame. 
I need to ignore corrupted rows.(Rows with not matching with the schema). 


Thanks, 
Ahmed Nawwar 



-- 


--- 

Thanks  Best regards 

李涛涛 Taotao · Li | Fixed Income@Datayes | Software Engineer 

地址:上海市浦东新区陆家嘴西路 99 号万向大厦8 楼, 200120 
Address :Wanxiang Towen 8 F, Lujiazui West Rd. No.99, Pudong New District, 
Shanghai, 200120 

电话 |Phone : 021-60216502 手机 |Mobile: +86-18202171279 



Re: Spark Sql with python udf fail

2015-03-23 Thread Cheng Lian

Could you elaborate on the UDF code?

On 3/23/15 3:43 PM, lonely Feb wrote:
Hi all, I tried to transfer some hive jobs into spark-sql. When i ran 
a sql job with python udf i got a exception:


java.lang.ArrayIndexOutOfBoundsException: 9
at 
org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142)
at 
org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37)
at 
org.apache.spark.sql.catalyst.expressions.EqualTo.eval(predicates.scala:166)
at 
org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30)
at 
org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30)

at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:156)
at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:151)

at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

at org.apache.spark.scheduler.Task.run(Task.scala:56)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:744)

I suspected there was an odd line in the input file. But the input 
file is so large and i could not found any abnormal lines with several 
jobs to check. How can i get the abnormal line here ?



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming alerting

2015-03-23 Thread Jeffrey Jedele
What exactly do you mean by alerts?

Something specific to your data, or general events of the Spark cluster? For
the first, something like Akhil suggested should work. For the latter, I would
suggest having a log consolidation system like logstash in place and using
this to generate alerts.

Regards,
Jeff

2015-03-23 7:39 GMT+01:00 Akhil Das ak...@sigmoidanalytics.com:

 What do you mean you can't send it directly from spark workers? Here's a
 simple approach which you could do:

 val data = ssc.textFileStream("sigmoid/")
 val dist = data.filter(_.contains("ERROR")).foreachRDD(rdd =>
 alert("Errors : " + rdd.count()))

 And the alert() function could be anything triggering an email or sending
 an SMS alert.

 Thanks
 Best Regards

 On Sun, Mar 22, 2015 at 1:52 AM, Mohit Anchlia mohitanch...@gmail.com
 wrote:

 Is there a module in spark streaming that lets you listen to
 the alerts/conditions as they happen in the streaming module? Generally
 spark streaming components will execute on large set of clusters like hdfs
 or Cassandra, however when it comes to alerting you generally can't send it
 directly from the spark workers, which means you need a way to listen to
 the alerts.





Data/File structure Validation

2015-03-23 Thread Ahmed Nawar
Dears,

Is there any way to validate CSV, JSON ... files while loading them into a
DataFrame?
I need to ignore corrupted rows (rows not matching the schema).


Thanks,
Ahmed Nawwar


Re: Spark streaming alerting

2015-03-23 Thread Akhil Das
What do you mean you can't send it directly from spark workers? Here's a
simple approach you could take:

val data = ssc.textFileStream("sigmoid/")
val dist = data.filter(_.contains("ERROR")).foreachRDD(rdd =>
alert("Errors : " + rdd.count()))

And the alert() function could be anything triggering an email or sending
an SMS alert.

Thanks
Best Regards

On Sun, Mar 22, 2015 at 1:52 AM, Mohit Anchlia mohitanch...@gmail.com
wrote:

 Is there a module in spark streaming that lets you listen to
 the alerts/conditions as they happen in the streaming module? Generally
 spark streaming components will execute on large set of clusters like hdfs
 or Cassandra, however when it comes to alerting you generally can't send it
 directly from the spark workers, which means you need a way to listen to
 the alerts.



Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class

2015-03-23 Thread Night Wolf
Was a solution ever found for this? I am trying to run some test cases with sbt
test which use Spark SQL, and with the Spark 1.3.0 release and Scala 2.11.6 I get
this error. Setting fork := true in sbt seems to work, but it's a less than
ideal workaround.

On Tue, Mar 17, 2015 at 9:37 PM, Eric Charles e...@apache.org wrote:

 Launching from eclipse (scala-ide) as a scala process gives this error,
 but launching as a java process (a java main class) works fine.

 Launching as a scala process from Intellij works fine.

 There is something wrong on the eclipse side, not in Spark.


 On 03/13/2015 11:47 AM, Jianshi Huang wrote:
  Liancheng also found out that the Spark jars are not included in the
  classpath of URLClassLoader.
 
  Hmm... we're very close to the truth now.
 
  Jianshi
 
  On Fri, Mar 13, 2015 at 6:03 PM, Jianshi Huang jianshi.hu...@gmail.com
  mailto:jianshi.hu...@gmail.com wrote:
 
  I'm almost certain the problem is the ClassLoader.
 
  So adding
 
fork := true
 
  solves problems for test and run.
 
  The problem is how can I fork a JVM for sbt console? fork in console
  := true seems not working...
 
  Jianshi
 
 
  On Fri, Mar 13, 2015 at 4:35 PM, Jianshi Huang
  jianshi.hu...@gmail.com mailto:jianshi.hu...@gmail.com wrote:
 
  I guess it's a ClassLoader issue. But I have no idea how to
  debug it. Any hints?
 
  Jianshi
 
  On Fri, Mar 13, 2015 at 3:00 PM, Eric Charles e...@apache.org
  mailto:e...@apache.org wrote:
 
  i have the same issue running spark sql code from eclipse
  workspace. If you run your code from the command line (with
  a packaged jar) or from Intellij, I bet it should work.
 
  IMHO This is some how related to eclipse env, but would love
  to know how to fix it (whether via eclipse conf, or via a
  patch in spark).
 
 
 
  On 03/01/2015 02:32 AM, Michael Armbrust wrote:
  I think its possible that the problem is that the scala
  compiler is not being loaded by the primordial classloader
  (but instead by some child classloader) and thus the scala
  reflection mirror is failing to initialize when it can't
  find it. Unfortunately, the only solution that I know of
  is to load all required jars when the JVM starts.
 
  On Sat, Feb 28, 2015 at 5:26 PM, Ashish Nigam
  ashnigamt...@gmail.com mailto:ashnigamt...@gmail.com
  wrote:
 
  Also, can scala version play any role here?
  I am using scala 2.11.5 but all spark packages have
  dependency to scala 2.11.2
  Just wanted to make sure that scala version is not an
  issue here.
 
  On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam
  ashnigamt...@gmail.com
  mailto:ashnigamt...@gmail.com wrote:
 
  Hi,
  I wrote a very simple program in scala to convert
  an existing RDD to SchemaRDD.
  But createSchemaRDD function is throwing exception
 
  Exception in thread main
  scala.ScalaReflectionException: class
  org.apache.spark.sql.catalyst.ScalaReflection in
  JavaMirror with primordial classloader with boot
  classpath [.] not found
 
 
  Here's more info on the versions I am using -
 
   <scala.binary.version>2.11</scala.binary.version>
   <spark.version>1.2.1</spark.version>
   <scala.version>2.11.5</scala.version>
 
  Please let me know how can I resolve this problem.
 
  Thanks
  Ashish
 
 
 
 
 
 
  --
  Jianshi Huang
 
  LinkedIn: jianshi
  Twitter: @jshuang
  Github  Blog: http://huangjs.github.com/
 
 
 
 
  --
  Jianshi Huang
 
  LinkedIn: jianshi
  Twitter: @jshuang
  Github  Blog: http://huangjs.github.com/
 
 
 
 
  --
  Jianshi Huang
 
  LinkedIn: jianshi
  Twitter: @jshuang
  Github  Blog: http://huangjs.github.com/

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark Sql with python udf fail

2015-03-23 Thread lonely Feb
ok i'll try asap

2015-03-23 17:00 GMT+08:00 Cheng Lian lian.cs@gmail.com:

  I suspect there is a malformed row in your input dataset. Could you try
 something like this to confirm:

 sql("SELECT * FROM your-table").foreach(println)

 If there does exist a malformed line, you should see similar exception.
 And you can catch it with the help of the output. Notice that the messages
 are printed to stdout on executor side.

 On 3/23/15 4:36 PM, lonely Feb wrote:

    I caught exceptions in the python UDF code, flushed the exceptions into a
 single file, and made sure the column count of the output lines is the
 same as the sql schema.

  Something interesting is that the output line of my UDF code has just 10
 columns, and the exception above is java.lang.
 ArrayIndexOutOfBoundsException: 9; is there any insight from that?

 2015-03-23 16:24 GMT+08:00 Cheng Lian lian.cs@gmail.com:

 Could you elaborate on the UDF code?


 On 3/23/15 3:43 PM, lonely Feb wrote:

 Hi all, I tried to transfer some hive jobs into spark-sql. When i ran a
 sql job with python udf i got a exception:

 java.lang.ArrayIndexOutOfBoundsException: 9
 at
 org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142)
 at
 org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37)
 at
 org.apache.spark.sql.catalyst.expressions.EqualTo.eval(predicates.scala:166)
 at
 org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$anonfun$apply$1.apply(predicates.scala:30)
 at
 org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$anonfun$apply$1.apply(predicates.scala:30)
 at scala.collection.Iterator$anon$14.hasNext(Iterator.scala:390)
 at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
 at
 org.apache.spark.sql.execution.Aggregate$anonfun$execute$1$anonfun$7.apply(Aggregate.scala:156)
 at
 org.apache.spark.sql.execution.Aggregate$anonfun$execute$1$anonfun$7.apply(Aggregate.scala:151)
 at org.apache.spark.rdd.RDD$anonfun$13.apply(RDD.scala:601)
 at org.apache.spark.rdd.RDD$anonfun$13.apply(RDD.scala:601)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)

 I suspected there was an odd line in the input file. But the input file
 is so large and i could not found any abnormal lines with several jobs to
 check. How can i get the abnormal line here ?



​



Re: Spark Sql with python udf fail

2015-03-23 Thread lonely Feb
sql("SELECT * FROM your-table").foreach(println)

can be executed successfully. So the problem may still be in the UDF code. How
can i print the line that causes the ArrayIndexOutOfBoundsException in catalyst?

2015-03-23 17:04 GMT+08:00 lonely Feb lonely8...@gmail.com:

 ok i'll try asap

 2015-03-23 17:00 GMT+08:00 Cheng Lian lian.cs@gmail.com:

  I suspect there is a malformed row in your input dataset. Could you try
 something like this to confirm:

 sql("SELECT * FROM your-table").foreach(println)

 If there does exist a malformed line, you should see similar exception.
 And you can catch it with the help of the output. Notice that the messages
 are printed to stdout on executor side.

 On 3/23/15 4:36 PM, lonely Feb wrote:

    I caught exceptions in the python UDF code, flushed the exceptions into a
 single file, and made sure the column count of the output lines is the
 same as the sql schema.

  Something interesting is that the output line of my UDF code has just 10
 columns, and the exception above is java.lang.
 ArrayIndexOutOfBoundsException: 9; is there any insight from that?

 2015-03-23 16:24 GMT+08:00 Cheng Lian lian.cs@gmail.com:

 Could you elaborate on the UDF code?


 On 3/23/15 3:43 PM, lonely Feb wrote:

 Hi all, I tried to transfer some hive jobs into spark-sql. When i ran a
 sql job with python udf i got a exception:

 java.lang.ArrayIndexOutOfBoundsException: 9
 at
 org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142)
 at
 org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37)
 at
 org.apache.spark.sql.catalyst.expressions.EqualTo.eval(predicates.scala:166)
 at
 org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$anonfun$apply$1.apply(predicates.scala:30)
 at
 org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$anonfun$apply$1.apply(predicates.scala:30)
 at scala.collection.Iterator$anon$14.hasNext(Iterator.scala:390)
 at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
 at
 org.apache.spark.sql.execution.Aggregate$anonfun$execute$1$anonfun$7.apply(Aggregate.scala:156)
 at
 org.apache.spark.sql.execution.Aggregate$anonfun$execute$1$anonfun$7.apply(Aggregate.scala:151)
 at org.apache.spark.rdd.RDD$anonfun$13.apply(RDD.scala:601)
 at org.apache.spark.rdd.RDD$anonfun$13.apply(RDD.scala:601)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)

 I suspected there was an odd line in the input file. But the input file
 is so large and i could not find any abnormal lines with several jobs to
 check. How can i get the abnormal line here?








Re: SocketTimeout only when launching lots of executors

2015-03-23 Thread Akhil Das
It seems your driver is getting flooded by that many executors and hence
it times out. There are some configuration options like
spark.akka.timeout etc. that you could try playing with. More information
is available here:
http://spark.apache.org/docs/latest/configuration.html
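
For what it's worth, a minimal sketch of raising those settings in code
(property names are from the configuration page linked above; the values are
guesses to tune, and spark.files.fetchTimeout is only my guess at the setting
most relevant to this particular fetch-time stack trace):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("many-executors-app")          // hypothetical app name
      .set("spark.akka.timeout", "300")          // seconds
      .set("spark.files.fetchTimeout", "300")    // seconds; governs dependency fetches
    val sc = new SparkContext(conf)

The same properties can also be passed with --conf on the spark-submit command
line.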

Thanks
Best Regards

On Mon, Mar 23, 2015 at 9:46 AM, Tianshuo Deng td...@twitter.com.invalid
wrote:

 Hi, spark users.

 When running a spark application with lots of executors(300+), I see
 following failures:

 java.net.SocketTimeoutException: Read timed out  at
 java.net.SocketInputStream.socketRead0(Native Method)  at
 java.net.SocketInputStream.read(SocketInputStream.java:152)  at
 java.net.SocketInputStream.read(SocketInputStream.java:122)  at
 java.io.BufferedInputStream.fill(BufferedInputStream.java:235)  at
 java.io.BufferedInputStream.read1(BufferedInputStream.java:275)  at
 java.io.BufferedInputStream.read(BufferedInputStream.java:334)  at
 sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:690)  at
 sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:633)  at
 sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1324)
 at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:583)  at
 org.apache.spark.util.Utils$.fetchFile(Utils.scala:421)  at
 org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:356)
 at
 org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:353)
 at
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)  at
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at 
 org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:353)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 When I reduce the number of executors, the spark app runs fine. From the
 stack trace, it looks like multiple executors requesting to download
 dependencies at the same time are causing the driver to time out?

 Anyone experienced similar issues or has any suggestions?

 Thanks
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: netlib-java cannot load native lib in Windows when using spark-submit

2015-03-23 Thread Xi Shen
I did not build my own Spark. I got the binary version online. If it can
load the native libs from the IDE, it should also be able to load them when
running with --master local.

On Mon, 23 Mar 2015 07:15 Burak Yavuz brk...@gmail.com wrote:

 Did you build Spark with: -Pnetlib-lgpl?

 Ref: https://spark.apache.org/docs/latest/mllib-guide.html

 Burak

 On Sun, Mar 22, 2015 at 7:37 AM, Ted Yu yuzhih...@gmail.com wrote:

 How about pointing LD_LIBRARY_PATH to native lib folder ?

 You need Spark 1.2.0 or higher for the above to work. See SPARK-1719

 Cheers

 On Sun, Mar 22, 2015 at 4:02 AM, Xi Shen davidshe...@gmail.com wrote:

 Hi Ted,

 I have tried to invoke the command from both cygwin environment and
 powershell environment. I still get the messages:

 15/03/22 21:56:00 WARN netlib.BLAS: Failed to load implementation from:
 com.github.fommil.netlib.NativeSystemBLAS
 15/03/22 21:56:00 WARN netlib.BLAS: Failed to load implementation from:
 com.github.fommil.netlib.NativeRefBLAS

 From the Spark UI, I can see:

   spark.driver.extraLibrary c:\openblas


 Thanks,
 David


 On Sun, Mar 22, 2015 at 11:45 AM Ted Yu yuzhih...@gmail.com wrote:

 Can you try the --driver-library-path option ?

 spark-submit --driver-library-path /opt/hadoop/lib/native ...

 Cheers

 On Sat, Mar 21, 2015 at 4:58 PM, Xi Shen davidshe...@gmail.com wrote:

 Hi,

 I use the *OpenBLAS* DLL, and have configured my application to work
 in IDE. When I start my Spark application from IntelliJ IDE, I can see in
 the log that the native lib is loaded successfully.

 But if I use *spark-submit* to start my application, the native lib
 still cannot be loaded. I saw the WARN message that it failed to load both
 the native and native-ref library. I checked the *Environment* tab in
 the Spark UI, and the *java.library.path* is set correctly.


 Thanks,

 David








Re: Spark Sql with python udf fail

2015-03-23 Thread Cheng Lian
I suspect there is a malformed row in your input dataset. Could you try 
something like this to confirm:


sql("SELECT * FROM your-table").foreach(println)

If there does exist a malformed line, you should see a similar exception.
And you can catch it with the help of the output. Notice that the
messages are printed to stdout on the executor side.


On 3/23/15 4:36 PM, lonely Feb wrote:

I caught exceptions in the python UDF code, flushed exceptions into a
single file, and made sure the column number of the output lines is the
same as the sql schema.


Something interesting is that my output line of the UDF code is just 10
columns, and the exception above is
java.lang.ArrayIndexOutOfBoundsException: 9, is there any inspiration?


2015-03-23 16:24 GMT+08:00 Cheng Lian lian.cs@gmail.com 
mailto:lian.cs@gmail.com:


Could you elaborate on the UDF code?


On 3/23/15 3:43 PM, lonely Feb wrote:

Hi all, I tried to transfer some hive jobs into spark-sql.
When i ran a sql job with python udf i got an exception:

java.lang.ArrayIndexOutOfBoundsException: 9
at

org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142)
at

org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37)
at

org.apache.spark.sql.catalyst.expressions.EqualTo.eval(predicates.scala:166)
at

org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$anonfun$apply$1.apply(predicates.scala:30)
at

org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$anonfun$apply$1.apply(predicates.scala:30)
at
scala.collection.Iterator$anon$14.hasNext(Iterator.scala:390)
at
scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
at

org.apache.spark.sql.execution.Aggregate$anonfun$execute$1$anonfun$7.apply(Aggregate.scala:156)
at

org.apache.spark.sql.execution.Aggregate$anonfun$execute$1$anonfun$7.apply(Aggregate.scala:151)
at
org.apache.spark.rdd.RDD$anonfun$13.apply(RDD.scala:601)
at
org.apache.spark.rdd.RDD$anonfun$13.apply(RDD.scala:601)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at

org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at

org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
at

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

I suspected there was an odd line in the input file. But the
input file is so large and i could not found any abnormal
lines with several jobs to check. How can i get the abnormal
line here ?






Spark SQL udf(ScalaUdf) is very slow

2015-03-23 Thread zzcclp
My test env:
1. Spark version is 1.3.0
2. 3 nodes with 80G/20C each
3. read 250G parquet files from hdfs

Test case:

1. register the floor func with: sqlContext.udf.register("floor", (ts: Int) => ts - ts % 300),
   then run with sql
   select chan, floor(ts) as tt, sum(size) from qlogbase3 group by chan, floor(ts)
   It takes 17 minutes.

   == Physical Plan ==
   Aggregate false, [chan#23015,PartialGroup#23500], [chan#23015,PartialGroup#23500 AS tt#23494,CombineSum(PartialSum#23499L) AS c2#23495L]
    Exchange (HashPartitioning [chan#23015,PartialGroup#23500], 54)
     Aggregate true, [chan#23015,scalaUDF(ts#23016)], [chan#23015,scalaUDF(ts#23016) AS PartialGroup#23500,SUM(size#23023L) AS PartialSum#23499L]
      PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[115] at map at newParquet.scala:562

2. run with sql
   select chan, (ts - ts % 300) as tt, sum(size) from qlogbase3 group by chan, (ts - ts % 300)
   It takes only 5 minutes.

   == Physical Plan ==
   Aggregate false, [chan#23015,PartialGroup#23349], [chan#23015,PartialGroup#23349 AS tt#23343,CombineSum(PartialSum#23348L) AS c2#23344L]
    Exchange (HashPartitioning [chan#23015,PartialGroup#23349], 54)
     Aggregate true, [chan#23015,(ts#23016 - (ts#23016 % 300))], [chan#23015,(ts#23016 - (ts#23016 % 300)) AS PartialGroup#23349,SUM(size#23023L) AS PartialSum#23348L]
      PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[83] at map at newParquet.scala:562

3. use HiveContext with sql
   select chan, floor((ts - ts % 300)) as tt, sum(size) from qlogbase3 group by chan, floor((ts - ts % 300))
   It takes only 5 minutes too.

   == Physical Plan ==
   Aggregate false, [chan#23015,PartialGroup#23108L], [chan#23015,PartialGroup#23108L AS tt#23102L,CombineSum(PartialSum#23107L) AS _c2#23103L]
    Exchange (HashPartitioning [chan#23015,PartialGroup#23108L], 54)
     Aggregate true, [chan#23015,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor((ts#23016 - (ts#23016 % 300)))], [chan#23015,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor((ts#23016 - (ts#23016 % 300))) AS PartialGroup#23108L,SUM(size#23023L) AS PartialSum#23107L]
      PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[28] at map at newParquet.scala:562

Why? ScalaUdf is so slow?? How to improve it?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-udf-ScalaUdf-is-very-slow-tp22185.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Spark streaming alerting

2015-03-23 Thread Khanderao Kand Gmail
Akhil 

You are right in your answer to what Mohit wrote. However, what Mohit seems to
be alluding to, but did not write clearly, might be different.

Mohit

You are wrong in saying that streaming generally works with HDFS and Cassandra.
Streaming typically works with a streaming or queuing source like Kafka, Kinesis,
Twitter, Flume, ZeroMQ, etc. (but can also read from HDFS and S3). However, the
streaming context (the receiver within the streaming context) gets
events/messages/records and forms a time-window-based batch (RDD).

So there is a maximum gap of one window interval between when an alert message
becomes available to Spark and when the processing happens. I think this is what
you meant.

As per the Spark programming model, the RDD is the right way to deal with data. If you
are fine with a minimum delay of, say, a second (based on the minimum time window that
dstreaming can support) then what Rohit gave is the right model.

Khanderao

 On Mar 22, 2015, at 11:39 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
 
 What do you mean you can't send it directly from spark workers? Here's a 
 simple approach which you could do:
 
 val data = ssc.textFileStream("sigmoid/")
 val dist = data.filter(_.contains("ERROR")).foreachRDD(rdd =>
 alert("Errors : " + rdd.count()))
 
 And the alert() function could be anything triggering an email or sending an 
 SMS alert.
 
 Thanks
 Best Regards
 
 On Sun, Mar 22, 2015 at 1:52 AM, Mohit Anchlia mohitanch...@gmail.com 
 wrote:
 Is there a module in spark streaming that lets you listen to the 
 alerts/conditions as they happen in the streaming module? Generally spark 
 streaming components will execute on large set of clusters like hdfs or 
 Cassandra, however when it comes to alerting you generally can't send it 
 directly from the spark workers, which means you need a way to listen to the 
 alerts.
 


Re: Spark UI tunneling

2015-03-23 Thread Akhil Das
Oh in that case you could try adding the hostname in your /etc/hosts under
your localhost. Also make sure there is a request going to another host by
inspecting the network calls:

[image: Inline image 1]
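
(For example, if the UI redirects to a cluster-internal name, say
spark-master.internal, a hypothetical hostname, you could either map that name to
127.0.0.1 in /etc/hosts on your workstation, or tunnel to it directly with
something like ssh -L 4040:spark-master.internal:4040 user@gateway-host.)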

Thanks
Best Regards

On Mon, Mar 23, 2015 at 1:55 PM, Sergey Gerasimov ser...@gmail.com wrote:

 Akhil,

 that's what I did.

 The problem is that the web server probably tried to forward my request to
 another address that is accessible only locally.



 On 23 March 2015, at 11:12, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Did you try ssh -L 4040:127.0.0.1:4040 user@host

 Thanks
 Best Regards

 On Mon, Mar 23, 2015 at 1:12 PM, sergunok ser...@gmail.com wrote:

 Is there a way to tunnel the Spark UI?

 I tried to tunnel client-node:4040, but my browser was redirected from
 localhost to some domain name that is only visible inside the cluster.

 Maybe there is some startup option to make the Spark UI fully
 accessible through a single endpoint (address:port)?

 Serg.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-UI-tunneling-tp22184.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: log files of failed task

2015-03-23 Thread Emre Sevinc
Hello Sergun,

Generally you can use

   yarn application -list

to see the applicationIDs of applications and then you can see the logs
of finished applications using:

   yarn logs -applicationId applicationID

Hope this helps.

--
Emre Sevinç
http://www.bigindustries.be/



On Mon, Mar 23, 2015 at 8:23 AM, sergunok ser...@gmail.com wrote:

 Hi,

 I executed a task on Spark in YARN and it failed.
 I see just an "executor lost" message from YARNClientScheduler, no further
 details..
 (I read this error can be connected to the spark.yarn.executor.memoryOverhead
 setting and have already played with this param)

 How can I dig deeper into the log files and find the exact reason? How can
 the log of a failed task be examined?

 Unfortunately I don't have access to the Spark UI; I can only use the command line.

 Thanks!

 Serg.



 --
 View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/log-files-of-failed-task-tp22183.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




--
Emre Sevinc


Re: How Does aggregate work

2015-03-23 Thread Paweł Szulc
It is actually the number of cores. If your processor has hyperthreading then
it will be more (the number of logical processors your OS sees).
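
For a quick check of what that resolves to on a given setup (purely
illustrative, the value depends on the machine and master):

    scala> sc.defaultParallelism
    res0: Int = 4   // e.g. a 4-core VM in local mode

which is also the default number of partitions sc.parallelize will create.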

On Sun, 22 Mar 2015 at 4:51 PM, Ted Yu yuzhih...@gmail.com
wrote:

 I assume spark.default.parallelism is 4 in the VM Ashish was using.

 Cheers



Re: How to handle under-performing nodes in the cluster

2015-03-23 Thread Akhil Das
It seems that node is not being allocated enough tasks. Try
increasing your level of parallelism, or do a manual repartition so that
every node gets an even share of tasks to operate on (see the sketch below).
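
A minimal sketch of the manual repartition (the RDD name and the multiplier
are placeholders to adapt to the job):

    // spreading the data over a few tasks per available core usually evens things out
    val balanced = inputRdd.repartition(sc.defaultParallelism * 3)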

Thanks
Best Regards

On Fri, Mar 20, 2015 at 8:05 PM, Yiannis Gkoufas johngou...@gmail.com
wrote:

 Hi all,

 I have 6 nodes in the cluster and one of the nodes is clearly
 under-performing:


 [image: per-node metrics screenshot]
 I was wondering, what is the impact of having such issues? Also, what is the
 recommended way to work around it?

 Thanks a lot,
 Yiannis



Re: updateStateByKey performance API

2015-03-23 Thread Andre Schumacher

Hi Nikos,

We experienced something similar in our setting, where the Spark app was
supposed to write the final state changes to a Redis instance. Over time
the delay caused by re-writing the entire dataset in each iteration
exceeded the Spark streaming batch size.

In our case the solution was to avoid updateStateByKey and persist the
state directly to Redis. That of course means the join of the new keys to
the old state needs to be done explicitly. I think the solution to this
problem would be to just extend the Spark Streaming API with an
alternative state update that does something like an inner join instead of
a cogroup. Should be quite straightforward (a rough sketch follows below).

I went ahead and added an issue:
https://issues.apache.org/jira/browse/SPARK-6462


Note that this does not solve the problem when your state grows so large
that merging in keys becomes a bottleneck (since that would require
something like an IndexedRDD). But in your case you mention serialization
overhead as the bottleneck, so maybe you could try filtering out
unchanged keys before persisting the data? Just an idea..
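
To make the "persist state externally, touch only the keys in the batch" idea
concrete, a rough sketch (RedisStateStore, connect/get/put and updateState are
all hypothetical placeholders, not a real API):

    stream.foreachRDD { rdd =>
      rdd.foreachPartition { events =>
        val store = RedisStateStore.connect()          // hypothetical client
        events.foreach { case (userId, action) =>
          val previous = store.get(userId)             // None for unseen keys
          store.put(userId, updateState(previous, action))
        }
        store.close()
      }
    }

Unlike updateStateByKey, this only does work proportional to the size of the
batch, at the cost of managing the external store yourself.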

Andre



On 22/03/15 10:43, Andre Schumacher andre.sc...@gmail.com wrote:




 Forwarded Message 
Subject: Re: updateStateByKey performance  API
Date: Wed, 18 Mar 2015 13:06:15 +0200
From: Nikos Viorres nvior...@gmail.com
To: Akhil Das ak...@sigmoidanalytics.com
CC: user@spark.apache.org user@spark.apache.org

Hi Akhil,

Yes, that's what we are planning on doing at the end of the data. At the
moment I am doing performance testing before the job hits production and
testing on 4 cores to get baseline figures and deduced that in order to
grow to 10 - 15 million keys we ll need at batch interval of ~20 secs if
we
don't want to allocate more than 8 cores on this job. The thing is that
since we have a big silent window on the user interactions where the
stream will have very few data we would like to be able to use these cores
for batch processing during that window but we can't the way it currently
works.

best regards
n

On Wed, Mar 18, 2015 at 12:40 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 You can always throw more machines at this and see if the performance is
 increasing. Since you haven't mentioned anything regarding your # cores
etc.

 Thanks
 Best Regards

 On Wed, Mar 18, 2015 at 11:42 AM, nvrs nvior...@gmail.com wrote:

 Hi all,

 We are having a few issues with the performance of updateStateByKey
 operation in Spark Streaming (1.2.1 at the moment) and any advice
would be
 greatly appreciated. Specifically, on each tick of the system (which is
 set
 at 10 secs) we need to update a state tuple where the key is the
user_id
 and
 value an object with some state about the user. The problem is that
using
 Kryo serialization for 5M users, this gets really slow to the point
that
 we
 have to increase the period to more than 10 seconds so as not to fall
 behind.
 The input for the streaming job is a Kafka stream which is consists of
key
 value pairs of user_ids with some sort of action codes, we join this to
 our
 checkpointed state key and update the state.
 I understand that the reason for iterating over the whole state set is
for
 evicting items or updating state for everyone for time-depended
 computations
 but this does not apply on our situation and it hurts performance
really
 bad.
 Is there a possibility of implementing in the future an extra call in
the
 API for updating only a specific subset of keys?

 p.s. i will try asap to setting the dstream as non-serialized but then
i
 am
 worried about GC and checkpointing performance



 --
 View this message in context:
 
http://apache-spark-user-list.1001560.n3.nabble.com/updateStateByKey-per
formance-API-tp22113.html
 Sent from the Apache Spark User List mailing list archive at
Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org








-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark UI tunneling

2015-03-23 Thread Akhil Das
Did you try ssh -L 4040:127.0.0.1:4040 user@host

Thanks
Best Regards

On Mon, Mar 23, 2015 at 1:12 PM, sergunok ser...@gmail.com wrote:

 Is there a way to tunnel the Spark UI?

 I tried to tunnel client-node:4040, but my browser was redirected from
 localhost to some domain name that is only visible inside the cluster.

 Maybe there is some startup option to make the Spark UI fully
 accessible through a single endpoint (address:port)?

 Serg.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-UI-tunneling-tp22184.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Use pig load function in spark

2015-03-23 Thread Dai, Kevin
Hi, all

Can spark use pig's load function to load data?

Best Regards,
Kevin.


Re: spark disk-to-disk

2015-03-23 Thread Reynold Xin
Maybe implement a very simple function that uses the Hadoop API to read in
based on file names (i.e. parts)?
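
A rough sketch of that idea (not tested, and it assumes the default
Java-serialized format that saveAsObjectFile writes; MyRecord is a placeholder
element type):

    import java.io.{ByteArrayInputStream, ObjectInputStream}
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{BytesWritable, NullWritable}
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat

    // forbid splitting, so one part file == one partition, in the original order
    val conf = new Configuration(sc.hadoopConfiguration)
    conf.set("mapreduce.input.fileinputformat.split.minsize", Long.MaxValue.toString)

    val restored = sc.newAPIHadoopFile(
        "/some/path",
        classOf[SequenceFileInputFormat[NullWritable, BytesWritable]],
        classOf[NullWritable],
        classOf[BytesWritable],
        conf)
      .flatMap { case (_, bytes) =>
        val in = new ObjectInputStream(new ByteArrayInputStream(bytes.copyBytes()))
        in.readObject().asInstanceOf[Array[MyRecord]]
      }

    // re-asserting the original partitioner would still need a small custom RDD
    // wrapper, since there is no public API to attach one without a shuffle.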

On Mon, Mar 23, 2015 at 10:55 AM, Koert Kuipers ko...@tresata.com wrote:

 there is a way to reinstate the partitioner, but that requires
 sc.objectFile to read exactly what i wrote, which means sc.objectFile
 should never split files on reading (a feature of hadoop file inputformat
 that gets in the way here).

 On Mon, Mar 23, 2015 at 1:39 PM, Koert Kuipers ko...@tresata.com wrote:

 i just realized the major limitation is that i lose partitioning info...

 On Mon, Mar 23, 2015 at 1:34 AM, Reynold Xin r...@databricks.com wrote:


 On Sun, Mar 22, 2015 at 6:03 PM, Koert Kuipers ko...@tresata.com
 wrote:

 so finally i can resort to:
 rdd.saveAsObjectFile(...)
 sc.objectFile(...)
 but that seems like a rather broken abstraction.


 This seems like a fine solution to me.






Re: Getting around Serializability issues for types not in my control

2015-03-23 Thread Adelbert Chang
Instantiating the instance? The actual instance it's complaining about is:

https://github.com/scalaz/scalaz/blob/16838556c9309225013f917e577072476f46dc14/core/src/main/scala/scalaz/std/Option.scala#L10-11

The specific import where it's picking up the instance is:

https://github.com/scalaz/scalaz/blob/16838556c9309225013f917e577072476f46dc14/core/src/main/scala/scalaz/std/Option.scala#L227


Note the object extends OptionInstances which contains that instance.

Is the suggestion to pass in something like new OptionInstances { } into
the RDD#aggregate call?

On Mon, Mar 23, 2015 at 1:09 PM, Cody Koeninger c...@koeninger.org wrote:

 Have you tried instantiating the instance inside the closure, rather than
 outside of it?

 If that works, you may need to switch to mapPartitions /
 foreachPartition for efficiency reasons.
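
For illustration, one shape "instantiating inside the closure" can take here, as
a rough sketch rather than anyone's actual code (it leans on scalaz's
Applicative[Option] summoner, resolved on the executors):

    import org.apache.spark.rdd.RDD
    import scalaz.Applicative
    import scalaz.std.option._

    def pointPerElement[B](rdd: RDD[B]): RDD[Option[B]] =
      rdd.mapPartitions { it =>
        val G = Applicative[Option]   // looked up inside the task, never serialized
        it.map(b => G.point(b))
      }

    // the same idea works for aggregate: build the zero value and the combine
    // functions inside a helper that does the lookup locally.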


 On Mon, Mar 23, 2015 at 3:03 PM, Adelbert Chang adelbe...@gmail.com
 wrote:

 Is there no way to pull out the bits of the instance I want before I send
 it through the closure for aggregate? I did try pulling things out, along
 the lines of

 def foo[G[_], B](blah: Blah)(implicit G: Applicative[G]) = {
   val lift: B => G[RDD[B]] = b =>
 G.point(sparkContext.parallelize(List(b)))

   rdd.aggregate(/* use lift in here */)
 }

 But that doesn't seem to work either, still seems to be trying to
 serialize the Applicative... :(

 On Mon, Mar 23, 2015 at 12:27 PM, Dean Wampler deanwamp...@gmail.com
 wrote:

 Well, it's complaining about trait OptionInstances which is defined in
 Option.scala in the std package. Use scalap or javap on the scalaz library
 to find out which member of the trait is the problem, but since it says
 $$anon$1, I suspect it's the first value member, implicit val
 optionInstance, which has a long list of mixin traits, one of which is
 probably at fault. OptionInstances is huge, so there might be other
 offenders.

 Scalaz wasn't designed for distributed systems like this, so you'll
 probably find many examples of nonserializability. An alternative is to
 avoid using Scalaz in any closures passed to Spark methods, but that's
 probably not what you want.

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Mon, Mar 23, 2015 at 12:03 PM, adelbertc adelbe...@gmail.com wrote:

 Hey all,

 I'd like to use the Scalaz library in some of my Spark jobs, but am
 running
 into issues where some stuff I use from Scalaz is not serializable. For
 instance, in Scalaz there is a trait

 /** In Scalaz */
 trait Applicative[F[_]] {
   def apply2[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C]
   def point[A](a: => A): F[A]
 }

 But when I try to use it in say, in an `RDD#aggregate` call I get:


 Caused by: java.io.NotSerializableException:
 scalaz.std.OptionInstances$$anon$1
 Serialization stack:
 - object not serializable (class:
 scalaz.std.OptionInstances$$anon$1,
 value: scalaz.std.OptionInstances$$anon$1@4516ee8c)
 - field (class: dielectric.syntax.RDDOps$$anonfun$1, name: G$1,
 type:
 interface scalaz.Applicative)
 - object (class dielectric.syntax.RDDOps$$anonfun$1,
 function2)
 - field (class:
 dielectric.syntax.RDDOps$$anonfun$traverse$extension$1,
 name: apConcat$1, type: interface scala.Function2)
 - object (class
 dielectric.syntax.RDDOps$$anonfun$traverse$extension$1,
 function2)

 Outside of submitting a PR to Scalaz to make things Serializable, what
 can I
 do to make things Serializable? I considered something like

 implicit def applicativeSerializable[F[_]](implicit F: Applicative[F]):
 SomeSerializableType[F] =
   new SomeSerializableType { ... } ??

 Not sure how to go about doing it - I looked at java.io.Externalizable
 but
 given `scalaz.Applicative` has no value members I'm not sure how to
 implement the interface.

 Any guidance would be much appreciated - thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Getting-around-Serializability-issues-for-types-not-in-my-control-tp22193.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





 --
 Adelbert (Allen) Chang





-- 
Adelbert (Allen) Chang


Is it possible to use json4s 3.2.11 with Spark 1.3.0?

2015-03-23 Thread Alexey Zinoviev
Spark has a dependency on json4s 3.2.10, but this version has several bugs
and I need to use 3.2.11. I added json4s-native 3.2.11 dependency to
build.sbt and everything compiled fine. But when I spark-submit my JAR it
provides me with 3.2.10.


build.sbt

import sbt.Keys._

name := "sparkapp"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" %
"provided"

libraryDependencies += "org.json4s" %% "json4s-native" % "3.2.11"


plugins.sbt

logLevel := Level.Warn

resolvers += Resolver.url("artifactory", url(
"http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"
))(Resolver.ivyStylePatterns)

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")


App1.scala

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object App1 extends Logging {
  def main(args: Array[String]) = {
val conf = new SparkConf().setAppName("App1")
val sc = new SparkContext(conf)
println(s"json4s version: ${org.json4s.BuildInfo.version.toString}")
  }
}



sbt 0.13.7, sbt-assembly 0.13.0, Scala 2.10.4

Is it possible to force 3.2.11 version usage?

Thanks,
Alexey


Re: Spark 1.3 Dynamic Allocation - Requesting 0 new executor(s) because tasks are backlogged

2015-03-23 Thread Marcelo Vanzin
On Mon, Mar 23, 2015 at 2:15 PM, Manoj Samel manojsamelt...@gmail.com wrote:
 Found the issue above error - the setting for spark_shuffle was incomplete.

 Now it is able to ask and get additional executors. The issue is once they
 are released, it is not able to proceed with next query.

That looks like SPARK-6325, which unfortunately was not fixed in time
for 1.3.0...

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to use DataFrame with MySQL

2015-03-23 Thread Rishi Yadav
for me, it's only working if I set --driver-class-path to the mysql connector library.
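
For example (the path and version here are illustrative; point it at wherever
the connector jar actually lives):

    spark-submit --driver-class-path /path/to/mysql-connector-java-5.1.34.jar ...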

On Sun, Mar 22, 2015 at 11:29 PM, gavin zhang gavin@gmail.com wrote:

 OK,I found what the problem is: It couldn't work with
 mysql-connector-5.0.8.
 I updated the connector version to 5.1.34 and it worked.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-DataFrame-with-MySQL-tp22178p22182.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Shuffle Spill Memory and Shuffle Spill Disk

2015-03-23 Thread Bijay Pathak
Hello,

I am running  TeraSort https://github.com/ehiggs/spark-terasort on 100GB
of data. The final metrics I am getting on Shuffle Spill are:

Shuffle Spill(Memory): 122.5 GB
Shuffle Spill(Disk): 3.4 GB

What's the difference and relation between these two metrics? Does these
mean 122.5 GB was spill from memory during the shuffle?

thank you,
bijay


hadoop input/output format advanced control

2015-03-23 Thread Koert Kuipers
currently it's pretty hard to control the Hadoop Input/Output formats used
in Spark. The convention seems to be to add extra parameters to all
methods and then somewhere deep inside the code (for example in
PairRDDFunctions.saveAsHadoopFile) all these parameters get translated into
settings on the Hadoop Configuration object.

for example for compression i see codec: Option[Class[_ <:
CompressionCodec]] = None added to a bunch of methods.

how scalable is this solution really?

for example i need to read from a hadoop dataset and i don't want the input
(part) files to get split up. the way to do this is to set
mapred.min.split.size. now i don't want to set this at the level of the
SparkContext (which can be done), since i don't want it to apply to input
formats in general. i want it to apply to just this one specific input
dataset i need to read. which leaves me with no options currently. i could
go add yet another input parameter to all the methods
(SparkContext.textFile, SparkContext.hadoopFile, SparkContext.objectFile,
etc.). but that seems ineffective.

why can we not expose a Map[String, String] or some other generic way to
manipulate settings for hadoop input/output formats? it would require
adding one more parameter to all methods to deal with hadoop input/output
formats, but after that its done. one parameter to rule them all

then i could do:
val x = sc.textFile("/some/path", formatSettings =
Map("mapred.min.split.size" -> "12345"))

or
rdd.saveAsTextFile("/some/path", formatSettings =
Map("mapred.output.compress" -> "true", "mapred.output.compression.codec" ->
"somecodec"))
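
(For what it's worth, a per-call workaround that exists today is to pass a
Configuration to the new-API read methods; a rough sketch, with the split-size
setting as the example:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    // a copy of the context's conf, tweaked only for this one read
    val conf = new Configuration(sc.hadoopConfiguration)
    conf.set("mapreduce.input.fileinputformat.split.minsize", Long.MaxValue.toString)

    val lines = sc.newAPIHadoopFile(
        "/some/path",
        classOf[TextInputFormat],
        classOf[LongWritable],
        classOf[Text],
        conf)
      .map(_._2.toString)

though that does not cover textFile/objectFile/saveAsTextFile and the older-API
variants, which is the gap described above.)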


Re: Spark 1.3 Dynamic Allocation - Requesting 0 new executor(s) because tasks are backlogged

2015-03-23 Thread Manoj Samel
Log shows stack traces that seem to match the assert in JIRA so it seems I
am hitting the issue. Thanks for the heads up ...

15/03/23 20:29:50 ERROR actor.OneForOneStrategy: assertion failed:
Allocator killed more executors than are allocated!
java.lang.AssertionError: assertion failed: Allocator killed more executors
than are allocated!
at scala.Predef$.assert(Predef.scala:179)
at
org.apache.spark.deploy.yarn.YarnAllocator.killExecutor(YarnAllocator.scala:152)
at
org.apache.spark.deploy.yarn.ApplicationMaster$AMActor$$anonfun$receive$1$$anonfun$applyOrElse$6.apply(ApplicationMaster.scala:547)
at
org.apache.spark.deploy.yarn.ApplicationMaster$AMActor$$anonfun$receive$1$$anonfun$applyOrElse$6.apply(ApplicationMaster.scala:547)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.deploy.yarn.ApplicationMaster$AMActor$$anonfun$receive$1.applyOrElse(ApplicationMaster.scala:547)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at
org.apache.spark.deploy.yarn.ApplicationMaster$AMActor.aroundReceive(ApplicationMaster.scala:506)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

On Mon, Mar 23, 2015 at 2:25 PM, Marcelo Vanzin van...@cloudera.com wrote:

 On Mon, Mar 23, 2015 at 2:15 PM, Manoj Samel manojsamelt...@gmail.com
 wrote:
  Found the issue above error - the setting for spark_shuffle was
 incomplete.
 
  Now it is able to ask and get additional executors. The issue is once
 they
  are released, it is not able to proceed with next query.

 That looks like SPARK-6325, which unfortunately was not fixed in time
 for 1.3.0...

 --
 Marcelo



Re: Using a different spark jars than the one on the cluster

2015-03-23 Thread Denny Lee
+1  - I currently am doing what Marcelo is suggesting as I have a CDH 5.2
cluster (with Spark 1.1) and I'm also running Spark 1.3.0+ side-by-side in
my cluster.

On Wed, Mar 18, 2015 at 1:23 PM Marcelo Vanzin van...@cloudera.com wrote:

 Since you're using YARN, you should be able to download a Spark 1.3.0
 tarball from Spark's website and use spark-submit from that
 installation to launch your app against the YARN cluster.

 So effectively you would have 1.2.0 and 1.3.0 side-by-side in your cluster.

 On Wed, Mar 18, 2015 at 11:09 AM, jaykatukuri jkatuk...@apple.com wrote:
  Hi all,
  I am trying to run my job which needs spark-sql_2.11-1.3.0.jar.
  The cluster that I am running on is still on spark-1.2.0.
 
  I tried the following :
 
  spark-submit --class class-name --num-executors 100 --master yarn
  application_jar--jars hdfs:///path/spark-sql_2.11-1.3.0.jar
  hdfs:///input_data
 
  But, this did not work, I get an error that it is not able to find a
  class/method that is in spark-sql_2.11-1.3.0.jar .
 
  org.apache.spark.sql.SQLContext.implicits()Lorg/
 apache/spark/sql/SQLContext$implicits$
 
  The question in general is how do we use a different version of spark
 jars
  (spark-core, spark-sql, spark-ml etc) than the one's running on a
 cluster ?
 
  Thanks,
  Jay
 
 
 
 
 
  --
  View this message in context: http://apache-spark-user-list.
 1001560.n3.nabble.com/Using-a-different-spark-jars-than-the-
 one-on-the-cluster-tp22125.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



 --
 Marcelo

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark 1.3 Dynamic Allocation - Requesting 0 new executor(s) because tasks are backlogged

2015-03-23 Thread Manoj Samel
Found the issue with the above error - the setting for spark_shuffle was incomplete.

Now it is able to ask for and get additional executors. The issue is that once they
are released, it is not able to proceed with the next query.

The environment is CDH 5.3.2 (Hadoop 2.5) with Kerberos and Spark 1.3

After idle time, the release of executor gives a stack trace under WARN and
returns to the prompt (in spark-shell)

15/03/23 20:55:50 INFO YarnClientSchedulerBackend: Requesting to kill
executor(s) 2
15/03/23 20:55:50 INFO ExecutorAllocationManager: Removing executor 2
because it has been idle for 60 seconds (new desired total will be 6)
15/03/23 20:55:50 INFO YarnClientSchedulerBackend: Requesting to kill
executor(s) 5
15/03/23 20:55:50 INFO ExecutorAllocationManager: Removing executor 5
because it has been idle for 60 seconds (new desired total will be 5)
15/03/23 20:55:50 INFO YarnClientSchedulerBackend: Requesting to kill
executor(s) 1
15/03/23 20:55:51 WARN ReliableDeliverySupervisor: Association with remote
system [akka.tcp://sparkExecutor@xxxl:47358] has failed, address is now
gated for [5000] ms. Reason is: [Disassociated].
15/03/23 20:55:51 WARN ReliableDeliverySupervisor: Association with remote
system [akka.tcp://sparkExecutor@yyy:51807] has failed, address is now
gated for [5000] ms. Reason is: [Disassociated].
15/03/23 20:55:52 WARN ReliableDeliverySupervisor: Association with remote
system [akka.tcp://sparkExecutor@zzz:54623] has failed, address is now
gated for [5000] ms. Reason is: [Disassociated].
15/03/23 20:56:20 WARN AkkaUtils: Error sending message [message =
KillExecutors(ArrayBuffer(1))] in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:171)
at
org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerActor$$anonfun$receive$1$$anonfun$applyOrElse$4.apply$mcV$sp(YarnSchedulerBackend.scala:136)
at
org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerActor$$anonfun$receive$1$$anonfun$applyOrElse$4.apply(YarnSchedulerBackend.scala:136)
at
org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerActor$$anonfun$receive$1$$anonfun$applyOrElse$4.apply(YarnSchedulerBackend.scala:136)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/03/23 20:56:20 WARN AkkaUtils: Error sending message [message =
KillExecutors(ArrayBuffer(1))] in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:171)
at
org.apache.spark.scheduler.cluster.YarnSchedulerBackend.doKillExecutors(YarnSchedulerBackend.scala:68)
at
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors(CoarseGrainedSchedulerBackend.scala:375)
at org.apache.spark.SparkContext.killExecutors(SparkContext.scala:1173)
at
org.apache.spark.ExecutorAllocationClient$class.killExecutor(ExecutorAllocationClient.scala:49)
at org.apache.spark.SparkContext.killExecutor(SparkContext.scala:1186)
at org.apache.spark.ExecutorAllocationManager.org
$apache$spark$ExecutorAllocationManager$$removeExecutor(ExecutorAllocationManager.scala:353)
at
org.apache.spark.ExecutorAllocationManager$$anonfun$org$apache$spark$ExecutorAllocationManager$$schedule$1.apply(ExecutorAllocationManager.scala:237)
at
org.apache.spark.ExecutorAllocationManager$$anonfun$org$apache$spark$ExecutorAllocationManager$$schedule$1.apply(ExecutorAllocationManager.scala:234)
at
scala.collection.mutable.MapLike$$anonfun$retain$2.apply(MapLike.scala:213)
at
scala.collection.mutable.MapLike$$anonfun$retain$2.apply(MapLike.scala:212)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.immutable.List.foreach(List.scala:318)
at

SchemaRDD/DataFrame result partitioned according to the underlying datasource partitions

2015-03-23 Thread Stephen Boesch
Is there a way to take advantage of the underlying datasource partitions
when generating a DataFrame/SchemaRDD via catalyst?  It seems from the sql
module that the only options are RangePartitioner and HashPartitioner - and
further that those are selected automatically by the code.  It was not
apparent either that the underlying partitioning is translated to the
partitions presented in the rdd or that a custom partitioner can be
provided.

The motivation would be to subsequently use df.map (with
preservesPartitioning=true) and/or df.mapPartitions (likewise) to perform
operations that work within the original datasource partitions - thus
avoiding a shuffle.


Re: Why doesn't the --conf parameter work in yarn-cluster mode (but works in yarn-client and local)?

2015-03-23 Thread Sandy Ryza
Hi Emre,

The --conf property is meant to work with yarn-cluster mode.
System.getProperty(key) isn't guaranteed, but new SparkConf().get(key)
should.  Does it not?
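
For illustration, a minimal sketch of the SparkConf route (the key
spark.myapp.someKey is a made-up example; as far as I know spark-submit only
forwards --conf keys that start with "spark."):

    import org.apache.spark.SparkConf

    // submitted with:  --conf spark.myapp.someKey=someValue
    val value = new SparkConf().get("spark.myapp.someKey")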

-Sandy

On Mon, Mar 23, 2015 at 8:39 AM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello,

 According to Spark Documentation at
 https://spark.apache.org/docs/1.2.1/submitting-applications.html :

   --conf: Arbitrary Spark configuration property in key=value format. For
 values that contain spaces wrap “key=value” in quotes (as shown).

 And indeed, when I use that parameter, in my Spark program I can retrieve
 the value of the key by using:

 System.getProperty(key);

 This works when I test my program locally, and also in yarn-client mode, I
 can log the value of the key and see that it matches what I wrote in the
 command line, but it returns *null* when I submit the very same program in
 *yarn-cluster* mode.

 Why can't I retrieve the value of key given as --conf key=value when I
 submit my Spark application in *yarn-cluster* mode?

 Any ideas and/or workarounds?


 --
 Emre Sevinç
 http://www.bigindustries.be/




Parquet file + increase read parallelism

2015-03-23 Thread SamyaMaiti
Hi All,

Suppose I have a parquet file of 100 MB in HDFS and my HDFS block size is 64MB,
so I have 2 blocks of data.

When I do *sqlContext.parquetFile(path)* followed by an action, two
tasks are started on two partitions.

My intent is to read these 2 blocks in more partitions to fully utilize my
cluster resources and increase parallelism.

Is there a way to do so, like in the case of
sc.textFile(path, *numberOfPartitions*)?

Please note, I don't want to do a *repartition* as that would result in a lot of
shuffle.

Thanks in advance.

Regards,
Sam



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-file-increase-read-parallelism-tp22190.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-23 Thread Martin Goodson
Have you tried to repartition() your original data to make more partitions
before you aggregate?
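
For example, a sketch against the code quoted further down in this thread (the
1000 is an arbitrary number to tune, and DataFrame.repartition assumes Spark 1.3):

    import org.apache.spark.sql.functions.sum

    val people = sqlContext.parquetFile("/data.parquet").repartition(1000)
    val res = people.groupBy("name", "date")
      .agg(sum("power"), sum("supply"))
      .take(10)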


-- 
Martin Goodson  |  VP Data Science
(0)20 3397 1240
[image: Inline image 1]

On Mon, Mar 23, 2015 at 4:12 PM, Yiannis Gkoufas johngou...@gmail.com
wrote:

 Hi Yin,

 Yes, I have set spark.executor.memory to 8g and the worker memory to 16g
 without any success.
 I cannot figure out how to increase the number of mapPartitions tasks.

 Thanks a lot

 On 20 March 2015 at 18:44, Yin Huai yh...@databricks.com wrote:

 spark.sql.shuffle.partitions only controls the number of tasks in the
 second stage (the number of reducers). For your case, I'd say that the
 number of tasks in the first stage (number of mappers) will be the number
 of files you have.

 Actually, have you changed spark.executor.memory (it controls the
 memory for an executor of your application)? I did not see it in your
 original email. The difference between worker memory and executor memory
 can be found at (http://spark.apache.org/docs/1.3.0/spark-standalone.html
 ),

 SPARK_WORKER_MEMORY
 Total amount of memory to allow Spark applications to use on the machine,
 e.g. 1000m, 2g (default: total memory minus 1 GB); note that each
 application's individual memory is configured using its
 spark.executor.memory property.


 On Fri, Mar 20, 2015 at 9:25 AM, Yiannis Gkoufas johngou...@gmail.com
 wrote:

 Actually I realized that the correct way is:

 sqlContext.sql("set spark.sql.shuffle.partitions=1000")

 but I am still experiencing the same behavior/error.

 On 20 March 2015 at 16:04, Yiannis Gkoufas johngou...@gmail.com wrote:

 Hi Yin,

 the way I set the configuration is:

 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 sqlContext.setConf("spark.sql.shuffle.partitions", "1000");

 it is the correct way right?
 In the mapPartitions task (the first task which is launched), I get
 again the same number of tasks and again the same error. :(

 Thanks a lot!

 On 19 March 2015 at 17:40, Yiannis Gkoufas johngou...@gmail.com
 wrote:

 Hi Yin,

 thanks a lot for that! Will give it a shot and let you know.

 On 19 March 2015 at 16:30, Yin Huai yh...@databricks.com wrote:

 Was the OOM thrown during the execution of first stage (map) or the
 second stage (reduce)? If it was the second stage, can you increase the
 value of spark.sql.shuffle.partitions and see if the OOM disappears?

 This setting controls the number of reduces Spark SQL will use and
 the default is 200. Maybe there are too many distinct values and the 
 memory
 pressure on every task (of those 200 reducers) is pretty high. You can
 start with 400 and increase it until the OOM disappears. Hopefully this
 will help.

 Thanks,

 Yin


 On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas 
 johngou...@gmail.com wrote:

 Hi Yin,

 Thanks for your feedback. I have 1700 parquet files, sized 100MB
 each. The number of tasks launched is equal to the number of parquet 
 files.
 Do you have any idea on how to deal with this situation?

 Thanks a lot
 On 18 Mar 2015 17:35, Yin Huai yh...@databricks.com wrote:

 Seems there are too many distinct groups processed in a task, which
 trigger the problem.

 How many files do your dataset have and how large is a file? Seems
 your query will be executed with two stages, table scan and map-side
 aggregation in the first stage and the final round of reduce-side
 aggregation in the second stage. Can you take a look at the numbers of
 tasks launched in these two stages?

 Thanks,

 Yin

 On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas 
 johngou...@gmail.com wrote:

 Hi there, I set the executor memory to 8g but it didn't help

 On 18 March 2015 at 13:59, Cheng Lian lian.cs@gmail.com
 wrote:

 You should probably increase executor memory by setting
 spark.executor.memory.

 Full list of available configurations can be found here
 http://spark.apache.org/docs/latest/configuration.html

 Cheng


 On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:

 Hi there,

 I was trying the new DataFrame API with some basic operations on
 a parquet dataset.
 I have 7 nodes of 12 cores and 8GB RAM allocated to each worker
 in a standalone cluster mode.
 The code is the following:

 val people = sqlContext.parquetFile("/data.parquet");
 val res = people.groupBy("name","date").
 agg(sum("power"),sum("supply")).take(10);
 System.out.println(res);

 The dataset consists of 16 billion entries.
 The error I get is java.lang.OutOfMemoryError: GC overhead limit
 exceeded

 My configuration is:

 spark.serializer org.apache.spark.serializer.KryoSerializer
 spark.driver.memory6g
 spark.executor.extraJavaOptions -XX:+UseCompressedOops
 spark.shuffle.managersort

 Any idea how can I workaround this?

 Thanks a lot













Re: spark disk-to-disk

2015-03-23 Thread Koert Kuipers
i just realized the major limitation is that i lose partitioning info...

On Mon, Mar 23, 2015 at 1:34 AM, Reynold Xin r...@databricks.com wrote:


 On Sun, Mar 22, 2015 at 6:03 PM, Koert Kuipers ko...@tresata.com wrote:

 so finally i can resort to:
 rdd.saveAsObjectFile(...)
 sc.objectFile(...)
 but that seems like a rather broken abstraction.


 This seems like a fine solution to me.




Re: Write to Parquet File in Python

2015-03-23 Thread chuwiey
Hey Akriti23,

pyspark gives you a saveAsParquetFile() api, to save your rdd as parquet.
You will, however, need to infer the schema or describe it manually before
you can do so. Here are some docs about that (v1.2.1, you can search for the
others, they're relatively similar 1.1 and up): 
http://spark.apache.org/docs/1.2.1/sql-programming-guide.html#inferring-the-schema-using-reflection
http://spark.apache.org/docs/1.2.1/sql-programming-guide.html#parquet-files

As for whether it is the most efficient way to do a range query, that's a
more difficult question and it would be helpful if you could give some more
information. Another thing to think about is that you could just use a temp
table, and not store the parquet altogether - same docs, just read
through them.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Write-to-Parquet-File-in-Python-tp22186p22191.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



newbie quesiton - spark with mesos

2015-03-23 Thread Anirudha Jadhav
i have a mesos cluster, which i deploy spark to by using instructions on
http://spark.apache.org/docs/0.7.2/running-on-mesos.html

after that the spark shell starts up fine.
then i try the following on the shell:

val data = 1 to 1

val distData = sc.parallelize(data)

distData.filter(_ < 10).collect()

open spark web ui at host:4040 and see an active job.

NOW, how do i start workers or spark workers on mesos ? who completes my
job?
thanks,

-- 
Ani


Re: version conflict common-net

2015-03-23 Thread Jacob Abraham
Hi Sean,

Thanks a ton for your reply.

The particular situation I have is case (3) that you have mentioned. The
class that I am using from commons-net is FTPClient(). This class is
present in both the 2.2 version and the 3.3 version. However, in the 3.3
version there are two additional methods (among several others)
setAutoDetectUTF8() and setControlKeepAliveTimeout() that we require to
use.

The class FTPClient() and the two methods are a part of a custom receiver
(ZipStream), that we wrote. Our spark app is deployed on YARN. Looking
online and doing more research I found this link - SPARK-939
https://issues.apache.org/jira/browse/SPARK-939

In the comments section, I found a mention that there is already a flag 
spark.yarn.user.classpath.first to make this happen. I have not yet tried
it out. I guess, this should do the trick right? Also, there are some more
JIRA items I see that, indicate combining the 
spark.files.userClassPathFirst and spark.yarn.user.classpath.first. It
has been marked as resolved. However I am a bit confused about the state of
the JIRA as far as Cloudera version of spark. We are using spark that comes
with CDH 5.3.2 (Spark 1.2.0). I think this combined flag is marked for
spark 1.3.


If all else fails shade 3.3... :)


Thanks again,
-Jacob













On Fri, Mar 20, 2015 at 10:07 AM, Sean Owen so...@cloudera.com wrote:

 It's not a crazy question, no. I'm having a bit of trouble figuring
 out what's happening. Commons Net 2.2 is what's used by Spark. The
 error appears to come from Spark. But the error is not finding a
 method that did not exist in 2.2. I am not sure what ZipStream is, for
 example. This could be a bizarre situation where classloader rules
 mean that part of 2.2 and part of 3.3 are being used. For example,
 let's say:

 - your receiver uses 3.3 classes that are only in 3.3, so they are
 found in your user classloader
 - 3.3 classes call some class that also existed in 2.2, but those are
 found in the Spark classloader.
 - 2.2 class doesn't have methods that 3.3 expects

 userClassPathFirst is often a remedy. There are several versions of
 this flag though. For example you need a different one if on YARN to
 have it take effect.

 It's worth ruling that out first. If all else fails you can shade 3.3.

 On Fri, Mar 20, 2015 at 11:44 AM, Jacob Abraham abe.jac...@gmail.com
 wrote:
  Anyone? or is this question nonsensical... and I am doing something
  fundamentally wrong?
 
 
 
  On Mon, Mar 16, 2015 at 5:33 PM, Jacob Abraham abe.jac...@gmail.com
 wrote:
 
  Hi Folks,
 
  I have a situation where I am getting a version conflict between java
  libraries that is used by my application and ones used by spark.
 
  Following are the details -
 
  I use spark provided by Cloudera running on the CDH5.3.2 cluster (Spark
  1.2.0-cdh5.3.2). The library that is causing the conflict is
 commons-net.
 
  In our spark application we use commons-net with version 3.3.
 
  However I found out that spark uses commons-net version 2.2.
 
  Hence when we try to submit our application using spark-submit, I end up
  getting, a NoSuchMethodError()
 
  Error starting receiver 5 -
 
 
  java.lang.NoSuchMethodError:
  org.apache.commons.net.ftp.FTPClient.setAutodetectUTF8(Z)V
 
   at ZipStream.onStart(ZipStream.java:55)
   at
 
 org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
   at
 
 org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
   at
 
 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277)
   at
 
 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269)
 
.
 
 
 
 
 
 
  Now, if I change the commons-net version to 2.2, the job runs fine
 (expect
  for the fact that some of the features we use from the commons-net 3.3
 are
  not there).
 
 
 
  How does one resolve such an issue where sparks uses one set of
 libraries
  and our user application requires the same set of libraries, but just a
  different version of it (In my case commons-net 2.2 vs 3.3).
 
 
  I see that there is a setting that I can supply -
  spark.files.userClassPathFirst, but the documentation says that it is
  experimental and for us this did not work at all.
 
 
  Thanks in advance.
 
 
  Regards,
 
  -Jacob
 
 
 
 
 



Re: Strange behavior with PySpark when using Join() and zip()

2015-03-23 Thread Sean Owen
I think the explanation is that the join does not guarantee any order,
since it causes a shuffle in general, and it is computed twice in the
first example, resulting in a difference for d1 and d2.

You can persist() the result of the join and in practice I believe
you'd find it behaves as expected, although even that is not 100%
guaranteed since a block could be lost and recomputed (differently).

If order matters, and it does for zip(), then the reliable way to
guarantee a well defined ordering for zipping is to sort the RDDs.
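
For example, a minimal sketch in Scala (the same idea applies in PySpark),
assuming sc from spark-shell and the "a,b" string layout from the example
below; the point is that both zipped RDDs derive from one explicitly
sorted parent:

    val raw  = sc.parallelize((0 until 100).map(x => ("k" + x, "v" + x)))
    val data = raw.join(raw).map { case (_, (a, b)) => a + "," + b }
    // Impose a total order so both derived RDDs compute the same, stable layout.
    val sorted = data.sortBy(identity).cache()
    val d1 = sorted.map(_.split(',')(0))
    val d2 = sorted.map(_.split(',')(1))
    println(d1.zip(d2).take(10).mkString(", "))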

On Mon, Mar 23, 2015 at 6:27 PM, Ofer Mendelevitch
omendelevi...@hortonworks.com wrote:
 Hi,

 I am running into a strange issue when doing a JOIN of two RDDs followed by
 ZIP from PySpark.
 It’s part of a more complex application, but was able to narrow it down to a
 simplified example that’s easy to replicate and causes the same problem to
 appear:


 raw = sc.parallelize([('k'+str(x),'v'+str(x)) for x in range(100)])
 data = raw.join(raw).mapValues(lambda x: [x[0]]+[x[1]]).map(lambda pair:
 ','.join([x for x in pair[1]]))
 d1 = data.map(lambda s: s.split(',')[0])
 d2 = data.map(lambda s: s.split(',')[1])
 x = d1.zip(d2)

 print x.take(10)


 The output is:


 [('v44', 'v80'), ('v79', 'v44'), ('v80', 'v79'), ('v45', 'v78'), ('v81',
 'v81'), ('v78', 'v45'), ('v99', 'v99'), ('v82', 'v82'), ('v46', 'v46'),
 ('v83', 'v83')]


 As you can see, the ordering of items is not preserved anymore in all cases.
 (e.g., ‘v81’ is preserved, and ‘v45’ is not)
 Is it not supposed to be preserved?

 If I do the same thing without the JOIN:

 data = sc.parallelize('v'+str(x)+',v'+str(x) for x in range(100))
 d1 = data.map(lambda s: s.split(',')[0])
 d2 = data.map(lambda s: s.split(',')[1])
 x = d1.zip(d2)

 print x.take(10)

 The output is:


 [('v0', 'v0'), ('v1', 'v1'), ('v2', 'v2'), ('v3', 'v3'), ('v4', 'v4'),
 ('v5', 'v5'), ('v6', 'v6'), ('v7', 'v7'), ('v8', 'v8'), ('v9', 'v9')]


 As expected.

 Anyone run into this or a similar issue?

 Ofer

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark disk-to-disk

2015-03-23 Thread Koert Kuipers
There is a way to reinstate the partitioner, but that requires
sc.objectFile to read exactly what I wrote, which means sc.objectFile
should never split files on reading (a feature of the Hadoop file
InputFormat that gets in the way here).
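
For anyone following along, a minimal sketch of the round trip in
question, with the partitioner re-applied by hand after reading; that
second partitionBy is exactly the extra shuffle I'd like to avoid.
(Assumes sc from spark-shell; the path is just illustrative.)

    import org.apache.spark.HashPartitioner

    val rdd = sc.parallelize(1 to 1000).map(i => (i % 10, i))
      .partitionBy(new HashPartitioner(10))
    rdd.saveAsObjectFile("/tmp/disk-to-disk-sketch")

    // Reading back, the partitioner info is gone; partitionBy re-establishes
    // co-partitioning for downstream joins, at the cost of a full shuffle.
    val back = sc.objectFile[(Int, Int)]("/tmp/disk-to-disk-sketch")
      .partitionBy(new HashPartitioner(10))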

On Mon, Mar 23, 2015 at 1:39 PM, Koert Kuipers ko...@tresata.com wrote:

 i just realized the major limitation is that i lose partitioning info...

 On Mon, Mar 23, 2015 at 1:34 AM, Reynold Xin r...@databricks.com wrote:


 On Sun, Mar 22, 2015 at 6:03 PM, Koert Kuipers ko...@tresata.com wrote:

 so finally i can resort to:
 rdd.saveAsObjectFile(...)
 sc.objectFile(...)
 but that seems like a rather broken abstraction.


 This seems like a fine solution to me.





Re: version conflict common-net

2015-03-23 Thread Sean Owen
I think it's spark.yarn.user.classpath.first in 1.2, and
spark.{driver,executor}.extraClassPath in 1.3.  Obviously that's for
if you are using YARN, in the first instance.
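
A rough sketch of wiring that up, for illustration (flag name per the 1.2
discussion above; the app name is a placeholder, and passing the flag to
spark-submit with --conf is the more usual route than setting it
programmatically):

    // e.g. spark-submit --conf spark.yarn.user.classpath.first=true ...
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("commons-net-3.3-app")   // placeholder name
      .set("spark.yarn.user.classpath.first", "true")
    val sc = new SparkContext(conf)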

On Mon, Mar 23, 2015 at 5:41 PM, Jacob Abraham abe.jac...@gmail.com wrote:
 Hi Sean,

 Thanks a ton for you reply.

 The particular situation I have is case (3) that you have mentioned. The
 class that I am using from commons-net is FTPClient(). This class is present
 in both the 2.2 version and the 3.3 version. However, in the 3.3 version
 there are two additional methods (among several others)
 setAutoDetectUTF8() and setControlKeepAliveTimeout() that we require to
 use.

 The class FTPClient() and the two methods are a part of a custom receiver
 (ZipStream), that we wrote. Our spark app is deployed on YARN. Looking
 online and doing more research I found this link - SPARK-939

 In the comments section, I found a mention that there is already a flag
 spark.yarn.user.classpath.first to make this happen. I have not yet tried
 it out. I guess, this should do the trick right? Also, there are some more
 JIRA items I see that, indicate combining the
 spark.files.userClassPathFirst and spark.yarn.user.classpath.first. It
 has been marked as resolved. However I am a bit confused about the state of
 the JIRA as far as Cloudera version of spark. We are using spark that comes
 with CDH 5.3.2 (Spark 1.2.0). I think this combined flag is marked for spark
 1.3.


 If all else fails shade 3.3... :)


 Thanks again,
 -Jacob













 On Fri, Mar 20, 2015 at 10:07 AM, Sean Owen so...@cloudera.com wrote:

 It's not a crazy question, no. I'm having a bit of trouble figuring
 out what's happening. Commons Net 2.2 is what's used by Spark. The
 error appears to come from Spark. But the error is not finding a
 method that did not exist in 2.2. I am not sure what ZipStream is, for
 example. This could be a bizarre situation where classloader rules
 mean that part of 2.2 and part of 3.3 are being used. For example,
 let's say:

 - your receiver uses 3.3 classes that are only in 3.3, so they are
 found in your user classloader
 - 3.3 classes call some class that also existed in 2.2, but those are
 found in the Spark classloader.
 - 2.2 class doesn't have methods that 3.3 expects

 userClassPathFirst is often a remedy. There are several versions of
 this flag though. For example you need a different one if on YARN to
 have it take effect.

 It's worth ruling that out first. If all else fails you can shade 3.3.

 On Fri, Mar 20, 2015 at 11:44 AM, Jacob Abraham abe.jac...@gmail.com
 wrote:
  Anyone? or is this question nonsensical... and I am doing something
  fundamentally wrong?
 
 
 
  On Mon, Mar 16, 2015 at 5:33 PM, Jacob Abraham abe.jac...@gmail.com
  wrote:
 
  Hi Folks,
 
  I have a situation where I am getting a version conflict between java
  libraries that is used by my application and ones used by spark.
 
  Following are the details -
 
  I use spark provided by Cloudera running on the CDH5.3.2 cluster (Spark
  1.2.0-cdh5.3.2). The library that is causing the conflict is
  commons-net.
 
  In our spark application we use commons-net with version 3.3.
 
  However I found out that spark uses commons-net version 2.2.
 
  Hence when we try to submit our application using spark-submit, I end
  up
  getting, a NoSuchMethodError()
 
  Error starting receiver 5 -
 
 
  java.lang.NoSuchMethodError:
  org.apache.commons.net.ftp.FTPClient.setAutodetectUTF8(Z)V
 
   at ZipStream.onStart(ZipStream.java:55)
   at
 
  org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
   at
 
  org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
   at
 
  org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277)
   at
 
  org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269)
 
.
 
 
 
 
 
 
  Now, if I change the commons-net version to 2.2, the job runs fine
  (except
  for the fact that some of the features we use from the commons-net 3.3
  are
  not there).
 
 
 
  How does one resolve such an issue where Spark uses one set of
  libraries
  and our user application requires the same set of libraries, but just a
  different version of it (In my case commons-net 2.2 vs 3.3).
 
 
  I see that there is a setting that I can supply -
  spark.files.userClassPathFirst, but the documentation says that it is
  experimental and for us this did not work at all.
 
 
  Thanks in advance.
 
 
  Regards,
 
  -Jacob
 
 
 
 
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to check that a dataset is sorted after it has been written out?

2015-03-23 Thread Michael Albert
Thanks for the information! (to all who responded)
The code below *seems* to work. Any hidden gotchas that anyone sees?
And still, in terasort, how did they check that the data was actually sorted? 
:-)
-Mike
class MyInputFormat[T] extends parquet.hadoop.ParquetInputFormat[T] {
  override def getSplits(jobContext: org.apache.hadoop.mapreduce.JobContext)
      : java.util.List[org.apache.hadoop.mapreduce.InputSplit] = {
    val splits = super.getSplits(jobContext)
    import scala.collection.JavaConversions._
    splits.sortBy { split => split match {
      case fileSplit: org.apache.hadoop.mapreduce.lib.input.FileSplit =>
        (fileSplit.getPath.getName, fileSplit.getStart)
      case _ => ("", -1L)
    } }
  }
}

  From: Sean Owen so...@cloudera.com
 To: Michael Albert m_albert...@yahoo.com 
Cc: User user@spark.apache.org 
 Sent: Monday, March 23, 2015 7:31 AM
 Subject: Re: How to check that a dataset is sorted after it has been written 
out?
   
Data is not (necessarily) sorted when read from disk, no. A file might
have many blocks even, and while a block yields a partition in
general, the order in which those partitions appear in the RDD is not
defined. This is why you'd sort if you need the data sorted.

I think you could conceivably make some custom RDD or InputFormat that
reads blocks in a well-defined order and, assuming the data is sorted
in some knowable way on disk, then must have them sorted. I think
that's even been brought up.

Deciding whether the data is sorted is quite different. You'd have to
decide what ordering you expect (is part 0 before part 1? should it be
sorted in a part file?) and then just verify that externally.
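
As a rough sketch of that external check (assuming the ordering you expect
is: each part is internally sorted, and part i's last key <= part i+1's
first key; the Long key type is just for illustration):

    def looksSorted(keys: org.apache.spark.rdd.RDD[Long]): Boolean = {
      val summaries = keys.mapPartitionsWithIndex { (idx, it) =>
        if (it.isEmpty) Iterator.empty
        else {
          val elems = it.toArray            // assumes a partition fits in memory
          val sortedInside = elems.sliding(2).forall {
            case Array(a, b) => a <= b
            case _           => true
          }
          Iterator((idx, elems.head, elems.last, sortedInside))
        }
      }.collect().sortBy(_._1)

      // every partition sorted internally, and boundaries non-decreasing
      summaries.forall(_._4) &&
        summaries.sliding(2).forall {
          case Array((_, _, lastA, _), (_, firstB, _, _)) => lastA <= firstB
          case _ => true
        }
    }

Applied, of course, to however you read the keys back in; whether that
read order is meaningful is the other half of the problem, as above.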



On Fri, Mar 20, 2015 at 10:41 PM, Michael Albert
m_albert...@yahoo.com.invalid wrote:
 Greetings!

 I sorted a dataset in Spark and then wrote it out in avro/parquet.

 Then I wanted to check that it was sorted.

 It looks like each partition has been sorted, but when reading in, the first
 partition (i.e., as
 seen in the partition index of mapPartitionsWithIndex) is not the same  as
 implied by
 the names of the parquet files (even when the number of partitions is the
 same in the
 rdd which was read as on disk).

 If I take() a few hundred values, they are sorted, but they are *not* the
 same as if I
 explicitly open part-r-0.parquet and take values from that.

 It seems that when opening the rdd, the partitions of the rdd are not in
 the same
 order as implied by the data on disk (i.e., part-r-0.parquet,
 part-r-1.parquet, etc).

 So, how might one read the data so that one maintains the sort order?

 And while on the subject, after the terasort, how did they check that the
 data was actually sorted correctly? (or did they :-) ? ).

 Is there any way to read the data back in so as to preserve the sort, or do
 I need to
 zipWithIndex before writing it out, and write the index at that time? (I
 haven't tried the
 latter yet).

 Thanks!
 -Mike


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



  

Converting SparkSQL query to Scala query

2015-03-23 Thread nishitd
I have a complex SparkSQL query of the nature

select a.a, b.b, c.c from a,b,c where a.x = b.x and b.y = c.y

How do I convert this efficiently into a Scala query of

a.join(b,..,..)

and so on. Can anyone help me with this? If my question needs more
clarification, please let me know.
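
To make the question concrete, here is roughly the shape I am after,
assuming the Spark 1.3 DataFrame API (sqlContext and the table names are
placeholders), though I am unsure about the exact join expressions:

    val a = sqlContext.table("a")
    val b = sqlContext.table("b")
    val c = sqlContext.table("c")

    val result = a
      .join(b, a("x") === b("x"))
      .join(c, b("y") === c("y"))
      .select(a("a"), b("b"), c("c"))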



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Converting-SparkSQL-query-to-Scala-query-tp22192.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


