Re: Question regarding doing aggregation over custom partitions

2014-05-03 Thread Arun Swami
Thanks, that was what I was missing!

arun


*Arun Swami*
+1 408-338-0906



On Fri, May 2, 2014 at 4:28 AM, Mayur Rustagi mayur.rust...@gmail.com wrote:

 You need to first partition the data by the key, then use mapPartitions
 instead of map.

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Fri, May 2, 2014 at 5:33 AM, Arun Swami a...@caspida.com wrote:

 Hi,

 I am a newbie to Spark. I looked for documentation or examples to answer
 my question but came up empty-handed.

 I don't know whether I am using the right terminology but here goes.

 I have a file of records. Initially, I had the following Spark program (I
 am omitting all the surrounding code and focusing only on the Spark related
 code):

 ...
 val recordsRDD = sc.textFile(pathSpec, 2).cache()
 val countsRDD: RDD[(String, Int)] = recordsRDD.flatMap(x => getCombinations(x))
   .map(e => (e, 1))
   .reduceByKey(_ + _)
 ...

 Here getCombinations() is a function I have written that takes a record
 and returns List[String].

 This program works as expected.

 Now, I want to do the following. I want to partition the records in
 recordsRDD by some key extracted from each record. I do this as follows:

 val keyValueRecordsRDD: RDD[(String, String)] =
   recordsRDD.map(getKeyValueRecord(_))

 Here getKeyValueRecord() is a function I have written that takes a record
 and returns a Tuple2 of a key and the original record.

 Now I want to do the same operations as before (getCombinations() and
 counting occurrences), BUT on each partition as defined by the key.
 Essentially, I want to apply the operations individually in each partition.
 In a separate step, I want to recover the global counts across all
 partitions while keeping the partition-based counts.

 How can I do this in Spark?

 Thanks!

 arun
 *Arun Swami*
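A minimal sketch of one way to get both the per-key and the global counts, assuming a reasonably recent Spark Scala RDD API (1.3+, where pair-RDD methods need no extra import). Rather than the mapPartitions route suggested above, it folds the extracted key into a composite `(key, combination)` key, so one `reduceByKey` yields per-key counts and a second one rolls them up globally. `getKeyValueRecord` and `getCombinations` here are hypothetical stand-ins for Arun's functions (key = first comma-separated field, combinations = the remaining fields).

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object KeyedCombinationCounts {
  // Hypothetical stand-in: the key is the first comma-separated field.
  def getKeyValueRecord(record: String): (String, String) =
    (record.split(",", 2)(0), record)

  // Hypothetical stand-in: the "combinations" are the fields after the key.
  def getCombinations(record: String): List[String] =
    record.split(",").toList.drop(1)

  /** Returns (per-key counts, global counts). */
  def counts(records: RDD[String]): (RDD[((String, String), Int)], RDD[(String, Int)]) = {
    val perKey = records
      .map(getKeyValueRecord)
      .flatMap { case (key, rec) => getCombinations(rec).map(c => ((key, c), 1)) }
      .reduceByKey(_ + _)
      .cache() // reused by both results below
    val global = perKey
      .map { case ((_, combo), n) => (combo, n) } // drop the key, keep the partial count
      .reduceByKey(_ + _)
    (perKey, global)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("keyed-counts"))
    try {
      val (perKey, global) = counts(sc.parallelize(Seq("a,x,y", "a,x", "b,x")))
      perKey.collect().foreach(println)
      global.collect().foreach(println)
    } finally sc.stop()
  }
}
```

If you specifically want the `partitionBy` plus `mapPartitions` route, note that a hash partition can hold several keys, so the function passed to `mapPartitions` would still have to group by key inside each partition.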





Re: string to int conversion

2014-05-03 Thread Sean Owen
On Sat, May 3, 2014 at 2:00 AM, SK skrishna...@gmail.com wrote:
 1) I have a CSV file where one of the fields has integer data, but it appears
 as quoted strings: "1", "3", etc. I tried using toInt to implicitly convert the
 strings to int after reading (field(3).toInt), but I got a
 NumberFormatException. So I defined my own conversion
 as follows, but I still get a NumberFormatException: the toInt function on
 StringOps is failing. Any idea how I can convert strings to int?

toInt certainly works, and you need not write your own conversion,
but a string with double quotes is not a valid number. To be safest,
you might write an unquote function that strips surrounding quotes if they
exist, rather than just removing all quotes in the string. (Or see above about
using a Java library that specializes in handling all the details of
CSV, including quoted and escaped fields -- I liked SuperCSV.)
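A minimal Scala sketch of the unquote idea: it strips one pair of surrounding double quotes (rather than every quote character) before calling toInt. The names `unquote` and `parseIntField` are illustrative, not from any library, and escaped quotes inside a field are not handled, which is exactly where a real CSV parser is worth using.

```scala
import scala.util.Try

object CsvInts {
  /** Trims whitespace and strips one pair of surrounding double quotes, if present. */
  def unquote(field: String): String = {
    val t = field.trim
    if (t.length >= 2 && t.startsWith("\"") && t.endsWith("\"")) t.substring(1, t.length - 1)
    else t
  }

  /** Parses a possibly quoted integer field; None if it is not a valid Int. */
  def parseIntField(field: String): Option[Int] =
    Try(unquote(field).trim.toInt).toOption
}
```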


Re: sbt/sbt run command returns a JVM problem

2014-05-03 Thread Carter
Hi, thanks for all your help.
I tried your setting in the sbt file, but the problem is still there.

The Java setting in my sbt file is:
java \
  -Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=256m \
  -jar ${JAR} \
  $@

I have tried setting these 3 parameters both larger and smaller, but nothing
works. Did I change the right thing?

Thank you very much.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/sbt-sbt-run-command-returns-a-JVM-problem-tp5157p5267.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark's behavior

2014-05-03 Thread Eduardo Costa Alfaia
Hi TD, thanks for the reply.
I ran this last experiment on one computer, in local mode, but I think the time
gap grows when I add more computers. I will run it again now with 8 workers and 1
word source and see what happens. I will also time it, as
suggested by Andrew.
On May 3, 2014, at 1:19, Tathagata Das tathagata.das1...@gmail.com wrote:

 From the logs, I see that the print() starts printing stuff 10 seconds after 
 the context is started. And that 10 seconds is taken by the initial empty job 
 (50 map + 20 reduce tasks) that spark streaming starts to ensure all the 
 executors have started. Somehow the first empty task takes 7-8 seconds to 
 complete. See if this can be reproduced by running a simple, empty job in 
 spark shell (in the same cluster) and see if the first task takes 7-8 
 seconds. 
 
 Either way, I didn't see the 30-second gap, only a 10-second gap. And that
 does not seem to be a persistent problem, as after those 10 seconds the data
 is being received and processed.
 
 TD
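A sketch of the reproduction TD suggests, assuming Spark's Scala API: it times an otherwise empty shuffle job shaped like the warm-up job described above (50 map tasks, 20 reduce tasks), so the first and second runs can be compared on the same cluster. `timeWarmUpJob` is a hypothetical helper name; `main` assumes the master is supplied by spark-submit, and in spark-shell you can paste the body and call it with the shell's `sc`.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object FirstJobLatency {
  /** Runs a 50-map / 20-reduce job and returns (number of result records, elapsed ms). */
  def timeWarmUpJob(sc: SparkContext): (Int, Long) = {
    val start = System.nanoTime()
    val n = sc.makeRDD(1 to 50, 50) // 50 map tasks
      .map(x => (x, 1))
      .reduceByKey(_ + _, 20)       // 20 reduce tasks
      .collect()
      .length
    (n, (System.nanoTime() - start) / 1000000L)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("first-job-latency"))
    try {
      println(s"first run:  ${timeWarmUpJob(sc)._2} ms")
      println(s"second run: ${timeWarmUpJob(sc)._2} ms")
    } finally sc.stop()
  }
}
```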
 
 
 On Fri, May 2, 2014 at 2:14 PM, Eduardo Costa Alfaia e.costaalf...@unibs.it 
 wrote:
 Hi TD,
 
 I got some more information today using Spark 1.0 RC3, and the situation
 remains the same.
 
 The lines begin after 17 seconds:
 
 14/05/02 21:52:25 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140502215225-0005/0 on hostPort computer8.ant-net:57229 with 2 cores, 2.0 GB RAM
 14/05/02 21:52:25 INFO AppClient$ClientActor: Executor updated: app-20140502215225-0005/0 is now RUNNING
 14/05/02 21:52:25 INFO ReceiverTracker: ReceiverTracker started
 14/05/02 21:52:26 INFO ForEachDStream: metadataCleanupDelay = -1
 14/05/02 21:52:26 INFO SocketInputDStream: metadataCleanupDelay = -1
 14/05/02 21:52:26 INFO SocketInputDStream: Slide time = 1000 ms
 14/05/02 21:52:26 INFO SocketInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
 14/05/02 21:52:26 INFO SocketInputDStream: Checkpoint interval = null
 14/05/02 21:52:26 INFO SocketInputDStream: Remember duration = 1000 ms
 14/05/02 21:52:26 INFO SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@5433868e
 14/05/02 21:52:26 INFO ForEachDStream: Slide time = 1000 ms
 14/05/02 21:52:26 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
 14/05/02 21:52:26 INFO ForEachDStream: Checkpoint interval = null
 14/05/02 21:52:26 INFO ForEachDStream: Remember duration = 1000 ms
 14/05/02 21:52:26 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@1ebdbc05
 14/05/02 21:52:26 INFO SparkContext: Starting job: collect at ReceiverTracker.scala:270
 14/05/02 21:52:26 INFO RecurringTimer: Started timer for JobGenerator at time 1399060346000
 14/05/02 21:52:26 INFO JobGenerator: Started JobGenerator at 1399060346000 ms
 14/05/02 21:52:26 INFO JobScheduler: Started JobScheduler
 14/05/02 21:52:26 INFO DAGScheduler: Registering RDD 3 (reduceByKey at ReceiverTracker.scala:270)
 14/05/02 21:52:26 INFO ReceiverTracker: Stream 0 received 0 blocks
 14/05/02 21:52:26 INFO DAGScheduler: Got job 0 (collect at ReceiverTracker.scala:270) with 20 output partitions (allowLocal=false)
 14/05/02 21:52:26 INFO DAGScheduler: Final stage: Stage 0(collect at ReceiverTracker.scala:270)
 14/05/02 21:52:26 INFO DAGScheduler: Parents of final stage: List(Stage 1)
 14/05/02 21:52:26 INFO JobScheduler: Added jobs for time 1399060346000 ms
 14/05/02 21:52:26 INFO JobScheduler: Starting job streaming job 1399060346000 ms.0 from job set of time 1399060346000 ms
 14/05/02 21:52:26 INFO JobGenerator: Checkpointing graph for time 1399060346000 ms
 ---
 14/05/02 21:52:26 INFO DStreamGraph: Updating checkpoint data for time 1399060346000 ms
 
 Time: 1399060346000 ms
 ---
 
 14/05/02 21:52:26 INFO JobScheduler: Finished job streaming job 1399060346000 ms.0 from job set of time 1399060346000 ms
 14/05/02 21:52:26 INFO JobScheduler: Total delay: 0.325 s for time 1399060346000 ms (execution: 0.024 s)
 
 
 
 14/05/02 21:52:42 INFO JobScheduler: Added jobs for time 1399060362000 ms
 14/05/02 21:52:42 INFO JobGenerator: Checkpointing graph for time 1399060362000 ms
 14/05/02 21:52:42 INFO DStreamGraph: Updating checkpoint data for time 1399060362000 ms
 14/05/02 21:52:42 INFO DStreamGraph: Updated checkpoint data for time 1399060362000 ms
 14/05/02 21:52:42 INFO JobScheduler: Starting job streaming job 1399060362000 ms.0 from job set of time 1399060362000 ms
 14/05/02 21:52:42 INFO SparkContext: Starting job: take at DStream.scala:593
 14/05/02 21:52:42 INFO DAGScheduler: Got job 2 (take at DStream.scala:593) with 1 output partitions (allowLocal=true)
 14/05/02 21:52:42 INFO DAGScheduler: Final stage: Stage 3(take at DStream.scala:593)
 14/05/02 21:52:42 INFO DAGScheduler: Parents of final stage: List()
 14/05/02 21:52:42 INFO 

what's local[n]

2014-05-03 Thread Weide Zhang
In the Spark Kafka example,

it says

   `./bin/run-example org.apache.spark.streaming.examples.KafkaWordCount
local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`

Can anyone tell me what local[2] represents? I thought the master URL
should be something like spark://hostname:port.

Also, the thread count is specified as 1 in the Kafka example. What will happen
if the thread count is more than 1? Does that mean multiple Kafka consumers
will be created on different Spark workers? I'm not sure how the code
maps to real-time worker thread allocation. Is there any documentation on
that?

Weide


Re: Crazy Kryo Exception

2014-05-03 Thread Soren Macbeth
Poking around in the bowels of Scala, it seems like this has something to
do with implicit Scala-to-Java collection conversion. Why would it be doing
this, and where? The stack trace given is entirely unhelpful to me. Is there
a better one buried in my task logs? None of my tasks actually failed, so
it seems that it is dying while trying to fetch results from my tasks to
return back to the driver.

Am I close?


On Fri, May 2, 2014 at 3:35 PM, Soren Macbeth so...@yieldbot.com wrote:

 Hallo,

 I'm getting this rather crazy Kryo exception trying to run my Spark job:

 Exception in thread "main" org.apache.spark.SparkException: Job aborted:
 Exception while deserializing and fetching task:
 com.esotericsoftware.kryo.KryoException:
 java.lang.IllegalArgumentException: Can not set final
 scala.collection.convert.Wrappers field
 scala.collection.convert.Wrappers$SeqWrapper.$outer to my.custom.class
 Serialization trace:
 $outer (scala.collection.convert.Wrappers$SeqWrapper)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
 at scala.Option.foreach(Option.scala:236)
 at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 I have a kryo serializer for my.custom.class and I've registered it using
 a custom registrator on my context object. I've tested the custom
 serializer and the registrator locally and they both function as expected.
 This job is running spark 0.9.1 under mesos in fine grained mode.

 Please help!
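For reference, a registrator setup of the kind Soren describes looks roughly like this. It is a sketch assuming the configuration keys `spark.serializer` and `spark.kryo.registrator` (present in 0.9 and still valid later) and the Kryo 2.x to 4.x `Serializer` signatures. `Point`, `PointSerializer` and `PointRegistrator` are hypothetical names standing in for my.custom.class and its serializer; this reproduces the setup, not the SeqWrapper failure.

```scala
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical stand-in for my.custom.class.
case class Point(x: Int, y: Int)

// Custom Kryo serializer: writes and reads the two fields in the same order.
class PointSerializer extends Serializer[Point] {
  override def write(kryo: Kryo, output: Output, p: Point): Unit = {
    output.writeInt(p.x)
    output.writeInt(p.y)
  }
  override def read(kryo: Kryo, input: Input, cls: Class[Point]): Point = {
    val x = input.readInt()
    val y = input.readInt()
    Point(x, y)
  }
}

// Spark instantiates the registrator by class name, so it must be a top-level
// class with a public no-arg constructor.
class PointRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit =
    kryo.register(classOf[Point], new PointSerializer)
}

object KryoSetup {
  val conf: SparkConf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", classOf[PointRegistrator].getName)
}
```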



Re: what's local[n]

2014-05-03 Thread Andrew Ash
Hi Weide,

The answer to your first question about local[2] can be found in the
"Running the Examples and Shell" section of
https://spark.apache.org/docs/latest/

Note that all of the sample programs take a master parameter specifying
 the cluster URL to connect to. This can be a URL for a distributed
 cluster (https://spark.apache.org/docs/latest/scala-programming-guide.html#master-urls),
 or local to run locally with one thread, or local[N] to run locally with
 N threads. You should start by using local for testing.
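A minimal sketch of passing a local master programmatically instead of a spark:// URL, assuming Spark's Scala API. With `local[2]`, the driver and two task threads run inside one JVM. For Spark Streaming, each receiver (such as the Kafka one) occupies a thread of its own, so when running locally you need more threads than receivers, which is why a streaming example uses local[2] rather than plain local. `LocalMasterDemo` and `describe` are illustrative names.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LocalMasterDemo {
  /** Starts a local SparkContext with `threads` threads and returns (master, defaultParallelism). */
  def describe(threads: Int): (String, Int) = {
    val conf = new SparkConf()
      .setMaster(s"local[$threads]") // the same kind of value as the KafkaWordCount argument
      .setAppName("local-master-demo")
    val sc = new SparkContext(conf)
    try (sc.master, sc.defaultParallelism)
    finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(describe(2))
}
```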


For the second, I'm not sure how the thread count affects the number of
kafka consumers.  My first guess would be that a thread is 1:1 with a kafka
consumer, but I'm not super familiar with Spark streaming.

Andrew






Lease Exception hadoop 2.4

2014-05-03 Thread Andre Kuhnen
Hello, I am getting this warning after upgrading to Hadoop 2.4, when I try to
write something to HDFS. The content is written correctly, but I do
not like this warning.

Do I have to compile Spark with Hadoop 2.4?

WARN TaskSetManager: Loss was due to org.apache.hadoop.ipc.RemoteException
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException)

thanks


Spark-1.0.0-rc3 compiled against Hadoop 2.3.0 cannot read HDFS 2.3.0?

2014-05-03 Thread Nan Zhu
Hi, all 

I compiled Spark 1.0.0-rc3 against Hadoop 2.3.0 with

SPARK_HADOOP_VERSION=2.3.0 sbt/sbt assembly

and deployed Spark with HDFS 2.3.0 (yarn = false), but it seems that it cannot
read data from HDFS.

The following exception is thrown:

java.lang.VerifyError: class org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$CreateSnapshotRequestProto overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet;
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2531)
at java.lang.Class.privateGetPublicMethods(Class.java:2651)
at java.lang.Class.privateGetPublicMethods(Class.java:2661)
at java.lang.Class.getMethods(Class.java:1467)
at sun.misc.ProxyGenerator.generateClassFile(ProxyGenerator.java:426)
at sun.misc.ProxyGenerator.generateProxyClass(ProxyGenerator.java:323)
at java.lang.reflect.Proxy.getProxyClass0(Proxy.java:636)
at java.lang.reflect.Proxy.newProxyInstance(Proxy.java:722)
at org.apache.hadoop.ipc.ProtobufRpcEngine.getProxy(ProtobufRpcEngine.java:92)
at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:537)
at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:334)
at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:241)
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:141)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:569)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:512)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:142)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2316)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:90)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2350)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2332)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:369)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:221)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:172)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1085)
at org.apache.spark.rdd.RDD.count(RDD.scala:836)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:15)
at $iwC$$iwC$$iwC.<init>(<console>:20)
at $iwC$$iwC.<init>(<console>:22)
at $iwC.<init>(<console>:24)
at <init>(<console>:26)
at .<init>(<console>:30)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)
at 

Re: sbt/sbt run command returns a JVM problem

2014-05-03 Thread Michael Armbrust
The problem is probably not with the JVM running sbt but with the one that
sbt is forking to run your program.

See here for the relevant option:
https://github.com/apache/spark/blob/master/project/SparkBuild.scala#L186

You might try starting sbt with no arguments (to bring up the sbt console).
You can then type `set javaOptions += "-Xmx1G"` and afterwards try `run`.

Michael
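A sketch of the equivalent setting in a build definition, assuming sbt 0.13-era syntax. `javaOptions` only affects a forked JVM, so forking has to be enabled for `run`; the 1G value is simply the figure suggested above. In sbt 1.x the same settings are written `run / fork := true` and `run / javaOptions += "-Xmx1G"`.

```scala
// build.sbt: run the application in a separate JVM with a larger heap.
fork in run := true

javaOptions in run += "-Xmx1G"
```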





Re: Multiple Streams with Spark Streaming

2014-05-03 Thread Chris Fregly
if you want to use true Spark Streaming (not the same as Hadoop
Streaming/Piping, as Mayur pointed out), you can use the DStream.union()
method as described in the following docs:

http://spark.apache.org/docs/0.9.1/streaming-custom-receivers.html
http://spark.apache.org/docs/0.9.1/streaming-programming-guide.html
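A minimal sketch of DStream union, assuming Spark Streaming's Scala API. It uses `queueStream` over in-memory RDDs as a stand-in for real sources (socket or custom receivers), so it runs without any network input; the queue contents are made up for illustration.

```scala
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object UnionStreams {
  /** Builds two input streams and merges them into a single DStream. */
  def merged(ssc: StreamingContext): DStream[String] = {
    val sc = ssc.sparkContext
    val q1 = mutable.Queue[RDD[String]](sc.parallelize(Seq("a1", "a2")))
    val q2 = mutable.Queue[RDD[String]](sc.parallelize(Seq("b1")))
    val s1 = ssc.queueStream(q1)
    val s2 = ssc.queueStream(q2)
    s1.union(s2) // for many streams: ssc.union(Seq(s1, s2, ...))
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("union-streams")
    val ssc = new StreamingContext(conf, Seconds(1))
    merged(ssc).count().print() // per-batch record count of the combined stream
    ssc.start()
    ssc.awaitTerminationOrTimeout(3000)
    ssc.stop()
  }
}
```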

our friend, diana carroll, from cloudera recently posted a nice little
utility for sending files to a Spark Streaming Receiver to simulate a
streaming scenario from disk.

here's the link to her post:
http://apache-spark-user-list.1001560.n3.nabble.com/streaming-code-to-simulate-a-network-socket-data-source-tc3431.html

-chris

On Thu, May 1, 2014 at 3:09 AM, Mayur Rustagi mayur.rust...@gmail.com wrote:

 File as a stream?
 I think you are confusing Spark Streaming with a buffered reader. Spark
 Streaming is meant to process batches of data (files, packets, messages) as
 they come in, in fact using the time of packet reception as a way to create
 windows, etc.

 In your case you are better off reading the file, partitioning it, and
 operating on each column individually if that makes more sense to you.






 On Thu, May 1, 2014 at 3:24 PM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:

 Hi all,

 Is it possible to read and process multiple streams with Spark? I have an
 EEG (brain waves) CSV file with 23 columns. Each column is one stream (wave),
 and each column has one million values.

 I know one way to do it is to take the transpose of the file and then give it
 to Spark, so that each mapper gets one or more of the 23 waves,
 but then it becomes a non-streaming problem, and I want to read the file as a
 stream. Please correct me if I am wrong.

 I have to apply simple operations (mean and SD) on each window of a wave.

 Regards,
 Laeeq
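A sketch of Mayur's batch suggestion, assuming Spark's Scala RDD API (aggregateByKey needs 1.1+) and a headerless, purely numeric CSV: every value is keyed by its column index and folded into a `StatCounter`, Spark's running count/mean/variance accumulator. Per-window statistics could be obtained by also putting a window index in the key, which is not shown here. `ColumnStats` is an illustrative name.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.util.StatCounter

object ColumnStats {
  /** Per-column (mean, population standard deviation) for comma-separated numeric lines. */
  def columnStats(lines: RDD[String]): Map[Int, (Double, Double)] =
    lines
      .flatMap(_.split(",").zipWithIndex.map { case (v, col) => (col, v.trim.toDouble) })
      .aggregateByKey(new StatCounter())(
        (acc, x) => acc.merge(x), // fold one value into this column's counter
        (a, b) => a.merge(b))     // combine counters built on different partitions
      .mapValues(s => (s.mean, s.stdev))
      .collect()
      .toMap
}
```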






Re: Reading multiple S3 objects, transforming, writing back one

2014-05-03 Thread Chris Fregly
not sure if this directly addresses your issue, peter, but it's worth
mentioning a handy AWS EMR utility called s3distcp that can upload a single
HDFS file - in parallel - to a single, concatenated S3 file once all the
partitions are uploaded.  kinda cool.

here's some info:

http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/UsingEMR_s3distcp.html


s3distcp is an extension of the familiar hadoop distcp, of course.


On Thu, May 1, 2014 at 11:41 AM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 The fastest way to save to S3 should be to leave the RDD with many
 partitions, because all partitions will be written out in parallel.

 Then, once the various parts are in S3, somehow concatenate the files
 together into one file.

 If this can be done within S3 (I don't know if this is possible), then you
 get the best of both worlds: a highly parallelized write to S3, and a
 single cleanly named output file.


 On Thu, May 1, 2014 at 12:52 PM, Peter thenephili...@yahoo.com wrote:

 Thank you Patrick.

 I took a quick stab at it:

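 import com.amazonaws.services.s3.AmazonS3Client
 import com.amazonaws.services.s3.model.DeleteObjectsRequest
 import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion
 import scala.collection.JavaConverters._

 val s3Client = new AmazonS3Client(...)
 // copy the single part file to its final, cleanly named key
 val copyObjectResult = s3Client.copyObject("upload", outputPrefix + "/part-00000", "rolled-up-logs", "2014-04-28.csv")
 // then delete everything under the temporary output prefix
 val objectListing = s3Client.listObjects("upload", outputPrefix)
 s3Client.deleteObjects(new DeleteObjectsRequest("upload")
   .withKeys(objectListing.getObjectSummaries.asScala.map(s => new KeyVersion(s.getKey)).asJava))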

  Using a 3GB object I achieved about 33MB/s between buckets in the same
 AZ.

 This is a workable solution for the short term but not ideal for the
 longer term as data size increases. I understand it's a limitation of the
 Hadoop API but ultimately it must be possible to dump an RDD to a single S3
 object :)

   On Wednesday, April 30, 2014 7:01 PM, Patrick Wendell 
 pwend...@gmail.com wrote:
  This is a consequence of the way the Hadoop files API works. However,
 you can (fairly easily) add code to just rename the file because it
 will always produce the same filename.

 (heavy use of pseudo code)

 dir = /some/dir
 rdd.coalesce(1).saveAsTextFile(dir)
 f = new File(dir + part-0)
 f.moveTo(somewhere else)
 dir.remove()

 It might be cool to add a utility called `saveAsSingleFile` or
 something that does this for you. In fact, we probably should have
 called saveAsTextFile `saveAsTextFiles` to make it more clear...

 On Wed, Apr 30, 2014 at 2:00 PM, Peter thenephili...@yahoo.com wrote:
  Thanks Nicholas, this is a bit of a shame, not very practical for log
  roll-up, for example, when every output needs to be in its own directory.
  On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas
  nicholas.cham...@gmail.com wrote:
  Yes, saveAsTextFile() will give you 1 part per RDD partition. When you
  coalesce(1), you move everything in the RDD to a single partition, which
  then gives you 1 output file.
  It will still be called part-0 or something like that because that's
  defined by the Hadoop API that Spark uses for reading from/writing to S3.
  I don't know of a way to change that.
 
 
  On Wed, Apr 30, 2014 at 2:47 PM, Peter thenephili...@yahoo.com wrote:
 
  Ah, looks like RDD.coalesce(1) solves one part of the problem.
  On Wednesday, April 30, 2014 11:15 AM, Peter thenephili...@yahoo.com
  wrote:
  Hi
 
  Playing around with Spark and S3, I'm opening multiple objects (CSV files)
  with:
 
 val hfile = sc.textFile("s3n://bucket/2014-04-28/")
 
  so hfile is an RDD representing 10 objects that were underneath 2014-04-28.
  After I've sorted and otherwise transformed the content, I'm trying to write
  it back to a single object:
 
 sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv")
 
  Unfortunately, this results in a folder named concatted.csv with 10 objects
  underneath, part-0 .. part-00010, corresponding to the 10 original
  objects loaded.
 
  How can I achieve the desired behaviour of putting a single object named
  concatted.csv?
 
  I've tried 0.9.1 and 1.0.0-RC3.
 
  Thanks!
  Peter
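A runnable version of Patrick's rename suggestion, as a sketch assuming the Hadoop `FileSystem` API that ships with Spark. `saveAsSingleTextFile` is a hypothetical helper, not a Spark method. It relies on a single-partition `saveAsTextFile` writing exactly one `part-00000` file. On S3 (s3n/s3a), a rename is implemented as a copy plus a delete, so it is neither atomic nor cheap for large outputs.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object SingleFileSave {
  /** Writes `rdd` as one text file at `dest`, using `tmpDir` as scratch space. */
  def saveAsSingleTextFile(sc: SparkContext, rdd: RDD[String], tmpDir: String, dest: String): Unit = {
    rdd.coalesce(1).saveAsTextFile(tmpDir) // one partition -> one part file
    val tmpPath = new Path(tmpDir)
    val fs = tmpPath.getFileSystem(sc.hadoopConfiguration)
    // move the lone part file to the desired name, then drop the scratch directory
    require(fs.rename(new Path(tmpPath, "part-00000"), new Path(dest)), s"could not rename output to $dest")
    fs.delete(tmpPath, true)
  }
}
```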
 
 
 
 
 
 
 






Re: Reading multiple S3 objects, transforming, writing back one

2014-05-03 Thread Patrick Wendell
Hi Peter,

If your dataset is large (3GB) then why not just chunk it into
multiple files? You'll need to do this anyways if you want to
read/write this from S3 quickly, because S3's throughput per file is
limited (as you've seen).

This is exactly why the Hadoop API lets you save datasets with many
partitions, since often there are bottlenecks at the granularity of a
file.

Is there a reason you need this to be exactly one file?

- Patrick







Re: Setting the Scala version in the EC2 script?

2014-05-03 Thread Patrick Wendell
Spark will only work with Scala 2.10... are you trying to do a minor
version upgrade or upgrade to a totally different version?

You could do this as follows if you want:
1. Fork the spark-ec2 repository and change the file here:
https://github.com/mesos/spark-ec2/blob/v2/scala/init.sh
2. Modify your spark-ec2.py script to check out spark-ec2 from your forked version.

- Patrick

On Thu, May 1, 2014 at 2:14 PM, Ian Ferreira ianferre...@hotmail.com wrote:
 Is this possible? It is very annoying to have such a great script but still
 have to manually update stuff afterwards.


Re: when to use broadcast variables

2014-05-03 Thread Patrick Wendell
Broadcast variables need to fit entirely in memory - so that's a
pretty good litmus test for whether or not to broadcast a smaller
dataset or turn it into an RDD.

On Fri, May 2, 2014 at 7:50 AM, Prashant Sharma scrapco...@gmail.com wrote:
 I'd like to be corrected on this, but I'd say small enough means on the order
 of a few hundred MB. Keep in mind that the data gets shipped to all nodes: it
 can be a GB, but not several GBs, and it also depends on the network.

 Prashant Sharma


 On Fri, May 2, 2014 at 6:42 PM, Diana Carroll dcarr...@cloudera.com wrote:

 Anyone have any guidance on using a broadcast variable to ship data to
 workers vs. an RDD?

 Like, say I'm joining web logs in an RDD with user account data.  I could
 keep the account data in an RDD or if it's small, a broadcast variable
 instead.  How small is small?  Small enough that I know it can easily fit in
 memory on a single node?  Some other guideline?

 Thanks!

 Diana
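The broadcast option in Diana's example amounts to a map-side join: the small account table is shipped once to every worker, and each partition of web logs looks users up locally, so the large side is never shuffled. The plain-Python sketch below simulates that with lists standing in for partitions; the record layout and the helper names are hypothetical. In PySpark the dict would be wrapped with sc.broadcast(accounts) and read through its .value inside mapPartitions. The pickled size is only a rough proxy for how much data each node would receive, which is what the "fits in memory, a few hundred MB" rule of thumb is about.

```python
import pickle


def broadcast_join(log_partitions, accounts):
    """Map-side join: every partition looks users up in the same small dict.

    In PySpark, `accounts` would come from a broadcast variable's .value on
    each executor; here it is simply passed in.
    """
    joined = []
    for part in log_partitions:
        for user_id, url in part:
            if user_id in accounts:  # inner join: drop logs with no account
                joined.append((user_id, url, accounts[user_id]))
    return joined


def serialized_size_mb(obj):
    """Rough size of what would be shipped to each node (pickled bytes, in MB)."""
    return len(pickle.dumps(obj)) / (1024 * 1024)


# Hypothetical data: two partitions of (user_id, url) log records
logs = [[("u1", "/a"), ("u2", "/b")], [("u3", "/c"), ("u1", "/d")]]
accounts = {"u1": "Alice", "u3": "Carol"}

result = broadcast_join(logs, accounts)
size_mb = serialized_size_mb(accounts)
```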




Re: performance improvement on second operation...without caching?

2014-05-03 Thread Matei Zaharia
Hi Diana,

Apart from these reasons, in a multi-stage job, Spark saves the map output 
files from map stages to the filesystem, so it only needs to rerun the last 
reduce stage. This is why you only saw one stage executing. These files are 
saved for fault recovery but they speed up subsequent runs.

Matei

On May 3, 2014, at 5:21 PM, Patrick Wendell pwend...@gmail.com wrote:

 Ethan,
 
 What you said is actually not true: Spark won't cache RDDs unless you ask it to.
 
 The observation here - that running the same job can speed up substantially 
 even without caching - is common. This is because other components in the 
 stack are performing caching and optimizations. Two that can make a huge 
 difference are:
 
 1. The OS buffer cache, which keeps recently read disk blocks in memory.
 2. The Java just-in-time (JIT) compiler, which uses runtime profiling to 
 significantly speed up execution.
 
 These can make a huge difference if you are running the same job 
 over and over. There are other things too, like the OS network stack 
 increasing TCP windows and so forth. These will all improve response time as 
 a Spark program executes.
 
 
 On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett esjew...@gmail.com wrote:
 I believe Spark caches RDDs it has memory for regardless of whether you 
 actually call the 'cache' method on the RDD. The 'cache' method just tips off 
 Spark that the RDD should have higher priority. At least, that is my 
 experience and it seems to correspond with your experience and with my 
 recollection of other discussions on this topic on the list. However, going 
 back and looking at the programming guide, this is not the way the 
 cache/persist behavior is described. Does the guide need to be updated?
 
 
 On Fri, May 2, 2014 at 9:04 AM, Diana Carroll dcarr...@cloudera.com wrote:
 I'm just Posty McPostalot this week, sorry folks! :-)
 
 Anyway, another question today:
 I have a bit of code that is pretty time consuming (pasted at the end of the 
 message):
 It reads in a bunch of XML files, parses them, extracts some data in a map, 
 counts (using reduce), and then sorts.   All stages are executed when I do a 
 final operation (take).  The first stage is the most expensive: on first run 
 it takes 30s to a minute.
 
 I'm not caching anything.
 
 When I re-execute that take at the end, I expected it to re-execute all the 
 same stages, and take approximately the same amount of time, but it didn't.  
 The second take executes only a single stage, which runs very 
 fast: the whole operation takes less than 1 second (down from 5 minutes!)
 
 While this is awesome (!) I don't understand it.  If I'm not caching data, 
 why would I see such a marked performance improvement on subsequent execution?
 
 (or is this related to the known 0.9.1 bug about sortByKey executing an action 
 when it shouldn't?)
 
 Thanks,
 Diana
 
 # load XML files containing device activation records.
 # Find the most common device models activated
 import xml.etree.ElementTree as ElementTree
 
 # Given a partition containing multi-line XML, parse the contents.
 # Return an iterator of activation Elements contained in the partition.
 # (Assumes each partition holds exactly one complete XML file.)
 def getactivations(fileiterator):
     s = ''.join(str(i) for i in fileiterator)
     filetree = ElementTree.fromstring(s)
     return filetree.iter('activation')
 
 # Get the model name from a device activation record
 def getmodel(activation):
     return activation.find('model').text
 
 filename = "hdfs://localhost/user/training/activations/*.xml"
 
 # parse each partition as a file into an activation XML record
 activations = sc.textFile(filename)
 activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))
 models = activationTrees.map(lambda activation: getmodel(activation))
 
 # count and sort activations by model
 topmodels = models.map(lambda model: (model, 1))\
     .reduceByKey(lambda v1, v2: v1 + v2)\
     .map(lambda mc: (mc[1], mc[0]))\
     .sortByKey(ascending=False)
 
 # display the top 10 models
 for (count, model) in topmodels.take(10):
     print("Model %s (%s)" % (model, count))
 
 # repeat!
 for (count, model) in topmodels.take(10):
     print("Model %s (%s)" % (model, count))
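To check the parsing and counting logic without a cluster, the same two helper functions can be run on a single in-memory "partition" in plain Python. This is a sketch under assumptions: the sample XML layout (an activations root holding activation elements with a model child) is hypothetical, Counter stands in for map(...).reduceByKey(...), and sorted stands in for sortByKey(ascending=False).

```python
import xml.etree.ElementTree as ElementTree
from collections import Counter


def getactivations(fileiterator):
    # Join the lines of one partition and parse them as a single XML document
    s = ''.join(str(i) for i in fileiterator)
    return ElementTree.fromstring(s).iter('activation')


def getmodel(activation):
    return activation.find('model').text


# Hypothetical partition: the lines of one small XML file
partition = [
    "<activations>",
    "<activation><model>ModelA</model></activation>",
    "<activation><model>ModelB</model></activation>",
    "<activation><model>ModelA</model></activation>",
    "</activations>",
]

# Counter plays the role of map(lambda model: (model, 1)).reduceByKey(...)
counts = Counter(getmodel(a) for a in getactivations(iter(partition)))
# (count, model) pairs, largest first, like the sortByKey(ascending=False) step
top = sorted(((c, m) for m, c in counts.items()), reverse=True)[:10]
```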
 
 
 



Re: Spark's behavior

2014-05-03 Thread Andrew Ash
Hi Eduardo,

Yep those machines look pretty well synchronized at this point.  Just
wanted to throw that out there and eliminate it as a possible source of
confusion.

Good luck on continuing the debugging!
Andrew


On Sat, May 3, 2014 at 11:59 AM, Eduardo Costa Alfaia 
e.costaalf...@unibs.it wrote:

 Hi TD,

 I ran a test with 8 workers and 1 word source; the time gap was 27 sec,
 as you can see in the attached log files.

 Hi Andrew,
 I configured NTP; all machines are synchronized.
 root@computer8:/opt/unibs_test/spark-1.0RC3# for num in {1,8,10,11,13,15,16,18,22}; do ssh computer$num date; done
 Sat May  3 20:57:41 CEST 2014
 Sat May  3 20:57:41 CEST 2014
 Sat May  3 20:57:41 CEST 2014
 Sat May  3 20:57:42 CEST 2014
 Sat May  3 20:57:42 CEST 2014
 Sat May  3 20:57:42 CEST 2014
 Sat May  3 20:57:42 CEST 2014
 Sat May  3 20:57:42 CEST 2014
 Sat May  3 20:57:42 CEST 2014
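Output like the above can be checked mechanically. The small Python sketch below (max_skew_seconds is a hypothetical helper) parses date lines in this format, drops the time-zone token, and reports the spread between the earliest and latest timestamp. Because the ssh loop queries the machines one after another, a spread of about a second is expected even when the clocks are perfectly synchronized.

```python
from datetime import datetime


def max_skew_seconds(date_lines):
    """Spread in seconds between the earliest and latest `date` output lines."""
    stamps = []
    for line in date_lines:
        parts = line.split()  # e.g. ['Sat', 'May', '3', '20:57:41', 'CEST', '2014']
        del parts[4]          # drop the time-zone token (assumed identical on all hosts)
        stamps.append(datetime.strptime(" ".join(parts), "%a %b %d %H:%M:%S %Y"))
    return (max(stamps) - min(stamps)).total_seconds()


outputs = ["Sat May  3 20:57:41 CEST 2014"] * 3 + ["Sat May  3 20:57:42 CEST 2014"] * 6
skew = max_skew_seconds(outputs)
```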






 On May 3, 2014, at 15:46, Eduardo Costa Alfaia e.costaalf...@unibs.it
 wrote:

 Hi TD, Thanks for reply
 I ran this last experiment on one computer, essentially in local mode, but I
 think the time gap grows when I add more computers. I will run it again now
 with 8 workers and 1 word source and see what's going on. I will also check
 the clocks, as Andrew suggested.
 On May 3, 2014, at 1:19, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 From the logs, I see that the print() starts printing stuff 10 seconds
 after the context is started. And that 10 seconds is taken by the initial
 empty job (50 map + 20 reduce tasks) that spark streaming starts to ensure
 all the executors have started. Somehow the first empty task takes 7-8
 seconds to complete. See if this can be reproduced by running a simple,
 empty job in spark shell (in the same cluster) and see if the first task
 takes 7-8 seconds.

 Either way, I didn't see the 30 second gap, but a 10 second gap. And that
 does not seem to be a persistent problem as after that 10 seconds, the data
 is being received and processed.

 TD


 On Fri, May 2, 2014 at 2:14 PM, Eduardo Costa Alfaia 
 e.costaalf...@unibs.it wrote:

 Hi TD,

 I gathered more information today using Spark 1.0 RC3, and the situation
 remains the same:

 The lines begin after 17 sec:

 14/05/02 21:52:25 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140502215225-0005/0 on hostPort computer8.ant-net:57229 with 2 cores, 2.0 GB RAM
 14/05/02 21:52:25 INFO AppClient$ClientActor: Executor updated: app-20140502215225-0005/0 is now RUNNING
 14/05/02 21:52:25 INFO ReceiverTracker: ReceiverTracker started
 14/05/02 21:52:26 INFO ForEachDStream: metadataCleanupDelay = -1
 14/05/02 21:52:26 INFO SocketInputDStream: metadataCleanupDelay = -1
 14/05/02 21:52:26 INFO SocketInputDStream: Slide time = 1000 ms
 14/05/02 21:52:26 INFO SocketInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
 14/05/02 21:52:26 INFO SocketInputDStream: Checkpoint interval = null
 14/05/02 21:52:26 INFO SocketInputDStream: Remember duration = 1000 ms
 14/05/02 21:52:26 INFO SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@5433868e
 14/05/02 21:52:26 INFO ForEachDStream: Slide time = 1000 ms
 14/05/02 21:52:26 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
 14/05/02 21:52:26 INFO ForEachDStream: Checkpoint interval = null
 14/05/02 21:52:26 INFO ForEachDStream: Remember duration = 1000 ms
 14/05/02 21:52:26 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@1ebdbc05
 14/05/02 21:52:26 INFO SparkContext: Starting job: collect at ReceiverTracker.scala:270
 14/05/02 21:52:26 INFO RecurringTimer: Started timer for JobGenerator at time 1399060346000
 14/05/02 21:52:26 INFO JobGenerator: Started JobGenerator at 1399060346000 ms
 14/05/02 21:52:26 INFO JobScheduler: Started JobScheduler
 14/05/02 21:52:26 INFO DAGScheduler: Registering RDD 3 (reduceByKey at ReceiverTracker.scala:270)
 14/05/02 21:52:26 INFO ReceiverTracker: Stream 0 received 0 blocks
 14/05/02 21:52:26 INFO DAGScheduler: Got job 0 (collect at ReceiverTracker.scala:270) with 20 output partitions (allowLocal=false)
 14/05/02 21:52:26 INFO DAGScheduler: Final stage: Stage 0(collect at ReceiverTracker.scala:270)
 14/05/02 21:52:26 INFO DAGScheduler: Parents of final stage: List(Stage 1)
 14/05/02 21:52:26 INFO JobScheduler: Added jobs for time 1399060346000 ms
 14/05/02 21:52:26 INFO JobScheduler: Starting job streaming job 1399060346000 ms.0 from job set of time 1399060346000 ms
 14/05/02 21:52:26 INFO JobGenerator: Checkpointing graph for time 1399060346000 ms
 ---
 14/05/02 21:52:26 INFO DStreamGraph: Updating checkpoint data for time 1399060346000 ms

 Time: 1399060346000 ms
 ---

 14/05/02 21:52:26 INFO JobScheduler: Finished job streaming job 1399060346000 

Re: help me

2014-05-03 Thread Chris Fregly
as Mayur indicated, it's odd that you are seeing better performance from a
less-local configuration.  however, the non-deterministic behavior that you
describe is likely caused by GC pauses in your JVM process.

take note of the *spark.locality.wait* configuration parameter described
here: http://spark.apache.org/docs/latest/configuration.html

this is the amount of time the Spark execution engine waits before
launching a new task on a less-data-local node (i.e. process -> node ->
rack).  by default, this is 3 seconds.
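The fallback described here (a form of delay scheduling) can be modelled roughly as follows: the scheduler stays at one locality level for spark.locality.wait and then relaxes to the next, less local one. This is a simplified Python sketch, not Spark's scheduler code; allowed_level is a hypothetical name, the level names mirror those shown in the Spark UI, and the 3000 ms default follows the text above. It assumes one wait value for every level, whereas Spark also accepts per-level settings such as spark.locality.wait.node.

```python
LEVELS = ["PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY"]


def allowed_level(elapsed_ms, wait_ms=3000):
    """Most relaxed locality level a task may use after waiting elapsed_ms.

    Simplified model: each level is tried for wait_ms before the scheduler
    falls back to the next, less local one.
    """
    if wait_ms <= 0:
        return "ANY"  # no waiting at all: take any free slot
    index = min(int(elapsed_ms // wait_ms), len(LEVELS) - 1)
    return LEVELS[index]
```

Under this model a larger wait keeps tasks waiting longer for a process-local slot, while a smaller one lets them fall back to node-local sooner.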

if there is excessive GC occurring on the original process-local JVM, it is
possible that another node-local JVM process could actually load the data
from HDFS (on the same node) and complete the processing before the
original process's GC finishes.

you could bump up the *spark.locality.wait* default (not recommended) or
increase your number of nodes/partitions to increase parallelism and reduce
hotspots.

also, keep an eye on your GC characteristics.  perhaps you need to increase
your Eden size to reduce the amount of movement through the GC generations
and reduce major compactions.  (the usual GC tuning fun.)

curious if others have experienced this behavior, as well?

-chris


On Fri, May 2, 2014 at 6:07 AM, Mayur Rustagi mayur.rust...@gmail.com wrote:

 Spark would be much faster with process_local than with node_local.
 Node_local reads data from the local hard disk, while process_local reads
 data already held in memory in the same process.

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Tue, Apr 22, 2014 at 4:45 PM, Joe L selme...@yahoo.com wrote:

 I got the following performance. Is it normal for Spark to behave like this?
 Sometimes Spark switches into node_local mode from process_local and it
 becomes 10x faster. I am very confused.

 scala> val a = sc.textFile("/user/exobrain/batselem/LUBM1000")
 scala> f.count()

 Long = 137805557
 took 130.809661618 s




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/help-me-tp4598.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





spark run issue

2014-05-03 Thread Weide Zhang
Hi, I'm trying to run the kafka-word-count example in spark2.9.1. I
encountered an exception when initializing the Kafka consumer/producer config.
I'm using Scala 2.10.3 and a Maven build of the spark-streaming-kafka
library that comes with spark2.9.1. Has anyone seen this exception before?

Thanks,

producer:
 [java] The args attribute is deprecated. Please use nested arg elements.
 [java] Exception in thread "main" java.lang.NoClassDefFoundError: scala/Tuple2$mcLL$sp
 [java] at kafka.producer.ProducerConfig.<init>(ProducerConfig.scala:56)
 [java] at com.turn.apache.KafkaWordCountProducer$.main(HelloWorld.scala:89)
 [java] at com.turn.apache.KafkaWordCountProducer.main(HelloWorld.scala)
 [java] Caused by: java.lang.ClassNotFoundException: scala.Tuple2$mcLL$sp
 [java] at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
 [java] at java.security.AccessController.doPrivileged(Native Method)
 [java] at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
 [java] at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
 [java] at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
 [java] at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
 [java] ... 3 more
 [java] Java Result: 1


Re: spark run issue

2014-05-03 Thread Weide Zhang
Hi Tathagata,

I figured out the reason. I was adding the wrong Kafka library alongside
the version Spark uses. Sorry for spamming.

Weide
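A NoClassDefFoundError for an internal Scala class such as scala/Tuple2$mcLL$sp typically points to jars compiled against different Scala binary versions sitting on the same classpath, which fits the wrong-Kafka-library finding above. As a rough, hypothetical check, the Python sketch below reads the _2.xx suffix that Scala artifacts conventionally carry in their file names and flags jars whose suffix differs from the expected one. The jar names are made up for illustration, and jars without a suffix (plain Java libraries) are skipped.

```python
import re

# Scala artifacts conventionally carry the Scala version after an underscore,
# e.g. "foo_2.10-1.0.jar" or "foo_2.9.2-0.8.0.jar"; capture only major.minor.
SCALA_SUFFIX = re.compile(r"_(2\.\d+)(?:\.\d+)?-")


def scala_binary_version(jar_name):
    """Return e.g. '2.10' or '2.9', or None for jars without a Scala suffix."""
    match = SCALA_SUFFIX.search(jar_name)
    return match.group(1) if match else None


def mismatched_jars(jar_names, expected="2.10"):
    """Jars built for a different Scala binary version than `expected`."""
    return [name for name in jar_names
            if scala_binary_version(name) not in (None, expected)]


# Hypothetical classpath for a Spark 0.9.1 (Scala 2.10) application
classpath = [
    "spark-streaming-kafka_2.10-0.9.1.jar",
    "kafka_2.10-0.8.0.jar",
    "kafka_2.9.2-0.8.0.jar",
    "zkclient-0.3.jar",
]
suspects = mismatched_jars(classpath)
```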


On Sat, May 3, 2014 at 7:04 PM, Tathagata Das
tathagata.das1...@gmail.com wrote:

 I am a little confused about the version of Spark you are using. Are you
 using Spark 0.9.1, which uses Scala 2.10.3?

 TD







Re: spark run issue

2014-05-03 Thread Weide Zhang
Hi Tathagata,

I actually have a separate question. What is the lib_managed folder inside
the Spark source folder used for? Are those the libraries required for Spark
Streaming to run? Do they need to be added to the Spark classpath when
starting a Spark cluster?

Weide








cache not work as expected for iteration?

2014-05-03 Thread Earthson
I'm using Spark for an LDA implementation. I need to cache the RDD for the
next step of Gibbs sampling; once the new result is cached, the previous
cached RDD could be uncached. Something like an LRU cache should evict the
previous cache because it is never used again, but the cache runs into confusion:

Here is the code:)
https://github.com/Earthson/sparklda/blob/master/src/main/scala/net/earthson/nlp/lda/lda.scala#L99
  

http://apache-spark-user-list.1001560.n3.nabble.com/file/n5292/sparklda_cache1.png
 

http://apache-spark-user-list.1001560.n3.nabble.com/file/n5292/sparklda_cache2.png
 





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/cache-not-work-as-expected-for-iteration-tp5292.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
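A common pattern for iterative jobs like this is to cache the new RDD, force it to be computed with an action, and only then unpersist the previous iteration's RDD, rather than relying on the cache to evict it. The sketch below only models that ordering in plain Python: FakeRDD is a hypothetical stand-in that records which iterations are cached, not Spark's RDD class, and it computes eagerly. With real Spark the corresponding calls would be cache(), an action such as count(), and unpersist() on the actual RDDs; check the API of your Spark version.

```python
class FakeRDD:
    """Hypothetical stand-in for an RDD that tracks which instances are cached."""

    cached = set()  # names of currently cached "RDDs", shared by all instances

    def __init__(self, data, name):
        self.data = data
        self.name = name

    def map(self, f, name):
        return FakeRDD([f(x) for x in self.data], name)

    def cache(self):
        FakeRDD.cached.add(self.name)
        return self

    def unpersist(self):
        FakeRDD.cached.discard(self.name)

    def count(self):
        return len(self.data)


def iterate(rdd, step, n_iters):
    """Run n_iters steps, keeping only the latest iteration cached."""
    current = rdd.cache()
    for i in range(1, n_iters + 1):
        nxt = current.map(step, "iter%d" % i).cache()
        nxt.count()          # in Spark, this action computes nxt while its parent is still cached
        current.unpersist()  # only then drop the previous iteration explicitly
        current = nxt
    return current


result = iterate(FakeRDD([1, 2, 3], "iter0"), lambda x: x + 1, 3)
```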


Re: performance improvement on second operation...without caching?

2014-05-03 Thread Koert Kuipers
Hey Matei,
Not sure i understand that. These are 2 separate jobs. So the second job
takes advantage of the fact that there is map output left somewhere on disk
from the first job, and re-uses that?


On Sat, May 3, 2014 at 8:29 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

 Hi Diana,

 Apart from these reasons, in a multi-stage job, Spark saves the map output
 files from map stages to the filesystem, so it only needs to rerun the last
 reduce stage. This is why you only saw one stage executing. These files are
 saved for fault recovery but they speed up subsequent runs.

 Matei








Re: sbt/sbt run command returns a JVM problem

2014-05-03 Thread Carter
Hi Michael,

Thank you very much for your reply.

Sorry, I am not very familiar with sbt. Could you tell me where to set the
Java option for the sbt fork for my program? I brought up the sbt console
and ran set javaOptions += "-Xmx1G" in it, but it returned an error: 
[error] scala.tools.nsc.MissingRequirementError: object scala not found.
[error] Use 'last' for the full log.

Is this the right way to set the java option? Thank you very much.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/sbt-sbt-run-command-returns-a-JVM-problem-tp5157p5294.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: performance improvement on second operation...without caching?

2014-05-03 Thread Matei Zaharia
Yes, this happens as long as you use the same RDD. For example say you do the 
following:

data1 = sc.textFile(…).map(…).reduceByKey(…)
data1.count()
data1.filter(…).count()

The first count() causes outputs of the map/reduce pair in there to be written 
out to shuffle files. Next time you do a count, on either this RDD or a child 
(e.g. after the filter), we notice that output files were already generated for 
this shuffle so we don’t rerun the map stage. Note that the output does get 
read again over the network, which is kind of wasteful (if you really wanted to 
reuse this as quickly as possible you’d use cache()).

Matei
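The shuffle-file reuse described above can be illustrated with a toy model in plain Python. ShuffleSim is hypothetical and far simpler than Spark's scheduler: map-stage output is stored per shuffle id the first time it is produced, and later actions on the same RDD, or on a child such as a filtered one, reuse it and rerun only the reduce side. map_stage_runs counts how often the expensive stage actually executes.

```python
class ShuffleSim:
    """Toy model: map output is kept per shuffle id and reused by later actions."""

    def __init__(self, records, num_buckets=2):
        self.records = records
        self.num_buckets = num_buckets
        self.shuffle_files = {}  # shuffle id -> list of buckets of (key, value)
        self.map_stage_runs = 0

    def reduce_by_key(self, shuffle_id):
        if shuffle_id not in self.shuffle_files:
            # Map stage: runs only if no output exists yet for this shuffle
            self.map_stage_runs += 1
            buckets = [[] for _ in range(self.num_buckets)]
            for key, value in self.records:
                buckets[hash(key) % self.num_buckets].append((key, value))
            self.shuffle_files[shuffle_id] = buckets
        # Reduce stage: always re-reads the stored map output and aggregates it
        totals = {}
        for bucket in self.shuffle_files[shuffle_id]:
            for key, value in bucket:
                totals[key] = totals.get(key, 0) + value
        return totals


sim = ShuffleSim([("a", 1), ("b", 1), ("a", 1)])
first_count = len(sim.reduce_by_key(shuffle_id=0))  # like data1.count()
filtered = {k: v for k, v in sim.reduce_by_key(shuffle_id=0).items() if v > 1}
second_count = len(filtered)                         # like data1.filter(...).count()
```

As noted above, the reduce side still re-reads the stored output each time, so cache() remains the faster option when an RDD is deliberately reused.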

On May 3, 2014, at 8:44 PM, Koert Kuipers ko...@tresata.com wrote:

 Hey Matei,
 Not sure i understand that. These are 2 separate jobs. So the second job 
 takes advantage of the fact that there is map output left somewhere on disk 
 from the first job, and re-uses that?
 
 
 On Sat, May 3, 2014 at 8:29 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
 Hi Diana,
 
 Apart from these reasons, in a multi-stage job, Spark saves the map output 
 files from map stages to the filesystem, so it only needs to rerun the last 
 reduce stage. This is why you only saw one stage executing. These files are 
 saved for fault recovery but they speed up subsequent runs.
 
 Matei
 
 On May 3, 2014, at 5:21 PM, Patrick Wendell pwend...@gmail.com wrote:
 
 Ethan,
 
 What you said is actually not true: Spark won't cache RDDs unless you ask 
 it to.
 
 The observation here - that running the same job can speed up substantially 
 even without caching - is common. This is because other components in the 
 stack are performing caching and optimizations. Two that can make a huge 
 difference are:
 
 1. The OS buffer cache, which keeps recently read disk blocks in memory.
 2. The Java just-in-time (JIT) compiler, which uses runtime profiling to 
 significantly speed up execution.
 
 These can make a huge difference if you are running the same job 
 over and over. There are other things too, like the OS network stack 
 increasing TCP windows and so forth. These will all improve response time 
 as a Spark program executes.
 
 
 On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett esjew...@gmail.com wrote:
 I believe Spark caches RDDs it has memory for regardless of whether you 
 actually call the 'cache' method on the RDD. The 'cache' method just tips 
 off Spark that the RDD should have higher priority. At least, that is my 
 experience and it seems to correspond with your experience and with my 
 recollection of other discussions on this topic on the list. However, going 
 back and looking at the programming guide, this is not the way the 
 cache/persist behavior is described. Does the guide need to be updated?
 
 
 On Fri, May 2, 2014 at 9:04 AM, Diana Carroll dcarr...@cloudera.com wrote:
 I'm just Posty McPostalot this week, sorry folks! :-)
 
 Anyway, another question today:
 I have a bit of code that is pretty time consuming (pasted at the end of the 
 message):
 It reads in a bunch of XML files, parses them, extracts some data in a map, 
 counts (using reduce), and then sorts.   All stages are executed when I do a 
 final operation (take).  The first stage is the most expensive: on first run 
 it takes 30s to a minute.
 
 I'm not caching anything.
 
 When I re-execute that take at the end, I expected it to re-execute all the 
 same stages, and take approximately the same amount of time, but it didn't.  
 The second take executes only a single stage, which runs very 
 fast: the whole operation takes less than 1 second (down from 5 minutes!)
 
 While this is awesome (!) I don't understand it.  If I'm not caching data, 
 why would I see such a marked performance improvement on subsequent 
 execution?
 
 (or is this related to the known 0.9.1 bug about sortByKey executing an 
 action when it shouldn't?)
 
 Thanks,
 Diana
 
 # load XML files containing device activation records.
 # Find the most common device models activated
 import xml.etree.ElementTree as ElementTree
 
 # Given a partition containing multi-line XML, parse the contents. 
 # Return an iterator of activation Elements contained in the partition
 def getactivations(fileiterator):
     s = ''.join(str(i) for i in fileiterator)
     filetree = ElementTree.fromstring(s)
     return filetree.iter('activation')
 
 # Get the model name from a device activation record
 def getmodel(activation):
     return activation.find('model').text
 
 filename = "hdfs://localhost/user/training/activations/*.xml"
 
 # parse each partition as a file into an activation XML record
 activations = sc.textFile(filename)
 activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))
 models = activationTrees.map(lambda activation: getmodel(activation))
 
 # count and sort activations by model
 topmodels = models.map(lambda model: (model,1))\
     .reduceByKey(lambda v1,v2: v1+v2)\