Re: Is there a way to get previous/other keys' state in Spark Streaming?

2014-07-18 Thread Yan Fang
Thank you, TD. This is important information for us. Will keep an eye on
that.

Cheers,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


On Thu, Jul 17, 2014 at 6:54 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Yes, this is the limitation of the current implementation. But this will
 be improved a lot when we have IndexedRDD
 https://github.com/apache/spark/pull/1297 in Spark, which allows
 faster single-value updates to a key-value RDD (within each partition, without
 processing the entire partition).

 Soon.

 TD


 On Thu, Jul 17, 2014 at 5:57 PM, Yan Fang yanfang...@gmail.com wrote:

 Hi TD,

 Thank you. Yes, it behaves as you described. Sorry for missing this
 point.

 Then my only concern is on the performance side - since Spark Streaming
 operates on all the keys every time a new batch comes, I think it is fine
 when the state size is small. When the state size becomes big, say, a few
 GBs, if we still go through the whole key list, would the operation be a
 little inefficient then? Maybe I am missing some points in Spark Streaming
 that account for this situation.

 Cheers,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Thu, Jul 17, 2014 at 1:47 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 The updateFunction given in updateStateByKey should be called on ALL the
 keys that are in the state, even if there is no new data in the batch for some
 key. Is that not the behavior you see?

 What do you mean by show all the existing states? You have access to
 the latest state RDD by doing stateStream.foreachRDD(...). There you can do
 whatever operation on all the key-state pairs.
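
For reference, a minimal sketch of this pattern in Scala (the word-count state and the function and DStream names below are illustrative, not code from the thread):

    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.dstream.DStream

    // Update function: called for every key currently in the state, even when
    // newValues is empty for that key in the current batch.
    def updateCount(newValues: Seq[Int], state: Option[Int]): Option[Int] =
      Some(state.getOrElse(0) + newValues.sum)

    def trackCounts(pairs: DStream[(String, Int)]): Unit = {
      val stateStream = pairs.updateStateByKey(updateCount)

      // The latest state RDD is available on every batch; printing each key-state
      // pair here shows "all the existing states" (collect() only for illustration).
      stateStream.foreachRDD { rdd =>
        rdd.collect().foreach { case (key, count) => println(key + " -> " + count) }
      }
    }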

 TD




 On Thu, Jul 17, 2014 at 11:58 AM, Yan Fang yanfang...@gmail.com wrote:

 Hi TD,

 Thank you for the quick replying and backing my approach. :)

 1) The example is this:

 1. In the first 2-second interval, after updateStateByKey, I get a few
 keys and their states, say, (a -> 1, b -> 2, c -> 3).
 2. In the following 2-second interval, I only receive c and d and
 their values. But I want to update/display the state of a and b
 accordingly.
 * It seems I have no way to access a and b and get their
 states.
 * Also, do I have a way to show all the existing states?

 I guess the approach to solve this will be similar to what you
 mentioned for 2). But the difficulty is that, if I want to display all the
 existing states, I need to bundle all the remaining keys into one key.

 Thank you.

 Cheers,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Thu, Jul 17, 2014 at 11:36 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 For accessing previous version, I would do it the same way. :)

 1. Can you elaborate on what you mean by that with an example? What do
 you mean by accessing keys?

 2. Yeah, that is hard to do without the ability to do point lookups into
 an RDD, which we don't support yet. You could try embedding the related key
 in the values of the keys that need it. That is, B will be present in the
 value of key A. Then put this transformed DStream through
 updateStateByKey.
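
A minimal sketch of that embedding idea (the keys "A" and "B", the lookup-based transform and the Int values are my own illustration, not code from the thread):

    import org.apache.spark.SparkContext._
    import org.apache.spark.streaming.dstream.DStream

    // Copy key B's value from each batch into key A's record, so that A's
    // updateStateByKey function can see B without needing a point lookup.
    def embedRelatedKey(pairs: DStream[(String, Int)]): DStream[(String, (Int, Option[Int]))] =
      pairs.transform { rdd =>
        val bValue = rdd.lookup("B").headOption   // runs a small job per batch on the driver
        rdd.map { case (k, v) => (k, (v, if (k == "A") bValue else None)) }
      }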

 TD








error from DecisionTree Training:

2014-07-18 Thread Jack Yang
Hi All,
I got an error while using DecisionTreeModel (my program is written in Java, 
spark 1.0.1, scala 2.10.1).
I have read a local file, loaded it as RDD, and then sent to decisionTree for 
training. See below for details:

JavaRDD<LabeledPoint> Points = lines.map(new ParsePoint()).cache();
LogisticRegressionModel model =
    LogisticRegressionWithSGD.train(Points.rdd(), iterations, stepSize);  // until here it is working
Strategy strategy = new Strategy( );
DecisionTree decisionTree = new DecisionTree(strategy);
DecisionTreeModel decisionTreeModel = decisionTree.train(Points.rdd());


The error is : java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast 
to [Lorg.apache.spark.mllib.regression.LabeledPoint;

Any thoughts?

Best regards,
Jack



Re: Spark Streaming

2014-07-18 Thread Guangle Fan
Thanks Tathagata! I tried it, and worked out perfectly.


On Thu, Jul 17, 2014 at 6:34 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Step 1: use rdd.mapPartitions(). That's equivalent to the mapper of
 MapReduce. You can open a connection, get all the data and buffer it, close
 the connection, and return an iterator to the buffer.
 Step 2: Make step 1 better by making it reuse connections. You can use
 singletons / static vars to lazily initialize and reuse a pool of
 connections. You will have to take care of concurrency, as multiple tasks
 may be using the database in parallel in the same worker JVM.
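
A minimal sketch of that pattern, assuming a hypothetical JDBC endpoint (the URL, credentials and enrichment logic below are placeholders, not from the thread):

    import java.sql.{Connection, DriverManager}
    import org.apache.spark.rdd.RDD

    // One lazily created connection per executor JVM, shared by all tasks in that JVM.
    object ConnectionHolder {
      lazy val connection: Connection =
        DriverManager.getConnection("jdbc:postgresql://db-host/mydb", "user", "secret")
    }

    // Enrich every record of a partition while reusing the per-JVM connection.
    def enrich(records: RDD[String]): RDD[String] =
      records.mapPartitions { iter =>
        val conn = ConnectionHolder.connection   // initialized once per JVM, then reused
        iter.map { key =>
          // ... query the database with conn for key and merge the result ...
          key
        }
      }

Note that a single java.sql.Connection is generally not thread-safe; a real job would use a proper connection pool and guard concurrent access, as described above.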

 TD


 On Thu, Jul 17, 2014 at 4:41 PM, Guangle Fan fanguan...@gmail.com wrote:

 Hi, All

 When I run spark streaming, in one of the flatMap stage, I want to access
 database.

 Code looks like :

 stream.flatMap(
 new FlatMapFunction {
 call () {
 //access database cluster
 }
   }
 )

 Since I don't want to create a database connection every time call() is
 called, where is the best place to create the connection and reuse it on a
 per-host basis (like one database connection per Mapper/Reducer)?

 Regards,

 Guangle





Re: Large scale ranked recommendation

2014-07-18 Thread Nick Pentreath
It is very true that making predictions in batch for all 1 million users
against the 10k items will be quite onerous in terms of computation. I have
run into this issue too in making batch predictions.

Some ideas:

1. Do you really need to generate recommendations for each user in batch?
How are you serving these recommendations? In most cases, you only need to
make recs when a user is actively interacting with your service or product
etc. Doing it all in batch tends to be a big waste of computation resources.

In our system for example we are serving them in real time (as a user
arrives at a web page, say, our customer hits our API for recs), so we only
generate the rec at that time. You can take a look at Oryx for this (
https://github.com/cloudera/oryx) though it does not yet support Spark, you
may be able to save the model into the correct format in HDFS and have Oryx
read the data.

2. If you do need to make the recs in batch, then I would suggest:
(a) because you have few items, I would collect the item vectors and form a
matrix.
(b) broadcast that matrix
(c) do a mapPartitions on the user vectors. Form a user matrix from the
vectors in each partition (maybe create quite a few partitions to make each
user matrix not too big)
(d) do a value call on the broadcasted item matrix
(e) now for each partition you have the (small) item matrix and a (larger)
user matrix. Do a matrix multiply and you end up with a (U x I) matrix with
the scores for each user in the partition. Because you are using BLAS here,
it will be significantly faster than individually computed dot products
(f) sort the scores for each user and take top K
(g) save or collect and do whatever with the scores

3. in conjunction with (2) you can try throwing more resources at the
problem too

If you access the underlying Breeze vectors (I think the toBreeze method is
private, so you may have to re-implement it), you can do all this using
Breeze (e.g. concatenating vectors to make matrices, iterating and whatnot); a rough
sketch follows below.
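
A rough sketch of steps (a)-(f) above in Scala with Breeze (the function name, the Array[Double] factor representation and the use of argtopk are my own assumptions, not code from the thread):

    import breeze.linalg.{DenseMatrix, argtopk}
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    // userFactors / itemFactors: (id, latent factor array), e.g. from ALS.
    def batchRecommend(sc: SparkContext,
                       userFactors: RDD[(Int, Array[Double])],
                       itemFactors: RDD[(Int, Array[Double])],
                       k: Int): RDD[(Int, Seq[(Int, Double)])] = {
      // (a) + (b): collect the (small) item factors into one matrix and broadcast it.
      val items = itemFactors.collect()
      val rank = items.head._2.length
      val itemMat = new DenseMatrix(rank, items.length, items.flatMap(_._2))  // rank x I
      val bc = sc.broadcast((items.map(_._1), itemMat))

      // (c)-(f): per partition, build a user matrix, multiply, then take the top k per user.
      userFactors.mapPartitions { iter =>
        val (itemIds, im) = bc.value
        val users = iter.toArray
        if (users.isEmpty) Iterator.empty
        else {
          val userMat = new DenseMatrix(rank, users.length, users.flatMap(_._2))  // rank x U
          val scores = userMat.t * im                       // U x I, computed via BLAS gemm
          users.iterator.zipWithIndex.map { case ((userId, _), row) =>
            val userScores = scores(row, ::).t
            val topK: Seq[(Int, Double)] =
              argtopk(userScores, k).map(i => (itemIds(i), userScores(i)))
            (userId, topK)
          }
        }
      }
    }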

Hope that helps

Nick


On Fri, Jul 18, 2014 at 1:17 AM, m3.sharma sharm...@umn.edu wrote:

 Yes, that's what prediction should be doing: taking dot products or applying the sigmoid
 function for each (user, item) pair. For 1 million users and 10K items
 there are 10 billion pairs.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-ranked-recommendation-tp10098p10107.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



data locality

2014-07-18 Thread Haopu Wang
I have a standalone Spark cluster and an HDFS cluster which share some nodes.

 

When reading an HDFS file, how does Spark assign tasks to nodes? Will it ask HDFS for
the location of each file block in order to pick the right worker node?

 

How about a spark cluster on Yarn?

 

Thank you very much!

 



Re: Is there a way to get previous/other keys' state in Spark Streaming?

2014-07-18 Thread Tathagata Das
Good to know! I am bumping the priority of this issue in my head. Thanks
for the feedback. Others seeing this thread, please comment if you think
that this is an important issue for you as well.

Not at my computer right now but I will make a Jira for this.

TD
On Jul 17, 2014 11:22 PM, Yan Fang yanfang...@gmail.com wrote:

 Thank you, TD. This is important information for us. Will keep an eye on
 that.

 Cheers,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Thu, Jul 17, 2014 at 6:54 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Yes, this is the limitation of the current implementation. But this will
 be improved a lot when we have IndexedRDD
 https://github.com/apache/spark/pull/1297 in Spark, which allows
 faster single-value updates to a key-value RDD (within each partition, without
 processing the entire partition).

 Soon.

 TD


 On Thu, Jul 17, 2014 at 5:57 PM, Yan Fang yanfang...@gmail.com wrote:

 Hi TD,

 Thank you. Yes, it behaves as you described. Sorry for missing this
 point.

 Then my only concern is on the performance side - since Spark Streaming
 operates on all the keys every time a new batch comes, I think it is fine
 when the state size is small. When the state size becomes big, say, a few
 GBs, if we still go through the whole key list, would the operation be a
 little inefficient then? Maybe I am missing some points in Spark Streaming
 that account for this situation.

 Cheers,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Thu, Jul 17, 2014 at 1:47 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 The updateFunction given in updateStateByKey should be called on ALL
 the keys that are in the state, even if there is no new data in the batch for
 some key. Is that not the behavior you see?

 What do you mean by show all the existing states? You have access to
 the latest state RDD by doing stateStream.foreachRDD(...). There you can do
 whatever operation on all the key-state pairs.

 TD




 On Thu, Jul 17, 2014 at 11:58 AM, Yan Fang yanfang...@gmail.com
 wrote:

 Hi TD,

 Thank you for the quick replying and backing my approach. :)

 1) The example is this:

  1. In the first 2-second interval, after updateStateByKey, I get a few
  keys and their states, say, (a -> 1, b -> 2, c -> 3).
  2. In the following 2-second interval, I only receive c and d and
  their values. But I want to update/display the state of a and b
  accordingly.
 * It seems I have no way to access the a and b and get their
 states.
 * also, do I have a way to show all the existing states?

 I guess the approach to solve this will be similar to what you
 mentioned for 2). But the difficulty is that, if I want to display all the
 existing states, need to bundle all the rest keys to one key.

 Thank you.

 Cheers,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Thu, Jul 17, 2014 at 11:36 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 For accessing previous version, I would do it the same way. :)

 1. Can you elaborate on what you mean by that with an example? What
 do you mean by accessing keys?

 2. Yeah, that is hard to do without the ability to do point lookups into
 an RDD, which we don't support yet. You could try embedding the related key
 in the values of the keys that need it. That is, B will be present in the
 value of key A. Then put this transformed DStream through
 updateStateByKey.

 TD









Re: data locality

2014-07-18 Thread Sandy Ryza
Hi Haopu,

Spark will ask HDFS for file block locations and try to assign tasks based
on these.

There is a snag.  Spark schedules its tasks inside of executor processes
that stick around for the lifetime of a Spark application.  Spark requests
executors before it runs any jobs, i.e. before it has any information about
where the input data for the jobs is located.  If the executors occupy
significantly fewer nodes than exist in the cluster, it can be difficult
for Spark to achieve data locality.  The workaround for this is an API that
allows passing in a set of preferred locations when instantiating a Spark
context.  This API is currently broken in Spark 1.0, and will likely be
changed to something a little simpler in a future release.

val locData = InputFormatInfo.computePreferredLocations(
  Seq(new InputFormatInfo(conf, classOf[TextInputFormat], new Path("myfile.txt"))))

val sc = new SparkContext(conf, locData)

-Sandy



On Fri, Jul 18, 2014 at 12:35 AM, Haopu Wang hw...@qilinsoft.com wrote:

  I have a standalone spark cluster and a HDFS cluster which share some of
 nodes.



 When reading HDFS file, how does spark assign tasks to nodes? Will it ask
 HDFS the location for each file block in order to get a right worker node?



 How about a spark cluster on Yarn?



 Thank you very much!





Re: Large scale ranked recommendation

2014-07-18 Thread Bertrand Dechoux
And you might want to apply clustering before. It is likely that every user
and every item are not unique.

Bertrand Dechoux


On Fri, Jul 18, 2014 at 9:13 AM, Nick Pentreath nick.pentre...@gmail.com
wrote:

 It is very true that making predictions in batch for all 1 million users
 against the 10k items will be quite onerous in terms of computation. I have
 run into this issue too in making batch predictions.

 Some ideas:

 1. Do you really need to generate recommendations for each user in batch?
 How are you serving these recommendations? In most cases, you only need to
 make recs when a user is actively interacting with your service or product
 etc. Doing it all in batch tends to be a big waste of computation resources.

 In our system for example we are serving them in real time (as a user
 arrives at a web page, say, our customer hits our API for recs), so we only
 generate the rec at that time. You can take a look at Oryx for this (
 https://github.com/cloudera/oryx) though it does not yet support Spark,
 you may be able to save the model into the correct format in HDFS and have
 Oryx read the data.

 2. If you do need to make the recs in batch, then I would suggest:
 (a) because you have few items, I would collect the item vectors and form
 a matrix.
 (b) broadcast that matrix
 (c) do a mapPartitions on the user vectors. Form a user matrix from the
 vectors in each partition (maybe create quite a few partitions to make each
 user matrix not too big)
 (d) do a value call on the broadcasted item matrix
 (e) now for each partition you have the (small) item matrix and a (larger)
 user matrix. Do a matrix multiply and you end up with a (U x I) matrix with
 the scores for each user in the partition. Because you are using BLAS here,
 it will be significantly faster than individually computed dot products
 (f) sort the scores for each user and take top K
 (g) save or collect and do whatever with the scores

 3. in conjunction with (2) you can try throwing more resources at the
 problem too

 If you access the underlying Breeze vectors (I think the toBreeze method
 is private so you may have to re-implement it), you can do all this using
 Breeze (e.g. concatenating vectors to make matrices, iterating and whatnot).

 Hope that helps

 Nick


 On Fri, Jul 18, 2014 at 1:17 AM, m3.sharma sharm...@umn.edu wrote:

 Yes, that's what prediction should be doing: taking dot products or applying the sigmoid
 function for each (user, item) pair. For 1 million users and 10K items
 there are 10 billion pairs.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-ranked-recommendation-tp10098p10107.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Last step of processing is using too much memory.

2014-07-18 Thread Roch Denis
Well, for what it's worth, I found the issue after spending the whole night
running experiments. ;)

Basically, I needed to give a higher number of partitions to the groupByKey.
I was simply using the default, which generated only 4 partitions, and so the
whole thing blew up.
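
For reference, a minimal sketch of passing an explicit partition count to groupByKey (the RDD types and the value 200 are illustrative):

    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD

    // Group with an explicit number of output partitions instead of the default.
    def groupWithMorePartitions(pairs: RDD[(String, Int)]): RDD[(String, Iterable[Int])] =
      pairs.groupByKey(numPartitions = 200)   // roughly 2-3x the total number of cores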




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Last-step-of-processing-is-using-too-much-memory-tp10134p10147.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: data locality

2014-07-18 Thread Haopu Wang
Sandy,

 

Do you mean the "preferred location" mechanism works for a standalone cluster also?
Because I checked the code of SparkContext and saw the comments below:

 

  // This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
  // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
  // contains a map from hostname to a list of input format splits on the host.
  private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()

 

BTW, even with the preferred hosts, how does Spark decide how many total 
executors to use for this application?

 

Thanks again!

 



From: Sandy Ryza [mailto:sandy.r...@cloudera.com] 
Sent: Friday, July 18, 2014 3:44 PM
To: user@spark.apache.org
Subject: Re: data locality

 

Hi Haopu,

 

Spark will ask HDFS for file block locations and try to assign tasks based on 
these.

 

There is a snag.  Spark schedules its tasks inside of executor processes that 
stick around for the lifetime of a Spark application.  Spark requests executors 
before it runs any jobs, i.e. before it has any information about where the 
input data for the jobs is located.  If the executors occupy significantly 
fewer nodes than exist in the cluster, it can be difficult for Spark to achieve 
data locality.  The workaround for this is an API that allows passing in a set 
of preferred locations when instantiating a Spark context.  This API is 
currently broken in Spark 1.0, and will likely be changed to something a little 
simpler in a future release.

 

val locData = InputFormatInfo.computePreferredLocations(
  Seq(new InputFormatInfo(conf, classOf[TextInputFormat], new Path("myfile.txt"))))

 

val sc = new SparkContext(conf, locData)

 

-Sandy

 

 

On Fri, Jul 18, 2014 at 12:35 AM, Haopu Wang hw...@qilinsoft.com wrote:

I have a standalone spark cluster and a HDFS cluster which share some of nodes.

 

When reading HDFS file, how does spark assign tasks to nodes? Will it ask HDFS 
the location for each file block in order to get a right worker node?

 

How about a spark cluster on Yarn?

 

Thank you very much!

 

 



incompatible local class serialVersionUID with spark Shark

2014-07-18 Thread Megane1994
Hello,

I want to run Shark on yarn. 

My environment
Shark-0.9.1.
Spark-1.0.0
hadoop-2.3.0

My first question is: is it possible to run Shark 0.9.1 with
Spark 1.0.0 on YARN? Or do Shark and Spark necessarily have to be the same
version?

For the moment, when I run a request like "show tables" or "show databases"
on Shark, it works well. But when I run any select query (like "select *
from test"), I get this error:

org.apache.spark.SparkException: Job aborted: Task 0.0:0 failed 4 times
(most recent failure: Exception failure: java.io.InvalidClassException:
org.apache.spark.scheduler.Task; local class incompatible: stream classdesc
serialVersionUID = -14123687772669932, local class serialVersionUID =
-6531051710822689816)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
FAILED: Execution Error, return code -101 from shark.execution.SparkTask



Any ideas?
I have looked for a solution all week, but without success. Any suggestions
will be appreciated. Thanks.







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/incompatible-local-class-serialVersionUID-with-spark-Shark-tp10149.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


concurrent jobs

2014-07-18 Thread Haopu Wang
By looking at the code of JobScheduler, I find a parameter of below:

 

  private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)

  private val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs)

 

Does that mean each App can have only one active stage?

 

In my pseudo-code below:

 

 S1 = viewDStream.forEach( collect() )..

 S2 = viewDStream.forEach( collect() )..

 

There should be two "collect()" jobs for each batch interval, right? Are they
running in parallel?
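
For reference, a minimal sketch of raising that setting; it is undocumented, and the value 2 here is purely illustrative:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Allow up to two streaming jobs (e.g. the two collect() outputs) to run concurrently.
    val conf = new SparkConf()
      .setAppName("ConcurrentJobsExample")
      .set("spark.streaming.concurrentJobs", "2")
    val ssc = new StreamingContext(conf, Seconds(2))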

 

Thank you!



Re: GraphX Pregel implementation

2014-07-18 Thread Arun Kumar
Thanks


On Fri, Jul 18, 2014 at 12:22 AM, Ankur Dave ankurd...@gmail.com wrote:

 If your sendMsg function needs to know the incoming messages as well as
 the vertex value, you could define VD to be a tuple of the vertex value and
 the last received message. The vprog function would then store the incoming
 messages into the tuple, allowing sendMsg to access them.

 For example, if the vertex value was a String and the message type was an
 Int, you could call Pregel as follows:

 val graph: Graph[String, _] = ...

 graph.mapVertices((id, attr) => (attr, 0)).pregel(0)(
   (id, attr: (String, Int), msg: Int) => (attr._1, msg),
   edge => Iterator(...), // can use edge.srcAttr._2 and edge.dstAttr._2 to access the messages
   (a: Int, b: Int) => a + b)


 Ankur http://www.ankurdave.com/



Re: ClassNotFoundException: $line11.$read$ when loading an HDFS text file with SparkQL in spark-shell

2014-07-18 Thread Svend
Hi, 

Yes, the error still occurs when we replace the lambdas with named
functions: 



(same error traces as in previous posts)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ClassNotFoundException-line11-read-when-loading-an-HDFS-text-file-with-SparkQL-in-spark-shell-tp9954p10154.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


TreeNodeException: No function to evaluate expression. type: AttributeReference, tree: id#0 on GROUP BY

2014-07-18 Thread Martin Gammelsæter
Hi again!

I am having problems when using GROUP BY on both SQLContext and
HiveContext (same problem).

My code (simplified as much as possible) can be seen here:
http://pastebin.com/33rjW67H

In short, I'm getting data from a Cassandra store with Datastax' new
driver (which works great by the way, recommended!), and mapping it to
a Spark SQL table through a Product class (Dokument in the source).
Regular SELECTs and stuff works fine, but once I try to do a GROUP BY,
I get the following error:

Exception in thread main org.apache.spark.SparkException: Job
aborted due to stage failure: Task 0.0:25 failed 4 times, most recent
failure: Exception failure in TID 63 on host 192.168.121.132:
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: No
function to evaluate expression. type: AttributeReference, tree: id#0

org.apache.spark.sql.catalyst.expressions.AttributeReference.eval(namedExpressions.scala:158)

org.apache.spark.sql.catalyst.expressions.MutableProjection.apply(Projection.scala:64)

org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7$$anon$1.next(Aggregate.scala:195)

org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7$$anon$1.next(Aggregate.scala:174)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)

scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
scala.collection.AbstractIterator.to(Iterator.scala:1157)

scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)

scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:750)
org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:750)

org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1096)

org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1096)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:112)
org.apache.spark.scheduler.Task.run(Task.scala:51)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

What am I doing wrong?

-- 
Best regards,
Martin Gammelsæter


spark sql left join gives KryoException: Buffer overflow

2014-07-18 Thread Pei-Lun Lee
Hi,

We have a query with left joining and got this error:

Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 1.0:0 failed 4 times, most recent failure: Exception failure
in TID 5 on host ip-10-33-132-101.us-west-2.compute.internal:
com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0,
required: 1

Looks like Spark SQL tried to do a broadcast join, collecting one of the
tables to the master, but it is too large.

How do we explicitly control the join behavior like this?

--
Pei-Lun Lee


What is shuffle spill to memory?

2014-07-18 Thread Sébastien Rainville
Hi,

in the Spark UI, one of the metrics is shuffle spill (memory). What is it
exactly? Spilling to disk when the shuffle data doesn't fit in memory I get
it, but what does it mean to spill to memory?

Thanks,

- Sebastien


Re: Error with spark-submit (formatting corrected)

2014-07-18 Thread MEETHU MATHEW
Hi,
Instead of spark://10.1.3.7:7077 use spark://vmsparkwin1:7077  try this

$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
 spark://vmsparkwin1:7077 --executor-memory 1G --total-executor-cores 2
 ./lib/spark-examples-1.0.0-hadoop2.2.0.jar 10

 
Thanks & Regards, 
Meethu M


On Friday, 18 July 2014 7:51 AM, Jay Vyas jayunit100.apa...@gmail.com wrote:
 


I think I know what is happening to you.  I've looked into this a bit just this 
week, so it's fresh in my brain :) hope this helps.


When no workers are known to the master, iirc, you get this message.

I think  this is how it works.

1) You start your master.
2) You start a slave, and give it the master URL as an argument.
3) The slave then binds to a random port.
4) The slave then does a handshake with the master, which you can see in the slave 
logs (it says something like "successfully connected to master at ...").
  Actually, I think the master also logs that it is now aware of a slave running 
on ip:port...

So in your case, I suspect, none of the slaves have connected to the master, so 
the job sits idle.

This is similar to the yarn scenario of submitting a job to a resource manager 
with no node-managers running. 



On Jul 17, 2014, at 6:57 PM, ranjanp piyush_ran...@hotmail.com wrote:

 Hi, 
 I am new to Spark and trying out with a stand-alone, 3-node (1 master, 2
 workers) cluster. 
 
 From the Web UI at the master, I see that the workers are registered. But
 when I try running the SparkPi example from the master node, I get the
 following message and then an exception. 
 
 14/07/17 01:20:36 INFO AppClient$ClientActor: Connecting to master
 spark://10.1.3.7:7077... 
 14/07/17 01:20:46 WARN TaskSchedulerImpl: Initial job has not accepted any
 resources; check your cluster UI to ensure that workers are registered and
 have sufficient memory 
 
 I searched a bit for the above warning, and found and found that others have
 encountered this problem before, but did not see a clear resolution except
 for this link:
 http://apache-spark-user-list.1001560.n3.nabble.com/TaskSchedulerImpl-Initial-job-has-not-accepted-any-resources-check-your-cluster-UI-to-ensure-that-woy-tt8247.html#a8444
 
 Based on the suggestion there I tried supplying --executor-memory option to
 spark-submit but that did not help. 
 
 Any suggestions. Here are the details of my set up. 
 - 3 nodes (each with 4 CPU cores and 7 GB memory) 
 - 1 node configured as Master, and the other two configured as workers 
 - Firewall is disabled on all nodes, and network communication between the
 nodes is not a problem 
 - Edited the conf/spark-env.sh on all nodes to set the following: 
  SPARK_WORKER_CORES=3 
  SPARK_WORKER_MEMORY=5G 
 - The Web UI as well as logs on master show that Workers were able to
 register correctly. Also the Web UI correctly shows the aggregate available
 memory and CPU cores on the workers: 
 
 URL: spark://vmsparkwin1:7077
 Workers: 2
 Cores: 6 Total, 0 Used
 Memory: 10.0 GB Total, 0.0 B Used
 Applications: 0 Running, 0 Completed
 Drivers: 0 Running, 0 Completed
 Status: ALIVE
 
 I try running the SparkPi example first using the run-example (which was
 failing) and later directly using the spark-submit as shown below: 
 
 $ export MASTER=spark://vmsparkwin1:7077
 
 $ echo $MASTER
 spark://vmsparkwin1:7077
 
 azureuser@vmsparkwin1 /cygdrive/c/opt/spark-1.0.0
 $ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
 spark://10.1.3.7:7077 --executor-memory 1G --total-executor-cores 2
 ./lib/spark-examples-1.0.0-hadoop2.2.0.jar 10
 
 
 The following is the full screen output:
 
 14/07/17 01:20:13 INFO SecurityManager: Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties
 14/07/17 01:20:13 INFO SecurityManager: Changing view acls to: azureuser
 14/07/17 01:20:13 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(azureuser)
 14/07/17 01:20:14 INFO Slf4jLogger: Slf4jLogger started
 14/07/17 01:20:14 INFO Remoting: Starting remoting
 14/07/17 01:20:14 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sp...@vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49839]
 14/07/17 01:20:14 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://sp...@vmsparkwin1.cssparkwin.b1.internal.cloudapp.net:49839]
 14/07/17 01:20:14 INFO SparkEnv: Registering MapOutputTracker
 14/07/17 01:20:14 INFO SparkEnv: Registering BlockManagerMaster
 14/07/17 01:20:14 INFO DiskBlockManager: Created local directory at
 C:\cygwin\tmp\spark-local-20140717012014-b606
 14/07/17 01:20:14 INFO MemoryStore: MemoryStore started with capacity 294.9
 MB.
 14/07/17 01:20:14 INFO ConnectionManager: Bound socket to port 49842 with id
 = ConnectionManagerId(vmsparkwin1.cssparkwin.b1.internal.cloudapp.net,49842)
 14/07/17 01:20:14 INFO BlockManagerMaster: Trying to register BlockManager
 14/07/17 01:20:14 INFO BlockManagerInfo: Registering block manager
 

Dividing tasks among Spark workers

2014-07-18 Thread Madhura
I am running my program on a spark cluster but when I look into my UI while
the job is running I see that only one worker does most of the tasks. My
cluster has one master and 4 workers where the master is also a worker.

I want my task to complete as quickly as possible and I believe that if the
number of tasks were to be divided equally among the workers, the job will
be completed faster.

Is there any way I can customize the number of jobs on each worker?
http://apache-spark-user-list.1001560.n3.nabble.com/file/n10160/Question.png 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Dividing-tasks-among-Spark-workers-tp10160.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: spark1.0.1 spark sql error java.lang.NoClassDefFoundError: Could not initialize class $line11.$read$

2014-07-18 Thread Victor Sheng
Hi,Svend
  Your reply is very helpful to me. I'll keep an eye on that ticket.
  And also... Cheers  :)
Best Regards,
Victor



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark1-0-1-spark-sql-error-java-lang-NoClassDefFoundError-Could-not-initialize-class-line11-read-tp10135p10162.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: how to pass extra Java opts to workers for spark streaming jobs

2014-07-18 Thread Chen Song
Thanks Andrew, I tried and it works.


On Fri, Jul 18, 2014 at 12:53 AM, Andrew Or and...@databricks.com wrote:

 You will need to include that in the SPARK_JAVA_OPTS environment variable,
 so add the following line to spark-env.sh:

 export SPARK_JAVA_OPTS="-XX:+UseConcMarkSweepGC"

 This should propagate to the executors. (Though you should double check,
 since 0.9 is a little old and I could be forgetting something) If you wish
 to add spark options in addition to this, simply append them to the
 environment variable:

 export SPARK_JAVA_OPTS="-XX:+UseConcMarkSweepGC -Dspark.config.one=value -Dspark.config.two=value"

 (Please note that this is only for Spark 0.9. The part where we set Spark
 options within SPARK_JAVA_OPTS is deprecated as of 1.0)


 2014-07-17 21:08 GMT-07:00 Chen Song chen.song...@gmail.com:

 Thanks Andrew.

 Say that I want to turn on CMS gc for each worker.

 All I need to do is add the following line to conf/spark-env.sh on node
 where I submit the application.

 -XX:+UseConcMarkSweepGC

 Is that correct?

 Will this option be populated to each worker in yarn?



 On Thu, Jul 17, 2014 at 9:26 PM, Andrew Or and...@databricks.com wrote:

 Hi Chen,

 spark.executor.extraJavaOptions is introduced in Spark 1.0, not in Spark
 0.9. You need to

 export SPARK_JAVA_OPTS="-Dspark.config1=value1 -Dspark.config2=value2"

 in conf/spark-env.sh.

 Let me know if that works.
 Andrew


 2014-07-17 18:15 GMT-07:00 Tathagata Das tathagata.das1...@gmail.com:

 Can you check in the environment tab of Spark web ui to see whether this
 configuration parameter is in effect?

 TD


 On Thu, Jul 17, 2014 at 2:05 PM, Chen Song chen.song...@gmail.com
 wrote:

 I am using spark 0.9.0 and I am able to submit job to YARN,
 https://spark.apache.org/docs/0.9.0/running-on-yarn.html.

 I am trying to turn on gc logging on executors but could not find a
 way to set extra Java opts for workers.

 I tried to set spark.executor.extraJavaOptions but that did not work.

 Any idea on how I should do this?

 --
 Chen Song






 --
 Chen Song





-- 
Chen Song


Re: spark streaming rate limiting from kafka

2014-07-18 Thread Chen Song
Thanks Tathagata,

That would be awesome if Spark streaming can support receiving rate in
general. I tried to explore the link you provided but could not find any
specific JIRA related to this? Do you have the JIRA number for this?



On Thu, Jul 17, 2014 at 9:21 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 You can create multiple Kafka streams to partition your topics across them,
 which will run multiple receivers across multiple executors. This is covered in
 the Spark Streaming guide.
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

 And for the purpose of this thread, to answer the original question, we now
 have the ability
 https://issues.apache.org/jira/browse/SPARK-1854?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20Streaming%20ORDER%20BY%20priority%20DESC
 to limit the receiving rate. It's in the master branch, and will be
 available in Spark 1.1. It basically sets the limit at the receiver level
 (so it applies to all sources) on the max records per second that
 will be received by the receiver.
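
For reference, a minimal sketch of enabling that limit once it is available; the property name spark.streaming.receiver.maxRate and the value 1000 are my assumptions, not taken from the thread:

    import org.apache.spark.SparkConf

    // Cap each receiver (Kafka or otherwise) at roughly 1000 records per second.
    val conf = new SparkConf()
      .setAppName("RateLimitedStreaming")
      .set("spark.streaming.receiver.maxRate", "1000")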

 TD


 On Thu, Jul 17, 2014 at 6:15 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Bill,

 are you saying, after repartition(400), you have 400 partitions on one
 host and the other hosts receive nothing of the data?

 Tobias


 On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 I also have an issue consuming from Kafka. When I consume from Kafka,
 there is always a single executor working on this job. Even if I use
 repartition, it seems that there is still a single executor. Does anyone
 have an idea how to add parallelism to this job?



 On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com
 wrote:

 Thanks Luis and Tobias.


 On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Hi,

 On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com
 wrote:

 * Is there a way to control how far Kafka Dstream can read on
 topic-partition (via offset for example). By setting this to a small
 number, it will force DStream to read less data initially.


 Please see the post at

 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E
 Kafka's auto.offset.reset parameter may be what you are looking for.

 Tobias




 --
 Chen Song







-- 
Chen Song


Re: Distribute data from Kafka evenly on cluster

2014-07-18 Thread Chen Song
Speaking of this, I have another related question.

In my Spark Streaming job, I set up multiple consumers to receive data from
Kafka, with each worker reading from one partition.

Initially, Spark is intelligent enough to associate each worker to each
partition, to make data consumption distributed. After running for a while,
consumers rebalance themselves and some workers start reading partitions
that were previously assigned to others. This leads to a situation where some workers read
from multiple partitions and some don't read at all. Because of the data
volume, this causes heap pressure on some workers.

Any thoughts on why rebalance is triggered and how to monitor to avoid that?




On Fri, Jul 4, 2014 at 11:11 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 unfortunately, when I go the above approach, I run into this problem:

 http://mail-archives.apache.org/mod_mbox/kafka-users/201401.mbox/%3ccabtfevyxvtaqvnmvwmh7yscfgxpw5kmrnw_gnq72cy4oa1b...@mail.gmail.com%3E
 That is, a NoNode error in Zookeeper when rebalancing. The Kafka receiver
 will retry again and again, but will eventually fail, leading to
 unprocessed data and, worse, the task never terminating. There is nothing
 exotic about my setup; one Zookeeper node, one Kafka broker, so I am
 wondering if other people have seen this error before and, more important,
 how to fix it. When I don't use the approach of multiple kafkaStreams, I
 don't get this error, but also work is never distributed in my cluster...

 Thanks
 Tobias


 On Thu, Jul 3, 2014 at 11:58 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 Thank you very much for the link, that was very helpful!

 So, apparently the `topics: Map[String, Int]` parameter controls the
 number of partitions that the data is initially added to; the number N in

   val kafkaInputs = (1 to N).map { _ =>
     ssc.kafkaStream(zkQuorum, groupId, Map(topic -> 1))
   }
   val union = ssc.union(kafkaInputs)

 controls how many connections are made to Kafka. Note that the number of
 Kafka partitions for that topic must be at least N for this to work.

 Thanks
 Tobias





-- 
Chen Song


Re: Dividing tasks among Spark workers

2014-07-18 Thread Shannon Quinn

The default # of partitions is the # of cores, correct?

On 7/18/14, 10:53 AM, Yanbo Liang wrote:

check how many partitions in your program.
If only one, change it to more partitions will make the execution 
parallel.



2014-07-18 20:57 GMT+08:00 Madhura das.madhur...@gmail.com:


I am running my program on a spark cluster but when I look into my
UI while
the job is running I see that only one worker does most of the
tasks. My
cluster has one master and 4 workers where the master is also a
worker.

I want my task to complete as quickly as possible and I believe
that if the
number of tasks were to be divided equally among the workers, the
job will
be completed faster.

Is there any way I can customize the number of jobs on each worker?

http://apache-spark-user-list.1001560.n3.nabble.com/file/n10160/Question.png



--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/Dividing-tasks-among-Spark-workers-tp10160.html
Sent from the Apache Spark User List mailing list archive at
Nabble.com.






Re: Distribute data from Kafka evenly on cluster

2014-07-18 Thread Tobias Pfeiffer
Hi,

as far as I know, rebalance is triggered from Kafka in order to distribute
partitions evenly. That is, to achieve the opposite of what you are seeing.
I think it would be interesting to check the Kafka logs for the result of
the rebalance operation and why you see what you are seeing. I know that in
the client logs it says which partitions of a topic were assigned to this
particular consumer, maybe you can have a look.

Tobias


On Fri, Jul 18, 2014 at 11:42 PM, Chen Song chen.song...@gmail.com wrote:

 Speaking of this, I have another related question.

 In my spark streaming job, I set up multiple consumers to receive data
 from Kafka, with each worker from one partition.

 Initially, Spark is intelligent enough to associate each worker to each
 partition, to make data consumption distributed. After running for a while,
 consumers rebalance themselves and some workers start reading partitions
 which were with others. This leads to a situation that some worker read
 from multiple partitions and some don't read at all. Because of data
 volume, this causes heap pressure on some workers.

 Any thoughts on why rebalance is triggered and how to monitor to avoid
 that?




 On Fri, Jul 4, 2014 at 11:11 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 unfortunately, when I go the above approach, I run into this problem:

 http://mail-archives.apache.org/mod_mbox/kafka-users/201401.mbox/%3ccabtfevyxvtaqvnmvwmh7yscfgxpw5kmrnw_gnq72cy4oa1b...@mail.gmail.com%3E
 That is, a NoNode error in Zookeeper when rebalancing. The Kafka
 receiver will retry again and again, but will eventually fail, leading to
 unprocessed data and, worse, the task never terminating. There is nothing
 exotic about my setup; one Zookeeper node, one Kafka broker, so I am
 wondering if other people have seen this error before and, more important,
 how to fix it. When I don't use the approach of multiple kafkaStreams, I
 don't get this error, but also work is never distributed in my cluster...

 Thanks
 Tobias


 On Thu, Jul 3, 2014 at 11:58 AM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Thank you very much for the link, that was very helpful!

 So, apparently the `topics: Map[String, Int]` parameter controls the
 number of partitions that the data is initially added to; the number N in

    val kafkaInputs = (1 to N).map { _ =>
      ssc.kafkaStream(zkQuorum, groupId, Map(topic -> 1))
    }
    val union = ssc.union(kafkaInputs)

 controls how many connections are made to Kafka. Note that the number of
 Kafka partitions for that topic must be at least N for this to work.

 Thanks
 Tobias





 --
 Chen Song




Re: Dividing tasks among Spark workers

2014-07-18 Thread Yanbo Liang
Clusters will not be fully utilized unless you set the level of parallelism
for each operation high enough. Spark automatically sets the number of
"map" tasks to run on each file according to its size. You can pass the
level of parallelism as a second argument or set the config property
*spark.default.parallelism* to change the default. In general, we recommend
2-3 tasks per CPU core in your cluster.
For example, the following code can set the partition number of data to 10
and it will be executed parallel:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data, 10)




2014-07-18 23:00 GMT+08:00 Shannon Quinn squ...@gatech.edu:

  The default # of partitions is the # of cores, correct?


 On 7/18/14, 10:53 AM, Yanbo Liang wrote:

 check how many partitions in your program.
 If only one, change it to more partitions will make the execution
 parallel.


 2014-07-18 20:57 GMT+08:00 Madhura das.madhur...@gmail.com:

 I am running my program on a spark cluster but when I look into my UI
 while
 the job is running I see that only one worker does most of the tasks. My
 cluster has one master and 4 workers where the master is also a worker.

 I want my task to complete as quickly as possible and I believe that if
 the
 number of tasks were to be divided equally among the workers, the job will
 be completed faster.

 Is there any way I can customize the number of jobs on each worker?
 
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n10160/Question.png
 



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Dividing-tasks-among-Spark-workers-tp10160.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: Spark Streaming Json file groupby function

2014-07-18 Thread srinivas
Hi 

 I am able to save to a local file the RDDs generated by Spark SQL from the
data arriving via Spark Streaming. If I set the streaming context to 10 sec,
only the data coming in that 10-sec window is processed by my SQL and stored
in the location I specified, and for the next set of data (the next batch) it
errors out saying the save folder already exists. So I increased my streaming
batch duration to 100 sec; with this, the data that comes in the 100-sec window
is processed at once, but the output is written as several files in that folder,
like 10 different files (part-0001, part-2, ...), each having one or two records.
I want to save those records to a single file instead.
Please let me know if there is any workaround for this.

the code that i am using

case class Record(ID: String, name: String, score: Int, school: String)
case class OutPut(name: String, score: String)
object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    //StreamingExamples.setStreamingLogLevels()
    val datenow = new Date()
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(100))
    val sqlContext = new SQLContext(sc)
    val timer = Time(10)
    ssc.remember(Seconds(100))
    //val timenow = new java.util.Date
    import sqlContext._
    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    val jsonf = lines.map(JSON.parseFull(_))
      .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
    val fields = jsonf.map(data => Record(data("ID").toString, data("name").toString,
      data("score").toString.toInt, data("school").toString))
    fields.print()
    //fields.saveAsTextFile("/home/ubuntu/spark-1.0.0/external/jsonfile2/" + timenow)
    val results = fields.foreachRDD((recrdd, timer) => {
      recrdd.registerAsTable("table1")
      val sqlreport = sqlContext.sql(
        "select max(score) from table1 where ID = 'math' and score > 50")
      sqlreport.map(t => OutPut(t(0).toString, t(1).toString)).collect().foreach(println)
      //println(sqlreport)
      //sqlreport.foreach(println)
      sqlreport.saveAsTextFile("/home/ubuntu/spark-1.0.0/external/jsonfile2/" + datenow)
    })
    //results.print()
    ssc.start()
    ssc.awaitTermination()
  }
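
On the single-file question, one possible workaround (my suggestion, not from the thread) is to collapse each batch's output to one partition before saving:

    import org.apache.spark.rdd.RDD

    // Write the per-batch report as a single part file by reducing it to one partition.
    // Fine for small per-batch results; avoid for large outputs.
    def saveAsSingleFile(report: RDD[String], path: String): Unit =
      report.coalesce(1).saveAsTextFile(path)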
Thanks,
-Srinivas



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p10170.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Python: saving/reloading RDD

2014-07-18 Thread Roch Denis
Hello,

Just to make sure I correctly read the doc and the forums. It's my
understanding that currently in python with Spark 1.0.1 there is no way to
save my RDD to disk that I can just reload. The hadoop RDD are not yet
present in Python.

Is that correct? I just want to make sure that's the case before I write a
workaround.

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Python-saving-reloading-RDD-tp10172.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Large scale ranked recommendation

2014-07-18 Thread Xiangrui Meng
Nick's suggestion is a good approach for your data. The item factors to 
broadcast should be a few MBs. -Xiangrui

 On Jul 18, 2014, at 12:59 AM, Bertrand Dechoux decho...@gmail.com wrote:
 
 And you might want to apply clustering before. It is likely that every user 
 and every item are not unique.
 
 Bertrand Dechoux
 
 
 On Fri, Jul 18, 2014 at 9:13 AM, Nick Pentreath nick.pentre...@gmail.com 
 wrote:
 It is very true that making predictions in batch for all 1 million users 
 against the 10k items will be quite onerous in terms of computation. I have 
 run into this issue too in making batch predictions.
 
 Some ideas:
 
 1. Do you really need to generate recommendations for each user in batch? 
 How are you serving these recommendations? In most cases, you only need to 
 make recs when a user is actively interacting with your service or product 
 etc. Doing it all in batch tends to be a big waste of computation resources.
 
 In our system for example we are serving them in real time (as a user 
 arrives at a web page, say, our customer hits our API for recs), so we only 
 generate the rec at that time. You can take a look at Oryx for this 
 (https://github.com/cloudera/oryx) though it does not yet support Spark, you 
 may be able to save the model into the correct format in HDFS and have Oryx 
 read the data.
 
 2. If you do need to make the recs in batch, then I would suggest:
 (a) because you have few items, I would collect the item vectors and form a 
 matrix.
 (b) broadcast that matrix
 (c) do a mapPartitions on the user vectors. Form a user matrix from the 
 vectors in each partition (maybe create quite a few partitions to make each 
 user matrix not too big)
 (d) do a value call on the broadcasted item matrix
 (e) now for each partition you have the (small) item matrix and a (larger) 
 user matrix. Do a matrix multiply and you end up with a (U x I) matrix with 
 the scores for each user in the partition. Because you are using BLAS here, 
 it will be significantly faster than individually computed dot products
 (f) sort the scores for each user and take top K
 (g) save or collect and do whatever with the scores
 
 3. in conjunction with (2) you can try throwing more resources at the 
 problem too
 
 If you access the underlying Breeze vectors (I think the toBreeze method is 
 private so you may have to re-implement it), you can do all this using 
 Breeze (e.g. concatenating vectors to make matrices, iterating and whatnot).
 
 Hope that helps
 
 Nick
 
 
 On Fri, Jul 18, 2014 at 1:17 AM, m3.sharma sharm...@umn.edu wrote:
 Yes, that's what prediction should be doing, taking dot products or sigmoid
 function for each user,item pair. For 1 million users and 10 K items data
 there are 10 billion pairs.
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-ranked-recommendation-tp10098p10107.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 


Re: Python: saving/reloading RDD

2014-07-18 Thread Xiangrui Meng
You can save RDDs to text files using RDD.saveAsTextFile and load it back using 
sc.textFile. But make sure the record to string conversion is correctly 
implemented if the type is not primitive and you have the parser to load them 
back. -Xiangrui

 On Jul 18, 2014, at 8:39 AM, Roch Denis rde...@exostatic.com wrote:
 
 Hello,
 
 Just to make sure I correctly read the doc and the forums. It's my
 understanding that currently in python with Spark 1.0.1 there is no way to
 save my RDD to disk that I can just reload. The hadoop RDD are not yet
 present in Python.
 
 Is that correct? I just want to make sure that's the case before I write a
 workaround.
 
 Thanks!
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Python-saving-reloading-RDD-tp10172.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Python: saving/reloading RDD

2014-07-18 Thread Shannon Quinn
+1, had to learn this the hard way when some of my objects were written 
as pointers, rather than translated correctly to strings :)


On 7/18/14, 11:52 AM, Xiangrui Meng wrote:

You can save RDDs to text files using RDD.saveAsTextFile and load it back using 
sc.textFile. But make sure the record to string conversion is correctly 
implemented if the type is not primitive and you have the parser to load them 
back. -Xiangrui


On Jul 18, 2014, at 8:39 AM, Roch Denis rde...@exostatic.com wrote:

Hello,

Just to make sure I correctly read the doc and the forums. It's my
understanding that currently in python with Spark 1.0.1 there is no way to
save my RDD to disk that I can just reload. The hadoop RDD are not yet
present in Python.

Is that correct? I just want to make sure that's the case before I write a
workaround.

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Python-saving-reloading-RDD-tp10172.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: the default GraphX graph-partition strategy on multicore machine?

2014-07-18 Thread Yifan LI
Hi Ankur,

Thanks so much! :))

Yes, is possible to defining a custom partition strategy?

And, some other questions:
(2*4 cores machine, 24GB memory)

- if I load one edges file (5 GB) without any cores/partitions setting, what is
the default partitioning during graph construction? And how many cores will be used?
Or, what if the size of the file is 50 GB (more than the available memory), again without a
partition setting?

- because "each vertex must be replicated to all partitions where it is
referenced":
I don't understand. For instance, if we have 3 edge partition tables (EA: a -> b,
a -> c; EB: a -> d, a -> e; EC: d -> c) and 2 vertex partition tables (VA: a, b, c;
VB: d, e), will the whole vertex table VA be replicated to all these 3 edge
partitions, since each of them refers to some vertices in VA?

- "there is no shared-memory parallelism":
You mean that each core is restricted to accessing only its own partition in memory?
How do they communicate when the required data (edges?) is in another partition?



On Jul 15, 2014, at 9:30 PM, Ankur Dave ankurd...@gmail.com wrote:

 On Jul 15, 2014, at 12:06 PM, Yifan LI iamyifa...@gmail.com wrote: 
 Btw, is there any possibility to customise the partition strategy as we 
 expect?
 
 I'm not sure I understand. Are you asking about defining a custom partition 
 strategy?
 
 On Tue, Jul 15, 2014 at 6:20 AM, Yifan LI iamyifa...@gmail.com wrote:
 when I load the file using sc.textFile (minPartitions = 16, 
 PartitionStrategy.RandomVertexCut)
 
 The easiest way to load the edge file would actually be to use 
 GraphLoader.edgeListFile(sc, path, minEdgePartitions = 
 16).partitionBy(PartitionStrategy.RandomVertexCut).
 
 1) how much data will be loaded into memory?
 
 The exact size of the graph (vertices + edges) in memory depends on the 
 graph's structure, the partition function, and the average vertex degree, 
 because each vertex must be replicated to all partitions where it is 
 referenced.
 
 It's easier to estimate the size of just the edges, which I did on the 
 mailing list a while ago. To summarize, during graph construction each edge 
 takes 60 bytes, and once the graph is constructed it takes 20 bytes.
 
 2) how many partitions will be stored in memory?
 
 Once you call cache() on the graph, all 16 partitions will be stored in 
 memory. You can also tell GraphX to spill them to disk in case of memory 
 pressure by passing edgeStorageLevel=StorageLevel.MEMORY_AND_DISK to 
 GraphLoader.edgeListFile.
 
 3) If the thread/task on each core will read only one edge from memory and 
 then compute it at every time?
 
 Yes, each task is single-threaded, so it will read the edges in its partition 
 sequentially.
 
 3.1) which edge on memory was determined to read into cache?
 
 In theory, each core should independently read the edges in its partition 
 into its own cache. Cache issues should be much less of a concern than in 
 most applications because different tasks (cores) operate on independent 
 partitions; there is no shared-memory parallelism. The tradeoff is heavier 
 reliance on shuffles.
 
 3.2) how are those partitions being scheduled?
 
 Spark handles the scheduling. There are details in Section 5.1 of the Spark 
 NSDI paper; in short, tasks are scheduled to run on the same machine as their 
 data partition, and by default each machine can accept at most one task per 
 core.
 
 Ankur
 



Re: Python: saving/reloading RDD

2014-07-18 Thread Roch Denis
Yeah, but I would still have to do a map pass with an ast.literal_eval() for
each line, correct?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Python-saving-reloading-RDD-tp10172p10179.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: What is shuffle spill to memory?

2014-07-18 Thread Andrew Or
Shuffle spill (memory) is the size of the deserialized form of the data in
memory at the time when we spill it, whereas shuffle spill (disk) is the
size of the serialized form of the data on disk after we spill it. This is
why the latter tends to be much smaller than the former. Note that both
metrics are aggregated over the entire duration of the task (i.e. within
each task you can spill multiple times).

Andrew


2014-07-18 4:09 GMT-07:00 Sébastien Rainville sebastienrainvi...@gmail.com
:

 Hi,

 in the Spark UI, one of the metrics is shuffle spill (memory). What is
 it exactly? Spilling to disk when the shuffle data doesn't fit in memory I
 get it, but what does it mean to spill to memory?

 Thanks,

 - Sebastien




Re: Need help on Spark UDF (Join) Performance tuning .

2014-07-18 Thread S Malligarjunan
Hello Experts,

I would highly appreciate your input. Please suggest or give me a hint as to what the 
issue could be here.

 
Thanks and Regards,
Malligarjunan S.  



On Thursday, 17 July 2014, 22:47, S Malligarjunan smalligarju...@yahoo.com 
wrote:
 


Hello Experts,

I am facing performance problem when I use the UDF function call. Please help 
me to tune the query.
Please find the details below

shark> select count(*) from table1;
OK
151096
Time taken: 7.242 seconds
shark> select count(*) from table2;
OK
938
Time taken: 1.273 seconds

Without UDF:
shark> SELECT
 
 count(pvc1.time)
    FROM table2 pvc2 JOIN table1 pvc1
    WHERE pvc1.col1 = pvc2.col2
    AND  unix_timestamp(pvc2.time, '-MM-dd HH:mm:ss,SSS')  
unix_timestamp(pvc1.time, '-MM-dd HH:mm:ss,SSS');
OK
328
Time taken: 200.487 seconds


shark> 
  SELECT
  count(pvc1.time)
        FROM table2 pvc2 JOIN table1 pvc1
    WHERE (pvc1.col1 = pvc2.col1 OR pvc1.col1 = pvc2.col2)
    AND  unix_timestamp(pvc2.time, '-MM-dd HH:mm:ss,SSS')  
unix_timestamp(pvc1.time, '-MM-dd HH:mm:ss,SSS');
OK
331
Time taken: 292.86 seconds

With UDF:
shark> 
   SELECT
  count(pvc1.time)
    FROM table2 pvc2 JOIN table1 pvc1
    WHERE testCompare(pvc1.col1, pvc1.col2, pvc2.col1,pvc2.col2)
   
 AND  unix_timestamp(pvc2.time, '-MM-dd HH:mm:ss,SSS')  
unix_timestamp(pvc1.time, '-MM-dd HH:mm:ss,SSS');

OK
331
Time taken: 3718.23 seconds

The above UDF query takes more time to run. 


Where testCompare is an udf function, The function just does the pvc1.col1 = 
pvc2.col1 OR pvc1.col1 = pvc2.col2

Please let me know what is the issue here?

 
Thanks and Regards,
Sankar S.  

Re: Spark Streaming timestamps

2014-07-18 Thread Bill Jay
Hi Tathagata,




On Thu, Jul 17, 2014 at 6:12 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 The RDD parameter in foreachRDD contains raw/transformed data from the
 last batch. So when forearchRDD is called with the time parameter as 5:02:01
 and batch size is 1 minute, then the rdd will contain data based on the
 data received by between 5:02:00 and 5:02:01.

Do you mean the data between 5:02:02 and 5:02:01? The time parameter is
5:02:01. Moreover, when the program is running, it is very difficult to
specify a starting time because sometimes it is difficult to know when the
program executes that line. And do we need a different time parameter for
each foreachRDD, or will Spark calculate the next one according to the batch interval?
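
(For reference, a rough sketch of the buffering approach TD suggests below, using the
foreachRDD overload that takes a Time parameter and a 1-second batch interval. The stream,
the isEndOfCustomInterval predicate, and the process function are hypothetical names, not
part of this thread.)

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

val buffer = new ArrayBuffer[RDD[(String, Long)]]()

stream.foreachRDD { (rdd: RDD[(String, Long)], time: Time) =>
  buffer += rdd                              // collect the 1-second batches
  if (isEndOfCustomInterval(time)) {         // e.g. time.milliseconds % 60000 == 29000
    val combined = rdd.sparkContext.union(buffer)
    process(combined, time)                  // handle this custom interval's data
    buffer.clear()
  }
}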


 If you want to do custom intervals, then I suggest the following
 1. Do 1 second batch intervals
 2. Then in the foreachRDD, from  5:02:30 to 5:03:28, put all the RDDs in
 a ArrayBuffer/ListBuffer
 3. At 5:03:29, add the RDD to the buffer, and do a union of all the
 buffered RDDs, and process them.

 So in foreachRDD, based on the time, buffer the RDDs, until you reach the
 appropriate time. Then union all the buffered RDDs and process them.

 TD


 On Thu, Jul 17, 2014 at 2:05 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 Thanks for your answer. Please see my further question below:


 On Wed, Jul 16, 2014 at 6:57 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Answers inline.


 On Wed, Jul 16, 2014 at 5:39 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am currently using Spark Streaming to conduct a real-time data
 analytics. We receive data from Kafka. We want to generate output files
 that contain results that are based on the data we receive from a specific
 time interval.

 I have several questions on Spark Streaming's timestamp:

 1) If I use saveAsTextFiles, it seems Spark streaming will generate
 files in complete minutes, such as 5:00:01, 5:00:01 (converted from Unix
 time), etc. Does this mean the results are based on the data from 5:00:01
 to 5:00:02, 5:00:02 to 5:00:03, etc. Or the time stamps just mean the time
 the files are generated?

 File named  5:00:01 contains results from data received between
  5:00:00 and  5:00:01 (based on system time of the cluster).



 2) If I do not use saveAsTextFiles, how do I get the exact time
 interval of the RDD when I use foreachRDD to do custom output of the
 results?

 There is a version of foreachRDD which allows you specify the function
 that takes in Time object.


 3) How can we specify the starting time of the batches?


 What do you mean? Batches are timed based on the system time of the
 cluster.

 I would like to control the starting time and ending time of each batch.
 For example, if I use saveAsTextFiles as output method and the batch size
 is 1 minute, Spark will align time intervals to complete minutes, such as
 5:01:00, 5:02:00, 5:03:00. It will have not results that are 5:01:03,
 5:02:03, 5:03:03, etc. My goal is to generate output for a customized
 interval such as from 5:01:30 to 5:02:29, 5:02:30 to 5:03:29, etc.

 I checked the api of foreachRDD with time parameter. It seems there is
 not explanation on what does that parameter mean. Does it mean the starting
 time of the first batch?




 Thanks!

 Bill







Re: Large scale ranked recommendation

2014-07-18 Thread m3.sharma
Thanks Nick, the real-time suggestion is good; we will see if we can add that to our
deployment strategy, and you are correct that we may not need recommendations for
each user.

Will try adding more resources and your suggestion of broadcasting the item features,
as currently they don't seem to be huge.

As both users and items will continue to grow in the future, I think a few GPU nodes
will suffice to serve faster recommendations after learning the model with Spark. It
would be great to have built-in GPU support in Spark, to leverage the GPU capability
of nodes for performing these flops faster.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-ranked-recommendation-tp10098p10183.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: MLLib - Regularized logistic regression in python

2014-07-18 Thread fjeg
Thanks for all your helpful replies.

Best,
Francisco



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/MLLib-Regularized-logistic-regression-in-python-tp9780p10184.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Large scale ranked recommendation

2014-07-18 Thread Nick Pentreath
Agree GPUs may be interesting for this kind of massively parallel linear
algebra on reasonable size vectors.

These projects might be of interest in this regard:
https://github.com/BIDData/BIDMach
https://github.com/BIDData/BIDMat
https://github.com/dlwh/gust

Nick



On Fri, Jul 18, 2014 at 7:40 PM, m3.sharma sharm...@umn.edu wrote:

 Thanks Nick real-time suggestion is good, will see if we can add that to
 our
 deployment strategy and you are correct we may not need recommendation for
 each user.

 Will try adding more resources and broadcasting item features suggestion as
 currently they don't seem to be huge.

 As users and items both will continue to grow in future for faster vector
 computations I think few GPU nodes will suffice to serve faster
 recommendation after learning model with SPARK. It will be great to have
 builtin GPU support in SPARK for faster computations to leverage GPU
 capability of nodes for performing these flops faster.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-ranked-recommendation-tp10098p10183.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Decision tree classifier in MLlib

2014-07-18 Thread Joseph Bradley
Hi Sudha,
Have you checked if the labels are being loaded correctly?  It sounds like
the DT algorithm can't find any useful splits to make, so maybe it thinks
they are all the same?  Some data loading functions threshold labels to
make them binary.
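
(One quick way to check, assuming the training data was loaded as an RDD[LabeledPoint]
named points; the variable name is illustrative.)

val labelCounts = points.map(_.label).countByValue()
println(labelCounts)  // for a binary problem this should show 0.0 and 1.0 with the expected counts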
Hope it helps,
Joseph


On Fri, Jul 11, 2014 at 2:25 PM, SK skrishna...@gmail.com wrote:

 Hi,

 I have a small dataset (120 training points, 30 test points) that I am
 trying to classify into binary classes (1 or 0). The dataset has 4
 numerical
 features and 1 binary label (1 or 0).

 I used LogisticRegression and SVM in MLLib and I got 100% accuracy in both
 cases. But when I used DecisionTree, I am getting only 33% accuracy
 (basically all the predicted test labels are 1 whereas actually only 10 out
 of the 30 should be 1). I tried modifying the different parameters
 (maxDepth, bins, impurity etc) and still am able to get only 33% accuracy.

 I used the same dataset with R's decision tree  (rpart) and I am getting
 100% accuracy. I would like to understand why the performance of MLLib's
 decision tree model is poor  and if there is some way I can improve it.

 thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Decision-tree-classifier-in-MLlib-tp9457.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Job aborted due to stage failure: TID x failed for unknown reasons

2014-07-18 Thread Shannon Quinn

Hi all,

I'm dealing with some strange error messages that I *think* come down 
to a memory issue, but I'm having a hard time pinning it down and could 
use some guidance from the experts.


I have a 2-machine Spark (1.0.1) cluster. Both machines have 8 cores; 
one has 16GB memory, the other 32GB (which is the master). My 
application involves computing pairwise pixel affinities in images, 
though the images I've tested so far only get as big as 1920x1200, and 
as small as 16x16.


I did have to change a few memory and parallelism settings, otherwise I 
was getting explicit OutOfMemoryExceptions. In spark-default.conf:


spark.executor.memory        14g
spark.default.parallelism    32
spark.akka.frameSize         1000

In spark-env.sh:

SPARK_DRIVER_MEMORY=10G

With those settings, however, I get a bunch of WARN statements about 
Lost TIDs (no task is successfully completed) in addition to lost 
Executors, which are repeated 4 times until I finally get the following 
error message and crash:


---

14/07/18 12:06:20 INFO TaskSchedulerImpl: Cancelling stage 0
14/07/18 12:06:20 INFO DAGScheduler: Failed to run collect at 
/home/user/Programming/PySpark-Affinities/affinity.py:243

Traceback (most recent call last):
  File /home/user/Programming/PySpark-Affinities/affinity.py, line 
243, in module

lambda x: np.abs(IMAGE.value[x[0]] - IMAGE.value[x[1]])
  File 
/net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/pyspark/rdd.py, 
line 583, in collect

bytesInJava = self._jrdd.collect().iterator()
  File 
/net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py, 
line 537, in __call__
  File 
/net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py, 
line 300, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling o27.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 0.0:13 failed 4 times, most recent failure: *TID 32 on host 
master.host.univ.edu failed for unknown reason*

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)

at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


14/07/18 12:06:20 INFO DAGScheduler: Executor lost: 4 (epoch 4)
14/07/18 12:06:20 INFO BlockManagerMasterActor: Trying to remove 
executor 4 from BlockManagerMaster.
14/07/18 12:06:20 INFO BlockManagerMaster: Removed 4 successfully in 
removeExecutor

user@master:~/Programming/PySpark-Affinities$

---

If I run the really small image instead (16x16), it *appears* to run to 
completion (gives me the output I expect without any exceptions being 
thrown). However, in the stderr logs for the app that was run, it lists 
the state as KILLED with a final message of ERROR 
CoarseGrainedExecutorBackend: Driver Disassociated. If I run any larger 
images, I get the exception I pasted above.


Furthermore, if I just do a spark-submit with master=local[*], aside 
from still needing to set the aforementioned memory options, it will 
work for an image of *any* size (I've tested both machines 
independently; they both do this when running as local[*]), whereas 
working on a cluster will result in the aforementioned crash at stage 0 
with anything but the smallest images.


Any ideas what is going on?

Thank you very much in advance!

Regards,

Re: spark streaming rate limiting from kafka

2014-07-18 Thread Tathagata Das
Oops, wrong link!
JIRA: https://github.com/apache/spark/pull/945/files
Github PR: https://github.com/apache/spark/pull/945/files


On Fri, Jul 18, 2014 at 7:19 AM, Chen Song chen.song...@gmail.com wrote:

 Thanks Tathagata,

 That would be awesome if Spark streaming can support receiving rate in
 general. I tried to explore the link you provided but could not find any
 specific JIRA related to this? Do you have the JIRA number for this?



 On Thu, Jul 17, 2014 at 9:21 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 You can create multiple kafka stream to partition your topics across
 them, which will run multiple receivers or multiple executors. This is
 covered in the Spark streaming guide.
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

 And for the purpose of this thread, to answer the original question, we now
 have the ability
 https://issues.apache.org/jira/browse/SPARK-1854?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20Streaming%20ORDER%20BY%20priority%20DESC
 to limit the receiving rate. Its in the master branch, and will be
 available in Spark 1.1. It basically sets the limits at the receiver level
 (so applies to all sources) on what is the max records per second that can
 will be received by the receiver.

 TD


 On Thu, Jul 17, 2014 at 6:15 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Bill,

 are you saying, after repartition(400), you have 400 partitions on one
 host and the other hosts receive nothing of the data?

 Tobias


 On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 I also have an issue consuming from Kafka. When I consume from Kafka,
 there are always a single executor working on this job. Even I use
 repartition, it seems that there is still a single executor. Does anyone
 has an idea how to add parallelism to this job?



 On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com
 wrote:

 Thanks Luis and Tobias.


 On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Hi,

 On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com
 wrote:

 * Is there a way to control how far Kafka Dstream can read on
 topic-partition (via offset for example). By setting this to a small
 number, it will force DStream to read less data initially.


 Please see the post at

 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E
 Kafka's auto.offset.reset parameter may be what you are looking for.

 Tobias




 --
 Chen Song







 --
 Chen Song




Re: Memory compute-intensive tasks

2014-07-18 Thread rpandya
Hi Matei-

Changing to coalesce(numNodes, true) still runs all partitions on a single
node, which I verified by printing the hostname before I exec the external
process.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Memory-compute-intensive-tasks-tp9643p10189.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: spark streaming rate limiting from kafka

2014-07-18 Thread Tathagata Das
Dang! Messed it up again!

JIRA: https://issues.apache.org/jira/browse/SPARK-1341
Github PR: https://github.com/apache/spark/pull/945/files
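
(Once that change is available, the limit should be a plain configuration setting; a sketch,
assuming the property name introduced by that PR, spark.streaming.receiver.maxRate, which
caps the records per second accepted by each receiver.)

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("RateLimitedKafkaConsumer")
  .set("spark.streaming.receiver.maxRate", "10000")  // max records/sec per receiver

val ssc = new StreamingContext(conf, Seconds(2))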


On Fri, Jul 18, 2014 at 11:35 AM, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 Oops, wrong link!
 JIRA: https://github.com/apache/spark/pull/945/files
 Github PR: https://github.com/apache/spark/pull/945/files


 On Fri, Jul 18, 2014 at 7:19 AM, Chen Song chen.song...@gmail.com wrote:

 Thanks Tathagata,

 That would be awesome if Spark streaming can support receiving rate in
 general. I tried to explore the link you provided but could not find any
 specific JIRA related to this? Do you have the JIRA number for this?



 On Thu, Jul 17, 2014 at 9:21 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 You can create multiple kafka stream to partition your topics across
 them, which will run multiple receivers or multiple executors. This is
 covered in the Spark streaming guide.
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

 And for the purpose of this thread, to answer the original question, we now
 have the ability
 https://issues.apache.org/jira/browse/SPARK-1854?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20Streaming%20ORDER%20BY%20priority%20DESC
 to limit the receiving rate. Its in the master branch, and will be
 available in Spark 1.1. It basically sets the limits at the receiver level
 (so applies to all sources) on what is the max records per second that can
 will be received by the receiver.

 TD


 On Thu, Jul 17, 2014 at 6:15 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Bill,

 are you saying, after repartition(400), you have 400 partitions on one
 host and the other hosts receive nothing of the data?

 Tobias


 On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 I also have an issue consuming from Kafka. When I consume from Kafka,
 there are always a single executor working on this job. Even I use
 repartition, it seems that there is still a single executor. Does anyone
 has an idea how to add parallelism to this job?



 On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com
 wrote:

 Thanks Luis and Tobias.


 On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Hi,

 On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com
 wrote:

 * Is there a way to control how far Kafka Dstream can read on
 topic-partition (via offset for example). By setting this to a small
 number, it will force DStream to read less data initially.


 Please see the post at

 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E
 Kafka's auto.offset.reset parameter may be what you are looking for.

 Tobias




 --
 Chen Song







 --
 Chen Song





Spark Streaming with long batch / window duration

2014-07-18 Thread aaronjosephs
Would it be a reasonable use case of Spark Streaming to have a very large
window size (let's say on the scale of weeks)? In this particular case the
reduce function would be invertible, so that would aid efficiency. I
assume that using a larger batch size, since the window is so large, would
also lighten the workload for Spark. The sliding duration is not too
important; I just want to know whether this is reasonable for Spark to handle
with any slide duration.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-long-batch-window-duration-tp10191.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: the default GraphX graph-partition strategy on multicore machine?

2014-07-18 Thread Ankur Dave
On Fri, Jul 18, 2014 at 9:13 AM, Yifan LI iamyifa...@gmail.com wrote:

 Yes, is possible to defining a custom partition strategy?


Yes, you just need to create a subclass of PartitionStrategy as follows:

import org.apache.spark.graphx._

object MyPartitionStrategy extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    // put your hash function here
  }
}
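
(As a concrete illustration only: a strategy that hashes on the source vertex, so all
out-edges of a vertex land in the same partition, and how to apply it with partitionBy.
The edge file path is hypothetical.)

import org.apache.spark.graphx._

object SourceHashPartitioning extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    (math.abs(src) % numParts).toInt
  }
}

val graph = GraphLoader.edgeListFile(sc, "hdfs://.../edges.txt", minEdgePartitions = 16)
  .partitionBy(SourceHashPartitioning)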

if I load one edges file(5 GB), without any cores/partitions setting, what
 is the default partition in graph construction? and how many cores will be
 used?
 Or, if the size of file is 50 GB(more than available memory, without
 partition setting)?


If you don't specify a number of partitions, it will default to 2 or the
number of blocks in the input file, whichever is greater (this is the same
behavior as sc.textFile
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1243).
I'm not sure exactly how many this will be for your file, but you can find
out by running sc.textFile(...).partitions.length.

The default partitioning strategy is to leave the edges in the same
partitions as they were initially loaded from.

For the 50 GB file, everything is the same except the default number of
partitions will probably be larger. Hopefully each partition will
individually still fit in memory, which will allow GraphX to proceed.

the whole vertex table VA will be replicated to all these 3 edge
 partitions? since each of them refers to some vertexes in VA.


Yes, for that graph and partitioning, all vertices will be replicated to
all edge partitions. It won't be quite as bad for most graphs, and
EdgePartition2D
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.PartitionStrategy$$EdgePartition2D$
provides a bound on the replication factor, as described in the docs.

You mean that the core is stricter to access only its own partition in
 memory?
 how do they communicate when the required data(edges?) in another
 partition?


Right, there is one task per core, and each task only accesses its own
partition, never the partitions of other tasks.

Tasks communicate in two ways:
1. Vertex replication (multicast). When an edge needs to access its
neighboring vertices (to construct the triplets or perform the map step in
mapReduceTriplets), GraphX replicates vertices to all edge partitions where
they are referenced.
2. Vertex aggregation. When an edge needs to update the value of a
neighboring vertex (to perform the reduce step in mapReduceTriplets),
GraphX aggregates vertex updates from the edge partitions back to the
vertices.

Both of these communication steps happen using Spark shuffles, which work
by writing messages to disk and notifying other tasks to read them from
disk.

Note that edges never need to access other edges directly, and all
communication is done through the vertices.
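
(A small illustration of those two steps, sketched with the standard in-degree computation;
graph is assumed to be an already-constructed Graph[VD, ED].)

// map side: runs inside each edge partition, reading the replicated vertex copies
// reduce side: aggregates the messages back to the vertex partitions
val inDegrees: VertexRDD[Int] = graph.mapReduceTriplets[Int](
  triplet => Iterator((triplet.dstId, 1)),
  (a, b) => a + b
)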

Ankur http://www.ankurdave.com/


Re: the default GraphX graph-partition strategy on multicore machine?

2014-07-18 Thread Ankur Dave
Sorry, I didn't read your vertex replication example carefully, so my
previous answer is wrong. Here's the correct one:

On Fri, Jul 18, 2014 at 9:13 AM, Yifan LI iamyifa...@gmail.com wrote:

 I don't understand, for instance, we have 3 edge partition tables(EA: a -
 b, a - c; EB: a - d, a - e; EC: d - c ), 2 vertex partition tables(VA:
 a, b, c; VB: d, e), the whole vertex table VA will be replicated to all
 these 3 edge partitions? since each of them refers to some vertexes in VA.


Vertices can be replicated individually without requiring the entire vertex
partition to be replicated. In this case, here's what will get replicated
to each partition:

EA: a (from VA), b (from VA), c (from VA)
EB: a (from VA), d (from VB), e (from VB)
EC: c (from VA), d (from VB)

Ankur http://www.ankurdave.com/


Re: incompatible local class serialVersionUID with spark Shark

2014-07-18 Thread Michael Armbrust
There is no version of shark that works with spark 1.0.

More details about the path forward here:
http://databricks.com/blog/2014/07/01/shark-spark-sql-hive-on-spark-and-the-future-of-sql-on-spark.html
On Jul 18, 2014 4:53 AM, Megane1994 leumenilari...@yahoo.fr wrote:

 Hello,

 I want to run Shark on yarn.

 My environment
 Shark-0.9.1.
 Spark-1.0.0
 hadoop-2.3.0

 My first question is that: Is it possible to run shark-0.9.1 with
 Spark-1.0.0 on yarn? or Shark and Spark have to be necessarily in the same
 version?

 For the moment, when i make a  request like show tables,show databases
 on Shark, it works well. But, when i make any select query (like select *
 from test), i get this error:

 org.apache.spark.SparkException: Job aborted: Task 0.0:0 failed 4 times
 (most recent failure: Exception failure: java.io.InvalidClassException:
 org.apache.spark.scheduler.Task; local class incompatible: stream classdesc
 serialVersionUID = -14123687772669932, local class serialVersionUID =
 -6531051710822689816)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
 at

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
 at scala.Option.foreach(Option.scala:236)
 at

 org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at

 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at

 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at

 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 FAILED: Execution Error, return code -101 from shark.execution.SparkTask



 Any ideas?
 i looked  for a solution  all the week, but without success.Any suggestions
 will be appreciate. Thanks







 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/incompatible-local-class-serialVersionUID-with-spark-Shark-tp10149.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: TreeNodeException: No function to evaluate expression. type: AttributeReference, tree: id#0 on GROUP BY

2014-07-18 Thread Michael Armbrust
Sorry for the non-obvious error message.  It is not valid SQL to include
attributes in the select clause unless they are also in the group by clause
or are inside of an aggregate function.
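
(For example, with hypothetical table and column names, assuming a SchemaRDD registered as
the table docs with columns id and category:)

// Invalid: id appears in the SELECT clause but is neither grouped nor aggregated.
sqlContext.sql("SELECT id, COUNT(1) FROM docs GROUP BY category")

// Valid: every selected attribute is either in the GROUP BY clause or inside an aggregate.
sqlContext.sql("SELECT category, COUNT(1) FROM docs GROUP BY category")
sqlContext.sql("SELECT category, MAX(id), COUNT(1) FROM docs GROUP BY category")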
On Jul 18, 2014 5:12 AM, Martin Gammelsæter martingammelsae...@gmail.com
wrote:

 Hi again!

 I am having problems when using GROUP BY on both SQLContext and
 HiveContext (same problem).

 My code (simplified as much as possible) can be seen here:
 http://pastebin.com/33rjW67H

 In short, I'm getting data from a Cassandra store with Datastax' new
 driver (which works great by the way, recommended!), and mapping it to
 a Spark SQL table through a Product class (Dokument in the source).
 Regular SELECTs and stuff works fine, but once I try to do a GROUP BY,
 I get the following error:

 Exception in thread main org.apache.spark.SparkException: Job
 aborted due to stage failure: Task 0.0:25 failed 4 times, most recent
 failure: Exception failure in TID 63 on host 192.168.121.132:
 org.apache.spark.sql.catalyst.errors.package$TreeNodeException: No
 function to evaluate expression. type: AttributeReference, tree: id#0

 org.apache.spark.sql.catalyst.expressions.AttributeReference.eval(namedExpressions.scala:158)

 org.apache.spark.sql.catalyst.expressions.MutableProjection.apply(Projection.scala:64)

 org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7$$anon$1.next(Aggregate.scala:195)

 org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7$$anon$1.next(Aggregate.scala:174)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 scala.collection.Iterator$class.foreach(Iterator.scala:727)
 scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)

 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)

 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 scala.collection.TraversableOnce$class.to
 (TraversableOnce.scala:273)
 scala.collection.AbstractIterator.to(Iterator.scala:1157)

 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)

 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:750)
 org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:750)

 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1096)

 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1096)
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:112)
 org.apache.spark.scheduler.Task.run(Task.scala:51)

 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)

 What am I doing wrong?

 --
 Best regards,
 Martin Gammelsæter



Re: Need help on Spark UDF (Join) Performance tuning .

2014-07-18 Thread Michael Armbrust
It's likely that, since your UDF is a black box to Hive's query optimizer,
it must choose a less efficient join algorithm that passes all
possible matches to your function for comparison.  This will happen any
time your UDF touches attributes from both sides of the join.

In general you can learn more about the chosen execution strategy by
running explain.
On Jul 18, 2014 12:04 PM, S Malligarjunan smalligarju...@yahoo.com
wrote:

 Hello Experts,

 Appreciate your input highly, please suggest/ give me hint, what would be
 the issue here?


 Thanks and Regards,
 Malligarjunan S.



   On Thursday, 17 July 2014, 22:47, S Malligarjunan 
 smalligarju...@yahoo.com wrote:


 Hello Experts,

 I am facing performance problem when I use the UDF function call. Please
 help me to tune the query.
 Please find the details below

 shark select count(*) from table1;
 OK
 151096
 Time taken: 7.242 seconds
 shark select count(*) from table2;
 OK
 938
 Time taken: 1.273 seconds


 *Without UDF:*shark SELECT
   count(pvc1.time)
 FROM table2 pvc2 JOIN table1 pvc1
 WHERE pvc1.col1 = pvc2.col2
 AND  unix_timestamp(pvc2.time, '-MM-dd HH:mm:ss,SSS') 
 unix_timestamp(pvc1.time, '-MM-dd HH:mm:ss,SSS');
 OK
 328
 Time taken: 200.487 seconds


 shark
   SELECT
   count(pvc1.time)
 FROM table2 pvc2 JOIN table1 pvc1
 WHERE (pvc1.col1 = pvc2.col1 OR pvc1.col1 = pvc2.col2)
 AND  unix_timestamp(pvc2.time, '-MM-dd HH:mm:ss,SSS') 
 unix_timestamp(pvc1.time, '-MM-dd HH:mm:ss,SSS');
 OK
 331
 Time taken: 292.86 seconds

 *With UDF:*
 shark
SELECT
   count(pvc1.time)
 FROM table2 pvc2 JOIN table1 pvc1
 WHERE testCompare(pvc1.col1, pvc1.col2, pvc2.col1,pvc2.col2)
 AND  unix_timestamp(pvc2.time, '-MM-dd HH:mm:ss,SSS') 
 unix_timestamp(pvc1.time, '-MM-dd HH:mm:ss,SSS');

 OK
 331
 Time taken: 3718.23 seconds
 The above UDF query takes more time to run.

 Where testCompare is an udf function, The function just does the pvc1.col1
 = pvc2.col1 OR pvc1.col1 = pvc2.col2

 Please let me know what is the issue here?


 Thanks and Regards,
 Sankar S.






Re: spark1.0.1 spark sql error java.lang.NoClassDefFoundError: Could not initialize class $line11.$read$

2014-07-18 Thread Michael Armbrust
Can you tell us more about your environment?  Specifically, are you also
running on Mesos?
On Jul 18, 2014 12:39 AM, Victor Sheng victorsheng...@gmail.com wrote:

 when I run a query to a hadoop file.
 mobile.registerAsTable(mobile)
 val count = sqlContext.sql(select count(1) from mobile)
 res5: org.apache.spark.sql.SchemaRDD =
 SchemaRDD[21] at RDD at SchemaRDD.scala:100
 == Query Plan ==
 ExistingRdd [data_date#0,mobile#1,create_time#2], MapPartitionsRDD[4] at
 mapPartitions at basicOperators.scala:176

 when I run collect.
 count.collect()

 It throws exceptions, Can anyone help me ?

 Job aborted due to stage failure: Task 3.0:22 failed 4 times, most recent
 failure: Exception failure in TID 153 on host wh-8-210:
 java.lang.NoClassDefFoundError: Could not initialize class $line11.$read$
 $line12.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:19)
 $line12.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:19)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 scala.collection.Iterator$$anon$1.next(Iterator.scala:853)
 scala.collection.Iterator$$anon$1.head(Iterator.scala:840)

 org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:181)

 org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:176)
 org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
 org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
 org.apache.spark.scheduler.Task.run(Task.scala:51)
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
 java.lang.Thread.run(Thread.java:722) Driver stacktrace:



 java.lang.ExceptionInInitializerError
 at $line11.$read$$iwC.init(console:6)
 at $line11.$read.init(console:26)
 at $line11.$read$.init(console:30)
 at $line11.$read$.clinit(console)
 at $line12.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:19)
 at $line12.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:19)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$$anon$1.next(Iterator.scala:853)
 at scala.collection.Iterator$$anon$1.head(Iterator.scala:840)
 at

 org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:181)
 at

 org.apache.spark.sql.execution.ExistingRdd$$anonfun$productToRowRdd$1.apply(basicOperators.scala:176)
 at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
 at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 at org.apache.spark.scheduler.Task.run(Task.scala:51)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
 at java.lang.Thread.run(Thread.java:722)


 My classpath is :

 /app/hadoop/spark-1.0.1/assembly/target/scala-2.10/spark-assembly-1.0.1-hadoop0.20.2-cdh3u5.jar
 System Classpath
 /app/hadoop/spark-1.0.1/confSystem Classpath
 /app/hadoop/spark-1.0.1/lib_managed/jars/JavaEWAH-0.3.2.jar System
 Classpath
 /app/hadoop/spark-1.0.1/lib_managed/jars/JavaEWAH-0.6.6.jar System
 Classpath
 /app/hadoop/spark-1.0.1/lib_managed/jars/ST4-4.0.4.jar  System Classpath
 

Re: Cannot connect to hive metastore

2014-07-18 Thread Michael Armbrust
See the section on advanced dependency management:

http://spark.apache.org/docs/latest/submitting-applications.html
On Jul 17, 2014 10:53 PM, linkpatrickliu linkpatrick...@live.com wrote:

 Seems like the mysql connector jar is not included in the classpath.
 Where can I set the jar to the classpath?

 hive-site.xml:
   property
 namejavax.jdo.option.ConnectionURL/name


 valuejdbc:mysql://localhost:3306/metastore?createDatabaseIfNotExist=trueamp;characterEncoding=UTF-8/value
 descriptionJDBC connect string for a JDBC metastore/description
   /property

 Log:
 14/07/18 11:46:58 ERROR DDLTask:
 org.apache.hadoop.hive.ql.metadata.HiveException:
 java.lang.RuntimeException: Unable to instantiate
 org.apache.hadoop.hive.metastore.HiveMetaStoreClient
 at
 org.apache.hadoop.hive.ql.metadata.Hive.getDatabase(Hive.java:1143)
 at
 org.apache.hadoop.hive.ql.metadata.Hive.databaseExists(Hive.java:1128)
 at
 org.apache.hadoop.hive.ql.exec.DDLTask.showTables(DDLTask.java:2236)
 at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:333)
 at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:151)
 at
 org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:65)
 at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1414)
 at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1192)
 at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1020)
 at org.apache.hadoop.hive.ql.Driver.run(Driver.java:888)
 at
 org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:189)
 at
 org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:163)
 at

 org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35)
 at

 org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35)
 at

 org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:38)
 at

 org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:250)
 at

 org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:250)
 at
 org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
 at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:100)
 at
 org.apache.spark.sql.hive.HiveContext.hiveql(HiveContext.scala:75)
 at org.apache.spark.sql.hive.HiveContext.hql(HiveContext.scala:78)
 at $line9.$read$$iwC$$iwC$$iwC$$iwC.init(console:15)
 at $line9.$read$$iwC$$iwC$$iwC.init(console:20)
 at $line9.$read$$iwC$$iwC.init(console:22)
 at $line9.$read$$iwC.init(console:24)
 at $line9.$read.init(console:26)
 at $line9.$read$.init(console:30)
 at $line9.$read$.clinit(console)
 at $line9.$eval$.init(console:7)
 at $line9.$eval$.clinit(console)
 at $line9.$eval.$print(console)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)
 at
 org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056)
 at
 org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
 at
 org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)
 at

 org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841)
 at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)
 at
 org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601)
 at
 org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
 at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)
 at

 org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)
 at

 org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
 at

 org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
 at

 scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
 at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884)
 at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:982)
 at org.apache.spark.repl.Main$.main(Main.scala:31)
 at org.apache.spark.repl.Main.main(Main.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 

Re: can we insert and update with spark sql

2014-07-18 Thread Michael Armbrust
You can do insert into.  As with other SQL on HDFS systems there is no
updating of data.
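
(A sketch with HiveContext on Spark 1.0; the table names are hypothetical and sc is an
existing SparkContext. Hive's INSERT INTO appends; to change existing rows you rewrite
the data with INSERT OVERWRITE rather than an UPDATE.)

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
hiveContext.hql("INSERT INTO TABLE target_table SELECT * FROM staging_table")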
On Jul 17, 2014 1:26 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

 Is this what you are looking for?


 https://spark.apache.org/docs/1.0.0/api/java/org/apache/spark/sql/parquet/InsertIntoParquetTable.html

 According to the doc, it says Operator that acts as a sink for queries
 on RDDs and can be used to store the output inside a directory of Parquet
 files. This operator is similar to Hive's INSERT INTO TABLE operation in
 the sense that one can choose to either overwrite or append to a directory.
 Note that consecutive insertions to the same table must have compatible
 (source) schemas.

 Thanks
 Best Regards


 On Thu, Jul 17, 2014 at 11:42 AM, Hu, Leo leo.h...@sap.com wrote:

  Hi

As for spark 1.0, can we insert and update a table with SPARK SQL, and
 how?



 Thanks

 Best Regard





Visualization/Summary tools for Spark Streaming data

2014-07-18 Thread Subodh Nijsure
Hi,

This is my second week of working with Spark, so pardon me if this is an elementary
question in the Spark domain.

I am looking for ways to render output of Spark Streaming.

First let me describe the problem set. I am monitoring temperature/humidity and other
environmental parameters (pushed from devices every minute) across 1000
installations. This data comes to me via Kafka and goes into Spark
Streaming for processing.

There I query my own MySQL database to find out if a specific location
is exceeding the thresholds configured for that site; if it does, the job emits
an alarm which is stored in the MySQL DB. For all MySQL access I am using
a Java ODBC interface.

Now the question: how do I graph/visualize all the trend data that I am saving for the
various environmental parameters? Are there any examples of how folks are doing
visualization of data collected/refined by Spark Streaming?

I watched the DataBricks presentation (
https://www.youtube.com/watch?v=dJQ5lV5Tldw) , that is the kind of system I
am looking for, but they are under limited beta so no way for me to get
hands on that tech. Are there any other platforms that offer similar
functionality for visualizing data under Apache Spark?

Regards,
-Subodh


Reading Avro Sequence Files

2014-07-18 Thread tcg
I'm trying to read an Avro sequence file using the sequenceFile method on
the Spark context object and I get a NullPointerException.  If I read the
file outside of Spark using AvroSequenceFile.Reader I don't have any
problems.

Anyone have success in doing this?

Below is what I typed and saw at the Spark shell:

scala> var myAvroSequenceFile = sc.sequenceFile("hdfs://my url is here",
classOf[AvroKey[GenericRecord]], classOf[AvroValue[GenericRecord]])

scala> myAvroSequenceFile.first
14/07/18 16:31:31 INFO FileInputFormat: Total input paths to process : 1
14/07/18 16:31:31 INFO SparkContext: Starting job: first at console:18
14/07/18 16:31:31 INFO DAGScheduler: Got job 2 (first at console:18) with
1 output partitions (allowLocal=true)
14/07/18 16:31:31 INFO DAGScheduler: Final stage: Stage 2(first at
console:18)
14/07/18 16:31:31 INFO DAGScheduler: Parents of final stage: List()
14/07/18 16:31:31 INFO DAGScheduler: Missing parents: List()
14/07/18 16:31:31 INFO DAGScheduler: Computing the requested partition
locally
14/07/18 16:31:31 INFO HadoopRDD: Input split: hdfs://my url
14/07/18 16:31:31 INFO DAGScheduler: Failed to run first at console:18
java.lang.NullPointerException
at
org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1902)
at
org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1765)
at
org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1714)
at
org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1728)
at
org.apache.hadoop.mapred.SequenceFileRecordReader.init(SequenceFileRecordReader.java:43)
at
org.apache.hadoop.mapred.SequenceFileInputFormat.getRecordReader(SequenceFileInputFormat.java:59)
at
org.apache.spark.rdd.HadoopRDD$$anon$1.init(HadoopRDD.scala:190)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:181)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:93)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at
org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:574)
at
org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:559)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Reading-Avro-Sequence-Files-tp10201.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Reading Avro Sequence Files

2014-07-18 Thread tcg
Correction: I get a null pointer exception when I attempt to perform an
action like 'first'.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Reading-Avro-Sequence-Files-tp10201p10202.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: NullPointerException When Reading Avro Sequence Files

2014-07-18 Thread aaronjosephs
I think you probably want to use `AvroSequenceFileOutputFormat` with
`newAPIHadoopFile`. I'm not even sure that in Hadoop you would use
SequenceFileInputFormat to read an Avro sequence file.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-when-reading-Avro-Sequence-files-tp10201p10203.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: NullPointerException When Reading Avro Sequence Files

2014-07-18 Thread Sparky
Thanks for responding.  I tried using the newAPIHadoopFile method and got an
IO Exception with the message Not a data file.  

If anyone has an example of this working I'd appreciate your input or
examples.  

What I entered at the repl and what I got back are below:

val myAvroSequenceFile = sc.newAPIHadoopFile("hdfs://my url", 
classOf[AvroKeyInputFormat[GenericRecord]], classOf[AvroKey[GenericRecord]],
classOf[NullWritable])

scala> myAvroSequenceFile.first()
14/07/18 17:02:38 INFO FileInputFormat: Total input paths to process : 1
14/07/18 17:02:38 INFO SparkContext: Starting job: first at console:19
14/07/18 17:02:38 INFO DAGScheduler: Got job 0 (first at console:19) with
1 output partitions (allowLocal=true)
14/07/18 17:02:38 INFO DAGScheduler: Final stage: Stage 0(first at
console:19)
14/07/18 17:02:38 INFO DAGScheduler: Parents of final stage: List()
14/07/18 17:02:38 INFO DAGScheduler: Missing parents: List()
14/07/18 17:02:38 INFO DAGScheduler: Computing the requested partition
locally
14/07/18 17:02:38 INFO NewHadoopRDD: Input split: hdfs:my url
14/07/18 17:02:38 WARN AvroKeyInputFormat: Reader schema was not set. Use
AvroJob.setInputKeySchema() if desired.
14/07/18 17:02:38 INFO AvroKeyInputFormat: Using a reader schema equal to
the writer schema.
14/07/18 17:02:38 INFO DAGScheduler: Failed to run first at console:19
org.apache.spark.SparkDriverExecutionException: Execution error
at
org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:585)
at
org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:563)
Caused by: java.io.IOException: Not a data file.
at 
org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:105)
at org.apache.avro.file.DataFileReader.init(DataFileReader.java:97)
at
org.apache.avro.mapreduce.AvroRecordReaderBase.createAvroFileReader(AvroRecordReaderBase.java:180)
at
org.apache.avro.mapreduce.AvroRecordReaderBase.initialize(AvroRecordReaderBase.java:90)
at 
org.apache.spark.rdd.NewHadoopRDD$$anon$1.init(NewHadoopRDD.scala:114)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:100)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:62)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:261)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
at
org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:578)
... 1 more



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-when-reading-Avro-Sequence-files-tp10201p10204.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Large scale ranked recommendation

2014-07-18 Thread Christopher Johnson
If you are performing recommendations via a latent factor model then I
highly recommend you look into methods of approximate nearest neighbors.
 At Spotify we batch process top N recommendations for 40M users with a
catalog of  40M items, but we avoid the naive O(n*m) process you are
describing by performing an approximate nearest neighbors search.  There
are a bunch of open source packages you can use including our own
https://github.com/spotify/annoy which uses random projections in your
latent factor space to build a forest of trees with constant time nearest
neighbors lookup.


On Fri, Jul 18, 2014 at 1:57 PM, Nick Pentreath nick.pentre...@gmail.com
wrote:

 Agree GPUs may be interesting for this kind of massively parallel linear
 algebra on reasonable size vectors.

 These projects might be of interest in this regard:
 https://github.com/BIDData/BIDMach
 https://github.com/BIDData/BIDMat
 https://github.com/dlwh/gust

 Nick



 On Fri, Jul 18, 2014 at 7:40 PM, m3.sharma sharm...@umn.edu wrote:

 Thanks Nick real-time suggestion is good, will see if we can add that to
 our
 deployment strategy and you are correct we may not need recommendation for
 each user.

 Will try adding more resources and broadcasting item features suggestion
 as
 currently they don't seem to be huge.

 As users and items both will continue to grow in future for faster vector
 computations I think few GPU nodes will suffice to serve faster
 recommendation after learning model with SPARK. It will be great to have
 builtin GPU support in SPARK for faster computations to leverage GPU
 capability of nodes for performing these flops faster.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-ranked-recommendation-tp10098p10183.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: spark sql left join gives KryoException: Buffer overflow

2014-07-18 Thread Michael Armbrust
Unfortunately, this is a query where we just don't have an efficient
implementation yet.  You might try switching the table order.

Here is the JIRA for doing something more efficient:
https://issues.apache.org/jira/browse/SPARK-2212


On Fri, Jul 18, 2014 at 7:05 AM, Pei-Lun Lee pl...@appier.com wrote:

 Hi,

 We have a query with left joining and got this error:

 Caused by: org.apache.spark.SparkException: Job aborted due to stage
 failure: Task 1.0:0 failed 4 times, most recent failure: Exception failure
 in TID 5 on host ip-10-33-132-101.us-west-2.compute.internal:
 com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0,
 required: 1

 Looks like spark sql tried to do a broadcast join and collecting one of
 the table to master but it is too large.

 How do we explicitly control the join behavior like this?

 --
 Pei-Lun Lee




Re: Visualization/Summary tools for Spark Streaming data

2014-07-18 Thread bryanv
You might check out Bokeh (http://bokeh.pydata.org), which is a Python (and other
languages) system for streaming and big-data visualization targeting the browser. I just
gave a talk at SciPy 2014 where you can hear more and see examples:
https://www.youtube.com/watch?v=B9NpLOyp-dI





Re: Memory compute-intensive tasks

2014-07-18 Thread rpandya
I also tried increasing --num-executors to numNodes * coresPerNode and using
coalesce(numNodes*10,true), and it still ran all the tasks on one node. It
seems like it is placing all the executors on one node (though not always
the same node, which indicates it is aware of more than one!). I'm using
spark-submit --master yarn --deploy-mode cluster with spark-1.0.1 built for
hadoop 2.4 on HDP 2.1/Hadoop 2.4.

There's clearly just something wrong with my Hadoop configuration, or in how
I'm submitting my spark job - any suggestions?

Thanks,

Ravi





Re: Broadcasting a set in PySpark

2014-07-18 Thread Josh Rosen
You have to use `myBroadcastVariable.value` to access the broadcasted
value; see
https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
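
Concretely, the membership test in the snippet above should read
x[0] in Uid_male_setbc.value rather than x[0] in Uid_male_setbc. For reference,
the same broadcast pattern in the Scala API looks roughly like this (variable
names and the tuple layout are illustrative, not taken from the original job):

// Minimal Scala sketch of the same pattern.
val uidMaleSet: Set[Long] = maleUsers.map(_._2).collect().toSet
val uidMaleSetBc = sc.broadcast(uidMaleSet)

val flagged = uidIidIscore.map { case (uid, iids, scores) =>
  // Access the broadcast contents through .value, not the Broadcast handle itself.
  (uid, iids.zip(scores), uidMaleSetBc.value.contains(uid))
}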


On Fri, Jul 18, 2014 at 2:56 PM, Vedant Dhandhania 
ved...@retentionscience.com wrote:

 Hi All,

 I am trying to broadcast a set in a PySpark script.

 I create the set like this:

 Uid_male_set = set(maleUsers.map(lambda x:x[1]).collect())


 Then execute this line:


 uid_iid_iscore_tuple_GenderFlag = uid_iid_iscore.map(lambda
 x:(x[0],zip(x[1],x[2]),x[0] in Uid_male_set))


  An error occurred while calling o104.collectPartitions.

 : org.apache.spark.SparkException: Job aborted due to stage failure:
 Serialized task 1131:0 was 23503247 bytes which exceeds
 spark.akka.frameSize (10485760 bytes). Consider using broadcast variables
 for large values.



 So I tried broadcasting it:

 Uid_male_setbc = sc.broadcast(Uid_male_set)


  Uid_male_setbc

 <pyspark.broadcast.Broadcast object at 0x1ba2ed0>


 Then I execute this line:


 uid_iid_iscore_tuple_GenderFlag = uid_iid_iscore.map(lambda
 x:(x[0],zip(x[1],x[2]),x[0] in Uid_male_setbc))

 File "<stdin>", line 1, in <lambda>

 TypeError: argument of type 'Broadcast' is not iterable

  [duplicate 1]


 So I am stuck either way; the script runs well locally on a smaller
 dataset, but throws me this error. Could anyone point out how to correct
 this or where I am going wrong?

 Thanks


 *Vedant Dhandhania*

 *Retention** Science*

 call: 805.574.0873

 visit: Site http://www.retentionscience.com/ | like: Facebook
 http://www.facebook.com/RetentionScience | follow: Twitter
 http://twitter.com/RetentionSci



Re: Large scale ranked recommendation

2014-07-18 Thread m3.sharma
Christopher, that's really a great idea: searching in the latent factor space
rather than computing each entry of the matrix reduces the complexity of the
problem drastically from the naive O(n*m). Since our data is not that huge I
will try an exact neighborhood search first, then fall back to an approximate
search if that doesn't work. I will look into annoy. Thanks.
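
For the exact search, a minimal sketch of brute-force top-N scoring against
broadcast item factors (assuming the item factors fit comfortably in executor
memory; userFactors, itemFactors, and the factor layout are illustrative):

// userFactors: RDD[(Int, Array[Double])], itemFactors: Array[(Int, Array[Double])]
// -- both assumed to come from an already-trained latent factor model.
val itemFactorsBc = sc.broadcast(itemFactors)
val topN = 10

val recommendations = userFactors.map { case (userId, uf) =>
  // Score every item for this user with a plain dot product.
  val scored = itemFactorsBc.value.map { case (itemId, itf) =>
    var dot = 0.0
    var i = 0
    while (i < uf.length) { dot += uf(i) * itf(i); i += 1 }
    (itemId, dot)
  }
  (userId, scored.sortBy(-_._2).take(topN))
}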





Re: Broadcasting a set in PySpark

2014-07-18 Thread Vedant Dhandhania
Hi Josh,
I did make that change, however I get this error now:

568.492: [GC [PSYoungGen: 1412948K->207017K(1465088K)]
4494287K->3471149K(4960384K), 0.1280200 secs] [Times: user=0.23 sys=0.63,
real=0.13 secs]

568.642: [Full GC
Traceback (most recent call last):

  File "<stdin>", line 1, in <module>

  File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py", line
708, in count

    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()

  File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py", line
699, in sum

    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)

  File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py", line
619, in reduce

    vals = self.mapPartitions(func).collect()

  File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py", line
583, in collect

    bytesInJava = self._jrdd.collect().iterator()

  File "/home/hadoop/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py", line
94, in __exit__

    self._context._jsc.setCallSite(None)

  File
"/home/hadoop/spark-1.0.1-bin-hadoop1/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
line 535, in __call__

  File
"/home/hadoop/spark-1.0.1-bin-hadoop1/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
line 361, in send_command

  File
"/home/hadoop/spark-1.0.1-bin-hadoop1/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
line 317, in _get_connection

  File
"/home/hadoop/spark-1.0.1-bin-hadoop1/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
line 324, in _create_connection

  File
"/home/hadoop/spark-1.0.1-bin-hadoop1/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
line 431, in start

py4j.protocol.Py4JNetworkError: An error occurred while trying to connect
to the Java server


*Vedant Dhandhania*

*Retention** Science*

call: 805.574.0873

visit: Site http://www.retentionscience.com/ | like: Facebook
http://www.facebook.com/RetentionScience | follow: Twitter
http://twitter.com/RetentionSci


On Fri, Jul 18, 2014 at 3:10 PM, Josh Rosen rosenvi...@gmail.com wrote:

 You have to use `myBroadcastVariable.value` to access the broadcasted
 value; see
 https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables


 On Fri, Jul 18, 2014 at 2:56 PM, Vedant Dhandhania 
 ved...@retentionscience.com wrote:

 Hi All,

 I am trying to broadcast a set in a PySpark script.

 I create the set like this:

 Uid_male_set = set(maleUsers.map(lambda x:x[1]).collect())


 Then execute this line:


 uid_iid_iscore_tuple_GenderFlag = uid_iid_iscore.map(lambda
 x:(x[0],zip(x[1],x[2]),x[0] in Uid_male_set))


  An error occurred while calling o104.collectPartitions.

 : org.apache.spark.SparkException: Job aborted due to stage failure:
 Serialized task 1131:0 was 23503247 bytes which exceeds
 spark.akka.frameSize (10485760 bytes). Consider using broadcast variables
 for large values.



 So I tried broadcasting it:

 Uid_male_setbc = sc.broadcast(Uid_male_set)


  Uid_male_setbc

  <pyspark.broadcast.Broadcast object at 0x1ba2ed0>


  Then I execute this line:


 uid_iid_iscore_tuple_GenderFlag = uid_iid_iscore.map(lambda
 x:(x[0],zip(x[1],x[2]),x[0] in Uid_male_setbc))

  File "<stdin>", line 1, in <lambda>

 TypeError: argument of type 'Broadcast' is not iterable

  [duplicate 1]


  So I am stuck either way; the script runs well locally on a smaller
  dataset, but throws me this error. Could anyone point out how to correct
  this or where I am going wrong?

 Thanks


 *Vedant Dhandhania*

 *Retention** Science*

 call: 805.574.0873

 visit: Site http://www.retentionscience.com/ | like: Facebook
 http://www.facebook.com/RetentionScience | follow: Twitter
 http://twitter.com/RetentionSci





Running Spark/YARN on AWS EMR - Issues finding file on hdfs?

2014-07-18 Thread _soumya_
I'm stumped with this one. I'm using YARN on EMR to distribute my spark job.
While the job initially seems to start up fine, the Spark executor
nodes are having trouble pulling the jars from the location on HDFS where the
master just put them.

[hadoop@ip-172-16-2-167 ~]$
SPARK_JAR=./spark/lib/spark-assembly-1.0.0-hadoop2.4.0.jar
./spark/bin/spark-class org.apache.spark.deploy.yarn.Client --jar
/mnt/tmp/GenerateAssetContent/rickshaw-spark-0.0.1-SNAPSHOT.jar --class
com.evocalize.rickshaw.spark.applications.GenerateAssetContent --args
yarn-standalone --num-workers 3 --master-memory 2g --worker-memory 2g
--worker-cores 1
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/home/hadoop/.versions/2.4.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/home/hadoop/.versions/spark-1.0.0-bin-hadoop2/lib/spark-assembly-1.0.0-hadoop2.4.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
WARNING: This client is deprecated and will be removed in a future version
of Spark. Use ./bin/spark-submit with --master yarn
--args is deprecated. Use --arg instead.
--num-workers is deprecated. Use --num-executors instead.
--master-memory is deprecated. Use --driver-memory instead.
--worker-memory is deprecated. Use --executor-memory instead.
--worker-cores is deprecated. Use --executor-cores instead.
14/07/18 22:27:50 INFO client.RMProxy: Connecting to ResourceManager at
/172.16.2.167:9022
14/07/18 22:27:51 INFO yarn.Client: Got Cluster metric info from
ApplicationsManager (ASM), number of NodeManagers: 2
14/07/18 22:27:51 INFO yarn.Client: Queue info ... queueName: default,
queueCurrentCapacity: 0.0, queueMaxCapacity: 1.0,
  queueApplicationCount = 0, queueChildQueueCount = 0
14/07/18 22:27:51 INFO yarn.Client: Max mem capabililty of a single resource
in this cluster 3072
14/07/18 22:27:51 INFO yarn.Client: Preparing Local resources
14/07/18 22:27:53 INFO yarn.Client: Uploading
file:/mnt/tmp/GenerateAssetContent/rickshaw-spark-0.0.1-SNAPSHOT.jar to
hdfs://172.16.2.167:9000/user/hadoop/.sparkStaging/application_1405713259773_0014/rickshaw-spark-0.0.1-SNAPSHOT.jar
14/07/18 22:27:57 INFO yarn.Client: Uploading
file:/home/hadoop/spark/lib/spark-assembly-1.0.0-hadoop2.4.0.jar to
hdfs://172.16.2.167:9000/user/hadoop/.sparkStaging/application_1405713259773_0014/spark-assembly-1.0.0-hadoop2.4.0.jar
14/07/18 22:27:59 INFO yarn.Client: Setting up the launch environment
14/07/18 22:27:59 INFO yarn.Client: Setting up container launch context
14/07/18 22:27:59 INFO yarn.Client: Command for starting the Spark
ApplicationMaster: List($JAVA_HOME/bin/java, -server, -Xmx2048m,
-Djava.io.tmpdir=$PWD/tmp, 
-Dlog4j.configuration=log4j-spark-container.properties,
org.apache.spark.deploy.yarn.ApplicationMaster, --class,
com.evocalize.rickshaw.spark.applications.GenerateAssetContent, --jar ,
/mnt/tmp/GenerateAssetContent/rickshaw-spark-0.0.1-SNAPSHOT.jar,  --args 
'yarn-standalone' , --executor-memory, 2048, --executor-cores, 1,
--num-executors , 3, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)
14/07/18 22:27:59 INFO yarn.Client: Submitting application to ASM
14/07/18 22:27:59 INFO impl.YarnClientImpl: Submitted application
application_1405713259773_0014
...
14/07/18 22:28:23 INFO yarn.Client: Application report from ASM:
 application identifier: application_1405713259773_0014
 appId: 14
 clientToAMToken: null
 appDiagnostics: Application application_1405713259773_0014 failed 2 
times
due to AM Container for appattempt_1405713259773_0014_02 exited with 
exitCode: -1000 due to: File does not exist:
hdfs://172.16.2.167:9000/user/hadoop/.sparkStaging/application_1405713259773_0014/spark-assembly-1.0.0-hadoop2.4.0.jar
.Failing this attempt.. Failing the application.
 appMasterHost: N/A
 appQueue: default
 appMasterRpcPort: -1
 appStartTime: 1405722479547
 yarnAppState: FAILED
 distributedFinalState: FAILED
 appTrackingUrl:
ip-172-16-2-167.us-west-1.compute.internal:9026/cluster/app/application_1405713259773_0014
 appUser: hadoop

-

I tried to ls the file on the location - it doesn't exist either - although
Spark could have cleaned that up before exiting. I verified that on EMR, all
ports are open between each other so this can't be a port issue. What am I
missing? 





RE: Hive From Spark

2014-07-18 Thread JiajiaJing
Hi Cheng Hao,

Thank you very much for your reply.

Basically, the program runs on Spark 1.0.0 and Hive 0.12.0 .

The environment is set up by running SPARK_HIVE=true sbt/sbt
assembly/assembly, including the jar on all the workers, and copying
hive-site.xml to Spark's conf dir.

And then run the program as:   ./bin/run-example
org.apache.spark.examples.sql.hive.HiveFromSpark  
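
For reference, the example essentially exercises a HiveContext along these
lines (a minimal sketch; the exact table and queries used by HiveFromSpark may
differ):

import org.apache.spark.sql.hive.HiveContext

// Assumes an existing SparkContext `sc` and Hive configured via hive-site.xml.
val hiveContext = new HiveContext(sc)
hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hiveContext.hql("SELECT key, value FROM src LIMIT 10").collect().foreach(println)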

It's good to know that this example runs well on your machine; could you
please give me some insight into what you have done as well?

Thank you very much!

Jiajia









Re: jar changed on src filesystem

2014-07-18 Thread cmti95035
Andrew,

Yes, this works after cleaning up the .staging as you suggested. Thanks a
lot!

Jian





BUG in spark-ec2 script (--ebs-vol-size) and workaround...

2014-07-18 Thread Ben Horner
Hello all,

There is a bug in the spark-ec2 script (perhaps due to a change in the
Amazon AMI).

The --ebs-vol-size option directs the spark-ec2 script to add an EBS volume
of the specified size, and mount it at /vol for a persistent HDFS.  To do
this, it uses mkfs.xfs which is not available (though mkfs is).

To work around this, I was able to run yum install xfsprogs on the master
and each slave, and then use the --resume option with the script, and the
persistent HDFS actually worked!

This has been a frustrating experience, but I've used the spark-ec2 script
for several months now, and it's incredibly helpful.  I hope this post helps
towards fixing the problem!

Thanks,
-Ben

P.S.  This is the full initial command I used, in case this is isolated to
particular instance types or anything:
ec2/spark-ec2 -k ... -i ... -z us-east-1d -s 4 -t m3.2xlarge
--ebs-vol-size=250 -m r3.2xlarge launch ...

P.P.S.  Ganglia is still broken, and has been for a while...





Re: Spark Streaming with long batch / window duration

2014-07-18 Thread Tathagata Das
If you want to process data that spans weeks, then it is best to use a
dedicated data store (file system, SQL / NoSQL database, etc.) that is
designed for long-term data storage and retrieval. Spark Streaming is not
designed as a long-term data store. Also it does not seem like you need low
latency. So it might be better to use a combination of Spark Streaming and
Spark programs: Spark Streaming to receive data and store it in some
long-term data store, and Spark to periodically (every hour, day?) pull the
data from the store and process it. You can implement the invertible function
yourself in Spark by storing the previous reduced values in the same data
store every time the Spark program is run, and then using that data the
next time.

The great thing is that both these programs can share all the map and
reduce functions.
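
A minimal sketch of that split, assuming the streaming side writes (key,
count) pairs to HDFS as text files and a periodic batch job merges them with
previously stored totals (the paths, the separator, and the DStream `counts`
are illustrative):

import org.apache.spark.SparkContext._

// Streaming side: just persist each batch of (key, count) pairs to the store.
counts.map { case (k, v) => k + "\t" + v }
      .saveAsTextFiles("hdfs:///events/batch")

// Periodic batch side (run every hour/day): combine new data with prior totals.
val parse = (line: String) => { val Array(k, v) = line.split("\t"); (k, v.toLong) }
val newCounts = sc.textFile("hdfs:///events/batch-*").map(parse).reduceByKey(_ + _)
val oldTotals = sc.textFile("hdfs:///events/totals/latest").map(parse)
val newTotals = oldTotals.union(newCounts).reduceByKey(_ + _)
newTotals.map { case (k, v) => k + "\t" + v }
         .saveAsTextFile("hdfs:///events/totals/" + System.currentTimeMillis)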

TD


On Fri, Jul 18, 2014 at 12:09 PM, aaronjosephs aa...@placeiq.com wrote:

 Would it be a reasonable use case of spark streaming to have a very large
 window size (lets say on the scale of weeks). In this particular case the
 reduce function would be invertible so that would aid in efficiency. I
 assume that having a larger batch size since the window is so large would
 also lighten the workload for spark. The sliding duration is not too
 important, I just want to know if this is reasonable for spark to handle
 with any slide duration



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-long-batch-window-duration-tp10191.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Spark Streaming with long batch / window duration

2014-07-18 Thread aaronjosephs
Unfortunately, for reasons I won't go into, my options for what I can use are
limited; it was more of a curiosity to see whether Spark could handle a use
case like this, since the functionality I wanted fits perfectly into the
reduceByKeyAndWindow way of thinking. Anyway, thanks for answering.





Java null pointer exception while saving hadoop file

2014-07-18 Thread durga
Hi, I am getting a null pointer exception while saving the data into Hadoop.

Code as follows.
If I change the last line to
sorted_tup.take(count.toInt).foreach { case ((a, b, c), l) =>
sc.parallelize(l.toSeq).coalesce(1).saveAsTextFile(hdfsDir + a + "/" + b +
"/" + c) }
I am able to save it, but for larger files I am getting a heap space error.
I am thinking it is due to the take. Can someone please help me with this?

Thanks,
Durga 

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
val conf = new SparkConf()
  .setMaster(master)
  .setAppName(appName)
  .set("spark.cores.max", numCores)
  .setJars(Seq("/home/hadoopuser/testing/Rest_1/FileSplitter/target/scala-2.10/filesplitter_2.10-1.0.jar"))
  .set("spark.executor.memory", "5g")
val sc = new SparkContext(conf)
val action_results = sc.textFile(inputData)

import scala.util.parsing.json.JSON

val actions = action_results.map(l => JSON.parseFull(l).get).cache()

val tuples = actions.map { l =>
  var m = l.asInstanceOf[Map[Any, Any]]
  ((m("deviceId").asInstanceOf[Map[Any, Any]]("$numberLong").asInstanceOf[String],
    m("actionName").asInstanceOf[String],
    m("timestamp").asInstanceOf[Map[Any, Any]]("$date").asInstanceOf[String].substring(0, 10)), l)
}

val tup_grp = tuples.groupByKey

val tup_counts = tup_grp.map { case ((d: String, a: String, t: String), g) =>
  ((d, a, t), g.toArray)
}

val sorted_tup = tup_counts.sortByKey(true)
//val count = sorted_tup.count
//println("Sorted Tuples: " + sorted_tup.count)

sorted_tup.foreach { case ((a, b, c), l: Array[Any]) =>
  val lines = sc.parallelize(l.toSeq)
  lines.coalesce(2, true).saveAsTextFile(hdfsDir + a + "/" + b + "/" + c)
}







Re: BUG in spark-ec2 script (--ebs-vol-size) and workaround...

2014-07-18 Thread Shivaram Venkataraman
Thanks a lot for reporting this. I think we just missed installing xfsprogs
on the AMI. I have a fix for this at
https://github.com/mesos/spark-ec2/pull/59.

After the pull request is merged, any new clusters launched should have
mkfs.xfs

Thanks
Shivaram


On Fri, Jul 18, 2014 at 4:56 PM, Ben Horner ben.hor...@atigeo.com wrote:

 Hello all,

 There is a bug in the spark-ec2 script (perhaps due to a change in the
 Amazon AMI).

 The --ebs-vol-size option directs the spark-ec2 script to add an EBS volume
 of the specified size, and mount it at /vol for a persistent HDFS.  To do
 this, it uses mkfs.xfs which is not available (though mkfs is).

 To work around this, I was able to run yum install xfsprogs on the master
 and each slave, and then use the --resume option with the script, and the
 persistent HDFS actually worked!

 This has been a frustrating experience, but I've used the spark-ec2 script
 for several months now, and it's incredibly helpful.  I hope this post
 helps
 towards fixing the problem!

 Thanks,
 -Ben

 P.S.  This is the full initial command I used, in case this is isolated to
 particular instance types or anything:
 ec2/spark-ec2 -k ... -i ... -z us-east-1d -s 4 -t m3.2xlarge
 --ebs-vol-size=250 -m r3.2xlarge launch ...

 P.P.S.  Ganglia is still broken, and has been for a while...



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/BUG-in-spark-ec2-script-ebs-vol-size-and-workaround-tp10217.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: unserializable object in Spark Streaming context

2014-07-18 Thread Gino Bustelo
I get TD's recommendation of sharing a connection among tasks. Now, is there a 
good way to determine when to close connections? 

Gino B.

 On Jul 17, 2014, at 7:05 PM, Yan Fang yanfang...@gmail.com wrote:
 
 Hi Sean,
 
 Thank you. I see your point. What I was thinking is that, do computation in a 
 distributed fashion and do the storing from a single place. But you are 
 right, having multiple DB connections actually is fine.
 
 Thanks for answering my questions. That helps me understand the system.
 
 Cheers,
 
 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108
 
 
 On Thu, Jul 17, 2014 at 2:53 PM, Sean Owen so...@cloudera.com wrote:
 On Thu, Jul 17, 2014 at 10:39 PM, Yan Fang yanfang...@gmail.com wrote:
  Thank you for the help. If I use TD's approache, it works and there is no
  exception. Only drawback is that it will create many connections to the DB,
  which I was trying to avoid.
 
 Connection-like objects aren't data that can be serialized. What would
 it mean to share one connection with N workers? that they all connect
 back to the driver, and through one DB connection there? this defeats
 the purpose of distributed computing. You want multiple DB
 connections. You can limit the number of partitions if needed.
 
 
  Here is a snapshot of my code. Mark as red for the important code. What I
  was thinking is that, if I call the collect() method, Spark Streaming will
  bring the data to the driver and then the db object does not need to be 
  sent
 
 The Function you pass to foreachRDD() has a reference to db though.
 That's what is making it be serialized.
 
  to executors. My observation is that, though exceptions are thrown, the
  insert function still works. Any thought about that? Also paste the log in
  case it helps .http://pastebin.com/T1bYvLWB
 
 Any executors that run locally might skip the serialization and
 succeed (?) but I don't think the remote executors can be succeeding.
 


SparkSQL operator priority

2014-07-18 Thread Christos Kozanitis
Hello

What is the order with which SparkSQL deserializes parquet fields? Is it
possible to modify it?

I am using SparkSQL to query a parquet file that consists of a lot of
fields (around 30 or so). Let me call an example table MyTable and let's
suppose the name of one of its fields is position.

The query that I am executing is:
sql("select * from MyTable where position = 243189160")

The query plan that I get from this query is:
Filter (position#6L:6 = 243189160)
 ParquetTableScan
[contig.contigName#0,contig.contigLength#1L,contig.contigMD5#2,contig.referenceURL#3,contig.assembly#4,contig.species#5,position#6L,rangeOffset#7,rangeLength#8,referenceBase#9,readBase#10,sangerQuality#11,mapQuality#12,numSoftClipped#13,numReverseStrand#14,countAtPosition#15,readName#16,readStart#17L,readEnd#18L,recordGroupSequencingCenter#19,recordGroupDescription#20,recordGroupRunDateEpoch#21L,recordGroupFlowOrder#22,recordGroupKeySequence#23,recordGroupLibrary#24,recordGroupPredictedMedianInsertSize#25,recordGroupPlatform#26,recordGroupPlatformUnit#27,recordGroupSample#28],
(ParquetRelation hdfs://
ec2-54-89-87-167.compute-1.amazonaws.com:9000/genomes/hg00096.plup), None

I expect 14 entries in the output but the execution of
.collect.foreach(println) takes forever to run on my cluster (more than an
hour).

Is it safe to assume in my example that SparkSQL deserializes all fields
first before applying the filter? If so, can a user change this behavior?

To support my assumption I replaced "*" with "position", so my new query is
of the form sql("select position from MyTable where position = 243189160")
and this query runs much faster on the same hardware (2-3 minutes vs 65
min).

Any ideas?

thanks
Christos


Re: unserializable object in Spark Streaming context

2014-07-18 Thread Tathagata Das
That's a good question. My first thought is a timeout. Timing out after tens
of seconds should be sufficient. So there should be a timer in the singleton
that runs a check every second on when the singleton was last used, and
closes the connections after a timeout. Any attempt to use the connection
again will create a new connection.
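
A rough Scala sketch of that idea, with a lazily created JDBC connection and
an idle-timeout check (the JDBC URL is a placeholder; substitute whatever
client your data store actually uses):

import java.sql.{Connection, DriverManager}

// Hypothetical singleton: one connection per JVM, closed after 30s of inactivity.
object LazyConnection {
  private var conn: Connection = null
  private var lastUsed = 0L
  private val idleTimeoutMs = 30000L

  private val timer = new java.util.Timer(true)
  timer.schedule(new java.util.TimerTask {
    def run(): Unit = LazyConnection.synchronized {
      if (conn != null && System.currentTimeMillis - lastUsed > idleTimeoutMs) {
        conn.close(); conn = null
      }
    }
  }, 1000L, 1000L)

  def get(): Connection = synchronized {
    if (conn == null) conn = DriverManager.getConnection("jdbc:...")  // placeholder URL
    lastUsed = System.currentTimeMillis
    conn
  }
}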

TD


On Fri, Jul 18, 2014 at 7:59 PM, Gino Bustelo lbust...@gmail.com wrote:

 I get TD's recommendation of sharing a connection among tasks. Now, is
 there a good way to determine when to close connections?

 Gino B.

 On Jul 17, 2014, at 7:05 PM, Yan Fang yanfang...@gmail.com wrote:

 Hi Sean,

 Thank you. I see your point. What I was thinking is that, do computation
 in a distributed fashion and do the storing from a single place. But you
 are right, having multiple DB connections actually is fine.

 Thanks for answering my questions. That helps me understand the system.

 Cheers,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Thu, Jul 17, 2014 at 2:53 PM, Sean Owen so...@cloudera.com wrote:

 On Thu, Jul 17, 2014 at 10:39 PM, Yan Fang yanfang...@gmail.com wrote:
  Thank you for the help. If I use TD's approache, it works and there is
 no
  exception. Only drawback is that it will create many connections to the
 DB,
  which I was trying to avoid.

 Connection-like objects aren't data that can be serialized. What would
 it mean to share one connection with N workers? that they all connect
 back to the driver, and through one DB connection there? this defeats
 the purpose of distributed computing. You want multiple DB
 connections. You can limit the number of partitions if needed.


  Here is a snapshot of my code. Mark as red for the important code. What
 I
  was thinking is that, if I call the collect() method, Spark Streaming
 will
  bring the data to the driver and then the db object does not need to be
 sent

 The Function you pass to foreachRDD() has a reference to db though.
 That's what is making it be serialized.

  to executors. My observation is that, though exceptions are thrown, the
  insert function still works. Any thought about that? Also paste the log
 in
  case it helps .http://pastebin.com/T1bYvLWB

 Any executors that run locally might skip the serialization and
 succeed (?) but I don't think the remote executors can be succeeding.





Re: Graphx : Perfomance comparison over cluster

2014-07-18 Thread Ankur Dave
Thanks for your interest. I should point out that the numbers in the arXiv
paper are from GraphX running on top of a custom version of Spark with an
experimental in-memory shuffle prototype. As a result, if you benchmark
GraphX at the current master, it's expected that it will be 2-3x slower
than GraphLab.

The version with in-memory shuffle is here:
https://github.com/amplab/graphx2/commits/vldb. Unfortunately Spark has
changed a lot since then, and the way to configure and invoke Spark is
different. I can send you the correct configuration/invocation for this if
you're interested in benchmarking it.

On Fri, Jul 18, 2014 at 7:14 PM, ShreyanshB shreyanshpbh...@gmail.com
 wrote:

 Should I use the pagerank application already available in graphx for this 
 purpose
 or need to modify or need to write my own?


You should use the built-in PageRank. If your graph is available in edge
list format, you can run it using the Analytics driver as follows:

~/spark/bin/spark-submit --master spark://$MASTER_URL:7077 --class
org.apache.spark.graphx.lib.Analytics
~/spark/assembly/target/scala-2.10/spark-assembly-1.1.0-SNAPSHOT-hadoop1.0.4.jar
pagerank $EDGE_FILE --numEPart=$NUM_PARTITIONS --numIter=$NUM_ITERATIONS
[--partStrategy=$PARTITION_STRATEGY]
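
Equivalently, the built-in PageRank can be invoked programmatically from the
shell or your own driver (a sketch; edgeFile, numEPart, and numIter are
placeholders, and the partitionBy call corresponds to the --partStrategy
option above):

import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
import org.apache.spark.graphx.lib.PageRank

// Load the edge list with the desired number of edge partitions, then repartition.
val graph = GraphLoader.edgeListFile(sc, edgeFile, canonicalOrientation = false,
    minEdgePartitions = numEPart)
  .partitionBy(PartitionStrategy.EdgePartition2D)

val ranks = PageRank.run(graph, numIter).vertices
ranks.take(5).foreach(println)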

What should be the executor_memory, i.e. maximum or according to graph
 size?


As much memory as possible while leaving room for the operating system.

Is there any other configuration I should do to have the best performance?


I think the parameters to Analytics above should be sufficient:

- numEPart - should be equal to or a small integer multiple of the number
of cores. More partitions improve work balance but also increase memory
usage and communication, so in some cases it can even be faster with fewer
partitions than cores.
- partStrategy - If your edges are already sorted, you can skip this
option, because GraphX will leave them as-is by default and that may be
close to optimal. Otherwise, EdgePartition2D and RandomVertexCut are both
worth trying.

CC'ing Joey and Dan, who may have other suggestions.

Ankur http://www.ankurdave.com/


On Fri, Jul 18, 2014 at 7:14 PM, ShreyanshB shreyanshpbh...@gmail.com
wrote:

 Hi,

 I am trying to compare Graphx and other distributed graph processing
 systems
 (graphlab) on my cluster of 64 nodes, each node having 32 cores and
  connected with InfiniBand.

 I looked at http://arxiv.org/pdf/1402.2394.pdf and stats provided over
 there. I had few questions regarding configuration and achieving best
 performance.

 * Should I use the pagerank application already available in graphx for
 this
 purpose or need to modify or need to write my own?
- If I shouldn't use the inbuilt pagerank, can you share your pagerank
 application?

 * What should be the executor_memory, i.e. maximum or according to graph
 size?

 * Other than, number of cores, executor_memory and partition strategy, Is
 there any other configuration I should do to have the best performance?

 I am using following script,
 import org.apache.spark._
 import org.apache.spark.graphx._
 import org.apache.spark.rdd.RDD

 val startgraphloading = System.currentTimeMillis;
 val graph = GraphLoader.edgeListFile(sc, filepath,true,32)
 val endgraphloading = System.currentTimeMillis;


 Thanks in advance :)



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Graphx-Perfomance-comparison-over-cluster-tp10222.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: unserializable object in Spark Streaming context

2014-07-18 Thread Tathagata Das
Actually, let me clarify further. There are a number of possibilities.

1. The easier, less efficient way is to create a connection object every
time you do foreachPartition (as shown in the pseudocode earlier in the
thread). For each partition, you create a connection, use it to push all
the records in the partition, and then close it. You don't even need to
create a singleton in that case. The cons of this method are:
   - You are not reusing connections across tasks and jobs. So you will be
creating a lot of connections to the database, which may or may not be fine.
   - It will get worse if your partitions are tiny and pushing each
partition takes a few hundred milliseconds or a few seconds (as can happen
with Spark Streaming). See the sketch after this list.

2. The slightly harder, but more efficient way would be to use singletons,
which can contain one connection, or maintain a connection pool. Then
connections in the pool are created on demand, but not explicitly closed at
the end of the task, and are reused across tasks and jobs. In that case,
closing the connection would require some kind of timeout mechanism as I
explained in the previous post. Care also needs to be taken over whether
these connections are thread-safe.
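
A minimal sketch of option 1, assuming a DStream of (String, String) pairs
and a generic JDBC sink (the driver URL and the table are placeholders):

import java.sql.DriverManager

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // One connection per partition, created and closed inside the task.
    val conn = DriverManager.getConnection("jdbc:...")  // placeholder URL
    try {
      val stmt = conn.prepareStatement("INSERT INTO events (key, value) VALUES (?, ?)")
      records.foreach { case (k, v) =>
        stmt.setString(1, k)
        stmt.setString(2, v)
        stmt.executeUpdate()
      }
    } finally {
      conn.close()
    }
  }
}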

Hope this helps!

TD








On Fri, Jul 18, 2014 at 8:14 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

  That's a good question. My first thought is a timeout. Timing out after tens of
  seconds should be sufficient. So there should be a timer in the singleton
  that runs a check every second on when the singleton was last used, and
  closes the connections after a timeout. Any attempt to use the connection
  again will create a new connection.

 TD


 On Fri, Jul 18, 2014 at 7:59 PM, Gino Bustelo lbust...@gmail.com wrote:

 I get TD's recommendation of sharing a connection among tasks. Now, is
 there a good way to determine when to close connections?

 Gino B.

 On Jul 17, 2014, at 7:05 PM, Yan Fang yanfang...@gmail.com wrote:

 Hi Sean,

 Thank you. I see your point. What I was thinking is that, do computation
 in a distributed fashion and do the storing from a single place. But you
 are right, having multiple DB connections actually is fine.

 Thanks for answering my questions. That helps me understand the system.

 Cheers,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108


 On Thu, Jul 17, 2014 at 2:53 PM, Sean Owen so...@cloudera.com wrote:

 On Thu, Jul 17, 2014 at 10:39 PM, Yan Fang yanfang...@gmail.com wrote:
  Thank you for the help. If I use TD's approache, it works and there is
 no
  exception. Only drawback is that it will create many connections to
 the DB,
  which I was trying to avoid.

 Connection-like objects aren't data that can be serialized. What would
 it mean to share one connection with N workers? that they all connect
 back to the driver, and through one DB connection there? this defeats
 the purpose of distributed computing. You want multiple DB
 connections. You can limit the number of partitions if needed.


  Here is a snapshot of my code. Mark as red for the important code.
 What I
  was thinking is that, if I call the collect() method, Spark Streaming
 will
  bring the data to the driver and then the db object does not need to
 be sent

 The Function you pass to foreachRDD() has a reference to db though.
 That's what is making it be serialized.

  to executors. My observation is that, though exceptions are thrown,
 the
  insert function still works. Any thought about that? Also paste the
 log in
  case it helps .http://pastebin.com/T1bYvLWB

 Any executors that run locally might skip the serialization and
 succeed (?) but I don't think the remote executors can be succeeding.






Re: Graphx : Perfomance comparison over cluster

2014-07-18 Thread ShreyanshB
Thanks a lot Ankur.

The version with in-memory shuffle is here:
https://github.com/amplab/graphx2/commits/vldb. Unfortunately Spark has
changed a lot since then, and the way to configure and invoke Spark is
different. I can send you the correct configuration/invocation for this if
you're interested in benchmarking it.

Actually I wanted to see how GraphLab and GraphX perform on the cluster
we have (32 cores per node and InfiniBand). I tried the LiveJournal
graph with partitions = 400 (16 nodes, each node with 32 cores), but it
performed better with partitions = 64. I'll try it again. Does the suggested
version with in-memory shuffle affect performance that much? (According to
previously reported numbers, GraphX did 10 iterations in 142 seconds, and in
the latest stats it does it in 68 seconds.) Is it just the in-memory shuffle
that changed?





On Fri, Jul 18, 2014 at 8:31 PM, ankurdave [via Apache Spark User List] 
ml-node+s1001560n10227...@n3.nabble.com wrote:

 Thanks for your interest. I should point out that the numbers in the arXiv
 paper are from GraphX running on top of a custom version of Spark with an
 experimental in-memory shuffle prototype. As a result, if you benchmark
 GraphX at the current master, it's expected that it will be 2-3x slower
 than GraphLab.

 The version with in-memory shuffle is here:
 https://github.com/amplab/graphx2/commits/vldb. Unfortunately Spark has
 changed a lot since then, and the way to configure and invoke Spark is
 different. I can send you the correct configuration/invocation for this if
 you're interested in benchmarking it.

 On Fri, Jul 18, 2014 at 7:14 PM, ShreyanshB [hidden email] wrote:

 Should I use the pagerank application already available in graphx for
 this purpose or need to modify or need to write my own?


 You should use the built-in PageRank. If your graph is available in edge
 list format, you can run it using the Analytics driver as follows:

 ~/spark/bin/spark-submit --master spark://$MASTER_URL:7077 --class
 org.apache.spark.graphx.lib.Analytics
 ~/spark/assembly/target/scala-2.10/spark-assembly-1.1.0-SNAPSHOT-hadoop1.0.4.jar
 pagerank $EDGE_FILE --numEPart=$NUM_PARTITIONS --numIter=$NUM_ITERATIONS
 [--partStrategy=$PARTITION_STRATEGY]

 What should be the executor_memory, i.e. maximum or according to graph
 size?


 As much memory as possible while leaving room for the operating system.

 Is there any other configuration I should do to have the best performance?


 I think the parameters to Analytics above should be sufficient:

 - numEPart - should be equal to or a small integer multiple of the number
 of cores. More partitions improve work balance but also increase memory
 usage and communication, so in some cases it can even be faster with fewer
 partitions than cores.
 - partStrategy - If your edges are already sorted, you can skip this
 option, because GraphX will leave them as-is by default and that may be
 close to optimal. Otherwise, EdgePartition2D and RandomVertexCut are both
 worth trying.

 CC'ing Joey and Dan, who may have other suggestions.

 Ankur http://www.ankurdave.com/


 On Fri, Jul 18, 2014 at 7:14 PM, ShreyanshB [hidden email] wrote:

 Hi,

 I am trying to compare Graphx and other distributed graph processing
 systems
 (graphlab) on my cluster of 64 nodes, each node having 32 cores and
  connected with InfiniBand.

 I looked at http://arxiv.org/pdf/1402.2394.pdf and stats provided over
 there. I had few questions regarding configuration and achieving best
 performance.

 * Should I use the pagerank application already available in graphx for
 this
 purpose or need to modify or need to write my own?
- If I shouldn't use the inbuilt pagerank, can you share your pagerank
 application?

 * What should be the executor_memory, i.e. maximum or according to graph
 size?

 * Other than, number of cores, executor_memory and partition strategy, Is
 there any other configuration I should do to have the best performance?

 I am using following script,
 import org.apache.spark._
 import org.apache.spark.graphx._
 import org.apache.spark.rdd.RDD

 val startgraphloading = System.currentTimeMillis;
 val graph = GraphLoader.edgeListFile(sc, filepath,true,32)
 val endgraphloading = System.currentTimeMillis;


 Thanks in advance :)



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Graphx-Perfomance-comparison-over-cluster-tp10222.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





SeattleSparkMeetup: Spark at eBay - Troubleshooting the everyday issues

2014-07-18 Thread Denny Lee
We're coming off a great Seattle Spark Meetup session with Evan Chan
(@evanfchan), "Interactive OLAP Queries with @ApacheSpark and #Cassandra"
(http://www.slideshare.net/EvanChan2/2014-07olapcassspark), at Whitepages.

Now, we're proud to announce that our next session is "Spark at eBay -
Troubleshooting the Everyday Issues" (http://meetu.ps/2dcsXs) at the Expedia
Building on August 8th.

Come join us!