Re: Why collect() has a stage but first() not?

2014-02-19 Thread Aaron Davidson
first() is allowed to run locally, which means that the driver will
execute the action itself without launching any tasks. This is also true of
take(n) for sufficiently small n, for instance.
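
For illustration, a minimal sketch in the spark-shell (the path is just a
placeholder) of why one shows a stage and the other may not:

val rdd = sc.textFile("hdfs://namenode/path/to/data")
rdd.collect()  // runs a task per partition, so a stage appears in the web UI
rdd.first()    // may run locally on the driver (allowLocal=true), so no stage is listed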


On Wed, Feb 19, 2014 at 9:55 AM, David Thomas dt5434...@gmail.com wrote:

 If I perform a 'collect' action on the RDD, I can see a new stage getting
 created in the spark web UI (http://master:4040/stages/), but when I do a
 'first' action, I don't see any stage getting created. However on the
 console I see these lines:

 14/02/19 10:51:31 INFO SparkContext: Starting job: first at xxx.scala:110
 14/02/19 10:51:31 INFO DAGScheduler: Got job 110 (first at xxx.scala:110)
 with 1 output partitions (allowLocal=true)
 14/02/19 10:51:31 INFO DAGScheduler: Final stage: Stage 2 (first at
 xxx.scala:110)
 14/02/19 10:51:31 INFO DAGScheduler: Parents of final stage: List()
 14/02/19 10:51:31 INFO DAGScheduler: Missing parents: List()

 So why doesn't the webUI list the stages created when I run the 'first'
 action?



Re: Why collect() has a stage but first() not?

2014-02-19 Thread Aaron Davidson
The driver does query for the first partition of the RDD using the
BlockManager. If the RDD is cached, the worker process that has the first
partition in memory will ship it back to the driver, and the driver will
iterate over it.


On Wed, Feb 19, 2014 at 9:59 AM, David Thomas dt5434...@gmail.com wrote:

 But my RDD is placed on the worker nodes. So how can driver perform the
 action by itself?


 On Wed, Feb 19, 2014 at 10:57 AM, Aaron Davidson ilike...@gmail.com wrote:

 first() is allowed to run locally, which means that the driver will
 execute the action itself without launching any tasks. This is also true of
 take(n) for sufficiently small n, for instance.


 On Wed, Feb 19, 2014 at 9:55 AM, David Thomas dt5434...@gmail.com wrote:

 If I perform a 'collect' action on the RDD, I can see a new stage
 getting created in the spark web UI (http://master:4040/stages/), but
 when I do a 'first' action, I don't see any stage getting created. However
 on the console I see these lines:

 14/02/19 10:51:31 INFO SparkContext: Starting job: first at xxx.scala:110
 14/02/19 10:51:31 INFO DAGScheduler: Got job 110 (first at
 xxx.scala:110) with 1 output partitions (allowLocal=true)
 14/02/19 10:51:31 INFO DAGScheduler: Final stage: Stage 2 (first at
 xxx.scala:110)
 14/02/19 10:51:31 INFO DAGScheduler: Parents of final stage: List()
 14/02/19 10:51:31 INFO DAGScheduler: Missing parents: List()

 So why doesn't the webUI list the stages created when I run the 'first'
 action?






Re: Using local[N] gets Too many open files?

2014-02-16 Thread Aaron Davidson
If you are intentionally opening many files at once and getting that error,
then it is a fixable OS issue. Please check out this discussion regarding
changing the file limit in /etc/limits.conf:
http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-td1464.html

If you feel that your job should not be opening so many files at a time,
then please give a little more detail about the nature of your job. A few
questions bear answering:

Are you using a standard Spark input method, such as sc.textFile()? This
should only have one open file per partition per core (so 8 concurrently in
your case).

Are you performing any sort of join or shuffle operation? This can create
intermediate shuffle or external sorting files. Shuffling an RDD into N
partitions will cause us to open N files at a time (per core), so that
could be up to 800k in your case. You can reduce this by shuffling your
100k input partitions into many fewer output partitions, assuming that each
file is actually small. This can be set as a parameter to any
shuffle-inducing operation.
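
As a rough illustration in the spark-shell (key() and the paths are
placeholders), passing an explicit partition count to a shuffle-inducing
operation:

val pairs = sc.textFile("hdfs://namenode/input/*").map(line => (key(line), 1))
val counts = pairs.reduceByKey(_ + _, 200)  // shuffle into only 200 output partitions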

If your job is using external sorting to avoid OOMing (which it will warn
you about in the executor logs with messages like Spilling in-memory
map...), then you may have arbitrarily many files open. This is very
unlikely to happen if you've split your input into as many files as you
said, though.


On Sun, Feb 16, 2014 at 6:18 PM, Matthew Cheah matthew.c.ch...@gmail.com wrote:

 Hi everyone,

 I'm experimenting with Spark in both a distributed environment and as a
 multi-threaded local application.

 When I set the spark master to local[8] and attempt to read a ~20GB text
 file on the local file system into an RDD and perform computations on it, I
 don't get an out of memory error, but rather a Too many open files error.
 Is there a reason why this happens? How aggressively is Spark partitioning
 the data into intermediate files?

 I have also tried splitting the text file into numerous text files -
 around 100,000 of them - and processing 10,000 of them at a time
 sequentially. However then Spark seems to get bottlenecked on reading each
 individual file into the RDD before proceeding with the computation. This
 has issues even reading 10,000 files at once. I would have thought that
 Spark could do I/O in parallel with computation, but it seems that Spark
 does all of the I/O first?

 I was wondering if Spark was simply just not built for local applications
 outside of testing.

 Thanks,

 -Matt Cheah



Re: [External] Re: Too many open files

2014-02-13 Thread Aaron Davidson
Note that you may just be changing the soft limit, and are still being
capped by the hard (system-wide) limit. Changing the /etc/limits.conf file
as specified above allows you to modify both the soft and hard limits, and
requires a restart of the machine to take effect.


On Thu, Feb 13, 2014 at 11:05 AM, Mayur Rustagi mayur.rust...@gmail.com wrote:

 Did you change your system wide limit?

 Mayur Rustagi
 Ph: +919632149971
 http://www.sigmoidanalytics.com
 https://twitter.com/mayur_rustagi



 On Thu, Feb 13, 2014 at 10:48 AM, Korb, Michael [USA] 
 korb_mich...@bah.com wrote:

  My ulimit is already unlimited. Wouldn't setting it to 18000 be
 decreasing it? Could it be an issue other than ulimit? Because I can't
 increase it beyond unlimited.

  Thanks,
 Mike

   From: Mayur Rustagi mayur.rust...@gmail.com
 Reply-To: user@spark.incubator.apache.org 
 user@spark.incubator.apache.org
 Date: Thursday, February 13, 2014 12:58 PM
 To: user@spark.incubator.apache.org user@spark.incubator.apache.org
 Subject: Re: [External] Re: Too many open files

   Easiest is ulimit -n 18000 to your conf/spark-env.sh
 Restart the cluster.. make sure you change the limits at OS level as well.
 Regards
 Mayur

  Mayur Rustagi
 Ph: +919632149971
 http://www.sigmoidanalytics.com
 https://twitter.com/mayur_rustagi



 On Thu, Feb 13, 2014 at 9:51 AM, Korb, Michael [USA] 
 korb_mich...@bah.com wrote:

  No I don't. I ran all Spark processes as a user with ulimit =
 unlimited.

   From: Mayur Rustagi mayur.rust...@gmail.com
 Reply-To: user@spark.incubator.apache.org 
 user@spark.incubator.apache.org
 Date: Thursday, February 13, 2014 12:34 PM
 To: user@spark.incubator.apache.org user@spark.incubator.apache.org
 Subject: [External] Re: Too many open files

   The limit could be on any of the machines(including the master). Do
 you have ganglia setup?

  Mayur Rustagi
 Ph: +919632149971
 http://www.sigmoidanalytics.com
 https://twitter.com/mayur_rustagi



 On Thu, Feb 13, 2014 at 7:13 AM, Korb, Michael [USA] 
 korb_mich...@bah.com wrote:

  Hi,

  When I submit a job to a cluster, I get a large string of errors like
 this:

  WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
 java.io.FileNotFoundException: /tmp/spark-local* (Too many open files)

  All the answers I can find say to increase ulimit, but I have it set
 to unlimited (as the user running the spark daemons as well as the user
 submitting the job) and am still getting the error.

  I'm attempting to create an RDD like this: 
 sc.textFile("/path/to/files/*").persist(StorageLevel.MEMORY_ONLY_SER()), and
 run a series of maps and filters on the data. There are about 2k files
 for a total of about 230g of data, and my current cluster is 3 nodes, 32
 cores each, with spark.executor.memory set to 32g. I've tried different
 StorageLevel settings but still have the same error. Interestingly, the job
 works if I write and submit with pyspark, but I want to get it working in
 Java.

  Thanks,
 Mike







Re: Shuffle file not found Exception

2014-02-09 Thread Aaron Davidson
This sounds bad, and probably related to shuffle file consolidation.
Turning off consolidation would probably get you working again, but I'd
really love to track down the bug. Do you know if any tasks fail before
those errors start occurring? It's very possible that another exception is
occurring which is causing a file to not be written -- I think I've seen
latent OOMEs induce this behavior, for instance.
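
If it helps while debugging, consolidation can be toggled off via a system
property set before the SparkContext is created (0.8.x-era configuration; a
sketch, not a recommendation to leave it that way):

System.setProperty("spark.shuffle.consolidateFiles", "false")
val sc = new SparkContext("spark://master:7077", "MyApp")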


On Sun, Feb 9, 2014 at 8:28 AM, Guillaume Pitel
guillaume.pi...@exensa.com wrote:

  Hi,

 I've got a strange problem with 0.8.1 (we're going to make the jump to
 0.9.0 in a few days, but for now I'm working with a 0.8.1 cluster): After a
 few iteration of my method, one random node of my local cluster throws an
 exception like that :

 FileNotFoundException:
 /sparktmp/spark-local-20140209073949-29b1/37/merged_shuffle_24_23_1 (No
 such file or directory)

 Then, either the job gets stuck for hours, or it fails right away.

 I've got the ulimit at 131k files, and consolidateFiles=true, so I don't
 think it's a problem related to the # of file descriptors.

 Guillaume

 --
  *Guillaume PITEL, Président*
 +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53

  eXenSa S.A.S. http://www.exensa.com/
  41, rue Périer - 92120 Montrouge - FRANCE
 Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05


Re: Hash Join in Spark

2014-02-03 Thread Aaron Davidson
This method is doing very little. Line 2 constructs the CoGroupedRDD, which
will do all the real work. Note that while this cogroup function just
groups 2 RDDs together, CoGroupedRDD allows general n-way cogrouping, so it
takes a Seq[RDD(K, _)] rather than just 2 such key-value RDDs.

The rest of the code in this method is simply converting the result of
CoGroupedRDD back from its generalized form into an RDD[(K, (Seq[V],
Seq[W]))]. (CoGroupedRDD returns an RDD[(K, Seq[Seq[_]])], as there are n of
those Seq[_]s, one per grouping RDD.) To go over some of the finer points
of these remaining lines:

3. This line is actually not necessary, and is simply confusing. I have
submitted a small patch (https://github.com/apache/incubator-spark/pull/530)
to remove it.*

4. mapValues will iterate through the results of the CoGroupedRDD (i.e.,
the already-cogrouped values) in order to change the type of the return
value from the generic Seq[Seq[_]] to a (Seq[V], Seq[W]), since we know
each Seq has exactly 2 elements. The remainder of this line simply does the
casting from Seq[_] to Seq[V] or Seq[W] as appropriate.

* Here's a real explanation for line 3, in case you're curious about the
Scala magic that's going on. Normally, all RDDs that look like key-value
pairs (having a generic type of Tuple2, like [K, V]) are implicitly
converted (http://tomjefferys.blogspot.com/2011/11/implicit-conversions-in-scala.html)
to
PairRDDFunctions, to provide extra functions that can operate over these
types of RDDs. For reasons slightly unclear, the author of this code chose
to forgo using the implicit conversion in favor of explicitly converting
the CoGroupedRDD into a PairRDDFunctions in order to gain access to the
mapValues method.
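
For reference, a small usage sketch in the spark-shell (made-up data; the
grouped values come back as Seqs, per the signature above):

val users  = sc.parallelize(Seq((1, "alice"), (2, "bob")))
val orders = sc.parallelize(Seq((1, "book"), (1, "pen")))
// For each key, cogroup returns the values from each RDD grouped separately,
// e.g. (1, (Seq(alice), Seq(book, pen))) and (2, (Seq(bob), Seq())).
val grouped = users.cogroup(orders)
grouped.collect().foreach(println)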


On Sun, Feb 2, 2014 at 8:47 PM, rose kunj rosek...@yahoo.com wrote:

 Since, my earlier question is still unanswered, I have decided to dig into
 the spark code myself. However, I am new to spark as well as scala in
 particular. Can some one help me understand the following code snippet:

 1. def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
 2.   val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
 3.   val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classTag[K], ClassTags.seqSeqClassTag)
 4.   prfs.mapValues { case Seq(vs, ws) =>
        (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
 5.   }
 6. }


 Thanks,
 rose

   On Friday, January 24, 2014 4:32 PM, rose rosek...@yahoo.com wrote:
  Hi all,

 I want to know more about join operation in spark. I know it uses hash
 join,
 but I am not able to figure out the  nature of the implementation such as
 blocking, non blocking, or shared , not shared partitions.

 If anyone knows, please reply to this post along with the linkers of the
 implementation in the spark source files.

 Thanks,
 rose



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Hash-Join-in-Spark-tp873.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Spark app gets slower as it gets executed more times

2014-02-02 Thread Aaron Davidson
Are you seeing any exceptions in between running apps? Does restarting the
master/workers actually cause Spark to speed back up again? It's possible,
for instance, that you run out of disk space, which should cause exceptions
but not go away by restarting the master/workers.

One thing to worry about is long-running jobs or shells. Currently, state
buildup of a single job in Spark *is* a problem, as certain state such as
shuffle files and RDD metadata is not cleaned up until the job (or shell)
exits. We have hacky ways to reduce this, and are working on a long term
solution. However, separate, consecutive jobs should be independent in
terms of performance.


On Sat, Feb 1, 2014 at 8:27 PM, 尹绪森 yinxu...@gmail.com wrote:

 Is your spark app an iterative one? If so, your app is creating a big DAG
 in every iteration. You should checkpoint it periodically, say, one
 checkpoint every 10 iterations.
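
 For illustration, checkpointing every 10 iterations might look roughly like
 this (initial, step(), numIterations, and the directory are placeholders):

 sc.setCheckpointDir("hdfs://namenode/tmp/checkpoints")
 var current = initial
 for (i <- 1 to numIterations) {
   current = step(current).cache()
   if (i % 10 == 0) {
     current.checkpoint()  // truncates the lineage so the DAG stops growing
     current.count()       // force an action so the checkpoint is actually written
   }
 }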


 2014-02-01 Aureliano Buendia buendia...@gmail.com:

 Hi,

 I've noticed my spark app (on ec2) gets slower and slower as I repeatedly
 execute it.

 With a fresh ec2 cluster, it is snappy and takes about 15 mins to
 complete, after running the same app 4 times it gets slower and takes to 40
 mins and more.

 While the cluster gets slower, the monitoring metrics show less and less
 activities (almost no cpu, or io).

 When it gets slow, sometimes the number of running tasks (light blue in
 web ui progress bar) is zero, and only the number of completed tasks (dark
 blue) increments.

 Is this a known spark issue?

 Do I need to restart spark master and workers in between running apps?




 --
 Best Regards
 ---
 Xusen Yin (尹绪森)
 Beijing Key Laboratory of Intelligent Telecommunications Software and
 Multimedia
 Beijing University of Posts & Telecommunications
 Intel Labs China
 Homepage: http://yinxusen.github.io/



Re: default parallelism in trunk

2014-02-02 Thread Aaron Davidson
Could you give an example where default parallelism is set to 2 where it
didn't used to be?

Here is the relevant section for Spark standalone mode, in
CoarseGrainedSchedulerBackend.scala#L211:
https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L211
If spark.default.parallelism is set, it will override anything else. If it
is not set, we will use the maximum of the total number of cores in the
cluster and 2, which is the same logic that has been used since spark-0.7:
https://github.com/apache/incubator-spark/blob/branch-0.7/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala#L156

The simplest possibility is that you're setting spark.default.parallelism;
otherwise, there may be a bug introduced somewhere that isn't defaulting
correctly anymore.
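
As a sketch of forcing it explicitly (0.8/0.9-era configuration via a Java
system property; it must be set before the SparkContext is created):

System.setProperty("spark.default.parallelism", "64")
val sc = new SparkContext("spark://master:7077", "MyApp")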


On Sat, Feb 1, 2014 at 12:30 AM, Koert Kuipers ko...@tresata.com wrote:

 i just managed to upgrade my 0.9-SNAPSHOT from the last scala 2.9.x
 version to the latest.


 everything seems good except that my default parallelism is now set to 2
 for jobs instead of some smart number based on the number of cores (i think
 that is what it used to do). is this change on purpose?

 i am running spark standalone.

 thx, koert



Re: How to access global kryo instance?

2014-01-06 Thread Aaron Davidson
I see -- the answer is no, we currently do not use an object pool, but
instead just try to create it less frequently (typically one
SerializerInstance per partition). For instance, you could do

rdd.mapPartitions { partitionIterator =>
  val kryo = SparkEnv.get.serializer.newKryo()
  partitionIterator.map(row => doWorkWithKryo(kryo, row))
}

This should amortize the cost greatly. The only requirement of an instance
is that it not be used by multiple threads simultaneously, and this fits
that requirement perfectly.


On Mon, Jan 6, 2014 at 6:59 PM, Aureliano Buendia buendia...@gmail.com wrote:




 On Tue, Jan 7, 2014 at 2:52 AM, Aaron Davidson ilike...@gmail.com wrote:

 Please take a look at the source code -- it's relatively friendly, and
 very useful for digging into Spark internals! (KryoSerializer:
 https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala)

 As you can see, a Kryo instance is available via ser.newKryo(). You can
 also use Spark's SerializerInstance interface (which features serialize()
 and deserialize() methods) by simply calling ser.newInstance().


 Sorry, maybe I wasn't clear. What I meant was, does spark use a singleton
 instance of kryo that can be accessed inside the map closure?

 Calling ser.newKryo() for every element (inside a map closure) has a
 huge overhead, and it seems newKryo() doesn't use any caching. Twitter
 chill uses an object pool for kryo instances; I'm not sure how spark
 handles this.




 On Mon, Jan 6, 2014 at 5:20 PM, Aureliano Buendia 
 buendia...@gmail.com wrote:

 In a map closure, I could use:

 val ser = SparkEnv.get.serializer.asInstanceOf[KryoSerializer]

 But how to get the instance of Kryo that spark uses from ser?


 On Tue, Jan 7, 2014 at 1:04 AM, Aaron Davidson ilike...@gmail.com wrote:

 I believe SparkEnv.get.serializer would return the serializer created
 from the spark.serializer property.

 You can also obtain a Kryo serializer directly via its no-arg
 constructor (it still invokes your spark.kryo.registrator):
 val serializer = new KryoSerializer()
 but this could have some overhead, and so should probably not be done
 for every element you process.


 On Mon, Jan 6, 2014 at 4:36 PM, Aureliano Buendia buendia...@gmail.com
  wrote:

 Hi,

 Is there a way to access the global kryo instance created by spark?
 I'm referring to the one which is passed to registerClasses() in a
 KryoRegistrator sub class.

 I'd like to access this kryo instance inside a map closure, so it
 should be accessible from thw workers side too.








Re: ADD_JARS doesn't properly work for spark-shell

2014-01-04 Thread Aaron Davidson
Cool. To confirm, you said you can access the class and construct new
objects -- did you do this in the shell itself (i.e., on the driver), or on
the executors?

Specifically, one of the following two should fail in the shell:
 new mypackage.MyClass()
 sc.parallelize(0 until 10, 2).foreach(_ => new mypackage.MyClass())
(or just import it)

You could also try running MASTER=local-cluster[2,1,512] which launches 2
executors, 1 core each, with 512MB in a setup that mimics a real cluster
more closely, in case it's a bug only related to using local mode.


On Sat, Jan 4, 2014 at 7:07 PM, Aureliano Buendia buendia...@gmail.com wrote:




 On Sun, Jan 5, 2014 at 2:28 AM, Aaron Davidson ilike...@gmail.com wrote:

 Additionally, which version of Spark are you running?


 0.8.1.

 Unfortunately, this doesn't work either:

 MASTER=local[2] ADD_JARS=/path/to/my/jar 
 SPARK_CLASSPATH=/path/to/my/jar ./spark-shell




 On Sat, Jan 4, 2014 at 6:27 PM, Aaron Davidson ilike...@gmail.com wrote:

 I am not an expert on these classpath issues, but if you're using local
 mode, you might also try to set SPARK_CLASSPATH to include the path to the
 jar file as well. This should not really help, since adding jars is the
 right way to get the jars to your executors (which is where the exception
 appears to be happening), but it would sure be interesting if it did.


 On Sat, Jan 4, 2014 at 4:50 PM, Aureliano Buendia 
 buendia...@gmail.com wrote:

 I should add that I can see in the log that the jar is being shipped to
 the workers:

 14/01/04 15:34:52 INFO Executor: Fetching
 http://192.168.1.111:51031/jars/my.jar.jar with timestamp 131979092
 14/01/04 15:34:52 INFO Utils: Fetching
 http://192.168.1.111:51031/jars/my.jar.jar to
 /var/folders/3g/jyx81ctj3698wbvphxhm4dw4gn/T/fetchFileTemp8322008964976744710.tmp
 14/01/04 15:34:53 INFO Executor: Adding
 file:/var/folders/3g/jyx81ctj3698wbvphxhm4dw4gn/T/spark-d8ac8f66-fad6-4b3f-8059-73f13b86b070/my.jar.jar
 to class loader


 On Sun, Jan 5, 2014 at 12:46 AM, Aureliano Buendia 
 buendia...@gmail.com wrote:

 Hi,

 I'm trying to access my standalone spark app from spark-shell. I
 tried starting the shell by:

 MASTER=local[2] ADD_JARS=/path/to/my/jar ./spark-shell

 The log shows that the jar file was loaded. Also, I can access and
 create a new instance of mypackage.MyClass.

 The problem is that myRDD.collect() returns RDD[MyClass], and that
 throws this exception:

 java.lang.ClassNotFoundException: mypackage.MyClass
   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
   at java.security.AccessController.doPrivileged(Native Method)
   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
   at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
   at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
   at java.lang.Class.forName0(Native Method)
   at java.lang.Class.forName(Class.java:264)
   at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:622)
   at
 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1593)
   at
 java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514)
   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1642)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1341)
   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
   at org.apache.spark.util.Utils$.deserialize(Utils.scala:59)
   at
 org.apache.spark.SparkContext$$anonfun$objectFile$1.apply(SparkContext.scala:573)
   at
 org.apache.spark.SparkContext$$anonfun$objectFile$1.apply(SparkContext.scala:573)
   at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:440)
   at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:702)
   at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:698)
   at
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:872)
   at
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:872)
   at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
   at org.apache.spark.scheduler.Task.run(Task.scala:53)
   at
 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
   at
 org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
   at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
   at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
   at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
   at java.lang.Thread.run(Thread.java:722)

 Does this mean that my jar was not shipped to the workers? Is this a
 known issue, or am I doing something wrong here?








Re: NPE while reading broadcast variable.

2013-12-30 Thread Aaron Davidson
Could you post the stack trace you see for the NPE?


On Mon, Dec 30, 2013 at 11:31 AM, Archit Thakur
archit279tha...@gmail.com wrote:

 I am still getting it. I googled and found a similar open problem on
 stackoverflow:
 http://stackoverflow.com/questions/17794664/accumulator-fails-on-cluster-works-locally
 .

 Thx, Archit_Thakur.


 On Mon, Dec 23, 2013 at 11:32 AM, Archit Thakur archit279tha...@gmail.com
  wrote:

 Accessed it:

 val CURRENT_EXECUTING =
 ClusterVariableEnumeration.CURRENT_EXECUTION.value.asInstanceOf[String]


 On Mon, Dec 23, 2013 at 11:27 AM, Archit Thakur 
 archit279tha...@gmail.com wrote:

 Hi,

 I am getting NPE while I access broadcast variable.

 I created an object:

 object ClusterVariableEnumeration {

   var CURRENT_EXECUTION: org.apache.spark.broadcast.Broadcast[Any] =
 null.asInstanceOf[org.apache.spark.broadcast.Broadcast[Any]];
 }

 and then after creating SparkContext, I did
 ClusterVariableEnumeration.CURRENT_EXECUTION = sc.broadcast(value)

 and then in the map function, when I tried to access it, it gave me an NPE. Any idea?

 Thanks and Regards,
 Archit Thakur.






Re: How to set Akka frame size

2013-12-24 Thread Aaron Davidson
The error you're receiving is because the Akka frame size must be
a positive Java Integer, i.e., less than 2^31. However, the frame size is
not intended to be nearly the size of the job memory -- it is the smallest
unit of data transfer that Spark does. In this case, your task result
size is exceeding 10MB, which means that returning the results for a single
partition of your data is more than 10MB.

It appears that the default JavaWordCount example has a minSplits value of
1 (ctx.textFile(args[1], 1)). This really means that the number of
partitions will be max(1, # hdfs blocks in file). If you have an HDFS block
of size ~64MB and all distinct words, the resulting task result may be around
the same size, which is more than 10MB.

You have two complementary solutions:

   1. Increase the value of minSplits to reduce the size of any single
   TaskResult set, like: ctx.textFile(args[1], 256)
   2. Increase the Akka frame size by a small amount (e.g., to 20-70MB).
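
For illustration, a rough sketch of both of these together (0.8-era system
property; the numbers and paths are arbitrary):

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

System.setProperty("spark.akka.frameSize", "20")  // MB; set before creating the context
val sc = new SparkContext("spark://master:7077", "WordCount")
val counts = sc.textFile("hdfs://namenode/input", 256)  // more splits -> smaller per-task results
  .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://namenode/output")  // prefer saving over collect()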

Please note that this issue, while annoying, is in good part due to the
lack of realism of this example. You very rarely call collect() in Spark in
actual usage, as that will put *all* your output data on the driver
machine. Much more likely you'd save to an HDFS file or compute the top 100
words or something like that, which would not have this problem.

(One final note about your configuration: the Spark Worker is simply
responsible for spawning Executors, which do the actual computation. As
such, it is typical not to change the Worker memory at all [as it needs
very little] but rather to give the majority of a machine's memory
distributed amongst the Executors. If each machine has 16 GB of RAM and 4
cores, for example, you might set spark.executor.memory between 2 and 3 GB,
totaling 8-12 GB used by Spark.)


On Tue, Dec 24, 2013 at 3:58 AM, leosand...@gmail.com
leosand...@gmail.com wrote:

  Hi, everyone

 I have a question about the arg spark.akka.frameSize; its default value is
 10m.
 I run JavaWordCount to read data from HDFS, where there is a 7G file.
 There is an OOM error caused by
 some task result exceeding the Akka frame size,
 but when I modify the arg to 1G, 2G, 10G, it shows me
 ERROR ClusterScheduler: Lost executor 1 on ocnosql84: remote Akka client
 shutdown
 13/12/24 19:41:14 ERROR StandaloneExecutorBackend: Driver terminated or
 disconnected! Shutting down.

 Sometimes it show me different error info :
  [lh1@ocnosql84 src]$ java MyWordCount spark://ocnosql84:7077
 hdfs://ocnosql76:8030/user/lh1/cdr_ismp_20130218 15000 1g 120
 13/12/24 19:20:33 ERROR Client$ClientActor: Failed to connect to master
 org.jboss.netty.channel.ChannelPipelineException: Failed to initialize a
 pipeline.
 at
 org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:209)
 at
 org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:183)
 at
 akka.remote.netty.ActiveRemoteClient$$anonfun$connect$1.apply$mcV$sp(Client.scala:173)
 at akka.util.Switch.liftedTree1$1(LockUtil.scala:33)
 at akka.util.Switch.transcend(LockUtil.scala:32)
 at akka.util.Switch.switchOn(LockUtil.scala:55)
 at akka.remote.netty.ActiveRemoteClient.connect(Client.scala:158)
 at
 akka.remote.netty.NettyRemoteTransport.send(NettyRemoteSupport.scala:153)
 at
 akka.remote.RemoteActorRef.$bang(RemoteActorRefProvider.scala:247)
 at
 org.apache.spark.deploy.client.Client$ClientActor.preStart(Client.scala:61)
 at akka.actor.ActorCell.create$1(ActorCell.scala:508)
 at akka.actor.ActorCell.systemInvoke(ActorCell.scala:600)
 at
 akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:209)
 at akka.dispatch.Mailbox.run(Mailbox.scala:178)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:516)
 at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
 at
 akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
 at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1479)
 at
 akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
 Caused by: java.lang.IllegalArgumentException: maxFrameLength must be a
 positive integer: -1451229184
 at
 org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder.init(LengthFieldBasedFrameDecoder.java:270)
 at
 org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder.init(LengthFieldBasedFrameDecoder.java:236)
 at
 akka.remote.netty.ActiveRemoteClientPipelineFactory.getPipeline(Client.scala:340)
 at
 org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:207)
 ... 18 more
 13/12/24 19:20:33 ERROR SparkDeploySchedulerBackend: Disconnected from
 Spark cluster!
 13/12/24 19:20:33 ERROR ClusterScheduler: Exiting due to error from
 cluster scheduler: Disconnected from Spark cluster

 It seems caused by
 LengthFieldBasedFrameDecoder lenDec = new
 

Re: IMPORTANT: Spark mailing lists moving to Apache by September 1st

2013-12-19 Thread Aaron Davidson
I'd be fine with one-way mirrors here (Apache threads being reflected in
Google groups) -- I have no idea how one is supposed to navigate the Apache
list to look for historic threads.


On Thu, Dec 19, 2013 at 7:58 PM, Mike Potts maspo...@gmail.com wrote:

 Thanks very much for the prompt and comprehensive reply!  I appreciate the
 overarching desire to integrate with apache: I'm very happy to hear that
 there's a move to use the existing groups as mirrors: that will overcome
 all of my objections: particularly if it's bidirectional! :)


 On Thursday, December 19, 2013 7:19:06 PM UTC-8, Andy Konwinski wrote:

 Hey Mike,

 As you probably noticed when you CC'd spark-de...@googlegroups.com, that
 list has already been reconfigured so that it no longer allows posting (and
 bounces emails sent to it).

 We will be doing the same thing to the spark...@googlegroups.com list
 too (we'll announce a date for that soon).

 That may sound very frustrating, and you are *not* alone feeling that
 way. We've had a long conversation with our mentors about this, and I've
 felt very similar to you, so I'd like to give you background.

 As I'm coming to see it, part of becoming an Apache project is moving the
 community *fully* over to Apache infrastructure, and more generally the
 Apache way of organizing the community.

 This applies in both the nuts-and-bolts sense of being on apache infra,
 but possibly more importantly, it is also a guiding principle and way of
 thinking.

 In various ways, moving to apache Infra can be a painful process, and IMO
 the loss of all the great mailing list functionality that comes with using
 Google Groups is perhaps the most painful step. But basically, the de facto
 mailing lists need to be the Apache ones, and not Google Groups. The
 underlying reason is that Apache needs to take full accountability for
 recording and publishing the mailing lists, it has to be able to
 institutionally guarantee this. This is because discussion on mailing lists
 is one of the core things that defines an Apache community. So at a minimum
 this means Apache owning the master copy of the bits.

 All that said, we are discussing the possibility of having a google group
 that subscribes to each list that would provide an easier to use and
 prettier archive for each list (so far we haven't gotten that to work).

 I hope this was helpful. It has taken me a few years now, and a lot of
 conversations with experienced (and patient!) Apache mentors, to
 internalize some of the nuance about the Apache way. That's why I wanted
 to share.

 Andy

 On Thu, Dec 19, 2013 at 6:28 PM, Mike Potts masp...@gmail.com wrote:

 I notice that there are still a lot of active topics in this group: and
 also activity on the apache mailing list (which is a really horrible
 experience!).  Is it a firm policy on apache's front to disallow external
 groups?  I'm going to be ramping up on spark, and I really hate the idea of
 having to rely on the apache archives and my mail client.  Also: having to
 search for topics/keywords both in old threads (here) as well as new
 threads in apache's (clunky) archive, is going to be a pain!  I almost feel
 like I must be missing something because the current solution seems
 unfeasibly awkward!






Re: problems with standalone cluster

2013-12-12 Thread Aaron Davidson
You might also check the spark/work/ directory for application (Executor)
logs on the slaves.


On Tue, Nov 19, 2013 at 6:13 PM, Umar Javed umarj.ja...@gmail.com wrote:

 I have a scala script that I'm trying to run on a Spark standalone cluster
 with just one worker (existing on the master node). But the application
 just hangs. Here's the worker log output at the time of starting the job:

 13/11/19 18:03:13 INFO Worker: Asked to launch executor
 app-20131119180313-0001/0 for job
 13/11/19 18:03:13 INFO ExecutorRunner: Launch command: java -cp
 :/homes/network/revtr/ujaved/incubator-spark/conf:/homes/network/revtr/ujaved/incubator-spark/assembly/target/scala\
 -2.9.3/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar
 -Xms512M -Xmx512M org.apache.spark.executor.StandaloneExecutorBackend
 akka://sp...@drone.cs.washington.edu:57653\
 /user/StandaloneScheduler 0 drone.cs.washington.edu 16
 app-20131119180313-0001
 13/11/19 18:03:13 INFO Worker: Asked to kill executor
 app-20131119180313-0001/0
 13/11/19 18:03:13 INFO ExecutorRunner: Killing process!
 13/11/19 18:03:13 INFO Worker: Executor app-20131119180313-0001/0 finished
 with state KILLED
 13/11/19 18:03:13 INFO ExecutorRunner: Redirection to
 /homes/network/revtr/ujaved/incubator-spark/work/app-20131119180313-0001/0/stdout
 closed: Stream closed
 13/11/19 18:03:13 INFO ExecutorRunner: Redirection to
 /homes/network/revtr/ujaved/incubator-spark/work/app-20131119180313-0001/0/stderr
 closed: Bad file descriptor
 13/11/19 18:03:13 ERROR Worker: key not found: app-20131119180313-0001/0


 Why is the worker killed as soon as it is started? I should mention I
 don't have this problem when using pyspark.

 thanks!
 Umar



Re: groupBy() with really big groups fails

2013-12-09 Thread Aaron Davidson
This is very likely due to memory issues. The problem is that each
reducer (partition of the groupBy) builds an in-memory table of that
partition. If you have very few partitions, this will fail, so the solution
is to simply increase the number of reducers. For example:
sc.parallelize(1 to 4).groupBy(x => x % 10, 256).takeSample(false, 10, 10)

See also the Tuning Spark documentation:
http://spark.incubator.apache.org/docs/latest/tuning.html#memory-usage-of-reduce-tasks


On Mon, Dec 9, 2013 at 11:46 AM, Matt Cheah mch...@palantir.com wrote:

  Hi everyone,

  I was wondering if I could get any insight as to why the following query
 fails:

  scala> sc.parallelize(1 to 4).groupBy(x => x % 10).takeSample(false, 10, 10)
 some generic stuff happens
  2013-12-09 19:42:30,756 [pool-3-thread-1] INFO
  org.apache.spark.network.ConnectionManager - Removing ReceivingConnection
 to ConnectionManagerId(ip-172-31-13-201.us-west-1.compute.internal,50351)
 2013-12-09 19:42:30,813 [connection-manager-thread] INFO
  org.apache.spark.network.ConnectionManager - Key not valid ?
 sun.nio.ch.SelectionKeyImpl@f2d755
 2013-12-09 19:42:30,756 [pool-3-thread-2] INFO
  org.apache.spark.network.ConnectionManager - Removing ReceivingConnection
 to ConnectionManagerId(ip-172-31-13-199.us-west-1.compute.internal,43961)
 2013-12-09 19:42:30,814 [pool-3-thread-4] INFO
  org.apache.spark.network.ConnectionManager - Removing SendingConnection to
 ConnectionManagerId(ip-172-31-13-199.us-west-1.compute.internal,43961)
 2013-12-09 19:42:30,814 [pool-3-thread-2] INFO
  org.apache.spark.network.ConnectionManager - Removing SendingConnection to
 ConnectionManagerId(ip-172-31-13-199.us-west-1.compute.internal,43961)
 2013-12-09 19:42:30,814 [pool-3-thread-1] INFO
  org.apache.spark.network.ConnectionManager - Removing SendingConnection to
 ConnectionManagerId(ip-172-31-13-201.us-west-1.compute.internal,50351)
 2013-12-09 19:42:30,815 [pool-3-thread-3] INFO
  org.apache.spark.network.ConnectionManager - Removing SendingConnection to
 ConnectionManagerId(ip-172-31-13-203.us-west-1.compute.internal,43126)
 2013-12-09 19:42:30,818 [connection-manager-thread] INFO
  org.apache.spark.network.ConnectionManager - key already cancelled ?
 sun.nio.ch.SelectionKeyImpl@f2d755
 java.nio.channels.CancelledKeyException
 at
 org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:266)
 at
 org.apache.spark.network.ConnectionManager$$anon$3.run(ConnectionManager.scala:97)
 2013-12-09 19:42:30,825 [pool-3-thread-4] INFO
  org.apache.spark.network.ConnectionManager - Removing ReceivingConnection
 to ConnectionManagerId(ip-172-31-13-203.us-west-1.compute.internal,43126)
 house of cards collapses

  Let me know if more of the logs would be useful, although it seems from
 this point on everything falls apart.

  I'm using an EC2-script launched cluster, 10 nodes, m2.4xlarge, 65.9GB
 of RAM per slave. Let me know if any other system configurations are
 relevant.

  In a more general sense – our use case has been involving large group-by
 queries, so I was trying to simulate this kind of workload in the spark
 shell. Is there any good way to consistently get these kinds of queries to
 work? Assume that during the general use-case it can't be known a-priori
 how many groups there will be.

  Thanks,

  -Matt Cheah



Re: Serializable incompatible with Externalizable error

2013-12-03 Thread Aaron Davidson
This discussion seems to indicate the possibility of a mismatch between one
side being Serializable and the other being Externalizable:
https://forums.oracle.com/thread/2147644

In general, the semantics of Serializable can be pretty strange as it
doesn't really behave the same as usual interfaces. It may actually be
possible that if a superclass implements Serializable and you implement
Externalizable, things can screw up, for instance.
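
For reference, a minimal Externalizable sketch in Scala (class and fields are
made up; note that Externalizable requires a public no-arg constructor):

import java.io.{Externalizable, ObjectInput, ObjectOutput}

class DataRow(var id: Long, var name: String) extends Externalizable {
  def this() = this(0L, "")  // required no-arg constructor
  def writeExternal(out: ObjectOutput): Unit = {
    out.writeLong(id)
    out.writeUTF(name)
  }
  def readExternal(in: ObjectInput): Unit = {
    id = in.readLong()
    name = in.readUTF()
  }
}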


On Tue, Dec 3, 2013 at 10:48 AM, Matt Cheah mch...@palantir.com wrote:

  That can't be – Externalizable is a specific sub-interface of
 Serializable that allows custom serialization formats.

  Granted, I could accomplish the same thing by implementing the private
 readObject() and writeObject() methods in this class, and implement
 Serializable only. It just seems odd to me that I'd have to do so.
 Especially since the tuning guide suggests to use Externalizable:
 http://spark.incubator.apache.org/docs/latest/tuning.html

  -Matt Cheah

   From: Andrew Ash and...@andrewash.com
 Reply-To: user@spark.incubator.apache.org 
 user@spark.incubator.apache.org
 Date: Monday, December 2, 2013 10:45 PM
 To: user@spark.incubator.apache.org user@spark.incubator.apache.org
 Subject: Re: Serializable incompatible with Externalizable error

   At least from
 http://stackoverflow.com/questions/817853/what-is-the-difference-between-serializable-and-externalizable-in-java
 it looks like Externalizable is roughly an old-java version of
 Serializable.  Does that class implement both interfaces?  Can you take
 away the Externalizable interface if it's being used?


 On Mon, Dec 2, 2013 at 7:15 PM, Matt Cheah mch...@palantir.com wrote:

  Hi everyone,

  I'm running into a case where I'm creating a Java RDD of an
 Externalizable class, and getting this stack trace:

  java.io.InvalidClassException (java.io.InvalidClassException:
 com.palantir.finance.datatable.server.spark.WritableDataRow; Serializable
 incompatible with Externalizable)
 java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:634)
 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
 java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
 java.io.ObjectInputStream.readClass(ObjectInputStream.java:1483)
 ...some other Java stuff...
 java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:61)
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:153)
 I'm running on a spark cluster generated by the EC2 Scripts. This doesn't
 happen if I'm running things with local[N]. Any ideas?
 Thanks,
 -Matt Cheah





Re: spark-shell not working on standalone cluster (java.io.IOException: Cannot run program compute-classpath.sh)

2013-11-25 Thread Aaron Davidson
There is a pull request currently to fix this exact issue, I believe, at
https://github.com/apache/incubator-spark/pull/192. It's very small and
only touches the script files, so you could apply it to your current
version and distribute it to the workers. The fix here is that you add an
additional variable in spark-env that specifies the REMOTE_SPARK_HOME.


On Mon, Nov 25, 2013 at 3:40 AM, Grega Kešpret gr...@celtra.com wrote:

 It seems there is already an open ticket for this -
 https://spark-project.atlassian.net/browse/SPARK-905 , but for version
 0.7.3.

 Grega
 --
 *Grega Kešpret*
 Analytics engineer

 Celtra — Rich Media Mobile Advertising
 celtra.com (http://www.celtra.com/) | @celtramobile (http://www.twitter.com/celtramobile)


 On Mon, Nov 25, 2013 at 12:13 PM, Grega Kešpret gr...@celtra.com wrote:

 Sorry, forgot to mention, I run spark version v0.8.0-incubating from
 https://github.com/apache/incubator-spark.git.
 It seems to work when local Spark directory is also /opt/spark, so I
 think this confirms my doubt that SPARK_HOME somehow doesn't get passed to
 the Executor?

 Grega
 --
 *Grega Kešpret*
 Analytics engineer

 Celtra — Rich Media Mobile Advertising
 celtra.com (http://www.celtra.com/) | @celtramobile (http://www.twitter.com/celtramobile)




Re: oome from blockmanager

2013-11-22 Thread Aaron Davidson
Jerry, I need to correct what I said about the 100KB for
each FastBufferedOutputStream -- this is actually a Spark buffer, not a
compression buffer. The size can be configured using the
spark.shuffle.file.buffer.kb System property, and it defaults to 100. I
am still curious if you're using compression or seeing more than 48k
DiskBlockObjectWriters to account for the remaining memory used.
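
For example, shrinking that buffer is just a matter of setting the property
before the SparkContext is created (value in KB; a sketch only):

System.setProperty("spark.shuffle.file.buffer.kb", "25")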


On Fri, Nov 22, 2013 at 9:05 AM, Aaron Davidson ilike...@gmail.com wrote:

 Great, thanks for the feedback. It sounds like you're using the LZF
 compression scheme -- switching to Snappy should see significantly less
 buffer space used up per DiskBlockObjectWriter, but this doesn't really
 solve the underlying problem. In general I've been thinking of Spark
 nodes as having high memory and a moderate number of cores, but with 24
 cores and 40GB of memory, each core really doesn't get that much memory
 individually, despite every one needing its own set of
 DiskBlockObjectWriters.

 One thing that is a little odd is that with your numbers, you should have
 2000 (reducers) * 24 (cores) = 48k DiskBlockObjectWriters. These should
 only require a total of 4.8GB for the entire node, though, rather than 80%
 of your JVM memory. Were you seeing significantly more than 48k
 DiskBlockObjectWriters?


 On Fri, Nov 22, 2013 at 1:38 AM, Shao, Saisai saisai.s...@intel.com wrote:

  Hi Aaron,



 I’ve also met the same problem that shuffle takes so much overhead for
 large number of partitions. I think it is an important issue when
 processing large data.



 In my case I have 2000 mappers and 2000 reducers. I dumped the memory of an
 executor and found that byte arrays take about 80% of total JVM memory;
 these are referenced by FastBufferedOutputStream, and created by
 DiskBlockObjectWriter. It seems that there are so many instances of
 DiskBlockObjectWriter, and each DiskBlockObjectWriter has a 100KB buffer
 for FastBufferedOutputStream by default. These buffers persist
 through the task execution period and cannot be garbage collected until the
 task is finished.



 My cluster has 6 nodes, with 40G memory and 24 cores per node. I tried with
 5000 partitions, and this easily hits OOM.



 The dilemma is that my application needs a groupByKey transformation,
 which requires small partition sizes, but small partitions lead to
 a larger number of partitions, which also consumes lots of memory.



 Thanks

 Jerry



 *From:* Aaron Davidson [mailto:ilike...@gmail.com]
 *Sent:* Friday, November 22, 2013 2:54 PM
 *To:* user@spark.incubator.apache.org
 *Subject:* Re: oome from blockmanager



 Thanks for your feedback; I think this is a very important issue on the
 usability front. One thing to consider is that at some data size, one
 simply needs larger or more nodes. m1.large is essentially the smallest ec2
 instance size that can run a Spark job of any reasonable size. That's not
 an excuse for an OOM, really -- one should generally just see (heavily)
 degraded performance instead of actually failing the job. Additionally, the
 number of open files scales with the number of reducers in Spark, rather
 than, say, Map Reduce, where each mapper only writes to one file, at the
 cost of later sorting the entire thing. This unfortunately means that
 adding nodes isn't really a full solution in your case, since each one
 would try to have 36k compressed output streams open.



 The short term solutions have already been discussed: decrease the number
 of reducers (and mappers, if you need them to be tied) or potentially turn
 off compression if Snappy is holding too much buffer space. A third option
 would actually be to decrease the number of executors per node to 1, since
 that would double the available memory and roughly halve the usage. Clearly
 either of the latter two solutions will produce a significant slowdown,
 while the first should keep the same or better performance. While Spark is
 good at handling a large number of partitions, there is still some cost to
 schedule every task, as well as to store and forward the metadata for every
 shuffle block (which grows with R * M), so the ideal partition size is one
 that fits exactly into memory without OOMing -- although this is of course
 an unrealistic situation to aim for.



 The longer term solutions include algorithms which degrade gracefully
 instead of OOMing (although this would be a solution for too-large
 partitions instead of too-little, where the metadata and buffering becomes
 the issue) and to potentially adopt a more Map-Reducey style of shuffling
 where we would only need to write to 1 file per executor at a time, with
 some significant processing and disk bandwidth cost. I am currently
 investigating shuffle file performance, and thanks to your feedback here,
 I'll additionally investigate the memory overheads inherent in shuffling as
 well.





 On Thu, Nov 21, 2013 at 10:20 PM, Stephen Haberman 
 stephen.haber...@gmail.com wrote:


  More significant in shuffling data

Re: debugging a Spark error

2013-11-18 Thread Aaron Davidson
Have you looked at the Spark executor logs? They're usually located in the
$SPARK_HOME/work/ directory. If you're running in a cluster, they'll be on
the individual slave nodes. These should hopefully reveal more information.


On Mon, Nov 18, 2013 at 3:42 PM, Chris Grier gr...@icsi.berkeley.edu wrote:

 Hi,

 I'm trying to figure out what the problem is with a job that we are
 running on Spark 0.7.3. When we write out via saveAsTextFile we get an
 exception that doesn't reveal much:

 13/11/18 15:06:19 INFO cluster.TaskSetManager: Loss was due to
 java.io.IOException
 java.io.IOException: Map failed
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:849)
 at spark.storage.DiskStore.getBytes(DiskStore.scala:86)
 at spark.storage.DiskStore.getValues(DiskStore.scala:92)
 at spark.storage.BlockManager.getLocal(BlockManager.scala:284)
 at spark.storage.BlockFetcherIterator$$anonfun$
 13.apply(BlockManager.scala:1027)
 at spark.storage.BlockFetcherIterator$$anonfun$
 13.apply(BlockManager.scala:1026)
 at scala.collection.mutable.ResizableArray$class.foreach(
 ResizableArray.scala:60)
 at scala.collection.mutable.ArrayBuffer.foreach(
 ArrayBuffer.scala:47)
 at spark.storage.BlockFetcherIterator.init(
 BlockManager.scala:1026)
 at spark.storage.BlockManager.getMultiple(BlockManager.scala:478)
 at spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.
 scala:51)
 at spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.
 scala:10)
 at spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(
 CoGroupedRDD.scala:127)
 at spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(
 CoGroupedRDD.scala:115)
 at scala.collection.IndexedSeqOptimized$class.
 foreach(IndexedSeqOptimized.scala:34)
 at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:38)
 at spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:115)
 at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
 at spark.RDD.iterator(RDD.scala:196)
 at spark.MappedValuesRDD.compute(PairRDDFunctions.scala:704)
 at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
 at spark.RDD.iterator(RDD.scala:196)
 at spark.FlatMappedValuesRDD.compute(PairRDDFunctions.scala:714)
 at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
 at spark.RDD.iterator(RDD.scala:196)
 at spark.rdd.MappedRDD.compute(MappedRDD.scala:12)
 at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
 at spark.RDD.iterator(RDD.scala:196)
 at spark.rdd.MappedRDD.compute(MappedRDD.scala:12)
 at spark.RDD.computeOrReadCheckpoint(RDD.scala:207)
 at spark.RDD.iterator(RDD.scala:196)
 at spark.scheduler.ResultTask.run(ResultTask.scala:77)
 at spark.executor.Executor$TaskRunner.run(Executor.scala:100)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(
 ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(
 ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:724)

 Any ideas?

 -Chris



Re: EC2 node submit jobs to separate Spark Cluster

2013-11-18 Thread Aaron Davidson
The main issue with running a spark-shell locally is that it orchestrates
the actual computation, so you want it to be close to the actual Worker
nodes for latency reasons. Running a spark-shell on EC2 in the same region
as the Spark cluster avoids this problem.

The error you're seeing seems to indicate a different issue. Check the
Master web UI (accessible on port 8080 at the master's IP address) to make
sure that Workers are successfully registered and they have the expected
amount of memory available to Spark. You can also check to see how much
memory your spark-shell is trying to get per executor. A couple common
problems are (1) an abandoned spark-shell is holding onto all of your
cluster's resources or (2) you've manually configured your spark-shell to
try to get more memory than your Workers have available. Both of these
should be visible in the web UI.


On Mon, Nov 18, 2013 at 5:00 PM, Matt Cheah mch...@palantir.com wrote:

  Hi,

  I'm working with an infrastructure that already has its own web server
 set up on EC2. I would like to set up a *separate* spark cluster on EC2
 with the scripts and have the web server submit jobs to this spark cluster.

  Is it possible to do this? I'm getting some errors running the spark
 shell from the spark shell on the web server: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient memory. I have heard that it's not possible for any
 local computer to connect to the spark cluster, but I was wondering if
 other EC2 nodes could have their firewalls configured to allow this.

  We don't want to deploy the web server on the master node of the spark
 cluster.

  Thanks,

  -Matt Cheah





Re: foreachPartition in Java

2013-11-17 Thread Aaron Davidson
Also, in general, you can workaround shortcomings in the Java API by
converting to a Scala RDD (using JavaRDD's rdd() method). The API tends to
be much clunkier since you have to jump through some hoops to talk to a
Scala API in Java, though. In this case, JavaRDD's mapPartition() method
will likely be the cleanest solution as Patrick said.


On Sun, Nov 17, 2013 at 5:03 PM, Patrick Wendell pwend...@gmail.com wrote:

 Can you just call mapPartitions and ignore the result?

 - Patrick

 On Sun, Nov 17, 2013 at 4:45 PM, Yadid Ayzenberg ya...@media.mit.edu
 wrote:
  Hi,
 
  According to the API, foreachPartition() is not yet implemented in Java.
  Are there any workarounds to get the same functionality ?
  I have a non serializable DB connection and instantiating it is pretty
  expensive, so I prefer to do it on a per partition basis.
 
  thanks,
  Yadid
 



Re: number of splits for standalone cluster mode

2013-11-17 Thread Aaron Davidson
The number of splits can be configured when reading the file, as an
argument to textFile(), sequenceFile(), etc. (see the docs:
http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.SparkContext@textFile(String,Int):RDD[String]).
Note that this is a minimum, however, as certain input sources may not
allow partitions larger than a certain size (e.g., reading from HDFS may
force partitions to be at most ~130 MB [depending on HDFS block size]).

If you wish to have fewer partitions than the minimum your input source
allows, you can use the RDD.coalesce() method
(http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.RDD)
to locally combine partitions.
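
A short sketch of both knobs in the spark-shell (the path is a placeholder):

val fine = sc.textFile("hdfs://namenode/data/input.txt", 64)  // request at least 64 splits
val coarse = fine.coalesce(8)                                 // locally merge down to 8 partitions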


On Sun, Nov 17, 2013 at 1:33 PM, Umar Javed umarj.ja...@gmail.com wrote:

 Hi,

 When running Spark in the standalone cluster node, is there a way to
 configure the number of splits for the input file(s)? It seems like it is
 approximately 32 MB for every core be default. Is that correct? For example
 in my cluster there are two workers, each running on a machine with two
 cores. For an input file of size 500MB, Spark schedules 16 tasks for the
 initial map (500/32 ~ 16)

 thanks!
 Umar



Re: failure notice

2013-11-17 Thread Aaron Davidson
Great, thanks for the update! Your ant version matches mine, though I can't
reproduce the error. Weird.

I have created SPARK-959 (https://spark-project.atlassian.net/browse/SPARK-959) to
track this issue and PR #183 (https://github.com/apache/incubator-spark/pull/183)
to hopefully
solve the issue in the future.



On Sun, Nov 17, 2013 at 10:08 PM, Egon Kidmose kidm...@gmail.com wrote:

 I applied the first approach in $SPARK_HOME/project/SparkBuild.scala
 (.scala, not .sbt, as the latter wasn't present).
 This solved the issue.

 Thanks for your help!

  ~/Downloads/spark-0.8.0-incubating $ ant -v
 Apache Ant(TM) version 1.8.2 compiled on May 18 2012
 Mvh/BR
 Egon Kidmose


 On Sun, Nov 17, 2013 at 7:40 PM, Aaron Davidson ilike...@gmail.com
 wrote:
  Could you report your ant/Ivy version? Just run
  ant -version
 
  The fundamental problem is that Ivy is stupidly thinking .orbit is the
  file extension when it should be .jar. There are two possible fixes you
  can try, and please let us know if one or the other works. In
  $SPARK_HOME/project/SparkBuild.sbt, find the line that contains the text
  org.eclipse.jetty and add this line directly after it:
  "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518"
  artifacts Artifact("javax.servlet", "jar", "jar"),
  Hopefully this will force Ivy to do the right thing.
 
  If not, you can try the solution from the mailing list in the same file,
 by
  adding the lines:
 
   ivyXML :=
     <dependency org="org.eclipse.jetty.orbit" name="javax.servlet"
       rev="2.5.0.v201103041518">
       <artifact name="javax.servlet" type="orbit" ext="jar"/>
     </dependency>
 
  This solution ideally just does the same thing, albeit in a slightly less
  SBT-ish way.
 
 
 
  On Sun, Nov 17, 2013 at 6:44 PM, Egon Kidmose kidm...@gmail.com wrote:
 
  Hi All,
 
  I'm trying to get started on using spark, but following the quick
  start guide I encounter an error.
 
  As per documentation I run sbt/sbt assembly from the root folder,
  but I get an error for downloading javax.servlet.orbit. (see below)
  It's the same result for 0.8.0 and what's on github.
  It seems similar to the issue earlier discussed here:
 
 
 http://mail-archives.apache.org/mod_mbox/spark-user/201309.mbox/%3ccajbo4nexyzqe6zgreqjtzzz5zrcoavfen+wmbyced6n1epf...@mail.gmail.com%3E
  I don't understand how to apply this to my problem. (Which *.sbt
  should the line be entered into?)
 
  Any advice on how to solve this?
  Thanks!
 
  Mvh/BR
  Egon Kidmose
 
 
 
   ~/Downloads/spark-0.8.0-incubating $ uname -a
  Linux ekidmose-laptop-linux 3.8.0-19-generic #30-Ubuntu SMP Wed May 1
  16:35:23 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux
   ~/Downloads/spark-0.8.0-incubating $ javac -version
  javac 1.7.0_25
   ~/Downloads/spark-0.8.0-incubating $ sbt/sbt assembly
  [info] Loading project definition from
  /home/ekidmose/Downloads/spark-0.8.0-incubating/project/project
  [info] Updating
 
 
 {file:/home/ekidmose/Downloads/spark-0.8.0-incubating/project/project/}default-8557a6...
  [info] Resolving org.scala-sbt#precompiled-2_10_1;0.12.4 ...
  [info] Done updating.
  [info] Compiling 1 Scala source to
 
 
 /home/ekidmose/Downloads/spark-0.8.0-incubating/project/project/target/scala-2.9.2/sbt-0.12/classes...
  [info] Loading project definition from
  /home/ekidmose/Downloads/spark-0.8.0-incubating/project
  [info] Updating
 
 {file:/home/ekidmose/Downloads/spark-0.8.0-incubating/project/}plugins...
  [info] Resolving org.scala-sbt#precompiled-2_10_1;0.12.4 ...
  [info] Done updating.
  [info] Compiling 1 Scala source to
 
 
 /home/ekidmose/Downloads/spark-0.8.0-incubating/project/target/scala-2.9.2/sbt-0.12/classes...
  [info] Set current project to root (in build
  file:/home/ekidmose/Downloads/spark-0.8.0-incubating/)
  [info] Updating
  {file:/home/ekidmose/Downloads/spark-0.8.0-incubating/}core...
  [info] Resolving org.apache.derby#derby;10.4.2.0 ...
  [warn] [NOT FOUND  ]
 
 
 org.eclipse.jetty.orbit#javax.servlet;2.5.0.v201103041518!javax.servlet.orbit
  (77ms)
  [warn]  public: tried
  [warn]
 
 http://repo1.maven.org/maven2/org/eclipse/jetty/orbit/javax.servlet/2.5.0.v201103041518/javax.servlet-2.5.0.v201103041518.orbit
  [warn] ::
  [warn] ::  FAILED DOWNLOADS::
  [warn] :: ^ see resolution messages for details  ^ ::
  [warn] ::
  [warn] ::
 
 org.eclipse.jetty.orbit#javax.servlet;2.5.0.v201103041518!javax.servlet.orbit
  [warn] ::
  sbt.ResolveException: download failed:
 
 
 org.eclipse.jetty.orbit#javax.servlet;2.5.0.v201103041518!javax.servlet.orbit
  at sbt.IvyActions$.sbt$IvyActions$$resolve(IvyActions.scala:214)
  at sbt.IvyActions$$anonfun$update$1.apply(IvyActions.scala:122)
  at sbt.IvyActions$$anonfun$update$1.apply(IvyActions.scala:121)
  at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:117)
  at sbt.IvySbt$Module

Re: Memory configuration in local mode

2013-11-16 Thread Aaron Davidson
I was under the impression that he was using the same JVM for Spark and
other stuff, and wanted to limit how much of it Spark could use. Patrick's
solution is of course the right way to go if that's not the case.


On Sat, Nov 16, 2013 at 9:40 AM, Patrick Wendell pwend...@gmail.com wrote:

 If you are using local mode, you can just pass -Xmx32g to the JVM that
 is launching spark and it will have that much memory.

 On Fri, Nov 15, 2013 at 6:30 PM, Aaron Davidson ilike...@gmail.com
 wrote:
  One possible workaround would be to use the local-cluster Spark mode.
 This
  is normally used only for testing, but it will actually spawn a separate
  process for the executor. The format is:
   new SparkContext("local-cluster[1,4,32000]")
  This will spawn 1 Executor that is allocated 4 cores and 32GB
 (approximated
  as 32k MB). Since this is a separate process with its own JVM, you'd
  probably want to just change your original JVM's memory to 32 GB.
 
  Note that since local-cluster mode more closely simulates a cluster, it's
  possible that certain issues like dependency problems may arise that
 don't
  appear when using local mode.
 
 
  On Fri, Nov 15, 2013 at 11:43 AM, Alex Boisvert alex.boisv...@gmail.com
 
  wrote:
 
  When starting a local-mode Spark instance, e.g., new
  SparkContext(local[4]), what memory configuration options are
  available/considered to limit Spark's memory usage?
 
  For instance, if I have a JVM with 64GB and would like to reserve/limit
  Spark to using only 32GB of the heap.
 
  thanks!
 
 



Re: Memory configuration in local mode

2013-11-15 Thread Aaron Davidson
One possible workaround would be to use the local-cluster Spark mode. This
is normally used only for testing, but it will actually spawn a separate
process for the executor. The format is:
new SparkContext("local-cluster[1,4,32000]")
This will spawn 1 Executor that is allocated 4 cores and 32GB (approximated
as 32k MB). Since this is a separate process with its own JVM, you'd
probably want to just change your original JVM's memory to 32 GB.

Note that since local-cluster mode more closely simulates a cluster, it's
possible that certain issues like dependency problems may arise that don't
appear when using local mode.
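
Concretely, the idea is something like this (the app name and numbers are
just illustrative, a sketch only):

  // 1 executor process, 4 cores, and roughly 32 GB (32000 MB) for that executor's JVM
  val sc = new SparkContext("local-cluster[1,4,32000]", "MemoryTest")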


On Fri, Nov 15, 2013 at 11:43 AM, Alex Boisvert alex.boisv...@gmail.com wrote:

 When starting a local-mode Spark instance, e.g., new
 SparkContext(local[4]), what memory configuration options are
 available/considered to limit Spark's memory usage?

 For instance, if I have a JVM with 64GB and would like to reserve/limit
 Spark to using only 32GB of the heap.

 thanks!




Re: mapping of shuffle outputs to reduce tasks

2013-11-10 Thread Aaron Davidson
It is responsible for a subset of shuffle blocks. MapTasks split up their
data, creating one shuffle block for every reducer. During the shuffle
phase, the reducer will fetch all shuffle blocks that were intended for it.
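
As a rough sketch of the bucketing half of this (illustrative only, not the
BlockManager internals): the partitioner decides which reducer-numbered
bucket, i.e. which shuffle block, each key is written to by every map task.

  import org.apache.spark.HashPartitioner

  val partitioner = new HashPartitioner(4)          // pretend there are 4 reduce tasks
  val bucket = partitioner.getPartition("someKey")  // 0..3: which shuffle block this key is written to
  // reduce task number `bucket` later fetches that block from every map task's output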


On Sun, Nov 10, 2013 at 9:38 PM, Umar Javed umarj.ja...@gmail.com wrote:

 I was wondering how does the scheduler assign the ShuffledRDD locations to
 the reduce tasks? Say that you have 4 reduce tasks, and a number of shuffle
 blocks across two machines. Is each reduce task responsible for a subset of
 individual keys or a subset of shuffle blocks?

 Umar



Re: oome from blockmanager

2013-11-05 Thread Aaron Davidson
As a followup on this, the memory footprint of all shuffle metadata has
been  greatly reduced. For your original workload with 7k mappers, 7k
reducers, and 5 machines, the total metadata size should have decreased
from ~3.3 GB to ~80 MB.


On Tue, Oct 29, 2013 at 9:07 AM, Aaron Davidson ilike...@gmail.com wrote:

 Great! Glad to hear it worked out. Spark definitely has a pain point about
 deciding the right number of partitions, and I think we're going to be
 spending a lot of time trying to reduce that issue.

 Currently working on the patch to reduce the shuffle file block overheads,
 but in the meantime, you can set spark.shuffle.consolidateFiles=false to
 exchange OOMEs due to too many partitions for worse performance (probably
 an acceptable tradeoff).



 On Mon, Oct 28, 2013 at 2:31 PM, Stephen Haberman 
 stephen.haber...@gmail.com wrote:

 Hey guys,

 As a follow up, I raised our target partition size to 600mb (up from
 64mb), which split this report's 500gb of tiny S3 files into ~700
 partitions, and everything ran much smoother.

 In retrospect, this was the same issue we'd ran into before, having too
 many partitions, and had previously solved by throwing some guesses at
 coalesce to make it magically go away.

 But now I feel like we have a much better understanding of why the
 numbers need to be what they are, which is great.

 So, thanks for all the input and helping me understand what's going on.

 It'd be great to see some of the optimizations to BlockManager happen,
 but I understand in the end why it needs to track what it does. And I
 was also admittedly using a small cluster anyway.

 - Stephen





Re: Executor could not connect to Driver?

2013-11-01 Thread Aaron Davidson
I've seen this happen before due to the driver doing long GCs when the
driver machine was heavily memory-constrained. For this particular issue,
simply freeing up memory used by other applications fixed the problem.


On Fri, Nov 1, 2013 at 12:14 AM, Liu, Raymond raymond@intel.com wrote:

 Hi

 I am encountering an issue where the executor actor could not connect to the
 Driver actor, but I could not figure out the reason.

 Say the Driver actor is listening on :35838

 root@sr434:~# netstat -lpv
 Active Internet connections (only servers)
 Proto Recv-Q Send-Q Local Address   Foreign Address State
   PID/Program name
 tcp0  0 *:50075 *:* LISTEN
  18242/java
 tcp0  0 *:50020 *:* LISTEN
  18242/java
 tcp0  0 *:ssh   *:* LISTEN
  1325/sshd
 tcp0  0 *:50010 *:* LISTEN
  18242/java
 tcp6   0  0 sr434:35838 [::]:*  LISTEN
  9420/java
 tcp6   0  0 [::]:40390  [::]:*  LISTEN
  9420/java
 tcp6   0  0 [::]:4040   [::]:*  LISTEN
  9420/java
 tcp6   0  0 [::]:8040   [::]:*  LISTEN
  28324/java
 tcp6   0  0 [::]:60712  [::]:*  LISTEN
  28324/java
 tcp6   0  0 [::]:8042   [::]:*  LISTEN
  28324/java
 tcp6   0  0 [::]:34028  [::]:*  LISTEN
  9420/java
 tcp6   0  0 [::]:ssh[::]:*  LISTEN
  1325/sshd
 tcp6   0  0 [::]:45528  [::]:*  LISTEN
  9420/java
 tcp6   0  0 [::]:13562  [::]:*  LISTEN
  28324/java


 while the executor reports errors as below:

 13/11/01 13:16:43 INFO executor.CoarseGrainedExecutorBackend: Connecting
 to driver: akka://spark@sr434:35838/user/CoarseGrainedScheduler
 13/11/01 13:16:43 ERROR executor.CoarseGrainedExecutorBackend: Driver
 terminated or disconnected! Shutting down.

 Any idea?

 Best Regards,
 Raymond Liu



Re: repartitioning RDDS

2013-10-31 Thread Aaron Davidson
Stephen is exactly correct, I just wanted to point out that in Spark 0.8.1
and above, the repartition function has been added to be a clearer way to
accomplish what you want. (Coalescing into a larger number of partitions
doesn't make much linguistic sense.)
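
In code, that looks roughly like this (assuming an existing rdd; a sketch):

  val finer = rdd.coalesce(1000, shuffle = true)  // hash-shuffles the data into more, smaller partitions
  val same  = rdd.repartition(1000)               // 0.8.1+: equivalent, with a clearer name
  val fewer = rdd.coalesce(10)                    // default shuffle = false: can only merge into fewer partitions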


On Thu, Oct 31, 2013 at 7:48 AM, Stephen Haberman 
stephen.haber...@gmail.com wrote:


  Is it possible to repartition RDDs other than by the coalesce method.
  I am primarily interested in making finer grained partitioning or
  rebalancing an unbalanced parttioning, without coalescing.

 I believe if you use the shuffle=true parameter, coalesce will do what
 you want, and essentially becomes a general repartition method.

 Specifically, yes: shuffle=false can only make larger partitions, but with
 shuffle=true you can break your partitions up into many smaller partitions,
 with the content based on a hash partitioner.

 I believe that's what you're asking for?

 - Stephen





Re: Spark cluster memory configuration for spark-shell

2013-10-29 Thread Aaron Davidson
You are correct. If you are just using spark-shell in local mode (i.e.,
without cluster), you can set the SPARK_MEM environment variable to give
the driver more memory. E.g.:
SPARK_MEM=24g ./spark-shell

Otherwise, if you're using a real cluster, the driver shouldn't require a
significant amount of memory, so SPARK_MEM should not have to be used.
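
In 0.8, if I recall correctly, a standalone application (as opposed to
spark-shell) sets the executor memory as a system property before creating its
SparkContext; roughly (master URL and size made up):

  System.setProperty("spark.executor.memory", "32g")         // must happen before the SparkContext is created
  val sc = new SparkContext("spark://master:7077", "MyApp")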


On Tue, Oct 29, 2013 at 12:40 PM, Soumya Simanta
soumya.sima...@gmail.com wrote:

 I'm new to Spark. I want to try out a few simple example from the Spark
 shell. However, I'm not sure how to configure it so that I can make the
 max. use of memory on my workers.

 On average I've around 48 GB of RAM on each node on my cluster. I've
 around 10 nodes.

 Based on the documentation I could find memory based configuration in two
 places.

 1. $SPARK_INSTALL_DIR/dist/conf/spark-env.sh

 SPARK_WORKER_MEMORY: Total amount of memory to allow Spark applications
 to use on the machine, e.g. 1000m, 2g (default: total memory minus 1 GB);
 note that each application's individual memory is configured using its
 spark.executor.memory property.

 2. The spark.executor.memory JVM property:

 spark.executor.memory (default: 512m): Amount of memory to use per executor
 process, in the same format as JVM memory strings (e.g. 512m, 2g).

 http://spark.incubator.apache.org/docs/latest/configuration.html#system-properties

 In my case I want to use the max. memory possible on each node. My
  understanding is that I don't have to change SPARK_WORKER_MEMORY and I
 will have to increase spark.executor.memory to something big (e.g., 24g or
 32g). Is this correct? If yes, what is the correct way of setting this
 property if I just want to use the spark-shell.


 Thanks.
 -Soumya




Re: gc/oome from 14,000 DiskBlockObjectWriters

2013-10-25 Thread Aaron Davidson
Snappy sounds like it'd be a better solution here. LZF requires a pretty
sizeable buffer per stream (accounting for the 300k you're seeing). It
looks like you have 7000 reducers, and each one requires an LZF-compressed
stream. Snappy has a much lower overhead per stream, so I'd give it a try.
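
If I remember right, the codec is switchable with a system property in 0.8; a
rough sketch:

  // set before the SparkContext is created; the default codec is LZF
  System.setProperty("spark.io.compression.codec",
    "org.apache.spark.io.SnappyCompressionCodec")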

Thanks, by the way, for the very detailed problem description and trying it
on Spark 0.8! Hopefully we can get this issue resolved.


On Fri, Oct 25, 2013 at 8:30 AM, Stephen Haberman 
stephen.haber...@gmail.com wrote:

  We're moving from Spark 0.7.x to Spark 0.8 (or master actually, to get
  the latest shuffle improvement)

 FWIW I went back from Spark master to Spark 0.8 and am seeing the same
 behavior: OOMEs because of ~14,000 DiskBlockObjectWriters with 300k of
 memory each.

 So, it is not a change from the recent shuffle patch.

 - Stephen




Re: Broken link in quickstart

2013-10-22 Thread Aaron Davidson
Thanks for the heads up! I have submitted a pull request to fix it
(#98, https://github.com/apache/incubator-spark/pull/98),
so it should be corrected soon. In the meantime, if anyone is curious, the
real link should be
http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.RDD
.


On Tue, Oct 22, 2013 at 3:21 AM, Sebastian Schelter s...@apache.org wrote:

 No, the link to

 https://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.RDD
 gives a 404.


 On 22.10.2013 12:19, Jesvin Jose wrote:
   Do you mean the error that the SSL certificates are valid only for
   *.apache.org?
 
 
  On Tue, Oct 22, 2013 at 12:59 PM, Sebastian Schelter s...@apache.org
 wrote:
 
  The link to the RDD API doc on
 
 
 https://spark.incubator.apache.org/docs/latest/scala-programming-guide.html#transformations
  points to a non-existing page.
 
  Best,
  Sebastian
 
 
 
 




Re: Help with Initial Cluster Configuration / Tuning

2013-10-22 Thread Aaron Davidson
On the other hand, I totally agree that memory usage in Spark is rather
opaque, and is one area where we could do a lot better in terms of
communicating issues, through both docs and instrumentation. At least with
serialization and such, you can get meaningful exceptions (hopefully), but
OOMs are just a blanket "something wasn't right somewhere". Debugging them
empirically would require deep diving into Spark's heap allocations, which
requires a lot more knowledge of Spark internals than should be required
for general usage.


On Tue, Oct 22, 2013 at 9:22 AM, Mark Hamstra m...@clearstorydata.com wrote:

 Yes, but that also illustrates the problem faced by anyone trying to write
 a little white paper or guide lines to make newbies' experience painless.
  Distributed computing clusters are necessarily complex things, and
 problems can crop up in multiple locations, layers or subsystems.  It's
 just not feasible to quickly bring up to speed someone with no experience
 in distributed programming and cluster systems.  It takes a lot of
 knowledge, both broad and deep.  Very few people have the complete scope of
 knowledge and experience required, so creating, debugging and maintaining a
 cluster computing application almost always has to be a team effort.

 Support organizations and communities can replace some of the need for a
 knowledgeable and well-functioning team, but not all of it; and at some
 point you have to expect that debugging is going to take a considerable
 amount of painstaking, systematic effort -- including a close reading of
 the available docs.

 Several people are working on making more and better reference and
 training material available, and some of that will include trouble-shooting
 guidance, but that doesn't mean that there can ever be one little paper
 to solve newbies' (or more experienced developers') problems or provide
 adequate guidance.  There's just too much to cover and too many different
 kinds or levels of initial-user knowledge to make that completely feasible.



 On Tue, Oct 22, 2013 at 8:50 AM, Shay Seng s...@1618labs.com wrote:

 Hey Mark, I didn't mean to say that the information isn't out there --
 just that when something goes wrong with spark, the scope of what could be
 wrong is so large - some bad setting with JVM, serializer, akka, badly
 written scala code, algorithm wrong, check worker logs, check executor
 stderrs, 

 When I looked at this post this morning, my initial thought wasn't that
 countByValue would be at fault. ...probably since I've only been using
 Scala/Spark for a month or so.

 It was just a suggestion to help newbies come up to speed more quickly
 and gain insights into how to debug issues.


  On Tue, Oct 22, 2013 at 8:14 AM, Mark Hamstra m...@clearstorydata.com wrote:

 There's no need to guess at that.  The docs tell you directly:

 def countByValue(): Map[T, Long]

 Return the count of each unique value in this RDD as a map of (value,
 count) pairs. The final combine step happens locally on the master,
 equivalent to running a single reduce task.



 On Tue, Oct 22, 2013 at 7:22 AM, Shay Seng s...@1618labs.com wrote:

 Hi Matei,

 I've seen several memory tuning queries on this mailing list, and also
 heard the same kinds of queries at the spark meetup. In fact the last
 bullet point in Josh Carver's(?) slides, the guy from Bizo, was "memory
 tuning is still a mystery".

 I certainly had lots of issues in when I first started. From memory
 issues to gc issues, things seem to run fine until you try something with
 500GB of data etc.

 I was wondering if you could write up a little white paper or some
 guide lines on how to set memory values, and what to look at when something
  goes wrong? E.g., I would never have guessed that countByValue happens on a
 single machine etc.
  On Oct 21, 2013 6:18 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 Hi there,

 The problem is that countByValue happens in only a single reduce task
 -- this is probably something we should fix but it's basically not 
 designed
 for lots of values. Instead, do the count in parallel as follows:

  val counts = mapped.map(str => (str, 1)).reduceByKey((a, b) => a + b)

 If this still has trouble, you can also increase the level of
 parallelism of reduceByKey by passing it a second parameter for the number
 of tasks (e.g. 100).

 BTW one other small thing with your code, flatMap should actually work
 fine if your function returns an Iterator to Traversable, so there's no
 need to call toList and return a Seq in ngrams; you can just return an
 Iterator[String].

 Matei

 On Oct 21, 2013, at 1:05 PM, Timothy Perrigo tperr...@gmail.com
 wrote:

  Hi everyone,
  I am very new to Spark, so as a learning exercise I've set up a
 small cluster consisting of 4 EC2 m1.large instances (1 master, 3 slaves),
 which I'm hoping to use to calculate ngram frequencies from text files of
 various sizes (I'm not doing anything with them; I just thought this would
 be slightly more interesting 

Re: Spark REPL produces error on a piece of scala code that works in pure Scala REPL

2013-10-12 Thread Aaron Davidson
Out of curiosity, does the Scala 2.10 Spark interpreter patch
fix this using macros as Matei suggests in the linked discussion? Or is
that still future work, but now possible?


On Fri, Oct 11, 2013 at 6:04 PM, Reynold Xin r...@apache.org wrote:

 This is a known problem and has to do with a peculiarity of the Scala shell:


 https://groups.google.com/forum/#!searchin/spark-users/error$3A$20type$20mismatch|sort:relevance/spark-users/bwAmbUgxWrA/HwP4Nv4adfEJ


  On Fri, Oct 11, 2013 at 6:01 PM, Aaron Davidson ilike...@gmail.com wrote:

 Playing around with this a little more, it seems that classOf[Animal] is
 this.Animal in Spark and Animal in normal Scala.

 Also, trying to do something like this:
 class Zoo[A <: this.Animal](thing: A) { }

 works in Scala but throws a weird error in Spark:
 error: type Animal is not a member of this.$iwC


 On Fri, Oct 11, 2013 at 4:55 PM, Shay Seng s...@1618labs.com wrote:

 Hey,
  I'm seeing a funny situation where a piece of code executes in a pure
 Scala REPL but not in a Spark-shell.
 I'm using Scala 2.9.3 with Spark 0.8.0

 In Spark I see:
 class Animal() {
 def says():String = ???
 }

 val amimal = new Animal
 amimal: this.Animal = Animal@df27cd5

  class Zoo[A <: Animal](thing: A) {
 def whoami()=thing.getClass
 def chat()=thing.says
 }

 val z = new Zoo[Animal](amimal)
  <console>:16: error: type mismatch;
  found   : this.Animal
  required: this.Animal
val z = new Zoo[Animal](amimal)
  ^

 But if I run the exact code in the scala REPL:

 val z = new Zoo[Animal](amimal)
 z: Zoo[Animal] = Zoo@738ff53f


  Both REPLs report using Scala 2.9.3
 Spark: Using Scala version 2.9.3 (Java HotSpot(TM) 64-Bit Server VM,
 Java 1.7.0_40)
 Scala: Welcome to Scala version 2.9.3 (Java HotSpot(TM) 64-Bit Server
 VM, Java 1.7.0_40).
 Any ideas?

 tks,
 Shay






Re: Spark REPL produces error on a piece of scala code that works in pure Scala REPL

2013-10-11 Thread Aaron Davidson
Playing around with this a little more, it seems that classOf[Animal] is
this.Animal in Spark and Animal in normal Scala.

Also, trying to do something like this:
class Zoo[A <: this.Animal](thing: A) { }

works in Scala but throws a weird error in Spark:
error: type Animal is not a member of this.$iwC


On Fri, Oct 11, 2013 at 4:55 PM, Shay Seng s...@1618labs.com wrote:

 Hey,
 I'm seeing a funny situation where a piece of code executes in a pure Scala
 REPL but not in a Spark-shell.
 I'm using Scala 2.9.3 with Spark 0.8.0

 In Spark I see:
 class Animal() {
 def says():String = ???
 }

 val amimal = new Animal
 amimal: this.Animal = Animal@df27cd5

 class Zoo[A <: Animal](thing: A) {
 def whoami()=thing.getClass
 def chat()=thing.says
 }

 val z = new Zoo[Animal](amimal)
 <console>:16: error: type mismatch;
  found   : this.Animal
  required: this.Animal
val z = new Zoo[Animal](amimal)
  ^

 But if I run the exact code in the scala REPL:

 val z = new Zoo[Animal](amimal)
 z: Zoo[Animal] = Zoo@738ff53f


 Both REPLs report using Scala 2.9.3
 Spark: Using Scala version 2.9.3 (Java HotSpot(TM) 64-Bit Server VM, Java
 1.7.0_40)
 Scala: Welcome to Scala version 2.9.3 (Java HotSpot(TM) 64-Bit Server VM,
 Java 1.7.0_40).
 Any ideas?

 tks,
 Shay



Re: spark_ec2 script in 0.8.0 and mesos

2013-10-08 Thread Aaron Davidson
Also, please post feature requests here: http://spark-project.atlassian.net
Make sure to search prior to posting to avoid duplicates.


On Tue, Oct 8, 2013 at 11:50 AM, Matei Zaharia matei.zaha...@gmail.com wrote:

 Hi Shay,

 We actually don't support Mesos in the EC2 scripts anymore -- sorry about
 that. If you want to deploy Mesos on EC2, I'd recommend looking at Mesos's
 own EC2 scripts. Then it's fairly easy to launch Spark on there. If you
 want to deploy Mesos locally you can go through the Spark docs for that.

 Matei

 On Oct 8, 2013, at 10:15 AM, Shay Seng s...@1618labs.com wrote:


 Hi,
 The http://spark.incubator.apache.org/docs/latest/running-on-mesos.html;
 running on mesos web page says that the spark_ec2 script provides and easy
 way to launch spark cluster with mesos configured.

 In 0.7.3, the spark_ec2 script had a --cluster-type option, but that seems
 to be taken out of the 0.8.0 version...

 Any tips on how I can configure and run spark on mesos?

 Also, I have a couple of suggestions for the spark_ec2 scripts; is there
 somewhere I can post feature requests?

 tks,
 Shay





Re: spark through vpn, SPARK_LOCAL_IP

2013-10-05 Thread Aaron Davidson
You might try also setting spark.driver.host to the correct IP in the
conf/spark-env.sh SPARK_JAVA_OPTS as well.

e.g.,
-Dspark.driver.host=192.168.250.47
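
Equivalently, if it's easier, the same property can be set from the driver
program itself before the SparkContext is created (a sketch; the address is
just the routable one from this thread, and the master URL is made up):

  System.setProperty("spark.driver.host", "192.168.250.47")   // the VPN-routable address
  val sc = new SparkContext("spark://master:7077", "MyApp")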



On Sat, Oct 5, 2013 at 2:45 PM, Aaron Babcock aaron.babc...@gmail.com wrote:

 Hello,

 I am using spark through a vpn. My driver machine ends up with two ip
 addresses, one routable from the cluster and one not.

 Things generally work when I set the SPARK_LOCAL_IP environment
 variable to the proper ip address.

 However, when I try to use the take function, i.e. myRdd.take(1), I run
 into a hiccup. From the logfiles on the workers I can see that they are
 trying to connect to the non-routable ip address; they are not
 respecting SPARK_LOCAL_IP somehow.

 Here is the relevant worker log snippet, 192.168.250.47 is the correct
 routable ip address of the driver, 192.168.0.7 is the incorrect
 address of the driver. Any thoughts about what else I need to
 configure?

 13/10/05 16:17:36 INFO ConnectionManager: Accepted connection from
 [192.168.250.47/192.168.250.47]
 13/10/05 16:18:41 WARN SendingConnection: Error finishing connection
 to /192.168.0.7:60513
 java.net.ConnectException: Connection timed out
 at sun.nio.ch.SocketChannelImpl.$$YJP$$checkConnect(Native Method)
 at sun.nio.ch.SocketChannelImpl.checkConnect(SocketChannelImpl.java)
 at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)
 at spark.network.SendingConnection.finishConnect(Connection.scala:221)
 at
 spark.network.ConnectionManager.spark$network$ConnectionManager$$run(ConnectionManager.scala:127)
 at spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:70)
 13/10/05 16:18:41 INFO ConnectionManager: Handling connection error on
 connection to ConnectionManagerId(192.168.0.7,60513)
 13/10/05 16:18:41 INFO ConnectionManager: Removing SendingConnection
 to ConnectionManagerId(192.168.0.7,60513)