Re: CDH 5.0 and Spark 0.9.0

2014-05-01 Thread Sean Owen
This codec does require native libraries to be installed, IIRC, but
they are installed with CDH 5.

The error you show does not look related though. Are you sure your HA
setup is working and that you have configured it correctly in whatever
config spark is seeing?
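
For reference, an HDFS client resolving an HA nameservice usually needs
entries along these lines in the hdfs-site.xml that Spark picks up (the
nameservice and host names below are purely illustrative, not taken from
your cluster):

<property>
  <name>dfs.nameservices</name>
  <value>mycluster</value>
</property>
<property>
  <name>dfs.ha.namenodes.mycluster</name>
  <value>nn1,nn2</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.mycluster.nn1</name>
  <value>namenode1.example.com:8020</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.mycluster.nn2</name>
  <value>namenode2.example.com:8020</value>
</property>
<property>
  <name>dfs.client.failover.proxy.provider.mycluster</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>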
--
Sean Owen | Director, Data Science | London


On Thu, May 1, 2014 at 12:44 AM, Paul Schooss paulmscho...@gmail.com wrote:
 Hello,

 So I was unable to run the following commands from the spark shell with CDH
 5.0 and spark 0.9.0, see below.

 Once I removed the property

 <property>
   <name>io.compression.codec.lzo.class</name>
   <value>com.hadoop.compression.lzo.LzoCodec</value>
   <final>true</final>
 </property>

 from the core-site.xml on the node, the spark commands worked. Is there a
 specific setup I am missing?

 scala> var log = sc.textFile("hdfs://jobs-ab-hnn1//input/core-site.xml")
 14/04/30 22:43:16 INFO MemoryStore: ensureFreeSpace(78800) called with
 curMem=150115, maxMem=308713881
 14/04/30 22:43:16 INFO MemoryStore: Block broadcast_1 stored as values to
 memory (estimated size 77.0 KB, free 294.2 MB)
 14/04/30 22:43:16 WARN Configuration: mapred-default.xml:an attempt to
 override final parameter: mapreduce.tasktracker.cache.local.size; Ignoring.
 14/04/30 22:43:16 WARN Configuration: yarn-site.xml:an attempt to override
 final parameter: mapreduce.output.fileoutputformat.compress.type; Ignoring.
 14/04/30 22:43:16 WARN Configuration: hdfs-site.xml:an attempt to override
 final parameter: mapreduce.map.output.compress.codec; Ignoring.
 log: org.apache.spark.rdd.RDD[String] = MappedRDD[3] at textFile at
 console:12

 scala> log.count()
 14/04/30 22:43:03 WARN JobConf: The variable mapred.child.ulimit is no
 longer used.
 14/04/30 22:43:04 WARN Configuration: mapred-default.xml:an attempt to
 override final parameter: mapreduce.tasktracker.cache.local.size; Ignoring.
 14/04/30 22:43:04 WARN Configuration: yarn-site.xml:an attempt to override
 final parameter: mapreduce.output.fileoutputformat.compress.type; Ignoring.
 14/04/30 22:43:04 WARN Configuration: hdfs-site.xml:an attempt to override
 final parameter: mapreduce.map.output.compress.codec; Ignoring.
 java.lang.IllegalArgumentException: java.net.UnknownHostException:
 jobs-a-hnn1
 at
 org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:377)
 at
 org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:237)
 at
 org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:141)
 at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:576)
 at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:521)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:146)
 at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
 at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
 at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
 at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
 at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
 at
 org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:221)
 at
 org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270)
 at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
 at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:902)
 at org.apache.spark.rdd.RDD.count(RDD.scala:720)
 at $iwC$$iwC$$iwC$$iwC.init(console:15)
 at $iwC$$iwC$$iwC.init(console:20)
 at $iwC$$iwC.init(console:22)
 at $iwC.init(console:24)
 at init(console:26)
 at .init(console:30)
 at .clinit(console)
 at .init(console:7)
 at .clinit(console)
 at $print(console)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:772)
 at
 org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1040)
 at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:609)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:640)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:604)
 at 

Re: Multiple Streams with Spark Streaming

2014-05-01 Thread Mayur Rustagi
File as a stream?
I think you are confusing Spark Streaming with a buffered reader. Spark
Streaming is meant to process batches of data (files, packets, messages) as
they come in, in fact utilizing the time of packet reception as a way to create
windows etc.

In your case you are better off reading the file, partitioning it and
operating on each column individually, if that makes more sense to you.
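
For what it's worth, a minimal non-streaming sketch of that column-wise
approach (assuming sc is a SparkContext, the file is comma separated and
every field parses as a Double -- all assumptions on my part):

import org.apache.spark.SparkContext._

val lines = sc.textFile("eeg.csv")                 // illustrative path
// key every value by its column index, then aggregate per column
val byColumn = lines.flatMap { line =>
  line.split(",").zipWithIndex.map { case (v, i) => (i, v.toDouble) }
}
val stats = byColumn.groupByKey().mapValues { vs =>
  val n = vs.size
  val mean = vs.sum / n
  val sd = math.sqrt(vs.map(v => (v - mean) * (v - mean)).sum / n)
  (mean, sd)
}
stats.collect().foreach { case (col, (mean, sd)) =>
  println(s"column $col: mean=$mean sd=$sd")
}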



Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Thu, May 1, 2014 at 3:24 PM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:

 Hi all,

 Is it possible to read and process multiple streams with Spark? I have an
 EEG (brain waves) CSV file with 23 columns. Each column is one stream (wave)
 and each column has one million values.

 I know one way to do it is to take the transpose of the file and then give it
 to Spark, and each mapper will get one or more waves out of the 23 waves,
 but then it becomes a non-streaming problem and I want to read the file as a
 stream. Please correct me if I am wrong.

 I have to apply simple operations(mean and SD) on each window of a wave.

 Regards,
 Laeeq




Re: Broadcast RDD Lookup

2014-05-01 Thread Mayur Rustagi
Mostly none of the items in PairRDD match your input. Hence the error.


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Thu, May 1, 2014 at 2:06 PM, vivek.ys vivek...@gmail.com wrote:

 Hi All,
 I am facing an issue while performing the lookup. Please guide me on
 where the mistake is.

 val userCluster = sc.textFile("/vives/cluster2/day/users").map(_ match {
   case line: String => (line.split(',')(1).split(')')(0).trim.toInt,
     line.split(',')(0).split('(')(1).toInt)
 })

 val userClusterBroadCast = sc.broadcast(userCluster)

 val productCluster = sc.textFile("/vives/cluster2/day/sites").map(_ match {
   case line: String => (line.split(',')(1).split(')')(0).trim.toInt,
     line.split(',')(0).split('(')(1).toInt)
 })

 val productClusterBroadCast = sc.broadcast(productCluster)

 val nCut = data.map(_.split('\t') match {
   case Array(user, item, rate) =>
     ((userClusterBroadCast.value.lookup(user.toInt)(0),
       productClusterBroadCast.value.lookup(item.toInt)(0)),
      (user.toInt, item.toInt, rate.toDouble))
 }).persist(StorageLevel.MEMORY_AND_DISK)

 When I try this I get scala match error null.

 scala.MatchError: null
 at
 org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:550)
 at

 $line27.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:34)
 at

 $line27.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:33)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:75)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
 at org.apache.spark.scheduler.Task.run(Task.scala:53)
 at

 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
 at
 org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Broadcst-RDD-Lookup-tp5142.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: How to handle this situation: Huge File Shared by All maps and Each Computer Has one copy?

2014-05-01 Thread Mayur Rustagi
A broadcast variable is meant to be shared across each node, not per map task.
The process you are using should work; however, a 6GB broadcast
variable could be an issue. Does the broadcast variable finally move, or
does it always stay stuck?
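
One node-local pattern (a sketch only, under the assumption that the 6GB
file is a plain text file available at the same local path on every worker;
the path and the map function are placeholders, not from your program) is to
load it lazily in a singleton object, so it is read at most once per executor
JVM and shared by every task scheduled on that node:

import scala.io.Source

object GlobalFile {
  // initialized lazily, at most once per executor JVM
  lazy val data: Array[String] =
    Source.fromFile("/data/global_file.txt").getLines().toArray
}

val result = rdd.mapPartitions { iter =>
  val shared = GlobalFile.data            // same instance for all tasks in this JVM
  iter.map(ele => myMapFunc(ele, shared)) // myMapFunc is a placeholder
}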

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Thu, May 1, 2014 at 10:07 AM, PengWeiPRC peng.wei@gmx.com wrote:

 Hi there,

 I was wondering if somebody could give me some suggestions about how to
 handle this situation:

   I have a spark program, in which it reads a 6GB file first (Not RDD)
 locally, and then do the map/reduce tasks. This 6GB file contains
 information that will be shared by all the map tasks. Previously, I handled
 it using the broadcast function in Spark, which is like this:
 global_file = fileRead(filename)
 global_file.broadcast()
 rdd.map(ele => MapFunc(ele))

   However, when running the spark program with a cluster of multiple
 computers, I found that the remote nodes waited forever for the
 broadcasting
 of the global_file. I think that it may not be a good solution to have each
 map task to load the global file by themselves, which would incur huge
 overhead.

   Actually, we have this global file on each node of our cluster. The ideal
 behavior I hope for is that each node can read this global file only
 from its local disk (and keep it in memory), and then all the map/reduce
 tasks scheduled to this node can share that data. Hence, the global file
 is neither like a broadcast variable, which is shared by all map/reduce
 tasks, nor a private variable only seen by one map task. It is shared
 node-wide: read on each node only one time and shared by all the
 tasks mapped to this node.

 Could anybody tell me how to program in Spark to handle it? Thanks so much.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-handle-this-situation-Huge-File-Shared-by-All-maps-and-Each-Computer-Has-one-copy-tp5139.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Broadcast RDD Lookup

2014-05-01 Thread Vivek YS
No, I am sure the items match, because userCluster and productCluster are
prepared from data. The cross product of userCluster and productCluster is a
superset of data.
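
For what it's worth, lookup() invoked on an RDD from inside another RDD's
map() runs an RDD operation inside a task, which is not supported and can
itself show up as a null-related error regardless of whether the keys match.
A sketch that sidesteps this by collecting the (small) cluster tables and
broadcasting plain Maps instead of the RDDs, assuming they fit in driver
memory:

val userClusterMap = sc.broadcast(userCluster.collect().toMap)       // Map[Int, Int]
val productClusterMap = sc.broadcast(productCluster.collect().toMap)

val nCut = data.map(_.split('\t') match {
  case Array(user, item, rate) =>
    ((userClusterMap.value(user.toInt), productClusterMap.value(item.toInt)),
     (user.toInt, item.toInt, rate.toDouble))
}).persist(StorageLevel.MEMORY_AND_DISK)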



On Thu, May 1, 2014 at 3:41 PM, Mayur Rustagi mayur.rust...@gmail.comwrote:

 Mostly none of the items in PairRDD match your input. Hence the error.


 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Thu, May 1, 2014 at 2:06 PM, vivek.ys vivek...@gmail.com wrote:

 Hi All,
 I am facing an issue while performing the lookup. Please guide me on
 where the mistake is.

 val userCluster = sc.textFile("/vives/cluster2/day/users").map(_ match {
   case line: String => (line.split(',')(1).split(')')(0).trim.toInt,
     line.split(',')(0).split('(')(1).toInt)
 })

 val userClusterBroadCast = sc.broadcast(userCluster)

 val productCluster = sc.textFile("/vives/cluster2/day/sites").map(_ match {
   case line: String => (line.split(',')(1).split(')')(0).trim.toInt,
     line.split(',')(0).split('(')(1).toInt)
 })

 val productClusterBroadCast = sc.broadcast(productCluster)

 val nCut = data.map(_.split('\t') match {
   case Array(user, item, rate) =>
     ((userClusterBroadCast.value.lookup(user.toInt)(0),
       productClusterBroadCast.value.lookup(item.toInt)(0)),
      (user.toInt, item.toInt, rate.toDouble))
 }).persist(StorageLevel.MEMORY_AND_DISK)

 When I try this I get scala match error null.

 scala.MatchError: null
 at
 org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:550)
 at

 $line27.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:34)
 at

 $line27.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:33)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:75)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at

 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
 at

 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
 at org.apache.spark.scheduler.Task.run(Task.scala:53)
 at

 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
 at

 org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Broadcst-RDD-Lookup-tp5142.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: update of RDDs

2014-05-01 Thread Mayur Rustagi
RDDs are immutable, so they cannot be updated. You can create a new RDD containing
the updated entries (often not what you want to do). A small sketch is below.
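
A sketch of the "new RDD containing updated entries" route for a key-value
RDD (names and data are illustrative):

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

val base: RDD[(String, Int)]    = sc.parallelize(Seq(("a", 1), ("b", 2)))
val updates: RDD[(String, Int)] = sc.parallelize(Seq(("b", 20), ("c", 3)))

// cogroup keeps keys from both sides; prefer the updated value when present.
// base itself is untouched -- updated is a brand new RDD.
val updated: RDD[(String, Int)] = base.cogroup(updates).mapValues {
  case (oldVs, newVs) => if (newVs.nonEmpty) newVs.head else oldVs.head
}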



Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Thu, May 1, 2014 at 4:42 AM, narayanabhatla NarasimhaMurthy 
nn.mur...@cmcltd.com wrote:

 In our application, we need distributed RDDs containing key-value maps. We
 have operations that update RDDs by adding entries to the map, deleting
 entries from the map, as well as updating the value part of maps.
 We also have map reduce functions that operate on the RDDs. The questions
 are the following.
 1. Can RDDs be updated? If yes, what are the methods?
 2. If we update RDDs, will it happen in place or does it create new RDDs
 with almost double the original RDD size (original + newly created RDD)?
 Thank you very much.
 N.N.Murthy



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/update-of-RDDs-tp5132.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



RE: update of RDDs

2014-05-01 Thread NN Murthy
Thanks a lot for the very prompt response. Then the next questions are the following.

1.   Can we conclude that Spark is NOT the solution for our requirement? Or

2.   Is there a design approach to meet such requirements using Spark?

 

From: Mayur Rustagi [mailto:mayur.rust...@gmail.com] 
Sent: 01 May 2014 18:22
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: Re: update of RDDs

 

RDDs are immutable, so they cannot be updated. You can create a new RDD containing
the updated entries (often not what you want to do).

 

 




Mayur Rustagi
Ph: +1 (760) 203 3257

http://www.sigmoidanalytics.com

@mayur_rustagi https://twitter.com/mayur_rustagi 

 

 

On Thu, May 1, 2014 at 4:42 AM, narayanabhatla NarasimhaMurthy 
nn.mur...@cmcltd.com wrote:

In our application, we need distributed RDDs containing key-value maps. We
have operations that update RDDs by adding entries to the map, deleting
entries from the map, as well as updating the value part of maps.
We also have map reduce functions that operate on the RDDs. The questions are
the following.
1. Can RDDs be updated? If yes, what are the methods?
2. If we update RDDs, will it happen in place or does it create new RDDs
with almost double the original RDD size (original + newly created RDD)?
Thank you very much.
N.N.Murthy



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/update-of-RDDs-tp5132.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

 



Re: GraphX. How to remove vertex or edge?

2014-05-01 Thread Daniel Darabos
Graph.subgraph() allows you to apply a filter to edges and/or vertices.
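
A minimal sketch (assuming a Graph[VD, Double] named graph whose edge
attribute is a weight; the threshold and vertex id are made up): keep only
edges with weight above 0.5 and drop the vertex with id 42.

// epred sees the full EdgeTriplet, vpred sees (VertexId, vertex attribute)
val filtered = graph.subgraph(
  epred = triplet => triplet.attr > 0.5,
  vpred = (id, attr) => id != 42L
)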


On Thu, May 1, 2014 at 8:52 AM, Николай Кинаш peroksi...@gmail.com wrote:

 Hello.

 How to remove vertex or edges from graph in GraphX?



Re: My talk on Spark: The Next Top (Compute) Model

2014-05-01 Thread Daniel Darabos
Cool intro, thanks! One question. On slide 23 it says "Standalone (local
mode)". That sounds a bit confusing without hearing the talk.

Standalone mode is not local. It just does not depend on cluster
software. I think it's the best mode for EC2/GCE, because they provide a
distributed filesystem anyway (S3/GCS). Why configure Hadoop if you don't
have to?


On Thu, May 1, 2014 at 12:25 AM, Dean Wampler deanwamp...@gmail.com wrote:

 I meant to post this last week, but this is a talk I gave at the Philly
 ETE conf. last week:

 http://www.slideshare.net/deanwampler/spark-the-next-top-compute-model

 Also here:

 http://polyglotprogramming.com/papers/Spark-TheNextTopComputeModel.pdf

 dean

 --
 Dean Wampler, Ph.D.
 Typesafe
 @deanwampler
 http://typesafe.com
 http://polyglotprogramming.com



Re: My talk on Spark: The Next Top (Compute) Model

2014-05-01 Thread Dean Wampler
Thanks for the clarification. I'll fix the slide. I've done a lot of
Scalding/Cascading programming where the two concepts are synonymous, but
clearly I was imposing my prejudices here ;)

dean


On Thu, May 1, 2014 at 8:18 AM, Daniel Darabos 
daniel.dara...@lynxanalytics.com wrote:

 Cool intro, thanks! One question. On slide 23 it says Standalone (local
 mode). That sounds a bit confusing without hearing the talk.

 Standalone mode is not local. It just does not depend on a cluster
 software. I think it's the best mode for EC2/GCE, because they provide a
 distributed filesystem anyway (S3/GCS). Why configure Hadoop if you don't
 have to.


 On Thu, May 1, 2014 at 12:25 AM, Dean Wampler deanwamp...@gmail.comwrote:

 I meant to post this last week, but this is a talk I gave at the Philly
 ETE conf. last week:

 http://www.slideshare.net/deanwampler/spark-the-next-top-compute-model

 Also here:

 http://polyglotprogramming.com/papers/Spark-TheNextTopComputeModel.pdf

 dean

 --
 Dean Wampler, Ph.D.
 Typesafe
 @deanwampler
 http://typesafe.com
 http://polyglotprogramming.com





-- 
Dean Wampler, Ph.D.
Typesafe
@deanwampler
http://typesafe.com
http://polyglotprogramming.com


Re: My talk on Spark: The Next Top (Compute) Model

2014-05-01 Thread ZhangYi
Very useful material. Currently I am trying to persuade my client to choose Spark
instead of Hadoop MapReduce. Your slides give me more evidence to support my
opinion.

--  
ZhangYi (张逸)
Developer
tel: 15023157626
blog: agiledon.github.com
weibo: tw张逸
Sent with Sparrow (http://www.sparrowmailapp.com/?sig)


On Thursday, May 1, 2014 at 9:18 PM, Daniel Darabos wrote:

 Cool intro, thanks! One question. On slide 23 it says Standalone (local 
 mode). That sounds a bit confusing without hearing the talk.
  
 Standalone mode is not local. It just does not depend on a cluster software. 
 I think it's the best mode for EC2/GCE, because they provide a distributed 
 filesystem anyway (S3/GCS). Why configure Hadoop if you don't have to.
  
  
 On Thu, May 1, 2014 at 12:25 AM, Dean Wampler deanwamp...@gmail.com 
 (mailto:deanwamp...@gmail.com) wrote:
  I meant to post this last week, but this is a talk I gave at the Philly ETE 
  conf. last week:  
   
  http://www.slideshare.net/deanwampler/spark-the-next-top-compute-model  
   
  Also here:
   
  http://polyglotprogramming.com/papers/Spark-TheNextTopComputeModel.pdf  
   
  dean  
   
  --  
  Dean Wampler, Ph.D.
  Typesafe
  @deanwampler
  http://typesafe.com
  http://polyglotprogramming.com
   
   
   
   
  
  
  



Re: My talk on Spark: The Next Top (Compute) Model

2014-05-01 Thread Dean Wampler
That's great! Thanks. Let me know if it works ;) or what I could improve to
make it work.

dean


On Thu, May 1, 2014 at 8:45 AM, ZhangYi yizh...@thoughtworks.com wrote:

  Very Useful material. Currently, I am trying to persuade my client choose
 Spark instead of Hadoop MapReduce. Your slide give me more evidence to
 support my opinion.

 --
 ZhangYi (张逸)
 Developer
 tel: 15023157626
 blog: agiledon.github.com
 weibo: tw张逸
 Sent with Sparrow http://www.sparrowmailapp.com/?sig

 On Thursday, May 1, 2014 at 9:18 PM, Daniel Darabos wrote:

 Cool intro, thanks! One question. On slide 23 it says Standalone (local
 mode). That sounds a bit confusing without hearing the talk.

 Standalone mode is not local. It just does not depend on a cluster
 software. I think it's the best mode for EC2/GCE, because they provide a
 distributed filesystem anyway (S3/GCS). Why configure Hadoop if you don't
 have to.


 On Thu, May 1, 2014 at 12:25 AM, Dean Wampler deanwamp...@gmail.comwrote:

  I meant to post this last week, but this is a talk I gave at the Philly
 ETE conf. last week:

 http://www.slideshare.net/deanwampler/spark-the-next-top-compute-model

 Also here:

 http://polyglotprogramming.com/papers/Spark-TheNextTopComputeModel.pdf

 dean

 --
 Dean Wampler, Ph.D.
 Typesafe
 @deanwampler
 http://typesafe.com
 http://polyglotprogramming.com






-- 
Dean Wampler, Ph.D.
Typesafe
@deanwampler
http://typesafe.com
http://polyglotprogramming.com


sbt/sbt run command returns a JVM problem

2014-05-01 Thread Carter
Hi, I have a very simple spark program written in Scala:
/*** testApp.scala ***/
object testApp {
  def main(args: Array[String]) {
println("Hello! World!")
  }
}
Then I use the following command to compile it:
$ sbt/sbt package
The compilation finished successfully and I got a JAR file.
But when I use this command to run it:
$ sbt/sbt run
it returned an error with JVM:
[info] Error occurred during initialization of VM 
[info] Could not reserve enough space for object heap 
[error] Error: Could not create the Java Virtual Machine. 
[error] Error: A fatal exception has occurred. Program will exit. 
java.lang.RuntimeException: Nonzero exit code returned from runner: 1   
at scala.sys.package$.error(package.scala:27)

My machine has 2GB of memory, and runs on Ubuntu 11.04. I also tried to change
the Java parameter settings (e.g., -Xmx, -Xms, -XX:MaxPermSize,
-XX:ReservedCodeCacheSize) in the file sbt/sbt, but it looks like none of the
changes work. Can anyone help me out with this problem? Thank you very much.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/sbt-sbt-run-command-returns-a-JVM-problem-tp5157.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: sbt/sbt run command returns a JVM problem

2014-05-01 Thread Sean Owen
Here's how I configure SBT, which I think is the usual way:

export SBT_OPTS="-XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256m -Xmx1g"

See if that takes. But your error says that you're already asking for
too much memory for your machine. So maybe you are setting the value
successfully, but it's not valid for your machine. How big a heap are you asking for?
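
For a 2GB machine something smaller is probably needed, e.g. (numbers are a
guess, tune to taste):

export SBT_OPTS="-Xms256m -Xmx768m -XX:MaxPermSize=128m"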


On Thu, May 1, 2014 at 2:57 PM, Chester Chen chesterxgc...@yahoo.com wrote:
 You might want to check the memory settings in sbt itself, which is a shell
 script that runs a java command. I don't have a computer at hand, but if you vim or
 cat sbt/sbt, you might see the memory settings; you can change them to fit
 your need.

 You might also be able to override the setting via .sbtopts without changing
 the script, but google it to be sure.

 Chester

 Sent from my iPhone

 On May 1, 2014, at 6:47 AM, Carter gyz...@hotmail.com wrote:

 Hi, I have a very simple spark program written in Scala:
 /*** testApp.scala ***/
 object testApp {
  def main(args: Array[String]) {
println(Hello! World!)
  }
 }
 Then I use the following command to compile it:
 $ sbt/sbt package
 The compilation finished successfully and I got a JAR file.
 But when I use this command to run it:
 $ sbt/sbt run
 it returned an error with JVM:
 [info] Error occurred during initialization of VM
 [info] Could not reserve enough space for object heap
 [error] Error: Could not create the Java Virtual Machine.
 [error] Error: A fatal exception has occurred. Program will exit.
 java.lang.RuntimeException: Nonzero exit code returned from runner: 1
 at scala.sys.package$.error(package.scala:27)

 My machine has 2G memory, and runs on Ubuntu 11.04. I also tried to change
 the setting of java parameter (e.g.,  -Xmx, -Xms, -XX:MaxPermSize,
 -XX:ReservedCodeCacheSize) in the file sbt/sbt, but it looks like non of the
 change works. Can anyone help me out with this problem? Thank you very much.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/sbt-sbt-run-command-returns-a-JVM-problem-tp5157.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: What is Seq[V] in updateStateByKey?

2014-05-01 Thread Adrian Mocanu
So Seq[V] contains only new tuples. I initially thought that whenever a new
tuple was found, it would be added to the Seq and the update function called
immediately, so there wouldn't be more than 1 new value in the Seq per function call.

Say I want to sum tuples with the same key in an RDD using updateStateByKey.
Then (1) would Seq[V] contain the numbers for a particular key, and my S state
could be the sum?
Or would (2) Seq contain partial sums (say a sum per partition?) which I then
need to sum into the final sum?

After writing this out and thinking a little more about it I think #2 is
correct. Can you confirm?
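
To make the question concrete, a minimal running-sum sketch (pairDStream is an
assumed DStream[(String, Int)], not from the original example); here the update
function just adds whatever new values arrived for the key in the current batch
to the previous total:

val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  Some(state.getOrElse(0) + values.sum)
}
val runningSums = pairDStream.updateStateByKey[Int](updateFunc)
runningSums.print()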

Thanks again!
-A

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: April-30-14 4:30 PM
To: user@spark.apache.org
Subject: Re: What is Seq[V] in updateStateByKey?

S is the previous count, if any. Seq[V] are potentially many new counts. All of 
them have to be added together to keep an accurate total.  It's as if the count 
were 3, and I tell you I've just observed 2, 5, and 1 additional occurrences -- 
the new count is 3 + (2+5+1) not
1 + 1.


I butted in since I'd like to ask a different question about the same line of 
code. Why:

  val currentCount = values.foldLeft(0)(_ + _)

instead of

  val currentCount = values.sum

This happens a few places in the code. sum seems equivalent and likely quicker. 
Same with things like filter(_ == 200).size instead of count(_ == 200)... 
pretty trivial but hey.


On Wed, Apr 30, 2014 at 9:23 PM, Adrian Mocanu amoc...@verticalscope.com 
wrote:
 Hi TD,

 Why does the example keep recalculating the count via fold?

 Wouldn’t it make more sense to get the last count in values Seq and 
 add 1 to it and save that as current count?



 From what Sean explained I understand that all values in Seq have the 
 same key. Then when a new value for that key is found it is added to 
 this Seq collection and the update function is called.



 Is my understanding correct?


Spark Training

2014-05-01 Thread Nicholas Chammas
There are many freely-available resources for the enterprising individual
to use if they want to Spark up their life.

For others, some structured training is in order. Say I want everyone from
my department at my company to get something like the AMP
Camp (http://ampcamp.berkeley.edu/) experience, perhaps on-site.

What are my options for that?

Databricks doesn't have a contact page, so I figured this would be the next
best place to ask.

Nick




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Training-tp5166.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Spark profiler

2014-05-01 Thread Punya Biswal
Hi all,

I am thinking of starting work on a profiler for Spark clusters. The current
idea is that it would collect jstacks from executor nodes and put them into
a central index (either a database or elasticsearch), and it would present
them to people in a UI that would let people slice and dice the jstacks
based on what job was running at the time, and what executor node was
running. In addition, the UI would also present time spent doing
non-computational work, such as shuffling and input/output IO. In a future
extension, we might support reading from JMX and/or a JVM agent to get more
precise data.

I know that it's already possible to use YourKit to profile individual
processes, but YourKit costs money, needs a desktop client to be installed,
and doesn't place its data in the context relevant to a Spark cluster.

Does something like this already exist (or is such a project already in
progress)? Do you have any feedback or recommendations for how to go about
it?

Thanks!
Punya







RE: Spark Training

2014-05-01 Thread Huang, Roger
If you're in the Bay Area, the Spark Summit would be a great source of 
information.
http://spark-summit.org/2014
-Roger

From: Nicholas Chammas [mailto:nicholas.cham...@gmail.com]
Sent: Thursday, May 01, 2014 10:12 AM
To: u...@spark.incubator.apache.org
Subject: Spark Training

There are many freely-available resources for the enterprising individual to 
use if they want to Spark up their life.

For others, some structured training is in order. Say I want everyone from my 
department at my company to get something like the AMP 
Camphttp://ampcamp.berkeley.edu/ experience, perhaps on-site.

What are my options for that?

Databricks doesn't have a contact page, so I figured this would be the next 
best place to ask.

Nick



View this message in context: Spark 
Traininghttp://apache-spark-user-list.1001560.n3.nabble.com/Spark-Training-tp5166.html
Sent from the Apache Spark User List mailing list 
archivehttp://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.


Re: Spark profiler

2014-05-01 Thread Mayur Rustagi
Something like Twitter Ambrose would be lovely to integrate :)


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Thu, May 1, 2014 at 8:44 PM, Punya Biswal pbis...@palantir.com wrote:

 Hi all,

 I am thinking of starting work on a profiler for Spark clusters. The
 current idea is that it would collect jstacks from executor nodes and put
 them into a central index (either a database or elasticsearch), and it
 would present them to people in a UI that would let people slice and dice
 the jstacks based on what job was running at the time, and what executor
 node was running. In addition, the UI would also present time spent doing
 non-computational work, such as shuffling and input/output IO. In a future
 extension, we might support reading from JMX and/or a JVM agent to get more
 precise data.

 I know that it's already possible to use YourKit to profile individual
 processes, but YourKit costs money, needs a desktop client to be installed,
 and doesn't place its data in the context relevant to a Spark cluster.

 Does something like this already exist (or is such a project already in
 progress)? Do you have any feedback or recommendations for how to go about
 it?

 Thanks!
 Punya




Re: Spark Training

2014-05-01 Thread Denny Lee
You may also want to check out Paco Nathan's Introduction to Spark courses: 
http://liber118.com/pxn/



 On May 1, 2014, at 8:20 AM, Mayur Rustagi mayur.rust...@gmail.com wrote:
 
 Hi Nicholas,
  We provide training on Spark, hands-on, and the associated ecosystem.
  We gave it recently at a conference in Santa Clara. Primarily it is targeted
  at novices in the Spark ecosystem, to introduce them hands-on and get them to
  write simple code and also queries on Shark.
  I think Cloudera also has one which covers Spark, Streaming and MLlib.
 
 Regards
 Mayur
 
 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi
 
 
 
 On Thu, May 1, 2014 at 8:42 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:
 There are many freely-available resources for the enterprising individual to 
 use if they want to Spark up their life.
 
 For others, some structured training is in order. Say I want everyone from 
 my department at my company to get something like the AMP Camp experience, 
 perhaps on-site.
 
 What are my options for that?
 
 Databricks doesn't have a contact page, so I figured this would be the next 
 best place to ask.
 
 Nick
 
 
 View this message in context: Spark Training
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 


Equally weighted partitions in Spark

2014-05-01 Thread deenar.toraskar
Hi

I am using Spark to distribute computationally intensive tasks across the
cluster. Currently I partition my RDD of tasks randomly. There is a large
variation in how long each of the jobs takes to complete, leading to most
partitions being processed quickly while a couple of partitions take forever
to complete. I can mitigate this problem to some extent by increasing the
number of partitions.

Ideally I would like to partition tasks by complexity (let's assume I can
get such a value from the task object) such that the sum of the complexity of
the elements in each partition is evenly distributed. Has anyone created such a
partitioner before?
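
To make it concrete, a sketch of what such a partitioner could look like
(entirely illustrative: greedy assignment of keys to the currently lightest
partition, assuming the RDD is keyed by a Long task id and the per-task
complexity is known on the driver):

import org.apache.spark.Partitioner

class ComplexityPartitioner(numParts: Int, complexity: Map[Long, Double])
  extends Partitioner {

  // greedy bin packing: heaviest tasks first, each into the lightest bucket so far
  private val assignment: Map[Long, Int] = {
    val load = Array.fill(numParts)(0.0)
    complexity.toSeq.sortBy(-_._2).map { case (id, weight) =>
      val p = load.indexOf(load.min)
      load(p) += weight
      id -> p
    }.toMap
  }

  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int =
    assignment.getOrElse(key.asInstanceOf[Long], 0)
}

// usage (illustrative): tasksById is an RDD[(Long, Task)]
// val balanced = tasksById.partitionBy(new ComplexityPartitioner(16, complexityById))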


Regards
Deenar



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Equally-weighted-partitions-in-Spark-tp5171.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Training

2014-05-01 Thread Dean Wampler
I'm working on a 1-day workshop that I'm giving in Australia next week and
a few other conferences later in the year. I'll post a link when it's ready.

dean


On Thu, May 1, 2014 at 10:30 AM, Denny Lee denny.g@gmail.com wrote:

 You may also want to check out Paco Nathan's Introduction to Spark
 courses: http://liber118.com/pxn/



 On May 1, 2014, at 8:20 AM, Mayur Rustagi mayur.rust...@gmail.com wrote:

 Hi Nicholas,
 We provide training on spark, hands-on also associated ecosystem.
 We gave it recently at a conference in Santa Clara. Primarily its
 targetted to novices in Spark ecosystem, to introduce them  hands on to
 get them to write simple codes  also queries on Shark.
 I think Cloudera also has one which is for Spark, Streaming  MLLib.

 Regards
 Mayur

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Thu, May 1, 2014 at 8:42 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 There are many freely-available resources for the enterprising individual
 to use if they want to Spark up their life.

 For others, some structured training is in order. Say I want everyone
 from my department at my company to get something like the AMP 
 Camphttp://ampcamp.berkeley.edu/experience, perhaps on-site.

 What are my options for that?

 Databricks doesn't have a contact page, so I figured this would be the
 next best place to ask.

 Nick


 --
 View this message in context: Spark 
 Traininghttp://apache-spark-user-list.1001560.n3.nabble.com/Spark-Training-tp5166.html
 Sent from the Apache Spark User List mailing list 
 archivehttp://apache-spark-user-list.1001560.n3.nabble.com/at
 Nabble.com.





-- 
Dean Wampler, Ph.D.
Typesafe
@deanwampler
http://typesafe.com
http://polyglotprogramming.com


Re: Efficient Aggregation over DB data

2014-05-01 Thread Andrea Esposito
Hi Sai,

I honestly can't figure out where you are using the RDDs (because the
split method isn't defined on them). By the way, you should use the map
function instead of foreach, due to the fact that foreach with side effects is NOT
idempotent and some partitions could be recomputed, executing the function multiple times.

What maybe you are searching is:

val input = sc.textFile(inputFile)
val result = input.flatMap(line => line.split("\\n").map(x =>
  x.split("\\s")(2).toInt))
result.max
result.min

result.filter(...)

??

Best,
EA



2014-04-22 11:02 GMT+02:00 Sai Prasanna ansaiprasa...@gmail.com:

 Hi All,

 I want to access a particular column of a DB table stored in a CSV format
 and perform some aggregate queries over it. I wrote the following query in
 scala as a first step.

 var add = (x: String) => x.split("\\s+")(2).toInt
 var result = List[Int]()

 input.split("\\n").foreach(x => result ::= add(x))
 [Queries:] result.max/min/filter/sum...

 But is there an efficient way / built-in function to access a particular
 column value or an entire column in Spark? Because a built-in implementation
 might be more efficient!

 Thanks.



Re: Reading multiple S3 objects, transforming, writing back one

2014-05-01 Thread Peter
Thank you Patrick. 

I took a quick stab at it:

    val s3Client = new AmazonS3Client(...)
    val copyObjectResult = s3Client.copyObject(upload, outputPrefix +
      "/part-0", "rolled-up-logs", "2014-04-28.csv")
    val objectListing = s3Client.listObjects(upload, outputPrefix)
    s3Client.deleteObjects(new DeleteObjectsRequest(upload).withKeys(
      objectListing.getObjectSummaries.asScala.map(s => new KeyVersion(s.getKey)).asJava))

Using a 3GB object I achieved about 33MB/s between buckets in the same AZ. 

This is a workable solution for the short term but not ideal for the longer 
term as data size increases. I understand it's a limitation of the Hadoop API 
but ultimately it must be possible to dump an RDD to a single S3 object :)

On Wednesday, April 30, 2014 7:01 PM, Patrick Wendell pwend...@gmail.com 
wrote:
 
This is a consequence of the way the Hadoop files API works. However,
you can (fairly easily) add code to just rename the file because it
will always produce the same filename.

(heavy use of pseudo code)

dir = /some/dir
rdd.coalesce(1).saveAsTextFile(dir)
f = new File(dir + part-0)
f.moveTo(somewhere else)
dir.remove()

It might be cool to add a utility called `saveAsSingleFile` or
something that does this for you. In fact probably we should have
called saveAsTextfile saveAsTextFiles to make it more clear...


On Wed, Apr 30, 2014 at 2:00 PM, Peter thenephili...@yahoo.com wrote:
 Thanks Nicholas, this is a bit of a shame, not very practical for log roll
 up, for example, when every output needs to be in its own directory.
 On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:
 Yes, saveAsTextFile() will give you 1 part per RDD partition. When you
 coalesce(1), you move everything in the RDD to a single partition, which
 then gives you 1 output file.
 It will still be called part-0 or something like that because that's
 defined by the Hadoop API that Spark uses for reading to/writing from S3. I
 don't know of a way to change that.


 On Wed, Apr 30, 2014 at 2:47 PM, Peter thenephili...@yahoo.com wrote:

 Ah, looks like RDD.coalesce(1) solves one part of the problem.
 On Wednesday, April 30, 2014 11:15 AM, Peter thenephili...@yahoo.com
 wrote:
 Hi

 Playing around with Spark  S3, I'm opening multiple objects (CSV files)
 with:

     val hfile = sc.textFile("s3n://bucket/2014-04-28/")

 so hfile is a RDD representing 10 objects that were underneath 2014-04-28.
 After I've sorted and otherwise transformed the content, I'm trying to write
 it back to a single object:


 sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv")

 unfortunately this results in a folder named concatted.csv with 10 objects
 underneath, part-0 .. part-00010, corresponding to the 10 original
 objects loaded.

 How can I achieve the desired behaviour of putting a single object named
 concatted.csv ?

 I've tried 0.9.1 and 1.0.0-RC3.

 Thanks!
 Peter








Spark streaming

2014-05-01 Thread Mohit Singh
Hi,
  I guess Spark is using "streaming" in the context of streaming live data, but
what I mean is something more along the lines of Hadoop Streaming, where one
can code in any programming language.
Or is something along those lines on the cards?

Thanks
-- 
Mohit

When you want success as badly as you want the air, then you will get it.
There is no other secret of success.
-Socrates


Re: Spark streaming

2014-05-01 Thread Tathagata Das
Take a look at the RDD.pipe() operation. That allows you to pipe the data
in a RDD to any external shell command (just like Unix Shell pipe).
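
A tiny sketch (the script name is an assumption): each partition's elements are
written to the external command's stdin, one per line, and the command's stdout
lines come back as a new RDD[String].

val words = sc.parallelize(Seq("spark", "hadoop", "streaming"))
val piped = words.pipe("./my_script.py")   // any executable available on the workers
piped.collect().foreach(println)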
On May 1, 2014 10:46 AM, Mohit Singh mohit1...@gmail.com wrote:

 Hi,
   I guess Spark is using streaming in context of streaming live data but
 what I mean is something more on the lines of  hadoop streaming.. where one
 can code in any programming language?
 Or is something among that lines on the cards?

 Thanks
 --
 Mohit

 When you want success as badly as you want the air, then you will get it.
 There is no other secret of success.
 -Socrates



permission problem

2014-05-01 Thread Livni, Dana
I'm working with spark 0.9.0 on cdh5.
I'm running a spark application written in java in yarn-client mode.

Because of the OP installed on the cluster, I need to run the application using
the hdfs user; otherwise I have a permission problem and get the following
error:
org.apache.hadoop.ipc.RemoteException: Permission denied: user=root, 
access=WRITE, inode=/user:hdfs:supergroup:drwxr-xr-x

I need to run my application in two modes.
The first is using java -cp (in this case there is no problem, since I can
change the running user using sudo -su hdfs, and then everything works
great).

But the second mode is running the application on top of a Tomcat service.
This Tomcat is running on a different computer (outside the cluster, but it
has permissions for the cluster and has mounted folders for all the resources
needed).
The Tomcat is running as the root user.
Is there a way (Spark environment variable / other configuration / Java runtime
code) to make the Spark part (mappers) run using the hdfs user instead of
the root user?

Thanks Dana





Re: permission problem

2014-05-01 Thread Sean Owen
Yeah actually it's hdfs that has superuser privileges on HDFS, not
root. It looks like you're trying to access a nonexistent user
directory like /user/foo, and it fails because root can't create it,
and you inherit privileges for root since that is what your app runs
as.

I don't think you want to impersonate the hdfs user if you can avoid
it, for some of the same reasons you shouldn't run as root. This
account won't be stopped from deleting the entire cluster
accidentally!

I take it you must run as root because of Tomcat and privileged ports.
One solution is to put a proxy/load-balancer in front that runs as
root, which is a bit safer anyway, letting you run Tomcat as an
application user, whose data directory can be set up ahead of time
with the right permission.

If you really have to impersonate a different user from the process
that runs as root, I bet someone else can tell you how to do that!
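
(For reference, on clusters without Kerberos the Hadoop client also honours the
HADOOP_USER_NAME environment variable -- e.g. exporting HADOOP_USER_NAME=hdfs
for the JVM that talks to HDFS -- with all the caveats above about acting as
the hdfs superuser.)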
--
Sean Owen | Director, Data Science | London


On Thu, May 1, 2014 at 7:00 PM, Livni, Dana dana.li...@intel.com wrote:
 I'm working with spark 0.9.0 on cdh5.

 I'm running a spark application written in java in yarn-client mode.



 Cause of the OP installed on the cluster I need to run the application using
 the hdfs user, otherwise I have a permission problem  and getting the
 following error:

 org.apache.hadoop.ipc.RemoteException: Permission denied: user=root,
 access=WRITE, inode=/user:hdfs:supergroup:drwxr-xr-x



 I need to run my application in two modes.

 The first using java –cp , (in this case there is no problem, since I can
 change the running user using sudo –su hdfs, and then everything is working
 great).



 But the second mode is running the application on top of tomcat service.

 This tomcat is running on a different computer (outside the cluster, but it
 have permeations on for the cluster and have mount folder to all the
 resources needed )

 the tomcat is running on root user.

 Is there a way (spark environment  variable/other configuration/java runtime
 code) to make the spark part (mappers) to run using the hdfs user instead of
 the root user?



 Thanks Dana











Re: Reading multiple S3 objects, transforming, writing back one

2014-05-01 Thread Nicholas Chammas
The fastest way to save to S3 should be to leave the RDD with many
partitions, because all partitions will be written out in parallel.

Then, once the various parts are in S3, somehow concatenate the files
together into one file.

If this can be done within S3 (I don't know if this is possible), then you
get the best of both worlds: a highly parallelized write to S3, and a
single cleanly named output file.


On Thu, May 1, 2014 at 12:52 PM, Peter thenephili...@yahoo.com wrote:

 Thank you Patrick.

 I took a quick stab at it:

 val s3Client = new AmazonS3Client(...)
 val copyObjectResult = s3Client.copyObject(upload, outputPrefix +
 /part-0, rolled-up-logs, 2014-04-28.csv)
 val objectListing = s3Client.listObjects(upload, outputPrefix)
 s3Client.deleteObjects(new
 DeleteObjectsRequest(upload).withKeys(objectListing.getObjectSummaries.asScala.map(s
 = new KeyVersion(s.getKey)).asJava))

 Using a 3GB object I achieved about 33MB/s between buckets in the same AZ.

 This is a workable solution for the short term but not ideal for the
 longer term as data size increases. I understand it's a limitation of the
 Hadoop API but ultimately it must be possible to dump a RDD to a single S3
 object :)

   On Wednesday, April 30, 2014 7:01 PM, Patrick Wendell 
 pwend...@gmail.com wrote:
  This is a consequence of the way the Hadoop files API works. However,
 you can (fairly easily) add code to just rename the file because it
 will always produce the same filename.

 (heavy use of pseudo code)

 dir = /some/dir
 rdd.coalesce(1).saveAsTextFile(dir)
 f = new File(dir + part-0)
 f.moveTo(somewhere else)
 dir.remove()

 It might be cool to add a utility called `saveAsSingleFile` or
 something that does this for you. In fact probably we should have
 called saveAsTextfile saveAsTextFiles to make it more clear...

 On Wed, Apr 30, 2014 at 2:00 PM, Peter thenephili...@yahoo.com wrote:
  Thanks Nicholas, this is a bit of a shame, not very practical for log
 roll
  up for example when every output needs to be in it's own directory.
  On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas
  nicholas.cham...@gmail.com wrote:
  Yes, saveAsTextFile() will give you 1 part per RDD partition. When you
  coalesce(1), you move everything in the RDD to a single partition, which
  then gives you 1 output file.
  It will still be called part-0 or something like that because that's
  defined by the Hadoop API that Spark uses for reading to/writing from
 S3. I
  don't know of a way to change that.
 
 
  On Wed, Apr 30, 2014 at 2:47 PM, Peter thenephili...@yahoo.com wrote:
 
  Ah, looks like RDD.coalesce(1) solves one part of the problem.
  On Wednesday, April 30, 2014 11:15 AM, Peter thenephili...@yahoo.com
  wrote:
  Hi
 
  Playing around with Spark  S3, I'm opening multiple objects (CSV files)
  with:
 
 val hfile = sc.textFile(s3n://bucket/2014-04-28/)
 
  so hfile is a RDD representing 10 objects that were underneath
 2014-04-28.
  After I've sorted and otherwise transformed the content, I'm trying to
 write
  it back to a single object:
 
 
 
 sortedMap.values.map(_.mkString(,)).saveAsTextFile(s3n://bucket/concatted.csv)
 
  unfortunately this results in a folder named concatted.csv with 10
 objects
  underneath, part-0 .. part-00010, corresponding to the 10 original
  objects loaded.
 
  How can I achieve the desired behaviour of putting a single object named
  concatted.csv ?
 
  I've tried 0.9.1 and 1.0.0-RC3.
 
  Thanks!
  Peter
 
 
 
 
 
 
 





ClassNotFoundException

2014-05-01 Thread Joe L
Hi, I am getting the following error. How could I fix this problem?

Joe

14/05/02 03:51:48 WARN TaskSetManager: Lost TID 12 (task 2.0:1)
14/05/02 03:51:48 INFO TaskSetManager: Loss was due to
java.lang.ClassNotFoundException:
org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$4 [duplicate 6]
14/05/02 03:51:48 ERROR TaskSetManager: Task 2.0:1 failed 4 times; aborting
job
14/05/02 03:51:48 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks
have all completed, from pool 
14/05/02 03:51:48 INFO TaskSetManager: Loss was due to
java.lang.ClassNotFoundException:
org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$4 [duplicate 7]
14/05/02 03:51:48 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks
have all completed, from pool 
14/05/02 03:51:48 INFO DAGScheduler: Failed to run count at
reasoner.scala:70
[error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task
2.0:1 failed 4 times (most recent failure: Exception failure:
java.lang.ClassNotFoundException:
org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$4)
org.apache.spark.SparkException: Job aborted: Task 2.0:1 failed 4 times
(most recent failure: Exception failure: java.lang.ClassNotFoundException:
org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$4)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ClassNotFoundException-tp5182.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Can't be built on MAC

2014-05-01 Thread Zhige Xin
Hi dear all,

When I tried to build Spark 0.9.1 on my Mac OS X 10.9.2 with Java 8, I
found the following errors:

[error] error while loading CharSequence, class file
'/Library/Java/JavaVirtualMachines/jdk1.8.0_05.jdk/Contents/Home/jre/lib/rt.jar(java/lang/CharSequence.class)'
is broken
[error] (bad constant pool tag 18 at byte 10)
[error] error while loading Comparator, class file
'/Library/Java/JavaVirtualMachines/jdk1.8.0_05.jdk/Contents/Home/jre/lib/rt.jar(java/util/Comparator.class)'
is broken
[error] (bad constant pool tag 18 at byte 20)
[error] two errors found
[debug] Compilation failed (CompilerInterface)
[error] (compile:compile) Compilation failed

Can someone tell me what is going on? Thanks a lot!



Best,
Isaiah


Re: Can't be built on MAC

2014-05-01 Thread Ian Ferreira
HI Zhige,
I had the same issue and reverted to using JDK 1.7.0_55.

From:  Zhige Xin xinzhi...@gmail.com
Reply-To:  user@spark.apache.org
Date:  Thursday, May 1, 2014 at 12:32 PM
To:  user@spark.apache.org
Subject:  Can't be built on MAC

Hi dear all,

When I tried to build Spark 0.9.1 on my Mac OS X 10.9.2 with Java 8, I found
the following errors:

[error] error while loading CharSequence, class file
'/Library/Java/JavaVirtualMachines/jdk1.8.0_05.jdk/Contents/Home/jre/lib/rt.
jar(java/lang/CharSequence.class)' is broken
[error] (bad constant pool tag 18 at byte 10)
[error] error while loading Comparator, class file
'/Library/Java/JavaVirtualMachines/jdk1.8.0_05.jdk/Contents/Home/jre/lib/rt.
jar(java/util/Comparator.class)' is broken
[error] (bad constant pool tag 18 at byte 20)
[error] two errors found
[debug] Compilation failed (CompilerInterface)
[error] (compile:compile) Compilation failed

Can someone tell me what is going on? Thanks a lot!



Best,
Isaiah




Re: Can't be built on MAC

2014-05-01 Thread Zhige Xin
Thank you! Ian.


Zhige


On Thu, May 1, 2014 at 12:35 PM, Ian Ferreira ianferre...@hotmail.comwrote:

 HI Zhige,
 I had the same issue and revert to using JDK 1.7.055

 From: Zhige Xin xinzhi...@gmail.com
 Reply-To: user@spark.apache.org
 Date: Thursday, May 1, 2014 at 12:32 PM
 To: user@spark.apache.org
 Subject: Can't be built on MAC

 Hi dear all,

 When I tried to build Spark 0.9.1 on my Mac OS X 10.9.2 with Java 8, I
 found the following errors:

 [error] error while loading CharSequence, class file
 '/Library/Java/JavaVirtualMachines/jdk1.8.0_05.jdk/Contents/Home/jre/lib/rt.jar(java/lang/CharSequence.class)'
 is broken
 [error] (bad constant pool tag 18 at byte 10)
 [error] error while loading Comparator, class file
 '/Library/Java/JavaVirtualMachines/jdk1.8.0_05.jdk/Contents/Home/jre/lib/rt.jar(java/util/Comparator.class)'
 is broken
 [error] (bad constant pool tag 18 at byte 20)
 [error] two errors found
 [debug] Compilation failed (CompilerInterface)
 [error] (compile:compile) Compilation failed

 Can someone tell me what is going on? Thanks a lot!



 Best,
 Isaiah



updateStateByKey example not using correct input data?

2014-05-01 Thread Adrian Mocanu
I'm trying to understand updateStateByKey.

Here's an example I'm testing with:
Input data: DStream( RDD( ("a",2) ), RDD( ("a",3) ), RDD( ("a",4) ), RDD(
("a",5) ), RDD( ("a",6) ), RDD( ("a",7) ) )

Code:
  val updateFunc = (values: Seq[Int], state: Option[StateClass]) => {
    val previousState = state.getOrElse( StateClass(0, 0, Seq()) )
    val currentSum = values.sum + previousState.sum
    val currentCount = values.size + previousState.count

    if (currentCount == previousState.count) {
      None // if this RDD has no change then remove the tuple
    } else {
      Some( StateClass(currentSum, currentCount, values) )
    }
  }

  intStream.updateStateByKey[StateClass](updateFunc).transform(rdd => rdd.map(t => (t, rdd.id))).print()

Results:
((a,StateClass(14,5,ArrayBuffer(2.0, 3.0, 3.0, 3.0, 3.0))),12)
((a,StateClass(17,6,ArrayBuffer(3.0))),22)
((a,StateClass(20,7,ArrayBuffer(3.0))),32)

Questions:
Why does the RDD with ID=12 have these elements: (2.0, 3.0, 3.0, 3.0, 3.0)?
These do not exist in the input data, so where do these numbers come from? Well, 2
and 3 exist, but not the other 3's, and it's missing 4, 5, 6, 7 as well.
What is going on here?

-Adrian



Running Spark jobs via oozie

2014-05-01 Thread Shivani Rao
Hello Spark Fans,

I am trying to run a spark job via oozie as a java action. The spark code
is packaged as a MySparkJob.jar compiled using sbt assembly (excluding
spark and hadoop dependencies).

I am able to invoke the spark job from any client using

java -cp lib/MySparkJob.jar:lib/spark-0.9-assembly-cdh4.jar Test

where Test is the name of the main class in my jar.

We have a Spark cluster (say lab1) of 4 machines, divided into 1 Spark master
and 3 workers. Oozie is running on another server (say oozie). In oozie I
created a /tmp/test_deploy_spark folder in HDFS which contains the following:

the workflow.xml
and a lib folder containing
 a) MySparkJob.jar
 b) spark-0.9-assembly-cdh4.jar (Spark assembled with CDH4)
The job launches successfully, but the mapper fails with the following error at
the val sc = new SparkContext(sparkConf) line:

[ERROR] [04/30/2014 22:25:15.440] [main] [Remoting] Remoting error:
[Startup timed out] [

akka.remote.RemoteTransportException: Startup timed out
at 
akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:129)
at akka.remote.Remoting.start(Remoting.scala:191)
at 
akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579)
at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577)
at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)
at 
org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:96)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:126)
at org.apache.spark.SparkContext.init(SparkContext.scala:139)

I have the following questions:

a) Setting a jar path in Spark so that the job can be scheduled by oozie: when
I use sparkConf.setJars(List("MySpark.jar")) in my sparkConf, where am I
expected to place MySpark.jar?

b) The above error seems to arise from akka, not from being unable to find the
jar.

If anyone has tried to run a Spark job through oozie, please let me know if
you have any ideas.

Thanks!
Shivani

-- 
Software Engineer
Analytics Engineering Team@ Box
Mountain View, CA
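
For reference, a sketch of how a jar is usually handed to the context via setJars
in this kind of setup; the master URL and jar path below are placeholders, and the
path must be resolvable by the JVM that actually constructs the SparkContext (here,
the oozie launcher mapper):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: master URL and jar path are placeholders for this setup.
// setJars ships the listed jars to the executors; the path must exist on the
// machine that constructs the SparkContext (the oozie launcher in this case).
val sparkConf = new SparkConf()
  .setMaster("spark://lab1-master:7077")
  .setAppName("MySparkJob")
  .setJars(Seq("/path/visible/to/launcher/MySparkJob.jar"))
val sc = new SparkContext(sparkConf)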


Setting the Scala version in the EC2 script?

2014-05-01 Thread Ian Ferreira
Is this possible? It is very annoying to have such a great script but still
have to manually update things afterwards.




Re: Equally weighted partitions in Spark

2014-05-01 Thread Andrew Ash
The problem is that equally-sized partitions take variable time to complete
based on their contents?

Sent from my mobile phone
On May 1, 2014 8:31 AM, deenar.toraskar deenar.toras...@db.com wrote:

 Hi

 I am using Spark to distribute computationally intensive tasks across the
 cluster. Currently I partition my RDD of tasks randomly. There is a large
 variation in how long each of the jobs takes to complete, leading to most
 partitions being processed quickly while a couple of partitions take forever
 to complete. I can mitigate this problem by increasing the number of
 partitions to some extent.

 Ideally I would like to partition tasks by complexity (let's assume I can
 get such a value from the task object) so that the sum of complexities of the
 elements in each partition is evenly distributed. Has anyone created such a
 partitioner before?


 Regards
 Deenar



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Equally-weighted-partitions-in-Spark-tp5171.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
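
A partitioner along the lines Deenar describes could be sketched as below: keys are
pre-assigned to partitions by a greedy bin-packing pass over their complexity values,
and that assignment drives a custom Partitioner. The class name, map, and usage are
illustrative, not from the original post:

import org.apache.spark.Partitioner

// Sketch: greedily assign each key to the currently lightest partition so the
// summed complexity per partition stays roughly even.
class ComplexityPartitioner(numParts: Int, complexityByKey: Map[Any, Long])
  extends Partitioner {

  // Greedy bin packing, heaviest keys first.
  private val assignment: Map[Any, Int] = {
    val load = Array.fill(numParts)(0L)
    complexityByKey.toSeq.sortBy { case (_, c) => -c }.map { case (key, complexity) =>
      val target = load.zipWithIndex.minBy(_._1)._2   // index of lightest partition so far
      load(target) += complexity
      key -> target
    }.toMap
  }

  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int = assignment.getOrElse(key, 0)
}

// Usage sketch: tasksRDD is assumed to be an RDD of (taskId, task) pairs and the
// complexity map to have been computed up front from the task objects.
// val balanced = tasksRDD.partitionBy(new ComplexityPartitioner(32, complexities))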



range partitioner with updateStateByKey

2014-05-01 Thread Adrian Mocanu
If I use a range partitioner, will this make updateStateByKey take the tuples 
in order?
Right now I see them not being taken in order (most of them are ordered, but
not all).

-Adrian



java.lang.ClassNotFoundException

2014-05-01 Thread İbrahim Rıza HALLAÇ



Hello. I followed the A Standalone App in Java part of the tutorial
https://spark.apache.org/docs/0.8.1/quick-start.html
The Spark standalone cluster looks like it is running without a problem:
http://i.stack.imgur.com/7bFv8.png
I have built a fat jar for running this Java app on the cluster. Before mvn
package, find . shows:
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

The content of SimpleApp.java is:

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;

public class SimpleApp {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
      .setMaster("spark://10.35.23.13:7077")
      .setAppName("My app")
      .set("spark.executor.memory", "1g");
    JavaSparkContext sc = new JavaSparkContext(conf);
    String logFile = "/home/ubuntu/spark-0.9.1/test_data";
    JavaRDD<String> logData = sc.textFile(logFile).cache();
    long numAs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("a"); }
    }).count();
    System.out.println("Lines with a: " + numAs);
  }
}

This program only works when the master is set as setMaster("local"). Otherwise
I get this error: http://i.stack.imgur.com/doRSn.png

Thanks,
Ibrahim
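
If the failure on the cluster is a ClassNotFoundException on the workers, one common
cause is that the application jar is never shipped to the executors. A sketch of
passing it explicitly, shown with the Scala API for brevity (the setJars call on
SparkConf is the same from Java; the jar path is a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: list the application's fat jar (placeholder path) so the standalone
// workers can load SimpleApp's classes; sc.addJar(...) is an equivalent route.
val conf = new SparkConf()
  .setMaster("spark://10.35.23.13:7077")
  .setAppName("My app")
  .setJars(Seq("/home/ubuntu/simple-app-assembly.jar"))
val sc = new SparkContext(conf)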
  

Re: How to handle this situation: Huge File Shared by All maps and Each Computer Has one copy?

2014-05-01 Thread PengWeiPRC
Thanks, Rustagi. Yes, the global data is read-only and stays the same from the
beginning to the end of the whole Spark job. Actually, it is not only identical
for one map/reduce task, but is used by many of my map/reduce tasks. That's why
I intend to put the data on each node of my cluster, and would like to know
whether it is possible for a Spark map/reduce program to have all the nodes
read it simultaneously from their local disks, rather than have one node read
it and broadcast it to the other nodes. Any suggestions for solving this?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-handle-this-situation-Huge-File-Shared-by-All-maps-and-Each-Computer-Has-one-copy-tp5139p5192.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
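
One pattern that matches what is being asked: keep the big file on every node's local
disk and load it lazily, once per executor JVM, from inside the tasks, so nothing is
broadcast. A sketch, with the local path, file format, and object name left as
placeholders:

import scala.io.Source
import org.apache.spark.rdd.RDD

// Sketch: load the shared data at most once per executor JVM.
object LocalSideData {
  // Placeholder path; the same file is assumed to exist on every worker node.
  lazy val table: Map[String, String] =
    Source.fromFile("/data/shared/huge-file.tsv").getLines()
      .map { line => val Array(k, v) = line.split("\t", 2); k -> v }
      .toMap
}

def joinWithSideData(records: RDD[String]): RDD[(String, Option[String])] =
  records.mapPartitions { iter =>
    val table = LocalSideData.table   // forces the one-time local load on the executor
    iter.map(r => (r, table.get(r)))
  }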


Task not serializable: collect, take

2014-05-01 Thread SK
Hi,

I have the following code structure. It compiles OK, but at runtime it aborts
with the error:
Exception in thread "main" org.apache.spark.SparkException: Job aborted:
Task not serializable: java.io.NotSerializableException:
I am running in local (standalone) mode.

trait A{
  def input(...): ...
  def output(...)
  def computeSim(...): ... { }
}

class TA extends A{
   override def input(...): {...}
   override def output(...): {...}
}

object TA{
def main(...) {

 val c  = new TA
 val r  = c.input()
 val s = c.computeSim(r)
 c.output(s) 

   }
}

When I have all of the code in a single object, it runs and outputs the
correct result. But this error occurs only when I have the class and trait,
which I want to use to make it more modular. 

The error appears to be happening in the output() method. The  
transformations I am using in the output method, in the order in which they
appear  are: 

.map().collect().filter().sortBy().take()

It appears that both collect() and take() are not serializable (even though
I am running the code in local mode). If I drop collect(), there is a
compile error when I use sortBy. I need both collect() and take(). I am not
sure why these transformations work when I use a single object, but fail
when I use a class.
 
I would appreciate your help in fixing this.

thanks





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Task-not-serializable-collect-take-tp5193.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Task not serializable: collect, take

2014-05-01 Thread Marcelo Vanzin
Have you tried making A extend Serializable?
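
In code, that suggestion is roughly the following sketch; the alternative shown in the
commented block avoids serializing the instance at all, and the field names there are
illustrative:

// Minimal sketch of the suggestion above: the closures built inside output()
// capture `this`, so the enclosing trait/class must be serializable.
trait A extends Serializable {
  // ... members unchanged
}

// Alternative sketch: copy what the closure needs into a local val so only
// that value, not `this`, is captured and shipped.
// def output(rdd: RDD[Double]) = {
//   val limit = this.limit            // local copy captured instead of `this`
//   rdd.map(x => x * 2).collect().filter(_ > limit).sortBy(identity).take(10)
// }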


On Thu, May 1, 2014 at 3:47 PM, SK skrishna...@gmail.com wrote:
 Hi,

 I have the following code structure. It compiles OK, but at runtime it aborts
 with the error:
 Exception in thread "main" org.apache.spark.SparkException: Job aborted:
 Task not serializable: java.io.NotSerializableException:
 I am running in local (standalone) mode.

 trait A{
   def input(...): ...
   def output(...)
   def computeSim(...): ... { }
 }


-- 
Marcelo


Re: Opinions stratosphere

2014-05-01 Thread Christopher Nguyen
Someone (Ze Ni, https://www.sics.se/people/ze-ni) has actually attempted
such a comparative study as a Masters thesis:

http://www.diva-portal.org/smash/get/diva2:605106/FULLTEXT01.pdf

According to this snapshot (c. 2013), Stratosphere is different from Spark
in not having an explicit concept of an in-memory dataset (e.g., RDD).

In principle this could be argued to be an implementation detail; the
operators and execution plan/data flow are of primary concern in the API,
and the data representation/materializations are otherwise unspecified.

But in practice, for long-running interactive applications, I consider RDDs
to be of fundamental, first-class citizen importance, and the key
distinguishing feature of Spark's model vs other in-memory approaches
that treat memory merely as an implicit cache.

--
Christopher T. Nguyen
Co-founder  CEO, Adatao http://adatao.com
linkedin.com/in/ctnguyen



On Tue, Nov 26, 2013 at 1:26 PM, Matei Zaharia matei.zaha...@gmail.comwrote:

 I don’t know a lot about it except from the research side, where the team
 has done interesting optimization stuff for these types of applications. In
 terms of the engine, one thing I’m not sure of is whether Stratosphere
 allows explicit caching of datasets (similar to RDD.cache()) and
 interactive queries (similar to spark-shell). But it’s definitely an
 interesting project to watch.

 Matei

 On Nov 22, 2013, at 4:17 PM, Ankur Chauhan achau...@brightcove.com
 wrote:

  Hi,
 
  That's what I thought but as per the slides on
 http://www.stratosphere.eu they seem to know about spark and the scala
 api does look similar.
  I found the PACT model interesting. Would like to know if matei or other
 core comitters have something to weight in on.
 
  -- Ankur
  On 22 Nov 2013, at 16:05, Patrick Wendell pwend...@gmail.com wrote:
 
  I've never seen that project before, would be interesting to get a
  comparison. Seems to offer a much lower level API. For instance this
  is a wordcount program:
 
 
 https://github.com/stratosphere/stratosphere/blob/master/pact/pact-examples/src/main/java/eu/stratosphere/pact/example/wordcount/WordCount.java
 
  On Thu, Nov 21, 2013 at 3:15 PM, Ankur Chauhan achau...@brightcove.com
 wrote:
  Hi,
 
  I was just curious about https://github.com/stratosphere/stratosphere
  and how does spark compare to it. Anyone has any experience with it to
 make
  any comments?
 
  -- Ankur
 




Question regarding doing aggregation over custom partitions

2014-05-01 Thread Arun Swami
Hi,

I am a newbie to Spark. I looked for documentation or examples to answer my
question but came up empty handed.

I don't know whether I am using the right terminology but here goes.

I have a file of records. Initially, I had the following Spark program (I
am omitting all the surrounding code and focusing only on the Spark related
code):

...
val recordsRDD = sc.textFile(pathSpec, 2).cache
val countsRDD: RDD[(String, Int)] = recordsRDD.flatMap(x => getCombinations(x))
  .map(e => (e, 1))
  .reduceByKey(_ + _)
...

Here getCombinations() is a function I have written that takes a record and
returns List[String].

This program works as expected.

Now, I want to do the following. I want to partition the records in
recordsRDD by some key extracted from each record. I do this as follows:

val keyValueRecordsRDD: RDD[(String, String)] =
  recordsRDD.flatMap(getKeyValueRecord(_))

Here getKeyValueRecord() is a function I have written that takes a record
and returns a Tuple2 of a key and the original record.

Now I want to do the same operations as before (getCombinations() and counting
occurrences) BUT on each partition as defined by the key. Essentially, I want
to apply the operations individually in each partition. In a separate step, I
want to recover the global counts across all partitions while keeping the
partition-based counts.

How can I do this in Spark?

Thanks!

arun
*__*
*Arun Swami*


configure spark history server for running on Yarn

2014-05-01 Thread Jenny Zhao
Hi,

I have installed Spark 1.0 from branch-1.0; the build went fine, and I have
tried running the example in YARN client mode. Here is my command:

/home/hadoop/spark-branch-1.0/bin/spark-submit
/home/hadoop/spark-branch-1.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.2.0.jar
--master yarn --deploy-mode client --executor-memory 6g --executor-cores 3
--driver-memory 3g --name SparkPi --num-executors 2 --class
org.apache.spark.examples.SparkPi yarn-client 5

After the run, I was not able to retrieve the log from YARN's web UI, although
I have tried to specify the history server in spark-env.sh:

export SPARK_DAEMON_JAVA_OPTS="-Dspark.yarn.historyServer.address=master:18080"
(where master is hdtest022.svl.ibm.com)


I also tried to specify it in spark-defaults.conf, which doesn't work either. I
would appreciate it if someone could tell me the right way to specify it, in
either spark-env.sh or spark-defaults.conf, so that this option is applied to
any Spark application.

Another thing I found is that the usage output for spark-submit is not complete
and not in sync with the online documentation; I hope this is addressed in the
formal release.

And is this the latest documentation for Spark 1.0?
http://people.csail.mit.edu/matei/spark-unified-docs/running-on-yarn.html

Thank you!


Re: same partition id means same location?

2014-05-01 Thread wxhsdp
Can anyone say something about this?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/same-partition-id-means-same-location-tp5136p5200.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


YARN issues with resourcemanager.scheduler.address

2014-05-01 Thread zsterone
Hi,

I'm trying to connect to a YARN cluster by running these commands:
export HADOOP_CONF_DIR=/hadoop/var/hadoop/conf/
export YARN_CONF_DIR=$HADOOP_CONF_DIR
export SPARK_YARN_MODE=true
export
SPARK_JAR=./assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar
export
SPARK_YARN_APP_JAR=examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.1.jar
export MASTER=yarn-client

./bin/spark-shell

This is what I have in my yarn-site.xml. I have not set
yarn.resourcemanager.scheduler.address, per the defaults
(https://hadoop.apache.org/docs/r2.2.0/hadoop-yarn/hadoop-yarn-common/yarn-default.xml):
<configuration>
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>my-machine</value>
  </property>
  <property>
    <name>yarn.resourcemanager.address</name>
    <value>${yarn.resourcemanager.hostname}:51176</value>
  </property>
  <property>
    <name>yarn.nodemanager.webapp.address</name>
    <value>${yarn.nodemanager.hostname}:1183</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
  </property>
  <property>
    <name>yarn.application.classpath</name>
    <value>/apollo/env/ForecastPipelineHadoopCluster/lib/*</value>
  </property>
  <property>
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <value>500</value>
  </property>
  <property>
    <name>yarn.nodemanager.vmem-pmem-ratio</name>
    <value>5.1</value>
    <description>we use a lot of jars which consumes a ton of vmem</description>
  </property>
  <property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>24500</value>
  </property>
  <property>
    <name>yarn.resourcemanager.am.max-attempts</name>
    <value>10</value>
  </property>
  <property>
    <name>yarn.resourcemanager.nodes.exclude-path</name>
    <value>/apollo/env/ForecastPipelineHadoopCluster/var/hadoop/conf/exclude/resourcemanager.exclude</value>
  </property>
  <property>
    <name>yarn.scheduler.maximum-allocation-mb</name>
    <value>11000</value>
    <description>This is the maximum amount of ram that any job can ask
        for. Any more and the job will be denied.
        11000 is currently the largest amount of ram any job uses. If a
        new job needs more ram then the team adding the job
        needs to ask the Forecasting Platform team for permission to
        change this number.
    </description>
  </property>
  <property>
    <name>yarn.nodemanager.user-home-dir</name>
    <value>/apollo/env/ForecastPipelineHadoopCluster/var/hadoop/tmp/</value>
    <description>I'm not particularly fond of this but matlab writes to
        the user's home directory. Without this variable matlab will always
        segfault.</description>
  </property>
</configuration>


When I go to my-machine:8088/conf
I get the expected output:
<property><name>yarn.resourcemanager.scheduler.address</name><value>my-machine:8030</value><source>programatically</source></property>

However, when I try running spark-shell, my application is stuck at this
phase:

14/05/02 00:41:35 INFO yarn.Client: Submitting application to ASM
14/05/02 00:41:35 INFO impl.YarnClientImpl: Submitted application
application_1397083384516_6571 to ResourceManager at my-machine/my-ip:51176
14/05/02 00:41:35 INFO cluster.YarnClientSchedulerBackend: Application
report from ASM: 
 appMasterRpcPort: 0
 appStartTime: 1398991295872
 yarnAppState: ACCEPTED

14/05/02 00:41:36 INFO cluster.YarnClientSchedulerBackend: Application
report from ASM: 
 appMasterRpcPort: 0
 appStartTime: 1398991295872
 yarnAppState: ACCEPTED


and it keeps going.  When I look at the log on the resource manager UI, I
get this:
2014-05-02 02:57:31,862 INFO  [sparkYarnAM-akka.actor.default-dispatcher-2]
slf4j.Slf4jLogger (Slf4jLogger.scala:applyOrElse(80)) - Slf4jLogger started
2014-05-02 02:57:31,917 INFO  [sparkYarnAM-akka.actor.default-dispatcher-5]
Remoting (Slf4jLogger.scala:apply$mcV$sp(74)) - Starting remoting
2014-05-02 02:57:32,104 INFO  [sparkYarnAM-akka.actor.default-dispatcher-2]
Remoting (Slf4jLogger.scala:apply$mcV$sp(74)) - Remoting started; listening
on addresses :[akka.tcp://sparkYarnAM@another-machine:37400]
2014-05-02 02:57:32,105 INFO  [sparkYarnAM-akka.actor.default-dispatcher-2]
Remoting (Slf4jLogger.scala:apply$mcV$sp(74)) - Remoting now listens on
addresses: [akka.tcp://sparkYarnAM@another-machine:37400]
2014-05-02 02:57:33,217 INFO  [main] client.RMProxy
(RMProxy.java:createRMProxy(56)) - *Connecting to ResourceManager at
0.0.0.0/0.0.0.0:8030*
2014-05-02 02:57:33,293 INFO  [main] yarn.WorkerLauncher
(Logging.scala:logInfo(50)) - ApplicationAttemptId:
appattempt_1397083384516_6859_01
2014-05-02 02:57:33,294 INFO  [main] yarn.WorkerLauncher
(Logging.scala:logInfo(50)) - Registering the ApplicationMaster
2014-05-02 02:57:34,330 INFO  [main] ipc.Client
(Client.java:handleConnectionFailure(783)) - Retrying connect to server:

Re: What is Seq[V] in updateStateByKey?

2014-05-01 Thread Tathagata Das
Depends on your code. Referring to the earlier example, if you do

words.map(x => (x,1)).updateStateByKey()

then for a particular word, if a batch contains 6 occurrences of that word,
then the Seq[V] will be [1, 1, 1, 1, 1, 1]

Instead if you do

words.map(x => (x,1)).reduceByKey(_ + _).updateStateByKey(...)

then Seq[V] will be [ 6 ]  , that is, all the 1s will be summed up already
due to the reduceByKey.

TD
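
Put together, a running-count version of that pipeline looks roughly like the sketch
below; words is assumed to be the DStream from the example being discussed, and
updateStateByKey requires a checkpoint directory to be set on the StreamingContext:

import org.apache.spark.streaming.StreamingContext._

// Sketch: a running count per word across batches.
val updateTotal: (Seq[Int], Option[Int]) => Option[Int] =
  (batchCounts, running) => Some(running.getOrElse(0) + batchCounts.sum)

val totals = words.map(x => (x, 1))
  .reduceByKey(_ + _)                 // Seq[V] seen by updateTotal is now at most one pre-summed value
  .updateStateByKey[Int](updateTotal)
totals.print()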



On Thu, May 1, 2014 at 7:29 AM, Adrian Mocanu amoc...@verticalscope.comwrote:

 So Seq[V] contains only new tuples. I initially thought that whenever a
 new tuple was found, it would add it to Seq and call the update function
 immediately so there wouldn't be more than 1 update to Seq per function
 call.

 Say I want to sum tuples with the same key in an RDD using
 updateStateByKey. Then (1) would Seq[V] contain the numbers for a
 particular key, and my S state could be the sum?
 Or would (2) Seq contain partial sums (say, a sum per partition?) which I
 then need to sum into the final sum?

 After writing this out and thinking a little more about it I think #2 is
 correct. Can you confirm?

 Thanks again!
 -A

 -Original Message-
 From: Sean Owen [mailto:so...@cloudera.com]
 Sent: April-30-14 4:30 PM
 To: user@spark.apache.org
 Subject: Re: What is Seq[V] in updateStateByKey?

 S is the previous count, if any. Seq[V] are potentially many new counts.
 All of them have to be added together to keep an accurate total.  It's as
 if the count were 3, and I tell you I've just observed 2, 5, and 1
 additional occurrences -- the new count is 3 + (2+5+1) not
 1 + 1.


 I butted in since I'd like to ask a different question about the same line
 of code. Why:

   val currentCount = values.foldLeft(0)(_ + _)

 instead of

   val currentCount = values.sum

 This happens a few places in the code. sum seems equivalent and likely
 quicker. Same with things like filter(_ == 200).size instead of count(_
 == 200)... pretty trivial but hey.


 On Wed, Apr 30, 2014 at 9:23 PM, Adrian Mocanu amoc...@verticalscope.com
 wrote:
  Hi TD,
 
  Why does the example keep recalculating the count via fold?
 
  Wouldn’t it make more sense to get the last count in values Seq and
  add 1 to it and save that as current count?
 
 
 
  From what Sean explained I understand that all values in Seq have the
  same key. Then when a new value for that key is found it is added to
  this Seq collection and the update function is called.
 
 
 
  Is my understanding correct?



Re: range partitioner with updateStateByKey

2014-05-01 Thread Tathagata Das
Ordered by what? arrival order? sort order?

TD



On Thu, May 1, 2014 at 2:35 PM, Adrian Mocanu amoc...@verticalscope.comwrote:

  If I use a range partitioner, will this make updateStateByKey take the
 tuples in order?

 Right now I see them not being taken in order (most of them are ordered,
 but not all).



 -Adrian





Getting the following error using EC2 deployment

2014-05-01 Thread Ian Ferreira
I have a custom app that was compiled with Scala 2.10.3, which I believe is
what the latest spark-ec2 script installs. However, running it on the master
yields this cryptic error, which according to the web implies incompatible jar
versions.

Exception in thread "main" java.lang.NoClassDefFoundError: scala/Function1
at Demo(Demo.scala)
Caused by: java.lang.ClassNotFoundException: scala.Function1
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)





How can I tell from my list of jars which are the culprits (can I see what
version of Scala they were compiled with)? This seems to be an issue that was
around in the Scala 2.8.x timeframe, so I am not sure why I am getting this for
2.10.3.



Please help!
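
NoClassDefFoundError: scala/Function1 usually means the Scala runtime library itself
is missing from the classpath at launch, rather than a 2.8-vs-2.10 mismatch: either
scala-library.jar (or the Spark assembly, which bundles it) must be on the java -cp
line, or the application jar must bundle it. A build sketch of the latter approach;
the name, versions, and the presence of the sbt-assembly plugin are assumptions:

// build.sbt sketch -- versions are illustrative for the Spark 0.9.x / Scala 2.10.3 era.
name := "demo"

scalaVersion := "2.10.3"

// Spark itself is provided by the cluster; everything else, including
// scala-library, ends up in the fat jar produced by `sbt assembly`
// (assuming the sbt-assembly plugin is enabled for this project).
libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.1" % "provided"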