Hi Pat,

I had some work to do on the input side for the item-similarity job, so I'm just getting around to running it on a cluster now. I'm building from source from the 0.10 release against Hadoop 2.4.0. The version of Spark we are using is 1.1, installed with the EMR install-spark script. The EMR AMI version is 3.6.0.
I haven't got a complete run yet, but I will update here when I have. We're running into some issues with getting the job to run on all nodes of the cluster and were wondering:

1. Does the itemsimilarity job run on all nodes when using YARN?
2. We are passing in yarn-client as the master, is that correct?

This is the command we are running -

./bin/mahout spark-itemsimilarity --input hdfs://xx.xx.xx.xx:9000/userUrls --output hdfs://xx.xx.xx.xx:9000/output --master yarn-client -sem 16g -D:spark.akka.frameSize=30

We ran into a problem with the akka frame size, which is why it is increased.

Thanks

On Mon, Apr 6, 2015 at 7:11 PM, Pat Ferrel <p...@occamsmachete.com> wrote:
> Michael,
>
> There is a fix in the latest source on Github.
>
> If you'd like to try it:
>
> mvn clean install -DskipTests  # there is a failing test at present, so skip them
>
> Add your version of hadoop if needed, consult here:
> http://mahout.apache.org/developers/buildingmahout.html
>
> From my side, spark-itemsimilarity completes on the data you gave me. If you get a chance to try it, can you report what happened, your Spark and Hadoop versions, and your number of cluster nodes?
>
> Thanks
>
> On Apr 5, 2015, at 10:30 AM, Pat Ferrel <p...@occamsmachete.com> wrote:
>
> Michael,
>
> The problem is in partitioning the data, and if you start with one file the partitions are created fine. With a bunch of small files, the optimizer trips up by not catching a range of size=0. This will be fixed in 0.10.1, but for now (0.10.0) you can:
>
> 1) concat the files into one
> 2) I can repartition explicitly with a specified number of partitions (I verified this works on your input) and then allow you to pass that in, since it will depend on your cluster and data somewhat. Not sure this will always work.
> 3) we are still looking for a better workaround, so wait a few days...
>
> On Apr 3, 2015, at 12:48 PM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:
>
> No, I don't think there's a workaround. It needs a fix; however, in the public version there are many more fixes needed, so I think this part will be refactored completely in 0.10.1.
>
> On Fri, Apr 3, 2015 at 12:38 PM, Pat Ferrel <p...@occamsmachete.com> wrote:
>
>> OK, it was. Is there a workaround I can try?
>>
>> On Apr 3, 2015, at 12:22 PM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:
>>
>> Although... I am not aware of one in A'A.
>>
>> It could be a faulty vector length in a matrix, if the matrix was created by drmWrap with an explicit specification of ncol.
>>
>> On Fri, Apr 3, 2015 at 12:20 PM, Dmitriy Lyubimov <dlie...@gmail.com> wrote:
>>
>>> It's a bug. There are a number of similar ones in operator A'B.
>>>
>>> On Fri, Apr 3, 2015 at 6:23 AM, Michael Kelly <mich...@onespot.com> wrote:
>>>
>>>> Hi Pat,
>>>>
>>>> I've done some further digging and it looks like the problem is occurring when the input files are split up into parts. The input to the item-similarity job is the output from a Spark job and it ends up in about 2000 parts (on the Hadoop file system). I have reproduced the error locally using a small subset of the rows.
>>>>
>>>> This is a snippet of the file I am using -
>>>>
>>>> ...
>>>> 5138353282348067470,1891081885
>>>> 4417954190713934181,1828065687
>>>> 133682221673920382,1454844406
>>>> 133682221673920382,1129053737
>>>> 133682221673920382,548627241
>>>> 133682221673920382,1048452021
>>>> 8547417492653230933,1121310481
>>>> 7693904559640861382,1333374361
>>>> 7204049418352603234,606209305
>>>> 139299176617553863,467181330
>>>> ...
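[For reference, a minimal sketch of workaround 1 that Pat lists above - coalescing the many part files into a single input file before running the CLI job. This is not from the thread; the paths and app name are placeholders, and only the standard Spark 1.1 RDD API is assumed.]

import org.apache.spark.{SparkConf, SparkContext}

// Merge the ~2000 part-* files into one text file so spark-itemsimilarity
// sees a single input file instead of many small splits.
object CoalesceUserItemPairs {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("coalesce-user-item-pairs"))
    sc.textFile("hdfs:///userUrls")              // directory containing the part files (placeholder path)
      .coalesce(1)                               // one partition => one output part file
      .saveAsTextFile("hdfs:///userUrls-single") // pass this directory to --input
    sc.stop()
  }
}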
>>>> When I run the item-similarity job against a single input file which contains all the rows, the job succeeds without error.
>>>>
>>>> When I break up the input file into 100 parts and use the directory containing them as input, I get the 'Index outside allowable range' exception.
>>>>
>>>> Here are the input files that I used, tarred and gzipped -
>>>>
>>>> https://s3.amazonaws.com/static.onespot.com/mahout/passing_single_file.tar.gz
>>>> https://s3.amazonaws.com/static.onespot.com/mahout/failing_split_into_100_parts.tar.gz
>>>>
>>>> There are 44067 rows in total, 11858 unique userIds and 24166 unique itemIds.
>>>>
>>>> This is the exception that I see on the 100-part run -
>>>>
>>>> 15/04/03 12:07:09 ERROR Executor: Exception in task 0.0 in stage 9.0 (TID 707)
>>>> org.apache.mahout.math.IndexException: Index 24190 is outside allowable range of [0,24166)
>>>> at org.apache.mahout.math.AbstractVector.viewPart(AbstractVector.java:147)
>>>> at org.apache.mahout.math.scalabindings.VectorOps.apply(VectorOps.scala:37)
>>>> at org.apache.mahout.sparkbindings.blas.AtA$$anonfun$5$$anonfun$apply$6.apply(AtA.scala:152)
>>>> at org.apache.mahout.sparkbindings.blas.AtA$$anonfun$5$$anonfun$apply$6.apply(AtA.scala:149)
>>>> at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376)
>>>> at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376)
>>>> at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
>>>> at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
>>>> at scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980)
>>>> at scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980)
>>>> at scala.collection.immutable.StreamIterator$LazyCell.v$lzycompute(Stream.scala:969)
>>>> at scala.collection.immutable.StreamIterator$LazyCell.v(Stream.scala:969)
>>>> at scala.collection.immutable.StreamIterator.hasNext(Stream.scala:974)
>>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>> at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
>>>> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
>>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> I tried splitting the file into 10, 20 and 50 parts and the job completed.
>>>>
>>>> Also, should the resulting similarity matrix be the same whether the input is split up or not?
>>>> I passed in the same random seed for the Spark job, but the matrices were different.
>>>>
>>>> Thanks,
>>>>
>>>> Michael
>>>>
>>>> On Thu, Apr 2, 2015 at 6:56 PM, Pat Ferrel <p...@occamsmachete.com> wrote:
>>>>> The input must be tuples (if not using a filter), so the CLI you have expects user and item ids like:
>>>>>
>>>>> user-id1,item-id1
>>>>> user-id500,item-id3000
>>>>> …
>>>>>
>>>>> The ids must be tokenized because it doesn't use a full csv parser, only lines of delimited text.
>>>>>
>>>>> If this doesn't help, can you supply a snippet of the input?
>>>>>
>>>>> On Apr 2, 2015, at 10:39 AM, Michael Kelly <mich...@onespot.com> wrote:
>>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm running the spark-itemsimilarity job from the CLI on an AWS EMR cluster, and I'm running into an exception.
>>>>>
>>>>> The input file format is UserId<tab>ItemId1<tab>ItemId2<tab>ItemId3......
>>>>>
>>>>> There is only one row per user, and a total of 97,000 rows.
>>>>>
>>>>> I also tried input with one row per UserId/ItemId pair, which had about 250,000 rows, but I saw a similar exception; this time the out-of-bounds index was around 110,000.
>>>>>
>>>>> The input is stored in HDFS and this is the command I used to start the job -
>>>>>
>>>>> mahout spark-itemsimilarity --input userItems --output output --master yarn-client
>>>>>
>>>>> Any idea what the problem might be?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Michael
>>>>>
>>>>> 15/04/02 16:37:40 WARN TaskSetManager: Lost task 1.0 in stage 10.0 (TID 7631, ip-XX.XX.ec2.internal): org.apache.mahout.math.IndexException: Index 22050 is outside allowable range of [0,21997)
>>>>> org.apache.mahout.math.AbstractVector.viewPart(AbstractVector.java:147)
>>>>> org.apache.mahout.math.scalabindings.VectorOps.apply(VectorOps.scala:37)
>>>>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$5$$anonfun$apply$6.apply(AtA.scala:152)
>>>>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$5$$anonfun$apply$6.apply(AtA.scala:149)
>>>>> scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376)
>>>>> scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376)
>>>>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
>>>>> scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
>>>>> scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980)
>>>>> scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980)
>>>>> scala.collection.immutable.StreamIterator$LazyCell.v$lzycompute(Stream.scala:969)
>>>>> scala.collection.immutable.StreamIterator$LazyCell.v(Stream.scala:969)
>>>>> scala.collection.immutable.StreamIterator.hasNext(Stream.scala:974)
>>>>> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:144)
>>>>> org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
>>>>> org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>> org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>> java.lang.Thread.run(Thread.java:745)
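[For reference, a minimal sketch of converting the one-row-per-user input described in the last message above (UserId<tab>ItemId1<tab>ItemId2...) into the one-pair-per-line tuples Pat describes. This is not from the thread; paths and the app name are placeholders, and only the standard Spark RDD API is assumed.]

import org.apache.spark.{SparkConf, SparkContext}

// Expand each "UserId<tab>Item1<tab>Item2..." row into one "userId,itemId"
// line per item, the delimited-tuple layout spark-itemsimilarity expects
// when no filter is used.
object RowsToUserItemPairs {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rows-to-user-item-pairs"))
    sc.textFile("hdfs:///userItems")               // placeholder input path
      .flatMap { line =>
        val fields = line.split("\t").filter(_.nonEmpty)
        if (fields.length < 2) Nil                 // skip malformed or empty rows
        else fields.tail.map(fields.head + "," + _).toList
      }
      .saveAsTextFile("hdfs:///userItemPairs")     // placeholder output path
    sc.stop()
  }
}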