Re: Hanging tasks in spark 1.2.1 while working with 1.1.1

2015-03-17 Thread Dmitriy Lyubimov
FWIW, I observed similar behavior in a similar situation. I was able to work
around it by forcibly caching one of the rdds right before the union and
forcing materialization by executing take(1) (a sketch follows below). Nothing
else ever helped.

Seems like yet-undiscovered 1.2.x thing.
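For reference, a minimal, self-contained sketch of that workaround (synthetic
data and hypothetical names, not the original job):

import org.apache.spark.{SparkConf, SparkContext}

object UnionCacheWorkaround {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("union-cache-workaround").setMaster("local[2]"))

    // Stand-ins for the real inputs: a big flatMapped RDD and a small one.
    val a = sc.parallelize(1 to 1000000, 64).map(i => (i % 1000, 1))
    val b = sc.parallelize(1 to 10000, 32).map(i => (i % 1000, 1))

    // Workaround: pin one side in the cache and force materialization with take(1)
    // *before* the union, so the downstream shuffle reads from cached blocks.
    val aCached = a.cache()
    aCached.take(1)

    val result = aCached.union(b).reduceByKey(_ + _, 32)
    println(result.count())
    sc.stop()
  }
}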

On Tue, Mar 17, 2015 at 4:21 PM, Eugen Cepoi cepoi.eu...@gmail.com wrote:
 Doing the reduceByKey without changing the number of partitions and then doing
 a coalesce works.
 But the other version still hangs, without any information (while it works
 with spark 1.1.1). The previous logs don't seem to be related to what
 happens.
 I don't think this is a memory issue, as the GC time remains low and the
 shuffle read is small. My guess is that it might be related to a high number
 of initial partitions, but in that case shouldn't it fail for coalesce
 too...?

 Does anyone have an idea where to look at to find what the source of the
 problem is?

 Thanks,
 Eugen

 2015-03-13 19:18 GMT+01:00 Eugen Cepoi cepoi.eu...@gmail.com:

 Hmm, increased it to 1024 but it doesn't help; I still have the same problem :(

 2015-03-13 18:28 GMT+01:00 Eugen Cepoi cepoi.eu...@gmail.com:

 The default one, 0.07 of the executor memory. I'll try increasing it and
 post back the result.

 Thanks

 2015-03-13 18:09 GMT+01:00 Ted Yu yuzhih...@gmail.com:

 Might be related: what's the value for
 spark.yarn.executor.memoryOverhead ?

 See SPARK-6085
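 (A hedged illustration of where that setting goes; the value below is only an
 example, and with yarn-cluster mode it is usually passed via spark-submit
 --conf or spark-defaults.conf instead of being set in code.)

 import org.apache.spark.SparkConf

 // Illustration only: raising the YARN executor overhead above the default.
 val conf = new SparkConf()
   .set("spark.yarn.executor.memoryOverhead", "1024")   // value is in MB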

 Cheers

 On Fri, Mar 13, 2015 at 9:45 AM, Eugen Cepoi cepoi.eu...@gmail.com
 wrote:

 Hi,

 I have a job that hangs after upgrading to spark 1.2.1 from 1.1.1.
 Strangely, the exact same code does work (after the upgrade) in the
 spark-shell. But this information might be misleading, as it works with
 1.1.1 anyway...


 The job takes as input two data sets:
  - rdd A of +170gb (with less it is hard to reproduce) and more than
 11K partitions
  - rdd B of +100mb and 32 partitions

 I run it via EMR over YARN and use 4*m3.xlarge compute nodes. I am
 not sure the executor config is relevant here. Anyway, I tried with multiple
 small executors with less RAM, and the inverse.


 The job basically does this:
 A.flatMap(...).union(B).keyBy(f).reduceByKey(..., 32).map(...).save

 After the flatMap, rdd A is much smaller, similar in size to B.
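 For readers of the archive, here is a self-contained sketch of the two job
 shapes discussed in this thread, with synthetic data and a hypothetical key
 function standing in for f. The first variant is the one that hangs for me;
 the second is the reduceByKey-then-coalesce variant from the reply at the top
 of the thread:

 import org.apache.spark.{SparkConf, SparkContext}

 object UnionReduceShapes {
   def main(args: Array[String]): Unit = {
     val sc = new SparkContext(new SparkConf().setAppName("union-reduce-shapes").setMaster("local[2]"))

     // Synthetic stand-ins: `a` plays the role of A.flatMap(...), `b` the small rdd B.
     val a = sc.parallelize(1 to 1000000, 64).map(i => "a" + (i % 1000))
     val b = sc.parallelize(1 to 10000, 32).map(i => "b" + (i % 1000))

     // Hypothetical key function standing in for f.
     val keyed = a.union(b).keyBy(_.takeRight(3))

     // Shape that hangs for me: shrink the partition count inside reduceByKey.
     val hanging = keyed.reduceByKey((x, _) => x, 32)

     // Shape that works: reduce with the inherited partition count, then coalesce.
     val working = keyed.reduceByKey((x, _) => x).coalesce(32)

     println(hanging.count() + " / " + working.count())
     sc.stop()
   }
 }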

 Configs I used to run this job:

 storage.memoryFraction: 0
 shuffle.memoryFraction: 0.5

 akka.timeout 500
 akka.frameSize 40

 // this one also defines the memory used by the yarn master, but not sure
 if it matters here
 driver.memory 5g
 executor.memory 4250m

 I have 7 executors with 2 cores.

 What happens:
 The job produces two stages: keyBy and save. The keyBy stage runs fine
 and produces a shuffle write of ~150mb. The save stage, where the shuffle read
 occurs, hangs. The larger the initial dataset, the more tasks hang.

 I did run it for much larger datasets with the same config/cluster, but
 without doing the union, and it worked fine.

 Some more infos and logs:

 Among the 4 nodes, 1 finished all its tasks and the running ones are on
 the 3 other nodes. But I am not sure this is useful information (one node that
 completed all its work vs the others), as with some smaller datasets I manage
 to get only one hanging task.

 Here are the last parts of the executor logs that show some timeouts.

 An executor from node ip-10-182-98-220

 15/03/13 15:43:10 INFO storage.ShuffleBlockFetcherIterator: Started 6
 remote fetches in 66 ms
 15/03/13 15:58:44 WARN server.TransportChannelHandler: Exception in
 connection from /10.181.48.153:56806
 java.io.IOException: Connection timed out


 An executor from node ip-10-181-103-186

 15/03/13 15:43:22 INFO storage.ShuffleBlockFetcherIterator: Started 6
 remote fetches in 20 ms
 15/03/13 15:58:41 WARN server.TransportChannelHandler: Exception in
 connection from /10.182.98.220:38784
 java.io.IOException: Connection timed out

 An executor from node ip-10-181-48-153 (all the logs bellow belong this
 node)

 15/03/13 15:43:24 INFO executor.Executor: Finished task 26.0 in stage
 1.0 (TID 13860). 802 bytes result sent to driver
 15/03/13 15:58:43 WARN server.TransportChannelHandler: Exception in
 connection from /10.181.103.186:46381
 java.io.IOException: Connection timed out

 Followed by many

 15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending
 result 
 ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=2064203432016,
 chunkIndex=405},
 buffer=FileSegmentManagedBuffer{file=/mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1426256247374_0002/spark-1659efcd-c6b6-4a12-894d-e869486d3d00/35/shuffle_0_9885_0.data,
 offset=8631, length=571}} to /10.181.103.186:46381; closing connection
 java.nio.channels.ClosedChannelException

 with last one being

 15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending
 result RpcResponse{requestId=7377187355282895939, response=[B@6fcd0014} to
 /10.181.103.186:46381; closing connection
 java.nio.channels.ClosedChannelException


 The executors from the node that finished its tasks don't show
 anything

Task result deserialization error (1.1.0)

2015-01-20 Thread Dmitriy Lyubimov
Hi,

I am getting a task result deserialization error (kryo is enabled). Is it
some sort of `chill` registration issue on the front end?

This is an application that lists spark as a maven dependency (so it gets the
correct hadoop and chill dependencies on the classpath, I checked).

Thanks in advance.

15/01/20 18:21:51 ERROR TaskResultGetter: Exception while getting task
result
java.lang.ArrayStoreException: scala.Tuple1
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at
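For context, a sketch of how a custom Kryo registrator is typically wired up
in 1.1.x; the classes registered below are placeholders, not a known fix for
the ArrayStoreException above:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Placeholder registrations; real code would register the application's result types.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Tuple1[_]])
    kryo.register(classOf[Array[Tuple1[_]]])
  }
}

// Use the fully qualified name if MyRegistrator lives in a package.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "MyRegistrator")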


Re: Upgrade to Spark 1.1.0?

2014-10-20 Thread Dmitriy Lyubimov
Mahout context does not include _all_ possible transitive dependencies.
It would not be lightning fast if it took all the legacy etc. dependencies.

There's an ignored unit test that asserts context path correctness. You
can unignore it and run it to verify it still works as expected. The reason
it is set to ignored is that it requires a mahout environment + an already
built mahout in order to run successfully. I can probably look it up if you
don't find it immediately.


Now.
The mahout context only includes what's really used in the drm algebra, which
is just a handful of jars. Apache commons math is not one of them.

But your driver can add it when creating the mahout context, by tinkering
additionally with the method parameters there (such as the spark config).
However, you may encounter a problem: the mahout assembly currently may not
build -- and copy -- the commons math jar into any part of the mahout
tree.

Finally, I am against adding commons-math by default, as general algebra
does not depend on it. I'd suggest, in order of preference: (1) get rid of
relying on the commons math random generator (surely, by now we should be ok
with scala.Random or even the standard Random?), or (2) add the dependency in a
custom way per the above.

If there's an extremely compelling reason why the commons-math random-gen
dependency cannot be eliminated, then a better way is to include commons
math in the assembly (I think right now the only assembly that really copies
in dependencies is the examples one, which is probably wrong, as the examples
are not the core product here), and add it explicitly to the createMahoutContext
(or whatever that method's name is) code.

My understanding is that the random from utils was mainly encouraged because it
is automatically made deterministic in tests. I am unaware of any fundamental
deficiencies of scala's Random w.r.t. its uses in existing methods. So perhaps
the scala side needs its own RandomUtils for testing that does not rely on
commons math.
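A minimal sketch of what such a scala-side RandomUtils could look like (the
object name and the fixed test seed are just illustrative):

import scala.util.Random

// Hypothetical scala-side replacement for the commons-math-backed RandomUtils:
// deterministic when a test seed has been installed, clock-seeded otherwise.
object ScalaRandomUtils {
  @volatile private var testSeed: Option[Long] = None

  // Call from test setup so that subsequent getRandom() calls are deterministic.
  def useTestSeed(seed: Long = 42L): Unit = { testSeed = Some(seed) }

  def getRandom(): Random = testSeed match {
    case Some(seed) => new Random(seed)
    case None       => new Random()
  }
}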


On Sun, Oct 19, 2014 at 4:36 PM, Pat Ferrel p...@occamsmachete.com wrote:

 Trying to upgrade from Spark 1.0.1 to 1.1.0. Can’t imagine the upgrade is
 the problem but anyway...

 I get a NoClassDefFoundError for RandomGenerator when running a driver
 from the CLI, but only when using a named master, even a standalone master.
 If I run using master = local[4] the job executes correctly, but if I set
 the master to spark://Maclaurin.local:7077 (though they are the same machine)
 I get the NoClassDefFoundError. The classpath seems correct on the CLI, and
 the jars do indeed contain the offending class (see below). There must be
 some difference in how classes are loaded between local[4] and
 spark://Maclaurin.local:7077?

 Any ideas?

 ===

 The driver is in mahout-spark_2.10-1.0-SNAPSHOT-job.jar, so its execution
 means it must be in the classpath. When I look at what’s in the jar I see
 RandomGenerator.

 Maclaurin:target pat$ jar tf mahout-spark_2.10-1.0-SNAPSHOT-job.jar | grep
 RandomGenerator
 cern/jet/random/engine/RandomGenerator.class
 org/apache/commons/math3/random/GaussianRandomGenerator.class
 org/apache/commons/math3/random/JDKRandomGenerator.class
 org/apache/commons/math3/random/UniformRandomGenerator.class
 org/apache/commons/math3/random/RandomGenerator.class  ==!
 org/apache/commons/math3/random/NormalizedRandomGenerator.class
 org/apache/commons/math3/random/AbstractRandomGenerator.class
 org/apache/commons/math3/random/StableRandomGenerator.class

 But I get the following error executing the job:

 14/10/19 15:39:00 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
 6.9 (TID 84, 192.168.0.2): java.lang.NoClassDefFoundError:
 org/apache/commons/math3/random/RandomGenerator
 org.apache.mahout.common.RandomUtils.getRandom(RandomUtils.java:65)

 org.apache.mahout.math.cf.SimilarityAnalysis$$anonfun$5.apply(SimilarityAnalysis.scala:272)

 org.apache.mahout.math.cf.SimilarityAnalysis$$anonfun$5.apply(SimilarityAnalysis.scala:267)

 org.apache.mahout.sparkbindings.blas.MapBlock$$anonfun$1.apply(MapBlock.scala:33)

 org.apache.mahout.sparkbindings.blas.MapBlock$$anonfun$1.apply(MapBlock.scala:32)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)

 org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:235)

 org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
 org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 

Re: Spark QL and protobuf schema

2014-08-21 Thread Dmitriy Lyubimov
OK, I'll try. I happen to do that a lot with other tools.

So I am guessing you are saying that if I wanted to do it now, I'd start against
https://github.com/apache/spark/tree/branch-1.1 and PR against it?


On Thu, Aug 21, 2014 at 12:28 AM, Michael Armbrust mich...@databricks.com
wrote:

 I do not know of any existing way to do this.  It should be possible using
 the new public API for applying a schema (will be available in 1.1) to an
 RDD.  Basically you'll need to convert the protobuf records into rows,
 and also create a StructType that represents the schema.  With these two
 things you can call the applySchema method on SQLContext.

 Would be great if you could contribute this back.
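A rough sketch of what that could look like against the 1.1 API, with a
hypothetical protobuf-generated Person type (real code would walk the proto
Descriptor instead of hard-coding fields):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._   // SQLContext, SchemaRDD, Row, StructType, ... (1.1 API)

// Stand-in for a protobuf-generated message with simple accessors.
trait Person { def getName: String; def getAge: Int }

def protoToSchemaRDD(sc: SparkContext, people: RDD[Person]): SchemaRDD = {
  val sqlContext = new SQLContext(sc)

  // 1. A StructType describing the record layout.
  val schema = StructType(Seq(
    StructField("name", StringType, nullable = true),
    StructField("age", IntegerType, nullable = true)))

  // 2. Convert each protobuf record into a Row.
  val rows = people.map(p => Row(p.getName, p.getAge))

  // 3. Apply the schema.
  sqlContext.applySchema(rows, schema)
}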


 On Wed, Aug 20, 2014 at 5:57 PM, Dmitriy Lyubimov dlie...@gmail.com
 wrote:

 Hello,

 is there any known work to adapt protobuf schema to Spark QL data
 sourcing? If not, would it present interest to contribute one?

 thanks.
 -d





Re: MLLib : Math on Vector and Matrix

2014-07-03 Thread Dmitriy Lyubimov
On Wed, Jul 2, 2014 at 11:40 PM, Xiangrui Meng men...@gmail.com wrote:

 Hi Dmitriy,

 It is sweet to have the bindings, but it is very easy to downgrade the
 performance with them. The BLAS/LAPACK APIs have been there for more
 than 20 years and they are still the top choice for high-performance
 linear algebra.


There's no such limitation there. In fact, LAPACK/jblas is very low-hanging
fruit to add there.

The algebraic optimizer is not so much about in-core block-on-block
techniques. It is about optimization/simplification of algebraic expressions,
especially their distributed plans/side of things. Another side of the
story is a consistent matrix representation for block-to-block in-memory
computations and passing stuff in and out. R-like look & feel.

It is true that in-core-only computations currently are not optimized in a
deferred fashion, nor do they have a LAPACK backend, but this is low-hanging
fruit there. The main idea is consistency of the algebraic API/DSL, be it
distributed or in-core, having an algebraic optimizer, and pluggable backends
(both in-core backends and distributed engine backends).

It just so happens that the only in-memory backend right now is Mahout's Colt
derivation, but there's fairly little reason not to pick and plug in
Breeze/LAPACK, or, say, GPU-backed representations. In fact, that was my
first attempt a year ago (Breeze), but it unfortunately was not where it
needed to be (not sure about now).

As for LAPACK, yes, it is easy to integrate. But the only reason I
(personally) haven't integrated it yet is that my problems tend to be
sparse, not dense, and also fairly invasive in terms of custom matrix
traversals (probabilistic fitting, for the most part). So the most specifically
tweaked methodologies are really more quasi-algebraic than purely
algebraic, unfortunately. Having LAPACK blockwise operators on dense
matrices would not help me terribly there.

But the architectural problem in terms of foundation and, more
specifically, customization of processes IMO does exist here (in mllib).
This thread (and there was another one just like it a few threads below
this one) reads to me as a manifestation of that lack of algebraic
foundation APIs/optimizers.


Re: MLLib : Math on Vector and Matrix

2014-07-02 Thread Dmitriy Lyubimov
in my humble opinion Spark should've supported linalg a-la [1] before it
even started dumping methodologies into mllib.

[1] http://mahout.apache.org/users/sparkbindings/home.html


On Wed, Jul 2, 2014 at 2:16 PM, Thunder Stumpges thunder.stump...@gmail.com
 wrote:

 Thanks. I always hate having to do stuff like this. It seems like they
 went a bit overboard with all the private[mllib] declarations... possibly
 all in the name of thou shalt not change your public API. If you don't
 make your public API usable, we end up having to work around it anyway...

 Oh well.

 Thunder



 On Wed, Jul 2, 2014 at 2:05 PM, Koert Kuipers ko...@tresata.com wrote:

 i did the second option: re-implemented .toBreeze as .breeze using pimp
 classes
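For anyone searching the archives later, a minimal sketch of that approach
(an implicit "pimp" class adding a .breeze view; this is a reconstruction, not
the actual code referred to above):

import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector => SparkVector}

object BreezeVectorOps {
  // Enrich MLlib vectors with a .breeze conversion, mirroring MLlib's private toBreeze.
  implicit class RichSparkVector(v: SparkVector) {
    def breeze: BV[Double] = v match {
      case d: DenseVector  => new BDV(d.values)
      case s: SparseVector => new BSV(s.indices, s.values, s.size)
    }
  }
}

// Usage:
//   import BreezeVectorOps._
//   val dot = vec1.breeze dot vec2.breeze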


 On Wed, Jul 2, 2014 at 5:00 PM, Thunder Stumpges 
 thunder.stump...@gmail.com wrote:

 I am upgrading from Spark 0.9.0 to 1.0 and I had a pretty good amount of
 code working with internals of MLLib. One of the big changes was the move
 from the old jblas.Matrix to the Vector/Matrix classes included in MLLib.

 However I don't see how we're supposed to use them for ANYTHING other
 than a container for passing data to the included APIs... how do we do any
 math on them? Looking at the internal code, there are quite a number of
 private[mllib] declarations including access to the Breeze representations
 of the classes.

 Was there a good reason this was not exposed? I could see maybe not
 wanting to expose the 'toBreeze' function which would tie it to the breeze
 implementation, however it would be nice to have the various mathematics
 wrapped at least.

 Right now I see no way to code any vector/matrix math without moving my
 code namespaces into org.apache.spark.mllib or duplicating the code in
 'toBreeze' in my own util functions. Not very appealing.

 What are others doing?
 thanks,
 Thunder






Re: Why Scala?

2014-05-29 Thread Dmitriy Lyubimov
There were a few known concerns about Scala, and some still remain, but having
been doing Scala professionally for over two years now, I have learned to
master and appreciate the advantages.

The major concern IMO is Scala in a less-than-scrupulous corporate environment.

First, Scala requires significantly more discipline in commenting and style
to stay painlessly readable than Java does. People with less-than-stellar
code hygiene can easily turn a project into an unmaintainable mess.

Second, from a corporate management perspective, it is (still?) much harder
to staff with Scala coders as opposed to Java ones.

All these things are a headache for corporate bosses, but for public and
academic projects with thorough peer review and an increased desire for
contributors to look clean in public, it works out quite well, and the strong
sides really shine.

Spark specifically builds around FP patterns -- such as monads and functors
-- which were absent in Java prior to 8 (and I am not sure they are as
well worked out in Java 8 collections even now, as opposed to Scala
collections). So Java 8 simply comes a little late to the show in that
department.

Also, FP is not the only thing that Spark uses. Spark also uses things
like implicits and the akka/agent framework for IPC. Let's not forget that FP,
albeit important, is only one of many stories in Scala in the grand
scheme of things.


On Thu, May 29, 2014 at 1:55 PM, Nick Chammas nicholas.cham...@gmail.comwrote:

 I recently discovered Hacker News and started reading through older posts
 about Scala https://hn.algolia.com/?q=scala#!/story/forever/0/scala. It
 looks like the language is fairly controversial on there, and it got me
 thinking.

 Scala appears to be the preferred language to work with in Spark, and
 Spark itself is written in Scala, right?

 I know that often times a successful project evolves gradually out of
 something small, and that the choice of programming language may not always
 have been made consciously at the outset.

 But pretending that it was, why is Scala the preferred language of Spark?

 Nick


 --
 View this message in context: Why Scala? http://apache-spark-user-list.1001560.n3.nabble.com/Why-Scala-tp6536.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com: http://apache-spark-user-list.1001560.n3.nabble.com/



Re: How to use Mahout VectorWritable in Spark.

2014-05-15 Thread Dmitriy Lyubimov
PS: a spark shell with all the proper imports is also supported natively in
Mahout (the mahout spark-shell command). See M-1489 for specifics. There's also
a tutorial somewhere, but I suspect it has not yet been finished/published
via a public link. Again, you need trunk to use the spark shell there.


On Wed, May 14, 2014 at 12:43 AM, Stuti Awasthi stutiawas...@hcl.comwrote:

 Hi Xiangrui,
 Thanks for the response .. I tried few ways to include mahout-math jar
 while launching Spark shell.. but no success.. Can you please point what I
 am doing wrong

 1. mahout-math.jar exported in CLASSPATH, and PATH
 2. Tried Launching Spark Shell by :  MASTER=spark://HOSTNAME:PORT
 ADD_JARS=~/installations/work-space/mahout-math-0.7.jar
 park-0.9.0/bin/spark-shell

  After launching, I checked the environment details on WebUi: It looks
 like mahout-math jar is included.
 spark.jars  /home/hduser/installations/work-space/mahout-math-0.7.jar

 Then I try :
 scala import org.apache.mahout.math.VectorWritable
 console:10: error: object mahout is not a member of package org.apache
import org.apache.mahout.math.VectorWritable

 scala val raw = sc.sequenceFile(path, classOf[Text],
 classOf[VectorWritable])
 console:12: error: not found: type Text
val data =
 sc.sequenceFile(/stuti/ML/Clustering/KMeans/HAR/KMeans_dataset_seq/part-r-0,
 classOf[Text], classOf[VectorWritable])

^
 Im using Spark 0.9 and Hadoop 1.0.4 and Mahout 0.7
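 For the archive, a spark-shell sketch of the same read with the imports
 spelled out (assuming the mahout-math and hadoop jars really are on both the
 driver and executor classpaths; sc is the shell's SparkContext):

 import org.apache.hadoop.io.Text
 import org.apache.mahout.math.VectorWritable

 val path = "/stuti/ML/Clustering/KMeans/HAR/KMeans_dataset_seq/part-r-0"

 val raw = sc.sequenceFile(path, classOf[Text], classOf[VectorWritable])
 val vectors = raw.values.map(_.get)   // RDD of mahout Vector, per the suggestion quoted below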

 Thanks
 Stuti



 -Original Message-
 From: Xiangrui Meng [mailto:men...@gmail.com]
 Sent: Wednesday, May 14, 2014 11:56 AM
 To: user@spark.apache.org
 Subject: Re: How to use Mahout VectorWritable in Spark.

 You need

  val raw = sc.sequenceFile(path, classOf[Text],
  classOf[VectorWritable])

 to load the data. After that, you can do

  val data = raw.values.map(_.get)

 To get an RDD of mahout's Vector. You can use `--jar mahout-math.jar` when
 you launch spark-shell to include mahout-math.

 Best,
 Xiangrui

 On Tue, May 13, 2014 at 10:37 PM, Stuti Awasthi stutiawas...@hcl.com
 wrote:
  Hi All,
 
  I am very new to Spark and trying to play around with Mllib hence
  apologies for the basic question.
 
 
 
  I am trying to run KMeans algorithm using Mahout and Spark MLlib to
  see the performance. Now initial datasize was 10 GB. Mahout converts
  the data in Sequence File Text,VectorWritable which is used for KMeans
 Clustering.
  The Sequence File crated was ~ 6GB in size.
 
 
 
  Now I wanted if I can use the Mahout Sequence file to be executed in
  Spark MLlib for KMeans . I have read that SparkContext.sequenceFile
  may be used here. Hence I tried to read my sequencefile as below but
 getting the error :
 
 
 
  Command on Spark Shell :
 
  scala val data = sc.sequenceFile[String,VectorWritable](/
  KMeans_dataset_seq/part-r-0,String,VectorWritable)
 
  console:12: error: not found: type VectorWritable
 
 val data = sc.sequenceFile[String,VectorWritable](
  /KMeans_dataset_seq/part-r-0,String,VectorWritable)
 
 
 
  Here I have 2 ques:
 
  1.  Mahout has “Text” as Key but Spark is printing “not found: type:Text”
  hence I changed it to String.. Is this correct ???
 
  2. How will VectorWritable be found in Spark. Do I need to include
  Mahout jar in Classpath or any other option ??
 
 
 
  Please Suggest
 
 
 
  Regards
 
  Stuti Awasthi
 
 
 



Re: How to use Mahout VectorWritable in Spark.

2014-05-15 Thread Dmitriy Lyubimov
PPS: The shell/spark tutorial I've mentioned is actually being developed in
MAHOUT-1542. As it stands, I believe it is now complete in its core.


On Wed, May 14, 2014 at 5:48 PM, Dmitriy Lyubimov dlie...@gmail.com wrote:

  PS: a spark shell with all the proper imports is also supported natively in
  Mahout (the mahout spark-shell command). See M-1489 for specifics. There's also
  a tutorial somewhere, but I suspect it has not yet been finished/published
  via a public link. Again, you need trunk to use the spark shell there.


 On Wed, May 14, 2014 at 12:43 AM, Stuti Awasthi stutiawas...@hcl.comwrote:

 Hi Xiangrui,
 Thanks for the response .. I tried few ways to include mahout-math jar
 while launching Spark shell.. but no success.. Can you please point what I
 am doing wrong

 1. mahout-math.jar exported in CLASSPATH, and PATH
 2. Tried Launching Spark Shell by :  MASTER=spark://HOSTNAME:PORT
 ADD_JARS=~/installations/work-space/mahout-math-0.7.jar
 park-0.9.0/bin/spark-shell

  After launching, I checked the environment details on WebUi: It looks
 like mahout-math jar is included.
 spark.jars  /home/hduser/installations/work-space/mahout-math-0.7.jar

 Then I try :
 scala import org.apache.mahout.math.VectorWritable
 console:10: error: object mahout is not a member of package org.apache
import org.apache.mahout.math.VectorWritable

 scala val raw = sc.sequenceFile(path, classOf[Text],
 classOf[VectorWritable])
 console:12: error: not found: type Text
val data =
 sc.sequenceFile(/stuti/ML/Clustering/KMeans/HAR/KMeans_dataset_seq/part-r-0,
 classOf[Text], classOf[VectorWritable])

^
 Im using Spark 0.9 and Hadoop 1.0.4 and Mahout 0.7

 Thanks
 Stuti



 -Original Message-
 From: Xiangrui Meng [mailto:men...@gmail.com]
 Sent: Wednesday, May 14, 2014 11:56 AM
 To: user@spark.apache.org
 Subject: Re: How to use Mahout VectorWritable in Spark.

 You need

  val raw = sc.sequenceFile(path, classOf[Text],
  classOf[VectorWritable])

 to load the data. After that, you can do

  val data = raw.values.map(_.get)

 To get an RDD of mahout's Vector. You can use `--jar mahout-math.jar`
 when you launch spark-shell to include mahout-math.

 Best,
 Xiangrui

 On Tue, May 13, 2014 at 10:37 PM, Stuti Awasthi stutiawas...@hcl.com
 wrote:
  Hi All,
 
  I am very new to Spark and trying to play around with Mllib hence
  apologies for the basic question.
 
 
 
  I am trying to run KMeans algorithm using Mahout and Spark MLlib to
  see the performance. Now initial datasize was 10 GB. Mahout converts
  the data in Sequence File Text,VectorWritable which is used for
 KMeans Clustering.
  The Sequence File crated was ~ 6GB in size.
 
 
 
  Now I wanted if I can use the Mahout Sequence file to be executed in
  Spark MLlib for KMeans . I have read that SparkContext.sequenceFile
  may be used here. Hence I tried to read my sequencefile as below but
 getting the error :
 
 
 
  Command on Spark Shell :
 
  scala val data = sc.sequenceFile[String,VectorWritable](/
  KMeans_dataset_seq/part-r-0,String,VectorWritable)
 
  console:12: error: not found: type VectorWritable
 
 val data = sc.sequenceFile[String,VectorWritable](
  /KMeans_dataset_seq/part-r-0,String,VectorWritable)
 
 
 
  Here I have 2 ques:
 
  1.  Mahout has “Text” as Key but Spark is printing “not found:
 type:Text”
  hence I changed it to String.. Is this correct ???
 
  2. How will VectorWritable be found in Spark. Do I need to include
  Mahout jar in Classpath or any other option ??
 
 
 
  Please Suggest
 
 
 
  Regards
 
  Stuti Awasthi
 
 
 





Re: How to use Mahout VectorWritable in Spark.

2014-05-15 Thread Dmitriy Lyubimov
Mahout now supports doing its distributed linalg natively on Spark, so the
problem of loading sequence file input into Spark is already solved there
(trunk, http://mahout.apache.org/users/sparkbindings/home.html, the
drmFromHDFS() call -- and then you can access the underlying rdd via the rdd
matrix property if needed).
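A rough sketch of that route; the context-creation call and its parameter
names are from memory, so treat them as assumptions, and the path is a
placeholder:

import org.apache.mahout.math.drm._
import org.apache.mahout.sparkbindings._

// Assumption: something along these lines creates the implicit distributed
// context the drm API expects (check the sparkbindings page for the exact call).
implicit val mahoutCtx = mahoutSparkContext(masterUrl = "local[2]", appName = "drm-read")

// Read the <Text, VectorWritable> sequence file as a distributed row matrix (DRM).
val drm = drmFromHDFS("/path/to/KMeans_dataset_seq")

// Per the note above, the backing Spark RDD is exposed as the rdd property.
val backingRdd = drm.rdd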

If you specifically try to ensure interoperability with MLlib, however (I did
not try that), note that Mahout's linalg and its bindings to Spark work
with the Kryo serializer only, so if/when MLlib algorithms do not support the
Kryo serializer, it would not be interoperable.

-d


On Tue, May 13, 2014 at 10:37 PM, Stuti Awasthi stutiawas...@hcl.comwrote:

  Hi All,

 I am very new to Spark and trying to play around with Mllib hence
 apologies for the basic question.



 I am trying to run KMeans algorithm using Mahout and Spark MLlib to see
 the performance. Now initial datasize was 10 GB. Mahout converts the data
 in Sequence File Text,VectorWritable which is used for KMeans
 Clustering.  The Sequence File crated was ~ 6GB in size.



 Now I wanted if I can use the Mahout Sequence file to be executed in Spark
 MLlib for KMeans . I have read that SparkContext.sequenceFile may be used
 here. Hence I tried to read my sequencefile as below but getting the error :



 Command on Spark Shell :

 scala val data = sc.sequenceFile[String,VectorWritable](/
 KMeans_dataset_seq/part-r-0,String,VectorWritable)

 console:12: error: not found: type VectorWritable

val data = sc.sequenceFile[String,VectorWritable](
 /KMeans_dataset_seq/part-r-0,String,VectorWritable)



 Here I have 2 ques:

 1.  Mahout has “Text” as Key but Spark is printing “not found: type:Text”
 hence I changed it to String.. Is this correct ???

 2. How will VectorWritable be found in Spark. Do I need to include Mahout
 jar in Classpath or any other option ??



 Please Suggest



 Regards

 Stuti Awasthi






Re: Spark - ready for prime time?

2014-04-10 Thread Dmitriy Lyubimov
On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash and...@andrewash.com wrote:

 The biggest issue I've come across is that the cluster is somewhat
 unstable when under memory pressure.  Meaning that if you attempt to
 persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll
 often still get OOMs.  I had to carefully modify some of the space tuning
 parameters and GC settings to get some jobs to even finish.

 The other issue I've observed is if you group on a key that is highly
 skewed, with a few massively-common keys and a long tail of rare keys, the
 one massive key can be too big for a single machine and again cause OOMs.


My take on it -- Spark doesn't believe in sort-and-spill tricks to enable
super long groups, and IMO for a good reason. Here are my thoughts:

(1) In my work I don't need sort in 99% of the cases; I only need grouping,
which absolutely doesn't need the spill that makes things slow down to a
crawl.
(2) If that's an aggregate (such as a group count), use combineByKey(), not
groupByKey -- this will do tons of good for memory use (see the sketch after
this list).
(3) If you really need groups that don't fit into memory, that is always
because you want to do something other than aggregation with them,
e.g. build an index of that grouped data. We actually had a case just like
that. In this case your friend is really not groupBy, but rather
partitionBy. I.e., what happens there is you build a quick count sketch,
perhaps on downsampled data, to figure out which keys have a sufficiently big
count -- and then you build a partitioner that redirects the large groups to a
dedicated map(). Assuming this map doesn't try to load things into memory but
rather does something like a streaming BTree build, that should be fine. In
certain situations such processing may require splitting a super large group
further into smaller sub-groups (e.g. a partitioned BTree structure), at which
point you should be fine even from a uniform-load point of view. It takes a
little jiu-jitsu to do it all, but it is not Spark's fault here; it did not
promise to do all this for you in the groupBy contract.
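To make (2) concrete, here is a small sketch of the aggregate case (a per-key
count) done with combineByKey instead of groupByKey; the data and key skew are
synthetic:

import org.apache.spark.{SparkConf, SparkContext}

object GroupVsCombine {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("group-vs-combine").setMaster("local[2]"))

    // Skewed synthetic data: one very hot key plus a long tail.
    val pairs = sc.parallelize(1 to 1000000).map { i =>
      val key = if (i % 2 == 0) "hot" else "k" + (i % 10000)
      (key, 1)
    }

    // Memory-hungry: materializes every group; the hot key's group lands on one executor.
    val countsViaGroup = pairs.groupByKey().mapValues(_.size)

    // Map-side combined: only one running count per key ever crosses the shuffle.
    val countsViaCombine = pairs.combineByKey(
      (v: Int) => v,                   // createCombiner
      (acc: Int, v: Int) => acc + v,   // mergeValue
      (a: Int, b: Int) => a + b)       // mergeCombiners

    println(countsViaGroup.count() + " " + countsViaCombine.count())
    sc.stop()
  }
}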




 I'm hopeful that off-heap caching (Tachyon) could fix some of these issues.

 Just my personal experience, but I've observed significant improvements in
 stability since even the 0.7.x days, so I'm confident that things will
 continue to get better as long as people report what they're seeing so it
 can get fixed.

 Andrew


 On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert alex.boisv...@gmail.comwrote:

 I'll provide answers from our own experience at Bizo.  We've been using
 Spark for 1+ year now and have found it generally better than previous
 approaches (Hadoop + Hive mostly).



 On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth 
 andras.nem...@lynxanalytics.com wrote:

 I. Is it too much magic? Lots of things just work right in Spark and
 it's extremely convenient and efficient when it indeed works. But should we
 be worried that customization is hard if the built in behavior is not quite
 right for us? Are we to expect hard to track down issues originating from
 the black box behind the magic?


 I think it goes back to understanding Spark's architecture, its design
 constraints and the problems it explicitly set out to address.   If the
 solution to your problems can be easily formulated in terms of the
 map/reduce model, then it's a good choice.  You'll want your
 customizations to go with (not against) the grain of the architecture.


 II. Is it mature enough? E.g. we've created a pull request
 (https://github.com/apache/spark/pull/181) which fixes a problem that
 we were very surprised no one ever stumbled upon
 before. So that's why I'm asking: is Spark already being used in
 professional settings? Can one already trust it to be reasonably bug free
 and reliable?


 There are lots of ways to use Spark; and not all of the features are
 necessarily at the same level of maturity.   For instance, we put all the
 jars on the main classpath so we've never run into the issue your pull
 request addresses.

 We definitely use and rely on Spark on a professional basis.  We have 5+
 spark jobs running nightly on Amazon's EMR, slicing through GBs of data.
 Once we got them working with the proper configuration settings, they have
 been running reliably since.

 I would characterize our use of Spark as a better Hadoop, in the sense
 that we use it for batch processing only, no streaming yet.   We're happy
 it performs better than Hadoop but we don't require/rely on its memory
 caching features.  In fact, for most of our jobs it would simplify our
 lives if Spark wouldn't cache so many things in memory since it would make
 configuration/tuning a lot simpler and jobs would run successfully on the
 first try instead of having to tweak things (# of partitions and such).

 So, to the concrete issues. Sorry for the long mail, and let me know if I
 should break this out into more threads or if there is some other way to
 have this discussion...

 1. Memory management
 The 

Re: Multi master Spark

2014-04-09 Thread Dmitriy Lyubimov
The only way I know to do this is to use Mesos with ZooKeeper: you specify a
ZooKeeper URL as the Spark master URL, one that contains multiple ZooKeeper
hosts. Mesos masters are then elected through ZooKeeper leader election; when
the current leader dies, Mesos elects another master (if any are still
left).

IIRC, in this mode a standalone Spark master never runs; the worker side is
spun up by the Mesos slaves directly.
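A hedged illustration of what that master URL looks like (hosts and ports are
placeholders for the ZooKeeper ensemble):

import org.apache.spark.{SparkConf, SparkContext}

// Mesos resolves the currently leading master through the ZooKeeper ensemble.
val conf = new SparkConf()
  .setAppName("ha-via-mesos")
  .setMaster("mesos://zk://zk1:2181,zk2:2181,zk3:2181/mesos")

val sc = new SparkContext(conf)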





On Wed, Apr 9, 2014 at 3:08 PM, Pradeep Ch pradeep.chanum...@gmail.comwrote:

 Hi,

 I want to enable Spark Master HA in Spark. The documentation specifies that we
 can do this with the help of ZooKeeper. But what I am worried about is how to
 configure one master with the other, and similarly how do the workers know
 that they have two masters? Where do you specify the multi-master information?

 Thanks for the help.

 Thanks,
 Pradeep