Re: Hanging tasks in spark 1.2.1 while working with 1.1.1
FWIW, observed similar behavior in a similar situation. Was able to work around it by forcefully committing one of the rdds into cache right before the union, and forcing that by executing take(1). Nothing else ever helped. Seems like a yet-undiscovered 1.2.x thing.

On Tue, Mar 17, 2015 at 4:21 PM, Eugen Cepoi cepoi.eu...@gmail.com wrote:

Doing the reduceByKey without changing the number of partitions and then doing a coalesce works. But the other version still hangs, without any information (while it works with spark 1.1.1). The previous logs don't seem to be related to what happens. I don't think this is a memory issue, as the GC time remains low and the shuffle read is small. My guess is that it might be related to a high number of initial partitions, but in that case shouldn't it fail for coalesce too...? Does anyone have an idea where to look to find the source of the problem? Thanks, Eugen

2015-03-13 19:18 GMT+01:00 Eugen Cepoi cepoi.eu...@gmail.com:

Hum, increased it to 1024 but that doesn't help; I still have the same problem :(

2015-03-13 18:28 GMT+01:00 Eugen Cepoi cepoi.eu...@gmail.com:

The default one, 0.07 of executor memory. I'll try increasing it and post back the result. Thanks

2015-03-13 18:09 GMT+01:00 Ted Yu yuzhih...@gmail.com:

Might be related: what's the value for spark.yarn.executor.memoryOverhead? See SPARK-6085. Cheers

On Fri, Mar 13, 2015 at 9:45 AM, Eugen Cepoi cepoi.eu...@gmail.com wrote:

Hi, I have a job that hangs after upgrading to spark 1.2.1 from 1.1.1. Strange thing: the exact same code does work (after the upgrade) in the spark-shell. But this information might be misleading, as it works with 1.1.1...

The job takes as input two data sets:
- rdd A of 170+ GB (with less it is hard to reproduce) and more than 11K partitions
- rdd B of 100+ MB and 32 partitions

I run it via EMR over YARN and use 4 m3.xlarge computing nodes. I am not sure the executor config is relevant here; anyway, I tried multiple small executors with less RAM, and the inverse.

The job basically does this:
A.flatMap(...).union(B).keyBy(f).reduceByKey(..., 32).map(...).save
After the flatMap, rdd A's size is much smaller, similar to B's.

Configs I used to run this job:
storage.memoryFraction: 0
shuffle.memoryFraction: 0.5
akka.timeout 500
akka.frameSize 40 // this one also defines the memory used by the yarn master, but I'm not sure whether it's important here
driver.memory 5g
executor.memory 4250m
I have 7 executors with 2 cores.

What happens: the job produces two stages, keyBy and save. The keyBy stage runs fine and produces a shuffle write of ~150 MB. The save stage, where the shuffle read occurs, hangs. The greater the initial dataset, the more tasks hang. I did run it for much larger datasets with the same config/cluster but without doing the union, and it worked fine.

Some more infos and logs: amongst the 4 nodes, 1 finished all its tasks and the running ones are on the 3 other nodes. But I'm not sure this is useful information (one node that completed all its work vs the others), as with some smaller datasets I manage to get only one hanging task. Here are the last parts of the executor logs that show some timeouts.
An executor from node ip-10-182-98-220:
15/03/13 15:43:10 INFO storage.ShuffleBlockFetcherIterator: Started 6 remote fetches in 66 ms
15/03/13 15:58:44 WARN server.TransportChannelHandler: Exception in connection from /10.181.48.153:56806 java.io.IOException: Connection timed out

An executor from node ip-10-181-103-186:
15/03/13 15:43:22 INFO storage.ShuffleBlockFetcherIterator: Started 6 remote fetches in 20 ms
15/03/13 15:58:41 WARN server.TransportChannelHandler: Exception in connection from /10.182.98.220:38784 java.io.IOException: Connection timed out

An executor from node ip-10-181-48-153 (all the logs below belong to this node):
15/03/13 15:43:24 INFO executor.Executor: Finished task 26.0 in stage 1.0 (TID 13860). 802 bytes result sent to driver
15/03/13 15:58:43 WARN server.TransportChannelHandler: Exception in connection from /10.181.103.186:46381 java.io.IOException: Connection timed out

Followed by many
15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=2064203432016, chunkIndex=405}, buffer=FileSegmentManagedBuffer{file=/mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1426256247374_0002/spark-1659efcd-c6b6-4a12-894d-e869486d3d00/35/shuffle_0_9885_0.data, offset=8631, length=571}} to /10.181.103.186:46381; closing connection java.nio.channels.ClosedChannelException

with the last one being
15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending result RpcResponse{requestId=7377187355282895939, response=[B@6fcd0014} to /10.181.103.186:46381; closing connection java.nio.channels.ClosedChannelException

The executors from the node that finished its tasks don't show anything.
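For reference, a minimal sketch of the workaround described at the top of this thread: cache one side of the union and force materialization with take(1) before the shuffle stage runs. This assumes a spark-shell sc; paths and the key/merge functions are placeholders, not the original job's code.

    import org.apache.spark.storage.StorageLevel

    val a = sc.textFile("hdfs:///input/A").flatMap(_.split(" "))  // the large side
    val b = sc.textFile("hdfs:///input/B")                        // the small side
    b.persist(StorageLevel.MEMORY_AND_DISK)
    b.take(1)                          // forces evaluation so the cache is populated
    a.union(b)
      .keyBy(line => line.take(8))     // placeholder key function
      .reduceByKey((x, y) => x, 32)    // placeholder merge, 32 output partitions
      .saveAsTextFile("hdfs:///output")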
Task result deserialization error (1.1.0)
Hi, I am getting a task result deserialization error (Kryo is enabled). Is it some sort of `chill` registration issue at the front end? This is an application that lists Spark as a Maven dependency (so it gets the correct Hadoop and chill dependencies on the classpath; I checked). Thanks in advance.

15/01/20 18:21:51 ERROR TaskResultGetter: Exception while getting task result
java.lang.ArrayStoreException: scala.Tuple1
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at
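Not from the original thread, but one common fix for this class of error is registering the exact types (including their array forms) with Kryo explicitly. A hedged sketch against the Spark 1.x registrator API; the class choices below are illustrative guesses, not a confirmed diagnosis, and MyRegistrator would normally live in a real package.

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoRegistrator

    class MyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[scala.Tuple1[Any]])         // the type from the trace
        kryo.register(classOf[Array[scala.Tuple1[Any]]])  // and its array form
      }
    }

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "MyRegistrator")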
Re: Upgrade to Spark 1.1.0?
Mahout context does not include _all_ possible transitive dependencies. It would not be lightning fast to take all the legacy etc. dependencies. There's an ignored unit test that asserts context path correctness; you can un-ignore it and run it to verify it still works as expected. The reason it is set to ignored is that it requires a mahout environment plus an already-built mahout in order to run successfully. I can probably look it up if you don't find it immediately.

Now, the mahout context only includes what's really used in the drm algebra, which is just a handful of jars. Apache commons math is not one of them. But your driver can add it when creating the mahout context, by tinkering additionally with the method parameters there (such as the spark config). However, you may encounter a problem, which is that the mahout assembly currently may not build -- and copy -- the commons math jar into any part of the mahout tree.

Finally, I am against adding commons-math by default, as general algebra does not depend on it. I'd suggest, in order of preference: (1) get rid of relying on the commons math random generator (surely, by now we should be ok with scala.Random or even standard random?), or (2) add the dependency in a custom way per the above. If there's an extremely compelling reason why the commons-math random gen dependency cannot be eliminated, then a better way is to include commons math into the assembly (I think right now the only assembly that really copies in dependencies is the examples one, which is probably wrong, as examples are not the core product here), and add it explicitly to the createMahoutContext (or whatever that method's name was) code.

My understanding is that the random from utils was mainly encouraged because it is automatically made deterministic in tests. I am unaware of any fundamental deficiencies of scala random w.r.t. its uses in existing methods. So perhaps the scala side needs its own RandomUtils for testing that does not rely on commons math.

On Sun, Oct 19, 2014 at 4:36 PM, Pat Ferrel p...@occamsmachete.com wrote:

Trying to upgrade from Spark 1.0.1 to 1.1.0. Can't imagine the upgrade is the problem but anyway...

I get a NoClassDefFoundError for RandomGenerator when running a driver from the CLI, but only when using a named master, even a standalone master. If I run using master = local[4] the job executes correctly, but if I set the master to spark://Maclaurin.local:7077 (though they are the same machine) I get the NoClassDefFoundError. The classpath seems correct on the CLI, and the jars do indeed contain the offending class (see below). There must be some difference in how classes are loaded between local[4] and spark://Maclaurin.local:7077? Any ideas?

===
The driver is in mahout-spark_2.10-1.0-SNAPSHOT-job.jar, so its execution means the jar must be in the classpath. When I look at what's in the jar I see RandomGenerator:

Maclaurin:target pat$ jar tf mahout-spark_2.10-1.0-SNAPSHOT-job.jar | grep RandomGenerator
cern/jet/random/engine/RandomGenerator.class
org/apache/commons/math3/random/GaussianRandomGenerator.class
org/apache/commons/math3/random/JDKRandomGenerator.class
org/apache/commons/math3/random/UniformRandomGenerator.class
org/apache/commons/math3/random/RandomGenerator.class
org/apache/commons/math3/random/NormalizedRandomGenerator.class
org/apache/commons/math3/random/AbstractRandomGenerator.class
org/apache/commons/math3/random/StableRandomGenerator.class
===

But I get the following error executing the job:

14/10/19 15:39:00 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 6.9 (TID 84, 192.168.0.2): java.lang.NoClassDefFoundError: org/apache/commons/math3/random/RandomGenerator
org.apache.mahout.common.RandomUtils.getRandom(RandomUtils.java:65)
org.apache.mahout.math.cf.SimilarityAnalysis$$anonfun$5.apply(SimilarityAnalysis.scala:272)
org.apache.mahout.math.cf.SimilarityAnalysis$$anonfun$5.apply(SimilarityAnalysis.scala:267)
org.apache.mahout.sparkbindings.blas.MapBlock$$anonfun$1.apply(MapBlock.scala:33)
org.apache.mahout.sparkbindings.blas.MapBlock$$anonfun$1.apply(MapBlock.scala:32)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:235)
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
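A hedged sketch of option (2) above: the driver ships the extra jar itself by listing it in the Spark config used to build the context, rather than relying on Mahout's defaults. Paths and the commons-math version are placeholders. With a local[4] master everything runs in the driver JVM, which is why the missing executor-side jar never showed up there.

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setMaster("spark://Maclaurin.local:7077")
      .setAppName("driver-with-commons-math")
      // distributed to the executors, which is what local[4] masked:
      .setJars(Seq("/path/to/mahout-spark_2.10-1.0-SNAPSHOT-job.jar",
                   "/path/to/commons-math3-3.2.jar"))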
Re: Spark QL and protobuf schema
Ok, I'll try. I happen to do that a lot to other tools. So I am guessing you are saying that if I wanted to do it now, I'd start against https://github.com/apache/spark/tree/branch-1.1 and PR against it?

On Thu, Aug 21, 2014 at 12:28 AM, Michael Armbrust mich...@databricks.com wrote:

I do not know of any existing way to do this. It should be possible using the new public API for applying a schema (will be available in 1.1) to an RDD. Basically you'll need to convert the protobuf records into rows, and also create a StructType that represents the schema. With these two things you can call the applySchema method on SQLContext. Would be great if you could contribute this back.

On Wed, Aug 20, 2014 at 5:57 PM, Dmitriy Lyubimov dlie...@gmail.com wrote:

Hello, is there any known work to adapt a protobuf schema to Spark QL data sourcing? If not, would it be of interest to contribute one? thanks. -d
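A hedged sketch of what Michael describes, following the Spark 1.1 programmatic-schema API. MyProto and its getters are hypothetical stand-ins for protobuf-generated classes, and `protos` is assumed to be an RDD of them built elsewhere.

    import org.apache.spark.sql._

    val sqlContext = new SQLContext(sc)

    // Describe the target relational shape of the proto messages.
    val schema = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("name", StringType, nullable = true)))

    // Hypothetical: protos is an RDD[MyProto]; pull each field into a Row.
    val rows = protos.map(p => Row(p.getId, p.getName))

    val schemaRDD = sqlContext.applySchema(rows, schema)
    schemaRDD.registerTempTable("protos")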
Re: MLLib : Math on Vector and Matrix
On Wed, Jul 2, 2014 at 11:40 PM, Xiangrui Meng men...@gmail.com wrote: Hi Dmitriy, It is sweet to have the bindings, but it is very easy to downgrade the performance with them. The BLAS/LAPACK APIs have been there for more than 20 years and they are still the top choice for high-performance linear algebra.

There's no such limitation there. In fact, LAPACK/jblas is very low-hanging fruit to have there. The algebraic optimizer is not so much about in-core block-on-block techniques. It is about optimizing/simplifying algebraic expressions, especially on their distributed plan side of things. Another side of the story is a consistent matrix representation for block-to-block in-memory computations and for passing stuff in and out, with an R-like look and feel.

It is true that in-core-only computations currently are not optimized in a deferred fashion, nor do they have a LAPACK backend, but this is low-hanging fruit there. The main idea is consistency of the algebraic API/DSL, be it distributed or in-core, having an algebraic optimizer, and pluggable backends (both in-core backends and distributed engine backends). It just so happened that the only in-memory backend right now is Mahout's Colt derivation, but there's fairly little reason not to pick and plug Breeze/LAPACK, or, say, GPU-backed representations. In fact, that was my first attempt a year ago (Breeze), but it unfortunately was not where it needed to be (not sure about now).

As for LAPACK, yes, it is easy to integrate. But the only reason I (personally) haven't integrated it yet is that my problems tend to be sparse, not dense, and also fairly invasive in terms of custom matrix traversals (probabilistic fitting, for the most part). So the most specifically tweaked methodologies are really more quasi-algebraic than purely algebraic, unfortunately. Having LAPACK blockwise operators on dense matrices would not help me terribly there.

But the architectural problem in terms of foundation and, more specifically, customization of processes IMO does exist here (in mllib). This thread (and there was another one just like it a few threads below this one) reads to me as a manifestation of such a lack of algebraic foundation APIs/optimizers.
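For readers unfamiliar with the bindings under discussion, a hedged illustration of the R-like DSL and deferred optimization; the imports follow the sparkbindings page cited below, but exact names may vary by Mahout version, and drmA is assumed to be a distributed row matrix loaded elsewhere.

    import org.apache.mahout.math._
    import scalabindings._
    import RLikeOps._
    import drm._
    import RLikeDrmOps._

    // Nothing below executes eagerly; the optimizer first rewrites the whole
    // expression (A'A is a classic case that gets its own physical plan).
    val drmAtA = drmA.t %*% drmA
    val inCore = drmAtA.collect   // bring the (small) Gram matrix in-core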
Re: MLLib : Math on Vector and Matrix
In my humble opinion, Spark should've supported linalg a la [1] before it even started dumping methodologies into mllib. [1] http://mahout.apache.org/users/sparkbindings/home.html

On Wed, Jul 2, 2014 at 2:16 PM, Thunder Stumpges thunder.stump...@gmail.com wrote:

Thanks. I always hate having to do stuff like this. It seems like they went a bit overboard with all the private[mllib] declarations... possibly all in the name of "thou shalt not change your public API". If you don't make your public API usable, we end up having to work around it anyway... Oh well. Thunder

On Wed, Jul 2, 2014 at 2:05 PM, Koert Kuipers ko...@tresata.com wrote:

I did the second option: re-implemented .toBreeze as .breeze using pimp classes.

On Wed, Jul 2, 2014 at 5:00 PM, Thunder Stumpges thunder.stump...@gmail.com wrote:

I am upgrading from Spark 0.9.0 to 1.0, and I had a pretty good amount of code working with the internals of MLlib. One of the big changes was the move from the old jblas.Matrix to the Vector/Matrix classes included in MLlib. However, I don't see how we're supposed to use them for ANYTHING other than as a container for passing data to the included APIs... how do we do any math on them?

Looking at the internal code, there are quite a number of private[mllib] declarations, including access to the Breeze representations of the classes. Was there a good reason this was not exposed? I could see maybe not wanting to expose the toBreeze function, which would tie it to the breeze implementation; however, it would be nice to have the various mathematics wrapped at least. Right now I see no way to code any vector/matrix math without moving my code namespaces into org.apache.spark.mllib or duplicating the code of toBreeze in my own util functions. Not very appealing. What are others doing? thanks, Thunder
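A hedged sketch of the enrichment ("pimp") approach Koert mentions, written against the Spark 1.0 MLlib classes; it stays outside the org.apache.spark.mllib namespace entirely. The object and method names are this sketch's own.

    import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
    import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector}

    object VectorOps {
      implicit class RichVector(val v: Vector) extends AnyVal {
        // Mirror of the private toBreeze, re-implemented on public fields.
        def breeze: BV[Double] = v match {
          case d: DenseVector  => new BDV(d.values)
          case s: SparseVector => new BSV(s.indices, s.values, s.size)
        }
      }
    }

    // usage:
    //   import VectorOps._
    //   val sum = vecA.breeze + vecB.breeze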
Re: Why Scala?
There were a few known concerns about Scala, and some still are, but having been doing Scala professionally for over two years now, I have learned to master and appreciate the advantages.

The major concern IMO is Scala in a less-than-scrupulous corporate environment. First, Scala requires significantly more discipline in commenting and style than java does to stay painlessly readable. People with less than stellar code hygiene can easily turn a project into an unmaintainable mess. Second, from a corporate management perspective, it is (still?) much harder to staff with Scala coders as opposed to Java ones. All these things are a headache for corporate bosses, but for public and academic projects with thorough peer review and an increased desire for contributors to look clean in public, it works out quite well, and the strong sides really shine.

Spark specifically builds around FP patterns -- such as monads and functors -- which were absent in java prior to 8 (and I am not sure they are as well worked out in java 8 collections even now, as opposed to Scala collections). So java 8 simply comes a little late to the show in that department. Also, FP is not the only thing Spark uses; Spark also uses stuff like implicits and the akka/agent framework for IPC. Let's not forget that FP, albeit important, is only one out of many stories in Scala in the grand scale of things.

On Thu, May 29, 2014 at 1:55 PM, Nick Chammas nicholas.cham...@gmail.com wrote:

I recently discovered Hacker News and started reading through older posts about Scala (https://hn.algolia.com/?q=scala#!/story/forever/0/scala). It looks like the language is fairly controversial on there, and it got me thinking. Scala appears to be the preferred language to work with in Spark, and Spark itself is written in Scala, right? I know that oftentimes a successful project evolves gradually out of something small, and that the choice of programming language may not always have been made consciously at the outset. But pretending that it was, why is Scala the preferred language of Spark? Nick

--
View this message in context: Why Scala? (http://apache-spark-user-list.1001560.n3.nabble.com/Why-Scala-tp6536.html)
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to use Mahout VectorWritable in Spark.
PS: a spark shell with all proper imports is also supported natively in Mahout (the mahout spark-shell command). See M-1489 for specifics. There's also a tutorial somewhere, but I suspect it has not yet been finished/published via a public link. Again, you need trunk to use the spark shell there.

On Wed, May 14, 2014 at 12:43 AM, Stuti Awasthi stutiawas...@hcl.com wrote:

Hi Xiangrui, Thanks for the response. I tried a few ways to include the mahout-math jar while launching the Spark shell, but no success. Can you please point out what I am doing wrong?
1. mahout-math.jar exported in CLASSPATH and PATH
2. Tried launching the Spark shell by: MASTER=spark://HOSTNAME:PORT ADD_JARS=~/installations/work-space/mahout-math-0.7.jar spark-0.9.0/bin/spark-shell

After launching, I checked the environment details on the WebUI, and it looks like the mahout-math jar is included:
spark.jars /home/hduser/installations/work-space/mahout-math-0.7.jar

Then I try:
scala> import org.apache.mahout.math.VectorWritable
console:10: error: object mahout is not a member of package org.apache
scala> val raw = sc.sequenceFile("/stuti/ML/Clustering/KMeans/HAR/KMeans_dataset_seq/part-r-0", classOf[Text], classOf[VectorWritable])
console:12: error: not found: type Text

I'm using Spark 0.9, Hadoop 1.0.4 and Mahout 0.7. Thanks, Stuti

-----Original Message----- From: Xiangrui Meng [mailto:men...@gmail.com] Sent: Wednesday, May 14, 2014 11:56 AM To: user@spark.apache.org Subject: Re: How to use Mahout VectorWritable in Spark.

You need
val raw = sc.sequenceFile(path, classOf[Text], classOf[VectorWritable])
to load the data. After that, you can do
val data = raw.values.map(_.get)
to get an RDD of Mahout's Vector. You can use `--jar mahout-math.jar` when you launch spark-shell to include mahout-math. Best, Xiangrui

On Tue, May 13, 2014 at 10:37 PM, Stuti Awasthi stutiawas...@hcl.com wrote:

Hi All, I am very new to Spark and trying to play around with MLlib, hence apologies for the basic question. I am trying to run the KMeans algorithm using Mahout and Spark MLlib to see the performance. The initial data size was 10 GB. Mahout converts the data into a sequence file of <Text, VectorWritable>, which is used for KMeans clustering. The sequence file created was ~6 GB in size.

Now I wanted to see if I can use the Mahout sequence file to be executed in Spark MLlib for KMeans. I have read that SparkContext.sequenceFile may be used here. Hence I tried to read my sequence file as below but am getting the error:

Command on Spark shell:
scala> val data = sc.sequenceFile[String,VectorWritable]("/KMeans_dataset_seq/part-r-0", String, VectorWritable)
console:12: error: not found: type VectorWritable

Here I have 2 questions:
1. Mahout has "Text" as the key but Spark is printing "not found: type Text", hence I changed it to String. Is this correct?
2. How will VectorWritable be found in Spark? Do I need to include a Mahout jar in the classpath, or is there another option?
Please suggest. Regards, Stuti Awasthi
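Putting Xiangrui's two snippets together, a hedged end-to-end sketch. On Spark 0.9 the jar typically needs to be visible to both the driver (SPARK_CLASSPATH) and the executors (ADD_JARS), which would explain the import failing in the shell above even though spark.jars was set; treat the launch line as an assumption.

    // Launch, with both settings pointing at the same jar (assumption):
    //   SPARK_CLASSPATH=mahout-math-0.7.jar ADD_JARS=mahout-math-0.7.jar \
    //     MASTER=spark://HOSTNAME:PORT spark-0.9.0/bin/spark-shell

    import org.apache.hadoop.io.Text
    import org.apache.mahout.math.{Vector => MahoutVector, VectorWritable}

    val raw = sc.sequenceFile("/KMeans_dataset_seq/part-r-0",
                              classOf[Text], classOf[VectorWritable])
    val data = raw.values.map(_.get)   // RDD of Mahout's Vector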
Re: How to use Mahout VectorWritable in Spark.
PPS: The shell/spark tutorial I've mentioned is actually being developed in MAHOUT-1542. As it stands, I believe it is now complete in its core.

On Wed, May 14, 2014 at 5:48 PM, Dmitriy Lyubimov dlie...@gmail.com wrote:

PS: a spark shell with all proper imports is also supported natively in Mahout (the mahout spark-shell command). See M-1489 for specifics. There's also a tutorial somewhere, but I suspect it has not yet been finished/published via a public link. Again, you need trunk to use the spark shell there.
Re: How to use Mahout VectorWritable in Spark.
Mahout now supports doing its distributed linalg natively on Spark, so the problem of loading sequence file input into Spark is already solved there (trunk, http://mahout.apache.org/users/sparkbindings/home.html, the drmFromHDFS() call -- and then you can get at the direct rdd via the drm's rdd property if needed). If you specifically want to ensure interoperability with MLlib, however, I did not try that. Mahout's linalg bindings to Spark work with the Kryo serializer only, so if/when MLlib algorithms do not support the Kryo serializer, it would not be interoperable. -d
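A hedged sketch of the Mahout-native path just described; the method and import names are taken from the sparkbindings page and may differ slightly by version, and the master URL and path are placeholders.

    import org.apache.mahout.math.drm._
    import org.apache.mahout.sparkbindings._

    // A Mahout distributed context wrapping a Spark context (Kryo is
    // configured by Mahout here, per the caveat above).
    implicit val ctx = mahoutSparkContext(masterUrl = "spark://HOSTNAME:7077",
                                          appName = "drm-load")

    val drmData = drmFromHDFS("/KMeans_dataset_seq")  // <Text, VectorWritable> input
    val underlying = drmData.rdd                      // direct Spark RDD, per above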
Re: Spark - ready for prime time?
On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash and...@andrewash.com wrote:

The biggest issue I've come across is that the cluster is somewhat unstable when under memory pressure. Meaning that if you attempt to persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll often still get OOMs. I had to carefully modify some of the space tuning parameters and GC settings to get some jobs to even finish. The other issue I've observed is that if you group on a key that is highly skewed, with a few massively common keys and a long tail of rare keys, the one massive key can be too big for a single machine and again cause OOMs.

My take on it: Spark doesn't believe in sort-and-spill things to enable super long groups, and IMO for a good reason. Here are my thoughts:

(1) In my work I don't need sort in 99% of the cases; I only need group, which absolutely doesn't need the spill that makes things slow down to a crawl.

(2) If that's an aggregate (such as a group count), use combineByKey(), not groupByKey -- this will do tons of good on memory use.

(3) If you really need groups that don't fit into memory, that is always because you want to do something with them other than aggregation, e.g. build an index of that grouped data. We actually had a case just like that. In this case your friend is really not groupBy, but rather partitionBy. I.e. what happens there: you build a quick count sketch, perhaps on downsampled data, to figure out which keys have a sufficiently big count -- and then you build a partitioner that redirects the large groups to a dedicated map(). Assuming this map doesn't try to load things in memory but rather does something like a streaming BTree build, that should be fine. In certain situations such processing may require splitting a super large group even further into smaller subgroups (e.g. a partitioned BTree structure), at which point you should be fine even from a uniform-load point of view. It takes a little jiu-jitsu to do it all, but it is not Spark's fault here; it did not promise to do all this for you in the groupBy contract. (A sketch of (2) and (3) follows after this thread.)

I'm hopeful that off-heap caching (Tachyon) could fix some of these issues. Just my personal experience, but I've observed significant improvements in stability since even the 0.7.x days, so I'm confident that things will continue to get better as long as people report what they're seeing so it can get fixed. Andrew

On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert alex.boisv...@gmail.com wrote:

I'll provide answers from our own experience at Bizo. We've been using Spark for 1+ year now and have found it generally better than previous approaches (Hadoop + Hive mostly).

On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth andras.nem...@lynxanalytics.com wrote:

I. Is it too much magic? Lots of things just work right in Spark, and it's extremely convenient and efficient when it indeed works. But should we be worried that customization is hard if the built-in behavior is not quite right for us? Are we to expect hard-to-track-down issues originating from the black box behind the magic?

I think this goes back to understanding Spark's architecture, its design constraints and the problems it explicitly set out to address. If the solution to your problems can be easily formulated in terms of the map/reduce model, then it's a good choice. You'll want your customizations to go with (not against) the grain of the architecture.

II. Is it mature enough? E.g.
we've created a pull request (https://github.com/apache/spark/pull/181) which fixes a problem that we were very surprised no one ever stumbled upon before. So that's why I'm asking: is Spark already being used in professional settings? Can one already trust it to be reasonably bug-free and reliable?

There are lots of ways to use Spark, and not all of the features are necessarily at the same level of maturity. For instance, we put all the jars on the main classpath, so we've never run into the issue your pull request addresses.

We definitely use and rely on Spark on a professional basis. We have 5+ Spark jobs running nightly on Amazon's EMR, slicing through GBs of data. Once we got them working with the proper configuration settings, they have been running reliably since. I would characterize our use of Spark as "a better Hadoop", in the sense that we use it for batch processing only, no streaming yet. We're happy it performs better than Hadoop, but we don't require/rely on its memory caching features. In fact, for most of our jobs it would simplify our lives if Spark wouldn't cache so many things in memory, since it would make configuration/tuning a lot simpler and jobs would run successfully on the first try instead of having to tweak things (# of partitions and such).

So, to the concrete issues. Sorry for the long mail, and let me know if I should break this out into more threads or if there is some other way to have this discussion... 1. Memory management The
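A hedged sketch of points (2) and (3) from the reply above: aggregate with combineByKey instead of materializing groups, and isolate known-hot keys with a custom partitioner. `pairs` is assumed to be an RDD[(String, String)], and the key names and partition counts are made up.

    import org.apache.spark.Partitioner

    // (2) A group count as an aggregation: no group is ever materialized.
    val counts = pairs.combineByKey(
      (v: String) => 1L,                   // createCombiner
      (acc: Long, v: String) => acc + 1L,  // mergeValue
      (a: Long, b: Long) => a + b)         // mergeCombiners

    // (3) Route known-heavy keys to dedicated partitions.
    class SkewAwarePartitioner(normal: Int, hot: Map[String, Int])
        extends Partitioner {
      def numPartitions: Int = normal + hot.size
      def getPartition(key: Any): Int = {
        val k = key.asInstanceOf[String]
        // Hot keys get partitions [normal, normal + hot.size); the rest hash.
        hot.getOrElse(k, ((k.hashCode % normal) + normal) % normal)
      }
    }

    val hot = Map("megakey" -> 32)   // e.g. discovered via a count sketch
    val routed = pairs.partitionBy(new SkewAwarePartitioner(32, hot))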
Re: Multi master Spark
The only way I know to do this is to use Mesos with ZooKeeper. You specify the ZooKeeper URL as the Spark master URL, and it can contain multiple ZooKeeper hosts. Multiple Mesos masters are then elected through ZooKeeper leader election; when the current leader dies, Mesos elects another master (if any are left). IIRC, in this mode a Spark master never runs; the executors are spun up by the Mesos slaves directly.

On Wed, Apr 9, 2014 at 3:08 PM, Pradeep Ch pradeep.chanum...@gmail.com wrote:

Hi, I want to enable Spark Master HA in Spark. The documentation specifies that we can do this with the help of ZooKeeper. But what I am worried about is how to configure one master with the other, and similarly how do the workers know that they have two masters? Where do you specify the multi-master information? Thanks for the help. Thanks, Pradeep
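A hedged sketch of the Mesos/ZooKeeper wiring described above; the host names are placeholders, and the URL format follows the Spark-on-Mesos docs of that era.

    import org.apache.spark.{SparkConf, SparkContext}

    // The master URL names the ZooKeeper ensemble, not any single Mesos master;
    // whichever Mesos master currently holds ZK leadership is discovered from it.
    val conf = new SparkConf()
      .setMaster("mesos://zk://zk1:2181,zk2:2181,zk3:2181/mesos")
      .setAppName("ha-app")
    val sc = new SparkContext(conf)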