Re: Rename filter() into keep(), remove() or take() ?
Agree that filter is perhaps unintuitive. Though the Scala collections API has filter and filterNot, which together provide context that makes it more intuitive. And yes, the change could be via added methods that don't break the existing API. Still, overall I would be -1 on this unless a significant proportion of users would find it added value. Actually, adding filterNot, while not that necessary, would make more sense in my view — Sent from Mailbox for iPhone

On Thu, Feb 27, 2014 at 3:56 PM, Bertrand Dechoux decho...@gmail.com wrote: I understand the explanation but I had to try. However, the change could be made without breaking anything, but that's another story. Regards, Bertrand Dechoux

On Thu, Feb 27, 2014 at 2:05 PM, Nick Pentreath nick.pentre...@gmail.com wrote: filter comes from the Scala collection method filter. I'd say it's best to keep in line with the Scala collections API, as Spark has done with RDDs generally (map, flatMap, take etc), so that it is easier and natural for developers to apply the same thinking from Scala (parallel) collections to Spark RDDs. Plus, such an API change would be a major breaking one and IMO not a good idea at this stage.

def filter(p: (A) => Boolean): Seq[A]
Selects all elements of this sequence which satisfy a predicate. p: the predicate used to test elements. Returns a new sequence consisting of all elements of this sequence that satisfy the given predicate p. The order of the elements is preserved.
(http://www.scala-lang.org/api/2.10.3/scala/collection/Seq.html)

On Thu, Feb 27, 2014 at 2:36 PM, Bertrand Dechoux decho...@gmail.com wrote: Hi, It might seem like a trivial issue, but even though it is somehow a standard name, filter() is not really explicit about which way it works. Sure, it makes sense to provide a filter function, but what happens when it returns true? Is the current element removed or kept? It is not really obvious. Has another name already been discussed? It could be keep() or remove(). But take() could also be reused and, instead of providing a number, the filter function could be requested. Regards, Bertrand
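For reference, a minimal sketch of the semantics being discussed (plain Scala collections here, but RDD.filter behaves the same way): the element is kept when the predicate returns true, and filterNot is the mirror image.

val nums = Seq(1, 2, 3, 4, 5)
nums.filter(_ % 2 == 0)     // elements for which the predicate is true are KEPT: Seq(2, 4)
nums.filterNot(_ % 2 == 0)  // the complement: Seq(1, 3, 5)
// On an RDD the keep-when-true semantics are identical:
// sc.parallelize(nums).filter(_ % 2 == 0).collect()  // Array(2, 4)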
Re: Running actions in loops
There is #3, which is to use mapPartitions and init one jodatime object per partition, which is less overhead for large objects — Sent from Mailbox for iPhone

On Sat, Mar 8, 2014 at 2:54 AM, Mayur Rustagi mayur.rust...@gmail.com wrote: So the whole function closure you want to apply on your RDD needs to be serializable, so that it can be serialized and sent to workers to operate on the RDD. Objects of jodatime cannot be serialized and sent, hence jodatime cannot be used directly. Two bad answers: 1. initialize jodatime for each row, complete the work, and destroy it; that way it is only initialized while the job is running and need not be sent across. 2. Write your own parser and hope the jodatime guys get their act together. Regards Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi

On Fri, Mar 7, 2014 at 12:56 PM, Ognen Duzlevski og...@nengoiksvelzud.com wrote: Mayur, have not thought of that. Yes, I use jodatime. What is the scope that this serialization issue applies to? Only the method making a call into / using such a library? The whole class the method using such a library belongs to? Sorry if it is a dumb question :) Ognen

On 3/7/14, 1:29 PM, Mayur Rustagi wrote: Mostly the job you are executing is not serializable; this typically happens when you have a library that is not serializable. Are you using any library like jodatime etc.? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi

On Thu, Mar 6, 2014 at 9:50 PM, Ognen Duzlevski og...@plainvanillagames.com wrote: It looks like the problem is in the filter task - is there anything special about filter()? I have removed the filter line from the loops just to see if things will work, and they do. Does anyone have any ideas? Thanks! Ognen

On 3/6/14, 9:39 PM, Ognen Duzlevski wrote: Hello, What is the general approach people take when trying to do analysis across multiple large files where the data to be extracted from a successive file depends on the data extracted from a previous file or set of files? For example, I have the following: a group of HDFS files, each 20+GB in size. I need to extract event1 on day 1 from the first file and extract event2 from all remaining files in a period of successive dates, then do a calculation on the two events. I then need to move on to day 2, extract event1 (with certain properties), take all following days, extract event2 and run a calculation against the previous day for all days in the period. So on and so on.
I have verified that the following (very naive) approach doesn't work:

def calcSimpleRetention(start: String, end: String, event1: String, event2: String): Map[String, List[Double]] = {
  val epd = new PipelineDate(end)
  val result = for {
    dt1 <- PipelineDate.getPeriod(new PipelineDate(start), epd)
    val f1 = sc.textFile(dt1.toJsonHdfsFileName)
    val e1 = f1.filter(_.split(",")(0).split(":")(1).replace("\"", "") == event1)
               .map(line => (line.split(",")(2).split(":")(1).replace("\"", ""), 0)).cache
    val c = e1.count.toDouble
    val intres = for {
      dt2 <- PipelineDate.getPeriod(dt1 + 1, epd)
      val f2 = sc.textFile(dt2.toJsonHdfsFileName)
      val e2 = f2.filter(_.split(",")(0).split(":")(1).replace("\"", "") == event2)
                 .map(line => (line.split(",")(2).split(":")(1).replace("\"", ""), 1))
      val e1e2 = e1.union(e2)
      val r = e1e2.groupByKey().filter(e => e._2.length > 1 && e._2.filter(_ == 0).length > 0).count.toDouble
    } yield (c / r) // get the retention rate
  } yield (dt1.toString -> intres)
  Map(result: _*)
}

I am getting the following errors:

14/03/07 03:22:25 INFO SparkContext: Starting job: count at CountActor.scala:33 14/03/07 03:22:25 INFO DAGScheduler: Got job 0 (count at CountActor.scala:33) with 140 output partitions (allowLocal=false) 14/03/07 03:22:25 INFO DAGScheduler: Final stage: Stage 0 (count at CountActor.scala:33) 14/03/07 03:22:25 INFO DAGScheduler: Parents of final stage: List() 14/03/07 03:22:25 INFO DAGScheduler: Missing parents: List() 14/03/07 03:22:25 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at map at CountActor.scala:32), which has no missing parents 14/03/07 03:22:25 INFO DAGScheduler: Failed to run count at CountActor.scala:33 14/03/07 03:22:25 ERROR OneForOneStrategy: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at
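A minimal sketch of the mapPartitions suggestion above (option #3), assuming an existing SparkContext `sc` as in the shell; the input path, date pattern and field position are made up:

import org.joda.time.format.DateTimeFormat

val lines = sc.textFile("hdfs:///some/logs")   // hypothetical input path
val parsed = lines.mapPartitions { iter =>
  // The formatter is built on the worker, once per partition, so the
  // non-serializable joda-time object never has to be shipped in the closure.
  val fmt = DateTimeFormat.forPattern("yyyy-MM-dd")
  iter.map(line => fmt.parseDateTime(line.split(",")(0)))
}
parsed.count()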
Re: Running Spark on a single machine
Please follow the instructions at http://spark.apache.org/docs/latest/index.html and http://spark.apache.org/docs/latest/quick-start.html to get started on a local machine. — Sent from Mailbox for iPhone

On Sun, Mar 16, 2014 at 11:39 PM, goi cto goi@gmail.com wrote: Hi, I know it is probably not the purpose of Spark, but the syntax is easy and cool... I need to run some Spark-like code in memory on a single machine. Any pointers on how to optimize it to run on only one machine? -- Eran | CTO
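For reference, a minimal sketch of running Spark entirely in-process on one machine by pointing the master at "local[*]"; the app name and input file are made up:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("LocalExample").setMaster("local[*]") // use all local cores
val sc = new SparkContext(conf)
val counts = sc.textFile("data.txt")   // hypothetical local file
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
counts.take(10).foreach(println)
sc.stop()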
Re: Calling Spahk enthusiasts in Boston
I would offer to host one in Cape Town, but we're almost certainly the only Spark users in the country apart from perhaps one in Johannesburg :) — Sent from Mailbox for iPhone

On Mon, Mar 31, 2014 at 8:53 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: My fellow Bostonians and New Englanders, We cannot allow New York to beat us to having a banging Spark meetup. Respond to me (and I guess also Andy?) if you are interested. Yana, I'm not sure either what is involved in organizing, but we can figure it out. I didn't know about the meetup that never took off. Nick

On Mon, Mar 31, 2014 at 2:31 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Nicholas, I'm in Boston and would be interested in a Spark group. Not sure if you know this -- there was a meetup that never got off the ground. Anyway, I'd be +1 for attending. Not sure what is involved in organizing. Seems a shame that a city like Boston doesn't have one.

On Mon, Mar 31, 2014 at 2:02 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: As in, I am interested in helping organize a Spark meetup in the Boston area.

On Mon, Mar 31, 2014 at 2:00 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Well, since this thread has played out as it has, lemme throw in a shout-out for Boston. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Calling-Spahk-enthusiasts-in-Boston-tp3544.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
NPE using saveAsTextFile
Hi I'm using Spark 0.9.0. When calling saveAsTextFile on a custom hadoop inputformat (loaded with newAPIHadoopRDD), I get the following error below. If I call count, I get the correct count of number of records, so the inputformat is being read correctly... the issue only appears when trying to use saveAsTextFile. If I call first() I get the correct output, also. So it doesn't appear to be anything with the data or inputformat. Any idea what the actual problem is, since this stack trace is not obvious (though it seems to be in ResultTask which ultimately causes this). Is this a known issue at all? == 14/04/08 16:00:46 ERROR OneForOneStrategy: java.lang.NullPointerException at com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202) at com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228) at com.typesafe.config.ConfigException.writeObject(ConfigException.java:58) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:975) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346) at scala.collection.immutable.$colon$colon.writeObject(List.scala:379) at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:975) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:28) at org.apache.spark.scheduler.ResultTask$.serializeInfo(ResultTask.scala:48) at org.apache.spark.scheduler.ResultTask.writeExternal(ResultTask.scala:123) at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1443) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1414) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at
Re: NPE using saveAsTextFile
Ok I thought it may be closing over the config option. I am using config for job configuration, but extracting vals from that. So not sure why as I thought I'd avoided closing over it. Will go back to source and see where it is creeping in. On Thu, Apr 10, 2014 at 8:42 AM, Matei Zaharia matei.zaha...@gmail.comwrote: I haven't seen this but it may be a bug in Typesafe Config, since this is serializing a Config object. We don't actually use Typesafe Config ourselves. Do you have any nulls in the data itself by any chance? And do you know how that Config object is getting there? Matei On Apr 9, 2014, at 11:38 PM, Nick Pentreath nick.pentre...@gmail.com wrote: Anyone have a chance to look at this? Am I just doing something silly somewhere? If it makes any difference, I am using the elasticsearch-hadoop plugin for ESInputFormat. But as I say, I can parse the data (count, first() etc). I just can't save it as text file. On Tue, Apr 8, 2014 at 4:50 PM, Nick Pentreath nick.pentre...@gmail.comwrote: Hi I'm using Spark 0.9.0. When calling saveAsTextFile on a custom hadoop inputformat (loaded with newAPIHadoopRDD), I get the following error below. If I call count, I get the correct count of number of records, so the inputformat is being read correctly... the issue only appears when trying to use saveAsTextFile. If I call first() I get the correct output, also. So it doesn't appear to be anything with the data or inputformat. Any idea what the actual problem is, since this stack trace is not obvious (though it seems to be in ResultTask which ultimately causes this). Is this a known issue at all? == 14/04/08 16:00:46 ERROR OneForOneStrategy: java.lang.NullPointerException at com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202) at com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228) at com.typesafe.config.ConfigException.writeObject(ConfigException.java:58) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:975) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346) at scala.collection.immutable.$colon$colon.writeObject(List.scala:379) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at java.io.ObjectStreamClass.invokeWriteObject
Re: NPE using saveAsTextFile
There was a closure over the config object lurking around - but in any case upgrading to 1.2.0 for config did the trick as it seems to have been a bug in Typesafe config, Thanks Matei! On Thu, Apr 10, 2014 at 8:46 AM, Nick Pentreath nick.pentre...@gmail.comwrote: Ok I thought it may be closing over the config option. I am using config for job configuration, but extracting vals from that. So not sure why as I thought I'd avoided closing over it. Will go back to source and see where it is creeping in. On Thu, Apr 10, 2014 at 8:42 AM, Matei Zaharia matei.zaha...@gmail.comwrote: I haven't seen this but it may be a bug in Typesafe Config, since this is serializing a Config object. We don't actually use Typesafe Config ourselves. Do you have any nulls in the data itself by any chance? And do you know how that Config object is getting there? Matei On Apr 9, 2014, at 11:38 PM, Nick Pentreath nick.pentre...@gmail.com wrote: Anyone have a chance to look at this? Am I just doing something silly somewhere? If it makes any difference, I am using the elasticsearch-hadoop plugin for ESInputFormat. But as I say, I can parse the data (count, first() etc). I just can't save it as text file. On Tue, Apr 8, 2014 at 4:50 PM, Nick Pentreath nick.pentre...@gmail.comwrote: Hi I'm using Spark 0.9.0. When calling saveAsTextFile on a custom hadoop inputformat (loaded with newAPIHadoopRDD), I get the following error below. If I call count, I get the correct count of number of records, so the inputformat is being read correctly... the issue only appears when trying to use saveAsTextFile. If I call first() I get the correct output, also. So it doesn't appear to be anything with the data or inputformat. Any idea what the actual problem is, since this stack trace is not obvious (though it seems to be in ResultTask which ultimately causes this). Is this a known issue at all? 
== 14/04/08 16:00:46 ERROR OneForOneStrategy: java.lang.NullPointerException at com.typesafe.config.impl.SerializedConfigValue.writeOrigin(SerializedConfigValue.java:202) at com.typesafe.config.impl.ConfigImplUtil.writeOrigin(ConfigImplUtil.java:228) at com.typesafe.config.ConfigException.writeObject(ConfigException.java:58) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:975) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1480) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346) at scala.collection.immutable.$colon$colon.writeObject(List.scala:379) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
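The fix being described above (pull plain values out of the Config before building the closure) looks roughly like this sketch; the key names, paths and data are made up:

import com.typesafe.config.ConfigFactory
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("config-example").setMaster("local[*]"))
val config = ConfigFactory.load()
val tag = config.getString("job.tag")          // assumed key name
val outPath = config.getString("job.output")   // assumed key name

// Only the extracted Strings are captured by the closure, so the
// non-serializable Config object never has to be shipped to the executors.
sc.parallelize(Seq("a", "b", "c"))
  .map(record => s"$tag\t$record")
  .saveAsTextFile(outPath)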
Re: StackOverflow Error when run ALS with 100 iterations
I'd also say that running for 100 iterations is a waste of resources, as ALS will typically converge pretty quickly, as in within 10-20 iterations.

On Wed, Apr 16, 2014 at 3:54 AM, Xiaoli Li lixiaolima...@gmail.com wrote: Thanks a lot for your information. It really helps me.

On Tue, Apr 15, 2014 at 7:57 PM, Cheng Lian lian.cs@gmail.com wrote: Probably this JIRA issue (https://spark-project.atlassian.net/browse/SPARK-1006) solves your problem. When running with a large iteration number, the lineage DAG of ALS becomes very deep; both DAGScheduler and the Java serializer may overflow because they are implemented in a recursive way. You may resort to checkpointing as a workaround.

On Wed, Apr 16, 2014 at 5:29 AM, Xiaoli Li lixiaolima...@gmail.com wrote: Hi, I am testing ALS using 7 nodes. Each node has 4 cores and 8G memory. The ALS program cannot run even with a very small training set (about 91 lines) due to a StackOverflow error when I set the number of iterations to 100. I think the problem may be caused by the updateFeatures method, which updates the products RDD iteratively by joining the previous products RDD. I am writing a program which has a similar update process to ALS. This problem also appeared when I iterated too many times (more than 80). The iterative part of my code is as follows: solution = outlinks.join(solution).map { ... } Has anyone had a similar problem? Thanks. Xiaoli
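A minimal sketch of the checkpointing workaround, assuming an existing SparkContext `sc`; the data and update rule are made up, and only the checkpoint-every-N-iterations pattern is the point:

sc.setCheckpointDir("/tmp/spark-checkpoints")   // assumed checkpoint location

val outlinks = sc.parallelize(Seq((1L, Seq(2L, 3L)), (2L, Seq(1L)), (3L, Seq(1L)))).cache()
var solution = sc.parallelize(Seq((1L, 1.0), (2L, 1.0), (3L, 1.0)))

for (i <- 1 to 100) {
  solution = outlinks.join(solution).map { case (id, (links, rank)) => (id, rank / links.size) }
  if (i % 10 == 0) {
    solution.checkpoint() // truncates the lineage so the DAG stays shallow
    solution.count()      // an action is needed to actually write the checkpoint
  }
}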
Re: User/Product Clustering with pySpark ALS
There's no easy way to do this currently. The pieces are there from the PySpark code for regression, which should be adaptable. But you'd have to roll your own solution. This is something I also want, so I intend to put together a pull request for this soon — Sent from Mailbox

On Tue, Apr 29, 2014 at 4:28 PM, Laird, Benjamin benjamin.la...@capitalone.com wrote: Hi all - I’m using pySpark/MLLib ALS for user/item clustering and would like to directly access the user/product RDDs (called userFeatures/productFeatures in class MatrixFactorizationModel in mllib/recommendation/MatrixFactorizationModel.scala). This doesn’t seem too complex, but it doesn’t seem like the functionality is currently available. I think it requires accessing the underlying java model like so: model = ALS.train(ratings,1,iterations=1,blocks=5) userFeatures = RDD(model.javamodel.userFeatures, sc, ???) However, I don’t know what to pass as the deserializer. I need these low-dimensional vectors as an RDD to then use in KMeans clustering. Has anyone done something similar? Ben

The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
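For reference, a hedged sketch of the same thing through the Scala API, where the feature RDDs are already exposed; the ratings data is made up and an existing SparkContext `sc` is assumed (the Python route would still need the custom plumbing described above):

import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val ratings = sc.parallelize(Seq(Rating(1, 1, 5.0), Rating(1, 2, 1.0), Rating(2, 1, 4.0)))
val model = ALS.train(ratings, 10, 10, 0.01)                     // rank, iterations, lambda
val userVectors = model.userFeatures.map { case (_, factors) => Vectors.dense(factors) }
val clusters = KMeans.train(userVectors, 2, 20)                  // k, maxIterations
clusters.clusterCenters.foreach(println)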
spark-submit / S3
Hi, I see from the docs for 1.0.0 that the new spark-submit mechanism seems to support specifying the jar with hdfs:// or http:// Does this support S3? It doesn't seem to (I have tried it on EC2 and it doesn't work):

./bin/spark-submit --master local[2] --class myclass s3n://bucket/myapp.jar args
Re: Spark on HBase vs. Spark on HDFS
Hi In my opinion, running HBase for immutable data is generally overkill in particular if you are using Shark anyway to cache and analyse the data and provide the speed. HBase is designed for random-access data patterns and high throughput R/W activities. If you are only ever writing immutable logs, then that is what HDFS is designed for. Having said that, if you replace HBase you will need to come up with a reliable way to put data into HDFS (a log aggregator like Flume or message bus like Kafka perhaps, etc), so the pain of doing that may not be worth it given you already know HBase. On Thu, May 22, 2014 at 9:33 AM, Limbeck, Philip philip.limb...@automic.com wrote: HI! We are currently using HBase as our primary data store of different event-like data. On-top of that, we use Shark to aggregate this data and keep it in memory for fast data access. Since we use no specific HBase functionality whatsoever except Putting data into it, a discussion came up on having to set up an additional set of components on top of HDFS instead of just writing to HDFS directly. Is there any overview regarding implications of doing that ? I mean except things like taking care of file structure and the like. What is the true advantage of Spark on HBase in favor of Spark on HDFS? Best Philip Automic Software GmbH, Hauptstrasse 3C, 3012 Wolfsgraben Firmenbuchnummer/Commercial Register No. 275184h Firmenbuchgericht/Commercial Register Court: Landesgericht St. Poelten This email (including any attachments) may contain information which is privileged, confidential, or protected. If you are not the intended recipient, note that any disclosure, copying, distribution, or use of the contents of this message and attached files is prohibited. If you have received this email in error, please notify the sender and delete this email and any attached files.
Re: Writing RDDs from Python Spark progrma (pyspark) to HBase
It's not possible currently to write anything other than text (or pickle files I think in 1.0.0 or if not then in 1.0.1) from PySpark. I have an outstanding pull request to add READING any InputFormat from PySpark, and after that is in I will look into OutputFormat too. What does your data look like? Any details about your use case that you could share would aid the design of this feature. N On Wed, May 28, 2014 at 3:00 PM, gaurav.dasgupta gaurav.d...@gmail.comwrote: Hi, I am unable to understand how to write data directly on HBase table from a Spark (pyspark) Python program. Is this possible in the current Spark releases? If so, can someone provide an example code snippet to do this? Thanks in advance. Regards, Gaurav -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Writing-RDDs-from-Python-Spark-progrma-pyspark-to-HBase-tp6469.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Python, Spark and HBase
Hi Tommer, I'm working on updating and improving the PR, and will work on getting an HBase example working with it. Will feed back as soon as I have had the chance to work on this a bit more. N

On Thu, May 29, 2014 at 3:27 AM, twizansk twiza...@gmail.com wrote: The code which causes the error is:

sc = SparkContext("local", "My App")
rdd = sc.newAPIHadoopFile(
    name,
    'org.apache.hadoop.hbase.mapreduce.TableInputFormat',
    'org.apache.hadoop.hbase.io.ImmutableBytesWritable',
    'org.apache.hadoop.hbase.client.Result',
    conf={"hbase.zookeeper.quorum": "my-host",
          "hbase.rootdir": "hdfs://my-host:8020/hbase",
          "hbase.mapreduce.inputtable": "data"})

The full stack trace is:

Py4JError Traceback (most recent call last) ipython-input-8-3b9a4ea2f659 in module() 7 conf={"hbase.zookeeper.quorum": "my-host", 8 "hbase.rootdir": "hdfs://my-host:8020/hbase", 9 "hbase.mapreduce.inputtable": "data"}) 10 11 /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/context.pyc in newAPIHadoopFile(self, name, inputformat_class, key_class, value_class, key_wrapper, value_wrapper, conf) 281 for k, v in conf.iteritems(): 282 jconf[k] = v --> 283 jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, name, inputformat_class, key_class, value_class, 284 key_wrapper, value_wrapper, jconf) 285 return RDD(jrdd, self, PickleSerializer()) /opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py in __getattr__(self, name) 657 else: 658 raise Py4JError('{0} does not exist in the JVM'. --> 659 format(self._fqn + name)) 660 661 def __call__(self, *args): Py4JError: org.apache.spark.api.python.PythonRDDnewAPIHadoopFile does not exist in the JVM

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Python-Spark-and-HBase-tp6142p6507.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Can't seem to link external/twitter classes from my own app
@Sean, the %% syntax in SBT should automatically add the Scala major version qualifier (_2.10, _2.11 etc) for you, so that does appear to be correct syntax for the build. I seemed to run into this issue with some missing Jackson deps, and solved it by including the jar explicitly on the driver class path:

bin/spark-submit --driver-class-path SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar --class SimpleApp SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar

Seems redundant to me, since I thought that the JAR given as argument is copied to the driver and made available. But this solved it for me so perhaps give it a try?

On Wed, Jun 4, 2014 at 3:01 PM, Sean Owen so...@cloudera.com wrote: Those aren't the names of the artifacts: http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22spark-streaming-twitter_2.10%22 The name is spark-streaming-twitter_2.10

On Wed, Jun 4, 2014 at 1:49 PM, Jeremy Lee unorthodox.engine...@gmail.com wrote: Man, this has been hard going. Six days, and I finally got a Hello World app working that I wrote myself. Now I'm trying to make a minimal streaming app based on the twitter examples, (running standalone right now while learning) and when running it like this:

bin/spark-submit --class SimpleApp SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar

I'm getting this error: Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/twitter/TwitterUtils$

Which I'm guessing is because I haven't put in a dependency on external/twitter in the .sbt, but _how_? I can't find any docs on it. Here's my build file so far:

simple.sbt
--
name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0"
libraryDependencies += "org.twitter4j" % "twitter4j-stream" % "3.0.3"
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
--

I've tried a few obvious things like adding:
libraryDependencies += "org.apache.spark" %% "spark-external" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-external-twitter" % "1.0.0"
because, well, that would match the naming scheme implied so far, but it errors.

Also, I just realized I don't completely understand if: (a) the spark-submit command _sends_ the .jar to all the workers, or (b) the spark-submit command sends a _job_ to the workers, which are supposed to already have the jar file installed (or in hdfs), or (c) the Context is supposed to list the jars to be distributed. (Is that deprecated?) One part of the documentation says: "Once you have an assembled jar you can call the bin/spark-submit script as shown here while passing your jar." but another says: "application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, an hdfs:// path or a file:// path that is present on all nodes." I suppose both could be correct if you take a certain point of view. -- Jeremy Lee BCompSci(Hons) The Unorthodox Engineers
Re: Can't seem to link external/twitter classes from my own app
The magic incantation is sbt assembly (not assemble). Actually I find maven with their assembly plugins to be very easy (mvn package). I can send a pom.xml for a skeleton project if you need — Sent from Mailbox

On Thu, Jun 5, 2014 at 6:59 AM, Jeremy Lee unorthodox.engine...@gmail.com wrote: Hmm.. That's not working so well for me. First, I needed to add a project/plugin.sbt file with the contents:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.4")

Before 'sbt/sbt assemble' worked at all. And I'm not sure about that version number, but 0.9.1 isn't working much better and 0.11.4 is the latest one recommended by the sbt project site. Where did you get your version from? Second, even when I do get it to build a .jar, spark-submit is still telling me the external.twitter library is missing. I tried using your github project as-is, but it also complained about the missing plugin.. I'm trying it with various versions now to see if I can get that working, even though I don't know anything about kafka. Hmm, and no. Here's what I get:

[info] Set current project to Simple Project (in build file:/home/ubuntu/spark-1.0.0/SparkKafka/)
[error] Not a valid command: assemble
[error] Not a valid project ID: assemble
[error] Expected ':' (if selecting a configuration)
[error] Not a valid key: assemble (similar: assembly, assemblyJarName, assemblyDirectory)
[error] assemble
[error]

I also found this project which seemed to be exactly what I was after: https://github.com/prabeesh/SparkTwitterAnalysis ...but it was for Spark 0.9, and though I updated all the version references to 1.0.0, that one doesn't work either. I can't even get it to build. *sigh* Is it going to be easier to just copy the external/ source code into my own project? Because I will... especially if creating Uberjars takes this long every... single... time...

On Thu, Jun 5, 2014 at 8:52 AM, Jeremy Lee unorthodox.engine...@gmail.com wrote: Thanks Patrick! Uberjars. Cool. I'd actually heard of them. And thanks for the link to the example! I shall work through that today. I'm still learning sbt and its many options... the last new framework I learned was node.js, and I think I've been rather spoiled by npm. At least it's not maven. Please, oh please don't make me learn maven too. (The only people who seem to like it have Software Stockholm Syndrome: I know maven kidnapped me and beat me up, but if you spend long enough with it, you eventually start to sympathize and see its point of view.)

On Thu, Jun 5, 2014 at 3:39 AM, Patrick Wendell pwend...@gmail.com wrote: Hey Jeremy, The issue is that you are using one of the external libraries and these aren't actually packaged with Spark on the cluster, so you need to create an uber jar that includes them. You can look at the example here (I recently did this for a kafka project and the idea is the same): https://github.com/pwendell/kafka-spark-example You'll want to make an uber jar that includes these packages (run sbt assembly) and then submit that jar to spark-submit. Also, I'd try running it locally first (if you aren't already) just to make the debugging simpler. - Patrick

On Wed, Jun 4, 2014 at 6:16 AM, Sean Owen so...@cloudera.com wrote: Ah sorry, this may be the thing I learned for the day. The issue is that classes from that particular artifact are missing though. Worth interrogating the resulting .jar file with jar tf to see if it made it in?
On Wed, Jun 4, 2014 at 2:12 PM, Nick Pentreath nick.pentre...@gmail.com wrote: @Sean, the %% syntax in SBT should automatically add the Scala major version qualifier (_2.10, _2.11 etc) for you, so that does appear to be correct syntax for the build. I seemed to run into this issue with some missing Jackson deps, and solved it by including the jar explicitly on the driver class path: bin/spark-submit --driver-class-path SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar --class SimpleApp SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar Seems redundant to me since I thought that the JAR as argument is copied to driver and made available. But this solved it for me so perhaps give it a try? On Wed, Jun 4, 2014 at 3:01 PM, Sean Owen so...@cloudera.com wrote: Those aren't the names of the artifacts: http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22spark-streaming-twitter_2.10%22 The name is spark-streaming-twitter_2.10 On Wed, Jun 4, 2014 at 1:49 PM, Jeremy Lee unorthodox.engine...@gmail.com wrote: Man, this has been hard going. Six days, and I finally got a Hello World App working that I wrote myself. Now I'm trying to make a minimal streaming app based on the twitter examples, (running standalone right now while learning) and when running it like this: bin/spark-submit --class SimpleApp SimpleApp/target/scala-2.10/simple-project_2.10-1.0.jar I'm getting this error
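Pulling the thread's advice together, a hedged sketch of an uber-jar build for this example; the file layout, plugin version and the AssemblyKeys import reflect how sbt-assembly 0.11.x was typically wired up and are illustrative rather than taken from the thread:

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.4")

// build.sbt
import AssemblyKeys._

assemblySettings

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"              % "1.0.0" % "provided",  // already on the cluster
  "org.apache.spark" %% "spark-streaming"         % "1.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0",               // bundled into the fat jar
  "org.twitter4j"    %  "twitter4j-stream"        % "3.0.3"
)

// then run `sbt assembly` and pass the resulting fat jar to spark-submit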
Re: Cassandra examples don't work for me
You need Cassandra 1.2.6 for the Spark examples — Sent from Mailbox

On Thu, Jun 5, 2014 at 12:02 AM, Tim Kellogg t...@2lemetry.com wrote: Hi, I’m following the directions to run the cassandra example “org.apache.spark.examples.CassandraTest” and I get this error:

Exception in thread "main" java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected at org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat.getSplits(AbstractColumnFamilyInputFormat.java:113) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:90) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:59) at org.apache.spark.rdd.PairRDDFunctions.reduceByKey(PairRDDFunctions.scala:370) at org.apache.spark.examples.CassandraTest$.main(CassandraTest.scala:100) at org.apache.spark.examples.CassandraTest.main(CassandraTest.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I’m running Cassandra version 2.0.6, and this comes from the spark-1.0.0-bin-hadoop2 distribution package. I am running the example with this commandline: bin/run-example org.apache.spark.examples.CassandraTest localhost localhost 9160 I suspect it’s because I’m running the wrong version of Cassandra, but I can’t find the correct version listed anywhere. I hope this is an easy issue to address. Much thanks, Tim
Re: compress in-memory cache?
Have you set the persistence level of the RDD to MEMORY_ONLY_SER (http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence)? If you're calling cache, the default persistence level is MEMORY_ONLY, so that setting will have no impact.

On Thu, Jun 5, 2014 at 4:41 PM, Xu (Simon) Chen xche...@gmail.com wrote: I have a working set larger than available memory, thus I am hoping to turn on rdd compression so that I can store more in memory. Strangely, it made no difference. The number of cached partitions, fraction cached, and size in memory remain the same. Any ideas? I confirmed that rdd compression wasn't on before and it was on for the second test.

scala> sc.getConf.getAll foreach println
...
(spark.rdd.compress,true)
...

I haven't tried lzo vs snappy, but my guess is that either one should provide at least some benefit.. Thanks. -Simon
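A minimal sketch of the suggestion above: spark.rdd.compress only applies to blocks stored in serialized form, so the RDD has to be persisted with MEMORY_ONLY_SER rather than the default cache()/MEMORY_ONLY. The input path is made up and an existing SparkContext `sc` is assumed:

import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile("hdfs:///big/dataset")      // hypothetical input
  .map(_.toUpperCase)
  .persist(StorageLevel.MEMORY_ONLY_SER)          // serialized in memory, so compression can apply
rdd.count()                                       // materialize into the cache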
Re: error loading large files in PySpark 0.9.0
Ah, looking at that input format it should just work out of the box using sc.newAPIHadoopFile ... Would be interested to hear if it works as expected for you (in Python you'll end up with bytearray values). N — Sent from Mailbox

On Fri, Jun 6, 2014 at 9:38 PM, Jeremy Freeman freeman.jer...@gmail.com wrote: Oh cool, thanks for the heads up! Especially for the Hadoop InputFormat support. We recently wrote a custom hadoop input format so we can support flat binary files (https://github.com/freeman-lab/thunder/tree/master/scala/src/main/scala/thunder/util/io/hadoop), and have been testing it in Scala. So I was following Nick's progress and was eager to check this out when ready. Will let you guys know how it goes. -- J -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/error-loading-large-files-in-PySpark-0-9-0-tp3049p7144.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Are scala.MatchError messages a problem?
When you use match, the match must be exhaustive; a MatchError is thrown if the match fails. That's why you usually handle the default case using case _ => ... Here it looks like you're taking the text of all statuses, which means not all of them will be commands, which means your match will not be exhaustive. The solution is either to add a default case which does nothing, or (probably better) to add a .filter such that you filter out anything that's not a command before matching. Just looking at it again, it could also be that you take x => x._2._1 ... What type is that? Should it not be a Seq if you're joining, in which case the match will also fail... Hope this helps. — Sent from Mailbox

On Sun, Jun 8, 2014 at 6:45 PM, Jeremy Lee unorthodox.engine...@gmail.com wrote: I shut down my first (working) cluster and brought up a fresh one... and it's been a bit of a horror and I need to sleep now. Should I be worried about these errors? Or did I just have the old log4j.config tuned so I didn't see them?

14/06/08 16:32:52 ERROR scheduler.JobScheduler: Error running job streaming job 1402245172000 ms.2 scala.MatchError: 0101-01-10 (of class java.lang.String) at SimpleApp$$anonfun$6$$anonfun$apply$6.apply(SimpleApp.scala:218) at SimpleApp$$anonfun$6$$anonfun$apply$6.apply(SimpleApp.scala:217) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at SimpleApp$$anonfun$6.apply(SimpleApp.scala:217) at SimpleApp$$anonfun$6.apply(SimpleApp.scala:214) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744)

The error comes from this code, which seemed like a sensible way to match things (the case cmd_plus(w) statement is generating the error):

val cmd_plus = "[+]([\\w]+)".r
val cmd_minus = "[-]([\\w]+)".r
// find command user tweets
val commands = stream.map( status => ( status.getUser().getId(), status.getText() ) )
  .foreachRDD(rdd => {
    rdd.join(superusers).map( x => x._2._1 ).collect().foreach { cmd => {
218:  cmd match {
        case cmd_plus(w) => { ... }
        case cmd_minus(w) => { ... }
      }
    }}
  })

It seems a bit excessive for scala to throw exceptions because a regex didn't match. Something feels wrong.
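A minimal sketch of the suggested fix on plain strings (the sample values are made up): a default case makes the match exhaustive, so non-command text is ignored instead of raising scala.MatchError.

val cmd_plus = "[+](\\w+)".r
val cmd_minus = "[-](\\w+)".r
val tweets = Seq("+follow", "-mute", "0101-01-10")   // made-up sample values

tweets.foreach {
  case cmd_plus(w)  => println(s"add: $w")
  case cmd_minus(w) => println(s"remove: $w")
  case _            => // default case: anything that is not a command is ignored
}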
Re: mllib, python and SVD
Don't think SVD is exposed via MLlib in Python yet, but you can also check out: https://github.com/ogrisel/spylearn where Jeremy Freeman put together a numpy-based SVD algorithm (this is a bit outdated but should still work I assume) (also https://github.com/freeman-lab/thunder has a PCA implementation). On Mon, Jun 9, 2014 at 11:32 AM, Håvard Wahl Kongsgård haavard.kongsga...@gmail.com wrote: Hi, is it possible to do Singular value decomposition (SVD) with python in spark(1.0.0)? -Havard WK
Re: Optimizing reduce for 'huge' aggregated outputs.
Can you key your RDD by some key and use reduceByKey? In fact, if you are merging a bunch of maps, you can create a set of (k, v) pairs in your mapPartitions and then reduceByKey using some merge function. The reduce will happen in parallel on multiple nodes in this case. You'll end up with just a single set of (k, v) per partition which you can reduce or collect and merge on the driver. — Sent from Mailbox

On Tue, Jun 10, 2014 at 1:05 AM, Sung Hwan Chung coded...@cs.stanford.edu wrote: I suppose what I want is the memory efficiency of toLocalIterator and the speed of collect. Is there any such thing?

On Mon, Jun 9, 2014 at 3:19 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Hello, I noticed that the final reduce function happens in the driver node with code that looks like the following:

val outputMap = mapPartitions(doSomething).reduce((a: Map, b: Map) => a.merge(b))

although individual outputs from mappers are small. Over time the aggregated result outputMap could be huuuge (say with hundreds of millions of keys and values, reaching gigabytes). I noticed that, even if we have a lot of memory in the driver node, this process becomes really slow eventually (say we have 100+ partitions: the first reduce is fast, but progressively it becomes veeery slow as more and more partition outputs get aggregated). Is this because the intermediate reduce output gets serialized and then deserialized every time? What I'd like ideally is, since the reduce is taking place on the same machine anyway, that there's no need for any serialization and deserialization, and the incoming results are just aggregated into the final aggregation. Is this possible?
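A minimal sketch of the reduceByKey suggestion, assuming an existing SparkContext `sc`; the data is made up. Each partition emits its small local map as (key, value) pairs, the merging runs on the executors, and the driver only receives the final totals:

val data = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"), numSlices = 4)

val pairs = data.mapPartitions { iter =>
  val local = scala.collection.mutable.Map[String, Long]().withDefaultValue(0L)
  iter.foreach(k => local(k) += 1)      // one small map per partition
  local.iterator                        // emitted as (key, count) pairs
}
val merged = pairs.reduceByKey(_ + _)   // merging happens in parallel on the cluster
val result = merged.collectAsMap()      // only (key, total) pairs reach the driver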
RE: Question about RDD cache, unpersist, materialization
If you want to force materialization, use .count(). Also, if you can, simply don't unpersist anything unless you really need to free the memory. — Sent from Mailbox

On Wed, Jun 11, 2014 at 5:13 AM, innowireless TaeYun Kim taeyun@innowireless.co.kr wrote: BTW, it is possible that rdd.first() does not compute all the partitions. So first() cannot be used for the situation below.

-----Original Message----- From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr] Sent: Wednesday, June 11, 2014 11:40 AM To: user@spark.apache.org Subject: Question about RDD cache, unpersist, materialization

Hi, What I (seem to) know about the RDD persisting API is as follows:
- cache() and persist() are not actions. They only do a marking.
- unpersist() is also not an action. It only removes a marking. But if the rdd is already in memory, it is unloaded.
And there seems to be no API to forcefully materialize the RDD without requesting data via an action method, for example first().

So, I am faced with the following scenario:

{
  JavaRDD<T> rddUnion = sc.parallelize(new ArrayList<T>());  // create empty for merging
  for (int i = 0; i < 10; i++) {
    JavaRDD<T2> rdd = sc.textFile(inputFileNames[i]);
    rdd.cache();  // Since it will be used twice, cache.
    rdd.map(...).filter(...).saveAsTextFile(outputFileNames[i]);  // Transform and save, rdd materializes
    rddUnion = rddUnion.union(rdd.map(...).filter(...));  // Do another transform to T and merge by union
    rdd.unpersist();  // Now it seems not needed. (But needed actually)
  }
  // Here, rddUnion actually materializes, and needs all 10 rdds that were already unpersisted.
  // So, rebuilding all 10 rdds will occur.
  rddUnion.saveAsTextFile(mergedFileName);
}

If rddUnion could be materialized before the rdd.unpersist() line and cache()d, the rdds in the loop would not be needed on rddUnion.saveAsTextFile(). Now what is the best strategy?
- Do not unpersist all 10 rdds in the loop.
- Materialize rddUnion in the loop by calling a 'light' action API, like first().
- Give up and just rebuild/reload all 10 rdds when saving rddUnion.
Is there some misunderstanding? Thanks.
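A minimal sketch of the .count() suggestion in Scala (the Java version is analogous); the data and output path are made up, and an existing SparkContext `sc` is assumed:

val rdd = sc.parallelize(1 to 1000).cache()
val transformed = rdd.map(_ * 2).cache()
transformed.count()                         // action over every partition, so `transformed` is now fully cached
rdd.unpersist()                             // safe: later work reads `transformed` from the cache, not `rdd`
transformed.saveAsTextFile("/tmp/union-out")  // hypothetical output path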
Re: Using CQLSSTableWriter to batch load data from Spark to Cassandra.
can you not use a Cassandra OutputFormat? Seems they have BulkOutputFormat. An example of using it with Hadoop is here: http://shareitexploreit.blogspot.com/2012/03/bulkloadto-cassandra-with-hadoop.html Using it with Spark will be similar to the examples: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala and https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala

On Wed, Jun 25, 2014 at 8:44 PM, Gerard Maas gerard.m...@gmail.com wrote: Hi, (My excuses for the cross-post from SO) I'm trying to create Cassandra SSTables from the results of a batch computation in Spark. Ideally, each partition should create the SSTable for the data it holds, in order to parallelize the process as much as possible (and probably even stream it to the Cassandra ring as well). After the initial hurdles with the CQLSSTableWriter (like requiring the yaml file), I'm confronted now with this issue:

java.lang.RuntimeException: Attempting to load already loaded column family customer.rawts at org.apache.cassandra.config.Schema.load(Schema.java:347) at org.apache.cassandra.config.Schema.load(Schema.java:112) at org.apache.cassandra.io.sstable.CQLSSTableWriter$Builder.forTable(CQLSSTableWriter.java:336)

I'm creating a writer on each parallel partition like this:

def store(rdd: RDD[Message]) = {
  rdd.foreachPartition( msgIterator => {
    val writer = CQLSSTableWriter.builder()
      .inDirectory("/tmp/cass")
      .forTable(schema)
      .using(insertSttmt).build()
    msgIterator.foreach(msg => {...})
  })
}

And if I'm reading the exception correctly, I can only create one writer per table in one JVM. Digging a bit further in the code, it looks like the Schema.load(...) singleton enforces that limitation. I guess writes to the writer will not be thread-safe, and even if they were, the contention that multiple threads will create by having all parallel tasks trying to dump a few GB of data to disk at the same time will defeat the purpose of using the SSTables for bulk upload anyway. So, are there ways to use the CQLSSTableWriter concurrently? If not, what is the next best option to load batch data at high throughput into Cassandra? Will the upcoming Spark-Cassandra integration help with this? (i.e. should I just sit back, relax and the problem will solve itself?) Thanks, Gerard.
Re: Using CQLSSTableWriter to batch load data from Spark to Cassandra.
Right, ok. I can't say I've used the Cassandra OutputFormats before. But perhaps if you use it directly (instead of via Calliope) you may be able to get it to work, albeit with less concise code? Or perhaps you may be able to build Cassandra from source with Hadoop 2 / CDH4 support: https://groups.google.com/forum/#!topic/nosql-databases/Y-9amAdZk1s On Wed, Jun 25, 2014 at 9:14 PM, Gerard Maas gerard.m...@gmail.com wrote: Thanks Nick. We used the CassandraOutputFormat through Calliope. The Calliope API makes the CassandraOutputFormat quite accessible and is cool to work with. It worked fine at prototype level, but we had Hadoop version conflicts when we put it in our Spark environment (Using our Spark assembly compiled with CDH4.4). The conflict seems to be at the Cassandra-all lib level, which is compiled against a different hadoop version (v1). We could not get round that issue. (Any pointers in that direction?) That's why I'm trying the direct CQLSSTableWriter way but it looks blocked as well. -kr, Gerard. On Wed, Jun 25, 2014 at 8:57 PM, Nick Pentreath nick.pentre...@gmail.com wrote: can you not use a Cassandra OutputFormat? Seems they have BulkOutputFormat. An example of using it with Hadoop is here: http://shareitexploreit.blogspot.com/2012/03/bulkloadto-cassandra-with-hadoop.html Using it with Spark will be similar to the examples: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala and https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala On Wed, Jun 25, 2014 at 8:44 PM, Gerard Maas gerard.m...@gmail.com wrote: Hi, (My excuses for the cross-post from SO) I'm trying to create Cassandra SSTables from the results of a batch computation in Spark. Ideally, each partition should create the SSTable for the data it holds in order to parallelize the process as much as possible (and probably even stream it to the Cassandra ring as well) After the initial hurdles with the CQLSSTableWriter (like requiring the yaml file), I'm confronted now with this issue: java.lang.RuntimeException: Attempting to load already loaded column family customer.rawts at org.apache.cassandra.config.Schema.load(Schema.java:347) at org.apache.cassandra.config.Schema.load(Schema.java:112) at org.apache.cassandra.io.sstable.CQLSSTableWriter$Builder.forTable(CQLSSTableWriter.java:336) I'm creating a writer on each parallel partition like this: def store(rdd:RDD[Message]) = { rdd.foreachPartition( msgIterator = { val writer = CQLSSTableWriter.builder() .inDirectory(/tmp/cass) .forTable(schema) .using(insertSttmt).build() msgIterator.foreach(msg = {...}) })} And if I'm reading the exception correctly, I can only create one writer per table in one JVM. Digging a bit further in the code, it looks like the Schema.load(...) singleton enforces that limitation. I guess writings to the writer will not be thread-safe and even if they were the contention that multiple threads will create by having all parallel tasks trying to dump few GB of data to disk at the same time will defeat the purpose of using the SSTables for bulk upload anyway. So, are there ways to use the CQLSSTableWriter concurrently? If not, what is the next best option to load batch data at high throughput in Cassandra? Will the upcoming Spark-Cassandra integration help with this? (ie. should I just sit back, relax and the problem will solve itself?) Thanks, Gerard.
Re: ElasticSearch enrich
You can just add elasticsearch-hadoop as a dependency to your project to use the ESInputFormat and ESOutputFormat ( https://github.com/elasticsearch/elasticsearch-hadoop). Some other basics here: http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html For testing, yes I think you will need to start ES in local mode (just ./bin/elasticsearch) and use the default config (host = localhost, port = 9200). On Thu, Jun 26, 2014 at 9:04 AM, boci boci.b...@gmail.com wrote: That's okay, but hadoop has ES integration. What happens if I run saveAsHadoopFile without hadoop (or must I pull up hadoop programmatically? (if I can)) b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau hol...@pigscanfly.ca wrote: On Wed, Jun 25, 2014 at 4:16 PM, boci boci.b...@gmail.com wrote: Hi guys, thanks for the direction. Now I have some problems/questions: - in local (test) mode I want to use ElasticClient.local to create the es connection, but in production I want to use ElasticClient.remote; to do this I want to pass the ElasticClient to mapPartitions, or what is the best practice? In this case you probably want to make the ElasticClient inside of mapPartitions (since it isn't serializable) and if you want to use a different client in local mode just have a flag that controls what type of client you create. - my stream output is written into elasticsearch. How can I test output.saveAsHadoopFile[ESOutputFormat](-) in a local environment? - After storing the enriched data into ES, I want to generate aggregated data (EsInputFormat). How can I test it locally? I think the simplest thing to do would be to use the same client in both modes and just start a single node elasticsearch cluster. Thanks guys b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau hol...@pigscanfly.ca wrote: So I'm giving a talk at the Spark summit on using Spark and ElasticSearch, but for now if you want to see a simple demo which uses elasticsearch for geo input you can take a look at my quick and dirty implementation with TopTweetsInALocation ( https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala ). This approach uses the ESInputFormat which avoids the difficulty of having to manually create ElasticSearch clients. This approach might not work for your data, e.g. if you need to create a query for each record in your RDD. If this is the case, you could instead look at using mapPartitions and setting up your Elasticsearch connection inside of that, so you could then re-use the client for all of the queries on each partition. This approach will avoid having to serialize the Elasticsearch connection because it will be local to your function. Hope this helps! Cheers, Holden :) On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: It's not used as the default serializer due to some issues with the compatibility requirement to register the classes.. Which part are you getting as nonserializable... you need to serialize that class if you are sending it to spark workers inside a map, reduce, mappartition or any of the operations on RDD. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng pc...@uow.edu.au wrote: I'm afraid persisting a connection across two tasks is a dangerous act, as they can't be guaranteed to be executed on the same machine.
Your ES server may think it's a man-in-the-middle attack! I think it's possible to invoke a static method that gives you a connection in a local 'pool', so nothing will sneak into your closure, but it's too complex and there should be a better option. Never used kryo before; if it's that good perhaps we should use it as the default serializer -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Cell : 425-233-8271 -- Cell : 425-233-8271
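A minimal sketch of the mapPartitions pattern Holden and Nick describe above: the non-serializable client is created on the executor, inside the partition function, so it is never shipped in the closure, and it is reused for every record in that partition. The EsClient class below is only a stand-in so the sketch compiles on its own; in real code the thread's ElasticClient.local / ElasticClient.remote(host, port) (or whatever client API you use) would be constructed at that point instead.

    import org.apache.spark.rdd.RDD

    // Stand-in for the real, non-serializable ES client; defined here only
    // so the sketch is self-contained. Replace with your actual client.
    class EsClient(host: String, port: Int) {
      def enrich(raw: String): String = raw // real code would query ES here
      def close(): Unit = ()
    }

    def enrichWithEs(events: RDD[String], host: String, port: Int): RDD[String] =
      events.mapPartitions { iter =>
        // Built here, on the executor, so it never needs to be serialized;
        // only the host/port strings are captured in the closure.
        val client = new EsClient(host, port)
        val out = iter.map(client.enrich).toList // materialise so the client can be closed
        client.close()
        out.iterator
      }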
Re: Sample datasets for MLlib and Graphx
Take a look at Kaggle competition datasets - https://www.kaggle.com/competitions For SVM there are a couple of ad click prediction datasets of pretty large size. For graph stuff, SNAP has large network data: https://snap.stanford.edu/data/ — Sent from Mailbox On Thu, Jul 3, 2014 at 3:25 PM, AlexanderRiggers alexander.rigg...@gmail.com wrote: Hello! I want to play around with several different cluster settings and measure performance for MLlib and GraphX, and was wondering if anybody here could hit me up with datasets for these applications from 5GB onwards? I'm mostly interested in SVM and Triangle Count, but would be glad for any help. Best regards, Alex -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Sample-datasets-for-MLlib-and-Graphx-tp8760.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Sample datasets for MLlib and Graphx
The Kaggle data is not in libsvm format so you'd have to do some transformation. The Criteo and KDD Cup datasets are, if I recall, fairly large. The Criteo ad prediction data is around 2-3GB compressed I think. To my knowledge these are the largest binary classification datasets I've come across which are easily publicly available (very happy to be proved wrong about this though :) — Sent from Mailbox On Thu, Jul 3, 2014 at 4:39 PM, AlexanderRiggers alexander.rigg...@gmail.com wrote: Nick Pentreath wrote Take a look at Kaggle competition datasets - https://www.kaggle.com/competitions I was looking for files in LIBSVM format and never found anything of a bigger size on Kaggle. Most competitions I've seen need data processing and feature generation, but maybe I have to take a second look. Nick Pentreath wrote For graph stuff, SNAP has large network data: https://snap.stanford.edu/data/ Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Sample-datasets-for-MLlib-and-Graphx-tp8760p8762.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
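For reference, once a dataset has been converted to LIBSVM format, loading it for the kind of SVM experiments discussed above is a one-liner with MLUtils; the path and iteration count below are placeholders, and the raw-CSV-to-libsvm conversion itself is a separate step.

    import org.apache.spark.mllib.util.MLUtils
    import org.apache.spark.mllib.classification.SVMWithSGD

    // Hypothetical path; point it at your converted Criteo/KDD data.
    val data = MLUtils.loadLibSVMFile(sc, "hdfs:///data/clicks.libsvm").cache()
    val model = SVMWithSGD.train(data, 100) // 100 iterations, default step size / regularization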
Re: DynamoDB input source
You should be able to use DynamoDBInputFormat (I think this should be part of AWS libraries for Java) and create a HadoopRDD from that. On Fri, Jul 4, 2014 at 8:28 AM, Ian Wilkinson ia...@me.com wrote: Hi, I noticed mention of DynamoDB as input source in http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf . Unfortunately, Google is not coming to my rescue on finding further mention for this support. Any pointers would be well received. Big thanks, ian
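The general mechanics of "create a HadoopRDD from an InputFormat" look like the sketch below. TextInputFormat stands in for DynamoDBInputFormat purely so the example runs as-is; the DynamoDB connector's actual class names and configuration keys live in the AWS/EMR jars and are not reproduced here.

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    // Swap in the DynamoDB InputFormat and its key/value Writable classes,
    // and set the connector's table/region properties on the Configuration.
    val rdd = sc.newAPIHadoopFile(
      "hdfs:///some/input/path",   // placeholder; a DynamoDB format is configured via properties instead
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      sc.hadoopConfiguration)

    rdd.map(_._2.toString).take(5).foreach(println)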
Re: DynamoDB input source
No boto support for that. In master there is Python support for loading Hadoop InputFormats; not sure if it will be in 1.0.1 or 1.1. In the master docs, under the programming guide, there are instructions, and under the examples project there are pyspark examples of using Cassandra and HBase. These should hopefully give you enough to get started. Depending on how easy it is to use the DynamoDB format, you may have to write a custom converter (see the mentioned examples for more details). Sent from my iPhone On 4 Jul 2014, at 08:38, Ian Wilkinson ia...@me.com wrote: Hi Nick, I’m going to be working with python primarily. Are you aware of comparable boto support? ian On 4 Jul 2014, at 16:32, Nick Pentreath nick.pentre...@gmail.com wrote: You should be able to use DynamoDBInputFormat (I think this should be part of the AWS libraries for Java) and create a HadoopRDD from that. On Fri, Jul 4, 2014 at 8:28 AM, Ian Wilkinson ia...@me.com wrote: Hi, I noticed mention of DynamoDB as input source in http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf. Unfortunately, Google is not coming to my rescue on finding further mention for this support. Any pointers would be well received. Big thanks, ian
Re: DynamoDB input source
I should qualify by saying there is boto support for dynamodb - but not for the inputFormat. You could roll your own python-based connection but this involves figuring out how to split the data in dynamo - inputFormat takes care of this so should be the easier approach — Sent from Mailbox On Fri, Jul 4, 2014 at 8:51 AM, Ian Wilkinson ia...@me.com wrote: Excellent. Let me get browsing on this. Huge thanks, ian On 4 Jul 2014, at 16:47, Nick Pentreath nick.pentre...@gmail.com wrote: No boto support for that. In master there is Python support for loading Hadoop inputFormat. Not sure if it will be in 1.0.1 or 1.1 I master docs under the programming guide are instructions and also under examples project there are pyspark examples of using Cassandra and HBase. These should hopefully give you enough to get started. Depending on how easy it is to use the dynamo DB format, you may have to write a custom converter (see the mentioned examples for storm details). Sent from my iPhone On 4 Jul 2014, at 08:38, Ian Wilkinson ia...@me.com wrote: Hi Nick, I’m going to be working with python primarily. Are you aware of comparable boto support? ian On 4 Jul 2014, at 16:32, Nick Pentreath nick.pentre...@gmail.com wrote: You should be able to use DynamoDBInputFormat (I think this should be part of AWS libraries for Java) and create a HadoopRDD from that. On Fri, Jul 4, 2014 at 8:28 AM, Ian Wilkinson ia...@me.com wrote: Hi, I noticed mention of DynamoDB as input source in http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf. Unfortunately, Google is not coming to my rescue on finding further mention for this support. Any pointers would be well received. Big thanks, ian
Re: DynamoDB input source
Interesting - I would have thought they would make that available publicly. Unfortunately, unless you can use Spark on EMR, I guess your options are to hack it by spinning up an EMR cluster and getting the JAR, or maybe fall back to using boto and rolling your own :( On Fri, Jul 4, 2014 at 9:28 AM, Ian Wilkinson ia...@me.com wrote: Trying to discover source for the DynamoDBInputFormat. Not appearing in: - https://github.com/aws/aws-sdk-java - https://github.com/apache/hive Then came across http://stackoverflow.com/questions/1704/jar-containing-org-apache-hadoop-hive-dynamodb . Unsure whether this represents the latest situation… ian On 4 Jul 2014, at 16:58, Nick Pentreath nick.pentre...@gmail.com wrote: I should qualify by saying there is boto support for dynamodb - but not for the inputFormat. You could roll your own python-based connection but this involves figuring out how to split the data in dynamo - inputFormat takes care of this so should be the easier approach — Sent from Mailbox https://www.dropbox.com/mailbox On Fri, Jul 4, 2014 at 8:51 AM, Ian Wilkinson ia...@me.com wrote: Excellent. Let me get browsing on this. Huge thanks, ian On 4 Jul 2014, at 16:47, Nick Pentreath nick.pentre...@gmail.com wrote: No boto support for that. In master there is Python support for loading Hadoop inputFormat. Not sure if it will be in 1.0.1 or 1.1 I master docs under the programming guide are instructions and also under examples project there are pyspark examples of using Cassandra and HBase. These should hopefully give you enough to get started. Depending on how easy it is to use the dynamo DB format, you may have to write a custom converter (see the mentioned examples for storm details). Sent from my iPhone On 4 Jul 2014, at 08:38, Ian Wilkinson ia...@me.com wrote: Hi Nick, I’m going to be working with python primarily. Are you aware of comparable boto support? ian On 4 Jul 2014, at 16:32, Nick Pentreath nick.pentre...@gmail.com wrote: You should be able to use DynamoDBInputFormat (I think this should be part of AWS libraries for Java) and create a HadoopRDD from that. On Fri, Jul 4, 2014 at 8:28 AM, Ian Wilkinson ia...@me.com wrote: Hi, I noticed mention of DynamoDB as input source in http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf . Unfortunately, Google is not coming to my rescue on finding further mention for this support. Any pointers would be well received. Big thanks, ian
Re: taking top k values of rdd
To make it efficient in your case you may need to write a bit of custom code to emit the top k per partition and then only send those to the driver. On the driver you can just take the top k of the combined top-k lists from each partition (assuming you have (object, count) for each top-k list). — Sent from Mailbox On Sat, Jul 5, 2014 at 10:17 AM, Koert Kuipers ko...@tresata.com wrote: my initial approach to taking top k values of a rdd was using a priority-queue monoid. along these lines: rdd.mapPartitions({ items => Iterator.single(new PriorityQueue(...)) }, false).reduce(monoid.plus) this works fine, but looking at the code for reduce it first reduces within a partition (which doesn't help me) and then sends the results to the driver where these again get reduced. this means that for every partition the (potentially very bulky) priorityqueue gets shipped to the driver. my driver is client side, not inside the cluster, and i cannot change this, so this shipping to the driver of all these queues can be expensive. is there a better way to do this? should i try a shuffle first to reduce the partitions to the minimal amount (since the number of queues shipped is equal to the number of partitions)? is there a way to reduce to a single item RDD, so the queues stay inside the cluster and i can retrieve the final result with RDD.first?
Re: taking top k values of rdd
Right. That is unavoidable unless as you say you repartition into 1 partition, which may do the trick. When I say send the top k per partition I don't mean send the pq but the actual values. This may end up being relatively small if k and p are not too big. (I'm not sure how large serialized pq is). — Sent from Mailbox On Sat, Jul 5, 2014 at 10:29 AM, Koert Kuipers ko...@tresata.com wrote: hey nick, you are right. i didnt explain myself well and my code example was wrong... i am keeping a priority-queue with k items per partition (using com.twitter.algebird.mutable.PriorityQueueMonoid.build to limit the sizes of the queues). but this still means i am sending k items per partition to my driver, so k x p, while i only need k. thanks! koert On Sat, Jul 5, 2014 at 1:21 PM, Nick Pentreath nick.pentre...@gmail.com wrote: To make it efficient in your case you may need to do a bit of custom code to emit the top k per partition and then only send those to the driver. On the driver you can just top k the combined top k from each partition (assuming you have (object, count) for each top k list). — Sent from Mailbox https://www.dropbox.com/mailbox On Sat, Jul 5, 2014 at 10:17 AM, Koert Kuipers ko...@tresata.com wrote: my initial approach to taking top k values of a rdd was using a priority-queue monoid. along these lines: rdd.mapPartitions({ items = Iterator.single(new PriorityQueue(...)) }, false).reduce(monoid.plus) this works fine, but looking at the code for reduce it first reduces within a partition (which doesnt help me) and then sends the results to the driver where these again get reduced. this means that for every partition the (potentially very bulky) priorityqueue gets shipped to the driver. my driver is client side, not inside cluster, and i cannot change this, so this shipping to driver of all these queues can be expensive. is there a better way to do this? should i try to a shuffle first to reduce the partitions to the minimal amount (since number of queues shipped is equal to number of partitions)? is was a way to reduce to a single item RDD, so the queues stay inside cluster and i can retrieve the final result with RDD.first?
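A sketch of the approach described above: keep only the k largest (item, count) pairs per partition, ship those small arrays (k values per partition rather than whole priority queues) to the driver, and merge them there. It assumes an RDD of (item, count) pairs; for very large partitions a bounded priority queue, as in the thread, would avoid materializing the partition into an array.

    import org.apache.spark.rdd.RDD

    def topK(counts: RDD[(String, Long)], k: Int): Array[(String, Long)] = {
      // At most k pairs leave each partition, so the driver receives k * numPartitions items.
      val perPartition = counts.mapPartitions { iter =>
        Iterator.single(iter.toArray.sortBy(-_._2).take(k))
      }
      perPartition.collect().flatten.sortBy(-_._2).take(k)
    }

Note that Spark's RDD API also exposes top(k) / takeOrdered(k) (given an Ordering), which implement essentially this pattern of per-partition bounded top-k merged on the driver.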
Re: How to parallelize model fitting with different cross-validation folds?
For linear models the 3rd option is by far most efficient and I suspect what Evan is alluding to. Unfortunately it's not directly possible with the classes in Mllib now so you'll have to roll your own using underlying sgd / bfgs primitives. — Sent from Mailbox On Sat, Jul 5, 2014 at 10:45 AM, Christopher Nguyen c...@adatao.com wrote: Hi sparkuser2345, I'm inferring the problem statement is something like how do I make this complete faster (given my compute resources)? Several comments. First, Spark only allows launching parallel tasks from the driver, not from workers, which is why you're seeing the exception when you try. Whether the latter is a sensible/doable idea is another discussion, but I can appreciate why many people assume this should be possible. Second, on optimization, you may be able to apply Sean's idea about (thread) parallelism at the driver, combined with the knowledge that often these cluster tasks bottleneck while competing for the same resources at the same time (cpu vs disk vs network, etc.) You may be able to achieve some performance optimization by randomizing these timings. This is not unlike GMail randomizing user storage locations around the world for load balancing. Here, you would partition each of your RDDs into a different number of partitions, making some tasks larger than others, and thus some may be in cpu-intensive map while others are shuffling data around the network. This is rather cluster-specific; I'd be interested in what you learn from such an exercise. Third, I find it useful always to consider doing as much as possible in one pass, subject to memory limits, e.g., mapPartitions() vs map(), thus minimizing map/shuffle/reduce boundaries with their context switches and data shuffling. In this case, notice how you're running the training+prediction k times over mostly the same rows, with map/reduce boundaries in between. While the training phase is sealed in this context, you may be able to improve performance by collecting all the k models together, and do a [m x k] predictions all at once which may end up being faster. Finally, as implied from the above, for the very common k-fold cross-validation pattern, the algorithm itself might be written to be smart enough to take both train and test data and do the right thing within itself, thus obviating the need for the user to prepare k data sets and running over them serially, and likely saving a lot of repeated computations in the right internal places. Enjoy, -- Christopher T. Nguyen Co-founder CEO, Adatao http://adatao.com linkedin.com/in/ctnguyen On Sat, Jul 5, 2014 at 1:50 AM, Sean Owen so...@cloudera.com wrote: If you call .par on data_kfolded it will become a parallel collection in Scala and so the maps will happen in parallel . On Jul 5, 2014 9:35 AM, sparkuser2345 hm.spark.u...@gmail.com wrote: Hi, I am trying to fit a logistic regression model with cross validation in Spark 0.9.0 using SVMWithSGD. 
I have created an array data_kfolded where each element is a pair of RDDs containing the training and test data: (training_data: RDD[org.apache.spark.mllib.regression.LabeledPoint], test_data: RDD[org.apache.spark.mllib.regression.LabeledPoint]) scala> data_kfolded res21: Array[(org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint], org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])] = Array((MappedRDD[9] at map at <console>:24,MappedRDD[7] at map at <console>:23), (MappedRDD[13] at map at <console>:24,MappedRDD[11] at map at <console>:23), (MappedRDD[17] at map at <console>:24,MappedRDD[15] at map at <console>:23)) Everything works fine when using data_kfolded: val validationErrors = data_kfolded.map { datafold => val svmAlg = new SVMWithSGD() val model_reg = svmAlg.run(datafold._1) val labelAndPreds = datafold._2.map { point => val prediction = model_reg.predict(point.features) (point.label, prediction) } val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / datafold._2.count trainErr.toDouble } scala> validationErrors res1: Array[Double] = Array(0.8819836785938481, 0.07082521117608837, 0.29833546734955185) However, I have understood that the models are not fitted in parallel as data_kfolded is not an RDD (although it's an array of pairs of RDDs). When running the same code where data_kfolded has been replaced with sc.parallelize(data_kfolded), I get a null pointer exception from the line where the run method of the SVMWithSGD object is called with the training data. I guess this is somehow related to the fact that RDDs can't be accessed from inside a closure. I fail to understand though why the first version works and the second doesn't. Most importantly, is there a way to fit the models in parallel? I would really appreciate your help. val validationErrors = sc.parallelize(data_kfolded).map { datafold => val svmAlg = new SVMWithSGD() val
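To make Sean's .par suggestion concrete, here is a rough sketch: the folds are trained concurrently from driver-side threads, with each fold launching its own Spark jobs. Nothing is parallelized inside a closure (which is what the sc.parallelize(data_kfolded) attempt did and what caused the NPE). The data_kfolded array and the 100 iterations follow the code above.

    import org.apache.spark.mllib.classification.SVMWithSGD

    val validationErrors = data_kfolded.par.map { case (train, test) =>
      // Driver-side call; the training jobs themselves still run on the cluster.
      val model = SVMWithSGD.train(train, 100)
      val labelAndPreds = test.map(p => (p.label, model.predict(p.features)))
      labelAndPreds.filter(r => r._1 != r._2).count.toDouble / test.count
    }.toArray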
Re: Recommended pipeline automation tool? Oozie?
You may look into the new Azkaban - which while being quite heavyweight is actually quite pleasant to use when set up. You can run spark jobs (spark-submit) using azkaban shell commands and pass paremeters between jobs. It supports dependencies, simple dags and scheduling with retries. I'm digging deeper and it may be worthwhile extending it with a Spark job type... It's probably best for mixed Hadoop / Spark clusters... — Sent from Mailbox On Fri, Jul 11, 2014 at 12:52 AM, Andrei faithlessfri...@gmail.com wrote: I used both - Oozie and Luigi - but found them inflexible and still overcomplicated, especially in presence of Spark. Oozie has a fixed list of building blocks, which is pretty limiting. For example, you can launch Hive query, but Impala, Shark/SparkSQL, etc. are out of scope (of course, you can always write wrapper as Java or Shell action, but does it really need to be so complicated?). Another issue with Oozie is passing variables between actions. There's Oozie context that is suitable for passing key-value pairs (both strings) between actions, but for more complex objects (say, FileInputStream that should be closed at last step only) you have to do some advanced kung fu. Luigi, on other hand, has its niche - complicated dataflows with many tasks that depend on each other. Basically, there are tasks (this is where you define computations) and targets (something that can exist - file on disk, entry in ZooKeeper, etc.). You ask Luigi to get some target, and it creates a plan for achieving this. Luigi is really shiny when your workflow fits this model, but one step away and you are in trouble. For example, consider simple pipeline: run MR job and output temporary data, run another MR job and output final data, clean temporary data. You can make target Clean, that depends on target MRJob2 that, in its turn, depends on MRJob1, right? Not so easy. How do you check that Clean task is achieved? If you just test whether temporary directory is empty or not, you catch both cases - when all tasks are done and when they are not even started yet. Luigi allows you to specify all 3 actions - MRJob1, MRJob2, Clean - in a single run() method, but ruins the entire idea. And of course, both of these frameworks are optimized for standard MapReduce jobs, which is probably not what you want on Spark mailing list :) Experience with these frameworks, however, gave me some insights about typical data pipelines. 1. Pipelines are mostly linear. Oozie, Luigi and number of other frameworks allow branching, but most pipelines actually consist of moving data from source to destination with possibly some transformations in between (I'll be glad if somebody share use cases when you really need branching). 2. Transactional logic is important. Either everything, or nothing. Otherwise it's really easy to get into inconsistent state. 3. Extensibility is important. You never know what will need in a week or two. So eventually I decided that it is much easier to create your own pipeline instead of trying to adopt your code to existing frameworks. My latest pipeline incarnation simply consists of a list of steps that are started sequentially. Each step is a class with at least these methods: * run() - launch this step * fail() - what to do if step fails * finalize() - (optional) what to do when all steps are done For example, if you want to add possibility to run Spark jobs, you just create SparkStep and configure it with required code. 
If you want Hive query - just create HiveStep and configure it with Hive connection settings. I use YAML file to configure steps and Context (basically, Map[String, Any]) to pass variables between them. I also use configurable Reporter available for all steps to report the progress. Hopefully, this will give you some insights about best pipeline for your specific case. On Thu, Jul 10, 2014 at 9:10 PM, Paul Brown p...@mult.ifario.us wrote: We use Luigi for this purpose. (Our pipelines are typically on AWS (no EMR) backed by S3 and using combinations of Python jobs, non-Spark Java/Scala, and Spark. We run Spark jobs by connecting drivers/clients to the master, and those are what is invoked from Luigi.) — p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/ On Thu, Jul 10, 2014 at 10:20 AM, k.tham kevins...@gmail.com wrote: I'm just wondering what's the general recommendation for data pipeline automation. Say, I want to run Spark Job A, then B, then invoke script C, then do D, and if D fails, do E, and if Job A fails, send email F, etc... It looks like Oozie might be the best choice. But I'd like some advice/suggestions. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Recommended-pipeline-automation-tool-Oozie-tp9319.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
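A bare-bones sketch of the home-grown pipeline Andrei describes above (steps run in order, each with run/fail/finalize hooks and a shared context map for passing values between them); the names and the error-handling policy here are assumptions for illustration, not his actual code.

    import scala.collection.mutable

    trait Step {
      def run(ctx: mutable.Map[String, Any]): Unit                      // launch this step
      def fail(ctx: mutable.Map[String, Any], e: Throwable): Unit = ()  // what to do if a step fails
      def finalizeStep(ctx: mutable.Map[String, Any]): Unit = ()        // cleanup when all steps are done
    }

    def runPipeline(steps: Seq[Step]): Unit = {
      val ctx = mutable.Map.empty[String, Any]
      try {
        steps.foreach(_.run(ctx))
      } catch {
        case e: Throwable => steps.foreach(_.fail(ctx, e)); throw e
      } finally {
        steps.foreach(_.finalizeStep(ctx))
      }
    }

The "either everything or nothing" transactional behaviour mentioned above would live in the fail/finalize hooks of each step.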
Re: Recommended pipeline automation tool? Oozie?
Did you use old azkaban or azkaban 2.5? It has been completely rewritten. Not saying it is the best but I found it way better than oozie for example. Sent from my iPhone On 11 Jul 2014, at 09:24, 明风 mingf...@taobao.com wrote: We use Azkaban for a short time and suffer a lot. Finally we almost rewrite it totally. Don’t recommend it really. 发件人: Nick Pentreath nick.pentre...@gmail.com 答复: user@spark.apache.org 日期: 2014年7月11日 星期五 下午3:18 至: user@spark.apache.org 主题: Re: Recommended pipeline automation tool? Oozie? You may look into the new Azkaban - which while being quite heavyweight is actually quite pleasant to use when set up. You can run spark jobs (spark-submit) using azkaban shell commands and pass paremeters between jobs. It supports dependencies, simple dags and scheduling with retries. I'm digging deeper and it may be worthwhile extending it with a Spark job type... It's probably best for mixed Hadoop / Spark clusters... — Sent from Mailbox On Fri, Jul 11, 2014 at 12:52 AM, Andrei faithlessfri...@gmail.com wrote: I used both - Oozie and Luigi - but found them inflexible and still overcomplicated, especially in presence of Spark. Oozie has a fixed list of building blocks, which is pretty limiting. For example, you can launch Hive query, but Impala, Shark/SparkSQL, etc. are out of scope (of course, you can always write wrapper as Java or Shell action, but does it really need to be so complicated?). Another issue with Oozie is passing variables between actions. There's Oozie context that is suitable for passing key-value pairs (both strings) between actions, but for more complex objects (say, FileInputStream that should be closed at last step only) you have to do some advanced kung fu. Luigi, on other hand, has its niche - complicated dataflows with many tasks that depend on each other. Basically, there are tasks (this is where you define computations) and targets (something that can exist - file on disk, entry in ZooKeeper, etc.). You ask Luigi to get some target, and it creates a plan for achieving this. Luigi is really shiny when your workflow fits this model, but one step away and you are in trouble. For example, consider simple pipeline: run MR job and output temporary data, run another MR job and output final data, clean temporary data. You can make target Clean, that depends on target MRJob2 that, in its turn, depends on MRJob1, right? Not so easy. How do you check that Clean task is achieved? If you just test whether temporary directory is empty or not, you catch both cases - when all tasks are done and when they are not even started yet. Luigi allows you to specify all 3 actions - MRJob1, MRJob2, Clean - in a single run() method, but ruins the entire idea. And of course, both of these frameworks are optimized for standard MapReduce jobs, which is probably not what you want on Spark mailing list :) Experience with these frameworks, however, gave me some insights about typical data pipelines. 1. Pipelines are mostly linear. Oozie, Luigi and number of other frameworks allow branching, but most pipelines actually consist of moving data from source to destination with possibly some transformations in between (I'll be glad if somebody share use cases when you really need branching). 2. Transactional logic is important. Either everything, or nothing. Otherwise it's really easy to get into inconsistent state. 3. Extensibility is important. You never know what will need in a week or two. 
So eventually I decided that it is much easier to create your own pipeline instead of trying to adopt your code to existing frameworks. My latest pipeline incarnation simply consists of a list of steps that are started sequentially. Each step is a class with at least these methods: * run() - launch this step * fail() - what to do if step fails * finalize() - (optional) what to do when all steps are done For example, if you want to add possibility to run Spark jobs, you just create SparkStep and configure it with required code. If you want Hive query - just create HiveStep and configure it with Hive connection settings. I use YAML file to configure steps and Context (basically, Map[String, Any]) to pass variables between them. I also use configurable Reporter available for all steps to report the progress. Hopefully, this will give you some insights about best pipeline for your specific case. On Thu, Jul 10, 2014 at 9:10 PM, Paul Brown p...@mult.ifario.us wrote: We use Luigi for this purpose. (Our pipelines are typically on AWS (no EMR) backed by S3 and using combinations of Python jobs, non-Spark Java/Scala, and Spark. We run Spark jobs by connecting drivers/clients to the master, and those are what is invoked from Luigi.) — p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/ On Thu, Jul 10, 2014 at 10:20 AM, k.tham
Re: import org.apache.spark.streaming.twitter._ in Shell
You could try the following: create a minimal project using sbt or Maven, add spark-streaming-twitter as a dependency, run sbt assembly (or mvn package) on that to create a fat jar (with Spark as provided dependency), and add that to the shell classpath when starting up. On Tue, Jul 15, 2014 at 9:06 AM, Praveen Seluka psel...@qubole.com wrote: If you want to make Twitter* classes available in your shell, I believe you could do the following 1. Change the parent pom module ordering - Move external/twitter before assembly 2. In assembly/pom.xm, add external/twitter dependency - this will package twitter* into the assembly jar Now when spark-shell is launched, assembly jar is in classpath - hence twitter* too. I think this will work (remember trying this sometime back) On Tue, Jul 15, 2014 at 11:59 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Hmm, I'd like to clarify something from your comments, Tathagata. Going forward, is Twitter Streaming functionality not supported from the shell? What should users do if they'd like to process live Tweets from the shell? Nick On Mon, Jul 14, 2014 at 11:50 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: At some point, you were able to access TwitterUtils from spark shell using Spark 1.0.0+ ? Yep. If yes, then what change in Spark caused it to not work any more? It still works for me. I was just commenting on your remark that it doesn't work through the shell, which I now understand to apply to versions of Spark before 1.0.0. Nick
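A minimal wrapper project along the lines suggested above might look like this; the versions are examples only (match them to your cluster), and producing the fat jar needs the sbt-assembly plugin.

    // build.sbt
    name := "spark-shell-twitter-deps"

    scalaVersion := "2.10.4"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"              % "1.0.1" % "provided",
      "org.apache.spark" %% "spark-streaming"         % "1.0.1" % "provided",
      "org.apache.spark" %% "spark-streaming-twitter" % "1.0.1"
    )

After sbt assembly, start the shell with --jars pointing at the generated assembly jar and the TwitterUtils imports should resolve.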
Re: Count distinct with groupBy usage
You can use .distinct.count on your user RDD. What are you trying to achieve with the time group by? — Sent from Mailbox On Tue, Jul 15, 2014 at 8:14 PM, buntu buntu...@gmail.com wrote: Hi -- New to Spark and trying to figure out how to generate unique counts per page by date given this raw data: timestamp,page,userId 1405377264,google,user1 1405378589,google,user2 1405380012,yahoo,user1 .. I can do a groupBy on a field and get the count: val lines = sc.textFile("data.csv") val csv = lines.map(_.split(",")) // group by page csv.groupBy(_(1)).count But I'm not able to see how to do a distinct count on userId and also apply another groupBy on the timestamp field. Please let me know how to handle such cases. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Count-distinct-with-groupBy-usage-tp9781.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
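For the concrete question above (unique users per page per day), one way that avoids groupBy entirely is to build a (day, page) key, drop duplicate users with distinct, and reduce. The date formatting assumes the timestamps are epoch seconds as in the sample rows; if the file has a header row, filter it out first.

    import org.apache.spark.SparkContext._   // for reduceByKey outside the shell

    val lines = sc.textFile("data.csv")

    val uniquesPerDayAndPage = lines
      .map(_.split(","))
      .map { case Array(ts, page, user) =>
        val day = new java.text.SimpleDateFormat("yyyy-MM-dd")
          .format(new java.util.Date(ts.toLong * 1000L))
        ((day, page), user)
      }
      .distinct()                          // one record per (day, page, user)
      .map { case (key, _) => (key, 1L) }
      .reduceByKey(_ + _)                  // distinct user count per (day, page)

    uniquesPerDayAndPage.take(10).foreach(println)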
Re: Large scale ranked recommendation
It is very true that making predictions in batch for all 1 million users against the 10k items will be quite onerous in terms of computation. I have run into this issue too in making batch predictions. Some ideas: 1. Do you really need to generate recommendations for each user in batch? How are you serving these recommendations? In most cases, you only need to make recs when a user is actively interacting with your service or product etc. Doing it all in batch tends to be a big waste of computation resources. In our system for example we are serving them in real time (as a user arrives at a web page, say, our customer hits our API for recs), so we only generate the rec at that time. You can take a look at Oryx for this ( https://github.com/cloudera/oryx) though it does not yet support Spark, you may be able to save the model into the correct format in HDFS and have Oryx read the data. 2. If you do need to make the recs in batch, then I would suggest: (a) because you have few items, I would collect the item vectors and form a matrix. (b) broadcast that matrix (c) do a mapPartitions on the user vectors. Form a user matrix from the vectors in each partition (maybe create quite a few partitions to make each user matrix not too big) (d) do a value call on the broadcasted item matrix (e) now for each partition you have the (small) item matrix and a (larger) user matrix. Do a matrix multiply and you end up with a (U x I) matrix with the scores for each user in the partition. Because you are using BLAS here, it will be significantly faster than individually computed dot products (f) sort the scores for each user and take top K (g) save or collect and do whatever with the scores 3. in conjunction with (2) you can try throwing more resources at the problem too If you access the underlying Breeze vectors (I think the toBreeze method is private so you may have to re-implement it), you can do all this using Breeze (e.g. concatenating vectors to make matrices, iterating and whatnot). Hope that helps Nick On Fri, Jul 18, 2014 at 1:17 AM, m3.sharma sharm...@umn.edu wrote: Yes, thats what prediction should be doing, taking dot products or sigmoid function for each user,item pair. For 1 million users and 10 K items data there are 10 billion pairs. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-ranked-recommendation-tp10098p10107.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
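A rough sketch of steps (a)-(f) above, using Breeze directly. The variable names and shapes are assumptions: itemFactors is the collected productFeatures of an MLlib ALS model (Array[(Int, Array[Double])]), userFeatures is the RDD of user factors, and for clarity it scores one user per multiply; in practice you would stack many users per partition into one matrix to get the full BLAS benefit.

    import breeze.linalg._

    val k = itemFactors.head._2.length
    val itemIds = itemFactors.map(_._1)
    // k x numItems matrix, one item vector per column (Breeze is column-major).
    val itemMat = new DenseMatrix(k, itemFactors.length, itemFactors.flatMap(_._2))
    val bcItemMat = sc.broadcast(itemMat)
    val bcItemIds = sc.broadcast(itemIds)

    val topRecs = userFeatures.mapPartitions { users =>
      val im = bcItemMat.value
      users.map { case (userId, uf) =>
        val scores = (new DenseMatrix(1, k, uf) * im).data   // 1 x numItems row of scores
        val top = bcItemIds.value.zip(scores).sortBy(-_._2).take(10)
        (userId, top)
      }
    }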
Re: Large scale ranked recommendation
Agree GPUs may be interesting for this kind of massively parallel linear algebra on reasonable size vectors. These projects might be of interest in this regard: https://github.com/BIDData/BIDMach https://github.com/BIDData/BIDMat https://github.com/dlwh/gust Nick On Fri, Jul 18, 2014 at 7:40 PM, m3.sharma sharm...@umn.edu wrote: Thanks Nick real-time suggestion is good, will see if we can add that to our deployment strategy and you are correct we may not need recommendation for each user. Will try adding more resources and broadcasting item features suggestion as currently they don't seem to be huge. As users and items both will continue to grow in future for faster vector computations I think few GPU nodes will suffice to serve faster recommendation after learning model with SPARK. It will be great to have builtin GPU support in SPARK for faster computations to leverage GPU capability of nodes for performing these flops faster. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-ranked-recommendation-tp10098p10183.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: NullPointerException When Reading Avro Sequence Files
I got this working locally a little while ago when playing around with AvroKeyInputFile: https://gist.github.com/MLnick/5864741781b9340cb211 But not sure about AvroSequenceFile. Any chance you have an example datafile or records? On Sat, Jul 19, 2014 at 11:00 AM, Sparky gullo_tho...@bah.com wrote: To be more specific, I'm working with a system that stores data in org.apache.avro.hadoop.io.AvroSequenceFile format. An AvroSequenceFile is A wrapper around a Hadoop SequenceFile that also supports reading and writing Avro data. It seems that Spark does not support this out of the box. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-when-reading-Avro-Sequence-files-tp10201p10234.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
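For plain Avro data files, the AvroKeyInputFormat route from the gist linked above looks roughly like the sketch below (avro-mapred classes, field name and path are placeholders). Whether it helps for AvroSequenceFile specifically depends on using that wrapper's own input format instead, so treat this only as the plain-Avro case.

    import org.apache.avro.generic.GenericRecord
    import org.apache.avro.mapred.AvroKey
    import org.apache.avro.mapreduce.AvroKeyInputFormat
    import org.apache.hadoop.io.NullWritable

    val avroRdd = sc.newAPIHadoopFile(
      "hdfs:///path/to/data.avro",
      classOf[AvroKeyInputFormat[GenericRecord]],
      classOf[AvroKey[GenericRecord]],
      classOf[NullWritable],
      sc.hadoopConfiguration)

    // Each record is (AvroKey[GenericRecord], NullWritable); pull a field out by name.
    avroRdd.map { case (key, _) => key.datum().get("someField") }.take(5).foreach(println)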
Re: Spark clustered client
At the moment your best bet for sharing SparkContexts across jobs will be Ooyala job server: https://github.com/ooyala/spark-jobserver It doesn't yet support spark 1.0 though I did manage to amend it to get it to build and run on 1.0 — Sent from Mailbox On Wed, Jul 23, 2014 at 1:21 AM, Asaf Lahav asaf.la...@gmail.com wrote: Hi Folks, I have been trying to dig up some information in regards to what are the possibilities when wanting to deploy more than one client process that consumes Spark. Let's say I have a Spark Cluster of 10 servers, and would like to setup 2 additional servers which are sending requests to it through a Spark context, referencing one specific file of 1TB of data. Each client process, has its own SparkContext instance. Currently, the result is that that same file is loaded into memory twice because the Spark Context resources are not shared between processes/jvms. I wouldn't like to have that same file loaded over and over again with every new client being introduced. What would be the best practice here? Am I missing something? Thank you, Asaf
Re: Workarounds for accessing sequence file data via PySpark?
Load from sequenceFile for PySpark is in master and save is in this PR underway (https://github.com/apache/spark/pull/1338) I hope that Kan will have it ready to merge in time for 1.1 release window (it should be, the PR just needs a final review or two). In the meantime you can check out master and test out the sequenceFile load support in PySpark (there are examples in the /examples project and in python test, and some documentation in /docs) On Wed, Jul 23, 2014 at 4:42 PM, Gary Malouf malouf.g...@gmail.com wrote: I am aware that today PySpark can not load sequence files directly. Are there work-arounds people are using (short of duplicating all the data to text files) for accessing this data?
Re: iScala or Scala-notebook
IScala itself seems to be a bit dead unfortunately. I did come across this today: https://github.com/tribbloid/ISpark On Fri, Jul 18, 2014 at 4:59 AM, ericjohnston1989 ericjohnston1...@gmail.com wrote: Hey everyone, I know this was asked before but I'm wondering if there have since been any updates. Are there any plans to integrate iScala/Scala-notebook with spark in the near future? This seems like something a lot of people would find very useful, so I was just wondering if anyone has started working on it. Thanks, Eric -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/iScala-or-Scala-notebook-tp10127.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: zip two RDD in pyspark
parallelize uses the default Serializer (PickleSerializer) while textFile uses UTF8Serializer. You can get around this with index.zip(input_data._reserialize()) (or index.zip(input_data.map(lambda x: x))) (But if you try to just do this, you run into the issue with different number of partitions): index.zip(input_data._reserialize()).count() Py4JJavaError: An error occurred while calling o60.collect. : java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions at org.apache.spark.rdd.ZippedRDD.getPartitions(ZippedRDD.scala:55) On Wed, Jul 30, 2014 at 7:53 AM, Davies Liu dav...@databricks.com wrote: On Mon, Jul 28, 2014 at 12:58 PM, l lishu...@gmail.com wrote: I have a file in s3 that I want to map each line with an index. Here is my code: input_data = sc.textFile('s3n:/myinput',minPartitions=6).cache() N input_data.count() index = sc.parallelize(range(N), 6) index.zip(input_data).collect() I think you can not do zipWithIndex() in this way, because the number of lines in each partition of input_data will be different than index. You need get the exact number of lines for each partitions first, then generate correct index. It will be easy to do with mapPartitions() nums = input_data.mapPartitions(lambda it: [sum(1 for i in it)]).collect() starts = [sum(nums[:i]) for i in range(len(nums))] zipped = input_data.mapPartitionsWithIndex(lambda i,it: ((starts[i]+j, x) for j,x in enumerate(it))) ... 14/07/28 19:49:31 INFO DAGScheduler: Completed ResultTask(18, 4) 14/07/28 19:49:31 INFO DAGScheduler: Stage 18 (collect at stdin:1) finished in 0.031 s 14/07/28 19:49:31 INFO SparkContext: Job finished: collect at stdin:1, took 0.03707 s Traceback (most recent call last): File stdin, line 1, in module File /root/spark/python/pyspark/rdd.py, line 584, in collect return list(self._collect_iterator_through_file(bytesInJava)) File /root/spark/python/pyspark/rdd.py, line 592, in _collect_iterator_through_file self.ctx._writeToFile(iterator, tempFile.name) File /root/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py, line 537, in __call__ File /root/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.writeToFile. 
: java.lang.ClassCastException: java.lang.String cannot be cast to [B at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:312) at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:309) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:309) at org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:342) at org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:337) at org.apache.spark.api.python.PythonRDD.writeToFile(PythonRDD.scala) at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:744) As I see it, the job is completed, but I don't understand what's happening to 'String cannot be cast to [B'. I tried to zip two parallelCollectionRDD and it works fine. But here I have a MappedRDD at textFile. Not sure what's going on here. Could you provide an script and dataset to reproduce this error? Maybe there are some corner cases during serialization. Also, why Python does not have ZipWithIndex()? The features in PySpark are much less than Spark, hopefully it will catch up in next two releases. Thanks for any help. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/zip-two-RDD-in-pyspark-tp10806.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass with spark-submit
I'm also getting this - Ryan we both seem to be running into this issue with elasticsearch-hadoop :) I tried spark.files.userClassPathFirst true on command line and that doesn;t work If I put it that line in spark/conf/spark-defaults it works but now I'm getting: java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/InputFormat think I may need to add hadoop-client to my assembly, but any other ideas welcome. Ryan, will let you know how I get on On Mon, Aug 4, 2014 at 10:28 AM, Sean Owen so...@cloudera.com wrote: I'm guessing you have the Jackson classes in your assembly but so does Spark. Its classloader wins, and does not contain the class present in your app's version of Jackson. Try spark.files.userClassPathFirst ? On Mon, Aug 4, 2014 at 6:28 AM, Ryan Braley r...@traintracks.io wrote: Hi Folks, I have an assembly jar that I am submitting using spark-submit script on a cluster I created with the spark-ec2 script. I keep running into the java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass error on my workers even though jar tf clearly shows that class being a part of my assembly jar. I have the spark program working locally. Here is the error log: https://gist.github.com/rbraley/cf5cd3457a89b1c0ac88 Anybody have any suggestions of things I can try? It seems http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3c1403899110.65393.yahoomail...@web160503.mail.bf1.yahoo.com%3E that this is a similar error. I am open to recompiling spark to fix this, but I would like to run my job on my cluster rather than just locally. Thanks, Ryan Ryan Braley | Founder http://traintracks.io/ US: +1 (206) 866 5661 CN: +86 156 1153 7598 Coding the future. Decoding the game. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass with spark-submit
By the way, for anyone using elasticsearch-hadoop, there is a fix for this here: https://github.com/elasticsearch/elasticsearch-hadoop/issues/239 Ryan - using the nightly snapshot build of 2.1.0.BUILD-SNAPSHOT fixed this for me. On Thu, Aug 7, 2014 at 3:58 PM, Nick Pentreath nick.pentre...@gmail.com wrote: I'm also getting this - Ryan we both seem to be running into this issue with elasticsearch-hadoop :) I tried spark.files.userClassPathFirst true on command line and that doesn;t work If I put it that line in spark/conf/spark-defaults it works but now I'm getting: java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/InputFormat think I may need to add hadoop-client to my assembly, but any other ideas welcome. Ryan, will let you know how I get on On Mon, Aug 4, 2014 at 10:28 AM, Sean Owen so...@cloudera.com wrote: I'm guessing you have the Jackson classes in your assembly but so does Spark. Its classloader wins, and does not contain the class present in your app's version of Jackson. Try spark.files.userClassPathFirst ? On Mon, Aug 4, 2014 at 6:28 AM, Ryan Braley r...@traintracks.io wrote: Hi Folks, I have an assembly jar that I am submitting using spark-submit script on a cluster I created with the spark-ec2 script. I keep running into the java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass error on my workers even though jar tf clearly shows that class being a part of my assembly jar. I have the spark program working locally. Here is the error log: https://gist.github.com/rbraley/cf5cd3457a89b1c0ac88 Anybody have any suggestions of things I can try? It seems http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3c1403899110.65393.yahoomail...@web160503.mail.bf1.yahoo.com%3E that this is a similar error. I am open to recompiling spark to fix this, but I would like to run my job on my cluster rather than just locally. Thanks, Ryan Ryan Braley | Founder http://traintracks.io/ US: +1 (206) 866 5661 CN: +86 156 1153 7598 Coding the future. Decoding the game. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Failed running Spark ALS
Have you set spark.local.dir (I think this is the config setting)? It needs to point to a volume with plenty of space. By default if I recall it point to /tmp Sent from my iPhone On 19 Sep 2014, at 23:35, jw.cmu jinliangw...@gmail.com wrote: I'm trying to run Spark ALS using the netflix dataset but failed due to No space on device exception. It seems the exception is thrown after the training phase. It's not clear to me what is being written and where is the output directory. I was able to run the same code on the provided test.data dataset. I'm new to Spark and I'd like to get some hints for resolving this problem. The code I ran was got from https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html (the Java version). Relevant info: Spark version: 1.0.2 (Standalone deployment) # slaves/workers/exectuors: 8 Core per worker: 64 memory per executor: 100g Application parameters are left as default. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Failed-running-Spark-ALS-tp14704.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
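For reference, pointing the shuffle/spill directory at a larger volume can be done on the SparkConf (or in spark-defaults.conf, or via the SPARK_LOCAL_DIRS environment variable); the path below is just an example.

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("ALS")
      .set("spark.local.dir", "/mnt/spark-local")  // example path on a volume with enough space

    val sc = new SparkContext(conf)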
Re: spark 1.1.0 - hbase 0.98.6-hadoop2 version - py4j.protocol.Py4JJavaError java.lang.ClassNotFoundException
forgot to copy user list On Sat, Oct 4, 2014 at 3:12 PM, Nick Pentreath nick.pentre...@gmail.com wrote: what version did you put in the pom.xml? it does seem to be in Maven central: http://search.maven.org/#artifactdetails%7Corg.apache.hbase%7Chbase%7C0.98.6-hadoop2%7Cpom <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase</artifactId> <version>0.98.6-hadoop2</version> </dependency> Note you shouldn't need to rebuild Spark, I think just the example project via sbt examples/assembly On Fri, Oct 3, 2014 at 10:55 AM, serkan.dogan foreignerdr...@yahoo.com wrote: Hi, I installed hbase-0.98.6-hadoop2. It's working not any problem with that. When i am try to run spark hbase python examples, (wordcount examples working - not python issue) ./bin/spark-submit --master local --driver-class-path ./examples/target/spark-examples_2.10-1.1.0.jar ./examples/src/main/python/hbase_inputformat.py localhost myhbasetable the process exit with ClassNotFoundException... I search lots of blogs, sites all says spark 1.1 version built with hbase 0.94.6 rebuild with own hbase version. I try first, change hbase version number - in pom.xml -- nothing found maven central I try second, compile hbase from src and copy hbase/lib folder hbase jars to spark/lib_managed folder and edit spark-defaults.conf my spark-defaults.conf spark.executor.extraClassPath /home/downloads/spark/spark-1.1.0/lib_managed/jars/hbase-server-0.98.6-hadoop2.jar:/home/downloads/spark/spark-1.1.0/lib_managed/jars/hbase-protocol-0.98.6-hadoop2.jar:/home/downloads/spark/spark-1.1.0/lib_managed/jars/hbase-hadoop2-compat-0.98.6-hadoop2.jar:/home/downloads/spark/spark-1.1.0/lib_managed/jars/hbase-client-0.98.6-hadoop2.jar:/home/downloads/spark/spark-1.1.0/lib_managed/jars/hbase-commont-0.98.6-hadoop2.jar:/home/downloads/spark/spark-1.1.0/lib_managed/jars/htrace-core-2.04.jar My question is how i can work with hbase 0.98.6-hadoop2 with spark 1.1.0 Here is the exception message Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/10/03 11:27:15 WARN Utils: Your hostname, xxx.yyy.com resolves to a loopback address: 127.0.0.1; using 1.1.1.1 instead (on interface eth0) 14/10/03 11:27:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 14/10/03 11:27:15 INFO SecurityManager: Changing view acls to: root, 14/10/03 11:27:15 INFO SecurityManager: Changing modify acls to: root, 14/10/03 11:27:15 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root, ); users with modify permissions: Set(root, ) 14/10/03 11:27:16 INFO Slf4jLogger: Slf4jLogger started 14/10/03 11:27:16 INFO Remoting: Starting remoting 14/10/03 11:27:16 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkdri...@1-1-1-1-1.rev.mydomain.io:49256] 14/10/03 11:27:16 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkdri...@1-1-1-1-1.rev.mydomain.io:49256] 14/10/03 11:27:16 INFO Utils: Successfully started service 'sparkDriver' on port 49256. 14/10/03 11:27:16 INFO SparkEnv: Registering MapOutputTracker 14/10/03 11:27:16 INFO SparkEnv: Registering BlockManagerMaster 14/10/03 11:27:16 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20141003112716-298d 14/10/03 11:27:16 INFO Utils: Successfully started service 'Connection manager for block manager' on port 35106.
14/10/03 11:27:16 INFO ConnectionManager: Bound socket to port 35106 with id = ConnectionManagerId(1-1-1-1-1.rev.mydomain.io,35106) 14/10/03 11:27:16 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 14/10/03 11:27:16 INFO BlockManagerMaster: Trying to register BlockManager 14/10/03 11:27:16 INFO BlockManagerMasterActor: Registering block manager 1-1-1-1-1.rev.mydomain.io:35106 with 267.3 MB RAM 14/10/03 11:27:16 INFO BlockManagerMaster: Registered BlockManager 14/10/03 11:27:16 INFO HttpFileServer: HTTP File server directory is /tmp/spark-f60b0533-998f-4af2-a208-d04c571eab82 14/10/03 11:27:16 INFO HttpServer: Starting HTTP Server 14/10/03 11:27:16 INFO Utils: Successfully started service 'HTTP file server' on port 49611. 14/10/03 11:27:16 INFO Utils: Successfully started service 'SparkUI' on port 4040. 14/10/03 11:27:16 INFO SparkUI: Started SparkUI at http://1-1-1-1-1.rev.mydomain.io:4040 14/10/03 11:27:16 INFO Utils: Copying /home/downloads/spark/spark-1.1.0/./examples/src/main/python/hbase_inputformat.py to /tmp/spark-7232227a-0547-454e-9f68-805fa7b0c2f0/hbase_inputformat.py 14/10/03 11:27:16 INFO SparkContext: Added file file:/home/downloads/spark/spark-1.1.0/./examples/src/main/python/hbase_inputformat.py at http://1.1.1.1:49611/files/hbase_inputformat.py with timestamp 1412324836837 14/10/03 11:27:16 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp:// sparkdri...@1-1-1-1-1.rev.mydomain.io:49256/user/HeartbeatReceiver Traceback (most
Re: word2vec: how to save an mllib model and reload it?
Currently I see the word2vec model is collected onto the master, so the model itself is not distributed. I guess the question is why do you need a distributed model? Is the vocab size so large that it's necessary? For model serving in general, unless the model is truly massive (ie cannot fit into memory on a modern high end box with 64, or 128GB ram) then single instance is way faster and simpler (using a cluster of machines is more for load balancing / fault tolerance). What is your use case for model serving? — Sent from Mailbox On Fri, Nov 7, 2014 at 5:47 PM, Duy Huynh duy.huynh@gmail.com wrote: you're right, serialization works. what is your suggestion on saving a distributed model? so part of the model is in one cluster, and some other parts of the model are in other clusters. during runtime, these sub-models run independently in their own clusters (load, train, save). and at some point during run time these sub-models merge into the master model, which also loads, trains, and saves at the master level. much appreciated. On Fri, Nov 7, 2014 at 2:53 AM, Evan R. Sparks evan.spa...@gmail.com wrote: There's some work going on to support PMML - https://issues.apache.org/jira/browse/SPARK-1406 - but it's not yet been merged into master. What are you used to doing in other environments? In R I'm used to running save(), same with matlab. In python either pickling things or dumping to json seems pretty common. (even the scikit-learn docs recommend pickling - http://scikit-learn.org/stable/modules/model_persistence.html). These all seem basically equivalent java serialization to me.. Would some helper functions (in, say, mllib.util.modelpersistence or something) make sense to add? On Thu, Nov 6, 2014 at 11:36 PM, Duy Huynh duy.huynh@gmail.com wrote: that works. is there a better way in spark? this seems like the most common feature for any machine learning work - to be able to save your model after training it and load it later. On Fri, Nov 7, 2014 at 2:30 AM, Evan R. Sparks evan.spa...@gmail.com wrote: Plain old java serialization is one straightforward approach if you're in java/scala. On Thu, Nov 6, 2014 at 11:26 PM, ll duy.huynh@gmail.com wrote: what is the best way to save an mllib model that you just trained and reload it in the future? specifically, i'm using the mllib word2vec model... thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/word2vec-how-to-save-an-mllib-model-and-reload-it-tp18329.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
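A minimal sketch of the plain Java serialization route mentioned in this thread, for a model object that lives on the driver (as the collected Word2Vec model does). It assumes the model class is serializable and says nothing about compatibility across Spark or Scala versions.

    import java.io._

    def saveModel[M <: Serializable](model: M, path: String): Unit = {
      val out = new ObjectOutputStream(new FileOutputStream(path))
      try out.writeObject(model) finally out.close()
    }

    def loadModel[M](path: String): M = {
      val in = new ObjectInputStream(new FileInputStream(path))
      try in.readObject().asInstanceOf[M] finally in.close()
    }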
Re: word2vec: how to save an mllib model and reload it?
For ALS if you want real time recs (and usually this is order 10s to a few 100s ms response), then Spark is not the way to go - a serving layer like Oryx, or prediction.io is what you want. (At graphflow we've built our own). You hold the factor matrices in memory and do the dot product in real time (with optional caching). Again, even for huge models (10s of millions users/items) this can be handled on a single, powerful instance. The issue at this scale is winnowing down the search space using LSH or similar approach to get to real time speeds. For word2vec it's pretty much the same thing as what you have is very similar to one of the ALS factor matrices. One problem is you can't access the wors2vec vectors as they are private val. I think this should be changed actually, so that just the word vectors could be saved and used in a serving layer. — Sent from Mailbox On Fri, Nov 7, 2014 at 7:37 PM, Evan R. Sparks evan.spa...@gmail.com wrote: There are a few examples where this is the case. Let's take ALS, where the result is a MatrixFactorizationModel, which is assumed to be big - the model consists of two matrices, one (users x k) and one (k x products). These are represented as RDDs. You can save these RDDs out to disk by doing something like model.userFeatures.saveAsObjectFile(...) and model.productFeatures.saveAsObjectFile(...) to save out to HDFS or Tachyon or S3. Then, when you want to reload you'd have to instantiate them into a class of MatrixFactorizationModel. That class is package private to MLlib right now, so you'd need to copy the logic over to a new class, but that's the basic idea. That said - using spark to serve these recommendations on a point-by-point basis might not be optimal. There's some work going on in the AMPLab to address this issue. On Fri, Nov 7, 2014 at 7:44 AM, Duy Huynh duy.huynh@gmail.com wrote: you're right, serialization works. what is your suggestion on saving a distributed model? so part of the model is in one cluster, and some other parts of the model are in other clusters. during runtime, these sub-models run independently in their own clusters (load, train, save). and at some point during run time these sub-models merge into the master model, which also loads, trains, and saves at the master level. much appreciated. On Fri, Nov 7, 2014 at 2:53 AM, Evan R. Sparks evan.spa...@gmail.com wrote: There's some work going on to support PMML - https://issues.apache.org/jira/browse/SPARK-1406 - but it's not yet been merged into master. What are you used to doing in other environments? In R I'm used to running save(), same with matlab. In python either pickling things or dumping to json seems pretty common. (even the scikit-learn docs recommend pickling - http://scikit-learn.org/stable/modules/model_persistence.html). These all seem basically equivalent java serialization to me.. Would some helper functions (in, say, mllib.util.modelpersistence or something) make sense to add? On Thu, Nov 6, 2014 at 11:36 PM, Duy Huynh duy.huynh@gmail.com wrote: that works. is there a better way in spark? this seems like the most common feature for any machine learning work - to be able to save your model after training it and load it later. On Fri, Nov 7, 2014 at 2:30 AM, Evan R. Sparks evan.spa...@gmail.com wrote: Plain old java serialization is one straightforward approach if you're in java/scala. On Thu, Nov 6, 2014 at 11:26 PM, ll duy.huynh@gmail.com wrote: what is the best way to save an mllib model that you just trained and reload it in the future? 
specifically, i'm using the mllib word2vec model... thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/word2vec-how-to-save-an-mllib-model-and-reload-it-tp18329.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
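A hedged sketch of the saveAsObjectFile approach Evan describes for a MatrixFactorizationModel; the paths are placeholders and ratings, rank, iterations, lambda and sc are assumed to be in scope. Note the MatrixFactorizationModel constructor is package-private in MLlib at this point, so the reload step would need to live in (or copy logic from) the org.apache.spark.mllib.recommendation package, as mentioned above:

  import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}

  val model = ALS.train(ratings, rank, iterations, lambda)

  // Persist the two factor RDDs (HDFS, S3 or Tachyon paths all work)
  model.userFeatures.saveAsObjectFile("hdfs:///models/als/userFeatures")
  model.productFeatures.saveAsObjectFile("hdfs:///models/als/productFeatures")

  // Later: reload the factors and rebuild the model
  val userFeatures = sc.objectFile[(Int, Array[Double])]("hdfs:///models/als/userFeatures")
  val productFeatures = sc.objectFile[(Int, Array[Double])]("hdfs:///models/als/productFeatures")
  val reloaded = new MatrixFactorizationModel(rank, userFeatures, productFeatures)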
Re: pyspark get column family and qualifier names from hbase table
Feel free to add that converter as an option in the Spark examples via a PR :) — Sent from Mailbox On Wed, Nov 12, 2014 at 3:27 AM, alaa contact.a...@gmail.com wrote: Hey freedafeng, I'm exactly where you are. I want the output to show the rowkey and all column qualifiers that correspond to it. How did you write HBaseResultToStringConverter to do what you wanted it to do? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-get-column-family-and-qualifier-names-from-hbase-table-tp18613p18650.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: RMSE in MovieLensALS increases or stays stable as iterations increase.
copying user group - I keep replying directly vs reply all :) On Wed, Nov 26, 2014 at 2:03 PM, Nick Pentreath nick.pentre...@gmail.com wrote: ALS will be guaranteed to decrease the squared error (therefore RMSE) in each iteration, on the *training* set. This does not hold for the *test* set / cross validation. You would expect the test set RMSE to stabilise as iterations increase, since the algorithm converges - but not necessarily to decrease. On Wed, Nov 26, 2014 at 1:57 PM, Kostas Kloudas kklou...@gmail.com wrote: Hi all, I am getting familiarized with Mllib and a thing I noticed is that running the MovieLensALS example on the movieLens dataset for increasing number of iterations does not decrease the rmse. The results for 0.6% training set and 0.4% test are below. For training set to 0.8%, the results are almost identical. Shouldn’t it be normal to see a decreasing error? Especially going from 1 to 5 iterations. Running 1 iterations Test RMSE for 1 iter. = 1.2452964343277886 (52.75712592704 s). Running 5 iterations Test RMSE for 5 iter. = 1.3258973764470259 (61.183927666 s). Running 9 iterations Test RMSE for 9 iter. = 1.3260308117704385 (61.8494887581 s). Running 13 iterations Test RMSE for 13 iter. = 1.3260310099809915 (73.799510125 s). Running 17 iterations Test RMSE for 17 iter. = 1.3260310102735398 (77.5651218531 s). Running 21 iterations Test RMSE for 21 iter. = 1.3260310102739703 (79.607495074 s). Running 25 iterations Test RMSE for 25 iter. = 1.326031010273971 (88.631776301 s). Running 29 iterations Test RMSE for 29 iter. = 1.3260310102739712 (101.178383079 s). Thanks a lot, Kostas - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
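For reference, a rough sketch of how the test RMSE in the numbers above is typically computed; training and test are assumed to be RDD[Rating] from a random split of the MovieLens data, and the regularization value is arbitrary. The training RMSE is what ALS is guaranteed to reduce each iteration, while this test-set figure is only expected to stabilise:

  import org.apache.spark.mllib.recommendation.{ALS, Rating}

  def testRmse(rank: Int, numIterations: Int): Double = {
    val model = ALS.train(training, rank, numIterations, 0.01)
    // predict ratings for the (user, product) pairs in the test set
    val predictions = model.predict(test.map(r => (r.user, r.product)))
                           .map(p => ((p.user, p.product), p.rating))
    val ratingsAndPreds = test.map(r => ((r.user, r.product), r.rating)).join(predictions)
    math.sqrt(ratingsAndPreds.map { case (_, (actual, predicted)) =>
      (actual - predicted) * (actual - predicted)
    }.mean())
  }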
Re: locality sensitive hashing for spark
Looks interesting thanks for sharing. Does it support cosine similarity ? I only saw jaccard mentioned from a quick glance. — Sent from Mailbox On Mon, Dec 22, 2014 at 4:12 AM, morr0723 michael.d@gmail.com wrote: I've pushed out an implementation of locality sensitive hashing for spark. LSH has a number of use cases, most prominent being if the features are not based in Euclidean space. Code, documentation, and small exemplar dataset is available on github: https://github.com/mrsqueeze/spark-hash Feel free to pass along any comments or issues. Enjoy! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/locality-sensitive-hashing-for-spark-tp20803.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: SaveAsTextFile to S3 bucket
Your output folder specifies rdd.saveAsTextFile("s3n://nexgen-software/dev/output"); So it will try to write to /dev/output which is as expected. If you create the directory /dev/output upfront in your bucket, and try to save it to that (empty) directory, what is the behaviour? On Tue, Jan 27, 2015 at 6:21 AM, Chen, Kevin kevin.c...@neustar.biz wrote: Does anyone know if I can save an RDD as a text file to a pre-created directory in an S3 bucket? I have a directory created in an S3 bucket: //nexgen-software/dev When I tried to save an RDD as a text file in this directory: rdd.saveAsTextFile("s3n://nexgen-software/dev/output"); I got the following exception at runtime: Exception in thread main org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException: S3 HEAD request failed for '/dev' - ResponseCode=403, ResponseMessage=Forbidden I have verified /dev has write permission. However, if I grant the bucket //nexgen-software write permission, I don't get the exception. But the output is not created under dev. Rather, a different /dev/output directory is created in the bucket (//nexgen-software). Is this how saveAsTextFile behaves in S3? Is there any way I can have the output created under a pre-defined directory? Thanks in advance.
Re: Is it possible to do incremental training using ALSModel (MLlib)?
As I recall Oryx (the old version, and I assume the new one too) provide something like this: http://cloudera.github.io/oryx/apidocs/com/cloudera/oryx/als/common/OryxRecommender.html#recommendToAnonymous-java.lang.String:A-float:A-int- though Sean will be more on top of that than me :) On Mon, Jan 5, 2015 at 2:17 PM, Wouter Samaey wouter.sam...@storefront.be wrote: One other idea was that I don’t need to re-train the model, but simply pass all the current user’s recent ratings (including one’s created after the training) to the existing model… Is this a valid option? Wouter Samaey Zaakvoerder Storefront BVBA Tel: +32 472 72 83 07 Web: http://storefront.be LinkedIn: http://www.linkedin.com/in/woutersamaey On 05 Jan 2015, at 13:13, Sean Owen so...@cloudera.com wrote: In the first instance, I'm suggesting that ALS in Spark could perhaps expose a run() method that accepts a previous MatrixFactorizationModel, and uses the product factors from it as the initial state instead. If anybody seconds that idea, I'll make a PR. The second idea is just fold-in: http://www.slideshare.net/srowen/big-practical-recommendations-with-alternating-least-squares/14 Whether you do this or something like SGD, inside or outside Spark, depends on your requirements I think. On Sat, Jan 3, 2015 at 12:04 PM, Wouter Samaey wouter.sam...@storefront.be wrote: Do you know a place where I could find a sample or tutorial for this? I'm still very new at this. And struggling a bit... Thanks in advance Wouter Sent from my iPhone. On 03 Jan 2015, at 10:36, Sean Owen so...@cloudera.com wrote: Yes, it is easy to simply start a new factorization from the current model solution. It works well. That's more like incremental *batch* rebuilding of the model. That is not in MLlib but fairly trivial to add. You can certainly 'fold in' new data to approximately update with one new datum too, which you can find online. This is not quite the same idea as streaming SGD. I'm not sure this fits the RDD model well since it entails updating one element at a time but mini batch could be reasonable. On Jan 3, 2015 5:29 AM, Peng Cheng rhw...@gmail.com wrote: I was under the impression that ALS wasn't designed for it :- The famous ebay online recommender uses SGD However, you can try using the previous model as starting point, and gradually reduce the number of iteration after the model stablize. I never verify this idea, so you need to at least cross-validate it before putting into productio On 2 January 2015 at 04:40, Wouter Samaey wouter.sam...@storefront.be wrote: Hi all, I'm curious about MLlib and if it is possible to do incremental training on the ALSModel. Usually training is run first, and then you can query. But in my case, data is collected in real-time and I want the predictions of my ALSModel to consider the latest data without complete re-training phase. I've checked out these resources, but could not find any info on how to solve this: https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html http://ampcamp.berkeley.edu/big-data-mini-course/movie-recommendation-with-mllib.html My question fits in a larger picture where I'm using Prediction IO, and this in turn is based on Spark. Thanks in advance for any advice! Wouter -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-do-incremental-training-using-ALSModel-MLlib-tp20942.html Sent from the Apache Spark User List mailing list archive at Nabble.com. 
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
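A small sketch of the "fold-in" idea Sean mentions, under the assumption that the item factors from the last batch ALS run have been collected to the driver as a Map and that the user's recent ratings are available. It solves the usual regularized least-squares step for a single user against the fixed item factors; Breeze is used for the linear algebra, and the names and lambda value are illustrative:

  import breeze.linalg.{DenseMatrix, DenseVector}

  // itemFactors: itemId -> factor vector from the existing model
  // newRatings: (itemId, rating) pairs seen for this user since the last batch rebuild
  def foldInUser(newRatings: Seq[(Int, Double)],
                 itemFactors: Map[Int, Array[Double]],
                 rank: Int,
                 lambda: Double = 0.01): DenseVector[Double] = {
    val n = newRatings.size
    // Y: n x rank matrix of the item factors the user has interacted with
    val Y = DenseMatrix.zeros[Double](n, rank)
    newRatings.zipWithIndex.foreach { case ((item, _), i) =>
      Y(i, ::) := DenseVector(itemFactors(item)).t
    }
    val r = DenseVector(newRatings.map(_._2).toArray)
    // Solve (Y^T Y + lambda * I) u = Y^T r, the per-user ALS update with items held fixed
    val A = Y.t * Y + DenseMatrix.eye[Double](rank) * lambda
    A \ (Y.t * r)
  }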
Re: Did DataFrames break basic SQLContext?
To answer your first question - yes 1.3.0 did break backward compatibility for the change from SchemaRDD -> DataFrame. SparkSQL was an alpha component so API-breaking changes could happen. It is no longer an alpha component as of 1.3.0 so this will not be the case in the future. Adding toDF should hopefully not be too much of an effort. For the second point - I also have seen these exceptions when upgrading jobs to 1.3.0 - but they don't fail my jobs. Not sure what the cause is; it would be good to understand this. — Sent from Mailbox On Wed, Mar 18, 2015 at 5:22 PM, Justin Pihony justin.pih...@gmail.com wrote: I started to play with 1.3.0 and found that there are a lot of breaking changes. Previously, I could do the following: case class Foo(x: Int) val rdd = sc.parallelize(List(Foo(1))) import sqlContext._ rdd.registerTempTable("foo") Now, I am not able to directly use my RDD object and have it implicitly become a DataFrame. It can be used as a DataFrameHolder, of which I could write: rdd.toDF.registerTempTable("foo") But, that is kind of a pain in comparison. The other problem for me is that I keep getting a SQLException: java.sql.SQLException: Failed to start database 'metastore_db' with class loader sun.misc.Launcher$AppClassLoader@10393e97, see the next exception for details. This seems to be a dependency on Hive, when previously (1.2.0) there was no such dependency. I can open tickets for these, but wanted to ask here first... maybe I am doing something wrong? Thanks, Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Did-DataFrames-break-basic-SQLContext-tp22120.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
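For what it's worth, a short sketch of the 1.3 equivalent of the snippet above; the key change is importing sqlContext.implicits._ and calling toDF explicitly (a SparkContext sc is assumed to be in scope, e.g. in the shell):

  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext.implicits._   // brings the implicit toDF conversion into scope

  case class Foo(x: Int)
  val rdd = sc.parallelize(List(Foo(1)))

  rdd.toDF().registerTempTable("foo")
  sqlContext.sql("SELECT x FROM foo").show()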
Re: Iterative Algorithms with Spark Streaming
MLlib supports streaming linear models: http://spark.apache.org/docs/latest/mllib-linear-methods.html#streaming-linear-regression and k-means: http://spark.apache.org/docs/latest/mllib-clustering.html#k-means With an iteration parameter of 1, this amounts to mini-batch SGD where the mini-batch is the Spark Streaming batch. On Mon, Mar 16, 2015 at 2:57 PM, Alex Minnaar aminn...@verticalscope.com wrote: I wanted to ask a basic question about the types of algorithms that are possible to apply to a DStream with Spark streaming. With Spark it is possible to perform iterative computations on RDDs like in the gradient descent example val points = spark.textFile(...).map(parsePoint).cache() var w = Vector.random(D) // current separating plane for (i <- 1 to ITERATIONS) { val gradient = points.map(p => (1 / (1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x ).reduce(_ + _) w -= gradient } which has a global state w that is updated after each iteration and the updated value is then used in the next iteration. My question is whether this type of algorithm is possible if the points variable was a DStream instead of an RDD? It seems like you could perform the same map as above which would create a gradient DStream and also use updateStateByKey to create a DStream for the w variable. But the problem is that there doesn't seem to be a way to reuse the w DStream inside the map. I don't think that it is possible for DStreams to communicate this way. Am I correct that this is not possible with DStreams or am I missing something? Note: The reason I ask this question is that many machine learning algorithms are trained by stochastic gradient descent. SGD is similar to the above gradient descent algorithm except each iteration is on a new mini-batch of data points rather than the same data points for every iteration. It seems like Spark Streaming provides a natural way to stream in these mini-batches (as RDDs), but if it is not able to keep track of an updating global state variable then I don't think Spark Streaming can be used for SGD. Thanks, Alex
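A hedged sketch of the streaming linear regression approach mentioned above, where the model weights play the role of the global w and are updated once per streaming batch; the path, batch interval and feature count are placeholders:

  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val ssc = new StreamingContext(sc, Seconds(10))

  // each line is in LabeledPoint.parse format, e.g. "(1.0,[0.5,0.3,0.1])"
  val trainingData = ssc.textFileStream("hdfs:///streaming/train").map(LabeledPoint.parse)

  val numFeatures = 3
  val model = new StreamingLinearRegressionWithSGD()
    .setInitialWeights(Vectors.zeros(numFeatures))
    .setNumIterations(1)   // one SGD pass per streaming batch, i.e. mini-batch SGD

  model.trainOn(trainingData)   // the weights are carried over and updated on every batch

  ssc.start()
  ssc.awaitTermination()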
Re: Software stack for Recommendation engine with spark mlib
As Sean says, precomputing recommendations is pretty inefficient. Though with 500k items its easy to get all the item vectors in memory so pre-computing is not too bad. Still, since you plan to serve these via a REST service anyway, computing on demand via a serving layer such as Oryx or PredictionIO (or the newly open sourced Seldon.io) is a good option. You can also cache the recommendations quite aggressively - once you compute a user or item top-K list, just stick the result in mem cache / redis / whatever and evict it when you recompute your offline model, or every hour or whatever. — Sent from Mailbox On Sun, Mar 15, 2015 at 3:03 PM, Shashidhar Rao raoshashidhar...@gmail.com wrote: Thanks Sean, your suggestions and the links provided are just what I needed to start off with. On Sun, Mar 15, 2015 at 6:16 PM, Sean Owen so...@cloudera.com wrote: I think you're assuming that you will pre-compute recommendations and store them in Mongo. That's one way to go, with certain tradeoffs. You can precompute offline easily, and serve results at large scale easily, but, you are forced to precompute everything -- lots of wasted effort, not completely up to date. The front-end part of the stack looks right. Spark would do the model building; you'd have to write a process to score recommendations and store the result. Mahout is the same thing, really. 500K items isn't all that large. Your requirements aren't driven just by items though. Number of users and latent features matter too. It matters how often you want to build the model too. I'm guessing you would get away with a handful of modern machines for a problem this size. In a way what you describe reminds me of Wibidata, since it built recommender-like solutions on top of data and results published to a NoSQL store. You might glance at the related OSS project Kiji (http://kiji.org/) for ideas about how to manage the schema. You should have a look at things like Nick's architecture for Graphflow, however it's more concerned with computing recommendation on the fly, and describes a shift from an architecture originally built around something like a NoSQL store: http://spark-summit.org/wp-content/uploads/2014/07/Using-Spark-and-Shark-to-Power-a-Realt-time-Recommendation-and-Customer-Intelligence-Platform-Nick-Pentreath.pdf This is also the kind of ground the oryx project is intended to cover, something I've worked on personally: https://github.com/OryxProject/oryx -- a layer on and around the core model building in Spark + Spark Streaming to provide a whole recommender (for example), down to the REST API. On Sun, Mar 15, 2015 at 10:45 AM, Shashidhar Rao raoshashidhar...@gmail.com wrote: Hi, Can anyone who has developed recommendation engine suggest what could be the possible software stack for such an application. I am basically new to recommendation engine , I just found out Mahout and Spark Mlib which are available . I am thinking the below software stack. 1. The user is going to use Android app. 2. Rest Api sent to app server from the android app to get recommendations. 3. Spark Mlib core engine for recommendation engine 4. MongoDB database backend. I would like to know more on the cluster configuration( how many nodes etc) part of spark for calculating the recommendations for 500,000 items. This items include products for day care etc. Other software stack suggestions would also be very useful.It has to run on multiple vendor machines. Please suggest. Thanks shashi
Re: Spark Release 1.3.0 DataFrame API
I've found people.toDF gives you a data frame (roughly equivalent to the previous Row RDD), And you can then call registerTempTable on that DataFrame. So people.toDF.registerTempTable(people) should work — Sent from Mailbox On Sat, Mar 14, 2015 at 5:33 PM, David Mitchell jdavidmitch...@gmail.com wrote: I am pleased with the release of the DataFrame API. However, I started playing with it, and neither of the two main examples in the documentation work: http://spark.apache.org/docs/1.3.0/sql-programming-guide.html Specfically: - Inferring the Schema Using Reflection - Programmatically Specifying the Schema Scala 2.11.6 Spark 1.3.0 prebuilt for Hadoop 2.4 and later *Inferring the Schema Using Reflection* scala people.registerTempTable(people) console:31: error: value registerTempTable is not a member of org.apache.spark .rdd.RDD[Person] people.registerTempTable(people) ^ *Programmatically Specifying the Schema* scala val peopleDataFrame = sqlContext.createDataFrame(people, schema) console:41: error: overloaded method value createDataFrame with alternatives: (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spar k.sql.DataFrame and (rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.Dat aFrame and (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],columns: java.util.List[String])org.apache.spark.sql.DataFrame and (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: o rg.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame and (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache .spark.sql.types.StructType)org.apache.spark.sql.DataFrame cannot be applied to (org.apache.spark.rdd.RDD[String], org.apache.spark.sql.ty pes.StructType) val df = sqlContext.createDataFrame(people, schema) Any help would be appreciated. David
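For the second error above: createDataFrame expects an RDD[Row] (not an RDD[String]) together with the StructType, so the raw strings need to be mapped into Rows first. A rough sketch against the 1.3 API, assuming people is an RDD[String] of comma-separated "name,age" lines and sqlContext is already created:

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types.{StructField, StructType, StringType}

  val schema = StructType(Seq(
    StructField("name", StringType, nullable = true),
    StructField("age", StringType, nullable = true)))

  // map each raw line into a Row matching the schema
  val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

  val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
  peopleDataFrame.registerTempTable("people")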
Re: How do you write Dataframes to elasticsearch
Spark 1.3 is not supported by elasticsearch-hadoop yet but will be very soon: https://github.com/elastic/elasticsearch-hadoop/issues/400 However in the meantime you could use df.toRDD.saveToEs - though you may have to manipulate the Row object perhaps to extract fields, not sure if it will serialize directly to ES JSON... — Sent from Mailbox On Wed, Mar 25, 2015 at 2:07 PM, yamanoj manoj.per...@gmail.com wrote: It seems that elasticsearch-spark_2.10 currently not supporting spart 1.3. Could you tell me if there is an alternative way to save Dataframes to elasticsearch? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-you-write-Dataframes-to-elasticsearch-tp3.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
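In the meantime, a sketch of the RDD route suggested above using the elasticsearch-spark RDD API; it assumes df is the DataFrame, that es.nodes is already set in the SparkConf, that the columns are simple types, and the index/type name is a placeholder:

  import org.elasticsearch.spark._   // from the elasticsearch-hadoop / elasticsearch-spark artifact

  val fields = df.columns   // column names, captured on the driver
  // turn each Row into a Map so it serializes cleanly to ES JSON
  val docs = df.rdd.map { row =>
    fields.zip(row.toSeq).toMap   // field name -> value
  }
  docs.saveToEs("myindex/mytype")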
Re: upgrade from spark 1.2.1 to 1.3 on EC2 cluster and problems
What version of Spark do the other dependencies rely on (Adam and H2O?) - that could be it Or try sbt clean compile — Sent from Mailbox On Wed, Mar 25, 2015 at 5:58 PM, roni roni.epi...@gmail.com wrote: I have a EC2 cluster created using spark version 1.2.1. And I have a SBT project . Now I want to upgrade to spark 1.3 and use the new features. Below are issues . Sorry for the long post. Appreciate your help. Thanks -Roni Question - Do I have to create a new cluster using spark 1.3? Here is what I did - In my SBT file I changed to - libraryDependencies += org.apache.spark %% spark-core % 1.3.0 But then I started getting compilation error. along with Here are some of the libraries that were evicted: [warn] * org.apache.spark:spark-core_2.10:1.2.0 - 1.3.0 [warn] * org.apache.hadoop:hadoop-client:(2.5.0-cdh5.2.0, 2.2.0) - 2.6.0 [warn] Run 'evicted' to see detailed eviction warnings constructor cannot be instantiated to expected type; [error] found : (T1, T2) [error] required: org.apache.spark.sql.catalyst.expressions.Row [error] val ty = joinRDD.map{case(word, (file1Counts, file2Counts)) = KmerIntesect(word, file1Counts,xyz)} [error] ^ Here is my SBT and code -- SBT - version := 1.0 scalaVersion := 2.10.4 resolvers += Sonatype OSS Snapshots at https://oss.sonatype.org/content/repositories/snapshots;; resolvers += Maven Repo1 at https://repo1.maven.org/maven2;; resolvers += Maven Repo at https://s3.amazonaws.com/h2o-release/h2o-dev/master/1056/maven/repo/;; /* Dependencies - %% appends Scala version to artifactId */ libraryDependencies += org.apache.hadoop % hadoop-client % 2.6.0 libraryDependencies += org.apache.spark %% spark-core % 1.3.0 libraryDependencies += org.bdgenomics.adam % adam-core % 0.16.0 libraryDependencies += ai.h2o % sparkling-water-core_2.10 % 0.2.10 CODE -- import org.apache.spark.{SparkConf, SparkContext} case class KmerIntesect(kmer: String, kCount: Int, fileName: String) object preDefKmerIntersection { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName(preDefKmer-intersect) val sc = new SparkContext(sparkConf) import sqlContext.createSchemaRDD val sqlContext = new org.apache.spark.sql.SQLContext(sc) val bedFile = sc.textFile(s3n://a/b/c,40) val hgfasta = sc.textFile(hdfs://a/b/c,40) val hgPair = hgfasta.map(_.split (,)).map(a= (a(0), a(1).trim().toInt)) val filtered = hgPair.filter(kv = kv._2 == 1) val bedPair = bedFile.map(_.split (,)).map(a= (a(0), a(1).trim().toInt)) val joinRDD = bedPair.join(filtered) val ty = joinRDD.map{case(word, (file1Counts, file2Counts)) = KmerIntesect(word, file1Counts,xyz)} ty.registerTempTable(KmerIntesect) ty.saveAsParquetFile(hdfs://x/y/z/kmerIntersect.parquet) } }
Re: upgrade from spark 1.2.1 to 1.3 on EC2 cluster and problems
Ah I see now you are trying to use a spark 1.2 cluster - you will need to be running spark 1.3 on your EC2 cluster in order to run programs built against spark 1.3. You will need to terminate and restart your cluster with spark 1.3 — Sent from Mailbox On Wed, Mar 25, 2015 at 6:39 PM, roni roni.epi...@gmail.com wrote: Even if H2o and ADA are dependent on 1.2.1 , it should be backword compatible, right? So using 1.3 should not break them. And the code is not using the classes from those libs. I tried sbt clean compile .. same errror Thanks _R On Wed, Mar 25, 2015 at 9:26 AM, Nick Pentreath nick.pentre...@gmail.com wrote: What version of Spark do the other dependencies rely on (Adam and H2O?) - that could be it Or try sbt clean compile — Sent from Mailbox https://www.dropbox.com/mailbox On Wed, Mar 25, 2015 at 5:58 PM, roni roni.epi...@gmail.com wrote: I have a EC2 cluster created using spark version 1.2.1. And I have a SBT project . Now I want to upgrade to spark 1.3 and use the new features. Below are issues . Sorry for the long post. Appreciate your help. Thanks -Roni Question - Do I have to create a new cluster using spark 1.3? Here is what I did - In my SBT file I changed to - libraryDependencies += org.apache.spark %% spark-core % 1.3.0 But then I started getting compilation error. along with Here are some of the libraries that were evicted: [warn] * org.apache.spark:spark-core_2.10:1.2.0 - 1.3.0 [warn] * org.apache.hadoop:hadoop-client:(2.5.0-cdh5.2.0, 2.2.0) - 2.6.0 [warn] Run 'evicted' to see detailed eviction warnings constructor cannot be instantiated to expected type; [error] found : (T1, T2) [error] required: org.apache.spark.sql.catalyst.expressions.Row [error] val ty = joinRDD.map{case(word, (file1Counts, file2Counts)) = KmerIntesect(word, file1Counts,xyz)} [error] ^ Here is my SBT and code -- SBT - version := 1.0 scalaVersion := 2.10.4 resolvers += Sonatype OSS Snapshots at https://oss.sonatype.org/content/repositories/snapshots;; resolvers += Maven Repo1 at https://repo1.maven.org/maven2;; resolvers += Maven Repo at https://s3.amazonaws.com/h2o-release/h2o-dev/master/1056/maven/repo/;; /* Dependencies - %% appends Scala version to artifactId */ libraryDependencies += org.apache.hadoop % hadoop-client % 2.6.0 libraryDependencies += org.apache.spark %% spark-core % 1.3.0 libraryDependencies += org.bdgenomics.adam % adam-core % 0.16.0 libraryDependencies += ai.h2o % sparkling-water-core_2.10 % 0.2.10 CODE -- import org.apache.spark.{SparkConf, SparkContext} case class KmerIntesect(kmer: String, kCount: Int, fileName: String) object preDefKmerIntersection { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName(preDefKmer-intersect) val sc = new SparkContext(sparkConf) import sqlContext.createSchemaRDD val sqlContext = new org.apache.spark.sql.SQLContext(sc) val bedFile = sc.textFile(s3n://a/b/c,40) val hgfasta = sc.textFile(hdfs://a/b/c,40) val hgPair = hgfasta.map(_.split (,)).map(a= (a(0), a(1).trim().toInt)) val filtered = hgPair.filter(kv = kv._2 == 1) val bedPair = bedFile.map(_.split (,)).map(a= (a(0), a(1).trim().toInt)) val joinRDD = bedPair.join(filtered) val ty = joinRDD.map{case(word, (file1Counts, file2Counts)) = KmerIntesect(word, file1Counts,xyz)} ty.registerTempTable(KmerIntesect) ty.saveAsParquetFile(hdfs://x/y/z/kmerIntersect.parquet) } }
Re: iPython Notebook + Spark + Accumulo -- best practice?
I'm guessing the Accumulo Key and Value classes are not serializable, so you would need to do something like val rdd = sc.newAPIHadoopRDD(...).map { case (key, value) = (extractScalaType(key), extractScalaType(value)) } Where 'extractScalaType converts the key or Value to a standard Scala type or case class or whatever - basically extracts the data from the Key or Value in a form usable in Scala — Sent from Mailbox On Thu, Mar 26, 2015 at 8:59 PM, Russ Weeks rwe...@newbrightidea.com wrote: Hi, David, This is the code that I use to create a JavaPairRDD from an Accumulo table: JavaSparkContext sc = new JavaSparkContext(conf); Job hadoopJob = Job.getInstance(conf,TestSparkJob); job.setInputFormatClass(AccumuloInputFormat.class); AccumuloInputFormat.setZooKeeperInstance(job, conf.get(ZOOKEEPER_INSTANCE_NAME, conf.get(ZOOKEEPER_HOSTS) ); AccumuloInputFormat.setConnectorInfo(job, conf.get(ACCUMULO_AGILE_USERNAME), new PasswordToken(conf.get(ACCUMULO_AGILE_PASSWORD)) ); AccumuloInputFormat.setInputTableName(job, conf.get(ACCUMULO_TABLE_NAME)); AccumuloInputFormat.setScanAuthorizations(job, auths); JavaPairRDDKey, Value values = sc.newAPIHadoopRDD(hadoopJob.getConfiguration(), AccumuloInputFormat.class, Key.class, Value.class); Key.class and Value.class are from org.apache.accumulo.core.data. I use a WholeRowIterator so that the Value is actually an encoded representation of an entire logical row; it's a useful convenience if you can be sure that your rows always fit in memory. I haven't tested it since Spark 1.0.1 but I doubt anything important has changed. Regards, -Russ On Thu, Mar 26, 2015 at 11:41 AM, David Holiday dav...@annaisystems.com wrote: * progress!* i was able to figure out why the 'input INFO not set' error was occurring. the eagle-eyed among you will no doubt see the following code is missing a closing '(' AbstractInputFormat.setConnectorInfo(jobConf, root, new PasswordToken(password) as I'm doing this in spark-notebook, I'd been clicking the execute button and moving on because I wasn't seeing an error. what I forgot was that notebook is going to do what spark-shell will do when you leave off a closing ')' -- *it will wait forever for you to add it*. so the error was the result of the 'setConnectorInfo' method never getting executed. unfortunately, I'm still unable to shove the accumulo table data into an RDD that's useable to me. when I execute rddX.count I get back res15: Long = 1 which is the correct response - there are 10,000 rows of data in the table I pointed to. however, when I try to grab the first element of data thusly: rddX.first I get the following error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.accumulo.core.data.Key any thoughts on where to go from here? DAVID HOLIDAY Software Engineer 760 607 3300 | Office 312 758 8385 | Mobile dav...@annaisystems.com broo...@annaisystems.com www.AnnaiSystems.com On Mar 26, 2015, at 8:35 AM, David Holiday dav...@annaisystems.com wrote: hi Nick Unfortunately the Accumulo docs are woefully inadequate, and in some places, flat wrong. I'm not sure if this is a case where the docs are 'flat wrong', or if there's some wrinke with spark-notebook in the mix that's messing everything up. 
I've been working with some people on stack overflow on this same issue (including one of the people from the spark-notebook team): http://stackoverflow.com/questions/29244530/how-do-i-create-a-spark-rdd-from-accumulo-1-6-in-spark-notebook?noredirect=1#comment46755938_29244530 if you click the link you can see the entire thread of code, responses from notebook, etc. I'm going to try invoking the same techniques both from within a stand-alone scala problem and from the shell itself to see if I can get some traction. I'll report back when I have more data. cheers (and thx!) DAVID HOLIDAY Software Engineer 760 607 3300 | Office 312 758 8385 | Mobile dav...@annaisystems.com broo...@annaisystems.com GetFileAttachment.jpg www.AnnaiSystems.com http://www.annaisystems.com/ On Mar 25, 2015, at 11:43 PM, Nick Pentreath nick.pentre...@gmail.com wrote: From a quick look at this link - http://accumulo.apache.org/1.6/accumulo_user_manual.html#_mapreduce - it seems you need to call some static methods on AccumuloInputFormat in order to set the auth, table, and range settings. Try setting these config options first and then call newAPIHadoopRDD? On Thu, Mar 26, 2015 at 2:34 AM, David Holiday dav...@annaisystems.com wrote: hi Irfan, thanks for getting back to me - i'll try the accumulo list to be sure. what is the normal use case for spark though? I'm surprised that hooking it into something as common and popular as accumulo isn't more of an every-day task. DAVID HOLIDAY Software Engineer 760 607 3300
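A small sketch of the "extract to plain Scala types" idea from the earlier reply, which avoids the non-serializable Key in the task result by mapping each entry to a tuple of Strings before calling first/collect; rddX here stands for the RDD returned by newAPIHadoopRDD above, and the choice of fields is illustrative:

  import org.apache.accumulo.core.data.{Key, Value}

  val rows = rddX.map { case (key: Key, value: Value) =>
    // convert the Accumulo types into plain Strings so the result is serializable
    (key.getRow.toString,
     key.getColumnFamily.toString,
     key.getColumnQualifier.toString,
     new String(value.get(), "UTF-8"))
  }

  rows.first()   // safe now: the tuple only contains Strings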
Re: hadoop input/output format advanced control
You can indeed override the Hadoop configuration at a per-RDD level - though it is a little more verbose, as in the below example, and you need to effectively make a copy of the hadoop Configuration: val thisRDDConf = new Configuration(sc.hadoopConfiguration) thisRDDConf.set(mapred.min.split.size, 5) val rdd = sc.newAPIHadoopFile(path, classOf[SequenceFileInputFormat[IntWritable, Text]], classOf[IntWritable], classOf[Text], thisRDDConf ) println(rdd.partitions.size) val rdd2 = sc.newAPIHadoopFile(path, classOf[SequenceFileInputFormat[IntWritable, Text]], classOf[IntWritable], classOf[Text] ) println(rdd2.partitions.size) For example, if I run the above on the following directory (some files I have lying around): -rw-r--r-- 1 Nick staff 0B Jul 11 2014 _SUCCESS -rw-r--r-- 1 Nick staff 291M Sep 16 2014 part-0 -rw-r--r-- 1 Nick staff 227M Sep 16 2014 part-1 -rw-r--r-- 1 Nick staff 370M Sep 16 2014 part-2 -rw-r--r-- 1 Nick staff 244M Sep 16 2014 part-3 -rw-r--r-- 1 Nick staff 240M Sep 16 2014 part-4 I get output: 15/03/24 20:43:12 INFO FileInputFormat: Total input paths to process : 5 *5* ... and then for the second RDD: 15/03/24 20:43:12 INFO SparkContext: Created broadcast 1 from newAPIHadoopFile at TestHash.scala:41 *45* As expected. Though a more succinct way of passing in those conf options would be nice - but this should get you what you need. On Mon, Mar 23, 2015 at 10:36 PM, Koert Kuipers ko...@tresata.com wrote: currently its pretty hard to control the Hadoop Input/Output formats used in Spark. The conventions seems to be to add extra parameters to all methods and then somewhere deep inside the code (for example in PairRDDFunctions.saveAsHadoopFile) all these parameters get translated into settings on the Hadoop Configuration object. for example for compression i see codec: Option[Class[_ : CompressionCodec]] = None added to a bunch of methods. how scalable is this solution really? for example i need to read from a hadoop dataset and i dont want the input (part) files to get split up. the way to do this is to set mapred.min.split.size. now i dont want to set this at the level of the SparkContext (which can be done), since i dont want it to apply to input formats in general. i want it to apply to just this one specific input dataset i need to read. which leaves me with no options currently. i could go add yet another input parameter to all the methods (SparkContext.textFile, SparkContext.hadoopFile, SparkContext.objectFile, etc.). but that seems ineffective. why can we not expose a Map[String, String] or some other generic way to manipulate settings for hadoop input/output formats? it would require adding one more parameter to all methods to deal with hadoop input/output formats, but after that its done. one parameter to rule them all then i could do: val x = sc.textFile(/some/path, formatSettings = Map(mapred.min.split.size - 12345)) or rdd.saveAsTextFile(/some/path, formatSettings = Map(mapred.output.compress - true, mapred.output.compression.codec - somecodec))
Re: iPython Notebook + Spark + Accumulo -- best practice?
From a quick look at this link - http://accumulo.apache.org/1.6/accumulo_user_manual.html#_mapreduce - it seems you need to call some static methods on AccumuloInputFormat in order to set the auth, table, and range settings. Try setting these config options first and then call newAPIHadoopRDD? On Thu, Mar 26, 2015 at 2:34 AM, David Holiday dav...@annaisystems.com wrote: hi Irfan, thanks for getting back to me - i'll try the accumulo list to be sure. what is the normal use case for spark though? I'm surprised that hooking it into something as common and popular as accumulo isn't more of an every-day task. DAVID HOLIDAY Software Engineer 760 607 3300 | Office 312 758 8385 | Mobile dav...@annaisystems.com broo...@annaisystems.com www.AnnaiSystems.com On Mar 25, 2015, at 5:27 PM, Irfan Ahmad ir...@cloudphysics.com wrote: Hmmm this seems very accumulo-specific, doesn't it? Not sure how to help with that. *Irfan Ahmad* CTO | Co-Founder | *CloudPhysics* http://www.cloudphysics.com/ Best of VMworld Finalist Best Cloud Management Award NetworkWorld 10 Startups to Watch EMA Most Notable Vendor On Tue, Mar 24, 2015 at 4:09 PM, David Holiday dav...@annaisystems.com wrote: hi all, got a vagrant image with spark notebook, spark, accumulo, and hadoop all running. from notebook I can manually create a scanner and pull test data from a table I created using one of the accumulo examples: val instanceNameS = accumuloval zooServersS = localhost:2181val instance: Instance = new ZooKeeperInstance(instanceNameS, zooServersS)val connector: Connector = instance.getConnector( root, new PasswordToken(password))val auths = new Authorizations(exampleVis)val scanner = connector.createScanner(batchtest1, auths) scanner.setRange(new Range(row_00, row_10)) for(entry: Entry[Key, Value] - scanner) { println(entry.getKey + is + entry.getValue)} will give the first ten rows of table data. when I try to create the RDD thusly: val rdd2 = sparkContext.newAPIHadoopRDD ( new Configuration(), classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat], classOf[org.apache.accumulo.core.data.Key], classOf[org.apache.accumulo.core.data.Value] ) I get an RDD returned to me that I can't do much with due to the following error: java.io.IOException: Input info has not been set. at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630) at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343) at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1367) at org.apache.spark.rdd.RDD.count(RDD.scala:927) which totally makes sense in light of the fact that I haven't specified any parameters as to which table to connect with, what the auths are, etc. so my question is: what do I need to do from here to get those first ten rows of table data into my RDD? 
DAVID HOLIDAY Software Engineer 760 607 3300 | Office 312 758 8385 | Mobile dav...@annaisystems.com broo...@annaisystems.com GetFileAttachment.jpg www.AnnaiSystems.com http://www.annaisystems.com/ On Mar 19, 2015, at 11:25 AM, David Holiday dav...@annaisystems.com wrote: kk - I'll put something together and get back to you with more :-) DAVID HOLIDAY Software Engineer 760 607 3300 | Office 312 758 8385 | Mobile dav...@annaisystems.com broo...@annaisystems.com GetFileAttachment.jpg www.AnnaiSystems.com http://www.annaisystems.com/ On Mar 19, 2015, at 10:59 AM, Irfan Ahmad ir...@cloudphysics.com wrote: Once you setup spark-notebook, it'll handle the submits for interactive work. Non-interactive is not handled by it. For that spark-kernel could be used. Give it a shot ... it only takes 5 minutes to get it running in local-mode. *Irfan Ahmad* CTO | Co-Founder | *CloudPhysics* http://www.cloudphysics.com/ Best of VMworld Finalist Best Cloud Management Award NetworkWorld 10 Startups to Watch EMA Most Notable Vendor On Thu, Mar 19, 2015 at 9:51 AM, David Holiday dav...@annaisystems.com wrote: hi all - thx for the alacritous replies! so regarding how to get things from notebook to spark and back, am I correct that spark-submit is the way to go? DAVID HOLIDAY Software Engineer 760 607 3300 | Office 312 758 8385 | Mobile dav...@annaisystems.com broo...@annaisystems.com GetFileAttachment.jpg www.AnnaiSystems.com http://www.annaisystems.com/ On Mar 19, 2015, at
Re: StackOverflow Problem with 1.3 mllib ALS
Fair enough but I'd say you hit that diminishing return after 20 iterations or so... :) On Thu, Apr 2, 2015 at 9:39 AM, Justin Yip yipjus...@gmail.com wrote: Thanks Xiangrui, I used 80 iterations to demonstrates the marginal diminishing return in prediction quality :) Justin On Apr 2, 2015 00:16, Xiangrui Meng men...@gmail.com wrote: I think before 1.3 you also get stackoverflow problem in ~35 iterations. In 1.3.x, please use setCheckpointInterval to solve this problem, which is available in the current master and 1.3.1 (to be released soon). Btw, do you find 80 iterations are needed for convergence? -Xiangrui On Wed, Apr 1, 2015 at 11:54 PM, Justin Yip yipjus...@prediction.io wrote: Hello, I have been using Mllib's ALS in 1.2 and it works quite well. I have just upgraded to 1.3 and I encountered stackoverflow problem. After some digging, I realized that when the iteration ~35, I will get overflow problem. However, I can get at least 80 iterations with ALS in 1.2. Is there any change to the ALS algorithm? And are there any ways to achieve more iterations? Thanks. Justin - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
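A sketch of the checkpointing fix Xiangrui refers to (setCheckpointInterval is available from 1.3.1 / master); a checkpoint directory has to be set on the SparkContext for it to take effect, and the parameter values and path below are placeholders, with ratings assumed to be an RDD[Rating]:

  import org.apache.spark.mllib.recommendation.ALS

  sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")

  val model = new ALS()
    .setRank(20)
    .setIterations(80)
    .setLambda(0.01)
    .setCheckpointInterval(10)   // truncate the lineage every 10 iterations to avoid stack overflow
    .run(ratings)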
Re: ElasticSearch for Spark times out
Is your ES cluster reachable from your Spark cluster via network / firewall? Can you run the same query from the spark master and slave nodes via curl / one of the other clients? Seems odd that GC issues would be a problem from the scan but not when running query from a browser plugin... Sounds like it could be a network issue. — Sent from Mailbox On Thu, Apr 23, 2015 at 5:11 AM, Otis Gospodnetic otis.gospodne...@gmail.com wrote: Hi, If you get ES response back in 1-5 seconds that's pretty slow. Are these ES aggregation queries? Costin may be right about GC possibly causing timeouts. SPM http://sematext.com/spm/ can give you all Spark and all key Elasticsearch metrics, including various JVM metrics. If the problem is GC, you'll see it. If you monitor both Spark side and ES side, you should be able to find some correlation with SPM. Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/ On Wed, Apr 22, 2015 at 5:43 PM, Costin Leau costin.l...@gmail.com wrote: Hi, First off, for Elasticsearch questions is worth pinging the Elastic mailing list as that is closer monitored than this one. Back to your question, Jeetendra is right that the exception indicates nodata is flowing back to the es-connector and Spark. The default is 1m [1] which should be more than enough for a typical scenario. As a side note the scroll size is 50 per tasks (so 150 suggests 3 tasks). Once the query is made, scrolling the document is fast - likely there's something else at hand that causes the connection to timeout. In such cases, you can enable logging on the REST package and see what type of data transfer occurs between ES and Spark. Do note that if a GC occurs, that can freeze Elastic (or Spark) which might trigger the timeout. Consider monitoring Elasticsearch during the query and see whether anything jumps - in particular the memory pressure. Hope this helps, [1] http://www.elastic.co/guide/en/elasticsearch/hadoop/master/configuration.html#_network On 4/22/15 10:44 PM, Adrian Mocanu wrote: Hi Thanks for the help. My ES is up. Out of curiosity, do you know what the timeout value is? There are probably other things happening to cause the timeout; I don’t think my ES is that slow but it’s possible that ES is taking too long to find the data. What I see happening is that it uses scroll to get the data from ES; about 150 items at a time.Usual delay when I perform the same query from a browser plugin ranges from 1-5sec. Thanks *From:*Jeetendra Gangele [mailto:gangele...@gmail.com] *Sent:* April 22, 2015 3:09 PM *To:* Adrian Mocanu *Cc:* u...@spark.incubator.apache.org *Subject:* Re: ElasticSearch for Spark times out Basically ready timeout means hat no data arrived within the specified receive timeout period. Few thing I would suggest 1.are your ES cluster Up and running? 2. if 1 is yes then reduce the size of the Index make it few kbps and then test? On 23 April 2015 at 00:19, Adrian Mocanu amoc...@verticalscope.com mailto:amoc...@verticalscope.com wrote: Hi I use the ElasticSearch package for Spark and very often it times out reading data from ES into an RDD. How can I keep the connection alive (why doesn’t it? Bug?) 
Here’s the exception I get: org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.net.SocketTimeoutException: Read timed out at org.elasticsearch.hadoop.serialization.json.JacksonJsonParser.nextToken(JacksonJsonParser.java:86) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ParsingUtils.doSeekToken(ParsingUtils.java:70) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ParsingUtils.seek(ParsingUtils.java:58) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:149) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:102) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:81) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:314) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46) ~[elasticsearch-hadoop-2.1.0.Beta3.jar:2.1.0.Beta3] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
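If it does turn out to be slow scans rather than a network problem, the es-hadoop read timeout and scroll size mentioned above can be tuned from the SparkConf; a sketch with placeholder values (see the configuration page linked as [1] above for the full list of settings):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setAppName("es-read")
    .set("es.nodes", "es-host:9200")
    .set("es.http.timeout", "5m")    // default is 1m
    .set("es.scroll.size", "200")    // default is 50 documents per scroll request, per task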
Re: MLlib -Collaborative Filtering
You will have to get the two user factor vectors from the ALS model and compute the cosine similarity between them. You can do this using Breeze vectors: import breeze.linalg._ val user1 = new DenseVector[Double](userFactors.lookup(user1).head) val user2 = new DenseVector[Double](userFactors.lookup(user2).head) val sim = user1.t * user2 / (norm(user1)* norm(user2)) There is no built-in way currently to compute user or item similarities, though there is a PR working on it: https://github.com/apache/spark/pull/3536 On Sun, Apr 19, 2015 at 7:29 PM, Christian S. Perone christian.per...@gmail.com wrote: The easiest way to do that is to use a similarity metric between the different user factors. On Sat, Apr 18, 2015 at 7:49 AM, riginos samarasrigi...@gmail.com wrote: Is there any way that i can see the similarity table of 2 users in that algorithm? by that i mean the similarity between 2 users -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-Collaborative-Filtering-tp22553.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Blog http://blog.christianperone.com | Github https://github.com/perone | Twitter https://twitter.com/tarantulae Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big joke on me.
Re: solr in spark
I haven't used Solr for a long time, and haven't used Solr in Spark. However, why do you say Elasticsearch is not a good option ...? ES absolutely supports full-text search and not just filtering and grouping (in fact its original purpose was and still is text search, though filtering, grouping and aggregation are heavily used). http://www.elastic.co/guide/en/elasticsearch/guide/master/full-text-search.html On Tue, Apr 28, 2015 at 6:27 PM, Jeetendra Gangele gangele...@gmail.com wrote: Has anyone tried using Solr inside Spark? Below is the project describing it: https://github.com/LucidWorks/spark-solr. I have a requirement in which I want to index 20 million company names and then search as and when new data comes in. The output should be a list of companies matching the query. Spark has inbuilt Elasticsearch support, but for this purpose Elasticsearch is not a good option since this is totally a text search problem? Elasticsearch is good for filtering and grouping. Has anybody used Solr inside Spark? Regards jeetendra
Re: solr in spark
Depends on your use case and search volume. Typically you'd have a dedicated ES cluster if your app is doing a lot of real time indexing and search. If it's only for spark integration then you could colocate ES and spark — Sent from Mailbox On Tue, Apr 28, 2015 at 6:41 PM, Jeetendra Gangele gangele...@gmail.com wrote: Thanks for reply. Elastic search index will be within my Cluster? or I need the separate host the elastic search? On 28 April 2015 at 22:03, Nick Pentreath nick.pentre...@gmail.com wrote: I haven't used Solr for a long time, and haven't used Solr in Spark. However, why do you say Elasticsearch is not a good option ...? ES absolutely supports full-text search and not just filtering and grouping (in fact it's original purpose was and still is text search, though filtering, grouping and aggregation are heavily used). http://www.elastic.co/guide/en/elasticsearch/guide/master/full-text-search.html On Tue, Apr 28, 2015 at 6:27 PM, Jeetendra Gangele gangele...@gmail.com wrote: Does anyone tried using solr inside spark? below is the project describing it. https://github.com/LucidWorks/spark-solr. I have a requirement in which I want to index 20 millions companies name and then search as and when new data comes in. the output should be list of companies matching the query. Spark has inbuilt elastic search but for this purpose Elastic search is not a good option since this is totally text search problem? Elastic search is good for filtering and grouping. Does any body used solr inside spark? Regards jeetendra
Re: Content based filtering
Content based filtering is a pretty broad term - do you have any particular approach in mind? MLLib does not have any purely content-based methods. Your main alternative is ALS collaborative filtering. However, using a system like Oryx / PredictionIO / elasticsearch etc you can combine factor-based collaborative filtering with content-based pre- and post-filtering steps (eg filter recommendations by geolocation, price, category and so on). — Sent from Mailbox On Tue, May 12, 2015 at 1:45 PM, Yasemin Kaya godo...@gmail.com wrote: Hi, is Content based filtering available for Spark in Mllib? If it isn't , what can I use as an alternative? Thank you. Have a nice day yasemin -- hiç ender hiç
Re: Passing Elastic Search Mappings in Spark Conf
If you want to specify mapping you must first create the mappings for your index types before indexing. As far as I know there is no way to specify this via ES-hadoop. But it's best practice to explicitly create mappings prior to indexing, or to use index templates when dynamically creating indexes. — Sent from Mailbox On Thu, Apr 16, 2015 at 1:14 AM, Deepak Subhramanian deepak.subhraman...@gmail.com wrote: Hi, Is there a way to pass the mapping to define a field as not analyzed with es-spark settings. I am just wondering if I can set the mapping type for a field as not analyzed using the set function in spark conf as similar to the other es settings. val sconf = new SparkConf() .setMaster(local[1]) .setAppName(Load Data To ES) .set(spark.ui.port, 4141) .set(es.index.auto.create, true) .set(es.net.http.auth.user, es_admin) .set(es.index.auto.create, true) .set(es.mapping.names, CREATED_DATE:@timestamp) Thanks, Deepak Subhramanian - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: When querying ElasticSearch, score is 0
ES-hadoop uses a scan scroll search to efficiently retrieve large result sets. Scores are not tracked in a scan and sorting is not supported hence 0 scores. http://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html#scroll-scan — Sent from Mailbox On Thu, Apr 16, 2015 at 10:46 PM, Andrejs Abele andrejs.ab...@insight-centre.org wrote: Hi, I have data in my ElasticSearch server, when I query it using rest interface, I get results and score for each result, but when I run the same query in spark using ElasticSearch API, I get results and meta data, but the score is shown 0 for each record. My configuration is ... val conf = new SparkConf() .setMaster(local[6]) .setAppName(DBpedia to ElasticSearch) .set(es.index.auto.create, true) .set(es.field.read.empty.as.null,true) .set(es.read.metadata,true) ... val sc = new SparkContext(conf) val test= Map(query-{\n\query\:{\n \fuzzy_like_this\ : {\n \fields\ : [\label\],\n \like_text\ : \102nd Ohio Infantry\ }\n } \n}) val mYRDD = sc.esRDD(dbpedia/docs,test.get(query).get) Sample output: Map(id - http://dbpedia.org/resource/Alert,_Ohio;, label - Alert, Ohio, category - Unincorporated communities in Ohio, abstract - Alert is an unincorporated community in southern Morgan Township, Butler County, Ohio, in the United States. It is located about ten miles southwest of Hamilton on Howards Creek, a tributary of the Great Miami River in section 28 of R1ET3N of the Congress Lands. It is three miles west of Shandon and two miles south of Okeana., _metadata - Map(_index - dbpedia, _type - docs, _id - AUy5aQs7895C6HE5GmG4, _score - 0.0)) As you can see _score is 0. Would appreciate any help, Cheers, Andrejs
Re: MLlib -Collaborative Filtering
What do you mean by similarity table of 2 users? Do you mean the similarity between 2 users? — Sent from Mailbox On Sat, Apr 18, 2015 at 11:09 AM, riginos samarasrigi...@gmail.com wrote: Is there any way that i can see the similarity table of 2 users in that algorithm? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-Collaborative-Filtering-tp22552.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Difference between textFile vs hadoopFile (TextInputFormat) on HDFS data
There is no difference - textFile calls hadoopFile with a TextInputFormat, and maps each value to a String. — Sent from Mailbox On Tue, Apr 7, 2015 at 1:46 PM, Puneet Kumar Ojha puneet.ku...@pubmatic.com wrote: Hi, Is there any difference between textFile vs hadoopFile (TextInputFormat) when data is present in HDFS? Will there be any performance gain that can be observed? Puneet Kumar Ojha Data Architect | PubMatic http://www.pubmatic.com/
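Concretely, sc.textFile(path) is just a thin wrapper, roughly equivalent to the following (so there is no performance difference to be gained); the path is a placeholder:

  import org.apache.hadoop.io.{LongWritable, Text}
  import org.apache.hadoop.mapred.TextInputFormat

  val path = "hdfs:///data/input.txt"
  val lines = sc.hadoopFile(path, classOf[TextInputFormat],
                            classOf[LongWritable], classOf[Text])
                .map { case (_, text) => text.toString }   // keep only the line contents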
Re: Migrating from Spark 0.8.0 to Spark 1.3.0
It shouldn't be too bad - pertinent changes migration notes are here: http://spark.apache.org/docs/1.0.0/programming-guide.html#migrating-from-pre-10-versions-of-spark for pre-1.0 and here: http://spark.apache.org/docs/latest/sql-programming-guide.html#upgrading-from-spark-sql-10-12-to-13 for SparkSQL pre-1.3 Since you aren't using SparkSQL the 2nd link is probably not useful. Generally you should find very few changes in the core API but things like MLlib would have changed a fair bit - though again the API should have been relatively stable. Your biggest change is probably going to be running jobs through spark-submit rather than spark-class etc: http://spark.apache.org/docs/latest/submitting-applications.html — Sent from Mailbox On Sat, Apr 4, 2015 at 1:11 AM, Ritesh Kumar Singh riteshoneinamill...@gmail.com wrote: Hi, Are there any tutorials that explains all the changelogs between Spark 0.8.0 and Spark 1.3.0 and how can we approach this issue.
Re: Spark and Google Cloud Storage
I believe it is available here: https://cloud.google.com/hadoop/google-cloud-storage-connector 2015-06-18 15:31 GMT+02:00 Klaus Schaefers klaus.schaef...@ligatus.com: Hi, is there a kind adapter to use GoogleCloudStorage with Spark? Cheers, Klaus -- -- Klaus Schaefers Senior Optimization Manager Ligatus GmbH Hohenstaufenring 30-32 D-50674 Köln Tel.: +49 (0) 221 / 56939 -784 Fax: +49 (0) 221 / 56 939 - 599 E-Mail: klaus.schaef...@ligatus.com Web: www.ligatus.de HRB Köln 56003 Geschäftsführung: Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann, Dipl.-Wirtschaftsingenieur Arne Wolter
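For reference, hooking the connector up from Spark roughly looks like the sketch below. The Hadoop property names are taken from the connector documentation and can differ between versions, and the project id, key file and bucket are placeholders:
    // Hedged sketch: exact property names depend on the GCS connector version.
    sc.hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    sc.hadoopConfiguration.set("fs.gs.project.id", "my-gcp-project")
    sc.hadoopConfiguration.set("google.cloud.auth.service.account.enable", "true")
    sc.hadoopConfiguration.set("google.cloud.auth.service.account.json.keyfile", "/path/to/key.json")

    val lines = sc.textFile("gs://my-bucket/input/*")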
Re: Spark Titan
Something like this works (or at least worked with titan 0.4 back when I was using it): val graph = sc.newAPIHadoopRDD( configuration, fClass = classOf[TitanHBaseInputFormat], kClass = classOf[NullWritable], vClass = classOf[FaunusVertex]) graph.flatMap { vertex => val edges = vertex.getEdges(Direction.OUT).filter(e => e.getLabel == ...) edges.map { edge => (...) } } Note that FaunusVertex is not Serializable so you'll need to extract the properties (or say a JSON representation) of your vertices in the first map or flatMap operation (or extract your edges and properties). On Sun, Jun 21, 2015 at 6:57 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Have a look at http://s3.thinkaurelius.com/docs/titan/0.5.0/titan-io-format.html You could use those Input/Output formats with newAPIHadoopRDD api call. Thanks Best Regards On Sun, Jun 21, 2015 at 8:50 PM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi, How to connect TItan database from Spark? Any out of the box api's available? Regards, Rajesh
Re: Velox Model Server
Ok My view is with only 100k items, you are better off serving in-memory for items vectors. i.e. store all item vectors in memory, and compute user * item score on-demand. In most applications only a small proportion of users are active, so really you don't need all 10m user vectors in memory. They could be looked up from a K-V store and have an LRU cache in memory for say 1m of those. Optionally also update them as feedback comes in. As far as I can see, this is pretty much what velox does except it partitions all user vectors across nodes to scale. Oryx does almost the same but Oryx1 kept all user and item vectors in memory (though I am not sure about whether Oryx2 still stores all user and item vectors in memory or partitions in some way). Deb, we are using a custom Akka-based model server (with Scalatra frontend). It is more focused on many small models in-memory (largest of these is around 5m user vectors, 100k item vectors, with factor size 20-50). We use Akka cluster sharding to allow scale-out across nodes if required. We have a few hundred models comfortably powered by m3.xlarge AWS instances. Using floats you could probably have all of your factors in memory on one 64GB machine (depending on how many models you have). Our solution is not that generic and a little hacked-together - but I'd be happy to chat offline about sharing what we've done. I think it still has a basic client to the Spark JobServer which would allow triggering re-computation jobs periodically. We currently just run batch re-computation and reload factors from S3 periodically. We then use Elasticsearch to post-filter results and blend content-based stuff - which I think might be more efficient than SparkSQL for this particular purpose. On Wed, Jun 24, 2015 at 8:59 AM, Debasish Das debasish.da...@gmail.com wrote: Model sizes are 10m x rank, 100k x rank range. For recommendation/topic modeling I can run batch recommendAll and then keep serving the model using a distributed cache but then I can't incorporate per user model re-predict if user feedback is making the current topk stale. I have to wait for next batch refresh which might be 1 hr away. spark job server + spark sql can get me fresh updates but each time running a predict might be slow. I am guessing the better idea might be to start with batch recommendAll and then update the per user model if it get stale but that needs acess to the key value store and the model over a API like spark job server. I am running experiments with job server. In general it will be nice if my key value store and model are both managed by same akka based API. Yes sparksql is to filter/boost recommendation results using business logic like user demography for example.. On Jun 23, 2015 2:07 AM, Sean Owen so...@cloudera.com wrote: Yes, and typically needs are 100ms. Now imagine even 10 concurrent requests. My experience has been that this approach won't nearly scale. The best you could probably do is async mini-batch near-real-time scoring, pushing results to some store for retrieval, which could be entirely suitable for your use case. On Tue, Jun 23, 2015 at 8:52 AM, Nick Pentreath nick.pentre...@gmail.com wrote: If your recommendation needs are real-time (1s) I am not sure job server and computing the refs with spark will do the trick (though those new BLAS-based methods may have given sufficient speed up).
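As a minimal sketch of the "item vectors in memory, score on demand" idea (plain Scala, no particular serving framework; the K-V / LRU lookup that produces the user vector is elided, and float factors keep the memory footprint down):
    case class Scored(itemId: Int, score: Float)

    // All item factors held in memory; userVec would come from a K-V store / LRU cache.
    def topK(userVec: Array[Float], itemVecs: Map[Int, Array[Float]], k: Int): Seq[Scored] = {
      def dot(a: Array[Float], b: Array[Float]): Float = {
        var s = 0.0f; var i = 0
        while (i < a.length) { s += a(i) * b(i); i += 1 }
        s
      }
      itemVecs.toSeq
        .map { case (id, vec) => Scored(id, dot(userVec, vec)) }
        .sortBy(-_.score)
        .take(k)
    }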
Re: Matrix Multiplication and mllib.recommendation
Yup, numpy calls into BLAS for matrix multiply. Sent from my iPad On 18 Jun 2015, at 8:54 PM, Ayman Farahat ayman.fara...@yahoo.com wrote: Thanks all for the help. It turned out that using the bumpy matrix multiplication made a huge difference in performance. I suspect that Numpy already uses BLAS optimized code. Here is Python code #This is where i load and directly test the predictions myModel = MatrixFactorizationModel.load(sc, FlurryModelPath) m1 = myModel.productFeatures().sample(False, 1.00) m2 = m1.map(lambda (user,feature) : feature).collect() m3 = matrix(m2).transpose() pf = sc.broadcast(m3) uf = myModel.userFeatures() f1 = uf.map(lambda (userID, features): (userID, squeeze(asarray(matrix(array(features)) * pf.value dog = f1.count() On Jun 18, 2015, at 8:42 AM, Debasish Das debasish.da...@gmail.com wrote: Also in my experiments, it's much faster to blocked BLAS through cartesian rather than doing sc.union. Here are the details on the experiments: https://issues.apache.org/jira/browse/SPARK-4823 On Thu, Jun 18, 2015 at 8:40 AM, Debasish Das debasish.da...@gmail.com wrote: Also not sure how threading helps here because Spark puts a partition to each core. On each core may be there are multiple threads if you are using intel hyperthreading but I will let Spark handle the threading. On Thu, Jun 18, 2015 at 8:38 AM, Debasish Das debasish.da...@gmail.com wrote: We added SPARK-3066 for this. In 1.4 you should get the code to do BLAS dgemm based calculation. On Thu, Jun 18, 2015 at 8:20 AM, Ayman Farahat ayman.fara...@yahoo.com.invalid wrote: Thanks Sabarish and Nick Would you happen to have some code snippets that you can share. Best Ayman On Jun 17, 2015, at 10:35 PM, Sabarish Sasidharan sabarish.sasidha...@manthan.com wrote: Nick is right. I too have implemented this way and it works just fine. In my case, there can be even more products. You simply broadcast blocks of products to userFeatures.mapPartitions() and BLAS multiply in there to get recommendations. In my case 10K products form one block. Note that you would then have to union your recommendations. And if there lots of product blocks, you might also want to checkpoint once every few times. Regards Sab On Thu, Jun 18, 2015 at 10:43 AM, Nick Pentreath nick.pentre...@gmail.com wrote: One issue is that you broadcast the product vectors and then do a dot product one-by-one with the user vector. You should try forming a matrix of the item vectors and doing the dot product as a matrix-vector multiply which will make things a lot faster. Another optimisation that is avalailable on 1.4 is a recommendProducts method that blockifies the factors to make use of level 3 BLAS (ie matrix-matrix multiply). I am not sure if this is available in The Python api yet. But you can do a version yourself by using mapPartitions over user factors, blocking the factors into sub-matrices and doing matrix multiply with item factor matrix to get scores on a block-by-block basis. Also as Ilya says more parallelism can help. I don't think it's so necessary to do LSH with 30,000 items. — Sent from Mailbox On Thu, Jun 18, 2015 at 6:01 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: Actually talk about this exact thing in a blog post here http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/. Keep in mind, you're actually doing a ton of math. Even with proper caching and use of broadcast variables this will take a while defending on the size of your cluster. 
To get real results you may want to look into locality sensitive hashing to limit your search space and definitely look into spinning up multiple threads to process your product features in parallel to increase resource utilization on the cluster. Thank you, Ilya Ganelin -Original Message- From: afarahat [ayman.fara...@yahoo.com] Sent: Wednesday, June 17, 2015 11:16 PM Eastern Standard Time To: user@spark.apache.org Subject: Matrix Multiplication and mllib.recommendation Hello; I am trying to get predictions after running the ALS model. The model works fine. In the prediction/recommendation , I have about 30 ,000 products and 90 Millions users. When i try the predict all it fails. I have been trying to formulate the problem as a Matrix multiplication where I first get the product features, broadcast them and then do a dot product. Its still very slow. Any reason why here is a sample code def doMultiply(x): a = [] #multiply by mylen = len(pf.value) for i in range(mylen) : myprod = numpy.dot(x,pf.value[i][1]) a.append(myprod) return a myModel = MatrixFactorizationModel.load(sc, FlurryModelPath) #I need to select which products to broadcast but lets try all m1
Re: Velox Model Server
How large are your models? Spark job server does allow synchronous job execution and with a warm long-lived context it will be quite fast - but still in the order of a second or a few seconds usually (depending on model size - for very large models possibly quite a lot more than that). What are your use cases for SQL during recommendation? Filtering? If your recommendation needs are real-time (1s) I am not sure job server and computing the refs with spark will do the trick (though those new BLAS-based methods may have given sufficient speed up). — Sent from Mailbox On Mon, Jun 22, 2015 at 11:17 PM, Debasish Das debasish.da...@gmail.com wrote: Models that I am looking for are mostly factorization based models (which includes both recommendation and topic modeling use-cases). For recommendation models, I need a combination of Spark SQL and ml model prediction api...I think spark job server is what I am looking for and it has fast http rest backend through spray which will scale fine through akka. Out of curiosity why netty? What model are you serving? Velox doesn't look like it is optimized for cases like ALS recs, if that's what you mean. I think scoring ALS at scale in real time takes a fairly different approach. The servlet engine probably doesn't matter at all in comparison. On Sat, Jun 20, 2015, 9:40 PM Debasish Das debasish.da...@gmail.com wrote: After getting used to Scala, writing Java is too much work :-) I am looking for scala based project that's using netty at its core (spray is one example). prediction.io is an option but that also looks quite complicated and not using all the ML features that got added in 1.3/1.4 Velox built on top of ML / Keystone ML pipeline API and that's useful but it is still using javax servlets which is not netty based. On Sat, Jun 20, 2015 at 10:25 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Oops, that link was for Oryx 1. Here's the repo for Oryx 2: https://github.com/OryxProject/oryx On Sat, Jun 20, 2015 at 10:20 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Debasish, The Oryx project (https://github.com/cloudera/oryx), which is Apache 2 licensed, contains a model server that can serve models built with MLlib. -Sandy On Sat, Jun 20, 2015 at 8:00 AM, Charles Earl charles.ce...@gmail.com wrote: Is velox NOT open source? On Saturday, June 20, 2015, Debasish Das debasish.da...@gmail.com wrote: Hi, The demo of end-to-end ML pipeline including the model server component at Spark Summit was really cool. I was wondering if the Model Server component is based upon Velox or it uses a completely different architecture. https://github.com/amplab/velox-modelserver We are looking for an open source version of model server to build upon. Thanks. Deb -- - Charles
Re: Velox Model Server
Is there a presentation up about this end-to-end example? I'm looking into velox now - our internal model pipeline just saves factors to S3 and model server loads them periodically from S3 — Sent from Mailbox On Sat, Jun 20, 2015 at 9:46 PM, Debasish Das debasish.da...@gmail.com wrote: Integration of model server with ML pipeline API. On Sat, Jun 20, 2015 at 12:25 PM, Donald Szeto don...@prediction.io wrote: Mind if I ask what 1.3/1.4 ML features that you are looking for? On Saturday, June 20, 2015, Debasish Das debasish.da...@gmail.com wrote: After getting used to Scala, writing Java is too much work :-) I am looking for scala based project that's using netty at its core (spray is one example). prediction.io is an option but that also looks quite complicated and not using all the ML features that got added in 1.3/1.4 Velox built on top of ML / Keystone ML pipeline API and that's useful but it is still using javax servlets which is not netty based. On Sat, Jun 20, 2015 at 10:25 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Oops, that link was for Oryx 1. Here's the repo for Oryx 2: https://github.com/OryxProject/oryx On Sat, Jun 20, 2015 at 10:20 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Debasish, The Oryx project (https://github.com/cloudera/oryx), which is Apache 2 licensed, contains a model server that can serve models built with MLlib. -Sandy On Sat, Jun 20, 2015 at 8:00 AM, Charles Earl charles.ce...@gmail.com wrote: Is velox NOT open source? On Saturday, June 20, 2015, Debasish Das debasish.da...@gmail.com wrote: Hi, The demo of end-to-end ML pipeline including the model server component at Spark Summit was really cool. I was wondering if the Model Server component is based upon Velox or it uses a completely different architecture. https://github.com/amplab/velox-modelserver We are looking for an open source version of model server to build upon. Thanks. Deb -- - Charles -- Donald Szeto PredictionIO
RE: Matrix Multiplication and mllib.recommendation
One issue is that you broadcast the product vectors and then do a dot product one-by-one with the user vector. You should try forming a matrix of the item vectors and doing the dot product as a matrix-vector multiply which will make things a lot faster. Another optimisation that is avalailable on 1.4 is a recommendProducts method that blockifies the factors to make use of level 3 BLAS (ie matrix-matrix multiply). I am not sure if this is available in The Python api yet. But you can do a version yourself by using mapPartitions over user factors, blocking the factors into sub-matrices and doing matrix multiply with item factor matrix to get scores on a block-by-block basis. Also as Ilya says more parallelism can help. I don't think it's so necessary to do LSH with 30,000 items. — Sent from Mailbox On Thu, Jun 18, 2015 at 6:01 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: Actually talk about this exact thing in a blog post here http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/. Keep in mind, you're actually doing a ton of math. Even with proper caching and use of broadcast variables this will take a while defending on the size of your cluster. To get real results you may want to look into locality sensitive hashing to limit your search space and definitely look into spinning up multiple threads to process your product features in parallel to increase resource utilization on the cluster. Thank you, Ilya Ganelin -Original Message- From: afarahat [ayman.fara...@yahoo.commailto:ayman.fara...@yahoo.com] Sent: Wednesday, June 17, 2015 11:16 PM Eastern Standard Time To: user@spark.apache.org Subject: Matrix Multiplication and mllib.recommendation Hello; I am trying to get predictions after running the ALS model. The model works fine. In the prediction/recommendation , I have about 30 ,000 products and 90 Millions users. When i try the predict all it fails. I have been trying to formulate the problem as a Matrix multiplication where I first get the product features, broadcast them and then do a dot product. Its still very slow. Any reason why here is a sample code def doMultiply(x): a = [] #multiply by mylen = len(pf.value) for i in range(mylen) : myprod = numpy.dot(x,pf.value[i][1]) a.append(myprod) return a myModel = MatrixFactorizationModel.load(sc, FlurryModelPath) #I need to select which products to broadcast but lets try all m1 = myModel.productFeatures().sample(False, 0.001) pf = sc.broadcast(m1.collect()) uf = myModel.userFeatures() f1 = uf.map(lambda x : (x[0], doMultiply(x[1]))) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Matrix-Multiplication-and-mllib-recommendation-tp23384.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. 
If you have received this communication in error, please contact the sender and delete the material from your computer.
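For what it's worth, a rough sketch of the blockified mapPartitions approach described above, using Breeze on the Scala side. Here `model` is assumed to be a trained MatrixFactorizationModel, and the block size and top-k are illustrative:
    import breeze.linalg.DenseMatrix

    // Broadcast the item-factor matrix once, then score blocks of user factors
    // against it with one matrix-matrix multiply (level-3 BLAS) per block.
    val itemFactors = model.productFeatures.collect()            // Array[(itemId, factors)]
    val itemIds = itemFactors.map(_._1)
    val itemMat = DenseMatrix(itemFactors.map(_._2): _*)         // numItems x rank
    val bcItems = sc.broadcast((itemIds, itemMat))
    val k = 10

    val recs = model.userFeatures.mapPartitions { users =>
      val (ids, items) = bcItems.value
      users.grouped(1024).flatMap { block =>
        val userMat = DenseMatrix(block.map(_._2): _*)           // blockSize x rank
        val scores = userMat * items.t                           // blockSize x numItems, one gemm call
        block.iterator.zipWithIndex.map { case ((userId, _), i) =>
          val row = scores(i, ::).t.toArray
          userId -> row.zipWithIndex.sortBy(-_._1).take(k).map { case (s, j) => (ids(j), s) }
        }
      }
    }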
Re: ALS predictALL not completing
So to be clear, you're trying to use the recommendProducts method of MatrixFactorizationModel? I don't see predictAll in 1.3.1 1.4.0 has a more efficient method to recommend products for all users (or vice versa): https://github.com/apache/spark/blob/v1.4.0/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala#L152 On Tue, Jun 16, 2015 at 4:30 PM, Ayman Farahat ayman.fara...@yahoo.com wrote: This is 1.3.1 Ayman Farahat -- View my research on my SSRN Author page: http://ssrn.com/author=1594571 -- *From:* Nick Pentreath nick.pentre...@gmail.com *To:* user@spark.apache.org user@spark.apache.org *Sent:* Tuesday, June 16, 2015 4:23 AM *Subject:* Re: ALS predictALL not completing Which version of Spark are you using? On Tue, Jun 16, 2015 at 6:20 AM, afarahat ayman.fara...@yahoo.com wrote: Hello; I have a data set of about 80 Million users and 12,000 items (very sparse ). I can get the training part working no problem. (model has 20 factors), However, when i try using Predict all for 80 Million x 10 items , the jib does not complete. When i use a smaller data set say 500k or a million it completes. Any ideas suggestions ? Thanks Ayman -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ALS-predictALL-not-completing-tp23327.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
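For reference, a short sketch against the 1.4 API linked above (the model path is illustrative):
    import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}

    val model = MatrixFactorizationModel.load(sc, "hdfs:///models/als")   // illustrative path
    // Single user, on demand:
    val topTenForUser42: Array[Rating] = model.recommendProducts(42, 10)
    // All users in one batch job (added in 1.4): RDD[(userId, Array[Rating])]
    val topTenForAll = model.recommendProductsForUsers(10)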
Re: ALS predictALL not completing
Which version of Spark are you using? On Tue, Jun 16, 2015 at 6:20 AM, afarahat ayman.fara...@yahoo.com wrote: Hello; I have a data set of about 80 Million users and 12,000 items (very sparse ). I can get the training part working no problem. (model has 20 factors), However, when i try using Predict all for 80 Million x 10 items , the jib does not complete. When i use a smaller data set say 500k or a million it completes. Any ideas suggestions ? Thanks Ayman -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ALS-predictALL-not-completing-tp23327.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark job workflow engine recommendations
I also tend to agree that Azkaban is somehqat easier to get set up. Though I haven't used the new UI for Oozie that is part of CDH, so perhaps that is another good option. It's a pity Azkaban is a little rough in terms of documenting its API, and the scalability is an issue. However it would be possible to have a few different instances running for different use cases / groups within the org perhaps — Sent from Mailbox On Wed, Aug 12, 2015 at 12:14 AM, Vikram Kone vikramk...@gmail.com wrote: Hi LarsThanks for the brain dump. All the points you made about target audience, degree of high availability and time based scheduling instead of event based scheduling are all valid and make sense.In our case, most of your Devs are .net based and so xml or web based scheduling is preferred over something written in Java/Scalia/Python. Based on my research so far on the available workflow managers today, azkaban is the most easier to adopt since it doesn't have any hard dependence on Hadoop and is easy to onboard and schedule jobs. I was able to install and execute some spark workflows in a day. Though the fact that it's being phased out in linkedin is troubling , I think it's the best suited for our use case today. Sent from Outlook On Sun, Aug 9, 2015 at 4:51 PM -0700, Lars Albertsson lars.alberts...@gmail.com wrote: I used to maintain Luigi at Spotify, and got some insight in workflow manager characteristics and production behaviour in the process. I am evaluating options for my current employer, and the short list is basically: Luigi, Azkaban, Pinball, Airflow, and rolling our own. The latter is not necessarily more work than adapting an existing tool, since existing managers are typically more or less tied to the technology used by the company that created them. Are your users primarily developers building pipelines that drive data-intensive products, or are they analysts, producing business intelligence? These groups tend to have preferences for different types of tools and interfaces. I have a love/hate relationship with Luigi, but given your requirements, it is probably the best fit: * It has support for Spark, and it seems to be used and maintained. * It has no builtin support for Cassandra, but Cassandra is heavily used at Spotify. IIRC, the code required to support Cassandra targets is more or less trivial. There is no obvious single definition of a dataset in C*, so you'll have to come up with a convention and encode it as a Target subclass. I guess that is why it never made it outside Spotify. * The open source community is active and it is well tested in production at multiple sites. * It is easy to write dependencies, but in a Python DSL. If your users are developers, this is preferable over XML or a web interface. There are always quirks and odd constraints somewhere that require the expressive power of a programming language. It also allows you to create extensions without changing Luigi itself. * It does not have recurring scheduling bulitin. Luigi needs a motor to get going, typically cron, installed on a few machines for redundancy. In a typical pipeline scenario, you give output datasets a time parameter, which arranges for a dataset to be produced each hour/day/week/month. * It supports failure notifications. Pinball and Airflow have similar architecture to Luigi, with a single central scheduler and workers that submit and execute jobs. 
They seem to be more solidly engineered at a glance, but less battle tested outside Pinterest/Airbnb, and they have fewer integrations to the data ecosystem. Azkaban has a different architecture and user interface, and seems more geared towards data scientists than developers; it has a good UI for controlling jobs, but writing extensions and controlling it programmatically seems more difficult than for Luigi. All of the tools above are centralised, and the central component can become a bottleneck and a single point of problem. I am not aware of any decentralised open source workflow managers, but you can run multiple instances and shard manually. Regarding recurring jobs, it is typically undesirable to blindly run jobs at a certain time. If you run jobs, e.g. with cron, and process whatever data is available in your input sources, your jobs become indeterministic and unreliable. If incoming data is late or missing, your jobs will fail or create artificial skews in output data, leading to confusing results. Moreover, if jobs fail or have bugs, it will be difficult to rerun them and get predictable results. This is why I don't think Chronos is a meaningful alternative for scheduling data processing. There are different strategies on this topic, but IMHO, it is easiest create predictable and reliable pipelines by bucketing incoming data into datasets that you seal off, and mark ready for processing, and then use the workflow manager's DAG logic to process data when input
Re: Is there any tool that i can prove to customer that spark is faster then hive ?
Perhaps you could time the end-to-end runtime for each pipeline, and each stage? Though I'd be fairly confident that Spark will outperform Hive/Mahout on MR, that's not the only consideration - having everything on a single platform and the Spark / DataFrame API is a huge win just by itself — Sent from Mailbox On Wed, Aug 12, 2015 at 1:45 PM, Ladle ladle.pa...@tcs.com wrote: Hi , I have build the the machine learning features and model using Apache spark. And the same features i have i build using hive,java and used mahout to run model. Now how can i show to customer that Apache Spark is more faster then hive. Is there any tool that shows the time ? Regards, Ladle -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-any-tool-that-i-can-prove-to-customer-that-spark-is-faster-then-hive-tp24224.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
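There's no dedicated tool needed - even a crude timing wrapper like the sketch below, applied around the action that materialises each stage of both pipelines, gives comparable wall-clock numbers. The feature RDD here is a stand-in for the real pipeline:
    def timed[T](label: String)(body: => T): T = {
      val start = System.nanoTime()
      val result = body
      println(f"$label took ${(System.nanoTime() - start) / 1e9}%.1f s")
      result
    }

    val featureRDD = sc.parallelize(1 to 1000000).map(_ * 2)   // stand-in for the real feature pipeline
    val n = timed("feature build (Spark)") { featureRDD.count() }  // count() forces the job to run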
Re: RDD[Future[T]] = Future[RDD[T]]
In this case, each partition will block until the futures in that partition are completed. If you are in the end collecting all the Futures to the driver, what is the reasoning behind using an RDD? You could just use a bunch of Futures directly. If you want to do some processing on the results of the futures, then I'd say you would need to block in each partition until the Futures' results are completed, as I'm not at all sure whether Futures would be composable across stage / task boundaries. On Mon, Jul 27, 2015 at 9:33 AM, Ayoub benali.ayoub.i...@gmail.com wrote: do you mean something like this ? val values = rdd.mapPartitions{ i: Iterator[Future[T]] = val future: Future[Iterator[T]] = Future sequence i Await result (future, someTimeout) } Where is the blocking happening in this case? It seems to me that all the workers will be blocked until the future is completed, no ? 2015-07-27 7:24 GMT+02:00 Nick Pentreath [hidden email] http:///user/SendEmail.jtp?type=nodenode=24005i=0: You could use Iterator.single on the future[iterator]. However if you collect all the partitions I'm not sure if it will work across executor boundaries. Perhaps you may need to await the sequence of futures in each partition and return the resulting iterator. — Sent from Mailbox https://www.dropbox.com/mailbox On Sun, Jul 26, 2015 at 10:43 PM, Ayoub Benali [hidden email] http:///user/SendEmail.jtp?type=nodenode=24005i=1 wrote: It doesn't work because mapPartitions expects a function f:(Iterator[T]) ⇒ Iterator[U] while .sequence wraps the iterator in a Future 2015-07-26 22:25 GMT+02:00 Ignacio Blasco [hidden email] http:///user/SendEmail.jtp?type=nodenode=24005i=2: Maybe using mapPartitions and .sequence inside it? El 26/7/2015 10:22 p. m., Ayoub [hidden email] http:///user/SendEmail.jtp?type=nodenode=24005i=3 escribió: Hello, I am trying to convert the result I get after doing some async IO : val rdd: RDD[T] = // some rdd val result: RDD[Future[T]] = rdd.map(httpCall) Is there a way collect all futures once they are completed in a *non blocking* (i.e. without scala.concurrent Await) and lazy way? If the RDD was a standard scala collection then calling scala.concurrent.Future.sequence would have resolved the issue but RDD is not a TraversableOnce (which is required by the method). Is there a way to do this kind of transformation with an RDD[Future[T]] ? Thanks, Ayoub. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-Future-T-Future-RDD-T-tp24000.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: [hidden email] http:///user/SendEmail.jtp?type=nodenode=24005i=4 For additional commands, e-mail: [hidden email] http:///user/SendEmail.jtp?type=nodenode=24005i=5 -- View this message in context: Re: RDD[Future[T]] = Future[RDD[T]] http://apache-spark-user-list.1001560.n3.nabble.com/Re-RDD-Future-T-Future-RDD-T-tp24005.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
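A sketch of that per-partition blocking variant. The async call and timeout are placeholders standing in for the httpCall from the original question; each task blocks only on its own partition's futures, so no Future crosses a task boundary:
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    def httpCall(x: Int): Future[Int] = Future { x * 2 }   // stand-in for the real async call
    val rdd = sc.parallelize(1 to 100)

    val results = rdd.mapPartitions { iter =>
      val futures = iter.map(httpCall).toVector            // kick off this partition's async calls
      val all = Future.sequence(futures)                   // Future[Vector[Int]]
      Await.result(all, 10.minutes).iterator               // block inside the task, return an Iterator
    }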
Re: Velox Model Server
Honestly I don't believe this kind of functionality belongs within spark-jobserver. For serving of factor-type models, you are typically in the realm of recommendations or ad-serving scenarios - i.e. needing to score a user / context against many possible items and return a top-k list of those. In addition, filtering and search comes into play heavily - e.g. filter recommendations by item category, by geo-location, by stock-level status, by price / profit levels, by promoted / blocked content, etc etc. And the requirements are typically real-time (i.e. a few hundred ms at the most). So I think there are too many specialist requirements vs spark-jobserver. In terms of general approach, your options are to: (a) score first to get a list recs, and then filter / re-rank / apply queries to further winnow that down. This typically means returning the top L K recs from the scoring, so that you have enough left after filtering. (b) score and filter in the same step (or at least using the same engine). Scoring is really the easiest part - modulo dealing with massive item-sets which can be dealt with by (i) LSH / approx. nearest neighbour approaches and/or (ii) partitioning / parallelization approaches. We currently use approach (a) but are looking into (b) also. I think the best idea is to pick one of the existing frameworks (whether Oryx, Velox, PredictionIO, SeldonIO etc) that best suits your requirements, and build around that. Or build something new of your own if you want to use Akka. I went with Scalatra because it is what our API layer is built with, and I find the routing DSL much nicer vs Spray. Both have good Akka integration. I don't think the front-end matters that much (Finatra is another option). If you want Akka then maybe Spray / Akka HTTP is the best way to go. Our current model server as I mentioned is very basic and a bit hacked together, but it may have some useful ideas or serve as a starting point if there is interest. On Wed, Jun 24, 2015 at 5:46 PM, Debasish Das debasish.da...@gmail.com wrote: Thanks Nick, Sean for the great suggestions... Since you guys have already hit these issues before I think it will be great if we can add the learning to Spark Job Server and enhance it for community. Nick, do you see any major issues in using Spray over Scalatra ? Looks like Model Server API layer needs access to a performant KV store (Redis/Memcached), Elastisearch (we used Solr before for item-item serving but I liked the Spark-Elastisearch integration, REST is Netty based unlike Solr's Jetty and YARN client looks more stable and so it is worthwhile to see if it improves over Solr based serving) and ML Models (which are moving towards Spark SQL style in 1.3/1.4 with the introduction of Pipeline API) An initial version of KV store might be simple LRU cache. For KV store are there any comparisons available with IndexedRDD and Redis/Memcached ? Velox is using CoreOS EtcdClient (which is Go based) but I am not sure if it is used as a full fledged distributed cache or not. May be it is being used as zookeeper alternative. On Wed, Jun 24, 2015 at 2:02 AM, Nick Pentreath nick.pentre...@gmail.com wrote: Ok My view is with only 100k items, you are better off serving in-memory for items vectors. i.e. store all item vectors in memory, and compute user * item score on-demand. In most applications only a small proportion of users are active, so really you don't need all 10m user vectors in memory. They could be looked up from a K-V store and have an LRU cache in memory for say 1m of those. 
Optionally also update them as feedback comes in. As far as I can see, this is pretty much what velox does except it partitions all user vectors across nodes to scale. Oryx does almost the same but Oryx1 kept all user and item vectors in memory (though I am not sure about whether Oryx2 still stores all user and item vectors in memory or partitions in some way). Deb, we are using a custom Akka-based model server (with Scalatra frontend). It is more focused on many small models in-memory (largest of these is around 5m user vectors, 100k item vectors, with factor size 20-50). We use Akka cluster sharding to allow scale-out across nodes if required. We have a few hundred models comfortably powered by m3.xlarge AWS instances. Using floats you could probably have all of your factors in memory on one 64GB machine (depending on how many models you have). Our solution is not that generic and a little hacked-together - but I'd be happy to chat offline about sharing what we've done. I think it still has a basic client to the Spark JobServer which would allow triggering re-computation jobs periodically. We currently just run batch re-computation and reload factors from S3 periodically. We then use Elasticsearch to post-filter results and blend content-based stuff - which I think might be more efficient than SparkSQL for this particular purpose. On Wed, Jun 24
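As a toy sketch of approach (a) above - over-fetch from the scorer, then post-filter down to the final k. The over-fetch factor and the allowed predicate (category, stock, geo, etc.) are placeholders, and `model` is assumed to be an ALS-style MatrixFactorizationModel:
    import org.apache.spark.mllib.recommendation.Rating

    def recommendFiltered(userId: Int, k: Int, allowed: Int => Boolean): Seq[Rating] = {
      val overFetch = k * 5                                // return "top L > k" so enough survives filtering
      model.recommendProducts(userId, overFetch)
        .filter(r => allowed(r.product))
        .take(k)
        .toSeq
    }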
Re: thought experiment: use spark ML to real time prediction
Yup, currently PMML export, or Java serialization, are the options realistically available. Though PMML may deter some, there are not many viable cross-platform alternatives (with nearly as much coverage). On Thu, Nov 12, 2015 at 1:42 PM, Sean Owenwrote: > This is all starting to sound a lot like what's already implemented in > Java-based PMML parsing/scoring libraries like JPMML and OpenScoring. I'm > not clear it helps a lot to reimplement this in Spark. > > On Thu, Nov 12, 2015 at 8:05 AM, Felix Cheung > wrote: > >> +1 on that. It would be useful to use the model outside of Spark. >> >> >> _ >> From: DB Tsai >> Sent: Wednesday, November 11, 2015 11:57 PM >> Subject: Re: thought experiment: use spark ML to real time prediction >> To: Nirmal Fernando >> Cc: Andy Davidson , Adrian Tanase < >> atan...@adobe.com>, user @spark >> >> >> >> Do you think it will be useful to separate those models and model >> loader/writer code into another spark-ml-common jar without any spark >> platform dependencies so users can load the models trained by Spark ML in >> their application and run the prediction? >> >> >> Sincerely, >> >> DB Tsai >> -- >> Web: https://www.dbtsai.com >> PGP Key ID: 0xAF08DF8D >> >> On Wed, Nov 11, 2015 at 3:14 AM, Nirmal Fernando >> wrote: >> >>> As of now, we are basically serializing the ML model and then >>> deserialize it for prediction at real time. >>> >>> On Wed, Nov 11, 2015 at 4:39 PM, Adrian Tanase >>> wrote: >>> I don’t think this answers your question but here’s how you would evaluate the model in realtime in a streaming app https://databricks.gitbooks.io/databricks-spark-reference-applications/content/twitter_classifier/predict.html Maybe you can find a way to extract portions of MLLib and run them outside of spark – loading the precomputed model and calling .predict on it… -adrian From: Andy Davidson Date: Tuesday, November 10, 2015 at 11:31 PM To: "user @spark" Subject: thought experiment: use spark ML to real time prediction Lets say I have use spark ML to train a linear model. I know I can save and load the model to disk. I am not sure how I can use the model in a real time environment. For example I do not think I can return a “prediction” to the client using spark streaming easily. Also for some applications the extra latency created by the batch process might not be acceptable. If I was not using spark I would re-implement the model I trained in my batch environment in a lang like Java and implement a rest service that uses the model to create a prediction and return the prediction to the client. Many models make predictions using linear algebra. Implementing predictions is relatively easy if you have a good vectorized LA package. Is there a way to use a model I trained using spark ML outside of spark? As a motivating example, even if its possible to return data to the client using spark streaming. I think the mini batch latency would not be acceptable for a high frequency stock trading system. Kind regards Andy P.s. The examples I have seen so far use spark streaming to “preprocess” predictions. For example a recommender system might use what current users are watching to calculate “trending recommendations”. These are stored on disk and served up to users when the use the “movie guide”. If a recommendation was a couple of min. old it would not effect the end users experience. >>> >>> >>> -- >>> >>> Thanks & regards, >>> Nirmal >>> >>> Team Lead - WSO2 Machine Learner >>> Associate Technical Lead - Data Technologies Team, WSO2 Inc. 
>>> Mobile: +94715779733 >>> Blog: http://nirmalfdo.blogspot.com/ >>> >>> >>> >> >> >> >
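To illustrate the two options (hedged: toPMML only exists for models that mix in PMMLExportable, such as k-means and the linear models, and the output paths are made up):
    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.linalg.Vectors

    val points = sc.parallelize(Seq(Vectors.dense(0.0, 0.0), Vectors.dense(9.0, 9.0)))
    val clusters = KMeans.train(points, 2, 10)

    // Option 1: PMML export, consumable by JPMML / OpenScoring etc.
    clusters.toPMML("/tmp/kmeans.pmml")

    // Option 2: plain Java serialization, deserialized later inside the serving app.
    val out = new java.io.ObjectOutputStream(new java.io.FileOutputStream("/tmp/kmeans.bin"))
    out.writeObject(clusters)
    out.close()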
Re: DynamoDB Connector?
See this thread for some info: http://apache-spark-user-list.1001560.n3.nabble.com/DynamoDB-input-source-td8814.html I don't think the situation has changed that much - if you're using Spark on EMR, then I think the InputFormat is available in a JAR (though I haven't tested that). Otherwise you'll need to try to get the JAR and see if you can get it to work outside of EMR. I'm afraid this thread ( https://forums.aws.amazon.com/thread.jspa?threadID=168506) does not appear encouraging, even for using Spark on EMR to read from DynamoDB using the InputFormat! It's a pity AWS doesn't open source the InputFormat. On Mon, Nov 16, 2015 at 5:00 AM, Charles Cobbwrote: > Hi, > > What is the best practice for reading from DynamoDB from Spark? I know I > can use the Java API, but this doesn't seem to take data locality into > consideration at all. > > I was looking for something along the lines of the cassandra connector: > https://github.com/datastax/spark-cassandra-connector > > Thanks, > CJ > >
Re: Machine learning with spark (book code example error)
Hi there. I'm the author of the book (thanks for buying it by the way :) Ideally if you're having any trouble with the book or code, it's best to contact the publisher and submit a query ( https://www.packtpub.com/books/content/support/17400) However, I can help with this issue. The problem is that the "testLabels" code needs to be indented over multiple lines: val testPath = "/PATH/20news-bydate-test/*" val testRDD = sc.wholeTextFiles(testPath) val testLabels = testRDD.map { case (file, text) => val topic = file.split("/").takeRight(2).head newsgroupsMap(topic) } As it is in the sample code attached. If you copy the whole indented block (or line by line) into the console, it should work - I've tested all the sample code again and indeed it works for me. Hope this helps Nick On Tue, Oct 13, 2015 at 8:31 PM, Zsombor Egyedwrote: > Hi! > > I was reading the ML with spark book, and I was very interested about the > 9. chapter (text mining), so I tried code examples. > > Everything was fine, but in this line: > > val testLabels = testRDD.map { > > case (file, text) => val topic = file.split("/").takeRight(2).head > > newsgroupsMap(topic) } > > I got an error: "value newsgroupsMap is not a member of String" > > Other relevant part of the code: > val path = "/PATH/20news-bydate-train/*" > val rdd = sc.wholeTextFiles(path) > val newsgroups = rdd.map { case (file, text) => > file.split("/").takeRight(2).head } > > val tf = hashingTF.transform(tokens) > val idf = new IDF().fit(tf) > val tfidf = idf.transform(tf) > > val newsgroupsMap = newsgroups.distinct.collect().zipWithIndex.toMap > val zipped = newsgroups.zip(tfidf) > val train = zipped.map { case (topic, vector) > =>LabeledPoint(newsgroupsMap(topic), vector) } > train.cache > > val model = NaiveBayes.train(train, lambda = 0.1) > > val testPath = "/PATH//20news-bydate-test/*" > val testRDD = sc.wholeTextFiles(testPath) > val testLabels = testRDD.map { case (file, text) => val topic = > file.split("/").takeRight(2).head newsgroupsMap(topic) } > > I attached the whole program code. > Can anyone help, what the problem is? > > Regards, > Zsombor > > > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org >
Re: How to specify the numFeatures in HashingTF
Setting the numFeatures higher than the vocabulary size will tend to reduce the chance of hash collisions, but it's not strictly necessary - it becomes a memory / accuracy trade-off. Surprisingly, the impact on model performance of moderate hash collisions is often not significant. So it may be worth trying a few settings out (lower than the vocabulary size, higher, etc.) and seeing what the impact is on evaluation metrics. — Sent from Mailbox On Thu, Oct 15, 2015 at 5:46 PM, Jianguo Li wrote: > Hi, > There is a parameter in the HashingTF called "numFeatures". I was wondering > what is the best way to set the value to this parameter. In the use case of > text categorization, do you need to know in advance the number of words in > your vocabulary? or do you set it to be a large value, greater than the > number of words in your vocabulary? > Thanks, > Jianguo
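For example (the sizes are illustrative - powers of two are conventional - and you'd compare downstream evaluation metrics for each):
    import org.apache.spark.mllib.feature.HashingTF

    val docs = sc.parallelize(Seq(
      Seq("spark", "mllib", "hashing", "tf"),
      Seq("spark", "streaming", "kafka")
    ))
    val smallTf = new HashingTF(numFeatures = 1 << 16).transform(docs) // more collisions, less memory
    val largeTf = new HashingTF(numFeatures = 1 << 20).transform(docs) // fewer collisions, more memory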
Re: Spark job workflow engine recommendations
We're also using Azkaban for scheduling, and we simply use spark-submit via shell scripts. It works fine. The auto retry feature with a large number of retries (like 100 or 1000 perhaps) should take care of long-running jobs with restarts on failure. We haven't used it for streaming yet though we have long-running jobs and Azkaban won't kill them unless an SLA is in place. — Sent from Mailbox On Wed, Oct 7, 2015 at 7:18 PM, Vikram Kone <vikramk...@gmail.com> wrote: > Hien, > I saw this pull request and from what I understand this is geared towards > running spark jobs over hadoop. We are using spark over cassandra and not > sure if this new jobtype supports that. I haven't seen any documentation in > regards to how to use this spark job plugin, so that I can test it out on > our cluster. > We are currently submitting our spark jobs using command job type using the > following command "dse spark-submit --class com.org.classname ./test.jar" > etc. What would be the advantage of using the native spark job type over > command job type? > I didn't understand from your reply if azkaban already supports long > running jobs like spark streaming..does it? streaming jobs generally need > to be running indefinitely or forever and needs to be restarted if for some > reason they fail (lack of resources may be..). I can probably use the auto > retry feature for this, but not sure > I'm looking forward to the multiple executor support which will greatly > enhance the scalability issue. > On Wed, Oct 7, 2015 at 9:56 AM, Hien Luu <h...@linkedin.com> wrote: >> The spark job type was added recently - see this pull request >> https://github.com/azkaban/azkaban-plugins/pull/195. You can leverage >> the SLA feature to kill a job if it ran longer than expected. >> >> BTW, we just solved the scalability issue by supporting multiple >> executors. Within a week or two, the code for that should be merged in the >> main trunk. >> >> Hien >> >> On Tue, Oct 6, 2015 at 9:40 PM, Vikram Kone <vikramk...@gmail.com> wrote: >> >>> Does Azkaban support scheduling long running jobs like spark steaming >>> jobs? Will Azkaban kill a job if it's running for a long time. >>> >>> >>> On Friday, August 7, 2015, Vikram Kone <vikramk...@gmail.com> wrote: >>> >>>> Hien, >>>> Is Azkaban being phased out at linkedin as rumored? If so, what's >>>> linkedin going to use for workflow scheduling? Is there something else >>>> that's going to replace Azkaban? >>>> >>>> On Fri, Aug 7, 2015 at 11:25 AM, Ted Yu <yuzhih...@gmail.com> wrote: >>>> >>>>> In my opinion, choosing some particular project among its peers should >>>>> leave enough room for future growth (which may come faster than you >>>>> initially think). >>>>> >>>>> Cheers >>>>> >>>>> On Fri, Aug 7, 2015 at 11:23 AM, Hien Luu <h...@linkedin.com> wrote: >>>>> >>>>>> Scalability is a known issue due the the current architecture. >>>>>> However this will be applicable if you run more 20K jobs per day. >>>>>> >>>>>> On Fri, Aug 7, 2015 at 10:30 AM, Ted Yu <yuzhih...@gmail.com> wrote: >>>>>> >>>>>>> From what I heard (an ex-coworker who is Oozie committer), Azkaban >>>>>>> is being phased out at LinkedIn because of scalability issues (though >>>>>>> UI-wise, Azkaban seems better). >>>>>>> >>>>>>> Vikram: >>>>>>> I suggest you do more research in related projects (maybe using their >>>>>>> mailing lists). >>>>>>> >>>>>>> Disclaimer: I don't work for LinkedIn. 
>>>>>>> >>>>>>> On Fri, Aug 7, 2015 at 10:12 AM, Nick Pentreath < >>>>>>> nick.pentre...@gmail.com> wrote: >>>>>>> >>>>>>>> Hi Vikram, >>>>>>>> >>>>>>>> We use Azkaban (2.5.0) in our production workflow scheduling. We >>>>>>>> just use local mode deployment and it is fairly easy to set up. It is >>>>>>>> pretty easy to use and has a nice scheduling and logging interface, as >>>>>>>> well >>>>>>>> as SLAs (like kill job and notify if it doesn't complete in 3 hours or >>>>>>>> whatever). >>>>>>>> >>
Re: thought experiment: use spark ML to real time prediction
I think the issue with pulling in all of spark-core is often with dependencies (and versions) conflicting with the web framework (or Akka in many cases). Plus it really is quite heavy if you just want a fairly lightweight model-serving app. For example we've built a fairly simple but scalable ALS factor model server on Scalatra, Akka and Breeze. So all you really need is the web framework and Breeze (or an alternative linear algebra lib). I definitely hear the pain-point that PMML might not be able to handle some types of transformations or models that exist in Spark. However, here's an example from scikit-learn -> PMML that may be instructive ( https://github.com/scikit-learn/scikit-learn/issues/1596 and https://github.com/jpmml/jpmml-sklearn), where a fairly impressive list of estimators and transformers are supported (including e.g. scaling and encoding, and PCA). I definitely think the current model I/O and "export" or "deploy to production" situation needs to be improved substantially. However, you are left with the following options: (a) build out a lightweight "spark-ml-common" project that brings in the dependencies needed for production scoring / transformation in independent apps. However, here you only support Scala/Java - what about R and Python? Also, what about the distributed models? Perhaps "local" wrappers can be created, though this may not work for very large factor or LDA models. See also H20 example http://docs.h2o.ai/h2oclassic/userguide/scorePOJO.html (b) build out Spark's PMML support, and add missing stuff to PMML where possible. The benefit here is an existing standard with various tools for scoring (via REST server, Java app, Pig, Hive, various language support). (c) build out a more comprehensive I/O, serialization and scoring framework. Here you face the issue of supporting various predictors and transformers generically, across platforms and versioning. i.e. you're re-creating a new standard like PMML Option (a) is do-able, but I'm a bit concerned that it may be too "Spark specific", or even too "Scala / Java" specific. But it is still potentially very useful to Spark users to build this out and have a somewhat standard production serving framework and/or library (there are obviously existing options like PredictionIO etc). Option (b) is really building out the existing PMML support within Spark, so a lot of the initial work has already been done. I know some folks had (or have) licensing issues with some components of JPMML (e.g. the evaluator and REST server). But perhaps the solution here is to build an Apache2-licensed evaluator framework. Option (c) is obviously interesting - "let's build a better PMML (that uses JSON or whatever instead of XML!)". But it also seems like a huge amount of reinventing the wheel, and like any new standard would take time to garner wide support (if at all). It would be really useful to start to understand what the main missing pieces are in PMML - perhaps the lowest-hanging fruit is simply to contribute improvements or additions to PMML. On Fri, Nov 13, 2015 at 11:46 AM, Sabarish Sasidharan < sabarish.sasidha...@manthan.com> wrote: > That may not be an issue if the app using the models runs by itself (not > bundled into an existing app), which may actually be the right way to > design it considering separation of concerns. > > Regards > Sab > > On Fri, Nov 13, 2015 at 9:59 AM, DB Tsaiwrote: > >> This will bring the whole dependencies of spark will may break the web >> app. 
>> >> >> Sincerely, >> >> DB Tsai >> -- >> Web: https://www.dbtsai.com >> PGP Key ID: 0xAF08DF8D >> >> On Thu, Nov 12, 2015 at 8:15 PM, Nirmal Fernando wrote: >> >>> >>> >>> On Fri, Nov 13, 2015 at 2:04 AM, darren wrote: >>> I agree 100%. Making the model requires large data and many cpus. Using it does not. This is a very useful side effect of ML models. If mlib can't use models outside spark that's a real shame. >>> >>> Well you can as mentioned earlier. You don't need Spark runtime for >>> predictions, save the serialized model and deserialize to use. (you need >>> the Spark Jars in the classpath though) >>> Sent from my Verizon Wireless 4G LTE smartphone Original message From: "Kothuvatiparambil, Viju" < viju.kothuvatiparam...@bankofamerica.com> Date: 11/12/2015 3:09 PM (GMT-05:00) To: DB Tsai , Sean Owen Cc: Felix Cheung , Nirmal Fernando < nir...@wso2.com>, Andy Davidson , Adrian Tanase , "user @spark" , Xiangrui Meng , hol...@pigscanfly.ca Subject: RE: thought experiment: use spark ML to real time prediction I am glad to see DB’s comments,
Re: Spark works with the data in another cluster(Elasticsearch)
While it's true that locality might speed things up, I'd say it's a very bad idea to mix your Spark and ES clusters - if your ES cluster is serving production queries (and in particular using aggregations), you'll run into performance issues on your production ES cluster. ES-hadoop uses ES's scan & scroll to pull data pretty efficiently, so pulling it across the network is not too bad. If you do need to avoid that, pull the data and write what you need to HDFS as, say, Parquet files (e.g. pull data daily and write it, then you have all data available on your Spark cluster). And of course ensure that when you do pull data from ES to Spark, you cache it to avoid hitting the network again. — Sent from Mailbox On Tue, Aug 25, 2015 at 12:01 PM, Akhil Das ak...@sigmoidanalytics.com wrote: If the data is local to the machine then obviously it will be faster compared to pulling it through the network and storing it locally (either memory or disk etc). Have a look at the data locality http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/data_locality.html . Thanks Best Regards On Tue, Aug 18, 2015 at 8:09 PM, gen tang gen.tan...@gmail.com wrote: Hi, Currently, I have my data in the cluster of Elasticsearch and I try to use spark to analyse those data. The cluster of Elasticsearch and the cluster of spark are two different clusters. And I use hadoop input format(es-hadoop) to read data in ES. I am wondering how this environment affect the speed of analysis. If I understand well, spark will read data from ES cluster and do calculate on its own cluster(include writing shuffle result on its own machine), Is this right? If this is correct, I think that the performance will just a little bit slower than the data stored on the same cluster. I will be appreciated if someone can share his/her experience about using spark with elasticsearch. Thanks a lot in advance for your help. Cheers Gen
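A rough sketch of the "pull once, keep it on the Spark side" pattern - the index name, snapshot path and use of the es-hadoop DataFrame source are illustrative, and the es-hadoop connection settings are elided:
    import org.elasticsearch.spark._

    val events = sc.esRDD("events/docs")        // pulled once via scan & scroll
    events.cache()                              // avoid hitting the ES cluster again within this job

    // Or snapshot a day's worth of data to HDFS as Parquet for repeated analysis
    // (assumes an existing SQLContext named sqlContext, as in the shell):
    val eventsDF = sqlContext.read.format("org.elasticsearch.spark.sql").load("events/docs")
    eventsDF.write.parquet("hdfs:///snapshots/events/2015-08-25")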
Re: Spark ANN
Haven't checked the actual code but that doc says "MLPC employes backpropagation for learning the model. .."? — Sent from Mailbox On Mon, Sep 7, 2015 at 8:18 PM, Ruslan Dautkhanovwrote: > http://people.apache.org/~pwendell/spark-releases/latest/ml-ann.html > Implementation seems missing backpropagation? > Was there is a good reason to omit BP? > What are the drawbacks of a pure feedforward-only ANN? > Thanks! > -- > Ruslan Dautkhanov
Re: What is the best way to migrate existing scikit-learn code to PySpark?
You might want to check out https://github.com/lensacom/sparkit-learn Though it's true for random Forests / trees you will need to use MLlib — Sent from Mailbox On Sat, Sep 12, 2015 at 9:00 PM, Jörn Frankewrote: > I fear you have to do the plumbing all yourself. This is the same for all > commercial and non-commercial libraries/analytics packages. It often also > depends on the functional requirements on how you distribute. > Le sam. 12 sept. 2015 à 20:18, Rex X a écrit : >> Hi everyone, >> >> What is the best way to migrate existing scikit-learn code to PySpark >> cluster? Then we can bring together the full power of both scikit-learn and >> spark, to do scalable machine learning. (I know we have MLlib. But the >> existing code base is big, and some functions are not fully supported yet.) >> >> Currently I use multiprocessing module of Python to boost the speed. But >> this only works for one node, while the data set is small. >> >> For many real cases, we may need to deal with gigabytes or even terabytes >> of data, with thousands of raw categorical attributes, which can lead to >> millions of discrete features, using 1-of-k representation. >> >> For these cases, one solution is to use distributed memory. That's why I >> am considering spark. And spark support Python! >> With Pyspark, we can import scikit-learn. >> >> But the question is how to make the scikit-learn code, decisionTree >> classifier for example, running in distributed computing mode, to benefit >> the power of Spark? >> >> >> Best, >> Rex >>
Re: What is the best way to migrate existing scikit-learn code to PySpark?
I should point out that I'm not sure what the performance of that project is. I'd expect that native data frame in PySpark will be significantly more efficient than their DictRDD. It would be interesting to see a performance comparison for the pipelines relative to native Spark ML pipelines, if you do test both out. — Sent from Mailbox On Sat, Sep 12, 2015 at 10:52 PM, Rex X <dnsr...@gmail.com> wrote: > Jorn and Nick, > Thanks for answering. > Nick, the sparkit-learn project looks interesting. Thanks for mentioning it. > Rex > On Sat, Sep 12, 2015 at 12:05 PM, Nick Pentreath <nick.pentre...@gmail.com> > wrote: >> You might want to check out https://github.com/lensacom/sparkit-learn >> <https://github.com/lensacom/sparkit-learn/blob/master/README.rst> >> >> Though it's true for random >> Forests / trees you will need to use MLlib >> >> — >> Sent from Mailbox <https://www.dropbox.com/mailbox> >> >> >> On Sat, Sep 12, 2015 at 9:00 PM, Jörn Franke <jornfra...@gmail.com> wrote: >> >>> I fear you have to do the plumbing all yourself. This is the same for all >>> commercial and non-commercial libraries/analytics packages. It often also >>> depends on the functional requirements on how you distribute. >>> >>> Le sam. 12 sept. 2015 à 20:18, Rex X <dnsr...@gmail.com> a écrit : >>> >>>> Hi everyone, >>>> >>>> What is the best way to migrate existing scikit-learn code to PySpark >>>> cluster? Then we can bring together the full power of both scikit-learn and >>>> spark, to do scalable machine learning. (I know we have MLlib. But the >>>> existing code base is big, and some functions are not fully supported yet.) >>>> >>>> Currently I use multiprocessing module of Python to boost the speed. But >>>> this only works for one node, while the data set is small. >>>> >>>> For many real cases, we may need to deal with gigabytes or even >>>> terabytes of data, with thousands of raw categorical attributes, which can >>>> lead to millions of discrete features, using 1-of-k representation. >>>> >>>> For these cases, one solution is to use distributed memory. That's why I >>>> am considering spark. And spark support Python! >>>> With Pyspark, we can import scikit-learn. >>>> >>>> But the question is how to make the scikit-learn code, decisionTree >>>> classifier for example, running in distributed computing mode, to benefit >>>> the power of Spark? >>>> >>>> >>>> Best, >>>> Rex >>>> >>> >>