So after getting to know Spark a bit better and some further digging, I now believe this is down to https://issues.apache.org/jira/browse/SPARK-2075.
I thought I could work around this, by using Mahout as a library and submitting it as a standard Spark job. Unfortunately, I can't work out how to express a dependency on the 1.0-SNAPSHOT appropriately, at least with SBT, which is my normal build tool. Is there an example build file for using the snapshot version as a library?

Thanks,

Phil

On Wed, Sep 17, 2014 at 3:11 AM, Pat Ferrel <[email protected]> wrote:

> Hmm, well if that’s so then you are also able to see the data since you’re
> reading and writing to the same S3 location in either case. The only
> difference is the Spark master and therefore perhaps a Spark issue? Not
> sure I can help much more. I don’t have access to the same setup as you
> have. Is the Spark community able to help or at least throw the ball back
> in my court?
>
> Does the debug output indicate that the read and computation went ok? Does
> it look the same as running local? No new warnings earlier in the run? BTW
> to get local to use multiple cores run with master set to something like
> “local[4]”.
>
> On Sep 16, 2014, at 1:22 PM, Phil Wills <[email protected]> wrote:
>
> No, by local I mean running on one a large ec2 box spun up by the same
> script, but running the 'mahout spark-itemsimilarity' command without a
> master specified, so that it runs locally to that box, so I'm confident
> about the versions being the same in local to that box and distributed
> across the cluster modes. Apologies for the lack of clarity.
>
> Phil
>
> On Tue, Sep 16, 2014 at 7:48 PM, Pat Ferrel <[email protected]> wrote:
>
> > By local I assume you are talking about your dev machine, not one of the
> > cluster machines.
> >
> > Excuse me if I’m stating the obvious but you are using two completely
> > different Spark and Hadoop installations, one local and one remote. They
> > could be completely different codebases. Just because you have configured
> > Spark and Hadoop to execute locally doesn’t mean they work remotely.
> > It sounds like you are using the CLI on your dev machine, which is set to
> > run locally, and passing a remote Spark master URI and S3 URI to the local
> > Mahout script. I would install and set up Mahout on your cluster master,
> > make sure MAHOUT_LOCAL is not set there since you will be using a cluster,
> > and execute the CLI from there.
> >
> > Furthermore are you sure that the remote Spark cluster can see the S3
> > data? Ssh to the Spark master and do something like “hadoop fs -ls” or
> > supply the URI to verify that the Hadoop config on the remote cluster,
> > which is what the remote Spark will use, can get to the data.
> >
> >
> > On Sep 15, 2014, at 2:28 PM, Phil Wills <[email protected]> wrote:
> >
> > The data and s3n file system is OK, since when I run 'locally' that's just
> > without a master specified, but otherwise identically, it works fine. I've
> > been using the spark-ec2 scripts to retrieve spark and hadoop, so had
> > assumed that meant they were operating compatible versions, but I'm not
> > specifying which hadoop to use explicitly, so I don't know if that has an
> > effect.
> >
> > Phil
> >
> > On Mon, Sep 15, 2014 at 7:25 PM, Pat Ferrel <[email protected]> wrote:
> >
> >> It should handle this input, no surprise.
> >>
> >> Spark must be compiled for the correct version of Hadoop that you are
> >> using (Mahout also). I’d make sure Spark is working properly with your
> >> HDFS by trying one of their examples if you haven’t already. Running
> >> locally may not be using the same version of Hadoop, have you checked
> >> that?
> >>
> >> A filenamePattern of ‘.*’ will get all files in
> >> s3n://recommendation-logs/2014/09/06 and you have it set to search
> >> recursively. Check to make sure this is what you want. Did you use the
> >> same dir structure as you have on s3n when you ran locally?
> >> Since this driver looks at text files it can think it is working on data
> >> if it finds “[\t, ]” a tab, comma, or space in the line when it’s reading
> >> garbage so you should be sure it is working on only the files you want.
> >> Tell it to look for only a tab if that’s what you are using or use a
> >> regex to match the entire filename like “^part.*” or “.*log”.
> >>
> >> I have not tested with s3n:// URIs. I assume you can read all these with
> >> the hadoop tools like “hadoop fs -ls s3n://recommendation-logs/2014/09/06”?
> >>
> >> off-list I’ll send a link to epinions data formatted for Mahout. You can
> >> try putting that in HDFS via sn3 and running it because I have tested
> >> that on a cluster. It is all in one file though so if there is a problem
> >> in file discovery it won’t show up.
> >>
> >>
> >> On Sep 15, 2014, at 9:10 AM, Phil Wills <[email protected]> wrote:
> >>
> >> Tried running locally on a reasonably beefy machine and it worked fine.
> >> Which is the toy data, you're referring to?
> >>
> >> JAVA_HOME=/usr/lib/jvm/jre-1.7.0-openjdk.x86_64 SPARK_HOME=/root/spark
> >> MAHOUT_HOME=. bin/mahout spark-itemsimilarity --input
> >> s3n://recommendation-logs/2014/09/06 --output
> >> s3n://recommendation-outputs/2014/09/06 --filenamePattern '.*' --recursive
> >> --master spark://ec2-54-75-13-36.eu-west-1.compute.amazonaws.com:7077
> >> --sparkExecutorMem 6g
> >>
> >> and the working version running locally on a beefier box:
> >>
> >> JAVA_HOME=/usr/lib/jvm/jre-1.7.0-openjdk.x86_64 SPARK_HOME=/root/spark
> >> MAHOUT_HOME=.
> >> MAHOUT_HEAPSIZE=16000 bin/mahout spark-itemsimilarity --input
> >> s3n://ophan-recommendation-logs/2014/09/06 --output
> >> s3n://ophan-recommendation-outputs/2014/09/06 --filenamePattern '.*'
> >> --recursive --sparkExecutorMem 16g
> >>
> >> Sample input:
> >>
> >> nnS1dIIBBtTnehVD79lgYeBw http://www.example.com/world/2014/sep/05/malaysia-airlines-mh370-six-months-chinese-families-lack-answers
> >>
> >> ikFSk14vHrTPqjSISvMihDUg http://www.example.com/world/2014/sep/05/obama-core-coalition-10-countries-to-fight-isis
> >>
> >> edqu8kfgsFSg2w3MhV5rUwuQ http://www.example.com/lifeandstyle/wordofmouth/2014/sep/05/food-and-drink2?CMP=fb_gu
> >>
> >> pfnmfONG1DQWG_EOOIxUASow http://www.example.com/world/live/2014/sep/05/unresponsive-plane-f15-jets-aircraft-live-updates
> >>
> >> pfUil_W0s2TZSqojMQrVcxVw http://www.example.com/football/blog/2014/sep/05/jose-mourinho-bargain-loic-remy-chelsea-france
> >>
> >> nxTJnpyenFSP-tqWSLHQdW8w http://www.example.com/books/2014/sep/05/were-we-happier-in-the-stone-age
> >>
> >> lba37jwJVQS5GbiSuus1i6tA http://www.example.com/stage/2014/sep/05/titus-andronicus-review-visually-striking-but-flawed
> >>
> >> bEHaOzZPbtQz-X2K1wortBQQ http://www.example.com/cities/2014/sep/05/death-america-suburban-dream-ferguson-missouri-resegregation
> >>
> >> gjTGzDXiDOT5W2SThhm0tUmg http://www.example.com/world/2014/sep/05/man-jailed-phoning-texting-ex-21807-times
> >>
> >> pfFbQ5ddvBRhm0XLZbN6Xd2A http://www.example.com/sport/2014/sep/05/gloucester-northampton-premiership-rugby
> >>
> >>
> >> On Sun, Sep 14, 2014 at 4:06 PM, Pat Ferrel <[email protected]> wrote:
> >>
> >>> I wonder if it’s trying to write an empty rdd to a text file. Can you
> >>> give the CLI options and a snippet of data?
> >>>
> >>> Also have you successfully run this on the toy data in the resource dir?
> >>> There is a script to run it locally that you can adapt for running on a
> >>> cluster. This will eliminate any cluster problem.
> >>>
> >>>
> >>> On Sep 13, 2014, at 1:13 PM, Phil Wills <[email protected]> wrote:
> >>>
> >>> Here's the master log from the line with the stack trace to termination:
> >>>
> >>> 14/09/12 15:54:55 INFO scheduler.DAGScheduler: Failed to run saveAsTextFile at TextDelimitedReaderWriter.scala:288
> >>> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 8.0:3 failed 4 times, most recent failure: TID 448 on host ip-10-105-176-77.eu-west-1.compute.internal failed for unknown reason
> >>> Driver stacktrace:
> >>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
> >>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
> >>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
> >>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
> >>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
> >>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
> >>> at scala.Option.foreach(Option.scala:236)
> >>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
> >>> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
> >>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> >>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> >>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> >>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> >>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> >>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >>> 14/09/12 15:54:55 INFO scheduler.DAGScheduler: Executor lost: 8 (epoch 20)
> >>> 14/09/12 15:54:55 INFO storage.BlockManagerMasterActor: Trying to remove executor 8 from BlockManagerMaster.
> >>> 14/09/12 15:54:55 INFO storage.BlockManagerMaster: Removed 8 successfully in removeExecutor
> >>> 14/09/12 15:54:55 INFO storage.BlockManagerInfo: Registering block manager ip-10-105-176-77.eu-west-1.compute.internal:58803 with 3.4 GB RAM
> >>> 14/09/12 15:54:55 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://[email protected]:56590/user/Executor#1456047585] with ID 9
> >>>
> >>> On Sat, Sep 13, 2014 at 4:21 PM, Pat Ferrel <[email protected]> wrote:
> >>>
> >>>> It’s not an error I’ve seen but they can tend to be pretty cryptic.
> >>>> Could you post more of the stack trace?
> >>>>
> >>>> On Sep 12, 2014, at 2:55 PM, Phil Wills <[email protected]> wrote:
> >>>>
> >>>> I've tried on 1.0.1 and 1.0.2, updating the pom to 1.0.2 when running on
> >>>> that. I used the spark-ec2 scripts to set up the cluster.
> >>>>
> >>>> I might be able to share the data I'll mull it over the weekend to make
> >>>> sure there's nothing sensitive, or if there's a way I can transform it
> >>>> to that point.
> >>>>
> >>>> Phil
> >>>>
> >>>>
> >>>> On Fri, Sep 12, 2014 at 6:30 PM, Pat Ferrel <[email protected]> wrote:
> >>>>
> >>>>> The mahout pom says 1.0.1 but I’m running fine on 1.0.2
> >>>>>
> >>>>>
> >>>>> On Sep 12, 2014, at 10:08 AM, Pat Ferrel <[email protected]> wrote:
> >>>>>
> >>>>> Is it a mature Spark cluster, what version of Spark?
> >>>>>
> >>>>> If you can share the data I can try it on mine.
> >>>>>
> >>>>> On Sep 12, 2014, at 9:42 AM, Phil Wills <[email protected]> wrote:
> >>>>>
> >>>>> I've been experimenting with the fairly new ItemSimilarityDriver, which
> >>>>> is working fine up until the point it tries to write out it's results.
> >>>>> Initially I was getting an issue with the akka frameSize being too
> >>>>> small, but after expanding that I'm now getting a much more cryptic
> >>>>> error:
> >>>>>
> >>>>> 14/09/12 15:54:55 INFO scheduler.DAGScheduler: Failed to run saveAsTextFile at TextDelimitedReaderWriter.scala:288
> >>>>> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 8.0:3 failed 4 times, most recent failure: TID 448 on host ip-10-105-176-77.eu-west-1.compute.internal failed for unknown reason
> >>>>>
> >>>>> This is from the master node, but there doesn't seem to be anything more
> >>>>> intelligible in the slave node logs.
> >>>>>
> >>>>> I've tried writing to the local file system as well as s3n and can see
> >>>>> it's not an access problem, as I am seeing a zero length file appear.
> >>>>>
> >>>>> Thanks for any pointers and apologies if this would be better to ask on
> >>>>> the Spark list,
> >>>>>
> >>>>> Phil
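
For anyone reading the quoted thread later, the advice about the filename pattern and the “[\t, ]” delimiter can be checked with a small sketch. This is only an illustration in Python, not Mahout code: the file names and the helper names `matching` and `split_fields` are made up for the example, and it assumes the pattern has to match the entire filename, as the quoted advice suggests. Python's `re` stands in for the Java regex engine here; for patterns this simple the two behave the same way.

```python
import re

# Hypothetical file names, chosen only for illustration.
names = ["part-00000", "_SUCCESS", "2014-09-06.log", "notes.txt"]


def matching(pattern, candidates):
    """Return the candidates that the pattern matches in full."""
    return [name for name in candidates if re.fullmatch(pattern, name)]


def split_fields(line):
    """Split a line on any single tab, comma, or space, like "[\\t, ]"."""
    return re.split(r"[\t, ]", line)


# '.*' accepts every file, including non-data files.
print(matching(r".*", names))
# The narrower patterns from the thread keep only the intended files.
print(matching(r"^part.*", names))
print(matching(r".*log", names))
# A comma inside the second field is treated as a delimiter too.
print(split_fields("id1\thttp://www.example.com/a,b"))
```

With a tab-only delimiter the last line would stay as two fields, which is why narrowing the delimiter to just a tab is worth doing when the data is tab-separated.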
